lash_core/runtime/
session_ops.rs1use std::sync::Arc;
7
8use crate::{PluginActionInvokeError, SessionError};
9
10use super::LashRuntime;
11use super::state::{RuntimeSessionState, append_session_nodes_to_state, normalize_session_graph};
12
13impl LashRuntime {
14 pub fn set_persisted_state(&mut self, state: RuntimeSessionState) -> Result<(), SessionError> {
16 let mut state = state;
17 normalize_session_graph(&mut state);
18 if let Some(session) = self.session.as_ref() {
19 session.invalidate_runtime_caches();
20 if let Some(tool_state) = state.tool_state_snapshot.clone() {
26 let report = session
27 .plugins()
28 .tool_registry()
29 .restore_state(tool_state)
30 .map_err(|err| SessionError::Protocol(err.to_string()))?;
31 if !report.orphaned.is_empty() {
32 tracing::warn!(
33 session_id = %state.session_id,
34 orphaned = ?report.orphaned,
35 "persisted state installed with orphaned tools: no registered \
36 source resolves them; they are Off until their source returns"
37 );
38 }
39 }
40 let snapshot = state.plugin_snapshot.clone().unwrap_or_default();
41 session
42 .plugins()
43 .restore(&snapshot)
44 .map_err(|err| SessionError::Protocol(err.to_string()))?;
45 state.plugin_snapshot_revision =
46 Some(session.plugins().snapshot_revision_fingerprint());
47 }
48 self.policy = state.policy.clone();
49 self.protocol_turn_options = state.protocol_turn_options.clone();
50 self.state = state;
51 Ok(())
52 }
53
54 pub async fn append_session_nodes(
55 &mut self,
56 request: crate::AppendSessionNodesRequest,
57 ) -> Result<crate::AppendSessionNodesResult, SessionError> {
58 self.refresh_session_graph_from_store().await?;
59 if let Some(required) = request.requires_ancestor_node_id.as_deref()
60 && !self.state.session_graph.active_path_contains(required)
61 {
62 return Ok(crate::AppendSessionNodesResult::StaleBranch {
63 current_leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
64 });
65 }
66 let node_ids = append_session_nodes_to_state(&mut self.state, &request.nodes);
67 if let Some(session) = self.session.as_mut() {
68 let protocol_session = Arc::clone(session.plugins().protocol_session());
69 let session_id = self.state.session_id.clone();
70 protocol_session
71 .append_session_nodes(
72 crate::plugin::ProtocolSessionContext::new(session, &session_id),
73 &request.nodes,
74 )
75 .await?;
76 }
77 self.stamp_live_plugin_state();
78 if let Some(store) = self
79 .session
80 .as_ref()
81 .and_then(|session| session.history_store())
82 {
83 let graph = crate::store::GraphCommitDelta::Append {
84 nodes: node_ids
85 .iter()
86 .filter_map(|id| self.state.session_graph.find_node(id).cloned())
87 .collect(),
88 leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
89 };
90 let commit = crate::store::RuntimeCommit::persisted_state_with_graph_commit(
91 &self.state,
92 graph,
93 &[],
94 );
95 match store.commit_runtime_state(commit).await {
96 Ok(result) => self.state.apply_persisted_commit_result(result),
97 Err(err) => tracing::warn!("failed to persist runtime state: {err}"),
98 }
99 }
100 Ok(crate::AppendSessionNodesResult::Appended {
101 node_ids,
102 leaf_node_id: self
103 .state
104 .session_graph
105 .leaf_node_id
106 .clone()
107 .unwrap_or_default(),
108 })
109 }
110
111 pub async fn apply_protocol_session_extension(
112 &mut self,
113 extension: crate::ProtocolSessionExtensionHandle,
114 ) -> Result<(), SessionError> {
115 let Some(session) = self.session.as_ref() else {
116 return Err(SessionError::Protocol(
117 "runtime session is not available".to_string(),
118 ));
119 };
120 let protocol_session = Arc::clone(session.plugins().protocol_session());
121 protocol_session.apply_session_extension(extension).await
122 }
123
124 pub async fn validate_protocol_turn_extension(
125 &mut self,
126 extension: &crate::ProtocolTurnExtensionHandle,
127 ) -> Result<(), SessionError> {
128 let Some(session) = self.session.as_ref() else {
129 return Err(SessionError::Protocol(
130 "runtime session is not available".to_string(),
131 ));
132 };
133 let protocol_session = Arc::clone(session.plugins().protocol_session());
134 protocol_session.validate_turn_extension(extension).await
135 }
136
137 pub async fn branch_to_node(
138 &mut self,
139 node_id: Option<String>,
140 ) -> Result<crate::SessionSnapshot, SessionError> {
141 let mut state = self.export_state();
142 state.session_graph.branch_to(node_id);
143 let mut persisted_state = RuntimeSessionState::from_snapshot(state);
144 normalize_session_graph(&mut persisted_state);
145
146 let policy = persisted_state.policy.clone();
147 let host = self.host.clone();
148 let services = self.services.clone();
149 let managed_sessions = Arc::clone(&self.managed_sessions);
150 let managed_turns = Arc::clone(&self.managed_turns);
151 let process_sync_needed = Arc::clone(&self.process_sync_needed);
152 let runtime_scope_id = Arc::clone(&self.runtime_scope_id);
153 let turn_phase_probe = self.turn_phase_probe.clone();
154
155 let mut rebuilt = Self::from_host_state(policy, host, services, persisted_state).await?;
156 rebuilt.managed_sessions = managed_sessions;
157 rebuilt.managed_turns = managed_turns;
158 rebuilt.process_sync_needed = process_sync_needed;
159 rebuilt.runtime_scope_id = runtime_scope_id;
160 rebuilt.turn_phase_probe = turn_phase_probe;
161
162 let exported = rebuilt.export_state();
163 *self = rebuilt;
164 Ok(exported)
165 }
166
167 pub async fn activate_managed_session(&mut self, session_id: &str) -> Result<(), SessionError> {
173 let child = {
174 let mut registry = self.managed_sessions.lock().await;
175 registry.remove(session_id).ok_or_else(|| {
176 SessionError::Protocol(format!("unknown managed session `{session_id}`"))
177 })?
178 };
179 let child = child.try_into_runtime().map_err(|_| {
180 SessionError::Protocol(format!("managed session `{session_id}` is still in use"))
181 })?;
182 *self = child;
183 Ok(())
184 }
185
186 pub async fn snapshot_execution_state(&mut self) -> Result<Option<Vec<u8>>, SessionError> {
188 let Some(session) = self.session.as_mut() else {
189 return Err(SessionError::Protocol(
190 "runtime session not available".to_string(),
191 ));
192 };
193 let code_executor = session
194 .plugins()
195 .code_executor()
196 .ok_or(SessionError::CodeExecutionUnavailable)?;
197 let session_id = self.state.session_id.clone();
198 let blob = code_executor
199 .snapshot_execution_state(crate::plugin::ProtocolSessionContext::new(
200 session,
201 &session_id,
202 ))
203 .await?;
204 self.state.execution_state_snapshot = blob.clone();
205 Ok(blob)
206 }
207
208 pub async fn restore_execution_state(&mut self, snapshot: &[u8]) -> Result<(), SessionError> {
210 let Some(session) = self.session.as_mut() else {
211 return Err(SessionError::Protocol(
212 "runtime session not available".to_string(),
213 ));
214 };
215 let code_executor = session
216 .plugins()
217 .code_executor()
218 .ok_or(SessionError::CodeExecutionUnavailable)?;
219 let session_id = self.state.session_id.clone();
220 code_executor
221 .restore_execution_state(
222 crate::plugin::ProtocolSessionContext::new(session, &session_id),
223 snapshot,
224 )
225 .await?;
226 self.state.execution_state_snapshot = Some(snapshot.to_vec());
227 Ok(())
228 }
229
230 pub async fn list_lashlang_trigger_registrations(
231 &self,
232 ) -> Result<Vec<crate::TriggerRegistration>, SessionError> {
233 let store = self.host.host_event_store.as_ref().ok_or_else(|| {
234 SessionError::Protocol("host event store is unavailable in this runtime".to_string())
235 })?;
236 let records = store
237 .list_subscriptions(crate::TriggerSubscriptionFilter::for_session(
238 self.state.session_id.clone(),
239 ))
240 .await
241 .map_err(|err| SessionError::Protocol(err.to_string()))?;
242 Ok(records
243 .iter()
244 .map(crate::TriggerRegistration::from)
245 .collect())
246 }
247
248 pub async fn lashlang_trigger_registrations_by_source_type(
249 &self,
250 source_type: impl Into<crate::TriggerSourceType>,
251 ) -> Result<Vec<crate::TriggerRegistration>, SessionError> {
252 let store = self.host.host_event_store.as_ref().ok_or_else(|| {
253 SessionError::Protocol("host event store is unavailable in this runtime".to_string())
254 })?;
255 let mut filter =
256 crate::TriggerSubscriptionFilter::for_session(self.state.session_id.clone());
257 filter.source_type = Some(source_type.into().to_string());
258 let records = store
259 .list_subscriptions(filter)
260 .await
261 .map_err(|err| SessionError::Protocol(err.to_string()))?;
262 Ok(records
263 .iter()
264 .map(crate::TriggerRegistration::from)
265 .collect())
266 }
267
268 pub async fn invoke_plugin_action(
269 &self,
270 name: &str,
271 args: serde_json::Value,
272 session_id: Option<String>,
273 ) -> Result<crate::ToolResult, PluginActionInvokeError> {
274 let manager = self.runtime_session_services()?;
275 let Some(session) = self.session.as_ref() else {
276 return Err(PluginActionInvokeError::Unknown(name.to_string()));
277 };
278 session
279 .plugins()
280 .invoke_plugin_action(
281 name,
282 args,
283 session_id,
284 true,
285 manager.state_service(),
286 manager.lifecycle_service(),
287 manager.graph_service(),
288 manager.process_service(),
289 )
290 .await
291 }
292}