lash-core 0.1.0-alpha.37

Sans-IO turn machine and runtime kernel for the lash agent runtime.
Documentation
use crate::plugin::PluginError;

use super::ToolProcessEventContext;

pub(crate) async fn enqueue_wake_delivery(
    store: Option<&dyn crate::RuntimePersistence>,
    wake_delivery: Option<crate::ProcessWakeDelivery>,
    trace_host: Option<&dyn crate::plugin::SessionGraphService>,
    queued_work_poke: Option<&crate::QueuedWorkPoke>,
) -> Result<(), PluginError> {
    let Some(wake_delivery) = wake_delivery else {
        return Ok(());
    };
    let Some(store) = store else {
        return Err(PluginError::Session(format!(
            "process wake for session `{}` requires a runtime persistence store",
            wake_delivery.target_session_id
        )));
    };
    let enqueued = store
        .enqueue_queued_work(crate::process_wake_batch_draft(wake_delivery))
        .await
        .map_err(|err| PluginError::Session(err.to_string()))?;
    let target_session_id = enqueued.session_id.clone();
    if let Some(host) = trace_host
        && let Err(err) = host
            .emit_trace_event(
                lash_trace::TraceContext::default().for_session(enqueued.session_id.clone()),
                lash_trace::TraceEvent::Custom {
                    name: "queued_work.enqueued".to_string(),
                    payload: serde_json::json!({
                        "batch_id": enqueued.batch_id,
                        "source_key": enqueued.source_key,
                        "delivery_policy": enqueued.delivery_policy,
                        "slot_policy": enqueued.slot_policy,
                        "payload_types": ["process_wake"],
                    }),
                },
            )
            .await
    {
        tracing::warn!("failed to emit process wake queue trace: {err}");
    }
    if let Some(poke) = queued_work_poke {
        poke.poke_session(target_session_id, "process_wake");
    }
    Ok(())
}

#[derive(Clone)]
pub struct ToolProcessEventControl {
    pub(super) context: Option<ToolProcessEventContext>,
}

impl ToolProcessEventControl {
    pub async fn wait_event_after(
        &self,
        event_type: &str,
        after_sequence: u64,
    ) -> Result<crate::ProcessEvent, PluginError> {
        let Some(process) = self.context.as_ref() else {
            return Err(PluginError::Session(
                "process event waiting is unavailable outside a durable process".to_string(),
            ));
        };
        process
            .registry
            .wait_event_after(&process.process_id, event_type, after_sequence)
            .await
    }

    pub async fn emit(
        &self,
        event_type: impl Into<String>,
        payload: serde_json::Value,
    ) -> Result<crate::ProcessEvent, PluginError> {
        self.emit_request(crate::ProcessEventAppendRequest::new(event_type, payload))
            .await
    }

    pub async fn emit_request(
        &self,
        request: crate::ProcessEventAppendRequest,
    ) -> Result<crate::ProcessEvent, PluginError> {
        let Some(process) = self.context.as_ref() else {
            return Err(PluginError::Session(
                "process event emission is unavailable outside a durable process".to_string(),
            ));
        };
        let result = process
            .registry
            .append_event(
                &process.process_id,
                request.with_optional_wake_target_scope(process.wake_target_scope.clone()),
            )
            .await?;
        enqueue_wake_delivery(
            process.store.as_deref(),
            result.wake_delivery,
            Some(process.session_graph.as_ref()),
            process.queued_work_poke.as_ref(),
        )
        .await?;
        Ok(result.event)
    }
}