use std::sync::Arc;
use super::ice::AdminAuditRecord;
/// Error returned when an admin audit record could not be appended.
///
/// Carries a free-form, human-readable `reason`; the `Display` output
/// prefixes it with `admin-audit-chain append failed: `.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AdminAuditAppendError {
    /// Description of why the append failed.
    pub reason: String,
}

impl std::fmt::Display for AdminAuditAppendError {
    /// Writes the fixed failure prefix followed by the stored reason.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { reason } = self;
        f.write_fmt(format_args!("admin-audit-chain append failed: {}", reason))
    }
}

// No underlying source error is tracked, so the default `Error` methods suffice.
impl std::error::Error for AdminAuditAppendError {}
/// Sink that accepts admin audit records one at a time.
///
/// The `Send + Sync + 'static` supertraits let an implementation be shared
/// behind an `Arc<dyn AdminAuditChainAppender>`.
pub trait AdminAuditChainAppender: Send + Sync + 'static {
/// Appends a single `record`.
///
/// # Errors
///
/// Returns an [`AdminAuditAppendError`] carrying the reason when the
/// implementation cannot accept the record.
fn append(&self, record: &AdminAuditRecord) -> Result<(), AdminAuditAppendError>;
}
/// Appender that ignores every record and always reports success.
#[derive(Default, Debug)]
pub struct NoOpAdminAuditChainAppender;

impl AdminAuditChainAppender for NoOpAdminAuditChainAppender {
    /// Discards the record without inspecting it; never fails.
    fn append(&self, _: &AdminAuditRecord) -> Result<(), AdminAuditAppendError> {
        Ok(())
    }
}
/// Capacity (in records) used when a `BufferingAdminAuditChainAppender` is
/// built through its `Default` implementation.
pub const DEFAULT_AUDIT_BUFFERING_APPENDER_CAPACITY: usize = 4096;
/// Appender that keeps the most recent records in a bounded in-memory queue.
///
/// Once the queue holds `capacity` records, appending another one evicts the
/// oldest record and increments the `dropped` counter.
#[derive(Debug)]
pub struct BufferingAdminAuditChainAppender {
// Buffered records, oldest at the front; guarded by a mutex because
// `append` takes `&self`.
records: parking_lot::Mutex<std::collections::VecDeque<AdminAuditRecord>>,
// Maximum number of records retained; `with_capacity` clamps it to >= 1.
capacity: usize,
// Number of records evicted to make room for newer ones.
dropped: std::sync::atomic::AtomicU64,
}
impl Default for BufferingAdminAuditChainAppender {
fn default() -> Self {
Self::with_capacity(DEFAULT_AUDIT_BUFFERING_APPENDER_CAPACITY)
}
}
impl BufferingAdminAuditChainAppender {
    /// Creates an empty appender that retains at most `capacity` records.
    ///
    /// A `capacity` of zero is raised to one, so the appender always keeps
    /// at least the most recently appended record.
    pub fn with_capacity(capacity: usize) -> Self {
        let effective_capacity = std::cmp::max(capacity, 1);
        let records = parking_lot::Mutex::new(std::collections::VecDeque::new());
        let dropped = std::sync::atomic::AtomicU64::new(0);
        Self {
            records,
            capacity: effective_capacity,
            dropped,
        }
    }

    /// Returns clones of the currently buffered records, front of the queue
    /// (oldest) first.
    pub fn captured(&self) -> Vec<AdminAuditRecord> {
        let guard = self.records.lock();
        let mut snapshot = Vec::with_capacity(guard.len());
        snapshot.extend(guard.iter().cloned());
        snapshot
    }

    /// Returns how many records have been evicted so far.
    pub fn dropped_count(&self) -> u64 {
        let counter = &self.dropped;
        counter.load(std::sync::atomic::Ordering::Relaxed)
    }
}
impl AdminAuditChainAppender for BufferingAdminAuditChainAppender {
    /// Stores a clone of `record` at the back of the queue.
    ///
    /// When the queue is already at capacity, the front (oldest) record is
    /// removed first and the dropped counter is incremented. Always returns
    /// `Ok(())`.
    fn append(&self, record: &AdminAuditRecord) -> Result<(), AdminAuditAppendError> {
        let mut queue = self.records.lock();
        let at_capacity = queue.len() >= self.capacity;
        if at_capacity {
            // Make room by discarding the oldest entry, and account for it.
            drop(queue.pop_front());
            self.dropped
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        }
        queue.push_back(record.clone());
        Ok(())
    }
}
/// Returns a shared trait-object handle to a `NoOpAdminAuditChainAppender`.
pub(crate) fn no_op_arc() -> Arc<dyn AdminAuditChainAppender> {
    let appender: Arc<dyn AdminAuditChainAppender> = Arc::new(NoOpAdminAuditChainAppender);
    appender
}
#[cfg(test)]
mod tests {
    use super::super::event::AdminEvent;
    use super::super::ice::VerificationOutcome;
    use super::*;
    use std::time::Duration;

    /// Builds an accepted `FreezeCluster` record whose sequence number and
    /// commit timestamp are both derived from `seq`.
    fn fixture(seq: u64) -> AdminAuditRecord {
        let event = AdminEvent::FreezeCluster {
            ttl: Duration::from_secs(30),
        };
        AdminAuditRecord {
            seq,
            committed_at_ms: 1_000 + seq,
            event,
            operator_ids: vec![7],
            outcome: VerificationOutcome::Accepted,
            chain_pending: false,
        }
    }

    /// The no-op appender accepts any record without error.
    #[test]
    fn no_op_returns_ok_for_every_record() {
        let appender = NoOpAdminAuditChainAppender;
        for seq in 1..=2 {
            appender
                .append(&fixture(seq))
                .expect("no_op should be infallible");
        }
    }

    /// Records below capacity are all retained, in append order.
    #[test]
    fn buffering_captures_records_in_order() {
        let appender = BufferingAdminAuditChainAppender::default();
        (1..=3).for_each(|seq| appender.append(&fixture(seq)).unwrap());

        let snapshot = appender.captured();
        assert_eq!(snapshot.len(), 3);
        assert_eq!(snapshot[0].seq, 1);
        assert_eq!(snapshot[2].seq, 3);
    }

    /// Exceeding capacity evicts from the front and counts each eviction.
    #[test]
    fn buffering_drops_oldest_when_over_capacity() {
        let appender = BufferingAdminAuditChainAppender::with_capacity(2);
        (1..=5).for_each(|seq| appender.append(&fixture(seq)).unwrap());

        let snapshot = appender.captured();
        assert_eq!(snapshot.len(), 2);
        assert_eq!(snapshot[0].seq, 4);
        assert_eq!(snapshot[1].seq, 5);
        assert_eq!(appender.dropped_count(), 3);
    }

    /// A requested capacity of zero behaves like a capacity of one.
    #[test]
    fn buffering_with_capacity_zero_clamps_to_one() {
        let appender = BufferingAdminAuditChainAppender::with_capacity(0);
        for seq in 1..=2 {
            appender.append(&fixture(seq)).unwrap();
        }

        let snapshot = appender.captured();
        assert_eq!(snapshot.len(), 1);
        assert_eq!(snapshot[0].seq, 2);
        assert_eq!(appender.dropped_count(), 1);
    }
}