use super::*;
use crate::contracts::storage::VersionPrecondition;
use crate::runtime::loop_runner::run_loop_stream;
impl AgentOs {
/// Returns a borrowed handle to the configured agent state store, or `None` when
/// this `AgentOs` has no store set.
pub fn agent_state_store(&self) -> Option<&Arc<dyn AgentStateStore>> {
    Option::as_ref(&self.agent_state_store)
}
/// Returns the configured agent state store, treating a missing store as an error.
///
/// # Errors
///
/// Returns [`AgentOsRunError::AgentStateStoreNotConfigured`] when no store is set.
fn require_agent_state_store(&self) -> Result<&Arc<dyn AgentStateStore>, AgentOsRunError> {
    match self.agent_state_store() {
        Some(store) => Ok(store),
        None => Err(AgentOsRunError::AgentStateStoreNotConfigured),
    }
}
/// Produces a fresh identifier string: a version-7 UUID rendered through
/// `Uuid::simple()`.
fn generate_id() -> String {
    let fresh = uuid::Uuid::now_v7();
    fresh.simple().to_string()
}
/// Loads the stored head for the agent state identified by `id`.
///
/// Returns `Ok(None)` when the store reports no entry for `id`.
///
/// # Errors
///
/// Fails with [`AgentOsRunError::AgentStateStoreNotConfigured`] when no store is
/// configured, and propagates any error returned by the store's `load`.
pub async fn load_agent_state(
    &self,
    id: &str,
) -> Result<Option<AgentStateHead>, AgentOsRunError> {
    let store = self.require_agent_state_store()?;
    let head = store.load(id).await?;
    Ok(head)
}
pub async fn prepare_run(
&self,
mut request: RunRequest,
mut resolved: ResolvedRun,
) -> Result<PreparedRun, AgentOsRunError> {
let agent_state_store = self.require_agent_state_store()?;
let thread_id = request.thread_id.unwrap_or_else(Self::generate_id);
let run_id = request.run_id.unwrap_or_else(Self::generate_id);
let parent_run_id = request.parent_run_id.clone();
let frontend_state = request.state.take();
let mut state_snapshot_for_delta: Option<serde_json::Value> = None;
let (mut thread, mut version) = match agent_state_store.load(&thread_id).await? {
Some(head) => {
let mut t = head.agent_state;
if let Some(state) = frontend_state {
t.state = state.clone();
t.patches.clear();
state_snapshot_for_delta = Some(state);
}
(t, head.version)
}
None => {
let thread = if let Some(state) = frontend_state {
Thread::with_initial_state(thread_id.clone(), state)
} else {
Thread::new(thread_id.clone())
};
let committed = agent_state_store.create(&thread).await?;
(thread, committed.version)
}
};
if let Some(ref resource_id) = request.resource_id {
thread.resource_id = Some(resource_id.clone());
}
let deduped = Self::dedup_messages(&thread, request.messages);
if !deduped.is_empty() {
thread = thread.with_messages(deduped);
}
let pending = thread.take_pending();
if !pending.is_empty() || state_snapshot_for_delta.is_some() {
let changeset = crate::contracts::ThreadChangeSet::from_parts(
run_id.clone(),
parent_run_id.clone(),
CheckpointReason::UserMessage,
pending.messages,
pending.patches,
state_snapshot_for_delta,
);
let committed = agent_state_store
.append(&thread_id, &changeset, VersionPrecondition::Exact(version))
.await?;
version = committed.version;
}
thread.metadata.version = Some(version);
let _ = resolved.run_config.set("run_id", run_id.clone());
if let Some(parent) = parent_run_id.as_deref() {
let _ = resolved.run_config.set("parent_run_id", parent.to_string());
}
Self::ensure_unique_plugin_ids(&resolved.config.plugins)
.map_err(AgentOsResolveError::from)
.map_err(AgentOsRunError::from)?;
let run_ctx = RunContext::from_thread(&thread, resolved.run_config)
.map_err(|e| AgentOsRunError::Loop(AgentLoopError::StateError(e.to_string())))?;
Ok(PreparedRun {
thread_id,
run_id,
config: resolved.config,
tools: resolved.tools,
run_ctx,
cancellation_token: None,
state_committer: Some(Arc::new(AgentStateStoreStateCommitter::new(
agent_state_store.clone(),
))),
})
}
pub fn execute_prepared(prepared: PreparedRun) -> Result<RunStream, AgentOsRunError> {
let events = run_loop_stream(
prepared.config,
prepared.tools,
prepared.run_ctx,
prepared.cancellation_token,
prepared.state_committer,
);
Ok(RunStream {
thread_id: prepared.thread_id,
run_id: prepared.run_id,
events,
})
}
/// Resolves the agent named by `request.agent_id`, prepares the run, and starts it.
///
/// # Errors
///
/// Propagates failures from `resolve`, `prepare_run` and `execute_prepared`.
pub async fn run_stream(&self, request: RunRequest) -> Result<RunStream, AgentOsRunError> {
    let prepared = {
        let resolved = self.resolve(&request.agent_id)?;
        self.prepare_run(request, resolved).await?
    };
    Self::execute_prepared(prepared)
}
fn dedup_messages(thread: &Thread, incoming: Vec<Message>) -> Vec<Message> {
use std::collections::HashSet;
let existing_ids: HashSet<&str> = thread
.messages
.iter()
.filter_map(|m| m.id.as_deref())
.collect();
let existing_tool_call_ids: HashSet<&str> = thread
.messages
.iter()
.filter_map(|m| m.tool_call_id.as_deref())
.collect();
incoming
.into_iter()
.filter(|m| {
if let Some(ref tc_id) = m.tool_call_id {
if existing_tool_call_ids.contains(tc_id.as_str()) {
return false;
}
}
if let Some(ref id) = m.id {
if existing_ids.contains(id.as_str()) {
return false;
}
}
true
})
.collect()
}
pub(crate) fn run_stream_with_context(
&self,
agent_id: &str,
thread: Thread,
cancellation_token: Option<RunCancellationToken>,
state_committer: Option<Arc<dyn StateCommitter>>,
) -> Result<impl futures::Stream<Item = AgentEvent> + Send, AgentOsRunError> {
let resolved = self.resolve(agent_id)?;
let run_ctx = RunContext::from_thread(&thread, resolved.run_config)
.map_err(|e| AgentOsRunError::Loop(AgentLoopError::StateError(e.to_string())))?;
Ok(run_loop_stream(
resolved.config,
resolved.tools,
run_ctx,
cancellation_token,
state_committer,
))
}
}