mod crud;
use super::DAL;
use crate::database::universal_types::UniversalUuid;
use crate::error::ValidationError;
use crate::models::schedule::{NewScheduleExecution, ScheduleExecution};
use chrono::{DateTime, Utc};
/// Aggregate counters returned by `ScheduleExecutionDAL::get_execution_stats`.
///
/// The struct is plain data (three counters and one ratio), so it derives
/// `Clone` and `PartialEq` in addition to `Debug`; callers can snapshot it or
/// compare two results directly. `Eq` is not derived because `success_rate`
/// is an `f64`.
#[derive(Debug, Clone, PartialEq)]
pub struct ScheduleExecutionStats {
    /// Number of executions counted for the requested window.
    pub total_executions: i64,
    /// Number of counted executions that the backend classifies as successful.
    // NOTE(review): the exact success criterion lives in the backend query — confirm there.
    pub successful_executions: i64,
    /// Number of counted executions that the backend classifies as lost.
    // NOTE(review): the exact "lost" criterion lives in the backend query — confirm there.
    pub lost_executions: i64,
    /// Success rate expressed as a percentage: the in-file tests expect `50.0`
    /// for one success out of two executions and `0.0` when nothing was counted.
    pub success_rate: f64,
}
/// Data-access handle for schedule execution records.
///
/// Holds a shared borrow of the parent [`DAL`]; the methods on this type read
/// `self.dal.backend()` to choose between the Postgres and SQLite code paths.
#[derive(Clone)]
pub struct ScheduleExecutionDAL<'a> {
/// Borrowed parent DAL that supplies the backend selector.
dal: &'a DAL,
}
impl<'a> ScheduleExecutionDAL<'a> {
pub fn new(dal: &'a DAL) -> Self {
Self { dal }
}
pub async fn create(
&self,
new_execution: NewScheduleExecution,
) -> Result<ScheduleExecution, ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.create_postgres(new_execution).await,
self.create_sqlite(new_execution).await
)
}
pub async fn get_by_id(&self, id: UniversalUuid) -> Result<ScheduleExecution, ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.get_by_id_postgres(id).await,
self.get_by_id_sqlite(id).await
)
}
pub async fn list_by_schedule(
&self,
schedule_id: UniversalUuid,
limit: i64,
offset: i64,
) -> Result<Vec<ScheduleExecution>, ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.list_by_schedule_postgres(schedule_id, limit, offset)
.await,
self.list_by_schedule_sqlite(schedule_id, limit, offset)
.await
)
}
pub async fn complete(
&self,
id: UniversalUuid,
completed_at: DateTime<Utc>,
) -> Result<(), ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.complete_postgres(id, completed_at).await,
self.complete_sqlite(id, completed_at).await
)
}
pub async fn has_active_execution(
&self,
schedule_id: UniversalUuid,
context_hash: &str,
) -> Result<bool, ValidationError> {
let context_hash_owned = context_hash.to_string();
crate::dispatch_backend!(
self.dal.backend(),
self.has_active_execution_postgres(schedule_id, context_hash_owned.clone())
.await,
self.has_active_execution_sqlite(schedule_id, context_hash_owned)
.await
)
}
pub async fn update_workflow_execution_id(
&self,
id: UniversalUuid,
workflow_execution_id: UniversalUuid,
) -> Result<(), ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.update_workflow_execution_id_postgres(id, workflow_execution_id)
.await,
self.update_workflow_execution_id_sqlite(id, workflow_execution_id)
.await
)
}
pub async fn find_lost_executions(
&self,
older_than_minutes: i32,
) -> Result<Vec<ScheduleExecution>, ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.find_lost_executions_postgres(older_than_minutes).await,
self.find_lost_executions_sqlite(older_than_minutes).await
)
}
pub async fn get_latest_by_schedule(
&self,
schedule_id: UniversalUuid,
) -> Result<Option<ScheduleExecution>, ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.get_latest_by_schedule_postgres(schedule_id).await,
self.get_latest_by_schedule_sqlite(schedule_id).await
)
}
pub async fn get_execution_stats(
&self,
since: DateTime<Utc>,
) -> Result<ScheduleExecutionStats, ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.get_execution_stats_postgres(since).await,
self.get_execution_stats_sqlite(since).await
)
}
}
// Every test and helper in this module needs the in-memory SQLite backend, so
// the module is gated once on the `sqlite` feature. Gating the module (rather
// than each item) also keeps the imports below from becoming unused when the
// crate is tested with SQLite disabled.
#[cfg(all(test, feature = "sqlite"))]
mod tests {
    use super::*;
    use crate::database::universal_types::UniversalTimestamp;
    use crate::database::Database;
    use crate::models::schedule::{NewSchedule, NewScheduleExecution};
    use crate::models::workflow_execution::NewWorkflowExecution;

    /// Builds a DAL backed by a uniquely named in-memory SQLite database with
    /// migrations applied, so tests do not share state.
    async fn unique_dal() -> DAL {
        let url = format!(
            "file:sched_exec_test_{}?mode=memory&cache=shared",
            uuid::Uuid::new_v4()
        );
        let db = Database::new(&url, "", 5);
        db.run_migrations()
            .await
            .expect("migrations should succeed");
        DAL::new(db)
    }

    /// Inserts an hourly cron schedule for `test_wf` and returns its id.
    async fn create_schedule(dal: &DAL) -> UniversalUuid {
        let next_run = UniversalTimestamp::now();
        let schedule = dal
            .schedule()
            .create(NewSchedule::cron("test_wf", "0 * * * *", next_run))
            .await
            .unwrap();
        schedule.id
    }

    /// Inserts a workflow execution with the given name and status and
    /// returns its id.
    async fn create_workflow_execution(dal: &DAL, name: &str, status: &str) -> UniversalUuid {
        let wf_exec = dal
            .workflow_execution()
            .create(NewWorkflowExecution {
                workflow_name: name.to_string(),
                workflow_version: "1.0".to_string(),
                status: status.to_string(),
                context_id: None,
            })
            .await
            .unwrap();
        wf_exec.id
    }

    /// Builds an unlinked execution for `schedule_id`, stamped "now" and given
    /// a random context hash.
    fn new_exec(schedule_id: UniversalUuid) -> NewScheduleExecution {
        NewScheduleExecution {
            schedule_id,
            workflow_execution_id: None,
            scheduled_time: Some(UniversalTimestamp::now()),
            claimed_at: Some(UniversalTimestamp::now()),
            context_hash: Some(uuid::Uuid::new_v4().to_string()),
        }
    }

    /// A created execution echoes its inputs and starts out uncompleted.
    #[tokio::test]
    async fn test_create_execution() {
        let dal = unique_dal().await;
        let sched_id = create_schedule(&dal).await;
        let exec = dal
            .schedule_execution()
            .create(new_exec(sched_id))
            .await
            .unwrap();
        assert_eq!(exec.schedule_id, sched_id);
        assert!(exec.workflow_execution_id.is_none());
        assert!(exec.completed_at.is_none());
        assert!(exec.scheduled_time.is_some());
        assert!(exec.context_hash.is_some());
    }

    /// A stored execution can be fetched back by its id.
    #[tokio::test]
    async fn test_get_by_id() {
        let dal = unique_dal().await;
        let sched_id = create_schedule(&dal).await;
        let created = dal
            .schedule_execution()
            .create(new_exec(sched_id))
            .await
            .unwrap();
        let fetched = dal
            .schedule_execution()
            .get_by_id(created.id)
            .await
            .unwrap();
        assert_eq!(fetched.id, created.id);
        assert_eq!(fetched.schedule_id, sched_id);
    }

    /// Looking up an unknown id yields an error.
    #[tokio::test]
    async fn test_get_by_id_not_found() {
        let dal = unique_dal().await;
        let result = dal
            .schedule_execution()
            .get_by_id(UniversalUuid::new_v4())
            .await;
        assert!(result.is_err());
    }

    /// Listing is scoped to one schedule and honours `limit` and `offset`.
    #[tokio::test]
    async fn test_list_by_schedule() {
        let dal = unique_dal().await;
        let sched_id = create_schedule(&dal).await;
        let other_sched_id = create_schedule(&dal).await;
        for _ in 0..3 {
            dal.schedule_execution()
                .create(new_exec(sched_id))
                .await
                .unwrap();
        }
        // Row on a different schedule; it must not appear in the listings below.
        dal.schedule_execution()
            .create(new_exec(other_sched_id))
            .await
            .unwrap();

        let list = dal
            .schedule_execution()
            .list_by_schedule(sched_id, 100, 0)
            .await
            .unwrap();
        assert_eq!(list.len(), 3);

        let limited = dal
            .schedule_execution()
            .list_by_schedule(sched_id, 2, 0)
            .await
            .unwrap();
        assert_eq!(limited.len(), 2);

        let offset = dal
            .schedule_execution()
            .list_by_schedule(sched_id, 100, 2)
            .await
            .unwrap();
        assert_eq!(offset.len(), 1);
    }

    /// Completing an execution populates `completed_at`.
    #[tokio::test]
    async fn test_complete_execution() {
        let dal = unique_dal().await;
        let sched_id = create_schedule(&dal).await;
        let exec = dal
            .schedule_execution()
            .create(new_exec(sched_id))
            .await
            .unwrap();
        assert!(exec.completed_at.is_none());

        let completed_at = Utc::now();
        dal.schedule_execution()
            .complete(exec.id, completed_at)
            .await
            .unwrap();

        let updated = dal.schedule_execution().get_by_id(exec.id).await.unwrap();
        assert!(updated.completed_at.is_some());
    }

    /// An uncompleted execution is active only for its own context hash.
    #[tokio::test]
    async fn test_has_active_execution() {
        let dal = unique_dal().await;
        let sched_id = create_schedule(&dal).await;

        let active = dal
            .schedule_execution()
            .has_active_execution(sched_id, "hash1")
            .await
            .unwrap();
        assert!(!active);

        let mut ne = new_exec(sched_id);
        ne.context_hash = Some("hash1".to_string());
        dal.schedule_execution().create(ne).await.unwrap();

        let active = dal
            .schedule_execution()
            .has_active_execution(sched_id, "hash1")
            .await
            .unwrap();
        assert!(active);

        let active_other = dal
            .schedule_execution()
            .has_active_execution(sched_id, "hash_other")
            .await
            .unwrap();
        assert!(!active_other);
    }

    /// A completed execution no longer counts as active.
    #[tokio::test]
    async fn test_has_active_execution_completed_not_active() {
        let dal = unique_dal().await;
        let sched_id = create_schedule(&dal).await;

        let mut ne = new_exec(sched_id);
        ne.context_hash = Some("hash_done".to_string());
        let exec = dal.schedule_execution().create(ne).await.unwrap();
        dal.schedule_execution()
            .complete(exec.id, Utc::now())
            .await
            .unwrap();

        let active = dal
            .schedule_execution()
            .has_active_execution(sched_id, "hash_done")
            .await
            .unwrap();
        assert!(!active);
    }

    /// Linking a workflow execution is visible on a subsequent read.
    #[tokio::test]
    async fn test_update_workflow_execution_id() {
        let dal = unique_dal().await;
        let sched_id = create_schedule(&dal).await;
        let exec = dal
            .schedule_execution()
            .create(new_exec(sched_id))
            .await
            .unwrap();
        assert!(exec.workflow_execution_id.is_none());

        // The link must point at a workflow execution that actually exists.
        let wf_exec_id = create_workflow_execution(&dal, "fk-test", "Running").await;
        dal.schedule_execution()
            .update_workflow_execution_id(exec.id, wf_exec_id)
            .await
            .unwrap();

        let updated = dal.schedule_execution().get_by_id(exec.id).await.unwrap();
        assert_eq!(updated.workflow_execution_id, Some(wf_exec_id));
    }

    /// `get_latest_by_schedule` is `None` for an empty schedule and otherwise
    /// returns the execution created last.
    #[tokio::test]
    async fn test_get_latest_by_schedule() {
        let dal = unique_dal().await;
        let sched_id = create_schedule(&dal).await;

        let latest = dal
            .schedule_execution()
            .get_latest_by_schedule(sched_id)
            .await
            .unwrap();
        assert!(latest.is_none());

        let _first = dal
            .schedule_execution()
            .create(new_exec(sched_id))
            .await
            .unwrap();
        let second = dal
            .schedule_execution()
            .create(new_exec(sched_id))
            .await
            .unwrap();

        let latest = dal
            .schedule_execution()
            .get_latest_by_schedule(sched_id)
            .await
            .unwrap();
        assert!(latest.is_some());
        assert_eq!(latest.unwrap().id, second.id);
    }

    /// A freshly created execution is not reported as lost with a 60-minute
    /// threshold.
    #[tokio::test]
    async fn test_find_lost_executions_none_lost() {
        let dal = unique_dal().await;
        let sched_id = create_schedule(&dal).await;
        dal.schedule_execution()
            .create(new_exec(sched_id))
            .await
            .unwrap();

        let lost = dal
            .schedule_execution()
            .find_lost_executions(60)
            .await
            .unwrap();
        assert!(lost.is_empty());
    }

    /// A completed execution is never reported as lost, even with a zero
    /// threshold.
    #[tokio::test]
    async fn test_find_lost_executions_completed_not_lost() {
        let dal = unique_dal().await;
        let sched_id = create_schedule(&dal).await;
        let exec = dal
            .schedule_execution()
            .create(new_exec(sched_id))
            .await
            .unwrap();
        dal.schedule_execution()
            .complete(exec.id, Utc::now())
            .await
            .unwrap();

        let lost = dal
            .schedule_execution()
            .find_lost_executions(0)
            .await
            .unwrap();
        assert!(lost.is_empty());
    }

    /// With no executions at all, every statistic is zero.
    #[tokio::test]
    async fn test_get_execution_stats_empty() {
        let dal = unique_dal().await;
        let since = Utc::now() - chrono::Duration::hours(1);
        let stats = dal
            .schedule_execution()
            .get_execution_stats(since)
            .await
            .unwrap();
        assert_eq!(stats.total_executions, 0);
        assert_eq!(stats.successful_executions, 0);
        assert_eq!(stats.lost_executions, 0);
        assert_eq!(stats.success_rate, 0.0);
    }

    /// One of two executions linked to a `Completed` workflow execution gives
    /// a 50% success rate.
    #[tokio::test]
    async fn test_get_execution_stats_with_data() {
        let dal = unique_dal().await;
        let sched_id = create_schedule(&dal).await;
        let since = Utc::now() - chrono::Duration::hours(1);

        let exec1 = dal
            .schedule_execution()
            .create(new_exec(sched_id))
            .await
            .unwrap();
        dal.schedule_execution()
            .create(new_exec(sched_id))
            .await
            .unwrap();

        // Only the first execution is linked to a completed workflow execution.
        let wf_exec_id = create_workflow_execution(&dal, "stats-test", "Completed").await;
        dal.schedule_execution()
            .update_workflow_execution_id(exec1.id, wf_exec_id)
            .await
            .unwrap();

        let stats = dal
            .schedule_execution()
            .get_execution_stats(since)
            .await
            .unwrap();
        assert_eq!(stats.total_executions, 2);
        assert_eq!(stats.successful_executions, 1);
        assert_eq!(stats.success_rate, 50.0);
    }
}