Skip to main content

lash_core/runtime/
lifecycle.rs

1use super::*;
2
3impl LashRuntime {
4    pub(super) async fn from_host_state(
5        policy: SessionPolicy,
6        host: RuntimeHost,
7        services: RuntimeServices,
8        mut state: PersistedSessionState,
9    ) -> Result<Self, SessionError> {
10        if state.session_id.is_empty() {
11            state.session_id = uuid::Uuid::new_v4().to_string();
12        }
13        // Defaulted state (e.g. `PersistedSessionState::default()` used
14        // by fresh-session constructors) carries an unconfigured policy.
15        // Fill it in from the caller's policy so tests and hosts that
16        // pass a real policy alongside default state don't trip the
17        // max_context_tokens guard below.
18        if state.policy.provider.kind() == "unconfigured" {
19            state.policy = policy.clone();
20        }
21        normalize_session_graph(&mut state);
22        if policy.max_context_tokens.is_none() {
23            return Err(SessionError::Protocol(
24                "session policy missing max_context_tokens; hosts must supply explicit model metadata"
25                    .to_string(),
26            ));
27        }
28        let services = services.with_attachment_store(Arc::clone(&host.core.attachment_store));
29        let mut session = Session::new(
30            services.clone(),
31            &state.session_id,
32            state.policy.execution_mode.clone(),
33        )
34        .await?;
35        if let Some(tool_state) = state.tool_state_snapshot.clone()
36            && let Err(err) = session.plugins().tool_registry().apply_state(tool_state)
37        {
38            tracing::warn!("failed to restore tool state from checkpoint: {err}");
39        }
40        if let Some(snapshot) = state.plugin_snapshot.clone() {
41            session
42                .plugins()
43                .restore(&snapshot)
44                .map_err(|err| SessionError::Protocol(err.to_string()))?;
45        }
46        let mode_session = Arc::clone(session.plugins().mode_session());
47        let session_id = state.session_id.clone();
48        mode_session
49            .restore_session(
50                crate::plugin::ModeSessionContext::new(&mut session, &session_id),
51                &state,
52            )
53            .await?;
54        state.discard_runtime_snapshots();
55        session
56            .plugins()
57            .emit_runtime_event(crate::PluginRuntimeEvent::SessionRestored(
58                crate::SessionReadView::from_persisted_state(&state),
59            ))
60            .await;
61        let mode_turn_options = state.mode_turn_options.clone();
62        Ok(Self {
63            session: Some(session),
64            policy,
65            host,
66            services,
67            state,
68            runtime_scope_id: Arc::<str>::from(uuid::Uuid::new_v4().to_string()),
69            managed_sessions: Arc::new(Mutex::new(HashMap::new())),
70            active_handoff_continuations: Arc::new(Mutex::new(HashMap::new())),
71            managed_turns: Arc::new(Mutex::new(HashMap::new())),
72            mode_turn_options,
73            shared_token_ledger: Arc::new(std::sync::Mutex::new(Vec::new())),
74            background_sync_needed: Arc::new(AtomicBool::new(false)),
75            pending_first_turn_inputs: Arc::new(std::sync::Mutex::new(HashMap::new())),
76            turn_phase_probe: None,
77        })
78    }
79
80    /// Build a runtime for an embedded host with no background worker support.
81    pub async fn from_embedded_state(
82        policy: SessionPolicy,
83        host: EmbeddedRuntimeHost,
84        services: RuntimeServices,
85        state: PersistedSessionState,
86    ) -> Result<Self, SessionError> {
87        Self::from_host_state(policy, host.into(), services, state).await
88    }
89
90    /// Build a runtime for a host that supports background plugin work.
91    pub async fn from_background_state(
92        policy: SessionPolicy,
93        host: BackgroundRuntimeHost,
94        services: RuntimeServices,
95        state: PersistedSessionState,
96    ) -> Result<Self, SessionError> {
97        Self::from_host_state(policy, host.into(), services, state).await
98    }
99
100    /// Build a runtime for an embedded host with persistent store support.
101    pub async fn from_persistent_embedded_state(
102        policy: SessionPolicy,
103        host: EmbeddedRuntimeHost,
104        services: PersistentRuntimeServices,
105        state: PersistedSessionState,
106    ) -> Result<Self, SessionError> {
107        Self::from_host_state(policy, host.into(), services.into_runtime_services(), state).await
108    }
109
110    /// Build a runtime for a background-capable host with persistent store support.
111    pub async fn from_persistent_background_state(
112        policy: SessionPolicy,
113        host: BackgroundRuntimeHost,
114        services: PersistentRuntimeServices,
115        state: PersistedSessionState,
116    ) -> Result<Self, SessionError> {
117        Self::from_host_state(policy, host.into(), services.into_runtime_services(), state).await
118    }
119
120    /// Embedder-preferred constructor: build a `LashRuntime` from a
121    /// shared `RuntimeEnvironment`.
122    ///
123    /// Everything expensive (plugin factories, HTTP client pool, prompt
124    /// template, path resolver) lives on the environment and is
125    /// reused across every runtime the embedder builds. This call is
126    /// O(plugin-session-registration + state-hydration), not
127    /// O(full-infrastructure-init).
128    ///
129    /// * `env` — the shared environment. `env.plugin_host` must be set.
130    /// * `policy` — per-session policy (model, provider, execution mode).
131    /// * `state` — persisted session state (empty for a fresh session).
132    /// * `store` — per-session store. `None` builds an embedded runtime
133    ///   with no persistence; `Some` builds a persistent
134    ///   background-capable runtime.
135    pub async fn from_environment(
136        env: &RuntimeEnvironment,
137        policy: SessionPolicy,
138        mut state: PersistedSessionState,
139        store: Option<Arc<dyn crate::store::RuntimePersistence>>,
140    ) -> Result<Self, SessionError> {
141        // ActivePathOnly without a store is a data-loss footgun: trim
142        // drops orphans from RAM with nowhere to reload them from.
143        if matches!(env.residency, Residency::ActivePathOnly) && store.is_none() {
144            return Err(SessionError::Protocol(
145                "Residency::ActivePathOnly requires a persistent store — \
146                 without one, trimmed orphans are irrecoverable"
147                    .to_string(),
148            ));
149        }
150        // Heal FIRST (against the full resident set), then trim.
151        // `heal_orphaned_leaf` is driven by `normalize_session_graph`
152        // which runs again inside `from_host_state`. Running it here
153        // too lets us trim safely before delegating.
154        normalize_session_graph(&mut state);
155        apply_residency_on_load(&mut state, env.residency);
156        let plugin_host = env.plugin_host.as_ref().ok_or_else(|| {
157            SessionError::Protocol(
158                "RuntimeEnvironment.plugin_host is required for from_environment".to_string(),
159            )
160        })?;
161        let plugin_session = plugin_host
162            .build_session(
163                state.session_id.as_str(),
164                policy.execution_mode.clone(),
165                policy.standard_context_approach.clone(),
166                state.plugin_snapshot.as_ref(),
167            )
168            .map_err(|err| SessionError::Protocol(err.to_string()))?;
169        let core = RuntimeCoreConfig {
170            attachment_store: Arc::clone(&env.attachment_store),
171            prompt: env.prompt.clone(),
172            trace_sink: env.trace_sink.clone(),
173            trace_level: env.trace_level,
174            trace_context: env.trace_context.clone(),
175            termination: env.termination.clone(),
176        };
177        let mut embedded = EmbeddedRuntimeHost::new(core);
178        if let Some(factory) = env.session_store_factory.as_ref() {
179            embedded = embedded.with_session_store_factory(Arc::clone(factory));
180        }
181        let runtime = if let Some(store) = store {
182            let services = PersistentRuntimeServices::new_with_bridges(
183                plugin_session,
184                crate::session::TurnInjectionBridge::new(),
185                crate::session::TurnInputInjectionBridge::new(),
186                store,
187            );
188            match env.background_task_host.as_ref() {
189                Some(executor) => {
190                    let host = BackgroundRuntimeHost::new(embedded, Arc::clone(executor));
191                    Self::from_persistent_background_state(policy, host, services, state).await?
192                }
193                None => {
194                    Self::from_persistent_embedded_state(policy, embedded, services, state).await?
195                }
196            }
197        } else {
198            let services = RuntimeServices::new(plugin_session);
199            match env.background_task_host.as_ref() {
200                Some(executor) => {
201                    let host = BackgroundRuntimeHost::new(embedded, Arc::clone(executor));
202                    Self::from_background_state(policy, host, services, state).await?
203                }
204                None => Self::from_embedded_state(policy, embedded, services, state).await?,
205            }
206        };
207        Ok(runtime)
208    }
209
210    /// Persist any dirty state and drop the runtime, returning a lightweight
211    /// handle the embedder can cache and resume later via
212    /// [`LashRuntime::resume`]. This is the webserver-embedder handoff
213    /// primitive: the handle holds only the session id, policy, and store
214    /// reference — no graph nodes, no plugin session, no HTTP client.
215    pub async fn park(mut self) -> Result<ParkedSession, SessionError> {
216        let store = self.services.store.clone().ok_or_else(|| {
217            SessionError::Protocol(
218                "park() requires a persistent runtime (store is not set)".to_string(),
219            )
220        })?;
221        let session_id = self.state.session_id.clone();
222        let policy = self.policy.clone();
223        // Flush any dirty resident state to the store before dropping.
224        let commit = crate::store::RuntimeCommit::persisted_state(&self.state, &[]);
225        let result = store.commit_runtime_state(commit).await.map_err(|err| {
226            SessionError::Protocol(format!("failed to persist runtime state: {err}"))
227        })?;
228        self.state.apply_persisted_commit_result(result);
229        // Drain pending tombstones if any. Under KeepHistory this is a
230        // no-op (tombstones never get added). Under DropOrphans,
231        // Phase-9's not-yet-wired rewrite path would have populated the
232        // set — wired fully in Phase 10's vacuum() design.
233        Ok(ParkedSession {
234            session_id,
235            store,
236            policy,
237        })
238    }
239
240    /// Resume a previously parked session against a shared environment.
241    /// Loads only the active-path graph when
242    /// `env.residency == ActivePathOnly`; under `KeepAll`
243    /// loads the full graph (current behavior).
244    pub async fn resume(
245        parked: ParkedSession,
246        env: &RuntimeEnvironment,
247    ) -> Result<Self, SessionError> {
248        // Under ActivePathOnly, skip the full-graph load: fetch head
249        // metadata + the active-path chain only. SQLite impls can
250        // ActivePathOnly is an exact store capability. Stores that do
251        // not support it must return UnsupportedReadScope; resume does
252        // not fall back to a full graph load.
253        let loaded = match env.residency {
254            Residency::KeepAll => {
255                crate::store::load_persisted_session_state(parked.store.as_ref()).await
256            }
257            Residency::ActivePathOnly => {
258                crate::store::load_persisted_session_state_active_path(parked.store.as_ref(), None)
259                    .await
260            }
261        }
262        .map_err(|err| SessionError::Protocol(format!("failed to load runtime state: {err}")))?;
263        let state = loaded.unwrap_or_else(|| PersistedSessionState {
264            session_id: parked.session_id.clone(),
265            policy: parked.policy.clone(),
266            ..PersistedSessionState::default()
267        });
268        Self::from_environment(env, parked.policy, state, Some(parked.store)).await
269    }
270
271    /// Opt-in async read for historic (non-active-path) nodes under
272    /// `Residency::ActivePathOnly`. Plugins that walk the full graph
273    /// call this instead of `session_graph().find_node()` so missing
274    /// nodes surface as `Ok(None)` rather than silently missing.
275    pub async fn get_historic_node(
276        &self,
277        node_id: &str,
278    ) -> Result<Option<crate::SessionNodeRecord>, SessionError> {
279        if let Some(node) = self.state.session_graph.find_node(node_id) {
280            return Ok(Some(node.clone()));
281        }
282        let store = self.services.store.clone().ok_or_else(|| {
283            SessionError::Protocol("get_historic_node() requires a persistent runtime".to_string())
284        })?;
285        store
286            .load_node(node_id)
287            .await
288            .map_err(|err| SessionError::Protocol(format!("failed to load historic node: {err}")))
289    }
290
291    /// Store-resident node IDs that are NOT reachable from the current
292    /// leaf — i.e. orphans eligible for tombstoning. lash owns RAM; the
293    /// host owns disk lifecycle, so this is a primitive the host calls
294    /// on its own schedule (e.g. every N turns, or off-peak).
295    ///
296    /// Typical autonomous-agent loop:
297    ///
298    /// ```ignore
299    /// let orphans = runtime.orphaned_node_ids().await?;
300    /// if !orphans.is_empty() {
301    ///     store.tombstone_nodes(&orphans).await;
302    /// }
303    /// // And less often:
304    /// store.vacuum().await;
305    /// ```
306    pub async fn orphaned_node_ids(&self) -> Result<Vec<String>, SessionError> {
307        let store = self.services.store.clone().ok_or_else(|| {
308            SessionError::Protocol("orphaned_node_ids() requires a persistent runtime".to_string())
309        })?;
310        let Some(read) = store
311            .load_session(crate::store::SessionReadScope::FullGraph)
312            .await
313            .map_err(|err| SessionError::Protocol(format!("failed to load full graph: {err}")))?
314        else {
315            return Ok(Vec::new());
316        };
317        let active: std::collections::HashSet<&str> = read
318            .graph
319            .active_path_nodes()
320            .iter()
321            .map(|node| node.node_id.as_str())
322            .collect();
323        Ok(read
324            .graph
325            .nodes
326            .iter()
327            .filter(|node| !active.contains(node.node_id.as_str()))
328            .map(|node| node.node_id.clone())
329            .collect())
330    }
331}