use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::fmt::{self};
use tabled::Tabled;
/// A single message stored in a queue.
///
/// The type is (de)serializable with serde and renderable as a table row via
/// `Tabled`; when the `sqlx` feature is enabled it can also be built from a
/// database row through `sqlx::FromRow`.
///
/// The optional bookkeeping fields are left out of serialized output when they
/// are `None` and are never shown in the table rendering.
#[derive(Debug, Clone, Serialize, Deserialize, Tabled)]
#[cfg_attr(feature = "sqlx", derive(sqlx::FromRow))]
pub struct QueueMessage {
/// Message identifier.
pub id: i64,
/// Identifier of the queue this message is stored in.
pub queue_id: i64,
/// JSON payload carried by the message.
pub payload: serde_json::Value,
/// NOTE(review): presumably the visibility timeout, i.e. the instant from
/// which the message may be read (again) — confirm against the queue logic.
pub vt: chrono::DateTime<chrono::Utc>,
/// Timestamp (UTC) recorded when the message was enqueued.
pub enqueued_at: chrono::DateTime<chrono::Utc>,
/// NOTE(review): presumably the number of times the message has been read —
/// confirm.
pub read_ct: i32,
/// Timestamp (UTC) of the dequeue, if one has been recorded.
#[serde(skip_serializing_if = "Option::is_none")]
#[tabled(skip)]
pub dequeued_at: Option<chrono::DateTime<chrono::Utc>>,
/// Identifier of the producing worker, if recorded.
#[serde(skip_serializing_if = "Option::is_none")]
#[tabled(skip)]
pub producer_worker_id: Option<i64>,
/// Identifier of the consuming worker, if recorded.
#[serde(skip_serializing_if = "Option::is_none")]
#[tabled(skip)]
pub consumer_worker_id: Option<i64>,
/// Timestamp (UTC) of archival, if the message has been archived.
#[serde(skip_serializing_if = "Option::is_none")]
#[tabled(skip)]
pub archived_at: Option<chrono::DateTime<chrono::Utc>>,
}
/// Single-line, struct-like rendering of a message.
///
/// Prints `id`, `queue_id`, `read_ct`, `enqueued_at`, `vt` and `payload`, in
/// that order; the optional bookkeeping fields are not included.
impl fmt::Display for QueueMessage {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Pull out exactly the fields that are printed; everything else is ignored.
        let Self {
            id,
            queue_id,
            read_ct,
            enqueued_at,
            vt,
            payload,
            ..
        } = self;

        write!(
            f,
            "QueueMessage {{ id: {}, queue_id: {}, read_ct: {}, enqueued_at: {}, vt: {}, payload: {} }}",
            id, queue_id, read_ct, enqueued_at, vt, payload
        )
    }
}
impl QueueMessage {
    /// Returns the time elapsed between `enqueued_at` and `dequeued_at`.
    ///
    /// The result is `dequeued_at - enqueued_at`, so despite the method name it
    /// measures how long the message sat between enqueue and dequeue.
    ///
    /// Returns `None` when `dequeued_at` is not set.
    pub fn get_processing_duration(&self) -> Option<chrono::Duration> {
        let dequeued = self.dequeued_at?;
        Some(dequeued - self.enqueued_at)
    }
}
/// A queue entry: identifier, name and creation timestamp.
///
/// (De)serializable with serde and renderable via `Tabled`; with the `sqlx`
/// feature enabled it also derives `sqlx::FromRow`.
#[derive(Clone, Debug, Serialize, Deserialize, Tabled)]
#[cfg_attr(feature = "sqlx", derive(sqlx::FromRow))]
pub struct QueueRecord {
/// Queue identifier.
pub id: i64,
/// Name of the queue.
pub queue_name: String,
/// Creation timestamp (UTC).
pub created_at: DateTime<Utc>,
}
/// Single-line, struct-like rendering: `id`, `queue_name`, `created_at`.
impl fmt::Display for QueueRecord {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!(
            "QueueRecord {{ id: {}, queue_name: {}, created_at: {} }}",
            self.id, self.queue_name, self.created_at
        ))
    }
}
/// A worker entry with its timestamps and current status.
///
/// (De)serializable with serde and renderable via `Tabled`; with the `sqlx`
/// feature enabled it also derives `sqlx::FromRow`.
#[derive(Debug, Clone, Serialize, Deserialize, Tabled)]
#[cfg_attr(feature = "sqlx", derive(sqlx::FromRow))]
pub struct WorkerRecord {
/// Worker identifier.
pub id: i64,
/// Name of the worker.
pub name: String,
/// Identifier of the associated queue, if any. Left out of serialized output
/// when `None` and hidden from the table rendering.
#[serde(skip_serializing_if = "Option::is_none")]
#[tabled(skip)]
pub queue_id: Option<i64>,
/// Start timestamp (UTC).
pub started_at: DateTime<Utc>,
/// Heartbeat timestamp (UTC).
pub heartbeat_at: DateTime<Utc>,
/// Shutdown timestamp (UTC), if recorded. Hidden from the table rendering;
/// unlike `queue_id` it has no `skip_serializing_if`, so it is still
/// serialized when `None`.
#[tabled(skip)]
pub shutdown_at: Option<DateTime<Utc>>,
/// Current status of the worker.
pub status: WorkerStatus,
}
/// Single-line, struct-like rendering: `id`, `name`, `queue_id`, `status`.
///
/// A missing queue is printed as the literal `None`; the status is printed
/// with its `Debug` representation.
impl fmt::Display for WorkerRecord {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if let Some(queue_id) = self.queue_id {
            return write!(
                f,
                "WorkerRecord {{ id: {}, name: {}, queue_id: {}, status: {:?} }}",
                self.id, self.name, queue_id, self.status
            );
        }

        // No queue attached: print a fixed `None` in place of the id.
        write!(
            f,
            "WorkerRecord {{ id: {}, name: {}, queue_id: None, status: {:?} }}",
            self.id, self.name, self.status
        )
    }
}
/// Status of a worker, as stored in `WorkerRecord::status`.
///
/// With the `sqlx` feature enabled the enum maps to the database type
/// `worker_status`, using snake_case variant names.
///
/// `Eq` is derived alongside `PartialEq`: equality on a field-less enum is
/// total, and this keeps the type consistent with `WorkflowStatus`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[cfg_attr(feature = "sqlx", derive(sqlx::Type))]
#[cfg_attr(
    feature = "sqlx",
    sqlx(type_name = "worker_status", rename_all = "snake_case")
)]
pub enum WorkerStatus {
    Ready,
    Polling,
    Suspended,
    Interrupted,
    Stopped,
}
impl fmt::Display for WorkerStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
WorkerStatus::Ready => write!(f, "ready"),
WorkerStatus::Polling => write!(f, "polling"),
WorkerStatus::Suspended => write!(f, "suspended"),
WorkerStatus::Interrupted => write!(f, "interrupted"),
WorkerStatus::Stopped => write!(f, "stopped"),
}
}
}
impl std::str::FromStr for WorkerStatus {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"ready" => Ok(WorkerStatus::Ready),
"polling" => Ok(WorkerStatus::Polling),
"suspended" => Ok(WorkerStatus::Suspended),
"interrupted" => Ok(WorkerStatus::Interrupted),
"stopped" => Ok(WorkerStatus::Stopped),
_ => Err(format!("Invalid worker status: {}", s)),
}
}
}
/// Data for a new queue: only the name.
///
/// Compared with `QueueRecord` it has no `id` or `created_at`.
/// NOTE(review): presumably those are assigned on insert — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NewQueueRecord {
/// Name of the queue.
pub queue_name: String,
}
/// Data for a new worker: its name and an optional queue.
///
/// Compared with `WorkerRecord` it has no `id`, timestamps or `status`.
/// NOTE(review): presumably those are assigned on insert — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NewWorkerRecord {
/// Name of the worker.
pub name: String,
/// Identifier of the associated queue, if any.
pub queue_id: Option<i64>,
}
/// Data for a new queue message.
///
/// Compared with `QueueMessage` it has no `id`, `dequeued_at` or
/// `archived_at`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NewQueueMessage {
/// Identifier of the target queue.
pub queue_id: i64,
/// JSON payload carried by the message.
pub payload: serde_json::Value,
/// NOTE(review): presumably the initial read count — confirm.
pub read_ct: i32,
/// Enqueue timestamp (UTC).
pub enqueued_at: DateTime<Utc>,
/// NOTE(review): presumably the visibility timeout — confirm.
pub vt: DateTime<Utc>,
/// Identifier of the producing worker, if any.
pub producer_worker_id: Option<i64>,
/// Identifier of the consuming worker, if any.
pub consumer_worker_id: Option<i64>,
}
/// Message attributes without a queue id or payload.
///
/// Holds the same fields as `NewQueueMessage` except `queue_id` and `payload`.
/// NOTE(review): presumably these values are applied to every message of a
/// batch insert — confirm against the caller.
#[derive(Debug, Clone)]
pub struct BatchInsertParams {
/// NOTE(review): presumably the initial read count — confirm.
pub read_ct: i32,
/// Enqueue timestamp (UTC).
pub enqueued_at: DateTime<Utc>,
/// NOTE(review): presumably the visibility timeout — confirm.
pub vt: DateTime<Utc>,
/// Identifier of the producing worker, if any.
pub producer_worker_id: Option<i64>,
/// Identifier of the consuming worker, if any.
pub consumer_worker_id: Option<i64>,
}
/// Status value used by `RunRecord::status` and `StepRecord::status`.
///
/// With the `sqlx` feature enabled the enum maps to the database type
/// `pgqrs_workflow_status`, using SCREAMING_SNAKE_CASE variant names.
#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)]
#[cfg_attr(feature = "sqlx", derive(sqlx::Type))]
#[cfg_attr(
feature = "sqlx",
sqlx(
type_name = "pgqrs_workflow_status",
rename_all = "SCREAMING_SNAKE_CASE"
)
)]
pub enum WorkflowStatus {
Queued,
Running,
Paused,
Success,
Error,
}
impl fmt::Display for WorkflowStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
WorkflowStatus::Queued => write!(f, "QUEUED"),
WorkflowStatus::Running => write!(f, "RUNNING"),
WorkflowStatus::Paused => write!(f, "PAUSED"),
WorkflowStatus::Success => write!(f, "SUCCESS"),
WorkflowStatus::Error => write!(f, "ERROR"),
}
}
}
impl std::str::FromStr for WorkflowStatus {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"QUEUED" => Ok(WorkflowStatus::Queued),
"RUNNING" => Ok(WorkflowStatus::Running),
"PAUSED" => Ok(WorkflowStatus::Paused),
"SUCCESS" => Ok(WorkflowStatus::Success),
"ERROR" => Ok(WorkflowStatus::Error),
_ => Err(format!("Invalid workflow status: {}", s)),
}
}
}
/// A workflow entry: identifier, name, associated queue and creation time.
///
/// With the `sqlx` feature enabled it derives `sqlx::FromRow`. Unlike most
/// other records in this file it does not derive serde traits.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "sqlx", derive(sqlx::FromRow))]
pub struct WorkflowRecord {
/// Workflow identifier.
pub id: i64,
/// Name of the workflow.
pub name: String,
/// Identifier of the associated queue.
pub queue_id: i64,
/// Creation timestamp (UTC).
pub created_at: DateTime<Utc>,
}
/// Data for a new workflow: its name and the associated queue.
///
/// Compared with `WorkflowRecord` it has no `id` or `created_at`.
///
/// Derives `Debug` and `Clone` (the same set as `WorkflowRecord`) so values
/// can be logged, used with `{:?}` formatting and duplicated like every other
/// public type in this file.
#[derive(Debug, Clone)]
pub struct NewWorkflowRecord {
    /// Name of the workflow.
    pub name: String,
    /// Identifier of the associated queue.
    pub queue_id: i64,
}
/// A workflow run entry.
///
/// (De)serializable with serde; with the `sqlx` feature enabled it also
/// derives `sqlx::FromRow`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "sqlx", derive(sqlx::FromRow))]
pub struct RunRecord {
/// Run identifier.
pub id: i64,
/// Identifier of the workflow this run belongs to.
pub workflow_id: i64,
/// Identifier of the related message.
/// NOTE(review): presumably the queue message that triggered the run — confirm.
pub message_id: i64,
/// Current status of the run.
pub status: WorkflowStatus,
/// Optional JSON input.
pub input: Option<serde_json::Value>,
/// Optional JSON output.
pub output: Option<serde_json::Value>,
/// Optional JSON error description.
pub error: Option<serde_json::Value>,
/// Creation timestamp (UTC).
pub created_at: DateTime<Utc>,
/// Last-update timestamp (UTC).
pub updated_at: DateTime<Utc>,
}
/// Data for a new run: workflow, related message and optional input.
///
/// Compared with `RunRecord` it has no `id`, `status`, `output`, `error` or
/// timestamps.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NewRunRecord {
/// Identifier of the workflow the run belongs to.
pub workflow_id: i64,
/// Identifier of the related message.
pub message_id: i64,
/// Optional JSON input.
pub input: Option<serde_json::Value>,
}
/// A step entry belonging to a run.
///
/// (De)serializable with serde; with the `sqlx` feature enabled it also
/// derives `sqlx::FromRow`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "sqlx", derive(sqlx::FromRow))]
pub struct StepRecord {
/// Step identifier.
pub id: i64,
/// Identifier of the run this step belongs to.
pub run_id: i64,
/// Name of the step.
pub step_name: String,
/// Current status of the step.
pub status: WorkflowStatus,
/// Optional JSON input.
pub input: Option<serde_json::Value>,
/// Optional JSON output.
pub output: Option<serde_json::Value>,
/// Optional JSON error description.
pub error: Option<serde_json::Value>,
/// Creation timestamp (UTC).
pub created_at: DateTime<Utc>,
/// Last-update timestamp (UTC).
pub updated_at: DateTime<Utc>,
/// NOTE(review): presumably the time at which the next retry is due, if one
/// is scheduled — confirm.
pub retry_at: Option<DateTime<Utc>>,
/// NOTE(review): presumably the number of retries performed so far — confirm.
pub retry_count: i32,
}
/// Data for a new step: owning run, step name and optional input.
///
/// Compared with `StepRecord` it has no `id`, `status`, `output`, `error`,
/// timestamps or retry fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NewStepRecord {
/// Identifier of the run the step belongs to.
pub run_id: i64,
/// Name of the step.
pub step_name: String,
/// Optional JSON input.
pub input: Option<serde_json::Value>,
}