// async_pipes/lib.rs
//! Create a lightweight, concurrent data processing pipeline for Rust applications.
//!
//! # Overview
//!
//! Async Pipes provides a simple way to create high-throughput data processing pipelines by
//! utilizing Rust's asynchronous runtime capabilities. This is done by managing task execution and
//! data flow so the developer only has to worry about the task-specific implementation for each
//! stage in the pipeline.
//!
//! # Terminology
//!
//! All of these are abstractions to help conceptualize how data is transferred and operated on in
//! the pipeline.
//!
//! * **Pipe** - A conduit through which data of a single type can flow.
//!   An example of this is a pipe that allows strings to flow through it.
//! * **Stage** - A "node" in the pipeline where work is done.
//!   A stage typically includes the definition of the worker, an optional pipe connection
//!   to read data from, and zero or more pipe connections to send data to.
//! * **Worker** - A worker is defined internally by this library, and does the work of
//!   reading from the optional input pipe, performing a user-defined task on the input, and
//!   then writing the output of that task to the zero or more output pipes.
//! * **Pipeline** - The overall set of stages and the pipes that connect them.
//!   Pipelines don't have to be linear; they can branch from one stage into
//!   multiple stages.
//!
//! # Getting Started
//!
//! A pipeline can be built using the builder provided by [Pipeline::builder]. This allows the
//! pipeline to be configured before any work is done.
//! ```
//! use async_pipes::{Pipeline, PipelineBuilder};
//!
//! let builder: PipelineBuilder = Pipeline::builder();
//! ```
//!
//! Using the builder, stages can be defined. A stage contains the name of the pipe to read from
//! (if applicable), the name of one or more pipes to write to (if applicable), some options for
//! the worker, and a user-defined "task" function.
//!
//! For information on what worker options are available, see [WorkerOptions].
//!
//! Demonstrated below is a pipeline being built with a producer stage, a regular stage, and a
//! consumer stage.
//! ```
//! use async_pipes::Pipeline;
//! use async_pipes::WorkerOptions;
//!
//! #[tokio::main]
//! async fn main() {
//!     let pipeline: Result<Pipeline, String> = Pipeline::builder()
//!         .with_inputs("InputPipe", vec![1, 2, 3])
//!         .with_stage(
//!             "InputPipe",
//!             "OutputPipe",
//!             WorkerOptions::default(),
//!             |n: i32| async move { Some(n + 1) },
//!         )
//!         .with_consumer(
//!             "OutputPipe",
//!             WorkerOptions::default_single_task(),
//!             |n: i32| async move { println!("{}", n) },
//!         )
//!         .build();
//!
//!     assert!(pipeline.is_ok());
//! }
//! ```
//!
//! With the builder, any number of stages can be defined with any number of pipes, but there are a
//! few requirements:
//! 1. There must be at least one producer - how else will data get into the pipeline?
//! 2. Every pipe must have a corresponding stage that reads data from it - this is required to
//!    avoid a deadlock from pipes being filled up but not emptied.
//!
//! These requirements are enforced by [PipelineBuilder::build], which returns a
//! [Result<Pipeline, String>] where an error describing the missing requirement is returned.
//!
//! For example, here is an invalid pipeline due to requirement (1) not being followed:
//! ```
//! use async_pipes::Pipeline;
//! use async_pipes::WorkerOptions;
//!
//! #[tokio::main]
//! async fn main() {
//!     let pipeline = Pipeline::builder()
//!         .with_consumer("MyPipe", WorkerOptions::default(), |n: usize| async move {
//!             println!("{}", n);
//!         })
//!         .build();
//!
//!     assert_eq!(pipeline.unwrap_err(), "pipeline must have at least one producer");
//! }
//! ```
//!
//! And here is an invalid pipeline due to requirement (2) not being followed:
//! ```
//! use async_pipes::Pipeline;
//!
//! #[tokio::main]
//! async fn main() {
//!     let pipeline = Pipeline::builder()
//!         .with_inputs("MyPipe", vec![1, 2, 3])
//!         .build();
//!
//!     assert_eq!(pipeline.unwrap_err(), "pipeline has open-ended pipe: 'MyPipe'");
//! }
//! ```
//!
//! Once an `Ok(Pipeline)` is returned, it can be waited on using [Pipeline::wait], which will
//! make progress until all workers finish or there is no more data in the pipeline.
//!
//! _Note_: When a pipeline is built, it may or may not already be running, depending on the
//! runtime. In single-threaded runtimes no progress will be made, as the workers can't make
//! progress on their own unless the single thread yields to them. In multi-threaded runtimes
//! it is possible for them to make progress. However, the pipeline will never "finish" until
//! [Pipeline::wait] is called.
//!
//! ```
//! use async_pipes::Pipeline;
//! use async_pipes::WorkerOptions;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), String> {
//!     Pipeline::builder()
//!         .with_inputs("InputPipe", vec![1, 2, 3])
//!         .with_stage("InputPipe", "OutputPipe", WorkerOptions::default(), |n: i32| async move {
//!             Some(n + 1)
//!         })
//!         .with_consumer("OutputPipe", WorkerOptions::default(), |n: i32| async move {
//!             println!("{}", n)
//!         })
//!         .build()?
//!         .wait()
//!         .await;
//!
//!     Ok(())
//! }
//! ```
//!
//! ### Stateful Stages
//!
//! It is possible to maintain state in a stage across tasks; however, the state must be [Send].
//! For non-[Send] objects, this is usually best done by wrapping them in an [std::sync::Mutex]
//! (or better yet, a [tokio::sync::Mutex]).
//!
//! Another caveat with state in stages is that since the task function returns a future
//! (`async move { ... }`), the future requires ownership of non-`'static` values: it may outlive
//! the closure that created it, so it can't reference borrowed state. A way around this is to
//! wrap values that may be expensive to clone in an [std::sync::Arc].
//!
//! The following is an example of a mutable sum being used as a stateful item in a stage:
//! ```
//! use async_pipes::Pipeline;
//! use std::sync::Arc;
//! use tokio::sync::Mutex;
//! use async_pipes::WorkerOptions;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), String> {
//!     // [AtomicUsize] may be preferred here, but we use [Mutex] for the sake of this example
//!     let sum = Arc::new(Mutex::new(0));
//!     // For the assertion at the end of this example
//!     let test_sum = sum.clone();
//!
//!     Pipeline::builder()
//!         .with_inputs("InputPipe", vec![1, 2, 3])
//!         .with_stage("InputPipe", "OutputPipe", WorkerOptions::default(), move |n: i32| {
//!             // As the sum is owned by this closure, we need to clone it to move an owned value
//!             // into the `async move` block.
//!             let sum = sum.clone();
//!             async move {
//!                 let mut sum = sum.lock().await;
//!                 *sum += n;
//!                 Some(*sum)
//!             }
//!         })
//!         .with_consumer("OutputPipe", WorkerOptions::default(), |n: i32| async move {
//!             println!("Counter now at: {}", n)
//!         })
//!         .build()?
//!         .wait()
//!         .await;
//!
//!     assert_eq!(*test_sum.lock().await, 6);
//!     Ok(())
//! }
//! ```
//!
//! # Stage Categories <a name="stage-categories"></a>
//!
//! ### Producer ("entry stage")
//! A producer is the only place where data can be fed into the pipeline.
//!
//! **Static (definite)**
//!
//! This is where a list of concrete values can be provided to the stage, and the worker will loop
//! over each value and feed it into a pipe.
//! * [PipelineBuilder::with_inputs]
//! * [PipelineBuilder::with_branching_inputs]
//!
//! **Dynamic (indefinite)**
//!
//! This is useful when there are no pre-defined input values. Instead, a function can be provided
//! that produces an [Option], and it is continually called until it returns [None]. This can be
//! useful when receiving data over the network, or when reading data from a file.
//! * [PipelineBuilder::with_producer]
//! * [PipelineBuilder::with_branching_producer]
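//!
//! As a sketch of how a dynamic producer might look - this assumes
//! [PipelineBuilder::with_producer] takes the output pipe's name and an async-closure task,
//! mirroring [PipelineBuilder::with_inputs] (check its documentation for the exact signature) -
//! a producer that drains a queue of pending values could be written as:
//! ```
//! use std::sync::Mutex;
//! use async_pipes::{Pipeline, WorkerOptions};
//!
//! #[tokio::main]
//! async fn main() -> Result<(), String> {
//!     let queue = Mutex::new(vec![1, 2, 3]);
//!
//!     Pipeline::builder()
//!         // The task is called repeatedly; returning [None] signals this producer is done
//!         .with_producer("NumberPipe", move || {
//!             let next = queue.lock().unwrap().pop();
//!             async move { next }
//!         })
//!         .with_consumer("NumberPipe", WorkerOptions::default(), |n: i32| async move {
//!             println!("{}", n)
//!         })
//!         .build()?
//!         .wait()
//!         .await;
//!     Ok(())
//! }
//! ```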
//!
//! ### Consumer ("terminating stage")
//! A consumer is a final stage in the pipeline where data ends up. It takes in a single pipe to
//! read from and produces no output.
//! * [PipelineBuilder::with_consumer]
//!
//! ### Regular (1 input, 1 output)
//! This is an intermediate stage in the pipeline that takes in a single input and produces one
//! or more outputs.
//! * [PipelineBuilder::with_stage]
//! * [PipelineBuilder::with_branching_stage]
//!
//! ### Utility
//! This is an intermediate stage in the pipeline that can be used to perform common operations
//! on data between pipes.
//! * [PipelineBuilder::with_flattener]
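//!
//! For example, a flattener can read a `Vec<String>` from one pipe and write each element
//! individually to another pipe. This sketch mirrors how [PipelineBuilder::with_flattener] is
//! used in this crate's tests; the pipe names here are illustrative:
//! ```
//! use async_pipes::{Pipeline, WorkerOptions};
//!
//! #[tokio::main]
//! async fn main() -> Result<(), String> {
//!     Pipeline::builder()
//!         .with_inputs("Batches", vec![vec!["a".to_string(), "b".to_string()]])
//!         // Reads a `Vec<String>` from "Batches" and writes each `String` to "Items"
//!         .with_flattener::<Vec<String>>("Batches", "Items")
//!         .with_consumer("Items", WorkerOptions::default(), |s: String| async move {
//!             println!("{}", s)
//!         })
//!         .build()?
//!         .wait()
//!         .await;
//!     Ok(())
//! }
//! ```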
//!
//! # Stage Variants
//!
//! ### Branching (1 input, N outputs)
//! A branching stage is a stage where multiple output pipes are connected. This means the task
//! defined by the user in this stage returns two or more output values.
//! * [PipelineBuilder::with_branching_inputs]
//! * [PipelineBuilder::with_branching_producer]
//! * [PipelineBuilder::with_branching_stage]
//!
//! # Examples
//!
//! ```
//! use std::sync::Arc;
//!
//! use async_pipes::Pipeline;
//!
//! use std::sync::atomic::{AtomicUsize, Ordering};
//! use async_pipes::WorkerOptions;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), String> {
//!     // Due to the task function returning a future (`async move { ... }`), data needs
//!     // to be wrapped in an [Arc] and then cloned in order to be moved into the task
//!     // while still referencing it from this scope
//!     let total_count = Arc::new(AtomicUsize::new(0));
//!     let task_total_count = total_count.clone();
//!
//!     Pipeline::builder()
//!         .with_inputs("MapPipe", vec!["a", "bb", "ccc"])
//!
//!         // Read from the 'MapPipe' and write to the 'ReducePipe'
//!         .with_stage(
//!             "MapPipe",
//!             "ReducePipe",
//!             WorkerOptions::default(),
//!             |value: &'static str| async move {
//!                 // We return an option to tell the stage whether to write the new value
//!                 // to the pipe or ignore it
//!                 Some(format!("{}!", value))
//!             }
//!         )
//!
//!         // Read from the 'ReducePipe'.
//!         .with_consumer("ReducePipe", WorkerOptions::default(), move |value: String| {
//!             // The captured `task_total_count` can't move out of this closure, so we
//!             // have to clone it to give ownership to the async block. Remember, it's
//!             // wrapped in an [Arc] so we're still referring to the original data.
//!             let total_count = task_total_count.clone();
//!             async move {
//!                 total_count.fetch_add(value.len(), Ordering::SeqCst);
//!             }
//!         })
//!
//!         // Build the pipeline and wait for it to finish
//!         .build()?
//!         .wait()
//!         .await;
//!
//!     // We see that after the data goes through our map and reduce stages,
//!     // we effectively get this: `len("a!") + len("bb!") + len("ccc!") = 9`
//!     assert_eq!(total_count.load(Ordering::Acquire), 9);
//!     Ok(())
//! }
//! ```
//!
#![warn(missing_docs)]

pub use pipeline::*;

mod pipeline;

/// A value used in coordination with [branch] to indicate there is no value to be sent to a
/// pipe.
///
/// # Examples
///
/// ```
/// use async_pipes::{NoOutput, BoxedAnySend, branch};
///
/// let outputs: Vec<Option<BoxedAnySend>> = branch![
///     "one",
///     NoOutput,
///     3,
/// ];
///
/// assert!(outputs[0].is_some());
/// assert!(outputs[1].is_none());
/// assert!(outputs[2].is_some());
/// ```
#[derive(Debug, Clone, Hash, Ord, PartialOrd, Eq, PartialEq)]
pub struct NoOutput;

/// Defines an idiomatic way to provide values to a static branching producer stage (i.e. concrete
/// input values).
///
/// A list of tuples of values (of possibly different types) can be provided, and those values
/// will be boxed and then put into a [Vec].
///
/// # Examples
///
/// Here's an example of what is returned by the macro call.
/// ```
/// use async_pipes::{BoxedAnySend, branch_inputs};
///
/// let inputs: Vec<Vec<BoxedAnySend>> = branch_inputs![
///     (1usize, 1i32, 1u8),
///     (2usize, 2i32, 2u8),
///     (3usize, 3i32, 3u8),
/// ];
///
/// assert_eq!(inputs.len(), 3);
/// ```
///
/// Here's an example of the macro being used in a pipeline.
/// ```
/// use async_pipes::{branch_inputs, Pipeline};
/// use async_pipes::WorkerOptions;
///
/// #[tokio::main]
/// async fn main() {
///     Pipeline::builder()
///         .with_branching_inputs(
///             vec!["One", "Two"],
///             branch_inputs![
///                 (1usize, "Hello"),
///                 (1usize, "World"),
///                 (1usize, "!"),
///             ],
///         )
///         .with_consumer("One", WorkerOptions::default(), |value: usize| async move {
///             /* ... */
///         })
///         .with_consumer("Two", WorkerOptions::default(), |value: &'static str| async move {
///             /* ... */
///         })
///         .build()
///         .unwrap()
///         .wait()
///         .await;
/// }
/// ```
#[macro_export]
macro_rules! branch_inputs {
    ($(( $($x:expr),+ $(,)? )),* $(,)?) => {
        vec![
            $( branch_inputs!($($x),+) ),*
        ]
    };

    ($($x:expr),+ $(,)?) => {
        vec![
            $(std::boxed::Box::new($x) as $crate::BoxedAnySend),+
        ]
    };
}

/// Defines an idiomatic way to return values in a branching stage.
///
/// A list of values (possibly of different types) can be provided. These values will be boxed and
/// then wrapped in a [Some]. In order to specify [None] (i.e. no value should be sent to the
/// respective pipe), [NoOutput] should be used in the place of a value. The macro will detect this
/// and use [None] in its place.
///
/// # Examples
///
/// Here's an example of what is returned by the macro call.
/// ```
/// use async_pipes::{BoxedAnySend, NoOutput, branch};
///
/// let outputs: Vec<Option<BoxedAnySend>> = branch![1, "hello", true, NoOutput, 12.0];
///
/// assert_eq!(outputs.len(), 5);
/// assert!(outputs[3].is_none());
/// ```
///
/// Here's an example of the macro being used in a pipeline.
/// ```
/// use async_pipes::{branch, branch_inputs, Pipeline};
/// use std::sync::atomic::{AtomicUsize, Ordering};
/// use std::sync::Arc;
/// use async_pipes::WorkerOptions;
///
/// #[tokio::main]
/// async fn main() {
///     Pipeline::builder()
///         .with_inputs("Count", vec![1, 2, 3])
///         .with_branching_stage(
///             "Count",
///             vec!["Value", "Doubled"],
///             WorkerOptions::default(),
///             |value: i32| async move {
///                 Some(branch![value, value * 2])
///             }
///         )
///         .with_consumer("Value", WorkerOptions::default(), |value: i32| async move {
///             /* ... */
///         })
///         .with_consumer("Doubled", WorkerOptions::default(), |value: i32| async move {
///             /* ... */
///         })
///         .build()
///         .unwrap()
///         .wait()
///         .await;
/// }
/// ```
#[macro_export]
macro_rules! branch {
    ($($x:expr),+ $(,)?) => {
        std::vec![
            $({
                let x: $crate::BoxedAnySend = std::boxed::Box::new($x);
                x.downcast_ref::<$crate::NoOutput>().is_none().then_some(x)
            }),+
        ]
    };
}

#[cfg(test)]
mod tests {
    use crate::{NoOutput, Pipeline, WorkerOptions};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    /// Use the code of this test for the `README`'s `Simple, Linear Pipeline Example`.
    #[tokio::test]
    async fn test_readme_simple_linear_pipeline() {
        let total = Arc::new(AtomicUsize::new(0));
        let task_total = total.clone();

        Pipeline::builder()
            .with_inputs("MapPipe", vec!["a", "bb", "ccc"])
            .with_stage(
                "MapPipe",
                "ReducePipe",
                WorkerOptions::default(),
                |value: &'static str| async move { Some(format!("{}!", value)) },
            )
            .with_consumer(
                "ReducePipe",
                WorkerOptions::default_single_task(),
                move |value: String| {
                    let total = task_total.clone();
                    async move {
                        total.fetch_add(value.len(), Ordering::SeqCst);
                    }
                },
            )
            .build()
            .expect("failed to build pipeline!")
            .wait()
            .await;

        assert_eq!(total.load(Ordering::Acquire), 9);
    }

    /// Use the code of this test for the `README`'s `Branching, Cyclic Pipeline Example`.
    #[tokio::test]
    async fn test_readme_branching_cyclic_pipeline() {
        let initial_urls = vec![
            "https://example.com".to_string(),
            "https://rust-lang.org".to_string(),
        ];

        Pipeline::builder()
            .with_inputs("ToFetch", initial_urls)
            .with_flattener::<Vec<String>>("ToFlattenThenFetch", "ToFetch")
            .with_stage(
                "ToFetch",
                "ToCrawl",
                WorkerOptions::default_multi_task(),
                |_url: String| async move {
                    // Fetch content from url...
                    Some("<html>Sample Content</html>".to_string())
                },
            )
            .with_branching_stage(
                "ToCrawl",
                vec!["ToFlattenThenFetch", "ToLog"],
                WorkerOptions::default_single_task(),
                |_html: String| async move {
                    // Crawl HTML, extracting embedded URLs and content
                    let has_embedded_urls = false; // Mimic the crawler not finding any URLs

                    let output = if has_embedded_urls {
                        let urls = vec![
                            "https://first.com".to_string(),
                            "https://second.com".to_string(),
                        ];
                        branch![urls, NoOutput]
                    } else {
                        branch![NoOutput, "Extracted content".to_string()]
                    };

                    Some(output)
                },
            )
            .with_consumer(
                "ToLog",
                WorkerOptions::default_single_task(),
                |content: String| async move { println!("{content}") },
            )
            .build()
            .expect("failed to build pipeline!")
            .wait()
            .await;
    }
}