1use std::sync::Arc;
7
8use crate::{PluginActionInvokeError, SessionError};
9
10use super::LashRuntime;
11use super::state::{RuntimeSessionState, append_session_nodes_to_state, normalize_session_graph};
12
/// Outcome of appending a `lash.host_event` node to the in-memory session state.
struct AppendedHostEvent {
    /// Source type computed from the event's alias and event name.
    source_type: String,
    /// Id of the appended host-event node (only carried in test builds).
    #[cfg(test)]
    node_id: String,
    /// Invocation derived from the session id and the appended node's id.
    invocation: crate::RuntimeInvocation,
    /// Graph delta describing the append, to be handed to a runtime-state commit.
    graph: crate::store::GraphCommitDelta,
}
20
/// Outcome of appending a trigger-activation node to the session graph.
struct AppendedActivation {
    /// Id of the appended activation node.
    node_id: String,
    /// Invocation derived from the session id and `node_id`.
    invocation: crate::RuntimeInvocation,
}
25
26impl LashRuntime {
27 pub fn set_persisted_state(&mut self, state: RuntimeSessionState) -> Result<(), SessionError> {
29 let mut state = state;
30 normalize_session_graph(&mut state);
31 if let Some(session) = self.session.as_ref() {
32 session.invalidate_runtime_caches();
33 if let Some(tool_state) = state.tool_state_snapshot.clone() {
39 session
40 .plugins()
41 .tool_registry()
42 .restore_state(tool_state)
43 .map_err(|err| SessionError::Protocol(err.to_string()))?;
44 }
45 let snapshot = state.plugin_snapshot.clone().unwrap_or_default();
46 session
47 .plugins()
48 .restore(&snapshot)
49 .map_err(|err| SessionError::Protocol(err.to_string()))?;
50 state.plugin_snapshot_revision =
51 Some(session.plugins().snapshot_revision_fingerprint());
52 }
53 self.policy = state.policy.clone();
54 self.protocol_turn_options = state.protocol_turn_options.clone();
55 self.state = state;
56 Ok(())
57 }
58
59 pub async fn append_session_nodes(
60 &mut self,
61 request: crate::AppendSessionNodesRequest,
62 ) -> Result<crate::AppendSessionNodesResult, SessionError> {
63 self.refresh_session_graph_from_store().await?;
64 if let Some(required) = request.requires_ancestor_node_id.as_deref()
65 && !self.state.session_graph.active_path_contains(required)
66 {
67 return Ok(crate::AppendSessionNodesResult::StaleBranch {
68 current_leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
69 });
70 }
71 let node_ids = append_session_nodes_to_state(&mut self.state, &request.nodes);
72 if let Some(session) = self.session.as_mut() {
73 let protocol_session = Arc::clone(session.plugins().protocol_session());
74 let session_id = self.state.session_id.clone();
75 protocol_session
76 .append_session_nodes(
77 crate::plugin::ProtocolSessionContext::new(session, &session_id),
78 &request.nodes,
79 )
80 .await?;
81 }
82 self.stamp_live_plugin_state();
83 if let Some(store) = self
84 .session
85 .as_ref()
86 .and_then(|session| session.history_store())
87 {
88 let graph = crate::store::GraphCommitDelta::Append {
89 nodes: node_ids
90 .iter()
91 .filter_map(|id| self.state.session_graph.find_node(id).cloned())
92 .collect(),
93 leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
94 };
95 let commit = crate::store::RuntimeCommit::persisted_state_with_graph_commit(
96 &self.state,
97 graph,
98 &[],
99 );
100 match store.commit_runtime_state(commit).await {
101 Ok(result) => self.state.apply_persisted_commit_result(result),
102 Err(err) => tracing::warn!("failed to persist runtime state: {err}"),
103 }
104 }
105 Ok(crate::AppendSessionNodesResult::Appended {
106 node_ids,
107 leaf_node_id: self
108 .state
109 .session_graph
110 .leaf_node_id
111 .clone()
112 .unwrap_or_default(),
113 })
114 }
115
116 pub async fn apply_protocol_session_extension(
117 &mut self,
118 extension: crate::ProtocolSessionExtensionHandle,
119 ) -> Result<(), SessionError> {
120 let Some(session) = self.session.as_ref() else {
121 return Err(SessionError::Protocol(
122 "runtime session is not available".to_string(),
123 ));
124 };
125 let protocol_session = Arc::clone(session.plugins().protocol_session());
126 protocol_session.apply_session_extension(extension).await
127 }
128
129 pub async fn validate_protocol_turn_extension(
130 &mut self,
131 extension: &crate::ProtocolTurnExtensionHandle,
132 ) -> Result<(), SessionError> {
133 let Some(session) = self.session.as_ref() else {
134 return Err(SessionError::Protocol(
135 "runtime session is not available".to_string(),
136 ));
137 };
138 let protocol_session = Arc::clone(session.plugins().protocol_session());
139 protocol_session.validate_turn_extension(extension).await
140 }
141
142 pub async fn branch_to_node(
143 &mut self,
144 node_id: Option<String>,
145 ) -> Result<crate::SessionSnapshot, SessionError> {
146 let mut state = self.export_state();
147 state.session_graph.branch_to(node_id);
148 let mut persisted_state = RuntimeSessionState::from_snapshot(state);
149 normalize_session_graph(&mut persisted_state);
150
151 let policy = persisted_state.policy.clone();
152 let host = self.host.clone();
153 let services = self.services.clone();
154 let managed_sessions = Arc::clone(&self.managed_sessions);
155 let managed_turns = Arc::clone(&self.managed_turns);
156 let process_sync_needed = Arc::clone(&self.process_sync_needed);
157 let runtime_scope_id = Arc::clone(&self.runtime_scope_id);
158 let turn_phase_probe = self.turn_phase_probe.clone();
159
160 let mut rebuilt = Self::from_host_state(policy, host, services, persisted_state).await?;
161 rebuilt.managed_sessions = managed_sessions;
162 rebuilt.managed_turns = managed_turns;
163 rebuilt.process_sync_needed = process_sync_needed;
164 rebuilt.runtime_scope_id = runtime_scope_id;
165 rebuilt.turn_phase_probe = turn_phase_probe;
166
167 let exported = rebuilt.export_state();
168 *self = rebuilt;
169 Ok(exported)
170 }
171
172 pub async fn activate_managed_session(&mut self, session_id: &str) -> Result<(), SessionError> {
178 let child = {
179 let mut registry = self.managed_sessions.lock().await;
180 registry.remove(session_id).ok_or_else(|| {
181 SessionError::Protocol(format!("unknown managed session `{session_id}`"))
182 })?
183 };
184 let child = child.try_into_runtime().map_err(|_| {
185 SessionError::Protocol(format!("managed session `{session_id}` is still in use"))
186 })?;
187 *self = child;
188 Ok(())
189 }
190
191 pub async fn snapshot_execution_state(&mut self) -> Result<Option<Vec<u8>>, SessionError> {
193 let Some(session) = self.session.as_mut() else {
194 return Err(SessionError::Protocol(
195 "runtime session not available".to_string(),
196 ));
197 };
198 let code_executor = session
199 .plugins()
200 .code_executor()
201 .ok_or(SessionError::CodeExecutionUnavailable)?;
202 let session_id = self.state.session_id.clone();
203 let blob = code_executor
204 .snapshot_execution_state(crate::plugin::ProtocolSessionContext::new(
205 session,
206 &session_id,
207 ))
208 .await?;
209 self.state.execution_state_snapshot = blob.clone();
210 Ok(blob)
211 }
212
213 pub async fn restore_execution_state(&mut self, snapshot: &[u8]) -> Result<(), SessionError> {
215 let Some(session) = self.session.as_mut() else {
216 return Err(SessionError::Protocol(
217 "runtime session not available".to_string(),
218 ));
219 };
220 let code_executor = session
221 .plugins()
222 .code_executor()
223 .ok_or(SessionError::CodeExecutionUnavailable)?;
224 let session_id = self.state.session_id.clone();
225 code_executor
226 .restore_execution_state(
227 crate::plugin::ProtocolSessionContext::new(session, &session_id),
228 snapshot,
229 )
230 .await?;
231 self.state.execution_state_snapshot = Some(snapshot.to_vec());
232 Ok(())
233 }
234
/// Test-only helper: validates, appends and persists a host event, then
/// activates its source type under an effect scope tied to the appended node.
///
/// # Errors
///
/// Propagates validation/append failures, returns `SessionError::Protocol`
/// when the effect scope cannot be created, and propagates activation errors.
#[cfg(test)]
pub(in crate::runtime) async fn emit_host_event(
    &mut self,
    resource_type: &str,
    alias: &str,
    event: &str,
    payload: serde_json::Value,
) -> Result<crate::HostEventEmitReport, SessionError> {
    let effect_host = Arc::clone(&self.host.core.control.effect_host);
    let AppendedHostEvent {
        source_type,
        node_id,
        invocation,
        ..
    } = self
        .validate_and_append_host_event(resource_type, alias, event, &payload)
        .await?;

    let scope = crate::EffectScope::host_event(&self.state.session_id, node_id);
    let scoped_effect_controller = effect_host
        .scoped(scope)
        .map_err(|err| SessionError::Protocol(err.to_string()))?;

    self.activate_host_event_source(&source_type, payload, scoped_effect_controller, invocation)
        .await
}
261
/// Test-only helper: validates and appends a host event to the in-memory
/// state, then commits the resulting graph delta to the session's history
/// store when one is available.
///
/// A failed commit is logged and otherwise ignored; the appended event is
/// still returned.
#[cfg(test)]
async fn validate_and_append_host_event(
    &mut self,
    resource_type: &str,
    alias: &str,
    event: &str,
    payload: &serde_json::Value,
) -> Result<AppendedHostEvent, SessionError> {
    let appended = self
        .validate_and_append_host_event_to_state(resource_type, alias, event, payload)
        .await?;

    let history_store = self
        .session
        .as_ref()
        .and_then(|session| session.history_store());
    let Some(store) = history_store else {
        return Ok(appended);
    };

    let commit = crate::store::RuntimeCommit::persisted_state_with_graph_commit(
        &self.state,
        appended.graph.clone(),
        &[],
    );
    match store.commit_runtime_state(commit).await {
        Err(err) => tracing::warn!("failed to persist runtime state: {err}"),
        Ok(result) => self.state.apply_persisted_commit_result(result),
    }
    Ok(appended)
}
290
291 pub(in crate::runtime) async fn emit_host_event_without_persist(
292 &mut self,
293 resource_type: &str,
294 alias: &str,
295 event: &str,
296 payload: serde_json::Value,
297 scoped_effect_controller: crate::ScopedEffectController<'_>,
298 ) -> Result<(crate::HostEventEmitReport, crate::store::GraphCommitDelta), SessionError> {
299 let appended = self
300 .validate_and_append_host_event_to_state(resource_type, alias, event, &payload)
301 .await?;
302 let report = self
303 .activate_host_event_source(
304 &appended.source_type,
305 payload,
306 scoped_effect_controller,
307 appended.invocation.clone(),
308 )
309 .await?;
310 Ok((report, appended.graph))
311 }
312
313 async fn validate_and_append_host_event_to_state(
314 &mut self,
315 resource_type: &str,
316 alias: &str,
317 event: &str,
318 payload: &serde_json::Value,
319 ) -> Result<AppendedHostEvent, SessionError> {
320 self.refresh_session_graph_from_store().await?;
321 {
322 let Some(session) = self.session.as_ref() else {
323 return Err(SessionError::Protocol(
324 "runtime session not available".to_string(),
325 ));
326 };
327 crate::session::triggers::validate_host_event(
328 session.plugins(),
329 resource_type,
330 alias,
331 event,
332 payload,
333 )
334 .map_err(|err| SessionError::Protocol(err.to_string()))?;
335 }
336 let source_type = crate::host_event_source_type(alias, event);
337 let nodes = vec![crate::SessionAppendNode::plugin(
338 "lash.host_event",
339 serde_json::json!({
340 "resource_type": resource_type,
341 "alias": alias,
342 "event": event,
343 "source_type": source_type.clone(),
344 "payload": payload.clone(),
345 }),
346 )];
347 let node_ids = append_session_nodes_to_state(&mut self.state, &nodes);
348 if let Some(session) = self.session.as_mut() {
349 let protocol_session = Arc::clone(session.plugins().protocol_session());
350 let session_id = self.state.session_id.clone();
351 protocol_session
352 .append_session_nodes(
353 crate::plugin::ProtocolSessionContext::new(session, &session_id),
354 &nodes,
355 )
356 .await?;
357 }
358 self.stamp_live_plugin_state();
359 let host_event_node_id = node_ids.into_iter().next().unwrap_or_default();
360 let graph = crate::store::GraphCommitDelta::Append {
361 nodes: self
362 .state
363 .session_graph
364 .find_node(&host_event_node_id)
365 .cloned()
366 .into_iter()
367 .collect(),
368 leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
369 };
370 let host_event_invocation = crate::runtime::causal::session_node_invocation(
371 &self.state.session_id,
372 host_event_node_id.clone(),
373 );
374 Ok(AppendedHostEvent {
375 source_type,
376 #[cfg(test)]
377 node_id: host_event_node_id.clone(),
378 invocation: host_event_invocation,
379 graph,
380 })
381 }
382
383 async fn activate_host_event_source(
384 &mut self,
385 source_type: &str,
386 payload: serde_json::Value,
387 scoped_effect_controller: crate::ScopedEffectController<'_>,
388 host_event_invocation: crate::RuntimeInvocation,
389 ) -> Result<crate::HostEventEmitReport, SessionError> {
390 let Some(session) = self.session.as_ref() else {
391 return Err(SessionError::Protocol(
392 "runtime session not available".to_string(),
393 ));
394 };
395 let manager = self
396 .runtime_session_services()
397 .map_err(|err| SessionError::Protocol(err.to_string()))?;
398 let activation = session
399 .plugins()
400 .trigger_activation_service(manager.process_service(), scoped_effect_controller);
401 let started_process_ids = activation
402 .activate_source_type(source_type, payload, Some(host_event_invocation))
403 .await
404 .map_err(|err| SessionError::Protocol(err.to_string()))?;
405 Ok(crate::HostEventEmitReport {
406 started_process_ids,
407 })
408 }
409
410 pub async fn activate_lashlang_trigger(
411 &mut self,
412 handle: &str,
413 payload: serde_json::Value,
414 ) -> Result<crate::HostEventEmitReport, SessionError> {
415 let effect_host = Arc::clone(&self.host.core.control.effect_host);
416 let appended = self
417 .append_lashlang_trigger_activation(handle, &payload)
418 .await?;
419 let scoped_effect_controller = effect_host
420 .scoped(crate::EffectScope::host_event(
421 &self.state.session_id,
422 appended.node_id.clone(),
423 ))
424 .map_err(|err| SessionError::Protocol(err.to_string()))?;
425 self.activate_lashlang_trigger_from_scope(
426 handle,
427 payload,
428 scoped_effect_controller,
429 appended.invocation,
430 )
431 .await
432 }
433
434 pub async fn activate_lashlang_trigger_with_effect_scope(
435 &mut self,
436 handle: &str,
437 payload: serde_json::Value,
438 scoped_effect_controller: crate::ScopedEffectController<'_>,
439 ) -> Result<crate::HostEventEmitReport, SessionError> {
440 let appended = self
441 .append_lashlang_trigger_activation(handle, &payload)
442 .await?;
443 self.activate_lashlang_trigger_from_scope(
444 handle,
445 payload,
446 scoped_effect_controller,
447 appended.invocation,
448 )
449 .await
450 }
451
452 async fn append_lashlang_trigger_activation(
453 &mut self,
454 handle: &str,
455 payload: &serde_json::Value,
456 ) -> Result<AppendedActivation, SessionError> {
457 let append = self
458 .append_session_nodes(crate::AppendSessionNodesRequest {
459 nodes: vec![crate::SessionAppendNode::plugin(
460 "lash.trigger_activation",
461 serde_json::json!({
462 "handle": handle,
463 "payload": payload.clone(),
464 }),
465 )],
466 requires_ancestor_node_id: None,
467 })
468 .await?;
469 let activation_node_id = match append {
470 crate::AppendSessionNodesResult::Appended { node_ids, .. } => {
471 node_ids.into_iter().next().unwrap_or_default()
472 }
473 crate::AppendSessionNodesResult::StaleBranch {
474 current_leaf_node_id,
475 } => {
476 return Err(SessionError::Protocol(format!(
477 "trigger activation append targeted a stale session branch at {:?}",
478 current_leaf_node_id
479 )));
480 }
481 };
482 let activation_invocation = crate::runtime::causal::session_node_invocation(
483 &self.state.session_id,
484 activation_node_id.clone(),
485 );
486 Ok(AppendedActivation {
487 node_id: activation_node_id,
488 invocation: activation_invocation,
489 })
490 }
491
492 async fn activate_lashlang_trigger_from_scope(
493 &mut self,
494 handle: &str,
495 payload: serde_json::Value,
496 scoped_effect_controller: crate::ScopedEffectController<'_>,
497 activation_invocation: crate::RuntimeInvocation,
498 ) -> Result<crate::HostEventEmitReport, SessionError> {
499 let Some(session) = self.session.as_ref() else {
500 return Err(SessionError::Protocol(
501 "runtime session not available".to_string(),
502 ));
503 };
504 let manager = self
505 .runtime_session_services()
506 .map_err(|err| SessionError::Protocol(err.to_string()))?;
507 let activation = session
508 .plugins()
509 .trigger_activation_service(manager.process_service(), scoped_effect_controller);
510 let started = activation
511 .activate(handle, payload, Some(activation_invocation))
512 .await
513 .map_err(|err| SessionError::Protocol(err.to_string()))?;
514 Ok(crate::HostEventEmitReport {
515 started_process_ids: started.into_iter().collect(),
516 })
517 }
518
519 pub async fn activate_lashlang_trigger_source_type(
520 &mut self,
521 source_type: impl AsRef<str>,
522 payload: serde_json::Value,
523 ) -> Result<crate::HostEventEmitReport, SessionError> {
524 let effect_host = Arc::clone(&self.host.core.control.effect_host);
525 let source_type = source_type.as_ref().to_string();
526 let appended = self
527 .append_lashlang_trigger_source_activation(&source_type, &payload)
528 .await?;
529 let scoped_effect_controller = effect_host
530 .scoped(crate::EffectScope::host_event(
531 &self.state.session_id,
532 appended.node_id.clone(),
533 ))
534 .map_err(|err| SessionError::Protocol(err.to_string()))?;
535 self.activate_lashlang_trigger_source_from_scope(
536 &source_type,
537 payload,
538 scoped_effect_controller,
539 appended.invocation,
540 )
541 .await
542 }
543
544 pub async fn activate_lashlang_trigger_source_type_with_effect_scope(
545 &mut self,
546 source_type: impl AsRef<str>,
547 payload: serde_json::Value,
548 scoped_effect_controller: crate::ScopedEffectController<'_>,
549 ) -> Result<crate::HostEventEmitReport, SessionError> {
550 let source_type = source_type.as_ref().to_string();
551 let appended = self
552 .append_lashlang_trigger_source_activation(&source_type, &payload)
553 .await?;
554 self.activate_lashlang_trigger_source_from_scope(
555 &source_type,
556 payload,
557 scoped_effect_controller,
558 appended.invocation,
559 )
560 .await
561 }
562
563 async fn append_lashlang_trigger_source_activation(
564 &mut self,
565 source_type: &str,
566 payload: &serde_json::Value,
567 ) -> Result<AppendedActivation, SessionError> {
568 let append = self
569 .append_session_nodes(crate::AppendSessionNodesRequest {
570 nodes: vec![crate::SessionAppendNode::plugin(
571 "lash.trigger_source_activation",
572 serde_json::json!({
573 "source_type": source_type,
574 "payload": payload.clone(),
575 }),
576 )],
577 requires_ancestor_node_id: None,
578 })
579 .await?;
580 let activation_node_id = match append {
581 crate::AppendSessionNodesResult::Appended { node_ids, .. } => {
582 node_ids.into_iter().next().unwrap_or_default()
583 }
584 crate::AppendSessionNodesResult::StaleBranch {
585 current_leaf_node_id,
586 } => {
587 return Err(SessionError::Protocol(format!(
588 "trigger source activation append targeted a stale session branch at {:?}",
589 current_leaf_node_id
590 )));
591 }
592 };
593 let activation_invocation = crate::runtime::causal::session_node_invocation(
594 &self.state.session_id,
595 activation_node_id.clone(),
596 );
597 Ok(AppendedActivation {
598 node_id: activation_node_id,
599 invocation: activation_invocation,
600 })
601 }
602
603 async fn activate_lashlang_trigger_source_from_scope(
604 &mut self,
605 source_type: &str,
606 payload: serde_json::Value,
607 scoped_effect_controller: crate::ScopedEffectController<'_>,
608 activation_invocation: crate::RuntimeInvocation,
609 ) -> Result<crate::HostEventEmitReport, SessionError> {
610 let Some(session) = self.session.as_ref() else {
611 return Err(SessionError::Protocol(
612 "runtime session not available".to_string(),
613 ));
614 };
615 let manager = self
616 .runtime_session_services()
617 .map_err(|err| SessionError::Protocol(err.to_string()))?;
618 let activation = session
619 .plugins()
620 .trigger_activation_service(manager.process_service(), scoped_effect_controller);
621 let started_process_ids = activation
622 .activate_source_type(source_type, payload, Some(activation_invocation))
623 .await
624 .map_err(|err| SessionError::Protocol(err.to_string()))?;
625 Ok(crate::HostEventEmitReport {
626 started_process_ids,
627 })
628 }
629
630 pub fn list_lashlang_trigger_registrations(
631 &self,
632 ) -> Result<Vec<crate::TriggerRegistration>, SessionError> {
633 let Some(session) = self.session.as_ref() else {
634 return Err(SessionError::Protocol(
635 "runtime session not available".to_string(),
636 ));
637 };
638 session
639 .plugins()
640 .list_all_lashlang_triggers()
641 .map_err(|err| SessionError::Protocol(err.to_string()))
642 }
643
644 pub fn lashlang_trigger_registrations_by_source_type(
645 &self,
646 source_type: impl Into<crate::TriggerSourceType>,
647 ) -> Result<Vec<crate::TriggerRegistration>, SessionError> {
648 let Some(session) = self.session.as_ref() else {
649 return Err(SessionError::Protocol(
650 "runtime session not available".to_string(),
651 ));
652 };
653 session
654 .plugins()
655 .lashlang_trigger_registrations_by_source_type(source_type.into())
656 .map_err(|err| SessionError::Protocol(err.to_string()))
657 }
658
659 pub async fn invoke_plugin_action(
660 &self,
661 name: &str,
662 args: serde_json::Value,
663 session_id: Option<String>,
664 ) -> Result<crate::ToolResult, PluginActionInvokeError> {
665 let manager = self.runtime_session_services()?;
666 let Some(session) = self.session.as_ref() else {
667 return Err(PluginActionInvokeError::Unknown(name.to_string()));
668 };
669 session
670 .plugins()
671 .invoke_plugin_action(
672 name,
673 args,
674 session_id,
675 true,
676 manager.state_service(),
677 manager.lifecycle_service(),
678 manager.graph_service(),
679 manager.process_service(),
680 )
681 .await
682 }
683}