1use async_trait::async_trait;
4use serde::Serialize;
5use std::sync::Arc;
6use thiserror::Error;
7
8use crate::recorder::{NoopRecorder, Recorder, RunId, RunStatus, StepStatus};
9use crate::retry::RetryPolicy;
10use crate::step::{Step, StepError};
11
/// Shared, thread-safe predicate over a pipeline output, as stored in
/// [`SpawnRule::Fork`].
type ForkPredicate<O> = Arc<dyn Fn(&O) -> bool + Send + Sync>;

/// Shared, thread-safe closure that turns a pipeline output into a list of JSON
/// values, as stored in [`SpawnRule::Dynamic`].
type SpawnGenerator<O> = Arc<dyn Fn(&O) -> Vec<serde_json::Value> + Send + Sync>;
17
18use std::collections::HashMap;
19
/// Descriptive information that can be attached to pipeline steps and spawn rules.
///
/// The default value has no description and an empty tag map.
#[derive(Default, Clone, Debug, Serialize)]
pub struct Metadata {
    /// Free-form description; `None` until one is set.
    pub description: Option<String>,
    /// Key/value tags.
    pub tags: HashMap<String, String>,
}
30
31impl Metadata {
32 pub fn new() -> Self {
34 Self::default()
35 }
36
37 pub fn with_description(mut self, description: &str) -> Self {
39 self.description = Some(description.to_string());
40 self
41 }
42
43 pub fn with_tag(mut self, key: &str, value: &str) -> Self {
45 self.tags.insert(key.to_string(), value.to_string());
46 self
47 }
48}
49
50#[derive(Clone)]
52pub enum SpawnRule<O> {
53 Fork {
55 target: &'static str,
56 predicate: ForkPredicate<O>,
57 metadata: Metadata,
58 },
59 FanOut {
61 targets: Vec<&'static str>,
62 metadata: Metadata,
63 },
64 Dynamic {
66 target: &'static str,
67 generator: SpawnGenerator<O>,
68 metadata: Metadata,
69 },
70}
71
/// Serializable description of a built pipeline, as produced by
/// `BuiltPipeline::to_graph`.
#[derive(Debug, Clone, Serialize)]
pub struct PipelineGraph {
    /// Name of the pipeline.
    pub name: String,
    /// One node per step, in the order the chain reports its step names.
    pub steps: Vec<StepNode>,
    /// One node per [`SpawnRule::Fork`] rule.
    pub forks: Vec<ForkNode>,
    /// One node per [`SpawnRule::FanOut`] rule.
    pub fan_outs: Vec<FanOutNode>,
    /// One node per [`SpawnRule::Dynamic`] rule.
    pub emits: Vec<EmitNode>,
}
81
/// A single step of a pipeline inside a [`PipelineGraph`].
#[derive(Debug, Clone, Serialize)]
pub struct StepNode {
    /// Name reported by the step.
    pub name: String,
    /// Zero-based position of the step within the pipeline.
    pub index: usize,
    /// Step metadata; its fields are flattened into this node when serialized.
    #[serde(flatten)]
    pub metadata: Metadata,
}
90
/// A conditional fork rule inside a [`PipelineGraph`].
#[derive(Debug, Clone, Serialize)]
pub struct ForkNode {
    /// Name of the pipeline the fork spawns.
    pub target_pipeline: String,
    /// Human-readable condition. `BuiltPipeline::to_graph` fills this with the
    /// rule's description, or `"fork to <target>"` when no description is set.
    pub condition: String,
    /// Rule metadata; its fields are flattened into this node when serialized.
    #[serde(flatten)]
    pub metadata: Metadata,
}
99
/// A fan-out rule inside a [`PipelineGraph`].
#[derive(Debug, Clone, Serialize)]
pub struct FanOutNode {
    /// Names of the pipelines the rule spawns.
    pub targets: Vec<String>,
    /// Rule metadata; its fields are flattened into this node when serialized.
    #[serde(flatten)]
    pub metadata: Metadata,
}
107
/// A dynamic emit rule ([`SpawnRule::Dynamic`]) inside a [`PipelineGraph`].
#[derive(Debug, Clone, Serialize)]
pub struct EmitNode {
    /// Name of the pipeline that receives the generated inputs.
    pub target_pipeline: String,
    /// Rule metadata; its fields are flattened into this node when serialized.
    #[serde(flatten)]
    pub metadata: Metadata,
}
115
/// Errors returned while running a pipeline.
#[derive(Error, Debug)]
pub enum PipelineError {
    /// A step returned `StepError::Permanent`; the step is not retried.
    #[error("step '{step}' failed: {source}")]
    StepFailed {
        /// Name of the failing step.
        step: &'static str,
        /// Error reported by the step.
        #[source]
        source: anyhow::Error,
    },

    /// A step kept returning `StepError::Retryable` until the retry policy
    /// stopped providing a delay for the next attempt.
    #[error("step '{step}' exhausted {attempts} retries: {source}")]
    RetriesExhausted {
        /// Name of the failing step.
        step: &'static str,
        /// Number of attempts made, including the first one.
        attempts: u32,
        /// Error reported by the last attempt.
        #[source]
        source: anyhow::Error,
    },

    /// Any `anyhow::Error` converted through `?`. In this module that is how
    /// failures of recorder calls are surfaced.
    #[error("recorder error: {0}")]
    RecorderError(#[from] anyhow::Error),
}
140
/// Implemented by pipeline inputs to supply the entity identifier that
/// `BuiltPipeline::run` passes to the recorder when a run starts.
pub trait HasEntityId {
    /// Returns the identifier of the entity this value represents.
    fn entity_id(&self) -> String;
}
146
147impl HasEntityId for String {
149 fn entity_id(&self) -> String {
150 self.clone()
151 }
152}
153
154impl HasEntityId for &str {
156 fn entity_id(&self) -> String {
157 self.to_string()
158 }
159}
160
/// Internal view of a step expressed through explicit input (`I`) and output
/// (`O`) type parameters, which is what the chain types are generic over.
#[doc(hidden)]
#[async_trait]
pub trait BoxedStep<I, O>: Send + Sync {
    /// Name of the step; used for recording and in error values.
    fn name(&self) -> &'static str;
    /// Runs the step once on `input`.
    async fn execute(&self, input: I) -> Result<O, StepError>;
}
168
/// Newtype around a [`Step`] that implements [`BoxedStep`] by delegating to it.
#[doc(hidden)]
pub struct StepWrapper<S>(pub S);
172
173#[async_trait]
174impl<S> BoxedStep<S::Input, S::Output> for StepWrapper<S>
175where
176 S: Step,
177{
178 fn name(&self) -> &'static str {
179 self.0.name()
180 }
181
182 async fn execute(&self, input: S::Input) -> Result<S::Output, StepError> {
183 self.0.execute(input).await
184 }
185}
186
/// A statically typed sequence of steps that turns an `I` into an `O`.
#[doc(hidden)]
#[async_trait]
pub trait StepChain<I, O>: Send + Sync {
    /// Runs every step of the chain in order.
    ///
    /// * `input` - value handed to the first step.
    /// * `run_id` - identifier of the run the steps are recorded under.
    /// * `recorder` - receives step start/completion notifications.
    /// * `retry_policy` - decides whether and when a retryable failure is retried.
    /// * `start_index` - index recorded for the first step of this chain.
    ///
    /// # Errors
    ///
    /// Returns a [`PipelineError`] when a step fails or a recorder call fails.
    async fn run(
        &self,
        input: I,
        run_id: RunId,
        recorder: &dyn Recorder,
        retry_policy: &RetryPolicy,
        start_index: u32,
    ) -> Result<O, PipelineError>;

    /// Number of steps in the chain.
    fn step_count(&self) -> u32;

    /// Appends the names of all steps, in execution order, to `names`.
    fn collect_step_names(&self, names: &mut Vec<&'static str>);
}
206
/// The empty chain: contains no steps and returns its input unchanged.
#[doc(hidden)]
pub struct Identity;
210
/// `Identity` terminates a chain: it performs no work and records nothing.
#[async_trait]
impl<T: Send + 'static> StepChain<T, T> for Identity {
    /// Returns `input` as-is; every other argument is ignored.
    async fn run(
        &self,
        input: T,
        _run_id: RunId,
        _recorder: &dyn Recorder,
        _retry_policy: &RetryPolicy,
        _start_index: u32,
    ) -> Result<T, PipelineError> {
        Ok(input)
    }

    /// The empty chain has no steps.
    fn step_count(&self) -> u32 {
        0
    }

    /// Adds nothing to `names`.
    fn collect_step_names(&self, _names: &mut Vec<&'static str>) {}
}
230
/// Chain node that runs `step` first and feeds its output (`M`) into the rest
/// of the chain, `next`.
#[doc(hidden)]
pub struct ChainedStep<S, Next, I, M, O>
where
    S: BoxedStep<I, M>,
    Next: StepChain<M, O>,
{
    /// Step executed by this node.
    pub step: S,
    /// Remainder of the chain, run after `step` succeeds.
    pub next: Next,
    /// Ties the otherwise unused `I`, `M` and `O` parameters to the type.
    pub _phantom: std::marker::PhantomData<(I, M, O)>,
}
242
243#[async_trait]
244impl<S, Next, I, M, O> StepChain<I, O> for ChainedStep<S, Next, I, M, O>
245where
246 I: Send + Sync + Clone + 'static,
247 M: Send + Sync + 'static,
248 O: Send + Sync + 'static,
249 S: BoxedStep<I, M> + Send + Sync,
250 Next: StepChain<M, O> + Send + Sync,
251{
252 async fn run(
253 &self,
254 input: I,
255 run_id: RunId,
256 recorder: &dyn Recorder,
257 retry_policy: &RetryPolicy,
258 start_index: u32,
259 ) -> Result<O, PipelineError> {
260 let step_name = self.step.name();
261 let step_id = recorder.start_step(run_id, step_name, start_index).await?;
262
263 let mut attempt = 0u32;
265 let output = loop {
266 attempt += 1;
267 match self.step.execute(input.clone()).await {
268 Ok(output) => break output,
269 Err(StepError::Permanent(e)) => {
270 recorder
271 .complete_step(
272 step_id,
273 StepStatus::Failed {
274 error: e.to_string(),
275 attempt,
276 },
277 )
278 .await?;
279 return Err(PipelineError::StepFailed {
280 step: step_name,
281 source: e,
282 });
283 }
284 Err(StepError::Retryable(e)) => {
285 if let Some(delay) = retry_policy.delay_for_attempt(attempt) {
286 tokio::time::sleep(delay).await;
287 } else {
288 recorder
289 .complete_step(
290 step_id,
291 StepStatus::Failed {
292 error: e.to_string(),
293 attempt,
294 },
295 )
296 .await?;
297 return Err(PipelineError::RetriesExhausted {
298 step: step_name,
299 attempts: attempt,
300 source: e,
301 });
302 }
303 }
304 }
305 };
306
307 recorder
308 .complete_step(step_id, StepStatus::Completed)
309 .await?;
310
311 self.next
313 .run(output, run_id, recorder, retry_policy, start_index + 1)
314 .await
315 }
316
317 fn step_count(&self) -> u32 {
318 1 + self.next.step_count()
319 }
320
321 fn collect_step_names(&self, names: &mut Vec<&'static str>) {
322 names.push(self.step.name());
323 self.next.collect_step_names(names);
324 }
325}
326
/// A pipeline under construction: a named chain of steps from `I` to `O`
/// together with its retry policy, recorder, spawn rules and step metadata.
///
/// Call `build` to obtain a runnable `BuiltPipeline`.
pub struct Pipeline<I, O, Chain>
where
    Chain: StepChain<I, O>,
{
    /// Pipeline name.
    name: &'static str,
    /// The steps added so far.
    chain: Chain,
    /// Policy applied to retryable step failures.
    retry_policy: RetryPolicy,
    /// Receives run and step notifications.
    recorder: Arc<dyn Recorder>,
    /// Rules evaluated against the final output `O`.
    spawn_rules: Vec<SpawnRule<O>>,
    /// Metadata entries looked up by step index.
    step_metadata: Vec<Metadata>,
    /// Ties the `I` and `O` parameters to the type.
    _phantom: std::marker::PhantomData<(I, O)>,
}
340
341impl Pipeline<(), (), Identity> {
342 pub fn new(name: &'static str) -> Self {
344 Self {
345 name,
346 chain: Identity,
347 retry_policy: RetryPolicy::default(),
348 recorder: Arc::new(NoopRecorder),
349 spawn_rules: Vec::new(),
350 step_metadata: Vec::new(),
351 _phantom: std::marker::PhantomData,
352 }
353 }
354}
355
356impl<O, Chain> Pipeline<(), O, Chain>
357where
358 Chain: StepChain<(), O> + Send + Sync + 'static,
359 O: Send + 'static,
360{
361 #[allow(clippy::type_complexity)]
363 pub fn start_with<S>(
364 self,
365 step: S,
366 ) -> StepBuilder<
367 S::Input,
368 S::Output,
369 ChainedStep<StepWrapper<S>, Identity, S::Input, S::Output, S::Output>,
370 >
371 where
372 S: Step + 'static,
373 {
374 let pipeline = Pipeline {
375 name: self.name,
376 chain: ChainedStep {
377 step: StepWrapper(step),
378 next: Identity,
379 _phantom: std::marker::PhantomData,
380 },
381 retry_policy: self.retry_policy,
382 recorder: self.recorder,
383 spawn_rules: Vec::new(),
384 step_metadata: vec![Metadata::default()],
385 _phantom: std::marker::PhantomData,
386 };
387 StepBuilder {
388 pipeline,
389 step_index: 0,
390 }
391 }
392}
393
394impl<I, O, Chain> Pipeline<I, O, Chain>
395where
396 I: Send + Sync + Clone + 'static,
397 O: Send + Sync + Clone + 'static,
398 Chain: StepChain<I, O> + Send + Sync + 'static,
399{
400 pub fn then<S>(self, step: S) -> Pipeline<I, S::Output, impl StepChain<I, S::Output>>
402 where
403 S: Step<Input = O> + 'static,
404 {
405 Pipeline {
406 name: self.name,
407 chain: ThenChain {
408 first: self.chain,
409 step: StepWrapper(step),
410 _phantom: std::marker::PhantomData,
411 },
412 retry_policy: self.retry_policy,
413 recorder: self.recorder,
414 spawn_rules: Vec::new(),
415 step_metadata: self.step_metadata,
416 _phantom: std::marker::PhantomData,
417 }
418 }
419
420 pub fn with_retry(mut self, policy: RetryPolicy) -> Self {
422 self.retry_policy = policy;
423 self
424 }
425
426 pub fn with_recorder<R: Recorder + 'static>(mut self, recorder: R) -> Self {
428 self.recorder = Arc::new(recorder);
429 self
430 }
431
432 pub fn fork_when<F>(mut self, predicate: F, target: &'static str) -> Self
437 where
438 F: Fn(&O) -> bool + Send + Sync + 'static,
439 {
440 self.spawn_rules.push(SpawnRule::Fork {
441 target,
442 predicate: Arc::new(predicate),
443 metadata: Metadata::default(),
444 });
445 self
446 }
447
448 pub fn fan_out(mut self, targets: &[&'static str]) -> Self {
452 self.spawn_rules.push(SpawnRule::FanOut {
453 targets: targets.to_vec(),
454 metadata: Metadata::default(),
455 });
456 self
457 }
458
459 pub fn build(self) -> BuiltPipeline<I, O, Chain> {
461 BuiltPipeline {
462 name: self.name,
463 chain: self.chain,
464 retry_policy: self.retry_policy,
465 recorder: self.recorder,
466 spawn_rules: self.spawn_rules,
467 step_metadata: self.step_metadata,
468 _phantom: std::marker::PhantomData,
469 }
470 }
471}
472
/// Builder state right after a step has been added; `desc` and `tag` annotate
/// that step.
pub struct StepBuilder<I, O, Chain>
where
    Chain: StepChain<I, O>,
{
    /// The pipeline being built.
    pipeline: Pipeline<I, O, Chain>,
    /// Index into the pipeline's step metadata of the step being annotated.
    step_index: usize,
}
481
482impl<I, O, Chain> StepBuilder<I, O, Chain>
483where
484 I: Send + Sync + Clone + 'static,
485 O: Send + Sync + Clone + 'static,
486 Chain: StepChain<I, O> + Send + Sync + 'static,
487{
488 pub fn desc(mut self, description: &str) -> Self {
490 if let Some(meta) = self.pipeline.step_metadata.get_mut(self.step_index) {
491 meta.description = Some(description.to_string());
492 }
493 self
494 }
495
496 pub fn tag(mut self, key: &str, value: &str) -> Self {
498 if let Some(meta) = self.pipeline.step_metadata.get_mut(self.step_index) {
499 meta.tags.insert(key.to_string(), value.to_string());
500 }
501 self
502 }
503
504 pub fn then<S>(self, step: S) -> StepBuilder<I, S::Output, impl StepChain<I, S::Output>>
506 where
507 S: Step<Input = O> + 'static,
508 {
509 let mut pipeline = Pipeline {
510 name: self.pipeline.name,
511 chain: ThenChain {
512 first: self.pipeline.chain,
513 step: StepWrapper(step),
514 _phantom: std::marker::PhantomData,
515 },
516 retry_policy: self.pipeline.retry_policy,
517 recorder: self.pipeline.recorder,
518 spawn_rules: Vec::new(),
519 step_metadata: self.pipeline.step_metadata,
520 _phantom: std::marker::PhantomData,
521 };
522 pipeline.step_metadata.push(Metadata::default());
523 let step_index = pipeline.step_metadata.len() - 1;
524 StepBuilder { pipeline, step_index }
525 }
526
527 pub fn with_retry(mut self, policy: RetryPolicy) -> Self {
529 self.pipeline.retry_policy = policy;
530 self
531 }
532
533 pub fn with_recorder<R: Recorder + 'static>(mut self, recorder: R) -> Self {
535 self.pipeline.recorder = Arc::new(recorder);
536 self
537 }
538
539 pub fn fork_when<F>(mut self, predicate: F, target: &'static str) -> ForkBuilder<I, O, Chain>
541 where
542 F: Fn(&O) -> bool + Send + Sync + 'static,
543 {
544 self.pipeline.spawn_rules.push(SpawnRule::Fork {
545 target,
546 predicate: Arc::new(predicate),
547 metadata: Metadata::default(),
548 });
549 let rule_index = self.pipeline.spawn_rules.len() - 1;
550 ForkBuilder {
551 pipeline: self.pipeline,
552 rule_index,
553 }
554 }
555
556 pub fn fan_out(mut self, targets: &[&'static str]) -> FanOutBuilder<I, O, Chain> {
558 self.pipeline.spawn_rules.push(SpawnRule::FanOut {
559 targets: targets.to_vec(),
560 metadata: Metadata::default(),
561 });
562 let rule_index = self.pipeline.spawn_rules.len() - 1;
563 FanOutBuilder {
564 pipeline: self.pipeline,
565 rule_index,
566 }
567 }
568
569 pub fn emit<T, F>(mut self, target: &'static str, generator: F) -> EmitBuilder<I, O, Chain>
571 where
572 T: Serialize + 'static,
573 F: Fn(&O) -> Vec<T> + Send + Sync + 'static,
574 {
575 self.pipeline.spawn_rules.push(SpawnRule::Dynamic {
576 target,
577 generator: Arc::new(move |output| {
578 generator(output)
579 .into_iter()
580 .filter_map(|item| serde_json::to_value(item).ok())
581 .collect()
582 }),
583 metadata: Metadata::default(),
584 });
585 let rule_index = self.pipeline.spawn_rules.len() - 1;
586 EmitBuilder {
587 pipeline: self.pipeline,
588 rule_index,
589 }
590 }
591
592 pub fn build(self) -> BuiltPipeline<I, O, Chain> {
594 BuiltPipeline {
595 name: self.pipeline.name,
596 chain: self.pipeline.chain,
597 retry_policy: self.pipeline.retry_policy,
598 recorder: self.pipeline.recorder,
599 spawn_rules: self.pipeline.spawn_rules,
600 step_metadata: self.pipeline.step_metadata,
601 _phantom: std::marker::PhantomData,
602 }
603 }
604}
605
/// Builder state right after a fork rule has been added; `desc` and `tag`
/// annotate that rule.
pub struct ForkBuilder<I, O, Chain>
where
    Chain: StepChain<I, O>,
{
    /// The pipeline being built.
    pipeline: Pipeline<I, O, Chain>,
    /// Index into the pipeline's spawn rules of the fork rule being annotated.
    rule_index: usize,
}
614
615impl<I, O, Chain> ForkBuilder<I, O, Chain>
616where
617 I: Send + Sync + Clone + 'static,
618 O: Send + Sync + Clone + 'static,
619 Chain: StepChain<I, O> + Send + Sync + 'static,
620{
621 pub fn desc(mut self, description: &str) -> Self {
623 if let Some(SpawnRule::Fork { metadata, .. }) =
624 self.pipeline.spawn_rules.get_mut(self.rule_index)
625 {
626 metadata.description = Some(description.to_string());
627 }
628 self
629 }
630
631 pub fn tag(mut self, key: &str, value: &str) -> Self {
633 if let Some(SpawnRule::Fork { metadata, .. }) =
634 self.pipeline.spawn_rules.get_mut(self.rule_index)
635 {
636 metadata.tags.insert(key.to_string(), value.to_string());
637 }
638 self
639 }
640
641 #[deprecated(since = "0.5.0", note = "Use emit() instead")]
643 pub fn spawn_from<T, F>(self, target: &'static str, generator: F) -> EmitBuilder<I, O, Chain>
644 where
645 T: Serialize + 'static,
646 F: Fn(&O) -> Vec<T> + Send + Sync + 'static,
647 {
648 self.emit(target, generator)
649 }
650
651 pub fn then<S>(self, step: S) -> StepBuilder<I, S::Output, impl StepChain<I, S::Output>>
653 where
654 S: Step<Input = O> + 'static,
655 {
656 let mut pipeline = Pipeline {
657 name: self.pipeline.name,
658 chain: ThenChain {
659 first: self.pipeline.chain,
660 step: StepWrapper(step),
661 _phantom: std::marker::PhantomData,
662 },
663 retry_policy: self.pipeline.retry_policy,
664 recorder: self.pipeline.recorder,
665 spawn_rules: Vec::new(),
666 step_metadata: self.pipeline.step_metadata,
667 _phantom: std::marker::PhantomData,
668 };
669 pipeline.step_metadata.push(Metadata::default());
670 let step_index = pipeline.step_metadata.len() - 1;
671 StepBuilder { pipeline, step_index }
672 }
673
674 pub fn fork_when<F>(mut self, predicate: F, target: &'static str) -> ForkBuilder<I, O, Chain>
676 where
677 F: Fn(&O) -> bool + Send + Sync + 'static,
678 {
679 self.pipeline.spawn_rules.push(SpawnRule::Fork {
680 target,
681 predicate: Arc::new(predicate),
682 metadata: Metadata::default(),
683 });
684 let rule_index = self.pipeline.spawn_rules.len() - 1;
685 ForkBuilder {
686 pipeline: self.pipeline,
687 rule_index,
688 }
689 }
690
691 pub fn fan_out(mut self, targets: &[&'static str]) -> FanOutBuilder<I, O, Chain> {
693 self.pipeline.spawn_rules.push(SpawnRule::FanOut {
694 targets: targets.to_vec(),
695 metadata: Metadata::default(),
696 });
697 let rule_index = self.pipeline.spawn_rules.len() - 1;
698 FanOutBuilder {
699 pipeline: self.pipeline,
700 rule_index,
701 }
702 }
703
704 pub fn emit<T, F>(mut self, target: &'static str, generator: F) -> EmitBuilder<I, O, Chain>
706 where
707 T: Serialize + 'static,
708 F: Fn(&O) -> Vec<T> + Send + Sync + 'static,
709 {
710 self.pipeline.spawn_rules.push(SpawnRule::Dynamic {
711 target,
712 generator: Arc::new(move |output| {
713 generator(output)
714 .into_iter()
715 .filter_map(|item| serde_json::to_value(item).ok())
716 .collect()
717 }),
718 metadata: Metadata::default(),
719 });
720 let rule_index = self.pipeline.spawn_rules.len() - 1;
721 EmitBuilder {
722 pipeline: self.pipeline,
723 rule_index,
724 }
725 }
726
727 pub fn with_retry(mut self, policy: RetryPolicy) -> Self {
729 self.pipeline.retry_policy = policy;
730 self
731 }
732
733 pub fn with_recorder<R: Recorder + 'static>(mut self, recorder: R) -> Self {
735 self.pipeline.recorder = Arc::new(recorder);
736 self
737 }
738
739 pub fn build(self) -> BuiltPipeline<I, O, Chain> {
741 BuiltPipeline {
742 name: self.pipeline.name,
743 chain: self.pipeline.chain,
744 retry_policy: self.pipeline.retry_policy,
745 recorder: self.pipeline.recorder,
746 spawn_rules: self.pipeline.spawn_rules,
747 step_metadata: self.pipeline.step_metadata,
748 _phantom: std::marker::PhantomData,
749 }
750 }
751}
752
/// Builder state right after a fan-out rule has been added; `desc` and `tag`
/// annotate that rule.
pub struct FanOutBuilder<I, O, Chain>
where
    Chain: StepChain<I, O>,
{
    /// The pipeline being built.
    pipeline: Pipeline<I, O, Chain>,
    /// Index into the pipeline's spawn rules of the fan-out rule being annotated.
    rule_index: usize,
}
761
762impl<I, O, Chain> FanOutBuilder<I, O, Chain>
763where
764 I: Send + Sync + Clone + 'static,
765 O: Send + Sync + Clone + 'static,
766 Chain: StepChain<I, O> + Send + Sync + 'static,
767{
768 pub fn desc(mut self, description: &str) -> Self {
770 if let Some(SpawnRule::FanOut { metadata, .. }) =
771 self.pipeline.spawn_rules.get_mut(self.rule_index)
772 {
773 metadata.description = Some(description.to_string());
774 }
775 self
776 }
777
778 pub fn tag(mut self, key: &str, value: &str) -> Self {
780 if let Some(SpawnRule::FanOut { metadata, .. }) =
781 self.pipeline.spawn_rules.get_mut(self.rule_index)
782 {
783 metadata.tags.insert(key.to_string(), value.to_string());
784 }
785 self
786 }
787
788 #[deprecated(since = "0.5.0", note = "Use emit() instead")]
790 pub fn spawn_from<T, F>(self, target: &'static str, generator: F) -> EmitBuilder<I, O, Chain>
791 where
792 T: Serialize + 'static,
793 F: Fn(&O) -> Vec<T> + Send + Sync + 'static,
794 {
795 self.emit(target, generator)
796 }
797
798 pub fn then<S>(self, step: S) -> StepBuilder<I, S::Output, impl StepChain<I, S::Output>>
800 where
801 S: Step<Input = O> + 'static,
802 {
803 let mut pipeline = Pipeline {
804 name: self.pipeline.name,
805 chain: ThenChain {
806 first: self.pipeline.chain,
807 step: StepWrapper(step),
808 _phantom: std::marker::PhantomData,
809 },
810 retry_policy: self.pipeline.retry_policy,
811 recorder: self.pipeline.recorder,
812 spawn_rules: Vec::new(),
813 step_metadata: self.pipeline.step_metadata,
814 _phantom: std::marker::PhantomData,
815 };
816 pipeline.step_metadata.push(Metadata::default());
817 let step_index = pipeline.step_metadata.len() - 1;
818 StepBuilder { pipeline, step_index }
819 }
820
821 pub fn fork_when<F>(mut self, predicate: F, target: &'static str) -> ForkBuilder<I, O, Chain>
823 where
824 F: Fn(&O) -> bool + Send + Sync + 'static,
825 {
826 self.pipeline.spawn_rules.push(SpawnRule::Fork {
827 target,
828 predicate: Arc::new(predicate),
829 metadata: Metadata::default(),
830 });
831 let rule_index = self.pipeline.spawn_rules.len() - 1;
832 ForkBuilder {
833 pipeline: self.pipeline,
834 rule_index,
835 }
836 }
837
838 pub fn fan_out(mut self, targets: &[&'static str]) -> FanOutBuilder<I, O, Chain> {
840 self.pipeline.spawn_rules.push(SpawnRule::FanOut {
841 targets: targets.to_vec(),
842 metadata: Metadata::default(),
843 });
844 let rule_index = self.pipeline.spawn_rules.len() - 1;
845 FanOutBuilder {
846 pipeline: self.pipeline,
847 rule_index,
848 }
849 }
850
851 pub fn emit<T, F>(mut self, target: &'static str, generator: F) -> EmitBuilder<I, O, Chain>
853 where
854 T: Serialize + 'static,
855 F: Fn(&O) -> Vec<T> + Send + Sync + 'static,
856 {
857 self.pipeline.spawn_rules.push(SpawnRule::Dynamic {
858 target,
859 generator: Arc::new(move |output| {
860 generator(output)
861 .into_iter()
862 .filter_map(|item| serde_json::to_value(item).ok())
863 .collect()
864 }),
865 metadata: Metadata::default(),
866 });
867 let rule_index = self.pipeline.spawn_rules.len() - 1;
868 EmitBuilder {
869 pipeline: self.pipeline,
870 rule_index,
871 }
872 }
873
874 pub fn with_retry(mut self, policy: RetryPolicy) -> Self {
876 self.pipeline.retry_policy = policy;
877 self
878 }
879
880 pub fn with_recorder<R: Recorder + 'static>(mut self, recorder: R) -> Self {
882 self.pipeline.recorder = Arc::new(recorder);
883 self
884 }
885
886 pub fn build(self) -> BuiltPipeline<I, O, Chain> {
888 BuiltPipeline {
889 name: self.pipeline.name,
890 chain: self.pipeline.chain,
891 retry_policy: self.pipeline.retry_policy,
892 recorder: self.pipeline.recorder,
893 spawn_rules: self.pipeline.spawn_rules,
894 step_metadata: self.pipeline.step_metadata,
895 _phantom: std::marker::PhantomData,
896 }
897 }
898}
899
/// Builder state right after a dynamic emit rule has been added; `desc` and
/// `tag` annotate that rule.
pub struct EmitBuilder<I, O, Chain>
where
    Chain: StepChain<I, O>,
{
    /// The pipeline being built.
    pipeline: Pipeline<I, O, Chain>,
    /// Index into the pipeline's spawn rules of the emit rule being annotated.
    rule_index: usize,
}
908
909impl<I, O, Chain> EmitBuilder<I, O, Chain>
910where
911 I: Send + Sync + Clone + 'static,
912 O: Send + Sync + Clone + 'static,
913 Chain: StepChain<I, O> + Send + Sync + 'static,
914{
915 pub fn desc(mut self, description: &str) -> Self {
917 if let Some(SpawnRule::Dynamic { metadata, .. }) =
918 self.pipeline.spawn_rules.get_mut(self.rule_index)
919 {
920 metadata.description = Some(description.to_string());
921 }
922 self
923 }
924
925 pub fn tag(mut self, key: &str, value: &str) -> Self {
927 if let Some(SpawnRule::Dynamic { metadata, .. }) =
928 self.pipeline.spawn_rules.get_mut(self.rule_index)
929 {
930 metadata.tags.insert(key.to_string(), value.to_string());
931 }
932 self
933 }
934
935 #[deprecated(since = "0.5.0", note = "Use emit() instead")]
937 pub fn spawn_from<T, F>(self, target: &'static str, generator: F) -> EmitBuilder<I, O, Chain>
938 where
939 T: Serialize + 'static,
940 F: Fn(&O) -> Vec<T> + Send + Sync + 'static,
941 {
942 self.emit(target, generator)
943 }
944
945 pub fn then<S>(self, step: S) -> StepBuilder<I, S::Output, impl StepChain<I, S::Output>>
947 where
948 S: Step<Input = O> + 'static,
949 {
950 let mut pipeline = Pipeline {
951 name: self.pipeline.name,
952 chain: ThenChain {
953 first: self.pipeline.chain,
954 step: StepWrapper(step),
955 _phantom: std::marker::PhantomData,
956 },
957 retry_policy: self.pipeline.retry_policy,
958 recorder: self.pipeline.recorder,
959 spawn_rules: Vec::new(),
960 step_metadata: self.pipeline.step_metadata,
961 _phantom: std::marker::PhantomData,
962 };
963 pipeline.step_metadata.push(Metadata::default());
964 let step_index = pipeline.step_metadata.len() - 1;
965 StepBuilder { pipeline, step_index }
966 }
967
968 pub fn fork_when<F>(mut self, predicate: F, target: &'static str) -> ForkBuilder<I, O, Chain>
970 where
971 F: Fn(&O) -> bool + Send + Sync + 'static,
972 {
973 self.pipeline.spawn_rules.push(SpawnRule::Fork {
974 target,
975 predicate: Arc::new(predicate),
976 metadata: Metadata::default(),
977 });
978 let rule_index = self.pipeline.spawn_rules.len() - 1;
979 ForkBuilder {
980 pipeline: self.pipeline,
981 rule_index,
982 }
983 }
984
985 pub fn fan_out(mut self, targets: &[&'static str]) -> FanOutBuilder<I, O, Chain> {
987 self.pipeline.spawn_rules.push(SpawnRule::FanOut {
988 targets: targets.to_vec(),
989 metadata: Metadata::default(),
990 });
991 let rule_index = self.pipeline.spawn_rules.len() - 1;
992 FanOutBuilder {
993 pipeline: self.pipeline,
994 rule_index,
995 }
996 }
997
998 pub fn emit<T, F>(mut self, target: &'static str, generator: F) -> EmitBuilder<I, O, Chain>
1000 where
1001 T: Serialize + 'static,
1002 F: Fn(&O) -> Vec<T> + Send + Sync + 'static,
1003 {
1004 self.pipeline.spawn_rules.push(SpawnRule::Dynamic {
1005 target,
1006 generator: Arc::new(move |output| {
1007 generator(output)
1008 .into_iter()
1009 .filter_map(|item| serde_json::to_value(item).ok())
1010 .collect()
1011 }),
1012 metadata: Metadata::default(),
1013 });
1014 let rule_index = self.pipeline.spawn_rules.len() - 1;
1015 EmitBuilder {
1016 pipeline: self.pipeline,
1017 rule_index,
1018 }
1019 }
1020
1021 pub fn with_retry(mut self, policy: RetryPolicy) -> Self {
1023 self.pipeline.retry_policy = policy;
1024 self
1025 }
1026
1027 pub fn with_recorder<R: Recorder + 'static>(mut self, recorder: R) -> Self {
1029 self.pipeline.recorder = Arc::new(recorder);
1030 self
1031 }
1032
1033 pub fn build(self) -> BuiltPipeline<I, O, Chain> {
1035 BuiltPipeline {
1036 name: self.pipeline.name,
1037 chain: self.pipeline.chain,
1038 retry_policy: self.pipeline.retry_policy,
1039 recorder: self.pipeline.recorder,
1040 spawn_rules: self.pipeline.spawn_rules,
1041 step_metadata: self.pipeline.step_metadata,
1042 _phantom: std::marker::PhantomData,
1043 }
1044 }
1045}
1046
/// Chain node that runs an existing chain `first` and then one more `step` on
/// its output (`M`).
#[doc(hidden)]
pub struct ThenChain<First, S, I, M, O>
where
    First: StepChain<I, M>,
    S: BoxedStep<M, O>,
{
    /// The chain that runs before `step`.
    pub first: First,
    /// Step executed on the output of `first`.
    pub step: S,
    /// Ties the otherwise unused `I`, `M` and `O` parameters to the type.
    pub _phantom: std::marker::PhantomData<(I, M, O)>,
}
1058
1059#[async_trait]
1060impl<First, S, I, M, O> StepChain<I, O> for ThenChain<First, S, I, M, O>
1061where
1062 I: Send + Sync + Clone + 'static,
1063 M: Send + Sync + Clone + 'static,
1064 O: Send + Sync + 'static,
1065 First: StepChain<I, M> + Send + Sync,
1066 S: BoxedStep<M, O> + Send + Sync,
1067{
1068 async fn run(
1069 &self,
1070 input: I,
1071 run_id: RunId,
1072 recorder: &dyn Recorder,
1073 retry_policy: &RetryPolicy,
1074 start_index: u32,
1075 ) -> Result<O, PipelineError> {
1076 let mid = self
1078 .first
1079 .run(input, run_id, recorder, retry_policy, start_index)
1080 .await?;
1081
1082 let next_index = start_index + self.first.step_count();
1083
1084 let step_name = self.step.name();
1085 let step_id = recorder.start_step(run_id, step_name, next_index).await?;
1086
1087 let mut attempt = 0u32;
1089 let output = loop {
1090 attempt += 1;
1091 match self.step.execute(mid.clone()).await {
1092 Ok(output) => break output,
1093 Err(StepError::Permanent(e)) => {
1094 recorder
1095 .complete_step(
1096 step_id,
1097 StepStatus::Failed {
1098 error: e.to_string(),
1099 attempt,
1100 },
1101 )
1102 .await?;
1103 return Err(PipelineError::StepFailed {
1104 step: step_name,
1105 source: e,
1106 });
1107 }
1108 Err(StepError::Retryable(e)) => {
1109 if let Some(delay) = retry_policy.delay_for_attempt(attempt) {
1110 tokio::time::sleep(delay).await;
1111 } else {
1112 recorder
1113 .complete_step(
1114 step_id,
1115 StepStatus::Failed {
1116 error: e.to_string(),
1117 attempt,
1118 },
1119 )
1120 .await?;
1121 return Err(PipelineError::RetriesExhausted {
1122 step: step_name,
1123 attempts: attempt,
1124 source: e,
1125 });
1126 }
1127 }
1128 }
1129 };
1130
1131 recorder
1132 .complete_step(step_id, StepStatus::Completed)
1133 .await?;
1134 Ok(output)
1135 }
1136
1137 fn step_count(&self) -> u32 {
1138 self.first.step_count() + 1
1139 }
1140
1141 fn collect_step_names(&self, names: &mut Vec<&'static str>) {
1142 self.first.collect_step_names(names);
1143 names.push(self.step.name());
1144 }
1145}
1146
/// A finished pipeline that can be run, asked which follow-up work its output
/// spawns, and exported as a [`PipelineGraph`].
pub struct BuiltPipeline<I, O, Chain>
where
    Chain: StepChain<I, O>,
{
    /// Pipeline name.
    name: &'static str,
    /// The steps to execute.
    chain: Chain,
    /// Policy applied to retryable step failures.
    retry_policy: RetryPolicy,
    /// Receives run and step notifications.
    recorder: Arc<dyn Recorder>,
    /// Rules evaluated against the output by `get_spawned`.
    pub(crate) spawn_rules: Vec<SpawnRule<O>>,
    /// Metadata entries looked up by step index in `to_graph`.
    step_metadata: Vec<Metadata>,
    /// Ties the `I` and `O` parameters to the type.
    _phantom: std::marker::PhantomData<(I, O)>,
}
1160
1161impl<I, O, Chain> BuiltPipeline<I, O, Chain>
1162where
1163 I: Send + Clone + HasEntityId + 'static,
1164 O: Send + Serialize + 'static,
1165 Chain: StepChain<I, O> + Send + Sync,
1166{
1167 pub async fn run(&self, input: I) -> Result<O, PipelineError> {
1169 let entity_id = input.entity_id();
1170 let run_id = self.recorder.start_run(self.name, &entity_id).await?;
1171
1172 match self
1173 .chain
1174 .run(input, run_id, self.recorder.as_ref(), &self.retry_policy, 0)
1175 .await
1176 {
1177 Ok(output) => {
1178 self.recorder
1179 .complete_run(run_id, RunStatus::Completed)
1180 .await?;
1181 Ok(output)
1182 }
1183 Err(e) => {
1184 self.recorder
1185 .complete_run(
1186 run_id,
1187 RunStatus::Failed {
1188 error: e.to_string(),
1189 },
1190 )
1191 .await?;
1192 Err(e)
1193 }
1194 }
1195 }
1196
1197 pub fn name(&self) -> &'static str {
1199 self.name
1200 }
1201
1202 pub fn get_spawned(&self, output: &O) -> Vec<(&'static str, serde_json::Value)> {
1204 let mut spawned = Vec::new();
1205
1206 for rule in &self.spawn_rules {
1207 match rule {
1208 SpawnRule::Fork {
1209 target, predicate, ..
1210 } => {
1211 if predicate(output) {
1212 if let Ok(value) = serde_json::to_value(output) {
1213 spawned.push((*target, value));
1214 }
1215 }
1216 }
1217 SpawnRule::FanOut { targets, .. } => {
1218 if let Ok(value) = serde_json::to_value(output) {
1219 for target in targets {
1220 spawned.push((*target, value.clone()));
1221 }
1222 }
1223 }
1224 SpawnRule::Dynamic { target, generator, .. } => {
1225 for input in generator(output) {
1226 spawned.push((*target, input));
1227 }
1228 }
1229 }
1230 }
1231
1232 spawned
1233 }
1234
1235 pub fn to_graph(&self) -> PipelineGraph {
1237 let mut step_names = Vec::new();
1238 self.chain.collect_step_names(&mut step_names);
1239
1240 let steps: Vec<StepNode> = step_names
1241 .into_iter()
1242 .enumerate()
1243 .map(|(index, name)| StepNode {
1244 name: name.to_string(),
1245 index,
1246 metadata: self
1247 .step_metadata
1248 .get(index)
1249 .cloned()
1250 .unwrap_or_default(),
1251 })
1252 .collect();
1253
1254 let mut forks = Vec::new();
1255 let mut fan_outs = Vec::new();
1256 let mut emits = Vec::new();
1257
1258 for rule in &self.spawn_rules {
1259 match rule {
1260 SpawnRule::Fork {
1261 target,
1262 metadata,
1263 ..
1264 } => {
1265 forks.push(ForkNode {
1266 target_pipeline: target.to_string(),
1267 condition: metadata
1268 .description
1269 .clone()
1270 .unwrap_or_else(|| format!("fork to {}", target)),
1271 metadata: metadata.clone(),
1272 });
1273 }
1274 SpawnRule::FanOut { targets, metadata } => {
1275 fan_outs.push(FanOutNode {
1276 targets: targets.iter().map(|s| s.to_string()).collect(),
1277 metadata: metadata.clone(),
1278 });
1279 }
1280 SpawnRule::Dynamic { target, metadata, .. } => {
1281 emits.push(EmitNode {
1282 target_pipeline: target.to_string(),
1283 metadata: metadata.clone(),
1284 });
1285 }
1286 }
1287 }
1288
1289 PipelineGraph {
1290 name: self.name.to_string(),
1291 steps,
1292 forks,
1293 fan_outs,
1294 emits,
1295 }
1296 }
1297}