1#![forbid(unsafe_code)]
62#![deny(missing_docs)]
63#![cfg_attr(docsrs, feature(doc_cfg))]
64
65pub mod encryption;
66pub mod id;
67pub mod kind;
68pub mod sink;
69pub mod status;
70pub mod subject;
71pub mod trace;
72
73pub use encryption::{AeadAlgorithm, EncryptedBlob, KeyId};
74pub use id::{DeviceId, EventId, SessionId, TenantId, UserId};
75pub use kind::{EventPayload, KindTag};
76pub use sink::{EventSink, LogAndSwallow, NoopEventSink, SinkError};
77pub use status::EventStatus;
78pub use subject::EventSubject;
79pub use trace::TraceContext;
80
81#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
85#[cfg_attr(
86 feature = "rkyv",
87 derive(rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)
88)]
89#[derive(Clone, Debug, PartialEq, Eq)]
90pub struct Event<P>
91where
92 P: EventPayload,
93{
94 pub id: EventId,
96 pub envelope_version: u8,
100 pub time_micros: i64,
106 pub tenant_id: Option<TenantId>,
110 pub kind: KindTag,
114 pub subject: Option<EventSubject>,
117 pub actor: Option<EventSubject>,
122 pub trace_context: Option<TraceContext>,
125 pub status: EventStatus,
127 pub body: EventBody<P>,
129}
130
131impl<P: EventPayload> Event<P> {
132 pub const ENVELOPE_VERSION: u8 = 1;
135}
136
137#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
144#[cfg_attr(
145 feature = "rkyv",
146 derive(rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)
147)]
148#[derive(Clone, Debug, PartialEq, Eq)]
149pub enum EventBody<P>
150where
151 P: EventPayload,
152{
153 Clear(P),
155 Encrypted(EncryptedBlob),
157}
158
#[cfg(test)]
mod tests {
    use super::*;

    /// Small two-variant payload used to exercise the envelope types.
    #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
    #[cfg_attr(
        feature = "rkyv",
        derive(rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)
    )]
    #[derive(Clone, Debug, PartialEq, Eq)]
    enum TestPayload {
        LoginAttempt { user: UserId },
        DeviceFirstSeen { device: DeviceId },
    }

    impl EventPayload for TestPayload {
        /// Maps each variant to its static kind tag.
        fn kind_tag(&self) -> KindTag {
            // Pick the tag text first so there is a single construction site.
            let tag = match self {
                Self::LoginAttempt { .. } => "test.login_attempt.v1",
                Self::DeviceFirstSeen { .. } => "test.device_first_seen.v1",
            };
            KindTag::from_static(tag)
        }

        /// Serializes the payload to JSON, falling back to the default
        /// `Value` when serialization fails.
        #[cfg(feature = "serde")]
        fn to_inner_json(&self) -> serde_json::Value {
            match serde_json::to_value(self) {
                Ok(value) => value,
                Err(_) => serde_json::Value::default(),
            }
        }
    }

    /// Fixed timestamp shared by every sample event.
    const SAMPLE_TIME_MICROS: i64 = 1_778_500_800_000_000;

    /// Builds a clear-body login-attempt event with nil identifiers.
    fn sample_event() -> Event<TestPayload> {
        let login = TestPayload::LoginAttempt { user: UserId::NIL };
        // Read the tag before the payload is moved into the body.
        let kind = login.kind_tag();
        Event {
            id: EventId::NIL,
            envelope_version: Event::<TestPayload>::ENVELOPE_VERSION,
            time_micros: SAMPLE_TIME_MICROS,
            tenant_id: None,
            kind,
            subject: Some(EventSubject::User(UserId::NIL)),
            actor: None,
            trace_context: None,
            status: EventStatus::Success,
            body: EventBody::Clear(login),
        }
    }

    #[test]
    fn kind_tag_is_derived_from_payload() {
        let tag = sample_event().kind;
        assert_eq!(tag.as_str(), "test.login_attempt.v1");
    }

    #[test]
    fn envelope_version_is_one() {
        let version = Event::<TestPayload>::ENVELOPE_VERSION;
        assert_eq!(version, 1);
    }

    #[test]
    fn event_body_clear_round_trip() {
        let Event { body, .. } = sample_event();
        assert!(
            matches!(body, EventBody::Clear(TestPayload::LoginAttempt { .. })),
            "expected Clear(LoginAttempt)"
        );
    }

    #[test]
    fn event_body_encrypted_carries_metadata() {
        let sealed = EncryptedBlob {
            key_id: KeyId::from_static("kms.test.v1"),
            algorithm: AeadAlgorithm::Aes256Gcm,
            nonce: [0u8; 12],
            ciphertext: vec![1, 2, 3, 4, 5],
        };
        let body: EventBody<TestPayload> = EventBody::Encrypted(sealed);

        let EventBody::Encrypted(blob) = body else {
            panic!("expected Encrypted");
        };
        assert_eq!(blob.key_id.as_str(), "kms.test.v1");
        assert_eq!(blob.algorithm, AeadAlgorithm::Aes256Gcm);
        assert_eq!(blob.ciphertext, vec![1, 2, 3, 4, 5]);
    }

    #[cfg(feature = "serde")]
    #[test]
    fn serde_json_round_trips_clear_event() {
        let original = sample_event();
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: Event<TestPayload> = serde_json::from_str(&encoded).unwrap();
        assert_eq!(original, decoded);
    }

    #[cfg(feature = "serde")]
    #[test]
    fn serde_json_round_trips_encrypted_event() {
        let sealed = EncryptedBlob {
            key_id: KeyId::from_static("kms.test.v1"),
            algorithm: AeadAlgorithm::ChaCha20Poly1305,
            nonce: [7u8; 12],
            ciphertext: vec![10, 20, 30],
        };
        let mut original = sample_event();
        original.body = EventBody::Encrypted(sealed);

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: Event<TestPayload> = serde_json::from_str(&encoded).unwrap();
        assert_eq!(original, decoded);
    }

    #[cfg(feature = "rkyv")]
    #[test]
    fn rkyv_round_trips_clear_event() {
        use rkyv::rancor::Error;

        let original = sample_event();
        let bytes = rkyv::to_bytes::<Error>(&original).unwrap();
        let restored = rkyv::from_bytes::<Event<TestPayload>, Error>(&bytes).unwrap();
        assert_eq!(original, restored);
    }

    #[tokio::test]
    async fn noop_sink_accepts_every_batch() {
        let batch = vec![sample_event(), sample_event()];
        let sink: NoopEventSink<TestPayload> = NoopEventSink::new();
        sink.emit(&batch).await.unwrap();
    }

    #[tokio::test]
    async fn log_and_swallow_swallows_errors() {
        /// Sink whose `emit` always reports `SinkError::Unavailable`.
        struct Failing;

        impl EventSink<TestPayload> for Failing {
            async fn emit(&self, events: &[Event<TestPayload>]) -> Result<(), SinkError> {
                let rejected = events.len();
                let reason = format!("forced (rejected batch of {})", rejected);
                Err(SinkError::Unavailable(reason))
            }
        }

        let batch = vec![sample_event()];
        let sink = LogAndSwallow(Failing);
        // The wrapper must turn the inner failure into `Ok`.
        sink.emit(&batch).await.unwrap();
    }
}