Skip to main content

lash_core/runtime/
session_ops.rs

1//! `LashRuntime` session-graph and execution-state operations.
2//!
3//! Extracted from `runtime/mod.rs`. This file re-opens `impl LashRuntime`;
4//! no types live here and no public API is changed.
5
6use std::sync::Arc;
7
8use crate::{PluginOperationInvokeError, SessionError};
9
10use super::LashRuntime;
11use super::state::{
12    RuntimeSessionState, append_session_nodes_to_state_with_clock, normalize_session_graph,
13};
14
15impl LashRuntime {
16    /// Replace the host-owned state envelope.
17    pub fn set_persisted_state(&mut self, state: RuntimeSessionState) -> Result<(), SessionError> {
18        let mut state = state;
19        normalize_session_graph(&mut state);
20        if let Some(session) = self.session.as_ref() {
21            session.invalidate_runtime_caches();
22            // Restore the persisted tool catalog so the live registry matches the
23            // state being installed (mirrors `from_host_state`). Without this the
24            // registry keeps its prior generation/tools and silently diverges from
25            // `state`. `restore_state` adopts the snapshot's generation, so a
26            // surface that reached generation >= 2 restores cleanly.
27            if let Some(tool_state) = state.tool_state_snapshot.clone() {
28                let report = session
29                    .plugins()
30                    .tool_registry()
31                    .restore_state(tool_state)
32                    .map_err(|err| SessionError::Protocol(err.to_string()))?;
33                if !report.orphaned.is_empty() {
34                    tracing::warn!(
35                        session_id = %state.session_id,
36                        orphaned = ?report.orphaned,
37                        "persisted state installed with orphaned tools: no registered \
38                         source resolves them; they are Off until their source returns"
39                    );
40                }
41            }
42            let snapshot = state.plugin_snapshot.clone().unwrap_or_default();
43            session
44                .plugins()
45                .restore(&snapshot)
46                .map_err(|err| SessionError::Protocol(err.to_string()))?;
47            state.plugin_snapshot_revision =
48                Some(session.plugins().snapshot_revision_fingerprint());
49        }
50        self.policy = state.policy.clone();
51        self.protocol_turn_options = state.protocol_turn_options.clone();
52        self.state = state;
53        Ok(())
54    }
55
56    pub async fn append_session_nodes(
57        &mut self,
58        request: crate::AppendSessionNodesRequest,
59    ) -> Result<crate::AppendSessionNodesResult, SessionError> {
60        self.refresh_session_graph_from_store().await?;
61        if let Some(required) = request.requires_ancestor_node_id.as_deref()
62            && !self.state.session_graph.active_path_contains(required)
63        {
64            return Ok(crate::AppendSessionNodesResult::StaleBranch {
65                current_leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
66            });
67        }
68        let node_ids = append_session_nodes_to_state_with_clock(
69            &mut self.state,
70            &request.nodes,
71            self.host.core.clock.as_ref(),
72        );
73        if let Some(session) = self.session.as_mut() {
74            let protocol_session = Arc::clone(session.plugins().protocol_session());
75            let session_id = self.state.session_id.clone();
76            protocol_session
77                .append_session_nodes(
78                    crate::plugin::ProtocolSessionContext::new(session, &session_id),
79                    &request.nodes,
80                )
81                .await?;
82        }
83        self.stamp_live_plugin_state();
84        if let Some(store) = self
85            .session
86            .as_ref()
87            .and_then(|session| session.history_store())
88        {
89            let graph = crate::store::GraphCommitDelta::Append {
90                nodes: node_ids
91                    .iter()
92                    .filter_map(|id| self.state.session_graph.find_node(id).cloned())
93                    .collect(),
94                leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
95            };
96            let commit = crate::store::RuntimeCommit::persisted_state_with_graph_commit(
97                &self.state,
98                graph,
99                &[],
100            );
101            match super::commit_runtime_state_with_fresh_session_execution_lease(
102                store,
103                commit,
104                "runtime.append_session_nodes",
105                Arc::clone(&self.host.core.clock),
106            )
107            .await
108            {
109                Ok(result) => self.state.apply_persisted_commit_result(result),
110                Err(err) => tracing::warn!("failed to persist runtime state: {err}"),
111            }
112        }
113        Ok(crate::AppendSessionNodesResult::Appended {
114            node_ids,
115            leaf_node_id: self
116                .state
117                .session_graph
118                .leaf_node_id
119                .clone()
120                .unwrap_or_default(),
121        })
122    }
123
124    pub async fn apply_protocol_session_extension(
125        &mut self,
126        extension: crate::ProtocolSessionExtensionHandle,
127    ) -> Result<(), SessionError> {
128        let Some(session) = self.session.as_ref() else {
129            return Err(SessionError::Protocol(
130                "runtime session is not available".to_string(),
131            ));
132        };
133        let protocol_session = Arc::clone(session.plugins().protocol_session());
134        protocol_session.apply_session_extension(extension).await
135    }
136
137    pub async fn validate_protocol_turn_extension(
138        &mut self,
139        extension: &crate::ProtocolTurnExtensionHandle,
140    ) -> Result<(), SessionError> {
141        let Some(session) = self.session.as_ref() else {
142            return Err(SessionError::Protocol(
143                "runtime session is not available".to_string(),
144            ));
145        };
146        let protocol_session = Arc::clone(session.plugins().protocol_session());
147        protocol_session.validate_turn_extension(extension).await
148    }
149
150    pub async fn branch_to_node(
151        &mut self,
152        node_id: Option<String>,
153    ) -> Result<crate::SessionSnapshot, SessionError> {
154        let mut state = self.export_state();
155        state.session_graph.branch_to(node_id);
156        let mut persisted_state = RuntimeSessionState::from_snapshot(state);
157        normalize_session_graph(&mut persisted_state);
158
159        let policy = persisted_state.policy.clone();
160        let host = self.host.clone();
161        let services = self.services.clone();
162        let managed_sessions = Arc::clone(&self.managed_sessions);
163        let managed_turns = Arc::clone(&self.managed_turns);
164        let process_sync_needed = Arc::clone(&self.process_sync_needed);
165        let runtime_scope_id = Arc::clone(&self.runtime_scope_id);
166        let turn_phase_probe = self.turn_phase_probe.clone();
167
168        let mut rebuilt = Self::from_host_state(policy, host, services, persisted_state).await?;
169        rebuilt.managed_sessions = managed_sessions;
170        rebuilt.managed_turns = managed_turns;
171        rebuilt.process_sync_needed = process_sync_needed;
172        rebuilt.runtime_scope_id = runtime_scope_id;
173        rebuilt.turn_phase_probe = turn_phase_probe;
174
175        let exported = rebuilt.export_state();
176        *self = rebuilt;
177        Ok(exported)
178    }
179
180    /// Promote a managed child session into the foreground runtime.
181    ///
182    /// Child sessions created through `SessionLifecycleService::create_session` are real
183    /// runtimes, not serialized placeholders. Foreground activation must therefore
184    /// claim that runtime instead of reconstructing a new empty state in the UI.
185    pub async fn activate_managed_session(&mut self, session_id: &str) -> Result<(), SessionError> {
186        let child = {
187            let mut registry = self.managed_sessions.lock().await;
188            registry.remove(session_id).ok_or_else(|| {
189                SessionError::Protocol(format!("unknown managed session `{session_id}`"))
190            })?
191        };
192        let child = child.try_into_runtime().map_err(|_| {
193            SessionError::Protocol(format!("managed session `{session_id}` is still in use"))
194        })?;
195        *self = child;
196        Ok(())
197    }
198
199    /// Explicitly snapshot protocol-local execution state, if any.
200    pub async fn snapshot_execution_state(&mut self) -> Result<Option<Vec<u8>>, SessionError> {
201        let Some(session) = self.session.as_mut() else {
202            return Err(SessionError::Protocol(
203                "runtime session not available".to_string(),
204            ));
205        };
206        let code_executor = session
207            .plugins()
208            .code_executor()
209            .ok_or(SessionError::CodeExecutionUnavailable)?;
210        let session_id = self.state.session_id.clone();
211        let blob = code_executor
212            .snapshot_execution_state(crate::plugin::ProtocolSessionContext::new(
213                session,
214                &session_id,
215            ))
216            .await?;
217        self.state.execution_state_snapshot = blob.clone();
218        Ok(blob)
219    }
220
221    /// Explicitly restore protocol-local execution state from an opaque snapshot blob.
222    pub async fn restore_execution_state(&mut self, snapshot: &[u8]) -> Result<(), SessionError> {
223        let Some(session) = self.session.as_mut() else {
224            return Err(SessionError::Protocol(
225                "runtime session not available".to_string(),
226            ));
227        };
228        let code_executor = session
229            .plugins()
230            .code_executor()
231            .ok_or(SessionError::CodeExecutionUnavailable)?;
232        let session_id = self.state.session_id.clone();
233        code_executor
234            .restore_execution_state(
235                crate::plugin::ProtocolSessionContext::new(session, &session_id),
236                snapshot,
237            )
238            .await?;
239        self.state.execution_state_snapshot = Some(snapshot.to_vec());
240        Ok(())
241    }
242
243    pub async fn list_trigger_registrations(
244        &self,
245    ) -> Result<Vec<crate::TriggerRegistration>, SessionError> {
246        let store = self.host.trigger_store.as_ref().ok_or_else(|| {
247            SessionError::Protocol("trigger store is unavailable in this runtime".to_string())
248        })?;
249        let records = store
250            .list_subscriptions(crate::TriggerSubscriptionFilter::for_session(
251                self.state.session_id.clone(),
252            ))
253            .await
254            .map_err(|err| SessionError::Protocol(err.to_string()))?;
255        Ok(records
256            .iter()
257            .map(crate::TriggerRegistration::from)
258            .collect())
259    }
260
261    pub async fn trigger_registrations_by_source_type(
262        &self,
263        source_type: impl Into<crate::TriggerEventType>,
264    ) -> Result<Vec<crate::TriggerRegistration>, SessionError> {
265        let store = self.host.trigger_store.as_ref().ok_or_else(|| {
266            SessionError::Protocol("trigger store is unavailable in this runtime".to_string())
267        })?;
268        let mut filter =
269            crate::TriggerSubscriptionFilter::for_session(self.state.session_id.clone());
270        filter.source_type = Some(source_type.into().to_string());
271        let records = store
272            .list_subscriptions(filter)
273            .await
274            .map_err(|err| SessionError::Protocol(err.to_string()))?;
275        Ok(records
276            .iter()
277            .map(crate::TriggerRegistration::from)
278            .collect())
279    }
280
281    pub async fn query_plugin(
282        &self,
283        name: &str,
284        args: serde_json::Value,
285        session_id: Option<String>,
286    ) -> Result<(String, serde_json::Value), PluginOperationInvokeError> {
287        let manager = self.runtime_session_services()?;
288        let Some(session) = self.session.as_ref() else {
289            return Err(PluginOperationInvokeError::Unknown(
290                "runtime session not available".to_string(),
291            ));
292        };
293        session
294            .plugins()
295            .query_plugin(
296                name,
297                args,
298                session_id,
299                true,
300                manager.read_service(),
301                manager.process_read_service(),
302            )
303            .await
304    }
305
306    pub async fn run_plugin_command(
307        &mut self,
308        name: &str,
309        args: serde_json::Value,
310        session_id: Option<String>,
311    ) -> Result<crate::PluginCommandReceipt<serde_json::Value>, PluginOperationInvokeError> {
312        let manager = self.runtime_session_services()?;
313        let Some(session) = self.session.as_ref() else {
314            return Err(PluginOperationInvokeError::Unknown(
315                "runtime session not available".to_string(),
316            ));
317        };
318        let (plugin_id, outcome) = session
319            .plugins()
320            .run_plugin_command(
321                name,
322                args,
323                session_id,
324                true,
325                manager.state_service(),
326                manager.lifecycle_service(),
327                manager.graph_service(),
328                manager.process_service(),
329            )
330            .await?;
331        let (events, queued_batches) = self
332            .apply_plugin_operation_effects(&plugin_id, outcome.events, outcome.directives)
333            .await?;
334        Ok(crate::PluginCommandReceipt {
335            output: outcome.output,
336            events,
337            queued_batches,
338        })
339    }
340
341    pub async fn run_plugin_task(
342        &mut self,
343        name: &str,
344        args: serde_json::Value,
345        session_id: Option<String>,
346        scoped_effect_controller: crate::ScopedEffectController<'static>,
347        cancellation_token: tokio_util::sync::CancellationToken,
348    ) -> Result<crate::PluginTaskReceipt<serde_json::Value>, PluginOperationInvokeError> {
349        let manager = self.runtime_session_services()?;
350        let Some(session) = self.session.as_ref() else {
351            return Err(PluginOperationInvokeError::Unknown(
352                "runtime session not available".to_string(),
353            ));
354        };
355        let (plugin_id, outcome) = session
356            .plugins()
357            .run_plugin_task(
358                name,
359                args,
360                session_id,
361                true,
362                manager.state_service(),
363                manager.lifecycle_service(),
364                manager.graph_service(),
365                manager.process_service(),
366                scoped_effect_controller,
367                cancellation_token,
368            )
369            .await?;
370        let (events, queued_batches) = self
371            .apply_plugin_operation_effects(&plugin_id, outcome.events, outcome.directives)
372            .await?;
373        Ok(crate::PluginTaskReceipt {
374            output: outcome.output,
375            events,
376            queued_batches,
377        })
378    }
379
380    async fn apply_plugin_operation_effects(
381        &mut self,
382        plugin_id: &str,
383        events: Vec<crate::PluginRuntimeEvent>,
384        directives: Vec<crate::PluginRuntimeDirective>,
385    ) -> Result<
386        (
387            Vec<crate::PluginOwned<crate::PluginRuntimeEvent>>,
388            Vec<crate::runtime::QueuedWorkBatch>,
389        ),
390        PluginOperationInvokeError,
391    > {
392        let owned_events = events
393            .into_iter()
394            .map(|event| crate::PluginOwned {
395                plugin_id: plugin_id.to_string(),
396                value: event,
397            })
398            .collect::<Vec<_>>();
399        if !owned_events.is_empty() {
400            let nodes = owned_events
401                .iter()
402                .map(|owned| {
403                    crate::plugin_runtime_protocol_event(&owned.plugin_id, owned.value.clone())
404                        .map(crate::SessionAppendNode::protocol_event)
405                        .map_err(|err| {
406                            PluginOperationInvokeError::Failed(format!(
407                                "failed to encode plugin runtime event: {err}"
408                            ))
409                        })
410                })
411                .collect::<Result<Vec<_>, _>>()?;
412            self.append_plugin_runtime_event_nodes(&nodes).await?;
413        }
414        self.stamp_live_plugin_state();
415        self.persist_plugin_operation_state_if_needed().await?;
416
417        let mut queued_batches = Vec::new();
418        for directive in directives {
419            match directive {
420                crate::PluginRuntimeDirective::QueueTurn {
421                    input,
422                    delivery_policy,
423                    slot_policy,
424                    source_key,
425                } => {
426                    let batch = self
427                        .enqueue_turn_input(input, delivery_policy, slot_policy, source_key)
428                        .await
429                        .map_err(|err| {
430                            PluginOperationInvokeError::Failed(format!(
431                                "failed to queue plugin turn request: {err}"
432                            ))
433                        })?;
434                    queued_batches.push(batch);
435                }
436            }
437        }
438
439        Ok((owned_events, queued_batches))
440    }
441
442    async fn append_plugin_runtime_event_nodes(
443        &mut self,
444        nodes: &[crate::SessionAppendNode],
445    ) -> Result<(), PluginOperationInvokeError> {
446        let node_ids = append_session_nodes_to_state_with_clock(
447            &mut self.state,
448            nodes,
449            self.host.core.clock.as_ref(),
450        );
451        if let Some(store) = self
452            .session
453            .as_ref()
454            .and_then(|session| session.history_store())
455        {
456            let graph = crate::store::GraphCommitDelta::Append {
457                nodes: node_ids
458                    .iter()
459                    .filter_map(|id| self.state.session_graph.find_node(id).cloned())
460                    .collect(),
461                leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
462            };
463            let commit = crate::store::RuntimeCommit::persisted_state_with_graph_commit(
464                &self.state,
465                graph,
466                &[],
467            );
468            let result = super::commit_runtime_state_with_fresh_session_execution_lease(
469                store,
470                commit,
471                "runtime.plugin_runtime_events",
472                Arc::clone(&self.host.core.clock),
473            )
474            .await
475            .map_err(|err| {
476                PluginOperationInvokeError::Failed(format!(
477                    "failed to persist plugin runtime events: {err}"
478                ))
479            })?;
480            self.state.apply_persisted_commit_result(result);
481        }
482        Ok(())
483    }
484
485    async fn persist_plugin_operation_state_if_needed(
486        &mut self,
487    ) -> Result<(), PluginOperationInvokeError> {
488        let Some(store) = self
489            .session
490            .as_ref()
491            .and_then(|session| session.history_store())
492        else {
493            return Ok(());
494        };
495        let commit = crate::store::RuntimeCommit::persisted_state(&self.state, &[]);
496        let result = super::commit_runtime_state_with_fresh_session_execution_lease(
497            store,
498            commit,
499            "runtime.plugin_operation_state",
500            Arc::clone(&self.host.core.clock),
501        )
502        .await
503        .map_err(|err| {
504            PluginOperationInvokeError::Failed(format!(
505                "failed to persist plugin operation state: {err}"
506            ))
507        })?;
508        self.state.apply_persisted_commit_result(result);
509        Ok(())
510    }
511}