// robson-core 0.1.0
//
// Rust async agent orchestrator for automated development workflows.
// Documentation
use anyhow::Result;
use chrono::Utc;
use sea_orm::entity::prelude::*;
use sea_orm::{ActiveValue::Set, Condition, QueryOrder};
use serde::{Deserialize, Serialize};

/// Upper bound, in seconds, on the delay between delivery retries (one hour).
const MAX_BACKOFF_SECS: i64 = 60 * 60;

/// Row of the `process_event_deliveries` table: the delivery state of one process
/// event for one gateway (delivered or pending, attempt count, last error and the
/// earliest time of the next retry).
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)]
#[sea_orm(table_name = "process_event_deliveries")]
pub struct Model {
    /// Primary key of the delivery record.
    #[sea_orm(primary_key)]
    pub id: i32,
    /// Foreign key to `process_events.id`.
    pub process_event_id: i32,
    /// Matches `Gateway::name()`; intended to be one record per
    /// `(process_event_id, gateway_name)` pair.
    pub gateway_name: String,
    /// Unix epoch (seconds) at which this gateway successfully delivered the event.
    /// `NULL` means the delivery is still pending.
    pub delivered_at: Option<i64>,
    /// Number of delivery attempts made so far.
    pub attempts: i32,
    /// Error message from the most recent failed delivery attempt (UTF-8), if any.
    pub last_error: Option<String>,
    /// Unix epoch (seconds) of the earliest next retry. `NULL` means retry immediately.
    pub next_retry_at: Option<i64>,
}

/// Relations of this entity to other entities. The enum is empty: no relations are
/// declared here, even though `process_event_id` is documented as a foreign key.
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}

// No hooks are overridden; the trait's default behavior is used as-is.
impl ActiveModelBehavior for ActiveModel {}

impl Model {
    /// Returns every undelivered record for `gateway_name` that is due for a delivery attempt.
    ///
    /// A row is due when `delivered_at IS NULL` and either `next_retry_at IS NULL` or
    /// `next_retry_at <= now`, where `now` is the current Unix time in seconds.
    /// Results are ordered by ascending `process_event_id`.
    ///
    /// # Errors
    /// Returns an error if the database query fails.
    pub async fn find_pending_for_gateway(
        db: &DatabaseConnection,
        gateway_name: &str,
    ) -> Result<Vec<Model>> {
        let now = Utc::now().timestamp();
        // Never scheduled (NULL) or the scheduled retry time has already passed.
        let retry_due = Condition::any()
            .add(Column::NextRetryAt.is_null())
            .add(Column::NextRetryAt.lte(now));
        let rows = Entity::find()
            .filter(Column::GatewayName.eq(gateway_name))
            .filter(Column::DeliveredAt.is_null())
            .filter(retry_due)
            .order_by_asc(Column::ProcessEventId)
            .all(db)
            .await?;
        Ok(rows)
    }

    /// Returns the number of undelivered records (`delivered_at IS NULL`) for
    /// `process_event_id`, across all gateways and regardless of retry schedule.
    ///
    /// # Errors
    /// Returns an error if the database query fails.
    pub async fn count_pending_for_event(
        db: &DatabaseConnection,
        process_event_id: i32,
    ) -> Result<u64> {
        let count = Entity::find()
            .filter(Column::ProcessEventId.eq(process_event_id))
            .filter(Column::DeliveredAt.is_null())
            .count(db)
            .await?;
        Ok(count)
    }

    /// Creates a pending delivery record for `(process_event_id, gateway_name)` if none
    /// exists, and returns the id of the existing or newly created record.
    ///
    /// An existing record is returned untouched, whether or not it was already delivered.
    /// A new record starts with zero attempts, no error and no retry time.
    ///
    /// NOTE(review): the lookup and the insert are two separate statements, so two
    /// concurrent callers can both miss the lookup and both insert. Presumably a unique
    /// constraint on `(process_event_id, gateway_name)` guards against duplicates; confirm
    /// against the schema.
    ///
    /// # Errors
    /// Returns an error if the lookup or the insert fails.
    pub async fn upsert_pending(
        db: &DatabaseConnection,
        process_event_id: i32,
        gateway_name: &str,
    ) -> Result<i32> {
        let existing = Entity::find()
            .filter(Column::ProcessEventId.eq(process_event_id))
            .filter(Column::GatewayName.eq(gateway_name))
            .one(db)
            .await?;
        if let Some(record) = existing {
            return Ok(record.id);
        }

        let active = ActiveModel {
            process_event_id: Set(process_event_id),
            gateway_name: Set(gateway_name.to_string()),
            delivered_at: Set(None),
            attempts: Set(0),
            last_error: Set(None),
            next_retry_at: Set(None),
            ..Default::default()
        };
        let inserted = active.insert(db).await?;
        Ok(inserted.id)
    }

    /// Marks the delivery record `id` as delivered at the current Unix time (seconds)
    /// and clears `last_error`. No other column is set by this update.
    ///
    /// # Errors
    /// Returns an error if the update fails.
    pub async fn mark_delivered(db: &DatabaseConnection, id: i32) -> Result<()> {
        let now = Utc::now().timestamp();
        let active = ActiveModel {
            id: Set(id),
            delivered_at: Set(Some(now)),
            last_error: Set(None),
            ..Default::default()
        };
        active.update(db).await?;
        Ok(())
    }

    /// Records a failed delivery attempt: increments `attempts`, stores `error` as
    /// `last_error`, and schedules the next retry with capped exponential backoff.
    ///
    /// `next_retry_at = now + min(2^attempts, MAX_BACKOFF_SECS)`, where `attempts` is
    /// the incremented count.
    ///
    /// # Errors
    /// Returns an error if the record does not exist or a database operation fails.
    pub async fn record_failure(db: &DatabaseConnection, id: i32, error: &str) -> Result<()> {
        let record = Entity::find_by_id(id)
            .one(db)
            .await?
            .ok_or_else(|| anyhow::anyhow!("process_event_deliveries record {} not found", id))?;

        // Saturate rather than overflow if the counter ever reaches i32::MAX.
        let new_attempts = record.attempts.saturating_add(1);
        // Use the shared helper: it bounds the shift, whereas a raw `1 << attempts`
        // overflows i64 once the attempt count reaches 63.
        let backoff_secs = backoff_secs_for_attempt(new_attempts);
        let next_retry_at = Utc::now().timestamp() + backoff_secs;

        let active = ActiveModel {
            id: Set(id),
            attempts: Set(new_attempts),
            last_error: Set(Some(error.to_string())),
            next_retry_at: Set(Some(next_retry_at)),
            ..Default::default()
        };
        active.update(db).await?;
        Ok(())
    }
}

/// Computes the next retry delay in seconds for a given attempt number (1-based).
///
/// `delay = min(2^attempts, MAX_BACKOFF_SECS)`
///
/// Never panics and never returns a non-positive delay: a zero or negative `attempts`
/// is treated as `0` (delay of 1 second), and large values saturate at
/// `MAX_BACKOFF_SECS`.
pub fn backoff_secs_for_attempt(attempts: i32) -> i64 {
    // Clamp into 0..=62 before casting. The lower bound stops a negative count from
    // wrapping to a huge u32 shift amount; the upper bound keeps `1 << shift` inside
    // the positive range of i64 before the MAX_BACKOFF_SECS cap is applied.
    let shift = attempts.clamp(0, 62) as u32;
    (1i64 << shift).min(MAX_BACKOFF_SECS)
}