1use super::*;
2
3impl LashRuntime {
4 pub fn session_id(&self) -> &str {
5 &self.state.session_id
6 }
7
8 pub(super) fn stamp_live_plugin_state(&mut self) {
9 if let Some(session) = self.session.as_ref() {
10 let snapshot = session.plugins().tool_registry().export_state();
11 self.state.tool_state_generation = Some(snapshot.generation());
12 self.state.tool_state_snapshot = Some(snapshot);
13 let captured = session.plugins().snapshot();
14 crate::runtime::state::store_plugin_snapshot(&mut self.state.plugin_snapshot, captured);
15 self.state.plugin_snapshot_revision =
16 Some(session.plugins().snapshot_revision_fingerprint());
17 } else {
18 self.state.tool_state_generation = None;
19 self.state.tool_state_snapshot = None;
20 self.state.plugin_snapshot = None;
21 self.state.plugin_snapshot_revision = None;
22 }
23 }
24 pub(super) fn active_tool_catalog_shared(
25 &self,
26 ) -> Result<Arc<Vec<serde_json::Value>>, crate::PluginError> {
27 self.session
28 .as_ref()
29 .map(|session| session.shared_tool_catalog(&self.state.session_id))
30 .unwrap_or_else(|| Ok(Arc::new(Vec::new())))
31 }
32
33 pub fn tool_state(&self) -> Result<crate::ToolState, SessionError> {
34 let Some(session) = self.session.as_ref() else {
35 return Err(SessionError::Protocol(
36 "runtime session not available".to_string(),
37 ));
38 };
39 Ok(session.plugins().tool_registry().export_state())
40 }
41 pub fn set_protocol_turn_options(&mut self, options: crate::ProtocolTurnOptions) {
43 self.state.protocol_turn_options = options.clone();
44 if let Some(frame) = self.state.current_agent_frame_mut() {
45 frame.protocol_turn_options = options.clone();
46 }
47 self.protocol_turn_options = options;
48 }
49
50 pub fn protocol_turn_options(&self) -> &crate::ProtocolTurnOptions {
52 &self.state.protocol_turn_options
53 }
54
55 pub fn set_protocol_turn_options_all_frames(&mut self, options: crate::ProtocolTurnOptions) {
59 self.state.protocol_turn_options = options.clone();
60 for frame in &mut self.state.agent_frames {
61 frame.protocol_turn_options = options.clone();
62 }
63 self.protocol_turn_options = options;
64 }
65
66 pub fn configure_protocol_on_materialize(
75 &mut self,
76 plugin_options: &crate::PluginOptions,
77 is_root_session: bool,
78 ) -> Result<(), crate::PluginError> {
79 let protocol_session = self
80 .session
81 .as_ref()
82 .map(|session| Arc::clone(session.plugins().protocol_session()));
83 if let Some(protocol_session) = protocol_session {
84 let materialization = crate::plugin::ProtocolSessionMaterialization {
85 plugin_options,
86 is_root_session,
87 };
88 protocol_session
89 .configure_runtime_on_materialize(
90 crate::plugin::ProtocolRuntimeContext::new(self),
91 materialization,
92 )
93 .map_err(|err| crate::PluginError::Session(err.to_string()))?;
94 }
95 Ok(())
96 }
97
98 pub fn export_state(&self) -> crate::SessionSnapshot {
102 self.state.to_snapshot()
103 }
104
105 pub fn read_view(&self) -> crate::SessionReadView {
106 crate::SessionReadView::from_runtime_state(
107 &self.state,
108 self.state.effective_policy().clone(),
109 self.state.effective_protocol_turn_options().clone(),
110 )
111 }
112
113 pub fn export_persistence_state(&self) -> RuntimeSessionState {
115 self.state.clone()
116 }
117
118 pub fn apply_persistence_state(
119 &mut self,
120 state: RuntimeSessionState,
121 ) -> Result<(), SessionError> {
122 self.set_persisted_state(state)
123 }
124
125 pub(crate) fn export_graph_first_state(&self) -> RuntimeSessionState {
126 self.state.clone()
127 }
128
129 pub fn export_persisted_state(&self) -> RuntimeSessionState {
132 let mut state = self.state.clone();
133 state.protocol_turn_options = self.protocol_turn_options.clone();
134 if let Some(frame) = state.current_agent_frame_mut() {
135 frame.protocol_turn_options = self.protocol_turn_options.clone();
136 }
137 if let Some(session) = self.session.as_ref() {
138 let snapshot = session.plugins().tool_registry().export_state();
139 state.tool_state_generation = Some(snapshot.generation());
140 state.tool_state_snapshot = Some(snapshot);
141 let captured = session.plugins().snapshot();
142 crate::runtime::state::store_plugin_snapshot(&mut state.plugin_snapshot, captured);
143 state.plugin_snapshot_revision =
144 Some(session.plugins().snapshot_revision_fingerprint());
145 }
146 normalize_session_graph(&mut state);
147 state
148 }
149
150 pub fn usage_report(&self) -> SessionUsageReport {
151 let mut entries = self.state.token_ledger.clone();
152 let drained = self.shared_token_ledger.lock().expect("token ledger lock");
153 for entry in drained.iter().cloned() {
154 merge_ledger_entry(&mut entries, entry);
155 }
156 SessionUsageReport::from_entries(&entries)
157 }
158
159 pub async fn await_background_work(&mut self) -> Result<(), SessionError> {
160 if self.process_sync_needed.swap(false, Ordering::AcqRel) {
161 self.refresh_session_graph_from_store().await?;
162 }
163 Ok(())
164 }
165
166 pub(super) async fn refresh_session_graph_from_store(&mut self) -> Result<(), SessionError> {
167 if self.state.graph_replace_required && self.state.head_revision.is_none() {
171 return Ok(());
172 }
173 let Some(store) = self
174 .session
175 .as_ref()
176 .and_then(|session| session.history_store())
177 else {
178 return Ok(());
179 };
180 let scope = match self.residency {
181 crate::Residency::KeepAll => crate::store::SessionReadScope::FullGraph,
182 crate::Residency::ActivePathOnly => crate::store::SessionReadScope::ActivePath {
183 leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
184 },
185 };
186 let Some(read) = store.load_session(scope).await.map_err(|err| {
187 SessionError::Protocol(format!("failed to refresh session graph from store: {err}"))
188 })?
189 else {
190 return Ok(());
191 };
192 let has_newer_graph = self.state.head_revision != Some(read.head_revision)
193 || read.graph.leaf_node_id != self.state.session_graph.leaf_node_id
194 || read.checkpoint_ref != self.state.checkpoint_ref;
195 if !has_newer_graph {
196 return Ok(());
197 }
198 let head = crate::store::SessionHead {
199 session_id: read.session_id.clone(),
200 head_revision: read.head_revision,
201 agent_frames: read.agent_frames.clone(),
202 current_agent_frame_id: read.current_agent_frame_id.clone(),
203 graph: read.graph,
204 config: read.config.clone(),
205 checkpoint_ref: read.checkpoint_ref.clone(),
206 token_ledger: merge_usage_delta_entries(read.token_ledger),
207 };
208 apply_session_head(&mut self.state, &head);
209 apply_session_checkpoint(&mut self.state, read.checkpoint);
210 self.policy = self.state.effective_policy().clone();
211 self.protocol_turn_options = self.state.effective_protocol_turn_options().clone();
212 Ok(())
213 }
214
215 pub(super) fn runtime_session_services(
216 &self,
217 ) -> Result<Arc<RuntimeSessionServices>, PluginOperationInvokeError> {
218 Ok(Arc::new(RuntimeSessionServices::new(self, true, None)?))
219 }
220
221 pub(super) fn runtime_session_services_for_turn(
222 &self,
223 child_usage_event_relay: Option<ChildUsageEventRelay>,
224 ) -> Result<Arc<RuntimeSessionServices>, PluginOperationInvokeError> {
225 Ok(Arc::new(RuntimeSessionServices::new(
226 self,
227 false,
228 child_usage_event_relay,
229 )?))
230 }
231
232 pub fn session_state_service(
233 &self,
234 ) -> Result<Arc<dyn crate::plugin::SessionStateService>, PluginOperationInvokeError> {
235 self.runtime_session_services()
236 .map(|services| services.state_service())
237 }
238
239 pub fn session_lifecycle_service(
240 &self,
241 ) -> Result<Arc<dyn crate::plugin::SessionLifecycleService>, PluginOperationInvokeError> {
242 self.runtime_session_services()
243 .map(|services| services.lifecycle_service())
244 }
245
246 pub fn session_graph_service(
247 &self,
248 ) -> Result<Arc<dyn crate::plugin::SessionGraphService>, PluginOperationInvokeError> {
249 self.runtime_session_services()
250 .map(|services| services.graph_service())
251 }
252
253 pub fn process_service(
254 &self,
255 ) -> Result<Arc<dyn crate::ProcessService>, PluginOperationInvokeError> {
256 self.runtime_session_services()
257 .map(|services| services.process_service())
258 }
259
260 pub fn process_cancel_ability(&self) -> Arc<dyn crate::ProcessCancelAbility> {
261 Arc::clone(&self.host.core.control.process_cancel_ability)
262 }
263
264 pub fn effect_host(&self) -> Arc<dyn crate::EffectHost> {
265 Arc::clone(&self.host.core.control.effect_host)
266 }
267
268 pub async fn enqueue_turn_input(
269 &self,
270 input: crate::TurnInput,
271 ingress: crate::TurnInputIngress,
272 source_key: Option<String>,
273 ) -> Result<crate::PendingTurnInput, RuntimeError> {
274 let store = self
275 .session
276 .as_ref()
277 .and_then(|session| session.history_store())
278 .ok_or_else(queued_turn_input_store_required)?;
279 enqueue_turn_input_to_store(
280 self.state.session_id.clone(),
281 store,
282 self.host.queued_work_driver.clone(),
283 input,
284 ingress,
285 source_key,
286 )
287 .await
288 }
289
290 pub async fn cancel_queued_work_batch(
291 &self,
292 session_id: &str,
293 batch_id: &str,
294 ) -> Result<Option<crate::QueuedWorkBatch>, RuntimeError> {
295 let store = self
296 .session
297 .as_ref()
298 .and_then(|session| session.history_store())
299 .ok_or_else(queued_turn_input_store_required)?;
300 store
301 .cancel_queued_work_batch(session_id, batch_id)
302 .await
303 .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))
304 }
305
306 pub fn plugin_session(&self) -> Option<Arc<crate::PluginSession>> {
308 self.session.as_ref().map(|s| Arc::clone(s.plugins()))
309 }
310
311 pub fn open_agent_frame(
312 &mut self,
313 request: crate::OpenAgentFrameRequest,
314 ) -> crate::OpenAgentFrameResult {
315 open_agent_frame_in_state_with_clock(
316 &mut self.state,
317 request,
318 self.host.core.clock.as_ref(),
319 )
320 }
321
322 pub async fn compact_context(
325 &mut self,
326 instructions: Option<String>,
327 scoped_effect_controller: crate::ScopedEffectController<'_>,
328 ) -> Result<bool, PluginOperationInvokeError> {
329 let services = self.runtime_session_services()?;
330 let Some(plugin_session) = self.session.as_ref().map(|s| Arc::clone(s.plugins())) else {
331 return Err(PluginOperationInvokeError::Unknown(
332 "runtime session not available".to_string(),
333 ));
334 };
335 let ctx = crate::CompactionContext {
336 session_id: self.state.session_id.clone(),
337 state: self.read_view(),
338 instructions,
339 sessions: services.state_service(),
340 session_lifecycle: services.lifecycle_service(),
341 session_graph: services.graph_service(),
342 scoped_effect_controller,
343 };
344 let Some(compaction) = plugin_session.compact_context(&ctx).await.map_err(|err| {
345 PluginOperationInvokeError::Unknown(format!("context compaction failed: {err}"))
346 })?
347 else {
348 return Ok(false);
349 };
350 let frame_id = format!(
351 "{}:frame:compaction:{}",
352 self.state.session_id,
353 uuid::Uuid::new_v4()
354 );
355 let result = self.open_agent_frame(
356 crate::OpenAgentFrameRequest::new(frame_id, crate::AgentFrameReason::compaction())
357 .with_initial_nodes(compaction.initial_nodes),
358 );
359 if result.opened {
360 self.stamp_live_plugin_state();
361 }
362 Ok(result.opened)
363 }
364
365 pub(super) fn session_policy(&self) -> SessionPolicy {
366 self.policy.clone()
367 }
368
369 pub(super) async fn notify_session_config_changed(&self, previous: SessionPolicy) {
370 let Some(session) = self.session.as_ref() else {
371 return;
372 };
373 let current = self.session_policy();
374 if current == previous {
375 return;
376 }
377 let Ok(services) = self.runtime_session_services() else {
378 return;
379 };
380 session
381 .plugins()
382 .emit_runtime_event(crate::PluginLifecycleEvent::SessionConfigChanged(Box::new(
383 SessionConfigChangedContext {
384 session_id: self.state.session_id.clone(),
385 previous,
386 current,
387 sessions: services.state_service(),
388 },
389 )))
390 .await;
391 }
392
393 pub(super) async fn apply_session_config_mutations(&mut self, previous: SessionPolicy) {
394 let Some(session) = self.session.as_ref() else {
395 return;
396 };
397 let current = self.session_policy();
398 if current == previous {
399 return;
400 }
401 let Ok(services) = self.runtime_session_services() else {
402 return;
403 };
404 self.policy = session
405 .plugins()
406 .mutate_session_config(
407 SessionConfigChangedContext {
408 session_id: self.state.session_id.clone(),
409 previous,
410 current,
411 sessions: services.state_service(),
412 },
413 self.policy.clone(),
414 )
415 .await;
416 self.state.policy = self.policy.clone();
417 }
418}
419
420pub(in crate::runtime) async fn enqueue_turn_input_to_store(
421 session_id: String,
422 store: Arc<dyn crate::RuntimePersistence>,
423 queued_work_driver: Option<crate::QueuedWorkDriver>,
424 input: crate::TurnInput,
425 ingress: crate::TurnInputIngress,
426 source_key: Option<String>,
427) -> Result<crate::PendingTurnInput, RuntimeError> {
428 super::turn_loop::ensure_durable_effect_input(&input)?;
429 let is_next_turn = matches!(ingress, crate::TurnInputIngress::NextTurn);
430 let mut draft = crate::PendingTurnInputDraft::new(session_id, ingress, input);
431 draft.source_key = source_key;
432 let enqueued = store
433 .enqueue_pending_turn_input(draft)
434 .await
435 .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))?;
436 if is_next_turn && let Some(driver) = queued_work_driver.as_ref() {
437 driver
438 .claim_and_run_pending(Some(&enqueued.session_id), "queued_turn_input")
439 .await
440 .map_err(|err| {
441 RuntimeError::new(
442 RuntimeErrorCode::Other("queued_work".to_string()),
443 err.to_string(),
444 )
445 })?;
446 }
447 Ok(enqueued)
448}
449
450impl LashRuntime {
451 pub async fn submit_session_command(
452 &mut self,
453 command: crate::SessionCommand,
454 idempotency_key: impl Into<String>,
455 ) -> Result<crate::SessionCommandReceipt, RuntimeError> {
456 let idempotency_key = idempotency_key.into();
457 if idempotency_key.trim().is_empty() {
458 return Err(RuntimeError::new(
459 RuntimeErrorCode::Other("session_command_idempotency_key".to_string()),
460 "session command idempotency key cannot be empty",
461 ));
462 }
463 let source_key = command.source_key(&idempotency_key);
464 let session_id = self.state.session_id.clone();
465 let Some(store) = self
466 .session
467 .as_ref()
468 .and_then(|session| session.history_store())
469 else {
470 let batch_id = format!("inline-command:{}", uuid::Uuid::new_v4());
471 self.apply_session_command(command, None, None).await?;
472 return Ok(crate::SessionCommandReceipt {
473 session_id,
474 batch_id,
475 source_key,
476 });
477 };
478 let draft = crate::QueuedWorkBatchDraft::new(
479 session_id.clone(),
480 crate::DeliveryPolicy::AfterCurrentTurnCommit,
481 crate::SlotPolicy::Exclusive,
482 vec![crate::QueuedWorkPayload::session_command(command)],
483 )
484 .with_source_key(source_key.clone());
485 let enqueued = store.enqueue_queued_work(draft).await.map_err(|err| {
486 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
487 })?;
488 if let Some(driver) = self.host.queued_work_driver.as_ref() {
489 driver
490 .claim_and_run_pending(Some(&session_id), "session_command")
491 .await
492 .map_err(|err| {
493 RuntimeError::new(
494 RuntimeErrorCode::Other("queued_work".to_string()),
495 err.to_string(),
496 )
497 })?;
498 }
499 Ok(crate::SessionCommandReceipt {
500 session_id,
501 batch_id: enqueued.batch_id,
502 source_key,
503 })
504 }
505
506 pub async fn drain_next_session_command(
507 &mut self,
508 session_execution_lease: &crate::SessionExecutionLeaseFence,
509 ) -> Result<Option<crate::SessionCommandReceipt>, RuntimeError> {
510 let Some(store) = self
511 .session
512 .as_ref()
513 .and_then(|session| session.history_store())
514 else {
515 return Ok(None);
516 };
517 let claim = store
518 .claim_leading_ready_session_command(
519 &self.state.session_id,
520 session_execution_lease,
521 &self.runtime_lease_owner,
522 self.host.core.control.lease_timings.ttl_ms(),
523 )
524 .await
525 .map_err(super::runtime_error_from_store_commit)?;
526 let Some(claim) = claim else {
527 return Ok(None);
528 };
529 let Some((batch, command)) = claim.exclusive_session_command() else {
530 return Err(RuntimeError::new(
531 "session_command_claim",
532 format!(
533 "queued-work claim `{}` did not contain exactly one session command batch",
534 claim.claim_id
535 ),
536 ));
537 };
538 let batch_id = batch.batch_id.clone();
539 let source_key = batch.source_key.clone().unwrap_or_else(|| batch_id.clone());
540 let command = command.clone();
541 self.apply_session_command(
542 command,
543 Some(claim.completion()),
544 Some(session_execution_lease),
545 )
546 .await?;
547 Ok(Some(crate::SessionCommandReceipt {
548 session_id: self.state.session_id.clone(),
549 batch_id,
550 source_key,
551 }))
552 }
553
554 async fn apply_session_command(
555 &mut self,
556 command: crate::SessionCommand,
557 completion: Option<crate::QueuedWorkCompletion>,
558 session_execution_lease: Option<&crate::SessionExecutionLeaseFence>,
559 ) -> Result<(), RuntimeError> {
560 self.refresh_session_graph_from_store()
561 .await
562 .map_err(|err| RuntimeError::new("session_command_refresh", err.to_string()))?;
563 let graph = match command {
564 crate::SessionCommand::RefreshToolCatalog { .. } => {
565 self.refresh_session_tool_catalog().await.map_err(|err| {
566 RuntimeError::new("session_command_refresh_tools", err.to_string())
567 })?;
568 crate::store::GraphCommitDelta::Unchanged {
569 leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
570 }
571 }
572 crate::SessionCommand::ResetSession { .. } => {
573 let mut state = crate::RuntimeSessionState {
574 session_id: self.state.session_id.clone(),
575 policy: self.policy.clone(),
576 graph_replace_required: true,
577 ..crate::RuntimeSessionState::default()
578 };
579 state.ensure_agent_frame_initialized();
580 self.set_persisted_state(state)
581 .map_err(|err| RuntimeError::new("session_command_reset", err.to_string()))?;
582 crate::store::GraphCommitDelta::ReplaceFull(self.state.session_graph.clone())
583 }
584 };
585 let Some(store) = self
586 .session
587 .as_ref()
588 .and_then(|session| session.history_store())
589 else {
590 return Ok(());
591 };
592 let mut commit =
593 crate::store::RuntimeCommit::persisted_state_with_graph_commit(&self.state, graph, &[]);
594 let Some(session_execution_lease) = session_execution_lease else {
595 return Err(RuntimeError::new(
596 RuntimeErrorCode::StoreCommitFailed,
597 "session command commit requires a session execution lease",
598 ));
599 };
600 commit = commit.with_session_execution_lease(session_execution_lease.clone());
601 if let Some(completion) = completion {
602 commit = commit.completing_queue_claim(completion);
603 }
604 let result = store
605 .commit_runtime_state(commit)
606 .await
607 .map_err(super::runtime_error_from_store_commit)?;
608 self.state.apply_persisted_commit_result(result);
609 Ok(())
610 }
611}
612
613pub(in crate::runtime) fn queued_turn_input_store_required() -> RuntimeError {
614 RuntimeError::new(
615 RuntimeErrorCode::StoreCommitFailed,
616 "queued turn input requires a persistent runtime store",
617 )
618}