use std::sync::Arc;
use arc_swap::ArcSwap;
use tokio::sync::Mutex;
use super::{LashRuntime, ProcessHandleGrantEntry, ProcessHandleSummary, ProcessRegistry};
#[derive(Clone)]
pub struct RuntimeObservation {
pub session_id: Arc<str>,
pub policy: crate::SessionPolicy,
pub read_view: crate::SessionReadView,
pub persisted_state: super::RuntimeSessionState,
pub usage_report: super::SessionUsageReport,
pub tool_state: Option<crate::ToolState>,
pub tool_catalog: Arc<Vec<serde_json::Value>>,
pub tool_catalog_error: Option<String>,
pub process_registry: Option<Arc<dyn ProcessRegistry>>,
pub queue_store: Option<Arc<dyn crate::RuntimePersistence>>,
pub queued_work_poke: Option<super::QueuedWorkPoke>,
}
impl RuntimeObservation {
fn from_runtime(runtime: &LashRuntime, previous: Option<&RuntimeObservation>) -> Self {
let (tool_catalog, tool_catalog_error) = match runtime.active_tool_catalog_shared() {
Ok(catalog) => (catalog, None),
Err(err) => (Arc::new(Vec::new()), Some(err.to_string())),
};
let tool_state_generation = runtime
.session
.as_ref()
.map(|session| session.plugins().tool_registry().generation());
let tool_state = match (
tool_state_generation,
previous.and_then(|observation| observation.tool_state.as_ref()),
) {
(Some(generation), Some(snapshot)) if snapshot.generation() == generation => {
Some(snapshot.clone())
}
(Some(_), _) => match runtime.tool_state() {
Ok(state) => Some(state),
Err(err) => {
tracing::warn!(
session_id = %runtime.session_id(),
error = %err,
"failed to capture tool state for observation; omitting the snapshot",
);
None
}
},
(None, _) => None,
};
Self {
session_id: Arc::from(runtime.session_id()),
policy: runtime.read_view().policy().clone(),
read_view: runtime.read_view(),
persisted_state: runtime.export_persisted_state(),
usage_report: runtime.usage_report(),
tool_state,
tool_catalog,
tool_catalog_error,
process_registry: runtime.host.process_registry.clone(),
queue_store: runtime
.session
.as_ref()
.and_then(|session| session.history_store()),
queued_work_poke: runtime.host.queued_work_poke.clone(),
}
}
pub fn session_id(&self) -> &str {
&self.session_id
}
pub fn process_scope(&self) -> crate::ProcessScope {
crate::ProcessScope::new(self.session_id.as_ref())
}
pub fn process_scope_id(&self) -> crate::ProcessScopeId {
self.process_scope().id()
}
pub async fn list_process_handles(&self) -> Vec<ProcessHandleSummary> {
let Some(executor) = self.process_registry.as_ref() else {
return Vec::new();
};
self.list_process_handles_with_mode(executor, crate::ProcessListMode::Live)
.await
}
pub async fn list_all_process_handles(&self) -> Vec<ProcessHandleSummary> {
let Some(executor) = self.process_registry.as_ref() else {
return Vec::new();
};
self.list_process_handles_with_mode(executor, crate::ProcessListMode::All)
.await
}
async fn list_process_handles_with_mode(
&self,
executor: &Arc<dyn crate::ProcessRegistry>,
mode: crate::ProcessListMode,
) -> Vec<ProcessHandleSummary> {
let root_scope = self.process_scope();
let mut entries = list_scope_process_handles(executor, &root_scope, mode).await;
let agent_frame_id = self.persisted_state.current_agent_frame_id.as_str();
if !agent_frame_id.is_empty() {
let frame_scope =
crate::ProcessScope::for_agent_frame(self.session_id.as_ref(), agent_frame_id);
if frame_scope.id() != root_scope.id() {
entries.extend(list_scope_process_handles(executor, &frame_scope, mode).await);
entries.sort_by(|(left, _), (right, _)| left.process_id.cmp(&right.process_id));
entries.dedup_by(|(left, _), (right, _)| left.process_id == right.process_id);
}
}
entries
.into_iter()
.map(ProcessHandleSummary::from)
.collect()
}
}
async fn list_scope_process_handles(
executor: &Arc<dyn crate::ProcessRegistry>,
scope: &crate::ProcessScope,
mode: crate::ProcessListMode,
) -> Vec<ProcessHandleGrantEntry> {
match mode {
crate::ProcessListMode::Live => executor.list_live_handle_grants(scope).await,
crate::ProcessListMode::All => executor.list_handle_grants(scope).await,
}
.unwrap_or_default()
}
#[derive(Clone)]
pub struct RuntimeHandle {
pub(in crate::runtime) runtime: Arc<Mutex<LashRuntime>>,
observation: Arc<ArcSwap<RuntimeObservation>>,
}
impl RuntimeHandle {
pub fn new(runtime: LashRuntime) -> Self {
let observation = RuntimeObservation::from_runtime(&runtime, None);
Self {
runtime: Arc::new(Mutex::new(runtime)),
observation: Arc::new(ArcSwap::from_pointee(observation)),
}
}
pub fn writer(&self) -> Arc<Mutex<LashRuntime>> {
Arc::clone(&self.runtime)
}
pub fn observe(&self) -> Arc<RuntimeObservation> {
self.observation.load_full()
}
pub fn publish_from(&self, runtime: &LashRuntime) {
let previous = self.observation.load_full();
self.observation
.store(Arc::new(RuntimeObservation::from_runtime(
runtime,
Some(previous.as_ref()),
)));
}
pub async fn enqueue_turn_input(
&self,
input: crate::TurnInput,
delivery_policy: crate::DeliveryPolicy,
slot_policy: crate::SlotPolicy,
source_key: Option<String>,
) -> Result<crate::QueuedWorkBatch, crate::RuntimeError> {
let observation = self.observe();
let store = observation
.queue_store
.clone()
.ok_or_else(super::session_api::queued_turn_input_store_required)?;
super::session_api::enqueue_turn_input_to_store(
observation.session_id.as_ref().to_string(),
store,
observation.queued_work_poke.clone(),
input,
delivery_policy,
slot_policy,
source_key,
)
.await
}
pub async fn cancel_queued_work_batch(
&self,
session_id: &str,
batch_id: &str,
) -> Result<Option<crate::QueuedWorkBatch>, crate::RuntimeError> {
let observation = self.observe();
let store = observation
.queue_store
.clone()
.ok_or_else(super::session_api::queued_turn_input_store_required)?;
store
.cancel_queued_work_batch(session_id, batch_id)
.await
.map_err(|err| {
crate::RuntimeError::new(
crate::RuntimeErrorCode::StoreCommitFailed,
err.to_string(),
)
})
}
pub fn try_into_runtime(self) -> Result<LashRuntime, Self> {
match Arc::try_unwrap(self.runtime) {
Ok(mutex) => Ok(mutex.into_inner()),
Err(runtime) => Err(Self {
runtime,
observation: self.observation,
}),
}
}
}