libnoa 0.1.1

AI-native distributed version control system with per-agent workspace isolation, JSONL append-only logs, snapshot-based history, and full git protocol compatibility
Documentation
use async_trait::async_trait;
use std::{
    fs::{File, OpenOptions},
    io::{Read, Seek, SeekFrom, Write},
    path::Path,
    sync::Mutex,
};

use super::{format, AgentLog, LogEntry};
use crate::error::{NoaError, Result};

pub struct FileAgentLog {
    file: Mutex<File>,
    next_seq: std::sync::atomic::AtomicU64,
}

impl FileAgentLog {
    pub fn create(path: &Path) -> Result<Self> {
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        let file = OpenOptions::new()
            .create(true)
            .append(true)
            .read(true)
            .open(path)
            .map_err(NoaError::Io)?;
        let log = FileAgentLog {
            file: Mutex::new(file),
            next_seq: std::sync::atomic::AtomicU64::new(1),
        };
        Ok(log)
    }

    pub fn open(path: &Path) -> Result<Self> {
        if !path.exists() {
            return Err(NoaError::Io(std::io::Error::new(
                std::io::ErrorKind::NotFound,
                format!("log file not found: {}", path.display()),
            )));
        }
        let file = OpenOptions::new()
            .append(true)
            .read(true)
            .open(path)
            .map_err(NoaError::Io)?;
        let log = FileAgentLog {
            file: Mutex::new(file),
            next_seq: std::sync::atomic::AtomicU64::new(1),
        };
        let max_seq = log.compute_max_seq()?;
        log.next_seq
            .store(max_seq + 1, std::sync::atomic::Ordering::SeqCst);
        Ok(log)
    }

    fn compute_max_seq(&self) -> Result<u64> {
        let mut file = self
            .file
            .lock()
            .map_err(|e| NoaError::Io(std::io::Error::other(e.to_string())))?;
        file.seek(SeekFrom::Start(0))?;
        let mut content = String::new();
        file.read_to_string(&mut content)?;
        let entries = format::deserialize_entries(&content)?;
        Ok(entries.iter().map(|e| e.seq).max().unwrap_or(0))
    }
}

#[async_trait]
impl AgentLog for FileAgentLog {
    async fn append(&self, entry: &LogEntry) -> Result<u64> {
        let seq = self
            .next_seq
            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        let mut assigned_entry = entry.clone();
        assigned_entry.seq = seq;
        let line = format::serialize_entry(&assigned_entry)?;
        let mut file = self
            .file
            .lock()
            .map_err(|e| NoaError::Io(std::io::Error::other(e.to_string())))?;
        writeln!(file, "{}", line)?;
        file.sync_all()?;
        Ok(seq)
    }

    async fn read_since(&self, seq: u64) -> Result<Vec<LogEntry>> {
        let entries = self.read_all().await?;
        Ok(entries.into_iter().filter(|e| e.seq > seq).collect())
    }

    async fn read_all(&self) -> Result<Vec<LogEntry>> {
        let mut file = self
            .file
            .lock()
            .map_err(|e| NoaError::Io(std::io::Error::other(e.to_string())))?;
        file.seek(SeekFrom::Start(0))?;
        let mut content = String::new();
        file.read_to_string(&mut content)?;
        format::deserialize_entries(&content)
    }

    async fn next_seq(&self) -> Result<u64> {
        Ok(self.next_seq.load(std::sync::atomic::Ordering::SeqCst))
    }

    async fn compact_to(&self, up_to_seq: u64) -> Result<()> {
        let entries = self.read_all().await?;
        let remaining: Vec<LogEntry> = entries.into_iter().filter(|e| e.seq > up_to_seq).collect();

        let mut file = self
            .file
            .lock()
            .map_err(|e| NoaError::Io(std::io::Error::other(e.to_string())))?;

        file.seek(SeekFrom::Start(0))?;
        file.set_len(0)?;

        for entry in &remaining {
            let line = format::serialize_entry(entry)?;
            writeln!(file, "{}", line)?;
        }
        file.sync_all()?;

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::log::OpType;
    use tempfile::TempDir;

    fn make_entry(seq: u64, op: OpType, path: &str, ts: u64) -> LogEntry {
        LogEntry {
            seq,
            op,
            path: Some(path.to_string()),
            blob_id: None,
            from_path: None,
            resolved_conflict_ours_id: None,
            resolved_conflict_theirs_id: None,
            snapshot_id: None,
            ts,
            message: None,
        }
    }

    #[tokio::test]
    async fn test_append_and_read() {
        let tmp = TempDir::new().unwrap();
        let log_path = tmp.path().join("test.log");
        let log = FileAgentLog::create(&log_path).unwrap();

        let e1 = make_entry(1, OpType::Write, "a.rs", 100);
        let e2 = make_entry(2, OpType::Delete, "b.rs", 200);

        log.append(&e1).await.unwrap();
        log.append(&e2).await.unwrap();

        let entries = log.read_all().await.unwrap();
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0], e1);
        assert_eq!(entries[1], e2);
    }

    #[tokio::test]
    async fn test_read_since() {
        let tmp = TempDir::new().unwrap();
        let log_path = tmp.path().join("test.log");
        let log = FileAgentLog::create(&log_path).unwrap();

        for i in 1..=5 {
            log.append(&make_entry(
                i,
                OpType::Write,
                &format!("f{}.rs", i),
                i * 100,
            ))
            .await
            .unwrap();
        }

        let entries = log.read_since(2).await.unwrap();
        assert_eq!(entries.len(), 3);
        assert_eq!(entries[0].seq, 3);
        assert_eq!(entries[2].seq, 5);
    }

    #[tokio::test]
    async fn test_concurrent_appends() {
        use std::sync::Arc;

        let tmp = TempDir::new().unwrap();
        let log_path = tmp.path().join("concurrent.log");
        let log = Arc::new(FileAgentLog::create(&log_path).unwrap());

        let mut handles = Vec::new();
        for thread_id in 0..10 {
            let log = Arc::clone(&log);
            handles.push(tokio::spawn(async move {
                for i in 0..10 {
                    let seq = thread_id * 10 + i + 1;
                    let entry = make_entry(
                        seq,
                        OpType::Write,
                        &format!("t{}-{}.rs", thread_id, i),
                        seq * 100,
                    );
                    log.append(&entry).await.unwrap();
                }
            }));
        }

        for h in handles {
            h.await.unwrap();
        }

        let entries = log.read_all().await.unwrap();
        assert_eq!(entries.len(), 100);
    }

    #[tokio::test]
    async fn test_open_existing() {
        let tmp = TempDir::new().unwrap();
        let log_path = tmp.path().join("existing.log");
        let log = FileAgentLog::create(&log_path).unwrap();
        log.append(&make_entry(1, OpType::Write, "x.rs", 100))
            .await
            .unwrap();
        drop(log);

        let log2 = FileAgentLog::open(&log_path).unwrap();
        let entries = log2.read_all().await.unwrap();
        assert_eq!(entries.len(), 1);
    }

    #[tokio::test]
    async fn test_open_missing_fails() {
        let tmp = TempDir::new().unwrap();
        let result = FileAgentLog::open(&tmp.path().join("missing.log"));
        assert!(result.is_err());
    }
}