Skip to main content

aegis_server/
activity.rs

1//! Aegis Activity Logging Module
2//!
3//! Tracks and stores system activity for auditing and monitoring.
4//! Activities are persisted to disk in JSON-lines format with hash chain
5//! integrity verification for tamper detection.
6//!
7//! Key Features:
8//! - In-memory buffer for fast queries
9//! - Append-only JSON-lines file persistence
10//! - Hash chain for tamper detection
11//! - Automatic log rotation at configurable size limit
12//! - Recovery from persisted logs on startup
13//!
14//! @version 0.1.0
15//! @author AutomataNexus Development Team
16
17use parking_lot::RwLock;
18use serde::{Deserialize, Serialize};
19use sha2::{Digest, Sha256};
20use std::collections::VecDeque;
21use std::fs::{File, OpenOptions};
22use std::io::{BufRead, BufReader, BufWriter, Write};
23use std::path::PathBuf;
24use std::sync::atomic::{AtomicU64, Ordering};
25use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
26
27// =============================================================================
28// Constants
29// =============================================================================
30
31/// Default maximum size of a single log file before rotation (100MB).
32pub const DEFAULT_MAX_LOG_SIZE: u64 = 100 * 1024 * 1024;
33
34/// Maximum number of rotated log files to keep.
35pub const MAX_ROTATED_FILES: usize = 10;
36
37/// Default maximum entries to keep in memory.
38pub const DEFAULT_MAX_ENTRIES: usize = 1000;
39
40/// Default retention duration for in-memory entries (24 hours).
41pub const DEFAULT_RETENTION_SECS: u64 = 24 * 60 * 60;
42
43// =============================================================================
44// Activity Types
45// =============================================================================
46
47/// Type of activity.
48#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
49#[serde(rename_all = "lowercase")]
50pub enum ActivityType {
51    Query,
52    Write,
53    Delete,
54    Config,
55    Node,
56    Auth,
57    System,
58}
59
60impl std::fmt::Display for ActivityType {
61    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62        match self {
63            ActivityType::Query => write!(f, "query"),
64            ActivityType::Write => write!(f, "write"),
65            ActivityType::Delete => write!(f, "delete"),
66            ActivityType::Config => write!(f, "config"),
67            ActivityType::Node => write!(f, "node"),
68            ActivityType::Auth => write!(f, "auth"),
69            ActivityType::System => write!(f, "system"),
70        }
71    }
72}
73
74/// Activity entry.
75#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct Activity {
77    pub id: String,
78    #[serde(rename = "type")]
79    pub activity_type: ActivityType,
80    pub description: String,
81    pub timestamp: String,
82    pub duration: Option<u64>,
83    pub user: Option<String>,
84    pub source: Option<String>,
85    pub details: Option<serde_json::Value>,
86}
87
88/// Persisted activity record with hash chain for integrity verification.
89#[derive(Debug, Clone, Serialize, Deserialize)]
90pub struct PersistedActivity {
91    /// The activity data
92    pub activity: Activity,
93    /// SHA-256 hash of the previous record (hex encoded), or "genesis" for first record
94    pub prev_hash: String,
95    /// SHA-256 hash of this record (hex encoded)
96    pub hash: String,
97}
98
99/// Internal activity record with instant for TTL.
100#[derive(Debug, Clone)]
101struct ActivityRecord {
102    activity: Activity,
103    created_at: Instant,
104}
105
106/// Persistence state for the activity logger.
107struct PersistenceState {
108    /// Path to the audit log directory
109    log_dir: PathBuf,
110    /// Current log file writer
111    writer: Option<BufWriter<File>>,
112    /// Current log file size
113    current_size: u64,
114    /// Maximum log file size before rotation
115    max_size: u64,
116    /// Hash of the last written record
117    last_hash: String,
118    /// Current log file number
119    current_file_num: u64,
120}
121
122// =============================================================================
123// Activity Logger Service
124// =============================================================================
125
126/// Activity logging service with optional disk persistence.
127pub struct ActivityLogger {
128    /// In-memory activity buffer for fast queries
129    activities: RwLock<VecDeque<ActivityRecord>>,
130    /// Next activity ID counter
131    next_id: AtomicU64,
132    /// Maximum entries to keep in memory
133    max_entries: usize,
134    /// Retention duration for in-memory entries
135    retention_duration: Duration,
136    /// Persistence state (None if persistence is disabled)
137    persistence: RwLock<Option<PersistenceState>>,
138}
139
140impl ActivityLogger {
141    /// Create a new activity logger without persistence.
142    pub fn new() -> Self {
143        Self {
144            activities: RwLock::new(VecDeque::with_capacity(DEFAULT_MAX_ENTRIES)),
145            next_id: AtomicU64::new(1),
146            max_entries: DEFAULT_MAX_ENTRIES,
147            retention_duration: Duration::from_secs(DEFAULT_RETENTION_SECS),
148            persistence: RwLock::new(None),
149        }
150    }
151
152    /// Create a new activity logger with persistence to the specified directory.
153    pub fn with_persistence(log_dir: PathBuf) -> std::io::Result<Self> {
154        Self::with_persistence_and_options(log_dir, DEFAULT_MAX_LOG_SIZE, DEFAULT_MAX_ENTRIES)
155    }
156
157    /// Create a new activity logger with persistence and custom options.
158    pub fn with_persistence_and_options(
159        log_dir: PathBuf,
160        max_log_size: u64,
161        max_memory_entries: usize,
162    ) -> std::io::Result<Self> {
163        // Ensure the log directory exists
164        std::fs::create_dir_all(&log_dir)?;
165
166        // Find existing log files and determine the current file number
167        let (current_file_num, last_hash, loaded_activities) = Self::load_from_directory(&log_dir)?;
168
169        // Determine next activity ID from loaded activities
170        let next_id = loaded_activities
171            .iter()
172            .filter_map(|a| {
173                a.id.strip_prefix("act-")
174                    .and_then(|s| s.parse::<u64>().ok())
175            })
176            .max()
177            .unwrap_or(0)
178            + 1;
179
180        // Convert loaded activities to in-memory records
181        let activities: VecDeque<ActivityRecord> = loaded_activities
182            .into_iter()
183            .map(|activity| ActivityRecord {
184                activity,
185                created_at: Instant::now(), // Use current time for TTL purposes
186            })
187            .collect();
188
189        // Open or create the current log file
190        let log_file_path = log_dir.join(format!("audit_{:08}.jsonl", current_file_num));
191        let file = OpenOptions::new()
192            .create(true)
193            .append(true)
194            .open(&log_file_path)?;
195
196        let current_size = file.metadata()?.len();
197
198        let persistence_state = PersistenceState {
199            log_dir,
200            writer: Some(BufWriter::new(file)),
201            current_size,
202            max_size: max_log_size,
203            last_hash,
204            current_file_num,
205        };
206
207        Ok(Self {
208            activities: RwLock::new(activities),
209            next_id: AtomicU64::new(next_id),
210            max_entries: max_memory_entries,
211            retention_duration: Duration::from_secs(DEFAULT_RETENTION_SECS),
212            persistence: RwLock::new(Some(persistence_state)),
213        })
214    }
215
216    /// Load activities from the log directory.
217    /// Returns (current_file_num, last_hash, activities).
218    fn load_from_directory(log_dir: &PathBuf) -> std::io::Result<(u64, String, Vec<Activity>)> {
219        let mut log_files: Vec<(u64, PathBuf)> = Vec::new();
220
221        // Collect all audit log files
222        if let Ok(entries) = std::fs::read_dir(log_dir) {
223            for entry in entries.flatten() {
224                let path = entry.path();
225                if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
226                    if let Some(num_str) = name
227                        .strip_prefix("audit_")
228                        .and_then(|s| s.strip_suffix(".jsonl"))
229                    {
230                        if let Ok(num) = num_str.parse::<u64>() {
231                            log_files.push((num, path));
232                        }
233                    }
234                }
235            }
236        }
237
238        // Sort by file number (oldest first)
239        log_files.sort_by_key(|(num, _)| *num);
240
241        let mut activities = Vec::new();
242        let mut last_hash = "genesis".to_string();
243        let mut current_file_num = 0u64;
244        let mut integrity_verified = true;
245
246        // Load activities from all files, verifying hash chain
247        for (file_num, path) in &log_files {
248            current_file_num = *file_num;
249
250            let file = match File::open(path) {
251                Ok(f) => f,
252                Err(e) => {
253                    tracing::warn!("Failed to open audit log {:?}: {}", path, e);
254                    continue;
255                }
256            };
257
258            let reader = BufReader::new(file);
259            for line in reader.lines() {
260                let line = match line {
261                    Ok(l) => l,
262                    Err(e) => {
263                        tracing::warn!("Failed to read line from {:?}: {}", path, e);
264                        continue;
265                    }
266                };
267
268                if line.trim().is_empty() {
269                    continue;
270                }
271
272                match serde_json::from_str::<PersistedActivity>(&line) {
273                    Ok(persisted) => {
274                        // Verify hash chain integrity
275                        if persisted.prev_hash != last_hash {
276                            tracing::error!(
277                                "Hash chain integrity violation detected in {:?}: expected prev_hash '{}', got '{}'",
278                                path,
279                                last_hash,
280                                persisted.prev_hash
281                            );
282                            integrity_verified = false;
283                        }
284
285                        // Verify record hash
286                        let computed_hash =
287                            Self::compute_hash(&persisted.activity, &persisted.prev_hash);
288                        if computed_hash != persisted.hash {
289                            tracing::error!(
290                                "Record hash mismatch in {:?} for activity '{}': computed '{}', stored '{}'",
291                                path,
292                                persisted.activity.id,
293                                computed_hash,
294                                persisted.hash
295                            );
296                            integrity_verified = false;
297                        }
298
299                        last_hash = persisted.hash;
300                        activities.push(persisted.activity);
301                    }
302                    Err(e) => {
303                        tracing::warn!("Failed to parse audit record from {:?}: {}", path, e);
304                    }
305                }
306            }
307        }
308
309        if !integrity_verified {
310            tracing::warn!(
311                "Audit log integrity verification FAILED. Some records may have been tampered with."
312            );
313        } else if !activities.is_empty() {
314            tracing::info!(
315                "Loaded {} audit records from {} files with verified integrity",
316                activities.len(),
317                log_files.len()
318            );
319        }
320
321        // If no files exist, start with file 0
322        if log_files.is_empty() {
323            current_file_num = 0;
324        }
325
326        Ok((current_file_num, last_hash, activities))
327    }
328
329    /// Compute the SHA-256 hash of an activity record.
330    fn compute_hash(activity: &Activity, prev_hash: &str) -> String {
331        let mut hasher = Sha256::new();
332
333        // Hash the activity JSON
334        if let Ok(json) = serde_json::to_string(activity) {
335            hasher.update(json.as_bytes());
336        }
337
338        // Include previous hash in the chain
339        hasher.update(prev_hash.as_bytes());
340
341        // Return hex-encoded hash
342        let result = hasher.finalize();
343        hex_encode(&result)
344    }
345
346    /// Persist an activity to disk.
347    fn persist_activity(&self, activity: &Activity) {
348        let mut persistence = self.persistence.write();
349        let state = match persistence.as_mut() {
350            Some(s) => s,
351            None => return, // Persistence not enabled
352        };
353
354        // Compute hash chain
355        let prev_hash = state.last_hash.clone();
356        let hash = Self::compute_hash(activity, &prev_hash);
357
358        let persisted = PersistedActivity {
359            activity: activity.clone(),
360            prev_hash,
361            hash: hash.clone(),
362        };
363
364        // Serialize to JSON line
365        let json_line = match serde_json::to_string(&persisted) {
366            Ok(j) => j,
367            Err(e) => {
368                tracing::error!("Failed to serialize activity: {}", e);
369                return;
370            }
371        };
372
373        let line_size = json_line.len() as u64 + 1; // +1 for newline
374
375        // Check if we need to rotate
376        if state.current_size + line_size > state.max_size {
377            if let Err(e) = self.rotate_log_file(state) {
378                tracing::error!("Failed to rotate audit log: {}", e);
379                return;
380            }
381        }
382
383        // Write to the log file
384        if let Some(ref mut writer) = state.writer {
385            if let Err(e) = writeln!(writer, "{}", json_line) {
386                tracing::error!("Failed to write audit record: {}", e);
387                return;
388            }
389
390            // Flush to ensure durability
391            if let Err(e) = writer.flush() {
392                tracing::error!("Failed to flush audit log: {}", e);
393                return;
394            }
395
396            state.current_size += line_size;
397            state.last_hash = hash;
398        }
399    }
400
401    /// Rotate to a new log file.
402    fn rotate_log_file(&self, state: &mut PersistenceState) -> std::io::Result<()> {
403        // Flush and close the current file
404        if let Some(ref mut writer) = state.writer {
405            writer.flush()?;
406        }
407        state.writer = None;
408
409        // Increment file number
410        state.current_file_num += 1;
411
412        // Clean up old files if we have too many
413        self.cleanup_old_files(state)?;
414
415        // Open new file
416        let new_path = state
417            .log_dir
418            .join(format!("audit_{:08}.jsonl", state.current_file_num));
419        let file = OpenOptions::new()
420            .create(true)
421            .append(true)
422            .open(&new_path)?;
423
424        tracing::info!("Rotated audit log to file {}", state.current_file_num);
425
426        state.writer = Some(BufWriter::new(file));
427        state.current_size = 0;
428
429        Ok(())
430    }
431
432    /// Clean up old log files, keeping only the most recent ones.
433    fn cleanup_old_files(&self, state: &PersistenceState) -> std::io::Result<()> {
434        let mut log_files: Vec<(u64, PathBuf)> = Vec::new();
435
436        if let Ok(entries) = std::fs::read_dir(&state.log_dir) {
437            for entry in entries.flatten() {
438                let path = entry.path();
439                if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
440                    if let Some(num_str) = name
441                        .strip_prefix("audit_")
442                        .and_then(|s| s.strip_suffix(".jsonl"))
443                    {
444                        if let Ok(num) = num_str.parse::<u64>() {
445                            log_files.push((num, path));
446                        }
447                    }
448                }
449            }
450        }
451
452        // Sort oldest first
453        log_files.sort_by_key(|(num, _)| *num);
454
455        // Remove old files if we have more than the limit
456        while log_files.len() > MAX_ROTATED_FILES {
457            if let Some((num, path)) = log_files.first() {
458                if *num < state.current_file_num {
459                    if let Err(e) = std::fs::remove_file(path) {
460                        tracing::warn!("Failed to remove old audit log {:?}: {}", path, e);
461                    } else {
462                        tracing::debug!("Removed old audit log file {:?}", path);
463                    }
464                }
465            }
466            log_files.remove(0);
467        }
468
469        Ok(())
470    }
471
472    /// Verify the integrity of all persisted audit logs.
473    /// Returns Ok(record_count) if integrity is verified, Err with details otherwise.
474    ///
475    /// Note: If old log files have been cleaned up due to rotation limits,
476    /// the first remaining file's first record will have a prev_hash that
477    /// references the (now deleted) previous file. In this case, we accept
478    /// the first record's prev_hash as valid and continue verification from there.
479    pub fn verify_integrity(&self) -> Result<usize, String> {
480        let persistence = self.persistence.read();
481        let state = match persistence.as_ref() {
482            Some(s) => s,
483            None => return Err("Persistence not enabled".to_string()),
484        };
485
486        let mut log_files: Vec<(u64, PathBuf)> = Vec::new();
487
488        if let Ok(entries) = std::fs::read_dir(&state.log_dir) {
489            for entry in entries.flatten() {
490                let path = entry.path();
491                if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
492                    if let Some(num_str) = name
493                        .strip_prefix("audit_")
494                        .and_then(|s| s.strip_suffix(".jsonl"))
495                    {
496                        if let Ok(num) = num_str.parse::<u64>() {
497                            log_files.push((num, path));
498                        }
499                    }
500                }
501            }
502        }
503
504        log_files.sort_by_key(|(num, _)| *num);
505
506        let mut last_hash: Option<String> = None;
507        let mut record_count = 0usize;
508        let mut is_first_record = true;
509
510        for (_file_num, path) in &log_files {
511            let file = File::open(path).map_err(|e| format!("Failed to open {:?}: {}", path, e))?;
512            let reader = BufReader::new(file);
513
514            for (line_num, line) in reader.lines().enumerate() {
515                let line = line.map_err(|e| {
516                    format!("Failed to read line {} in {:?}: {}", line_num, path, e)
517                })?;
518
519                if line.trim().is_empty() {
520                    continue;
521                }
522
523                let persisted: PersistedActivity = serde_json::from_str(&line).map_err(|e| {
524                    format!(
525                        "Failed to parse record at line {} in {:?}: {}",
526                        line_num, path, e
527                    )
528                })?;
529
530                // For the first record, accept its prev_hash as the starting point
531                // (older files may have been cleaned up)
532                if is_first_record {
533                    is_first_record = false;
534                } else if let Some(ref expected) = last_hash {
535                    // Verify hash chain linkage for subsequent records
536                    if &persisted.prev_hash != expected {
537                        return Err(format!(
538                            "Hash chain broken at line {} in {:?}: expected '{}', got '{}'",
539                            line_num, path, expected, persisted.prev_hash
540                        ));
541                    }
542                }
543
544                // Always verify the record's own hash is computed correctly
545                let computed_hash = Self::compute_hash(&persisted.activity, &persisted.prev_hash);
546                if computed_hash != persisted.hash {
547                    return Err(format!(
548                        "Record hash mismatch at line {} in {:?}: computed '{}', stored '{}'",
549                        line_num, path, computed_hash, persisted.hash
550                    ));
551                }
552
553                last_hash = Some(persisted.hash);
554                record_count += 1;
555            }
556        }
557
558        Ok(record_count)
559    }
560
561    /// Verify integrity with strict mode - requires chain to start from "genesis".
562    /// Use this for complete audit verification when all log files are present.
563    pub fn verify_integrity_strict(&self) -> Result<usize, String> {
564        let persistence = self.persistence.read();
565        let state = match persistence.as_ref() {
566            Some(s) => s,
567            None => return Err("Persistence not enabled".to_string()),
568        };
569
570        let mut log_files: Vec<(u64, PathBuf)> = Vec::new();
571
572        if let Ok(entries) = std::fs::read_dir(&state.log_dir) {
573            for entry in entries.flatten() {
574                let path = entry.path();
575                if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
576                    if let Some(num_str) = name
577                        .strip_prefix("audit_")
578                        .and_then(|s| s.strip_suffix(".jsonl"))
579                    {
580                        if let Ok(num) = num_str.parse::<u64>() {
581                            log_files.push((num, path));
582                        }
583                    }
584                }
585            }
586        }
587
588        log_files.sort_by_key(|(num, _)| *num);
589
590        // Check if the first file is file 0 (no files have been cleaned up)
591        if !log_files.is_empty() && log_files[0].0 != 0 {
592            return Err(format!(
593                "Strict verification failed: oldest log file is {:08}, expected 00000000. \
594                 Some log files have been cleaned up.",
595                log_files[0].0
596            ));
597        }
598
599        let mut last_hash = "genesis".to_string();
600        let mut record_count = 0usize;
601
602        for (_file_num, path) in &log_files {
603            let file = File::open(path).map_err(|e| format!("Failed to open {:?}: {}", path, e))?;
604            let reader = BufReader::new(file);
605
606            for (line_num, line) in reader.lines().enumerate() {
607                let line = line.map_err(|e| {
608                    format!("Failed to read line {} in {:?}: {}", line_num, path, e)
609                })?;
610
611                if line.trim().is_empty() {
612                    continue;
613                }
614
615                let persisted: PersistedActivity = serde_json::from_str(&line).map_err(|e| {
616                    format!(
617                        "Failed to parse record at line {} in {:?}: {}",
618                        line_num, path, e
619                    )
620                })?;
621
622                // Verify hash chain
623                if persisted.prev_hash != last_hash {
624                    return Err(format!(
625                        "Hash chain broken at line {} in {:?}: expected '{}', got '{}'",
626                        line_num, path, last_hash, persisted.prev_hash
627                    ));
628                }
629
630                // Verify record hash
631                let computed_hash = Self::compute_hash(&persisted.activity, &persisted.prev_hash);
632                if computed_hash != persisted.hash {
633                    return Err(format!(
634                        "Record hash mismatch at line {} in {:?}: computed '{}', stored '{}'",
635                        line_num, path, computed_hash, persisted.hash
636                    ));
637                }
638
639                last_hash = persisted.hash;
640                record_count += 1;
641            }
642        }
643
644        Ok(record_count)
645    }
646
647    /// Force flush any buffered data to disk.
648    pub fn flush(&self) -> std::io::Result<()> {
649        let mut persistence = self.persistence.write();
650        if let Some(ref mut state) = *persistence {
651            if let Some(ref mut writer) = state.writer {
652                writer.flush()?;
653            }
654        }
655        Ok(())
656    }
657
658    /// Log a new activity.
659    pub fn log(&self, activity_type: ActivityType, description: &str) -> String {
660        self.log_with_details(activity_type, description, None, None, None, None)
661    }
662
663    /// Log a query activity with duration.
664    pub fn log_query(&self, sql: &str, duration_ms: u64, user: Option<&str>) {
665        self.log_with_details(
666            ActivityType::Query,
667            sql,
668            Some(duration_ms),
669            user,
670            None,
671            None,
672        );
673    }
674
675    /// Log a write activity.
676    pub fn log_write(&self, description: &str, user: Option<&str>) {
677        self.log_with_details(ActivityType::Write, description, None, user, None, None);
678    }
679
680    /// Log a configuration change.
681    pub fn log_config(&self, description: &str, user: Option<&str>) {
682        self.log_with_details(ActivityType::Config, description, None, user, None, None);
683    }
684
685    /// Log a node event.
686    pub fn log_node(&self, description: &str) {
687        self.log_with_details(
688            ActivityType::Node,
689            description,
690            None,
691            None,
692            Some("cluster"),
693            None,
694        );
695    }
696
697    /// Log an authentication event.
698    pub fn log_auth(&self, description: &str, user: Option<&str>) {
699        self.log_with_details(ActivityType::Auth, description, None, user, None, None);
700    }
701
702    /// Log a system event.
703    pub fn log_system(&self, description: &str) {
704        self.log_with_details(
705            ActivityType::System,
706            description,
707            None,
708            None,
709            Some("system"),
710            None,
711        );
712    }
713
714    /// Log an activity with full details.
715    pub fn log_with_details(
716        &self,
717        activity_type: ActivityType,
718        description: &str,
719        duration: Option<u64>,
720        user: Option<&str>,
721        source: Option<&str>,
722        details: Option<serde_json::Value>,
723    ) -> String {
724        let id = format!("act-{:08}", self.next_id.fetch_add(1, Ordering::SeqCst));
725        let activity = Activity {
726            id: id.clone(),
727            activity_type,
728            description: description.to_string(),
729            timestamp: format_timestamp(now_timestamp()),
730            duration,
731            user: user.map(|s| s.to_string()),
732            source: source.map(|s| s.to_string()),
733            details,
734        };
735
736        // Persist to disk first (for durability)
737        self.persist_activity(&activity);
738
739        let record = ActivityRecord {
740            activity,
741            created_at: Instant::now(),
742        };
743
744        let mut activities = self.activities.write();
745
746        // Remove old entries if at capacity
747        while activities.len() >= self.max_entries {
748            activities.pop_front();
749        }
750
751        activities.push_back(record);
752        id
753    }
754
755    /// Get recent activities.
756    pub fn get_recent(&self, limit: usize) -> Vec<Activity> {
757        self.cleanup_expired();
758
759        let activities = self.activities.read();
760        activities
761            .iter()
762            .rev()
763            .take(limit)
764            .map(|r| r.activity.clone())
765            .collect()
766    }
767
768    /// Get activities by type.
769    pub fn get_by_type(&self, activity_type: ActivityType, limit: usize) -> Vec<Activity> {
770        self.cleanup_expired();
771
772        let activities = self.activities.read();
773        activities
774            .iter()
775            .rev()
776            .filter(|r| r.activity.activity_type == activity_type)
777            .take(limit)
778            .map(|r| r.activity.clone())
779            .collect()
780    }
781
782    /// Get activities by user.
783    pub fn get_by_user(&self, username: &str, limit: usize) -> Vec<Activity> {
784        self.cleanup_expired();
785
786        let activities = self.activities.read();
787        activities
788            .iter()
789            .rev()
790            .filter(|r| r.activity.user.as_deref() == Some(username))
791            .take(limit)
792            .map(|r| r.activity.clone())
793            .collect()
794    }
795
796    /// Get activity count.
797    pub fn count(&self) -> usize {
798        self.activities.read().len()
799    }
800
801    /// Clean up expired activities.
802    fn cleanup_expired(&self) {
803        let now = Instant::now();
804        let mut activities = self.activities.write();
805
806        while let Some(front) = activities.front() {
807            if now.duration_since(front.created_at) > self.retention_duration {
808                activities.pop_front();
809            } else {
810                break;
811            }
812        }
813    }
814
815    /// Clear all activities.
816    pub fn clear(&self) {
817        self.activities.write().clear();
818    }
819}
820
821impl Default for ActivityLogger {
822    fn default() -> Self {
823        Self::new()
824    }
825}
826
827// =============================================================================
828// Helper Functions
829// =============================================================================
830
831/// Get current timestamp in milliseconds.
832fn now_timestamp() -> u64 {
833    SystemTime::now()
834        .duration_since(UNIX_EPOCH)
835        .unwrap_or_default()
836        .as_millis() as u64
837}
838
839/// Format a timestamp to RFC3339 string.
840fn format_timestamp(timestamp_ms: u64) -> String {
841    let secs = timestamp_ms / 1000;
842    let datetime = UNIX_EPOCH + Duration::from_secs(secs);
843    let duration = datetime.duration_since(UNIX_EPOCH).unwrap_or_default();
844    let total_secs = duration.as_secs();
845
846    let days_since_epoch = total_secs / 86400;
847    let secs_today = total_secs % 86400;
848
849    let hours = secs_today / 3600;
850    let minutes = (secs_today % 3600) / 60;
851    let seconds = secs_today % 60;
852
853    let mut year = 1970u64;
854    let mut remaining_days = days_since_epoch;
855
856    loop {
857        let days_in_year = if is_leap_year(year) { 366 } else { 365 };
858        if remaining_days < days_in_year {
859            break;
860        }
861        remaining_days -= days_in_year;
862        year += 1;
863    }
864
865    let days_in_months: [u64; 12] = if is_leap_year(year) {
866        [31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
867    } else {
868        [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
869    };
870
871    let mut month = 1u64;
872    for &days in &days_in_months {
873        if remaining_days < days {
874            break;
875        }
876        remaining_days -= days;
877        month += 1;
878    }
879    let day = remaining_days + 1;
880
881    format!(
882        "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
883        year, month, day, hours, minutes, seconds
884    )
885}
886
887fn is_leap_year(year: u64) -> bool {
888    (year % 4 == 0 && year % 100 != 0) || (year % 400 == 0)
889}
890
891/// Encode bytes as lowercase hex string.
892fn hex_encode(bytes: &[u8]) -> String {
893    const HEX_CHARS: &[u8] = b"0123456789abcdef";
894    let mut result = String::with_capacity(bytes.len() * 2);
895    for &byte in bytes {
896        result.push(HEX_CHARS[(byte >> 4) as usize] as char);
897        result.push(HEX_CHARS[(byte & 0x0f) as usize] as char);
898    }
899    result
900}
901
902// =============================================================================
903// Tests
904// =============================================================================
905
906#[cfg(test)]
907mod tests {
908    use super::*;
909    use tempfile::TempDir;
910
911    #[test]
912    fn test_log_activity() {
913        let logger = ActivityLogger::new();
914        let id = logger.log(ActivityType::Query, "SELECT * FROM users");
915        assert!(!id.is_empty());
916        assert_eq!(logger.count(), 1);
917    }
918
919    #[test]
920    fn test_get_recent() {
921        let logger = ActivityLogger::new();
922        logger.log(ActivityType::Query, "Query 1");
923        logger.log(ActivityType::Write, "Write 1");
924        logger.log(ActivityType::Query, "Query 2");
925
926        let recent = logger.get_recent(2);
927        assert_eq!(recent.len(), 2);
928        assert_eq!(recent[0].description, "Query 2");
929        assert_eq!(recent[1].description, "Write 1");
930    }
931
932    #[test]
933    fn test_get_by_type() {
934        let logger = ActivityLogger::new();
935        logger.log(ActivityType::Query, "Query 1");
936        logger.log(ActivityType::Write, "Write 1");
937        logger.log(ActivityType::Query, "Query 2");
938
939        let queries = logger.get_by_type(ActivityType::Query, 10);
940        assert_eq!(queries.len(), 2);
941    }
942
943    #[test]
944    fn test_log_query_with_duration() {
945        let logger = ActivityLogger::new();
946        logger.log_query("SELECT * FROM metrics", 42, Some("admin"));
947
948        let recent = logger.get_recent(1);
949        assert_eq!(recent.len(), 1);
950        assert_eq!(recent[0].duration, Some(42));
951        assert_eq!(recent[0].user, Some("admin".to_string()));
952    }
953
954    #[test]
955    fn test_max_entries() {
956        let logger = ActivityLogger::new();
957
958        // Log more than max entries
959        for i in 0..1100 {
960            logger.log(ActivityType::Query, &format!("Query {}", i));
961        }
962
963        // Should be capped at max
964        assert!(logger.count() <= 1000);
965    }
966
967    #[test]
968    fn test_persistence_basic() {
969        let temp_dir = TempDir::new().expect("failed to create temp dir");
970        let log_dir = temp_dir.path().to_path_buf();
971
972        // Create logger and log some activities
973        {
974            let logger =
975                ActivityLogger::with_persistence(log_dir.clone()).expect("failed to create logger");
976            logger.log(ActivityType::Query, "SELECT * FROM users");
977            logger.log(ActivityType::Write, "INSERT INTO users");
978            logger.log(ActivityType::Auth, "User login");
979            logger.flush().expect("failed to flush");
980        }
981
982        // Verify the log file was created
983        let log_file = log_dir.join("audit_00000000.jsonl");
984        assert!(log_file.exists(), "audit log file should exist");
985
986        // Create a new logger and verify it loads the activities
987        let logger2 =
988            ActivityLogger::with_persistence(log_dir).expect("failed to create second logger");
989
990        let recent = logger2.get_recent(10);
991        assert_eq!(recent.len(), 3, "should load 3 activities from disk");
992
993        // Verify integrity
994        let count = logger2
995            .verify_integrity()
996            .expect("integrity check should pass");
997        assert_eq!(count, 3, "should have 3 verified records");
998    }
999
1000    #[test]
1001    fn test_persistence_hash_chain() {
1002        let temp_dir = TempDir::new().expect("failed to create temp dir");
1003        let log_dir = temp_dir.path().to_path_buf();
1004
1005        let logger =
1006            ActivityLogger::with_persistence(log_dir.clone()).expect("failed to create logger");
1007
1008        // Log multiple activities
1009        for i in 0..5 {
1010            logger.log(ActivityType::Query, &format!("Query {}", i));
1011        }
1012        logger.flush().expect("failed to flush");
1013
1014        // Read the log file and verify hash chain
1015        let log_file = log_dir.join("audit_00000000.jsonl");
1016        let content = std::fs::read_to_string(&log_file).expect("failed to read log file");
1017
1018        let mut last_hash = "genesis".to_string();
1019        for line in content.lines() {
1020            if line.trim().is_empty() {
1021                continue;
1022            }
1023            let persisted: PersistedActivity =
1024                serde_json::from_str(line).expect("failed to parse record");
1025
1026            // Verify chain linkage
1027            assert_eq!(
1028                persisted.prev_hash, last_hash,
1029                "prev_hash should match last record's hash"
1030            );
1031
1032            // Verify hash computation
1033            let computed = ActivityLogger::compute_hash(&persisted.activity, &persisted.prev_hash);
1034            assert_eq!(
1035                persisted.hash, computed,
1036                "stored hash should match computed hash"
1037            );
1038
1039            last_hash = persisted.hash;
1040        }
1041    }
1042
1043    #[test]
1044    fn test_persistence_recovery_continues_ids() {
1045        let temp_dir = TempDir::new().expect("failed to create temp dir");
1046        let log_dir = temp_dir.path().to_path_buf();
1047
1048        // Create logger and log some activities
1049        {
1050            let logger =
1051                ActivityLogger::with_persistence(log_dir.clone()).expect("failed to create logger");
1052            logger.log(ActivityType::Query, "Query 1");
1053            logger.log(ActivityType::Query, "Query 2");
1054            logger.flush().expect("failed to flush");
1055        }
1056
1057        // Create a new logger and log more activities
1058        let logger2 =
1059            ActivityLogger::with_persistence(log_dir).expect("failed to create second logger");
1060        let id = logger2.log(ActivityType::Query, "Query 3");
1061
1062        // The new ID should continue from where we left off
1063        assert!(
1064            id.contains("00000003"),
1065            "ID should continue sequence: got {}",
1066            id
1067        );
1068    }
1069
1070    #[test]
1071    fn test_persistence_log_rotation() {
1072        let temp_dir = TempDir::new().expect("failed to create temp dir");
1073        let log_dir = temp_dir.path().to_path_buf();
1074
1075        // Create logger with very small max size to force rotation
1076        let logger = ActivityLogger::with_persistence_and_options(
1077            log_dir.clone(),
1078            500, // 500 bytes max
1079            100,
1080        )
1081        .expect("failed to create logger");
1082
1083        // Log enough activities to trigger rotation
1084        for i in 0..20 {
1085            logger.log(ActivityType::Query, &format!("This is query number {}", i));
1086        }
1087        logger.flush().expect("failed to flush");
1088
1089        // Count log files
1090        let log_files: Vec<_> = std::fs::read_dir(&log_dir)
1091            .expect("failed to read dir")
1092            .filter_map(|e| e.ok())
1093            .filter(|e| {
1094                e.path()
1095                    .file_name()
1096                    .and_then(|n| n.to_str())
1097                    .is_some_and(|n| n.starts_with("audit_") && n.ends_with(".jsonl"))
1098            })
1099            .collect();
1100
1101        assert!(
1102            log_files.len() > 1,
1103            "should have multiple log files after rotation, got {}",
1104            log_files.len()
1105        );
1106
1107        // Verify integrity across all remaining files
1108        // Note: some old files may have been cleaned up, so we use non-strict verification
1109        let count = logger
1110            .verify_integrity()
1111            .expect("integrity check should pass");
1112        // Count may be less than 20 if old files were cleaned up
1113        assert!(count > 0, "should have some verified records");
1114    }
1115
1116    #[test]
1117    fn test_persistence_log_rotation_strict() {
1118        let temp_dir = TempDir::new().expect("failed to create temp dir");
1119        let log_dir = temp_dir.path().to_path_buf();
1120
1121        // Create logger with small max size but we'll log fewer entries
1122        // so that cleanup doesn't remove files
1123        let logger = ActivityLogger::with_persistence_and_options(
1124            log_dir.clone(),
1125            300, // 300 bytes max
1126            100,
1127        )
1128        .expect("failed to create logger");
1129
1130        // Log just a few activities to get multiple files without hitting cleanup
1131        for i in 0..5 {
1132            logger.log(ActivityType::Query, &format!("Query {}", i));
1133        }
1134        logger.flush().expect("failed to flush");
1135
1136        // Count log files
1137        let log_files: Vec<_> = std::fs::read_dir(&log_dir)
1138            .expect("failed to read dir")
1139            .filter_map(|e| e.ok())
1140            .filter(|e| {
1141                e.path()
1142                    .file_name()
1143                    .and_then(|n| n.to_str())
1144                    .is_some_and(|n| n.starts_with("audit_") && n.ends_with(".jsonl"))
1145            })
1146            .collect();
1147
1148        // If we have all files starting from 0, strict verification should pass
1149        if log_files.iter().any(|e| {
1150            e.path()
1151                .file_name()
1152                .and_then(|n| n.to_str())
1153                .is_some_and(|n| n == "audit_00000000.jsonl")
1154        }) {
1155            let count = logger
1156                .verify_integrity_strict()
1157                .expect("strict integrity check should pass");
1158            assert_eq!(count, 5, "should have 5 verified records");
1159        }
1160    }
1161
1162    #[test]
1163    fn test_hex_encode() {
1164        let data = [0x00, 0x01, 0x0a, 0xff, 0xab];
1165        let hex = hex_encode(&data);
1166        assert_eq!(hex, "00010affab");
1167    }
1168
1169    #[test]
1170    fn test_hash_computation() {
1171        let activity = Activity {
1172            id: "act-00000001".to_string(),
1173            activity_type: ActivityType::Query,
1174            description: "SELECT * FROM users".to_string(),
1175            timestamp: "2025-01-26T12:00:00Z".to_string(),
1176            duration: None,
1177            user: Some("admin".to_string()),
1178            source: None,
1179            details: None,
1180        };
1181
1182        let hash1 = ActivityLogger::compute_hash(&activity, "genesis");
1183        let hash2 = ActivityLogger::compute_hash(&activity, "genesis");
1184
1185        // Same input should produce same hash
1186        assert_eq!(hash1, hash2);
1187
1188        // Different prev_hash should produce different hash
1189        let hash3 = ActivityLogger::compute_hash(&activity, "different");
1190        assert_ne!(hash1, hash3);
1191    }
1192
1193    #[test]
1194    fn test_integrity_verification_fails_on_tamper() {
1195        let temp_dir = TempDir::new().expect("failed to create temp dir");
1196        let log_dir = temp_dir.path().to_path_buf();
1197
1198        // Create and populate the log
1199        {
1200            let logger =
1201                ActivityLogger::with_persistence(log_dir.clone()).expect("failed to create logger");
1202            logger.log(ActivityType::Query, "Query 1");
1203            logger.log(ActivityType::Query, "Query 2");
1204            logger.log(ActivityType::Query, "Query 3");
1205            logger.flush().expect("failed to flush");
1206        }
1207
1208        // Tamper with the log file
1209        let log_file = log_dir.join("audit_00000000.jsonl");
1210        let content = std::fs::read_to_string(&log_file).expect("failed to read");
1211        let tampered = content.replace("Query 2", "TAMPERED");
1212        std::fs::write(&log_file, tampered).expect("failed to write");
1213
1214        // Verify integrity should fail
1215        let logger2 = ActivityLogger::with_persistence(log_dir).expect("failed to create logger");
1216        let result = logger2.verify_integrity();
1217        assert!(
1218            result.is_err(),
1219            "integrity check should fail after tampering"
1220        );
1221    }
1222}