Skip to main content

braintrust_sdk_rust/
span.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use serde_json::{Map, Value};
6use tokio::sync::Mutex;
7use uuid::Uuid;
8
9use crate::error::Result;
10use crate::types::{ParentSpanInfo, SpanAttributes, SpanPayload, SpanType};
11
12/// Event data to log to a span. All fields are optional.
13/// Multiple calls to `log()` will merge data.
14#[derive(Clone, Default)]
15pub struct SpanLog {
16    pub name: Option<String>,
17    pub input: Option<Value>,
18    pub output: Option<Value>,
19    pub metadata: Option<Map<String, Value>>,
20    pub metrics: Option<HashMap<String, f64>>,
21}
22
23#[async_trait]
24pub(crate) trait SpanSubmitter: Send + Sync {
25    async fn submit(
26        &self,
27        token: impl Into<String> + Send,
28        payload: SpanPayload,
29        parent_info: Option<ParentSpanInfo>,
30    ) -> Result<()>;
31}
32
33#[allow(private_bounds)]
34pub struct SpanBuilder<S: SpanSubmitter> {
35    submitter: Arc<S>,
36    token: String,
37    org_id: String,
38    org_name: Option<String>,
39    project_name: Option<String>,
40    parent_info: Option<ParentSpanInfo>,
41    span_type: SpanType,
42    purpose: Option<String>,
43}
44
45impl<S: SpanSubmitter> Clone for SpanBuilder<S> {
46    fn clone(&self) -> Self {
47        Self {
48            submitter: Arc::clone(&self.submitter),
49            token: self.token.clone(),
50            org_id: self.org_id.clone(),
51            org_name: self.org_name.clone(),
52            project_name: self.project_name.clone(),
53            parent_info: self.parent_info.clone(),
54            span_type: self.span_type,
55            purpose: self.purpose.clone(),
56        }
57    }
58}
59
60#[allow(private_bounds)]
61impl<S: SpanSubmitter> SpanBuilder<S> {
62    pub(crate) fn new(
63        submitter: Arc<S>,
64        token: impl Into<String>,
65        org_id: impl Into<String>,
66    ) -> Self {
67        Self {
68            submitter,
69            token: token.into(),
70            org_id: org_id.into(),
71            org_name: None,
72            project_name: None,
73            parent_info: None,
74            span_type: SpanType::default(),
75            purpose: None,
76        }
77    }
78
79    pub fn org_name(mut self, org_name: impl Into<String>) -> Self {
80        self.org_name = Some(org_name.into());
81        self
82    }
83
84    pub fn project_name(mut self, project_name: impl Into<String>) -> Self {
85        self.project_name = Some(project_name.into());
86        self
87    }
88
89    pub fn parent_info(mut self, parent: ParentSpanInfo) -> Self {
90        self.parent_info = Some(parent);
91        self
92    }
93
94    pub fn span_type(mut self, span_type: SpanType) -> Self {
95        self.span_type = span_type;
96        self
97    }
98
99    pub fn purpose(mut self, purpose: impl Into<String>) -> Self {
100        self.purpose = Some(purpose.into());
101        self
102    }
103
104    pub fn build(self) -> SpanHandle<S> {
105        use std::time::{SystemTime, UNIX_EPOCH};
106
107        // Generate both IDs ONCE at span creation - reused for all flushes
108        let row_id = Uuid::new_v4().to_string();
109        let span_id = Uuid::new_v4().to_string();
110        let start_time = SystemTime::now()
111            .duration_since(UNIX_EPOCH)
112            .map(|d| d.as_secs_f64())
113            .ok();
114
115        SpanHandle {
116            submitter: Arc::clone(&self.submitter),
117            token: self.token,
118            parent_info: self.parent_info,
119            inner: Arc::new(Mutex::new(SpanData {
120                row_id,
121                span_id,
122                has_flushed: false,
123                org_id: self.org_id,
124                org_name: self.org_name,
125                project_name: self.project_name,
126                start_time,
127                span_type: self.span_type,
128                purpose: self.purpose,
129                ..Default::default()
130            })),
131        }
132    }
133}
134
135#[allow(private_bounds)]
136pub struct SpanHandle<S: SpanSubmitter> {
137    submitter: Arc<S>,
138    token: String,
139    parent_info: Option<ParentSpanInfo>,
140    inner: Arc<Mutex<SpanData>>,
141}
142
143impl<S: SpanSubmitter> Clone for SpanHandle<S> {
144    fn clone(&self) -> Self {
145        Self {
146            submitter: Arc::clone(&self.submitter),
147            token: self.token.clone(),
148            parent_info: self.parent_info.clone(),
149            inner: Arc::clone(&self.inner),
150        }
151    }
152}
153
154#[allow(private_bounds)]
155impl<S: SpanSubmitter> SpanHandle<S> {
156    /// Log event data to this span. All fields are optional.
157    /// Multiple calls will merge data (later values overwrite earlier ones).
158    pub async fn log(&self, event: SpanLog) {
159        let mut inner = self.inner.lock().await;
160
161        if let Some(name) = event.name {
162            inner.name = Some(name);
163        }
164        if let Some(input) = event.input {
165            inner.input = Some(input);
166        }
167        if let Some(output) = event.output {
168            inner.output = Some(output);
169        }
170        if let Some(metadata) = event.metadata {
171            for (key, value) in metadata {
172                inner.metadata.insert(key, value);
173            }
174        }
175        if let Some(metrics) = event.metrics {
176            for (key, value) in metrics {
177                inner.metrics.insert(key, value);
178            }
179        }
180    }
181
182    /// Flush span data to Braintrust. Can be called multiple times - last writer wins.
183    /// Each call updates the same span (same row_id and span_id).
184    /// First flush sends with is_merge=false (replace), subsequent flushes send is_merge=true (merge).
185    pub async fn flush(&self) -> Result<()> {
186        use std::time::{SystemTime, UNIX_EPOCH};
187
188        // Capture end time and add start/end to metrics
189        let end_time = SystemTime::now()
190            .duration_since(UNIX_EPOCH)
191            .map(|d| d.as_secs_f64())
192            .ok();
193
194        let payload: SpanPayload = {
195            let mut inner = self.inner.lock().await;
196            if let Some(start) = inner.start_time {
197                inner.metrics.insert("start".to_string(), start);
198            }
199            if let Some(end) = end_time {
200                inner.metrics.insert("end".to_string(), end);
201            }
202
203            // Create payload from current state (captures has_flushed for is_merge)
204            let payload: SpanPayload = inner.clone().into();
205
206            // Mark as flushed for subsequent calls
207            inner.has_flushed = true;
208
209            payload
210        };
211
212        self.submitter
213            .submit(self.token.clone(), payload, self.parent_info.clone())
214            .await
215    }
216}
217
218#[derive(Clone, Default)]
219struct SpanData {
220    row_id: String,
221    span_id: String,
222    has_flushed: bool,
223    org_id: String,
224    org_name: Option<String>,
225    project_name: Option<String>,
226    name: Option<String>,
227    span_type: SpanType,
228    purpose: Option<String>,
229    input: Option<Value>,
230    output: Option<Value>,
231    metadata: Map<String, Value>,
232    metrics: HashMap<String, f64>,
233    start_time: Option<f64>,
234}
235
236impl From<SpanData> for SpanPayload {
237    fn from(data: SpanData) -> Self {
238        let span_attributes = SpanAttributes {
239            name: data.name,
240            span_type: Some(data.span_type),
241            purpose: data.purpose,
242            extra: HashMap::new(),
243        };
244
245        // Only include span_attributes if it has meaningful content
246        let has_attributes = span_attributes.name.is_some()
247            || span_attributes.span_type.is_some()
248            || span_attributes.purpose.is_some();
249
250        Self {
251            row_id: data.row_id,
252            span_id: data.span_id,
253            is_merge: data.has_flushed, // First flush = false (replace), subsequent = true (merge)
254            org_id: data.org_id,
255            org_name: data.org_name,
256            project_name: data.project_name,
257            input: data.input,
258            output: data.output,
259            metadata: (!data.metadata.is_empty()).then_some(data.metadata),
260            metrics: (!data.metrics.is_empty()).then_some(data.metrics),
261            span_attributes: has_attributes.then_some(span_attributes),
262        }
263    }
264}
265
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{build_test_span, mock_span_builder};
    use crate::types::{usage_metrics_to_map, UsageMetrics};
    use serde_json::json;

    /// Logged input and output both reach the submitted payload.
    #[tokio::test]
    async fn span_logs_input_and_output() {
        let (span, collector) = build_test_span();
        let event = SpanLog {
            input: Some(json!({"input": "hello"})),
            output: Some(json!({"output": "world"})),
            ..Default::default()
        };
        span.log(event).await;
        span.flush().await.expect("flush");

        let submitted = collector.spans();
        assert_eq!(submitted.len(), 1);
        let payload = &submitted[0].payload;
        assert!(payload.input.is_some());
        assert!(payload.output.is_some());
    }

    /// Logged metadata and usage metrics reach the submitted payload.
    #[tokio::test]
    async fn span_logs_metadata_and_metrics() {
        let (span, collector) = build_test_span();

        let mut logged_metadata = Map::new();
        logged_metadata.insert("foo".to_string(), json!("bar"));
        let usage = UsageMetrics {
            prompt_tokens: Some(10),
            completion_tokens: Some(5),
            total_tokens: Some(15),
            reasoning_tokens: None,
            ..Default::default()
        };

        span.log(SpanLog {
            metadata: Some(logged_metadata),
            metrics: Some(usage_metrics_to_map(usage)),
            ..Default::default()
        })
        .await;
        span.flush().await.expect("flush");

        let captured = collector.spans().into_iter().next().unwrap();
        let metadata = captured.payload.metadata.unwrap();
        assert_eq!(metadata.get("foo").unwrap(), "bar");

        let metrics = captured.payload.metrics.unwrap();
        let expected = [
            ("prompt_tokens", 10.0),
            ("completion_tokens", 5.0),
            ("tokens", 15.0),
        ];
        for (key, value) in expected {
            assert_eq!(metrics.get(key).copied(), Some(value));
        }
    }

    /// Project name and parent info set on the builder are carried through
    /// to the submission.
    #[tokio::test]
    async fn builder_applies_project_and_parent_info() {
        let (builder, collector) = mock_span_builder();
        let parent = ParentSpanInfo::ProjectLogs {
            object_id: "proj-id".into(),
        };
        let span = builder
            .project_name("demo-project")
            .parent_info(parent)
            .build();

        span.log(SpanLog {
            input: Some(json!("data")),
            ..Default::default()
        })
        .await;
        span.flush().await.expect("flush");

        let captured = collector.spans().into_iter().next().unwrap();
        let project = captured.payload.project_name.as_deref().expect("project");
        assert_eq!(project, "demo-project");
        assert!(matches!(
            captured.parent,
            Some(ParentSpanInfo::ProjectLogs { .. })
        ));
    }
}