Skip to main content

ai_lib_rust/pipeline/
event_map.rs

1//! Event mapping (JSON Value -> StreamingEvent)
2//!
3//! Two modes:
4//! - If manifest provides `streaming.event_map`, use rule-based mapping
5//! - Otherwise, fallback to built-in adapter mapping (e.g. openai-style)
6
7use crate::pipeline::{Mapper, PipelineError};
8use crate::protocol::EventMapRule;
9use crate::protocol::ToolUseMapping;
10use crate::types::events::StreamingEvent;
11use crate::utils::JsonPathEvaluator;
12use crate::{BoxStream, PipeResult};
13use futures::{stream, StreamExt};
14use serde_json::Value;
15use std::collections::{HashMap, HashSet, VecDeque};
16use tracing::debug;
17
/// One event-map rule in the pre-compiled form used while streaming.
///
/// Built by `RuleBasedEventMapper::new` from an `EventMapRule`.
#[derive(Clone)]
struct CompiledRule {
    /// Evaluator compiled from the rule's `match_expr`; the rule is only
    /// considered for frames this evaluator matches.
    matcher: JsonPathEvaluator,
    /// Name of the event to produce, copied from the rule's `emit`. It is
    /// interpreted by `RuleBasedEventMapper::build_event`.
    emit: String,
    /// Copy of the rule's `fields` entries (empty when the rule has none).
    extract: Vec<(String, String)>, // (field_name, json_path)
}
24
/// Mapper driven by manifest event-map rules.
///
/// For every frame the rules are tried in declaration order; the first rule
/// that both matches the frame and yields an event decides the event emitted
/// for that frame. Frames for which no rule yields an event are skipped.
pub struct RuleBasedEventMapper {
    /// Rules in the order they were passed to `new`.
    rules: Vec<CompiledRule>,
}
28
29impl RuleBasedEventMapper {
30    pub fn new(rules: &[EventMapRule]) -> Result<Self, PipelineError> {
31        let mut compiled = Vec::new();
32        for r in rules {
33            let matcher = JsonPathEvaluator::new(&r.match_expr).map_err(|e| {
34                PipelineError::InvalidJsonPath {
35                    path: r.match_expr.clone(),
36                    error: e.to_string(),
37                    hint: None,
38                }
39            })?;
40            let mut extract = Vec::new();
41            if let Some(map) = &r.fields {
42                for (k, v) in map {
43                    extract.push((k.clone(), v.clone()));
44                }
45            }
46            compiled.push(CompiledRule {
47                matcher,
48                emit: r.emit.clone(),
49                extract,
50            });
51        }
52        Ok(Self { rules: compiled })
53    }
54
55    fn build_event(
56        emit: &str,
57        frame: &Value,
58        extract: &[(String, String)],
59    ) -> Option<StreamingEvent> {
60        match emit {
61            "PartialContentDelta" => {
62                // Expect extracted `content` or infer from common openai path
63                let mut content: Option<String> = None;
64                for (k, p) in extract {
65                    if k == "content" {
66                        content = crate::utils::PathMapper::get_string(frame, p);
67                    }
68                }
69                let content = content.or_else(|| {
70                    crate::utils::PathMapper::get_string(frame, "$.choices[0].delta.content")
71                })?;
72
73                // Filter out empty content to avoid unnecessary events and ensure consistency
74                // with fallback mapper behavior. Empty content can occur when providers send
75                // frames with null/empty delta.content (e.g., during tool calls or finish events).
76                if content.is_empty() {
77                    return None;
78                }
79
80                Some(StreamingEvent::PartialContentDelta {
81                    content,
82                    sequence_id: None,
83                })
84            }
85            "Metadata" => {
86                // usage optional
87                let usage = crate::utils::PathMapper::get_path(frame, "$.usage").cloned();
88                Some(StreamingEvent::Metadata {
89                    usage,
90                    finish_reason: None,
91                    stop_reason: None,
92                })
93            }
94            "StreamEnd" => Some(StreamingEvent::StreamEnd {
95                finish_reason: None,
96            }),
97            _ => None,
98        }
99    }
100}
101
102#[async_trait::async_trait]
103impl Mapper for RuleBasedEventMapper {
104    async fn map(
105        &self,
106        input: BoxStream<'static, Value>,
107    ) -> PipeResult<BoxStream<'static, StreamingEvent>> {
108        let rules = self.rules.clone();
109
110        let mapped = stream::unfold((input, false), move |(mut input, mut ended)| {
111            let rules = rules.clone();
112            async move {
113                if ended {
114                    return None;
115                }
116
117                while let Some(item) = input.next().await {
118                    match item {
119                        Ok(frame) => {
120                            for r in &rules {
121                                if r.matcher.matches(&frame) {
122                                    if let Some(ev) = RuleBasedEventMapper::build_event(
123                                        &r.emit, &frame, &r.extract,
124                                    ) {
125                                        return Some((Ok(ev), (input, ended)));
126                                    }
127                                    // Rule matched but build_event returned None (e.g., empty content filtered)
128                                    // This is expected behavior, continue to next rule or frame
129                                }
130                            }
131
132                            // If no rule matched, skip this frame silently
133                            // This is normal for frames that don't match any event pattern
134                            // (e.g., ping frames, metadata-only frames, etc.)
135                            continue;
136                        }
137                        Err(e) => return Some((Err(e), (input, ended))),
138                    }
139                }
140
141                // EOF: emit StreamEnd exactly once
142                ended = true;
143                Some((
144                    Ok(StreamingEvent::StreamEnd {
145                        finish_reason: None,
146                    }),
147                    (input, ended),
148                ))
149            }
150        });
151
152        Ok(Box::pin(mapped))
153    }
154}
155
/// Fallback openai-style mapping when no event_map rules are provided.
///
/// Reads text deltas from `$.choices[0].delta.content` and usage from
/// `$.usage`, and appends a `StreamEnd` once the input is exhausted.
pub struct OpenAiStyleEventMapper;
158
159#[async_trait::async_trait]
160impl Mapper for OpenAiStyleEventMapper {
161    async fn map(
162        &self,
163        input: BoxStream<'static, Value>,
164    ) -> PipeResult<BoxStream<'static, StreamingEvent>> {
165        let stream = stream::unfold((input, false), move |(mut input, mut ended)| async move {
166            if ended {
167                return None;
168            }
169
170            while let Some(item) = input.next().await {
171                match item {
172                    Ok(frame) => {
173                        // content delta
174                        if let Some(content) = crate::utils::PathMapper::get_string(
175                            &frame,
176                            "$.choices[0].delta.content",
177                        ) {
178                            if !content.is_empty() {
179                                return Some((
180                                    Ok(StreamingEvent::PartialContentDelta {
181                                        content,
182                                        sequence_id: None,
183                                    }),
184                                    (input, ended),
185                                ));
186                            }
187                        }
188
189                        // usage metadata (rare in streaming but possible)
190                        if let Some(usage) =
191                            crate::utils::PathMapper::get_path(&frame, "$.usage").cloned()
192                        {
193                            return Some((
194                                Ok(StreamingEvent::Metadata {
195                                    usage: Some(usage),
196                                    finish_reason: None,
197                                    stop_reason: None,
198                                }),
199                                (input, ended),
200                            ));
201                        }
202
203                        continue;
204                    }
205                    Err(e) => return Some((Err(e), (input, ended))),
206                }
207            }
208
209            ended = true;
210            Some((
211                Ok(StreamingEvent::StreamEnd {
212                    finish_reason: None,
213                }),
214                (input, ended),
215            ))
216        });
217
218        Ok(Box::pin(stream))
219    }
220}
221
222pub fn create_event_mapper(rules: &[EventMapRule]) -> Result<Box<dyn Mapper>, PipelineError> {
223    Ok(Box::new(RuleBasedEventMapper::new(rules)?))
224}
225
/// Manifest-driven path mapper for streaming frames.
/// Supports:
/// - content_path (text deltas)
/// - tool_call_path (OpenAI-style tool_calls delta array)
/// - usage_path (usage metadata)
pub struct PathEventMapper {
    /// Path of the text delta; `new` defaults it to `$.choices[0].delta.content`.
    content_path: String,
    /// Path of the tool-call delta array; `new` defaults it to
    /// `$.choices[0].delta.tool_calls`.
    tool_call_path: String,
    /// Path of the usage object; `new` defaults it to `$.usage`.
    usage_path: String,
    /// Optional protocol-specific paths for a tool call's id, name and input.
    /// When present they are tried before the built-in openai-style lookups.
    tool_use: Option<ToolUseMapping>,
}
237
238impl PathEventMapper {
239    pub fn new(
240        content_path: Option<String>,
241        tool_call_path: Option<String>,
242        usage_path: Option<String>,
243        tool_use: Option<ToolUseMapping>,
244    ) -> Self {
245        Self {
246            content_path: content_path.unwrap_or_else(|| "$.choices[0].delta.content".to_string()),
247            tool_call_path: tool_call_path
248                .unwrap_or_else(|| "$.choices[0].delta.tool_calls".to_string()),
249            usage_path: usage_path.unwrap_or_else(|| "$.usage".to_string()),
250            tool_use,
251        }
252    }
253}
254
/// Reports whether verbose tool-call logging is switched on.
///
/// True only when the `AI_LIB_DEBUG_TOOLCALL` environment variable is set to
/// exactly `1`. An unset, unreadable or differently valued variable yields
/// `false`. The variable is read on every call.
fn debug_toolcall_enabled() -> bool {
    matches!(
        std::env::var("AI_LIB_DEBUG_TOOLCALL").as_deref(),
        Ok("1")
    )
}
258
259fn extract_toolcall_id(tc: &Value) -> Option<String> {
260    crate::utils::PathMapper::get_string(tc, "id")
261        .or_else(|| crate::utils::PathMapper::get_string(tc, "tool_call_id"))
262        .or_else(|| crate::utils::PathMapper::get_string(tc, "delta.id"))
263        .or_else(|| crate::utils::PathMapper::get_string(tc, "delta.tool_call_id"))
264}
265
266fn extract_toolcall_name(tc: &Value) -> Option<String> {
267    crate::utils::PathMapper::get_string(tc, "function.name")
268        .or_else(|| crate::utils::PathMapper::get_string(tc, "name"))
269        .or_else(|| crate::utils::PathMapper::get_string(tc, "delta.function.name"))
270        .or_else(|| crate::utils::PathMapper::get_string(tc, "delta.name"))
271}
272
273fn extract_toolcall_arguments(tc: &Value) -> Option<String> {
274    // Common variants:
275    // - function.arguments: string
276    // - arguments: string
277    // - delta.function.arguments: string
278    // - delta.arguments: string
279    // Sometimes providers may emit object already; stringify it.
280    if let Some(v) = crate::utils::PathMapper::get_path(tc, "function.arguments") {
281        if let Some(s) = v.as_str() {
282            return Some(s.to_string());
283        }
284        if v.is_object() || v.is_array() {
285            return serde_json::to_string(v).ok();
286        }
287    }
288    if let Some(v) = crate::utils::PathMapper::get_path(tc, "arguments") {
289        if let Some(s) = v.as_str() {
290            return Some(s.to_string());
291        }
292        if v.is_object() || v.is_array() {
293            return serde_json::to_string(v).ok();
294        }
295    }
296    if let Some(v) = crate::utils::PathMapper::get_path(tc, "delta.function.arguments") {
297        if let Some(s) = v.as_str() {
298            return Some(s.to_string());
299        }
300        if v.is_object() || v.is_array() {
301            return serde_json::to_string(v).ok();
302        }
303    }
304    if let Some(v) = crate::utils::PathMapper::get_path(tc, "delta.arguments") {
305        if let Some(s) = v.as_str() {
306            return Some(s.to_string());
307        }
308        if v.is_object() || v.is_array() {
309            return serde_json::to_string(v).ok();
310        }
311    }
312    None
313}
314
315fn extract_by_tooling(
316    tc: &Value,
317    tool_use: &ToolUseMapping,
318) -> (Option<String>, Option<String>, Option<String>) {
319    let id = tool_use
320        .id_path
321        .as_deref()
322        .and_then(|p| crate::utils::PathMapper::get_string(tc, p));
323    let name = tool_use
324        .name_path
325        .as_deref()
326        .and_then(|p| crate::utils::PathMapper::get_string(tc, p));
327    let args = tool_use.input_path.as_deref().and_then(|p| {
328        let v = crate::utils::PathMapper::get_path(tc, p)?;
329        if let Some(s) = v.as_str() {
330            Some(s.to_string())
331        } else if v.is_object() || v.is_array() {
332            serde_json::to_string(v).ok()
333        } else {
334            serde_json::to_string(v).ok()
335        }
336    });
337    (id, name, args)
338}
339
340#[async_trait::async_trait]
341impl Mapper for PathEventMapper {
342    async fn map(
343        &self,
344        input: BoxStream<'static, Value>,
345    ) -> PipeResult<BoxStream<'static, StreamingEvent>> {
346        let content_path = self.content_path.clone();
347        let tool_call_path = self.tool_call_path.clone();
348        let usage_path = self.usage_path.clone();
349        let tool_use = self.tool_use.clone();
350
351        // State is local to each stream to avoid cross-request contamination.
352        let stream = stream::unfold(
353            (
354                input,
355                VecDeque::<StreamingEvent>::new(),
356                false,
357                HashSet::<String>::new(),
358                HashMap::<u32, String>::new(),
359            ),
360            move |(mut input, mut q, mut ended, mut started_ids, mut index_to_id)| {
361                let content_path = content_path.clone();
362                let tool_call_path = tool_call_path.clone();
363                let usage_path = usage_path.clone();
364                let tool_use = tool_use.clone();
365                async move {
366                    if let Some(ev) = q.pop_front() {
367                        return Some((Ok(ev), (input, q, ended, started_ids, index_to_id)));
368                    }
369                    if ended {
370                        return None;
371                    }
372
373                    while let Some(item) = input.next().await {
374                        match item {
375                            Ok(frame) => {
376                                // content delta
377                                if let Some(content) =
378                                    crate::utils::PathMapper::get_string(&frame, &content_path)
379                                {
380                                    if !content.is_empty() {
381                                        q.push_back(StreamingEvent::PartialContentDelta {
382                                            content,
383                                            sequence_id: None,
384                                        });
385                                    }
386                                }
387
388                                // usage
389                                if let Some(usage) =
390                                    crate::utils::PathMapper::get_path(&frame, &usage_path).cloned()
391                                {
392                                    q.push_back(StreamingEvent::Metadata {
393                                        usage: Some(usage),
394                                        finish_reason: None,
395                                        stop_reason: None,
396                                    });
397                                }
398
399                                // tool calls (OpenAI delta style)
400                                if let Some(tc_val) =
401                                    crate::utils::PathMapper::get_path(&frame, &tool_call_path)
402                                {
403                                    if debug_toolcall_enabled() {
404                                        debug!(
405                                            tool_call_path = tool_call_path.as_str(),
406                                            tool_call_delta = %tc_val,
407                                            frame = %frame,
408                                            "tool_call delta observed"
409                                        );
410                                    }
411                                    if let Some(arr) = tc_val.as_array() {
412                                        for (idx, tc) in arr.iter().enumerate() {
413                                            // Determine tool-call index (some providers omit id on subsequent deltas)
414                                            let tc_index: u32 =
415                                                crate::utils::PathMapper::get_path(tc, "index")
416                                                    .and_then(|v| v.as_u64())
417                                                    .map(|v| v as u32)
418                                                    .unwrap_or(idx as u32);
419
420                                            // Prefer protocol tooling mapping if present
421                                            let (mut id, mut name, mut args) =
422                                                if let Some(ref tu) = tool_use {
423                                                    extract_by_tooling(tc, tu)
424                                                } else {
425                                                    (None, None, None)
426                                                };
427
428                                            // Fallback to openai-style variants if tooling mapping didn't yield values
429                                            if id.is_none() {
430                                                id = extract_toolcall_id(tc);
431                                            }
432                                            if name.is_none() {
433                                                name = extract_toolcall_name(tc);
434                                            }
435                                            if args.is_none() {
436                                                args = extract_toolcall_arguments(tc);
437                                            }
438
439                                            // If we saw an id, remember it for this index
440                                            if let Some(ref real_id) = id {
441                                                index_to_id.insert(tc_index, real_id.clone());
442                                            } else {
443                                                // Otherwise, try to recover id from prior frames using index
444                                                id = index_to_id.get(&tc_index).cloned();
445                                            }
446
447                                            if let (Some(id), Some(name)) =
448                                                (id.clone(), name.clone())
449                                            {
450                                                if !started_ids.contains(&id) {
451                                                    started_ids.insert(id.clone());
452                                                    q.push_back(StreamingEvent::ToolCallStarted {
453                                                        tool_call_id: id.clone(),
454                                                        tool_name: name,
455                                                        index: Some(tc_index),
456                                                    });
457                                                }
458                                            }
459
460                                            if let (Some(id), Some(arguments)) = (id, args) {
461                                                q.push_back(StreamingEvent::PartialToolCall {
462                                                    tool_call_id: id,
463                                                    arguments,
464                                                    index: Some(tc_index),
465                                                    is_complete: None,
466                                                });
467                                            }
468                                        }
469                                    }
470                                }
471
472                                if let Some(ev) = q.pop_front() {
473                                    return Some((
474                                        Ok(ev),
475                                        (input, q, ended, started_ids, index_to_id),
476                                    ));
477                                }
478                                continue;
479                            }
480                            Err(e) => {
481                                return Some((Err(e), (input, q, ended, started_ids, index_to_id)))
482                            }
483                        }
484                    }
485
486                    ended = true;
487                    Some((
488                        Ok(StreamingEvent::StreamEnd {
489                            finish_reason: None,
490                        }),
491                        (input, q, ended, started_ids, index_to_id),
492                    ))
493                }
494            },
495        );
496
497        Ok(Box::pin(stream))
498    }
499}