use anda_core::{
Agent, AgentContext, AgentOutput, BoxError, CompletionRequest, Document, Documents, Message,
Resource, StateFeatures, Tool, estimate_tokens,
};
use anda_db::schema::{DocumentId, Json, Map};
use anda_engine::{
context::AgentCtx,
extension::note::{NoteTool, load_notes},
local_date_hour,
memory::{Conversation, ConversationRef, ConversationStatus, MemoryManagement},
unix_ms,
};
use parking_lot::RwLock;
use serde_json::json;
use std::{
collections::VecDeque,
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
};
use super::BrainHook;
use crate::types::FormationInput;
const SELF_INSTRUCTIONS: &str = include_str!("../../assets/BrainFormation.md");
const REVIEW_INSTRUCTIONS: &str = include_str!("../../assets/BrainFormationReview.md");
struct ProcessingGuard(Arc<AtomicU64>);
impl Drop for ProcessingGuard {
fn drop(&mut self) {
self.0.store(0, Ordering::SeqCst);
}
}
#[derive(Clone)]
pub struct FormationAgent {
memory: Arc<MemoryManagement>,
processing_conversation: Arc<AtomicU64>,
hook: Arc<dyn BrainHook>,
history: Arc<RwLock<VecDeque<Document>>>,
#[allow(dead_code)]
max_input_tokens: usize,
}
impl FormationAgent {
pub const NAME: &'static str = "formation_memory";
pub fn new(
memory: Arc<MemoryManagement>,
hook: Arc<dyn BrainHook>,
max_input_tokens: usize,
) -> Self {
Self {
max_input_tokens,
memory,
processing_conversation: Arc::new(AtomicU64::new(0)),
history: Arc::new(RwLock::new(VecDeque::new())),
hook,
}
}
pub fn is_processing(&self) -> bool {
self.processing_conversation.load(Ordering::SeqCst) != 0
}
pub fn get_processed(&self) -> Option<DocumentId> {
self.memory
.conversations
.get_extension_as::<DocumentId>("brain_processed")
}
pub async fn get_or_init_counterparty(
&self,
counterparty: String,
name: Option<String>,
) -> Result<Json, BoxError> {
let mut attributes = Map::new();
let mut metadata = Map::new();
attributes.insert("id".to_string(), counterparty.clone().into());
attributes.insert("person_class".to_string(), "Human".into());
if let Some(name) = name {
attributes.insert("name".to_string(), name.into());
}
metadata.insert("author".to_string(), "$system".into());
metadata.insert("status".to_string(), "active".into());
let user = self
.memory
.nexus
.get_or_init_concept("Person".to_string(), counterparty, attributes, metadata)
.await?;
Ok(user.to_concept_node())
}
pub async fn start_process(
&self,
ctx: AgentCtx,
conversation: DocumentId,
) -> Result<(), BoxError> {
let current = self.processing_conversation.load(Ordering::SeqCst);
if current != 0 {
return Err(format!(
"FormationAgent is already processing conversation {}",
current
)
.into());
}
if self.hook.is_maintenance_processing() {
return Err(
"MaintenanceAgent is processing, formation will resume when maintenance completes"
.into(),
);
}
let conv = self
.find_next_submitted(conversation.saturating_sub(1))
.await
.ok_or_else(|| {
format!(
"No pending formation conversation found starting from {}",
conversation
)
})?;
self.try_process(ctx, conv);
Ok(())
}
pub fn try_process(&self, ctx: AgentCtx, conversation: Conversation) {
if self
.processing_conversation
.compare_exchange(0, conversation._id, Ordering::SeqCst, Ordering::SeqCst)
.is_err()
{
log::info!(
target: "brain",
"FormationAgent is already processing conversation {}, cannot process conversation {}",
self.processing_conversation.load(Ordering::SeqCst),
conversation._id
);
return;
}
let agent = self.clone();
let pc = self.processing_conversation.clone();
tokio::spawn(async move {
let guard = ProcessingGuard(pc);
agent.process_loop(ctx, conversation).await;
std::mem::forget(guard);
});
}
async fn process_loop(&self, ctx: AgentCtx, mut conversation: Conversation) {
loop {
let conv_id = conversation._id;
self.process_one(&ctx, &mut conversation).await;
self.hook
.on_conversation_end(Self::NAME, &conversation)
.await;
if conversation.status == ConversationStatus::Failed {
tokio::time::sleep(std::time::Duration::from_secs(60)).await; self.process_one(&ctx, &mut conversation).await;
self.hook
.on_conversation_end(Self::NAME, &conversation)
.await;
}
if conversation.status != ConversationStatus::Completed {
log::error!(
target: "brain",
"Conversation {} ended with status {:?}, not marking as processed",
conv_id,
conversation.status
);
self.processing_conversation.store(0, Ordering::SeqCst);
break;
}
self.memory
.conversations
.save_extension("brain_processed".to_string(), conv_id.into())
.await
.ok();
if let Some(id) = self.hook.try_start_maintenance(conv_id).await {
log::info!(
target: "brain",
"Triggered maintenance for conversation {}, new maintenance conversation {}",
conv_id,
id
);
self.processing_conversation.store(0, Ordering::SeqCst);
break; }
match self.find_next_submitted(conv_id).await {
Some(next_conv) => {
if self
.processing_conversation
.compare_exchange(
conv_id,
next_conv._id,
Ordering::SeqCst,
Ordering::SeqCst,
)
.is_ok()
{
conversation = next_conv;
continue;
}
break;
}
None => {
self.processing_conversation.store(0, Ordering::SeqCst);
if let Some(next_conv) = self.find_next_submitted(conv_id).await
&& self
.processing_conversation
.compare_exchange(0, next_conv._id, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
conversation = next_conv;
continue;
}
break;
}
}
}
}
async fn find_next_submitted(&self, after_id: u64) -> Option<Conversation> {
let mut id = after_id;
while id < self.memory.max_conversation_id() {
id += 1;
match self.memory.get_conversation(id).await {
Ok(conv) => {
if conv.status == ConversationStatus::Completed
|| conv.status == ConversationStatus::Cancelled
{
continue;
}
if let Some(label) = &conv.label
&& label != "formation"
{
continue; }
return Some(conv);
}
_ => continue,
}
}
None
}
async fn mark_conversation_failed(&self, conversation: &mut Conversation, reason: String) {
log::error!(target: "brain", "Conversation {} failed: {}", conversation._id, reason);
conversation.failed_reason = Some(reason);
conversation.status = ConversationStatus::Failed;
conversation.updated_at = unix_ms();
if let Ok(changes) = conversation.to_changes() {
let _ = self
.memory
.update_conversation(conversation._id, changes)
.await;
}
}
async fn process_one(&self, ctx: &AgentCtx, conversation: &mut Conversation) {
let prompt = match conversation
.messages
.first()
.and_then(|v| serde_json::from_value::<Message>(v.clone()).ok())
.and_then(|v| v.text())
{
Some(p) => p,
None => {
self.mark_conversation_failed(conversation, "No prompt found".to_string())
.await;
return;
}
};
let counterparty_info = if let Ok(input) = serde_json::from_str::<FormationInput>(&prompt)
&& let Some(ctx) = input.context
&& let Some(counterparty) = ctx.counterparty
{
self.get_or_init_counterparty(counterparty, None).await.ok()
} else {
None
};
let now_ms = unix_ms();
let chat_history: Vec<Document> = { self.history.read().iter().cloned().collect() };
let chat_history = if chat_history.is_empty() {
vec![]
} else {
vec![Message {
role: "user".into(),
content: vec![
Documents::new("history_formation".to_string(), chat_history)
.to_string()
.into(),
],
name: Some("$system".into()),
timestamp: Some(now_ms),
..Default::default()
}]
};
let primer = self.memory.describe_primer().await.unwrap_or_default();
let notes = load_notes(ctx).await.unwrap_or_default();
let should_review = prompt.len() >= 10000;
let mut runner = ctx.clone().completion_iter(
CompletionRequest {
instructions: format!(
"{}\n\n---\n\n# `DESCRIBE PRIMER` Result:\n{}\n\n---\n\n# Your Notes:\n{}\n\n# Counterparty Profile:\n{}\n\n# Current Datetime: {}",
SELF_INSTRUCTIONS,
primer,
serde_json::to_string(¬es.notes).unwrap_or_default(),
serde_json::to_string(&counterparty_info).unwrap_or_default(),
local_date_hour(now_ms).unwrap_or_default()
),
prompt,
chat_history,
tools: ctx.tool_definitions(Some(&self.tool_dependencies())),
tool_choice_required: true,
..Default::default()
},
vec![],
);
if should_review {
runner.follow_up(REVIEW_INSTRUCTIONS.to_string());
}
let mut first_round = true;
loop {
match runner.next().await {
Ok(None) => break,
Ok(Some(mut res)) => {
let now_ms = unix_ms();
let is_done = runner.is_done();
if first_round {
first_round = false;
conversation.messages.clear();
conversation.append_messages(res.chat_history);
} else {
let existing_len = conversation.messages.len();
if res.chat_history.len() >= existing_len {
res.chat_history.drain(0..existing_len);
conversation.append_messages(res.chat_history);
} else {
conversation.messages.clear();
conversation.append_messages(res.chat_history);
}
}
conversation.status = if res.failed_reason.is_some() {
ConversationStatus::Failed
} else if is_done {
ConversationStatus::Completed
} else {
ConversationStatus::Working
};
conversation.usage = res.usage;
conversation.updated_at = now_ms;
if let Some(failed_reason) = res.failed_reason {
conversation.failed_reason = Some(failed_reason);
} else {
let doc: Document = conversation.clone().into();
let mut history = self.history.write();
history.push_back(doc);
let len = history.len();
if len > 2 {
history.drain(0..(len - 2));
}
}
match conversation.to_changes() {
Ok(changes) => {
let _ = self
.memory
.update_conversation(conversation._id, changes)
.await;
}
Err(err) => {
log::error!(
target: "brain",
"Failed to serialize formation conversation {} changes: {:?}",
conversation._id,
err
);
}
}
if conversation.status == ConversationStatus::Cancelled
|| conversation.status == ConversationStatus::Failed
{
break;
}
}
Err(err) => {
self.mark_conversation_failed(
conversation,
format!("CompletionRunner error: {err:?}"),
)
.await;
break;
}
}
}
}
}
impl Agent<AgentCtx> for FormationAgent {
fn name(&self) -> String {
Self::NAME.to_string()
}
fn description(&self) -> String {
"Receives conversation messages and encodes them into structured memory within the Cognitive Nexus via KIP.".to_string()
}
fn tool_dependencies(&self) -> Vec<String> {
vec![self.memory.name(), NoteTool::NAME.to_string()]
}
async fn run(
&self,
ctx: AgentCtx,
prompt: String, _resources: Vec<Resource>,
) -> Result<AgentOutput, BoxError> {
let caller = ctx.caller();
let now_ms = unix_ms();
let token_count = estimate_tokens(&prompt);
if token_count > self.max_input_tokens {
return Err(format!(
"Input too large: {} tokens (estimated), max allowed is {} tokens",
token_count, self.max_input_tokens
)
.into());
}
let mut conversation = Conversation {
user: *caller,
messages: vec![json!(Message {
role: "user".into(),
content: vec![prompt.into()],
..Default::default()
})],
period: now_ms / 3600 / 1000,
created_at: now_ms,
updated_at: now_ms,
label: Some("formation".to_string()),
..Default::default()
};
let id = self
.memory
.add_conversation(ConversationRef::from(&conversation))
.await?;
conversation._id = id;
let res = AgentOutput {
conversation: Some(id),
..Default::default()
};
let is_idle = self.processing_conversation.load(Ordering::SeqCst) == 0;
if is_idle {
if self.hook.is_maintenance_processing() {
log::info!(
target: "brain",
conversation = id;
"Formation queued while maintenance is processing"
);
} else {
if let Some(prev_id) = self.get_processed()
&& prev_id + 1 < id
{
if let Some(conv) = self.find_next_submitted(prev_id).await {
self.try_process(ctx, conv);
}
} else {
self.try_process(ctx, conversation);
}
}
}
Ok(res)
}
}