ai_lib_rust/pipeline/
mod.rs1pub mod accumulate;
65pub mod decode;
66pub mod event_map;
67pub mod fan_out;
68pub mod select;
69
70pub mod fallback;
72pub mod retry;
73
74#[cfg(test)]
75mod tests;
76
77use crate::protocol::ProtocolManifest;
78use crate::types::events::StreamingEvent;
79use crate::{BoxStream, PipeResult};
80
81#[async_trait::async_trait]
83pub trait Transform: Send + Sync {
84 async fn transform(
86 &self,
87 input: BoxStream<'static, serde_json::Value>,
88 ) -> PipeResult<BoxStream<'static, serde_json::Value>>;
89}
90
91#[async_trait::async_trait]
93pub trait Mapper: Send + Sync {
94 async fn map(
96 &self,
97 input: BoxStream<'static, serde_json::Value>,
98 ) -> PipeResult<BoxStream<'static, StreamingEvent>>;
99}
100
101#[async_trait::async_trait]
103pub trait Decoder: Send + Sync {
104 async fn decode_stream(
106 &self,
107 input: BoxStream<'static, bytes::Bytes>,
108 ) -> PipeResult<BoxStream<'static, serde_json::Value>>;
109}
110
111#[derive(Debug, thiserror::Error)]
113pub enum PipelineError {
114 #[error("Decoder error: {0}")]
115 Decoder(String),
116
117 #[error("Selector error: {0}")]
118 Selector(String),
119
120 #[error("Accumulator error: {0}")]
121 Accumulator(String),
122
123 #[error("Event mapper error: {0}")]
124 EventMapper(String),
125
126 #[error("Configuration error: {0}")]
127 Configuration(String),
128
129 #[error("Missing required field: {name}{}", .hint.as_ref().map(|h| format!("\n💡 Hint: {}", h)).unwrap_or_default())]
130 MissingField { name: String, hint: Option<String> },
131
132 #[error("Invalid JSON path: {path} - {error}{}", .hint.as_ref().map(|h| format!("\n💡 Hint: {}", h)).unwrap_or_default())]
133 InvalidJsonPath {
134 path: String,
135 error: String,
136 hint: Option<String>,
137 },
138
139 #[error("Operator execution failed: {operator} - {reason}{}", .hint.as_ref().map(|h| format!("\n💡 Hint: {}", h)).unwrap_or_default())]
140 Execution {
141 operator: String,
142 reason: String,
143 hint: Option<String>,
144 },
145}
146
147impl PipelineError {
148 pub fn with_hint(mut self, hint: impl Into<String>) -> Self {
150 let hint_val = Some(hint.into());
151 match self {
152 PipelineError::MissingField { ref mut hint, .. } => *hint = hint_val,
153 PipelineError::InvalidJsonPath { ref mut hint, .. } => *hint = hint_val,
154 PipelineError::Execution { ref mut hint, .. } => *hint = hint_val,
155 _ => (),
156 }
157 self
158 }
159}
160
161pub struct PipelineBuilder {
163 decoder: Option<Box<dyn Decoder>>,
164 transforms: Vec<Box<dyn Transform>>,
165 mapper: Option<Box<dyn Mapper>>,
166}
167
168impl PipelineBuilder {
169 pub fn new() -> Self {
170 Self {
171 decoder: None,
172 transforms: Vec::new(),
173 mapper: None,
174 }
175 }
176
177 pub fn set_decoder(mut self, decoder: Box<dyn Decoder>) -> Self {
178 self.decoder = Some(decoder);
179 self
180 }
181
182 pub fn add_transform(mut self, transform: Box<dyn Transform>) -> Self {
183 self.transforms.push(transform);
184 self
185 }
186
187 pub fn set_mapper(mut self, mapper: Box<dyn Mapper>) -> Self {
188 self.mapper = Some(mapper);
189 self
190 }
191
192 pub fn build(self) -> Result<Pipeline, PipelineError> {
193 Ok(Pipeline {
194 decoder: self
195 .decoder
196 .ok_or_else(|| PipelineError::Configuration("Decoder is required".to_string()))?,
197 transforms: self.transforms,
198 mapper: self.mapper.ok_or_else(|| {
199 PipelineError::Configuration("Event mapper is required".to_string())
200 })?,
201 })
202 }
203}
204
205pub struct Pipeline {
207 decoder: Box<dyn Decoder>,
208 transforms: Vec<Box<dyn Transform>>,
209 mapper: Box<dyn Mapper>,
210}
211
212impl Pipeline {
213 pub fn from_manifest(manifest: &ProtocolManifest) -> Result<Self, PipelineError> {
215 let mut builder = PipelineBuilder::new();
216
217 if let Some(streaming) = &manifest.streaming {
218 if let Some(decoder_config) = &streaming.decoder {
220 builder = builder.set_decoder(decode::create_decoder(decoder_config)?);
221 } else {
222 return Err(PipelineError::Configuration(
223 "streaming.decoder is required for streaming pipelines".to_string(),
224 ));
225 }
226
227 if let Some(frame_selector) = &streaming.frame_selector {
229 builder = builder.add_transform(select::create_selector(frame_selector)?);
230 }
231
232 if let Some(accumulator_config) = &streaming.accumulator {
233 builder =
234 builder.add_transform(accumulate::create_accumulator(accumulator_config)?);
235 }
236
237 if let Some(candidate_config) = &streaming.candidate {
238 if candidate_config.fan_out.unwrap_or(false) {
239 builder = builder.add_transform(fan_out::create_fan_out(candidate_config)?);
240 }
241 }
242
243 if !streaming.event_map.is_empty() {
246 builder = builder.set_mapper(event_map::create_event_mapper(&streaming.event_map)?);
247 } else {
248 let tool_use = manifest.tooling.as_ref().and_then(|t| t.tool_use.clone());
249 builder = builder.set_mapper(Box::new(event_map::PathEventMapper::new(
251 streaming.content_path.clone(),
252 streaming.tool_call_path.clone(),
253 streaming.usage_path.clone(),
254 tool_use,
255 )));
256 }
257 }
258
259 builder.build()
260 }
261
262 pub async fn process_stream(
264 &self,
265 input: BoxStream<'static, bytes::Bytes>,
266 ) -> PipeResult<BoxStream<'static, StreamingEvent>> {
267 let mut stream = self.decoder.decode_stream(input).await?;
269
270 for transform in &self.transforms {
272 stream = transform.transform(stream).await?;
273 }
274
275 let events = self.mapper.map(stream).await?;
277
278 Ok(events)
279 }
280
281 pub async fn process_stream_arc(
282 self: std::sync::Arc<Self>,
283 input: BoxStream<'static, bytes::Bytes>,
284 ) -> PipeResult<BoxStream<'static, StreamingEvent>> {
285 self.process_stream(input).await
286 }
287}
288
289impl Default for PipelineBuilder {
290 fn default() -> Self {
291 Self::new()
292 }
293}