duroxide-cdb 0.1.10

A CosmosDB-based provider implementation for Duroxide, a durable task orchestration framework
Documentation
use duroxide::providers::{ExecutionMetadata, Provider, WorkItem};
use duroxide::{Event, EventKind};
use duroxide_cdb::CosmosDBProvider;
use std::sync::Arc as StdArc;
use std::time::{Duration, Instant};

/// Returns the Cosmos DB endpoint these helpers connect to.
///
/// Makes a best-effort `dotenvy::dotenv()` call, then reads `COSMOS_ENDPOINT`.
/// When the variable cannot be read, `http://localhost:8081` is returned.
#[allow(dead_code)]
fn get_cosmos_endpoint() -> String {
    // A failed `.env` load is deliberately ignored.
    let _ = dotenvy::dotenv();
    match std::env::var("COSMOS_ENDPOINT") {
        Ok(endpoint) => endpoint,
        Err(_) => String::from("http://localhost:8081"),
    }
}

/// Returns the Cosmos DB account key these helpers authenticate with.
///
/// Makes a best-effort `dotenvy::dotenv()` call, then reads `COSMOS_KEY`.
/// When the variable cannot be read, a hard-coded fallback key is returned
/// (NOTE(review): presumably the publicly documented Cosmos DB emulator key —
/// confirm).
#[allow(dead_code)]
fn get_cosmos_key() -> String {
    // A failed `.env` load is deliberately ignored.
    let _ = dotenvy::dotenv();
    match std::env::var("COSMOS_KEY") {
        Ok(key) => key,
        Err(_) => String::from(
            "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==",
        ),
    }
}

/// Returns the Cosmos DB database name these helpers use.
///
/// Makes a best-effort `dotenvy::dotenv()` call, then reads `COSMOS_DATABASE`.
/// When the variable cannot be read, `duroxide` is returned.
#[allow(dead_code)]
fn get_cosmos_database() -> String {
    // A failed `.env` load is deliberately ignored.
    let _ = dotenvy::dotenv();
    match std::env::var("COSMOS_DATABASE") {
        Ok(database) => database,
        Err(_) => String::from("duroxide"),
    }
}

/// Builds a container name of the form `e2e-test-<suffix>`, where the suffix
/// is the last eight characters of a newly generated v4 UUID string.
#[allow(dead_code)]
fn next_container_name() -> String {
    let id = uuid::Uuid::new_v4().to_string();
    // Keep only the trailing eight characters of the UUID text.
    let tail_start = id.len() - 8;
    format!("e2e-test-{}", &id[tail_start..])
}

/// Polls the history of `instance` until `predicate` accepts it or the
/// timeout elapses.
///
/// Returns `true` if `predicate` returned `true` for some history snapshot,
/// `false` if `wait_for_history_event` gave up first. `timeout_ms` is passed
/// through to `wait_for_history_event` unchanged.
#[allow(dead_code)]
pub async fn wait_for_history<F>(
    store: StdArc<dyn Provider>,
    instance: &str,
    predicate: F,
    timeout_ms: u64,
) -> bool
where
    F: Fn(&Vec<Event>) -> bool,
{
    // Adapt the boolean predicate to the `Option`-returning selector that
    // `wait_for_history_event` expects.
    let outcome: Option<()> = wait_for_history_event(
        store,
        instance,
        |events| predicate(events).then_some(()),
        timeout_ms,
    )
    .await;
    outcome.is_some()
}

/// Waits until the history of `instance` contains an
/// `EventKind::ExternalSubscribed` event whose `name` equals `name`.
///
/// Returns `true` if such an event was seen, `false` if `wait_for_history`
/// gave up first. `timeout_ms` is passed through unchanged.
#[allow(dead_code)]
pub async fn wait_for_subscription(
    store: StdArc<dyn Provider>,
    instance: &str,
    name: &str,
    timeout_ms: u64,
) -> bool {
    wait_for_history(
        store,
        instance,
        |events| {
            events.iter().any(|event| match &event.kind {
                EventKind::ExternalSubscribed {
                    name: subscribed, ..
                } => subscribed == name,
                _ => false,
            })
        },
        timeout_ms,
    )
    .await
}

/// Repeatedly reads the history of `instance` from `store` and applies
/// `selector` to it, returning the first `Some` value the selector produces.
///
/// The history is read at least once. A failed read is treated as the
/// default (empty) history. After each unsuccessful attempt the function
/// returns `None` if more than `timeout_ms` milliseconds have passed since
/// the call started; otherwise it sleeps for 5 ms and tries again.
#[allow(dead_code)]
pub async fn wait_for_history_event<T, F>(
    store: StdArc<dyn Provider>,
    instance: &str,
    selector: F,
    timeout_ms: u64,
) -> Option<T>
where
    T: Clone,
    F: Fn(&Vec<Event>) -> Option<T>,
{
    let give_up_at = Instant::now() + Duration::from_millis(timeout_ms);
    let poll_interval = Duration::from_millis(5);

    loop {
        // Read errors are swallowed on purpose: keep polling with an empty history.
        let events = store.read(instance).await.unwrap_or_default();
        match selector(&events) {
            Some(found) => return Some(found),
            None if Instant::now() > give_up_at => return None,
            None => tokio::time::sleep(poll_interval).await,
        }
    }
}

/// Creates a `CosmosDBProvider` on a freshly named container and returns it
/// as a shared `dyn Provider`, together with the container name.
///
/// Endpoint, key and database come from the `get_cosmos_*` helpers; the
/// container name comes from `next_container_name`.
///
/// # Panics
///
/// Panics if the provider cannot be created.
#[allow(dead_code)]
pub async fn create_cosmos_store() -> (StdArc<dyn Provider>, String) {
    let (endpoint, key, database) = (
        get_cosmos_endpoint(),
        get_cosmos_key(),
        get_cosmos_database(),
    );
    let container = next_container_name();

    let created =
        CosmosDBProvider::new_with_container(&endpoint, &key, &database, &container).await;
    let store: StdArc<dyn Provider> =
        StdArc::new(created.expect("Failed to create CosmosDB provider for e2e tests"));

    (store, container)
}

/// Builds another, independent provider instance bound to an existing
/// `container`.
///
/// Unlike `Arc::clone()` on an existing store, this constructs a brand-new
/// `CosmosDBProvider`, which is intended to simulate a separate node with its
/// own HTTP client, session token cache, and outbox reconciler.
///
/// # Panics
///
/// Panics if the provider cannot be created.
#[allow(dead_code)]
pub async fn create_cosmos_store_for_container(container: &str) -> StdArc<dyn Provider> {
    let (endpoint, key, database) = (
        get_cosmos_endpoint(),
        get_cosmos_key(),
        get_cosmos_database(),
    );

    let created =
        CosmosDBProvider::new_with_container(&endpoint, &key, &database, container).await;
    let store: StdArc<dyn Provider> = StdArc::new(
        created.expect("Failed to create second CosmosDB provider for same container"),
    );

    store
}

/// Builds a bare `CosmosDBProvider` (not wrapped in an `Arc`) on a freshly
/// named container and returns it together with the container name.
///
/// Intended for tests that must configure the provider before wrapping it
/// (e.g., fault injection).
///
/// # Panics
///
/// Panics if the provider cannot be created.
#[allow(dead_code)]
pub async fn create_cosmos_provider() -> (CosmosDBProvider, String) {
    let (endpoint, key, database) = (
        get_cosmos_endpoint(),
        get_cosmos_key(),
        get_cosmos_database(),
    );
    let container = next_container_name();

    let created =
        CosmosDBProvider::new_with_container(&endpoint, &key, &database, &container).await;

    (
        created.expect("Failed to create CosmosDB provider"),
        container,
    )
}

/// Builds a bare `CosmosDBProvider` with a fast reconciler, for outbox fault
/// injection tests, and returns it together with its container name.
///
/// The configuration sets `reconciler_interval` to 500 ms and
/// `reconciler_age_threshold` to 200 ms; every other setting beyond
/// endpoint, key, database and container keeps its `Default` value.
///
/// # Panics
///
/// Panics if the provider cannot be created.
#[allow(dead_code)]
pub async fn create_cosmos_provider_fast_reconciler() -> (CosmosDBProvider, String) {
    let fast_config = duroxide_cdb::CosmosDBProviderConfig {
        endpoint: get_cosmos_endpoint(),
        key: get_cosmos_key(),
        database: get_cosmos_database(),
        container: next_container_name(),
        reconciler_interval: Duration::from_millis(500),
        reconciler_age_threshold: Duration::from_millis(200),
        ..Default::default()
    };
    // The config is consumed below, so keep a copy of the name to hand back.
    let container = fast_config.container.clone();

    let provider = CosmosDBProvider::new_with_config(fast_config)
        .await
        .expect("Failed to create CosmosDB provider with fast reconciler");

    (provider, container)
}

/// Best-effort cleanup of the test container named `container_name`.
///
/// Opens a provider on the container and calls its `cleanup()` method.
/// Failure to create the provider and failure of `cleanup()` are both
/// ignored, so this function never reports an error.
#[allow(dead_code)]
pub async fn cleanup_container(container_name: &str) {
    let (endpoint, key, database) = (
        get_cosmos_endpoint(),
        get_cosmos_key(),
        get_cosmos_database(),
    );

    let Ok(provider) =
        CosmosDBProvider::new_with_container(&endpoint, &key, &database, container_name).await
    else {
        // Nothing to clean up if the provider cannot even be opened.
        return;
    };

    // Cleanup errors are intentionally discarded.
    let _ = provider.cleanup().await;
}

/// Test helper to create a new orchestration instance with initial history.
///
/// This replicates what the runtime does in production by using real provider APIs:
/// 1. Enqueues StartOrchestration work item
/// 2. Fetches it to get a lock token
/// 3. Acks with OrchestrationStarted event
///
/// Use this to seed test state without spinning up a full runtime.
#[allow(dead_code)]
pub async fn test_create_execution(
    provider: &dyn Provider,
    instance: &str,
    orchestration: &str,
    version: &str,
    input: &str,
    parent_instance: Option<&str>,
    parent_id: Option<u64>,
) -> Result<u64, String> {
    // Calculate next execution ID (max + 1, or INITIAL if none exist)
    let admin = provider
        .as_management_capability()
        .ok_or_else(|| "Provider doesn't support management operations".to_string())?;
    let execs = admin
        .list_executions(instance)
        .await
        .map_err(|e| e.message.clone())?;
    let next_execution_id = if execs.is_empty() {
        duroxide::INITIAL_EXECUTION_ID
    } else {
        execs.iter().max().copied().unwrap() + 1
    };

    // Enqueue StartOrchestration work item with calculated execution_id
    provider
        .enqueue_for_orchestrator(
            WorkItem::StartOrchestration {
                instance: instance.to_string(),
                orchestration: orchestration.to_string(),
                version: Some(version.to_string()),
                input: input.to_string(),
                parent_instance: parent_instance.map(|s| s.to_string()),
                parent_id,
                execution_id: next_execution_id,
            },
            None,
        )
        .await
        .map_err(|e| e.message.clone())?;

    // Fetch to get lock token
    let (_item, lock_token, _attempt_count) = provider
        .fetch_orchestration_item(
            std::time::Duration::from_secs(30),
            std::time::Duration::ZERO,
            None,
        )
        .await
        .map_err(|e| e.message.clone())?
        .ok_or_else(|| "Failed to fetch orchestration item".to_string())?;

    let execution_id = next_execution_id;

    // Ack with OrchestrationStarted event
    provider
        .ack_orchestration_item(
            &lock_token,
            execution_id,
            vec![Event::with_event_id(
                duroxide::INITIAL_EVENT_ID,
                instance,
                execution_id,
                None,
                EventKind::OrchestrationStarted {
                    name: orchestration.to_string(),
                    version: version.to_string(),
                    input: input.to_string(),
                    parent_instance: parent_instance.map(|s| s.to_string()),
                    parent_id,
                    carry_forward_events: None,
                    initial_custom_status: None,
                },
            )],
            vec![],
            vec![],
            ExecutionMetadata {
                orchestration_name: Some(orchestration.to_string()),
                orchestration_version: Some(version.to_string()),
                ..Default::default()
            },
            vec![],
        )
        .await
        .map_err(|e| e.message.clone())?;

    Ok(execution_id)
}