use std::collections::{BTreeMap, BTreeSet};
use async_trait::async_trait;
use futures::stream::BoxStream;
use serde::{Deserialize, Serialize};
use serde_json::Value;
pub use awaken_runtime_contract::contract::event_store::{
AppendOptions, CanonicalEventDraft, CanonicalEventId, CanonicalEventKind, EventCursor,
EventScope, EventScopeFamily, EventScopeIds, EventStoreError, EventVisibility,
};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FidelityClass {
ObservedRuntimeEvent,
CommittedRuntimeEvent,
DomainEvent,
ControlEvent,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CanonicalEvent {
pub event_id: CanonicalEventId,
pub scopes: Vec<EventScope>,
pub cursors_by_scope: BTreeMap<EventScope, EventCursor>,
pub event_kind: CanonicalEventKind,
#[serde(default)]
pub payload: Value,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub thread_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub run_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub causation_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub correlation_id: Option<String>,
pub origin: String,
#[serde(default)]
pub visibility: EventVisibility,
pub schema_version: u32,
pub created_at: u64,
}
impl CanonicalEvent {
pub fn from_append(
event_id: CanonicalEventId,
cursors_by_scope: BTreeMap<EventScope, EventCursor>,
created_at: u64,
draft: CanonicalEventDraft,
) -> Result<Self, EventStoreError> {
draft.validate()?;
validate_cursor_coverage(&draft.scopes, &cursors_by_scope)?;
let ids = draft.scope_ids()?;
Ok(Self {
event_id,
scopes: draft.scopes,
cursors_by_scope,
event_kind: draft.event_kind,
payload: draft.payload,
thread_id: ids.thread_id,
run_id: ids.run_id,
causation_id: draft.causation_id,
correlation_id: draft.correlation_id,
origin: draft.origin,
visibility: draft.visibility,
schema_version: draft.schema_version,
created_at,
})
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct AppendResult {
pub event: CanonicalEvent,
}
impl AppendResult {
#[must_use]
pub fn event_id(&self) -> &CanonicalEventId {
&self.event.event_id
}
#[must_use]
pub fn cursors_by_scope(&self) -> &BTreeMap<EventScope, EventCursor> {
&self.event.cursors_by_scope
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct EventPage {
pub events: Vec<CanonicalEvent>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub next_cursor: Option<EventCursor>,
pub has_more: bool,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SubscribeStart {
FromStart,
FromCursor(EventCursor),
FromNow,
}
pub type CanonicalEventStream = BoxStream<'static, Result<CanonicalEvent, EventStoreError>>;
pub struct SubscribeHandle {
pub start_cursor: Option<EventCursor>,
pub stream: CanonicalEventStream,
}
fn validate_cursor_coverage(
scopes: &[EventScope],
cursors: &BTreeMap<EventScope, EventCursor>,
) -> Result<(), EventStoreError> {
let scope_set = scopes.iter().collect::<BTreeSet<_>>();
let cursor_scope_set = cursors.keys().collect::<BTreeSet<_>>();
if scope_set != cursor_scope_set {
return Err(EventStoreError::Validation(
"cursors_by_scope must exactly match event scopes".to_string(),
));
}
Ok(())
}
#[async_trait]
pub trait EventWriter: Send + Sync {
async fn append(
&self,
draft: CanonicalEventDraft,
options: AppendOptions,
) -> Result<AppendResult, EventStoreError>;
}
#[async_trait]
pub trait EventReader: Send + Sync {
async fn list(
&self,
scope: EventScope,
from: Option<EventCursor>,
limit: usize,
) -> Result<EventPage, EventStoreError>;
async fn count(&self, scope: EventScope) -> Result<u64, EventStoreError>;
}
#[async_trait]
pub trait EventLookup: Send + Sync {
async fn load_event(
&self,
event_id: &CanonicalEventId,
) -> Result<CanonicalEvent, EventStoreError>;
}
#[async_trait]
pub trait EventSubscriber: Send + Sync {
async fn subscribe(
&self,
scope: EventScope,
start: SubscribeStart,
) -> Result<SubscribeHandle, EventStoreError>;
}
pub trait EventStore: EventWriter + EventReader + EventLookup + EventSubscriber {}
impl<T> EventStore for T where T: EventWriter + EventReader + EventLookup + EventSubscriber {}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::BTreeMap;
fn kind() -> CanonicalEventKind {
CanonicalEventKind::new("RunStarted").unwrap()
}
fn cursor(value: &str) -> EventCursor {
EventCursor::new(value).unwrap()
}
fn event_id(value: &str) -> CanonicalEventId {
CanonicalEventId::new(value).unwrap()
}
#[test]
fn persisted_event_requires_cursor_for_each_scope() {
let draft = CanonicalEventDraft::new(
vec![EventScope::thread("t1"), EventScope::run("r1")],
kind(),
serde_json::Value::Null,
"server",
)
.unwrap();
let mut cursors = BTreeMap::new();
cursors.insert(EventScope::thread("t1"), cursor("cur_t_1"));
let err = CanonicalEvent::from_append(event_id("evt_1"), cursors, 1, draft).unwrap_err();
assert!(
matches!(err, EventStoreError::Validation(message) if message.contains("cursors_by_scope"))
);
}
#[test]
fn persisted_event_carries_store_assigned_fields_and_denormalized_ids() {
let draft = CanonicalEventDraft::new(
vec![EventScope::thread("t1"), EventScope::run("r1")],
kind(),
serde_json::json!({"ok": true}),
"server",
)
.unwrap();
let mut cursors = BTreeMap::new();
cursors.insert(EventScope::thread("t1"), cursor("cur_t_1"));
cursors.insert(EventScope::run("r1"), cursor("cur_r_1"));
let event = CanonicalEvent::from_append(event_id("evt_1"), cursors, 42, draft).unwrap();
assert_eq!(event.event_id.as_str(), "evt_1");
assert_eq!(event.thread_id.as_deref(), Some("t1"));
assert_eq!(event.run_id.as_deref(), Some("r1"));
assert_eq!(event.created_at, 42);
assert_eq!(event.payload, serde_json::json!({"ok": true}));
}
}