use super::errors::{AgentOsResolveError, AgentOsRunError};
use super::launch::RunLaunchSpec;
use super::prepare::{
clear_tool_call_scope_state, request_has_user_input, run_lifecycle_running_patch,
run_scope_cleanup_patches, set_or_validate_parent_thread_id, ActiveRunCleanupGuard,
};
use super::types::{AgentOs, AgentStateStoreStateCommitter, PreparedRun, RunStream};
use super::ResolvedRun;
use crate::composition::AgentOsWiringError;
use crate::contracts::runtime::RunIdentity;
use crate::contracts::storage::{ThreadHead, ThreadStore, VersionPrecondition};
use crate::contracts::thread::{CheckpointReason, Message, Thread};
use crate::contracts::{RunContext, RunRequest};
use crate::runtime::loop_runner::{
run_loop_stream_with_context, AgentLoopError, RunCancellationToken,
};
use futures::StreamExt;
use std::sync::Arc;
impl AgentOs {
pub fn agent_state_store(&self) -> Option<&Arc<dyn ThreadStore>> {
self.agent_state_store.as_ref()
}
fn require_agent_state_store(&self) -> Result<&Arc<dyn ThreadStore>, AgentOsRunError> {
self.agent_state_store
.as_ref()
.ok_or(AgentOsRunError::AgentStateStoreNotConfigured)
}
fn generate_id() -> String {
uuid::Uuid::now_v7().simple().to_string()
}
pub async fn load_thread(&self, id: &str) -> Result<Option<ThreadHead>, AgentOsRunError> {
let agent_state_store = self.require_agent_state_store()?;
Ok(agent_state_store.load(id).await?)
}
pub async fn current_run_id_for_thread(
&self,
agent_id: &str,
thread_id: &str,
) -> Result<Option<String>, AgentOsRunError> {
if let Some(run_id) = self.active_run_id_for_thread(agent_id, thread_id).await {
return Ok(Some(run_id));
}
let store = self.require_agent_state_store()?;
let Some(record) = store.active_run_for_thread(thread_id).await? else {
return Ok(None);
};
if !record.agent_id.is_empty() && record.agent_id != agent_id {
return Ok(None);
}
Ok(Some(record.run_id))
}
async fn clear_suspended_calls_before_user_run_input(
&self,
run_request: &mut RunRequest,
) -> Result<(), AgentOsRunError> {
let Some(thread_id) = run_request.thread_id.as_deref() else {
return Ok(());
};
if !request_has_user_input(&run_request.messages) {
return Ok(());
}
let store = self.require_agent_state_store()?;
let Some(head) = store.load(thread_id).await? else {
return Ok(());
};
if let Some(cleaned) = clear_tool_call_scope_state(&head.thread.state) {
run_request.state = Some(cleaned);
}
Ok(())
}
pub(crate) async fn prepare_active_run_with_spec(
&self,
owner_agent_id: &str,
run_request: RunRequest,
resolved: ResolvedRun,
launch: RunLaunchSpec,
) -> Result<(PreparedRun, String, String), AgentOsRunError> {
let previous_run_id = if !run_request.messages.is_empty() {
if let Some(thread_id) = run_request.thread_id.as_deref() {
self.current_run_id_for_thread(owner_agent_id, thread_id)
.await?
} else {
None
}
} else {
None
};
let prepared = self
.prepare_run_with_spec(run_request, resolved, launch)
.await?;
let thread_id = prepared.thread_id().to_string();
let run_id = prepared.run_id().to_string();
if let Some(previous_run_id) = previous_run_id.filter(|candidate| candidate != &run_id) {
self.cancel_active_run_by_id(&previous_run_id).await;
}
self.register_thread_run_handle(
run_id.clone(),
owner_agent_id,
&thread_id,
RunCancellationToken::new(),
)
.await;
Ok((prepared, thread_id, run_id))
}
pub(crate) async fn start_prepared_active_run(
&self,
run_id: &str,
prepared: PreparedRun,
) -> Result<RunStream, AgentOsRunError> {
let token = self
.active_thread_run_by_run_id(run_id)
.await
.ok_or_else(|| {
AgentOsRunError::Loop(AgentLoopError::StateError(format!(
"active run handle missing for run '{run_id}'",
)))
})?
.cancellation_token();
let run = Self::execute_prepared(prepared.with_cancellation_token(token))?;
if !self
.bind_thread_run_decision_tx(run_id, run.decision_tx.clone())
.await
{
self.remove_thread_run_handle(run_id).await;
return Err(AgentOsRunError::Loop(AgentLoopError::StateError(format!(
"active run handle missing for run '{run_id}'",
))));
}
Ok(self.wrap_run_stream_with_active_handle_cleanup(run))
}
pub async fn start_active_run_with_spec(
&self,
owner_agent_id: &str,
run_request: RunRequest,
resolved: ResolvedRun,
launch: RunLaunchSpec,
) -> Result<RunStream, AgentOsRunError> {
let (prepared, _thread_id, run_id) = self
.prepare_active_run_with_spec(owner_agent_id, run_request, resolved, launch)
.await?;
self.start_prepared_active_run(&run_id, prepared).await
}
fn wrap_run_stream_with_active_handle_cleanup(&self, run: RunStream) -> RunStream {
let RunStream {
thread_id,
run_id,
decision_tx,
events,
} = run;
let run_id_for_cleanup = run_id.clone();
let registry = self.active_runs.clone();
let events = Box::pin(futures::stream::unfold(
(
events,
Some(ActiveRunCleanupGuard::new(run_id_for_cleanup, registry)),
),
|(mut inner, mut cleanup)| async move {
match inner.next().await {
Some(event) => Some((event, (inner, cleanup))),
None => {
if let Some(mut cleanup) = cleanup.take() {
cleanup.cleanup_now().await;
}
None
}
}
},
));
RunStream {
thread_id,
run_id,
decision_tx,
events,
}
}
pub async fn prepare_run(
&self,
request: RunRequest,
resolved: ResolvedRun,
) -> Result<PreparedRun, AgentOsRunError> {
self.prepare_run_with_spec(request, resolved, RunLaunchSpec::DURABLE)
.await
}
pub async fn prepare_run_with_spec(
&self,
mut request: RunRequest,
resolved: ResolvedRun,
launch: RunLaunchSpec,
) -> Result<PreparedRun, AgentOsRunError> {
if launch.strip_lineage() {
request.run_id = None;
request.parent_run_id = None;
request.parent_thread_id = None;
}
self.clear_suspended_calls_before_user_run_input(&mut request)
.await?;
self.prepare_run_with_persistence_flag(request, resolved, launch.persist_run_mapping())
.await
}
async fn prepare_run_with_persistence_flag(
&self,
mut request: RunRequest,
resolved: ResolvedRun,
persist_run: bool,
) -> Result<PreparedRun, AgentOsRunError> {
let agent_state_store = self.require_agent_state_store()?;
let thread_id = request.thread_id.unwrap_or_else(Self::generate_id);
let run_id = request.run_id.unwrap_or_else(Self::generate_id);
let parent_run_id = request.parent_run_id.clone();
let parent_thread_id = request.parent_thread_id.clone();
let initial_decisions = std::mem::take(&mut request.initial_decisions);
let frontend_state = request.state.take();
let mut state_snapshot_for_delta: Option<serde_json::Value> = None;
let (mut thread, mut version) = match agent_state_store.load(&thread_id).await? {
Some(head) => {
let mut t = head.thread;
if let Some(state) = frontend_state {
t.state = state.clone();
t.patches.clear();
state_snapshot_for_delta = Some(state);
}
(t, head.version)
}
None => {
let thread = if let Some(state) = frontend_state {
Thread::with_initial_state(thread_id.clone(), state)
} else {
Thread::new(thread_id.clone())
};
let committed = agent_state_store.create(&thread).await?;
(thread, committed.version)
}
};
let parent_thread_id_updated =
set_or_validate_parent_thread_id(&mut thread, &thread_id, parent_thread_id.as_deref())?;
if parent_thread_id_updated {
agent_state_store.save(&thread).await?;
let refreshed = agent_state_store.load(&thread_id).await?.ok_or_else(|| {
AgentOsRunError::Loop(AgentLoopError::StateError(format!(
"thread '{thread_id}' disappeared after parent_thread_id update",
)))
})?;
thread = refreshed.thread;
version = refreshed.version;
}
crate::runtime::context::trim_thread_to_latest_boundary(&mut thread);
if let Some(ref resource_id) = request.resource_id {
thread.resource_id = Some(resource_id.clone());
}
let mut deduped_messages = Self::dedup_messages(&thread, request.messages);
if !deduped_messages.is_empty() {
deduped_messages = Self::attach_run_metadata_to_messages(deduped_messages, &run_id);
thread = thread.with_messages(deduped_messages.clone());
}
let delta_messages: Vec<Arc<Message>> =
deduped_messages.into_iter().map(Arc::new).collect();
let mut delta_patches =
run_scope_cleanup_patches(&thread.state, &resolved.agent.state_scope_registry);
for cp in &delta_patches {
thread.state =
tirea_state::apply_patch(&thread.state, cp.patch()).map_err(|error| {
AgentOsRunError::Loop(AgentLoopError::StateError(format!(
"failed to apply run-scope cleanup patch for thread '{thread_id}': {error}"
)))
})?;
}
delta_patches.push(run_lifecycle_running_patch(&thread.state, &run_id)?);
let mut changeset = crate::contracts::ThreadChangeSet::from_parts(
run_id.clone(),
parent_run_id.clone(),
CheckpointReason::UserMessage,
delta_messages,
delta_patches.clone(),
Vec::new(),
state_snapshot_for_delta,
);
if persist_run {
changeset = changeset.with_run_meta(crate::contracts::RunMeta {
agent_id: request.agent_id.clone(),
origin: request.origin,
status: crate::contracts::storage::RunStatus::Running,
parent_thread_id: parent_thread_id.clone(),
termination_code: None,
termination_detail: None,
source_mailbox_entry_id: request.source_mailbox_entry_id.clone(),
input_tokens: 0,
output_tokens: 0,
});
}
let committed = agent_state_store
.append(&thread_id, &changeset, VersionPrecondition::Exact(version))
.await?;
version = committed.version;
thread = thread.with_patches(delta_patches);
thread.metadata.version = Some(version);
let mut run_identity = RunIdentity::new(
thread_id.clone(),
parent_thread_id.clone(),
run_id.clone(),
parent_run_id.clone(),
request.agent_id.clone(),
request.origin,
);
if let Some(parent_tool_call_id) = resolved.parent_tool_call_id.clone() {
run_identity = run_identity.with_parent_tool_call_id(parent_tool_call_id);
}
{
let ids = resolved.agent.behavior.behavior_ids();
let mut seen = std::collections::HashSet::with_capacity(ids.len());
for id in &ids {
if !seen.insert(*id) {
return Err(AgentOsRunError::Resolve(AgentOsResolveError::Wiring(
AgentOsWiringError::BehaviorAlreadyInstalled(id.to_string()),
)));
}
}
}
let run_ctx = RunContext::from_thread_with_registry_and_identity(
&thread,
resolved.run_policy,
run_identity.clone(),
resolved.agent.lattice_registry.clone(),
)
.map_err(|e| AgentOsRunError::Loop(AgentLoopError::StateError(e.to_string())))?;
let (decision_tx, decision_rx) = tokio::sync::mpsc::unbounded_channel();
for decision in initial_decisions {
decision_tx
.send(decision)
.map_err(|e| AgentOsRunError::Loop(AgentLoopError::StateError(e.to_string())))?;
}
Ok(PreparedRun {
agent: Arc::new(resolved.agent),
tools: resolved.tools,
run_ctx,
cancellation_token: None,
state_committer: Some(Arc::new(AgentStateStoreStateCommitter::new(
agent_state_store.clone(),
persist_run,
))),
decision_tx,
decision_rx,
})
}
pub fn execute_prepared(prepared: PreparedRun) -> Result<RunStream, AgentOsRunError> {
let thread_id = prepared.thread_id().to_string();
let run_id = prepared.run_id().to_string();
let run_identity = prepared.run_ctx.run_identity().clone();
let events = run_loop_stream_with_context(
prepared.agent,
prepared.tools,
prepared.run_ctx,
run_identity,
prepared.cancellation_token,
prepared.state_committer,
Some(prepared.decision_rx),
);
Ok(RunStream {
thread_id,
run_id,
decision_tx: prepared.decision_tx,
events,
})
}
pub async fn run_stream(&self, request: RunRequest) -> Result<RunStream, AgentOsRunError> {
let resolved = self.resolve(&request.agent_id)?;
let prepared = self.prepare_run(request, resolved).await?;
Self::execute_prepared(prepared)
}
#[cfg(feature = "handoff")]
pub async fn run(&self, request: RunRequest) -> Result<RunStream, AgentOsRunError> {
self.run_stream(request).await
}
fn dedup_messages(thread: &Thread, incoming: Vec<Message>) -> Vec<Message> {
use std::collections::HashSet;
let existing_ids: HashSet<&str> = thread
.messages
.iter()
.filter_map(|m| m.id.as_deref())
.collect();
let existing_tool_call_ids: HashSet<&str> = thread
.messages
.iter()
.filter_map(|m| m.tool_call_id.as_deref())
.collect();
incoming
.into_iter()
.filter(|m| {
if let Some(ref tc_id) = m.tool_call_id {
if existing_tool_call_ids.contains(tc_id.as_str()) {
return false;
}
}
if let Some(ref id) = m.id {
if existing_ids.contains(id.as_str()) {
return false;
}
}
true
})
.collect()
}
fn attach_run_metadata_to_messages(mut messages: Vec<Message>, run_id: &str) -> Vec<Message> {
messages.iter_mut().for_each(|message| {
let mut metadata = message.metadata.clone().unwrap_or_default();
metadata.run_id = Some(run_id.to_string());
message.metadata = Some(metadata);
});
messages
}
}