langsmith_rust/observability/
node_wrapper.rs1use crate::observability::observer::Observer;
2use crate::models::run::RunType;
3use crate::tracing::decorator::trace_node;
4use crate::error::Result;
5use serde::Serialize;
6use std::future::Future;
7use std::sync::Arc;
8
9pub struct ObservableNodeWrapper {
11 name: String,
12 run_type: RunType,
13 observers: Vec<Arc<dyn Observer>>,
14}
15
16impl ObservableNodeWrapper {
17 pub fn new(name: impl Into<String>, run_type: RunType) -> Self {
18 Self {
19 name: name.into(),
20 run_type,
21 observers: Vec::new(),
22 }
23 }
24
25 pub fn with_observer(mut self, observer: Arc<dyn Observer>) -> Self {
26 self.observers.push(observer);
27 self
28 }
29
30 pub async fn execute<F, Fut, I, O>(
32 &self,
33 inputs: I,
34 f: F,
35 ) -> Result<O>
36 where
37 F: FnOnce(I) -> Fut,
38 Fut: Future<Output = Result<O>>,
39 I: Serialize,
40 O: Serialize,
41 {
42 use serde_json::to_value;
43
44 let inputs_value = to_value(&inputs).unwrap_or_default();
46 for observer in &self.observers {
47 observer.on_node_start(&self.name, &inputs_value);
48 }
49
50 let result = trace_node(&self.name, self.run_type.clone(), inputs, f).await;
52
53 match &result {
55 Ok(output) => {
56 let outputs_value = to_value(output).unwrap_or_default();
57 for observer in &self.observers {
58 observer.on_node_end(&self.name, &outputs_value);
59 }
60 }
61 Err(e) => {
62 for observer in &self.observers {
63 observer.on_node_error(&self.name, &e.to_string());
64 }
65 }
66 }
67
68 result
69 }
70}
71