lash-runtime 0.1.0-alpha.85

Durable agent runtime for Rust: sessions, turns, tools, plugins. Embeddable facade over lash-core.
Documentation
//! The global process facade surface.
//!
//! [`Processes`] (reached via [`LashCore::processes`](crate::LashCore::processes),
//! re-exported as [`lash::process::Processes`](crate::process::Processes)) is THE
//! host-level process surface (ADR 0019 grill): start, observe, signal, cancel,
//! transfer, prune, and abandon-request every process, with the two distinct
//! scope filters — `granted_to` (what a session may address) and `originated_by`
//! (what a session created). The session-scoped
//! [`SessionProcessAdmin`](crate::admin::SessionProcessAdmin) is thin sugar over
//! this surface pre-filtered by a session's grant; it lives in `admin` because it
//! wraps a [`SessionAdmin`](crate::admin::SessionAdmin).

use crate::support::*;

#[derive(Clone)]
pub struct Processes {
    pub(crate) core: LashCore,
}

impl Processes {
    fn registry(&self) -> Result<Arc<dyn lash_core::ProcessRegistry>> {
        self.core
            .env
            .process_registry
            .as_ref()
            .cloned()
            .ok_or_else(|| {
                EmbedError::Plugin(lash_core::PluginError::Session(
                    "process registry is unavailable in this runtime".to_string(),
                ))
            })
    }

    fn make_observer(&self) -> Result<lash_core::ProcessWorkObserver> {
        Ok(lash_core::ProcessWorkObserver::new(self.registry()?))
    }

    fn process_invocation(command: &lash_core::ProcessCommand) -> lash_core::RuntimeInvocation {
        let effect_id = command.effect_id();
        lash_core::RuntimeInvocation::effect(
            lash_core::runtime::RuntimeScope::new("runtime"),
            effect_id.clone(),
            lash_core::RuntimeEffectKind::Process,
            effect_id,
        )
    }

    async fn run_command(
        &self,
        command: lash_core::ProcessCommand,
        scoped_effect_controller: ScopedEffectController<'_>,
    ) -> Result<lash_core::ProcessEffectOutcome> {
        let registry = self.registry()?;
        let invocation = Self::process_invocation(&command);
        let outcome = scoped_effect_controller
            .controller()
            .execute_effect(
                lash_core::RuntimeEffectEnvelope::new(
                    invocation,
                    lash_core::RuntimeEffectCommand::process(command),
                ),
                lash_core::RuntimeEffectLocalExecutor::processes(
                    registry,
                    self.core.env.process_work_driver.clone(),
                ),
            )
            .await
            .map_err(|err| EmbedError::Plugin(lash_core::PluginError::Session(err.to_string())))?;
        match outcome {
            lash_core::RuntimeEffectOutcome::Process { result } => Ok(result),
            _ => Err(EmbedError::Plugin(lash_core::PluginError::Session(
                "process effect returned non-process outcome".to_string(),
            ))),
        }
    }

    pub async fn start(
        &self,
        request: lash_core::ProcessStartRequest,
        scoped_effect_controller: ScopedEffectController<'_>,
    ) -> Result<lash_core::ProcessRecord> {
        let env_ref = match request.env_spec.as_ref() {
            Some(env_spec) => Some(
                lash_core::runtime::persist_process_execution_env(
                    self.core.env.core.durability.process_env_store.as_ref(),
                    env_spec,
                )
                .await?,
            ),
            None => None,
        };
        let grant = request.grant.clone();
        let registration = request.into_registration(env_ref);
        let command = lash_core::ProcessCommand::Start {
            registration,
            grant,
            execution_context: Box::new(lash_core::ProcessExecutionContext::default()),
        };
        let outcome = self
            .run_command(command, scoped_effect_controller.clone())
            .await?;
        let lash_core::ProcessEffectOutcome::Start { record } = outcome else {
            return Err(EmbedError::Plugin(lash_core::PluginError::Session(
                "process start returned the wrong outcome".to_string(),
            )));
        };
        if let Some(driver) = self.core.work_driver.drivers().await.process {
            driver.claim_and_run_pending("admin_process_start").await?;
        }
        Ok(*record)
    }

    pub async fn list(
        &self,
        filter: &lash_core::ProcessListFilter,
    ) -> Result<Vec<lash_core::ObservedProcess>> {
        self.make_observer()?.list(filter).await.map_err(Into::into)
    }

    /// List processes a session may address — the **grant** filter (ADR 0019).
    /// This is the security lens (what a session is authorized to see), distinct
    /// from [`list_originated_by`](Self::list_originated_by). `session.processes()`
    /// is thin sugar over this method pre-scoped to the session's own grant.
    pub async fn list_granted_to(
        &self,
        session_scope: &lash_core::SessionScope,
        filter: &lash_core::ProcessListFilter,
    ) -> Result<Vec<lash_core::ObservedProcess>> {
        self.make_observer()?
            .list_granted_to(session_scope, filter)
            .await
            .map_err(Into::into)
    }

    /// List processes a session originated — the **provenance** filter (ADR
    /// 0019). This is the lineage lens (what a session created), distinct from
    /// [`list_granted_to`](Self::list_granted_to): a process a session started
    /// then transferred away still matches here, and one merely granted to it
    /// does not.
    pub async fn list_originated_by(
        &self,
        session_scope: &lash_core::SessionScope,
        filter: &lash_core::ProcessListFilter,
    ) -> Result<Vec<lash_core::ObservedProcess>> {
        self.make_observer()?
            .list_originated_by(session_scope, filter)
            .await
            .map_err(Into::into)
    }

    pub async fn get(&self, process_id: &str) -> Result<Option<lash_core::ObservedProcess>> {
        Ok(self.make_observer()?.process(process_id).await)
    }

    pub async fn events(
        &self,
        process_id: &str,
        after_sequence: u64,
    ) -> Result<Vec<lash_core::ObservedProcessEvent>> {
        self.make_observer()?
            .events_after(process_id, after_sequence)
            .await
            .map_err(Into::into)
    }

    pub async fn await_output(&self, process_id: &str) -> Result<lash_core::ProcessAwaitOutput> {
        if let Some(driver) = self.core.env.process_work_driver.as_ref() {
            return driver.await_terminal(process_id).await.map_err(Into::into);
        }
        lash_core::ProcessAwaiter::polling(self.registry()?)
            .await_terminal(process_id)
            .await
            .map_err(Into::into)
    }

    pub async fn cancel(
        &self,
        process_id: &str,
        scoped_effect_controller: ScopedEffectController<'_>,
    ) -> Result<lash_core::ProcessCancelSummary> {
        let command = lash_core::ProcessCommand::Cancel {
            process_id: process_id.to_string(),
            reason: Some("requested by host".to_string()),
        };
        let outcome = self
            .run_command(command, scoped_effect_controller.clone())
            .await?;
        let lash_core::ProcessEffectOutcome::Cancel { record } = outcome else {
            return Err(EmbedError::Plugin(lash_core::PluginError::Session(
                "process cancel returned the wrong outcome".to_string(),
            )));
        };
        Ok(lash_core::ProcessCancelSummary::from_record(*record))
    }

    pub async fn signal(
        &self,
        process_id: &str,
        signal_name: impl Into<String>,
        signal_id: impl Into<String>,
        request: lash_core::ProcessEventAppendRequest,
        scoped_effect_controller: ScopedEffectController<'_>,
    ) -> Result<lash_core::ProcessEvent> {
        let signal_name = signal_name.into();
        let event_type = request.event_type.clone();
        let payload = request.payload.clone();
        let command = lash_core::ProcessCommand::Signal {
            process_id: process_id.to_string(),
            signal_name: signal_name.clone(),
            signal_id: signal_id.into(),
            request,
        };
        let outcome = self
            .run_command(command, scoped_effect_controller.clone())
            .await?;
        let lash_core::ProcessEffectOutcome::Signal { event } = outcome else {
            return Err(EmbedError::Plugin(lash_core::PluginError::Session(
                "process signal returned the wrong outcome".to_string(),
            )));
        };
        let registry = self.registry()?;
        let waiting_ordinal =
            registry
                .get_process(process_id)
                .await
                .and_then(|record| match record.wait {
                    Some(lash_core::WaitState {
                        kind:
                            lash_core::WaitKind::Signal {
                                name,
                                event_type: wait_event_type,
                                ordinal,
                                ..
                            },
                        ..
                    }) if name == signal_name && wait_event_type == event_type => Some(ordinal),
                    _ => None,
                });
        let ordinal = match waiting_ordinal {
            Some(ordinal) => ordinal,
            None => {
                registry
                    .count_events_through(process_id, &event_type, event.sequence)
                    .await?
            }
        };
        if ordinal > 0 {
            let key = scoped_effect_controller
                .controller()
                .await_event_key(
                    &lash_core::ExecutionScope::process(process_id),
                    lash_core::AwaitEventWaitIdentity::process_signal(
                        process_id,
                        &signal_name,
                        ordinal,
                    ),
                )
                .await
                .map_err(|err| {
                    EmbedError::Plugin(lash_core::PluginError::Session(err.to_string()))
                })?;
            let _ = scoped_effect_controller
                .controller()
                .resolve_await_event(&key, lash_core::Resolution::Ok(payload))
                .await
                .map_err(|err| {
                    EmbedError::Plugin(lash_core::PluginError::Session(err.to_string()))
                })?;
        }
        Ok(*event)
    }

    pub async fn session_snapshot(
        &self,
        session_id: impl Into<String>,
    ) -> Result<lash_core::ProcessWorkSnapshot> {
        self.make_observer()?
            .snapshot_for_session(session_id)
            .await
            .map_err(Into::into)
    }

    pub fn observer(&self) -> Result<lash_core::ProcessWorkObserver> {
        self.make_observer()
    }

    /// Cancel every currently-running process. A host-wide lever; for a
    /// session-scoped stop use [`SessionProcessAdmin::cancel_all`](crate::admin::SessionProcessAdmin::cancel_all).
    pub async fn cancel_all(
        &self,
        scoped_effect_controller: ScopedEffectController<'_>,
    ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
        let running = self
            .list(&lash_core::ProcessListFilter {
                status: lash_core::ProcessStatusFilter::Running,
                ..lash_core::ProcessListFilter::default()
            })
            .await?;
        let mut summaries = Vec::with_capacity(running.len());
        for process in running {
            summaries.push(
                self.cancel(&process.process_id, scoped_effect_controller.clone())
                    .await?,
            );
        }
        Ok(summaries)
    }

    /// Move handle grants for `process_ids` from one session scope to another.
    /// Processes are global; this re-homes only the addressing grant, never the
    /// process itself.
    pub async fn transfer(
        &self,
        from_scope: &lash_core::SessionScope,
        to_scope: &lash_core::SessionScope,
        process_ids: &[String],
    ) -> Result<()> {
        self.registry()?
            .transfer_handle_grants(from_scope, to_scope, process_ids)
            .await
            .map_err(Into::into)
    }

    /// Host-scheduled retention lever (ADR 0017): physically delete terminal
    /// process rows (and their events, grants, leases) older than
    /// `cutoff_epoch_ms`, returning what was reclaimed. Non-terminal rows are
    /// never touched. Choose a cutoff comfortably longer than any live await.
    pub async fn prune(&self, cutoff_epoch_ms: u64) -> Result<lash_core::ProcessPruneReport> {
        self.registry()?
            .prune_terminal_processes(cutoff_epoch_ms)
            .await
            .map_err(Into::into)
    }

    /// Record a durable, non-terminal **Abandon Request** on a process (ADR
    /// 0019): a third party's authorization to accept uncertainty about an
    /// owner. This never terminalizes anything itself — the recovery sweep
    /// reconciles it into `Abandoned` only once the owner's lease has lapsed;
    /// the marker stays visible to observers while pending. Returns the process
    /// as observed after the marker is written.
    pub async fn request_abandon(
        &self,
        process_id: &str,
        requested_by: impl Into<String>,
        reason: Option<String>,
    ) -> Result<lash_core::ObservedProcess> {
        let request = lash_core::AbandonRequest {
            requested_by: requested_by.into(),
            requested_at_ms: now_epoch_ms(),
            reason,
        };
        self.registry()?
            .request_process_abandon(process_id, request)
            .await?;
        self.get(process_id).await?.ok_or_else(|| {
            EmbedError::Plugin(lash_core::PluginError::Session(format!(
                "process `{process_id}` vanished after recording its abandon request"
            )))
        })
    }
}

/// Host wall-clock epoch milliseconds for facade-issued markers (e.g. the
/// Abandon Request timestamp). The registry stays state-only, so the facade
/// stamps the request time itself. Shared with the session-scoped abandon lever
/// in [`crate::admin`].
pub(crate) fn now_epoch_ms() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|elapsed| elapsed.as_millis() as u64)
        .unwrap_or(0)
}