protoflow_core/runtimes/
std.rs

1// This is free and unencumbered software released into the public domain.
2
3use crate::{
4    prelude::{
5        Arc, AtomicBool, AtomicUsize, Box, Cow, Duration, Instant, Ordering, Range, Rc, RefCell,
6        ToString, Vec,
7    },
8    transport::Transport,
9    transports::MpscTransport,
10    Block, BlockError, BlockResult, BlockRuntime, Port, Process, ProcessID, Runtime, System,
11};
12
13#[cfg(feature = "std")]
14extern crate std;
15
16#[allow(unused)]
17pub struct StdRuntime<T: Transport = MpscTransport> {
18    pub(crate) transport: Arc<T>,
19    is_alive: AtomicBool,
20    process_id: AtomicUsize,
21}
22
23#[allow(unused)]
24impl<T: Transport> StdRuntime<T> {
25    pub fn new(transport: T) -> Result<Arc<Self>, BlockError> {
26        Ok(Arc::new(Self {
27            transport: Arc::new(transport),
28            is_alive: AtomicBool::new(true),
29            process_id: AtomicUsize::new(1),
30        }))
31    }
32}
33
34impl<T: Transport + 'static> Runtime for Arc<StdRuntime<T>> {
35    fn execute_block(&mut self, block: Box<dyn Block>) -> BlockResult<Rc<dyn Process>> {
36        let block_runtime = Arc::new((*self).clone()) as Arc<dyn BlockRuntime>;
37        let block_process = Rc::new(RunningBlock {
38            id: self.process_id.fetch_add(1, Ordering::SeqCst),
39            runtime: self.clone(),
40            handle: RefCell::new(Some(
41                std::thread::Builder::new()
42                    .name(
43                        block
44                            .name()
45                            .unwrap_or_else(|| Cow::Borrowed("<unnamed>"))
46                            .to_string(),
47                    )
48                    .spawn(move || {
49                        let mut block = block;
50                        std::thread::park();
51                        let block_mut = block.as_mut();
52                        let block_runtime_ref = block_runtime.as_ref();
53                        Block::prepare(block_mut, block_runtime_ref)
54                            .and_then(|_| <dyn Block>::pre_execute(block_mut, block_runtime_ref))
55                            .and_then(|_| Block::execute(block_mut, block_runtime_ref))
56                            .and_then(|_| <dyn Block>::post_execute(block_mut, block_runtime_ref))
57                    })
58                    .unwrap(),
59            )),
60        });
61        block_process
62            .handle
63            .borrow()
64            .as_ref()
65            .unwrap()
66            .thread()
67            .unpark();
68        Ok(block_process)
69    }
70
71    fn execute<X: Transport + Default>(
72        &mut self,
73        mut system: System<X>,
74    ) -> BlockResult<Rc<dyn Process>> {
75        let mut system_process = RunningSystem {
76            id: self.process_id.fetch_add(1, Ordering::SeqCst),
77            runtime: self.clone(),
78            transport: self.transport.clone(),
79            blocks: Vec::new(),
80        };
81
82        while let Some(block) = system.blocks.pop_front() {
83            system_process.blocks.push(self.execute_block(block)?);
84        }
85
86        Ok(Rc::new(system_process))
87    }
88}
89
90impl<T: Transport> BlockRuntime for Arc<StdRuntime<T>> {
91    fn is_alive(&self) -> bool {
92        self.is_alive.load(Ordering::SeqCst)
93    }
94
95    fn sleep_for(&self, duration: Duration) -> BlockResult {
96        #[cfg(feature = "std")]
97        std::thread::sleep(duration);
98        #[cfg(not(feature = "std"))]
99        unimplemented!("std::thread::sleep requires the 'std' feature");
100        Ok(())
101    }
102
103    fn sleep_until(&self, _instant: Instant) -> BlockResult {
104        todo!() // TODO
105    }
106
107    fn wait_for(&self, port: &dyn Port) -> BlockResult {
108        loop {
109            if !self.is_alive() {
110                return Err(BlockError::Terminated);
111            }
112            if port.is_connected() {
113                return Ok(());
114            }
115            self.yield_now()?;
116        }
117    }
118
119    fn yield_now(&self) -> Result<(), BlockError> {
120        #[cfg(feature = "std")]
121        std::thread::yield_now();
122        #[cfg(not(feature = "std"))]
123        unimplemented!("std::thread::yield_now requires the 'std' feature");
124        Ok(())
125    }
126
127    fn random_duration(&self, range: Range<Duration>) -> Duration {
128        #[cfg(all(feature = "std", feature = "rand"))]
129        {
130            use rand::Rng;
131            let mut rng = rand::thread_rng();
132            let low = range.start.as_nanos() as u64;
133            let high = range.end.as_nanos() as u64;
134            Duration::from_nanos(rng.gen_range(low..high))
135        }
136        #[cfg(not(all(feature = "std", feature = "rand")))]
137        {
138            drop(range);
139            let mut _rng = todo!();
140        }
141    }
142}
143
144#[allow(unused)]
145struct RunningBlock<T: Transport> {
146    id: ProcessID,
147    runtime: Arc<StdRuntime<T>>,
148    handle: RefCell<Option<std::thread::JoinHandle<BlockResult>>>,
149}
150
151#[allow(unused)]
152impl<T: Transport> RunningBlock<T> {
153    //fn thread(&self) -> Option<&std::thread::Thread> {
154    //    self.handle.borrow().as_ref().map(|handle| handle.thread())
155    //}
156}
157
158impl<T: Transport> Process for RunningBlock<T> {
159    fn id(&self) -> ProcessID {
160        self.id
161    }
162
163    fn is_alive(&self) -> bool {
164        self.handle
165            .borrow()
166            .as_ref()
167            .map(|handle| !handle.is_finished())
168            .unwrap_or(false)
169    }
170
171    fn join(&self) -> BlockResult {
172        let handle = self.handle.take().unwrap();
173        handle
174            .join()
175            .map_err(<Box<dyn core::any::Any + Send>>::from)?
176    }
177}
178
179#[allow(unused)]
180struct RunningSystem<T: Transport> {
181    id: ProcessID,
182    runtime: Arc<StdRuntime<T>>,
183    transport: Arc<T>,
184    blocks: Vec<Rc<dyn Process>>,
185}
186
187impl<T: Transport> Process for RunningSystem<T> {
188    fn id(&self) -> ProcessID {
189        self.id
190    }
191
192    fn is_alive(&self) -> bool {
193        self.blocks.iter().any(|block| block.is_alive())
194    }
195
196    fn join(&self) -> BlockResult {
197        for block in self.blocks.iter() {
198            block.join()?;
199        }
200        Ok(())
201    }
202}