Skip to main content

lash_core/runtime/
lifecycle.rs

1use super::*;
2
3impl LashRuntime {
4    pub fn unregister_plugin_session(&self) -> Result<(), crate::PluginError> {
5        if let Some(session) = self.session.as_ref() {
6            session
7                .plugins()
8                .host()
9                .unregister_session(&self.state.session_id)?;
10        }
11        Ok(())
12    }
13
    /// Common constructor that the `from_*_state` wrappers in this impl
    /// delegate to.
    ///
    /// In order, it: fills in a missing session id and a blank policy on
    /// `state`, normalizes the session graph, wraps the host's attachment
    /// store when a persistent store is configured, builds the `Session`,
    /// restores the persisted tool catalog / plugin snapshot / protocol
    /// session, emits `SessionRestored`, and finally assembles the runtime.
    ///
    /// * `policy` — copied onto `state` only when `state.policy` has neither a
    ///   recorded provider id nor a (non-blank) model id. The policy stored on
    ///   the returned runtime is `state.effective_policy()`, not this argument.
    /// * `host` — runtime host; its attachment store is replaced by a
    ///   session-scoped wrapper when `services.store` is set.
    /// * `services` — runtime services; re-pointed at the host's attachment
    ///   store, process-env store, and clock before the session is built.
    /// * `state` — persisted (or defaulted) session state to hydrate from.
    ///
    /// # Errors
    ///
    /// Returns `SessionError::Protocol` when the effective policy has a blank
    /// model id, or when restoring the tool registry or the plugin snapshot
    /// fails. Errors from `Session::new`, `refresh_tool_catalog`, and
    /// `restore_session` are propagated via `?`.
    pub(super) async fn from_host_state(
        policy: SessionPolicy,
        host: RuntimeHost,
        services: RuntimeServices,
        mut state: RuntimeSessionState,
    ) -> Result<Self, SessionError> {
        // A state that arrives without an id gets a freshly generated v4 UUID.
        if state.session_id.is_empty() {
            state.session_id = uuid::Uuid::new_v4().to_string();
        }
        // Defaulted state (e.g. `RuntimeSessionState::default()` used
        // by fresh-session constructors) carries an empty policy.
        // Fill it in from the caller's policy so tests and hosts that
        // pass a real policy alongside default state don't trip the explicit
        // model-spec guard below.
        let state_policy_was_unconfigured = state.policy.recorded_provider_id().is_empty()
            && state.policy.model.id.trim().is_empty();
        if state_policy_was_unconfigured {
            state.policy = policy.clone();
        }
        state.ensure_agent_frame_initialized();
        // Apply the same backfill to the current agent frame: a frame whose
        // assignment policy is blank inherits the (possibly just filled) state
        // policy. The clone is taken first because `current_agent_frame_mut`
        // borrows `state` mutably.
        let state_policy = state.policy.clone();
        if let Some(frame) = state.current_agent_frame_mut()
            && frame.assignment.policy.recorded_provider_id().is_empty()
            && frame.assignment.policy.model.id.trim().is_empty()
        {
            frame.assignment.policy = state_policy;
        }
        // Re-sync the top-level fields from the effective values.
        state.policy = state.effective_policy().clone();
        state.protocol_turn_options = state.effective_protocol_turn_options().clone();
        normalize_session_graph(&mut state);
        // Shadows the `policy` argument: from here on the runtime uses the
        // effective policy read back from `state`.
        let policy = state.effective_policy().clone();
        if policy.model.id.trim().is_empty() {
            return Err(SessionError::Protocol(
                "session policy missing model spec; hosts must supply explicit model metadata"
                    .to_string(),
            ));
        }
        let mut host = host;
        // When a persistent backend is wired in, wrap the attachment
        // store so every `put` records a write-ahead intent row first.
        // Crashes between put and the next turn commit then surface as
        // uncommitted manifest rows that GC can reconcile. Ephemeral
        // (no-store) runtimes use the inner store directly — there's
        // nothing to reconcile against.
        if let Some(store) = services.store.clone() {
            let manifest: Arc<dyn crate::AttachmentManifest> =
                Arc::new(crate::attachments::PersistenceManifestAdapter(store));
            let scoped: Arc<dyn crate::AttachmentStore> =
                Arc::new(crate::SessionScopedAttachmentStore::new(
                    Arc::clone(&host.core.durability.attachment_store),
                    manifest,
                    state.session_id.clone(),
                ));
            host.core.durability.attachment_store = scoped;
        }
        // Point the services at the host's (possibly wrapped) attachment
        // store, its process-env store, and its clock.
        let services = services
            .with_attachment_store(Arc::clone(&host.core.durability.attachment_store))
            .with_process_env_store(Arc::clone(&host.core.durability.process_env_store))
            .with_clock(Arc::clone(&host.core.clock));
        let mut session = Session::new(services.clone(), &state.session_id).await?;
        if let Some(tool_state) = state.tool_state_snapshot.clone() {
            // Cold rebuild restores the exact persisted tool catalog, adopting
            // the snapshot's generation. `apply_state` (a delta-apply that
            // requires `snapshot.generation == base` and bumps) would reject a
            // session whose surface reached generation ≥ 2 onto a fresh base-1
            // registry — the worker-rebuild / restart divergence. `restore_state`
            // adopts the snapshot's generation wholesale, so any generation
            // rebuilds.
            let report = session
                .plugins()
                .tool_registry()
                .restore_state(tool_state)
                .map_err(|err| SessionError::Protocol(err.to_string()))?;
            // Orphaned tools are logged, not treated as an error.
            if !report.orphaned.is_empty() {
                tracing::warn!(
                    session_id = %state.session_id,
                    orphaned = ?report.orphaned,
                    "session restored with orphaned tools: no registered source \
                     resolves them; they are Off until their source returns"
                );
            }
        }
        // Runs whether or not a tool snapshot was restored above.
        session.refresh_tool_catalog().await?;
        if let Some(snapshot) = state.plugin_snapshot.clone() {
            session
                .plugins()
                .restore(&snapshot)
                .map_err(|err| SessionError::Protocol(err.to_string()))?;
        }
        // Clone the protocol-session handle and the id up front so the
        // context below can take `&mut session` without conflicting borrows.
        let protocol_session = Arc::clone(session.plugins().protocol_session());
        let session_id = state.session_id.clone();
        protocol_session
            .restore_session(
                crate::plugin::ProtocolSessionContext::new(&mut session, &session_id),
                &state,
            )
            .await?;
        // NOTE(review): presumably drops `tool_state_snapshot` / `plugin_snapshot`
        // now that they have been applied — confirm against
        // `discard_runtime_snapshots`.
        state.discard_runtime_snapshots();
        session
            .plugins()
            .emit_runtime_event(crate::PluginLifecycleEvent::SessionRestored(
                crate::SessionReadView::from_persisted_state(&state),
            ))
            .await;
        let protocol_turn_options = state.protocol_turn_options.clone();
        Ok(Self {
            session: Some(session),
            policy,
            host,
            services,
            state,
            runtime_scope_id: Arc::<str>::from(uuid::Uuid::new_v4().to_string()),
            managed_sessions: Arc::new(Mutex::new(HashMap::new())),
            managed_turns: Arc::new(Mutex::new(HashMap::new())),
            protocol_turn_options,
            shared_token_ledger: Arc::new(std::sync::Mutex::new(Vec::new())),
            process_sync_needed: Arc::new(AtomicBool::new(false)),
            turn_phase_probe: None,
            // Callers such as `assemble_runtime` overwrite this afterwards.
            residency: Residency::default(),
        })
    }
135
136    /// Build a runtime for an embedded host with no background worker support.
137    pub async fn from_embedded_state(
138        policy: SessionPolicy,
139        host: EmbeddedRuntimeHost,
140        services: RuntimeServices,
141        state: RuntimeSessionState,
142    ) -> Result<Self, SessionError> {
143        Self::from_host_state(policy, host.into(), services, state).await
144    }
145
146    /// Build a runtime for a host that supports background plugin work.
147    pub async fn from_background_state(
148        policy: SessionPolicy,
149        host: ProcessRuntimeHost,
150        services: RuntimeServices,
151        state: RuntimeSessionState,
152    ) -> Result<Self, SessionError> {
153        Self::from_host_state(policy, host.into(), services, state).await
154    }
155
156    /// Build a runtime for an embedded host with persistent store support.
157    pub async fn from_persistent_embedded_state(
158        policy: SessionPolicy,
159        host: EmbeddedRuntimeHost,
160        services: PersistentRuntimeServices,
161        state: RuntimeSessionState,
162    ) -> Result<Self, SessionError> {
163        Self::from_host_state(policy, host.into(), services.into_runtime_services(), state).await
164    }
165
166    /// Build a runtime for a background-capable host with persistent store support.
167    pub async fn from_persistent_background_state(
168        policy: SessionPolicy,
169        host: ProcessRuntimeHost,
170        services: PersistentRuntimeServices,
171        state: RuntimeSessionState,
172    ) -> Result<Self, SessionError> {
173        Self::from_host_state(policy, host.into(), services.into_runtime_services(), state).await
174    }
175
176    /// Assemble a runtime from already-resolved parts: the single place that maps
177    /// `(store, process_registry)` to the right host/services constructor, applies
178    /// residency, and stamps it onto the runtime.
179    ///
180    /// Every construction path — the live open (`from_environment`), the worker
181    /// rebuild (`EmbeddedRuntimeBuilder::build`), and child-session
182    /// materialization — routes through here so the store/registry wiring and
183    /// residency cannot drift between them. That drift previously shipped: the
184    /// worker rebuild silently kept the full graph and skipped the persisted
185    /// tool-catalog restore that the live path applied.
186    pub(crate) async fn assemble_runtime(
187        policy: SessionPolicy,
188        embedded_host: EmbeddedRuntimeHost,
189        plugin_session: Arc<crate::PluginSession>,
190        store: Option<Arc<dyn crate::store::RuntimePersistence>>,
191        process_registry: Option<Arc<dyn ProcessRegistry>>,
192        mut state: RuntimeSessionState,
193        residency: Residency,
194    ) -> Result<Self, SessionError> {
195        // ActivePathOnly without a store is a data-loss footgun: trimming drops
196        // orphans from RAM with nowhere to reload them from.
197        if matches!(residency, Residency::ActivePathOnly) && store.is_none() {
198            return Err(SessionError::Protocol(
199                "Residency::ActivePathOnly requires a persistent store — \
200                 without one, trimmed orphans are irrecoverable"
201                    .to_string(),
202            ));
203        }
204        // Heal FIRST (against the full resident set), then trim to the residency.
205        // `from_host_state` normalizes again, which is safe on a trimmed graph.
206        normalize_session_graph(&mut state);
207        apply_residency_on_load(&mut state, residency);
208        let mut runtime = match (store, process_registry) {
209            (Some(store), Some(registry)) => {
210                let host = ProcessRuntimeHost::new(embedded_host, registry);
211                let services = PersistentRuntimeServices::new(plugin_session, store);
212                Self::from_persistent_background_state(policy, host, services, state).await?
213            }
214            (Some(store), None) => {
215                let services = PersistentRuntimeServices::new(plugin_session, store);
216                Self::from_persistent_embedded_state(policy, embedded_host, services, state).await?
217            }
218            (None, Some(registry)) => {
219                let host = ProcessRuntimeHost::new(embedded_host, registry);
220                let services = RuntimeServices::new(plugin_session);
221                Self::from_background_state(policy, host, services, state).await?
222            }
223            (None, None) => {
224                let services = RuntimeServices::new(plugin_session);
225                Self::from_embedded_state(policy, embedded_host, services, state).await?
226            }
227        };
228        runtime.residency = residency;
229        Ok(runtime)
230    }
231
232    /// Embedder-preferred constructor: build a `LashRuntime` from a
233    /// shared `RuntimeEnvironment`.
234    ///
235    /// Everything expensive (plugin factories, HTTP client pool, prompt
236    /// template, path resolver) lives on the environment and is
237    /// reused across every runtime the embedder builds. This call is
238    /// O(plugin-session-registration + state-hydration), not
239    /// O(full-infrastructure-init).
240    ///
241    /// * `env` — the shared environment. `env.plugin_host` must be set.
242    /// * `policy` — per-session policy (model, provider, autonomy, turn limits).
243    /// * `state` — persisted session state (empty for a fresh session).
244    /// * `store` — per-session store. `None` builds an embedded runtime
245    ///   with no persistence; `Some` builds a persistent
246    ///   background-capable runtime.
247    pub async fn from_environment(
248        env: &RuntimeEnvironment,
249        policy: SessionPolicy,
250        state: RuntimeSessionState,
251        store: Option<Arc<dyn crate::store::RuntimePersistence>>,
252    ) -> Result<Self, SessionError> {
253        let plugin_host = env.plugin_host.as_ref().ok_or_else(|| {
254            SessionError::Protocol(
255                "RuntimeEnvironment.plugin_host is required for from_environment".to_string(),
256            )
257        })?;
258        let plugin_session = plugin_host
259            .build_session(state.session_id.as_str(), state.plugin_snapshot.as_ref())
260            .map_err(|err| SessionError::Protocol(err.to_string()))?;
261        let mut embedded = EmbeddedRuntimeHost::new(env.core.clone());
262        if let Some(factory) = env.session_store_factory.as_ref() {
263            embedded = embedded.with_session_store_factory(Arc::clone(factory));
264        }
265        if let Some(store) = env.trigger_store.as_ref() {
266            embedded = embedded.with_trigger_store(Arc::clone(store));
267        }
268        let mut runtime = Self::assemble_runtime(
269            policy,
270            embedded,
271            plugin_session,
272            store,
273            env.process_registry.as_ref().cloned(),
274            state,
275            env.residency,
276        )
277        .await?;
278        // Thread the host-owned work drivers onto this session's host so
279        // process starts and queued turns can drive ready work directly.
280        runtime.host.process_work_driver = env.process_work_driver.clone();
281        runtime.host.queued_work_driver = env.queued_work_driver.clone();
282        Ok(runtime)
283    }
284
285    /// Persist any dirty state and drop the runtime, returning a lightweight
286    /// handle the embedder can cache and resume later via
287    /// [`LashRuntime::resume`]. This is the webserver-embedder parking
288    /// primitive: the handle holds only the session id, policy, and store
289    /// reference — no graph nodes, no plugin session, no HTTP client.
290    pub async fn park(mut self) -> Result<ParkedSession, SessionError> {
291        let store = self.services.store.clone().ok_or_else(|| {
292            SessionError::Protocol(
293                "park() requires a persistent runtime (store is not set)".to_string(),
294            )
295        })?;
296        let session_id = self.state.session_id.clone();
297        let policy = self.policy.clone();
298        // Flush any dirty resident state to the store before dropping.
299        let commit = crate::store::RuntimeCommit::persisted_state(&self.state, &[]);
300        let result = store.commit_runtime_state(commit).await.map_err(|err| {
301            SessionError::Protocol(format!("failed to persist runtime state: {err}"))
302        })?;
303        self.state.apply_persisted_commit_result(result);
304        // Drain pending tombstones if any. Under KeepHistory this is a
305        // no-op (tombstones never get added). Under DropOrphans, a future
306        // orphan-trim path would populate the set for Phase 10's vacuum()
307        // design.
308        Ok(ParkedSession {
309            session_id,
310            store,
311            policy,
312        })
313    }
314
315    /// Resume a previously parked session against a shared environment.
316    /// Loads only the active-path graph when
317    /// `env.residency == ActivePathOnly`; under `KeepAll`
318    /// loads the full graph (current behavior).
319    pub async fn resume(
320        parked: ParkedSession,
321        env: &RuntimeEnvironment,
322    ) -> Result<Self, SessionError> {
323        // Under ActivePathOnly, skip the full-graph load: fetch head
324        // metadata + the active-path chain only. Durable impls can
325        // ActivePathOnly is an exact store capability. Stores that do
326        // not support it must return UnsupportedReadScope; resume does
327        // not fall back to a full graph load.
328        let loaded = match env.residency {
329            Residency::KeepAll => {
330                crate::store::load_persisted_session_state(parked.store.as_ref()).await
331            }
332            Residency::ActivePathOnly => {
333                crate::store::load_persisted_session_state_active_path(parked.store.as_ref(), None)
334                    .await
335            }
336        }
337        .map_err(|err| SessionError::Protocol(format!("failed to load runtime state: {err}")))?;
338        let state = loaded.unwrap_or_else(|| RuntimeSessionState {
339            session_id: parked.session_id.clone(),
340            policy: parked.policy.clone(),
341            ..RuntimeSessionState::default()
342        });
343        Self::from_environment(env, parked.policy, state, Some(parked.store)).await
344    }
345
346    /// Opt-in async read for historic (non-active-path) nodes under
347    /// `Residency::ActivePathOnly`. Plugins that walk the full graph
348    /// call this instead of `session_graph().find_node()` so missing
349    /// nodes surface as `Ok(None)` rather than silently missing.
350    pub async fn get_historic_node(
351        &self,
352        node_id: &str,
353    ) -> Result<Option<crate::SessionNodeRecord>, SessionError> {
354        if let Some(node) = self.state.session_graph.find_node(node_id) {
355            return Ok(Some(node.clone()));
356        }
357        let store = self.services.store.clone().ok_or_else(|| {
358            SessionError::Protocol("get_historic_node() requires a persistent runtime".to_string())
359        })?;
360        store
361            .load_node(node_id)
362            .await
363            .map_err(|err| SessionError::Protocol(format!("failed to load historic node: {err}")))
364    }
365
366    /// Store-resident node IDs that are NOT reachable from the current
367    /// leaf — i.e. orphans eligible for tombstoning. lash owns RAM; the
368    /// host owns disk lifecycle, so this is a primitive the host calls
369    /// on its own schedule (e.g. every N turns, or off-peak).
370    ///
371    /// Typical autonomous-agent loop:
372    ///
373    /// ```ignore
374    /// let orphans = runtime.orphaned_node_ids().await?;
375    /// if !orphans.is_empty() {
376    ///     store.tombstone_nodes(&orphans).await;
377    /// }
378    /// // And less often:
379    /// store.vacuum().await;
380    /// ```
381    pub async fn orphaned_node_ids(&self) -> Result<Vec<String>, SessionError> {
382        let store = self.services.store.clone().ok_or_else(|| {
383            SessionError::Protocol("orphaned_node_ids() requires a persistent runtime".to_string())
384        })?;
385        let Some(read) = store
386            .load_session(crate::store::SessionReadScope::FullGraph)
387            .await
388            .map_err(|err| SessionError::Protocol(format!("failed to load full graph: {err}")))?
389        else {
390            return Ok(Vec::new());
391        };
392        let active: std::collections::HashSet<&str> = read
393            .graph
394            .active_path_nodes()
395            .iter()
396            .map(|node| node.node_id.as_str())
397            .collect();
398        Ok(read
399            .graph
400            .nodes
401            .iter()
402            .filter(|node| !active.contains(node.node_id.as_str()))
403            .map(|node| node.node_id.clone())
404            .collect())
405    }
406}