awaken-server 0.6.0

Multi-protocol HTTP server with SSE, mailbox, and protocol adapters for Awaken
Documentation
use awaken_server_contract::contract::event_store::{
    AppendOptions, CanonicalEventDraft, CanonicalEventKind, EventScope, EventStoreError,
    EventWriter,
};
use awaken_server_contract::contract::storage::{ChildThreadDeleteStrategy, StorageError};
use awaken_server_contract::thread::Thread;
use serde_json::{Value, json};

use crate::app::RunRoutesState;
use crate::services::thread_service::{
    CreateThreadOptions, DeleteThreadOptions, UpdateThreadOptions,
};

/// Creates a thread via the thread service and then records a
/// `ThreadCreated` event for it.
///
/// # Errors
///
/// Returns the [`StorageError`] produced by the thread service. Event
/// recording returns nothing, so it cannot fail this call.
pub(crate) async fn create_thread(
    state: &RunRoutesState,
    options: CreateThreadOptions,
) -> Result<Thread, StorageError> {
    let store = state.run.store();
    let created =
        crate::services::thread_service::create_thread_with_options(store.as_ref(), options)
            .await?;
    // The event is only emitted once the thread has been created successfully.
    record_thread_created(state, &created).await;
    Ok(created)
}

/// Updates an existing thread and records a `ThreadUpdated` event that carries
/// both the new state and a snapshot of the state loaded before the update.
///
/// # Errors
///
/// Returns [`StorageError::NotFound`] (carrying `thread_id`) when no thread is
/// loaded for `thread_id`, and otherwise propagates any [`StorageError`] from
/// loading or updating the thread.
pub(crate) async fn update_thread(
    state: &RunRoutesState,
    thread_id: &str,
    options: UpdateThreadOptions,
) -> Result<Thread, StorageError> {
    let store = state.run.store();
    // Snapshot the thread first so the event can describe what changed.
    let Some(previous) = store.load_thread(thread_id).await? else {
        return Err(StorageError::NotFound(thread_id.to_string()));
    };
    let updated =
        crate::services::thread_service::update_thread(store.as_ref(), thread_id, options)
            .await?;
    record_thread_updated(state, &previous, &updated).await;
    Ok(updated)
}

/// Deletes a thread and records a `ThreadDeleted` event describing the thread
/// as it was loaded before deletion, together with the child strategy used.
///
/// # Errors
///
/// Returns [`StorageError::NotFound`] (carrying `thread_id`) when no thread is
/// loaded for `thread_id`, and otherwise propagates any [`StorageError`] from
/// loading or deleting the thread.
pub(crate) async fn delete_thread(
    state: &RunRoutesState,
    thread_id: &str,
    options: DeleteThreadOptions,
) -> Result<(), StorageError> {
    let store = state.run.store();
    // Load the thread up front: its fields are needed for the event payload
    // after the delete call has run.
    let Some(doomed) = store.load_thread(thread_id).await? else {
        return Err(StorageError::NotFound(thread_id.to_string()));
    };
    crate::services::thread_service::delete_thread(store.as_ref(), thread_id, options).await?;
    record_thread_deleted(state, &doomed, options.child_strategy).await;
    Ok(())
}

/// Records a `ThreadCreated` event for `thread`.
///
/// The event uses the idempotency key `ThreadCreated/<thread id>`.
pub(crate) async fn record_thread_created(state: &RunRoutesState, thread: &Thread) {
    let payload = thread_payload(thread, None, None);
    let idempotency_key = format!("ThreadCreated/{}", thread.id);
    record_thread_event(
        state,
        "ThreadCreated",
        thread,
        payload,
        Some(idempotency_key),
    )
    .await;
}

/// Records a `ThreadUpdated` event for `after`, embedding selected fields of
/// `before` under the payload's `previous` key.
///
/// No idempotency key is attached to update events.
pub(crate) async fn record_thread_updated(state: &RunRoutesState, before: &Thread, after: &Thread) {
    let payload = thread_payload(after, Some(before), None);
    record_thread_event(state, "ThreadUpdated", after, payload, None).await;
}

/// Records a `ThreadDeleted` event for `thread`, including the
/// `child_strategy` that was supplied to the delete in the payload.
///
/// The event uses the idempotency key `ThreadDeleted/<thread id>`.
pub(crate) async fn record_thread_deleted(
    state: &RunRoutesState,
    thread: &Thread,
    child_strategy: ChildThreadDeleteStrategy,
) {
    let payload = thread_payload(thread, None, Some(child_strategy));
    let idempotency_key = format!("ThreadDeleted/{}", thread.id);
    record_thread_event(
        state,
        "ThreadDeleted",
        thread,
        payload,
        Some(idempotency_key),
    )
    .await;
}

async fn record_thread_event(
    state: &RunRoutesState,
    event_kind: &'static str,
    thread: &Thread,
    payload: Value,
    idempotency_key: Option<String>,
) {
    let Some(writer) = state
        .events
        .as_ref()
        .map(|events| events.event_store.clone())
    else {
        return;
    };
    if let Err(error) = append_thread_event(
        writer.as_ref(),
        event_kind,
        thread,
        payload,
        idempotency_key,
    )
    .await
    {
        tracing::error!(error = %error, thread_id = %thread.id, event_kind, "failed to record thread event");
    }
}

/// Builds a canonical event draft for `thread` and appends it via `writer`.
///
/// The draft is scoped by [`scopes_for_thread`], uses `"server"` as the final
/// argument to `CanonicalEventDraft::new`, and has its correlation id set to
/// the thread id. The append is issued with writer id `"thread-service"` and
/// the supplied `idempotency_key`.
///
/// # Errors
///
/// Propagates failures from constructing the event kind or the draft, and
/// from the append itself.
async fn append_thread_event(
    writer: &dyn EventWriter,
    event_kind: &'static str,
    thread: &Thread,
    payload: Value,
    idempotency_key: Option<String>,
) -> Result<(), EventStoreError> {
    let scopes = scopes_for_thread(thread);
    let kind = CanonicalEventKind::new(event_kind)?;
    let mut draft = CanonicalEventDraft::new(scopes, kind, payload, "server")?;
    draft.correlation_id = Some(thread.id.clone());

    let append_options = AppendOptions {
        writer_id: Some(String::from("thread-service")),
        idempotency_key,
        expected_prior_cursors: Default::default(),
    };
    // The value returned by a successful append is discarded.
    writer.append(draft, append_options).await?;
    Ok(())
}

/// Returns the event scopes for `thread`: a single thread scope built from
/// the thread's id.
fn scopes_for_thread(thread: &Thread) -> Vec<EventScope> {
    let thread_scope = EventScope::thread(thread.id.clone());
    vec![thread_scope]
}

fn thread_payload(
    thread: &Thread,
    before: Option<&Thread>,
    child_strategy: Option<ChildThreadDeleteStrategy>,
) -> Value {
    let mut payload = json!({
        "thread_id": thread.id,
        "resource_id": thread.resource_id,
        "parent_thread_id": thread.parent_thread_id,
        "title": thread.metadata.title,
        "created_at": thread.metadata.created_at,
        "updated_at": thread.metadata.updated_at,
    });
    if let Some(before) = before
        && let Some(map) = payload.as_object_mut()
    {
        map.insert(
            "previous".to_string(),
            json!({
                "resource_id": before.resource_id,
                "parent_thread_id": before.parent_thread_id,
                "title": before.metadata.title,
                "updated_at": before.metadata.updated_at,
            }),
        );
    }
    if let Some(child_strategy) = child_strategy
        && let Some(map) = payload.as_object_mut()
    {
        map.insert("child_strategy".to_string(), json!(child_strategy));
    }
    payload
}

#[cfg(test)]
mod tests {
    use super::*;
    use awaken_server_contract::contract::event_store::{EventReader, EventVisibility};
    use awaken_stores::InMemoryEventStore;

    /// Fixture: thread `thread-1` titled "Child", owned by `resource-a`, with
    /// parent `parent-1` and fixed created/updated timestamps.
    fn child_thread() -> Thread {
        let mut fixture = Thread::with_id("thread-1")
            .with_title("Child")
            .with_resource_id("resource-a")
            .with_parent_thread_id("parent-1");
        fixture.metadata.created_at = Some(10);
        fixture.metadata.updated_at = Some(20);
        fixture
    }

    /// An appended event is listed under the thread's scope and carries the
    /// thread's lineage fields in its payload.
    #[tokio::test]
    async fn append_thread_event_indexes_thread_scope() {
        let store = InMemoryEventStore::new();
        let thread = child_thread();
        let payload = thread_payload(&thread, None, None);
        append_thread_event(
            &store,
            "ThreadCreated",
            &thread,
            payload,
            Some("ThreadCreated/thread-1".to_string()),
        )
        .await
        .unwrap();

        let by_thread = store
            .list(EventScope::thread("thread-1"), None, 10)
            .await
            .unwrap();

        assert_eq!(by_thread.events.len(), 1);
        let recorded = &by_thread.events[0];
        assert_eq!(recorded.event_kind.as_str(), "ThreadCreated");
        assert_eq!(recorded.payload["resource_id"], "resource-a");
        assert_eq!(recorded.payload["parent_thread_id"], "parent-1");
        assert_eq!(recorded.visibility, EventVisibility::Public);
    }

    /// An update payload reports the new lineage at the top level and the old
    /// lineage under `previous`.
    #[tokio::test]
    async fn thread_update_payload_carries_previous_lineage() {
        let store = InMemoryEventStore::new();
        let original = child_thread();
        let mut changed = child_thread();
        changed.resource_id = Some("resource-b".to_string());
        changed.parent_thread_id = Some("parent-2".to_string());
        changed.metadata.updated_at = Some(30);

        let payload = thread_payload(&changed, Some(&original), None);
        append_thread_event(&store, "ThreadUpdated", &changed, payload, None)
            .await
            .unwrap();

        let page = store
            .list(EventScope::thread("thread-1"), None, 10)
            .await
            .unwrap();
        let recorded = &page.events[0];
        assert_eq!(recorded.payload["resource_id"], "resource-b");
        assert_eq!(recorded.payload["parent_thread_id"], "parent-2");
        assert_eq!(recorded.payload["previous"]["resource_id"], "resource-a");
        assert_eq!(
            recorded.payload["previous"]["parent_thread_id"],
            "parent-1"
        );
    }
}