convergio-ipc 0.1.2

Message bus, SSE event streaming, agent registry
Documentation
use rusqlite::{params, OptionalExtension};

use crate::types::{AcquireResult, FileLock, IpcResult};
use convergio_db::pool::ConnPool;

pub fn acquire(
    pool: &ConnPool,
    file_path: &str,
    agent: &str,
    host: &str,
    pid: i64,
) -> IpcResult<AcquireResult> {
    let conn = pool.get()?;
    let tx = conn.unchecked_transaction()?;

    let existing: Option<FileLock> = tx
        .query_row(
            "SELECT file_path, locked_by, host, pid, locked_at
             FROM ipc_file_locks WHERE file_path = ?1",
            params![file_path],
            map_lock,
        )
        .optional()?;

    if let Some(lock) = existing {
        if lock.locked_by == agent && lock.host == host {
            tx.execute(
                "UPDATE ipc_file_locks SET pid = ?1, locked_at = strftime('%Y-%m-%dT%H:%M:%f','now')
                 WHERE file_path = ?2",
                params![pid, file_path],
            )?;
            tx.commit()?;
            return Ok(AcquireResult::Acquired);
        }
        tx.commit()?;
        return Ok(AcquireResult::Rejected(lock));
    }

    tx.execute(
        "INSERT INTO ipc_file_locks (file_path, locked_by, host, pid)
         VALUES (?1, ?2, ?3, ?4)",
        params![file_path, agent, host, pid],
    )?;
    tx.commit()?;
    Ok(AcquireResult::Acquired)
}

pub fn release(pool: &ConnPool, file_path: &str, agent: &str, host: &str) -> IpcResult<bool> {
    let conn = pool.get()?;
    let deleted = conn.execute(
        "DELETE FROM ipc_file_locks WHERE file_path = ?1 AND locked_by = ?2 AND host = ?3",
        params![file_path, agent, host],
    )?;
    Ok(deleted > 0)
}

pub fn list_locks(pool: &ConnPool) -> IpcResult<Vec<FileLock>> {
    let conn = pool.get()?;
    let mut stmt = conn.prepare(
        "SELECT file_path, locked_by, host, pid, locked_at
         FROM ipc_file_locks ORDER BY locked_at DESC",
    )?;
    let locks = stmt
        .query_map([], map_lock)?
        .filter_map(|r| r.ok())
        .collect();
    Ok(locks)
}

pub fn prune_dead(pool: &ConnPool, local_host: &str) -> IpcResult<usize> {
    let locks = list_locks(pool)?;
    let conn = pool.get()?;
    let mut pruned = 0;
    for lock in &locks {
        if lock.host != local_host {
            continue;
        }
        if !is_pid_alive(lock.pid) {
            conn.execute(
                "DELETE FROM ipc_file_locks WHERE file_path = ?1 AND locked_by = ?2 AND host = ?3",
                params![lock.file_path, lock.locked_by, lock.host],
            )?;
            pruned += 1;
        }
    }
    Ok(pruned)
}

fn is_pid_alive(pid: i64) -> bool {
    #[cfg(unix)]
    {
        unsafe { libc::kill(pid as i32, 0) == 0 }
    }
    #[cfg(not(unix))]
    {
        let _ = pid;
        true
    }
}

fn map_lock(row: &rusqlite::Row<'_>) -> rusqlite::Result<FileLock> {
    Ok(FileLock {
        file_path: row.get(0)?,
        locked_by: row.get(1)?,
        host: row.get(2)?,
        pid: row.get(3)?,
        locked_at: row.get(4)?,
    })
}

#[cfg(test)]
mod tests {
    use super::*;

    fn pool() -> ConnPool {
        let p = convergio_db::pool::create_memory_pool().unwrap();
        let conn = p.get().unwrap();
        convergio_db::migration::ensure_registry(&conn).unwrap();
        convergio_db::migration::apply_migrations(&conn, "ipc", &crate::schema::migrations())
            .unwrap();
        p
    }

    #[test]
    fn acquire_and_release() {
        let p = pool();
        let r = acquire(&p, "src/main.rs", "elena", "m5max", 1234).unwrap();
        assert_eq!(r, AcquireResult::Acquired);
        let locks = list_locks(&p).unwrap();
        assert_eq!(locks.len(), 1);
        assert!(release(&p, "src/main.rs", "elena", "m5max").unwrap());
        assert_eq!(list_locks(&p).unwrap().len(), 0);
    }

    #[test]
    fn reacquire_by_same_agent() {
        let p = pool();
        acquire(&p, "f.rs", "elena", "m5max", 100).unwrap();
        let r = acquire(&p, "f.rs", "elena", "m5max", 200).unwrap();
        assert_eq!(r, AcquireResult::Acquired);
    }

    #[test]
    fn rejected_by_different_agent() {
        let p = pool();
        acquire(&p, "f.rs", "elena", "m5max", 100).unwrap();
        let r = acquire(&p, "f.rs", "baccio", "m1pro", 200).unwrap();
        assert!(matches!(r, AcquireResult::Rejected(_)));
    }
}