mod postgres_multi_tenant_tests {
use cloacina::context::Context;
use cloacina::dal::DAL;
use cloacina::database::universal_types::UniversalUuid;
use cloacina::executor::PipelineExecutor;
use cloacina::runner::DefaultRunner;
use cloacina::*;
use cloacina::{register_task_constructor, register_workflow_constructor};
use serde_json::Value;
use std::env;
use std::sync::Arc;
// Minimal task used by the tenant-isolation workflows: it stores an
// `executed = true` marker in the context and then succeeds.
//
// A failure of `Context::insert` is propagated to the caller through `?`.
// NOTE(review): `tenant_marker_task_task()`, which `setup_tenant_workflow`
// calls, is presumably generated from this function by `#[task]` - confirm.
#[task(id = "tenant_marker_task", dependencies = [])]
async fn tenant_marker_task(context: &mut Context<Value>) -> Result<(), TaskError> {
// Leave a marker in the context so the task has an observable effect.
context.insert("executed", Value::Bool(true))?;
Ok(())
}
/// Builds the single-task isolation workflow for `tenant_schema` and registers
/// its constructors.
///
/// The workflow is named `isolation_test_<tenant_schema>`, is tagged with the
/// tenant, and contains only `tenant_marker_task`. Before returning, a task
/// constructor (keyed by tenant / package / workflow name / task id) and a
/// workflow constructor (keyed by workflow name) are registered.
///
/// # Panics
/// Panics if adding the task or building the workflow fails.
fn setup_tenant_workflow(tenant_schema: &str) -> Workflow {
    let name = format!("isolation_test_{}", tenant_schema);

    let workflow = Workflow::builder(&name)
        .tenant(tenant_schema)
        .description("Test workflow for multi-tenant isolation")
        .add_task(Arc::new(tenant_marker_task_task()))
        .unwrap()
        .build()
        .unwrap();

    // The task constructor is keyed by a namespace derived from the built
    // workflow rather than from the raw inputs.
    register_task_constructor(
        TaskNamespace::new(
            workflow.tenant(),
            workflow.package(),
            workflow.name(),
            "tenant_marker_task",
        ),
        || Arc::new(tenant_marker_task_task()),
    );

    // The workflow constructor hands out clones of a template it owns.
    let template = workflow.clone();
    register_workflow_constructor(workflow.name().to_string(), move || template.clone());

    workflow
}
/// Checks that two runners bound to different PostgreSQL schemas never see each
/// other's pipeline executions.
///
/// The connection string is read from `DATABASE_URL`; when that cannot be
/// read, a local default is used.
#[tokio::test]
async fn test_schema_isolation() -> Result<(), Box<dyn std::error::Error>> {
    let database_url = match env::var("DATABASE_URL") {
        Ok(url) => url,
        Err(_) => "postgresql://cloacina:cloacina@localhost:5432/cloacina".to_string(),
    };

    // One runner and one workflow per tenant schema.
    let runner_a = DefaultRunner::with_schema(&database_url, "tenant_iso_a").await?;
    let runner_b = DefaultRunner::with_schema(&database_url, "tenant_iso_b").await?;
    let workflow_a = setup_tenant_workflow("tenant_iso_a");
    let workflow_b = setup_tenant_workflow("tenant_iso_b");

    // Phase 1: only tenant A has executed anything.
    let execution_a = runner_a
        .execute_async(workflow_a.name(), Context::new())
        .await?;
    let id_a = UniversalUuid(execution_a.execution_id);
    execution_a.wait_for_completion().await?;

    let dal_a = DAL::new(runner_a.database().clone());
    let dal_b = DAL::new(runner_b.database().clone());

    let seen_by_a = dal_a.pipeline_execution().list_recent(100).await?;
    assert!(
        seen_by_a.iter().any(|e| e.id == id_a),
        "Tenant A should see their own execution"
    );

    let seen_by_b = dal_b.pipeline_execution().list_recent(100).await?;
    assert!(
        !seen_by_b.iter().any(|e| e.id == id_a),
        "Tenant B should NOT see tenant A's execution - isolation violated!"
    );

    // Phase 2: tenant B executes as well; visibility must stay disjoint.
    let execution_b = runner_b
        .execute_async(workflow_b.name(), Context::new())
        .await?;
    let id_b = UniversalUuid(execution_b.execution_id);
    execution_b.wait_for_completion().await?;

    let seen_by_a = dal_a.pipeline_execution().list_recent(100).await?;
    let seen_by_b = dal_b.pipeline_execution().list_recent(100).await?;

    assert!(
        seen_by_a.iter().any(|e| e.id == id_a),
        "Tenant A should still see their own execution"
    );
    assert!(
        !seen_by_a.iter().any(|e| e.id == id_b),
        "Tenant A should NOT see tenant B's execution"
    );
    assert!(
        seen_by_b.iter().any(|e| e.id == id_b),
        "Tenant B should see their own execution"
    );
    assert!(
        !seen_by_b.iter().any(|e| e.id == id_a),
        "Tenant B should NOT see tenant A's execution"
    );

    runner_a.shutdown().await?;
    runner_b.shutdown().await?;
    Ok(())
}
/// Runs one workflow per tenant concurrently and checks that each tenant ends
/// up with its own execution recorded.
///
/// The connection string is read from `DATABASE_URL`; when that cannot be
/// read, a local default is used.
#[tokio::test]
async fn test_independent_execution() -> Result<(), Box<dyn std::error::Error>> {
    let database_url = match env::var("DATABASE_URL") {
        Ok(url) => url,
        Err(_) => "postgresql://cloacina:cloacina@localhost:5432/cloacina".to_string(),
    };

    let runner_a = DefaultRunner::with_schema(&database_url, "tenant_indep_a").await?;
    let runner_b = DefaultRunner::with_schema(&database_url, "tenant_indep_b").await?;
    let workflow_a = setup_tenant_workflow("tenant_indep_a");
    let workflow_b = setup_tenant_workflow("tenant_indep_b");

    // Start both executions concurrently, then surface any start-up error.
    let (started_a, started_b) = tokio::join!(
        runner_a.execute_async(workflow_a.name(), Context::new()),
        runner_b.execute_async(workflow_b.name(), Context::new())
    );
    let execution_a = started_a?;
    let execution_b = started_b?;

    let execution_a_id = execution_a.execution_id;
    let execution_b_id = execution_b.execution_id;
    assert_ne!(
        execution_a_id, execution_b_id,
        "Each tenant should have unique execution IDs"
    );

    // Wait for both to finish concurrently as well.
    let (finished_a, finished_b) = tokio::join!(
        execution_a.wait_for_completion(),
        execution_b.wait_for_completion()
    );
    finished_a?;
    finished_b?;

    let dal_a = DAL::new(runner_a.database().clone());
    let dal_b = DAL::new(runner_b.database().clone());
    let executions_a = dal_a.pipeline_execution().list_recent(100).await?;
    let executions_b = dal_b.pipeline_execution().list_recent(100).await?;

    // Each tenant must list at least one execution whose pipeline name
    // carries that tenant's schema name.
    let tenant_a_has_runs = executions_a
        .iter()
        .any(|e| e.pipeline_name.contains("tenant_indep_a"));
    let tenant_b_has_runs = executions_b
        .iter()
        .any(|e| e.pipeline_name.contains("tenant_indep_b"));

    assert!(tenant_a_has_runs, "Tenant A should have executions");
    assert!(tenant_b_has_runs, "Tenant B should have executions");

    runner_a.shutdown().await?;
    runner_b.shutdown().await?;
    Ok(())
}
/// Checks that schema names containing a hyphen, a space or `@` are rejected
/// by `DefaultRunner::with_schema`.
///
/// A name made of letters, digits and an underscore is tried afterwards, but
/// its outcome is deliberately not asserted.
#[tokio::test]
async fn test_invalid_schema_names() {
    let fallback_url = "postgresql://cloacina:cloacina@localhost:5432/cloacina";

    // Every one of these names must produce an error.
    for schema in ["tenant-123", "tenant 123", "tenant@123"] {
        let result = DefaultRunner::with_schema(fallback_url, schema).await;
        assert!(result.is_err());
    }

    let database_url = env::var("DATABASE_URL").unwrap_or_else(|_| fallback_url.to_string());

    // Best effort only: if a runner was created it is shut down again, and
    // neither a creation error nor a shutdown error fails the test.
    let result = DefaultRunner::with_schema(&database_url, "tenant_123").await;
    match result {
        Ok(executor) => {
            let _ = executor.shutdown().await;
        }
        Err(_) => {}
    }
}
/// Checks that combining a `sqlite://` URL with a schema name makes the
/// builder fail with `PipelineError::Configuration`.
#[tokio::test]
async fn test_sqlite_schema_rejection() {
    // Configure first, build second, so the failing step is easy to spot.
    let builder = DefaultRunner::builder()
        .database_url("sqlite://test.db")
        .schema("tenant_123");

    let result = builder.build().await;

    assert!(matches!(result, Err(PipelineError::Configuration { .. })));
}
/// Checks that a runner can be built through the builder API with an explicit
/// schema and then shut down cleanly.
///
/// The connection string is read from `DATABASE_URL`; when that cannot be
/// read, a local default is used.
#[tokio::test]
async fn test_builder_pattern() -> Result<(), Box<dyn std::error::Error>> {
    let database_url = match env::var("DATABASE_URL") {
        Ok(url) => url,
        Err(_) => "postgresql://cloacina:cloacina@localhost:5432/cloacina".to_string(),
    };

    let builder = DefaultRunner::builder()
        .database_url(&database_url)
        .schema("tenant_builder_test");
    let executor = builder.build().await?;

    executor.shutdown().await?;
    Ok(())
}
}
mod sqlite_multi_tenant_tests {
use cloacina::context::Context;
use cloacina::dal::DAL;
use cloacina::database::universal_types::UniversalUuid;
use cloacina::executor::PipelineExecutor;
use cloacina::runner::DefaultRunner;
use cloacina::*;
use cloacina::{register_task_constructor, register_workflow_constructor};
use serde_json::Value;
use std::sync::Arc;
// Minimal task used by the SQLite isolation workflows: it stores a
// `sqlite_executed = true` marker in the context and then succeeds.
//
// A failure of `Context::insert` is propagated to the caller through `?`.
// NOTE(review): `sqlite_tenant_task_task()`, which `setup_sqlite_workflow`
// calls, is presumably generated from this function by `#[task]` - confirm.
#[task(id = "sqlite_tenant_task", dependencies = [])]
async fn sqlite_tenant_task(context: &mut Context<Value>) -> Result<(), TaskError> {
// Leave a marker in the context so the task has an observable effect.
context.insert("sqlite_executed", Value::Bool(true))?;
Ok(())
}
/// Builds the single-task SQLite isolation workflow for `db_name` and registers
/// its constructors.
///
/// The workflow is named `sqlite_isolation_<db_name>` and contains only
/// `sqlite_tenant_task`; no tenant is set on the builder. Before returning, a
/// task constructor (keyed by tenant / package / workflow name / task id) and
/// a workflow constructor (keyed by workflow name) are registered.
///
/// # Panics
/// Panics if adding the task or building the workflow fails.
fn setup_sqlite_workflow(db_name: &str) -> Workflow {
    let name = format!("sqlite_isolation_{}", db_name);

    let workflow = Workflow::builder(&name)
        .description("Test workflow for SQLite multi-tenant isolation")
        .add_task(Arc::new(sqlite_tenant_task_task()))
        .unwrap()
        .build()
        .unwrap();

    // The task constructor is keyed by a namespace derived from the built
    // workflow rather than from the raw inputs.
    register_task_constructor(
        TaskNamespace::new(
            workflow.tenant(),
            workflow.package(),
            workflow.name(),
            "sqlite_tenant_task",
        ),
        || Arc::new(sqlite_tenant_task_task()),
    );

    // The workflow constructor hands out clones of a template it owns.
    let template = workflow.clone();
    register_workflow_constructor(workflow.name().to_string(), move || template.clone());

    workflow
}
/// Checks that runners backed by separate SQLite files never see each other's
/// pipeline executions.
///
/// Two database files are created in the current working directory. They are
/// removed again when the test ends, whether it passes, returns an error
/// through `?`, or panics on a failed assertion.
#[tokio::test]
async fn test_sqlite_file_isolation() -> Result<(), Box<dyn std::error::Error>> {
    /// Deletes the named file when dropped; removal errors are ignored.
    struct RemoveOnDrop(&'static str);

    impl Drop for RemoveOnDrop {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(self.0);
        }
    }

    let db_a = "sqlite_tenant_a_iso_test.db";
    let db_b = "sqlite_tenant_b_iso_test.db";

    // Start from a clean slate in case an earlier run left files behind.
    let _ = std::fs::remove_file(db_a);
    let _ = std::fs::remove_file(db_b);

    // Declared before the runners and DALs so that, with locals dropped in
    // reverse declaration order, the files are removed only after those
    // handles are gone - on every exit path, not just the successful one.
    let _cleanup_a = RemoveOnDrop(db_a);
    let _cleanup_b = RemoveOnDrop(db_b);

    let runner_a = DefaultRunner::new(&format!("sqlite://{}", db_a)).await?;
    let runner_b = DefaultRunner::new(&format!("sqlite://{}", db_b)).await?;
    let workflow_a = setup_sqlite_workflow("a");
    let workflow_b = setup_sqlite_workflow("b");

    // Phase 1: only tenant A has executed anything.
    let context_a = Context::new();
    let execution_a = runner_a.execute_async(workflow_a.name(), context_a).await?;
    let execution_a_id = execution_a.execution_id;
    execution_a.wait_for_completion().await?;

    let dal_a = DAL::new(runner_a.database().clone());
    let dal_b = DAL::new(runner_b.database().clone());

    let executions_a = dal_a.pipeline_execution().list_recent(100).await?;
    assert!(
        executions_a
            .iter()
            .any(|e| e.id == UniversalUuid(execution_a_id)),
        "Tenant A should see their own execution"
    );
    let executions_b = dal_b.pipeline_execution().list_recent(100).await?;
    assert!(
        !executions_b
            .iter()
            .any(|e| e.id == UniversalUuid(execution_a_id)),
        "Tenant B should NOT see tenant A's execution - file isolation"
    );

    // Phase 2: tenant B executes as well; visibility must stay disjoint.
    let context_b = Context::new();
    let execution_b = runner_b.execute_async(workflow_b.name(), context_b).await?;
    let execution_b_id = execution_b.execution_id;
    execution_b.wait_for_completion().await?;

    let executions_a = dal_a.pipeline_execution().list_recent(100).await?;
    let executions_b = dal_b.pipeline_execution().list_recent(100).await?;
    assert!(
        executions_a
            .iter()
            .any(|e| e.id == UniversalUuid(execution_a_id)),
        "Tenant A should still see their own execution"
    );
    assert!(
        !executions_a
            .iter()
            .any(|e| e.id == UniversalUuid(execution_b_id)),
        "Tenant A should NOT see tenant B's execution"
    );
    assert!(
        executions_b
            .iter()
            .any(|e| e.id == UniversalUuid(execution_b_id)),
        "Tenant B should see their own execution"
    );
    assert!(
        !executions_b
            .iter()
            .any(|e| e.id == UniversalUuid(execution_a_id)),
        "Tenant B should NOT see tenant A's execution"
    );

    runner_a.shutdown().await?;
    runner_b.shutdown().await?;
    // The database files are deleted by the `RemoveOnDrop` guards on return.
    Ok(())
}
/// Checks that creating a runner for a `sqlite://` URL creates the database
/// file on disk.
///
/// The file lives in the current working directory and is removed again when
/// the test ends, whether it passes, returns an error through `?`, or panics
/// on the failed assertion.
#[tokio::test]
async fn test_sqlite_separate_files() -> Result<(), Box<dyn std::error::Error>> {
    /// Deletes the named file when dropped; removal errors are ignored.
    struct RemoveOnDrop(&'static str);

    impl Drop for RemoveOnDrop {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(self.0);
        }
    }

    let db_file = "multi_tenant_test_sep.db";

    // Start from a clean slate so a stale file cannot satisfy the check below.
    let _ = std::fs::remove_file(db_file);

    // Declared before the runner so that, with locals dropped in reverse
    // declaration order, the file is removed only after the runner is gone -
    // on every exit path, not just the successful one.
    let _cleanup = RemoveOnDrop(db_file);

    let executor = DefaultRunner::new(&format!("sqlite://{}", db_file)).await?;
    assert!(
        std::path::Path::new(db_file).exists(),
        "Database file should be created"
    );

    executor.shutdown().await?;
    // The database file is deleted by the `RemoveOnDrop` guard on return.
    Ok(())
}
}