rustradio/block.rs
1//! RustRadio Block implementation
2//!
3//! Blocks are the main building blocks of rustradio. They each do one
4//! thing, and you connect them together with streams to process the data.
5use crate::Result;
6use crate::stream::StreamWait;
7
8/// Return type for all blocks.
9///
10/// This will let the scheduler know if more data could come out of this block,
11/// or if it should just never bother calling it again.
12pub enum BlockRet<'a> {
13 /// Everything is fine, but no information about when more data could be
14 /// created.
15 ///
16 /// The graph scheduler should feel free to call the `work` function again
17 /// without waiting or sleeping.
18 ///
19 /// `Again` should not be returned for "polling". In other words, it should
20 /// not be returned repeatedly without data being consumed or produced.
21 ///
22 /// Good examples of returning `Again`:
23 /// * A block finished being in a state (e.g. writing headers), and does not
24 /// want to deal with restarting `work()` under the new state. Next time
25 /// `work()` is called, it'll be in a new state, so it's just temporary.
26 /// Example `AuEncode`.
27 /// * Stream status is checked at the start of `work()`, so instead of
28 /// re-checking status after a `produce()`/`consume()`, it's easier
29 /// to just let the graph call `work()` again.
30 /// Examples: `RtlSdrDecode` and `FirFilter`.
31 ///
32 /// Importantly, in both these examples, a second `work()` call is not
33 /// expected to do nothing, and just return `Again`. It'll either do useful
34 /// work, or it'll properly return a status showing what it's blocked on.
35 ///
36 /// Bad examples of returning `Again`:
37 /// * Can't be bothered identifying the stream we're waiting for.
38 ///
39 /// Returning `Again` indefinitely wastes CPU, and means the graph will
40 /// never finish.
41 Again,
42
43 /// Block didn't produce anything this time, but has a background
44 /// process that may suddenly produce.
45 ///
46 /// The difference between `Again` and `Pending` is that `Pending` implies
47 /// to the graph runner that it's reasonable to sleep a bit before calling
48 /// `work` again. And that activity on any stream won't help either way.
49 ///
50 /// Example: `RtlSdrSource` may not currently have any new data, but we
51 /// can't control when it does. But maybe this example would be better
52 /// served with `WaitForFunc`? That way at least a multithreaded graph
53 /// executor can sleep.
54 Pending,
55
56 /// Block indicates that there's no point calling it until the provided
57 /// function has been run.
58 ///
59 /// The function is blocking, but should not block for "too long", since it
60 /// prevents checking for exit conditions like stream EOF and Ctrl-C.
61 ///
62 /// For a single threaded graph, it would stall all blocks, so it's not
63 /// called at all, and thus becomes equivalent to returning `Again`.
64 ///
65 /// Discouraged: Prefer `WaitForStream` when possible.
66 WaitForFunc(Box<dyn Fn() + 'a>),
67
68 /// Signal that we're waiting for a stream. Either an input or output
69 /// stream.
70 ///
71 /// If a block is waiting for two streams, then pick one for this return. If
72 /// in the next invocation the other stream is the one preventing progress,
73 /// then return that then. Don't worry about not being able to return a
74 /// single status indicating both are being waited for.
75 ///
76 /// This is preferred over `WaitForFunc`, since graph executors know more
77 /// about the stream. E.g. if a block says that it's waiting for more data
78 /// on a stream, and the stream writer side goes away, then the waiting
79 /// block will never be satisfied, and is therefore also shut down.
80 WaitForStream(&'a dyn StreamWait, usize),
81
82 /// Block indicates that it will never produce more input.
83 ///
84 /// Examples:
85 /// * Reading from file, without repeating, and file reached EOF.
86 /// * Reading from a `VectorSource` that reached its end.
87 /// * Head block reached its max.
88 EOF,
89}
90
91impl std::fmt::Debug for BlockRet<'_> {
92 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
93 write!(
94 f,
95 "{}",
96 match self {
97 BlockRet::Again => "Again",
98 BlockRet::Pending => "Pending",
99 BlockRet::WaitForFunc(_) => "WaitForFunc",
100 BlockRet::WaitForStream(_, _) => "WaitForStream",
101 BlockRet::EOF => "EOF",
102 }
103 )
104 }
105}
106
107/// Provide name of block.
108///
109/// This has to be a separate trait, because often the `impl` is proc macro
110/// generated, and it's not possible to re-open the same trait `impl` in Rust.
111pub trait BlockName {
112 /// Name of block
113 ///
114 /// Not name of *instance* of block. But it may include the type. E.g.
115 /// `FileSource<Float>`.
116 fn block_name(&self) -> &str;
117}
118
119/// Enable asking if a block is done, and will never return any more data.
120///
121/// This has to be a separate trait, because often the `impl` is proc macro
122/// generated, and it's not possible to re-open the same trait `impl` in Rust.
123pub trait BlockEOF {
124 /// Return EOF status.
125 ///
126 /// Mutable because if eof, the block is also responsible setting EOF on its
127 /// output streams.
128 #[must_use]
129 fn eof(&mut self) -> bool;
130}
131
132/// Block trait. Must be implemented for all blocks.
133///
134/// Simpler blocks can use macros to avoid needing to implement `work()`.
135pub trait Block: BlockName + BlockEOF + Send {
136 /// Block work function
137 ///
138 /// A block implementation keeps track of its own inputs and outputs.
139 fn work(&mut self) -> Result<BlockRet<'_>>;
140}
141
142#[cfg(test)]
143#[cfg_attr(coverage_nightly, coverage(off))]
144mod tests {
145 use super::*;
146
147 struct FakeWait {}
148
149 #[async_trait::async_trait]
150 impl StreamWait for FakeWait {
151 fn id(&self) -> usize {
152 123
153 }
154 fn wait(&self, _need: usize) -> bool {
155 true
156 }
157 #[cfg(feature = "async")]
158 async fn wait_async(&self, _need: usize) -> bool {
159 true
160 }
161 fn closed(&self) -> bool {
162 true
163 }
164 }
165
166 #[test]
167 fn blockret_fmt() {
168 assert_eq!(format!("{:?}", BlockRet::Again), "Again");
169 assert_eq!(format!("{:?}", BlockRet::Pending), "Pending");
170 assert_eq!(
171 format!("{:?}", BlockRet::WaitForFunc(Box::new(|| {}))),
172 "WaitForFunc"
173 );
174 assert_eq!(
175 format!("{:?}", BlockRet::WaitForStream(&FakeWait {}, 1)),
176 "WaitForStream"
177 );
178 assert_eq!(format!("{:?}", BlockRet::EOF), "EOF");
179 }
180}
181/* vim: textwidth=80
182 */