aion-rs 0.1.0

Transport-agnostic Aion workflow engine with durability, replay, timers, and supervision.
//! Shared helpers for durable activity NIFs.

use std::cell::RefCell;
use std::sync::Arc;

use aion_core::{ActivityError, ActivityErrorKind, ActivityId, Payload};
use beamr::atom::Atom;
use beamr::term::Term;
use beamr::term::binary;
use beamr::term::binary_ref::BinaryRef;
use beamr::term::boxed;
use chrono::Utc;
use tokio::runtime::Handle;

use crate::RuntimeHandle;
use crate::registry::Registry;
use crate::runtime::nif_context::{NifContext, NifContextError};
use crate::runtime::nif_state::EngineNifState;

thread_local! {
    static ACTIVITY_NIF_HEAP: RefCell<Vec<Box<[u64]>>> = const { RefCell::new(Vec::new()) };
}

#[derive(Clone)]
pub(super) struct RuntimeContext {
    pub(super) registry: Arc<Registry>,
    pub(super) runtime: Arc<RuntimeHandle>,
    pub(super) tokio_handle: Handle,
}

pub(crate) fn install_nif_runtime_context(
    state: &EngineNifState,
    registry: Arc<Registry>,
    runtime: Arc<RuntimeHandle>,
    tokio_handle: Handle,
) {
    let context = RuntimeContext {
        registry,
        runtime,
        tokio_handle,
    };
    match state.runtime_context.write() {
        Ok(mut slot) => *slot = Some(context),
        Err(poisoned) => *poisoned.into_inner() = Some(context),
    }
}

pub(super) fn runtime_context(state: &EngineNifState) -> Result<RuntimeContext, NifContextError> {
    let guard = state
        .runtime_context
        .read()
        .map_err(|_| NifContextError::TermEncoding {
            reason: "nif runtime context lock is poisoned".to_owned(),
        })?;
    guard.clone().ok_or_else(|| NifContextError::TermEncoding {
        reason: "nif runtime context is not installed".to_owned(),
    })
}

fn park_heap(heap: Box<[u64]>) {
    ACTIVITY_NIF_HEAP.with_borrow_mut(|parked| parked.push(heap));
}

pub(super) fn alloc_binary_term(bytes: &[u8]) -> Option<Term> {
    let word_count = 2 + binary::packed_word_count(bytes.len());
    let mut heap = vec![0_u64; word_count].into_boxed_slice();
    let term = binary::write_binary(&mut heap, bytes)?;
    park_heap(heap);
    Some(term)
}

pub(super) fn alloc_tuple_term(elements: &[Term]) -> Option<Term> {
    let word_count = 1 + elements.len();
    let mut heap = vec![0_u64; word_count].into_boxed_slice();
    let term = boxed::write_tuple(&mut heap, elements)?;
    park_heap(heap);
    Some(term)
}

pub(super) fn tagged_result_term(tag: Atom, bytes: &[u8]) -> Option<Term> {
    let value = alloc_binary_term(bytes)?;
    alloc_tuple_term(&[Term::atom(tag), value])
}

pub(super) fn ok_result_term(bytes: &[u8]) -> Option<Term> {
    tagged_result_term(Atom::OK, bytes)
}

pub(super) fn error_result_term(message: &str) -> Option<Term> {
    tagged_result_term(Atom::ERROR, message.as_bytes())
}

pub(super) fn decode_string_arg(term: Term) -> Result<String, String> {
    let bin = BinaryRef::new(term).ok_or_else(|| "argument is not a binary".to_owned())?;
    String::from_utf8(bin.as_bytes().to_vec()).map_err(|_| "argument is not valid UTF-8".to_owned())
}

pub(super) fn json_payload(text: &str, phase: &str, label: &str) -> Result<Payload, Term> {
    let value = serde_json::from_str(text).map_err(|error| {
        error_result_term(&format!("{phase} {label}: invalid JSON payload: {error}"))
            .unwrap_or(Term::NIL)
    })?;
    Payload::from_json(&value).map_err(|error| {
        error_result_term(&format!("{phase} {label}: {error}")).unwrap_or(Term::NIL)
    })
}

pub(super) fn activity_error(reason: String) -> ActivityError {
    ActivityError {
        kind: ActivityErrorKind::Terminal,
        message: reason,
        details: None,
    }
}

pub(super) fn context_error_term(error: &NifContextError) -> Term {
    match error.to_error_term() {
        Ok(term) => term,
        Err(_) => Term::NIL,
    }
}

pub(super) fn record_started(
    context: &NifContext,
    activity_id: ActivityId,
    activity_type: String,
    input: Payload,
) -> Result<(), Term> {
    let recorded_at = Utc::now();
    context
        .record_activity_scheduled_started(recorded_at, activity_id, activity_type, input)
        .map_err(|error| context_error_term(&error))
}

pub(super) fn record_completed(
    context: &NifContext,
    activity_id: ActivityId,
    result: Payload,
) -> Result<(), Term> {
    let recorded_at = Utc::now();
    context
        .record_activity_completed(recorded_at, activity_id, result)
        .map_err(|error| context_error_term(&error))
}

pub(super) fn record_failed(
    context: &NifContext,
    activity_id: ActivityId,
    error: ActivityError,
) -> Result<(), Term> {
    let recorded_at = Utc::now();
    context
        .record_activity_failed(recorded_at, activity_id, error, 1)
        .map_err(|error| context_error_term(&error))
}

pub(super) fn correlation_id(ordinal: u64) -> String {
    ActivityId::from_sequence_position(ordinal).to_string()
}

pub(super) fn activity_id_from_correlation(correlation: &str) -> Result<ActivityId, Term> {
    let Some(raw) = correlation.strip_prefix("activity:") else {
        return Err(
            error_result_term("await_activity_result: invalid correlation id").unwrap_or(Term::NIL),
        );
    };
    let sequence = raw.parse::<u64>().map_err(|error| {
        error_result_term(&format!(
            "await_activity_result: invalid correlation id sequence: {error}"
        ))
        .unwrap_or(Term::NIL)
    })?;
    Ok(ActivityId::from_sequence_position(sequence))
}