Skip to main content

lash_core/runtime/
session_api.rs

1use super::*;
2
3impl LashRuntime {
4    pub fn session_id(&self) -> &str {
5        &self.state.session_id
6    }
7
8    pub(super) fn stamp_live_plugin_state(&mut self) {
9        if let Some(session) = self.session.as_ref() {
10            let snapshot = session.plugins().tool_registry().export_state();
11            self.state.tool_state_generation = Some(snapshot.generation());
12            self.state.tool_state_snapshot = Some(snapshot);
13            let captured = session.plugins().snapshot();
14            crate::runtime::state::store_plugin_snapshot(&mut self.state.plugin_snapshot, captured);
15            self.state.plugin_snapshot_revision =
16                Some(session.plugins().snapshot_revision_fingerprint());
17        } else {
18            self.state.tool_state_generation = None;
19            self.state.tool_state_snapshot = None;
20            self.state.plugin_snapshot = None;
21            self.state.plugin_snapshot_revision = None;
22        }
23    }
24    pub(super) fn active_tool_catalog_shared(
25        &self,
26    ) -> Result<Arc<Vec<serde_json::Value>>, crate::PluginError> {
27        self.session
28            .as_ref()
29            .map(|session| session.shared_tool_catalog(&self.state.session_id))
30            .unwrap_or_else(|| Ok(Arc::new(Vec::new())))
31    }
32
33    pub fn tool_state(&self) -> Result<crate::ToolState, SessionError> {
34        let Some(session) = self.session.as_ref() else {
35            return Err(SessionError::Protocol(
36                "runtime session not available".to_string(),
37            ));
38        };
39        Ok(session.plugins().tool_registry().export_state())
40    }
41    /// Override protocol-owned turn options for this session.
42    pub fn set_protocol_turn_options(&mut self, options: crate::ProtocolTurnOptions) {
43        self.state.protocol_turn_options = options.clone();
44        if let Some(frame) = self.state.current_agent_frame_mut() {
45            frame.protocol_turn_options = options.clone();
46        }
47        self.protocol_turn_options = options;
48    }
49
50    /// Export current session state for inspection/UI purposes.
51    /// This keeps persistence-heavy snapshots untouched; callers that need a
52    /// fully persisted view should use `export_persisted_state`.
53    pub fn export_state(&self) -> crate::SessionSnapshot {
54        self.state.to_snapshot()
55    }
56
57    pub fn read_view(&self) -> crate::SessionReadView {
58        crate::SessionReadView::from_runtime_state(
59            &self.state,
60            self.state.effective_policy().clone(),
61            self.state.effective_protocol_turn_options().clone(),
62        )
63    }
64
65    /// Export the narrow persistence snapshot used by stores and resume logic.
66    pub fn export_persistence_state(&self) -> RuntimeSessionState {
67        self.state.clone()
68    }
69
70    pub fn apply_persistence_state(
71        &mut self,
72        state: RuntimeSessionState,
73    ) -> Result<(), SessionError> {
74        self.set_persisted_state(state)
75    }
76
77    pub(crate) fn export_graph_first_state(&self) -> RuntimeSessionState {
78        self.state.clone()
79    }
80
81    /// Export a persistence-ready state envelope with dynamic/plugin snapshots
82    /// refreshed from the live session.
83    pub fn export_persisted_state(&self) -> RuntimeSessionState {
84        let mut state = self.state.clone();
85        state.protocol_turn_options = self.protocol_turn_options.clone();
86        if let Some(frame) = state.current_agent_frame_mut() {
87            frame.protocol_turn_options = self.protocol_turn_options.clone();
88        }
89        if let Some(session) = self.session.as_ref() {
90            let snapshot = session.plugins().tool_registry().export_state();
91            state.tool_state_generation = Some(snapshot.generation());
92            state.tool_state_snapshot = Some(snapshot);
93            let captured = session.plugins().snapshot();
94            crate::runtime::state::store_plugin_snapshot(&mut state.plugin_snapshot, captured);
95            state.plugin_snapshot_revision =
96                Some(session.plugins().snapshot_revision_fingerprint());
97        }
98        normalize_session_graph(&mut state);
99        state
100    }
101
102    pub fn usage_report(&self) -> SessionUsageReport {
103        let mut entries = self.state.token_ledger.clone();
104        let drained = self.shared_token_ledger.lock().expect("token ledger lock");
105        for entry in drained.iter().cloned() {
106            merge_ledger_entry(&mut entries, entry);
107        }
108        SessionUsageReport::from_entries(&entries)
109    }
110
111    pub async fn await_background_work(&mut self) -> Result<(), SessionError> {
112        if self.process_sync_needed.swap(false, Ordering::AcqRel) {
113            self.refresh_session_graph_from_store().await?;
114        }
115        Ok(())
116    }
117
118    pub(super) async fn refresh_session_graph_from_store(&mut self) -> Result<(), SessionError> {
119        // Fresh replacement opens intentionally start from an empty resident
120        // graph and commit a full replacement. Do not resurrect the old head
121        // before that first commit.
122        if self.state.graph_replace_required && self.state.head_revision.is_none() {
123            return Ok(());
124        }
125        let Some(store) = self
126            .session
127            .as_ref()
128            .and_then(|session| session.history_store())
129        else {
130            return Ok(());
131        };
132        let scope = match self.residency {
133            crate::Residency::KeepAll => crate::store::SessionReadScope::FullGraph,
134            crate::Residency::ActivePathOnly => crate::store::SessionReadScope::ActivePath {
135                leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
136            },
137        };
138        let Some(read) = store.load_session(scope).await.map_err(|err| {
139            SessionError::Protocol(format!("failed to refresh session graph from store: {err}"))
140        })?
141        else {
142            return Ok(());
143        };
144        let has_newer_graph = self.state.head_revision != Some(read.head_revision)
145            || read.graph.leaf_node_id != self.state.session_graph.leaf_node_id
146            || read.checkpoint_ref != self.state.checkpoint_ref;
147        if !has_newer_graph {
148            return Ok(());
149        }
150        let head = crate::store::SessionHead {
151            session_id: read.session_id.clone(),
152            head_revision: read.head_revision,
153            agent_frames: read.agent_frames.clone(),
154            current_agent_frame_id: read.current_agent_frame_id.clone(),
155            graph: read.graph,
156            config: read.config.clone(),
157            checkpoint_ref: read.checkpoint_ref.clone(),
158            token_ledger: merge_usage_delta_entries(read.token_ledger),
159        };
160        apply_session_head(&mut self.state, &head);
161        apply_session_checkpoint(&mut self.state, read.checkpoint);
162        self.policy = self.state.effective_policy().clone();
163        self.protocol_turn_options = self.state.effective_protocol_turn_options().clone();
164        Ok(())
165    }
166
167    pub(super) fn runtime_session_services(
168        &self,
169    ) -> Result<Arc<RuntimeSessionServices>, PluginActionInvokeError> {
170        Ok(Arc::new(RuntimeSessionServices::new(self, true, None)?))
171    }
172
173    pub(super) fn runtime_session_services_for_turn(
174        &self,
175        child_usage_event_relay: Option<ChildUsageEventRelay>,
176    ) -> Result<Arc<RuntimeSessionServices>, PluginActionInvokeError> {
177        Ok(Arc::new(RuntimeSessionServices::new(
178            self,
179            false,
180            child_usage_event_relay,
181        )?))
182    }
183
184    pub fn session_state_service(
185        &self,
186    ) -> Result<Arc<dyn crate::plugin::SessionStateService>, PluginActionInvokeError> {
187        self.runtime_session_services()
188            .map(|services| services.state_service())
189    }
190
191    pub fn session_lifecycle_service(
192        &self,
193    ) -> Result<Arc<dyn crate::plugin::SessionLifecycleService>, PluginActionInvokeError> {
194        self.runtime_session_services()
195            .map(|services| services.lifecycle_service())
196    }
197
198    pub fn session_graph_service(
199        &self,
200    ) -> Result<Arc<dyn crate::plugin::SessionGraphService>, PluginActionInvokeError> {
201        self.runtime_session_services()
202            .map(|services| services.graph_service())
203    }
204
205    pub fn process_service(
206        &self,
207    ) -> Result<Arc<dyn crate::ProcessService>, PluginActionInvokeError> {
208        self.runtime_session_services()
209            .map(|services| services.process_service())
210    }
211
212    pub fn process_cancel_ability(&self) -> Arc<dyn crate::ProcessCancelAbility> {
213        Arc::clone(&self.host.core.control.process_cancel_ability)
214    }
215
216    pub fn effect_host(&self) -> Arc<dyn crate::EffectHost> {
217        Arc::clone(&self.host.core.control.effect_host)
218    }
219
220    pub async fn enqueue_turn_input(
221        &self,
222        input: crate::TurnInput,
223        delivery_policy: crate::DeliveryPolicy,
224        slot_policy: crate::SlotPolicy,
225        source_key: Option<String>,
226    ) -> Result<crate::QueuedWorkBatch, RuntimeError> {
227        let store = self
228            .session
229            .as_ref()
230            .and_then(|session| session.history_store())
231            .ok_or_else(queued_turn_input_store_required)?;
232        enqueue_turn_input_to_store(
233            self.state.session_id.clone(),
234            store,
235            self.host.queued_work_driver.clone(),
236            input,
237            delivery_policy,
238            slot_policy,
239            source_key,
240        )
241        .await
242    }
243
244    pub async fn cancel_queued_work_batch(
245        &self,
246        session_id: &str,
247        batch_id: &str,
248    ) -> Result<Option<crate::QueuedWorkBatch>, RuntimeError> {
249        let store = self
250            .session
251            .as_ref()
252            .and_then(|session| session.history_store())
253            .ok_or_else(queued_turn_input_store_required)?;
254        store
255            .cancel_queued_work_batch(session_id, batch_id)
256            .await
257            .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))
258    }
259
260    /// The plugin session bound to the currently active runtime session, if any.
261    pub fn plugin_session(&self) -> Option<Arc<crate::PluginSession>> {
262        self.session.as_ref().map(|s| Arc::clone(s.plugins()))
263    }
264
265    pub fn open_agent_frame(
266        &mut self,
267        request: crate::OpenAgentFrameRequest,
268    ) -> crate::OpenAgentFrameResult {
269        open_agent_frame_in_state_with_clock(
270            &mut self.state,
271            request,
272            self.host.core.clock.as_ref(),
273        )
274    }
275
276    /// Run the registered compaction provider and commit the resulting
277    /// seed nodes into a fresh Agent Frame.
278    pub async fn compact_context(
279        &mut self,
280        instructions: Option<String>,
281        scoped_effect_controller: crate::ScopedEffectController<'_>,
282    ) -> Result<bool, PluginActionInvokeError> {
283        let services = self.runtime_session_services()?;
284        let Some(plugin_session) = self.session.as_ref().map(|s| Arc::clone(s.plugins())) else {
285            return Err(PluginActionInvokeError::Unknown(
286                "runtime session not available".to_string(),
287            ));
288        };
289        let ctx = crate::CompactionContext {
290            session_id: self.state.session_id.clone(),
291            state: self.read_view(),
292            instructions,
293            sessions: services.state_service(),
294            session_lifecycle: services.lifecycle_service(),
295            session_graph: services.graph_service(),
296            scoped_effect_controller,
297        };
298        let Some(compaction) = plugin_session.compact_context(&ctx).await.map_err(|err| {
299            PluginActionInvokeError::Unknown(format!("context compaction failed: {err}"))
300        })?
301        else {
302            return Ok(false);
303        };
304        let frame_id = format!(
305            "{}:frame:compaction:{}",
306            self.state.session_id,
307            uuid::Uuid::new_v4()
308        );
309        let result = self.open_agent_frame(
310            crate::OpenAgentFrameRequest::new(frame_id, crate::AgentFrameReason::compaction())
311                .with_initial_nodes(compaction.initial_nodes),
312        );
313        if result.opened {
314            self.stamp_live_plugin_state();
315        }
316        Ok(result.opened)
317    }
318
319    pub(super) fn session_policy(&self) -> SessionPolicy {
320        self.policy.clone()
321    }
322
323    pub(super) async fn notify_session_config_changed(&self, previous: SessionPolicy) {
324        let Some(session) = self.session.as_ref() else {
325            return;
326        };
327        let current = self.session_policy();
328        if current == previous {
329            return;
330        }
331        let Ok(services) = self.runtime_session_services() else {
332            return;
333        };
334        session
335            .plugins()
336            .emit_runtime_event(crate::PluginLifecycleEvent::SessionConfigChanged(Box::new(
337                SessionConfigChangedContext {
338                    session_id: self.state.session_id.clone(),
339                    previous,
340                    current,
341                    sessions: services.state_service(),
342                },
343            )))
344            .await;
345    }
346
347    pub(super) async fn apply_session_config_mutations(&mut self, previous: SessionPolicy) {
348        let Some(session) = self.session.as_ref() else {
349            return;
350        };
351        let current = self.session_policy();
352        if current == previous {
353            return;
354        }
355        let Ok(services) = self.runtime_session_services() else {
356            return;
357        };
358        self.policy = session
359            .plugins()
360            .mutate_session_config(
361                SessionConfigChangedContext {
362                    session_id: self.state.session_id.clone(),
363                    previous,
364                    current,
365                    sessions: services.state_service(),
366                },
367                self.policy.clone(),
368            )
369            .await;
370        self.state.policy = self.policy.clone();
371    }
372}
373
374pub(in crate::runtime) async fn enqueue_turn_input_to_store(
375    session_id: String,
376    store: Arc<dyn crate::RuntimePersistence>,
377    queued_work_driver: Option<crate::QueuedWorkDriver>,
378    input: crate::TurnInput,
379    delivery_policy: crate::DeliveryPolicy,
380    slot_policy: crate::SlotPolicy,
381    source_key: Option<String>,
382) -> Result<crate::QueuedWorkBatch, RuntimeError> {
383    super::turn_loop::ensure_durable_effect_input(&input)?;
384    let mut draft = crate::QueuedWorkBatchDraft::new(
385        session_id,
386        delivery_policy,
387        slot_policy,
388        vec![crate::QueuedWorkPayload::turn_input(input)],
389    );
390    draft.source_key = source_key;
391    let enqueued = store
392        .enqueue_queued_work(draft)
393        .await
394        .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))?;
395    if enqueued.delivery_policy == crate::DeliveryPolicy::AfterCurrentTurnCommit
396        && let Some(driver) = queued_work_driver.as_ref()
397    {
398        driver
399            .claim_and_run_pending(Some(&enqueued.session_id), "queued_turn_input")
400            .await
401            .map_err(|err| {
402                RuntimeError::new(
403                    RuntimeErrorCode::Other("queued_work".to_string()),
404                    err.to_string(),
405                )
406            })?;
407    }
408    Ok(enqueued)
409}
410
411impl LashRuntime {
412    pub async fn submit_session_command(
413        &mut self,
414        command: crate::SessionCommand,
415        idempotency_key: impl Into<String>,
416    ) -> Result<crate::SessionCommandReceipt, RuntimeError> {
417        let idempotency_key = idempotency_key.into();
418        if idempotency_key.trim().is_empty() {
419            return Err(RuntimeError::new(
420                RuntimeErrorCode::Other("session_command_idempotency_key".to_string()),
421                "session command idempotency key cannot be empty",
422            ));
423        }
424        let source_key = command.source_key(&idempotency_key);
425        let session_id = self.state.session_id.clone();
426        let Some(store) = self
427            .session
428            .as_ref()
429            .and_then(|session| session.history_store())
430        else {
431            let batch_id = format!("inline-command:{}", uuid::Uuid::new_v4());
432            self.apply_session_command(command, None).await?;
433            return Ok(crate::SessionCommandReceipt {
434                session_id,
435                batch_id,
436                source_key,
437            });
438        };
439        let draft = crate::QueuedWorkBatchDraft::new(
440            session_id.clone(),
441            crate::DeliveryPolicy::AfterCurrentTurnCommit,
442            crate::SlotPolicy::Exclusive,
443            vec![crate::QueuedWorkPayload::session_command(command)],
444        )
445        .with_source_key(source_key.clone());
446        let enqueued = store.enqueue_queued_work(draft).await.map_err(|err| {
447            RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
448        })?;
449        if let Some(driver) = self.host.queued_work_driver.as_ref() {
450            driver
451                .claim_and_run_pending(Some(&session_id), "session_command")
452                .await
453                .map_err(|err| {
454                    RuntimeError::new(
455                        RuntimeErrorCode::Other("queued_work".to_string()),
456                        err.to_string(),
457                    )
458                })?;
459        }
460        Ok(crate::SessionCommandReceipt {
461            session_id,
462            batch_id: enqueued.batch_id,
463            source_key,
464        })
465    }
466
467    pub async fn drain_next_session_command(
468        &mut self,
469    ) -> Result<Option<crate::SessionCommandReceipt>, RuntimeError> {
470        let Some(store) = self
471            .session
472            .as_ref()
473            .and_then(|session| session.history_store())
474        else {
475            return Ok(None);
476        };
477        let claim = store
478            .claim_ready_queued_work(
479                &self.state.session_id,
480                &self.runtime_scope_id,
481                crate::QueuedWorkClaimBoundary::Idle,
482                crate::QUEUED_WORK_CLAIM_TTL_MS,
483                1,
484            )
485            .await
486            .map_err(|err| {
487                RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
488            })?;
489        let Some(claim) = claim else {
490            return Ok(None);
491        };
492        let Some((batch, command)) = claim.exclusive_session_command() else {
493            store
494                .abandon_queued_work_claim(&claim)
495                .await
496                .map_err(|err| {
497                    RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
498                })?;
499            return Ok(None);
500        };
501        let batch_id = batch.batch_id.clone();
502        let source_key = batch.source_key.clone().unwrap_or_else(|| batch_id.clone());
503        let command = command.clone();
504        self.apply_session_command(command, Some(claim.completion()))
505            .await?;
506        Ok(Some(crate::SessionCommandReceipt {
507            session_id: self.state.session_id.clone(),
508            batch_id,
509            source_key,
510        }))
511    }
512
513    async fn apply_session_command(
514        &mut self,
515        command: crate::SessionCommand,
516        completion: Option<crate::QueuedWorkCompletion>,
517    ) -> Result<(), RuntimeError> {
518        self.refresh_session_graph_from_store()
519            .await
520            .map_err(|err| RuntimeError::new("session_command_refresh", err.to_string()))?;
521        let graph = match command {
522            crate::SessionCommand::RefreshToolCatalog { .. } => {
523                self.refresh_session_tool_catalog().await.map_err(|err| {
524                    RuntimeError::new("session_command_refresh_tools", err.to_string())
525                })?;
526                crate::store::GraphCommitDelta::Unchanged {
527                    leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
528                }
529            }
530            crate::SessionCommand::ResetSession { .. } => {
531                let mut state = crate::RuntimeSessionState {
532                    session_id: self.state.session_id.clone(),
533                    policy: self.policy.clone(),
534                    graph_replace_required: true,
535                    ..crate::RuntimeSessionState::default()
536                };
537                state.ensure_agent_frame_initialized();
538                self.set_persisted_state(state)
539                    .map_err(|err| RuntimeError::new("session_command_reset", err.to_string()))?;
540                crate::store::GraphCommitDelta::ReplaceFull(self.state.session_graph.clone())
541            }
542        };
543        let Some(store) = self
544            .session
545            .as_ref()
546            .and_then(|session| session.history_store())
547        else {
548            return Ok(());
549        };
550        let mut commit =
551            crate::store::RuntimeCommit::persisted_state_with_graph_commit(&self.state, graph, &[]);
552        if let Some(completion) = completion {
553            commit = commit.completing_queue_claim(completion);
554        }
555        let result = store.commit_runtime_state(commit).await.map_err(|err| {
556            RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
557        })?;
558        self.state.apply_persisted_commit_result(result);
559        Ok(())
560    }
561}
562
563pub(in crate::runtime) fn queued_turn_input_store_required() -> RuntimeError {
564    RuntimeError::new(
565        RuntimeErrorCode::StoreCommitFailed,
566        "queued turn input requires a persistent runtime store",
567    )
568}