protoflow_core/
system.rs

1// This is free and unencumbered software released into the public domain.
2
3use crate::{
4    prelude::{fmt, Arc, Box, Bytes, PhantomData, Rc, String, VecDeque},
5    runtimes::StdRuntime,
6    transports::MpscTransport,
7    types::Any,
8    Block, BlockID, BlockResult, InputPort, InputPortID, Message, OutputPort, OutputPortID, PortID,
9    PortResult, Process, Runtime, Transport,
10};
11
12pub trait SystemBuilding {
13    fn input_any(&self) -> InputPort<Any> {
14        self.input()
15    }
16
17    fn input_bytes(&self) -> InputPort<Bytes> {
18        self.input()
19    }
20
21    fn input_string(&self) -> InputPort<String> {
22        self.input()
23    }
24
25    /// Creates a new input port inside the system.
26    fn input<M: Message + 'static>(&self) -> InputPort<M>;
27
28    fn output_any(&self) -> OutputPort<Any> {
29        self.output()
30    }
31
32    fn output_bytes(&self) -> OutputPort<Bytes> {
33        self.output()
34    }
35
36    fn output_string(&self) -> OutputPort<String> {
37        self.output()
38    }
39
40    /// Creates a new output port inside the system.
41    fn output<M: Message + 'static>(&self) -> OutputPort<M>;
42
43    /// Instantiates a block inside the system.
44    fn block<B: Block + Clone + 'static>(&mut self, block: B) -> B;
45
46    /// Connects two ports of two blocks in the system.
47    ///
48    /// Both ports must be of the same message type.
49    fn connect<M: Message>(&mut self, source: &OutputPort<M>, target: &InputPort<M>) -> bool;
50}
51
52pub trait SystemExecution {
53    /// Executes the system, returning the system process.
54    fn execute(self) -> BlockResult<Rc<dyn Process>>;
55}
56
57/// A system is a collection of blocks that are connected together.
58pub struct System<X: Transport + Default + 'static = MpscTransport> {
59    pub(crate) runtime: Arc<StdRuntime<X>>,
60
61    /// The registered blocks in the system.
62    pub(crate) blocks: VecDeque<Box<dyn Block>>,
63
64    _phantom: PhantomData<X>,
65}
66
67pub type Subsystem<X> = System<X>;
68
69impl fmt::Debug for System {
70    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
71        f.debug_struct("System")
72            .field("blocks", &self.blocks)
73            .finish()
74    }
75}
76
77impl<X: Transport + Default + 'static> System<X> {
78    /// Builds a new system.
79    pub fn build<F: FnOnce(&mut System<X>)>(f: F) -> Self {
80        let transport = X::default();
81        let runtime = StdRuntime::new(transport).unwrap();
82        let mut system = System::new(&runtime);
83        f(&mut system);
84        system
85    }
86
87    /// Instantiates a new system.
88    pub fn new(runtime: &Arc<StdRuntime<X>>) -> Self {
89        Self {
90            runtime: runtime.clone(),
91            blocks: VecDeque::new(),
92            _phantom: PhantomData,
93        }
94    }
95
96    pub fn execute(self) -> BlockResult<Rc<dyn Process>> {
97        let mut runtime = self.runtime.clone();
98        runtime.execute(self)
99    }
100
101    pub fn input<M: Message + 'static>(&self) -> InputPort<M> {
102        InputPort::new(self)
103    }
104
105    pub fn output<M: Message + 'static>(&self) -> OutputPort<M> {
106        OutputPort::new(self)
107    }
108
109    pub fn block<B: Block + Clone + 'static>(&mut self, block: B) -> B {
110        self.add_block(Box::new(block.clone()));
111        block
112    }
113
114    #[doc(hidden)]
115    pub fn add_block(&mut self, block: Box<dyn Block>) -> BlockID {
116        let block_id = BlockID::from(self.blocks.len());
117        self.blocks.push_back(block);
118        block_id
119    }
120
121    #[doc(hidden)]
122    pub fn get_block(&self, block_id: BlockID) -> Option<&Box<dyn Block>> {
123        self.blocks.get(block_id.into())
124    }
125
126    pub fn connect<M: Message>(&self, source: &OutputPort<M>, target: &InputPort<M>) -> bool {
127        self.connect_by_id(PortID::Output(source.id), PortID::Input(target.id))
128            .unwrap()
129    }
130
131    #[doc(hidden)]
132    pub fn connect_by_id(&self, source_id: PortID, target_id: PortID) -> PortResult<bool> {
133        let runtime = self.runtime.as_ref();
134        let transport = runtime.transport.as_ref();
135        transport.connect(
136            OutputPortID(source_id.into()),
137            InputPortID(target_id.into()),
138        )
139    }
140}
141
142impl SystemBuilding for System {
143    fn input<M: Message + 'static>(&self) -> InputPort<M> {
144        System::input(self)
145    }
146
147    fn output<M: Message + 'static>(&self) -> OutputPort<M> {
148        System::output(self)
149    }
150
151    fn block<B: Block + Clone + 'static>(&mut self, block: B) -> B {
152        System::block(self, block)
153    }
154
155    fn connect<M: Message>(&mut self, source: &OutputPort<M>, target: &InputPort<M>) -> bool {
156        System::connect(self, source, target)
157    }
158}
159
160impl SystemExecution for System {
161    fn execute(self) -> BlockResult<Rc<dyn Process>> {
162        System::execute(self)
163    }
164}