ix-daemon 0.1.0

Global per-user daemon for Ixchel IPC, sync queueing, and single-writer enforcement
Documentation
use crate::{SyncState, SyncStats};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{RwLock, broadcast};

#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct QueueKey {
    pub repo_root: String,
    pub tool: String,
}

impl QueueKey {
    pub fn new(repo_root: impl Into<String>, tool: impl Into<String>) -> Self {
        Self {
            repo_root: repo_root.into(),
            tool: tool.into(),
        }
    }
}

#[derive(Debug, Clone)]
pub struct SyncJob {
    pub id: String,
    pub key: QueueKey,
    pub directory: String,
    pub force: bool,
    pub state: SyncState,
    pub queued_at: Instant,
    pub started_at: Option<Instant>,
    pub completed_at: Option<Instant>,
    pub stats: Option<SyncStats>,
    pub error: Option<String>,
    state_tx: broadcast::Sender<SyncState>,
}

impl SyncJob {
    pub fn new(key: QueueKey, directory: String, force: bool) -> Self {
        let (state_tx, _) = broadcast::channel(16);
        Self {
            id: uuid::Uuid::new_v4().to_string(),
            key,
            directory,
            force,
            state: SyncState::Queued,
            queued_at: Instant::now(),
            started_at: None,
            completed_at: None,
            stats: None,
            error: None,
            state_tx,
        }
    }

    pub fn subscribe(&self) -> broadcast::Receiver<SyncState> {
        self.state_tx.subscribe()
    }

    fn set_state(&mut self, new_state: SyncState) {
        self.state = new_state;
        let _ = self.state_tx.send(new_state);
    }

    pub fn start(&mut self) {
        self.started_at = Some(Instant::now());
        self.set_state(SyncState::Running);
    }

    pub fn complete(&mut self, job_stats: SyncStats) {
        self.completed_at = Some(Instant::now());
        self.stats = Some(job_stats);
        self.set_state(SyncState::Done);
    }

    pub fn fail(&mut self, error: String) {
        self.completed_at = Some(Instant::now());
        self.error = Some(error);
        self.set_state(SyncState::Error);
    }

    #[allow(clippy::cast_possible_truncation)]
    pub fn queued_at_ms(&self) -> u64 {
        self.queued_at.elapsed().as_millis() as u64
    }
}

pub struct SyncQueue {
    jobs: Arc<RwLock<HashMap<String, SyncJob>>>,
    pending: Arc<RwLock<HashMap<QueueKey, String>>>,
}

impl SyncQueue {
    pub fn new() -> Self {
        Self {
            jobs: Arc::new(RwLock::new(HashMap::new())),
            pending: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    pub async fn enqueue(
        &self,
        repo_root: &str,
        tool: &str,
        directory: &str,
        force: bool,
    ) -> (String, bool) {
        let key = QueueKey::new(repo_root, tool);

        {
            let pending = self.pending.read().await;
            if let Some(existing_id) = pending.get(&key) {
                let jobs = self.jobs.read().await;
                if let Some(job) = jobs.get(existing_id)
                    && job.state == SyncState::Queued
                    && !force
                {
                    return (existing_id.clone(), false);
                }
            }
        }

        let job = SyncJob::new(key.clone(), directory.to_string(), force);
        let id = job.id.clone();

        self.jobs.write().await.insert(id.clone(), job);
        self.pending.write().await.insert(key, id.clone());

        (id, true)
    }

    pub async fn get(&self, id: &str) -> Option<SyncJob> {
        self.jobs.read().await.get(id).cloned()
    }

    pub async fn get_pending(&self, key: &QueueKey) -> Option<SyncJob> {
        let id = self.pending.read().await.get(key).cloned()?;
        self.jobs.read().await.get(&id).cloned()
    }

    #[allow(clippy::significant_drop_tightening)]
    pub async fn start(&self, id: &str) -> bool {
        let mut jobs = self.jobs.write().await;
        if let Some(job) = jobs.get_mut(id)
            && job.state == SyncState::Queued
        {
            job.start();
            return true;
        }
        false
    }

    pub async fn complete(&self, id: &str, job_stats: SyncStats) {
        let key = {
            let mut jobs = self.jobs.write().await;
            jobs.get_mut(id).map(|job| {
                job.complete(job_stats);
                job.key.clone()
            })
        };

        if let Some(key) = key {
            let mut pending = self.pending.write().await;
            if pending.get(&key).is_some_and(|pid| pid == id) {
                pending.remove(&key);
            }
        }
    }

    pub async fn fail(&self, id: &str, error: String) {
        let key = {
            let mut jobs = self.jobs.write().await;
            jobs.get_mut(id).map(|job| {
                job.fail(error);
                job.key.clone()
            })
        };

        if let Some(key) = key {
            let mut pending = self.pending.write().await;
            if pending.get(&key).is_some_and(|pid| pid == id) {
                pending.remove(&key);
            }
        }
    }

    #[allow(clippy::significant_drop_tightening)]
    pub async fn wait(&self, id: &str, timeout: Duration) -> Option<SyncState> {
        let mut rx = {
            let jobs = self.jobs.read().await;
            let job = jobs.get(id)?;

            if job.state == SyncState::Done || job.state == SyncState::Error {
                return Some(job.state);
            }

            job.subscribe()
        };

        let deadline = Instant::now() + timeout;

        loop {
            let remaining = deadline.saturating_duration_since(Instant::now());
            if remaining.is_zero() {
                return None;
            }

            match tokio::time::timeout(remaining, rx.recv()).await {
                Ok(Ok(new_state)) => {
                    if new_state == SyncState::Done || new_state == SyncState::Error {
                        return Some(new_state);
                    }
                }
                Ok(Err(_)) | Err(_) => return None,
            }
        }
    }

    pub async fn list_queues(&self) -> Vec<crate::QueueInfo> {
        let pending = self.pending.read().await;
        let jobs = self.jobs.read().await;

        pending
            .iter()
            .filter_map(|(key, id)| {
                let job = jobs.get(id)?;
                let active = (job.state == SyncState::Running).then(|| id.clone());
                let pending_count = u32::from(job.state == SyncState::Queued);
                Some(crate::QueueInfo {
                    repo_root: key.repo_root.clone(),
                    tool: key.tool.clone(),
                    pending: pending_count,
                    active,
                })
            })
            .collect()
    }

    pub async fn cleanup_old(&self, max_age: Duration) {
        let now = Instant::now();
        let mut jobs = self.jobs.write().await;

        jobs.retain(|_, job| {
            job.completed_at
                .is_none_or(|completed_at| now.duration_since(completed_at) < max_age)
        });
    }
}

impl Default for SyncQueue {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_enqueue_new_job() {
        let queue = SyncQueue::new();
        let (id, is_new) = queue
            .enqueue("/repo", "decisions", ".ixchel/decisions", false)
            .await;

        assert!(is_new);
        assert!(!id.is_empty());

        let job = queue.get(&id).await.unwrap();
        assert_eq!(job.state, SyncState::Queued);
        assert_eq!(job.key.repo_root, "/repo");
        assert_eq!(job.key.tool, "decisions");
    }

    #[tokio::test]
    async fn test_enqueue_coalesces_duplicate() {
        let queue = SyncQueue::new();
        let (id1, is_new1) = queue
            .enqueue("/repo", "decisions", ".ixchel/decisions", false)
            .await;
        let (id2, is_new2) = queue
            .enqueue("/repo", "decisions", ".ixchel/decisions", false)
            .await;

        assert!(is_new1);
        assert!(!is_new2);
        assert_eq!(id1, id2);
    }

    #[tokio::test]
    async fn test_enqueue_force_creates_new() {
        let queue = SyncQueue::new();
        let (id1, _) = queue
            .enqueue("/repo", "decisions", ".ixchel/decisions", false)
            .await;
        let (id2, is_new2) = queue
            .enqueue("/repo", "decisions", ".ixchel/decisions", true)
            .await;

        assert!(is_new2);
        assert_ne!(id1, id2);
    }

    #[tokio::test]
    async fn test_different_repos_separate_queues() {
        let queue = SyncQueue::new();
        let (id1, is_new1) = queue
            .enqueue("/repo1", "decisions", ".ixchel/decisions", false)
            .await;
        let (id2, is_new2) = queue
            .enqueue("/repo2", "decisions", ".ixchel/decisions", false)
            .await;

        assert!(is_new1);
        assert!(is_new2);
        assert_ne!(id1, id2);
    }

    #[tokio::test]
    async fn test_job_lifecycle() {
        let queue = SyncQueue::new();
        let (id, _) = queue
            .enqueue("/repo", "decisions", ".ixchel/decisions", false)
            .await;

        assert!(queue.start(&id).await);

        let job = queue.get(&id).await.unwrap();
        assert_eq!(job.state, SyncState::Running);

        queue
            .complete(
                &id,
                SyncStats {
                    files_scanned: 10,
                    files_updated: 2,
                    duration_ms: 100,
                },
            )
            .await;

        let job = queue.get(&id).await.unwrap();
        assert_eq!(job.state, SyncState::Done);
        assert!(job.stats.is_some());
    }

    #[tokio::test]
    async fn test_wait_already_complete() {
        let queue = SyncQueue::new();
        let (id, _) = queue
            .enqueue("/repo", "decisions", ".ixchel/decisions", false)
            .await;

        queue.start(&id).await;
        queue.complete(&id, SyncStats::default()).await;

        let result = queue.wait(&id, Duration::from_millis(100)).await;
        assert_eq!(result, Some(SyncState::Done));
    }

    #[tokio::test]
    async fn test_list_queues() {
        let queue = SyncQueue::new();
        queue
            .enqueue("/repo1", "decisions", ".ixchel/decisions", false)
            .await;
        queue
            .enqueue("/repo2", "issues", ".ixchel/issues", false)
            .await;

        let queues = queue.list_queues().await;
        assert_eq!(queues.len(), 2);
    }
}