1use crate::{
4 prelude::{fmt, Arc, Box, Bytes, PhantomData, Rc, String, VecDeque},
5 runtimes::StdRuntime,
6 transports::MpscTransport,
7 types::Any,
8 Block, BlockID, BlockResult, InputPort, InputPortID, Message, OutputPort, OutputPortID, PortID,
9 PortResult, Process, Runtime, Transport,
10};
11
12pub trait SystemBuilding {
13 fn input_any(&self) -> InputPort<Any> {
14 self.input()
15 }
16
17 fn input_bytes(&self) -> InputPort<Bytes> {
18 self.input()
19 }
20
21 fn input_string(&self) -> InputPort<String> {
22 self.input()
23 }
24
25 fn input<M: Message + 'static>(&self) -> InputPort<M>;
27
28 fn output_any(&self) -> OutputPort<Any> {
29 self.output()
30 }
31
32 fn output_bytes(&self) -> OutputPort<Bytes> {
33 self.output()
34 }
35
36 fn output_string(&self) -> OutputPort<String> {
37 self.output()
38 }
39
40 fn output<M: Message + 'static>(&self) -> OutputPort<M>;
42
43 fn block<B: Block + Clone + 'static>(&mut self, block: B) -> B;
45
46 fn connect<M: Message>(&mut self, source: &OutputPort<M>, target: &InputPort<M>) -> bool;
50}
51
52pub trait SystemExecution {
53 fn execute(self) -> BlockResult<Rc<dyn Process>>;
55}
56
57pub struct System<X: Transport + Default + 'static = MpscTransport> {
59 pub(crate) runtime: Arc<StdRuntime<X>>,
60
61 pub(crate) blocks: VecDeque<Box<dyn Block>>,
63
64 _phantom: PhantomData<X>,
65}
66
67pub type Subsystem<X> = System<X>;
68
69impl fmt::Debug for System {
70 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
71 f.debug_struct("System")
72 .field("blocks", &self.blocks)
73 .finish()
74 }
75}
76
77impl<X: Transport + Default + 'static> System<X> {
78 pub fn build<F: FnOnce(&mut System<X>)>(f: F) -> Self {
80 let transport = X::default();
81 let runtime = StdRuntime::new(transport).unwrap();
82 let mut system = System::new(&runtime);
83 f(&mut system);
84 system
85 }
86
87 pub fn new(runtime: &Arc<StdRuntime<X>>) -> Self {
89 Self {
90 runtime: runtime.clone(),
91 blocks: VecDeque::new(),
92 _phantom: PhantomData,
93 }
94 }
95
96 pub fn execute(self) -> BlockResult<Rc<dyn Process>> {
97 let mut runtime = self.runtime.clone();
98 runtime.execute(self)
99 }
100
101 pub fn input<M: Message + 'static>(&self) -> InputPort<M> {
102 InputPort::new(self)
103 }
104
105 pub fn output<M: Message + 'static>(&self) -> OutputPort<M> {
106 OutputPort::new(self)
107 }
108
109 pub fn block<B: Block + Clone + 'static>(&mut self, block: B) -> B {
110 self.add_block(Box::new(block.clone()));
111 block
112 }
113
114 #[doc(hidden)]
115 pub fn add_block(&mut self, block: Box<dyn Block>) -> BlockID {
116 let block_id = BlockID::from(self.blocks.len());
117 self.blocks.push_back(block);
118 block_id
119 }
120
121 #[doc(hidden)]
122 pub fn get_block(&self, block_id: BlockID) -> Option<&Box<dyn Block>> {
123 self.blocks.get(block_id.into())
124 }
125
126 pub fn connect<M: Message>(&self, source: &OutputPort<M>, target: &InputPort<M>) -> bool {
127 self.connect_by_id(PortID::Output(source.id), PortID::Input(target.id))
128 .unwrap()
129 }
130
131 #[doc(hidden)]
132 pub fn connect_by_id(&self, source_id: PortID, target_id: PortID) -> PortResult<bool> {
133 let runtime = self.runtime.as_ref();
134 let transport = runtime.transport.as_ref();
135 transport.connect(
136 OutputPortID(source_id.into()),
137 InputPortID(target_id.into()),
138 )
139 }
140}
141
142impl SystemBuilding for System {
143 fn input<M: Message + 'static>(&self) -> InputPort<M> {
144 System::input(self)
145 }
146
147 fn output<M: Message + 'static>(&self) -> OutputPort<M> {
148 System::output(self)
149 }
150
151 fn block<B: Block + Clone + 'static>(&mut self, block: B) -> B {
152 System::block(self, block)
153 }
154
155 fn connect<M: Message>(&mut self, source: &OutputPort<M>, target: &InputPort<M>) -> bool {
156 System::connect(self, source, target)
157 }
158}
159
160impl SystemExecution for System {
161 fn execute(self) -> BlockResult<Rc<dyn Process>> {
162 System::execute(self)
163 }
164}