use super::models::{NewUnifiedRecoveryEvent, UnifiedRecoveryEvent};
use super::DAL;
use crate::database::schema::unified::recovery_events;
use crate::database::universal_types::{UniversalTimestamp, UniversalUuid};
use crate::error::ValidationError;
use crate::models::recovery_event::{NewRecoveryEvent, RecoveryEvent, RecoveryType};
use diesel::prelude::*;
#[derive(Clone)]
pub struct RecoveryEventDAL<'a> {
dal: &'a DAL,
}
#[allow(dead_code)]
impl<'a> RecoveryEventDAL<'a> {
pub fn new(dal: &'a DAL) -> Self {
Self { dal }
}
pub async fn create(
&self,
new_event: NewRecoveryEvent,
) -> Result<RecoveryEvent, ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.create_postgres(new_event).await,
self.create_sqlite(new_event).await
)
}
#[cfg(feature = "postgres")]
async fn create_postgres(
&self,
new_event: NewRecoveryEvent,
) -> Result<RecoveryEvent, ValidationError> {
let conn = self
.dal
.database
.get_postgres_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let id = UniversalUuid::new_v4();
let now = UniversalTimestamp::now();
let new_unified = NewUnifiedRecoveryEvent {
id,
workflow_execution_id: new_event.workflow_execution_id,
task_execution_id: new_event.task_execution_id,
recovery_type: new_event.recovery_type,
recovered_at: now,
details: new_event.details,
created_at: now,
updated_at: now,
};
conn.interact(move |conn| {
diesel::insert_into(recovery_events::table)
.values(&new_unified)
.execute(conn)
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
let result: UnifiedRecoveryEvent = conn
.interact(move |conn| recovery_events::table.find(id).first(conn))
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(result.into())
}
#[cfg(feature = "sqlite")]
async fn create_sqlite(
&self,
new_event: NewRecoveryEvent,
) -> Result<RecoveryEvent, ValidationError> {
let conn = self
.dal
.database
.get_sqlite_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let id = UniversalUuid::new_v4();
let now = UniversalTimestamp::now();
let new_unified = NewUnifiedRecoveryEvent {
id,
workflow_execution_id: new_event.workflow_execution_id,
task_execution_id: new_event.task_execution_id,
recovery_type: new_event.recovery_type,
recovered_at: now,
details: new_event.details,
created_at: now,
updated_at: now,
};
conn.interact(move |conn| {
diesel::insert_into(recovery_events::table)
.values(&new_unified)
.execute(conn)
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
let result: UnifiedRecoveryEvent = conn
.interact(move |conn| recovery_events::table.find(id).first(conn))
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(result.into())
}
pub(crate) async fn get_by_workflow(
&self,
workflow_execution_id: UniversalUuid,
) -> Result<Vec<RecoveryEvent>, ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.get_by_workflow_postgres(workflow_execution_id).await,
self.get_by_workflow_sqlite(workflow_execution_id).await
)
}
#[cfg(feature = "postgres")]
async fn get_by_workflow_postgres(
&self,
workflow_execution_id: UniversalUuid,
) -> Result<Vec<RecoveryEvent>, ValidationError> {
let conn = self
.dal
.database
.get_postgres_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let results: Vec<UnifiedRecoveryEvent> = conn
.interact(move |conn| {
recovery_events::table
.filter(recovery_events::workflow_execution_id.eq(workflow_execution_id))
.order(recovery_events::recovered_at.desc())
.load(conn)
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(results.into_iter().map(Into::into).collect())
}
#[cfg(feature = "sqlite")]
async fn get_by_workflow_sqlite(
&self,
workflow_execution_id: UniversalUuid,
) -> Result<Vec<RecoveryEvent>, ValidationError> {
let conn = self
.dal
.database
.get_sqlite_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let results: Vec<UnifiedRecoveryEvent> = conn
.interact(move |conn| {
recovery_events::table
.filter(recovery_events::workflow_execution_id.eq(workflow_execution_id))
.order(recovery_events::recovered_at.desc())
.load(conn)
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(results.into_iter().map(Into::into).collect())
}
pub(crate) async fn get_by_task(
&self,
task_execution_id: UniversalUuid,
) -> Result<Vec<RecoveryEvent>, ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.get_by_task_postgres(task_execution_id).await,
self.get_by_task_sqlite(task_execution_id).await
)
}
#[cfg(feature = "postgres")]
async fn get_by_task_postgres(
&self,
task_execution_id: UniversalUuid,
) -> Result<Vec<RecoveryEvent>, ValidationError> {
let conn = self
.dal
.database
.get_postgres_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let results: Vec<UnifiedRecoveryEvent> = conn
.interact(move |conn| {
recovery_events::table
.filter(recovery_events::task_execution_id.eq(task_execution_id))
.order(recovery_events::recovered_at.desc())
.load(conn)
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(results.into_iter().map(Into::into).collect())
}
#[cfg(feature = "sqlite")]
async fn get_by_task_sqlite(
&self,
task_execution_id: UniversalUuid,
) -> Result<Vec<RecoveryEvent>, ValidationError> {
let conn = self
.dal
.database
.get_sqlite_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let results: Vec<UnifiedRecoveryEvent> = conn
.interact(move |conn| {
recovery_events::table
.filter(recovery_events::task_execution_id.eq(task_execution_id))
.order(recovery_events::recovered_at.desc())
.load(conn)
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(results.into_iter().map(Into::into).collect())
}
pub(crate) async fn get_by_type(
&self,
recovery_type: &str,
) -> Result<Vec<RecoveryEvent>, ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.get_by_type_postgres(recovery_type).await,
self.get_by_type_sqlite(recovery_type).await
)
}
#[cfg(feature = "postgres")]
async fn get_by_type_postgres(
&self,
recovery_type: &str,
) -> Result<Vec<RecoveryEvent>, ValidationError> {
let conn = self
.dal
.database
.get_postgres_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let recovery_type = recovery_type.to_string();
let results: Vec<UnifiedRecoveryEvent> = conn
.interact(move |conn| {
recovery_events::table
.filter(recovery_events::recovery_type.eq(recovery_type))
.order(recovery_events::recovered_at.desc())
.load(conn)
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(results.into_iter().map(Into::into).collect())
}
#[cfg(feature = "sqlite")]
async fn get_by_type_sqlite(
&self,
recovery_type: &str,
) -> Result<Vec<RecoveryEvent>, ValidationError> {
let conn = self
.dal
.database
.get_sqlite_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let recovery_type = recovery_type.to_string();
let results: Vec<UnifiedRecoveryEvent> = conn
.interact(move |conn| {
recovery_events::table
.filter(recovery_events::recovery_type.eq(recovery_type))
.order(recovery_events::recovered_at.desc())
.load(conn)
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(results.into_iter().map(Into::into).collect())
}
pub(crate) async fn get_workflow_unavailable_events(
&self,
) -> Result<Vec<RecoveryEvent>, ValidationError> {
self.get_by_type(RecoveryType::WorkflowUnavailable.as_str())
.await
}
pub(crate) async fn get_recent(
&self,
limit: i64,
) -> Result<Vec<RecoveryEvent>, ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.get_recent_postgres(limit).await,
self.get_recent_sqlite(limit).await
)
}
#[cfg(feature = "postgres")]
async fn get_recent_postgres(&self, limit: i64) -> Result<Vec<RecoveryEvent>, ValidationError> {
let conn = self
.dal
.database
.get_postgres_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let results: Vec<UnifiedRecoveryEvent> = conn
.interact(move |conn| {
recovery_events::table
.order(recovery_events::recovered_at.desc())
.limit(limit)
.load(conn)
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(results.into_iter().map(Into::into).collect())
}
#[cfg(feature = "sqlite")]
async fn get_recent_sqlite(&self, limit: i64) -> Result<Vec<RecoveryEvent>, ValidationError> {
let conn = self
.dal
.database
.get_sqlite_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let results: Vec<UnifiedRecoveryEvent> = conn
.interact(move |conn| {
recovery_events::table
.order(recovery_events::recovered_at.desc())
.limit(limit)
.load(conn)
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(results.into_iter().map(Into::into).collect())
}
}