1use super::*;
2
3fn rewrite_history_scope_id(
4 session_id: &str,
5 turn_index: usize,
6 trigger: &crate::RewriteTrigger,
7) -> String {
8 let trigger_id = match trigger {
9 crate::RewriteTrigger::Manual { .. } => "manual",
10 crate::RewriteTrigger::WindowShrink { .. } => "window-shrink",
11 crate::RewriteTrigger::Periodic => "periodic",
12 };
13 format!("{session_id}:rewrite-history:{trigger_id}:{turn_index}")
14}
15
16impl LashRuntime {
17 pub fn session_id(&self) -> &str {
18 &self.state.session_id
19 }
20
21 pub(super) fn stamp_live_plugin_state(&mut self) {
22 if let Some(session) = self.session.as_ref() {
23 let snapshot = session.plugins().tool_registry().export_state();
24 self.state.tool_state_generation = Some(snapshot.generation());
25 self.state.tool_state_snapshot = Some(snapshot);
26 let captured = session.plugins().snapshot();
27 crate::runtime::state::store_plugin_snapshot(&mut self.state.plugin_snapshot, captured);
28 self.state.plugin_snapshot_revision =
29 Some(session.plugins().snapshot_revision_fingerprint());
30 } else {
31 self.state.tool_state_generation = None;
32 self.state.tool_state_snapshot = None;
33 self.state.plugin_snapshot = None;
34 self.state.plugin_snapshot_revision = None;
35 }
36 }
37 pub(super) fn active_tool_catalog_shared(
38 &self,
39 ) -> Result<Arc<Vec<serde_json::Value>>, crate::PluginError> {
40 self.session
41 .as_ref()
42 .map(|session| session.shared_tool_catalog(&self.state.session_id))
43 .unwrap_or_else(|| Ok(Arc::new(Vec::new())))
44 }
45
46 pub fn tool_state(&self) -> Result<crate::ToolState, SessionError> {
47 let Some(session) = self.session.as_ref() else {
48 return Err(SessionError::Protocol(
49 "runtime session not available".to_string(),
50 ));
51 };
52 Ok(session.plugins().tool_registry().export_state())
53 }
54 pub fn set_protocol_turn_options(&mut self, options: crate::ProtocolTurnOptions) {
56 self.state.protocol_turn_options = options.clone();
57 if let Some(frame) = self.state.current_agent_frame_mut() {
58 frame.protocol_turn_options = options.clone();
59 }
60 self.protocol_turn_options = options;
61 }
62
63 pub fn export_state(&self) -> crate::SessionSnapshot {
67 self.state.to_snapshot()
68 }
69
70 pub fn read_view(&self) -> crate::SessionReadView {
71 crate::SessionReadView::from_runtime_state(
72 &self.state,
73 self.state.effective_policy().clone(),
74 self.state.effective_protocol_turn_options().clone(),
75 )
76 }
77
78 pub fn export_persistence_state(&self) -> RuntimeSessionState {
80 self.state.clone()
81 }
82
83 pub fn apply_persistence_state(
84 &mut self,
85 state: RuntimeSessionState,
86 ) -> Result<(), SessionError> {
87 self.set_persisted_state(state)
88 }
89
90 pub(crate) fn export_graph_first_state(&self) -> RuntimeSessionState {
91 self.state.clone()
92 }
93
94 pub fn export_persisted_state(&self) -> RuntimeSessionState {
97 let mut state = self.state.clone();
98 state.protocol_turn_options = self.protocol_turn_options.clone();
99 if let Some(frame) = state.current_agent_frame_mut() {
100 frame.protocol_turn_options = self.protocol_turn_options.clone();
101 }
102 if let Some(session) = self.session.as_ref() {
103 let snapshot = session.plugins().tool_registry().export_state();
104 state.tool_state_generation = Some(snapshot.generation());
105 state.tool_state_snapshot = Some(snapshot);
106 let captured = session.plugins().snapshot();
107 crate::runtime::state::store_plugin_snapshot(&mut state.plugin_snapshot, captured);
108 state.plugin_snapshot_revision =
109 Some(session.plugins().snapshot_revision_fingerprint());
110 }
111 normalize_session_graph(&mut state);
112 state
113 }
114
115 pub fn usage_report(&self) -> SessionUsageReport {
116 let mut entries = self.state.token_ledger.clone();
117 let drained = self.shared_token_ledger.lock().expect("token ledger lock");
118 for entry in drained.iter().cloned() {
119 merge_ledger_entry(&mut entries, entry);
120 }
121 SessionUsageReport::from_entries(&entries)
122 }
123
124 pub async fn await_background_work(&mut self) -> Result<(), SessionError> {
125 if self.process_sync_needed.swap(false, Ordering::AcqRel) {
126 self.refresh_session_graph_from_store().await?;
127 }
128 Ok(())
129 }
130
131 pub(super) async fn refresh_session_graph_from_store(&mut self) -> Result<(), SessionError> {
132 if self.state.graph_replace_required && self.state.head_revision.is_none() {
136 return Ok(());
137 }
138 let Some(store) = self
139 .session
140 .as_ref()
141 .and_then(|session| session.history_store())
142 else {
143 return Ok(());
144 };
145 let scope = match self.residency {
146 crate::Residency::KeepAll => crate::store::SessionReadScope::FullGraph,
147 crate::Residency::ActivePathOnly => crate::store::SessionReadScope::ActivePath {
148 leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
149 },
150 };
151 let Some(read) = store.load_session(scope).await.map_err(|err| {
152 SessionError::Protocol(format!("failed to refresh session graph from store: {err}"))
153 })?
154 else {
155 return Ok(());
156 };
157 let has_newer_graph = self.state.head_revision != Some(read.head_revision)
158 || read.graph.leaf_node_id != self.state.session_graph.leaf_node_id
159 || read.checkpoint_ref != self.state.checkpoint_ref;
160 if !has_newer_graph {
161 return Ok(());
162 }
163 let head = crate::store::SessionHead {
164 session_id: read.session_id.clone(),
165 head_revision: read.head_revision,
166 agent_frames: read.agent_frames.clone(),
167 current_agent_frame_id: read.current_agent_frame_id.clone(),
168 graph: read.graph,
169 config: read.config.clone(),
170 checkpoint_ref: read.checkpoint_ref.clone(),
171 token_ledger: merge_usage_delta_entries(read.token_ledger),
172 };
173 apply_session_head(&mut self.state, &head);
174 apply_session_checkpoint(&mut self.state, read.checkpoint);
175 self.policy = self.state.effective_policy().clone();
176 self.protocol_turn_options = self.state.effective_protocol_turn_options().clone();
177 Ok(())
178 }
179
180 pub(super) fn runtime_session_services(
181 &self,
182 ) -> Result<Arc<RuntimeSessionServices>, PluginActionInvokeError> {
183 Ok(Arc::new(RuntimeSessionServices::new(self, true, None)?))
184 }
185
186 pub(super) fn runtime_session_services_for_turn(
187 &self,
188 child_usage_event_relay: Option<ChildUsageEventRelay>,
189 ) -> Result<Arc<RuntimeSessionServices>, PluginActionInvokeError> {
190 Ok(Arc::new(RuntimeSessionServices::new(
191 self,
192 false,
193 child_usage_event_relay,
194 )?))
195 }
196
197 pub fn session_state_service(
198 &self,
199 ) -> Result<Arc<dyn crate::plugin::SessionStateService>, PluginActionInvokeError> {
200 self.runtime_session_services()
201 .map(|services| services.state_service())
202 }
203
204 pub fn session_lifecycle_service(
205 &self,
206 ) -> Result<Arc<dyn crate::plugin::SessionLifecycleService>, PluginActionInvokeError> {
207 self.runtime_session_services()
208 .map(|services| services.lifecycle_service())
209 }
210
211 pub fn session_graph_service(
212 &self,
213 ) -> Result<Arc<dyn crate::plugin::SessionGraphService>, PluginActionInvokeError> {
214 self.runtime_session_services()
215 .map(|services| services.graph_service())
216 }
217
218 pub fn process_service(
219 &self,
220 ) -> Result<Arc<dyn crate::ProcessService>, PluginActionInvokeError> {
221 self.runtime_session_services()
222 .map(|services| services.process_service())
223 }
224
225 pub fn process_cancel_ability(&self) -> Arc<dyn crate::ProcessCancelAbility> {
226 Arc::clone(&self.host.core.control.process_cancel_ability)
227 }
228
229 pub fn effect_host(&self) -> Arc<dyn crate::EffectHost> {
230 Arc::clone(&self.host.core.control.effect_host)
231 }
232
233 pub async fn enqueue_turn_input(
234 &self,
235 input: crate::TurnInput,
236 delivery_policy: crate::DeliveryPolicy,
237 slot_policy: crate::SlotPolicy,
238 source_key: Option<String>,
239 ) -> Result<crate::QueuedWorkBatch, RuntimeError> {
240 let store = self
241 .session
242 .as_ref()
243 .and_then(|session| session.history_store())
244 .ok_or_else(queued_turn_input_store_required)?;
245 enqueue_turn_input_to_store(
246 self.state.session_id.clone(),
247 store,
248 self.host.queued_work_poke.clone(),
249 input,
250 delivery_policy,
251 slot_policy,
252 source_key,
253 )
254 .await
255 }
256
257 pub async fn cancel_queued_work_batch(
258 &self,
259 session_id: &str,
260 batch_id: &str,
261 ) -> Result<Option<crate::QueuedWorkBatch>, RuntimeError> {
262 let store = self
263 .session
264 .as_ref()
265 .and_then(|session| session.history_store())
266 .ok_or_else(queued_turn_input_store_required)?;
267 store
268 .cancel_queued_work_batch(session_id, batch_id)
269 .await
270 .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))
271 }
272
273 pub fn plugin_session(&self) -> Option<Arc<crate::PluginSession>> {
275 self.session.as_ref().map(|s| Arc::clone(s.plugins()))
276 }
277
278 pub async fn rewrite_history(
283 &mut self,
284 trigger: crate::RewriteTrigger,
285 ) -> Result<bool, PluginActionInvokeError> {
286 let services = self.runtime_session_services()?;
287 let Some(plugin_session) = self.session.as_ref().map(|s| Arc::clone(s.plugins())) else {
288 return Err(PluginActionInvokeError::Unknown(
289 "runtime session not available".to_string(),
290 ));
291 };
292 let ctx = crate::RewriteContext {
293 session_id: self.state.session_id.clone(),
294 trigger: trigger.clone(),
295 state: self.read_view(),
296 sessions: services.state_service(),
297 session_lifecycle: services.lifecycle_service(),
298 session_graph: services.graph_service(),
299 scoped_effect_controller: self
300 .host
301 .core
302 .control
303 .effect_host
304 .scoped(crate::EffectScope::runtime_operation(
305 rewrite_history_scope_id(
306 &self.state.session_id,
307 self.state.turn_index,
308 &trigger,
309 ),
310 ))
311 .map_err(|err| PluginActionInvokeError::Unknown(err.to_string()))?,
312 };
313 let input = crate::HistoryState::from_snapshot(&self.state.to_snapshot());
314 let baseline_messages = input.messages.len();
315 let outcome = plugin_session
316 .rewrite_history(&ctx, input)
317 .await
318 .map_err(|err| {
319 PluginActionInvokeError::Unknown(format!("rewrite_history failed: {err}"))
320 })?;
321 let mutated =
322 outcome.metadata.produced_summary || outcome.messages.len() != baseline_messages;
323 if mutated {
324 self.state
325 .replace_active_read_state(&outcome.messages, &outcome.tool_calls);
326 if let Some(session) = self.session.as_ref() {
327 self.state.tool_state_snapshot = Some(session.tool_registry().export_state());
328 let captured = session.plugins().snapshot();
329 crate::runtime::state::store_plugin_snapshot(
330 &mut self.state.plugin_snapshot,
331 captured,
332 );
333 self.state.plugin_snapshot_revision =
334 Some(session.plugins().snapshot_revision_fingerprint());
335 }
336 }
337 Ok(mutated)
338 }
339
340 pub(super) fn session_policy(&self) -> SessionPolicy {
341 self.policy.clone()
342 }
343
344 pub(super) async fn notify_session_config_changed(&self, previous: SessionPolicy) {
345 let Some(session) = self.session.as_ref() else {
346 return;
347 };
348 let current = self.session_policy();
349 if current == previous {
350 return;
351 }
352 let Ok(services) = self.runtime_session_services() else {
353 return;
354 };
355 session
356 .plugins()
357 .emit_runtime_event(crate::PluginLifecycleEvent::SessionConfigChanged(Box::new(
358 SessionConfigChangedContext {
359 session_id: self.state.session_id.clone(),
360 previous,
361 current,
362 sessions: services.state_service(),
363 },
364 )))
365 .await;
366 }
367
368 pub(super) async fn apply_session_config_mutations(&mut self, previous: SessionPolicy) {
369 let Some(session) = self.session.as_ref() else {
370 return;
371 };
372 let current = self.session_policy();
373 if current == previous {
374 return;
375 }
376 let Ok(services) = self.runtime_session_services() else {
377 return;
378 };
379 self.policy = session
380 .plugins()
381 .mutate_session_config(
382 SessionConfigChangedContext {
383 session_id: self.state.session_id.clone(),
384 previous,
385 current,
386 sessions: services.state_service(),
387 },
388 self.policy.clone(),
389 )
390 .await;
391 self.state.policy = self.policy.clone();
392 }
393}
394
395pub(in crate::runtime) async fn enqueue_turn_input_to_store(
396 session_id: String,
397 store: Arc<dyn crate::RuntimePersistence>,
398 queued_work_poke: Option<crate::QueuedWorkPoke>,
399 input: crate::TurnInput,
400 delivery_policy: crate::DeliveryPolicy,
401 slot_policy: crate::SlotPolicy,
402 source_key: Option<String>,
403) -> Result<crate::QueuedWorkBatch, RuntimeError> {
404 super::turn_loop::ensure_durable_effect_input(&input)?;
405 let mut draft = crate::QueuedWorkBatchDraft::new(
406 session_id,
407 delivery_policy,
408 slot_policy,
409 vec![crate::QueuedWorkPayload::turn_input(input)],
410 );
411 draft.source_key = source_key;
412 let enqueued = store
413 .enqueue_queued_work(draft)
414 .await
415 .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))?;
416 if let Some(poke) = queued_work_poke.as_ref() {
417 poke.poke_session(enqueued.session_id.clone(), "queued_turn_input");
418 }
419 Ok(enqueued)
420}
421
422impl LashRuntime {
423 pub async fn submit_session_command(
424 &mut self,
425 command: crate::SessionCommand,
426 idempotency_key: impl Into<String>,
427 ) -> Result<crate::SessionCommandReceipt, RuntimeError> {
428 let idempotency_key = idempotency_key.into();
429 if idempotency_key.trim().is_empty() {
430 return Err(RuntimeError::new(
431 RuntimeErrorCode::Other("session_command_idempotency_key".to_string()),
432 "session command idempotency key cannot be empty",
433 ));
434 }
435 let source_key = command.source_key(&idempotency_key);
436 let session_id = self.state.session_id.clone();
437 let Some(store) = self
438 .session
439 .as_ref()
440 .and_then(|session| session.history_store())
441 else {
442 let batch_id = format!("inline-command:{}", uuid::Uuid::new_v4());
443 self.apply_session_command(command, None).await?;
444 return Ok(crate::SessionCommandReceipt {
445 session_id,
446 batch_id,
447 source_key,
448 });
449 };
450 let draft = crate::QueuedWorkBatchDraft::new(
451 session_id.clone(),
452 crate::DeliveryPolicy::AfterCurrentTurnCommit,
453 crate::SlotPolicy::Exclusive,
454 vec![crate::QueuedWorkPayload::session_command(command)],
455 )
456 .with_source_key(source_key.clone());
457 let enqueued = store.enqueue_queued_work(draft).await.map_err(|err| {
458 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
459 })?;
460 if let Some(poke) = self.host.queued_work_poke.as_ref() {
461 poke.poke_session(session_id.clone(), "session_command");
462 }
463 Ok(crate::SessionCommandReceipt {
464 session_id,
465 batch_id: enqueued.batch_id,
466 source_key,
467 })
468 }
469
470 pub async fn drain_next_session_command(
471 &mut self,
472 ) -> Result<Option<crate::SessionCommandReceipt>, RuntimeError> {
473 let Some(store) = self
474 .session
475 .as_ref()
476 .and_then(|session| session.history_store())
477 else {
478 return Ok(None);
479 };
480 let claim = store
481 .claim_ready_queued_work(
482 &self.state.session_id,
483 &self.runtime_scope_id,
484 crate::QueuedWorkClaimBoundary::Idle,
485 crate::QUEUED_WORK_CLAIM_TTL_MS,
486 1,
487 )
488 .await
489 .map_err(|err| {
490 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
491 })?;
492 let Some(claim) = claim else {
493 return Ok(None);
494 };
495 let Some((batch, command)) = claim.exclusive_session_command() else {
496 store
497 .abandon_queued_work_claim(&claim)
498 .await
499 .map_err(|err| {
500 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
501 })?;
502 return Ok(None);
503 };
504 let batch_id = batch.batch_id.clone();
505 let source_key = batch.source_key.clone().unwrap_or_else(|| batch_id.clone());
506 let command = command.clone();
507 self.apply_session_command(command, Some(claim.completion()))
508 .await?;
509 Ok(Some(crate::SessionCommandReceipt {
510 session_id: self.state.session_id.clone(),
511 batch_id,
512 source_key,
513 }))
514 }
515
516 async fn apply_session_command(
517 &mut self,
518 command: crate::SessionCommand,
519 completion: Option<crate::QueuedWorkCompletion>,
520 ) -> Result<(), RuntimeError> {
521 self.refresh_session_graph_from_store()
522 .await
523 .map_err(|err| RuntimeError::new("session_command_refresh", err.to_string()))?;
524 let graph = match command {
525 crate::SessionCommand::RefreshToolSurface {
526 expected_generation,
527 ..
528 } => {
529 if let Some(expected) = expected_generation {
530 let actual = self
531 .tool_state()
532 .map_err(|err| {
533 RuntimeError::new("session_command_tool_state", err.to_string())
534 })?
535 .generation();
536 if actual != expected {
537 return Err(RuntimeError::new(
538 "session_command_generation_mismatch",
539 format!(
540 "expected tool generation {expected}, but live generation is {actual}"
541 ),
542 ));
543 }
544 }
545 self.refresh_session_tool_surface().await.map_err(|err| {
546 RuntimeError::new("session_command_refresh_tools", err.to_string())
547 })?;
548 crate::store::GraphCommitDelta::Unchanged {
549 leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
550 }
551 }
552 crate::SessionCommand::EmitHostEvent {
553 resource_type,
554 alias,
555 event,
556 payload,
557 } => {
558 let effect_host = Arc::clone(&self.host.core.control.effect_host);
559 let drain_id = completion
560 .as_ref()
561 .map(|completion| completion.claim_id.clone())
562 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
563 let scoped = effect_host
564 .scoped(crate::EffectScope::queue_drain(
565 &self.state.session_id,
566 drain_id,
567 ))
568 .map_err(|err| {
569 RuntimeError::new("session_command_effect_scope", err.to_string())
570 })?;
571 let (_report, graph) = self
572 .emit_host_event_without_persist(
573 &resource_type,
574 &alias,
575 &event,
576 payload,
577 scoped,
578 )
579 .await
580 .map_err(|err| {
581 RuntimeError::new("session_command_host_event", err.to_string())
582 })?;
583 graph
584 }
585 crate::SessionCommand::ResetSession { .. } => {
586 let mut state = crate::RuntimeSessionState {
587 session_id: self.state.session_id.clone(),
588 policy: self.policy.clone(),
589 graph_replace_required: true,
590 ..crate::RuntimeSessionState::default()
591 };
592 state.ensure_agent_frame_initialized();
593 self.set_persisted_state(state)
594 .map_err(|err| RuntimeError::new("session_command_reset", err.to_string()))?;
595 crate::store::GraphCommitDelta::ReplaceFull(self.state.session_graph.clone())
596 }
597 };
598 let Some(store) = self
599 .session
600 .as_ref()
601 .and_then(|session| session.history_store())
602 else {
603 return Ok(());
604 };
605 let mut commit =
606 crate::store::RuntimeCommit::persisted_state_with_graph_commit(&self.state, graph, &[]);
607 if let Some(completion) = completion {
608 commit = commit.completing_queue_claim(completion);
609 }
610 let result = store.commit_runtime_state(commit).await.map_err(|err| {
611 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
612 })?;
613 self.state.apply_persisted_commit_result(result);
614 Ok(())
615 }
616}
617
618pub(in crate::runtime) fn queued_turn_input_store_required() -> RuntimeError {
619 RuntimeError::new(
620 RuntimeErrorCode::StoreCommitFailed,
621 "queued turn input requires a persistent runtime store",
622 )
623}