Skip to main content

lash_core/runtime/
session_ops.rs

1//! `LashRuntime` session-graph and execution-state operations.
2//!
3//! Extracted from `runtime/mod.rs`. This file re-opens `impl LashRuntime`;
4//! no types live here and no public API is changed.
5
6use std::sync::Arc;
7
8use crate::{PluginActionInvokeError, SessionError};
9
10use super::LashRuntime;
11use super::state::{RuntimeSessionState, append_session_nodes_to_state, normalize_session_graph};
12
13struct AppendedHostEvent {
14    source_type: String,
15    #[cfg(test)]
16    node_id: String,
17    invocation: crate::RuntimeInvocation,
18    graph: crate::store::GraphCommitDelta,
19}
20
21struct AppendedActivation {
22    node_id: String,
23    invocation: crate::RuntimeInvocation,
24}
25
26impl LashRuntime {
27    /// Replace the host-owned state envelope.
28    pub fn set_persisted_state(&mut self, state: RuntimeSessionState) -> Result<(), SessionError> {
29        let mut state = state;
30        normalize_session_graph(&mut state);
31        if let Some(session) = self.session.as_ref() {
32            session.invalidate_runtime_caches();
33            // Restore the persisted tool surface so the live registry matches the
34            // state being installed (mirrors `from_host_state`). Without this the
35            // registry keeps its prior generation/tools and silently diverges from
36            // `state`. `restore_state` adopts the snapshot's generation, so a
37            // surface that reached generation >= 2 restores cleanly.
38            if let Some(tool_state) = state.tool_state_snapshot.clone() {
39                session
40                    .plugins()
41                    .tool_registry()
42                    .restore_state(tool_state)
43                    .map_err(|err| SessionError::Protocol(err.to_string()))?;
44            }
45            let snapshot = state.plugin_snapshot.clone().unwrap_or_default();
46            session
47                .plugins()
48                .restore(&snapshot)
49                .map_err(|err| SessionError::Protocol(err.to_string()))?;
50            state.plugin_snapshot_revision =
51                Some(session.plugins().snapshot_revision_fingerprint());
52        }
53        self.policy = state.policy.clone();
54        self.protocol_turn_options = state.protocol_turn_options.clone();
55        self.state = state;
56        Ok(())
57    }
58
59    pub async fn append_session_nodes(
60        &mut self,
61        request: crate::AppendSessionNodesRequest,
62    ) -> Result<crate::AppendSessionNodesResult, SessionError> {
63        self.refresh_session_graph_from_store().await?;
64        if let Some(required) = request.requires_ancestor_node_id.as_deref()
65            && !self.state.session_graph.active_path_contains(required)
66        {
67            return Ok(crate::AppendSessionNodesResult::StaleBranch {
68                current_leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
69            });
70        }
71        let node_ids = append_session_nodes_to_state(&mut self.state, &request.nodes);
72        if let Some(session) = self.session.as_mut() {
73            let protocol_session = Arc::clone(session.plugins().protocol_session());
74            let session_id = self.state.session_id.clone();
75            protocol_session
76                .append_session_nodes(
77                    crate::plugin::ProtocolSessionContext::new(session, &session_id),
78                    &request.nodes,
79                )
80                .await?;
81        }
82        self.stamp_live_plugin_state();
83        if let Some(store) = self
84            .session
85            .as_ref()
86            .and_then(|session| session.history_store())
87        {
88            let graph = crate::store::GraphCommitDelta::Append {
89                nodes: node_ids
90                    .iter()
91                    .filter_map(|id| self.state.session_graph.find_node(id).cloned())
92                    .collect(),
93                leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
94            };
95            let commit = crate::store::RuntimeCommit::persisted_state_with_graph_commit(
96                &self.state,
97                graph,
98                &[],
99            );
100            match store.commit_runtime_state(commit).await {
101                Ok(result) => self.state.apply_persisted_commit_result(result),
102                Err(err) => tracing::warn!("failed to persist runtime state: {err}"),
103            }
104        }
105        Ok(crate::AppendSessionNodesResult::Appended {
106            node_ids,
107            leaf_node_id: self
108                .state
109                .session_graph
110                .leaf_node_id
111                .clone()
112                .unwrap_or_default(),
113        })
114    }
115
116    pub async fn apply_protocol_session_extension(
117        &mut self,
118        extension: crate::ProtocolSessionExtensionHandle,
119    ) -> Result<(), SessionError> {
120        let Some(session) = self.session.as_ref() else {
121            return Err(SessionError::Protocol(
122                "runtime session is not available".to_string(),
123            ));
124        };
125        let protocol_session = Arc::clone(session.plugins().protocol_session());
126        protocol_session.apply_session_extension(extension).await
127    }
128
129    pub async fn validate_protocol_turn_extension(
130        &mut self,
131        extension: &crate::ProtocolTurnExtensionHandle,
132    ) -> Result<(), SessionError> {
133        let Some(session) = self.session.as_ref() else {
134            return Err(SessionError::Protocol(
135                "runtime session is not available".to_string(),
136            ));
137        };
138        let protocol_session = Arc::clone(session.plugins().protocol_session());
139        protocol_session.validate_turn_extension(extension).await
140    }
141
142    pub async fn branch_to_node(
143        &mut self,
144        node_id: Option<String>,
145    ) -> Result<crate::SessionSnapshot, SessionError> {
146        let mut state = self.export_state();
147        state.session_graph.branch_to(node_id);
148        let mut persisted_state = RuntimeSessionState::from_snapshot(state);
149        normalize_session_graph(&mut persisted_state);
150
151        let policy = persisted_state.policy.clone();
152        let host = self.host.clone();
153        let services = self.services.clone();
154        let managed_sessions = Arc::clone(&self.managed_sessions);
155        let managed_turns = Arc::clone(&self.managed_turns);
156        let process_sync_needed = Arc::clone(&self.process_sync_needed);
157        let runtime_scope_id = Arc::clone(&self.runtime_scope_id);
158        let turn_phase_probe = self.turn_phase_probe.clone();
159
160        let mut rebuilt = Self::from_host_state(policy, host, services, persisted_state).await?;
161        rebuilt.managed_sessions = managed_sessions;
162        rebuilt.managed_turns = managed_turns;
163        rebuilt.process_sync_needed = process_sync_needed;
164        rebuilt.runtime_scope_id = runtime_scope_id;
165        rebuilt.turn_phase_probe = turn_phase_probe;
166
167        let exported = rebuilt.export_state();
168        *self = rebuilt;
169        Ok(exported)
170    }
171
172    /// Promote a managed child session into the foreground runtime.
173    ///
174    /// Child sessions created through `SessionLifecycleService::create_session` are real
175    /// runtimes, not serialized placeholders. Foreground activation must therefore
176    /// claim that runtime instead of reconstructing a new empty state in the UI.
177    pub async fn activate_managed_session(&mut self, session_id: &str) -> Result<(), SessionError> {
178        let child = {
179            let mut registry = self.managed_sessions.lock().await;
180            registry.remove(session_id).ok_or_else(|| {
181                SessionError::Protocol(format!("unknown managed session `{session_id}`"))
182            })?
183        };
184        let child = child.try_into_runtime().map_err(|_| {
185            SessionError::Protocol(format!("managed session `{session_id}` is still in use"))
186        })?;
187        *self = child;
188        Ok(())
189    }
190
191    /// Explicitly snapshot protocol-local execution state, if any.
192    pub async fn snapshot_execution_state(&mut self) -> Result<Option<Vec<u8>>, SessionError> {
193        let Some(session) = self.session.as_mut() else {
194            return Err(SessionError::Protocol(
195                "runtime session not available".to_string(),
196            ));
197        };
198        let code_executor = session
199            .plugins()
200            .code_executor()
201            .ok_or(SessionError::CodeExecutionUnavailable)?;
202        let session_id = self.state.session_id.clone();
203        let blob = code_executor
204            .snapshot_execution_state(crate::plugin::ProtocolSessionContext::new(
205                session,
206                &session_id,
207            ))
208            .await?;
209        self.state.execution_state_snapshot = blob.clone();
210        Ok(blob)
211    }
212
213    /// Explicitly restore protocol-local execution state from an opaque snapshot blob.
214    pub async fn restore_execution_state(&mut self, snapshot: &[u8]) -> Result<(), SessionError> {
215        let Some(session) = self.session.as_mut() else {
216            return Err(SessionError::Protocol(
217                "runtime session not available".to_string(),
218            ));
219        };
220        let code_executor = session
221            .plugins()
222            .code_executor()
223            .ok_or(SessionError::CodeExecutionUnavailable)?;
224        let session_id = self.state.session_id.clone();
225        code_executor
226            .restore_execution_state(
227                crate::plugin::ProtocolSessionContext::new(session, &session_id),
228                snapshot,
229            )
230            .await?;
231        self.state.execution_state_snapshot = Some(snapshot.to_vec());
232        Ok(())
233    }
234
235    #[cfg(test)]
236    pub(in crate::runtime) async fn emit_host_event(
237        &mut self,
238        resource_type: &str,
239        alias: &str,
240        event: &str,
241        payload: serde_json::Value,
242    ) -> Result<crate::HostEventEmitReport, SessionError> {
243        let effect_host = Arc::clone(&self.host.core.control.effect_host);
244        let source_type = self
245            .validate_and_append_host_event(resource_type, alias, event, &payload)
246            .await?;
247        let scoped_effect_controller = effect_host
248            .scoped(crate::EffectScope::host_event(
249                &self.state.session_id,
250                source_type.node_id.clone(),
251            ))
252            .map_err(|err| SessionError::Protocol(err.to_string()))?;
253        self.activate_host_event_source(
254            &source_type.source_type,
255            payload,
256            scoped_effect_controller,
257            source_type.invocation,
258        )
259        .await
260    }
261
262    #[cfg(test)]
263    async fn validate_and_append_host_event(
264        &mut self,
265        resource_type: &str,
266        alias: &str,
267        event: &str,
268        payload: &serde_json::Value,
269    ) -> Result<AppendedHostEvent, SessionError> {
270        let appended = self
271            .validate_and_append_host_event_to_state(resource_type, alias, event, payload)
272            .await?;
273        if let Some(store) = self
274            .session
275            .as_ref()
276            .and_then(|session| session.history_store())
277        {
278            let commit = crate::store::RuntimeCommit::persisted_state_with_graph_commit(
279                &self.state,
280                appended.graph.clone(),
281                &[],
282            );
283            match store.commit_runtime_state(commit).await {
284                Ok(result) => self.state.apply_persisted_commit_result(result),
285                Err(err) => tracing::warn!("failed to persist runtime state: {err}"),
286            }
287        }
288        Ok(appended)
289    }
290
291    pub(in crate::runtime) async fn emit_host_event_without_persist(
292        &mut self,
293        resource_type: &str,
294        alias: &str,
295        event: &str,
296        payload: serde_json::Value,
297        scoped_effect_controller: crate::ScopedEffectController<'_>,
298    ) -> Result<(crate::HostEventEmitReport, crate::store::GraphCommitDelta), SessionError> {
299        let appended = self
300            .validate_and_append_host_event_to_state(resource_type, alias, event, &payload)
301            .await?;
302        let report = self
303            .activate_host_event_source(
304                &appended.source_type,
305                payload,
306                scoped_effect_controller,
307                appended.invocation.clone(),
308            )
309            .await?;
310        Ok((report, appended.graph))
311    }
312
313    async fn validate_and_append_host_event_to_state(
314        &mut self,
315        resource_type: &str,
316        alias: &str,
317        event: &str,
318        payload: &serde_json::Value,
319    ) -> Result<AppendedHostEvent, SessionError> {
320        self.refresh_session_graph_from_store().await?;
321        {
322            let Some(session) = self.session.as_ref() else {
323                return Err(SessionError::Protocol(
324                    "runtime session not available".to_string(),
325                ));
326            };
327            crate::session::triggers::validate_host_event(
328                session.plugins(),
329                resource_type,
330                alias,
331                event,
332                payload,
333            )
334            .map_err(|err| SessionError::Protocol(err.to_string()))?;
335        }
336        let source_type = crate::host_event_source_type(alias, event);
337        let nodes = vec![crate::SessionAppendNode::plugin(
338            "lash.host_event",
339            serde_json::json!({
340                "resource_type": resource_type,
341                "alias": alias,
342                "event": event,
343                "source_type": source_type.clone(),
344                "payload": payload.clone(),
345            }),
346        )];
347        let node_ids = append_session_nodes_to_state(&mut self.state, &nodes);
348        if let Some(session) = self.session.as_mut() {
349            let protocol_session = Arc::clone(session.plugins().protocol_session());
350            let session_id = self.state.session_id.clone();
351            protocol_session
352                .append_session_nodes(
353                    crate::plugin::ProtocolSessionContext::new(session, &session_id),
354                    &nodes,
355                )
356                .await?;
357        }
358        self.stamp_live_plugin_state();
359        let host_event_node_id = node_ids.into_iter().next().unwrap_or_default();
360        let graph = crate::store::GraphCommitDelta::Append {
361            nodes: self
362                .state
363                .session_graph
364                .find_node(&host_event_node_id)
365                .cloned()
366                .into_iter()
367                .collect(),
368            leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
369        };
370        let host_event_invocation = crate::runtime::causal::session_node_invocation(
371            &self.state.session_id,
372            host_event_node_id.clone(),
373        );
374        Ok(AppendedHostEvent {
375            source_type,
376            #[cfg(test)]
377            node_id: host_event_node_id.clone(),
378            invocation: host_event_invocation,
379            graph,
380        })
381    }
382
383    async fn activate_host_event_source(
384        &mut self,
385        source_type: &str,
386        payload: serde_json::Value,
387        scoped_effect_controller: crate::ScopedEffectController<'_>,
388        host_event_invocation: crate::RuntimeInvocation,
389    ) -> Result<crate::HostEventEmitReport, SessionError> {
390        let Some(session) = self.session.as_ref() else {
391            return Err(SessionError::Protocol(
392                "runtime session not available".to_string(),
393            ));
394        };
395        let manager = self
396            .runtime_session_services()
397            .map_err(|err| SessionError::Protocol(err.to_string()))?;
398        let activation = session
399            .plugins()
400            .trigger_activation_service(manager.process_service(), scoped_effect_controller);
401        let started_process_ids = activation
402            .activate_source_type(source_type, payload, Some(host_event_invocation))
403            .await
404            .map_err(|err| SessionError::Protocol(err.to_string()))?;
405        Ok(crate::HostEventEmitReport {
406            started_process_ids,
407        })
408    }
409
410    pub async fn activate_lashlang_trigger(
411        &mut self,
412        handle: &str,
413        payload: serde_json::Value,
414    ) -> Result<crate::HostEventEmitReport, SessionError> {
415        let effect_host = Arc::clone(&self.host.core.control.effect_host);
416        let appended = self
417            .append_lashlang_trigger_activation(handle, &payload)
418            .await?;
419        let scoped_effect_controller = effect_host
420            .scoped(crate::EffectScope::host_event(
421                &self.state.session_id,
422                appended.node_id.clone(),
423            ))
424            .map_err(|err| SessionError::Protocol(err.to_string()))?;
425        self.activate_lashlang_trigger_from_scope(
426            handle,
427            payload,
428            scoped_effect_controller,
429            appended.invocation,
430        )
431        .await
432    }
433
434    pub async fn activate_lashlang_trigger_with_effect_scope(
435        &mut self,
436        handle: &str,
437        payload: serde_json::Value,
438        scoped_effect_controller: crate::ScopedEffectController<'_>,
439    ) -> Result<crate::HostEventEmitReport, SessionError> {
440        let appended = self
441            .append_lashlang_trigger_activation(handle, &payload)
442            .await?;
443        self.activate_lashlang_trigger_from_scope(
444            handle,
445            payload,
446            scoped_effect_controller,
447            appended.invocation,
448        )
449        .await
450    }
451
452    async fn append_lashlang_trigger_activation(
453        &mut self,
454        handle: &str,
455        payload: &serde_json::Value,
456    ) -> Result<AppendedActivation, SessionError> {
457        let append = self
458            .append_session_nodes(crate::AppendSessionNodesRequest {
459                nodes: vec![crate::SessionAppendNode::plugin(
460                    "lash.trigger_activation",
461                    serde_json::json!({
462                        "handle": handle,
463                        "payload": payload.clone(),
464                    }),
465                )],
466                requires_ancestor_node_id: None,
467            })
468            .await?;
469        let activation_node_id = match append {
470            crate::AppendSessionNodesResult::Appended { node_ids, .. } => {
471                node_ids.into_iter().next().unwrap_or_default()
472            }
473            crate::AppendSessionNodesResult::StaleBranch {
474                current_leaf_node_id,
475            } => {
476                return Err(SessionError::Protocol(format!(
477                    "trigger activation append targeted a stale session branch at {:?}",
478                    current_leaf_node_id
479                )));
480            }
481        };
482        let activation_invocation = crate::runtime::causal::session_node_invocation(
483            &self.state.session_id,
484            activation_node_id.clone(),
485        );
486        Ok(AppendedActivation {
487            node_id: activation_node_id,
488            invocation: activation_invocation,
489        })
490    }
491
492    async fn activate_lashlang_trigger_from_scope(
493        &mut self,
494        handle: &str,
495        payload: serde_json::Value,
496        scoped_effect_controller: crate::ScopedEffectController<'_>,
497        activation_invocation: crate::RuntimeInvocation,
498    ) -> Result<crate::HostEventEmitReport, SessionError> {
499        let Some(session) = self.session.as_ref() else {
500            return Err(SessionError::Protocol(
501                "runtime session not available".to_string(),
502            ));
503        };
504        let manager = self
505            .runtime_session_services()
506            .map_err(|err| SessionError::Protocol(err.to_string()))?;
507        let activation = session
508            .plugins()
509            .trigger_activation_service(manager.process_service(), scoped_effect_controller);
510        let started = activation
511            .activate(handle, payload, Some(activation_invocation))
512            .await
513            .map_err(|err| SessionError::Protocol(err.to_string()))?;
514        Ok(crate::HostEventEmitReport {
515            started_process_ids: started.into_iter().collect(),
516        })
517    }
518
519    pub async fn activate_lashlang_trigger_source_type(
520        &mut self,
521        source_type: impl AsRef<str>,
522        payload: serde_json::Value,
523    ) -> Result<crate::HostEventEmitReport, SessionError> {
524        let effect_host = Arc::clone(&self.host.core.control.effect_host);
525        let source_type = source_type.as_ref().to_string();
526        let appended = self
527            .append_lashlang_trigger_source_activation(&source_type, &payload)
528            .await?;
529        let scoped_effect_controller = effect_host
530            .scoped(crate::EffectScope::host_event(
531                &self.state.session_id,
532                appended.node_id.clone(),
533            ))
534            .map_err(|err| SessionError::Protocol(err.to_string()))?;
535        self.activate_lashlang_trigger_source_from_scope(
536            &source_type,
537            payload,
538            scoped_effect_controller,
539            appended.invocation,
540        )
541        .await
542    }
543
544    pub async fn activate_lashlang_trigger_source_type_with_effect_scope(
545        &mut self,
546        source_type: impl AsRef<str>,
547        payload: serde_json::Value,
548        scoped_effect_controller: crate::ScopedEffectController<'_>,
549    ) -> Result<crate::HostEventEmitReport, SessionError> {
550        let source_type = source_type.as_ref().to_string();
551        let appended = self
552            .append_lashlang_trigger_source_activation(&source_type, &payload)
553            .await?;
554        self.activate_lashlang_trigger_source_from_scope(
555            &source_type,
556            payload,
557            scoped_effect_controller,
558            appended.invocation,
559        )
560        .await
561    }
562
563    async fn append_lashlang_trigger_source_activation(
564        &mut self,
565        source_type: &str,
566        payload: &serde_json::Value,
567    ) -> Result<AppendedActivation, SessionError> {
568        let append = self
569            .append_session_nodes(crate::AppendSessionNodesRequest {
570                nodes: vec![crate::SessionAppendNode::plugin(
571                    "lash.trigger_source_activation",
572                    serde_json::json!({
573                        "source_type": source_type,
574                        "payload": payload.clone(),
575                    }),
576                )],
577                requires_ancestor_node_id: None,
578            })
579            .await?;
580        let activation_node_id = match append {
581            crate::AppendSessionNodesResult::Appended { node_ids, .. } => {
582                node_ids.into_iter().next().unwrap_or_default()
583            }
584            crate::AppendSessionNodesResult::StaleBranch {
585                current_leaf_node_id,
586            } => {
587                return Err(SessionError::Protocol(format!(
588                    "trigger source activation append targeted a stale session branch at {:?}",
589                    current_leaf_node_id
590                )));
591            }
592        };
593        let activation_invocation = crate::runtime::causal::session_node_invocation(
594            &self.state.session_id,
595            activation_node_id.clone(),
596        );
597        Ok(AppendedActivation {
598            node_id: activation_node_id,
599            invocation: activation_invocation,
600        })
601    }
602
603    async fn activate_lashlang_trigger_source_from_scope(
604        &mut self,
605        source_type: &str,
606        payload: serde_json::Value,
607        scoped_effect_controller: crate::ScopedEffectController<'_>,
608        activation_invocation: crate::RuntimeInvocation,
609    ) -> Result<crate::HostEventEmitReport, SessionError> {
610        let Some(session) = self.session.as_ref() else {
611            return Err(SessionError::Protocol(
612                "runtime session not available".to_string(),
613            ));
614        };
615        let manager = self
616            .runtime_session_services()
617            .map_err(|err| SessionError::Protocol(err.to_string()))?;
618        let activation = session
619            .plugins()
620            .trigger_activation_service(manager.process_service(), scoped_effect_controller);
621        let started_process_ids = activation
622            .activate_source_type(source_type, payload, Some(activation_invocation))
623            .await
624            .map_err(|err| SessionError::Protocol(err.to_string()))?;
625        Ok(crate::HostEventEmitReport {
626            started_process_ids,
627        })
628    }
629
630    pub fn list_lashlang_trigger_registrations(
631        &self,
632    ) -> Result<Vec<crate::TriggerRegistration>, SessionError> {
633        let Some(session) = self.session.as_ref() else {
634            return Err(SessionError::Protocol(
635                "runtime session not available".to_string(),
636            ));
637        };
638        session
639            .plugins()
640            .list_all_lashlang_triggers()
641            .map_err(|err| SessionError::Protocol(err.to_string()))
642    }
643
644    pub fn lashlang_trigger_registrations_by_source_type(
645        &self,
646        source_type: impl Into<crate::TriggerSourceType>,
647    ) -> Result<Vec<crate::TriggerRegistration>, SessionError> {
648        let Some(session) = self.session.as_ref() else {
649            return Err(SessionError::Protocol(
650                "runtime session not available".to_string(),
651            ));
652        };
653        session
654            .plugins()
655            .lashlang_trigger_registrations_by_source_type(source_type.into())
656            .map_err(|err| SessionError::Protocol(err.to_string()))
657    }
658
659    pub async fn invoke_plugin_action(
660        &self,
661        name: &str,
662        args: serde_json::Value,
663        session_id: Option<String>,
664    ) -> Result<crate::ToolResult, PluginActionInvokeError> {
665        let manager = self.runtime_session_services()?;
666        let Some(session) = self.session.as_ref() else {
667            return Err(PluginActionInvokeError::Unknown(name.to_string()));
668        };
669        session
670            .plugins()
671            .invoke_plugin_action(
672                name,
673                args,
674                session_id,
675                true,
676                manager.state_service(),
677                manager.lifecycle_service(),
678                manager.graph_service(),
679                manager.process_service(),
680            )
681            .await
682    }
683}