camel-function 0.9.0

Function runtime service for out-of-process function execution
Documentation
#![cfg(feature = "docker-tests")]

use camel_api::function::*;
use camel_api::{Body, Exchange, Message};
use camel_function::{ContainerProvider, PullPolicy, RunnerHandle};
use std::time::Duration;

async fn build_runner_image() {
    let output = std::process::Command::new("docker")
        .args([
            "build",
            "-t",
            "kennycallado/deno-runner:test",
            &format!("{}/runner", env!("CARGO_MANIFEST_DIR")),
        ])
        .output()
        .expect("docker build failed");
    if !output.status.success() {
        panic!(
            "docker build failed: {}",
            String::from_utf8_lossy(&output.stderr)
        );
    }
}

fn create_provider(test_id: &str) -> ContainerProvider {
    ContainerProvider::builder()
        .image("kennycallado/deno-runner:test")
        .pull_policy(PullPolicy::Never)
        .boot_timeout(Duration::from_secs(15))
        .instance_id(test_id)
        .build()
        .expect("create container provider")
}

fn make_definition(id: &str, source: &str) -> FunctionDefinition {
    FunctionDefinition {
        id: FunctionId(id.to_string()),
        runtime: "deno".to_string(),
        source: source.to_string(),
        timeout_ms: 5000,
        route_id: None,
        step_index: None,
    }
}

async fn wait_for_health(
    provider: &ContainerProvider,
    handle: &RunnerHandle,
    timeout: Duration,
) -> Result<(), String> {
    let start = std::time::Instant::now();
    loop {
        match provider.health_runner(handle).await {
            Ok(camel_function::HealthReport::Healthy) => return Ok(()),
            _ => {
                if start.elapsed() > timeout {
                    return Err(format!("runner not healthy after {:?}", timeout));
                }
                tokio::time::sleep(Duration::from_millis(200)).await;
            }
        }
    }
}

async fn assert_clean(provider: &ContainerProvider) {
    assert!(
        provider.is_clean().await,
        "no containers should remain for instance {}, found: {:?}",
        provider.instance_id(),
        provider.list_instance_containers().await
    );
}

#[tokio::test]
async fn test_spawn_and_health() {
    build_runner_image().await;
    let provider = create_provider("spawn_health");
    let handle = provider.spawn_runner("deno").await.expect("spawn");
    wait_for_health(&provider, &handle, Duration::from_secs(10))
        .await
        .expect("health");
    let report = provider.health_runner(&handle).await.expect("health");
    assert!(matches!(report, camel_function::HealthReport::Healthy));
    provider.shutdown_runner(handle).await.expect("shutdown");
    assert_clean(&provider).await;
}

#[tokio::test]
async fn test_register_and_invoke() {
    build_runner_image().await;
    let provider = create_provider("register_invoke");
    let handle = provider.spawn_runner("deno").await.expect("spawn");
    wait_for_health(&provider, &handle, Duration::from_secs(10))
        .await
        .expect("health");

    let def = make_definition(
        "echo_fn",
        "export default (c) => { c.setBody(c.body().toString().toUpperCase()); }",
    );
    provider
        .register_function(&handle, &def)
        .await
        .expect("register");

    let exchange = Exchange::new(Message::new(Body::Text("hello".into())));
    let patch = provider
        .invoke_function(&handle, &def.id, &exchange)
        .await
        .expect("invoke");
    assert!(matches!(patch.body, Some(PatchBody::Text(ref s)) if s == "HELLO"));

    provider.shutdown_runner(handle).await.expect("shutdown");
    assert_clean(&provider).await;
}

#[tokio::test]
async fn test_shutdown_removes_container() {
    build_runner_image().await;
    let provider = create_provider("shutdown_remove");
    let handle = provider.spawn_runner("deno").await.expect("spawn");
    wait_for_health(&provider, &handle, Duration::from_secs(10))
        .await
        .expect("health");
    provider.shutdown_runner(handle).await.expect("shutdown");
    assert_clean(&provider).await;
}

#[tokio::test]
async fn test_cleanup_all() {
    build_runner_image().await;
    let provider = create_provider("cleanup_all");
    let h1 = provider.spawn_runner("deno").await.expect("spawn");
    wait_for_health(&provider, &h1, Duration::from_secs(10))
        .await
        .expect("health");
    provider.cleanup_all().await;
    assert_clean(&provider).await;
}

#[tokio::test]
async fn test_unregister_removes_function_from_container_runner() {
    build_runner_image().await;
    let provider = create_provider("unregister_runner");
    let handle = provider.spawn_runner("deno").await.expect("spawn");
    wait_for_health(&provider, &handle, Duration::from_secs(10))
        .await
        .expect("health");

    let def = make_definition("f1", "export default (c) => { c.setBody('ok'); }");
    provider
        .register_function(&handle, &def)
        .await
        .expect("register");

    let exchange = Exchange::new(Message::new(Body::Empty));
    let patch = provider
        .invoke_function(&handle, &def.id, &exchange)
        .await
        .expect("invoke before unregister");
    assert!(matches!(patch.body, Some(PatchBody::Text(ref s)) if s == "ok"));

    provider
        .unregister_function(&handle, &def.id)
        .await
        .expect("unregister");

    let invoke_after = provider.invoke_function(&handle, &def.id, &exchange).await;
    assert!(invoke_after.is_err());
    let msg = invoke_after.expect_err("invoke should fail").to_string();
    assert!(msg.contains("not_registered"));

    provider.shutdown_runner(handle).await.expect("shutdown");
    assert_clean(&provider).await;
}