use crate::contract::outbox::OutboxMessageDraft;
use async_trait::async_trait;
use awaken_runtime_contract::contract::commit_coordinator::{
CommitCoordinator, CommitError, StagedCanonicalEvent, ThreadCommit,
};
use awaken_runtime_contract::contract::event_store::{
AppendOptions, CanonicalEventDraft, EventScope,
};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use thiserror::Error;
/// A canonical event draft paired with a set of append options.
///
/// Values of this type are staged in [`ThreadCommitStagedWrites::server_events`].
#[derive(Debug, Clone, PartialEq)]
pub struct ServerCanonicalEvent {
/// The canonical event draft.
pub draft: CanonicalEventDraft,
/// Append options accompanying `draft`; [`ServerCanonicalEvent::new`] sets this to
/// `AppendOptions::default()`.
pub options: AppendOptions,
}
impl ServerCanonicalEvent {
    /// Wraps `draft` together with the default [`AppendOptions`].
    #[must_use]
    pub fn new(draft: CanonicalEventDraft) -> Self {
        let options = AppendOptions::default();
        Self { draft, options }
    }

    /// Consumes the event and returns it with its append options replaced by `options`.
    ///
    /// The draft is carried over unchanged.
    #[must_use]
    pub fn with_options(self, options: AppendOptions) -> Self {
        Self { options, ..self }
    }
}
/// Success value returned by [`OutboxServerEventPublisher::publish`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ServerEventPublishOutcome {
/// The event was enqueued.
///
/// NOTE(review): `dedupe_key` is presumably the deduplication key of the enqueued outbox
/// message; confirm against the publisher implementations.
Enqueued { dedupe_key: String },
}
/// Error returned by [`OutboxServerEventPublisher::publish`].
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum EventPublishError {
/// A validation failure; the string is the detail rendered after `validation error: `.
#[error("validation error: {0}")]
Validation(String),
/// An outbox error. Because of `#[from]`, an `OutboxError` converts into this variant
/// through `From`, so it can be propagated with `?`.
#[error("outbox enqueue failed: {0}")]
Enqueue(#[from] crate::contract::outbox::OutboxError),
/// A serialization failure; the string is the detail rendered after
/// `serialization error: `.
#[error("serialization error: {0}")]
Serialization(String),
}
/// Asynchronous publisher for canonical event drafts.
///
/// Implementors must be `Send + Sync`.
///
/// NOTE(review): the trait, outcome, and error names suggest implementations enqueue the
/// event into an outbox rather than delivering it directly; confirm against implementors.
#[async_trait]
pub trait OutboxServerEventPublisher: Send + Sync {
/// Publishes `draft` using the supplied append `options`.
///
/// # Errors
///
/// Returns an [`EventPublishError`] when the event cannot be published.
async fn publish(
&self,
draft: CanonicalEventDraft,
options: AppendOptions,
) -> Result<ServerEventPublishOutcome, EventPublishError>;
}
/// A serializable diagnostic record made of a kind label and a JSON payload.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DiagnosticEvent {
/// Label identifying the kind of diagnostic event.
pub kind: String,
/// JSON payload of the event. When the field is missing during deserialization,
/// `#[serde(default)]` fills in `Value::default()`.
#[serde(default)]
pub payload: Value,
}
/// Sink for [`DiagnosticEvent`]s.
///
/// Implementors must be `Send + Sync`.
pub trait DiagnosticEventPublisher: Send + Sync {
/// Records `event`. The call is synchronous and returns nothing, so any failure has to be
/// handled inside the implementation.
fn record(&self, event: DiagnosticEvent);
}
/// Writes staged alongside a thread commit.
///
/// The `Default` value has all three lists empty. [`ThreadCommitStagedWrites::validate`]
/// checks every entry against a thread id and a run id.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct ThreadCommitStagedWrites {
/// Staged canonical events, each carrying its own draft and append options.
pub canonical_drafts: Vec<StagedCanonicalEvent>,
/// Server canonical events, each a draft plus append options.
pub server_events: Vec<ServerCanonicalEvent>,
/// Additional outbox message drafts.
pub additional_outbox: Vec<OutboxMessageDraft>,
}
/// Deprecated alias of [`ThreadCommitStagedWrites`], kept so the old name still resolves.
#[deprecated(since = "0.6.0", note = "Use `ThreadCommitStagedWrites`.")]
pub type CheckpointStagedWrites = ThreadCommitStagedWrites;
impl ThreadCommitStagedWrites {
    /// Reports whether nothing has been staged in any of the three lists.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        let Self {
            canonical_drafts,
            server_events,
            additional_outbox,
        } = self;
        canonical_drafts.is_empty() && server_events.is_empty() && additional_outbox.is_empty()
    }

    /// Returns the staged writes with the canonical drafts replaced by `drafts`.
    #[must_use]
    pub fn with_canonical_drafts(self, drafts: Vec<StagedCanonicalEvent>) -> Self {
        Self {
            canonical_drafts: drafts,
            ..self
        }
    }

    /// Returns the staged writes with the server events replaced by `events`.
    #[must_use]
    pub fn with_server_events(self, events: Vec<ServerCanonicalEvent>) -> Self {
        Self {
            server_events: events,
            ..self
        }
    }

    /// Returns the staged writes with the additional outbox rows replaced by `rows`.
    #[must_use]
    pub fn with_additional_outbox(self, rows: Vec<OutboxMessageDraft>) -> Self {
        Self {
            additional_outbox: rows,
            ..self
        }
    }

    /// Validates every staged write against the commit's `thread_id` and `run_id`.
    ///
    /// Canonical drafts are checked first, then server events, then additional outbox rows.
    /// For each event the draft is validated, its thread/run scopes are compared with the
    /// given ids, and finally its append options are validated. Checking stops at the first
    /// failure.
    ///
    /// # Errors
    ///
    /// * [`CommitError::EventAppend`] when a draft or its append options fail validation.
    /// * [`CommitError::Validation`] when a thread or run scope names a different id.
    /// * [`CommitError::OutboxInsert`] when an additional outbox row fails validation.
    pub fn validate(&self, thread_id: &str, run_id: &str) -> Result<(), CommitError> {
        self.canonical_drafts
            .iter()
            .try_for_each(|staged| -> Result<(), CommitError> {
                staged.draft.validate().map_err(CommitError::EventAppend)?;
                validate_event_scope_membership(&staged.draft, thread_id, run_id)?;
                staged
                    .append_options
                    .validate()
                    .map_err(CommitError::EventAppend)?;
                Ok(())
            })?;

        self.server_events
            .iter()
            .try_for_each(|event| -> Result<(), CommitError> {
                event.draft.validate().map_err(CommitError::EventAppend)?;
                validate_event_scope_membership(&event.draft, thread_id, run_id)?;
                event.options.validate().map_err(CommitError::EventAppend)?;
                Ok(())
            })?;

        self.additional_outbox
            .iter()
            .try_for_each(|row| -> Result<(), CommitError> {
                row.validate().map_err(CommitError::OutboxInsert)?;
                Ok(())
            })
    }
}
/// Identifiers reported back by [`StagedCommitCoordinator::commit_checkpoint_staged`].
///
/// NOTE(review): each list presumably lines up with the same-named list in
/// [`ThreadCommitStagedWrites`]; confirm ordering guarantees with the implementations.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct ThreadCommitStagedOutcome {
/// Ids for the staged canonical drafts.
pub canonical_event_ids: Vec<String>,
/// Ids for the staged server events.
pub server_event_ids: Vec<String>,
/// Ids for the additional outbox rows.
pub additional_outbox_ids: Vec<String>,
}
/// Checks that the thread and run scopes on `draft` refer to the commit's own ids.
///
/// Scopes are inspected in order. A thread scope must equal `thread_id` and a run scope
/// must equal `run_id`; every other kind of scope is accepted as-is.
///
/// # Errors
///
/// Returns [`CommitError::Validation`] describing the first mismatching scope.
fn validate_event_scope_membership(
    draft: &CanonicalEventDraft,
    thread_id: &str,
    run_id: &str,
) -> Result<(), CommitError> {
    // Yields the message for the first offending scope, or `None` when all scopes agree.
    let first_mismatch = draft.scopes.iter().find_map(|scope| match scope {
        EventScope::Thread {
            thread_id: scope_thread,
        } if scope_thread != thread_id => Some(format!(
            "event thread scope '{scope_thread}' must match thread commit thread_id '{thread_id}'"
        )),
        EventScope::Run { run_id: scope_run } if scope_run != run_id => Some(format!(
            "event run scope '{scope_run}' must match thread commit run_projection.run_id '{run_id}'"
        )),
        _ => None,
    });

    match first_mismatch {
        Some(message) => Err(CommitError::Validation(message)),
        None => Ok(()),
    }
}
/// A [`CommitCoordinator`] that can additionally commit staged writes with a thread commit.
#[async_trait]
pub trait StagedCommitCoordinator: CommitCoordinator {
/// Commits `plan` together with the `staged` writes and reports the resulting ids.
///
/// NOTE(review): presumably the plan and the staged writes are committed atomically;
/// confirm against the implementations.
///
/// # Errors
///
/// Returns a [`CommitError`] when the commit fails.
async fn commit_checkpoint_staged(
&self,
plan: ThreadCommit,
staged: ThreadCommitStagedWrites,
) -> Result<ThreadCommitStagedOutcome, CommitError>;
}
#[cfg(test)]
mod tests {
    use super::*;
    use awaken_runtime_contract::contract::event_store::{CanonicalEventKind, EventVisibility};
    use serde_json::json;

    /// Builds a public draft of the given kind, scoped to both `thread_id` and `run_id`.
    fn draft(kind: &str, thread_id: &str, run_id: &str) -> CanonicalEventDraft {
        let scopes = vec![EventScope::thread(thread_id), EventScope::run(run_id)];
        let event_kind = CanonicalEventKind::new(kind).unwrap();
        let payload = json!({ "kind": kind });
        let mut event = CanonicalEventDraft::new(scopes, event_kind, payload, "test").unwrap();
        event.visibility = EventVisibility::Public;
        event
    }

    #[test]
    fn empty_is_empty() {
        let staged = ThreadCommitStagedWrites::default();
        assert!(staged.is_empty());
    }

    #[test]
    fn validate_accepts_matching_scope() {
        let events = vec![StagedCanonicalEvent::new(draft("RunStarted", "t", "r"))];
        let staged = ThreadCommitStagedWrites::default().with_canonical_drafts(events);
        staged.validate("t", "r").unwrap();
    }

    #[test]
    fn validate_rejects_wrong_thread_scope() {
        // The draft claims thread "other" while the commit is for thread "t".
        let events = vec![StagedCanonicalEvent::new(draft("RunStarted", "other", "r"))];
        let staged = ThreadCommitStagedWrites::default().with_canonical_drafts(events);
        let outcome = staged.validate("t", "r");
        assert!(matches!(outcome, Err(CommitError::Validation(m)) if m.contains("thread scope")));
    }

    #[test]
    fn validate_rejects_wrong_run_scope() {
        // The draft claims run "other" while the commit is for run "r".
        let events = vec![ServerCanonicalEvent::new(draft("RunSubmitted", "t", "other"))];
        let staged = ThreadCommitStagedWrites::default().with_server_events(events);
        let outcome = staged.validate("t", "r");
        assert!(matches!(outcome, Err(CommitError::Validation(m)) if m.contains("run scope")));
    }
}