use anda_core::{
Agent, AgentContext, AgentOutput, BoxError, CompletionFeatures, CompletionRequest, Document,
Documents, FunctionDefinition, Json, Message, Resource, StateFeatures,
};
use anda_engine::{
context::AgentCtx,
extension::note::{NoteTool, load_notes},
local_date_hour,
memory::{
Conversation, ConversationRef, ConversationStatus, Conversations, MemoryManagement,
MemoryReadonly,
},
unix_ms,
};
use parking_lot::RwLock;
use serde_json::{Map, json};
use std::{
collections::VecDeque,
sync::{Arc, LazyLock},
};
use super::{BrainHook, SELF_USER_ID};
use crate::types::RecallInput;
const SELF_INSTRUCTIONS: &str = include_str!("../../assets/BrainRecall.md");
pub static FUNCTION_DEFINITION: LazyLock<FunctionDefinition> = LazyLock::new(|| {
serde_json::from_value(json!({
"name": "recall_memory",
"description": "Recall information from the assistant's long-term memory (the Cognitive Nexus owned by $self). Use only for information that is not already present in the active conversation. Do not call for facts just mentioned, just submitted to formation, or otherwise available in current context; formation is asynchronous and fresh memories may take a minute or more to become searchable.",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "A natural language question about older or out-of-context memory. Be specific and include the subject, timeframe, and topic when known. Examples: 'What do we know about the current user's communication preferences?', 'What happened in our last discussion about Project Aurora?', 'Who are the members of the engineering team?'"
},
"context": {
"type": [
"object",
"null"
],
"description": "Optional current conversational context used only to disambiguate the query within $self's memory. Pass an object, not a JSON string. It does not change the memory owner.",
"properties": {
"counterparty": {
"type": [
"string",
"null"
],
"description": "Preferred. Durable identifier of the current external person or organization interacting with the business agent. Useful for resolving implicit references such as 'the current user', 'they', or omitted subjects."
},
"agent": {
"type": [
"string",
"null"
],
"description": "The identifier of the calling business agent, if applicable. Useful for provenance or caller-specific queries, but it does not change whose memory is searched."
},
"source": {
"type": [
"string",
"null"
],
"description": "Identifier of the current source, thread, channel, or app context. Useful when the query refers to a previous discussion in the same place."
},
"topic": {
"type": [
"string",
"null"
],
"description": "The topic of the current conversation, to help disambiguate the query."
}
},
"required": [
"counterparty",
"agent",
"source",
"topic"
],
"additionalProperties": false
}
},
"required": [
"query",
"context"
],
"additionalProperties": false
},
"strict": true
})).unwrap()
});
#[derive(Clone)]
pub struct RecallAgent {
pub conversations: Conversations,
memory: Arc<MemoryManagement>,
hook: Arc<dyn BrainHook>,
history: Arc<RwLock<VecDeque<Document>>>,
#[allow(dead_code)]
max_input_tokens: usize,
}
impl RecallAgent {
pub const NAME: &'static str = "recall_memory";
pub fn new(
memory: Arc<MemoryManagement>,
conversations: Conversations,
hook: Arc<dyn BrainHook>,
max_input_tokens: usize,
) -> Self {
Self {
conversations,
memory,
hook,
history: Arc::new(RwLock::new(VecDeque::new())),
max_input_tokens,
}
}
pub async fn init(&self) -> Result<(), BoxError> {
let (conversations, _) = self
.conversations
.list_conversations_by_user(&SELF_USER_ID, None, Some(3))
.await?;
*self.history.write() = conversations.into_iter().map(Document::from).collect();
Ok(())
}
pub async fn get_or_init_counterparty(
&self,
counterparty: String,
name: Option<String>,
) -> Result<Json, BoxError> {
let mut attributes = Map::new();
let mut metadata = Map::new();
attributes.insert("id".to_string(), counterparty.clone().into());
attributes.insert("person_class".to_string(), "Human".into());
if let Some(name) = name {
attributes.insert("name".to_string(), name.into());
}
metadata.insert("author".to_string(), "$system".into());
metadata.insert("status".to_string(), "active".into());
let user = self
.memory
.nexus
.get_or_init_concept("Person".to_string(), counterparty, attributes, metadata)
.await?;
Ok(user.to_concept_node())
}
}
impl Agent<AgentCtx> for RecallAgent {
fn name(&self) -> String {
Self::NAME.to_string()
}
fn description(&self) -> String {
FUNCTION_DEFINITION.description.clone()
}
fn definition(&self) -> FunctionDefinition {
FUNCTION_DEFINITION.clone()
}
fn tool_dependencies(&self) -> Vec<String> {
vec![MemoryReadonly::NAME.to_string(), NoteTool::NAME.to_string()]
}
async fn run(
&self,
ctx: AgentCtx,
prompt: String, _resources: Vec<Resource>,
) -> Result<AgentOutput, BoxError> {
let caller = ctx.caller();
let now_ms = unix_ms();
let counterparty_info = if let Ok(input) = serde_json::from_str::<RecallInput>(&prompt)
&& let Some(ctx) = input.context
&& let Some(counterparty) = ctx.counterparty
{
self.get_or_init_counterparty(counterparty, None).await.ok()
} else {
None
};
let chat_history: Vec<Document> = { self.history.read().iter().cloned().collect() };
let chat_history = if chat_history.is_empty() {
vec![]
} else {
vec![Message {
role: "user".into(),
content: vec![
Documents::new("history_recall".to_string(), chat_history)
.to_string()
.into(),
],
name: Some("$system".into()),
timestamp: Some(now_ms),
..Default::default()
}]
};
let primer = self.memory.describe_primer().await.unwrap_or_default();
let mut conversation = Conversation {
user: *caller,
messages: vec![serde_json::json!(Message {
role: "user".into(),
content: vec![prompt.clone().into()],
timestamp: Some(now_ms),
..Default::default()
})],
status: ConversationStatus::Working,
period: now_ms / 3600 / 1000,
created_at: now_ms,
updated_at: now_ms,
label: Some("recall".to_string()),
..Default::default()
};
let id = self
.conversations
.add_conversation(ConversationRef::from(&conversation))
.await?;
conversation._id = id;
let notes = load_notes(&ctx).await.unwrap_or_default();
match ctx
.completion(
CompletionRequest {
instructions: format!(
"{}\n\n---\n\n# `DESCRIBE PRIMER` Result:\n{}\n\n---\n\n# Your Notes:\n{}\n\n# Counterparty profile:\n{}\n\n# Current Datetime: {}",
SELF_INSTRUCTIONS,
primer,
serde_json::to_string(¬es.notes).unwrap_or_default(),
serde_json::to_string(&counterparty_info).unwrap_or_default(),
local_date_hour(now_ms).unwrap_or_default()
),
prompt,
chat_history,
tools: ctx.tool_definitions(Some(&self.tool_dependencies())),
tool_choice_required: true,
..Default::default()
},
vec![],
)
.await
{
Ok(mut output) => {
conversation.messages.clear();
conversation.append_messages(output.chat_history.clone());
conversation.status = if output.failed_reason.is_some() {
ConversationStatus::Failed
} else {
ConversationStatus::Completed
};
conversation.usage = output.usage.clone();
conversation.updated_at = now_ms;
if let Some(ref failed_reason) = output.failed_reason {
conversation.failed_reason = Some(failed_reason.clone());
} else {
let doc: Document = conversation.clone().into();
let mut history = self.history.write();
history.push_back(doc);
let len = history.len();
if len > 3 {
history.drain(0..(len - 3));
}
}
if let Ok(changes) = conversation.to_changes() {
let _ = self.conversations.update_conversation(conversation._id, changes).await;
}
self.hook
.on_conversation_end(Self::NAME, &conversation)
.await;
output.conversation = Some(conversation._id);
Ok(output)
}
Err(err) => {
conversation.status = ConversationStatus::Failed;
conversation.failed_reason = Some(err.to_string());
conversation.updated_at = unix_ms();
if let Ok(changes) = conversation.to_changes() {
let _ = self.conversations.update_conversation(conversation._id, changes).await;
}
self.hook
.on_conversation_end(Self::NAME, &conversation)
.await;
Err(err)
}
}
}
}