use super::*;
fn rewrite_history_scope_id(
session_id: &str,
turn_index: usize,
trigger: &crate::RewriteTrigger,
) -> String {
let trigger_id = match trigger {
crate::RewriteTrigger::Manual { .. } => "manual",
crate::RewriteTrigger::WindowShrink { .. } => "window-shrink",
crate::RewriteTrigger::Periodic => "periodic",
};
format!("{session_id}:rewrite-history:{trigger_id}:{turn_index}")
}
impl LashRuntime {
pub fn session_id(&self) -> &str {
&self.state.session_id
}
pub(super) fn stamp_live_plugin_state(&mut self) {
if let Some(session) = self.session.as_ref() {
let snapshot = session.plugins().tool_registry().export_state();
self.state.tool_state_generation = Some(snapshot.generation());
self.state.tool_state_snapshot = Some(snapshot);
let captured = session.plugins().snapshot();
crate::runtime::state::store_plugin_snapshot(&mut self.state.plugin_snapshot, captured);
self.state.plugin_snapshot_revision =
Some(session.plugins().snapshot_revision_fingerprint());
} else {
self.state.tool_state_generation = None;
self.state.tool_state_snapshot = None;
self.state.plugin_snapshot = None;
self.state.plugin_snapshot_revision = None;
}
}
pub(super) fn active_tool_catalog_shared(
&self,
) -> Result<Arc<Vec<serde_json::Value>>, crate::PluginError> {
self.session
.as_ref()
.map(|session| session.shared_tool_catalog(&self.state.session_id))
.unwrap_or_else(|| Ok(Arc::new(Vec::new())))
}
pub fn tool_state(&self) -> Result<crate::ToolState, SessionError> {
let Some(session) = self.session.as_ref() else {
return Err(SessionError::Protocol(
"runtime session not available".to_string(),
));
};
Ok(session.plugins().tool_registry().export_state())
}
pub fn set_protocol_turn_options(&mut self, options: crate::ProtocolTurnOptions) {
self.state.protocol_turn_options = options.clone();
if let Some(frame) = self.state.current_agent_frame_mut() {
frame.protocol_turn_options = options.clone();
}
self.protocol_turn_options = options;
}
pub fn export_state(&self) -> crate::SessionSnapshot {
self.state.to_snapshot()
}
pub fn read_view(&self) -> crate::SessionReadView {
crate::SessionReadView::from_runtime_state(
&self.state,
self.state.effective_policy().clone(),
self.state.effective_protocol_turn_options().clone(),
)
}
pub fn export_persistence_state(&self) -> RuntimeSessionState {
self.state.clone()
}
pub fn apply_persistence_state(
&mut self,
state: RuntimeSessionState,
) -> Result<(), SessionError> {
self.set_persisted_state(state)
}
pub(crate) fn export_graph_first_state(&self) -> RuntimeSessionState {
self.state.clone()
}
pub fn export_persisted_state(&self) -> RuntimeSessionState {
let mut state = self.state.clone();
state.protocol_turn_options = self.protocol_turn_options.clone();
if let Some(frame) = state.current_agent_frame_mut() {
frame.protocol_turn_options = self.protocol_turn_options.clone();
}
if let Some(session) = self.session.as_ref() {
let snapshot = session.plugins().tool_registry().export_state();
state.tool_state_generation = Some(snapshot.generation());
state.tool_state_snapshot = Some(snapshot);
let captured = session.plugins().snapshot();
crate::runtime::state::store_plugin_snapshot(&mut state.plugin_snapshot, captured);
state.plugin_snapshot_revision =
Some(session.plugins().snapshot_revision_fingerprint());
}
normalize_session_graph(&mut state);
state
}
pub fn usage_report(&self) -> SessionUsageReport {
let mut entries = self.state.token_ledger.clone();
let drained = self.shared_token_ledger.lock().expect("token ledger lock");
for entry in drained.iter().cloned() {
merge_ledger_entry(&mut entries, entry);
}
SessionUsageReport::from_entries(&entries)
}
pub async fn await_background_work(&mut self) -> Result<(), SessionError> {
if self.process_sync_needed.swap(false, Ordering::AcqRel) {
self.refresh_session_graph_from_store().await?;
}
Ok(())
}
pub(super) async fn refresh_session_graph_from_store(&mut self) -> Result<(), SessionError> {
if self.state.graph_replace_required && self.state.head_revision.is_none() {
return Ok(());
}
let Some(store) = self
.session
.as_ref()
.and_then(|session| session.history_store())
else {
return Ok(());
};
let scope = match self.residency {
crate::Residency::KeepAll => crate::store::SessionReadScope::FullGraph,
crate::Residency::ActivePathOnly => crate::store::SessionReadScope::ActivePath {
leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
},
};
let Some(read) = store.load_session(scope).await.map_err(|err| {
SessionError::Protocol(format!("failed to refresh session graph from store: {err}"))
})?
else {
return Ok(());
};
let has_newer_graph = self.state.head_revision != Some(read.head_revision)
|| read.graph.leaf_node_id != self.state.session_graph.leaf_node_id
|| read.checkpoint_ref != self.state.checkpoint_ref;
if !has_newer_graph {
return Ok(());
}
let head = crate::store::SessionHead {
session_id: read.session_id.clone(),
head_revision: read.head_revision,
agent_frames: read.agent_frames.clone(),
current_agent_frame_id: read.current_agent_frame_id.clone(),
graph: read.graph,
config: read.config.clone(),
checkpoint_ref: read.checkpoint_ref.clone(),
token_ledger: merge_usage_delta_entries(read.token_ledger),
};
apply_session_head(&mut self.state, &head);
apply_session_checkpoint(&mut self.state, read.checkpoint);
self.policy = self.state.effective_policy().clone();
self.protocol_turn_options = self.state.effective_protocol_turn_options().clone();
Ok(())
}
pub(super) fn runtime_session_services(
&self,
) -> Result<Arc<RuntimeSessionServices>, PluginActionInvokeError> {
Ok(Arc::new(RuntimeSessionServices::new(self, true, None)?))
}
pub(super) fn runtime_session_services_for_turn(
&self,
child_usage_event_relay: Option<ChildUsageEventRelay>,
) -> Result<Arc<RuntimeSessionServices>, PluginActionInvokeError> {
Ok(Arc::new(RuntimeSessionServices::new(
self,
false,
child_usage_event_relay,
)?))
}
pub fn session_state_service(
&self,
) -> Result<Arc<dyn crate::plugin::SessionStateService>, PluginActionInvokeError> {
self.runtime_session_services()
.map(|services| services.state_service())
}
pub fn session_lifecycle_service(
&self,
) -> Result<Arc<dyn crate::plugin::SessionLifecycleService>, PluginActionInvokeError> {
self.runtime_session_services()
.map(|services| services.lifecycle_service())
}
pub fn session_graph_service(
&self,
) -> Result<Arc<dyn crate::plugin::SessionGraphService>, PluginActionInvokeError> {
self.runtime_session_services()
.map(|services| services.graph_service())
}
pub fn process_service(
&self,
) -> Result<Arc<dyn crate::ProcessService>, PluginActionInvokeError> {
self.runtime_session_services()
.map(|services| services.process_service())
}
pub fn process_cancel_ability(&self) -> Arc<dyn crate::ProcessCancelAbility> {
Arc::clone(&self.host.core.control.process_cancel_ability)
}
pub fn effect_host(&self) -> Arc<dyn crate::EffectHost> {
Arc::clone(&self.host.core.control.effect_host)
}
pub async fn enqueue_turn_input(
&self,
input: crate::TurnInput,
delivery_policy: crate::DeliveryPolicy,
slot_policy: crate::SlotPolicy,
source_key: Option<String>,
) -> Result<crate::QueuedWorkBatch, RuntimeError> {
let store = self
.session
.as_ref()
.and_then(|session| session.history_store())
.ok_or_else(queued_turn_input_store_required)?;
enqueue_turn_input_to_store(
self.state.session_id.clone(),
store,
self.host.queued_work_poke.clone(),
input,
delivery_policy,
slot_policy,
source_key,
)
.await
}
pub async fn cancel_queued_work_batch(
&self,
session_id: &str,
batch_id: &str,
) -> Result<Option<crate::QueuedWorkBatch>, RuntimeError> {
let store = self
.session
.as_ref()
.and_then(|session| session.history_store())
.ok_or_else(queued_turn_input_store_required)?;
store
.cancel_queued_work_batch(session_id, batch_id)
.await
.map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))
}
pub fn plugin_session(&self) -> Option<Arc<crate::PluginSession>> {
self.session.as_ref().map(|s| Arc::clone(s.plugins()))
}
pub async fn rewrite_history(
&mut self,
trigger: crate::RewriteTrigger,
) -> Result<bool, PluginActionInvokeError> {
let services = self.runtime_session_services()?;
let Some(plugin_session) = self.session.as_ref().map(|s| Arc::clone(s.plugins())) else {
return Err(PluginActionInvokeError::Unknown(
"runtime session not available".to_string(),
));
};
let ctx = crate::RewriteContext {
session_id: self.state.session_id.clone(),
trigger: trigger.clone(),
state: self.read_view(),
sessions: services.state_service(),
session_lifecycle: services.lifecycle_service(),
session_graph: services.graph_service(),
scoped_effect_controller: self
.host
.core
.control
.effect_host
.scoped(crate::EffectScope::runtime_operation(
rewrite_history_scope_id(
&self.state.session_id,
self.state.turn_index,
&trigger,
),
))
.map_err(|err| PluginActionInvokeError::Unknown(err.to_string()))?,
};
let input = crate::HistoryState::from_snapshot(&self.state.to_snapshot());
let baseline_messages = input.messages.len();
let outcome = plugin_session
.rewrite_history(&ctx, input)
.await
.map_err(|err| {
PluginActionInvokeError::Unknown(format!("rewrite_history failed: {err}"))
})?;
let mutated =
outcome.metadata.produced_summary || outcome.messages.len() != baseline_messages;
if mutated {
self.state
.replace_active_read_state(&outcome.messages, &outcome.tool_calls);
if let Some(session) = self.session.as_ref() {
self.state.tool_state_snapshot = Some(session.tool_registry().export_state());
let captured = session.plugins().snapshot();
crate::runtime::state::store_plugin_snapshot(
&mut self.state.plugin_snapshot,
captured,
);
self.state.plugin_snapshot_revision =
Some(session.plugins().snapshot_revision_fingerprint());
}
}
Ok(mutated)
}
pub(super) fn session_policy(&self) -> SessionPolicy {
self.policy.clone()
}
pub(super) async fn notify_session_config_changed(&self, previous: SessionPolicy) {
let Some(session) = self.session.as_ref() else {
return;
};
let current = self.session_policy();
if current == previous {
return;
}
let Ok(services) = self.runtime_session_services() else {
return;
};
session
.plugins()
.emit_runtime_event(crate::PluginLifecycleEvent::SessionConfigChanged(Box::new(
SessionConfigChangedContext {
session_id: self.state.session_id.clone(),
previous,
current,
sessions: services.state_service(),
},
)))
.await;
}
pub(super) async fn apply_session_config_mutations(&mut self, previous: SessionPolicy) {
let Some(session) = self.session.as_ref() else {
return;
};
let current = self.session_policy();
if current == previous {
return;
}
let Ok(services) = self.runtime_session_services() else {
return;
};
self.policy = session
.plugins()
.mutate_session_config(
SessionConfigChangedContext {
session_id: self.state.session_id.clone(),
previous,
current,
sessions: services.state_service(),
},
self.policy.clone(),
)
.await;
self.state.policy = self.policy.clone();
}
}
pub(in crate::runtime) async fn enqueue_turn_input_to_store(
session_id: String,
store: Arc<dyn crate::RuntimePersistence>,
queued_work_poke: Option<crate::QueuedWorkPoke>,
input: crate::TurnInput,
delivery_policy: crate::DeliveryPolicy,
slot_policy: crate::SlotPolicy,
source_key: Option<String>,
) -> Result<crate::QueuedWorkBatch, RuntimeError> {
super::turn_loop::ensure_durable_effect_input(&input)?;
let mut draft = crate::QueuedWorkBatchDraft::new(
session_id,
delivery_policy,
slot_policy,
vec![crate::QueuedWorkPayload::turn_input(input)],
);
draft.source_key = source_key;
let enqueued = store
.enqueue_queued_work(draft)
.await
.map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))?;
if let Some(poke) = queued_work_poke.as_ref() {
poke.poke_session(enqueued.session_id.clone(), "queued_turn_input");
}
Ok(enqueued)
}
impl LashRuntime {
pub async fn submit_session_command(
&mut self,
command: crate::SessionCommand,
idempotency_key: impl Into<String>,
) -> Result<crate::SessionCommandReceipt, RuntimeError> {
let idempotency_key = idempotency_key.into();
if idempotency_key.trim().is_empty() {
return Err(RuntimeError::new(
RuntimeErrorCode::Other("session_command_idempotency_key".to_string()),
"session command idempotency key cannot be empty",
));
}
let source_key = command.source_key(&idempotency_key);
let session_id = self.state.session_id.clone();
let Some(store) = self
.session
.as_ref()
.and_then(|session| session.history_store())
else {
let batch_id = format!("inline-command:{}", uuid::Uuid::new_v4());
self.apply_session_command(command, None).await?;
return Ok(crate::SessionCommandReceipt {
session_id,
batch_id,
source_key,
});
};
let draft = crate::QueuedWorkBatchDraft::new(
session_id.clone(),
crate::DeliveryPolicy::AfterCurrentTurnCommit,
crate::SlotPolicy::Exclusive,
vec![crate::QueuedWorkPayload::session_command(command)],
)
.with_source_key(source_key.clone());
let enqueued = store.enqueue_queued_work(draft).await.map_err(|err| {
RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
})?;
if let Some(poke) = self.host.queued_work_poke.as_ref() {
poke.poke_session(session_id.clone(), "session_command");
}
Ok(crate::SessionCommandReceipt {
session_id,
batch_id: enqueued.batch_id,
source_key,
})
}
pub async fn drain_next_session_command(
&mut self,
) -> Result<Option<crate::SessionCommandReceipt>, RuntimeError> {
let Some(store) = self
.session
.as_ref()
.and_then(|session| session.history_store())
else {
return Ok(None);
};
let claim = store
.claim_ready_queued_work(
&self.state.session_id,
&self.runtime_scope_id,
crate::QueuedWorkClaimBoundary::Idle,
crate::QUEUED_WORK_CLAIM_TTL_MS,
1,
)
.await
.map_err(|err| {
RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
})?;
let Some(claim) = claim else {
return Ok(None);
};
let Some((batch, command)) = claim.exclusive_session_command() else {
store
.abandon_queued_work_claim(&claim)
.await
.map_err(|err| {
RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
})?;
return Ok(None);
};
let batch_id = batch.batch_id.clone();
let source_key = batch.source_key.clone().unwrap_or_else(|| batch_id.clone());
let command = command.clone();
self.apply_session_command(command, Some(claim.completion()))
.await?;
Ok(Some(crate::SessionCommandReceipt {
session_id: self.state.session_id.clone(),
batch_id,
source_key,
}))
}
async fn apply_session_command(
&mut self,
command: crate::SessionCommand,
completion: Option<crate::QueuedWorkCompletion>,
) -> Result<(), RuntimeError> {
self.refresh_session_graph_from_store()
.await
.map_err(|err| RuntimeError::new("session_command_refresh", err.to_string()))?;
let graph = match command {
crate::SessionCommand::RefreshToolSurface {
expected_generation,
..
} => {
if let Some(expected) = expected_generation {
let actual = self
.tool_state()
.map_err(|err| {
RuntimeError::new("session_command_tool_state", err.to_string())
})?
.generation();
if actual != expected {
return Err(RuntimeError::new(
"session_command_generation_mismatch",
format!(
"expected tool generation {expected}, but live generation is {actual}"
),
));
}
}
self.refresh_session_tool_surface().await.map_err(|err| {
RuntimeError::new("session_command_refresh_tools", err.to_string())
})?;
crate::store::GraphCommitDelta::Unchanged {
leaf_node_id: self.state.session_graph.leaf_node_id.clone(),
}
}
crate::SessionCommand::EmitHostEvent {
resource_type,
alias,
event,
payload,
} => {
let effect_host = Arc::clone(&self.host.core.control.effect_host);
let drain_id = completion
.as_ref()
.map(|completion| completion.claim_id.clone())
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
let scoped = effect_host
.scoped(crate::EffectScope::queue_drain(
&self.state.session_id,
drain_id,
))
.map_err(|err| {
RuntimeError::new("session_command_effect_scope", err.to_string())
})?;
let (_report, graph) = self
.emit_host_event_without_persist(
&resource_type,
&alias,
&event,
payload,
scoped,
)
.await
.map_err(|err| {
RuntimeError::new("session_command_host_event", err.to_string())
})?;
graph
}
crate::SessionCommand::ResetSession { .. } => {
let mut state = crate::RuntimeSessionState {
session_id: self.state.session_id.clone(),
policy: self.policy.clone(),
graph_replace_required: true,
..crate::RuntimeSessionState::default()
};
state.ensure_agent_frame_initialized();
self.set_persisted_state(state)
.map_err(|err| RuntimeError::new("session_command_reset", err.to_string()))?;
crate::store::GraphCommitDelta::ReplaceFull(self.state.session_graph.clone())
}
};
let Some(store) = self
.session
.as_ref()
.and_then(|session| session.history_store())
else {
return Ok(());
};
let mut commit =
crate::store::RuntimeCommit::persisted_state_with_graph_commit(&self.state, graph, &[]);
if let Some(completion) = completion {
commit = commit.completing_queue_claim(completion);
}
let result = store.commit_runtime_state(commit).await.map_err(|err| {
RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
})?;
self.state.apply_persisted_commit_result(result);
Ok(())
}
}
pub(in crate::runtime) fn queued_turn_input_store_required() -> RuntimeError {
RuntimeError::new(
RuntimeErrorCode::StoreCommitFailed,
"queued turn input requires a persistent runtime store",
)
}