futuresdr/runtime/
flowgraph.rs

1use async_lock::Mutex;
2use async_lock::MutexGuard;
3use std::fmt::Debug;
4use std::sync::Arc;
5
6use crate::runtime::Block;
7use crate::runtime::BlockId;
8use crate::runtime::BlockPortCtx;
9use crate::runtime::BufferReader;
10use crate::runtime::BufferWriter;
11use crate::runtime::Error;
12use crate::runtime::Kernel;
13use crate::runtime::KernelInterface;
14use crate::runtime::PortId;
15use crate::runtime::WrappedKernel;
16
17/// Reference to a [Block] that was added to the [Flowgraph].
18///
19/// Internally, it keeps an `Arc<Mutex<WrappedKernel<K>>>`, where `K` is the struct implementing
20/// the block.
21pub struct BlockRef<K: Kernel> {
22    id: BlockId,
23    block: Arc<Mutex<WrappedKernel<K>>>,
24}
25impl<K: Kernel> BlockRef<K> {
26    /// Get a mutable, typed handle to [WrappedKernel]
27    ///
28    /// Since [WrappedKernel] implements [Deref](std::ops::Deref) and
29    /// [DerefMut](std::ops::DerefMut), one can directly access the block.
30    pub fn get(&self) -> Result<MutexGuard<'_, WrappedKernel<K>>, Error> {
31        self.block.try_lock().ok_or(Error::LockError)
32    }
33}
34impl<K: Kernel> Clone for BlockRef<K> {
35    fn clone(&self) -> Self {
36        Self {
37            id: self.id,
38            block: self.block.clone(),
39        }
40    }
41}
42impl<K: Kernel> Debug for BlockRef<K> {
43    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44        f.debug_struct("BlockRef")
45            .field("id", &self.id)
46            .field(
47                "instance_name",
48                &self.block.try_lock().map(|b| {
49                    b.meta
50                        .instance_name()
51                        .map(String::from)
52                        .unwrap_or("<unknown>".to_string())
53                }),
54            )
55            .finish()
56    }
57}
58impl<K: Kernel> From<BlockRef<K>> for BlockId {
59    fn from(value: BlockRef<K>) -> Self {
60        value.id
61    }
62}
63impl<K: Kernel> From<&BlockRef<K>> for BlockId {
64    fn from(value: &BlockRef<K>) -> Self {
65        value.id
66    }
67}
68
69/// The main component of any FutureSDR application.
70///
71/// A [Flowgraph] is composed of a set of blocks and connections between them. It is typically set
72/// up with the [connect](futuresdr::macros::connect) macro. Once it is configure, the [Flowgraph]
73/// is executed on a [Runtime](futuresdr::runtime::Runtime).
74///
75/// ```
76/// use anyhow::Result;
77/// use futuresdr::blocks::Head;
78/// use futuresdr::blocks::NullSink;
79/// use futuresdr::blocks::NullSource;
80/// use futuresdr::prelude::*;
81///
82/// fn main() -> Result<()> {
83///     let mut fg = Flowgraph::new();
84///
85///     let src = NullSource::<u8>::new();
86///     let head = Head::<u8>::new(1234);
87///     let snk = NullSink::<u8>::new();
88///
89///     connect!(fg, src > head > snk);
90///     Runtime::new().run(fg)?;
91///
92///     Ok(())
93/// }
94/// ```
95pub struct Flowgraph {
96    pub(crate) blocks: Vec<Arc<Mutex<dyn Block>>>,
97    pub(crate) stream_edges: Vec<(BlockId, PortId, BlockId, PortId)>,
98    pub(crate) message_edges: Vec<(BlockId, PortId, BlockId, PortId)>,
99}
100
101impl Flowgraph {
102    /// Create a [Flowgraph].
103    pub fn new() -> Flowgraph {
104        Flowgraph {
105            blocks: Vec::new(),
106            stream_edges: vec![],
107            message_edges: vec![],
108        }
109    }
110
111    /// Add a [`Block`] to the [Flowgraph]
112    ///
113    /// The returned reference is typed and can be used to access the block before and after the
114    /// flowgraph ran.
115    ///
116    /// Usually, this is done under the hood by the [connect](futuresdr::macros::connect) macro.
117    ///
118    /// ```
119    /// use anyhow::Result;
120    /// use futuresdr::blocks::Head;
121    /// use futuresdr::blocks::NullSink;
122    /// use futuresdr::blocks::NullSource;
123    /// use futuresdr::prelude::*;
124    ///
125    /// fn main() -> Result<()> {
126    ///     let mut fg = Flowgraph::new();
127    ///
128    ///     let src = NullSource::<u8>::new();
129    ///     let head = Head::<u8>::new(1234);
130    ///     let snk = NullSink::<u8>::new();
131    ///
132    ///     connect!(fg, src > head > snk);
133    ///     Runtime::new().run(fg)?;
134    ///
135    ///     // typed-access to the block
136    ///     let snk = snk.get();
137    ///     let n = snk.n_received();
138    ///     assert_eq!(n, 1234);
139    ///
140    ///     Ok(())
141    /// }
142    /// ```
143    pub fn add_block<K: Kernel + KernelInterface + 'static>(&mut self, block: K) -> BlockRef<K> {
144        let block_id = BlockId(self.blocks.len());
145        let mut b = WrappedKernel::new(block, block_id);
146        let block_name = b.type_name();
147        b.set_instance_name(&format!("{}-{}", block_name, block_id.0));
148        let b = Arc::new(Mutex::new(b));
149        self.blocks.push(b.clone());
150        BlockRef {
151            id: block_id,
152            block: b,
153        }
154    }
155
156    /// Make a stream connection
157    ///
158    /// This is the prefered way to connect stream ports. Usually, this function is not called
159    /// directly but used under-the-hood by the [connect](futuresdr::macros::connect) macro.
160    ///
161    /// ```
162    /// use anyhow::Result;
163    /// use futuresdr::blocks::Head;
164    /// use futuresdr::blocks::NullSink;
165    /// use futuresdr::blocks::NullSource;
166    /// use futuresdr::prelude::*;
167    ///
168    /// fn main() -> Result<()> {
169    ///     let mut fg = Flowgraph::new();
170    ///
171    ///     let src = NullSource::<u8>::new();
172    ///     let head = Head::<u8>::new(1234);
173    ///     let snk = NullSink::<u8>::new();
174    ///
175    ///     // here, it is used under the hood
176    ///     connect!(fg, src > head);
177    ///     // explicit use
178    ///     let snk = fg.add_block(snk);
179    ///     fg.connect_stream(head.get().output(), snk.get().input());
180    ///
181    ///     Runtime::new().run(fg)?;
182    ///     Ok(())
183    /// }
184    /// ```
185    pub fn connect_stream<B: BufferWriter>(&mut self, src_port: &mut B, dst_port: &mut B::Reader) {
186        self.stream_edges.push((
187            src_port.block_id(),
188            src_port.port_id(),
189            dst_port.block_id(),
190            dst_port.port_id(),
191        ));
192        src_port.connect(dst_port);
193    }
194
195    /// Connect stream ports non-type-safe
196    ///
197    /// This function only does runtime checks. If the stream ports exist and have compatible
198    /// types and sample types, will only be checked during runtime.
199    ///
200    /// If possible, it is, therefore, recommneded to use the typed version ([Flowgraph::connect_stream]).
201    ///
202    /// This function can be helpful when using types is not practical. For example, when a runtime
203    /// option switches between different block types, which is often used to switch between
204    /// reading samples from hardware or a file.
205    ///
206    /// ```
207    /// use anyhow::Result;
208    /// use futuresdr::blocks::Head;
209    /// use futuresdr::blocks::NullSink;
210    /// use futuresdr::blocks::NullSource;
211    /// use futuresdr::prelude::*;
212    ///
213    /// fn main() -> Result<()> {
214    ///     let mut fg = Flowgraph::new();
215    ///
216    ///     let src = NullSource::<u8>::new();
217    ///     let head = Head::<u8>::new(1234);
218    ///     let snk = NullSink::<u8>::new();
219    ///
220    ///     // type erasure for src
221    ///     let src = fg.add_block(src);
222    ///     let src: BlockId = src.into();
223    ///
224    ///     let head = fg.add_block(head);
225    ///
226    ///     // untyped connect
227    ///     fg.connect_dyn(src, "output", &head, "input")?;
228    ///     // typed connect
229    ///     connect!(fg, head > snk);
230    ///
231    ///     Runtime::new().run(fg)?;
232    ///     Ok(())
233    /// }
234    /// ```
235    pub fn connect_dyn(
236        &mut self,
237        src: impl Into<BlockId>,
238        src_port: impl Into<PortId>,
239        dst: impl Into<BlockId>,
240        dst_port: impl Into<PortId>,
241    ) -> Result<(), Error> {
242        let src_id = src.into();
243        let src_port = src_port.into();
244        let dst = dst.into();
245        let dst_port: PortId = dst_port.into();
246        let src = self
247            .blocks
248            .get(src_id.0)
249            .ok_or(Error::InvalidBlock(src_id))?;
250        let dst = self.blocks.get(dst.0).ok_or(Error::InvalidBlock(dst))?;
251        let mut tmp = dst.try_lock().ok_or(Error::LockError)?;
252        let reader = tmp
253            .stream_input(dst_port.name())
254            .ok_or(Error::InvalidStreamPort(BlockPortCtx::Id(src_id), dst_port))?;
255        src.try_lock()
256            .ok_or(Error::LockError)?
257            .connect_stream_output(src_port.name(), reader)
258    }
259
260    /// Make message connection
261    pub fn connect_message(
262        &mut self,
263        src_block: impl Into<BlockId>,
264        src_port: impl Into<PortId>,
265        dst_block: impl Into<BlockId>,
266        dst_port: impl Into<PortId>,
267    ) -> Result<(), Error> {
268        let src_id = src_block.into();
269        let dst_id = dst_block.into();
270        let src_port = src_port.into();
271        let dst_port = dst_port.into();
272        debug_assert_ne!(src_id, dst_id);
273
274        let mut src_block = self
275            .blocks
276            .get(src_id.0)
277            .ok_or(Error::InvalidBlock(src_id))?
278            .try_lock()
279            .ok_or_else(|| Error::RuntimeError(format!("unable to lock block {src_id:?}")))?;
280        let dst_block = self
281            .blocks
282            .get(dst_id.0)
283            .ok_or(Error::InvalidBlock(dst_id))?
284            .try_lock()
285            .ok_or_else(|| Error::RuntimeError(format!("unable to lock block {dst_id:?}")))?;
286        let dst_box = dst_block.inbox();
287
288        src_block.connect(&src_port, dst_box, &dst_port)?;
289        if !dst_block.message_inputs().contains(&dst_port.name()) {
290            return Err(Error::InvalidMessagePort(
291                BlockPortCtx::Id(dst_id),
292                dst_port,
293            ));
294        }
295        self.message_edges
296            .push((src_id, src_port, dst_id, dst_port));
297        Ok(())
298    }
299
300    /// Get dyn reference to [Block]
301    ///
302    /// This should only be used when a [BlockRef], i.e., a typed reference to the block is not
303    /// available.
304    ///
305    /// A dyn Block reference can be downcasted to a typed refrence, e.g.:
306    ///
307    /// ```rust
308    /// use anyhow::Result;
309    /// use futuresdr::blocks::Head;
310    /// use futuresdr::blocks::NullSink;
311    /// use futuresdr::blocks::NullSource;
312    /// use futuresdr::prelude::*;
313    /// use futuresdr::runtime::WrappedKernel;
314    ///
315    /// fn main() -> Result<()> {
316    ///     let mut fg = Flowgraph::new();
317    ///
318    ///     let src = NullSource::<u8>::new();
319    ///     let head = Head::<u8>::new(1234);
320    ///     let snk = NullSink::<u8>::new();
321    ///
322    ///     connect!(fg, src > head > snk);
323    ///
324    ///     // Let's assume this is required.
325    ///     let snk: BlockId = snk.into();
326    ///     fg = Runtime::new().run(fg)?;
327    ///
328    ///     let mut blk = fg.get_block(snk)?.lock_arc_blocking();
329    ///     let snk = blk
330    ///         .as_any_mut()
331    ///         .downcast_mut::<WrappedKernel<NullSink<u8>>>()
332    ///         .unwrap();
333    ///     let v = snk.n_received();
334    ///     assert_eq!(v, 1234);
335    ///
336    ///     Ok(())
337    /// }
338    /// ```
339    pub fn get_block(&self, id: BlockId) -> Result<Arc<Mutex<dyn Block>>, Error> {
340        Ok(self
341            .blocks
342            .get(id.0)
343            .ok_or(Error::InvalidBlock(id))?
344            .clone())
345    }
346}
347
348impl Default for Flowgraph {
349    fn default() -> Self {
350        Self::new()
351    }
352}