// ai_lib_rust/pipeline/mod.rs
//! Pipeline processing module: the core operator execution engine for
//! streaming response handling.
//!
//! # Pipeline Interpreter Layer
//!
//! This module implements the operator pipeline that processes streaming responses
//! according to protocol configuration. It is the core execution engine that
//! transforms raw provider responses into unified streaming events.
//!
//! ## Overview
//!
//! The pipeline architecture provides:
//! - **Protocol-Driven Processing**: Pipeline structure defined by protocol manifest
//! - **Composable Operators**: Mix and match transforms for different providers
//! - **Streaming-First**: Efficient byte-level streaming throughout
//! - **Type-Safe Events**: Strongly typed output events for application consumption
//!
//! ## Pipeline Stages
//!
//! ```text
//! Raw Bytes → Decoder → Transforms → Event Mapper → Unified Events
//!     │           │          │              │
//!     │        JSON/SSE   Selector,      Content,
//!     │        parsing    Accumulator,   ToolCall,
//!   HTTP                  FanOut...      Usage events
//! ```
//!
//! ## Key Components
//!
//! | Component | Description |
//! |-----------|-------------|
//! | [`Pipeline`] | Main pipeline executor |
//! | [`PipelineBuilder`] | Builder for constructing pipelines |
//! | [`Decoder`] | Trait for stream decoding (SSE, JSON Lines) |
//! | [`Transform`] | Trait for intermediate transformations |
//! | [`Mapper`] | Trait for final event mapping |
//!
//! ## Submodules
//!
//! | Module | Description |
//! |--------|-------------|
//! | [`decode`] | Stream decoders (SSE, JSON Lines, raw) |
//! | [`select`] | Frame selection operators (JSON path) |
//! | [`accumulate`] | Content accumulation operators |
//! | [`fan_out`] | Multi-candidate fan-out operators |
//! | [`event_map`] | Event mapping to unified format |
//! | [`retry`] | Retry operators with backoff |
//! | [`fallback`] | Fallback operators for resilience |
//!
//! ## Example
//!
//! ```rust,no_run
//! use ai_lib_rust::pipeline::{Pipeline, PipelineBuilder};
//! use ai_lib_rust::protocol::ProtocolManifest;
//!
//! // Create pipeline from protocol manifest
//! let manifest: ProtocolManifest = todo!(); // Load from file
//! let pipeline = Pipeline::from_manifest(&manifest)?;
//!
//! // Process a streaming response
//! // let events = pipeline.process_stream(byte_stream).await?;
//! # Ok::<(), ai_lib_rust::pipeline::PipelineError>(())
//! ```
64pub mod accumulate;
65pub mod decode;
66pub mod event_map;
67pub mod fan_out;
68pub mod select;
69
70// Resilience Operators
71pub mod fallback;
72pub mod retry;
73
74#[cfg(test)]
75mod tests;
76
77use crate::protocol::ProtocolManifest;
78use crate::types::events::StreamingEvent;
79use crate::{BoxStream, PipeResult};
80
/// Core transformer interface: all logic operators follow this unified abstraction
///
/// Implementors are the intermediate pipeline stages (frame selectors,
/// accumulators, fan-out, retry/fallback operators — see the submodules)
/// that rewrite a stream of JSON frames into another stream of JSON frames.
#[async_trait::async_trait]
pub trait Transform: Send + Sync {
    /// A transform takes a stream of JSON values and returns a new stream of JSON values
    ///
    /// # Errors
    /// Returns a [`PipeResult`] error when the operator cannot be set up for
    /// the given input stream.
    async fn transform(
        &self,
        input: BoxStream<'static, serde_json::Value>,
    ) -> PipeResult<BoxStream<'static, serde_json::Value>>;
}
90
/// Specialized mapper for the final stage of the pipeline
///
/// A `Mapper` terminates the JSON-to-JSON transform chain by converting
/// each remaining JSON frame into a strongly typed [`StreamingEvent`].
#[async_trait::async_trait]
pub trait Mapper: Send + Sync {
    /// A mapper takes a stream of JSON values and returns a stream of unified events
    ///
    /// # Errors
    /// Returns a [`PipeResult`] error when the mapping stage cannot be applied.
    async fn map(
        &self,
        input: BoxStream<'static, serde_json::Value>,
    ) -> PipeResult<BoxStream<'static, StreamingEvent>>;
}
100
/// Decoder trait for stream decoding
///
/// The decoder is the first pipeline stage: it turns the raw HTTP byte
/// stream into JSON frames (e.g. SSE or JSON Lines parsing — concrete
/// implementations live in the [`decode`] submodule).
#[async_trait::async_trait]
pub trait Decoder: Send + Sync {
    /// Decode a byte stream into JSON values
    ///
    /// # Errors
    /// Returns a [`PipeResult`] error when the byte stream cannot be decoded.
    async fn decode_stream(
        &self,
        input: BoxStream<'static, bytes::Bytes>,
    ) -> PipeResult<BoxStream<'static, serde_json::Value>>;
}
110
/// Pipeline error types
///
/// One variant per pipeline stage plus configuration failures. The
/// `MissingField`, `InvalidJsonPath`, and `Execution` variants carry an
/// optional actionable `hint` that, when present, is appended to the
/// rendered message (see [`PipelineError::with_hint`]).
#[derive(Debug, thiserror::Error)]
pub enum PipelineError {
    /// Failure in the stream-decoding stage.
    #[error("Decoder error: {0}")]
    Decoder(String),

    /// Failure in a frame-selection operator.
    #[error("Selector error: {0}")]
    Selector(String),

    /// Failure in a content-accumulation operator.
    #[error("Accumulator error: {0}")]
    Accumulator(String),

    /// Failure in the final event-mapping stage.
    #[error("Event mapper error: {0}")]
    EventMapper(String),

    /// Invalid or incomplete pipeline configuration (e.g. missing decoder or mapper).
    #[error("Configuration error: {0}")]
    Configuration(String),

    /// A required field named `name` was absent; `hint` optionally suggests a fix.
    #[error("Missing required field: {name}{}", .hint.as_ref().map(|h| format!("\n💡 Hint: {}", h)).unwrap_or_default())]
    MissingField { name: String, hint: Option<String> },

    /// A JSON path expression could not be parsed or evaluated.
    #[error("Invalid JSON path: {path} - {error}{}", .hint.as_ref().map(|h| format!("\n💡 Hint: {}", h)).unwrap_or_default())]
    InvalidJsonPath {
        path: String,
        error: String,
        hint: Option<String>,
    },

    /// A named operator failed at execution time with the given reason.
    #[error("Operator execution failed: {operator} - {reason}{}", .hint.as_ref().map(|h| format!("\n💡 Hint: {}", h)).unwrap_or_default())]
    Execution {
        operator: String,
        reason: String,
        hint: Option<String>,
    },
}
146
147impl PipelineError {
148    /// Attach an actionable hint to the error
149    pub fn with_hint(mut self, hint: impl Into<String>) -> Self {
150        let hint_val = Some(hint.into());
151        match self {
152            PipelineError::MissingField { ref mut hint, .. } => *hint = hint_val,
153            PipelineError::InvalidJsonPath { ref mut hint, .. } => *hint = hint_val,
154            PipelineError::Execution { ref mut hint, .. } => *hint = hint_val,
155            _ => (),
156        }
157        self
158    }
159}
160
/// Pipeline builder that constructs the operator chain from protocol manifest
///
/// Decoder and mapper are mandatory (enforced in `build`); transforms are
/// optional and applied in insertion order.
pub struct PipelineBuilder {
    // First stage: raw bytes -> JSON frames. Required.
    decoder: Option<Box<dyn Decoder>>,
    // Intermediate JSON -> JSON operators, applied in push order.
    transforms: Vec<Box<dyn Transform>>,
    // Final stage: JSON frames -> unified events. Required.
    mapper: Option<Box<dyn Mapper>>,
}
167
168impl PipelineBuilder {
169    pub fn new() -> Self {
170        Self {
171            decoder: None,
172            transforms: Vec::new(),
173            mapper: None,
174        }
175    }
176
177    pub fn set_decoder(mut self, decoder: Box<dyn Decoder>) -> Self {
178        self.decoder = Some(decoder);
179        self
180    }
181
182    pub fn add_transform(mut self, transform: Box<dyn Transform>) -> Self {
183        self.transforms.push(transform);
184        self
185    }
186
187    pub fn set_mapper(mut self, mapper: Box<dyn Mapper>) -> Self {
188        self.mapper = Some(mapper);
189        self
190    }
191
192    pub fn build(self) -> Result<Pipeline, PipelineError> {
193        Ok(Pipeline {
194            decoder: self
195                .decoder
196                .ok_or_else(|| PipelineError::Configuration("Decoder is required".to_string()))?,
197            transforms: self.transforms,
198            mapper: self.mapper.ok_or_else(|| {
199                PipelineError::Configuration("Event mapper is required".to_string())
200            })?,
201        })
202    }
203}
204
/// Pipeline that processes streaming responses
///
/// Holds the fully assembled operator chain: decoder → transforms (in
/// order) → event mapper. Build one via [`PipelineBuilder`] or
/// [`Pipeline::from_manifest`].
pub struct Pipeline {
    // Bytes -> JSON frames.
    decoder: Box<dyn Decoder>,
    // JSON -> JSON operators, applied sequentially.
    transforms: Vec<Box<dyn Transform>>,
    // JSON frames -> unified streaming events.
    mapper: Box<dyn Mapper>,
}
211
212impl Pipeline {
213    /// Create pipeline from protocol manifest
214    pub fn from_manifest(manifest: &ProtocolManifest) -> Result<Self, PipelineError> {
215        let mut builder = PipelineBuilder::new();
216
217        if let Some(streaming) = &manifest.streaming {
218            // 1. Build decoder
219            if let Some(decoder_config) = &streaming.decoder {
220                builder = builder.set_decoder(decode::create_decoder(decoder_config)?);
221            } else {
222                return Err(PipelineError::Configuration(
223                    "streaming.decoder is required for streaming pipelines".to_string(),
224                ));
225            }
226
227            // 2. Build transforms in order
228            if let Some(frame_selector) = &streaming.frame_selector {
229                builder = builder.add_transform(select::create_selector(frame_selector)?);
230            }
231
232            if let Some(accumulator_config) = &streaming.accumulator {
233                builder =
234                    builder.add_transform(accumulate::create_accumulator(accumulator_config)?);
235            }
236
237            if let Some(candidate_config) = &streaming.candidate {
238                if candidate_config.fan_out.unwrap_or(false) {
239                    builder = builder.add_transform(fan_out::create_fan_out(candidate_config)?);
240                }
241            }
242
243            // 3. Build event mapper
244            // Prefer manifest-driven rules. If none provided, fallback to adapter-based defaults.
245            if !streaming.event_map.is_empty() {
246                builder = builder.set_mapper(event_map::create_event_mapper(&streaming.event_map)?);
247            } else {
248                let tool_use = manifest.tooling.as_ref().and_then(|t| t.tool_use.clone());
249                // Default: manifest-driven path mapping for OpenAI-compatible streaming
250                builder = builder.set_mapper(Box::new(event_map::PathEventMapper::new(
251                    streaming.content_path.clone(),
252                    streaming.tool_call_path.clone(),
253                    streaming.usage_path.clone(),
254                    tool_use,
255                )));
256            }
257        }
258
259        builder.build()
260    }
261
262    /// Process a byte stream through the pipeline
263    pub async fn process_stream(
264        &self,
265        input: BoxStream<'static, bytes::Bytes>,
266    ) -> PipeResult<BoxStream<'static, StreamingEvent>> {
267        // 1. Start with decoding: Bytes -> JSON Value
268        let mut stream = self.decoder.decode_stream(input).await?;
269
270        // 2. Apply all transforms in sequence: Value -> Value
271        for transform in &self.transforms {
272            stream = transform.transform(stream).await?;
273        }
274
275        // 3. Final mapping to events: Value -> Event
276        let events = self.mapper.map(stream).await?;
277
278        Ok(events)
279    }
280
281    pub async fn process_stream_arc(
282        self: std::sync::Arc<Self>,
283        input: BoxStream<'static, bytes::Bytes>,
284    ) -> PipeResult<BoxStream<'static, StreamingEvent>> {
285        self.process_stream(input).await
286    }
287}
288
289impl Default for PipelineBuilder {
290    fn default() -> Self {
291        Self::new()
292    }
293}