clawdentity-core 0.1.7

Core Rust library for Clawdentity identity, registry auth, relay, connector, and provider flows.
Documentation
use std::future::Future;
use std::net::SocketAddr;

use axum::extract::State;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::{Json, Router, routing::get, routing::post};
use serde::Deserialize;
use serde_json::json;

use crate::connector_client::ConnectorClientSender;
use crate::db::SqliteStore;
use crate::db_inbound::{dead_letter_count, list_dead_letter, pending_count};
use crate::db_outbound::{EnqueueOutboundInput, enqueue_outbound, outbound_count};
use crate::did::parse_agent_did;
use crate::error::{CoreError, Result};
use crate::runtime_relay::flush_outbound_queue_to_relay;
use crate::runtime_replay::{purge_dead_letter_messages, replay_dead_letter_messages};

#[derive(Clone)]
pub struct RuntimeServerState {
    pub store: SqliteStore,
    pub relay_sender: Option<ConnectorClientSender>,
}

#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct OutboundRequest {
    #[serde(default)]
    to_agent_did: Option<String>,
    #[serde(default)]
    peer_did: Option<String>,
    payload: serde_json::Value,
    #[serde(default)]
    conversation_id: Option<String>,
    #[serde(default)]
    reply_to: Option<String>,
}

#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct DeadLetterMutationRequest {
    #[serde(default)]
    request_ids: Option<Vec<String>>,
}

/// TODO(clawdentity): document `create_runtime_router`.
pub fn create_runtime_router(state: RuntimeServerState) -> Router {
    Router::new()
        .route("/v1/status", get(status_handler))
        .route("/v1/outbound", post(outbound_handler))
        .route("/v1/inbound/dead-letter", get(dead_letter_list_handler))
        .route(
            "/v1/inbound/dead-letter/replay",
            post(dead_letter_replay_handler),
        )
        .route(
            "/v1/inbound/dead-letter/purge",
            post(dead_letter_purge_handler),
        )
        .with_state(state)
}

/// TODO(clawdentity): document `run_runtime_server`.
pub async fn run_runtime_server(
    bind_addr: SocketAddr,
    state: RuntimeServerState,
    shutdown_signal: impl Future<Output = ()> + Send + 'static,
) -> Result<()> {
    let listener = tokio::net::TcpListener::bind(bind_addr)
        .await
        .map_err(|error| CoreError::Http(error.to_string()))?;
    axum::serve(listener, create_runtime_router(state))
        .with_graceful_shutdown(shutdown_signal)
        .await
        .map_err(|error| CoreError::Http(error.to_string()))?;
    Ok(())
}

async fn status_handler(State(state): State<RuntimeServerState>) -> impl IntoResponse {
    let outbound_pending = outbound_count(&state.store).unwrap_or(0);
    let inbound_pending = pending_count(&state.store).unwrap_or(0);
    let inbound_dead_letter = dead_letter_count(&state.store).unwrap_or(0);
    let relay = state.relay_sender.as_ref();

    (
        StatusCode::OK,
        Json(json!({
            "status": "ok",
            "websocket": {
                "connected": relay.map(|sender| sender.is_connected()).unwrap_or(false),
                "metrics": relay.map(|sender| sender.metrics_snapshot()),
            },
            "outbound": {
                "queue": {
                    "pendingCount": outbound_pending,
                },
            },
            "inbound": {
                "pending": inbound_pending,
                "deadLetter": inbound_dead_letter,
            }
        })),
    )
}

#[allow(clippy::too_many_lines)]
async fn outbound_handler(
    State(state): State<RuntimeServerState>,
    Json(request): Json<OutboundRequest>,
) -> impl IntoResponse {
    let Some(normalized_to_agent_did) = request.normalized_to_agent_did() else {
        return (
            StatusCode::BAD_REQUEST,
            Json(json!({
                "error": {
                    "code": "INVALID_TO_AGENT_DID",
                    "message": "toAgentDid or peerDid must be a valid agent DID",
                }
            })),
        );
    };
    if parse_agent_did(&normalized_to_agent_did).is_err() {
        return (
            StatusCode::BAD_REQUEST,
            Json(json!({
                "error": {
                    "code": "INVALID_TO_AGENT_DID",
                    "message": "toAgentDid or peerDid must be a valid agent DID",
                }
            })),
        );
    }

    let frame_id = ulid::Ulid::new().to_string();
    let enqueue_result = enqueue_outbound(
        &state.store,
        EnqueueOutboundInput {
            frame_id: frame_id.clone(),
            frame_version: 1,
            frame_type: "enqueue".to_string(),
            to_agent_did: normalized_to_agent_did,
            payload_json: request.payload.to_string(),
            conversation_id: request.conversation_id,
            reply_to: request.reply_to,
        },
    );
    if let Err(error) = enqueue_result {
        return (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(json!({
                "error": {
                    "code": "OUTBOUND_PERSIST_FAILED",
                    "message": error.to_string(),
                }
            })),
        );
    }

    if let Some(relay_sender) = &state.relay_sender {
        let _ = flush_outbound_queue_to_relay(&state.store, relay_sender, 1, None).await;
    }

    (
        StatusCode::ACCEPTED,
        Json(json!({
            "accepted": true,
            "frameId": frame_id,
        })),
    )
}

async fn dead_letter_list_handler(State(state): State<RuntimeServerState>) -> impl IntoResponse {
    match list_dead_letter(&state.store, 500) {
        Ok(items) => (
            StatusCode::OK,
            Json(json!({
                "status": "ok",
                "count": items.len(),
                "items": items,
            })),
        ),
        Err(error) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(json!({
                "error": {
                    "code": "DEAD_LETTER_LIST_FAILED",
                    "message": error.to_string(),
                }
            })),
        ),
    }
}

async fn dead_letter_replay_handler(
    State(state): State<RuntimeServerState>,
    body: Option<Json<DeadLetterMutationRequest>>,
) -> impl IntoResponse {
    let request_ids = body
        .and_then(|body| body.0.request_ids)
        .map(normalize_request_ids);
    match replay_dead_letter_messages(&state.store, request_ids) {
        Ok(result) => (
            StatusCode::OK,
            Json(json!({
                "status": "ok",
                "replayedCount": result.replayed_count,
            })),
        ),
        Err(error) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(json!({
                "error": {
                    "code": "DEAD_LETTER_REPLAY_FAILED",
                    "message": error.to_string(),
                }
            })),
        ),
    }
}

async fn dead_letter_purge_handler(
    State(state): State<RuntimeServerState>,
    body: Option<Json<DeadLetterMutationRequest>>,
) -> impl IntoResponse {
    let request_ids = body
        .and_then(|body| body.0.request_ids)
        .map(normalize_request_ids);
    match purge_dead_letter_messages(&state.store, request_ids) {
        Ok(result) => (
            StatusCode::OK,
            Json(json!({
                "status": "ok",
                "purgedCount": result.purged_count,
            })),
        ),
        Err(error) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(json!({
                "error": {
                    "code": "DEAD_LETTER_PURGE_FAILED",
                    "message": error.to_string(),
                }
            })),
        ),
    }
}

fn normalize_request_ids(request_ids: Vec<String>) -> Vec<String> {
    request_ids
        .into_iter()
        .filter_map(|value| {
            let trimmed = value.trim();
            if trimmed.is_empty() {
                None
            } else {
                Some(trimmed.to_string())
            }
        })
        .collect()
}

impl OutboundRequest {
    fn normalized_to_agent_did(&self) -> Option<String> {
        self.to_agent_did
            .as_deref()
            .or(self.peer_did.as_deref())
            .map(str::trim)
            .filter(|value| !value.is_empty())
            .map(ToOwned::to_owned)
    }
}

#[cfg(test)]
mod tests {
    use axum::body::{Body, to_bytes};
    use axum::http::{Request, StatusCode};
    use serde_json::Value;
    use tempfile::TempDir;
    use tower::ServiceExt;

    use crate::db::SqliteStore;
    use crate::db_outbound::outbound_count;

    use super::{RuntimeServerState, create_runtime_router};

    #[tokio::test]
    async fn status_endpoint_returns_ok_payload() {
        let temp = TempDir::new().expect("temp dir");
        let store = SqliteStore::open_path(temp.path().join("db.sqlite3")).expect("open db");
        let app = create_runtime_router(RuntimeServerState {
            store,
            relay_sender: None,
        });

        let response = app
            .oneshot(
                Request::builder()
                    .uri("/v1/status")
                    .body(Body::empty())
                    .expect("request"),
            )
            .await
            .expect("response");
        assert_eq!(response.status(), StatusCode::OK);
        let body = to_bytes(response.into_body(), usize::MAX)
            .await
            .expect("bytes");
        let payload: Value = serde_json::from_slice(&body).expect("json");
        assert_eq!(
            payload.get("status").and_then(|value| value.as_str()),
            Some("ok")
        );
    }

    #[tokio::test]
    async fn outbound_endpoint_enqueues_message() {
        let temp = TempDir::new().expect("temp dir");
        let store = SqliteStore::open_path(temp.path().join("db.sqlite3")).expect("open db");
        let app = create_runtime_router(RuntimeServerState {
            store: store.clone(),
            relay_sender: None,
        });

        let response = app
            .oneshot(
                Request::builder()
                    .method("POST")
                    .uri("/v1/outbound")
                    .header("content-type", "application/json")
                    .body(Body::from(
                        "{\"toAgentDid\":\"did:cdi:registry.clawdentity.com:agent:01HF7YAT00W6W7CM7N3W5FDXT4\",\"payload\":{\"hello\":\"world\"}}",
                    ))
                    .expect("request"),
            )
            .await
            .expect("response");
        assert_eq!(response.status(), StatusCode::ACCEPTED);
        assert_eq!(outbound_count(&store).expect("count"), 1);
    }

    #[tokio::test]
    async fn outbound_endpoint_accepts_legacy_peer_did_payload() {
        let temp = TempDir::new().expect("temp dir");
        let store = SqliteStore::open_path(temp.path().join("db.sqlite3")).expect("open db");
        let app = create_runtime_router(RuntimeServerState {
            store: store.clone(),
            relay_sender: None,
        });

        let response = app
            .oneshot(
                Request::builder()
                    .method("POST")
                    .uri("/v1/outbound")
                    .header("content-type", "application/json")
                    .body(Body::from(
                        "{\"peer\":\"beta\",\"peerDid\":\"did:cdi:registry.clawdentity.com:agent:01HF7YAT00W6W7CM7N3W5FDXT4\",\"peerProxyUrl\":\"https://example.test/hooks/agent\",\"payload\":{\"hello\":\"world\"}}",
                    ))
                    .expect("request"),
            )
            .await
            .expect("response");
        assert_eq!(response.status(), StatusCode::ACCEPTED);
        assert_eq!(outbound_count(&store).expect("count"), 1);
    }
}