pub mod buffer;
use crate::agents::{
AgentConfig, AgentContext, AgentHeartbeat, AgentLiveStatus, ChatCapable, NsedAgent,
PersistenceStore, ProposalRecord, UserToolHandlerTrait,
};
use crate::nats_utils::{NatsAuth, connect_nats, ensure_kv_bucket, sanitize_subject_component};
use crate::status::{SharedAgentStatus, TaskLogEntry, new_shared_status};
use crate::telemetry::{TaskFailureClass, TelemetryEmitterMux};
use anyhow::{Context, Result};
use async_nats::connection::State as NatsState;
use async_nats::jetstream::{self, kv};
use async_trait::async_trait;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use tracing::{error, info, warn};
use uuid::Uuid;
/// Final subject token for an agent's passthrough (direct chat) endpoint.
///
/// The worker subscribes to `{subject_prefix}.agent.{agent_id}.{PASSTHROUGH_SUBJECT_SUFFIX}`
/// and answers each request on the request's reply subject.
pub const PASSTHROUGH_SUBJECT_SUFFIX: &str = "passthrough";
/// JSON body of a passthrough request sent to the agent's passthrough subject via NATS
/// request/reply.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PassthroughRequest {
// Caller-supplied session identifier; the passthrough handler logs it.
pub session_id: String,
// Conversation to forward to the chat agent. Roles other than `user`, `assistant` and
// `system` are skipped by the handler.
pub messages: Vec<PassthroughMessage>,
// Identity of the requesting operator.
// NOTE(review): not read by `handle_passthrough`; confirm whether it should be enforced.
pub operator_principal: String,
}
/// A single chat message inside a [`PassthroughRequest`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PassthroughMessage {
// One of "user", "assistant" or "system"; any other value is dropped with a warning.
pub role: String,
// Plain-text message content.
pub content: String,
}
/// Successful reply to a [`PassthroughRequest`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PassthroughResponse {
// Text returned by the chat agent.
pub content: String,
// Token accounting; `handle_passthrough` currently always sends `None`.
pub input_tokens: Option<u32>,
pub output_tokens: Option<u32>,
}
/// Error reply to a passthrough request, sent instead of a [`PassthroughResponse`]. It covers
/// a paused agent, an unparsable request, a missing chat capability, no valid messages, and
/// chat failure or timeout.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PassthroughError {
// Human-readable description of the failure.
pub error: String,
}
/// Extension point called just before the worker publishes certain payloads.
///
/// In this module it is invoked on the job-acceptance ack (`handle_manifest`) and on unbuffered
/// task results (`handle_message`). The hook may rewrite `payload` in place. An `Err` aborts
/// that publish because callers propagate it with `?`. Under the `audit` feature,
/// `NatsNsedWorker::with_signing` installs a `crate::crypto::SigningHook` here.
///
/// The default implementation leaves the payload untouched and succeeds.
#[async_trait]
pub trait WorkerHook: Send + Sync + Debug {
async fn before_publish(&self, _subject: &str, _payload: &mut Vec<u8>) -> Result<()> {
Ok(())
}
}
/// Builds the per-task handler that services user-defined tool calls.
///
/// `handle_message` calls `create` only when the incoming task context lists user tools. It
/// passes the NATS client and JetStream context, the session id parsed from the task subject,
/// the agent's name, the remaining phase budget in seconds, and the configured subject prefix.
///
/// NOTE(review): `#[async_trait]` appears to be unnecessary here because `create` is
/// synchronous; kept for compatibility with existing implementors.
#[async_trait]
pub trait UserToolHandlerFactory: Send + Sync + Debug {
fn create(
&self,
nats: async_nats::Client,
js: jetstream::Context,
session_id: String,
agent_id: String,
budget_remaining_secs: f64,
subject_prefix: String,
) -> Arc<dyn UserToolHandlerTrait>;
}
/// Connection and naming settings for a [`NatsNsedWorker`]. Build with `WorkerConfig::new`
/// and the `with_*` methods.
#[derive(Clone, Debug)]
pub struct WorkerConfig {
// NATS server URL handed to `connect_nats`.
pub nats_url: String,
// JetStream stream that carries tasks and job manifests. `run` waits for it to exist.
pub stream_name: String,
// Durable name of the task pull consumer.
pub consumer_name: String,
// Root token for task/result/event subjects (default "nsed").
pub subject_prefix: String,
// Root token for job manifest/ack subjects (default "sphera").
pub api_prefix: String,
// `max_age` for scratchpad KV entries, in seconds (default 7 days).
pub scratchpad_retention_secs: u64,
// Optional credentials forwarded to `connect_nats`.
pub nats_auth: Option<NatsAuth>,
}
impl WorkerConfig {
    /// Creates a configuration for the given server, stream and durable consumer.
    ///
    /// Defaults: subject prefix `nsed`, API prefix `sphera`, seven days of scratchpad
    /// retention, and no NATS authentication.
    pub fn new(nats_url: String, stream_name: String, consumer_name: String) -> Self {
        const SECONDS_PER_DAY: u64 = 24 * 60 * 60;
        Self {
            nats_url,
            stream_name,
            consumer_name,
            subject_prefix: String::from("nsed"),
            api_prefix: String::from("sphera"),
            scratchpad_retention_secs: 7 * SECONDS_PER_DAY,
            nats_auth: None,
        }
    }

    /// Replaces the root token used for task, result and event subjects.
    pub fn with_subject_prefix(self, prefix: String) -> Self {
        Self {
            subject_prefix: prefix,
            ..self
        }
    }

    /// Replaces the root token used for job manifest and acknowledgement subjects.
    pub fn with_api_prefix(self, prefix: String) -> Self {
        Self {
            api_prefix: prefix,
            ..self
        }
    }

    /// Sets the scratchpad KV `max_age`, in seconds.
    pub fn with_scratchpad_retention(self, secs: u64) -> Self {
        Self {
            scratchpad_retention_secs: secs,
            ..self
        }
    }

    /// Supplies credentials for the NATS connection.
    pub fn with_nats_auth(self, auth: NatsAuth) -> Self {
        Self {
            nats_auth: Some(auth),
            ..self
        }
    }
}
/// Job announcement consumed from `{api_prefix}.jobs.manifest.>`. An agent whose id appears
/// in `agents` accepts the job.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobManifest {
// Job identifier. Also used as the session token in `agent_accepted` event subjects.
pub job_id: String,
// Task text; echoed into the worker's status event on acceptance.
pub task_description: String,
// Ids of the agents selected for this job.
pub agents: Vec<String>,
// Number of rounds; not read by the worker code in this module.
pub rounds: u32,
// NOTE(review): presumably a Unix timestamp; confirm the unit (s vs ms) with the producer.
pub timestamp: u64,
}
/// [`PersistenceStore`] over a JetStream KV bucket. Every key is stored as
/// `{scope_prefix}.{key}`, so one bucket can hold many isolated scopes (`handle_message` uses
/// the session id as the scope).
#[derive(Clone, Debug)]
pub struct NatsScratchpadStore {
// Backing KV bucket for scratchpad keys.
store: kv::Store,
// JetStream context used to look up the per-scope history bucket.
js: jetstream::Context,
// Namespace prepended to every key.
scope_prefix: String,
}
impl NatsScratchpadStore {
    /// Wraps `store`, namespacing every key under `scope_prefix`.
    pub fn new(store: kv::Store, js: jetstream::Context, scope_prefix: String) -> Self {
        Self {
            scope_prefix,
            js,
            store,
        }
    }

    /// Returns `key` qualified by this store's scope, i.e. `{scope_prefix}.{key}`.
    fn scoped_key(&self, key: &str) -> String {
        [self.scope_prefix.as_str(), key].join(".")
    }

    /// Reads the current value of a scoped key, converting NATS errors into `anyhow` errors.
    async fn store_get(&self, key: &str) -> Result<Option<bytes::Bytes>> {
        let scoped = self.scoped_key(key);
        let fetched = self.store.get(scoped).await;
        fetched.map_err(|err| anyhow::anyhow!(err))
    }

    /// Reads the latest entry (value plus revision) of a scoped key.
    async fn store_entry(&self, key: &str) -> Result<Option<kv::Entry>> {
        let scoped = self.scoped_key(key);
        let fetched = self.store.entry(scoped).await;
        fetched.map_err(|err| anyhow::anyhow!(err))
    }

    /// Unconditionally writes a scoped key and returns the new revision.
    async fn store_put(&self, key: &str, value: bytes::Bytes) -> Result<u64> {
        let scoped = self.scoped_key(key);
        let written = self.store.put(scoped, value).await;
        written.map_err(|err| anyhow::anyhow!(err))
    }

    /// Creates a scoped key only if it does not exist. The typed NATS error is returned so
    /// callers can detect `AlreadyExists`.
    async fn store_create(
        &self,
        key: &str,
        value: bytes::Bytes,
    ) -> std::result::Result<u64, async_nats::error::Error<kv::CreateErrorKind>> {
        let scoped = self.scoped_key(key);
        self.store.create(scoped, value).await
    }

    /// Writes a scoped key only if its latest revision equals `revision`. The typed NATS error
    /// is returned so callers can detect `WrongLastRevision`.
    async fn store_update(
        &self,
        key: &str,
        value: bytes::Bytes,
        revision: u64,
    ) -> std::result::Result<u64, async_nats::error::Error<kv::UpdateErrorKind>> {
        let scoped = self.scoped_key(key);
        self.store.update(scoped, value, revision).await
    }
}
#[async_trait]
impl PersistenceStore for NatsScratchpadStore {
async fn get(&self, key: &str) -> Result<Option<String>> {
match self.store_get(key).await? {
Some(data) => {
let vec: Vec<u8> = data.to_vec();
Ok(Some(String::from_utf8(vec)?))
}
None => Ok(None),
}
}
async fn append(&self, key: &str, content: &str) -> Result<()> {
let mut attempts = 0;
let max_retries = 20;
loop {
attempts += 1;
if attempts > max_retries {
return Err(anyhow::anyhow!(
"Failed to append to key '{}' (scoped) after {} attempts due to contention",
key,
max_retries
));
}
match self.store_entry(key).await? {
Some(entry) => {
let current = String::from_utf8_lossy(&entry.value);
let new_content = format!("{}{}", current, content);
match self
.store_update(key, new_content.into(), entry.revision)
.await
{
Ok(_) => return Ok(()),
Err(e) => {
if matches!(e.kind(), kv::UpdateErrorKind::WrongLastRevision) {
tokio::time::sleep(std::time::Duration::from_millis(
5 + (attempts * 2),
))
.await;
continue;
}
return Err(anyhow::anyhow!(e));
}
}
}
None => match self.store_create(key, content.to_string().into()).await {
Ok(_) => return Ok(()),
Err(e) => {
if matches!(e.kind(), kv::CreateErrorKind::AlreadyExists) {
tokio::time::sleep(std::time::Duration::from_millis(
5 + (attempts * 2),
))
.await;
continue;
}
return Err(anyhow::anyhow!(e));
}
},
}
}
}
async fn set(&self, key: &str, content: &str) -> Result<()> {
self.store_put(key, content.to_string().into()).await?;
Ok(())
}
async fn get_round_history(&self, round: u32) -> Result<Option<Vec<ProposalRecord>>> {
let safe_id = sanitize_subject_component(&self.scope_prefix);
let bucket_name = format!("nsed_hist_{}", safe_id);
let history_store = match self.js.get_key_value(&bucket_name).await {
Ok(s) => s,
Err(e) => {
let err_str = e.to_string();
if err_str.contains("not found") || err_str.contains("no stream") {
return Ok(None);
}
return Err(anyhow::anyhow!(e)
.context(format!("Failed to access history bucket '{}'", bucket_name)));
}
};
let key = format!("round_{}", round);
match history_store.get(&key).await {
Ok(Some(entry)) => {
let records = serde_json::from_slice(&entry)
.context("Failed to deserialize round history")?;
Ok(Some(records))
}
Ok(None) => Ok(None),
Err(e) => Err(anyhow::anyhow!("NATS KV Get Error for history: {}", e)),
}
}
}
/// Leaf worker that connects one [`NsedAgent`] to the NATS/JetStream orchestration bus.
///
/// It pulls tasks and job manifests from JetStream, answers passthrough chat requests, tracks
/// round scores, and publishes results, heartbeats and events. Build it with
/// `NatsNsedWorker::new` / `from_dyn_agent` plus the `with_*` methods, then call `run`.
pub struct NatsNsedWorker {
// Agent that produces proposals and evaluations.
agent: Arc<dyn NsedAgent>,
agent_config: AgentConfig,
// Core NATS client (plain publishes and subscriptions).
nats: async_nats::Client,
// JetStream context (streams, consumers, KV).
js: jetstream::Context,
// Idempotency bucket `nsed_proc_{id}`.
processed_kv: kv::Store,
// Session-scoped scratchpad bucket `nsed_local_mem_{id}`.
scratchpad_kv: kv::Store,
config: WorkerConfig,
// `agent.name()`, captured at construction.
agent_id: String,
// Job/session ids currently in flight. Shared with spawned handlers; guarded by a std
// Mutex that is never held across an await.
active_jobs: Arc<Mutex<HashSet<String>>>,
// Construction time. NOTE(review): presumably for uptime reporting; not read in this chunk.
start_time: Instant,
// Shared status snapshot, enabled by `with_status`.
status: Option<SharedAgentStatus>,
// Optional pre-publish hook (see `WorkerHook`).
hook: Option<Arc<dyn WorkerHook>>,
// Builds user-tool handlers for tasks that declare user tools.
user_tool_factory: Option<Arc<dyn UserToolHandlerFactory>>,
// Chat backend for passthrough requests. Without it, passthrough replies with an error.
chat_agent: Option<Arc<dyn ChatCapable>>,
// When set, task results are pre-acked and held in this buffer before release.
response_buffer: Option<Arc<buffer::ResponseBuffer>>,
// While true, no new tasks are pulled and passthrough requests are refused.
paused: Arc<AtomicBool>,
telemetry: Option<TelemetryEmitterMux>,
}
impl NatsNsedWorker {
/// Creates a worker for a concrete agent type by wrapping it in an `Arc<dyn NsedAgent>` and
/// delegating to [`Self::from_dyn_agent`].
///
/// # Errors
/// Propagates connection and KV-provisioning failures from `from_dyn_agent`.
pub async fn new(
    agent: impl NsedAgent + 'static,
    agent_config: AgentConfig,
    config: WorkerConfig,
    telemetry: Option<TelemetryEmitterMux>,
) -> Result<Self> {
    let shared_agent: Arc<dyn NsedAgent> = Arc::new(agent);
    Self::from_dyn_agent(shared_agent, agent_config, config, telemetry).await
}
/// Connects to NATS, provisions this agent's private KV buckets, and returns an idle worker
/// (not yet paused and with no optional features enabled).
///
/// `{id}` below is the agent name with every character other than an ASCII letter or digit
/// replaced by `_`. Two buckets are ensured:
/// - `nsed_proc_{id}`: idempotency keys, file storage, entries expire after 24 h;
/// - `nsed_local_mem_{id}`: session scratchpad, file storage, 5 revisions kept per key,
///   `max_age` = `config.scratchpad_retention_secs` (0 yields a zero `max_age`, which NATS
///   treats as "no age limit").
///
/// # Errors
/// Fails if the NATS connection cannot be established or either bucket cannot be ensured.
pub async fn from_dyn_agent(
    agent: Arc<dyn NsedAgent>,
    agent_config: AgentConfig,
    config: WorkerConfig,
    telemetry: Option<TelemetryEmitterMux>,
) -> Result<Self> {
    let agent_id = agent.name();
    let nats = connect_nats(&config.nats_url, config.nats_auth.as_ref()).await?;
    let js = jetstream::new(nats.clone());
    info!("🍃 NATS Leaf Worker Connected: {}", agent_id);

    // JetStream KV bucket names may only contain ASCII letters, digits, `-` and `_`.
    // `char::is_alphanumeric` would keep non-ASCII letters (e.g. `é`), yielding a bucket name
    // the client/server rejects. ASCII names map exactly as before.
    let safe_id = agent_id.replace(|c: char| !c.is_ascii_alphanumeric(), "_");

    let processed_bucket_name = format!("nsed_proc_{}", safe_id);
    let processed_kv = ensure_kv_bucket(
        &js,
        kv::Config {
            bucket: processed_bucket_name.clone(),
            description: format!("Idempotency keys for Agent {}", agent_id),
            max_age: std::time::Duration::from_secs(86400),
            storage: jetstream::stream::StorageType::File,
            num_replicas: 1,
            ..Default::default()
        },
    )
    .await?;

    let scratchpad_bucket_name = format!("nsed_local_mem_{}", safe_id);
    // `Duration::from_secs(0)` is already `Duration::ZERO`, so no special case is needed.
    let scratchpad_ttl = std::time::Duration::from_secs(config.scratchpad_retention_secs);
    let scratchpad_kv = ensure_kv_bucket(
        &js,
        kv::Config {
            bucket: scratchpad_bucket_name.clone(),
            description: format!("Session-scoped scratchpad for Agent {}", agent_id),
            history: 5,
            max_age: scratchpad_ttl,
            storage: jetstream::stream::StorageType::File,
            num_replicas: 1,
            ..Default::default()
        },
    )
    .await?;
    info!(
        "🔒 Initialized Sovereign KV Stores: {} & {}",
        processed_bucket_name, scratchpad_bucket_name
    );

    Ok(Self {
        agent,
        agent_config,
        nats,
        js,
        processed_kv,
        scratchpad_kv,
        config,
        agent_id,
        active_jobs: Arc::new(Mutex::new(HashSet::new())),
        start_time: Instant::now(),
        status: None,
        hook: None,
        user_tool_factory: None,
        chat_agent: None,
        response_buffer: None,
        paused: Arc::new(AtomicBool::new(false)),
        telemetry,
    })
}
pub fn telemetry(&self) -> Option<&TelemetryEmitterMux> {
self.telemetry.as_ref()
}
/// Installs a pre-publish hook, replacing any previously installed one.
pub fn with_hook(self, hook: Arc<dyn WorkerHook>) -> Self {
    let mut worker = self;
    worker.hook = Some(hook);
    worker
}
/// Installs a `crate::crypto::SigningHook` built from `keypair` and this agent's id as the
/// pre-publish hook (replacing any existing hook).
#[cfg(feature = "audit")]
pub fn with_signing(self, keypair: crate::crypto::AgentKeyPair) -> Self {
    let signer = crate::crypto::SigningHook::new(keypair, self.agent_id.clone());
    self.with_hook(Arc::new(signer))
}
/// Like `with_signing`, but with a freshly generated key pair.
#[cfg(feature = "audit")]
pub fn auto_sign(self) -> Self {
    let keypair = crate::crypto::AgentKeyPair::generate();
    self.with_signing(keypair)
}
/// Sets the factory used to build user-tool handlers for tasks that declare user tools.
pub fn with_user_tool_factory(self, factory: Arc<dyn UserToolHandlerFactory>) -> Self {
    let mut worker = self;
    worker.user_tool_factory = Some(factory);
    worker
}
/// Enables passthrough mode by supplying the chat backend that answers passthrough requests.
pub fn with_chat(self, chat: Arc<dyn ChatCapable>) -> Self {
    let mut worker = self;
    worker.chat_agent = Some(chat);
    worker
}
/// Enables the shared status snapshot, seeded with the agent id, model name and provider id.
///
/// `_port` is accepted for API compatibility but is not used here.
pub fn with_status(self, _port: u16) -> Self {
    let snapshot = new_shared_status(
        self.agent_id.clone(),
        self.agent_config.model_name.clone(),
        self.agent_config.provider_id.clone(),
    );
    let mut worker = self;
    worker.status = Some(snapshot);
    worker
}
pub fn status(&self) -> Option<&SharedAgentStatus> {
self.status.as_ref()
}
pub fn agent_config(&self) -> &AgentConfig {
&self.agent_config
}
pub fn agent_id(&self) -> &str {
&self.agent_id
}
pub fn chat_agent(&self) -> Option<&Arc<dyn ChatCapable>> {
self.chat_agent.as_ref()
}
/// Enables response buffering: task results are pre-acked and held for `hold_duration`
/// before release.
pub fn with_response_buffer(self, hold_duration: std::time::Duration) -> Self {
    let held = buffer::ResponseBuffer::new(hold_duration);
    let mut worker = self;
    worker.response_buffer = Some(Arc::new(held));
    worker
}
pub fn response_buffer(&self) -> Option<&Arc<buffer::ResponseBuffer>> {
self.response_buffer.as_ref()
}
/// Returns a shared handle to the pause flag.
///
/// Storing through the handle toggles only the flag, not the response buffer (unlike
/// `pause`/`resume`).
pub fn pause_handle(&self) -> Arc<AtomicBool> {
    Arc::clone(&self.paused)
}
pub fn pause(&self) {
self.paused.store(true, Ordering::Relaxed);
if let Some(ref buf) = self.response_buffer {
buf.pause();
}
}
pub fn resume(&self) {
self.paused.store(false, Ordering::Relaxed);
if let Some(ref buf) = self.response_buffer {
buf.resume();
}
}
pub fn is_paused(&self) -> bool {
self.paused.load(Ordering::Relaxed)
}
/// Runs the worker's event loop until a consumer reports an error or both consumer streams end.
///
/// Setup, in order:
/// 1. Waits for the JetStream stream `config.stream_name`: up to 10 attempts, sleeping
///    `500 ms * attempt` between them, since the orchestrator may still be creating it.
/// 2. Gets or creates two durable pull consumers:
///    - the task consumer `config.consumer_name`, filtered on
///      `{subject_prefix}.*.task.{agent_id}.*` with a 30 s ack wait;
///    - the manifest consumer `manifest_watcher_{agent_id}`, filtered on
///      `{api_prefix}.jobs.manifest.>` and delivering only new messages.
/// 3. Subscribes (core NATS) to two subjects; a failed subscription is logged and only
///    disables that feature:
///    - `{subject_prefix}.*.result.event.round_summary`, only when a status snapshot is
///      enabled;
///    - `{subject_prefix}.agent.{agent_id}.passthrough`.
///
/// Each task, manifest, round summary and passthrough request is handled on its own spawned
/// task. While paused, no new tasks are pulled. Passthrough requests that carry a reply subject
/// are answered immediately with a [`PassthroughError`].
///
/// Timers:
/// - publish a heartbeat every 10 s;
/// - drain the response buffer every 500 ms;
/// - sample the NATS connection state every 5 s, emitting a `NatsConnectionStateChanged`
///   telemetry event on each change.
///
/// # Errors
/// Returns an error if the stream never appears or a consumer cannot be created or started.
/// A consumer error, or the end of both consumer streams, returns `Ok(())` so the caller can
/// reconnect.
pub async fn run(&self) -> Result<()> {
    let prefix = &self.config.subject_prefix;
    let task_filter = format!("{}.*.task.{}.*", prefix, self.agent_id);
    let stream = {
        let max_attempts = 10;
        let mut attempt = 0;
        loop {
            attempt += 1;
            match self.js.get_stream(&self.config.stream_name).await {
                Ok(s) => break s,
                Err(e) => {
                    if attempt >= max_attempts {
                        return Err(anyhow::anyhow!(
                            "Stream '{}' not found after {} attempts. \
                             Is the orchestrator running? Last error: {}",
                            self.config.stream_name,
                            max_attempts,
                            e
                        ));
                    }
                    info!(
                        "⏳ Waiting for stream '{}' (attempt {}/{}). \
                         Orchestrator may still be starting...",
                        self.config.stream_name, attempt, max_attempts
                    );
                    tokio::time::sleep(std::time::Duration::from_millis(500 * attempt as u64))
                        .await;
                }
            }
        }
    };
    let task_consumer = stream
        .get_or_create_consumer(
            &self.config.consumer_name,
            jetstream::consumer::pull::Config {
                durable_name: Some(self.config.consumer_name.clone()),
                filter_subject: task_filter,
                // Long tasks keep the message alive with `AckKind::Progress` heartbeats.
                ack_wait: std::time::Duration::from_secs(30),
                ..Default::default()
            },
        )
        .await?;
    let manifest_consumer_name = format!("manifest_watcher_{}", self.agent_id);
    let manifest_consumer = stream
        .get_or_create_consumer(
            &manifest_consumer_name,
            jetstream::consumer::pull::Config {
                durable_name: Some(manifest_consumer_name.clone()),
                filter_subject: format!("{}.jobs.manifest.>", self.config.api_prefix),
                deliver_policy: jetstream::consumer::DeliverPolicy::New,
                ..Default::default()
            },
        )
        .await?;
    let score_subject = format!(
        "{}.*.result.event.round_summary",
        self.config.subject_prefix
    );
    let score_subscription: Option<async_nats::Subscriber> = if self.status.is_some() {
        match self.nats.subscribe(score_subject.clone()).await {
            Ok(sub) => Some(sub),
            Err(e) => {
                warn!(
                    "Failed to subscribe to score events on {}: {}. Score tracking disabled.",
                    score_subject, e
                );
                None
            }
        }
    } else {
        None
    };
    let passthrough_subject = format!(
        "{}.agent.{}.{}",
        self.config.subject_prefix, self.agent_id, PASSTHROUGH_SUBJECT_SUFFIX
    );
    let passthrough_subscription: Option<async_nats::Subscriber> =
        match self.nats.subscribe(passthrough_subject.clone()).await {
            Ok(sub) => Some(sub),
            Err(e) => {
                warn!(
                    "Failed to subscribe to passthrough subject {}: {}. \
                     Passthrough mode unavailable for this agent.",
                    passthrough_subject, e
                );
                None
            }
        };
    info!(
        "🎧 Agent {} listening for tasks, manifests, score events, and passthrough requests.",
        self.agent_id
    );
    if let Some(ref status) = self.status {
        let mut snap = status.write().await;
        snap.nats_connected = true;
        snap.push_event("connected", None, "NATS connected, listening for tasks");
    }
    let mut task_messages = task_consumer.messages().await?;
    let mut manifest_messages = manifest_consumer.messages().await?;
    let mut score_messages = score_subscription;
    let mut passthrough_messages = passthrough_subscription;
    let mut heartbeat_interval = tokio::time::interval(std::time::Duration::from_secs(10));
    let mut drain_interval = tokio::time::interval(std::time::Duration::from_millis(500));
    let mut conn_check_interval = tokio::time::interval(std::time::Duration::from_secs(5));
    let mut last_conn_state = NatsState::Connected;
    let mut reconnects_so_far: u32 = 0;
    // The interval branches never complete with "disabled", so a `select!` `else` arm can
    // never fire. End-of-stream is therefore tracked explicitly. A stream that has returned
    // `None` must not be polled again.
    let mut task_stream_open = true;
    let mut manifest_stream_open = true;
    loop {
        let is_paused = self.paused.load(Ordering::Relaxed);
        tokio::select! {
            next_task = task_messages.next(), if task_stream_open && !is_paused => {
                match next_task {
                    Some(Ok(msg)) => {
                        let worker = self.clone();
                        tokio::spawn(async move {
                            if let Err(e) = worker.handle_message(msg).await {
                                error!("Failed to process task: {:?}", e);
                            }
                        });
                    }
                    Some(Err(e)) => {
                        error!("Task consumer error: {:?}", e);
                        break;
                    }
                    None => {
                        warn!("Task consumer stream ended for {}", self.agent_id);
                        task_stream_open = false;
                        if !manifest_stream_open {
                            error!("Both consumers closed. Exiting worker loop for {}", self.agent_id);
                            break;
                        }
                    }
                }
            }
            next_manifest = manifest_messages.next(), if manifest_stream_open => {
                match next_manifest {
                    Some(Ok(msg)) => {
                        let worker = self.clone();
                        tokio::spawn(async move {
                            if let Err(e) = worker.handle_manifest(msg).await {
                                error!("Failed to process manifest: {:?}", e);
                            }
                        });
                    }
                    Some(Err(e)) => {
                        error!("Manifest consumer error: {:?}", e);
                        break;
                    }
                    None => {
                        warn!("Manifest consumer stream ended for {}", self.agent_id);
                        manifest_stream_open = false;
                        if !task_stream_open {
                            error!("Both consumers closed. Exiting worker loop for {}", self.agent_id);
                            break;
                        }
                    }
                }
            }
            Some(msg) = async {
                match &mut score_messages {
                    Some(sub) => sub.next().await,
                    None => std::future::pending::<Option<async_nats::Message>>().await,
                }
            } => {
                let worker = self.clone();
                tokio::spawn(async move {
                    if let Err(e) = worker.handle_round_summary(msg).await {
                        warn!("Failed to process round summary: {:?}", e);
                    }
                });
            }
            Some(msg) = async {
                match &mut passthrough_messages {
                    Some(sub) => sub.next().await,
                    None => std::future::pending::<Option<async_nats::Message>>().await,
                }
            } => {
                if is_paused {
                    if let Some(reply_subject) = msg.reply.clone() {
                        let err = PassthroughError {
                            error: "Agent is paused and cannot handle passthrough requests"
                                .to_string(),
                        };
                        let payload = serde_json::to_vec(&err).unwrap_or_default();
                        let _ = self.nats.publish(reply_subject, payload.into()).await;
                    }
                } else {
                    let worker = self.clone();
                    tokio::spawn(async move {
                        worker.handle_passthrough(msg).await;
                    });
                }
            }
            _ = heartbeat_interval.tick() => {
                self.publish_heartbeat().await;
            }
            _ = drain_interval.tick() => {
                self.drain_buffer().await;
            }
            _ = conn_check_interval.tick() => {
                let current_state = self.nats.connection_state();
                if current_state != last_conn_state {
                    if let Some(ref telemetry) = self.telemetry {
                        use crate::telemetry::NatsConnectionState;
                        let state: NatsConnectionState = (&current_state).into();
                        // Count a reconnect whenever we leave the Disconnected state.
                        if matches!(
                            state,
                            NatsConnectionState::Reconnecting | NatsConnectionState::Connected
                        ) && matches!(last_conn_state, NatsState::Disconnected)
                        {
                            reconnects_so_far += 1;
                        }
                        let conn_ctx = crate::telemetry::TelemetryContext::new(
                            &self.agent_id,
                            None,
                            None,
                            None,
                        );
                        crate::emit_event!(
                            Some(telemetry),
                            conn_ctx,
                            NatsConnectionStateChanged {
                                state,
                                reconnects_so_far,
                                pending_publish_depth: None,
                                buffer_bytes: None,
                            }
                        );
                    }
                    last_conn_state = current_state;
                }
            }
        }
    }
    warn!(
        "Worker loop exited for agent {}. Reconnecting...",
        self.agent_id
    );
    Ok(())
}
/// Reacts to one job manifest from `{api_prefix}.jobs.manifest.>`.
///
/// Manifests that fail to decode, or that do not list this agent, are acked and ignored.
///
/// When this agent is selected, it does the following in order:
/// 1. records the job id in `active_jobs`;
/// 2. publishes an `Accepted` ack on `{api_prefix}.jobs.ack.{job_id}.{agent_id}`, passing the
///    payload through the optional [`WorkerHook`] first;
/// 3. publishes an `agent_accepted` event on `{subject_prefix}.{job_id}.result.event.agent_accepted`;
/// 4. logs the acceptance to the status snapshot, if enabled.
///
/// The JetStream message is acked last. If the hook or any publish fails, the error is
/// returned before the ack, leaving the manifest eligible for redelivery.
async fn handle_manifest(&self, msg: async_nats::jetstream::Message) -> Result<()> {
    let manifest = match serde_json::from_slice::<JobManifest>(&msg.payload) {
        Ok(parsed) => parsed,
        Err(parse_err) => {
            warn!("Failed to parse manifest: {}", parse_err);
            let _ = msg.ack().await;
            return Ok(());
        }
    };

    if !manifest.agents.contains(&self.agent_id) {
        let _ = msg.ack().await;
        return Ok(());
    }

    info!(
        "🔔 Agent {} selected for Job {}. Accepting.",
        self.agent_id, manifest.job_id
    );
    self.active_jobs
        .lock()
        .unwrap()
        .insert(manifest.job_id.clone());

    let ack_subject = format!(
        "{}.jobs.ack.{}.{}",
        self.config.api_prefix, manifest.job_id, self.agent_id
    );
    let ack_body = serde_json::json!({
        "agent_id": self.agent_id,
        "status": "Accepted",
        "timestamp": chrono::Utc::now().to_rfc3339()
    });
    let mut ack_bytes = serde_json::to_vec(&ack_body)?;
    if let Some(hook) = &self.hook {
        hook.before_publish(&ack_subject, &mut ack_bytes).await?;
    }
    self.nats.publish(ack_subject, ack_bytes.into()).await?;

    let accepted_subject = format!(
        "{}.{}.result.event.agent_accepted",
        self.config.subject_prefix, manifest.job_id
    );
    let accepted_body = serde_json::json!({
        "agent_id": self.agent_id,
        "status": "Online",
        "role": "Generalist"
    });
    let accepted_bytes = serde_json::to_vec(&accepted_body)?;
    self.nats
        .publish(accepted_subject, accepted_bytes.into())
        .await?;

    if let Some(status) = &self.status {
        let mut snapshot = status.write().await;
        snapshot.push_event(
            "agent_accepted",
            Some(&manifest.job_id),
            &format!("Accepted job manifest ({})", manifest.task_description),
        );
    }

    let _ = msg.ack().await;
    Ok(())
}
/// Records this agent's aggregated score from a `round_summary` event in the status snapshot.
///
/// The session id is the subject token immediately after the configured subject prefix ("?"
/// if the subject is too short). A score is pushed only when the event lists this agent, the
/// status snapshot is enabled, and no score is already recorded for the same session and round.
///
/// # Errors
/// Returns (and logs) the JSON error if the payload is not a valid `RoundSummaryEvent`.
async fn handle_round_summary(&self, msg: async_nats::Message) -> Result<()> {
    let summary =
        match serde_json::from_slice::<crate::events::RoundSummaryEvent>(&msg.payload) {
            Ok(decoded) => decoded,
            Err(e) => {
                warn!("Failed to parse round_summary event: {}", e);
                return Err(e.into());
            }
        };

    let prefix_tokens = match self.config.subject_prefix.is_empty() {
        true => 0,
        false => self.config.subject_prefix.split('.').count(),
    };
    let session_id = msg
        .subject
        .as_str()
        .split('.')
        .nth(prefix_tokens)
        .unwrap_or("?")
        .to_string();

    let own_entry = summary
        .proposal_scores
        .iter()
        .find(|candidate| candidate.agent_id == self.agent_id);

    if let (Some(entry), Some(status)) = (own_entry, self.status.as_ref()) {
        let mut snapshot = status.write().await;
        let recorded = snapshot
            .recent_scores
            .iter()
            .any(|s| s.job_id == session_id && s.round == summary.round);
        if !recorded {
            snapshot.push_score(crate::status::ScoreEntry {
                timestamp: chrono::Utc::now().to_rfc3339(),
                job_id: session_id.clone(),
                round: summary.round,
                evaluator: "aggregated".into(),
                score: entry.aggregated_score,
            });
        }
    }
    Ok(())
}
async fn handle_passthrough(&self, msg: async_nats::Message) {
let reply_subject = match msg.reply.as_deref() {
Some(s) if !s.is_empty() => s.to_string(),
_ => {
warn!(
agent_id = %self.agent_id,
"Passthrough message has no reply subject — ignoring"
);
return;
}
};
let request: PassthroughRequest = match serde_json::from_slice(&msg.payload) {
Ok(r) => r,
Err(e) => {
warn!(agent_id = %self.agent_id, "Failed to parse passthrough request: {}", e);
let err_payload = serde_json::to_vec(&PassthroughError {
error: e.to_string(),
})
.unwrap_or_default();
let _ = self.nats.publish(reply_subject, err_payload.into()).await;
return;
}
};
info!(
agent_id = %self.agent_id,
session_id = %request.session_id,
"Handling passthrough request"
);
let chat_agent = match &self.chat_agent {
Some(a) => a.clone(),
None => {
let err = PassthroughError {
error: format!(
"Agent '{}' does not support passthrough mode (ChatCapable not configured)",
self.agent_id
),
};
let payload = serde_json::to_vec(&err).unwrap_or_default();
let _ = self.nats.publish(reply_subject, payload.into()).await;
return;
}
};
let messages: Vec<async_openai::types::ChatCompletionRequestMessage> = request
.messages
.into_iter()
.filter_map(|m| {
match m.role.as_str() {
"user" => Some(async_openai::types::ChatCompletionRequestMessage::User(
async_openai::types::ChatCompletionRequestUserMessage {
content:
async_openai::types::ChatCompletionRequestUserMessageContent::Text(
m.content,
),
name: None,
},
)),
"assistant" => Some(
async_openai::types::ChatCompletionRequestMessage::Assistant(
async_openai::types::ChatCompletionRequestAssistantMessage {
content: Some(
async_openai::types::ChatCompletionRequestAssistantMessageContent::Text(m.content),
),
..Default::default()
},
),
),
"system" => Some(async_openai::types::ChatCompletionRequestMessage::System(
async_openai::types::ChatCompletionRequestSystemMessage {
content:
async_openai::types::ChatCompletionRequestSystemMessageContent::Text(
m.content,
),
name: None,
},
)),
other => {
warn!(
agent_id = %self.agent_id,
"Unknown role '{}' in passthrough message — skipping", other
);
None
}
}
})
.collect();
if messages.is_empty() {
warn!(
agent_id = %self.agent_id,
"Passthrough request contained no valid messages (all roles unknown) — rejecting"
);
let err = PassthroughError {
error: "No valid messages provided for passthrough (all message roles unknown)"
.to_string(),
};
let payload = serde_json::to_vec(&err).unwrap_or_default();
let _ = self.nats.publish(reply_subject, payload.into()).await;
return;
}
const PASSTHROUGH_TIMEOUT_CAP_SECS: u64 = 600;
let configured = self.agent_config.response_sla_secs;
let passthrough_timeout = std::time::Duration::from_secs(if configured == 0 {
PASSTHROUGH_TIMEOUT_CAP_SECS
} else {
configured.min(PASSTHROUGH_TIMEOUT_CAP_SECS)
});
let chat_result =
tokio::time::timeout(passthrough_timeout, chat_agent.chat(messages)).await;
let response_content = match chat_result {
Ok(Ok(content)) => content,
Ok(Err(e)) => {
warn!(agent_id = %self.agent_id, "Passthrough chat failed: {}", e);
let err = PassthroughError {
error: e.to_string(),
};
let payload = serde_json::to_vec(&err).unwrap_or_default();
let _ = self.nats.publish(reply_subject, payload.into()).await;
return;
}
Err(_) => {
warn!(
agent_id = %self.agent_id,
timeout_secs = passthrough_timeout.as_secs(),
"Passthrough chat timed out"
);
let err = PassthroughError {
error: format!(
"Passthrough request timed out after {}s",
passthrough_timeout.as_secs()
),
};
let payload = serde_json::to_vec(&err).unwrap_or_default();
let _ = self.nats.publish(reply_subject, payload.into()).await;
return;
}
};
let response = PassthroughResponse {
content: response_content,
input_tokens: None,
output_tokens: None,
};
let payload = match serde_json::to_vec(&response) {
Ok(b) => b,
Err(e) => {
error!(agent_id = %self.agent_id, "Failed to serialize passthrough response: {}", e);
let err = PassthroughError {
error: format!("Failed to serialize response: {e}"),
};
let err_payload = serde_json::to_vec(&err).unwrap_or_default();
let _ = self.nats.publish(reply_subject, err_payload.into()).await;
return;
}
};
let _ = self.nats.publish(reply_subject, payload.into()).await;
}
async fn handle_message(&self, msg: async_nats::jetstream::Message) -> Result<()> {
let msg = std::sync::Arc::new(msg);
let msg_id = match msg.info() {
Ok(info) => format!("{}-{}-{}", info.stream, info.stream_sequence, msg.subject),
Err(_) => msg
.headers
.as_ref()
.and_then(|h| h.get("Nats-Msg-Id").map(|v| v.to_string()))
.unwrap_or_else(|| Uuid::new_v4().to_string()),
};
if self.is_duplicate(&msg_id).await? {
warn!(
"♻️ Detected duplicate message {}. Acking and skipping.",
msg_id
);
msg.ack()
.await
.map_err(|e| anyhow::anyhow!(e))
.context("Failed to ack duplicate")?;
return Ok(());
}
info!("📨 Received Task: {} (Subject: {})", msg_id, msg.subject);
let task_received = Instant::now();
let mut context: AgentContext = match serde_json::from_slice(&msg.payload) {
Ok(ctx) => ctx,
Err(e) => {
error!(
msg_id = %msg_id,
error = %e,
"❌ Failed to deserialize AgentContext. Poison pill detected. Acking to discard."
);
if let Err(ack_err) = msg.ack().await {
error!("Failed to ack poison pill: {}", ack_err);
}
return Ok(());
}
};
let subject_parts: Vec<&str> = msg.subject.split('.').collect();
let prefix = &self.config.subject_prefix;
let prefix_count = if prefix.is_empty() {
0
} else {
prefix.split('.').count()
};
let session_id = subject_parts
.get(prefix_count)
.unwrap_or(&"global")
.to_string();
let action: String = subject_parts.last().unwrap_or(&"unknown").to_string();
let action = action.as_str();
let work_start_subject = format!(
"{}.{}.result.event.agent_working",
self.config.subject_prefix, session_id
);
let work_start_payload = serde_json::json!({
"agent_id": self.agent_id,
"round": context.round_number,
"action": action,
"status": "Thinking"
});
if let Err(e) = self
.nats
.publish(
work_start_subject,
serde_json::to_vec(&work_start_payload)?.into(),
)
.await
{
warn!("Failed to publish work start event: {}", e);
}
if let Some(ref status) = self.status {
let mut snap = status.write().await;
snap.push_event(
"agent_working",
Some(&session_id),
&format!("Round {} {}", context.round_number, action),
);
}
context.store = Some(Arc::new(NatsScratchpadStore::new(
self.scratchpad_kv.clone(),
self.js.clone(),
session_id.clone(),
)) as Arc<dyn PersistenceStore>);
context.telemetry = self.telemetry.clone();
if !context.user_tools.is_empty() {
if let Some(ref factory) = self.user_tool_factory {
context.user_tool_handler = Some(factory.create(
self.nats.clone(),
self.js.clone(),
session_id.clone(),
self.agent.name(),
context.phase_budget_remaining_secs,
self.config.subject_prefix.clone(),
));
}
}
{
let mut jobs = self.active_jobs.lock().unwrap();
jobs.insert(session_id.clone());
}
if let Some(ref status) = self.status {
let mut snap = status.write().await;
snap.current_job = Some(session_id.clone());
snap.current_round = Some(context.round_number);
snap.current_phase = Some(action.to_string());
}
if action == "propose" && context.round_number > 1 {
if let Some(score) = context.previous_own_score {
if let Some(ref status) = self.status {
let mut snap = status.write().await;
let prev_round = context.round_number.saturating_sub(1);
let already_has = snap
.recent_scores
.iter()
.any(|s| s.job_id == session_id && s.round == prev_round);
if !already_has {
snap.push_score(crate::status::ScoreEntry {
timestamp: chrono::Utc::now().to_rfc3339(),
job_id: session_id.clone(),
round: prev_round,
evaluator: "aggregated".to_string(),
score,
});
}
}
}
}
let dispatch_delay_ms = task_received.elapsed().as_millis() as u64;
let task_publish_ts = context.task_publish_ts;
let agent_receive_ts = chrono::Utc::now().timestamp_millis();
let job_age_at_accept_ms = task_publish_ts.map(|publish_ts| {
agent_receive_ts
.checked_sub(publish_ts)
.map_or(0, |diff| diff.max(0))
});
crate::emit_for!(
context,
TaskAccepted {
dispatch_delay_ms,
task_publish_ts,
job_age_at_accept_ms,
}
);
let msg_heartbeat = msg.clone();
let hb_session = session_id.clone();
let heartbeat_handle = tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(15));
interval.tick().await; loop {
interval.tick().await;
if let Err(e) = msg_heartbeat
.ack_with(async_nats::jetstream::AckKind::Progress)
.await
{
tracing::warn!(
session_id = %hb_session,
"Failed to send task ack heartbeat: {}", e
);
break;
}
}
});
let task_start = Instant::now();
let execution_result = {
const MAX_TASK_RETRIES: u32 = 2;
let mut last_err = None;
let mut attempt = 0u32;
loop {
attempt += 1;
let result = async {
match action {
"propose" => {
let mut proposal = self.agent.propose(&context).await?;
proposal.published_at_ms = chrono::Utc::now().timestamp_millis();
serde_json::to_vec(&proposal).map_err(|e| anyhow::anyhow!(e))
}
"evaluate" => {
let evaluations = self.agent.evaluate(&context).await?;
let valid_ids: std::collections::HashSet<&str> =
context.candidates.iter().map(|c| c.id.as_str()).collect();
let publish_ts = chrono::Utc::now().timestamp_millis();
let filtered: Vec<_> = evaluations
.into_iter()
.filter(|(target_id, _)| {
if valid_ids.contains(target_id.as_str()) {
true
} else {
tracing::warn!(
target_id = %target_id,
"Dropping evaluation with hallucinated target ID"
);
false
}
})
.map(|(target_id, mut eval)| {
eval.published_at_ms = publish_ts;
(target_id, eval)
})
.collect();
serde_json::to_vec(&filtered).map_err(|e| anyhow::anyhow!(e))
}
_ => Err(anyhow::anyhow!("Unknown action: {}", action)),
}
}
.await;
match result {
Ok(payload) => break Ok(payload),
Err(e) => {
let is_retryable = is_transient_error(&e);
if is_retryable && attempt <= MAX_TASK_RETRIES {
warn!(
agent = %self.agent_id,
attempt,
max = MAX_TASK_RETRIES + 1,
error = %e,
"Transient task error, retrying after backoff"
);
tokio::time::sleep(std::time::Duration::from_secs(
2u64.pow(attempt - 1),
))
.await;
last_err = Some(e);
continue;
}
let _ = last_err; break Err(e);
}
}
}
};
let task_duration_ms = task_start.elapsed().as_millis() as u64;
heartbeat_handle.abort();
match execution_result {
Ok(mut response_payload) => {
let content_preview =
extract_content_preview(&response_payload, action, &context.candidates);
let reply_subject = format!(
"{}.{}.result.{}.{}.{}",
self.config.subject_prefix,
session_id,
context.round_number,
self.agent_id,
action
);
let was_buffered = if let Some(ref buf) = self.response_buffer {
self.mark_processed(&msg_id).await?;
msg.ack()
.await
.map_err(|e| anyhow::anyhow!("buffer pre-ack failed: {}", e))?;
let hold = buf.hold_duration();
let now = Instant::now();
let entry = buffer::BufferedResponse {
id: Uuid::new_v4().to_string(),
action: action.to_string(),
job_id: session_id.clone(),
round: context.round_number,
reply_subject,
payload: response_payload,
created_at: now,
release_at: now + hold, ack_handle: Box::new(buffer::PreAckedHandle),
msg_id: msg_id.clone(),
annotations: Vec::new(),
edited: false,
stopped: self.agent_config.auto_stop,
};
buf.push_with_deadline(entry, task_received).await;
info!(
"📦 Buffered response: {} (SLA-based release, pre-acked)",
msg_id
);
if let Some(ref status) = self.status {
let mut snap = status.write().await;
snap.current_job = None;
snap.current_round = None;
snap.current_phase = None;
snap.buffered_count = buf.len().await as u32;
snap.push_event(
"response_buffered",
Some(&session_id),
&format!(
"Round {} {} buffered {}ms hold",
context.round_number,
action,
hold.as_millis()
),
);
}
true
} else {
if let Some(ref hook) = self.hook {
hook.before_publish(&reply_subject, &mut response_payload)
.await?;
}
self.nats
.publish(reply_subject, response_payload.into())
.await?;
self.mark_processed(&msg_id).await?;
msg.ack().await.map_err(|e| anyhow::anyhow!(e))?;
info!("✅ Task Complete: {}", msg_id);
false
};
{
let mut jobs = self.active_jobs.lock().unwrap();
jobs.remove(&session_id);
}
if let Some(ref status) = self.status {
let mut snap = status.write().await;
snap.current_job = None;
snap.current_round = None;
snap.current_phase = None;
snap.push_task(TaskLogEntry {
timestamp: chrono::Utc::now().to_rfc3339(),
action: action.to_string(),
job_id: session_id.clone(),
round: context.round_number,
status: "ok".into(),
duration_ms: task_duration_ms,
content_preview: content_preview.clone(),
});
if !was_buffered {
snap.push_event(
"task_complete",
Some(&session_id),
&format!("{} ok {}ms", action, task_duration_ms),
);
}
}
let phase_budget_remaining_ms =
(context.phase_budget_remaining_secs * 1000.0) as i64;
crate::emit_for!(
context,
TaskCompleted {
duration_ms: task_duration_ms,
dispatch_delay_ms,
queue_wait_ms: None,
phase_budget_remaining_ms,
llm_attempts: None,
tool_call_count: None,
pending_publish_depth: None,
}
);
}
Err(e) => {
let err_str = e.to_string();
error!("❌ Task Execution Failed: {:?}", e);
let is_payment_error = err_str.contains("402 Payment Required")
|| err_str.contains("insufficient_quota")
|| err_str.contains("billing");
{
let mut jobs = self.active_jobs.lock().unwrap();
jobs.remove(&session_id);
}
let suppress_error_event =
is_payment_error && !self.agent_config.propagate_payment_error;
if suppress_error_event {
warn!(
"Payment error detected (propagate_payment_error=false) — pausing worker to avoid further API calls"
);
self.paused.store(true, Ordering::Relaxed);
if let Some(ref buf) = self.response_buffer {
buf.pause();
}
}
if !suppress_error_event {
let reason = classify_abstention_reason(&err_str);
let error_payload = serde_json::json!({
"agent_id": self.agent_id,
"round": context.round_number,
"action": action,
"error": err_str,
"reason": reason,
"status": "Failed"
});
let error_bytes = serde_json::to_vec(&error_payload)?;
let legacy_subject = format!(
"{}.{}.result.event.agent_error",
self.config.subject_prefix, session_id
);
if let Err(pub_err) = self
.nats
.publish(legacy_subject, error_bytes.clone().into())
.await
{
warn!("Failed to publish legacy agent_error event: {}", pub_err);
}
if should_publish_failure_marker(action, is_payment_error) {
let failed_subject = failed_result_subject(
&self.config.subject_prefix,
&session_id,
context.round_number,
&self.agent_id,
action,
);
if let Err(pub_err) =
self.nats.publish(failed_subject, error_bytes.into()).await
{
warn!("Failed to publish round-scoped .failed marker: {}", pub_err);
}
}
}
msg.ack().await.map_err(|e| anyhow::anyhow!(e))?;
if let Some(ref status) = self.status {
let mut snap = status.write().await;
snap.current_job = None;
snap.current_round = None;
snap.current_phase = None;
snap.push_task(TaskLogEntry {
timestamp: chrono::Utc::now().to_rfc3339(),
action: action.to_string(),
job_id: session_id.clone(),
round: context.round_number,
status: "error".into(),
duration_ms: task_duration_ms,
content_preview: Some(format!("Error: {}", err_str)),
});
snap.push_event(
"agent_error",
Some(&session_id),
&format!("{} failed: {}", action, err_str),
);
}
let phase_budget_remaining_ms =
(context.phase_budget_remaining_secs * 1000.0) as i64;
let failure_class = if is_payment_error {
TaskFailureClass::ToolError
} else {
TaskFailureClass::Timeout
};
crate::emit_for!(
context,
TaskFailed {
duration_ms: task_duration_ms,
dispatch_delay_ms,
queue_wait_ms: None,
phase_budget_remaining_ms,
llm_attempts: None,
tool_call_count: None,
failure_class,
pending_publish_depth: None,
}
);
}
}
Ok(())
}
async fn publish_heartbeat(&self) {
let active_job = {
let jobs = self.active_jobs.lock().unwrap();
jobs.iter().next().cloned()
};
let hb_status = if active_job.is_some() {
AgentLiveStatus::Busy
} else {
AgentLiveStatus::Idle
};
let uptime = self.start_time.elapsed().as_secs();
let (tasks_completed, tasks_failed, last_error) = if let Some(ref status) = self.status {
let snap = status.read().await;
let err = snap
.recent_tasks
.iter()
.find(|t| t.status == "error")
.map(|t| {
let msg = format!("{}: {}", t.action, t.job_id);
msg.chars().take(120).collect::<String>()
});
(snap.tasks_completed, snap.tasks_failed, err)
} else {
(0, 0, None)
};
let heartbeat = AgentHeartbeat {
agent_id: self.agent_id.clone(),
status: hb_status,
model_name: self.agent_config.model_name.clone(),
provider_id: self.agent_config.provider_id.clone(),
current_job: active_job.clone(),
uptime_secs: uptime,
timestamp: chrono::Utc::now().to_rfc3339(),
input_price_per_mtok: self.agent_config.input_price_per_mtok,
output_price_per_mtok: self.agent_config.output_price_per_mtok,
chars_per_token: self.agent_config.chars_per_token,
response_sla_secs: if self.agent_config.response_sla_secs > 0 {
Some(self.agent_config.response_sla_secs)
} else {
None
},
temperature: Some(self.agent_config.temperature),
frequency_penalty: self.agent_config.frequency_penalty,
presence_penalty: self.agent_config.presence_penalty,
max_tokens: Some(self.agent_config.max_tokens),
context_window: Some(self.agent_config.context_window),
tasks_completed,
tasks_failed,
last_error,
capability_tags: self.agent_config.capability_tags.clone(),
description: self.agent_config.description.clone(),
signing_schemes: self.agent_config.signing_schemes.clone(),
};
let subject = format!(
"{}.agent.heartbeat.{}",
self.config.api_prefix, self.agent_id
);
match serde_json::to_vec(&heartbeat) {
Ok(payload) => {
if let Err(e) = self.nats.publish(subject, payload.into()).await {
warn!("Failed to publish heartbeat: {}", e);
}
}
Err(e) => {
warn!(agent_id = %self.agent_id, "Failed to serialize heartbeat: {}", e);
}
}
if let Some(ref status) = self.status {
let mut snap = status.write().await;
snap.uptime_secs = uptime;
snap.nats_connected = true;
snap.current_job = active_job.clone();
snap.push_event(
"heartbeat",
active_job.as_deref(),
&format!(
"{} uptime {}s",
if active_job.is_some() { "busy" } else { "idle" },
uptime
),
);
}
}
/// Releases buffered responses that are ready (hold expired or auto-approved)
/// and publishes them to their reply subjects.
///
/// Steps, in order:
/// 1. If a status snapshot exists, recompute the hold duration from the
///    agent's mean score (`buffer::compute_adaptive_hold`) and compute the
///    score divergence passed to `auto_release_if_eligible`. Without a
///    snapshot the hold is left unchanged and the divergence is `None`.
/// 2. Auto-release eligible entries and log how many were released.
/// 3. For each entry from `drain_ready`:
///    - merge operator annotations / edit markers into the payload when present;
///    - restamp `published_at_ms` with the current time;
///    - run the optional `before_publish` hook;
///    - publish.
///
///    A hook or publish failure pushes the entry back with `release_at` set
///    5 s in the future and skips the remaining steps for that entry.
/// 4. After a successful publish:
///    - publish an annotation audit event (only when the entry has annotations);
///    - mark the message processed;
///    - call the entry's ack handle;
///    - update the status snapshot.
///
///    Failures in this step are only logged.
///
/// Returns immediately when no response buffer is configured.
async fn drain_buffer(&self) {
let Some(ref buf) = self.response_buffer else {
return;
};
// Adapt the hold window to recent scoring and derive the divergence value
// that gates auto-approval.
let divergence = if let Some(ref status) = self.status {
let snap = status.read().await;
// NOTE(review): the meaning of the `3.0` argument is defined inside
// `buffer::compute_adaptive_hold` (not visible here). Confirm there.
let new_hold =
buffer::compute_adaptive_hold(buf.base_hold_duration(), snap.mean_score, 3.0);
buf.set_hold_duration(new_hold);
buffer::compute_divergence(snap.mean_score, snap.score_std_dev)
} else {
None
};
let auto_released = buf.auto_release_if_eligible(divergence).await;
if auto_released > 0 {
let divergence_str =
divergence.map_or_else(|| "n/a".to_string(), |d| format!("{d:.2}"));
info!(
"⚡ Auto-approved {} buffered entries for {} (divergence: {}, threshold: {:.2})",
auto_released,
self.agent_id,
divergence_str,
buf.auto_approve_threshold(),
);
}
// Failed entries below are pushed back with `buf.push`, which implies
// `drain_ready` hands over (removes) the entries it returns.
let ready = buf.drain_ready().await;
for mut entry in ready {
// Only rewrite the JSON when there is operator input to merge;
// otherwise publish the stored payload bytes as-is.
let publish_payload = if !entry.annotations.is_empty() || entry.edited {
Self::inject_annotations(&entry)
} else {
entry.payload.clone()
};
// Stamp the actual release time onto the outgoing payload.
let mut publish_payload =
Self::restamp_published_at(&publish_payload, chrono::Utc::now().timestamp_millis());
// `entry.payload` itself is never modified, so a retry rebuilds the
// publish bytes (annotations, stamp, hook) from scratch.
if let Some(ref hook) = self.hook
&& let Err(e) = hook
.before_publish(&entry.reply_subject, &mut publish_payload)
.await
{
error!(
"Failed to prepare buffered response {} for publish: {} — re-enqueuing",
entry.id, e
);
entry.release_at = Instant::now() + std::time::Duration::from_secs(5);
buf.push(entry).await;
continue;
}
if let Err(e) = self
.nats
.publish(entry.reply_subject.clone(), publish_payload.into())
.await
{
error!(
"Failed to publish buffered response {}: {} — re-enqueuing",
entry.id, e
);
entry.release_at = Instant::now() + std::time::Duration::from_secs(5);
buf.push(entry).await;
continue;
}
// The response is out. Failures from here on are logged but not
// retried, presumably to avoid publishing the same response twice.
if !entry.annotations.is_empty() {
// Audit subject suffix uses the first 8 bytes of the entry id (the
// whole id when shorter or when byte 8 is not a char boundary).
let annotation_subject = format!(
"{}.{}.annotations.{}.{}",
self.config.subject_prefix,
entry.job_id,
entry.round,
entry.id.get(..8).unwrap_or(&entry.id)
);
let annotation_payload = serde_json::json!({
"entry_id": entry.id,
"action": entry.action,
"job_id": entry.job_id,
"round": entry.round,
"edited": entry.edited,
"annotations": entry.annotations,
});
if let Err(e) = self
.nats
.publish(
annotation_subject,
serde_json::to_vec(&annotation_payload)
.unwrap_or_default()
.into(),
)
.await
{
warn!("Failed to publish annotation audit trail: {}", e);
}
}
if let Err(e) = self.mark_processed(&entry.msg_id).await {
warn!("Failed to mark buffered response processed: {}", e);
}
// Entries buffered on the SLA path carry `buffer::PreAckedHandle`
// (their JetStream message was acked when buffered).
if let Err(e) = entry.ack_handle.ack().await {
warn!("Failed to ack buffered message: {}", e);
}
let edit_marker = if entry.edited { " [EDITED]" } else { "" };
let annotation_count = entry.annotations.len();
info!(
"✅ Buffer released: {} ({} r{}){} ({} annotation(s))",
entry.id, entry.action, entry.round, edit_marker, annotation_count
);
if let Some(ref status) = self.status {
let mut snap = status.write().await;
snap.buffered_count = buf.len().await as u32;
let detail = if entry.edited {
format!(
"Round {} {} released from buffer (operator-edited)",
entry.round, entry.action
)
} else if annotation_count > 0 {
format!(
"Round {} {} released from buffer ({} annotation(s))",
entry.round, entry.action, annotation_count
)
} else {
format!(
"Round {} {} released from buffer",
entry.round, entry.action
)
};
snap.push_event("buffer_released", Some(&entry.job_id), &detail);
snap.push_event(
"task_complete",
Some(&entry.job_id),
&format!("Round {} {} released", entry.round, entry.action),
);
}
}
}
fn restamp_published_at(bytes: &[u8], now_ms: i64) -> Vec<u8> {
let Ok(mut value) = serde_json::from_slice::<serde_json::Value>(bytes) else {
return bytes.to_vec();
};
let stamp = serde_json::Value::from(now_ms);
if let Some(obj) = value.as_object_mut() {
obj.insert("published_at_ms".to_string(), stamp);
} else if let Some(arr) = value.as_array_mut() {
for item in arr.iter_mut() {
if let Some(pair) = item.as_array_mut()
&& pair.len() == 2
&& let Some(eval_obj) = pair[1].as_object_mut()
{
eval_obj.insert("published_at_ms".to_string(), stamp.clone());
}
}
}
serde_json::to_vec(&value).unwrap_or_else(|_| bytes.to_vec())
}
/// Merges operator review metadata into a buffered response payload.
///
/// Two fields may be added:
/// - `operator_annotations`: the serialized annotations, only when at least
///   one serializes successfully;
/// - `edited_by: "operator"`: only when the entry was edited.
///
/// They go on the top-level object (proposal payloads) or on the object at
/// index 1 of each `[target, evaluation]` item (evaluation arrays). A non-JSON
/// payload is logged and returned unchanged, as is any JSON that is neither an
/// object nor an array. If re-serialization fails, the original bytes are
/// returned.
fn inject_annotations(entry: &buffer::BufferedResponse) -> Vec<u8> {
    let mut value = match serde_json::from_slice::<serde_json::Value>(&entry.payload) {
        Ok(parsed) => parsed,
        Err(_) => {
            warn!(
                "Buffer entry {} has non-JSON payload; skipping annotation injection",
                entry.id
            );
            return entry.payload.clone();
        }
    };
    // Annotations that fail to serialize are silently dropped.
    let annotations: Vec<serde_json::Value> = entry
        .annotations
        .iter()
        .filter_map(|a| serde_json::to_value(a).ok())
        .collect();
    let mark = |target: &mut serde_json::Map<String, serde_json::Value>| {
        if !annotations.is_empty() {
            target.insert(
                "operator_annotations".to_string(),
                serde_json::Value::Array(annotations.clone()),
            );
        }
        if entry.edited {
            target.insert(
                "edited_by".to_string(),
                serde_json::Value::String("operator".to_string()),
            );
        }
    };
    match &mut value {
        serde_json::Value::Object(proposal) => mark(proposal),
        serde_json::Value::Array(items) => {
            let evaluations = items
                .iter_mut()
                .filter_map(|item| item.as_array_mut())
                .filter_map(|tuple| tuple.get_mut(1))
                .filter_map(|slot| slot.as_object_mut());
            for evaluation in evaluations {
                mark(evaluation);
            }
        }
        _ => return entry.payload.clone(),
    }
    serde_json::to_vec(&value).unwrap_or_else(|_| entry.payload.clone())
}
async fn is_duplicate(&self, msg_id: &str) -> Result<bool> {
match self.processed_kv.get(msg_id).await {
Ok(Some(_)) => Ok(true),
Ok(None) => Ok(false),
Err(e) => Err(anyhow::anyhow!("KV Get Error: {}", e)),
}
}
async fn mark_processed(&self, msg_id: &str) -> Result<()> {
let val = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
.to_string();
self.processed_kv
.put(msg_id, val.into())
.await
.map_err(|e| anyhow::anyhow!("KV Put Error: {}", e))?;
Ok(())
}
}
impl Clone for NatsNsedWorker {
    /// Field-wise clone of the worker.
    ///
    /// `active_jobs` and `paused` must be shared handles, because neither a
    /// `Mutex` nor an `AtomicBool` is `Clone`. Clones therefore observe the
    /// same job set and pause flag as the original.
    fn clone(&self) -> Self {
        let Self {
            agent,
            agent_config,
            nats,
            js,
            processed_kv,
            scratchpad_kv,
            config,
            agent_id,
            active_jobs,
            start_time,
            status,
            hook,
            user_tool_factory,
            chat_agent,
            response_buffer,
            paused,
            telemetry,
        } = self;
        Self {
            agent: agent.clone(),
            agent_config: agent_config.clone(),
            nats: nats.clone(),
            js: js.clone(),
            processed_kv: processed_kv.clone(),
            scratchpad_kv: scratchpad_kv.clone(),
            config: config.clone(),
            agent_id: agent_id.clone(),
            active_jobs: active_jobs.clone(),
            start_time: *start_time,
            status: status.clone(),
            hook: hook.clone(),
            user_tool_factory: user_tool_factory.clone(),
            chat_agent: chat_agent.clone(),
            response_buffer: response_buffer.clone(),
            paused: paused.clone(),
            telemetry: telemetry.clone(),
        }
    }
}
fn extract_content_preview(
payload: &[u8],
action: &str,
candidates: &[crate::agents::CandidateProposal],
) -> Option<String> {
let val: serde_json::Value = serde_json::from_slice(payload).ok()?;
let structured = match action {
"propose" => {
let content = val.get("content").and_then(|v| v.as_str()).unwrap_or("");
if content.is_empty() {
return None;
}
let thought = val
.get("thought_process")
.and_then(|v| v.as_str())
.unwrap_or("");
let mut obj = serde_json::Map::new();
obj.insert("t".into(), serde_json::Value::String("p".into()));
let c = if content.chars().count() > 2000 {
let idx = content
.char_indices()
.nth(2000)
.map(|(i, _)| i)
.unwrap_or(content.len());
format!("{}…", &content[..idx])
} else {
content.to_string()
};
obj.insert("c".into(), serde_json::Value::String(c));
if !thought.is_empty() {
let tp = if thought.chars().count() > 500 {
let idx = thought
.char_indices()
.nth(500)
.map(|(i, _)| i)
.unwrap_or(thought.len());
format!("{}…", &thought[..idx])
} else {
thought.to_string()
};
obj.insert("tp".into(), serde_json::Value::String(tp));
}
serde_json::Value::Object(obj)
}
"evaluate" => {
let arr = val.as_array()?;
let mut evals = Vec::new();
let mut displayed_targets = std::collections::HashSet::new();
for item in arr.iter().take(10) {
if let Some(tuple) = item.as_array() {
let target = tuple.first().and_then(|v| v.as_str()).unwrap_or("?");
if let Some(eval_obj) = tuple.get(1) {
displayed_targets.insert(target.to_string());
let mut e = serde_json::Map::new();
e.insert(
"target".into(),
serde_json::Value::String(target.to_string()),
);
if let Some(s) = eval_obj.get("score") {
e.insert("s".into(), s.clone());
}
if let Some(j) = eval_obj.get("justification").and_then(|v| v.as_str()) {
let jp = if j.chars().count() > 300 {
let idx =
j.char_indices().nth(300).map(|(i, _)| i).unwrap_or(j.len());
format!("{}…", &j[..idx])
} else {
j.to_string()
};
e.insert("j".into(), serde_json::Value::String(jp));
}
if let Some(stance) = eval_obj.get("stance").and_then(|v| v.as_str()) {
e.insert(
"stance".into(),
serde_json::Value::String(stance.to_string()),
);
}
if let Some(tf) = eval_obj.get("textual_feedback").and_then(|v| v.as_str())
{
let tfp = if tf.chars().count() > 200 {
let idx = tf
.char_indices()
.nth(200)
.map(|(i, _)| i)
.unwrap_or(tf.len());
format!("{}…", &tf[..idx])
} else {
tf.to_string()
};
e.insert("tf".into(), serde_json::Value::String(tfp));
}
if let Some(cats) = eval_obj.get("category_scores") {
e.insert("cats".into(), cats.clone());
}
if let Some(claims) = eval_obj.get("claim_assessments") {
e.insert("claims".into(), claims.clone());
}
if let Some(disputes) = eval_obj.get("disagreements") {
e.insert("disputes".into(), disputes.clone());
}
evals.push(serde_json::Value::Object(e));
}
}
}
if evals.is_empty() {
return None;
}
let mut obj = serde_json::Map::new();
obj.insert("t".into(), serde_json::Value::String("e".into()));
obj.insert("evals".into(), serde_json::Value::Array(evals));
if !candidates.is_empty() && !displayed_targets.is_empty() {
let mut props = serde_json::Map::new();
for cp in candidates
.iter()
.filter(|cp| displayed_targets.contains(&cp.id))
{
let c = &cp.proposal.content;
let truncated = if c.chars().count() > 1000 {
let idx = c
.char_indices()
.nth(1000)
.map(|(i, _)| i)
.unwrap_or(c.len());
format!("{}…", &c[..idx])
} else {
c.clone()
};
props.insert(cp.id.clone(), serde_json::Value::String(truncated));
}
if !props.is_empty() {
obj.insert("props".into(), serde_json::Value::Object(props));
}
}
serde_json::Value::Object(obj)
}
_ => return None,
};
serde_json::to_string(&structured).ok()
}
/// Returns `true` when `err` looks like a transient network/connection failure
/// (broken pipe, reset, timeout, EOF, refused/unreachable, …) worth retrying.
///
/// Matching is a case-insensitive substring search over every error in the
/// `anyhow` cause chain, not just the outermost message. A transient I/O error
/// wrapped with `.context(..)` is therefore still recognised; `to_string()`
/// alone would only show the context text.
fn is_transient_error(err: &anyhow::Error) -> bool {
    const PATTERNS: &[&str] = &[
        "broken pipe",
        "connection reset",
        "os error 32",
        "os error 104",
        "timed out",
        "connection closed",
        "unexpected eof",
        "stream closed",
        "connection refused",
        "network unreachable",
        "connection aborted",
    ];
    err.chain().any(|cause| {
        let msg = cause.to_string().to_lowercase();
        PATTERNS.iter().any(|p| msg.contains(p))
    })
}
/// Maps a task-failure message to a coarse abstention reason code.
///
/// Checks run case-insensitively and in priority order. The first match wins:
/// 1. `parse_error`
/// 2. `iter_budget_exhausted`
/// 3. `timeout`
/// 4. `tool_error`: the message mentions "tool" together with "error" or
///    "failed"
/// 5. `error`: fallback when nothing else matches
fn classify_abstention_reason(err: &str) -> String {
    let text = err.to_lowercase();
    let mentions_any = |needles: &[&str]| needles.iter().any(|n| text.contains(*n));
    let reason = if mentions_any(&["failed to parse structured output", "missing field"]) {
        "parse_error"
    } else if mentions_any(&["max_iterations", "max iterations", "iteration budget"]) {
        "iter_budget_exhausted"
    } else if mentions_any(&["timeout", "timed out"]) {
        "timeout"
    } else if text.contains("tool") && mentions_any(&["error", "failed"]) {
        "tool_error"
    } else {
        "error"
    };
    reason.to_string()
}
/// Builds the round-scoped failure marker subject:
/// `{prefix}.{session_id}.result.{round}.{agent_id}.{action}.failed`.
///
/// Components are joined verbatim with `.`; no sanitization happens here.
fn failed_result_subject(
    prefix: &str,
    session_id: &str,
    round: u32,
    agent_id: &str,
    action: &str,
) -> String {
    let round = round.to_string();
    [
        prefix,
        session_id,
        "result",
        round.as_str(),
        agent_id,
        action,
        "failed",
    ]
    .join(".")
}
/// Decides whether a failed task should publish a round-scoped `.failed`
/// marker.
///
/// Only `propose` and `evaluate` tasks qualify, and never when the failure
/// was a payment error.
fn should_publish_failure_marker(action: &str, is_payment_error: bool) -> bool {
    let is_round_action = matches!(action, "propose" | "evaluate");
    is_round_action && !is_payment_error
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_worker_config_new_defaults() {
let config = WorkerConfig::new(
"nats://localhost:4222".to_string(),
"test_stream".to_string(),
"test_consumer".to_string(),
);
assert_eq!(config.nats_url, "nats://localhost:4222");
assert_eq!(config.stream_name, "test_stream");
assert_eq!(config.consumer_name, "test_consumer");
assert_eq!(config.subject_prefix, "nsed");
assert_eq!(config.api_prefix, "sphera");
assert_eq!(config.scratchpad_retention_secs, 86400 * 7);
}
#[test]
fn test_worker_config_with_subject_prefix() {
let config = WorkerConfig::new(
"nats://localhost:4222".to_string(),
"stream".to_string(),
"consumer".to_string(),
)
.with_subject_prefix("nsed.test".to_string());
assert_eq!(config.subject_prefix, "nsed.test");
}
#[test]
fn test_worker_config_with_scratchpad_retention() {
let config = WorkerConfig::new(
"nats://localhost:4222".to_string(),
"stream".to_string(),
"consumer".to_string(),
)
.with_scratchpad_retention(3600);
assert_eq!(config.scratchpad_retention_secs, 3600);
}
#[test]
fn test_worker_config_with_zero_retention() {
let config = WorkerConfig::new(
"nats://localhost:4222".to_string(),
"stream".to_string(),
"consumer".to_string(),
)
.with_scratchpad_retention(0);
assert_eq!(config.scratchpad_retention_secs, 0);
}
#[test]
fn test_worker_config_chained_builders() {
let config = WorkerConfig::new(
"nats://test:4222".to_string(),
"my_stream".to_string(),
"my_consumer".to_string(),
)
.with_subject_prefix("prefix".to_string())
.with_api_prefix("myapi".to_string())
.with_scratchpad_retention(7200);
assert_eq!(config.nats_url, "nats://test:4222");
assert_eq!(config.stream_name, "my_stream");
assert_eq!(config.consumer_name, "my_consumer");
assert_eq!(config.subject_prefix, "prefix");
assert_eq!(config.api_prefix, "myapi");
assert_eq!(config.scratchpad_retention_secs, 7200);
}
#[test]
fn test_job_manifest_deserialization() {
let json = r#"{
"job_id": "test-job-123",
"task_description": "Solve math problem",
"agents": ["agent1", "agent2", "agent3"],
"rounds": 5,
"timestamp": 1704067200
}"#;
let manifest: JobManifest = serde_json::from_str(json).unwrap();
assert_eq!(manifest.job_id, "test-job-123");
assert_eq!(manifest.task_description, "Solve math problem");
assert_eq!(manifest.agents.len(), 3);
assert_eq!(manifest.rounds, 5);
assert_eq!(manifest.timestamp, 1704067200);
}
#[test]
fn test_job_manifest_serialization_roundtrip() {
let manifest = JobManifest {
job_id: "roundtrip-test".to_string(),
task_description: "Test description".to_string(),
agents: vec!["alpha".to_string(), "beta".to_string()],
rounds: 3,
timestamp: 999999,
};
let json = serde_json::to_string(&manifest).unwrap();
let parsed: JobManifest = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.job_id, manifest.job_id);
assert_eq!(parsed.task_description, manifest.task_description);
assert_eq!(parsed.agents, manifest.agents);
assert_eq!(parsed.rounds, manifest.rounds);
assert_eq!(parsed.timestamp, manifest.timestamp);
}
#[test]
fn test_dual_prefix_subject_architecture() {
let subject_prefix = "nsed";
let api_prefix = "sphera";
let job_id = "job-123";
let agent_id = "agent-001";
let ack_subject = format!("{}.jobs.ack.{}.{}", api_prefix, job_id, agent_id);
assert_eq!(ack_subject, "sphera.jobs.ack.job-123.agent-001");
let manifest_filter = format!("{}.jobs.manifest.>", api_prefix);
assert_eq!(manifest_filter, "sphera.jobs.manifest.>");
let heartbeat = format!("{}.agent.heartbeat.{}", api_prefix, agent_id);
assert_eq!(heartbeat, "sphera.agent.heartbeat.agent-001");
let accepted_subject = format!("{}.{}.result.event.agent_accepted", subject_prefix, job_id);
assert!(accepted_subject.starts_with("nsed."));
let task_filter = format!("{}.*.task.{}.*", subject_prefix, agent_id);
assert_eq!(task_filter, "nsed.*.task.agent-001.*");
}
#[test]
fn test_session_id_extraction_single_segment_prefix() {
let subject = "nsed.session-abc.task.agent1.propose";
let prefix = "nsed";
let subject_parts: Vec<&str> = subject.split('.').collect();
let prefix_count = prefix.split('.').count();
let session_id = subject_parts
.get(prefix_count)
.unwrap_or(&"global")
.to_string();
let action = subject_parts.last().unwrap_or(&"unknown");
assert_eq!(session_id, "session-abc");
assert_eq!(*action, "propose");
}
#[test]
fn test_session_id_extraction_multi_segment_prefix() {
let subject = "nsed.v2.session-abc.task.agent1.evaluate";
let prefix = "nsed.v2";
let subject_parts: Vec<&str> = subject.split('.').collect();
let prefix_count = prefix.split('.').count();
let session_id = subject_parts
.get(prefix_count)
.unwrap_or(&"global")
.to_string();
let action = subject_parts.last().unwrap_or(&"unknown");
assert_eq!(session_id, "session-abc");
assert_eq!(*action, "evaluate");
}
#[test]
fn test_session_id_extraction_empty_prefix_fallback() {
let subject = "session-abc.task.agent1.propose";
let prefix = "";
let subject_parts: Vec<&str> = subject.split('.').collect();
let prefix_count = if prefix.is_empty() {
0
} else {
prefix.split('.').count()
};
let session_id = subject_parts
.get(prefix_count)
.unwrap_or(&"global")
.to_string();
assert_eq!(session_id, "session-abc");
}
#[test]
fn test_worker_config_with_nats_auth() {
let auth = NatsAuth {
token: Some("my-secret-token".to_string()),
username: None,
password: None,
inline_creds: None,
creds_file: None,
};
let config = WorkerConfig::new(
"nats://localhost:4222".to_string(),
"stream".to_string(),
"consumer".to_string(),
)
.with_nats_auth(auth);
assert!(config.nats_auth.is_some());
let auth = config.nats_auth.unwrap();
assert_eq!(auth.token, Some("my-secret-token".to_string()));
}
#[derive(Debug)]
struct NoopHook;
#[async_trait]
impl WorkerHook for NoopHook {}
#[tokio::test]
async fn test_worker_hook_default_before_publish() {
let hook = NoopHook;
let mut payload = vec![1, 2, 3];
let result = hook.before_publish("some.subject", &mut payload).await;
assert!(result.is_ok());
assert_eq!(payload, vec![1, 2, 3]);
}
struct TestAckHandle;
#[async_trait]
impl buffer::AckHandle for TestAckHandle {
async fn ack(&self) -> anyhow::Result<()> {
Ok(())
}
}
fn make_entry(
payload: &[u8],
edited: bool,
annotations: Vec<crate::agents::OperatorAnnotation>,
) -> buffer::BufferedResponse {
let now = std::time::Instant::now();
buffer::BufferedResponse {
id: "test-id".into(),
action: "propose".into(),
job_id: "job-1".into(),
round: 1,
reply_subject: "nsed.job-1.result.1.agent.propose".into(),
payload: payload.to_vec(),
created_at: now,
release_at: now,
ack_handle: Box::new(TestAckHandle),
msg_id: "msg-test".into(),
annotations,
edited,
stopped: false,
}
}
#[test]
fn restamp_proposal_object_overwrites_published_at_ms() {
let payload = br#"{"content":"hello","thought_process":"t","published_at_ms":1000}"#;
let out = NatsNsedWorker::restamp_published_at(payload, 9999);
let v: serde_json::Value = serde_json::from_slice(&out).unwrap();
assert_eq!(v["published_at_ms"], 9999);
assert_eq!(v["content"], "hello");
}
#[test]
fn restamp_proposal_object_inserts_when_field_missing() {
let payload = br#"{"content":"hello","thought_process":"t"}"#;
let out = NatsNsedWorker::restamp_published_at(payload, 4242);
let v: serde_json::Value = serde_json::from_slice(&out).unwrap();
assert_eq!(v["published_at_ms"], 4242);
}
#[test]
fn restamp_evaluation_array_stamps_each_entry() {
let payload = br#"[["A",{"score":0.5,"published_at_ms":100}],["B",{"score":-0.5}]]"#;
let out = NatsNsedWorker::restamp_published_at(payload, 7777);
let v: serde_json::Value = serde_json::from_slice(&out).unwrap();
let arr = v.as_array().unwrap();
assert_eq!(arr.len(), 2);
assert_eq!(arr[0][1]["published_at_ms"], 7777);
assert_eq!(arr[1][1]["published_at_ms"], 7777);
assert_eq!(arr[0][1]["score"], 0.5);
}
#[test]
fn restamp_returns_input_unchanged_on_invalid_json() {
let payload = b"not-json{{";
let out = NatsNsedWorker::restamp_published_at(payload, 1);
assert_eq!(out, payload.to_vec());
}
#[test]
fn test_inject_annotations_proposal_object() {
use crate::agents::{AnnotationType, OperatorAnnotation};
let payload = br#"{"content":"hello","thought_process":"think"}"#;
let annotation = OperatorAnnotation {
annotation_type: AnnotationType::Edit,
comment: "Fixed wording".into(),
timestamp: "2026-03-04T00:00:00Z".into(),
original_content_hash: None,
};
let entry = make_entry(payload, true, vec![annotation]);
let result = NatsNsedWorker::inject_annotations(&entry);
let val: serde_json::Value = serde_json::from_slice(&result).unwrap();
assert_eq!(val["content"], "hello");
assert_eq!(val["thought_process"], "think");
assert_eq!(val["edited_by"], "operator");
assert!(val["operator_annotations"].is_array());
assert_eq!(val["operator_annotations"].as_array().unwrap().len(), 1);
assert_eq!(val["operator_annotations"][0]["comment"], "Fixed wording");
}
#[test]
fn test_inject_annotations_proposal_no_edit_no_annotations() {
let payload = br#"{"content":"original"}"#;
let entry = make_entry(payload, false, vec![]);
let result = NatsNsedWorker::inject_annotations(&entry);
let val: serde_json::Value = serde_json::from_slice(&result).unwrap();
assert_eq!(val["content"], "original");
assert!(val.get("edited_by").is_none());
assert!(val.get("operator_annotations").is_none());
}
#[test]
fn test_inject_annotations_evaluation_array() {
use crate::agents::{AnnotationType, OperatorAnnotation};
let payload = br#"[
["agent-A", {"score": 7.5, "justification": "Good work"}],
["agent-B", {"score": 4.0, "justification": "Needs improvement"}]
]"#;
let annotation = OperatorAnnotation {
annotation_type: AnnotationType::Edit,
comment: "Adjusted scores".into(),
timestamp: "2026-03-04T00:00:00Z".into(),
original_content_hash: None,
};
let entry = make_entry(payload, true, vec![annotation]);
let result = NatsNsedWorker::inject_annotations(&entry);
let val: serde_json::Value = serde_json::from_slice(&result).unwrap();
let arr = val.as_array().expect("should remain an array");
assert_eq!(arr.len(), 2);
let first = arr[0].as_array().unwrap();
assert_eq!(first[0], "agent-A");
let eval_a = &first[1];
assert_eq!(eval_a["score"], 7.5);
assert_eq!(eval_a["edited_by"], "operator");
assert!(eval_a["operator_annotations"].is_array());
assert_eq!(
eval_a["operator_annotations"][0]["comment"],
"Adjusted scores"
);
let second = arr[1].as_array().unwrap();
assert_eq!(second[0], "agent-B");
let eval_b = &second[1];
assert_eq!(eval_b["score"], 4.0);
assert_eq!(eval_b["edited_by"], "operator");
assert!(eval_b["operator_annotations"].is_array());
}
#[test]
fn test_inject_annotations_evaluation_array_no_edit() {
use crate::agents::{AnnotationType, OperatorAnnotation};
let payload = br#"[["agent-A", {"score": 5.0}]]"#;
let annotation = OperatorAnnotation {
annotation_type: AnnotationType::Comment,
comment: "Reviewed".into(),
timestamp: "2026-03-04T00:00:00Z".into(),
original_content_hash: None,
};
let entry = make_entry(payload, false, vec![annotation]);
let result = NatsNsedWorker::inject_annotations(&entry);
let val: serde_json::Value = serde_json::from_slice(&result).unwrap();
let arr = val.as_array().unwrap();
let eval_obj = &arr[0].as_array().unwrap()[1];
assert!(eval_obj["operator_annotations"].is_array());
assert_eq!(eval_obj["operator_annotations"][0]["comment"], "Reviewed");
assert!(eval_obj.get("edited_by").is_none());
}
#[test]
fn test_inject_annotations_non_json_passthrough() {
let payload = b"this is not json";
let entry = make_entry(payload, true, vec![]);
let result = NatsNsedWorker::inject_annotations(&entry);
assert_eq!(result, payload.to_vec());
}
#[test]
fn test_round_summary_event_serde_roundtrip() {
use crate::events::{ProposalScoreEntry, RoundSummaryEvent};
let event = RoundSummaryEvent {
round: 1,
convergence_score: 0.75,
decisiveness: 0.75,
net_support: vec![],
cesaro_support: vec![],
raw_distance: None,
claim_convergence: None,
total_claims: None,
leader_claim_convergence: None,
leader_total_claims: None,
controversy_scores: vec![],
proposal_scores: vec![
ProposalScoreEntry {
agent_id: "alpha".into(),
aggregated_score: 6.5,
category_breakdown: None,
controversy_score: None,
..Default::default()
},
ProposalScoreEntry {
agent_id: "beta".into(),
aggregated_score: 3.2,
category_breakdown: None,
controversy_score: None,
..Default::default()
},
],
accumulated_evidence: None,
evidence_target: None,
positive_budget: None,
du_dt: None,
signed_consensus: None,
t_opt: None,
thermo_probability: None,
..Default::default()
};
let json = serde_json::to_vec(&event).unwrap();
let parsed: RoundSummaryEvent = serde_json::from_slice(&json).unwrap();
assert_eq!(parsed.round, 1);
assert_eq!(parsed.proposal_scores.len(), 2);
assert_eq!(parsed.proposal_scores[0].agent_id, "alpha");
assert!((parsed.proposal_scores[0].aggregated_score - 6.5).abs() < f32::EPSILON);
assert_eq!(parsed.proposal_scores[1].agent_id, "beta");
assert!((parsed.proposal_scores[1].aggregated_score - 3.2).abs() < f32::EPSILON);
}
#[test]
fn test_score_dedup_prevents_duplicate() {
use crate::status::{AgentStatusSnapshot, ScoreEntry};
let mut snap = AgentStatusSnapshot::new("agent-1".into(), "gpt-4".into(), "p".into());
snap.push_score(ScoreEntry {
timestamp: "t1".into(),
job_id: "job-A".into(),
round: 1,
evaluator: "aggregated".into(),
score: 6.0,
});
assert_eq!(snap.recent_scores.len(), 1);
let already_has = snap
.recent_scores
.iter()
.any(|s| s.job_id == "job-A" && s.round == 1);
assert!(already_has, "dedup guard should detect existing score");
let different_round = snap
.recent_scores
.iter()
.any(|s| s.job_id == "job-A" && s.round == 2);
assert!(!different_round, "different round should not match");
}
#[test]
fn test_score_extraction_from_propose_context() {
use crate::status::{AgentStatusSnapshot, ScoreEntry};
let mut snap = AgentStatusSnapshot::new("agent-1".into(), "model".into(), "p".into());
let round_number: u32 = 3;
let previous_own_score: Option<f32> = Some(4.5);
let session_id = "job-X";
if let Some(score) = previous_own_score {
let prev_round = round_number.saturating_sub(1);
let already_has = snap
.recent_scores
.iter()
.any(|s| s.job_id == session_id && s.round == prev_round);
if !already_has {
snap.push_score(ScoreEntry {
timestamp: "t".into(),
job_id: session_id.into(),
round: prev_round,
evaluator: "aggregated".into(),
score,
});
}
}
assert_eq!(snap.recent_scores.len(), 1);
let entry = &snap.recent_scores[0];
assert_eq!(entry.round, 2); assert!((entry.score - 4.5).abs() < f32::EPSILON);
assert_eq!(entry.job_id, "job-X");
assert!((snap.mean_score.unwrap() - 4.5).abs() < f32::EPSILON);
}
#[test]
fn test_score_extraction_none_previous_score_no_push() {
use crate::status::AgentStatusSnapshot;
let mut snap = AgentStatusSnapshot::new("agent-1".into(), "model".into(), "p".into());
let round_number: u32 = 3;
let previous_own_score: Option<f32> = None;
let session_id = "job-X";
if let Some(score) = previous_own_score {
let prev_round = round_number.saturating_sub(1);
let already_has = snap
.recent_scores
.iter()
.any(|s| s.job_id == session_id && s.round == prev_round);
if !already_has {
snap.push_score(crate::status::ScoreEntry {
timestamp: "t".into(),
job_id: session_id.into(),
round: prev_round,
evaluator: "aggregated".into(),
score,
});
}
}
assert!(snap.recent_scores.is_empty());
assert!(snap.mean_score.is_none());
}
/// At round 1, `saturating_sub(1)` yields round 0 rather than underflowing.
///
/// NOTE(review): this test has no `round_number > 1` guard, so it records an
/// entry for round 0. `test_previous_own_score_skips_round_1` uses a guard that
/// suppresses exactly that entry. Confirm which one matches the production
/// propose path.
#[test]
fn test_score_extraction_round_1_saturating_sub() {
    use crate::status::{AgentStatusSnapshot, ScoreEntry};

    let mut snap = AgentStatusSnapshot::new("agent-1".into(), "model".into(), "p".into());
    let session_id = "job-Y";
    let round_number: u32 = 1;
    let previous_own_score: Option<f32> = Some(5.0);

    if let Some(score) = previous_own_score {
        snap.push_score(ScoreEntry {
            timestamp: "t".into(),
            job_id: session_id.into(),
            round: round_number.saturating_sub(1),
            evaluator: "aggregated".into(),
            score,
        });
    }

    assert_eq!(snap.recent_scores.len(), 1);
    assert_eq!(snap.recent_scores[0].round, 0);
}
/// Score extraction is gated on the "propose" action; an "evaluate" task must
/// not record anything even when a previous score is available.
#[test]
fn test_score_extraction_skips_evaluate_action() {
    use crate::status::AgentStatusSnapshot;

    let action = "evaluate";
    let previous_own_score: Option<f32> = Some(7.0);
    let snap = AgentStatusSnapshot::new("agent-1".into(), "model".into(), "p".into());

    let pushed = action == "propose" && previous_own_score.is_some();

    assert!(!pushed, "evaluate action should NOT extract scores");
    assert!(snap.recent_scores.is_empty());
}
/// A round summary that only scores other agents must leave this agent's score
/// history and `mean_score` untouched.
#[test]
fn test_round_summary_unknown_agent_no_push() {
    use crate::events::{ProposalScoreEntry, RoundSummaryEvent};
    use crate::status::{AgentStatusSnapshot, ScoreEntry};

    let my_id = "my-agent";
    let event = RoundSummaryEvent {
        round: 1,
        convergence_score: 0.5,
        decisiveness: 0.5,
        net_support: vec![],
        cesaro_support: vec![],
        raw_distance: None,
        claim_convergence: None,
        total_claims: None,
        leader_claim_convergence: None,
        leader_total_claims: None,
        controversy_scores: vec![],
        proposal_scores: vec![ProposalScoreEntry {
            agent_id: "other-agent".into(),
            aggregated_score: 8.0,
            category_breakdown: None,
            controversy_score: None,
            ..Default::default()
        }],
        accumulated_evidence: None,
        evidence_target: None,
        positive_budget: None,
        du_dt: None,
        signed_consensus: None,
        t_opt: None,
        thermo_probability: None,
        ..Default::default()
    };
    let mut snap = AgentStatusSnapshot::new(my_id.into(), "model".into(), "p".into());

    // Only the first entry addressed to this agent would be recorded.
    if let Some(own) = event.proposal_scores.iter().find(|e| e.agent_id == my_id) {
        snap.push_score(ScoreEntry {
            timestamp: "t".into(),
            job_id: "job".into(),
            round: event.round,
            evaluator: "aggregated".into(),
            score: own.aggregated_score,
        });
    }

    assert!(snap.recent_scores.is_empty());
    assert!(snap.mean_score.is_none());
}
/// A round summary carrying no proposal scores records nothing.
#[test]
fn test_round_summary_empty_proposal_scores_no_push() {
    use crate::events::RoundSummaryEvent;
    use crate::status::{AgentStatusSnapshot, ScoreEntry};

    let event = RoundSummaryEvent {
        round: 1,
        convergence_score: 0.0,
        decisiveness: 0.0,
        net_support: vec![],
        cesaro_support: vec![],
        raw_distance: None,
        claim_convergence: None,
        total_claims: None,
        leader_claim_convergence: None,
        leader_total_claims: None,
        controversy_scores: vec![],
        proposal_scores: vec![],
        accumulated_evidence: None,
        evidence_target: None,
        positive_budget: None,
        du_dt: None,
        signed_consensus: None,
        t_opt: None,
        thermo_probability: None,
        ..Default::default()
    };
    let mut snap = AgentStatusSnapshot::new("agent-1".into(), "model".into(), "p".into());

    let own = event
        .proposal_scores
        .iter()
        .find(|e| e.agent_id == "agent-1");
    if let Some(own) = own {
        snap.push_score(ScoreEntry {
            timestamp: "t".into(),
            job_id: "job".into(),
            round: event.round,
            evaluator: "aggregated".into(),
            score: own.aggregated_score,
        });
    }

    assert!(snap.recent_scores.is_empty());
}
/// With a one-segment prefix, the session id is the segment right after it.
#[test]
fn test_round_summary_subject_parsing_with_prefix() {
    let prefix = "nsed";
    let subject = "nsed.session-abc.result.event.round_summary";
    // An empty prefix contributes no segments; otherwise n dots mean n + 1 segments.
    let prefix_segments = if prefix.is_empty() {
        0
    } else {
        prefix.matches('.').count() + 1
    };
    let session_id = subject
        .split('.')
        .nth(prefix_segments)
        .map_or_else(|| "?".to_string(), str::to_string);
    assert_eq!(session_id, "session-abc");
}
/// With an empty prefix, the session id is the first subject segment.
#[test]
fn test_round_summary_subject_parsing_empty_prefix() {
    let prefix = "";
    let subject = "session-xyz.result.event.round_summary";
    // `"".split('.')` would still yield one (empty) piece, hence the special case.
    let prefix_segments = if prefix.is_empty() {
        0
    } else {
        prefix.matches('.').count() + 1
    };
    let session_id = subject
        .split('.')
        .nth(prefix_segments)
        .map_or_else(|| "?".to_string(), str::to_string);
    assert_eq!(session_id, "session-xyz");
}
/// A dotted prefix ("org.nsed") skips two segments before the session id.
#[test]
fn test_round_summary_subject_parsing_multi_segment_prefix() {
    let prefix = "org.nsed";
    let subject = "org.nsed.session-42.result.event.round_summary";
    let prefix_segments = if prefix.is_empty() {
        0
    } else {
        prefix.matches('.').count() + 1
    };
    let session_id = subject
        .split('.')
        .nth(prefix_segments)
        .map_or_else(|| "?".to_string(), str::to_string);
    assert_eq!(session_id, "session-42");
}
/// Among several proposal scores, only this agent's entry is recorded, using
/// the event's round number.
#[test]
fn test_round_summary_processes_score_for_own_agent() {
    use crate::events::{ProposalScoreEntry, RoundSummaryEvent};
    use crate::status::{AgentStatusSnapshot, ScoreEntry};

    let my_id = "my-agent";
    let session_id = "job-123";
    let event = RoundSummaryEvent {
        round: 2,
        convergence_score: 0.7,
        decisiveness: 0.7,
        net_support: vec![],
        cesaro_support: vec![],
        raw_distance: None,
        claim_convergence: None,
        total_claims: None,
        leader_claim_convergence: None,
        leader_total_claims: None,
        controversy_scores: vec![],
        proposal_scores: vec![
            ProposalScoreEntry {
                agent_id: "other-agent".into(),
                aggregated_score: 6.0,
                category_breakdown: None,
                controversy_score: None,
                ..Default::default()
            },
            ProposalScoreEntry {
                agent_id: "my-agent".into(),
                aggregated_score: 8.5,
                category_breakdown: None,
                controversy_score: None,
                ..Default::default()
            },
        ],
        accumulated_evidence: None,
        evidence_target: None,
        positive_budget: None,
        du_dt: None,
        signed_consensus: None,
        t_opt: None,
        thermo_probability: None,
        ..Default::default()
    };
    let mut snap = AgentStatusSnapshot::new(my_id.into(), "model".into(), "p".into());

    if let Some(own) = event.proposal_scores.iter().find(|e| e.agent_id == my_id) {
        let seen = snap
            .recent_scores
            .iter()
            .any(|s| s.job_id == session_id && s.round == event.round);
        if !seen {
            snap.push_score(ScoreEntry {
                timestamp: "t".into(),
                job_id: session_id.into(),
                round: event.round,
                evaluator: "aggregated".into(),
                score: own.aggregated_score,
            });
        }
    }

    assert_eq!(snap.recent_scores.len(), 1);
    assert!((snap.mean_score.unwrap() - 8.5).abs() < f32::EPSILON);
    assert_eq!(snap.recent_scores[0].round, 2);
}
/// Processing the same round summary twice records the score only once, thanks
/// to the (job_id, round) duplicate check.
#[test]
fn test_round_summary_dedup_prevents_double_push() {
    use crate::events::{ProposalScoreEntry, RoundSummaryEvent};
    use crate::status::{AgentStatusSnapshot, ScoreEntry};

    let my_id = "agent-1";
    let session_id = "job-x";
    let event = RoundSummaryEvent {
        round: 1,
        convergence_score: 0.5,
        decisiveness: 0.5,
        net_support: vec![],
        cesaro_support: vec![],
        raw_distance: None,
        claim_convergence: None,
        total_claims: None,
        leader_claim_convergence: None,
        leader_total_claims: None,
        controversy_scores: vec![],
        proposal_scores: vec![ProposalScoreEntry {
            agent_id: "agent-1".into(),
            aggregated_score: 7.0,
            category_breakdown: None,
            controversy_score: None,
            ..Default::default()
        }],
        accumulated_evidence: None,
        evidence_target: None,
        positive_budget: None,
        du_dt: None,
        signed_consensus: None,
        t_opt: None,
        thermo_probability: None,
        ..Default::default()
    };
    let mut snap = AgentStatusSnapshot::new(my_id.into(), "m".into(), "p".into());

    // Replay the same event twice, as a redelivered summary would.
    for _pass in 0..2 {
        let Some(own) = event.proposal_scores.iter().find(|e| e.agent_id == my_id) else {
            continue;
        };
        let seen = snap
            .recent_scores
            .iter()
            .any(|s| s.job_id == session_id && s.round == event.round);
        if seen {
            continue;
        }
        snap.push_score(ScoreEntry {
            timestamp: "t".into(),
            job_id: session_id.into(),
            round: event.round,
            evaluator: "aggregated".into(),
            score: own.aggregated_score,
        });
    }

    assert_eq!(
        snap.recent_scores.len(),
        1,
        "Dedup should prevent double push"
    );
}
/// A score of exactly 0.0 is a real score and must be recorded, not treated as
/// "missing".
#[test]
fn test_round_summary_zero_score_pushed() {
    use crate::status::{AgentStatusSnapshot, ScoreEntry};

    let zero = ScoreEntry {
        timestamp: "t".into(),
        job_id: "job-1".into(),
        round: 1,
        evaluator: "aggregated".into(),
        score: 0.0,
    };
    let mut snap = AgentStatusSnapshot::new("agent-1".into(), "m".into(), "p".into());
    snap.push_score(zero);

    assert_eq!(
        snap.recent_scores.len(),
        1,
        "Zero score should be pushed (all scores are real)"
    );
}
/// With the `round_number > 1` guard, round 1 never records a score for the
/// nonexistent round 0, even if a previous score is present.
#[test]
fn test_previous_own_score_skips_round_1() {
    use crate::status::{AgentStatusSnapshot, ScoreEntry};

    let mut snap = AgentStatusSnapshot::new("agent".into(), "m".into(), "p".into());
    let session_id = "job-1";
    let round_number: u32 = 1;
    let previous_own_score: Option<f32> = Some(5.0);

    let eligible = previous_own_score.filter(|_| round_number > 1);
    if let Some(score) = eligible {
        snap.push_score(ScoreEntry {
            timestamp: "t".into(),
            job_id: session_id.into(),
            round: round_number.saturating_sub(1),
            evaluator: "aggregated".into(),
            score,
        });
    }

    assert!(
        snap.recent_scores.is_empty(),
        "Round 1 should not push a score for round 0"
    );
}
/// Past round 1, the previous own score is attributed to the prior round.
/// The current round here is 3, so the score lands on round 2.
#[test]
fn test_previous_own_score_pushes_for_round_2() {
    use crate::status::{AgentStatusSnapshot, ScoreEntry};

    let mut snap = AgentStatusSnapshot::new("agent".into(), "m".into(), "p".into());
    let session_id = "job-1";
    let round_number: u32 = 3;
    let previous_own_score: Option<f32> = Some(7.5);

    let eligible = previous_own_score.filter(|_| round_number > 1);
    if let Some(score) = eligible {
        snap.push_score(ScoreEntry {
            timestamp: "t".into(),
            job_id: session_id.into(),
            round: round_number.saturating_sub(1),
            evaluator: "aggregated".into(),
            score,
        });
    }

    assert_eq!(snap.recent_scores.len(), 1);
    let recorded = &snap.recent_scores[0];
    assert_eq!(recorded.round, 2);
    assert!((recorded.score - 7.5).abs() < f32::EPSILON);
}
/// An empty evaluation array comes back as an empty JSON array.
#[test]
fn test_inject_annotations_empty_eval_array() {
    let entry = make_entry(b"[]", true, vec![]);
    let injected = NatsNsedWorker::inject_annotations(&entry);
    let parsed: serde_json::Value = serde_json::from_slice(&injected).unwrap();
    assert!(parsed.as_array().unwrap().is_empty());
}
/// A one-element tuple (no evaluation object) is returned unchanged.
#[test]
fn test_inject_annotations_malformed_tuple_single_element() {
    let entry = make_entry(br#"[["agent-A"]]"#, true, vec![]);
    let injected = NatsNsedWorker::inject_annotations(&entry);
    let parsed: serde_json::Value = serde_json::from_slice(&injected).unwrap();

    let outer = parsed.as_array().unwrap();
    assert_eq!(outer.len(), 1);

    let tuple = outer[0].as_array().unwrap();
    assert_eq!(tuple.len(), 1);
    assert_eq!(tuple[0], "agent-A");
}
/// When the evaluation slot holds a string rather than an object, it is left
/// as-is.
#[test]
fn test_inject_annotations_tuple_non_object_at_index_1() {
    let entry = make_entry(br#"[["agent-A", "not-an-object"]]"#, true, vec![]);
    let injected = NatsNsedWorker::inject_annotations(&entry);
    let parsed: serde_json::Value = serde_json::from_slice(&injected).unwrap();
    let tuple = parsed.as_array().unwrap()[0].as_array().unwrap();
    assert_eq!(tuple[1], "not-an-object");
}
/// Well-formed `[agent, {..}]` tuples receive the operator markers, while a
/// malformed tuple between them is passed through untouched.
#[test]
fn test_inject_annotations_mixed_valid_and_invalid_tuples() {
    use crate::agents::{AnnotationType, OperatorAnnotation};

    let payload = br#"[
["agent-A", {"score": 5.0}],
["agent-B"],
["agent-C", {"score": 8.0}]
]"#;
    let edit_note = OperatorAnnotation {
        annotation_type: AnnotationType::Edit,
        comment: "Fixed".into(),
        timestamp: "t".into(),
        original_content_hash: None,
    };
    let entry = make_entry(payload, true, vec![edit_note]);
    let injected = NatsNsedWorker::inject_annotations(&entry);
    let parsed: serde_json::Value = serde_json::from_slice(&injected).unwrap();
    let tuples = parsed.as_array().unwrap();

    let first_eval = &tuples[0].as_array().unwrap()[1];
    assert_eq!(first_eval["edited_by"], "operator");
    assert!(first_eval["operator_annotations"].is_array());

    let malformed = tuples[1].as_array().unwrap();
    assert_eq!(malformed.len(), 1);

    let third_eval = &tuples[2].as_array().unwrap()[1];
    assert_eq!(third_eval["edited_by"], "operator");
}
/// Non-tuple items in an evaluation array are neither dropped nor duplicated.
#[test]
fn test_inject_annotations_non_array_item_in_eval_array() {
    let entry = make_entry(br#"["just-a-string", 42, null]"#, true, vec![]);
    let injected = NatsNsedWorker::inject_annotations(&entry);
    let parsed: serde_json::Value = serde_json::from_slice(&injected).unwrap();
    assert_eq!(parsed.as_array().unwrap().len(), 3);
}
/// Connects to the NATS server named by `NATS_URL` (default
/// `nats://localhost:4222`) for integration tests.
///
/// Returns `None` when the connection fails or the client does not report a
/// `Connected` state, so callers can skip instead of failing.
async fn setup_nats() -> Option<async_nats::Client> {
    let nats_url =
        std::env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string());
    match connect_nats(&nats_url, None).await {
        Ok(client) if client.connection_state() == NatsState::Connected => Some(client),
        _ => None,
    }
}
/// Integration test, skipped when NATS is unreachable: a task message that is
/// received but never acked must be redelivered once the consumer's `ack_wait`
/// (3 s here) has elapsed.
///
/// Flow:
/// 1. Create an in-memory stream unique to this test and publish one task.
/// 2. Pull the task without acking it.
/// 3. Wait past `ack_wait`.
/// 4. Rebind the same durable consumer and expect the same task again, with a
///    JetStream delivery count above 1.
#[tokio::test]
async fn test_agent_task_redelivered_after_short_ack_wait() {
    let client = match setup_nats().await {
        Some(c) => c,
        None => {
            println!("Skipping test: NATS unavailable");
            return;
        }
    };
    let js = async_nats::jetstream::new(client.clone());
    let unique_id = Uuid::new_v4();
    let stream_name = format!("agent_redeliver_test_{}", unique_id);
    let agent_id = "test-agent";
    let task_subject = format!("{}.session1.task.{}.propose", stream_name, agent_id);
    js.create_stream(async_nats::jetstream::stream::Config {
        name: stream_name.clone(),
        subjects: vec![format!("{}.*.task.{}.>", stream_name, agent_id)],
        storage: async_nats::jetstream::stream::StorageType::Memory,
        ..Default::default()
    })
    .await
    .expect("create test stream");
    let context = serde_json::json!({
        "task_description": "Test task for redelivery",
        "round_number": 1,
        "agent_ids": ["test-agent"],
        "session_id": "session1"
    });
    js.publish(
        task_subject.clone(),
        serde_json::to_vec(&context).unwrap().into(),
    )
    .await
    .expect("publish task");
    let consumer_name = format!("agent_consumer_{}", unique_id);
    let stream = js.get_stream(&stream_name).await.expect("get stream");
    let consumer = stream
        .get_or_create_consumer(
            &consumer_name,
            async_nats::jetstream::consumer::pull::Config {
                durable_name: Some(consumer_name.clone()),
                filter_subject: format!("{}.*.task.{}.>", stream_name, agent_id),
                ack_policy: async_nats::jetstream::consumer::AckPolicy::Explicit,
                ack_wait: std::time::Duration::from_secs(3),
                ..Default::default()
            },
        )
        .await
        .expect("create consumer");
    let mut messages = consumer.messages().await.expect("messages");
    let msg = tokio::time::timeout(std::time::Duration::from_secs(5), messages.next())
        .await
        .expect("timeout waiting for first delivery")
        .expect("stream ended")
        .expect("message error");
    let payload: serde_json::Value = serde_json::from_slice(&msg.payload).expect("deserialize");
    assert_eq!(payload["task_description"], "Test task for redelivery");
    // Deliberately never ack `msg`: stop pulling and sleep past ack_wait (3 s)
    // so the server treats the delivery as lost.
    drop(messages);
    tokio::time::sleep(std::time::Duration::from_secs(4)).await;
    // Rebind the same durable consumer, as a restarted worker would.
    let consumer2 = stream
        .get_or_create_consumer(
            &consumer_name,
            async_nats::jetstream::consumer::pull::Config {
                durable_name: Some(consumer_name.clone()),
                filter_subject: format!("{}.*.task.{}.>", stream_name, agent_id),
                ack_policy: async_nats::jetstream::consumer::AckPolicy::Explicit,
                ack_wait: std::time::Duration::from_secs(3),
                ..Default::default()
            },
        )
        .await
        .expect("rebind consumer");
    let mut messages2 = consumer2.messages().await.expect("messages2");
    let redelivered = tokio::time::timeout(std::time::Duration::from_secs(5), messages2.next())
        .await
        .expect("message should be redelivered within ack_wait window")
        .expect("stream ended")
        .expect("message error");
    let payload2: serde_json::Value =
        serde_json::from_slice(&redelivered.payload).expect("deserialize redelivery");
    assert_eq!(
        payload2["task_description"], "Test task for redelivery",
        "Redelivered message should be the same task"
    );
    // The delivery count is the core claim of this test. Missing JetStream
    // metadata must fail the test instead of silently skipping the check.
    let info = redelivered
        .info()
        .expect("redelivered message should carry JetStream delivery metadata");
    assert!(
        info.delivered > 1,
        "Message should have been delivered more than once (redelivery). Got: {}",
        info.delivered
    );
    let _ = redelivered.ack().await;
    let _ = js.delete_stream(&stream_name).await;
}
/// Integration test, skipped when NATS is unreachable: a `Progress` ack resets
/// the consumer's `ack_wait` timer, so a long-running task is not redelivered
/// while it is still being worked on.
///
/// Timeline (ack_wait = 3 s):
/// - t≈0: receive the message.
/// - t≈2: send a `Progress` ack.
/// - t≈2..4: watch the live pull stream. Without the progress ack, redelivery
///   would happen at t≈3.
/// - t≈4: final ack.
/// - Finally, rebind the consumer and confirm nothing is pending.
#[tokio::test]
async fn test_progress_heartbeat_prevents_premature_redelivery() {
    let client = match setup_nats().await {
        Some(c) => c,
        None => {
            println!("Skipping test: NATS unavailable");
            return;
        }
    };
    let js = async_nats::jetstream::new(client.clone());
    let unique_id = Uuid::new_v4();
    let stream_name = format!("agent_hb_test_{}", unique_id);
    let agent_id = "hb-agent";
    let task_subject = format!("{}.session1.task.{}.propose", stream_name, agent_id);
    js.create_stream(async_nats::jetstream::stream::Config {
        name: stream_name.clone(),
        subjects: vec![format!("{}.*.task.{}.>", stream_name, agent_id)],
        storage: async_nats::jetstream::stream::StorageType::Memory,
        ..Default::default()
    })
    .await
    .expect("create test stream");
    let context = serde_json::json!({
        "task_description": "Heartbeat test task",
        "round_number": 1,
        "agent_ids": ["hb-agent"],
        "session_id": "session1"
    });
    js.publish(
        task_subject.clone(),
        serde_json::to_vec(&context).unwrap().into(),
    )
    .await
    .expect("publish task");
    let consumer_name = format!("hb_consumer_{}", unique_id);
    let stream = js.get_stream(&stream_name).await.expect("get stream");
    let consumer = stream
        .get_or_create_consumer(
            &consumer_name,
            async_nats::jetstream::consumer::pull::Config {
                durable_name: Some(consumer_name.clone()),
                filter_subject: format!("{}.*.task.{}.>", stream_name, agent_id),
                ack_policy: async_nats::jetstream::consumer::AckPolicy::Explicit,
                ack_wait: std::time::Duration::from_secs(3),
                ..Default::default()
            },
        )
        .await
        .expect("create consumer");
    let mut messages = consumer.messages().await.expect("messages");
    let msg = tokio::time::timeout(std::time::Duration::from_secs(5), messages.next())
        .await
        .expect("timeout")
        .expect("stream ended")
        .expect("message error");
    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
    msg.ack_with(async_nats::jetstream::AckKind::Progress)
        .await
        .expect("progress ack");
    // Keep the pull stream open past the original ack_wait deadline (~3 s).
    // If the Progress ack did not reset the timer, the server redelivers the
    // task onto this same stream within this window.
    let early = tokio::time::timeout(std::time::Duration::from_secs(2), messages.next()).await;
    assert!(
        !matches!(early, Ok(Some(Ok(_)))),
        "Progress ack should reset ack_wait; message was redelivered before the final ack"
    );
    msg.ack().await.expect("final ack");
    drop(messages);
    let consumer3 = stream
        .get_or_create_consumer(
            &consumer_name,
            async_nats::jetstream::consumer::pull::Config {
                durable_name: Some(consumer_name.clone()),
                filter_subject: format!("{}.*.task.{}.>", stream_name, agent_id),
                ack_policy: async_nats::jetstream::consumer::AckPolicy::Explicit,
                ack_wait: std::time::Duration::from_secs(3),
                ..Default::default()
            },
        )
        .await
        .expect("rebind consumer");
    let mut messages3 = consumer3.messages().await.expect("messages3");
    let result =
        tokio::time::timeout(std::time::Duration::from_secs(4), messages3.next()).await;
    assert!(
        result.is_err(),
        "No message should be redelivered after successful Progress+Ack"
    );
    let _ = js.delete_stream(&stream_name).await;
}
/// Minimal `NsedAgent` used by the worker tests. It proposes a fixed "mock"
/// proposal, returns no evaluations, and reports its name as "mock-agent".
#[derive(Debug, Clone)]
struct MockAgent;

#[async_trait]
impl NsedAgent for MockAgent {
    async fn propose(&self, _context: &AgentContext) -> Result<crate::agents::Proposal> {
        let proposal = crate::agents::Proposal {
            content: "mock".into(),
            ..Default::default()
        };
        Ok(proposal)
    }

    async fn evaluate(
        &self,
        _context: &AgentContext,
    ) -> Result<Vec<(String, crate::agents::Evaluation)>> {
        Ok(Vec::new())
    }

    fn name(&self) -> String {
        String::from("mock-agent")
    }
}
/// Checks `pause()`/`resume()` toggling, including that pausing twice is
/// idempotent and a single `resume()` fully unpauses. Skipped when NATS is
/// unreachable.
#[tokio::test]
async fn test_worker_pause_resume() {
    // The probe client is only a reachability check; it is dropped immediately.
    if setup_nats().await.is_none() {
        println!("Skipping test: NATS unavailable");
        return;
    }
    let nats_url =
        std::env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string());
    let config = WorkerConfig::new(
        nats_url,
        format!("test_pause_resume_{}", Uuid::new_v4()),
        format!("consumer_pause_resume_{}", Uuid::new_v4()),
    );
    let agent_config = AgentConfig {
        name: "mock-agent".into(),
        provider_id: "test".into(),
        model_name: "test-model".into(),
        ..Default::default()
    };
    let worker = NatsNsedWorker::new(MockAgent, agent_config, config, None)
        .await
        .expect("worker creation should succeed");

    assert!(!worker.is_paused(), "worker should start unpaused");

    worker.pause();
    assert!(worker.is_paused(), "worker should be paused after pause()");
    worker.resume();
    assert!(
        !worker.is_paused(),
        "worker should be unpaused after resume()"
    );

    for _ in 0..2 {
        worker.pause();
    }
    assert!(worker.is_paused(), "double pause should still be paused");
    worker.resume();
    assert!(!worker.is_paused(), "single resume should unpause");
}
/// Checks two things, skipping when NATS is unreachable:
/// - A worker has no response buffer by default, and `with_response_buffer`
///   attaches one.
/// - `pause()`/`resume()` are mirrored by both the shared pause handle and the
///   buffer's own paused flag.
#[tokio::test]
async fn test_worker_with_response_buffer() {
    if setup_nats().await.is_none() {
        println!("Skipping test: NATS unavailable");
        return;
    }
    let nats_url =
        || std::env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string());

    let plain_config = WorkerConfig::new(
        nats_url(),
        format!("test_buffer_{}", Uuid::new_v4()),
        format!("consumer_buffer_{}", Uuid::new_v4()),
    );
    let agent_config = AgentConfig {
        name: "mock-agent".into(),
        provider_id: "test".into(),
        model_name: "test-model".into(),
        ..Default::default()
    };
    let plain_worker = NatsNsedWorker::new(MockAgent, agent_config.clone(), plain_config, None)
        .await
        .expect("worker creation should succeed");
    assert!(plain_worker.response_buffer().is_none(), "no buffer by default");

    let buffered_config = WorkerConfig::new(
        nats_url(),
        format!("test_buffer2_{}", Uuid::new_v4()),
        format!("consumer_buffer2_{}", Uuid::new_v4()),
    );
    let worker = NatsNsedWorker::new(MockAgent, agent_config, buffered_config, None)
        .await
        .expect("worker creation should succeed")
        .with_response_buffer(std::time::Duration::from_secs(30));
    assert!(
        worker.response_buffer().is_some(),
        "buffer should be set after with_response_buffer()"
    );

    let handle = worker.pause_handle();
    assert!(
        !handle.load(Ordering::Relaxed),
        "handle should start as false"
    );

    worker.pause();
    assert!(
        handle.load(Ordering::Relaxed),
        "handle should reflect paused state"
    );
    assert!(
        worker.response_buffer().unwrap().is_paused(),
        "buffer should be paused when worker is paused"
    );

    worker.resume();
    assert!(
        !handle.load(Ordering::Relaxed),
        "handle should reflect unpaused state"
    );
    assert!(
        !worker.response_buffer().unwrap().is_paused(),
        "buffer should be unpaused when worker is resumed"
    );
}
/// Writes to the shared pause handle from outside the worker are observed by
/// `is_paused()`. Skipped when NATS is unreachable.
#[tokio::test]
async fn test_worker_pause_handle_external_mutation() {
    if setup_nats().await.is_none() {
        println!("Skipping test: NATS unavailable");
        return;
    }
    let nats_url =
        std::env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string());
    let config = WorkerConfig::new(
        nats_url,
        format!("test_handle_{}", Uuid::new_v4()),
        format!("consumer_handle_{}", Uuid::new_v4()),
    );
    let agent_config = AgentConfig {
        name: "mock-agent".into(),
        provider_id: "test".into(),
        model_name: "test-model".into(),
        ..Default::default()
    };
    let worker = NatsNsedWorker::new(MockAgent, agent_config, config, None)
        .await
        .expect("worker creation should succeed");
    let shared_flag = worker.pause_handle();

    shared_flag.store(true, Ordering::Relaxed);
    assert!(
        worker.is_paused(),
        "is_paused() should reflect external handle mutation"
    );

    shared_flag.store(false, Ordering::Relaxed);
    assert!(
        !worker.is_paused(),
        "is_paused() should reflect external handle un-mutation"
    );
}
/// Top-level JSON primitives (string, number, bool, null) pass through
/// `inject_annotations` unchanged, even for an edited entry.
#[test]
fn test_inject_annotations_primitive_json_value_passthrough() {
    let samples: [Vec<u8>; 4] = [
        br#""just a string""#.to_vec(),
        b"42".to_vec(),
        b"true".to_vec(),
        b"null".to_vec(),
    ];
    for raw in samples.iter() {
        let entry = make_entry(raw, true, vec![]);
        let injected = NatsNsedWorker::inject_annotations(&entry);
        let before: serde_json::Value = serde_json::from_slice(raw).unwrap();
        let after: serde_json::Value = serde_json::from_slice(&injected).unwrap();
        assert_eq!(
            before, after,
            "primitive JSON should pass through unchanged"
        );
    }
}
/// An edited proposal with no annotations gets `edited_by: "operator"` but no
/// `operator_annotations` key.
#[test]
fn test_inject_annotations_proposal_edited_no_annotations() {
    let entry = make_entry(br#"{"content":"hello"}"#, true, vec![]);
    let parsed: serde_json::Value =
        serde_json::from_slice(&NatsNsedWorker::inject_annotations(&entry)).unwrap();

    assert_eq!(parsed["edited_by"], "operator");
    assert!(
        parsed.get("operator_annotations").is_none(),
        "empty annotations should not produce operator_annotations key"
    );
}
/// An unedited proposal with annotations gets `operator_annotations` but no
/// `edited_by` marker.
#[test]
fn test_inject_annotations_proposal_annotations_no_edit() {
    use crate::agents::{AnnotationType, OperatorAnnotation};

    let review_note = OperatorAnnotation {
        annotation_type: AnnotationType::Comment,
        comment: "Reviewed".into(),
        timestamp: "t".into(),
        original_content_hash: None,
    };
    let entry = make_entry(br#"{"content":"hello"}"#, false, vec![review_note]);
    let parsed: serde_json::Value =
        serde_json::from_slice(&NatsNsedWorker::inject_annotations(&entry)).unwrap();

    let annotations = &parsed["operator_annotations"];
    assert!(annotations.is_array());
    assert_eq!(annotations.as_array().unwrap().len(), 1);
    assert!(
        parsed.get("edited_by").is_none(),
        "should NOT add edited_by when edited=false"
    );
}
/// All annotations on a proposal are carried over in their original order.
#[test]
fn test_inject_annotations_proposal_multiple_annotations() {
    use crate::agents::{AnnotationType, OperatorAnnotation};

    let comment = |text: &str, ts: &str| OperatorAnnotation {
        annotation_type: AnnotationType::Comment,
        comment: text.into(),
        timestamp: ts.into(),
        original_content_hash: None,
    };
    let annotations = vec![
        comment("First comment", "t1"),
        OperatorAnnotation {
            annotation_type: AnnotationType::Edit,
            comment: "Edited".into(),
            timestamp: "t2".into(),
            original_content_hash: Some("hash123".into()),
        },
        comment("Final LGTM", "t3"),
    ];
    let entry = make_entry(br#"{"content":"test"}"#, true, annotations);
    let parsed: serde_json::Value =
        serde_json::from_slice(&NatsNsedWorker::inject_annotations(&entry)).unwrap();

    assert_eq!(parsed["edited_by"], "operator");
    let recorded = parsed["operator_annotations"].as_array().unwrap();
    assert_eq!(recorded.len(), 3);
    for (ann, expected) in recorded.iter().zip(["First comment", "Edited", "Final LGTM"]) {
        assert_eq!(ann["comment"], expected);
    }
}
/// Every evaluation object in an edited evaluation array is marked
/// `edited_by: "operator"`; without annotations, no `operator_annotations` key
/// is added.
#[test]
fn test_inject_annotations_eval_edited_no_annotations() {
    let payload = br#"[["agent-A", {"score": 5.0}], ["agent-B", {"score": 7.0}]]"#;
    let entry = make_entry(payload, true, vec![]);
    let parsed: serde_json::Value =
        serde_json::from_slice(&NatsNsedWorker::inject_annotations(&entry)).unwrap();

    let evals = parsed
        .as_array()
        .unwrap()
        .iter()
        .map(|tuple| &tuple.as_array().unwrap()[1]);
    for eval in evals {
        assert_eq!(eval["edited_by"], "operator");
        assert!(
            eval.get("operator_annotations").is_none(),
            "no annotations should produce no operator_annotations key"
        );
    }
}
/// In a mixed evaluation array, only the well-formed tuple is annotated; the
/// number, null and string items keep their original values and positions.
#[test]
fn test_inject_annotations_eval_non_array_items_skipped() {
    use crate::agents::{AnnotationType, OperatorAnnotation};

    let note = OperatorAnnotation {
        annotation_type: AnnotationType::Comment,
        comment: "test".into(),
        timestamp: "t".into(),
        original_content_hash: None,
    };
    let payload = br#"[42, ["agent-A", {"score": 5.0}], null, "string"]"#;
    let entry = make_entry(payload, true, vec![note]);
    let parsed: serde_json::Value =
        serde_json::from_slice(&NatsNsedWorker::inject_annotations(&entry)).unwrap();
    let items = parsed.as_array().unwrap();

    assert_eq!(items.len(), 4);

    let annotated = &items[1].as_array().unwrap()[1];
    assert_eq!(annotated["edited_by"], "operator");
    assert!(annotated["operator_annotations"].is_array());

    assert_eq!(items[0], 42);
    assert!(items[2].is_null());
    assert_eq!(items[3], "string");
}
/// Returns true when `err_str` contains one of the billing/payment markers
/// "402 Payment Required", "insufficient_quota" or "billing". Matching is
/// case-sensitive.
fn is_payment_error(err_str: &str) -> bool {
    const PAYMENT_MARKERS: [&str; 3] = ["402 Payment Required", "insufficient_quota", "billing"];
    PAYMENT_MARKERS
        .iter()
        .any(|&marker| err_str.contains(marker))
}

/// Whether the error event for `err_str` should be withheld.
///
/// Payment errors are suppressed unless `propagate_payment_error` is set. All
/// other errors are never suppressed.
fn should_suppress_error_event(err_str: &str, propagate_payment_error: bool) -> bool {
    if propagate_payment_error {
        return false;
    }
    is_payment_error(err_str)
}
/// HTTP 402 messages are classified as payment errors.
#[test]
fn test_payment_error_402() {
    for msg in ["HTTP error: 402 Payment Required", "402 Payment Required: no credits"] {
        assert!(is_payment_error(msg));
    }
}
/// Quota-exhaustion messages are classified as payment errors.
#[test]
fn test_payment_error_insufficient_quota() {
    for msg in ["OpenAI error: insufficient_quota", "insufficient_quota for this model"] {
        assert!(is_payment_error(msg));
    }
}
/// Messages mentioning billing are classified as payment errors.
#[test]
fn test_payment_error_billing() {
    for msg in ["Your billing account has been suspended", "billing information required"] {
        assert!(is_payment_error(msg));
    }
}
/// Server, network, rate-limit, lookup and empty errors are not payment errors.
#[test]
fn test_non_payment_errors_not_detected() {
    let unrelated = [
        "500 Internal Server Error",
        "Connection timeout",
        "rate limit exceeded",
        "model not found",
        "",
    ];
    for msg in unrelated {
        assert!(!is_payment_error(msg));
    }
}
/// Payment errors are suppressed when propagation is disabled.
#[test]
fn test_suppress_error_when_payment_and_not_propagate() {
    for msg in ["402 Payment Required", "insufficient_quota", "billing issue"] {
        assert!(should_suppress_error_event(msg, false));
    }
}
/// Payment errors are not suppressed when propagation is enabled.
#[test]
fn test_no_suppress_when_payment_and_propagate() {
    for msg in ["402 Payment Required", "insufficient_quota"] {
        assert!(!should_suppress_error_event(msg, true));
    }
}
/// Non-payment errors are never suppressed, whatever the propagate flag.
#[test]
fn test_no_suppress_when_not_payment_error() {
    let cases = [("500 Internal Server Error", false), ("Connection refused", true)];
    for (msg, propagate) in cases {
        assert!(!should_suppress_error_event(msg, propagate));
    }
}
/// With no active jobs, the heartbeat reports `Idle` and no active job.
#[test]
fn test_heartbeat_status_idle_when_no_active_jobs() {
    let active_jobs: HashSet<String> = HashSet::new();
    let active_job: Option<String> = active_jobs.iter().next().cloned();
    let status = match active_job {
        Some(_) => AgentLiveStatus::Busy,
        None => AgentLiveStatus::Idle,
    };
    assert_eq!(status, AgentLiveStatus::Idle);
    assert!(active_job.is_none());
}
/// A single active job makes the heartbeat report `Busy`.
#[test]
fn test_heartbeat_status_busy_when_active_job() {
    let active_jobs: HashSet<String> = std::iter::once("session-123".to_string()).collect();
    let active_job: Option<String> = active_jobs.iter().next().cloned();
    let status = match active_job {
        Some(_) => AgentLiveStatus::Busy,
        None => AgentLiveStatus::Idle,
    };
    assert_eq!(status, AgentLiveStatus::Busy);
    assert!(active_job.is_some());
}
/// With several active jobs, some job is reported as active (set iteration
/// order is unspecified) and the status is `Busy`.
#[test]
fn test_heartbeat_status_busy_with_multiple_active_jobs() {
    let active_jobs: HashSet<String> = ["session-1", "session-2"]
        .iter()
        .map(|s| s.to_string())
        .collect();
    let active_job: Option<String> = active_jobs.iter().next().cloned();
    assert!(active_job.is_some());
    let status = match active_job {
        Some(_) => AgentLiveStatus::Busy,
        None => AgentLiveStatus::Idle,
    };
    assert_eq!(status, AgentLiveStatus::Busy);
}
/// Without a status snapshot, the heartbeat counters default to zero and there
/// is no last error.
#[test]
fn test_heartbeat_error_extraction_no_status() {
    let status: Option<&str> = None;
    let (tasks_completed, tasks_failed, last_error): (u64, u64, Option<String>) = match status {
        Some(_) => unreachable!(),
        None => (0, 0, None),
    };
    assert_eq!(tasks_completed, 0);
    assert_eq!(tasks_failed, 0);
    assert!(last_error.is_none());
}
/// The heartbeat's last error is built from the first task with status "error",
/// formatted as "<action>: <job_id>" and capped at 120 chars.
#[test]
fn test_heartbeat_error_extraction_from_task_log() {
    use crate::status::{AgentStatusSnapshot, TaskLogEntry};

    let mut snap = AgentStatusSnapshot::new("agent-1".into(), "model".into(), "p".into());
    let tasks = [
        TaskLogEntry {
            timestamp: "t1".into(),
            action: "propose".into(),
            job_id: "job-1".into(),
            round: 1,
            status: "ok".into(),
            duration_ms: 100,
            content_preview: None,
        },
        TaskLogEntry {
            timestamp: "t2".into(),
            action: "evaluate".into(),
            job_id: "job-2".into(),
            round: 1,
            status: "error".into(),
            duration_ms: 200,
            content_preview: Some("Error: connection timeout".into()),
        },
    ];
    for task in tasks {
        snap.push_task(task);
    }

    let err = snap.recent_tasks.iter().find_map(|t| {
        (t.status == "error").then(|| {
            format!("{}: {}", t.action, t.job_id)
                .chars()
                .take(120)
                .collect::<String>()
        })
    });

    assert!(err.is_some());
    assert_eq!(err.unwrap(), "evaluate: job-2");
}
/// A last-error string built from a very long job id is cut to exactly 120
/// chars and keeps the "<action>: " prefix.
#[test]
fn test_heartbeat_error_truncation_120_chars() {
    use crate::status::{AgentStatusSnapshot, TaskLogEntry};

    let mut snap = AgentStatusSnapshot::new("agent-1".into(), "model".into(), "p".into());
    let long_job_id = "a".repeat(200);
    snap.push_task(TaskLogEntry {
        timestamp: "t1".into(),
        action: "propose".into(),
        job_id: long_job_id.clone(),
        round: 1,
        status: "error".into(),
        duration_ms: 500,
        content_preview: None,
    });

    let err = snap.recent_tasks.iter().find_map(|t| {
        (t.status == "error").then(|| {
            format!("{}: {}", t.action, t.job_id)
                .chars()
                .take(120)
                .collect::<String>()
        })
    });

    assert!(err.is_some());
    let err_msg = err.unwrap();
    let char_count = err_msg.chars().count();
    assert_eq!(
        char_count,
        120,
        "error should be truncated to 120 chars"
    );
    assert!(err_msg.starts_with("propose: "));
}
/// When no task has status "error", there is no last error.
#[test]
fn test_heartbeat_error_no_error_tasks() {
    use crate::status::{AgentStatusSnapshot, TaskLogEntry};

    let mut snap = AgentStatusSnapshot::new("agent-1".into(), "model".into(), "p".into());
    snap.push_task(TaskLogEntry {
        timestamp: "t1".into(),
        action: "propose".into(),
        job_id: "job-1".into(),
        round: 1,
        status: "ok".into(),
        duration_ms: 100,
        content_preview: None,
    });

    let err = snap.recent_tasks.iter().find_map(|t| {
        (t.status == "error").then(|| {
            format!("{}: {}", t.action, t.job_id)
                .chars()
                .take(120)
                .collect::<String>()
        })
    });

    assert!(err.is_none());
}
/// A proposal preview is compact JSON with type tag `t = "p"`, content under
/// `c` and thought process under `tp`.
#[test]
fn test_extract_preview_proposal_basic() {
    let bytes = serde_json::to_vec(&serde_json::json!({
        "content": "Hello world proposal",
        "thought_process": "I thought about it"
    }))
    .unwrap();

    let preview = extract_content_preview(&bytes, "propose", &[]);
    assert!(preview.is_some());
    let text = preview.unwrap();
    let parsed: serde_json::Value = serde_json::from_str(&text).unwrap();

    assert_eq!(parsed["t"], "p");
    assert_eq!(parsed["c"], "Hello world proposal");
    assert_eq!(parsed["tp"], "I thought about it");
}
/// When the proposal has no thought process, the preview omits the `tp` key.
#[test]
fn test_extract_preview_proposal_no_thought_process() {
    let bytes = serde_json::to_vec(&serde_json::json!({
        "content": "Simple proposal"
    }))
    .unwrap();

    let preview = extract_content_preview(&bytes, "propose", &[]);
    assert!(preview.is_some());
    let text = preview.unwrap();
    let parsed: serde_json::Value = serde_json::from_str(&text).unwrap();

    assert_eq!(parsed["t"], "p");
    assert_eq!(parsed["c"], "Simple proposal");
    assert!(parsed.get("tp").is_none());
}
/// Empty proposal content yields no preview, even when a thought process is present.
#[test]
fn test_extract_preview_proposal_empty_content() {
    let raw = serde_json::to_vec(&serde_json::json!({
        "content": "",
        "thought_process": "I thought but had nothing to say"
    }))
    .unwrap();
    let outcome = extract_content_preview(&raw, "propose", &[]);
    assert!(outcome.is_none(), "empty content should return None");
}
/// Oversized proposal content is cut to the 2000-char preview limit plus a trailing ellipsis.
///
/// The exact length (2001) matches the boundary pinned by
/// `test_extract_preview_proposal_exactly_2001_chars_truncated`. An upper bound alone would
/// also accept an over-aggressive cut.
#[test]
fn test_extract_preview_proposal_truncates_long_content() {
    let long_content = "x".repeat(3000);
    let payload = serde_json::json!({
        "content": long_content,
        "thought_process": ""
    });
    let bytes = serde_json::to_vec(&payload).unwrap();
    let preview = extract_content_preview(&bytes, "propose", &[])
        .expect("proposal with content should produce a preview");
    let parsed: serde_json::Value = serde_json::from_str(&preview).unwrap();
    let c = parsed["c"].as_str().unwrap();
    let c_len = c.chars().count();
    assert_eq!(
        c_len, 2001,
        "content should be truncated to 2000 chars plus an ellipsis, got {}",
        c_len
    );
    assert!(
        c.ends_with('\u{2026}'),
        "truncated content should end with ellipsis"
    );
    // The kept text must be the leading slice of the original content.
    assert!(c.trim_end_matches('\u{2026}').chars().all(|ch| ch == 'x'));
}
/// A 1000-char thought process is cut to about 500 chars and marked with a trailing ellipsis.
#[test]
fn test_extract_preview_proposal_truncates_long_thought_process() {
    let long_tp = "y".repeat(1000);
    let payload = serde_json::json!({
        "content": "brief",
        "thought_process": long_tp
    });
    let bytes = serde_json::to_vec(&payload).unwrap();
    let preview = extract_content_preview(&bytes, "propose", &[])
        .expect("proposal with content should produce a preview");
    let parsed: serde_json::Value = serde_json::from_str(&preview).unwrap();
    let tp = parsed["tp"].as_str().unwrap();
    let tp_len = tp.chars().count();
    // Lower bound: a 500-char thought process is kept verbatim
    // (test_extract_preview_proposal_thought_exactly_500_no_truncation), so truncation must
    // not cut materially below that limit.
    assert!(
        (500..=502).contains(&tp_len),
        "tp should be truncated to ~500 chars, got {}",
        tp_len
    );
    assert!(
        tp.ends_with('\u{2026}'),
        "truncated tp should end with ellipsis"
    );
    assert!(tp.trim_end_matches('\u{2026}').chars().all(|ch| ch == 'y'));
}
/// Evaluation tuples are previewed as `{"t": "e", "evals": [...]}` with short per-field keys.
#[test]
fn test_extract_preview_evaluation_basic() {
    let payload = serde_json::json!([
        ["agent-A", {"score": 7.5, "justification": "Good work", "stance": "agree"}],
        ["agent-B", {"score": 4.0, "justification": "Needs improvement", "textual_feedback": "Try harder"}]
    ]);
    let bytes = serde_json::to_vec(&payload).unwrap();
    let preview = extract_content_preview(&bytes, "evaluate", &[])
        .expect("non-empty evaluation array should produce a preview");
    let parsed: serde_json::Value = serde_json::from_str(&preview).unwrap();
    assert_eq!(parsed["t"], "e");
    let evals = parsed["evals"].as_array().unwrap();
    assert_eq!(evals.len(), 2);

    assert_eq!(evals[0]["target"], "agent-A");
    assert_eq!(evals[0]["s"], 7.5);
    assert_eq!(evals[0]["j"], "Good work");
    assert_eq!(evals[0]["stance"], "agree");
    // Fields absent from the source evaluation must not appear in its preview entry.
    assert!(evals[0].get("tf").is_none());

    assert_eq!(evals[1]["target"], "agent-B");
    assert_eq!(evals[1]["s"], 4.0);
    assert_eq!(evals[1]["j"], "Needs improvement");
    assert_eq!(evals[1]["tf"], "Try harder");
    assert!(evals[1].get("stance").is_none());
}
/// An empty evaluation array has nothing to preview.
#[test]
fn test_extract_preview_evaluation_empty_array() {
    let raw = serde_json::to_vec(&serde_json::json!([])).unwrap();
    let outcome = extract_content_preview(&raw, "evaluate", &[]);
    assert!(outcome.is_none(), "empty eval array should return None");
}
/// A 500-char justification is cut to about 300 chars and marked with a trailing ellipsis.
#[test]
fn test_extract_preview_evaluation_truncates_justification() {
    let long_justification = "z".repeat(500);
    let payload = serde_json::json!([
        ["agent-A", {"score": 5.0, "justification": long_justification}]
    ]);
    let bytes = serde_json::to_vec(&payload).unwrap();
    let preview = extract_content_preview(&bytes, "evaluate", &[])
        .expect("evaluation entry should produce a preview");
    let parsed: serde_json::Value = serde_json::from_str(&preview).unwrap();
    let j = parsed["evals"][0]["j"].as_str().unwrap();
    let j_len = j.chars().count();
    // Lower bound: a 300-char justification is kept verbatim
    // (test_extract_preview_eval_justification_exactly_300_no_truncation).
    assert!(
        (300..=302).contains(&j_len),
        "justification should be truncated to ~300 chars, got {}",
        j_len
    );
    assert!(
        j.ends_with('\u{2026}'),
        "truncated justification should end with ellipsis"
    );
    assert!(j.trim_end_matches('\u{2026}').chars().all(|ch| ch == 'z'));
}
/// A 400-char textual feedback is cut to about 200 chars and marked with a trailing ellipsis.
#[test]
fn test_extract_preview_evaluation_truncates_textual_feedback() {
    let long_tf = "w".repeat(400);
    let payload = serde_json::json!([
        ["agent-A", {"score": 5.0, "textual_feedback": long_tf}]
    ]);
    let bytes = serde_json::to_vec(&payload).unwrap();
    let preview = extract_content_preview(&bytes, "evaluate", &[])
        .expect("evaluation entry should produce a preview");
    let parsed: serde_json::Value = serde_json::from_str(&preview).unwrap();
    let tf = parsed["evals"][0]["tf"].as_str().unwrap();
    let tf_len = tf.chars().count();
    // Lower bound: a 200-char feedback is kept verbatim
    // (test_extract_preview_eval_textual_feedback_exactly_200_no_truncation).
    assert!(
        (200..=202).contains(&tf_len),
        "tf should be truncated to ~200 chars, got {}",
        tf_len
    );
    assert!(
        tf.ends_with('\u{2026}'),
        "truncated tf should end with ellipsis"
    );
    assert!(tf.trim_end_matches('\u{2026}').chars().all(|ch| ch == 'w'));
}
/// `category_scores` is carried into the preview entry under the `cats` key.
#[test]
fn test_extract_preview_evaluation_with_category_scores() {
    let raw = serde_json::to_vec(&serde_json::json!([
        ["agent-A", {
            "score": 6.0,
            "justification": "OK",
            "category_scores": {"accuracy": 7, "clarity": 8}
        }]
    ]))
    .unwrap();
    let preview_text = extract_content_preview(&raw, "evaluate", &[])
        .expect("evaluation entry should produce a preview");
    let preview: serde_json::Value = serde_json::from_str(&preview_text).unwrap();
    let categories = &preview["evals"][0]["cats"];
    assert_eq!(categories["accuracy"], 7);
    assert_eq!(categories["clarity"], 8);
}
/// `claim_assessments` is surfaced in the preview entry as a `claims` array.
#[test]
fn test_extract_preview_evaluation_with_claim_assessments() {
    let raw = serde_json::to_vec(&serde_json::json!([
        ["agent-A", {
            "score": 6.0,
            "justification": "OK",
            "claim_assessments": [{"claim": "X", "verdict": "agree"}]
        }]
    ]))
    .unwrap();
    let preview_text = extract_content_preview(&raw, "evaluate", &[])
        .expect("evaluation entry should produce a preview");
    let preview: serde_json::Value = serde_json::from_str(&preview_text).unwrap();
    let claim_list = &preview["evals"][0]["claims"];
    assert!(claim_list.is_array());
    assert_eq!(claim_list[0]["claim"], "X");
}
/// `disagreements` is surfaced in the preview entry as a `disputes` array.
#[test]
fn test_extract_preview_evaluation_with_disagreements() {
    let raw = serde_json::to_vec(&serde_json::json!([
        ["agent-A", {
            "score": 3.0,
            "justification": "Bad",
            "disagreements": ["point 1", "point 2"]
        }]
    ]))
    .unwrap();
    let preview_text = extract_content_preview(&raw, "evaluate", &[])
        .expect("evaluation entry should produce a preview");
    let preview: serde_json::Value = serde_json::from_str(&preview_text).unwrap();
    let dispute_list = &preview["evals"][0]["disputes"];
    assert!(dispute_list.is_array());
    assert_eq!(dispute_list.as_array().unwrap().len(), 2);
}
/// At most 10 evaluation entries are kept in the preview, even when more are submitted.
#[test]
fn test_extract_preview_evaluation_caps_at_10_entries() {
    let entries: Vec<serde_json::Value> = (0..15)
        .map(|i| {
            serde_json::json!([
                format!("agent-{}", i),
                {"score": i as f64, "justification": "ok"}
            ])
        })
        .collect();
    let raw = serde_json::to_vec(&serde_json::Value::Array(entries)).unwrap();
    let preview_text = extract_content_preview(&raw, "evaluate", &[])
        .expect("non-empty evaluation array should produce a preview");
    let preview: serde_json::Value = serde_json::from_str(&preview_text).unwrap();
    let kept = preview["evals"].as_array().unwrap();
    assert_eq!(kept.len(), 10, "should cap at 10 eval entries");
}
/// Actions other than propose/evaluate produce no preview.
#[test]
fn test_extract_preview_unknown_action_returns_none() {
    let raw = serde_json::to_vec(&serde_json::json!({"content": "test"})).unwrap();
    let outcome = extract_content_preview(&raw, "unknown_action", &[]);
    assert!(outcome.is_none(), "unknown action should return None");
}
/// A payload that is not valid JSON produces no preview instead of an error.
#[test]
fn test_extract_preview_invalid_json_returns_none() {
    let garbage: &[u8] = b"not valid json at all";
    let outcome = extract_content_preview(garbage, "propose", &[]);
    assert!(outcome.is_none(), "invalid JSON should return None");
}
/// A proposal object lacking a `content` key produces no preview.
#[test]
fn test_extract_preview_proposal_missing_content_field() {
    let raw = serde_json::to_vec(&serde_json::json!({"thought_process": "thinking..."})).unwrap();
    let outcome = extract_content_preview(&raw, "propose", &[]);
    assert!(outcome.is_none(), "missing content field should return None");
}
/// Items that are not `[target, eval]` tuples are skipped, leaving nothing to preview.
#[test]
fn test_extract_preview_evaluation_malformed_tuple() {
    let raw = serde_json::to_vec(&serde_json::json!(["not-a-tuple", 42])).unwrap();
    let outcome = extract_content_preview(&raw, "evaluate", &[]);
    assert!(
        outcome.is_none(),
        "non-tuple items should produce empty evals → None"
    );
}
/// A tuple carrying only a target (no eval object at index 1) is skipped.
#[test]
fn test_extract_preview_evaluation_tuple_missing_eval_obj() {
    let raw = serde_json::to_vec(&serde_json::json!([["agent-A"]])).unwrap();
    let outcome = extract_content_preview(&raw, "evaluate", &[]);
    assert!(
        outcome.is_none(),
        "tuple without eval obj at index 1 should produce empty evals → None"
    );
}
/// `with_api_prefix` sets the API prefix, while the subject prefix and scratchpad retention
/// keep the values produced by `WorkerConfig::new`.
#[test]
fn test_worker_config_with_api_prefix() {
    let base = WorkerConfig::new(
        String::from("nats://localhost:4222"),
        String::from("stream"),
        String::from("consumer"),
    );
    let config = base.with_api_prefix(String::from("my_api"));
    assert_eq!(config.api_prefix, "my_api");
    assert_eq!(config.subject_prefix, "nsed");
    // Seven days, expressed in seconds.
    assert_eq!(config.scratchpad_retention_secs, 86400 * 7);
}
/// When a subject has no segment after the prefix, the session id falls back to "?".
#[test]
fn test_round_summary_subject_parsing_missing_segment_fallback() {
    let prefix = "nsed.v2.extra";
    // The subject is exactly the prefix, so nothing follows it.
    let subject = "nsed.v2.extra";
    let prefix_segments = match prefix {
        "" => 0,
        non_empty => non_empty.split('.').count(),
    };
    let session_id: String = subject
        .split('.')
        .nth(prefix_segments)
        .map_or_else(|| String::from("?"), String::from);
    assert_eq!(session_id, "?", "missing segment should fallback to '?'");
}
/// With a one-segment prefix, the session id is the segment immediately after it.
#[test]
fn test_round_summary_subject_parsing_single_segment() {
    let prefix = "nsed";
    let subject = "nsed.my-session";
    let skip = prefix.split('.').count();
    let session_id = match subject.split('.').nth(skip) {
        Some(segment) => segment.to_string(),
        None => "?".to_string(),
    };
    assert_eq!(session_id, "my-session");
}
/// A freshly constructed buffer starts with auto-approve switched on.
#[tokio::test]
async fn test_auto_approve_enabled_by_default() {
    let hold = std::time::Duration::from_secs(60);
    let response_buffer = buffer::ResponseBuffer::new(hold);
    let enabled = response_buffer.is_auto_approve();
    assert!(enabled, "auto-approve should be ON by default");
}
/// Auto-approve can be toggled off and back on, and each change is observable immediately.
#[tokio::test]
async fn test_auto_approve_enable_disable() {
    let response_buffer = buffer::ResponseBuffer::new(std::time::Duration::from_secs(60));
    assert!(response_buffer.is_auto_approve());
    for desired in [false, true] {
        response_buffer.set_auto_approve(desired);
        assert_eq!(response_buffer.is_auto_approve(), desired);
    }
}
/// The default auto-approve threshold is 1.0.
#[tokio::test]
async fn test_auto_approve_threshold_default() {
    let response_buffer = buffer::ResponseBuffer::new(std::time::Duration::from_secs(60));
    let default_threshold = response_buffer.auto_approve_threshold();
    let deviation = (default_threshold - 1.0).abs();
    assert!(
        deviation < 0.01,
        "default threshold should be 1.0 (release everything), got {}",
        default_threshold
    );
}
/// An in-range threshold is stored as given.
#[tokio::test]
async fn test_auto_approve_threshold_set() {
    let response_buffer = buffer::ResponseBuffer::new(std::time::Duration::from_secs(60));
    response_buffer.set_auto_approve_threshold(0.3);
    let stored = response_buffer.auto_approve_threshold();
    assert!((stored - 0.3).abs() < 0.01);
}
/// Out-of-range thresholds are clamped into `[0.0, 1.0]`.
#[tokio::test]
async fn test_auto_approve_threshold_clamps() {
    let response_buffer = buffer::ResponseBuffer::new(std::time::Duration::from_secs(60));

    // Above the range: clamped down to the upper bound.
    response_buffer.set_auto_approve_threshold(2.0);
    let upper = response_buffer.auto_approve_threshold();
    assert!((upper - 1.0).abs() < 0.01, "should clamp to 1.0");

    // Below the range: clamped up to the lower bound.
    response_buffer.set_auto_approve_threshold(-0.5);
    let lower = response_buffer.auto_approve_threshold();
    assert!(lower.abs() < 0.01, "should clamp to 0.0");
}
/// With auto-approve off, nothing is released, even at a very low divergence.
#[tokio::test]
async fn test_auto_release_disabled_returns_zero() {
    let response_buffer = buffer::ResponseBuffer::new(std::time::Duration::from_secs(60));
    response_buffer.set_auto_approve(false);
    let released = response_buffer.auto_release_if_eligible(Some(0.1)).await;
    assert_eq!(released, 0);
}
/// A divergence above the configured threshold leaves held responses untouched.
#[tokio::test]
async fn test_auto_release_above_threshold_returns_zero() {
    let response_buffer = buffer::ResponseBuffer::new(std::time::Duration::from_secs(60));
    response_buffer.set_auto_approve(true);
    response_buffer.set_auto_approve_threshold(0.3);

    let created_at = std::time::Instant::now();
    // Scheduled an hour out, so only an auto-release could make it ready during the test.
    let held = buffer::BufferedResponse {
        id: "ar-1".into(),
        action: "propose".into(),
        job_id: "j".into(),
        round: 1,
        reply_subject: "s".into(),
        payload: b"{}".to_vec(),
        created_at,
        release_at: created_at + std::time::Duration::from_secs(3600),
        ack_handle: Box::new(TestAckHandle),
        msg_id: "m".into(),
        annotations: vec![],
        edited: false,
        stopped: false,
    };
    response_buffer.push(held).await;

    // A divergence of 0.5 exceeds the 0.3 threshold.
    let released = response_buffer.auto_release_if_eligible(Some(0.5)).await;
    assert_eq!(released, 0);
}
/// A divergence below the threshold releases every held entry, and each becomes drainable.
#[tokio::test]
async fn test_auto_release_below_threshold_marks_entries() {
    let response_buffer = buffer::ResponseBuffer::new(std::time::Duration::from_secs(60));
    response_buffer.set_auto_approve(true);
    response_buffer.set_auto_approve_threshold(0.5);

    let created_at = std::time::Instant::now();
    // Builds a held entry scheduled an hour out, with ids distinguished by `n`.
    let make_entry = |n: usize| buffer::BufferedResponse {
        id: format!("ar-{}", n),
        action: "propose".into(),
        job_id: "j".into(),
        round: 1,
        reply_subject: "s".into(),
        payload: b"{}".to_vec(),
        created_at,
        release_at: created_at + std::time::Duration::from_secs(3600),
        ack_handle: Box::new(TestAckHandle),
        msg_id: format!("m-{}", n),
        annotations: vec![],
        edited: false,
        stopped: false,
    };
    for n in 0..3 {
        response_buffer.push(make_entry(n)).await;
    }

    let released = response_buffer.auto_release_if_eligible(Some(0.2)).await;
    assert_eq!(released, 3);
    let ready = response_buffer.drain_ready().await;
    assert_eq!(ready.len(), 3);
}
/// With no divergence signal (`None`) and the default threshold, a held entry is released.
#[tokio::test]
async fn test_auto_release_none_divergence_releases() {
    let response_buffer = buffer::ResponseBuffer::new(std::time::Duration::from_secs(60));
    response_buffer.set_auto_approve(true);

    let created_at = std::time::Instant::now();
    let held = buffer::BufferedResponse {
        id: "ar-none".into(),
        action: "propose".into(),
        job_id: "j".into(),
        round: 1,
        reply_subject: "s".into(),
        payload: b"{}".to_vec(),
        created_at,
        release_at: created_at + std::time::Duration::from_secs(3600),
        ack_handle: Box::new(TestAckHandle),
        msg_id: "m-none".into(),
        annotations: vec![],
        edited: false,
        stopped: false,
    };
    response_buffer.push(held).await;

    let released = response_buffer.auto_release_if_eligible(None).await;
    assert_eq!(released, 1);
}
/// Entries marked `stopped` are never auto-released, even at a low divergence.
#[tokio::test]
async fn test_auto_release_skips_stopped_entries() {
    let response_buffer = buffer::ResponseBuffer::new(std::time::Duration::from_secs(60));
    response_buffer.set_auto_approve(true);
    response_buffer.set_auto_approve_threshold(0.5);

    let created_at = std::time::Instant::now();
    let stopped_entry = buffer::BufferedResponse {
        id: "ar-stop".into(),
        action: "propose".into(),
        job_id: "j".into(),
        round: 1,
        reply_subject: "s".into(),
        payload: b"{}".to_vec(),
        created_at,
        release_at: created_at + std::time::Duration::from_secs(3600),
        ack_handle: Box::new(TestAckHandle),
        msg_id: "m-stop".into(),
        annotations: vec![],
        edited: false,
        stopped: true,
    };
    response_buffer.push(stopped_entry).await;

    let released = response_buffer.auto_release_if_eligible(Some(0.1)).await;
    assert_eq!(released, 0);
}
/// An entry whose release time has already arrived is not counted as auto-released.
#[tokio::test]
async fn test_auto_release_skips_already_ready_entries() {
    let response_buffer = buffer::ResponseBuffer::new(std::time::Duration::from_secs(60));
    response_buffer.set_auto_approve(true);
    response_buffer.set_auto_approve_threshold(0.5);

    let created_at = std::time::Instant::now();
    let due_entry = buffer::BufferedResponse {
        id: "ar-past".into(),
        action: "propose".into(),
        job_id: "j".into(),
        round: 1,
        reply_subject: "s".into(),
        payload: b"{}".to_vec(),
        created_at,
        // Already due at creation time.
        release_at: created_at,
        ack_handle: Box::new(TestAckHandle),
        msg_id: "m-past".into(),
        annotations: vec![],
        edited: false,
        stopped: false,
    };
    response_buffer.push(due_entry).await;

    let released = response_buffer.auto_release_if_eligible(Some(0.1)).await;
    assert_eq!(released, 0);
}
/// A negative score stretches a 10s base hold to about 30s (amplification 3.0).
#[test]
fn test_compute_adaptive_hold_negative_score() {
    let base_hold = std::time::Duration::from_secs(10);
    let adjusted = buffer::compute_adaptive_hold(base_hold, Some(-0.5), 3.0);
    let target_secs = std::time::Duration::from_secs(30).as_secs_f64();
    let delta_secs = (adjusted.as_secs_f64() - target_secs).abs();
    assert!(
        delta_secs < 0.5,
        "negative score should increase hold; hold={:?}",
        adjusted
    );
}
/// A strongly positive score yields a much shorter hold than a neutral (zero) score does.
///
/// With a 10s base and amplification 3.0, the expected hold is about 13.75s. That is still
/// longer than the base, so the reduction is relative to the neutral-score hold, not to
/// `base`. The previous message ("should reduce hold") misdescribed this.
#[test]
fn test_compute_adaptive_hold_large_positive_score() {
    let base = std::time::Duration::from_secs(10);
    let hold = buffer::compute_adaptive_hold(base, Some(3.0), 3.0);
    let expected_secs = 13.75;
    assert!(
        (hold.as_secs_f64() - expected_secs).abs() < 0.5,
        "large positive score should produce a ~13.75s hold; hold={:?}",
        hold
    );
    let neutral = buffer::compute_adaptive_hold(base, Some(0.0), 3.0);
    assert!(
        hold < neutral,
        "large positive score should shorten hold relative to a neutral score; hold={:?}, neutral={:?}",
        hold,
        neutral
    );
}
/// With zero amplification, the hold equals the base duration regardless of score.
#[test]
fn test_compute_adaptive_hold_zero_amplification() {
    let base_hold = std::time::Duration::from_secs(10);
    let adjusted = buffer::compute_adaptive_hold(base_hold, Some(0.2), 0.0);
    assert_eq!(adjusted, base_hold);
}
/// A zero base hold stays zero whatever the score and amplification.
#[test]
fn test_compute_adaptive_hold_zero_base() {
    let zero = std::time::Duration::ZERO;
    let adjusted = buffer::compute_adaptive_hold(zero, Some(0.2), 3.0);
    assert_eq!(adjusted, zero);
}
/// A large score standard deviation (1.2) yields a divergence of about 1.0.
#[test]
fn test_compute_divergence_high_std_dev() {
    let divergence = buffer::compute_divergence(Some(0.9), Some(1.2))
        .expect("score and std dev should yield a divergence");
    assert!((divergence - 1.0).abs() < 0.01);
}
/// A neutral (zero) score alone maps to a divergence of about 0.5.
#[test]
fn test_compute_divergence_zero_score() {
    let divergence = buffer::compute_divergence(Some(0.0), None)
        .expect("a score alone should yield a divergence");
    assert!((divergence - 0.5).abs() < 0.01);
}
/// A high positive score (3.0) alone maps to a low divergence of about 0.125.
#[test]
fn test_compute_divergence_high_positive_score() {
    let divergence = buffer::compute_divergence(Some(3.0), None)
        .expect("a score alone should yield a divergence");
    assert!((divergence - 0.125).abs() < 0.01);
}
/// A negative score (-1.0) alone maps to a raised divergence of about 0.75.
#[test]
fn test_compute_divergence_negative_score() {
    let divergence = buffer::compute_divergence(Some(-1.0), None)
        .expect("a score alone should yield a divergence");
    assert!((divergence - 0.75).abs() < 0.01);
}
/// A busy heartbeat serializes to the expected JSON fields and survives a JSON round trip.
#[test]
fn test_agent_heartbeat_serialization() {
    let busy = crate::agents::AgentHeartbeat {
        agent_id: "test-agent".into(),
        status: AgentLiveStatus::Busy,
        model_name: "gpt-4".into(),
        provider_id: "openai".into(),
        current_job: Some("job-123".into()),
        uptime_secs: 3600,
        timestamp: "2026-03-06T12:00:00Z".into(),
        input_price_per_mtok: Some(10.0),
        output_price_per_mtok: Some(30.0),
        chars_per_token: Some(4.0),
        response_sla_secs: Some(300),
        temperature: Some(0.7),
        frequency_penalty: None,
        presence_penalty: None,
        max_tokens: Some(4096),
        context_window: Some(128000),
        tasks_completed: 42,
        tasks_failed: 3,
        last_error: Some("evaluate: job-abc".into()),
        ..Default::default()
    };

    // Serialized form: status is rendered as the lowercase string "busy".
    let encoded = serde_json::to_value(&busy).unwrap();
    assert_eq!(encoded["agent_id"], "test-agent");
    assert_eq!(encoded["status"], "busy");
    assert_eq!(encoded["current_job"], "job-123");
    assert_eq!(encoded["uptime_secs"], 3600);
    assert_eq!(encoded["tasks_completed"], 42);
    assert_eq!(encoded["tasks_failed"], 3);
    assert_eq!(encoded["last_error"], "evaluate: job-abc");

    // Round trip back into the struct.
    let decoded: crate::agents::AgentHeartbeat = serde_json::from_value(encoded).unwrap();
    assert_eq!(decoded.agent_id, "test-agent");
    assert_eq!(decoded.status, AgentLiveStatus::Busy);
    assert_eq!(decoded.tasks_completed, 42);
}
/// An idle agent with no recorded failure serializes with null job/error fields and zero
/// task counters.
#[test]
fn test_agent_heartbeat_idle_no_error() {
    let heartbeat = crate::agents::AgentHeartbeat {
        agent_id: "idle-agent".into(),
        status: AgentLiveStatus::Idle,
        model_name: "claude".into(),
        provider_id: "anthropic".into(),
        current_job: None,
        uptime_secs: 10,
        timestamp: "t".into(),
        input_price_per_mtok: None,
        output_price_per_mtok: None,
        chars_per_token: None,
        response_sla_secs: None,
        temperature: None,
        frequency_penalty: None,
        presence_penalty: None,
        max_tokens: None,
        context_window: None,
        tasks_completed: 0,
        tasks_failed: 0,
        last_error: None,
        ..Default::default()
    };
    let json = serde_json::to_value(&heartbeat).unwrap();
    assert_eq!(json["agent_id"], "idle-agent");
    assert_eq!(json["status"], "idle");
    // Indexing a missing key yields Null, so these hold whether `None` is emitted as null
    // or skipped entirely.
    assert!(json["current_job"].is_null());
    assert!(
        json["last_error"].is_null(),
        "heartbeat with no failures should not report a last_error"
    );
    assert_eq!(json["tasks_completed"], 0);
    assert_eq!(json["tasks_failed"], 0);
}
/// A manifest with no agents round-trips through JSON with the agent list still empty.
#[test]
fn test_job_manifest_empty_agents() {
    let original = JobManifest {
        job_id: "empty-agents".into(),
        task_description: "test".into(),
        agents: Vec::new(),
        rounds: 1,
        timestamp: 0,
    };
    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: JobManifest = serde_json::from_str(&encoded).unwrap();
    assert!(decoded.agents.is_empty());
}
/// Manifest membership checks find listed agents and reject unlisted ones.
#[test]
fn test_job_manifest_contains_check() {
    let manifest = JobManifest {
        job_id: "check-test".into(),
        task_description: "test".into(),
        agents: vec!["alpha".into(), "beta".into(), "gamma".into()],
        rounds: 3,
        timestamp: 100,
    };
    let lists = |name: &str| manifest.agents.iter().any(|agent| agent == name);
    assert!(lists("alpha"));
    assert!(lists("beta"));
    assert!(!lists("delta"));
}
/// A scoped key is the scope prefix and key joined by a single dot.
#[test]
fn test_scoped_key_format() {
    let (scope_prefix, key) = ("session-abc", "my_data");
    let scoped = [scope_prefix, key].join(".");
    assert_eq!(scoped, "session-abc.my_data");
}
/// Dots already present in the scope prefix are kept verbatim in the scoped key.
#[test]
fn test_scoped_key_with_dots_in_prefix() {
    let (scope_prefix, key) = ("org.team.session", "scratchpad");
    let scoped = [scope_prefix, key].join(".");
    assert_eq!(scoped, "org.team.session.scratchpad");
}
/// 2400 CJK chars (3 bytes each in UTF-8) are truncated by character count, not by bytes.
#[test]
fn test_extract_preview_proposal_unicode_content_truncation() {
    // 2400 chars / 7200 bytes: a byte-based cut at 2000 would keep at most 666 chars, which
    // the lower bound below rejects. An upper bound alone could not tell the two apart.
    let long_content: String = "\u{4e16}\u{754c}".repeat(1200);
    let payload = serde_json::json!({
        "content": long_content,
        "thought_process": ""
    });
    let bytes = serde_json::to_vec(&payload).unwrap();
    let preview = extract_content_preview(&bytes, "propose", &[])
        .expect("proposal with content should produce a preview");
    let parsed: serde_json::Value = serde_json::from_str(&preview).unwrap();
    let c = parsed["c"].as_str().unwrap();
    let c_len = c.chars().count();
    assert!(
        (2000..=2002).contains(&c_len),
        "unicode content should be truncated to ~2000 chars, got {}",
        c_len
    );
    assert!(c.ends_with('\u{2026}'));
    assert!(c
        .trim_end_matches('\u{2026}')
        .chars()
        .all(|ch| ch == '\u{4e16}' || ch == '\u{754c}'));
}
/// 600 emoji (4 bytes each in UTF-8) in the thought process are truncated by character count.
#[test]
fn test_extract_preview_proposal_unicode_thought_truncation() {
    // 600 chars / 2400 bytes: a byte-based cut at 500 would keep at most 125 chars, which the
    // lower bound below rejects.
    let long_tp: String = "\u{1f600}".repeat(600);
    let payload = serde_json::json!({
        "content": "short content",
        "thought_process": long_tp
    });
    let bytes = serde_json::to_vec(&payload).unwrap();
    let preview = extract_content_preview(&bytes, "propose", &[])
        .expect("proposal with content should produce a preview");
    let parsed: serde_json::Value = serde_json::from_str(&preview).unwrap();
    let tp = parsed["tp"].as_str().unwrap();
    let tp_len = tp.chars().count();
    assert!(
        (500..=502).contains(&tp_len),
        "unicode tp should be truncated to ~500 chars, got {}",
        tp_len
    );
    assert!(tp.ends_with('\u{2026}'));
    assert!(tp.trim_end_matches('\u{2026}').chars().all(|ch| ch == '\u{1f600}'));
}
/// Content of exactly 2000 chars sits at the limit and is passed through unmodified.
#[test]
fn test_extract_preview_proposal_exactly_2000_chars_no_truncation() {
    let at_limit = "a".repeat(2000);
    let raw = serde_json::to_vec(&serde_json::json!({ "content": at_limit })).unwrap();
    let preview_text = extract_content_preview(&raw, "propose", &[])
        .expect("proposal with content should produce a preview");
    let preview: serde_json::Value = serde_json::from_str(&preview_text).unwrap();
    let shown = preview["c"].as_str().unwrap();
    assert_eq!(shown.chars().count(), 2000);
    assert!(!shown.ends_with('\u{2026}'));
}
/// One char over the limit triggers truncation: 2000 kept chars plus the ellipsis.
#[test]
fn test_extract_preview_proposal_exactly_2001_chars_truncated() {
    let over_limit = "b".repeat(2001);
    let raw = serde_json::to_vec(&serde_json::json!({ "content": over_limit })).unwrap();
    let preview_text = extract_content_preview(&raw, "propose", &[])
        .expect("proposal with content should produce a preview");
    let preview: serde_json::Value = serde_json::from_str(&preview_text).unwrap();
    let shown = preview["c"].as_str().unwrap();
    assert_eq!(shown.chars().count(), 2001);
    assert!(shown.ends_with('\u{2026}'));
}
/// A thought process of exactly 500 chars is passed through unmodified.
#[test]
fn test_extract_preview_proposal_thought_exactly_500_no_truncation() {
    let at_limit = "c".repeat(500);
    let raw = serde_json::to_vec(&serde_json::json!({
        "content": "hello",
        "thought_process": at_limit
    }))
    .unwrap();
    let preview_text = extract_content_preview(&raw, "propose", &[])
        .expect("proposal with content should produce a preview");
    let preview: serde_json::Value = serde_json::from_str(&preview_text).unwrap();
    let shown = preview["tp"].as_str().unwrap();
    assert_eq!(shown.chars().count(), 500);
    assert!(!shown.ends_with('\u{2026}'));
}
/// An empty thought process is dropped from the preview rather than emitted as `"tp": ""`.
#[test]
fn test_extract_preview_proposal_empty_thought_process_omitted() {
    let raw = serde_json::to_vec(&serde_json::json!({
        "content": "some content",
        "thought_process": ""
    }))
    .unwrap();
    let preview_text = extract_content_preview(&raw, "propose", &[])
        .expect("proposal with content should produce a preview");
    let preview: serde_json::Value = serde_json::from_str(&preview_text).unwrap();
    let has_tp = preview.get("tp").is_some();
    assert!(!has_tp, "empty thought_process should not produce tp key");
}
/// Non-string `content` (here a number) yields no preview.
#[test]
fn test_extract_preview_proposal_content_is_number_returns_none() {
    let raw = serde_json::to_vec(&serde_json::json!({
        "content": 42,
        "thought_process": "thinking"
    }))
    .unwrap();
    let outcome = extract_content_preview(&raw, "propose", &[]);
    assert!(outcome.is_none());
}
/// 400 CJK chars of justification are truncated by character count, not by bytes.
#[test]
fn test_extract_preview_eval_unicode_justification_truncation() {
    // 400 chars / 1200 bytes: a byte-based cut at 300 would keep at most 100 chars, which the
    // lower bound below rejects.
    let long_j: String = "\u{4e16}".repeat(400);
    let payload = serde_json::json!([
        ["agent-A", {"score": 6.0, "justification": long_j}]
    ]);
    let bytes = serde_json::to_vec(&payload).unwrap();
    let preview = extract_content_preview(&bytes, "evaluate", &[])
        .expect("evaluation entry should produce a preview");
    let parsed: serde_json::Value = serde_json::from_str(&preview).unwrap();
    let j = parsed["evals"][0]["j"].as_str().unwrap();
    let j_len = j.chars().count();
    assert!(
        (300..=302).contains(&j_len),
        "unicode justification should be truncated to ~300 chars, got {}",
        j_len
    );
    assert!(j.ends_with('\u{2026}'));
    assert!(j.trim_end_matches('\u{2026}').chars().all(|ch| ch == '\u{4e16}'));
}
/// 300 emoji of textual feedback are truncated by character count, not by bytes.
#[test]
fn test_extract_preview_eval_unicode_textual_feedback_truncation() {
    // 300 chars / 1200 bytes: a byte-based cut at 200 would keep at most 50 chars, which the
    // lower bound below rejects.
    let long_tf: String = "\u{1f44d}".repeat(300);
    let payload = serde_json::json!([
        ["agent-A", {"score": 5.0, "textual_feedback": long_tf}]
    ]);
    let bytes = serde_json::to_vec(&payload).unwrap();
    let preview = extract_content_preview(&bytes, "evaluate", &[])
        .expect("evaluation entry should produce a preview");
    let parsed: serde_json::Value = serde_json::from_str(&preview).unwrap();
    let tf = parsed["evals"][0]["tf"].as_str().unwrap();
    let tf_len = tf.chars().count();
    assert!(
        (200..=202).contains(&tf_len),
        "unicode tf should be truncated to ~200 chars, got {}",
        tf_len
    );
    assert!(tf.ends_with('\u{2026}'));
    assert!(tf.trim_end_matches('\u{2026}').chars().all(|ch| ch == '\u{1f44d}'));
}
/// A justification of exactly 300 chars is passed through unmodified.
#[test]
fn test_extract_preview_eval_justification_exactly_300_no_truncation() {
    let at_limit = "d".repeat(300);
    let raw = serde_json::to_vec(&serde_json::json!([
        ["agent-A", {"score": 7.0, "justification": at_limit}]
    ]))
    .unwrap();
    let preview_text = extract_content_preview(&raw, "evaluate", &[])
        .expect("evaluation entry should produce a preview");
    let preview: serde_json::Value = serde_json::from_str(&preview_text).unwrap();
    let shown = preview["evals"][0]["j"].as_str().unwrap();
    assert_eq!(shown.chars().count(), 300);
    assert!(!shown.ends_with('\u{2026}'));
}
/// Textual feedback of exactly 200 chars is passed through unmodified.
#[test]
fn test_extract_preview_eval_textual_feedback_exactly_200_no_truncation() {
    let at_limit = "e".repeat(200);
    let raw = serde_json::to_vec(&serde_json::json!([
        ["agent-A", {"score": 7.0, "textual_feedback": at_limit}]
    ]))
    .unwrap();
    let preview_text = extract_content_preview(&raw, "evaluate", &[])
        .expect("evaluation entry should produce a preview");
    let preview: serde_json::Value = serde_json::from_str(&preview_text).unwrap();
    let shown = preview["evals"][0]["tf"].as_str().unwrap();
    assert_eq!(shown.chars().count(), 200);
    assert!(!shown.ends_with('\u{2026}'));
}
/// An evaluation without `score` is still previewed, just without the `s` key.
#[test]
fn test_extract_preview_eval_missing_score_field() {
    let raw = serde_json::to_vec(&serde_json::json!([
        ["agent-A", {"justification": "OK"}]
    ]))
    .unwrap();
    let preview_text = extract_content_preview(&raw, "evaluate", &[])
        .expect("evaluation entry should produce a preview");
    let preview: serde_json::Value = serde_json::from_str(&preview_text).unwrap();
    let entry = &preview["evals"][0];
    assert_eq!(entry["target"], "agent-A");
    assert_eq!(entry["j"], "OK");
    assert!(
        entry.get("s").is_none(),
        "no score field in source → no s in preview"
    );
}
/// A non-string target id is rendered as the placeholder "?".
#[test]
fn test_extract_preview_eval_missing_target_id() {
    let raw = serde_json::to_vec(&serde_json::json!([
        [42, {"score": 5.0, "justification": "test"}]
    ]))
    .unwrap();
    let preview_text = extract_content_preview(&raw, "evaluate", &[])
        .expect("evaluation entry should produce a preview");
    let preview: serde_json::Value = serde_json::from_str(&preview_text).unwrap();
    assert_eq!(preview["evals"][0]["target"], "?");
}
/// With every optional evaluation field populated, each appears under its short preview key.
#[test]
fn test_extract_preview_eval_all_optional_fields_present() {
    let raw = serde_json::to_vec(&serde_json::json!([
        ["agent-A", {
            "score": 7.5,
            "justification": "Good analysis",
            "stance": "strongly_agree",
            "textual_feedback": "Well argued points",
            "category_scores": {"accuracy": 8, "clarity": 9, "depth": 7},
            "claim_assessments": [
                {"claim": "The earth is round", "verdict": "agree"},
                {"claim": "Water is wet", "verdict": "agree"}
            ],
            "disagreements": ["Minor factual error in paragraph 2"]
        }]
    ]))
    .unwrap();
    let preview_text = extract_content_preview(&raw, "evaluate", &[])
        .expect("evaluation entry should produce a preview");
    let preview: serde_json::Value = serde_json::from_str(&preview_text).unwrap();
    let entry = &preview["evals"][0];

    // Scalar fields.
    assert_eq!(entry["target"], "agent-A");
    assert_eq!(entry["s"], 7.5);
    assert_eq!(entry["j"], "Good analysis");
    assert_eq!(entry["stance"], "strongly_agree");
    assert_eq!(entry["tf"], "Well argued points");

    // Structured fields.
    assert_eq!(entry["cats"]["accuracy"], 8);
    assert_eq!(entry["cats"]["clarity"], 9);
    assert_eq!(entry["claims"].as_array().unwrap().len(), 2);
    assert_eq!(entry["disputes"].as_array().unwrap().len(), 1);
}
/// An empty eval object produces an entry holding only the target, with no optional keys.
#[test]
fn test_extract_preview_eval_minimal_eval_object() {
    let raw = serde_json::to_vec(&serde_json::json!([["agent-A", {}]])).unwrap();
    let preview_text = extract_content_preview(&raw, "evaluate", &[])
        .expect("evaluation entry should produce a preview");
    let preview: serde_json::Value = serde_json::from_str(&preview_text).unwrap();
    let entry = &preview["evals"][0];
    assert_eq!(entry["target"], "agent-A");
    for optional_key in ["s", "j", "stance", "tf", "cats", "claims", "disputes"] {
        assert!(
            entry.get(optional_key).is_none(),
            "{} should be absent",
            optional_key
        );
    }
}
/// Candidate proposal text attached to an evaluation preview is capped at 1000 chars plus an
/// ellipsis.
#[test]
fn test_extract_preview_eval_props_truncated() {
    let candidates = vec![crate::agents::CandidateProposal {
        id: "Candidate_A".to_string(),
        proposal: crate::agents::Proposal {
            // Built in place; the original code cloned a local that was never used again.
            content: "x".repeat(1200),
            ..Default::default()
        },
    }];
    let payload = serde_json::json!([["Candidate_A", {"score": 0.8, "justification": "ok"}]]);
    let bytes = serde_json::to_vec(&payload).unwrap();
    let preview = extract_content_preview(&bytes, "evaluate", &candidates)
        .expect("evaluation entry should produce a preview");
    let parsed: serde_json::Value = serde_json::from_str(&preview).unwrap();
    let props = parsed.get("props").expect("should have props");
    let val = props["Candidate_A"].as_str().unwrap();
    // Same ellipsis spelling as the other preview tests.
    assert!(val.ends_with('\u{2026}'), "should be truncated");
    assert_eq!(val.chars().count(), 1001);
    assert!(val.trim_end_matches('\u{2026}').chars().all(|ch| ch == 'x'));
}
/// Short candidate proposal text is attached to the evaluation preview unmodified.
#[test]
fn test_extract_preview_eval_props_short_content() {
    let candidates = vec![crate::agents::CandidateProposal {
        id: "Candidate_B".to_string(),
        proposal: crate::agents::Proposal {
            // Built in place; the original code cloned a local that was never used again.
            content: "short proposal".to_string(),
            ..Default::default()
        },
    }];
    let payload = serde_json::json!([["Candidate_B", {"score": 0.5, "justification": "meh"}]]);
    let bytes = serde_json::to_vec(&payload).unwrap();
    let preview = extract_content_preview(&bytes, "evaluate", &candidates)
        .expect("evaluation entry should produce a preview");
    let parsed: serde_json::Value = serde_json::from_str(&preview).unwrap();
    let props = parsed.get("props").expect("should have props");
    assert_eq!(props["Candidate_B"].as_str().unwrap(), "short proposal");
}
/// Only candidates that appear as evaluation targets are included in `props`.
#[test]
fn test_extract_preview_eval_props_filters_to_displayed_targets() {
    let candidate = |id: &str, text: &str| crate::agents::CandidateProposal {
        id: id.to_string(),
        proposal: crate::agents::Proposal {
            content: text.into(),
            ..Default::default()
        },
    };
    let candidates = vec![
        candidate("Candidate_A", "proposal A"),
        candidate("Candidate_B", "proposal B"),
    ];
    // Only Candidate_A is evaluated.
    let raw = serde_json::to_vec(&serde_json::json!([
        ["Candidate_A", {"score": 0.9, "justification": "good"}]
    ]))
    .unwrap();
    let preview_text = extract_content_preview(&raw, "evaluate", &candidates)
        .expect("evaluation entry should produce a preview");
    let preview: serde_json::Value = serde_json::from_str(&preview_text).unwrap();
    let props = preview.get("props").expect("should have props");
    assert!(
        props.get("Candidate_A").is_some(),
        "displayed target should be in props"
    );
    assert!(
        props.get("Candidate_B").is_none(),
        "non-displayed target should be filtered out"
    );
}
/// With only a standard deviation (0.3) available, divergence is about 0.3.
#[test]
fn test_compute_divergence_only_std_dev_no_score() {
    let divergence = buffer::compute_divergence(None, Some(0.3))
        .expect("a std dev alone should still yield a divergence");
    assert!((divergence - 0.3).abs() < 0.01);
}
/// With a moderate score (0.7) and std dev 0.35, the resulting divergence is about 0.35.
#[test]
fn test_compute_divergence_both_score_and_std_dev() {
    let divergence = buffer::compute_divergence(Some(0.7), Some(0.35))
        .expect("score and std dev should yield a divergence");
    assert!((divergence - 0.35).abs() < 0.01);
}
/// A strongly negative score (-2.0) outweighs a tiny std dev, giving about 0.833.
#[test]
fn test_compute_divergence_score_dominates() {
    let divergence = buffer::compute_divergence(Some(-2.0), Some(0.05))
        .expect("score and std dev should yield a divergence");
    assert!((divergence - 0.833).abs() < 0.02);
}
/// A very high score (5.0) with a tiny std dev gives a divergence of about 0.083.
#[test]
fn test_compute_divergence_high_score_low_std() {
    let divergence = buffer::compute_divergence(Some(5.0), Some(0.02))
        .expect("score and std dev should yield a divergence");
    assert!((divergence - 0.083).abs() < 0.02);
}
/// Without either a score or a std dev, there is no divergence to report.
#[test]
fn test_compute_divergence_none_none() {
    let divergence = buffer::compute_divergence(None, None);
    assert!(divergence.is_none());
}
/// A score of 3.0 with zero std dev gives the same ~0.125 as the score alone.
#[test]
fn test_compute_divergence_perfect_score_zero_std() {
    let divergence = buffer::compute_divergence(Some(3.0), Some(0.0))
        .expect("score and std dev should yield a divergence");
    assert!((divergence - 0.125).abs() < 0.01);
}
/// Three negative scores in a row flag the agent with a "Low scores" reason.
#[test]
fn test_check_flags_low_score_triggers_flag() {
    use crate::status::{AgentStatusSnapshot, ScoreEntry};
    let mut snapshot = AgentStatusSnapshot::new("a".into(), "m".into(), "p".into());
    (0..3).for_each(|i| {
        snapshot.push_score(ScoreEntry {
            timestamp: format!("t{}", i),
            job_id: "j".into(),
            round: i as u32 + 1,
            evaluator: "e".into(),
            score: -0.5,
        });
    });
    assert!(snapshot.is_flagged, "agent should be flagged for low scores");
    let reason = snapshot.flag_reason.as_ref().unwrap();
    assert!(reason.contains("Low scores"));
}
/// Two widely split scores (-2.0 and 2.0) push the std dev above 1.5 and flag divergence.
#[test]
fn test_check_flags_high_divergence_triggers_flag() {
    use crate::status::{AgentStatusSnapshot, ScoreEntry};
    let mut snapshot = AgentStatusSnapshot::new("a".into(), "m".into(), "p".into());
    for (stamp, round, score) in [("t1", 1, -2.0), ("t2", 2, 2.0)] {
        snapshot.push_score(ScoreEntry {
            timestamp: stamp.into(),
            job_id: "j".into(),
            round,
            evaluator: "e".into(),
            score,
        });
    }
    assert!(snapshot.score_std_dev.unwrap() > 1.5);
    assert!(
        snapshot.is_flagged,
        "agent should be flagged for high divergence"
    );
    let reason = snapshot.flag_reason.as_ref().unwrap();
    assert!(reason.contains("divergence"));
}
/// A flag raised by three low scores must be cleared once three good scores follow.
#[test]
fn test_check_flags_clears_when_conditions_no_longer_met() {
    use crate::status::{AgentStatusSnapshot, ScoreEntry};
    let mut snap = AgentStatusSnapshot::new("a".into(), "m".into(), "p".into());
    // Timestamps are t0..t5 and rounds are 1..6, as in a sequential run.
    let scored = |i: u32, score: f32| ScoreEntry {
        timestamp: format!("t{}", i),
        job_id: "j".into(),
        round: i + 1,
        evaluator: "e".into(),
        score,
    };
    for i in 0..3 {
        snap.push_score(scored(i, -0.5));
    }
    assert!(snap.is_flagged);
    for i in 3..6 {
        snap.push_score(scored(i, 0.8));
    }
    assert!(!snap.is_flagged, "flag should be cleared after good scores");
}
/// A run of steadily rising good scores (0.70 to 0.90) must leave the agent unflagged.
#[test]
fn test_check_flags_not_flagged_with_good_scores() {
    use crate::status::{AgentStatusSnapshot, ScoreEntry};
    let mut snap = AgentStatusSnapshot::new("a".into(), "m".into(), "p".into());
    for step in 0..5u32 {
        let score = 0.7 + (step as f32 * 0.05);
        snap.push_score(ScoreEntry {
            timestamp: format!("t{}", step),
            job_id: "j".into(),
            round: step + 1,
            evaluator: "e".into(),
            score,
        });
    }
    assert!(!snap.is_flagged, "good scores should not flag the agent");
    assert!(snap.flag_reason.is_none());
}
/// The low-score flag must not fire until at least three scores exist.
///
/// The previous version pushed two *good* scores (1.0), so the low-score
/// condition could never hold and the "fewer than 3" guard was never exercised.
/// This version pushes two identical low scores. Their std-dev is zero, so the
/// divergence check cannot fire either. The agent must stay unflagged at that
/// point, and a third identical low score must then raise the flag. That matches
/// `test_check_flags_low_score_triggers_flag`, where three -0.5 scores flag.
#[test]
fn test_check_flags_fewer_than_3_scores_no_low_score_flag() {
    use crate::status::{AgentStatusSnapshot, ScoreEntry};
    let mut snap = AgentStatusSnapshot::new("a".into(), "m".into(), "p".into());
    let low = |round: u32| ScoreEntry {
        timestamp: format!("t{}", round),
        job_id: "j".into(),
        round,
        evaluator: "e".into(),
        score: -0.5,
    };
    snap.push_score(low(1));
    snap.push_score(low(2));
    assert!(
        !snap.is_flagged,
        "fewer than 3 scores should not trigger low-score flag"
    );
    assert!(snap.flag_reason.is_none());
    // The third low score crosses the minimum-count guard.
    snap.push_score(low(3));
    assert!(
        snap.is_flagged,
        "a third low score should trigger the low-score flag"
    );
}
/// When recent scores are both low and widely spread, the flag reason must
/// report the low-score condition rather than the divergence condition.
///
/// The previous scores (-0.5, -0.4, -0.6) had a std-dev of about 0.08, so the
/// divergence condition never held and the priority rule was not tested. These
/// scores are all negative, and they spread far wider than the -2.0/2.0 pair that
/// `test_check_flags_high_divergence_triggers_flag` shows is enough to flag
/// divergence. Both conditions therefore hold at once.
#[test]
fn test_check_flags_low_score_takes_priority_over_high_divergence() {
    use crate::status::{AgentStatusSnapshot, ScoreEntry};
    let mut snap = AgentStatusSnapshot::new("a".into(), "m".into(), "p".into());
    for (timestamp, round, score) in [("t1", 1, -10.0), ("t2", 2, -1.0), ("t3", 3, -10.0)] {
        snap.push_score(ScoreEntry {
            timestamp: timestamp.into(),
            job_id: "j".into(),
            round,
            evaluator: "e".into(),
            score,
        });
    }
    // Precondition: the divergence condition must genuinely hold, otherwise this
    // test would not exercise the priority rule at all.
    assert!(
        snap.score_std_dev
            .expect("three scores should yield a std_dev")
            > 1.5,
        "scores should be spread enough to meet the divergence condition"
    );
    assert!(snap.is_flagged);
    assert!(
        snap.flag_reason.as_ref().unwrap().contains("Low scores"),
        "low score flag should take priority, got: {:?}",
        snap.flag_reason
    );
}
/// A single score sets the mean to that score but cannot produce a std-dev.
#[test]
fn test_push_score_single_score_no_std_dev() {
    use crate::status::{AgentStatusSnapshot, ScoreEntry};
    let mut snap = AgentStatusSnapshot::new("a".into(), "m".into(), "p".into());
    let only_entry = ScoreEntry {
        timestamp: "t".into(),
        job_id: "j".into(),
        round: 1,
        evaluator: "e".into(),
        score: 5.0,
    };
    snap.push_score(only_entry);
    assert_eq!(snap.mean_score, Some(5.0));
    assert!(
        snap.score_std_dev.is_none(),
        "single score should have no std_dev"
    );
}
/// Two identical scores give that value as the mean and a std-dev of zero.
#[test]
fn test_push_score_two_identical_scores_zero_std_dev() {
    use crate::status::{AgentStatusSnapshot, ScoreEntry};
    let mut snap = AgentStatusSnapshot::new("a".into(), "m".into(), "p".into());
    for (timestamp, round) in [("t1", 1), ("t2", 2)] {
        snap.push_score(ScoreEntry {
            timestamp: timestamp.into(),
            job_id: "j".into(),
            round,
            evaluator: "e".into(),
            score: 6.0,
        });
    }
    assert_eq!(snap.mean_score, Some(6.0));
    let spread = snap.score_std_dev.unwrap();
    assert!(spread.abs() < f32::EPSILON);
}
/// Pushing 55 scores keeps only the newest 50. The five oldest (rounds 1..=5)
/// are evicted, so the front entry is round 6.
#[test]
fn test_push_score_trims_beyond_max() {
    use crate::status::{AgentStatusSnapshot, ScoreEntry};
    let mut snap = AgentStatusSnapshot::new("a".into(), "m".into(), "p".into());
    for idx in 0..55u32 {
        snap.push_score(ScoreEntry {
            timestamp: format!("t{}", idx),
            job_id: "j".into(),
            round: idx + 1,
            evaluator: "e".into(),
            score: 5.0,
        });
    }
    assert_eq!(
        snap.recent_scores.len(),
        50,
        "should trim to MAX_RECENT_SCORES"
    );
    let oldest = snap.recent_scores.front().unwrap();
    assert_eq!(oldest.round, 6);
}
/// The `Debug` rendering of a `WorkerConfig` must include the type name, the
/// explicit constructor arguments, and the strings "nsed" and "sphera".
/// Presumably those last two are defaults filled in by `WorkerConfig::new`
/// (confirm against the constructor).
#[test]
fn test_worker_config_debug_output() {
    let config = WorkerConfig::new(
        "nats://localhost:4222".to_string(),
        "test_stream".to_string(),
        "test_consumer".to_string(),
    );
    let rendered = format!("{:?}", config);
    let expected_fragments = [
        "WorkerConfig",
        "nats://localhost:4222",
        "test_stream",
        "test_consumer",
        "nsed",
        "sphera",
    ];
    for fragment in expected_fragments {
        assert!(rendered.contains(fragment));
    }
}
/// The `Debug` rendering of a `JobManifest` must include its type name, job id,
/// and task description.
#[test]
fn test_job_manifest_debug_output() {
    let rendered = format!(
        "{:?}",
        JobManifest {
            job_id: "debug-test".into(),
            task_description: "test desc".into(),
            agents: vec!["a1".into()],
            rounds: 2,
            timestamp: 12345,
        }
    );
    for fragment in ["JobManifest", "debug-test", "test desc"] {
        assert!(rendered.contains(fragment));
    }
}
/// A `ConfigPatch` that sets every field must overwrite each corresponding
/// `AgentConfig` value.
#[test]
fn test_config_patch_apply_all_fields() {
    use crate::agents::AgentConfig;
    use crate::control_plane::ConfigPatch;
    let patch = ConfigPatch {
        temperature: Some(1.5),
        frequency_penalty: Some(0.5),
        presence_penalty: Some(-0.3),
        persona: Some("friendly helper".into()),
        textual_feedback: Some(true),
        max_react_iterations: Some(3),
        max_retries: Some(1),
    };
    let mut config = AgentConfig {
        name: "test".into(),
        provider_id: "p".into(),
        model_name: "m".into(),
        temperature: 0.7,
        ..Default::default()
    };
    patch.apply(&mut config).expect("valid patch");

    // Sampling parameters.
    assert_eq!(config.temperature, 1.5);
    assert_eq!(config.frequency_penalty, Some(0.5));
    assert_eq!(config.presence_penalty, Some(-0.3));
    // Behavioural knobs.
    assert_eq!(config.persona, Some("friendly helper".into()));
    assert!(config.textual_feedback);
    assert_eq!(config.max_react_iterations, Some(3));
    assert_eq!(config.max_retries, Some(1));
}
/// A default (all-`None`) patch must leave the config's existing values untouched.
#[test]
fn test_config_patch_apply_empty_patch_no_change() {
    use crate::agents::AgentConfig;
    use crate::control_plane::ConfigPatch;
    let initial_temperature = 0.7;
    let initial_frequency_penalty = Some(0.2);
    let mut config = AgentConfig {
        name: "test".into(),
        provider_id: "p".into(),
        model_name: "m".into(),
        temperature: initial_temperature,
        frequency_penalty: initial_frequency_penalty,
        ..Default::default()
    };
    ConfigPatch::default()
        .apply(&mut config)
        .expect("empty patch");
    assert_eq!(config.temperature, initial_temperature);
    assert_eq!(config.frequency_penalty, initial_frequency_penalty);
}
/// Deserializing a patch that contains an unrecognised key must fail.
#[test]
fn test_config_patch_rejects_unknown_fields() {
    use crate::control_plane::ConfigPatch;
    let outcome =
        serde_json::from_str::<ConfigPatch>(r#"{"temperature": 0.5, "unknown_field": true}"#);
    assert!(
        outcome.is_err(),
        "unknown fields should be rejected due to deny_unknown_fields"
    );
}
/// For an evaluation payload shaped `[[id, {..}, extra]]`, annotation injection
/// must mark the inner object as operator-edited, attach an annotation array,
/// and leave the tuple's other elements (length 3, trailing "extra-data") intact.
#[test]
fn test_inject_annotations_deeply_nested_eval() {
    use crate::agents::{AnnotationType, OperatorAnnotation};
    let payload = br#"[["agent-A", {"score": 5.0}, "extra-data"]]"#;
    let edit_note = OperatorAnnotation {
        annotation_type: AnnotationType::Edit,
        comment: "test".into(),
        timestamp: "t".into(),
        original_content_hash: None,
    };
    let entry = make_entry(payload, true, vec![edit_note]);
    let rewritten = NatsNsedWorker::inject_annotations(&entry);
    let root: serde_json::Value = serde_json::from_slice(&rewritten).unwrap();
    let outer = root.as_array().unwrap();
    let tuple = outer[0].as_array().unwrap();
    let eval_obj = &tuple[1];
    assert_eq!(eval_obj["edited_by"], "operator");
    assert!(eval_obj["operator_annotations"].is_array());
    assert_eq!(tuple.len(), 3);
    assert_eq!(tuple[2], "extra-data");
}
/// A mid-range score (0.5) with factor 3.0 is expected to roughly double the base hold.
#[test]
fn test_compute_adaptive_hold_medium_score() {
    let base = std::time::Duration::from_secs(10);
    let hold = buffer::compute_adaptive_hold(base, Some(0.5), 3.0);
    // Twice the 10 s base is exactly 20.0 s.
    let expected_secs = base.as_secs_f64() * 2.0;
    let deviation = (hold.as_secs_f64() - expected_secs).abs();
    assert!(
        deviation < 0.5,
        "score 0.5 should give ~2x base; got {:?}",
        hold
    );
}
/// The buffer's advertised response SLA must equal the hold duration it was built with.
#[tokio::test]
async fn test_buffer_response_sla_matches_hold() {
    for secs in [300, 5] {
        let hold = std::time::Duration::from_secs(secs);
        let buf = buffer::ResponseBuffer::new(hold);
        assert_eq!(buf.response_sla(), Some(hold));
    }
}
/// `AgentLiveStatus` serializes to lowercase JSON strings, and the busy form
/// round-trips back to `Busy`.
#[test]
fn test_agent_live_status_serialization() {
    let busy_json = serde_json::to_string(&AgentLiveStatus::Busy).unwrap();
    let idle_json = serde_json::to_string(&AgentLiveStatus::Idle).unwrap();
    assert_eq!(busy_json, "\"busy\"");
    assert_eq!(idle_json, "\"idle\"");
    let round_tripped: AgentLiveStatus = serde_json::from_str(&busy_json).unwrap();
    assert_eq!(round_tripped, AgentLiveStatus::Busy);
}
/// When every logged task has status "error", the error rate must be 1.0.
#[test]
fn test_error_rate_all_failures() {
    use crate::status::{AgentStatusSnapshot, TaskLogEntry};
    let mut snap = AgentStatusSnapshot::new("a".into(), "m".into(), "p".into());
    let failures = (0..5).map(|i| TaskLogEntry {
        timestamp: format!("t{}", i),
        action: "propose".into(),
        job_id: format!("j{}", i),
        round: 1,
        status: "error".into(),
        duration_ms: 10,
        content_preview: None,
    });
    for entry in failures {
        snap.push_task(entry);
    }
    assert!(
        (snap.error_rate - 1.0).abs() < f32::EPSILON,
        "all failures should give 100% error rate"
    );
}
/// When every logged task has status "ok", the error rate must be 0.0.
#[test]
fn test_error_rate_all_successes() {
    use crate::status::{AgentStatusSnapshot, TaskLogEntry};
    let mut snap = AgentStatusSnapshot::new("a".into(), "m".into(), "p".into());
    let successes = (0..5).map(|i| TaskLogEntry {
        timestamp: format!("t{}", i),
        action: "evaluate".into(),
        job_id: format!("j{}", i),
        round: 1,
        status: "ok".into(),
        duration_ms: 10,
        content_preview: None,
    });
    for entry in successes {
        snap.push_task(entry);
    }
    assert!(
        snap.error_rate.abs() < f32::EPSILON,
        "all successes should give 0% error rate"
    );
}
/// A stopped entry belonging to a different job is stale and must be drained,
/// even though its release time (one hour out) has not arrived.
#[tokio::test]
async fn test_drain_stale_drains_stopped_entries_from_other_jobs() {
    let buf = buffer::ResponseBuffer::new(std::time::Duration::from_secs(300));
    let now = std::time::Instant::now();
    let far_future = now + std::time::Duration::from_secs(3600);
    let stale_entry = buffer::BufferedResponse {
        id: "stopped-stale".into(),
        action: "propose".into(),
        job_id: "old-job".into(),
        round: 1,
        reply_subject: "s".into(),
        payload: b"{}".to_vec(),
        created_at: now,
        release_at: far_future,
        ack_handle: Box::new(TestAckHandle),
        msg_id: "m".into(),
        annotations: vec![],
        edited: false,
        stopped: true,
    };
    buf.push(stale_entry).await;
    let drained = buf.drain_stale("current-job").await;
    assert_eq!(drained.len(), 1, "stopped stale entries should be drained");
    assert_eq!(drained[0].id, "stopped-stale");
}
/// An entry whose release time has already passed must be listed with a
/// non-positive `release_in_ms` and must keep its `stopped` flag.
///
/// NOTE(review): the assertion accepts 0, while its message says "negative".
/// Consider tightening it to `< 0` once the computation in `buffer::list` is confirmed.
#[tokio::test]
async fn test_buffer_list_overdue_entry_negative_release() {
    let buf = buffer::ResponseBuffer::new(std::time::Duration::from_secs(300));
    let now = std::time::Instant::now();
    let overdue = buffer::BufferedResponse {
        id: "overdue-1".into(),
        action: "propose".into(),
        job_id: "j".into(),
        round: 1,
        reply_subject: "s".into(),
        payload: b"{}".to_vec(),
        created_at: now,
        release_at: now,
        ack_handle: Box::new(TestAckHandle),
        msg_id: "m".into(),
        annotations: vec![],
        edited: false,
        stopped: true,
    };
    buf.push(overdue).await;
    // Let the clock move past `release_at`.
    tokio::time::sleep(std::time::Duration::from_millis(5)).await;
    let listing = buf.list().await;
    assert_eq!(listing.len(), 1);
    let summary = &listing[0];
    assert!(
        summary.release_in_ms <= 0,
        "overdue entry should have negative release_in_ms, got {}",
        summary.release_in_ms
    );
    assert!(summary.stopped);
}
/// A serialized entry detail exposes the summary fields (id, action, job_id,
/// round, age_ms, release_in_ms, stopped) at top level. The buffered payload
/// appears under the `content` key.
#[tokio::test]
async fn test_buffer_entry_detail_serde_flatten() {
    let hold = std::time::Duration::from_secs(60);
    let buf = buffer::ResponseBuffer::new(hold);
    let body = serde_json::json!({"content": "test proposal"});
    let created = std::time::Instant::now();
    buf.push(buffer::BufferedResponse {
        id: "serde-1".into(),
        action: "propose".into(),
        job_id: "job-abc".into(),
        round: 2,
        reply_subject: "s".into(),
        payload: serde_json::to_vec(&body).unwrap(),
        created_at: created,
        release_at: created + hold,
        ack_handle: Box::new(TestAckHandle),
        msg_id: "msg-serde".into(),
        annotations: vec![],
        edited: false,
        stopped: false,
    })
    .await;
    let detail = buf.get_detail("serde-1").await.unwrap();
    let json = serde_json::to_value(&detail).unwrap();

    // Summary fields sit at the top level.
    assert_eq!(json["id"], "serde-1");
    assert_eq!(json["action"], "propose");
    assert_eq!(json["job_id"], "job-abc");
    assert_eq!(json["round"], 2);
    assert!(json["age_ms"].is_number());
    assert!(json["release_in_ms"].is_number());
    assert_eq!(json["stopped"], false);
    // The original payload is nested under "content".
    assert_eq!(json["content"]["content"], "test proposal");
}
/// Updating a payload for an id that is not buffered must return `false` and
/// leave the existing entry in place.
#[tokio::test]
async fn test_update_payload_with_annotation_unknown_id() {
    use crate::agents::{AnnotationType, OperatorAnnotation};
    let hold = std::time::Duration::from_secs(30);
    let buf = buffer::ResponseBuffer::new(hold);
    let now = std::time::Instant::now();
    buf.push(buffer::BufferedResponse {
        id: "exists".into(),
        action: "propose".into(),
        job_id: "j".into(),
        round: 1,
        reply_subject: "s".into(),
        payload: b"{}".to_vec(),
        created_at: now,
        release_at: now + hold,
        ack_handle: Box::new(TestAckHandle),
        msg_id: "m".into(),
        annotations: vec![],
        edited: false,
        stopped: false,
    })
    .await;
    let edit = OperatorAnnotation {
        annotation_type: AnnotationType::Edit,
        comment: "edit".into(),
        timestamp: "t".into(),
        original_content_hash: None,
    };
    let updated = buf
        .update_payload_with_annotation("nonexistent", b"new payload".to_vec(), edit)
        .await;
    assert!(!updated, "should return false for unknown ID");
    assert_eq!(buf.len().await, 1, "existing entry should be unaffected");
}
/// Inline credentials alone are enough for `NatsAuth` to count as configured.
#[test]
fn test_nats_auth_inline_creds_only_is_configured() {
    use crate::nats_utils::NatsAuth;
    let configured = NatsAuth {
        inline_creds: Some("creds-data".into()),
        ..NatsAuth::default()
    }
    .is_configured();
    assert!(configured);
}
/// A credentials file path alone is enough for `NatsAuth` to count as configured.
#[test]
fn test_nats_auth_creds_file_only_is_configured() {
    use crate::nats_utils::NatsAuth;
    let configured = NatsAuth {
        creds_file: Some("/path/to/creds".into()),
        ..NatsAuth::default()
    }
    .is_configured();
    assert!(configured);
}
/// A `Proposal` must survive a JSON serialize/deserialize round trip with its
/// content, thought process, and scratchpad intact.
#[test]
fn test_proposal_serde_roundtrip() {
    use crate::agents::Proposal;
    let original = Proposal {
        content: "My proposal content".into(),
        thought_process: "I considered alternatives".into(),
        final_scratchpad: Some("notes".into()),
        ..Default::default()
    };
    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: Proposal = serde_json::from_str(&encoded).unwrap();
    assert_eq!(decoded.content, "My proposal content");
    assert_eq!(decoded.thought_process, "I considered alternatives");
    assert_eq!(decoded.final_scratchpad, Some("notes".into()));
}
/// An `Evaluation` must survive a JSON round trip with score, justification,
/// and stance intact.
#[test]
fn test_evaluation_serde_roundtrip() {
    use crate::agents::{Evaluation, Stance};
    let original = Evaluation {
        score: 7.5,
        justification: "Well-reasoned".into(),
        stance: Some(Stance::Agree),
        ..Default::default()
    };
    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: Evaluation = serde_json::from_str(&encoded).unwrap();
    assert!((decoded.score - 7.5).abs() < f32::EPSILON);
    assert_eq!(decoded.justification, "Well-reasoned");
    assert_eq!(decoded.stance, Some(Stance::Agree));
}
/// An empty action name yields no content preview, even for a valid payload.
#[test]
fn test_extract_preview_empty_action_string_returns_none() {
    let bytes = serde_json::to_vec(&serde_json::json!({"content": "test"})).unwrap();
    let preview = extract_content_preview(&bytes, "", &[]);
    assert!(preview.is_none(), "empty action string should return None");
}
/// Test helper that keeps only the evaluations whose target id names one of
/// `candidates`, preserving their original order.
///
/// NOTE(review): the `test_filter_*` cases exercise this local helper, not
/// production code. Presumably it mirrors the worker's evaluation filtering;
/// confirm the two stay in sync.
fn filter_evaluations(
    candidates: &[crate::agents::CandidateProposal],
    evaluations: Vec<(String, crate::agents::Evaluation)>,
) -> Vec<(String, crate::agents::Evaluation)> {
    let known_ids: std::collections::HashSet<&str> = candidates
        .iter()
        .map(|candidate| candidate.id.as_str())
        .collect();
    let mut kept = evaluations;
    kept.retain(|(target_id, _)| known_ids.contains(target_id.as_str()));
    kept
}
/// Builds a candidate with the given id and a default (empty) proposal.
fn make_candidate(id: &str) -> crate::agents::CandidateProposal {
    let proposal = crate::agents::Proposal::default();
    crate::agents::CandidateProposal {
        proposal,
        id: id.to_owned(),
    }
}
/// Builds a `(target_id, evaluation)` pair whose evaluation carries only `score`;
/// all other fields are defaulted.
fn make_eval(target: &str, score: f32) -> (String, crate::agents::Evaluation) {
    let evaluation = crate::agents::Evaluation {
        score,
        ..Default::default()
    };
    (target.to_owned(), evaluation)
}
/// Evaluations targeting unknown ids ("...", "UNKNOWN_X") are dropped. Valid
/// ones keep their order, and the filtered list still round-trips through JSON.
#[test]
fn test_filter_mixed_valid_and_hallucinated() {
    let candidates = vec![
        make_candidate("Candidate_A"),
        make_candidate("Candidate_B"),
        make_candidate("Candidate_C"),
    ];
    let evaluations = vec![
        make_eval("Candidate_A", 0.8),
        make_eval("...", 0.5),
        make_eval("Candidate_B", 0.6),
        make_eval("UNKNOWN_X", 0.9),
        make_eval("Candidate_C", 0.7),
    ];
    let filtered = filter_evaluations(&candidates, evaluations);
    let kept_ids: Vec<&str> = filtered.iter().map(|(id, _)| id.as_str()).collect();
    assert_eq!(kept_ids, ["Candidate_A", "Candidate_B", "Candidate_C"]);

    let wire = serde_json::to_vec(&filtered).unwrap();
    let decoded: Vec<(String, crate::agents::Evaluation)> =
        serde_json::from_slice(&wire).unwrap();
    assert_eq!(decoded.len(), 3);
}
/// When no evaluation targets a real candidate, the result is empty and
/// serializes as `[]`.
#[test]
fn test_filter_all_invalid_returns_empty() {
    let candidates = [make_candidate("Candidate_A"), make_candidate("Candidate_B")];
    let bogus = vec![
        make_eval("...", 0.5),
        make_eval("HALLUCINATED", 0.9),
        make_eval("", 0.1),
    ];
    let filtered = filter_evaluations(&candidates, bogus);
    assert!(filtered.is_empty());
    assert_eq!(serde_json::to_vec(&filtered).unwrap(), b"[]");
}
/// Evaluations that all target known candidates pass through unchanged in count.
#[test]
fn test_filter_all_valid_passes_through() {
    let candidates = [make_candidate("Candidate_A"), make_candidate("Candidate_B")];
    let evaluations = vec![make_eval("Candidate_A", 0.8), make_eval("Candidate_B", 0.6)];
    assert_eq!(filter_evaluations(&candidates, evaluations).len(), 2);
}
/// Filtering an empty evaluation list yields an empty list.
#[test]
fn test_filter_empty_evaluations() {
    let candidates = [make_candidate("Candidate_A")];
    assert!(filter_evaluations(&candidates, Vec::new()).is_empty());
}
/// Every `BufferEntrySummary` field serializes under its own name. This
/// includes a negative `release_in_ms` for overdue entries.
#[test]
fn test_buffer_entry_summary_serialization() {
    let json = serde_json::to_value(buffer::BufferEntrySummary {
        id: "sum-1".into(),
        action: "propose".into(),
        job_id: "job-xyz".into(),
        round: 3,
        age_ms: 5000,
        release_in_ms: -200,
        stopped: true,
    })
    .unwrap();
    // Identity fields.
    assert_eq!(json["id"], "sum-1");
    assert_eq!(json["action"], "propose");
    assert_eq!(json["job_id"], "job-xyz");
    assert_eq!(json["round"], 3);
    // Timing and state fields.
    assert_eq!(json["age_ms"], 5000);
    assert_eq!(json["release_in_ms"], -200);
    assert_eq!(json["stopped"], true);
}
/// Each known network or transport failure message is classified as transient.
#[test]
fn test_is_transient_error_matches_known_patterns() {
    let cases: &[&str] = &[
        "broken pipe",
        "Connection reset by peer",
        "os error 32",
        "os error 104",
        "operation timed out",
        "connection closed before message completed",
        "unexpected eof during handshake",
        "stream closed",
        "connection refused",
        "network unreachable",
        "connection aborted",
    ];
    for &msg in cases {
        let err = anyhow::anyhow!("{msg}");
        assert!(is_transient_error(&err), "expected transient for: {msg}");
    }
}
/// Transient-error matching ignores letter case.
#[test]
fn test_is_transient_error_case_insensitive() {
    for text in ["BROKEN PIPE in TLS layer", "Connection Reset By Peer"] {
        let err = anyhow::anyhow!(text);
        assert!(is_transient_error(&err));
    }
}
/// Auth, lookup, rate-limit, parse and empty messages are not transient.
#[test]
fn test_is_transient_error_rejects_non_transient() {
    let cases: &[&str] = &[
        "invalid API key",
        "401 Unauthorized",
        "model not found",
        "rate limit exceeded",
        "JSON parse error",
        "",
    ];
    for &msg in cases {
        let err = anyhow::anyhow!("{msg}");
        assert!(
            !is_transient_error(&err),
            "expected non-transient for: {msg}"
        );
    }
}
/// A transient pattern buried inside a longer message is still detected.
#[test]
fn test_is_transient_error_embedded_in_message() {
    let wrapped = anyhow::anyhow!("sending proposal failed: broken pipe (os error 32)");
    assert!(is_transient_error(&wrapped));
}
/// A structured-output parse failure is classified as "parse_error".
#[test]
fn classify_parse_error() {
    let message = "Failed to parse structured output after 4 attempts. Last error: missing field `evaluations`";
    assert_eq!(classify_abstention_reason(message), "parse_error");
}
/// Both the "iteration budget" and "max_iterations" phrasings classify as
/// "iter_budget_exhausted".
#[test]
fn classify_iter_budget() {
    let inputs = [
        "agent loop exhausted iteration budget",
        "hit max_iterations cap",
    ];
    for input in inputs {
        assert_eq!(classify_abstention_reason(input), "iter_budget_exhausted");
    }
}
/// An upstream "timed out" message classifies as "timeout".
#[test]
fn classify_timeout() {
    let reason = classify_abstention_reason("upstream timed out after 60s");
    assert_eq!(reason, "timeout");
}
/// A failed tool invocation classifies as "tool_error".
#[test]
fn classify_tool_error() {
    let reason = classify_abstention_reason("tool 'user_grep_repo' failed: out of sandbox");
    assert_eq!(reason, "tool_error");
}
/// An unrecognised message falls back to the generic "error" bucket.
#[test]
fn classify_fallback() {
    let reason = classify_abstention_reason("kaboom");
    assert_eq!(reason, "error");
}
/// Pins the exact subject layout: `{prefix}.{session}.result.{round}.{agent}.{action}.failed`.
#[test]
fn failed_subject_format_pins_exact_wire_shape() {
    let subject = failed_result_subject("nsed", "sess-abc", 3, "ReviewerAlpha", "evaluate");
    assert_eq!(subject, "nsed.sess-abc.result.3.ReviewerAlpha.evaluate.failed");
}
/// Round 0 is rendered literally as `0` in the subject.
#[test]
fn failed_subject_round_zero_renders() {
    let subject = failed_result_subject("nsed", "x", 0, "A", "propose");
    assert_eq!(subject, "nsed.x.result.0.A.propose.failed");
}
/// A non-default prefix leads the subject, while the agent/action/failed tail is unchanged.
#[test]
fn failed_subject_custom_prefix_propagates() {
    let subject = failed_result_subject("tenantX", "sess", 1, "A", "propose");
    let has_prefix = subject.starts_with("tenantX.");
    let has_suffix = subject.ends_with(".A.propose.failed");
    assert!(has_prefix);
    assert!(has_suffix);
}
/// A non-payment failure of a `propose` action publishes a `.failed` marker.
#[test]
fn failure_marker_emitted_for_propose() {
    let publish = should_publish_failure_marker("propose", false);
    assert!(publish);
}
/// A non-payment failure of an `evaluate` action publishes a `.failed` marker.
#[test]
fn failure_marker_emitted_for_evaluate() {
    let publish = should_publish_failure_marker("evaluate", false);
    assert!(publish);
}
/// Actions other than propose/evaluate (including the empty string) never
/// publish a `.failed` marker.
#[test]
fn failure_marker_skipped_for_other_actions() {
    for &action in &["passthrough", "heartbeat", "unknown", ""] {
        assert!(
            !should_publish_failure_marker(action, false),
            "action {action:?} should not trigger a .failed marker"
        );
    }
}
/// Payment errors suppress the `.failed` marker, even for propose/evaluate.
#[test]
fn failure_marker_skipped_for_payment_errors() {
    for action in ["propose", "evaluate"] {
        assert!(!should_publish_failure_marker(action, true));
    }
}
}
pub mod nsed_worker;
pub use nsed_worker::{NatsNsedWorkerExt, NatsNsedWorkerStatusExt};