1use super::*;
2
3impl LashRuntime {
4 pub fn session_id(&self) -> &str {
5 &self.state.session_id
6 }
7
8 pub(super) fn stamp_live_plugin_state(&mut self) {
9 if let Some(session) = self.session.as_ref() {
10 let snapshot = session.plugins().tool_registry().export_state();
11 self.state.tool_state_generation = Some(snapshot.generation());
12 self.state.tool_state_snapshot = Some(snapshot);
13 let captured = session.plugins().snapshot();
14 crate::runtime::state::store_plugin_snapshot(&mut self.state.plugin_snapshot, captured);
15 self.state.plugin_snapshot_revision =
16 Some(session.plugins().snapshot_revision_fingerprint());
17 } else {
18 self.state.tool_state_generation = None;
19 self.state.tool_state_snapshot = None;
20 self.state.plugin_snapshot = None;
21 self.state.plugin_snapshot_revision = None;
22 }
23 }
24 pub(super) fn active_tool_catalog_shared(
25 &self,
26 ) -> Result<Arc<Vec<serde_json::Value>>, crate::PluginError> {
27 self.session
28 .as_ref()
29 .map(|session| session.shared_tool_catalog(&self.state.session_id))
30 .unwrap_or_else(|| Ok(Arc::new(Vec::new())))
31 }
32
33 pub fn tool_state(&self) -> Result<crate::ToolState, SessionError> {
34 let Some(session) = self.session.as_ref() else {
35 return Err(SessionError::Protocol(
36 "runtime session not available".to_string(),
37 ));
38 };
39 Ok(session.plugins().tool_registry().export_state())
40 }
41 pub fn set_protocol_turn_options(&mut self, options: crate::ProtocolTurnOptions) {
43 self.state.protocol_turn_options = options.clone();
44 if let Some(frame) = self.state.current_agent_frame_mut() {
45 frame.protocol_turn_options = options.clone();
46 }
47 self.protocol_turn_options = options;
48 }
49
50 pub fn export_state(&self) -> crate::SessionSnapshot {
54 self.state.to_snapshot()
55 }
56
57 pub fn read_view(&self) -> crate::SessionReadView {
58 crate::SessionReadView::from_runtime_state(
59 &self.state,
60 self.state.effective_policy().clone(),
61 self.state.effective_protocol_turn_options().clone(),
62 )
63 }
64
65 pub fn export_persistence_state(&self) -> RuntimeSessionState {
67 self.state.clone()
68 }
69
70 pub fn apply_persistence_state(
71 &mut self,
72 state: RuntimeSessionState,
73 ) -> Result<(), SessionError> {
74 self.set_persisted_state(state)
75 }
76
77 pub(crate) fn export_graph_first_state(&self) -> RuntimeSessionState {
78 self.state.clone()
79 }
80
81 pub fn export_persisted_state(&self) -> RuntimeSessionState {
84 let mut state = self.state.clone();
85 state.protocol_turn_options = self.protocol_turn_options.clone();
86 if let Some(frame) = state.current_agent_frame_mut() {
87 frame.protocol_turn_options = self.protocol_turn_options.clone();
88 }
89 if let Some(session) = self.session.as_ref() {
90 let snapshot = session.plugins().tool_registry().export_state();
91 state.tool_state_generation = Some(snapshot.generation());
92 state.tool_state_snapshot = Some(snapshot);
93 let captured = session.plugins().snapshot();
94 crate::runtime::state::store_plugin_snapshot(&mut state.plugin_snapshot, captured);
95 state.plugin_snapshot_revision =
96 Some(session.plugins().snapshot_revision_fingerprint());
97 }
98 normalize_session_graph(&mut state);
99 state
100 }
101
102 pub fn usage_report(&self) -> SessionUsageReport {
103 let mut entries = self.state.token_ledger.clone();
104 let drained = self.shared_token_ledger.lock().expect("token ledger lock");
105 for entry in drained.iter().cloned() {
106 merge_ledger_entry(&mut entries, entry);
107 }
108 SessionUsageReport::from_entries(&entries)
109 }
110
111 pub async fn await_background_work(&mut self) -> Result<(), SessionError> {
112 if self.process_sync_needed.swap(false, Ordering::AcqRel) {
113 self.refresh_session_graph_from_store().await?;
114 }
115 Ok(())
116 }
117
118 pub(super) async fn refresh_session_graph_from_store(&mut self) -> Result<(), SessionError> {
119 if self.state.graph_replace_required && self.state.head_revision.is_none() {
123 return Ok(());
124 }
125 let Some(store) = self
126 .session
127 .as_ref()
128 .and_then(|session| session.history_store())
129 else {
130 return Ok(());
131 };
132 let scope = match self.residency {
133 crate::Residency::KeepAll => crate::store::SessionReadScope::FullGraph,
134 crate::Residency::ActivePathOnly => crate::store::SessionReadScope::ActivePath {
135 leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
136 },
137 };
138 let Some(read) = store.load_session(scope).await.map_err(|err| {
139 SessionError::Protocol(format!("failed to refresh session graph from store: {err}"))
140 })?
141 else {
142 return Ok(());
143 };
144 let has_newer_graph = self.state.head_revision != Some(read.head_revision)
145 || read.graph.leaf_node_id != self.state.session_graph.leaf_node_id
146 || read.checkpoint_ref != self.state.checkpoint_ref;
147 if !has_newer_graph {
148 return Ok(());
149 }
150 let head = crate::store::SessionHead {
151 session_id: read.session_id.clone(),
152 head_revision: read.head_revision,
153 agent_frames: read.agent_frames.clone(),
154 current_agent_frame_id: read.current_agent_frame_id.clone(),
155 graph: read.graph,
156 config: read.config.clone(),
157 checkpoint_ref: read.checkpoint_ref.clone(),
158 token_ledger: merge_usage_delta_entries(read.token_ledger),
159 };
160 apply_session_head(&mut self.state, &head);
161 apply_session_checkpoint(&mut self.state, read.checkpoint);
162 self.policy = self.state.effective_policy().clone();
163 self.protocol_turn_options = self.state.effective_protocol_turn_options().clone();
164 Ok(())
165 }
166
167 pub(super) fn runtime_session_services(
168 &self,
169 ) -> Result<Arc<RuntimeSessionServices>, PluginActionInvokeError> {
170 Ok(Arc::new(RuntimeSessionServices::new(self, true, None)?))
171 }
172
173 pub(super) fn runtime_session_services_for_turn(
174 &self,
175 child_usage_event_relay: Option<ChildUsageEventRelay>,
176 ) -> Result<Arc<RuntimeSessionServices>, PluginActionInvokeError> {
177 Ok(Arc::new(RuntimeSessionServices::new(
178 self,
179 false,
180 child_usage_event_relay,
181 )?))
182 }
183
184 pub fn session_state_service(
185 &self,
186 ) -> Result<Arc<dyn crate::plugin::SessionStateService>, PluginActionInvokeError> {
187 self.runtime_session_services()
188 .map(|services| services.state_service())
189 }
190
191 pub fn session_lifecycle_service(
192 &self,
193 ) -> Result<Arc<dyn crate::plugin::SessionLifecycleService>, PluginActionInvokeError> {
194 self.runtime_session_services()
195 .map(|services| services.lifecycle_service())
196 }
197
198 pub fn session_graph_service(
199 &self,
200 ) -> Result<Arc<dyn crate::plugin::SessionGraphService>, PluginActionInvokeError> {
201 self.runtime_session_services()
202 .map(|services| services.graph_service())
203 }
204
205 pub fn process_service(
206 &self,
207 ) -> Result<Arc<dyn crate::ProcessService>, PluginActionInvokeError> {
208 self.runtime_session_services()
209 .map(|services| services.process_service())
210 }
211
212 pub fn process_cancel_ability(&self) -> Arc<dyn crate::ProcessCancelAbility> {
213 Arc::clone(&self.host.core.control.process_cancel_ability)
214 }
215
216 pub fn effect_host(&self) -> Arc<dyn crate::EffectHost> {
217 Arc::clone(&self.host.core.control.effect_host)
218 }
219
220 pub async fn enqueue_turn_input(
221 &self,
222 input: crate::TurnInput,
223 delivery_policy: crate::DeliveryPolicy,
224 slot_policy: crate::SlotPolicy,
225 source_key: Option<String>,
226 ) -> Result<crate::QueuedWorkBatch, RuntimeError> {
227 let store = self
228 .session
229 .as_ref()
230 .and_then(|session| session.history_store())
231 .ok_or_else(queued_turn_input_store_required)?;
232 enqueue_turn_input_to_store(
233 self.state.session_id.clone(),
234 store,
235 self.host.queued_work_poke.clone(),
236 input,
237 delivery_policy,
238 slot_policy,
239 source_key,
240 )
241 .await
242 }
243
244 pub async fn cancel_queued_work_batch(
245 &self,
246 session_id: &str,
247 batch_id: &str,
248 ) -> Result<Option<crate::QueuedWorkBatch>, RuntimeError> {
249 let store = self
250 .session
251 .as_ref()
252 .and_then(|session| session.history_store())
253 .ok_or_else(queued_turn_input_store_required)?;
254 store
255 .cancel_queued_work_batch(session_id, batch_id)
256 .await
257 .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))
258 }
259
260 pub fn plugin_session(&self) -> Option<Arc<crate::PluginSession>> {
262 self.session.as_ref().map(|s| Arc::clone(s.plugins()))
263 }
264
265 pub fn open_agent_frame(
266 &mut self,
267 request: crate::OpenAgentFrameRequest,
268 ) -> crate::OpenAgentFrameResult {
269 open_agent_frame_in_state(&mut self.state, request)
270 }
271
272 pub async fn compact_context(
275 &mut self,
276 instructions: Option<String>,
277 scoped_effect_controller: crate::ScopedEffectController<'_>,
278 ) -> Result<bool, PluginActionInvokeError> {
279 let services = self.runtime_session_services()?;
280 let Some(plugin_session) = self.session.as_ref().map(|s| Arc::clone(s.plugins())) else {
281 return Err(PluginActionInvokeError::Unknown(
282 "runtime session not available".to_string(),
283 ));
284 };
285 let ctx = crate::CompactionContext {
286 session_id: self.state.session_id.clone(),
287 state: self.read_view(),
288 instructions,
289 sessions: services.state_service(),
290 session_lifecycle: services.lifecycle_service(),
291 session_graph: services.graph_service(),
292 scoped_effect_controller,
293 };
294 let Some(compaction) = plugin_session.compact_context(&ctx).await.map_err(|err| {
295 PluginActionInvokeError::Unknown(format!("context compaction failed: {err}"))
296 })?
297 else {
298 return Ok(false);
299 };
300 let frame_id = format!(
301 "{}:frame:compaction:{}",
302 self.state.session_id,
303 uuid::Uuid::new_v4()
304 );
305 let result = self.open_agent_frame(
306 crate::OpenAgentFrameRequest::new(frame_id, crate::AgentFrameReason::compaction())
307 .with_initial_nodes(compaction.initial_nodes),
308 );
309 if result.opened {
310 self.stamp_live_plugin_state();
311 }
312 Ok(result.opened)
313 }
314
315 pub(super) fn session_policy(&self) -> SessionPolicy {
316 self.policy.clone()
317 }
318
319 pub(super) async fn notify_session_config_changed(&self, previous: SessionPolicy) {
320 let Some(session) = self.session.as_ref() else {
321 return;
322 };
323 let current = self.session_policy();
324 if current == previous {
325 return;
326 }
327 let Ok(services) = self.runtime_session_services() else {
328 return;
329 };
330 session
331 .plugins()
332 .emit_runtime_event(crate::PluginLifecycleEvent::SessionConfigChanged(Box::new(
333 SessionConfigChangedContext {
334 session_id: self.state.session_id.clone(),
335 previous,
336 current,
337 sessions: services.state_service(),
338 },
339 )))
340 .await;
341 }
342
343 pub(super) async fn apply_session_config_mutations(&mut self, previous: SessionPolicy) {
344 let Some(session) = self.session.as_ref() else {
345 return;
346 };
347 let current = self.session_policy();
348 if current == previous {
349 return;
350 }
351 let Ok(services) = self.runtime_session_services() else {
352 return;
353 };
354 self.policy = session
355 .plugins()
356 .mutate_session_config(
357 SessionConfigChangedContext {
358 session_id: self.state.session_id.clone(),
359 previous,
360 current,
361 sessions: services.state_service(),
362 },
363 self.policy.clone(),
364 )
365 .await;
366 self.state.policy = self.policy.clone();
367 }
368}
369
370pub(in crate::runtime) async fn enqueue_turn_input_to_store(
371 session_id: String,
372 store: Arc<dyn crate::RuntimePersistence>,
373 queued_work_poke: Option<crate::QueuedWorkPoke>,
374 input: crate::TurnInput,
375 delivery_policy: crate::DeliveryPolicy,
376 slot_policy: crate::SlotPolicy,
377 source_key: Option<String>,
378) -> Result<crate::QueuedWorkBatch, RuntimeError> {
379 super::turn_loop::ensure_durable_effect_input(&input)?;
380 let mut draft = crate::QueuedWorkBatchDraft::new(
381 session_id,
382 delivery_policy,
383 slot_policy,
384 vec![crate::QueuedWorkPayload::turn_input(input)],
385 );
386 draft.source_key = source_key;
387 let enqueued = store
388 .enqueue_queued_work(draft)
389 .await
390 .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))?;
391 if let Some(poke) = queued_work_poke.as_ref() {
392 poke.poke_session(enqueued.session_id.clone(), "queued_turn_input");
393 }
394 Ok(enqueued)
395}
396
397impl LashRuntime {
398 pub async fn submit_session_command(
399 &mut self,
400 command: crate::SessionCommand,
401 idempotency_key: impl Into<String>,
402 ) -> Result<crate::SessionCommandReceipt, RuntimeError> {
403 let idempotency_key = idempotency_key.into();
404 if idempotency_key.trim().is_empty() {
405 return Err(RuntimeError::new(
406 RuntimeErrorCode::Other("session_command_idempotency_key".to_string()),
407 "session command idempotency key cannot be empty",
408 ));
409 }
410 let source_key = command.source_key(&idempotency_key);
411 let session_id = self.state.session_id.clone();
412 let Some(store) = self
413 .session
414 .as_ref()
415 .and_then(|session| session.history_store())
416 else {
417 let batch_id = format!("inline-command:{}", uuid::Uuid::new_v4());
418 self.apply_session_command(command, None).await?;
419 return Ok(crate::SessionCommandReceipt {
420 session_id,
421 batch_id,
422 source_key,
423 });
424 };
425 let draft = crate::QueuedWorkBatchDraft::new(
426 session_id.clone(),
427 crate::DeliveryPolicy::AfterCurrentTurnCommit,
428 crate::SlotPolicy::Exclusive,
429 vec![crate::QueuedWorkPayload::session_command(command)],
430 )
431 .with_source_key(source_key.clone());
432 let enqueued = store.enqueue_queued_work(draft).await.map_err(|err| {
433 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
434 })?;
435 if let Some(poke) = self.host.queued_work_poke.as_ref() {
436 poke.poke_session(session_id.clone(), "session_command");
437 }
438 Ok(crate::SessionCommandReceipt {
439 session_id,
440 batch_id: enqueued.batch_id,
441 source_key,
442 })
443 }
444
445 pub async fn drain_next_session_command(
446 &mut self,
447 ) -> Result<Option<crate::SessionCommandReceipt>, RuntimeError> {
448 let Some(store) = self
449 .session
450 .as_ref()
451 .and_then(|session| session.history_store())
452 else {
453 return Ok(None);
454 };
455 let claim = store
456 .claim_ready_queued_work(
457 &self.state.session_id,
458 &self.runtime_scope_id,
459 crate::QueuedWorkClaimBoundary::Idle,
460 crate::QUEUED_WORK_CLAIM_TTL_MS,
461 1,
462 )
463 .await
464 .map_err(|err| {
465 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
466 })?;
467 let Some(claim) = claim else {
468 return Ok(None);
469 };
470 let Some((batch, command)) = claim.exclusive_session_command() else {
471 store
472 .abandon_queued_work_claim(&claim)
473 .await
474 .map_err(|err| {
475 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
476 })?;
477 return Ok(None);
478 };
479 let batch_id = batch.batch_id.clone();
480 let source_key = batch.source_key.clone().unwrap_or_else(|| batch_id.clone());
481 let command = command.clone();
482 self.apply_session_command(command, Some(claim.completion()))
483 .await?;
484 Ok(Some(crate::SessionCommandReceipt {
485 session_id: self.state.session_id.clone(),
486 batch_id,
487 source_key,
488 }))
489 }
490
491 async fn apply_session_command(
492 &mut self,
493 command: crate::SessionCommand,
494 completion: Option<crate::QueuedWorkCompletion>,
495 ) -> Result<(), RuntimeError> {
496 self.refresh_session_graph_from_store()
497 .await
498 .map_err(|err| RuntimeError::new("session_command_refresh", err.to_string()))?;
499 let graph = match command {
500 crate::SessionCommand::RefreshToolSurface {
501 expected_generation,
502 ..
503 } => {
504 if let Some(expected) = expected_generation {
505 let actual = self
506 .tool_state()
507 .map_err(|err| {
508 RuntimeError::new("session_command_tool_state", err.to_string())
509 })?
510 .generation();
511 if actual != expected {
512 return Err(RuntimeError::new(
513 "session_command_generation_mismatch",
514 format!(
515 "expected tool generation {expected}, but live generation is {actual}"
516 ),
517 ));
518 }
519 }
520 self.refresh_session_tool_surface().await.map_err(|err| {
521 RuntimeError::new("session_command_refresh_tools", err.to_string())
522 })?;
523 crate::store::GraphCommitDelta::Unchanged {
524 leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
525 }
526 }
527 crate::SessionCommand::ResetSession { .. } => {
528 let mut state = crate::RuntimeSessionState {
529 session_id: self.state.session_id.clone(),
530 policy: self.policy.clone(),
531 graph_replace_required: true,
532 ..crate::RuntimeSessionState::default()
533 };
534 state.ensure_agent_frame_initialized();
535 self.set_persisted_state(state)
536 .map_err(|err| RuntimeError::new("session_command_reset", err.to_string()))?;
537 crate::store::GraphCommitDelta::ReplaceFull(self.state.session_graph.clone())
538 }
539 };
540 let Some(store) = self
541 .session
542 .as_ref()
543 .and_then(|session| session.history_store())
544 else {
545 return Ok(());
546 };
547 let mut commit =
548 crate::store::RuntimeCommit::persisted_state_with_graph_commit(&self.state, graph, &[]);
549 if let Some(completion) = completion {
550 commit = commit.completing_queue_claim(completion);
551 }
552 let result = store.commit_runtime_state(commit).await.map_err(|err| {
553 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
554 })?;
555 self.state.apply_persisted_commit_result(result);
556 Ok(())
557 }
558}
559
560pub(in crate::runtime) fn queued_turn_input_store_required() -> RuntimeError {
561 RuntimeError::new(
562 RuntimeErrorCode::StoreCommitFailed,
563 "queued turn input requires a persistent runtime store",
564 )
565}