async_pipes/
lib.rs

1//! Create a lightweight, concurrent data processing pipeline for Rust applications.
2//!
3//! # Overview
4//!
5//! Async Pipes provides a simple way to create high-throughput data processing pipelines by
6//! utilizing Rust's asynchronous runtime capabilities. This is done by managing task execution and
7//! data flow so the developer only has to worry about the task-specific implementation for each
8//! stage in the pipeline.
9//!
10//! # Terminology
11//!
12//! All of these are abstractions to help conceptualize how data is transferred and operated on in
13//! the pipeline.
14//!
15//! * **Pipe** - Represents something where a type of data can flow.
16//!   An example of this being a pipe that allows strings to flow through it.
17//! * **Stage** - Represents the "nodes" in a pipeline where work is done.
18//!   A stage typically includes the definition of the worker, an optional pipe connection
19//!   for reading data from, and zero or more pipe connections for sending data to.
20//! * **Worker** - A worker is internally defined by this library, and does the work of
21//!   reading from the optional input pipe, performing a user-defined task on the input, and
22//!   then writing the output of that task to the zero or more output pipes.
23//! * **Pipeline** - Represents the overall set of stages and the pipes that connect the stages.
24//!   Pipelines don't necessarily have to be linear, they can branch off of one stage into
25//!   multiple stages.
26//!
27//! # Getting Started
28//!
29//! A pipeline can be built using the builder provided by [Pipeline::builder]. This allows the
30//! pipeline to be configured before any work is done.
31//! ```
32//! use async_pipes::{Pipeline, PipelineBuilder};
33//!
34//! let builder: PipelineBuilder = Pipeline::builder();
35//! ```
36//!
37//! Using the builder, stages can be defined, where a stage contains the name of a pipe to read from
38//! (if applicable), the name of a pipe to write to (or more if applicable), some options for the
39//! worker, and a user-defined "task" function.
40//!
41//! For information on what worker options are available, see [WorkerOptions].
42//!
43//! Demonstrated below is a pipeline being built with a producer stage, a regular stage, and a
44//! consuming stage.
45//! ```
46//! use async_pipes::Pipeline;
47//! use async_pipes::WorkerOptions;
48//!
49//! #[tokio::main]
50//! async fn main() {
51//!     let pipeline: Result<Pipeline, String> = Pipeline::builder()
52//!         .with_inputs("InputPipe", vec![1, 2, 3])
53//!         .with_stage(
54//!             "InputPipe",
55//!             "OutputPipe",
56//!             WorkerOptions::default(),
57//!             |n: i32| async move { Some(n + 1) }
58//!         )
59//!         .with_consumer(
60//!             "OutputPipe",
61//!             WorkerOptions::default_single_task(),
62//!             |n: i32| async move { println!("{}", n) }
63//!         )
64//!         .build();
65//!
66//!     assert!(pipeline.is_ok());
67//! }
68//! ```
69//!
70//! With the builder, any number of stages can be defined with any number of pipes, but there are a
71//! few requirements:
72//! 1. There must be at least one producer - how else will data get into the pipeline?
73//! 2. Every pipe must have a corresponding stage that reads data from it - this is required to
74//!    avoid a deadlock from pipes being filled up but not emptied.
75//!
76//! These requirements are enforced by [PipelineBuilder::build] returning a
77//! [Result<Pipeline, String>] where an error describing the missing requirement is returned.
78//!
79//! For example, here is an invalid pipeline due to requirement (1) not being followed:
80//! ```
81//! use async_pipes::Pipeline;
82//! use async_pipes::WorkerOptions;
83//!
84//! #[tokio::main]
85//! async fn main() {
86//!     let pipeline = Pipeline::builder()
87//!         .with_consumer("MyPipe", WorkerOptions::default(), |n: usize| async move {
88//!             println!("{}", n);
89//!         })
90//!         .build();
91//!
92//!     assert_eq!(pipeline.unwrap_err(), "pipeline must have at least one producer");
93//! }
94//! ```
95//!
96//! And here is an invalid pipeline due to requirement (2) not being followed:
97//! ```
98//! use async_pipes::Pipeline;
99//!
100//! #[tokio::main]
101//! async fn main() {
102//!     let pipeline = Pipeline::builder()
103//!         .with_inputs("MyPipe", vec![1, 2, 3])
104//!         .build();
105//!
106//!     assert_eq!(pipeline.unwrap_err(), "pipeline has open-ended pipe: 'MyPipe'");
107//! }
108//! ```
109//!
110//! Once an `Ok(Pipeline)` is returned, it can be waited on using [Pipeline::wait], where it will
111//! make progress until all workers finish or there is no more data in the pipeline.
112//!
113//! _Note_: When a pipeline is built, depending on the runtime it may or may not be running.
//! In single-threaded runtimes no progress will be made as the workers can't make progress on
//! their own unless the single thread yields to them. It is possible for them to make progress in
//! multi-threaded runtimes. However, the pipeline will never "finish" until [Pipeline::wait] is
//! called.
117//!
118//! ```
119//! use async_pipes::Pipeline;
120//! use async_pipes::WorkerOptions;
121//!
122//! #[tokio::main]
123//! async fn main() -> Result<(), String> {
124//!     Pipeline::builder()
125//!         .with_inputs("InputPipe", vec![1, 2, 3])
126//!         .with_stage("InputPipe", "OutputPipe", WorkerOptions::default(), |n: i32| async move {
127//!             Some(n + 1)
128//!         })
129//!         .with_consumer("OutputPipe", WorkerOptions::default(), |n: i32| async move {
130//!             println!("{}", n)
131//!         })
132//!         .build()?
133//!         .wait()
134//!         .await;
135//!
136//!     Ok(())
137//! }
138//! ```
139//!
140//! ### Stateful Stages
141//!
142//! It is possible to maintain state in a stage across tasks, however the state must be [Send].
143//! Usually this is best done for non-Send objects by wrapping them in an [std::sync::Mutex]
144//! (or even better, [tokio::sync::Mutex]).
145//!
146//! Another caveat with state in stages is that since the task function returns a future
147//! (`async move { ... }`), it requires ownership of non-`'static` lifetime values in order to
148//! continue working on other inputs as the future may not be able to reference borrowed state.
149//! A way around this is to wrap values that may be expensive to clone in [std::sync::Arc].
150//!
151//! The following is an example of a mutable sum being used as a stateful item in a stage:
152//! ```
153//! use async_pipes::Pipeline;
154//! use std::sync::Arc;
155//! use tokio::sync::Mutex;
156//! use async_pipes::WorkerOptions;
157//!
158//! #[tokio::main]
159//! async fn main() -> Result<(), String> {
160//!     // [AtomicUsize] may be preferred here, but we use [Mutex] for the sake of this example
161//!     let sum = Arc::new(Mutex::new(0));
162//!     // For the assertion at the end of this example
163//!     let test_sum = sum.clone();
164//!
165//!     Pipeline::builder()
166//!         .with_inputs("InputPipe", vec![1, 2, 3])
167//!         .with_stage("InputPipe", "OutputPipe", WorkerOptions::default(), move |n: i32| {
168//!             // As the sum is owned by this closure, we need to clone it to move an owned value
169//!             // into the `async move` block.
170//!             let sum = sum.clone();
171//!             async move {
172//!                 let mut sum = sum.lock().await;
173//!                 *sum += n;
174//!                 Some(*sum)
175//!             }
176//!         })
177//!         .with_consumer("OutputPipe", WorkerOptions::default(), |n: i32| async move {
178//!             println!("Counter now at: {}", n)
179//!         })
180//!         .build()?
181//!         .wait()
182//!         .await;
183//!
184//!     assert_eq!(*test_sum.lock().await, 6);
185//!     Ok(())
186//! }
187//! ```
188//!
189//! # Stage Categories <a name="stage-categories"></a>
190//!
191//! ### Producer ("entry stage")
192//! A producer is the only place where data can be fed into the pipeline.
193//!
194//! **Static (definite)**
195//!
196//! This is where a list of concrete values can be provided to the stage and the worker will loop
197//! over each value and feed it into a pipe.
198//! * [PipelineBuilder::with_inputs]
199//! * [PipelineBuilder::with_branching_inputs]
200//!
201//! **Dynamic (indefinite)**
202//!
203//! This is useful when there are no pre-defined input values. Instead, a function that produces a
204//! single value can be provided that produces an [Option] where it's continually called until
205//! [None] is returned. This can be useful when receiving data over the network, or data is read
206//! from a file.
207//! * [PipelineBuilder::with_producer]
208//! * [PipelineBuilder::with_branching_producer]
209//!
210//! ### Consumer ("terminating stage")
211//! A consumer is a final stage in the pipeline where data ends up. It takes in a single pipe to
212//! read from and produces no output.
213//! * [PipelineBuilder::with_consumer]
214//!
215//! ### Regular (1 input, 1 output)
//! This is an intermediate stage in the pipeline that takes in a single input and produces one or
//! more outputs.
218//! * [PipelineBuilder::with_stage]
219//! * [PipelineBuilder::with_branching_stage]
220//!
221//! ### Utility
222//! This is an intermediate stage in the pipeline that can be used to do common operations on data
223//! between pipes.
224//! * [PipelineBuilder::with_flattener]
225//!
226//! # Stage Variants
227//!
228//! ### Branching (1 input, N outputs)
229//! A branching stage is a stage where multiple output pipes are connected. This means the task
230//! defined by the user in this stage returns two or more output values.
231//! * [PipelineBuilder::with_branching_inputs]
232//! * [PipelineBuilder::with_branching_producer]
233//! * [PipelineBuilder::with_branching_stage]
234//!
235//! # Examples
236//!
237//! ```
238//! use std::sync::Arc;
239//!
240//! use async_pipes::Pipeline;
241//!
242//! use std::sync::atomic::{AtomicUsize, Ordering};
243//! use tokio::sync::Mutex;
244//! use async_pipes::WorkerOptions;
245//!
246//! #[tokio::main]
247//! async fn main() -> Result<(), String> {
248//!     // Due to the task function returning a future (`async move { ... }`), data needs
249//!     // to be wrapped in an [Arc] and then cloned in order to be moved into the task
250//!     // while still referencing it from this scope
251//!     let total_count = Arc::new(AtomicUsize::new(0));
252//!     let task_total_count = total_count.clone();
253//!
254//!     Pipeline::builder()
255//!         .with_inputs("MapPipe", vec!["a", "bb", "ccc"])
256//!
257//!         // Read from the 'MapPipe' and write to the 'ReducePipe'
258//!         .with_stage(
259//!             "MapPipe",
260//!             "ReducePipe",
261//!             WorkerOptions::default(),
262//!             |value: &'static str| async move {
263//!                 // We return an option to tell the stage whether to write the new value
264//!                 // to the pipe or ignore it
265//!                 Some(format!("{}!", value))
266//!             }
267//!         )
268//!
269//!         // Read from the 'ReducePipe'.
270//!         .with_consumer("ReducePipe", WorkerOptions::default(), move |value: String| {
271//!             // The captured `task_total_count` can't move out of this closure, so we
272//!             // have to clone it to give ownership to the async block. Remember, it's
273//!             // wrapped in an [Arc] so we're still referring to the original data.
274//!             let total_count = task_total_count.clone();
275//!             async move {
276//!                 total_count.fetch_add(value.len(), Ordering::SeqCst);
277//!             }
278//!         })
279//!
280//!         // Build the pipeline and wait for it to finish
281//!         .build()?
282//!         .wait()
283//!         .await;
284//!
285//!     // We see that after the data goes through our map and reduce stages,
286//!     // we effectively get this: `len("a!") + len("bb!") + len("ccc!") = 9`
287//!     assert_eq!(total_count.load(Ordering::Acquire), 9);
288//!     Ok(())
289//! }
290//! ```
291//!
// Emit a warning for any public item in this crate that lacks documentation;
// the crate's API is primarily consumed through rustdoc, so docs are enforced.
#![warn(missing_docs)]

// Re-export the pipeline API (e.g. `Pipeline`, `PipelineBuilder`,
// `WorkerOptions`) at the crate root so users can write
// `use async_pipes::Pipeline;` instead of reaching into the module.
pub use pipeline::*;

mod pipeline;
297
/// A value used in coordination with [branch] to indicate there is no value to be sent to a
/// pipe.
///
/// [branch] detects this marker by downcasting and substitutes [None] for the corresponding
/// output slot.
///
/// # Examples
///
/// ```
/// use async_pipes::{NoOutput, BoxedAnySend, branch};
///
/// let outputs: Vec<Option<BoxedAnySend>> = branch![
///     "one",
///     NoOutput,
///     3,
/// ];
///
/// assert!(outputs[0].is_some());
/// assert!(outputs[1].is_none());
/// assert!(outputs[2].is_some());
/// ```
// `Copy` and `Default` are free for a zero-sized unit struct and make the
// marker more convenient to use (Rust API guidelines: derive common traits
// eagerly); this is a backward-compatible extension of the derive set.
#[derive(Debug, Default, Clone, Copy, Hash, Ord, PartialOrd, Eq, PartialEq)]
pub struct NoOutput;
318
319/// Defines an idiomatic way to provide values to a static branching producer stage (i.e. concrete
320/// input values).
321///
/// A list of tuples of values (of possibly different types) can be provided, and those values
/// will be boxed and then put into a [Vec].
324///
325/// # Examples
326///
327/// Here's an example of what is returned by the macro call.
328/// ```
329/// use async_pipes::{BoxedAnySend, branch_inputs};
330///
331/// let inputs: Vec<Vec<BoxedAnySend>> = branch_inputs![
332///     (1usize, 1i32, 1u8),
333///     (2usize, 2i32, 2u8),
334///     (3usize, 3i32, 3u8),
335/// ];
336///
337/// assert_eq!(inputs.len(), 3);
338/// ```
339///
340/// Here's an example of the macro being used in a pipeline.
341/// ```
342/// use async_pipes::{branch_inputs, Pipeline};
343/// use async_pipes::WorkerOptions;
344///
345/// #[tokio::main]
346/// async fn main() {
347///     Pipeline::builder()
348///         .with_branching_inputs(
349///             vec!["One", "Two"],
350///             branch_inputs![
351///                 (1usize, "Hello"),
352///                 (1usize, "World"),
353///                 (1usize, "!"),
354///             ],
355///         )
356///         .with_consumer("One", WorkerOptions::default(), |value: usize| async move {
357///             /* ... */
358///         })
359///         .with_consumer("Two", WorkerOptions::default(), |value: &'static str| async move {
360///             /* ... */
361///         })
362///         .build()
363///         .unwrap()
364///         .wait()
365///         .await;
366/// }
367/// ```
#[macro_export]
macro_rules! branch_inputs {
    // Entry arm: a list of tuples, one tuple per branching input set.
    // Each tuple is boxed element-wise by recursing into the arm below.
    ($(( $($x:expr),+ $(,)? )),* $(,)?) => {
        // `::std::vec!` (instead of bare `vec!`) keeps the expansion immune to
        // caller-side shadowing, since `#[macro_export]` macros expand in the
        // caller's namespace.
        ::std::vec![
            // Recurse via `$crate::` so the call resolves even when the caller
            // invokes this macro by path (`async_pipes::branch_inputs!`)
            // without importing it into scope.
            $( $crate::branch_inputs!($($x),+) ),*
        ]
    };

    // Single input set: box each value so differently-typed values can live
    // in one homogeneous `Vec<BoxedAnySend>`.
    ($($x:expr),+ $(,)?) => {
        ::std::vec![
            $(::std::boxed::Box::new($x) as $crate::BoxedAnySend),+
        ]
    };
}
382
383/// Defines an idiomatic way to return values in a branching stage.
384///
385/// A list of values (possibly of different types) can be provided. These values will be boxed and
386/// then wrapped in a [Some]. In order to specify [None] (i.e. no value should be sent to the
387/// respective pipe), [NoOutput] should be used in the place of a value. The macro will detect this
388/// and use [None] in its place.
389///
390/// # Examples
391///
392/// Here's an example of what is returned by the macro call.
393/// ```
394/// use async_pipes::{BoxedAnySend, NoOutput, branch};
395///
396/// let inputs: Vec<Option<BoxedAnySend>> = branch![1, "hello", true, NoOutput, 12.0];
397///
398/// assert_eq!(inputs.len(), 5);
399/// assert!(inputs[3].is_none())
400/// ```
401///
402/// Here's an example of the macro being used in a pipeline.
403/// ```
404/// use async_pipes::{branch, branch_inputs, Pipeline};
405/// use std::sync::atomic::{AtomicUsize, Ordering};
406/// use std::sync::Arc;
407/// use async_pipes::WorkerOptions;
408///
409/// #[tokio::main]
410/// async fn main() {
411///     Pipeline::builder()
412///         .with_inputs("Count", vec![1, 2, 3])
413///         .with_branching_stage(
414///             "Count",
415///             vec!["Value", "Doubled"],
416///             WorkerOptions::default(),
417///             |value: i32| async move {
418///                 Some(branch![value, value * 2])
419///             }
420///         )
421///         .with_consumer("Value", WorkerOptions::default(), |value: i32| async move {
422///             /* ... */
423///         })
424///         .with_consumer("Doubled", WorkerOptions::default(), |value: i32| async move {
425///             /* ... */
426///         })
427///         .build()
428///         .unwrap()
429///         .wait()
430///         .await;
431/// }
432/// ```
#[macro_export]
macro_rules! branch {
    ($($x:expr),+ $(,)?) => {
        // Leading `::` on `std` paths guards against a caller-local `std`
        // module hijacking resolution — `#[macro_export]` macros expand in
        // the caller's namespace.
        ::std::vec![
            $({
                // Box the value first so every element has the uniform
                // `BoxedAnySend` type, then map the `NoOutput` marker to
                // `None` and every other value to `Some(boxed)`.
                let x: $crate::BoxedAnySend = ::std::boxed::Box::new($x);
                x.downcast_ref::<$crate::NoOutput>().is_none().then_some(x)
            }),+
        ]
    };
}
444
#[cfg(test)]
mod tests {
    use crate::{NoOutput, Pipeline, WorkerOptions};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    /// Use the code of this test for the `README`'s `Simple, Linear Pipeline Example`.
    ///
    /// Builds a two-stage, linear pipeline (map then reduce) over three input
    /// strings and asserts the summed lengths of the mapped strings:
    /// `len("a!") + len("bb!") + len("ccc!") = 9`.
    #[tokio::test]
    async fn test_readme_simple_linear_pipeline() {
        // Shared accumulator; cloned so the consumer closure can own a handle
        // while this scope keeps `total` for the final assertion.
        let total = Arc::new(AtomicUsize::new(0));
        let task_total = total.clone();

        Pipeline::builder()
            .with_inputs("MapPipe", vec!["a", "bb", "ccc"])
            .with_stage(
                "MapPipe",
                "ReducePipe",
                WorkerOptions::default(),
                // Map stage: append "!" to each input string.
                |value: &'static str| async move { Some(format!("{}!", value)) },
            )
            .with_consumer(
                "ReducePipe",
                WorkerOptions::default_single_task(),
                move |value: String| {
                    // The async block must own its captures (the closure is
                    // called per input), so clone the Arc before moving in.
                    let total = task_total.clone();
                    async move {
                        total.fetch_add(value.len(), Ordering::SeqCst);
                    }
                },
            )
            .build()
            .expect("failed to build pipeline!")
            .wait()
            .await;

        assert_eq!(total.load(Ordering::Acquire), 9);
    }

    /// Use the code of this test for the `README`'s `Branching, Cyclic Pipeline Example`.
    ///
    /// Demonstrates a cyclic topology: crawled pages can feed URL batches back
    /// into the fetch stage through the flattener. Since the mimicked crawler
    /// finds no URLs, the cycle terminates and content drains to "ToLog".
    #[tokio::test]
    async fn test_readme_branching_cyclic_pipeline() {
        let initial_urls = vec![
            "https://example.com".to_string(),
            "https://rust-lang.org".to_string(),
        ];

        Pipeline::builder()
            .with_inputs("ToFetch", initial_urls)
            // Utility stage: flattens `Vec<String>` batches read from
            // "ToFlattenThenFetch" into individual `String` items on
            // "ToFetch", closing the fetch -> crawl -> fetch cycle.
            .with_flattener::<Vec<String>>("ToFlattenThenFetch", "ToFetch")
            .with_stage(
                "ToFetch",
                "ToCrawl",
                WorkerOptions::default_multi_task(),
                |_url: String| async move {
                    // Fetch content from url...
                    Some("<html>Sample Content</html>".to_string())
                },
            )
            .with_branching_stage(
                "ToCrawl",
                vec!["ToFlattenThenFetch", "ToLog"],
                WorkerOptions::default_single_task(),
                |_html: String| async move {
                    // Crawl HTML, extracting embedded URLs and content
                    let has_embedded_urls = false; // Mimic the crawler not finding any URLs

                    // Exactly one of the two output pipes receives a value per
                    // input; `NoOutput` suppresses the write to the other.
                    let output = if has_embedded_urls {
                        let urls = vec![
                            "https://first.com".to_string(),
                            "https://second.com".to_string(),
                        ];
                        branch![urls, NoOutput]
                    } else {
                        branch![NoOutput, "Extracted content".to_string()]
                    };

                    Some(output)
                },
            )
            .with_consumer(
                "ToLog",
                WorkerOptions::default_single_task(),
                |content: String| async move { println!("{content}") },
            )
            .build()
            .expect("failed to build pipeline!")
            .wait()
            .await;
    }
}