iridium-db 0.4.0

A high-performance vector-graph hybrid storage and indexing engine
use std::path::{Path, PathBuf};
use std::time::{Duration, SystemTime};

pub trait Reactor {
    fn now(&self) -> SystemTime;
    fn read_file(&self, path: &Path) -> std::io::Result<Vec<u8>>;
    fn write_file(&self, path: &Path, data: &[u8]) -> std::io::Result<()>;
    fn append_file(&self, path: &Path, data: &[u8]) -> std::io::Result<u64>;
    fn read_dir(&self, path: &Path) -> std::io::Result<Vec<PathBuf>>;
    fn create_dir_all(&self, path: &Path) -> std::io::Result<()>;
    fn metadata_len(&self, path: &Path) -> std::io::Result<u64>;
    fn sleep(&self, duration: Duration);
    fn random_u64(&self) -> u64;
}

pub struct SystemReactor;

impl Reactor for SystemReactor {
    fn now(&self) -> SystemTime {
        SystemTime::now()
    }

    fn read_file(&self, path: &Path) -> std::io::Result<Vec<u8>> {
        std::fs::read(path)
    }

    fn write_file(&self, path: &Path, data: &[u8]) -> std::io::Result<()> {
        std::fs::write(path, data)
    }

    fn append_file(&self, path: &Path, data: &[u8]) -> std::io::Result<u64> {
        use std::io::Write;
        let mut file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(path)?;
        file.write_all(data)?;
        file.flush()?;
        Ok(data.len() as u64)
    }

    fn read_dir(&self, path: &Path) -> std::io::Result<Vec<PathBuf>> {
        let mut entries = Vec::new();
        for entry in std::fs::read_dir(path)? {
            entries.push(entry?.path());
        }
        Ok(entries)
    }

    fn create_dir_all(&self, path: &Path) -> std::io::Result<()> {
        std::fs::create_dir_all(path)
    }

    fn metadata_len(&self, path: &Path) -> std::io::Result<u64> {
        Ok(std::fs::metadata(path)?.len())
    }

    fn sleep(&self, duration: Duration) {
        std::thread::sleep(duration);
    }

    fn random_u64(&self) -> u64 {
        let nanos = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap_or_default()
            .as_nanos();
        nanos as u64
    }
}

pub struct DeterministicReactor {
    now: std::sync::Mutex<SystemTime>,
    random_seed: std::sync::atomic::AtomicU64,
}

impl DeterministicReactor {
    pub fn new(now: SystemTime, seed: u64) -> Self {
        Self {
            now: std::sync::Mutex::new(now),
            random_seed: std::sync::atomic::AtomicU64::new(seed),
        }
    }
}

impl Default for DeterministicReactor {
    fn default() -> Self {
        Self::new(SystemTime::UNIX_EPOCH, 0)
    }
}

impl Reactor for DeterministicReactor {
    fn now(&self) -> SystemTime {
        *self.now.lock().expect("reactor time lock")
    }

    fn read_file(&self, path: &Path) -> std::io::Result<Vec<u8>> {
        std::fs::read(path)
    }

    fn write_file(&self, path: &Path, data: &[u8]) -> std::io::Result<()> {
        std::fs::write(path, data)
    }

    fn append_file(&self, path: &Path, data: &[u8]) -> std::io::Result<u64> {
        use std::io::Write;
        let mut file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(path)?;
        file.write_all(data)?;
        file.flush()?;
        Ok(data.len() as u64)
    }

    fn read_dir(&self, path: &Path) -> std::io::Result<Vec<PathBuf>> {
        let mut entries = Vec::new();
        for entry in std::fs::read_dir(path)? {
            entries.push(entry?.path());
        }
        Ok(entries)
    }

    fn create_dir_all(&self, path: &Path) -> std::io::Result<()> {
        std::fs::create_dir_all(path)
    }

    fn metadata_len(&self, path: &Path) -> std::io::Result<u64> {
        Ok(std::fs::metadata(path)?.len())
    }

    fn sleep(&self, duration: Duration) {
        let mut now = self.now.lock().expect("reactor time lock");
        *now += duration;
    }

    fn random_u64(&self) -> u64 {
        use std::sync::atomic::Ordering;
        let mut current = self.random_seed.load(Ordering::SeqCst);
        loop {
            let next = current.wrapping_mul(6364136223846793005).wrapping_add(1);
            match self.random_seed.compare_exchange(
                current,
                next,
                Ordering::SeqCst,
                Ordering::SeqCst,
            ) {
                Ok(_) => return next,
                Err(updated) => current = updated,
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn deterministic_reactor_advances_time() {
        let start = SystemTime::UNIX_EPOCH + Duration::from_secs(10);
        let reactor = DeterministicReactor::new(start, 1);
        assert_eq!(reactor.now(), start);
        reactor.sleep(Duration::from_secs(5));
        assert_eq!(reactor.now(), start + Duration::from_secs(5));
    }

    #[test]
    fn deterministic_reactor_random_is_repeatable() {
        let reactor = DeterministicReactor::new(SystemTime::UNIX_EPOCH, 42);
        let first = reactor.random_u64();
        let second = reactor.random_u64();
        let reactor2 = DeterministicReactor::new(SystemTime::UNIX_EPOCH, 42);
        assert_eq!(first, reactor2.random_u64());
        assert_eq!(second, reactor2.random_u64());
    }
}