lash_core/runtime/
session_api.rs1use super::*;
2
3impl LashRuntime {
4 pub fn session_id(&self) -> &str {
5 &self.state.session_id
6 }
7
8 pub(super) fn stamp_live_plugin_state(&mut self) {
9 if let Some(session) = self.session.as_ref() {
10 let snapshot = session.plugins().tool_registry().export_state();
11 self.state.tool_state_generation = Some(snapshot.generation());
12 self.state.tool_state_snapshot = Some(snapshot);
13 self.state.plugin_snapshot = session.plugins().snapshot().ok();
14 self.state.plugin_snapshot_revision =
15 Some(session.plugins().snapshot_revision_fingerprint());
16 } else {
17 self.state.tool_state_generation = None;
18 self.state.tool_state_snapshot = None;
19 self.state.plugin_snapshot = None;
20 self.state.plugin_snapshot_revision = None;
21 }
22 }
23 pub(super) fn active_tool_catalog(&self) -> Vec<serde_json::Value> {
24 self.active_tool_catalog_shared().as_ref().clone()
25 }
26
27 pub(super) fn active_tool_catalog_shared(&self) -> Arc<Vec<serde_json::Value>> {
28 self.session
29 .as_ref()
30 .map(|session| {
31 session
32 .shared_tool_catalog(&self.state.session_id, self.policy.execution_mode.clone())
33 })
34 .unwrap_or_else(|| Arc::new(Vec::new()))
35 }
36
37 pub fn tool_state(&self) -> Result<crate::ToolState, SessionError> {
38 let Some(session) = self.session.as_ref() else {
39 return Err(SessionError::Protocol(
40 "runtime session not available".to_string(),
41 ));
42 };
43 Ok(session.plugins().tool_registry().export_state())
44 }
45 pub fn set_mode_turn_options(&mut self, options: crate::ModeTurnOptions) {
47 self.state.mode_turn_options = options.clone();
48 self.mode_turn_options = options;
49 }
50
51 pub fn export_state(&self) -> SessionStateEnvelope {
55 self.state.export_state()
56 }
57
58 pub fn read_view(&self) -> crate::SessionReadView {
59 crate::SessionReadView::from_runtime_state(
60 &self.state,
61 self.policy.clone(),
62 self.mode_turn_options.clone(),
63 )
64 }
65
66 pub fn export_persistence_state(&self) -> PersistedSessionState {
68 self.state.clone()
69 }
70
71 pub fn apply_persistence_state(&mut self, state: PersistedSessionState) {
72 self.set_persisted_state(state);
73 }
74
75 pub(crate) fn export_graph_first_state(&self) -> PersistedSessionState {
76 self.state.clone()
77 }
78
79 pub fn export_persisted_state(&self) -> PersistedSessionState {
82 let mut state = self.state.clone();
83 state.mode_turn_options = self.mode_turn_options.clone();
84 if let Some(session) = self.session.as_ref() {
85 let snapshot = session.plugins().tool_registry().export_state();
86 state.tool_state_generation = Some(snapshot.generation());
87 state.tool_state_snapshot = Some(snapshot);
88 state.plugin_snapshot = session.plugins().snapshot().ok();
89 state.plugin_snapshot_revision =
90 Some(session.plugins().snapshot_revision_fingerprint());
91 }
92 normalize_session_graph(&mut state);
93 state
94 }
95
96 pub fn usage_report(&self) -> SessionUsageReport {
97 let mut entries = self.state.token_ledger.clone();
98 let drained = self.shared_token_ledger.lock().expect("token ledger lock");
99 for entry in drained.iter().cloned() {
100 merge_ledger_entry(&mut entries, entry);
101 }
102 SessionUsageReport::from_entries(&entries)
103 }
104
105 pub async fn await_background_work(&mut self) -> Result<(), SessionError> {
106 let manager = self
107 .runtime_session_manager()
108 .map_err(|err| SessionError::Protocol(err.to_string()))?;
109 manager
110 .await_hidden_tasks(&self.state.session_id)
111 .await
112 .map_err(|err| SessionError::Protocol(format!("background task failed: {err}")))?;
113 if self.background_sync_needed.swap(false, Ordering::AcqRel) {
114 self.refresh_session_graph_from_store().await;
115 }
116 self.refresh_session_tool_surface().await?;
117 Ok(())
118 }
119
120 pub(super) async fn refresh_session_graph_from_store(&mut self) {
121 let Some(store) = self
122 .session
123 .as_ref()
124 .and_then(|session| session.history_store())
125 else {
126 return;
127 };
128 let read = match store
129 .load_session(crate::store::SessionReadScope::FullGraph)
130 .await
131 {
132 Ok(Some(read)) => read,
133 Ok(None) => return,
134 Err(err) => {
135 tracing::warn!("failed to refresh session graph from store: {err}");
136 return;
137 }
138 };
139 let has_newer_graph = self.state.head_revision != Some(read.head_revision)
140 || read.graph.leaf_node_id != self.state.session_graph.leaf_node_id
141 || read.checkpoint_ref != self.state.checkpoint_ref;
142 if !has_newer_graph {
143 return;
144 }
145 let head = crate::store::SessionHead {
146 session_id: read.session_id.clone(),
147 head_revision: read.head_revision,
148 graph: read.graph,
149 config: read.config.clone(),
150 checkpoint_ref: read.checkpoint_ref.clone(),
151 token_ledger: merge_usage_delta_entries(read.token_ledger),
152 };
153 apply_session_head(&mut self.state, &head);
154 apply_session_checkpoint(&mut self.state, read.checkpoint);
155 }
156
157 pub(super) fn runtime_session_manager(
158 &self,
159 ) -> Result<Arc<dyn RuntimeSessionHost>, PluginActionInvokeError> {
160 Ok(Arc::new(RuntimeSessionManager::new(self, true, None)?))
161 }
162
163 pub(super) fn runtime_session_manager_for_turn(
164 &self,
165 child_usage_event_relay: Option<ChildUsageEventRelay>,
166 ) -> Result<Arc<dyn RuntimeSessionHost>, PluginActionInvokeError> {
167 Ok(Arc::new(RuntimeSessionManager::new(
168 self,
169 false,
170 child_usage_event_relay,
171 )?))
172 }
173
174 pub fn session_manager(&self) -> Result<Arc<dyn RuntimeSessionHost>, PluginActionInvokeError> {
175 self.runtime_session_manager()
176 }
177
178 pub fn plugin_session(&self) -> Option<Arc<crate::PluginSession>> {
180 self.session.as_ref().map(|s| Arc::clone(s.plugins()))
181 }
182
183 pub fn turn_input_injection_bridge(
184 &self,
185 ) -> Result<crate::TurnInputInjectionBridge, SessionError> {
186 let Some(session) = self.session.as_ref() else {
187 return Err(SessionError::Protocol(
188 "runtime session not available".to_string(),
189 ));
190 };
191 Ok(session.turn_input_injection_bridge().clone())
192 }
193
194 pub async fn rewrite_history(
199 &mut self,
200 trigger: crate::RewriteTrigger,
201 ) -> Result<bool, PluginActionInvokeError> {
202 let manager = self.runtime_session_manager()?;
203 let Some(plugin_session) = self.session.as_ref().map(|s| Arc::clone(s.plugins())) else {
204 return Err(PluginActionInvokeError::Unknown(
205 "runtime session not available".to_string(),
206 ));
207 };
208 let ctx = crate::RewriteContext {
209 session_id: self.state.session_id.clone(),
210 trigger,
211 state: self.read_view(),
212 host: manager,
213 };
214 let input = crate::HistoryState::from_state(&self.state.export_state());
215 let baseline_messages = input.messages.len();
216 let outcome = plugin_session
217 .rewrite_history(&ctx, input)
218 .await
219 .map_err(|err| {
220 PluginActionInvokeError::Unknown(format!("rewrite_history failed: {err}"))
221 })?;
222 let mutated =
223 outcome.metadata.produced_summary || outcome.messages.len() != baseline_messages;
224 if mutated {
225 self.state
226 .replace_active_read_state(&outcome.messages, &outcome.tool_calls);
227 if let Some(session) = self.session.as_ref() {
228 self.state.tool_state_snapshot =
229 Some(session.plugins().tool_registry().export_state());
230 self.state.plugin_snapshot = session.plugins().snapshot().ok();
231 self.state.plugin_snapshot_revision =
232 Some(session.plugins().snapshot_revision_fingerprint());
233 }
234 }
235 Ok(mutated)
236 }
237
238 pub(super) fn session_policy(&self) -> SessionPolicy {
239 self.policy.clone()
240 }
241
242 pub(super) async fn notify_session_config_changed(&self, previous: SessionPolicy) {
243 let Some(session) = self.session.as_ref() else {
244 return;
245 };
246 let current = self.session_policy();
247 if current == previous {
248 return;
249 }
250 let Ok(host) = self.runtime_session_manager() else {
251 return;
252 };
253 session
254 .plugins()
255 .emit_runtime_event(crate::PluginRuntimeEvent::SessionConfigChanged(Box::new(
256 SessionConfigChangedContext {
257 session_id: self.state.session_id.clone(),
258 previous,
259 current,
260 host,
261 },
262 )))
263 .await;
264 }
265
266 pub(super) async fn apply_session_config_mutations(&mut self, previous: SessionPolicy) {
267 let Some(session) = self.session.as_ref() else {
268 return;
269 };
270 let current = self.session_policy();
271 if current == previous {
272 return;
273 }
274 let Ok(host) = self.runtime_session_manager() else {
275 return;
276 };
277 self.policy = session
278 .plugins()
279 .mutate_session_config(
280 SessionConfigChangedContext {
281 session_id: self.state.session_id.clone(),
282 previous,
283 current,
284 host,
285 },
286 self.policy.clone(),
287 )
288 .await;
289 self.state.policy = self.policy.clone();
290 }
291}