use anda_cognitive_nexus::ConceptPK;
use anda_core::{
Agent, AgentContext, AgentOutput, BoxError, CompletionFeatures, CompletionRequest, Document,
Documents, FunctionDefinition, Json, Message, Resource, StateFeatures, Tool, ToolOutput,
estimate_tokens,
};
use anda_engine::{
context::{AgentCtx, BaseCtx},
extension::note::{NoteTool, load_notes},
local_date_hour,
memory::{
Conversation, ConversationRef, ConversationStatus, Conversations, MemoryManagement,
MemoryReadonly,
},
unix_ms,
};
use parking_lot::RwLock;
use serde_json::json;
use std::{
collections::VecDeque,
sync::{Arc, LazyLock},
time::Duration,
};
use tokio::time::timeout;
use anda_kip::{KipError, KipErrorCode, Request, Response};
use super::{BrainHook, SELF_USER_ID};
use crate::types::RecallInput;
/// System instructions for the recall agent, embedded at compile time from
/// `assets/BrainRecall.md`. They form the first section of the completion
/// instructions built in `RecallAgent::run`.
const SELF_INSTRUCTIONS: &str = include_str!("../../assets/BrainRecall.md");
/// Time limit applied to each best-effort context lookup performed before the
/// completion (the counterparty profile and the `DESCRIBE PRIMER` result).
/// When a lookup exceeds it, the recall proceeds without that piece of context.
const RECALL_CONTEXT_TIMEOUT: Duration = Duration::from_secs(2);
/// Time limit that `TimedMemoryReadonly::new` applies to a single read-only
/// KIP execution.
pub const READONLY_KIP_TIMEOUT: Duration = Duration::from_secs(15);
/// Function definition advertised for the `recall_memory` agent.
///
/// The schema is declared as strict: the top-level object requires both
/// `query` and `context`, and `context` (an object or `null`) requires all of
/// `counterparty`, `agent`, `source` and `topic`, each of which may be a
/// string or `null`. Neither object accepts additional properties.
///
/// The value is built lazily on first access from the JSON literal below.
/// The `unwrap` panics if that literal does not deserialize into a
/// [`FunctionDefinition`]; since the literal is fixed in source, such a
/// failure is a programming error rather than a runtime condition.
pub static FUNCTION_DEFINITION: LazyLock<FunctionDefinition> = LazyLock::new(|| {
serde_json::from_value(json!({
"name": "recall_memory",
"description": "Recall information from the assistant's long-term memory (the Cognitive Nexus owned by $self). Use only for information that is not already present in the active conversation. Do not call for facts just mentioned, just submitted to formation, or otherwise available in current context; formation is asynchronous and fresh memories may take a minute or more to become searchable.",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "A natural language question about older or out-of-context memory. Be specific and include the subject, timeframe, and topic when known. Examples: 'What do we know about the current user's communication preferences?', 'What happened in our last discussion about Project Aurora?', 'Who are the members of the engineering team?'"
},
"context": {
"type": [
"object",
"null"
],
"description": "Optional current conversational context used only to disambiguate the query within $self's memory. Pass an object, not a JSON string. It does not change the memory owner.",
"properties": {
"counterparty": {
"type": [
"string",
"null"
],
"description": "Preferred. Durable identifier of the current external person or organization interacting with the business agent. Useful for resolving implicit references such as 'the current user', 'they', or omitted subjects."
},
"agent": {
"type": [
"string",
"null"
],
"description": "The identifier of the calling business agent, if applicable. Useful for provenance or caller-specific queries, but it does not change whose memory is searched."
},
"source": {
"type": [
"string",
"null"
],
"description": "Identifier of the current source, thread, channel, or app context. Useful when the query refers to a previous discussion in the same place."
},
"topic": {
"type": [
"string",
"null"
],
"description": "The topic of the current conversation, to help disambiguate the query."
}
},
"required": [
"counterparty",
"agent",
"source",
"topic"
],
"additionalProperties": false
}
},
"required": [
"query",
"context"
],
"additionalProperties": false
},
"strict": true
})).unwrap()
});
/// Read-only memory tool that bounds every KIP execution with a timeout.
///
/// It reports the same tool name as `MemoryReadonly` (see the `Tool`
/// implementation), and turns an execution that runs past `timeout` into a
/// KIP `ExecutionTimeout` error response.
#[derive(Clone)]
pub struct TimedMemoryReadonly {
// Shared memory manager; provides the nexus the requests run against and
// the KIP parameter schema exposed by `definition`.
memory: Arc<MemoryManagement>,
// Maximum duration allowed for a single `call`.
timeout: Duration,
}
impl TimedMemoryReadonly {
    /// Wraps `memory` in a read-only tool whose executions are limited to
    /// [`READONLY_KIP_TIMEOUT`].
    pub fn new(memory: Arc<MemoryManagement>) -> Self {
        let timeout = READONLY_KIP_TIMEOUT;
        Self { memory, timeout }
    }
}
impl Tool<BaseCtx> for TimedMemoryReadonly {
    type Args = Request;
    type Output = Response;

    /// Reports the name of the `MemoryReadonly` tool, so this wrapper is
    /// addressed under that same name.
    fn name(&self) -> String {
        MemoryReadonly::NAME.to_string()
    }

    /// Human-readable description of the tool.
    fn description(&self) -> String {
        String::from(
            "Executes one or more KIP (Knowledge Interaction Protocol) commands against the Cognitive Nexus to read from your persistent memory. This tool does not allow any modifications to the memory and is safe to use for retrieval operations.",
        )
    }

    /// Builds the strict function definition, reusing the KIP parameter schema
    /// held by the memory manager.
    fn definition(&self) -> FunctionDefinition {
        let parameters = self.memory.kip_function_definitions.parameters.clone();
        FunctionDefinition {
            name: self.name(),
            description: self.description(),
            parameters,
            strict: Some(true),
        }
    }

    /// Executes `request` in read-only mode against the nexus, giving up after
    /// the configured timeout.
    ///
    /// The call itself always returns `Ok`: a timeout is reported as a KIP
    /// `ExecutionTimeout` error response, and any error response sets
    /// `is_error` to `Some(true)` on the output (it stays `None` otherwise).
    async fn call(
        &self,
        _ctx: BaseCtx,
        mut request: Self::Args,
        _resources: Vec<Resource>,
    ) -> Result<ToolOutput<Self::Output>, BoxError> {
        let limit = self.timeout;
        let response = timeout(
            limit,
            request.readonly().execute(self.memory.nexus.as_ref()),
        )
        .await
        .map(|(_, response)| response)
        .unwrap_or_else(|_| {
            Response::err(KipError::new(
                KipErrorCode::ExecutionTimeout,
                format!(
                    "read-only KIP execution timed out after {} seconds; memory is busy, retry later",
                    limit.as_secs()
                ),
            ))
        });

        // Decide the flag before the response is moved into the output.
        let is_error = matches!(response, Response::Err { .. }).then_some(true);
        let mut output = ToolOutput::new(response);
        output.is_error = is_error;
        Ok(output)
    }
}
/// Agent that answers `recall_memory` requests by running a completion with
/// read-only memory and note tools.
///
/// Cloning is cheap for the shared parts: `memory`, `hook` and `history` are
/// reference-counted and shared between clones.
#[derive(Clone)]
pub struct RecallAgent {
    /// Conversation store used to record every recall run.
    pub conversations: Conversations,
    /// Shared memory manager (primer and concept lookups).
    memory: Arc<MemoryManagement>,
    /// Hook notified when a recall conversation ends.
    hook: Arc<dyn BrainHook>,
    /// Most recent recall conversations, passed to the model as chat history.
    history: Arc<RwLock<VecDeque<Document>>>,
    /// Maximum estimated token count accepted for a prompt; `run` rejects
    /// larger inputs. (The field is read there, so no `dead_code` allowance
    /// is needed.)
    max_input_tokens: usize,
}
impl RecallAgent {
pub const NAME: &'static str = "recall_memory";
pub fn new(
memory: Arc<MemoryManagement>,
conversations: Conversations,
hook: Arc<dyn BrainHook>,
max_input_tokens: usize,
) -> Self {
Self {
conversations,
memory,
hook,
history: Arc::new(RwLock::new(VecDeque::new())),
max_input_tokens,
}
}
pub async fn init(&self) -> Result<(), BoxError> {
let (conversations, _) = self
.conversations
.list_conversations_by_user(&SELF_USER_ID, None, Some(3))
.await?;
*self.history.write() = conversations.into_iter().map(Document::from).collect();
Ok(())
}
pub async fn get_counterparty(&self, counterparty: &str) -> Result<Json, BoxError> {
let user = self
.memory
.nexus
.get_concept(&ConceptPK::Object {
r#type: "Person".to_string(),
name: counterparty.to_string(),
})
.await?;
Ok(user.to_concept_node())
}
}
impl Agent<AgentCtx> for RecallAgent {
fn name(&self) -> String {
Self::NAME.to_string()
}
fn description(&self) -> String {
FUNCTION_DEFINITION.description.clone()
}
fn definition(&self) -> FunctionDefinition {
FUNCTION_DEFINITION.clone()
}
fn tool_dependencies(&self) -> Vec<String> {
vec![MemoryReadonly::NAME.to_string(), NoteTool::NAME.to_string()]
}
async fn run(
&self,
ctx: AgentCtx,
prompt: String, _resources: Vec<Resource>,
) -> Result<AgentOutput, BoxError> {
let caller = ctx.caller();
let now_ms = unix_ms();
let token_count = estimate_tokens(&prompt);
if token_count > self.max_input_tokens {
return Err(format!(
"Input too large: {} tokens (estimated), max allowed is {} tokens",
token_count, self.max_input_tokens
)
.into());
}
let counterparty = if let Ok(input) = serde_json::from_str::<RecallInput>(&prompt)
&& let Some(ctx) = input.context
&& let Some(counterparty) = ctx.counterparty
{
Some(counterparty)
} else {
None
};
let counterparty_info = if let Some(counterparty) = counterparty {
match timeout(RECALL_CONTEXT_TIMEOUT, self.get_counterparty(&counterparty)).await {
Ok(Ok(info)) => Some(info),
Ok(Err(err)) => {
log::debug!(
target: "brain",
counterparty;
"recall counterparty profile not available: {err:?}"
);
None
}
Err(_) => {
log::warn!(
target: "brain",
counterparty;
"recall counterparty profile lookup timed out"
);
None
}
}
} else {
None
};
let chat_history: Vec<Document> = { self.history.read().iter().cloned().collect() };
let chat_history = if chat_history.is_empty() {
vec![]
} else {
vec![Message {
role: "user".into(),
content: vec![
Documents::new("history_recall".to_string(), chat_history)
.to_string()
.into(),
],
name: Some("$system".into()),
timestamp: Some(now_ms),
..Default::default()
}]
};
let primer = match timeout(RECALL_CONTEXT_TIMEOUT, self.memory.describe_primer()).await {
Ok(Ok(primer)) => primer,
Ok(Err(err)) => {
log::debug!(target: "brain", "recall primer not available: {err:?}");
Json::default()
}
Err(_) => {
log::warn!(target: "brain", "recall primer lookup timed out");
Json::default()
}
};
let mut conversation = Conversation {
user: *caller,
messages: vec![serde_json::json!(Message {
role: "user".into(),
content: vec![prompt.clone().into()],
timestamp: Some(now_ms),
..Default::default()
})],
status: ConversationStatus::Working,
period: now_ms / 3600 / 1000,
created_at: now_ms,
updated_at: now_ms,
label: Some("recall".to_string()),
..Default::default()
};
let id = self
.conversations
.add_conversation(ConversationRef::from(&conversation))
.await?;
conversation._id = id;
let notes = load_notes(&ctx).await.unwrap_or_default();
match ctx
.completion(
CompletionRequest {
instructions: format!(
"{}\n\n---\n\n# `DESCRIBE PRIMER` Result:\n{}\n\n---\n\n# Your Notes:\n{}\n\n# Counterparty profile:\n{}\n\n# Current Datetime: {}",
SELF_INSTRUCTIONS,
primer,
serde_json::to_string(¬es.notes).unwrap_or_default(),
serde_json::to_string(&counterparty_info).unwrap_or_default(),
local_date_hour(now_ms).unwrap_or_default()
),
prompt,
chat_history,
tools: ctx.tool_definitions(Some(&self.tool_dependencies())),
tool_choice_required: true,
..Default::default()
},
vec![],
)
.await
{
Ok(mut output) => {
conversation.messages.clear();
conversation.append_messages(output.chat_history.clone());
conversation.status = if output.failed_reason.is_some() {
ConversationStatus::Failed
} else {
ConversationStatus::Completed
};
conversation.usage = output.usage.clone();
conversation.updated_at = now_ms;
if let Some(ref failed_reason) = output.failed_reason {
conversation.failed_reason = Some(failed_reason.clone());
} else {
let doc: Document = conversation.clone().into();
let mut history = self.history.write();
history.push_back(doc);
let len = history.len();
if len > 3 {
history.drain(0..(len - 3));
}
}
if let Ok(changes) = conversation.to_changes() {
let _ = self.conversations.update_conversation(conversation._id, changes).await;
}
self.hook
.on_conversation_end(Self::NAME, &conversation)
.await;
output.conversation = Some(conversation._id);
Ok(output)
}
Err(err) => {
conversation.status = ConversationStatus::Failed;
conversation.failed_reason = Some(err.to_string());
conversation.updated_at = unix_ms();
if let Ok(changes) = conversation.to_changes() {
let _ = self.conversations.update_conversation(conversation._id, changes).await;
}
self.hook
.on_conversation_end(Self::NAME, &conversation)
.await;
Err(err)
}
}
}
}
#[cfg(test)]
mod tests {
    use super::{FUNCTION_DEFINITION, READONLY_KIP_TIMEOUT, RecallAgent};

    /// The advertised function definition must agree with the agent: same
    /// name, strict mode on, a string `query`, and `query` + `context` as the
    /// required top-level parameters (in that order).
    #[test]
    fn recall_function_definition_matches_agent_contract() {
        let definition = &*FUNCTION_DEFINITION;

        assert_eq!(RecallAgent::NAME, "recall_memory");
        assert_eq!(definition.name, RecallAgent::NAME);
        assert_eq!(definition.strict, Some(true));

        let query_type = definition
            .parameters
            .pointer("/properties/query/type")
            .and_then(|value| value.as_str());
        assert_eq!(query_type, Some("string"));

        let required = definition
            .parameters
            .pointer("/required")
            .and_then(|value| value.as_array())
            .map(|items| {
                items
                    .iter()
                    .filter_map(|item| item.as_str())
                    .collect::<Vec<_>>()
            });
        assert_eq!(required, Some(vec!["query", "context"]));
    }

    /// Guards the read-only KIP timeout against accidental changes.
    #[test]
    fn readonly_kip_timeout_stays_bounded() {
        let seconds = READONLY_KIP_TIMEOUT.as_secs();
        assert_eq!(seconds, 15);
    }
}