use super::*;
impl LashRuntime {
pub fn session_id(&self) -> &str {
&self.state.session_id
}
pub(super) fn stamp_live_plugin_state(&mut self) {
if let Some(session) = self.session.as_ref() {
let snapshot = session.plugins().tool_registry().export_state();
self.state.tool_state_generation = Some(snapshot.generation());
self.state.tool_state_snapshot = Some(snapshot);
self.state.plugin_snapshot = session.plugins().snapshot().ok();
self.state.plugin_snapshot_revision =
Some(session.plugins().snapshot_revision_fingerprint());
} else {
self.state.tool_state_generation = None;
self.state.tool_state_snapshot = None;
self.state.plugin_snapshot = None;
self.state.plugin_snapshot_revision = None;
}
}
pub(super) fn active_tool_catalog(&self) -> Vec<serde_json::Value> {
self.active_tool_catalog_shared().as_ref().clone()
}
pub(super) fn active_tool_catalog_shared(&self) -> Arc<Vec<serde_json::Value>> {
self.session
.as_ref()
.map(|session| {
session
.shared_tool_catalog(&self.state.session_id, self.policy.execution_mode.clone())
})
.unwrap_or_else(|| Arc::new(Vec::new()))
}
pub fn tool_state(&self) -> Result<crate::ToolState, SessionError> {
let Some(session) = self.session.as_ref() else {
return Err(SessionError::Protocol(
"runtime session not available".to_string(),
));
};
Ok(session.plugins().tool_registry().export_state())
}
pub fn set_mode_turn_options(&mut self, options: crate::ModeTurnOptions) {
self.state.mode_turn_options = options.clone();
self.mode_turn_options = options;
}
pub fn export_state(&self) -> SessionStateEnvelope {
self.state.export_state()
}
pub fn read_view(&self) -> crate::SessionReadView {
crate::SessionReadView::from_runtime_state(
&self.state,
self.policy.clone(),
self.mode_turn_options.clone(),
)
}
pub fn export_persistence_state(&self) -> PersistedSessionState {
self.state.clone()
}
pub fn apply_persistence_state(&mut self, state: PersistedSessionState) {
self.set_persisted_state(state);
}
pub(crate) fn export_graph_first_state(&self) -> PersistedSessionState {
self.state.clone()
}
pub fn export_persisted_state(&self) -> PersistedSessionState {
let mut state = self.state.clone();
state.mode_turn_options = self.mode_turn_options.clone();
if let Some(session) = self.session.as_ref() {
let snapshot = session.plugins().tool_registry().export_state();
state.tool_state_generation = Some(snapshot.generation());
state.tool_state_snapshot = Some(snapshot);
state.plugin_snapshot = session.plugins().snapshot().ok();
state.plugin_snapshot_revision =
Some(session.plugins().snapshot_revision_fingerprint());
}
normalize_session_graph(&mut state);
state
}
pub fn usage_report(&self) -> SessionUsageReport {
let mut entries = self.state.token_ledger.clone();
let drained = self.shared_token_ledger.lock().expect("token ledger lock");
for entry in drained.iter().cloned() {
merge_ledger_entry(&mut entries, entry);
}
SessionUsageReport::from_entries(&entries)
}
pub async fn await_background_work(&mut self) -> Result<(), SessionError> {
let manager = self
.runtime_session_manager()
.map_err(|err| SessionError::Protocol(err.to_string()))?;
manager
.await_hidden_tasks(&self.state.session_id)
.await
.map_err(|err| SessionError::Protocol(format!("background task failed: {err}")))?;
if self.background_sync_needed.swap(false, Ordering::AcqRel) {
self.refresh_session_graph_from_store().await;
}
self.refresh_session_tool_surface().await?;
Ok(())
}
pub(super) async fn refresh_session_graph_from_store(&mut self) {
let Some(store) = self
.session
.as_ref()
.and_then(|session| session.history_store())
else {
return;
};
let read = match store
.load_session(crate::store::SessionReadScope::FullGraph)
.await
{
Ok(Some(read)) => read,
Ok(None) => return,
Err(err) => {
tracing::warn!("failed to refresh session graph from store: {err}");
return;
}
};
let has_newer_graph = self.state.head_revision != Some(read.head_revision)
|| read.graph.leaf_node_id != self.state.session_graph.leaf_node_id
|| read.checkpoint_ref != self.state.checkpoint_ref;
if !has_newer_graph {
return;
}
let head = crate::store::SessionHead {
session_id: read.session_id.clone(),
head_revision: read.head_revision,
graph: read.graph,
config: read.config.clone(),
checkpoint_ref: read.checkpoint_ref.clone(),
token_ledger: merge_usage_delta_entries(read.token_ledger),
};
apply_session_head(&mut self.state, &head);
apply_session_checkpoint(&mut self.state, read.checkpoint);
}
pub(super) fn runtime_session_manager(
&self,
) -> Result<Arc<dyn RuntimeSessionHost>, PluginActionInvokeError> {
Ok(Arc::new(RuntimeSessionManager::new(self, true, None)?))
}
pub(super) fn runtime_session_manager_for_turn(
&self,
child_usage_event_relay: Option<ChildUsageEventRelay>,
) -> Result<Arc<dyn RuntimeSessionHost>, PluginActionInvokeError> {
Ok(Arc::new(RuntimeSessionManager::new(
self,
false,
child_usage_event_relay,
)?))
}
pub fn session_manager(&self) -> Result<Arc<dyn RuntimeSessionHost>, PluginActionInvokeError> {
self.runtime_session_manager()
}
pub fn plugin_session(&self) -> Option<Arc<crate::PluginSession>> {
self.session.as_ref().map(|s| Arc::clone(s.plugins()))
}
pub fn turn_input_injection_bridge(
&self,
) -> Result<crate::TurnInputInjectionBridge, SessionError> {
let Some(session) = self.session.as_ref() else {
return Err(SessionError::Protocol(
"runtime session not available".to_string(),
));
};
Ok(session.turn_input_injection_bridge().clone())
}
pub async fn rewrite_history(
&mut self,
trigger: crate::RewriteTrigger,
) -> Result<bool, PluginActionInvokeError> {
let manager = self.runtime_session_manager()?;
let Some(plugin_session) = self.session.as_ref().map(|s| Arc::clone(s.plugins())) else {
return Err(PluginActionInvokeError::Unknown(
"runtime session not available".to_string(),
));
};
let ctx = crate::RewriteContext {
session_id: self.state.session_id.clone(),
trigger,
state: self.read_view(),
host: manager,
};
let input = crate::HistoryState::from_state(&self.state.export_state());
let baseline_messages = input.messages.len();
let outcome = plugin_session
.rewrite_history(&ctx, input)
.await
.map_err(|err| {
PluginActionInvokeError::Unknown(format!("rewrite_history failed: {err}"))
})?;
let mutated =
outcome.metadata.produced_summary || outcome.messages.len() != baseline_messages;
if mutated {
self.state
.replace_active_read_state(&outcome.messages, &outcome.tool_calls);
if let Some(session) = self.session.as_ref() {
self.state.tool_state_snapshot =
Some(session.plugins().tool_registry().export_state());
self.state.plugin_snapshot = session.plugins().snapshot().ok();
self.state.plugin_snapshot_revision =
Some(session.plugins().snapshot_revision_fingerprint());
}
}
Ok(mutated)
}
pub(super) fn session_policy(&self) -> SessionPolicy {
self.policy.clone()
}
pub(super) async fn notify_session_config_changed(&self, previous: SessionPolicy) {
let Some(session) = self.session.as_ref() else {
return;
};
let current = self.session_policy();
if current == previous {
return;
}
let Ok(host) = self.runtime_session_manager() else {
return;
};
session
.plugins()
.emit_runtime_event(crate::PluginRuntimeEvent::SessionConfigChanged(Box::new(
SessionConfigChangedContext {
session_id: self.state.session_id.clone(),
previous,
current,
host,
},
)))
.await;
}
pub(super) async fn apply_session_config_mutations(&mut self, previous: SessionPolicy) {
let Some(session) = self.session.as_ref() else {
return;
};
let current = self.session_policy();
if current == previous {
return;
}
let Ok(host) = self.runtime_session_manager() else {
return;
};
self.policy = session
.plugins()
.mutate_session_config(
SessionConfigChangedContext {
session_id: self.state.session_id.clone(),
previous,
current,
host,
},
self.policy.clone(),
)
.await;
self.state.policy = self.policy.clone();
}
}