Skip to main content

lash_core/runtime/
session_api.rs

1use super::*;
2
3fn rewrite_history_scope_id(
4    session_id: &str,
5    turn_index: usize,
6    trigger: &crate::RewriteTrigger,
7) -> String {
8    let trigger_id = match trigger {
9        crate::RewriteTrigger::Manual { .. } => "manual",
10        crate::RewriteTrigger::WindowShrink { .. } => "window-shrink",
11        crate::RewriteTrigger::Periodic => "periodic",
12    };
13    format!("{session_id}:rewrite-history:{trigger_id}:{turn_index}")
14}
15
16impl LashRuntime {
17    pub fn session_id(&self) -> &str {
18        &self.state.session_id
19    }
20
21    pub(super) fn stamp_live_plugin_state(&mut self) {
22        if let Some(session) = self.session.as_ref() {
23            let snapshot = session.plugins().tool_registry().export_state();
24            self.state.tool_state_generation = Some(snapshot.generation());
25            self.state.tool_state_snapshot = Some(snapshot);
26            let captured = session.plugins().snapshot();
27            crate::runtime::state::store_plugin_snapshot(&mut self.state.plugin_snapshot, captured);
28            self.state.plugin_snapshot_revision =
29                Some(session.plugins().snapshot_revision_fingerprint());
30        } else {
31            self.state.tool_state_generation = None;
32            self.state.tool_state_snapshot = None;
33            self.state.plugin_snapshot = None;
34            self.state.plugin_snapshot_revision = None;
35        }
36    }
37    pub(super) fn active_tool_catalog_shared(
38        &self,
39    ) -> Result<Arc<Vec<serde_json::Value>>, crate::PluginError> {
40        self.session
41            .as_ref()
42            .map(|session| session.shared_tool_catalog(&self.state.session_id))
43            .unwrap_or_else(|| Ok(Arc::new(Vec::new())))
44    }
45
46    pub fn tool_state(&self) -> Result<crate::ToolState, SessionError> {
47        let Some(session) = self.session.as_ref() else {
48            return Err(SessionError::Protocol(
49                "runtime session not available".to_string(),
50            ));
51        };
52        Ok(session.plugins().tool_registry().export_state())
53    }
54    /// Override protocol-owned turn options for this session.
55    pub fn set_protocol_turn_options(&mut self, options: crate::ProtocolTurnOptions) {
56        self.state.protocol_turn_options = options.clone();
57        if let Some(frame) = self.state.current_agent_frame_mut() {
58            frame.protocol_turn_options = options.clone();
59        }
60        self.protocol_turn_options = options;
61    }
62
63    /// Export current session state for inspection/UI purposes.
64    /// This keeps persistence-heavy snapshots untouched; callers that need a
65    /// fully persisted view should use `export_persisted_state`.
66    pub fn export_state(&self) -> crate::SessionSnapshot {
67        self.state.to_snapshot()
68    }
69
70    pub fn read_view(&self) -> crate::SessionReadView {
71        crate::SessionReadView::from_runtime_state(
72            &self.state,
73            self.state.effective_policy().clone(),
74            self.state.effective_protocol_turn_options().clone(),
75        )
76    }
77
78    /// Export the narrow persistence snapshot used by stores and resume logic.
79    pub fn export_persistence_state(&self) -> RuntimeSessionState {
80        self.state.clone()
81    }
82
83    pub fn apply_persistence_state(
84        &mut self,
85        state: RuntimeSessionState,
86    ) -> Result<(), SessionError> {
87        self.set_persisted_state(state)
88    }
89
90    pub(crate) fn export_graph_first_state(&self) -> RuntimeSessionState {
91        self.state.clone()
92    }
93
94    /// Export a persistence-ready state envelope with dynamic/plugin snapshots
95    /// refreshed from the live session.
96    pub fn export_persisted_state(&self) -> RuntimeSessionState {
97        let mut state = self.state.clone();
98        state.protocol_turn_options = self.protocol_turn_options.clone();
99        if let Some(frame) = state.current_agent_frame_mut() {
100            frame.protocol_turn_options = self.protocol_turn_options.clone();
101        }
102        if let Some(session) = self.session.as_ref() {
103            let snapshot = session.plugins().tool_registry().export_state();
104            state.tool_state_generation = Some(snapshot.generation());
105            state.tool_state_snapshot = Some(snapshot);
106            let captured = session.plugins().snapshot();
107            crate::runtime::state::store_plugin_snapshot(&mut state.plugin_snapshot, captured);
108            state.plugin_snapshot_revision =
109                Some(session.plugins().snapshot_revision_fingerprint());
110        }
111        normalize_session_graph(&mut state);
112        state
113    }
114
115    pub fn usage_report(&self) -> SessionUsageReport {
116        let mut entries = self.state.token_ledger.clone();
117        let drained = self.shared_token_ledger.lock().expect("token ledger lock");
118        for entry in drained.iter().cloned() {
119            merge_ledger_entry(&mut entries, entry);
120        }
121        SessionUsageReport::from_entries(&entries)
122    }
123
124    pub async fn await_background_work(&mut self) -> Result<(), SessionError> {
125        if self.process_sync_needed.swap(false, Ordering::AcqRel) {
126            self.refresh_session_graph_from_store().await?;
127        }
128        Ok(())
129    }
130
131    pub(super) async fn refresh_session_graph_from_store(&mut self) -> Result<(), SessionError> {
132        // Fresh replacement opens intentionally start from an empty resident
133        // graph and commit a full replacement. Do not resurrect the old head
134        // before that first commit.
135        if self.state.graph_replace_required && self.state.head_revision.is_none() {
136            return Ok(());
137        }
138        let Some(store) = self
139            .session
140            .as_ref()
141            .and_then(|session| session.history_store())
142        else {
143            return Ok(());
144        };
145        let scope = match self.residency {
146            crate::Residency::KeepAll => crate::store::SessionReadScope::FullGraph,
147            crate::Residency::ActivePathOnly => crate::store::SessionReadScope::ActivePath {
148                leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
149            },
150        };
151        let Some(read) = store.load_session(scope).await.map_err(|err| {
152            SessionError::Protocol(format!("failed to refresh session graph from store: {err}"))
153        })?
154        else {
155            return Ok(());
156        };
157        let has_newer_graph = self.state.head_revision != Some(read.head_revision)
158            || read.graph.leaf_node_id != self.state.session_graph.leaf_node_id
159            || read.checkpoint_ref != self.state.checkpoint_ref;
160        if !has_newer_graph {
161            return Ok(());
162        }
163        let head = crate::store::SessionHead {
164            session_id: read.session_id.clone(),
165            head_revision: read.head_revision,
166            agent_frames: read.agent_frames.clone(),
167            current_agent_frame_id: read.current_agent_frame_id.clone(),
168            graph: read.graph,
169            config: read.config.clone(),
170            checkpoint_ref: read.checkpoint_ref.clone(),
171            token_ledger: merge_usage_delta_entries(read.token_ledger),
172        };
173        apply_session_head(&mut self.state, &head);
174        apply_session_checkpoint(&mut self.state, read.checkpoint);
175        self.policy = self.state.effective_policy().clone();
176        self.protocol_turn_options = self.state.effective_protocol_turn_options().clone();
177        Ok(())
178    }
179
180    pub(super) fn runtime_session_services(
181        &self,
182    ) -> Result<Arc<RuntimeSessionServices>, PluginActionInvokeError> {
183        Ok(Arc::new(RuntimeSessionServices::new(self, true, None)?))
184    }
185
186    pub(super) fn runtime_session_services_for_turn(
187        &self,
188        child_usage_event_relay: Option<ChildUsageEventRelay>,
189    ) -> Result<Arc<RuntimeSessionServices>, PluginActionInvokeError> {
190        Ok(Arc::new(RuntimeSessionServices::new(
191            self,
192            false,
193            child_usage_event_relay,
194        )?))
195    }
196
197    pub fn session_state_service(
198        &self,
199    ) -> Result<Arc<dyn crate::plugin::SessionStateService>, PluginActionInvokeError> {
200        self.runtime_session_services()
201            .map(|services| services.state_service())
202    }
203
204    pub fn session_lifecycle_service(
205        &self,
206    ) -> Result<Arc<dyn crate::plugin::SessionLifecycleService>, PluginActionInvokeError> {
207        self.runtime_session_services()
208            .map(|services| services.lifecycle_service())
209    }
210
211    pub fn session_graph_service(
212        &self,
213    ) -> Result<Arc<dyn crate::plugin::SessionGraphService>, PluginActionInvokeError> {
214        self.runtime_session_services()
215            .map(|services| services.graph_service())
216    }
217
218    pub fn process_service(
219        &self,
220    ) -> Result<Arc<dyn crate::ProcessService>, PluginActionInvokeError> {
221        self.runtime_session_services()
222            .map(|services| services.process_service())
223    }
224
225    pub fn process_cancel_ability(&self) -> Arc<dyn crate::ProcessCancelAbility> {
226        Arc::clone(&self.host.core.control.process_cancel_ability)
227    }
228
229    pub fn effect_host(&self) -> Arc<dyn crate::EffectHost> {
230        Arc::clone(&self.host.core.control.effect_host)
231    }
232
233    pub async fn enqueue_turn_input(
234        &self,
235        input: crate::TurnInput,
236        delivery_policy: crate::DeliveryPolicy,
237        slot_policy: crate::SlotPolicy,
238        source_key: Option<String>,
239    ) -> Result<crate::QueuedWorkBatch, RuntimeError> {
240        let store = self
241            .session
242            .as_ref()
243            .and_then(|session| session.history_store())
244            .ok_or_else(queued_turn_input_store_required)?;
245        enqueue_turn_input_to_store(
246            self.state.session_id.clone(),
247            store,
248            self.host.queued_work_poke.clone(),
249            input,
250            delivery_policy,
251            slot_policy,
252            source_key,
253        )
254        .await
255    }
256
257    pub async fn cancel_queued_work_batch(
258        &self,
259        session_id: &str,
260        batch_id: &str,
261    ) -> Result<Option<crate::QueuedWorkBatch>, RuntimeError> {
262        let store = self
263            .session
264            .as_ref()
265            .and_then(|session| session.history_store())
266            .ok_or_else(queued_turn_input_store_required)?;
267        store
268            .cancel_queued_work_batch(session_id, batch_id)
269            .await
270            .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))
271    }
272
273    /// The plugin session bound to the currently active runtime session, if any.
274    pub fn plugin_session(&self) -> Option<Arc<crate::PluginSession>> {
275        self.session.as_ref().map(|s| Arc::clone(s.plugins()))
276    }
277
278    /// Run the registered history rewrite pipeline against the current
279    /// state, applying the resulting messages back onto the runtime.
280    /// Returns true when at least one rewriter produced a summary or
281    /// otherwise mutated the message list.
282    pub async fn rewrite_history(
283        &mut self,
284        trigger: crate::RewriteTrigger,
285    ) -> Result<bool, PluginActionInvokeError> {
286        let services = self.runtime_session_services()?;
287        let Some(plugin_session) = self.session.as_ref().map(|s| Arc::clone(s.plugins())) else {
288            return Err(PluginActionInvokeError::Unknown(
289                "runtime session not available".to_string(),
290            ));
291        };
292        let ctx = crate::RewriteContext {
293            session_id: self.state.session_id.clone(),
294            trigger: trigger.clone(),
295            state: self.read_view(),
296            sessions: services.state_service(),
297            session_lifecycle: services.lifecycle_service(),
298            session_graph: services.graph_service(),
299            scoped_effect_controller: self
300                .host
301                .core
302                .control
303                .effect_host
304                .scoped(crate::EffectScope::runtime_operation(
305                    rewrite_history_scope_id(
306                        &self.state.session_id,
307                        self.state.turn_index,
308                        &trigger,
309                    ),
310                ))
311                .map_err(|err| PluginActionInvokeError::Unknown(err.to_string()))?,
312        };
313        let input = crate::HistoryState::from_snapshot(&self.state.to_snapshot());
314        let baseline_messages = input.messages.len();
315        let outcome = plugin_session
316            .rewrite_history(&ctx, input)
317            .await
318            .map_err(|err| {
319                PluginActionInvokeError::Unknown(format!("rewrite_history failed: {err}"))
320            })?;
321        let mutated =
322            outcome.metadata.produced_summary || outcome.messages.len() != baseline_messages;
323        if mutated {
324            self.state
325                .replace_active_read_state(&outcome.messages, &outcome.tool_calls);
326            if let Some(session) = self.session.as_ref() {
327                self.state.tool_state_snapshot = Some(session.tool_registry().export_state());
328                let captured = session.plugins().snapshot();
329                crate::runtime::state::store_plugin_snapshot(
330                    &mut self.state.plugin_snapshot,
331                    captured,
332                );
333                self.state.plugin_snapshot_revision =
334                    Some(session.plugins().snapshot_revision_fingerprint());
335            }
336        }
337        Ok(mutated)
338    }
339
340    pub(super) fn session_policy(&self) -> SessionPolicy {
341        self.policy.clone()
342    }
343
344    pub(super) async fn notify_session_config_changed(&self, previous: SessionPolicy) {
345        let Some(session) = self.session.as_ref() else {
346            return;
347        };
348        let current = self.session_policy();
349        if current == previous {
350            return;
351        }
352        let Ok(services) = self.runtime_session_services() else {
353            return;
354        };
355        session
356            .plugins()
357            .emit_runtime_event(crate::PluginLifecycleEvent::SessionConfigChanged(Box::new(
358                SessionConfigChangedContext {
359                    session_id: self.state.session_id.clone(),
360                    previous,
361                    current,
362                    sessions: services.state_service(),
363                },
364            )))
365            .await;
366    }
367
368    pub(super) async fn apply_session_config_mutations(&mut self, previous: SessionPolicy) {
369        let Some(session) = self.session.as_ref() else {
370            return;
371        };
372        let current = self.session_policy();
373        if current == previous {
374            return;
375        }
376        let Ok(services) = self.runtime_session_services() else {
377            return;
378        };
379        self.policy = session
380            .plugins()
381            .mutate_session_config(
382                SessionConfigChangedContext {
383                    session_id: self.state.session_id.clone(),
384                    previous,
385                    current,
386                    sessions: services.state_service(),
387                },
388                self.policy.clone(),
389            )
390            .await;
391        self.state.policy = self.policy.clone();
392    }
393}
394
395pub(in crate::runtime) async fn enqueue_turn_input_to_store(
396    session_id: String,
397    store: Arc<dyn crate::RuntimePersistence>,
398    queued_work_poke: Option<crate::QueuedWorkPoke>,
399    input: crate::TurnInput,
400    delivery_policy: crate::DeliveryPolicy,
401    slot_policy: crate::SlotPolicy,
402    source_key: Option<String>,
403) -> Result<crate::QueuedWorkBatch, RuntimeError> {
404    super::turn_loop::ensure_durable_effect_input(&input)?;
405    let mut draft = crate::QueuedWorkBatchDraft::new(
406        session_id,
407        delivery_policy,
408        slot_policy,
409        vec![crate::QueuedWorkPayload::turn_input(input)],
410    );
411    draft.source_key = source_key;
412    let enqueued = store
413        .enqueue_queued_work(draft)
414        .await
415        .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))?;
416    if let Some(poke) = queued_work_poke.as_ref() {
417        poke.poke_session(enqueued.session_id.clone(), "queued_turn_input");
418    }
419    Ok(enqueued)
420}
421
422impl LashRuntime {
423    pub async fn submit_session_command(
424        &mut self,
425        command: crate::SessionCommand,
426        idempotency_key: impl Into<String>,
427    ) -> Result<crate::SessionCommandReceipt, RuntimeError> {
428        let idempotency_key = idempotency_key.into();
429        if idempotency_key.trim().is_empty() {
430            return Err(RuntimeError::new(
431                RuntimeErrorCode::Other("session_command_idempotency_key".to_string()),
432                "session command idempotency key cannot be empty",
433            ));
434        }
435        let source_key = command.source_key(&idempotency_key);
436        let session_id = self.state.session_id.clone();
437        let Some(store) = self
438            .session
439            .as_ref()
440            .and_then(|session| session.history_store())
441        else {
442            let batch_id = format!("inline-command:{}", uuid::Uuid::new_v4());
443            self.apply_session_command(command, None).await?;
444            return Ok(crate::SessionCommandReceipt {
445                session_id,
446                batch_id,
447                source_key,
448            });
449        };
450        let draft = crate::QueuedWorkBatchDraft::new(
451            session_id.clone(),
452            crate::DeliveryPolicy::AfterCurrentTurnCommit,
453            crate::SlotPolicy::Exclusive,
454            vec![crate::QueuedWorkPayload::session_command(command)],
455        )
456        .with_source_key(source_key.clone());
457        let enqueued = store.enqueue_queued_work(draft).await.map_err(|err| {
458            RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
459        })?;
460        if let Some(poke) = self.host.queued_work_poke.as_ref() {
461            poke.poke_session(session_id.clone(), "session_command");
462        }
463        Ok(crate::SessionCommandReceipt {
464            session_id,
465            batch_id: enqueued.batch_id,
466            source_key,
467        })
468    }
469
470    pub async fn drain_next_session_command(
471        &mut self,
472    ) -> Result<Option<crate::SessionCommandReceipt>, RuntimeError> {
473        let Some(store) = self
474            .session
475            .as_ref()
476            .and_then(|session| session.history_store())
477        else {
478            return Ok(None);
479        };
480        let claim = store
481            .claim_ready_queued_work(
482                &self.state.session_id,
483                &self.runtime_scope_id,
484                crate::QueuedWorkClaimBoundary::Idle,
485                crate::QUEUED_WORK_CLAIM_TTL_MS,
486                1,
487            )
488            .await
489            .map_err(|err| {
490                RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
491            })?;
492        let Some(claim) = claim else {
493            return Ok(None);
494        };
495        let Some((batch, command)) = claim.exclusive_session_command() else {
496            store
497                .abandon_queued_work_claim(&claim)
498                .await
499                .map_err(|err| {
500                    RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
501                })?;
502            return Ok(None);
503        };
504        let batch_id = batch.batch_id.clone();
505        let source_key = batch.source_key.clone().unwrap_or_else(|| batch_id.clone());
506        let command = command.clone();
507        self.apply_session_command(command, Some(claim.completion()))
508            .await?;
509        Ok(Some(crate::SessionCommandReceipt {
510            session_id: self.state.session_id.clone(),
511            batch_id,
512            source_key,
513        }))
514    }
515
    /// Applies a session command to the runtime and, when a history store is
    /// attached, commits the resulting state.
    ///
    /// Steps, in order:
    /// 1. Refresh the session graph from the store.
    /// 2. Execute the command, producing the graph delta to commit.
    /// 3. Commit the runtime state with that delta, attaching `completion`
    ///    (the queue claim completion) to the commit when one was supplied.
    ///
    /// Without a history store the function returns after step 2 and nothing
    /// is committed.
    ///
    /// # Errors
    ///
    /// Any failure in steps 1 or 2 returns early, before the commit, so a
    /// supplied `completion` is not submitted in that case. A failed commit
    /// maps to `RuntimeErrorCode::StoreCommitFailed`.
    async fn apply_session_command(
        &mut self,
        command: crate::SessionCommand,
        completion: Option<crate::QueuedWorkCompletion>,
    ) -> Result<(), RuntimeError> {
        // Start from the latest stored graph before mutating anything.
        self.refresh_session_graph_from_store()
            .await
            .map_err(|err| RuntimeError::new("session_command_refresh", err.to_string()))?;
        let graph = match command {
            crate::SessionCommand::RefreshToolSurface {
                expected_generation,
                ..
            } => {
                // Optional precondition: only refresh if the live tool
                // generation is the one the caller expected.
                if let Some(expected) = expected_generation {
                    let actual = self
                        .tool_state()
                        .map_err(|err| {
                            RuntimeError::new("session_command_tool_state", err.to_string())
                        })?
                        .generation();
                    if actual != expected {
                        return Err(RuntimeError::new(
                            "session_command_generation_mismatch",
                            format!(
                                "expected tool generation {expected}, but live generation is {actual}"
                            ),
                        ));
                    }
                }
                self.refresh_session_tool_surface().await.map_err(|err| {
                    RuntimeError::new("session_command_refresh_tools", err.to_string())
                })?;
                // The graph is reported as unchanged, pinned at the current leaf.
                crate::store::GraphCommitDelta::Unchanged {
                    leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
                }
            }
            crate::SessionCommand::EmitHostEvent {
                resource_type,
                alias,
                event,
                payload,
            } => {
                let effect_host = Arc::clone(&self.host.core.control.effect_host);
                // Scope effects to this drain: reuse the queue claim id when
                // there is one, otherwise generate a fresh id.
                let drain_id = completion
                    .as_ref()
                    .map(|completion| completion.claim_id.clone())
                    .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
                let scoped = effect_host
                    .scoped(crate::EffectScope::queue_drain(
                        &self.state.session_id,
                        drain_id,
                    ))
                    .map_err(|err| {
                        RuntimeError::new("session_command_effect_scope", err.to_string())
                    })?;
                // Persistence is deferred to the single commit below; the
                // report half of the result is not used here.
                let (_report, graph) = self
                    .emit_host_event_without_persist(
                        &resource_type,
                        &alias,
                        &event,
                        payload,
                        scoped,
                    )
                    .await
                    .map_err(|err| {
                        RuntimeError::new("session_command_host_event", err.to_string())
                    })?;
                graph
            }
            crate::SessionCommand::ResetSession { .. } => {
                // Rebuild the state from defaults, keeping only the session id
                // and the current policy, and flag it for full graph replacement.
                let mut state = crate::RuntimeSessionState {
                    session_id: self.state.session_id.clone(),
                    policy: self.policy.clone(),
                    graph_replace_required: true,
                    ..crate::RuntimeSessionState::default()
                };
                state.ensure_agent_frame_initialized();
                self.set_persisted_state(state)
                    .map_err(|err| RuntimeError::new("session_command_reset", err.to_string()))?;
                crate::store::GraphCommitDelta::ReplaceFull(self.state.session_graph.clone())
            }
        };
        // Without a store there is nothing to commit; the in-memory changes stand.
        let Some(store) = self
            .session
            .as_ref()
            .and_then(|session| session.history_store())
        else {
            return Ok(());
        };
        let mut commit =
            crate::store::RuntimeCommit::persisted_state_with_graph_commit(&self.state, graph, &[]);
        // Attach the queue claim completion to the same commit when present.
        if let Some(completion) = completion {
            commit = commit.completing_queue_claim(completion);
        }
        let result = store.commit_runtime_state(commit).await.map_err(|err| {
            RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
        })?;
        // Fold the store's commit result back into the in-memory state.
        self.state.apply_persisted_commit_result(result);
        Ok(())
    }
616}
617
618pub(in crate::runtime) fn queued_turn_input_store_required() -> RuntimeError {
619    RuntimeError::new(
620        RuntimeErrorCode::StoreCommitFailed,
621        "queued turn input requires a persistent runtime store",
622    )
623}