1use crate::pipeline::{Mapper, PipelineError};
10use crate::protocol::EventMapRule;
11use crate::protocol::ToolUseMapping;
12use crate::types::events::StreamingEvent;
13use crate::utils::JsonPathEvaluator;
14use crate::{BoxStream, PipeResult};
15use futures::{stream, StreamExt};
16use serde_json::Value;
17use std::collections::{HashMap, HashSet, VecDeque};
18use tracing::debug;
19
20#[derive(Clone)]
21struct CompiledRule {
22 matcher: JsonPathEvaluator,
23 emit: String,
24 extract: Vec<(String, String)>, }
26
27pub struct RuleBasedEventMapper {
28 rules: Vec<CompiledRule>,
29}
30
31impl RuleBasedEventMapper {
32 pub fn new(rules: &[EventMapRule]) -> Result<Self, PipelineError> {
33 let mut compiled = Vec::new();
34 for r in rules {
35 let matcher = JsonPathEvaluator::new(&r.match_expr).map_err(|e| {
36 PipelineError::InvalidJsonPath {
37 path: r.match_expr.clone(),
38 error: e.to_string(),
39 hint: None,
40 }
41 })?;
42 let mut extract = Vec::new();
43 if let Some(map) = &r.fields {
44 for (k, v) in map {
45 let k: &String = k;
46 let v: &String = v;
47 extract.push((k.clone(), v.clone()));
48 }
49 }
50 compiled.push(CompiledRule {
51 matcher,
52 emit: r.emit.clone(),
53 extract,
54 });
55 }
56 Ok(Self { rules: compiled })
57 }
58
59 fn build_event(
60 emit: &str,
61 frame: &Value,
62 extract: &[(String, String)],
63 ) -> Option<StreamingEvent> {
64 match emit {
65 "PartialContentDelta" => {
66 let mut content: Option<String> = None;
68 for (k, p) in extract {
69 if k == "content" {
70 content = crate::utils::PathMapper::get_string(frame, p);
71 }
72 }
73 let content = content.or_else(|| {
74 crate::utils::PathMapper::get_string(frame, "$.choices[0].delta.content")
75 })?;
76
77 if content.is_empty() {
81 return None;
82 }
83
84 Some(StreamingEvent::PartialContentDelta {
85 content,
86 sequence_id: None,
87 })
88 }
89 "Metadata" => {
90 let usage = crate::utils::PathMapper::get_path(frame, "$.usage").cloned();
92 Some(StreamingEvent::Metadata {
93 usage,
94 finish_reason: None,
95 stop_reason: None,
96 })
97 }
98 "ThinkingDelta" => {
99 let mut thinking: Option<String> = None;
100 for (k, p) in extract {
101 if k == "thinking" {
102 thinking = crate::utils::PathMapper::get_string(frame, p);
103 }
104 }
105 let thinking = thinking.or_else(|| {
106 crate::utils::PathMapper::get_string(
107 frame,
108 "$.choices[0].delta.reasoning_content",
109 )
110 })?;
111 if thinking.is_empty() {
112 return None;
113 }
114 Some(StreamingEvent::ThinkingDelta {
115 thinking,
116 tool_consideration: None,
117 })
118 }
119 "StreamEnd" => {
120 let mut finish: Option<String> = None;
121 for (k, p) in extract {
122 if k == "finish_reason" {
123 finish = crate::utils::PathMapper::get_string(frame, p);
124 }
125 }
126 let finish_reason = finish.or_else(|| {
127 crate::utils::PathMapper::get_string(frame, "$.choices[0].finish_reason")
128 });
129 Some(StreamingEvent::StreamEnd { finish_reason })
130 }
131 _ => None,
132 }
133 }
134}
135
136#[async_trait::async_trait]
137impl Mapper for RuleBasedEventMapper {
138 async fn map(
139 &self,
140 input: BoxStream<'static, Value>,
141 ) -> PipeResult<BoxStream<'static, StreamingEvent>> {
142 let rules = self.rules.clone();
143
144 let mapped = stream::unfold((input, false), move |(mut input, mut ended)| {
145 let rules = rules.clone();
146 async move {
147 if ended {
148 return None;
149 }
150
151 while let Some(item) = input.next().await {
152 match item {
153 Ok(frame) => {
154 for r in &rules {
155 if r.matcher.matches(&frame) {
156 if let Some(ev) = RuleBasedEventMapper::build_event(
157 &r.emit, &frame, &r.extract,
158 ) {
159 return Some((Ok(ev), (input, ended)));
160 }
161 }
164 }
165
166 continue;
170 }
171 Err(e) => return Some((Err(e), (input, ended))),
172 }
173 }
174
175 ended = true;
177 Some((
178 Ok(StreamingEvent::StreamEnd {
179 finish_reason: None,
180 }),
181 (input, ended),
182 ))
183 }
184 });
185
186 Ok(Box::pin(mapped))
187 }
188}
189
/// Stateless mapper whose `Mapper` implementation reads the fixed paths
/// `$.choices[0].delta.content`, `$.choices[0].delta.reasoning_content`,
/// `$.usage` and `$.choices[0].finish_reason` from each frame.
pub struct OpenAiStyleEventMapper;
192
193#[async_trait::async_trait]
194impl Mapper for OpenAiStyleEventMapper {
195 async fn map(
196 &self,
197 input: BoxStream<'static, Value>,
198 ) -> PipeResult<BoxStream<'static, StreamingEvent>> {
199 let stream = stream::unfold((input, false), move |(mut input, mut ended)| async move {
200 if ended {
201 return None;
202 }
203
204 while let Some(item) = input.next().await {
205 match item {
206 Ok(frame) => {
207 if let Some(content) = crate::utils::PathMapper::get_string(
209 &frame,
210 "$.choices[0].delta.content",
211 ) {
212 if !content.is_empty() {
213 return Some((
214 Ok(StreamingEvent::PartialContentDelta {
215 content,
216 sequence_id: None,
217 }),
218 (input, ended),
219 ));
220 }
221 }
222
223 if let Some(thinking) = crate::utils::PathMapper::get_string(
224 &frame,
225 "$.choices[0].delta.reasoning_content",
226 ) {
227 if !thinking.is_empty() {
228 return Some((
229 Ok(StreamingEvent::ThinkingDelta {
230 thinking,
231 tool_consideration: None,
232 }),
233 (input, ended),
234 ));
235 }
236 }
237
238 if let Some(usage) =
240 crate::utils::PathMapper::get_path(&frame, "$.usage").cloned()
241 {
242 return Some((
243 Ok(StreamingEvent::Metadata {
244 usage: Some(usage),
245 finish_reason: None,
246 stop_reason: None,
247 }),
248 (input, ended),
249 ));
250 }
251
252 if let Some(reason) = crate::utils::PathMapper::get_string(
253 &frame,
254 "$.choices[0].finish_reason",
255 ) {
256 if !reason.is_empty() {
257 return Some((
258 Ok(StreamingEvent::StreamEnd {
259 finish_reason: Some(reason),
260 }),
261 (input, ended),
262 ));
263 }
264 }
265
266 continue;
267 }
268 Err(e) => return Some((Err(e), (input, ended))),
269 }
270 }
271
272 ended = true;
273 Some((
274 Ok(StreamingEvent::StreamEnd {
275 finish_reason: None,
276 }),
277 (input, ended),
278 ))
279 });
280
281 Ok(Box::pin(stream))
282 }
283}
284
285pub fn create_event_mapper(rules: &[EventMapRule]) -> Result<Box<dyn Mapper>, PipelineError> {
286 Ok(Box::new(RuleBasedEventMapper::new(rules)?))
287}
288
289pub struct PathEventMapper {
295 content_path: String,
296 tool_call_path: String,
297 usage_path: String,
298 tool_use: Option<ToolUseMapping>,
299}
300
301impl PathEventMapper {
302 pub fn new(
303 content_path: Option<String>,
304 tool_call_path: Option<String>,
305 usage_path: Option<String>,
306 tool_use: Option<ToolUseMapping>,
307 ) -> Self {
308 Self {
309 content_path: content_path.unwrap_or_else(|| "$.choices[0].delta.content".to_string()),
310 tool_call_path: tool_call_path
311 .unwrap_or_else(|| "$.choices[0].delta.tool_calls".to_string()),
312 usage_path: usage_path.unwrap_or_else(|| "$.usage".to_string()),
313 tool_use,
314 }
315 }
316}
317
/// Returns `true` only when the `AI_LIB_DEBUG_TOOLCALL` environment variable
/// is set to exactly `1`; unset, non-Unicode or any other value yields `false`.
fn debug_toolcall_enabled() -> bool {
    matches!(std::env::var("AI_LIB_DEBUG_TOOLCALL"), Ok(value) if value == "1")
}
321
322fn extract_toolcall_id(tc: &Value) -> Option<String> {
323 crate::utils::PathMapper::get_string(tc, "id")
324 .or_else(|| crate::utils::PathMapper::get_string(tc, "tool_call_id"))
325 .or_else(|| crate::utils::PathMapper::get_string(tc, "delta.id"))
326 .or_else(|| crate::utils::PathMapper::get_string(tc, "delta.tool_call_id"))
327}
328
329fn extract_toolcall_name(tc: &Value) -> Option<String> {
330 crate::utils::PathMapper::get_string(tc, "function.name")
331 .or_else(|| crate::utils::PathMapper::get_string(tc, "name"))
332 .or_else(|| crate::utils::PathMapper::get_string(tc, "delta.function.name"))
333 .or_else(|| crate::utils::PathMapper::get_string(tc, "delta.name"))
334}
335
336fn extract_toolcall_arguments(tc: &Value) -> Option<String> {
337 if let Some(v) = crate::utils::PathMapper::get_path(tc, "function.arguments") {
344 if let Some(s) = v.as_str() {
345 return Some(s.to_string());
346 }
347 if v.is_object() || v.is_array() {
348 return serde_json::to_string(v).ok();
349 }
350 }
351 if let Some(v) = crate::utils::PathMapper::get_path(tc, "arguments") {
352 if let Some(s) = v.as_str() {
353 return Some(s.to_string());
354 }
355 if v.is_object() || v.is_array() {
356 return serde_json::to_string(v).ok();
357 }
358 }
359 if let Some(v) = crate::utils::PathMapper::get_path(tc, "delta.function.arguments") {
360 if let Some(s) = v.as_str() {
361 return Some(s.to_string());
362 }
363 if v.is_object() || v.is_array() {
364 return serde_json::to_string(v).ok();
365 }
366 }
367 if let Some(v) = crate::utils::PathMapper::get_path(tc, "delta.arguments") {
368 if let Some(s) = v.as_str() {
369 return Some(s.to_string());
370 }
371 if v.is_object() || v.is_array() {
372 return serde_json::to_string(v).ok();
373 }
374 }
375 None
376}
377
378fn extract_by_tooling(
379 tc: &Value,
380 tool_use: &ToolUseMapping,
381) -> (Option<String>, Option<String>, Option<String>) {
382 let id = tool_use
383 .id_path
384 .as_deref()
385 .and_then(|p| crate::utils::PathMapper::get_string(tc, p));
386 let name = tool_use
387 .name_path
388 .as_deref()
389 .and_then(|p| crate::utils::PathMapper::get_string(tc, p));
390 let args = tool_use.input_path.as_deref().and_then(|p| {
391 let v = crate::utils::PathMapper::get_path(tc, p)?;
392 if let Some(s) = v.as_str() {
393 Some(s.to_string())
394 } else {
395 serde_json::to_string(v).ok()
396 }
397 });
398 (id, name, args)
399}
400
401#[async_trait::async_trait]
402impl Mapper for PathEventMapper {
403 async fn map(
404 &self,
405 input: BoxStream<'static, Value>,
406 ) -> PipeResult<BoxStream<'static, StreamingEvent>> {
407 let content_path = self.content_path.clone();
408 let tool_call_path = self.tool_call_path.clone();
409 let usage_path = self.usage_path.clone();
410 let tool_use = self.tool_use.clone();
411
412 let stream = stream::unfold(
414 (
415 input,
416 VecDeque::<StreamingEvent>::new(),
417 false,
418 HashSet::<String>::new(),
419 HashMap::<u32, String>::new(),
420 ),
421 move |(mut input, mut q, mut ended, mut started_ids, mut index_to_id)| {
422 let content_path = content_path.clone();
423 let tool_call_path = tool_call_path.clone();
424 let usage_path = usage_path.clone();
425 let tool_use = tool_use.clone();
426 async move {
427 if let Some(ev) = q.pop_front() {
428 return Some((Ok(ev), (input, q, ended, started_ids, index_to_id)));
429 }
430 if ended {
431 return None;
432 }
433
434 while let Some(item) = input.next().await {
435 match item {
436 Ok(frame) => {
437 if let Some(content) =
439 crate::utils::PathMapper::get_string(&frame, &content_path)
440 {
441 if !content.is_empty() {
442 q.push_back(StreamingEvent::PartialContentDelta {
443 content,
444 sequence_id: None,
445 });
446 }
447 }
448
449 if let Some(usage) =
451 crate::utils::PathMapper::get_path(&frame, &usage_path).cloned()
452 {
453 q.push_back(StreamingEvent::Metadata {
454 usage: Some(usage),
455 finish_reason: None,
456 stop_reason: None,
457 });
458 }
459
460 if let Some(tc_val) =
462 crate::utils::PathMapper::get_path(&frame, &tool_call_path)
463 {
464 if debug_toolcall_enabled() {
465 debug!(
466 tool_call_path = tool_call_path.as_str(),
467 tool_call_delta = %tc_val,
468 frame = %frame,
469 "tool_call delta observed"
470 );
471 }
472 if let Some(arr) = tc_val.as_array() {
473 for (idx, tc) in arr.iter().enumerate() {
474 let tc_index: u32 =
476 crate::utils::PathMapper::get_path(tc, "index")
477 .and_then(|v| v.as_u64())
478 .map(|v| v as u32)
479 .unwrap_or(idx as u32);
480
481 let (mut id, mut name, mut args) =
483 if let Some(ref tu) = tool_use {
484 extract_by_tooling(tc, tu)
485 } else {
486 (None, None, None)
487 };
488
489 if id.is_none() {
491 id = extract_toolcall_id(tc);
492 }
493 if name.is_none() {
494 name = extract_toolcall_name(tc);
495 }
496 if args.is_none() {
497 args = extract_toolcall_arguments(tc);
498 }
499
500 if let Some(ref real_id) = id {
502 index_to_id.insert(tc_index, real_id.clone());
503 } else {
504 id = index_to_id.get(&tc_index).cloned();
506 }
507
508 if let (Some(id), Some(name)) =
509 (id.clone(), name.clone())
510 {
511 if !started_ids.contains(&id) {
512 started_ids.insert(id.clone());
513 q.push_back(StreamingEvent::ToolCallStarted {
514 tool_call_id: id.clone(),
515 tool_name: name,
516 index: Some(tc_index),
517 });
518 }
519 }
520
521 if let (Some(id), Some(arguments)) = (id, args) {
522 q.push_back(StreamingEvent::PartialToolCall {
523 tool_call_id: id,
524 arguments,
525 index: Some(tc_index),
526 is_complete: None,
527 });
528 }
529 }
530 }
531 }
532
533 if let Some(thinking) = crate::utils::PathMapper::get_string(
534 &frame,
535 "$.choices[0].delta.reasoning_content",
536 ) {
537 if !thinking.is_empty() {
538 q.push_back(StreamingEvent::ThinkingDelta {
539 thinking,
540 tool_consideration: None,
541 });
542 }
543 }
544
545 if let Some(reason) = crate::utils::PathMapper::get_string(
546 &frame,
547 "$.choices[0].finish_reason",
548 ) {
549 if !reason.is_empty() {
550 q.push_back(StreamingEvent::StreamEnd {
551 finish_reason: Some(reason),
552 });
553 }
554 }
555
556 if let Some(ev) = q.pop_front() {
557 return Some((
558 Ok(ev),
559 (input, q, ended, started_ids, index_to_id),
560 ));
561 }
562 continue;
563 }
564 Err(e) => {
565 return Some((Err(e), (input, q, ended, started_ids, index_to_id)))
566 }
567 }
568 }
569
570 ended = true;
571 Some((
572 Ok(StreamingEvent::StreamEnd {
573 finish_reason: None,
574 }),
575 (input, q, ended, started_ids, index_to_id),
576 ))
577 }
578 },
579 );
580
581 Ok(Box::pin(stream))
582 }
583}