1use std::sync::Arc;
7
8use crate::{PluginOperationInvokeError, SessionError};
9
10use super::LashRuntime;
11use super::state::{
12 RuntimeSessionState, append_session_nodes_to_state_with_clock, normalize_session_graph,
13};
14
15impl LashRuntime {
16 pub fn set_persisted_state(&mut self, state: RuntimeSessionState) -> Result<(), SessionError> {
18 let mut state = state;
19 normalize_session_graph(&mut state);
20 if let Some(session) = self.session.as_ref() {
21 session.invalidate_runtime_caches();
22 if let Some(tool_state) = state.tool_state_snapshot.clone() {
28 let report = session
29 .plugins()
30 .tool_registry()
31 .restore_state(tool_state)
32 .map_err(|err| SessionError::Protocol(err.to_string()))?;
33 if !report.orphaned.is_empty() {
34 tracing::warn!(
35 session_id = %state.session_id,
36 orphaned = ?report.orphaned,
37 "persisted state installed with orphaned tools: no registered \
38 source resolves them; they are Off until their source returns"
39 );
40 }
41 }
42 let snapshot = state.plugin_snapshot.clone().unwrap_or_default();
43 session
44 .plugins()
45 .restore(&snapshot)
46 .map_err(|err| SessionError::Protocol(err.to_string()))?;
47 state.plugin_snapshot_revision =
48 Some(session.plugins().snapshot_revision_fingerprint());
49 }
50 self.policy = state.policy.clone();
51 self.protocol_turn_options = state.protocol_turn_options.clone();
52 self.state = state;
53 Ok(())
54 }
55
56 pub async fn append_session_nodes(
57 &mut self,
58 request: crate::AppendSessionNodesRequest,
59 ) -> Result<crate::AppendSessionNodesResult, SessionError> {
60 self.refresh_session_graph_from_store().await?;
61 if let Some(required) = request.requires_ancestor_node_id.as_deref()
62 && !self.state.session_graph.active_path_contains(required)
63 {
64 return Ok(crate::AppendSessionNodesResult::StaleBranch {
65 current_leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
66 });
67 }
68 let node_ids = append_session_nodes_to_state_with_clock(
69 &mut self.state,
70 &request.nodes,
71 self.host.core.clock.as_ref(),
72 );
73 if let Some(session) = self.session.as_mut() {
74 let protocol_session = Arc::clone(session.plugins().protocol_session());
75 let session_id = self.state.session_id.clone();
76 protocol_session
77 .append_session_nodes(
78 crate::plugin::ProtocolSessionContext::new(session, &session_id),
79 &request.nodes,
80 )
81 .await?;
82 }
83 self.stamp_live_plugin_state();
84 if let Some(store) = self
85 .session
86 .as_ref()
87 .and_then(|session| session.history_store())
88 {
89 let graph = crate::store::GraphCommitDelta::Append {
90 nodes: node_ids
91 .iter()
92 .filter_map(|id| self.state.session_graph.find_node(id).cloned())
93 .collect(),
94 leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
95 };
96 let commit = crate::store::RuntimeCommit::persisted_state_with_graph_commit(
97 &self.state,
98 graph,
99 &[],
100 );
101 match store.commit_runtime_state(commit).await {
102 Ok(result) => self.state.apply_persisted_commit_result(result),
103 Err(err) => tracing::warn!("failed to persist runtime state: {err}"),
104 }
105 }
106 Ok(crate::AppendSessionNodesResult::Appended {
107 node_ids,
108 leaf_node_id: self
109 .state
110 .session_graph
111 .leaf_node_id
112 .clone()
113 .unwrap_or_default(),
114 })
115 }
116
117 pub async fn apply_protocol_session_extension(
118 &mut self,
119 extension: crate::ProtocolSessionExtensionHandle,
120 ) -> Result<(), SessionError> {
121 let Some(session) = self.session.as_ref() else {
122 return Err(SessionError::Protocol(
123 "runtime session is not available".to_string(),
124 ));
125 };
126 let protocol_session = Arc::clone(session.plugins().protocol_session());
127 protocol_session.apply_session_extension(extension).await
128 }
129
130 pub async fn validate_protocol_turn_extension(
131 &mut self,
132 extension: &crate::ProtocolTurnExtensionHandle,
133 ) -> Result<(), SessionError> {
134 let Some(session) = self.session.as_ref() else {
135 return Err(SessionError::Protocol(
136 "runtime session is not available".to_string(),
137 ));
138 };
139 let protocol_session = Arc::clone(session.plugins().protocol_session());
140 protocol_session.validate_turn_extension(extension).await
141 }
142
143 pub async fn branch_to_node(
144 &mut self,
145 node_id: Option<String>,
146 ) -> Result<crate::SessionSnapshot, SessionError> {
147 let mut state = self.export_state();
148 state.session_graph.branch_to(node_id);
149 let mut persisted_state = RuntimeSessionState::from_snapshot(state);
150 normalize_session_graph(&mut persisted_state);
151
152 let policy = persisted_state.policy.clone();
153 let host = self.host.clone();
154 let services = self.services.clone();
155 let managed_sessions = Arc::clone(&self.managed_sessions);
156 let managed_turns = Arc::clone(&self.managed_turns);
157 let process_sync_needed = Arc::clone(&self.process_sync_needed);
158 let runtime_scope_id = Arc::clone(&self.runtime_scope_id);
159 let turn_phase_probe = self.turn_phase_probe.clone();
160
161 let mut rebuilt = Self::from_host_state(policy, host, services, persisted_state).await?;
162 rebuilt.managed_sessions = managed_sessions;
163 rebuilt.managed_turns = managed_turns;
164 rebuilt.process_sync_needed = process_sync_needed;
165 rebuilt.runtime_scope_id = runtime_scope_id;
166 rebuilt.turn_phase_probe = turn_phase_probe;
167
168 let exported = rebuilt.export_state();
169 *self = rebuilt;
170 Ok(exported)
171 }
172
173 pub async fn activate_managed_session(&mut self, session_id: &str) -> Result<(), SessionError> {
179 let child = {
180 let mut registry = self.managed_sessions.lock().await;
181 registry.remove(session_id).ok_or_else(|| {
182 SessionError::Protocol(format!("unknown managed session `{session_id}`"))
183 })?
184 };
185 let child = child.try_into_runtime().map_err(|_| {
186 SessionError::Protocol(format!("managed session `{session_id}` is still in use"))
187 })?;
188 *self = child;
189 Ok(())
190 }
191
192 pub async fn snapshot_execution_state(&mut self) -> Result<Option<Vec<u8>>, SessionError> {
194 let Some(session) = self.session.as_mut() else {
195 return Err(SessionError::Protocol(
196 "runtime session not available".to_string(),
197 ));
198 };
199 let code_executor = session
200 .plugins()
201 .code_executor()
202 .ok_or(SessionError::CodeExecutionUnavailable)?;
203 let session_id = self.state.session_id.clone();
204 let blob = code_executor
205 .snapshot_execution_state(crate::plugin::ProtocolSessionContext::new(
206 session,
207 &session_id,
208 ))
209 .await?;
210 self.state.execution_state_snapshot = blob.clone();
211 Ok(blob)
212 }
213
214 pub async fn restore_execution_state(&mut self, snapshot: &[u8]) -> Result<(), SessionError> {
216 let Some(session) = self.session.as_mut() else {
217 return Err(SessionError::Protocol(
218 "runtime session not available".to_string(),
219 ));
220 };
221 let code_executor = session
222 .plugins()
223 .code_executor()
224 .ok_or(SessionError::CodeExecutionUnavailable)?;
225 let session_id = self.state.session_id.clone();
226 code_executor
227 .restore_execution_state(
228 crate::plugin::ProtocolSessionContext::new(session, &session_id),
229 snapshot,
230 )
231 .await?;
232 self.state.execution_state_snapshot = Some(snapshot.to_vec());
233 Ok(())
234 }
235
236 pub async fn list_trigger_registrations(
237 &self,
238 ) -> Result<Vec<crate::TriggerRegistration>, SessionError> {
239 let store = self.host.trigger_store.as_ref().ok_or_else(|| {
240 SessionError::Protocol("trigger store is unavailable in this runtime".to_string())
241 })?;
242 let records = store
243 .list_subscriptions(crate::TriggerSubscriptionFilter::for_session(
244 self.state.session_id.clone(),
245 ))
246 .await
247 .map_err(|err| SessionError::Protocol(err.to_string()))?;
248 Ok(records
249 .iter()
250 .map(crate::TriggerRegistration::from)
251 .collect())
252 }
253
254 pub async fn trigger_registrations_by_source_type(
255 &self,
256 source_type: impl Into<crate::TriggerEventType>,
257 ) -> Result<Vec<crate::TriggerRegistration>, SessionError> {
258 let store = self.host.trigger_store.as_ref().ok_or_else(|| {
259 SessionError::Protocol("trigger store is unavailable in this runtime".to_string())
260 })?;
261 let mut filter =
262 crate::TriggerSubscriptionFilter::for_session(self.state.session_id.clone());
263 filter.source_type = Some(source_type.into().to_string());
264 let records = store
265 .list_subscriptions(filter)
266 .await
267 .map_err(|err| SessionError::Protocol(err.to_string()))?;
268 Ok(records
269 .iter()
270 .map(crate::TriggerRegistration::from)
271 .collect())
272 }
273
274 pub async fn query_plugin(
275 &self,
276 name: &str,
277 args: serde_json::Value,
278 session_id: Option<String>,
279 ) -> Result<(String, serde_json::Value), PluginOperationInvokeError> {
280 let manager = self.runtime_session_services()?;
281 let Some(session) = self.session.as_ref() else {
282 return Err(PluginOperationInvokeError::Unknown(
283 "runtime session not available".to_string(),
284 ));
285 };
286 session
287 .plugins()
288 .query_plugin(
289 name,
290 args,
291 session_id,
292 true,
293 manager.read_service(),
294 manager.process_read_service(),
295 )
296 .await
297 }
298
299 pub async fn run_plugin_command(
300 &mut self,
301 name: &str,
302 args: serde_json::Value,
303 session_id: Option<String>,
304 ) -> Result<crate::PluginCommandReceipt<serde_json::Value>, PluginOperationInvokeError> {
305 let manager = self.runtime_session_services()?;
306 let Some(session) = self.session.as_ref() else {
307 return Err(PluginOperationInvokeError::Unknown(
308 "runtime session not available".to_string(),
309 ));
310 };
311 let (plugin_id, outcome) = session
312 .plugins()
313 .run_plugin_command(
314 name,
315 args,
316 session_id,
317 true,
318 manager.state_service(),
319 manager.lifecycle_service(),
320 manager.graph_service(),
321 manager.process_service(),
322 )
323 .await?;
324 let (events, queued_batches) = self
325 .apply_plugin_operation_effects(&plugin_id, outcome.events, outcome.directives)
326 .await?;
327 Ok(crate::PluginCommandReceipt {
328 output: outcome.output,
329 events,
330 queued_batches,
331 })
332 }
333
334 pub async fn run_plugin_task(
335 &mut self,
336 name: &str,
337 args: serde_json::Value,
338 session_id: Option<String>,
339 scoped_effect_controller: crate::ScopedEffectController<'static>,
340 cancellation_token: tokio_util::sync::CancellationToken,
341 ) -> Result<crate::PluginTaskReceipt<serde_json::Value>, PluginOperationInvokeError> {
342 let manager = self.runtime_session_services()?;
343 let Some(session) = self.session.as_ref() else {
344 return Err(PluginOperationInvokeError::Unknown(
345 "runtime session not available".to_string(),
346 ));
347 };
348 let (plugin_id, outcome) = session
349 .plugins()
350 .run_plugin_task(
351 name,
352 args,
353 session_id,
354 true,
355 manager.state_service(),
356 manager.lifecycle_service(),
357 manager.graph_service(),
358 manager.process_service(),
359 scoped_effect_controller,
360 cancellation_token,
361 )
362 .await?;
363 let (events, queued_batches) = self
364 .apply_plugin_operation_effects(&plugin_id, outcome.events, outcome.directives)
365 .await?;
366 Ok(crate::PluginTaskReceipt {
367 output: outcome.output,
368 events,
369 queued_batches,
370 })
371 }
372
373 async fn apply_plugin_operation_effects(
374 &mut self,
375 plugin_id: &str,
376 events: Vec<crate::PluginRuntimeEvent>,
377 directives: Vec<crate::PluginRuntimeDirective>,
378 ) -> Result<
379 (
380 Vec<crate::PluginOwned<crate::PluginRuntimeEvent>>,
381 Vec<crate::runtime::QueuedWorkBatch>,
382 ),
383 PluginOperationInvokeError,
384 > {
385 let owned_events = events
386 .into_iter()
387 .map(|event| crate::PluginOwned {
388 plugin_id: plugin_id.to_string(),
389 value: event,
390 })
391 .collect::<Vec<_>>();
392 if !owned_events.is_empty() {
393 let nodes = owned_events
394 .iter()
395 .map(|owned| {
396 crate::plugin_runtime_protocol_event(&owned.plugin_id, owned.value.clone())
397 .map(crate::SessionAppendNode::protocol_event)
398 .map_err(|err| {
399 PluginOperationInvokeError::Failed(format!(
400 "failed to encode plugin runtime event: {err}"
401 ))
402 })
403 })
404 .collect::<Result<Vec<_>, _>>()?;
405 self.append_plugin_runtime_event_nodes(&nodes).await?;
406 }
407 self.stamp_live_plugin_state();
408 self.persist_plugin_operation_state_if_needed().await?;
409
410 let mut queued_batches = Vec::new();
411 for directive in directives {
412 match directive {
413 crate::PluginRuntimeDirective::QueueTurn {
414 input,
415 delivery_policy,
416 slot_policy,
417 source_key,
418 } => {
419 let batch = self
420 .enqueue_turn_input(input, delivery_policy, slot_policy, source_key)
421 .await
422 .map_err(|err| {
423 PluginOperationInvokeError::Failed(format!(
424 "failed to queue plugin turn request: {err}"
425 ))
426 })?;
427 queued_batches.push(batch);
428 }
429 }
430 }
431
432 Ok((owned_events, queued_batches))
433 }
434
435 async fn append_plugin_runtime_event_nodes(
436 &mut self,
437 nodes: &[crate::SessionAppendNode],
438 ) -> Result<(), PluginOperationInvokeError> {
439 let node_ids = append_session_nodes_to_state_with_clock(
440 &mut self.state,
441 nodes,
442 self.host.core.clock.as_ref(),
443 );
444 if let Some(store) = self
445 .session
446 .as_ref()
447 .and_then(|session| session.history_store())
448 {
449 let graph = crate::store::GraphCommitDelta::Append {
450 nodes: node_ids
451 .iter()
452 .filter_map(|id| self.state.session_graph.find_node(id).cloned())
453 .collect(),
454 leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
455 };
456 let commit = crate::store::RuntimeCommit::persisted_state_with_graph_commit(
457 &self.state,
458 graph,
459 &[],
460 );
461 let result = store.commit_runtime_state(commit).await.map_err(|err| {
462 PluginOperationInvokeError::Failed(format!(
463 "failed to persist plugin runtime events: {err}"
464 ))
465 })?;
466 self.state.apply_persisted_commit_result(result);
467 }
468 Ok(())
469 }
470
471 async fn persist_plugin_operation_state_if_needed(
472 &mut self,
473 ) -> Result<(), PluginOperationInvokeError> {
474 let Some(store) = self
475 .session
476 .as_ref()
477 .and_then(|session| session.history_store())
478 else {
479 return Ok(());
480 };
481 let commit = crate::store::RuntimeCommit::persisted_state(&self.state, &[]);
482 let result = store.commit_runtime_state(commit).await.map_err(|err| {
483 PluginOperationInvokeError::Failed(format!(
484 "failed to persist plugin operation state: {err}"
485 ))
486 })?;
487 self.state.apply_persisted_commit_result(result);
488 Ok(())
489 }
490}