1use std::sync::atomic::{AtomicU64, Ordering};
6use std::sync::Mutex;
7
8use sha2::{Digest, Sha256};
9use trueno_db::experiment::{
10 ExperimentRecord, ExperimentStore, MetricRecord, RunRecord, RunStatus as TruenoRunStatus,
11};
12
13use super::{ExperimentStorage, MetricPoint, Result, RunStatus, StorageError};
14
15pub struct TruenoBackend {
20 store: Mutex<ExperimentStore>,
21 next_exp_id: AtomicU64,
22 next_run_id: AtomicU64,
23}
24
25impl std::fmt::Debug for TruenoBackend {
26 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
27 f.debug_struct("TruenoBackend")
28 .field("next_exp_id", &self.next_exp_id)
29 .field("next_run_id", &self.next_run_id)
30 .finish_non_exhaustive()
31 }
32}
33
34impl TruenoBackend {
35 pub fn new() -> Self {
40 Self {
41 store: Mutex::new(ExperimentStore::new()),
42 next_exp_id: AtomicU64::new(0),
43 next_run_id: AtomicU64::new(0),
44 }
45 }
46
47 #[allow(unused_variables)]
52 pub fn open(path: impl AsRef<std::path::Path>) -> Result<Self> {
53 Ok(Self::new())
54 }
55
56 pub fn experiment_count(&self) -> usize {
58 self.store.lock().unwrap_or_else(std::sync::PoisonError::into_inner).experiment_count()
59 }
60
61 pub fn run_count(&self) -> usize {
63 self.store.lock().unwrap_or_else(std::sync::PoisonError::into_inner).run_count()
64 }
65
66 fn compute_hash(data: &[u8]) -> String {
68 let mut hasher = Sha256::new();
69 hasher.update(data);
70 let result = hasher.finalize();
71 format!("sha256-{}", hex::encode(result.get(..16).unwrap_or(&result)))
72 }
73
74 fn to_trueno_status(status: RunStatus) -> TruenoRunStatus {
76 match status {
77 RunStatus::Pending => TruenoRunStatus::Pending,
78 RunStatus::Running => TruenoRunStatus::Running,
79 RunStatus::Success => TruenoRunStatus::Success,
80 RunStatus::Failed => TruenoRunStatus::Failed,
81 RunStatus::Cancelled => TruenoRunStatus::Cancelled,
82 }
83 }
84
85 fn from_trueno_status(status: TruenoRunStatus) -> RunStatus {
87 match status {
88 TruenoRunStatus::Pending => RunStatus::Pending,
89 TruenoRunStatus::Running => RunStatus::Running,
90 TruenoRunStatus::Success => RunStatus::Success,
91 TruenoRunStatus::Failed => RunStatus::Failed,
92 TruenoRunStatus::Cancelled => RunStatus::Cancelled,
93 }
94 }
95}
96
97impl Default for TruenoBackend {
98 fn default() -> Self {
99 Self::new()
100 }
101}
102
103impl ExperimentStorage for TruenoBackend {
104 fn create_experiment(
105 &mut self,
106 name: &str,
107 config: Option<serde_json::Value>,
108 ) -> Result<String> {
109 let id = self.next_exp_id.fetch_add(1, Ordering::SeqCst);
110 let exp_id = format!("exp-{id}");
111
112 let record = if let Some(cfg) = config {
113 ExperimentRecord::builder(&exp_id, name).config(cfg).build()
114 } else {
115 ExperimentRecord::new(&exp_id, name)
116 };
117
118 self.store.lock().unwrap_or_else(std::sync::PoisonError::into_inner).add_experiment(record);
119
120 Ok(exp_id)
121 }
122
123 fn create_run(&mut self, experiment_id: &str) -> Result<String> {
124 let store = self.store.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
125 if store.get_experiment(experiment_id).is_none() {
126 return Err(StorageError::ExperimentNotFound(experiment_id.to_string()));
127 }
128 drop(store);
129
130 let id = self.next_run_id.fetch_add(1, Ordering::SeqCst);
131 let run_id = format!("run-{id}");
132
133 let record = RunRecord::new(&run_id, experiment_id);
135 self.store.lock().unwrap_or_else(std::sync::PoisonError::into_inner).add_run(record);
136
137 Ok(run_id)
138 }
139
140 fn start_run(&mut self, run_id: &str) -> Result<()> {
141 let mut store = self.store.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
142 let run =
143 store.get_run(run_id).ok_or_else(|| StorageError::RunNotFound(run_id.to_string()))?;
144
145 if run.status() != TruenoRunStatus::Pending {
146 return Err(StorageError::InvalidState(format!(
147 "Run {run_id} is not in Pending state"
148 )));
149 }
150
151 let mut new_record = RunRecord::new(run_id, run.experiment_id());
153 new_record.start();
154
155 store.add_run(new_record);
157
158 Ok(())
159 }
160
161 fn complete_run(&mut self, run_id: &str, status: RunStatus) -> Result<()> {
162 let mut store = self.store.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
163 let run =
164 store.get_run(run_id).ok_or_else(|| StorageError::RunNotFound(run_id.to_string()))?;
165
166 if run.status() != TruenoRunStatus::Running {
167 return Err(StorageError::InvalidState(format!(
168 "Run {run_id} is not in Running state"
169 )));
170 }
171
172 let mut new_record = RunRecord::new(run_id, run.experiment_id());
174 new_record.start();
175 new_record.complete(Self::to_trueno_status(status));
176
177 store.add_run(new_record);
178
179 Ok(())
180 }
181
182 fn log_metric(&mut self, run_id: &str, key: &str, step: u64, value: f64) -> Result<()> {
183 let store = self.store.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
184 if store.get_run(run_id).is_none() {
185 return Err(StorageError::RunNotFound(run_id.to_string()));
186 }
187 drop(store);
188
189 let metric = MetricRecord::new(run_id, key, step, value);
190 self.store.lock().unwrap_or_else(std::sync::PoisonError::into_inner).add_metric(metric);
191
192 Ok(())
193 }
194
195 fn log_artifact(&mut self, run_id: &str, key: &str, data: &[u8]) -> Result<String> {
196 let store = self.store.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
197 if store.get_run(run_id).is_none() {
198 return Err(StorageError::RunNotFound(run_id.to_string()));
199 }
200 drop(store);
201
202 let hash = Self::compute_hash(data);
204 let _ = key; Ok(hash)
207 }
208
209 fn get_metrics(&self, run_id: &str, key: &str) -> Result<Vec<MetricPoint>> {
210 let store = self.store.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
211 if store.get_run(run_id).is_none() {
212 return Err(StorageError::RunNotFound(run_id.to_string()));
213 }
214
215 let metrics = store.get_metrics_for_run(run_id, key);
216
217 Ok(metrics
218 .into_iter()
219 .map(|m| MetricPoint::with_timestamp(m.step(), m.value(), m.timestamp()))
220 .collect())
221 }
222
223 fn get_run_status(&self, run_id: &str) -> Result<RunStatus> {
224 let store = self.store.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
225 let run =
226 store.get_run(run_id).ok_or_else(|| StorageError::RunNotFound(run_id.to_string()))?;
227
228 Ok(Self::from_trueno_status(run.status()))
229 }
230
231 fn set_span_id(&mut self, run_id: &str, span_id: &str) -> Result<()> {
232 let mut store = self.store.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
233 let run =
234 store.get_run(run_id).ok_or_else(|| StorageError::RunNotFound(run_id.to_string()))?;
235
236 let current_status = run.status();
237 let experiment_id = run.experiment_id().to_string();
238
239 let mut new_record =
241 RunRecord::builder(run_id, &experiment_id).renacer_span_id(span_id).build();
242
243 match current_status {
245 TruenoRunStatus::Running => {
246 new_record.start();
247 }
248 TruenoRunStatus::Success | TruenoRunStatus::Failed | TruenoRunStatus::Cancelled => {
249 new_record.start();
250 new_record.complete(current_status);
251 }
252 TruenoRunStatus::Pending => {
253 }
255 }
256
257 store.add_run(new_record);
258
259 Ok(())
260 }
261
262 fn get_span_id(&self, run_id: &str) -> Result<Option<String>> {
263 let store = self.store.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
264 let run =
265 store.get_run(run_id).ok_or_else(|| StorageError::RunNotFound(run_id.to_string()))?;
266
267 Ok(run.renacer_span_id().map(String::from))
268 }
269}
270
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh backend holding a single experiment named `test-exp`.
    fn backend_with_experiment() -> (TruenoBackend, String) {
        let mut backend = TruenoBackend::new();
        let exp_id = backend.create_experiment("test-exp", None).expect("operation should succeed");
        (backend, exp_id)
    }

    /// Fresh backend holding one experiment and one pending run under it.
    fn backend_with_run() -> (TruenoBackend, String) {
        let (mut backend, exp_id) = backend_with_experiment();
        let run_id = backend.create_run(&exp_id).expect("operation should succeed");
        (backend, run_id)
    }

    /// Reads the status of `run_id`, panicking if the lookup fails.
    fn status_of(backend: &TruenoBackend, run_id: &str) -> RunStatus {
        backend.get_run_status(run_id).expect("operation should succeed")
    }

    #[test]
    fn test_trueno_backend_new() {
        let backend = TruenoBackend::new();
        assert_eq!(backend.experiment_count(), 0);
        assert_eq!(backend.run_count(), 0);
    }

    #[test]
    fn test_trueno_backend_open() {
        let backend = TruenoBackend::open("/tmp/test.trueno").expect("operation should succeed");
        assert_eq!(backend.experiment_count(), 0);
    }

    #[test]
    fn test_trueno_create_experiment() {
        let (backend, exp_id) = backend_with_experiment();

        assert!(exp_id.starts_with("exp-"));
        assert_eq!(backend.experiment_count(), 1);
    }

    #[test]
    fn test_trueno_create_experiment_with_config() {
        let mut backend = TruenoBackend::new();
        let config = serde_json::json!({"batch_size": 32});
        let exp_id =
            backend.create_experiment("test-exp", Some(config)).expect("config should be valid");

        assert!(exp_id.starts_with("exp-"));
    }

    #[test]
    fn test_trueno_create_run() {
        let (backend, run_id) = backend_with_run();

        assert!(run_id.starts_with("run-"));
        assert_eq!(backend.run_count(), 1);
        assert_eq!(status_of(&backend, &run_id), RunStatus::Pending);
    }

    #[test]
    fn test_trueno_create_run_invalid_experiment() {
        let mut backend = TruenoBackend::new();

        assert!(backend.create_run("fake-exp").is_err());
    }

    #[test]
    fn test_trueno_start_run() {
        let (mut backend, run_id) = backend_with_run();

        backend.start_run(&run_id).expect("operation should succeed");

        assert_eq!(status_of(&backend, &run_id), RunStatus::Running);
    }

    #[test]
    fn test_trueno_complete_run() {
        let (mut backend, run_id) = backend_with_run();

        backend.start_run(&run_id).expect("operation should succeed");
        backend.complete_run(&run_id, RunStatus::Success).expect("operation should succeed");

        assert_eq!(status_of(&backend, &run_id), RunStatus::Success);
    }

    #[test]
    fn test_trueno_log_metric() {
        let (mut backend, run_id) = backend_with_run();

        for (step, value) in [(0, 0.5), (1, 0.4)] {
            backend.log_metric(&run_id, "loss", step, value).expect("operation should succeed");
        }

        let metrics = backend.get_metrics(&run_id, "loss").expect("operation should succeed");
        assert_eq!(metrics.len(), 2);
        assert_eq!(metrics[0].step, 0);
        assert_eq!(metrics[1].step, 1);
    }

    #[test]
    fn test_trueno_log_metric_invalid_run() {
        let mut backend = TruenoBackend::new();

        assert!(backend.log_metric("fake-run", "loss", 0, 0.5).is_err());
    }

    #[test]
    fn test_trueno_log_artifact() {
        let (mut backend, run_id) = backend_with_run();

        let hash = backend
            .log_artifact(&run_id, "model.safetensors", b"model data")
            .expect("operation should succeed");

        assert!(hash.starts_with("sha256-"));
    }

    #[test]
    fn test_trueno_get_run_not_found() {
        let backend = TruenoBackend::new();

        assert!(backend.get_run_status("fake-run").is_err());
    }

    #[test]
    fn test_trueno_complete_run_failed() {
        let (mut backend, run_id) = backend_with_run();

        backend.start_run(&run_id).expect("operation should succeed");
        backend.complete_run(&run_id, RunStatus::Failed).expect("operation should succeed");

        assert_eq!(status_of(&backend, &run_id), RunStatus::Failed);
    }

    #[test]
    fn test_trueno_complete_run_cancelled() {
        let (mut backend, run_id) = backend_with_run();

        backend.start_run(&run_id).expect("operation should succeed");
        backend.complete_run(&run_id, RunStatus::Cancelled).expect("operation should succeed");

        assert_eq!(status_of(&backend, &run_id), RunStatus::Cancelled);
    }

    #[test]
    fn test_trueno_set_and_get_span_id() {
        let (mut backend, run_id) = backend_with_run();

        backend.set_span_id(&run_id, "span-abc123").expect("operation should succeed");

        let span = backend.get_span_id(&run_id).expect("operation should succeed");
        assert_eq!(span, Some("span-abc123".to_string()));
    }
}