cqrs-rust-lib 0.6.0

An opinionated implementation of CQRS/Event Sourcing with pluggable storage backends (InMemory, PostgreSQL, MongoDB)
Documentation
use crate::event::Event;
use crate::read::Paged;
use crate::{Aggregate, CqrsContext, DynEventStore, EventEnvelope};
use axum::extract::{Path, Query, State};
use axum::response::IntoResponse;
use axum::routing::get;
use axum::{Extension, Json};
use chrono::{DateTime, Utc};
use http::StatusCode;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use utoipa::openapi::path::ParameterIn;
use utoipa::openapi::{HttpMethod, Ref, RefOr};
use utoipa::{IntoParams, PartialSchema, ToSchema};
use utoipa_axum::router::{OpenApiRouter, UtoipaMethodRouter};

use crate::rest::helpers;

#[derive(Clone, Debug, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct AuditLogEntry {
    pub id: String,
    pub aggregate_id: String,
    pub version: usize,
    pub event_type: String,
    pub metadata: HashMap<String, String>,
    #[schema(value_type = String)]
    pub at: DateTime<Utc>,
}

impl<A: Aggregate> From<EventEnvelope<A>> for AuditLogEntry {
    fn from(envelope: EventEnvelope<A>) -> Self {
        AuditLogEntry {
            id: envelope.event_id,
            aggregate_id: envelope.aggregate_id,
            version: envelope.version,
            event_type: envelope.payload.event_type(),
            metadata: envelope.metadata,
            at: envelope.at,
        }
    }
}

#[derive(Clone, Debug, Deserialize, IntoParams)]
pub struct AuditLogQuery {
    #[serde(default)]
    pub page: usize,
    #[serde(default = "default_page_size")]
    pub page_size: usize,
}

fn default_page_size() -> usize {
    10
}

#[derive(Clone)]
pub struct CQRSAuditLogRouter<A>
where
    A: Aggregate + 'static,
{
    _phantom: std::marker::PhantomData<A>,
    store: DynEventStore<A>,
}

impl<A> CQRSAuditLogRouter<A>
where
    A: Aggregate + 'static,
{
    #[must_use]
    fn new(store: DynEventStore<A>) -> Self {
        Self {
            _phantom: std::marker::PhantomData,
            store,
        }
    }

    fn path_aggregate_id_field() -> String {
        format!("{}_id", A::TYPE)
    }

    fn audit_log_route(
        router: OpenApiRouter<CQRSAuditLogRouter<A>>,
        tag: &str,
    ) -> OpenApiRouter<CQRSAuditLogRouter<A>> {
        let path = format!("/{{{}}}/audit", Self::path_aggregate_id_field());
        let response_schema_name = format!("Paged_{}_AuditLog", A::TYPE);
        let schemas = vec![(
            response_schema_name.to_string(),
            Paged::<AuditLogEntry>::schema(),
        )];

        let paths = helpers::generate_route(
            tag,
            HttpMethod::Get,
            &path,
            RefOr::Ref(Ref::from_schema_name(response_schema_name)),
            vec![(Self::path_aggregate_id_field(), String::schema())],
            AuditLogQuery::into_params(|| Some(ParameterIn::Query)),
            None,
        );

        let handler = get(
            move |State(router): State<CQRSAuditLogRouter<A>>,
                  Path(aggregate_id): Path<String>,
                  Query(query): Query<AuditLogQuery>,
                  Extension(_context): Extension<CqrsContext>| async move {
                Self::get_audit_log(router, aggregate_id, query).await
            },
        );

        router.routes(UtoipaMethodRouter::<CQRSAuditLogRouter<A>>::from((
            schemas, paths, handler,
        )))
    }

    pub fn routes(store: DynEventStore<A>, tag: &'static str) -> OpenApiRouter {
        let state = Self::new(store);
        let mut result = OpenApiRouter::<CQRSAuditLogRouter<A>>::new();
        result = Self::audit_log_route(result, tag);
        result.with_state(state)
    }

    async fn get_audit_log(
        router: CQRSAuditLogRouter<A>,
        aggregate_id: String,
        query: AuditLogQuery,
    ) -> impl IntoResponse {
        match router
            .store
            .load_events_paged(&aggregate_id, query.page, query.page_size)
            .await
        {
            Ok((events, total)) => {
                let items: Vec<AuditLogEntry> =
                    events.into_iter().map(AuditLogEntry::from).collect();
                let response = Paged {
                    items,
                    total,
                    page: query.page as i64,
                    page_size: query.page_size as i64,
                };
                (StatusCode::OK, Json(response)).into_response()
            }
            Err(err) => err.into_response(),
        }
    }
}