Skip to main content

clawdentity_core/runtime/
server.rs

1use std::future::Future;
2use std::net::SocketAddr;
3
4use axum::extract::State;
5use axum::http::StatusCode;
6use axum::response::IntoResponse;
7use axum::{Json, Router, routing::get, routing::post};
8use serde::Deserialize;
9use serde_json::json;
10
11use crate::connector_client::ConnectorClientSender;
12use crate::db::SqliteStore;
13use crate::db_inbound::{dead_letter_count, list_dead_letter, pending_count};
14use crate::db_outbound::{EnqueueOutboundInput, enqueue_outbound, outbound_count};
15use crate::did::parse_agent_did;
16use crate::error::{CoreError, Result};
17use crate::runtime_relay::flush_outbound_queue_to_relay;
18use crate::runtime_replay::{purge_dead_letter_messages, replay_dead_letter_messages};
19
20#[derive(Clone)]
21pub struct RuntimeServerState {
22    pub store: SqliteStore,
23    pub relay_sender: Option<ConnectorClientSender>,
24}
25
26#[derive(Debug, Deserialize)]
27#[serde(rename_all = "camelCase")]
28struct OutboundRequest {
29    to_agent_did: String,
30    payload: serde_json::Value,
31    #[serde(default)]
32    conversation_id: Option<String>,
33    #[serde(default)]
34    reply_to: Option<String>,
35}
36
37#[derive(Debug, Deserialize)]
38#[serde(rename_all = "camelCase")]
39struct DeadLetterMutationRequest {
40    #[serde(default)]
41    request_ids: Option<Vec<String>>,
42}
43
44/// TODO(clawdentity): document `create_runtime_router`.
45pub fn create_runtime_router(state: RuntimeServerState) -> Router {
46    Router::new()
47        .route("/v1/status", get(status_handler))
48        .route("/v1/outbound", post(outbound_handler))
49        .route("/v1/inbound/dead-letter", get(dead_letter_list_handler))
50        .route(
51            "/v1/inbound/dead-letter/replay",
52            post(dead_letter_replay_handler),
53        )
54        .route(
55            "/v1/inbound/dead-letter/purge",
56            post(dead_letter_purge_handler),
57        )
58        .with_state(state)
59}
60
61/// TODO(clawdentity): document `run_runtime_server`.
62pub async fn run_runtime_server(
63    bind_addr: SocketAddr,
64    state: RuntimeServerState,
65    shutdown_signal: impl Future<Output = ()> + Send + 'static,
66) -> Result<()> {
67    let listener = tokio::net::TcpListener::bind(bind_addr)
68        .await
69        .map_err(|error| CoreError::Http(error.to_string()))?;
70    axum::serve(listener, create_runtime_router(state))
71        .with_graceful_shutdown(shutdown_signal)
72        .await
73        .map_err(|error| CoreError::Http(error.to_string()))?;
74    Ok(())
75}
76
77async fn status_handler(State(state): State<RuntimeServerState>) -> impl IntoResponse {
78    let outbound_pending = outbound_count(&state.store).unwrap_or(0);
79    let inbound_pending = pending_count(&state.store).unwrap_or(0);
80    let inbound_dead_letter = dead_letter_count(&state.store).unwrap_or(0);
81    let relay = state.relay_sender.as_ref();
82
83    (
84        StatusCode::OK,
85        Json(json!({
86            "status": "ok",
87            "websocket": {
88                "connected": relay.map(|sender| sender.is_connected()).unwrap_or(false),
89                "metrics": relay.map(|sender| sender.metrics_snapshot()),
90            },
91            "outbound": {
92                "queue": {
93                    "pendingCount": outbound_pending,
94                },
95            },
96            "inbound": {
97                "pending": inbound_pending,
98                "deadLetter": inbound_dead_letter,
99            }
100        })),
101    )
102}
103
104#[allow(clippy::too_many_lines)]
105async fn outbound_handler(
106    State(state): State<RuntimeServerState>,
107    Json(request): Json<OutboundRequest>,
108) -> impl IntoResponse {
109    let normalized_to_agent_did = request.to_agent_did.trim().to_string();
110    if parse_agent_did(&normalized_to_agent_did).is_err() {
111        return (
112            StatusCode::BAD_REQUEST,
113            Json(json!({
114                "error": {
115                    "code": "INVALID_TO_AGENT_DID",
116                    "message": "toAgentDid must be a valid agent DID",
117                }
118            })),
119        );
120    }
121
122    let frame_id = ulid::Ulid::new().to_string();
123    let enqueue_result = enqueue_outbound(
124        &state.store,
125        EnqueueOutboundInput {
126            frame_id: frame_id.clone(),
127            frame_version: 1,
128            frame_type: "enqueue".to_string(),
129            to_agent_did: normalized_to_agent_did,
130            payload_json: request.payload.to_string(),
131            conversation_id: request.conversation_id,
132            reply_to: request.reply_to,
133        },
134    );
135    if let Err(error) = enqueue_result {
136        return (
137            StatusCode::INTERNAL_SERVER_ERROR,
138            Json(json!({
139                "error": {
140                    "code": "OUTBOUND_PERSIST_FAILED",
141                    "message": error.to_string(),
142                }
143            })),
144        );
145    }
146
147    if let Some(relay_sender) = &state.relay_sender {
148        let _ = flush_outbound_queue_to_relay(&state.store, relay_sender, 1, None).await;
149    }
150
151    (
152        StatusCode::ACCEPTED,
153        Json(json!({
154            "accepted": true,
155            "frameId": frame_id,
156        })),
157    )
158}
159
160async fn dead_letter_list_handler(State(state): State<RuntimeServerState>) -> impl IntoResponse {
161    match list_dead_letter(&state.store, 500) {
162        Ok(items) => (
163            StatusCode::OK,
164            Json(json!({
165                "status": "ok",
166                "count": items.len(),
167                "items": items,
168            })),
169        ),
170        Err(error) => (
171            StatusCode::INTERNAL_SERVER_ERROR,
172            Json(json!({
173                "error": {
174                    "code": "DEAD_LETTER_LIST_FAILED",
175                    "message": error.to_string(),
176                }
177            })),
178        ),
179    }
180}
181
182async fn dead_letter_replay_handler(
183    State(state): State<RuntimeServerState>,
184    body: Option<Json<DeadLetterMutationRequest>>,
185) -> impl IntoResponse {
186    let request_ids = body
187        .and_then(|body| body.0.request_ids)
188        .map(normalize_request_ids);
189    match replay_dead_letter_messages(&state.store, request_ids) {
190        Ok(result) => (
191            StatusCode::OK,
192            Json(json!({
193                "status": "ok",
194                "replayedCount": result.replayed_count,
195            })),
196        ),
197        Err(error) => (
198            StatusCode::INTERNAL_SERVER_ERROR,
199            Json(json!({
200                "error": {
201                    "code": "DEAD_LETTER_REPLAY_FAILED",
202                    "message": error.to_string(),
203                }
204            })),
205        ),
206    }
207}
208
209async fn dead_letter_purge_handler(
210    State(state): State<RuntimeServerState>,
211    body: Option<Json<DeadLetterMutationRequest>>,
212) -> impl IntoResponse {
213    let request_ids = body
214        .and_then(|body| body.0.request_ids)
215        .map(normalize_request_ids);
216    match purge_dead_letter_messages(&state.store, request_ids) {
217        Ok(result) => (
218            StatusCode::OK,
219            Json(json!({
220                "status": "ok",
221                "purgedCount": result.purged_count,
222            })),
223        ),
224        Err(error) => (
225            StatusCode::INTERNAL_SERVER_ERROR,
226            Json(json!({
227                "error": {
228                    "code": "DEAD_LETTER_PURGE_FAILED",
229                    "message": error.to_string(),
230                }
231            })),
232        ),
233    }
234}
235
/// Trims every ID and drops the ones that are empty after trimming.
///
/// Order and duplicates are preserved.
fn normalize_request_ids(request_ids: Vec<String>) -> Vec<String> {
    request_ids
        .iter()
        .map(|raw| raw.trim())
        .filter(|id| !id.is_empty())
        .map(str::to_owned)
        .collect()
}
249
#[cfg(test)]
mod tests {
    use axum::body::{Body, to_bytes};
    use axum::http::{Request, StatusCode};
    use serde_json::Value;
    use tempfile::TempDir;
    use tower::ServiceExt;

    use crate::db::SqliteStore;
    use crate::db_outbound::outbound_count;

    use super::{RuntimeServerState, create_runtime_router};

    /// Opens a fresh store at `db.sqlite3` inside `dir`; the caller keeps `dir` alive.
    fn open_store(dir: &TempDir) -> SqliteStore {
        SqliteStore::open_path(dir.path().join("db.sqlite3")).expect("open db")
    }

    /// The status route answers 200 with `"status": "ok"` when no relay is configured.
    #[tokio::test]
    async fn status_endpoint_returns_ok_payload() {
        let temp = TempDir::new().expect("temp dir");
        let app = create_runtime_router(RuntimeServerState {
            store: open_store(&temp),
            relay_sender: None,
        });

        let request = Request::builder()
            .uri("/v1/status")
            .body(Body::empty())
            .expect("request");
        let response = app.oneshot(request).await.expect("response");
        assert_eq!(response.status(), StatusCode::OK);

        let bytes = to_bytes(response.into_body(), usize::MAX)
            .await
            .expect("bytes");
        let payload: Value = serde_json::from_slice(&bytes).expect("json");
        assert_eq!(payload.get("status").and_then(Value::as_str), Some("ok"));
    }

    /// A valid outbound request is accepted with 202 and leaves one row in the queue.
    #[tokio::test]
    async fn outbound_endpoint_enqueues_message() {
        let temp = TempDir::new().expect("temp dir");
        let store = open_store(&temp);
        let app = create_runtime_router(RuntimeServerState {
            store: store.clone(),
            relay_sender: None,
        });

        let request = Request::builder()
            .method("POST")
            .uri("/v1/outbound")
            .header("content-type", "application/json")
            .body(Body::from(
                r#"{"toAgentDid":"did:cdi:registry.clawdentity.com:agent:01HF7YAT00W6W7CM7N3W5FDXT4","payload":{"hello":"world"}}"#,
            ))
            .expect("request");
        let response = app.oneshot(request).await.expect("response");

        assert_eq!(response.status(), StatusCode::ACCEPTED);
        assert_eq!(outbound_count(&store).expect("count"), 1);
    }
}