codebase-graph 1.1.6

Native codebaseGraph CLI and MCP server for local code knowledge graphs.
use crate::error::NativeError;
use lbug::{Connection, Database, SystemConfig};
use std::{
    fs::{self, OpenOptions},
    io::Write,
    path::{Path, PathBuf},
    thread,
    time::{Duration, SystemTime},
};

/// File name of the write-intent marker; it is placed in the directory that
/// contains the database path (falling back to `.` when there is no parent).
const WRITE_INTENT_FILE: &str = "graph-write-intent.lock";
/// Minimum age, measured from the marker file's modification time, at which an
/// existing write-intent file is treated as stale and deleted.
const WRITE_INTENT_STALE_AFTER: Duration = Duration::from_secs(60);

/// Backoff settings consumed by `retry_transient_database`.
#[derive(Clone, Copy, Debug)]
pub struct RetryPolicy {
    /// Upper bound on how many times the operation is invoked. The retry loop
    /// treats values below one as one.
    max_attempts: usize,
    /// Pause taken after the first transient failure; doubled after each
    /// subsequent one.
    initial_delay: Duration,
    /// Ceiling applied to the delay each time it is doubled.
    max_delay: Duration,
}

impl RetryPolicy {
    /// Builds a policy from its three settings.
    ///
    /// Usable in `const` context, which is how the module-level policies are
    /// defined.
    pub const fn new(max_attempts: usize, initial_delay: Duration, max_delay: Duration) -> Self {
        RetryPolicy {
            max_attempts,
            initial_delay,
            max_delay,
        }
    }
}

/// Retry policy for read paths: up to 3 attempts, starting at a 40 ms delay and
/// capped at 160 ms. `open_ladybug_database` uses it while waiting for a
/// write-intent file to disappear before a read-only open.
pub const READ_RETRY_POLICY: RetryPolicy =
    RetryPolicy::new(3, Duration::from_millis(40), Duration::from_millis(160));
/// Retry policy for write paths: up to 8 attempts, starting at a 100 ms delay
/// and capped at 1 s. `acquire_write_intent` uses it while trying to create the
/// write-intent file.
pub const WRITE_RETRY_POLICY: RetryPolicy =
    RetryPolicy::new(8, Duration::from_millis(100), Duration::from_millis(1_000));

/// Guard for a write-intent file: when the guard is dropped, the file at
/// `path` is deleted.
pub struct WriteIntentGuard {
    /// Location of the write-intent file removed on drop.
    path: PathBuf,
}

impl Drop for WriteIntentGuard {
    /// Deletes the write-intent file on a best-effort basis.
    fn drop(&mut self) {
        // A failed removal (for example, the file is already gone) is
        // deliberately ignored: `drop` has no way to report it.
        match fs::remove_file(&self.path) {
            Ok(()) | Err(_) => {}
        }
    }
}

/// Reports whether an error message looks like a transient database condition
/// that is worth retrying.
///
/// The check is an ASCII case-insensitive substring search against a fixed
/// list of known lock/busy phrases.
pub fn is_transient_database_error(message: &str) -> bool {
    // Lower-case phrases; the input is lower-cased before comparison.
    const TRANSIENT_MARKERS: [&str; 7] = [
        "could not set lock",
        "lock is held",
        "database is locked",
        "database busy",
        "resource busy",
        "couldn't replay shadow pages",
        "read-only mode",
    ];

    let lowered = message.to_ascii_lowercase();
    for marker in TRANSIENT_MARKERS.iter() {
        if lowered.contains(marker) {
            return true;
        }
    }
    false
}

/// Runs `operation`, retrying with exponential backoff while it fails with a
/// transient database error.
///
/// The operation is invoked at most `policy.max_attempts` times (at least
/// once, even when that setting is zero). After a failure whose message
/// satisfies `is_transient_database_error`, the current thread sleeps for the
/// current delay, and the delay is then doubled and capped at
/// `policy.max_delay`.
///
/// # Errors
///
/// Returns the operation's error unchanged when it is not transient, or when
/// the final allowed attempt fails.
pub fn retry_transient_database<T>(
    policy: RetryPolicy,
    mut operation: impl FnMut() -> Result<T, NativeError>,
) -> Result<T, NativeError> {
    let attempts_allowed = policy.max_attempts.max(1);
    let mut backoff = policy.initial_delay;
    let mut attempts_made: usize = 0;

    loop {
        let failure = match operation() {
            Ok(value) => return Ok(value),
            Err(failure) => failure,
        };
        attempts_made += 1;

        // Give up once the budget is spent; otherwise only keep going for
        // errors that look transient.
        let budget_spent = attempts_made >= attempts_allowed;
        if budget_spent || !is_transient_database_error(&failure.to_string()) {
            return Err(failure);
        }

        thread::sleep(backoff);
        backoff = backoff.saturating_mul(2).min(policy.max_delay);
    }
}

pub fn open_ladybug_database(db_path: &Path, read_only: bool) -> Result<Database, NativeError> {
    if read_only {
        wait_for_write_intent(db_path, READ_RETRY_POLICY)?;
    }
    Database::new(db_path, SystemConfig::default().read_only(read_only)).map_err(|error| {
        NativeError::Database(format!(
            "failed to open graph database {}: {error}",
            db_path.display()
        ))
    })
}

pub fn connect_ladybug_database(database: &Database) -> Result<Connection<'_>, NativeError> {
    Connection::new(database)
        .map_err(|error| NativeError::Database(format!("failed to connect to graph: {error}")))
}

/// Claims the write-intent file that sits next to `db_path` and returns a
/// guard that deletes it when dropped.
///
/// The parent directory of the intent file is created if needed. Each attempt
/// first clears a stale intent file, then tries to create the file
/// exclusively (`create_new`) and records the current process id and a
/// millisecond timestamp in it. When the file already exists the attempt
/// fails with a "graph database busy" error, which the retry loop treats as
/// transient under `WRITE_RETRY_POLICY`.
///
/// # Errors
///
/// Returns `NativeError::Database` when the intent file is still present after
/// the last attempt, or an I/O error from directory creation, stale-file
/// cleanup, or creating/writing the intent file.
pub fn acquire_write_intent(db_path: &Path) -> Result<WriteIntentGuard, NativeError> {
    let path = write_intent_path(db_path);
    if let Some(parent) = path.parent() {
        fs::create_dir_all(parent)?;
    }
    retry_transient_database(WRITE_RETRY_POLICY, || {
        remove_stale_write_intent(&path)?;
        match OpenOptions::new().write(true).create_new(true).open(&path) {
            Ok(mut file) => {
                // The file now exists on disk, so take ownership of it right
                // away: if recording the owner fails below, dropping the guard
                // removes the file instead of leaving an unowned lock behind.
                let guard = WriteIntentGuard { path: path.clone() };
                let written = writeln!(file, "pid={} unix_ms={}", std::process::id(), unix_ms());
                // Close the handle before the guard can try to delete the file.
                drop(file);
                written?;
                Ok(guard)
            }
            Err(error) if error.kind() == std::io::ErrorKind::AlreadyExists => {
                Err(NativeError::Database(format!(
                    "graph database busy: write intent exists at {}",
                    path.display()
                )))
            }
            Err(error) => Err(NativeError::Io(error)),
        }
    })
}

/// Waits, under `policy`, until no write-intent file exists next to `db_path`.
///
/// Each attempt first clears a stale intent file and then checks whether the
/// file is still present.
///
/// # Errors
///
/// Returns a "graph database busy" `NativeError::Database` when the intent
/// file is still present on the last attempt, or the error from stale-file
/// cleanup.
fn wait_for_write_intent(db_path: &Path, policy: RetryPolicy) -> Result<(), NativeError> {
    let intent_file = write_intent_path(db_path);

    let probe = || -> Result<(), NativeError> {
        remove_stale_write_intent(&intent_file)?;
        if !intent_file.exists() {
            return Ok(());
        }
        Err(NativeError::Database(format!(
            "graph database busy: refresh/write in progress at {}",
            intent_file.display()
        )))
    };

    retry_transient_database(policy, probe)
}

fn remove_stale_write_intent(path: &Path) -> Result<(), NativeError> {
    let Ok(metadata) = fs::metadata(path) else {
        return Ok(());
    };
    let Ok(modified) = metadata.modified() else {
        return Ok(());
    };
    let Ok(age) = SystemTime::now().duration_since(modified) else {
        return Ok(());
    };
    if age >= WRITE_INTENT_STALE_AFTER {
        fs::remove_file(path)?;
    }
    Ok(())
}

/// Computes where the write-intent file for `db_path` lives: a file named
/// `WRITE_INTENT_FILE` in the database path's parent directory, or in `.` when
/// the path has no parent.
fn write_intent_path(db_path: &Path) -> PathBuf {
    let directory = match db_path.parent() {
        Some(parent) => parent,
        None => Path::new("."),
    };
    directory.join(WRITE_INTENT_FILE)
}

/// Returns the current system time as milliseconds since the Unix epoch, or
/// `0` when the system clock reads earlier than the epoch.
fn unix_ms() -> u128 {
    match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    }
}