dragoon-server 0.1.0

Public-relay server for the dragoon remote-executor: axum + rusqlite + ed25519 task signing + per-user message inbox.
Documentation
//! `/v1/messages` (drain) + `/v1/messages/ack` (mark read).

use std::time::{Duration, Instant};

use axum::{
    extract::{Query, State},
    http::StatusCode,
    middleware,
    routing::{get, post},
    Extension, Json, Router,
};
use serde::Deserialize;
use serde_json::{json, Value};

use crate::{
    app::{signed_request, AppState, SignedSession},
    messages_repo,
};

pub fn router(state: AppState) -> Router {
    Router::new()
        .route("/v1/messages", get(list_messages))
        .route("/v1/messages/ack", post(ack_messages))
        .layer(middleware::from_fn_with_state(state.clone(), signed_request))
        .with_state(state)
}

#[derive(Debug, Deserialize, Default)]
struct MessagesQuery {
    #[serde(default)]
    since_id: i64,
    #[serde(default)]
    include_read: bool,
    #[serde(default)]
    long_poll: bool,
}

async fn list_messages(
    State(state): State<AppState>,
    Query(q): Query<MessagesQuery>,
    Extension(sess): Extension<SignedSession>,
) -> Result<Json<Value>, StatusCode> {
    let fetch = |since_id: i64| -> anyhow::Result<Vec<messages_repo::Message>> {
        let conn = state.conn.lock().unwrap();
        if q.include_read {
            messages_repo::fetch_all(&conn, sess.0.user_id, since_id, 200)
        } else {
            messages_repo::fetch_unread(&conn, sess.0.user_id, since_id, 200)
        }
    };

    let mut msgs = fetch(q.since_id).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    if msgs.is_empty() && q.long_poll {
        let deadline = Instant::now() + Duration::from_secs_f64(state.settings.log_long_poll_sec);
        while Instant::now() < deadline {
            tokio::time::sleep(Duration::from_secs_f64(
                state.settings.log_long_poll_step_sec,
            ))
            .await;
            msgs = fetch(q.since_id).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
            if !msgs.is_empty() {
                break;
            }
        }
    }
    let next_since = msgs.last().map_or(q.since_id, |m| m.id);
    Ok(Json(json!({
        "messages": msgs.into_iter().map(serialize).collect::<Vec<_>>(),
        "next_since_id": next_since,
    })))
}

fn serialize(m: messages_repo::Message) -> Value {
    json!({
        "id": m.id,
        "kind": m.kind,
        "task_id": m.task_id,
        "worker_name": m.worker_name,
        "error": m.error,
        "payload": m.payload,
        "created_at": m.created_at,
    })
}

#[derive(Debug, Deserialize)]
struct AckBody {
    #[serde(default)]
    ids: Vec<i64>,
}

async fn ack_messages(
    State(state): State<AppState>,
    Extension(sess): Extension<SignedSession>,
    Json(body): Json<AckBody>,
) -> Json<Value> {
    let conn = state.conn.lock().unwrap();
    let n = messages_repo::ack(&conn, sess.0.user_id, &body.ids).unwrap_or(0);
    Json(json!({"acked": n}))
}