use anyhow::{Context, Result};
use chrono::Utc;
use rusqlite::{params, Connection};
use serde::{Deserialize, Serialize};
/// One row of the `events` table as read back by the query functions in this file.
///
/// Timestamps are kept as plain strings; nothing in this file parses them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Event {
/// Row identifier; `insert` fills it from `super::uuid_like()`.
pub id: String,
/// Owner of the event; `get` and `list_recent` filter on this column.
pub user_id: String,
/// Project the event belongs to; `list_recent` can optionally filter on it.
pub project: String,
/// Caller-supplied event type, stored as given.
pub event_type: String,
/// Optional path attached to the event; `None` when the column is NULL.
pub path: Option<String>,
/// JSON payload. Persisted as its serialized text and parsed back on read;
/// `row_to_event` substitutes `{}` when the stored text is not valid JSON.
pub payload: serde_json::Value,
/// Creation timestamp; `insert` writes `Utc::now()` formatted as RFC 3339.
pub created_at: String,
/// Last-update timestamp; `insert` sets it to the same value as `created_at`.
pub updated_at: String,
/// Value written by `mark_synced`; `None` while the column is NULL
/// (those are the rows `list_unsynced` selects).
pub synced_at: Option<String>,
}
/// Caller-provided fields for a new event, consumed by `insert`.
///
/// The id and timestamps are not part of the input; `insert` generates them.
#[derive(Debug, Clone, Deserialize)]
pub struct EventInput {
/// Owner of the new event.
pub user_id: String,
/// Project the new event belongs to.
pub project: String,
/// Event type, stored as given.
pub event_type: String,
/// Optional path; deserializes to `None` when the field is absent.
#[serde(default)]
pub path: Option<String>,
/// JSON payload; deserializes to `default_payload()` (an empty object) when absent.
#[serde(default = "default_payload")]
pub payload: serde_json::Value,
}
/// Serde default for `EventInput::payload`: an empty JSON object.
fn default_payload() -> serde_json::Value {
    serde_json::Value::Object(serde_json::Map::new())
}
/// Persists a new event and returns the row as stored.
///
/// The id comes from `super::uuid_like()` and a single RFC 3339 UTC timestamp
/// is written to both `created_at` and `updated_at`. The payload is stored as
/// its JSON text. After the insert the row is read back through [`get`].
///
/// # Errors
///
/// Fails with context `"insert event"` when the INSERT statement fails, and
/// with `"event missing after insert"` when the follow-up lookup yields no row.
pub fn insert(conn: &Connection, input: EventInput) -> Result<Event> {
    const INSERT_SQL: &str = r#"INSERT INTO events
(id, user_id, project, event_type, path, payload, created_at, updated_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?7)"#;

    let event_id = super::uuid_like();
    let timestamp = Utc::now().to_rfc3339();
    let payload_json = input.payload.to_string();

    conn.execute(
        INSERT_SQL,
        params![
            event_id,
            input.user_id,
            input.project,
            input.event_type,
            input.path,
            payload_json,
            timestamp,
        ],
    )
    .context("insert event")?;

    // Re-read so the caller gets exactly what the database now holds.
    let stored = get(conn, &input.user_id, &event_id)?;
    stored.context("event missing after insert")
}
pub fn get(conn: &Connection, user_id: &str, id: &str) -> Result<Option<Event>> {
let mut stmt = conn.prepare(
r#"SELECT id, user_id, project, event_type, path, payload,
created_at, updated_at, synced_at
FROM events
WHERE user_id = ?1 AND id = ?2"#,
)?;
Ok(stmt.query_row(params![user_id, id], row_to_event).ok())
}
pub fn list_unsynced(conn: &Connection, limit: usize) -> Result<Vec<Event>> {
let mut stmt = conn.prepare(
r#"SELECT id, user_id, project, event_type, path, payload,
created_at, updated_at, synced_at
FROM events
WHERE synced_at IS NULL
ORDER BY created_at ASC
LIMIT ?1"#,
)?;
let rows: Vec<Event> = stmt
.query_map(params![limit as i64], row_to_event)?
.filter_map(|r| r.ok())
.collect();
Ok(rows)
}
pub fn list_recent(
conn: &Connection,
user_id: &str,
project: Option<&str>,
limit: usize,
) -> Result<Vec<Event>> {
let rows: Vec<Event> = if let Some(p) = project {
let mut stmt = conn.prepare(
r#"SELECT id, user_id, project, event_type, path, payload,
created_at, updated_at, synced_at
FROM events
WHERE user_id = ?1 AND project = ?2
ORDER BY created_at DESC LIMIT ?3"#,
)?;
let rs: Vec<Event> = stmt
.query_map(params![user_id, p, limit as i64], row_to_event)?
.filter_map(|r| r.ok())
.collect();
rs
} else {
let mut stmt = conn.prepare(
r#"SELECT id, user_id, project, event_type, path, payload,
created_at, updated_at, synced_at
FROM events
WHERE user_id = ?1
ORDER BY created_at DESC LIMIT ?2"#,
)?;
let rs: Vec<Event> = stmt
.query_map(params![user_id, limit as i64], row_to_event)?
.filter_map(|r| r.ok())
.collect();
rs
};
Ok(rows)
}
/// Sets `synced_at = when` on every event whose id appears in `ids`.
///
/// An empty `ids` slice is a no-op. Otherwise a single UPDATE is issued with
/// one positional placeholder per id.
///
/// # Errors
///
/// Fails when the statement cannot be prepared or executed.
pub fn mark_synced(conn: &Connection, ids: &[&str], when: &str) -> Result<()> {
    if ids.is_empty() {
        return Ok(());
    }

    let placeholders = vec!["?"; ids.len()].join(",");
    let sql = format!(
        "UPDATE events SET synced_at = ? WHERE id IN ({})",
        placeholders
    );
    let mut stmt = conn.prepare(&sql)?;

    // Bind order mirrors the SQL: the timestamp first, then each id.
    let bound = std::iter::once(when).chain(ids.iter().copied());
    stmt.execute(rusqlite::params_from_iter(bound))?;
    Ok(())
}
/// Decodes one result row into an [`Event`].
///
/// Columns are read by position and must follow the SELECT order used in this
/// file: id, user_id, project, event_type, path, payload, created_at,
/// updated_at, synced_at. Payload text that is not valid JSON is replaced by
/// an empty object rather than failing the row.
fn row_to_event(row: &rusqlite::Row<'_>) -> rusqlite::Result<Event> {
    // Payload is read first, before any other column.
    let raw_payload: String = row.get(5)?;
    let payload: serde_json::Value = match serde_json::from_str(&raw_payload) {
        Ok(value) => value,
        Err(_) => serde_json::json!({}),
    };

    let id = row.get(0)?;
    let user_id = row.get(1)?;
    let project = row.get(2)?;
    let event_type = row.get(3)?;
    let path = row.get(4)?;
    let created_at = row.get(6)?;
    let updated_at = row.get(7)?;
    let synced_at = row.get(8)?;

    Ok(Event {
        id,
        user_id,
        project,
        event_type,
        path,
        payload,
        created_at,
        updated_at,
        synced_at,
    })
}