1use crate::AgentError;
43use smallvec::SmallVec;
44use std::borrow::Cow;
45use std::sync::RwLockReadGuard;
46use std::sync::RwLockWriteGuard;
47use std::sync::atomic::{AtomicU64, Ordering};
48use std::sync::{Arc, RwLock};
49use std::time::{SystemTime, UNIX_EPOCH};
50
51#[derive(Clone, Debug)]
57pub struct ForensicEntry {
58 pub timestamp: u64,
60 pub code: Arc<str>,
62 pub operation: Arc<str>,
64 pub details: Arc<str>,
66 pub source_ip: Arc<str>,
68 pub metadata: Arc<[(Arc<str>, Arc<str>)]>,
70 pub size_bytes: usize,
72 pub retryable: bool,
74}
75
76struct RingBuffer {
78 entries: Box<[Option<ForensicEntry>]>,
80 tail: usize,
82 head: usize,
84 len: usize,
86}
87
88impl RingBuffer {
89 fn new(capacity: usize) -> Self {
90 Self {
91 entries: std::iter::repeat_with(|| None)
92 .take(capacity)
93 .collect::<Box<[Option<ForensicEntry>]>>(),
94 tail: 0,
95 head: 0,
96 len: 0,
97 }
98 }
99
100 fn push(&mut self, entry: ForensicEntry) -> Option<ForensicEntry> {
101 let evicted = self.entries[self.tail].replace(entry);
102 self.tail = (self.tail + 1) % self.entries.len();
103
104 if self.len < self.entries.len() {
105 self.len += 1;
106 } else {
107 self.head = (self.head + 1) % self.entries.len();
108 }
109
110 evicted
111 }
112
113 #[inline]
114 fn len(&self) -> usize {
115 self.len
116 }
117
118 #[inline]
119 fn capacity(&self) -> usize {
120 self.entries.len()
121 }
122
123 fn iter(&self) -> impl DoubleEndedIterator<Item = &ForensicEntry> {
125 let head = self.head;
126 let len = self.len;
127 let cap = self.entries.len();
128
129 (0..len).filter_map(move |i| {
130 let idx = (head + i) % cap;
131 self.entries[idx].as_ref()
132 })
133 }
134
135 fn clear(&mut self) {
136 for entry in self.entries.iter_mut() {
137 *entry = None;
138 }
139 self.head = 0;
140 self.tail = 0;
141 self.len = 0;
142 }
143}
144
145pub struct RingBufferLogger {
150 buffer: Arc<RwLock<RingBuffer>>,
151 max_entries: usize,
152 max_entry_bytes: usize,
153 eviction_count: Arc<AtomicU64>,
154}
155
156impl RingBufferLogger {
157 pub fn new(max_entries: usize, max_entry_bytes: usize) -> Self {
173 let bounded_entries = max_entries.max(1);
174 Self {
175 buffer: Arc::new(RwLock::new(RingBuffer::new(bounded_entries))),
176 max_entries: bounded_entries,
177 max_entry_bytes,
178 eviction_count: Arc::new(AtomicU64::new(0)),
179 }
180 }
181
182 #[inline]
183 fn read_buffer(&self) -> RwLockReadGuard<'_, RingBuffer> {
184 match self.buffer.read() {
185 Ok(guard) => guard,
186 Err(poisoned) => poisoned.into_inner(),
187 }
188 }
189
190 #[inline]
191 fn write_buffer(&self) -> RwLockWriteGuard<'_, RingBuffer> {
192 match self.buffer.write() {
193 Ok(guard) => guard,
194 Err(poisoned) => poisoned.into_inner(),
195 }
196 }
197
198 pub fn log(&self, err: &AgentError, source_ip: &str) {
215 let entry = self.create_entry(err, source_ip);
216
217 let mut buffer = self.write_buffer();
218
219 if let Some(_evicted) = buffer.push(entry) {
221 self.eviction_count.fetch_add(1, Ordering::Relaxed);
222 }
223 }
224
225 fn create_entry(&self, err: &AgentError, source_ip: &str) -> ForensicEntry {
232 err.with_internal_log(|log| {
233 let mut size = 0usize;
234 let mut remaining = self.max_entry_bytes;
235
236 let op_cap = remaining.min(256);
238 let operation = truncate_to_bytes(log.operation(), op_cap);
239 let op_len = operation.len();
240 size += op_len;
241 remaining = remaining.saturating_sub(op_len);
242
243 let details_cap = remaining.min(512);
245 let details = truncate_to_bytes(log.details(), details_cap);
246 let details_len = details.len();
247 size += details_len;
248 remaining = remaining.saturating_sub(details_len);
249
250 let mut metadata_vec: SmallVec<[(Arc<str>, Arc<str>); 8]> = SmallVec::new();
252 for (k, v) in log.metadata() {
253 if remaining == 0 {
254 break;
255 }
256 let key_len = k.len();
257 if key_len >= remaining {
258 break;
259 }
260 let value_cap = (remaining - key_len).min(128);
261 if value_cap == 0 {
262 break;
263 }
264 let value = truncate_to_bytes(v.as_str(), value_cap);
265 let used = key_len + value.len();
266 if used > remaining {
267 break;
268 }
269 size += used;
270 remaining = remaining.saturating_sub(used);
271
272 metadata_vec.push((Arc::from(*k), Arc::from(value.as_ref())));
273 }
274
275 let metadata: Arc<[(Arc<str>, Arc<str>)]> =
276 metadata_vec.into_vec().into_boxed_slice().into();
277
278 let source_ip_str = if remaining == 0 {
280 Cow::Borrowed("[TRUNCATED]")
281 } else {
282 let source_ip = truncate_to_bytes(source_ip, remaining.min(128));
283 size += source_ip.len();
284 source_ip
285 };
286
287 ForensicEntry {
288 timestamp: SystemTime::now()
289 .duration_since(UNIX_EPOCH)
290 .map_or(0, |d| d.as_secs()),
291 code: Arc::from(log.code().to_string()),
292 operation: Arc::from(operation.as_ref()),
293 details: Arc::from(details.as_ref()),
294 source_ip: Arc::from(source_ip_str.as_ref()),
295 metadata,
296 size_bytes: size,
297 retryable: log.is_retryable(),
298 }
299 })
300 }
301
302 pub fn get_recent(&self, count: usize) -> Vec<ForensicEntry> {
319 let buffer = self.read_buffer();
320 buffer
321 .iter()
322 .rev()
323 .take(count)
324 .cloned() .collect()
326 }
327
328 pub fn get_all(&self) -> Vec<ForensicEntry> {
330 let buffer = self.read_buffer();
331 buffer.iter().rev().cloned().collect()
332 }
333
334 pub fn get_filtered<F>(&self, predicate: F) -> Vec<ForensicEntry>
347 where
348 F: Fn(&ForensicEntry) -> bool,
349 {
350 let buffer = self.read_buffer();
351 buffer.iter().filter(|e| predicate(e)).cloned().collect()
352 }
353
354 #[inline]
356 pub fn len(&self) -> usize {
357 let buffer = self.read_buffer();
358 buffer.len()
359 }
360
361 #[inline]
363 pub fn is_empty(&self) -> bool {
364 self.len() == 0
365 }
366
367 pub fn payload_bytes(&self) -> usize {
369 let buffer = self.read_buffer();
370 buffer.iter().map(|e| e.size_bytes).sum()
371 }
372
373 #[inline]
377 pub fn eviction_count(&self) -> u64 {
378 self.eviction_count.load(Ordering::Relaxed)
379 }
380
381 pub fn clear(&self) {
383 let mut buffer = self.write_buffer();
384 buffer.clear();
385 }
386
387 #[inline]
389 pub fn capacity(&self) -> usize {
390 self.max_entries
391 }
392
393 pub fn is_full(&self) -> bool {
395 self.len() >= self.max_entries
396 }
397}
398
399impl Clone for RingBufferLogger {
400 fn clone(&self) -> Self {
401 Self {
402 buffer: Arc::clone(&self.buffer),
403 max_entries: self.max_entries,
404 max_entry_bytes: self.max_entry_bytes,
405 eviction_count: Arc::clone(&self.eviction_count),
406 }
407 }
408}
409
410fn truncate_to_bytes<'a>(s: &'a str, max_bytes: usize) -> Cow<'a, str> {
415 if max_bytes == 0 {
416 return Cow::Borrowed("");
417 }
418 if s.len() <= max_bytes {
419 return Cow::Borrowed(s);
421 }
422
423 let indicator = "...[TRUNC]";
425 if max_bytes <= indicator.len() {
426 return Cow::Borrowed(&indicator[..max_bytes]);
427 }
428 let max_content = max_bytes - indicator.len();
429
430 let mut idx = max_content;
432 while idx > 0 && !s.is_char_boundary(idx) {
433 idx -= 1;
434 }
435
436 if idx == 0 {
437 return Cow::Borrowed(indicator);
438 }
439
440 let mut out = String::with_capacity(idx + indicator.len());
442 out.push_str(&s[..idx]);
443 out.push_str(indicator);
444 Cow::Owned(out)
445}
446
447#[cfg(test)]
448mod tests {
449 use super::*;
450 use crate::{AgentError, definitions};
451
452 #[test]
453 fn ring_buffer_evicts_oldest() {
454 let logger = RingBufferLogger::new(3, 1024);
455
456 for i in 0..5 {
457 let err = AgentError::config(
458 definitions::CFG_PARSE_FAILED,
459 "op",
460 format!("error {}", i),
461 );
462 logger.log(&err, "192.168.1.1");
463 }
464
465 assert_eq!(logger.len(), 3);
467 assert_eq!(logger.eviction_count(), 2);
468
469 let entries = logger.get_all();
470 assert!(entries[0].details.contains("error 4"));
471 assert!(entries[2].details.contains("error 2"));
472 }
473
474 #[test]
475 fn ring_buffer_respects_size_limit() {
476 let logger = RingBufferLogger::new(100, 128);
477
478 let huge_details = "A".repeat(10000);
479 let err =
480 AgentError::config(definitions::CFG_PARSE_FAILED, "op", huge_details);
481 logger.log(&err, "192.168.1.1");
482
483 let entry = &logger.get_recent(1)[0];
484 assert!(entry.size_bytes <= 128);
485 assert!(entry.details.contains("TRUNC"));
486 }
487
488 #[test]
489 fn ring_buffer_filtering() {
490 let logger = RingBufferLogger::new(100, 1024);
491
492 for i in 0..10 {
493 let err = AgentError::config(
494 definitions::CFG_PARSE_FAILED,
495 "op",
496 format!("error {}", i),
497 );
498 let ip = if i % 2 == 0 {
499 "192.168.1.1"
500 } else {
501 "192.168.1.2"
502 };
503 logger.log(&err, ip);
504 }
505
506 let from_ip1 =
507 logger.get_filtered(|e| e.source_ip.as_ref() == "192.168.1.1");
508 assert_eq!(from_ip1.len(), 5);
509 }
510
511 #[test]
512 fn ring_buffer_clone_shares_state() {
513 let logger1 = RingBufferLogger::new(100, 1024);
514 let logger2 = logger1.clone();
515
516 let err = AgentError::config(definitions::CFG_PARSE_FAILED, "op", "test");
517 logger1.log(&err, "192.168.1.1");
518
519 assert_eq!(logger1.len(), 1);
521 assert_eq!(logger2.len(), 1);
522 }
523
524 #[test]
525 fn truncate_respects_utf8() {
526 let emoji = "🔥".repeat(100);
527 let truncated = truncate_to_bytes(&emoji, 50);
528
529 assert!(std::str::from_utf8(truncated.as_bytes()).is_ok());
531 assert!(truncated.len() <= 50);
532 }
533
534 #[test]
535 fn truncate_no_allocation_when_short() {
536 let s = "short";
537 let truncated = truncate_to_bytes(s, 100);
538
539 assert!(matches!(truncated, Cow::Borrowed(_)));
541 assert_eq!(truncated.as_ref(), s);
542 }
543
544 #[test]
545 fn ring_buffer_concurrent_logging() {
546 use std::thread;
547
548 let logger = RingBufferLogger::new(128, 256);
549 let mut handles = Vec::new();
550
551 for i in 0..8 {
552 let logger = logger.clone();
553 handles.push(thread::spawn(move || {
554 for j in 0..100 {
555 let err = AgentError::config(
556 definitions::CFG_PARSE_FAILED,
557 "op",
558 format!("t{}-{}", i, j),
559 );
560 logger.log(&err, "192.168.1.1");
561 }
562 }));
563 }
564
565 for handle in handles {
566 handle.join().expect("thread panicked");
567 }
568
569 assert_eq!(logger.len(), 128);
570 assert!(logger.eviction_count() > 0);
571 }
572
573 #[test]
574 fn ring_buffer_concurrent_reads() {
575 use std::thread;
576
577 let logger = RingBufferLogger::new(100, 256);
578
579 for i in 0..50 {
581 let err = AgentError::config(
582 definitions::CFG_PARSE_FAILED,
583 "op",
584 format!("error {}", i),
585 );
586 logger.log(&err, "192.168.1.1");
587 }
588
589 let mut handles = Vec::new();
591 for _ in 0..8 {
592 let logger = logger.clone();
593 handles.push(thread::spawn(move || {
594 for _ in 0..1000 {
595 let entries = logger.get_recent(10);
596 assert!(!entries.is_empty());
597 }
598 }));
599 }
600
601 for handle in handles {
602 handle.join().expect("thread panicked");
603 }
604 }
605
606 #[test]
607 fn arc_str_cloning_is_cheap() {
608 let logger = RingBufferLogger::new(10, 1024);
609
610 let err = AgentError::config(
611 definitions::CFG_PARSE_FAILED,
612 "operation",
613 "details",
614 );
615 logger.log(&err, "192.168.1.1");
616
617 let entry1 = logger.get_recent(1)[0].clone();
618 let entry2 = logger.get_recent(1)[0].clone();
619
620 assert!(Arc::ptr_eq(&entry1.code, &entry2.code));
622 assert!(Arc::ptr_eq(&entry1.operation, &entry2.operation));
623 assert!(Arc::ptr_eq(&entry1.details, &entry2.details));
624 }
625}