Skip to main content

awaken_runtime_contract/contract/
event_store.rs

1//! Canonical event input & identity vocabulary.
2//!
3//! runtime-contract owns what a writer constructs and what locates an event:
4//! `CanonicalEventDraft`/`AppendOptions` (named by the commit-coordinator write
5//! boundary), the scope/kind/visibility value types, and `CanonicalEventId`/
6//! `EventCursor`. The store-output types (`CanonicalEvent` record, `EventPage`,
7//! `AppendResult`, subscription handles) and the store traits are a
8//! server/store concern and live in `awaken-server-contract`.
9
10use std::collections::{BTreeMap, BTreeSet};
11
12use serde::{Deserialize, Serialize};
13use serde_json::Value;
14use thiserror::Error;
15
16/// Errors returned by canonical event store implementations.
17#[derive(Debug, Error, Clone, PartialEq, Eq)]
18pub enum EventStoreError {
19    /// The provided input violates the event-store contract.
20    #[error("validation error: {0}")]
21    Validation(String),
22    /// The idempotency identity already exists with different append input.
23    #[error("idempotency conflict: {0}")]
24    IdempotencyConflict(String),
25    /// The caller supplied an expected cursor that does not match storage state.
26    #[error("expected cursor conflict: {0}")]
27    ExpectedCursorConflict(String),
28    /// The requested cursor is outside the retained history.
29    #[error("cursor expired: {0}")]
30    CursorExpired(String),
31    /// Storage history is missing data that should still be retained.
32    #[error("integrity error: {0}")]
33    Integrity(String),
34    /// An I/O error occurred.
35    #[error("io error: {0}")]
36    Io(String),
37    /// A serialization or deserialization error occurred.
38    #[error("serialization error: {0}")]
39    Serialization(String),
40}
41
42/// Stable canonical event identifier assigned by an [`EventWriter`].
43#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
44#[serde(transparent)]
45pub struct CanonicalEventId(String);
46
47impl CanonicalEventId {
48    /// Create an event id after validating that it is non-empty.
49    pub fn new(value: impl Into<String>) -> Result<Self, EventStoreError> {
50        let value = value.into();
51        reject_blank("event_id", &value)?;
52        Ok(Self(value))
53    }
54
55    /// Return the opaque id string.
56    #[must_use]
57    pub fn as_str(&self) -> &str {
58        &self.0
59    }
60}
61
62/// Opaque cursor for a single [`EventScope`].
63#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
64#[serde(transparent)]
65pub struct EventCursor(String);
66
67impl EventCursor {
68    /// Create an event cursor after validating that it is non-empty.
69    pub fn new(value: impl Into<String>) -> Result<Self, EventStoreError> {
70        let value = value.into();
71        reject_blank("cursor", &value)?;
72        Ok(Self(value))
73    }
74
75    /// Return the opaque cursor string.
76    #[must_use]
77    pub fn as_str(&self) -> &str {
78        &self.0
79    }
80}
81
82/// Protocol-neutral canonical event kind.
83#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
84#[serde(transparent)]
85pub struct CanonicalEventKind(String);
86
87impl CanonicalEventKind {
88    /// Create an event kind after validating that it is non-empty.
89    pub fn new(value: impl Into<String>) -> Result<Self, EventStoreError> {
90        let value = value.into();
91        reject_blank("event_kind", &value)?;
92        Ok(Self(value))
93    }
94
95    /// Return the event kind string.
96    #[must_use]
97    pub fn as_str(&self) -> &str {
98        &self.0
99    }
100}
101
102/// Query and ordering scope for canonical events.
103#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
104#[serde(tag = "scope_type", rename_all = "snake_case")]
105pub enum EventScope {
106    /// Events for one thread.
107    Thread { thread_id: String },
108    /// Events for one run activation.
109    Run { run_id: String },
110}
111
112impl EventScope {
113    /// Create a thread scope.
114    #[must_use]
115    pub fn thread(thread_id: impl Into<String>) -> Self {
116        Self::Thread {
117            thread_id: thread_id.into(),
118        }
119    }
120
121    /// Create a run scope.
122    #[must_use]
123    pub fn run(run_id: impl Into<String>) -> Self {
124        Self::Run {
125            run_id: run_id.into(),
126        }
127    }
128
129    /// Return the stable scope family name.
130    #[must_use]
131    pub const fn family(&self) -> EventScopeFamily {
132        match self {
133            Self::Thread { .. } => EventScopeFamily::Thread,
134            Self::Run { .. } => EventScopeFamily::Run,
135        }
136    }
137}
138
139/// Standard event-scope family.
140#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
141#[serde(rename_all = "snake_case")]
142pub enum EventScopeFamily {
143    Thread,
144    Run,
145}
146
147/// Denormalized ids derived from event scopes.
148#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
149pub struct EventScopeIds {
150    #[serde(default, skip_serializing_if = "Option::is_none")]
151    pub thread_id: Option<String>,
152    #[serde(default, skip_serializing_if = "Option::is_none")]
153    pub run_id: Option<String>,
154}
155
156/// Visibility and redaction hint for canonical events.
157#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
158#[serde(rename_all = "snake_case")]
159pub enum EventVisibility {
160    /// Safe for protocol replay after protocol-specific shaping.
161    #[default]
162    Public,
163    /// Internal server detail.
164    Internal,
165    /// Audit-oriented detail.
166    Audit,
167    /// Sensitive data requiring redaction or payload references.
168    Sensitive,
169}
170
171/// EventStore append input. Store-assigned fields are intentionally absent.
172#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
173pub struct CanonicalEventDraft {
174    pub scopes: Vec<EventScope>,
175    pub event_kind: CanonicalEventKind,
176    #[serde(default)]
177    pub payload: Value,
178    #[serde(default, skip_serializing_if = "Option::is_none")]
179    pub causation_id: Option<String>,
180    #[serde(default, skip_serializing_if = "Option::is_none")]
181    pub correlation_id: Option<String>,
182    pub origin: String,
183    #[serde(default)]
184    pub visibility: EventVisibility,
185    pub schema_version: u32,
186}
187
188impl CanonicalEventDraft {
189    /// Create and validate a canonical event draft.
190    pub fn new(
191        scopes: Vec<EventScope>,
192        event_kind: CanonicalEventKind,
193        payload: Value,
194        origin: impl Into<String>,
195    ) -> Result<Self, EventStoreError> {
196        let draft = Self {
197            scopes,
198            event_kind,
199            payload,
200            causation_id: None,
201            correlation_id: None,
202            origin: origin.into(),
203            visibility: EventVisibility::default(),
204            schema_version: 1,
205        };
206        draft.validate()?;
207        Ok(draft)
208    }
209
210    /// Validate scope membership, origin, and schema version.
211    pub fn validate(&self) -> Result<(), EventStoreError> {
212        validate_scope_set(&self.scopes)?;
213        reject_blank("origin", &self.origin)?;
214        if self.schema_version == 0 {
215            return Err(EventStoreError::Validation(
216                "schema_version must be greater than 0".to_string(),
217            ));
218        }
219        Ok(())
220    }
221
222    /// Return denormalized ids derived from scopes.
223    pub fn scope_ids(&self) -> Result<EventScopeIds, EventStoreError> {
224        derive_scope_ids(&self.scopes)
225    }
226
227    /// Idempotency equality basis per ADR-0034 D5: scope set, event_kind,
228    /// canonical payload, visibility, causation_id, correlation_id. Excludes
229    /// `origin` and `schema_version` so retries that differ only in those
230    /// fields return the original event instead of IdempotencyConflict.
231    pub fn idempotency_digest(&self) -> Result<Vec<u8>, EventStoreError> {
232        #[derive(Serialize)]
233        struct Basis<'a> {
234            scopes: &'a Vec<EventScope>,
235            event_kind: &'a CanonicalEventKind,
236            payload: &'a Value,
237            visibility: &'a EventVisibility,
238            causation_id: &'a Option<String>,
239            correlation_id: &'a Option<String>,
240        }
241        serde_json::to_vec(&Basis {
242            scopes: &self.scopes,
243            event_kind: &self.event_kind,
244            payload: &self.payload,
245            visibility: &self.visibility,
246            causation_id: &self.causation_id,
247            correlation_id: &self.correlation_id,
248        })
249        .map_err(|error| EventStoreError::Serialization(error.to_string()))
250    }
251}
252
253/// Options supplied when appending a canonical event.
254#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
255pub struct AppendOptions {
256    #[serde(default, skip_serializing_if = "Option::is_none")]
257    pub writer_id: Option<String>,
258    #[serde(default, skip_serializing_if = "Option::is_none")]
259    pub idempotency_key: Option<String>,
260    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
261    pub expected_prior_cursors: BTreeMap<EventScope, EventCursor>,
262}
263
264impl AppendOptions {
265    /// Validate append options before passing them to a store.
266    pub fn validate(&self) -> Result<(), EventStoreError> {
267        if self.idempotency_key.is_some() {
268            reject_blank("writer_id", self.writer_id.as_deref().unwrap_or_default())?;
269        }
270        if let Some(key) = self.idempotency_key.as_deref() {
271            reject_blank("idempotency_key", key)?;
272        }
273        Ok(())
274    }
275
276    /// Return the `(writer_id, idempotency_key)` identity when present.
277    pub fn idempotency_identity(&self) -> Result<Option<(String, String)>, EventStoreError> {
278        self.validate()?;
279        Ok(match (&self.writer_id, &self.idempotency_key) {
280            (Some(writer_id), Some(key)) => Some((writer_id.clone(), key.clone())),
281            _ => None,
282        })
283    }
284}
285
286fn reject_blank(field: &str, value: &str) -> Result<(), EventStoreError> {
287    if value.trim().is_empty() {
288        return Err(EventStoreError::Validation(format!("{field} is required")));
289    }
290    Ok(())
291}
292
293fn validate_scope_set(scopes: &[EventScope]) -> Result<(), EventStoreError> {
294    if scopes.is_empty() {
295        return Err(EventStoreError::Validation(
296            "at least one event scope is required".to_string(),
297        ));
298    }
299    let mut exact_scopes = BTreeSet::new();
300    let mut families = BTreeSet::new();
301    for scope in scopes {
302        validate_scope(scope)?;
303        if !exact_scopes.insert(scope) {
304            return Err(EventStoreError::Validation(format!(
305                "duplicate event scope: {scope:?}"
306            )));
307        }
308        if !families.insert(scope.family()) {
309            return Err(EventStoreError::Validation(format!(
310                "duplicate event scope family: {:?}",
311                scope.family()
312            )));
313        }
314    }
315    derive_scope_ids(scopes)?;
316    Ok(())
317}
318
319fn validate_scope(scope: &EventScope) -> Result<(), EventStoreError> {
320    match scope {
321        EventScope::Thread { thread_id } => reject_blank("thread_id", thread_id),
322        EventScope::Run { run_id } => reject_blank("run_id", run_id),
323    }
324}
325
326fn derive_scope_ids(scopes: &[EventScope]) -> Result<EventScopeIds, EventStoreError> {
327    let mut ids = EventScopeIds::default();
328    for scope in scopes {
329        match scope {
330            EventScope::Thread { thread_id } => set_optional_id(
331                &mut ids.thread_id,
332                thread_id,
333                "thread_id contradicts scope membership",
334            )?,
335            EventScope::Run { run_id } => {
336                set_optional_id(
337                    &mut ids.run_id,
338                    run_id,
339                    "run_id contradicts scope membership",
340                )?;
341            }
342        }
343    }
344    Ok(ids)
345}
346
347fn set_optional_id(
348    slot: &mut Option<String>,
349    value: &str,
350    error: &str,
351) -> Result<(), EventStoreError> {
352    match slot {
353        Some(existing) if existing != value => Err(EventStoreError::Validation(error.to_string())),
354        Some(_) => Ok(()),
355        None => {
356            *slot = Some(value.to_string());
357            Ok(())
358        }
359    }
360}
361
#[cfg(test)]
mod tests {
    use super::*;

    /// Event kind shared by every draft built in these tests.
    fn kind() -> CanonicalEventKind {
        CanonicalEventKind::new("RunStarted").unwrap()
    }

    /// Build a draft with the shared kind and the origin `"server"`.
    fn draft(
        scopes: Vec<EventScope>,
        payload: Value,
    ) -> Result<CanonicalEventDraft, EventStoreError> {
        CanonicalEventDraft::new(scopes, kind(), payload, "server")
    }

    /// Assert that `result` is a validation error whose message contains
    /// `needle`.
    fn assert_validation_mentions<T: std::fmt::Debug>(
        result: Result<T, EventStoreError>,
        needle: &str,
    ) {
        let err = result.unwrap_err();
        assert!(matches!(err, EventStoreError::Validation(message) if message.contains(needle)));
    }

    #[test]
    fn draft_requires_at_least_one_scope() {
        assert_validation_mentions(draft(Vec::new(), Value::Null), "scope");
    }

    #[test]
    fn draft_rejects_duplicate_scope_family() {
        let scopes = vec![EventScope::thread("t1"), EventScope::thread("t2")];
        assert_validation_mentions(draft(scopes, Value::Null), "family");
    }

    #[test]
    fn draft_derives_scope_ids() {
        let scopes = vec![EventScope::thread("t1"), EventScope::run("r1")];
        let ids = draft(scopes, Value::Null).unwrap().scope_ids().unwrap();
        assert_eq!(ids.thread_id.as_deref(), Some("t1"));
        assert_eq!(ids.run_id.as_deref(), Some("r1"));
    }

    #[test]
    fn append_options_require_writer_for_idempotency() {
        let options = AppendOptions {
            idempotency_key: Some("key-1".into()),
            ..AppendOptions::default()
        };
        assert_validation_mentions(options.validate(), "writer_id");
    }

    #[test]
    fn append_options_return_idempotency_identity() {
        let options = AppendOptions {
            writer_id: Some("writer".into()),
            idempotency_key: Some("key-1".into()),
            ..AppendOptions::default()
        };
        let identity = options.idempotency_identity().unwrap();
        assert_eq!(identity, Some(("writer".to_string(), "key-1".to_string())));
    }

    #[test]
    fn idempotency_digest_ignores_origin_and_schema_version() {
        let original = draft(vec![EventScope::thread("t1")], serde_json::json!({"x": 1})).unwrap();

        // Same basis fields, different origin and schema version.
        let mut relabelled = original.clone();
        relabelled.origin = "ai-sdk".to_string();
        relabelled.schema_version = 17;
        assert_eq!(
            original.idempotency_digest().unwrap(),
            relabelled.idempotency_digest().unwrap()
        );

        // Different payload must produce a different digest.
        let mut repayloaded = original;
        repayloaded.payload = serde_json::json!({"x": 2});
        assert_ne!(
            repayloaded.idempotency_digest().unwrap(),
            relabelled.idempotency_digest().unwrap()
        );
    }

    #[test]
    fn idempotency_digest_distinguishes_d5_fields() {
        let base = draft(vec![EventScope::thread("t1")], serde_json::json!({})).unwrap();
        let base_digest = base.idempotency_digest().unwrap();

        // Each variant differs from the base in exactly one basis field.
        let mut other_scope = base.clone();
        other_scope.scopes = vec![EventScope::thread("t2")];

        let mut other_visibility = base.clone();
        other_visibility.visibility = EventVisibility::Internal;

        let mut other_causation = base.clone();
        other_causation.causation_id = Some("evt_prev".into());

        let mut other_correlation = base.clone();
        other_correlation.correlation_id = Some("corr-1".into());

        for variant in [
            other_scope,
            other_visibility,
            other_causation,
            other_correlation,
        ] {
            assert_ne!(base_digest, variant.idempotency_digest().unwrap());
        }
    }

    #[test]
    fn opaque_cursor_roundtrips_without_exposing_structure() {
        let cursor = EventCursor::new("evtcur_opaque").unwrap();

        let encoded = serde_json::to_string(&cursor).unwrap();
        assert_eq!(encoded, "\"evtcur_opaque\"");

        let decoded = serde_json::from_str::<EventCursor>(&encoded).unwrap();
        assert_eq!(decoded.as_str(), "evtcur_opaque");
    }
}