1use crate::Result;
2use crate::daemon_id::DaemonId;
3use crate::log_store::{LogEntry, LogQuery, LogStore};
4use chrono::{DateTime, Local, TimeZone};
5use log::error;
6use miette::IntoDiagnostic;
7use rusqlite::{Connection, OptionalExtension, params};
8use std::collections::HashSet;
9use std::io::{BufRead, BufReader};
10use std::path::PathBuf;
11use std::sync::Mutex;
12
13pub struct SqliteLogStore {
15 conn: Mutex<Connection>,
16}
17
18impl SqliteLogStore {
19 pub fn open(path: impl Into<PathBuf>) -> Result<Self> {
21 let path = path.into();
22 if let Some(parent) = path.parent() {
23 std::fs::create_dir_all(parent).into_diagnostic()?;
24 }
25 let conn = Connection::open(&path).into_diagnostic()?;
26 conn.execute_batch(
27 "PRAGMA journal_mode = WAL;
28 PRAGMA synchronous = NORMAL;",
29 )
30 .into_diagnostic()?;
31 conn.execute(
32 "CREATE TABLE IF NOT EXISTS log_entries (
33 id INTEGER PRIMARY KEY AUTOINCREMENT,
34 daemon_id TEXT NOT NULL,
35 timestamp INTEGER NOT NULL,
36 message TEXT NOT NULL
37 );",
38 [],
39 )
40 .into_diagnostic()?;
41 conn.execute(
42 "CREATE INDEX IF NOT EXISTS idx_daemon_ts ON log_entries(daemon_id, timestamp);",
43 [],
44 )
45 .into_diagnostic()?;
46 conn.execute(
47 "CREATE INDEX IF NOT EXISTS idx_daemon_id ON log_entries(daemon_id, id);",
48 [],
49 )
50 .into_diagnostic()?;
51 conn.execute(
52 "CREATE INDEX IF NOT EXISTS idx_timestamp ON log_entries(timestamp);",
53 [],
54 )
55 .into_diagnostic()?;
56 conn.execute(
57 "CREATE TABLE IF NOT EXISTS log_clear_generations (
58 daemon_id TEXT PRIMARY KEY,
59 generation INTEGER NOT NULL DEFAULT 0
60 );",
61 [],
62 )
63 .into_diagnostic()?;
64 Ok(Self {
65 conn: Mutex::new(conn),
66 })
67 }
68
69 pub fn rotate_by_age(&self, daemon_id: &DaemonId, max_age: chrono::Duration) -> Result<u64> {
71 let cutoff = (Local::now() - max_age).timestamp_millis();
72 let conn = self.conn.lock().unwrap();
73 let rows = conn
74 .execute(
75 "DELETE FROM log_entries WHERE daemon_id = ?1 AND timestamp < ?2",
76 params![daemon_id.qualified(), cutoff],
77 )
78 .into_diagnostic()?;
79 Ok(rows as u64)
80 }
81
82 pub fn rotate_by_count(&self, daemon_id: &DaemonId, max_count: u64) -> Result<u64> {
85 let mut conn = self.conn.lock().unwrap();
86 let tx = conn.transaction().into_diagnostic()?;
87 let count: i64 = tx
88 .query_row(
89 "SELECT COUNT(*) FROM log_entries WHERE daemon_id = ?1",
90 [daemon_id.qualified()],
91 |row| row.get(0),
92 )
93 .into_diagnostic()?;
94 let to_delete = count.saturating_sub(max_count as i64);
95 let total_deleted = if to_delete > 0 {
96 let rows = tx
97 .execute(
98 "DELETE FROM log_entries WHERE id IN (
99 SELECT id FROM log_entries WHERE daemon_id = ?1 ORDER BY timestamp ASC, id ASC LIMIT ?2
100 )",
101 params![daemon_id.qualified(), to_delete],
102 )
103 .into_diagnostic()?;
104 rows as u64
105 } else {
106 0
107 };
108 tx.commit().into_diagnostic()?;
109 Ok(total_deleted)
110 }
111
112 pub fn migrate_daemon_text_logs(&self, daemon_id: &DaemonId) -> Result<u64> {
117 let text_path = daemon_id.log_path();
118 if !text_path.exists() {
119 return Ok(0);
120 }
121
122 let file = std::fs::File::open(&text_path).into_diagnostic()?;
123 let reader = BufReader::new(file);
124 let re = regex::Regex::new(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) ([\w./-]+) (.*)$")
125 .expect("invalid regex");
126
127 let mut current_timestamp: Option<DateTime<Local>> = None;
128 let mut current_message = String::new();
129 let mut entries = Vec::with_capacity(1000);
130 let mut total_migrated: u64 = 0;
131
132 for line in reader.lines() {
133 let line = line.into_diagnostic()?;
134 if let Some(caps) = re.captures(&line) {
135 if let Some(ts) = current_timestamp.take() {
136 entries.push((ts, std::mem::take(&mut current_message)));
137 }
138 let ts_str = caps.get(1).map(|m| m.as_str()).unwrap_or_default();
139 let msg = caps.get(3).map(|m| m.as_str()).unwrap_or_default();
140 if let Ok(naive) =
141 chrono::NaiveDateTime::parse_from_str(ts_str, "%Y-%m-%d %H:%M:%S")
142 {
143 current_timestamp = Local.from_local_datetime(&naive).single();
144 current_message = msg.to_string();
145 }
146 } else if current_timestamp.is_some() {
147 current_message.push('\n');
148 current_message.push_str(&line);
149 }
150
151 if entries.len() >= 1000 {
152 total_migrated += self.insert_batch(daemon_id, &entries)?;
153 entries.clear();
154 }
155 }
156
157 if let Some(ts) = current_timestamp {
158 entries.push((ts, std::mem::take(&mut current_message)));
159 }
160
161 if !entries.is_empty() {
162 total_migrated += self.insert_batch(daemon_id, &entries)?;
163 }
164
165 if total_migrated > 0 {
166 if let Err(e) = std::fs::remove_file(&text_path) {
167 log::warn!(
168 "failed to remove legacy log file after migration {}: {e}",
169 text_path.display()
170 );
171 }
172 }
173
174 Ok(total_migrated)
175 }
176
177 fn insert_batch(
178 &self,
179 daemon_id: &DaemonId,
180 entries: &[(DateTime<Local>, String)],
181 ) -> Result<u64> {
182 let mut conn = self.conn.lock().unwrap();
183 let tx = conn.transaction().into_diagnostic()?;
184 let mut count = 0u64;
185 {
186 let mut stmt = tx
187 .prepare(
188 "INSERT INTO log_entries (daemon_id, timestamp, message) VALUES (?1, ?2, ?3)",
189 )
190 .into_diagnostic()?;
191 for (ts, msg) in entries {
192 stmt.execute(params![daemon_id.qualified(), ts.timestamp_millis(), msg])
193 .into_diagnostic()?;
194 count += 1;
195 }
196 }
197 tx.commit().into_diagnostic()?;
198 Ok(count)
199 }
200}
201
202impl LogStore for SqliteLogStore {
203 fn append(&self, daemon_id: &DaemonId, message: &str) -> Result<()> {
204 let ts = Local::now().timestamp_millis();
205 let id = daemon_id.qualified();
206 let msg = message.to_string();
207
208 let conn = self.conn.lock().unwrap();
209 let _ = conn
210 .execute(
211 "INSERT INTO log_entries (daemon_id, timestamp, message) VALUES (?1, ?2, ?3)",
212 params![id, ts, msg],
213 )
214 .into_diagnostic()?;
215 Ok(())
216 }
217
218 fn query(&self, opts: &LogQuery) -> Result<Vec<LogEntry>> {
219 let conn = self.conn.lock().unwrap();
220 let mut conditions = Vec::new();
221 let mut query_params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
222
223 if !opts.daemon_ids.is_empty() {
224 let placeholders: Vec<String> = (1..=opts.daemon_ids.len())
225 .map(|i| format!("?{}", i))
226 .collect();
227 conditions.push(format!("daemon_id IN ({})", placeholders.join(", ")));
228 for id in &opts.daemon_ids {
229 query_params.push(Box::new(id.clone()));
230 }
231 }
232
233 if let Some(from) = opts.from {
234 conditions.push(format!("timestamp >= ?{}", query_params.len() + 1));
235 query_params.push(Box::new(from.timestamp_millis()));
236 }
237
238 if let Some(to) = opts.to {
239 conditions.push(format!("timestamp <= ?{}", query_params.len() + 1));
240 query_params.push(Box::new(to.timestamp_millis()));
241 }
242
243 if let Some(after_id) = opts.after_id {
244 conditions.push(format!("id > ?{}", query_params.len() + 1));
245 query_params.push(Box::new(after_id));
246 }
247
248 let where_clause = if conditions.is_empty() {
249 String::new()
250 } else {
251 format!("WHERE {}", conditions.join(" AND "))
252 };
253
254 let order = if opts.order_desc { "DESC" } else { "ASC" };
255
256 let limit_clause = opts
257 .limit
258 .map(|n| format!("LIMIT {}", n))
259 .unwrap_or_default();
260
261 let sql = format!(
262 "SELECT id, daemon_id, timestamp, message FROM log_entries {} ORDER BY timestamp {}, id {} {}",
263 where_clause, order, order, limit_clause
264 );
265
266 let mut stmt = conn.prepare(&sql).into_diagnostic()?;
267 let params_ref: Vec<&dyn rusqlite::ToSql> =
268 query_params.iter().map(|p| p.as_ref()).collect();
269 let rows = stmt
270 .query_map(params_ref.as_slice(), |row| {
271 let id: i64 = row.get(0)?;
272 let daemon_id: String = row.get(1)?;
273 let ts_millis: i64 = row.get(2)?;
274 let message: String = row.get(3)?;
275 let timestamp = Local
276 .timestamp_millis_opt(ts_millis)
277 .single()
278 .unwrap_or_else(Local::now);
279 Ok(LogEntry {
280 id,
281 daemon_id,
282 timestamp,
283 message,
284 })
285 })
286 .into_diagnostic()?;
287
288 let mut entries = Vec::new();
289 for row in rows {
290 entries.push(row.into_diagnostic()?);
291 }
292 Ok(entries)
293 }
294
295 fn tail(&self, daemon_id: &DaemonId, after_id: Option<i64>) -> Result<Vec<LogEntry>> {
296 self.query(&LogQuery {
297 daemon_ids: vec![daemon_id.qualified()],
298 from: None,
299 to: None,
300 limit: None,
301 order_desc: false,
302 after_id,
303 })
304 }
305
306 fn clear(&self, daemon_ids: &[DaemonId]) -> Result<()> {
307 let mut conn = self.conn.lock().unwrap();
308 let tx = conn.transaction().into_diagnostic()?;
309 for id in daemon_ids {
310 tx.execute(
311 "DELETE FROM log_entries WHERE daemon_id = ?1",
312 params![id.qualified()],
313 )
314 .into_diagnostic()?;
315 tx.execute(
316 "INSERT INTO log_clear_generations (daemon_id, generation)
317 VALUES (?1, 1)
318 ON CONFLICT(daemon_id) DO UPDATE SET generation = generation + 1",
319 params![id.qualified()],
320 )
321 .into_diagnostic()?;
322 }
323 tx.commit().into_diagnostic()?;
324 Ok(())
325 }
326
327 fn list_daemon_ids(&self) -> Result<Vec<String>> {
328 let conn = self.conn.lock().unwrap();
329 let mut stmt = conn
330 .prepare("SELECT DISTINCT daemon_id FROM log_entries")
331 .into_diagnostic()?;
332 let ids = stmt
333 .query_map([], |row| {
334 let id: String = row.get(0)?;
335 Ok(id)
336 })
337 .into_diagnostic()?
338 .filter_map(|r| r.ok())
339 .collect();
340 Ok(ids)
341 }
342
343 fn apply_retention(
344 &self,
345 policy: &super::RetentionPolicy,
346 excluded_daemon_ids: &[DaemonId],
347 ) -> Result<u64> {
348 let daemon_ids = self.list_daemon_ids()?;
349 let excluded: HashSet<String> = excluded_daemon_ids.iter().map(|d| d.qualified()).collect();
350 let mut total = 0u64;
351 for id_str in daemon_ids {
352 if excluded.contains(&id_str) {
353 continue;
354 }
355 let id = DaemonId::parse(&id_str).unwrap_or_else(|_| {
356 DaemonId::try_new("global", &id_str).unwrap_or_else(|_| DaemonId::pitchfork())
357 });
358 if let Some(dur) = policy.age {
359 total += self.rotate_by_age(&id, dur)?;
360 }
361 if let Some(n) = policy.count {
362 total += self.rotate_by_count(&id, n)?;
363 }
364 }
365 Ok(total)
366 }
367
368 fn apply_retention_for_daemon(
369 &self,
370 daemon_id: &DaemonId,
371 policy: &super::RetentionPolicy,
372 ) -> Result<u64> {
373 let mut total = 0u64;
374 if let Some(dur) = policy.age {
375 total += self.rotate_by_age(daemon_id, dur)?;
376 }
377 if let Some(n) = policy.count {
378 total += self.rotate_by_count(daemon_id, n)?;
379 }
380 Ok(total)
381 }
382
383 fn last_clear_generation(&self, daemon_id: &DaemonId) -> Result<Option<u64>> {
384 let conn = self.conn.lock().unwrap();
385 let mut stmt = conn
386 .prepare("SELECT generation FROM log_clear_generations WHERE daemon_id = ?1")
387 .into_diagnostic()?;
388 let generation: Option<u64> = stmt
389 .query_row(params![daemon_id.qualified()], |row| row.get(0))
390 .optional()
391 .into_diagnostic()?;
392 Ok(generation)
393 }
394}
395
396use once_cell::sync::Lazy;
398use std::sync::Arc;
399
400pub static LOG_STORE: Lazy<Arc<SqliteLogStore>> = Lazy::new(|| {
401 let path = crate::env::PITCHFORK_LOGS_DIR.join("logs.db");
402 let mut is_fallback = false;
403 let store = Arc::new(SqliteLogStore::open(&path).unwrap_or_else(|e| {
404 error!(
405 "failed to open log store at {}: {e}. Falling back to in-memory store; logs will not persist across restarts.",
406 path.display()
407 );
408 is_fallback = true;
409 SqliteLogStore::open(":memory:").expect("in-memory SQLite should always open")
410 }));
411
412 if !is_fallback {
416 if let Err(e) = auto_migrate_legacy_logs(&store) {
417 warn!("legacy log auto-migration failed: {e}");
418 }
419 } else {
420 warn!(
421 "skipping legacy log auto-migration because log store is in-memory (no durable destination)"
422 );
423 }
424
425 store
426});
427
428fn auto_migrate_legacy_logs(store: &SqliteLogStore) -> Result<()> {
436 let logs_dir = &*crate::env::PITCHFORK_LOGS_DIR;
437 if !logs_dir.exists() {
438 return Ok(());
439 }
440
441 let Ok(entries) = std::fs::read_dir(logs_dir) else {
442 return Ok(());
443 };
444
445 let mut total_migrated = 0u64;
446 let mut migrated_ids = Vec::new();
447
448 for entry in entries.flatten() {
449 let path = entry.path();
450 if !path.is_dir() {
451 continue;
452 }
453 let file_name = path
455 .file_name()
456 .map_or(String::new(), |n| n.to_string_lossy().to_string());
457 if file_name == "pitchfork" {
458 continue;
459 }
460
461 if !file_name.contains("--") {
463 continue;
464 }
465 let log_file = path.join(format!("{file_name}.log"));
466 if !log_file.exists() {
467 continue;
468 }
469
470 let daemon_id = match DaemonId::from_safe_path(&file_name) {
471 Ok(id) => id,
472 Err(_) => continue,
473 };
474
475 if daemon_id == DaemonId::pitchfork() {
478 continue;
479 }
480
481 match store.migrate_daemon_text_logs(&daemon_id) {
482 Ok(0) => {}
483 Ok(n) => {
484 total_migrated += n;
485 migrated_ids.push(daemon_id.qualified());
486 }
487 Err(e) => {
488 warn!(
489 "failed to migrate text logs for {}: {e}",
490 daemon_id.qualified()
491 );
492 }
493 }
494 }
495
496 if total_migrated > 0 {
497 warn!(
498 "auto-migrated {total_migrated} legacy log entries from {count} daemon(s): {ids}",
499 count = migrated_ids.len(),
500 ids = migrated_ids.join(", ")
501 );
502 }
503
504 Ok(())
505}