use crate::Result;
use crate::daemon_id::DaemonId;
use crate::log_store::{LogEntry, LogQuery, LogStore};
use chrono::{DateTime, Local, TimeZone};
use log::error;
use miette::IntoDiagnostic;
use rusqlite::{Connection, OptionalExtension, params};
use std::collections::HashSet;
use std::io::{BufRead, BufReader};
use std::path::PathBuf;
use std::sync::Mutex;
/// Log store backed by a SQLite database, holding one `rusqlite` connection
/// behind a mutex.
pub struct SqliteLogStore {
// Locked by each method that reads from or writes to the database.
conn: Mutex<Connection>,
}
impl SqliteLogStore {
/// Opens the SQLite log database at `path`, creating it if necessary.
///
/// The parent directory of `path` (when it has one) is created first. The
/// connection is switched to WAL journaling with `synchronous = NORMAL`, and
/// the `log_entries` / `log_clear_generations` tables and their indexes are
/// created if they do not exist yet.
///
/// # Errors
///
/// Fails if the directory cannot be created, the database cannot be opened,
/// or any pragma / schema statement is rejected.
pub fn open(path: impl Into<PathBuf>) -> Result<Self> {
    // Schema statements, run in this order; all of them use `IF NOT EXISTS`.
    const SCHEMA: [&str; 5] = [
        concat!(
            "CREATE TABLE IF NOT EXISTS log_entries (\n",
            "id INTEGER PRIMARY KEY AUTOINCREMENT,\n",
            "daemon_id TEXT NOT NULL,\n",
            "timestamp INTEGER NOT NULL,\n",
            "message TEXT NOT NULL\n",
            ");"
        ),
        "CREATE INDEX IF NOT EXISTS idx_daemon_ts ON log_entries(daemon_id, timestamp);",
        "CREATE INDEX IF NOT EXISTS idx_daemon_id ON log_entries(daemon_id, id);",
        "CREATE INDEX IF NOT EXISTS idx_timestamp ON log_entries(timestamp);",
        concat!(
            "CREATE TABLE IF NOT EXISTS log_clear_generations (\n",
            "daemon_id TEXT PRIMARY KEY,\n",
            "generation INTEGER NOT NULL DEFAULT 0\n",
            ");"
        ),
    ];

    let db_path: PathBuf = path.into();
    if let Some(dir) = db_path.parent() {
        std::fs::create_dir_all(dir).into_diagnostic()?;
    }

    let conn = Connection::open(&db_path).into_diagnostic()?;
    conn.execute_batch(concat!(
        "PRAGMA journal_mode = WAL;\n",
        "PRAGMA synchronous = NORMAL;"
    ))
    .into_diagnostic()?;

    for &statement in &SCHEMA {
        conn.execute(statement, []).into_diagnostic()?;
    }

    Ok(Self {
        conn: Mutex::new(conn),
    })
}
/// Deletes the entries of `daemon_id` whose timestamp is older than
/// `max_age` relative to the current local time.
///
/// Returns the number of rows deleted.
///
/// If `now - max_age` is not representable as a `DateTime` (an extreme
/// `max_age`), no cutoff can be computed and nothing is deleted; the
/// previous `now - max_age` expression panicked in that situation.
///
/// # Errors
///
/// Fails if the `DELETE` statement is rejected by SQLite.
///
/// # Panics
///
/// Panics if the connection mutex is poisoned.
pub fn rotate_by_age(&self, daemon_id: &DaemonId, max_age: chrono::Duration) -> Result<u64> {
    // Checked arithmetic: the `-` operator on `DateTime` panics on overflow.
    let Some(cutoff) = Local::now().checked_sub_signed(max_age) else {
        return Ok(0);
    };
    let cutoff = cutoff.timestamp_millis();
    let conn = self.conn.lock().unwrap();
    let rows = conn
        .execute(
            "DELETE FROM log_entries WHERE daemon_id = ?1 AND timestamp < ?2",
            params![daemon_id.qualified(), cutoff],
        )
        .into_diagnostic()?;
    Ok(rows as u64)
}
/// Trims the entries of `daemon_id` down to at most `max_count`, deleting
/// the oldest ones first (ordered by timestamp, then by row id).
///
/// The count and the delete run inside one transaction. Returns the number
/// of rows deleted.
///
/// # Errors
///
/// Fails if the transaction cannot be started or committed, or if either
/// statement is rejected by SQLite.
///
/// # Panics
///
/// Panics if the connection mutex is poisoned.
pub fn rotate_by_count(&self, daemon_id: &DaemonId, max_count: u64) -> Result<u64> {
    let qualified = daemon_id.qualified();
    // Clamp instead of `as i64`: a plain cast wraps values above `i64::MAX`
    // to a negative number, which made `u64::MAX` ("keep everything")
    // compute `count + 1` rows to delete and wipe the daemon's logs.
    let keep = i64::try_from(max_count).unwrap_or(i64::MAX);

    let mut conn = self.conn.lock().unwrap();
    let tx = conn.transaction().into_diagnostic()?;
    let count: i64 = tx
        .query_row(
            "SELECT COUNT(*) FROM log_entries WHERE daemon_id = ?1",
            params![qualified],
            |row| row.get(0),
        )
        .into_diagnostic()?;

    let to_delete = count.saturating_sub(keep);
    let total_deleted = if to_delete > 0 {
        let rows = tx
            .execute(
                concat!(
                    "DELETE FROM log_entries WHERE id IN (\n",
                    "SELECT id FROM log_entries WHERE daemon_id = ?1 ORDER BY timestamp ASC, id ASC LIMIT ?2\n",
                    ")"
                ),
                params![qualified, to_delete],
            )
            .into_diagnostic()?;
        rows as u64
    } else {
        0
    };
    tx.commit().into_diagnostic()?;
    Ok(total_deleted)
}
/// Imports the legacy plain-text log file of `daemon_id` into the database.
///
/// A line of the form `YYYY-MM-DD HH:MM:SS <name> <message>` starts a new
/// entry; any following line that does not match that shape is appended to
/// the current entry's message. Entries are written in batches. When at
/// least one entry was imported, the text file is removed afterwards (a
/// failed removal is only logged).
///
/// Returns the number of entries imported, or `0` when the file does not
/// exist.
///
/// Differences from a naive line reader, both to avoid losing or
/// duplicating data:
/// * Lines are read as bytes and decoded lossily. `BufRead::lines` returns
///   an error on invalid UTF-8, which aborted the migration after earlier
///   batches were already committed and left the file in place, so every
///   later run re-inserted the same entries.
/// * An ambiguous local timestamp (e.g. the repeated hour when clocks go
///   back) resolves to its earliest interpretation instead of being dropped.
///
/// # Errors
///
/// Fails if the file cannot be opened or read, or if a batch insert fails.
pub fn migrate_daemon_text_logs(&self, daemon_id: &DaemonId) -> Result<u64> {
    // Number of parsed entries buffered before they are flushed to SQLite.
    const BATCH_SIZE: usize = 1000;

    let text_path = daemon_id.log_path();
    if !text_path.exists() {
        return Ok(0);
    }
    let file = std::fs::File::open(&text_path).into_diagnostic()?;
    let mut reader = BufReader::new(file);
    let re = regex::Regex::new(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) ([\w./-]+) (.*)$")
        .expect("invalid regex");

    let mut current_timestamp: Option<DateTime<Local>> = None;
    let mut current_message = String::new();
    let mut entries = Vec::with_capacity(BATCH_SIZE);
    let mut total_migrated: u64 = 0;
    let mut raw = Vec::new();

    loop {
        raw.clear();
        if reader.read_until(b'\n', &mut raw).into_diagnostic()? == 0 {
            break;
        }
        // Strip the trailing "\n" or "\r\n", as `BufRead::lines` would.
        if raw.last() == Some(&b'\n') {
            raw.pop();
            if raw.last() == Some(&b'\r') {
                raw.pop();
            }
        }
        let line = String::from_utf8_lossy(&raw);

        if let Some(caps) = re.captures(&line) {
            // A new header line closes the entry collected so far.
            if let Some(ts) = current_timestamp.take() {
                entries.push((ts, std::mem::take(&mut current_message)));
            }
            let ts_str = caps.get(1).map_or("", |m| m.as_str());
            let msg = caps.get(3).map_or("", |m| m.as_str());
            if let Ok(naive) =
                chrono::NaiveDateTime::parse_from_str(ts_str, "%Y-%m-%d %H:%M:%S")
            {
                current_timestamp = Local.from_local_datetime(&naive).earliest();
                current_message = msg.to_string();
            }
        } else if current_timestamp.is_some() {
            // Continuation line of a multi-line message.
            current_message.push('\n');
            current_message.push_str(&line);
        }

        if entries.len() >= BATCH_SIZE {
            total_migrated += self.insert_batch(daemon_id, &entries)?;
            entries.clear();
        }
    }

    // Flush the entry still being collected when the file ended.
    if let Some(ts) = current_timestamp {
        entries.push((ts, std::mem::take(&mut current_message)));
    }
    if !entries.is_empty() {
        total_migrated += self.insert_batch(daemon_id, &entries)?;
    }

    if total_migrated > 0 {
        if let Err(e) = std::fs::remove_file(&text_path) {
            log::warn!(
                "failed to remove legacy log file after migration {}: {e}",
                text_path.display()
            );
        }
    }
    Ok(total_migrated)
}
/// Inserts `entries` (timestamp, message) for `daemon_id` inside a single
/// transaction and returns how many rows were written.
///
/// # Errors
///
/// Fails if the transaction cannot be started or committed, or if preparing
/// or executing the insert statement fails; the first failure ends the call.
fn insert_batch(
    &self,
    daemon_id: &DaemonId,
    entries: &[(DateTime<Local>, String)],
) -> Result<u64> {
    let qualified = daemon_id.qualified();
    let mut guard = self.conn.lock().unwrap();
    let tx = guard.transaction().into_diagnostic()?;

    // The prepared statement borrows `tx`, so it is scoped to drop before commit.
    let inserted = {
        let mut insert = tx
            .prepare(
                "INSERT INTO log_entries (daemon_id, timestamp, message) VALUES (?1, ?2, ?3)",
            )
            .into_diagnostic()?;
        let mut written = 0u64;
        for entry in entries {
            insert
                .execute(params![qualified, entry.0.timestamp_millis(), entry.1])
                .into_diagnostic()?;
            written += 1;
        }
        written
    };

    tx.commit().into_diagnostic()?;
    Ok(inserted)
}
}
impl LogStore for SqliteLogStore {
/// Stores `message` as a new log entry for `daemon_id`, stamped with the
/// current local time in milliseconds.
///
/// # Errors
///
/// Fails if the insert statement is rejected by SQLite.
fn append(&self, daemon_id: &DaemonId, message: &str) -> Result<()> {
    // Timestamp is taken before waiting on the connection lock.
    let now_ms = Local::now().timestamp_millis();
    let qualified = daemon_id.qualified();
    let guard = self.conn.lock().unwrap();
    guard
        .execute(
            "INSERT INTO log_entries (daemon_id, timestamp, message) VALUES (?1, ?2, ?3)",
            params![qualified, now_ms, message],
        )
        .into_diagnostic()?;
    Ok(())
}
/// Returns the log entries selected by `opts`.
///
/// Each filter that is set (`daemon_ids`, `from`, `to`, `after_id`) adds one
/// `AND`-ed condition with bound parameters. Results are ordered by
/// timestamp and then id, both ascending or both descending according to
/// `order_desc`, and truncated to `limit` when one is given.
///
/// A stored timestamp that cannot be mapped to a single local time is
/// replaced by the current time in the returned entry.
///
/// # Errors
///
/// Fails if the statement cannot be prepared or executed, or if a row
/// cannot be decoded.
fn query(&self, opts: &LogQuery) -> Result<Vec<LogEntry>> {
    let conn = self.conn.lock().unwrap();

    // Placeholders are numbered by position: the N-th bound value is `?N`.
    let mut bound: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
    let mut clauses: Vec<String> = Vec::new();

    if !opts.daemon_ids.is_empty() {
        let mut slots: Vec<String> = Vec::with_capacity(opts.daemon_ids.len());
        for daemon in &opts.daemon_ids {
            bound.push(Box::new(daemon.clone()));
            slots.push(format!("?{}", bound.len()));
        }
        clauses.push(format!("daemon_id IN ({})", slots.join(", ")));
    }
    if let Some(from) = opts.from {
        bound.push(Box::new(from.timestamp_millis()));
        clauses.push(format!("timestamp >= ?{}", bound.len()));
    }
    if let Some(to) = opts.to {
        bound.push(Box::new(to.timestamp_millis()));
        clauses.push(format!("timestamp <= ?{}", bound.len()));
    }
    if let Some(after_id) = opts.after_id {
        bound.push(Box::new(after_id));
        clauses.push(format!("id > ?{}", bound.len()));
    }

    let filter = if clauses.is_empty() {
        String::new()
    } else {
        format!("WHERE {}", clauses.join(" AND "))
    };
    let direction = if opts.order_desc { "DESC" } else { "ASC" };
    let limit = match opts.limit {
        Some(n) => format!("LIMIT {}", n),
        None => String::new(),
    };
    let sql = format!(
        "SELECT id, daemon_id, timestamp, message FROM log_entries {} ORDER BY timestamp {}, id {} {}",
        filter, direction, direction, limit
    );

    let mut stmt = conn.prepare(&sql).into_diagnostic()?;
    let args: Vec<&dyn rusqlite::ToSql> = bound.iter().map(|value| value.as_ref()).collect();
    let rows = stmt
        .query_map(args.as_slice(), |row| {
            let ts_millis: i64 = row.get(2)?;
            Ok(LogEntry {
                id: row.get(0)?,
                daemon_id: row.get(1)?,
                timestamp: Local
                    .timestamp_millis_opt(ts_millis)
                    .single()
                    .unwrap_or_else(Local::now),
                message: row.get(3)?,
            })
        })
        .into_diagnostic()?;

    let mut found = Vec::new();
    for row in rows {
        found.push(row.into_diagnostic()?);
    }
    Ok(found)
}
/// Returns the entries of `daemon_id` in ascending order, restricted to ids
/// greater than `after_id` when one is given. No time range or limit is
/// applied.
fn tail(&self, daemon_id: &DaemonId, after_id: Option<i64>) -> Result<Vec<LogEntry>> {
    let follow = LogQuery {
        daemon_ids: vec![daemon_id.qualified()],
        after_id,
        order_desc: false,
        from: None,
        to: None,
        limit: None,
    };
    self.query(&follow)
}
/// Deletes all log entries of every daemon in `daemon_ids` and bumps each
/// daemon's clear generation, all inside one transaction.
///
/// A daemon without a generation row gets one with generation `1`;
/// otherwise the stored generation is incremented.
///
/// # Errors
///
/// Fails if the transaction cannot be started or committed, or if any
/// statement is rejected by SQLite.
fn clear(&self, daemon_ids: &[DaemonId]) -> Result<()> {
    const DELETE_ENTRIES: &str = "DELETE FROM log_entries WHERE daemon_id = ?1";
    const BUMP_GENERATION: &str = concat!(
        "INSERT INTO log_clear_generations (daemon_id, generation)\n",
        "VALUES (?1, 1)\n",
        "ON CONFLICT(daemon_id) DO UPDATE SET generation = generation + 1"
    );

    let mut guard = self.conn.lock().unwrap();
    let tx = guard.transaction().into_diagnostic()?;
    for daemon in daemon_ids {
        let qualified = daemon.qualified();
        tx.execute(DELETE_ENTRIES, params![qualified])
            .into_diagnostic()?;
        tx.execute(BUMP_GENERATION, params![qualified])
            .into_diagnostic()?;
    }
    tx.commit().into_diagnostic()?;
    Ok(())
}
/// Returns the distinct `daemon_id` values that currently have log entries.
///
/// # Errors
///
/// Fails if the statement cannot be prepared or executed, or if any row
/// cannot be read. Row errors are propagated rather than skipped, so a
/// caller never receives a silently shortened list.
///
/// # Panics
///
/// Panics if the connection mutex is poisoned.
fn list_daemon_ids(&self) -> Result<Vec<String>> {
    let conn = self.conn.lock().unwrap();
    let mut stmt = conn
        .prepare("SELECT DISTINCT daemon_id FROM log_entries")
        .into_diagnostic()?;
    let rows = stmt
        .query_map([], |row| row.get::<_, String>(0))
        .into_diagnostic()?;
    let mut ids = Vec::new();
    for row in rows {
        ids.push(row.into_diagnostic()?);
    }
    Ok(ids)
}
fn apply_retention(
&self,
policy: &super::RetentionPolicy,
excluded_daemon_ids: &[DaemonId],
) -> Result<u64> {
let daemon_ids = self.list_daemon_ids()?;
let excluded: HashSet<String> = excluded_daemon_ids.iter().map(|d| d.qualified()).collect();
let mut total = 0u64;
for id_str in daemon_ids {
if excluded.contains(&id_str) {
continue;
}
let id = DaemonId::parse(&id_str).unwrap_or_else(|_| {
DaemonId::try_new("global", &id_str).unwrap_or_else(|_| DaemonId::pitchfork())
});
if let Some(dur) = policy.age {
total += self.rotate_by_age(&id, dur)?;
}
if let Some(n) = policy.count {
total += self.rotate_by_count(&id, n)?;
}
}
Ok(total)
}
/// Applies `policy` to a single daemon: the age limit first (when set),
/// then the count limit (when set). Returns the combined number of rows
/// deleted.
///
/// # Errors
///
/// Fails if either rotation fails.
fn apply_retention_for_daemon(
    &self,
    daemon_id: &DaemonId,
    policy: &super::RetentionPolicy,
) -> Result<u64> {
    let removed_by_age = match policy.age {
        Some(max_age) => self.rotate_by_age(daemon_id, max_age)?,
        None => 0,
    };
    let removed_by_count = match policy.count {
        Some(max_count) => self.rotate_by_count(daemon_id, max_count)?,
        None => 0,
    };
    Ok(removed_by_age + removed_by_count)
}
/// Looks up the clear generation recorded for `daemon_id`.
///
/// Returns `None` when the daemon has no row in `log_clear_generations`.
///
/// # Errors
///
/// Fails if the lookup cannot be prepared or executed, or if the stored
/// value cannot be read as a `u64`.
fn last_clear_generation(&self, daemon_id: &DaemonId) -> Result<Option<u64>> {
    let qualified = daemon_id.qualified();
    let guard = self.conn.lock().unwrap();
    let found: Option<u64> = guard
        .query_row(
            "SELECT generation FROM log_clear_generations WHERE daemon_id = ?1",
            params![qualified],
            |row| row.get(0),
        )
        .optional()
        .into_diagnostic()?;
    Ok(found)
}
}
use once_cell::sync::Lazy;
use std::sync::Arc;
/// Process-wide log store, initialized on first access.
///
/// Opens `logs.db` inside `PITCHFORK_LOGS_DIR`. If that fails, the error is
/// logged and an in-memory database is used instead. Legacy text logs are
/// auto-migrated only when the on-disk database was opened; a failed
/// migration is logged and does not prevent the store from being returned.
///
/// # Panics
///
/// Panics during initialization if the in-memory fallback cannot be opened.
pub static LOG_STORE: Lazy<Arc<SqliteLogStore>> = Lazy::new(|| {
    let db_path = crate::env::PITCHFORK_LOGS_DIR.join("logs.db");

    // `durable` records whether the store is the on-disk database.
    let (backend, durable) = match SqliteLogStore::open(&db_path) {
        Ok(opened) => (opened, true),
        Err(e) => {
            error!(
                "failed to open log store at {}: {e}. Falling back to in-memory store; logs will not persist across restarts.",
                db_path.display()
            );
            let in_memory = SqliteLogStore::open(":memory:")
                .expect("in-memory SQLite should always open");
            (in_memory, false)
        }
    };

    let store = Arc::new(backend);
    if durable {
        if let Err(e) = auto_migrate_legacy_logs(&store) {
            warn!("legacy log auto-migration failed: {e}");
        }
    } else {
        warn!(
            "skipping legacy log auto-migration because log store is in-memory (no durable destination)"
        );
    }
    store
});
/// Scans `PITCHFORK_LOGS_DIR` for legacy per-daemon text logs and imports
/// them into `store`.
///
/// A directory is considered only if its name contains `--`, is not
/// `pitchfork`, holds a `<name>.log` file, and converts to a daemon id other
/// than the pitchfork one. A failure for one daemon is logged and does not
/// stop the scan. A summary is logged when anything was imported.
///
/// Always returns `Ok(())`; a missing or unreadable logs directory is
/// treated as "nothing to migrate".
fn auto_migrate_legacy_logs(store: &SqliteLogStore) -> Result<()> {
    let logs_dir = &*crate::env::PITCHFORK_LOGS_DIR;
    if !logs_dir.exists() {
        return Ok(());
    }
    let listing = match std::fs::read_dir(logs_dir) {
        Ok(listing) => listing,
        Err(_) => return Ok(()),
    };

    let mut entry_total = 0u64;
    let mut migrated: Vec<String> = Vec::new();

    // Directory entries that cannot be read are skipped by `flatten`.
    for dir_entry in listing.flatten() {
        let dir = dir_entry.path();
        if !dir.is_dir() {
            continue;
        }
        let dir_name = match dir.file_name() {
            Some(name) => name.to_string_lossy().to_string(),
            None => String::new(),
        };
        if dir_name == "pitchfork" || !dir_name.contains("--") {
            continue;
        }
        if !dir.join(format!("{file_name}.log", file_name = dir_name)).exists() {
            continue;
        }
        let Ok(daemon_id) = DaemonId::from_safe_path(&dir_name) else {
            continue;
        };
        if daemon_id == DaemonId::pitchfork() {
            continue;
        }

        match store.migrate_daemon_text_logs(&daemon_id) {
            Ok(0) => {}
            Ok(imported) => {
                entry_total += imported;
                migrated.push(daemon_id.qualified());
            }
            Err(e) => {
                warn!(
                    "failed to migrate text logs for {}: {e}",
                    daemon_id.qualified()
                );
            }
        }
    }

    if entry_total > 0 {
        warn!(
            "auto-migrated {total_migrated} legacy log entries from {count} daemon(s): {ids}",
            total_migrated = entry_total,
            count = migrated.len(),
            ids = migrated.join(", ")
        );
    }
    Ok(())
}