1use crate::pipeline::{Mapper, PipelineError};
8use crate::protocol::EventMapRule;
9use crate::protocol::ToolUseMapping;
10use crate::types::events::StreamingEvent;
11use crate::utils::JsonPathEvaluator;
12use crate::{BoxStream, PipeResult};
13use futures::{stream, StreamExt};
14use serde_json::Value;
15use std::collections::{HashMap, HashSet, VecDeque};
16use tracing::debug;
17
18#[derive(Clone)]
19struct CompiledRule {
20 matcher: JsonPathEvaluator,
21 emit: String,
22 extract: Vec<(String, String)>, }
24
25pub struct RuleBasedEventMapper {
26 rules: Vec<CompiledRule>,
27}
28
29impl RuleBasedEventMapper {
30 pub fn new(rules: &[EventMapRule]) -> Result<Self, PipelineError> {
31 let mut compiled = Vec::new();
32 for r in rules {
33 let matcher = JsonPathEvaluator::new(&r.match_expr).map_err(|e| {
34 PipelineError::InvalidJsonPath {
35 path: r.match_expr.clone(),
36 error: e.to_string(),
37 hint: None,
38 }
39 })?;
40 let mut extract = Vec::new();
41 if let Some(map) = &r.fields {
42 for (k, v) in map {
43 extract.push((k.clone(), v.clone()));
44 }
45 }
46 compiled.push(CompiledRule {
47 matcher,
48 emit: r.emit.clone(),
49 extract,
50 });
51 }
52 Ok(Self { rules: compiled })
53 }
54
55 fn build_event(
56 emit: &str,
57 frame: &Value,
58 extract: &[(String, String)],
59 ) -> Option<StreamingEvent> {
60 match emit {
61 "PartialContentDelta" => {
62 let mut content: Option<String> = None;
64 for (k, p) in extract {
65 if k == "content" {
66 content = crate::utils::PathMapper::get_string(frame, p);
67 }
68 }
69 let content = content.or_else(|| {
70 crate::utils::PathMapper::get_string(frame, "$.choices[0].delta.content")
71 })?;
72
73 if content.is_empty() {
77 return None;
78 }
79
80 Some(StreamingEvent::PartialContentDelta {
81 content,
82 sequence_id: None,
83 })
84 }
85 "Metadata" => {
86 let usage = crate::utils::PathMapper::get_path(frame, "$.usage").cloned();
88 Some(StreamingEvent::Metadata {
89 usage,
90 finish_reason: None,
91 stop_reason: None,
92 })
93 }
94 "StreamEnd" => Some(StreamingEvent::StreamEnd {
95 finish_reason: None,
96 }),
97 _ => None,
98 }
99 }
100}
101
102#[async_trait::async_trait]
103impl Mapper for RuleBasedEventMapper {
104 async fn map(
105 &self,
106 input: BoxStream<'static, Value>,
107 ) -> PipeResult<BoxStream<'static, StreamingEvent>> {
108 let rules = self.rules.clone();
109
110 let mapped = stream::unfold((input, false), move |(mut input, mut ended)| {
111 let rules = rules.clone();
112 async move {
113 if ended {
114 return None;
115 }
116
117 while let Some(item) = input.next().await {
118 match item {
119 Ok(frame) => {
120 for r in &rules {
121 if r.matcher.matches(&frame) {
122 if let Some(ev) = RuleBasedEventMapper::build_event(
123 &r.emit, &frame, &r.extract,
124 ) {
125 return Some((Ok(ev), (input, ended)));
126 }
127 }
130 }
131
132 continue;
136 }
137 Err(e) => return Some((Err(e), (input, ended))),
138 }
139 }
140
141 ended = true;
143 Some((
144 Ok(StreamingEvent::StreamEnd {
145 finish_reason: None,
146 }),
147 (input, ended),
148 ))
149 }
150 });
151
152 Ok(Box::pin(mapped))
153 }
154}
155
/// A fixed-path [`Mapper`] for OpenAI chat-completion streams.
///
/// It reads `$.choices[0].delta.content` and `$.usage` from each frame and
/// needs no configuration.
pub struct OpenAiStyleEventMapper;
158
159#[async_trait::async_trait]
160impl Mapper for OpenAiStyleEventMapper {
161 async fn map(
162 &self,
163 input: BoxStream<'static, Value>,
164 ) -> PipeResult<BoxStream<'static, StreamingEvent>> {
165 let stream = stream::unfold((input, false), move |(mut input, mut ended)| async move {
166 if ended {
167 return None;
168 }
169
170 while let Some(item) = input.next().await {
171 match item {
172 Ok(frame) => {
173 if let Some(content) = crate::utils::PathMapper::get_string(
175 &frame,
176 "$.choices[0].delta.content",
177 ) {
178 if !content.is_empty() {
179 return Some((
180 Ok(StreamingEvent::PartialContentDelta {
181 content,
182 sequence_id: None,
183 }),
184 (input, ended),
185 ));
186 }
187 }
188
189 if let Some(usage) =
191 crate::utils::PathMapper::get_path(&frame, "$.usage").cloned()
192 {
193 return Some((
194 Ok(StreamingEvent::Metadata {
195 usage: Some(usage),
196 finish_reason: None,
197 stop_reason: None,
198 }),
199 (input, ended),
200 ));
201 }
202
203 continue;
204 }
205 Err(e) => return Some((Err(e), (input, ended))),
206 }
207 }
208
209 ended = true;
210 Some((
211 Ok(StreamingEvent::StreamEnd {
212 finish_reason: None,
213 }),
214 (input, ended),
215 ))
216 });
217
218 Ok(Box::pin(stream))
219 }
220}
221
222pub fn create_event_mapper(rules: &[EventMapRule]) -> Result<Box<dyn Mapper>, PipelineError> {
223 Ok(Box::new(RuleBasedEventMapper::new(rules)?))
224}
225
226pub struct PathEventMapper {
232 content_path: String,
233 tool_call_path: String,
234 usage_path: String,
235 tool_use: Option<ToolUseMapping>,
236}
237
238impl PathEventMapper {
239 pub fn new(
240 content_path: Option<String>,
241 tool_call_path: Option<String>,
242 usage_path: Option<String>,
243 tool_use: Option<ToolUseMapping>,
244 ) -> Self {
245 Self {
246 content_path: content_path.unwrap_or_else(|| "$.choices[0].delta.content".to_string()),
247 tool_call_path: tool_call_path
248 .unwrap_or_else(|| "$.choices[0].delta.tool_calls".to_string()),
249 usage_path: usage_path.unwrap_or_else(|| "$.usage".to_string()),
250 tool_use,
251 }
252 }
253}
254
/// Reports whether verbose tool-call delta logging is switched on.
///
/// True only when the `AI_LIB_DEBUG_TOOLCALL` environment variable is exactly
/// `"1"`. Unset or non-Unicode values count as off, and the variable is re-read
/// on every call.
fn debug_toolcall_enabled() -> bool {
    matches!(std::env::var("AI_LIB_DEBUG_TOOLCALL").as_deref(), Ok("1"))
}
258
259fn extract_toolcall_id(tc: &Value) -> Option<String> {
260 crate::utils::PathMapper::get_string(tc, "id")
261 .or_else(|| crate::utils::PathMapper::get_string(tc, "tool_call_id"))
262 .or_else(|| crate::utils::PathMapper::get_string(tc, "delta.id"))
263 .or_else(|| crate::utils::PathMapper::get_string(tc, "delta.tool_call_id"))
264}
265
266fn extract_toolcall_name(tc: &Value) -> Option<String> {
267 crate::utils::PathMapper::get_string(tc, "function.name")
268 .or_else(|| crate::utils::PathMapper::get_string(tc, "name"))
269 .or_else(|| crate::utils::PathMapper::get_string(tc, "delta.function.name"))
270 .or_else(|| crate::utils::PathMapper::get_string(tc, "delta.name"))
271}
272
273fn extract_toolcall_arguments(tc: &Value) -> Option<String> {
274 if let Some(v) = crate::utils::PathMapper::get_path(tc, "function.arguments") {
281 if let Some(s) = v.as_str() {
282 return Some(s.to_string());
283 }
284 if v.is_object() || v.is_array() {
285 return serde_json::to_string(v).ok();
286 }
287 }
288 if let Some(v) = crate::utils::PathMapper::get_path(tc, "arguments") {
289 if let Some(s) = v.as_str() {
290 return Some(s.to_string());
291 }
292 if v.is_object() || v.is_array() {
293 return serde_json::to_string(v).ok();
294 }
295 }
296 if let Some(v) = crate::utils::PathMapper::get_path(tc, "delta.function.arguments") {
297 if let Some(s) = v.as_str() {
298 return Some(s.to_string());
299 }
300 if v.is_object() || v.is_array() {
301 return serde_json::to_string(v).ok();
302 }
303 }
304 if let Some(v) = crate::utils::PathMapper::get_path(tc, "delta.arguments") {
305 if let Some(s) = v.as_str() {
306 return Some(s.to_string());
307 }
308 if v.is_object() || v.is_array() {
309 return serde_json::to_string(v).ok();
310 }
311 }
312 None
313}
314
315fn extract_by_tooling(
316 tc: &Value,
317 tool_use: &ToolUseMapping,
318) -> (Option<String>, Option<String>, Option<String>) {
319 let id = tool_use
320 .id_path
321 .as_deref()
322 .and_then(|p| crate::utils::PathMapper::get_string(tc, p));
323 let name = tool_use
324 .name_path
325 .as_deref()
326 .and_then(|p| crate::utils::PathMapper::get_string(tc, p));
327 let args = tool_use.input_path.as_deref().and_then(|p| {
328 let v = crate::utils::PathMapper::get_path(tc, p)?;
329 if let Some(s) = v.as_str() {
330 Some(s.to_string())
331 } else if v.is_object() || v.is_array() {
332 serde_json::to_string(v).ok()
333 } else {
334 serde_json::to_string(v).ok()
335 }
336 });
337 (id, name, args)
338}
339
340#[async_trait::async_trait]
341impl Mapper for PathEventMapper {
342 async fn map(
343 &self,
344 input: BoxStream<'static, Value>,
345 ) -> PipeResult<BoxStream<'static, StreamingEvent>> {
346 let content_path = self.content_path.clone();
347 let tool_call_path = self.tool_call_path.clone();
348 let usage_path = self.usage_path.clone();
349 let tool_use = self.tool_use.clone();
350
351 let stream = stream::unfold(
353 (
354 input,
355 VecDeque::<StreamingEvent>::new(),
356 false,
357 HashSet::<String>::new(),
358 HashMap::<u32, String>::new(),
359 ),
360 move |(mut input, mut q, mut ended, mut started_ids, mut index_to_id)| {
361 let content_path = content_path.clone();
362 let tool_call_path = tool_call_path.clone();
363 let usage_path = usage_path.clone();
364 let tool_use = tool_use.clone();
365 async move {
366 if let Some(ev) = q.pop_front() {
367 return Some((Ok(ev), (input, q, ended, started_ids, index_to_id)));
368 }
369 if ended {
370 return None;
371 }
372
373 while let Some(item) = input.next().await {
374 match item {
375 Ok(frame) => {
376 if let Some(content) =
378 crate::utils::PathMapper::get_string(&frame, &content_path)
379 {
380 if !content.is_empty() {
381 q.push_back(StreamingEvent::PartialContentDelta {
382 content,
383 sequence_id: None,
384 });
385 }
386 }
387
388 if let Some(usage) =
390 crate::utils::PathMapper::get_path(&frame, &usage_path).cloned()
391 {
392 q.push_back(StreamingEvent::Metadata {
393 usage: Some(usage),
394 finish_reason: None,
395 stop_reason: None,
396 });
397 }
398
399 if let Some(tc_val) =
401 crate::utils::PathMapper::get_path(&frame, &tool_call_path)
402 {
403 if debug_toolcall_enabled() {
404 debug!(
405 tool_call_path = tool_call_path.as_str(),
406 tool_call_delta = %tc_val,
407 frame = %frame,
408 "tool_call delta observed"
409 );
410 }
411 if let Some(arr) = tc_val.as_array() {
412 for (idx, tc) in arr.iter().enumerate() {
413 let tc_index: u32 =
415 crate::utils::PathMapper::get_path(tc, "index")
416 .and_then(|v| v.as_u64())
417 .map(|v| v as u32)
418 .unwrap_or(idx as u32);
419
420 let (mut id, mut name, mut args) =
422 if let Some(ref tu) = tool_use {
423 extract_by_tooling(tc, tu)
424 } else {
425 (None, None, None)
426 };
427
428 if id.is_none() {
430 id = extract_toolcall_id(tc);
431 }
432 if name.is_none() {
433 name = extract_toolcall_name(tc);
434 }
435 if args.is_none() {
436 args = extract_toolcall_arguments(tc);
437 }
438
439 if let Some(ref real_id) = id {
441 index_to_id.insert(tc_index, real_id.clone());
442 } else {
443 id = index_to_id.get(&tc_index).cloned();
445 }
446
447 if let (Some(id), Some(name)) =
448 (id.clone(), name.clone())
449 {
450 if !started_ids.contains(&id) {
451 started_ids.insert(id.clone());
452 q.push_back(StreamingEvent::ToolCallStarted {
453 tool_call_id: id.clone(),
454 tool_name: name,
455 index: Some(tc_index),
456 });
457 }
458 }
459
460 if let (Some(id), Some(arguments)) = (id, args) {
461 q.push_back(StreamingEvent::PartialToolCall {
462 tool_call_id: id,
463 arguments,
464 index: Some(tc_index),
465 is_complete: None,
466 });
467 }
468 }
469 }
470 }
471
472 if let Some(ev) = q.pop_front() {
473 return Some((
474 Ok(ev),
475 (input, q, ended, started_ids, index_to_id),
476 ));
477 }
478 continue;
479 }
480 Err(e) => {
481 return Some((Err(e), (input, q, ended, started_ids, index_to_id)))
482 }
483 }
484 }
485
486 ended = true;
487 Some((
488 Ok(StreamingEvent::StreamEnd {
489 finish_reason: None,
490 }),
491 (input, q, ended, started_ids, index_to_id),
492 ))
493 }
494 },
495 );
496
497 Ok(Box::pin(stream))
498 }
499}