use super::TaskExecutionDAL;
use crate::dal::unified::models::{
NewUnifiedExecutionEvent, NewUnifiedTaskOutbox, UnifiedTaskExecution,
};
use crate::database::schema::unified::{execution_events, task_executions, task_outbox};
use crate::database::universal_types::{UniversalTimestamp, UniversalUuid};
use crate::error::ValidationError;
use crate::models::execution_event::ExecutionEventType;
use diesel::prelude::*;
impl<'a> TaskExecutionDAL<'a> {
pub async fn mark_completed(&self, task_id: UniversalUuid) -> Result<(), ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.mark_completed_postgres(task_id).await,
self.mark_completed_sqlite(task_id).await
)
}
#[cfg(feature = "postgres")]
async fn mark_completed_postgres(&self, task_id: UniversalUuid) -> Result<(), ValidationError> {
use diesel::connection::Connection;
let conn = self
.dal
.database
.get_postgres_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
conn.interact(move |conn| {
conn.transaction::<_, diesel::result::Error, _>(|conn| {
let now = UniversalTimestamp::now();
let task: UnifiedTaskExecution =
task_executions::table.find(task_id).first(conn)?;
diesel::update(task_executions::table.find(task_id))
.set((
task_executions::status.eq("Completed"),
task_executions::completed_at.eq(Some(now)),
task_executions::updated_at.eq(now),
))
.execute(conn)?;
let event = NewUnifiedExecutionEvent {
id: UniversalUuid::new_v4(),
pipeline_execution_id: task.pipeline_execution_id,
task_execution_id: Some(task_id),
event_type: ExecutionEventType::TaskCompleted.as_str().to_string(),
event_data: None,
worker_id: None,
created_at: now,
};
diesel::insert_into(execution_events::table)
.values(&event)
.execute(conn)?;
Ok(())
})
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(())
}
#[cfg(feature = "sqlite")]
async fn mark_completed_sqlite(&self, task_id: UniversalUuid) -> Result<(), ValidationError> {
use diesel::connection::Connection;
let conn = self
.dal
.database
.get_sqlite_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
conn.interact(move |conn| {
conn.transaction::<_, diesel::result::Error, _>(|conn| {
let now = UniversalTimestamp::now();
let task: UnifiedTaskExecution =
task_executions::table.find(task_id).first(conn)?;
diesel::update(task_executions::table.find(task_id))
.set((
task_executions::status.eq("Completed"),
task_executions::completed_at.eq(Some(now)),
task_executions::updated_at.eq(now),
))
.execute(conn)?;
let event = NewUnifiedExecutionEvent {
id: UniversalUuid::new_v4(),
pipeline_execution_id: task.pipeline_execution_id,
task_execution_id: Some(task_id),
event_type: ExecutionEventType::TaskCompleted.as_str().to_string(),
event_data: None,
worker_id: None,
created_at: now,
};
diesel::insert_into(execution_events::table)
.values(&event)
.execute(conn)?;
Ok(())
})
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(())
}
pub async fn mark_failed(
&self,
task_id: UniversalUuid,
error_message: &str,
) -> Result<(), ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.mark_failed_postgres(task_id, error_message).await,
self.mark_failed_sqlite(task_id, error_message).await
)
}
#[cfg(feature = "postgres")]
async fn mark_failed_postgres(
&self,
task_id: UniversalUuid,
error_message: &str,
) -> Result<(), ValidationError> {
use diesel::connection::Connection;
let conn = self
.dal
.database
.get_postgres_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let error_message = error_message.to_string();
conn.interact(move |conn| {
conn.transaction::<_, diesel::result::Error, _>(|conn| {
let now = UniversalTimestamp::now();
let task: UnifiedTaskExecution =
task_executions::table.find(task_id).first(conn)?;
diesel::update(task_executions::table.find(task_id))
.set((
task_executions::status.eq("Failed"),
task_executions::completed_at.eq(Some(now)),
task_executions::last_error.eq(&error_message),
task_executions::updated_at.eq(now),
))
.execute(conn)?;
let event_data = serde_json::json!({ "error": error_message }).to_string();
let event = NewUnifiedExecutionEvent {
id: UniversalUuid::new_v4(),
pipeline_execution_id: task.pipeline_execution_id,
task_execution_id: Some(task_id),
event_type: ExecutionEventType::TaskFailed.as_str().to_string(),
event_data: Some(event_data),
worker_id: None,
created_at: now,
};
diesel::insert_into(execution_events::table)
.values(&event)
.execute(conn)?;
Ok(())
})
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(())
}
#[cfg(feature = "sqlite")]
async fn mark_failed_sqlite(
&self,
task_id: UniversalUuid,
error_message: &str,
) -> Result<(), ValidationError> {
use diesel::connection::Connection;
let conn = self
.dal
.database
.get_sqlite_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let error_message = error_message.to_string();
conn.interact(move |conn| {
conn.transaction::<_, diesel::result::Error, _>(|conn| {
let now = UniversalTimestamp::now();
let task: UnifiedTaskExecution =
task_executions::table.find(task_id).first(conn)?;
diesel::update(task_executions::table.find(task_id))
.set((
task_executions::status.eq("Failed"),
task_executions::completed_at.eq(Some(now)),
task_executions::last_error.eq(&error_message),
task_executions::updated_at.eq(now),
))
.execute(conn)?;
let event_data = serde_json::json!({ "error": error_message }).to_string();
let event = NewUnifiedExecutionEvent {
id: UniversalUuid::new_v4(),
pipeline_execution_id: task.pipeline_execution_id,
task_execution_id: Some(task_id),
event_type: ExecutionEventType::TaskFailed.as_str().to_string(),
event_data: Some(event_data),
worker_id: None,
created_at: now,
};
diesel::insert_into(execution_events::table)
.values(&event)
.execute(conn)?;
Ok(())
})
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(())
}
pub async fn mark_ready(&self, task_id: UniversalUuid) -> Result<(), ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.mark_ready_postgres(task_id).await,
self.mark_ready_sqlite(task_id).await
)
}
#[cfg(feature = "postgres")]
async fn mark_ready_postgres(&self, task_id: UniversalUuid) -> Result<(), ValidationError> {
use diesel::connection::Connection;
let conn = self
.dal
.database
.get_postgres_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
conn.interact(move |conn| {
conn.transaction::<_, diesel::result::Error, _>(|conn| {
let now = UniversalTimestamp::now();
let task: UnifiedTaskExecution =
task_executions::table.find(task_id).first(conn)?;
diesel::update(task_executions::table.find(task_id))
.set((
task_executions::status.eq("Ready"),
task_executions::updated_at.eq(now),
))
.execute(conn)?;
let event = NewUnifiedExecutionEvent {
id: UniversalUuid::new_v4(),
pipeline_execution_id: task.pipeline_execution_id,
task_execution_id: Some(task_id),
event_type: ExecutionEventType::TaskMarkedReady.as_str().to_string(),
event_data: None,
worker_id: None,
created_at: now,
};
diesel::insert_into(execution_events::table)
.values(&event)
.execute(conn)?;
let outbox_entry = NewUnifiedTaskOutbox {
task_execution_id: task_id,
created_at: now,
};
diesel::insert_into(task_outbox::table)
.values(&outbox_entry)
.execute(conn)?;
Ok(())
})
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
tracing::debug!(task_id = %task_id, "Task marked as Ready with outbox entry");
Ok(())
}
#[cfg(feature = "sqlite")]
async fn mark_ready_sqlite(&self, task_id: UniversalUuid) -> Result<(), ValidationError> {
use diesel::connection::Connection;
let conn = self
.dal
.database
.get_sqlite_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
conn.interact(move |conn| {
conn.transaction::<_, diesel::result::Error, _>(|conn| {
let now = UniversalTimestamp::now();
let task: UnifiedTaskExecution =
task_executions::table.find(task_id).first(conn)?;
diesel::update(task_executions::table.find(task_id))
.set((
task_executions::status.eq("Ready"),
task_executions::updated_at.eq(now),
))
.execute(conn)?;
let event = NewUnifiedExecutionEvent {
id: UniversalUuid::new_v4(),
pipeline_execution_id: task.pipeline_execution_id,
task_execution_id: Some(task_id),
event_type: ExecutionEventType::TaskMarkedReady.as_str().to_string(),
event_data: None,
worker_id: None,
created_at: now,
};
diesel::insert_into(execution_events::table)
.values(&event)
.execute(conn)?;
let outbox_entry = NewUnifiedTaskOutbox {
task_execution_id: task_id,
created_at: now,
};
diesel::insert_into(task_outbox::table)
.values(&outbox_entry)
.execute(conn)?;
Ok(())
})
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
tracing::debug!(task_id = %task_id, "Task marked as Ready with outbox entry");
Ok(())
}
pub async fn mark_skipped(
&self,
task_id: UniversalUuid,
reason: &str,
) -> Result<(), ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.mark_skipped_postgres(task_id, reason).await,
self.mark_skipped_sqlite(task_id, reason).await
)
}
#[cfg(feature = "postgres")]
async fn mark_skipped_postgres(
&self,
task_id: UniversalUuid,
reason: &str,
) -> Result<(), ValidationError> {
use diesel::connection::Connection;
let conn = self
.dal
.database
.get_postgres_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let reason = reason.to_string();
let reason_log = reason.clone();
conn.interact(move |conn| {
conn.transaction::<_, diesel::result::Error, _>(|conn| {
let now = UniversalTimestamp::now();
let task: UnifiedTaskExecution =
task_executions::table.find(task_id).first(conn)?;
diesel::update(task_executions::table.find(task_id))
.set((
task_executions::status.eq("Skipped"),
task_executions::error_details.eq(&reason),
task_executions::updated_at.eq(now),
))
.execute(conn)?;
let event_data = serde_json::json!({ "reason": reason }).to_string();
let event = NewUnifiedExecutionEvent {
id: UniversalUuid::new_v4(),
pipeline_execution_id: task.pipeline_execution_id,
task_execution_id: Some(task_id),
event_type: ExecutionEventType::TaskSkipped.as_str().to_string(),
event_data: Some(event_data),
worker_id: None,
created_at: now,
};
diesel::insert_into(execution_events::table)
.values(&event)
.execute(conn)?;
Ok(())
})
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
tracing::info!(task_id = %task_id, reason = %reason_log, "Task marked as Skipped");
Ok(())
}
#[cfg(feature = "sqlite")]
async fn mark_skipped_sqlite(
&self,
task_id: UniversalUuid,
reason: &str,
) -> Result<(), ValidationError> {
use diesel::connection::Connection;
let conn = self
.dal
.database
.get_sqlite_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let reason = reason.to_string();
let reason_log = reason.clone();
conn.interact(move |conn| {
conn.transaction::<_, diesel::result::Error, _>(|conn| {
let now = UniversalTimestamp::now();
let task: UnifiedTaskExecution =
task_executions::table.find(task_id).first(conn)?;
diesel::update(task_executions::table.find(task_id))
.set((
task_executions::status.eq("Skipped"),
task_executions::error_details.eq(&reason),
task_executions::updated_at.eq(now),
))
.execute(conn)?;
let event_data = serde_json::json!({ "reason": reason }).to_string();
let event = NewUnifiedExecutionEvent {
id: UniversalUuid::new_v4(),
pipeline_execution_id: task.pipeline_execution_id,
task_execution_id: Some(task_id),
event_type: ExecutionEventType::TaskSkipped.as_str().to_string(),
event_data: Some(event_data),
worker_id: None,
created_at: now,
};
diesel::insert_into(execution_events::table)
.values(&event)
.execute(conn)?;
Ok(())
})
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
tracing::info!(task_id = %task_id, reason = %reason_log, "Task marked as Skipped");
Ok(())
}
pub async fn mark_abandoned(
&self,
task_id: UniversalUuid,
reason: &str,
) -> Result<(), ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.mark_abandoned_postgres(task_id, reason).await,
self.mark_abandoned_sqlite(task_id, reason).await
)
}
#[cfg(feature = "postgres")]
async fn mark_abandoned_postgres(
&self,
task_id: UniversalUuid,
reason: &str,
) -> Result<(), ValidationError> {
use diesel::connection::Connection;
let conn = self
.dal
.database
.get_postgres_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let reason = reason.to_string();
conn.interact(move |conn| {
conn.transaction::<_, diesel::result::Error, _>(|conn| {
let now = UniversalTimestamp::now();
let task: UnifiedTaskExecution =
task_executions::table.find(task_id).first(conn)?;
diesel::update(task_executions::table.find(task_id))
.set((
task_executions::status.eq("Failed"),
task_executions::completed_at.eq(Some(now)),
task_executions::error_details.eq(format!("ABANDONED: {}", reason)),
task_executions::updated_at.eq(now),
))
.execute(conn)?;
let event_data = serde_json::json!({ "reason": reason }).to_string();
let event = NewUnifiedExecutionEvent {
id: UniversalUuid::new_v4(),
pipeline_execution_id: task.pipeline_execution_id,
task_execution_id: Some(task_id),
event_type: ExecutionEventType::TaskAbandoned.as_str().to_string(),
event_data: Some(event_data),
worker_id: None,
created_at: now,
};
diesel::insert_into(execution_events::table)
.values(&event)
.execute(conn)?;
Ok(())
})
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(())
}
#[cfg(feature = "sqlite")]
async fn mark_abandoned_sqlite(
&self,
task_id: UniversalUuid,
reason: &str,
) -> Result<(), ValidationError> {
use diesel::connection::Connection;
let conn = self
.dal
.database
.get_sqlite_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let reason = reason.to_string();
conn.interact(move |conn| {
conn.transaction::<_, diesel::result::Error, _>(|conn| {
let now = UniversalTimestamp::now();
let task: UnifiedTaskExecution =
task_executions::table.find(task_id).first(conn)?;
diesel::update(task_executions::table.find(task_id))
.set((
task_executions::status.eq("Failed"),
task_executions::completed_at.eq(Some(now)),
task_executions::error_details.eq(format!("ABANDONED: {}", reason)),
task_executions::updated_at.eq(now),
))
.execute(conn)?;
let event_data = serde_json::json!({ "reason": reason }).to_string();
let event = NewUnifiedExecutionEvent {
id: UniversalUuid::new_v4(),
pipeline_execution_id: task.pipeline_execution_id,
task_execution_id: Some(task_id),
event_type: ExecutionEventType::TaskAbandoned.as_str().to_string(),
event_data: Some(event_data),
worker_id: None,
created_at: now,
};
diesel::insert_into(execution_events::table)
.values(&event)
.execute(conn)?;
Ok(())
})
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(())
}
pub async fn set_sub_status(
&self,
task_id: UniversalUuid,
sub_status: Option<&str>,
) -> Result<(), ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.set_sub_status_postgres(task_id, sub_status).await,
self.set_sub_status_sqlite(task_id, sub_status).await
)
}
#[cfg(feature = "postgres")]
async fn set_sub_status_postgres(
&self,
task_id: UniversalUuid,
sub_status: Option<&str>,
) -> Result<(), ValidationError> {
use diesel::connection::Connection;
let conn = self
.dal
.database
.get_postgres_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let sub_status_owned = sub_status.map(|s| s.to_string());
conn.interact(move |conn| {
conn.transaction::<_, diesel::result::Error, _>(|conn| {
let now = UniversalTimestamp::now();
let task: UnifiedTaskExecution =
task_executions::table.find(task_id).first(conn)?;
let was_deferred = task.sub_status.as_deref() == Some("Deferred");
diesel::update(task_executions::table.find(task_id))
.set((
task_executions::sub_status.eq(&sub_status_owned),
task_executions::updated_at.eq(now),
))
.execute(conn)?;
let event_type = match (was_deferred, sub_status_owned.as_deref()) {
(false, Some("Deferred")) => Some(ExecutionEventType::TaskDeferred),
(true, Some("Active") | None) => Some(ExecutionEventType::TaskResumed),
_ => None,
};
if let Some(event_type) = event_type {
let event = NewUnifiedExecutionEvent {
id: UniversalUuid::new_v4(),
pipeline_execution_id: task.pipeline_execution_id,
task_execution_id: Some(task_id),
event_type: event_type.as_str().to_string(),
event_data: None,
worker_id: None,
created_at: now,
};
diesel::insert_into(execution_events::table)
.values(&event)
.execute(conn)?;
}
Ok(())
})
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(())
}
#[cfg(feature = "sqlite")]
async fn set_sub_status_sqlite(
&self,
task_id: UniversalUuid,
sub_status: Option<&str>,
) -> Result<(), ValidationError> {
use diesel::connection::Connection;
let conn = self
.dal
.database
.get_sqlite_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let sub_status_owned = sub_status.map(|s| s.to_string());
conn.interact(move |conn| {
conn.transaction::<_, diesel::result::Error, _>(|conn| {
let now = UniversalTimestamp::now();
let task: UnifiedTaskExecution =
task_executions::table.find(task_id).first(conn)?;
let was_deferred = task.sub_status.as_deref() == Some("Deferred");
diesel::update(task_executions::table.find(task_id))
.set((
task_executions::sub_status.eq(&sub_status_owned),
task_executions::updated_at.eq(now),
))
.execute(conn)?;
let event_type = match (was_deferred, sub_status_owned.as_deref()) {
(false, Some("Deferred")) => Some(ExecutionEventType::TaskDeferred),
(true, Some("Active") | None) => Some(ExecutionEventType::TaskResumed),
_ => None,
};
if let Some(event_type) = event_type {
let event = NewUnifiedExecutionEvent {
id: UniversalUuid::new_v4(),
pipeline_execution_id: task.pipeline_execution_id,
task_execution_id: Some(task_id),
event_type: event_type.as_str().to_string(),
event_data: None,
worker_id: None,
created_at: now,
};
diesel::insert_into(execution_events::table)
.values(&event)
.execute(conn)?;
}
Ok(())
})
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(())
}
pub async fn reset_retry_state(&self, task_id: UniversalUuid) -> Result<(), ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.reset_retry_state_postgres(task_id).await,
self.reset_retry_state_sqlite(task_id).await
)
}
#[cfg(feature = "postgres")]
async fn reset_retry_state_postgres(
&self,
task_id: UniversalUuid,
) -> Result<(), ValidationError> {
use diesel::connection::Connection;
let conn = self
.dal
.database
.get_postgres_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
conn.interact(move |conn| {
conn.transaction::<_, diesel::result::Error, _>(|conn| {
let now = UniversalTimestamp::now();
let task: UnifiedTaskExecution =
task_executions::table.find(task_id).first(conn)?;
diesel::update(task_executions::table.find(task_id))
.set((
task_executions::attempt.eq(1),
task_executions::retry_at.eq(None::<UniversalTimestamp>),
task_executions::started_at.eq(None::<UniversalTimestamp>),
task_executions::completed_at.eq(None::<UniversalTimestamp>),
task_executions::last_error.eq(None::<String>),
task_executions::status.eq("Ready"),
task_executions::updated_at.eq(now),
))
.execute(conn)?;
let event = NewUnifiedExecutionEvent {
id: UniversalUuid::new_v4(),
pipeline_execution_id: task.pipeline_execution_id,
task_execution_id: Some(task_id),
event_type: ExecutionEventType::TaskReset.as_str().to_string(),
event_data: None,
worker_id: None,
created_at: now,
};
diesel::insert_into(execution_events::table)
.values(&event)
.execute(conn)?;
Ok(())
})
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(())
}
#[cfg(feature = "sqlite")]
async fn reset_retry_state_sqlite(
&self,
task_id: UniversalUuid,
) -> Result<(), ValidationError> {
use diesel::connection::Connection;
let conn = self
.dal
.database
.get_sqlite_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
conn.interact(move |conn| {
conn.transaction::<_, diesel::result::Error, _>(|conn| {
let now = UniversalTimestamp::now();
let task: UnifiedTaskExecution =
task_executions::table.find(task_id).first(conn)?;
diesel::update(task_executions::table.find(task_id))
.set((
task_executions::attempt.eq(1),
task_executions::retry_at.eq(None::<UniversalTimestamp>),
task_executions::started_at.eq(None::<UniversalTimestamp>),
task_executions::completed_at.eq(None::<UniversalTimestamp>),
task_executions::last_error.eq(None::<String>),
task_executions::status.eq("Ready"),
task_executions::updated_at.eq(now),
))
.execute(conn)?;
let event = NewUnifiedExecutionEvent {
id: UniversalUuid::new_v4(),
pipeline_execution_id: task.pipeline_execution_id,
task_execution_id: Some(task_id),
event_type: ExecutionEventType::TaskReset.as_str().to_string(),
event_data: None,
worker_id: None,
created_at: now,
};
diesel::insert_into(execution_events::table)
.values(&event)
.execute(conn)?;
Ok(())
})
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(())
}
}