rustradio/
block.rs

1//! RustRadio Block implementation
2//!
3//! Blocks are the main building blocks of rustradio. They each do one
4//! thing, and you connect them together with streams to process the data.
5use crate::Result;
6use crate::stream::StreamWait;
7
8/// Return type for all blocks.
9///
10/// This will let the scheduler know if more data could come out of this block,
11/// or if it should just never bother calling it again.
12pub enum BlockRet<'a> {
13    /// Everything is fine, but no information about when more data could be
14    /// created.
15    ///
16    /// The graph scheduler should feel free to call the `work` function again
17    /// without waiting or sleeping.
18    ///
19    /// `Again` should not be returned for "polling". In other words, it should
20    /// not be returned repeatedly without data being consumed or produced.
21    ///
22    /// Good examples of returning `Again`:
23    /// * A block finished being in a state (e.g. writing headers), and does not
24    ///   want to deal with restarting `work()` under the new state. Next time
25    ///   `work()` is called, it'll be in a new state, so it's just temporary.
26    ///   Example `AuEncode`.
27    /// * Stream status is checked at the start of `work()`, so instead of
28    ///   re-checking status after a `produce()`/`consume()`, it's easier
29    ///   to just let the graph call `work()` again.
30    ///   Examples: `RtlSdrDecode` and `FirFilter`.
31    ///
32    /// Importantly, in both these examples, a second `work()` call is not
33    /// expected to do nothing, and just return `Again`. It'll either do useful
34    /// work, or it'll properly return a status showing what it's blocked on.
35    ///
36    /// Bad examples of returning `Again`:
37    /// * Can't be bothered identifying the stream we're waiting for.
38    ///
39    /// Returning `Again` indefinitely wastes CPU, and means the graph will
40    /// never finish.
41    Again,
42
43    /// Block didn't produce anything this time, but has a background
44    /// process that may suddenly produce.
45    ///
46    /// The difference between `Again` and `Pending` is that `Pending` implies
47    /// to the graph runner that it's reasonable to sleep a bit before calling
48    /// `work` again. And that activity on any stream won't help either way.
49    ///
50    /// Example: `RtlSdrSource` may not currently have any new data, but we
51    /// can't control when it does. But maybe this example would be better
52    /// served with `WaitForFunc`? That way at least a multithreaded graph
53    /// executor can sleep.
54    Pending,
55
56    /// Block indicates that there's no point calling it until the provided
57    /// function has been run.
58    ///
59    /// The function is blocking, but should not block for "too long", since it
60    /// prevents checking for exit conditions like stream EOF and Ctrl-C.
61    ///
62    /// For a single threaded graph, it would stall all blocks, so it's not
63    /// called at all, and thus becomes equivalent to returning `Again`.
64    ///
65    /// Discouraged: Prefer `WaitForStream` when possible.
66    WaitForFunc(Box<dyn Fn() + 'a>),
67
68    /// Signal that we're waiting for a stream. Either an input or output
69    /// stream.
70    ///
71    /// If a block is waiting for two streams, then pick one for this return. If
72    /// in the next invocation the other stream is the one preventing progress,
73    /// then return that then. Don't worry about not being able to return a
74    /// single status indicating both are being waited for.
75    ///
76    /// This is preferred over `WaitForFunc`, since graph executors know more
77    /// about the stream. E.g. if a block says that it's waiting for more data
78    /// on a stream, and the stream writer side goes away, then the waiting
79    /// block will never be satisfied, and is therefore also shut down.
80    WaitForStream(&'a dyn StreamWait, usize),
81
82    /// Block indicates that it will never produce more input.
83    ///
84    /// Examples:
85    /// * Reading from file, without repeating, and file reached EOF.
86    /// * Reading from a `VectorSource` that reached its end.
87    /// * Head block reached its max.
88    EOF,
89}
90
91impl std::fmt::Debug for BlockRet<'_> {
92    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
93        write!(
94            f,
95            "{}",
96            match self {
97                BlockRet::Again => "Again",
98                BlockRet::Pending => "Pending",
99                BlockRet::WaitForFunc(_) => "WaitForFunc",
100                BlockRet::WaitForStream(_, _) => "WaitForStream",
101                BlockRet::EOF => "EOF",
102            }
103        )
104    }
105}
106
107/// Provide name of block.
108///
109/// This has to be a separate trait, because often the `impl` is proc macro
110/// generated, and it's not possible to re-open the same trait `impl` in Rust.
111pub trait BlockName {
112    /// Name of block
113    ///
114    /// Not name of *instance* of block. But it may include the type. E.g.
115    /// `FileSource<Float>`.
116    fn block_name(&self) -> &str;
117}
118
119/// Enable asking if a block is done, and will never return any more data.
120///
121/// This has to be a separate trait, because often the `impl` is proc macro
122/// generated, and it's not possible to re-open the same trait `impl` in Rust.
123pub trait BlockEOF {
124    /// Return EOF status.
125    ///
126    /// Mutable because if eof, the block is also responsible setting EOF on its
127    /// output streams.
128    #[must_use]
129    fn eof(&mut self) -> bool;
130}
131
132/// Block trait. Must be implemented for all blocks.
133///
134/// Simpler blocks can use macros to avoid needing to implement `work()`.
135pub trait Block: BlockName + BlockEOF + Send {
136    /// Block work function
137    ///
138    /// A block implementation keeps track of its own inputs and outputs.
139    fn work(&mut self) -> Result<BlockRet<'_>>;
140}
141
142#[cfg(test)]
143#[cfg_attr(coverage_nightly, coverage(off))]
144mod tests {
145    use super::*;
146
147    struct FakeWait {}
148
149    #[async_trait::async_trait]
150    impl StreamWait for FakeWait {
151        fn id(&self) -> usize {
152            123
153        }
154        fn wait(&self, _need: usize) -> bool {
155            true
156        }
157        #[cfg(feature = "async")]
158        async fn wait_async(&self, _need: usize) -> bool {
159            true
160        }
161        fn closed(&self) -> bool {
162            true
163        }
164    }
165
166    #[test]
167    fn blockret_fmt() {
168        assert_eq!(format!("{:?}", BlockRet::Again), "Again");
169        assert_eq!(format!("{:?}", BlockRet::Pending), "Pending");
170        assert_eq!(
171            format!("{:?}", BlockRet::WaitForFunc(Box::new(|| {}))),
172            "WaitForFunc"
173        );
174        assert_eq!(
175            format!("{:?}", BlockRet::WaitForStream(&FakeWait {}, 1)),
176            "WaitForStream"
177        );
178        assert_eq!(format!("{:?}", BlockRet::EOF), "EOF");
179    }
180}
181/* vim: textwidth=80
182 */