mod crud;
mod queries;
mod tracking;
use super::DAL;
use crate::database::universal_types::UniversalUuid;
use crate::error::ValidationError;
use crate::models::cron_execution::{CronExecution, NewCronExecution};
use chrono::{DateTime, Utc};
#[derive(Debug)]
pub struct CronExecutionStats {
pub total_executions: i64,
pub successful_executions: i64,
pub lost_executions: i64,
pub success_rate: f64,
}
#[derive(Clone)]
pub struct CronExecutionDAL<'a> {
dal: &'a DAL,
}
impl<'a> CronExecutionDAL<'a> {
pub fn new(dal: &'a DAL) -> Self {
Self { dal }
}
pub async fn create(
&self,
new_execution: NewCronExecution,
) -> Result<CronExecution, ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.create_postgres(new_execution).await,
self.create_sqlite(new_execution).await
)
}
pub async fn update_pipeline_execution_id(
&self,
cron_execution_id: UniversalUuid,
pipeline_execution_id: UniversalUuid,
) -> Result<(), ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.update_pipeline_execution_id_postgres(cron_execution_id, pipeline_execution_id)
.await,
self.update_pipeline_execution_id_sqlite(cron_execution_id, pipeline_execution_id)
.await
)
}
pub async fn find_lost_executions(
&self,
older_than_minutes: i32,
) -> Result<Vec<CronExecution>, ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.find_lost_executions_postgres(older_than_minutes).await,
self.find_lost_executions_sqlite(older_than_minutes).await
)
}
pub async fn get_by_id(&self, id: UniversalUuid) -> Result<CronExecution, ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.get_by_id_postgres(id).await,
self.get_by_id_sqlite(id).await
)
}
pub async fn get_by_schedule_id(
&self,
schedule_id: UniversalUuid,
limit: i64,
offset: i64,
) -> Result<Vec<CronExecution>, ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.get_by_schedule_id_postgres(schedule_id, limit, offset)
.await,
self.get_by_schedule_id_sqlite(schedule_id, limit, offset)
.await
)
}
pub async fn get_by_pipeline_execution_id(
&self,
pipeline_execution_id: UniversalUuid,
) -> Result<Option<CronExecution>, ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.get_by_pipeline_execution_id_postgres(pipeline_execution_id)
.await,
self.get_by_pipeline_execution_id_sqlite(pipeline_execution_id)
.await
)
}
pub async fn get_by_time_range(
&self,
start_time: DateTime<Utc>,
end_time: DateTime<Utc>,
limit: i64,
offset: i64,
) -> Result<Vec<CronExecution>, ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.get_by_time_range_postgres(start_time, end_time, limit, offset)
.await,
self.get_by_time_range_sqlite(start_time, end_time, limit, offset)
.await
)
}
pub async fn count_by_schedule(
&self,
schedule_id: UniversalUuid,
) -> Result<i64, ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.count_by_schedule_postgres(schedule_id).await,
self.count_by_schedule_sqlite(schedule_id).await
)
}
pub async fn execution_exists(
&self,
schedule_id: UniversalUuid,
scheduled_time: DateTime<Utc>,
) -> Result<bool, ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.execution_exists_postgres(schedule_id, scheduled_time)
.await,
self.execution_exists_sqlite(schedule_id, scheduled_time)
.await
)
}
pub async fn get_latest_by_schedule(
&self,
schedule_id: UniversalUuid,
) -> Result<Option<CronExecution>, ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.get_latest_by_schedule_postgres(schedule_id).await,
self.get_latest_by_schedule_sqlite(schedule_id).await
)
}
pub async fn delete_older_than(
&self,
older_than: DateTime<Utc>,
) -> Result<usize, ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.delete_older_than_postgres(older_than).await,
self.delete_older_than_sqlite(older_than).await
)
}
pub async fn get_execution_stats(
&self,
since: DateTime<Utc>,
) -> Result<CronExecutionStats, ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.get_execution_stats_postgres(since).await,
self.get_execution_stats_sqlite(since).await
)
}
}