lash_core/runtime/
session_ops.rs1use std::sync::Arc;
7
8use crate::{PluginActionInvokeError, SessionError};
9
10use super::LashRuntime;
11use super::state::{
12 PersistedSessionState, SessionStateEnvelope, append_session_nodes_to_state,
13 normalize_session_graph,
14};
15
16impl LashRuntime {
17 pub fn set_persisted_state(&mut self, state: PersistedSessionState) {
19 let mut state = state;
20 normalize_session_graph(&mut state);
21 if let Some(session) = self.session.as_ref() {
22 let snapshot = state.plugin_snapshot.clone().unwrap_or_default();
23 if let Err(err) = session.plugins().restore(&snapshot) {
24 tracing::warn!("failed to restore plugin snapshot in set_state: {err}");
25 }
26 state.plugin_snapshot_revision =
27 Some(session.plugins().snapshot_revision_fingerprint());
28 }
29 self.policy = state.policy.clone();
30 self.mode_turn_options = state.mode_turn_options.clone();
31 self.state = state;
32 }
33
34 pub async fn append_session_nodes(
35 &mut self,
36 request: crate::AppendSessionNodesRequest,
37 ) -> Result<crate::AppendSessionNodesResult, SessionError> {
38 self.refresh_session_graph_from_store().await;
39 if let Some(required) = request.requires_ancestor_node_id.as_deref()
40 && !self.state.session_graph.active_path_contains(required)
41 {
42 return Ok(crate::AppendSessionNodesResult::StaleBranch {
43 current_leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
44 });
45 }
46 let node_ids = append_session_nodes_to_state(&mut self.state, &request.nodes);
47 if let Some(session) = self.session.as_mut() {
48 let mode_session = Arc::clone(session.plugins().mode_session());
49 let session_id = self.state.session_id.clone();
50 mode_session
51 .append_session_nodes(
52 crate::plugin::ModeSessionContext::new(session, &session_id),
53 &request.nodes,
54 )
55 .await?;
56 }
57 if let Some(store) = self
58 .session
59 .as_ref()
60 .and_then(|session| session.history_store())
61 {
62 let graph = crate::store::GraphCommitDelta::Append {
63 nodes: node_ids
64 .iter()
65 .filter_map(|id| self.state.session_graph.find_node(id).cloned())
66 .collect(),
67 leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
68 };
69 let commit = crate::store::RuntimeCommit::persisted_state_with_graph_commit(
70 &self.state,
71 graph,
72 &[],
73 );
74 match store.commit_runtime_state(commit).await {
75 Ok(result) => self.state.apply_persisted_commit_result(result),
76 Err(err) => tracing::warn!("failed to persist runtime state: {err}"),
77 }
78 }
79 Ok(crate::AppendSessionNodesResult::Appended {
80 node_ids,
81 leaf_node_id: self
82 .state
83 .session_graph
84 .leaf_node_id
85 .clone()
86 .unwrap_or_default(),
87 })
88 }
89
90 pub async fn apply_mode_session_extension(
91 &mut self,
92 extension: crate::ModeSessionExtensionHandle,
93 ) -> Result<(), SessionError> {
94 let Some(session) = self.session.as_ref() else {
95 return Err(SessionError::Protocol(
96 "runtime session is not available".to_string(),
97 ));
98 };
99 let mode_session = Arc::clone(session.plugins().mode_session());
100 mode_session.apply_session_extension(extension).await
101 }
102
103 pub async fn validate_mode_turn_extension(
104 &mut self,
105 extension: &crate::ModeTurnExtensionHandle,
106 ) -> Result<(), SessionError> {
107 let Some(session) = self.session.as_ref() else {
108 return Err(SessionError::Protocol(
109 "runtime session is not available".to_string(),
110 ));
111 };
112 let mode_session = Arc::clone(session.plugins().mode_session());
113 mode_session.validate_turn_extension(extension).await
114 }
115
116 pub async fn branch_to_node(
117 &mut self,
118 node_id: Option<String>,
119 ) -> Result<SessionStateEnvelope, SessionError> {
120 let mut state = self.export_state();
121 state.session_graph.branch_to(node_id);
122 let mut persisted_state = PersistedSessionState::from_state(state);
123 normalize_session_graph(&mut persisted_state);
124
125 let policy = persisted_state.policy.clone();
126 let host = self.host.clone();
127 let services = self.services.clone();
128 let managed_sessions = Arc::clone(&self.managed_sessions);
129 let active_handoff_continuations = Arc::clone(&self.active_handoff_continuations);
130 let managed_turns = Arc::clone(&self.managed_turns);
131 let background_sync_needed = Arc::clone(&self.background_sync_needed);
132 let runtime_scope_id = Arc::clone(&self.runtime_scope_id);
133 let turn_phase_probe = self.turn_phase_probe.clone();
134
135 let mut rebuilt = Self::from_host_state(policy, host, services, persisted_state).await?;
136 rebuilt.managed_sessions = managed_sessions;
137 rebuilt.active_handoff_continuations = active_handoff_continuations;
138 rebuilt.managed_turns = managed_turns;
139 rebuilt.background_sync_needed = background_sync_needed;
140 rebuilt.runtime_scope_id = runtime_scope_id;
141 rebuilt.turn_phase_probe = turn_phase_probe;
142
143 let exported = rebuilt.export_state();
144 *self = rebuilt;
145 Ok(exported)
146 }
147
148 pub async fn activate_managed_session(&mut self, session_id: &str) -> Result<(), SessionError> {
154 let child = {
155 let mut registry = self.managed_sessions.lock().await;
156 registry.remove(session_id).ok_or_else(|| {
157 SessionError::Protocol(format!("unknown managed session `{session_id}`"))
158 })?
159 };
160 let child = child.try_into_runtime().map_err(|_| {
161 SessionError::Protocol(format!("managed session `{session_id}` is still in use"))
162 })?;
163 *self = child;
164 Ok(())
165 }
166
167 pub async fn reset_session(&mut self) -> Result<(), SessionError> {
169 let Some(session) = self.session.as_mut() else {
170 return Err(SessionError::Protocol(
171 "runtime session not available".to_string(),
172 ));
173 };
174 session.reset().await
175 }
176
177 pub async fn snapshot_execution_state(&mut self) -> Result<Option<Vec<u8>>, SessionError> {
179 let Some(session) = self.session.as_mut() else {
180 return Err(SessionError::Protocol(
181 "runtime session not available".to_string(),
182 ));
183 };
184 let mode_session = Arc::clone(session.plugins().mode_session());
185 let session_id = self.state.session_id.clone();
186 let blob = mode_session
187 .snapshot_execution_state(crate::plugin::ModeSessionContext::new(session, &session_id))
188 .await?;
189 self.state.execution_state_snapshot = blob.clone();
190 Ok(blob)
191 }
192
193 pub async fn restore_execution_state(&mut self, snapshot: &[u8]) -> Result<(), SessionError> {
195 let Some(session) = self.session.as_mut() else {
196 return Err(SessionError::Protocol(
197 "runtime session not available".to_string(),
198 ));
199 };
200 let mode_session = Arc::clone(session.plugins().mode_session());
201 let session_id = self.state.session_id.clone();
202 mode_session
203 .restore_execution_state(
204 crate::plugin::ModeSessionContext::new(session, &session_id),
205 snapshot,
206 )
207 .await?;
208 self.state.execution_state_snapshot = Some(snapshot.to_vec());
209 Ok(())
210 }
211
212 pub async fn invoke_plugin_action(
213 &self,
214 name: &str,
215 args: serde_json::Value,
216 session_id: Option<String>,
217 ) -> Result<crate::ToolResult, PluginActionInvokeError> {
218 let manager = self.runtime_session_manager()?;
219 let Some(session) = self.session.as_ref() else {
220 return Err(PluginActionInvokeError::Unknown(name.to_string()));
221 };
222 session
223 .plugins()
224 .invoke_plugin_action(name, args, session_id, true, manager)
225 .await
226 }
227}