Skip to main content

lash_core/runtime/
session_ops.rs

1//! `LashRuntime` session-graph and execution-state operations.
2//!
3//! Extracted from `runtime/mod.rs`. This file re-opens `impl LashRuntime`;
4//! no types live here and no public API is changed.
5
6use std::sync::Arc;
7
8use crate::{PluginOperationInvokeError, SessionError};
9
10use super::LashRuntime;
11use super::state::{
12    RuntimeSessionState, append_session_nodes_to_state_with_clock, normalize_session_graph,
13};
14
15impl LashRuntime {
16    /// Replace the host-owned state envelope.
17    pub fn set_persisted_state(&mut self, state: RuntimeSessionState) -> Result<(), SessionError> {
18        let mut state = state;
19        normalize_session_graph(&mut state);
20        if let Some(session) = self.session.as_ref() {
21            session.invalidate_runtime_caches();
22            // Restore the persisted tool catalog so the live registry matches the
23            // state being installed (mirrors `from_host_state`). Without this the
24            // registry keeps its prior generation/tools and silently diverges from
25            // `state`. `restore_state` adopts the snapshot's generation, so a
26            // surface that reached generation >= 2 restores cleanly.
27            if let Some(tool_state) = state.tool_state_snapshot.clone() {
28                let report = session
29                    .plugins()
30                    .tool_registry()
31                    .restore_state(tool_state)
32                    .map_err(|err| SessionError::Protocol(err.to_string()))?;
33                if !report.orphaned.is_empty() {
34                    tracing::warn!(
35                        session_id = %state.session_id,
36                        orphaned = ?report.orphaned,
37                        "persisted state installed with orphaned tools: no registered \
38                         source resolves them; they remain non-members until their source returns"
39                    );
40                }
41            }
42            let snapshot = state.plugin_snapshot.clone().unwrap_or_default();
43            session
44                .plugins()
45                .restore(&snapshot)
46                .map_err(|err| SessionError::Protocol(err.to_string()))?;
47            state.plugin_snapshot_revision =
48                Some(session.plugins().snapshot_revision_fingerprint());
49        }
50        self.policy = state.policy.clone();
51        self.protocol_turn_options = state.protocol_turn_options.clone();
52        self.state = state;
53        Ok(())
54    }
55
56    pub async fn append_session_nodes(
57        &mut self,
58        request: crate::AppendSessionNodesRequest,
59    ) -> Result<crate::AppendSessionNodesResult, SessionError> {
60        self.refresh_session_graph_from_store().await?;
61        if let Some(required) = request.requires_ancestor_node_id.as_deref()
62            && !self.state.session_graph.active_path_contains(required)
63        {
64            return Ok(crate::AppendSessionNodesResult::StaleBranch {
65                current_leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
66            });
67        }
68        let node_ids = append_session_nodes_to_state_with_clock(
69            &mut self.state,
70            &request.nodes,
71            self.host.core.clock.as_ref(),
72        );
73        if let Some(session) = self.session.as_mut() {
74            let protocol_session = Arc::clone(session.plugins().protocol_session());
75            let session_id = self.state.session_id.clone();
76            protocol_session
77                .append_session_nodes(
78                    crate::plugin::ProtocolSessionContext::new(session, &session_id),
79                    &request.nodes,
80                )
81                .await?;
82        }
83        self.stamp_live_plugin_state();
84        if let Some(store) = self
85            .session
86            .as_ref()
87            .and_then(|session| session.history_store())
88        {
89            let graph = crate::store::GraphCommitDelta::Append {
90                nodes: node_ids
91                    .iter()
92                    .filter_map(|id| self.state.session_graph.find_node(id).cloned())
93                    .collect(),
94                leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
95            };
96            let commit = crate::store::RuntimeCommit::persisted_state_with_graph_commit(
97                &self.state,
98                graph,
99                &[],
100            );
101            match super::commit_runtime_state_with_fresh_session_execution_lease(
102                store,
103                commit,
104                &self.runtime_lease_owner,
105                self.host.core.control.lease_timings,
106                Arc::clone(&self.host.core.clock),
107            )
108            .await
109            {
110                Ok(result) => self.state.apply_persisted_commit_result(result),
111                Err(err) => tracing::warn!("failed to persist runtime state: {err}"),
112            }
113        }
114        Ok(crate::AppendSessionNodesResult::Appended {
115            node_ids,
116            leaf_node_id: self
117                .state
118                .session_graph
119                .leaf_node_id
120                .clone()
121                .unwrap_or_default(),
122        })
123    }
124
125    pub async fn apply_protocol_session_extension(
126        &mut self,
127        extension: crate::ProtocolSessionExtensionHandle,
128    ) -> Result<(), SessionError> {
129        let Some(session) = self.session.as_ref() else {
130            return Err(SessionError::Protocol(
131                "runtime session is not available".to_string(),
132            ));
133        };
134        let protocol_session = Arc::clone(session.plugins().protocol_session());
135        protocol_session.apply_session_extension(extension).await
136    }
137
138    pub async fn validate_protocol_turn_extension(
139        &mut self,
140        extension: &crate::ProtocolTurnExtensionHandle,
141    ) -> Result<(), SessionError> {
142        let Some(session) = self.session.as_ref() else {
143            return Err(SessionError::Protocol(
144                "runtime session is not available".to_string(),
145            ));
146        };
147        let protocol_session = Arc::clone(session.plugins().protocol_session());
148        protocol_session.validate_turn_extension(extension).await
149    }
150
151    pub async fn branch_to_node(
152        &mut self,
153        node_id: Option<String>,
154    ) -> Result<crate::SessionSnapshot, SessionError> {
155        let mut state = self.export_state();
156        state.session_graph.branch_to(node_id);
157        let mut persisted_state = RuntimeSessionState::from_snapshot(state);
158        normalize_session_graph(&mut persisted_state);
159
160        let policy = persisted_state.policy.clone();
161        let host = self.host.clone();
162        let services = self.services.clone();
163        let managed_sessions = Arc::clone(&self.managed_sessions);
164        let managed_turns = Arc::clone(&self.managed_turns);
165        let process_sync_needed = Arc::clone(&self.process_sync_needed);
166        let runtime_scope_id = Arc::clone(&self.runtime_scope_id);
167        let runtime_lease_owner = self.runtime_lease_owner.clone();
168        let turn_phase_probe = self.turn_phase_probe.clone();
169
170        let mut rebuilt = Self::from_host_state(policy, host, services, persisted_state).await?;
171        rebuilt.managed_sessions = managed_sessions;
172        rebuilt.managed_turns = managed_turns;
173        rebuilt.process_sync_needed = process_sync_needed;
174        rebuilt.runtime_scope_id = runtime_scope_id;
175        rebuilt.runtime_lease_owner = runtime_lease_owner;
176        rebuilt.turn_phase_probe = turn_phase_probe;
177
178        let exported = rebuilt.export_state();
179        *self = rebuilt;
180        Ok(exported)
181    }
182
183    /// Promote a managed child session into the foreground runtime.
184    ///
185    /// Child sessions created through `SessionLifecycleService::create_session` are real
186    /// runtimes, not serialized placeholders. Foreground activation must therefore
187    /// claim that runtime instead of reconstructing a new empty state in the UI.
188    pub async fn activate_managed_session(&mut self, session_id: &str) -> Result<(), SessionError> {
189        let child = {
190            let mut registry = self.managed_sessions.lock().await;
191            registry.remove(session_id).ok_or_else(|| {
192                SessionError::Protocol(format!("unknown managed session `{session_id}`"))
193            })?
194        };
195        let child = child.try_into_runtime().map_err(|_| {
196            SessionError::Protocol(format!("managed session `{session_id}` is still in use"))
197        })?;
198        *self = child;
199        Ok(())
200    }
201
202    /// Explicitly snapshot protocol-local execution state, if any.
203    pub async fn snapshot_execution_state(&mut self) -> Result<Option<Vec<u8>>, SessionError> {
204        let Some(session) = self.session.as_mut() else {
205            return Err(SessionError::Protocol(
206                "runtime session not available".to_string(),
207            ));
208        };
209        let code_executor = session
210            .plugins()
211            .code_executor()
212            .ok_or(SessionError::CodeExecutionUnavailable)?;
213        let session_id = self.state.session_id.clone();
214        let blob = code_executor
215            .snapshot_execution_state(crate::plugin::ProtocolSessionContext::new(
216                session,
217                &session_id,
218            ))
219            .await?;
220        self.state.execution_state_snapshot = blob.clone();
221        Ok(blob)
222    }
223
224    /// Explicitly restore protocol-local execution state from an opaque snapshot blob.
225    pub async fn restore_execution_state(&mut self, snapshot: &[u8]) -> Result<(), SessionError> {
226        let Some(session) = self.session.as_mut() else {
227            return Err(SessionError::Protocol(
228                "runtime session not available".to_string(),
229            ));
230        };
231        let code_executor = session
232            .plugins()
233            .code_executor()
234            .ok_or(SessionError::CodeExecutionUnavailable)?;
235        let session_id = self.state.session_id.clone();
236        code_executor
237            .restore_execution_state(
238                crate::plugin::ProtocolSessionContext::new(session, &session_id),
239                snapshot,
240            )
241            .await?;
242        self.state.execution_state_snapshot = Some(snapshot.to_vec());
243        Ok(())
244    }
245
246    pub async fn list_trigger_registrations(
247        &self,
248    ) -> Result<Vec<crate::TriggerRegistration>, SessionError> {
249        let store = self.host.trigger_store.as_ref().ok_or_else(|| {
250            SessionError::Protocol("trigger store is unavailable in this runtime".to_string())
251        })?;
252        let records = store
253            .list_subscriptions(crate::TriggerSubscriptionFilter::for_session(
254                self.state.session_id.clone(),
255            ))
256            .await
257            .map_err(|err| SessionError::Protocol(err.to_string()))?;
258        Ok(records
259            .iter()
260            .map(crate::TriggerRegistration::from)
261            .collect())
262    }
263
264    pub async fn trigger_registrations_by_source_type(
265        &self,
266        source_type: impl Into<crate::TriggerEventType>,
267    ) -> Result<Vec<crate::TriggerRegistration>, SessionError> {
268        let store = self.host.trigger_store.as_ref().ok_or_else(|| {
269            SessionError::Protocol("trigger store is unavailable in this runtime".to_string())
270        })?;
271        let mut filter =
272            crate::TriggerSubscriptionFilter::for_session(self.state.session_id.clone());
273        filter.source_type = Some(source_type.into().to_string());
274        let records = store
275            .list_subscriptions(filter)
276            .await
277            .map_err(|err| SessionError::Protocol(err.to_string()))?;
278        Ok(records
279            .iter()
280            .map(crate::TriggerRegistration::from)
281            .collect())
282    }
283
284    pub async fn query_plugin(
285        &self,
286        name: &str,
287        args: serde_json::Value,
288        session_id: Option<String>,
289    ) -> Result<(String, serde_json::Value), PluginOperationInvokeError> {
290        let manager = self.runtime_session_services()?;
291        let Some(session) = self.session.as_ref() else {
292            return Err(PluginOperationInvokeError::Unknown(
293                "runtime session not available".to_string(),
294            ));
295        };
296        session
297            .plugins()
298            .query_plugin(
299                name,
300                args,
301                session_id,
302                true,
303                manager.read_service(),
304                manager.process_read_service(),
305            )
306            .await
307    }
308
309    pub async fn run_plugin_command(
310        &mut self,
311        name: &str,
312        args: serde_json::Value,
313        session_id: Option<String>,
314    ) -> Result<crate::PluginCommandReceipt<serde_json::Value>, PluginOperationInvokeError> {
315        let manager = self.runtime_session_services()?;
316        let Some(session) = self.session.as_ref() else {
317            return Err(PluginOperationInvokeError::Unknown(
318                "runtime session not available".to_string(),
319            ));
320        };
321        let (plugin_id, outcome) = session
322            .plugins()
323            .run_plugin_command(
324                name,
325                args,
326                session_id,
327                true,
328                manager.state_service(),
329                manager.lifecycle_service(),
330                manager.graph_service(),
331                manager.process_service(),
332            )
333            .await?;
334        let (events, pending_turn_inputs) = self
335            .apply_plugin_operation_effects(&plugin_id, outcome.events, outcome.directives)
336            .await?;
337        Ok(crate::PluginCommandReceipt {
338            output: outcome.output,
339            events,
340            pending_turn_inputs,
341        })
342    }
343
344    pub async fn run_plugin_task(
345        &mut self,
346        name: &str,
347        args: serde_json::Value,
348        session_id: Option<String>,
349        scoped_effect_controller: crate::ScopedEffectController<'static>,
350        cancellation_token: tokio_util::sync::CancellationToken,
351    ) -> Result<crate::PluginTaskReceipt<serde_json::Value>, PluginOperationInvokeError> {
352        let manager = self.runtime_session_services()?;
353        let Some(session) = self.session.as_ref() else {
354            return Err(PluginOperationInvokeError::Unknown(
355                "runtime session not available".to_string(),
356            ));
357        };
358        let (plugin_id, outcome) = session
359            .plugins()
360            .run_plugin_task(
361                name,
362                args,
363                session_id,
364                true,
365                manager.state_service(),
366                manager.lifecycle_service(),
367                manager.graph_service(),
368                manager.process_service(),
369                scoped_effect_controller,
370                cancellation_token,
371            )
372            .await?;
373        let (events, pending_turn_inputs) = self
374            .apply_plugin_operation_effects(&plugin_id, outcome.events, outcome.directives)
375            .await?;
376        Ok(crate::PluginTaskReceipt {
377            output: outcome.output,
378            events,
379            pending_turn_inputs,
380        })
381    }
382
383    async fn apply_plugin_operation_effects(
384        &mut self,
385        plugin_id: &str,
386        events: Vec<crate::PluginRuntimeEvent>,
387        directives: Vec<crate::PluginRuntimeDirective>,
388    ) -> Result<
389        (
390            Vec<crate::PluginOwned<crate::PluginRuntimeEvent>>,
391            Vec<crate::PendingTurnInput>,
392        ),
393        PluginOperationInvokeError,
394    > {
395        let owned_events = events
396            .into_iter()
397            .map(|event| crate::PluginOwned {
398                plugin_id: plugin_id.to_string(),
399                value: event,
400            })
401            .collect::<Vec<_>>();
402        if !owned_events.is_empty() {
403            let nodes = owned_events
404                .iter()
405                .map(|owned| {
406                    crate::plugin_runtime_protocol_event(&owned.plugin_id, owned.value.clone())
407                        .map(crate::SessionAppendNode::protocol_event)
408                        .map_err(|err| {
409                            PluginOperationInvokeError::Failed(format!(
410                                "failed to encode plugin runtime event: {err}"
411                            ))
412                        })
413                })
414                .collect::<Result<Vec<_>, _>>()?;
415            self.append_plugin_runtime_event_nodes(&nodes).await?;
416        }
417        self.stamp_live_plugin_state();
418        self.persist_plugin_operation_state_if_needed().await?;
419
420        let mut pending_turn_inputs = Vec::new();
421        for directive in directives {
422            match directive {
423                crate::PluginRuntimeDirective::QueueTurn { input, source_key } => {
424                    let pending = self
425                        .enqueue_turn_input(input, crate::TurnInputIngress::NextTurn, source_key)
426                        .await
427                        .map_err(|err| {
428                            PluginOperationInvokeError::Failed(format!(
429                                "failed to queue plugin turn request: {err}"
430                            ))
431                        })?;
432                    pending_turn_inputs.push(pending);
433                }
434            }
435        }
436
437        Ok((owned_events, pending_turn_inputs))
438    }
439
440    async fn append_plugin_runtime_event_nodes(
441        &mut self,
442        nodes: &[crate::SessionAppendNode],
443    ) -> Result<(), PluginOperationInvokeError> {
444        let node_ids = append_session_nodes_to_state_with_clock(
445            &mut self.state,
446            nodes,
447            self.host.core.clock.as_ref(),
448        );
449        if let Some(store) = self
450            .session
451            .as_ref()
452            .and_then(|session| session.history_store())
453        {
454            let graph = crate::store::GraphCommitDelta::Append {
455                nodes: node_ids
456                    .iter()
457                    .filter_map(|id| self.state.session_graph.find_node(id).cloned())
458                    .collect(),
459                leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
460            };
461            let commit = crate::store::RuntimeCommit::persisted_state_with_graph_commit(
462                &self.state,
463                graph,
464                &[],
465            );
466            let result = super::commit_runtime_state_with_fresh_session_execution_lease(
467                store,
468                commit,
469                &self.runtime_lease_owner,
470                self.host.core.control.lease_timings,
471                Arc::clone(&self.host.core.clock),
472            )
473            .await
474            .map_err(|err| {
475                PluginOperationInvokeError::Failed(format!(
476                    "failed to persist plugin runtime events: {err}"
477                ))
478            })?;
479            self.state.apply_persisted_commit_result(result);
480        }
481        Ok(())
482    }
483
484    async fn persist_plugin_operation_state_if_needed(
485        &mut self,
486    ) -> Result<(), PluginOperationInvokeError> {
487        let Some(store) = self
488            .session
489            .as_ref()
490            .and_then(|session| session.history_store())
491        else {
492            return Ok(());
493        };
494        let commit = crate::store::RuntimeCommit::persisted_state(&self.state, &[]);
495        let result = super::commit_runtime_state_with_fresh_session_execution_lease(
496            store,
497            commit,
498            &self.runtime_lease_owner,
499            self.host.core.control.lease_timings,
500            Arc::clone(&self.host.core.clock),
501        )
502        .await
503        .map_err(|err| {
504            PluginOperationInvokeError::Failed(format!(
505                "failed to persist plugin operation state: {err}"
506            ))
507        })?;
508        self.state.apply_persisted_commit_result(result);
509        Ok(())
510    }
511}