use anda_cloud_cdk::{ChallengeEnvelope, ChallengeRequest, TEEInfo, TEEKind};
use anda_core::{
Agent, AgentInput, AgentOutput, AgentSet, BoxError, Function, Json, Path, RequestMeta,
Resource, Tool, ToolInput, ToolOutput, ToolSet, validate_function_name,
};
use candid::Principal;
use ic_tee_cdk::AttestationRequest;
use object_store::memory::InMemory;
use std::{
collections::{BTreeSet, HashMap},
sync::{Arc, OnceLock, Weak},
};
use structured_logger::unix_ms;
use tokio_util::sync::{CancellationToken, WaitForCancellationFuture};
use crate::{
context::{
AgentCtx, BaseCtx, SubAgentManager, SubAgentSetManager, ToolsSearch, ToolsSelect,
Web3Client, Web3SDK,
},
hook::{Hook, Hooks},
management::{BaseManagement, Management, SYSTEM_PATH, Visibility},
model::{Model, Models},
store::Store,
};
pub use crate::context::{AgentInfo, EngineCard, RemoteEngineArgs, RemoteEngines};
/// Runtime that hosts the registered agents and tools and dispatches caller
/// requests to them (see `agent_run` and `tool_call`).
pub struct Engine {
/// Principal identifying this engine; requests whose metadata names a
/// different engine ID are rejected.
id: Principal,
/// Root agent context from which per-request child contexts are derived.
ctx: AgentCtx,
/// Descriptive information about the engine, exposed through `info()` and
/// `information()`.
info: AgentInfo,
/// Name of the agent used when an agent request carries an empty name.
default_agent: String,
/// Names of the agents that callers who are not managers may run.
export_agents: BTreeSet<String>,
/// Names of the tools that callers who are not managers may call.
export_tools: BTreeSet<String>,
/// Callbacks invoked before and after every agent run and tool call.
hooks: Arc<Hooks>,
/// Access policy: answers whether a caller is a manager and what
/// visibility applies to that caller.
management: Arc<dyn Management>,
}
impl Engine {
    /// Creates an [`EngineBuilder`] with its default configuration.
    pub fn builder() -> EngineBuilder {
        EngineBuilder::new()
    }

    /// Returns the principal that identifies this engine.
    pub fn id(&self) -> Principal {
        self.id
    }

    /// Returns a shared reference to the engine's descriptive information.
    pub fn info(&self) -> &AgentInfo {
        &self.info
    }

    /// Returns a mutable reference to the engine's descriptive information.
    pub fn info_mut(&mut self) -> &mut AgentInfo {
        &mut self.info
    }

    /// Returns a copy of the default agent's name.
    pub fn default_agent(&self) -> String {
        self.default_agent.clone()
    }

    /// Cancels the engine's cancellation token.
    pub fn cancel(&self) {
        self.ctx.base.cancellation_token.cancel()
    }

    /// Reports whether the engine's cancellation token has been cancelled.
    pub fn is_cancelled(&self) -> bool {
        self.ctx.base.cancellation_token.is_cancelled()
    }

    /// Returns a future that completes once the engine's cancellation token
    /// is cancelled.
    pub fn cancelled(&self) -> WaitForCancellationFuture<'_> {
        self.ctx.base.cancellation_token.cancelled()
    }

    /// Returns a child token of the engine's cancellation token.
    pub fn cancellation_token(&self) -> CancellationToken {
        self.ctx.base.cancellation_token.child_token()
    }

    /// Returns a handle to the model registry shared with the engine context.
    pub fn models(&self) -> Arc<Models> {
        self.ctx.models.clone()
    }

    /// Cancels the engine's token and waits until the cancellation is
    /// observable.
    ///
    /// This implementation always returns `Ok(())`.
    pub async fn close(&self) -> Result<(), BoxError> {
        self.ctx.base.cancellation_token.cancel();
        self.cancelled().await;
        Ok(())
    }

    /// Derives a child agent context for `caller` targeting `agent_name`.
    ///
    /// The agent name is lower-cased before any lookup.
    ///
    /// # Errors
    /// Returns `agent {name} not found` when the agent is not registered, or
    /// when it is not exported and `caller` is not a manager. Errors from the
    /// underlying `child_with` call are propagated.
    pub fn ctx_with(
        &self,
        caller: Principal,
        agent_name: &str,
        agent_label: &str,
        meta: RequestMeta,
    ) -> Result<AgentCtx, BoxError> {
        let name = agent_name.to_ascii_lowercase();
        if (!self.export_agents.contains(&name) && !self.management.is_manager(&caller))
            || !self.ctx.agents.contains(&name)
        {
            return Err(format!("agent {} not found", name).into());
        }

        self.ctx.child_with(caller, &name, agent_label, meta)
    }

    /// Validates the request metadata shared by agent runs and tool calls.
    ///
    /// # Errors
    /// - `invalid engine ID …` when the metadata names an engine other than
    ///   this one.
    /// - `invalid user name …` when a user name is present but is empty, has
    ///   leading/trailing whitespace, or is longer than 32 bytes.
    fn validate_request_meta(&self, meta: &RequestMeta) -> Result<(), BoxError> {
        if meta.engine.is_some() && meta.engine != Some(self.id) {
            return Err(format!(
                "invalid engine ID, expected {}, got {:?}",
                self.id.to_text(),
                meta.engine
            )
            .into());
        }

        if let Some(user) = &meta.user {
            let u = user.trim();
            // `u != user` rejects names that carry surrounding whitespace.
            if u.is_empty() || u != user || u.len() > 32 {
                return Err(format!("invalid user name {:?}", user).into());
            }
        }

        Ok(())
    }

    /// Runs an agent on behalf of `caller`.
    ///
    /// An empty `input.name` selects the default agent; any other name is
    /// lower-cased before lookup. The `on_agent_start` and `on_agent_end`
    /// hooks are invoked around the run, and `raw_history` is cleared on the
    /// output before it is returned.
    ///
    /// # Errors
    /// Fails when the request metadata is invalid, the agent is unknown or
    /// not accessible to `caller`, the visibility check fails, the caller is
    /// not a manager under `Visibility::Protected`, or a hook or the agent
    /// itself returns an error.
    pub async fn agent_run(
        &self,
        caller: Principal,
        mut input: AgentInput,
    ) -> Result<AgentOutput, BoxError> {
        let meta = input.meta.unwrap_or_default();
        self.validate_request_meta(&meta)?;

        input.name = if input.name.is_empty() {
            self.default_agent.clone()
        } else {
            input.name.to_ascii_lowercase()
        };

        let agent = self
            .ctx
            .agents
            .get(&input.name)
            .ok_or_else(|| format!("agent {} not found", input.name))?;

        let visibility = self.management.check_visibility(&caller)?;
        if visibility == Visibility::Protected && !self.management.is_manager(&caller) {
            return Err("caller does not have permission".into());
        }

        // `ctx_with` additionally enforces the export list for non-managers.
        let ctx = self.ctx_with(caller, &input.name, agent.label(), meta)?;
        self.hooks.on_agent_start(&ctx, &input.name).await?;
        let output = agent
            .run(ctx.clone(), input.prompt, input.resources)
            .await?;
        let mut output = self.hooks.on_agent_end(&ctx, &input.name, output).await?;
        output.raw_history.clear();
        Ok(output)
    }

    /// Calls a tool on behalf of `caller`.
    ///
    /// The tool name is used verbatim (it is not lower-cased). The
    /// `on_tool_start` and `on_tool_end` hooks are invoked around the call.
    ///
    /// # Errors
    /// Fails when the request metadata is invalid, the tool is unknown or is
    /// not exported while `caller` is not a manager, the visibility check
    /// fails, the caller is not a manager under `Visibility::Protected`, or a
    /// hook or the tool itself returns an error.
    pub async fn tool_call(
        &self,
        caller: Principal,
        input: ToolInput<Json>,
    ) -> Result<ToolOutput<Json>, BoxError> {
        let meta = input.meta.unwrap_or_default();
        self.validate_request_meta(&meta)?;

        if !self.export_tools.contains(&input.name) && !self.management.is_manager(&caller) {
            return Err(format!("tool {} not found", &input.name).into());
        }

        let tool = self
            .ctx
            .tools
            .get(&input.name)
            .ok_or_else(|| format!("tool {} not found", &input.name))?;

        let visibility = self.management.check_visibility(&caller)?;
        if visibility == Visibility::Protected && !self.management.is_manager(&caller) {
            return Err("caller does not have permission".into());
        }

        let ctx = self
            .ctx
            .child_base_with(caller, &self.default_agent, &input.name, meta)?;
        self.hooks.on_tool_start(&ctx, &input.name).await?;
        let output = tool.call(ctx.clone(), input.args, input.resources).await?;
        let res = self.hooks.on_tool_end(&ctx, &input.name, output).await?;
        Ok(res)
    }

    /// Returns the function definitions of registered agents, delegating to
    /// the agent set's `functions(names)`.
    // NOTE(review): `None` presumably selects every agent — confirm against
    // `AgentSet::functions`.
    pub fn agents(&self, names: Option<&[String]>) -> Vec<Function> {
        self.ctx.agents.functions(names)
    }

    /// Returns the function definitions of registered tools, delegating to
    /// the tool set's `functions(names)`.
    // NOTE(review): `None` presumably selects every tool — confirm against
    // `ToolSet::functions`.
    pub fn tools(&self, names: Option<&[String]>) -> Vec<Function> {
        self.ctx.tools.functions(names)
    }

    /// Returns a handle to the engine's sub-agent set manager.
    pub fn sub_agents_manager(&self) -> Arc<SubAgentSetManager> {
        self.ctx.subagents.clone()
    }

    /// Answers a challenge request with a signed envelope.
    ///
    /// The request is verified against the current time and its own
    /// `registry`, then its digest is signed. With a TEE-backed Web3 SDK the
    /// envelope also carries TEE information including an attestation whose
    /// nonce is the request's `code`; otherwise `tee` is `None`.
    ///
    /// # Errors
    /// Fails when verification or signing fails, when attestation fails, when
    /// TEE information is unavailable (`TEE not available`), or when the TEE
    /// kind cannot be converted.
    pub async fn challenge(
        &self,
        request: ChallengeRequest,
    ) -> Result<ChallengeEnvelope, BoxError> {
        let now_ms = unix_ms();
        request.verify(now_ms, request.registry)?;
        let message_digest = request.digest();

        let res = match self.ctx.base.web3.as_ref() {
            Web3SDK::Tee(cli) => {
                let authentication = cli.sign_envelope(message_digest).await?;
                let tee = cli
                    .sign_attestation(AttestationRequest {
                        public_key: Some(authentication.pubkey.clone()),
                        user_data: None,
                        nonce: Some(request.code.to_vec().into()),
                    })
                    .await?;
                let info = cli
                    .tee_info()
                    .ok_or_else(|| "TEE not available".to_string())?;
                ChallengeEnvelope {
                    request,
                    authentication,
                    tee: Some(TEEInfo {
                        id: info.id,
                        kind: TEEKind::try_from(tee.kind.as_str())?,
                        url: info.url,
                        attestation: Some(tee.attestation),
                    }),
                }
            }
            Web3SDK::Web3(Web3Client { client: cli }) => {
                let authentication = cli.sign_envelope(message_digest).await?;
                ChallengeEnvelope {
                    request,
                    authentication,
                    tee: None,
                }
            }
        };

        Ok(res)
    }

    /// Builds the engine card: ID, info, and the function definitions of the
    /// exported agents and exported tools.
    pub fn information(&self) -> EngineCard {
        let exported_agents: Vec<String> = self.export_agents.iter().cloned().collect();
        let exported_tools: Vec<String> = self.export_tools.iter().cloned().collect();

        EngineCard {
            id: self.id,
            info: self.info.clone(),
            agents: self.agents(Some(exported_agents.as_slice())),
            tools: self.tools(Some(exported_tools.as_slice())),
        }
    }
}
/// Builder that collects the configuration, tools and agents needed to
/// construct an [`Engine`].
#[non_exhaustive]
pub struct EngineBuilder {
/// Descriptive information for the engine; `build` validates it.
info: AgentInfo,
/// Tools registered so far.
tools: ToolSet<BaseCtx>,
/// Agents registered so far.
agents: AgentSet<AgentCtx>,
/// Remote engine arguments, keyed by their endpoint.
remote: HashMap<String, RemoteEngineArgs>,
/// Shared model registry handed to the engine context.
models: Arc<Models>,
/// Store handed to the engine's base context.
store: Store,
/// Web3 SDK; also supplies the engine's principal.
web3: Arc<Web3SDK>,
/// Callbacks the engine invokes around agent runs and tool calls.
hooks: Arc<Hooks>,
/// Cancellation token handed to the engine's base context.
cancellation_token: CancellationToken,
/// Names of agents to export (stored lower-cased by `export_agents`).
export_agents: BTreeSet<String>,
/// Names of tools to export (stored as given by `export_tools`).
export_tools: BTreeSet<String>,
/// Optional access policy; when absent, a `BaseManagement` controlled by
/// the engine's own principal with `Visibility::Private` is used.
management: Option<Arc<dyn Management>>,
}
impl Default for EngineBuilder {
fn default() -> Self {
Self::new()
}
}
impl EngineBuilder {
    /// Creates a builder with the default configuration.
    ///
    /// The defaults are: the built-in `ToolsSearch` and `ToolsSelect` agents
    /// (both added with the label `"flash"`), no tools, a store backed by an
    /// in-memory object store, a not-implemented Web3 client, fresh hooks, a
    /// fresh cancellation token, empty export sets and no management.
    ///
    /// # Panics
    /// Panics if either built-in agent cannot be added to the new agent set.
    pub fn new() -> Self {
        let mstore = Arc::new(InMemory::new());
        let mut agents = AgentSet::new();
        agents
            .add(Arc::new(ToolsSearch::new()), Some("flash".to_string()))
            .expect("failed to register the built-in ToolsSearch agent");
        agents
            .add(Arc::new(ToolsSelect::new()), Some("flash".to_string()))
            .expect("failed to register the built-in ToolsSelect agent");

        EngineBuilder {
            info: AgentInfo {
                handle: "anda".to_string(),
                name: "Anda Engine".to_string(),
                description: "Anda Engine for managing agents and tools".to_string(),
                endpoint: "https://localhost:8443/default".to_string(),
                ..Default::default()
            },
            tools: ToolSet::new(),
            agents,
            remote: HashMap::new(),
            models: Arc::new(Models::default()),
            store: Store::new(mstore),
            web3: Arc::new(Web3SDK::Web3(Web3Client::not_implemented())),
            hooks: Arc::new(Hooks::new()),
            cancellation_token: CancellationToken::new(),
            export_agents: BTreeSet::new(),
            export_tools: BTreeSet::new(),
            management: None,
        }
    }

    /// Replaces the engine information.
    pub fn with_info(mut self, info: AgentInfo) -> Self {
        self.info = info;
        self
    }

    /// Replaces the cancellation token.
    pub fn with_cancellation_token(mut self, cancellation_token: CancellationToken) -> Self {
        self.cancellation_token = cancellation_token;
        self
    }

    /// Replaces the Web3 SDK.
    pub fn with_web3_client(mut self, web3: Arc<Web3SDK>) -> Self {
        self.web3 = web3;
        self
    }

    /// Calls `set_model` on the current model registry.
    pub fn with_model(self, model: Model) -> Self {
        self.models.set_model(model);
        self
    }

    /// Calls `set_fallback_model` on the current model registry.
    pub fn with_fallback_model(self, model: Model) -> Self {
        self.models.set_fallback_model(model);
        self
    }

    /// Calls `set_models` on the current model registry.
    pub fn with_models(self, models: HashMap<String, Model>) -> Self {
        self.models.set_models(models);
        self
    }

    /// Replaces the model registry itself (unlike `with_models`, which
    /// updates the registry already held by the builder).
    pub fn set_models(mut self, models: Arc<Models>) -> Self {
        self.models = models;
        self
    }

    /// Replaces the store.
    pub fn with_store(mut self, store: Store) -> Self {
        self.store = store;
        self
    }

    /// Sets the access policy used by the engine.
    pub fn with_management(mut self, management: Arc<dyn Management>) -> Self {
        self.management = Some(management);
        self
    }

    /// Registers a single tool.
    ///
    /// # Errors
    /// Propagates the error returned by the tool set's `add`.
    pub fn register_tool<T>(mut self, tool: Arc<T>) -> Result<Self, BoxError>
    where
        T: Tool<BaseCtx> + Send + Sync + 'static,
    {
        self.tools.add(tool)?;
        Ok(self)
    }

    /// Registers every tool of `tools`.
    ///
    /// # Errors
    /// Returns `tool {name} already exists` on the first name that is already
    /// registered.
    pub fn register_tools(mut self, tools: ToolSet<BaseCtx>) -> Result<Self, BoxError> {
        for (name, tool) in tools.set {
            if self.tools.set.contains_key(&name) {
                return Err(format!("tool {} already exists", name).into());
            }
            self.tools.set.insert(name, tool);
        }

        Ok(self)
    }

    /// Registers a single agent with an optional label.
    ///
    /// Every name in the agent's `tool_dependencies()` must already be
    /// registered, either as a tool or as an agent.
    ///
    /// # Errors
    /// Returns `dependent tool {name} not found` for a missing dependency and
    /// propagates the error returned by the agent set's `add`.
    pub fn register_agent<T>(
        mut self,
        agent: Arc<T>,
        label: Option<String>,
    ) -> Result<Self, BoxError>
    where
        T: Agent<AgentCtx> + Send + Sync + 'static,
    {
        for tool in agent.tool_dependencies() {
            if !self.tools.contains(&tool) && !self.agents.contains(&tool) {
                return Err(format!("dependent tool {} not found", tool).into());
            }
        }

        self.agents.add(agent, label)?;
        Ok(self)
    }

    /// Registers every agent of `agents`.
    ///
    /// # Errors
    /// Returns `agent {name} already exists` for a duplicate name and
    /// `dependent tool {name} not found` for a missing dependency.
    pub fn register_agents(mut self, agents: AgentSet<AgentCtx>) -> Result<Self, BoxError> {
        for (name, agent) in agents.set {
            if self.agents.set.contains_key(&name) {
                return Err(format!("agent {} already exists", name).into());
            }

            // NOTE(review): dependencies are checked against what has been
            // registered so far, so an agent depending on another agent of
            // the same batch only passes if that agent was iterated earlier —
            // confirm this ordering requirement is intended.
            for tool in agent.tool_dependencies() {
                if !self.tools.contains(&tool) && !self.agents.contains(&tool) {
                    return Err(format!("dependent tool {} not found", tool).into());
                }
            }
            self.agents.set.insert(name, agent);
        }

        Ok(self)
    }

    /// Registers a remote engine, keyed by its endpoint.
    ///
    /// # Errors
    /// Returns `remote engine {endpoint} already exists` for a duplicate
    /// endpoint, and `invalid engine handle …` when a handle is present but
    /// fails `validate_function_name`.
    pub fn register_remote_engine(mut self, engine: RemoteEngineArgs) -> Result<Self, BoxError> {
        if self.remote.contains_key(&engine.endpoint) {
            return Err(format!("remote engine {} already exists", engine.endpoint).into());
        }

        if let Some(handle) = &engine.handle {
            validate_function_name(handle)
                .map_err(|err| format!("invalid engine handle {}: {}", handle, err))?;
        }

        self.remote.insert(engine.endpoint.clone(), engine);
        Ok(self)
    }

    /// Adds agent names to the export set; names are lower-cased first.
    pub fn export_agents(mut self, agents: Vec<String>) -> Self {
        self.export_agents
            .extend(agents.into_iter().map(|mut name| {
                name.make_ascii_lowercase();
                name
            }));
        self
    }

    /// Adds tool names to the export set; names are stored exactly as given.
    pub fn export_tools(mut self, tools: Vec<String>) -> Self {
        self.export_tools.extend(tools);
        self
    }

    /// Replaces the hooks.
    pub fn with_hooks(mut self, hooks: Arc<Hooks>) -> Self {
        self.hooks = hooks;
        self
    }

    /// Collects the context paths for everything registered so far: `T:{name}`
    /// for each tool, `A:{name}` for each agent, plus `SYSTEM_PATH`.
    fn function_paths(&self) -> BTreeSet<Path> {
        let tool_paths = self
            .tools
            .set
            .keys()
            .map(|p| Path::from(format!("T:{}", p)));
        let agent_paths = self
            .agents
            .set
            .keys()
            .map(|p| Path::from(format!("A:{}", p)));

        let mut paths: BTreeSet<Path> = tool_paths.chain(agent_paths).collect();
        paths.insert(Path::from(SYSTEM_PATH));
        paths
    }

    /// Builds the fallback access policy used when none was configured:
    /// `controller` is the only authority and visibility is private.
    fn default_management(controller: Principal) -> Arc<dyn Management> {
        Arc::new(BaseManagement {
            controller,
            managers: BTreeSet::new(),
            visibility: Visibility::Private,
        })
    }

    /// Builds an engine synchronously, without a default agent.
    ///
    /// Compared with `build`, this does not validate the engine info, does
    /// not register remote engines, does not load the sub-agent manager and
    /// does not call `init` on tools or agents. The resulting engine's
    /// default agent name is empty.
    ///
    /// # Panics
    /// Panics if the built-in `SubAgentManager` tool cannot be added to the
    /// tool set.
    pub fn empty(mut self) -> Engine {
        let id = self.web3.as_ref().get_principal();
        // Paths are collected before the sub-agent manager tool is added.
        let names = self.function_paths();

        let ctx = BaseCtx::new(
            id,
            self.info.name.clone(),
            "".to_string(),
            self.cancellation_token,
            names,
            self.web3,
            self.store,
            Arc::new(RemoteEngines::new()),
        );

        let subagent_manager = Arc::new(SubAgentManager::new(ctx.clone()));
        self.tools
            .add(subagent_manager.clone())
            .expect("failed to register the built-in SubAgentManager tool");
        let subagents = SubAgentSetManager::new();
        subagents.insert(subagent_manager);

        let ctx = AgentCtx::new(
            ctx,
            self.models,
            Arc::new(self.tools),
            Arc::new(self.agents),
            Arc::new(subagents),
        );

        Engine {
            id,
            ctx,
            info: self.info,
            default_agent: String::new(),
            export_agents: self.export_agents,
            export_tools: self.export_tools,
            hooks: self.hooks,
            management: self
                .management
                .unwrap_or_else(|| Self::default_management(id)),
        }
    }

    /// Builds the engine with `default_agent` as its default agent.
    ///
    /// The default agent name is lower-cased and added to the exported
    /// agents. Remote engines are registered, the sub-agent manager is loaded
    /// and added as a tool, and `init` is called on every tool and then on
    /// every agent with the engine's own principal as the caller.
    ///
    /// # Errors
    /// Fails when the default agent is not registered
    /// (`default agent {name} not found`), when the engine info is invalid,
    /// or when remote registration, sub-agent loading, adding the sub-agent
    /// manager tool, context creation, or any `init` call fails.
    pub async fn build(mut self, default_agent: String) -> Result<Engine, BoxError> {
        let default_agent = default_agent.to_ascii_lowercase();
        if !self.agents.contains(&default_agent) {
            return Err(format!("default agent {} not found", default_agent).into());
        }

        self.export_agents.insert(default_agent.clone());
        self.info.validate()?;

        let id = self.web3.as_ref().get_principal();
        // Paths are collected before the sub-agent manager tool is added.
        let names = self.function_paths();

        let mut remote = RemoteEngines::new();
        for (_, engine) in self.remote {
            remote.register(self.web3.as_ref(), engine).await?;
        }

        let ctx = BaseCtx::new(
            id,
            self.info.name.clone(),
            default_agent.clone(),
            self.cancellation_token,
            names,
            self.web3,
            self.store,
            Arc::new(remote),
        );

        let subagent_manager = Arc::new(SubAgentManager::new(ctx.clone()));
        subagent_manager.load().await?;
        self.tools.add(subagent_manager.clone())?;
        let subagents = SubAgentSetManager::new();
        subagents.insert(subagent_manager);

        let tools = Arc::new(self.tools);
        let agents = Arc::new(self.agents);
        let ctx = AgentCtx::new(
            ctx,
            self.models,
            tools.clone(),
            agents.clone(),
            Arc::new(subagents),
        );

        let meta = RequestMeta::default();
        for (name, tool) in &tools.set {
            let ct = ctx.child_base_with(id, &default_agent, name, meta.clone())?;
            tool.init(ct).await?;
        }

        for (name, agent) in &agents.set {
            let ct = ctx.child_with(id, name, agent.label(), meta.clone())?;
            agent.init(ct).await?;
        }

        Ok(Engine {
            id,
            ctx,
            info: self.info,
            default_agent,
            export_agents: self.export_agents,
            export_tools: self.export_tools,
            hooks: self.hooks,
            management: self
                .management
                .unwrap_or_else(|| Self::default_management(id)),
        })
    }

    /// Builds a stand-alone agent context for tests.
    ///
    /// The base context uses the anonymous principal, passes `"Mocker"` for
    /// both string arguments of `BaseCtx::new`, and has no remote engines.
    ///
    /// # Panics
    /// Panics if the built-in `SubAgentManager` tool cannot be added to the
    /// tool set.
    pub fn mock_ctx(mut self) -> AgentCtx {
        // Paths are collected before the sub-agent manager tool is added.
        let names = self.function_paths();

        let ctx = BaseCtx::new(
            Principal::anonymous(),
            "Mocker".to_string(),
            "Mocker".to_string(),
            self.cancellation_token,
            names,
            self.web3,
            self.store,
            Arc::new(RemoteEngines::new()),
        );

        let subagent_manager = Arc::new(SubAgentManager::new(ctx.clone()));
        self.tools
            .add(subagent_manager.clone())
            .expect("failed to register the built-in SubAgentManager tool");
        let subagents = SubAgentSetManager::new();
        subagents.insert(subagent_manager);

        AgentCtx::new(
            ctx,
            self.models,
            Arc::new(self.tools),
            Arc::new(self.agents),
            Arc::new(subagents),
        )
    }
}
/// Agent whose `run` always answers with a fixed string prepared in `new`
/// from the given [`AgentInfo`].
pub struct EchoEngineInfo {
/// Source of the agent's name (`handle`) and description.
info: AgentInfo,
/// Result of `serde_json::to_string(&info)` computed in `new`; empty if
/// serialization failed.
content: String,
}
impl EchoEngineInfo {
    /// Wraps `info` and caches its JSON serialization as the reply content.
    ///
    /// If serialization fails the cached content is the empty string.
    pub fn new(info: AgentInfo) -> Self {
        Self {
            // Serialized first, while `info` is still only borrowed.
            content: serde_json::to_string(&info).unwrap_or_default(),
            info,
        }
    }
}
impl Agent<AgentCtx> for EchoEngineInfo {
    /// The agent's name is the `handle` of the wrapped info.
    fn name(&self) -> String {
        let handle = &self.info.handle;
        handle.clone()
    }

    /// The agent's description is the `description` of the wrapped info.
    fn description(&self) -> String {
        let description = &self.info.description;
        description.clone()
    }

    /// Ignores the context, prompt and resources and replies with the cached
    /// content; every other output field keeps its default value.
    async fn run(
        &self,
        _ctx: AgentCtx,
        _prompt: String,
        _resources: Vec<Resource>,
    ) -> Result<AgentOutput, BoxError> {
        let content = self.content.clone();
        let output = AgentOutput {
            content,
            ..Default::default()
        };
        Ok(output)
    }
}
/// Late-bound, non-owning handle to an [`Engine`]: created empty, bound via
/// `bind`, and resolved via `get`.
pub struct EngineRef {
/// Weak engine reference stored by the first successful `bind`.
inner: OnceLock<Weak<Engine>>,
}
impl Default for EngineRef {
fn default() -> Self {
Self::new()
}
}
impl EngineRef {
    /// Creates an unbound reference; `get` returns `None` until `bind` is
    /// called.
    pub fn new() -> Self {
        let inner = OnceLock::new();
        Self { inner }
    }

    /// Binds the reference to `engine`.
    ///
    /// Only the first call has an effect; later calls are silently ignored.
    pub fn bind(&self, engine: Weak<Engine>) {
        // `set` fails when a value is already stored; that failure is
        // deliberately discarded so the first binding stays in place.
        let _ = self.inner.set(engine);
    }

    /// Returns the bound engine, or `None` if nothing was bound or the
    /// engine has already been dropped.
    pub fn get(&self) -> Option<Arc<Engine>> {
        match self.inner.get() {
            Some(weak) => weak.upgrade(),
            None => None,
        }
    }
}