1use parking_lot::RwLock;
18use serde::{Deserialize, Serialize};
19use sha2::{Digest, Sha256};
20use std::collections::VecDeque;
21use std::fs::{File, OpenOptions};
22use std::io::{BufRead, BufReader, BufWriter, Write};
23use std::path::PathBuf;
24use std::sync::atomic::{AtomicU64, Ordering};
25use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
26
27pub const DEFAULT_MAX_LOG_SIZE: u64 = 100 * 1024 * 1024;
33
34pub const MAX_ROTATED_FILES: usize = 10;
36
37pub const DEFAULT_MAX_ENTRIES: usize = 1000;
39
40pub const DEFAULT_RETENTION_SECS: u64 = 24 * 60 * 60;
42
43#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
49#[serde(rename_all = "lowercase")]
50pub enum ActivityType {
51 Query,
52 Write,
53 Delete,
54 Config,
55 Node,
56 Auth,
57 System,
58}
59
60impl std::fmt::Display for ActivityType {
61 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62 match self {
63 ActivityType::Query => write!(f, "query"),
64 ActivityType::Write => write!(f, "write"),
65 ActivityType::Delete => write!(f, "delete"),
66 ActivityType::Config => write!(f, "config"),
67 ActivityType::Node => write!(f, "node"),
68 ActivityType::Auth => write!(f, "auth"),
69 ActivityType::System => write!(f, "system"),
70 }
71 }
72}
73
74#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct Activity {
77 pub id: String,
78 #[serde(rename = "type")]
79 pub activity_type: ActivityType,
80 pub description: String,
81 pub timestamp: String,
82 pub duration: Option<u64>,
83 pub user: Option<String>,
84 pub source: Option<String>,
85 pub details: Option<serde_json::Value>,
86}
87
88#[derive(Debug, Clone, Serialize, Deserialize)]
90pub struct PersistedActivity {
91 pub activity: Activity,
93 pub prev_hash: String,
95 pub hash: String,
97}
98
99#[derive(Debug, Clone)]
101struct ActivityRecord {
102 activity: Activity,
103 created_at: Instant,
104}
105
106struct PersistenceState {
108 log_dir: PathBuf,
110 writer: Option<BufWriter<File>>,
112 current_size: u64,
114 max_size: u64,
116 last_hash: String,
118 current_file_num: u64,
120}
121
122pub struct ActivityLogger {
128 activities: RwLock<VecDeque<ActivityRecord>>,
130 next_id: AtomicU64,
132 max_entries: usize,
134 retention_duration: Duration,
136 persistence: RwLock<Option<PersistenceState>>,
138}
139
140impl ActivityLogger {
141 pub fn new() -> Self {
143 Self {
144 activities: RwLock::new(VecDeque::with_capacity(DEFAULT_MAX_ENTRIES)),
145 next_id: AtomicU64::new(1),
146 max_entries: DEFAULT_MAX_ENTRIES,
147 retention_duration: Duration::from_secs(DEFAULT_RETENTION_SECS),
148 persistence: RwLock::new(None),
149 }
150 }
151
152 pub fn with_persistence(log_dir: PathBuf) -> std::io::Result<Self> {
154 Self::with_persistence_and_options(log_dir, DEFAULT_MAX_LOG_SIZE, DEFAULT_MAX_ENTRIES)
155 }
156
157 pub fn with_persistence_and_options(
159 log_dir: PathBuf,
160 max_log_size: u64,
161 max_memory_entries: usize,
162 ) -> std::io::Result<Self> {
163 std::fs::create_dir_all(&log_dir)?;
165
166 let (current_file_num, last_hash, loaded_activities) = Self::load_from_directory(&log_dir)?;
168
169 let next_id = loaded_activities
171 .iter()
172 .filter_map(|a| {
173 a.id.strip_prefix("act-")
174 .and_then(|s| s.parse::<u64>().ok())
175 })
176 .max()
177 .unwrap_or(0)
178 + 1;
179
180 let activities: VecDeque<ActivityRecord> = loaded_activities
182 .into_iter()
183 .map(|activity| ActivityRecord {
184 activity,
185 created_at: Instant::now(), })
187 .collect();
188
189 let log_file_path = log_dir.join(format!("audit_{:08}.jsonl", current_file_num));
191 let file = OpenOptions::new()
192 .create(true)
193 .append(true)
194 .open(&log_file_path)?;
195
196 let current_size = file.metadata()?.len();
197
198 let persistence_state = PersistenceState {
199 log_dir,
200 writer: Some(BufWriter::new(file)),
201 current_size,
202 max_size: max_log_size,
203 last_hash,
204 current_file_num,
205 };
206
207 Ok(Self {
208 activities: RwLock::new(activities),
209 next_id: AtomicU64::new(next_id),
210 max_entries: max_memory_entries,
211 retention_duration: Duration::from_secs(DEFAULT_RETENTION_SECS),
212 persistence: RwLock::new(Some(persistence_state)),
213 })
214 }
215
216 fn load_from_directory(log_dir: &PathBuf) -> std::io::Result<(u64, String, Vec<Activity>)> {
219 let mut log_files: Vec<(u64, PathBuf)> = Vec::new();
220
221 if let Ok(entries) = std::fs::read_dir(log_dir) {
223 for entry in entries.flatten() {
224 let path = entry.path();
225 if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
226 if let Some(num_str) = name
227 .strip_prefix("audit_")
228 .and_then(|s| s.strip_suffix(".jsonl"))
229 {
230 if let Ok(num) = num_str.parse::<u64>() {
231 log_files.push((num, path));
232 }
233 }
234 }
235 }
236 }
237
238 log_files.sort_by_key(|(num, _)| *num);
240
241 let mut activities = Vec::new();
242 let mut last_hash = "genesis".to_string();
243 let mut current_file_num = 0u64;
244 let mut integrity_verified = true;
245
246 for (file_num, path) in &log_files {
248 current_file_num = *file_num;
249
250 let file = match File::open(path) {
251 Ok(f) => f,
252 Err(e) => {
253 tracing::warn!("Failed to open audit log {:?}: {}", path, e);
254 continue;
255 }
256 };
257
258 let reader = BufReader::new(file);
259 for line in reader.lines() {
260 let line = match line {
261 Ok(l) => l,
262 Err(e) => {
263 tracing::warn!("Failed to read line from {:?}: {}", path, e);
264 continue;
265 }
266 };
267
268 if line.trim().is_empty() {
269 continue;
270 }
271
272 match serde_json::from_str::<PersistedActivity>(&line) {
273 Ok(persisted) => {
274 if persisted.prev_hash != last_hash {
276 tracing::error!(
277 "Hash chain integrity violation detected in {:?}: expected prev_hash '{}', got '{}'",
278 path,
279 last_hash,
280 persisted.prev_hash
281 );
282 integrity_verified = false;
283 }
284
285 let computed_hash =
287 Self::compute_hash(&persisted.activity, &persisted.prev_hash);
288 if computed_hash != persisted.hash {
289 tracing::error!(
290 "Record hash mismatch in {:?} for activity '{}': computed '{}', stored '{}'",
291 path,
292 persisted.activity.id,
293 computed_hash,
294 persisted.hash
295 );
296 integrity_verified = false;
297 }
298
299 last_hash = persisted.hash;
300 activities.push(persisted.activity);
301 }
302 Err(e) => {
303 tracing::warn!("Failed to parse audit record from {:?}: {}", path, e);
304 }
305 }
306 }
307 }
308
309 if !integrity_verified {
310 tracing::warn!(
311 "Audit log integrity verification FAILED. Some records may have been tampered with."
312 );
313 } else if !activities.is_empty() {
314 tracing::info!(
315 "Loaded {} audit records from {} files with verified integrity",
316 activities.len(),
317 log_files.len()
318 );
319 }
320
321 if log_files.is_empty() {
323 current_file_num = 0;
324 }
325
326 Ok((current_file_num, last_hash, activities))
327 }
328
329 fn compute_hash(activity: &Activity, prev_hash: &str) -> String {
331 let mut hasher = Sha256::new();
332
333 if let Ok(json) = serde_json::to_string(activity) {
335 hasher.update(json.as_bytes());
336 }
337
338 hasher.update(prev_hash.as_bytes());
340
341 let result = hasher.finalize();
343 hex_encode(&result)
344 }
345
346 fn persist_activity(&self, activity: &Activity) {
348 let mut persistence = self.persistence.write();
349 let state = match persistence.as_mut() {
350 Some(s) => s,
351 None => return, };
353
354 let prev_hash = state.last_hash.clone();
356 let hash = Self::compute_hash(activity, &prev_hash);
357
358 let persisted = PersistedActivity {
359 activity: activity.clone(),
360 prev_hash,
361 hash: hash.clone(),
362 };
363
364 let json_line = match serde_json::to_string(&persisted) {
366 Ok(j) => j,
367 Err(e) => {
368 tracing::error!("Failed to serialize activity: {}", e);
369 return;
370 }
371 };
372
373 let line_size = json_line.len() as u64 + 1; if state.current_size + line_size > state.max_size {
377 if let Err(e) = self.rotate_log_file(state) {
378 tracing::error!("Failed to rotate audit log: {}", e);
379 return;
380 }
381 }
382
383 if let Some(ref mut writer) = state.writer {
385 if let Err(e) = writeln!(writer, "{}", json_line) {
386 tracing::error!("Failed to write audit record: {}", e);
387 return;
388 }
389
390 if let Err(e) = writer.flush() {
392 tracing::error!("Failed to flush audit log: {}", e);
393 return;
394 }
395
396 state.current_size += line_size;
397 state.last_hash = hash;
398 }
399 }
400
401 fn rotate_log_file(&self, state: &mut PersistenceState) -> std::io::Result<()> {
403 if let Some(ref mut writer) = state.writer {
405 writer.flush()?;
406 }
407 state.writer = None;
408
409 state.current_file_num += 1;
411
412 self.cleanup_old_files(state)?;
414
415 let new_path = state
417 .log_dir
418 .join(format!("audit_{:08}.jsonl", state.current_file_num));
419 let file = OpenOptions::new()
420 .create(true)
421 .append(true)
422 .open(&new_path)?;
423
424 tracing::info!("Rotated audit log to file {}", state.current_file_num);
425
426 state.writer = Some(BufWriter::new(file));
427 state.current_size = 0;
428
429 Ok(())
430 }
431
432 fn cleanup_old_files(&self, state: &PersistenceState) -> std::io::Result<()> {
434 let mut log_files: Vec<(u64, PathBuf)> = Vec::new();
435
436 if let Ok(entries) = std::fs::read_dir(&state.log_dir) {
437 for entry in entries.flatten() {
438 let path = entry.path();
439 if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
440 if let Some(num_str) = name
441 .strip_prefix("audit_")
442 .and_then(|s| s.strip_suffix(".jsonl"))
443 {
444 if let Ok(num) = num_str.parse::<u64>() {
445 log_files.push((num, path));
446 }
447 }
448 }
449 }
450 }
451
452 log_files.sort_by_key(|(num, _)| *num);
454
455 while log_files.len() > MAX_ROTATED_FILES {
457 if let Some((num, path)) = log_files.first() {
458 if *num < state.current_file_num {
459 if let Err(e) = std::fs::remove_file(path) {
460 tracing::warn!("Failed to remove old audit log {:?}: {}", path, e);
461 } else {
462 tracing::debug!("Removed old audit log file {:?}", path);
463 }
464 }
465 }
466 log_files.remove(0);
467 }
468
469 Ok(())
470 }
471
472 pub fn verify_integrity(&self) -> Result<usize, String> {
480 let persistence = self.persistence.read();
481 let state = match persistence.as_ref() {
482 Some(s) => s,
483 None => return Err("Persistence not enabled".to_string()),
484 };
485
486 let mut log_files: Vec<(u64, PathBuf)> = Vec::new();
487
488 if let Ok(entries) = std::fs::read_dir(&state.log_dir) {
489 for entry in entries.flatten() {
490 let path = entry.path();
491 if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
492 if let Some(num_str) = name
493 .strip_prefix("audit_")
494 .and_then(|s| s.strip_suffix(".jsonl"))
495 {
496 if let Ok(num) = num_str.parse::<u64>() {
497 log_files.push((num, path));
498 }
499 }
500 }
501 }
502 }
503
504 log_files.sort_by_key(|(num, _)| *num);
505
506 let mut last_hash: Option<String> = None;
507 let mut record_count = 0usize;
508 let mut is_first_record = true;
509
510 for (_file_num, path) in &log_files {
511 let file = File::open(path).map_err(|e| format!("Failed to open {:?}: {}", path, e))?;
512 let reader = BufReader::new(file);
513
514 for (line_num, line) in reader.lines().enumerate() {
515 let line = line.map_err(|e| {
516 format!("Failed to read line {} in {:?}: {}", line_num, path, e)
517 })?;
518
519 if line.trim().is_empty() {
520 continue;
521 }
522
523 let persisted: PersistedActivity = serde_json::from_str(&line).map_err(|e| {
524 format!(
525 "Failed to parse record at line {} in {:?}: {}",
526 line_num, path, e
527 )
528 })?;
529
530 if is_first_record {
533 is_first_record = false;
534 } else if let Some(ref expected) = last_hash {
535 if &persisted.prev_hash != expected {
537 return Err(format!(
538 "Hash chain broken at line {} in {:?}: expected '{}', got '{}'",
539 line_num, path, expected, persisted.prev_hash
540 ));
541 }
542 }
543
544 let computed_hash = Self::compute_hash(&persisted.activity, &persisted.prev_hash);
546 if computed_hash != persisted.hash {
547 return Err(format!(
548 "Record hash mismatch at line {} in {:?}: computed '{}', stored '{}'",
549 line_num, path, computed_hash, persisted.hash
550 ));
551 }
552
553 last_hash = Some(persisted.hash);
554 record_count += 1;
555 }
556 }
557
558 Ok(record_count)
559 }
560
561 pub fn verify_integrity_strict(&self) -> Result<usize, String> {
564 let persistence = self.persistence.read();
565 let state = match persistence.as_ref() {
566 Some(s) => s,
567 None => return Err("Persistence not enabled".to_string()),
568 };
569
570 let mut log_files: Vec<(u64, PathBuf)> = Vec::new();
571
572 if let Ok(entries) = std::fs::read_dir(&state.log_dir) {
573 for entry in entries.flatten() {
574 let path = entry.path();
575 if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
576 if let Some(num_str) = name
577 .strip_prefix("audit_")
578 .and_then(|s| s.strip_suffix(".jsonl"))
579 {
580 if let Ok(num) = num_str.parse::<u64>() {
581 log_files.push((num, path));
582 }
583 }
584 }
585 }
586 }
587
588 log_files.sort_by_key(|(num, _)| *num);
589
590 if !log_files.is_empty() && log_files[0].0 != 0 {
592 return Err(format!(
593 "Strict verification failed: oldest log file is {:08}, expected 00000000. \
594 Some log files have been cleaned up.",
595 log_files[0].0
596 ));
597 }
598
599 let mut last_hash = "genesis".to_string();
600 let mut record_count = 0usize;
601
602 for (_file_num, path) in &log_files {
603 let file = File::open(path).map_err(|e| format!("Failed to open {:?}: {}", path, e))?;
604 let reader = BufReader::new(file);
605
606 for (line_num, line) in reader.lines().enumerate() {
607 let line = line.map_err(|e| {
608 format!("Failed to read line {} in {:?}: {}", line_num, path, e)
609 })?;
610
611 if line.trim().is_empty() {
612 continue;
613 }
614
615 let persisted: PersistedActivity = serde_json::from_str(&line).map_err(|e| {
616 format!(
617 "Failed to parse record at line {} in {:?}: {}",
618 line_num, path, e
619 )
620 })?;
621
622 if persisted.prev_hash != last_hash {
624 return Err(format!(
625 "Hash chain broken at line {} in {:?}: expected '{}', got '{}'",
626 line_num, path, last_hash, persisted.prev_hash
627 ));
628 }
629
630 let computed_hash = Self::compute_hash(&persisted.activity, &persisted.prev_hash);
632 if computed_hash != persisted.hash {
633 return Err(format!(
634 "Record hash mismatch at line {} in {:?}: computed '{}', stored '{}'",
635 line_num, path, computed_hash, persisted.hash
636 ));
637 }
638
639 last_hash = persisted.hash;
640 record_count += 1;
641 }
642 }
643
644 Ok(record_count)
645 }
646
647 pub fn flush(&self) -> std::io::Result<()> {
649 let mut persistence = self.persistence.write();
650 if let Some(ref mut state) = *persistence {
651 if let Some(ref mut writer) = state.writer {
652 writer.flush()?;
653 }
654 }
655 Ok(())
656 }
657
658 pub fn log(&self, activity_type: ActivityType, description: &str) -> String {
660 self.log_with_details(activity_type, description, None, None, None, None)
661 }
662
663 pub fn log_query(&self, sql: &str, duration_ms: u64, user: Option<&str>) {
665 self.log_with_details(
666 ActivityType::Query,
667 sql,
668 Some(duration_ms),
669 user,
670 None,
671 None,
672 );
673 }
674
675 pub fn log_write(&self, description: &str, user: Option<&str>) {
677 self.log_with_details(ActivityType::Write, description, None, user, None, None);
678 }
679
680 pub fn log_config(&self, description: &str, user: Option<&str>) {
682 self.log_with_details(ActivityType::Config, description, None, user, None, None);
683 }
684
685 pub fn log_node(&self, description: &str) {
687 self.log_with_details(
688 ActivityType::Node,
689 description,
690 None,
691 None,
692 Some("cluster"),
693 None,
694 );
695 }
696
697 pub fn log_auth(&self, description: &str, user: Option<&str>) {
699 self.log_with_details(ActivityType::Auth, description, None, user, None, None);
700 }
701
702 pub fn log_system(&self, description: &str) {
704 self.log_with_details(
705 ActivityType::System,
706 description,
707 None,
708 None,
709 Some("system"),
710 None,
711 );
712 }
713
714 pub fn log_with_details(
716 &self,
717 activity_type: ActivityType,
718 description: &str,
719 duration: Option<u64>,
720 user: Option<&str>,
721 source: Option<&str>,
722 details: Option<serde_json::Value>,
723 ) -> String {
724 let id = format!("act-{:08}", self.next_id.fetch_add(1, Ordering::SeqCst));
725 let activity = Activity {
726 id: id.clone(),
727 activity_type,
728 description: description.to_string(),
729 timestamp: format_timestamp(now_timestamp()),
730 duration,
731 user: user.map(|s| s.to_string()),
732 source: source.map(|s| s.to_string()),
733 details,
734 };
735
736 self.persist_activity(&activity);
738
739 let record = ActivityRecord {
740 activity,
741 created_at: Instant::now(),
742 };
743
744 let mut activities = self.activities.write();
745
746 while activities.len() >= self.max_entries {
748 activities.pop_front();
749 }
750
751 activities.push_back(record);
752 id
753 }
754
755 pub fn get_recent(&self, limit: usize) -> Vec<Activity> {
757 self.cleanup_expired();
758
759 let activities = self.activities.read();
760 activities
761 .iter()
762 .rev()
763 .take(limit)
764 .map(|r| r.activity.clone())
765 .collect()
766 }
767
768 pub fn get_by_type(&self, activity_type: ActivityType, limit: usize) -> Vec<Activity> {
770 self.cleanup_expired();
771
772 let activities = self.activities.read();
773 activities
774 .iter()
775 .rev()
776 .filter(|r| r.activity.activity_type == activity_type)
777 .take(limit)
778 .map(|r| r.activity.clone())
779 .collect()
780 }
781
782 pub fn get_by_user(&self, username: &str, limit: usize) -> Vec<Activity> {
784 self.cleanup_expired();
785
786 let activities = self.activities.read();
787 activities
788 .iter()
789 .rev()
790 .filter(|r| r.activity.user.as_deref() == Some(username))
791 .take(limit)
792 .map(|r| r.activity.clone())
793 .collect()
794 }
795
796 pub fn count(&self) -> usize {
798 self.activities.read().len()
799 }
800
801 fn cleanup_expired(&self) {
803 let now = Instant::now();
804 let mut activities = self.activities.write();
805
806 while let Some(front) = activities.front() {
807 if now.duration_since(front.created_at) > self.retention_duration {
808 activities.pop_front();
809 } else {
810 break;
811 }
812 }
813 }
814
815 pub fn clear(&self) {
817 self.activities.write().clear();
818 }
819}
820
821impl Default for ActivityLogger {
822 fn default() -> Self {
823 Self::new()
824 }
825}
826
827fn now_timestamp() -> u64 {
833 SystemTime::now()
834 .duration_since(UNIX_EPOCH)
835 .unwrap_or_default()
836 .as_millis() as u64
837}
838
839fn format_timestamp(timestamp_ms: u64) -> String {
841 let secs = timestamp_ms / 1000;
842 let datetime = UNIX_EPOCH + Duration::from_secs(secs);
843 let duration = datetime.duration_since(UNIX_EPOCH).unwrap_or_default();
844 let total_secs = duration.as_secs();
845
846 let days_since_epoch = total_secs / 86400;
847 let secs_today = total_secs % 86400;
848
849 let hours = secs_today / 3600;
850 let minutes = (secs_today % 3600) / 60;
851 let seconds = secs_today % 60;
852
853 let mut year = 1970u64;
854 let mut remaining_days = days_since_epoch;
855
856 loop {
857 let days_in_year = if is_leap_year(year) { 366 } else { 365 };
858 if remaining_days < days_in_year {
859 break;
860 }
861 remaining_days -= days_in_year;
862 year += 1;
863 }
864
865 let days_in_months: [u64; 12] = if is_leap_year(year) {
866 [31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
867 } else {
868 [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
869 };
870
871 let mut month = 1u64;
872 for &days in &days_in_months {
873 if remaining_days < days {
874 break;
875 }
876 remaining_days -= days;
877 month += 1;
878 }
879 let day = remaining_days + 1;
880
881 format!(
882 "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
883 year, month, day, hours, minutes, seconds
884 )
885}
886
887fn is_leap_year(year: u64) -> bool {
888 (year % 4 == 0 && year % 100 != 0) || (year % 400 == 0)
889}
890
891fn hex_encode(bytes: &[u8]) -> String {
893 const HEX_CHARS: &[u8] = b"0123456789abcdef";
894 let mut result = String::with_capacity(bytes.len() * 2);
895 for &byte in bytes {
896 result.push(HEX_CHARS[(byte >> 4) as usize] as char);
897 result.push(HEX_CHARS[(byte & 0x0f) as usize] as char);
898 }
899 result
900}
901
902#[cfg(test)]
907mod tests {
908 use super::*;
909 use tempfile::TempDir;
910
911 #[test]
912 fn test_log_activity() {
913 let logger = ActivityLogger::new();
914 let id = logger.log(ActivityType::Query, "SELECT * FROM users");
915 assert!(!id.is_empty());
916 assert_eq!(logger.count(), 1);
917 }
918
919 #[test]
920 fn test_get_recent() {
921 let logger = ActivityLogger::new();
922 logger.log(ActivityType::Query, "Query 1");
923 logger.log(ActivityType::Write, "Write 1");
924 logger.log(ActivityType::Query, "Query 2");
925
926 let recent = logger.get_recent(2);
927 assert_eq!(recent.len(), 2);
928 assert_eq!(recent[0].description, "Query 2");
929 assert_eq!(recent[1].description, "Write 1");
930 }
931
932 #[test]
933 fn test_get_by_type() {
934 let logger = ActivityLogger::new();
935 logger.log(ActivityType::Query, "Query 1");
936 logger.log(ActivityType::Write, "Write 1");
937 logger.log(ActivityType::Query, "Query 2");
938
939 let queries = logger.get_by_type(ActivityType::Query, 10);
940 assert_eq!(queries.len(), 2);
941 }
942
943 #[test]
944 fn test_log_query_with_duration() {
945 let logger = ActivityLogger::new();
946 logger.log_query("SELECT * FROM metrics", 42, Some("admin"));
947
948 let recent = logger.get_recent(1);
949 assert_eq!(recent.len(), 1);
950 assert_eq!(recent[0].duration, Some(42));
951 assert_eq!(recent[0].user, Some("admin".to_string()));
952 }
953
954 #[test]
955 fn test_max_entries() {
956 let logger = ActivityLogger::new();
957
958 for i in 0..1100 {
960 logger.log(ActivityType::Query, &format!("Query {}", i));
961 }
962
963 assert!(logger.count() <= 1000);
965 }
966
967 #[test]
968 fn test_persistence_basic() {
969 let temp_dir = TempDir::new().expect("failed to create temp dir");
970 let log_dir = temp_dir.path().to_path_buf();
971
972 {
974 let logger =
975 ActivityLogger::with_persistence(log_dir.clone()).expect("failed to create logger");
976 logger.log(ActivityType::Query, "SELECT * FROM users");
977 logger.log(ActivityType::Write, "INSERT INTO users");
978 logger.log(ActivityType::Auth, "User login");
979 logger.flush().expect("failed to flush");
980 }
981
982 let log_file = log_dir.join("audit_00000000.jsonl");
984 assert!(log_file.exists(), "audit log file should exist");
985
986 let logger2 =
988 ActivityLogger::with_persistence(log_dir).expect("failed to create second logger");
989
990 let recent = logger2.get_recent(10);
991 assert_eq!(recent.len(), 3, "should load 3 activities from disk");
992
993 let count = logger2
995 .verify_integrity()
996 .expect("integrity check should pass");
997 assert_eq!(count, 3, "should have 3 verified records");
998 }
999
1000 #[test]
1001 fn test_persistence_hash_chain() {
1002 let temp_dir = TempDir::new().expect("failed to create temp dir");
1003 let log_dir = temp_dir.path().to_path_buf();
1004
1005 let logger =
1006 ActivityLogger::with_persistence(log_dir.clone()).expect("failed to create logger");
1007
1008 for i in 0..5 {
1010 logger.log(ActivityType::Query, &format!("Query {}", i));
1011 }
1012 logger.flush().expect("failed to flush");
1013
1014 let log_file = log_dir.join("audit_00000000.jsonl");
1016 let content = std::fs::read_to_string(&log_file).expect("failed to read log file");
1017
1018 let mut last_hash = "genesis".to_string();
1019 for line in content.lines() {
1020 if line.trim().is_empty() {
1021 continue;
1022 }
1023 let persisted: PersistedActivity =
1024 serde_json::from_str(line).expect("failed to parse record");
1025
1026 assert_eq!(
1028 persisted.prev_hash, last_hash,
1029 "prev_hash should match last record's hash"
1030 );
1031
1032 let computed = ActivityLogger::compute_hash(&persisted.activity, &persisted.prev_hash);
1034 assert_eq!(
1035 persisted.hash, computed,
1036 "stored hash should match computed hash"
1037 );
1038
1039 last_hash = persisted.hash;
1040 }
1041 }
1042
1043 #[test]
1044 fn test_persistence_recovery_continues_ids() {
1045 let temp_dir = TempDir::new().expect("failed to create temp dir");
1046 let log_dir = temp_dir.path().to_path_buf();
1047
1048 {
1050 let logger =
1051 ActivityLogger::with_persistence(log_dir.clone()).expect("failed to create logger");
1052 logger.log(ActivityType::Query, "Query 1");
1053 logger.log(ActivityType::Query, "Query 2");
1054 logger.flush().expect("failed to flush");
1055 }
1056
1057 let logger2 =
1059 ActivityLogger::with_persistence(log_dir).expect("failed to create second logger");
1060 let id = logger2.log(ActivityType::Query, "Query 3");
1061
1062 assert!(
1064 id.contains("00000003"),
1065 "ID should continue sequence: got {}",
1066 id
1067 );
1068 }
1069
1070 #[test]
1071 fn test_persistence_log_rotation() {
1072 let temp_dir = TempDir::new().expect("failed to create temp dir");
1073 let log_dir = temp_dir.path().to_path_buf();
1074
1075 let logger = ActivityLogger::with_persistence_and_options(
1077 log_dir.clone(),
1078 500, 100,
1080 )
1081 .expect("failed to create logger");
1082
1083 for i in 0..20 {
1085 logger.log(ActivityType::Query, &format!("This is query number {}", i));
1086 }
1087 logger.flush().expect("failed to flush");
1088
1089 let log_files: Vec<_> = std::fs::read_dir(&log_dir)
1091 .expect("failed to read dir")
1092 .filter_map(|e| e.ok())
1093 .filter(|e| {
1094 e.path()
1095 .file_name()
1096 .and_then(|n| n.to_str())
1097 .is_some_and(|n| n.starts_with("audit_") && n.ends_with(".jsonl"))
1098 })
1099 .collect();
1100
1101 assert!(
1102 log_files.len() > 1,
1103 "should have multiple log files after rotation, got {}",
1104 log_files.len()
1105 );
1106
1107 let count = logger
1110 .verify_integrity()
1111 .expect("integrity check should pass");
1112 assert!(count > 0, "should have some verified records");
1114 }
1115
1116 #[test]
1117 fn test_persistence_log_rotation_strict() {
1118 let temp_dir = TempDir::new().expect("failed to create temp dir");
1119 let log_dir = temp_dir.path().to_path_buf();
1120
1121 let logger = ActivityLogger::with_persistence_and_options(
1124 log_dir.clone(),
1125 300, 100,
1127 )
1128 .expect("failed to create logger");
1129
1130 for i in 0..5 {
1132 logger.log(ActivityType::Query, &format!("Query {}", i));
1133 }
1134 logger.flush().expect("failed to flush");
1135
1136 let log_files: Vec<_> = std::fs::read_dir(&log_dir)
1138 .expect("failed to read dir")
1139 .filter_map(|e| e.ok())
1140 .filter(|e| {
1141 e.path()
1142 .file_name()
1143 .and_then(|n| n.to_str())
1144 .is_some_and(|n| n.starts_with("audit_") && n.ends_with(".jsonl"))
1145 })
1146 .collect();
1147
1148 if log_files.iter().any(|e| {
1150 e.path()
1151 .file_name()
1152 .and_then(|n| n.to_str())
1153 .is_some_and(|n| n == "audit_00000000.jsonl")
1154 }) {
1155 let count = logger
1156 .verify_integrity_strict()
1157 .expect("strict integrity check should pass");
1158 assert_eq!(count, 5, "should have 5 verified records");
1159 }
1160 }
1161
1162 #[test]
1163 fn test_hex_encode() {
1164 let data = [0x00, 0x01, 0x0a, 0xff, 0xab];
1165 let hex = hex_encode(&data);
1166 assert_eq!(hex, "00010affab");
1167 }
1168
1169 #[test]
1170 fn test_hash_computation() {
1171 let activity = Activity {
1172 id: "act-00000001".to_string(),
1173 activity_type: ActivityType::Query,
1174 description: "SELECT * FROM users".to_string(),
1175 timestamp: "2025-01-26T12:00:00Z".to_string(),
1176 duration: None,
1177 user: Some("admin".to_string()),
1178 source: None,
1179 details: None,
1180 };
1181
1182 let hash1 = ActivityLogger::compute_hash(&activity, "genesis");
1183 let hash2 = ActivityLogger::compute_hash(&activity, "genesis");
1184
1185 assert_eq!(hash1, hash2);
1187
1188 let hash3 = ActivityLogger::compute_hash(&activity, "different");
1190 assert_ne!(hash1, hash3);
1191 }
1192
1193 #[test]
1194 fn test_integrity_verification_fails_on_tamper() {
1195 let temp_dir = TempDir::new().expect("failed to create temp dir");
1196 let log_dir = temp_dir.path().to_path_buf();
1197
1198 {
1200 let logger =
1201 ActivityLogger::with_persistence(log_dir.clone()).expect("failed to create logger");
1202 logger.log(ActivityType::Query, "Query 1");
1203 logger.log(ActivityType::Query, "Query 2");
1204 logger.log(ActivityType::Query, "Query 3");
1205 logger.flush().expect("failed to flush");
1206 }
1207
1208 let log_file = log_dir.join("audit_00000000.jsonl");
1210 let content = std::fs::read_to_string(&log_file).expect("failed to read");
1211 let tampered = content.replace("Query 2", "TAMPERED");
1212 std::fs::write(&log_file, tampered).expect("failed to write");
1213
1214 let logger2 = ActivityLogger::with_persistence(log_dir).expect("failed to create logger");
1216 let result = logger2.verify_integrity();
1217 assert!(
1218 result.is_err(),
1219 "integrity check should fail after tampering"
1220 );
1221 }
1222}