Skip to main content

agent_ask/
server.rs

1//! axum-based HTTP server (mirror of `src/server.ts`).
2
3use std::sync::Arc;
4
5use axum::{
6    body::Bytes,
7    extract::{Path, Query, Request, State},
8    http::{header, HeaderMap, StatusCode},
9    response::{IntoResponse, Response},
10    routing::{get, post},
11    Json, Router,
12};
13use chrono::{DateTime, Utc};
14use serde_json::{json, Value};
15
16use crate::artifact::verify_artifact;
17use crate::store::Store;
18
/// Largest request body, in bytes, that the POST handlers accept (64 KiB).
const MAX_BODY: usize = 64 * 1024;
20
21#[derive(Clone)]
22pub struct AppState {
23    pub store: Store,
24    pub now_fn: Arc<dyn Fn() -> DateTime<Utc> + Send + Sync>,
25}
26
27impl AppState {
28    pub fn new(store: Store) -> Self {
29        Self { store, now_fn: Arc::new(Utc::now) }
30    }
31
32    pub fn with_now_fn<F>(store: Store, now_fn: F) -> Self
33    where
34        F: Fn() -> DateTime<Utc> + Send + Sync + 'static,
35    {
36        Self { store, now_fn: Arc::new(now_fn) }
37    }
38}
39
40pub fn create_app(state: AppState) -> Router {
41    Router::new()
42        .route("/questions", post(post_question).get(get_questions))
43        .route("/answers", post(post_answer))
44        .route("/ratings", post(post_rating))
45        .route("/artifact/:cid", get(get_artifact))
46        .route("/feed", get(get_feed))
47        .with_state(state)
48}
49
50async fn post_question(state: State<AppState>, headers: HeaderMap, req: Request) -> Response {
51    handle_post(state, headers, req, "question").await
52}
53
54async fn post_answer(state: State<AppState>, headers: HeaderMap, req: Request) -> Response {
55    handle_post(state, headers, req, "answer").await
56}
57
58async fn post_rating(state: State<AppState>, headers: HeaderMap, req: Request) -> Response {
59    handle_post(state, headers, req, "rating").await
60}
61
62async fn handle_post(
63    State(state): State<AppState>,
64    headers: HeaderMap,
65    req: Request,
66    expected_kind: &str,
67) -> Response {
68    if let Some(cl) = headers.get(header::CONTENT_LENGTH) {
69        if let Ok(s) = cl.to_str() {
70            if let Ok(n) = s.parse::<usize>() {
71                if n > MAX_BODY {
72                    return (
73                        StatusCode::PAYLOAD_TOO_LARGE,
74                        Json(json!({"error": "body too large"})),
75                    ).into_response();
76                }
77            }
78        }
79    }
80    let body = match axum::body::to_bytes(req.into_body(), MAX_BODY + 1).await {
81        Ok(b) => b,
82        Err(_) => {
83            return (StatusCode::PAYLOAD_TOO_LARGE, Json(json!({"error": "body too large"})))
84                .into_response();
85        }
86    };
87    if body.len() > MAX_BODY {
88        return (StatusCode::PAYLOAD_TOO_LARGE, Json(json!({"error": "body too large"})))
89            .into_response();
90    }
91    if body.is_empty() {
92        return (StatusCode::BAD_REQUEST, Json(json!({"error": "invalid json"}))).into_response();
93    }
94    let parsed: Value = match serde_json::from_slice(&body) {
95        Ok(v) => v,
96        Err(_) => {
97            return (StatusCode::BAD_REQUEST, Json(json!({"error": "invalid json"})))
98                .into_response();
99        }
100    };
101    ingest(&state, parsed, expected_kind).await
102}
103
104async fn ingest(state: &AppState, body: Value, expected_kind: &str) -> Response {
105    let v = verify_artifact(&body);
106    if !v.ok {
107        return (
108            StatusCode::BAD_REQUEST,
109            Json(json!({"error": format!("verify: {}", v.errors.join("; "))})),
110        )
111            .into_response();
112    }
113    let kind = body.get("kind").and_then(|x| x.as_str()).unwrap_or("");
114    if kind != expected_kind {
115        return (
116            StatusCode::BAD_REQUEST,
117            Json(json!({"error": format!("kind mismatch: expected {expected_kind}")})),
118        )
119            .into_response();
120    }
121    let now_ms = (state.now_fn)().timestamp_millis();
122    let created_at = body.get("created_at").and_then(|v| v.as_str()).unwrap_or("");
123    let created_ms = match DateTime::parse_from_rfc3339(created_at) {
124        Ok(d) => d.with_timezone(&Utc).timestamp_millis(),
125        Err(_) => {
126            return (StatusCode::BAD_REQUEST, Json(json!({"error": "invalid created_at"})))
127                .into_response();
128        }
129    };
130    if (now_ms - created_ms).abs() > 24 * 60 * 60 * 1000 {
131        return (
132            StatusCode::BAD_REQUEST,
133            Json(json!({"error": "created_at outside ±24h window"})),
134        )
135            .into_response();
136    }
137    match kind {
138        "answer" => {
139            let qcid = body.get("question_cid").and_then(|v| v.as_str()).unwrap_or("");
140            match state.store.has_artifact(qcid) {
141                Ok(false) => return (
142                    StatusCode::BAD_REQUEST,
143                    Json(json!({"error": "question_cid not known locally"})),
144                ).into_response(),
145                Err(e) => return (
146                    StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": e.to_string()})),
147                ).into_response(),
148                _ => {}
149            }
150        }
151        "rating" => {
152            let tcid = body.get("target_cid").and_then(|v| v.as_str()).unwrap_or("");
153            match state.store.has_artifact(tcid) {
154                Ok(false) => return (
155                    StatusCode::BAD_REQUEST,
156                    Json(json!({"error": "target_cid not known locally"})),
157                ).into_response(),
158                Err(e) => return (
159                    StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": e.to_string()})),
160                ).into_response(),
161                _ => {}
162            }
163        }
164        _ => {}
165    }
166    match state.store.insert_artifact(&body) {
167        Ok(cid) => (StatusCode::CREATED, Json(json!({"cid": cid}))).into_response(),
168        Err(e) => (
169            StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": e.to_string()})),
170        ).into_response(),
171    }
172}
173
174async fn get_artifact(
175    State(state): State<AppState>,
176    Path(cid): Path<String>,
177) -> Response {
178    match state.store.get_artifact(&cid) {
179        Ok(Some(a)) => Json(a).into_response(),
180        Ok(None) => (StatusCode::NOT_FOUND, Json(json!({"error": "not found"}))).into_response(),
181        Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": e.to_string()}))).into_response(),
182    }
183}
184
185#[derive(serde::Deserialize)]
186struct ListQ {
187    tag: Option<String>,
188    since: Option<String>,
189}
190
191async fn get_questions(
192    State(state): State<AppState>,
193    Query(q): Query<ListQ>,
194) -> Response {
195    match state.store.list_questions(q.tag.as_deref(), q.since.as_deref(), 100) {
196        Ok(rows) => Json(rows).into_response(),
197        Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": e.to_string()}))).into_response(),
198    }
199}
200
201#[derive(serde::Deserialize)]
202struct FeedQ {
203    since: Option<String>,
204}
205
206async fn get_feed(
207    State(state): State<AppState>,
208    Query(q): Query<FeedQ>,
209) -> Response {
210    let items = match state.store.stream_feed(q.since.as_deref(), 1000) {
211        Ok(r) => r,
212        Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": e.to_string()}))).into_response(),
213    };
214    let mut buf = Vec::new();
215    for item in items {
216        // Serialize compactly, no extra whitespace.
217        let line = serde_json::to_string(&item).unwrap_or_default();
218        buf.extend_from_slice(line.as_bytes());
219        buf.push(b'\n');
220    }
221    (
222        StatusCode::OK,
223        [(header::CONTENT_TYPE, "application/x-ndjson; charset=utf-8")],
224        Bytes::from(buf),
225    )
226        .into_response()
227}