1use std::sync::Arc;
7
8use crate::{PluginOperationInvokeError, SessionError};
9
10use super::LashRuntime;
11use super::state::{
12 RuntimeSessionState, append_session_nodes_to_state_with_clock, normalize_session_graph,
13};
14
15impl LashRuntime {
16 pub fn set_persisted_state(&mut self, state: RuntimeSessionState) -> Result<(), SessionError> {
18 let mut state = state;
19 normalize_session_graph(&mut state);
20 if let Some(session) = self.session.as_ref() {
21 session.invalidate_runtime_caches();
22 if let Some(tool_state) = state.tool_state_snapshot.clone() {
28 let report = session
29 .plugins()
30 .tool_registry()
31 .restore_state(tool_state)
32 .map_err(|err| SessionError::Protocol(err.to_string()))?;
33 if !report.orphaned.is_empty() {
34 tracing::warn!(
35 session_id = %state.session_id,
36 orphaned = ?report.orphaned,
37 "persisted state installed with orphaned tools: no registered \
38 source resolves them; they remain non-members until their source returns"
39 );
40 }
41 }
42 let snapshot = state.plugin_snapshot.clone().unwrap_or_default();
43 session
44 .plugins()
45 .restore(&snapshot)
46 .map_err(|err| SessionError::Protocol(err.to_string()))?;
47 state.plugin_snapshot_revision =
48 Some(session.plugins().snapshot_revision_fingerprint());
49 }
50 self.policy = state.policy.clone();
51 self.protocol_turn_options = state.protocol_turn_options.clone();
52 self.state = state;
53 Ok(())
54 }
55
56 pub async fn append_session_nodes(
57 &mut self,
58 request: crate::AppendSessionNodesRequest,
59 ) -> Result<crate::AppendSessionNodesResult, SessionError> {
60 self.refresh_session_graph_from_store().await?;
61 if let Some(required) = request.requires_ancestor_node_id.as_deref()
62 && !self.state.session_graph.active_path_contains(required)
63 {
64 return Ok(crate::AppendSessionNodesResult::StaleBranch {
65 current_leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
66 });
67 }
68 let node_ids = append_session_nodes_to_state_with_clock(
69 &mut self.state,
70 &request.nodes,
71 self.host.core.clock.as_ref(),
72 );
73 if let Some(session) = self.session.as_mut() {
74 let protocol_session = Arc::clone(session.plugins().protocol_session());
75 let session_id = self.state.session_id.clone();
76 protocol_session
77 .append_session_nodes(
78 crate::plugin::ProtocolSessionContext::new(session, &session_id),
79 &request.nodes,
80 )
81 .await?;
82 }
83 self.stamp_live_plugin_state();
84 if let Some(store) = self
85 .session
86 .as_ref()
87 .and_then(|session| session.history_store())
88 {
89 let graph = crate::store::GraphCommitDelta::Append {
90 nodes: node_ids
91 .iter()
92 .filter_map(|id| self.state.session_graph.find_node(id).cloned())
93 .collect(),
94 leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
95 };
96 let commit = crate::store::RuntimeCommit::persisted_state_with_graph_commit(
97 &self.state,
98 graph,
99 &[],
100 );
101 match super::commit_runtime_state_with_fresh_session_execution_lease(
102 store,
103 commit,
104 &self.runtime_lease_owner,
105 self.host.core.control.lease_timings,
106 Arc::clone(&self.host.core.clock),
107 )
108 .await
109 {
110 Ok(result) => self.state.apply_persisted_commit_result(result),
111 Err(err) => tracing::warn!("failed to persist runtime state: {err}"),
112 }
113 }
114 Ok(crate::AppendSessionNodesResult::Appended {
115 node_ids,
116 leaf_node_id: self
117 .state
118 .session_graph
119 .leaf_node_id
120 .clone()
121 .unwrap_or_default(),
122 })
123 }
124
125 pub async fn apply_protocol_session_extension(
126 &mut self,
127 extension: crate::ProtocolSessionExtensionHandle,
128 ) -> Result<(), SessionError> {
129 let Some(session) = self.session.as_ref() else {
130 return Err(SessionError::Protocol(
131 "runtime session is not available".to_string(),
132 ));
133 };
134 let protocol_session = Arc::clone(session.plugins().protocol_session());
135 protocol_session.apply_session_extension(extension).await
136 }
137
138 pub async fn validate_protocol_turn_extension(
139 &mut self,
140 extension: &crate::ProtocolTurnExtensionHandle,
141 ) -> Result<(), SessionError> {
142 let Some(session) = self.session.as_ref() else {
143 return Err(SessionError::Protocol(
144 "runtime session is not available".to_string(),
145 ));
146 };
147 let protocol_session = Arc::clone(session.plugins().protocol_session());
148 protocol_session.validate_turn_extension(extension).await
149 }
150
151 pub async fn branch_to_node(
152 &mut self,
153 node_id: Option<String>,
154 ) -> Result<crate::SessionSnapshot, SessionError> {
155 let mut state = self.export_state();
156 state.session_graph.branch_to(node_id);
157 let mut persisted_state = RuntimeSessionState::from_snapshot(state);
158 normalize_session_graph(&mut persisted_state);
159
160 let policy = persisted_state.policy.clone();
161 let host = self.host.clone();
162 let services = self.services.clone();
163 let managed_sessions = Arc::clone(&self.managed_sessions);
164 let managed_turns = Arc::clone(&self.managed_turns);
165 let process_sync_needed = Arc::clone(&self.process_sync_needed);
166 let runtime_scope_id = Arc::clone(&self.runtime_scope_id);
167 let runtime_lease_owner = self.runtime_lease_owner.clone();
168 let turn_phase_probe = self.turn_phase_probe.clone();
169
170 let mut rebuilt = Self::from_host_state(policy, host, services, persisted_state).await?;
171 rebuilt.managed_sessions = managed_sessions;
172 rebuilt.managed_turns = managed_turns;
173 rebuilt.process_sync_needed = process_sync_needed;
174 rebuilt.runtime_scope_id = runtime_scope_id;
175 rebuilt.runtime_lease_owner = runtime_lease_owner;
176 rebuilt.turn_phase_probe = turn_phase_probe;
177
178 let exported = rebuilt.export_state();
179 *self = rebuilt;
180 Ok(exported)
181 }
182
183 pub async fn activate_managed_session(&mut self, session_id: &str) -> Result<(), SessionError> {
189 let child = {
190 let mut registry = self.managed_sessions.lock().await;
191 registry.remove(session_id).ok_or_else(|| {
192 SessionError::Protocol(format!("unknown managed session `{session_id}`"))
193 })?
194 };
195 let child = child.try_into_runtime().map_err(|_| {
196 SessionError::Protocol(format!("managed session `{session_id}` is still in use"))
197 })?;
198 *self = child;
199 Ok(())
200 }
201
202 pub async fn snapshot_execution_state(&mut self) -> Result<Option<Vec<u8>>, SessionError> {
204 let Some(session) = self.session.as_mut() else {
205 return Err(SessionError::Protocol(
206 "runtime session not available".to_string(),
207 ));
208 };
209 let code_executor = session
210 .plugins()
211 .code_executor()
212 .ok_or(SessionError::CodeExecutionUnavailable)?;
213 let session_id = self.state.session_id.clone();
214 let blob = code_executor
215 .snapshot_execution_state(crate::plugin::ProtocolSessionContext::new(
216 session,
217 &session_id,
218 ))
219 .await?;
220 self.state.execution_state_snapshot = blob.clone();
221 Ok(blob)
222 }
223
224 pub async fn restore_execution_state(&mut self, snapshot: &[u8]) -> Result<(), SessionError> {
226 let Some(session) = self.session.as_mut() else {
227 return Err(SessionError::Protocol(
228 "runtime session not available".to_string(),
229 ));
230 };
231 let code_executor = session
232 .plugins()
233 .code_executor()
234 .ok_or(SessionError::CodeExecutionUnavailable)?;
235 let session_id = self.state.session_id.clone();
236 code_executor
237 .restore_execution_state(
238 crate::plugin::ProtocolSessionContext::new(session, &session_id),
239 snapshot,
240 )
241 .await?;
242 self.state.execution_state_snapshot = Some(snapshot.to_vec());
243 Ok(())
244 }
245
246 pub async fn list_trigger_registrations(
247 &self,
248 ) -> Result<Vec<crate::TriggerRegistration>, SessionError> {
249 let store = self.host.trigger_store.as_ref().ok_or_else(|| {
250 SessionError::Protocol("trigger store is unavailable in this runtime".to_string())
251 })?;
252 let records = store
253 .list_subscriptions(crate::TriggerSubscriptionFilter::for_session(
254 self.state.session_id.clone(),
255 ))
256 .await
257 .map_err(|err| SessionError::Protocol(err.to_string()))?;
258 Ok(records
259 .iter()
260 .map(crate::TriggerRegistration::from)
261 .collect())
262 }
263
264 pub async fn trigger_registrations_by_source_type(
265 &self,
266 source_type: impl Into<crate::TriggerEventType>,
267 ) -> Result<Vec<crate::TriggerRegistration>, SessionError> {
268 let store = self.host.trigger_store.as_ref().ok_or_else(|| {
269 SessionError::Protocol("trigger store is unavailable in this runtime".to_string())
270 })?;
271 let mut filter =
272 crate::TriggerSubscriptionFilter::for_session(self.state.session_id.clone());
273 filter.source_type = Some(source_type.into().to_string());
274 let records = store
275 .list_subscriptions(filter)
276 .await
277 .map_err(|err| SessionError::Protocol(err.to_string()))?;
278 Ok(records
279 .iter()
280 .map(crate::TriggerRegistration::from)
281 .collect())
282 }
283
284 pub async fn query_plugin(
285 &self,
286 name: &str,
287 args: serde_json::Value,
288 session_id: Option<String>,
289 ) -> Result<(String, serde_json::Value), PluginOperationInvokeError> {
290 let manager = self.runtime_session_services()?;
291 let Some(session) = self.session.as_ref() else {
292 return Err(PluginOperationInvokeError::Unknown(
293 "runtime session not available".to_string(),
294 ));
295 };
296 session
297 .plugins()
298 .query_plugin(
299 name,
300 args,
301 session_id,
302 true,
303 manager.read_service(),
304 manager.process_read_service(),
305 )
306 .await
307 }
308
309 pub async fn run_plugin_command(
310 &mut self,
311 name: &str,
312 args: serde_json::Value,
313 session_id: Option<String>,
314 ) -> Result<crate::PluginCommandReceipt<serde_json::Value>, PluginOperationInvokeError> {
315 let manager = self.runtime_session_services()?;
316 let Some(session) = self.session.as_ref() else {
317 return Err(PluginOperationInvokeError::Unknown(
318 "runtime session not available".to_string(),
319 ));
320 };
321 let (plugin_id, outcome) = session
322 .plugins()
323 .run_plugin_command(
324 name,
325 args,
326 session_id,
327 true,
328 manager.state_service(),
329 manager.lifecycle_service(),
330 manager.graph_service(),
331 manager.process_service(),
332 )
333 .await?;
334 let (events, pending_turn_inputs) = self
335 .apply_plugin_operation_effects(&plugin_id, outcome.events, outcome.directives)
336 .await?;
337 Ok(crate::PluginCommandReceipt {
338 output: outcome.output,
339 events,
340 pending_turn_inputs,
341 })
342 }
343
344 pub async fn run_plugin_task(
345 &mut self,
346 name: &str,
347 args: serde_json::Value,
348 session_id: Option<String>,
349 scoped_effect_controller: crate::ScopedEffectController<'static>,
350 cancellation_token: tokio_util::sync::CancellationToken,
351 ) -> Result<crate::PluginTaskReceipt<serde_json::Value>, PluginOperationInvokeError> {
352 let manager = self.runtime_session_services()?;
353 let Some(session) = self.session.as_ref() else {
354 return Err(PluginOperationInvokeError::Unknown(
355 "runtime session not available".to_string(),
356 ));
357 };
358 let (plugin_id, outcome) = session
359 .plugins()
360 .run_plugin_task(
361 name,
362 args,
363 session_id,
364 true,
365 manager.state_service(),
366 manager.lifecycle_service(),
367 manager.graph_service(),
368 manager.process_service(),
369 scoped_effect_controller,
370 cancellation_token,
371 )
372 .await?;
373 let (events, pending_turn_inputs) = self
374 .apply_plugin_operation_effects(&plugin_id, outcome.events, outcome.directives)
375 .await?;
376 Ok(crate::PluginTaskReceipt {
377 output: outcome.output,
378 events,
379 pending_turn_inputs,
380 })
381 }
382
383 async fn apply_plugin_operation_effects(
384 &mut self,
385 plugin_id: &str,
386 events: Vec<crate::PluginRuntimeEvent>,
387 directives: Vec<crate::PluginRuntimeDirective>,
388 ) -> Result<
389 (
390 Vec<crate::PluginOwned<crate::PluginRuntimeEvent>>,
391 Vec<crate::PendingTurnInput>,
392 ),
393 PluginOperationInvokeError,
394 > {
395 let owned_events = events
396 .into_iter()
397 .map(|event| crate::PluginOwned {
398 plugin_id: plugin_id.to_string(),
399 value: event,
400 })
401 .collect::<Vec<_>>();
402 if !owned_events.is_empty() {
403 let nodes = owned_events
404 .iter()
405 .map(|owned| {
406 crate::plugin_runtime_protocol_event(&owned.plugin_id, owned.value.clone())
407 .map(crate::SessionAppendNode::protocol_event)
408 .map_err(|err| {
409 PluginOperationInvokeError::Failed(format!(
410 "failed to encode plugin runtime event: {err}"
411 ))
412 })
413 })
414 .collect::<Result<Vec<_>, _>>()?;
415 self.append_plugin_runtime_event_nodes(&nodes).await?;
416 }
417 self.stamp_live_plugin_state();
418 self.persist_plugin_operation_state_if_needed().await?;
419
420 let mut pending_turn_inputs = Vec::new();
421 for directive in directives {
422 match directive {
423 crate::PluginRuntimeDirective::QueueTurn { input, source_key } => {
424 let pending = self
425 .enqueue_turn_input(input, crate::TurnInputIngress::NextTurn, source_key)
426 .await
427 .map_err(|err| {
428 PluginOperationInvokeError::Failed(format!(
429 "failed to queue plugin turn request: {err}"
430 ))
431 })?;
432 pending_turn_inputs.push(pending);
433 }
434 }
435 }
436
437 Ok((owned_events, pending_turn_inputs))
438 }
439
440 async fn append_plugin_runtime_event_nodes(
441 &mut self,
442 nodes: &[crate::SessionAppendNode],
443 ) -> Result<(), PluginOperationInvokeError> {
444 let node_ids = append_session_nodes_to_state_with_clock(
445 &mut self.state,
446 nodes,
447 self.host.core.clock.as_ref(),
448 );
449 if let Some(store) = self
450 .session
451 .as_ref()
452 .and_then(|session| session.history_store())
453 {
454 let graph = crate::store::GraphCommitDelta::Append {
455 nodes: node_ids
456 .iter()
457 .filter_map(|id| self.state.session_graph.find_node(id).cloned())
458 .collect(),
459 leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
460 };
461 let commit = crate::store::RuntimeCommit::persisted_state_with_graph_commit(
462 &self.state,
463 graph,
464 &[],
465 );
466 let result = super::commit_runtime_state_with_fresh_session_execution_lease(
467 store,
468 commit,
469 &self.runtime_lease_owner,
470 self.host.core.control.lease_timings,
471 Arc::clone(&self.host.core.clock),
472 )
473 .await
474 .map_err(|err| {
475 PluginOperationInvokeError::Failed(format!(
476 "failed to persist plugin runtime events: {err}"
477 ))
478 })?;
479 self.state.apply_persisted_commit_result(result);
480 }
481 Ok(())
482 }
483
484 async fn persist_plugin_operation_state_if_needed(
485 &mut self,
486 ) -> Result<(), PluginOperationInvokeError> {
487 let Some(store) = self
488 .session
489 .as_ref()
490 .and_then(|session| session.history_store())
491 else {
492 return Ok(());
493 };
494 let commit = crate::store::RuntimeCommit::persisted_state(&self.state, &[]);
495 let result = super::commit_runtime_state_with_fresh_session_execution_lease(
496 store,
497 commit,
498 &self.runtime_lease_owner,
499 self.host.core.control.lease_timings,
500 Arc::clone(&self.host.core.clock),
501 )
502 .await
503 .map_err(|err| {
504 PluginOperationInvokeError::Failed(format!(
505 "failed to persist plugin operation state: {err}"
506 ))
507 })?;
508 self.state.apply_persisted_commit_result(result);
509 Ok(())
510 }
511}