use anda_core::{
Agent, AgentContext, AgentOutput, BoxError, CompletionRequest, Document, Documents, Message,
Resource, StateFeatures,
};
use anda_db::schema::DocumentId;
use anda_engine::{
context::AgentCtx,
extension::note::{NoteTool, load_notes, load_notes_from_legacy},
local_date_hour,
memory::{Conversation, ConversationRef, ConversationStatus, Conversations, MemoryManagement},
unix_ms,
};
use parking_lot::RwLock;
use serde_json::json;
use std::{
collections::VecDeque,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
};
use super::{BrainHook, SELF_USER_ID, push_completed_history};
use crate::types::{MaintenanceAt, MaintenanceInput, MaintenanceScope};
const SELF_INSTRUCTIONS: &str = include_str!("../../assets/BrainMaintenance.md");
struct ProcessingGuard(Arc<AtomicBool>);
impl Drop for ProcessingGuard {
fn drop(&mut self) {
self.0.store(false, Ordering::SeqCst);
}
}
#[derive(Clone)]
pub struct MaintenanceAgent {
pub conversations: Conversations,
memory: Arc<MemoryManagement>,
processing: Arc<AtomicBool>,
hook: Arc<dyn BrainHook>,
history: Arc<RwLock<VecDeque<Document>>>,
}
impl MaintenanceAgent {
pub const NAME: &'static str = "maintenance_memory";
pub fn new(
memory: Arc<MemoryManagement>,
conversations: Conversations,
hook: Arc<dyn BrainHook>,
) -> Self {
Self {
memory,
conversations,
processing: Arc::new(AtomicBool::new(false)),
hook,
history: Arc::new(RwLock::new(VecDeque::new())),
}
}
pub async fn init(&self) -> Result<(), BoxError> {
let (conversations, _) = self
.conversations
.list_conversations_by_user(&SELF_USER_ID, None, Some(2))
.await?;
*self.history.write() = conversations
.into_iter()
.filter(|c| c.status == ConversationStatus::Completed)
.rev()
.map(Document::from)
.collect();
Ok(())
}
pub fn is_processing(&self) -> bool {
self.processing.load(Ordering::SeqCst)
}
pub fn get_processed(&self) -> Option<DocumentId> {
match self.conversations.conversations.max_document_id() {
0 => None,
id => Some(id),
}
}
pub fn get_processed_at(&self) -> MaintenanceAt {
let mut rt = MaintenanceAt::default();
self.conversations.conversations.extensions_with(|kv| {
if let Some(v) = kv.get("full")
&& let Ok(id) = v.try_into()
{
rt.full = id;
}
if let Some(v) = kv.get("quick")
&& let Ok(id) = v.try_into()
{
rt.quick = id;
}
if let Some(v) = kv.get("daydream")
&& let Ok(id) = v.try_into()
{
rt.daydream = id;
}
});
rt
}
pub async fn set_processed_at(
&self,
scope: MaintenanceScope,
formation_id: DocumentId,
) -> Result<(), BoxError> {
self.conversations
.conversations
.save_extension_from(scope.to_string(), &formation_id)
.await?;
Ok(())
}
}
impl Agent<AgentCtx> for MaintenanceAgent {
fn name(&self) -> String {
Self::NAME.to_string()
}
fn description(&self) -> String {
"The Brain Maintenance agent operates in Sleep Mode — performing memory metabolism including consolidation, organization, pruning, and health optimization of the Cognitive Nexus during scheduled maintenance cycles.".to_string()
}
fn tool_dependencies(&self) -> Vec<String> {
vec!["execute_kip".to_string(), NoteTool::NAME.to_string()]
}
async fn run(
&self,
ctx: AgentCtx,
prompt: String, _resources: Vec<Resource>,
) -> Result<AgentOutput, BoxError> {
let maintenance_input = serde_json::from_str::<MaintenanceInput>(&prompt)
.map_err(|err| format!("invalid MaintenanceInput: {err}"))?;
if self
.processing
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_err()
{
return Ok(AgentOutput {
content: "Maintenance cycle is already in progress.".to_string(),
..Default::default()
});
}
let guard = ProcessingGuard(self.processing.clone());
let caller = ctx.caller();
let now_ms = unix_ms();
let mut conversation = Conversation {
user: *caller,
messages: vec![json!(Message {
role: "user".into(),
content: vec![prompt.into()],
..Default::default()
})],
status: ConversationStatus::Working,
period: now_ms / 3600 / 1000,
created_at: now_ms,
updated_at: now_ms,
label: Some("maintenance".to_string()),
..Default::default()
};
let id = self
.conversations
.add_conversation(ConversationRef::from(&conversation))
.await?;
conversation._id = id;
let agent = self.clone();
let ctx_clone = ctx.clone();
tokio::spawn(async move {
{
let _guard = guard;
agent.process_one(&ctx_clone, &mut conversation).await;
if conversation.status == ConversationStatus::Completed
&& let Err(err) = agent
.set_processed_at(maintenance_input.scope, maintenance_input.formation_id)
.await
{
log::error!(
target: "brain",
conversation = conversation._id,
formation_id = maintenance_input.formation_id;
"failed to persist maintenance processed marker: {err:?}"
);
}
agent
.hook
.on_conversation_end(MaintenanceAgent::NAME, &conversation)
.await;
}
agent.hook.try_start_formation().await;
});
Ok(AgentOutput {
conversation: Some(id),
..Default::default()
})
}
}
impl MaintenanceAgent {
async fn mark_conversation_failed(&self, conversation: &mut Conversation, reason: String) {
log::error!(
target: "brain",
"Maintenance conversation {} failed: {}",
conversation._id,
reason
);
conversation.failed_reason = Some(reason);
conversation.status = ConversationStatus::Failed;
conversation.updated_at = unix_ms();
if let Ok(changes) = conversation.to_changes() {
let _ = self
.conversations
.update_conversation(conversation._id, changes)
.await;
}
}
async fn process_one(&self, ctx: &AgentCtx, conversation: &mut Conversation) {
let prompt = match conversation
.messages
.first()
.and_then(|v| serde_json::from_value::<Message>(v.clone()).ok())
.and_then(|v| v.text())
{
Some(p) => p,
None => {
self.mark_conversation_failed(conversation, "No prompt found".to_string())
.await;
return;
}
};
let primer = self.memory.describe_primer().await.unwrap_or_default();
let now_ms = unix_ms();
let chat_history: Vec<Document> = { self.history.read().iter().cloned().collect() };
let chat_history = if chat_history.is_empty() {
vec![]
} else {
vec![Message {
role: "user".into(),
content: vec![
Documents::new("history_maintenance".to_string(), chat_history)
.to_string()
.into(),
],
name: Some("$system".into()),
timestamp: Some(now_ms),
..Default::default()
}]
};
let notes = match load_notes(ctx).await {
Some(n) => n,
None => load_notes_from_legacy(ctx).await.unwrap_or_default(),
};
let mut runner = ctx.clone().completion_iter(
CompletionRequest {
instructions: format!(
"{}\n\n---\n\n# `DESCRIBE PRIMER` Result:\n{}\n\n---\n\n# Your Notes:\n{}\n\n# Current Datetime: {}",
SELF_INSTRUCTIONS,
primer,
serde_json::to_string(¬es.items).unwrap_or_default(),
local_date_hour(now_ms).unwrap_or_default()
),
prompt,
chat_history,
tools: ctx.tool_definitions(Some(&self.tool_dependencies())),
tool_choice_required: true,
..Default::default()
},
vec![],
);
let mut first_round = true;
loop {
match runner.next().await {
Ok(None) => break,
Ok(Some(mut res)) => {
let now_ms = unix_ms();
let is_done = runner.is_done();
if res.chat_history.is_empty() {
} else if first_round {
first_round = false;
conversation.messages.clear();
conversation.append_messages(res.chat_history);
} else {
let existing_len = conversation.messages.len();
if res.chat_history.len() >= existing_len {
res.chat_history.drain(0..existing_len);
conversation.append_messages(res.chat_history);
} else {
conversation.messages.clear();
conversation.append_messages(res.chat_history);
}
}
conversation.status = if res.failed_reason.is_some() {
ConversationStatus::Failed
} else if is_done {
ConversationStatus::Completed
} else {
ConversationStatus::Working
};
conversation.usage = res.usage;
conversation.updated_at = now_ms;
if let Some(failed_reason) = res.failed_reason {
conversation.failed_reason = Some(failed_reason);
} else {
push_completed_history(&self.history, conversation, 2);
}
match conversation.to_changes() {
Ok(changes) => {
let _ = self
.conversations
.update_conversation(conversation._id, changes)
.await;
}
Err(err) => {
log::error!(
target: "brain",
"Failed to serialize maintenance conversation {} changes: {:?}",
conversation._id,
err
);
}
}
if conversation.status == ConversationStatus::Cancelled
|| conversation.status == ConversationStatus::Failed
{
break;
}
}
Err(err) => {
self.mark_conversation_failed(
conversation,
format!("CompletionRunner error: {err:?}"),
)
.await;
break;
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::{MaintenanceAgent, ProcessingGuard};
use crate::{
agents::SELF_USER_ID,
space::AppState,
types::{MaintenanceInput, MaintenanceScope},
};
use anda_core::{
Agent, AgentOutput, BoxError, BoxPinFut, CompletionRequest, Message, Principal,
};
use anda_db::{database::DBConfig, storage::StorageConfig};
use anda_engine::{
context::AgentCtx,
management::{BaseManagement, Visibility},
memory::{Conversation, ConversationRef, ConversationStatus},
model::{CompletionFeaturesDyn, Model, Models, reqwest},
unix_ms,
};
use object_store::memory::InMemory;
use serde_json::json;
use std::collections::BTreeSet;
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
#[derive(Debug)]
struct FinalCompleter;
impl CompletionFeaturesDyn for FinalCompleter {
fn model_name(&self) -> String {
"maintenance-final-test-model".to_string()
}
fn completion(&self, req: CompletionRequest) -> BoxPinFut<Result<AgentOutput, BoxError>> {
Box::pin(async move {
Ok(AgentOutput {
content: "maintained".to_string(),
chat_history: vec![Message {
role: "assistant".to_string(),
content: vec![format!("maintained: {}", req.prompt).into()],
..Default::default()
}],
..Default::default()
})
})
}
}
#[derive(Debug)]
struct FailedReasonCompleter;
impl CompletionFeaturesDyn for FailedReasonCompleter {
fn model_name(&self) -> String {
"maintenance-failed-reason-test-model".to_string()
}
fn completion(&self, _req: CompletionRequest) -> BoxPinFut<Result<AgentOutput, BoxError>> {
Box::pin(async move {
Ok(AgentOutput {
failed_reason: Some("maintenance failed".to_string()),
chat_history: vec![Message {
role: "assistant".to_string(),
content: vec!["maintenance failure".to_string().into()],
..Default::default()
}],
..Default::default()
})
})
}
}
#[derive(Debug)]
struct ErrorCompleter;
impl CompletionFeaturesDyn for ErrorCompleter {
fn model_name(&self) -> String {
"maintenance-error-test-model".to_string()
}
fn completion(&self, _req: CompletionRequest) -> BoxPinFut<Result<AgentOutput, BoxError>> {
Box::pin(async move { Err("model error".into()) })
}
}
fn test_db_config(name: &str) -> DBConfig {
DBConfig {
name: name.to_string(),
description: "test database".to_string(),
storage: StorageConfig::default(),
lock: None,
}
}
fn test_app_state_with_completer<C>(name: &str, completer: C) -> AppState
where
C: CompletionFeaturesDyn,
{
let models = Models::default();
models.set_model(Model::with_completer(Arc::new(completer)));
let management = Arc::new(BaseManagement {
controller: SELF_USER_ID,
managers: BTreeSet::new(),
visibility: Visibility::Public,
});
let http_client = reqwest::Client::builder().build().unwrap();
AppState::new(
Arc::new(InMemory::new()),
Arc::new(test_db_config(name)),
management,
http_client,
Arc::new(models),
Arc::new(vec![]),
"anda_brain".to_string(),
"test".to_string(),
0,
)
}
async fn create_loaded_space(app: &AppState, id: &str) -> Arc<crate::space::Space> {
app.admin_create_space(
Principal::from_slice(&[1]),
Principal::from_slice(&[2]),
id.to_string(),
1,
123,
)
.await
.unwrap();
app.load_space(id, false).await.unwrap()
}
fn maintenance_prompt(scope: MaintenanceScope) -> String {
serde_json::to_string(&MaintenanceInput {
scope,
formation_id: 99,
..Default::default()
})
.unwrap()
}
async fn stored_conversation(
agent: &MaintenanceAgent,
messages: Vec<serde_json::Value>,
) -> Conversation {
let now = unix_ms();
let mut conversation = Conversation {
user: SELF_USER_ID,
status: ConversationStatus::Submitted,
messages,
label: Some("maintenance".to_string()),
created_at: now,
updated_at: now,
..Default::default()
};
let id = agent
.conversations
.add_conversation(ConversationRef::from(&conversation))
.await
.unwrap();
conversation._id = id;
conversation
}
#[test]
fn processing_guard_resets_processing_flag_on_drop() {
let processing = Arc::new(AtomicBool::new(true));
{
let _guard = ProcessingGuard(processing.clone());
assert!(processing.load(Ordering::SeqCst));
}
assert!(!processing.load(Ordering::SeqCst));
}
#[test]
fn maintenance_agent_name_matches_registered_agent_name() {
assert_eq!(MaintenanceAgent::NAME, "maintenance_memory");
}
#[tokio::test]
async fn maintenance_agent_trait_metadata_and_processed_markers() {
let app = test_app_state_with_completer("maintenance_trait", FinalCompleter);
let space = create_loaded_space(&app, "maintenance_trait").await;
let maintenance = space.maintenance_for_test();
assert_eq!(
Agent::<AgentCtx>::name(maintenance.as_ref()),
MaintenanceAgent::NAME
);
assert!(Agent::<AgentCtx>::description(maintenance.as_ref()).contains("Sleep Mode"));
let tools = Agent::<AgentCtx>::tool_dependencies(maintenance.as_ref());
assert!(tools.iter().any(|name| name == "execute_kip"));
assert!(tools.iter().any(|name| name == "note"));
assert_eq!(maintenance.get_processed(), None);
maintenance
.set_processed_at(MaintenanceScope::Quick, 7)
.await
.unwrap();
assert_eq!(maintenance.get_processed_at().quick, 7);
}
#[tokio::test]
async fn init_restores_history_in_oldest_first_order() {
let app = test_app_state_with_completer("maintenance_init_order", FinalCompleter);
let space = create_loaded_space(&app, "maintenance_init_order").await;
let maintenance = space.maintenance_for_test();
for _ in 0..3 {
let conversation = Conversation {
user: SELF_USER_ID,
status: ConversationStatus::Completed,
label: Some("maintenance".to_string()),
..Default::default()
};
maintenance
.conversations
.add_conversation(ConversationRef::from(&conversation))
.await
.unwrap();
}
maintenance.init().await.unwrap();
let ids: Vec<u64> = maintenance
.history
.read()
.iter()
.map(|doc| doc.metadata.get("_id").and_then(|v| v.as_u64()).unwrap())
.collect();
assert_eq!(ids, vec![2, 3]);
}
#[tokio::test]
async fn run_rejects_invalid_maintenance_input_before_processing() {
let app = test_app_state_with_completer("maintenance_invalid_input", FinalCompleter);
let space = create_loaded_space(&app, "maintenance_invalid_input").await;
let maintenance = space.maintenance_for_test();
let ctx = space
.ctx_for_test(SELF_USER_ID, MaintenanceAgent::NAME)
.unwrap();
let err =
Agent::<AgentCtx>::run(maintenance.as_ref(), ctx, "not a json".to_string(), vec![])
.await
.unwrap_err();
assert!(err.to_string().contains("invalid MaintenanceInput"));
assert!(!maintenance.is_processing());
assert_eq!(maintenance.conversations.conversations.len(), 0);
}
#[tokio::test]
async fn mark_conversation_failed_persists_status_and_reason() {
let app = test_app_state_with_completer("maintenance_mark_failed", FinalCompleter);
let space = create_loaded_space(&app, "maintenance_mark_failed").await;
let maintenance = space.maintenance_for_test();
let mut conversation = stored_conversation(&maintenance, vec![]).await;
maintenance
.mark_conversation_failed(&mut conversation, "boom".to_string())
.await;
assert_eq!(conversation.status, ConversationStatus::Failed);
assert_eq!(conversation.failed_reason.as_deref(), Some("boom"));
let stored = maintenance
.conversations
.get_conversation(conversation._id)
.await
.unwrap();
assert_eq!(stored.status, ConversationStatus::Failed);
assert_eq!(stored.failed_reason.as_deref(), Some("boom"));
}
#[tokio::test]
async fn process_one_marks_missing_prompt_and_completion_errors() {
let app = test_app_state_with_completer("maintenance_no_prompt", FinalCompleter);
let space = create_loaded_space(&app, "maintenance_no_prompt").await;
let maintenance = space.maintenance_for_test();
let ctx = space
.ctx_for_test(SELF_USER_ID, MaintenanceAgent::NAME)
.unwrap();
let mut no_prompt = stored_conversation(&maintenance, vec![]).await;
maintenance.process_one(&ctx, &mut no_prompt).await;
assert_eq!(no_prompt.status, ConversationStatus::Failed);
assert_eq!(no_prompt.failed_reason.as_deref(), Some("No prompt found"));
let app = test_app_state_with_completer("maintenance_model_error", ErrorCompleter);
let space = create_loaded_space(&app, "maintenance_model_error").await;
let maintenance = space.maintenance_for_test();
let ctx = space
.ctx_for_test(SELF_USER_ID, MaintenanceAgent::NAME)
.unwrap();
let mut conversation = stored_conversation(
&maintenance,
vec![json!(Message {
role: "user".to_string(),
content: vec![maintenance_prompt(MaintenanceScope::Quick).into()],
..Default::default()
})],
)
.await;
maintenance.process_one(&ctx, &mut conversation).await;
assert_eq!(conversation.status, ConversationStatus::Failed);
assert!(
conversation
.failed_reason
.as_deref()
.unwrap_or_default()
.contains("CompletionRunner error")
);
}
#[tokio::test]
async fn process_one_uses_history_and_persists_failed_reason() {
let app = test_app_state_with_completer("maintenance_history", FinalCompleter);
let space = create_loaded_space(&app, "maintenance_history").await;
let maintenance = space.maintenance_for_test();
let ctx = space
.ctx_for_test(SELF_USER_ID, MaintenanceAgent::NAME)
.unwrap();
let mut first = stored_conversation(
&maintenance,
vec![json!(Message {
role: "user".to_string(),
content: vec![maintenance_prompt(MaintenanceScope::Quick).into()],
..Default::default()
})],
)
.await;
maintenance.process_one(&ctx, &mut first).await;
assert_eq!(first.status, ConversationStatus::Completed);
let mut second = stored_conversation(
&maintenance,
vec![json!(Message {
role: "user".to_string(),
content: vec![maintenance_prompt(MaintenanceScope::Full).into()],
..Default::default()
})],
)
.await;
maintenance.process_one(&ctx, &mut second).await;
assert_eq!(second.status, ConversationStatus::Completed);
let app = test_app_state_with_completer("maintenance_failed_reason", FailedReasonCompleter);
let space = create_loaded_space(&app, "maintenance_failed_reason").await;
let maintenance = space.maintenance_for_test();
let ctx = space
.ctx_for_test(SELF_USER_ID, MaintenanceAgent::NAME)
.unwrap();
let mut failed = stored_conversation(
&maintenance,
vec![json!(Message {
role: "user".to_string(),
content: vec![maintenance_prompt(MaintenanceScope::Daydream).into()],
..Default::default()
})],
)
.await;
maintenance.process_one(&ctx, &mut failed).await;
assert_eq!(failed.status, ConversationStatus::Failed);
assert_eq!(failed.failed_reason.as_deref(), Some("maintenance failed"));
}
}