Skip to main content

sparrow/runtime/
session.rs

1// ─── Session persistence (Phase 9 Item 27) ─────────────────────────────────────
2
3use rusqlite::{Connection, params};
4use serde::{Deserialize, Serialize};
5use std::sync::Mutex;
6
/// One persisted session, mirroring a row of the `sessions` table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Session {
    /// Primary key of the session row.
    pub id: String,
    /// Optional name; the `name` column is nullable.
    pub name: Option<String>,
    /// Value of the `status` column (the schema defaults it to `'active'`).
    pub status: String,
    /// The session's message list, serialized as JSON text.
    pub messages_json: String,
    /// Creation timestamp as written by SQLite's `unixepoch()`.
    pub created_at: i64,
    /// Last-save timestamp as written by SQLite's `unixepoch()`.
    pub updated_at: i64,
}
16
/// A single message returned by [`SessionStore::search`], mirroring a row of
/// the `session_messages` table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionSearchHit {
    /// Id of the session the message belongs to.
    pub session_id: String,
    /// Position of the message within the session's message list.
    pub turn_index: usize,
    /// Role recorded for the message.
    pub role: String,
    /// Flattened text of the message as it was indexed.
    pub text: String,
    /// `updated_at` value of the indexed message row.
    pub updated_at: i64,
}
25
/// A contiguous window of a session's messages, as produced by
/// [`SessionStore::scroll`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionSlice {
    /// Id of the session the window was cut from.
    pub session_id: String,
    /// Index, within the full message list, of the first entry in `messages`.
    pub start: usize,
    /// The messages inside the window, in their original order.
    pub messages: Vec<crate::provider::Msg>,
}
32
/// SQLite-backed session storage.
///
/// All public methods are synchronous and hold a `std::sync::Mutex<Connection>`
/// for the duration of a single statement (or a single `BEGIN…COMMIT` block).
/// They are safe to call from an async context because they do not `.await`
/// while the lock is held — but they are *blocking I/O*, so callers that run
/// in a Tokio worker thread should wrap calls in `tokio::task::spawn_blocking`
/// when latency matters (e.g. inside a hot request path).
pub struct SessionStore {
    /// The single SQLite connection; every method locks it before touching
    /// the database.
    conn: Mutex<Connection>,
}
44
45impl SessionStore {
46    pub fn open(db_path: &std::path::Path) -> anyhow::Result<Self> {
47        let conn = Connection::open(db_path)?;
48        conn.execute_batch(
49            "CREATE TABLE IF NOT EXISTS sessions (
50                id TEXT PRIMARY KEY,
51                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
52                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
53                name TEXT,
54                status TEXT DEFAULT 'active',
55                messages_json TEXT NOT NULL DEFAULT '[]'
56            );
57            CREATE TABLE IF NOT EXISTS session_messages (
58                session_id TEXT NOT NULL,
59                turn_index INTEGER NOT NULL,
60                role TEXT NOT NULL,
61                text TEXT NOT NULL,
62                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
63                PRIMARY KEY (session_id, turn_index)
64            );
65            CREATE VIRTUAL TABLE IF NOT EXISTS session_messages_fts USING fts5(
66                role, text, content='session_messages', content_rowid='rowid'
67            );
68            CREATE TRIGGER IF NOT EXISTS session_messages_ai AFTER INSERT ON session_messages BEGIN
69                INSERT INTO session_messages_fts(rowid, role, text) VALUES (new.rowid, new.role, new.text);
70            END;
71            CREATE TRIGGER IF NOT EXISTS session_messages_ad AFTER DELETE ON session_messages BEGIN
72                INSERT INTO session_messages_fts(session_messages_fts, rowid, role, text) VALUES ('delete', old.rowid, old.role, old.text);
73            END;
74            CREATE TRIGGER IF NOT EXISTS session_messages_au AFTER UPDATE ON session_messages BEGIN
75                INSERT INTO session_messages_fts(session_messages_fts, rowid, role, text) VALUES ('delete', old.rowid, old.role, old.text);
76                INSERT INTO session_messages_fts(rowid, role, text) VALUES (new.rowid, new.role, new.text);
77            END;",
78        )?;
79        Ok(Self {
80            conn: Mutex::new(conn),
81        })
82    }
83
84    pub fn save(
85        &self,
86        id: &str,
87        messages: &[crate::provider::Msg],
88        name: Option<&str>,
89    ) -> anyhow::Result<()> {
90        let mut conn = self.conn.lock().unwrap();
91        let json = serde_json::to_string(messages)?;
92        let tx = conn.transaction()?;
93        tx.execute(
94            "INSERT OR REPLACE INTO sessions (id, name, messages_json, updated_at) VALUES (?1, ?2, ?3, unixepoch())",
95            params![id, name, json],
96        )?;
97        tx.execute(
98            "DELETE FROM session_messages WHERE session_id = ?1",
99            params![id],
100        )?;
101        for (turn_index, msg) in messages.iter().enumerate() {
102            let text = message_text(msg);
103            if text.trim().is_empty() {
104                continue;
105            }
106            tx.execute(
107                "INSERT INTO session_messages (session_id, turn_index, role, text, updated_at)
108                 VALUES (?1, ?2, ?3, ?4, unixepoch())",
109                params![id, turn_index as i64, msg.role, text],
110            )?;
111        }
112        tx.commit()?;
113        Ok(())
114    }
115
116    pub fn load(&self, id: &str) -> Option<Session> {
117        let conn = self.conn.lock().unwrap();
118        conn.query_row(
119            "SELECT id, name, status, messages_json, created_at, updated_at FROM sessions WHERE id = ?1",
120            params![id],
121            |row| Ok(Session {
122                id: row.get(0)?, name: row.get(1)?, status: row.get(2)?,
123                messages_json: row.get(3)?, created_at: row.get(4)?, updated_at: row.get(5)?,
124            }),
125        ).ok()
126    }
127
128    pub fn list(&self) -> Vec<Session> {
129        let conn = self.conn.lock().unwrap();
130        let Ok(mut stmt) = conn.prepare(
131            "SELECT id, name, status, messages_json, created_at, updated_at FROM sessions ORDER BY updated_at DESC LIMIT 100"
132        ) else {
133            return Vec::new();
134        };
135        let Ok(rows) = stmt.query_map([], |row| {
136            Ok(Session {
137                id: row.get(0)?,
138                name: row.get(1)?,
139                status: row.get(2)?,
140                messages_json: row.get(3)?,
141                created_at: row.get(4)?,
142                updated_at: row.get(5)?,
143            })
144        }) else {
145            return Vec::new();
146        };
147        rows.filter_map(|r| r.ok()).collect()
148    }
149
150    pub fn delete(&self, id: &str) -> anyhow::Result<()> {
151        let mut conn = self.conn.lock().unwrap();
152        let tx = conn.transaction()?;
153        tx.execute(
154            "DELETE FROM session_messages WHERE session_id = ?1",
155            params![id],
156        )?;
157        tx.execute("DELETE FROM sessions WHERE id = ?1", params![id])?;
158        tx.commit()?;
159        Ok(())
160    }
161
162    pub fn search(&self, query: &str, limit: usize) -> Vec<SessionSearchHit> {
163        let query = query.trim();
164        if query.is_empty() {
165            return Vec::new();
166        }
167        let conn = self.conn.lock().unwrap();
168        let pattern = query
169            .split_whitespace()
170            .map(|word| format!("{}*", escape_fts_token(word)))
171            .collect::<Vec<_>>()
172            .join(" ");
173        let result = conn.prepare(
174            "SELECT sm.session_id, sm.turn_index, sm.role, sm.text, sm.updated_at
175             FROM session_messages sm
176             INNER JOIN session_messages_fts fts ON sm.rowid = fts.rowid
177             WHERE session_messages_fts MATCH ?1
178             ORDER BY rank LIMIT ?2",
179        );
180        if let Ok(mut stmt) = result {
181            if let Ok(rows) = stmt.query_map(params![pattern, limit as i64], |row| {
182                Ok(SessionSearchHit {
183                    session_id: row.get(0)?,
184                    turn_index: row.get::<_, i64>(1)? as usize,
185                    role: row.get(2)?,
186                    text: row.get(3)?,
187                    updated_at: row.get(4)?,
188                })
189            }) {
190                return rows.filter_map(|row| row.ok()).collect();
191            }
192        }
193
194        let like_pattern = format!("%{}%", query);
195        let Ok(mut stmt) = conn.prepare(
196            "SELECT session_id, turn_index, role, text, updated_at
197             FROM session_messages
198             WHERE text LIKE ?1
199             ORDER BY updated_at DESC LIMIT ?2",
200        ) else {
201            return Vec::new();
202        };
203        let Ok(rows) = stmt.query_map(params![like_pattern, limit as i64], |row| {
204            Ok(SessionSearchHit {
205                session_id: row.get(0)?,
206                turn_index: row.get::<_, i64>(1)? as usize,
207                role: row.get(2)?,
208                text: row.get(3)?,
209                updated_at: row.get(4)?,
210            })
211        }) else {
212            return Vec::new();
213        };
214        rows.filter_map(|row| row.ok()).collect()
215    }
216
217    pub fn scroll(
218        &self,
219        id: &str,
220        around: usize,
221        before: usize,
222        after: usize,
223    ) -> Option<SessionSlice> {
224        let session = self.load(id)?;
225        let messages: Vec<crate::provider::Msg> =
226            serde_json::from_str(&session.messages_json).ok()?;
227        let start = around.saturating_sub(before);
228        let end = (around + after + 1).min(messages.len());
229        Some(SessionSlice {
230            session_id: id.to_string(),
231            start,
232            messages: messages[start..end].to_vec(),
233        })
234    }
235
236    pub fn recent_inputs(&self, limit: usize) -> Vec<String> {
237        let limit = limit.clamp(1, 100);
238        let conn = self.conn.lock().unwrap();
239        let Ok(mut stmt) = conn.prepare(
240            "SELECT text
241             FROM session_messages
242             WHERE role = 'user' AND trim(text) != ''
243             ORDER BY updated_at DESC, session_id DESC, turn_index DESC
244             LIMIT ?1",
245        ) else {
246            return Vec::new();
247        };
248        let Ok(rows) = stmt.query_map(params![limit as i64], |row| row.get::<_, String>(0)) else {
249            return Vec::new();
250        };
251        let mut seen = std::collections::HashSet::new();
252        rows.filter_map(|row| row.ok())
253            .filter(|text| seen.insert(text.clone()))
254            .collect()
255    }
256}
257
258fn message_text(msg: &crate::provider::Msg) -> String {
259    msg.content
260        .iter()
261        .filter_map(block_text)
262        .collect::<Vec<_>>()
263        .join("\n")
264}
265
266fn block_text(block: &crate::provider::ContentBlock) -> Option<String> {
267    match block {
268        crate::provider::ContentBlock::Text { text } => Some(text.clone()),
269        crate::provider::ContentBlock::ToolUse { name, .. } => Some(name.clone()),
270        crate::provider::ContentBlock::ToolResult { content, .. } => {
271            let text = content
272                .iter()
273                .filter_map(block_text)
274                .collect::<Vec<_>>()
275                .join("\n");
276            if text.trim().is_empty() {
277                None
278            } else {
279                Some(text)
280            }
281        }
282        crate::provider::ContentBlock::Image { .. } => None,
283        // Reasoning content is an opaque thinking trace — not part of the
284        // visible transcript searched by FTS, but keep it indexed lightly so
285        // a "what did the model decide" query still hits.
286        crate::provider::ContentBlock::Reasoning { text } => Some(text.clone()),
287    }
288}
289
/// Strips a search word down to characters that are safe to embed in an FTS
/// query: alphanumerics (Unicode-aware), `_` and `-`. Everything else is
/// dropped, so the result may be empty.
fn escape_fts_token(token: &str) -> String {
    let mut cleaned = String::new();
    for ch in token.chars() {
        if matches!(ch, '_' | '-') || ch.is_alphanumeric() {
            cleaned.push(ch);
        }
    }
    cleaned
}
296
297// ─── Prometheus metrics (Phase 10 Item 29) ─────────────────────────────────────
298
299use std::sync::atomic::{AtomicU64, Ordering};
300
/// Counters rendered by [`Metrics::render`] in the Prometheus text format.
///
/// Every field is an [`AtomicU64`], so values can be updated through a shared
/// reference.
#[derive(Debug, Default)]
pub struct Metrics {
    /// Rendered as `sparrow_requests_total{status="ok"}`.
    pub requests_total: AtomicU64,
    /// Rendered as `sparrow_requests_total{status="error"}`.
    pub requests_errors: AtomicU64,
    /// Rendered as `sparrow_tokens_used_total{direction="input"}`.
    pub tokens_input: AtomicU64,
    /// Rendered as `sparrow_tokens_used_total{direction="output"}`.
    pub tokens_output: AtomicU64,
    /// Rendered as `sparrow_tool_calls_total{status="ok"}`.
    pub tool_calls_total: AtomicU64,
    /// Rendered as `sparrow_tool_calls_total{status="error"}`.
    pub tool_calls_errors: AtomicU64,
    /// Rendered as `sparrow_cost_usd_total`; the HELP line describes the unit
    /// as USD cents.
    pub cost_usd_cents: AtomicU64,
    /// Rendered as the `sparrow_active_sessions` gauge.
    pub active_sessions: AtomicU64,
}

impl Metrics {
    /// Creates a metrics set with every value at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Renders all metrics as Prometheus text exposition output.
    ///
    /// Each family is emitted as a `# HELP` line, a `# TYPE` line and its
    /// sample lines; every line ends with `\n`. Values are read one after
    /// another with relaxed ordering, so the output is not an atomic snapshot
    /// across fields.
    pub fn render(&self) -> String {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        // One format call for the whole document; `{{`/`}}` are literal braces
        // around the Prometheus labels.
        format!(
            "# HELP sparrow_requests_total Total number of requests\n\
             # TYPE sparrow_requests_total counter\n\
             sparrow_requests_total{{status=\"ok\"}} {requests_ok}\n\
             sparrow_requests_total{{status=\"error\"}} {requests_error}\n\
             # HELP sparrow_tokens_used_total Total tokens used\n\
             # TYPE sparrow_tokens_used_total counter\n\
             sparrow_tokens_used_total{{direction=\"input\"}} {tokens_in}\n\
             sparrow_tokens_used_total{{direction=\"output\"}} {tokens_out}\n\
             # HELP sparrow_tool_calls_total Total tool calls\n\
             # TYPE sparrow_tool_calls_total counter\n\
             sparrow_tool_calls_total{{status=\"ok\"}} {tools_ok}\n\
             sparrow_tool_calls_total{{status=\"error\"}} {tools_error}\n\
             # HELP sparrow_cost_usd_total Total cost in USD cents\n\
             # TYPE sparrow_cost_usd_total counter\n\
             sparrow_cost_usd_total {cost}\n\
             # HELP sparrow_active_sessions Active sessions\n\
             # TYPE sparrow_active_sessions gauge\n\
             sparrow_active_sessions {sessions}\n",
            requests_ok = read(&self.requests_total),
            requests_error = read(&self.requests_errors),
            tokens_in = read(&self.tokens_input),
            tokens_out = read(&self.tokens_output),
            tools_ok = read(&self.tool_calls_total),
            tools_error = read(&self.tool_calls_errors),
            cost = read(&self.cost_usd_cents),
            sessions = read(&self.active_sessions),
        )
    }
}