use super::models::{NewUnifiedTaskOutbox, UnifiedTaskOutbox};
use super::DAL;
use crate::database::schema::unified::task_outbox;
use crate::database::universal_types::{UniversalTimestamp, UniversalUuid};
use crate::error::ValidationError;
use crate::models::task_outbox::{NewTaskOutbox, TaskOutbox};
use diesel::prelude::*;
#[derive(Clone)]
pub struct TaskOutboxDAL<'a> {
dal: &'a DAL,
}
impl<'a> TaskOutboxDAL<'a> {
pub fn new(dal: &'a DAL) -> Self {
Self { dal }
}
pub async fn create(&self, new_entry: NewTaskOutbox) -> Result<TaskOutbox, ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.create_postgres(new_entry).await,
self.create_sqlite(new_entry).await
)
}
#[cfg(feature = "postgres")]
async fn create_postgres(
&self,
new_entry: NewTaskOutbox,
) -> Result<TaskOutbox, ValidationError> {
let conn = self
.dal
.database
.get_postgres_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let now = UniversalTimestamp::now();
let new_unified = NewUnifiedTaskOutbox {
task_execution_id: new_entry.task_execution_id,
created_at: now,
};
let result: UnifiedTaskOutbox = conn
.interact(move |conn| {
diesel::insert_into(task_outbox::table)
.values(&new_unified)
.get_result(conn)
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(TaskOutbox {
id: result.id,
task_execution_id: result.task_execution_id,
created_at: result.created_at,
})
}
#[cfg(feature = "sqlite")]
async fn create_sqlite(&self, new_entry: NewTaskOutbox) -> Result<TaskOutbox, ValidationError> {
let conn = self
.dal
.database
.get_sqlite_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let now = UniversalTimestamp::now();
let new_unified = NewUnifiedTaskOutbox {
task_execution_id: new_entry.task_execution_id,
created_at: now,
};
let result: UnifiedTaskOutbox = conn
.interact(move |conn| {
diesel::insert_into(task_outbox::table)
.values(&new_unified)
.get_result(conn)
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(TaskOutbox {
id: result.id,
task_execution_id: result.task_execution_id,
created_at: result.created_at,
})
}
pub async fn delete_by_task(
&self,
task_execution_id: UniversalUuid,
) -> Result<(), ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.delete_by_task_postgres(task_execution_id).await,
self.delete_by_task_sqlite(task_execution_id).await
)
}
#[cfg(feature = "postgres")]
async fn delete_by_task_postgres(
&self,
task_execution_id: UniversalUuid,
) -> Result<(), ValidationError> {
let conn = self
.dal
.database
.get_postgres_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
conn.interact(move |conn| {
diesel::delete(
task_outbox::table.filter(task_outbox::task_execution_id.eq(task_execution_id)),
)
.execute(conn)
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(())
}
#[cfg(feature = "sqlite")]
async fn delete_by_task_sqlite(
&self,
task_execution_id: UniversalUuid,
) -> Result<(), ValidationError> {
let conn = self
.dal
.database
.get_sqlite_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
conn.interact(move |conn| {
diesel::delete(
task_outbox::table.filter(task_outbox::task_execution_id.eq(task_execution_id)),
)
.execute(conn)
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(())
}
pub async fn list_pending(&self, limit: i64) -> Result<Vec<TaskOutbox>, ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.list_pending_postgres(limit).await,
self.list_pending_sqlite(limit).await
)
}
#[cfg(feature = "postgres")]
async fn list_pending_postgres(&self, limit: i64) -> Result<Vec<TaskOutbox>, ValidationError> {
let conn = self
.dal
.database
.get_postgres_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let results: Vec<UnifiedTaskOutbox> = conn
.interact(move |conn| {
task_outbox::table
.order(task_outbox::created_at.asc())
.limit(limit)
.load(conn)
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(results
.into_iter()
.map(|r| TaskOutbox {
id: r.id,
task_execution_id: r.task_execution_id,
created_at: r.created_at,
})
.collect())
}
#[cfg(feature = "sqlite")]
async fn list_pending_sqlite(&self, limit: i64) -> Result<Vec<TaskOutbox>, ValidationError> {
let conn = self
.dal
.database
.get_sqlite_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let results: Vec<UnifiedTaskOutbox> = conn
.interact(move |conn| {
task_outbox::table
.order(task_outbox::created_at.asc())
.limit(limit)
.load(conn)
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(results
.into_iter()
.map(|r| TaskOutbox {
id: r.id,
task_execution_id: r.task_execution_id,
created_at: r.created_at,
})
.collect())
}
pub async fn count_pending(&self) -> Result<i64, ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.count_pending_postgres().await,
self.count_pending_sqlite().await
)
}
#[cfg(feature = "postgres")]
async fn count_pending_postgres(&self) -> Result<i64, ValidationError> {
let conn = self
.dal
.database
.get_postgres_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let count: i64 = conn
.interact(move |conn| task_outbox::table.count().get_result(conn))
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(count)
}
#[cfg(feature = "sqlite")]
async fn count_pending_sqlite(&self) -> Result<i64, ValidationError> {
let conn = self
.dal
.database
.get_sqlite_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let count: i64 = conn
.interact(move |conn| task_outbox::table.count().get_result(conn))
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(count)
}
pub async fn delete_older_than(
&self,
cutoff: UniversalTimestamp,
) -> Result<i64, ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.delete_older_than_postgres(cutoff).await,
self.delete_older_than_sqlite(cutoff).await
)
}
#[cfg(feature = "postgres")]
async fn delete_older_than_postgres(
&self,
cutoff: UniversalTimestamp,
) -> Result<i64, ValidationError> {
let conn = self
.dal
.database
.get_postgres_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let deleted: usize = conn
.interact(move |conn| {
diesel::delete(task_outbox::table.filter(task_outbox::created_at.lt(cutoff)))
.execute(conn)
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(deleted as i64)
}
#[cfg(feature = "sqlite")]
async fn delete_older_than_sqlite(
&self,
cutoff: UniversalTimestamp,
) -> Result<i64, ValidationError> {
let conn = self
.dal
.database
.get_sqlite_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let deleted: usize = conn
.interact(move |conn| {
diesel::delete(task_outbox::table.filter(task_outbox::created_at.lt(cutoff)))
.execute(conn)
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(deleted as i64)
}
}