clawdentity_core/runtime/
server.rs1use std::future::Future;
2use std::net::SocketAddr;
3
4use axum::extract::State;
5use axum::http::StatusCode;
6use axum::response::IntoResponse;
7use axum::{Json, Router, routing::get, routing::post};
8use serde::Deserialize;
9use serde_json::json;
10
11use crate::connector_client::ConnectorClientSender;
12use crate::db::SqliteStore;
13use crate::db_inbound::{dead_letter_count, list_dead_letter, pending_count};
14use crate::db_outbound::{EnqueueOutboundInput, enqueue_outbound, outbound_count};
15use crate::did::parse_agent_did;
16use crate::error::{CoreError, Result};
17use crate::runtime_relay::flush_outbound_queue_to_relay;
18use crate::runtime_replay::{purge_dead_letter_messages, replay_dead_letter_messages};
19
20#[derive(Clone)]
21pub struct RuntimeServerState {
22 pub store: SqliteStore,
23 pub relay_sender: Option<ConnectorClientSender>,
24}
25
26#[derive(Debug, Deserialize)]
27#[serde(rename_all = "camelCase")]
28struct OutboundRequest {
29 #[serde(default)]
30 to_agent_did: Option<String>,
31 #[serde(default)]
32 peer_did: Option<String>,
33 payload: serde_json::Value,
34 #[serde(default)]
35 conversation_id: Option<String>,
36 #[serde(default)]
37 reply_to: Option<String>,
38}
39
40#[derive(Debug, Deserialize)]
41#[serde(rename_all = "camelCase")]
42struct DeadLetterMutationRequest {
43 #[serde(default)]
44 request_ids: Option<Vec<String>>,
45}
46
47pub fn create_runtime_router(state: RuntimeServerState) -> Router {
49 Router::new()
50 .route("/v1/status", get(status_handler))
51 .route("/v1/outbound", post(outbound_handler))
52 .route("/v1/inbound/dead-letter", get(dead_letter_list_handler))
53 .route(
54 "/v1/inbound/dead-letter/replay",
55 post(dead_letter_replay_handler),
56 )
57 .route(
58 "/v1/inbound/dead-letter/purge",
59 post(dead_letter_purge_handler),
60 )
61 .with_state(state)
62}
63
64pub async fn run_runtime_server(
66 bind_addr: SocketAddr,
67 state: RuntimeServerState,
68 shutdown_signal: impl Future<Output = ()> + Send + 'static,
69) -> Result<()> {
70 let listener = tokio::net::TcpListener::bind(bind_addr)
71 .await
72 .map_err(|error| CoreError::Http(error.to_string()))?;
73 axum::serve(listener, create_runtime_router(state))
74 .with_graceful_shutdown(shutdown_signal)
75 .await
76 .map_err(|error| CoreError::Http(error.to_string()))?;
77 Ok(())
78}
79
80async fn status_handler(State(state): State<RuntimeServerState>) -> impl IntoResponse {
81 let outbound_pending = outbound_count(&state.store).unwrap_or(0);
82 let inbound_pending = pending_count(&state.store).unwrap_or(0);
83 let inbound_dead_letter = dead_letter_count(&state.store).unwrap_or(0);
84 let relay = state.relay_sender.as_ref();
85
86 (
87 StatusCode::OK,
88 Json(json!({
89 "status": "ok",
90 "websocket": {
91 "connected": relay.map(|sender| sender.is_connected()).unwrap_or(false),
92 "metrics": relay.map(|sender| sender.metrics_snapshot()),
93 },
94 "outbound": {
95 "queue": {
96 "pendingCount": outbound_pending,
97 },
98 },
99 "inbound": {
100 "pending": inbound_pending,
101 "deadLetter": inbound_dead_letter,
102 }
103 })),
104 )
105}
106
107#[allow(clippy::too_many_lines)]
108async fn outbound_handler(
109 State(state): State<RuntimeServerState>,
110 Json(request): Json<OutboundRequest>,
111) -> impl IntoResponse {
112 let Some(normalized_to_agent_did) = request.normalized_to_agent_did() else {
113 return (
114 StatusCode::BAD_REQUEST,
115 Json(json!({
116 "error": {
117 "code": "INVALID_TO_AGENT_DID",
118 "message": "toAgentDid or peerDid must be a valid agent DID",
119 }
120 })),
121 );
122 };
123 if parse_agent_did(&normalized_to_agent_did).is_err() {
124 return (
125 StatusCode::BAD_REQUEST,
126 Json(json!({
127 "error": {
128 "code": "INVALID_TO_AGENT_DID",
129 "message": "toAgentDid or peerDid must be a valid agent DID",
130 }
131 })),
132 );
133 }
134
135 let frame_id = ulid::Ulid::new().to_string();
136 let enqueue_result = enqueue_outbound(
137 &state.store,
138 EnqueueOutboundInput {
139 frame_id: frame_id.clone(),
140 frame_version: 1,
141 frame_type: "enqueue".to_string(),
142 to_agent_did: normalized_to_agent_did,
143 payload_json: request.payload.to_string(),
144 conversation_id: request.conversation_id,
145 reply_to: request.reply_to,
146 },
147 );
148 if let Err(error) = enqueue_result {
149 return (
150 StatusCode::INTERNAL_SERVER_ERROR,
151 Json(json!({
152 "error": {
153 "code": "OUTBOUND_PERSIST_FAILED",
154 "message": error.to_string(),
155 }
156 })),
157 );
158 }
159
160 if let Some(relay_sender) = &state.relay_sender {
161 let _ = flush_outbound_queue_to_relay(&state.store, relay_sender, 1, None).await;
162 }
163
164 (
165 StatusCode::ACCEPTED,
166 Json(json!({
167 "accepted": true,
168 "frameId": frame_id,
169 })),
170 )
171}
172
173async fn dead_letter_list_handler(State(state): State<RuntimeServerState>) -> impl IntoResponse {
174 match list_dead_letter(&state.store, 500) {
175 Ok(items) => (
176 StatusCode::OK,
177 Json(json!({
178 "status": "ok",
179 "count": items.len(),
180 "items": items,
181 })),
182 ),
183 Err(error) => (
184 StatusCode::INTERNAL_SERVER_ERROR,
185 Json(json!({
186 "error": {
187 "code": "DEAD_LETTER_LIST_FAILED",
188 "message": error.to_string(),
189 }
190 })),
191 ),
192 }
193}
194
195async fn dead_letter_replay_handler(
196 State(state): State<RuntimeServerState>,
197 body: Option<Json<DeadLetterMutationRequest>>,
198) -> impl IntoResponse {
199 let request_ids = body
200 .and_then(|body| body.0.request_ids)
201 .map(normalize_request_ids);
202 match replay_dead_letter_messages(&state.store, request_ids) {
203 Ok(result) => (
204 StatusCode::OK,
205 Json(json!({
206 "status": "ok",
207 "replayedCount": result.replayed_count,
208 })),
209 ),
210 Err(error) => (
211 StatusCode::INTERNAL_SERVER_ERROR,
212 Json(json!({
213 "error": {
214 "code": "DEAD_LETTER_REPLAY_FAILED",
215 "message": error.to_string(),
216 }
217 })),
218 ),
219 }
220}
221
222async fn dead_letter_purge_handler(
223 State(state): State<RuntimeServerState>,
224 body: Option<Json<DeadLetterMutationRequest>>,
225) -> impl IntoResponse {
226 let request_ids = body
227 .and_then(|body| body.0.request_ids)
228 .map(normalize_request_ids);
229 match purge_dead_letter_messages(&state.store, request_ids) {
230 Ok(result) => (
231 StatusCode::OK,
232 Json(json!({
233 "status": "ok",
234 "purgedCount": result.purged_count,
235 })),
236 ),
237 Err(error) => (
238 StatusCode::INTERNAL_SERVER_ERROR,
239 Json(json!({
240 "error": {
241 "code": "DEAD_LETTER_PURGE_FAILED",
242 "message": error.to_string(),
243 }
244 })),
245 ),
246 }
247}
248
249fn normalize_request_ids(request_ids: Vec<String>) -> Vec<String> {
250 request_ids
251 .into_iter()
252 .filter_map(|value| {
253 let trimmed = value.trim();
254 if trimmed.is_empty() {
255 None
256 } else {
257 Some(trimmed.to_string())
258 }
259 })
260 .collect()
261}
262
263impl OutboundRequest {
264 fn normalized_to_agent_did(&self) -> Option<String> {
265 self.to_agent_did
266 .as_deref()
267 .or(self.peer_did.as_deref())
268 .map(str::trim)
269 .filter(|value| !value.is_empty())
270 .map(ToOwned::to_owned)
271 }
272}
273
274#[cfg(test)]
275mod tests {
276 use axum::body::{Body, to_bytes};
277 use axum::http::{Request, StatusCode};
278 use serde_json::Value;
279 use tempfile::TempDir;
280 use tower::ServiceExt;
281
282 use crate::db::SqliteStore;
283 use crate::db_outbound::outbound_count;
284
285 use super::{RuntimeServerState, create_runtime_router};
286
287 #[tokio::test]
288 async fn status_endpoint_returns_ok_payload() {
289 let temp = TempDir::new().expect("temp dir");
290 let store = SqliteStore::open_path(temp.path().join("db.sqlite3")).expect("open db");
291 let app = create_runtime_router(RuntimeServerState {
292 store,
293 relay_sender: None,
294 });
295
296 let response = app
297 .oneshot(
298 Request::builder()
299 .uri("/v1/status")
300 .body(Body::empty())
301 .expect("request"),
302 )
303 .await
304 .expect("response");
305 assert_eq!(response.status(), StatusCode::OK);
306 let body = to_bytes(response.into_body(), usize::MAX)
307 .await
308 .expect("bytes");
309 let payload: Value = serde_json::from_slice(&body).expect("json");
310 assert_eq!(
311 payload.get("status").and_then(|value| value.as_str()),
312 Some("ok")
313 );
314 }
315
316 #[tokio::test]
317 async fn outbound_endpoint_enqueues_message() {
318 let temp = TempDir::new().expect("temp dir");
319 let store = SqliteStore::open_path(temp.path().join("db.sqlite3")).expect("open db");
320 let app = create_runtime_router(RuntimeServerState {
321 store: store.clone(),
322 relay_sender: None,
323 });
324
325 let response = app
326 .oneshot(
327 Request::builder()
328 .method("POST")
329 .uri("/v1/outbound")
330 .header("content-type", "application/json")
331 .body(Body::from(
332 "{\"toAgentDid\":\"did:cdi:registry.clawdentity.com:agent:01HF7YAT00W6W7CM7N3W5FDXT4\",\"payload\":{\"hello\":\"world\"}}",
333 ))
334 .expect("request"),
335 )
336 .await
337 .expect("response");
338 assert_eq!(response.status(), StatusCode::ACCEPTED);
339 assert_eq!(outbound_count(&store).expect("count"), 1);
340 }
341
342 #[tokio::test]
343 async fn outbound_endpoint_accepts_legacy_peer_did_payload() {
344 let temp = TempDir::new().expect("temp dir");
345 let store = SqliteStore::open_path(temp.path().join("db.sqlite3")).expect("open db");
346 let app = create_runtime_router(RuntimeServerState {
347 store: store.clone(),
348 relay_sender: None,
349 });
350
351 let response = app
352 .oneshot(
353 Request::builder()
354 .method("POST")
355 .uri("/v1/outbound")
356 .header("content-type", "application/json")
357 .body(Body::from(
358 "{\"peer\":\"beta\",\"peerDid\":\"did:cdi:registry.clawdentity.com:agent:01HF7YAT00W6W7CM7N3W5FDXT4\",\"peerProxyUrl\":\"https://example.test/hooks/agent\",\"payload\":{\"hello\":\"world\"}}",
359 ))
360 .expect("request"),
361 )
362 .await
363 .expect("response");
364 assert_eq!(response.status(), StatusCode::ACCEPTED);
365 assert_eq!(outbound_count(&store).expect("count"), 1);
366 }
367}