use super::{ClaimResult, HeartbeatResult, RunnerClaimResult, StaleClaim, TaskExecutionDAL};
use crate::dal::unified::models::{NewUnifiedExecutionEvent, UnifiedTaskExecution};
use crate::database::schema::unified::{execution_events, task_executions, task_outbox};
use crate::database::universal_types::{UniversalTimestamp, UniversalUuid};
use crate::error::ValidationError;
use crate::models::execution_event::ExecutionEventType;
use crate::models::task_execution::TaskExecution;
use diesel::prelude::*;
use uuid::Uuid;
impl<'a> TaskExecutionDAL<'a> {
    /// Re-queues a task execution for another attempt.
    ///
    /// In a single transaction this:
    /// - sets the task back to `Ready` with `attempt = new_attempt` and
    ///   `retry_at = Some(retry_at)`, clearing `started_at` and `completed_at`;
    /// - records a `TaskRetryScheduled` execution event whose `event_data` is the
    ///   JSON object `{"attempt": .., "retry_at": ..}`;
    /// - inserts a `task_outbox` row stamped `created_at = retry_at`, so
    ///   [`Self::claim_ready_task`] will not pick the task up before that time.
    ///
    /// `claimed_by` / `heartbeat_at` are not touched here.
    /// NOTE(review): [`Self::claim_for_runner`] only succeeds while `claimed_by IS NULL`,
    /// so callers presumably also call [`Self::release_runner_claim`] — confirm.
    ///
    /// # Errors
    /// `ValidationError::ConnectionPool` if no connection can be obtained or the
    /// blocking interaction fails. Database errors (including not-found for an
    /// unknown `task_id`) roll the transaction back and are propagated.
    pub async fn schedule_retry(
        &self,
        task_id: UniversalUuid,
        retry_at: UniversalTimestamp,
        new_attempt: i32,
    ) -> Result<(), ValidationError> {
        crate::dispatch_backend!(
            self.dal.backend(),
            self.schedule_retry_postgres(task_id, retry_at, new_attempt)
                .await,
            self.schedule_retry_sqlite(task_id, retry_at, new_attempt)
                .await
        )
    }

    /// PostgreSQL implementation of [`Self::schedule_retry`].
    #[cfg(feature = "postgres")]
    async fn schedule_retry_postgres(
        &self,
        task_id: UniversalUuid,
        retry_at: UniversalTimestamp,
        new_attempt: i32,
    ) -> Result<(), ValidationError> {
        use crate::dal::unified::models::NewUnifiedTaskOutbox;
        use diesel::connection::Connection;
        let conn = self
            .dal
            .database
            .get_postgres_connection()
            .await
            .map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
        conn.interact(move |conn| {
            conn.transaction::<_, diesel::result::Error, _>(|conn| {
                let now = UniversalTimestamp::now();
                // Loaded only to learn the owning workflow for the event row.
                let task: UnifiedTaskExecution =
                    task_executions::table.find(task_id).first(conn)?;
                diesel::update(task_executions::table.find(task_id))
                    .set((
                        task_executions::status.eq("Ready"),
                        task_executions::attempt.eq(new_attempt),
                        task_executions::retry_at.eq(Some(retry_at)),
                        task_executions::started_at.eq(None::<UniversalTimestamp>),
                        task_executions::completed_at.eq(None::<UniversalTimestamp>),
                        task_executions::updated_at.eq(now),
                    ))
                    .execute(conn)?;
                let event_data = serde_json::json!({
                    "attempt": new_attempt,
                    "retry_at": retry_at.to_string()
                })
                .to_string();
                let event = NewUnifiedExecutionEvent {
                    id: UniversalUuid::new_v4(),
                    workflow_execution_id: task.workflow_execution_id,
                    task_execution_id: Some(task_id),
                    event_type: ExecutionEventType::TaskRetryScheduled.as_str().to_string(),
                    event_data: Some(event_data),
                    worker_id: None,
                    created_at: now,
                };
                diesel::insert_into(execution_events::table)
                    .values(&event)
                    .execute(conn)?;
                // The outbox timestamp doubles as the "not before" time for claiming.
                let outbox_entry = NewUnifiedTaskOutbox {
                    task_execution_id: task_id,
                    created_at: retry_at,
                };
                diesel::insert_into(task_outbox::table)
                    .values(&outbox_entry)
                    .execute(conn)?;
                Ok(())
            })
        })
        .await
        .map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
        Ok(())
    }

    /// SQLite implementation of [`Self::schedule_retry`].
    #[cfg(feature = "sqlite")]
    async fn schedule_retry_sqlite(
        &self,
        task_id: UniversalUuid,
        retry_at: UniversalTimestamp,
        new_attempt: i32,
    ) -> Result<(), ValidationError> {
        use crate::dal::unified::models::NewUnifiedTaskOutbox;
        let conn = self
            .dal
            .database
            .get_sqlite_connection()
            .await
            .map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
        conn.interact(move |conn| {
            // BEGIN IMMEDIATE takes the write lock up front. This transaction reads
            // before it writes, and a deferred transaction can fail with SQLITE_BUSY
            // when it tries to upgrade while another connection is writing.
            conn.immediate_transaction::<_, diesel::result::Error, _>(|conn| {
                let now = UniversalTimestamp::now();
                // Loaded only to learn the owning workflow for the event row.
                let task: UnifiedTaskExecution =
                    task_executions::table.find(task_id).first(conn)?;
                diesel::update(task_executions::table.find(task_id))
                    .set((
                        task_executions::status.eq("Ready"),
                        task_executions::attempt.eq(new_attempt),
                        task_executions::retry_at.eq(Some(retry_at)),
                        task_executions::started_at.eq(None::<UniversalTimestamp>),
                        task_executions::completed_at.eq(None::<UniversalTimestamp>),
                        task_executions::updated_at.eq(now),
                    ))
                    .execute(conn)?;
                let event_data = serde_json::json!({
                    "attempt": new_attempt,
                    "retry_at": retry_at.to_string()
                })
                .to_string();
                let event = NewUnifiedExecutionEvent {
                    id: UniversalUuid::new_v4(),
                    workflow_execution_id: task.workflow_execution_id,
                    task_execution_id: Some(task_id),
                    event_type: ExecutionEventType::TaskRetryScheduled.as_str().to_string(),
                    event_data: Some(event_data),
                    worker_id: None,
                    created_at: now,
                };
                diesel::insert_into(execution_events::table)
                    .values(&event)
                    .execute(conn)?;
                // The outbox timestamp doubles as the "not before" time for claiming.
                let outbox_entry = NewUnifiedTaskOutbox {
                    task_execution_id: task_id,
                    created_at: retry_at,
                };
                diesel::insert_into(task_outbox::table)
                    .values(&outbox_entry)
                    .execute(conn)?;
                Ok(())
            })
        })
        .await
        .map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
        Ok(())
    }

    /// Claims up to `limit` due tasks from the outbox and marks them `Running`.
    ///
    /// "Due" means the outbox row's `created_at` is not in the future. PostgreSQL
    /// compares against the database clock (`NOW()`), SQLite against the
    /// application clock. Entries are taken oldest first.
    ///
    /// For every claimed task this:
    /// - deletes its outbox row;
    /// - sets `status = 'Running'` and `started_at` / `updated_at`;
    /// - records a `TaskClaimed` execution event.
    ///
    /// On PostgreSQL, outbox rows locked by a concurrent claimer are skipped
    /// (`FOR UPDATE SKIP LOCKED`), so the same entry is never handed out twice.
    ///
    /// # Errors
    /// `ValidationError::ConnectionPool` on pool/interaction failure. Database
    /// errors are propagated and the whole claim is rolled back.
    pub async fn claim_ready_task(
        &self,
        limit: usize,
    ) -> Result<Vec<ClaimResult>, ValidationError> {
        crate::dispatch_backend!(
            self.dal.backend(),
            self.claim_ready_task_postgres(limit).await,
            self.claim_ready_task_sqlite(limit).await
        )
    }

    /// Converts a caller-supplied row limit to the `BIGINT` used in SQL `LIMIT`.
    ///
    /// A plain `as i64` cast wraps values above `i64::MAX` to negative numbers.
    /// PostgreSQL rejects a negative LIMIT and SQLite treats it as "no limit".
    /// Saturating keeps the caller's intent of "a very large limit".
    #[cfg(any(feature = "postgres", feature = "sqlite"))]
    fn sql_limit(limit: usize) -> i64 {
        i64::try_from(limit).unwrap_or(i64::MAX)
    }

    /// PostgreSQL implementation of [`Self::claim_ready_task`].
    #[cfg(feature = "postgres")]
    async fn claim_ready_task_postgres(
        &self,
        limit: usize,
    ) -> Result<Vec<ClaimResult>, ValidationError> {
        use diesel::connection::Connection;

        /// Row shape returned by `CLAIM_SQL`'s `RETURNING` clause.
        #[derive(Debug, QueryableByName, Clone)]
        #[diesel(check_for_backend(diesel::pg::Pg))]
        struct PgClaimResult {
            #[diesel(sql_type = diesel::sql_types::Uuid)]
            id: Uuid,
            #[diesel(sql_type = diesel::sql_types::Uuid)]
            workflow_execution_id: Uuid,
            #[diesel(sql_type = diesel::sql_types::Text)]
            task_name: String,
            #[diesel(sql_type = diesel::sql_types::Integer)]
            attempt: i32,
        }

        // This one statement deletes the due outbox rows, skipping rows another
        // claimer has locked, and flips the matching task executions to Running.
        // The limit is bound as `$1` rather than formatted into the SQL text.
        const CLAIM_SQL: &str = r#"
            WITH claimed_outbox AS (
                DELETE FROM task_outbox
                WHERE id IN (
                    SELECT id FROM task_outbox
                    WHERE created_at <= NOW()
                    ORDER BY created_at ASC
                    LIMIT $1
                    FOR UPDATE SKIP LOCKED
                )
                RETURNING task_execution_id
            )
            UPDATE task_executions
            SET status = 'Running', started_at = NOW(), updated_at = NOW()
            FROM claimed_outbox
            WHERE task_executions.id = claimed_outbox.task_execution_id
            RETURNING task_executions.id, task_executions.workflow_execution_id, task_executions.task_name, task_executions.attempt
        "#;

        let conn = self
            .dal
            .database
            .get_postgres_connection()
            .await
            .map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
        let limit = Self::sql_limit(limit);
        let pg_results: Vec<PgClaimResult> = conn
            .interact(move |conn| {
                conn.transaction::<_, diesel::result::Error, _>(|conn| {
                    let now = UniversalTimestamp::now();
                    let claimed: Vec<PgClaimResult> = diesel::sql_query(CLAIM_SQL)
                        .bind::<diesel::sql_types::BigInt, _>(limit)
                        .load(conn)?;
                    for task in &claimed {
                        let event = NewUnifiedExecutionEvent {
                            id: UniversalUuid::new_v4(),
                            workflow_execution_id: UniversalUuid(task.workflow_execution_id),
                            task_execution_id: Some(UniversalUuid(task.id)),
                            event_type: ExecutionEventType::TaskClaimed.as_str().to_string(),
                            event_data: None,
                            worker_id: None,
                            created_at: now,
                        };
                        diesel::insert_into(execution_events::table)
                            .values(&event)
                            .execute(conn)?;
                    }
                    Ok(claimed)
                })
            })
            .await
            .map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
        Ok(pg_results
            .into_iter()
            .map(|pg| ClaimResult {
                id: UniversalUuid(pg.id),
                workflow_execution_id: UniversalUuid(pg.workflow_execution_id),
                task_name: pg.task_name,
                attempt: pg.attempt,
            })
            .collect())
    }

    /// SQLite implementation of [`Self::claim_ready_task`].
    #[cfg(feature = "sqlite")]
    async fn claim_ready_task_sqlite(
        &self,
        limit: usize,
    ) -> Result<Vec<ClaimResult>, ValidationError> {
        use crate::dal::unified::models::UnifiedTaskOutbox;
        let conn = self
            .dal
            .database
            .get_sqlite_connection()
            .await
            .map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
        let limit = Self::sql_limit(limit);
        let tasks: Vec<UnifiedTaskExecution> = conn
            .interact(
                move |conn| -> Result<Vec<UnifiedTaskExecution>, diesel::result::Error> {
                    // BEGIN IMMEDIATE takes the write lock up front. This makes
                    // concurrent claimers on other pooled connections wait (subject to
                    // busy_timeout) instead of both reading the same outbox rows and
                    // then failing with SQLITE_BUSY on the upgrade to a writer.
                    conn.immediate_transaction::<Vec<UnifiedTaskExecution>, diesel::result::Error, _>(
                        |conn| {
                            let now = UniversalTimestamp::now();
                            let outbox_entries: Vec<UnifiedTaskOutbox> = task_outbox::table
                                .filter(task_outbox::created_at.le(now))
                                .order(task_outbox::created_at.asc())
                                .limit(limit)
                                .load(conn)?;
                            if outbox_entries.is_empty() {
                                return Ok(Vec::new());
                            }
                            let task_ids: Vec<_> =
                                outbox_entries.iter().map(|o| o.task_execution_id).collect();
                            let outbox_ids: Vec<_> =
                                outbox_entries.iter().map(|o| o.id).collect();
                            diesel::delete(task_outbox::table)
                                .filter(task_outbox::id.eq_any(&outbox_ids))
                                .execute(conn)?;
                            // Loaded before the update. Only identity, name and attempt
                            // are used from these rows, and the update does not change them.
                            let claimed_tasks: Vec<UnifiedTaskExecution> =
                                task_executions::table
                                    .filter(task_executions::id.eq_any(&task_ids))
                                    .load(conn)?;
                            diesel::update(task_executions::table)
                                .filter(task_executions::id.eq_any(&task_ids))
                                .set((
                                    task_executions::status.eq("Running"),
                                    task_executions::started_at.eq(Some(now)),
                                    task_executions::updated_at.eq(now),
                                ))
                                .execute(conn)?;
                            for task in &claimed_tasks {
                                let event = NewUnifiedExecutionEvent {
                                    id: UniversalUuid::new_v4(),
                                    workflow_execution_id: task.workflow_execution_id,
                                    task_execution_id: Some(task.id),
                                    event_type: ExecutionEventType::TaskClaimed
                                        .as_str()
                                        .to_string(),
                                    event_data: None,
                                    worker_id: None,
                                    created_at: now,
                                };
                                diesel::insert_into(execution_events::table)
                                    .values(&event)
                                    .execute(conn)?;
                            }
                            Ok(claimed_tasks)
                        },
                    )
                },
            )
            .await
            .map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
        Ok(tasks
            .into_iter()
            .map(|task| ClaimResult {
                id: task.id,
                workflow_execution_id: task.workflow_execution_id,
                task_name: task.task_name,
                attempt: task.attempt,
            })
            .collect())
    }

    /// Assigns `task_id` to `runner_id` if no runner currently holds it.
    ///
    /// This is a single conditional `UPDATE ... WHERE claimed_by IS NULL`. It also
    /// stamps `heartbeat_at` and `updated_at` with the current time. The task's
    /// `status` is not consulted.
    ///
    /// Returns [`RunnerClaimResult::Claimed`] when a row was updated. Otherwise it
    /// returns [`RunnerClaimResult::AlreadyClaimed`], which also covers an unknown
    /// `task_id`.
    pub async fn claim_for_runner(
        &self,
        task_id: UniversalUuid,
        runner_id: UniversalUuid,
    ) -> Result<RunnerClaimResult, ValidationError> {
        crate::dispatch_backend!(
            self.dal.backend(),
            self.claim_for_runner_postgres(task_id, runner_id).await,
            self.claim_for_runner_sqlite(task_id, runner_id).await
        )
    }

    /// PostgreSQL implementation of [`Self::claim_for_runner`].
    #[cfg(feature = "postgres")]
    async fn claim_for_runner_postgres(
        &self,
        task_id: UniversalUuid,
        runner_id: UniversalUuid,
    ) -> Result<RunnerClaimResult, ValidationError> {
        let conn = self
            .dal
            .database
            .get_postgres_connection()
            .await
            .map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
        let now = UniversalTimestamp::now();
        let rows_updated: usize = conn
            .interact(move |conn| {
                diesel::update(
                    task_executions::table
                        .find(task_id)
                        .filter(task_executions::claimed_by.is_null()),
                )
                .set((
                    task_executions::claimed_by.eq(Some(runner_id)),
                    task_executions::heartbeat_at.eq(Some(now)),
                    task_executions::updated_at.eq(now),
                ))
                .execute(conn)
            })
            .await
            .map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
        Ok(if rows_updated > 0 {
            RunnerClaimResult::Claimed
        } else {
            RunnerClaimResult::AlreadyClaimed
        })
    }

    /// SQLite implementation of [`Self::claim_for_runner`].
    #[cfg(feature = "sqlite")]
    async fn claim_for_runner_sqlite(
        &self,
        task_id: UniversalUuid,
        runner_id: UniversalUuid,
    ) -> Result<RunnerClaimResult, ValidationError> {
        let conn = self
            .dal
            .database
            .get_sqlite_connection()
            .await
            .map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
        let now = UniversalTimestamp::now();
        let rows_updated: usize = conn
            .interact(move |conn| {
                diesel::update(
                    task_executions::table
                        .find(task_id)
                        .filter(task_executions::claimed_by.is_null()),
                )
                .set((
                    task_executions::claimed_by.eq(Some(runner_id)),
                    task_executions::heartbeat_at.eq(Some(now)),
                    task_executions::updated_at.eq(now),
                ))
                .execute(conn)
            })
            .await
            .map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
        Ok(if rows_updated > 0 {
            RunnerClaimResult::Claimed
        } else {
            RunnerClaimResult::AlreadyClaimed
        })
    }

    /// Refreshes `heartbeat_at` for a task claimed by `runner_id`.
    ///
    /// The update only applies while `claimed_by == runner_id`. Returns
    /// [`HeartbeatResult::ClaimLost`] when no row matched, i.e. the claim belongs
    /// to someone else, was released, or the task does not exist.
    pub async fn heartbeat(
        &self,
        task_id: UniversalUuid,
        runner_id: UniversalUuid,
    ) -> Result<HeartbeatResult, ValidationError> {
        crate::dispatch_backend!(
            self.dal.backend(),
            self.heartbeat_postgres(task_id, runner_id).await,
            self.heartbeat_sqlite(task_id, runner_id).await
        )
    }

    /// PostgreSQL implementation of [`Self::heartbeat`].
    #[cfg(feature = "postgres")]
    async fn heartbeat_postgres(
        &self,
        task_id: UniversalUuid,
        runner_id: UniversalUuid,
    ) -> Result<HeartbeatResult, ValidationError> {
        let conn = self
            .dal
            .database
            .get_postgres_connection()
            .await
            .map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
        let now = UniversalTimestamp::now();
        let rows_updated: usize = conn
            .interact(move |conn| {
                diesel::update(
                    task_executions::table
                        .find(task_id)
                        .filter(task_executions::claimed_by.eq(Some(runner_id))),
                )
                .set((
                    task_executions::heartbeat_at.eq(Some(now)),
                    task_executions::updated_at.eq(now),
                ))
                .execute(conn)
            })
            .await
            .map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
        Ok(if rows_updated > 0 {
            HeartbeatResult::Ok
        } else {
            HeartbeatResult::ClaimLost
        })
    }

    /// SQLite implementation of [`Self::heartbeat`].
    #[cfg(feature = "sqlite")]
    async fn heartbeat_sqlite(
        &self,
        task_id: UniversalUuid,
        runner_id: UniversalUuid,
    ) -> Result<HeartbeatResult, ValidationError> {
        let conn = self
            .dal
            .database
            .get_sqlite_connection()
            .await
            .map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
        let now = UniversalTimestamp::now();
        let rows_updated: usize = conn
            .interact(move |conn| {
                diesel::update(
                    task_executions::table
                        .find(task_id)
                        .filter(task_executions::claimed_by.eq(Some(runner_id))),
                )
                .set((
                    task_executions::heartbeat_at.eq(Some(now)),
                    task_executions::updated_at.eq(now),
                ))
                .execute(conn)
            })
            .await
            .map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
        Ok(if rows_updated > 0 {
            HeartbeatResult::Ok
        } else {
            HeartbeatResult::ClaimLost
        })
    }

    /// Clears `claimed_by` and `heartbeat_at` on a task.
    ///
    /// The release is unconditional: it does not check which runner holds the
    /// claim. It also succeeds (as a no-op) when `task_id` does not exist.
    pub async fn release_runner_claim(
        &self,
        task_id: UniversalUuid,
    ) -> Result<(), ValidationError> {
        crate::dispatch_backend!(
            self.dal.backend(),
            self.release_runner_claim_postgres(task_id).await,
            self.release_runner_claim_sqlite(task_id).await
        )
    }

    /// PostgreSQL implementation of [`Self::release_runner_claim`].
    #[cfg(feature = "postgres")]
    async fn release_runner_claim_postgres(
        &self,
        task_id: UniversalUuid,
    ) -> Result<(), ValidationError> {
        let conn = self
            .dal
            .database
            .get_postgres_connection()
            .await
            .map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
        let now = UniversalTimestamp::now();
        conn.interact(move |conn| {
            diesel::update(task_executions::table.find(task_id))
                .set((
                    task_executions::claimed_by.eq(None::<UniversalUuid>),
                    task_executions::heartbeat_at.eq(None::<UniversalTimestamp>),
                    task_executions::updated_at.eq(now),
                ))
                .execute(conn)
        })
        .await
        .map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
        Ok(())
    }

    /// SQLite implementation of [`Self::release_runner_claim`].
    #[cfg(feature = "sqlite")]
    async fn release_runner_claim_sqlite(
        &self,
        task_id: UniversalUuid,
    ) -> Result<(), ValidationError> {
        let conn = self
            .dal
            .database
            .get_sqlite_connection()
            .await
            .map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
        let now = UniversalTimestamp::now();
        conn.interact(move |conn| {
            diesel::update(task_executions::table.find(task_id))
                .set((
                    task_executions::claimed_by.eq(None::<UniversalUuid>),
                    task_executions::heartbeat_at.eq(None::<UniversalTimestamp>),
                    task_executions::updated_at.eq(now),
                ))
                .execute(conn)
        })
        .await
        .map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
        Ok(())
    }

    /// Finds `Running` tasks whose runner claim has gone quiet.
    ///
    /// A claim is stale when `claimed_by` is set and `heartbeat_at` is strictly
    /// older than `now - threshold`. A `threshold` too large to subtract from the
    /// current time yields an empty list, since no heartbeat can be that old.
    /// Previously such a threshold either fell back to 60 seconds or panicked.
    pub async fn find_stale_claims(
        &self,
        threshold: std::time::Duration,
    ) -> Result<Vec<StaleClaim>, ValidationError> {
        crate::dispatch_backend!(
            self.dal.backend(),
            self.find_stale_claims_postgres(threshold).await,
            self.find_stale_claims_sqlite(threshold).await
        )
    }

    /// Returns the instant `threshold` before now, or `None` when that instant
    /// cannot be represented.
    ///
    /// Either `threshold` does not fit in a `chrono::Duration`, or subtracting it
    /// would overflow the `DateTime` range.
    #[cfg(any(feature = "postgres", feature = "sqlite"))]
    fn stale_cutoff(threshold: std::time::Duration) -> Option<UniversalTimestamp> {
        let age = chrono::Duration::from_std(threshold).ok()?;
        chrono::Utc::now()
            .checked_sub_signed(age)
            .map(UniversalTimestamp)
    }

    /// PostgreSQL implementation of [`Self::find_stale_claims`].
    #[cfg(feature = "postgres")]
    async fn find_stale_claims_postgres(
        &self,
        threshold: std::time::Duration,
    ) -> Result<Vec<StaleClaim>, ValidationError> {
        let cutoff = match Self::stale_cutoff(threshold) {
            Some(cutoff) => cutoff,
            // The cutoff lies before every representable instant: nothing is stale.
            None => return Ok(Vec::new()),
        };
        let conn = self
            .dal
            .database
            .get_postgres_connection()
            .await
            .map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
        let stale: Vec<UnifiedTaskExecution> = conn
            .interact(move |conn| {
                task_executions::table
                    .filter(task_executions::claimed_by.is_not_null())
                    .filter(task_executions::heartbeat_at.lt(Some(cutoff)))
                    .filter(task_executions::status.eq("Running"))
                    .load(conn)
            })
            .await
            .map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
        Ok(stale
            .into_iter()
            .filter_map(|t| {
                Some(StaleClaim {
                    task_id: t.id,
                    claimed_by: t.claimed_by?,
                    heartbeat_at: t.heartbeat_at?.0,
                })
            })
            .collect())
    }

    /// SQLite implementation of [`Self::find_stale_claims`].
    #[cfg(feature = "sqlite")]
    async fn find_stale_claims_sqlite(
        &self,
        threshold: std::time::Duration,
    ) -> Result<Vec<StaleClaim>, ValidationError> {
        let cutoff = match Self::stale_cutoff(threshold) {
            Some(cutoff) => cutoff,
            // The cutoff lies before every representable instant: nothing is stale.
            None => return Ok(Vec::new()),
        };
        let conn = self
            .dal
            .database
            .get_sqlite_connection()
            .await
            .map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
        let stale: Vec<UnifiedTaskExecution> = conn
            .interact(move |conn| {
                task_executions::table
                    .filter(task_executions::claimed_by.is_not_null())
                    .filter(task_executions::heartbeat_at.lt(Some(cutoff)))
                    .filter(task_executions::status.eq("Running"))
                    .load(conn)
            })
            .await
            .map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
        Ok(stale
            .into_iter()
            .filter_map(|t| {
                Some(StaleClaim {
                    task_id: t.id,
                    claimed_by: t.claimed_by?,
                    heartbeat_at: t.heartbeat_at?.0,
                })
            })
            .collect())
    }

    /// Lists tasks in `Ready` status whose `retry_at` is unset or not in the future.
    ///
    /// This is a read-only query. It does not claim or modify the returned tasks.
    pub async fn get_ready_for_retry(&self) -> Result<Vec<TaskExecution>, ValidationError> {
        crate::dispatch_backend!(
            self.dal.backend(),
            self.get_ready_for_retry_postgres().await,
            self.get_ready_for_retry_sqlite().await
        )
    }

    /// PostgreSQL implementation of [`Self::get_ready_for_retry`].
    #[cfg(feature = "postgres")]
    async fn get_ready_for_retry_postgres(&self) -> Result<Vec<TaskExecution>, ValidationError> {
        let conn = self
            .dal
            .database
            .get_postgres_connection()
            .await
            .map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
        let now = UniversalTimestamp::now();
        let ready_tasks: Vec<UnifiedTaskExecution> = conn
            .interact(move |conn| {
                task_executions::table
                    .filter(task_executions::status.eq("Ready"))
                    .filter(
                        task_executions::retry_at
                            .is_null()
                            .or(task_executions::retry_at.le(now)),
                    )
                    .load(conn)
            })
            .await
            .map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
        Ok(ready_tasks.into_iter().map(Into::into).collect())
    }

    /// SQLite implementation of [`Self::get_ready_for_retry`].
    #[cfg(feature = "sqlite")]
    async fn get_ready_for_retry_sqlite(&self) -> Result<Vec<TaskExecution>, ValidationError> {
        let conn = self
            .dal
            .database
            .get_sqlite_connection()
            .await
            .map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
        let now = UniversalTimestamp::now();
        let ready_tasks: Vec<UnifiedTaskExecution> = conn
            .interact(move |conn| {
                task_executions::table
                    .filter(task_executions::status.eq("Ready"))
                    .filter(
                        task_executions::retry_at
                            .is_null()
                            .or(task_executions::retry_at.le(now)),
                    )
                    .load(conn)
            })
            .await
            .map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
        Ok(ready_tasks.into_iter().map(Into::into).collect())
    }
}