use std::panic::{AssertUnwindSafe, UnwindSafe};
use std::sync::atomic::{AtomicBool, Ordering};
use thiserror::Error;
use super::events::{
ChannelAuditEvent, FallbackAuditEvent, ModerationAuditEvent, SubstrateAuditEvent,
UserAuditEvent,
};
/// Error returned by the audit sink `record` methods declared in this file.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum AuditError {
/// The sink cannot accept events. Displayed as "audit sink unavailable".
///
/// `TerminatedSinkGuard` returns this when the wrapped sink panics and on
/// every call after that.
#[error("audit sink unavailable")]
Unavailable,
/// A buffer refused the event. Displayed as "audit buffer rejected event".
// NOTE(review): nothing visible in this file produces this variant; confirm the
// exact condition against the sink implementations.
#[error("audit buffer rejected event")]
BufferRejected,
}
/// Identifies one of the typed audit sinks.
///
/// Passed to `FallbackAuditSink::record_panic` and listed in the
/// `sinks_committed` / `sinks_failed` slices of
/// `FallbackAuditSink::record_composite_failure`. There is no variant for the
/// fallback sink itself.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SinkKind {
/// Corresponds by name to `UserAuditSink`.
User,
/// Corresponds by name to `ChannelAuditSink`.
Channel,
/// Corresponds by name to `SubstrateAuditSink`.
Substrate,
/// Corresponds by name to `ModerationAuditSink`.
Moderation,
}
/// Destination for `UserAuditEvent`s.
///
/// Implementors must be `Send + Sync`.
pub trait UserAuditSink: Send + Sync {
/// Records one event, taking ownership of it.
///
/// # Errors
/// Returns an `AuditError` when the sink does not accept the event.
fn record(&self, event: UserAuditEvent) -> Result<(), AuditError>;
}
/// Destination for `ChannelAuditEvent`s.
///
/// Implementors must be `Send + Sync`.
pub trait ChannelAuditSink: Send + Sync {
/// Records one event, taking ownership of it.
///
/// # Errors
/// Returns an `AuditError` when the sink does not accept the event.
fn record(&self, event: ChannelAuditEvent) -> Result<(), AuditError>;
}
/// Destination for `SubstrateAuditEvent`s.
///
/// Implementors must be `Send + Sync`.
pub trait SubstrateAuditSink: Send + Sync {
/// Records one event, taking ownership of it.
///
/// # Errors
/// Returns an `AuditError` when the sink does not accept the event.
fn record(&self, event: SubstrateAuditEvent) -> Result<(), AuditError>;
}
/// Destination for `ModerationAuditEvent`s.
///
/// Implementors must be `Send + Sync`.
pub trait ModerationAuditSink: Send + Sync {
/// Records one event, taking ownership of it.
///
/// # Errors
/// Returns an `AuditError` when the sink does not accept the event.
fn record(&self, event: ModerationAuditEvent) -> Result<(), AuditError>;
}
/// Sink for reporting failures of the typed audit sinks.
///
/// Every method returns `()`, so an implementation has no way to report its own
/// failure back to the caller. Implementors must be `Send + Sync`.
// NOTE(review): the parameter descriptions below are inferred from parameter
// names and types only; confirm them against the callers and implementors.
pub trait FallbackAuditSink: Send + Sync {
/// Reports a panic associated with the sink identified by `sink`.
///
/// * `sink` - which kind of sink the report is about.
/// * `trace_id` - presumably the trace the failed operation belonged to.
/// * `capability` - presumably the capability being exercised at the time.
/// * `at` - presumably the wall-clock time of the panic.
fn record_panic(
&self,
sink: SinkKind,
trace_id: crate::identity::TraceId,
capability: crate::authority::capability::CapabilityKind,
at: std::time::SystemTime,
);
/// Reports a composite operation in which some sinks failed.
///
/// * `trace_id` - presumably the trace the operation belonged to.
/// * `composite_op_id` - identifier of the composite operation.
/// * `sinks_committed` - presumably the sinks that had already recorded.
/// * `sinks_failed` - presumably the sinks that did not record.
/// * `at` - presumably the wall-clock time of the failure.
fn record_composite_failure(
&self,
trace_id: crate::identity::TraceId,
composite_op_id: super::composite::CompositeOpId,
sinks_committed: &[SinkKind],
sinks_failed: &[SinkKind],
at: std::time::SystemTime,
);
/// Records an already-constructed `FallbackAuditEvent`, taking ownership of it.
fn record_event(&self, event: FallbackAuditEvent);
}
/// Marker error: the closure handed to [`SinkPanicGuard::call`] panicked.
///
/// Carries no data; the panic payload is discarded.
#[derive(Debug, Clone, Copy)]
pub struct Panicked;

/// Namespace type for running a closure with panic containment.
pub struct SinkPanicGuard;

impl SinkPanicGuard {
    /// Runs `f` and turns an unwinding panic into `Err(Panicked)`.
    ///
    /// When `f` completes normally its return value is passed through as `Ok`.
    /// The panic payload is dropped rather than returned.
    ///
    /// # Errors
    /// Returns [`Panicked`] if `f` unwinds.
    pub fn call<F, T>(f: F) -> Result<T, Panicked>
    where
        F: FnOnce() -> T + UnwindSafe,
    {
        // The signature already demands `F: UnwindSafe`, so this wrapper is not
        // needed to satisfy `catch_unwind`; it is kept as written.
        let guarded = AssertUnwindSafe(f);
        match std::panic::catch_unwind(guarded) {
            Ok(value) => Ok(value),
            Err(_payload) => Err(Panicked),
        }
    }
}
/// Wrapper that pairs a sink with a "terminated" flag.
///
/// The flag starts out `false`. The `UserAuditSink` implementation for this
/// type sets it once the wrapped sink panics and refuses all later events.
pub struct TerminatedSinkGuard<S> {
    /// The wrapped sink that events are forwarded to.
    inner: S,
    /// `true` once the wrapped sink has panicked.
    terminated: AtomicBool,
}

impl<S> TerminatedSinkGuard<S> {
    /// Wraps `inner` with the terminated flag cleared.
    #[must_use]
    pub fn new(inner: S) -> Self {
        let terminated = AtomicBool::new(false);
        Self { inner, terminated }
    }
}
/// Forwards user audit events to the wrapped sink until that sink panics.
impl<S> UserAuditSink for TerminatedSinkGuard<S>
where
    S: UserAuditSink + std::panic::RefUnwindSafe + Send + Sync,
{
    /// Records `event` through the wrapped sink with panic containment.
    ///
    /// While the guard is live, the wrapped sink's own result is returned
    /// unchanged. If the wrapped sink panics, the guard marks itself
    /// terminated; that call and every later call fail without reaching the
    /// wrapped sink.
    ///
    /// # Errors
    /// Returns `AuditError::Unavailable` when the guard is already terminated or
    /// the wrapped sink panics; otherwise returns whatever error the wrapped
    /// sink reports.
    fn record(&self, event: UserAuditEvent) -> Result<(), AuditError> {
        let already_terminated = self.terminated.load(Ordering::Acquire);
        if !already_terminated {
            if let Ok(outcome) = SinkPanicGuard::call(|| self.inner.record(event)) {
                return outcome;
            }
            // The wrapped sink panicked: latch the flag so later calls skip it.
            self.terminated.store(true, Ordering::Release);
        }
        Err(AuditError::Unavailable)
    }
}