use super::models::{NewUnifiedTaskOutbox, UnifiedTaskOutbox};
use super::DAL;
use crate::database::schema::unified::task_outbox;
use crate::database::universal_types::{UniversalTimestamp, UniversalUuid};
use crate::error::ValidationError;
use crate::models::task_outbox::{NewTaskOutbox, TaskOutbox};
use diesel::prelude::*;
/// Data-access object for the `task_outbox` table.
///
/// Holds a borrowed [`DAL`] and forwards every operation to the
/// backend-specific implementation chosen from `dal.backend()`.
#[derive(Clone)]
pub struct TaskOutboxDAL<'a> {
// Parent data-access layer: source of the backend selector and of the
// database handle used to obtain connections.
dal: &'a DAL,
}
impl<'a> TaskOutboxDAL<'a> {
pub fn new(dal: &'a DAL) -> Self {
Self { dal }
}
pub async fn create(&self, new_entry: NewTaskOutbox) -> Result<TaskOutbox, ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.create_postgres(new_entry).await,
self.create_sqlite(new_entry).await
)
}
#[cfg(feature = "postgres")]
async fn create_postgres(
&self,
new_entry: NewTaskOutbox,
) -> Result<TaskOutbox, ValidationError> {
let conn = self
.dal
.database
.get_postgres_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let now = UniversalTimestamp::now();
let new_unified = NewUnifiedTaskOutbox {
task_execution_id: new_entry.task_execution_id,
created_at: now,
};
let result: UnifiedTaskOutbox = conn
.interact(move |conn| {
diesel::insert_into(task_outbox::table)
.values(&new_unified)
.get_result(conn)
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(TaskOutbox {
id: result.id,
task_execution_id: result.task_execution_id,
created_at: result.created_at,
})
}
#[cfg(feature = "sqlite")]
async fn create_sqlite(&self, new_entry: NewTaskOutbox) -> Result<TaskOutbox, ValidationError> {
let conn = self
.dal
.database
.get_sqlite_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let now = UniversalTimestamp::now();
let new_unified = NewUnifiedTaskOutbox {
task_execution_id: new_entry.task_execution_id,
created_at: now,
};
let result: UnifiedTaskOutbox = conn
.interact(move |conn| {
diesel::insert_into(task_outbox::table)
.values(&new_unified)
.get_result(conn)
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(TaskOutbox {
id: result.id,
task_execution_id: result.task_execution_id,
created_at: result.created_at,
})
}
pub async fn delete_by_task(
&self,
task_execution_id: UniversalUuid,
) -> Result<(), ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.delete_by_task_postgres(task_execution_id).await,
self.delete_by_task_sqlite(task_execution_id).await
)
}
#[cfg(feature = "postgres")]
async fn delete_by_task_postgres(
&self,
task_execution_id: UniversalUuid,
) -> Result<(), ValidationError> {
let conn = self
.dal
.database
.get_postgres_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
conn.interact(move |conn| {
diesel::delete(
task_outbox::table.filter(task_outbox::task_execution_id.eq(task_execution_id)),
)
.execute(conn)
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(())
}
#[cfg(feature = "sqlite")]
async fn delete_by_task_sqlite(
&self,
task_execution_id: UniversalUuid,
) -> Result<(), ValidationError> {
let conn = self
.dal
.database
.get_sqlite_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
conn.interact(move |conn| {
diesel::delete(
task_outbox::table.filter(task_outbox::task_execution_id.eq(task_execution_id)),
)
.execute(conn)
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(())
}
pub async fn list_pending(&self, limit: i64) -> Result<Vec<TaskOutbox>, ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.list_pending_postgres(limit).await,
self.list_pending_sqlite(limit).await
)
}
#[cfg(feature = "postgres")]
async fn list_pending_postgres(&self, limit: i64) -> Result<Vec<TaskOutbox>, ValidationError> {
let conn = self
.dal
.database
.get_postgres_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let results: Vec<UnifiedTaskOutbox> = conn
.interact(move |conn| {
task_outbox::table
.order(task_outbox::created_at.asc())
.limit(limit)
.load(conn)
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(results
.into_iter()
.map(|r| TaskOutbox {
id: r.id,
task_execution_id: r.task_execution_id,
created_at: r.created_at,
})
.collect())
}
#[cfg(feature = "sqlite")]
async fn list_pending_sqlite(&self, limit: i64) -> Result<Vec<TaskOutbox>, ValidationError> {
let conn = self
.dal
.database
.get_sqlite_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let results: Vec<UnifiedTaskOutbox> = conn
.interact(move |conn| {
task_outbox::table
.order(task_outbox::created_at.asc())
.limit(limit)
.load(conn)
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(results
.into_iter()
.map(|r| TaskOutbox {
id: r.id,
task_execution_id: r.task_execution_id,
created_at: r.created_at,
})
.collect())
}
pub async fn count_pending(&self) -> Result<i64, ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.count_pending_postgres().await,
self.count_pending_sqlite().await
)
}
#[cfg(feature = "postgres")]
async fn count_pending_postgres(&self) -> Result<i64, ValidationError> {
let conn = self
.dal
.database
.get_postgres_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let count: i64 = conn
.interact(move |conn| task_outbox::table.count().get_result(conn))
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(count)
}
#[cfg(feature = "sqlite")]
async fn count_pending_sqlite(&self) -> Result<i64, ValidationError> {
let conn = self
.dal
.database
.get_sqlite_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let count: i64 = conn
.interact(move |conn| task_outbox::table.count().get_result(conn))
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(count)
}
pub async fn delete_older_than(
&self,
cutoff: UniversalTimestamp,
) -> Result<i64, ValidationError> {
crate::dispatch_backend!(
self.dal.backend(),
self.delete_older_than_postgres(cutoff).await,
self.delete_older_than_sqlite(cutoff).await
)
}
#[cfg(feature = "postgres")]
async fn delete_older_than_postgres(
&self,
cutoff: UniversalTimestamp,
) -> Result<i64, ValidationError> {
let conn = self
.dal
.database
.get_postgres_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let deleted: usize = conn
.interact(move |conn| {
diesel::delete(task_outbox::table.filter(task_outbox::created_at.lt(cutoff)))
.execute(conn)
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(deleted as i64)
}
#[cfg(feature = "sqlite")]
async fn delete_older_than_sqlite(
&self,
cutoff: UniversalTimestamp,
) -> Result<i64, ValidationError> {
let conn = self
.dal
.database
.get_sqlite_connection()
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;
let deleted: usize = conn
.interact(move |conn| {
diesel::delete(task_outbox::table.filter(task_outbox::created_at.lt(cutoff)))
.execute(conn)
})
.await
.map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;
Ok(deleted as i64)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::database::Database;
    use crate::models::pipeline_execution::NewPipelineExecution;
    use crate::models::task_execution::NewTaskExecution;
    use crate::models::task_outbox::NewTaskOutbox;

    /// Builds a [`DAL`] on a freshly named SQLite file under `/tmp` and runs
    /// the migrations, so each test starts from an empty database.
    #[cfg(feature = "sqlite")]
    async fn unique_dal() -> DAL {
        let db_id = uuid::Uuid::new_v4();
        let url = format!("sqlite:///tmp/outbox_test_{}.db?mode=rwc", db_id);
        let database = Database::new(&url, "", 5);
        database
            .run_migrations()
            .await
            .expect("migrations should succeed");
        DAL::new(database)
    }

    /// Creates a pipeline execution plus one task named `task_name`, marks
    /// the task ready and returns the task's id.
    #[cfg(feature = "sqlite")]
    async fn create_ready_task(dal: &DAL, task_name: &str) -> UniversalUuid {
        let new_pipeline = NewPipelineExecution {
            pipeline_name: "test_pipeline".into(),
            pipeline_version: "1.0".into(),
            status: "Running".into(),
            context_id: None,
        };
        let pipeline = dal
            .pipeline_execution()
            .create(new_pipeline)
            .await
            .unwrap();

        let new_task = NewTaskExecution {
            pipeline_execution_id: pipeline.id,
            task_name: task_name.into(),
            status: "NotStarted".into(),
            attempt: 1,
            max_attempts: 3,
            trigger_rules: r#"{"type":"Always"}"#.into(),
            task_configuration: "{}".into(),
        };
        let task = dal.task_execution().create(new_task).await.unwrap();

        let task_id = task.id;
        dal.task_execution().mark_ready(task_id).await.unwrap();
        task_id
    }

    /// A task that was marked ready shows up as the single pending entry.
    #[cfg(feature = "sqlite")]
    #[tokio::test]
    async fn test_create_outbox_entry() {
        let dal = unique_dal().await;
        let expected_id = create_ready_task(&dal, "task_create_test").await;

        let entries = dal.task_outbox().list_pending(10).await.unwrap();

        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].task_execution_id, expected_id);
    }

    /// Listing on a fresh database yields nothing.
    #[cfg(feature = "sqlite")]
    #[tokio::test]
    async fn test_list_pending_empty() {
        let dal = unique_dal().await;

        let entries = dal.task_outbox().list_pending(10).await.unwrap();

        assert!(entries.is_empty());
    }

    /// `list_pending` returns no more rows than the requested limit.
    #[cfg(feature = "sqlite")]
    #[tokio::test]
    async fn test_list_pending_respects_limit() {
        let dal = unique_dal().await;
        for name in ["task_a", "task_b", "task_c"] {
            create_ready_task(&dal, name).await;
        }

        let limited = dal.task_outbox().list_pending(2).await.unwrap();
        assert_eq!(limited.len(), 2);

        let everything = dal.task_outbox().list_pending(100).await.unwrap();
        assert_eq!(everything.len(), 3);
    }

    /// Entries come back with non-decreasing `created_at`.
    #[cfg(feature = "sqlite")]
    #[tokio::test]
    async fn test_list_pending_ordered_oldest_first() {
        let dal = unique_dal().await;
        create_ready_task(&dal, "first").await;
        create_ready_task(&dal, "second").await;

        let entries = dal.task_outbox().list_pending(10).await.unwrap();
        assert_eq!(entries.len(), 2);

        let earlier: chrono::DateTime<chrono::Utc> = entries[0].created_at.into();
        let later: chrono::DateTime<chrono::Utc> = entries[1].created_at.into();
        assert!(earlier <= later);
    }

    /// Counting on a fresh database yields zero.
    #[cfg(feature = "sqlite")]
    #[tokio::test]
    async fn test_count_pending_empty() {
        let dal = unique_dal().await;

        let total = dal.task_outbox().count_pending().await.unwrap();

        assert_eq!(total, 0);
    }

    /// Every task marked ready contributes one row to the count.
    #[cfg(feature = "sqlite")]
    #[tokio::test]
    async fn test_count_pending_after_inserts() {
        let dal = unique_dal().await;
        create_ready_task(&dal, "t1").await;
        create_ready_task(&dal, "t2").await;

        let total = dal.task_outbox().count_pending().await.unwrap();

        assert_eq!(total, 2);
    }

    /// Deleting by task id removes that task's entry.
    #[cfg(feature = "sqlite")]
    #[tokio::test]
    async fn test_delete_by_task() {
        let dal = unique_dal().await;
        let doomed_id = create_ready_task(&dal, "to_delete").await;

        let before = dal.task_outbox().count_pending().await.unwrap();
        assert_eq!(before, 1);

        dal.task_outbox().delete_by_task(doomed_id).await.unwrap();

        let after = dal.task_outbox().count_pending().await.unwrap();
        assert_eq!(after, 0);
    }

    /// Deleting an id that has no entry is not an error.
    #[cfg(feature = "sqlite")]
    #[tokio::test]
    async fn test_delete_by_task_nonexistent() {
        let dal = unique_dal().await;
        let unknown_id = UniversalUuid::new_v4();

        dal.task_outbox().delete_by_task(unknown_id).await.unwrap();
    }

    /// Deleting one task's entry leaves the other task's entry in place.
    #[cfg(feature = "sqlite")]
    #[tokio::test]
    async fn test_delete_by_task_only_removes_target() {
        let dal = unique_dal().await;
        let kept_id = create_ready_task(&dal, "keep_me").await;
        let removed_id = create_ready_task(&dal, "delete_me").await;

        dal.task_outbox().delete_by_task(removed_id).await.unwrap();

        let survivors = dal.task_outbox().list_pending(10).await.unwrap();
        assert_eq!(survivors.len(), 1);
        assert_eq!(survivors[0].task_execution_id, kept_id);
    }

    /// A cutoff one hour in the future removes the existing entry.
    #[cfg(feature = "sqlite")]
    #[tokio::test]
    async fn test_delete_older_than() {
        let dal = unique_dal().await;
        create_ready_task(&dal, "old_task").await;

        let one_hour_ahead = chrono::Utc::now() + chrono::Duration::hours(1);
        let future_cutoff = UniversalTimestamp::from(one_hour_ahead);
        let removed = dal
            .task_outbox()
            .delete_older_than(future_cutoff)
            .await
            .unwrap();
        assert_eq!(removed, 1);

        let remaining = dal.task_outbox().count_pending().await.unwrap();
        assert_eq!(remaining, 0);
    }

    /// A cutoff one hour in the past leaves a just-created entry untouched.
    #[cfg(feature = "sqlite")]
    #[tokio::test]
    async fn test_delete_older_than_keeps_recent() {
        let dal = unique_dal().await;
        create_ready_task(&dal, "recent_task").await;

        let one_hour_ago = chrono::Utc::now() - chrono::Duration::hours(1);
        let past_cutoff = UniversalTimestamp::from(one_hour_ago);
        let removed = dal
            .task_outbox()
            .delete_older_than(past_cutoff)
            .await
            .unwrap();
        assert_eq!(removed, 0);

        let remaining = dal.task_outbox().count_pending().await.unwrap();
        assert_eq!(remaining, 1);
    }

    /// Calling `create` directly stores an entry for the given task.
    #[cfg(feature = "sqlite")]
    #[tokio::test]
    async fn test_direct_create() {
        let dal = unique_dal().await;

        let new_pipeline = NewPipelineExecution {
            pipeline_name: "p".into(),
            pipeline_version: "1".into(),
            status: "Running".into(),
            context_id: None,
        };
        let pipeline = dal
            .pipeline_execution()
            .create(new_pipeline)
            .await
            .unwrap();

        let new_task = NewTaskExecution {
            pipeline_execution_id: pipeline.id,
            task_name: "direct".into(),
            status: "NotStarted".into(),
            attempt: 1,
            max_attempts: 1,
            trigger_rules: r#"{"type":"Always"}"#.into(),
            task_configuration: "{}".into(),
        };
        let task = dal.task_execution().create(new_task).await.unwrap();

        let request = NewTaskOutbox {
            task_execution_id: task.id,
        };
        let stored = dal.task_outbox().create(request).await.unwrap();

        assert_eq!(stored.task_execution_id, task.id);
        assert_eq!(dal.task_outbox().count_pending().await.unwrap(), 1);
    }

    /// The outbox is empty after task creation and gains an entry once the
    /// task is marked ready.
    #[cfg(feature = "sqlite")]
    #[tokio::test]
    async fn test_mark_ready_populates_outbox() {
        let dal = unique_dal().await;

        let new_pipeline = NewPipelineExecution {
            pipeline_name: "p".into(),
            pipeline_version: "1".into(),
            status: "Running".into(),
            context_id: None,
        };
        let pipeline = dal
            .pipeline_execution()
            .create(new_pipeline)
            .await
            .unwrap();

        let new_task = NewTaskExecution {
            pipeline_execution_id: pipeline.id,
            task_name: "ready_test".into(),
            status: "NotStarted".into(),
            attempt: 1,
            max_attempts: 1,
            trigger_rules: r#"{"type":"Always"}"#.into(),
            task_configuration: "{}".into(),
        };
        let task = dal.task_execution().create(new_task).await.unwrap();

        assert_eq!(dal.task_outbox().count_pending().await.unwrap(), 0);

        dal.task_execution().mark_ready(task.id).await.unwrap();

        let entries = dal.task_outbox().list_pending(10).await.unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].task_execution_id, task.id);
    }
}