Skip to main content

palisade_errors/
ring_buffer.rs

1// src/ring_buffer.rs
2//! Ring buffer for bounded forensic logging with DoS protection.
3//!
4//! Prevents memory exhaustion attacks by maintaining a fixed-size buffer
5//! with FIFO eviction. Ideal for high-volume honeypot deployments where
6//! attackers might trigger thousands of errors per second.
7//!
8//! # Design Principles
9//!
10//! - **Bounded memory**: Fixed maximum size regardless of attack volume
11//! - **FIFO eviction**: Oldest entries dropped first, keeps recent attacks
12//! - **Per-entry size caps**: No single error can dominate the buffer
13//! - **RwLock-based**: Concurrent readers, exclusive writers
14//!
15//! # Performance Characteristics
16//!
17//! - Zero allocations for reads (uses Arc<str> for cheap cloning)
18//! - O(1) insertion and eviction
19//! - Concurrent read scalability (N readers simultaneously)
20//! - Fixed memory footprint (no growth/reallocation)
21//!
22//! # Example
23//!
24//! ```rust
25//! use palisade_errors::ring_buffer::RingBufferLogger;
26//! use palisade_errors::{AgentError, definitions};
27//!
28//! // Max 1000 entries, 2KB per entry = 2MB total
29//! let logger = RingBufferLogger::new(1000, 2048);
30//!
31//! // Log errors - oldest automatically evicted
32//! let err = AgentError::config(definitions::CFG_PARSE_FAILED, "op", "details");
33//! logger.log(&err, "192.168.1.100");
34//!
35//! // Retrieve recent entries for analysis
36//! let recent = logger.get_recent(10);
37//! for entry in recent {
38//!     println!("{:?}", entry);
39//! }
40//! ```
41
42use crate::AgentError;
43use smallvec::SmallVec;
44use std::borrow::Cow;
45use std::sync::RwLockReadGuard;
46use std::sync::RwLockWriteGuard;
47use std::sync::atomic::{AtomicU64, Ordering};
48use std::sync::{Arc, RwLock};
49use std::time::{SystemTime, UNIX_EPOCH};
50
51/// A single forensic log entry with bounded size.
52///
53/// Uses Arc<str> instead of String to enable cheap cloning (atomic refcount increment)
54/// without heap allocations. This is critical for high-frequency logging where
55/// get_recent() might be called thousands of times per second.
56#[derive(Clone, Debug)]
57pub struct ForensicEntry {
58    /// Unix timestamp of error creation
59    pub timestamp: u64,
60    /// Error code (e.g., "E-CFG-100") - shared immutable string
61    pub code: Arc<str>,
62    /// Operation that failed - shared immutable string
63    pub operation: Arc<str>,
64    /// Error details - shared immutable string
65    pub details: Arc<str>,
66    /// Source IP or identifier - shared immutable string
67    pub source_ip: Arc<str>,
68    /// Additional metadata from the error - exact-size allocation
69    pub metadata: Arc<[(Arc<str>, Arc<str>)]>,
70    /// Approximate size in bytes
71    pub size_bytes: usize,
72    /// Whether this error was marked retryable
73    pub retryable: bool,
74}
75
76/// Fixed-size ring buffer with exact allocation (no growth).
77struct RingBuffer {
78    /// Fixed-size array of entries (no Vec growth overhead)
79    entries: Box<[Option<ForensicEntry>]>,
80    /// Write position (tail)
81    tail: usize,
82    /// Read position (head)
83    head: usize,
84    /// Current number of entries
85    len: usize,
86}
87
88impl RingBuffer {
89    fn new(capacity: usize) -> Self {
90        Self {
91            entries: std::iter::repeat_with(|| None)
92                .take(capacity)
93                .collect::<Box<[Option<ForensicEntry>]>>(),
94            tail: 0,
95            head: 0,
96            len: 0,
97        }
98    }
99
100    fn push(&mut self, entry: ForensicEntry) -> Option<ForensicEntry> {
101        let evicted = self.entries[self.tail].replace(entry);
102        self.tail = (self.tail + 1) % self.entries.len();
103
104        if self.len < self.entries.len() {
105            self.len += 1;
106        } else {
107            self.head = (self.head + 1) % self.entries.len();
108        }
109
110        evicted
111    }
112
113    #[inline]
114    fn len(&self) -> usize {
115        self.len
116    }
117
118    #[inline]
119    fn capacity(&self) -> usize {
120        self.entries.len()
121    }
122
123    // Fixed: Return DoubleEndedIterator so rev() works in public methods
124    fn iter(&self) -> impl DoubleEndedIterator<Item = &ForensicEntry> {
125        let head = self.head;
126        let len = self.len;
127        let cap = self.entries.len();
128
129        (0..len).filter_map(move |i| {
130            let idx = (head + i) % cap;
131            self.entries[idx].as_ref()
132        })
133    }
134
135    fn clear(&mut self) {
136        for entry in self.entries.iter_mut() {
137            *entry = None;
138        }
139        self.head = 0;
140        self.tail = 0;
141        self.len = 0;
142    }
143}
144
145/// Ring buffer logger with bounded memory usage.
146///
147/// Uses RwLock for concurrent read scalability - multiple threads can
148/// call get_recent() simultaneously without contention.
149pub struct RingBufferLogger {
150    buffer: Arc<RwLock<RingBuffer>>,
151    max_entries: usize,
152    max_entry_bytes: usize,
153    eviction_count: Arc<AtomicU64>,
154}
155
156impl RingBufferLogger {
157    /// Create a new ring buffer logger.
158    ///
159    /// # Arguments
160    ///
161    /// * `max_entries` - Maximum number of entries before FIFO eviction
162    /// * `max_entry_bytes` - Maximum bytes per entry (defensive cap)
163    ///
164    /// # Example
165    ///
166    /// ```rust
167    /// use palisade_errors::ring_buffer::RingBufferLogger;
168    ///
169    /// // Typical honeypot: 10k entries, 1KB each = 10MB max
170    /// let logger = RingBufferLogger::new(10_000, 1024);
171    /// ```
172    pub fn new(max_entries: usize, max_entry_bytes: usize) -> Self {
173        let bounded_entries = max_entries.max(1);
174        Self {
175            buffer: Arc::new(RwLock::new(RingBuffer::new(bounded_entries))),
176            max_entries: bounded_entries,
177            max_entry_bytes,
178            eviction_count: Arc::new(AtomicU64::new(0)),
179        }
180    }
181
182    #[inline]
183    fn read_buffer(&self) -> RwLockReadGuard<'_, RingBuffer> {
184        match self.buffer.read() {
185            Ok(guard) => guard,
186            Err(poisoned) => poisoned.into_inner(),
187        }
188    }
189
190    #[inline]
191    fn write_buffer(&self) -> RwLockWriteGuard<'_, RingBuffer> {
192        match self.buffer.write() {
193            Ok(guard) => guard,
194            Err(poisoned) => poisoned.into_inner(),
195        }
196    }
197
198    /// Log an error with automatic eviction if buffer is full.
199    ///
200    /// # Arguments
201    ///
202    /// * `err` - The error to log
203    /// * `source_ip` - Source IP or identifier for tracking
204    ///
205    /// # Example
206    ///
207    /// ```rust
208    /// # use palisade_errors::ring_buffer::RingBufferLogger;
209    /// # use palisade_errors::{AgentError, definitions};
210    /// let logger = RingBufferLogger::new(100, 1024);
211    /// let err = AgentError::config(definitions::CFG_PARSE_FAILED, "op", "details");
212    /// logger.log(&err, "192.168.1.100");
213    /// ```
214    pub fn log(&self, err: &AgentError, source_ip: &str) {
215        let entry = self.create_entry(err, source_ip);
216
217        let mut buffer = self.write_buffer();
218
219        // Evict oldest entry if buffer is full
220        if let Some(_evicted) = buffer.push(entry) {
221            self.eviction_count.fetch_add(1, Ordering::Relaxed);
222        }
223    }
224
225    /// Create a bounded forensic entry from an error.
226    ///
227    /// Uses Cow and Arc to minimize allocations:
228    /// - Short strings that don't need truncation: zero allocations
229    /// - Truncated strings: single allocation for the truncated content
230    /// - Arc cloning: just atomic refcount increments
231    fn create_entry(&self, err: &AgentError, source_ip: &str) -> ForensicEntry {
232        err.with_internal_log(|log| {
233            let mut size = 0usize;
234            let mut remaining = self.max_entry_bytes;
235
236            // Truncate operation name (cap at 256 or remaining)
237            let op_cap = remaining.min(256);
238            let operation = truncate_to_bytes(log.operation(), op_cap);
239            let op_len = operation.len();
240            size += op_len;
241            remaining = remaining.saturating_sub(op_len);
242
243            // Truncate details (cap at 512 or remaining)
244            let details_cap = remaining.min(512);
245            let details = truncate_to_bytes(log.details(), details_cap);
246            let details_len = details.len();
247            size += details_len;
248            remaining = remaining.saturating_sub(details_len);
249
250            // Add metadata up to remaining space, values capped at 128 bytes each
251            let mut metadata_vec: SmallVec<[(Arc<str>, Arc<str>); 8]> = SmallVec::new();
252            for (k, v) in log.metadata() {
253                if remaining == 0 {
254                    break;
255                }
256                let key_len = k.len();
257                if key_len >= remaining {
258                    break;
259                }
260                let value_cap = (remaining - key_len).min(128);
261                if value_cap == 0 {
262                    break;
263                }
264                let value = truncate_to_bytes(v.as_str(), value_cap);
265                let used = key_len + value.len();
266                if used > remaining {
267                    break;
268                }
269                size += used;
270                remaining = remaining.saturating_sub(used);
271                
272                metadata_vec.push((Arc::from(*k), Arc::from(value.as_ref())));
273            }
274
275            let metadata: Arc<[(Arc<str>, Arc<str>)]> =
276                metadata_vec.into_vec().into_boxed_slice().into();
277
278            // Add source_ip if space permits (cap to 128 bytes)
279            let source_ip_str = if remaining == 0 {
280                Cow::Borrowed("[TRUNCATED]")
281            } else {
282                let source_ip = truncate_to_bytes(source_ip, remaining.min(128));
283                size += source_ip.len();
284                source_ip
285            };
286
287            ForensicEntry {
288                timestamp: SystemTime::now()
289                    .duration_since(UNIX_EPOCH)
290                    .map_or(0, |d| d.as_secs()),
291                code: Arc::from(log.code().to_string()),
292                operation: Arc::from(operation.as_ref()),
293                details: Arc::from(details.as_ref()),
294                source_ip: Arc::from(source_ip_str.as_ref()),
295                metadata,
296                size_bytes: size,
297                retryable: log.is_retryable(),
298            }
299        })
300    }
301
302    /// Get the N most recent entries in reverse chronological order.
303    ///
304    /// Uses read lock for concurrent access - multiple threads can call
305    /// this simultaneously without blocking each other.
306    ///
307    /// # Example
308    ///
309    /// ```rust
310    /// # use palisade_errors::ring_buffer::RingBufferLogger;
311    /// # let logger = RingBufferLogger::new(100, 1024);
312    /// // Get last 10 errors
313    /// let recent = logger.get_recent(10);
314    /// for entry in recent {
315    ///     println!("[{}] {} - {}", entry.timestamp, entry.code, entry.operation);
316    /// }
317    /// ```
318    pub fn get_recent(&self, count: usize) -> Vec<ForensicEntry> {
319        let buffer = self.read_buffer();
320        buffer
321            .iter()
322            .rev()
323            .take(count)
324            .cloned() // Cheap: just Arc refcount increments
325            .collect()
326    }
327
328    /// Get all entries in reverse chronological order.
329    pub fn get_all(&self) -> Vec<ForensicEntry> {
330        let buffer = self.read_buffer();
331        buffer.iter().rev().cloned().collect()
332    }
333
334    /// Get entries matching a predicate (e.g., filter by source IP).
335    ///
336    /// # Example
337    ///
338    /// ```rust
339    /// # use palisade_errors::ring_buffer::RingBufferLogger;
340    /// # let logger = RingBufferLogger::new(100, 1024);
341    /// // Get all errors from specific IP
342    /// let from_attacker = logger.get_filtered(|entry| {
343    ///     entry.source_ip.as_ref() == "192.168.1.100"
344    /// });
345    /// ```
346    pub fn get_filtered<F>(&self, predicate: F) -> Vec<ForensicEntry>
347    where
348        F: Fn(&ForensicEntry) -> bool,
349    {
350        let buffer = self.read_buffer();
351        buffer.iter().filter(|e| predicate(e)).cloned().collect()
352    }
353
354    /// Get current number of entries in buffer.
355    #[inline]
356    pub fn len(&self) -> usize {
357        let buffer = self.read_buffer();
358        buffer.len()
359    }
360
361    /// Check if buffer is empty.
362    #[inline]
363    pub fn is_empty(&self) -> bool {
364        self.len() == 0
365    }
366
367    /// Get total payload bytes (lower-bound estimate).
368    pub fn payload_bytes(&self) -> usize {
369        let buffer = self.read_buffer();
370        buffer.iter().map(|e| e.size_bytes).sum()
371    }
372
373    /// Get total number of evictions since creation.
374    ///
375    /// High eviction rate indicates sustained attack volume.
376    #[inline]
377    pub fn eviction_count(&self) -> u64 {
378        self.eviction_count.load(Ordering::Relaxed)
379    }
380
381    /// Clear all entries (useful after archival or testing).
382    pub fn clear(&self) {
383        let mut buffer = self.write_buffer();
384        buffer.clear();
385    }
386
387    /// Get buffer capacity.
388    #[inline]
389    pub fn capacity(&self) -> usize {
390        self.max_entries
391    }
392
393    /// Check if buffer is at capacity.
394    pub fn is_full(&self) -> bool {
395        self.len() >= self.max_entries
396    }
397}
398
399impl Clone for RingBufferLogger {
400    fn clone(&self) -> Self {
401        Self {
402            buffer: Arc::clone(&self.buffer),
403            max_entries: self.max_entries,
404            max_entry_bytes: self.max_entry_bytes,
405            eviction_count: Arc::clone(&self.eviction_count),
406        }
407    }
408}
409
410/// Truncate string to maximum byte length, respecting UTF-8 boundaries.
411///
412/// Returns Cow to avoid allocation when no truncation is needed (common case).
413// Fixed: Added lifetime 'a to signature
414fn truncate_to_bytes<'a>(s: &'a str, max_bytes: usize) -> Cow<'a, str> {
415    if max_bytes == 0 {
416        return Cow::Borrowed("");
417    }
418    if s.len() <= max_bytes {
419        // Common case: no truncation needed, zero allocations
420        return Cow::Borrowed(s);
421    }
422
423    // Reserve space for truncation indicator
424    let indicator = "...[TRUNC]";
425    if max_bytes <= indicator.len() {
426        return Cow::Borrowed(&indicator[..max_bytes]);
427    }
428    let max_content = max_bytes - indicator.len();
429
430    // Find last valid UTF-8 boundary
431    let mut idx = max_content;
432    while idx > 0 && !s.is_char_boundary(idx) {
433        idx -= 1;
434    }
435
436    if idx == 0 {
437        return Cow::Borrowed(indicator);
438    }
439
440    // Only path that allocates
441    let mut out = String::with_capacity(idx + indicator.len());
442    out.push_str(&s[..idx]);
443    out.push_str(indicator);
444    Cow::Owned(out)
445}
446
447#[cfg(test)]
448mod tests {
449    use super::*;
450    use crate::{AgentError, definitions};
451
452    #[test]
453    fn ring_buffer_evicts_oldest() {
454        let logger = RingBufferLogger::new(3, 1024);
455
456        for i in 0..5 {
457            let err = AgentError::config(
458                definitions::CFG_PARSE_FAILED,
459                "op",
460                format!("error {}", i),
461            );
462            logger.log(&err, "192.168.1.1");
463        }
464
465        // Should only have last 3
466        assert_eq!(logger.len(), 3);
467        assert_eq!(logger.eviction_count(), 2);
468
469        let entries = logger.get_all();
470        assert!(entries[0].details.contains("error 4"));
471        assert!(entries[2].details.contains("error 2"));
472    }
473
474    #[test]
475    fn ring_buffer_respects_size_limit() {
476        let logger = RingBufferLogger::new(100, 128);
477
478        let huge_details = "A".repeat(10000);
479        let err =
480            AgentError::config(definitions::CFG_PARSE_FAILED, "op", huge_details);
481        logger.log(&err, "192.168.1.1");
482
483        let entry = &logger.get_recent(1)[0];
484        assert!(entry.size_bytes <= 128);
485        assert!(entry.details.contains("TRUNC"));
486    }
487
488    #[test]
489    fn ring_buffer_filtering() {
490        let logger = RingBufferLogger::new(100, 1024);
491
492        for i in 0..10 {
493            let err = AgentError::config(
494                definitions::CFG_PARSE_FAILED,
495                "op",
496                format!("error {}", i),
497            );
498            let ip = if i % 2 == 0 {
499                "192.168.1.1"
500            } else {
501                "192.168.1.2"
502            };
503            logger.log(&err, ip);
504        }
505
506        let from_ip1 =
507            logger.get_filtered(|e| e.source_ip.as_ref() == "192.168.1.1");
508        assert_eq!(from_ip1.len(), 5);
509    }
510
511    #[test]
512    fn ring_buffer_clone_shares_state() {
513        let logger1 = RingBufferLogger::new(100, 1024);
514        let logger2 = logger1.clone();
515
516        let err = AgentError::config(definitions::CFG_PARSE_FAILED, "op", "test");
517        logger1.log(&err, "192.168.1.1");
518
519        // Both should see the entry
520        assert_eq!(logger1.len(), 1);
521        assert_eq!(logger2.len(), 1);
522    }
523
524    #[test]
525    fn truncate_respects_utf8() {
526        let emoji = "🔥".repeat(100);
527        let truncated = truncate_to_bytes(&emoji, 50);
528
529        // Should not panic and should be valid UTF-8
530        assert!(std::str::from_utf8(truncated.as_bytes()).is_ok());
531        assert!(truncated.len() <= 50);
532    }
533
534    #[test]
535    fn truncate_no_allocation_when_short() {
536        let s = "short";
537        let truncated = truncate_to_bytes(s, 100);
538
539        // Should be borrowed (zero allocations)
540        assert!(matches!(truncated, Cow::Borrowed(_)));
541        assert_eq!(truncated.as_ref(), s);
542    }
543
544    #[test]
545    fn ring_buffer_concurrent_logging() {
546        use std::thread;
547
548        let logger = RingBufferLogger::new(128, 256);
549        let mut handles = Vec::new();
550
551        for i in 0..8 {
552            let logger = logger.clone();
553            handles.push(thread::spawn(move || {
554                for j in 0..100 {
555                    let err = AgentError::config(
556                        definitions::CFG_PARSE_FAILED,
557                        "op",
558                        format!("t{}-{}", i, j),
559                    );
560                    logger.log(&err, "192.168.1.1");
561                }
562            }));
563        }
564
565        for handle in handles {
566            handle.join().expect("thread panicked");
567        }
568
569        assert_eq!(logger.len(), 128);
570        assert!(logger.eviction_count() > 0);
571    }
572
573    #[test]
574    fn ring_buffer_concurrent_reads() {
575        use std::thread;
576
577        let logger = RingBufferLogger::new(100, 256);
578
579        // Populate buffer
580        for i in 0..50 {
581            let err = AgentError::config(
582                definitions::CFG_PARSE_FAILED,
583                "op",
584                format!("error {}", i),
585            );
586            logger.log(&err, "192.168.1.1");
587        }
588
589        // Multiple threads reading simultaneously
590        let mut handles = Vec::new();
591        for _ in 0..8 {
592            let logger = logger.clone();
593            handles.push(thread::spawn(move || {
594                for _ in 0..1000 {
595                    let entries = logger.get_recent(10);
596                    assert!(!entries.is_empty());
597                }
598            }));
599        }
600
601        for handle in handles {
602            handle.join().expect("thread panicked");
603        }
604    }
605
606    #[test]
607    fn arc_str_cloning_is_cheap() {
608        let logger = RingBufferLogger::new(10, 1024);
609
610        let err = AgentError::config(
611            definitions::CFG_PARSE_FAILED,
612            "operation",
613            "details",
614        );
615        logger.log(&err, "192.168.1.1");
616
617        let entry1 = logger.get_recent(1)[0].clone();
618        let entry2 = logger.get_recent(1)[0].clone();
619
620        // Verify Arc pointers point to same allocation
621        assert!(Arc::ptr_eq(&entry1.code, &entry2.code));
622        assert!(Arc::ptr_eq(&entry1.operation, &entry2.operation));
623        assert!(Arc::ptr_eq(&entry1.details, &entry2.details));
624    }
625}