protoflow_blocks/
system.rs1#![allow(dead_code)]
4
5use crate::{
6 prelude::{fmt, Arc, Box, FromStr, Rc, String, ToString},
7 types::{DelayType, Encoding},
8 AllBlocks, Buffer, Const, CoreBlocks, Count, Decode, DecodeJson, Delay, Drop, Encode,
9 EncodeHex, EncodeJson, FlowBlocks, HashBlocks, IoBlocks, MathBlocks, Random, ReadDir, ReadEnv,
10 ReadFile, ReadStdin, SysBlocks, TextBlocks, WriteFile, WriteStderr, WriteStdout,
11};
12use protoflow_core::{
13 Block, BlockID, BlockResult, InputPort, Message, OutputPort, PortID, PortResult, Process,
14 SystemBuilding, SystemExecution,
15};
16
17#[cfg(feature = "hash")]
18use crate::{types::HashAlgorithm, Hash};
19
20type Transport = protoflow_core::transports::MpscTransport;
21type Runtime = protoflow_core::runtimes::StdRuntime<Transport>;
22
23pub struct System(protoflow_core::System<Transport>);
24
25impl System {
26 pub fn run<F: FnOnce(&mut System)>(f: F) -> BlockResult {
28 Self::build(f).execute()?.join()
29 }
30
31 pub fn spawn<F: FnOnce(&mut System)>(f: F) -> BlockResult<Rc<dyn Process>> {
33 Self::build(f).execute()
34 }
35
36 pub fn build<F: FnOnce(&mut System)>(f: F) -> Self {
38 let transport = Transport::default();
39 let runtime = Runtime::new(transport).unwrap();
40 let mut system = System::new(&runtime);
41 f(&mut system);
42 system
43 }
44
45 pub fn new(runtime: &Arc<Runtime>) -> Self {
47 Self(protoflow_core::System::<Transport>::new(runtime))
48 }
49
50 #[doc(hidden)]
51 pub fn add_block(&mut self, block: Box<dyn Block>) -> BlockID {
52 self.0.add_block(block)
53 }
54
55 #[doc(hidden)]
56 pub fn get_block(&self, block_id: BlockID) -> Option<&Box<dyn Block>> {
57 self.0.get_block(block_id)
58 }
59
60 #[doc(hidden)]
61 pub fn connect_by_id(&mut self, source_id: PortID, target_id: PortID) -> PortResult<bool> {
62 self.0.connect_by_id(source_id, target_id)
63 }
64}
65
66impl fmt::Debug for System {
67 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
68 self.0.fmt(f)
69 }
70}
71
72impl SystemExecution for System {
73 fn execute(self) -> BlockResult<Rc<dyn Process>> {
74 self.0.execute()
75 }
76}
77
78impl SystemBuilding for System {
79 fn input<M: Message + 'static>(&self) -> InputPort<M> {
80 self.0.input()
81 }
82
83 fn output<M: Message + 'static>(&self) -> OutputPort<M> {
84 self.0.output()
85 }
86
87 fn block<B: Block + Clone + 'static>(&mut self, block: B) -> B {
88 self.0.block(block)
89 }
90
91 fn connect<M: Message>(&mut self, source: &OutputPort<M>, target: &InputPort<M>) -> bool {
92 self.0.connect(source, target)
93 }
94}
95
96impl AllBlocks for System {}
97
98impl CoreBlocks for System {
99 fn buffer<T: Message + Into<T> + 'static>(&mut self) -> Buffer<T> {
100 self.0.block(Buffer::<T>::with_system(self))
101 }
102
103 fn const_string(&mut self, value: impl ToString) -> Const<String> {
104 self.0
105 .block(Const::<String>::with_system(self, value.to_string()))
106 }
107
108 fn count<T: Message + 'static>(&mut self) -> Count<T> {
109 self.0.block(Count::<T>::with_system(self))
110 }
111
112 fn delay<T: Message + 'static>(&mut self) -> Delay<T> {
113 self.0.block(Delay::<T>::with_system(self, None))
114 }
115
116 fn delay_by<T: Message + 'static>(&mut self, delay: DelayType) -> Delay<T> {
117 self.0.block(Delay::<T>::with_system(self, Some(delay)))
118 }
119
120 fn drop<T: Message + 'static>(&mut self) -> Drop<T> {
121 self.0.block(Drop::<T>::with_system(self))
122 }
123
124 fn random<T: Message + 'static>(&mut self) -> Random<T> {
125 self.0.block(Random::<T>::with_system(self, None))
126 }
127
128 fn random_seeded<T: Message + 'static>(&mut self, seed: Option<u64>) -> Random<T> {
129 self.0.block(Random::<T>::with_system(self, seed))
130 }
131}
132
133impl FlowBlocks for System {}
134
135#[cfg(not(feature = "hash"))]
136impl HashBlocks for System {}
137
138#[cfg(feature = "hash")]
139impl HashBlocks for System {
140 fn hash_blake3(&mut self) -> Hash {
141 self.0
142 .block(Hash::with_system(self, Some(HashAlgorithm::BLAKE3)))
143 }
144}
145
146impl IoBlocks for System {
147 fn decode<T: Message + FromStr + 'static>(&mut self) -> Decode<T> {
148 self.0.block(Decode::<T>::with_system(self, None))
149 }
150
151 fn decode_json(&mut self) -> DecodeJson {
152 self.0.block(DecodeJson::with_system(self))
153 }
154
155 fn decode_with<T: Message + FromStr + 'static>(&mut self, encoding: Encoding) -> Decode<T> {
156 self.0.block(Decode::<T>::with_system(self, Some(encoding)))
157 }
158
159 fn encode<T: Message + ToString + 'static>(&mut self) -> Encode<T> {
160 self.0.block(Encode::<T>::with_system(self, None))
161 }
162
163 fn encode_with<T: Message + ToString + 'static>(&mut self, encoding: Encoding) -> Encode<T> {
164 self.0.block(Encode::<T>::with_system(self, Some(encoding)))
165 }
166
167 fn encode_hex(&mut self) -> EncodeHex {
168 self.0.block(EncodeHex::with_system(self))
169 }
170
171 fn encode_json(&mut self) -> EncodeJson {
172 self.0.block(EncodeJson::with_system(self))
173 }
174}
175
176impl MathBlocks for System {}
177
178#[cfg(not(feature = "std"))]
179impl SysBlocks for System {}
180
181#[cfg(feature = "std")]
182impl SysBlocks for System {
183 fn read_dir(&mut self) -> ReadDir {
184 self.0.block(ReadDir::with_system(self))
185 }
186
187 fn read_env(&mut self) -> ReadEnv {
188 self.0.block(ReadEnv::with_system(self))
189 }
190
191 fn read_file(&mut self) -> ReadFile {
192 self.0.block(ReadFile::with_system(self))
193 }
194
195 fn read_stdin(&mut self) -> ReadStdin {
196 self.0.block(ReadStdin::with_system(self, None))
197 }
198
199 fn write_file(&mut self) -> WriteFile {
200 self.0.block(WriteFile::with_system(self, None))
201 }
202
203 fn write_stderr(&mut self) -> WriteStderr {
204 self.0.block(WriteStderr::with_system(self))
205 }
206
207 fn write_stdout(&mut self) -> WriteStdout {
208 self.0.block(WriteStdout::with_system(self))
209 }
210}
211
212impl TextBlocks for System {}