// axess_events/lib.rs

1//! Shared event vocabulary for the axess workspace and platform domains.
2//!
//! [`Event<P>`] is the cross-cutting envelope: id, envelope version,
//! time, tenant, kind, subject, actor, trace context, status, body.
//! The body is either a clear typed payload `P` or an opaque
//! [`EncryptedBlob`]. The producer chooses between the two at emission
//! time; subscribers without the key can still route on every other
//! envelope field.
8//!
9//! Each domain defines its own payload enum implementing
10//! [`EventPayload`]; the envelope crate stays domain-agnostic.
11//!
12//! # Quick start
13//!
14//! ```
15//! use axess_events::{Event, EventBody, EventId, EventPayload, EventStatus, KindTag};
16//! use serde::{Serialize, Deserialize};
17//!
18//! #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
19//! enum DemoPayload {
20//!     Hello { who: String },
21//! }
22//!
23//! impl EventPayload for DemoPayload {
24//!     fn kind_tag(&self) -> KindTag {
25//!         match self {
26//!             DemoPayload::Hello { .. } => KindTag::from_static("demo.hello.v1"),
27//!         }
28//!     }
29//!
30//!     fn to_inner_json(&self) -> serde_json::Value {
31//!         match self {
32//!             DemoPayload::Hello { who } => serde_json::json!({ "who": who }),
33//!         }
34//!     }
35//! }
36//!
37//! let payload = DemoPayload::Hello { who: "world".into() };
38//! let event = Event::<DemoPayload> {
39//!     id: EventId::NIL,
40//!     envelope_version: Event::<DemoPayload>::ENVELOPE_VERSION,
41//!     time_micros: 0,
42//!     tenant_id: None,
43//!     kind: payload.kind_tag(),
44//!     subject: None,
45//!     actor: None,
46//!     trace_context: None,
47//!     status: EventStatus::Info,
48//!     body: EventBody::Clear(payload),
49//! };
50//! assert_eq!(event.kind.as_str(), "demo.hello.v1");
51//! ```
52//!
53//! # Feature flags
54//!
55//! | Feature | Default | Effect |
56//! |---------|---------|--------|
57//! | `serde` | yes | All envelope types implement `Serialize` / `Deserialize`. |
58//! | `rkyv`  | no  | All envelope types implement rkyv `Archive` / `Serialize` / `Deserialize`. Required for org-internal binary streams (Iggy, replay archives). |
59//! | `full`  | no  | Both `serde` and `rkyv`. |
60
61#![forbid(unsafe_code)]
62#![deny(missing_docs)]
63#![cfg_attr(docsrs, feature(doc_cfg))]
64
65pub mod encryption;
66pub mod id;
67pub mod kind;
68pub mod sink;
69pub mod status;
70pub mod subject;
71pub mod trace;
72
73pub use encryption::{AeadAlgorithm, EncryptedBlob, KeyId};
74pub use id::{DeviceId, EventId, SessionId, TenantId, UserId};
75pub use kind::{EventPayload, KindTag};
76pub use sink::{EventSink, LogAndSwallow, NoopEventSink, SinkError};
77pub use status::EventStatus;
78pub use subject::EventSubject;
79pub use trace::TraceContext;
80
81/// Cross-cutting event envelope. Domain-specific payload `P` lives in
82/// the [`body`](Event::body) field, wrapped in [`EventBody`] to allow
83/// either clear payloads or opaque encrypted bytes.
84#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
85#[cfg_attr(
86    feature = "rkyv",
87    derive(rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)
88)]
89#[derive(Clone, Debug, PartialEq, Eq)]
90pub struct Event<P>
91where
92    P: EventPayload,
93{
94    /// 16-byte ULID-shaped identifier. Sortable by time.
95    pub id: EventId,
96    /// Monotone version of the envelope schema. Bumps when fields are
97    /// added or layout changes; subscribers reject envelopes with a
98    /// version they don't recognise rather than mis-decode them.
99    pub envelope_version: u8,
100    /// Wall-clock instant the event was minted, expressed as
101    /// microseconds since the Unix epoch (UTC). i64 covers ±292,000
102    /// years from 1970, enough headroom for any audit horizon.
103    /// Aligns with the workspace-wide epoch-microseconds convention
104    /// (MiFID II nanosecond-precision regulation tolerates µs).
105    pub time_micros: i64,
106    /// Tenant scope. `None` only for system-level events that
107    /// genuinely transcend tenants (operator account flows,
108    /// cross-tenant admin work).
109    pub tenant_id: Option<TenantId>,
110    /// Wire-form kind discriminator. Lets brokers route without
111    /// decoding the payload. Derived from
112    /// [`EventPayload::kind_tag`] at construction time.
113    pub kind: KindTag,
114    /// "Who is this event *about*": the entity whose state changed.
115    /// `None` for events with no specific subject.
116    pub subject: Option<EventSubject>,
117    /// "Who initiated this event": the principal who *did* the
118    /// thing. Differs from `subject` for impersonation flows; equals
119    /// the system principal for autonomous flows. `None` when
120    /// unknown.
121    pub actor: Option<EventSubject>,
122    /// W3C trace context for cross-service correlation. `None` if no
123    /// upstream context is in scope.
124    pub trace_context: Option<TraceContext>,
125    /// Coarse outcome bucket. Detail lives in the payload.
126    pub status: EventStatus,
127    /// Clear payload or opaque encrypted bytes.
128    pub body: EventBody<P>,
129}
130
131impl<P: EventPayload> Event<P> {
132    /// Current envelope schema version. Producers stamp this onto
133    /// [`Event::envelope_version`]; subscribers compare on read.
134    pub const ENVELOPE_VERSION: u8 = 1;
135}
136
137/// Either a clear typed payload or an opaque encrypted blob.
138///
139/// The kind tag on the envelope tells subscribers what type the
140/// plaintext is supposed to be, regardless of whether the body is
141/// `Clear` or `Encrypted` here. Subscribers without the decryption
142/// key can still route on every other envelope field.
143#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
144#[cfg_attr(
145    feature = "rkyv",
146    derive(rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)
147)]
148#[derive(Clone, Debug, PartialEq, Eq)]
149pub enum EventBody<P>
150where
151    P: EventPayload,
152{
153    /// Decrypted, typed payload available in-process.
154    Clear(P),
155    /// Encrypted bytes. Decrypt to recover the typed payload.
156    Encrypted(EncryptedBlob),
157}
158
159#[cfg(test)]
160mod tests {
161    use super::*;
162
163    /// Test payload for envelope round-trip exercises.
164    #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
165    #[cfg_attr(
166        feature = "rkyv",
167        derive(rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)
168    )]
169    #[derive(Clone, Debug, PartialEq, Eq)]
170    enum TestPayload {
171        LoginAttempt { user: UserId },
172        DeviceFirstSeen { device: DeviceId },
173    }
174
175    impl EventPayload for TestPayload {
176        fn kind_tag(&self) -> KindTag {
177            match self {
178                TestPayload::LoginAttempt { .. } => KindTag::from_static("test.login_attempt.v1"),
179                TestPayload::DeviceFirstSeen { .. } => {
180                    KindTag::from_static("test.device_first_seen.v1")
181                }
182            }
183        }
184
185        #[cfg(feature = "serde")]
186        fn to_inner_json(&self) -> serde_json::Value {
187            serde_json::to_value(self).unwrap_or_default()
188        }
189    }
190
191    /// 2026-05-08T12:00:00Z in epoch microseconds.
192    const SAMPLE_TIME_MICROS: i64 = 1_778_500_800_000_000;
193
194    fn sample_event() -> Event<TestPayload> {
195        let payload = TestPayload::LoginAttempt { user: UserId::NIL };
196        Event {
197            id: EventId::NIL,
198            envelope_version: Event::<TestPayload>::ENVELOPE_VERSION,
199            time_micros: SAMPLE_TIME_MICROS,
200            tenant_id: None,
201            kind: payload.kind_tag(),
202            subject: Some(EventSubject::User(UserId::NIL)),
203            actor: None,
204            trace_context: None,
205            status: EventStatus::Success,
206            body: EventBody::Clear(payload),
207        }
208    }
209
210    #[test]
211    fn kind_tag_is_derived_from_payload() {
212        let event = sample_event();
213        assert_eq!(event.kind.as_str(), "test.login_attempt.v1");
214    }
215
216    #[test]
217    fn envelope_version_is_one() {
218        assert_eq!(Event::<TestPayload>::ENVELOPE_VERSION, 1);
219    }
220
221    #[test]
222    fn event_body_clear_round_trip() {
223        let event = sample_event();
224        match event.body {
225            EventBody::Clear(TestPayload::LoginAttempt { .. }) => {}
226            _ => panic!("expected Clear(LoginAttempt)"),
227        }
228    }
229
230    #[test]
231    fn event_body_encrypted_carries_metadata() {
232        let body: EventBody<TestPayload> = EventBody::Encrypted(EncryptedBlob {
233            key_id: KeyId::from_static("kms.test.v1"),
234            algorithm: AeadAlgorithm::Aes256Gcm,
235            nonce: [0u8; 12],
236            ciphertext: vec![1, 2, 3, 4, 5],
237        });
238        match body {
239            EventBody::Encrypted(blob) => {
240                assert_eq!(blob.key_id.as_str(), "kms.test.v1");
241                assert_eq!(blob.algorithm, AeadAlgorithm::Aes256Gcm);
242                assert_eq!(blob.ciphertext, vec![1, 2, 3, 4, 5]);
243            }
244            _ => panic!("expected Encrypted"),
245        }
246    }
247
248    #[cfg(feature = "serde")]
249    #[test]
250    fn serde_json_round_trips_clear_event() {
251        let event = sample_event();
252        let json = serde_json::to_string(&event).unwrap();
253        let back: Event<TestPayload> = serde_json::from_str(&json).unwrap();
254        assert_eq!(event, back);
255    }
256
257    #[cfg(feature = "serde")]
258    #[test]
259    fn serde_json_round_trips_encrypted_event() {
260        let mut event = sample_event();
261        event.body = EventBody::Encrypted(EncryptedBlob {
262            key_id: KeyId::from_static("kms.test.v1"),
263            algorithm: AeadAlgorithm::ChaCha20Poly1305,
264            nonce: [7u8; 12],
265            ciphertext: vec![10, 20, 30],
266        });
267        let json = serde_json::to_string(&event).unwrap();
268        let back: Event<TestPayload> = serde_json::from_str(&json).unwrap();
269        assert_eq!(event, back);
270    }
271
272    #[cfg(feature = "rkyv")]
273    #[test]
274    fn rkyv_round_trips_clear_event() {
275        use rkyv::{from_bytes, rancor::Error, to_bytes};
276        let event = sample_event();
277        let bytes = to_bytes::<Error>(&event).unwrap();
278        let back: Event<TestPayload> = from_bytes::<Event<TestPayload>, Error>(&bytes).unwrap();
279        assert_eq!(event, back);
280    }
281
282    /// Pin: the no-op sink accepts every batch.
283    #[tokio::test]
284    async fn noop_sink_accepts_every_batch() {
285        let sink: NoopEventSink<TestPayload> = NoopEventSink::new();
286        let events = vec![sample_event(), sample_event()];
287        sink.emit(&events).await.unwrap();
288    }
289
290    /// Pin: LogAndSwallow turns sink errors into Ok and logs.
291    #[tokio::test]
292    async fn log_and_swallow_swallows_errors() {
293        struct Failing;
294        impl EventSink<TestPayload> for Failing {
295            async fn emit(&self, events: &[Event<TestPayload>]) -> Result<(), SinkError> {
296                Err(SinkError::Unavailable(format!(
297                    "forced (rejected batch of {})",
298                    events.len()
299                )))
300            }
301        }
302
303        let sink = LogAndSwallow(Failing);
304        let events = vec![sample_event()];
305        // Returns Ok despite the inner Err.
306        sink.emit(&events).await.unwrap();
307    }
308}