use super::TaskExecutionDAL;
use crate::dal::unified::models::{
NewUnifiedExecutionEvent, NewUnifiedTaskExecution, UnifiedTaskExecution,
};
use crate::database::schema::unified::{execution_events, task_executions};
use crate::database::universal_types::{UniversalTimestamp, UniversalUuid};
use crate::error::ValidationError;
use crate::models::execution_event::ExecutionEventType;
use crate::models::task_execution::{NewTaskExecution, TaskExecution};
use diesel::prelude::*;
impl<'a> TaskExecutionDAL<'a> {
pub async fn create(
&self,
new_task: NewTaskExecution,
) -> Result<TaskExecution, ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.create_postgres(new_task).await,
self.create_sqlite(new_task).await
)
}
#[cfg(feature = "postgres")]
async fn create_postgres(
&self,
new_task: NewTaskExecution,
) -> Result<TaskExecution, ValidationError> {
use diesel::connection::Connection;
let conn = self
.dal
.database
.get_postgres_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let task: UnifiedTaskExecution = conn
.interact(move |conn| {
conn.transaction::<_, diesel::result::Error, _>(|conn| {
let id = UniversalUuid::new_v4();
let now = UniversalTimestamp::now();
let new_unified_task = NewUnifiedTaskExecution {
id,
workflow_execution_id: new_task.workflow_execution_id,
task_name: new_task.task_name,
status: new_task.status,
attempt: new_task.attempt,
max_attempts: new_task.max_attempts,
trigger_rules: new_task.trigger_rules,
task_configuration: new_task.task_configuration,
created_at: now,
updated_at: now,
};
let task: UnifiedTaskExecution = diesel::insert_into(task_executions::table)
.values(&new_unified_task)
.get_result(conn)?;
let event = NewUnifiedExecutionEvent {
id: UniversalUuid::new_v4(),
workflow_execution_id: task.workflow_execution_id,
task_execution_id: Some(task.id),
event_type: ExecutionEventType::TaskCreated.as_str().to_string(),
event_data: None,
worker_id: None,
created_at: now,
};
diesel::insert_into(execution_events::table)
.values(&event)
.execute(conn)?;
Ok(task)
})
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(task.into())
}
#[cfg(feature = "sqlite")]
async fn create_sqlite(
&self,
new_task: NewTaskExecution,
) -> Result<TaskExecution, ValidationError> {
use diesel::connection::Connection;
let conn = self
.dal
.database
.get_sqlite_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let task: UnifiedTaskExecution = conn
.interact(move |conn| {
conn.transaction::<_, diesel::result::Error, _>(|conn| {
let id = UniversalUuid::new_v4();
let now = UniversalTimestamp::now();
let new_unified_task = NewUnifiedTaskExecution {
id,
workflow_execution_id: new_task.workflow_execution_id,
task_name: new_task.task_name,
status: new_task.status,
attempt: new_task.attempt,
max_attempts: new_task.max_attempts,
trigger_rules: new_task.trigger_rules,
task_configuration: new_task.task_configuration,
created_at: now,
updated_at: now,
};
let task: UnifiedTaskExecution = diesel::insert_into(task_executions::table)
.values(&new_unified_task)
.get_result(conn)?;
let event = NewUnifiedExecutionEvent {
id: UniversalUuid::new_v4(),
workflow_execution_id: task.workflow_execution_id,
task_execution_id: Some(task.id),
event_type: ExecutionEventType::TaskCreated.as_str().to_string(),
event_data: None,
worker_id: None,
created_at: now,
};
diesel::insert_into(execution_events::table)
.values(&event)
.execute(conn)?;
Ok(task)
})
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(task.into())
}
pub async fn get_by_id(
&self,
task_id: UniversalUuid,
) -> Result<TaskExecution, ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.get_by_id_postgres(task_id).await,
self.get_by_id_sqlite(task_id).await
)
}
#[cfg(feature = "postgres")]
async fn get_by_id_postgres(
&self,
task_id: UniversalUuid,
) -> Result<TaskExecution, ValidationError> {
let conn = self
.dal
.database
.get_postgres_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let task: UnifiedTaskExecution = conn
.interact(move |conn| task_executions::table.find(task_id).first(conn))
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(task.into())
}
#[cfg(feature = "sqlite")]
async fn get_by_id_sqlite(
&self,
task_id: UniversalUuid,
) -> Result<TaskExecution, ValidationError> {
let conn = self
.dal
.database
.get_sqlite_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let task: UnifiedTaskExecution = conn
.interact(move |conn| task_executions::table.find(task_id).first(conn))
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(task.into())
}
pub async fn get_all_tasks_for_workflow(
&self,
workflow_execution_id: UniversalUuid,
) -> Result<Vec<TaskExecution>, ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.get_all_tasks_for_workflow_postgres(workflow_execution_id)
.await,
self.get_all_tasks_for_workflow_sqlite(workflow_execution_id)
.await
)
}
#[cfg(feature = "postgres")]
async fn get_all_tasks_for_workflow_postgres(
&self,
workflow_execution_id: UniversalUuid,
) -> Result<Vec<TaskExecution>, ValidationError> {
let conn = self
.dal
.database
.get_postgres_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let tasks: Vec<UnifiedTaskExecution> = conn
.interact(move |conn| {
task_executions::table
.filter(task_executions::workflow_execution_id.eq(workflow_execution_id))
.load(conn)
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(tasks.into_iter().map(Into::into).collect())
}
#[cfg(feature = "sqlite")]
async fn get_all_tasks_for_workflow_sqlite(
&self,
workflow_execution_id: UniversalUuid,
) -> Result<Vec<TaskExecution>, ValidationError> {
let conn = self
.dal
.database
.get_sqlite_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let tasks: Vec<UnifiedTaskExecution> = conn
.interact(move |conn| {
task_executions::table
.filter(task_executions::workflow_execution_id.eq(workflow_execution_id))
.load(conn)
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(tasks.into_iter().map(Into::into).collect())
}
}