use std::sync::Arc;
use super::logs::LogRecord;
/// Error returned when a [`LogChainAppender`] fails to append a record.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct LogAppendError {
    /// Human-readable description of why the append failed.
    pub reason: String,
}
/// Renders the error as `log-chain append failed: <reason>`.
impl std::fmt::Display for LogAppendError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Destructure so the format string can capture the field by name.
        let Self { reason } = self;
        write!(f, "log-chain append failed: {reason}")
    }
}
/// Marks [`LogAppendError`] as a standard error type. The impl body is empty,
/// so only the trait's provided methods are used.
impl std::error::Error for LogAppendError {}
/// Sink that log records are appended to.
///
/// Implementors must be `Send + Sync + 'static`; this file hands them out as
/// `Arc<dyn LogChainAppender>` (see `no_op_arc`).
pub trait LogChainAppender: Send + Sync + 'static {
/// Appends `record` to the sink.
///
/// # Errors
///
/// Returns a [`LogAppendError`] describing why the record could not be appended.
fn append(&self, record: &LogRecord) -> Result<(), LogAppendError>;
}
/// Appender that accepts every record and stores nothing.
#[derive(Default, Debug)]
pub struct NoOpLogChainAppender;
/// Accepts and ignores every record.
impl LogChainAppender for NoOpLogChainAppender {
    /// Always succeeds; the record is neither inspected nor stored.
    fn append(&self, _: &LogRecord) -> Result<(), LogAppendError> {
        Ok(())
    }
}
/// Record limit used by `BufferingLogChainAppender::default()`: 2^14 = 16 384.
pub const DEFAULT_LOG_BUFFERING_APPENDER_CAPACITY: usize = 1 << 14;
/// In-memory [`LogChainAppender`] that keeps the most recent records in a
/// bounded queue.
///
/// Once `capacity` records are held, each further append evicts the oldest
/// record and increments the `dropped` counter.
#[derive(Debug)]
pub struct BufferingLogChainAppender {
/// Buffered records, oldest at the front; mutex-guarded because `append`
/// mutates the queue through `&self`.
records: parking_lot::Mutex<std::collections::VecDeque<LogRecord>>,
/// Maximum number of records retained; `with_capacity` clamps it to at least 1.
capacity: usize,
/// Number of records evicted so far to make room for newer ones.
dropped: std::sync::atomic::AtomicU64,
}
impl Default for BufferingLogChainAppender {
fn default() -> Self {
Self::with_capacity(DEFAULT_LOG_BUFFERING_APPENDER_CAPACITY)
}
}
impl BufferingLogChainAppender {
    /// Creates an empty appender that retains at most `capacity` records.
    ///
    /// A `capacity` of zero is raised to one, so the buffer can always hold
    /// the most recently appended record.
    pub fn with_capacity(capacity: usize) -> Self {
        use std::collections::VecDeque;
        use std::sync::atomic::AtomicU64;

        let capacity = std::cmp::max(capacity, 1);
        let records = parking_lot::Mutex::new(VecDeque::new());
        let dropped = AtomicU64::new(0);
        Self {
            records,
            capacity,
            dropped,
        }
    }

    /// Returns cloned copies of the currently buffered records, oldest first.
    pub fn captured(&self) -> Vec<LogRecord> {
        let guard = self.records.lock();
        let snapshot: Vec<LogRecord> = guard.iter().cloned().collect();
        snapshot
    }

    /// Returns how many records have been evicted to make room for newer ones.
    pub fn dropped_count(&self) -> u64 {
        use std::sync::atomic::Ordering;

        self.dropped.load(Ordering::Relaxed)
    }
}
/// Buffers appended records in memory, evicting the oldest once full.
impl LogChainAppender for BufferingLogChainAppender {
    /// Stores a clone of `record` at the back of the buffer.
    ///
    /// When the buffer already holds `capacity` records, the oldest one is
    /// removed first and the dropped counter is incremented. This
    /// implementation never returns an error.
    fn append(&self, record: &LogRecord) -> Result<(), LogAppendError> {
        // Clone before taking the lock: the copy allocates (the record owns
        // its message), and doing it inside the critical section would make
        // every concurrent appender wait on that allocation.
        let owned = record.clone();

        let mut buf = self.records.lock();
        if buf.len() >= self.capacity {
            buf.pop_front();
            self.dropped
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        }
        buf.push_back(owned);
        Ok(())
    }
}
/// Returns a shared, type-erased [`NoOpLogChainAppender`].
pub(crate) fn no_op_arc() -> Arc<dyn LogChainAppender> {
    // Explicit unsizing cast from the concrete appender to the trait object.
    Arc::new(NoOpLogChainAppender) as Arc<dyn LogChainAppender>
}
#[cfg(test)]
mod tests {
    use super::super::logs::LogLevel;
    use super::*;

    /// Builds a deterministic `Info` record whose timestamp and message are
    /// derived from `seq`.
    fn fixture(seq: u64) -> LogRecord {
        let ts_ms = 1_700_000_000_000 + seq;
        let message = format!("message {seq}");
        LogRecord {
            seq,
            ts_ms,
            level: LogLevel::Info,
            daemon_id: Some(7),
            node_id: Some(100),
            message,
            chain_pending: false,
        }
    }

    /// The no-op appender reports success for every record it is given.
    #[test]
    fn no_op_returns_ok_for_every_record() {
        let app = NoOpLogChainAppender;
        for seq in [1, 2] {
            app.append(&fixture(seq)).expect("no_op should be infallible");
        }
    }

    /// Records below capacity are all kept, in append order.
    #[test]
    fn buffering_captures_records_in_order() {
        let app = BufferingLogChainAppender::default();
        (1..=3).for_each(|seq| app.append(&fixture(seq)).unwrap());

        let seqs: Vec<u64> = app.captured().iter().map(|rec| rec.seq).collect();
        assert_eq!(seqs.len(), 3);
        assert_eq!(seqs.first(), Some(&1));
        assert_eq!(seqs.last(), Some(&3));
    }

    /// Appending past capacity evicts the oldest records and counts them.
    #[test]
    fn buffering_drops_oldest_when_over_capacity() {
        let app = BufferingLogChainAppender::with_capacity(2);
        for seq in 1..=5 {
            app.append(&fixture(seq)).unwrap();
        }

        let seqs: Vec<u64> = app.captured().iter().map(|rec| rec.seq).collect();
        // Exactly two survivors: the two most recent records.
        assert_eq!(seqs, [4, 5]);
        assert_eq!(app.dropped_count(), 3);
    }
}