clawdentity_core/runtime/
server.rs1use std::future::Future;
2use std::net::SocketAddr;
3
4use axum::extract::State;
5use axum::http::StatusCode;
6use axum::response::IntoResponse;
7use axum::{Json, Router, routing::get, routing::post};
8use serde::Deserialize;
9use serde_json::json;
10
11use crate::connector_client::ConnectorClientSender;
12use crate::db::SqliteStore;
13use crate::db_inbound::{dead_letter_count, list_dead_letter, pending_count};
14use crate::db_outbound::{EnqueueOutboundInput, enqueue_outbound, outbound_count};
15use crate::did::parse_agent_did;
16use crate::error::{CoreError, Result};
17use crate::runtime_relay::flush_outbound_queue_to_relay;
18use crate::runtime_replay::{purge_dead_letter_messages, replay_dead_letter_messages};
19
20#[derive(Clone)]
21pub struct RuntimeServerState {
22 pub store: SqliteStore,
23 pub relay_sender: Option<ConnectorClientSender>,
24}
25
26#[derive(Debug, Deserialize)]
27#[serde(rename_all = "camelCase")]
28struct OutboundRequest {
29 to_agent_did: String,
30 payload: serde_json::Value,
31 #[serde(default)]
32 conversation_id: Option<String>,
33 #[serde(default)]
34 reply_to: Option<String>,
35}
36
37#[derive(Debug, Deserialize)]
38#[serde(rename_all = "camelCase")]
39struct DeadLetterMutationRequest {
40 #[serde(default)]
41 request_ids: Option<Vec<String>>,
42}
43
44pub fn create_runtime_router(state: RuntimeServerState) -> Router {
46 Router::new()
47 .route("/v1/status", get(status_handler))
48 .route("/v1/outbound", post(outbound_handler))
49 .route("/v1/inbound/dead-letter", get(dead_letter_list_handler))
50 .route(
51 "/v1/inbound/dead-letter/replay",
52 post(dead_letter_replay_handler),
53 )
54 .route(
55 "/v1/inbound/dead-letter/purge",
56 post(dead_letter_purge_handler),
57 )
58 .with_state(state)
59}
60
61pub async fn run_runtime_server(
63 bind_addr: SocketAddr,
64 state: RuntimeServerState,
65 shutdown_signal: impl Future<Output = ()> + Send + 'static,
66) -> Result<()> {
67 let listener = tokio::net::TcpListener::bind(bind_addr)
68 .await
69 .map_err(|error| CoreError::Http(error.to_string()))?;
70 axum::serve(listener, create_runtime_router(state))
71 .with_graceful_shutdown(shutdown_signal)
72 .await
73 .map_err(|error| CoreError::Http(error.to_string()))?;
74 Ok(())
75}
76
77async fn status_handler(State(state): State<RuntimeServerState>) -> impl IntoResponse {
78 let outbound_pending = outbound_count(&state.store).unwrap_or(0);
79 let inbound_pending = pending_count(&state.store).unwrap_or(0);
80 let inbound_dead_letter = dead_letter_count(&state.store).unwrap_or(0);
81 let relay = state.relay_sender.as_ref();
82
83 (
84 StatusCode::OK,
85 Json(json!({
86 "status": "ok",
87 "websocket": {
88 "connected": relay.map(|sender| sender.is_connected()).unwrap_or(false),
89 "metrics": relay.map(|sender| sender.metrics_snapshot()),
90 },
91 "outbound": {
92 "queue": {
93 "pendingCount": outbound_pending,
94 },
95 },
96 "inbound": {
97 "pending": inbound_pending,
98 "deadLetter": inbound_dead_letter,
99 }
100 })),
101 )
102}
103
104#[allow(clippy::too_many_lines)]
105async fn outbound_handler(
106 State(state): State<RuntimeServerState>,
107 Json(request): Json<OutboundRequest>,
108) -> impl IntoResponse {
109 let normalized_to_agent_did = request.to_agent_did.trim().to_string();
110 if parse_agent_did(&normalized_to_agent_did).is_err() {
111 return (
112 StatusCode::BAD_REQUEST,
113 Json(json!({
114 "error": {
115 "code": "INVALID_TO_AGENT_DID",
116 "message": "toAgentDid must be a valid agent DID",
117 }
118 })),
119 );
120 }
121
122 let frame_id = ulid::Ulid::new().to_string();
123 let enqueue_result = enqueue_outbound(
124 &state.store,
125 EnqueueOutboundInput {
126 frame_id: frame_id.clone(),
127 frame_version: 1,
128 frame_type: "enqueue".to_string(),
129 to_agent_did: normalized_to_agent_did,
130 payload_json: request.payload.to_string(),
131 conversation_id: request.conversation_id,
132 reply_to: request.reply_to,
133 },
134 );
135 if let Err(error) = enqueue_result {
136 return (
137 StatusCode::INTERNAL_SERVER_ERROR,
138 Json(json!({
139 "error": {
140 "code": "OUTBOUND_PERSIST_FAILED",
141 "message": error.to_string(),
142 }
143 })),
144 );
145 }
146
147 if let Some(relay_sender) = &state.relay_sender {
148 let _ = flush_outbound_queue_to_relay(&state.store, relay_sender, 1, None).await;
149 }
150
151 (
152 StatusCode::ACCEPTED,
153 Json(json!({
154 "accepted": true,
155 "frameId": frame_id,
156 })),
157 )
158}
159
160async fn dead_letter_list_handler(State(state): State<RuntimeServerState>) -> impl IntoResponse {
161 match list_dead_letter(&state.store, 500) {
162 Ok(items) => (
163 StatusCode::OK,
164 Json(json!({
165 "status": "ok",
166 "count": items.len(),
167 "items": items,
168 })),
169 ),
170 Err(error) => (
171 StatusCode::INTERNAL_SERVER_ERROR,
172 Json(json!({
173 "error": {
174 "code": "DEAD_LETTER_LIST_FAILED",
175 "message": error.to_string(),
176 }
177 })),
178 ),
179 }
180}
181
182async fn dead_letter_replay_handler(
183 State(state): State<RuntimeServerState>,
184 body: Option<Json<DeadLetterMutationRequest>>,
185) -> impl IntoResponse {
186 let request_ids = body
187 .and_then(|body| body.0.request_ids)
188 .map(normalize_request_ids);
189 match replay_dead_letter_messages(&state.store, request_ids) {
190 Ok(result) => (
191 StatusCode::OK,
192 Json(json!({
193 "status": "ok",
194 "replayedCount": result.replayed_count,
195 })),
196 ),
197 Err(error) => (
198 StatusCode::INTERNAL_SERVER_ERROR,
199 Json(json!({
200 "error": {
201 "code": "DEAD_LETTER_REPLAY_FAILED",
202 "message": error.to_string(),
203 }
204 })),
205 ),
206 }
207}
208
209async fn dead_letter_purge_handler(
210 State(state): State<RuntimeServerState>,
211 body: Option<Json<DeadLetterMutationRequest>>,
212) -> impl IntoResponse {
213 let request_ids = body
214 .and_then(|body| body.0.request_ids)
215 .map(normalize_request_ids);
216 match purge_dead_letter_messages(&state.store, request_ids) {
217 Ok(result) => (
218 StatusCode::OK,
219 Json(json!({
220 "status": "ok",
221 "purgedCount": result.purged_count,
222 })),
223 ),
224 Err(error) => (
225 StatusCode::INTERNAL_SERVER_ERROR,
226 Json(json!({
227 "error": {
228 "code": "DEAD_LETTER_PURGE_FAILED",
229 "message": error.to_string(),
230 }
231 })),
232 ),
233 }
234}
235
/// Trims surrounding whitespace from every id and drops ids that are empty after trimming.
///
/// The relative order of the surviving ids is preserved; duplicates are kept.
fn normalize_request_ids(request_ids: Vec<String>) -> Vec<String> {
    request_ids
        .iter()
        .map(|raw| raw.trim())
        .filter(|id| !id.is_empty())
        .map(str::to_owned)
        .collect()
}
249
#[cfg(test)]
mod tests {
    use axum::body::{Body, to_bytes};
    use axum::http::{Request, StatusCode};
    use serde_json::Value;
    use tempfile::TempDir;
    use tower::ServiceExt;

    use crate::db::SqliteStore;
    use crate::db_outbound::outbound_count;

    use super::{RuntimeServerState, create_runtime_router};

    // Opens a fresh store inside `dir`; the caller keeps `dir` alive for the whole test.
    fn open_store(dir: &TempDir) -> SqliteStore {
        SqliteStore::open_path(dir.path().join("db.sqlite3")).expect("open db")
    }

    /// `GET /v1/status` answers 200 with a JSON body whose `status` is `"ok"`.
    #[tokio::test]
    async fn status_endpoint_returns_ok_payload() {
        let temp = TempDir::new().expect("temp dir");
        let state = RuntimeServerState {
            store: open_store(&temp),
            relay_sender: None,
        };
        let request = Request::builder()
            .uri("/v1/status")
            .body(Body::empty())
            .expect("request");

        let response = create_runtime_router(state)
            .oneshot(request)
            .await
            .expect("response");
        assert_eq!(response.status(), StatusCode::OK);

        let bytes = to_bytes(response.into_body(), usize::MAX)
            .await
            .expect("bytes");
        let payload: Value = serde_json::from_slice(&bytes).expect("json");
        assert_eq!(payload.get("status").and_then(Value::as_str), Some("ok"));
    }

    /// `POST /v1/outbound` with a valid DID answers 202 and leaves one queued row.
    #[tokio::test]
    async fn outbound_endpoint_enqueues_message() {
        let temp = TempDir::new().expect("temp dir");
        let store = open_store(&temp);
        let state = RuntimeServerState {
            store: store.clone(),
            relay_sender: None,
        };
        let request = Request::builder()
            .method("POST")
            .uri("/v1/outbound")
            .header("content-type", "application/json")
            .body(Body::from(
                "{\"toAgentDid\":\"did:cdi:registry.clawdentity.com:agent:01HF7YAT00W6W7CM7N3W5FDXT4\",\"payload\":{\"hello\":\"world\"}}",
            ))
            .expect("request");

        let response = create_runtime_router(state)
            .oneshot(request)
            .await
            .expect("response");

        assert_eq!(response.status(), StatusCode::ACCEPTED);
        assert_eq!(outbound_count(&store).expect("count"), 1);
    }
}