agent_chain_core/tracers/
core.rs

1//! Utilities for the tracer core.
2//!
3//! This module provides the TracerCore trait with common methods for tracers.
4//! Mirrors `langchain_core.tracers.core`.
5
6use std::collections::HashMap;
7use std::fmt::Debug;
8
9use chrono::Utc;
10use serde_json::Value;
11use uuid::Uuid;
12
13use crate::messages::BaseMessage;
14use crate::outputs::{ChatGenerationChunk, GenerationChunk, LLMResult};
15use crate::tracers::schemas::{Run, RunEvent};
16
/// Schema format type for tracers.
///
/// The format selects how the run builders in this module shape a run's
/// inputs and outputs, and whether chat model runs may be created at all.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SchemaFormat {
    /// Original format used by all current tracers.
    ///
    /// This is the default. Chat model runs cannot be created in this format.
    #[default]
    Original,
    /// Streaming events format for internal usage.
    ///
    /// Chain inputs/outputs and tool inputs are nested under a single
    /// `"input"` / `"output"` key in this format.
    StreamingEvents,
    /// Original format with chat model support.
    OriginalChat,
}
28
/// Configuration for TracerCore.
///
/// Read by the provided `TracerCore` methods through `TracerCore::config`.
#[derive(Debug, Clone)]
pub struct TracerCoreConfig {
    /// The schema format to use.
    pub schema_format: SchemaFormat,
    /// Whether to log missing parent warnings.
    ///
    /// When `true`, starting a run whose parent is unknown emits a debug-level log line.
    pub log_missing_parent: bool,
}
37
38impl Default for TracerCoreConfig {
39    fn default() -> Self {
40        Self {
41            schema_format: SchemaFormat::Original,
42            log_missing_parent: true,
43        }
44    }
45}
46
/// Shared core behaviour for tracers.
///
/// Implementors supply the configuration and the run/order maps; the provided
/// methods build, look up and update runs on top of that state.
50pub trait TracerCore: Send + Sync + Debug {
    /// Get the configuration for this tracer.
    fn config(&self) -> &TracerCoreConfig;

    /// Get the mutable configuration for this tracer.
    fn config_mut(&mut self) -> &mut TracerCoreConfig;

    /// Get the run map.
    ///
    /// The provided methods key this map by the run ID rendered as a string.
    fn run_map(&self) -> &HashMap<String, Run>;

    /// Get the mutable run map.
    fn run_map_mut(&mut self) -> &mut HashMap<String, Run>;

    /// Get the order map (run_id -> (trace_id, dotted_order)).
    ///
    /// `start_trace` consults it to derive a child's trace ID and dotted order
    /// from its parent.
    fn order_map(&self) -> &HashMap<Uuid, (Uuid, String)>;

    /// Get the mutable order map.
    fn order_map_mut(&mut self) -> &mut HashMap<Uuid, (Uuid, String)>;

    /// Persist a run.
    ///
    /// What "persist" means is left to the implementor. None of the provided
    /// methods in this trait call it.
    fn persist_run(&mut self, run: &Run);
71
72    /// Add a child run to a parent run.
73    fn add_child_run(&mut self, parent_run: &mut Run, child_run: Run) {
74        parent_run.child_runs.push(child_run);
75    }
76
77    /// Get the stacktrace of an error.
78    fn get_stacktrace(error: &dyn std::error::Error) -> String {
79        error.to_string()
80    }
81
82    /// Start a trace for a run.
83    fn start_trace(&mut self, run: &mut Run) {
84        let current_dotted_order =
85            format!("{}{}", run.start_time.format("%Y%m%dT%H%M%S%fZ"), run.id);
86
87        if let Some(parent_run_id) = run.parent_run_id {
88            if let Some((trace_id, parent_dotted_order)) =
89                self.order_map().get(&parent_run_id).cloned()
90            {
91                run.trace_id = Some(trace_id);
92                run.dotted_order =
93                    Some(format!("{}.{}", parent_dotted_order, current_dotted_order));
94
95                if let Some(parent_run) = self.run_map_mut().get_mut(&parent_run_id.to_string()) {
96                    let child_clone = run.clone();
97                    parent_run.child_runs.push(child_clone);
98                }
99            } else {
100                if self.config().log_missing_parent {
101                    tracing::debug!(
102                        "Parent run {} not found for run {}. Treating as a root run.",
103                        parent_run_id,
104                        run.id
105                    );
106                }
107                run.parent_run_id = None;
108                run.trace_id = Some(run.id);
109                run.dotted_order = Some(current_dotted_order.clone());
110            }
111        } else {
112            run.trace_id = Some(run.id);
113            run.dotted_order = Some(current_dotted_order.clone());
114        }
115
116        let trace_id = run.trace_id.unwrap_or(run.id);
117        let dotted_order = run.dotted_order.clone().unwrap_or(current_dotted_order);
118
119        self.order_map_mut()
120            .insert(run.id, (trace_id, dotted_order));
121        self.run_map_mut().insert(run.id.to_string(), run.clone());
122    }
123
124    /// End a trace for a run.
125    fn end_trace(&mut self, run: &Run) {
126        self.run_map_mut().remove(&run.id.to_string());
127    }
128
129    /// Get a run by ID.
130    fn get_run(&self, run_id: Uuid, run_type: Option<&[&str]>) -> Result<Run, TracerError> {
131        let run = self
132            .run_map()
133            .get(&run_id.to_string())
134            .cloned()
135            .ok_or(TracerError::RunNotFound(run_id))?;
136
137        if let Some(expected_types) = run_type
138            && !expected_types.contains(&run.run_type.as_str())
139        {
140            return Err(TracerError::WrongRunType {
141                run_id,
142                expected: expected_types.iter().map(|s| s.to_string()).collect(),
143                actual: run.run_type.clone(),
144            });
145        }
146
147        Ok(run)
148    }
149
150    /// Get a mutable run by ID.
151    fn get_run_mut(&mut self, run_id: Uuid) -> Option<&mut Run> {
152        self.run_map_mut().get_mut(&run_id.to_string())
153    }
154
155    /// Create a chat model run.
156    #[allow(clippy::too_many_arguments)]
157    fn create_chat_model_run(
158        &self,
159        serialized: HashMap<String, Value>,
160        messages: &[Vec<BaseMessage>],
161        run_id: Uuid,
162        parent_run_id: Option<Uuid>,
163        tags: Option<Vec<String>>,
164        metadata: Option<HashMap<String, Value>>,
165        name: Option<String>,
166        extra: HashMap<String, Value>,
167    ) -> Result<Run, TracerError> {
168        let schema_format = self.config().schema_format;
169        if schema_format != SchemaFormat::StreamingEvents
170            && schema_format != SchemaFormat::OriginalChat
171        {
172            return Err(TracerError::UnsupportedSchemaFormat(
173                "Chat model tracing is not supported in original format".to_string(),
174            ));
175        }
176
177        let start_time = Utc::now();
178        let mut run_extra = extra;
179        if let Some(meta) = metadata {
180            run_extra.insert(
181                "metadata".to_string(),
182                serde_json::to_value(meta).unwrap_or_default(),
183            );
184        }
185
186        let inputs: HashMap<String, Value> = [(
187            "messages".to_string(),
188            serde_json::to_value(
189                messages
190                    .iter()
191                    .map(|batch| {
192                        batch
193                            .iter()
194                            .map(|msg| serde_json::to_value(msg).unwrap_or_default())
195                            .collect::<Vec<_>>()
196                    })
197                    .collect::<Vec<_>>(),
198            )
199            .unwrap_or_default(),
200        )]
201        .into_iter()
202        .collect();
203
204        let run = Run {
205            id: run_id,
206            name: name.unwrap_or_else(|| "ChatModel".to_string()),
207            run_type: "chat_model".to_string(),
208            parent_run_id,
209            trace_id: None,
210            dotted_order: None,
211            start_time,
212            end_time: None,
213            inputs,
214            outputs: None,
215            error: None,
216            serialized,
217            extra: run_extra,
218            events: vec![RunEvent::with_time("start", start_time)],
219            tags,
220            child_runs: Vec::new(),
221            session_name: None,
222            reference_example_id: None,
223        };
224
225        Ok(run)
226    }
227
228    /// Create an LLM run.
229    #[allow(clippy::too_many_arguments)]
230    fn create_llm_run(
231        &self,
232        serialized: HashMap<String, Value>,
233        prompts: &[String],
234        run_id: Uuid,
235        parent_run_id: Option<Uuid>,
236        tags: Option<Vec<String>>,
237        metadata: Option<HashMap<String, Value>>,
238        name: Option<String>,
239        extra: HashMap<String, Value>,
240    ) -> Run {
241        let start_time = Utc::now();
242        let mut run_extra = extra;
243        if let Some(meta) = metadata {
244            run_extra.insert(
245                "metadata".to_string(),
246                serde_json::to_value(meta).unwrap_or_default(),
247            );
248        }
249
250        let inputs: HashMap<String, Value> = [(
251            "prompts".to_string(),
252            serde_json::to_value(prompts).unwrap_or_default(),
253        )]
254        .into_iter()
255        .collect();
256
257        Run {
258            id: run_id,
259            name: name.unwrap_or_else(|| "LLM".to_string()),
260            run_type: "llm".to_string(),
261            parent_run_id,
262            trace_id: None,
263            dotted_order: None,
264            start_time,
265            end_time: None,
266            inputs,
267            outputs: None,
268            error: None,
269            serialized,
270            extra: run_extra,
271            events: vec![RunEvent::with_time("start", start_time)],
272            tags: Some(tags.unwrap_or_default()),
273            child_runs: Vec::new(),
274            session_name: None,
275            reference_example_id: None,
276        }
277    }
278
279    /// Process an LLM run with a new token event.
280    fn llm_run_with_token_event(
281        &mut self,
282        token: &str,
283        run_id: Uuid,
284        chunk: Option<&dyn std::any::Any>,
285        _parent_run_id: Option<Uuid>,
286    ) -> Result<Run, TracerError> {
287        let run = self
288            .run_map_mut()
289            .get_mut(&run_id.to_string())
290            .ok_or(TracerError::RunNotFound(run_id))?;
291
292        // Check run type
293        if run.run_type != "llm" && run.run_type != "chat_model" {
294            return Err(TracerError::WrongRunType {
295                run_id,
296                expected: vec!["llm".to_string(), "chat_model".to_string()],
297                actual: run.run_type.clone(),
298            });
299        }
300
301        let mut event_kwargs: HashMap<String, Value> = HashMap::new();
302        event_kwargs.insert("token".to_string(), Value::String(token.to_string()));
303
304        if let Some(chunk_any) = chunk {
305            if let Some(gen_chunk) = chunk_any.downcast_ref::<GenerationChunk>() {
306                event_kwargs.insert(
307                    "chunk".to_string(),
308                    serde_json::to_value(gen_chunk).unwrap_or_default(),
309                );
310            } else if let Some(chat_chunk) = chunk_any.downcast_ref::<ChatGenerationChunk>() {
311                event_kwargs.insert(
312                    "chunk".to_string(),
313                    serde_json::to_value(chat_chunk).unwrap_or_default(),
314                );
315            }
316        }
317
318        run.events
319            .push(RunEvent::with_kwargs("new_token", event_kwargs));
320
321        Ok(run.clone())
322    }
323
324    /// Process an LLM run with a retry event.
325    fn llm_run_with_retry_event(
326        &mut self,
327        retry_state: &HashMap<String, Value>,
328        run_id: Uuid,
329    ) -> Result<Run, TracerError> {
330        let run = self
331            .run_map_mut()
332            .get_mut(&run_id.to_string())
333            .ok_or(TracerError::RunNotFound(run_id))?;
334
335        run.events
336            .push(RunEvent::with_kwargs("retry", retry_state.clone()));
337
338        Ok(run.clone())
339    }
340
341    /// Complete an LLM run.
342    fn complete_llm_run(&mut self, response: &LLMResult, run_id: Uuid) -> Result<Run, TracerError> {
343        let run = self
344            .run_map_mut()
345            .get_mut(&run_id.to_string())
346            .ok_or(TracerError::RunNotFound(run_id))?;
347
348        // Check run type
349        if run.run_type != "llm" && run.run_type != "chat_model" {
350            return Err(TracerError::WrongRunType {
351                run_id,
352                expected: vec!["llm".to_string(), "chat_model".to_string()],
353                actual: run.run_type.clone(),
354            });
355        }
356
357        if run.outputs.is_none() {
358            run.outputs = Some(HashMap::new());
359        }
360
361        let omit_outputs = run
362            .extra
363            .get("__omit_auto_outputs")
364            .and_then(|v| v.as_bool())
365            .unwrap_or(false);
366
367        if !omit_outputs
368            && let Some(outputs) = &mut run.outputs
369            && let Ok(Value::Object(map)) = serde_json::to_value(response)
370        {
371            for (k, v) in map {
372                outputs.insert(k, v);
373            }
374        }
375
376        run.end_time = Some(Utc::now());
377        run.events
378            .push(RunEvent::with_time("end", run.end_time.unwrap()));
379
380        Ok(run.clone())
381    }
382
383    /// Mark an LLM run as errored.
384    fn errored_llm_run(
385        &mut self,
386        error: &dyn std::error::Error,
387        run_id: Uuid,
388        response: Option<&LLMResult>,
389    ) -> Result<Run, TracerError> {
390        let run = self
391            .run_map_mut()
392            .get_mut(&run_id.to_string())
393            .ok_or(TracerError::RunNotFound(run_id))?;
394
395        // Check run type
396        if run.run_type != "llm" && run.run_type != "chat_model" {
397            return Err(TracerError::WrongRunType {
398                run_id,
399                expected: vec!["llm".to_string(), "chat_model".to_string()],
400                actual: run.run_type.clone(),
401            });
402        }
403
404        run.error = Some(Self::get_stacktrace(error));
405
406        if let Some(resp) = response {
407            if run.outputs.is_none() {
408                run.outputs = Some(HashMap::new());
409            }
410
411            let omit_outputs = run
412                .extra
413                .get("__omit_auto_outputs")
414                .and_then(|v| v.as_bool())
415                .unwrap_or(false);
416
417            if !omit_outputs
418                && let Some(outputs) = &mut run.outputs
419                && let Ok(Value::Object(map)) = serde_json::to_value(resp)
420            {
421                for (k, v) in map {
422                    outputs.insert(k, v);
423                }
424            }
425        }
426
427        run.end_time = Some(Utc::now());
428        run.events
429            .push(RunEvent::with_time("error", run.end_time.unwrap()));
430
431        Ok(run.clone())
432    }
433
434    /// Create a chain run.
435    #[allow(clippy::too_many_arguments)]
436    fn create_chain_run(
437        &self,
438        serialized: HashMap<String, Value>,
439        inputs: HashMap<String, Value>,
440        run_id: Uuid,
441        parent_run_id: Option<Uuid>,
442        tags: Option<Vec<String>>,
443        metadata: Option<HashMap<String, Value>>,
444        run_type: Option<String>,
445        name: Option<String>,
446        extra: HashMap<String, Value>,
447    ) -> Run {
448        let start_time = Utc::now();
449        let mut run_extra = extra;
450        if let Some(meta) = metadata {
451            run_extra.insert(
452                "metadata".to_string(),
453                serde_json::to_value(meta).unwrap_or_default(),
454            );
455        }
456
457        let processed_inputs = self.get_chain_inputs(inputs);
458
459        Run {
460            id: run_id,
461            name: name.unwrap_or_else(|| "Chain".to_string()),
462            run_type: run_type.unwrap_or_else(|| "chain".to_string()),
463            parent_run_id,
464            trace_id: None,
465            dotted_order: None,
466            start_time,
467            end_time: None,
468            inputs: processed_inputs,
469            outputs: None,
470            error: None,
471            serialized,
472            extra: run_extra,
473            events: vec![RunEvent::with_time("start", start_time)],
474            tags: Some(tags.unwrap_or_default()),
475            child_runs: Vec::new(),
476            session_name: None,
477            reference_example_id: None,
478        }
479    }
480
481    /// Get chain inputs based on schema format.
482    fn get_chain_inputs(&self, inputs: HashMap<String, Value>) -> HashMap<String, Value> {
483        match self.config().schema_format {
484            SchemaFormat::Original | SchemaFormat::OriginalChat => inputs,
485            SchemaFormat::StreamingEvents => [(
486                "input".to_string(),
487                serde_json::to_value(inputs).unwrap_or_default(),
488            )]
489            .into_iter()
490            .collect(),
491        }
492    }
493
494    /// Get chain outputs based on schema format.
495    fn get_chain_outputs(&self, outputs: HashMap<String, Value>) -> HashMap<String, Value> {
496        match self.config().schema_format {
497            SchemaFormat::Original | SchemaFormat::OriginalChat => outputs,
498            SchemaFormat::StreamingEvents => [(
499                "output".to_string(),
500                serde_json::to_value(outputs).unwrap_or_default(),
501            )]
502            .into_iter()
503            .collect(),
504        }
505    }
506
507    /// Complete a chain run.
508    fn complete_chain_run(
509        &mut self,
510        outputs: HashMap<String, Value>,
511        run_id: Uuid,
512        inputs: Option<HashMap<String, Value>>,
513    ) -> Result<Run, TracerError> {
514        let processed_outputs = self.get_chain_outputs(outputs);
515        let processed_inputs = inputs.map(|i| self.get_chain_inputs(i));
516
517        let run = self
518            .run_map_mut()
519            .get_mut(&run_id.to_string())
520            .ok_or(TracerError::RunNotFound(run_id))?;
521
522        if run.outputs.is_none() {
523            run.outputs = Some(HashMap::new());
524        }
525
526        let omit_outputs = run
527            .extra
528            .get("__omit_auto_outputs")
529            .and_then(|v| v.as_bool())
530            .unwrap_or(false);
531
532        if !omit_outputs && let Some(outputs) = &mut run.outputs {
533            outputs.extend(processed_outputs);
534        }
535
536        run.end_time = Some(Utc::now());
537        run.events
538            .push(RunEvent::with_time("end", run.end_time.unwrap()));
539
540        if let Some(inputs) = processed_inputs {
541            run.inputs = inputs;
542        }
543
544        Ok(run.clone())
545    }
546
547    /// Mark a chain run as errored.
548    fn errored_chain_run(
549        &mut self,
550        error: &dyn std::error::Error,
551        run_id: Uuid,
552        inputs: Option<HashMap<String, Value>>,
553    ) -> Result<Run, TracerError> {
554        let processed_inputs = inputs.map(|i| self.get_chain_inputs(i));
555
556        let run = self
557            .run_map_mut()
558            .get_mut(&run_id.to_string())
559            .ok_or(TracerError::RunNotFound(run_id))?;
560
561        run.error = Some(Self::get_stacktrace(error));
562        run.end_time = Some(Utc::now());
563        run.events
564            .push(RunEvent::with_time("error", run.end_time.unwrap()));
565
566        if let Some(inputs) = processed_inputs {
567            run.inputs = inputs;
568        }
569
570        Ok(run.clone())
571    }
572
573    /// Create a tool run.
574    #[allow(clippy::too_many_arguments)]
575    fn create_tool_run(
576        &self,
577        serialized: HashMap<String, Value>,
578        input_str: &str,
579        run_id: Uuid,
580        parent_run_id: Option<Uuid>,
581        tags: Option<Vec<String>>,
582        metadata: Option<HashMap<String, Value>>,
583        name: Option<String>,
584        inputs: Option<HashMap<String, Value>>,
585        extra: HashMap<String, Value>,
586    ) -> Run {
587        let start_time = Utc::now();
588        let mut run_extra = extra;
589        if let Some(meta) = metadata {
590            run_extra.insert(
591                "metadata".to_string(),
592                serde_json::to_value(meta).unwrap_or_default(),
593            );
594        }
595
596        let processed_inputs = match self.config().schema_format {
597            SchemaFormat::Original | SchemaFormat::OriginalChat => {
598                [("input".to_string(), Value::String(input_str.to_string()))]
599                    .into_iter()
600                    .collect()
601            }
602            SchemaFormat::StreamingEvents => [(
603                "input".to_string(),
604                serde_json::to_value(inputs).unwrap_or_default(),
605            )]
606            .into_iter()
607            .collect(),
608        };
609
610        Run {
611            id: run_id,
612            name: name.unwrap_or_else(|| "Tool".to_string()),
613            run_type: "tool".to_string(),
614            parent_run_id,
615            trace_id: None,
616            dotted_order: None,
617            start_time,
618            end_time: None,
619            inputs: processed_inputs,
620            outputs: None,
621            error: None,
622            serialized,
623            extra: run_extra,
624            events: vec![RunEvent::with_time("start", start_time)],
625            tags: Some(tags.unwrap_or_default()),
626            child_runs: Vec::new(),
627            session_name: None,
628            reference_example_id: None,
629        }
630    }
631
632    /// Complete a tool run.
633    fn complete_tool_run(&mut self, output: Value, run_id: Uuid) -> Result<Run, TracerError> {
634        let run = self
635            .run_map_mut()
636            .get_mut(&run_id.to_string())
637            .ok_or(TracerError::RunNotFound(run_id))?;
638
639        if run.run_type != "tool" {
640            return Err(TracerError::WrongRunType {
641                run_id,
642                expected: vec!["tool".to_string()],
643                actual: run.run_type.clone(),
644            });
645        }
646
647        if run.outputs.is_none() {
648            run.outputs = Some(HashMap::new());
649        }
650
651        let omit_outputs = run
652            .extra
653            .get("__omit_auto_outputs")
654            .and_then(|v| v.as_bool())
655            .unwrap_or(false);
656
657        if !omit_outputs && let Some(outputs) = &mut run.outputs {
658            outputs.insert("output".to_string(), output);
659        }
660
661        run.end_time = Some(Utc::now());
662        run.events
663            .push(RunEvent::with_time("end", run.end_time.unwrap()));
664
665        Ok(run.clone())
666    }
667
668    /// Mark a tool run as errored.
669    fn errored_tool_run(
670        &mut self,
671        error: &dyn std::error::Error,
672        run_id: Uuid,
673    ) -> Result<Run, TracerError> {
674        let run = self
675            .run_map_mut()
676            .get_mut(&run_id.to_string())
677            .ok_or(TracerError::RunNotFound(run_id))?;
678
679        if run.run_type != "tool" {
680            return Err(TracerError::WrongRunType {
681                run_id,
682                expected: vec!["tool".to_string()],
683                actual: run.run_type.clone(),
684            });
685        }
686
687        run.error = Some(Self::get_stacktrace(error));
688        run.end_time = Some(Utc::now());
689        run.events
690            .push(RunEvent::with_time("error", run.end_time.unwrap()));
691
692        Ok(run.clone())
693    }
694
695    /// Create a retrieval run.
696    #[allow(clippy::too_many_arguments)]
697    fn create_retrieval_run(
698        &self,
699        serialized: HashMap<String, Value>,
700        query: &str,
701        run_id: Uuid,
702        parent_run_id: Option<Uuid>,
703        tags: Option<Vec<String>>,
704        metadata: Option<HashMap<String, Value>>,
705        name: Option<String>,
706        extra: HashMap<String, Value>,
707    ) -> Run {
708        let start_time = Utc::now();
709        let mut run_extra = extra;
710        if let Some(meta) = metadata {
711            run_extra.insert(
712                "metadata".to_string(),
713                serde_json::to_value(meta).unwrap_or_default(),
714            );
715        }
716
717        let inputs: HashMap<String, Value> =
718            [("query".to_string(), Value::String(query.to_string()))]
719                .into_iter()
720                .collect();
721
722        Run {
723            id: run_id,
724            name: name.unwrap_or_else(|| "Retriever".to_string()),
725            run_type: "retriever".to_string(),
726            parent_run_id,
727            trace_id: None,
728            dotted_order: None,
729            start_time,
730            end_time: None,
731            inputs,
732            outputs: None,
733            error: None,
734            serialized,
735            extra: run_extra,
736            events: vec![RunEvent::with_time("start", start_time)],
737            tags,
738            child_runs: Vec::new(),
739            session_name: None,
740            reference_example_id: None,
741        }
742    }
743
744    /// Complete a retrieval run.
745    fn complete_retrieval_run(
746        &mut self,
747        documents: Vec<Value>,
748        run_id: Uuid,
749    ) -> Result<Run, TracerError> {
750        let run = self
751            .run_map_mut()
752            .get_mut(&run_id.to_string())
753            .ok_or(TracerError::RunNotFound(run_id))?;
754
755        if run.run_type != "retriever" {
756            return Err(TracerError::WrongRunType {
757                run_id,
758                expected: vec!["retriever".to_string()],
759                actual: run.run_type.clone(),
760            });
761        }
762
763        if run.outputs.is_none() {
764            run.outputs = Some(HashMap::new());
765        }
766
767        let omit_outputs = run
768            .extra
769            .get("__omit_auto_outputs")
770            .and_then(|v| v.as_bool())
771            .unwrap_or(false);
772
773        if !omit_outputs && let Some(outputs) = &mut run.outputs {
774            outputs.insert("documents".to_string(), Value::Array(documents));
775        }
776
777        run.end_time = Some(Utc::now());
778        run.events
779            .push(RunEvent::with_time("end", run.end_time.unwrap()));
780
781        Ok(run.clone())
782    }
783
784    /// Mark a retrieval run as errored.
785    fn errored_retrieval_run(
786        &mut self,
787        error: &dyn std::error::Error,
788        run_id: Uuid,
789    ) -> Result<Run, TracerError> {
790        let run = self
791            .run_map_mut()
792            .get_mut(&run_id.to_string())
793            .ok_or(TracerError::RunNotFound(run_id))?;
794
795        if run.run_type != "retriever" {
796            return Err(TracerError::WrongRunType {
797                run_id,
798                expected: vec!["retriever".to_string()],
799                actual: run.run_type.clone(),
800            });
801        }
802
803        run.error = Some(Self::get_stacktrace(error));
804        run.end_time = Some(Utc::now());
805        run.events
806            .push(RunEvent::with_time("error", run.end_time.unwrap()));
807
808        Ok(run.clone())
809    }
810
    // Hook methods (to be overridden by implementors).
    //
    // Every hook has an empty default body, so a tracer overrides only the
    // ones it needs. None of the provided methods in this trait call them.
    // NOTE(review): presumably the concrete tracer / callback layer invokes
    // these around the create/complete/errored methods — confirm against callers.

    /// Called when a run is created. Does nothing by default.
    fn on_run_create(&mut self, _run: &Run) {}

    /// Called when a run is updated. Does nothing by default.
    fn on_run_update(&mut self, _run: &Run) {}

    /// Called when an LLM run starts. Does nothing by default.
    fn on_llm_start(&mut self, _run: &Run) {}

    /// Called when a new LLM token is received. Does nothing by default.
    ///
    /// `_chunk` is passed type-erased, in the same form that
    /// `llm_run_with_token_event` accepts.
    fn on_llm_new_token(&mut self, _run: &Run, _token: &str, _chunk: Option<&dyn std::any::Any>) {}

    /// Called when an LLM run ends. Does nothing by default.
    fn on_llm_end(&mut self, _run: &Run) {}

    /// Called when an LLM run errors. Does nothing by default.
    fn on_llm_error(&mut self, _run: &Run) {}

    /// Called when a chain run starts. Does nothing by default.
    fn on_chain_start(&mut self, _run: &Run) {}

    /// Called when a chain run ends. Does nothing by default.
    fn on_chain_end(&mut self, _run: &Run) {}

    /// Called when a chain run errors. Does nothing by default.
    fn on_chain_error(&mut self, _run: &Run) {}

    /// Called when a tool run starts. Does nothing by default.
    fn on_tool_start(&mut self, _run: &Run) {}

    /// Called when a tool run ends. Does nothing by default.
    fn on_tool_end(&mut self, _run: &Run) {}

    /// Called when a tool run errors. Does nothing by default.
    fn on_tool_error(&mut self, _run: &Run) {}

    /// Called when a chat model run starts. Does nothing by default.
    fn on_chat_model_start(&mut self, _run: &Run) {}

    /// Called when a retriever run starts. Does nothing by default.
    fn on_retriever_start(&mut self, _run: &Run) {}

    /// Called when a retriever run ends. Does nothing by default.
    fn on_retriever_end(&mut self, _run: &Run) {}

    /// Called when a retriever run errors. Does nothing by default.
    fn on_retriever_error(&mut self, _run: &Run) {}
860}
861
/// Error type for tracer operations.
///
/// Returned by the fallible `TracerCore` methods.
#[derive(Debug, Clone)]
pub enum TracerError {
    /// Run not found: no run is stored in the run map under this ID.
    RunNotFound(Uuid),
    /// Wrong run type: the stored run exists but is not of an accepted type.
    WrongRunType {
        /// ID of the run that was looked up.
        run_id: Uuid,
        /// Run types the operation would have accepted.
        expected: Vec<String>,
        /// Run type the stored run actually has.
        actual: String,
    },
    /// Unsupported schema format; the payload is the human-readable message.
    UnsupportedSchemaFormat(String),
}
876
877impl std::fmt::Display for TracerError {
878    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
879        match self {
880            TracerError::RunNotFound(id) => write!(f, "No indexed run ID {}", id),
881            TracerError::WrongRunType {
882                run_id,
883                expected,
884                actual,
885            } => write!(
886                f,
887                "Found {} run at ID {}, but expected {:?} run",
888                actual, run_id, expected
889            ),
890            TracerError::UnsupportedSchemaFormat(msg) => write!(f, "{}", msg),
891        }
892    }
893}
894
// Marker impl: `TracerError` wraps no underlying error, so the default
// `source()` (which returns `None`) is kept.
impl std::error::Error for TracerError {}
896
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal in-memory tracer used to exercise the provided trait methods.
    #[derive(Debug)]
    struct TestTracer {
        config: TracerCoreConfig,
        run_map: HashMap<String, Run>,
        order_map: HashMap<Uuid, (Uuid, String)>,
    }

    impl TestTracer {
        /// Creates a tracer with the default configuration and empty maps.
        fn new() -> Self {
            Self {
                config: TracerCoreConfig::default(),
                run_map: HashMap::new(),
                order_map: HashMap::new(),
            }
        }

        /// Builds a chain run with a fresh ID and every optional argument left empty.
        fn blank_chain_run(&self) -> Run {
            self.create_chain_run(
                HashMap::new(),
                HashMap::new(),
                Uuid::new_v4(),
                None,
                None,
                None,
                None,
                None,
                HashMap::new(),
            )
        }
    }

    impl TracerCore for TestTracer {
        fn config(&self) -> &TracerCoreConfig {
            &self.config
        }

        fn config_mut(&mut self) -> &mut TracerCoreConfig {
            &mut self.config
        }

        fn run_map(&self) -> &HashMap<String, Run> {
            &self.run_map
        }

        fn run_map_mut(&mut self) -> &mut HashMap<String, Run> {
            &mut self.run_map
        }

        fn order_map(&self) -> &HashMap<Uuid, (Uuid, String)> {
            &self.order_map
        }

        fn order_map_mut(&mut self) -> &mut HashMap<Uuid, (Uuid, String)> {
            &mut self.order_map
        }

        // Persistence is irrelevant for these tests.
        fn persist_run(&mut self, _run: &Run) {}
    }

    #[test]
    fn test_create_chain_run() {
        let tracer = TestTracer::new();

        let run = tracer.blank_chain_run();

        assert_eq!(run.run_type, "chain");
        assert!(run.end_time.is_none());
        assert!(!run.events.is_empty());
    }

    #[test]
    fn test_start_trace() {
        let mut tracer = TestTracer::new();
        let mut run = tracer.blank_chain_run();

        tracer.start_trace(&mut run);

        assert!(run.trace_id.is_some());
        assert!(run.dotted_order.is_some());
        assert!(tracer.run_map.contains_key(&run.id.to_string()));
    }

    #[test]
    fn test_complete_chain_run() {
        let mut tracer = TestTracer::new();
        let mut run = tracer.blank_chain_run();
        tracer.start_trace(&mut run);
        let run_id = run.id;

        let mut outputs = HashMap::new();
        outputs.insert("result".to_string(), Value::String("success".to_string()));
        let result = tracer.complete_chain_run(outputs, run_id, None);

        assert!(result.is_ok());
        let completed_run = result.unwrap();
        assert!(completed_run.end_time.is_some());
        assert!(completed_run.outputs.is_some());
    }

    #[test]
    fn test_get_run_not_found() {
        let tracer = TestTracer::new();

        let lookup = tracer.get_run(Uuid::new_v4(), None);

        assert!(lookup.is_err());
    }
}