agent_chain_core/tracers/
log_stream.rs

1//! Tracer that streams run logs to a stream.
2//!
3//! This module provides a tracer that streams run logs using JSON patches.
4//! Mirrors `langchain_core.tracers.log_stream`.
5
6use std::collections::HashMap;
7use std::fmt;
8use std::sync::{Arc, Mutex};
9
10use chrono::{DateTime, Utc};
11use serde::{Deserialize, Serialize};
12use serde_json::Value;
13use uuid::Uuid;
14
15use crate::tracers::base::BaseTracer;
16use crate::tracers::core::{SchemaFormat, TracerCore, TracerCoreConfig};
17use crate::tracers::memory_stream::{MemoryStream, ReceiveStream, SendStream};
18use crate::tracers::schemas::Run;
19use crate::tracers::streaming::StreamingCallbackHandler;
20
21/// A single entry in the run log.
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct LogEntry {
24    /// ID of the sub-run.
25    pub id: String,
26    /// Name of the object being run.
27    pub name: String,
28    /// Type of the object being run, eg. prompt, chain, llm, etc.
29    #[serde(rename = "type")]
30    pub run_type: String,
31    /// List of tags for the run.
32    pub tags: Vec<String>,
33    /// Key-value pairs of metadata for the run.
34    pub metadata: HashMap<String, Value>,
35    /// ISO-8601 timestamp of when the run started.
36    pub start_time: String,
37    /// List of LLM tokens streamed by this run, if applicable.
38    pub streamed_output_str: Vec<String>,
39    /// List of output chunks streamed by this run, if available.
40    pub streamed_output: Vec<Value>,
41    /// Inputs to this run. Not available currently via astream_log.
42    #[serde(skip_serializing_if = "Option::is_none")]
43    pub inputs: Option<Value>,
44    /// Final output of this run. Only available after the run has finished successfully.
45    pub final_output: Option<Value>,
46    /// ISO-8601 timestamp of when the run ended. Only available after the run has finished.
47    pub end_time: Option<String>,
48}
49
50impl LogEntry {
51    /// Create a new log entry.
52    pub fn new(
53        id: String,
54        name: String,
55        run_type: String,
56        tags: Vec<String>,
57        metadata: HashMap<String, Value>,
58        start_time: DateTime<Utc>,
59    ) -> Self {
60        Self {
61            id,
62            name,
63            run_type,
64            tags,
65            metadata,
66            start_time: start_time.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(),
67            streamed_output_str: Vec::new(),
68            streamed_output: Vec::new(),
69            inputs: None,
70            final_output: None,
71            end_time: None,
72        }
73    }
74}
75
76/// State of the run.
77#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct RunState {
79    /// ID of the run.
80    pub id: String,
81    /// List of output chunks streamed by Runnable.stream()
82    pub streamed_output: Vec<Value>,
83    /// Final output of the run, usually the result of aggregating streamed_output.
84    pub final_output: Option<Value>,
85    /// Name of the object being run.
86    pub name: String,
87    /// Type of the object being run, eg. prompt, chain, llm, etc.
88    #[serde(rename = "type")]
89    pub run_type: String,
90    /// Map of run names to sub-runs.
91    pub logs: HashMap<String, LogEntry>,
92}
93
94impl RunState {
95    /// Create a new run state.
96    pub fn new(id: String, name: String, run_type: String) -> Self {
97        Self {
98            id,
99            streamed_output: Vec::new(),
100            final_output: None,
101            name,
102            run_type,
103            logs: HashMap::new(),
104        }
105    }
106}
107
108/// A JSON patch operation.
109#[derive(Debug, Clone, Serialize, Deserialize)]
110pub struct JsonPatchOp {
111    /// The operation type (add, replace, remove, etc.)
112    pub op: String,
113    /// The path to apply the operation to.
114    pub path: String,
115    /// The value for the operation.
116    #[serde(skip_serializing_if = "Option::is_none")]
117    pub value: Option<Value>,
118}
119
120impl JsonPatchOp {
121    /// Create a new add operation.
122    pub fn add(path: impl Into<String>, value: Value) -> Self {
123        Self {
124            op: "add".to_string(),
125            path: path.into(),
126            value: Some(value),
127        }
128    }
129
130    /// Create a new replace operation.
131    pub fn replace(path: impl Into<String>, value: Value) -> Self {
132        Self {
133            op: "replace".to_string(),
134            path: path.into(),
135            value: Some(value),
136        }
137    }
138
139    /// Create a new remove operation.
140    pub fn remove(path: impl Into<String>) -> Self {
141        Self {
142            op: "remove".to_string(),
143            path: path.into(),
144            value: None,
145        }
146    }
147}
148
149/// Patch to the run log.
150#[derive(Debug, Clone, Serialize, Deserialize)]
151pub struct RunLogPatch {
152    /// List of JSONPatch operations.
153    pub ops: Vec<JsonPatchOp>,
154}
155
156impl RunLogPatch {
157    /// Create a new run log patch.
158    pub fn new(ops: Vec<JsonPatchOp>) -> Self {
159        Self { ops }
160    }
161
162    /// Create a patch from a single operation.
163    pub fn from_op(op: JsonPatchOp) -> Self {
164        Self { ops: vec![op] }
165    }
166
167    /// Create a patch from multiple operations.
168    pub fn from_ops(ops: impl IntoIterator<Item = JsonPatchOp>) -> Self {
169        Self {
170            ops: ops.into_iter().collect(),
171        }
172    }
173}
174
175impl fmt::Display for RunLogPatch {
176    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
177        write!(f, "RunLogPatch({:?})", self.ops)
178    }
179}
180
181/// Run log with full state.
182#[derive(Debug, Clone, Serialize, Deserialize)]
183pub struct RunLog {
184    /// List of JSONPatch operations.
185    pub ops: Vec<JsonPatchOp>,
186    /// Current state of the log.
187    pub state: Option<RunState>,
188}
189
190impl RunLog {
191    /// Create a new run log.
192    pub fn new(ops: Vec<JsonPatchOp>, state: Option<RunState>) -> Self {
193        Self { ops, state }
194    }
195
196    /// Apply a patch to the run log.
197    pub fn apply_patch(&mut self, patch: RunLogPatch) {
198        self.ops.extend(patch.ops.clone());
199
200        // Apply the operations to the state
201        if let Some(ref mut state) = self.state {
202            for op in patch.ops {
203                Self::apply_op_to_state(state, &op);
204            }
205        }
206    }
207
208    fn apply_op_to_state(state: &mut RunState, op: &JsonPatchOp) {
209        let path_parts: Vec<&str> = op.path.split('/').filter(|s| !s.is_empty()).collect();
210
211        match op.op.as_str() {
212            "replace" => {
213                if op.path.is_empty() || op.path == "/" {
214                    // Replace entire state
215                    if let Some(value) = &op.value
216                        && let Ok(new_state) = serde_json::from_value::<RunState>(value.clone())
217                    {
218                        *state = new_state;
219                    }
220                } else if path_parts.first() == Some(&"final_output") {
221                    state.final_output = op.value.clone();
222                }
223            }
224            "add" => {
225                if path_parts.len() >= 2 {
226                    match path_parts[0] {
227                        "logs" => {
228                            if path_parts.len() == 2 {
229                                // Adding a new log entry
230                                if let Some(value) = &op.value
231                                    && let Ok(entry) =
232                                        serde_json::from_value::<LogEntry>(value.clone())
233                                {
234                                    state.logs.insert(path_parts[1].to_string(), entry);
235                                }
236                            } else if path_parts.len() >= 3 {
237                                // Updating an existing log entry field
238                                if let Some(entry) = state.logs.get_mut(path_parts[1]) {
239                                    match path_parts[2] {
240                                        "streamed_output"
241                                            if path_parts.len() == 4 && path_parts[3] == "-" =>
242                                        {
243                                            if let Some(value) = &op.value {
244                                                entry.streamed_output.push(value.clone());
245                                            }
246                                        }
247                                        "streamed_output_str"
248                                            if path_parts.len() == 4 && path_parts[3] == "-" =>
249                                        {
250                                            if let Some(value) = &op.value
251                                                && let Some(s) = value.as_str()
252                                            {
253                                                entry.streamed_output_str.push(s.to_string());
254                                            }
255                                        }
256                                        "final_output" => {
257                                            entry.final_output = op.value.clone();
258                                        }
259                                        "end_time" => {
260                                            entry.end_time = op
261                                                .value
262                                                .clone()
263                                                .and_then(|v| v.as_str().map(String::from));
264                                        }
265                                        "inputs" => {
266                                            entry.inputs = op.value.clone();
267                                        }
268                                        _ => {}
269                                    }
270                                }
271                            }
272                        }
273                        "streamed_output" if path_parts.len() == 2 && path_parts[1] == "-" => {
274                            if let Some(value) = &op.value {
275                                state.streamed_output.push(value.clone());
276                            }
277                        }
278                        "final_output" => {
279                            state.final_output = op.value.clone();
280                        }
281                        _ => {}
282                    }
283                }
284            }
285            _ => {}
286        }
287    }
288}
289
290impl fmt::Display for RunLog {
291    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
292        write!(f, "RunLog({:?})", self.state)
293    }
294}
295
296/// Tracer that streams run logs to a stream.
297pub struct LogStreamCallbackHandler {
298    /// The tracer configuration.
299    config: TracerCoreConfig,
300    /// The run map.
301    run_map: HashMap<String, Run>,
302    /// The order map.
303    order_map: HashMap<Uuid, (Uuid, String)>,
304    /// Whether to auto-close the stream when the root run finishes.
305    auto_close: bool,
306    /// Only include runs from Runnables with matching names.
307    include_names: Option<Vec<String>>,
308    /// Only include runs from Runnables with matching types.
309    include_types: Option<Vec<String>>,
310    /// Only include runs from Runnables with matching tags.
311    include_tags: Option<Vec<String>>,
312    /// Exclude runs from Runnables with matching names.
313    exclude_names: Option<Vec<String>>,
314    /// Exclude runs from Runnables with matching types.
315    exclude_types: Option<Vec<String>>,
316    /// Exclude runs from Runnables with matching tags.
317    exclude_tags: Option<Vec<String>>,
318    /// The send stream for patches.
319    send_stream: SendStream<RunLogPatch>,
320    /// The receive stream for patches.
321    receive_stream: Option<ReceiveStream<RunLogPatch>>,
322    /// Map of run ID to key name.
323    key_map_by_run_id: HashMap<Uuid, String>,
324    /// Map of name to counter.
325    counter_map_by_name: HashMap<String, usize>,
326    /// The root run ID.
327    root_id: Option<Uuid>,
328    /// Lock for thread safety.
329    lock: Arc<Mutex<()>>,
330}
331
332impl fmt::Debug for LogStreamCallbackHandler {
333    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
334        f.debug_struct("LogStreamCallbackHandler")
335            .field("config", &self.config)
336            .field("auto_close", &self.auto_close)
337            .field("include_names", &self.include_names)
338            .field("include_types", &self.include_types)
339            .field("include_tags", &self.include_tags)
340            .field("exclude_names", &self.exclude_names)
341            .field("exclude_types", &self.exclude_types)
342            .field("exclude_tags", &self.exclude_tags)
343            .field("root_id", &self.root_id)
344            .finish()
345    }
346}
347
348/// Configuration for LogStreamCallbackHandler.
349#[derive(Debug, Clone, Default)]
350pub struct LogStreamConfig {
351    /// Whether to auto-close the stream when the root run finishes.
352    pub auto_close: bool,
353    /// Only include runs from Runnables with matching names.
354    pub include_names: Option<Vec<String>>,
355    /// Only include runs from Runnables with matching types.
356    pub include_types: Option<Vec<String>>,
357    /// Only include runs from Runnables with matching tags.
358    pub include_tags: Option<Vec<String>>,
359    /// Exclude runs from Runnables with matching names.
360    pub exclude_names: Option<Vec<String>>,
361    /// Exclude runs from Runnables with matching types.
362    pub exclude_types: Option<Vec<String>>,
363    /// Exclude runs from Runnables with matching tags.
364    pub exclude_tags: Option<Vec<String>>,
365    /// The schema format to use.
366    pub schema_format: SchemaFormat,
367}
368
369impl LogStreamCallbackHandler {
370    /// Create a new LogStreamCallbackHandler.
371    pub fn new(config: LogStreamConfig) -> Self {
372        let stream: MemoryStream<RunLogPatch> = MemoryStream::new();
373        let send_stream = stream.get_send_stream();
374        let receive_stream = stream.get_receive_stream();
375
376        Self {
377            config: TracerCoreConfig {
378                schema_format: config.schema_format,
379                log_missing_parent: true,
380            },
381            run_map: HashMap::new(),
382            order_map: HashMap::new(),
383            auto_close: config.auto_close,
384            include_names: config.include_names,
385            include_types: config.include_types,
386            include_tags: config.include_tags,
387            exclude_names: config.exclude_names,
388            exclude_types: config.exclude_types,
389            exclude_tags: config.exclude_tags,
390            send_stream,
391            receive_stream: Some(receive_stream),
392            key_map_by_run_id: HashMap::new(),
393            counter_map_by_name: HashMap::new(),
394            root_id: None,
395            lock: Arc::new(Mutex::new(())),
396        }
397    }
398
399    /// Take the receive stream. Can only be called once.
400    pub fn take_receive_stream(&mut self) -> Option<ReceiveStream<RunLogPatch>> {
401        self.receive_stream.take()
402    }
403
404    /// Get the root run ID.
405    pub fn root_id(&self) -> Option<Uuid> {
406        self.root_id
407    }
408
409    /// Send patches to the stream.
410    ///
411    /// # Returns
412    ///
413    /// `true` if the patches were sent successfully, `false` otherwise.
414    pub fn send(&self, ops: Vec<JsonPatchOp>) -> bool {
415        self.send_stream.send_nowait(RunLogPatch::new(ops)).is_ok()
416    }
417
418    /// Check if a Run should be included in the log.
419    pub fn include_run(&self, run: &Run) -> bool {
420        if Some(run.id) == self.root_id {
421            return false;
422        }
423
424        let run_tags = run.tags.clone().unwrap_or_default();
425
426        let mut include = self.include_names.is_none()
427            && self.include_types.is_none()
428            && self.include_tags.is_none();
429
430        if let Some(ref names) = self.include_names {
431            include = include || names.contains(&run.name);
432        }
433        if let Some(ref types) = self.include_types {
434            include = include || types.contains(&run.run_type);
435        }
436        if let Some(ref tags) = self.include_tags {
437            include = include || run_tags.iter().any(|t| tags.contains(t));
438        }
439
440        if let Some(ref names) = self.exclude_names {
441            include = include && !names.contains(&run.name);
442        }
443        if let Some(ref types) = self.exclude_types {
444            include = include && !types.contains(&run.run_type);
445        }
446        if let Some(ref tags) = self.exclude_tags {
447            include = include && !run_tags.iter().any(|t| tags.contains(t));
448        }
449
450        include
451    }
452
453    /// Get the standardized inputs for a run.
454    fn get_standardized_inputs(&self, run: &Run) -> Option<Value> {
455        match self.config.schema_format {
456            SchemaFormat::Original | SchemaFormat::OriginalChat => {
457                Some(serde_json::to_value(&run.inputs).unwrap_or_default())
458            }
459            SchemaFormat::StreamingEvents => {
460                if run.run_type == "retriever"
461                    || run.run_type == "llm"
462                    || run.run_type == "chat_model"
463                {
464                    Some(serde_json::to_value(&run.inputs).unwrap_or_default())
465                } else {
466                    run.inputs.get("input").cloned()
467                }
468            }
469        }
470    }
471
472    /// Get the standardized outputs for a run.
473    fn get_standardized_outputs(&self, run: &Run) -> Option<Value> {
474        let outputs = run.outputs.as_ref()?;
475
476        match self.config.schema_format {
477            SchemaFormat::Original | SchemaFormat::OriginalChat => {
478                if run.run_type == "prompt" {
479                    outputs.get("output").cloned()
480                } else {
481                    Some(serde_json::to_value(outputs).unwrap_or_default())
482                }
483            }
484            SchemaFormat::StreamingEvents => {
485                if run.run_type == "retriever"
486                    || run.run_type == "llm"
487                    || run.run_type == "chat_model"
488                {
489                    Some(serde_json::to_value(outputs).unwrap_or_default())
490                } else {
491                    outputs.get("output").cloned()
492                }
493            }
494        }
495    }
496}
497
498impl TracerCore for LogStreamCallbackHandler {
499    fn config(&self) -> &TracerCoreConfig {
500        &self.config
501    }
502
503    fn config_mut(&mut self) -> &mut TracerCoreConfig {
504        &mut self.config
505    }
506
507    fn run_map(&self) -> &HashMap<String, Run> {
508        &self.run_map
509    }
510
511    fn run_map_mut(&mut self) -> &mut HashMap<String, Run> {
512        &mut self.run_map
513    }
514
515    fn order_map(&self) -> &HashMap<Uuid, (Uuid, String)> {
516        &self.order_map
517    }
518
519    fn order_map_mut(&mut self) -> &mut HashMap<Uuid, (Uuid, String)> {
520        &mut self.order_map
521    }
522
523    fn persist_run(&mut self, _run: &Run) {
524        // This is a legacy method only called once for an entire run tree
525        // therefore not useful here
526    }
527
528    fn on_run_create(&mut self, run: &Run) {
529        if self.root_id.is_none() {
530            self.root_id = Some(run.id);
531            let state = RunState::new(run.id.to_string(), run.name.clone(), run.run_type.clone());
532            if !self.send(vec![JsonPatchOp::replace(
533                "",
534                serde_json::to_value(state).unwrap_or_default(),
535            )]) {
536                return;
537            }
538        }
539
540        if !self.include_run(run) {
541            return;
542        }
543
544        // Determine key name with counter
545        let _lock = self.lock.lock().unwrap();
546        let count = self
547            .counter_map_by_name
548            .entry(run.name.clone())
549            .or_insert(0);
550        *count += 1;
551        let key = if *count == 1 {
552            run.name.clone()
553        } else {
554            format!("{}:{}", run.name, count)
555        };
556        self.key_map_by_run_id.insert(run.id, key.clone());
557
558        let metadata = run
559            .extra
560            .get("metadata")
561            .and_then(|v| serde_json::from_value::<HashMap<String, Value>>(v.clone()).ok())
562            .unwrap_or_default();
563
564        let mut entry = LogEntry::new(
565            run.id.to_string(),
566            run.name.clone(),
567            run.run_type.clone(),
568            run.tags.clone().unwrap_or_default(),
569            metadata,
570            run.start_time,
571        );
572
573        if self.config.schema_format == SchemaFormat::StreamingEvents {
574            entry.inputs = self.get_standardized_inputs(run);
575        }
576
577        self.send(vec![JsonPatchOp::add(
578            format!("/logs/{}", key),
579            serde_json::to_value(entry).unwrap_or_default(),
580        )]);
581    }
582
583    fn on_run_update(&mut self, run: &Run) {
584        let key = match self.key_map_by_run_id.get(&run.id) {
585            Some(k) => k.clone(),
586            None => {
587                // Check if this is the root run ending
588                if run.id == self.root_id.unwrap_or(Uuid::nil()) && self.auto_close {
589                    let _ = self.send_stream.close();
590                }
591                return;
592            }
593        };
594
595        let mut ops = Vec::new();
596
597        if self.config.schema_format == SchemaFormat::StreamingEvents
598            && let Some(inputs) = self.get_standardized_inputs(run)
599        {
600            ops.push(JsonPatchOp::replace(
601                format!("/logs/{}/inputs", key),
602                inputs,
603            ));
604        }
605
606        if let Some(outputs) = self.get_standardized_outputs(run) {
607            ops.push(JsonPatchOp::add(
608                format!("/logs/{}/final_output", key),
609                outputs,
610            ));
611        }
612
613        if let Some(end_time) = run.end_time {
614            ops.push(JsonPatchOp::add(
615                format!("/logs/{}/end_time", key),
616                Value::String(end_time.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string()),
617            ));
618        }
619
620        self.send(ops);
621
622        if run.id == self.root_id.unwrap_or(Uuid::nil()) && self.auto_close {
623            let _ = self.send_stream.close();
624        }
625    }
626
627    fn on_llm_new_token(&mut self, run: &Run, token: &str, chunk: Option<&dyn std::any::Any>) {
628        let key = match self.key_map_by_run_id.get(&run.id) {
629            Some(k) => k.clone(),
630            None => return,
631        };
632
633        let chunk_value = if let Some(chunk_any) = chunk {
634            if let Some(gen_chunk) = chunk_any.downcast_ref::<crate::outputs::GenerationChunk>() {
635                serde_json::to_value(gen_chunk).unwrap_or(Value::String(token.to_string()))
636            } else if let Some(chat_chunk) =
637                chunk_any.downcast_ref::<crate::outputs::ChatGenerationChunk>()
638            {
639                // For chat chunks, include the message
640                serde_json::to_value(&chat_chunk.message)
641                    .unwrap_or(Value::String(token.to_string()))
642            } else {
643                Value::String(token.to_string())
644            }
645        } else {
646            Value::String(token.to_string())
647        };
648
649        self.send(vec![
650            JsonPatchOp::add(
651                format!("/logs/{}/streamed_output_str/-", key),
652                Value::String(token.to_string()),
653            ),
654            JsonPatchOp::add(format!("/logs/{}/streamed_output/-", key), chunk_value),
655        ]);
656    }
657}
658
659impl BaseTracer for LogStreamCallbackHandler {
660    fn persist_run_impl(&mut self, _run: &Run) {
661        // This is a legacy method only called once for an entire run tree
662        // therefore not useful here
663    }
664}
665
666impl<T: Send + 'static> StreamingCallbackHandler<T> for LogStreamCallbackHandler {
667    fn tap_output_aiter(
668        &self,
669        run_id: Uuid,
670        output: std::pin::Pin<Box<dyn futures::Stream<Item = T> + Send>>,
671    ) -> std::pin::Pin<Box<dyn futures::Stream<Item = T> + Send>> {
672        use futures::StreamExt;
673
674        let root_id = self.root_id;
675        let key = self.key_map_by_run_id.get(&run_id).cloned();
676        let send_stream = self.send_stream.clone();
677
678        Box::pin(futures::stream::unfold(
679            (output, run_id, root_id, key, send_stream),
680            |(mut stream, run_id, root_id, key, sender)| async move {
681                let item = stream.next().await?;
682
683                // Root run is handled separately
684                // If we can't find the run key, silently ignore
685                if run_id != root_id.unwrap_or(Uuid::nil())
686                    && let Some(ref k) = key
687                {
688                    // Note: We can't easily serialize generic T here
689                    // This would need a more sophisticated implementation
690                    // for real-world use with proper chunk serialization
691                    let _ = sender.send_nowait(RunLogPatch::new(vec![JsonPatchOp::add(
692                        format!("/logs/{}/streamed_output/-", k),
693                        Value::Null, // Placeholder - real implementation would serialize the chunk
694                    )]));
695                }
696
697                Some((item, (stream, run_id, root_id, key, sender)))
698            },
699        ))
700    }
701
702    fn tap_output_iter(
703        &self,
704        run_id: Uuid,
705        output: Box<dyn Iterator<Item = T> + Send>,
706    ) -> Box<dyn Iterator<Item = T> + Send> {
707        let root_id = self.root_id;
708        let key = self.key_map_by_run_id.get(&run_id).cloned();
709        let send_stream = self.send_stream.clone();
710
711        Box::new(TappedIterator {
712            inner: output,
713            run_id,
714            root_id,
715            key,
716            send_stream,
717        })
718    }
719}
720
721struct TappedIterator<T> {
722    inner: Box<dyn Iterator<Item = T> + Send>,
723    run_id: Uuid,
724    root_id: Option<Uuid>,
725    key: Option<String>,
726    send_stream: SendStream<RunLogPatch>,
727}
728
729impl<T> Iterator for TappedIterator<T> {
730    type Item = T;
731
732    fn next(&mut self) -> Option<Self::Item> {
733        let item = self.inner.next()?;
734
735        // Root run is handled separately
736        if self.run_id != self.root_id.unwrap_or(Uuid::nil())
737            && let Some(ref k) = self.key
738        {
739            let _ = self
740                .send_stream
741                .send_nowait(RunLogPatch::new(vec![JsonPatchOp::add(
742                    format!("/logs/{}/streamed_output/-", k),
743                    Value::Null, // Placeholder
744                )]));
745        }
746
747        Some(item)
748    }
749}
750
751#[cfg(test)]
752mod tests {
753    use super::*;
754
755    #[test]
756    fn test_log_entry_new() {
757        let entry = LogEntry::new(
758            "test-id".to_string(),
759            "test".to_string(),
760            "chain".to_string(),
761            vec!["tag1".to_string()],
762            HashMap::new(),
763            Utc::now(),
764        );
765
766        assert_eq!(entry.id, "test-id");
767        assert_eq!(entry.name, "test");
768        assert_eq!(entry.run_type, "chain");
769        assert!(entry.final_output.is_none());
770    }
771
772    #[test]
773    fn test_run_state_new() {
774        let state = RunState::new(
775            "state-id".to_string(),
776            "test".to_string(),
777            "chain".to_string(),
778        );
779
780        assert_eq!(state.id, "state-id");
781        assert!(state.logs.is_empty());
782        assert!(state.final_output.is_none());
783    }
784
785    #[test]
786    fn test_json_patch_ops() {
787        let add_op = JsonPatchOp::add("/path", Value::String("value".to_string()));
788        assert_eq!(add_op.op, "add");
789        assert_eq!(add_op.path, "/path");
790
791        let replace_op = JsonPatchOp::replace("/path", Value::Number(42.into()));
792        assert_eq!(replace_op.op, "replace");
793
794        let remove_op = JsonPatchOp::remove("/path");
795        assert_eq!(remove_op.op, "remove");
796        assert!(remove_op.value.is_none());
797    }
798
799    #[test]
800    fn test_run_log_apply_patch() {
801        let mut log = RunLog::new(
802            vec![],
803            Some(RunState::new(
804                "id".to_string(),
805                "test".to_string(),
806                "chain".to_string(),
807            )),
808        );
809
810        let patch = RunLogPatch::new(vec![JsonPatchOp::add(
811            "/logs/entry1",
812            serde_json::to_value(LogEntry::new(
813                "entry1".to_string(),
814                "sub".to_string(),
815                "tool".to_string(),
816                vec![],
817                HashMap::new(),
818                Utc::now(),
819            ))
820            .unwrap(),
821        )]);
822
823        log.apply_patch(patch);
824
825        assert!(log.state.as_ref().unwrap().logs.contains_key("entry1"));
826    }
827
828    #[test]
829    fn test_log_stream_handler_include_run() {
830        let handler = LogStreamCallbackHandler::new(LogStreamConfig {
831            include_names: Some(vec!["allowed".to_string()]),
832            ..Default::default()
833        });
834
835        let run = Run {
836            name: "allowed".to_string(),
837            ..Default::default()
838        };
839        assert!(handler.include_run(&run));
840
841        let run = Run {
842            name: "not_allowed".to_string(),
843            ..Default::default()
844        };
845        assert!(!handler.include_run(&run));
846    }
847
848    #[test]
849    fn test_log_stream_handler_exclude_run() {
850        let handler = LogStreamCallbackHandler::new(LogStreamConfig {
851            exclude_names: Some(vec!["excluded".to_string()]),
852            ..Default::default()
853        });
854
855        let run = Run {
856            name: "excluded".to_string(),
857            ..Default::default()
858        };
859        assert!(!handler.include_run(&run));
860
861        let run = Run {
862            name: "allowed".to_string(),
863            ..Default::default()
864        };
865        assert!(handler.include_run(&run));
866    }
867
868    #[test]
869    fn test_log_stream_handler_include_tags() {
870        let handler = LogStreamCallbackHandler::new(LogStreamConfig {
871            include_tags: Some(vec!["important".to_string()]),
872            ..Default::default()
873        });
874
875        let run = Run {
876            tags: Some(vec!["important".to_string(), "other".to_string()]),
877            ..Default::default()
878        };
879        assert!(handler.include_run(&run));
880
881        let run = Run {
882            tags: Some(vec!["other".to_string()]),
883            ..Default::default()
884        };
885        assert!(!handler.include_run(&run));
886    }
887}