use anyhow::Result;
use chrono::Utc;
use sea_orm::entity::prelude::*;
use sea_orm::{ActiveValue::Set, Condition, QueryOrder};
use serde::{Deserialize, Serialize};
const MAX_BACKOFF_SECS: i64 = 3600;
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)]
#[sea_orm(table_name = "process_event_deliveries")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i32,
pub process_event_id: i32,
pub gateway_name: String,
pub delivered_at: Option<i64>,
pub attempts: i32,
pub last_error: Option<String>,
pub next_retry_at: Option<i64>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}
impl Model {
pub async fn find_pending_for_gateway(
db: &DatabaseConnection,
gateway_name: &str,
) -> Result<Vec<Model>> {
let now = Utc::now().timestamp();
let rows = Entity::find()
.filter(Column::GatewayName.eq(gateway_name))
.filter(Column::DeliveredAt.is_null())
.filter(
Condition::any()
.add(Column::NextRetryAt.is_null())
.add(Column::NextRetryAt.lte(now)),
)
.order_by_asc(Column::ProcessEventId)
.all(db)
.await?;
Ok(rows)
}
pub async fn count_pending_for_event(
db: &DatabaseConnection,
process_event_id: i32,
) -> Result<u64> {
let count = Entity::find()
.filter(Column::ProcessEventId.eq(process_event_id))
.filter(Column::DeliveredAt.is_null())
.count(db)
.await?;
Ok(count)
}
pub async fn upsert_pending(
db: &DatabaseConnection,
process_event_id: i32,
gateway_name: &str,
) -> Result<i32> {
if let Some(existing) = Entity::find()
.filter(Column::ProcessEventId.eq(process_event_id))
.filter(Column::GatewayName.eq(gateway_name))
.one(db)
.await?
{
return Ok(existing.id);
}
let active = ActiveModel {
process_event_id: Set(process_event_id),
gateway_name: Set(gateway_name.to_string()),
delivered_at: Set(None),
attempts: Set(0),
last_error: Set(None),
next_retry_at: Set(None),
..Default::default()
};
let inserted = active.insert(db).await?;
Ok(inserted.id)
}
pub async fn mark_delivered(db: &DatabaseConnection, id: i32) -> Result<()> {
let now = Utc::now().timestamp();
let active = ActiveModel {
id: Set(id),
delivered_at: Set(Some(now)),
last_error: Set(None),
..Default::default()
};
active.update(db).await?;
Ok(())
}
pub async fn record_failure(db: &DatabaseConnection, id: i32, error: &str) -> Result<()> {
let record = Entity::find_by_id(id)
.one(db)
.await?
.ok_or_else(|| anyhow::anyhow!("process_event_deliveries record {} not found", id))?;
let new_attempts = record.attempts + 1;
let backoff_secs = (1i64 << new_attempts).min(MAX_BACKOFF_SECS);
let next_retry_at = Utc::now().timestamp() + backoff_secs;
let active = ActiveModel {
id: Set(id),
attempts: Set(new_attempts),
last_error: Set(Some(error.to_string())),
next_retry_at: Set(Some(next_retry_at)),
..Default::default()
};
active.update(db).await?;
Ok(())
}
}
pub fn backoff_secs_for_attempt(attempts: i32) -> i64 {
let shift = attempts.min(62) as u32;
(1i64 << shift).min(MAX_BACKOFF_SECS)
}