Skip to main content

lash_core/runtime/
lifecycle.rs

1use super::*;
2
3impl LashRuntime {
4    /// Override the owner identity used for durable session execution leases.
5    ///
6    /// Normal embedded runtimes use a fresh owner and incarnation so concurrent
7    /// opens of the same session exclude each other. Durable orchestrators may
8    /// set a stable `(owner_id, incarnation_id)` pair for one serialized logical
9    /// workflow.
10    pub fn set_runtime_lease_owner(&mut self, owner: crate::LeaseOwnerIdentity) {
11        self.runtime_lease_owner = owner;
12    }
13
14    pub fn unregister_plugin_session(&self) -> Result<(), crate::PluginError> {
15        if let Some(session) = self.session.as_ref() {
16            session
17                .plugins()
18                .host()
19                .unregister_session(&self.state.session_id)?;
20        }
21        Ok(())
22    }
23
24    pub(super) async fn from_host_state(
25        policy: SessionPolicy,
26        host: RuntimeHost,
27        services: RuntimeServices,
28        mut state: RuntimeSessionState,
29    ) -> Result<Self, SessionError> {
30        if state.session_id.is_empty() {
31            state.session_id = uuid::Uuid::new_v4().to_string();
32        }
33        // Defaulted state (e.g. `RuntimeSessionState::default()` used
34        // by fresh-session constructors) carries an empty policy.
35        // Fill it in from the caller's policy so tests and hosts that
36        // pass a real policy alongside default state don't trip the explicit
37        // model-spec guard below.
38        let state_policy_was_unconfigured = state.policy.recorded_provider_id().is_empty()
39            && state.policy.model.id.trim().is_empty();
40        if state_policy_was_unconfigured {
41            state.policy = policy.clone();
42        }
43        state.ensure_agent_frame_initialized();
44        let state_policy = state.policy.clone();
45        if let Some(frame) = state.current_agent_frame_mut()
46            && frame.assignment.policy.recorded_provider_id().is_empty()
47            && frame.assignment.policy.model.id.trim().is_empty()
48        {
49            frame.assignment.policy = state_policy;
50        }
51        state.policy = state.effective_policy().clone();
52        state.protocol_turn_options = state.effective_protocol_turn_options().clone();
53        normalize_session_graph(&mut state);
54        let policy = state.effective_policy().clone();
55        if policy.model.id.trim().is_empty() {
56            return Err(SessionError::Protocol(
57                "session policy missing model spec; hosts must supply explicit model metadata"
58                    .to_string(),
59            ));
60        }
61        let mut host = host;
62        // When a persistent backend is wired in, wrap the attachment
63        // store so every `put` records a write-ahead intent row first.
64        // Crashes between put and the next turn commit then surface as
65        // uncommitted manifest rows that GC can reconcile. Ephemeral
66        // (no-store) runtimes use the inner store directly — there's
67        // nothing to reconcile against.
68        if let Some(store) = services.store.clone() {
69            let manifest: Arc<dyn crate::AttachmentManifest> =
70                Arc::new(crate::attachments::PersistenceManifestAdapter(store));
71            let scoped: Arc<dyn crate::AttachmentStore> =
72                Arc::new(crate::SessionScopedAttachmentStore::new(
73                    Arc::clone(&host.core.durability.attachment_store),
74                    manifest,
75                    state.session_id.clone(),
76                ));
77            host.core.durability.attachment_store = scoped;
78        }
79        let services = services
80            .with_attachment_store(Arc::clone(&host.core.durability.attachment_store))
81            .with_process_env_store(Arc::clone(&host.core.durability.process_env_store))
82            .with_clock(Arc::clone(&host.core.clock));
83        let mut session = Session::new(services.clone(), &state.session_id).await?;
84        if let Some(tool_state) = state.tool_state_snapshot.clone() {
85            // Cold rebuild restores the exact persisted tool catalog, adopting
86            // the snapshot's generation. `apply_state` (a delta-apply that
87            // requires `snapshot.generation == base` and bumps) would reject a
88            // session whose surface reached generation ≥ 2 onto a fresh base-1
89            // registry — the worker-rebuild / restart divergence. `restore_state`
90            // adopts the snapshot's generation wholesale, so any generation
91            // rebuilds.
92            let report = session
93                .plugins()
94                .tool_registry()
95                .restore_state(tool_state)
96                .map_err(|err| SessionError::Protocol(err.to_string()))?;
97            if !report.orphaned.is_empty() {
98                tracing::warn!(
99                    session_id = %state.session_id,
100                    orphaned = ?report.orphaned,
101                    "session restored with orphaned tools: no registered source \
102                     resolves them; they remain non-members until their source returns"
103                );
104            }
105        }
106        session.refresh_tool_catalog().await?;
107        if let Some(snapshot) = state.plugin_snapshot.clone() {
108            session
109                .plugins()
110                .restore(&snapshot)
111                .map_err(|err| SessionError::Protocol(err.to_string()))?;
112        }
113        let protocol_session = Arc::clone(session.plugins().protocol_session());
114        let session_id = state.session_id.clone();
115        protocol_session
116            .restore_session(
117                crate::plugin::ProtocolSessionContext::new(&mut session, &session_id),
118                &state,
119            )
120            .await?;
121        state.discard_runtime_snapshots();
122        session
123            .plugins()
124            .emit_runtime_event(crate::PluginLifecycleEvent::SessionRestored(
125                crate::SessionReadView::from_persisted_state(&state),
126            ))
127            .await;
128        let protocol_turn_options = state.protocol_turn_options.clone();
129        let runtime_scope_id = uuid::Uuid::new_v4().to_string();
130        let runtime_lease_owner = crate::LeaseOwnerIdentity::opaque(
131            runtime_scope_id.clone(),
132            uuid::Uuid::new_v4().to_string(),
133        );
134        Ok(Self {
135            session: Some(session),
136            policy,
137            host,
138            services,
139            state,
140            runtime_scope_id: Arc::<str>::from(runtime_scope_id),
141            runtime_lease_owner,
142            managed_sessions: Arc::new(Mutex::new(HashMap::new())),
143            managed_turns: Arc::new(Mutex::new(HashMap::new())),
144            protocol_turn_options,
145            shared_token_ledger: Arc::new(std::sync::Mutex::new(Vec::new())),
146            process_sync_needed: Arc::new(AtomicBool::new(false)),
147            turn_phase_probe: None,
148            residency: Residency::default(),
149        })
150    }
151
152    /// Build a runtime for an embedded host with no background worker support.
153    pub async fn from_embedded_state(
154        policy: SessionPolicy,
155        host: EmbeddedRuntimeHost,
156        services: RuntimeServices,
157        state: RuntimeSessionState,
158    ) -> Result<Self, SessionError> {
159        Self::from_host_state(policy, host.into(), services, state).await
160    }
161
162    /// Build a runtime for a host that supports background plugin work.
163    pub async fn from_background_state(
164        policy: SessionPolicy,
165        host: ProcessRuntimeHost,
166        services: RuntimeServices,
167        state: RuntimeSessionState,
168    ) -> Result<Self, SessionError> {
169        Self::from_host_state(policy, host.into(), services, state).await
170    }
171
172    /// Build a runtime for an embedded host with persistent store support.
173    pub async fn from_persistent_embedded_state(
174        policy: SessionPolicy,
175        host: EmbeddedRuntimeHost,
176        services: PersistentRuntimeServices,
177        state: RuntimeSessionState,
178    ) -> Result<Self, SessionError> {
179        Self::from_host_state(policy, host.into(), services.into_runtime_services(), state).await
180    }
181
182    /// Build a runtime for a background-capable host with persistent store support.
183    pub async fn from_persistent_background_state(
184        policy: SessionPolicy,
185        host: ProcessRuntimeHost,
186        services: PersistentRuntimeServices,
187        state: RuntimeSessionState,
188    ) -> Result<Self, SessionError> {
189        Self::from_host_state(policy, host.into(), services.into_runtime_services(), state).await
190    }
191
192    /// Assemble a runtime from already-resolved parts: the single place that maps
193    /// `(store, process_registry)` to the right host/services constructor, applies
194    /// residency, and stamps it onto the runtime.
195    ///
196    /// Every construction path — the live open (`from_environment`), the worker
197    /// rebuild (`EmbeddedRuntimeBuilder::build`), and child-session
198    /// materialization — routes through here so the store/registry wiring and
199    /// residency cannot drift between them. That drift previously shipped: the
200    /// worker rebuild silently kept the full graph and skipped the persisted
201    /// tool-catalog restore that the live path applied.
202    pub(crate) async fn assemble_runtime(
203        policy: SessionPolicy,
204        embedded_host: EmbeddedRuntimeHost,
205        plugin_session: Arc<crate::PluginSession>,
206        store: Option<Arc<dyn crate::store::RuntimePersistence>>,
207        process_registry: Option<Arc<dyn ProcessRegistry>>,
208        mut state: RuntimeSessionState,
209        residency: Residency,
210    ) -> Result<Self, SessionError> {
211        // ActivePathOnly without a store is a data-loss footgun: trimming drops
212        // orphans from RAM with nowhere to reload them from.
213        if matches!(residency, Residency::ActivePathOnly) && store.is_none() {
214            return Err(SessionError::Protocol(
215                "Residency::ActivePathOnly requires a persistent store — \
216                 without one, trimmed orphans are irrecoverable"
217                    .to_string(),
218            ));
219        }
220        // Heal FIRST (against the full resident set), then trim to the residency.
221        // `from_host_state` normalizes again, which is safe on a trimmed graph.
222        normalize_session_graph(&mut state);
223        apply_residency_on_load(&mut state, residency);
224        let mut runtime = match (store, process_registry) {
225            (Some(store), Some(registry)) => {
226                let host = ProcessRuntimeHost::new(embedded_host, registry);
227                let services = PersistentRuntimeServices::new(plugin_session, store);
228                Self::from_persistent_background_state(policy, host, services, state).await?
229            }
230            (Some(store), None) => {
231                let services = PersistentRuntimeServices::new(plugin_session, store);
232                Self::from_persistent_embedded_state(policy, embedded_host, services, state).await?
233            }
234            (None, Some(registry)) => {
235                let host = ProcessRuntimeHost::new(embedded_host, registry);
236                let services = RuntimeServices::new(plugin_session);
237                Self::from_background_state(policy, host, services, state).await?
238            }
239            (None, None) => {
240                let services = RuntimeServices::new(plugin_session);
241                Self::from_embedded_state(policy, embedded_host, services, state).await?
242            }
243        };
244        runtime.residency = residency;
245        Ok(runtime)
246    }
247
248    /// Embedder-preferred constructor: build a `LashRuntime` from a
249    /// shared `RuntimeEnvironment`.
250    ///
251    /// Everything expensive (plugin factories, HTTP client pool, prompt
252    /// template, path resolver) lives on the environment and is
253    /// reused across every runtime the embedder builds. This call is
254    /// O(plugin-session-registration + state-hydration), not
255    /// O(full-infrastructure-init).
256    ///
257    /// * `env` — the shared environment. `env.plugin_host` must be set.
258    /// * `policy` — per-session policy (model, provider, autonomy, turn limits).
259    /// * `state` — persisted session state (empty for a fresh session).
260    /// * `store` — per-session store. `None` builds an embedded runtime
261    ///   with no persistence; `Some` builds a persistent
262    ///   background-capable runtime.
263    pub async fn from_environment(
264        env: &RuntimeEnvironment,
265        policy: SessionPolicy,
266        state: RuntimeSessionState,
267        store: Option<Arc<dyn crate::store::RuntimePersistence>>,
268    ) -> Result<Self, SessionError> {
269        let plugin_host = env.plugin_host.as_ref().ok_or_else(|| {
270            SessionError::Protocol(
271                "RuntimeEnvironment.plugin_host is required for from_environment".to_string(),
272            )
273        })?;
274        let plugin_session = plugin_host
275            .build_session(state.session_id.as_str(), state.plugin_snapshot.as_ref())
276            .map_err(|err| SessionError::Protocol(err.to_string()))?;
277        let mut embedded = EmbeddedRuntimeHost::new(env.core.clone());
278        if let Some(factory) = env.session_store_factory.as_ref() {
279            embedded = embedded.with_session_store_factory(Arc::clone(factory));
280        }
281        if let Some(store) = env.trigger_store.as_ref() {
282            embedded = embedded.with_trigger_store(Arc::clone(store));
283        }
284        let mut runtime = Self::assemble_runtime(
285            policy,
286            embedded,
287            plugin_session,
288            store,
289            env.process_registry.as_ref().cloned(),
290            state,
291            env.residency,
292        )
293        .await?;
294        // Thread the host-owned work drivers onto this session's host so
295        // process starts and queued turns can drive ready work directly.
296        runtime.host.process_work_driver = env.process_work_driver.clone();
297        runtime.host.queued_work_driver = env.queued_work_driver.clone();
298        Ok(runtime)
299    }
300
301    /// Persist any dirty state and drop the runtime, returning a lightweight
302    /// handle the embedder can cache and resume later via
303    /// [`LashRuntime::resume`]. This is the webserver-embedder parking
304    /// primitive: the handle holds only the session id, policy, and store
305    /// reference — no graph nodes, no plugin session, no HTTP client.
306    pub async fn park(mut self) -> Result<ParkedSession, SessionError> {
307        let store = self.services.store.clone().ok_or_else(|| {
308            SessionError::Protocol(
309                "park() requires a persistent runtime (store is not set)".to_string(),
310            )
311        })?;
312        let session_id = self.state.session_id.clone();
313        let policy = self.policy.clone();
314        // Flush any dirty resident state to the store before dropping.
315        let commit = crate::store::RuntimeCommit::persisted_state(&self.state, &[]);
316        let result = commit_runtime_state_with_fresh_session_execution_lease(
317            Arc::clone(&store),
318            commit,
319            &self.runtime_lease_owner,
320            Arc::clone(&self.host.core.clock),
321        )
322        .await
323        .map_err(|err| SessionError::Protocol(format!("failed to persist runtime state: {err}")))?;
324        self.state.apply_persisted_commit_result(result);
325        // Drain pending tombstones if any. Under KeepHistory this is a
326        // no-op (tombstones never get added). Under DropOrphans, a future
327        // orphan-trim path would populate the set for Phase 10's vacuum()
328        // design.
329        Ok(ParkedSession {
330            session_id,
331            store,
332            policy,
333        })
334    }
335
336    /// Resume a previously parked session against a shared environment.
337    /// Loads only the active-path graph when
338    /// `env.residency == ActivePathOnly`; under `KeepAll`
339    /// loads the full graph (current behavior).
340    pub async fn resume(
341        parked: ParkedSession,
342        env: &RuntimeEnvironment,
343    ) -> Result<Self, SessionError> {
344        // Under ActivePathOnly, skip the full-graph load: fetch head
345        // metadata + the active-path chain only. Durable impls can
346        // ActivePathOnly is an exact store capability. Stores that do
347        // not support it must return UnsupportedReadScope; resume does
348        // not fall back to a full graph load.
349        let loaded = match env.residency {
350            Residency::KeepAll => {
351                crate::store::load_persisted_session_state(parked.store.as_ref()).await
352            }
353            Residency::ActivePathOnly => {
354                crate::store::load_persisted_session_state_active_path(parked.store.as_ref(), None)
355                    .await
356            }
357        }
358        .map_err(|err| SessionError::Protocol(format!("failed to load runtime state: {err}")))?;
359        let state = loaded.unwrap_or_else(|| RuntimeSessionState {
360            session_id: parked.session_id.clone(),
361            policy: parked.policy.clone(),
362            ..RuntimeSessionState::default()
363        });
364        Self::from_environment(env, parked.policy, state, Some(parked.store)).await
365    }
366
367    /// Opt-in async read for historic (non-active-path) nodes under
368    /// `Residency::ActivePathOnly`. Plugins that walk the full graph
369    /// call this instead of `session_graph().find_node()` so missing
370    /// nodes surface as `Ok(None)` rather than silently missing.
371    pub async fn get_historic_node(
372        &self,
373        node_id: &str,
374    ) -> Result<Option<crate::SessionNodeRecord>, SessionError> {
375        if let Some(node) = self.state.session_graph.find_node(node_id) {
376            return Ok(Some(node.clone()));
377        }
378        let store = self.services.store.clone().ok_or_else(|| {
379            SessionError::Protocol("get_historic_node() requires a persistent runtime".to_string())
380        })?;
381        store
382            .load_node(node_id)
383            .await
384            .map_err(|err| SessionError::Protocol(format!("failed to load historic node: {err}")))
385    }
386
387    /// Store-resident node IDs that are NOT reachable from the current
388    /// leaf — i.e. orphans eligible for tombstoning. lash owns RAM; the
389    /// host owns disk lifecycle, so this is a primitive the host calls
390    /// on its own schedule (e.g. every N turns, or off-peak).
391    ///
392    /// Typical autonomous-agent loop:
393    ///
394    /// ```ignore
395    /// let orphans = runtime.orphaned_node_ids().await?;
396    /// if !orphans.is_empty() {
397    ///     store.tombstone_nodes(&orphans).await;
398    /// }
399    /// // And less often:
400    /// store.vacuum().await;
401    /// ```
402    pub async fn orphaned_node_ids(&self) -> Result<Vec<String>, SessionError> {
403        let store = self.services.store.clone().ok_or_else(|| {
404            SessionError::Protocol("orphaned_node_ids() requires a persistent runtime".to_string())
405        })?;
406        let Some(read) = store
407            .load_session(crate::store::SessionReadScope::FullGraph)
408            .await
409            .map_err(|err| SessionError::Protocol(format!("failed to load full graph: {err}")))?
410        else {
411            return Ok(Vec::new());
412        };
413        let active: std::collections::HashSet<&str> = read
414            .graph
415            .active_path_nodes()
416            .iter()
417            .map(|node| node.node_id.as_str())
418            .collect();
419        Ok(read
420            .graph
421            .nodes
422            .iter()
423            .filter(|node| !active.contains(node.node_id.as_str()))
424            .map(|node| node.node_id.clone())
425            .collect())
426    }
427}