focl 0.1.0

focl/focld - lightweight Rust BGP speaker
Documentation
use std::fs;
use std::path::{Path, PathBuf};

use anyhow::{Context, Result};
use chrono::Utc;
use rusqlite::{params, Connection};

/// Handle to a SQLite-backed replication job queue.
///
/// The handle stores only the database path; every operation opens its own
/// connection to that file, so cloning the handle is cheap.
#[derive(Debug, Clone)]
pub struct ReplicationQueue {
    /// Location of the SQLite file (`<root>/.replication/queue.sqlite`).
    db_path: PathBuf,
}

/// One row of the replication queue as handed out by
/// `ReplicationQueue::claim_ready`.
#[derive(Debug, Clone)]
pub struct ReplicationJob {
    /// Primary key of the queue row; used to acknowledge or fail the job.
    pub id: i64,
    /// Segment path as stored in the queue row.
    pub segment_path: PathBuf,
    /// Manifest path as stored in the queue row.
    pub manifest_path: PathBuf,
    /// Destination identifier as stored in the queue row; the queue treats it
    /// as an opaque string.
    pub destination_key: String,
    /// Value of the row's `attempts` column at the time the job was claimed.
    pub attempts: u32,
    /// Attempt limit recorded at enqueue time; `0` is treated as "no limit"
    /// by `ReplicationQueue::mark_failed`.
    pub max_retries: u32,
}

impl ReplicationQueue {
    pub fn new(root: &Path) -> Result<Self> {
        let db_path = root.join(".replication").join("queue.sqlite");
        if let Some(parent) = db_path.parent() {
            fs::create_dir_all(parent)
                .with_context(|| format!("failed creating replication dir {}", parent.display()))?;
        }

        let queue = Self { db_path };
        queue.init()?;
        Ok(queue)
    }

    pub fn db_path(&self) -> &Path {
        &self.db_path
    }

    fn open(&self) -> Result<Connection> {
        let conn = Connection::open(&self.db_path)
            .with_context(|| format!("failed opening queue db {}", self.db_path.display()))?;
        Ok(conn)
    }

    fn init(&self) -> Result<()> {
        let conn = self.open()?;
        conn.execute_batch(
            "
            CREATE TABLE IF NOT EXISTS replication_queue (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                segment_path TEXT NOT NULL,
                manifest_path TEXT NOT NULL,
                destination_key TEXT NOT NULL,
                attempts INTEGER NOT NULL DEFAULT 0,
                max_retries INTEGER NOT NULL DEFAULT 0,
                next_retry_ts INTEGER NOT NULL,
                status TEXT NOT NULL,
                last_error TEXT,
                created_ts INTEGER NOT NULL,
                updated_ts INTEGER NOT NULL
            );
            CREATE INDEX IF NOT EXISTS idx_replication_queue_ready
            ON replication_queue(status, next_retry_ts);
            ",
        )?;
        Ok(())
    }

    pub fn enqueue(
        &self,
        segment_path: &Path,
        manifest_path: &Path,
        destination_key: &str,
        max_retries: u32,
    ) -> Result<()> {
        let now = Utc::now().timestamp();
        let conn = self.open()?;
        conn.execute(
            "
            INSERT INTO replication_queue (
                segment_path, manifest_path, destination_key, attempts, max_retries,
                next_retry_ts, status, created_ts, updated_ts
            ) VALUES (?, ?, ?, 0, ?, ?, 'pending', ?, ?)
            ",
            params![
                segment_path.display().to_string(),
                manifest_path.display().to_string(),
                destination_key,
                max_retries,
                now,
                now,
                now
            ],
        )?;
        Ok(())
    }

    pub fn claim_ready(&self, limit: usize) -> Result<Vec<ReplicationJob>> {
        let now = Utc::now().timestamp();
        let conn = self.open()?;
        let tx = conn.unchecked_transaction()?;

        let jobs: Vec<ReplicationJob> = {
            let mut stmt = tx.prepare(
                "
                SELECT id, segment_path, manifest_path, destination_key, attempts, max_retries
                FROM replication_queue
                WHERE status = 'pending' AND next_retry_ts <= ?
                ORDER BY id ASC
                LIMIT ?
                ",
            )?;

            let rows = stmt.query_map(params![now, limit as i64], |row| {
                Ok(ReplicationJob {
                    id: row.get(0)?,
                    segment_path: PathBuf::from(row.get::<_, String>(1)?),
                    manifest_path: PathBuf::from(row.get::<_, String>(2)?),
                    destination_key: row.get(3)?,
                    attempts: row.get::<_, u32>(4)?,
                    max_retries: row.get::<_, u32>(5)?,
                })
            })?;

            rows.collect::<Result<Vec<_>, _>>()?
        };

        for job in &jobs {
            tx.execute(
                "UPDATE replication_queue SET status = 'in_progress', updated_ts = ? WHERE id = ?",
                params![now, job.id],
            )?;
        }

        tx.commit()?;
        Ok(jobs)
    }

    pub fn mark_success(&self, job_id: i64) -> Result<()> {
        let conn = self.open()?;
        conn.execute(
            "DELETE FROM replication_queue WHERE id = ?",
            params![job_id],
        )?;
        Ok(())
    }

    pub fn mark_failed(
        &self,
        job: &ReplicationJob,
        error: &str,
        retry_backoff_secs: u64,
    ) -> Result<()> {
        let now = Utc::now().timestamp();
        let conn = self.open()?;
        let next_attempt = job.attempts.saturating_add(1);

        let exhausted = job.max_retries > 0 && next_attempt >= job.max_retries;
        if exhausted {
            conn.execute(
                "
                UPDATE replication_queue
                SET attempts = ?, status = 'failed', last_error = ?, updated_ts = ?
                WHERE id = ?
                ",
                params![next_attempt, error, now, job.id],
            )?;
        } else {
            let next_retry = now + retry_backoff_secs as i64;
            conn.execute(
                "
                UPDATE replication_queue
                SET attempts = ?, status = 'pending', next_retry_ts = ?, last_error = ?, updated_ts = ?
                WHERE id = ?
                ",
                params![next_attempt, next_retry, error, now, job.id],
            )?;
        }

        Ok(())
    }

    pub fn pending_count(&self) -> Result<usize> {
        let conn = self.open()?;
        let count: i64 = conn.query_row(
            "SELECT COUNT(*) FROM replication_queue WHERE status IN ('pending', 'in_progress')",
            [],
            |row| row.get(0),
        )?;
        Ok(count as usize)
    }

    pub fn failed_count(&self) -> Result<usize> {
        let conn = self.open()?;
        let count: i64 = conn.query_row(
            "SELECT COUNT(*) FROM replication_queue WHERE status = 'failed'",
            [],
            |row| row.get(0),
        )?;
        Ok(count as usize)
    }

    pub fn retry_failed(&self) -> Result<usize> {
        let now = Utc::now().timestamp();
        let conn = self.open()?;
        let updated = conn.execute(
            "
            UPDATE replication_queue
            SET status = 'pending', next_retry_ts = ?, updated_ts = ?
            WHERE status = 'failed'
            ",
            params![now, now],
        )?;
        Ok(updated)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Walks one job through its whole life cycle: enqueue, count as pending,
    /// claim, acknowledge, and confirm nothing is left outstanding.
    #[test]
    fn queue_persists_jobs() {
        let workdir = tempfile::tempdir().unwrap();
        let queue = ReplicationQueue::new(workdir.path()).unwrap();

        let segment = Path::new("/tmp/segment.gz");
        let manifest = Path::new("/tmp/segment.gz.json");
        queue
            .enqueue(segment, manifest, "local:/tmp/archive", 0)
            .unwrap();

        let outstanding = queue.pending_count().unwrap();
        assert_eq!(outstanding, 1);

        let claimed = queue.claim_ready(10).unwrap();
        assert_eq!(claimed.len(), 1);

        let first = &claimed[0];
        queue.mark_success(first.id).unwrap();

        let remaining = queue.pending_count().unwrap();
        assert_eq!(remaining, 0);
    }
}