Skip to main content

synth_ai_core/tracing/
storage.rs

1//! Trace storage trait definition.
2//!
3//! This module defines the abstract `TraceStorage` trait that storage
4//! backends must implement.
5
6use async_trait::async_trait;
7use chrono::{DateTime, Utc};
8use serde_json::Value;
9
10use super::error::TracingError;
11use super::models::{
12    EventReward, MarkovBlanketMessage, OutcomeReward, SessionTimeStep, SessionTrace, TracingEvent,
13};
14
15/// Abstract storage backend for traces.
16///
17/// Implementations can store traces in SQLite, Turso, or other backends.
18#[async_trait]
19pub trait TraceStorage: Send + Sync {
20    // ========================================================================
21    // INITIALIZATION
22    // ========================================================================
23
24    /// Initialize the storage backend (create tables, etc.).
25    async fn initialize(&self) -> Result<(), TracingError>;
26
27    /// Close the storage connection.
28    async fn close(&self) -> Result<(), TracingError>;
29
30    // ========================================================================
31    // SESSION OPERATIONS
32    // ========================================================================
33
34    /// Ensure a session exists (create if not present).
35    ///
36    /// This is idempotent - calling multiple times with the same session_id
37    /// will not create duplicates.
38    async fn ensure_session(
39        &self,
40        session_id: &str,
41        created_at: DateTime<Utc>,
42        metadata: &Value,
43    ) -> Result<(), TracingError>;
44
45    /// Get a session trace by ID.
46    async fn get_session(&self, session_id: &str) -> Result<Option<SessionTrace>, TracingError>;
47
48    /// Delete a session and all related data.
49    async fn delete_session(&self, session_id: &str) -> Result<bool, TracingError>;
50
51    // ========================================================================
52    // TIMESTEP OPERATIONS
53    // ========================================================================
54
55    /// Ensure a timestep exists and return its database ID.
56    ///
57    /// This is idempotent - calling multiple times with the same session_id
58    /// and step_id will return the same database ID.
59    async fn ensure_timestep(
60        &self,
61        session_id: &str,
62        step: &SessionTimeStep,
63    ) -> Result<i64, TracingError>;
64
65    /// Update a timestep (e.g., mark as completed).
66    async fn update_timestep(
67        &self,
68        session_id: &str,
69        step_id: &str,
70        completed_at: Option<DateTime<Utc>>,
71    ) -> Result<(), TracingError>;
72
73    // ========================================================================
74    // EVENT OPERATIONS
75    // ========================================================================
76
77    /// Insert an event and return its database ID.
78    async fn insert_event(
79        &self,
80        session_id: &str,
81        timestep_db_id: Option<i64>,
82        event: &TracingEvent,
83    ) -> Result<i64, TracingError>;
84
85    // ========================================================================
86    // MESSAGE OPERATIONS
87    // ========================================================================
88
89    /// Insert a message and return its database ID.
90    async fn insert_message(
91        &self,
92        session_id: &str,
93        timestep_db_id: Option<i64>,
94        msg: &MarkovBlanketMessage,
95    ) -> Result<i64, TracingError>;
96
97    // ========================================================================
98    // REWARD OPERATIONS
99    // ========================================================================
100
101    /// Insert an outcome (session-level) reward.
102    async fn insert_outcome_reward(
103        &self,
104        session_id: &str,
105        reward: &OutcomeReward,
106    ) -> Result<i64, TracingError>;
107
108    /// Insert an event-level reward.
109    async fn insert_event_reward(
110        &self,
111        session_id: &str,
112        event_id: i64,
113        message_id: Option<i64>,
114        turn_number: Option<i32>,
115        reward: &EventReward,
116    ) -> Result<i64, TracingError>;
117
118    // ========================================================================
119    // QUERY OPERATIONS
120    // ========================================================================
121
122    /// Execute a raw SQL query and return results as JSON values.
123    async fn query(&self, sql: &str, params: QueryParams) -> Result<Vec<Value>, TracingError>;
124
125    /// Update session counts (num_timesteps, num_events, num_messages).
126    async fn update_session_counts(&self, session_id: &str) -> Result<(), TracingError>;
127}
128
129/// Query parameters for trace storage queries.
130#[derive(Debug, Clone, Default)]
131pub enum QueryParams {
132    /// No parameters provided.
133    #[default]
134    None,
135    /// Positional parameters (e.g., ?1, ?2).
136    Positional(Vec<Value>),
137    /// Named parameters (e.g., :session_id).
138    Named(Vec<(String, Value)>),
139}
140
141impl QueryParams {
142    /// Returns true if there are no parameters.
143    pub fn is_empty(&self) -> bool {
144        matches!(self, QueryParams::None)
145    }
146}
147
148impl From<Vec<Value>> for QueryParams {
149    fn from(values: Vec<Value>) -> Self {
150        if values.is_empty() {
151            QueryParams::None
152        } else {
153            QueryParams::Positional(values)
154        }
155    }
156}
157
158impl From<Vec<(String, Value)>> for QueryParams {
159    fn from(values: Vec<(String, Value)>) -> Self {
160        if values.is_empty() {
161            QueryParams::None
162        } else {
163            QueryParams::Named(values)
164        }
165    }
166}
167
168/// Storage configuration.
169#[derive(Debug, Clone)]
170pub struct StorageConfig {
171    /// Database URL (file path, :memory:, or libsql:// URL)
172    pub db_url: String,
173    /// Auth token for remote databases
174    pub auth_token: Option<String>,
175    /// Whether to auto-initialize schema
176    pub auto_init: bool,
177}
178
179impl Default for StorageConfig {
180    fn default() -> Self {
181        Self {
182            db_url: ":memory:".to_string(),
183            auth_token: None,
184            auto_init: true,
185        }
186    }
187}
188
189impl StorageConfig {
190    /// Create config for in-memory database.
191    pub fn memory() -> Self {
192        Self::default()
193    }
194
195    /// Create config for a file-based database.
196    pub fn file(path: impl Into<String>) -> Self {
197        Self {
198            db_url: path.into(),
199            auth_token: None,
200            auto_init: true,
201        }
202    }
203
204    /// Create config for a remote Turso database.
205    pub fn turso(url: impl Into<String>, token: impl Into<String>) -> Self {
206        Self {
207            db_url: url.into(),
208            auth_token: Some(token.into()),
209            auto_init: true,
210        }
211    }
212}
213
214#[cfg(test)]
215mod tests {
216    use super::*;
217
218    #[test]
219    fn test_storage_config_default() {
220        let config = StorageConfig::default();
221        assert_eq!(config.db_url, ":memory:");
222        assert!(config.auth_token.is_none());
223        assert!(config.auto_init);
224    }
225
226    #[test]
227    fn test_storage_config_file() {
228        let config = StorageConfig::file("/tmp/test.db");
229        assert_eq!(config.db_url, "/tmp/test.db");
230    }
231
232    #[test]
233    fn test_storage_config_turso() {
234        let config = StorageConfig::turso("libsql://test.turso.io", "token123");
235        assert_eq!(config.db_url, "libsql://test.turso.io");
236        assert_eq!(config.auth_token, Some("token123".to_string()));
237    }
238}