use crate::task::{HeaderMap, IntoHeaders, ToHashMap};
use sea_orm::entity::prelude::*;
use std::collections::HashMap;
use tracing::warn;
#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
#[sea_orm(rs_type = "String", db_type = "String(StringLen::N(50))")]
pub enum TaskState {
#[sea_orm(string_value = "pending")]
Pending,
#[sea_orm(string_value = "active")]
Active,
#[sea_orm(string_value = "scheduled")]
Scheduled,
#[sea_orm(string_value = "retry")]
Retry,
#[sea_orm(string_value = "archived")]
Archived,
#[sea_orm(string_value = "completed")]
Completed,
#[sea_orm(string_value = "aggregating")]
Aggregating,
}
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
#[sea_orm(table_name = "asynq_tasks")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub id: String,
pub queue: String,
pub task_type: String,
#[sea_orm(column_type = "VarBinary(StringLen::None)")]
pub payload: Vec<u8>,
pub state: TaskState,
pub retry: i32,
pub retried: i32,
pub error_msg: Option<String>,
pub last_failed_at: Option<DateTimeWithTimeZone>,
pub timeout_seconds: i64,
pub deadline: Option<DateTimeWithTimeZone>,
pub unique_key: Option<String>,
pub group_key: Option<String>,
pub retention_seconds: i64,
pub completed_at: Option<DateTimeWithTimeZone>,
pub process_at: DateTimeWithTimeZone,
pub created_at: DateTimeWithTimeZone,
pub updated_at: DateTimeWithTimeZone,
pub lease_expires_at: Option<DateTimeWithTimeZone>,
#[sea_orm(nullable)]
pub headers: Option<serde_json::Value>,
pub tenant_id: Option<String>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::queues::Entity",
from = "Column::Queue",
to = "super::queues::Column::Name"
)]
Queue,
#[sea_orm(has_many = "super::workers::Entity")]
Workers,
}
impl Related<super::queues::Entity> for Entity {
fn to() -> RelationDef {
Relation::Queue.def()
}
}
impl Related<super::workers::Entity> for Entity {
fn to() -> RelationDef {
Relation::Workers.def()
}
}
impl ActiveModelBehavior for ActiveModel {}
impl Model {
pub fn parse_headers(&self) -> HeaderMap {
match &self.headers {
Some(h) => match serde_json::from_value::<HashMap<String, String>>(h.clone()) {
Ok(headers) => headers.into_headers(),
Err(e) => {
warn!(
task_id = %self.id,
error = %e,
"Failed to parse task headers JSON, returning empty headers"
);
HeaderMap::new()
}
},
None => HeaderMap::new(),
}
}
}
pub fn serialize_headers<H: IntoHeaders>(headers: H) -> Option<serde_json::Value> {
let headers = headers.into_headers();
let headers = headers.to_hashmap();
if headers.is_empty() {
None
} else {
match serde_json::to_value(headers) {
Ok(json) => Some(json),
Err(e) => {
warn!(
error = %e,
"Failed to serialize task headers to JSON, headers will not be stored"
);
None
}
}
}
}