mod postgres_multi_tenant_tests {
use cloacina::context::Context;
use cloacina::dal::DAL;
use cloacina::database::universal_types::UniversalUuid;
use cloacina::executor::WorkflowExecutor;
use cloacina::runner::DefaultRunner;
use cloacina::*;
use serde_json::Value;
use std::env;
use std::sync::Arc;
#[task(id = "tenant_marker_task", dependencies = [])]
async fn tenant_marker_task(context: &mut Context<Value>) -> Result<(), TaskError> {
context.insert("executed", Value::Bool(true))?;
Ok(())
}
fn setup_tenant_workflow(tenant_schema: &str, runtime: &cloacina::Runtime) -> Workflow {
let workflow_name = format!("isolation_test_{}", tenant_schema);
let workflow = Workflow::builder(&workflow_name)
.tenant(tenant_schema)
.description("Test workflow for multi-tenant isolation")
.add_task(Arc::new(tenant_marker_task_task()))
.unwrap()
.build()
.unwrap();
let namespace = TaskNamespace::new(
workflow.tenant(),
workflow.package(),
workflow.name(),
"tenant_marker_task",
);
let task = Arc::new(tenant_marker_task_task());
runtime.register_task(namespace, move || task.clone());
runtime.register_workflow(workflow.name().to_string(), {
let workflow = workflow.clone();
move || workflow.clone()
});
workflow
}
#[tokio::test]
async fn test_schema_isolation() -> Result<(), Box<dyn std::error::Error>> {
let database_url = env::var("DATABASE_URL").unwrap_or_else(|_| {
"postgresql://cloacina:cloacina@localhost:5432/cloacina".to_string()
});
let runtime = cloacina::Runtime::new();
let workflow_a = setup_tenant_workflow("tenant_iso_a", &runtime);
let workflow_b = setup_tenant_workflow("tenant_iso_b", &runtime);
let runner_a = DefaultRunner::builder()
.database_url(&database_url)
.schema("tenant_iso_a")
.runtime(runtime.clone())
.build()
.await?;
let runner_b = DefaultRunner::builder()
.database_url(&database_url)
.schema("tenant_iso_b")
.runtime(runtime)
.build()
.await?;
let context_a = Context::new();
let execution_a = runner_a.execute_async(workflow_a.name(), context_a).await?;
let execution_a_id = execution_a.execution_id;
execution_a.wait_for_completion().await?;
let dal_a = DAL::new(runner_a.database().clone());
let dal_b = DAL::new(runner_b.database().clone());
let executions_a = dal_a.workflow_execution().list_recent(100).await?;
assert!(
executions_a
.iter()
.any(|e| e.id == UniversalUuid(execution_a_id)),
"Tenant A should see their own execution"
);
let executions_b = dal_b.workflow_execution().list_recent(100).await?;
assert!(
!executions_b
.iter()
.any(|e| e.id == UniversalUuid(execution_a_id)),
"Tenant B should NOT see tenant A's execution - isolation violated!"
);
let context_b = Context::new();
let execution_b = runner_b.execute_async(workflow_b.name(), context_b).await?;
let execution_b_id = execution_b.execution_id;
execution_b.wait_for_completion().await?;
let executions_a = dal_a.workflow_execution().list_recent(100).await?;
let executions_b = dal_b.workflow_execution().list_recent(100).await?;
assert!(
executions_a
.iter()
.any(|e| e.id == UniversalUuid(execution_a_id)),
"Tenant A should still see their own execution"
);
assert!(
!executions_a
.iter()
.any(|e| e.id == UniversalUuid(execution_b_id)),
"Tenant A should NOT see tenant B's execution"
);
assert!(
executions_b
.iter()
.any(|e| e.id == UniversalUuid(execution_b_id)),
"Tenant B should see their own execution"
);
assert!(
!executions_b
.iter()
.any(|e| e.id == UniversalUuid(execution_a_id)),
"Tenant B should NOT see tenant A's execution"
);
runner_a.shutdown().await?;
runner_b.shutdown().await?;
Ok(())
}
#[tokio::test]
async fn test_independent_execution() -> Result<(), Box<dyn std::error::Error>> {
let database_url = env::var("DATABASE_URL").unwrap_or_else(|_| {
"postgresql://cloacina:cloacina@localhost:5432/cloacina".to_string()
});
let runtime = cloacina::Runtime::new();
let workflow_a = setup_tenant_workflow("tenant_indep_a", &runtime);
let workflow_b = setup_tenant_workflow("tenant_indep_b", &runtime);
let runner_a = DefaultRunner::builder()
.database_url(&database_url)
.schema("tenant_indep_a")
.runtime(runtime.clone())
.build()
.await?;
let runner_b = DefaultRunner::builder()
.database_url(&database_url)
.schema("tenant_indep_b")
.runtime(runtime)
.build()
.await?;
let context_a = Context::new();
let context_b = Context::new();
let (execution_a, execution_b) = tokio::join!(
runner_a.execute_async(workflow_a.name(), context_a),
runner_b.execute_async(workflow_b.name(), context_b)
);
let execution_a = execution_a?;
let execution_b = execution_b?;
let execution_a_id = execution_a.execution_id;
let execution_b_id = execution_b.execution_id;
assert_ne!(
execution_a_id, execution_b_id,
"Each tenant should have unique execution IDs"
);
let (result_a, result_b) = tokio::join!(
execution_a.wait_for_completion(),
execution_b.wait_for_completion()
);
result_a?;
result_b?;
let dal_a = DAL::new(runner_a.database().clone());
let dal_b = DAL::new(runner_b.database().clone());
let executions_a = dal_a.workflow_execution().list_recent(100).await?;
let executions_b = dal_b.workflow_execution().list_recent(100).await?;
let tenant_a_workflows: Vec<_> = executions_a
.iter()
.filter(|e| e.workflow_name.contains("tenant_indep_a"))
.collect();
let tenant_b_workflows: Vec<_> = executions_b
.iter()
.filter(|e| e.workflow_name.contains("tenant_indep_b"))
.collect();
assert!(
!tenant_a_workflows.is_empty(),
"Tenant A should have executions"
);
assert!(
!tenant_b_workflows.is_empty(),
"Tenant B should have executions"
);
runner_a.shutdown().await?;
runner_b.shutdown().await?;
Ok(())
}
#[tokio::test]
async fn test_invalid_schema_names() {
let database_url = "postgresql://cloacina:cloacina@localhost:5432/cloacina";
let result = DefaultRunner::with_schema(database_url, "tenant-123").await;
assert!(result.is_err());
let result = DefaultRunner::with_schema(database_url, "tenant 123").await;
assert!(result.is_err());
let result = DefaultRunner::with_schema(database_url, "tenant@123").await;
assert!(result.is_err());
let database_url = env::var("DATABASE_URL").unwrap_or_else(|_| database_url.to_string());
let result = DefaultRunner::with_schema(&database_url, "tenant_123").await;
if let Ok(executor) = result {
let _ = executor.shutdown().await;
}
}
#[tokio::test]
async fn test_sqlite_schema_rejection() {
let result = DefaultRunner::builder()
.database_url("sqlite://test.db")
.schema("tenant_123")
.build()
.await;
assert!(matches!(
result,
Err(WorkflowExecutionError::Configuration { .. })
));
}
#[tokio::test]
async fn test_builder_pattern() -> Result<(), Box<dyn std::error::Error>> {
let database_url = env::var("DATABASE_URL").unwrap_or_else(|_| {
"postgresql://cloacina:cloacina@localhost:5432/cloacina".to_string()
});
let executor = DefaultRunner::builder()
.database_url(&database_url)
.schema("tenant_builder_test")
.build()
.await?;
executor.shutdown().await?;
Ok(())
}
}
mod sqlite_multi_tenant_tests {
use cloacina::context::Context;
use cloacina::dal::DAL;
use cloacina::database::universal_types::UniversalUuid;
use cloacina::executor::WorkflowExecutor;
use cloacina::runner::DefaultRunner;
use cloacina::*;
use serde_json::Value;
use std::sync::Arc;
#[task(id = "sqlite_tenant_task", dependencies = [])]
async fn sqlite_tenant_task(context: &mut Context<Value>) -> Result<(), TaskError> {
context.insert("sqlite_executed", Value::Bool(true))?;
Ok(())
}
fn setup_sqlite_workflow(db_name: &str, runtime: &cloacina::Runtime) -> Workflow {
let workflow_name = format!("sqlite_isolation_{}", db_name);
let workflow = Workflow::builder(&workflow_name)
.description("Test workflow for SQLite multi-tenant isolation")
.add_task(Arc::new(sqlite_tenant_task_task()))
.unwrap()
.build()
.unwrap();
let namespace = TaskNamespace::new(
workflow.tenant(),
workflow.package(),
workflow.name(),
"sqlite_tenant_task",
);
let task = Arc::new(sqlite_tenant_task_task());
runtime.register_task(namespace, move || task.clone());
runtime.register_workflow(workflow.name().to_string(), {
let workflow = workflow.clone();
move || workflow.clone()
});
workflow
}
#[tokio::test]
async fn test_sqlite_file_isolation() -> Result<(), Box<dyn std::error::Error>> {
let tmp = tempfile::TempDir::new()?;
let db_a = tmp.path().join("tenant_a.db");
let db_b = tmp.path().join("tenant_b.db");
let runtime = cloacina::Runtime::new();
let workflow_a = setup_sqlite_workflow("a", &runtime);
let workflow_b = setup_sqlite_workflow("b", &runtime);
let url_a = format!("sqlite://{}", db_a.display());
let url_b = format!("sqlite://{}", db_b.display());
let runner_a = DefaultRunner::builder()
.database_url(&url_a)
.runtime(runtime.clone())
.build()
.await?;
let runner_b = DefaultRunner::builder()
.database_url(&url_b)
.runtime(runtime)
.build()
.await?;
let context_a = Context::new();
let execution_a = runner_a.execute_async(workflow_a.name(), context_a).await?;
let execution_a_id = execution_a.execution_id;
execution_a.wait_for_completion().await?;
let dal_a = DAL::new(runner_a.database().clone());
let dal_b = DAL::new(runner_b.database().clone());
let executions_a = dal_a.workflow_execution().list_recent(100).await?;
assert!(
executions_a
.iter()
.any(|e| e.id == UniversalUuid(execution_a_id)),
"Tenant A should see their own execution"
);
let executions_b = dal_b.workflow_execution().list_recent(100).await?;
assert!(
!executions_b
.iter()
.any(|e| e.id == UniversalUuid(execution_a_id)),
"Tenant B should NOT see tenant A's execution - file isolation"
);
let context_b = Context::new();
let execution_b = runner_b.execute_async(workflow_b.name(), context_b).await?;
let execution_b_id = execution_b.execution_id;
execution_b.wait_for_completion().await?;
let executions_a = dal_a.workflow_execution().list_recent(100).await?;
let executions_b = dal_b.workflow_execution().list_recent(100).await?;
assert!(executions_a
.iter()
.any(|e| e.id == UniversalUuid(execution_a_id)));
assert!(!executions_a
.iter()
.any(|e| e.id == UniversalUuid(execution_b_id)));
assert!(executions_b
.iter()
.any(|e| e.id == UniversalUuid(execution_b_id)));
assert!(!executions_b
.iter()
.any(|e| e.id == UniversalUuid(execution_a_id)));
runner_a.shutdown().await?;
runner_b.shutdown().await?;
Ok(())
}
#[tokio::test]
async fn test_sqlite_separate_files() -> Result<(), Box<dyn std::error::Error>> {
let tmp = tempfile::TempDir::new()?;
let db_file = tmp.path().join("test_sep.db");
let executor = DefaultRunner::new(&format!("sqlite://{}", db_file.display())).await?;
assert!(db_file.exists(), "Database file should be created");
executor.shutdown().await?;
Ok(())
}
}