1use crate::error::Result;
4use crate::event::Event;
5use chrono::Utc;
6use rusqlite::{params, Connection};
7use std::path::PathBuf;
8
9pub struct EventStorage {
11 conn: Connection,
12}
13
14unsafe impl Send for EventStorage {}
17unsafe impl Sync for EventStorage {}
18
19impl EventStorage {
20 pub fn new(db_path: impl Into<PathBuf>) -> Result<Self> {
24 let path = db_path.into();
25
26 if let Some(parent) = path.parent() {
28 std::fs::create_dir_all(parent)?;
29 }
30
31 let conn = Connection::open(&path)?;
32
33 let storage = Self { conn };
34 storage.initialize_schema()?;
35
36 Ok(storage)
37 }
38
39 pub fn in_memory() -> Result<Self> {
41 let conn = Connection::open_in_memory()?;
42 let storage = Self { conn };
43 storage.initialize_schema()?;
44 Ok(storage)
45 }
46
47 fn initialize_schema(&self) -> Result<()> {
49 self.conn.execute_batch(
50 r#"
51 CREATE TABLE IF NOT EXISTS events (
52 id INTEGER PRIMARY KEY AUTOINCREMENT,
53 event_id TEXT UNIQUE NOT NULL,
54 event_data TEXT NOT NULL,
55 created_at INTEGER NOT NULL,
56 synced_at INTEGER,
57 retry_count INTEGER DEFAULT 0
58 );
59
60 CREATE INDEX IF NOT EXISTS idx_synced_at ON events(synced_at);
61 CREATE INDEX IF NOT EXISTS idx_created_at ON events(created_at);
62 "#,
63 )?;
64
65 Ok(())
66 }
67
68 pub fn insert(&self, event: &Event) -> Result<()> {
70 let event_json = serde_json::to_string(event)?;
71 let created_at = Utc::now().timestamp();
72
73 self.conn.execute(
74 "INSERT INTO events (event_id, event_data, created_at) VALUES (?1, ?2, ?3)",
75 params![event.event_id.to_string(), event_json, created_at],
76 )?;
77
78 Ok(())
79 }
80
81 pub fn get_unsynced(&self, limit: usize) -> Result<Vec<Event>> {
83 let mut stmt = self.conn.prepare(
84 "SELECT event_data FROM events WHERE synced_at IS NULL ORDER BY created_at ASC LIMIT ?1",
85 )?;
86
87 let events = stmt
88 .query_map(params![limit], |row| {
89 let event_json: String = row.get(0)?;
90 Ok(event_json)
91 })?
92 .collect::<std::result::Result<Vec<_>, _>>()?;
93
94 let mut parsed_events = Vec::new();
95 for event_json in events {
96 let event: Event = serde_json::from_str(&event_json)?;
97 parsed_events.push(event);
98 }
99
100 Ok(parsed_events)
101 }
102
103 pub fn mark_synced(&self, event_ids: &[uuid::Uuid]) -> Result<()> {
105 let synced_at = Utc::now().timestamp();
106
107 let event_id_strings: Vec<String> = event_ids.iter().map(|id| id.to_string()).collect();
109
110 let placeholders = event_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
111
112 let query = format!(
113 "UPDATE events SET synced_at = ?1 WHERE event_id IN ({})",
114 placeholders
115 );
116
117 let params: Vec<&dyn rusqlite::ToSql> = std::iter::once(&synced_at as &dyn rusqlite::ToSql)
118 .chain(event_id_strings.iter().map(|s| s as &dyn rusqlite::ToSql))
119 .collect();
120
121 self.conn.execute(&query, params.as_slice())?;
122
123 Ok(())
124 }
125
126 pub fn increment_retry(&self, event_ids: &[uuid::Uuid]) -> Result<()> {
128 let event_id_strings: Vec<String> = event_ids.iter().map(|id| id.to_string()).collect();
130
131 let placeholders = event_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
132
133 let query = format!(
134 "UPDATE events SET retry_count = retry_count + 1 WHERE event_id IN ({})",
135 placeholders
136 );
137
138 let params: Vec<&dyn rusqlite::ToSql> = event_id_strings
139 .iter()
140 .map(|s| s as &dyn rusqlite::ToSql)
141 .collect();
142
143 self.conn.execute(&query, params.as_slice())?;
144
145 Ok(())
146 }
147
148 pub fn unsynced_count(&self) -> Result<usize> {
150 let count: usize = self.conn.query_row(
151 "SELECT COUNT(*) FROM events WHERE synced_at IS NULL",
152 [],
153 |row| row.get(0),
154 )?;
155
156 Ok(count)
157 }
158
159 pub fn cleanup_old_events(&self) -> Result<usize> {
161 let seven_days_ago = Utc::now().timestamp() - (7 * 24 * 60 * 60);
162
163 let deleted = self.conn.execute(
164 "DELETE FROM events WHERE synced_at IS NOT NULL AND synced_at < ?1",
165 params![seven_days_ago],
166 )?;
167
168 Ok(deleted)
169 }
170
171 pub fn total_count(&self) -> Result<usize> {
173 let count: usize = self
174 .conn
175 .query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0))?;
176
177 Ok(count)
178 }
179}
180
181#[cfg(test)]
182mod tests {
183 use super::*;
184 use crate::event::*;
185 use uuid::Uuid;
186
187 fn create_test_event() -> Event {
188 Event {
189 schema_version: SCHEMA_VERSION.to_string(),
190 event_id: Uuid::new_v4(),
191 timestamp: Utc::now(),
192 service: ServiceInfo {
193 name: "test-service".to_string(),
194 version: "1.0.0".to_string(),
195 language: "rust".to_string(),
196 language_version: Some("1.75.0".to_string()),
197 },
198 user_id: "client_test123".to_string(),
199 session_id: Some("sess_test456".to_string()),
200 environment: Environment {
201 os: "linux".to_string(),
202 os_version: None,
203 arch: Some("x86_64".to_string()),
204 ci: Some(false),
205 shell: None,
206 },
207 event: EventData {
208 event_type: "test_event".to_string(),
209 category: Some("test".to_string()),
210 data: serde_json::json!({"test": true}),
211 },
212 metadata: Metadata {
213 sdk_version: "0.1.0".to_string(),
214 transmission_timestamp: Utc::now(),
215 batch_size: 1,
216 retry_count: 0,
217 },
218 }
219 }
220
221 #[test]
222 fn test_insert_and_get() {
223 let storage = EventStorage::in_memory().unwrap();
224 let event = create_test_event();
225
226 storage.insert(&event).unwrap();
227
228 let unsynced = storage.get_unsynced(10).unwrap();
229 assert_eq!(unsynced.len(), 1);
230 assert_eq!(unsynced[0].event_id, event.event_id);
231 }
232
233 #[test]
234 fn test_mark_synced() {
235 let storage = EventStorage::in_memory().unwrap();
236 let event = create_test_event();
237 let event_id = event.event_id;
238
239 storage.insert(&event).unwrap();
240 assert_eq!(storage.unsynced_count().unwrap(), 1);
241
242 storage.mark_synced(&[event_id]).unwrap();
243 assert_eq!(storage.unsynced_count().unwrap(), 0);
244 }
245
246 #[test]
247 fn test_increment_retry() {
248 let storage = EventStorage::in_memory().unwrap();
249 let event = create_test_event();
250 let event_id = event.event_id;
251
252 storage.insert(&event).unwrap();
253 storage.increment_retry(&[event_id]).unwrap();
254
255 }
258}