use chrono::Utc;
use serde_json::json;
use sqlx::postgres::PgPoolOptions;
use std::env::var;
use std::sync::Arc;
use stormchaser_engine::handler;
use stormchaser_model::auth::OpaClient;
use stormchaser_model::step::{StepInstance, StepStatus};
use stormchaser_model::RunId;
use uuid::Uuid;
use stormchaser_tls::TlsConfig;
use stormchaser_tls::TlsReloader;
#[tokio::test]
async fn test_artifact_persistence_on_completion() {
let db_url = var("DATABASE_URL").unwrap_or_else(|_| {
dotenvy::dotenv().ok();
format!(
"postgres://stormchaser:{}@localhost:5432/stormchaser",
var("STORMCHASER_DEV_PASSWORD")
.expect("STORMCHASER_DEV_PASSWORD must be set if DATABASE_URL is not set")
)
});
let pool = PgPoolOptions::new()
.max_connections(5)
.connect(&db_url)
.await
.unwrap();
let connection_id = Uuid::new_v4();
let backend_name = format!("test-minio-{}", connection_id);
sqlx::query(
r#"
INSERT INTO connections (id, name, connection_type, config, is_default_sfs)
VALUES ($1, $2, 's3', $3, FALSE)
"#,
)
.bind(connection_id)
.bind(&backend_name)
.bind(json!({"bucket": "test-bucket", "endpoint": "http://localhost:9000"}))
.execute(&pool)
.await
.unwrap();
let run_id = RunId::new_v4();
let dsl = format!(
r#"
stormchaser_dsl_version = "v1"
workflow "artifact-test" {{
storage "workspace" {{
size = "1Gi"
backend = "{}"
artifact "app-bin" {{
path = "bin/app"
retention = "7d"
}}
}}
steps {{
step "build" "RunContainer" {{
image = "alpine"
storage_mounts = [
{{ name = "workspace", mount_path = "/workspace" }}
]
}}
}}
}}
"#,
backend_name
);
let nats_url = var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".into());
let nats_client = async_nats::connect(nats_url).await.unwrap();
let opa_client = Arc::new(OpaClient::new(None, None));
let payload = json!({
"run_id": run_id,
"dsl": dsl,
"inputs": {},
"initiating_user": "test"
});
handler::handle_workflow_direct(
payload,
pool.clone(),
opa_client.clone(),
nats_client.clone(),
)
.await
.unwrap();
handler::handle_workflow_start_pending(
run_id,
pool.clone(),
nats_client.clone(),
Arc::new(TlsReloader::new(TlsConfig::default()).await.unwrap()),
)
.await
.unwrap();
let step: StepInstance = sqlx::query_as(
r#"SELECT id, run_id, step_name, step_type, status as "status", iteration_index, runner_id, affinity_context, started_at, finished_at, exit_code, error, spec, params, created_at FROM step_instances WHERE run_id = $1"#
)
.bind(run_id)
.fetch_one(&pool)
.await
.unwrap();
let fencing_token: i64 =
sqlx::query_scalar("SELECT fencing_token FROM workflow_runs WHERE id = $1")
.bind(run_id)
.fetch_one(&pool)
.await
.expect("workflow run fencing token should be queryable");
let completed_payload = json!({
"run_id": run_id,
"step_id": step.id,"event_type": "StepCompletedEvent", "fencing_token": fencing_token,
"timestamp": Utc::now(),
"status": StepStatus::Succeeded,
"exit_code": 0,
"artifacts": {
"app-bin": {
"hash": "abc123hash",
"size": 1024,
"content_type": "application/octet-stream"
}
}
});
let log_backend = Arc::new(None);
handler::handle_step_completed(
serde_json::from_value(completed_payload).unwrap(),
pool.clone(),
nats_client.clone(),
log_backend.clone(),
Arc::new(TlsReloader::new(TlsConfig::default()).await.unwrap()),
)
.await
.unwrap();
let artifact_exists: bool = sqlx::query_scalar(
"SELECT EXISTS(SELECT 1 FROM archived_artifact_registry WHERE run_id = $1 AND artifact_name = $2)",
)
.bind(run_id)
.bind("app-bin")
.fetch_one(&pool)
.await
.unwrap();
assert!(
artifact_exists,
"Artifact should have been registered in archived_artifact_registry"
);
let artifact: (String, Uuid, String) = sqlx::query_as(
"SELECT artifact_name, connection_id, remote_path FROM archived_artifact_registry WHERE run_id = $1",
)
.bind(run_id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(artifact.0, "app-bin");
assert_eq!(artifact.1, connection_id);
assert_eq!(
artifact.2,
format!("artifacts/{}/workspace/app-bin", run_id)
);
}
#[tokio::test]
async fn test_test_report_persistence_on_completion() {
let db_url = var("DATABASE_URL").unwrap_or_else(|_| {
dotenvy::dotenv().ok();
format!(
"postgres://stormchaser:{}@localhost:5432/stormchaser",
var("STORMCHASER_DEV_PASSWORD")
.expect("STORMCHASER_DEV_PASSWORD must be set if DATABASE_URL is not set")
)
});
let pool = PgPoolOptions::new()
.max_connections(5)
.connect(&db_url)
.await
.unwrap();
let run_id = RunId::new_v4();
let dsl = r#"
stormchaser_dsl_version = "v1"
workflow "report-test" {
steps {
step "test" "RunContainer" {
image = "alpine"
reports {
report "unit-tests" {
path = "target/results/*.xml"
format = "junit"
}
}
}
}
}
"#;
let nats_url = var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".into());
let nats_client = async_nats::connect(nats_url).await.unwrap();
let opa_client = Arc::new(OpaClient::new(None, None));
let payload = json!({
"run_id": run_id,
"dsl": dsl,
"inputs": {},
"initiating_user": "test"
});
handler::handle_workflow_direct(
payload,
pool.clone(),
opa_client.clone(),
nats_client.clone(),
)
.await
.unwrap();
handler::handle_workflow_start_pending(
run_id,
pool.clone(),
nats_client.clone(),
Arc::new(TlsReloader::new(TlsConfig::default()).await.unwrap()),
)
.await
.unwrap();
let step: StepInstance = sqlx::query_as(
r#"SELECT id, run_id, step_name, step_type, status as "status", iteration_index, runner_id, affinity_context, started_at, finished_at, exit_code, error, spec, params, created_at FROM step_instances WHERE run_id = $1"#
)
.bind(run_id)
.fetch_one(&pool)
.await
.unwrap();
let fencing_token: i64 =
sqlx::query_scalar("SELECT fencing_token FROM workflow_runs WHERE id = $1")
.bind(run_id)
.fetch_one(&pool)
.await
.expect("workflow run fencing token should be queryable");
let completed_payload = json!({
"run_id": run_id,
"step_id": step.id,"event_type": "StepCompletedEvent", "fencing_token": fencing_token,
"timestamp": Utc::now(),
"status": StepStatus::Succeeded,
"exit_code": 0,
"test_reports": {
"unit-tests_results.xml": {
"name": "unit-tests",
"file_name": "results.xml",
"format": "junit",
"content": "<testsuite>...</testsuite>",
"hash": "hash123"
}
}
});
let log_backend = Arc::new(None);
handler::handle_step_completed(
serde_json::from_value(completed_payload).unwrap(),
pool.clone(),
nats_client.clone(),
log_backend.clone(),
Arc::new(TlsReloader::new(TlsConfig::default()).await.unwrap()),
)
.await
.unwrap();
let report_exists: bool = sqlx::query_scalar(
"SELECT EXISTS(SELECT 1 FROM archived_step_test_reports WHERE run_id = $1 AND report_name = $2)",
)
.bind(run_id)
.bind("unit-tests")
.fetch_one(&pool)
.await
.unwrap();
assert!(
report_exists,
"Test report should have been registered in archived_step_test_reports"
);
let report: (String, String, String, String) = sqlx::query_as(
"SELECT report_name, file_name, format, checksum FROM archived_step_test_reports WHERE run_id = $1",
)
.bind(run_id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(report.0, "unit-tests");
assert_eq!(report.1, "results.xml");
assert_eq!(report.2, "junit");
assert_eq!(report.3, "hash123");
}