Skip to main content

ai_lib_core/pipeline/
event_map.rs

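//! 事件映射:将 JSON 帧转换为统一的 StreamingEvent。
//!
//! Event mapping: converts JSON stream frames (`serde_json::Value`) into unified
//! [`StreamingEvent`]s.
//!
//! Two modes:
//! - If the manifest provides `streaming.event_map`, frames are mapped by those rules
//!   ([`RuleBasedEventMapper`]).
//! - Otherwise, a built-in adapter mapping is used as the fallback (e.g. the OpenAI-style
//!   [`OpenAiStyleEventMapper`]).
//!
//! [`PathEventMapper`] additionally maps frames using manifest-configurable JSON paths for
//! content, tool-call deltas, and usage.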
8
9use crate::pipeline::{Mapper, PipelineError};
10use crate::protocol::EventMapRule;
11use crate::protocol::ToolUseMapping;
12use crate::types::events::StreamingEvent;
13use crate::utils::JsonPathEvaluator;
14use crate::{BoxStream, PipeResult};
15use futures::{stream, StreamExt};
16use serde_json::Value;
17use std::collections::{HashMap, HashSet, VecDeque};
18use tracing::debug;
19
20#[derive(Clone)]
21struct CompiledRule {
22    matcher: JsonPathEvaluator,
23    emit: String,
24    extract: Vec<(String, String)>, // (field_name, json_path)
25}
26
27pub struct RuleBasedEventMapper {
28    rules: Vec<CompiledRule>,
29}
30
31impl RuleBasedEventMapper {
32    pub fn new(rules: &[EventMapRule]) -> Result<Self, PipelineError> {
33        let mut compiled = Vec::new();
34        for r in rules {
35            let matcher = JsonPathEvaluator::new(&r.match_expr).map_err(|e| {
36                PipelineError::InvalidJsonPath {
37                    path: r.match_expr.clone(),
38                    error: e.to_string(),
39                    hint: None,
40                }
41            })?;
42            let mut extract = Vec::new();
43            if let Some(map) = &r.fields {
44                for (k, v) in map {
45                    let k: &String = k;
46                    let v: &String = v;
47                    extract.push((k.clone(), v.clone()));
48                }
49            }
50            compiled.push(CompiledRule {
51                matcher,
52                emit: r.emit.clone(),
53                extract,
54            });
55        }
56        Ok(Self { rules: compiled })
57    }
58
59    fn build_event(
60        emit: &str,
61        frame: &Value,
62        extract: &[(String, String)],
63    ) -> Option<StreamingEvent> {
64        match emit {
65            "PartialContentDelta" => {
66                // Expect extracted `content` or infer from common openai path
67                let mut content: Option<String> = None;
68                for (k, p) in extract {
69                    if k == "content" {
70                        content = crate::utils::PathMapper::get_string(frame, p);
71                    }
72                }
73                let content = content.or_else(|| {
74                    crate::utils::PathMapper::get_string(frame, "$.choices[0].delta.content")
75                })?;
76
77                // Filter out empty content to avoid unnecessary events and ensure consistency
78                // with fallback mapper behavior. Empty content can occur when providers send
79                // frames with null/empty delta.content (e.g., during tool calls or finish events).
80                if content.is_empty() {
81                    return None;
82                }
83
84                Some(StreamingEvent::PartialContentDelta {
85                    content,
86                    sequence_id: None,
87                })
88            }
89            "Metadata" => {
90                // usage optional
91                let usage = crate::utils::PathMapper::get_path(frame, "$.usage").cloned();
92                Some(StreamingEvent::Metadata {
93                    usage,
94                    finish_reason: None,
95                    stop_reason: None,
96                })
97            }
98            "ThinkingDelta" => {
99                let mut thinking: Option<String> = None;
100                for (k, p) in extract {
101                    if k == "thinking" {
102                        thinking = crate::utils::PathMapper::get_string(frame, p);
103                    }
104                }
105                let thinking = thinking.or_else(|| {
106                    crate::utils::PathMapper::get_string(
107                        frame,
108                        "$.choices[0].delta.reasoning_content",
109                    )
110                })?;
111                if thinking.is_empty() {
112                    return None;
113                }
114                Some(StreamingEvent::ThinkingDelta {
115                    thinking,
116                    tool_consideration: None,
117                })
118            }
119            "StreamEnd" => {
120                let mut finish: Option<String> = None;
121                for (k, p) in extract {
122                    if k == "finish_reason" {
123                        finish = crate::utils::PathMapper::get_string(frame, p);
124                    }
125                }
126                let finish_reason = finish.or_else(|| {
127                    crate::utils::PathMapper::get_string(frame, "$.choices[0].finish_reason")
128                });
129                Some(StreamingEvent::StreamEnd { finish_reason })
130            }
131            _ => None,
132        }
133    }
134}
135
136#[async_trait::async_trait]
137impl Mapper for RuleBasedEventMapper {
138    async fn map(
139        &self,
140        input: BoxStream<'static, Value>,
141    ) -> PipeResult<BoxStream<'static, StreamingEvent>> {
142        let rules = self.rules.clone();
143
144        let mapped = stream::unfold((input, false), move |(mut input, mut ended)| {
145            let rules = rules.clone();
146            async move {
147                if ended {
148                    return None;
149                }
150
151                while let Some(item) = input.next().await {
152                    match item {
153                        Ok(frame) => {
154                            for r in &rules {
155                                if r.matcher.matches(&frame) {
156                                    if let Some(ev) = RuleBasedEventMapper::build_event(
157                                        &r.emit, &frame, &r.extract,
158                                    ) {
159                                        return Some((Ok(ev), (input, ended)));
160                                    }
161                                    // Rule matched but build_event returned None (e.g., empty content filtered)
162                                    // This is expected behavior, continue to next rule or frame
163                                }
164                            }
165
166                            // If no rule matched, skip this frame silently
167                            // This is normal for frames that don't match any event pattern
168                            // (e.g., ping frames, metadata-only frames, etc.)
169                            continue;
170                        }
171                        Err(e) => return Some((Err(e), (input, ended))),
172                    }
173                }
174
175                // EOF: emit StreamEnd exactly once
176                ended = true;
177                Some((
178                    Ok(StreamingEvent::StreamEnd {
179                        finish_reason: None,
180                    }),
181                    (input, ended),
182                ))
183            }
184        });
185
186        Ok(Box::pin(mapped))
187    }
188}
189
/// Built-in mapper for OpenAI-style chat-completion chunks.
///
/// Intended as the fallback when a manifest supplies no `streaming.event_map` rules. It reads
/// the fixed `$.choices[0].delta.*`, `$.usage`, and `$.choices[0].finish_reason` paths.
pub struct OpenAiStyleEventMapper;
192
193#[async_trait::async_trait]
194impl Mapper for OpenAiStyleEventMapper {
195    async fn map(
196        &self,
197        input: BoxStream<'static, Value>,
198    ) -> PipeResult<BoxStream<'static, StreamingEvent>> {
199        let stream = stream::unfold((input, false), move |(mut input, mut ended)| async move {
200            if ended {
201                return None;
202            }
203
204            while let Some(item) = input.next().await {
205                match item {
206                    Ok(frame) => {
207                        // content delta
208                        if let Some(content) = crate::utils::PathMapper::get_string(
209                            &frame,
210                            "$.choices[0].delta.content",
211                        ) {
212                            if !content.is_empty() {
213                                return Some((
214                                    Ok(StreamingEvent::PartialContentDelta {
215                                        content,
216                                        sequence_id: None,
217                                    }),
218                                    (input, ended),
219                                ));
220                            }
221                        }
222
223                        if let Some(thinking) = crate::utils::PathMapper::get_string(
224                            &frame,
225                            "$.choices[0].delta.reasoning_content",
226                        ) {
227                            if !thinking.is_empty() {
228                                return Some((
229                                    Ok(StreamingEvent::ThinkingDelta {
230                                        thinking,
231                                        tool_consideration: None,
232                                    }),
233                                    (input, ended),
234                                ));
235                            }
236                        }
237
238                        // usage metadata (rare in streaming but possible)
239                        if let Some(usage) =
240                            crate::utils::PathMapper::get_path(&frame, "$.usage").cloned()
241                        {
242                            return Some((
243                                Ok(StreamingEvent::Metadata {
244                                    usage: Some(usage),
245                                    finish_reason: None,
246                                    stop_reason: None,
247                                }),
248                                (input, ended),
249                            ));
250                        }
251
252                        if let Some(reason) = crate::utils::PathMapper::get_string(
253                            &frame,
254                            "$.choices[0].finish_reason",
255                        ) {
256                            if !reason.is_empty() {
257                                return Some((
258                                    Ok(StreamingEvent::StreamEnd {
259                                        finish_reason: Some(reason),
260                                    }),
261                                    (input, ended),
262                                ));
263                            }
264                        }
265
266                        continue;
267                    }
268                    Err(e) => return Some((Err(e), (input, ended))),
269                }
270            }
271
272            ended = true;
273            Some((
274                Ok(StreamingEvent::StreamEnd {
275                    finish_reason: None,
276                }),
277                (input, ended),
278            ))
279        });
280
281        Ok(Box::pin(stream))
282    }
283}
284
285pub fn create_event_mapper(rules: &[EventMapRule]) -> Result<Box<dyn Mapper>, PipelineError> {
286    Ok(Box::new(RuleBasedEventMapper::new(rules)?))
287}
288
289/// Manifest-driven path mapper for streaming frames.
290/// Supports:
291/// - content_path (text deltas)
292/// - tool_call_path (OpenAI-style tool_calls delta array)
293/// - usage_path (usage metadata)
294pub struct PathEventMapper {
295    content_path: String,
296    tool_call_path: String,
297    usage_path: String,
298    tool_use: Option<ToolUseMapping>,
299}
300
301impl PathEventMapper {
302    pub fn new(
303        content_path: Option<String>,
304        tool_call_path: Option<String>,
305        usage_path: Option<String>,
306        tool_use: Option<ToolUseMapping>,
307    ) -> Self {
308        Self {
309            content_path: content_path.unwrap_or_else(|| "$.choices[0].delta.content".to_string()),
310            tool_call_path: tool_call_path
311                .unwrap_or_else(|| "$.choices[0].delta.tool_calls".to_string()),
312            usage_path: usage_path.unwrap_or_else(|| "$.usage".to_string()),
313            tool_use,
314        }
315    }
316}
317
/// Reports whether verbose tool-call delta logging is switched on.
///
/// Logging is on only when the `AI_LIB_DEBUG_TOOLCALL` environment variable is exactly `"1"`.
/// An unset, non-UTF-8, or any other value counts as off.
fn debug_toolcall_enabled() -> bool {
    matches!(std::env::var("AI_LIB_DEBUG_TOOLCALL").as_deref(), Ok("1"))
}
321
322fn extract_toolcall_id(tc: &Value) -> Option<String> {
323    crate::utils::PathMapper::get_string(tc, "id")
324        .or_else(|| crate::utils::PathMapper::get_string(tc, "tool_call_id"))
325        .or_else(|| crate::utils::PathMapper::get_string(tc, "delta.id"))
326        .or_else(|| crate::utils::PathMapper::get_string(tc, "delta.tool_call_id"))
327}
328
329fn extract_toolcall_name(tc: &Value) -> Option<String> {
330    crate::utils::PathMapper::get_string(tc, "function.name")
331        .or_else(|| crate::utils::PathMapper::get_string(tc, "name"))
332        .or_else(|| crate::utils::PathMapper::get_string(tc, "delta.function.name"))
333        .or_else(|| crate::utils::PathMapper::get_string(tc, "delta.name"))
334}
335
336fn extract_toolcall_arguments(tc: &Value) -> Option<String> {
337    // Common variants:
338    // - function.arguments: string
339    // - arguments: string
340    // - delta.function.arguments: string
341    // - delta.arguments: string
342    // Sometimes providers may emit object already; stringify it.
343    if let Some(v) = crate::utils::PathMapper::get_path(tc, "function.arguments") {
344        if let Some(s) = v.as_str() {
345            return Some(s.to_string());
346        }
347        if v.is_object() || v.is_array() {
348            return serde_json::to_string(v).ok();
349        }
350    }
351    if let Some(v) = crate::utils::PathMapper::get_path(tc, "arguments") {
352        if let Some(s) = v.as_str() {
353            return Some(s.to_string());
354        }
355        if v.is_object() || v.is_array() {
356            return serde_json::to_string(v).ok();
357        }
358    }
359    if let Some(v) = crate::utils::PathMapper::get_path(tc, "delta.function.arguments") {
360        if let Some(s) = v.as_str() {
361            return Some(s.to_string());
362        }
363        if v.is_object() || v.is_array() {
364            return serde_json::to_string(v).ok();
365        }
366    }
367    if let Some(v) = crate::utils::PathMapper::get_path(tc, "delta.arguments") {
368        if let Some(s) = v.as_str() {
369            return Some(s.to_string());
370        }
371        if v.is_object() || v.is_array() {
372            return serde_json::to_string(v).ok();
373        }
374    }
375    None
376}
377
378fn extract_by_tooling(
379    tc: &Value,
380    tool_use: &ToolUseMapping,
381) -> (Option<String>, Option<String>, Option<String>) {
382    let id = tool_use
383        .id_path
384        .as_deref()
385        .and_then(|p| crate::utils::PathMapper::get_string(tc, p));
386    let name = tool_use
387        .name_path
388        .as_deref()
389        .and_then(|p| crate::utils::PathMapper::get_string(tc, p));
390    let args = tool_use.input_path.as_deref().and_then(|p| {
391        let v = crate::utils::PathMapper::get_path(tc, p)?;
392        if let Some(s) = v.as_str() {
393            Some(s.to_string())
394        } else {
395            serde_json::to_string(v).ok()
396        }
397    });
398    (id, name, args)
399}
400
401#[async_trait::async_trait]
402impl Mapper for PathEventMapper {
403    async fn map(
404        &self,
405        input: BoxStream<'static, Value>,
406    ) -> PipeResult<BoxStream<'static, StreamingEvent>> {
407        let content_path = self.content_path.clone();
408        let tool_call_path = self.tool_call_path.clone();
409        let usage_path = self.usage_path.clone();
410        let tool_use = self.tool_use.clone();
411
412        // State is local to each stream to avoid cross-request contamination.
413        let stream = stream::unfold(
414            (
415                input,
416                VecDeque::<StreamingEvent>::new(),
417                false,
418                HashSet::<String>::new(),
419                HashMap::<u32, String>::new(),
420            ),
421            move |(mut input, mut q, mut ended, mut started_ids, mut index_to_id)| {
422                let content_path = content_path.clone();
423                let tool_call_path = tool_call_path.clone();
424                let usage_path = usage_path.clone();
425                let tool_use = tool_use.clone();
426                async move {
427                    if let Some(ev) = q.pop_front() {
428                        return Some((Ok(ev), (input, q, ended, started_ids, index_to_id)));
429                    }
430                    if ended {
431                        return None;
432                    }
433
434                    while let Some(item) = input.next().await {
435                        match item {
436                            Ok(frame) => {
437                                // content delta
438                                if let Some(content) =
439                                    crate::utils::PathMapper::get_string(&frame, &content_path)
440                                {
441                                    if !content.is_empty() {
442                                        q.push_back(StreamingEvent::PartialContentDelta {
443                                            content,
444                                            sequence_id: None,
445                                        });
446                                    }
447                                }
448
449                                // usage
450                                if let Some(usage) =
451                                    crate::utils::PathMapper::get_path(&frame, &usage_path).cloned()
452                                {
453                                    q.push_back(StreamingEvent::Metadata {
454                                        usage: Some(usage),
455                                        finish_reason: None,
456                                        stop_reason: None,
457                                    });
458                                }
459
460                                // tool calls (OpenAI delta style)
461                                if let Some(tc_val) =
462                                    crate::utils::PathMapper::get_path(&frame, &tool_call_path)
463                                {
464                                    if debug_toolcall_enabled() {
465                                        debug!(
466                                            tool_call_path = tool_call_path.as_str(),
467                                            tool_call_delta = %tc_val,
468                                            frame = %frame,
469                                            "tool_call delta observed"
470                                        );
471                                    }
472                                    if let Some(arr) = tc_val.as_array() {
473                                        for (idx, tc) in arr.iter().enumerate() {
474                                            // Determine tool-call index (some providers omit id on subsequent deltas)
475                                            let tc_index: u32 =
476                                                crate::utils::PathMapper::get_path(tc, "index")
477                                                    .and_then(|v| v.as_u64())
478                                                    .map(|v| v as u32)
479                                                    .unwrap_or(idx as u32);
480
481                                            // Prefer protocol tooling mapping if present
482                                            let (mut id, mut name, mut args) =
483                                                if let Some(ref tu) = tool_use {
484                                                    extract_by_tooling(tc, tu)
485                                                } else {
486                                                    (None, None, None)
487                                                };
488
489                                            // Fallback to openai-style variants if tooling mapping didn't yield values
490                                            if id.is_none() {
491                                                id = extract_toolcall_id(tc);
492                                            }
493                                            if name.is_none() {
494                                                name = extract_toolcall_name(tc);
495                                            }
496                                            if args.is_none() {
497                                                args = extract_toolcall_arguments(tc);
498                                            }
499
500                                            // If we saw an id, remember it for this index
501                                            if let Some(ref real_id) = id {
502                                                index_to_id.insert(tc_index, real_id.clone());
503                                            } else {
504                                                // Otherwise, try to recover id from prior frames using index
505                                                id = index_to_id.get(&tc_index).cloned();
506                                            }
507
508                                            if let (Some(id), Some(name)) =
509                                                (id.clone(), name.clone())
510                                            {
511                                                if !started_ids.contains(&id) {
512                                                    started_ids.insert(id.clone());
513                                                    q.push_back(StreamingEvent::ToolCallStarted {
514                                                        tool_call_id: id.clone(),
515                                                        tool_name: name,
516                                                        index: Some(tc_index),
517                                                    });
518                                                }
519                                            }
520
521                                            if let (Some(id), Some(arguments)) = (id, args) {
522                                                q.push_back(StreamingEvent::PartialToolCall {
523                                                    tool_call_id: id,
524                                                    arguments,
525                                                    index: Some(tc_index),
526                                                    is_complete: None,
527                                                });
528                                            }
529                                        }
530                                    }
531                                }
532
533                                if let Some(thinking) = crate::utils::PathMapper::get_string(
534                                    &frame,
535                                    "$.choices[0].delta.reasoning_content",
536                                ) {
537                                    if !thinking.is_empty() {
538                                        q.push_back(StreamingEvent::ThinkingDelta {
539                                            thinking,
540                                            tool_consideration: None,
541                                        });
542                                    }
543                                }
544
545                                if let Some(reason) = crate::utils::PathMapper::get_string(
546                                    &frame,
547                                    "$.choices[0].finish_reason",
548                                ) {
549                                    if !reason.is_empty() {
550                                        q.push_back(StreamingEvent::StreamEnd {
551                                            finish_reason: Some(reason),
552                                        });
553                                    }
554                                }
555
556                                if let Some(ev) = q.pop_front() {
557                                    return Some((
558                                        Ok(ev),
559                                        (input, q, ended, started_ids, index_to_id),
560                                    ));
561                                }
562                                continue;
563                            }
564                            Err(e) => {
565                                return Some((Err(e), (input, q, ended, started_ids, index_to_id)))
566                            }
567                        }
568                    }
569
570                    ended = true;
571                    Some((
572                        Ok(StreamingEvent::StreamEnd {
573                            finish_reason: None,
574                        }),
575                        (input, q, ended, started_ids, index_to_id),
576                    ))
577                }
578            },
579        );
580
581        Ok(Box::pin(stream))
582    }
583}