ai_lib_rust/pipeline/
mod.rs1pub mod accumulate;
10pub mod decode;
11pub mod event_map;
12pub mod fan_out;
13pub mod select;
14
15pub mod fallback;
17pub mod retry;
18
19#[cfg(test)]
20mod tests;
21
22use crate::protocol::ProtocolManifest;
23use crate::types::events::StreamingEvent;
24use crate::{BoxStream, PipeResult};
25
26#[async_trait::async_trait]
28pub trait Transform: Send + Sync {
29 async fn transform(
31 &self,
32 input: BoxStream<'static, serde_json::Value>,
33 ) -> PipeResult<BoxStream<'static, serde_json::Value>>;
34}
35
36#[async_trait::async_trait]
38pub trait Mapper: Send + Sync {
39 async fn map(
41 &self,
42 input: BoxStream<'static, serde_json::Value>,
43 ) -> PipeResult<BoxStream<'static, StreamingEvent>>;
44}
45
46#[async_trait::async_trait]
48pub trait Decoder: Send + Sync {
49 async fn decode_stream(
51 &self,
52 input: BoxStream<'static, bytes::Bytes>,
53 ) -> PipeResult<BoxStream<'static, serde_json::Value>>;
54}
55
56#[derive(Debug, thiserror::Error)]
58pub enum PipelineError {
59 #[error("Decoder error: {0}")]
60 Decoder(String),
61
62 #[error("Selector error: {0}")]
63 Selector(String),
64
65 #[error("Accumulator error: {0}")]
66 Accumulator(String),
67
68 #[error("Event mapper error: {0}")]
69 EventMapper(String),
70
71 #[error("Configuration error: {0}")]
72 Configuration(String),
73
74 #[error("Missing required field: {name}{}", .hint.as_ref().map(|h| format!("\n💡 Hint: {}", h)).unwrap_or_default())]
75 MissingField { name: String, hint: Option<String> },
76
77 #[error("Invalid JSON path: {path} - {error}{}", .hint.as_ref().map(|h| format!("\n💡 Hint: {}", h)).unwrap_or_default())]
78 InvalidJsonPath {
79 path: String,
80 error: String,
81 hint: Option<String>,
82 },
83
84 #[error("Operator execution failed: {operator} - {reason}{}", .hint.as_ref().map(|h| format!("\n💡 Hint: {}", h)).unwrap_or_default())]
85 Execution {
86 operator: String,
87 reason: String,
88 hint: Option<String>,
89 },
90}
91
92impl PipelineError {
93 pub fn with_hint(mut self, hint: impl Into<String>) -> Self {
95 let hint_val = Some(hint.into());
96 match self {
97 PipelineError::MissingField { ref mut hint, .. } => *hint = hint_val,
98 PipelineError::InvalidJsonPath { ref mut hint, .. } => *hint = hint_val,
99 PipelineError::Execution { ref mut hint, .. } => *hint = hint_val,
100 _ => (),
101 }
102 self
103 }
104}
105
106pub struct PipelineBuilder {
108 decoder: Option<Box<dyn Decoder>>,
109 transforms: Vec<Box<dyn Transform>>,
110 mapper: Option<Box<dyn Mapper>>,
111}
112
113impl PipelineBuilder {
114 pub fn new() -> Self {
115 Self {
116 decoder: None,
117 transforms: Vec::new(),
118 mapper: None,
119 }
120 }
121
122 pub fn set_decoder(mut self, decoder: Box<dyn Decoder>) -> Self {
123 self.decoder = Some(decoder);
124 self
125 }
126
127 pub fn add_transform(mut self, transform: Box<dyn Transform>) -> Self {
128 self.transforms.push(transform);
129 self
130 }
131
132 pub fn set_mapper(mut self, mapper: Box<dyn Mapper>) -> Self {
133 self.mapper = Some(mapper);
134 self
135 }
136
137 pub fn build(self) -> Result<Pipeline, PipelineError> {
138 Ok(Pipeline {
139 decoder: self
140 .decoder
141 .ok_or_else(|| PipelineError::Configuration("Decoder is required".to_string()))?,
142 transforms: self.transforms,
143 mapper: self.mapper.ok_or_else(|| {
144 PipelineError::Configuration("Event mapper is required".to_string())
145 })?,
146 })
147 }
148}
149
150pub struct Pipeline {
152 decoder: Box<dyn Decoder>,
153 transforms: Vec<Box<dyn Transform>>,
154 mapper: Box<dyn Mapper>,
155}
156
157impl Pipeline {
158 pub fn from_manifest(manifest: &ProtocolManifest) -> Result<Self, PipelineError> {
160 let mut builder = PipelineBuilder::new();
161
162 if let Some(streaming) = &manifest.streaming {
163 if let Some(decoder_config) = &streaming.decoder {
165 builder = builder.set_decoder(decode::create_decoder(decoder_config)?);
166 } else {
167 return Err(PipelineError::Configuration(
168 "streaming.decoder is required for streaming pipelines".to_string(),
169 ));
170 }
171
172 if let Some(frame_selector) = &streaming.frame_selector {
174 builder = builder.add_transform(select::create_selector(frame_selector)?);
175 }
176
177 if let Some(accumulator_config) = &streaming.accumulator {
178 builder =
179 builder.add_transform(accumulate::create_accumulator(accumulator_config)?);
180 }
181
182 if let Some(candidate_config) = &streaming.candidate {
183 if candidate_config.fan_out.unwrap_or(false) {
184 builder = builder.add_transform(fan_out::create_fan_out(candidate_config)?);
185 }
186 }
187
188 if !streaming.event_map.is_empty() {
191 builder = builder.set_mapper(event_map::create_event_mapper(&streaming.event_map)?);
192 } else {
193 let tool_use = manifest.tooling.as_ref().and_then(|t| t.tool_use.clone());
194 builder = builder.set_mapper(Box::new(event_map::PathEventMapper::new(
196 streaming.content_path.clone(),
197 streaming.tool_call_path.clone(),
198 streaming.usage_path.clone(),
199 tool_use,
200 )));
201 }
202 }
203
204 builder.build()
205 }
206
207 pub async fn process_stream(
209 &self,
210 input: BoxStream<'static, bytes::Bytes>,
211 ) -> PipeResult<BoxStream<'static, StreamingEvent>> {
212 let mut stream = self.decoder.decode_stream(input).await?;
214
215 for transform in &self.transforms {
217 stream = transform.transform(stream).await?;
218 }
219
220 let events = self.mapper.map(stream).await?;
222
223 Ok(events)
224 }
225
226 pub async fn process_stream_arc(
227 self: std::sync::Arc<Self>,
228 input: BoxStream<'static, bytes::Bytes>,
229 ) -> PipeResult<BoxStream<'static, StreamingEvent>> {
230 self.process_stream(input).await
231 }
232}
233
234impl Default for PipelineBuilder {
235 fn default() -> Self {
236 Self::new()
237 }
238}