Skip to main content

ai_lib_rust/pipeline/
event_map.rs

//! Event mapping: converts JSON frames (`serde_json::Value`) into unified `StreamingEvent`s.
//!
//! Two modes:
//! - If the manifest provides `streaming.event_map`, rule-based mapping is used.
//! - Otherwise, the pipeline falls back to a built-in adapter mapping (e.g. OpenAI-style).
8
9use crate::pipeline::{Mapper, PipelineError};
10use crate::protocol::EventMapRule;
11use crate::protocol::ToolUseMapping;
12use crate::types::events::StreamingEvent;
13use crate::utils::JsonPathEvaluator;
14use crate::{BoxStream, PipeResult};
15use futures::{stream, StreamExt};
16use serde_json::Value;
17use std::collections::{HashMap, HashSet, VecDeque};
18use tracing::debug;
19
20#[derive(Clone)]
21struct CompiledRule {
22    matcher: JsonPathEvaluator,
23    emit: String,
24    extract: Vec<(String, String)>, // (field_name, json_path)
25}
26
27pub struct RuleBasedEventMapper {
28    rules: Vec<CompiledRule>,
29}
30
31impl RuleBasedEventMapper {
32    pub fn new(rules: &[EventMapRule]) -> Result<Self, PipelineError> {
33        let mut compiled = Vec::new();
34        for r in rules {
35            let matcher = JsonPathEvaluator::new(&r.match_expr).map_err(|e| {
36                PipelineError::InvalidJsonPath {
37                    path: r.match_expr.clone(),
38                    error: e.to_string(),
39                    hint: None,
40                }
41            })?;
42            let mut extract = Vec::new();
43            if let Some(map) = &r.fields {
44                for (k, v) in map {
45                    let k: &String = k;
46                    let v: &String = v;
47                    extract.push((k.clone(), v.clone()));
48                }
49            }
50            compiled.push(CompiledRule {
51                matcher,
52                emit: r.emit.clone(),
53                extract,
54            });
55        }
56        Ok(Self { rules: compiled })
57    }
58
59    fn build_event(
60        emit: &str,
61        frame: &Value,
62        extract: &[(String, String)],
63    ) -> Option<StreamingEvent> {
64        match emit {
65            "PartialContentDelta" => {
66                // Expect extracted `content` or infer from common openai path
67                let mut content: Option<String> = None;
68                for (k, p) in extract {
69                    if k == "content" {
70                        content = crate::utils::PathMapper::get_string(frame, p);
71                    }
72                }
73                let content = content.or_else(|| {
74                    crate::utils::PathMapper::get_string(frame, "$.choices[0].delta.content")
75                })?;
76
77                // Filter out empty content to avoid unnecessary events and ensure consistency
78                // with fallback mapper behavior. Empty content can occur when providers send
79                // frames with null/empty delta.content (e.g., during tool calls or finish events).
80                if content.is_empty() {
81                    return None;
82                }
83
84                Some(StreamingEvent::PartialContentDelta {
85                    content,
86                    sequence_id: None,
87                })
88            }
89            "Metadata" => {
90                // usage optional
91                let usage = crate::utils::PathMapper::get_path(frame, "$.usage").cloned();
92                Some(StreamingEvent::Metadata {
93                    usage,
94                    finish_reason: None,
95                    stop_reason: None,
96                })
97            }
98            "StreamEnd" => Some(StreamingEvent::StreamEnd {
99                finish_reason: None,
100            }),
101            _ => None,
102        }
103    }
104}
105
106#[async_trait::async_trait]
107impl Mapper for RuleBasedEventMapper {
108    async fn map(
109        &self,
110        input: BoxStream<'static, Value>,
111    ) -> PipeResult<BoxStream<'static, StreamingEvent>> {
112        let rules = self.rules.clone();
113
114        let mapped = stream::unfold((input, false), move |(mut input, mut ended)| {
115            let rules = rules.clone();
116            async move {
117                if ended {
118                    return None;
119                }
120
121                while let Some(item) = input.next().await {
122                    match item {
123                        Ok(frame) => {
124                            for r in &rules {
125                                if r.matcher.matches(&frame) {
126                                    if let Some(ev) = RuleBasedEventMapper::build_event(
127                                        &r.emit, &frame, &r.extract,
128                                    ) {
129                                        return Some((Ok(ev), (input, ended)));
130                                    }
131                                    // Rule matched but build_event returned None (e.g., empty content filtered)
132                                    // This is expected behavior, continue to next rule or frame
133                                }
134                            }
135
136                            // If no rule matched, skip this frame silently
137                            // This is normal for frames that don't match any event pattern
138                            // (e.g., ping frames, metadata-only frames, etc.)
139                            continue;
140                        }
141                        Err(e) => return Some((Err(e), (input, ended))),
142                    }
143                }
144
145                // EOF: emit StreamEnd exactly once
146                ended = true;
147                Some((
148                    Ok(StreamingEvent::StreamEnd {
149                        finish_reason: None,
150                    }),
151                    (input, ended),
152                ))
153            }
154        });
155
156        Ok(Box::pin(mapped))
157    }
158}
159
160/// Fallback openai-style mapping when no event_map rules are provided.
161pub struct OpenAiStyleEventMapper;
162
163#[async_trait::async_trait]
164impl Mapper for OpenAiStyleEventMapper {
165    async fn map(
166        &self,
167        input: BoxStream<'static, Value>,
168    ) -> PipeResult<BoxStream<'static, StreamingEvent>> {
169        let stream = stream::unfold((input, false), move |(mut input, mut ended)| async move {
170            if ended {
171                return None;
172            }
173
174            while let Some(item) = input.next().await {
175                match item {
176                    Ok(frame) => {
177                        // content delta
178                        if let Some(content) = crate::utils::PathMapper::get_string(
179                            &frame,
180                            "$.choices[0].delta.content",
181                        ) {
182                            if !content.is_empty() {
183                                return Some((
184                                    Ok(StreamingEvent::PartialContentDelta {
185                                        content,
186                                        sequence_id: None,
187                                    }),
188                                    (input, ended),
189                                ));
190                            }
191                        }
192
193                        // usage metadata (rare in streaming but possible)
194                        if let Some(usage) =
195                            crate::utils::PathMapper::get_path(&frame, "$.usage").cloned()
196                        {
197                            return Some((
198                                Ok(StreamingEvent::Metadata {
199                                    usage: Some(usage),
200                                    finish_reason: None,
201                                    stop_reason: None,
202                                }),
203                                (input, ended),
204                            ));
205                        }
206
207                        continue;
208                    }
209                    Err(e) => return Some((Err(e), (input, ended))),
210                }
211            }
212
213            ended = true;
214            Some((
215                Ok(StreamingEvent::StreamEnd {
216                    finish_reason: None,
217                }),
218                (input, ended),
219            ))
220        });
221
222        Ok(Box::pin(stream))
223    }
224}
225
226pub fn create_event_mapper(rules: &[EventMapRule]) -> Result<Box<dyn Mapper>, PipelineError> {
227    Ok(Box::new(RuleBasedEventMapper::new(rules)?))
228}
229
230/// Manifest-driven path mapper for streaming frames.
231/// Supports:
232/// - content_path (text deltas)
233/// - tool_call_path (OpenAI-style tool_calls delta array)
234/// - usage_path (usage metadata)
235pub struct PathEventMapper {
236    content_path: String,
237    tool_call_path: String,
238    usage_path: String,
239    tool_use: Option<ToolUseMapping>,
240}
241
242impl PathEventMapper {
243    pub fn new(
244        content_path: Option<String>,
245        tool_call_path: Option<String>,
246        usage_path: Option<String>,
247        tool_use: Option<ToolUseMapping>,
248    ) -> Self {
249        Self {
250            content_path: content_path.unwrap_or_else(|| "$.choices[0].delta.content".to_string()),
251            tool_call_path: tool_call_path
252                .unwrap_or_else(|| "$.choices[0].delta.tool_calls".to_string()),
253            usage_path: usage_path.unwrap_or_else(|| "$.usage".to_string()),
254            tool_use,
255        }
256    }
257}
258
/// Reports whether verbose tool-call logging is switched on.
///
/// True only when the `AI_LIB_DEBUG_TOOLCALL` environment variable is set to exactly `1`;
/// an unset or unreadable variable counts as disabled.
fn debug_toolcall_enabled() -> bool {
    match std::env::var("AI_LIB_DEBUG_TOOLCALL") {
        Ok(flag) => flag == "1",
        Err(_) => false,
    }
}
262
263fn extract_toolcall_id(tc: &Value) -> Option<String> {
264    crate::utils::PathMapper::get_string(tc, "id")
265        .or_else(|| crate::utils::PathMapper::get_string(tc, "tool_call_id"))
266        .or_else(|| crate::utils::PathMapper::get_string(tc, "delta.id"))
267        .or_else(|| crate::utils::PathMapper::get_string(tc, "delta.tool_call_id"))
268}
269
270fn extract_toolcall_name(tc: &Value) -> Option<String> {
271    crate::utils::PathMapper::get_string(tc, "function.name")
272        .or_else(|| crate::utils::PathMapper::get_string(tc, "name"))
273        .or_else(|| crate::utils::PathMapper::get_string(tc, "delta.function.name"))
274        .or_else(|| crate::utils::PathMapper::get_string(tc, "delta.name"))
275}
276
277fn extract_toolcall_arguments(tc: &Value) -> Option<String> {
278    // Common variants:
279    // - function.arguments: string
280    // - arguments: string
281    // - delta.function.arguments: string
282    // - delta.arguments: string
283    // Sometimes providers may emit object already; stringify it.
284    if let Some(v) = crate::utils::PathMapper::get_path(tc, "function.arguments") {
285        if let Some(s) = v.as_str() {
286            return Some(s.to_string());
287        }
288        if v.is_object() || v.is_array() {
289            return serde_json::to_string(v).ok();
290        }
291    }
292    if let Some(v) = crate::utils::PathMapper::get_path(tc, "arguments") {
293        if let Some(s) = v.as_str() {
294            return Some(s.to_string());
295        }
296        if v.is_object() || v.is_array() {
297            return serde_json::to_string(v).ok();
298        }
299    }
300    if let Some(v) = crate::utils::PathMapper::get_path(tc, "delta.function.arguments") {
301        if let Some(s) = v.as_str() {
302            return Some(s.to_string());
303        }
304        if v.is_object() || v.is_array() {
305            return serde_json::to_string(v).ok();
306        }
307    }
308    if let Some(v) = crate::utils::PathMapper::get_path(tc, "delta.arguments") {
309        if let Some(s) = v.as_str() {
310            return Some(s.to_string());
311        }
312        if v.is_object() || v.is_array() {
313            return serde_json::to_string(v).ok();
314        }
315    }
316    None
317}
318
319fn extract_by_tooling(
320    tc: &Value,
321    tool_use: &ToolUseMapping,
322) -> (Option<String>, Option<String>, Option<String>) {
323    let id = tool_use
324        .id_path
325        .as_deref()
326        .and_then(|p| crate::utils::PathMapper::get_string(tc, p));
327    let name = tool_use
328        .name_path
329        .as_deref()
330        .and_then(|p| crate::utils::PathMapper::get_string(tc, p));
331    let args = tool_use.input_path.as_deref().and_then(|p| {
332        let v = crate::utils::PathMapper::get_path(tc, p)?;
333        if let Some(s) = v.as_str() {
334            Some(s.to_string())
335        } else if v.is_object() || v.is_array() {
336            serde_json::to_string(v).ok()
337        } else {
338            serde_json::to_string(v).ok()
339        }
340    });
341    (id, name, args)
342}
343
344#[async_trait::async_trait]
345impl Mapper for PathEventMapper {
346    async fn map(
347        &self,
348        input: BoxStream<'static, Value>,
349    ) -> PipeResult<BoxStream<'static, StreamingEvent>> {
350        let content_path = self.content_path.clone();
351        let tool_call_path = self.tool_call_path.clone();
352        let usage_path = self.usage_path.clone();
353        let tool_use = self.tool_use.clone();
354
355        // State is local to each stream to avoid cross-request contamination.
356        let stream = stream::unfold(
357            (
358                input,
359                VecDeque::<StreamingEvent>::new(),
360                false,
361                HashSet::<String>::new(),
362                HashMap::<u32, String>::new(),
363            ),
364            move |(mut input, mut q, mut ended, mut started_ids, mut index_to_id)| {
365                let content_path = content_path.clone();
366                let tool_call_path = tool_call_path.clone();
367                let usage_path = usage_path.clone();
368                let tool_use = tool_use.clone();
369                async move {
370                    if let Some(ev) = q.pop_front() {
371                        return Some((Ok(ev), (input, q, ended, started_ids, index_to_id)));
372                    }
373                    if ended {
374                        return None;
375                    }
376
377                    while let Some(item) = input.next().await {
378                        match item {
379                            Ok(frame) => {
380                                // content delta
381                                if let Some(content) =
382                                    crate::utils::PathMapper::get_string(&frame, &content_path)
383                                {
384                                    if !content.is_empty() {
385                                        q.push_back(StreamingEvent::PartialContentDelta {
386                                            content,
387                                            sequence_id: None,
388                                        });
389                                    }
390                                }
391
392                                // usage
393                                if let Some(usage) =
394                                    crate::utils::PathMapper::get_path(&frame, &usage_path).cloned()
395                                {
396                                    q.push_back(StreamingEvent::Metadata {
397                                        usage: Some(usage),
398                                        finish_reason: None,
399                                        stop_reason: None,
400                                    });
401                                }
402
403                                // tool calls (OpenAI delta style)
404                                if let Some(tc_val) =
405                                    crate::utils::PathMapper::get_path(&frame, &tool_call_path)
406                                {
407                                    if debug_toolcall_enabled() {
408                                        debug!(
409                                            tool_call_path = tool_call_path.as_str(),
410                                            tool_call_delta = %tc_val,
411                                            frame = %frame,
412                                            "tool_call delta observed"
413                                        );
414                                    }
415                                    if let Some(arr) = tc_val.as_array() {
416                                        for (idx, tc) in arr.iter().enumerate() {
417                                            // Determine tool-call index (some providers omit id on subsequent deltas)
418                                            let tc_index: u32 =
419                                                crate::utils::PathMapper::get_path(tc, "index")
420                                                    .and_then(|v| v.as_u64())
421                                                    .map(|v| v as u32)
422                                                    .unwrap_or(idx as u32);
423
424                                            // Prefer protocol tooling mapping if present
425                                            let (mut id, mut name, mut args) =
426                                                if let Some(ref tu) = tool_use {
427                                                    extract_by_tooling(tc, tu)
428                                                } else {
429                                                    (None, None, None)
430                                                };
431
432                                            // Fallback to openai-style variants if tooling mapping didn't yield values
433                                            if id.is_none() {
434                                                id = extract_toolcall_id(tc);
435                                            }
436                                            if name.is_none() {
437                                                name = extract_toolcall_name(tc);
438                                            }
439                                            if args.is_none() {
440                                                args = extract_toolcall_arguments(tc);
441                                            }
442
443                                            // If we saw an id, remember it for this index
444                                            if let Some(ref real_id) = id {
445                                                index_to_id.insert(tc_index, real_id.clone());
446                                            } else {
447                                                // Otherwise, try to recover id from prior frames using index
448                                                id = index_to_id.get(&tc_index).cloned();
449                                            }
450
451                                            if let (Some(id), Some(name)) =
452                                                (id.clone(), name.clone())
453                                            {
454                                                if !started_ids.contains(&id) {
455                                                    started_ids.insert(id.clone());
456                                                    q.push_back(StreamingEvent::ToolCallStarted {
457                                                        tool_call_id: id.clone(),
458                                                        tool_name: name,
459                                                        index: Some(tc_index),
460                                                    });
461                                                }
462                                            }
463
464                                            if let (Some(id), Some(arguments)) = (id, args) {
465                                                q.push_back(StreamingEvent::PartialToolCall {
466                                                    tool_call_id: id,
467                                                    arguments,
468                                                    index: Some(tc_index),
469                                                    is_complete: None,
470                                                });
471                                            }
472                                        }
473                                    }
474                                }
475
476                                if let Some(ev) = q.pop_front() {
477                                    return Some((
478                                        Ok(ev),
479                                        (input, q, ended, started_ids, index_to_id),
480                                    ));
481                                }
482                                continue;
483                            }
484                            Err(e) => {
485                                return Some((Err(e), (input, q, ended, started_ids, index_to_id)))
486                            }
487                        }
488                    }
489
490                    ended = true;
491                    Some((
492                        Ok(StreamingEvent::StreamEnd {
493                            finish_reason: None,
494                        }),
495                        (input, q, ended, started_ids, index_to_id),
496                    ))
497                }
498            },
499        );
500
501        Ok(Box::pin(stream))
502    }
503}