// clawdentity_core/runtime/server.rs

1use std::future::Future;
2use std::net::SocketAddr;
3
4use axum::extract::State;
5use axum::http::StatusCode;
6use axum::response::IntoResponse;
7use axum::{Json, Router, routing::get, routing::post};
8use serde::Deserialize;
9use serde_json::json;
10
11use crate::connector_client::ConnectorClientSender;
12use crate::db::SqliteStore;
13use crate::db_inbound::{dead_letter_count, list_dead_letter, pending_count};
14use crate::db_outbound::{EnqueueOutboundInput, enqueue_outbound, outbound_count};
15use crate::did::parse_agent_did;
16use crate::error::{CoreError, Result};
17use crate::runtime_relay::flush_outbound_queue_to_relay;
18use crate::runtime_replay::{purge_dead_letter_messages, replay_dead_letter_messages};
19
20#[derive(Clone)]
21pub struct RuntimeServerState {
22    pub store: SqliteStore,
23    pub relay_sender: Option<ConnectorClientSender>,
24}
25
26#[derive(Debug, Deserialize)]
27#[serde(rename_all = "camelCase")]
28struct OutboundRequest {
29    #[serde(default)]
30    to_agent_did: Option<String>,
31    #[serde(default)]
32    peer_did: Option<String>,
33    payload: serde_json::Value,
34    #[serde(default)]
35    conversation_id: Option<String>,
36    #[serde(default)]
37    reply_to: Option<String>,
38}
39
40#[derive(Debug, Deserialize)]
41#[serde(rename_all = "camelCase")]
42struct DeadLetterMutationRequest {
43    #[serde(default)]
44    request_ids: Option<Vec<String>>,
45}
46
47/// TODO(clawdentity): document `create_runtime_router`.
48pub fn create_runtime_router(state: RuntimeServerState) -> Router {
49    Router::new()
50        .route("/v1/status", get(status_handler))
51        .route("/v1/outbound", post(outbound_handler))
52        .route("/v1/inbound/dead-letter", get(dead_letter_list_handler))
53        .route(
54            "/v1/inbound/dead-letter/replay",
55            post(dead_letter_replay_handler),
56        )
57        .route(
58            "/v1/inbound/dead-letter/purge",
59            post(dead_letter_purge_handler),
60        )
61        .with_state(state)
62}
63
64/// TODO(clawdentity): document `run_runtime_server`.
65pub async fn run_runtime_server(
66    bind_addr: SocketAddr,
67    state: RuntimeServerState,
68    shutdown_signal: impl Future<Output = ()> + Send + 'static,
69) -> Result<()> {
70    let listener = tokio::net::TcpListener::bind(bind_addr)
71        .await
72        .map_err(|error| CoreError::Http(error.to_string()))?;
73    axum::serve(listener, create_runtime_router(state))
74        .with_graceful_shutdown(shutdown_signal)
75        .await
76        .map_err(|error| CoreError::Http(error.to_string()))?;
77    Ok(())
78}
79
80async fn status_handler(State(state): State<RuntimeServerState>) -> impl IntoResponse {
81    let outbound_pending = outbound_count(&state.store).unwrap_or(0);
82    let inbound_pending = pending_count(&state.store).unwrap_or(0);
83    let inbound_dead_letter = dead_letter_count(&state.store).unwrap_or(0);
84    let relay = state.relay_sender.as_ref();
85
86    (
87        StatusCode::OK,
88        Json(json!({
89            "status": "ok",
90            "websocket": {
91                "connected": relay.map(|sender| sender.is_connected()).unwrap_or(false),
92                "metrics": relay.map(|sender| sender.metrics_snapshot()),
93            },
94            "outbound": {
95                "queue": {
96                    "pendingCount": outbound_pending,
97                },
98            },
99            "inbound": {
100                "pending": inbound_pending,
101                "deadLetter": inbound_dead_letter,
102            }
103        })),
104    )
105}
106
107#[allow(clippy::too_many_lines)]
108async fn outbound_handler(
109    State(state): State<RuntimeServerState>,
110    Json(request): Json<OutboundRequest>,
111) -> impl IntoResponse {
112    let Some(normalized_to_agent_did) = request.normalized_to_agent_did() else {
113        return (
114            StatusCode::BAD_REQUEST,
115            Json(json!({
116                "error": {
117                    "code": "INVALID_TO_AGENT_DID",
118                    "message": "toAgentDid or peerDid must be a valid agent DID",
119                }
120            })),
121        );
122    };
123    if parse_agent_did(&normalized_to_agent_did).is_err() {
124        return (
125            StatusCode::BAD_REQUEST,
126            Json(json!({
127                "error": {
128                    "code": "INVALID_TO_AGENT_DID",
129                    "message": "toAgentDid or peerDid must be a valid agent DID",
130                }
131            })),
132        );
133    }
134
135    let frame_id = ulid::Ulid::new().to_string();
136    let enqueue_result = enqueue_outbound(
137        &state.store,
138        EnqueueOutboundInput {
139            frame_id: frame_id.clone(),
140            frame_version: 1,
141            frame_type: "enqueue".to_string(),
142            to_agent_did: normalized_to_agent_did,
143            payload_json: request.payload.to_string(),
144            conversation_id: request.conversation_id,
145            reply_to: request.reply_to,
146        },
147    );
148    if let Err(error) = enqueue_result {
149        return (
150            StatusCode::INTERNAL_SERVER_ERROR,
151            Json(json!({
152                "error": {
153                    "code": "OUTBOUND_PERSIST_FAILED",
154                    "message": error.to_string(),
155                }
156            })),
157        );
158    }
159
160    if let Some(relay_sender) = &state.relay_sender {
161        let _ = flush_outbound_queue_to_relay(&state.store, relay_sender, 1, None).await;
162    }
163
164    (
165        StatusCode::ACCEPTED,
166        Json(json!({
167            "accepted": true,
168            "frameId": frame_id,
169        })),
170    )
171}
172
173async fn dead_letter_list_handler(State(state): State<RuntimeServerState>) -> impl IntoResponse {
174    match list_dead_letter(&state.store, 500) {
175        Ok(items) => (
176            StatusCode::OK,
177            Json(json!({
178                "status": "ok",
179                "count": items.len(),
180                "items": items,
181            })),
182        ),
183        Err(error) => (
184            StatusCode::INTERNAL_SERVER_ERROR,
185            Json(json!({
186                "error": {
187                    "code": "DEAD_LETTER_LIST_FAILED",
188                    "message": error.to_string(),
189                }
190            })),
191        ),
192    }
193}
194
195async fn dead_letter_replay_handler(
196    State(state): State<RuntimeServerState>,
197    body: Option<Json<DeadLetterMutationRequest>>,
198) -> impl IntoResponse {
199    let request_ids = body
200        .and_then(|body| body.0.request_ids)
201        .map(normalize_request_ids);
202    match replay_dead_letter_messages(&state.store, request_ids) {
203        Ok(result) => (
204            StatusCode::OK,
205            Json(json!({
206                "status": "ok",
207                "replayedCount": result.replayed_count,
208            })),
209        ),
210        Err(error) => (
211            StatusCode::INTERNAL_SERVER_ERROR,
212            Json(json!({
213                "error": {
214                    "code": "DEAD_LETTER_REPLAY_FAILED",
215                    "message": error.to_string(),
216                }
217            })),
218        ),
219    }
220}
221
222async fn dead_letter_purge_handler(
223    State(state): State<RuntimeServerState>,
224    body: Option<Json<DeadLetterMutationRequest>>,
225) -> impl IntoResponse {
226    let request_ids = body
227        .and_then(|body| body.0.request_ids)
228        .map(normalize_request_ids);
229    match purge_dead_letter_messages(&state.store, request_ids) {
230        Ok(result) => (
231            StatusCode::OK,
232            Json(json!({
233                "status": "ok",
234                "purgedCount": result.purged_count,
235            })),
236        ),
237        Err(error) => (
238            StatusCode::INTERNAL_SERVER_ERROR,
239            Json(json!({
240                "error": {
241                    "code": "DEAD_LETTER_PURGE_FAILED",
242                    "message": error.to_string(),
243                }
244            })),
245        ),
246    }
247}
248
/// Trims surrounding whitespace from every id and drops ids that are empty
/// after trimming. Order is preserved and duplicates are kept.
fn normalize_request_ids(request_ids: Vec<String>) -> Vec<String> {
    let mut normalized = Vec::new();
    for raw in request_ids {
        let id = raw.trim();
        if !id.is_empty() {
            normalized.push(id.to_string());
        }
    }
    normalized
}
262
263impl OutboundRequest {
264    fn normalized_to_agent_did(&self) -> Option<String> {
265        self.to_agent_did
266            .as_deref()
267            .or(self.peer_did.as_deref())
268            .map(str::trim)
269            .filter(|value| !value.is_empty())
270            .map(ToOwned::to_owned)
271    }
272}
273
#[cfg(test)]
mod tests {
    use axum::body::{Body, to_bytes};
    use axum::http::{Request, StatusCode};
    use serde_json::Value;
    use tempfile::TempDir;
    use tower::ServiceExt;

    use crate::db::SqliteStore;
    use crate::db_outbound::outbound_count;

    use super::{RuntimeServerState, create_runtime_router};

    /// Opens a store inside a fresh temporary directory. The `TempDir` is
    /// returned so the caller keeps the directory alive for the whole test.
    fn temp_store() -> (TempDir, SqliteStore) {
        let temp = TempDir::new().expect("temp dir");
        let store = SqliteStore::open_path(temp.path().join("db.sqlite3")).expect("open db");
        (temp, store)
    }

    /// Posts `body` as JSON to `/v1/outbound` on a relay-less router backed by
    /// a clone of `store`, and returns the response status.
    async fn post_outbound(store: &SqliteStore, body: &'static str) -> StatusCode {
        let app = create_runtime_router(RuntimeServerState {
            store: store.clone(),
            relay_sender: None,
        });
        let request = Request::builder()
            .method("POST")
            .uri("/v1/outbound")
            .header("content-type", "application/json")
            .body(Body::from(body))
            .expect("request");

        app.oneshot(request).await.expect("response").status()
    }

    #[tokio::test]
    async fn status_endpoint_returns_ok_payload() {
        let (_temp, store) = temp_store();
        let app = create_runtime_router(RuntimeServerState {
            store,
            relay_sender: None,
        });
        let request = Request::builder()
            .uri("/v1/status")
            .body(Body::empty())
            .expect("request");

        let response = app.oneshot(request).await.expect("response");
        assert_eq!(response.status(), StatusCode::OK);

        let bytes = to_bytes(response.into_body(), usize::MAX)
            .await
            .expect("bytes");
        let payload: Value = serde_json::from_slice(&bytes).expect("json");
        assert_eq!(payload.get("status").and_then(Value::as_str), Some("ok"));
    }

    #[tokio::test]
    async fn outbound_endpoint_enqueues_message() {
        let (_temp, store) = temp_store();

        let status = post_outbound(
            &store,
            r#"{"toAgentDid":"did:cdi:registry.clawdentity.com:agent:01HF7YAT00W6W7CM7N3W5FDXT4","payload":{"hello":"world"}}"#,
        )
        .await;

        assert_eq!(status, StatusCode::ACCEPTED);
        assert_eq!(outbound_count(&store).expect("count"), 1);
    }

    #[tokio::test]
    async fn outbound_endpoint_accepts_legacy_peer_did_payload() {
        let (_temp, store) = temp_store();

        // Legacy clients send `peerDid` plus extra fields the handler ignores.
        let status = post_outbound(
            &store,
            r#"{"peer":"beta","peerDid":"did:cdi:registry.clawdentity.com:agent:01HF7YAT00W6W7CM7N3W5FDXT4","peerProxyUrl":"https://example.test/hooks/agent","payload":{"hello":"world"}}"#,
        )
        .await;

        assert_eq!(status, StatusCode::ACCEPTED);
        assert_eq!(outbound_count(&store).expect("count"), 1);
    }
}