1use std::sync::Arc;
7
8use crate::{PluginOperationInvokeError, SessionError};
9
10use super::LashRuntime;
11use super::state::{
12 RuntimeSessionState, append_session_nodes_to_state_with_clock, normalize_session_graph,
13};
14
15impl LashRuntime {
16 pub fn set_persisted_state(&mut self, state: RuntimeSessionState) -> Result<(), SessionError> {
18 let mut state = state;
19 normalize_session_graph(&mut state);
20 if let Some(session) = self.session.as_ref() {
21 session.invalidate_runtime_caches();
22 if let Some(tool_state) = state.tool_state_snapshot.clone() {
28 let report = session
29 .plugins()
30 .tool_registry()
31 .restore_state(tool_state)
32 .map_err(|err| SessionError::Protocol(err.to_string()))?;
33 if !report.orphaned.is_empty() {
34 tracing::warn!(
35 session_id = %state.session_id,
36 orphaned = ?report.orphaned,
37 "persisted state installed with orphaned tools: no registered \
38 source resolves them; they remain non-members until their source returns"
39 );
40 }
41 }
42 let snapshot = state.plugin_snapshot.clone().unwrap_or_default();
43 session
44 .plugins()
45 .restore(&snapshot)
46 .map_err(|err| SessionError::Protocol(err.to_string()))?;
47 state.plugin_snapshot_revision =
48 Some(session.plugins().snapshot_revision_fingerprint());
49 }
50 self.policy = state.policy.clone();
51 self.protocol_turn_options = state.protocol_turn_options.clone();
52 self.state = state;
53 Ok(())
54 }
55
56 pub async fn append_session_nodes(
57 &mut self,
58 request: crate::AppendSessionNodesRequest,
59 ) -> Result<crate::AppendSessionNodesResult, SessionError> {
60 self.refresh_session_graph_from_store().await?;
61 if let Some(required) = request.requires_ancestor_node_id.as_deref()
62 && !self.state.session_graph.active_path_contains(required)
63 {
64 return Ok(crate::AppendSessionNodesResult::StaleBranch {
65 current_leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
66 });
67 }
68 let node_ids = append_session_nodes_to_state_with_clock(
69 &mut self.state,
70 &request.nodes,
71 self.host.core.clock.as_ref(),
72 );
73 if let Some(session) = self.session.as_mut() {
74 let protocol_session = Arc::clone(session.plugins().protocol_session());
75 let session_id = self.state.session_id.clone();
76 protocol_session
77 .append_session_nodes(
78 crate::plugin::ProtocolSessionContext::new(session, &session_id),
79 &request.nodes,
80 )
81 .await?;
82 }
83 self.stamp_live_plugin_state();
84 if let Some(store) = self
85 .session
86 .as_ref()
87 .and_then(|session| session.history_store())
88 {
89 let graph = crate::store::GraphCommitDelta::Append {
90 nodes: node_ids
91 .iter()
92 .filter_map(|id| self.state.session_graph.find_node(id).cloned())
93 .collect(),
94 leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
95 };
96 let commit = crate::store::RuntimeCommit::persisted_state_with_graph_commit(
97 &self.state,
98 graph,
99 &[],
100 );
101 match super::commit_runtime_state_with_fresh_session_execution_lease(
102 store,
103 commit,
104 &self.runtime_lease_owner,
105 Arc::clone(&self.host.core.clock),
106 )
107 .await
108 {
109 Ok(result) => self.state.apply_persisted_commit_result(result),
110 Err(err) => tracing::warn!("failed to persist runtime state: {err}"),
111 }
112 }
113 Ok(crate::AppendSessionNodesResult::Appended {
114 node_ids,
115 leaf_node_id: self
116 .state
117 .session_graph
118 .leaf_node_id
119 .clone()
120 .unwrap_or_default(),
121 })
122 }
123
124 pub async fn apply_protocol_session_extension(
125 &mut self,
126 extension: crate::ProtocolSessionExtensionHandle,
127 ) -> Result<(), SessionError> {
128 let Some(session) = self.session.as_ref() else {
129 return Err(SessionError::Protocol(
130 "runtime session is not available".to_string(),
131 ));
132 };
133 let protocol_session = Arc::clone(session.plugins().protocol_session());
134 protocol_session.apply_session_extension(extension).await
135 }
136
137 pub async fn validate_protocol_turn_extension(
138 &mut self,
139 extension: &crate::ProtocolTurnExtensionHandle,
140 ) -> Result<(), SessionError> {
141 let Some(session) = self.session.as_ref() else {
142 return Err(SessionError::Protocol(
143 "runtime session is not available".to_string(),
144 ));
145 };
146 let protocol_session = Arc::clone(session.plugins().protocol_session());
147 protocol_session.validate_turn_extension(extension).await
148 }
149
150 pub async fn branch_to_node(
151 &mut self,
152 node_id: Option<String>,
153 ) -> Result<crate::SessionSnapshot, SessionError> {
154 let mut state = self.export_state();
155 state.session_graph.branch_to(node_id);
156 let mut persisted_state = RuntimeSessionState::from_snapshot(state);
157 normalize_session_graph(&mut persisted_state);
158
159 let policy = persisted_state.policy.clone();
160 let host = self.host.clone();
161 let services = self.services.clone();
162 let managed_sessions = Arc::clone(&self.managed_sessions);
163 let managed_turns = Arc::clone(&self.managed_turns);
164 let process_sync_needed = Arc::clone(&self.process_sync_needed);
165 let runtime_scope_id = Arc::clone(&self.runtime_scope_id);
166 let runtime_lease_owner = self.runtime_lease_owner.clone();
167 let turn_phase_probe = self.turn_phase_probe.clone();
168
169 let mut rebuilt = Self::from_host_state(policy, host, services, persisted_state).await?;
170 rebuilt.managed_sessions = managed_sessions;
171 rebuilt.managed_turns = managed_turns;
172 rebuilt.process_sync_needed = process_sync_needed;
173 rebuilt.runtime_scope_id = runtime_scope_id;
174 rebuilt.runtime_lease_owner = runtime_lease_owner;
175 rebuilt.turn_phase_probe = turn_phase_probe;
176
177 let exported = rebuilt.export_state();
178 *self = rebuilt;
179 Ok(exported)
180 }
181
182 pub async fn activate_managed_session(&mut self, session_id: &str) -> Result<(), SessionError> {
188 let child = {
189 let mut registry = self.managed_sessions.lock().await;
190 registry.remove(session_id).ok_or_else(|| {
191 SessionError::Protocol(format!("unknown managed session `{session_id}`"))
192 })?
193 };
194 let child = child.try_into_runtime().map_err(|_| {
195 SessionError::Protocol(format!("managed session `{session_id}` is still in use"))
196 })?;
197 *self = child;
198 Ok(())
199 }
200
201 pub async fn snapshot_execution_state(&mut self) -> Result<Option<Vec<u8>>, SessionError> {
203 let Some(session) = self.session.as_mut() else {
204 return Err(SessionError::Protocol(
205 "runtime session not available".to_string(),
206 ));
207 };
208 let code_executor = session
209 .plugins()
210 .code_executor()
211 .ok_or(SessionError::CodeExecutionUnavailable)?;
212 let session_id = self.state.session_id.clone();
213 let blob = code_executor
214 .snapshot_execution_state(crate::plugin::ProtocolSessionContext::new(
215 session,
216 &session_id,
217 ))
218 .await?;
219 self.state.execution_state_snapshot = blob.clone();
220 Ok(blob)
221 }
222
223 pub async fn restore_execution_state(&mut self, snapshot: &[u8]) -> Result<(), SessionError> {
225 let Some(session) = self.session.as_mut() else {
226 return Err(SessionError::Protocol(
227 "runtime session not available".to_string(),
228 ));
229 };
230 let code_executor = session
231 .plugins()
232 .code_executor()
233 .ok_or(SessionError::CodeExecutionUnavailable)?;
234 let session_id = self.state.session_id.clone();
235 code_executor
236 .restore_execution_state(
237 crate::plugin::ProtocolSessionContext::new(session, &session_id),
238 snapshot,
239 )
240 .await?;
241 self.state.execution_state_snapshot = Some(snapshot.to_vec());
242 Ok(())
243 }
244
245 pub async fn list_trigger_registrations(
246 &self,
247 ) -> Result<Vec<crate::TriggerRegistration>, SessionError> {
248 let store = self.host.trigger_store.as_ref().ok_or_else(|| {
249 SessionError::Protocol("trigger store is unavailable in this runtime".to_string())
250 })?;
251 let records = store
252 .list_subscriptions(crate::TriggerSubscriptionFilter::for_session(
253 self.state.session_id.clone(),
254 ))
255 .await
256 .map_err(|err| SessionError::Protocol(err.to_string()))?;
257 Ok(records
258 .iter()
259 .map(crate::TriggerRegistration::from)
260 .collect())
261 }
262
263 pub async fn trigger_registrations_by_source_type(
264 &self,
265 source_type: impl Into<crate::TriggerEventType>,
266 ) -> Result<Vec<crate::TriggerRegistration>, SessionError> {
267 let store = self.host.trigger_store.as_ref().ok_or_else(|| {
268 SessionError::Protocol("trigger store is unavailable in this runtime".to_string())
269 })?;
270 let mut filter =
271 crate::TriggerSubscriptionFilter::for_session(self.state.session_id.clone());
272 filter.source_type = Some(source_type.into().to_string());
273 let records = store
274 .list_subscriptions(filter)
275 .await
276 .map_err(|err| SessionError::Protocol(err.to_string()))?;
277 Ok(records
278 .iter()
279 .map(crate::TriggerRegistration::from)
280 .collect())
281 }
282
283 pub async fn query_plugin(
284 &self,
285 name: &str,
286 args: serde_json::Value,
287 session_id: Option<String>,
288 ) -> Result<(String, serde_json::Value), PluginOperationInvokeError> {
289 let manager = self.runtime_session_services()?;
290 let Some(session) = self.session.as_ref() else {
291 return Err(PluginOperationInvokeError::Unknown(
292 "runtime session not available".to_string(),
293 ));
294 };
295 session
296 .plugins()
297 .query_plugin(
298 name,
299 args,
300 session_id,
301 true,
302 manager.read_service(),
303 manager.process_read_service(),
304 )
305 .await
306 }
307
308 pub async fn run_plugin_command(
309 &mut self,
310 name: &str,
311 args: serde_json::Value,
312 session_id: Option<String>,
313 ) -> Result<crate::PluginCommandReceipt<serde_json::Value>, PluginOperationInvokeError> {
314 let manager = self.runtime_session_services()?;
315 let Some(session) = self.session.as_ref() else {
316 return Err(PluginOperationInvokeError::Unknown(
317 "runtime session not available".to_string(),
318 ));
319 };
320 let (plugin_id, outcome) = session
321 .plugins()
322 .run_plugin_command(
323 name,
324 args,
325 session_id,
326 true,
327 manager.state_service(),
328 manager.lifecycle_service(),
329 manager.graph_service(),
330 manager.process_service(),
331 )
332 .await?;
333 let (events, pending_turn_inputs) = self
334 .apply_plugin_operation_effects(&plugin_id, outcome.events, outcome.directives)
335 .await?;
336 Ok(crate::PluginCommandReceipt {
337 output: outcome.output,
338 events,
339 pending_turn_inputs,
340 })
341 }
342
343 pub async fn run_plugin_task(
344 &mut self,
345 name: &str,
346 args: serde_json::Value,
347 session_id: Option<String>,
348 scoped_effect_controller: crate::ScopedEffectController<'static>,
349 cancellation_token: tokio_util::sync::CancellationToken,
350 ) -> Result<crate::PluginTaskReceipt<serde_json::Value>, PluginOperationInvokeError> {
351 let manager = self.runtime_session_services()?;
352 let Some(session) = self.session.as_ref() else {
353 return Err(PluginOperationInvokeError::Unknown(
354 "runtime session not available".to_string(),
355 ));
356 };
357 let (plugin_id, outcome) = session
358 .plugins()
359 .run_plugin_task(
360 name,
361 args,
362 session_id,
363 true,
364 manager.state_service(),
365 manager.lifecycle_service(),
366 manager.graph_service(),
367 manager.process_service(),
368 scoped_effect_controller,
369 cancellation_token,
370 )
371 .await?;
372 let (events, pending_turn_inputs) = self
373 .apply_plugin_operation_effects(&plugin_id, outcome.events, outcome.directives)
374 .await?;
375 Ok(crate::PluginTaskReceipt {
376 output: outcome.output,
377 events,
378 pending_turn_inputs,
379 })
380 }
381
382 async fn apply_plugin_operation_effects(
383 &mut self,
384 plugin_id: &str,
385 events: Vec<crate::PluginRuntimeEvent>,
386 directives: Vec<crate::PluginRuntimeDirective>,
387 ) -> Result<
388 (
389 Vec<crate::PluginOwned<crate::PluginRuntimeEvent>>,
390 Vec<crate::PendingTurnInput>,
391 ),
392 PluginOperationInvokeError,
393 > {
394 let owned_events = events
395 .into_iter()
396 .map(|event| crate::PluginOwned {
397 plugin_id: plugin_id.to_string(),
398 value: event,
399 })
400 .collect::<Vec<_>>();
401 if !owned_events.is_empty() {
402 let nodes = owned_events
403 .iter()
404 .map(|owned| {
405 crate::plugin_runtime_protocol_event(&owned.plugin_id, owned.value.clone())
406 .map(crate::SessionAppendNode::protocol_event)
407 .map_err(|err| {
408 PluginOperationInvokeError::Failed(format!(
409 "failed to encode plugin runtime event: {err}"
410 ))
411 })
412 })
413 .collect::<Result<Vec<_>, _>>()?;
414 self.append_plugin_runtime_event_nodes(&nodes).await?;
415 }
416 self.stamp_live_plugin_state();
417 self.persist_plugin_operation_state_if_needed().await?;
418
419 let mut pending_turn_inputs = Vec::new();
420 for directive in directives {
421 match directive {
422 crate::PluginRuntimeDirective::QueueTurn { input, source_key } => {
423 let pending = self
424 .enqueue_turn_input(input, crate::TurnInputIngress::NextTurn, source_key)
425 .await
426 .map_err(|err| {
427 PluginOperationInvokeError::Failed(format!(
428 "failed to queue plugin turn request: {err}"
429 ))
430 })?;
431 pending_turn_inputs.push(pending);
432 }
433 }
434 }
435
436 Ok((owned_events, pending_turn_inputs))
437 }
438
439 async fn append_plugin_runtime_event_nodes(
440 &mut self,
441 nodes: &[crate::SessionAppendNode],
442 ) -> Result<(), PluginOperationInvokeError> {
443 let node_ids = append_session_nodes_to_state_with_clock(
444 &mut self.state,
445 nodes,
446 self.host.core.clock.as_ref(),
447 );
448 if let Some(store) = self
449 .session
450 .as_ref()
451 .and_then(|session| session.history_store())
452 {
453 let graph = crate::store::GraphCommitDelta::Append {
454 nodes: node_ids
455 .iter()
456 .filter_map(|id| self.state.session_graph.find_node(id).cloned())
457 .collect(),
458 leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
459 };
460 let commit = crate::store::RuntimeCommit::persisted_state_with_graph_commit(
461 &self.state,
462 graph,
463 &[],
464 );
465 let result = super::commit_runtime_state_with_fresh_session_execution_lease(
466 store,
467 commit,
468 &self.runtime_lease_owner,
469 Arc::clone(&self.host.core.clock),
470 )
471 .await
472 .map_err(|err| {
473 PluginOperationInvokeError::Failed(format!(
474 "failed to persist plugin runtime events: {err}"
475 ))
476 })?;
477 self.state.apply_persisted_commit_result(result);
478 }
479 Ok(())
480 }
481
482 async fn persist_plugin_operation_state_if_needed(
483 &mut self,
484 ) -> Result<(), PluginOperationInvokeError> {
485 let Some(store) = self
486 .session
487 .as_ref()
488 .and_then(|session| session.history_store())
489 else {
490 return Ok(());
491 };
492 let commit = crate::store::RuntimeCommit::persisted_state(&self.state, &[]);
493 let result = super::commit_runtime_state_with_fresh_session_execution_lease(
494 store,
495 commit,
496 &self.runtime_lease_owner,
497 Arc::clone(&self.host.core.clock),
498 )
499 .await
500 .map_err(|err| {
501 PluginOperationInvokeError::Failed(format!(
502 "failed to persist plugin operation state: {err}"
503 ))
504 })?;
505 self.state.apply_persisted_commit_result(result);
506 Ok(())
507 }
508}