1use crate::AgentError;
43use std::borrow::Cow;
44use std::sync::atomic::{AtomicU64, Ordering};
45use std::sync::{Arc, RwLock};
46use std::time::{SystemTime, UNIX_EPOCH};
47
48#[derive(Clone, Debug)]
54pub struct ForensicEntry {
55 pub timestamp: u64,
57 pub code: Arc<str>,
59 pub operation: Arc<str>,
61 pub details: Arc<str>,
63 pub source_ip: Arc<str>,
65 pub metadata: Arc<[(Arc<str>, Arc<str>)]>,
67 pub size_bytes: usize,
69 pub retryable: bool,
71}
72
73struct RingBuffer {
75 entries: Box<[Option<ForensicEntry>]>,
77 tail: usize,
79 head: usize,
81 len: usize,
83}
84
85impl RingBuffer {
86 fn new(capacity: usize) -> Self {
87 Self {
88 entries: (0..capacity)
89 .map(|_| None)
90 .collect::<Vec<_>>()
91 .into_boxed_slice(),
92 tail: 0,
93 head: 0,
94 len: 0,
95 }
96 }
97
98 fn push(&mut self, entry: ForensicEntry) -> Option<ForensicEntry> {
99 let evicted = self.entries[self.tail].replace(entry);
100 self.tail = (self.tail + 1) % self.entries.len();
101
102 if self.len < self.entries.len() {
103 self.len += 1;
104 } else {
105 self.head = (self.head + 1) % self.entries.len();
106 }
107
108 evicted
109 }
110
111 #[inline]
112 fn len(&self) -> usize {
113 self.len
114 }
115
116 #[inline]
117 fn capacity(&self) -> usize {
118 self.entries.len()
119 }
120
121 fn iter(&self) -> impl DoubleEndedIterator<Item = &ForensicEntry> {
123 let head = self.head;
124 let len = self.len;
125 let cap = self.entries.len();
126
127 (0..len).filter_map(move |i| {
128 let idx = (head + i) % cap;
129 self.entries[idx].as_ref()
130 })
131 }
132
133 fn clear(&mut self) {
134 for entry in self.entries.iter_mut() {
135 *entry = None;
136 }
137 self.head = 0;
138 self.tail = 0;
139 self.len = 0;
140 }
141}
142
143pub struct RingBufferLogger {
148 buffer: Arc<RwLock<RingBuffer>>,
149 max_entries: usize,
150 max_entry_bytes: usize,
151 eviction_count: Arc<AtomicU64>,
152}
153
154impl RingBufferLogger {
155 pub fn new(max_entries: usize, max_entry_bytes: usize) -> Self {
171 Self {
172 buffer: Arc::new(RwLock::new(RingBuffer::new(max_entries))),
173 max_entries,
174 max_entry_bytes,
175 eviction_count: Arc::new(AtomicU64::new(0)),
176 }
177 }
178
179 pub fn log(&self, err: &AgentError, source_ip: &str) {
196 let entry = self.create_entry(err, source_ip);
197
198 let mut buffer = self.buffer.write().unwrap();
199
200 if let Some(_evicted) = buffer.push(entry) {
202 self.eviction_count.fetch_add(1, Ordering::Relaxed);
203 }
204 }
205
206 fn create_entry(&self, err: &AgentError, source_ip: &str) -> ForensicEntry {
213 err.with_internal_log(|log| {
214 let mut size = 0usize;
215 let mut remaining = self.max_entry_bytes;
216
217 let op_cap = remaining.min(256);
219 let operation = truncate_to_bytes(log.operation(), op_cap);
220 let op_len = operation.len();
221 size += op_len;
222 remaining = remaining.saturating_sub(op_len);
223
224 let details_cap = remaining.min(512);
226 let details = truncate_to_bytes(log.details(), details_cap);
227 let details_len = details.len();
228 size += details_len;
229 remaining = remaining.saturating_sub(details_len);
230
231 let mut metadata_vec: Vec<(Arc<str>, Arc<str>)> = Vec::new();
234 for (k, v) in log.metadata() {
235 if remaining == 0 {
236 break;
237 }
238 let key_len = k.len();
239 if key_len >= remaining {
240 break;
241 }
242 let value_cap = (remaining - key_len).min(128);
243 if value_cap == 0 {
244 break;
245 }
246 let value = truncate_to_bytes(v.as_str(), value_cap);
247 let used = key_len + value.len();
248 if used > remaining {
249 break;
250 }
251 size += used;
252 remaining = remaining.saturating_sub(used);
253
254 metadata_vec.push((Arc::from(*k), Arc::from(value.as_ref())));
257 }
258
259 let metadata: Arc<[(Arc<str>, Arc<str>)]> = metadata_vec.into_boxed_slice().into();
261
262 let source_ip_str = if remaining == 0 {
264 Cow::Borrowed("[TRUNCATED]")
265 } else {
266 let source_ip = truncate_to_bytes(source_ip, remaining.min(128));
267 size += source_ip.len();
268 source_ip
269 };
270
271 ForensicEntry {
272 timestamp: SystemTime::now()
273 .duration_since(UNIX_EPOCH)
274 .unwrap()
275 .as_secs(),
276 code: Arc::from(log.code().to_string()),
278 operation: Arc::from(operation.as_ref()),
279 details: Arc::from(details.as_ref()),
280 source_ip: Arc::from(source_ip_str.as_ref()),
281 metadata,
282 size_bytes: size,
283 retryable: log.is_retryable(),
284 }
285 })
286 }
287
288 pub fn get_recent(&self, count: usize) -> Vec<ForensicEntry> {
305 let buffer = self.buffer.read().unwrap();
306 buffer
307 .iter()
308 .rev()
309 .take(count)
310 .cloned() .collect()
312 }
313
314 pub fn get_all(&self) -> Vec<ForensicEntry> {
316 let buffer = self.buffer.read().unwrap();
317 buffer.iter().rev().cloned().collect()
318 }
319
320 pub fn get_filtered<F>(&self, predicate: F) -> Vec<ForensicEntry>
333 where
334 F: Fn(&ForensicEntry) -> bool,
335 {
336 let buffer = self.buffer.read().unwrap();
337 buffer.iter().filter(|e| predicate(e)).cloned().collect()
338 }
339
340 #[inline]
342 pub fn len(&self) -> usize {
343 let buffer = self.buffer.read().unwrap();
344 buffer.len()
345 }
346
347 #[inline]
349 pub fn is_empty(&self) -> bool {
350 self.len() == 0
351 }
352
353 pub fn payload_bytes(&self) -> usize {
355 let buffer = self.buffer.read().unwrap();
356 buffer.iter().map(|e| e.size_bytes).sum()
357 }
358
359 #[inline]
363 pub fn eviction_count(&self) -> u64 {
364 self.eviction_count.load(Ordering::Relaxed)
365 }
366
367 pub fn clear(&self) {
369 let mut buffer = self.buffer.write().unwrap();
370 buffer.clear();
371 }
372
373 #[inline]
375 pub fn capacity(&self) -> usize {
376 self.max_entries
377 }
378
379 pub fn is_full(&self) -> bool {
381 self.len() >= self.max_entries
382 }
383}
384
385impl Clone for RingBufferLogger {
386 fn clone(&self) -> Self {
387 Self {
388 buffer: Arc::clone(&self.buffer),
389 max_entries: self.max_entries,
390 max_entry_bytes: self.max_entry_bytes,
391 eviction_count: Arc::clone(&self.eviction_count),
392 }
393 }
394}
395
396fn truncate_to_bytes<'a>(s: &'a str, max_bytes: usize) -> Cow<'a, str> {
401 if max_bytes == 0 {
402 return Cow::Borrowed("");
403 }
404 if s.len() <= max_bytes {
405 return Cow::Borrowed(s);
407 }
408
409 let indicator = "...[TRUNC]";
411 if max_bytes <= indicator.len() {
412 return Cow::Borrowed(&indicator[..max_bytes]);
413 }
414 let max_content = max_bytes - indicator.len();
415
416 let mut idx = max_content;
418 while idx > 0 && !s.is_char_boundary(idx) {
419 idx -= 1;
420 }
421
422 if idx == 0 {
423 return Cow::Borrowed(indicator);
424 }
425
426 Cow::Owned(format!("{}{}", &s[..idx], indicator))
428}
429
430#[cfg(test)]
431mod tests {
432 use super::*;
433 use crate::{AgentError, definitions};
434
435 #[test]
436 fn ring_buffer_evicts_oldest() {
437 let logger = RingBufferLogger::new(3, 1024);
438
439 for i in 0..5 {
440 let err = AgentError::config(
441 definitions::CFG_PARSE_FAILED,
442 "op",
443 format!("error {}", i),
444 );
445 logger.log(&err, "192.168.1.1");
446 }
447
448 assert_eq!(logger.len(), 3);
450 assert_eq!(logger.eviction_count(), 2);
451
452 let entries = logger.get_all();
453 assert!(entries[0].details.contains("error 4"));
454 assert!(entries[2].details.contains("error 2"));
455 }
456
457 #[test]
458 fn ring_buffer_respects_size_limit() {
459 let logger = RingBufferLogger::new(100, 128);
460
461 let huge_details = "A".repeat(10000);
462 let err =
463 AgentError::config(definitions::CFG_PARSE_FAILED, "op", huge_details);
464 logger.log(&err, "192.168.1.1");
465
466 let entry = &logger.get_recent(1)[0];
467 assert!(entry.size_bytes <= 128);
468 assert!(entry.details.contains("TRUNC"));
469 }
470
471 #[test]
472 fn ring_buffer_filtering() {
473 let logger = RingBufferLogger::new(100, 1024);
474
475 for i in 0..10 {
476 let err = AgentError::config(
477 definitions::CFG_PARSE_FAILED,
478 "op",
479 format!("error {}", i),
480 );
481 let ip = if i % 2 == 0 {
482 "192.168.1.1"
483 } else {
484 "192.168.1.2"
485 };
486 logger.log(&err, ip);
487 }
488
489 let from_ip1 =
490 logger.get_filtered(|e| e.source_ip.as_ref() == "192.168.1.1");
491 assert_eq!(from_ip1.len(), 5);
492 }
493
494 #[test]
495 fn ring_buffer_clone_shares_state() {
496 let logger1 = RingBufferLogger::new(100, 1024);
497 let logger2 = logger1.clone();
498
499 let err = AgentError::config(definitions::CFG_PARSE_FAILED, "op", "test");
500 logger1.log(&err, "192.168.1.1");
501
502 assert_eq!(logger1.len(), 1);
504 assert_eq!(logger2.len(), 1);
505 }
506
507 #[test]
508 fn truncate_respects_utf8() {
509 let emoji = "🔥".repeat(100);
510 let truncated = truncate_to_bytes(&emoji, 50);
511
512 assert!(std::str::from_utf8(truncated.as_bytes()).is_ok());
514 assert!(truncated.len() <= 50);
515 }
516
517 #[test]
518 fn truncate_no_allocation_when_short() {
519 let s = "short";
520 let truncated = truncate_to_bytes(s, 100);
521
522 assert!(matches!(truncated, Cow::Borrowed(_)));
524 assert_eq!(truncated.as_ref(), s);
525 }
526
527 #[test]
528 fn ring_buffer_concurrent_logging() {
529 use std::thread;
530
531 let logger = RingBufferLogger::new(128, 256);
532 let mut handles = Vec::new();
533
534 for i in 0..8 {
535 let logger = logger.clone();
536 handles.push(thread::spawn(move || {
537 for j in 0..100 {
538 let err = AgentError::config(
539 definitions::CFG_PARSE_FAILED,
540 "op",
541 format!("t{}-{}", i, j),
542 );
543 logger.log(&err, "192.168.1.1");
544 }
545 }));
546 }
547
548 for handle in handles {
549 handle.join().expect("thread panicked");
550 }
551
552 assert_eq!(logger.len(), 128);
553 assert!(logger.eviction_count() > 0);
554 }
555
556 #[test]
557 fn ring_buffer_concurrent_reads() {
558 use std::thread;
559
560 let logger = RingBufferLogger::new(100, 256);
561
562 for i in 0..50 {
564 let err = AgentError::config(
565 definitions::CFG_PARSE_FAILED,
566 "op",
567 format!("error {}", i),
568 );
569 logger.log(&err, "192.168.1.1");
570 }
571
572 let mut handles = Vec::new();
574 for _ in 0..8 {
575 let logger = logger.clone();
576 handles.push(thread::spawn(move || {
577 for _ in 0..1000 {
578 let entries = logger.get_recent(10);
579 assert!(!entries.is_empty());
580 }
581 }));
582 }
583
584 for handle in handles {
585 handle.join().expect("thread panicked");
586 }
587 }
588
589 #[test]
590 fn arc_str_cloning_is_cheap() {
591 let logger = RingBufferLogger::new(10, 1024);
592
593 let err = AgentError::config(
594 definitions::CFG_PARSE_FAILED,
595 "operation",
596 "details",
597 );
598 logger.log(&err, "192.168.1.1");
599
600 let entry1 = logger.get_recent(1)[0].clone();
601 let entry2 = logger.get_recent(1)[0].clone();
602
603 assert!(Arc::ptr_eq(&entry1.code, &entry2.code));
605 assert!(Arc::ptr_eq(&entry1.operation, &entry2.operation));
606 assert!(Arc::ptr_eq(&entry1.details, &entry2.details));
607 }
608}