1use std::collections::{BTreeMap, BTreeSet};
11
12use serde::{Deserialize, Serialize};
13use serde_json::Value;
14use thiserror::Error;
15
16#[derive(Debug, Error, Clone, PartialEq, Eq)]
18pub enum EventStoreError {
19 #[error("validation error: {0}")]
21 Validation(String),
22 #[error("idempotency conflict: {0}")]
24 IdempotencyConflict(String),
25 #[error("expected cursor conflict: {0}")]
27 ExpectedCursorConflict(String),
28 #[error("cursor expired: {0}")]
30 CursorExpired(String),
31 #[error("integrity error: {0}")]
33 Integrity(String),
34 #[error("io error: {0}")]
36 Io(String),
37 #[error("serialization error: {0}")]
39 Serialization(String),
40}
41
42#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
44#[serde(transparent)]
45pub struct CanonicalEventId(String);
46
47impl CanonicalEventId {
48 pub fn new(value: impl Into<String>) -> Result<Self, EventStoreError> {
50 let value = value.into();
51 reject_blank("event_id", &value)?;
52 Ok(Self(value))
53 }
54
55 #[must_use]
57 pub fn as_str(&self) -> &str {
58 &self.0
59 }
60}
61
62#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
64#[serde(transparent)]
65pub struct EventCursor(String);
66
67impl EventCursor {
68 pub fn new(value: impl Into<String>) -> Result<Self, EventStoreError> {
70 let value = value.into();
71 reject_blank("cursor", &value)?;
72 Ok(Self(value))
73 }
74
75 #[must_use]
77 pub fn as_str(&self) -> &str {
78 &self.0
79 }
80}
81
82#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
84#[serde(transparent)]
85pub struct CanonicalEventKind(String);
86
87impl CanonicalEventKind {
88 pub fn new(value: impl Into<String>) -> Result<Self, EventStoreError> {
90 let value = value.into();
91 reject_blank("event_kind", &value)?;
92 Ok(Self(value))
93 }
94
95 #[must_use]
97 pub fn as_str(&self) -> &str {
98 &self.0
99 }
100}
101
102#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
104#[serde(tag = "scope_type", rename_all = "snake_case")]
105pub enum EventScope {
106 Thread { thread_id: String },
108 Run { run_id: String },
110}
111
112impl EventScope {
113 #[must_use]
115 pub fn thread(thread_id: impl Into<String>) -> Self {
116 Self::Thread {
117 thread_id: thread_id.into(),
118 }
119 }
120
121 #[must_use]
123 pub fn run(run_id: impl Into<String>) -> Self {
124 Self::Run {
125 run_id: run_id.into(),
126 }
127 }
128
129 #[must_use]
131 pub const fn family(&self) -> EventScopeFamily {
132 match self {
133 Self::Thread { .. } => EventScopeFamily::Thread,
134 Self::Run { .. } => EventScopeFamily::Run,
135 }
136 }
137}
138
139#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
141#[serde(rename_all = "snake_case")]
142pub enum EventScopeFamily {
143 Thread,
144 Run,
145}
146
147#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
149pub struct EventScopeIds {
150 #[serde(default, skip_serializing_if = "Option::is_none")]
151 pub thread_id: Option<String>,
152 #[serde(default, skip_serializing_if = "Option::is_none")]
153 pub run_id: Option<String>,
154}
155
156#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
158#[serde(rename_all = "snake_case")]
159pub enum EventVisibility {
160 #[default]
162 Public,
163 Internal,
165 Audit,
167 Sensitive,
169}
170
171#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
173pub struct CanonicalEventDraft {
174 pub scopes: Vec<EventScope>,
175 pub event_kind: CanonicalEventKind,
176 #[serde(default)]
177 pub payload: Value,
178 #[serde(default, skip_serializing_if = "Option::is_none")]
179 pub causation_id: Option<String>,
180 #[serde(default, skip_serializing_if = "Option::is_none")]
181 pub correlation_id: Option<String>,
182 pub origin: String,
183 #[serde(default)]
184 pub visibility: EventVisibility,
185 pub schema_version: u32,
186}
187
188impl CanonicalEventDraft {
189 pub fn new(
191 scopes: Vec<EventScope>,
192 event_kind: CanonicalEventKind,
193 payload: Value,
194 origin: impl Into<String>,
195 ) -> Result<Self, EventStoreError> {
196 let draft = Self {
197 scopes,
198 event_kind,
199 payload,
200 causation_id: None,
201 correlation_id: None,
202 origin: origin.into(),
203 visibility: EventVisibility::default(),
204 schema_version: 1,
205 };
206 draft.validate()?;
207 Ok(draft)
208 }
209
210 pub fn validate(&self) -> Result<(), EventStoreError> {
212 validate_scope_set(&self.scopes)?;
213 reject_blank("origin", &self.origin)?;
214 if self.schema_version == 0 {
215 return Err(EventStoreError::Validation(
216 "schema_version must be greater than 0".to_string(),
217 ));
218 }
219 Ok(())
220 }
221
222 pub fn scope_ids(&self) -> Result<EventScopeIds, EventStoreError> {
224 derive_scope_ids(&self.scopes)
225 }
226
227 pub fn idempotency_digest(&self) -> Result<Vec<u8>, EventStoreError> {
232 #[derive(Serialize)]
233 struct Basis<'a> {
234 scopes: &'a Vec<EventScope>,
235 event_kind: &'a CanonicalEventKind,
236 payload: &'a Value,
237 visibility: &'a EventVisibility,
238 causation_id: &'a Option<String>,
239 correlation_id: &'a Option<String>,
240 }
241 serde_json::to_vec(&Basis {
242 scopes: &self.scopes,
243 event_kind: &self.event_kind,
244 payload: &self.payload,
245 visibility: &self.visibility,
246 causation_id: &self.causation_id,
247 correlation_id: &self.correlation_id,
248 })
249 .map_err(|error| EventStoreError::Serialization(error.to_string()))
250 }
251}
252
253#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
255pub struct AppendOptions {
256 #[serde(default, skip_serializing_if = "Option::is_none")]
257 pub writer_id: Option<String>,
258 #[serde(default, skip_serializing_if = "Option::is_none")]
259 pub idempotency_key: Option<String>,
260 #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
261 pub expected_prior_cursors: BTreeMap<EventScope, EventCursor>,
262}
263
264impl AppendOptions {
265 pub fn validate(&self) -> Result<(), EventStoreError> {
267 if self.idempotency_key.is_some() {
268 reject_blank("writer_id", self.writer_id.as_deref().unwrap_or_default())?;
269 }
270 if let Some(key) = self.idempotency_key.as_deref() {
271 reject_blank("idempotency_key", key)?;
272 }
273 Ok(())
274 }
275
276 pub fn idempotency_identity(&self) -> Result<Option<(String, String)>, EventStoreError> {
278 self.validate()?;
279 Ok(match (&self.writer_id, &self.idempotency_key) {
280 (Some(writer_id), Some(key)) => Some((writer_id.clone(), key.clone())),
281 _ => None,
282 })
283 }
284}
285
286fn reject_blank(field: &str, value: &str) -> Result<(), EventStoreError> {
287 if value.trim().is_empty() {
288 return Err(EventStoreError::Validation(format!("{field} is required")));
289 }
290 Ok(())
291}
292
293fn validate_scope_set(scopes: &[EventScope]) -> Result<(), EventStoreError> {
294 if scopes.is_empty() {
295 return Err(EventStoreError::Validation(
296 "at least one event scope is required".to_string(),
297 ));
298 }
299 let mut exact_scopes = BTreeSet::new();
300 let mut families = BTreeSet::new();
301 for scope in scopes {
302 validate_scope(scope)?;
303 if !exact_scopes.insert(scope) {
304 return Err(EventStoreError::Validation(format!(
305 "duplicate event scope: {scope:?}"
306 )));
307 }
308 if !families.insert(scope.family()) {
309 return Err(EventStoreError::Validation(format!(
310 "duplicate event scope family: {:?}",
311 scope.family()
312 )));
313 }
314 }
315 derive_scope_ids(scopes)?;
316 Ok(())
317}
318
319fn validate_scope(scope: &EventScope) -> Result<(), EventStoreError> {
320 match scope {
321 EventScope::Thread { thread_id } => reject_blank("thread_id", thread_id),
322 EventScope::Run { run_id } => reject_blank("run_id", run_id),
323 }
324}
325
326fn derive_scope_ids(scopes: &[EventScope]) -> Result<EventScopeIds, EventStoreError> {
327 let mut ids = EventScopeIds::default();
328 for scope in scopes {
329 match scope {
330 EventScope::Thread { thread_id } => set_optional_id(
331 &mut ids.thread_id,
332 thread_id,
333 "thread_id contradicts scope membership",
334 )?,
335 EventScope::Run { run_id } => {
336 set_optional_id(
337 &mut ids.run_id,
338 run_id,
339 "run_id contradicts scope membership",
340 )?;
341 }
342 }
343 }
344 Ok(ids)
345}
346
347fn set_optional_id(
348 slot: &mut Option<String>,
349 value: &str,
350 error: &str,
351) -> Result<(), EventStoreError> {
352 match slot {
353 Some(existing) if existing != value => Err(EventStoreError::Validation(error.to_string())),
354 Some(_) => Ok(()),
355 None => {
356 *slot = Some(value.to_string());
357 Ok(())
358 }
359 }
360}
361
362#[cfg(test)]
363mod tests {
364 use super::*;
365
366 fn kind() -> CanonicalEventKind {
367 CanonicalEventKind::new("RunStarted").unwrap()
368 }
369
370 #[test]
371 fn draft_requires_at_least_one_scope() {
372 let err = CanonicalEventDraft::new(Vec::new(), kind(), Value::Null, "server").unwrap_err();
373 assert!(matches!(err, EventStoreError::Validation(message) if message.contains("scope")));
374 }
375
376 #[test]
377 fn draft_rejects_duplicate_scope_family() {
378 let err = CanonicalEventDraft::new(
379 vec![EventScope::thread("t1"), EventScope::thread("t2")],
380 kind(),
381 Value::Null,
382 "server",
383 )
384 .unwrap_err();
385 assert!(matches!(err, EventStoreError::Validation(message) if message.contains("family")));
386 }
387
388 #[test]
389 fn draft_derives_scope_ids() {
390 let draft = CanonicalEventDraft::new(
391 vec![EventScope::thread("t1"), EventScope::run("r1")],
392 kind(),
393 Value::Null,
394 "server",
395 )
396 .unwrap();
397 let ids = draft.scope_ids().unwrap();
398 assert_eq!(ids.thread_id.as_deref(), Some("t1"));
399 assert_eq!(ids.run_id.as_deref(), Some("r1"));
400 }
401
402 #[test]
403 fn append_options_require_writer_for_idempotency() {
404 let options = AppendOptions {
405 writer_id: None,
406 idempotency_key: Some("key-1".into()),
407 expected_prior_cursors: BTreeMap::new(),
408 };
409 let err = options.validate().unwrap_err();
410 assert!(
411 matches!(err, EventStoreError::Validation(message) if message.contains("writer_id"))
412 );
413 }
414
415 #[test]
416 fn append_options_return_idempotency_identity() {
417 let options = AppendOptions {
418 writer_id: Some("writer".into()),
419 idempotency_key: Some("key-1".into()),
420 expected_prior_cursors: BTreeMap::new(),
421 };
422 assert_eq!(
423 options.idempotency_identity().unwrap(),
424 Some(("writer".to_string(), "key-1".to_string()))
425 );
426 }
427
428 #[test]
429 fn idempotency_digest_ignores_origin_and_schema_version() {
430 let mut a = CanonicalEventDraft::new(
431 vec![EventScope::thread("t1")],
432 kind(),
433 serde_json::json!({"x": 1}),
434 "server",
435 )
436 .unwrap();
437 let mut b = a.clone();
438 b.origin = "ai-sdk".to_string();
439 b.schema_version = 17;
440 assert_eq!(
441 a.idempotency_digest().unwrap(),
442 b.idempotency_digest().unwrap()
443 );
444
445 a.payload = serde_json::json!({"x": 2});
447 assert_ne!(
448 a.idempotency_digest().unwrap(),
449 b.idempotency_digest().unwrap()
450 );
451 }
452
453 #[test]
454 fn idempotency_digest_distinguishes_d5_fields() {
455 let base = CanonicalEventDraft::new(
456 vec![EventScope::thread("t1")],
457 kind(),
458 serde_json::json!({}),
459 "server",
460 )
461 .unwrap();
462 let base_digest = base.idempotency_digest().unwrap();
463
464 let mut other_scope = base.clone();
465 other_scope.scopes = vec![EventScope::thread("t2")];
466 assert_ne!(base_digest, other_scope.idempotency_digest().unwrap());
467
468 let mut other_visibility = base.clone();
469 other_visibility.visibility = EventVisibility::Internal;
470 assert_ne!(base_digest, other_visibility.idempotency_digest().unwrap());
471
472 let mut other_causation = base.clone();
473 other_causation.causation_id = Some("evt_prev".into());
474 assert_ne!(base_digest, other_causation.idempotency_digest().unwrap());
475
476 let mut other_correlation = base.clone();
477 other_correlation.correlation_id = Some("corr-1".into());
478 assert_ne!(base_digest, other_correlation.idempotency_digest().unwrap());
479 }
480
481 #[test]
482 fn opaque_cursor_roundtrips_without_exposing_structure() {
483 let cursor = EventCursor::new("evtcur_opaque").unwrap();
484 let encoded = serde_json::to_string(&cursor).unwrap();
485 assert_eq!(encoded, "\"evtcur_opaque\"");
486 let decoded: EventCursor = serde_json::from_str(&encoded).unwrap();
487 assert_eq!(decoded.as_str(), "evtcur_opaque");
488 }
489}