telemetry_kit/
storage.rs

1//! SQLite storage for buffering events
2
3use crate::error::Result;
4use crate::event::Event;
5use chrono::Utc;
6use rusqlite::{params, Connection};
7use std::path::PathBuf;
8
9/// SQLite storage for buffering telemetry events
10pub struct EventStorage {
11    conn: Connection,
12}
13
14// SAFETY: EventStorage is always used behind Arc<RwLock<>> which ensures
15// only one thread accesses the Connection at a time
16unsafe impl Send for EventStorage {}
17unsafe impl Sync for EventStorage {}
18
19impl EventStorage {
20    /// Create a new event storage
21    ///
22    /// Creates the storage directory and initializes the database schema.
23    pub fn new(db_path: impl Into<PathBuf>) -> Result<Self> {
24        let path = db_path.into();
25
26        // Create parent directory if it doesn't exist
27        if let Some(parent) = path.parent() {
28            std::fs::create_dir_all(parent)?;
29        }
30
31        let conn = Connection::open(&path)?;
32
33        let storage = Self { conn };
34        storage.initialize_schema()?;
35
36        Ok(storage)
37    }
38
39    /// Create an in-memory storage (for testing)
40    pub fn in_memory() -> Result<Self> {
41        let conn = Connection::open_in_memory()?;
42        let storage = Self { conn };
43        storage.initialize_schema()?;
44        Ok(storage)
45    }
46
47    /// Initialize the database schema
48    fn initialize_schema(&self) -> Result<()> {
49        self.conn.execute_batch(
50            r#"
51            CREATE TABLE IF NOT EXISTS events (
52                id INTEGER PRIMARY KEY AUTOINCREMENT,
53                event_id TEXT UNIQUE NOT NULL,
54                event_data TEXT NOT NULL,
55                created_at INTEGER NOT NULL,
56                synced_at INTEGER,
57                retry_count INTEGER DEFAULT 0
58            );
59
60            CREATE INDEX IF NOT EXISTS idx_synced_at ON events(synced_at);
61            CREATE INDEX IF NOT EXISTS idx_created_at ON events(created_at);
62            "#,
63        )?;
64
65        Ok(())
66    }
67
68    /// Insert an event into the storage
69    pub fn insert(&self, event: &Event) -> Result<()> {
70        let event_json = serde_json::to_string(event)?;
71        let created_at = Utc::now().timestamp();
72
73        self.conn.execute(
74            "INSERT INTO events (event_id, event_data, created_at) VALUES (?1, ?2, ?3)",
75            params![event.event_id.to_string(), event_json, created_at],
76        )?;
77
78        Ok(())
79    }
80
81    /// Get unsynced events (up to a limit)
82    pub fn get_unsynced(&self, limit: usize) -> Result<Vec<Event>> {
83        let mut stmt = self.conn.prepare(
84            "SELECT event_data FROM events WHERE synced_at IS NULL ORDER BY created_at ASC LIMIT ?1",
85        )?;
86
87        let events = stmt
88            .query_map(params![limit], |row| {
89                let event_json: String = row.get(0)?;
90                Ok(event_json)
91            })?
92            .collect::<std::result::Result<Vec<_>, _>>()?;
93
94        let mut parsed_events = Vec::new();
95        for event_json in events {
96            let event: Event = serde_json::from_str(&event_json)?;
97            parsed_events.push(event);
98        }
99
100        Ok(parsed_events)
101    }
102
103    /// Mark events as synced
104    pub fn mark_synced(&self, event_ids: &[uuid::Uuid]) -> Result<()> {
105        let synced_at = Utc::now().timestamp();
106
107        // Convert UUIDs to strings first so they own the data
108        let event_id_strings: Vec<String> = event_ids.iter().map(|id| id.to_string()).collect();
109
110        let placeholders = event_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
111
112        let query = format!(
113            "UPDATE events SET synced_at = ?1 WHERE event_id IN ({})",
114            placeholders
115        );
116
117        let params: Vec<&dyn rusqlite::ToSql> = std::iter::once(&synced_at as &dyn rusqlite::ToSql)
118            .chain(event_id_strings.iter().map(|s| s as &dyn rusqlite::ToSql))
119            .collect();
120
121        self.conn.execute(&query, params.as_slice())?;
122
123        Ok(())
124    }
125
126    /// Increment retry count for events
127    pub fn increment_retry(&self, event_ids: &[uuid::Uuid]) -> Result<()> {
128        // Convert UUIDs to strings first so they own the data
129        let event_id_strings: Vec<String> = event_ids.iter().map(|id| id.to_string()).collect();
130
131        let placeholders = event_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
132
133        let query = format!(
134            "UPDATE events SET retry_count = retry_count + 1 WHERE event_id IN ({})",
135            placeholders
136        );
137
138        let params: Vec<&dyn rusqlite::ToSql> = event_id_strings
139            .iter()
140            .map(|s| s as &dyn rusqlite::ToSql)
141            .collect();
142
143        self.conn.execute(&query, params.as_slice())?;
144
145        Ok(())
146    }
147
148    /// Get count of unsynced events
149    pub fn unsynced_count(&self) -> Result<usize> {
150        let count: usize = self.conn.query_row(
151            "SELECT COUNT(*) FROM events WHERE synced_at IS NULL",
152            [],
153            |row| row.get(0),
154        )?;
155
156        Ok(count)
157    }
158
159    /// Delete old synced events (older than 7 days)
160    pub fn cleanup_old_events(&self) -> Result<usize> {
161        let seven_days_ago = Utc::now().timestamp() - (7 * 24 * 60 * 60);
162
163        let deleted = self.conn.execute(
164            "DELETE FROM events WHERE synced_at IS NOT NULL AND synced_at < ?1",
165            params![seven_days_ago],
166        )?;
167
168        Ok(deleted)
169    }
170
171    /// Get total event count
172    pub fn total_count(&self) -> Result<usize> {
173        let count: usize = self
174            .conn
175            .query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0))?;
176
177        Ok(count)
178    }
179}
180
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::*;
    use uuid::Uuid;

    /// Build a fully-populated event with a fresh random `event_id`.
    fn create_test_event() -> Event {
        Event {
            schema_version: SCHEMA_VERSION.to_string(),
            event_id: Uuid::new_v4(),
            timestamp: Utc::now(),
            service: ServiceInfo {
                name: "test-service".to_string(),
                version: "1.0.0".to_string(),
                language: "rust".to_string(),
                language_version: Some("1.75.0".to_string()),
            },
            user_id: "client_test123".to_string(),
            session_id: Some("sess_test456".to_string()),
            environment: Environment {
                os: "linux".to_string(),
                os_version: None,
                arch: Some("x86_64".to_string()),
                ci: Some(false),
                shell: None,
            },
            event: EventData {
                event_type: "test_event".to_string(),
                category: Some("test".to_string()),
                data: serde_json::json!({"test": true}),
            },
            metadata: Metadata {
                sdk_version: "0.1.0".to_string(),
                transmission_timestamp: Utc::now(),
                batch_size: 1,
                retry_count: 0,
            },
        }
    }

    #[test]
    fn test_insert_and_get() {
        let storage = EventStorage::in_memory().unwrap();
        let event = create_test_event();

        storage.insert(&event).unwrap();

        let unsynced = storage.get_unsynced(10).unwrap();
        assert_eq!(unsynced.len(), 1);
        assert_eq!(unsynced[0].event_id, event.event_id);
        assert_eq!(storage.total_count().unwrap(), 1);
    }

    #[test]
    fn test_get_unsynced_respects_limit() {
        let storage = EventStorage::in_memory().unwrap();
        for _ in 0..3 {
            storage.insert(&create_test_event()).unwrap();
        }

        assert_eq!(storage.get_unsynced(2).unwrap().len(), 2);
        assert_eq!(storage.get_unsynced(10).unwrap().len(), 3);
    }

    #[test]
    fn test_duplicate_event_id_is_rejected() {
        let storage = EventStorage::in_memory().unwrap();
        let event = create_test_event();

        storage.insert(&event).unwrap();

        // `event_id` is UNIQUE, so buffering the same event twice must fail
        // and must not create a second row.
        assert!(storage.insert(&event).is_err());
        assert_eq!(storage.total_count().unwrap(), 1);
    }

    #[test]
    fn test_mark_synced() {
        let storage = EventStorage::in_memory().unwrap();
        let event = create_test_event();
        let event_id = event.event_id;

        storage.insert(&event).unwrap();
        assert_eq!(storage.unsynced_count().unwrap(), 1);

        storage.mark_synced(&[event_id]).unwrap();
        assert_eq!(storage.unsynced_count().unwrap(), 0);
        assert!(storage.get_unsynced(10).unwrap().is_empty());

        // Marking as synced keeps the row; only cleanup deletes.
        assert_eq!(storage.total_count().unwrap(), 1);
    }

    #[test]
    fn test_mark_synced_only_touches_given_ids() {
        let storage = EventStorage::in_memory().unwrap();
        let first = create_test_event();
        let second = create_test_event();

        storage.insert(&first).unwrap();
        storage.insert(&second).unwrap();

        storage.mark_synced(&[first.event_id]).unwrap();

        let unsynced = storage.get_unsynced(10).unwrap();
        assert_eq!(unsynced.len(), 1);
        assert_eq!(unsynced[0].event_id, second.event_id);
    }

    #[test]
    fn test_increment_retry() {
        let storage = EventStorage::in_memory().unwrap();
        let event = create_test_event();
        let event_id = event.event_id;

        storage.insert(&event).unwrap();
        storage.increment_retry(&[event_id]).unwrap();

        // The counter itself is not exposed through the public API, but a
        // retry must neither mark the event as synced nor remove it.
        assert_eq!(storage.unsynced_count().unwrap(), 1);
        assert_eq!(storage.total_count().unwrap(), 1);
        assert_eq!(storage.get_unsynced(10).unwrap()[0].event_id, event_id);
    }

    #[test]
    fn test_empty_id_slices_are_noops() {
        let storage = EventStorage::in_memory().unwrap();
        storage.insert(&create_test_event()).unwrap();

        storage.mark_synced(&[]).unwrap();
        storage.increment_retry(&[]).unwrap();

        assert_eq!(storage.unsynced_count().unwrap(), 1);
        assert_eq!(storage.total_count().unwrap(), 1);
    }

    #[test]
    fn test_cleanup_keeps_recent_and_unsynced_events() {
        let storage = EventStorage::in_memory().unwrap();
        let synced = create_test_event();
        let pending = create_test_event();

        storage.insert(&synced).unwrap();
        storage.insert(&pending).unwrap();
        storage.mark_synced(&[synced.event_id]).unwrap();

        // The synced event was synced just now (well inside the 7-day
        // window) and the other one is still unsynced: nothing is deleted.
        assert_eq!(storage.cleanup_old_events().unwrap(), 0);
        assert_eq!(storage.total_count().unwrap(), 2);
    }
}