Skip to main content

lash_core/runtime/
session_ops.rs

1//! `LashRuntime` session-graph and execution-state operations.
2//!
3//! Extracted from `runtime/mod.rs`. This file re-opens `impl LashRuntime`;
4//! no types live here and no public API is changed.
5
6use std::sync::Arc;
7
8use crate::{PluginActionInvokeError, SessionError};
9
10use super::LashRuntime;
11use super::state::{RuntimeSessionState, append_session_nodes_to_state, normalize_session_graph};
12
13impl LashRuntime {
14    /// Replace the host-owned state envelope.
15    pub fn set_persisted_state(&mut self, state: RuntimeSessionState) -> Result<(), SessionError> {
16        let mut state = state;
17        normalize_session_graph(&mut state);
18        if let Some(session) = self.session.as_ref() {
19            session.invalidate_runtime_caches();
20            // Restore the persisted tool surface so the live registry matches the
21            // state being installed (mirrors `from_host_state`). Without this the
22            // registry keeps its prior generation/tools and silently diverges from
23            // `state`. `restore_state` adopts the snapshot's generation, so a
24            // surface that reached generation >= 2 restores cleanly.
25            if let Some(tool_state) = state.tool_state_snapshot.clone() {
26                let report = session
27                    .plugins()
28                    .tool_registry()
29                    .restore_state(tool_state)
30                    .map_err(|err| SessionError::Protocol(err.to_string()))?;
31                if !report.orphaned.is_empty() {
32                    tracing::warn!(
33                        session_id = %state.session_id,
34                        orphaned = ?report.orphaned,
35                        "persisted state installed with orphaned tools: no registered \
36                         source resolves them; they are Off until their source returns"
37                    );
38                }
39            }
40            let snapshot = state.plugin_snapshot.clone().unwrap_or_default();
41            session
42                .plugins()
43                .restore(&snapshot)
44                .map_err(|err| SessionError::Protocol(err.to_string()))?;
45            state.plugin_snapshot_revision =
46                Some(session.plugins().snapshot_revision_fingerprint());
47        }
48        self.policy = state.policy.clone();
49        self.protocol_turn_options = state.protocol_turn_options.clone();
50        self.state = state;
51        Ok(())
52    }
53
54    pub async fn append_session_nodes(
55        &mut self,
56        request: crate::AppendSessionNodesRequest,
57    ) -> Result<crate::AppendSessionNodesResult, SessionError> {
58        self.refresh_session_graph_from_store().await?;
59        if let Some(required) = request.requires_ancestor_node_id.as_deref()
60            && !self.state.session_graph.active_path_contains(required)
61        {
62            return Ok(crate::AppendSessionNodesResult::StaleBranch {
63                current_leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
64            });
65        }
66        let node_ids = append_session_nodes_to_state(&mut self.state, &request.nodes);
67        if let Some(session) = self.session.as_mut() {
68            let protocol_session = Arc::clone(session.plugins().protocol_session());
69            let session_id = self.state.session_id.clone();
70            protocol_session
71                .append_session_nodes(
72                    crate::plugin::ProtocolSessionContext::new(session, &session_id),
73                    &request.nodes,
74                )
75                .await?;
76        }
77        self.stamp_live_plugin_state();
78        if let Some(store) = self
79            .session
80            .as_ref()
81            .and_then(|session| session.history_store())
82        {
83            let graph = crate::store::GraphCommitDelta::Append {
84                nodes: node_ids
85                    .iter()
86                    .filter_map(|id| self.state.session_graph.find_node(id).cloned())
87                    .collect(),
88                leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
89            };
90            let commit = crate::store::RuntimeCommit::persisted_state_with_graph_commit(
91                &self.state,
92                graph,
93                &[],
94            );
95            match store.commit_runtime_state(commit).await {
96                Ok(result) => self.state.apply_persisted_commit_result(result),
97                Err(err) => tracing::warn!("failed to persist runtime state: {err}"),
98            }
99        }
100        Ok(crate::AppendSessionNodesResult::Appended {
101            node_ids,
102            leaf_node_id: self
103                .state
104                .session_graph
105                .leaf_node_id
106                .clone()
107                .unwrap_or_default(),
108        })
109    }
110
111    pub async fn apply_protocol_session_extension(
112        &mut self,
113        extension: crate::ProtocolSessionExtensionHandle,
114    ) -> Result<(), SessionError> {
115        let Some(session) = self.session.as_ref() else {
116            return Err(SessionError::Protocol(
117                "runtime session is not available".to_string(),
118            ));
119        };
120        let protocol_session = Arc::clone(session.plugins().protocol_session());
121        protocol_session.apply_session_extension(extension).await
122    }
123
124    pub async fn validate_protocol_turn_extension(
125        &mut self,
126        extension: &crate::ProtocolTurnExtensionHandle,
127    ) -> Result<(), SessionError> {
128        let Some(session) = self.session.as_ref() else {
129            return Err(SessionError::Protocol(
130                "runtime session is not available".to_string(),
131            ));
132        };
133        let protocol_session = Arc::clone(session.plugins().protocol_session());
134        protocol_session.validate_turn_extension(extension).await
135    }
136
137    pub async fn branch_to_node(
138        &mut self,
139        node_id: Option<String>,
140    ) -> Result<crate::SessionSnapshot, SessionError> {
141        let mut state = self.export_state();
142        state.session_graph.branch_to(node_id);
143        let mut persisted_state = RuntimeSessionState::from_snapshot(state);
144        normalize_session_graph(&mut persisted_state);
145
146        let policy = persisted_state.policy.clone();
147        let host = self.host.clone();
148        let services = self.services.clone();
149        let managed_sessions = Arc::clone(&self.managed_sessions);
150        let managed_turns = Arc::clone(&self.managed_turns);
151        let process_sync_needed = Arc::clone(&self.process_sync_needed);
152        let runtime_scope_id = Arc::clone(&self.runtime_scope_id);
153        let turn_phase_probe = self.turn_phase_probe.clone();
154
155        let mut rebuilt = Self::from_host_state(policy, host, services, persisted_state).await?;
156        rebuilt.managed_sessions = managed_sessions;
157        rebuilt.managed_turns = managed_turns;
158        rebuilt.process_sync_needed = process_sync_needed;
159        rebuilt.runtime_scope_id = runtime_scope_id;
160        rebuilt.turn_phase_probe = turn_phase_probe;
161
162        let exported = rebuilt.export_state();
163        *self = rebuilt;
164        Ok(exported)
165    }
166
167    /// Promote a managed child session into the foreground runtime.
168    ///
169    /// Child sessions created through `SessionLifecycleService::create_session` are real
170    /// runtimes, not serialized placeholders. Foreground activation must therefore
171    /// claim that runtime instead of reconstructing a new empty state in the UI.
172    pub async fn activate_managed_session(&mut self, session_id: &str) -> Result<(), SessionError> {
173        let child = {
174            let mut registry = self.managed_sessions.lock().await;
175            registry.remove(session_id).ok_or_else(|| {
176                SessionError::Protocol(format!("unknown managed session `{session_id}`"))
177            })?
178        };
179        let child = child.try_into_runtime().map_err(|_| {
180            SessionError::Protocol(format!("managed session `{session_id}` is still in use"))
181        })?;
182        *self = child;
183        Ok(())
184    }
185
186    /// Explicitly snapshot protocol-local execution state, if any.
187    pub async fn snapshot_execution_state(&mut self) -> Result<Option<Vec<u8>>, SessionError> {
188        let Some(session) = self.session.as_mut() else {
189            return Err(SessionError::Protocol(
190                "runtime session not available".to_string(),
191            ));
192        };
193        let code_executor = session
194            .plugins()
195            .code_executor()
196            .ok_or(SessionError::CodeExecutionUnavailable)?;
197        let session_id = self.state.session_id.clone();
198        let blob = code_executor
199            .snapshot_execution_state(crate::plugin::ProtocolSessionContext::new(
200                session,
201                &session_id,
202            ))
203            .await?;
204        self.state.execution_state_snapshot = blob.clone();
205        Ok(blob)
206    }
207
208    /// Explicitly restore protocol-local execution state from an opaque snapshot blob.
209    pub async fn restore_execution_state(&mut self, snapshot: &[u8]) -> Result<(), SessionError> {
210        let Some(session) = self.session.as_mut() else {
211            return Err(SessionError::Protocol(
212                "runtime session not available".to_string(),
213            ));
214        };
215        let code_executor = session
216            .plugins()
217            .code_executor()
218            .ok_or(SessionError::CodeExecutionUnavailable)?;
219        let session_id = self.state.session_id.clone();
220        code_executor
221            .restore_execution_state(
222                crate::plugin::ProtocolSessionContext::new(session, &session_id),
223                snapshot,
224            )
225            .await?;
226        self.state.execution_state_snapshot = Some(snapshot.to_vec());
227        Ok(())
228    }
229
230    pub async fn list_lashlang_trigger_registrations(
231        &self,
232    ) -> Result<Vec<crate::TriggerRegistration>, SessionError> {
233        let store = self.host.host_event_store.as_ref().ok_or_else(|| {
234            SessionError::Protocol("host event store is unavailable in this runtime".to_string())
235        })?;
236        let records = store
237            .list_subscriptions(crate::TriggerSubscriptionFilter::for_session(
238                self.state.session_id.clone(),
239            ))
240            .await
241            .map_err(|err| SessionError::Protocol(err.to_string()))?;
242        Ok(records
243            .iter()
244            .map(crate::TriggerRegistration::from)
245            .collect())
246    }
247
248    pub async fn lashlang_trigger_registrations_by_source_type(
249        &self,
250        source_type: impl Into<crate::TriggerSourceType>,
251    ) -> Result<Vec<crate::TriggerRegistration>, SessionError> {
252        let store = self.host.host_event_store.as_ref().ok_or_else(|| {
253            SessionError::Protocol("host event store is unavailable in this runtime".to_string())
254        })?;
255        let mut filter =
256            crate::TriggerSubscriptionFilter::for_session(self.state.session_id.clone());
257        filter.source_type = Some(source_type.into().to_string());
258        let records = store
259            .list_subscriptions(filter)
260            .await
261            .map_err(|err| SessionError::Protocol(err.to_string()))?;
262        Ok(records
263            .iter()
264            .map(crate::TriggerRegistration::from)
265            .collect())
266    }
267
268    pub async fn invoke_plugin_action(
269        &self,
270        name: &str,
271        args: serde_json::Value,
272        session_id: Option<String>,
273    ) -> Result<crate::ToolResult, PluginActionInvokeError> {
274        let manager = self.runtime_session_services()?;
275        let Some(session) = self.session.as_ref() else {
276            return Err(PluginActionInvokeError::Unknown(name.to_string()));
277        };
278        session
279            .plugins()
280            .invoke_plugin_action(
281                name,
282                args,
283                session_id,
284                true,
285                manager.state_service(),
286                manager.lifecycle_service(),
287                manager.graph_service(),
288                manager.process_service(),
289            )
290            .await
291    }
292}