1use super::*;
2
3fn rewrite_history_scope_id(
4 session_id: &str,
5 turn_index: usize,
6 trigger: &crate::RewriteTrigger,
7) -> String {
8 let trigger_id = match trigger {
9 crate::RewriteTrigger::Manual { .. } => "manual",
10 crate::RewriteTrigger::WindowShrink { .. } => "window-shrink",
11 crate::RewriteTrigger::Periodic => "periodic",
12 };
13 format!("{session_id}:rewrite-history:{trigger_id}:{turn_index}")
14}
15
16impl LashRuntime {
17 pub fn session_id(&self) -> &str {
18 &self.state.session_id
19 }
20
21 pub(super) fn stamp_live_plugin_state(&mut self) {
22 if let Some(session) = self.session.as_ref() {
23 let snapshot = session.plugins().tool_registry().export_state();
24 self.state.tool_state_generation = Some(snapshot.generation());
25 self.state.tool_state_snapshot = Some(snapshot);
26 let captured = session.plugins().snapshot();
27 crate::runtime::state::store_plugin_snapshot(&mut self.state.plugin_snapshot, captured);
28 self.state.plugin_snapshot_revision =
29 Some(session.plugins().snapshot_revision_fingerprint());
30 } else {
31 self.state.tool_state_generation = None;
32 self.state.tool_state_snapshot = None;
33 self.state.plugin_snapshot = None;
34 self.state.plugin_snapshot_revision = None;
35 }
36 }
37 pub(super) fn active_tool_catalog_shared(
38 &self,
39 ) -> Result<Arc<Vec<serde_json::Value>>, crate::PluginError> {
40 self.session
41 .as_ref()
42 .map(|session| session.shared_tool_catalog(&self.state.session_id))
43 .unwrap_or_else(|| Ok(Arc::new(Vec::new())))
44 }
45
46 pub fn tool_state(&self) -> Result<crate::ToolState, SessionError> {
47 let Some(session) = self.session.as_ref() else {
48 return Err(SessionError::Protocol(
49 "runtime session not available".to_string(),
50 ));
51 };
52 Ok(session.plugins().tool_registry().export_state())
53 }
54 pub fn set_protocol_turn_options(&mut self, options: crate::ProtocolTurnOptions) {
56 self.state.protocol_turn_options = options.clone();
57 if let Some(frame) = self.state.current_agent_frame_mut() {
58 frame.protocol_turn_options = options.clone();
59 }
60 self.protocol_turn_options = options;
61 }
62
63 pub fn export_state(&self) -> crate::SessionSnapshot {
67 self.state.to_snapshot()
68 }
69
70 pub fn read_view(&self) -> crate::SessionReadView {
71 crate::SessionReadView::from_runtime_state(
72 &self.state,
73 self.state.effective_policy().clone(),
74 self.state.effective_protocol_turn_options().clone(),
75 )
76 }
77
78 pub fn export_persistence_state(&self) -> RuntimeSessionState {
80 self.state.clone()
81 }
82
83 pub fn apply_persistence_state(
84 &mut self,
85 state: RuntimeSessionState,
86 ) -> Result<(), SessionError> {
87 self.set_persisted_state(state)
88 }
89
90 pub(crate) fn export_graph_first_state(&self) -> RuntimeSessionState {
91 self.state.clone()
92 }
93
94 pub fn export_persisted_state(&self) -> RuntimeSessionState {
97 let mut state = self.state.clone();
98 state.protocol_turn_options = self.protocol_turn_options.clone();
99 if let Some(frame) = state.current_agent_frame_mut() {
100 frame.protocol_turn_options = self.protocol_turn_options.clone();
101 }
102 if let Some(session) = self.session.as_ref() {
103 let snapshot = session.plugins().tool_registry().export_state();
104 state.tool_state_generation = Some(snapshot.generation());
105 state.tool_state_snapshot = Some(snapshot);
106 let captured = session.plugins().snapshot();
107 crate::runtime::state::store_plugin_snapshot(&mut state.plugin_snapshot, captured);
108 state.plugin_snapshot_revision =
109 Some(session.plugins().snapshot_revision_fingerprint());
110 }
111 normalize_session_graph(&mut state);
112 state
113 }
114
115 pub fn usage_report(&self) -> SessionUsageReport {
116 let mut entries = self.state.token_ledger.clone();
117 let drained = self.shared_token_ledger.lock().expect("token ledger lock");
118 for entry in drained.iter().cloned() {
119 merge_ledger_entry(&mut entries, entry);
120 }
121 SessionUsageReport::from_entries(&entries)
122 }
123
124 pub async fn await_background_work(&mut self) -> Result<(), SessionError> {
125 if self.process_sync_needed.swap(false, Ordering::AcqRel) {
126 self.refresh_session_graph_from_store().await?;
127 }
128 Ok(())
129 }
130
131 pub(super) async fn refresh_session_graph_from_store(&mut self) -> Result<(), SessionError> {
132 if self.state.graph_replace_required && self.state.head_revision.is_none() {
136 return Ok(());
137 }
138 let Some(store) = self
139 .session
140 .as_ref()
141 .and_then(|session| session.history_store())
142 else {
143 return Ok(());
144 };
145 let scope = match self.residency {
146 crate::Residency::KeepAll => crate::store::SessionReadScope::FullGraph,
147 crate::Residency::ActivePathOnly => crate::store::SessionReadScope::ActivePath {
148 leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
149 },
150 };
151 let Some(read) = store.load_session(scope).await.map_err(|err| {
152 SessionError::Protocol(format!("failed to refresh session graph from store: {err}"))
153 })?
154 else {
155 return Ok(());
156 };
157 let has_newer_graph = self.state.head_revision != Some(read.head_revision)
158 || read.graph.leaf_node_id != self.state.session_graph.leaf_node_id
159 || read.checkpoint_ref != self.state.checkpoint_ref;
160 if !has_newer_graph {
161 return Ok(());
162 }
163 let head = crate::store::SessionHead {
164 session_id: read.session_id.clone(),
165 head_revision: read.head_revision,
166 agent_frames: read.agent_frames.clone(),
167 current_agent_frame_id: read.current_agent_frame_id.clone(),
168 graph: read.graph,
169 config: read.config.clone(),
170 checkpoint_ref: read.checkpoint_ref.clone(),
171 token_ledger: merge_usage_delta_entries(read.token_ledger),
172 };
173 apply_session_head(&mut self.state, &head);
174 apply_session_checkpoint(&mut self.state, read.checkpoint);
175 self.policy = self.state.effective_policy().clone();
176 self.protocol_turn_options = self.state.effective_protocol_turn_options().clone();
177 Ok(())
178 }
179
180 pub(super) fn runtime_session_services(
181 &self,
182 ) -> Result<Arc<RuntimeSessionServices>, PluginActionInvokeError> {
183 Ok(Arc::new(RuntimeSessionServices::new(self, true, None)?))
184 }
185
186 pub(super) fn runtime_session_services_for_turn(
187 &self,
188 child_usage_event_relay: Option<ChildUsageEventRelay>,
189 ) -> Result<Arc<RuntimeSessionServices>, PluginActionInvokeError> {
190 Ok(Arc::new(RuntimeSessionServices::new(
191 self,
192 false,
193 child_usage_event_relay,
194 )?))
195 }
196
197 pub fn session_state_service(
198 &self,
199 ) -> Result<Arc<dyn crate::plugin::SessionStateService>, PluginActionInvokeError> {
200 self.runtime_session_services()
201 .map(|services| services.state_service())
202 }
203
204 pub fn session_lifecycle_service(
205 &self,
206 ) -> Result<Arc<dyn crate::plugin::SessionLifecycleService>, PluginActionInvokeError> {
207 self.runtime_session_services()
208 .map(|services| services.lifecycle_service())
209 }
210
211 pub fn session_graph_service(
212 &self,
213 ) -> Result<Arc<dyn crate::plugin::SessionGraphService>, PluginActionInvokeError> {
214 self.runtime_session_services()
215 .map(|services| services.graph_service())
216 }
217
218 pub fn process_service(
219 &self,
220 ) -> Result<Arc<dyn crate::ProcessService>, PluginActionInvokeError> {
221 self.runtime_session_services()
222 .map(|services| services.process_service())
223 }
224
225 pub fn process_cancel_ability(&self) -> Arc<dyn crate::ProcessCancelAbility> {
226 Arc::clone(&self.host.core.control.process_cancel_ability)
227 }
228
229 pub fn effect_host(&self) -> Arc<dyn crate::EffectHost> {
230 Arc::clone(&self.host.core.control.effect_host)
231 }
232
233 pub async fn enqueue_turn_input(
234 &self,
235 input: crate::TurnInput,
236 delivery_policy: crate::DeliveryPolicy,
237 slot_policy: crate::SlotPolicy,
238 source_key: Option<String>,
239 ) -> Result<crate::QueuedWorkBatch, RuntimeError> {
240 let store = self
241 .session
242 .as_ref()
243 .and_then(|session| session.history_store())
244 .ok_or_else(queued_turn_input_store_required)?;
245 enqueue_turn_input_to_store(
246 self.state.session_id.clone(),
247 store,
248 self.host.queued_work_poke.clone(),
249 input,
250 delivery_policy,
251 slot_policy,
252 source_key,
253 )
254 .await
255 }
256
257 pub async fn cancel_queued_work_batch(
258 &self,
259 session_id: &str,
260 batch_id: &str,
261 ) -> Result<Option<crate::QueuedWorkBatch>, RuntimeError> {
262 let store = self
263 .session
264 .as_ref()
265 .and_then(|session| session.history_store())
266 .ok_or_else(queued_turn_input_store_required)?;
267 store
268 .cancel_queued_work_batch(session_id, batch_id)
269 .await
270 .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))
271 }
272
273 pub fn plugin_session(&self) -> Option<Arc<crate::PluginSession>> {
275 self.session.as_ref().map(|s| Arc::clone(s.plugins()))
276 }
277
278 pub async fn rewrite_history(
283 &mut self,
284 trigger: crate::RewriteTrigger,
285 ) -> Result<bool, PluginActionInvokeError> {
286 let services = self.runtime_session_services()?;
287 let Some(plugin_session) = self.session.as_ref().map(|s| Arc::clone(s.plugins())) else {
288 return Err(PluginActionInvokeError::Unknown(
289 "runtime session not available".to_string(),
290 ));
291 };
292 let ctx = crate::RewriteContext {
293 session_id: self.state.session_id.clone(),
294 trigger: trigger.clone(),
295 state: self.read_view(),
296 sessions: services.state_service(),
297 session_lifecycle: services.lifecycle_service(),
298 session_graph: services.graph_service(),
299 scoped_effect_controller: self
300 .host
301 .core
302 .control
303 .effect_host
304 .scoped(crate::EffectScope::runtime_operation(
305 rewrite_history_scope_id(
306 &self.state.session_id,
307 self.state.turn_index,
308 &trigger,
309 ),
310 ))
311 .map_err(|err| PluginActionInvokeError::Unknown(err.to_string()))?,
312 };
313 let input = crate::HistoryState::from_snapshot(&self.state.to_snapshot());
314 let baseline_messages = input.messages.len();
315 let outcome = plugin_session
316 .rewrite_history(&ctx, input)
317 .await
318 .map_err(|err| {
319 PluginActionInvokeError::Unknown(format!("rewrite_history failed: {err}"))
320 })?;
321 let mutated =
322 outcome.metadata.produced_summary || outcome.messages.len() != baseline_messages;
323 if mutated {
324 self.state.replace_active_read_state(&outcome.messages);
325 if let Some(session) = self.session.as_ref() {
326 self.state.tool_state_snapshot = Some(session.tool_registry().export_state());
327 let captured = session.plugins().snapshot();
328 crate::runtime::state::store_plugin_snapshot(
329 &mut self.state.plugin_snapshot,
330 captured,
331 );
332 self.state.plugin_snapshot_revision =
333 Some(session.plugins().snapshot_revision_fingerprint());
334 }
335 }
336 Ok(mutated)
337 }
338
339 pub(super) fn session_policy(&self) -> SessionPolicy {
340 self.policy.clone()
341 }
342
343 pub(super) async fn notify_session_config_changed(&self, previous: SessionPolicy) {
344 let Some(session) = self.session.as_ref() else {
345 return;
346 };
347 let current = self.session_policy();
348 if current == previous {
349 return;
350 }
351 let Ok(services) = self.runtime_session_services() else {
352 return;
353 };
354 session
355 .plugins()
356 .emit_runtime_event(crate::PluginLifecycleEvent::SessionConfigChanged(Box::new(
357 SessionConfigChangedContext {
358 session_id: self.state.session_id.clone(),
359 previous,
360 current,
361 sessions: services.state_service(),
362 },
363 )))
364 .await;
365 }
366
367 pub(super) async fn apply_session_config_mutations(&mut self, previous: SessionPolicy) {
368 let Some(session) = self.session.as_ref() else {
369 return;
370 };
371 let current = self.session_policy();
372 if current == previous {
373 return;
374 }
375 let Ok(services) = self.runtime_session_services() else {
376 return;
377 };
378 self.policy = session
379 .plugins()
380 .mutate_session_config(
381 SessionConfigChangedContext {
382 session_id: self.state.session_id.clone(),
383 previous,
384 current,
385 sessions: services.state_service(),
386 },
387 self.policy.clone(),
388 )
389 .await;
390 self.state.policy = self.policy.clone();
391 }
392}
393
394pub(in crate::runtime) async fn enqueue_turn_input_to_store(
395 session_id: String,
396 store: Arc<dyn crate::RuntimePersistence>,
397 queued_work_poke: Option<crate::QueuedWorkPoke>,
398 input: crate::TurnInput,
399 delivery_policy: crate::DeliveryPolicy,
400 slot_policy: crate::SlotPolicy,
401 source_key: Option<String>,
402) -> Result<crate::QueuedWorkBatch, RuntimeError> {
403 super::turn_loop::ensure_durable_effect_input(&input)?;
404 let mut draft = crate::QueuedWorkBatchDraft::new(
405 session_id,
406 delivery_policy,
407 slot_policy,
408 vec![crate::QueuedWorkPayload::turn_input(input)],
409 );
410 draft.source_key = source_key;
411 let enqueued = store
412 .enqueue_queued_work(draft)
413 .await
414 .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))?;
415 if let Some(poke) = queued_work_poke.as_ref() {
416 poke.poke_session(enqueued.session_id.clone(), "queued_turn_input");
417 }
418 Ok(enqueued)
419}
420
421impl LashRuntime {
422 pub async fn submit_session_command(
423 &mut self,
424 command: crate::SessionCommand,
425 idempotency_key: impl Into<String>,
426 ) -> Result<crate::SessionCommandReceipt, RuntimeError> {
427 let idempotency_key = idempotency_key.into();
428 if idempotency_key.trim().is_empty() {
429 return Err(RuntimeError::new(
430 RuntimeErrorCode::Other("session_command_idempotency_key".to_string()),
431 "session command idempotency key cannot be empty",
432 ));
433 }
434 let source_key = command.source_key(&idempotency_key);
435 let session_id = self.state.session_id.clone();
436 let Some(store) = self
437 .session
438 .as_ref()
439 .and_then(|session| session.history_store())
440 else {
441 let batch_id = format!("inline-command:{}", uuid::Uuid::new_v4());
442 self.apply_session_command(command, None).await?;
443 return Ok(crate::SessionCommandReceipt {
444 session_id,
445 batch_id,
446 source_key,
447 });
448 };
449 let draft = crate::QueuedWorkBatchDraft::new(
450 session_id.clone(),
451 crate::DeliveryPolicy::AfterCurrentTurnCommit,
452 crate::SlotPolicy::Exclusive,
453 vec![crate::QueuedWorkPayload::session_command(command)],
454 )
455 .with_source_key(source_key.clone());
456 let enqueued = store.enqueue_queued_work(draft).await.map_err(|err| {
457 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
458 })?;
459 if let Some(poke) = self.host.queued_work_poke.as_ref() {
460 poke.poke_session(session_id.clone(), "session_command");
461 }
462 Ok(crate::SessionCommandReceipt {
463 session_id,
464 batch_id: enqueued.batch_id,
465 source_key,
466 })
467 }
468
469 pub async fn drain_next_session_command(
470 &mut self,
471 ) -> Result<Option<crate::SessionCommandReceipt>, RuntimeError> {
472 let Some(store) = self
473 .session
474 .as_ref()
475 .and_then(|session| session.history_store())
476 else {
477 return Ok(None);
478 };
479 let claim = store
480 .claim_ready_queued_work(
481 &self.state.session_id,
482 &self.runtime_scope_id,
483 crate::QueuedWorkClaimBoundary::Idle,
484 crate::QUEUED_WORK_CLAIM_TTL_MS,
485 1,
486 )
487 .await
488 .map_err(|err| {
489 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
490 })?;
491 let Some(claim) = claim else {
492 return Ok(None);
493 };
494 let Some((batch, command)) = claim.exclusive_session_command() else {
495 store
496 .abandon_queued_work_claim(&claim)
497 .await
498 .map_err(|err| {
499 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
500 })?;
501 return Ok(None);
502 };
503 let batch_id = batch.batch_id.clone();
504 let source_key = batch.source_key.clone().unwrap_or_else(|| batch_id.clone());
505 let command = command.clone();
506 self.apply_session_command(command, Some(claim.completion()))
507 .await?;
508 Ok(Some(crate::SessionCommandReceipt {
509 session_id: self.state.session_id.clone(),
510 batch_id,
511 source_key,
512 }))
513 }
514
515 async fn apply_session_command(
516 &mut self,
517 command: crate::SessionCommand,
518 completion: Option<crate::QueuedWorkCompletion>,
519 ) -> Result<(), RuntimeError> {
520 self.refresh_session_graph_from_store()
521 .await
522 .map_err(|err| RuntimeError::new("session_command_refresh", err.to_string()))?;
523 let graph = match command {
524 crate::SessionCommand::RefreshToolSurface {
525 expected_generation,
526 ..
527 } => {
528 if let Some(expected) = expected_generation {
529 let actual = self
530 .tool_state()
531 .map_err(|err| {
532 RuntimeError::new("session_command_tool_state", err.to_string())
533 })?
534 .generation();
535 if actual != expected {
536 return Err(RuntimeError::new(
537 "session_command_generation_mismatch",
538 format!(
539 "expected tool generation {expected}, but live generation is {actual}"
540 ),
541 ));
542 }
543 }
544 self.refresh_session_tool_surface().await.map_err(|err| {
545 RuntimeError::new("session_command_refresh_tools", err.to_string())
546 })?;
547 crate::store::GraphCommitDelta::Unchanged {
548 leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
549 }
550 }
551 crate::SessionCommand::ResetSession { .. } => {
552 let mut state = crate::RuntimeSessionState {
553 session_id: self.state.session_id.clone(),
554 policy: self.policy.clone(),
555 graph_replace_required: true,
556 ..crate::RuntimeSessionState::default()
557 };
558 state.ensure_agent_frame_initialized();
559 self.set_persisted_state(state)
560 .map_err(|err| RuntimeError::new("session_command_reset", err.to_string()))?;
561 crate::store::GraphCommitDelta::ReplaceFull(self.state.session_graph.clone())
562 }
563 };
564 let Some(store) = self
565 .session
566 .as_ref()
567 .and_then(|session| session.history_store())
568 else {
569 return Ok(());
570 };
571 let mut commit =
572 crate::store::RuntimeCommit::persisted_state_with_graph_commit(&self.state, graph, &[]);
573 if let Some(completion) = completion {
574 commit = commit.completing_queue_claim(completion);
575 }
576 let result = store.commit_runtime_state(commit).await.map_err(|err| {
577 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
578 })?;
579 self.state.apply_persisted_commit_result(result);
580 Ok(())
581 }
582}
583
584pub(in crate::runtime) fn queued_turn_input_store_required() -> RuntimeError {
585 RuntimeError::new(
586 RuntimeErrorCode::StoreCommitFailed,
587 "queued turn input requires a persistent runtime store",
588 )
589}