Skip to main content

lash_core/runtime/
session_ops.rs

1//! `LashRuntime` session-graph and execution-state operations.
2//!
3//! Extracted from `runtime/mod.rs`. This file re-opens `impl LashRuntime`;
4//! no types live here and no public API is changed.
5
6use std::sync::Arc;
7
8use crate::{PluginOperationInvokeError, SessionError};
9
10use super::LashRuntime;
11use super::state::{
12    RuntimeSessionState, append_session_nodes_to_state_with_clock, normalize_session_graph,
13};
14
15impl LashRuntime {
16    /// Replace the host-owned state envelope.
17    pub fn set_persisted_state(&mut self, state: RuntimeSessionState) -> Result<(), SessionError> {
18        let mut state = state;
19        normalize_session_graph(&mut state);
20        if let Some(session) = self.session.as_ref() {
21            session.invalidate_runtime_caches();
22            // Restore the persisted tool catalog so the live registry matches the
23            // state being installed (mirrors `from_host_state`). Without this the
24            // registry keeps its prior generation/tools and silently diverges from
25            // `state`. `restore_state` adopts the snapshot's generation, so a
26            // surface that reached generation >= 2 restores cleanly.
27            if let Some(tool_state) = state.tool_state_snapshot.clone() {
28                let report = session
29                    .plugins()
30                    .tool_registry()
31                    .restore_state(tool_state)
32                    .map_err(|err| SessionError::Protocol(err.to_string()))?;
33                if !report.orphaned.is_empty() {
34                    tracing::warn!(
35                        session_id = %state.session_id,
36                        orphaned = ?report.orphaned,
37                        "persisted state installed with orphaned tools: no registered \
38                         source resolves them; they are Off until their source returns"
39                    );
40                }
41            }
42            let snapshot = state.plugin_snapshot.clone().unwrap_or_default();
43            session
44                .plugins()
45                .restore(&snapshot)
46                .map_err(|err| SessionError::Protocol(err.to_string()))?;
47            state.plugin_snapshot_revision =
48                Some(session.plugins().snapshot_revision_fingerprint());
49        }
50        self.policy = state.policy.clone();
51        self.protocol_turn_options = state.protocol_turn_options.clone();
52        self.state = state;
53        Ok(())
54    }
55
56    pub async fn append_session_nodes(
57        &mut self,
58        request: crate::AppendSessionNodesRequest,
59    ) -> Result<crate::AppendSessionNodesResult, SessionError> {
60        self.refresh_session_graph_from_store().await?;
61        if let Some(required) = request.requires_ancestor_node_id.as_deref()
62            && !self.state.session_graph.active_path_contains(required)
63        {
64            return Ok(crate::AppendSessionNodesResult::StaleBranch {
65                current_leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
66            });
67        }
68        let node_ids = append_session_nodes_to_state_with_clock(
69            &mut self.state,
70            &request.nodes,
71            self.host.core.clock.as_ref(),
72        );
73        if let Some(session) = self.session.as_mut() {
74            let protocol_session = Arc::clone(session.plugins().protocol_session());
75            let session_id = self.state.session_id.clone();
76            protocol_session
77                .append_session_nodes(
78                    crate::plugin::ProtocolSessionContext::new(session, &session_id),
79                    &request.nodes,
80                )
81                .await?;
82        }
83        self.stamp_live_plugin_state();
84        if let Some(store) = self
85            .session
86            .as_ref()
87            .and_then(|session| session.history_store())
88        {
89            let graph = crate::store::GraphCommitDelta::Append {
90                nodes: node_ids
91                    .iter()
92                    .filter_map(|id| self.state.session_graph.find_node(id).cloned())
93                    .collect(),
94                leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
95            };
96            let commit = crate::store::RuntimeCommit::persisted_state_with_graph_commit(
97                &self.state,
98                graph,
99                &[],
100            );
101            match store.commit_runtime_state(commit).await {
102                Ok(result) => self.state.apply_persisted_commit_result(result),
103                Err(err) => tracing::warn!("failed to persist runtime state: {err}"),
104            }
105        }
106        Ok(crate::AppendSessionNodesResult::Appended {
107            node_ids,
108            leaf_node_id: self
109                .state
110                .session_graph
111                .leaf_node_id
112                .clone()
113                .unwrap_or_default(),
114        })
115    }
116
117    pub async fn apply_protocol_session_extension(
118        &mut self,
119        extension: crate::ProtocolSessionExtensionHandle,
120    ) -> Result<(), SessionError> {
121        let Some(session) = self.session.as_ref() else {
122            return Err(SessionError::Protocol(
123                "runtime session is not available".to_string(),
124            ));
125        };
126        let protocol_session = Arc::clone(session.plugins().protocol_session());
127        protocol_session.apply_session_extension(extension).await
128    }
129
130    pub async fn validate_protocol_turn_extension(
131        &mut self,
132        extension: &crate::ProtocolTurnExtensionHandle,
133    ) -> Result<(), SessionError> {
134        let Some(session) = self.session.as_ref() else {
135            return Err(SessionError::Protocol(
136                "runtime session is not available".to_string(),
137            ));
138        };
139        let protocol_session = Arc::clone(session.plugins().protocol_session());
140        protocol_session.validate_turn_extension(extension).await
141    }
142
143    pub async fn branch_to_node(
144        &mut self,
145        node_id: Option<String>,
146    ) -> Result<crate::SessionSnapshot, SessionError> {
147        let mut state = self.export_state();
148        state.session_graph.branch_to(node_id);
149        let mut persisted_state = RuntimeSessionState::from_snapshot(state);
150        normalize_session_graph(&mut persisted_state);
151
152        let policy = persisted_state.policy.clone();
153        let host = self.host.clone();
154        let services = self.services.clone();
155        let managed_sessions = Arc::clone(&self.managed_sessions);
156        let managed_turns = Arc::clone(&self.managed_turns);
157        let process_sync_needed = Arc::clone(&self.process_sync_needed);
158        let runtime_scope_id = Arc::clone(&self.runtime_scope_id);
159        let turn_phase_probe = self.turn_phase_probe.clone();
160
161        let mut rebuilt = Self::from_host_state(policy, host, services, persisted_state).await?;
162        rebuilt.managed_sessions = managed_sessions;
163        rebuilt.managed_turns = managed_turns;
164        rebuilt.process_sync_needed = process_sync_needed;
165        rebuilt.runtime_scope_id = runtime_scope_id;
166        rebuilt.turn_phase_probe = turn_phase_probe;
167
168        let exported = rebuilt.export_state();
169        *self = rebuilt;
170        Ok(exported)
171    }
172
173    /// Promote a managed child session into the foreground runtime.
174    ///
175    /// Child sessions created through `SessionLifecycleService::create_session` are real
176    /// runtimes, not serialized placeholders. Foreground activation must therefore
177    /// claim that runtime instead of reconstructing a new empty state in the UI.
178    pub async fn activate_managed_session(&mut self, session_id: &str) -> Result<(), SessionError> {
179        let child = {
180            let mut registry = self.managed_sessions.lock().await;
181            registry.remove(session_id).ok_or_else(|| {
182                SessionError::Protocol(format!("unknown managed session `{session_id}`"))
183            })?
184        };
185        let child = child.try_into_runtime().map_err(|_| {
186            SessionError::Protocol(format!("managed session `{session_id}` is still in use"))
187        })?;
188        *self = child;
189        Ok(())
190    }
191
192    /// Explicitly snapshot protocol-local execution state, if any.
193    pub async fn snapshot_execution_state(&mut self) -> Result<Option<Vec<u8>>, SessionError> {
194        let Some(session) = self.session.as_mut() else {
195            return Err(SessionError::Protocol(
196                "runtime session not available".to_string(),
197            ));
198        };
199        let code_executor = session
200            .plugins()
201            .code_executor()
202            .ok_or(SessionError::CodeExecutionUnavailable)?;
203        let session_id = self.state.session_id.clone();
204        let blob = code_executor
205            .snapshot_execution_state(crate::plugin::ProtocolSessionContext::new(
206                session,
207                &session_id,
208            ))
209            .await?;
210        self.state.execution_state_snapshot = blob.clone();
211        Ok(blob)
212    }
213
214    /// Explicitly restore protocol-local execution state from an opaque snapshot blob.
215    pub async fn restore_execution_state(&mut self, snapshot: &[u8]) -> Result<(), SessionError> {
216        let Some(session) = self.session.as_mut() else {
217            return Err(SessionError::Protocol(
218                "runtime session not available".to_string(),
219            ));
220        };
221        let code_executor = session
222            .plugins()
223            .code_executor()
224            .ok_or(SessionError::CodeExecutionUnavailable)?;
225        let session_id = self.state.session_id.clone();
226        code_executor
227            .restore_execution_state(
228                crate::plugin::ProtocolSessionContext::new(session, &session_id),
229                snapshot,
230            )
231            .await?;
232        self.state.execution_state_snapshot = Some(snapshot.to_vec());
233        Ok(())
234    }
235
236    pub async fn list_trigger_registrations(
237        &self,
238    ) -> Result<Vec<crate::TriggerRegistration>, SessionError> {
239        let store = self.host.trigger_store.as_ref().ok_or_else(|| {
240            SessionError::Protocol("trigger store is unavailable in this runtime".to_string())
241        })?;
242        let records = store
243            .list_subscriptions(crate::TriggerSubscriptionFilter::for_session(
244                self.state.session_id.clone(),
245            ))
246            .await
247            .map_err(|err| SessionError::Protocol(err.to_string()))?;
248        Ok(records
249            .iter()
250            .map(crate::TriggerRegistration::from)
251            .collect())
252    }
253
254    pub async fn trigger_registrations_by_source_type(
255        &self,
256        source_type: impl Into<crate::TriggerEventType>,
257    ) -> Result<Vec<crate::TriggerRegistration>, SessionError> {
258        let store = self.host.trigger_store.as_ref().ok_or_else(|| {
259            SessionError::Protocol("trigger store is unavailable in this runtime".to_string())
260        })?;
261        let mut filter =
262            crate::TriggerSubscriptionFilter::for_session(self.state.session_id.clone());
263        filter.source_type = Some(source_type.into().to_string());
264        let records = store
265            .list_subscriptions(filter)
266            .await
267            .map_err(|err| SessionError::Protocol(err.to_string()))?;
268        Ok(records
269            .iter()
270            .map(crate::TriggerRegistration::from)
271            .collect())
272    }
273
274    pub async fn query_plugin(
275        &self,
276        name: &str,
277        args: serde_json::Value,
278        session_id: Option<String>,
279    ) -> Result<(String, serde_json::Value), PluginOperationInvokeError> {
280        let manager = self.runtime_session_services()?;
281        let Some(session) = self.session.as_ref() else {
282            return Err(PluginOperationInvokeError::Unknown(
283                "runtime session not available".to_string(),
284            ));
285        };
286        session
287            .plugins()
288            .query_plugin(
289                name,
290                args,
291                session_id,
292                true,
293                manager.read_service(),
294                manager.process_read_service(),
295            )
296            .await
297    }
298
299    pub async fn run_plugin_command(
300        &mut self,
301        name: &str,
302        args: serde_json::Value,
303        session_id: Option<String>,
304    ) -> Result<crate::PluginCommandReceipt<serde_json::Value>, PluginOperationInvokeError> {
305        let manager = self.runtime_session_services()?;
306        let Some(session) = self.session.as_ref() else {
307            return Err(PluginOperationInvokeError::Unknown(
308                "runtime session not available".to_string(),
309            ));
310        };
311        let (plugin_id, outcome) = session
312            .plugins()
313            .run_plugin_command(
314                name,
315                args,
316                session_id,
317                true,
318                manager.state_service(),
319                manager.lifecycle_service(),
320                manager.graph_service(),
321                manager.process_service(),
322            )
323            .await?;
324        let (events, queued_batches) = self
325            .apply_plugin_operation_effects(&plugin_id, outcome.events, outcome.directives)
326            .await?;
327        Ok(crate::PluginCommandReceipt {
328            output: outcome.output,
329            events,
330            queued_batches,
331        })
332    }
333
334    pub async fn run_plugin_task(
335        &mut self,
336        name: &str,
337        args: serde_json::Value,
338        session_id: Option<String>,
339        scoped_effect_controller: crate::ScopedEffectController<'static>,
340        cancellation_token: tokio_util::sync::CancellationToken,
341    ) -> Result<crate::PluginTaskReceipt<serde_json::Value>, PluginOperationInvokeError> {
342        let manager = self.runtime_session_services()?;
343        let Some(session) = self.session.as_ref() else {
344            return Err(PluginOperationInvokeError::Unknown(
345                "runtime session not available".to_string(),
346            ));
347        };
348        let (plugin_id, outcome) = session
349            .plugins()
350            .run_plugin_task(
351                name,
352                args,
353                session_id,
354                true,
355                manager.state_service(),
356                manager.lifecycle_service(),
357                manager.graph_service(),
358                manager.process_service(),
359                scoped_effect_controller,
360                cancellation_token,
361            )
362            .await?;
363        let (events, queued_batches) = self
364            .apply_plugin_operation_effects(&plugin_id, outcome.events, outcome.directives)
365            .await?;
366        Ok(crate::PluginTaskReceipt {
367            output: outcome.output,
368            events,
369            queued_batches,
370        })
371    }
372
373    async fn apply_plugin_operation_effects(
374        &mut self,
375        plugin_id: &str,
376        events: Vec<crate::PluginRuntimeEvent>,
377        directives: Vec<crate::PluginRuntimeDirective>,
378    ) -> Result<
379        (
380            Vec<crate::PluginOwned<crate::PluginRuntimeEvent>>,
381            Vec<crate::runtime::QueuedWorkBatch>,
382        ),
383        PluginOperationInvokeError,
384    > {
385        let owned_events = events
386            .into_iter()
387            .map(|event| crate::PluginOwned {
388                plugin_id: plugin_id.to_string(),
389                value: event,
390            })
391            .collect::<Vec<_>>();
392        if !owned_events.is_empty() {
393            let nodes = owned_events
394                .iter()
395                .map(|owned| {
396                    crate::plugin_runtime_protocol_event(&owned.plugin_id, owned.value.clone())
397                        .map(crate::SessionAppendNode::protocol_event)
398                        .map_err(|err| {
399                            PluginOperationInvokeError::Failed(format!(
400                                "failed to encode plugin runtime event: {err}"
401                            ))
402                        })
403                })
404                .collect::<Result<Vec<_>, _>>()?;
405            self.append_plugin_runtime_event_nodes(&nodes).await?;
406        }
407        self.stamp_live_plugin_state();
408        self.persist_plugin_operation_state_if_needed().await?;
409
410        let mut queued_batches = Vec::new();
411        for directive in directives {
412            match directive {
413                crate::PluginRuntimeDirective::QueueTurn {
414                    input,
415                    delivery_policy,
416                    slot_policy,
417                    source_key,
418                } => {
419                    let batch = self
420                        .enqueue_turn_input(input, delivery_policy, slot_policy, source_key)
421                        .await
422                        .map_err(|err| {
423                            PluginOperationInvokeError::Failed(format!(
424                                "failed to queue plugin turn request: {err}"
425                            ))
426                        })?;
427                    queued_batches.push(batch);
428                }
429            }
430        }
431
432        Ok((owned_events, queued_batches))
433    }
434
435    async fn append_plugin_runtime_event_nodes(
436        &mut self,
437        nodes: &[crate::SessionAppendNode],
438    ) -> Result<(), PluginOperationInvokeError> {
439        let node_ids = append_session_nodes_to_state_with_clock(
440            &mut self.state,
441            nodes,
442            self.host.core.clock.as_ref(),
443        );
444        if let Some(store) = self
445            .session
446            .as_ref()
447            .and_then(|session| session.history_store())
448        {
449            let graph = crate::store::GraphCommitDelta::Append {
450                nodes: node_ids
451                    .iter()
452                    .filter_map(|id| self.state.session_graph.find_node(id).cloned())
453                    .collect(),
454                leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
455            };
456            let commit = crate::store::RuntimeCommit::persisted_state_with_graph_commit(
457                &self.state,
458                graph,
459                &[],
460            );
461            let result = store.commit_runtime_state(commit).await.map_err(|err| {
462                PluginOperationInvokeError::Failed(format!(
463                    "failed to persist plugin runtime events: {err}"
464                ))
465            })?;
466            self.state.apply_persisted_commit_result(result);
467        }
468        Ok(())
469    }
470
471    async fn persist_plugin_operation_state_if_needed(
472        &mut self,
473    ) -> Result<(), PluginOperationInvokeError> {
474        let Some(store) = self
475            .session
476            .as_ref()
477            .and_then(|session| session.history_store())
478        else {
479            return Ok(());
480        };
481        let commit = crate::store::RuntimeCommit::persisted_state(&self.state, &[]);
482        let result = store.commit_runtime_state(commit).await.map_err(|err| {
483            PluginOperationInvokeError::Failed(format!(
484                "failed to persist plugin operation state: {err}"
485            ))
486        })?;
487        self.state.apply_persisted_commit_result(result);
488        Ok(())
489    }
490}