1use std::collections::HashMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use serde_json::{Map, Value};
6use tokio::sync::Mutex;
7use uuid::Uuid;
8
9use crate::error::Result;
10use crate::types::{ParentSpanInfo, SpanAttributes, SpanPayload, SpanType};
11
12#[derive(Clone, Default)]
15pub struct SpanLog {
16 pub name: Option<String>,
17 pub input: Option<Value>,
18 pub output: Option<Value>,
19 pub metadata: Option<Map<String, Value>>,
20 pub metrics: Option<HashMap<String, f64>>,
21}
22
/// Sink that receives span payloads when a span is flushed.
///
/// `SpanHandle::flush` calls [`SpanSubmitter::submit`] once per flush.
/// Implementations must be `Send + Sync` because they are shared behind an
/// `Arc` by builders, handles and their clones.
#[async_trait]
pub(crate) trait SpanSubmitter: Send + Sync {
    /// Submits one span payload.
    ///
    /// * `token` - credential forwarded unchanged from the builder
    ///   (presumably an API token; confirm against the implementor).
    /// * `payload` - snapshot of the span's data at flush time.
    /// * `parent_info` - optional parent the span was built with.
    ///
    /// # Errors
    ///
    /// Returns the implementation's error; `SpanHandle::flush` passes it
    /// straight through to its caller.
    async fn submit(
        &self,
        token: impl Into<String> + Send,
        payload: SpanPayload,
        parent_info: Option<ParentSpanInfo>,
    ) -> Result<()>;
}
32
/// Builder that collects a span's static configuration before the span is
/// started with `build`.
///
/// All setters take and return the builder by value so calls can be chained.
// `SpanSubmitter` is crate-private but appears as a bound on this public
// type; the allow silences the resulting `private_bounds` lint.
#[allow(private_bounds)]
pub struct SpanBuilder<S: SpanSubmitter> {
    /// Shared sink that the built span will flush to.
    submitter: Arc<S>,
    /// Token handed to the submitter on every flush.
    token: String,
    /// Organization identifier copied into the span payload.
    org_id: String,
    /// Optional organization name copied into the span payload.
    org_name: Option<String>,
    /// Optional project name copied into the span payload.
    project_name: Option<String>,
    /// Optional parent forwarded to the submitter alongside the payload.
    parent_info: Option<ParentSpanInfo>,
    /// Span type; starts as `SpanType::default()`.
    span_type: SpanType,
    /// Optional purpose recorded in the span attributes.
    purpose: Option<String>,
}
44
45impl<S: SpanSubmitter> Clone for SpanBuilder<S> {
46 fn clone(&self) -> Self {
47 Self {
48 submitter: Arc::clone(&self.submitter),
49 token: self.token.clone(),
50 org_id: self.org_id.clone(),
51 org_name: self.org_name.clone(),
52 project_name: self.project_name.clone(),
53 parent_info: self.parent_info.clone(),
54 span_type: self.span_type,
55 purpose: self.purpose.clone(),
56 }
57 }
58}
59
60#[allow(private_bounds)]
61impl<S: SpanSubmitter> SpanBuilder<S> {
62 pub(crate) fn new(
63 submitter: Arc<S>,
64 token: impl Into<String>,
65 org_id: impl Into<String>,
66 ) -> Self {
67 Self {
68 submitter,
69 token: token.into(),
70 org_id: org_id.into(),
71 org_name: None,
72 project_name: None,
73 parent_info: None,
74 span_type: SpanType::default(),
75 purpose: None,
76 }
77 }
78
79 pub fn org_name(mut self, org_name: impl Into<String>) -> Self {
80 self.org_name = Some(org_name.into());
81 self
82 }
83
84 pub fn project_name(mut self, project_name: impl Into<String>) -> Self {
85 self.project_name = Some(project_name.into());
86 self
87 }
88
89 pub fn parent_info(mut self, parent: ParentSpanInfo) -> Self {
90 self.parent_info = Some(parent);
91 self
92 }
93
94 pub fn span_type(mut self, span_type: SpanType) -> Self {
95 self.span_type = span_type;
96 self
97 }
98
99 pub fn purpose(mut self, purpose: impl Into<String>) -> Self {
100 self.purpose = Some(purpose.into());
101 self
102 }
103
104 pub fn build(self) -> SpanHandle<S> {
105 use std::time::{SystemTime, UNIX_EPOCH};
106
107 let row_id = Uuid::new_v4().to_string();
109 let span_id = Uuid::new_v4().to_string();
110 let start_time = SystemTime::now()
111 .duration_since(UNIX_EPOCH)
112 .map(|d| d.as_secs_f64())
113 .ok();
114
115 SpanHandle {
116 submitter: Arc::clone(&self.submitter),
117 token: self.token,
118 parent_info: self.parent_info,
119 inner: Arc::new(Mutex::new(SpanData {
120 row_id,
121 span_id,
122 has_flushed: false,
123 org_id: self.org_id,
124 org_name: self.org_name,
125 project_name: self.project_name,
126 start_time,
127 span_type: self.span_type,
128 purpose: self.purpose,
129 ..Default::default()
130 })),
131 }
132 }
133}
134
/// Handle to a started span.
///
/// Data is accumulated with `log` and sent to the submitter with `flush`.
/// The mutable span state lives behind an `Arc<Mutex<_>>`, so clones of a
/// handle operate on the same span.
// `SpanSubmitter` is crate-private but appears as a bound on this public
// type; the allow silences the resulting `private_bounds` lint.
#[allow(private_bounds)]
pub struct SpanHandle<S: SpanSubmitter> {
    /// Sink that receives the payload on every flush.
    submitter: Arc<S>,
    /// Token passed to the submitter on every flush.
    token: String,
    /// Optional parent passed to the submitter on every flush.
    parent_info: Option<ParentSpanInfo>,
    /// Span state shared between all clones of this handle.
    inner: Arc<Mutex<SpanData>>,
}
142
143impl<S: SpanSubmitter> Clone for SpanHandle<S> {
144 fn clone(&self) -> Self {
145 Self {
146 submitter: Arc::clone(&self.submitter),
147 token: self.token.clone(),
148 parent_info: self.parent_info.clone(),
149 inner: Arc::clone(&self.inner),
150 }
151 }
152}
153
154#[allow(private_bounds)]
155impl<S: SpanSubmitter> SpanHandle<S> {
156 pub async fn log(&self, event: SpanLog) {
159 let mut inner = self.inner.lock().await;
160
161 if let Some(name) = event.name {
162 inner.name = Some(name);
163 }
164 if let Some(input) = event.input {
165 inner.input = Some(input);
166 }
167 if let Some(output) = event.output {
168 inner.output = Some(output);
169 }
170 if let Some(metadata) = event.metadata {
171 for (key, value) in metadata {
172 inner.metadata.insert(key, value);
173 }
174 }
175 if let Some(metrics) = event.metrics {
176 for (key, value) in metrics {
177 inner.metrics.insert(key, value);
178 }
179 }
180 }
181
182 pub async fn flush(&self) -> Result<()> {
186 use std::time::{SystemTime, UNIX_EPOCH};
187
188 let end_time = SystemTime::now()
190 .duration_since(UNIX_EPOCH)
191 .map(|d| d.as_secs_f64())
192 .ok();
193
194 let payload: SpanPayload = {
195 let mut inner = self.inner.lock().await;
196 if let Some(start) = inner.start_time {
197 inner.metrics.insert("start".to_string(), start);
198 }
199 if let Some(end) = end_time {
200 inner.metrics.insert("end".to_string(), end);
201 }
202
203 let payload: SpanPayload = inner.clone().into();
205
206 inner.has_flushed = true;
208
209 payload
210 };
211
212 self.submitter
213 .submit(self.token.clone(), payload, self.parent_info.clone())
214 .await
215 }
216}
217
/// Mutable state of a span, shared by all clones of a `SpanHandle`.
///
/// Converted into a `SpanPayload` on every flush.
#[derive(Clone, Default)]
struct SpanData {
    /// Random UUID generated when the span is built.
    row_id: String,
    /// Random UUID generated when the span is built.
    span_id: String,
    /// `false` until the first flush; becomes the payload's `is_merge` flag.
    has_flushed: bool,
    /// Organization identifier taken from the builder.
    org_id: String,
    /// Optional organization name taken from the builder.
    org_name: Option<String>,
    /// Optional project name taken from the builder.
    project_name: Option<String>,
    /// Span name, set through `SpanHandle::log`.
    name: Option<String>,
    /// Span type taken from the builder.
    span_type: SpanType,
    /// Optional purpose taken from the builder.
    purpose: Option<String>,
    /// Most recently logged input value.
    input: Option<Value>,
    /// Most recently logged output value.
    output: Option<Value>,
    /// Metadata accumulated across all `log` calls.
    metadata: Map<String, Value>,
    /// Metrics accumulated across all `log` calls; `flush` also writes the
    /// `"start"` and `"end"` timestamps here.
    metrics: HashMap<String, f64>,
    /// Build time in seconds since the Unix epoch, or `None` when the system
    /// clock could not be expressed relative to the epoch.
    start_time: Option<f64>,
}
235
236impl From<SpanData> for SpanPayload {
237 fn from(data: SpanData) -> Self {
238 let span_attributes = SpanAttributes {
239 name: data.name,
240 span_type: Some(data.span_type),
241 purpose: data.purpose,
242 extra: HashMap::new(),
243 };
244
245 let has_attributes = span_attributes.name.is_some()
247 || span_attributes.span_type.is_some()
248 || span_attributes.purpose.is_some();
249
250 Self {
251 row_id: data.row_id,
252 span_id: data.span_id,
253 is_merge: data.has_flushed, org_id: data.org_id,
255 org_name: data.org_name,
256 project_name: data.project_name,
257 input: data.input,
258 output: data.output,
259 metadata: (!data.metadata.is_empty()).then_some(data.metadata),
260 metrics: (!data.metrics.is_empty()).then_some(data.metrics),
261 span_attributes: has_attributes.then_some(span_attributes),
262 }
263 }
264}
265
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{build_test_span, mock_span_builder};
    use crate::types::{usage_metrics_to_map, UsageMetrics};
    use serde_json::json;

    /// Logged input and output both show up in the single submitted payload.
    #[tokio::test]
    async fn span_logs_input_and_output() {
        let (span, collector) = build_test_span();

        let event = SpanLog {
            input: Some(json!({"input": "hello"})),
            output: Some(json!({"output": "world"})),
            ..Default::default()
        };
        span.log(event).await;
        span.flush().await.expect("flush");

        let recorded = collector.spans();
        assert_eq!(recorded.len(), 1);

        let payload = &recorded[0].payload;
        assert!(payload.input.is_some());
        assert!(payload.output.is_some());
    }

    /// Logged metadata and usage metrics are carried into the payload.
    #[tokio::test]
    async fn span_logs_metadata_and_metrics() {
        let (span, collector) = build_test_span();

        let usage = UsageMetrics {
            prompt_tokens: Some(10),
            completion_tokens: Some(5),
            total_tokens: Some(15),
            reasoning_tokens: None,
            ..Default::default()
        };
        let event = SpanLog {
            metadata: Some([("foo".to_string(), json!("bar"))].into_iter().collect()),
            metrics: Some(usage_metrics_to_map(usage)),
            ..Default::default()
        };
        span.log(event).await;
        span.flush().await.expect("flush");

        let captured = collector.spans().into_iter().next().unwrap();

        let metadata = captured.payload.metadata.unwrap();
        assert_eq!(metadata.get("foo").unwrap(), "bar");

        let metrics = captured.payload.metrics.unwrap();
        let expected = [
            ("prompt_tokens", 10.0),
            ("completion_tokens", 5.0),
            ("tokens", 15.0),
        ];
        for (key, value) in expected {
            assert_eq!(metrics.get(key).copied(), Some(value));
        }
    }

    /// Builder settings reach the payload and the parent info reaches the
    /// submitter.
    #[tokio::test]
    async fn builder_applies_project_and_parent_info() {
        let (builder, collector) = mock_span_builder();

        let parent = ParentSpanInfo::ProjectLogs {
            object_id: "proj-id".into(),
        };
        let span = builder
            .project_name("demo-project")
            .parent_info(parent)
            .build();

        let event = SpanLog {
            input: Some(json!("data")),
            ..Default::default()
        };
        span.log(event).await;
        span.flush().await.expect("flush");

        let captured = collector.spans().into_iter().next().unwrap();
        assert_eq!(
            captured.payload.project_name.as_deref().expect("project"),
            "demo-project"
        );
        assert!(matches!(
            captured.parent,
            Some(ParentSpanInfo::ProjectLogs { .. })
        ));
    }
}