futuresdr/runtime/flowgraph.rs
1use async_lock::Mutex;
2use async_lock::MutexGuard;
3use std::fmt::Debug;
4use std::sync::Arc;
5
6use crate::runtime::Block;
7use crate::runtime::BlockId;
8use crate::runtime::BlockPortCtx;
9use crate::runtime::BufferReader;
10use crate::runtime::BufferWriter;
11use crate::runtime::Error;
12use crate::runtime::Kernel;
13use crate::runtime::KernelInterface;
14use crate::runtime::PortId;
15use crate::runtime::WrappedKernel;
16
17/// Reference to a [Block] that was added to the [Flowgraph].
18///
19/// Internally, it keeps an `Arc<Mutex<WrappedKernel<K>>>`, where `K` is the struct implementing
20/// the block.
21pub struct BlockRef<K: Kernel> {
22 id: BlockId,
23 block: Arc<Mutex<WrappedKernel<K>>>,
24}
25impl<K: Kernel> BlockRef<K> {
26 /// Get a mutable, typed handle to [WrappedKernel]
27 ///
28 /// Since [WrappedKernel] implements [Deref](std::ops::Deref) and
29 /// [DerefMut](std::ops::DerefMut), one can directly access the block.
30 pub fn get(&self) -> Result<MutexGuard<'_, WrappedKernel<K>>, Error> {
31 self.block.try_lock().ok_or(Error::LockError)
32 }
33}
34impl<K: Kernel> Clone for BlockRef<K> {
35 fn clone(&self) -> Self {
36 Self {
37 id: self.id,
38 block: self.block.clone(),
39 }
40 }
41}
42impl<K: Kernel> Debug for BlockRef<K> {
43 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44 f.debug_struct("BlockRef")
45 .field("id", &self.id)
46 .field(
47 "instance_name",
48 &self.block.try_lock().map(|b| {
49 b.meta
50 .instance_name()
51 .map(String::from)
52 .unwrap_or("<unknown>".to_string())
53 }),
54 )
55 .finish()
56 }
57}
58impl<K: Kernel> From<BlockRef<K>> for BlockId {
59 fn from(value: BlockRef<K>) -> Self {
60 value.id
61 }
62}
63impl<K: Kernel> From<&BlockRef<K>> for BlockId {
64 fn from(value: &BlockRef<K>) -> Self {
65 value.id
66 }
67}
68
69/// The main component of any FutureSDR application.
70///
71/// A [Flowgraph] is composed of a set of blocks and connections between them. It is typically set
72/// up with the [connect](futuresdr::macros::connect) macro. Once it is configure, the [Flowgraph]
73/// is executed on a [Runtime](futuresdr::runtime::Runtime).
74///
75/// ```
76/// use anyhow::Result;
77/// use futuresdr::blocks::Head;
78/// use futuresdr::blocks::NullSink;
79/// use futuresdr::blocks::NullSource;
80/// use futuresdr::prelude::*;
81///
82/// fn main() -> Result<()> {
83/// let mut fg = Flowgraph::new();
84///
85/// let src = NullSource::<u8>::new();
86/// let head = Head::<u8>::new(1234);
87/// let snk = NullSink::<u8>::new();
88///
89/// connect!(fg, src > head > snk);
90/// Runtime::new().run(fg)?;
91///
92/// Ok(())
93/// }
94/// ```
95pub struct Flowgraph {
96 pub(crate) blocks: Vec<Arc<Mutex<dyn Block>>>,
97 pub(crate) stream_edges: Vec<(BlockId, PortId, BlockId, PortId)>,
98 pub(crate) message_edges: Vec<(BlockId, PortId, BlockId, PortId)>,
99}
100
101impl Flowgraph {
102 /// Create a [Flowgraph].
103 pub fn new() -> Flowgraph {
104 Flowgraph {
105 blocks: Vec::new(),
106 stream_edges: vec![],
107 message_edges: vec![],
108 }
109 }
110
111 /// Add a [`Block`] to the [Flowgraph]
112 ///
113 /// The returned reference is typed and can be used to access the block before and after the
114 /// flowgraph ran.
115 ///
116 /// Usually, this is done under the hood by the [connect](futuresdr::macros::connect) macro.
117 ///
118 /// ```
119 /// use anyhow::Result;
120 /// use futuresdr::blocks::Head;
121 /// use futuresdr::blocks::NullSink;
122 /// use futuresdr::blocks::NullSource;
123 /// use futuresdr::prelude::*;
124 ///
125 /// fn main() -> Result<()> {
126 /// let mut fg = Flowgraph::new();
127 ///
128 /// let src = NullSource::<u8>::new();
129 /// let head = Head::<u8>::new(1234);
130 /// let snk = NullSink::<u8>::new();
131 ///
132 /// connect!(fg, src > head > snk);
133 /// Runtime::new().run(fg)?;
134 ///
135 /// // typed-access to the block
136 /// let snk = snk.get();
137 /// let n = snk.n_received();
138 /// assert_eq!(n, 1234);
139 ///
140 /// Ok(())
141 /// }
142 /// ```
143 pub fn add_block<K: Kernel + KernelInterface + 'static>(&mut self, block: K) -> BlockRef<K> {
144 let block_id = BlockId(self.blocks.len());
145 let mut b = WrappedKernel::new(block, block_id);
146 let block_name = b.type_name();
147 b.set_instance_name(&format!("{}-{}", block_name, block_id.0));
148 let b = Arc::new(Mutex::new(b));
149 self.blocks.push(b.clone());
150 BlockRef {
151 id: block_id,
152 block: b,
153 }
154 }
155
156 /// Make a stream connection
157 ///
158 /// This is the prefered way to connect stream ports. Usually, this function is not called
159 /// directly but used under-the-hood by the [connect](futuresdr::macros::connect) macro.
160 ///
161 /// ```
162 /// use anyhow::Result;
163 /// use futuresdr::blocks::Head;
164 /// use futuresdr::blocks::NullSink;
165 /// use futuresdr::blocks::NullSource;
166 /// use futuresdr::prelude::*;
167 ///
168 /// fn main() -> Result<()> {
169 /// let mut fg = Flowgraph::new();
170 ///
171 /// let src = NullSource::<u8>::new();
172 /// let head = Head::<u8>::new(1234);
173 /// let snk = NullSink::<u8>::new();
174 ///
175 /// // here, it is used under the hood
176 /// connect!(fg, src > head);
177 /// // explicit use
178 /// let snk = fg.add_block(snk);
179 /// fg.connect_stream(head.get().output(), snk.get().input());
180 ///
181 /// Runtime::new().run(fg)?;
182 /// Ok(())
183 /// }
184 /// ```
185 pub fn connect_stream<B: BufferWriter>(&mut self, src_port: &mut B, dst_port: &mut B::Reader) {
186 self.stream_edges.push((
187 src_port.block_id(),
188 src_port.port_id(),
189 dst_port.block_id(),
190 dst_port.port_id(),
191 ));
192 src_port.connect(dst_port);
193 }
194
195 /// Connect stream ports non-type-safe
196 ///
197 /// This function only does runtime checks. If the stream ports exist and have compatible
198 /// types and sample types, will only be checked during runtime.
199 ///
200 /// If possible, it is, therefore, recommneded to use the typed version ([Flowgraph::connect_stream]).
201 ///
202 /// This function can be helpful when using types is not practical. For example, when a runtime
203 /// option switches between different block types, which is often used to switch between
204 /// reading samples from hardware or a file.
205 ///
206 /// ```
207 /// use anyhow::Result;
208 /// use futuresdr::blocks::Head;
209 /// use futuresdr::blocks::NullSink;
210 /// use futuresdr::blocks::NullSource;
211 /// use futuresdr::prelude::*;
212 ///
213 /// fn main() -> Result<()> {
214 /// let mut fg = Flowgraph::new();
215 ///
216 /// let src = NullSource::<u8>::new();
217 /// let head = Head::<u8>::new(1234);
218 /// let snk = NullSink::<u8>::new();
219 ///
220 /// // type erasure for src
221 /// let src = fg.add_block(src);
222 /// let src: BlockId = src.into();
223 ///
224 /// let head = fg.add_block(head);
225 ///
226 /// // untyped connect
227 /// fg.connect_dyn(src, "output", &head, "input")?;
228 /// // typed connect
229 /// connect!(fg, head > snk);
230 ///
231 /// Runtime::new().run(fg)?;
232 /// Ok(())
233 /// }
234 /// ```
235 pub fn connect_dyn(
236 &mut self,
237 src: impl Into<BlockId>,
238 src_port: impl Into<PortId>,
239 dst: impl Into<BlockId>,
240 dst_port: impl Into<PortId>,
241 ) -> Result<(), Error> {
242 let src_id = src.into();
243 let src_port = src_port.into();
244 let dst = dst.into();
245 let dst_port: PortId = dst_port.into();
246 let src = self
247 .blocks
248 .get(src_id.0)
249 .ok_or(Error::InvalidBlock(src_id))?;
250 let dst = self.blocks.get(dst.0).ok_or(Error::InvalidBlock(dst))?;
251 let mut tmp = dst.try_lock().ok_or(Error::LockError)?;
252 let reader = tmp
253 .stream_input(dst_port.name())
254 .ok_or(Error::InvalidStreamPort(BlockPortCtx::Id(src_id), dst_port))?;
255 src.try_lock()
256 .ok_or(Error::LockError)?
257 .connect_stream_output(src_port.name(), reader)
258 }
259
260 /// Make message connection
261 pub fn connect_message(
262 &mut self,
263 src_block: impl Into<BlockId>,
264 src_port: impl Into<PortId>,
265 dst_block: impl Into<BlockId>,
266 dst_port: impl Into<PortId>,
267 ) -> Result<(), Error> {
268 let src_id = src_block.into();
269 let dst_id = dst_block.into();
270 let src_port = src_port.into();
271 let dst_port = dst_port.into();
272 debug_assert_ne!(src_id, dst_id);
273
274 let mut src_block = self
275 .blocks
276 .get(src_id.0)
277 .ok_or(Error::InvalidBlock(src_id))?
278 .try_lock()
279 .ok_or_else(|| Error::RuntimeError(format!("unable to lock block {src_id:?}")))?;
280 let dst_block = self
281 .blocks
282 .get(dst_id.0)
283 .ok_or(Error::InvalidBlock(dst_id))?
284 .try_lock()
285 .ok_or_else(|| Error::RuntimeError(format!("unable to lock block {dst_id:?}")))?;
286 let dst_box = dst_block.inbox();
287
288 src_block.connect(&src_port, dst_box, &dst_port)?;
289 if !dst_block.message_inputs().contains(&dst_port.name()) {
290 return Err(Error::InvalidMessagePort(
291 BlockPortCtx::Id(dst_id),
292 dst_port,
293 ));
294 }
295 self.message_edges
296 .push((src_id, src_port, dst_id, dst_port));
297 Ok(())
298 }
299
300 /// Get dyn reference to [Block]
301 ///
302 /// This should only be used when a [BlockRef], i.e., a typed reference to the block is not
303 /// available.
304 ///
305 /// A dyn Block reference can be downcasted to a typed refrence, e.g.:
306 ///
307 /// ```rust
308 /// use anyhow::Result;
309 /// use futuresdr::blocks::Head;
310 /// use futuresdr::blocks::NullSink;
311 /// use futuresdr::blocks::NullSource;
312 /// use futuresdr::prelude::*;
313 /// use futuresdr::runtime::WrappedKernel;
314 ///
315 /// fn main() -> Result<()> {
316 /// let mut fg = Flowgraph::new();
317 ///
318 /// let src = NullSource::<u8>::new();
319 /// let head = Head::<u8>::new(1234);
320 /// let snk = NullSink::<u8>::new();
321 ///
322 /// connect!(fg, src > head > snk);
323 ///
324 /// // Let's assume this is required.
325 /// let snk: BlockId = snk.into();
326 /// fg = Runtime::new().run(fg)?;
327 ///
328 /// let mut blk = fg.get_block(snk)?.lock_arc_blocking();
329 /// let snk = blk
330 /// .as_any_mut()
331 /// .downcast_mut::<WrappedKernel<NullSink<u8>>>()
332 /// .unwrap();
333 /// let v = snk.n_received();
334 /// assert_eq!(v, 1234);
335 ///
336 /// Ok(())
337 /// }
338 /// ```
339 pub fn get_block(&self, id: BlockId) -> Result<Arc<Mutex<dyn Block>>, Error> {
340 Ok(self
341 .blocks
342 .get(id.0)
343 .ok_or(Error::InvalidBlock(id))?
344 .clone())
345 }
346}
347
348impl Default for Flowgraph {
349 fn default() -> Self {
350 Self::new()
351 }
352}