Skip to main content

briefcase_core/storage/
mod.rs

1use crate::models::{DecisionSnapshot, Snapshot};
2#[cfg(feature = "async")]
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use std::collections::HashMap;
6use thiserror::Error;
7
8#[cfg(feature = "lakefs-storage")]
9pub mod lakefs;
10#[cfg(feature = "sqlite-storage")]
11pub mod sqlite;
12pub mod sync;
13
14#[cfg(feature = "lakefs-storage")]
15pub use lakefs::{LakeFSBackend, LakeFSConfig};
16#[cfg(feature = "sqlite-storage")]
17pub use sqlite::SqliteBackend;
18#[cfg(feature = "sqlite-storage")]
19pub use sync::SyncSqliteBackend;
20pub use sync::{MemoryStorageBackend, SyncStorageBackend};
21
/// Asynchronous persistence interface for snapshots and decision snapshots.
///
/// Only compiled when the `async` feature is enabled. Implementors must be
/// `Send + Sync`. Every operation reports failure through [`StorageError`].
#[cfg(feature = "async")]
#[async_trait]
pub trait StorageBackend: Send + Sync {
    /// Persists `snapshot` and yields the ID it was stored under.
    async fn save(&self, snapshot: &Snapshot) -> Result<String, StorageError>;

    /// Persists one decision snapshot.
    ///
    /// NOTE(review): the returned string is presumably the decision's ID,
    /// mirroring [`StorageBackend::save`] — confirm against implementations.
    async fn save_decision(&self, decision: &DecisionSnapshot) -> Result<String, StorageError>;

    /// Fetches the snapshot identified by `snapshot_id`.
    async fn load(&self, snapshot_id: &str) -> Result<Snapshot, StorageError>;

    /// Fetches the decision snapshot identified by `decision_id`.
    async fn load_decision(&self, decision_id: &str) -> Result<DecisionSnapshot, StorageError>;

    /// Returns the snapshots selected by the filters in `query`.
    async fn query(&self, query: SnapshotQuery) -> Result<Vec<Snapshot>, StorageError>;

    /// Removes the snapshot identified by `snapshot_id`.
    ///
    /// NOTE(review): the `bool` presumably reports whether something was
    /// actually removed — confirm against implementations.
    async fn delete(&self, snapshot_id: &str) -> Result<bool, StorageError>;

    /// Flushes pending writes; relevant for backends that batch.
    async fn flush(&self) -> Result<FlushResult, StorageError>;

    /// Probes backend health / connectivity.
    async fn health_check(&self) -> Result<bool, StorageError>;
}
49
50#[derive(Debug, Clone, Default)]
51pub struct SnapshotQuery {
52    pub function_name: Option<String>,
53    pub module_name: Option<String>,
54    pub model_name: Option<String>,
55    pub start_time: Option<DateTime<Utc>>,
56    pub end_time: Option<DateTime<Utc>>,
57    pub tags: Option<HashMap<String, String>>,
58    pub limit: Option<usize>,
59    pub offset: Option<usize>,
60}
61
62impl SnapshotQuery {
63    pub fn new() -> Self {
64        Self::default()
65    }
66
67    pub fn with_function_name(mut self, function_name: impl Into<String>) -> Self {
68        self.function_name = Some(function_name.into());
69        self
70    }
71
72    pub fn with_module_name(mut self, module_name: impl Into<String>) -> Self {
73        self.module_name = Some(module_name.into());
74        self
75    }
76
77    pub fn with_model_name(mut self, model_name: impl Into<String>) -> Self {
78        self.model_name = Some(model_name.into());
79        self
80    }
81
82    pub fn with_time_range(mut self, start: DateTime<Utc>, end: DateTime<Utc>) -> Self {
83        self.start_time = Some(start);
84        self.end_time = Some(end);
85        self
86    }
87
88    pub fn with_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
89        if self.tags.is_none() {
90            self.tags = Some(HashMap::new());
91        }
92        self.tags.as_mut().unwrap().insert(key.into(), value.into());
93        self
94    }
95
96    pub fn with_limit(mut self, limit: usize) -> Self {
97        self.limit = Some(limit);
98        self
99    }
100
101    pub fn with_offset(mut self, offset: usize) -> Self {
102        self.offset = Some(offset);
103        self
104    }
105}
106
/// Summary of a flush operation.
#[derive(Clone, Debug)]
pub struct FlushResult {
    /// Number of snapshots written.
    pub snapshots_written: usize,
    /// Number of bytes written.
    pub bytes_written: usize,
    /// LakeFS commit ID, when there is one.
    pub checkpoint_id: Option<String>,
}
113
/// Errors reported by storage operations.
///
/// Payloads are plain `String`s holding human-readable detail, which keeps
/// the enum `Clone` and `PartialEq`. `Display` output comes from the
/// `#[error(...)]` attribute on each variant.
#[derive(Debug, Clone, PartialEq, Error)]
pub enum StorageError {
    /// A requested item was not found; the payload carries the detail text.
    #[error("Not found: {0}")]
    NotFound(String),
    /// A connection-level failure.
    #[error("Connection error: {0}")]
    ConnectionError(String),
    /// Serialization or deserialization failed.
    #[error("Serialization error: {0}")]
    SerializationError(String),
    /// The operation was refused for lack of permission.
    #[error("Permission denied: {0}")]
    PermissionDenied(String),
    /// A quota was exceeded; carries no further detail.
    #[error("Quota exceeded")]
    QuotaExceeded,
    /// The supplied query was rejected as invalid.
    #[error("Invalid query: {0}")]
    InvalidQuery(String),
    /// An I/O failure.
    #[error("IO error: {0}")]
    IoError(String),
}
131
132impl From<serde_json::Error> for StorageError {
133    fn from(err: serde_json::Error) -> Self {
134        StorageError::SerializationError(err.to_string())
135    }
136}
137
138impl From<std::io::Error> for StorageError {
139    fn from(err: std::io::Error) -> Self {
140        StorageError::IoError(err.to_string())
141    }
142}