Skip to main content

lash_core/runtime/
session_api.rs

1use super::*;
2
3fn rewrite_history_scope_id(
4    session_id: &str,
5    turn_index: usize,
6    trigger: &crate::RewriteTrigger,
7) -> String {
8    let trigger_id = match trigger {
9        crate::RewriteTrigger::Manual { .. } => "manual",
10        crate::RewriteTrigger::WindowShrink { .. } => "window-shrink",
11        crate::RewriteTrigger::Periodic => "periodic",
12    };
13    format!("{session_id}:rewrite-history:{trigger_id}:{turn_index}")
14}
15
16impl LashRuntime {
17    pub fn session_id(&self) -> &str {
18        &self.state.session_id
19    }
20
21    pub(super) fn stamp_live_plugin_state(&mut self) {
22        if let Some(session) = self.session.as_ref() {
23            let snapshot = session.plugins().tool_registry().export_state();
24            self.state.tool_state_generation = Some(snapshot.generation());
25            self.state.tool_state_snapshot = Some(snapshot);
26            let captured = session.plugins().snapshot();
27            crate::runtime::state::store_plugin_snapshot(&mut self.state.plugin_snapshot, captured);
28            self.state.plugin_snapshot_revision =
29                Some(session.plugins().snapshot_revision_fingerprint());
30        } else {
31            self.state.tool_state_generation = None;
32            self.state.tool_state_snapshot = None;
33            self.state.plugin_snapshot = None;
34            self.state.plugin_snapshot_revision = None;
35        }
36    }
37    pub(super) fn active_tool_catalog_shared(
38        &self,
39    ) -> Result<Arc<Vec<serde_json::Value>>, crate::PluginError> {
40        self.session
41            .as_ref()
42            .map(|session| session.shared_tool_catalog(&self.state.session_id))
43            .unwrap_or_else(|| Ok(Arc::new(Vec::new())))
44    }
45
46    pub fn tool_state(&self) -> Result<crate::ToolState, SessionError> {
47        let Some(session) = self.session.as_ref() else {
48            return Err(SessionError::Protocol(
49                "runtime session not available".to_string(),
50            ));
51        };
52        Ok(session.plugins().tool_registry().export_state())
53    }
54    /// Override protocol-owned turn options for this session.
55    pub fn set_protocol_turn_options(&mut self, options: crate::ProtocolTurnOptions) {
56        self.state.protocol_turn_options = options.clone();
57        if let Some(frame) = self.state.current_agent_frame_mut() {
58            frame.protocol_turn_options = options.clone();
59        }
60        self.protocol_turn_options = options;
61    }
62
63    /// Export current session state for inspection/UI purposes.
64    /// This keeps persistence-heavy snapshots untouched; callers that need a
65    /// fully persisted view should use `export_persisted_state`.
66    pub fn export_state(&self) -> crate::SessionSnapshot {
67        self.state.to_snapshot()
68    }
69
70    pub fn read_view(&self) -> crate::SessionReadView {
71        crate::SessionReadView::from_runtime_state(
72            &self.state,
73            self.state.effective_policy().clone(),
74            self.state.effective_protocol_turn_options().clone(),
75        )
76    }
77
78    /// Export the narrow persistence snapshot used by stores and resume logic.
79    pub fn export_persistence_state(&self) -> RuntimeSessionState {
80        self.state.clone()
81    }
82
83    pub fn apply_persistence_state(
84        &mut self,
85        state: RuntimeSessionState,
86    ) -> Result<(), SessionError> {
87        self.set_persisted_state(state)
88    }
89
90    pub(crate) fn export_graph_first_state(&self) -> RuntimeSessionState {
91        self.state.clone()
92    }
93
94    /// Export a persistence-ready state envelope with dynamic/plugin snapshots
95    /// refreshed from the live session.
96    pub fn export_persisted_state(&self) -> RuntimeSessionState {
97        let mut state = self.state.clone();
98        state.protocol_turn_options = self.protocol_turn_options.clone();
99        if let Some(frame) = state.current_agent_frame_mut() {
100            frame.protocol_turn_options = self.protocol_turn_options.clone();
101        }
102        if let Some(session) = self.session.as_ref() {
103            let snapshot = session.plugins().tool_registry().export_state();
104            state.tool_state_generation = Some(snapshot.generation());
105            state.tool_state_snapshot = Some(snapshot);
106            let captured = session.plugins().snapshot();
107            crate::runtime::state::store_plugin_snapshot(&mut state.plugin_snapshot, captured);
108            state.plugin_snapshot_revision =
109                Some(session.plugins().snapshot_revision_fingerprint());
110        }
111        normalize_session_graph(&mut state);
112        state
113    }
114
115    pub fn usage_report(&self) -> SessionUsageReport {
116        let mut entries = self.state.token_ledger.clone();
117        let drained = self.shared_token_ledger.lock().expect("token ledger lock");
118        for entry in drained.iter().cloned() {
119            merge_ledger_entry(&mut entries, entry);
120        }
121        SessionUsageReport::from_entries(&entries)
122    }
123
124    pub async fn await_background_work(&mut self) -> Result<(), SessionError> {
125        if self.process_sync_needed.swap(false, Ordering::AcqRel) {
126            self.refresh_session_graph_from_store().await?;
127        }
128        Ok(())
129    }
130
131    pub(super) async fn refresh_session_graph_from_store(&mut self) -> Result<(), SessionError> {
132        // Fresh replacement opens intentionally start from an empty resident
133        // graph and commit a full replacement. Do not resurrect the old head
134        // before that first commit.
135        if self.state.graph_replace_required && self.state.head_revision.is_none() {
136            return Ok(());
137        }
138        let Some(store) = self
139            .session
140            .as_ref()
141            .and_then(|session| session.history_store())
142        else {
143            return Ok(());
144        };
145        let scope = match self.residency {
146            crate::Residency::KeepAll => crate::store::SessionReadScope::FullGraph,
147            crate::Residency::ActivePathOnly => crate::store::SessionReadScope::ActivePath {
148                leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
149            },
150        };
151        let Some(read) = store.load_session(scope).await.map_err(|err| {
152            SessionError::Protocol(format!("failed to refresh session graph from store: {err}"))
153        })?
154        else {
155            return Ok(());
156        };
157        let has_newer_graph = self.state.head_revision != Some(read.head_revision)
158            || read.graph.leaf_node_id != self.state.session_graph.leaf_node_id
159            || read.checkpoint_ref != self.state.checkpoint_ref;
160        if !has_newer_graph {
161            return Ok(());
162        }
163        let head = crate::store::SessionHead {
164            session_id: read.session_id.clone(),
165            head_revision: read.head_revision,
166            agent_frames: read.agent_frames.clone(),
167            current_agent_frame_id: read.current_agent_frame_id.clone(),
168            graph: read.graph,
169            config: read.config.clone(),
170            checkpoint_ref: read.checkpoint_ref.clone(),
171            token_ledger: merge_usage_delta_entries(read.token_ledger),
172        };
173        apply_session_head(&mut self.state, &head);
174        apply_session_checkpoint(&mut self.state, read.checkpoint);
175        self.policy = self.state.effective_policy().clone();
176        self.protocol_turn_options = self.state.effective_protocol_turn_options().clone();
177        Ok(())
178    }
179
180    pub(super) fn runtime_session_services(
181        &self,
182    ) -> Result<Arc<RuntimeSessionServices>, PluginActionInvokeError> {
183        Ok(Arc::new(RuntimeSessionServices::new(self, true, None)?))
184    }
185
186    pub(super) fn runtime_session_services_for_turn(
187        &self,
188        child_usage_event_relay: Option<ChildUsageEventRelay>,
189    ) -> Result<Arc<RuntimeSessionServices>, PluginActionInvokeError> {
190        Ok(Arc::new(RuntimeSessionServices::new(
191            self,
192            false,
193            child_usage_event_relay,
194        )?))
195    }
196
197    pub fn session_state_service(
198        &self,
199    ) -> Result<Arc<dyn crate::plugin::SessionStateService>, PluginActionInvokeError> {
200        self.runtime_session_services()
201            .map(|services| services.state_service())
202    }
203
204    pub fn session_lifecycle_service(
205        &self,
206    ) -> Result<Arc<dyn crate::plugin::SessionLifecycleService>, PluginActionInvokeError> {
207        self.runtime_session_services()
208            .map(|services| services.lifecycle_service())
209    }
210
211    pub fn session_graph_service(
212        &self,
213    ) -> Result<Arc<dyn crate::plugin::SessionGraphService>, PluginActionInvokeError> {
214        self.runtime_session_services()
215            .map(|services| services.graph_service())
216    }
217
218    pub fn process_service(
219        &self,
220    ) -> Result<Arc<dyn crate::ProcessService>, PluginActionInvokeError> {
221        self.runtime_session_services()
222            .map(|services| services.process_service())
223    }
224
225    pub fn process_cancel_ability(&self) -> Arc<dyn crate::ProcessCancelAbility> {
226        Arc::clone(&self.host.core.control.process_cancel_ability)
227    }
228
229    pub fn effect_host(&self) -> Arc<dyn crate::EffectHost> {
230        Arc::clone(&self.host.core.control.effect_host)
231    }
232
233    pub async fn enqueue_turn_input(
234        &self,
235        input: crate::TurnInput,
236        delivery_policy: crate::DeliveryPolicy,
237        slot_policy: crate::SlotPolicy,
238        source_key: Option<String>,
239    ) -> Result<crate::QueuedWorkBatch, RuntimeError> {
240        let store = self
241            .session
242            .as_ref()
243            .and_then(|session| session.history_store())
244            .ok_or_else(queued_turn_input_store_required)?;
245        enqueue_turn_input_to_store(
246            self.state.session_id.clone(),
247            store,
248            self.host.queued_work_poke.clone(),
249            input,
250            delivery_policy,
251            slot_policy,
252            source_key,
253        )
254        .await
255    }
256
257    pub async fn cancel_queued_work_batch(
258        &self,
259        session_id: &str,
260        batch_id: &str,
261    ) -> Result<Option<crate::QueuedWorkBatch>, RuntimeError> {
262        let store = self
263            .session
264            .as_ref()
265            .and_then(|session| session.history_store())
266            .ok_or_else(queued_turn_input_store_required)?;
267        store
268            .cancel_queued_work_batch(session_id, batch_id)
269            .await
270            .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))
271    }
272
273    /// The plugin session bound to the currently active runtime session, if any.
274    pub fn plugin_session(&self) -> Option<Arc<crate::PluginSession>> {
275        self.session.as_ref().map(|s| Arc::clone(s.plugins()))
276    }
277
278    /// Run the registered history rewrite pipeline against the current
279    /// state, applying the resulting messages back onto the runtime.
280    /// Returns true when at least one rewriter produced a summary or
281    /// otherwise mutated the message list.
282    pub async fn rewrite_history(
283        &mut self,
284        trigger: crate::RewriteTrigger,
285    ) -> Result<bool, PluginActionInvokeError> {
286        let services = self.runtime_session_services()?;
287        let Some(plugin_session) = self.session.as_ref().map(|s| Arc::clone(s.plugins())) else {
288            return Err(PluginActionInvokeError::Unknown(
289                "runtime session not available".to_string(),
290            ));
291        };
292        let ctx = crate::RewriteContext {
293            session_id: self.state.session_id.clone(),
294            trigger: trigger.clone(),
295            state: self.read_view(),
296            sessions: services.state_service(),
297            session_lifecycle: services.lifecycle_service(),
298            session_graph: services.graph_service(),
299            scoped_effect_controller: self
300                .host
301                .core
302                .control
303                .effect_host
304                .scoped(crate::EffectScope::runtime_operation(
305                    rewrite_history_scope_id(
306                        &self.state.session_id,
307                        self.state.turn_index,
308                        &trigger,
309                    ),
310                ))
311                .map_err(|err| PluginActionInvokeError::Unknown(err.to_string()))?,
312        };
313        let input = crate::HistoryState::from_snapshot(&self.state.to_snapshot());
314        let baseline_messages = input.messages.len();
315        let outcome = plugin_session
316            .rewrite_history(&ctx, input)
317            .await
318            .map_err(|err| {
319                PluginActionInvokeError::Unknown(format!("rewrite_history failed: {err}"))
320            })?;
321        let mutated =
322            outcome.metadata.produced_summary || outcome.messages.len() != baseline_messages;
323        if mutated {
324            self.state.replace_active_read_state(&outcome.messages);
325            if let Some(session) = self.session.as_ref() {
326                self.state.tool_state_snapshot = Some(session.tool_registry().export_state());
327                let captured = session.plugins().snapshot();
328                crate::runtime::state::store_plugin_snapshot(
329                    &mut self.state.plugin_snapshot,
330                    captured,
331                );
332                self.state.plugin_snapshot_revision =
333                    Some(session.plugins().snapshot_revision_fingerprint());
334            }
335        }
336        Ok(mutated)
337    }
338
339    pub(super) fn session_policy(&self) -> SessionPolicy {
340        self.policy.clone()
341    }
342
343    pub(super) async fn notify_session_config_changed(&self, previous: SessionPolicy) {
344        let Some(session) = self.session.as_ref() else {
345            return;
346        };
347        let current = self.session_policy();
348        if current == previous {
349            return;
350        }
351        let Ok(services) = self.runtime_session_services() else {
352            return;
353        };
354        session
355            .plugins()
356            .emit_runtime_event(crate::PluginLifecycleEvent::SessionConfigChanged(Box::new(
357                SessionConfigChangedContext {
358                    session_id: self.state.session_id.clone(),
359                    previous,
360                    current,
361                    sessions: services.state_service(),
362                },
363            )))
364            .await;
365    }
366
367    pub(super) async fn apply_session_config_mutations(&mut self, previous: SessionPolicy) {
368        let Some(session) = self.session.as_ref() else {
369            return;
370        };
371        let current = self.session_policy();
372        if current == previous {
373            return;
374        }
375        let Ok(services) = self.runtime_session_services() else {
376            return;
377        };
378        self.policy = session
379            .plugins()
380            .mutate_session_config(
381                SessionConfigChangedContext {
382                    session_id: self.state.session_id.clone(),
383                    previous,
384                    current,
385                    sessions: services.state_service(),
386                },
387                self.policy.clone(),
388            )
389            .await;
390        self.state.policy = self.policy.clone();
391    }
392}
393
394pub(in crate::runtime) async fn enqueue_turn_input_to_store(
395    session_id: String,
396    store: Arc<dyn crate::RuntimePersistence>,
397    queued_work_poke: Option<crate::QueuedWorkPoke>,
398    input: crate::TurnInput,
399    delivery_policy: crate::DeliveryPolicy,
400    slot_policy: crate::SlotPolicy,
401    source_key: Option<String>,
402) -> Result<crate::QueuedWorkBatch, RuntimeError> {
403    super::turn_loop::ensure_durable_effect_input(&input)?;
404    let mut draft = crate::QueuedWorkBatchDraft::new(
405        session_id,
406        delivery_policy,
407        slot_policy,
408        vec![crate::QueuedWorkPayload::turn_input(input)],
409    );
410    draft.source_key = source_key;
411    let enqueued = store
412        .enqueue_queued_work(draft)
413        .await
414        .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))?;
415    if let Some(poke) = queued_work_poke.as_ref() {
416        poke.poke_session(enqueued.session_id.clone(), "queued_turn_input");
417    }
418    Ok(enqueued)
419}
420
421impl LashRuntime {
422    pub async fn submit_session_command(
423        &mut self,
424        command: crate::SessionCommand,
425        idempotency_key: impl Into<String>,
426    ) -> Result<crate::SessionCommandReceipt, RuntimeError> {
427        let idempotency_key = idempotency_key.into();
428        if idempotency_key.trim().is_empty() {
429            return Err(RuntimeError::new(
430                RuntimeErrorCode::Other("session_command_idempotency_key".to_string()),
431                "session command idempotency key cannot be empty",
432            ));
433        }
434        let source_key = command.source_key(&idempotency_key);
435        let session_id = self.state.session_id.clone();
436        let Some(store) = self
437            .session
438            .as_ref()
439            .and_then(|session| session.history_store())
440        else {
441            let batch_id = format!("inline-command:{}", uuid::Uuid::new_v4());
442            self.apply_session_command(command, None).await?;
443            return Ok(crate::SessionCommandReceipt {
444                session_id,
445                batch_id,
446                source_key,
447            });
448        };
449        let draft = crate::QueuedWorkBatchDraft::new(
450            session_id.clone(),
451            crate::DeliveryPolicy::AfterCurrentTurnCommit,
452            crate::SlotPolicy::Exclusive,
453            vec![crate::QueuedWorkPayload::session_command(command)],
454        )
455        .with_source_key(source_key.clone());
456        let enqueued = store.enqueue_queued_work(draft).await.map_err(|err| {
457            RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
458        })?;
459        if let Some(poke) = self.host.queued_work_poke.as_ref() {
460            poke.poke_session(session_id.clone(), "session_command");
461        }
462        Ok(crate::SessionCommandReceipt {
463            session_id,
464            batch_id: enqueued.batch_id,
465            source_key,
466        })
467    }
468
469    pub async fn drain_next_session_command(
470        &mut self,
471    ) -> Result<Option<crate::SessionCommandReceipt>, RuntimeError> {
472        let Some(store) = self
473            .session
474            .as_ref()
475            .and_then(|session| session.history_store())
476        else {
477            return Ok(None);
478        };
479        let claim = store
480            .claim_ready_queued_work(
481                &self.state.session_id,
482                &self.runtime_scope_id,
483                crate::QueuedWorkClaimBoundary::Idle,
484                crate::QUEUED_WORK_CLAIM_TTL_MS,
485                1,
486            )
487            .await
488            .map_err(|err| {
489                RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
490            })?;
491        let Some(claim) = claim else {
492            return Ok(None);
493        };
494        let Some((batch, command)) = claim.exclusive_session_command() else {
495            store
496                .abandon_queued_work_claim(&claim)
497                .await
498                .map_err(|err| {
499                    RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
500                })?;
501            return Ok(None);
502        };
503        let batch_id = batch.batch_id.clone();
504        let source_key = batch.source_key.clone().unwrap_or_else(|| batch_id.clone());
505        let command = command.clone();
506        self.apply_session_command(command, Some(claim.completion()))
507            .await?;
508        Ok(Some(crate::SessionCommandReceipt {
509            session_id: self.state.session_id.clone(),
510            batch_id,
511            source_key,
512        }))
513    }
514
515    async fn apply_session_command(
516        &mut self,
517        command: crate::SessionCommand,
518        completion: Option<crate::QueuedWorkCompletion>,
519    ) -> Result<(), RuntimeError> {
520        self.refresh_session_graph_from_store()
521            .await
522            .map_err(|err| RuntimeError::new("session_command_refresh", err.to_string()))?;
523        let graph = match command {
524            crate::SessionCommand::RefreshToolSurface {
525                expected_generation,
526                ..
527            } => {
528                if let Some(expected) = expected_generation {
529                    let actual = self
530                        .tool_state()
531                        .map_err(|err| {
532                            RuntimeError::new("session_command_tool_state", err.to_string())
533                        })?
534                        .generation();
535                    if actual != expected {
536                        return Err(RuntimeError::new(
537                            "session_command_generation_mismatch",
538                            format!(
539                                "expected tool generation {expected}, but live generation is {actual}"
540                            ),
541                        ));
542                    }
543                }
544                self.refresh_session_tool_surface().await.map_err(|err| {
545                    RuntimeError::new("session_command_refresh_tools", err.to_string())
546                })?;
547                crate::store::GraphCommitDelta::Unchanged {
548                    leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
549                }
550            }
551            crate::SessionCommand::ResetSession { .. } => {
552                let mut state = crate::RuntimeSessionState {
553                    session_id: self.state.session_id.clone(),
554                    policy: self.policy.clone(),
555                    graph_replace_required: true,
556                    ..crate::RuntimeSessionState::default()
557                };
558                state.ensure_agent_frame_initialized();
559                self.set_persisted_state(state)
560                    .map_err(|err| RuntimeError::new("session_command_reset", err.to_string()))?;
561                crate::store::GraphCommitDelta::ReplaceFull(self.state.session_graph.clone())
562            }
563        };
564        let Some(store) = self
565            .session
566            .as_ref()
567            .and_then(|session| session.history_store())
568        else {
569            return Ok(());
570        };
571        let mut commit =
572            crate::store::RuntimeCommit::persisted_state_with_graph_commit(&self.state, graph, &[]);
573        if let Some(completion) = completion {
574            commit = commit.completing_queue_claim(completion);
575        }
576        let result = store.commit_runtime_state(commit).await.map_err(|err| {
577            RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
578        })?;
579        self.state.apply_persisted_commit_result(result);
580        Ok(())
581    }
582}
583
584pub(in crate::runtime) fn queued_turn_input_store_required() -> RuntimeError {
585    RuntimeError::new(
586        RuntimeErrorCode::StoreCommitFailed,
587        "queued turn input requires a persistent runtime store",
588    )
589}