buf-toolchain 1.70.0

Installer for official Buf CLI binaries pinned by crate semver.
//! Cache-slot lock helpers for coordinating concurrent build.rs writers.

use std::fs::{self, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::thread::sleep;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

const LOCK_FILENAME: &str = ".cache-slot.lock";
const LOCK_WAIT_STEP: Duration = Duration::from_millis(250);
const LOCK_WAIT_TIMEOUT: Duration = Duration::from_secs(120);
const LOCK_STALE_AFTER: Duration = Duration::from_secs(600);

pub enum SlotLockState {
    Acquired(SlotLockGuard),
    WaitedForOtherWriter,
}

pub struct SlotLockGuard {
    path: PathBuf,
}

impl Drop for SlotLockGuard {
    fn drop(&mut self) {
        fs::remove_file(&self.path).ok();
    }
}

pub fn acquire_or_wait_for_slot(
    slot: &Path,
    warn: &mut dyn FnMut(String),
) -> Result<SlotLockState, String> {
    let lock_path = slot.join(LOCK_FILENAME);
    match try_acquire(&lock_path) {
        Ok(Some(guard)) => Ok(SlotLockState::Acquired(guard)),
        Ok(None) => {
            warn(format!(
                "buf-tools: cache slot lock exists at {}. Waiting for peer writer.",
                lock_path.display()
            ));
            wait_for_unlock(&lock_path, warn)?;
            Ok(SlotLockState::WaitedForOtherWriter)
        }
        Err(err) => Err(err),
    }
}

fn try_acquire(lock_path: &Path) -> Result<Option<SlotLockGuard>, String> {
    let mut lock = match OpenOptions::new()
        .write(true)
        .create_new(true)
        .open(lock_path)
    {
        Ok(file) => file,
        Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => return Ok(None),
        Err(err) => return Err(format!("create lock {}: {err}", lock_path.display())),
    };
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_err(|e| format!("system clock before unix epoch: {e}"))?
        .as_secs();
    let pid = std::process::id();
    writeln!(lock, "pid={pid} unix_ts={now}")
        .map_err(|e| format!("write lock {}: {e}", lock_path.display()))?;
    Ok(Some(SlotLockGuard {
        path: lock_path.to_path_buf(),
    }))
}

fn wait_for_unlock(lock_path: &Path, warn: &mut dyn FnMut(String)) -> Result<(), String> {
    let mut waited = Duration::ZERO;
    while lock_path.exists() {
        if is_stale(lock_path)? {
            warn(format!(
                "buf-tools: cache slot lock looks stale at {}. Removing stale lock.",
                lock_path.display()
            ));
            fs::remove_file(lock_path)
                .map_err(|e| format!("remove stale lock {}: {e}", lock_path.display()))?;
            return Ok(());
        }
        if waited >= LOCK_WAIT_TIMEOUT {
            return Err(format!(
                "buf-tools: timed out waiting for cache slot lock {} after {}s",
                lock_path.display(),
                LOCK_WAIT_TIMEOUT.as_secs()
            ));
        }
        sleep(LOCK_WAIT_STEP);
        waited += LOCK_WAIT_STEP;
    }
    warn(format!(
        "buf-tools: peer writer released cache slot lock {} after {}ms",
        lock_path.display(),
        waited.as_millis()
    ));
    Ok(())
}

fn is_stale(lock_path: &Path) -> Result<bool, String> {
    if let Some(owner_pid) = read_lock_pid(lock_path)?
        && !pid_is_alive(owner_pid)
    {
        return Ok(true);
    }
    let meta = fs::metadata(lock_path).map_err(|e| format!("stat {}: {e}", lock_path.display()))?;
    let modified = meta
        .modified()
        .map_err(|e| format!("mtime {}: {e}", lock_path.display()))?;
    let age = SystemTime::now()
        .duration_since(modified)
        .map_err(|e| format!("mtime in future for {}: {e}", lock_path.display()))?;
    Ok(age >= LOCK_STALE_AFTER)
}

fn read_lock_pid(lock_path: &Path) -> Result<Option<u32>, String> {
    let content =
        fs::read_to_string(lock_path).map_err(|e| format!("read {}: {e}", lock_path.display()))?;
    let Some(pid_part) = content
        .split_whitespace()
        .find(|part| part.starts_with("pid="))
    else {
        return Ok(None);
    };
    let pid = pid_part["pid=".len()..].parse::<u32>().ok();
    Ok(pid)
}

#[cfg(unix)]
fn pid_is_alive(pid: u32) -> bool {
    PathBuf::from("/proc").join(pid.to_string()).exists()
}

#[cfg(not(unix))]
fn pid_is_alive(_pid: u32) -> bool {
    true
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};
    use std::thread;

    fn test_dir(name: &str) -> PathBuf {
        let unique = format!(
            "buf-tools-lock-test-{}-{}",
            name,
            SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .expect("clock")
                .as_nanos()
        );
        let dir = std::env::temp_dir().join(unique);
        fs::create_dir_all(&dir).expect("mkdir");
        dir
    }

    #[test]
    fn acquires_and_releases_lock() {
        let slot = test_dir("acquire-release");
        let mut warn = |_msg: String| {};
        let state = acquire_or_wait_for_slot(&slot, &mut warn).expect("acquire");
        let guard = match state {
            SlotLockState::Acquired(guard) => guard,
            SlotLockState::WaitedForOtherWriter => panic!("unexpected wait"),
        };
        assert!(slot.join(LOCK_FILENAME).exists());
        drop(guard);
        assert!(!slot.join(LOCK_FILENAME).exists());
    }

    #[test]
    fn waits_for_peer_then_returns_waited_state() {
        let slot = test_dir("wait-peer");
        let lock_path = slot.join(LOCK_FILENAME);
        let mut lock = OpenOptions::new()
            .create_new(true)
            .write(true)
            .open(&lock_path)
            .expect("create peer lock");
        writeln!(lock, "peer lock").expect("write peer lock");
        drop(lock);

        let logs: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
        let logs_for_thread = Arc::clone(&logs);
        let slot_for_thread = slot.clone();
        let handle = thread::spawn(move || {
            let mut warn = |msg: String| logs_for_thread.lock().expect("lock logs").push(msg);
            acquire_or_wait_for_slot(&slot_for_thread, &mut warn)
        });

        sleep(Duration::from_millis(300));
        fs::remove_file(&lock_path).expect("release peer lock");

        let state = handle.join().expect("thread join").expect("wait state");
        assert!(matches!(state, SlotLockState::WaitedForOtherWriter));
        let joined = logs.lock().expect("lock logs").join("\n");
        assert!(
            joined.contains("Waiting for peer writer")
                && joined.contains("released cache slot lock"),
            "logs were: {joined}"
        );
    }
}