Skip to main content

lash_core/runtime/
session_api.rs

1use super::*;
2
3impl LashRuntime {
4    pub fn session_id(&self) -> &str {
5        &self.state.session_id
6    }
7
8    pub(super) fn stamp_live_plugin_state(&mut self) {
9        if let Some(session) = self.session.as_ref() {
10            let snapshot = session.plugins().tool_registry().export_state();
11            self.state.tool_state_generation = Some(snapshot.generation());
12            self.state.tool_state_snapshot = Some(snapshot);
13            let captured = session.plugins().snapshot();
14            crate::runtime::state::store_plugin_snapshot(&mut self.state.plugin_snapshot, captured);
15            self.state.plugin_snapshot_revision =
16                Some(session.plugins().snapshot_revision_fingerprint());
17        } else {
18            self.state.tool_state_generation = None;
19            self.state.tool_state_snapshot = None;
20            self.state.plugin_snapshot = None;
21            self.state.plugin_snapshot_revision = None;
22        }
23    }
24    pub(super) fn active_tool_catalog_shared(
25        &self,
26    ) -> Result<Arc<Vec<serde_json::Value>>, crate::PluginError> {
27        self.session
28            .as_ref()
29            .map(|session| session.shared_tool_catalog(&self.state.session_id))
30            .unwrap_or_else(|| Ok(Arc::new(Vec::new())))
31    }
32
33    pub fn tool_state(&self) -> Result<crate::ToolState, SessionError> {
34        let Some(session) = self.session.as_ref() else {
35            return Err(SessionError::Protocol(
36                "runtime session not available".to_string(),
37            ));
38        };
39        Ok(session.plugins().tool_registry().export_state())
40    }
41    /// Override protocol-owned turn options for this session.
42    pub fn set_protocol_turn_options(&mut self, options: crate::ProtocolTurnOptions) {
43        self.state.protocol_turn_options = options.clone();
44        if let Some(frame) = self.state.current_agent_frame_mut() {
45            frame.protocol_turn_options = options.clone();
46        }
47        self.protocol_turn_options = options;
48    }
49
50    /// The durable protocol turn options recorded on the session.
51    pub fn protocol_turn_options(&self) -> &crate::ProtocolTurnOptions {
52        &self.state.protocol_turn_options
53    }
54
55    /// Override protocol-owned turn options and mirror them to **every** agent
56    /// frame. Used by the protocol materialization hook (apply-at-open): the
57    /// last applied value is recorded on the session and on all frames.
58    pub fn set_protocol_turn_options_all_frames(&mut self, options: crate::ProtocolTurnOptions) {
59        self.state.protocol_turn_options = options.clone();
60        for frame in &mut self.state.agent_frames {
61            frame.protocol_turn_options = options.clone();
62        }
63        self.protocol_turn_options = options;
64    }
65
66    /// Run the protocol plugin's materialization hook against this runtime.
67    ///
68    /// Fires the [`ProtocolSessionPlugin::configure_runtime_on_materialize`]
69    /// hook, so both the child-create path and the root/builder-open path
70    /// converge on one seam. `plugin_options` are the plugin-keyed options that
71    /// reached this materialization (builder options for root opens, request
72    /// options for child create); `is_root_session` distinguishes root from
73    /// child.
74    pub fn configure_protocol_on_materialize(
75        &mut self,
76        plugin_options: &crate::PluginOptions,
77        is_root_session: bool,
78    ) -> Result<(), crate::PluginError> {
79        let protocol_session = self
80            .session
81            .as_ref()
82            .map(|session| Arc::clone(session.plugins().protocol_session()));
83        if let Some(protocol_session) = protocol_session {
84            let materialization = crate::plugin::ProtocolSessionMaterialization {
85                plugin_options,
86                is_root_session,
87            };
88            protocol_session
89                .configure_runtime_on_materialize(
90                    crate::plugin::ProtocolRuntimeContext::new(self),
91                    materialization,
92                )
93                .map_err(|err| crate::PluginError::Session(err.to_string()))?;
94        }
95        Ok(())
96    }
97
98    /// Export current session state for inspection/UI purposes.
99    /// This keeps persistence-heavy snapshots untouched; callers that need a
100    /// fully persisted view should use `export_persisted_state`.
101    pub fn export_state(&self) -> crate::SessionSnapshot {
102        self.state.to_snapshot()
103    }
104
105    pub fn read_view(&self) -> crate::SessionReadView {
106        crate::SessionReadView::from_runtime_state(
107            &self.state,
108            self.state.effective_policy().clone(),
109            self.state.effective_protocol_turn_options().clone(),
110        )
111    }
112
113    /// Export the narrow persistence snapshot used by stores and resume logic.
114    pub fn export_persistence_state(&self) -> RuntimeSessionState {
115        self.state.clone()
116    }
117
118    pub fn apply_persistence_state(
119        &mut self,
120        state: RuntimeSessionState,
121    ) -> Result<(), SessionError> {
122        self.set_persisted_state(state)
123    }
124
125    pub(crate) fn export_graph_first_state(&self) -> RuntimeSessionState {
126        self.state.clone()
127    }
128
129    /// Export a persistence-ready state envelope with dynamic/plugin snapshots
130    /// refreshed from the live session.
131    pub fn export_persisted_state(&self) -> RuntimeSessionState {
132        let mut state = self.state.clone();
133        state.protocol_turn_options = self.protocol_turn_options.clone();
134        if let Some(frame) = state.current_agent_frame_mut() {
135            frame.protocol_turn_options = self.protocol_turn_options.clone();
136        }
137        if let Some(session) = self.session.as_ref() {
138            let snapshot = session.plugins().tool_registry().export_state();
139            state.tool_state_generation = Some(snapshot.generation());
140            state.tool_state_snapshot = Some(snapshot);
141            let captured = session.plugins().snapshot();
142            crate::runtime::state::store_plugin_snapshot(&mut state.plugin_snapshot, captured);
143            state.plugin_snapshot_revision =
144                Some(session.plugins().snapshot_revision_fingerprint());
145        }
146        normalize_session_graph(&mut state);
147        state
148    }
149
150    pub fn usage_report(&self) -> SessionUsageReport {
151        let mut entries = self.state.token_ledger.clone();
152        let drained = self.shared_token_ledger.lock().expect("token ledger lock");
153        for entry in drained.iter().cloned() {
154            merge_ledger_entry(&mut entries, entry);
155        }
156        SessionUsageReport::from_entries(&entries)
157    }
158
159    pub async fn await_background_work(&mut self) -> Result<(), SessionError> {
160        if self.process_sync_needed.swap(false, Ordering::AcqRel) {
161            self.refresh_session_graph_from_store().await?;
162        }
163        Ok(())
164    }
165
166    pub(super) async fn refresh_session_graph_from_store(&mut self) -> Result<(), SessionError> {
167        // Fresh replacement opens intentionally start from an empty resident
168        // graph and commit a full replacement. Do not resurrect the old head
169        // before that first commit.
170        if self.state.graph_replace_required && self.state.head_revision.is_none() {
171            return Ok(());
172        }
173        let Some(store) = self
174            .session
175            .as_ref()
176            .and_then(|session| session.history_store())
177        else {
178            return Ok(());
179        };
180        let scope = match self.residency {
181            crate::Residency::KeepAll => crate::store::SessionReadScope::FullGraph,
182            crate::Residency::ActivePathOnly => crate::store::SessionReadScope::ActivePath {
183                leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
184            },
185        };
186        let Some(read) = store.load_session(scope).await.map_err(|err| {
187            SessionError::Protocol(format!("failed to refresh session graph from store: {err}"))
188        })?
189        else {
190            return Ok(());
191        };
192        let has_newer_graph = self.state.head_revision != Some(read.head_revision)
193            || read.graph.leaf_node_id != self.state.session_graph.leaf_node_id
194            || read.checkpoint_ref != self.state.checkpoint_ref;
195        if !has_newer_graph {
196            return Ok(());
197        }
198        let head = crate::store::SessionHead {
199            session_id: read.session_id.clone(),
200            head_revision: read.head_revision,
201            agent_frames: read.agent_frames.clone(),
202            current_agent_frame_id: read.current_agent_frame_id.clone(),
203            graph: read.graph,
204            config: read.config.clone(),
205            checkpoint_ref: read.checkpoint_ref.clone(),
206            token_ledger: merge_usage_delta_entries(read.token_ledger),
207        };
208        apply_session_head(&mut self.state, &head);
209        apply_session_checkpoint(&mut self.state, read.checkpoint);
210        self.policy = self.state.effective_policy().clone();
211        self.protocol_turn_options = self.state.effective_protocol_turn_options().clone();
212        Ok(())
213    }
214
215    pub(super) fn runtime_session_services(
216        &self,
217    ) -> Result<Arc<RuntimeSessionServices>, PluginOperationInvokeError> {
218        Ok(Arc::new(RuntimeSessionServices::new(self, true, None)?))
219    }
220
221    pub(super) fn runtime_session_services_for_turn(
222        &self,
223        child_usage_event_relay: Option<ChildUsageEventRelay>,
224    ) -> Result<Arc<RuntimeSessionServices>, PluginOperationInvokeError> {
225        Ok(Arc::new(RuntimeSessionServices::new(
226            self,
227            false,
228            child_usage_event_relay,
229        )?))
230    }
231
232    pub fn session_state_service(
233        &self,
234    ) -> Result<Arc<dyn crate::plugin::SessionStateService>, PluginOperationInvokeError> {
235        self.runtime_session_services()
236            .map(|services| services.state_service())
237    }
238
239    pub fn session_lifecycle_service(
240        &self,
241    ) -> Result<Arc<dyn crate::plugin::SessionLifecycleService>, PluginOperationInvokeError> {
242        self.runtime_session_services()
243            .map(|services| services.lifecycle_service())
244    }
245
246    pub fn session_graph_service(
247        &self,
248    ) -> Result<Arc<dyn crate::plugin::SessionGraphService>, PluginOperationInvokeError> {
249        self.runtime_session_services()
250            .map(|services| services.graph_service())
251    }
252
253    pub fn process_service(
254        &self,
255    ) -> Result<Arc<dyn crate::ProcessService>, PluginOperationInvokeError> {
256        self.runtime_session_services()
257            .map(|services| services.process_service())
258    }
259
260    pub fn process_cancel_ability(&self) -> Arc<dyn crate::ProcessCancelAbility> {
261        Arc::clone(&self.host.core.control.process_cancel_ability)
262    }
263
264    pub fn effect_host(&self) -> Arc<dyn crate::EffectHost> {
265        Arc::clone(&self.host.core.control.effect_host)
266    }
267
268    pub async fn enqueue_turn_input(
269        &self,
270        input: crate::TurnInput,
271        ingress: crate::TurnInputIngress,
272        source_key: Option<String>,
273    ) -> Result<crate::PendingTurnInput, RuntimeError> {
274        let store = self
275            .session
276            .as_ref()
277            .and_then(|session| session.history_store())
278            .ok_or_else(queued_turn_input_store_required)?;
279        enqueue_turn_input_to_store(
280            self.state.session_id.clone(),
281            store,
282            self.host.queued_work_driver.clone(),
283            input,
284            ingress,
285            source_key,
286        )
287        .await
288    }
289
290    pub async fn cancel_queued_work_batch(
291        &self,
292        session_id: &str,
293        batch_id: &str,
294    ) -> Result<Option<crate::QueuedWorkBatch>, RuntimeError> {
295        let store = self
296            .session
297            .as_ref()
298            .and_then(|session| session.history_store())
299            .ok_or_else(queued_turn_input_store_required)?;
300        store
301            .cancel_queued_work_batch(session_id, batch_id)
302            .await
303            .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))
304    }
305
306    /// The plugin session bound to the currently active runtime session, if any.
307    pub fn plugin_session(&self) -> Option<Arc<crate::PluginSession>> {
308        self.session.as_ref().map(|s| Arc::clone(s.plugins()))
309    }
310
311    pub fn open_agent_frame(
312        &mut self,
313        request: crate::OpenAgentFrameRequest,
314    ) -> crate::OpenAgentFrameResult {
315        open_agent_frame_in_state_with_clock(
316            &mut self.state,
317            request,
318            self.host.core.clock.as_ref(),
319        )
320    }
321
322    /// Run the registered compaction provider and commit the resulting
323    /// seed nodes into a fresh Agent Frame.
324    pub async fn compact_context(
325        &mut self,
326        instructions: Option<String>,
327        scoped_effect_controller: crate::ScopedEffectController<'_>,
328    ) -> Result<bool, PluginOperationInvokeError> {
329        let services = self.runtime_session_services()?;
330        let Some(plugin_session) = self.session.as_ref().map(|s| Arc::clone(s.plugins())) else {
331            return Err(PluginOperationInvokeError::Unknown(
332                "runtime session not available".to_string(),
333            ));
334        };
335        let ctx = crate::CompactionContext {
336            session_id: self.state.session_id.clone(),
337            state: self.read_view(),
338            instructions,
339            sessions: services.state_service(),
340            session_lifecycle: services.lifecycle_service(),
341            session_graph: services.graph_service(),
342            scoped_effect_controller,
343        };
344        let Some(compaction) = plugin_session.compact_context(&ctx).await.map_err(|err| {
345            PluginOperationInvokeError::Unknown(format!("context compaction failed: {err}"))
346        })?
347        else {
348            return Ok(false);
349        };
350        let frame_id = format!(
351            "{}:frame:compaction:{}",
352            self.state.session_id,
353            uuid::Uuid::new_v4()
354        );
355        let result = self.open_agent_frame(
356            crate::OpenAgentFrameRequest::new(frame_id, crate::AgentFrameReason::compaction())
357                .with_initial_nodes(compaction.initial_nodes),
358        );
359        if result.opened {
360            self.stamp_live_plugin_state();
361        }
362        Ok(result.opened)
363    }
364
365    pub(super) fn session_policy(&self) -> SessionPolicy {
366        self.policy.clone()
367    }
368
369    pub(super) async fn notify_session_config_changed(&self, previous: SessionPolicy) {
370        let Some(session) = self.session.as_ref() else {
371            return;
372        };
373        let current = self.session_policy();
374        if current == previous {
375            return;
376        }
377        let Ok(services) = self.runtime_session_services() else {
378            return;
379        };
380        session
381            .plugins()
382            .emit_runtime_event(crate::PluginLifecycleEvent::SessionConfigChanged(Box::new(
383                SessionConfigChangedContext {
384                    session_id: self.state.session_id.clone(),
385                    previous,
386                    current,
387                    sessions: services.state_service(),
388                },
389            )))
390            .await;
391    }
392
393    pub(super) async fn apply_session_config_mutations(&mut self, previous: SessionPolicy) {
394        let Some(session) = self.session.as_ref() else {
395            return;
396        };
397        let current = self.session_policy();
398        if current == previous {
399            return;
400        }
401        let Ok(services) = self.runtime_session_services() else {
402            return;
403        };
404        self.policy = session
405            .plugins()
406            .mutate_session_config(
407                SessionConfigChangedContext {
408                    session_id: self.state.session_id.clone(),
409                    previous,
410                    current,
411                    sessions: services.state_service(),
412                },
413                self.policy.clone(),
414            )
415            .await;
416        self.state.policy = self.policy.clone();
417    }
418}
419
420pub(in crate::runtime) async fn enqueue_turn_input_to_store(
421    session_id: String,
422    store: Arc<dyn crate::RuntimePersistence>,
423    queued_work_driver: Option<crate::QueuedWorkDriver>,
424    input: crate::TurnInput,
425    ingress: crate::TurnInputIngress,
426    source_key: Option<String>,
427) -> Result<crate::PendingTurnInput, RuntimeError> {
428    super::turn_loop::ensure_durable_effect_input(&input)?;
429    let is_next_turn = matches!(ingress, crate::TurnInputIngress::NextTurn);
430    let mut draft = crate::PendingTurnInputDraft::new(session_id, ingress, input);
431    draft.source_key = source_key;
432    let enqueued = store
433        .enqueue_pending_turn_input(draft)
434        .await
435        .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))?;
436    if is_next_turn && let Some(driver) = queued_work_driver.as_ref() {
437        driver
438            .claim_and_run_pending(Some(&enqueued.session_id), "queued_turn_input")
439            .await
440            .map_err(|err| {
441                RuntimeError::new(
442                    RuntimeErrorCode::Other("queued_work".to_string()),
443                    err.to_string(),
444                )
445            })?;
446    }
447    Ok(enqueued)
448}
449
450impl LashRuntime {
451    pub async fn submit_session_command(
452        &mut self,
453        command: crate::SessionCommand,
454        idempotency_key: impl Into<String>,
455    ) -> Result<crate::SessionCommandReceipt, RuntimeError> {
456        let idempotency_key = idempotency_key.into();
457        if idempotency_key.trim().is_empty() {
458            return Err(RuntimeError::new(
459                RuntimeErrorCode::Other("session_command_idempotency_key".to_string()),
460                "session command idempotency key cannot be empty",
461            ));
462        }
463        let source_key = command.source_key(&idempotency_key);
464        let session_id = self.state.session_id.clone();
465        let Some(store) = self
466            .session
467            .as_ref()
468            .and_then(|session| session.history_store())
469        else {
470            let batch_id = format!("inline-command:{}", uuid::Uuid::new_v4());
471            self.apply_session_command(command, None, None).await?;
472            return Ok(crate::SessionCommandReceipt {
473                session_id,
474                batch_id,
475                source_key,
476            });
477        };
478        let draft = crate::QueuedWorkBatchDraft::new(
479            session_id.clone(),
480            crate::DeliveryPolicy::AfterCurrentTurnCommit,
481            crate::SlotPolicy::Exclusive,
482            vec![crate::QueuedWorkPayload::session_command(command)],
483        )
484        .with_source_key(source_key.clone());
485        let enqueued = store.enqueue_queued_work(draft).await.map_err(|err| {
486            RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
487        })?;
488        if let Some(driver) = self.host.queued_work_driver.as_ref() {
489            driver
490                .claim_and_run_pending(Some(&session_id), "session_command")
491                .await
492                .map_err(|err| {
493                    RuntimeError::new(
494                        RuntimeErrorCode::Other("queued_work".to_string()),
495                        err.to_string(),
496                    )
497                })?;
498        }
499        Ok(crate::SessionCommandReceipt {
500            session_id,
501            batch_id: enqueued.batch_id,
502            source_key,
503        })
504    }
505
506    pub async fn drain_next_session_command(
507        &mut self,
508        session_execution_lease: &crate::SessionExecutionLeaseFence,
509    ) -> Result<Option<crate::SessionCommandReceipt>, RuntimeError> {
510        let Some(store) = self
511            .session
512            .as_ref()
513            .and_then(|session| session.history_store())
514        else {
515            return Ok(None);
516        };
517        let claim = store
518            .claim_leading_ready_session_command(
519                &self.state.session_id,
520                session_execution_lease,
521                &self.runtime_lease_owner,
522                self.host.core.control.lease_timings.ttl_ms(),
523            )
524            .await
525            .map_err(super::runtime_error_from_store_commit)?;
526        let Some(claim) = claim else {
527            return Ok(None);
528        };
529        let Some((batch, command)) = claim.exclusive_session_command() else {
530            return Err(RuntimeError::new(
531                "session_command_claim",
532                format!(
533                    "queued-work claim `{}` did not contain exactly one session command batch",
534                    claim.claim_id
535                ),
536            ));
537        };
538        let batch_id = batch.batch_id.clone();
539        let source_key = batch.source_key.clone().unwrap_or_else(|| batch_id.clone());
540        let command = command.clone();
541        self.apply_session_command(
542            command,
543            Some(claim.completion()),
544            Some(session_execution_lease),
545        )
546        .await?;
547        Ok(Some(crate::SessionCommandReceipt {
548            session_id: self.state.session_id.clone(),
549            batch_id,
550            source_key,
551        }))
552    }
553
554    async fn apply_session_command(
555        &mut self,
556        command: crate::SessionCommand,
557        completion: Option<crate::QueuedWorkCompletion>,
558        session_execution_lease: Option<&crate::SessionExecutionLeaseFence>,
559    ) -> Result<(), RuntimeError> {
560        self.refresh_session_graph_from_store()
561            .await
562            .map_err(|err| RuntimeError::new("session_command_refresh", err.to_string()))?;
563        let graph = match command {
564            crate::SessionCommand::RefreshToolCatalog { .. } => {
565                self.refresh_session_tool_catalog().await.map_err(|err| {
566                    RuntimeError::new("session_command_refresh_tools", err.to_string())
567                })?;
568                crate::store::GraphCommitDelta::Unchanged {
569                    leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
570                }
571            }
572            crate::SessionCommand::ResetSession { .. } => {
573                let mut state = crate::RuntimeSessionState {
574                    session_id: self.state.session_id.clone(),
575                    policy: self.policy.clone(),
576                    graph_replace_required: true,
577                    ..crate::RuntimeSessionState::default()
578                };
579                state.ensure_agent_frame_initialized();
580                self.set_persisted_state(state)
581                    .map_err(|err| RuntimeError::new("session_command_reset", err.to_string()))?;
582                crate::store::GraphCommitDelta::ReplaceFull(self.state.session_graph.clone())
583            }
584        };
585        let Some(store) = self
586            .session
587            .as_ref()
588            .and_then(|session| session.history_store())
589        else {
590            return Ok(());
591        };
592        let mut commit =
593            crate::store::RuntimeCommit::persisted_state_with_graph_commit(&self.state, graph, &[]);
594        let Some(session_execution_lease) = session_execution_lease else {
595            return Err(RuntimeError::new(
596                RuntimeErrorCode::StoreCommitFailed,
597                "session command commit requires a session execution lease",
598            ));
599        };
600        commit = commit.with_session_execution_lease(session_execution_lease.clone());
601        if let Some(completion) = completion {
602            commit = commit.completing_queue_claim(completion);
603        }
604        let result = store
605            .commit_runtime_state(commit)
606            .await
607            .map_err(super::runtime_error_from_store_commit)?;
608        self.state.apply_persisted_commit_result(result);
609        Ok(())
610    }
611}
612
613pub(in crate::runtime) fn queued_turn_input_store_required() -> RuntimeError {
614    RuntimeError::new(
615        RuntimeErrorCode::StoreCommitFailed,
616        "queued turn input requires a persistent runtime store",
617    )
618}