use duroxide::providers::{ExecutionMetadata, Provider, WorkItem};
use duroxide::{Event, EventKind};
use duroxide_cdb::CosmosDBProvider;
use std::sync::Arc as StdArc;
use std::time::{Duration, Instant};
/// Returns the Cosmos DB endpoint these helpers connect to.
///
/// Attempts to load a `.env` file first (any failure to do so is ignored),
/// then reads `COSMOS_ENDPOINT`. If the variable cannot be read, the
/// fallback `http://localhost:8081` is returned.
#[allow(dead_code)]
fn get_cosmos_endpoint() -> String {
    // A missing or unreadable `.env` file is not an error for these helpers.
    let _ = dotenvy::dotenv();
    match std::env::var("COSMOS_ENDPOINT") {
        Ok(endpoint) => endpoint,
        Err(_) => String::from("http://localhost:8081"),
    }
}
/// Returns the Cosmos DB account key these helpers authenticate with.
///
/// Attempts to load a `.env` file first (any failure to do so is ignored),
/// then reads `COSMOS_KEY`. If the variable cannot be read, a hard-coded
/// fallback key is returned.
///
/// NOTE(review): the fallback appears to be the publicly documented key of
/// the local Azure Cosmos DB emulator rather than a real secret — confirm
/// before reusing it elsewhere.
#[allow(dead_code)]
fn get_cosmos_key() -> String {
    // A missing or unreadable `.env` file is not an error for these helpers.
    let _ = dotenvy::dotenv();
    match std::env::var("COSMOS_KEY") {
        Ok(key) => key,
        Err(_) => String::from(
            "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==",
        ),
    }
}
/// Returns the name of the Cosmos DB database these helpers use.
///
/// Attempts to load a `.env` file first (any failure to do so is ignored),
/// then reads `COSMOS_DATABASE`. If the variable cannot be read, the
/// fallback name `duroxide` is returned.
#[allow(dead_code)]
fn get_cosmos_database() -> String {
    // A missing or unreadable `.env` file is not an error for these helpers.
    let _ = dotenvy::dotenv();
    match std::env::var("COSMOS_DATABASE") {
        Ok(database) => database,
        Err(_) => String::from("duroxide"),
    }
}
/// Generates a fresh container name of the form `e2e-test-<suffix>`.
///
/// The suffix is the last eight characters of a newly generated v4 UUID's
/// string form, so successive calls yield different names with high
/// probability.
#[allow(dead_code)]
fn next_container_name() -> String {
    let id = uuid::Uuid::new_v4().to_string();
    // Keep only the trailing eight characters of the UUID text.
    let (_, tail) = id.split_at(id.len() - 8);
    format!("e2e-test-{tail}")
}
/// Waits until `predicate` holds for the history of `instance`.
///
/// This is a boolean wrapper around [`wait_for_history_event`]: the
/// predicate is adapted into a selector that yields `Some(())` when it
/// returns `true`.
///
/// Returns `true` if the predicate was satisfied before the wait gave up,
/// and `false` otherwise. `timeout_ms` is forwarded unchanged.
#[allow(dead_code)]
pub async fn wait_for_history<F>(
    store: StdArc<dyn Provider>,
    instance: &str,
    predicate: F,
    timeout_ms: u64,
) -> bool
where
    F: Fn(&Vec<Event>) -> bool,
{
    let outcome = wait_for_history_event(
        store,
        instance,
        |hist| predicate(hist).then_some(()),
        timeout_ms,
    )
    .await;
    outcome.is_some()
}
/// Waits until the history of `instance` contains an
/// `EventKind::ExternalSubscribed` event whose `name` equals `name`.
///
/// Delegates the polling to [`wait_for_history`], forwarding `timeout_ms`
/// unchanged, and returns its result: `true` if such an event was seen,
/// `false` otherwise.
#[allow(dead_code)]
pub async fn wait_for_subscription(
    store: StdArc<dyn Provider>,
    instance: &str,
    name: &str,
    timeout_ms: u64,
) -> bool {
    wait_for_history(
        store,
        instance,
        |hist| {
            hist.iter().any(|event| match &event.kind {
                EventKind::ExternalSubscribed {
                    name: subscribed, ..
                } => subscribed == name,
                _ => false,
            })
        },
        timeout_ms,
    )
    .await
}
/// Polls the history of `instance` until `selector` yields a value or the
/// timeout elapses.
///
/// On every iteration the history is read from `store` and handed to
/// `selector`. A failed read is deliberately treated as an empty history
/// (`unwrap_or_default`) so that the wait keeps polling instead of
/// aborting. The selector is always evaluated at least once, even when
/// `timeout_ms` is `0`, because the deadline is only checked after a
/// non-matching poll.
///
/// Returns `Some(value)` with the first value the selector produces, or
/// `None` once the deadline (`timeout_ms` milliseconds after the call
/// started) has passed without a match.
///
/// `T` carries no trait bounds: the selected value is moved out to the
/// caller and never cloned.
#[allow(dead_code)]
pub async fn wait_for_history_event<T, F>(
    store: StdArc<dyn Provider>,
    instance: &str,
    selector: F,
    timeout_ms: u64,
) -> Option<T>
where
    F: Fn(&Vec<Event>) -> Option<T>,
{
    /// Pause between two consecutive history reads.
    const POLL_INTERVAL: Duration = Duration::from_millis(5);

    let deadline = Instant::now() + Duration::from_millis(timeout_ms);
    loop {
        // Read errors are treated the same as "no history yet".
        let hist = store.read(instance).await.unwrap_or_default();
        if let Some(found) = selector(&hist) {
            return Some(found);
        }
        if Instant::now() > deadline {
            return None;
        }
        tokio::time::sleep(POLL_INTERVAL).await;
    }
}
/// Creates a provider backed by a freshly named container and returns it
/// as a type-erased `dyn Provider` together with the container name.
///
/// Endpoint, key and database come from `get_cosmos_endpoint`,
/// `get_cosmos_key` and `get_cosmos_database`; the container name comes
/// from `next_container_name`.
///
/// # Panics
///
/// Panics if `CosmosDBProvider::new_with_container` returns an error.
#[allow(dead_code)]
pub async fn create_cosmos_store() -> (StdArc<dyn Provider>, String) {
    let (endpoint, key, database) = (
        get_cosmos_endpoint(),
        get_cosmos_key(),
        get_cosmos_database(),
    );
    let container = next_container_name();

    let connected =
        CosmosDBProvider::new_with_container(&endpoint, &key, &database, &container).await;
    let provider = connected.expect("Failed to create CosmosDB provider for e2e tests");

    let store: StdArc<dyn Provider> = StdArc::new(provider);
    (store, container)
}
/// Creates another provider for the named `container` and returns it as a
/// type-erased `dyn Provider`.
///
/// Endpoint, key and database come from `get_cosmos_endpoint`,
/// `get_cosmos_key` and `get_cosmos_database`; no new container name is
/// generated.
///
/// # Panics
///
/// Panics if `CosmosDBProvider::new_with_container` returns an error.
#[allow(dead_code)]
pub async fn create_cosmos_store_for_container(container: &str) -> StdArc<dyn Provider> {
    let (endpoint, key, database) = (
        get_cosmos_endpoint(),
        get_cosmos_key(),
        get_cosmos_database(),
    );

    let connected =
        CosmosDBProvider::new_with_container(&endpoint, &key, &database, container).await;
    let provider =
        connected.expect("Failed to create second CosmosDB provider for same container");

    let store: StdArc<dyn Provider> = StdArc::new(provider);
    store
}
/// Creates a concrete `CosmosDBProvider` backed by a freshly named
/// container and returns it together with the container name.
///
/// Unlike `create_cosmos_store`, the provider is returned as its concrete
/// type rather than as a `dyn Provider` behind an `Arc`.
///
/// # Panics
///
/// Panics if `CosmosDBProvider::new_with_container` returns an error.
#[allow(dead_code)]
pub async fn create_cosmos_provider() -> (CosmosDBProvider, String) {
    let (endpoint, key, database) = (
        get_cosmos_endpoint(),
        get_cosmos_key(),
        get_cosmos_database(),
    );
    let container = next_container_name();

    let connected =
        CosmosDBProvider::new_with_container(&endpoint, &key, &database, &container).await;
    let provider = connected.expect("Failed to create CosmosDB provider");

    (provider, container)
}
/// Creates a `CosmosDBProvider` for a freshly named container using an
/// explicit configuration, and returns it together with the container name.
///
/// The configuration sets `reconciler_interval` to 500 ms and
/// `reconciler_age_threshold` to 200 ms; every other field not listed here
/// keeps its `Default` value.
///
/// # Panics
///
/// Panics if `CosmosDBProvider::new_with_config` returns an error.
#[allow(dead_code)]
pub async fn create_cosmos_provider_fast_reconciler() -> (CosmosDBProvider, String) {
    let endpoint = get_cosmos_endpoint();
    let key = get_cosmos_key();
    let database = get_cosmos_database();
    let container = next_container_name();

    let reconciler_interval = Duration::from_millis(500);
    let reconciler_age_threshold = Duration::from_millis(200);

    let config = duroxide_cdb::CosmosDBProviderConfig {
        endpoint,
        key,
        database,
        // The name is also returned to the caller, so the config gets a copy.
        container: container.clone(),
        reconciler_interval,
        reconciler_age_threshold,
        ..Default::default()
    };

    let connected = CosmosDBProvider::new_with_config(config).await;
    let provider = connected.expect("Failed to create CosmosDB provider with fast reconciler");

    (provider, container)
}
/// Best-effort cleanup for the container named `container_name`.
///
/// Connects a provider to the container and calls its `cleanup` method.
/// Both a failure to connect and the outcome of `cleanup` itself are
/// silently ignored, so this function never panics on provider errors and
/// reports nothing to the caller.
#[allow(dead_code)]
pub async fn cleanup_container(container_name: &str) {
    let (endpoint, key, database) = (
        get_cosmos_endpoint(),
        get_cosmos_key(),
        get_cosmos_database(),
    );

    let connected =
        CosmosDBProvider::new_with_container(&endpoint, &key, &database, container_name).await;
    match connected {
        Ok(provider) => {
            // The result of the cleanup is intentionally discarded.
            let _ = provider.cleanup().await;
        }
        // Nothing to clean up through a provider that could not be created.
        Err(_) => {}
    }
}
#[allow(dead_code)]
pub async fn test_create_execution(
provider: &dyn Provider,
instance: &str,
orchestration: &str,
version: &str,
input: &str,
parent_instance: Option<&str>,
parent_id: Option<u64>,
) -> Result<u64, String> {
let admin = provider
.as_management_capability()
.ok_or_else(|| "Provider doesn't support management operations".to_string())?;
let execs = admin
.list_executions(instance)
.await
.map_err(|e| e.message.clone())?;
let next_execution_id = if execs.is_empty() {
duroxide::INITIAL_EXECUTION_ID
} else {
execs.iter().max().copied().unwrap() + 1
};
provider
.enqueue_for_orchestrator(
WorkItem::StartOrchestration {
instance: instance.to_string(),
orchestration: orchestration.to_string(),
version: Some(version.to_string()),
input: input.to_string(),
parent_instance: parent_instance.map(|s| s.to_string()),
parent_id,
execution_id: next_execution_id,
},
None,
)
.await
.map_err(|e| e.message.clone())?;
let (_item, lock_token, _attempt_count) = provider
.fetch_orchestration_item(
std::time::Duration::from_secs(30),
std::time::Duration::ZERO,
None,
)
.await
.map_err(|e| e.message.clone())?
.ok_or_else(|| "Failed to fetch orchestration item".to_string())?;
let execution_id = next_execution_id;
provider
.ack_orchestration_item(
&lock_token,
execution_id,
vec![Event::with_event_id(
duroxide::INITIAL_EVENT_ID,
instance,
execution_id,
None,
EventKind::OrchestrationStarted {
name: orchestration.to_string(),
version: version.to_string(),
input: input.to_string(),
parent_instance: parent_instance.map(|s| s.to_string()),
parent_id,
carry_forward_events: None,
initial_custom_status: None,
},
)],
vec![],
vec![],
ExecutionMetadata {
orchestration_name: Some(orchestration.to_string()),
orchestration_version: Some(version.to_string()),
..Default::default()
},
vec![],
)
.await
.map_err(|e| e.message.clone())?;
Ok(execution_id)
}