umari 0.3.0

SDK for building event-sourced WASM components for the Umari runtime
Documentation
use std::{
    collections::{BTreeSet, HashMap},
    mem,
};

use indexmap::IndexMap;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use slotmap::SlotMap;
use umadb_dcb::{DcbQuery, DcbQueryItem};
use uuid::Uuid;

use crate::{
    IDEMPOTENCY_NAMESPACE,
    domain_id::{DomainIdBindings, DomainIds, FromDomainIds},
    effect::CURRENT_EVENT_CONTEXT,
    emit::Emit,
    event::{Event, EventDomainId, EventSet, StoredEvent},
    folds::{Append, Fold, FoldHandle, FoldKey, FoldSpec, FoldStates, HasFolds},
    runtime::command::{DomainId, EmitEvent, umari::command::transaction::Transaction},
};

pub struct Command<I: DomainIds, Fs = ()> {
    input: I,
    context: CommandContext,
    domain_ids: Vec<EventDomainId>,
    folds: SlotMap<FoldKey, FoldSpec<I>>,
    handles: Fs,
}

impl<I: DomainIds> Command<I, ()> {
    pub fn execute<F>(self, f: F) -> anyhow::Result<ExecuteOutput>
    where
        F: FnOnce(I) -> anyhow::Result<Emit>,
    {
        self.run(move |input, ()| f(input))
    }

    pub fn new(input: I, context: CommandContext) -> Self {
        Command {
            input,
            context,
            domain_ids: Vec::new(),
            folds: SlotMap::with_key(),
            handles: (),
        }
    }
}

impl<I: DomainIds, Fs> Command<I, Fs> {
    pub fn fold<T>(self) -> Command<I, <Fs as Append<FoldHandle<T>>>::Output>
    where
        T: Fold + FromDomainIds<Args = ()>,
        Fs: Append<FoldHandle<T>>,
    {
        self.fold_args(())
    }

    pub fn fold_args<T>(self, args: T::Args) -> Command<I, <Fs as Append<FoldHandle<T>>>::Output>
    where
        T: Fold + FromDomainIds,
        Fs: Append<FoldHandle<T>>,
    {
        let mut domain_ids = self.domain_ids;
        let mut folds = self.folds;
        domain_ids.extend(<T::Events>::event_domain_ids());
        let spec = FoldSpec::new::<T>(move |_input, bindings| {
            T::from_domain_ids(args, bindings).expect("failed to create fold from bindings")
        });
        let key = folds.insert(spec);
        Command {
            input: self.input,
            context: self.context,
            domain_ids,
            folds,
            handles: self.handles.append(FoldHandle::new(key)),
        }
    }

    pub fn fold_with<T, F>(self, f: F) -> Command<I, <Fs as Append<FoldHandle<T>>>::Output>
    where
        T: Fold,
        F: FnOnce(&I) -> T + 'static,
        Fs: Append<FoldHandle<T>>,
    {
        let mut domain_ids = self.domain_ids;
        let mut folds = self.folds;
        domain_ids.extend(<T::Events>::event_domain_ids());
        let spec = FoldSpec::new::<T>(move |input, _bindings| f(input));
        let key = folds.insert(spec);
        Command {
            input: self.input,
            context: self.context,
            domain_ids,
            folds,
            handles: self.handles.append(FoldHandle::new(key)),
        }
    }

    fn run<F>(self, f: F) -> anyhow::Result<ExecuteOutput>
    where
        Fs: FoldStates,
        F: FnOnce(I, Fs::States) -> anyhow::Result<Emit>,
    {
        let bindings = self.input.domain_ids();
        let mut folds: HashMap<_, _> = self
            .folds
            .into_iter()
            .map(|(key, spec)| {
                let (fold, fold_binding, state) = spec.create(&self.input, &bindings);
                (key, (fold, fold_binding, state))
            })
            .collect();

        let query = build_dcb_query(self.domain_ids, std::slice::from_ref(&bindings));
        let tx = Transaction::new(&query.into());

        loop {
            let events = tx.next_batch();
            if events.is_empty() {
                break;
            }

            for event in events {
                let event: StoredEvent<Value> = event.into();
                let is_idempotent = self
                    .context
                    .idempotency_key
                    .zip(event.idempotency_key)
                    .is_some_and(|(a, b)| a == b);
                if is_idempotent {
                    let position = tx.commit(&self.context.into(), &[]);
                    return Ok(ExecuteOutput {
                        position,
                        events: vec![],
                    });
                }

                for (fold, fold_binding, state) in folds.values_mut() {
                    fold.box_apply(state, &*fold_binding, &event)?;
                }
            }
        }

        let mut states: HashMap<_, _> = folds
            .into_iter()
            .map(|(key, (_, _, state))| (key, state))
            .collect();

        let emit = f(self.input, self.handles.extract(&mut states))?;
        let emitted_events: Vec<_> = emit
            .into_events()
            .into_iter()
            .enumerate()
            .map(|(i, event)| {
                let id = {
                    let mut key = Vec::with_capacity(
                        mem::size_of::<uuid::Bytes>() * 2 + mem::size_of::<u32>(),
                    );
                    key.extend_from_slice(self.context.correlation_id.as_bytes());
                    key.extend_from_slice(self.context.causation_id.as_bytes());
                    key.extend_from_slice(&(i as u32).to_be_bytes());
                    Uuid::new_v5(&IDEMPOTENCY_NAMESPACE, &key)
                };
                let data = serde_json::to_string(&event.data)
                    .unwrap_or_else(|err| panic!("failed to serialize event data: {err}"));
                EmitEvent {
                    id: id.to_string(),
                    event_type: event.event_type,
                    data,
                    domain_ids: event
                        .domain_ids
                        .into_iter()
                        .map(|(k, id)| DomainId {
                            name: k.to_string(),
                            id,
                        })
                        .collect(),
                    encryption_scope: event.encryption_scope,
                }
            })
            .collect();
        let position = tx.commit(&self.context.into(), &emitted_events);
        Ok(ExecuteOutput {
            position,
            events: emitted_events
                .into_iter()
                .map(|event| EmittedEvent {
                    id: event.id.parse().unwrap(),
                    event_type: event.event_type,
                    domain_ids: event
                        .domain_ids
                        .into_iter()
                        .map(|domain_id| (domain_id.name, domain_id.id))
                        .collect(),
                })
                .collect(),
        })
    }
}

impl<I: DomainIds, Fs: HasFolds> Command<I, Fs> {
    pub fn execute<F>(self, f: F) -> anyhow::Result<ExecuteOutput>
    where
        F: FnOnce(I, Fs::States) -> anyhow::Result<Emit>,
    {
        self.run(f)
    }
}

pub struct ExecuteOutput {
    pub position: Option<u64>,
    pub events: Vec<EmittedEvent>,
}

impl ExecuteOutput {
    pub fn has_event<E: Event>(&self) -> bool {
        self.events
            .iter()
            .any(|event| event.event_type == E::EVENT_TYPE)
    }
}

pub struct EmittedEvent {
    /// Event unique identifier
    pub id: Uuid,
    /// Event type identifier
    pub event_type: String,
    /// Domain IDs for event routing (field name -> value)
    pub domain_ids: IndexMap<String, String>,
}

/// A trait implemented when using the `export_command!` macro, with the command name matching the crate name.
pub trait CommandName {
    const COMMAND_NAME: &'static str;
}

#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct CommandContext {
    /// Original request ID (flows through everything)
    pub correlation_id: Uuid,
    /// Unique ID for this specific command execution
    pub causation_id: Uuid,
    /// Event ID that triggered this command (for sagas)
    pub triggering_event_id: Option<Uuid>,
    /// Client-supplied key for deduplicating retried command executions.
    pub idempotency_key: Option<Uuid>,
}

impl CommandContext {
    pub fn new() -> Self {
        CURRENT_EVENT_CONTEXT.with_borrow(|ctx| {
            ctx.map(|ctx| CommandContext {
                correlation_id: ctx.correlation_id,
                causation_id: Uuid::new_v4(),
                triggering_event_id: Some(ctx.triggering_event_id),
                idempotency_key: None,
            })
            .unwrap_or_else(|| CommandContext {
                correlation_id: Uuid::new_v4(),
                causation_id: Uuid::new_v4(),
                triggering_event_id: None,
                idempotency_key: None,
            })
        })
    }

    pub fn with_correlation_id(mut self, correlation_id: Uuid) -> Self {
        self.correlation_id = correlation_id;
        self
    }

    pub fn with_triggering_event_id(
        mut self,
        triggering_event_id: impl Into<Option<Uuid>>,
    ) -> Self {
        self.triggering_event_id = triggering_event_id.into();
        self
    }

    pub fn with_idempotency_key(mut self, idempotency_key: impl Into<Option<Uuid>>) -> Self {
        self.idempotency_key = idempotency_key.into();
        self
    }
}

#[derive(Clone, Debug)]
pub struct CommandReceipt {
    pub position: Option<u64>,
    pub events: Vec<EmittedEventRef>,
}

#[derive(Clone, Debug)]
pub struct EmittedEventRef {
    pub id: Uuid,
    pub event_type: String,
    pub tags: Vec<String>,
}

pub(crate) fn build_dcb_query(
    domain_ids: Vec<EventDomainId>,
    bindings: &[DomainIdBindings],
) -> DcbQuery {
    // Key: set of tags. Value: set of event types sharing those tags.
    // BTreeSet for tags ensures deterministic ordering for grouping.
    let mut grouped_items: IndexMap<BTreeSet<String>, BTreeSet<String>> = IndexMap::new();

    for binding_set in bindings {
        for entry in &domain_ids {
            let mut tags = BTreeSet::new();

            for field_name in entry.dynamic_fields {
                if let Some(value) = binding_set.get(field_name) {
                    tags.insert(format!("{}:{}", field_name, value));
                }
            }

            for &(field_name, value) in entry.static_fields {
                tags.insert(format!("{}:{}", field_name, value));
            }

            grouped_items
                .entry(tags)
                .or_default()
                .insert(entry.event_type.to_string());
        }
    }

    let items = grouped_items
        .into_iter()
        .map(|(tags, types)| DcbQueryItem {
            types: types.into_iter().collect(),
            tags: tags.into_iter().collect(),
        })
        .collect();

    DcbQuery { items }
}