Skip to main content

lash_core/runtime/
session_api.rs

1use super::*;
2
3impl LashRuntime {
4    pub fn session_id(&self) -> &str {
5        &self.state.session_id
6    }
7
8    pub(super) fn stamp_live_plugin_state(&mut self) {
9        if let Some(session) = self.session.as_ref() {
10            let snapshot = session.plugins().tool_registry().export_state();
11            self.state.tool_state_generation = Some(snapshot.generation());
12            self.state.tool_state_snapshot = Some(snapshot);
13            self.state.plugin_snapshot = session.plugins().snapshot().ok();
14            self.state.plugin_snapshot_revision =
15                Some(session.plugins().snapshot_revision_fingerprint());
16        } else {
17            self.state.tool_state_generation = None;
18            self.state.tool_state_snapshot = None;
19            self.state.plugin_snapshot = None;
20            self.state.plugin_snapshot_revision = None;
21        }
22    }
23    pub(super) fn active_tool_catalog(&self) -> Vec<serde_json::Value> {
24        self.active_tool_catalog_shared().as_ref().clone()
25    }
26
27    pub(super) fn active_tool_catalog_shared(&self) -> Arc<Vec<serde_json::Value>> {
28        self.session
29            .as_ref()
30            .map(|session| {
31                session
32                    .shared_tool_catalog(&self.state.session_id, self.policy.execution_mode.clone())
33            })
34            .unwrap_or_else(|| Arc::new(Vec::new()))
35    }
36
37    pub fn tool_state(&self) -> Result<crate::ToolState, SessionError> {
38        let Some(session) = self.session.as_ref() else {
39            return Err(SessionError::Protocol(
40                "runtime session not available".to_string(),
41            ));
42        };
43        Ok(session.plugins().tool_registry().export_state())
44    }
45    /// Override mode-owned turn options for this session.
46    pub fn set_mode_turn_options(&mut self, options: crate::ModeTurnOptions) {
47        self.state.mode_turn_options = options.clone();
48        self.mode_turn_options = options;
49    }
50
51    /// Export current session state for inspection/UI purposes.
52    /// This keeps persistence-heavy snapshots untouched; callers that need a
53    /// fully persisted view should use `export_persisted_state`.
54    pub fn export_state(&self) -> SessionStateEnvelope {
55        self.state.export_state()
56    }
57
58    pub fn read_view(&self) -> crate::SessionReadView {
59        crate::SessionReadView::from_runtime_state(
60            &self.state,
61            self.policy.clone(),
62            self.mode_turn_options.clone(),
63        )
64    }
65
66    /// Export the narrow persistence snapshot used by stores and resume logic.
67    pub fn export_persistence_state(&self) -> PersistedSessionState {
68        self.state.clone()
69    }
70
71    pub fn apply_persistence_state(&mut self, state: PersistedSessionState) {
72        self.set_persisted_state(state);
73    }
74
75    pub(crate) fn export_graph_first_state(&self) -> PersistedSessionState {
76        self.state.clone()
77    }
78
79    /// Export a persistence-ready state envelope with dynamic/plugin snapshots
80    /// refreshed from the live session.
81    pub fn export_persisted_state(&self) -> PersistedSessionState {
82        let mut state = self.state.clone();
83        state.mode_turn_options = self.mode_turn_options.clone();
84        if let Some(session) = self.session.as_ref() {
85            let snapshot = session.plugins().tool_registry().export_state();
86            state.tool_state_generation = Some(snapshot.generation());
87            state.tool_state_snapshot = Some(snapshot);
88            state.plugin_snapshot = session.plugins().snapshot().ok();
89            state.plugin_snapshot_revision =
90                Some(session.plugins().snapshot_revision_fingerprint());
91        }
92        normalize_session_graph(&mut state);
93        state
94    }
95
96    pub fn usage_report(&self) -> SessionUsageReport {
97        let mut entries = self.state.token_ledger.clone();
98        let drained = self.shared_token_ledger.lock().expect("token ledger lock");
99        for entry in drained.iter().cloned() {
100            merge_ledger_entry(&mut entries, entry);
101        }
102        SessionUsageReport::from_entries(&entries)
103    }
104
105    pub async fn await_background_work(&mut self) -> Result<(), SessionError> {
106        let manager = self
107            .runtime_session_manager()
108            .map_err(|err| SessionError::Protocol(err.to_string()))?;
109        manager
110            .await_hidden_tasks(&self.state.session_id)
111            .await
112            .map_err(|err| SessionError::Protocol(format!("background task failed: {err}")))?;
113        if self.background_sync_needed.swap(false, Ordering::AcqRel) {
114            self.refresh_session_graph_from_store().await;
115        }
116        self.refresh_session_tool_surface().await?;
117        Ok(())
118    }
119
120    pub(super) async fn refresh_session_graph_from_store(&mut self) {
121        let Some(store) = self
122            .session
123            .as_ref()
124            .and_then(|session| session.history_store())
125        else {
126            return;
127        };
128        let read = match store
129            .load_session(crate::store::SessionReadScope::FullGraph)
130            .await
131        {
132            Ok(Some(read)) => read,
133            Ok(None) => return,
134            Err(err) => {
135                tracing::warn!("failed to refresh session graph from store: {err}");
136                return;
137            }
138        };
139        let has_newer_graph = self.state.head_revision != Some(read.head_revision)
140            || read.graph.leaf_node_id != self.state.session_graph.leaf_node_id
141            || read.checkpoint_ref != self.state.checkpoint_ref;
142        if !has_newer_graph {
143            return;
144        }
145        let head = crate::store::SessionHead {
146            session_id: read.session_id.clone(),
147            head_revision: read.head_revision,
148            graph: read.graph,
149            config: read.config.clone(),
150            checkpoint_ref: read.checkpoint_ref.clone(),
151            token_ledger: merge_usage_delta_entries(read.token_ledger),
152        };
153        apply_session_head(&mut self.state, &head);
154        apply_session_checkpoint(&mut self.state, read.checkpoint);
155    }
156
157    pub(super) fn runtime_session_manager(
158        &self,
159    ) -> Result<Arc<dyn RuntimeSessionHost>, PluginActionInvokeError> {
160        Ok(Arc::new(RuntimeSessionManager::new(self, true, None)?))
161    }
162
163    pub(super) fn runtime_session_manager_for_turn(
164        &self,
165        child_usage_event_relay: Option<ChildUsageEventRelay>,
166    ) -> Result<Arc<dyn RuntimeSessionHost>, PluginActionInvokeError> {
167        Ok(Arc::new(RuntimeSessionManager::new(
168            self,
169            false,
170            child_usage_event_relay,
171        )?))
172    }
173
174    pub fn session_manager(&self) -> Result<Arc<dyn RuntimeSessionHost>, PluginActionInvokeError> {
175        self.runtime_session_manager()
176    }
177
178    /// The plugin session bound to the currently active runtime session, if any.
179    pub fn plugin_session(&self) -> Option<Arc<crate::PluginSession>> {
180        self.session.as_ref().map(|s| Arc::clone(s.plugins()))
181    }
182
183    pub fn turn_input_injection_bridge(
184        &self,
185    ) -> Result<crate::TurnInputInjectionBridge, SessionError> {
186        let Some(session) = self.session.as_ref() else {
187            return Err(SessionError::Protocol(
188                "runtime session not available".to_string(),
189            ));
190        };
191        Ok(session.turn_input_injection_bridge().clone())
192    }
193
194    /// Run the registered history rewrite pipeline against the current
195    /// state, applying the resulting messages back onto the runtime.
196    /// Returns true when at least one rewriter produced a summary or
197    /// otherwise mutated the message list.
198    pub async fn rewrite_history(
199        &mut self,
200        trigger: crate::RewriteTrigger,
201    ) -> Result<bool, PluginActionInvokeError> {
202        let manager = self.runtime_session_manager()?;
203        let Some(plugin_session) = self.session.as_ref().map(|s| Arc::clone(s.plugins())) else {
204            return Err(PluginActionInvokeError::Unknown(
205                "runtime session not available".to_string(),
206            ));
207        };
208        let ctx = crate::RewriteContext {
209            session_id: self.state.session_id.clone(),
210            trigger,
211            state: self.read_view(),
212            host: manager,
213        };
214        let input = crate::HistoryState::from_state(&self.state.export_state());
215        let baseline_messages = input.messages.len();
216        let outcome = plugin_session
217            .rewrite_history(&ctx, input)
218            .await
219            .map_err(|err| {
220                PluginActionInvokeError::Unknown(format!("rewrite_history failed: {err}"))
221            })?;
222        let mutated =
223            outcome.metadata.produced_summary || outcome.messages.len() != baseline_messages;
224        if mutated {
225            self.state
226                .replace_active_read_state(&outcome.messages, &outcome.tool_calls);
227            if let Some(session) = self.session.as_ref() {
228                self.state.tool_state_snapshot =
229                    Some(session.plugins().tool_registry().export_state());
230                self.state.plugin_snapshot = session.plugins().snapshot().ok();
231                self.state.plugin_snapshot_revision =
232                    Some(session.plugins().snapshot_revision_fingerprint());
233            }
234        }
235        Ok(mutated)
236    }
237
238    pub(super) fn session_policy(&self) -> SessionPolicy {
239        self.policy.clone()
240    }
241
242    pub(super) async fn notify_session_config_changed(&self, previous: SessionPolicy) {
243        let Some(session) = self.session.as_ref() else {
244            return;
245        };
246        let current = self.session_policy();
247        if current == previous {
248            return;
249        }
250        let Ok(host) = self.runtime_session_manager() else {
251            return;
252        };
253        session
254            .plugins()
255            .emit_runtime_event(crate::PluginRuntimeEvent::SessionConfigChanged(Box::new(
256                SessionConfigChangedContext {
257                    session_id: self.state.session_id.clone(),
258                    previous,
259                    current,
260                    host,
261                },
262            )))
263            .await;
264    }
265
266    pub(super) async fn apply_session_config_mutations(&mut self, previous: SessionPolicy) {
267        let Some(session) = self.session.as_ref() else {
268            return;
269        };
270        let current = self.session_policy();
271        if current == previous {
272            return;
273        }
274        let Ok(host) = self.runtime_session_manager() else {
275            return;
276        };
277        self.policy = session
278            .plugins()
279            .mutate_session_config(
280                SessionConfigChangedContext {
281                    session_id: self.state.session_id.clone(),
282                    previous,
283                    current,
284                    host,
285                },
286                self.policy.clone(),
287            )
288            .await;
289        self.state.policy = self.policy.clone();
290    }
291}