Skip to main content

lash_core/runtime/
session_api.rs

1use super::*;
2
3impl LashRuntime {
4    pub fn session_id(&self) -> &str {
5        &self.state.session_id
6    }
7
8    pub(super) fn stamp_live_plugin_state(&mut self) {
9        if let Some(session) = self.session.as_ref() {
10            let snapshot = session.plugins().tool_registry().export_state();
11            self.state.tool_state_generation = Some(snapshot.generation());
12            self.state.tool_state_snapshot = Some(snapshot);
13            let captured = session.plugins().snapshot();
14            crate::runtime::state::store_plugin_snapshot(&mut self.state.plugin_snapshot, captured);
15            self.state.plugin_snapshot_revision =
16                Some(session.plugins().snapshot_revision_fingerprint());
17        } else {
18            self.state.tool_state_generation = None;
19            self.state.tool_state_snapshot = None;
20            self.state.plugin_snapshot = None;
21            self.state.plugin_snapshot_revision = None;
22        }
23    }
24    pub(super) fn active_tool_catalog_shared(
25        &self,
26    ) -> Result<Arc<Vec<serde_json::Value>>, crate::PluginError> {
27        self.session
28            .as_ref()
29            .map(|session| session.shared_tool_catalog(&self.state.session_id))
30            .unwrap_or_else(|| Ok(Arc::new(Vec::new())))
31    }
32
33    pub fn tool_state(&self) -> Result<crate::ToolState, SessionError> {
34        let Some(session) = self.session.as_ref() else {
35            return Err(SessionError::Protocol(
36                "runtime session not available".to_string(),
37            ));
38        };
39        Ok(session.plugins().tool_registry().export_state())
40    }
41    /// Override protocol-owned turn options for this session.
42    pub fn set_protocol_turn_options(&mut self, options: crate::ProtocolTurnOptions) {
43        self.state.protocol_turn_options = options.clone();
44        if let Some(frame) = self.state.current_agent_frame_mut() {
45            frame.protocol_turn_options = options.clone();
46        }
47        self.protocol_turn_options = options;
48    }
49
50    /// Export current session state for inspection/UI purposes.
51    /// This keeps persistence-heavy snapshots untouched; callers that need a
52    /// fully persisted view should use `export_persisted_state`.
53    pub fn export_state(&self) -> crate::SessionSnapshot {
54        self.state.to_snapshot()
55    }
56
57    pub fn read_view(&self) -> crate::SessionReadView {
58        crate::SessionReadView::from_runtime_state(
59            &self.state,
60            self.state.effective_policy().clone(),
61            self.state.effective_protocol_turn_options().clone(),
62        )
63    }
64
65    /// Export the narrow persistence snapshot used by stores and resume logic.
66    pub fn export_persistence_state(&self) -> RuntimeSessionState {
67        self.state.clone()
68    }
69
70    pub fn apply_persistence_state(
71        &mut self,
72        state: RuntimeSessionState,
73    ) -> Result<(), SessionError> {
74        self.set_persisted_state(state)
75    }
76
77    pub(crate) fn export_graph_first_state(&self) -> RuntimeSessionState {
78        self.state.clone()
79    }
80
81    /// Export a persistence-ready state envelope with dynamic/plugin snapshots
82    /// refreshed from the live session.
83    pub fn export_persisted_state(&self) -> RuntimeSessionState {
84        let mut state = self.state.clone();
85        state.protocol_turn_options = self.protocol_turn_options.clone();
86        if let Some(frame) = state.current_agent_frame_mut() {
87            frame.protocol_turn_options = self.protocol_turn_options.clone();
88        }
89        if let Some(session) = self.session.as_ref() {
90            let snapshot = session.plugins().tool_registry().export_state();
91            state.tool_state_generation = Some(snapshot.generation());
92            state.tool_state_snapshot = Some(snapshot);
93            let captured = session.plugins().snapshot();
94            crate::runtime::state::store_plugin_snapshot(&mut state.plugin_snapshot, captured);
95            state.plugin_snapshot_revision =
96                Some(session.plugins().snapshot_revision_fingerprint());
97        }
98        normalize_session_graph(&mut state);
99        state
100    }
101
102    pub fn usage_report(&self) -> SessionUsageReport {
103        let mut entries = self.state.token_ledger.clone();
104        let drained = self.shared_token_ledger.lock().expect("token ledger lock");
105        for entry in drained.iter().cloned() {
106            merge_ledger_entry(&mut entries, entry);
107        }
108        SessionUsageReport::from_entries(&entries)
109    }
110
111    pub async fn await_background_work(&mut self) -> Result<(), SessionError> {
112        if self.process_sync_needed.swap(false, Ordering::AcqRel) {
113            self.refresh_session_graph_from_store().await?;
114        }
115        Ok(())
116    }
117
118    pub(super) async fn refresh_session_graph_from_store(&mut self) -> Result<(), SessionError> {
119        // Fresh replacement opens intentionally start from an empty resident
120        // graph and commit a full replacement. Do not resurrect the old head
121        // before that first commit.
122        if self.state.graph_replace_required && self.state.head_revision.is_none() {
123            return Ok(());
124        }
125        let Some(store) = self
126            .session
127            .as_ref()
128            .and_then(|session| session.history_store())
129        else {
130            return Ok(());
131        };
132        let scope = match self.residency {
133            crate::Residency::KeepAll => crate::store::SessionReadScope::FullGraph,
134            crate::Residency::ActivePathOnly => crate::store::SessionReadScope::ActivePath {
135                leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
136            },
137        };
138        let Some(read) = store.load_session(scope).await.map_err(|err| {
139            SessionError::Protocol(format!("failed to refresh session graph from store: {err}"))
140        })?
141        else {
142            return Ok(());
143        };
144        let has_newer_graph = self.state.head_revision != Some(read.head_revision)
145            || read.graph.leaf_node_id != self.state.session_graph.leaf_node_id
146            || read.checkpoint_ref != self.state.checkpoint_ref;
147        if !has_newer_graph {
148            return Ok(());
149        }
150        let head = crate::store::SessionHead {
151            session_id: read.session_id.clone(),
152            head_revision: read.head_revision,
153            agent_frames: read.agent_frames.clone(),
154            current_agent_frame_id: read.current_agent_frame_id.clone(),
155            graph: read.graph,
156            config: read.config.clone(),
157            checkpoint_ref: read.checkpoint_ref.clone(),
158            token_ledger: merge_usage_delta_entries(read.token_ledger),
159        };
160        apply_session_head(&mut self.state, &head);
161        apply_session_checkpoint(&mut self.state, read.checkpoint);
162        self.policy = self.state.effective_policy().clone();
163        self.protocol_turn_options = self.state.effective_protocol_turn_options().clone();
164        Ok(())
165    }
166
167    pub(super) fn runtime_session_services(
168        &self,
169    ) -> Result<Arc<RuntimeSessionServices>, PluginActionInvokeError> {
170        Ok(Arc::new(RuntimeSessionServices::new(self, true, None)?))
171    }
172
173    pub(super) fn runtime_session_services_for_turn(
174        &self,
175        child_usage_event_relay: Option<ChildUsageEventRelay>,
176    ) -> Result<Arc<RuntimeSessionServices>, PluginActionInvokeError> {
177        Ok(Arc::new(RuntimeSessionServices::new(
178            self,
179            false,
180            child_usage_event_relay,
181        )?))
182    }
183
184    pub fn session_state_service(
185        &self,
186    ) -> Result<Arc<dyn crate::plugin::SessionStateService>, PluginActionInvokeError> {
187        self.runtime_session_services()
188            .map(|services| services.state_service())
189    }
190
191    pub fn session_lifecycle_service(
192        &self,
193    ) -> Result<Arc<dyn crate::plugin::SessionLifecycleService>, PluginActionInvokeError> {
194        self.runtime_session_services()
195            .map(|services| services.lifecycle_service())
196    }
197
198    pub fn session_graph_service(
199        &self,
200    ) -> Result<Arc<dyn crate::plugin::SessionGraphService>, PluginActionInvokeError> {
201        self.runtime_session_services()
202            .map(|services| services.graph_service())
203    }
204
205    pub fn process_service(
206        &self,
207    ) -> Result<Arc<dyn crate::ProcessService>, PluginActionInvokeError> {
208        self.runtime_session_services()
209            .map(|services| services.process_service())
210    }
211
212    pub fn process_cancel_ability(&self) -> Arc<dyn crate::ProcessCancelAbility> {
213        Arc::clone(&self.host.core.control.process_cancel_ability)
214    }
215
216    pub fn effect_host(&self) -> Arc<dyn crate::EffectHost> {
217        Arc::clone(&self.host.core.control.effect_host)
218    }
219
220    pub async fn enqueue_turn_input(
221        &self,
222        input: crate::TurnInput,
223        delivery_policy: crate::DeliveryPolicy,
224        slot_policy: crate::SlotPolicy,
225        source_key: Option<String>,
226    ) -> Result<crate::QueuedWorkBatch, RuntimeError> {
227        let store = self
228            .session
229            .as_ref()
230            .and_then(|session| session.history_store())
231            .ok_or_else(queued_turn_input_store_required)?;
232        enqueue_turn_input_to_store(
233            self.state.session_id.clone(),
234            store,
235            self.host.queued_work_poke.clone(),
236            input,
237            delivery_policy,
238            slot_policy,
239            source_key,
240        )
241        .await
242    }
243
244    pub async fn cancel_queued_work_batch(
245        &self,
246        session_id: &str,
247        batch_id: &str,
248    ) -> Result<Option<crate::QueuedWorkBatch>, RuntimeError> {
249        let store = self
250            .session
251            .as_ref()
252            .and_then(|session| session.history_store())
253            .ok_or_else(queued_turn_input_store_required)?;
254        store
255            .cancel_queued_work_batch(session_id, batch_id)
256            .await
257            .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))
258    }
259
260    /// The plugin session bound to the currently active runtime session, if any.
261    pub fn plugin_session(&self) -> Option<Arc<crate::PluginSession>> {
262        self.session.as_ref().map(|s| Arc::clone(s.plugins()))
263    }
264
265    pub fn open_agent_frame(
266        &mut self,
267        request: crate::OpenAgentFrameRequest,
268    ) -> crate::OpenAgentFrameResult {
269        open_agent_frame_in_state(&mut self.state, request)
270    }
271
272    /// Run the registered compaction provider and commit the resulting
273    /// seed nodes into a fresh Agent Frame.
274    pub async fn compact_context(
275        &mut self,
276        instructions: Option<String>,
277        scoped_effect_controller: crate::ScopedEffectController<'_>,
278    ) -> Result<bool, PluginActionInvokeError> {
279        let services = self.runtime_session_services()?;
280        let Some(plugin_session) = self.session.as_ref().map(|s| Arc::clone(s.plugins())) else {
281            return Err(PluginActionInvokeError::Unknown(
282                "runtime session not available".to_string(),
283            ));
284        };
285        let ctx = crate::CompactionContext {
286            session_id: self.state.session_id.clone(),
287            state: self.read_view(),
288            instructions,
289            sessions: services.state_service(),
290            session_lifecycle: services.lifecycle_service(),
291            session_graph: services.graph_service(),
292            scoped_effect_controller,
293        };
294        let Some(compaction) = plugin_session.compact_context(&ctx).await.map_err(|err| {
295            PluginActionInvokeError::Unknown(format!("context compaction failed: {err}"))
296        })?
297        else {
298            return Ok(false);
299        };
300        let frame_id = format!(
301            "{}:frame:compaction:{}",
302            self.state.session_id,
303            uuid::Uuid::new_v4()
304        );
305        let result = self.open_agent_frame(
306            crate::OpenAgentFrameRequest::new(frame_id, crate::AgentFrameReason::compaction())
307                .with_initial_nodes(compaction.initial_nodes),
308        );
309        if result.opened {
310            self.stamp_live_plugin_state();
311        }
312        Ok(result.opened)
313    }
314
315    pub(super) fn session_policy(&self) -> SessionPolicy {
316        self.policy.clone()
317    }
318
319    pub(super) async fn notify_session_config_changed(&self, previous: SessionPolicy) {
320        let Some(session) = self.session.as_ref() else {
321            return;
322        };
323        let current = self.session_policy();
324        if current == previous {
325            return;
326        }
327        let Ok(services) = self.runtime_session_services() else {
328            return;
329        };
330        session
331            .plugins()
332            .emit_runtime_event(crate::PluginLifecycleEvent::SessionConfigChanged(Box::new(
333                SessionConfigChangedContext {
334                    session_id: self.state.session_id.clone(),
335                    previous,
336                    current,
337                    sessions: services.state_service(),
338                },
339            )))
340            .await;
341    }
342
343    pub(super) async fn apply_session_config_mutations(&mut self, previous: SessionPolicy) {
344        let Some(session) = self.session.as_ref() else {
345            return;
346        };
347        let current = self.session_policy();
348        if current == previous {
349            return;
350        }
351        let Ok(services) = self.runtime_session_services() else {
352            return;
353        };
354        self.policy = session
355            .plugins()
356            .mutate_session_config(
357                SessionConfigChangedContext {
358                    session_id: self.state.session_id.clone(),
359                    previous,
360                    current,
361                    sessions: services.state_service(),
362                },
363                self.policy.clone(),
364            )
365            .await;
366        self.state.policy = self.policy.clone();
367    }
368}
369
370pub(in crate::runtime) async fn enqueue_turn_input_to_store(
371    session_id: String,
372    store: Arc<dyn crate::RuntimePersistence>,
373    queued_work_poke: Option<crate::QueuedWorkPoke>,
374    input: crate::TurnInput,
375    delivery_policy: crate::DeliveryPolicy,
376    slot_policy: crate::SlotPolicy,
377    source_key: Option<String>,
378) -> Result<crate::QueuedWorkBatch, RuntimeError> {
379    super::turn_loop::ensure_durable_effect_input(&input)?;
380    let mut draft = crate::QueuedWorkBatchDraft::new(
381        session_id,
382        delivery_policy,
383        slot_policy,
384        vec![crate::QueuedWorkPayload::turn_input(input)],
385    );
386    draft.source_key = source_key;
387    let enqueued = store
388        .enqueue_queued_work(draft)
389        .await
390        .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))?;
391    if let Some(poke) = queued_work_poke.as_ref() {
392        poke.poke_session(enqueued.session_id.clone(), "queued_turn_input");
393    }
394    Ok(enqueued)
395}
396
397impl LashRuntime {
398    pub async fn submit_session_command(
399        &mut self,
400        command: crate::SessionCommand,
401        idempotency_key: impl Into<String>,
402    ) -> Result<crate::SessionCommandReceipt, RuntimeError> {
403        let idempotency_key = idempotency_key.into();
404        if idempotency_key.trim().is_empty() {
405            return Err(RuntimeError::new(
406                RuntimeErrorCode::Other("session_command_idempotency_key".to_string()),
407                "session command idempotency key cannot be empty",
408            ));
409        }
410        let source_key = command.source_key(&idempotency_key);
411        let session_id = self.state.session_id.clone();
412        let Some(store) = self
413            .session
414            .as_ref()
415            .and_then(|session| session.history_store())
416        else {
417            let batch_id = format!("inline-command:{}", uuid::Uuid::new_v4());
418            self.apply_session_command(command, None).await?;
419            return Ok(crate::SessionCommandReceipt {
420                session_id,
421                batch_id,
422                source_key,
423            });
424        };
425        let draft = crate::QueuedWorkBatchDraft::new(
426            session_id.clone(),
427            crate::DeliveryPolicy::AfterCurrentTurnCommit,
428            crate::SlotPolicy::Exclusive,
429            vec![crate::QueuedWorkPayload::session_command(command)],
430        )
431        .with_source_key(source_key.clone());
432        let enqueued = store.enqueue_queued_work(draft).await.map_err(|err| {
433            RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
434        })?;
435        if let Some(poke) = self.host.queued_work_poke.as_ref() {
436            poke.poke_session(session_id.clone(), "session_command");
437        }
438        Ok(crate::SessionCommandReceipt {
439            session_id,
440            batch_id: enqueued.batch_id,
441            source_key,
442        })
443    }
444
445    pub async fn drain_next_session_command(
446        &mut self,
447    ) -> Result<Option<crate::SessionCommandReceipt>, RuntimeError> {
448        let Some(store) = self
449            .session
450            .as_ref()
451            .and_then(|session| session.history_store())
452        else {
453            return Ok(None);
454        };
455        let claim = store
456            .claim_ready_queued_work(
457                &self.state.session_id,
458                &self.runtime_scope_id,
459                crate::QueuedWorkClaimBoundary::Idle,
460                crate::QUEUED_WORK_CLAIM_TTL_MS,
461                1,
462            )
463            .await
464            .map_err(|err| {
465                RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
466            })?;
467        let Some(claim) = claim else {
468            return Ok(None);
469        };
470        let Some((batch, command)) = claim.exclusive_session_command() else {
471            store
472                .abandon_queued_work_claim(&claim)
473                .await
474                .map_err(|err| {
475                    RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
476                })?;
477            return Ok(None);
478        };
479        let batch_id = batch.batch_id.clone();
480        let source_key = batch.source_key.clone().unwrap_or_else(|| batch_id.clone());
481        let command = command.clone();
482        self.apply_session_command(command, Some(claim.completion()))
483            .await?;
484        Ok(Some(crate::SessionCommandReceipt {
485            session_id: self.state.session_id.clone(),
486            batch_id,
487            source_key,
488        }))
489    }
490
491    async fn apply_session_command(
492        &mut self,
493        command: crate::SessionCommand,
494        completion: Option<crate::QueuedWorkCompletion>,
495    ) -> Result<(), RuntimeError> {
496        self.refresh_session_graph_from_store()
497            .await
498            .map_err(|err| RuntimeError::new("session_command_refresh", err.to_string()))?;
499        let graph = match command {
500            crate::SessionCommand::RefreshToolSurface {
501                expected_generation,
502                ..
503            } => {
504                if let Some(expected) = expected_generation {
505                    let actual = self
506                        .tool_state()
507                        .map_err(|err| {
508                            RuntimeError::new("session_command_tool_state", err.to_string())
509                        })?
510                        .generation();
511                    if actual != expected {
512                        return Err(RuntimeError::new(
513                            "session_command_generation_mismatch",
514                            format!(
515                                "expected tool generation {expected}, but live generation is {actual}"
516                            ),
517                        ));
518                    }
519                }
520                self.refresh_session_tool_surface().await.map_err(|err| {
521                    RuntimeError::new("session_command_refresh_tools", err.to_string())
522                })?;
523                crate::store::GraphCommitDelta::Unchanged {
524                    leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
525                }
526            }
527            crate::SessionCommand::ResetSession { .. } => {
528                let mut state = crate::RuntimeSessionState {
529                    session_id: self.state.session_id.clone(),
530                    policy: self.policy.clone(),
531                    graph_replace_required: true,
532                    ..crate::RuntimeSessionState::default()
533                };
534                state.ensure_agent_frame_initialized();
535                self.set_persisted_state(state)
536                    .map_err(|err| RuntimeError::new("session_command_reset", err.to_string()))?;
537                crate::store::GraphCommitDelta::ReplaceFull(self.state.session_graph.clone())
538            }
539        };
540        let Some(store) = self
541            .session
542            .as_ref()
543            .and_then(|session| session.history_store())
544        else {
545            return Ok(());
546        };
547        let mut commit =
548            crate::store::RuntimeCommit::persisted_state_with_graph_commit(&self.state, graph, &[]);
549        if let Some(completion) = completion {
550            commit = commit.completing_queue_claim(completion);
551        }
552        let result = store.commit_runtime_state(commit).await.map_err(|err| {
553            RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
554        })?;
555        self.state.apply_persisted_commit_result(result);
556        Ok(())
557    }
558}
559
560pub(in crate::runtime) fn queued_turn_input_store_required() -> RuntimeError {
561    RuntimeError::new(
562        RuntimeErrorCode::StoreCommitFailed,
563        "queued turn input requires a persistent runtime store",
564    )
565}