rs_flow/context/
ctx.rs

1use std::collections::{HashMap, VecDeque};
2use std::sync::Arc;
3
4use crate::context::global::Global;
5
6use crate::component::{Id, Type};
7use crate::error::{FlowError, Result};
8use crate::package::Package;
9use crate::ports::PortId;
10use crate::prelude::Component;
11
12
13///
14/// Provide a interface to send and recieve [Package]'s to/from others [Component]'s
15/// and access to read and modify the global data of the [Flow](crate::flow::Flow).
16/// 
17pub struct Ctx<G: Send + Sync> {
18    pub(crate) id: Id,
19    pub(crate) ty: Type,
20    pub(crate) send: HashMap<PortId, VecDeque<Package>>,
21    pub(crate) receive: HashMap<PortId, VecDeque<Package>>,
22    pub(crate) consumed: bool,
23
24    global: Arc<Global<G>>,
25}
26
27impl<G> Ctx<G> 
28    where G: Send + Sync + 'static
29{
30
31    pub(crate) fn from(component: &Component<G>, global: &Arc<Global<G>>) -> Self {
32        let send = HashMap::from_iter(
33            component.data.outputs().0.iter().map(|port| (port.port, VecDeque::new()))
34        );
35        let receive = HashMap::from_iter(
36            component.data.inputs().0.iter().map(|port| (port.port, VecDeque::new()))
37        );
38        Self {
39            id: component.id,
40            ty: component.ty,
41            send,
42            receive,
43            consumed: false,
44            global: global.clone(),
45        }
46    }
47    
48    /// 
49    /// Recieve a [Package] from a [Port](crate::ports::Port)
50    /// 
51    /// # Panics
52    /// 
53    /// Panic if recieve from a [Input](crate::ports::Inputs) Port that not exist in this [Component]
54    /// 
55    pub fn receive(&mut self, in_port: PortId) -> Option<Package> {
56        let package = self.receive.get_mut(&in_port)
57            .ok_or(FlowError::QueueNotCreated { 
58                component: self.id, port: in_port 
59            })
60            .unwrap()
61            .pop_front();
62
63        self.consumed = true;
64        
65        package
66    }
67    
68    /// Send a [Package] to a [Port](crate::ports::Port), if one [Component] is connected to this port than he
69    /// can recieve that [Package] sent.
70    ///  
71    /// If more than one components is connected in this port, each one recieve a copy of this [Package].
72    ///  
73    /// # Panics
74    /// 
75    /// Panic if send to a [Output](crate::ports::Outputs) Port that not exist in this [Component]
76    /// 
77    pub fn send(&mut self, out_port: PortId, package: Package) {
78        self.send.get_mut(&out_port)
79            .ok_or(FlowError::QueueNotCreated { 
80                component: self.id, port: out_port 
81            })
82            .unwrap()
83            .push_front(package);
84    }
85    
86
87    /// Interface tha provide a way to read the global data of the [Flow](crate::flow::Flow)
88    pub fn with_global<R>(&self, call: impl FnOnce(&G) -> R) -> Result<R> {
89        self.global.with_global(call)
90    }
91    
92    /// Interface tha provide a way to read and modify the global data of the [Flow](crate::flow::Flow)
93    pub fn with_mut_global<R>(&self,  call: impl FnOnce(&mut G) -> R) -> Result<R> {
94        self.global.with_mut_global(call)
95    }
96
97}