async_pipes/pipeline/
builder.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::sync::Arc;
4
5use itertools::Itertools;
6use tokio::task::JoinSet;
7
8use crate::pipeline::io::*;
9use crate::pipeline::sync::*;
10use crate::pipeline::workers::{
11    new_detached_flattener, new_detached_producer, new_detached_worker,
12};
13use crate::pipeline::*;
14
/// Everything `PipelineBuilder::create_workers` hands back to `PipelineBuilder::build`.
struct CreateWorkersOutput {
    /// Tasks spawned for the producer stages.
    producers: JoinSet<()>,
    /// Tasks spawned for the regular and iterator-based stages.
    workers: JoinSet<()>,
    /// One signal sender per spawned worker; the matching receiver is given to that worker.
    signal_txs: Vec<Sender<StageWorkerSignal>>,
}
20
/// Used to construct a [Pipeline].
///
/// Can be created using [Pipeline::builder].
///
/// Every `with_*` method records a stage; the recorded stages are only turned into spawned
/// tasks when [PipelineBuilder::build] is called.
#[derive(Default)]
pub struct PipelineBuilder {
    /// The stages registered so far, in registration order.
    stages: Vec<Stage>,
}
30
31impl PipelineBuilder {
32    /// A "producer" stage; registers a list of inputs to be written to a provided pipe.
33    ///
34    /// The string provided to `pipe` defines where the values will be written to.
35    /// The values will be written one at a time into the pipe.
36    ///
37    /// # Returns
38    ///
39    /// This pipeline builder.
40    ///
41    pub fn with_inputs<S, I>(self, pipe: S, inputs: Vec<I>) -> Self
42    where
43        S: AsRef<str>,
44        I: Send + 'static,
45    {
46        self.with_branching_inputs(
47            vec![pipe],
48            inputs
49                .into_iter()
50                .map(|i| vec![Box::new(i) as BoxedAnySend])
51                .collect(),
52        )
53    }
54
55    /// A "producer" stage; registers a list of multiple inputs to be written to a list of
56    /// corresponding pipes.
57    ///
58    /// The strings provided to `pipes` define where each input will go.
59    /// The values will be written one at a time into each pipe.
60    ///
61    /// For example, say you have the following:
62    ///
63    /// ```text
64    /// List of multiple inputs: [ [1, "hi", true], [2, "bye", false], [3, ".", false] ]
65    /// List of pipes: [ "numbers", "strings", "bools" ]
66    /// ```
67    ///
68    /// The inputs would be sent to the pipes like this:
69    ///
70    /// ```text
71    /// Pipe         1st   2nd    3rd
72    /// "numbers" <- 1     2      3
73    /// "strings" <- "hi"  "bye"  "."
74    /// "bools"   <- true  false  false
75    /// ```
76    ///
77    /// # Returns
78    ///
79    /// This pipeline builder.
80    ///
81    pub fn with_branching_inputs<S>(self, pipes: Vec<S>, inputs: Vec<Vec<BoxedAnySend>>) -> Self
82    where
83        S: AsRef<str>,
84    {
85        let mut iter = inputs.into_iter();
86        self.with_branching_producer(pipes, move || {
87            let inputs = iter.next();
88            async move { inputs.map(|is| is.into_iter().map(Some).collect()) }
89        })
90    }
91
92    /// A "producer" stage; registers a stage that produces values and writes them into a pipe.
93    ///
94    /// The strings provided to `pipes` define where each input will go.
95    ///
96    /// The producer will continue producing values while the user-provided task function returns
97    /// [Some]. This means that it is possible to create an infinite stream of values by simply
98    /// never returning [None].
99    ///
100    /// # Returns
101    ///
102    /// This pipeline builder.
103    ///
104    pub fn with_producer<S, I, F, Fut>(self, pipe: S, mut task: F) -> Self
105    where
106        S: AsRef<str>,
107        I: Send + 'static,
108        F: FnMut() -> Fut + Send + 'static,
109        Fut: Future<Output = Option<I>> + Send + 'static,
110    {
111        self.with_branching_producer(vec![pipe], move || {
112            let task_fut = task();
113            async move {
114                task_fut
115                    .await
116                    .map(|t| vec![Some(Box::new(t) as BoxedAnySend)])
117            }
118        })
119    }
120
121    /// A "producer" stage; registers a new stage that produces multiple values and writes them into
122    /// their respective pipe.
123    ///
124    /// The strings provided to `pipes` define where each input will go.
125    ///
126    /// The producer will continue producing values while the user-provided task function returns
127    /// [Some]. This means that it is possible to create an infinite stream of values by simply
128    /// never returning [None].
129    ///
130    /// Each individual [Option] within the task output determines whether it will be sent to the
131    /// corresponding pipe. If [Some] is specified, the inner value will be sent, if [None] is
132    /// specified, nothing will be sent.
133    ///
134    /// As with all stages that have more than one ("branching") outputs, it's possible that each
135    /// output could have a different type, and so to avoid large binary sizes from static
136    /// dispatching, dynamic dispatching is used instead, utilizing the [BoxedAnySend] type. For
137    /// examples on how to return these types of values in task functions, see [BoxedAnySend]'s
138    /// examples.
139    ///
140    /// # Returns
141    ///
142    /// This pipeline builder.
143    ///
144    pub fn with_branching_producer<S, F, Fut>(mut self, pipes: Vec<S>, mut task: F) -> Self
145    where
146        S: AsRef<str>,
147        F: FnMut() -> Fut + Send + 'static,
148        Fut: Future<Output = Option<Vec<Option<BoxedAnySend>>>> + Send + 'static,
149    {
150        let pipes = pipes.iter().map(|p| p.as_ref().to_string()).collect();
151        self.stages.push(Stage::Producer {
152            function: Box::new(move || Box::pin(task())),
153            pipes: ProducerPipeNames { writers: pipes },
154        });
155        self
156    }
157
158    /// A "consumer" stage; registers a new stage that consumes values from a pipe.
159    ///
160    /// The string provided to `pipe` define where values will be read from.
161    ///
162    /// The consumer will continue consuming values until the pipeline is terminated or the pipe it
163    /// is receiving from is closed.
164    ///
165    /// # Returns
166    ///
167    /// This pipeline builder.
168    ///
169    pub fn with_consumer<S, I, F, Fut>(self, pipe: S, options: WorkerOptions, task: F) -> Self
170    where
171        S: AsRef<str>,
172        I: Send + 'static,
173        F: Fn(I) -> Fut + Send + Sync + 'static,
174        Fut: Future<Output = ()> + Send + 'static,
175    {
176        self.with_branching_stage(pipe, Vec::<String>::new(), options, move |value| {
177            let task_fut = task(value);
178            async move {
179                task_fut.await;
180                Some(Vec::<Option<BoxedAnySend>>::new())
181            }
182        })
183    }
184
185    /// A "regular" stage; registers a new stage that operates on an input and produce a single
186    /// output value that is written to a pipe.
187    ///
188    /// The string provided to `input_pipe` defines where values will be read from.
189    /// The string provided to `output_pipe` defines where the produced output will go.
190    ///
191    /// The worker will continue working on input values until the pipeline is terminated or the
192    /// pipe it is receiving from is closed.
193    ///
194    /// The [Option] returned by the task function determines whether it will be sent to the output
195    /// pipe. If [Some] is specified, the inner value will be sent, if [None] is specified, nothing
196    /// will be sent.
197    ///
198    /// # Returns
199    ///
200    /// This pipeline builder.
201    ///
202    pub fn with_stage<I, O, F, Fut>(
203        self,
204        input_pipe: impl AsRef<str>,
205        output_pipe: impl AsRef<str>,
206        options: WorkerOptions,
207        task: F,
208    ) -> Self
209    where
210        I: Send + 'static,
211        O: Send + 'static,
212        F: Fn(I) -> Fut + Send + Sync + 'static,
213        Fut: Future<Output = Option<O>> + Send + 'static,
214    {
215        self.with_branching_stage(input_pipe, vec![output_pipe], options, move |value| {
216            let task_fut = task(value);
217            async move {
218                task_fut
219                    .await
220                    .map(|v| Box::new(v) as BoxedAnySend)
221                    .map(|v| vec![Some(v)])
222            }
223        })
224    }
225
226    /// A "regular" stage; registers a new stage that operates on an input and produces multiple
227    /// values that are written into their respective pipe.
228    ///
229    /// The string provided to `input_pipe` defines where values will be read from.
230    /// The strings provided to `output_pipes` define where each produced output will go.
231    ///
232    /// The worker will continue working on input values until the pipeline is terminated or the
233    /// pipe it is receiving from is closed.
234    ///
235    /// * If the user-defined task function returns [None], nothing will be done.
236    /// * If it returns [Some], the inner value ([`Vec<Option<BoxedAnySend>>`]) will have the
237    ///   following applied to each output option:
238    ///     * If [Some] is specified, the inner value will be sent to the corresponding pipe.
239    ///     * If [None] is specified, nothing will be sent.
240    ///
241    /// As with all stages that have more than one ("branching") outputs, it's possible that each
242    /// output could have a different type, and so to avoid large binary sizes from static
243    /// dispatching, dynamic dispatching is used instead, utilizing the [BoxedAnySend] type. For
244    /// examples on how to return these types of values in task functions, see [BoxedAnySend]'s
245    /// examples.
246    ///
247    /// # Returns
248    ///
249    /// This pipeline builder.
250    ///
251    pub fn with_branching_stage<I, F, Fut>(
252        mut self,
253        input_pipe: impl AsRef<str>,
254        output_pipes: Vec<impl AsRef<str>>,
255        options: WorkerOptions,
256        task: F,
257    ) -> Self
258    where
259        I: Send + 'static,
260        F: Fn(I) -> Fut + Send + Sync + 'static,
261        Fut: Future<Output = Option<Vec<Option<BoxedAnySend>>>> + Send + 'static,
262    {
263        let input_pipe = input_pipe.as_ref().to_string();
264        let output_pipes = output_pipes
265            .iter()
266            .map(|p| p.as_ref().to_string())
267            .collect();
268        let err_pipe = input_pipe.clone();
269
270        self.stages.push(Stage::Regular {
271            function: Box::new(move |value: BoxedAnySend| {
272                let value = downcast_from_pipe(value, &err_pipe);
273                Box::pin(task(*value))
274            }),
275            pipes: TaskPipeNames {
276                reader: input_pipe,
277                writers: output_pipes,
278            },
279            options,
280        });
281        self
282    }
283
284    /// An "iterator-based" stage; registers a new stage that takes the data from one pipe and
285    /// "flattens" it, feeding the results into another.
286    ///
287    /// This is useful if you have a pipe that produces a list of values in a single task execution,
288    /// but you want to use it as input to another stage that takes only the individual values.
289    ///
290    /// The generic parameter is used by the pipeline builder to know what concrete type to
291    /// cast the value to, which mean turbofish syntax will be needed to specify what the iterator
292    /// type of that pipe is, for example:
293    /// `Pipeline::builder().with_flattener::<Vec<u8>>("data", "bytes")`
294    ///
295    /// The string provided to `from_pipe` defines where the iterator of values will be read from.
296    /// The string provided to `to_pipe` defines where the individual values will go.
297    ///
298    /// The worker will continue working until the pipeline is terminated or the pipe it is
299    /// receiving from is closed.
300    ///
301    /// # Examples
302    ///
303    /// ```
304    /// use std::sync::Arc;
305    /// use std::sync::atomic::{AtomicI32, Ordering};
306    /// use async_pipes::Pipeline;
307    /// use async_pipes::WorkerOptions;
308    ///
309    /// #[tokio::main]
310    /// async fn main() {
311    ///     let sum = Arc::new(AtomicI32::new(0));
312    ///     let task_sum = sum.clone();
313    ///
314    ///     Pipeline::builder()
315    ///         .with_inputs("NumberSets", vec![vec![1, 2], vec![3, 4, 5]])
316    ///         .with_flattener::<Vec<i32>>("NumberSets", "Numbers")
317    ///         .with_consumer("Numbers", WorkerOptions::default_single_task(), move |value: i32| {
318    ///             let sum = task_sum.clone();
319    ///             async move {
320    ///                 sum.fetch_add(value, Ordering::SeqCst);
321    ///             }
322    ///         })
323    ///         .build()
324    ///         .expect("failed to build pipeline")
325    ///         .wait()
326    ///         .await;
327    ///
328    ///     assert_eq!(sum.load(Ordering::Acquire), 15);
329    /// }
330    /// ```
331    ///
332    /// # Returns
333    ///
334    /// This pipeline builder.
335    ///
336    pub fn with_flattener<It>(
337        mut self,
338        from_pipe: impl AsRef<str>,
339        to_pipe: impl AsRef<str>,
340    ) -> Self
341    where
342        It: IntoIterator + Send + 'static,
343        It::Item: Send,
344    {
345        let input_pipe = from_pipe.as_ref().to_string();
346        let output_pipe = to_pipe.as_ref().to_string();
347        let err_pipe = input_pipe.clone();
348
349        self.stages.push(Stage::Iterator {
350            stage_type: IterStageType::Flatten,
351            caster: Box::new(move |value: BoxedAnySend| {
352                downcast_from_pipe::<It>(value, &err_pipe)
353                    .into_iter()
354                    .map(|v| Box::new(v) as BoxedAnySend)
355                    .collect()
356            }),
357            pipes: TaskPipeNames {
358                reader: input_pipe,
359                writers: vec![output_pipe],
360            },
361            options: WorkerOptions::default_single_task(),
362        });
363        self
364    }
365
    /// A utility function for improving readability when building pipelines.
    ///
    /// This makes splitting up task definitions into functions easier by allowing the caller
    /// to pass in a function that takes the builder and returns a builder, effectively
    /// providing continuity to a call chain.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::sync::Arc;
    /// use std::sync::atomic::{AtomicI32, Ordering};
    /// use async_pipes::Pipeline;
    /// use async_pipes::PipelineBuilder;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let sum = Arc::new(AtomicI32::new(0));
    ///
    ///     Pipeline::builder()
    ///         .with_inputs("Numbers", vec![1, 2, 3, 4, 5])
    ///         .also(build_consumer(sum.clone()))
    ///         .build()
    ///         .expect("failed to build pipeline")
    ///         .wait()
    ///         .await;
    ///
    ///     assert_eq!(sum.load(Ordering::Acquire), 15);
    /// }
    ///
    /// fn build_consumer(sum: Arc<AtomicI32>) -> impl FnOnce(PipelineBuilder) -> PipelineBuilder {
    ///     use async_pipes::WorkerOptions;
    ///     |builder| builder.with_consumer("Numbers", WorkerOptions::default(), move |value: i32| {
    ///         let sum = sum.clone();
    ///         async move {
    ///             sum.fetch_add(value, Ordering::SeqCst);
    ///         }
    ///     })
    /// }
    /// ```
    ///
    /// # Returns
    ///
    /// Whatever builder `handler` returns.
    ///
    pub fn also(self, handler: impl FnOnce(PipelineBuilder) -> PipelineBuilder) -> Self {
        handler(self)
    }
409
410    /// When the pipeline is ready to be built, this is called and will return a pipeline if
411    /// it was successfully built, otherwise it will return an error describing why it could not be
412    /// built.
413    ///
414    /// # Errors
415    ///
416    /// 1. A pipe is "open-ended", meaning there's no stage consuming values from that pipe.
417    /// 2. The reader of a pipe was used more than once.
418    ///
419    pub fn build(self) -> Result<Pipeline, String> {
420        let configs = self.create_pipe_configs()?;
421
422        // Register pipes in the synchronizer
423        let mut synchronizer = Synchronizer::default();
424        self.register_pipe_configs(&mut synchronizer, &configs);
425
426        // Make the synchronizer immutable and the create the actual pipes
427        let synchronizer = Arc::new(synchronizer);
428        let pipes_map = self.create_pipes(&synchronizer, configs);
429
430        let CreateWorkersOutput {
431            producers,
432            workers,
433            signal_txs,
434        } = self.create_workers(pipes_map)?;
435
436        Ok(Pipeline {
437            synchronizer,
438            producers,
439            workers,
440            signal_txs,
441        })
442    }
443
444    /// Create the producers, workers, and the associated signal channels.
445    fn create_workers(
446        self,
447        mut pipes_map: HashMap<String, Pipe<BoxedAnySend>>,
448    ) -> Result<CreateWorkersOutput, String> {
449        let mut producers = JoinSet::new();
450        let mut workers = JoinSet::new();
451        let mut signal_txs = Vec::new();
452
453        let mut worker_args =
454            |pipe_names: TaskPipeNames, pipes: &mut HashMap<String, Pipe<BoxedAnySend>>| {
455                let writers = find_writers(&pipe_names.writers, pipes)?;
456                let reader = find_reader(&pipe_names.reader, pipes)?;
457
458                let (signal_tx, signal_rx) = tokio::sync::mpsc::channel(1);
459                signal_txs.push(signal_tx);
460
461                Result::<_, String>::Ok((reader, writers, signal_rx))
462            };
463
464        let mut has_producer = false;
465        for stage in self.stages {
466            match stage {
467                Stage::Producer { function, pipes } => {
468                    let writers = find_writers(&pipes.writers, &pipes_map)?;
469                    has_producer = true;
470                    producers.spawn(new_detached_producer(function, writers));
471                }
472
473                Stage::Regular {
474                    function,
475                    pipes,
476                    options,
477                } => {
478                    let (reader, writers, signal_rx) = worker_args(pipes, &mut pipes_map)?;
479                    workers.spawn(new_detached_worker(
480                        function,
481                        reader,
482                        writers,
483                        signal_rx,
484                        options.try_into()?,
485                    ));
486                }
487
488                Stage::Iterator {
489                    stage_type,
490                    caster,
491                    pipes,
492                    options,
493                } => {
494                    let (reader, writers, signal_rx) = worker_args(pipes, &mut pipes_map)?;
495                    workers.spawn(match stage_type {
496                        IterStageType::Flatten => new_detached_flattener(
497                            caster,
498                            reader,
499                            writers,
500                            signal_rx,
501                            options.try_into()?,
502                        ),
503                    });
504                }
505            }
506        }
507
508        if !has_producer {
509            return Err("pipeline must have at least one producer".to_string());
510        }
511
512        Ok(CreateWorkersOutput {
513            producers,
514            workers,
515            signal_txs,
516        })
517    }
518
519    /// Construct the configuration of all the pipes.
520    ///
521    /// We can achieve a set of unique names easily by using the `reader` of the "task" and "iter"
522    /// stages and de-duplicating that list.
523    ///
524    /// For each pipe, there is only one reader, but there can be multiple writers.
525    /// Also, producers do not have a reader, only writers.
526    fn create_pipe_configs(&self) -> Result<Vec<PipeConfig>, String> {
527        let mut configs = Vec::new();
528        for stage in &self.stages {
529            match stage {
530                Stage::Iterator { pipes, options, .. } => {
531                    let mut options: ValidWorkerOptions = options.clone().try_into()?;
532                    options.unbounded_buffer = true;
533
534                    configs.push(PipeConfig {
535                        name: pipes.reader.clone(),
536                        options,
537                    });
538                }
539                Stage::Regular { pipes, options, .. } => {
540                    configs.push(PipeConfig {
541                        name: pipes.reader.clone(),
542                        options: options.clone().try_into()?,
543                    });
544                }
545
546                _ => {}
547            }
548        }
549
550        Ok(configs
551            .into_iter()
552            .unique_by(|conf| conf.name.clone())
553            .collect())
554    }
555
556    /// Register all the pipe configurations to the synchronizer.
557    ///
558    /// This is done before creating the pipes as the synchronizer only needs to be mutable
559    /// for this step, afterward it can be considered "immutable".
560    fn register_pipe_configs(&self, sync: &mut Synchronizer, pipe_configs: &[PipeConfig]) {
561        for conf in pipe_configs {
562            sync.register(&conf.name);
563        }
564    }
565
566    /// The reader of a pipe is wrapped in an option to allow the build method to [Option::take] it
567    /// to maintain the invariant that there's only one reader per pipe.
568    fn create_pipes(
569        &self,
570        sync: &Arc<Synchronizer>,
571        configs: Vec<PipeConfig>,
572    ) -> HashMap<String, Pipe<BoxedAnySend>> {
573        configs
574            .into_iter()
575            .map(|conf| {
576                let (tx, rx): (VarSender<BoxedAnySend>, VarReceiver<BoxedAnySend>) =
577                    if conf.options.unbounded_buffer {
578                        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
579                        (tx.into(), rx.into())
580                    } else {
581                        let buf_size = conf.options.reader_buffer_size.get();
582                        let (tx, rx) = tokio::sync::mpsc::channel(buf_size);
583                        (tx.into(), rx.into())
584                    };
585
586                let pipe = Pipe {
587                    writer: PipeWriter::new(conf.name.clone(), sync.clone(), tx),
588                    reader: Some(PipeReader::new(conf.name.clone(), sync.clone(), rx)),
589                };
590                (conf.name, pipe)
591            })
592            .collect()
593    }
594}