use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use futures::Stream;
use genai::Client;
use crate::contracts::plugin::AgentPlugin;
use crate::contracts::storage::{
AgentStateHead, AgentStateStore, AgentStateStoreError, VersionPrecondition,
};
use crate::contracts::thread::CheckpointReason;
use crate::contracts::thread::Message;
use crate::contracts::thread::Thread;
use crate::contracts::tool::Tool;
use crate::contracts::RunContext;
use crate::contracts::{AgentEvent, RunRequest};
use crate::extensions::skills::{
CompositeSkillRegistry, InMemorySkillRegistry, Skill, SkillDiscoveryPlugin, SkillError,
SkillPlugin, SkillRegistry, SkillRegistryError, SkillRegistryManagerError, SkillRuntimePlugin,
SkillSubsystem, SkillSubsystemError,
};
use crate::runtime::loop_runner::{
AgentConfig, AgentLoopError, RunCancellationToken, StateCommitError, StateCommitter,
};
mod agent_definition;
pub(crate) mod agent_tools;
mod builder;
mod composition;
mod policy;
mod run;
mod wiring;
#[cfg(test)]
mod tests;
pub use agent_definition::AgentDefinition;
use agent_tools::{
AgentRecoveryPlugin, AgentRunManager, AgentRunTool, AgentStopTool, AgentToolsPlugin,
};
pub use composition::{
AgentRegistry, AgentRegistryError, BundleComposeError, BundleComposer,
BundleRegistryAccumulator, BundleRegistryKind, CompositeAgentRegistry, CompositeModelRegistry,
CompositePluginRegistry, CompositeProviderRegistry, CompositeStopPolicyRegistry,
CompositeToolRegistry, InMemoryAgentRegistry, InMemoryModelRegistry, InMemoryPluginRegistry,
InMemoryProviderRegistry, InMemoryStopPolicyRegistry, InMemoryToolRegistry, ModelDefinition,
ModelRegistry, ModelRegistryError, PluginRegistry, PluginRegistryError, ProviderRegistry,
ProviderRegistryError, RegistryBundle, RegistrySet, StopPolicyRegistry,
StopPolicyRegistryError, ToolPluginBundle, ToolRegistry, ToolRegistryError,
};
pub use crate::runtime::loop_runner::ResolvedRun;
#[derive(Clone)]
struct AgentStateStoreStateCommitter {
agent_state_store: Arc<dyn AgentStateStore>,
}
impl AgentStateStoreStateCommitter {
fn new(agent_state_store: Arc<dyn AgentStateStore>) -> Self {
Self { agent_state_store }
}
}
#[async_trait::async_trait]
impl StateCommitter for AgentStateStoreStateCommitter {
async fn commit(
&self,
thread_id: &str,
changeset: crate::contracts::ThreadChangeSet,
precondition: VersionPrecondition,
) -> Result<u64, StateCommitError> {
self.agent_state_store
.append(thread_id, &changeset, precondition)
.await
.map(|committed| committed.version)
.map_err(|e| StateCommitError::new(format!("checkpoint append failed: {e}")))
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SkillsMode {
Disabled,
DiscoveryAndRuntime,
DiscoveryOnly,
RuntimeOnly,
}
#[derive(Debug, Clone)]
pub struct SkillsConfig {
pub mode: SkillsMode,
pub discovery_max_entries: usize,
pub discovery_max_chars: usize,
}
impl Default for SkillsConfig {
fn default() -> Self {
Self {
mode: SkillsMode::Disabled,
discovery_max_entries: 32,
discovery_max_chars: 16 * 1024,
}
}
}
#[derive(Debug, Clone)]
pub struct AgentToolsConfig {
pub discovery_max_entries: usize,
pub discovery_max_chars: usize,
}
impl Default for AgentToolsConfig {
fn default() -> Self {
Self {
discovery_max_entries: 64,
discovery_max_chars: 16 * 1024,
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum AgentOsWiringError {
#[error("reserved plugin id cannot be used: {0}")]
ReservedPluginId(String),
#[error("plugin not found: {0}")]
PluginNotFound(String),
#[error("stop condition not found: {0}")]
StopConditionNotFound(String),
#[error("plugin id already installed: {0}")]
PluginAlreadyInstalled(String),
#[error("skills tool id already registered: {0}")]
SkillsToolIdConflict(String),
#[error("skills plugin already installed: {0}")]
SkillsPluginAlreadyInstalled(String),
#[error("skills enabled but no skills configured")]
SkillsNotConfigured,
#[error("agent tool id already registered: {0}")]
AgentToolIdConflict(String),
#[error("agent tools plugin already installed: {0}")]
AgentToolsPluginAlreadyInstalled(String),
#[error("agent recovery plugin already installed: {0}")]
AgentRecoveryPluginAlreadyInstalled(String),
#[error("bundle '{bundle_id}' includes unsupported contribution in wiring: {kind}")]
BundleUnsupportedContribution { bundle_id: String, kind: String },
#[error("bundle '{bundle_id}' tool id already registered: {id}")]
BundleToolIdConflict { bundle_id: String, id: String },
#[error("bundle '{bundle_id}' plugin id mismatch: key={key} plugin.id()={plugin_id}")]
BundlePluginIdMismatch {
bundle_id: String,
key: String,
plugin_id: String,
},
}
#[derive(Debug, thiserror::Error)]
pub enum AgentOsBuildError {
#[error(transparent)]
Agents(#[from] AgentRegistryError),
#[error(transparent)]
Bundle(#[from] BundleComposeError),
#[error(transparent)]
Tools(#[from] ToolRegistryError),
#[error(transparent)]
Plugins(#[from] PluginRegistryError),
#[error(transparent)]
Providers(#[from] ProviderRegistryError),
#[error(transparent)]
Models(#[from] ModelRegistryError),
#[error(transparent)]
Skills(#[from] SkillError),
#[error(transparent)]
SkillRegistry(#[from] SkillRegistryError),
#[error(transparent)]
SkillRegistryManager(#[from] SkillRegistryManagerError),
#[error(transparent)]
StopPolicies(#[from] StopPolicyRegistryError),
#[error("agent {agent_id} references an empty plugin id")]
AgentEmptyPluginRef { agent_id: String },
#[error("agent {agent_id} references reserved plugin id: {plugin_id}")]
AgentReservedPluginId { agent_id: String, plugin_id: String },
#[error("agent {agent_id} references unknown plugin id: {plugin_id}")]
AgentPluginNotFound { agent_id: String, plugin_id: String },
#[error("agent {agent_id} has duplicate plugin reference: {plugin_id}")]
AgentDuplicatePluginRef { agent_id: String, plugin_id: String },
#[error("agent {agent_id} references an empty stop condition id")]
AgentEmptyStopConditionRef { agent_id: String },
#[error("agent {agent_id} references unknown stop condition id: {stop_condition_id}")]
AgentStopConditionNotFound {
agent_id: String,
stop_condition_id: String,
},
#[error("agent {agent_id} has duplicate stop condition reference: {stop_condition_id}")]
AgentDuplicateStopConditionRef {
agent_id: String,
stop_condition_id: String,
},
#[error("models configured but no ProviderRegistry configured")]
ProvidersNotConfigured,
#[error("provider not found: {provider_id} (for model id: {model_id})")]
ProviderNotFound {
provider_id: String,
model_id: String,
},
#[error("skills enabled but no skills configured")]
SkillsNotConfigured,
}
#[derive(Debug, thiserror::Error)]
pub enum AgentOsResolveError {
#[error("agent not found: {0}")]
AgentNotFound(String),
#[error("model not found: {0}")]
ModelNotFound(String),
#[error("provider not found: {provider_id} (for model id: {model_id})")]
ProviderNotFound {
provider_id: String,
model_id: String,
},
#[error(transparent)]
Wiring(#[from] AgentOsWiringError),
}
#[derive(Debug, thiserror::Error)]
pub enum AgentOsRunError {
#[error(transparent)]
Resolve(#[from] AgentOsResolveError),
#[error(transparent)]
Loop(#[from] AgentLoopError),
#[error("agent state store error: {0}")]
AgentStateStore(#[from] AgentStateStoreError),
#[error("agent state store not configured")]
AgentStateStoreNotConfigured,
}
pub struct RunStream {
pub thread_id: String,
pub run_id: String,
pub events: Pin<Box<dyn Stream<Item = AgentEvent> + Send>>,
}
pub struct PreparedRun {
pub thread_id: String,
pub run_id: String,
config: AgentConfig,
tools: HashMap<String, Arc<dyn Tool>>,
run_ctx: RunContext,
cancellation_token: Option<RunCancellationToken>,
state_committer: Option<Arc<dyn StateCommitter>>,
}
impl PreparedRun {
#[must_use]
pub fn with_cancellation_token(mut self, token: RunCancellationToken) -> Self {
self.cancellation_token = Some(token);
self
}
}
#[derive(Clone)]
pub struct AgentOs {
default_client: Client,
agents: Arc<dyn AgentRegistry>,
base_tools: Arc<dyn ToolRegistry>,
plugins: Arc<dyn PluginRegistry>,
providers: Arc<dyn ProviderRegistry>,
models: Arc<dyn ModelRegistry>,
stop_policies: Arc<dyn StopPolicyRegistry>,
skills_registry: Option<Arc<dyn SkillRegistry>>,
skills_config: SkillsConfig,
agent_runs: Arc<AgentRunManager>,
agent_tools: AgentToolsConfig,
agent_state_store: Option<Arc<dyn AgentStateStore>>,
}
#[derive(Clone)]
pub struct AgentOsBuilder {
client: Option<Client>,
bundles: Vec<Arc<dyn RegistryBundle>>,
agents: HashMap<String, AgentDefinition>,
agent_registries: Vec<Arc<dyn AgentRegistry>>,
base_tools: HashMap<String, Arc<dyn Tool>>,
base_tool_registries: Vec<Arc<dyn ToolRegistry>>,
plugins: HashMap<String, Arc<dyn AgentPlugin>>,
plugin_registries: Vec<Arc<dyn PluginRegistry>>,
stop_policies: HashMap<String, Arc<dyn crate::contracts::runtime::StopPolicy>>,
stop_policy_registries: Vec<Arc<dyn StopPolicyRegistry>>,
providers: HashMap<String, Client>,
provider_registries: Vec<Arc<dyn ProviderRegistry>>,
models: HashMap<String, ModelDefinition>,
model_registries: Vec<Arc<dyn ModelRegistry>>,
skills: Vec<Arc<dyn Skill>>,
skill_registries: Vec<Arc<dyn SkillRegistry>>,
skills_refresh_interval: Option<Duration>,
skills_config: SkillsConfig,
agent_tools: AgentToolsConfig,
agent_state_store: Option<Arc<dyn AgentStateStore>>,
}