deltaflow/
pipeline.rs

1//! Pipeline builder and executor.
2
3use async_trait::async_trait;
4use serde::Serialize;
5use std::sync::Arc;
6use thiserror::Error;
7
8use crate::recorder::{NoopRecorder, Recorder, RunId, RunStatus, StepStatus};
9use crate::retry::RetryPolicy;
10use crate::step::{Step, StepError};
11
/// Shared, thread-safe predicate that decides whether a fork fires for an output.
type ForkPredicate<O> = Arc<dyn Fn(&O) -> bool + Sync + Send>;
14
15/// Type alias for dynamic spawn generator functions.
16type SpawnGenerator<O> = Arc<dyn Fn(&O) -> Vec<serde_json::Value> + Send + Sync>;
17
18use std::collections::HashMap;
19
20/// Metadata for steps and spawn operations.
21///
22/// Used to attach descriptions and tags for visualization and analytics.
23#[derive(Default, Clone, Debug, Serialize)]
24pub struct Metadata {
25    /// Human-readable description for visualization.
26    pub description: Option<String>,
27    /// Arbitrary key-value tags for filtering and analytics.
28    pub tags: HashMap<String, String>,
29}
30
31impl Metadata {
32    /// Create empty metadata.
33    pub fn new() -> Self {
34        Self::default()
35    }
36
37    /// Set the description.
38    pub fn with_description(mut self, description: &str) -> Self {
39        self.description = Some(description.to_string());
40        self
41    }
42
43    /// Add a tag.
44    pub fn with_tag(mut self, key: &str, value: &str) -> Self {
45        self.tags.insert(key.to_string(), value.to_string());
46        self
47    }
48}
49
50/// A rule for spawning work after pipeline completion.
51#[derive(Clone)]
52pub enum SpawnRule<O> {
53    /// Conditional fork: spawn to target if predicate returns true.
54    Fork {
55        target: &'static str,
56        predicate: ForkPredicate<O>,
57        metadata: Metadata,
58    },
59    /// Static fan-out: always spawn to these targets.
60    FanOut {
61        targets: Vec<&'static str>,
62        metadata: Metadata,
63    },
64    /// Dynamic spawn: generate tasks from output.
65    Dynamic {
66        target: &'static str,
67        generator: SpawnGenerator<O>,
68        metadata: Metadata,
69    },
70}
71
72/// Serializable representation of a pipeline's structure for visualization.
73#[derive(Debug, Clone, Serialize)]
74pub struct PipelineGraph {
75    pub name: String,
76    pub steps: Vec<StepNode>,
77    pub forks: Vec<ForkNode>,
78    pub fan_outs: Vec<FanOutNode>,
79    pub emits: Vec<EmitNode>,
80}
81
82/// A step in the pipeline graph.
83#[derive(Debug, Clone, Serialize)]
84pub struct StepNode {
85    pub name: String,
86    pub index: usize,
87    #[serde(flatten)]
88    pub metadata: Metadata,
89}
90
91/// A conditional fork declaration.
92#[derive(Debug, Clone, Serialize)]
93pub struct ForkNode {
94    pub target_pipeline: String,
95    pub condition: String,
96    #[serde(flatten)]
97    pub metadata: Metadata,
98}
99
100/// A static fan-out declaration.
101#[derive(Debug, Clone, Serialize)]
102pub struct FanOutNode {
103    pub targets: Vec<String>,
104    #[serde(flatten)]
105    pub metadata: Metadata,
106}
107
108/// A dynamic spawn (emit) declaration.
109#[derive(Debug, Clone, Serialize)]
110pub struct EmitNode {
111    pub target_pipeline: String,
112    #[serde(flatten)]
113    pub metadata: Metadata,
114}
115
116/// Error returned by pipeline execution.
117#[derive(Error, Debug)]
118pub enum PipelineError {
119    /// A step failed permanently.
120    #[error("step '{step}' failed: {source}")]
121    StepFailed {
122        step: &'static str,
123        #[source]
124        source: anyhow::Error,
125    },
126
127    /// A step exhausted all retries.
128    #[error("step '{step}' exhausted {attempts} retries: {source}")]
129    RetriesExhausted {
130        step: &'static str,
131        attempts: u32,
132        #[source]
133        source: anyhow::Error,
134    },
135
136    /// Recording failed.
137    #[error("recorder error: {0}")]
138    RecorderError(#[from] anyhow::Error),
139}
140
/// Trait for types that can provide an entity ID for recording.
pub trait HasEntityId {
    /// Returns the entity identifier for this input as an owned `String`.
    fn entity_id(&self) -> String;
}

// Concrete (not blanket) impl for owned strings: the string itself is the ID.
impl HasEntityId for String {
    fn entity_id(&self) -> String {
        self.clone()
    }
}

// Concrete (not blanket) impl for string slices: the slice is copied into an
// owned ID.
impl HasEntityId for &str {
    fn entity_id(&self) -> String {
        self.to_string()
    }
}
160
161/// Internal trait for boxed step execution.
162#[doc(hidden)]
163#[async_trait]
164pub trait BoxedStep<I, O>: Send + Sync {
165    fn name(&self) -> &'static str;
166    async fn execute(&self, input: I) -> Result<O, StepError>;
167}
168
/// Newtype adapting any `Step` into a `BoxedStep`.
#[doc(hidden)]
pub struct StepWrapper<S>(pub S);
172
173#[async_trait]
174impl<S> BoxedStep<S::Input, S::Output> for StepWrapper<S>
175where
176    S: Step,
177{
178    fn name(&self) -> &'static str {
179        self.0.name()
180    }
181
182    async fn execute(&self, input: S::Input) -> Result<S::Output, StepError> {
183        self.0.execute(input).await
184    }
185}
186
187/// A chain of steps that transforms I -> O.
188#[doc(hidden)]
189#[async_trait]
190pub trait StepChain<I, O>: Send + Sync {
191    async fn run(
192        &self,
193        input: I,
194        run_id: RunId,
195        recorder: &dyn Recorder,
196        retry_policy: &RetryPolicy,
197        start_index: u32,
198    ) -> Result<O, PipelineError>;
199
200    /// Returns the number of steps in this chain.
201    fn step_count(&self) -> u32;
202
203    /// Collect step names in order.
204    fn collect_step_names(&self, names: &mut Vec<&'static str>);
205}
206
/// Empty chain that hands its input back unchanged; used as the terminal link.
#[doc(hidden)]
pub struct Identity;
210
211#[async_trait]
212impl<T: Send + 'static> StepChain<T, T> for Identity {
213    async fn run(
214        &self,
215        input: T,
216        _run_id: RunId,
217        _recorder: &dyn Recorder,
218        _retry_policy: &RetryPolicy,
219        _start_index: u32,
220    ) -> Result<T, PipelineError> {
221        Ok(input)
222    }
223
224    fn step_count(&self) -> u32 {
225        0
226    }
227
228    fn collect_step_names(&self, _names: &mut Vec<&'static str>) {}
229}
230
231/// Chain that runs a step then continues with the rest.
232#[doc(hidden)]
233pub struct ChainedStep<S, Next, I, M, O>
234where
235    S: BoxedStep<I, M>,
236    Next: StepChain<M, O>,
237{
238    pub step: S,
239    pub next: Next,
240    pub _phantom: std::marker::PhantomData<(I, M, O)>,
241}
242
243#[async_trait]
244impl<S, Next, I, M, O> StepChain<I, O> for ChainedStep<S, Next, I, M, O>
245where
246    I: Send + Sync + Clone + 'static,
247    M: Send + Sync + 'static,
248    O: Send + Sync + 'static,
249    S: BoxedStep<I, M> + Send + Sync,
250    Next: StepChain<M, O> + Send + Sync,
251{
252    async fn run(
253        &self,
254        input: I,
255        run_id: RunId,
256        recorder: &dyn Recorder,
257        retry_policy: &RetryPolicy,
258        start_index: u32,
259    ) -> Result<O, PipelineError> {
260        let step_name = self.step.name();
261        let step_id = recorder.start_step(run_id, step_name, start_index).await?;
262
263        // Execute with retry
264        let mut attempt = 0u32;
265        let output = loop {
266            attempt += 1;
267            match self.step.execute(input.clone()).await {
268                Ok(output) => break output,
269                Err(StepError::Permanent(e)) => {
270                    recorder
271                        .complete_step(
272                            step_id,
273                            StepStatus::Failed {
274                                error: e.to_string(),
275                                attempt,
276                            },
277                        )
278                        .await?;
279                    return Err(PipelineError::StepFailed {
280                        step: step_name,
281                        source: e,
282                    });
283                }
284                Err(StepError::Retryable(e)) => {
285                    if let Some(delay) = retry_policy.delay_for_attempt(attempt) {
286                        tokio::time::sleep(delay).await;
287                    } else {
288                        recorder
289                            .complete_step(
290                                step_id,
291                                StepStatus::Failed {
292                                    error: e.to_string(),
293                                    attempt,
294                                },
295                            )
296                            .await?;
297                        return Err(PipelineError::RetriesExhausted {
298                            step: step_name,
299                            attempts: attempt,
300                            source: e,
301                        });
302                    }
303                }
304            }
305        };
306
307        recorder
308            .complete_step(step_id, StepStatus::Completed)
309            .await?;
310
311        // Continue with next steps
312        self.next
313            .run(output, run_id, recorder, retry_policy, start_index + 1)
314            .await
315    }
316
317    fn step_count(&self) -> u32 {
318        1 + self.next.step_count()
319    }
320
321    fn collect_step_names(&self, names: &mut Vec<&'static str>) {
322        names.push(self.step.name());
323        self.next.collect_step_names(names);
324    }
325}
326
327/// Builder for constructing pipelines.
328pub struct Pipeline<I, O, Chain>
329where
330    Chain: StepChain<I, O>,
331{
332    name: &'static str,
333    chain: Chain,
334    retry_policy: RetryPolicy,
335    recorder: Arc<dyn Recorder>,
336    spawn_rules: Vec<SpawnRule<O>>,
337    step_metadata: Vec<Metadata>,
338    _phantom: std::marker::PhantomData<(I, O)>,
339}
340
341impl Pipeline<(), (), Identity> {
342    /// Create a new pipeline builder with the given name.
343    pub fn new(name: &'static str) -> Self {
344        Self {
345            name,
346            chain: Identity,
347            retry_policy: RetryPolicy::default(),
348            recorder: Arc::new(NoopRecorder),
349            spawn_rules: Vec::new(),
350            step_metadata: Vec::new(),
351            _phantom: std::marker::PhantomData,
352        }
353    }
354}
355
356impl<O, Chain> Pipeline<(), O, Chain>
357where
358    Chain: StepChain<(), O> + Send + Sync + 'static,
359    O: Send + 'static,
360{
361    /// Add the first step to the pipeline.
362    #[allow(clippy::type_complexity)]
363    pub fn start_with<S>(
364        self,
365        step: S,
366    ) -> StepBuilder<
367        S::Input,
368        S::Output,
369        ChainedStep<StepWrapper<S>, Identity, S::Input, S::Output, S::Output>,
370    >
371    where
372        S: Step + 'static,
373    {
374        let pipeline = Pipeline {
375            name: self.name,
376            chain: ChainedStep {
377                step: StepWrapper(step),
378                next: Identity,
379                _phantom: std::marker::PhantomData,
380            },
381            retry_policy: self.retry_policy,
382            recorder: self.recorder,
383            spawn_rules: Vec::new(),
384            step_metadata: vec![Metadata::default()],
385            _phantom: std::marker::PhantomData,
386        };
387        StepBuilder {
388            pipeline,
389            step_index: 0,
390        }
391    }
392}
393
394impl<I, O, Chain> Pipeline<I, O, Chain>
395where
396    I: Send + Sync + Clone + 'static,
397    O: Send + Sync + Clone + 'static,
398    Chain: StepChain<I, O> + Send + Sync + 'static,
399{
400    /// Add a step to the pipeline.
401    pub fn then<S>(self, step: S) -> Pipeline<I, S::Output, impl StepChain<I, S::Output>>
402    where
403        S: Step<Input = O> + 'static,
404    {
405        Pipeline {
406            name: self.name,
407            chain: ThenChain {
408                first: self.chain,
409                step: StepWrapper(step),
410                _phantom: std::marker::PhantomData,
411            },
412            retry_policy: self.retry_policy,
413            recorder: self.recorder,
414            spawn_rules: Vec::new(),
415            step_metadata: self.step_metadata,
416            _phantom: std::marker::PhantomData,
417        }
418    }
419
420    /// Set the retry policy for this pipeline.
421    pub fn with_retry(mut self, policy: RetryPolicy) -> Self {
422        self.retry_policy = policy;
423        self
424    }
425
426    /// Set the recorder for this pipeline.
427    pub fn with_recorder<R: Recorder + 'static>(mut self, recorder: R) -> Self {
428        self.recorder = Arc::new(recorder);
429        self
430    }
431
432    /// Conditionally fork to a target pipeline when predicate returns true.
433    ///
434    /// The output is serialized and sent to the target pipeline.
435    /// Multiple forks can match - they are not mutually exclusive.
436    pub fn fork_when<F>(mut self, predicate: F, target: &'static str) -> Self
437    where
438        F: Fn(&O) -> bool + Send + Sync + 'static,
439    {
440        self.spawn_rules.push(SpawnRule::Fork {
441            target,
442            predicate: Arc::new(predicate),
443            metadata: Metadata::default(),
444        });
445        self
446    }
447
448    /// Fan out to multiple target pipelines unconditionally.
449    ///
450    /// The output is serialized and sent to ALL specified targets.
451    pub fn fan_out(mut self, targets: &[&'static str]) -> Self {
452        self.spawn_rules.push(SpawnRule::FanOut {
453            targets: targets.to_vec(),
454            metadata: Metadata::default(),
455        });
456        self
457    }
458
459    /// Build the pipeline, ready for execution.
460    pub fn build(self) -> BuiltPipeline<I, O, Chain> {
461        BuiltPipeline {
462            name: self.name,
463            chain: self.chain,
464            retry_policy: self.retry_policy,
465            recorder: self.recorder,
466            spawn_rules: self.spawn_rules,
467            step_metadata: self.step_metadata,
468            _phantom: std::marker::PhantomData,
469        }
470    }
471}
472
473/// Builder returned after adding a step, allowing metadata configuration.
474pub struct StepBuilder<I, O, Chain>
475where
476    Chain: StepChain<I, O>,
477{
478    pipeline: Pipeline<I, O, Chain>,
479    step_index: usize,
480}
481
482impl<I, O, Chain> StepBuilder<I, O, Chain>
483where
484    I: Send + Sync + Clone + 'static,
485    O: Send + Sync + Clone + 'static,
486    Chain: StepChain<I, O> + Send + Sync + 'static,
487{
488    /// Add a description to the last added step.
489    pub fn desc(mut self, description: &str) -> Self {
490        if let Some(meta) = self.pipeline.step_metadata.get_mut(self.step_index) {
491            meta.description = Some(description.to_string());
492        }
493        self
494    }
495
496    /// Add a tag to the last added step.
497    pub fn tag(mut self, key: &str, value: &str) -> Self {
498        if let Some(meta) = self.pipeline.step_metadata.get_mut(self.step_index) {
499            meta.tags.insert(key.to_string(), value.to_string());
500        }
501        self
502    }
503
504    /// Add another step to the pipeline.
505    pub fn then<S>(self, step: S) -> StepBuilder<I, S::Output, impl StepChain<I, S::Output>>
506    where
507        S: Step<Input = O> + 'static,
508    {
509        let mut pipeline = Pipeline {
510            name: self.pipeline.name,
511            chain: ThenChain {
512                first: self.pipeline.chain,
513                step: StepWrapper(step),
514                _phantom: std::marker::PhantomData,
515            },
516            retry_policy: self.pipeline.retry_policy,
517            recorder: self.pipeline.recorder,
518            spawn_rules: Vec::new(),
519            step_metadata: self.pipeline.step_metadata,
520            _phantom: std::marker::PhantomData,
521        };
522        pipeline.step_metadata.push(Metadata::default());
523        let step_index = pipeline.step_metadata.len() - 1;
524        StepBuilder { pipeline, step_index }
525    }
526
527    /// Set the retry policy.
528    pub fn with_retry(mut self, policy: RetryPolicy) -> Self {
529        self.pipeline.retry_policy = policy;
530        self
531    }
532
533    /// Set the recorder.
534    pub fn with_recorder<R: Recorder + 'static>(mut self, recorder: R) -> Self {
535        self.pipeline.recorder = Arc::new(recorder);
536        self
537    }
538
539    /// Conditionally fork to a target pipeline.
540    pub fn fork_when<F>(mut self, predicate: F, target: &'static str) -> ForkBuilder<I, O, Chain>
541    where
542        F: Fn(&O) -> bool + Send + Sync + 'static,
543    {
544        self.pipeline.spawn_rules.push(SpawnRule::Fork {
545            target,
546            predicate: Arc::new(predicate),
547            metadata: Metadata::default(),
548        });
549        let rule_index = self.pipeline.spawn_rules.len() - 1;
550        ForkBuilder {
551            pipeline: self.pipeline,
552            rule_index,
553        }
554    }
555
556    /// Fan out to multiple targets.
557    pub fn fan_out(mut self, targets: &[&'static str]) -> FanOutBuilder<I, O, Chain> {
558        self.pipeline.spawn_rules.push(SpawnRule::FanOut {
559            targets: targets.to_vec(),
560            metadata: Metadata::default(),
561        });
562        let rule_index = self.pipeline.spawn_rules.len() - 1;
563        FanOutBuilder {
564            pipeline: self.pipeline,
565            rule_index,
566        }
567    }
568
569    /// Alias for spawn_from (emit).
570    pub fn emit<T, F>(mut self, target: &'static str, generator: F) -> EmitBuilder<I, O, Chain>
571    where
572        T: Serialize + 'static,
573        F: Fn(&O) -> Vec<T> + Send + Sync + 'static,
574    {
575        self.pipeline.spawn_rules.push(SpawnRule::Dynamic {
576            target,
577            generator: Arc::new(move |output| {
578                generator(output)
579                    .into_iter()
580                    .filter_map(|item| serde_json::to_value(item).ok())
581                    .collect()
582            }),
583            metadata: Metadata::default(),
584        });
585        let rule_index = self.pipeline.spawn_rules.len() - 1;
586        EmitBuilder {
587            pipeline: self.pipeline,
588            rule_index,
589        }
590    }
591
592    /// Build the pipeline.
593    pub fn build(self) -> BuiltPipeline<I, O, Chain> {
594        BuiltPipeline {
595            name: self.pipeline.name,
596            chain: self.pipeline.chain,
597            retry_policy: self.pipeline.retry_policy,
598            recorder: self.pipeline.recorder,
599            spawn_rules: self.pipeline.spawn_rules,
600            step_metadata: self.pipeline.step_metadata,
601            _phantom: std::marker::PhantomData,
602        }
603    }
604}
605
606/// Builder returned after fork_when, allowing metadata configuration.
607pub struct ForkBuilder<I, O, Chain>
608where
609    Chain: StepChain<I, O>,
610{
611    pipeline: Pipeline<I, O, Chain>,
612    rule_index: usize,
613}
614
615impl<I, O, Chain> ForkBuilder<I, O, Chain>
616where
617    I: Send + Sync + Clone + 'static,
618    O: Send + Sync + Clone + 'static,
619    Chain: StepChain<I, O> + Send + Sync + 'static,
620{
621    /// Add a description to this fork.
622    pub fn desc(mut self, description: &str) -> Self {
623        if let Some(SpawnRule::Fork { metadata, .. }) =
624            self.pipeline.spawn_rules.get_mut(self.rule_index)
625        {
626            metadata.description = Some(description.to_string());
627        }
628        self
629    }
630
631    /// Add a tag to this fork.
632    pub fn tag(mut self, key: &str, value: &str) -> Self {
633        if let Some(SpawnRule::Fork { metadata, .. }) =
634            self.pipeline.spawn_rules.get_mut(self.rule_index)
635        {
636            metadata.tags.insert(key.to_string(), value.to_string());
637        }
638        self
639    }
640
641    /// Declare follow-up tasks to spawn on successful completion (deprecated, use emit).
642    #[deprecated(since = "0.5.0", note = "Use emit() instead")]
643    pub fn spawn_from<T, F>(self, target: &'static str, generator: F) -> EmitBuilder<I, O, Chain>
644    where
645        T: Serialize + 'static,
646        F: Fn(&O) -> Vec<T> + Send + Sync + 'static,
647    {
648        self.emit(target, generator)
649    }
650
651    /// Add another step to the pipeline.
652    pub fn then<S>(self, step: S) -> StepBuilder<I, S::Output, impl StepChain<I, S::Output>>
653    where
654        S: Step<Input = O> + 'static,
655    {
656        let mut pipeline = Pipeline {
657            name: self.pipeline.name,
658            chain: ThenChain {
659                first: self.pipeline.chain,
660                step: StepWrapper(step),
661                _phantom: std::marker::PhantomData,
662            },
663            retry_policy: self.pipeline.retry_policy,
664            recorder: self.pipeline.recorder,
665            spawn_rules: Vec::new(),
666            step_metadata: self.pipeline.step_metadata,
667            _phantom: std::marker::PhantomData,
668        };
669        pipeline.step_metadata.push(Metadata::default());
670        let step_index = pipeline.step_metadata.len() - 1;
671        StepBuilder { pipeline, step_index }
672    }
673
674    /// Conditionally fork to a target pipeline.
675    pub fn fork_when<F>(mut self, predicate: F, target: &'static str) -> ForkBuilder<I, O, Chain>
676    where
677        F: Fn(&O) -> bool + Send + Sync + 'static,
678    {
679        self.pipeline.spawn_rules.push(SpawnRule::Fork {
680            target,
681            predicate: Arc::new(predicate),
682            metadata: Metadata::default(),
683        });
684        let rule_index = self.pipeline.spawn_rules.len() - 1;
685        ForkBuilder {
686            pipeline: self.pipeline,
687            rule_index,
688        }
689    }
690
691    /// Fan out to multiple targets.
692    pub fn fan_out(mut self, targets: &[&'static str]) -> FanOutBuilder<I, O, Chain> {
693        self.pipeline.spawn_rules.push(SpawnRule::FanOut {
694            targets: targets.to_vec(),
695            metadata: Metadata::default(),
696        });
697        let rule_index = self.pipeline.spawn_rules.len() - 1;
698        FanOutBuilder {
699            pipeline: self.pipeline,
700            rule_index,
701        }
702    }
703
704    /// Emit tasks dynamically.
705    pub fn emit<T, F>(mut self, target: &'static str, generator: F) -> EmitBuilder<I, O, Chain>
706    where
707        T: Serialize + 'static,
708        F: Fn(&O) -> Vec<T> + Send + Sync + 'static,
709    {
710        self.pipeline.spawn_rules.push(SpawnRule::Dynamic {
711            target,
712            generator: Arc::new(move |output| {
713                generator(output)
714                    .into_iter()
715                    .filter_map(|item| serde_json::to_value(item).ok())
716                    .collect()
717            }),
718            metadata: Metadata::default(),
719        });
720        let rule_index = self.pipeline.spawn_rules.len() - 1;
721        EmitBuilder {
722            pipeline: self.pipeline,
723            rule_index,
724        }
725    }
726
727    /// Set retry policy.
728    pub fn with_retry(mut self, policy: RetryPolicy) -> Self {
729        self.pipeline.retry_policy = policy;
730        self
731    }
732
733    /// Set recorder.
734    pub fn with_recorder<R: Recorder + 'static>(mut self, recorder: R) -> Self {
735        self.pipeline.recorder = Arc::new(recorder);
736        self
737    }
738
739    /// Build the pipeline.
740    pub fn build(self) -> BuiltPipeline<I, O, Chain> {
741        BuiltPipeline {
742            name: self.pipeline.name,
743            chain: self.pipeline.chain,
744            retry_policy: self.pipeline.retry_policy,
745            recorder: self.pipeline.recorder,
746            spawn_rules: self.pipeline.spawn_rules,
747            step_metadata: self.pipeline.step_metadata,
748            _phantom: std::marker::PhantomData,
749        }
750    }
751}
752
753/// Builder returned after fan_out, allowing metadata configuration.
754pub struct FanOutBuilder<I, O, Chain>
755where
756    Chain: StepChain<I, O>,
757{
758    pipeline: Pipeline<I, O, Chain>,
759    rule_index: usize,
760}
761
762impl<I, O, Chain> FanOutBuilder<I, O, Chain>
763where
764    I: Send + Sync + Clone + 'static,
765    O: Send + Sync + Clone + 'static,
766    Chain: StepChain<I, O> + Send + Sync + 'static,
767{
768    /// Add a description to this fan-out.
769    pub fn desc(mut self, description: &str) -> Self {
770        if let Some(SpawnRule::FanOut { metadata, .. }) =
771            self.pipeline.spawn_rules.get_mut(self.rule_index)
772        {
773            metadata.description = Some(description.to_string());
774        }
775        self
776    }
777
778    /// Add a tag to this fan-out.
779    pub fn tag(mut self, key: &str, value: &str) -> Self {
780        if let Some(SpawnRule::FanOut { metadata, .. }) =
781            self.pipeline.spawn_rules.get_mut(self.rule_index)
782        {
783            metadata.tags.insert(key.to_string(), value.to_string());
784        }
785        self
786    }
787
788    /// Declare follow-up tasks to spawn on successful completion (deprecated, use emit).
789    #[deprecated(since = "0.5.0", note = "Use emit() instead")]
790    pub fn spawn_from<T, F>(self, target: &'static str, generator: F) -> EmitBuilder<I, O, Chain>
791    where
792        T: Serialize + 'static,
793        F: Fn(&O) -> Vec<T> + Send + Sync + 'static,
794    {
795        self.emit(target, generator)
796    }
797
798    /// Add another step to the pipeline.
799    pub fn then<S>(self, step: S) -> StepBuilder<I, S::Output, impl StepChain<I, S::Output>>
800    where
801        S: Step<Input = O> + 'static,
802    {
803        let mut pipeline = Pipeline {
804            name: self.pipeline.name,
805            chain: ThenChain {
806                first: self.pipeline.chain,
807                step: StepWrapper(step),
808                _phantom: std::marker::PhantomData,
809            },
810            retry_policy: self.pipeline.retry_policy,
811            recorder: self.pipeline.recorder,
812            spawn_rules: Vec::new(),
813            step_metadata: self.pipeline.step_metadata,
814            _phantom: std::marker::PhantomData,
815        };
816        pipeline.step_metadata.push(Metadata::default());
817        let step_index = pipeline.step_metadata.len() - 1;
818        StepBuilder { pipeline, step_index }
819    }
820
821    /// Conditionally fork to a target pipeline.
822    pub fn fork_when<F>(mut self, predicate: F, target: &'static str) -> ForkBuilder<I, O, Chain>
823    where
824        F: Fn(&O) -> bool + Send + Sync + 'static,
825    {
826        self.pipeline.spawn_rules.push(SpawnRule::Fork {
827            target,
828            predicate: Arc::new(predicate),
829            metadata: Metadata::default(),
830        });
831        let rule_index = self.pipeline.spawn_rules.len() - 1;
832        ForkBuilder {
833            pipeline: self.pipeline,
834            rule_index,
835        }
836    }
837
838    /// Fan out to multiple targets.
839    pub fn fan_out(mut self, targets: &[&'static str]) -> FanOutBuilder<I, O, Chain> {
840        self.pipeline.spawn_rules.push(SpawnRule::FanOut {
841            targets: targets.to_vec(),
842            metadata: Metadata::default(),
843        });
844        let rule_index = self.pipeline.spawn_rules.len() - 1;
845        FanOutBuilder {
846            pipeline: self.pipeline,
847            rule_index,
848        }
849    }
850
851    /// Emit tasks dynamically.
852    pub fn emit<T, F>(mut self, target: &'static str, generator: F) -> EmitBuilder<I, O, Chain>
853    where
854        T: Serialize + 'static,
855        F: Fn(&O) -> Vec<T> + Send + Sync + 'static,
856    {
857        self.pipeline.spawn_rules.push(SpawnRule::Dynamic {
858            target,
859            generator: Arc::new(move |output| {
860                generator(output)
861                    .into_iter()
862                    .filter_map(|item| serde_json::to_value(item).ok())
863                    .collect()
864            }),
865            metadata: Metadata::default(),
866        });
867        let rule_index = self.pipeline.spawn_rules.len() - 1;
868        EmitBuilder {
869            pipeline: self.pipeline,
870            rule_index,
871        }
872    }
873
874    /// Set retry policy.
875    pub fn with_retry(mut self, policy: RetryPolicy) -> Self {
876        self.pipeline.retry_policy = policy;
877        self
878    }
879
880    /// Set recorder.
881    pub fn with_recorder<R: Recorder + 'static>(mut self, recorder: R) -> Self {
882        self.pipeline.recorder = Arc::new(recorder);
883        self
884    }
885
886    /// Build the pipeline.
887    pub fn build(self) -> BuiltPipeline<I, O, Chain> {
888        BuiltPipeline {
889            name: self.pipeline.name,
890            chain: self.pipeline.chain,
891            retry_policy: self.pipeline.retry_policy,
892            recorder: self.pipeline.recorder,
893            spawn_rules: self.pipeline.spawn_rules,
894            step_metadata: self.pipeline.step_metadata,
895            _phantom: std::marker::PhantomData,
896        }
897    }
898}
899
900/// Builder returned after emit, allowing metadata configuration.
901pub struct EmitBuilder<I, O, Chain>
902where
903    Chain: StepChain<I, O>,
904{
905    pipeline: Pipeline<I, O, Chain>,
906    rule_index: usize,
907}
908
909impl<I, O, Chain> EmitBuilder<I, O, Chain>
910where
911    I: Send + Sync + Clone + 'static,
912    O: Send + Sync + Clone + 'static,
913    Chain: StepChain<I, O> + Send + Sync + 'static,
914{
915    /// Add a description to this emit.
916    pub fn desc(mut self, description: &str) -> Self {
917        if let Some(SpawnRule::Dynamic { metadata, .. }) =
918            self.pipeline.spawn_rules.get_mut(self.rule_index)
919        {
920            metadata.description = Some(description.to_string());
921        }
922        self
923    }
924
925    /// Add a tag to this emit.
926    pub fn tag(mut self, key: &str, value: &str) -> Self {
927        if let Some(SpawnRule::Dynamic { metadata, .. }) =
928            self.pipeline.spawn_rules.get_mut(self.rule_index)
929        {
930            metadata.tags.insert(key.to_string(), value.to_string());
931        }
932        self
933    }
934
935    /// Declare follow-up tasks to spawn on successful completion (deprecated, use emit).
936    #[deprecated(since = "0.5.0", note = "Use emit() instead")]
937    pub fn spawn_from<T, F>(self, target: &'static str, generator: F) -> EmitBuilder<I, O, Chain>
938    where
939        T: Serialize + 'static,
940        F: Fn(&O) -> Vec<T> + Send + Sync + 'static,
941    {
942        self.emit(target, generator)
943    }
944
945    /// Add another step to the pipeline.
946    pub fn then<S>(self, step: S) -> StepBuilder<I, S::Output, impl StepChain<I, S::Output>>
947    where
948        S: Step<Input = O> + 'static,
949    {
950        let mut pipeline = Pipeline {
951            name: self.pipeline.name,
952            chain: ThenChain {
953                first: self.pipeline.chain,
954                step: StepWrapper(step),
955                _phantom: std::marker::PhantomData,
956            },
957            retry_policy: self.pipeline.retry_policy,
958            recorder: self.pipeline.recorder,
959            spawn_rules: Vec::new(),
960            step_metadata: self.pipeline.step_metadata,
961            _phantom: std::marker::PhantomData,
962        };
963        pipeline.step_metadata.push(Metadata::default());
964        let step_index = pipeline.step_metadata.len() - 1;
965        StepBuilder { pipeline, step_index }
966    }
967
968    /// Conditionally fork to a target pipeline.
969    pub fn fork_when<F>(mut self, predicate: F, target: &'static str) -> ForkBuilder<I, O, Chain>
970    where
971        F: Fn(&O) -> bool + Send + Sync + 'static,
972    {
973        self.pipeline.spawn_rules.push(SpawnRule::Fork {
974            target,
975            predicate: Arc::new(predicate),
976            metadata: Metadata::default(),
977        });
978        let rule_index = self.pipeline.spawn_rules.len() - 1;
979        ForkBuilder {
980            pipeline: self.pipeline,
981            rule_index,
982        }
983    }
984
985    /// Fan out to multiple targets.
986    pub fn fan_out(mut self, targets: &[&'static str]) -> FanOutBuilder<I, O, Chain> {
987        self.pipeline.spawn_rules.push(SpawnRule::FanOut {
988            targets: targets.to_vec(),
989            metadata: Metadata::default(),
990        });
991        let rule_index = self.pipeline.spawn_rules.len() - 1;
992        FanOutBuilder {
993            pipeline: self.pipeline,
994            rule_index,
995        }
996    }
997
998    /// Emit tasks dynamically.
999    pub fn emit<T, F>(mut self, target: &'static str, generator: F) -> EmitBuilder<I, O, Chain>
1000    where
1001        T: Serialize + 'static,
1002        F: Fn(&O) -> Vec<T> + Send + Sync + 'static,
1003    {
1004        self.pipeline.spawn_rules.push(SpawnRule::Dynamic {
1005            target,
1006            generator: Arc::new(move |output| {
1007                generator(output)
1008                    .into_iter()
1009                    .filter_map(|item| serde_json::to_value(item).ok())
1010                    .collect()
1011            }),
1012            metadata: Metadata::default(),
1013        });
1014        let rule_index = self.pipeline.spawn_rules.len() - 1;
1015        EmitBuilder {
1016            pipeline: self.pipeline,
1017            rule_index,
1018        }
1019    }
1020
1021    /// Set retry policy.
1022    pub fn with_retry(mut self, policy: RetryPolicy) -> Self {
1023        self.pipeline.retry_policy = policy;
1024        self
1025    }
1026
1027    /// Set recorder.
1028    pub fn with_recorder<R: Recorder + 'static>(mut self, recorder: R) -> Self {
1029        self.pipeline.recorder = Arc::new(recorder);
1030        self
1031    }
1032
1033    /// Build the pipeline.
1034    pub fn build(self) -> BuiltPipeline<I, O, Chain> {
1035        BuiltPipeline {
1036            name: self.pipeline.name,
1037            chain: self.pipeline.chain,
1038            retry_policy: self.pipeline.retry_policy,
1039            recorder: self.pipeline.recorder,
1040            spawn_rules: self.pipeline.spawn_rules,
1041            step_metadata: self.pipeline.step_metadata,
1042            _phantom: std::marker::PhantomData,
1043        }
1044    }
1045}
1046
1047/// Chain that runs first chain then a step.
1048#[doc(hidden)]
1049pub struct ThenChain<First, S, I, M, O>
1050where
1051    First: StepChain<I, M>,
1052    S: BoxedStep<M, O>,
1053{
1054    pub first: First,
1055    pub step: S,
1056    pub _phantom: std::marker::PhantomData<(I, M, O)>,
1057}
1058
1059#[async_trait]
1060impl<First, S, I, M, O> StepChain<I, O> for ThenChain<First, S, I, M, O>
1061where
1062    I: Send + Sync + Clone + 'static,
1063    M: Send + Sync + Clone + 'static,
1064    O: Send + Sync + 'static,
1065    First: StepChain<I, M> + Send + Sync,
1066    S: BoxedStep<M, O> + Send + Sync,
1067{
1068    async fn run(
1069        &self,
1070        input: I,
1071        run_id: RunId,
1072        recorder: &dyn Recorder,
1073        retry_policy: &RetryPolicy,
1074        start_index: u32,
1075    ) -> Result<O, PipelineError> {
1076        // Run first chain
1077        let mid = self
1078            .first
1079            .run(input, run_id, recorder, retry_policy, start_index)
1080            .await?;
1081
1082        let next_index = start_index + self.first.step_count();
1083
1084        let step_name = self.step.name();
1085        let step_id = recorder.start_step(run_id, step_name, next_index).await?;
1086
1087        // Execute with retry
1088        let mut attempt = 0u32;
1089        let output = loop {
1090            attempt += 1;
1091            match self.step.execute(mid.clone()).await {
1092                Ok(output) => break output,
1093                Err(StepError::Permanent(e)) => {
1094                    recorder
1095                        .complete_step(
1096                            step_id,
1097                            StepStatus::Failed {
1098                                error: e.to_string(),
1099                                attempt,
1100                            },
1101                        )
1102                        .await?;
1103                    return Err(PipelineError::StepFailed {
1104                        step: step_name,
1105                        source: e,
1106                    });
1107                }
1108                Err(StepError::Retryable(e)) => {
1109                    if let Some(delay) = retry_policy.delay_for_attempt(attempt) {
1110                        tokio::time::sleep(delay).await;
1111                    } else {
1112                        recorder
1113                            .complete_step(
1114                                step_id,
1115                                StepStatus::Failed {
1116                                    error: e.to_string(),
1117                                    attempt,
1118                                },
1119                            )
1120                            .await?;
1121                        return Err(PipelineError::RetriesExhausted {
1122                            step: step_name,
1123                            attempts: attempt,
1124                            source: e,
1125                        });
1126                    }
1127                }
1128            }
1129        };
1130
1131        recorder
1132            .complete_step(step_id, StepStatus::Completed)
1133            .await?;
1134        Ok(output)
1135    }
1136
1137    fn step_count(&self) -> u32 {
1138        self.first.step_count() + 1
1139    }
1140
1141    fn collect_step_names(&self, names: &mut Vec<&'static str>) {
1142        self.first.collect_step_names(names);
1143        names.push(self.step.name());
1144    }
1145}
1146
1147/// A built pipeline ready for execution.
1148pub struct BuiltPipeline<I, O, Chain>
1149where
1150    Chain: StepChain<I, O>,
1151{
1152    name: &'static str,
1153    chain: Chain,
1154    retry_policy: RetryPolicy,
1155    recorder: Arc<dyn Recorder>,
1156    pub(crate) spawn_rules: Vec<SpawnRule<O>>,
1157    step_metadata: Vec<Metadata>,
1158    _phantom: std::marker::PhantomData<(I, O)>,
1159}
1160
1161impl<I, O, Chain> BuiltPipeline<I, O, Chain>
1162where
1163    I: Send + Clone + HasEntityId + 'static,
1164    O: Send + Serialize + 'static,
1165    Chain: StepChain<I, O> + Send + Sync,
1166{
1167    /// Execute the pipeline with the given input.
1168    pub async fn run(&self, input: I) -> Result<O, PipelineError> {
1169        let entity_id = input.entity_id();
1170        let run_id = self.recorder.start_run(self.name, &entity_id).await?;
1171
1172        match self
1173            .chain
1174            .run(input, run_id, self.recorder.as_ref(), &self.retry_policy, 0)
1175            .await
1176        {
1177            Ok(output) => {
1178                self.recorder
1179                    .complete_run(run_id, RunStatus::Completed)
1180                    .await?;
1181                Ok(output)
1182            }
1183            Err(e) => {
1184                self.recorder
1185                    .complete_run(
1186                        run_id,
1187                        RunStatus::Failed {
1188                            error: e.to_string(),
1189                        },
1190                    )
1191                    .await?;
1192                Err(e)
1193            }
1194        }
1195    }
1196
1197    /// Get the pipeline name.
1198    pub fn name(&self) -> &'static str {
1199        self.name
1200    }
1201
1202    /// Get spawned tasks for the given output.
1203    pub fn get_spawned(&self, output: &O) -> Vec<(&'static str, serde_json::Value)> {
1204        let mut spawned = Vec::new();
1205
1206        for rule in &self.spawn_rules {
1207            match rule {
1208                SpawnRule::Fork {
1209                    target, predicate, ..
1210                } => {
1211                    if predicate(output) {
1212                        if let Ok(value) = serde_json::to_value(output) {
1213                            spawned.push((*target, value));
1214                        }
1215                    }
1216                }
1217                SpawnRule::FanOut { targets, .. } => {
1218                    if let Ok(value) = serde_json::to_value(output) {
1219                        for target in targets {
1220                            spawned.push((*target, value.clone()));
1221                        }
1222                    }
1223                }
1224                SpawnRule::Dynamic { target, generator, .. } => {
1225                    for input in generator(output) {
1226                        spawned.push((*target, input));
1227                    }
1228                }
1229            }
1230        }
1231
1232        spawned
1233    }
1234
1235    /// Export the pipeline structure as a graph for visualization.
1236    pub fn to_graph(&self) -> PipelineGraph {
1237        let mut step_names = Vec::new();
1238        self.chain.collect_step_names(&mut step_names);
1239
1240        let steps: Vec<StepNode> = step_names
1241            .into_iter()
1242            .enumerate()
1243            .map(|(index, name)| StepNode {
1244                name: name.to_string(),
1245                index,
1246                metadata: self
1247                    .step_metadata
1248                    .get(index)
1249                    .cloned()
1250                    .unwrap_or_default(),
1251            })
1252            .collect();
1253
1254        let mut forks = Vec::new();
1255        let mut fan_outs = Vec::new();
1256        let mut emits = Vec::new();
1257
1258        for rule in &self.spawn_rules {
1259            match rule {
1260                SpawnRule::Fork {
1261                    target,
1262                    metadata,
1263                    ..
1264                } => {
1265                    forks.push(ForkNode {
1266                        target_pipeline: target.to_string(),
1267                        condition: metadata
1268                            .description
1269                            .clone()
1270                            .unwrap_or_else(|| format!("fork to {}", target)),
1271                        metadata: metadata.clone(),
1272                    });
1273                }
1274                SpawnRule::FanOut { targets, metadata } => {
1275                    fan_outs.push(FanOutNode {
1276                        targets: targets.iter().map(|s| s.to_string()).collect(),
1277                        metadata: metadata.clone(),
1278                    });
1279                }
1280                SpawnRule::Dynamic { target, metadata, .. } => {
1281                    emits.push(EmitNode {
1282                        target_pipeline: target.to_string(),
1283                        metadata: metadata.clone(),
1284                    });
1285                }
1286            }
1287        }
1288
1289        PipelineGraph {
1290            name: self.name.to_string(),
1291            steps,
1292            forks,
1293            fan_outs,
1294            emits,
1295        }
1296    }
1297}