use anyhow::Result;
use chrono::Utc;
use rusqlite::{params, params_from_iter, Connection};
use serde_json::Value as JsonValue;
#[derive(Debug, Clone, serde::Serialize)]
pub struct Message {
pub id: i64,
pub user_id: i64,
pub kind: String,
pub task_id: Option<String>,
pub worker_name: Option<String>,
pub error: Option<String>,
pub payload: JsonValue,
pub created_at: String,
pub read_at: Option<String>,
}
fn iso_now() -> String {
Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true)
}
#[allow(clippy::too_many_arguments)]
pub fn enqueue(
conn: &Connection,
user_id: i64,
kind: &str,
task_id: Option<&str>,
worker_name: Option<&str>,
error: Option<&str>,
payload: &JsonValue,
) -> Result<Message> {
let ts = iso_now();
let payload_json = serde_json::to_string(payload)?;
conn.execute(
"INSERT INTO controller_messages
(user_id, kind, task_id, worker_name, error, payload_json, created_at)
VALUES (?,?,?,?,?,?,?)",
params![user_id, kind, task_id, worker_name, error, payload_json, ts],
)?;
let id = conn.last_insert_rowid();
Ok(Message {
id,
user_id,
kind: kind.into(),
task_id: task_id.map(String::from),
worker_name: worker_name.map(String::from),
error: error.map(String::from),
payload: payload.clone(),
created_at: ts,
read_at: None,
})
}
fn row_to_message(r: &rusqlite::Row<'_>) -> rusqlite::Result<Message> {
let payload_json: Option<String> = r.get(6)?;
Ok(Message {
id: r.get(0)?,
user_id: r.get(1)?,
kind: r.get(2)?,
task_id: r.get(3)?,
worker_name: r.get(4)?,
error: r.get(5)?,
payload: serde_json::from_str(payload_json.as_deref().unwrap_or("{}"))
.unwrap_or(JsonValue::Null),
created_at: r.get(7)?,
read_at: r.get(8)?,
})
}
pub fn fetch_unread(
conn: &Connection,
user_id: i64,
since_id: i64,
limit: i64,
) -> Result<Vec<Message>> {
let mut stmt = conn.prepare(
"SELECT id, user_id, kind, task_id, worker_name, error, payload_json, created_at, read_at
FROM controller_messages
WHERE user_id=? AND id>? AND read_at IS NULL
ORDER BY id ASC LIMIT ?",
)?;
let rows = stmt
.query_map(params![user_id, since_id, limit], row_to_message)?
.collect::<rusqlite::Result<Vec<_>>>()?;
Ok(rows)
}
pub fn fetch_all(
conn: &Connection,
user_id: i64,
since_id: i64,
limit: i64,
) -> Result<Vec<Message>> {
let mut stmt = conn.prepare(
"SELECT id, user_id, kind, task_id, worker_name, error, payload_json, created_at, read_at
FROM controller_messages
WHERE user_id=? AND id>?
ORDER BY id ASC LIMIT ?",
)?;
let rows = stmt
.query_map(params![user_id, since_id, limit], row_to_message)?
.collect::<rusqlite::Result<Vec<_>>>()?;
Ok(rows)
}
pub fn ack(conn: &Connection, user_id: i64, ids: &[i64]) -> Result<usize> {
if ids.is_empty() {
return Ok(0);
}
let placeholders = vec!["?"; ids.len()].join(",");
let sql = format!(
"UPDATE controller_messages SET read_at=?
WHERE user_id=? AND read_at IS NULL AND id IN ({placeholders})"
);
let mut params: Vec<rusqlite::types::Value> = Vec::with_capacity(2 + ids.len());
params.push(rusqlite::types::Value::Text(iso_now()));
params.push(rusqlite::types::Value::Integer(user_id));
for id in ids {
params.push(rusqlite::types::Value::Integer(*id));
}
Ok(conn.execute(&sql, params_from_iter(params.iter()))?)
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
fn fresh() -> Connection {
let c = crate::db::connect_in_memory().unwrap();
crate::db::bootstrap(&c).unwrap();
c
}
fn insert_user(c: &Connection, name: &str) -> i64 {
c.execute(
"INSERT INTO users (username, password_hash, totp_secret_enc, created_at)
VALUES (?,?,?,?)",
params![name, "h", "s", "2026-01-01T00:00:00Z"],
)
.unwrap();
c.last_insert_rowid()
}
#[test]
fn enqueue_and_fetch() {
let c = fresh();
let alice = insert_user(&c, "alice");
let m = enqueue(
&c,
alice,
"task_state",
Some("t1"),
Some("w1"),
None,
&json!({"state":"COMPLETED"}),
)
.unwrap();
assert!(m.id > 0);
let msgs = fetch_unread(&c, alice, 0, 10).unwrap();
assert_eq!(msgs.len(), 1);
assert_eq!(msgs[0].task_id.as_deref(), Some("t1"));
}
#[test]
fn per_user_isolation() {
let c = fresh();
let alice = insert_user(&c, "alice");
let bob = insert_user(&c, "bob");
enqueue(&c, alice, "task_state", None, None, None, &json!({"x":1})).unwrap();
enqueue(&c, bob, "task_state", None, None, None, &json!({"y":2})).unwrap();
let a = fetch_unread(&c, alice, 0, 10).unwrap();
let b = fetch_unread(&c, bob, 0, 10).unwrap();
assert_eq!(a.len(), 1);
assert_eq!(b.len(), 1);
assert_eq!(a[0].payload, json!({"x":1}));
assert_eq!(b[0].payload, json!({"y":2}));
}
#[test]
fn ack_marks_read() {
let c = fresh();
let alice = insert_user(&c, "alice");
let m1 = enqueue(&c, alice, "k", None, None, None, &json!({})).unwrap();
let m2 = enqueue(&c, alice, "k", None, None, None, &json!({})).unwrap();
assert_eq!(ack(&c, alice, &[m1.id]).unwrap(), 1);
let rest = fetch_unread(&c, alice, 0, 10).unwrap();
assert_eq!(rest.iter().map(|m| m.id).collect::<Vec<_>>(), vec![m2.id]);
}
#[test]
fn ack_does_not_cross_users() {
let c = fresh();
let alice = insert_user(&c, "alice");
let bob = insert_user(&c, "bob");
let m = enqueue(&c, alice, "k", None, None, None, &json!({})).unwrap();
assert_eq!(ack(&c, bob, &[m.id]).unwrap(), 0);
assert_eq!(fetch_unread(&c, alice, 0, 10).unwrap().len(), 1);
}
}