#![allow(dead_code)]
use anyhow::{Context, Result};
use rusqlite::{params, Connection};
use std::path::Path;
#[derive(Debug, Clone)]
pub struct Event {
pub id: Option<i64>,
pub project: String,
pub event_type: String, pub path: Option<String>,
pub payload: String, pub severity: String, pub created_at: String,
pub synced_at: Option<String>,
pub acked_at: Option<String>,
}
pub struct Db {
conn: Connection,
}
impl Db {
pub fn open(path: &Path) -> Result<Self> {
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent).ok();
}
let conn =
Connection::open(path).with_context(|| format!("open sqlite {}", path.display()))?;
let db = Db { conn };
db.migrate()?;
Ok(db)
}
fn migrate(&self) -> Result<()> {
self.conn.execute_batch(
r#"
CREATE TABLE IF NOT EXISTS worker_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
project TEXT NOT NULL,
event_type TEXT NOT NULL,
path TEXT,
payload TEXT NOT NULL DEFAULT '{}',
severity TEXT NOT NULL DEFAULT 'info',
created_at TEXT NOT NULL,
synced_at TEXT,
acked_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_worker_events_created
ON worker_events(created_at);
CREATE INDEX IF NOT EXISTS idx_worker_events_unsynced
ON worker_events(synced_at) WHERE synced_at IS NULL;
CREATE INDEX IF NOT EXISTS idx_worker_events_project
ON worker_events(project);
"#,
)?;
Ok(())
}
pub fn insert(&self, ev: &Event) -> Result<i64> {
self.conn.execute(
r#"INSERT INTO worker_events
(project, event_type, path, payload, severity, created_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6)"#,
params![
ev.project,
ev.event_type,
ev.path,
ev.payload,
ev.severity,
ev.created_at
],
)?;
Ok(self.conn.last_insert_rowid())
}
pub fn recent(&self, limit: usize) -> Result<Vec<Event>> {
let mut stmt = self.conn.prepare(
r#"SELECT id, project, event_type, path, payload, severity,
created_at, synced_at, acked_at
FROM worker_events
ORDER BY id DESC
LIMIT ?1"#,
)?;
let rows = stmt.query_map(params![limit as i64], row_to_event)?;
let mut out = Vec::new();
for r in rows {
out.push(r?);
}
out.reverse();
Ok(out)
}
pub fn since(&self, last_id: i64, limit: usize) -> Result<Vec<Event>> {
let mut stmt = self.conn.prepare(
r#"SELECT id, project, event_type, path, payload, severity,
created_at, synced_at, acked_at
FROM worker_events
WHERE id > ?1
ORDER BY id ASC
LIMIT ?2"#,
)?;
let rows = stmt.query_map(params![last_id, limit as i64], row_to_event)?;
let mut out = Vec::new();
for r in rows {
out.push(r?);
}
Ok(out)
}
pub fn unsynced(&self, limit: usize) -> Result<Vec<Event>> {
let mut stmt = self.conn.prepare(
r#"SELECT id, project, event_type, path, payload, severity,
created_at, synced_at, acked_at
FROM worker_events
WHERE synced_at IS NULL
ORDER BY id ASC
LIMIT ?1"#,
)?;
let rows = stmt.query_map(params![limit as i64], row_to_event)?;
let mut out = Vec::new();
for r in rows {
out.push(r?);
}
Ok(out)
}
pub fn mark_synced(&self, ids: &[i64], when: &str) -> Result<()> {
if ids.is_empty() {
return Ok(());
}
let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
let sql = format!(
"UPDATE worker_events SET synced_at = ? WHERE id IN ({})",
placeholders
);
let mut stmt = self.conn.prepare(&sql)?;
let mut binds: Vec<rusqlite::types::Value> = Vec::with_capacity(ids.len() + 1);
binds.push(when.to_string().into());
for id in ids {
binds.push((*id).into());
}
stmt.execute(rusqlite::params_from_iter(binds.iter()))?;
Ok(())
}
pub fn distinct_projects(&self) -> Result<Vec<String>> {
let mut stmt = self
.conn
.prepare("SELECT DISTINCT project FROM worker_events ORDER BY project")?;
let rows = stmt.query_map([], |r| r.get::<_, String>(0))?;
let mut out = Vec::new();
for r in rows {
out.push(r?);
}
Ok(out)
}
pub fn project_has_changes_since(&self, project: &str, since_rfc3339: &str) -> Result<bool> {
let count: i64 = self.conn.query_row(
"SELECT COUNT(*) FROM worker_events
WHERE project = ?1
AND event_type IN ('file_changed', 'file_changed_continuous')
AND created_at > ?2",
params![project, since_rfc3339],
|r| r.get(0),
)?;
Ok(count > 0)
}
pub fn counts(&self) -> Result<DbCounts> {
let total: i64 = self
.conn
.query_row("SELECT COUNT(*) FROM worker_events", [], |r| r.get(0))?;
let unsynced: i64 = self.conn.query_row(
"SELECT COUNT(*) FROM worker_events WHERE synced_at IS NULL",
[],
|r| r.get(0),
)?;
Ok(DbCounts { total, unsynced })
}
}
#[derive(Debug)]
pub struct DbCounts {
pub total: i64,
pub unsynced: i64,
}
fn row_to_event(row: &rusqlite::Row<'_>) -> rusqlite::Result<Event> {
Ok(Event {
id: row.get(0)?,
project: row.get(1)?,
event_type: row.get(2)?,
path: row.get(3)?,
payload: row.get(4)?,
severity: row.get(5)?,
created_at: row.get(6)?,
synced_at: row.get(7)?,
acked_at: row.get(8)?,
})
}