opencrabs 0.3.37

The autonomous, self-improving AI agent. Single Rust binary. Every channel. Install with: cargo install opencrabs
Documentation
use crate::db::Pool;
use crate::db::database::interact_err;
use crate::db::models::CronJob;
use anyhow::{Context, Result};
use rusqlite::params;

/// Extension trait for rusqlite to add `.optional()` to query results
trait OptionalExt<T> {
    fn optional(self) -> rusqlite::Result<Option<T>>;
}

impl<T> OptionalExt<T> for rusqlite::Result<T> {
    fn optional(self) -> rusqlite::Result<Option<T>> {
        match self {
            Ok(v) => Ok(Some(v)),
            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
            Err(e) => Err(e),
        }
    }
}

#[derive(Clone)]
pub struct CronJobRepository {
    pool: Pool,
}

impl CronJobRepository {
    pub fn new(pool: Pool) -> Self {
        Self { pool }
    }

    pub async fn insert(&self, job: &CronJob) -> Result<()> {
        let j = job.clone();
        self.pool
            .get()
            .await
            .context("Failed to get connection")?
            .interact(move |conn| {
                conn.execute(
                    "INSERT INTO cron_jobs (id, name, cron_expr, timezone, prompt, provider, model, thinking, auto_approve, deliver_to, deliver_api_key, enabled, next_run_at, created_at, updated_at, profile_name)
                     VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
                    params![
                        j.id.to_string(),
                        j.name,
                        j.cron_expr,
                        j.timezone,
                        j.prompt,
                        j.provider,
                        j.model,
                        j.thinking,
                        j.auto_approve as i32,
                        j.deliver_to,
                        j.deliver_api_key,
                        j.enabled as i32,
                        j.next_run_at.map(|d| d.to_rfc3339()),
                        j.created_at.to_rfc3339(),
                        j.updated_at.to_rfc3339(),
                        j.profile_name,
                    ],
                )
            })
            .await
            .map_err(interact_err)?
            .context("Failed to insert cron job")?;
        Ok(())
    }

    pub async fn list_all(&self) -> Result<Vec<CronJob>> {
        self.pool
            .get()
            .await
            .context("Failed to get connection")?
            .interact(|conn| -> anyhow::Result<Vec<CronJob>> {
                let mut stmt = conn.prepare_cached("SELECT * FROM cron_jobs ORDER BY name")?;
                let rows = stmt.query_map([], |row| {
                    // Capture the id BEFORE from_row so a decode failure
                    // points at a specific job rather than the anonymous
                    // "row 0/0" rusqlite produces by default. Useful for
                    // the 2026-05-17 class of bug where one bad row
                    // silently empties an entire list query.
                    let id: String = row.get::<_, String>("id").unwrap_or_default();
                    CronJob::from_row(row).map_err(|e| {
                        rusqlite::Error::FromSqlConversionFailure(
                            0,
                            rusqlite::types::Type::Text,
                            format!("cron_jobs row id={id}: {e}").into(),
                        )
                    })
                })?;
                let mut out = Vec::new();
                for row in rows {
                    out.push(row.context("cron_jobs row decode failed")?);
                }
                Ok(out)
            })
            .await
            .map_err(interact_err)?
            .context("Failed to list cron jobs")
    }

    pub async fn list_enabled(&self) -> Result<Vec<CronJob>> {
        self.pool
            .get()
            .await
            .context("Failed to get connection")?
            .interact(|conn| {
                let mut stmt =
                    conn.prepare_cached("SELECT * FROM cron_jobs WHERE enabled = 1 ORDER BY name")?;
                let rows = stmt.query_map([], CronJob::from_row)?;
                rows.collect::<std::result::Result<Vec<_>, _>>()
            })
            .await
            .map_err(interact_err)?
            .context("Failed to list enabled cron jobs")
    }

    pub async fn find_by_id(&self, id: &str) -> Result<Option<CronJob>> {
        let id = id.to_string();
        self.pool
            .get()
            .await
            .context("Failed to get connection")?
            .interact(move |conn| {
                conn.prepare_cached("SELECT * FROM cron_jobs WHERE id = ?1")?
                    .query_row(params![id], CronJob::from_row)
                    .optional()
            })
            .await
            .map_err(interact_err)?
            .context("Failed to find cron job")
    }

    pub async fn find_by_name(&self, name: &str) -> Result<Option<CronJob>> {
        let name = name.to_string();
        self.pool
            .get()
            .await
            .context("Failed to get connection")?
            .interact(move |conn| {
                conn.prepare_cached("SELECT * FROM cron_jobs WHERE name = ?1")?
                    .query_row(params![name], CronJob::from_row)
                    .optional()
            })
            .await
            .map_err(interact_err)?
            .context("Failed to find cron job by name")
    }

    pub async fn delete(&self, id: &str) -> Result<bool> {
        let id = id.to_string();
        let rows = self
            .pool
            .get()
            .await
            .context("Failed to get connection")?
            .interact(move |conn| conn.execute("DELETE FROM cron_jobs WHERE id = ?1", params![id]))
            .await
            .map_err(interact_err)?
            .context("Failed to delete cron job")?;
        Ok(rows > 0)
    }

    pub async fn set_enabled(&self, id: &str, enabled: bool) -> Result<bool> {
        let id = id.to_string();
        let rows = self
            .pool
            .get()
            .await
            .context("Failed to get connection")?
            .interact(move |conn| {
                conn.execute(
                    "UPDATE cron_jobs SET enabled = ?1, updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') WHERE id = ?2",
                    params![enabled as i32, id],
                )
            })
            .await
            .map_err(interact_err)?
            .context("Failed to set cron job enabled")?;
        Ok(rows > 0)
    }

    /// Set next_run_at to a past timestamp so the scheduler fires it on the next tick.
    /// Also ensures the job is enabled.
    pub async fn trigger_now(&self, id: &str) -> Result<bool> {
        let id = id.to_string();
        let rows = self
            .pool
            .get()
            .await
            .context("Failed to get connection")?
            .interact(move |conn| {
                conn.execute(
                    "UPDATE cron_jobs SET next_run_at = '2000-01-01T00:00:00Z', enabled = 1, updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') WHERE id = ?1",
                    params![id],
                )
            })
            .await
            .map_err(interact_err)?
            .context("Failed to trigger cron job")?;
        Ok(rows > 0)
    }

    pub async fn update_last_run(&self, id: &str, next_run_at: Option<&str>) -> Result<()> {
        let id = id.to_string();
        let next = next_run_at.map(|s| s.to_string());
        self.pool
            .get()
            .await
            .context("Failed to get connection")?
            .interact(move |conn| {
                conn.execute(
                    "UPDATE cron_jobs SET last_run_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'), next_run_at = ?1, updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') WHERE id = ?2",
                    params![next, id],
                )
            })
            .await
            .map_err(interact_err)?
            .context("Failed to update last run")?;
        Ok(())
    }
}