use std::time::{Duration, Instant};
use axum::{
extract::{Query, State},
http::StatusCode,
middleware,
routing::{get, post},
Extension, Json, Router,
};
use serde::Deserialize;
use serde_json::{json, Value};
use crate::{
app::{signed_request, AppState, SignedSession},
messages_repo,
};
/// Builds the router for the message endpoints.
///
/// Registers `GET /v1/messages` and `POST /v1/messages/ack`, wraps both in
/// the `signed_request` middleware (which receives its own clone of `state`),
/// and then attaches `state` to the router.
pub fn router(state: AppState) -> Router {
    let routes = Router::new()
        .route("/v1/messages", get(list_messages))
        .route("/v1/messages/ack", post(ack_messages));
    // The layer is added after both routes so that it applies to each of them.
    let signed = middleware::from_fn_with_state(state.clone(), signed_request);
    routes.layer(signed).with_state(state)
}
/// Query-string parameters accepted by `GET /v1/messages`.
///
/// Every field is optional: a parameter missing from the query string takes
/// the value from the struct's `Default` implementation (`0` / `false`).
#[derive(Default, Debug, Deserialize)]
#[serde(default)]
struct MessagesQuery {
    /// Cursor forwarded to the repository fetch; also echoed back as
    /// `next_since_id` when no messages are returned.
    since_id: i64,
    /// When `true` the handler calls `fetch_all` instead of `fetch_unread`.
    include_read: bool,
    /// When `true` and the first fetch is empty, the handler keeps polling
    /// instead of answering immediately.
    long_poll: bool,
}
/// Handles `GET /v1/messages` for the signed-in user.
///
/// Fetches messages through `messages_repo` — `fetch_all` when `include_read`
/// is set, `fetch_unread` otherwise — passing `since_id` through as the
/// cursor. When the first fetch is empty and `long_poll` is set, the query is
/// retried every `log_long_poll_step_sec` seconds until a message shows up or
/// `log_long_poll_sec` seconds have elapsed; the final sleep is shortened so
/// the request never waits past that window.
///
/// The response body is `{"messages": [...], "next_since_id": n}`, where
/// `next_since_id` is the id of the last returned message, or the request's
/// own `since_id` when nothing was returned.
///
/// # Errors
///
/// Responds with `500 Internal Server Error` when a repository call fails or
/// when the connection mutex is poisoned.
async fn list_messages(
    State(state): State<AppState>,
    Query(q): Query<MessagesQuery>,
    Extension(sess): Extension<SignedSession>,
) -> Result<Json<Value>, StatusCode> {
    // Runs one repository query. The mutex guard is a local of the closure,
    // so it is released before any `.await` in the polling loop below.
    // The trailing `200` is presumably a row limit — confirm in `messages_repo`.
    let fetch = |since_id: i64| -> anyhow::Result<Vec<messages_repo::Message>> {
        // A poisoned mutex becomes an ordinary error (-> 500) rather than a
        // panic inside the request task.
        let conn = state
            .conn
            .lock()
            .map_err(|_| anyhow::anyhow!("connection mutex poisoned"))?;
        if q.include_read {
            messages_repo::fetch_all(&conn, sess.0.user_id, since_id, 200)
        } else {
            messages_repo::fetch_unread(&conn, sess.0.user_id, since_id, 200)
        }
    };

    let mut msgs = fetch(q.since_id).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

    if msgs.is_empty() && q.long_poll {
        // NOTE(review): these settings are named `log_long_poll_*` although
        // this is the messages endpoint — confirm the sharing is intended.
        let window = long_poll_duration(state.settings.log_long_poll_sec);
        let step = long_poll_duration(state.settings.log_long_poll_step_sec);
        let deadline = Instant::now() + window;
        loop {
            let remaining = deadline.saturating_duration_since(Instant::now());
            if remaining.is_zero() {
                break;
            }
            // Never sleep past the deadline: the last wait is cut to whatever
            // time is left in the window.
            tokio::time::sleep(step.min(remaining)).await;
            msgs = fetch(q.since_id).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
            if !msgs.is_empty() {
                break;
            }
        }
    }

    // Computed before `msgs` is consumed by `into_iter` below.
    let next_since = msgs.last().map_or(q.since_id, |m| m.id);
    Ok(Json(json!({
        "messages": msgs.into_iter().map(serialize).collect::<Vec<_>>(),
        "next_since_id": next_since,
    })))
}

/// Converts a seconds value read from settings into a `Duration`.
///
/// `Duration::from_secs_f64` panics on negative or non-finite input, so those
/// values (and zero) are mapped to `Duration::ZERO` instead of being passed
/// through.
fn long_poll_duration(secs: f64) -> Duration {
    if secs.is_finite() && secs > 0.0 {
        Duration::from_secs_f64(secs)
    } else {
        Duration::ZERO
    }
}
fn serialize(m: messages_repo::Message) -> Value {
json!({
"id": m.id,
"kind": m.kind,
"task_id": m.task_id,
"worker_name": m.worker_name,
"error": m.error,
"payload": m.payload,
"created_at": m.created_at,
})
}
/// JSON request body accepted by `POST /v1/messages/ack`.
#[derive(Deserialize, Debug)]
struct AckBody {
    /// Ids of the messages to acknowledge. A body without this field
    /// deserializes to an empty list.
    #[serde(default)]
    ids: Vec<i64>,
}
async fn ack_messages(
State(state): State<AppState>,
Extension(sess): Extension<SignedSession>,
Json(body): Json<AckBody>,
) -> Json<Value> {
let conn = state.conn.lock().unwrap();
let n = messages_repo::ack(&conn, sess.0.user_id, &body.ids).unwrap_or(0);
Json(json!({"acked": n}))
}