use anda_core::{
Agent, AgentContext, AgentOutput, BoxError, CompletionRequest, Document, Documents, Message,
Resource, StateFeatures, Tool, estimate_tokens,
};
use anda_db::{
query::Fv,
schema::{DocumentId, Json, Map},
};
use anda_engine::{
context::AgentCtx,
extension::note::{NoteTool, load_notes, load_notes_from_legacy},
local_date_hour,
memory::{Conversation, ConversationRef, ConversationStatus, MemoryManagement},
unix_ms,
};
use parking_lot::RwLock;
use serde_json::json;
use std::{
collections::VecDeque,
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
};
use super::{BrainHook, push_completed_history};
use crate::types::FormationInput;
const SELF_INSTRUCTIONS: &str = include_str!("../../assets/BrainFormation.md");
const REVIEW_INSTRUCTIONS: &str = include_str!("../../assets/BrainFormationReview.md");
struct ProcessingGuard(Arc<AtomicU64>);
impl Drop for ProcessingGuard {
fn drop(&mut self) {
self.0.store(0, Ordering::SeqCst);
}
}
#[derive(Clone)]
pub struct FormationAgent {
memory: Arc<MemoryManagement>,
processing_conversation: Arc<AtomicU64>,
hook: Arc<dyn BrainHook>,
history: Arc<RwLock<VecDeque<Document>>>,
#[allow(dead_code)]
max_input_tokens: usize,
}
impl FormationAgent {
pub const NAME: &'static str = "formation_memory";
pub fn new(
memory: Arc<MemoryManagement>,
hook: Arc<dyn BrainHook>,
max_input_tokens: usize,
) -> Self {
Self {
max_input_tokens,
memory,
processing_conversation: Arc::new(AtomicU64::new(0)),
history: Arc::new(RwLock::new(VecDeque::new())),
hook,
}
}
pub fn is_processing(&self) -> bool {
self.processing_conversation.load(Ordering::SeqCst) != 0
}
pub fn get_processed(&self) -> Option<DocumentId> {
self.memory
.conversations
.get_extension_as::<DocumentId>("brain_processed")
}
pub async fn get_or_init_counterparty(
&self,
counterparty: String,
name: Option<String>,
) -> Result<Json, BoxError> {
let mut attributes = Map::new();
let mut metadata = Map::new();
attributes.insert("id".to_string(), counterparty.clone().into());
attributes.insert("person_class".to_string(), "Human".into());
if let Some(name) = name {
attributes.insert("name".to_string(), name.into());
}
metadata.insert("author".to_string(), "$system".into());
metadata.insert("status".to_string(), "active".into());
let user = self
.memory
.nexus
.get_or_init_concept("Person".to_string(), counterparty, attributes, metadata)
.await?;
Ok(user.to_concept_node())
}
pub async fn start_process(
&self,
ctx: AgentCtx,
conversation: DocumentId,
) -> Result<(), BoxError> {
let current = self.processing_conversation.load(Ordering::SeqCst);
if current != 0 {
return Err(format!(
"FormationAgent is already processing conversation {}",
current
)
.into());
}
if self.hook.is_maintenance_processing() {
return Err(
"MaintenanceAgent is processing, formation will resume when maintenance completes"
.into(),
);
}
let conv = self
.find_next_submitted(conversation.saturating_sub(1))
.await
.ok_or_else(|| {
format!(
"No pending formation conversation found starting from {}",
conversation
)
})?;
self.try_process(ctx, conv);
Ok(())
}
pub fn try_process(&self, ctx: AgentCtx, conversation: Conversation) {
if self
.processing_conversation
.compare_exchange(0, conversation._id, Ordering::SeqCst, Ordering::SeqCst)
.is_err()
{
log::info!(
target: "brain",
"FormationAgent is already processing conversation {}, cannot process conversation {}",
self.processing_conversation.load(Ordering::SeqCst),
conversation._id
);
return;
}
let agent = self.clone();
let pc = self.processing_conversation.clone();
tokio::spawn(async move {
let guard = ProcessingGuard(pc);
agent.process_loop(ctx, conversation).await;
std::mem::forget(guard);
});
}
async fn process_loop(&self, ctx: AgentCtx, mut conversation: Conversation) {
loop {
let conv_id = conversation._id;
self.process_one(&ctx, &mut conversation).await;
self.hook
.on_conversation_end(Self::NAME, &conversation)
.await;
if conversation.status == ConversationStatus::Failed {
tokio::time::sleep(std::time::Duration::from_secs(60)).await; self.process_one(&ctx, &mut conversation).await;
self.hook
.on_conversation_end(Self::NAME, &conversation)
.await;
}
if conversation.status != ConversationStatus::Completed {
log::error!(
target: "brain",
"Conversation {} ended with status {:?}, not marking as processed",
conv_id,
conversation.status
);
self.processing_conversation.store(0, Ordering::SeqCst);
break;
}
self.memory
.conversations
.save_extension("brain_processed".to_string(), conv_id.into())
.await
.ok();
if let Some(id) = self.hook.try_start_maintenance(conv_id).await {
log::info!(
target: "brain",
"Triggered maintenance for conversation {}, new maintenance conversation {}",
conv_id,
id
);
self.processing_conversation.store(0, Ordering::SeqCst);
break; }
match self.find_next_submitted(conv_id).await {
Some(next_conv) => {
if self
.processing_conversation
.compare_exchange(
conv_id,
next_conv._id,
Ordering::SeqCst,
Ordering::SeqCst,
)
.is_ok()
{
conversation = next_conv;
continue;
}
break;
}
None => {
self.processing_conversation.store(0, Ordering::SeqCst);
if let Some(next_conv) = self.find_next_submitted(conv_id).await
&& self
.processing_conversation
.compare_exchange(0, next_conv._id, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
conversation = next_conv;
continue;
}
break;
}
}
}
}
async fn find_next_submitted(&self, after_id: u64) -> Option<Conversation> {
let mut id = after_id;
while id < self.memory.max_conversation_id() {
id += 1;
match self.memory.get_conversation(id).await {
Ok(conv) => {
if conv.status == ConversationStatus::Completed
|| conv.status == ConversationStatus::Cancelled
{
continue;
}
if let Some(label) = &conv.label
&& label != "formation"
{
continue; }
return Some(conv);
}
_ => continue,
}
}
None
}
async fn mark_conversation_failed(&self, conversation: &mut Conversation, reason: String) {
log::error!(target: "brain", "Conversation {} failed: {}", conversation._id, reason);
conversation.failed_reason = Some(reason);
conversation.status = ConversationStatus::Failed;
conversation.updated_at = unix_ms();
if let Ok(changes) = conversation.to_changes() {
let _ = self
.memory
.update_conversation(conversation._id, changes)
.await;
}
}
async fn process_one(&self, ctx: &AgentCtx, conversation: &mut Conversation) {
let prompt = match conversation
.messages
.first()
.and_then(|v| serde_json::from_value::<Message>(v.clone()).ok())
.and_then(|v| v.text())
{
Some(p) => p,
None => {
self.mark_conversation_failed(conversation, "No prompt found".to_string())
.await;
return;
}
};
let counterparty_info = if let Ok(input) = serde_json::from_str::<FormationInput>(&prompt)
&& let Some(ctx) = input.context
&& let Some(counterparty) = ctx.counterparty
{
self.get_or_init_counterparty(counterparty, None).await.ok()
} else {
None
};
let now_ms = unix_ms();
let chat_history: Vec<Document> = { self.history.read().iter().cloned().collect() };
let chat_history = if chat_history.is_empty() {
vec![]
} else {
vec![Message {
role: "user".into(),
content: vec![
Documents::new("history_formation".to_string(), chat_history)
.to_string()
.into(),
],
name: Some("$system".into()),
timestamp: Some(now_ms),
..Default::default()
}]
};
let primer = self.memory.describe_primer().await.unwrap_or_default();
let notes = match load_notes(ctx).await {
Some(n) => n,
None => load_notes_from_legacy(ctx).await.unwrap_or_default(),
};
let should_review = prompt.len() >= 10000;
let mut runner = ctx.clone().completion_iter(
CompletionRequest {
instructions: format!(
"{}\n\n---\n\n# `DESCRIBE PRIMER` Result:\n{}\n\n---\n\n# Your Notes:\n{}\n\n# Counterparty Profile:\n{}\n\n# Current Datetime: {}",
SELF_INSTRUCTIONS,
primer,
serde_json::to_string(¬es.items).unwrap_or_default(),
serde_json::to_string(&counterparty_info).unwrap_or_default(),
local_date_hour(now_ms).unwrap_or_default()
),
prompt,
chat_history,
tools: ctx.tool_definitions(Some(&self.tool_dependencies())),
tool_choice_required: true,
..Default::default()
},
vec![],
);
if should_review {
runner.follow_up(REVIEW_INSTRUCTIONS.to_string());
}
let mut first_round = true;
loop {
match runner.next().await {
Ok(None) => break,
Ok(Some(mut res)) => {
let now_ms = unix_ms();
let is_done = runner.is_done();
if first_round {
first_round = false;
conversation.messages.clear();
conversation.append_messages(res.chat_history);
} else {
let existing_len = conversation.messages.len();
if res.chat_history.len() >= existing_len {
res.chat_history.drain(0..existing_len);
conversation.append_messages(res.chat_history);
} else {
conversation.messages.clear();
conversation.append_messages(res.chat_history);
}
}
conversation.status = if res.failed_reason.is_some() {
ConversationStatus::Failed
} else if is_done {
ConversationStatus::Completed
} else {
ConversationStatus::Working
};
conversation.usage = res.usage;
conversation.updated_at = now_ms;
if let Some(failed_reason) = res.failed_reason {
conversation.failed_reason = Some(failed_reason);
} else {
conversation.failed_reason = None;
push_completed_history(&self.history, conversation, 2);
}
match conversation.to_changes() {
Ok(mut changes) => {
if conversation.failed_reason.is_none() {
changes.insert("failed_reason".to_string(), Fv::Null);
}
let _ = self
.memory
.update_conversation(conversation._id, changes)
.await;
}
Err(err) => {
log::error!(
target: "brain",
"Failed to serialize formation conversation {} changes: {:?}",
conversation._id,
err
);
}
}
if conversation.status == ConversationStatus::Cancelled
|| conversation.status == ConversationStatus::Failed
{
break;
}
}
Err(err) => {
self.mark_conversation_failed(
conversation,
format!("CompletionRunner error: {err:?}"),
)
.await;
break;
}
}
}
}
}
impl Agent<AgentCtx> for FormationAgent {
fn name(&self) -> String {
Self::NAME.to_string()
}
fn description(&self) -> String {
"Receives conversation messages and encodes them into structured memory within the Cognitive Nexus via KIP.".to_string()
}
fn tool_dependencies(&self) -> Vec<String> {
vec![self.memory.name(), NoteTool::NAME.to_string()]
}
async fn run(
&self,
ctx: AgentCtx,
prompt: String, _resources: Vec<Resource>,
) -> Result<AgentOutput, BoxError> {
let caller = ctx.caller();
let now_ms = unix_ms();
let token_count = estimate_tokens(&prompt);
if token_count > self.max_input_tokens {
return Err(format!(
"Input too large: {} tokens (estimated), max allowed is {} tokens",
token_count, self.max_input_tokens
)
.into());
}
let mut conversation = Conversation {
user: *caller,
messages: vec![json!(Message {
role: "user".into(),
content: vec![prompt.into()],
..Default::default()
})],
period: now_ms / 3600 / 1000,
created_at: now_ms,
updated_at: now_ms,
label: Some("formation".to_string()),
..Default::default()
};
let id = self
.memory
.add_conversation(ConversationRef::from(&conversation))
.await?;
conversation._id = id;
let res = AgentOutput {
conversation: Some(id),
..Default::default()
};
let is_idle = self.processing_conversation.load(Ordering::SeqCst) == 0;
if is_idle {
if self.hook.is_maintenance_processing() {
log::info!(
target: "brain",
conversation = id;
"Formation queued while maintenance is processing"
);
} else {
if let Some(prev_id) = self.get_processed()
&& prev_id + 1 < id
{
if let Some(conv) = self.find_next_submitted(prev_id).await {
self.try_process(ctx, conv);
}
} else {
self.try_process(ctx, conversation);
}
}
}
Ok(res)
}
}
#[cfg(test)]
mod tests {
use super::{FormationAgent, ProcessingGuard};
use crate::{
agents::SELF_USER_ID,
space::AppState,
types::{FormationInput, InputContext, MaintenanceInput, MaintenanceScope},
};
use anda_core::{
Agent, AgentOutput, BoxError, BoxPinFut, CompletionRequest, Message, Principal,
};
use anda_db::{database::DBConfig, storage::StorageConfig};
use anda_engine::{
context::AgentCtx,
management::{BaseManagement, Visibility},
memory::{Conversation, ConversationRef, ConversationStatus},
model::{CompletionFeaturesDyn, Model, Models, reqwest},
unix_ms,
};
use object_store::memory::InMemory;
use serde_json::json;
use std::collections::BTreeSet;
use std::sync::{
Arc,
atomic::{AtomicU64, Ordering},
};
use tokio::time::{Duration, sleep};
#[derive(Debug)]
struct SuccessCompleter;
impl CompletionFeaturesDyn for SuccessCompleter {
fn model_name(&self) -> String {
"success-test-model".to_string()
}
fn completion(&self, req: CompletionRequest) -> BoxPinFut<Result<AgentOutput, BoxError>> {
Box::pin(async move {
Ok(AgentOutput {
content: "formation done".to_string(),
chat_history: vec![Message {
role: "assistant".to_string(),
content: vec![format!("processed: {}", req.prompt).into()],
..Default::default()
}],
..Default::default()
})
})
}
}
#[derive(Debug)]
struct FailedReasonCompleter;
impl CompletionFeaturesDyn for FailedReasonCompleter {
fn model_name(&self) -> String {
"failed-reason-test-model".to_string()
}
fn completion(&self, _req: CompletionRequest) -> BoxPinFut<Result<AgentOutput, BoxError>> {
Box::pin(async move {
Ok(AgentOutput {
failed_reason: Some("formation failed".to_string()),
chat_history: vec![Message {
role: "assistant".to_string(),
content: vec!["formation failure".to_string().into()],
..Default::default()
}],
..Default::default()
})
})
}
}
#[derive(Debug)]
struct RetryCompleter {
calls: Arc<AtomicU64>,
}
impl CompletionFeaturesDyn for RetryCompleter {
fn model_name(&self) -> String {
"retry-test-model".to_string()
}
fn completion(&self, req: CompletionRequest) -> BoxPinFut<Result<AgentOutput, BoxError>> {
let calls = self.calls.clone();
Box::pin(async move {
let call = calls.fetch_add(1, Ordering::SeqCst);
if call == 0 {
Ok(AgentOutput {
failed_reason: Some("transient formation failure".to_string()),
chat_history: vec![Message {
role: "assistant".to_string(),
content: vec!["retry later".to_string().into()],
..Default::default()
}],
..Default::default()
})
} else {
Ok(AgentOutput {
content: "formation retried".to_string(),
chat_history: vec![Message {
role: "assistant".to_string(),
content: vec![format!("recovered: {}", req.prompt).into()],
..Default::default()
}],
..Default::default()
})
}
})
}
}
#[derive(Debug)]
struct ErrorCompleter;
impl CompletionFeaturesDyn for ErrorCompleter {
fn model_name(&self) -> String {
"error-test-model".to_string()
}
fn completion(&self, _req: CompletionRequest) -> BoxPinFut<Result<AgentOutput, BoxError>> {
Box::pin(async move { Err("model error".into()) })
}
}
#[derive(Debug)]
struct SlowCompleter;
impl CompletionFeaturesDyn for SlowCompleter {
fn model_name(&self) -> String {
"slow-test-model".to_string()
}
fn completion(&self, req: CompletionRequest) -> BoxPinFut<Result<AgentOutput, BoxError>> {
Box::pin(async move {
sleep(Duration::from_millis(150)).await;
Ok(AgentOutput {
content: "done".to_string(),
chat_history: vec![Message {
role: "assistant".to_string(),
content: vec![format!("processed: {}", req.prompt).into()],
..Default::default()
}],
..Default::default()
})
})
}
}
fn test_db_config(name: &str) -> DBConfig {
DBConfig {
name: name.to_string(),
description: "test database".to_string(),
storage: StorageConfig::default(),
lock: None,
}
}
fn test_app_state(name: &str) -> AppState {
test_app_state_with_models(name, Arc::new(Models::default()))
}
fn test_app_state_with_completer<C>(name: &str, completer: C) -> AppState
where
C: CompletionFeaturesDyn,
{
let models = Models::default();
models.set_model(Model::with_completer(Arc::new(completer)));
test_app_state_with_models(name, Arc::new(models))
}
fn test_app_state_with_models(name: &str, models: Arc<Models>) -> AppState {
let management = Arc::new(BaseManagement {
controller: SELF_USER_ID,
managers: BTreeSet::new(),
visibility: Visibility::Public,
});
let http_client = reqwest::Client::builder().build().unwrap();
AppState::new(
Arc::new(InMemory::new()),
Arc::new(test_db_config(name)),
management,
http_client,
models,
Arc::new(vec![]),
"anda_brain".to_string(),
"test".to_string(),
0,
)
}
fn formation_prompt(counterparty: Option<&str>) -> String {
formation_prompt_with_text("remember this preference", counterparty)
}
fn formation_prompt_with_text(text: &str, counterparty: Option<&str>) -> String {
serde_json::to_string(&FormationInput {
messages: vec![Message {
role: "user".to_string(),
content: vec![text.to_string().into()],
..Default::default()
}],
context: counterparty.map(|counterparty| InputContext {
counterparty: Some(counterparty.to_string()),
..Default::default()
}),
timestamp: None,
})
.unwrap()
}
async fn stored_conversation(
space: &crate::space::Space,
messages: Vec<serde_json::Value>,
) -> Conversation {
let now = unix_ms();
let mut conversation = Conversation {
user: SELF_USER_ID,
status: ConversationStatus::Submitted,
messages,
label: Some("formation".to_string()),
created_at: now,
updated_at: now,
..Default::default()
};
let id = space
.memory
.add_conversation(ConversationRef::from(&conversation))
.await
.unwrap();
conversation._id = id;
conversation
}
async fn create_loaded_space(app: &AppState, id: &str) -> Arc<crate::space::Space> {
app.admin_create_space(
Principal::from_slice(&[1]),
Principal::from_slice(&[2]),
id.to_string(),
1,
unix_ms(),
)
.await
.unwrap();
app.load_space(id, false).await.unwrap()
}
#[test]
fn processing_guard_resets_conversation_id_on_drop() {
let processing = Arc::new(AtomicU64::new(42));
{
let _guard = ProcessingGuard(processing.clone());
assert_eq!(processing.load(Ordering::SeqCst), 42);
}
assert_eq!(processing.load(Ordering::SeqCst), 0);
}
#[test]
fn formation_agent_name_matches_registered_agent_name() {
assert_eq!(FormationAgent::NAME, "formation_memory");
}
#[tokio::test]
async fn formation_agent_trait_metadata_matches_runtime_registration() {
let app = test_app_state("formation_trait_metadata");
let space = create_loaded_space(&app, "formation_trait_metadata").await;
assert_eq!(
Agent::<AgentCtx>::name(space.formation.as_ref()),
FormationAgent::NAME
);
assert!(
Agent::<AgentCtx>::description(space.formation.as_ref()).contains("structured memory")
);
let tools = Agent::<AgentCtx>::tool_dependencies(space.formation.as_ref());
assert!(tools.iter().any(|name| name == "execute_kip"));
assert!(tools.iter().any(|name| name == "note"));
}
#[tokio::test]
async fn find_next_submitted_skips_terminal_and_non_formation_conversations() {
let app = test_app_state("formation_find_next");
let space = create_loaded_space(&app, "formation_find_next").await;
let now = unix_ms();
for conversation in [
Conversation {
user: SELF_USER_ID,
status: ConversationStatus::Completed,
label: Some("formation".to_string()),
created_at: now,
updated_at: now,
..Default::default()
},
Conversation {
user: SELF_USER_ID,
status: ConversationStatus::Submitted,
label: Some("recall".to_string()),
created_at: now + 1,
updated_at: now + 1,
..Default::default()
},
Conversation {
user: SELF_USER_ID,
status: ConversationStatus::Cancelled,
label: Some("formation".to_string()),
created_at: now + 2,
updated_at: now + 2,
..Default::default()
},
] {
space
.memory
.add_conversation(ConversationRef::from(&conversation))
.await
.unwrap();
}
let pending = Conversation {
user: SELF_USER_ID,
status: ConversationStatus::Submitted,
label: Some("formation".to_string()),
created_at: now + 3,
updated_at: now + 3,
..Default::default()
};
let pending_id = space
.memory
.add_conversation(ConversationRef::from(&pending))
.await
.unwrap();
let found = space.formation.find_next_submitted(0).await.unwrap();
assert_eq!(found._id, pending_id);
assert!(
space
.formation
.find_next_submitted(pending_id)
.await
.is_none()
);
}
#[tokio::test]
async fn mark_conversation_failed_persists_status_and_reason() {
let app = test_app_state("formation_mark_failed");
let space = create_loaded_space(&app, "formation_mark_failed").await;
let now = unix_ms();
let mut conversation = Conversation {
user: SELF_USER_ID,
status: ConversationStatus::Submitted,
label: Some("formation".to_string()),
created_at: now,
updated_at: now,
..Default::default()
};
let id = space
.memory
.add_conversation(ConversationRef::from(&conversation))
.await
.unwrap();
conversation._id = id;
space
.formation
.mark_conversation_failed(&mut conversation, "boom".to_string())
.await;
assert_eq!(conversation.status, ConversationStatus::Failed);
assert_eq!(conversation.failed_reason.as_deref(), Some("boom"));
let stored = space.memory.get_conversation(id).await.unwrap();
assert_eq!(stored.status, ConversationStatus::Failed);
assert_eq!(stored.failed_reason.as_deref(), Some("boom"));
}
#[tokio::test]
async fn start_process_rejects_busy_and_maintenance_states() {
let app = test_app_state("formation_start_guards");
let space = create_loaded_space(&app, "formation_start_guards").await;
let ctx = space
.ctx_for_test(SELF_USER_ID, FormationAgent::NAME)
.unwrap();
space
.formation
.processing_conversation
.store(42, Ordering::SeqCst);
let err = space
.formation
.start_process(ctx.clone(), 1)
.await
.unwrap_err();
assert!(
err.to_string()
.contains("already processing conversation 42")
);
space
.formation
.processing_conversation
.store(0, Ordering::SeqCst);
let app = test_app_state_with_completer("formation_maintenance_guard", SlowCompleter);
let space = create_loaded_space(&app, "formation_maintenance_guard").await;
let maintenance = space
.maintenance(
SELF_USER_ID,
MaintenanceInput {
scope: MaintenanceScope::Quick,
..Default::default()
},
)
.await
.unwrap();
assert!(maintenance.conversation.is_some());
let ctx = space
.ctx_for_test(SELF_USER_ID, FormationAgent::NAME)
.unwrap();
let err = space.formation.start_process(ctx, 1).await.unwrap_err();
assert!(err.to_string().contains("MaintenanceAgent is processing"));
for _ in 0..100 {
if !space.is_processing() {
return;
}
sleep(Duration::from_millis(10)).await;
}
panic!("maintenance did not finish");
}
#[tokio::test]
async fn start_process_finds_pending_conversation_and_dispatches_worker() {
let app = test_app_state_with_completer("formation_start_success", SuccessCompleter);
let space = create_loaded_space(&app, "formation_start_success").await;
let ctx = space
.ctx_for_test(SELF_USER_ID, FormationAgent::NAME)
.unwrap();
let pending = stored_conversation(
&space,
vec![json!(Message {
role: "user".to_string(),
content: vec![formation_prompt(None).into()],
..Default::default()
})],
)
.await;
space
.formation
.start_process(ctx, pending._id)
.await
.unwrap();
for _ in 0..100 {
if !space.formation.is_processing() {
break;
}
sleep(Duration::from_millis(10)).await;
}
assert_eq!(space.formation.get_processed(), Some(pending._id));
assert_eq!(
space
.memory
.get_conversation(pending._id)
.await
.unwrap()
.status,
ConversationStatus::Completed
);
}
#[tokio::test]
async fn run_queues_while_maintenance_runs_and_resumes_from_processed_gap() {
let app = test_app_state_with_completer("formation_run_resume_gap", SuccessCompleter);
let space = create_loaded_space(&app, "formation_run_resume_gap").await;
let ctx = space
.ctx_for_test(SELF_USER_ID, FormationAgent::NAME)
.unwrap();
space
.memory
.conversations
.save_extension("brain_processed".to_string(), 0_u64.into())
.await
.unwrap();
let missed = stored_conversation(
&space,
vec![json!(Message {
role: "user".to_string(),
content: vec![formation_prompt(None).into()],
..Default::default()
})],
)
.await;
let output = Agent::<AgentCtx>::run(
space.formation.as_ref(),
ctx,
formation_prompt(Some("resume-gap-user")),
vec![],
)
.await
.unwrap();
let queued_id = output.conversation.unwrap();
for _ in 0..100 {
if !space.formation.is_processing() {
break;
}
sleep(Duration::from_millis(10)).await;
}
assert_eq!(space.formation.get_processed(), Some(queued_id));
assert_eq!(
space
.memory
.get_conversation(missed._id)
.await
.unwrap()
.status,
ConversationStatus::Completed
);
let app = test_app_state_with_completer("formation_run_maintenance_queue", SlowCompleter);
let space = create_loaded_space(&app, "formation_run_maintenance_queue").await;
let maintenance = space
.maintenance(
SELF_USER_ID,
MaintenanceInput {
scope: MaintenanceScope::Quick,
..Default::default()
},
)
.await
.unwrap();
assert!(maintenance.conversation.is_some());
let ctx = space
.ctx_for_test(SELF_USER_ID, FormationAgent::NAME)
.unwrap();
let output = Agent::<AgentCtx>::run(
space.formation.as_ref(),
ctx,
formation_prompt(None),
vec![],
)
.await
.unwrap();
assert!(output.conversation.is_some());
assert!(!space.formation.is_processing());
for _ in 0..100 {
if !space.is_processing() {
return;
}
sleep(Duration::from_millis(10)).await;
}
panic!("maintenance did not finish");
}
#[tokio::test]
async fn try_process_returns_when_another_conversation_owns_the_guard() {
let app = test_app_state("formation_try_process_guard");
let space = create_loaded_space(&app, "formation_try_process_guard").await;
let ctx = space
.ctx_for_test(SELF_USER_ID, FormationAgent::NAME)
.unwrap();
let conversation = stored_conversation(
&space,
vec![json!(Message {
role: "user".to_string(),
content: vec![formation_prompt(None).into()],
..Default::default()
})],
)
.await;
space
.formation
.processing_conversation
.store(conversation._id + 10, Ordering::SeqCst);
space.formation.try_process(ctx, conversation.clone());
assert_eq!(
space
.formation
.processing_conversation
.load(Ordering::SeqCst),
conversation._id + 10
);
}
#[tokio::test]
async fn process_one_marks_missing_prompt_and_completion_errors() {
let app = test_app_state("formation_no_prompt");
let space = create_loaded_space(&app, "formation_no_prompt").await;
let ctx = space
.ctx_for_test(SELF_USER_ID, FormationAgent::NAME)
.unwrap();
let mut no_prompt = stored_conversation(&space, vec![]).await;
space.formation.process_one(&ctx, &mut no_prompt).await;
assert_eq!(no_prompt.status, ConversationStatus::Failed);
assert_eq!(no_prompt.failed_reason.as_deref(), Some("No prompt found"));
let app = test_app_state_with_completer("formation_model_error", ErrorCompleter);
let space = create_loaded_space(&app, "formation_model_error").await;
let ctx = space
.ctx_for_test(SELF_USER_ID, FormationAgent::NAME)
.unwrap();
let mut conversation = stored_conversation(
&space,
vec![json!(Message {
role: "user".to_string(),
content: vec![formation_prompt(Some("counterparty-error")).into()],
..Default::default()
})],
)
.await;
space.formation.process_one(&ctx, &mut conversation).await;
assert_eq!(conversation.status, ConversationStatus::Failed);
assert!(
conversation
.failed_reason
.as_deref()
.unwrap_or_default()
.contains("CompletionRunner error")
);
}
#[tokio::test]
async fn process_one_persists_model_failed_reason() {
let app = test_app_state_with_completer("formation_failed_reason", FailedReasonCompleter);
let space = create_loaded_space(&app, "formation_failed_reason").await;
let ctx = space
.ctx_for_test(SELF_USER_ID, FormationAgent::NAME)
.unwrap();
let mut conversation = stored_conversation(
&space,
vec![json!(Message {
role: "user".to_string(),
content: vec![formation_prompt(None).into()],
..Default::default()
})],
)
.await;
space.formation.process_one(&ctx, &mut conversation).await;
assert_eq!(conversation.status, ConversationStatus::Failed);
assert_eq!(
conversation.failed_reason.as_deref(),
Some("formation failed")
);
let stored = space
.memory
.get_conversation(conversation._id)
.await
.unwrap();
assert_eq!(stored.status, ConversationStatus::Failed);
assert_eq!(stored.failed_reason.as_deref(), Some("formation failed"));
}
#[tokio::test]
async fn run_rejects_oversized_formation_input_before_persisting() {
let app = test_app_state("formation_input_too_large");
let space = create_loaded_space(&app, "formation_input_too_large").await;
let ctx = space
.ctx_for_test(SELF_USER_ID, FormationAgent::NAME)
.unwrap();
let prompt = "x ".repeat(1_000_000);
let err = Agent::<AgentCtx>::run(space.formation.as_ref(), ctx, prompt, vec![])
.await
.unwrap_err();
assert!(err.to_string().contains("Input too large"));
assert_eq!(space.memory.conversations.len(), 0);
}
#[tokio::test]
async fn process_loop_processes_submitted_formation_queue_sequentially() {
let app = test_app_state_with_completer("formation_process_loop_queue", SuccessCompleter);
let space = create_loaded_space(&app, "formation_process_loop_queue").await;
let ctx = space
.ctx_for_test(SELF_USER_ID, FormationAgent::NAME)
.unwrap();
let first = stored_conversation(
&space,
vec![json!(Message {
role: "user".to_string(),
content: vec![formation_prompt(None).into()],
..Default::default()
})],
)
.await;
let second = stored_conversation(
&space,
vec![json!(Message {
role: "user".to_string(),
content: vec![formation_prompt(Some("queue-user")).into()],
..Default::default()
})],
)
.await;
space
.formation
.processing_conversation
.store(first._id, Ordering::SeqCst);
space.formation.process_loop(ctx, first).await;
assert_eq!(
space
.formation
.processing_conversation
.load(Ordering::SeqCst),
0
);
assert_eq!(space.formation.get_processed(), Some(second._id));
assert_eq!(
space
.memory
.get_conversation(second._id)
.await
.unwrap()
.status,
ConversationStatus::Completed
);
}
#[tokio::test(start_paused = true)]
async fn process_loop_retries_failed_conversation_once_and_clears_failure_reason() {
let calls = Arc::new(AtomicU64::new(0));
let app = test_app_state_with_completer(
"formation_process_loop_retry",
RetryCompleter {
calls: calls.clone(),
},
);
let space = create_loaded_space(&app, "formation_process_loop_retry").await;
let ctx = space
.ctx_for_test(SELF_USER_ID, FormationAgent::NAME)
.unwrap();
let pending = stored_conversation(
&space,
vec![json!(Message {
role: "user".to_string(),
content: vec![formation_prompt(None).into()],
..Default::default()
})],
)
.await;
space
.formation
.processing_conversation
.store(pending._id, Ordering::SeqCst);
space.formation.process_loop(ctx, pending.clone()).await;
assert_eq!(calls.load(Ordering::SeqCst), 2);
assert_eq!(
space
.formation
.processing_conversation
.load(Ordering::SeqCst),
0
);
assert_eq!(space.formation.get_processed(), Some(pending._id));
let stored = space.memory.get_conversation(pending._id).await.unwrap();
assert_eq!(stored.status, ConversationStatus::Completed);
assert_eq!(stored.failed_reason, None);
}
#[tokio::test]
async fn process_loop_triggers_scheduled_maintenance_at_threshold() {
let app =
test_app_state_with_completer("formation_process_loop_maintenance", SuccessCompleter);
let space = create_loaded_space(&app, "formation_process_loop_maintenance").await;
let ctx = space
.ctx_for_test(SELF_USER_ID, FormationAgent::NAME)
.unwrap();
for _ in 0..20 {
let completed = Conversation {
user: SELF_USER_ID,
status: ConversationStatus::Completed,
label: Some("formation".to_string()),
created_at: unix_ms(),
updated_at: unix_ms(),
..Default::default()
};
space
.memory
.add_conversation(ConversationRef::from(&completed))
.await
.unwrap();
}
let pending = stored_conversation(
&space,
vec![json!(Message {
role: "user".to_string(),
content: vec![formation_prompt(None).into()],
..Default::default()
})],
)
.await;
assert_eq!(pending._id, 21);
space
.formation
.processing_conversation
.store(pending._id, Ordering::SeqCst);
space.formation.process_loop(ctx, pending).await;
assert_eq!(
space
.formation
.processing_conversation
.load(Ordering::SeqCst),
0
);
for _ in 0..100 {
if !space.is_processing() {
break;
}
sleep(Duration::from_millis(10)).await;
}
assert_eq!(space.maintenance_for_test().get_processed_at().daydream, 21);
}
#[tokio::test]
async fn process_one_reviews_large_prompts_and_appends_follow_up_history() {
let app = test_app_state_with_completer("formation_review_large_prompt", SuccessCompleter);
let space = create_loaded_space(&app, "formation_review_large_prompt").await;
let ctx = space
.ctx_for_test(SELF_USER_ID, FormationAgent::NAME)
.unwrap();
let large_text = "x".repeat(10_500);
let mut conversation = stored_conversation(
&space,
vec![json!(Message {
role: "user".to_string(),
content: vec![formation_prompt_with_text(&large_text, None).into()],
..Default::default()
})],
)
.await;
space.formation.process_one(&ctx, &mut conversation).await;
assert_eq!(conversation.status, ConversationStatus::Completed);
assert!(conversation.messages.len() >= 2);
}
}