#[cfg(feature = "aws-backend")]
#[tokio::test]
async fn test_dynamo_progress_persistence_and_backfill_integration() {
use std::sync::Arc;
use datafold::progress::{DynamoDbProgressStore, ProgressStore, Job, JobType};
use datafold::fold_db_core::infrastructure::backfill_tracker::BackfillTracker;
use datafold::storage::config::DynamoDbConfig;
println!("Starting DynamoDB Progress Tracker Test...");
let table_name = std::env::var("TABLE_NAME").unwrap_or_else(|_| "datafold-process-dev".to_string());
let region = "us-east-1".to_string();
println!("Connecting to DynamoDB table: {}", table_name);
let config = aws_config::defaults(aws_config::BehaviorVersion::latest())
.region(aws_sdk_dynamodb::config::Region::new(region))
.load()
.await;
let client = aws_sdk_dynamodb::Client::new(&config);
use aws_sdk_dynamodb::types::{AttributeDefinition, KeySchemaElement, KeyType, ScalarAttributeType, BillingMode};
let table_exists = client.describe_table().table_name(&table_name).send().await.is_ok();
if !table_exists {
println!("Creating table {}", table_name);
client.create_table()
.table_name(&table_name)
.attribute_definitions(AttributeDefinition::builder().attribute_name("PK").attribute_type(ScalarAttributeType::S).build().unwrap())
.attribute_definitions(AttributeDefinition::builder().attribute_name("SK").attribute_type(ScalarAttributeType::S).build().unwrap())
.key_schema(KeySchemaElement::builder().attribute_name("PK").key_type(KeyType::Hash).build().unwrap())
.key_schema(KeySchemaElement::builder().attribute_name("SK").key_type(KeyType::Range).build().unwrap())
.billing_mode(BillingMode::PayPerRequest)
.send()
.await
.expect("Failed to create table");
loop {
let resp = client.describe_table().table_name(&table_name).send().await.unwrap();
if let Some(desc) = resp.table {
if let Some(status) = desc.table_status {
if matches!(status, aws_sdk_dynamodb::types::TableStatus::Active) {
break;
}
}
}
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
println!("Table active");
}
let store = Arc::new(DynamoDbProgressStore::new(client, table_name.clone()));
let job_id = format!("test-job-{}", uuid::Uuid::new_v4());
let user_id = "test-user-system".to_string();
let mut job = Job::new(job_id.clone(), JobType::Backfill)
.with_user(user_id.clone())
.with_metadata(serde_json::json!({"test": "true"}));
job.update_progress(10, "Started".to_string());
store.save(&job).await.expect("Failed to save job");
println!("Saved generic job {}", job_id);
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
let jobs = store.list_by_user(&user_id).await.expect("Failed to list jobs");
let loaded = jobs.iter().find(|j| j.id == job_id);
assert!(loaded.is_some(), "Job should exist in user list");
let loaded_job = loaded.unwrap();
assert_eq!(loaded_job.id, job_id);
assert_eq!(loaded_job.user_id, Some(user_id.clone()));
assert_eq!(loaded_job.progress_percentage, 10);
println!("Testing BackfillTracker integration...");
let tracker = BackfillTracker::new(Some(store.clone()));
let backfill_hash = format!("bf-{}", uuid::Uuid::new_v4());
let schema_name = "test_schema".to_string();
let transform_id = "test_transform".to_string();
tracker.start_backfill_with_hash(backfill_hash.clone(), schema_name, transform_id.clone()).await;
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
let bf_jobs_global = store.list_by_user("global").await.expect("Failed to list global jobs");
let bf_job = bf_jobs_global.iter().find(|j| j.id == backfill_hash);
assert!(bf_job.is_some(), "Backfill job should be persisted immediately");
let bf_job = bf_job.unwrap();
assert_eq!(bf_job.status, datafold::progress::JobStatus::Running);
tracker.set_mutations_expected(&backfill_hash, 10).await;
tracker.force_complete(&backfill_hash).await;
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
let bf_jobs_global_end = store.list_by_user("global").await.expect("Failed to list global jobs");
let bf_completed = bf_jobs_global_end.iter().find(|j| j.id == backfill_hash);
assert!(bf_completed.is_some());
let bf_completed = bf_completed.unwrap();
assert_eq!(bf_completed.status, datafold::progress::JobStatus::Completed);
println!("Backfill integration verified successfully!");
store.delete(&job_id).await.unwrap();
store.delete(&backfill_hash).await.unwrap();
}