Skip to main content

lash_core/runtime/
session_ops.rs

1//! `LashRuntime` session-graph and execution-state operations.
2//!
3//! Extracted from `runtime/mod.rs`. This file re-opens `impl LashRuntime`;
4//! no types live here and no public API is changed.
5
6use std::sync::Arc;
7
8use crate::{PluginOperationInvokeError, SessionError};
9
10use super::LashRuntime;
11use super::state::{
12    RuntimeSessionState, append_session_nodes_to_state_with_clock, normalize_session_graph,
13};
14
15impl LashRuntime {
16    /// Replace the host-owned state envelope.
17    pub fn set_persisted_state(&mut self, state: RuntimeSessionState) -> Result<(), SessionError> {
18        let mut state = state;
19        normalize_session_graph(&mut state);
20        if let Some(session) = self.session.as_ref() {
21            session.invalidate_runtime_caches();
22            // Restore the persisted tool catalog so the live registry matches the
23            // state being installed (mirrors `from_host_state`). Without this the
24            // registry keeps its prior generation/tools and silently diverges from
25            // `state`. `restore_state` adopts the snapshot's generation, so a
26            // surface that reached generation >= 2 restores cleanly.
27            if let Some(tool_state) = state.tool_state_snapshot.clone() {
28                let report = session
29                    .plugins()
30                    .tool_registry()
31                    .restore_state(tool_state)
32                    .map_err(|err| SessionError::Protocol(err.to_string()))?;
33                if !report.orphaned.is_empty() {
34                    tracing::warn!(
35                        session_id = %state.session_id,
36                        orphaned = ?report.orphaned,
37                        "persisted state installed with orphaned tools: no registered \
38                         source resolves them; they remain non-members until their source returns"
39                    );
40                }
41            }
42            let snapshot = state.plugin_snapshot.clone().unwrap_or_default();
43            session
44                .plugins()
45                .restore(&snapshot)
46                .map_err(|err| SessionError::Protocol(err.to_string()))?;
47            state.plugin_snapshot_revision =
48                Some(session.plugins().snapshot_revision_fingerprint());
49        }
50        self.policy = state.policy.clone();
51        self.protocol_turn_options = state.protocol_turn_options.clone();
52        self.state = state;
53        Ok(())
54    }
55
56    pub async fn append_session_nodes(
57        &mut self,
58        request: crate::AppendSessionNodesRequest,
59    ) -> Result<crate::AppendSessionNodesResult, SessionError> {
60        self.refresh_session_graph_from_store().await?;
61        if let Some(required) = request.requires_ancestor_node_id.as_deref()
62            && !self.state.session_graph.active_path_contains(required)
63        {
64            return Ok(crate::AppendSessionNodesResult::StaleBranch {
65                current_leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
66            });
67        }
68        let node_ids = append_session_nodes_to_state_with_clock(
69            &mut self.state,
70            &request.nodes,
71            self.host.core.clock.as_ref(),
72        );
73        if let Some(session) = self.session.as_mut() {
74            let protocol_session = Arc::clone(session.plugins().protocol_session());
75            let session_id = self.state.session_id.clone();
76            protocol_session
77                .append_session_nodes(
78                    crate::plugin::ProtocolSessionContext::new(session, &session_id),
79                    &request.nodes,
80                )
81                .await?;
82        }
83        self.stamp_live_plugin_state();
84        if let Some(store) = self
85            .session
86            .as_ref()
87            .and_then(|session| session.history_store())
88        {
89            let graph = crate::store::GraphCommitDelta::Append {
90                nodes: node_ids
91                    .iter()
92                    .filter_map(|id| self.state.session_graph.find_node(id).cloned())
93                    .collect(),
94                leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
95            };
96            let commit = crate::store::RuntimeCommit::persisted_state_with_graph_commit(
97                &self.state,
98                graph,
99                &[],
100            );
101            match super::commit_runtime_state_with_fresh_session_execution_lease(
102                store,
103                commit,
104                &self.runtime_lease_owner,
105                Arc::clone(&self.host.core.clock),
106            )
107            .await
108            {
109                Ok(result) => self.state.apply_persisted_commit_result(result),
110                Err(err) => tracing::warn!("failed to persist runtime state: {err}"),
111            }
112        }
113        Ok(crate::AppendSessionNodesResult::Appended {
114            node_ids,
115            leaf_node_id: self
116                .state
117                .session_graph
118                .leaf_node_id
119                .clone()
120                .unwrap_or_default(),
121        })
122    }
123
124    pub async fn apply_protocol_session_extension(
125        &mut self,
126        extension: crate::ProtocolSessionExtensionHandle,
127    ) -> Result<(), SessionError> {
128        let Some(session) = self.session.as_ref() else {
129            return Err(SessionError::Protocol(
130                "runtime session is not available".to_string(),
131            ));
132        };
133        let protocol_session = Arc::clone(session.plugins().protocol_session());
134        protocol_session.apply_session_extension(extension).await
135    }
136
137    pub async fn validate_protocol_turn_extension(
138        &mut self,
139        extension: &crate::ProtocolTurnExtensionHandle,
140    ) -> Result<(), SessionError> {
141        let Some(session) = self.session.as_ref() else {
142            return Err(SessionError::Protocol(
143                "runtime session is not available".to_string(),
144            ));
145        };
146        let protocol_session = Arc::clone(session.plugins().protocol_session());
147        protocol_session.validate_turn_extension(extension).await
148    }
149
150    pub async fn branch_to_node(
151        &mut self,
152        node_id: Option<String>,
153    ) -> Result<crate::SessionSnapshot, SessionError> {
154        let mut state = self.export_state();
155        state.session_graph.branch_to(node_id);
156        let mut persisted_state = RuntimeSessionState::from_snapshot(state);
157        normalize_session_graph(&mut persisted_state);
158
159        let policy = persisted_state.policy.clone();
160        let host = self.host.clone();
161        let services = self.services.clone();
162        let managed_sessions = Arc::clone(&self.managed_sessions);
163        let managed_turns = Arc::clone(&self.managed_turns);
164        let process_sync_needed = Arc::clone(&self.process_sync_needed);
165        let runtime_scope_id = Arc::clone(&self.runtime_scope_id);
166        let runtime_lease_owner = self.runtime_lease_owner.clone();
167        let turn_phase_probe = self.turn_phase_probe.clone();
168
169        let mut rebuilt = Self::from_host_state(policy, host, services, persisted_state).await?;
170        rebuilt.managed_sessions = managed_sessions;
171        rebuilt.managed_turns = managed_turns;
172        rebuilt.process_sync_needed = process_sync_needed;
173        rebuilt.runtime_scope_id = runtime_scope_id;
174        rebuilt.runtime_lease_owner = runtime_lease_owner;
175        rebuilt.turn_phase_probe = turn_phase_probe;
176
177        let exported = rebuilt.export_state();
178        *self = rebuilt;
179        Ok(exported)
180    }
181
182    /// Promote a managed child session into the foreground runtime.
183    ///
184    /// Child sessions created through `SessionLifecycleService::create_session` are real
185    /// runtimes, not serialized placeholders. Foreground activation must therefore
186    /// claim that runtime instead of reconstructing a new empty state in the UI.
187    pub async fn activate_managed_session(&mut self, session_id: &str) -> Result<(), SessionError> {
188        let child = {
189            let mut registry = self.managed_sessions.lock().await;
190            registry.remove(session_id).ok_or_else(|| {
191                SessionError::Protocol(format!("unknown managed session `{session_id}`"))
192            })?
193        };
194        let child = child.try_into_runtime().map_err(|_| {
195            SessionError::Protocol(format!("managed session `{session_id}` is still in use"))
196        })?;
197        *self = child;
198        Ok(())
199    }
200
201    /// Explicitly snapshot protocol-local execution state, if any.
202    pub async fn snapshot_execution_state(&mut self) -> Result<Option<Vec<u8>>, SessionError> {
203        let Some(session) = self.session.as_mut() else {
204            return Err(SessionError::Protocol(
205                "runtime session not available".to_string(),
206            ));
207        };
208        let code_executor = session
209            .plugins()
210            .code_executor()
211            .ok_or(SessionError::CodeExecutionUnavailable)?;
212        let session_id = self.state.session_id.clone();
213        let blob = code_executor
214            .snapshot_execution_state(crate::plugin::ProtocolSessionContext::new(
215                session,
216                &session_id,
217            ))
218            .await?;
219        self.state.execution_state_snapshot = blob.clone();
220        Ok(blob)
221    }
222
223    /// Explicitly restore protocol-local execution state from an opaque snapshot blob.
224    pub async fn restore_execution_state(&mut self, snapshot: &[u8]) -> Result<(), SessionError> {
225        let Some(session) = self.session.as_mut() else {
226            return Err(SessionError::Protocol(
227                "runtime session not available".to_string(),
228            ));
229        };
230        let code_executor = session
231            .plugins()
232            .code_executor()
233            .ok_or(SessionError::CodeExecutionUnavailable)?;
234        let session_id = self.state.session_id.clone();
235        code_executor
236            .restore_execution_state(
237                crate::plugin::ProtocolSessionContext::new(session, &session_id),
238                snapshot,
239            )
240            .await?;
241        self.state.execution_state_snapshot = Some(snapshot.to_vec());
242        Ok(())
243    }
244
245    pub async fn list_trigger_registrations(
246        &self,
247    ) -> Result<Vec<crate::TriggerRegistration>, SessionError> {
248        let store = self.host.trigger_store.as_ref().ok_or_else(|| {
249            SessionError::Protocol("trigger store is unavailable in this runtime".to_string())
250        })?;
251        let records = store
252            .list_subscriptions(crate::TriggerSubscriptionFilter::for_session(
253                self.state.session_id.clone(),
254            ))
255            .await
256            .map_err(|err| SessionError::Protocol(err.to_string()))?;
257        Ok(records
258            .iter()
259            .map(crate::TriggerRegistration::from)
260            .collect())
261    }
262
263    pub async fn trigger_registrations_by_source_type(
264        &self,
265        source_type: impl Into<crate::TriggerEventType>,
266    ) -> Result<Vec<crate::TriggerRegistration>, SessionError> {
267        let store = self.host.trigger_store.as_ref().ok_or_else(|| {
268            SessionError::Protocol("trigger store is unavailable in this runtime".to_string())
269        })?;
270        let mut filter =
271            crate::TriggerSubscriptionFilter::for_session(self.state.session_id.clone());
272        filter.source_type = Some(source_type.into().to_string());
273        let records = store
274            .list_subscriptions(filter)
275            .await
276            .map_err(|err| SessionError::Protocol(err.to_string()))?;
277        Ok(records
278            .iter()
279            .map(crate::TriggerRegistration::from)
280            .collect())
281    }
282
283    pub async fn query_plugin(
284        &self,
285        name: &str,
286        args: serde_json::Value,
287        session_id: Option<String>,
288    ) -> Result<(String, serde_json::Value), PluginOperationInvokeError> {
289        let manager = self.runtime_session_services()?;
290        let Some(session) = self.session.as_ref() else {
291            return Err(PluginOperationInvokeError::Unknown(
292                "runtime session not available".to_string(),
293            ));
294        };
295        session
296            .plugins()
297            .query_plugin(
298                name,
299                args,
300                session_id,
301                true,
302                manager.read_service(),
303                manager.process_read_service(),
304            )
305            .await
306    }
307
308    pub async fn run_plugin_command(
309        &mut self,
310        name: &str,
311        args: serde_json::Value,
312        session_id: Option<String>,
313    ) -> Result<crate::PluginCommandReceipt<serde_json::Value>, PluginOperationInvokeError> {
314        let manager = self.runtime_session_services()?;
315        let Some(session) = self.session.as_ref() else {
316            return Err(PluginOperationInvokeError::Unknown(
317                "runtime session not available".to_string(),
318            ));
319        };
320        let (plugin_id, outcome) = session
321            .plugins()
322            .run_plugin_command(
323                name,
324                args,
325                session_id,
326                true,
327                manager.state_service(),
328                manager.lifecycle_service(),
329                manager.graph_service(),
330                manager.process_service(),
331            )
332            .await?;
333        let (events, queued_batches) = self
334            .apply_plugin_operation_effects(&plugin_id, outcome.events, outcome.directives)
335            .await?;
336        Ok(crate::PluginCommandReceipt {
337            output: outcome.output,
338            events,
339            queued_batches,
340        })
341    }
342
343    pub async fn run_plugin_task(
344        &mut self,
345        name: &str,
346        args: serde_json::Value,
347        session_id: Option<String>,
348        scoped_effect_controller: crate::ScopedEffectController<'static>,
349        cancellation_token: tokio_util::sync::CancellationToken,
350    ) -> Result<crate::PluginTaskReceipt<serde_json::Value>, PluginOperationInvokeError> {
351        let manager = self.runtime_session_services()?;
352        let Some(session) = self.session.as_ref() else {
353            return Err(PluginOperationInvokeError::Unknown(
354                "runtime session not available".to_string(),
355            ));
356        };
357        let (plugin_id, outcome) = session
358            .plugins()
359            .run_plugin_task(
360                name,
361                args,
362                session_id,
363                true,
364                manager.state_service(),
365                manager.lifecycle_service(),
366                manager.graph_service(),
367                manager.process_service(),
368                scoped_effect_controller,
369                cancellation_token,
370            )
371            .await?;
372        let (events, queued_batches) = self
373            .apply_plugin_operation_effects(&plugin_id, outcome.events, outcome.directives)
374            .await?;
375        Ok(crate::PluginTaskReceipt {
376            output: outcome.output,
377            events,
378            queued_batches,
379        })
380    }
381
382    async fn apply_plugin_operation_effects(
383        &mut self,
384        plugin_id: &str,
385        events: Vec<crate::PluginRuntimeEvent>,
386        directives: Vec<crate::PluginRuntimeDirective>,
387    ) -> Result<
388        (
389            Vec<crate::PluginOwned<crate::PluginRuntimeEvent>>,
390            Vec<crate::runtime::QueuedWorkBatch>,
391        ),
392        PluginOperationInvokeError,
393    > {
394        let owned_events = events
395            .into_iter()
396            .map(|event| crate::PluginOwned {
397                plugin_id: plugin_id.to_string(),
398                value: event,
399            })
400            .collect::<Vec<_>>();
401        if !owned_events.is_empty() {
402            let nodes = owned_events
403                .iter()
404                .map(|owned| {
405                    crate::plugin_runtime_protocol_event(&owned.plugin_id, owned.value.clone())
406                        .map(crate::SessionAppendNode::protocol_event)
407                        .map_err(|err| {
408                            PluginOperationInvokeError::Failed(format!(
409                                "failed to encode plugin runtime event: {err}"
410                            ))
411                        })
412                })
413                .collect::<Result<Vec<_>, _>>()?;
414            self.append_plugin_runtime_event_nodes(&nodes).await?;
415        }
416        self.stamp_live_plugin_state();
417        self.persist_plugin_operation_state_if_needed().await?;
418
419        let mut queued_batches = Vec::new();
420        for directive in directives {
421            match directive {
422                crate::PluginRuntimeDirective::QueueTurn {
423                    input,
424                    delivery_policy,
425                    slot_policy,
426                    source_key,
427                } => {
428                    let batch = self
429                        .enqueue_turn_input(input, delivery_policy, slot_policy, source_key)
430                        .await
431                        .map_err(|err| {
432                            PluginOperationInvokeError::Failed(format!(
433                                "failed to queue plugin turn request: {err}"
434                            ))
435                        })?;
436                    queued_batches.push(batch);
437                }
438            }
439        }
440
441        Ok((owned_events, queued_batches))
442    }
443
444    async fn append_plugin_runtime_event_nodes(
445        &mut self,
446        nodes: &[crate::SessionAppendNode],
447    ) -> Result<(), PluginOperationInvokeError> {
448        let node_ids = append_session_nodes_to_state_with_clock(
449            &mut self.state,
450            nodes,
451            self.host.core.clock.as_ref(),
452        );
453        if let Some(store) = self
454            .session
455            .as_ref()
456            .and_then(|session| session.history_store())
457        {
458            let graph = crate::store::GraphCommitDelta::Append {
459                nodes: node_ids
460                    .iter()
461                    .filter_map(|id| self.state.session_graph.find_node(id).cloned())
462                    .collect(),
463                leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
464            };
465            let commit = crate::store::RuntimeCommit::persisted_state_with_graph_commit(
466                &self.state,
467                graph,
468                &[],
469            );
470            let result = super::commit_runtime_state_with_fresh_session_execution_lease(
471                store,
472                commit,
473                &self.runtime_lease_owner,
474                Arc::clone(&self.host.core.clock),
475            )
476            .await
477            .map_err(|err| {
478                PluginOperationInvokeError::Failed(format!(
479                    "failed to persist plugin runtime events: {err}"
480                ))
481            })?;
482            self.state.apply_persisted_commit_result(result);
483        }
484        Ok(())
485    }
486
487    async fn persist_plugin_operation_state_if_needed(
488        &mut self,
489    ) -> Result<(), PluginOperationInvokeError> {
490        let Some(store) = self
491            .session
492            .as_ref()
493            .and_then(|session| session.history_store())
494        else {
495            return Ok(());
496        };
497        let commit = crate::store::RuntimeCommit::persisted_state(&self.state, &[]);
498        let result = super::commit_runtime_state_with_fresh_session_execution_lease(
499            store,
500            commit,
501            &self.runtime_lease_owner,
502            Arc::clone(&self.host.core.clock),
503        )
504        .await
505        .map_err(|err| {
506            PluginOperationInvokeError::Failed(format!(
507                "failed to persist plugin operation state: {err}"
508            ))
509        })?;
510        self.state.apply_persisted_commit_result(result);
511        Ok(())
512    }
513}