Skip to main content

palisade_errors/
ring_buffer.rs

1// src/ring_buffer.rs
2//! Ring buffer for bounded forensic logging with DoS protection.
3//!
4//! Prevents memory exhaustion attacks by maintaining a fixed-size buffer
5//! with FIFO eviction. Ideal for high-volume honeypot deployments where
6//! attackers might trigger thousands of errors per second.
7//!
8//! # Design Principles
9//!
10//! - **Bounded memory**: Fixed maximum size regardless of attack volume
11//! - **FIFO eviction**: Oldest entries dropped first, keeps recent attacks
12//! - **Per-entry size caps**: No single error can dominate the buffer
13//! - **RwLock-based**: Concurrent readers, exclusive writers
14//!
15//! # Performance Characteristics
16//!
17//! - Zero allocations for reads (uses Arc<str> for cheap cloning)
18//! - O(1) insertion and eviction
19//! - Concurrent read scalability (N readers simultaneously)
20//! - Fixed memory footprint (no growth/reallocation)
21//!
22//! # Example
23//!
24//! ```rust
25//! use palisade_errors::ring_buffer::RingBufferLogger;
26//! use palisade_errors::{AgentError, definitions};
27//!
28//! // Max 1000 entries, 2KB per entry = 2MB total
29//! let logger = RingBufferLogger::new(1000, 2048);
30//!
31//! // Log errors - oldest automatically evicted
32//! let err = AgentError::config(definitions::CFG_PARSE_FAILED, "op", "details");
33//! logger.log(&err, "192.168.1.100");
34//!
35//! // Retrieve recent entries for analysis
36//! let recent = logger.get_recent(10);
37//! for entry in recent {
38//!     println!("{:?}", entry);
39//! }
40//! ```
41
42use crate::AgentError;
43use std::borrow::Cow;
44use std::sync::atomic::{AtomicU64, Ordering};
45use std::sync::{Arc, RwLock};
46use std::time::{SystemTime, UNIX_EPOCH};
47
48/// A single forensic log entry with bounded size.
49///
50/// Uses Arc<str> instead of String to enable cheap cloning (atomic refcount increment)
51/// without heap allocations. This is critical for high-frequency logging where
52/// get_recent() might be called thousands of times per second.
53#[derive(Clone, Debug)]
54pub struct ForensicEntry {
55    /// Unix timestamp of error creation
56    pub timestamp: u64,
57    /// Error code (e.g., "E-CFG-100") - shared immutable string
58    pub code: Arc<str>,
59    /// Operation that failed - shared immutable string
60    pub operation: Arc<str>,
61    /// Error details - shared immutable string
62    pub details: Arc<str>,
63    /// Source IP or identifier - shared immutable string
64    pub source_ip: Arc<str>,
65    /// Additional metadata from the error - exact-size allocation
66    pub metadata: Arc<[(Arc<str>, Arc<str>)]>,
67    /// Approximate size in bytes
68    pub size_bytes: usize,
69    /// Whether this error was marked retryable
70    pub retryable: bool,
71}
72
73/// Fixed-size ring buffer with exact allocation (no growth).
74struct RingBuffer {
75    /// Fixed-size array of entries (no Vec growth overhead)
76    entries: Box<[Option<ForensicEntry>]>,
77    /// Write position (tail)
78    tail: usize,
79    /// Read position (head)
80    head: usize,
81    /// Current number of entries
82    len: usize,
83}
84
85impl RingBuffer {
86    fn new(capacity: usize) -> Self {
87        Self {
88            entries: (0..capacity)
89                .map(|_| None)
90                .collect::<Vec<_>>()
91                .into_boxed_slice(),
92            tail: 0,
93            head: 0,
94            len: 0,
95        }
96    }
97
98    fn push(&mut self, entry: ForensicEntry) -> Option<ForensicEntry> {
99        let evicted = self.entries[self.tail].replace(entry);
100        self.tail = (self.tail + 1) % self.entries.len();
101
102        if self.len < self.entries.len() {
103            self.len += 1;
104        } else {
105            self.head = (self.head + 1) % self.entries.len();
106        }
107
108        evicted
109    }
110
111    #[inline]
112    fn len(&self) -> usize {
113        self.len
114    }
115
116    #[inline]
117    fn capacity(&self) -> usize {
118        self.entries.len()
119    }
120
121    // Fixed: Return DoubleEndedIterator so rev() works in public methods
122    fn iter(&self) -> impl DoubleEndedIterator<Item = &ForensicEntry> {
123        let head = self.head;
124        let len = self.len;
125        let cap = self.entries.len();
126
127        (0..len).filter_map(move |i| {
128            let idx = (head + i) % cap;
129            self.entries[idx].as_ref()
130        })
131    }
132
133    fn clear(&mut self) {
134        for entry in self.entries.iter_mut() {
135            *entry = None;
136        }
137        self.head = 0;
138        self.tail = 0;
139        self.len = 0;
140    }
141}
142
143/// Ring buffer logger with bounded memory usage.
144///
145/// Uses RwLock for concurrent read scalability - multiple threads can
146/// call get_recent() simultaneously without contention.
147pub struct RingBufferLogger {
148    buffer: Arc<RwLock<RingBuffer>>,
149    max_entries: usize,
150    max_entry_bytes: usize,
151    eviction_count: Arc<AtomicU64>,
152}
153
154impl RingBufferLogger {
155    /// Create a new ring buffer logger.
156    ///
157    /// # Arguments
158    ///
159    /// * `max_entries` - Maximum number of entries before FIFO eviction
160    /// * `max_entry_bytes` - Maximum bytes per entry (defensive cap)
161    ///
162    /// # Example
163    ///
164    /// ```rust
165    /// use palisade_errors::ring_buffer::RingBufferLogger;
166    ///
167    /// // Typical honeypot: 10k entries, 1KB each = 10MB max
168    /// let logger = RingBufferLogger::new(10_000, 1024);
169    /// ```
170    pub fn new(max_entries: usize, max_entry_bytes: usize) -> Self {
171        Self {
172            buffer: Arc::new(RwLock::new(RingBuffer::new(max_entries))),
173            max_entries,
174            max_entry_bytes,
175            eviction_count: Arc::new(AtomicU64::new(0)),
176        }
177    }
178
179    /// Log an error with automatic eviction if buffer is full.
180    ///
181    /// # Arguments
182    ///
183    /// * `err` - The error to log
184    /// * `source_ip` - Source IP or identifier for tracking
185    ///
186    /// # Example
187    ///
188    /// ```rust
189    /// # use palisade_errors::ring_buffer::RingBufferLogger;
190    /// # use palisade_errors::{AgentError, definitions};
191    /// let logger = RingBufferLogger::new(100, 1024);
192    /// let err = AgentError::config(definitions::CFG_PARSE_FAILED, "op", "details");
193    /// logger.log(&err, "192.168.1.100");
194    /// ```
195    pub fn log(&self, err: &AgentError, source_ip: &str) {
196        let entry = self.create_entry(err, source_ip);
197
198        let mut buffer = self.buffer.write().unwrap();
199
200        // Evict oldest entry if buffer is full
201        if let Some(_evicted) = buffer.push(entry) {
202            self.eviction_count.fetch_add(1, Ordering::Relaxed);
203        }
204    }
205
206    /// Create a bounded forensic entry from an error.
207    ///
208    /// Uses Cow and Arc to minimize allocations:
209    /// - Short strings that don't need truncation: zero allocations
210    /// - Truncated strings: single allocation for the truncated content
211    /// - Arc cloning: just atomic refcount increments
212    fn create_entry(&self, err: &AgentError, source_ip: &str) -> ForensicEntry {
213        err.with_internal_log(|log| {
214            let mut size = 0usize;
215            let mut remaining = self.max_entry_bytes;
216
217            // Truncate operation name (cap at 256 or remaining)
218            let op_cap = remaining.min(256);
219            let operation = truncate_to_bytes(log.operation(), op_cap);
220            let op_len = operation.len();
221            size += op_len;
222            remaining = remaining.saturating_sub(op_len);
223
224            // Truncate details (cap at 512 or remaining)
225            let details_cap = remaining.min(512);
226            let details = truncate_to_bytes(log.details(), details_cap);
227            let details_len = details.len();
228            size += details_len;
229            remaining = remaining.saturating_sub(details_len);
230
231            // Add metadata up to remaining space, values capped at 128 bytes each
232            // Fixed: Explicit type to prevent ambiguity in Arc creation
233            let mut metadata_vec: Vec<(Arc<str>, Arc<str>)> = Vec::new();
234            for (k, v) in log.metadata() {
235                if remaining == 0 {
236                    break;
237                }
238                let key_len = k.len();
239                if key_len >= remaining {
240                    break;
241                }
242                let value_cap = (remaining - key_len).min(128);
243                if value_cap == 0 {
244                    break;
245                }
246                let value = truncate_to_bytes(v.as_str(), value_cap);
247                let used = key_len + value.len();
248                if used > remaining {
249                    break;
250                }
251                size += used;
252                remaining = remaining.saturating_sub(used);
253                
254                // Convert to Arc<str> for cheap cloning
255                // Fixed: explicitly use as_str() to ensure we get Arc<str> not Arc<&str>
256                metadata_vec.push((Arc::from(*k), Arc::from(value.as_ref())));
257            }
258
259            // Convert metadata Vec to Box<[T]> for exact-size allocation
260            let metadata: Arc<[(Arc<str>, Arc<str>)]> = metadata_vec.into_boxed_slice().into();
261
262            // Add source_ip if space permits (cap to 128 bytes)
263            let source_ip_str = if remaining == 0 {
264                Cow::Borrowed("[TRUNCATED]")
265            } else {
266                let source_ip = truncate_to_bytes(source_ip, remaining.min(128));
267                size += source_ip.len();
268                source_ip
269            };
270
271            ForensicEntry {
272                timestamp: SystemTime::now()
273                    .duration_since(UNIX_EPOCH)
274                    .unwrap()
275                    .as_secs(),
276                // Fixed: Enum to string conversion before Arc
277                code: Arc::from(log.code().to_string()),
278                operation: Arc::from(operation.as_ref()),
279                details: Arc::from(details.as_ref()),
280                source_ip: Arc::from(source_ip_str.as_ref()),
281                metadata,
282                size_bytes: size,
283                retryable: log.is_retryable(),
284            }
285        })
286    }
287
288    /// Get the N most recent entries in reverse chronological order.
289    ///
290    /// Uses read lock for concurrent access - multiple threads can call
291    /// this simultaneously without blocking each other.
292    ///
293    /// # Example
294    ///
295    /// ```rust
296    /// # use palisade_errors::ring_buffer::RingBufferLogger;
297    /// # let logger = RingBufferLogger::new(100, 1024);
298    /// // Get last 10 errors
299    /// let recent = logger.get_recent(10);
300    /// for entry in recent {
301    ///     println!("[{}] {} - {}", entry.timestamp, entry.code, entry.operation);
302    /// }
303    /// ```
304    pub fn get_recent(&self, count: usize) -> Vec<ForensicEntry> {
305        let buffer = self.buffer.read().unwrap();
306        buffer
307            .iter()
308            .rev()
309            .take(count)
310            .cloned() // Cheap: just Arc refcount increments
311            .collect()
312    }
313
314    /// Get all entries in reverse chronological order.
315    pub fn get_all(&self) -> Vec<ForensicEntry> {
316        let buffer = self.buffer.read().unwrap();
317        buffer.iter().rev().cloned().collect()
318    }
319
320    /// Get entries matching a predicate (e.g., filter by source IP).
321    ///
322    /// # Example
323    ///
324    /// ```rust
325    /// # use palisade_errors::ring_buffer::RingBufferLogger;
326    /// # let logger = RingBufferLogger::new(100, 1024);
327    /// // Get all errors from specific IP
328    /// let from_attacker = logger.get_filtered(|entry| {
329    ///     entry.source_ip.as_ref() == "192.168.1.100"
330    /// });
331    /// ```
332    pub fn get_filtered<F>(&self, predicate: F) -> Vec<ForensicEntry>
333    where
334        F: Fn(&ForensicEntry) -> bool,
335    {
336        let buffer = self.buffer.read().unwrap();
337        buffer.iter().filter(|e| predicate(e)).cloned().collect()
338    }
339
340    /// Get current number of entries in buffer.
341    #[inline]
342    pub fn len(&self) -> usize {
343        let buffer = self.buffer.read().unwrap();
344        buffer.len()
345    }
346
347    /// Check if buffer is empty.
348    #[inline]
349    pub fn is_empty(&self) -> bool {
350        self.len() == 0
351    }
352
353    /// Get total payload bytes (lower-bound estimate).
354    pub fn payload_bytes(&self) -> usize {
355        let buffer = self.buffer.read().unwrap();
356        buffer.iter().map(|e| e.size_bytes).sum()
357    }
358
359    /// Get total number of evictions since creation.
360    ///
361    /// High eviction rate indicates sustained attack volume.
362    #[inline]
363    pub fn eviction_count(&self) -> u64 {
364        self.eviction_count.load(Ordering::Relaxed)
365    }
366
367    /// Clear all entries (useful after archival or testing).
368    pub fn clear(&self) {
369        let mut buffer = self.buffer.write().unwrap();
370        buffer.clear();
371    }
372
373    /// Get buffer capacity.
374    #[inline]
375    pub fn capacity(&self) -> usize {
376        self.max_entries
377    }
378
379    /// Check if buffer is at capacity.
380    pub fn is_full(&self) -> bool {
381        self.len() >= self.max_entries
382    }
383}
384
385impl Clone for RingBufferLogger {
386    fn clone(&self) -> Self {
387        Self {
388            buffer: Arc::clone(&self.buffer),
389            max_entries: self.max_entries,
390            max_entry_bytes: self.max_entry_bytes,
391            eviction_count: Arc::clone(&self.eviction_count),
392        }
393    }
394}
395
396/// Truncate string to maximum byte length, respecting UTF-8 boundaries.
397///
398/// Returns Cow to avoid allocation when no truncation is needed (common case).
399// Fixed: Added lifetime 'a to signature
400fn truncate_to_bytes<'a>(s: &'a str, max_bytes: usize) -> Cow<'a, str> {
401    if max_bytes == 0 {
402        return Cow::Borrowed("");
403    }
404    if s.len() <= max_bytes {
405        // Common case: no truncation needed, zero allocations
406        return Cow::Borrowed(s);
407    }
408
409    // Reserve space for truncation indicator
410    let indicator = "...[TRUNC]";
411    if max_bytes <= indicator.len() {
412        return Cow::Borrowed(&indicator[..max_bytes]);
413    }
414    let max_content = max_bytes - indicator.len();
415
416    // Find last valid UTF-8 boundary
417    let mut idx = max_content;
418    while idx > 0 && !s.is_char_boundary(idx) {
419        idx -= 1;
420    }
421
422    if idx == 0 {
423        return Cow::Borrowed(indicator);
424    }
425
426    // Only path that allocates
427    Cow::Owned(format!("{}{}", &s[..idx], indicator))
428}
429
430#[cfg(test)]
431mod tests {
432    use super::*;
433    use crate::{AgentError, definitions};
434
435    #[test]
436    fn ring_buffer_evicts_oldest() {
437        let logger = RingBufferLogger::new(3, 1024);
438
439        for i in 0..5 {
440            let err = AgentError::config(
441                definitions::CFG_PARSE_FAILED,
442                "op",
443                format!("error {}", i),
444            );
445            logger.log(&err, "192.168.1.1");
446        }
447
448        // Should only have last 3
449        assert_eq!(logger.len(), 3);
450        assert_eq!(logger.eviction_count(), 2);
451
452        let entries = logger.get_all();
453        assert!(entries[0].details.contains("error 4"));
454        assert!(entries[2].details.contains("error 2"));
455    }
456
457    #[test]
458    fn ring_buffer_respects_size_limit() {
459        let logger = RingBufferLogger::new(100, 128);
460
461        let huge_details = "A".repeat(10000);
462        let err =
463            AgentError::config(definitions::CFG_PARSE_FAILED, "op", huge_details);
464        logger.log(&err, "192.168.1.1");
465
466        let entry = &logger.get_recent(1)[0];
467        assert!(entry.size_bytes <= 128);
468        assert!(entry.details.contains("TRUNC"));
469    }
470
471    #[test]
472    fn ring_buffer_filtering() {
473        let logger = RingBufferLogger::new(100, 1024);
474
475        for i in 0..10 {
476            let err = AgentError::config(
477                definitions::CFG_PARSE_FAILED,
478                "op",
479                format!("error {}", i),
480            );
481            let ip = if i % 2 == 0 {
482                "192.168.1.1"
483            } else {
484                "192.168.1.2"
485            };
486            logger.log(&err, ip);
487        }
488
489        let from_ip1 =
490            logger.get_filtered(|e| e.source_ip.as_ref() == "192.168.1.1");
491        assert_eq!(from_ip1.len(), 5);
492    }
493
494    #[test]
495    fn ring_buffer_clone_shares_state() {
496        let logger1 = RingBufferLogger::new(100, 1024);
497        let logger2 = logger1.clone();
498
499        let err = AgentError::config(definitions::CFG_PARSE_FAILED, "op", "test");
500        logger1.log(&err, "192.168.1.1");
501
502        // Both should see the entry
503        assert_eq!(logger1.len(), 1);
504        assert_eq!(logger2.len(), 1);
505    }
506
507    #[test]
508    fn truncate_respects_utf8() {
509        let emoji = "🔥".repeat(100);
510        let truncated = truncate_to_bytes(&emoji, 50);
511
512        // Should not panic and should be valid UTF-8
513        assert!(std::str::from_utf8(truncated.as_bytes()).is_ok());
514        assert!(truncated.len() <= 50);
515    }
516
517    #[test]
518    fn truncate_no_allocation_when_short() {
519        let s = "short";
520        let truncated = truncate_to_bytes(s, 100);
521
522        // Should be borrowed (zero allocations)
523        assert!(matches!(truncated, Cow::Borrowed(_)));
524        assert_eq!(truncated.as_ref(), s);
525    }
526
527    #[test]
528    fn ring_buffer_concurrent_logging() {
529        use std::thread;
530
531        let logger = RingBufferLogger::new(128, 256);
532        let mut handles = Vec::new();
533
534        for i in 0..8 {
535            let logger = logger.clone();
536            handles.push(thread::spawn(move || {
537                for j in 0..100 {
538                    let err = AgentError::config(
539                        definitions::CFG_PARSE_FAILED,
540                        "op",
541                        format!("t{}-{}", i, j),
542                    );
543                    logger.log(&err, "192.168.1.1");
544                }
545            }));
546        }
547
548        for handle in handles {
549            handle.join().expect("thread panicked");
550        }
551
552        assert_eq!(logger.len(), 128);
553        assert!(logger.eviction_count() > 0);
554    }
555
556    #[test]
557    fn ring_buffer_concurrent_reads() {
558        use std::thread;
559
560        let logger = RingBufferLogger::new(100, 256);
561
562        // Populate buffer
563        for i in 0..50 {
564            let err = AgentError::config(
565                definitions::CFG_PARSE_FAILED,
566                "op",
567                format!("error {}", i),
568            );
569            logger.log(&err, "192.168.1.1");
570        }
571
572        // Multiple threads reading simultaneously
573        let mut handles = Vec::new();
574        for _ in 0..8 {
575            let logger = logger.clone();
576            handles.push(thread::spawn(move || {
577                for _ in 0..1000 {
578                    let entries = logger.get_recent(10);
579                    assert!(!entries.is_empty());
580                }
581            }));
582        }
583
584        for handle in handles {
585            handle.join().expect("thread panicked");
586        }
587    }
588
589    #[test]
590    fn arc_str_cloning_is_cheap() {
591        let logger = RingBufferLogger::new(10, 1024);
592
593        let err = AgentError::config(
594            definitions::CFG_PARSE_FAILED,
595            "operation",
596            "details",
597        );
598        logger.log(&err, "192.168.1.1");
599
600        let entry1 = logger.get_recent(1)[0].clone();
601        let entry2 = logger.get_recent(1)[0].clone();
602
603        // Verify Arc pointers point to same allocation
604        assert!(Arc::ptr_eq(&entry1.code, &entry2.code));
605        assert!(Arc::ptr_eq(&entry1.operation, &entry2.operation));
606        assert!(Arc::ptr_eq(&entry1.details, &entry2.details));
607    }
608}