liminal-rs 0.2.0

A conversation-based messaging bus built on beamr
Documentation
use std::error::Error;
use std::sync::Arc;

use super::*;
use crate::conversation::ConversationSupervisor;
use crate::routing::{
    FieldValue, ModuleLoader, RoutingDecision, RoutingModule, RoutingSlot, SupervisedExecutor,
};

mod support;

use support::{TestSetup, completed, request, response, test_error, worker};

#[test]
fn successful_dispatch_records_conversation_lifecycle() -> Result<(), Box<dyn Error>> {
    let setup = TestSetup::new(
        vec![worker("worker-a", 11), worker("worker-b", 12)],
        vec![response("worker-b", completed(b"ok"))],
    );
    setup.router.push_decision(Some("worker-b"))?;

    let result =
        dispatch_activity_with_context(&setup.context(), "prod", "email", request("email"))?;

    assert_eq!(result, completed(b"ok"));
    let expected_channel = String::from(dispatch_channel("prod", "email")?);
    assert_eq!(setup.factory.opens()?, vec![expected_channel]);
    assert_eq!(setup.conversation.links()?, vec!["worker-b"]);
    assert_eq!(
        setup.conversation.sent_requests()?[0].request,
        request("email")
    );
    assert_eq!(setup.router.calls()?.len(), 1);
    assert_eq!(
        setup.recorder.kinds()?,
        vec![
            DispatchOperationKind::ConversationOpened,
            DispatchOperationKind::WorkerSelected,
            DispatchOperationKind::MessageSent,
            DispatchOperationKind::MessageReceived,
            DispatchOperationKind::ConversationClosed,
        ]
    );
    assert_eq!(
        setup.recorder.states()?,
        vec![
            ActivityDispatchState::ActivityScheduled,
            ActivityDispatchState::ActivityStarted,
            ActivityDispatchState::ActivityCompleted,
        ]
    );
    Ok(())
}

#[test]
fn failed_activity_maps_to_dispatch_failed() -> Result<(), Box<dyn Error>> {
    let setup = TestSetup::new(
        vec![worker("worker-a", 11)],
        vec![response(
            "worker-a",
            ActivityResult::Failed {
                error: test_error("activity exploded"),
            },
        )],
    );

    let result =
        dispatch_activity_with_context(&setup.context(), "prod", "email", request("email"));

    assert!(matches!(
        result,
        Err(AionSurfaceError::DispatchFailed { .. })
    ));
    let error = result
        .err()
        .map_or_else(String::new, |error| error.to_string());
    assert!(error.contains("activity exploded"));
    assert_eq!(
        setup.recorder.states()?,
        vec![
            ActivityDispatchState::ActivityScheduled,
            ActivityDispatchState::ActivityStarted,
            ActivityDispatchState::ActivityFailed {
                retry_eligible: false,
            },
        ]
    );
    assert_eq!(setup.conversation.closes()?, 1);
    Ok(())
}

#[test]
fn linked_worker_exit_reselects_and_resends_same_request() -> Result<(), Box<dyn Error>> {
    let setup = TestSetup::new(
        vec![worker("worker-a", 11), worker("worker-b", 12)],
        vec![
            DispatchConversationEvent::WorkerExited {
                worker_id: "worker-a".to_owned(),
                message: "linked process exited".to_owned(),
            },
            response("worker-b", completed(b"ok")),
        ],
    );

    let result =
        dispatch_activity_with_context(&setup.context(), "prod", "email", request("email"))?;

    assert_eq!(result, completed(b"ok"));
    assert_eq!(setup.conversation.links()?, vec!["worker-a", "worker-b"]);
    let sent = setup.conversation.sent_requests()?;
    assert_eq!(sent.len(), 2);
    assert_eq!(sent[0].request, sent[1].request);
    let calls = setup.router.calls()?;
    assert_eq!(calls.len(), 2);
    assert_eq!(calls[1].excluded, vec!["worker-a"]);
    assert!(setup.recorder.operations()?.iter().any(|operation| {
        operation.kind == DispatchOperationKind::WorkerExited
            && operation.activity_state
                == Some(ActivityDispatchState::ActivityFailed {
                    retry_eligible: true,
                })
    }));
    Ok(())
}

#[test]
fn no_workers_after_exit_returns_dispatch_failed() -> Result<(), Box<dyn Error>> {
    let setup = TestSetup::new(
        vec![worker("worker-a", 11)],
        vec![DispatchConversationEvent::WorkerExited {
            worker_id: "worker-a".to_owned(),
            message: "linked process exited".to_owned(),
        }],
    );

    let result =
        dispatch_activity_with_context(&setup.context(), "prod", "email", request("email"));

    assert!(matches!(
        result,
        Err(AionSurfaceError::DispatchFailed { .. })
    ));
    let error = result
        .err()
        .map_or_else(String::new, |error| error.to_string());
    assert!(error.contains("NoWorkersAvailable"));
    assert_eq!(setup.router.calls()?.len(), 2);
    assert_eq!(setup.conversation.closes()?, 1);
    Ok(())
}

#[test]
fn replay_returns_recorded_outcome_without_live_conversation() -> Result<(), Box<dyn Error>> {
    let setup = TestSetup::replaying(RecordedDispatchOutcome::new(Ok(completed(b"replayed"))));

    let result =
        dispatch_activity_with_context(&setup.context(), "prod", "email", request("email"))?;

    assert_eq!(result, completed(b"replayed"));
    assert_no_live_dispatch(&setup)?;
    Ok(())
}

#[test]
fn replay_returns_recorded_failure_without_re_dispatch() -> Result<(), Box<dyn Error>> {
    let setup = TestSetup::replaying(RecordedDispatchOutcome::new(Err(test_error(
        "NoWorkersAvailable: recorded crash re-dispatch exhausted workers",
    ))));

    let result =
        dispatch_activity_with_context(&setup.context(), "prod", "email", request("email"));

    assert!(matches!(
        result,
        Err(AionSurfaceError::DispatchFailed { .. })
    ));
    let error = result
        .err()
        .map_or_else(String::new, |error| error.to_string());
    assert!(error.contains("NoWorkersAvailable"));
    assert_no_live_dispatch(&setup)?;
    Ok(())
}

fn assert_no_live_dispatch(setup: &TestSetup) -> Result<(), Box<dyn Error>> {
    assert_eq!(setup.factory.opens()?.len(), 0);
    assert_eq!(setup.router.calls()?.len(), 0);
    assert_eq!(setup.pool.calls()?, 0);
    assert_eq!(setup.recorder.operations()?.len(), 0);
    Ok(())
}

#[test]
fn routing_function_adapter_selects_from_current_candidates() -> Result<(), Box<dyn Error>> {
    let loader = ModuleLoader::new();
    let function = loader.load(RoutingModule::new(
        b"select-email-worker",
        |message, consumers| {
            if message.get("activity_type") == Some(&FieldValue::Text("send-email".to_owned())) {
                consumers
                    .get(1)
                    .map_or_else(RoutingDecision::none, |state| {
                        RoutingDecision::select(state.consumer.clone())
                    })
            } else {
                RoutingDecision::none()
            }
        },
    ));
    let supervisor = ConversationSupervisor::new()?;
    let router = RoutingFunctionDispatchRouter::new(
        Arc::new(RoutingSlot::new(function)),
        SupervisedExecutor::with_default_timeout(supervisor.scheduler()),
    );
    let workers = vec![worker("worker-a", 11), worker("worker-b", 12)];
    let channel_name = dispatch_channel("prod", "email")?;

    let selected = router.select_worker("wf-1", &channel_name, &request("email"), &workers, &[])?;

    assert_eq!(
        selected.map(|worker| worker.worker_id),
        Some("worker-b".to_owned())
    );
    Ok(())
}

#[test]
fn routing_is_called_per_dispatch_not_membership_change() -> Result<(), Box<dyn Error>> {
    let setup = TestSetup::new(
        vec![worker("worker-a", 11)],
        vec![
            response("worker-a", completed(b"one")),
            response("worker-b", completed(b"two")),
        ],
    );
    let first =
        dispatch_activity_with_context(&setup.context(), "prod", "email", request("email"))?;
    setup
        .pool
        .set_workers(vec![worker("worker-a", 11), worker("worker-b", 12)])?;

    let second =
        dispatch_activity_with_context(&setup.context(), "prod", "email", request("email"))?;

    assert_eq!(first, completed(b"one"));
    assert_eq!(second, completed(b"two"));
    assert_eq!(setup.router.calls()?.len(), 2);
    assert_eq!(
        setup.router.calls()?[1].candidates,
        vec!["worker-a", "worker-b"]
    );
    Ok(())
}