entelix_runnable/ext.rs
1//! Extension trait providing `.pipe()` and the standard composition
2//! adapters on every `Runnable<I, O>`.
3
4use std::sync::Arc;
5use std::time::Duration;
6
7use entelix_core::transports::RetryPolicy;
8use entelix_core::{ExecutionContext, Result};
9
10use crate::configured::Configured;
11use crate::fallback::Fallback;
12use crate::mapping::Mapping;
13use crate::retrying::Retrying;
14use crate::runnable::Runnable;
15use crate::sequence::RunnableSequence;
16use crate::stream::{BoxStream, StreamChunk, StreamMode};
17use crate::timed::Timed;
18
19/// Ergonomic composition surface, blanket-implemented for every
20/// `Runnable<I, O>`.
21///
22/// Every method returns a concrete `Runnable<I, O>` — composition
23/// stays zero-cost in the steady state. Boxing happens only at the
24/// explicit `erase()` boundary
25/// ([`AnyRunnable`](crate::any_runnable::AnyRunnable), F12).
26#[async_trait::async_trait]
27pub trait RunnableExt<I, O>: Runnable<I, O> + Sized + 'static
28where
29 I: Send + 'static,
30 O: Send + 'static,
31{
32 /// Chain this runnable into `next`. The output `O` of `self`
33 /// becomes the input of `next`, producing a `Runnable<I, P>`.
34 ///
35 /// ```ignore
36 /// let chain = prompt.pipe(model).pipe(parser);
37 /// ```
38 fn pipe<P, R>(self, next: R) -> RunnableSequence<I, O, P>
39 where
40 P: Send + 'static,
41 R: Runnable<O, P> + 'static,
42 {
43 RunnableSequence::new(Arc::new(self), Arc::new(next))
44 }
45
46 /// Wrap `self` with retry semantics. The returned runnable
47 /// re-invokes the inner on transient errors per the `policy`.
48 /// The input must be `Clone` because each retry receives a
49 /// fresh copy.
50 ///
51 /// ```ignore
52 /// let resilient = model.with_retry(RetryPolicy::standard());
53 /// ```
54 fn with_retry(self, policy: RetryPolicy) -> Retrying<Self, I, O>
55 where
56 I: Clone,
57 {
58 Retrying::new(self, policy)
59 }
60
61 /// Wrap `self` with an ordered fallback chain. On a transient
62 /// error from the primary, the adapter tries each fallback in
63 /// turn. Permanent errors surface immediately. The classifier
64 /// is the same trait used by [`Self::with_retry`] —
65 /// `entelix_core::transports::DefaultRetryClassifier` by default.
66 ///
67 /// ```ignore
68 /// let resilient = primary.with_fallbacks(vec![secondary, tertiary]);
69 /// ```
70 fn with_fallbacks<F>(self, fallbacks: Vec<F>) -> Fallback<Self, F, I, O>
71 where
72 F: Runnable<I, O> + 'static,
73 I: Clone,
74 {
75 Fallback::new(self, fallbacks)
76 }
77
78 /// Map the inner's output through a pure synchronous function.
79 /// Equivalent to piping into a `RunnableLambda` but skipping the
80 /// async wrapper.
81 ///
82 /// ```ignore
83 /// let lengths = strings.map(|s: String| s.len());
84 /// ```
85 fn map<F, P>(self, f: F) -> Mapping<Self, F, I, O, P>
86 where
87 F: Fn(O) -> P + Send + Sync + 'static,
88 P: Send + 'static,
89 {
90 Mapping::new(self, f)
91 }
92
93 /// Run `configurer` on a cloned [`ExecutionContext`] before
94 /// delegating to the inner. The caller's `ctx` is not mutated.
95 ///
96 /// ```ignore
97 /// let with_short_deadline = inner.with_config(|ctx| {
98 /// // ctx mutations apply only to the inner invocation
99 /// });
100 /// ```
101 fn with_config<F>(self, configurer: F) -> Configured<Self, F, I, O>
102 where
103 F: Fn(&mut ExecutionContext) + Send + Sync + 'static,
104 {
105 Configured::new(self, configurer)
106 }
107
108 /// Race the inner against a wall-clock timeout. On expiry the
109 /// adapter returns
110 /// [`Error::DeadlineExceeded`](entelix_core::Error::DeadlineExceeded);
111 /// caller cancellation still wins.
112 ///
113 /// ```ignore
114 /// let bounded = inner.with_timeout(Duration::from_secs(30));
115 /// ```
116 fn with_timeout(self, timeout: Duration) -> Timed<Self, I, O> {
117 Timed::new(self, timeout)
118 }
119
120 /// Convenience wrapper around [`Runnable::stream`] — same
121 /// arguments, no trait import needed at the call site.
122 async fn stream_with(
123 &self,
124 input: I,
125 mode: StreamMode,
126 ctx: &ExecutionContext,
127 ) -> Result<BoxStream<'_, Result<StreamChunk<O>>>> {
128 self.stream(input, mode, ctx).await
129 }
130}
131
132impl<T, I, O> RunnableExt<I, O> for T
133where
134 T: Runnable<I, O> + Sized + 'static,
135 I: Send + 'static,
136 O: Send + 'static,
137{
138}