// zilliz 1.4.2
//
// TUI and CLI tool for managing Zilliz Cloud clusters and Milvus operations.
// Documentation
use std::fs::{self, File, OpenOptions};
use std::io::{BufRead, BufReader, Write};
use std::path::PathBuf;

use chrono::{DateTime, Months, Utc};
use fs2::FileExt;
use serde::{Deserialize, Serialize};

use super::manager::ConfigManager;

/// Number of records a history file may hold before `run_cleanup` considers
/// pruning it; files at or below this count are left untouched.
const MAX_RECORDS: usize = 10_000;
/// Age threshold in calendar months: `run_cleanup` only removes records whose
/// timestamp is earlier than "now minus this many months".
const RETENTION_MONTHS: u32 = 3;

/// One entry of the per-user command history.
///
/// Each record is serialized with serde as a single JSON object and stored as
/// one line of a `.jsonl` file (see `append_record` / `read_all_records`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HistoryRecord {
    /// The command that was run; the test fixture uses `zilliz cluster list`.
    /// NOTE(review): presumably the full command line as invoked — confirm with the caller.
    pub command: String,
    /// Identity the command ran as; the test fixture uses an e-mail address.
    /// NOTE(review): exact format is decided by the code that builds records — confirm.
    pub user: String,
    /// Time of the command as text. `run_cleanup` parses it with
    /// `DateTime::parse_from_rfc3339`; records whose timestamp does not parse
    /// are never pruned.
    pub timestamp: String,
    /// Category of the command; the test fixture uses `"resource"`.
    /// NOTE(review): the set of valid values is not visible in this file.
    pub command_type: String,
    /// Resource the command targeted, if any (e.g. `"cluster"` in the tests).
    pub resource: Option<String>,
    /// Operation performed on the resource, if any (e.g. `"list"` in the tests).
    pub operation: Option<String>,
    /// Whether the command succeeded.
    pub success: bool,
    /// Error text for the command, if any.
    /// NOTE(review): presumably only set when `success` is false — confirm with the caller.
    pub error_message: Option<String>,
}

/// Compute where the current user's history file lives.
///
/// The result is `{config_dir}/history/{encoded_user_id}.jsonl`. When nobody is
/// logged in (no user info, or an empty user id) the file name is
/// `anonymous.jsonl` instead.
///
/// As a side effect the `history` directory is created if it is missing; on
/// Unix a freshly created directory is then restricted to its owner (0o700).
/// Failures of either step are ignored.
pub fn resolve_history_path(config_mgr: &ConfigManager) -> PathBuf {
    let history_dir = config_mgr.config_dir.join("history");

    let dir_missing = !history_dir.exists();
    if dir_missing {
        let _ = fs::create_dir_all(&history_dir);
        // Only a directory we just created gets its mode tightened.
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            let owner_only = fs::Permissions::from_mode(0o700);
            let _ = fs::set_permissions(&history_dir, owner_only);
        }
    }

    let filename = config_mgr
        .get_user_info()
        .filter(|info| !info.user_id.is_empty())
        .map(|info| {
            // Percent-encode the id byte by byte: ASCII letters, digits, '-' and
            // '_' pass through, every other byte becomes %XX. Distinct ids thus
            // map to distinct, filesystem-safe names.
            let mut safe_id = String::new();
            for byte in info.user_id.bytes() {
                match byte {
                    b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' | b'-' | b'_' => {
                        safe_id.push(char::from(byte));
                    }
                    _ => safe_id.push_str(&format!("%{:02X}", byte)),
                }
            }
            format!("{}.jsonl", safe_id)
        })
        .unwrap_or_else(|| "anonymous.jsonl".to_string());

    history_dir.join(filename)
}

/// Append a single history record as one JSONL line.
///
/// The write is best-effort: serialization, open, lock and write failures are
/// all silently ignored so that history logging can never break a command.
///
/// * `path` – history file to append to; created if it does not exist.
/// * `record` – entry to serialize with serde_json.
///
/// On Unix a newly created file is opened with mode 0o600 and restricted
/// through the open handle *before* any data is written, so record contents are
/// never readable by other users, not even briefly. The record and its trailing
/// newline are written with a single `write_all` while holding an exclusive
/// advisory lock, which keeps concurrent appenders from interleaving lines.
pub fn append_record(path: &PathBuf, record: &HistoryRecord) {
    // serde_json escapes embedded newlines, so one record is always one line.
    let mut line = match serde_json::to_string(record) {
        Ok(l) => l,
        Err(_) => return,
    };
    line.push('\n');

    let created = !path.exists();
    let mut options = OpenOptions::new();
    options.create(true).append(true);
    // Ask the OS to create the file owner-only instead of fixing it up later.
    #[cfg(unix)]
    {
        use std::os::unix::fs::OpenOptionsExt;
        options.mode(0o600);
    }
    let mut file = match options.open(path) {
        Ok(f) => f,
        Err(_) => return,
    };

    // The creation mode is still filtered by the umask; pin new files to exactly
    // 0o600 via the handle (no path re-resolution) before writing anything.
    #[cfg(unix)]
    if created {
        use std::os::unix::fs::PermissionsExt;
        let _ = file.set_permissions(fs::Permissions::from_mode(0o600));
    }

    if file.lock_exclusive().is_err() {
        return;
    }
    let _ = file.write_all(line.as_bytes());
    let _ = file.unlock();
}

/// Read all history records from the given file.
///
/// Returns an empty vector when the file cannot be opened. Blank lines, lines
/// that are not valid UTF-8 and lines that do not deserialize into a
/// [`HistoryRecord`] are skipped.
///
/// If the underlying read itself fails (for example `path` is a directory, or
/// the device reports an error), reading stops and the records gathered so far
/// are returned. Iterating `lines()` and discarding errors would instead spin
/// forever on a reader that keeps failing.
pub fn read_all_records(path: &PathBuf) -> Vec<HistoryRecord> {
    let file = match File::open(path) {
        Ok(f) => f,
        Err(_) => return Vec::new(),
    };

    let mut reader = BufReader::new(file);
    let mut records = Vec::new();
    let mut raw = Vec::new();
    loop {
        raw.clear();
        // Read raw bytes so that an invalid-UTF-8 line is just one more
        // malformed line, while a genuine I/O error ends the loop.
        match reader.read_until(b'\n', &mut raw) {
            Ok(0) | Err(_) => break,
            Ok(_) => {}
        }
        let parsed = std::str::from_utf8(&raw)
            .ok()
            .map(str::trim)
            .filter(|text| !text.is_empty())
            .and_then(|text| serde_json::from_str::<HistoryRecord>(text).ok());
        records.extend(parsed);
    }
    records
}

/// Run retention cleanup on the given history file.
/// Removes the oldest records that are older than 3 months, but only when
/// the total count exceeds 10,000. Both conditions must be true (AND-combined).
/// Holds an exclusive lock for the entire read-modify-write cycle to prevent
/// concurrent data loss.
pub fn run_cleanup(path: &PathBuf) {
    let mut file = match OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(false)
        .open(path)
    {
        Ok(f) => f,
        Err(_) => return,
    };

    if file.lock_exclusive().is_err() {
        return;
    }

    let records: Vec<HistoryRecord> = BufReader::new(&file)
        .lines()
        .filter_map(|line| {
            let line = line.ok()?;
            let trimmed = line.trim();
            if trimmed.is_empty() {
                return None;
            }
            serde_json::from_str(trimmed).ok()
        })
        .collect();

    if records.len() <= MAX_RECORDS {
        let _ = file.unlock();
        return;
    }

    let cutoff = match Utc::now().checked_sub_months(Months::new(RETENTION_MONTHS)) {
        Some(c) => c,
        None => {
            let _ = file.unlock();
            return;
        }
    };

    let mut to_remove = records.len() - MAX_RECORDS;
    let keep: Vec<&HistoryRecord> = records
        .iter()
        .filter(|r| {
            if to_remove == 0 {
                return true;
            }
            if let Ok(ts) = DateTime::parse_from_rfc3339(&r.timestamp) {
                if ts.with_timezone(&Utc) < cutoff {
                    to_remove -= 1;
                    return false;
                }
            }
            true
        })
        .collect();

    if keep.len() < records.len() {
        let _ = file.set_len(0);
        let _ = std::io::Seek::seek(&mut file, std::io::SeekFrom::Start(0));
        for record in &keep {
            if let Ok(line) = serde_json::to_string(record) {
                let _ = writeln!(file, "{}", line);
            }
        }
    }

    let _ = file.unlock();
}

#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Months;
    use tempfile::TempDir;

    /// Build a successful `zilliz cluster list` record carrying `timestamp`.
    fn make_record(timestamp: &str) -> HistoryRecord {
        HistoryRecord {
            command: String::from("zilliz cluster list"),
            user: String::from("test@example.com"),
            timestamp: String::from(timestamp),
            command_type: String::from("resource"),
            resource: Some(String::from("cluster")),
            operation: Some(String::from("list")),
            success: true,
            error_message: None,
        }
    }

    /// Build `count` records, asking `stamp` for a fresh timestamp each time.
    fn records_stamped_by(count: usize, stamp: impl Fn() -> String) -> Vec<HistoryRecord> {
        (0..count).map(|_| make_record(&stamp())).collect()
    }

    /// Replace the contents of `path` with `records`, one JSON object per line.
    fn write_records(path: &PathBuf, records: &[HistoryRecord]) {
        let contents: String = records
            .iter()
            .map(|record| serde_json::to_string(record).unwrap() + "\n")
            .collect();
        fs::write(path, contents).unwrap();
    }

    /// Current local time as an RFC 3339 string.
    fn now_iso() -> String {
        chrono::Local::now().to_rfc3339()
    }

    /// RFC 3339 timestamp `months` calendar months plus one day in the past.
    fn months_ago_iso(months: u32) -> String {
        Utc::now()
            .checked_sub_months(Months::new(months))
            .and_then(|dt| dt.checked_sub_signed(chrono::Duration::days(1)))
            .unwrap()
            .to_rfc3339()
    }

    #[test]
    fn test_under_count_with_old_records_no_pruning() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("user.jsonl");
        write_records(&path, &records_stamped_by(8_000, || months_ago_iso(6)));

        run_cleanup(&path);

        // Below the cap: age alone never triggers pruning.
        assert_eq!(read_all_records(&path).len(), 8_000);
    }

    #[test]
    fn test_over_count_with_no_old_records_no_pruning() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("user.jsonl");
        write_records(&path, &records_stamped_by(12_000, now_iso));

        run_cleanup(&path);

        // Above the cap, but nothing is old enough to be removed.
        assert_eq!(read_all_records(&path).len(), 12_000);
    }

    #[test]
    fn test_over_count_with_old_records_prune_oldest() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("user.jsonl");

        // 5,000 records older than 3 months, followed by 7,000 recent ones.
        let mut records = records_stamped_by(5_000, || months_ago_iso(6));
        records.extend(records_stamped_by(7_000, now_iso));
        write_records(&path, &records);
        assert_eq!(records.len(), 12_000);

        run_cleanup(&path);

        // 12,000 - 10,000 = 2,000 to remove, all taken from the old records.
        assert_eq!(read_all_records(&path).len(), 10_000);
    }

    #[test]
    fn test_exact_boundary_at_cap() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("user.jsonl");
        write_records(&path, &records_stamped_by(10_000, || months_ago_iso(6)));

        run_cleanup(&path);

        // Exactly at the cap: no pruning.
        assert_eq!(read_all_records(&path).len(), 10_000);
    }

    #[test]
    fn test_cross_user_isolation() {
        let dir = TempDir::new().unwrap();
        let alice_path = dir.path().join("alice.jsonl");
        let bob_path = dir.path().join("bob.jsonl");

        write_records(&alice_path, &records_stamped_by(12_000, || months_ago_iso(6)));
        write_records(&bob_path, &records_stamped_by(100, now_iso));

        // Only Alice's file is cleaned up.
        run_cleanup(&alice_path);

        // Alice's file is trimmed back to the cap...
        assert_eq!(read_all_records(&alice_path).len(), 10_000);
        // ...while Bob's file is untouched.
        assert_eq!(read_all_records(&bob_path).len(), 100);
    }
}