1use std::collections::HashMap;
7use std::fmt;
8use std::sync::{Arc, Mutex};
9
10use chrono::{DateTime, Utc};
11use serde::{Deserialize, Serialize};
12use serde_json::Value;
13use uuid::Uuid;
14
15use crate::tracers::base::BaseTracer;
16use crate::tracers::core::{SchemaFormat, TracerCore, TracerCoreConfig};
17use crate::tracers::memory_stream::{MemoryStream, ReceiveStream, SendStream};
18use crate::tracers::schemas::Run;
19use crate::tracers::streaming::StreamingCallbackHandler;
20
/// Log record for a single sub-run tracked by the log stream.
///
/// Entries are created with [`LogEntry::new`] and serialized as the value of
/// an `add` patch at `/logs/{key}`; later patches update individual fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
    /// Identifier of the run, kept as a string.
    pub id: String,
    /// Name of the run.
    pub name: String,
    /// Kind of run (compared elsewhere in this file against values such as
    /// `"llm"`, `"chat_model"`, `"retriever"` and `"prompt"`). Serialized
    /// under the key `type`.
    #[serde(rename = "type")]
    pub run_type: String,
    /// Tags attached to the run.
    pub tags: Vec<String>,
    /// Arbitrary metadata associated with the run.
    pub metadata: HashMap<String, Value>,
    /// Start timestamp, already rendered as a string by [`LogEntry::new`].
    pub start_time: String,
    /// String chunks appended through `/logs/{key}/streamed_output_str/-`.
    pub streamed_output_str: Vec<String>,
    /// Raw chunks appended through `/logs/{key}/streamed_output/-`.
    pub streamed_output: Vec<Value>,
    /// Inputs of the run; left out of the serialized form while `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub inputs: Option<Value>,
    /// Final output of the run, `None` until one has been recorded.
    pub final_output: Option<Value>,
    /// End timestamp as a string, `None` until one has been recorded.
    pub end_time: Option<String>,
}
49
50impl LogEntry {
51 pub fn new(
53 id: String,
54 name: String,
55 run_type: String,
56 tags: Vec<String>,
57 metadata: HashMap<String, Value>,
58 start_time: DateTime<Utc>,
59 ) -> Self {
60 Self {
61 id,
62 name,
63 run_type,
64 tags,
65 metadata,
66 start_time: start_time.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(),
67 streamed_output_str: Vec::new(),
68 streamed_output: Vec::new(),
69 inputs: None,
70 final_output: None,
71 end_time: None,
72 }
73 }
74}
75
/// Aggregated state of a root run, as rebuilt from a sequence of patches.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RunState {
    /// Identifier of the root run, kept as a string.
    pub id: String,
    /// Chunks appended through `/streamed_output/-`.
    pub streamed_output: Vec<Value>,
    /// Final output of the root run, `None` until one has been recorded.
    pub final_output: Option<Value>,
    /// Name of the root run.
    pub name: String,
    /// Kind of the root run. Serialized under the key `type`.
    #[serde(rename = "type")]
    pub run_type: String,
    /// Sub-run entries keyed by the second segment of their `/logs/{key}` path.
    pub logs: HashMap<String, LogEntry>,
}
93
94impl RunState {
95 pub fn new(id: String, name: String, run_type: String) -> Self {
97 Self {
98 id,
99 streamed_output: Vec::new(),
100 final_output: None,
101 name,
102 run_type,
103 logs: HashMap::new(),
104 }
105 }
106}
107
/// A single patch operation: an operation name, a `/`-separated path and an
/// optional value.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JsonPatchOp {
    /// Operation name; the constructors in this file produce `"add"`,
    /// `"replace"` and `"remove"`.
    pub op: String,
    /// Path of the target, e.g. `/logs/{key}/final_output`.
    pub path: String,
    /// Payload of the operation; left out of the serialized form while `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub value: Option<Value>,
}
119
120impl JsonPatchOp {
121 pub fn add(path: impl Into<String>, value: Value) -> Self {
123 Self {
124 op: "add".to_string(),
125 path: path.into(),
126 value: Some(value),
127 }
128 }
129
130 pub fn replace(path: impl Into<String>, value: Value) -> Self {
132 Self {
133 op: "replace".to_string(),
134 path: path.into(),
135 value: Some(value),
136 }
137 }
138
139 pub fn remove(path: impl Into<String>) -> Self {
141 Self {
142 op: "remove".to_string(),
143 path: path.into(),
144 value: None,
145 }
146 }
147}
148
/// A batch of patch operations that are sent and applied together.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RunLogPatch {
    /// The operations of this patch, in the order they were supplied.
    pub ops: Vec<JsonPatchOp>,
}
155
156impl RunLogPatch {
157 pub fn new(ops: Vec<JsonPatchOp>) -> Self {
159 Self { ops }
160 }
161
162 pub fn from_op(op: JsonPatchOp) -> Self {
164 Self { ops: vec![op] }
165 }
166
167 pub fn from_ops(ops: impl IntoIterator<Item = JsonPatchOp>) -> Self {
169 Self {
170 ops: ops.into_iter().collect(),
171 }
172 }
173}
174
175impl fmt::Display for RunLogPatch {
176 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
177 write!(f, "RunLogPatch({:?})", self.ops)
178 }
179}
180
/// Accumulated history of patch operations together with the state they
/// produce.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RunLog {
    /// Every operation recorded so far, in application order.
    pub ops: Vec<JsonPatchOp>,
    /// Current state; while this is `None`, applied patches are only recorded
    /// in `ops` and not folded into any state.
    pub state: Option<RunState>,
}
189
190impl RunLog {
191 pub fn new(ops: Vec<JsonPatchOp>, state: Option<RunState>) -> Self {
193 Self { ops, state }
194 }
195
196 pub fn apply_patch(&mut self, patch: RunLogPatch) {
198 self.ops.extend(patch.ops.clone());
199
200 if let Some(ref mut state) = self.state {
202 for op in patch.ops {
203 Self::apply_op_to_state(state, &op);
204 }
205 }
206 }
207
208 fn apply_op_to_state(state: &mut RunState, op: &JsonPatchOp) {
209 let path_parts: Vec<&str> = op.path.split('/').filter(|s| !s.is_empty()).collect();
210
211 match op.op.as_str() {
212 "replace" => {
213 if op.path.is_empty() || op.path == "/" {
214 if let Some(value) = &op.value
216 && let Ok(new_state) = serde_json::from_value::<RunState>(value.clone())
217 {
218 *state = new_state;
219 }
220 } else if path_parts.first() == Some(&"final_output") {
221 state.final_output = op.value.clone();
222 }
223 }
224 "add" => {
225 if path_parts.len() >= 2 {
226 match path_parts[0] {
227 "logs" => {
228 if path_parts.len() == 2 {
229 if let Some(value) = &op.value
231 && let Ok(entry) =
232 serde_json::from_value::<LogEntry>(value.clone())
233 {
234 state.logs.insert(path_parts[1].to_string(), entry);
235 }
236 } else if path_parts.len() >= 3 {
237 if let Some(entry) = state.logs.get_mut(path_parts[1]) {
239 match path_parts[2] {
240 "streamed_output"
241 if path_parts.len() == 4 && path_parts[3] == "-" =>
242 {
243 if let Some(value) = &op.value {
244 entry.streamed_output.push(value.clone());
245 }
246 }
247 "streamed_output_str"
248 if path_parts.len() == 4 && path_parts[3] == "-" =>
249 {
250 if let Some(value) = &op.value
251 && let Some(s) = value.as_str()
252 {
253 entry.streamed_output_str.push(s.to_string());
254 }
255 }
256 "final_output" => {
257 entry.final_output = op.value.clone();
258 }
259 "end_time" => {
260 entry.end_time = op
261 .value
262 .clone()
263 .and_then(|v| v.as_str().map(String::from));
264 }
265 "inputs" => {
266 entry.inputs = op.value.clone();
267 }
268 _ => {}
269 }
270 }
271 }
272 }
273 "streamed_output" if path_parts.len() == 2 && path_parts[1] == "-" => {
274 if let Some(value) = &op.value {
275 state.streamed_output.push(value.clone());
276 }
277 }
278 "final_output" => {
279 state.final_output = op.value.clone();
280 }
281 _ => {}
282 }
283 }
284 }
285 _ => {}
286 }
287 }
288}
289
290impl fmt::Display for RunLog {
291 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
292 write!(f, "RunLog({:?})", self.state)
293 }
294}
295
/// Tracer that turns run lifecycle events into [`RunLogPatch`] values and
/// pushes them onto an in-memory stream.
pub struct LogStreamCallbackHandler {
    /// Tracer configuration exposed through `TracerCore::config`.
    config: TracerCoreConfig,
    /// Run storage exposed through `TracerCore::run_map`.
    run_map: HashMap<String, Run>,
    /// Ordering storage exposed through `TracerCore::order_map`.
    order_map: HashMap<Uuid, (Uuid, String)>,
    /// When set, the send stream is closed once the root run is updated.
    auto_close: bool,
    /// Run names that opt a run in (see `include_run`).
    include_names: Option<Vec<String>>,
    /// Run types that opt a run in.
    include_types: Option<Vec<String>>,
    /// Tags that opt a run in.
    include_tags: Option<Vec<String>>,
    /// Run names that opt a run out.
    exclude_names: Option<Vec<String>>,
    /// Run types that opt a run out.
    exclude_types: Option<Vec<String>>,
    /// Tags that opt a run out.
    exclude_tags: Option<Vec<String>>,
    /// Sending half of the patch stream.
    send_stream: SendStream<RunLogPatch>,
    /// Receiving half of the patch stream; `None` once it has been handed out
    /// by `take_receive_stream`.
    receive_stream: Option<ReceiveStream<RunLogPatch>>,
    /// Log key assigned to each included run, by run id.
    key_map_by_run_id: HashMap<Uuid, String>,
    /// Number of included runs seen per name; used to build `name:N` keys for
    /// repeated names.
    counter_map_by_name: HashMap<String, usize>,
    /// Id of the first run passed to `on_run_create`, treated as the root.
    root_id: Option<Uuid>,
    /// Held by `on_run_create` while a log key is assigned.
    lock: Arc<Mutex<()>>,
}
331
332impl fmt::Debug for LogStreamCallbackHandler {
333 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
334 f.debug_struct("LogStreamCallbackHandler")
335 .field("config", &self.config)
336 .field("auto_close", &self.auto_close)
337 .field("include_names", &self.include_names)
338 .field("include_types", &self.include_types)
339 .field("include_tags", &self.include_tags)
340 .field("exclude_names", &self.exclude_names)
341 .field("exclude_types", &self.exclude_types)
342 .field("exclude_tags", &self.exclude_tags)
343 .field("root_id", &self.root_id)
344 .finish()
345 }
346}
347
/// Options accepted by [`LogStreamCallbackHandler::new`].
///
/// With the derived `Default`, `auto_close` is `false` and every filter is
/// `None`.
#[derive(Debug, Clone, Default)]
pub struct LogStreamConfig {
    /// Close the patch stream once the root run is updated.
    pub auto_close: bool,
    /// Only include runs with one of these names.
    pub include_names: Option<Vec<String>>,
    /// Only include runs with one of these types.
    pub include_types: Option<Vec<String>>,
    /// Only include runs carrying one of these tags.
    pub include_tags: Option<Vec<String>>,
    /// Drop runs with one of these names.
    pub exclude_names: Option<Vec<String>>,
    /// Drop runs with one of these types.
    pub exclude_types: Option<Vec<String>>,
    /// Drop runs carrying one of these tags.
    pub exclude_tags: Option<Vec<String>>,
    /// Schema format forwarded into the tracer configuration; it also selects
    /// how inputs and outputs are standardized.
    pub schema_format: SchemaFormat,
}
368
369impl LogStreamCallbackHandler {
370 pub fn new(config: LogStreamConfig) -> Self {
372 let stream: MemoryStream<RunLogPatch> = MemoryStream::new();
373 let send_stream = stream.get_send_stream();
374 let receive_stream = stream.get_receive_stream();
375
376 Self {
377 config: TracerCoreConfig {
378 schema_format: config.schema_format,
379 log_missing_parent: true,
380 },
381 run_map: HashMap::new(),
382 order_map: HashMap::new(),
383 auto_close: config.auto_close,
384 include_names: config.include_names,
385 include_types: config.include_types,
386 include_tags: config.include_tags,
387 exclude_names: config.exclude_names,
388 exclude_types: config.exclude_types,
389 exclude_tags: config.exclude_tags,
390 send_stream,
391 receive_stream: Some(receive_stream),
392 key_map_by_run_id: HashMap::new(),
393 counter_map_by_name: HashMap::new(),
394 root_id: None,
395 lock: Arc::new(Mutex::new(())),
396 }
397 }
398
399 pub fn take_receive_stream(&mut self) -> Option<ReceiveStream<RunLogPatch>> {
401 self.receive_stream.take()
402 }
403
404 pub fn root_id(&self) -> Option<Uuid> {
406 self.root_id
407 }
408
409 pub fn send(&self, ops: Vec<JsonPatchOp>) -> bool {
415 self.send_stream.send_nowait(RunLogPatch::new(ops)).is_ok()
416 }
417
418 pub fn include_run(&self, run: &Run) -> bool {
420 if Some(run.id) == self.root_id {
421 return false;
422 }
423
424 let run_tags = run.tags.clone().unwrap_or_default();
425
426 let mut include = self.include_names.is_none()
427 && self.include_types.is_none()
428 && self.include_tags.is_none();
429
430 if let Some(ref names) = self.include_names {
431 include = include || names.contains(&run.name);
432 }
433 if let Some(ref types) = self.include_types {
434 include = include || types.contains(&run.run_type);
435 }
436 if let Some(ref tags) = self.include_tags {
437 include = include || run_tags.iter().any(|t| tags.contains(t));
438 }
439
440 if let Some(ref names) = self.exclude_names {
441 include = include && !names.contains(&run.name);
442 }
443 if let Some(ref types) = self.exclude_types {
444 include = include && !types.contains(&run.run_type);
445 }
446 if let Some(ref tags) = self.exclude_tags {
447 include = include && !run_tags.iter().any(|t| tags.contains(t));
448 }
449
450 include
451 }
452
453 fn get_standardized_inputs(&self, run: &Run) -> Option<Value> {
455 match self.config.schema_format {
456 SchemaFormat::Original | SchemaFormat::OriginalChat => {
457 Some(serde_json::to_value(&run.inputs).unwrap_or_default())
458 }
459 SchemaFormat::StreamingEvents => {
460 if run.run_type == "retriever"
461 || run.run_type == "llm"
462 || run.run_type == "chat_model"
463 {
464 Some(serde_json::to_value(&run.inputs).unwrap_or_default())
465 } else {
466 run.inputs.get("input").cloned()
467 }
468 }
469 }
470 }
471
472 fn get_standardized_outputs(&self, run: &Run) -> Option<Value> {
474 let outputs = run.outputs.as_ref()?;
475
476 match self.config.schema_format {
477 SchemaFormat::Original | SchemaFormat::OriginalChat => {
478 if run.run_type == "prompt" {
479 outputs.get("output").cloned()
480 } else {
481 Some(serde_json::to_value(outputs).unwrap_or_default())
482 }
483 }
484 SchemaFormat::StreamingEvents => {
485 if run.run_type == "retriever"
486 || run.run_type == "llm"
487 || run.run_type == "chat_model"
488 {
489 Some(serde_json::to_value(outputs).unwrap_or_default())
490 } else {
491 outputs.get("output").cloned()
492 }
493 }
494 }
495 }
496}
497
498impl TracerCore for LogStreamCallbackHandler {
499 fn config(&self) -> &TracerCoreConfig {
500 &self.config
501 }
502
503 fn config_mut(&mut self) -> &mut TracerCoreConfig {
504 &mut self.config
505 }
506
507 fn run_map(&self) -> &HashMap<String, Run> {
508 &self.run_map
509 }
510
511 fn run_map_mut(&mut self) -> &mut HashMap<String, Run> {
512 &mut self.run_map
513 }
514
515 fn order_map(&self) -> &HashMap<Uuid, (Uuid, String)> {
516 &self.order_map
517 }
518
519 fn order_map_mut(&mut self) -> &mut HashMap<Uuid, (Uuid, String)> {
520 &mut self.order_map
521 }
522
523 fn persist_run(&mut self, _run: &Run) {
524 }
527
528 fn on_run_create(&mut self, run: &Run) {
529 if self.root_id.is_none() {
530 self.root_id = Some(run.id);
531 let state = RunState::new(run.id.to_string(), run.name.clone(), run.run_type.clone());
532 if !self.send(vec![JsonPatchOp::replace(
533 "",
534 serde_json::to_value(state).unwrap_or_default(),
535 )]) {
536 return;
537 }
538 }
539
540 if !self.include_run(run) {
541 return;
542 }
543
544 let _lock = self.lock.lock().unwrap();
546 let count = self
547 .counter_map_by_name
548 .entry(run.name.clone())
549 .or_insert(0);
550 *count += 1;
551 let key = if *count == 1 {
552 run.name.clone()
553 } else {
554 format!("{}:{}", run.name, count)
555 };
556 self.key_map_by_run_id.insert(run.id, key.clone());
557
558 let metadata = run
559 .extra
560 .get("metadata")
561 .and_then(|v| serde_json::from_value::<HashMap<String, Value>>(v.clone()).ok())
562 .unwrap_or_default();
563
564 let mut entry = LogEntry::new(
565 run.id.to_string(),
566 run.name.clone(),
567 run.run_type.clone(),
568 run.tags.clone().unwrap_or_default(),
569 metadata,
570 run.start_time,
571 );
572
573 if self.config.schema_format == SchemaFormat::StreamingEvents {
574 entry.inputs = self.get_standardized_inputs(run);
575 }
576
577 self.send(vec![JsonPatchOp::add(
578 format!("/logs/{}", key),
579 serde_json::to_value(entry).unwrap_or_default(),
580 )]);
581 }
582
583 fn on_run_update(&mut self, run: &Run) {
584 let key = match self.key_map_by_run_id.get(&run.id) {
585 Some(k) => k.clone(),
586 None => {
587 if run.id == self.root_id.unwrap_or(Uuid::nil()) && self.auto_close {
589 let _ = self.send_stream.close();
590 }
591 return;
592 }
593 };
594
595 let mut ops = Vec::new();
596
597 if self.config.schema_format == SchemaFormat::StreamingEvents
598 && let Some(inputs) = self.get_standardized_inputs(run)
599 {
600 ops.push(JsonPatchOp::replace(
601 format!("/logs/{}/inputs", key),
602 inputs,
603 ));
604 }
605
606 if let Some(outputs) = self.get_standardized_outputs(run) {
607 ops.push(JsonPatchOp::add(
608 format!("/logs/{}/final_output", key),
609 outputs,
610 ));
611 }
612
613 if let Some(end_time) = run.end_time {
614 ops.push(JsonPatchOp::add(
615 format!("/logs/{}/end_time", key),
616 Value::String(end_time.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string()),
617 ));
618 }
619
620 self.send(ops);
621
622 if run.id == self.root_id.unwrap_or(Uuid::nil()) && self.auto_close {
623 let _ = self.send_stream.close();
624 }
625 }
626
627 fn on_llm_new_token(&mut self, run: &Run, token: &str, chunk: Option<&dyn std::any::Any>) {
628 let key = match self.key_map_by_run_id.get(&run.id) {
629 Some(k) => k.clone(),
630 None => return,
631 };
632
633 let chunk_value = if let Some(chunk_any) = chunk {
634 if let Some(gen_chunk) = chunk_any.downcast_ref::<crate::outputs::GenerationChunk>() {
635 serde_json::to_value(gen_chunk).unwrap_or(Value::String(token.to_string()))
636 } else if let Some(chat_chunk) =
637 chunk_any.downcast_ref::<crate::outputs::ChatGenerationChunk>()
638 {
639 serde_json::to_value(&chat_chunk.message)
641 .unwrap_or(Value::String(token.to_string()))
642 } else {
643 Value::String(token.to_string())
644 }
645 } else {
646 Value::String(token.to_string())
647 };
648
649 self.send(vec![
650 JsonPatchOp::add(
651 format!("/logs/{}/streamed_output_str/-", key),
652 Value::String(token.to_string()),
653 ),
654 JsonPatchOp::add(format!("/logs/{}/streamed_output/-", key), chunk_value),
655 ]);
656 }
657}
658
impl BaseTracer for LogStreamCallbackHandler {
    /// Does nothing: the body is intentionally empty.
    fn persist_run_impl(&mut self, _run: &Run) {
    }
}
665
666impl<T: Send + 'static> StreamingCallbackHandler<T> for LogStreamCallbackHandler {
667 fn tap_output_aiter(
668 &self,
669 run_id: Uuid,
670 output: std::pin::Pin<Box<dyn futures::Stream<Item = T> + Send>>,
671 ) -> std::pin::Pin<Box<dyn futures::Stream<Item = T> + Send>> {
672 use futures::StreamExt;
673
674 let root_id = self.root_id;
675 let key = self.key_map_by_run_id.get(&run_id).cloned();
676 let send_stream = self.send_stream.clone();
677
678 Box::pin(futures::stream::unfold(
679 (output, run_id, root_id, key, send_stream),
680 |(mut stream, run_id, root_id, key, sender)| async move {
681 let item = stream.next().await?;
682
683 if run_id != root_id.unwrap_or(Uuid::nil())
686 && let Some(ref k) = key
687 {
688 let _ = sender.send_nowait(RunLogPatch::new(vec![JsonPatchOp::add(
692 format!("/logs/{}/streamed_output/-", k),
693 Value::Null, )]));
695 }
696
697 Some((item, (stream, run_id, root_id, key, sender)))
698 },
699 ))
700 }
701
702 fn tap_output_iter(
703 &self,
704 run_id: Uuid,
705 output: Box<dyn Iterator<Item = T> + Send>,
706 ) -> Box<dyn Iterator<Item = T> + Send> {
707 let root_id = self.root_id;
708 let key = self.key_map_by_run_id.get(&run_id).cloned();
709 let send_stream = self.send_stream.clone();
710
711 Box::new(TappedIterator {
712 inner: output,
713 run_id,
714 root_id,
715 key,
716 send_stream,
717 })
718 }
719}
720
/// Iterator adapter that forwards items from `inner` and emits a patch for
/// each one when the run is not the root and has a log key.
struct TappedIterator<T> {
    /// The wrapped output iterator.
    inner: Box<dyn Iterator<Item = T> + Send>,
    /// Id of the run whose output is being tapped.
    run_id: Uuid,
    /// Root run id captured when the adapter was built.
    root_id: Option<Uuid>,
    /// Log key of the run captured when the adapter was built, if it had one.
    key: Option<String>,
    /// Stream the per-item patches are sent to.
    send_stream: SendStream<RunLogPatch>,
}
728
729impl<T> Iterator for TappedIterator<T> {
730 type Item = T;
731
732 fn next(&mut self) -> Option<Self::Item> {
733 let item = self.inner.next()?;
734
735 if self.run_id != self.root_id.unwrap_or(Uuid::nil())
737 && let Some(ref k) = self.key
738 {
739 let _ = self
740 .send_stream
741 .send_nowait(RunLogPatch::new(vec![JsonPatchOp::add(
742 format!("/logs/{}/streamed_output/-", k),
743 Value::Null, )]));
745 }
746
747 Some(item)
748 }
749}
750
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a log entry without metadata that starts "now".
    fn entry(id: &str, name: &str, run_type: &str, tags: &[&str]) -> LogEntry {
        LogEntry::new(
            id.to_string(),
            name.to_string(),
            run_type.to_string(),
            tags.iter().map(|tag| tag.to_string()).collect(),
            HashMap::new(),
            Utc::now(),
        )
    }

    /// A default run that only differs in its name.
    fn run_named(name: &str) -> Run {
        Run {
            name: name.to_string(),
            ..Default::default()
        }
    }

    /// A default run that only differs in its tags.
    fn run_tagged(tags: &[&str]) -> Run {
        Run {
            tags: Some(tags.iter().map(|tag| tag.to_string()).collect()),
            ..Default::default()
        }
    }

    #[test]
    fn test_log_entry_new() {
        let created = entry("test-id", "test", "chain", &["tag1"]);

        assert_eq!(created.id, "test-id");
        assert_eq!(created.name, "test");
        assert_eq!(created.run_type, "chain");
        assert!(created.final_output.is_none());
    }

    #[test]
    fn test_run_state_new() {
        let state = RunState::new(
            String::from("state-id"),
            String::from("test"),
            String::from("chain"),
        );

        assert_eq!(state.id, "state-id");
        assert!(state.logs.is_empty());
        assert!(state.final_output.is_none());
    }

    #[test]
    fn test_json_patch_ops() {
        let add_op = JsonPatchOp::add("/path", Value::String("value".to_string()));
        assert_eq!(add_op.op, "add");
        assert_eq!(add_op.path, "/path");

        let replace_op = JsonPatchOp::replace("/path", Value::Number(42.into()));
        assert_eq!(replace_op.op, "replace");

        let remove_op = JsonPatchOp::remove("/path");
        assert_eq!(remove_op.op, "remove");
        assert!(remove_op.value.is_none());
    }

    #[test]
    fn test_run_log_apply_patch() {
        let initial = RunState::new(
            String::from("id"),
            String::from("test"),
            String::from("chain"),
        );
        let mut log = RunLog::new(Vec::new(), Some(initial));

        let payload = serde_json::to_value(entry("entry1", "sub", "tool", &[])).unwrap();
        log.apply_patch(RunLogPatch::new(vec![JsonPatchOp::add(
            "/logs/entry1",
            payload,
        )]));

        let state = log.state.as_ref().unwrap();
        assert!(state.logs.contains_key("entry1"));
    }

    #[test]
    fn test_log_stream_handler_include_run() {
        let handler = LogStreamCallbackHandler::new(LogStreamConfig {
            include_names: Some(vec![String::from("allowed")]),
            ..Default::default()
        });

        assert!(handler.include_run(&run_named("allowed")));
        assert!(!handler.include_run(&run_named("not_allowed")));
    }

    #[test]
    fn test_log_stream_handler_exclude_run() {
        let handler = LogStreamCallbackHandler::new(LogStreamConfig {
            exclude_names: Some(vec![String::from("excluded")]),
            ..Default::default()
        });

        assert!(!handler.include_run(&run_named("excluded")));
        assert!(handler.include_run(&run_named("allowed")));
    }

    #[test]
    fn test_log_stream_handler_include_tags() {
        let handler = LogStreamCallbackHandler::new(LogStreamConfig {
            include_tags: Some(vec![String::from("important")]),
            ..Default::default()
        });

        assert!(handler.include_run(&run_tagged(&["important", "other"])));
        assert!(!handler.include_run(&run_tagged(&["other"])));
    }
}