Skip to main content

entelix_runnable/
ext.rs

1//! Extension trait providing `.pipe()` and the standard composition
2//! adapters on every `Runnable<I, O>`.
3
4use std::sync::Arc;
5use std::time::Duration;
6
7use entelix_core::transports::RetryPolicy;
8use entelix_core::{ExecutionContext, Result};
9
10use crate::configured::Configured;
11use crate::fallback::Fallback;
12use crate::mapping::Mapping;
13use crate::retrying::Retrying;
14use crate::runnable::Runnable;
15use crate::sequence::RunnableSequence;
16use crate::stream::{BoxStream, StreamChunk, StreamMode};
17use crate::timed::Timed;
18
19/// Ergonomic composition surface, blanket-implemented for every
20/// `Runnable<I, O>`.
21///
22/// Every method returns a concrete `Runnable<I, O>` — composition
23/// stays zero-cost in the steady state. Boxing happens only at the
24/// explicit `erase()` boundary
25/// ([`AnyRunnable`](crate::any_runnable::AnyRunnable), F12).
26#[async_trait::async_trait]
27pub trait RunnableExt<I, O>: Runnable<I, O> + Sized + 'static
28where
29    I: Send + 'static,
30    O: Send + 'static,
31{
32    /// Chain this runnable into `next`. The output `O` of `self`
33    /// becomes the input of `next`, producing a `Runnable<I, P>`.
34    ///
35    /// ```ignore
36    /// let chain = prompt.pipe(model).pipe(parser);
37    /// ```
38    fn pipe<P, R>(self, next: R) -> RunnableSequence<I, O, P>
39    where
40        P: Send + 'static,
41        R: Runnable<O, P> + 'static,
42    {
43        RunnableSequence::new(Arc::new(self), Arc::new(next))
44    }
45
46    /// Wrap `self` with retry semantics. The returned runnable
47    /// re-invokes the inner on transient errors per the `policy`.
48    /// The input must be `Clone` because each retry receives a
49    /// fresh copy.
50    ///
51    /// ```ignore
52    /// let resilient = model.with_retry(RetryPolicy::standard());
53    /// ```
54    fn with_retry(self, policy: RetryPolicy) -> Retrying<Self, I, O>
55    where
56        I: Clone,
57    {
58        Retrying::new(self, policy)
59    }
60
61    /// Wrap `self` with an ordered fallback chain. On a transient
62    /// error from the primary, the adapter tries each fallback in
63    /// turn. Permanent errors surface immediately. The classifier
64    /// is the same trait used by [`Self::with_retry`] —
65    /// `entelix_core::transports::DefaultRetryClassifier` by default.
66    ///
67    /// ```ignore
68    /// let resilient = primary.with_fallbacks(vec![secondary, tertiary]);
69    /// ```
70    fn with_fallbacks<F>(self, fallbacks: Vec<F>) -> Fallback<Self, F, I, O>
71    where
72        F: Runnable<I, O> + 'static,
73        I: Clone,
74    {
75        Fallback::new(self, fallbacks)
76    }
77
78    /// Map the inner's output through a pure synchronous function.
79    /// Equivalent to piping into a `RunnableLambda` but skipping the
80    /// async wrapper.
81    ///
82    /// ```ignore
83    /// let lengths = strings.map(|s: String| s.len());
84    /// ```
85    fn map<F, P>(self, f: F) -> Mapping<Self, F, I, O, P>
86    where
87        F: Fn(O) -> P + Send + Sync + 'static,
88        P: Send + 'static,
89    {
90        Mapping::new(self, f)
91    }
92
93    /// Run `configurer` on a cloned [`ExecutionContext`] before
94    /// delegating to the inner. The caller's `ctx` is not mutated.
95    ///
96    /// ```ignore
97    /// let with_short_deadline = inner.with_config(|ctx| {
98    ///     // ctx mutations apply only to the inner invocation
99    /// });
100    /// ```
101    fn with_config<F>(self, configurer: F) -> Configured<Self, F, I, O>
102    where
103        F: Fn(&mut ExecutionContext) + Send + Sync + 'static,
104    {
105        Configured::new(self, configurer)
106    }
107
108    /// Race the inner against a wall-clock timeout. On expiry the
109    /// adapter returns
110    /// [`Error::DeadlineExceeded`](entelix_core::Error::DeadlineExceeded);
111    /// caller cancellation still wins.
112    ///
113    /// ```ignore
114    /// let bounded = inner.with_timeout(Duration::from_secs(30));
115    /// ```
116    fn with_timeout(self, timeout: Duration) -> Timed<Self, I, O> {
117        Timed::new(self, timeout)
118    }
119
120    /// Convenience wrapper around [`Runnable::stream`] — same
121    /// arguments, no trait import needed at the call site.
122    async fn stream_with(
123        &self,
124        input: I,
125        mode: StreamMode,
126        ctx: &ExecutionContext,
127    ) -> Result<BoxStream<'_, Result<StreamChunk<O>>>> {
128        self.stream(input, mode, ctx).await
129    }
130}
131
132impl<T, I, O> RunnableExt<I, O> for T
133where
134    T: Runnable<I, O> + Sized + 'static,
135    I: Send + 'static,
136    O: Send + 'static,
137{
138}