1use rusqlite::{Connection, params};
4use serde::{Deserialize, Serialize};
5use std::sync::Mutex;
6
7#[derive(Debug, Clone, Serialize, Deserialize)]
8pub struct Session {
9 pub id: String,
10 pub name: Option<String>,
11 pub status: String,
12 pub messages_json: String,
13 pub created_at: i64,
14 pub updated_at: i64,
15}
16
17#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct SessionSearchHit {
19 pub session_id: String,
20 pub turn_index: usize,
21 pub role: String,
22 pub text: String,
23 pub updated_at: i64,
24}
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct SessionSlice {
28 pub session_id: String,
29 pub start: usize,
30 pub messages: Vec<crate::provider::Msg>,
31}
32
33pub struct SessionStore {
42 conn: Mutex<Connection>,
43}
44
45impl SessionStore {
46 pub fn open(db_path: &std::path::Path) -> anyhow::Result<Self> {
47 let conn = Connection::open(db_path)?;
48 conn.execute_batch(
49 "CREATE TABLE IF NOT EXISTS sessions (
50 id TEXT PRIMARY KEY,
51 created_at INTEGER NOT NULL DEFAULT (unixepoch()),
52 updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
53 name TEXT,
54 status TEXT DEFAULT 'active',
55 messages_json TEXT NOT NULL DEFAULT '[]'
56 );
57 CREATE TABLE IF NOT EXISTS session_messages (
58 session_id TEXT NOT NULL,
59 turn_index INTEGER NOT NULL,
60 role TEXT NOT NULL,
61 text TEXT NOT NULL,
62 updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
63 PRIMARY KEY (session_id, turn_index)
64 );
65 CREATE VIRTUAL TABLE IF NOT EXISTS session_messages_fts USING fts5(
66 role, text, content='session_messages', content_rowid='rowid'
67 );
68 CREATE TRIGGER IF NOT EXISTS session_messages_ai AFTER INSERT ON session_messages BEGIN
69 INSERT INTO session_messages_fts(rowid, role, text) VALUES (new.rowid, new.role, new.text);
70 END;
71 CREATE TRIGGER IF NOT EXISTS session_messages_ad AFTER DELETE ON session_messages BEGIN
72 INSERT INTO session_messages_fts(session_messages_fts, rowid, role, text) VALUES ('delete', old.rowid, old.role, old.text);
73 END;
74 CREATE TRIGGER IF NOT EXISTS session_messages_au AFTER UPDATE ON session_messages BEGIN
75 INSERT INTO session_messages_fts(session_messages_fts, rowid, role, text) VALUES ('delete', old.rowid, old.role, old.text);
76 INSERT INTO session_messages_fts(rowid, role, text) VALUES (new.rowid, new.role, new.text);
77 END;",
78 )?;
79 Ok(Self {
80 conn: Mutex::new(conn),
81 })
82 }
83
84 pub fn save(
85 &self,
86 id: &str,
87 messages: &[crate::provider::Msg],
88 name: Option<&str>,
89 ) -> anyhow::Result<()> {
90 let mut conn = self.conn.lock().unwrap();
91 let json = serde_json::to_string(messages)?;
92 let tx = conn.transaction()?;
93 tx.execute(
94 "INSERT OR REPLACE INTO sessions (id, name, messages_json, updated_at) VALUES (?1, ?2, ?3, unixepoch())",
95 params![id, name, json],
96 )?;
97 tx.execute(
98 "DELETE FROM session_messages WHERE session_id = ?1",
99 params![id],
100 )?;
101 for (turn_index, msg) in messages.iter().enumerate() {
102 let text = message_text(msg);
103 if text.trim().is_empty() {
104 continue;
105 }
106 tx.execute(
107 "INSERT INTO session_messages (session_id, turn_index, role, text, updated_at)
108 VALUES (?1, ?2, ?3, ?4, unixepoch())",
109 params![id, turn_index as i64, msg.role, text],
110 )?;
111 }
112 tx.commit()?;
113 Ok(())
114 }
115
116 pub fn load(&self, id: &str) -> Option<Session> {
117 let conn = self.conn.lock().unwrap();
118 conn.query_row(
119 "SELECT id, name, status, messages_json, created_at, updated_at FROM sessions WHERE id = ?1",
120 params![id],
121 |row| Ok(Session {
122 id: row.get(0)?, name: row.get(1)?, status: row.get(2)?,
123 messages_json: row.get(3)?, created_at: row.get(4)?, updated_at: row.get(5)?,
124 }),
125 ).ok()
126 }
127
128 pub fn list(&self) -> Vec<Session> {
129 let conn = self.conn.lock().unwrap();
130 let Ok(mut stmt) = conn.prepare(
131 "SELECT id, name, status, messages_json, created_at, updated_at FROM sessions ORDER BY updated_at DESC LIMIT 100"
132 ) else {
133 return Vec::new();
134 };
135 let Ok(rows) = stmt.query_map([], |row| {
136 Ok(Session {
137 id: row.get(0)?,
138 name: row.get(1)?,
139 status: row.get(2)?,
140 messages_json: row.get(3)?,
141 created_at: row.get(4)?,
142 updated_at: row.get(5)?,
143 })
144 }) else {
145 return Vec::new();
146 };
147 rows.filter_map(|r| r.ok()).collect()
148 }
149
150 pub fn delete(&self, id: &str) -> anyhow::Result<()> {
151 let mut conn = self.conn.lock().unwrap();
152 let tx = conn.transaction()?;
153 tx.execute(
154 "DELETE FROM session_messages WHERE session_id = ?1",
155 params![id],
156 )?;
157 tx.execute("DELETE FROM sessions WHERE id = ?1", params![id])?;
158 tx.commit()?;
159 Ok(())
160 }
161
162 pub fn search(&self, query: &str, limit: usize) -> Vec<SessionSearchHit> {
163 let query = query.trim();
164 if query.is_empty() {
165 return Vec::new();
166 }
167 let conn = self.conn.lock().unwrap();
168 let pattern = query
169 .split_whitespace()
170 .map(|word| format!("{}*", escape_fts_token(word)))
171 .collect::<Vec<_>>()
172 .join(" ");
173 let result = conn.prepare(
174 "SELECT sm.session_id, sm.turn_index, sm.role, sm.text, sm.updated_at
175 FROM session_messages sm
176 INNER JOIN session_messages_fts fts ON sm.rowid = fts.rowid
177 WHERE session_messages_fts MATCH ?1
178 ORDER BY rank LIMIT ?2",
179 );
180 if let Ok(mut stmt) = result {
181 if let Ok(rows) = stmt.query_map(params![pattern, limit as i64], |row| {
182 Ok(SessionSearchHit {
183 session_id: row.get(0)?,
184 turn_index: row.get::<_, i64>(1)? as usize,
185 role: row.get(2)?,
186 text: row.get(3)?,
187 updated_at: row.get(4)?,
188 })
189 }) {
190 return rows.filter_map(|row| row.ok()).collect();
191 }
192 }
193
194 let like_pattern = format!("%{}%", query);
195 let Ok(mut stmt) = conn.prepare(
196 "SELECT session_id, turn_index, role, text, updated_at
197 FROM session_messages
198 WHERE text LIKE ?1
199 ORDER BY updated_at DESC LIMIT ?2",
200 ) else {
201 return Vec::new();
202 };
203 let Ok(rows) = stmt.query_map(params![like_pattern, limit as i64], |row| {
204 Ok(SessionSearchHit {
205 session_id: row.get(0)?,
206 turn_index: row.get::<_, i64>(1)? as usize,
207 role: row.get(2)?,
208 text: row.get(3)?,
209 updated_at: row.get(4)?,
210 })
211 }) else {
212 return Vec::new();
213 };
214 rows.filter_map(|row| row.ok()).collect()
215 }
216
217 pub fn scroll(
218 &self,
219 id: &str,
220 around: usize,
221 before: usize,
222 after: usize,
223 ) -> Option<SessionSlice> {
224 let session = self.load(id)?;
225 let messages: Vec<crate::provider::Msg> =
226 serde_json::from_str(&session.messages_json).ok()?;
227 let start = around.saturating_sub(before);
228 let end = (around + after + 1).min(messages.len());
229 Some(SessionSlice {
230 session_id: id.to_string(),
231 start,
232 messages: messages[start..end].to_vec(),
233 })
234 }
235
236 pub fn recent_inputs(&self, limit: usize) -> Vec<String> {
237 let limit = limit.clamp(1, 100);
238 let conn = self.conn.lock().unwrap();
239 let Ok(mut stmt) = conn.prepare(
240 "SELECT text
241 FROM session_messages
242 WHERE role = 'user' AND trim(text) != ''
243 ORDER BY updated_at DESC, session_id DESC, turn_index DESC
244 LIMIT ?1",
245 ) else {
246 return Vec::new();
247 };
248 let Ok(rows) = stmt.query_map(params![limit as i64], |row| row.get::<_, String>(0)) else {
249 return Vec::new();
250 };
251 let mut seen = std::collections::HashSet::new();
252 rows.filter_map(|row| row.ok())
253 .filter(|text| seen.insert(text.clone()))
254 .collect()
255 }
256}
257
258fn message_text(msg: &crate::provider::Msg) -> String {
259 msg.content
260 .iter()
261 .filter_map(block_text)
262 .collect::<Vec<_>>()
263 .join("\n")
264}
265
266fn block_text(block: &crate::provider::ContentBlock) -> Option<String> {
267 match block {
268 crate::provider::ContentBlock::Text { text } => Some(text.clone()),
269 crate::provider::ContentBlock::ToolUse { name, .. } => Some(name.clone()),
270 crate::provider::ContentBlock::ToolResult { content, .. } => {
271 let text = content
272 .iter()
273 .filter_map(block_text)
274 .collect::<Vec<_>>()
275 .join("\n");
276 if text.trim().is_empty() {
277 None
278 } else {
279 Some(text)
280 }
281 }
282 crate::provider::ContentBlock::Image { .. } => None,
283 crate::provider::ContentBlock::Reasoning { text } => Some(text.clone()),
287 }
288}
289
290fn escape_fts_token(token: &str) -> String {
291 token
292 .chars()
293 .filter(|c| c.is_alphanumeric() || *c == '_' || *c == '-')
294 .collect()
295}
296
297use std::sync::atomic::{AtomicU64, Ordering};
300
301pub struct Metrics {
302 pub requests_total: AtomicU64,
303 pub requests_errors: AtomicU64,
304 pub tokens_input: AtomicU64,
305 pub tokens_output: AtomicU64,
306 pub tool_calls_total: AtomicU64,
307 pub tool_calls_errors: AtomicU64,
308 pub cost_usd_cents: AtomicU64,
309 pub active_sessions: AtomicU64,
310}
311
312impl Metrics {
313 pub fn new() -> Self {
314 Self {
315 requests_total: AtomicU64::new(0),
316 requests_errors: AtomicU64::new(0),
317 tokens_input: AtomicU64::new(0),
318 tokens_output: AtomicU64::new(0),
319 tool_calls_total: AtomicU64::new(0),
320 tool_calls_errors: AtomicU64::new(0),
321 cost_usd_cents: AtomicU64::new(0),
322 active_sessions: AtomicU64::new(0),
323 }
324 }
325
326 pub fn render(&self) -> String {
327 let mut out = String::new();
328 out.push_str(&format!(
329 "# HELP sparrow_requests_total Total number of requests\n"
330 ));
331 out.push_str(&format!("# TYPE sparrow_requests_total counter\n"));
332 out.push_str(&format!(
333 "sparrow_requests_total{{status=\"ok\"}} {}\n",
334 self.requests_total.load(Ordering::Relaxed)
335 ));
336 out.push_str(&format!(
337 "sparrow_requests_total{{status=\"error\"}} {}\n",
338 self.requests_errors.load(Ordering::Relaxed)
339 ));
340
341 out.push_str(&format!(
342 "# HELP sparrow_tokens_used_total Total tokens used\n"
343 ));
344 out.push_str(&format!("# TYPE sparrow_tokens_used_total counter\n"));
345 out.push_str(&format!(
346 "sparrow_tokens_used_total{{direction=\"input\"}} {}\n",
347 self.tokens_input.load(Ordering::Relaxed)
348 ));
349 out.push_str(&format!(
350 "sparrow_tokens_used_total{{direction=\"output\"}} {}\n",
351 self.tokens_output.load(Ordering::Relaxed)
352 ));
353
354 out.push_str(&format!(
355 "# HELP sparrow_tool_calls_total Total tool calls\n"
356 ));
357 out.push_str(&format!("# TYPE sparrow_tool_calls_total counter\n"));
358 out.push_str(&format!(
359 "sparrow_tool_calls_total{{status=\"ok\"}} {}\n",
360 self.tool_calls_total.load(Ordering::Relaxed)
361 ));
362 out.push_str(&format!(
363 "sparrow_tool_calls_total{{status=\"error\"}} {}\n",
364 self.tool_calls_errors.load(Ordering::Relaxed)
365 ));
366
367 out.push_str(&format!(
368 "# HELP sparrow_cost_usd_total Total cost in USD cents\n"
369 ));
370 out.push_str(&format!("# TYPE sparrow_cost_usd_total counter\n"));
371 out.push_str(&format!(
372 "sparrow_cost_usd_total {}\n",
373 self.cost_usd_cents.load(Ordering::Relaxed)
374 ));
375
376 out.push_str(&format!("# HELP sparrow_active_sessions Active sessions\n"));
377 out.push_str(&format!("# TYPE sparrow_active_sessions gauge\n"));
378 out.push_str(&format!(
379 "sparrow_active_sessions {}\n",
380 self.active_sessions.load(Ordering::Relaxed)
381 ));
382
383 out
384 }
385}
386
387impl Default for Metrics {
388 fn default() -> Self {
389 Self::new()
390 }
391}