1use std::sync::Arc;
7
8use crate::{PluginOperationInvokeError, SessionError};
9
10use super::LashRuntime;
11use super::state::{
12 RuntimeSessionState, append_session_nodes_to_state_with_clock, normalize_session_graph,
13};
14
15impl LashRuntime {
16 pub fn set_persisted_state(&mut self, state: RuntimeSessionState) -> Result<(), SessionError> {
18 let mut state = state;
19 normalize_session_graph(&mut state);
20 if let Some(session) = self.session.as_ref() {
21 session.invalidate_runtime_caches();
22 if let Some(tool_state) = state.tool_state_snapshot.clone() {
28 let report = session
29 .plugins()
30 .tool_registry()
31 .restore_state(tool_state)
32 .map_err(|err| SessionError::Protocol(err.to_string()))?;
33 if !report.orphaned.is_empty() {
34 tracing::warn!(
35 session_id = %state.session_id,
36 orphaned = ?report.orphaned,
37 "persisted state installed with orphaned tools: no registered \
38 source resolves them; they remain non-members until their source returns"
39 );
40 }
41 }
42 let snapshot = state.plugin_snapshot.clone().unwrap_or_default();
43 session
44 .plugins()
45 .restore(&snapshot)
46 .map_err(|err| SessionError::Protocol(err.to_string()))?;
47 state.plugin_snapshot_revision =
48 Some(session.plugins().snapshot_revision_fingerprint());
49 }
50 self.policy = state.policy.clone();
51 self.protocol_turn_options = state.protocol_turn_options.clone();
52 self.state = state;
53 Ok(())
54 }
55
56 pub async fn append_session_nodes(
57 &mut self,
58 request: crate::AppendSessionNodesRequest,
59 ) -> Result<crate::AppendSessionNodesResult, SessionError> {
60 self.refresh_session_graph_from_store().await?;
61 if let Some(required) = request.requires_ancestor_node_id.as_deref()
62 && !self.state.session_graph.active_path_contains(required)
63 {
64 return Ok(crate::AppendSessionNodesResult::StaleBranch {
65 current_leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
66 });
67 }
68 let node_ids = append_session_nodes_to_state_with_clock(
69 &mut self.state,
70 &request.nodes,
71 self.host.core.clock.as_ref(),
72 );
73 if let Some(session) = self.session.as_mut() {
74 let protocol_session = Arc::clone(session.plugins().protocol_session());
75 let session_id = self.state.session_id.clone();
76 protocol_session
77 .append_session_nodes(
78 crate::plugin::ProtocolSessionContext::new(session, &session_id),
79 &request.nodes,
80 )
81 .await?;
82 }
83 self.stamp_live_plugin_state();
84 if let Some(store) = self
85 .session
86 .as_ref()
87 .and_then(|session| session.history_store())
88 {
89 let graph = crate::store::GraphCommitDelta::Append {
90 nodes: node_ids
91 .iter()
92 .filter_map(|id| self.state.session_graph.find_node(id).cloned())
93 .collect(),
94 leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
95 };
96 let commit = crate::store::RuntimeCommit::persisted_state_with_graph_commit(
97 &self.state,
98 graph,
99 &[],
100 );
101 match super::commit_runtime_state_with_fresh_session_execution_lease(
102 store,
103 commit,
104 &self.runtime_lease_owner,
105 Arc::clone(&self.host.core.clock),
106 )
107 .await
108 {
109 Ok(result) => self.state.apply_persisted_commit_result(result),
110 Err(err) => tracing::warn!("failed to persist runtime state: {err}"),
111 }
112 }
113 Ok(crate::AppendSessionNodesResult::Appended {
114 node_ids,
115 leaf_node_id: self
116 .state
117 .session_graph
118 .leaf_node_id
119 .clone()
120 .unwrap_or_default(),
121 })
122 }
123
124 pub async fn apply_protocol_session_extension(
125 &mut self,
126 extension: crate::ProtocolSessionExtensionHandle,
127 ) -> Result<(), SessionError> {
128 let Some(session) = self.session.as_ref() else {
129 return Err(SessionError::Protocol(
130 "runtime session is not available".to_string(),
131 ));
132 };
133 let protocol_session = Arc::clone(session.plugins().protocol_session());
134 protocol_session.apply_session_extension(extension).await
135 }
136
137 pub async fn validate_protocol_turn_extension(
138 &mut self,
139 extension: &crate::ProtocolTurnExtensionHandle,
140 ) -> Result<(), SessionError> {
141 let Some(session) = self.session.as_ref() else {
142 return Err(SessionError::Protocol(
143 "runtime session is not available".to_string(),
144 ));
145 };
146 let protocol_session = Arc::clone(session.plugins().protocol_session());
147 protocol_session.validate_turn_extension(extension).await
148 }
149
150 pub async fn branch_to_node(
151 &mut self,
152 node_id: Option<String>,
153 ) -> Result<crate::SessionSnapshot, SessionError> {
154 let mut state = self.export_state();
155 state.session_graph.branch_to(node_id);
156 let mut persisted_state = RuntimeSessionState::from_snapshot(state);
157 normalize_session_graph(&mut persisted_state);
158
159 let policy = persisted_state.policy.clone();
160 let host = self.host.clone();
161 let services = self.services.clone();
162 let managed_sessions = Arc::clone(&self.managed_sessions);
163 let managed_turns = Arc::clone(&self.managed_turns);
164 let process_sync_needed = Arc::clone(&self.process_sync_needed);
165 let runtime_scope_id = Arc::clone(&self.runtime_scope_id);
166 let runtime_lease_owner = self.runtime_lease_owner.clone();
167 let turn_phase_probe = self.turn_phase_probe.clone();
168
169 let mut rebuilt = Self::from_host_state(policy, host, services, persisted_state).await?;
170 rebuilt.managed_sessions = managed_sessions;
171 rebuilt.managed_turns = managed_turns;
172 rebuilt.process_sync_needed = process_sync_needed;
173 rebuilt.runtime_scope_id = runtime_scope_id;
174 rebuilt.runtime_lease_owner = runtime_lease_owner;
175 rebuilt.turn_phase_probe = turn_phase_probe;
176
177 let exported = rebuilt.export_state();
178 *self = rebuilt;
179 Ok(exported)
180 }
181
182 pub async fn activate_managed_session(&mut self, session_id: &str) -> Result<(), SessionError> {
188 let child = {
189 let mut registry = self.managed_sessions.lock().await;
190 registry.remove(session_id).ok_or_else(|| {
191 SessionError::Protocol(format!("unknown managed session `{session_id}`"))
192 })?
193 };
194 let child = child.try_into_runtime().map_err(|_| {
195 SessionError::Protocol(format!("managed session `{session_id}` is still in use"))
196 })?;
197 *self = child;
198 Ok(())
199 }
200
201 pub async fn snapshot_execution_state(&mut self) -> Result<Option<Vec<u8>>, SessionError> {
203 let Some(session) = self.session.as_mut() else {
204 return Err(SessionError::Protocol(
205 "runtime session not available".to_string(),
206 ));
207 };
208 let code_executor = session
209 .plugins()
210 .code_executor()
211 .ok_or(SessionError::CodeExecutionUnavailable)?;
212 let session_id = self.state.session_id.clone();
213 let blob = code_executor
214 .snapshot_execution_state(crate::plugin::ProtocolSessionContext::new(
215 session,
216 &session_id,
217 ))
218 .await?;
219 self.state.execution_state_snapshot = blob.clone();
220 Ok(blob)
221 }
222
223 pub async fn restore_execution_state(&mut self, snapshot: &[u8]) -> Result<(), SessionError> {
225 let Some(session) = self.session.as_mut() else {
226 return Err(SessionError::Protocol(
227 "runtime session not available".to_string(),
228 ));
229 };
230 let code_executor = session
231 .plugins()
232 .code_executor()
233 .ok_or(SessionError::CodeExecutionUnavailable)?;
234 let session_id = self.state.session_id.clone();
235 code_executor
236 .restore_execution_state(
237 crate::plugin::ProtocolSessionContext::new(session, &session_id),
238 snapshot,
239 )
240 .await?;
241 self.state.execution_state_snapshot = Some(snapshot.to_vec());
242 Ok(())
243 }
244
245 pub async fn list_trigger_registrations(
246 &self,
247 ) -> Result<Vec<crate::TriggerRegistration>, SessionError> {
248 let store = self.host.trigger_store.as_ref().ok_or_else(|| {
249 SessionError::Protocol("trigger store is unavailable in this runtime".to_string())
250 })?;
251 let records = store
252 .list_subscriptions(crate::TriggerSubscriptionFilter::for_session(
253 self.state.session_id.clone(),
254 ))
255 .await
256 .map_err(|err| SessionError::Protocol(err.to_string()))?;
257 Ok(records
258 .iter()
259 .map(crate::TriggerRegistration::from)
260 .collect())
261 }
262
263 pub async fn trigger_registrations_by_source_type(
264 &self,
265 source_type: impl Into<crate::TriggerEventType>,
266 ) -> Result<Vec<crate::TriggerRegistration>, SessionError> {
267 let store = self.host.trigger_store.as_ref().ok_or_else(|| {
268 SessionError::Protocol("trigger store is unavailable in this runtime".to_string())
269 })?;
270 let mut filter =
271 crate::TriggerSubscriptionFilter::for_session(self.state.session_id.clone());
272 filter.source_type = Some(source_type.into().to_string());
273 let records = store
274 .list_subscriptions(filter)
275 .await
276 .map_err(|err| SessionError::Protocol(err.to_string()))?;
277 Ok(records
278 .iter()
279 .map(crate::TriggerRegistration::from)
280 .collect())
281 }
282
283 pub async fn query_plugin(
284 &self,
285 name: &str,
286 args: serde_json::Value,
287 session_id: Option<String>,
288 ) -> Result<(String, serde_json::Value), PluginOperationInvokeError> {
289 let manager = self.runtime_session_services()?;
290 let Some(session) = self.session.as_ref() else {
291 return Err(PluginOperationInvokeError::Unknown(
292 "runtime session not available".to_string(),
293 ));
294 };
295 session
296 .plugins()
297 .query_plugin(
298 name,
299 args,
300 session_id,
301 true,
302 manager.read_service(),
303 manager.process_read_service(),
304 )
305 .await
306 }
307
308 pub async fn run_plugin_command(
309 &mut self,
310 name: &str,
311 args: serde_json::Value,
312 session_id: Option<String>,
313 ) -> Result<crate::PluginCommandReceipt<serde_json::Value>, PluginOperationInvokeError> {
314 let manager = self.runtime_session_services()?;
315 let Some(session) = self.session.as_ref() else {
316 return Err(PluginOperationInvokeError::Unknown(
317 "runtime session not available".to_string(),
318 ));
319 };
320 let (plugin_id, outcome) = session
321 .plugins()
322 .run_plugin_command(
323 name,
324 args,
325 session_id,
326 true,
327 manager.state_service(),
328 manager.lifecycle_service(),
329 manager.graph_service(),
330 manager.process_service(),
331 )
332 .await?;
333 let (events, queued_batches) = self
334 .apply_plugin_operation_effects(&plugin_id, outcome.events, outcome.directives)
335 .await?;
336 Ok(crate::PluginCommandReceipt {
337 output: outcome.output,
338 events,
339 queued_batches,
340 })
341 }
342
343 pub async fn run_plugin_task(
344 &mut self,
345 name: &str,
346 args: serde_json::Value,
347 session_id: Option<String>,
348 scoped_effect_controller: crate::ScopedEffectController<'static>,
349 cancellation_token: tokio_util::sync::CancellationToken,
350 ) -> Result<crate::PluginTaskReceipt<serde_json::Value>, PluginOperationInvokeError> {
351 let manager = self.runtime_session_services()?;
352 let Some(session) = self.session.as_ref() else {
353 return Err(PluginOperationInvokeError::Unknown(
354 "runtime session not available".to_string(),
355 ));
356 };
357 let (plugin_id, outcome) = session
358 .plugins()
359 .run_plugin_task(
360 name,
361 args,
362 session_id,
363 true,
364 manager.state_service(),
365 manager.lifecycle_service(),
366 manager.graph_service(),
367 manager.process_service(),
368 scoped_effect_controller,
369 cancellation_token,
370 )
371 .await?;
372 let (events, queued_batches) = self
373 .apply_plugin_operation_effects(&plugin_id, outcome.events, outcome.directives)
374 .await?;
375 Ok(crate::PluginTaskReceipt {
376 output: outcome.output,
377 events,
378 queued_batches,
379 })
380 }
381
382 async fn apply_plugin_operation_effects(
383 &mut self,
384 plugin_id: &str,
385 events: Vec<crate::PluginRuntimeEvent>,
386 directives: Vec<crate::PluginRuntimeDirective>,
387 ) -> Result<
388 (
389 Vec<crate::PluginOwned<crate::PluginRuntimeEvent>>,
390 Vec<crate::runtime::QueuedWorkBatch>,
391 ),
392 PluginOperationInvokeError,
393 > {
394 let owned_events = events
395 .into_iter()
396 .map(|event| crate::PluginOwned {
397 plugin_id: plugin_id.to_string(),
398 value: event,
399 })
400 .collect::<Vec<_>>();
401 if !owned_events.is_empty() {
402 let nodes = owned_events
403 .iter()
404 .map(|owned| {
405 crate::plugin_runtime_protocol_event(&owned.plugin_id, owned.value.clone())
406 .map(crate::SessionAppendNode::protocol_event)
407 .map_err(|err| {
408 PluginOperationInvokeError::Failed(format!(
409 "failed to encode plugin runtime event: {err}"
410 ))
411 })
412 })
413 .collect::<Result<Vec<_>, _>>()?;
414 self.append_plugin_runtime_event_nodes(&nodes).await?;
415 }
416 self.stamp_live_plugin_state();
417 self.persist_plugin_operation_state_if_needed().await?;
418
419 let mut queued_batches = Vec::new();
420 for directive in directives {
421 match directive {
422 crate::PluginRuntimeDirective::QueueTurn {
423 input,
424 delivery_policy,
425 slot_policy,
426 source_key,
427 } => {
428 let batch = self
429 .enqueue_turn_input(input, delivery_policy, slot_policy, source_key)
430 .await
431 .map_err(|err| {
432 PluginOperationInvokeError::Failed(format!(
433 "failed to queue plugin turn request: {err}"
434 ))
435 })?;
436 queued_batches.push(batch);
437 }
438 }
439 }
440
441 Ok((owned_events, queued_batches))
442 }
443
444 async fn append_plugin_runtime_event_nodes(
445 &mut self,
446 nodes: &[crate::SessionAppendNode],
447 ) -> Result<(), PluginOperationInvokeError> {
448 let node_ids = append_session_nodes_to_state_with_clock(
449 &mut self.state,
450 nodes,
451 self.host.core.clock.as_ref(),
452 );
453 if let Some(store) = self
454 .session
455 .as_ref()
456 .and_then(|session| session.history_store())
457 {
458 let graph = crate::store::GraphCommitDelta::Append {
459 nodes: node_ids
460 .iter()
461 .filter_map(|id| self.state.session_graph.find_node(id).cloned())
462 .collect(),
463 leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
464 };
465 let commit = crate::store::RuntimeCommit::persisted_state_with_graph_commit(
466 &self.state,
467 graph,
468 &[],
469 );
470 let result = super::commit_runtime_state_with_fresh_session_execution_lease(
471 store,
472 commit,
473 &self.runtime_lease_owner,
474 Arc::clone(&self.host.core.clock),
475 )
476 .await
477 .map_err(|err| {
478 PluginOperationInvokeError::Failed(format!(
479 "failed to persist plugin runtime events: {err}"
480 ))
481 })?;
482 self.state.apply_persisted_commit_result(result);
483 }
484 Ok(())
485 }
486
487 async fn persist_plugin_operation_state_if_needed(
488 &mut self,
489 ) -> Result<(), PluginOperationInvokeError> {
490 let Some(store) = self
491 .session
492 .as_ref()
493 .and_then(|session| session.history_store())
494 else {
495 return Ok(());
496 };
497 let commit = crate::store::RuntimeCommit::persisted_state(&self.state, &[]);
498 let result = super::commit_runtime_state_with_fresh_session_execution_lease(
499 store,
500 commit,
501 &self.runtime_lease_owner,
502 Arc::clone(&self.host.core.clock),
503 )
504 .await
505 .map_err(|err| {
506 PluginOperationInvokeError::Failed(format!(
507 "failed to persist plugin operation state: {err}"
508 ))
509 })?;
510 self.state.apply_persisted_commit_result(result);
511 Ok(())
512 }
513}