dragoon-server 0.1.0

Public-relay server for the dragoon remote-executor: axum + rusqlite + ed25519 task signing + per-user message inbox.
Documentation
//! Per-user controller-message inbox. The server enqueues task-state and
//! worker-error events; `ctl` drains them via `/v1/messages`.

use anyhow::Result;
use chrono::Utc;
use rusqlite::{params, params_from_iter, Connection};
use serde_json::Value as JsonValue;

/// One row of the `controller_messages` table, as handed back to callers.
///
/// Derives `serde::Serialize`, so the field names below are also the
/// serialized keys.
#[derive(Debug, Clone, serde::Serialize)]
pub struct Message {
    /// Row id; `enqueue` fills it from `last_insert_rowid()`.
    pub id: i64,
    /// Owning user; every query in this module filters on it.
    pub user_id: i64,
    /// Event kind string (the tests use values such as `"task_state"`).
    pub kind: String,
    /// Optional task id attached to the event.
    pub task_id: Option<String>,
    /// Optional worker name attached to the event.
    pub worker_name: Option<String>,
    /// Optional error text attached to the event.
    pub error: Option<String>,
    /// JSON payload; persisted as text in the `payload_json` column.
    pub payload: JsonValue,
    /// Timestamp string written by `enqueue` when the row is inserted.
    pub created_at: String,
    /// Timestamp string written by `ack`; `None` while the message is unread.
    pub read_at: Option<String>,
}

/// Current UTC time as an RFC 3339 string with microsecond precision,
/// using the `Z` suffix rather than `+00:00`.
fn iso_now() -> String {
    let now = Utc::now();
    now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true)
}

/// Insert one message into `user_id`'s inbox and return it as stored.
///
/// The payload is serialized to JSON text for the `payload_json` column and
/// `created_at` is stamped with the current UTC time. The returned
/// [`Message`] carries the new row id and `read_at: None`.
///
/// # Errors
///
/// Fails if the payload cannot be serialized or the `INSERT` is rejected.
// One argument per inserted column; callers pass them positionally, so the
// flat signature is kept and the lint is silenced.
#[allow(clippy::too_many_arguments)]
pub fn enqueue(
    conn: &Connection,
    user_id: i64,
    kind: &str,
    task_id: Option<&str>,
    worker_name: Option<&str>,
    error: Option<&str>,
    payload: &JsonValue,
) -> Result<Message> {
    let created_at = iso_now();
    let encoded_payload = serde_json::to_string(payload)?;

    conn.execute(
        "INSERT INTO controller_messages
         (user_id, kind, task_id, worker_name, error, payload_json, created_at)
         VALUES (?,?,?,?,?,?,?)",
        params![
            user_id,
            kind,
            task_id,
            worker_name,
            error,
            encoded_payload,
            created_at
        ],
    )?;

    // Build the in-memory copy from the inputs instead of re-reading the row.
    let stored = Message {
        id: conn.last_insert_rowid(),
        user_id,
        kind: kind.to_owned(),
        task_id: task_id.map(str::to_owned),
        worker_name: worker_name.map(str::to_owned),
        error: error.map(str::to_owned),
        payload: payload.clone(),
        created_at,
        read_at: None,
    };
    Ok(stored)
}

fn row_to_message(r: &rusqlite::Row<'_>) -> rusqlite::Result<Message> {
    let payload_json: Option<String> = r.get(6)?;
    Ok(Message {
        id: r.get(0)?,
        user_id: r.get(1)?,
        kind: r.get(2)?,
        task_id: r.get(3)?,
        worker_name: r.get(4)?,
        error: r.get(5)?,
        payload: serde_json::from_str(payload_json.as_deref().unwrap_or("{}"))
            .unwrap_or(JsonValue::Null),
        created_at: r.get(7)?,
        read_at: r.get(8)?,
    })
}

pub fn fetch_unread(
    conn: &Connection,
    user_id: i64,
    since_id: i64,
    limit: i64,
) -> Result<Vec<Message>> {
    let mut stmt = conn.prepare(
        "SELECT id, user_id, kind, task_id, worker_name, error, payload_json, created_at, read_at
         FROM controller_messages
         WHERE user_id=? AND id>? AND read_at IS NULL
         ORDER BY id ASC LIMIT ?",
    )?;
    let rows = stmt
        .query_map(params![user_id, since_id, limit], row_to_message)?
        .collect::<rusqlite::Result<Vec<_>>>()?;
    Ok(rows)
}

/// Return up to `limit` messages for `user_id` whose id is strictly greater
/// than `since_id`, read or unread, in ascending id order.
///
/// # Errors
///
/// Fails if the statement cannot be prepared or run, or if a row cannot be
/// decoded.
pub fn fetch_all(
    conn: &Connection,
    user_id: i64,
    since_id: i64,
    limit: i64,
) -> Result<Vec<Message>> {
    let mut stmt = conn.prepare(
        "SELECT id, user_id, kind, task_id, worker_name, error, payload_json, created_at, read_at
         FROM controller_messages
         WHERE user_id=? AND id>?
         ORDER BY id ASC LIMIT ?",
    )?;

    let decoded = stmt.query_map(params![user_id, since_id, limit], row_to_message)?;
    let mut messages = Vec::new();
    for message in decoded {
        // The first failing row aborts the whole fetch.
        messages.push(message?);
    }
    Ok(messages)
}

/// Mark the given messages as read for `user_id` and return how many rows
/// were updated.
///
/// Only rows that belong to `user_id` and are still unread
/// (`read_at IS NULL`) are touched. Ids owned by another user, already
/// acknowledged, or unknown are ignored and not counted. An empty `ids`
/// slice returns `Ok(0)` without touching the database.
///
/// Ids are bound in batches so that an arbitrarily long list cannot exceed
/// SQLite's limit on bound parameters per statement. All batches share one
/// `read_at` timestamp. When more than one batch is needed the update is no
/// longer a single atomic statement: if a later batch fails, earlier ones
/// stay applied (unless the caller wrapped the call in a transaction).
/// Retrying is safe because rows that are already read are skipped.
///
/// # Errors
///
/// Fails if any `UPDATE` statement is rejected.
pub fn ack(conn: &Connection, user_id: i64, ids: &[i64]) -> Result<usize> {
    use rusqlite::types::Value as SqlValue;

    // Older SQLite builds allow only 999 bound variables per statement
    // (SQLITE_MAX_VARIABLE_NUMBER); two slots go to `read_at` and `user_id`,
    // so 900 ids per statement stays under the limit.
    const MAX_IDS_PER_STATEMENT: usize = 900;

    if ids.is_empty() {
        return Ok(0);
    }

    // One timestamp for the whole call so every row acknowledged together
    // carries the same `read_at`.
    let read_at = iso_now();
    let mut acked = 0;

    for chunk in ids.chunks(MAX_IDS_PER_STATEMENT) {
        let placeholders = vec!["?"; chunk.len()].join(",");
        let sql = format!(
            "UPDATE controller_messages SET read_at=?
         WHERE user_id=? AND read_at IS NULL AND id IN ({placeholders})"
        );

        let mut bound: Vec<SqlValue> = Vec::with_capacity(2 + chunk.len());
        bound.push(SqlValue::Text(read_at.clone()));
        bound.push(SqlValue::Integer(user_id));
        bound.extend(chunk.iter().copied().map(SqlValue::Integer));

        acked += conn.execute(&sql, params_from_iter(bound.iter()))?;
    }

    Ok(acked)
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Open an in-memory database and run the crate's schema bootstrap on it.
    fn fresh() -> Connection {
        let c = crate::db::connect_in_memory().unwrap();
        crate::db::bootstrap(&c).unwrap();
        c
    }

    /// Insert a minimal `users` row and return its id, so messages have an
    /// owner to point at.
    fn insert_user(c: &Connection, name: &str) -> i64 {
        c.execute(
            "INSERT INTO users (username, password_hash, totp_secret_enc, created_at)
             VALUES (?,?,?,?)",
            params![name, "h", "s", "2026-01-01T00:00:00Z"],
        )
        .unwrap();
        c.last_insert_rowid()
    }

    /// Collect just the ids so ordering assertions stay short.
    fn ids_of(msgs: &[Message]) -> Vec<i64> {
        msgs.iter().map(|m| m.id).collect()
    }

    #[test]
    fn enqueue_and_fetch() {
        let c = fresh();
        let alice = insert_user(&c, "alice");
        let m = enqueue(
            &c,
            alice,
            "task_state",
            Some("t1"),
            Some("w1"),
            None,
            &json!({"state":"COMPLETED"}),
        )
        .unwrap();
        assert!(m.id > 0);
        let msgs = fetch_unread(&c, alice, 0, 10).unwrap();
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].task_id.as_deref(), Some("t1"));
    }

    #[test]
    fn per_user_isolation() {
        let c = fresh();
        let alice = insert_user(&c, "alice");
        let bob = insert_user(&c, "bob");
        enqueue(&c, alice, "task_state", None, None, None, &json!({"x":1})).unwrap();
        enqueue(&c, bob, "task_state", None, None, None, &json!({"y":2})).unwrap();
        let a = fetch_unread(&c, alice, 0, 10).unwrap();
        let b = fetch_unread(&c, bob, 0, 10).unwrap();
        assert_eq!(a.len(), 1);
        assert_eq!(b.len(), 1);
        assert_eq!(a[0].payload, json!({"x":1}));
        assert_eq!(b[0].payload, json!({"y":2}));
    }

    #[test]
    fn ack_marks_read() {
        let c = fresh();
        let alice = insert_user(&c, "alice");
        let m1 = enqueue(&c, alice, "k", None, None, None, &json!({})).unwrap();
        let m2 = enqueue(&c, alice, "k", None, None, None, &json!({})).unwrap();
        assert_eq!(ack(&c, alice, &[m1.id]).unwrap(), 1);
        let rest = fetch_unread(&c, alice, 0, 10).unwrap();
        assert_eq!(ids_of(&rest), vec![m2.id]);
    }

    #[test]
    fn ack_does_not_cross_users() {
        let c = fresh();
        let alice = insert_user(&c, "alice");
        let bob = insert_user(&c, "bob");
        let m = enqueue(&c, alice, "k", None, None, None, &json!({})).unwrap();
        assert_eq!(ack(&c, bob, &[m.id]).unwrap(), 0);
        assert_eq!(fetch_unread(&c, alice, 0, 10).unwrap().len(), 1);
    }

    #[test]
    fn ack_empty_is_noop_and_repeat_counts_zero() {
        let c = fresh();
        let alice = insert_user(&c, "alice");
        let m = enqueue(&c, alice, "k", None, None, None, &json!({})).unwrap();
        assert_eq!(ack(&c, alice, &[]).unwrap(), 0);
        assert_eq!(ack(&c, alice, &[m.id]).unwrap(), 1);
        // Already-read rows are skipped by the `read_at IS NULL` guard.
        assert_eq!(ack(&c, alice, &[m.id]).unwrap(), 0);
    }

    #[test]
    fn fetch_all_includes_acked_messages() {
        let c = fresh();
        let alice = insert_user(&c, "alice");
        let m1 = enqueue(&c, alice, "k", None, None, None, &json!({})).unwrap();
        let m2 = enqueue(&c, alice, "k", None, None, None, &json!({})).unwrap();
        assert_eq!(ack(&c, alice, &[m1.id]).unwrap(), 1);
        let all = fetch_all(&c, alice, 0, 10).unwrap();
        assert_eq!(ids_of(&all), vec![m1.id, m2.id]);
        assert!(all[0].read_at.is_some());
        assert!(all[1].read_at.is_none());
    }

    #[test]
    fn since_id_and_limit_page_results() {
        let c = fresh();
        let alice = insert_user(&c, "alice");
        let m1 = enqueue(&c, alice, "k", None, None, None, &json!({})).unwrap();
        let m2 = enqueue(&c, alice, "k", None, None, None, &json!({})).unwrap();
        let m3 = enqueue(&c, alice, "k", None, None, None, &json!({})).unwrap();

        // `since_id` is an exclusive lower bound.
        let after_first = fetch_unread(&c, alice, m1.id, 10).unwrap();
        assert_eq!(ids_of(&after_first), vec![m2.id, m3.id]);
        let after_second = fetch_all(&c, alice, m2.id, 10).unwrap();
        assert_eq!(ids_of(&after_second), vec![m3.id]);

        // `limit` caps the page, oldest first.
        let first_two = fetch_unread(&c, alice, 0, 2).unwrap();
        assert_eq!(ids_of(&first_two), vec![m1.id, m2.id]);
    }
}