protoflow_core/runtimes/
std.rs1use crate::{
4 prelude::{
5 Arc, AtomicBool, AtomicUsize, Box, Cow, Duration, Instant, Ordering, Range, Rc, RefCell,
6 ToString, Vec,
7 },
8 transport::Transport,
9 transports::MpscTransport,
10 Block, BlockError, BlockResult, BlockRuntime, Port, Process, ProcessID, Runtime, System,
11};
12
13#[cfg(feature = "std")]
14extern crate std;
15
16#[allow(unused)]
17pub struct StdRuntime<T: Transport = MpscTransport> {
18 pub(crate) transport: Arc<T>,
19 is_alive: AtomicBool,
20 process_id: AtomicUsize,
21}
22
23#[allow(unused)]
24impl<T: Transport> StdRuntime<T> {
25 pub fn new(transport: T) -> Result<Arc<Self>, BlockError> {
26 Ok(Arc::new(Self {
27 transport: Arc::new(transport),
28 is_alive: AtomicBool::new(true),
29 process_id: AtomicUsize::new(1),
30 }))
31 }
32}
33
34impl<T: Transport + 'static> Runtime for Arc<StdRuntime<T>> {
35 fn execute_block(&mut self, block: Box<dyn Block>) -> BlockResult<Rc<dyn Process>> {
36 let block_runtime = Arc::new((*self).clone()) as Arc<dyn BlockRuntime>;
37 let block_process = Rc::new(RunningBlock {
38 id: self.process_id.fetch_add(1, Ordering::SeqCst),
39 runtime: self.clone(),
40 handle: RefCell::new(Some(
41 std::thread::Builder::new()
42 .name(
43 block
44 .name()
45 .unwrap_or_else(|| Cow::Borrowed("<unnamed>"))
46 .to_string(),
47 )
48 .spawn(move || {
49 let mut block = block;
50 std::thread::park();
51 let block_mut = block.as_mut();
52 let block_runtime_ref = block_runtime.as_ref();
53 Block::prepare(block_mut, block_runtime_ref)
54 .and_then(|_| <dyn Block>::pre_execute(block_mut, block_runtime_ref))
55 .and_then(|_| Block::execute(block_mut, block_runtime_ref))
56 .and_then(|_| <dyn Block>::post_execute(block_mut, block_runtime_ref))
57 })
58 .unwrap(),
59 )),
60 });
61 block_process
62 .handle
63 .borrow()
64 .as_ref()
65 .unwrap()
66 .thread()
67 .unpark();
68 Ok(block_process)
69 }
70
71 fn execute<X: Transport + Default>(
72 &mut self,
73 mut system: System<X>,
74 ) -> BlockResult<Rc<dyn Process>> {
75 let mut system_process = RunningSystem {
76 id: self.process_id.fetch_add(1, Ordering::SeqCst),
77 runtime: self.clone(),
78 transport: self.transport.clone(),
79 blocks: Vec::new(),
80 };
81
82 while let Some(block) = system.blocks.pop_front() {
83 system_process.blocks.push(self.execute_block(block)?);
84 }
85
86 Ok(Rc::new(system_process))
87 }
88}
89
90impl<T: Transport> BlockRuntime for Arc<StdRuntime<T>> {
91 fn is_alive(&self) -> bool {
92 self.is_alive.load(Ordering::SeqCst)
93 }
94
95 fn sleep_for(&self, duration: Duration) -> BlockResult {
96 #[cfg(feature = "std")]
97 std::thread::sleep(duration);
98 #[cfg(not(feature = "std"))]
99 unimplemented!("std::thread::sleep requires the 'std' feature");
100 Ok(())
101 }
102
103 fn sleep_until(&self, _instant: Instant) -> BlockResult {
104 todo!() }
106
107 fn wait_for(&self, port: &dyn Port) -> BlockResult {
108 loop {
109 if !self.is_alive() {
110 return Err(BlockError::Terminated);
111 }
112 if port.is_connected() {
113 return Ok(());
114 }
115 self.yield_now()?;
116 }
117 }
118
119 fn yield_now(&self) -> Result<(), BlockError> {
120 #[cfg(feature = "std")]
121 std::thread::yield_now();
122 #[cfg(not(feature = "std"))]
123 unimplemented!("std::thread::yield_now requires the 'std' feature");
124 Ok(())
125 }
126
127 fn random_duration(&self, range: Range<Duration>) -> Duration {
128 #[cfg(all(feature = "std", feature = "rand"))]
129 {
130 use rand::Rng;
131 let mut rng = rand::thread_rng();
132 let low = range.start.as_nanos() as u64;
133 let high = range.end.as_nanos() as u64;
134 Duration::from_nanos(rng.gen_range(low..high))
135 }
136 #[cfg(not(all(feature = "std", feature = "rand")))]
137 {
138 drop(range);
139 let mut _rng = todo!();
140 }
141 }
142}
143
144#[allow(unused)]
145struct RunningBlock<T: Transport> {
146 id: ProcessID,
147 runtime: Arc<StdRuntime<T>>,
148 handle: RefCell<Option<std::thread::JoinHandle<BlockResult>>>,
149}
150
151#[allow(unused)]
152impl<T: Transport> RunningBlock<T> {
153 }
157
158impl<T: Transport> Process for RunningBlock<T> {
159 fn id(&self) -> ProcessID {
160 self.id
161 }
162
163 fn is_alive(&self) -> bool {
164 self.handle
165 .borrow()
166 .as_ref()
167 .map(|handle| !handle.is_finished())
168 .unwrap_or(false)
169 }
170
171 fn join(&self) -> BlockResult {
172 let handle = self.handle.take().unwrap();
173 handle
174 .join()
175 .map_err(<Box<dyn core::any::Any + Send>>::from)?
176 }
177}
178
179#[allow(unused)]
180struct RunningSystem<T: Transport> {
181 id: ProcessID,
182 runtime: Arc<StdRuntime<T>>,
183 transport: Arc<T>,
184 blocks: Vec<Rc<dyn Process>>,
185}
186
187impl<T: Transport> Process for RunningSystem<T> {
188 fn id(&self) -> ProcessID {
189 self.id
190 }
191
192 fn is_alive(&self) -> bool {
193 self.blocks.iter().any(|block| block.is_alive())
194 }
195
196 fn join(&self) -> BlockResult {
197 for block in self.blocks.iter() {
198 block.join()?;
199 }
200 Ok(())
201 }
202}