Skip to main content

ai_lib_rust/pipeline/
mod.rs

1//! Pipeline interpreter layer - the core operator execution engine
2//!
3//! This module implements the operator pipeline that processes streaming responses
4//! according to protocol configuration. The pipeline consists of:
5//! - Decoder: Parses raw bytes into frames
6//! - Transforms: A sequence of optional operators (Selector, Accumulator, FanOut, etc.)
7//! - EventMapper: Converts frames to unified events
8
9pub mod accumulate;
10pub mod decode;
11pub mod event_map;
12pub mod fan_out;
13pub mod select;
14
15// Resilience Operators
16pub mod fallback;
17pub mod retry;
18
19#[cfg(test)]
20mod tests;
21
22use crate::protocol::ProtocolManifest;
23use crate::types::events::StreamingEvent;
24use crate::{BoxStream, PipeResult};
25
26/// Core transformer interface: all logic operators follow this unified abstraction
27#[async_trait::async_trait]
28pub trait Transform: Send + Sync {
29    /// A transform takes a stream of JSON values and returns a new stream of JSON values
30    async fn transform(
31        &self,
32        input: BoxStream<'static, serde_json::Value>,
33    ) -> PipeResult<BoxStream<'static, serde_json::Value>>;
34}
35
36/// Specialized mapper for the final stage of the pipeline
37#[async_trait::async_trait]
38pub trait Mapper: Send + Sync {
39    /// A mapper takes a stream of JSON values and returns a stream of unified events
40    async fn map(
41        &self,
42        input: BoxStream<'static, serde_json::Value>,
43    ) -> PipeResult<BoxStream<'static, StreamingEvent>>;
44}
45
46/// Decoder trait for stream decoding
47#[async_trait::async_trait]
48pub trait Decoder: Send + Sync {
49    /// Decode a byte stream into JSON values
50    async fn decode_stream(
51        &self,
52        input: BoxStream<'static, bytes::Bytes>,
53    ) -> PipeResult<BoxStream<'static, serde_json::Value>>;
54}
55
56/// Pipeline error types
57#[derive(Debug, thiserror::Error)]
58pub enum PipelineError {
59    #[error("Decoder error: {0}")]
60    Decoder(String),
61
62    #[error("Selector error: {0}")]
63    Selector(String),
64
65    #[error("Accumulator error: {0}")]
66    Accumulator(String),
67
68    #[error("Event mapper error: {0}")]
69    EventMapper(String),
70
71    #[error("Configuration error: {0}")]
72    Configuration(String),
73
74    #[error("Missing required field: {name}{}", .hint.as_ref().map(|h| format!("\n💡 Hint: {}", h)).unwrap_or_default())]
75    MissingField { name: String, hint: Option<String> },
76
77    #[error("Invalid JSON path: {path} - {error}{}", .hint.as_ref().map(|h| format!("\n💡 Hint: {}", h)).unwrap_or_default())]
78    InvalidJsonPath {
79        path: String,
80        error: String,
81        hint: Option<String>,
82    },
83
84    #[error("Operator execution failed: {operator} - {reason}{}", .hint.as_ref().map(|h| format!("\n💡 Hint: {}", h)).unwrap_or_default())]
85    Execution {
86        operator: String,
87        reason: String,
88        hint: Option<String>,
89    },
90}
91
92impl PipelineError {
93    /// Attach an actionable hint to the error
94    pub fn with_hint(mut self, hint: impl Into<String>) -> Self {
95        let hint_val = Some(hint.into());
96        match self {
97            PipelineError::MissingField { ref mut hint, .. } => *hint = hint_val,
98            PipelineError::InvalidJsonPath { ref mut hint, .. } => *hint = hint_val,
99            PipelineError::Execution { ref mut hint, .. } => *hint = hint_val,
100            _ => (),
101        }
102        self
103    }
104}
105
106/// Pipeline builder that constructs the operator chain from protocol manifest
107pub struct PipelineBuilder {
108    decoder: Option<Box<dyn Decoder>>,
109    transforms: Vec<Box<dyn Transform>>,
110    mapper: Option<Box<dyn Mapper>>,
111}
112
113impl PipelineBuilder {
114    pub fn new() -> Self {
115        Self {
116            decoder: None,
117            transforms: Vec::new(),
118            mapper: None,
119        }
120    }
121
122    pub fn set_decoder(mut self, decoder: Box<dyn Decoder>) -> Self {
123        self.decoder = Some(decoder);
124        self
125    }
126
127    pub fn add_transform(mut self, transform: Box<dyn Transform>) -> Self {
128        self.transforms.push(transform);
129        self
130    }
131
132    pub fn set_mapper(mut self, mapper: Box<dyn Mapper>) -> Self {
133        self.mapper = Some(mapper);
134        self
135    }
136
137    pub fn build(self) -> Result<Pipeline, PipelineError> {
138        Ok(Pipeline {
139            decoder: self
140                .decoder
141                .ok_or_else(|| PipelineError::Configuration("Decoder is required".to_string()))?,
142            transforms: self.transforms,
143            mapper: self.mapper.ok_or_else(|| {
144                PipelineError::Configuration("Event mapper is required".to_string())
145            })?,
146        })
147    }
148}
149
150/// Pipeline that processes streaming responses
151pub struct Pipeline {
152    decoder: Box<dyn Decoder>,
153    transforms: Vec<Box<dyn Transform>>,
154    mapper: Box<dyn Mapper>,
155}
156
157impl Pipeline {
158    /// Create pipeline from protocol manifest
159    pub fn from_manifest(manifest: &ProtocolManifest) -> Result<Self, PipelineError> {
160        let mut builder = PipelineBuilder::new();
161
162        if let Some(streaming) = &manifest.streaming {
163            // 1. Build decoder
164            if let Some(decoder_config) = &streaming.decoder {
165                builder = builder.set_decoder(decode::create_decoder(decoder_config)?);
166            } else {
167                return Err(PipelineError::Configuration(
168                    "streaming.decoder is required for streaming pipelines".to_string(),
169                ));
170            }
171
172            // 2. Build transforms in order
173            if let Some(frame_selector) = &streaming.frame_selector {
174                builder = builder.add_transform(select::create_selector(frame_selector)?);
175            }
176
177            if let Some(accumulator_config) = &streaming.accumulator {
178                builder =
179                    builder.add_transform(accumulate::create_accumulator(accumulator_config)?);
180            }
181
182            if let Some(candidate_config) = &streaming.candidate {
183                if candidate_config.fan_out.unwrap_or(false) {
184                    builder = builder.add_transform(fan_out::create_fan_out(candidate_config)?);
185                }
186            }
187
188            // 3. Build event mapper
189            // Prefer manifest-driven rules. If none provided, fallback to adapter-based defaults.
190            if !streaming.event_map.is_empty() {
191                builder = builder.set_mapper(event_map::create_event_mapper(&streaming.event_map)?);
192            } else {
193                let tool_use = manifest.tooling.as_ref().and_then(|t| t.tool_use.clone());
194                // Default: manifest-driven path mapping for OpenAI-compatible streaming
195                builder = builder.set_mapper(Box::new(event_map::PathEventMapper::new(
196                    streaming.content_path.clone(),
197                    streaming.tool_call_path.clone(),
198                    streaming.usage_path.clone(),
199                    tool_use,
200                )));
201            }
202        }
203
204        builder.build()
205    }
206
207    /// Process a byte stream through the pipeline
208    pub async fn process_stream(
209        &self,
210        input: BoxStream<'static, bytes::Bytes>,
211    ) -> PipeResult<BoxStream<'static, StreamingEvent>> {
212        // 1. Start with decoding: Bytes -> JSON Value
213        let mut stream = self.decoder.decode_stream(input).await?;
214
215        // 2. Apply all transforms in sequence: Value -> Value
216        for transform in &self.transforms {
217            stream = transform.transform(stream).await?;
218        }
219
220        // 3. Final mapping to events: Value -> Event
221        let events = self.mapper.map(stream).await?;
222
223        Ok(events)
224    }
225
226    pub async fn process_stream_arc(
227        self: std::sync::Arc<Self>,
228        input: BoxStream<'static, bytes::Bytes>,
229    ) -> PipeResult<BoxStream<'static, StreamingEvent>> {
230        self.process_stream(input).await
231    }
232}
233
234impl Default for PipelineBuilder {
235    fn default() -> Self {
236        Self::new()
237    }
238}