1use std::sync::Arc;
7
8use crate::{PluginOperationInvokeError, SessionError};
9
10use super::LashRuntime;
11use super::state::{
12 RuntimeSessionState, append_session_nodes_to_state_with_clock, normalize_session_graph,
13};
14
15impl LashRuntime {
16 pub fn set_persisted_state(&mut self, state: RuntimeSessionState) -> Result<(), SessionError> {
18 let mut state = state;
19 normalize_session_graph(&mut state);
20 if let Some(session) = self.session.as_ref() {
21 session.invalidate_runtime_caches();
22 if let Some(tool_state) = state.tool_state_snapshot.clone() {
28 let report = session
29 .plugins()
30 .tool_registry()
31 .restore_state(tool_state)
32 .map_err(|err| SessionError::Protocol(err.to_string()))?;
33 if !report.orphaned.is_empty() {
34 tracing::warn!(
35 session_id = %state.session_id,
36 orphaned = ?report.orphaned,
37 "persisted state installed with orphaned tools: no registered \
38 source resolves them; they are Off until their source returns"
39 );
40 }
41 }
42 let snapshot = state.plugin_snapshot.clone().unwrap_or_default();
43 session
44 .plugins()
45 .restore(&snapshot)
46 .map_err(|err| SessionError::Protocol(err.to_string()))?;
47 state.plugin_snapshot_revision =
48 Some(session.plugins().snapshot_revision_fingerprint());
49 }
50 self.policy = state.policy.clone();
51 self.protocol_turn_options = state.protocol_turn_options.clone();
52 self.state = state;
53 Ok(())
54 }
55
56 pub async fn append_session_nodes(
57 &mut self,
58 request: crate::AppendSessionNodesRequest,
59 ) -> Result<crate::AppendSessionNodesResult, SessionError> {
60 self.refresh_session_graph_from_store().await?;
61 if let Some(required) = request.requires_ancestor_node_id.as_deref()
62 && !self.state.session_graph.active_path_contains(required)
63 {
64 return Ok(crate::AppendSessionNodesResult::StaleBranch {
65 current_leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
66 });
67 }
68 let node_ids = append_session_nodes_to_state_with_clock(
69 &mut self.state,
70 &request.nodes,
71 self.host.core.clock.as_ref(),
72 );
73 if let Some(session) = self.session.as_mut() {
74 let protocol_session = Arc::clone(session.plugins().protocol_session());
75 let session_id = self.state.session_id.clone();
76 protocol_session
77 .append_session_nodes(
78 crate::plugin::ProtocolSessionContext::new(session, &session_id),
79 &request.nodes,
80 )
81 .await?;
82 }
83 self.stamp_live_plugin_state();
84 if let Some(store) = self
85 .session
86 .as_ref()
87 .and_then(|session| session.history_store())
88 {
89 let graph = crate::store::GraphCommitDelta::Append {
90 nodes: node_ids
91 .iter()
92 .filter_map(|id| self.state.session_graph.find_node(id).cloned())
93 .collect(),
94 leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
95 };
96 let commit = crate::store::RuntimeCommit::persisted_state_with_graph_commit(
97 &self.state,
98 graph,
99 &[],
100 );
101 match super::commit_runtime_state_with_fresh_session_execution_lease(
102 store,
103 commit,
104 "runtime.append_session_nodes",
105 Arc::clone(&self.host.core.clock),
106 )
107 .await
108 {
109 Ok(result) => self.state.apply_persisted_commit_result(result),
110 Err(err) => tracing::warn!("failed to persist runtime state: {err}"),
111 }
112 }
113 Ok(crate::AppendSessionNodesResult::Appended {
114 node_ids,
115 leaf_node_id: self
116 .state
117 .session_graph
118 .leaf_node_id
119 .clone()
120 .unwrap_or_default(),
121 })
122 }
123
124 pub async fn apply_protocol_session_extension(
125 &mut self,
126 extension: crate::ProtocolSessionExtensionHandle,
127 ) -> Result<(), SessionError> {
128 let Some(session) = self.session.as_ref() else {
129 return Err(SessionError::Protocol(
130 "runtime session is not available".to_string(),
131 ));
132 };
133 let protocol_session = Arc::clone(session.plugins().protocol_session());
134 protocol_session.apply_session_extension(extension).await
135 }
136
137 pub async fn validate_protocol_turn_extension(
138 &mut self,
139 extension: &crate::ProtocolTurnExtensionHandle,
140 ) -> Result<(), SessionError> {
141 let Some(session) = self.session.as_ref() else {
142 return Err(SessionError::Protocol(
143 "runtime session is not available".to_string(),
144 ));
145 };
146 let protocol_session = Arc::clone(session.plugins().protocol_session());
147 protocol_session.validate_turn_extension(extension).await
148 }
149
150 pub async fn branch_to_node(
151 &mut self,
152 node_id: Option<String>,
153 ) -> Result<crate::SessionSnapshot, SessionError> {
154 let mut state = self.export_state();
155 state.session_graph.branch_to(node_id);
156 let mut persisted_state = RuntimeSessionState::from_snapshot(state);
157 normalize_session_graph(&mut persisted_state);
158
159 let policy = persisted_state.policy.clone();
160 let host = self.host.clone();
161 let services = self.services.clone();
162 let managed_sessions = Arc::clone(&self.managed_sessions);
163 let managed_turns = Arc::clone(&self.managed_turns);
164 let process_sync_needed = Arc::clone(&self.process_sync_needed);
165 let runtime_scope_id = Arc::clone(&self.runtime_scope_id);
166 let turn_phase_probe = self.turn_phase_probe.clone();
167
168 let mut rebuilt = Self::from_host_state(policy, host, services, persisted_state).await?;
169 rebuilt.managed_sessions = managed_sessions;
170 rebuilt.managed_turns = managed_turns;
171 rebuilt.process_sync_needed = process_sync_needed;
172 rebuilt.runtime_scope_id = runtime_scope_id;
173 rebuilt.turn_phase_probe = turn_phase_probe;
174
175 let exported = rebuilt.export_state();
176 *self = rebuilt;
177 Ok(exported)
178 }
179
180 pub async fn activate_managed_session(&mut self, session_id: &str) -> Result<(), SessionError> {
186 let child = {
187 let mut registry = self.managed_sessions.lock().await;
188 registry.remove(session_id).ok_or_else(|| {
189 SessionError::Protocol(format!("unknown managed session `{session_id}`"))
190 })?
191 };
192 let child = child.try_into_runtime().map_err(|_| {
193 SessionError::Protocol(format!("managed session `{session_id}` is still in use"))
194 })?;
195 *self = child;
196 Ok(())
197 }
198
199 pub async fn snapshot_execution_state(&mut self) -> Result<Option<Vec<u8>>, SessionError> {
201 let Some(session) = self.session.as_mut() else {
202 return Err(SessionError::Protocol(
203 "runtime session not available".to_string(),
204 ));
205 };
206 let code_executor = session
207 .plugins()
208 .code_executor()
209 .ok_or(SessionError::CodeExecutionUnavailable)?;
210 let session_id = self.state.session_id.clone();
211 let blob = code_executor
212 .snapshot_execution_state(crate::plugin::ProtocolSessionContext::new(
213 session,
214 &session_id,
215 ))
216 .await?;
217 self.state.execution_state_snapshot = blob.clone();
218 Ok(blob)
219 }
220
221 pub async fn restore_execution_state(&mut self, snapshot: &[u8]) -> Result<(), SessionError> {
223 let Some(session) = self.session.as_mut() else {
224 return Err(SessionError::Protocol(
225 "runtime session not available".to_string(),
226 ));
227 };
228 let code_executor = session
229 .plugins()
230 .code_executor()
231 .ok_or(SessionError::CodeExecutionUnavailable)?;
232 let session_id = self.state.session_id.clone();
233 code_executor
234 .restore_execution_state(
235 crate::plugin::ProtocolSessionContext::new(session, &session_id),
236 snapshot,
237 )
238 .await?;
239 self.state.execution_state_snapshot = Some(snapshot.to_vec());
240 Ok(())
241 }
242
243 pub async fn list_trigger_registrations(
244 &self,
245 ) -> Result<Vec<crate::TriggerRegistration>, SessionError> {
246 let store = self.host.trigger_store.as_ref().ok_or_else(|| {
247 SessionError::Protocol("trigger store is unavailable in this runtime".to_string())
248 })?;
249 let records = store
250 .list_subscriptions(crate::TriggerSubscriptionFilter::for_session(
251 self.state.session_id.clone(),
252 ))
253 .await
254 .map_err(|err| SessionError::Protocol(err.to_string()))?;
255 Ok(records
256 .iter()
257 .map(crate::TriggerRegistration::from)
258 .collect())
259 }
260
261 pub async fn trigger_registrations_by_source_type(
262 &self,
263 source_type: impl Into<crate::TriggerEventType>,
264 ) -> Result<Vec<crate::TriggerRegistration>, SessionError> {
265 let store = self.host.trigger_store.as_ref().ok_or_else(|| {
266 SessionError::Protocol("trigger store is unavailable in this runtime".to_string())
267 })?;
268 let mut filter =
269 crate::TriggerSubscriptionFilter::for_session(self.state.session_id.clone());
270 filter.source_type = Some(source_type.into().to_string());
271 let records = store
272 .list_subscriptions(filter)
273 .await
274 .map_err(|err| SessionError::Protocol(err.to_string()))?;
275 Ok(records
276 .iter()
277 .map(crate::TriggerRegistration::from)
278 .collect())
279 }
280
281 pub async fn query_plugin(
282 &self,
283 name: &str,
284 args: serde_json::Value,
285 session_id: Option<String>,
286 ) -> Result<(String, serde_json::Value), PluginOperationInvokeError> {
287 let manager = self.runtime_session_services()?;
288 let Some(session) = self.session.as_ref() else {
289 return Err(PluginOperationInvokeError::Unknown(
290 "runtime session not available".to_string(),
291 ));
292 };
293 session
294 .plugins()
295 .query_plugin(
296 name,
297 args,
298 session_id,
299 true,
300 manager.read_service(),
301 manager.process_read_service(),
302 )
303 .await
304 }
305
306 pub async fn run_plugin_command(
307 &mut self,
308 name: &str,
309 args: serde_json::Value,
310 session_id: Option<String>,
311 ) -> Result<crate::PluginCommandReceipt<serde_json::Value>, PluginOperationInvokeError> {
312 let manager = self.runtime_session_services()?;
313 let Some(session) = self.session.as_ref() else {
314 return Err(PluginOperationInvokeError::Unknown(
315 "runtime session not available".to_string(),
316 ));
317 };
318 let (plugin_id, outcome) = session
319 .plugins()
320 .run_plugin_command(
321 name,
322 args,
323 session_id,
324 true,
325 manager.state_service(),
326 manager.lifecycle_service(),
327 manager.graph_service(),
328 manager.process_service(),
329 )
330 .await?;
331 let (events, queued_batches) = self
332 .apply_plugin_operation_effects(&plugin_id, outcome.events, outcome.directives)
333 .await?;
334 Ok(crate::PluginCommandReceipt {
335 output: outcome.output,
336 events,
337 queued_batches,
338 })
339 }
340
341 pub async fn run_plugin_task(
342 &mut self,
343 name: &str,
344 args: serde_json::Value,
345 session_id: Option<String>,
346 scoped_effect_controller: crate::ScopedEffectController<'static>,
347 cancellation_token: tokio_util::sync::CancellationToken,
348 ) -> Result<crate::PluginTaskReceipt<serde_json::Value>, PluginOperationInvokeError> {
349 let manager = self.runtime_session_services()?;
350 let Some(session) = self.session.as_ref() else {
351 return Err(PluginOperationInvokeError::Unknown(
352 "runtime session not available".to_string(),
353 ));
354 };
355 let (plugin_id, outcome) = session
356 .plugins()
357 .run_plugin_task(
358 name,
359 args,
360 session_id,
361 true,
362 manager.state_service(),
363 manager.lifecycle_service(),
364 manager.graph_service(),
365 manager.process_service(),
366 scoped_effect_controller,
367 cancellation_token,
368 )
369 .await?;
370 let (events, queued_batches) = self
371 .apply_plugin_operation_effects(&plugin_id, outcome.events, outcome.directives)
372 .await?;
373 Ok(crate::PluginTaskReceipt {
374 output: outcome.output,
375 events,
376 queued_batches,
377 })
378 }
379
380 async fn apply_plugin_operation_effects(
381 &mut self,
382 plugin_id: &str,
383 events: Vec<crate::PluginRuntimeEvent>,
384 directives: Vec<crate::PluginRuntimeDirective>,
385 ) -> Result<
386 (
387 Vec<crate::PluginOwned<crate::PluginRuntimeEvent>>,
388 Vec<crate::runtime::QueuedWorkBatch>,
389 ),
390 PluginOperationInvokeError,
391 > {
392 let owned_events = events
393 .into_iter()
394 .map(|event| crate::PluginOwned {
395 plugin_id: plugin_id.to_string(),
396 value: event,
397 })
398 .collect::<Vec<_>>();
399 if !owned_events.is_empty() {
400 let nodes = owned_events
401 .iter()
402 .map(|owned| {
403 crate::plugin_runtime_protocol_event(&owned.plugin_id, owned.value.clone())
404 .map(crate::SessionAppendNode::protocol_event)
405 .map_err(|err| {
406 PluginOperationInvokeError::Failed(format!(
407 "failed to encode plugin runtime event: {err}"
408 ))
409 })
410 })
411 .collect::<Result<Vec<_>, _>>()?;
412 self.append_plugin_runtime_event_nodes(&nodes).await?;
413 }
414 self.stamp_live_plugin_state();
415 self.persist_plugin_operation_state_if_needed().await?;
416
417 let mut queued_batches = Vec::new();
418 for directive in directives {
419 match directive {
420 crate::PluginRuntimeDirective::QueueTurn {
421 input,
422 delivery_policy,
423 slot_policy,
424 source_key,
425 } => {
426 let batch = self
427 .enqueue_turn_input(input, delivery_policy, slot_policy, source_key)
428 .await
429 .map_err(|err| {
430 PluginOperationInvokeError::Failed(format!(
431 "failed to queue plugin turn request: {err}"
432 ))
433 })?;
434 queued_batches.push(batch);
435 }
436 }
437 }
438
439 Ok((owned_events, queued_batches))
440 }
441
442 async fn append_plugin_runtime_event_nodes(
443 &mut self,
444 nodes: &[crate::SessionAppendNode],
445 ) -> Result<(), PluginOperationInvokeError> {
446 let node_ids = append_session_nodes_to_state_with_clock(
447 &mut self.state,
448 nodes,
449 self.host.core.clock.as_ref(),
450 );
451 if let Some(store) = self
452 .session
453 .as_ref()
454 .and_then(|session| session.history_store())
455 {
456 let graph = crate::store::GraphCommitDelta::Append {
457 nodes: node_ids
458 .iter()
459 .filter_map(|id| self.state.session_graph.find_node(id).cloned())
460 .collect(),
461 leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
462 };
463 let commit = crate::store::RuntimeCommit::persisted_state_with_graph_commit(
464 &self.state,
465 graph,
466 &[],
467 );
468 let result = super::commit_runtime_state_with_fresh_session_execution_lease(
469 store,
470 commit,
471 "runtime.plugin_runtime_events",
472 Arc::clone(&self.host.core.clock),
473 )
474 .await
475 .map_err(|err| {
476 PluginOperationInvokeError::Failed(format!(
477 "failed to persist plugin runtime events: {err}"
478 ))
479 })?;
480 self.state.apply_persisted_commit_result(result);
481 }
482 Ok(())
483 }
484
485 async fn persist_plugin_operation_state_if_needed(
486 &mut self,
487 ) -> Result<(), PluginOperationInvokeError> {
488 let Some(store) = self
489 .session
490 .as_ref()
491 .and_then(|session| session.history_store())
492 else {
493 return Ok(());
494 };
495 let commit = crate::store::RuntimeCommit::persisted_state(&self.state, &[]);
496 let result = super::commit_runtime_state_with_fresh_session_execution_lease(
497 store,
498 commit,
499 "runtime.plugin_operation_state",
500 Arc::clone(&self.host.core.clock),
501 )
502 .await
503 .map_err(|err| {
504 PluginOperationInvokeError::Failed(format!(
505 "failed to persist plugin operation state: {err}"
506 ))
507 })?;
508 self.state.apply_persisted_commit_result(result);
509 Ok(())
510 }
511}