#![forbid(unsafe_code)]
#![deny(missing_docs)]
#![cfg_attr(docsrs, feature(doc_cfg))]
pub mod encryption;
pub mod id;
pub mod kind;
pub mod sink;
pub mod status;
pub mod subject;
pub mod trace;
pub use encryption::{AeadAlgorithm, EncryptedBlob, KeyId};
pub use id::{DeviceId, EventId, SessionId, TenantId, UserId};
pub use kind::{EventPayload, KindTag};
pub use sink::{EventSink, LogAndSwallow, NoopEventSink, SinkError};
pub use status::EventStatus;
pub use subject::EventSubject;
pub use trace::TraceContext;
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(
feature = "rkyv",
derive(rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)
)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Event<P>
where
P: EventPayload,
{
pub id: EventId,
pub envelope_version: u8,
pub time_micros: i64,
pub tenant_id: Option<TenantId>,
pub kind: KindTag,
pub subject: Option<EventSubject>,
pub actor: Option<EventSubject>,
pub trace_context: Option<TraceContext>,
pub status: EventStatus,
pub body: EventBody<P>,
}
impl<P: EventPayload> Event<P> {
pub const ENVELOPE_VERSION: u8 = 1;
}
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(
feature = "rkyv",
derive(rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)
)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum EventBody<P>
where
P: EventPayload,
{
Clear(P),
Encrypted(EncryptedBlob),
}
#[cfg(test)]
mod tests {
use super::*;
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(
feature = "rkyv",
derive(rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)
)]
#[derive(Clone, Debug, PartialEq, Eq)]
enum TestPayload {
LoginAttempt { user: UserId },
DeviceFirstSeen { device: DeviceId },
}
impl EventPayload for TestPayload {
fn kind_tag(&self) -> KindTag {
match self {
TestPayload::LoginAttempt { .. } => KindTag::from_static("test.login_attempt.v1"),
TestPayload::DeviceFirstSeen { .. } => {
KindTag::from_static("test.device_first_seen.v1")
}
}
}
#[cfg(feature = "serde")]
fn to_inner_json(&self) -> serde_json::Value {
serde_json::to_value(self).unwrap_or_default()
}
}
const SAMPLE_TIME_MICROS: i64 = 1_778_500_800_000_000;
fn sample_event() -> Event<TestPayload> {
let payload = TestPayload::LoginAttempt { user: UserId::NIL };
Event {
id: EventId::NIL,
envelope_version: Event::<TestPayload>::ENVELOPE_VERSION,
time_micros: SAMPLE_TIME_MICROS,
tenant_id: None,
kind: payload.kind_tag(),
subject: Some(EventSubject::User(UserId::NIL)),
actor: None,
trace_context: None,
status: EventStatus::Success,
body: EventBody::Clear(payload),
}
}
#[test]
fn kind_tag_is_derived_from_payload() {
let event = sample_event();
assert_eq!(event.kind.as_str(), "test.login_attempt.v1");
}
#[test]
fn envelope_version_is_one() {
assert_eq!(Event::<TestPayload>::ENVELOPE_VERSION, 1);
}
#[test]
fn event_body_clear_round_trip() {
let event = sample_event();
match event.body {
EventBody::Clear(TestPayload::LoginAttempt { .. }) => {}
_ => panic!("expected Clear(LoginAttempt)"),
}
}
#[test]
fn event_body_encrypted_carries_metadata() {
let body: EventBody<TestPayload> = EventBody::Encrypted(EncryptedBlob {
key_id: KeyId::from_static("kms.test.v1"),
algorithm: AeadAlgorithm::Aes256Gcm,
nonce: [0u8; 12],
ciphertext: vec![1, 2, 3, 4, 5],
});
match body {
EventBody::Encrypted(blob) => {
assert_eq!(blob.key_id.as_str(), "kms.test.v1");
assert_eq!(blob.algorithm, AeadAlgorithm::Aes256Gcm);
assert_eq!(blob.ciphertext, vec![1, 2, 3, 4, 5]);
}
_ => panic!("expected Encrypted"),
}
}
#[cfg(feature = "serde")]
#[test]
fn serde_json_round_trips_clear_event() {
let event = sample_event();
let json = serde_json::to_string(&event).unwrap();
let back: Event<TestPayload> = serde_json::from_str(&json).unwrap();
assert_eq!(event, back);
}
#[cfg(feature = "serde")]
#[test]
fn serde_json_round_trips_encrypted_event() {
let mut event = sample_event();
event.body = EventBody::Encrypted(EncryptedBlob {
key_id: KeyId::from_static("kms.test.v1"),
algorithm: AeadAlgorithm::ChaCha20Poly1305,
nonce: [7u8; 12],
ciphertext: vec![10, 20, 30],
});
let json = serde_json::to_string(&event).unwrap();
let back: Event<TestPayload> = serde_json::from_str(&json).unwrap();
assert_eq!(event, back);
}
#[cfg(feature = "rkyv")]
#[test]
fn rkyv_round_trips_clear_event() {
use rkyv::{from_bytes, rancor::Error, to_bytes};
let event = sample_event();
let bytes = to_bytes::<Error>(&event).unwrap();
let back: Event<TestPayload> = from_bytes::<Event<TestPayload>, Error>(&bytes).unwrap();
assert_eq!(event, back);
}
#[tokio::test]
async fn noop_sink_accepts_every_batch() {
let sink: NoopEventSink<TestPayload> = NoopEventSink::new();
let events = vec![sample_event(), sample_event()];
sink.emit(&events).await.unwrap();
}
#[tokio::test]
async fn log_and_swallow_swallows_errors() {
struct Failing;
impl EventSink<TestPayload> for Failing {
async fn emit(&self, events: &[Event<TestPayload>]) -> Result<(), SinkError> {
Err(SinkError::Unavailable(format!(
"forced (rejected batch of {})",
events.len()
)))
}
}
let sink = LogAndSwallow(Failing);
let events = vec![sample_event()];
sink.emit(&events).await.unwrap();
}
}