1use crate::pipeline::{Mapper, PipelineError};
10use crate::protocol::EventMapRule;
11use crate::protocol::ToolUseMapping;
12use crate::types::events::StreamingEvent;
13use crate::utils::JsonPathEvaluator;
14use crate::{BoxStream, PipeResult};
15use futures::{stream, StreamExt};
16use serde_json::Value;
17use std::collections::{HashMap, HashSet, VecDeque};
18use tracing::debug;
19
20#[derive(Clone)]
21struct CompiledRule {
22 matcher: JsonPathEvaluator,
23 emit: String,
24 extract: Vec<(String, String)>, }
26
/// Event mapper that turns decoded frames into [`StreamingEvent`]s using a list of
/// compiled match/emit rules.
pub struct RuleBasedEventMapper {
    /// Rules in declaration order; for each frame, the first rule that matches and
    /// builds an event determines what is emitted.
    rules: Vec<CompiledRule>,
}
30
31impl RuleBasedEventMapper {
32 pub fn new(rules: &[EventMapRule]) -> Result<Self, PipelineError> {
33 let mut compiled = Vec::new();
34 for r in rules {
35 let matcher = JsonPathEvaluator::new(&r.match_expr).map_err(|e| {
36 PipelineError::InvalidJsonPath {
37 path: r.match_expr.clone(),
38 error: e.to_string(),
39 hint: None,
40 }
41 })?;
42 let mut extract = Vec::new();
43 if let Some(map) = &r.fields {
44 for (k, v) in map {
45 let k: &String = k;
46 let v: &String = v;
47 extract.push((k.clone(), v.clone()));
48 }
49 }
50 compiled.push(CompiledRule {
51 matcher,
52 emit: r.emit.clone(),
53 extract,
54 });
55 }
56 Ok(Self { rules: compiled })
57 }
58
59 fn build_event(
60 emit: &str,
61 frame: &Value,
62 extract: &[(String, String)],
63 ) -> Option<StreamingEvent> {
64 match emit {
65 "PartialContentDelta" => {
66 let mut content: Option<String> = None;
68 for (k, p) in extract {
69 if k == "content" {
70 content = crate::utils::PathMapper::get_string(frame, p);
71 }
72 }
73 let content = content.or_else(|| {
74 crate::utils::PathMapper::get_string(frame, "$.choices[0].delta.content")
75 })?;
76
77 if content.is_empty() {
81 return None;
82 }
83
84 Some(StreamingEvent::PartialContentDelta {
85 content,
86 sequence_id: None,
87 })
88 }
89 "Metadata" => {
90 let usage = crate::utils::PathMapper::get_path(frame, "$.usage").cloned();
92 Some(StreamingEvent::Metadata {
93 usage,
94 finish_reason: None,
95 stop_reason: None,
96 })
97 }
98 "StreamEnd" => Some(StreamingEvent::StreamEnd {
99 finish_reason: None,
100 }),
101 _ => None,
102 }
103 }
104}
105
106#[async_trait::async_trait]
107impl Mapper for RuleBasedEventMapper {
108 async fn map(
109 &self,
110 input: BoxStream<'static, Value>,
111 ) -> PipeResult<BoxStream<'static, StreamingEvent>> {
112 let rules = self.rules.clone();
113
114 let mapped = stream::unfold((input, false), move |(mut input, mut ended)| {
115 let rules = rules.clone();
116 async move {
117 if ended {
118 return None;
119 }
120
121 while let Some(item) = input.next().await {
122 match item {
123 Ok(frame) => {
124 for r in &rules {
125 if r.matcher.matches(&frame) {
126 if let Some(ev) = RuleBasedEventMapper::build_event(
127 &r.emit, &frame, &r.extract,
128 ) {
129 return Some((Ok(ev), (input, ended)));
130 }
131 }
134 }
135
136 continue;
140 }
141 Err(e) => return Some((Err(e), (input, ended))),
142 }
143 }
144
145 ended = true;
147 Some((
148 Ok(StreamingEvent::StreamEnd {
149 finish_reason: None,
150 }),
151 (input, ended),
152 ))
153 }
154 });
155
156 Ok(Box::pin(mapped))
157 }
158}
159
/// Stateless mapper for chunks shaped like OpenAI chat streams.
///
/// Reads `$.choices[0].delta.content` and `$.usage` from each frame and appends a final
/// `StreamEnd` when the input ends.
pub struct OpenAiStyleEventMapper;
162
163#[async_trait::async_trait]
164impl Mapper for OpenAiStyleEventMapper {
165 async fn map(
166 &self,
167 input: BoxStream<'static, Value>,
168 ) -> PipeResult<BoxStream<'static, StreamingEvent>> {
169 let stream = stream::unfold((input, false), move |(mut input, mut ended)| async move {
170 if ended {
171 return None;
172 }
173
174 while let Some(item) = input.next().await {
175 match item {
176 Ok(frame) => {
177 if let Some(content) = crate::utils::PathMapper::get_string(
179 &frame,
180 "$.choices[0].delta.content",
181 ) {
182 if !content.is_empty() {
183 return Some((
184 Ok(StreamingEvent::PartialContentDelta {
185 content,
186 sequence_id: None,
187 }),
188 (input, ended),
189 ));
190 }
191 }
192
193 if let Some(usage) =
195 crate::utils::PathMapper::get_path(&frame, "$.usage").cloned()
196 {
197 return Some((
198 Ok(StreamingEvent::Metadata {
199 usage: Some(usage),
200 finish_reason: None,
201 stop_reason: None,
202 }),
203 (input, ended),
204 ));
205 }
206
207 continue;
208 }
209 Err(e) => return Some((Err(e), (input, ended))),
210 }
211 }
212
213 ended = true;
214 Some((
215 Ok(StreamingEvent::StreamEnd {
216 finish_reason: None,
217 }),
218 (input, ended),
219 ))
220 });
221
222 Ok(Box::pin(stream))
223 }
224}
225
226pub fn create_event_mapper(rules: &[EventMapRule]) -> Result<Box<dyn Mapper>, PipelineError> {
227 Ok(Box::new(RuleBasedEventMapper::new(rules)?))
228}
229
230pub struct PathEventMapper {
236 content_path: String,
237 tool_call_path: String,
238 usage_path: String,
239 tool_use: Option<ToolUseMapping>,
240}
241
242impl PathEventMapper {
243 pub fn new(
244 content_path: Option<String>,
245 tool_call_path: Option<String>,
246 usage_path: Option<String>,
247 tool_use: Option<ToolUseMapping>,
248 ) -> Self {
249 Self {
250 content_path: content_path.unwrap_or_else(|| "$.choices[0].delta.content".to_string()),
251 tool_call_path: tool_call_path
252 .unwrap_or_else(|| "$.choices[0].delta.tool_calls".to_string()),
253 usage_path: usage_path.unwrap_or_else(|| "$.usage".to_string()),
254 tool_use,
255 }
256 }
257}
258
/// True only when the `AI_LIB_DEBUG_TOOLCALL` environment variable is exactly `"1"`.
fn debug_toolcall_enabled() -> bool {
    matches!(std::env::var("AI_LIB_DEBUG_TOOLCALL").as_deref(), Ok("1"))
}
262
263fn extract_toolcall_id(tc: &Value) -> Option<String> {
264 crate::utils::PathMapper::get_string(tc, "id")
265 .or_else(|| crate::utils::PathMapper::get_string(tc, "tool_call_id"))
266 .or_else(|| crate::utils::PathMapper::get_string(tc, "delta.id"))
267 .or_else(|| crate::utils::PathMapper::get_string(tc, "delta.tool_call_id"))
268}
269
270fn extract_toolcall_name(tc: &Value) -> Option<String> {
271 crate::utils::PathMapper::get_string(tc, "function.name")
272 .or_else(|| crate::utils::PathMapper::get_string(tc, "name"))
273 .or_else(|| crate::utils::PathMapper::get_string(tc, "delta.function.name"))
274 .or_else(|| crate::utils::PathMapper::get_string(tc, "delta.name"))
275}
276
277fn extract_toolcall_arguments(tc: &Value) -> Option<String> {
278 if let Some(v) = crate::utils::PathMapper::get_path(tc, "function.arguments") {
285 if let Some(s) = v.as_str() {
286 return Some(s.to_string());
287 }
288 if v.is_object() || v.is_array() {
289 return serde_json::to_string(v).ok();
290 }
291 }
292 if let Some(v) = crate::utils::PathMapper::get_path(tc, "arguments") {
293 if let Some(s) = v.as_str() {
294 return Some(s.to_string());
295 }
296 if v.is_object() || v.is_array() {
297 return serde_json::to_string(v).ok();
298 }
299 }
300 if let Some(v) = crate::utils::PathMapper::get_path(tc, "delta.function.arguments") {
301 if let Some(s) = v.as_str() {
302 return Some(s.to_string());
303 }
304 if v.is_object() || v.is_array() {
305 return serde_json::to_string(v).ok();
306 }
307 }
308 if let Some(v) = crate::utils::PathMapper::get_path(tc, "delta.arguments") {
309 if let Some(s) = v.as_str() {
310 return Some(s.to_string());
311 }
312 if v.is_object() || v.is_array() {
313 return serde_json::to_string(v).ok();
314 }
315 }
316 None
317}
318
319fn extract_by_tooling(
320 tc: &Value,
321 tool_use: &ToolUseMapping,
322) -> (Option<String>, Option<String>, Option<String>) {
323 let id = tool_use
324 .id_path
325 .as_deref()
326 .and_then(|p| crate::utils::PathMapper::get_string(tc, p));
327 let name = tool_use
328 .name_path
329 .as_deref()
330 .and_then(|p| crate::utils::PathMapper::get_string(tc, p));
331 let args = tool_use.input_path.as_deref().and_then(|p| {
332 let v = crate::utils::PathMapper::get_path(tc, p)?;
333 if let Some(s) = v.as_str() {
334 Some(s.to_string())
335 } else if v.is_object() || v.is_array() {
336 serde_json::to_string(v).ok()
337 } else {
338 serde_json::to_string(v).ok()
339 }
340 });
341 (id, name, args)
342}
343
344#[async_trait::async_trait]
345impl Mapper for PathEventMapper {
346 async fn map(
347 &self,
348 input: BoxStream<'static, Value>,
349 ) -> PipeResult<BoxStream<'static, StreamingEvent>> {
350 let content_path = self.content_path.clone();
351 let tool_call_path = self.tool_call_path.clone();
352 let usage_path = self.usage_path.clone();
353 let tool_use = self.tool_use.clone();
354
355 let stream = stream::unfold(
357 (
358 input,
359 VecDeque::<StreamingEvent>::new(),
360 false,
361 HashSet::<String>::new(),
362 HashMap::<u32, String>::new(),
363 ),
364 move |(mut input, mut q, mut ended, mut started_ids, mut index_to_id)| {
365 let content_path = content_path.clone();
366 let tool_call_path = tool_call_path.clone();
367 let usage_path = usage_path.clone();
368 let tool_use = tool_use.clone();
369 async move {
370 if let Some(ev) = q.pop_front() {
371 return Some((Ok(ev), (input, q, ended, started_ids, index_to_id)));
372 }
373 if ended {
374 return None;
375 }
376
377 while let Some(item) = input.next().await {
378 match item {
379 Ok(frame) => {
380 if let Some(content) =
382 crate::utils::PathMapper::get_string(&frame, &content_path)
383 {
384 if !content.is_empty() {
385 q.push_back(StreamingEvent::PartialContentDelta {
386 content,
387 sequence_id: None,
388 });
389 }
390 }
391
392 if let Some(usage) =
394 crate::utils::PathMapper::get_path(&frame, &usage_path).cloned()
395 {
396 q.push_back(StreamingEvent::Metadata {
397 usage: Some(usage),
398 finish_reason: None,
399 stop_reason: None,
400 });
401 }
402
403 if let Some(tc_val) =
405 crate::utils::PathMapper::get_path(&frame, &tool_call_path)
406 {
407 if debug_toolcall_enabled() {
408 debug!(
409 tool_call_path = tool_call_path.as_str(),
410 tool_call_delta = %tc_val,
411 frame = %frame,
412 "tool_call delta observed"
413 );
414 }
415 if let Some(arr) = tc_val.as_array() {
416 for (idx, tc) in arr.iter().enumerate() {
417 let tc_index: u32 =
419 crate::utils::PathMapper::get_path(tc, "index")
420 .and_then(|v| v.as_u64())
421 .map(|v| v as u32)
422 .unwrap_or(idx as u32);
423
424 let (mut id, mut name, mut args) =
426 if let Some(ref tu) = tool_use {
427 extract_by_tooling(tc, tu)
428 } else {
429 (None, None, None)
430 };
431
432 if id.is_none() {
434 id = extract_toolcall_id(tc);
435 }
436 if name.is_none() {
437 name = extract_toolcall_name(tc);
438 }
439 if args.is_none() {
440 args = extract_toolcall_arguments(tc);
441 }
442
443 if let Some(ref real_id) = id {
445 index_to_id.insert(tc_index, real_id.clone());
446 } else {
447 id = index_to_id.get(&tc_index).cloned();
449 }
450
451 if let (Some(id), Some(name)) =
452 (id.clone(), name.clone())
453 {
454 if !started_ids.contains(&id) {
455 started_ids.insert(id.clone());
456 q.push_back(StreamingEvent::ToolCallStarted {
457 tool_call_id: id.clone(),
458 tool_name: name,
459 index: Some(tc_index),
460 });
461 }
462 }
463
464 if let (Some(id), Some(arguments)) = (id, args) {
465 q.push_back(StreamingEvent::PartialToolCall {
466 tool_call_id: id,
467 arguments,
468 index: Some(tc_index),
469 is_complete: None,
470 });
471 }
472 }
473 }
474 }
475
476 if let Some(ev) = q.pop_front() {
477 return Some((
478 Ok(ev),
479 (input, q, ended, started_ids, index_to_id),
480 ));
481 }
482 continue;
483 }
484 Err(e) => {
485 return Some((Err(e), (input, q, ended, started_ids, index_to_id)))
486 }
487 }
488 }
489
490 ended = true;
491 Some((
492 Ok(StreamingEvent::StreamEnd {
493 finish_reason: None,
494 }),
495 (input, q, ended, started_ids, index_to_id),
496 ))
497 }
498 },
499 );
500
501 Ok(Box::pin(stream))
502 }
503}