1use super::*;
2
3impl LashRuntime {
4 pub fn session_id(&self) -> &str {
5 &self.state.session_id
6 }
7
8 pub(super) fn stamp_live_plugin_state(&mut self) {
9 if let Some(session) = self.session.as_ref() {
10 let snapshot = session.plugins().tool_registry().export_state();
11 self.state.tool_state_generation = Some(snapshot.generation());
12 self.state.tool_state_snapshot = Some(snapshot);
13 let captured = session.plugins().snapshot();
14 crate::runtime::state::store_plugin_snapshot(&mut self.state.plugin_snapshot, captured);
15 self.state.plugin_snapshot_revision =
16 Some(session.plugins().snapshot_revision_fingerprint());
17 } else {
18 self.state.tool_state_generation = None;
19 self.state.tool_state_snapshot = None;
20 self.state.plugin_snapshot = None;
21 self.state.plugin_snapshot_revision = None;
22 }
23 }
24 pub(super) fn active_tool_catalog_shared(
25 &self,
26 ) -> Result<Arc<Vec<serde_json::Value>>, crate::PluginError> {
27 self.session
28 .as_ref()
29 .map(|session| session.shared_tool_catalog(&self.state.session_id))
30 .unwrap_or_else(|| Ok(Arc::new(Vec::new())))
31 }
32
33 pub fn tool_state(&self) -> Result<crate::ToolState, SessionError> {
34 let Some(session) = self.session.as_ref() else {
35 return Err(SessionError::Protocol(
36 "runtime session not available".to_string(),
37 ));
38 };
39 Ok(session.plugins().tool_registry().export_state())
40 }
41 pub fn set_protocol_turn_options(&mut self, options: crate::ProtocolTurnOptions) {
43 self.state.protocol_turn_options = options.clone();
44 if let Some(frame) = self.state.current_agent_frame_mut() {
45 frame.protocol_turn_options = options.clone();
46 }
47 self.protocol_turn_options = options;
48 }
49
50 pub fn export_state(&self) -> crate::SessionSnapshot {
54 self.state.to_snapshot()
55 }
56
57 pub fn read_view(&self) -> crate::SessionReadView {
58 crate::SessionReadView::from_runtime_state(
59 &self.state,
60 self.state.effective_policy().clone(),
61 self.state.effective_protocol_turn_options().clone(),
62 )
63 }
64
65 pub fn export_persistence_state(&self) -> RuntimeSessionState {
67 self.state.clone()
68 }
69
70 pub fn apply_persistence_state(
71 &mut self,
72 state: RuntimeSessionState,
73 ) -> Result<(), SessionError> {
74 self.set_persisted_state(state)
75 }
76
77 pub(crate) fn export_graph_first_state(&self) -> RuntimeSessionState {
78 self.state.clone()
79 }
80
81 pub fn export_persisted_state(&self) -> RuntimeSessionState {
84 let mut state = self.state.clone();
85 state.protocol_turn_options = self.protocol_turn_options.clone();
86 if let Some(frame) = state.current_agent_frame_mut() {
87 frame.protocol_turn_options = self.protocol_turn_options.clone();
88 }
89 if let Some(session) = self.session.as_ref() {
90 let snapshot = session.plugins().tool_registry().export_state();
91 state.tool_state_generation = Some(snapshot.generation());
92 state.tool_state_snapshot = Some(snapshot);
93 let captured = session.plugins().snapshot();
94 crate::runtime::state::store_plugin_snapshot(&mut state.plugin_snapshot, captured);
95 state.plugin_snapshot_revision =
96 Some(session.plugins().snapshot_revision_fingerprint());
97 }
98 normalize_session_graph(&mut state);
99 state
100 }
101
102 pub fn usage_report(&self) -> SessionUsageReport {
103 let mut entries = self.state.token_ledger.clone();
104 let drained = self.shared_token_ledger.lock().expect("token ledger lock");
105 for entry in drained.iter().cloned() {
106 merge_ledger_entry(&mut entries, entry);
107 }
108 SessionUsageReport::from_entries(&entries)
109 }
110
111 pub async fn await_background_work(&mut self) -> Result<(), SessionError> {
112 if self.process_sync_needed.swap(false, Ordering::AcqRel) {
113 self.refresh_session_graph_from_store().await?;
114 }
115 Ok(())
116 }
117
118 pub(super) async fn refresh_session_graph_from_store(&mut self) -> Result<(), SessionError> {
119 if self.state.graph_replace_required && self.state.head_revision.is_none() {
123 return Ok(());
124 }
125 let Some(store) = self
126 .session
127 .as_ref()
128 .and_then(|session| session.history_store())
129 else {
130 return Ok(());
131 };
132 let scope = match self.residency {
133 crate::Residency::KeepAll => crate::store::SessionReadScope::FullGraph,
134 crate::Residency::ActivePathOnly => crate::store::SessionReadScope::ActivePath {
135 leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
136 },
137 };
138 let Some(read) = store.load_session(scope).await.map_err(|err| {
139 SessionError::Protocol(format!("failed to refresh session graph from store: {err}"))
140 })?
141 else {
142 return Ok(());
143 };
144 let has_newer_graph = self.state.head_revision != Some(read.head_revision)
145 || read.graph.leaf_node_id != self.state.session_graph.leaf_node_id
146 || read.checkpoint_ref != self.state.checkpoint_ref;
147 if !has_newer_graph {
148 return Ok(());
149 }
150 let head = crate::store::SessionHead {
151 session_id: read.session_id.clone(),
152 head_revision: read.head_revision,
153 agent_frames: read.agent_frames.clone(),
154 current_agent_frame_id: read.current_agent_frame_id.clone(),
155 graph: read.graph,
156 config: read.config.clone(),
157 checkpoint_ref: read.checkpoint_ref.clone(),
158 token_ledger: merge_usage_delta_entries(read.token_ledger),
159 };
160 apply_session_head(&mut self.state, &head);
161 apply_session_checkpoint(&mut self.state, read.checkpoint);
162 self.policy = self.state.effective_policy().clone();
163 self.protocol_turn_options = self.state.effective_protocol_turn_options().clone();
164 Ok(())
165 }
166
167 pub(super) fn runtime_session_services(
168 &self,
169 ) -> Result<Arc<RuntimeSessionServices>, PluginActionInvokeError> {
170 Ok(Arc::new(RuntimeSessionServices::new(self, true, None)?))
171 }
172
173 pub(super) fn runtime_session_services_for_turn(
174 &self,
175 child_usage_event_relay: Option<ChildUsageEventRelay>,
176 ) -> Result<Arc<RuntimeSessionServices>, PluginActionInvokeError> {
177 Ok(Arc::new(RuntimeSessionServices::new(
178 self,
179 false,
180 child_usage_event_relay,
181 )?))
182 }
183
184 pub fn session_state_service(
185 &self,
186 ) -> Result<Arc<dyn crate::plugin::SessionStateService>, PluginActionInvokeError> {
187 self.runtime_session_services()
188 .map(|services| services.state_service())
189 }
190
191 pub fn session_lifecycle_service(
192 &self,
193 ) -> Result<Arc<dyn crate::plugin::SessionLifecycleService>, PluginActionInvokeError> {
194 self.runtime_session_services()
195 .map(|services| services.lifecycle_service())
196 }
197
198 pub fn session_graph_service(
199 &self,
200 ) -> Result<Arc<dyn crate::plugin::SessionGraphService>, PluginActionInvokeError> {
201 self.runtime_session_services()
202 .map(|services| services.graph_service())
203 }
204
205 pub fn process_service(
206 &self,
207 ) -> Result<Arc<dyn crate::ProcessService>, PluginActionInvokeError> {
208 self.runtime_session_services()
209 .map(|services| services.process_service())
210 }
211
212 pub fn process_cancel_ability(&self) -> Arc<dyn crate::ProcessCancelAbility> {
213 Arc::clone(&self.host.core.control.process_cancel_ability)
214 }
215
216 pub fn effect_host(&self) -> Arc<dyn crate::EffectHost> {
217 Arc::clone(&self.host.core.control.effect_host)
218 }
219
220 pub async fn enqueue_turn_input(
221 &self,
222 input: crate::TurnInput,
223 delivery_policy: crate::DeliveryPolicy,
224 slot_policy: crate::SlotPolicy,
225 source_key: Option<String>,
226 ) -> Result<crate::QueuedWorkBatch, RuntimeError> {
227 let store = self
228 .session
229 .as_ref()
230 .and_then(|session| session.history_store())
231 .ok_or_else(queued_turn_input_store_required)?;
232 enqueue_turn_input_to_store(
233 self.state.session_id.clone(),
234 store,
235 self.host.queued_work_driver.clone(),
236 input,
237 delivery_policy,
238 slot_policy,
239 source_key,
240 )
241 .await
242 }
243
244 pub async fn cancel_queued_work_batch(
245 &self,
246 session_id: &str,
247 batch_id: &str,
248 ) -> Result<Option<crate::QueuedWorkBatch>, RuntimeError> {
249 let store = self
250 .session
251 .as_ref()
252 .and_then(|session| session.history_store())
253 .ok_or_else(queued_turn_input_store_required)?;
254 store
255 .cancel_queued_work_batch(session_id, batch_id)
256 .await
257 .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))
258 }
259
260 pub fn plugin_session(&self) -> Option<Arc<crate::PluginSession>> {
262 self.session.as_ref().map(|s| Arc::clone(s.plugins()))
263 }
264
265 pub fn open_agent_frame(
266 &mut self,
267 request: crate::OpenAgentFrameRequest,
268 ) -> crate::OpenAgentFrameResult {
269 open_agent_frame_in_state_with_clock(
270 &mut self.state,
271 request,
272 self.host.core.clock.as_ref(),
273 )
274 }
275
276 pub async fn compact_context(
279 &mut self,
280 instructions: Option<String>,
281 scoped_effect_controller: crate::ScopedEffectController<'_>,
282 ) -> Result<bool, PluginActionInvokeError> {
283 let services = self.runtime_session_services()?;
284 let Some(plugin_session) = self.session.as_ref().map(|s| Arc::clone(s.plugins())) else {
285 return Err(PluginActionInvokeError::Unknown(
286 "runtime session not available".to_string(),
287 ));
288 };
289 let ctx = crate::CompactionContext {
290 session_id: self.state.session_id.clone(),
291 state: self.read_view(),
292 instructions,
293 sessions: services.state_service(),
294 session_lifecycle: services.lifecycle_service(),
295 session_graph: services.graph_service(),
296 scoped_effect_controller,
297 };
298 let Some(compaction) = plugin_session.compact_context(&ctx).await.map_err(|err| {
299 PluginActionInvokeError::Unknown(format!("context compaction failed: {err}"))
300 })?
301 else {
302 return Ok(false);
303 };
304 let frame_id = format!(
305 "{}:frame:compaction:{}",
306 self.state.session_id,
307 uuid::Uuid::new_v4()
308 );
309 let result = self.open_agent_frame(
310 crate::OpenAgentFrameRequest::new(frame_id, crate::AgentFrameReason::compaction())
311 .with_initial_nodes(compaction.initial_nodes),
312 );
313 if result.opened {
314 self.stamp_live_plugin_state();
315 }
316 Ok(result.opened)
317 }
318
319 pub(super) fn session_policy(&self) -> SessionPolicy {
320 self.policy.clone()
321 }
322
323 pub(super) async fn notify_session_config_changed(&self, previous: SessionPolicy) {
324 let Some(session) = self.session.as_ref() else {
325 return;
326 };
327 let current = self.session_policy();
328 if current == previous {
329 return;
330 }
331 let Ok(services) = self.runtime_session_services() else {
332 return;
333 };
334 session
335 .plugins()
336 .emit_runtime_event(crate::PluginLifecycleEvent::SessionConfigChanged(Box::new(
337 SessionConfigChangedContext {
338 session_id: self.state.session_id.clone(),
339 previous,
340 current,
341 sessions: services.state_service(),
342 },
343 )))
344 .await;
345 }
346
347 pub(super) async fn apply_session_config_mutations(&mut self, previous: SessionPolicy) {
348 let Some(session) = self.session.as_ref() else {
349 return;
350 };
351 let current = self.session_policy();
352 if current == previous {
353 return;
354 }
355 let Ok(services) = self.runtime_session_services() else {
356 return;
357 };
358 self.policy = session
359 .plugins()
360 .mutate_session_config(
361 SessionConfigChangedContext {
362 session_id: self.state.session_id.clone(),
363 previous,
364 current,
365 sessions: services.state_service(),
366 },
367 self.policy.clone(),
368 )
369 .await;
370 self.state.policy = self.policy.clone();
371 }
372}
373
374pub(in crate::runtime) async fn enqueue_turn_input_to_store(
375 session_id: String,
376 store: Arc<dyn crate::RuntimePersistence>,
377 queued_work_driver: Option<crate::QueuedWorkDriver>,
378 input: crate::TurnInput,
379 delivery_policy: crate::DeliveryPolicy,
380 slot_policy: crate::SlotPolicy,
381 source_key: Option<String>,
382) -> Result<crate::QueuedWorkBatch, RuntimeError> {
383 super::turn_loop::ensure_durable_effect_input(&input)?;
384 let mut draft = crate::QueuedWorkBatchDraft::new(
385 session_id,
386 delivery_policy,
387 slot_policy,
388 vec![crate::QueuedWorkPayload::turn_input(input)],
389 );
390 draft.source_key = source_key;
391 let enqueued = store
392 .enqueue_queued_work(draft)
393 .await
394 .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))?;
395 if enqueued.delivery_policy == crate::DeliveryPolicy::AfterCurrentTurnCommit
396 && let Some(driver) = queued_work_driver.as_ref()
397 {
398 driver
399 .claim_and_run_pending(Some(&enqueued.session_id), "queued_turn_input")
400 .await
401 .map_err(|err| {
402 RuntimeError::new(
403 RuntimeErrorCode::Other("queued_work".to_string()),
404 err.to_string(),
405 )
406 })?;
407 }
408 Ok(enqueued)
409}
410
411impl LashRuntime {
412 pub async fn submit_session_command(
413 &mut self,
414 command: crate::SessionCommand,
415 idempotency_key: impl Into<String>,
416 ) -> Result<crate::SessionCommandReceipt, RuntimeError> {
417 let idempotency_key = idempotency_key.into();
418 if idempotency_key.trim().is_empty() {
419 return Err(RuntimeError::new(
420 RuntimeErrorCode::Other("session_command_idempotency_key".to_string()),
421 "session command idempotency key cannot be empty",
422 ));
423 }
424 let source_key = command.source_key(&idempotency_key);
425 let session_id = self.state.session_id.clone();
426 let Some(store) = self
427 .session
428 .as_ref()
429 .and_then(|session| session.history_store())
430 else {
431 let batch_id = format!("inline-command:{}", uuid::Uuid::new_v4());
432 self.apply_session_command(command, None).await?;
433 return Ok(crate::SessionCommandReceipt {
434 session_id,
435 batch_id,
436 source_key,
437 });
438 };
439 let draft = crate::QueuedWorkBatchDraft::new(
440 session_id.clone(),
441 crate::DeliveryPolicy::AfterCurrentTurnCommit,
442 crate::SlotPolicy::Exclusive,
443 vec![crate::QueuedWorkPayload::session_command(command)],
444 )
445 .with_source_key(source_key.clone());
446 let enqueued = store.enqueue_queued_work(draft).await.map_err(|err| {
447 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
448 })?;
449 if let Some(driver) = self.host.queued_work_driver.as_ref() {
450 driver
451 .claim_and_run_pending(Some(&session_id), "session_command")
452 .await
453 .map_err(|err| {
454 RuntimeError::new(
455 RuntimeErrorCode::Other("queued_work".to_string()),
456 err.to_string(),
457 )
458 })?;
459 }
460 Ok(crate::SessionCommandReceipt {
461 session_id,
462 batch_id: enqueued.batch_id,
463 source_key,
464 })
465 }
466
467 pub async fn drain_next_session_command(
468 &mut self,
469 ) -> Result<Option<crate::SessionCommandReceipt>, RuntimeError> {
470 let Some(store) = self
471 .session
472 .as_ref()
473 .and_then(|session| session.history_store())
474 else {
475 return Ok(None);
476 };
477 let claim = store
478 .claim_ready_queued_work(
479 &self.state.session_id,
480 &self.runtime_scope_id,
481 crate::QueuedWorkClaimBoundary::Idle,
482 crate::QUEUED_WORK_CLAIM_TTL_MS,
483 1,
484 )
485 .await
486 .map_err(|err| {
487 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
488 })?;
489 let Some(claim) = claim else {
490 return Ok(None);
491 };
492 let Some((batch, command)) = claim.exclusive_session_command() else {
493 store
494 .abandon_queued_work_claim(&claim)
495 .await
496 .map_err(|err| {
497 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
498 })?;
499 return Ok(None);
500 };
501 let batch_id = batch.batch_id.clone();
502 let source_key = batch.source_key.clone().unwrap_or_else(|| batch_id.clone());
503 let command = command.clone();
504 self.apply_session_command(command, Some(claim.completion()))
505 .await?;
506 Ok(Some(crate::SessionCommandReceipt {
507 session_id: self.state.session_id.clone(),
508 batch_id,
509 source_key,
510 }))
511 }
512
513 async fn apply_session_command(
514 &mut self,
515 command: crate::SessionCommand,
516 completion: Option<crate::QueuedWorkCompletion>,
517 ) -> Result<(), RuntimeError> {
518 self.refresh_session_graph_from_store()
519 .await
520 .map_err(|err| RuntimeError::new("session_command_refresh", err.to_string()))?;
521 let graph = match command {
522 crate::SessionCommand::RefreshToolCatalog { .. } => {
523 self.refresh_session_tool_catalog().await.map_err(|err| {
524 RuntimeError::new("session_command_refresh_tools", err.to_string())
525 })?;
526 crate::store::GraphCommitDelta::Unchanged {
527 leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
528 }
529 }
530 crate::SessionCommand::ResetSession { .. } => {
531 let mut state = crate::RuntimeSessionState {
532 session_id: self.state.session_id.clone(),
533 policy: self.policy.clone(),
534 graph_replace_required: true,
535 ..crate::RuntimeSessionState::default()
536 };
537 state.ensure_agent_frame_initialized();
538 self.set_persisted_state(state)
539 .map_err(|err| RuntimeError::new("session_command_reset", err.to_string()))?;
540 crate::store::GraphCommitDelta::ReplaceFull(self.state.session_graph.clone())
541 }
542 };
543 let Some(store) = self
544 .session
545 .as_ref()
546 .and_then(|session| session.history_store())
547 else {
548 return Ok(());
549 };
550 let mut commit =
551 crate::store::RuntimeCommit::persisted_state_with_graph_commit(&self.state, graph, &[]);
552 if let Some(completion) = completion {
553 commit = commit.completing_queue_claim(completion);
554 }
555 let result = store.commit_runtime_state(commit).await.map_err(|err| {
556 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
557 })?;
558 self.state.apply_persisted_commit_result(result);
559 Ok(())
560 }
561}
562
563pub(in crate::runtime) fn queued_turn_input_store_required() -> RuntimeError {
564 RuntimeError::new(
565 RuntimeErrorCode::StoreCommitFailed,
566 "queued turn input requires a persistent runtime store",
567 )
568}