#![cfg(feature = "sqlite")]
use async_trait::async_trait;
use deltaflow::{HasEntityId, Pipeline, RetryPolicy, SqliteRecorder, Step, StepError};
use serde::Serialize;
use sqlx::sqlite::SqlitePoolOptions;
#[derive(Clone, Debug, Serialize)]
struct TestInput {
id: String,
value: u32,
}
impl HasEntityId for TestInput {
fn entity_id(&self) -> String {
self.id.clone()
}
}
struct DoubleStep;
#[async_trait]
impl Step for DoubleStep {
type Input = TestInput;
type Output = TestInput;
fn name(&self) -> &'static str {
"double"
}
async fn execute(&self, mut input: Self::Input) -> Result<Self::Output, StepError> {
input.value *= 2;
Ok(input)
}
}
struct AddTenStep;
#[async_trait]
impl Step for AddTenStep {
type Input = TestInput;
type Output = TestInput;
fn name(&self) -> &'static str {
"add_ten"
}
async fn execute(&self, mut input: Self::Input) -> Result<Self::Output, StepError> {
input.value += 10;
Ok(input)
}
}
struct PermanentFailStep;
#[async_trait]
impl Step for PermanentFailStep {
type Input = TestInput;
type Output = TestInput;
fn name(&self) -> &'static str {
"permanent_fail"
}
async fn execute(&self, _input: Self::Input) -> Result<Self::Output, StepError> {
Err(StepError::permanent(anyhow::anyhow!("permanent failure")))
}
}
#[tokio::test]
async fn successful_run_recording() {
let pool = SqlitePoolOptions::new()
.connect(":memory:")
.await
.expect("Failed to create in-memory database");
let recorder = SqliteRecorder::new(pool.clone());
recorder
.run_migrations()
.await
.expect("Failed to run migrations");
let pipeline = Pipeline::new("test_success")
.start_with(DoubleStep)
.then(AddTenStep)
.with_recorder(recorder)
.build();
let input = TestInput {
id: "entity-123".to_string(),
value: 5,
};
let result = pipeline.run(input).await;
assert!(result.is_ok());
assert_eq!(result.unwrap().value, 20);
let run_status: String =
sqlx::query_scalar("SELECT status FROM delta_runs WHERE pipeline_name = 'test_success'")
.fetch_one(&pool)
.await
.expect("Failed to fetch run status");
assert_eq!(run_status, "completed");
let entity_id: String =
sqlx::query_scalar("SELECT entity_id FROM delta_runs WHERE pipeline_name = 'test_success'")
.fetch_one(&pool)
.await
.expect("Failed to fetch entity_id");
assert_eq!(entity_id, "entity-123");
let step_statuses: Vec<String> = sqlx::query_scalar(
"SELECT status FROM delta_steps WHERE run_id = (SELECT id FROM delta_runs WHERE pipeline_name = 'test_success') ORDER BY step_index",
)
.fetch_all(&pool)
.await
.expect("Failed to fetch step statuses");
assert_eq!(step_statuses.len(), 2);
assert_eq!(step_statuses[0], "completed");
assert_eq!(step_statuses[1], "completed");
let step_names: Vec<String> = sqlx::query_scalar(
"SELECT step_name FROM delta_steps WHERE run_id = (SELECT id FROM delta_runs WHERE pipeline_name = 'test_success') ORDER BY step_index",
)
.fetch_all(&pool)
.await
.expect("Failed to fetch step names");
assert_eq!(step_names[0], "double");
assert_eq!(step_names[1], "add_ten");
}
#[tokio::test]
async fn failed_run_recording() {
let pool = SqlitePoolOptions::new()
.connect(":memory:")
.await
.expect("Failed to create in-memory database");
let recorder = SqliteRecorder::new(pool.clone());
recorder
.run_migrations()
.await
.expect("Failed to run migrations");
let pipeline = Pipeline::new("test_failure")
.start_with(DoubleStep)
.then(PermanentFailStep)
.with_retry(RetryPolicy::fixed(2, std::time::Duration::from_millis(1)))
.with_recorder(recorder)
.build();
let input = TestInput {
id: "entity-456".to_string(),
value: 5,
};
let result = pipeline.run(input).await;
assert!(result.is_err());
let run_status: String =
sqlx::query_scalar("SELECT status FROM delta_runs WHERE pipeline_name = 'test_failure'")
.fetch_one(&pool)
.await
.expect("Failed to fetch run status");
assert_eq!(run_status, "failed");
let error_message: Option<String> = sqlx::query_scalar(
"SELECT error_message FROM delta_runs WHERE pipeline_name = 'test_failure'",
)
.fetch_one(&pool)
.await
.expect("Failed to fetch error message");
assert!(error_message.is_some());
assert!(error_message.unwrap().contains("permanent_fail"));
let first_step_status: String = sqlx::query_scalar(
"SELECT status FROM delta_steps WHERE run_id = (SELECT id FROM delta_runs WHERE pipeline_name = 'test_failure') AND step_index = 0",
)
.fetch_one(&pool)
.await
.expect("Failed to fetch first step status");
assert_eq!(first_step_status, "completed");
let failing_step_status: String = sqlx::query_scalar(
"SELECT status FROM delta_steps WHERE run_id = (SELECT id FROM delta_runs WHERE pipeline_name = 'test_failure') AND step_index = 1",
)
.fetch_one(&pool)
.await
.expect("Failed to fetch failing step status");
assert_eq!(failing_step_status, "failed");
let step_error: Option<String> = sqlx::query_scalar(
"SELECT error_message FROM delta_steps WHERE run_id = (SELECT id FROM delta_runs WHERE pipeline_name = 'test_failure') AND step_index = 1",
)
.fetch_one(&pool)
.await
.expect("Failed to fetch step error message");
assert!(step_error.is_some());
assert!(step_error.unwrap().contains("permanent failure"));
}
#[tokio::test]
async fn entity_id_recording() {
let pool = SqlitePoolOptions::new()
.connect(":memory:")
.await
.expect("Failed to create in-memory database");
let recorder = SqliteRecorder::new(pool.clone());
recorder
.run_migrations()
.await
.expect("Failed to run migrations");
let pipeline = Pipeline::new("test_entity_id")
.start_with(DoubleStep)
.with_recorder(recorder)
.build();
let input = TestInput {
id: "unique-entity-789".to_string(),
value: 42,
};
let result = pipeline.run(input).await;
assert!(result.is_ok());
let stored_entity_id: String = sqlx::query_scalar(
"SELECT entity_id FROM delta_runs WHERE pipeline_name = 'test_entity_id'",
)
.fetch_one(&pool)
.await
.expect("Failed to fetch entity_id");
assert_eq!(stored_entity_id, "unique-entity-789");
let pipeline_name: String = sqlx::query_scalar(
"SELECT pipeline_name FROM delta_runs WHERE entity_id = 'unique-entity-789'",
)
.fetch_one(&pool)
.await
.expect("Failed to query by entity_id");
assert_eq!(pipeline_name, "test_entity_id");
}