// pitchfork_cli/log_store/sqlite.rs

1use crate::Result;
2use crate::daemon_id::DaemonId;
3use crate::log_store::{LogEntry, LogQuery, LogStore};
4use chrono::{DateTime, Local, TimeZone};
5use log::error;
6use miette::IntoDiagnostic;
7use rusqlite::{Connection, OptionalExtension, params};
8use std::collections::HashSet;
9use std::io::{BufRead, BufReader};
10use std::path::PathBuf;
11use std::sync::Mutex;
12
13/// SQLite-backed log store with WAL mode for concurrent readers.
14pub struct SqliteLogStore {
15    conn: Mutex<Connection>,
16}
17
18impl SqliteLogStore {
19    /// Open or create the SQLite log store at the given path.
20    pub fn open(path: impl Into<PathBuf>) -> Result<Self> {
21        let path = path.into();
22        if let Some(parent) = path.parent() {
23            std::fs::create_dir_all(parent).into_diagnostic()?;
24        }
25        let conn = Connection::open(&path).into_diagnostic()?;
26        conn.execute_batch(
27            "PRAGMA journal_mode = WAL;
28             PRAGMA synchronous = NORMAL;",
29        )
30        .into_diagnostic()?;
31        conn.execute(
32            "CREATE TABLE IF NOT EXISTS log_entries (
33                id          INTEGER PRIMARY KEY AUTOINCREMENT,
34                daemon_id   TEXT    NOT NULL,
35                timestamp   INTEGER NOT NULL,
36                message     TEXT    NOT NULL
37            );",
38            [],
39        )
40        .into_diagnostic()?;
41        conn.execute(
42            "CREATE INDEX IF NOT EXISTS idx_daemon_ts ON log_entries(daemon_id, timestamp);",
43            [],
44        )
45        .into_diagnostic()?;
46        conn.execute(
47            "CREATE INDEX IF NOT EXISTS idx_daemon_id ON log_entries(daemon_id, id);",
48            [],
49        )
50        .into_diagnostic()?;
51        conn.execute(
52            "CREATE INDEX IF NOT EXISTS idx_timestamp ON log_entries(timestamp);",
53            [],
54        )
55        .into_diagnostic()?;
56        conn.execute(
57            "CREATE TABLE IF NOT EXISTS log_clear_generations (
58                daemon_id TEXT PRIMARY KEY,
59                generation INTEGER NOT NULL DEFAULT 0
60            );",
61            [],
62        )
63        .into_diagnostic()?;
64        Ok(Self {
65            conn: Mutex::new(conn),
66        })
67    }
68
69    /// Rotate (delete) old log entries for a specific daemon based on retention policy.
70    pub fn rotate_by_age(&self, daemon_id: &DaemonId, max_age: chrono::Duration) -> Result<u64> {
71        let cutoff = (Local::now() - max_age).timestamp_millis();
72        let conn = self.conn.lock().unwrap();
73        let rows = conn
74            .execute(
75                "DELETE FROM log_entries WHERE daemon_id = ?1 AND timestamp < ?2",
76                params![daemon_id.qualified(), cutoff],
77            )
78            .into_diagnostic()?;
79        Ok(rows as u64)
80    }
81
82    /// Rotate (delete) old log entries keeping only the most recent `max_count` rows
83    /// for a specific daemon.
84    pub fn rotate_by_count(&self, daemon_id: &DaemonId, max_count: u64) -> Result<u64> {
85        let mut conn = self.conn.lock().unwrap();
86        let tx = conn.transaction().into_diagnostic()?;
87        let count: i64 = tx
88            .query_row(
89                "SELECT COUNT(*) FROM log_entries WHERE daemon_id = ?1",
90                [daemon_id.qualified()],
91                |row| row.get(0),
92            )
93            .into_diagnostic()?;
94        let to_delete = count.saturating_sub(max_count as i64);
95        let total_deleted = if to_delete > 0 {
96            let rows = tx
97                .execute(
98                    "DELETE FROM log_entries WHERE id IN (
99                        SELECT id FROM log_entries WHERE daemon_id = ?1 ORDER BY timestamp ASC, id ASC LIMIT ?2
100                    )",
101                    params![daemon_id.qualified(), to_delete],
102                )
103                .into_diagnostic()?;
104            rows as u64
105        } else {
106            0
107        };
108        tx.commit().into_diagnostic()?;
109        Ok(total_deleted)
110    }
111
112    /// Migrate existing text logs for a daemon into SQLite.
113    ///
114    /// Reads the legacy text file line-by-line (streaming) and inserts in
115    /// batches of 1000 to avoid loading multi-GB files into memory at once.
116    pub fn migrate_daemon_text_logs(&self, daemon_id: &DaemonId) -> Result<u64> {
117        let text_path = daemon_id.log_path();
118        if !text_path.exists() {
119            return Ok(0);
120        }
121
122        let file = std::fs::File::open(&text_path).into_diagnostic()?;
123        let reader = BufReader::new(file);
124        let re = regex::Regex::new(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) ([\w./-]+) (.*)$")
125            .expect("invalid regex");
126
127        let mut current_timestamp: Option<DateTime<Local>> = None;
128        let mut current_message = String::new();
129        let mut entries = Vec::with_capacity(1000);
130        let mut total_migrated: u64 = 0;
131
132        for line in reader.lines() {
133            let line = line.into_diagnostic()?;
134            if let Some(caps) = re.captures(&line) {
135                if let Some(ts) = current_timestamp.take() {
136                    entries.push((ts, std::mem::take(&mut current_message)));
137                }
138                let ts_str = caps.get(1).map(|m| m.as_str()).unwrap_or_default();
139                let msg = caps.get(3).map(|m| m.as_str()).unwrap_or_default();
140                if let Ok(naive) =
141                    chrono::NaiveDateTime::parse_from_str(ts_str, "%Y-%m-%d %H:%M:%S")
142                {
143                    current_timestamp = Local.from_local_datetime(&naive).single();
144                    current_message = msg.to_string();
145                }
146            } else if current_timestamp.is_some() {
147                current_message.push('\n');
148                current_message.push_str(&line);
149            }
150
151            if entries.len() >= 1000 {
152                total_migrated += self.insert_batch(daemon_id, &entries)?;
153                entries.clear();
154            }
155        }
156
157        if let Some(ts) = current_timestamp {
158            entries.push((ts, std::mem::take(&mut current_message)));
159        }
160
161        if !entries.is_empty() {
162            total_migrated += self.insert_batch(daemon_id, &entries)?;
163        }
164
165        if total_migrated > 0 {
166            if let Err(e) = std::fs::remove_file(&text_path) {
167                log::warn!(
168                    "failed to remove legacy log file after migration {}: {e}",
169                    text_path.display()
170                );
171            }
172        }
173
174        Ok(total_migrated)
175    }
176
177    fn insert_batch(
178        &self,
179        daemon_id: &DaemonId,
180        entries: &[(DateTime<Local>, String)],
181    ) -> Result<u64> {
182        let mut conn = self.conn.lock().unwrap();
183        let tx = conn.transaction().into_diagnostic()?;
184        let mut count = 0u64;
185        {
186            let mut stmt = tx
187                .prepare(
188                    "INSERT INTO log_entries (daemon_id, timestamp, message) VALUES (?1, ?2, ?3)",
189                )
190                .into_diagnostic()?;
191            for (ts, msg) in entries {
192                stmt.execute(params![daemon_id.qualified(), ts.timestamp_millis(), msg])
193                    .into_diagnostic()?;
194                count += 1;
195            }
196        }
197        tx.commit().into_diagnostic()?;
198        Ok(count)
199    }
200}
201
202impl LogStore for SqliteLogStore {
203    fn append(&self, daemon_id: &DaemonId, message: &str) -> Result<()> {
204        let ts = Local::now().timestamp_millis();
205        let id = daemon_id.qualified();
206        let msg = message.to_string();
207
208        let conn = self.conn.lock().unwrap();
209        let _ = conn
210            .execute(
211                "INSERT INTO log_entries (daemon_id, timestamp, message) VALUES (?1, ?2, ?3)",
212                params![id, ts, msg],
213            )
214            .into_diagnostic()?;
215        Ok(())
216    }
217
218    fn query(&self, opts: &LogQuery) -> Result<Vec<LogEntry>> {
219        let conn = self.conn.lock().unwrap();
220        let mut conditions = Vec::new();
221        let mut query_params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
222
223        if !opts.daemon_ids.is_empty() {
224            let placeholders: Vec<String> = (1..=opts.daemon_ids.len())
225                .map(|i| format!("?{}", i))
226                .collect();
227            conditions.push(format!("daemon_id IN ({})", placeholders.join(", ")));
228            for id in &opts.daemon_ids {
229                query_params.push(Box::new(id.clone()));
230            }
231        }
232
233        if let Some(from) = opts.from {
234            conditions.push(format!("timestamp >= ?{}", query_params.len() + 1));
235            query_params.push(Box::new(from.timestamp_millis()));
236        }
237
238        if let Some(to) = opts.to {
239            conditions.push(format!("timestamp <= ?{}", query_params.len() + 1));
240            query_params.push(Box::new(to.timestamp_millis()));
241        }
242
243        if let Some(after_id) = opts.after_id {
244            conditions.push(format!("id > ?{}", query_params.len() + 1));
245            query_params.push(Box::new(after_id));
246        }
247
248        let where_clause = if conditions.is_empty() {
249            String::new()
250        } else {
251            format!("WHERE {}", conditions.join(" AND "))
252        };
253
254        let order = if opts.order_desc { "DESC" } else { "ASC" };
255
256        let limit_clause = opts
257            .limit
258            .map(|n| format!("LIMIT {}", n))
259            .unwrap_or_default();
260
261        let sql = format!(
262            "SELECT id, daemon_id, timestamp, message FROM log_entries {} ORDER BY timestamp {}, id {} {}",
263            where_clause, order, order, limit_clause
264        );
265
266        let mut stmt = conn.prepare(&sql).into_diagnostic()?;
267        let params_ref: Vec<&dyn rusqlite::ToSql> =
268            query_params.iter().map(|p| p.as_ref()).collect();
269        let rows = stmt
270            .query_map(params_ref.as_slice(), |row| {
271                let id: i64 = row.get(0)?;
272                let daemon_id: String = row.get(1)?;
273                let ts_millis: i64 = row.get(2)?;
274                let message: String = row.get(3)?;
275                let timestamp = Local
276                    .timestamp_millis_opt(ts_millis)
277                    .single()
278                    .unwrap_or_else(Local::now);
279                Ok(LogEntry {
280                    id,
281                    daemon_id,
282                    timestamp,
283                    message,
284                })
285            })
286            .into_diagnostic()?;
287
288        let mut entries = Vec::new();
289        for row in rows {
290            entries.push(row.into_diagnostic()?);
291        }
292        Ok(entries)
293    }
294
295    fn tail(&self, daemon_id: &DaemonId, after_id: Option<i64>) -> Result<Vec<LogEntry>> {
296        self.query(&LogQuery {
297            daemon_ids: vec![daemon_id.qualified()],
298            from: None,
299            to: None,
300            limit: None,
301            order_desc: false,
302            after_id,
303        })
304    }
305
306    fn clear(&self, daemon_ids: &[DaemonId]) -> Result<()> {
307        let mut conn = self.conn.lock().unwrap();
308        let tx = conn.transaction().into_diagnostic()?;
309        for id in daemon_ids {
310            tx.execute(
311                "DELETE FROM log_entries WHERE daemon_id = ?1",
312                params![id.qualified()],
313            )
314            .into_diagnostic()?;
315            tx.execute(
316                "INSERT INTO log_clear_generations (daemon_id, generation)
317                 VALUES (?1, 1)
318                 ON CONFLICT(daemon_id) DO UPDATE SET generation = generation + 1",
319                params![id.qualified()],
320            )
321            .into_diagnostic()?;
322        }
323        tx.commit().into_diagnostic()?;
324        Ok(())
325    }
326
327    fn list_daemon_ids(&self) -> Result<Vec<String>> {
328        let conn = self.conn.lock().unwrap();
329        let mut stmt = conn
330            .prepare("SELECT DISTINCT daemon_id FROM log_entries")
331            .into_diagnostic()?;
332        let ids = stmt
333            .query_map([], |row| {
334                let id: String = row.get(0)?;
335                Ok(id)
336            })
337            .into_diagnostic()?
338            .filter_map(|r| r.ok())
339            .collect();
340        Ok(ids)
341    }
342
343    fn apply_retention(
344        &self,
345        policy: &super::RetentionPolicy,
346        excluded_daemon_ids: &[DaemonId],
347    ) -> Result<u64> {
348        let daemon_ids = self.list_daemon_ids()?;
349        let excluded: HashSet<String> = excluded_daemon_ids.iter().map(|d| d.qualified()).collect();
350        let mut total = 0u64;
351        for id_str in daemon_ids {
352            if excluded.contains(&id_str) {
353                continue;
354            }
355            let id = DaemonId::parse(&id_str).unwrap_or_else(|_| {
356                DaemonId::try_new("global", &id_str).unwrap_or_else(|_| DaemonId::pitchfork())
357            });
358            if let Some(dur) = policy.age {
359                total += self.rotate_by_age(&id, dur)?;
360            }
361            if let Some(n) = policy.count {
362                total += self.rotate_by_count(&id, n)?;
363            }
364        }
365        Ok(total)
366    }
367
368    fn apply_retention_for_daemon(
369        &self,
370        daemon_id: &DaemonId,
371        policy: &super::RetentionPolicy,
372    ) -> Result<u64> {
373        let mut total = 0u64;
374        if let Some(dur) = policy.age {
375            total += self.rotate_by_age(daemon_id, dur)?;
376        }
377        if let Some(n) = policy.count {
378            total += self.rotate_by_count(daemon_id, n)?;
379        }
380        Ok(total)
381    }
382
383    fn last_clear_generation(&self, daemon_id: &DaemonId) -> Result<Option<u64>> {
384        let conn = self.conn.lock().unwrap();
385        let mut stmt = conn
386            .prepare("SELECT generation FROM log_clear_generations WHERE daemon_id = ?1")
387            .into_diagnostic()?;
388        let generation: Option<u64> = stmt
389            .query_row(params![daemon_id.qualified()], |row| row.get(0))
390            .optional()
391            .into_diagnostic()?;
392        Ok(generation)
393    }
394}
395
396/// Global singleton log store.
397use once_cell::sync::Lazy;
398use std::sync::Arc;
399
400pub static LOG_STORE: Lazy<Arc<SqliteLogStore>> = Lazy::new(|| {
401    let path = crate::env::PITCHFORK_LOGS_DIR.join("logs.db");
402    let mut is_fallback = false;
403    let store = Arc::new(SqliteLogStore::open(&path).unwrap_or_else(|e| {
404        error!(
405            "failed to open log store at {}: {e}. Falling back to in-memory store; logs will not persist across restarts.",
406            path.display()
407        );
408        is_fallback = true;
409        SqliteLogStore::open(":memory:").expect("in-memory SQLite should always open")
410    }));
411
412    // Auto-migrate any legacy text log files into SQLite on first access.
413    // This runs once per process startup and is idempotent.
414    // Skip migration when using the in-memory fallback to prevent data loss.
415    if !is_fallback {
416        if let Err(e) = auto_migrate_legacy_logs(&store) {
417            warn!("legacy log auto-migration failed: {e}");
418        }
419    } else {
420        warn!(
421            "skipping legacy log auto-migration because log store is in-memory (no durable destination)"
422        );
423    }
424
425    store
426});
427
428/// Auto-migrate legacy text log files into the SQLite log store.
429///
430/// Scans the logs directory for directories matching the new-format layout
431/// (`namespace--name/namespace--name.log`), attempts to parse the directory
432/// name as a valid safe-path daemon ID, and imports the content into SQLite.
433/// This is idempotent: re-running it on already-migrated data is a no-op
434/// because the legacy text files are deleted after successful import.
435fn auto_migrate_legacy_logs(store: &SqliteLogStore) -> Result<()> {
436    let logs_dir = &*crate::env::PITCHFORK_LOGS_DIR;
437    if !logs_dir.exists() {
438        return Ok(());
439    }
440
441    let Ok(entries) = std::fs::read_dir(logs_dir) else {
442        return Ok(());
443    };
444
445    let mut total_migrated = 0u64;
446    let mut migrated_ids = Vec::new();
447
448    for entry in entries.flatten() {
449        let path = entry.path();
450        if !path.is_dir() {
451            continue;
452        }
453        // Skip the supervisor's own log directory.
454        let file_name = path
455            .file_name()
456            .map_or(String::new(), |n| n.to_string_lossy().to_string());
457        if file_name == "pitchfork" {
458            continue;
459        }
460
461        // Only consider directories that look like new-format safe-paths
462        if !file_name.contains("--") {
463            continue;
464        }
465        let log_file = path.join(format!("{file_name}.log"));
466        if !log_file.exists() {
467            continue;
468        }
469
470        let daemon_id = match DaemonId::from_safe_path(&file_name) {
471            Ok(id) => id,
472            Err(_) => continue,
473        };
474
475        // Skip the supervisor's own daemon; its log directory may be present
476        // under logs/ but should never be imported into the user-facing log store.
477        if daemon_id == DaemonId::pitchfork() {
478            continue;
479        }
480
481        match store.migrate_daemon_text_logs(&daemon_id) {
482            Ok(0) => {}
483            Ok(n) => {
484                total_migrated += n;
485                migrated_ids.push(daemon_id.qualified());
486            }
487            Err(e) => {
488                warn!(
489                    "failed to migrate text logs for {}: {e}",
490                    daemon_id.qualified()
491                );
492            }
493        }
494    }
495
496    if total_migrated > 0 {
497        warn!(
498            "auto-migrated {total_migrated} legacy log entries from {count} daemon(s): {ids}",
499            count = migrated_ids.len(),
500            ids = migrated_ids.join(", ")
501        );
502    }
503
504    Ok(())
505}