iridium-db 0.4.0

A high-performance vector-graph hybrid storage and indexing engine
use super::{
    BackgroundJobConfig, BackgroundJobError, BackgroundJobRecord, BackgroundJobResult,
    BackgroundJobState, BackgroundJobTracker,
};

impl BackgroundJobTracker {
    /// Creates an empty tracker governed by `config`.
    ///
    /// Job ids handed out by [`submit`](Self::submit) start at 1.
    ///
    /// # Errors
    ///
    /// Returns [`BackgroundJobError::InvalidInput`] when
    /// `config.max_queued_jobs` or `config.max_retained_terminal_jobs` is zero.
    pub fn new(config: BackgroundJobConfig) -> BackgroundJobResult<Self> {
        if config.max_queued_jobs == 0 {
            return Err(BackgroundJobError::InvalidInput(
                "max_queued_jobs must be > 0".to_string(),
            ));
        }
        if config.max_retained_terminal_jobs == 0 {
            return Err(BackgroundJobError::InvalidInput(
                "max_retained_terminal_jobs must be > 0".to_string(),
            ));
        }
        Ok(Self {
            config,
            next_job_id: 1,
            is_shutdown: false,
            queued: std::collections::VecDeque::new(),
            jobs: std::collections::HashMap::new(),
            terminal_order: std::collections::VecDeque::new(),
        })
    }

    /// Enqueues a new job of the given kind and returns its id.
    ///
    /// The job is recorded in the `Queued` state with zero attempts and is
    /// appended to the back of the FIFO queue. `job_kind` is stored exactly as
    /// passed (it is not trimmed).
    ///
    /// # Errors
    ///
    /// Checks run in this order, and the first failure wins:
    ///
    /// 1. [`BackgroundJobError::Shutdown`] if shutdown has been requested.
    /// 2. [`BackgroundJobError::Backpressure`] if the queue already holds
    ///    `max_queued_jobs` entries.
    /// 3. [`BackgroundJobError::InvalidInput`] if `job_kind` is empty or
    ///    consists only of whitespace.
    pub fn submit(&mut self, job_kind: &str) -> BackgroundJobResult<u64> {
        if self.is_shutdown {
            return Err(BackgroundJobError::Shutdown);
        }
        if self.queued.len() >= self.config.max_queued_jobs {
            return Err(BackgroundJobError::Backpressure {
                queued_jobs: self.queued.len(),
                max_queued_jobs: self.config.max_queued_jobs,
            });
        }
        if job_kind.trim().is_empty() {
            return Err(BackgroundJobError::InvalidInput(
                "job_kind must not be empty".to_string(),
            ));
        }
        let job_id = self.next_job_id;
        self.next_job_id += 1;
        self.jobs.insert(
            job_id,
            BackgroundJobRecord {
                job_id,
                job_kind: job_kind.to_string(),
                attempts: 0,
                state: BackgroundJobState::Queued,
            },
        );
        self.queued.push_back(job_id);
        Ok(job_id)
    }

    /// Dequeues the oldest queued job, marks it `Running`, and returns its id.
    ///
    /// The job's attempt counter is incremented (saturating). Returns `None`
    /// when no runnable job is queued.
    ///
    /// A queued id that no longer has a record is discarded and the scan
    /// continues, so one stale entry cannot hide the runnable jobs behind it.
    pub fn start_next(&mut self) -> Option<u64> {
        while let Some(job_id) = self.queued.pop_front() {
            if let Some(record) = self.jobs.get_mut(&job_id) {
                record.attempts = record.attempts.saturating_add(1);
                record.state = BackgroundJobState::Running;
                return Some(job_id);
            }
            // No record for this id: drop it and try the next queued entry.
        }
        None
    }

    /// Transitions a running job to `Succeeded` and records it as terminal.
    ///
    /// # Errors
    ///
    /// * [`BackgroundJobError::NotFound`] if `job_id` is not tracked.
    /// * [`BackgroundJobError::InvalidTransition`] if the job is not `Running`.
    pub fn mark_succeeded(&mut self, job_id: u64) -> BackgroundJobResult<()> {
        let record = self.running_record_mut(job_id, "mark succeeded")?;
        record.state = BackgroundJobState::Succeeded;
        self.record_terminal(job_id);
        Ok(())
    }

    /// Counts another attempt for a running job and returns the new total.
    ///
    /// The job stays in the `Running` state; the counter saturates instead of
    /// overflowing.
    ///
    /// # Errors
    ///
    /// * [`BackgroundJobError::NotFound`] if `job_id` is not tracked.
    /// * [`BackgroundJobError::InvalidTransition`] if the job is not `Running`.
    pub fn retry_running(&mut self, job_id: u64) -> BackgroundJobResult<u32> {
        let record = self.running_record_mut(job_id, "retry")?;
        record.attempts = record.attempts.saturating_add(1);
        Ok(record.attempts)
    }

    /// Transitions a running job to `Failed` with the given `reason` and
    /// records it as terminal.
    ///
    /// # Errors
    ///
    /// * [`BackgroundJobError::NotFound`] if `job_id` is not tracked.
    /// * [`BackgroundJobError::InvalidTransition`] if the job is not `Running`.
    pub fn mark_failed(&mut self, job_id: u64, reason: &str) -> BackgroundJobResult<()> {
        let record = self.running_record_mut(job_id, "mark failed")?;
        record.state = BackgroundJobState::Failed {
            reason: reason.to_string(),
        };
        self.record_terminal(job_id);
        Ok(())
    }

    /// Cancels a queued or running job and records it as terminal.
    ///
    /// This only updates the tracker's bookkeeping: the state becomes
    /// `Canceled` and the id is removed from the queue if it was waiting there.
    ///
    /// # Errors
    ///
    /// * [`BackgroundJobError::NotFound`] if `job_id` is not tracked.
    /// * [`BackgroundJobError::InvalidTransition`] if the job is neither
    ///   `Queued` nor `Running`.
    pub fn cancel(&mut self, job_id: u64) -> BackgroundJobResult<()> {
        let record = self
            .jobs
            .get_mut(&job_id)
            .ok_or(BackgroundJobError::NotFound(job_id))?;
        if !matches!(
            record.state,
            BackgroundJobState::Queued | BackgroundJobState::Running
        ) {
            return Err(BackgroundJobError::InvalidTransition(format!(
                "job {} must be queued/running to cancel",
                job_id
            )));
        }
        record.state = BackgroundJobState::Canceled;
        // A running job is not in the queue, in which case this removes nothing.
        self.queued.retain(|queued_id| *queued_id != job_id);
        self.record_terminal(job_id);
        Ok(())
    }

    /// Puts the tracker into shutdown mode and cancels everything still queued.
    ///
    /// After this call [`submit`](Self::submit) fails with
    /// [`BackgroundJobError::Shutdown`]. Jobs that are already `Running` are
    /// left untouched. Returns the number of ids drained from the queue.
    pub fn request_shutdown(&mut self) -> usize {
        self.is_shutdown = true;
        // Take ownership of the whole queue so the tracker can be mutated
        // while walking the drained ids; the queue itself is left empty.
        let pending = std::mem::take(&mut self.queued);
        for &job_id in &pending {
            if let Some(record) = self.jobs.get_mut(&job_id) {
                if matches!(record.state, BackgroundJobState::Queued) {
                    record.state = BackgroundJobState::Canceled;
                    self.record_terminal(job_id);
                }
            }
        }
        pending.len()
    }

    /// Returns the record for `job_id`, or `None` if it is not tracked
    /// (never submitted, or already evicted from terminal retention).
    pub fn status(&self, job_id: u64) -> Option<&BackgroundJobRecord> {
        self.jobs.get(&job_id)
    }

    /// Returns a copy of every tracked record, sorted by ascending job id.
    pub fn snapshot(&self) -> Vec<BackgroundJobRecord> {
        let mut records = self
            .jobs
            .values()
            .cloned()
            .collect::<Vec<BackgroundJobRecord>>();
        // Ids are unique map keys, so an unstable sort yields the same order.
        records.sort_unstable_by_key(|record| record.job_id);
        records
    }

    /// Returns the number of jobs currently waiting in the queue.
    pub fn queued_jobs(&self) -> usize {
        self.queued.len()
    }

    /// Returns `true` once [`request_shutdown`](Self::request_shutdown) has
    /// been called.
    pub fn is_shutdown(&self) -> bool {
        self.is_shutdown
    }

    /// Looks up `job_id` and returns its record only if the job is `Running`.
    ///
    /// `action` names the attempted transition and is spliced into the
    /// `InvalidTransition` message ("job N must be running to <action>").
    fn running_record_mut(
        &mut self,
        job_id: u64,
        action: &str,
    ) -> BackgroundJobResult<&mut BackgroundJobRecord> {
        let record = self
            .jobs
            .get_mut(&job_id)
            .ok_or(BackgroundJobError::NotFound(job_id))?;
        if !matches!(record.state, BackgroundJobState::Running) {
            return Err(BackgroundJobError::InvalidTransition(format!(
                "job {} must be running to {}",
                job_id, action
            )));
        }
        Ok(record)
    }

    /// Notes that `job_id` reached a terminal state and enforces retention.
    ///
    /// Terminal ids are kept in completion order; once more than
    /// `max_retained_terminal_jobs` are held, the oldest ones are dropped
    /// together with their records.
    fn record_terminal(&mut self, job_id: u64) {
        self.terminal_order.push_back(job_id);
        while self.terminal_order.len() > self.config.max_retained_terminal_jobs {
            if let Some(evict_id) = self.terminal_order.pop_front() {
                self.jobs.remove(&evict_id);
            }
        }
    }
}