lash-core 0.1.0-alpha.49

Sans-IO turn machine and runtime kernel for the lash agent runtime.
Documentation
use std::sync::{Arc, OnceLock};

use tokio::sync::mpsc::UnboundedSender;

use crate::PluginMessage;
use crate::tool_dispatch::ToolDispatchContext;
use crate::{PromptContribution, RuntimeServices, SandboxMessage, SessionEvent, ToolProvider};

mod execution_context;
pub(crate) mod process_handles;
mod tool_execution;

pub use execution_context::RuntimeExecutionContext;
pub(crate) use execution_context::lashlang_host_environment_from_tool_catalog;
pub use tool_execution::{ToolInvocation, ToolInvocationReply};

#[derive(Clone, Debug, PartialEq, Eq)]
struct ToolCatalogCacheKey {
    include_base_tools: bool,
    context_overlay_revision: u64,
    tool_generation: u64,
    plugin_revision: u64,
    lashlang_language_features: lashlang::LashlangLanguageFeatures,
}

#[derive(Debug, Default)]
struct ToolCatalogDerived {
    catalog: OnceLock<Arc<Vec<serde_json::Value>>>,
}

struct ToolCatalogArtifact {
    tool_catalog: Arc<crate::ToolCatalog>,
    preamble: Arc<crate::TurnDriverPreamble>,
    derived: ToolCatalogDerived,
}

#[derive(Clone)]
pub(crate) struct ToolCatalogHandle(Arc<ToolCatalogArtifact>);

impl ToolCatalogHandle {
    fn tool_catalog(&self) -> Arc<crate::ToolCatalog> {
        Arc::clone(&self.0.tool_catalog)
    }

    fn preamble(&self) -> Arc<crate::TurnDriverPreamble> {
        Arc::clone(&self.0.preamble)
    }

    fn catalog(&self) -> Arc<Vec<serde_json::Value>> {
        Arc::clone(self.0.derived.catalog.get_or_init(|| {
            Arc::new(crate::tool_registry::project_tool_catalog(
                self.0.tool_catalog.searchable_tools_iter().cloned(),
            ))
        }))
    }
}

#[derive(Clone, Debug)]
pub struct InjectedTurnInput {
    pub id: Option<String>,
    pub message: PluginMessage,
}

#[derive(Debug, thiserror::Error)]
pub enum SessionError {
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),
    #[error("JSON error: {0}")]
    Json(#[from] serde_json::Error),
    #[error("code execution is not available in this session")]
    CodeExecutionUnavailable,
    #[error("code execution runtime exited unexpectedly")]
    CodeExecutionRuntimeStopped,
    #[error(
        "provider mismatch for session `{session_id}`: persisted provider `{expected}` does not match live provider `{actual}`"
    )]
    ProviderMismatch {
        expected: String,
        actual: String,
        session_id: String,
    },
    #[error("provider is not configured for session `{session_id}`")]
    ProviderUnconfigured { session_id: String },
    #[error("provider `{provider_id}` is not registered for session `{session_id}`")]
    ProviderUnavailable {
        provider_id: String,
        session_id: String,
    },
    #[error("protocol error: {0}")]
    Protocol(String),
}

#[derive(Clone, Debug)]
pub struct ExecRequest {
    pub code: String,
    pub accept_finish: bool,
}

pub struct Session {
    session_id: String,
    services: RuntimeServices,
    include_base_tools: bool,
    context_overlay_revision: u64,
    context_tools: Vec<Arc<dyn ToolProvider>>,
    tool_registry: Arc<crate::ToolRegistry>,
    context_prompt_contributions: Vec<PromptContribution>,
    message_tx: Option<UnboundedSender<SandboxMessage>>,
    tool_catalog_cache: std::sync::Mutex<Vec<(ToolCatalogCacheKey, ToolCatalogHandle)>>,
    /// Memoizes the rendered system prompt across turns. Most consecutive
    /// turns reuse the same template + context overlay, so the cache hits
    /// and we skip the section/Vec-join work in
    /// `lash_sansio::PromptTemplate::render`.
    prompt_cache: Arc<lash_sansio::PromptCache>,
}

impl Session {
    pub async fn new(services: RuntimeServices, session_id: &str) -> Result<Self, SessionError> {
        let tool_registry = services.plugins.tool_registry();
        let mut session = Self {
            session_id: session_id.to_string(),
            services,
            include_base_tools: true,
            context_overlay_revision: 0,
            context_tools: Vec::new(),
            tool_registry,
            context_prompt_contributions: Vec::new(),
            message_tx: None,
            tool_catalog_cache: std::sync::Mutex::new(Vec::new()),
            prompt_cache: Arc::new(lash_sansio::PromptCache::new()),
        };

        let protocol_session = Arc::clone(session.plugins().protocol_session());
        protocol_session
            .initialize_session(crate::plugin::ProtocolSessionContext::new(
                &mut session,
                session_id,
            ))
            .await?;

        Ok(session)
    }

    pub fn session_id(&self) -> &str {
        &self.session_id
    }

    pub(crate) fn protocol_extra_prompt_contributions(&self) -> Vec<PromptContribution> {
        // Protocol-specific prompt contributions are owned by the protocol
        // plugins via their
        // `reg.prompt().contribute(...)` hooks. Nothing to add here.
        Vec::new()
    }

    pub fn tools(&self) -> Arc<dyn ToolProvider> {
        Arc::clone(&self.tool_registry) as Arc<dyn ToolProvider>
    }

    pub fn plugins(&self) -> &Arc<crate::PluginSession> {
        &self.services.plugins
    }

    pub fn set_context_overlay(
        &mut self,
        tool_providers: Vec<Arc<dyn ToolProvider>>,
        prompt_contributions: Vec<PromptContribution>,
        include_base_tools: bool,
    ) -> Result<(), crate::PluginError> {
        let tool_providers_unchanged = self.context_tools.len() == tool_providers.len()
            && self
                .context_tools
                .iter()
                .zip(&tool_providers)
                .all(|(current, next)| Arc::ptr_eq(current, next));
        if self.include_base_tools == include_base_tools
            && self.context_prompt_contributions == prompt_contributions
            && tool_providers_unchanged
        {
            return Ok(());
        }
        let registry = self
            .services
            .plugins
            .tool_registry()
            .compose_session_catalog(include_base_tools, tool_providers.clone())
            .map(Arc::new)
            .map_err(|err| {
                crate::PluginError::Session(format!("failed to build session tool registry: {err}"))
            })?;
        self.include_base_tools = include_base_tools;
        self.context_overlay_revision = self.context_overlay_revision.wrapping_add(1);
        self.context_tools = tool_providers;
        self.tool_registry = registry;
        self.context_prompt_contributions = prompt_contributions;
        self.tool_catalog_cache
            .lock()
            .expect("tool catalog cache lock")
            .clear();
        Ok(())
    }

    pub fn prompt_cache(&self) -> Arc<lash_sansio::PromptCache> {
        Arc::clone(&self.prompt_cache)
    }

    pub fn context_prompt_contributions(&self) -> &[PromptContribution] {
        &self.context_prompt_contributions
    }

    pub fn history_store(&self) -> Option<Arc<dyn crate::store::RuntimePersistence>> {
        self.services.store.clone()
    }

    fn tool_catalog_cache_key(&self) -> ToolCatalogCacheKey {
        ToolCatalogCacheKey {
            include_base_tools: self.include_base_tools,
            context_overlay_revision: self.context_overlay_revision,
            tool_generation: self.tool_registry.generation(),
            plugin_revision: self.plugins().snapshot_revision_fingerprint(),
            lashlang_language_features: self.plugins().lashlang_language_features(),
        }
    }

    fn build_tool_catalog_entry(
        &self,
        session_id: &str,
    ) -> Result<ToolCatalogHandle, crate::PluginError> {
        let provider = self.tools();
        let tools = provider.tool_manifests();
        let contract_provider = Arc::clone(&provider);
        let resolve_contract: lash_sansio::ToolContractResolver =
            Arc::new(move |name: &str| contract_provider.resolve_contract(name));
        let tool_catalog = Arc::new(self.plugins().resolve_tool_catalog(
            crate::plugin::ToolCatalogContext {
                session_id: session_id.to_string(),
                tools,
                resolve_contract: Some(Arc::clone(&resolve_contract)),
                tool_access: self.plugins().tool_access().clone(),
                subagent: self.plugins().subagent_context().cloned(),
                lashlang_abilities: self.plugins().lashlang_abilities(),
            },
        )?);
        let input = crate::ProtocolBuildInput {
            tool_catalog: Arc::clone(&tool_catalog),
            lashlang_host_environment:
                execution_context::lashlang_host_environment_from_tool_catalog(
                    &tool_catalog,
                    self.plugins().lashlang_abilities(),
                    self.plugins().lashlang_language_features(),
                    self.plugins().lashlang_resources(),
                ),
            extra_prompt_contributions: self.protocol_extra_prompt_contributions(),
        };
        let driver = self.plugins().protocol_driver();
        let preamble = driver.build_preamble(input);
        Ok(ToolCatalogHandle(Arc::new(ToolCatalogArtifact {
            tool_catalog,
            preamble: Arc::new(preamble),
            derived: ToolCatalogDerived::default(),
        })))
    }

    fn tool_catalog_cache_entry(
        &self,
        session_id: &str,
    ) -> Result<ToolCatalogHandle, crate::PluginError> {
        let key = self.tool_catalog_cache_key();
        let mut cache = self
            .tool_catalog_cache
            .lock()
            .expect("tool catalog cache lock");
        if let Some((_, entry)) = cache.iter().find(|(entry_key, _)| *entry_key == key) {
            return Ok(entry.clone());
        }
        let entry = self.build_tool_catalog_entry(session_id)?;
        cache.push((key, entry.clone()));
        Ok(entry)
    }

    pub fn resolved_tool_catalog(
        &self,
        session_id: &str,
    ) -> Result<Arc<crate::ToolCatalog>, crate::PluginError> {
        Ok(self.tool_catalog_cache_entry(session_id)?.tool_catalog())
    }

    pub(crate) fn turn_driver_preamble(
        &self,
        session_id: &str,
    ) -> Result<Arc<crate::TurnDriverPreamble>, crate::PluginError> {
        Ok(self.tool_catalog_cache_entry(session_id)?.preamble())
    }

    pub(crate) fn shared_tool_catalog(
        &self,
        session_id: &str,
    ) -> Result<Arc<Vec<serde_json::Value>>, crate::PluginError> {
        Ok(self.tool_catalog_cache_entry(session_id)?.catalog())
    }

    pub fn tool_catalog(
        &self,
        session_id: &str,
    ) -> Result<Vec<serde_json::Value>, crate::PluginError> {
        Ok(self.shared_tool_catalog(session_id)?.as_ref().clone())
    }

    #[allow(
        clippy::too_many_arguments,
        reason = "code execution bridge carries explicit per-turn runtime dependencies"
    )]
    pub(crate) fn code_execution_context<'run>(
        &self,
        session_id: &str,
        agent_frame_id: &str,
        sessions: Arc<dyn crate::plugin::SessionStateService>,
        session_lifecycle: Arc<dyn crate::plugin::SessionLifecycleService>,
        session_graph: Arc<dyn crate::plugin::SessionGraphService>,
        processes: Arc<dyn crate::ProcessService>,
        process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
        effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
        direct_completions: crate::DirectCompletionClient<'run>,
        trigger_router: Option<crate::TriggerRouter>,
        event_tx: tokio::sync::mpsc::Sender<SessionEvent>,
        chronological_projection: Arc<crate::ChronologicalProjection>,
        protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
        turn_context: crate::TurnContext,
        execution_env_spec: crate::ProcessExecutionEnvSpec,
        checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer,
    ) -> Result<RuntimeExecutionContext<'run>, crate::PluginError> {
        let dispatch = Arc::new(ToolDispatchContext {
            plugins: Arc::clone(self.plugins()),
            tools: self.tools(),
            tool_catalog: self.resolved_tool_catalog(session_id)?,
            sessions,
            session_lifecycle,
            session_graph,
            processes,
            process_cancel_ability,
            trigger_router,
            effect_controller,
            direct_completions: direct_completions.clone(),
            parent_invocation: None,
            execution_env_spec: execution_env_spec.clone(),
            session_id: session_id.to_string(),
            agent_frame_id: agent_frame_id.to_string(),
            event_tx,
            checkpoint_messages,
            trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
            attachment_store: Arc::clone(&self.services.attachment_store),
            turn_context: turn_context.clone(),
        });
        Ok(RuntimeExecutionContext::new(
            session_id.to_string(),
            dispatch,
            self.plugins().lashlang_abilities(),
            self.plugins().lashlang_language_features(),
            Arc::clone(&self.services.lashlang_artifact_store),
            Arc::clone(&self.services.attachment_store),
            chronological_projection,
            protocol_extension,
            turn_context,
        ))
        .map(|context| context.with_execution_env_spec(execution_env_spec))
    }

    /// Set the message sender for streaming messages during execution.
    pub fn set_message_sender(&mut self, tx: UnboundedSender<SandboxMessage>) {
        self.message_tx = Some(tx);
    }

    /// Clear the message sender (drops the sender, causing receivers to terminate).
    pub fn clear_message_sender(&mut self) {
        self.message_tx = None;
    }

    pub fn invalidate_runtime_caches(&self) {
        self.tool_catalog_cache
            .lock()
            .expect("tool catalog cache lock")
            .clear();
        self.prompt_cache.clear();
    }

    pub async fn refresh_tool_catalog(&mut self) -> Result<(), SessionError> {
        self.tool_registry = self
            .services
            .plugins
            .tool_registry()
            .compose_session_catalog(self.include_base_tools, self.context_tools.clone())
            .map(Arc::new)
            .map_err(|err| SessionError::Protocol(format!("tool reconfigure failed: {err}")))?;
        self.tool_catalog_cache
            .lock()
            .expect("tool catalog cache lock")
            .clear();
        Ok(())
    }
}