liminal-rs 0.2.0

A conversation-based messaging bus built on beamr
Documentation
use serde_json::{Value, json};

use super::{
    SignalDeclaration, SignalPayload, SignalSession, delivery_failed, lifecycle_failed,
    validation_failed,
};
use crate::aion::AionSurfaceError;
use crate::aion::channels::ChannelName;
use crate::aion::types::Payload;
use crate::channel::Schema;

pub(super) fn build_signal_schema(
    channel_name: &ChannelName,
    declarations: &[SignalDeclaration],
) -> Result<Schema, AionSurfaceError> {
    let variants: Vec<Value> = declarations.iter().map(signal_variant_schema).collect();
    Schema::new(json!({ "oneOf": variants })).map_err(|error| lifecycle_failed(channel_name, error))
}

/// Checks `signal` against the declarations held by `session`.
///
/// The checks run in this order, stopping at the first failure:
/// 1. a declaration with the same `signal_name` must exist;
/// 2. the payload's content type must equal the declared content type;
/// 3. if the declaration carries a payload schema, the payload data must
///    validate against it.
///
/// # Errors
///
/// Every failure is reported through `validation_failed`, together with the
/// session's channel name and workflow id.
pub(super) fn validate_signal(
    session: &SignalSession,
    signal: &SignalPayload,
) -> Result<(), AionSurfaceError> {
    let channel_name = &session.channel_name;
    let workflow_id = session.workflow_id.as_str();

    let declaration = session
        .declarations
        .iter()
        .find(|candidate| candidate.signal_name == signal.signal_name)
        .ok_or_else(|| {
            // No declaration matched: report every declared name/type pair.
            let expected = expected_types(&session.declarations);
            validation_failed(
                channel_name,
                workflow_id,
                signal,
                expected.as_str(),
                "signal name is not declared",
            )
        })?;

    let declared_content_type = declaration.content_type.as_str();
    if signal.payload.content_type != declaration.content_type {
        return Err(validation_failed(
            channel_name,
            workflow_id,
            signal,
            declared_content_type,
            "content type does not match declaration",
        ));
    }

    // Declarations without a payload schema accept any data.
    let Some(schema) = &declaration.payload_schema else {
        return Ok(());
    };
    if let Err(error) = schema.validate(&signal.payload.data) {
        return Err(validation_failed(
            channel_name,
            workflow_id,
            signal,
            declared_content_type,
            format!("payload schema mismatch: {error}"),
        ));
    }
    Ok(())
}

pub(super) fn encode_signal(
    channel_name: &ChannelName,
    workflow_id: &str,
    signal: &SignalPayload,
) -> Result<Vec<u8>, AionSurfaceError> {
    serde_json::to_vec(&json!({
        "signal_name": signal.signal_name.as_str(),
        "content_type": signal.payload.content_type.as_str(),
        "data": signal.payload.data.as_slice()
    }))
    .map_err(|error| delivery_failed(channel_name, workflow_id, &signal.signal_name, error))
}

/// Takes the next envelope from the session's subscription and decodes it.
///
/// # Errors
///
/// Returns a `delivery_failed` error (with the signal name reported as
/// `"<unknown>"`) when `try_next` fails or yields no envelope, and forwards
/// any error produced while decoding the envelope payload.
pub(super) fn drain_delivery(session: &SignalSession) -> Result<SignalPayload, AionSurfaceError> {
    let channel_name = &session.channel_name;
    let workflow_id = session.workflow_id.as_str();

    let envelope = match session.subscription.try_next() {
        Ok(Some(envelope)) => envelope,
        Ok(None) => {
            return Err(delivery_failed(
                channel_name,
                workflow_id,
                "<unknown>",
                "signal channel delivered no envelope",
            ));
        }
        Err(error) => {
            return Err(delivery_failed(
                channel_name,
                workflow_id,
                "<unknown>",
                error,
            ));
        }
    };

    decode_signal(channel_name, workflow_id, &envelope.payload)
}

/// Produces the JSON Schema fragment describing one declared signal.
///
/// The fragment is an object schema that requires exactly the keys
/// `signal_name`, `content_type` and `data`: the first two are pinned to the
/// declaration's values through single-element `enum`s, and `data` must be an
/// array of integers in `0..=255`.
fn signal_variant_schema(declaration: &SignalDeclaration) -> Value {
    let signal_name = declaration.signal_name.as_str();
    let content_type = declaration.content_type.as_str();

    let byte_item = json!({ "type": "integer", "minimum": 0, "maximum": 255 });
    let properties = json!({
        "signal_name": { "type": "string", "enum": [signal_name] },
        "content_type": { "type": "string", "enum": [content_type] },
        "data": { "type": "array", "items": byte_item }
    });

    json!({
        "type": "object",
        "properties": properties,
        "required": ["signal_name", "content_type", "data"],
        "additionalProperties": false
    })
}

/// Parses envelope bytes back into a [`SignalPayload`].
///
/// `payload` must be a JSON document containing a string `signal_name`, a
/// string `content_type`, and a `data` value that deserialises into the
/// payload's data field.
///
/// # Errors
///
/// Every failure is reported through `delivery_failed`. The signal name is
/// given as `"<unknown>"` until `signal_name` has been read successfully;
/// later failures carry the decoded name.
fn decode_signal(
    channel_name: &ChannelName,
    workflow_id: &str,
    payload: &[u8],
) -> Result<SignalPayload, AionSurfaceError> {
    let envelope: Value = match serde_json::from_slice(payload) {
        Ok(parsed) => parsed,
        Err(error) => {
            return Err(delivery_failed(
                channel_name,
                workflow_id,
                "<unknown>",
                error,
            ));
        }
    };

    let Some(signal_name) = envelope.get("signal_name").and_then(Value::as_str) else {
        return Err(delivery_failed(
            channel_name,
            workflow_id,
            "<unknown>",
            "missing signal_name",
        ));
    };

    let Some(content_type) = envelope.get("content_type").and_then(Value::as_str) else {
        return Err(delivery_failed(
            channel_name,
            workflow_id,
            signal_name,
            "missing content_type",
        ));
    };

    let Some(raw_data) = envelope.get("data") else {
        return Err(delivery_failed(
            channel_name,
            workflow_id,
            signal_name,
            "missing data",
        ));
    };
    // `from_value` consumes its argument, so the borrowed field is cloned.
    let data = match serde_json::from_value(raw_data.clone()) {
        Ok(decoded) => decoded,
        Err(error) => {
            return Err(delivery_failed(
                channel_name,
                workflow_id,
                signal_name,
                error,
            ));
        }
    };

    let payload = Payload {
        data,
        content_type: content_type.to_owned(),
    };
    Ok(SignalPayload {
        signal_name: signal_name.to_owned(),
        payload,
    })
}

/// Renders the declarations as a comma-separated `name=content_type` list.
///
/// An empty slice yields an empty string.
fn expected_types(declarations: &[SignalDeclaration]) -> String {
    let mut pairs = Vec::with_capacity(declarations.len());
    for declaration in declarations {
        pairs.push(format!(
            "{}={}",
            declaration.signal_name, declaration.content_type
        ));
    }
    pairs.join(", ")
}