lash-core 0.1.0-alpha.37

Sans-IO turn machine and runtime kernel for the lash agent runtime.
Documentation
use std::sync::Arc;

use arc_swap::ArcSwap;
use tokio::sync::Mutex;

use super::{LashRuntime, ProcessHandleGrantEntry, ProcessHandleSummary, ProcessRegistry};

#[derive(Clone)]
pub struct RuntimeObservation {
    pub session_id: Arc<str>,
    pub policy: crate::SessionPolicy,
    pub read_view: crate::SessionReadView,
    pub persisted_state: super::RuntimeSessionState,
    pub usage_report: super::SessionUsageReport,
    pub tool_state: Option<crate::ToolState>,
    pub tool_catalog: Arc<Vec<serde_json::Value>>,
    pub tool_catalog_error: Option<String>,
    pub process_registry: Option<Arc<dyn ProcessRegistry>>,
    pub queue_store: Option<Arc<dyn crate::RuntimePersistence>>,
    pub queued_work_poke: Option<super::QueuedWorkPoke>,
}

impl RuntimeObservation {
    fn from_runtime(runtime: &LashRuntime, previous: Option<&RuntimeObservation>) -> Self {
        let (tool_catalog, tool_catalog_error) = match runtime.active_tool_catalog_shared() {
            Ok(catalog) => (catalog, None),
            Err(err) => (Arc::new(Vec::new()), Some(err.to_string())),
        };
        let tool_state_generation = runtime
            .session
            .as_ref()
            .map(|session| session.plugins().tool_registry().generation());
        let tool_state = match (
            tool_state_generation,
            previous.and_then(|observation| observation.tool_state.as_ref()),
        ) {
            (Some(generation), Some(snapshot)) if snapshot.generation() == generation => {
                Some(snapshot.clone())
            }
            (Some(_), _) => match runtime.tool_state() {
                Ok(state) => Some(state),
                Err(err) => {
                    tracing::warn!(
                        session_id = %runtime.session_id(),
                        error = %err,
                        "failed to capture tool state for observation; omitting the snapshot",
                    );
                    None
                }
            },
            (None, _) => None,
        };
        Self {
            session_id: Arc::from(runtime.session_id()),
            policy: runtime.read_view().policy().clone(),
            read_view: runtime.read_view(),
            persisted_state: runtime.export_persisted_state(),
            usage_report: runtime.usage_report(),
            tool_state,
            tool_catalog,
            tool_catalog_error,
            process_registry: runtime.host.process_registry.clone(),
            queue_store: runtime
                .session
                .as_ref()
                .and_then(|session| session.history_store()),
            queued_work_poke: runtime.host.queued_work_poke.clone(),
        }
    }

    pub fn session_id(&self) -> &str {
        &self.session_id
    }

    pub fn process_scope(&self) -> crate::ProcessScope {
        crate::ProcessScope::new(self.session_id.as_ref())
    }

    pub fn process_scope_id(&self) -> crate::ProcessScopeId {
        self.process_scope().id()
    }

    pub async fn list_process_handles(&self) -> Vec<ProcessHandleSummary> {
        let Some(executor) = self.process_registry.as_ref() else {
            return Vec::new();
        };
        self.list_process_handles_with_mode(executor, crate::ProcessListMode::Live)
            .await
    }

    pub async fn list_all_process_handles(&self) -> Vec<ProcessHandleSummary> {
        let Some(executor) = self.process_registry.as_ref() else {
            return Vec::new();
        };
        self.list_process_handles_with_mode(executor, crate::ProcessListMode::All)
            .await
    }

    async fn list_process_handles_with_mode(
        &self,
        executor: &Arc<dyn crate::ProcessRegistry>,
        mode: crate::ProcessListMode,
    ) -> Vec<ProcessHandleSummary> {
        let root_scope = self.process_scope();
        let mut entries = list_scope_process_handles(executor, &root_scope, mode).await;
        let agent_frame_id = self.persisted_state.current_agent_frame_id.as_str();
        if !agent_frame_id.is_empty() {
            let frame_scope =
                crate::ProcessScope::for_agent_frame(self.session_id.as_ref(), agent_frame_id);
            if frame_scope.id() != root_scope.id() {
                entries.extend(list_scope_process_handles(executor, &frame_scope, mode).await);
                entries.sort_by(|(left, _), (right, _)| left.process_id.cmp(&right.process_id));
                entries.dedup_by(|(left, _), (right, _)| left.process_id == right.process_id);
            }
        }
        entries
            .into_iter()
            .map(ProcessHandleSummary::from)
            .collect()
    }
}

async fn list_scope_process_handles(
    executor: &Arc<dyn crate::ProcessRegistry>,
    scope: &crate::ProcessScope,
    mode: crate::ProcessListMode,
) -> Vec<ProcessHandleGrantEntry> {
    match mode {
        crate::ProcessListMode::Live => executor.list_live_handle_grants(scope).await,
        crate::ProcessListMode::All => executor.list_handle_grants(scope).await,
    }
    .unwrap_or_default()
}

#[derive(Clone)]
pub struct RuntimeHandle {
    pub(in crate::runtime) runtime: Arc<Mutex<LashRuntime>>,
    observation: Arc<ArcSwap<RuntimeObservation>>,
}

impl RuntimeHandle {
    pub fn new(runtime: LashRuntime) -> Self {
        let observation = RuntimeObservation::from_runtime(&runtime, None);
        Self {
            runtime: Arc::new(Mutex::new(runtime)),
            observation: Arc::new(ArcSwap::from_pointee(observation)),
        }
    }

    pub fn writer(&self) -> Arc<Mutex<LashRuntime>> {
        Arc::clone(&self.runtime)
    }

    pub fn observe(&self) -> Arc<RuntimeObservation> {
        self.observation.load_full()
    }

    pub fn publish_from(&self, runtime: &LashRuntime) {
        let previous = self.observation.load_full();
        self.observation
            .store(Arc::new(RuntimeObservation::from_runtime(
                runtime,
                Some(previous.as_ref()),
            )));
    }

    pub async fn enqueue_turn_input(
        &self,
        input: crate::TurnInput,
        delivery_policy: crate::DeliveryPolicy,
        slot_policy: crate::SlotPolicy,
        source_key: Option<String>,
    ) -> Result<crate::QueuedWorkBatch, crate::RuntimeError> {
        let observation = self.observe();
        let store = observation
            .queue_store
            .clone()
            .ok_or_else(super::session_api::queued_turn_input_store_required)?;
        super::session_api::enqueue_turn_input_to_store(
            observation.session_id.as_ref().to_string(),
            store,
            observation.queued_work_poke.clone(),
            input,
            delivery_policy,
            slot_policy,
            source_key,
        )
        .await
    }

    pub async fn cancel_queued_work_batch(
        &self,
        session_id: &str,
        batch_id: &str,
    ) -> Result<Option<crate::QueuedWorkBatch>, crate::RuntimeError> {
        let observation = self.observe();
        let store = observation
            .queue_store
            .clone()
            .ok_or_else(super::session_api::queued_turn_input_store_required)?;
        store
            .cancel_queued_work_batch(session_id, batch_id)
            .await
            .map_err(|err| {
                crate::RuntimeError::new(
                    crate::RuntimeErrorCode::StoreCommitFailed,
                    err.to_string(),
                )
            })
    }

    pub fn try_into_runtime(self) -> Result<LashRuntime, Self> {
        match Arc::try_unwrap(self.runtime) {
            Ok(mutex) => Ok(mutex.into_inner()),
            Err(runtime) => Err(Self {
                runtime,
                observation: self.observation,
            }),
        }
    }
}