use majra::pubsub::{TypedMessage, TypedPubSub};
use tokio::sync::broadcast;
use tracing::debug;
use crate::entry::AuditEntry;
/// Message type handed to subscribers of an [`AuditStream`]: a
/// `TypedMessage` specialised to carry an [`AuditEntry`].
pub type StreamMessage = TypedMessage<AuditEntry>;
/// Publishes [`AuditEntry`] values to topic-based subscribers.
///
/// Wraps a `TypedPubSub<AuditEntry>` hub. Each entry is published under a
/// topic of the form `<topic_prefix>/<source>/<action>`.
pub struct AuditStream {
/// Underlying hub that `publish`, `subscribe` and the counters delegate to.
hub: TypedPubSub<AuditEntry>,
/// Leading topic component placed before an entry's source and action.
topic_prefix: String,
}
impl AuditStream {
    /// Creates a stream whose topics start with the default `libro` prefix.
    pub fn new() -> Self {
        Self::with_prefix("libro")
    }

    /// Creates a stream whose topics start with `prefix`.
    ///
    /// The prefix is stored exactly as given and becomes the first component
    /// of every topic this stream publishes to.
    pub fn with_prefix(prefix: impl Into<String>) -> Self {
        let hub = TypedPubSub::new();
        let topic_prefix = prefix.into();
        Self { hub, topic_prefix }
    }

    /// Builds the topic for `entry` as `<prefix>/<source>/<action>`.
    fn topic_for(&self, entry: &AuditEntry) -> String {
        let prefix = &self.topic_prefix;
        let source = entry.source();
        let action = entry.action();
        format!("{prefix}/{source}/{action}")
    }

    /// Publishes a clone of `entry` to the hub under its derived topic.
    ///
    /// A debug-level trace event with the topic and the entry's hash is
    /// emitted before the entry is handed to the hub.
    pub fn publish(&self, entry: &AuditEntry) {
        let routed_topic = self.topic_for(entry);
        debug!(topic = %routed_topic, hash = entry.hash(), "entry published to stream");
        // The hub takes ownership of its payload, so the borrowed entry is cloned.
        self.hub.publish(&routed_topic, entry.clone());
    }

    /// Subscribes to entries whose topic matches `pattern`.
    ///
    /// The pattern is passed to the hub unchanged; patterns such as `libro/#`
    /// and `libro/*/alert` are exercised by this module's tests. Matching
    /// messages arrive on the returned broadcast receiver.
    pub fn subscribe(&self, pattern: &str) -> broadcast::Receiver<StreamMessage> {
        let receiver = self.hub.subscribe(pattern);
        receiver
    }

    /// Returns the pattern count reported by the underlying hub.
    pub fn pattern_count(&self) -> usize {
        let hub = &self.hub;
        hub.pattern_count()
    }

    /// Returns the number of messages the underlying hub reports as published.
    pub fn entries_published(&self) -> u64 {
        let hub = &self.hub;
        hub.messages_published()
    }
}
impl Default for AuditStream {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Debug for AuditStream {
    /// Formats the stream as `AuditStream { prefix, patterns, published }`.
    ///
    /// The hub itself is not printed; only its pattern and published-message
    /// counters are shown next to the topic prefix.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("AuditStream");
        repr.field("prefix", &self.topic_prefix);
        repr.field("patterns", &self.pattern_count());
        repr.field("published", &self.entries_published());
        repr.finish()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::AuditChain;
    use crate::entry::EventSeverity;

    /// Empty JSON object used as the details payload of every test entry.
    fn no_details() -> serde_json::Value {
        serde_json::json!({})
    }

    /// A `libro/#` subscriber receives a published entry under
    /// `libro/<source>/<action>`, and the published counter becomes 1.
    #[tokio::test]
    async fn publish_subscribe_entry() {
        let audit_stream = AuditStream::new();
        let mut receiver = audit_stream.subscribe("libro/#");

        let mut chain = AuditChain::new();
        let appended = chain.append(EventSeverity::Info, "daimon", "agent.start", no_details());
        audit_stream.publish(appended);

        let delivered = receiver.recv().await.unwrap();
        assert_eq!(delivered.topic, "libro/daimon/agent.start");
        assert_eq!(delivered.payload.hash(), appended.hash());
        assert_eq!(audit_stream.entries_published(), 1);
    }

    /// Subscribers scoped to one source see only that source's entries.
    #[tokio::test]
    async fn subscribe_filtered_by_source() {
        let audit_stream = AuditStream::new();
        let mut daimon_rx = audit_stream.subscribe("libro/daimon/#");
        let mut aegis_rx = audit_stream.subscribe("libro/aegis/#");

        let mut chain = AuditChain::new();
        let first = chain.append(EventSeverity::Info, "daimon", "start", no_details());
        audit_stream.publish(first);
        let second = chain.append(EventSeverity::Security, "aegis", "alert", no_details());
        audit_stream.publish(second);

        let from_daimon = daimon_rx.recv().await.unwrap();
        assert_eq!(from_daimon.payload.source(), "daimon");
        // Nothing else may be queued for this subscriber.
        assert!(daimon_rx.try_recv().is_err());

        let from_aegis = aegis_rx.recv().await.unwrap();
        assert_eq!(from_aegis.payload.source(), "aegis");
        assert!(aegis_rx.try_recv().is_err());
    }

    /// A `*` in the source position matches any source while the action
    /// segment still filters.
    #[tokio::test]
    async fn subscribe_by_action_wildcard() {
        let audit_stream = AuditStream::new();
        let mut alert_rx = audit_stream.subscribe("libro/*/alert");

        let mut chain = AuditChain::new();
        let first = chain.append(EventSeverity::Info, "daimon", "start", no_details());
        audit_stream.publish(first);
        let second = chain.append(EventSeverity::Security, "aegis", "alert", no_details());
        audit_stream.publish(second);

        let delivered = alert_rx.recv().await.unwrap();
        assert_eq!(delivered.payload.action(), "alert");
        // The non-alert entry must not have been delivered.
        assert!(alert_rx.try_recv().is_err());
    }

    /// An entry whose topic does not match the subscription is not delivered.
    #[tokio::test]
    async fn no_delivery_on_mismatch() {
        let audit_stream = AuditStream::new();
        let mut aegis_rx = audit_stream.subscribe("libro/aegis/#");

        let mut chain = AuditChain::new();
        let appended = chain.append(EventSeverity::Info, "daimon", "start", no_details());
        audit_stream.publish(appended);

        assert!(aegis_rx.try_recv().is_err());
    }

    /// A custom prefix, including one containing `/`, is used verbatim as the
    /// leading part of the topic.
    #[test]
    fn custom_prefix() {
        let audit_stream = AuditStream::with_prefix("audit/prod");
        let standalone = AuditEntry::new(EventSeverity::Info, "daimon", "start", no_details(), "");

        let topic = audit_stream.topic_for(&standalone);
        assert_eq!(topic, "audit/prod/daimon/start");
    }

    /// The `Debug` output names the type and includes the topic prefix.
    #[test]
    fn debug_display() {
        let audit_stream = AuditStream::new();
        let rendered = format!("{audit_stream:?}");

        assert!(rendered.contains("AuditStream"));
        assert!(rendered.contains("libro"));
    }

    /// Several entries are received in the order they were published, and
    /// the published counter reflects all of them.
    #[tokio::test]
    async fn multiple_entries_stream() {
        let audit_stream = AuditStream::new();
        let mut receiver = audit_stream.subscribe("libro/#");

        let mut chain = AuditChain::new();
        for index in 0..5 {
            let action = format!("e{index}");
            let appended = chain.append(EventSeverity::Info, "src", action, no_details());
            audit_stream.publish(appended);
        }

        for index in 0..5 {
            let delivered = receiver.recv().await.unwrap();
            let expected_action = format!("e{index}");
            assert_eq!(delivered.payload.action(), expected_action);
        }
        assert_eq!(audit_stream.entries_published(), 5);
    }
}