async_pipes/pipeline/builder.rs
1use std::collections::HashMap;
2use std::future::Future;
3use std::sync::Arc;
4
5use itertools::Itertools;
6use tokio::task::JoinSet;
7
8use crate::pipeline::io::*;
9use crate::pipeline::sync::*;
10use crate::pipeline::workers::{
11 new_detached_flattener, new_detached_producer, new_detached_worker,
12};
13use crate::pipeline::*;
14
/// Everything produced by [PipelineBuilder::create_workers], bundled so it can be returned as a
/// single value and moved into the final [Pipeline].
struct CreateWorkersOutput {
    /// The spawned tasks of all producer stages.
    producers: JoinSet<()>,
    /// The spawned tasks of all regular and iterator stages.
    workers: JoinSet<()>,
    /// The sending halves of the signal channels; one is created for every regular or iterator
    /// stage.
    signal_txs: Vec<Sender<StageWorkerSignal>>,
}
20
21/// Used to construct a [Pipeline].
22///
23/// Can be created using [Pipeline::builder].
24///
25///
26#[derive(Default)]
27pub struct PipelineBuilder {
28 stages: Vec<Stage>,
29}
30
31impl PipelineBuilder {
32 /// A "producer" stage; registers a list of inputs to be written to a provided pipe.
33 ///
34 /// The string provided to `pipe` defines where the values will be written to.
35 /// The values will be written one at a time into the pipe.
36 ///
37 /// # Returns
38 ///
39 /// This pipeline builder.
40 ///
41 pub fn with_inputs<S, I>(self, pipe: S, inputs: Vec<I>) -> Self
42 where
43 S: AsRef<str>,
44 I: Send + 'static,
45 {
46 self.with_branching_inputs(
47 vec![pipe],
48 inputs
49 .into_iter()
50 .map(|i| vec![Box::new(i) as BoxedAnySend])
51 .collect(),
52 )
53 }
54
55 /// A "producer" stage; registers a list of multiple inputs to be written to a list of
56 /// corresponding pipes.
57 ///
58 /// The strings provided to `pipes` define where each input will go.
59 /// The values will be written one at a time into each pipe.
60 ///
61 /// For example, say you have the following:
62 ///
63 /// ```text
64 /// List of multiple inputs: [ [1, "hi", true], [2, "bye", false], [3, ".", false] ]
65 /// List of pipes: [ "numbers", "strings", "bools" ]
66 /// ```
67 ///
68 /// The inputs would be sent to the pipes like this:
69 ///
70 /// ```text
71 /// Pipe 1st 2nd 3rd
72 /// "numbers" <- 1 2 3
73 /// "strings" <- "hi" "bye" "."
74 /// "bools" <- true false false
75 /// ```
76 ///
77 /// # Returns
78 ///
79 /// This pipeline builder.
80 ///
81 pub fn with_branching_inputs<S>(self, pipes: Vec<S>, inputs: Vec<Vec<BoxedAnySend>>) -> Self
82 where
83 S: AsRef<str>,
84 {
85 let mut iter = inputs.into_iter();
86 self.with_branching_producer(pipes, move || {
87 let inputs = iter.next();
88 async move { inputs.map(|is| is.into_iter().map(Some).collect()) }
89 })
90 }
91
92 /// A "producer" stage; registers a stage that produces values and writes them into a pipe.
93 ///
94 /// The strings provided to `pipes` define where each input will go.
95 ///
96 /// The producer will continue producing values while the user-provided task function returns
97 /// [Some]. This means that it is possible to create an infinite stream of values by simply
98 /// never returning [None].
99 ///
100 /// # Returns
101 ///
102 /// This pipeline builder.
103 ///
104 pub fn with_producer<S, I, F, Fut>(self, pipe: S, mut task: F) -> Self
105 where
106 S: AsRef<str>,
107 I: Send + 'static,
108 F: FnMut() -> Fut + Send + 'static,
109 Fut: Future<Output = Option<I>> + Send + 'static,
110 {
111 self.with_branching_producer(vec![pipe], move || {
112 let task_fut = task();
113 async move {
114 task_fut
115 .await
116 .map(|t| vec![Some(Box::new(t) as BoxedAnySend)])
117 }
118 })
119 }
120
121 /// A "producer" stage; registers a new stage that produces multiple values and writes them into
122 /// their respective pipe.
123 ///
124 /// The strings provided to `pipes` define where each input will go.
125 ///
126 /// The producer will continue producing values while the user-provided task function returns
127 /// [Some]. This means that it is possible to create an infinite stream of values by simply
128 /// never returning [None].
129 ///
130 /// Each individual [Option] within the task output determines whether it will be sent to the
131 /// corresponding pipe. If [Some] is specified, the inner value will be sent, if [None] is
132 /// specified, nothing will be sent.
133 ///
134 /// As with all stages that have more than one ("branching") outputs, it's possible that each
135 /// output could have a different type, and so to avoid large binary sizes from static
136 /// dispatching, dynamic dispatching is used instead, utilizing the [BoxedAnySend] type. For
137 /// examples on how to return these types of values in task functions, see [BoxedAnySend]'s
138 /// examples.
139 ///
140 /// # Returns
141 ///
142 /// This pipeline builder.
143 ///
144 pub fn with_branching_producer<S, F, Fut>(mut self, pipes: Vec<S>, mut task: F) -> Self
145 where
146 S: AsRef<str>,
147 F: FnMut() -> Fut + Send + 'static,
148 Fut: Future<Output = Option<Vec<Option<BoxedAnySend>>>> + Send + 'static,
149 {
150 let pipes = pipes.iter().map(|p| p.as_ref().to_string()).collect();
151 self.stages.push(Stage::Producer {
152 function: Box::new(move || Box::pin(task())),
153 pipes: ProducerPipeNames { writers: pipes },
154 });
155 self
156 }
157
158 /// A "consumer" stage; registers a new stage that consumes values from a pipe.
159 ///
160 /// The string provided to `pipe` define where values will be read from.
161 ///
162 /// The consumer will continue consuming values until the pipeline is terminated or the pipe it
163 /// is receiving from is closed.
164 ///
165 /// # Returns
166 ///
167 /// This pipeline builder.
168 ///
169 pub fn with_consumer<S, I, F, Fut>(self, pipe: S, options: WorkerOptions, task: F) -> Self
170 where
171 S: AsRef<str>,
172 I: Send + 'static,
173 F: Fn(I) -> Fut + Send + Sync + 'static,
174 Fut: Future<Output = ()> + Send + 'static,
175 {
176 self.with_branching_stage(pipe, Vec::<String>::new(), options, move |value| {
177 let task_fut = task(value);
178 async move {
179 task_fut.await;
180 Some(Vec::<Option<BoxedAnySend>>::new())
181 }
182 })
183 }
184
185 /// A "regular" stage; registers a new stage that operates on an input and produce a single
186 /// output value that is written to a pipe.
187 ///
188 /// The string provided to `input_pipe` defines where values will be read from.
189 /// The string provided to `output_pipe` defines where the produced output will go.
190 ///
191 /// The worker will continue working on input values until the pipeline is terminated or the
192 /// pipe it is receiving from is closed.
193 ///
194 /// The [Option] returned by the task function determines whether it will be sent to the output
195 /// pipe. If [Some] is specified, the inner value will be sent, if [None] is specified, nothing
196 /// will be sent.
197 ///
198 /// # Returns
199 ///
200 /// This pipeline builder.
201 ///
202 pub fn with_stage<I, O, F, Fut>(
203 self,
204 input_pipe: impl AsRef<str>,
205 output_pipe: impl AsRef<str>,
206 options: WorkerOptions,
207 task: F,
208 ) -> Self
209 where
210 I: Send + 'static,
211 O: Send + 'static,
212 F: Fn(I) -> Fut + Send + Sync + 'static,
213 Fut: Future<Output = Option<O>> + Send + 'static,
214 {
215 self.with_branching_stage(input_pipe, vec![output_pipe], options, move |value| {
216 let task_fut = task(value);
217 async move {
218 task_fut
219 .await
220 .map(|v| Box::new(v) as BoxedAnySend)
221 .map(|v| vec![Some(v)])
222 }
223 })
224 }
225
226 /// A "regular" stage; registers a new stage that operates on an input and produces multiple
227 /// values that are written into their respective pipe.
228 ///
229 /// The string provided to `input_pipe` defines where values will be read from.
230 /// The strings provided to `output_pipes` define where each produced output will go.
231 ///
232 /// The worker will continue working on input values until the pipeline is terminated or the
233 /// pipe it is receiving from is closed.
234 ///
235 /// * If the user-defined task function returns [None], nothing will be done.
236 /// * If it returns [Some], the inner value ([`Vec<Option<BoxedAnySend>>`]) will have the
237 /// following applied to each output option:
238 /// * If [Some] is specified, the inner value will be sent to the corresponding pipe.
239 /// * If [None] is specified, nothing will be sent.
240 ///
241 /// As with all stages that have more than one ("branching") outputs, it's possible that each
242 /// output could have a different type, and so to avoid large binary sizes from static
243 /// dispatching, dynamic dispatching is used instead, utilizing the [BoxedAnySend] type. For
244 /// examples on how to return these types of values in task functions, see [BoxedAnySend]'s
245 /// examples.
246 ///
247 /// # Returns
248 ///
249 /// This pipeline builder.
250 ///
251 pub fn with_branching_stage<I, F, Fut>(
252 mut self,
253 input_pipe: impl AsRef<str>,
254 output_pipes: Vec<impl AsRef<str>>,
255 options: WorkerOptions,
256 task: F,
257 ) -> Self
258 where
259 I: Send + 'static,
260 F: Fn(I) -> Fut + Send + Sync + 'static,
261 Fut: Future<Output = Option<Vec<Option<BoxedAnySend>>>> + Send + 'static,
262 {
263 let input_pipe = input_pipe.as_ref().to_string();
264 let output_pipes = output_pipes
265 .iter()
266 .map(|p| p.as_ref().to_string())
267 .collect();
268 let err_pipe = input_pipe.clone();
269
270 self.stages.push(Stage::Regular {
271 function: Box::new(move |value: BoxedAnySend| {
272 let value = downcast_from_pipe(value, &err_pipe);
273 Box::pin(task(*value))
274 }),
275 pipes: TaskPipeNames {
276 reader: input_pipe,
277 writers: output_pipes,
278 },
279 options,
280 });
281 self
282 }
283
284 /// An "iterator-based" stage; registers a new stage that takes the data from one pipe and
285 /// "flattens" it, feeding the results into another.
286 ///
287 /// This is useful if you have a pipe that produces a list of values in a single task execution,
288 /// but you want to use it as input to another stage that takes only the individual values.
289 ///
290 /// The generic parameter is used by the pipeline builder to know what concrete type to
291 /// cast the value to, which mean turbofish syntax will be needed to specify what the iterator
292 /// type of that pipe is, for example:
293 /// `Pipeline::builder().with_flattener::<Vec<u8>>("data", "bytes")`
294 ///
295 /// The string provided to `from_pipe` defines where the iterator of values will be read from.
296 /// The string provided to `to_pipe` defines where the individual values will go.
297 ///
298 /// The worker will continue working until the pipeline is terminated or the pipe it is
299 /// receiving from is closed.
300 ///
301 /// # Examples
302 ///
303 /// ```
304 /// use std::sync::Arc;
305 /// use std::sync::atomic::{AtomicI32, Ordering};
306 /// use async_pipes::Pipeline;
307 /// use async_pipes::WorkerOptions;
308 ///
309 /// #[tokio::main]
310 /// async fn main() {
311 /// let sum = Arc::new(AtomicI32::new(0));
312 /// let task_sum = sum.clone();
313 ///
314 /// Pipeline::builder()
315 /// .with_inputs("NumberSets", vec![vec![1, 2], vec![3, 4, 5]])
316 /// .with_flattener::<Vec<i32>>("NumberSets", "Numbers")
317 /// .with_consumer("Numbers", WorkerOptions::default_single_task(), move |value: i32| {
318 /// let sum = task_sum.clone();
319 /// async move {
320 /// sum.fetch_add(value, Ordering::SeqCst);
321 /// }
322 /// })
323 /// .build()
324 /// .expect("failed to build pipeline")
325 /// .wait()
326 /// .await;
327 ///
328 /// assert_eq!(sum.load(Ordering::Acquire), 15);
329 /// }
330 /// ```
331 ///
332 /// # Returns
333 ///
334 /// This pipeline builder.
335 ///
336 pub fn with_flattener<It>(
337 mut self,
338 from_pipe: impl AsRef<str>,
339 to_pipe: impl AsRef<str>,
340 ) -> Self
341 where
342 It: IntoIterator + Send + 'static,
343 It::Item: Send,
344 {
345 let input_pipe = from_pipe.as_ref().to_string();
346 let output_pipe = to_pipe.as_ref().to_string();
347 let err_pipe = input_pipe.clone();
348
349 self.stages.push(Stage::Iterator {
350 stage_type: IterStageType::Flatten,
351 caster: Box::new(move |value: BoxedAnySend| {
352 downcast_from_pipe::<It>(value, &err_pipe)
353 .into_iter()
354 .map(|v| Box::new(v) as BoxedAnySend)
355 .collect()
356 }),
357 pipes: TaskPipeNames {
358 reader: input_pipe,
359 writers: vec![output_pipe],
360 },
361 options: WorkerOptions::default_single_task(),
362 });
363 self
364 }
365
366 /// A utility function for improving readability when building pipelines.
367 ///
368 /// This makes splitting up task definitions into functions easier by allowing the caller
369 /// to pass in a function that takes `self` and returns `self`, effectively providing
370 /// continuity to a call chain.
371 ///
372 /// # Examples
373 ///
374 /// ```
375 /// use std::sync::Arc;
376 /// use std::sync::atomic::{AtomicI32, Ordering};
377 /// use async_pipes::Pipeline;
378 /// use async_pipes::PipelineBuilder;
379 ///
380 /// #[tokio::main]
381 /// async fn main() {
382 /// let sum = Arc::new(AtomicI32::new(0));
383 ///
384 /// Pipeline::builder()
385 /// .with_inputs("Numbers", vec![1, 2, 3, 4, 5])
386 /// .also(build_consumer(sum.clone()))
387 /// .build()
388 /// .expect("failed to build pipeline")
389 /// .wait()
390 /// .await;
391 ///
392 /// assert_eq!(sum.load(Ordering::Acquire), 15);
393 /// }
394 ///
395 /// fn build_consumer(sum: Arc<AtomicI32>) -> impl FnOnce(PipelineBuilder) -> PipelineBuilder {
396 /// use async_pipes::WorkerOptions;
397 /// |builder| builder.with_consumer("Numbers", WorkerOptions::default(), move |value: i32| {
398 /// let sum = sum.clone();
399 /// async move {
400 /// sum.fetch_add(value, Ordering::SeqCst);
401 /// }
402 /// })
403 /// }
404 /// ```
405 ///
406 pub fn also(self, handler: impl FnOnce(PipelineBuilder) -> PipelineBuilder) -> Self {
407 handler(self)
408 }
409
410 /// When the pipeline is ready to be built, this is called and will return a pipeline if
411 /// it was successfully built, otherwise it will return an error describing why it could not be
412 /// built.
413 ///
414 /// # Errors
415 ///
416 /// 1. A pipe is "open-ended", meaning there's no stage consuming values from that pipe.
417 /// 2. The reader of a pipe was used more than once.
418 ///
419 pub fn build(self) -> Result<Pipeline, String> {
420 let configs = self.create_pipe_configs()?;
421
422 // Register pipes in the synchronizer
423 let mut synchronizer = Synchronizer::default();
424 self.register_pipe_configs(&mut synchronizer, &configs);
425
426 // Make the synchronizer immutable and the create the actual pipes
427 let synchronizer = Arc::new(synchronizer);
428 let pipes_map = self.create_pipes(&synchronizer, configs);
429
430 let CreateWorkersOutput {
431 producers,
432 workers,
433 signal_txs,
434 } = self.create_workers(pipes_map)?;
435
436 Ok(Pipeline {
437 synchronizer,
438 producers,
439 workers,
440 signal_txs,
441 })
442 }
443
444 /// Create the producers, workers, and the associated signal channels.
445 fn create_workers(
446 self,
447 mut pipes_map: HashMap<String, Pipe<BoxedAnySend>>,
448 ) -> Result<CreateWorkersOutput, String> {
449 let mut producers = JoinSet::new();
450 let mut workers = JoinSet::new();
451 let mut signal_txs = Vec::new();
452
453 let mut worker_args =
454 |pipe_names: TaskPipeNames, pipes: &mut HashMap<String, Pipe<BoxedAnySend>>| {
455 let writers = find_writers(&pipe_names.writers, pipes)?;
456 let reader = find_reader(&pipe_names.reader, pipes)?;
457
458 let (signal_tx, signal_rx) = tokio::sync::mpsc::channel(1);
459 signal_txs.push(signal_tx);
460
461 Result::<_, String>::Ok((reader, writers, signal_rx))
462 };
463
464 let mut has_producer = false;
465 for stage in self.stages {
466 match stage {
467 Stage::Producer { function, pipes } => {
468 let writers = find_writers(&pipes.writers, &pipes_map)?;
469 has_producer = true;
470 producers.spawn(new_detached_producer(function, writers));
471 }
472
473 Stage::Regular {
474 function,
475 pipes,
476 options,
477 } => {
478 let (reader, writers, signal_rx) = worker_args(pipes, &mut pipes_map)?;
479 workers.spawn(new_detached_worker(
480 function,
481 reader,
482 writers,
483 signal_rx,
484 options.try_into()?,
485 ));
486 }
487
488 Stage::Iterator {
489 stage_type,
490 caster,
491 pipes,
492 options,
493 } => {
494 let (reader, writers, signal_rx) = worker_args(pipes, &mut pipes_map)?;
495 workers.spawn(match stage_type {
496 IterStageType::Flatten => new_detached_flattener(
497 caster,
498 reader,
499 writers,
500 signal_rx,
501 options.try_into()?,
502 ),
503 });
504 }
505 }
506 }
507
508 if !has_producer {
509 return Err("pipeline must have at least one producer".to_string());
510 }
511
512 Ok(CreateWorkersOutput {
513 producers,
514 workers,
515 signal_txs,
516 })
517 }
518
519 /// Construct the configuration of all the pipes.
520 ///
521 /// We can achieve a set of unique names easily by using the `reader` of the "task" and "iter"
522 /// stages and de-duplicating that list.
523 ///
524 /// For each pipe, there is only one reader, but there can be multiple writers.
525 /// Also, producers do not have a reader, only writers.
526 fn create_pipe_configs(&self) -> Result<Vec<PipeConfig>, String> {
527 let mut configs = Vec::new();
528 for stage in &self.stages {
529 match stage {
530 Stage::Iterator { pipes, options, .. } => {
531 let mut options: ValidWorkerOptions = options.clone().try_into()?;
532 options.unbounded_buffer = true;
533
534 configs.push(PipeConfig {
535 name: pipes.reader.clone(),
536 options,
537 });
538 }
539 Stage::Regular { pipes, options, .. } => {
540 configs.push(PipeConfig {
541 name: pipes.reader.clone(),
542 options: options.clone().try_into()?,
543 });
544 }
545
546 _ => {}
547 }
548 }
549
550 Ok(configs
551 .into_iter()
552 .unique_by(|conf| conf.name.clone())
553 .collect())
554 }
555
556 /// Register all the pipe configurations to the synchronizer.
557 ///
558 /// This is done before creating the pipes as the synchronizer only needs to be mutable
559 /// for this step, afterward it can be considered "immutable".
560 fn register_pipe_configs(&self, sync: &mut Synchronizer, pipe_configs: &[PipeConfig]) {
561 for conf in pipe_configs {
562 sync.register(&conf.name);
563 }
564 }
565
566 /// The reader of a pipe is wrapped in an option to allow the build method to [Option::take] it
567 /// to maintain the invariant that there's only one reader per pipe.
568 fn create_pipes(
569 &self,
570 sync: &Arc<Synchronizer>,
571 configs: Vec<PipeConfig>,
572 ) -> HashMap<String, Pipe<BoxedAnySend>> {
573 configs
574 .into_iter()
575 .map(|conf| {
576 let (tx, rx): (VarSender<BoxedAnySend>, VarReceiver<BoxedAnySend>) =
577 if conf.options.unbounded_buffer {
578 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
579 (tx.into(), rx.into())
580 } else {
581 let buf_size = conf.options.reader_buffer_size.get();
582 let (tx, rx) = tokio::sync::mpsc::channel(buf_size);
583 (tx.into(), rx.into())
584 };
585
586 let pipe = Pipe {
587 writer: PipeWriter::new(conf.name.clone(), sync.clone(), tx),
588 reader: Some(PipeReader::new(conf.name.clone(), sync.clone(), rx)),
589 };
590 (conf.name, pipe)
591 })
592 .collect()
593 }
594}