eventdbx 1.12.2

An event-sourced, key-value, write-side database system.
Documentation
use std::collections::BTreeMap;

use async_graphql::Result as GqlResult;
use async_graphql::{Context, EmptySubscription, Json, Object, Schema, SimpleObject};
use axum::http::HeaderMap;
use serde::Deserialize;
use serde_json::Value;

use crate::error::EventError;
use crate::server::{AppState, extract_bearer_token, run_cli_json};
use crate::store::{AggregateState, EventMetadata, EventRecord, payload_to_map};
use crate::token::AccessKind;

#[derive(Clone)]
pub struct GraphqlState {
    app: AppState,
}

impl GraphqlState {
    pub(crate) fn new(app_state: AppState) -> Self {
        Self { app: app_state }
    }
}

pub type EventSchema = Schema<QueryRoot, MutationRoot, EmptySubscription>;

pub fn build_schema(state: GraphqlState) -> EventSchema {
    Schema::build(
        QueryRoot::default(),
        MutationRoot::default(),
        EmptySubscription,
    )
    .data(state)
    .finish()
}

#[derive(Default)]
pub struct QueryRoot;

#[Object]
impl QueryRoot {
    async fn aggregates(
        &self,
        ctx: &Context<'_>,
        skip: Option<usize>,
        take: Option<usize>,
    ) -> GqlResult<Vec<Aggregate>> {
        let state = ctx.data::<GraphqlState>()?;
        let app = &state.app;
        let skip = skip.unwrap_or(0);
        let mut take = take.unwrap_or(app.list_page_size());
        if take == 0 {
            return Ok(Vec::new());
        }
        if take > app.page_limit() {
            take = app.page_limit();
        }
        let args = vec![
            "aggregate".to_string(),
            "list".to_string(),
            "--skip".to_string(),
            skip.to_string(),
            "--take".to_string(),
            take.to_string(),
            "--json".to_string(),
        ];
        let aggregates: Vec<AggregateState> = run_cli_json(args)
            .await
            .map_err(async_graphql::Error::from)?;
        Ok(aggregates
            .into_iter()
            .filter(|aggregate| !app.is_hidden_aggregate(&aggregate.aggregate_type))
            .map(|aggregate| app.sanitize_aggregate(aggregate).into())
            .collect())
    }

    async fn aggregate(
        &self,
        ctx: &Context<'_>,
        aggregate_type: String,
        aggregate_id: String,
    ) -> GqlResult<Option<Aggregate>> {
        let state = ctx.data::<GraphqlState>()?;
        let app = &state.app;
        if app.is_hidden_aggregate(&aggregate_type) {
            return Ok(None);
        }
        let args = vec![
            "aggregate".to_string(),
            "get".to_string(),
            aggregate_type.clone(),
            aggregate_id.clone(),
        ];
        match run_cli_json::<AggregateState>(args).await {
            Ok(state) => Ok(Some(app.sanitize_aggregate(state).into())),
            Err(EventError::AggregateNotFound) => Ok(None),
            Err(err) => Err(async_graphql::Error::from(err)),
        }
    }

    async fn aggregate_events(
        &self,
        ctx: &Context<'_>,
        aggregate_type: String,
        aggregate_id: String,
        skip: Option<usize>,
        take: Option<usize>,
    ) -> GqlResult<Vec<Event>> {
        let state = ctx.data::<GraphqlState>()?;
        let app = &state.app;
        if app.is_hidden_aggregate(&aggregate_type) {
            return Err(EventError::AggregateNotFound.into());
        }
        let skip = skip.unwrap_or(0);
        let mut take = take.unwrap_or(app.page_limit());
        if take == 0 {
            return Ok(Vec::new());
        }
        if take > app.page_limit() {
            take = app.page_limit();
        }

        let args = vec![
            "aggregate".to_string(),
            "replay".to_string(),
            aggregate_type.clone(),
            aggregate_id.clone(),
            "--skip".to_string(),
            skip.to_string(),
            "--take".to_string(),
            take.to_string(),
            "--json".to_string(),
        ];
        let events: Vec<EventRecord> = run_cli_json(args)
            .await
            .map_err(async_graphql::Error::from)?;
        let events = events.into_iter().map(Into::into).collect();
        Ok(events)
    }

    async fn verify_aggregate(
        &self,
        ctx: &Context<'_>,
        aggregate_type: String,
        aggregate_id: String,
    ) -> GqlResult<VerifyResult> {
        let state = ctx.data::<GraphqlState>()?;
        let app = &state.app;
        if app.is_hidden_aggregate(&aggregate_type) {
            return Err(EventError::AggregateNotFound.into());
        }
        let args = vec![
            "aggregate".to_string(),
            "verify".to_string(),
            aggregate_type,
            aggregate_id,
            "--json".to_string(),
        ];
        let response: VerifyPayload = run_cli_json(args)
            .await
            .map_err(async_graphql::Error::from)?;
        Ok(VerifyResult {
            merkle_root: response.merkle_root,
        })
    }
}

#[derive(Default)]
pub struct MutationRoot;

#[Object]
impl MutationRoot {
    async fn append_event(&self, ctx: &Context<'_>, input: AppendEventInput) -> GqlResult<Event> {
        let state = ctx.data::<GraphqlState>()?;
        let app = &state.app;
        let headers = ctx
            .data::<HeaderMap>()
            .map_err(|_| async_graphql::Error::from(EventError::Unauthorized))?;
        let token = extract_bearer_token(headers)
            .ok_or_else(|| async_graphql::Error::from(EventError::Unauthorized))?;
        app.tokens()
            .authorize(&token, AccessKind::Write)
            .map_err(async_graphql::Error::from)?;

        let payload_map = payload_to_map(&input.payload);
        if app.restrict() {
            app.schemas()
                .validate_event(&input.aggregate_type, &input.event_type, &payload_map)?;
        }

        let payload_json = serde_json::to_string(&input.payload).map_err(|err| {
            async_graphql::Error::from(EventError::Serialization(err.to_string()))
        })?;
        let args = vec![
            "aggregate".to_string(),
            "apply".to_string(),
            input.aggregate_type.clone(),
            input.aggregate_id.clone(),
            input.event_type.clone(),
            "--payload".to_string(),
            payload_json,
        ];
        let record: EventRecord = run_cli_json(args)
            .await
            .map_err(async_graphql::Error::from)?;

        Ok(record.into())
    }
}

#[derive(async_graphql::InputObject)]
struct AppendEventInput {
    aggregate_type: String,
    aggregate_id: String,
    event_type: String,
    payload: serde_json::Value,
}

#[derive(async_graphql::SimpleObject)]
struct VerifyResult {
    merkle_root: String,
}

#[derive(Deserialize)]
struct VerifyPayload {
    merkle_root: String,
}

#[derive(SimpleObject)]
struct Aggregate {
    aggregate_type: String,
    aggregate_id: String,
    version: u64,
    state: Json<BTreeMap<String, String>>,
    merkle_root: String,
    archived: bool,
}

impl From<AggregateState> for Aggregate {
    fn from(value: AggregateState) -> Self {
        Self {
            aggregate_type: value.aggregate_type,
            aggregate_id: value.aggregate_id,
            version: value.version,
            state: Json(value.state),
            merkle_root: value.merkle_root,
            archived: value.archived,
        }
    }
}

#[derive(SimpleObject)]
struct Event {
    aggregate_type: String,
    aggregate_id: String,
    event_type: String,
    version: u64,
    payload: Json<Value>,
    metadata: EventMetadataObject,
    hash: String,
    merkle_root: String,
}

impl From<EventRecord> for Event {
    fn from(value: EventRecord) -> Self {
        Self {
            aggregate_type: value.aggregate_type,
            aggregate_id: value.aggregate_id,
            event_type: value.event_type,
            version: value.version,
            payload: Json(value.payload),
            metadata: value.metadata.into(),
            hash: value.hash,
            merkle_root: value.merkle_root,
        }
    }
}

#[derive(SimpleObject)]
struct EventMetadataObject {
    event_id: String,
    created_at: String,
    issued_by: Option<ActorClaimsObject>,
}

impl From<EventMetadata> for EventMetadataObject {
    fn from(value: EventMetadata) -> Self {
        Self {
            event_id: value.event_id.to_string(),
            created_at: value.created_at.to_rfc3339(),
            issued_by: value.issued_by.map(Into::into),
        }
    }
}

#[derive(SimpleObject)]
struct ActorClaimsObject {
    group: String,
    user: String,
}

impl From<crate::store::ActorClaims> for ActorClaimsObject {
    fn from(value: crate::store::ActorClaims) -> Self {
        Self {
            group: value.group,
            user: value.user,
        }
    }
}