liminal-sdk 0.2.0

Application-facing SDK traits for liminal messaging clients
Documentation
use std::collections::VecDeque;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use liminal_sdk::embedded::{EmbeddedChannelBackend, EmbeddedChannelMessage};
use liminal_sdk::{
    ChannelHandle, ConnectionLifecycle, ConnectionState, ConversationEvent, ConversationId,
    DisconnectReason, EmbeddedChannelHandle, EmbeddedConfig, PressureResponse, ReconnectConfig,
    ReconnectJitter, SchemaMetadata, SchemaValidate, SdkError, SubscriptionId,
    SubscriptionRecovery,
};
use serde::Deserialize;
use serde::Serialize;
use serde_json::{Value, json};

const SCENARIOS: &str = include_str!(concat!(
    env!("CARGO_MANIFEST_DIR"),
    "/../../tests/conformance/scenarios.json"
));

#[derive(Debug, Deserialize)]
struct ScenarioSuite {
    scenarios: Vec<Scenario>,
}

#[derive(Debug, Deserialize)]
struct Scenario {
    name: String,
    expected: Value,
}

#[derive(Debug)]
struct FixedJitter;

impl ReconnectJitter for FixedJitter {
    fn jitter(&mut self, attempt: u32, capped_delay: Duration) -> Duration {
        std::hint::black_box((attempt, capped_delay));
        Duration::ZERO
    }
}

#[derive(Debug)]
struct ScriptedPressureBackend {
    responses: Mutex<VecDeque<PressureResponse>>,
}

impl ScriptedPressureBackend {
    fn new(responses: impl Into<VecDeque<PressureResponse>>) -> Self {
        Self {
            responses: Mutex::new(responses.into()),
        }
    }
}

impl EmbeddedChannelBackend for ScriptedPressureBackend {
    fn publish(&self, message: &dyn EmbeddedChannelMessage) -> Result<PressureResponse, SdkError> {
        let schema = message.schema_metadata();
        std::hint::black_box(schema);

        let mut responses = self.responses.lock().map_err(|_error| SdkError::Protocol {
            description: "scripted pressure backend lock was poisoned".to_string(),
        })?;
        responses.pop_front().ok_or_else(|| SdkError::Protocol {
            description: "scripted pressure backend exhausted responses".to_string(),
        })
    }
}

#[derive(Serialize)]
struct ConformanceMessage {
    id: u64,
}

impl SchemaValidate for ConformanceMessage {
    fn schema_metadata() -> SchemaMetadata {
        SchemaMetadata::new(
            "conformance.message",
            "1",
            br#"{"type":"object","required":["id"]}"#.as_slice(),
        )
    }
}

#[test]
fn rust_sdk_conformance_scenarios_match_shared_expectations() -> Result<(), SdkError> {
    let suite = load_scenarios()?;
    let mut results = Vec::with_capacity(suite.scenarios.len());
    let mut all_passed = true;

    for scenario in suite.scenarios {
        let observed = observe_scenario(scenario.name.as_str())?;
        let pass = observed == scenario.expected;
        all_passed &= pass;
        results.push(json!({
            "scenario": scenario.name,
            "pass": pass,
            "expected": scenario.expected,
            "observed": observed,
        }));
    }

    let output = json!({
        "sdk": "rust",
        "results": results,
    });
    let text =
        serde_json::to_string_pretty(&output).map_err(|error| serialization_error(&error))?;
    println!("{text}");
    write_result_if_requested("rust", &text)?;

    assert!(all_passed, "rust SDK conformance scenarios diverged");
    Ok(())
}

fn load_scenarios() -> Result<ScenarioSuite, SdkError> {
    serde_json::from_str(SCENARIOS).map_err(|error| serialization_error(&error))
}

fn observe_scenario(name: &str) -> Result<Value, SdkError> {
    match name {
        "connection.normal_connect" => observe_normal_connect(),
        "connection.reconnect_after_drop" => observe_reconnect_after_drop(),
        "connection.clean_disconnect" => observe_clean_disconnect(),
        "subscription.resume_from_last_acknowledged" => observe_subscription_recovery(),
        "backpressure.publish_variants" => observe_backpressure_variants(),
        "conversation.open_message_close" => Ok(observe_conversation_lifecycle()),
        other => Err(SdkError::Protocol {
            description: format!("unknown conformance scenario {other}"),
        }),
    }
}

fn observe_normal_connect() -> Result<Value, SdkError> {
    let mut lifecycle = lifecycle();
    let mut transitions = vec![state_name(lifecycle.state())];

    lifecycle.connected()?;
    transitions.push(state_name(lifecycle.state()));

    Ok(json!({
        "state_transitions": transitions,
        "final_state": state_name(lifecycle.state()),
    }))
}

fn observe_reconnect_after_drop() -> Result<Value, SdkError> {
    let mut lifecycle = lifecycle();
    let mut transitions = vec![state_name(lifecycle.state())];
    let mut attempts = Vec::new();

    lifecycle.connected()?;
    transitions.push(state_name(lifecycle.state()));

    let mut jitter = FixedJitter;
    let delay = lifecycle.reconnect(&mut jitter)?;
    std::hint::black_box(delay);
    transitions.push(state_name(lifecycle.state()));
    if let ConnectionState::Reconnecting { attempt } = lifecycle.state() {
        attempts.push(*attempt);
    }

    lifecycle.connected()?;
    transitions.push(state_name(lifecycle.state()));

    Ok(json!({
        "state_transitions": transitions,
        "final_state": state_name(lifecycle.state()),
        "reconnect_attempts": attempts,
    }))
}

fn observe_clean_disconnect() -> Result<Value, SdkError> {
    let mut lifecycle = lifecycle();
    let mut transitions = vec![state_name(lifecycle.state())];

    lifecycle.connected()?;
    transitions.push(state_name(lifecycle.state()));
    lifecycle.disconnect(DisconnectReason::Normal)?;
    transitions.push(state_name(lifecycle.state()));

    Ok(json!({
        "state_transitions": transitions,
        "final_state": state_name(lifecycle.state()),
        "disconnect_reason": disconnect_reason(lifecycle.state()),
    }))
}

fn observe_subscription_recovery() -> Result<Value, SdkError> {
    let mut recovery = SubscriptionRecovery::new();
    let subscription_id = SubscriptionId::new(1);

    recovery.acknowledge(subscription_id, 5);
    let requests = recovery.resume_requests()?;
    let from_sequence = match requests.as_slice() {
        [request] if request.subscription_id == subscription_id => request.from_sequence,
        _ => {
            return Err(SdkError::Store {
                description: "subscription recovery did not produce one resume request".to_string(),
            });
        }
    };

    Ok(json!({
        "subscription": "orders",
        "last_acknowledged_sequence": recovery.last_acknowledged_sequence(subscription_id),
        "from_sequence": from_sequence,
    }))
}

fn observe_backpressure_variants() -> Result<Value, SdkError> {
    let backend = ScriptedPressureBackend::new(VecDeque::from([
        PressureResponse::Accept,
        PressureResponse::Defer {
            delay: Duration::from_millis(250),
        },
        PressureResponse::Reject {
            reason: "consumer overloaded".to_string(),
        },
    ]));
    let config = EmbeddedConfig::new("orders", "conv-1").with_channel_backend(Arc::new(backend));
    let channel = EmbeddedChannelHandle::new(&config);

    let responses = vec![
        normalize_pressure(channel.publish(ConformanceMessage { id: 1 })?)?,
        normalize_pressure(channel.publish(ConformanceMessage { id: 2 })?)?,
        normalize_pressure(channel.publish(ConformanceMessage { id: 3 })?)?,
    ];

    Ok(json!({ "responses": responses }))
}

fn observe_conversation_lifecycle() -> Value {
    let conversation_id = ConversationId::new("conv-1");
    let events = [
        ConversationEvent::Opened {
            conversation_id: conversation_id.clone(),
        },
        ConversationEvent::Message {
            conversation_id: conversation_id.clone(),
        },
        ConversationEvent::Closing {
            conversation_id: conversation_id.clone(),
        },
        ConversationEvent::Closed { conversation_id },
    ];
    let normalized = events
        .iter()
        .map(conversation_event_kind)
        .collect::<Vec<_>>();

    json!({ "events": normalized })
}

fn lifecycle() -> ConnectionLifecycle {
    ConnectionLifecycle::new(ReconnectConfig::new(
        Duration::from_millis(10),
        Duration::from_millis(100),
    ))
}

const fn state_name(state: &ConnectionState) -> &'static str {
    match state {
        ConnectionState::Connecting => "connecting",
        ConnectionState::Connected => "connected",
        ConnectionState::Reconnecting { .. } => "reconnecting",
        ConnectionState::Disconnected { .. } => "disconnected",
    }
}

const fn disconnect_reason(state: &ConnectionState) -> &'static str {
    match state {
        ConnectionState::Disconnected {
            reason: DisconnectReason::Normal,
        } => "normal",
        ConnectionState::Disconnected {
            reason: DisconnectReason::Error,
        } => "error",
        ConnectionState::Disconnected {
            reason: DisconnectReason::Timeout,
        } => "timeout",
        _ => "none",
    }
}

fn normalize_pressure(response: PressureResponse) -> Result<Value, SdkError> {
    match response {
        PressureResponse::Accept => Ok(json!({ "kind": "accept" })),
        PressureResponse::Defer { delay } => Ok(json!({
            "kind": "defer",
            "delay": duration_millis(delay)?,
        })),
        PressureResponse::Reject { reason } => Ok(json!({
            "kind": "reject",
            "reason": reason,
        })),
    }
}

fn duration_millis(duration: Duration) -> Result<u64, SdkError> {
    u64::try_from(duration.as_millis()).map_err(|error| SdkError::Protocol {
        description: format!("pressure delay exceeded u64 milliseconds: {error}"),
    })
}

const fn conversation_event_kind(event: &ConversationEvent) -> &'static str {
    match event {
        ConversationEvent::Opened { .. } => "opened",
        ConversationEvent::Message { .. } => "message",
        ConversationEvent::Closing { .. } => "closing",
        ConversationEvent::Closed { .. } => "closed",
        ConversationEvent::Error { .. } => "error",
    }
}

fn write_result_if_requested(sdk: &str, text: &str) -> Result<(), SdkError> {
    let Ok(directory) = std::env::var("CONFORMANCE_RESULTS_DIR") else {
        return Ok(());
    };

    let directory = PathBuf::from(directory);
    std::fs::create_dir_all(&directory).map_err(|error| io_error(&error))?;
    let path = directory.join(format!("{sdk}.json"));
    std::fs::write(path, text).map_err(|error| io_error(&error))
}

fn serialization_error(error: &serde_json::Error) -> SdkError {
    SdkError::Serialization {
        description: error.to_string(),
    }
}

fn io_error(error: &std::io::Error) -> SdkError {
    SdkError::Store {
        description: error.to_string(),
    }
}