use crate::agents::OperatorAnnotation;
use serde::Serialize;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use utoipa::ToSchema;
/// Abstraction over acknowledging the message that produced a buffered response.
///
/// Implementations visible in this file: [`NatsAckHandle`] (delegates to a
/// JetStream message) and [`PreAckedHandle`] (no-op).
#[async_trait::async_trait]
pub trait AckHandle: Send + Sync {
/// Acknowledges the underlying message; returns an error if that fails.
async fn ack(&self) -> anyhow::Result<()>;
}
/// [`AckHandle`] backed by a NATS JetStream message.
pub struct NatsAckHandle(pub async_nats::jetstream::Message);

#[async_trait::async_trait]
impl AckHandle for NatsAckHandle {
    /// Acks the wrapped JetStream message, converting any failure into an
    /// `anyhow` error whose text starts with `ack failed:`.
    async fn ack(&self) -> anyhow::Result<()> {
        match self.0.ack().await {
            Ok(()) => Ok(()),
            Err(cause) => Err(anyhow::anyhow!("ack failed: {}", cause)),
        }
    }
}
/// [`AckHandle`] whose `ack` does nothing and always succeeds.
///
/// NOTE(review): presumably used for messages that were already acknowledged
/// before being buffered — confirm against the caller.
pub struct PreAckedHandle;
#[async_trait::async_trait]
impl AckHandle for PreAckedHandle {
/// No-op: returns `Ok(())` without performing any work.
async fn ack(&self) -> anyhow::Result<()> {
Ok(()) }
}
/// A response held in a [`ResponseBuffer`] until it is drained or removed.
pub struct BufferedResponse {
/// Identifier used by the per-entry lookups (`release`, `stop`, `get_detail`, ...).
pub id: String,
/// Action label, copied verbatim into summaries.
pub action: String,
/// Job the response belongs to; `drain_stale` removes entries whose job id
/// differs from the one it is given.
pub job_id: String,
/// Round number, copied verbatim into summaries.
pub round: u32,
/// NOTE(review): presumably the subject the payload is published to once
/// released — it is not read anywhere in this file; confirm against the caller.
pub reply_subject: String,
/// Raw response bytes; `get_detail` tries to parse them as JSON.
pub payload: Vec<u8>,
/// Creation time, used to compute `age_ms` in summaries.
pub created_at: Instant,
/// Earliest instant at which `drain_ready` hands the entry out.
pub release_at: Instant,
/// Handle for acknowledging the originating message. `ResponseBuffer` itself
/// never calls it in the code visible here.
pub ack_handle: Box<dyn AckHandle>,
/// NOTE(review): presumably a message / de-duplication identifier — not read
/// in this file; confirm against the caller.
pub msg_id: String,
/// Operator annotations appended by `add_comment` and
/// `update_payload_with_annotation`.
pub annotations: Vec<OperatorAnnotation>,
/// Set to `true` by `update_payload_with_annotation`.
pub edited: bool,
/// While `true`, `drain_ready` and `auto_release_if_eligible` skip the entry.
pub stopped: bool,
}
/// Serializable snapshot of a buffered entry (without its payload).
#[derive(Debug, Clone, Serialize, ToSchema)]
pub struct BufferEntrySummary {
/// Entry identifier.
pub id: String,
/// Action label of the entry.
pub action: String,
/// Job id of the entry.
pub job_id: String,
/// Round number of the entry.
pub round: u32,
/// Milliseconds elapsed since the entry's `created_at` when the snapshot was taken.
pub age_ms: u64,
/// Milliseconds until `release_at`; zero or negative once the release time
/// has passed (the magnitude is how long it has been overdue).
pub release_in_ms: i64,
/// Whether the entry is currently stopped.
pub stopped: bool,
}
/// A [`BufferEntrySummary`] plus the entry's payload decoded as JSON.
#[derive(Debug, Clone, Serialize, ToSchema)]
pub struct BufferEntryDetail {
/// Summary fields; flattened into the same JSON object when serialized.
#[serde(flatten)]
#[schema(inline)]
pub summary: BufferEntrySummary,
/// Payload parsed as JSON; `get_detail` stores `Null` when the payload is not valid JSON.
pub content: serde_json::Value,
}
/// Queue of held responses plus the settings that decide when they are released.
pub struct ResponseBuffer {
/// Buffered entries in insertion order, behind an async read/write lock.
pending: RwLock<VecDeque<BufferedResponse>>,
/// Hold duration given at construction, in milliseconds; never modified afterwards.
base_hold_duration_ms: u64,
/// Current hold duration in milliseconds (see `hold_duration` / `set_hold_duration`).
/// NOTE(review): push/drain in this file do not read it; presumably consumed by callers.
hold_duration_ms: AtomicU64,
/// While `true`, `drain_ready` returns nothing.
paused: AtomicBool,
/// Response SLA in milliseconds; `0` means "no SLA".
response_sla_ms: AtomicU64,
/// Auto-approve switch; starts as `true`.
auto_approve: AtomicBool,
/// Auto-approve divergence threshold multiplied by 1000; starts at 1000 (i.e. 1.0).
auto_approve_threshold_milli: AtomicU64,
}
impl ResponseBuffer {
/// Creates an empty, un-paused buffer.
///
/// `hold_duration` seeds three values: the immutable base hold duration, the
/// adjustable current hold duration, and the response SLA. Auto-approve starts
/// enabled with a threshold of `1.0` (stored as 1000 milli-units).
///
/// Durations longer than `u64::MAX` milliseconds saturate to `u64::MAX`.
pub fn new(hold_duration: Duration) -> Self {
    // `as_millis()` yields a `u128`; a bare `as u64` would silently wrap, so
    // clamp first and only then narrow.
    let ms = hold_duration.as_millis().min(u64::MAX as u128) as u64;
    Self {
        pending: RwLock::new(VecDeque::new()),
        base_hold_duration_ms: ms,
        hold_duration_ms: AtomicU64::new(ms),
        paused: AtomicBool::new(false),
        response_sla_ms: AtomicU64::new(ms),
        auto_approve: AtomicBool::new(true),
        auto_approve_threshold_milli: AtomicU64::new(1000),
    }
}
/// Returns the current (adjustable) hold duration.
pub fn hold_duration(&self) -> Duration {
    let millis = self.hold_duration_ms.load(Ordering::Relaxed);
    Duration::from_millis(millis)
}
/// Returns the hold duration the buffer was constructed with.
pub fn base_hold_duration(&self) -> Duration {
    let millis = self.base_hold_duration_ms;
    Duration::from_millis(millis)
}
/// Replaces the current hold duration; the base hold duration is untouched.
///
/// Durations longer than `u64::MAX` milliseconds saturate to `u64::MAX`.
pub fn set_hold_duration(&self, duration: Duration) {
    // Clamp before narrowing: `as_millis()` is a `u128` and `as u64` would wrap.
    let millis = duration.as_millis().min(u64::MAX as u128) as u64;
    self.hold_duration_ms.store(millis, Ordering::Relaxed);
}
/// Sets the response SLA. A zero duration disables the SLA
/// (`response_sla` then returns `None`).
///
/// Durations longer than `u64::MAX` milliseconds saturate to `u64::MAX`.
pub fn set_response_sla(&self, sla: Duration) {
    // Clamp before narrowing: `as_millis()` is a `u128` and `as u64` would wrap.
    let millis = sla.as_millis().min(u64::MAX as u128) as u64;
    self.response_sla_ms.store(millis, Ordering::Relaxed);
}
/// Returns the configured response SLA, or `None` when the stored value is zero.
pub fn response_sla(&self) -> Option<Duration> {
    match self.response_sla_ms.load(Ordering::Relaxed) {
        0 => None,
        millis => Some(Duration::from_millis(millis)),
    }
}
/// Appends `entry` to the back of the queue, keeping its `release_at` as given.
pub async fn push(&self, entry: BufferedResponse) {
    let mut queue = self.pending.write().await;
    queue.push_back(entry);
}
/// Appends `entry`, overwriting its `release_at` according to the current settings:
///
/// * auto-approve enabled: release immediately (`now`);
/// * otherwise, SLA configured: release at `task_received + SLA`, but never
///   earlier than `now`;
/// * otherwise (SLA is zero): release immediately.
pub async fn push_with_deadline(&self, mut entry: BufferedResponse, task_received: Instant) {
    let release_at = if self.auto_approve.load(Ordering::Relaxed) {
        Instant::now()
    } else {
        match self.response_sla_ms.load(Ordering::Relaxed) {
            0 => Instant::now(),
            sla_ms => {
                let deadline = task_received + Duration::from_millis(sla_ms);
                // A deadline already in the past is pulled forward to "now".
                deadline.max(Instant::now())
            }
        }
    };
    entry.release_at = release_at;
    self.pending.write().await.push_back(entry);
}
/// Removes and returns every entry whose `release_at` has been reached and
/// which is not stopped. Returns an empty vector while the buffer is paused.
/// Entries that stay behind keep their relative order.
pub async fn drain_ready(&self) -> Vec<BufferedResponse> {
    if self.paused.load(Ordering::Relaxed) {
        return Vec::new();
    }
    let cutoff = Instant::now();
    let mut queue = self.pending.write().await;
    // Split the queue in one pass: ready entries leave, the rest are put back.
    let (ready, held): (Vec<_>, Vec<_>) = std::mem::take(&mut *queue)
        .into_iter()
        .partition(|entry| !entry.stopped && entry.release_at <= cutoff);
    *queue = VecDeque::from(held);
    ready
}
/// Returns a summary of every buffered entry, in queue order.
///
/// `release_in_ms` is positive while the entry is still being held and zero
/// or negative once its release time has passed.
pub async fn list(&self) -> Vec<BufferEntrySummary> {
    let now = Instant::now();
    let queue = self.pending.read().await;
    let mut summaries = Vec::with_capacity(queue.len());
    for entry in queue.iter() {
        let age_ms = now.duration_since(entry.created_at).as_millis() as u64;
        let release_in_ms = if entry.release_at > now {
            entry.release_at.duration_since(now).as_millis() as i64
        } else {
            -(now.duration_since(entry.release_at).as_millis() as i64)
        };
        summaries.push(BufferEntrySummary {
            id: entry.id.clone(),
            action: entry.action.clone(),
            job_id: entry.job_id.clone(),
            round: entry.round,
            age_ms,
            release_in_ms,
            stopped: entry.stopped,
        });
    }
    summaries
}
/// Removes the first entry whose id equals `id` and returns it, regardless of
/// its release time, stopped flag, or the paused state. Returns `None` when
/// no entry matches.
pub async fn release(&self, id: &str) -> Option<BufferedResponse> {
    let mut queue = self.pending.write().await;
    let index = queue.iter().position(|candidate| candidate.id == id)?;
    queue.remove(index)
}
/// Removes and returns the entry with the given id. At the buffer level this
/// is identical to `release`; what happens to the returned entry is up to the caller.
pub async fn reject(&self, id: &str) -> Option<BufferedResponse> {
self.release(id).await
}
/// Removes and returns every entry whose `job_id` differs from
/// `current_job_id`. Entries of the current job stay queued in their
/// original order.
pub async fn drain_stale(&self, current_job_id: &str) -> Vec<BufferedResponse> {
    let mut queue = self.pending.write().await;
    let mut kept = VecDeque::with_capacity(queue.len());
    let mut stale = Vec::new();
    while let Some(entry) = queue.pop_front() {
        if entry.job_id == current_job_id {
            kept.push_back(entry);
        } else {
            stale.push(entry);
        }
    }
    *queue = kept;
    stale
}
/// Pauses draining: while paused, `drain_ready` returns an empty vector.
pub fn pause(&self) {
self.paused.store(true, Ordering::Relaxed);
}
/// Clears the paused flag so `drain_ready` hands out ready entries again.
pub fn resume(&self) {
self.paused.store(false, Ordering::Relaxed);
}
/// Returns whether the buffer is currently paused.
pub fn is_paused(&self) -> bool {
self.paused.load(Ordering::Relaxed)
}
/// Enables or disables auto-approve. When enabled, `push_with_deadline`
/// releases entries immediately and `auto_release_if_eligible` is active.
pub fn set_auto_approve(&self, enabled: bool) {
self.auto_approve.store(enabled, Ordering::Relaxed);
}
/// Returns whether auto-approve is currently enabled.
pub fn is_auto_approve(&self) -> bool {
self.auto_approve.load(Ordering::Relaxed)
}
/// Sets the divergence threshold used by `auto_release_if_eligible`.
///
/// `threshold` is clamped to `[0.0, 1.0]` and stored with three decimal
/// places of precision (as an integer number of thousandths). A NaN
/// threshold is stored as `0`.
pub fn set_auto_approve_threshold(&self, threshold: f32) {
    let clamped = threshold.clamp(0.0, 1.0);
    // Round to the nearest thousandth. A bare `as u64` truncates, so a value
    // such as 0.251 (slightly below 251.0 after the f32 multiply) would be
    // stored as 250 and read back as 0.25.
    let milli = (clamped * 1000.0).round() as u64;
    self.auto_approve_threshold_milli
        .store(milli, Ordering::Relaxed);
}
/// Returns the auto-approve divergence threshold as a fraction
/// (the stored thousandths divided by 1000).
pub fn auto_approve_threshold(&self) -> f32 {
    let milli = self.auto_approve_threshold_milli.load(Ordering::Relaxed);
    milli as f32 / 1000.0
}
/// Pulls the release time of every held, non-stopped entry forward to "now"
/// and returns how many entries were changed.
///
/// Nothing happens (and `0` is returned) when auto-approve is disabled, or
/// when `divergence` is `Some` and strictly greater than the configured
/// threshold. A `None` divergence does not block the release.
pub async fn auto_release_if_eligible(&self, divergence: Option<f32>) -> usize {
    if !self.auto_approve.load(Ordering::Relaxed) {
        return 0;
    }
    if matches!(divergence, Some(div) if div > self.auto_approve_threshold()) {
        return 0;
    }
    let now = Instant::now();
    let mut queue = self.pending.write().await;
    let mut released = 0;
    let still_held = queue
        .iter_mut()
        .filter(|entry| !entry.stopped && entry.release_at > now);
    for entry in still_held {
        entry.release_at = now;
        released += 1;
    }
    released
}
/// Returns the number of buffered entries (stopped ones included).
pub async fn len(&self) -> usize {
self.pending.read().await.len()
}
/// Returns `true` when no entries are buffered.
pub async fn is_empty(&self) -> bool {
self.pending.read().await.is_empty()
}
/// Returns the summary and JSON-decoded payload of the entry with the given
/// id, or `None` if no entry matches.
///
/// A payload that is not valid JSON yields `content == Value::Null`.
pub async fn get_detail(&self, id: &str) -> Option<BufferEntryDetail> {
    let now = Instant::now();
    let queue = self.pending.read().await;
    let entry = queue.iter().find(|candidate| candidate.id == id)?;

    let age_ms = now.duration_since(entry.created_at).as_millis() as u64;
    let release_in_ms = if entry.release_at > now {
        entry.release_at.duration_since(now).as_millis() as i64
    } else {
        -(now.duration_since(entry.release_at).as_millis() as i64)
    };
    let content = match serde_json::from_slice::<serde_json::Value>(&entry.payload) {
        Ok(value) => value,
        Err(_) => serde_json::Value::Null,
    };

    Some(BufferEntryDetail {
        summary: BufferEntrySummary {
            id: entry.id.clone(),
            action: entry.action.clone(),
            job_id: entry.job_id.clone(),
            round: entry.round,
            age_ms,
            release_in_ms,
            stopped: entry.stopped,
        },
        content,
    })
}
/// Replaces the payload of the entry with the given id. Returns `false` when
/// no entry matches. The `edited` flag and annotations are left untouched.
pub async fn update_payload(&self, id: &str, new_payload: Vec<u8>) -> bool {
    let mut queue = self.pending.write().await;
    match queue.iter_mut().find(|candidate| candidate.id == id) {
        Some(target) => {
            target.payload = new_payload;
            true
        }
        None => false,
    }
}
/// Appends `annotation` to the entry with the given id. Returns `false` when
/// no entry matches.
pub async fn add_comment(&self, id: &str, annotation: OperatorAnnotation) -> bool {
    let mut queue = self.pending.write().await;
    let Some(target) = queue.iter_mut().find(|candidate| candidate.id == id) else {
        return false;
    };
    target.annotations.push(annotation);
    true
}
/// Sets the entry's release time to "now" so the next `drain_ready` can pick
/// it up. The stopped flag is left as is, so a stopped entry stays held.
/// Returns `false` when no entry matches.
pub async fn mark_for_release(&self, id: &str) -> bool {
    let mut queue = self.pending.write().await;
    match queue.iter_mut().find(|candidate| candidate.id == id) {
        Some(target) => {
            target.release_at = Instant::now();
            true
        }
        None => false,
    }
}
/// Clears the stopped flag and sets the release time to "now" under a single
/// write lock, so the next `drain_ready` can pick the entry up.
/// Returns `false` when no entry matches.
pub async fn force_release(&self, id: &str) -> bool {
    let mut queue = self.pending.write().await;
    let Some(target) = queue.iter_mut().find(|candidate| candidate.id == id) else {
        return false;
    };
    target.stopped = false;
    target.release_at = Instant::now();
    true
}
/// Marks the entry as stopped so `drain_ready` skips it.
/// Returns `false` when no entry matches.
pub async fn stop(&self, id: &str) -> bool {
    let mut queue = self.pending.write().await;
    match queue.iter_mut().find(|candidate| candidate.id == id) {
        Some(target) => {
            target.stopped = true;
            true
        }
        None => false,
    }
}
/// Clears the stopped flag; the release time is not changed.
/// Returns `false` when no entry matches.
pub async fn unstop(&self, id: &str) -> bool {
    let mut queue = self.pending.write().await;
    let Some(target) = queue.iter_mut().find(|candidate| candidate.id == id) else {
        return false;
    };
    target.stopped = false;
    true
}
/// Replaces the entry's payload, marks it as edited and appends `annotation`,
/// all under one write lock. Returns `false` when no entry matches.
pub async fn update_payload_with_annotation(
    &self,
    id: &str,
    new_payload: Vec<u8>,
    annotation: OperatorAnnotation,
) -> bool {
    let mut queue = self.pending.write().await;
    let Some(target) = queue.iter_mut().find(|candidate| candidate.id == id) else {
        return false;
    };
    target.payload = new_payload;
    target.edited = true;
    target.annotations.push(annotation);
    true
}
}
/// Scales `base` by a multiplier derived from `mean_score`.
///
/// The score is squashed into `[0, 1]` by `soft_normalize_positive`
/// (higher score gives a value closer to 1) and the multiplier is
/// `1 + (1 - positive) * amplification`, so lower scores hold longer.
///
/// Returns `base` unchanged when `mean_score` is `None`, and also when the
/// scaled value cannot be represented as a `Duration` (NaN or infinite score,
/// NaN amplification, a negative product, or overflow). This function never panics.
pub fn compute_adaptive_hold(
    base: Duration,
    mean_score: Option<f32>,
    amplification: f32,
) -> Duration {
    let Some(score) = mean_score else {
        return base;
    };
    let positive = soft_normalize_positive(score);
    let multiplier = 1.0 + (1.0 - positive) * amplification;
    let scaled_secs = base.as_secs_f64() * f64::from(multiplier);
    // `Duration::from_secs_f64` panics on NaN, negative or overflowing input;
    // the fallible constructor lets bad scores degrade to the base hold instead.
    Duration::try_from_secs_f64(scaled_secs).unwrap_or(base)
}

/// Maps any finite score into `[0, 1]`: `x / (1 + |x|)` squashes it into
/// `(-1, 1)`, which is then shifted and halved. `0.0` maps to `0.5`.
/// NaN and infinite inputs produce NaN.
fn soft_normalize_positive(score: f32) -> f32 {
    let soft = score / (1.0 + score.abs());
    ((soft + 1.0) / 2.0).clamp(0.0, 1.0)
}
pub fn compute_divergence(mean_score: Option<f32>, score_std_dev: Option<f32>) -> Option<f32> {
let score_div = mean_score.map(|s| 1.0 - soft_normalize_positive(s));
let std_div = score_std_dev.map(|sd| sd.clamp(0.0, 1.0));
match (score_div, std_div) {
(Some(a), Some(b)) => Some(a.max(b)),
(Some(a), None) => Some(a),
(None, Some(b)) => Some(b),
(None, None) => None,
}
}
#[cfg(test)]
mod tests {
use super::*;
/// Test-only [`AckHandle`] whose `ack` always succeeds without doing anything.
struct NoopAckHandle;
#[async_trait::async_trait]
impl AckHandle for NoopAckHandle {
async fn ack(&self) -> anyhow::Result<()> {
Ok(())
}
}
/// Builds a round-1 entry with an empty JSON object payload that becomes
/// releasable `hold` after its creation time.
fn make_entry(id: &str, action: &str, job_id: &str, hold: Duration) -> BufferedResponse {
    let created_at = Instant::now();
    let reply_subject = format!("nsed.{}.result.1.agent.{}", job_id, action);
    let msg_id = format!("msg-{}", id);
    BufferedResponse {
        id: id.to_owned(),
        action: action.to_owned(),
        job_id: job_id.to_owned(),
        round: 1,
        reply_subject,
        payload: b"{}".to_vec(),
        created_at,
        release_at: created_at + hold,
        ack_handle: Box::new(NoopAckHandle),
        msg_id,
        annotations: Vec::new(),
        edited: false,
        stopped: false,
    }
}
/// Pushing entries grows `len` and flips `is_empty`.
#[tokio::test]
async fn test_buffer_push_and_len() {
    let hold = Duration::from_secs(10);
    let buffer = ResponseBuffer::new(hold);
    assert_eq!(buffer.len().await, 0);
    assert!(buffer.is_empty().await);

    let first = make_entry("a", "propose", "job-1", hold);
    buffer.push(first).await;
    let second = make_entry("b", "evaluate", "job-2", hold);
    buffer.push(second).await;

    assert_eq!(buffer.len().await, 2);
    assert!(!buffer.is_empty().await);
}
/// An entry whose hold has not elapsed stays in the buffer.
#[tokio::test]
async fn test_buffer_drain_respects_hold_duration() {
    let hold = Duration::from_secs(60);
    let buffer = ResponseBuffer::new(hold);
    buffer.push(make_entry("a", "propose", "job-1", hold)).await;

    let released = buffer.drain_ready().await;
    assert!(released.is_empty());
    assert_eq!(buffer.len().await, 1);
}
/// Entries with a zero hold are all handed out by the first drain.
#[tokio::test]
async fn test_buffer_drain_releases_ready() {
    let buffer = ResponseBuffer::new(Duration::ZERO);
    for (id, action, job) in [("a", "propose", "job-1"), ("b", "evaluate", "job-2")] {
        buffer.push(make_entry(id, action, job, Duration::ZERO)).await;
    }

    let released = buffer.drain_ready().await;
    assert_eq!(released.len(), 2);
    assert!(buffer.is_empty().await);
}
/// A paused buffer releases nothing, even when entries are due.
#[tokio::test]
async fn test_buffer_pause_stops_drain() {
    let buffer = ResponseBuffer::new(Duration::ZERO);
    let entry = make_entry("a", "propose", "job-1", Duration::ZERO);
    buffer.push(entry).await;

    buffer.pause();
    assert!(buffer.is_paused());

    let released = buffer.drain_ready().await;
    assert!(released.is_empty(), "paused buffer should not drain");
    assert_eq!(buffer.len().await, 1, "entry should still be in buffer");
}
/// Entries that became due while paused are released after `resume`.
#[tokio::test]
async fn test_buffer_resume_releases_overdue() {
    let buffer = ResponseBuffer::new(Duration::ZERO);
    let entry = make_entry("a", "propose", "job-1", Duration::ZERO);
    buffer.push(entry).await;

    buffer.pause();
    let while_paused = buffer.drain_ready().await;
    assert!(while_paused.is_empty());

    buffer.resume();
    assert!(!buffer.is_paused());
    let after_resume = buffer.drain_ready().await;
    assert_eq!(after_resume.len(), 1);
}
/// `release` removes exactly the requested entry, ignoring its hold time.
#[tokio::test]
async fn test_buffer_release_by_id() {
    let hold = Duration::from_secs(60);
    let buffer = ResponseBuffer::new(hold);
    buffer.push(make_entry("a", "propose", "job-1", hold)).await;
    buffer.push(make_entry("b", "evaluate", "job-2", hold)).await;

    let outcome = buffer.release("a").await;
    assert!(outcome.is_some());
    assert_eq!(outcome.unwrap().id, "a");
    assert_eq!(buffer.len().await, 1);
}
/// `reject` removes the requested entry and returns it.
#[tokio::test]
async fn test_buffer_reject_by_id() {
    let hold = Duration::from_secs(60);
    let buffer = ResponseBuffer::new(hold);
    buffer.push(make_entry("a", "propose", "job-1", hold)).await;

    let outcome = buffer.reject("a").await;
    assert!(outcome.is_some());
    assert_eq!(outcome.unwrap().id, "a");
    assert!(buffer.is_empty().await);
}
/// Releasing an unknown id returns `None` and leaves the buffer untouched.
#[tokio::test]
async fn test_buffer_release_unknown_id() {
    let hold = Duration::from_secs(60);
    let buffer = ResponseBuffer::new(hold);
    buffer.push(make_entry("a", "propose", "job-1", hold)).await;

    let outcome = buffer.release("nonexistent").await;
    assert!(outcome.is_none());
    assert_eq!(buffer.len().await, 1, "existing entry should remain");
}
/// `list` mirrors the entry's identifying fields and reports a positive
/// `release_in_ms` while the entry is still being held.
#[tokio::test]
async fn test_buffer_list_returns_summaries() {
    let hold = Duration::from_secs(30);
    let buffer = ResponseBuffer::new(hold);
    let entry = make_entry("entry-1", "propose", "job-abcd1234", hold);
    buffer.push(entry).await;

    let summaries = buffer.list().await;
    assert_eq!(summaries.len(), 1);
    let summary = &summaries[0];
    assert_eq!(summary.id, "entry-1");
    assert_eq!(summary.action, "propose");
    assert_eq!(summary.job_id, "job-abcd1234");
    assert_eq!(summary.round, 1);
    assert!(summary.release_in_ms > 0, "should still be holding");
}
/// Five zero-hold entries are all released by a single drain.
#[tokio::test]
async fn test_buffer_zero_hold_drains_immediately() {
    let buffer = ResponseBuffer::new(Duration::ZERO);
    for index in 0..5 {
        let id = format!("e{}", index);
        let job = format!("job-{}", index);
        buffer
            .push(make_entry(&id, "propose", &job, Duration::ZERO))
            .await;
    }

    let released = buffer.drain_ready().await;
    assert_eq!(released.len(), 5);
    assert!(buffer.is_empty().await);
}
/// `get_detail` returns the summary fields plus the JSON-decoded payload.
#[tokio::test]
async fn test_get_detail_returns_content() {
    let buffer = ResponseBuffer::new(Duration::from_secs(30));
    let body = serde_json::json!({"title": "My proposal", "content": "Hello world"});
    let created_at = Instant::now();
    let entry = BufferedResponse {
        id: "detail-1".to_string(),
        action: "propose".to_string(),
        job_id: "job-xyz".to_string(),
        round: 3,
        reply_subject: "nsed.job-xyz.result.3.agent.propose".to_string(),
        payload: serde_json::to_vec(&body).unwrap(),
        created_at,
        release_at: created_at + Duration::from_secs(30),
        ack_handle: Box::new(NoopAckHandle),
        msg_id: "msg-detail-1".to_string(),
        annotations: Vec::new(),
        edited: false,
        stopped: false,
    };
    buffer.push(entry).await;

    let lookup = buffer.get_detail("detail-1").await;
    assert!(lookup.is_some());
    let detail = lookup.unwrap();
    let summary = &detail.summary;
    assert_eq!(summary.id, "detail-1");
    assert_eq!(summary.action, "propose");
    assert_eq!(summary.job_id, "job-xyz");
    assert_eq!(summary.round, 3);
    assert!(summary.release_in_ms > 0);
    assert_eq!(detail.content["title"], "My proposal");
    assert_eq!(detail.content["content"], "Hello world");
}
/// `get_detail` yields `None` for an id that is not buffered.
#[tokio::test]
async fn test_get_detail_returns_none_for_unknown_id() {
    let hold = Duration::from_secs(30);
    let buffer = ResponseBuffer::new(hold);
    buffer.push(make_entry("a", "propose", "job-1", hold)).await;

    let lookup = buffer.get_detail("nonexistent").await;
    assert!(lookup.is_none());
}
/// A payload that is not valid JSON is reported as `Value::Null`.
#[tokio::test]
async fn test_get_detail_invalid_payload_returns_null_content() {
    let buffer = ResponseBuffer::new(Duration::from_secs(30));
    let created_at = Instant::now();
    let entry = BufferedResponse {
        id: "bad-json".to_string(),
        action: "propose".to_string(),
        job_id: "job-1".to_string(),
        round: 1,
        reply_subject: "nsed.job-1.result.1.agent.propose".to_string(),
        payload: b"not valid json!".to_vec(),
        created_at,
        release_at: created_at + Duration::from_secs(30),
        ack_handle: Box::new(NoopAckHandle),
        msg_id: "msg-bad".to_string(),
        annotations: Vec::new(),
        edited: false,
        stopped: false,
    };
    buffer.push(entry).await;

    let detail = buffer.get_detail("bad-json").await.unwrap();
    assert_eq!(detail.content, serde_json::Value::Null);
}
/// `update_payload` swaps the stored bytes, which `get_detail` then decodes.
#[tokio::test]
async fn test_update_payload_replaces_content() {
    let hold = Duration::from_secs(30);
    let buffer = ResponseBuffer::new(hold);
    buffer
        .push(make_entry("upd-1", "evaluate", "job-1", hold))
        .await;

    let replacement = serde_json::json!({"scores": [8, 9, 7]});
    let bytes = serde_json::to_vec(&replacement).unwrap();
    let updated = buffer.update_payload("upd-1", bytes).await;
    assert!(updated);

    let detail = buffer.get_detail("upd-1").await.unwrap();
    assert_eq!(detail.content["scores"], serde_json::json!([8, 9, 7]));
}
#[tokio::test]
async fn test_update_payload_unknown_id_returns_false() {
let buf = ResponseBuffer::new(Duration::from_secs(30));
buf.push(make_entry("a", "propose", "job-1", Duration::from_secs(30)))
.await;
let result = buf.update_payload("nonexistent", b"{}".to_vec()).await;
assert!(!result);
assert_eq!(buf.len().await, 1);
}
/// `add_comment` succeeds for a buffered entry, keeps it buffered, and the
/// annotation is actually stored on the entry.
#[tokio::test]
async fn test_add_comment_records_annotation() {
    use crate::agents::{AnnotationType, OperatorAnnotation};
    let buf = ResponseBuffer::new(Duration::from_secs(30));
    buf.push(make_entry(
        "ann-1",
        "propose",
        "job-1",
        Duration::from_secs(30),
    ))
    .await;
    let annotation = OperatorAnnotation {
        annotation_type: AnnotationType::Comment,
        comment: "Looks good".to_string(),
        timestamp: "2026-03-02T12:00:00Z".to_string(),
        original_content_hash: None,
    };
    assert!(buf.add_comment("ann-1", annotation).await);
    assert_eq!(buf.len().await, 1);

    // Pull the entry out to verify the annotation was really recorded; the
    // previous version of this test only checked the return value and length.
    let entry = buf
        .release("ann-1")
        .await
        .expect("annotated entry should still be buffered");
    assert_eq!(entry.annotations.len(), 1);
    assert_eq!(entry.annotations[0].annotation_type, AnnotationType::Comment);
    assert_eq!(entry.annotations[0].comment, "Looks good");
}
/// Commenting on an unknown id reports `false`.
#[tokio::test]
async fn test_add_comment_unknown_id_returns_false() {
    use crate::agents::{AnnotationType, OperatorAnnotation};
    let hold = Duration::from_secs(30);
    let buffer = ResponseBuffer::new(hold);
    buffer.push(make_entry("a", "propose", "job-1", hold)).await;

    let note = OperatorAnnotation {
        annotation_type: AnnotationType::Comment,
        comment: "test".to_string(),
        timestamp: "2026-03-02T12:00:00Z".to_string(),
        original_content_hash: None,
    };
    let recorded = buffer.add_comment("nonexistent", note).await;
    assert!(!recorded);
}
/// An annotated payload update sets `edited` and stores the annotation.
#[tokio::test]
async fn test_update_payload_with_annotation_marks_edited() {
    use crate::agents::{AnnotationType, OperatorAnnotation};
    let buffer = ResponseBuffer::new(Duration::ZERO);
    buffer
        .push(make_entry("edit-1", "propose", "job-1", Duration::ZERO))
        .await;

    let note = OperatorAnnotation {
        annotation_type: AnnotationType::Edit,
        comment: "Fixed wording".to_string(),
        timestamp: "2026-03-02T12:00:00Z".to_string(),
        original_content_hash: Some("abc123".to_string()),
    };
    let replacement = serde_json::json!({"content": "edited"});
    let bytes = serde_json::to_vec(&replacement).unwrap();
    let updated = buffer
        .update_payload_with_annotation("edit-1", bytes, note)
        .await;
    assert!(updated);

    let released = buffer.drain_ready().await;
    assert_eq!(released.len(), 1);
    let edited_entry = &released[0];
    assert!(edited_entry.edited);
    assert_eq!(edited_entry.annotations.len(), 1);
    let stored = &edited_entry.annotations[0];
    assert_eq!(stored.annotation_type, AnnotationType::Edit);
    assert_eq!(stored.comment, "Fixed wording");
}
#[tokio::test]
async fn test_buffer_concurrent_push_drain() {
use std::sync::Arc;
let buf = Arc::new(ResponseBuffer::new(Duration::ZERO));
let mut handles = Vec::new();
for i in 0..10 {
let buf = buf.clone();
handles.push(tokio::spawn(async move {
buf.push(make_entry(
&format!("c{}", i),
"propose",
&format!("job-{}", i),
Duration::ZERO,
))
.await;
}));
}
for h in handles {
h.await.unwrap();
}
let drained = buf.drain_ready().await;
assert_eq!(drained.len(), 10);
assert!(buf.is_empty().await);
}
/// A high score (0.8) with amplification 3 stretches a 10 s hold to about 18.3 s.
#[test]
fn test_compute_adaptive_hold_high_score() {
    let hold = super::compute_adaptive_hold(Duration::from_secs(10), Some(0.8), 3.0);
    let deviation = (hold.as_secs_f64() - 18.33).abs();
    assert!(deviation < 0.5, "hold={:?}", hold);
}
/// A low score (-0.8) with amplification 3 stretches a 10 s hold to about 31.7 s.
#[test]
fn test_compute_adaptive_hold_low_score() {
    let hold = super::compute_adaptive_hold(Duration::from_secs(10), Some(-0.8), 3.0);
    let deviation = (hold.as_secs_f64() - 31.67).abs();
    assert!(deviation < 0.5, "hold={:?}", hold);
}
/// Without a score the base hold is returned unchanged.
#[test]
fn test_compute_adaptive_hold_no_score() {
    let base = Duration::from_secs(10);
    assert_eq!(super::compute_adaptive_hold(base, None, 3.0), base);
}
/// A score of 1.0 with amplification 3 stretches a 10 s hold to about 17.5 s.
#[test]
fn test_compute_adaptive_hold_perfect_score() {
    let hold = super::compute_adaptive_hold(Duration::from_secs(10), Some(1.0), 3.0);
    let deviation = (hold.as_secs_f64() - 17.5).abs();
    assert!(deviation < 0.5, "hold={:?}", hold);
}
/// A neutral score (0.0) with amplification 3 stretches a 10 s hold to about 25 s.
#[test]
fn test_compute_adaptive_hold_zero_score() {
    let hold = super::compute_adaptive_hold(Duration::from_secs(10), Some(0.0), 3.0);
    let deviation = (hold.as_secs_f64() - 25.0).abs();
    assert!(deviation < 0.5, "hold={:?}", hold);
}
/// `set_hold_duration` changes the current hold but never the base hold.
#[test]
fn test_set_hold_duration_atomic() {
    let initial = Duration::from_secs(10);
    let adjusted = Duration::from_secs(25);
    let buffer = ResponseBuffer::new(initial);
    assert_eq!(buffer.hold_duration(), initial);
    assert_eq!(buffer.base_hold_duration(), initial);

    buffer.set_hold_duration(adjusted);
    assert_eq!(buffer.hold_duration(), adjusted);
    assert_eq!(buffer.base_hold_duration(), initial);
}
/// A freshly built buffer uses its hold duration as the response SLA.
#[test]
fn test_response_sla_default_matches_hold_duration() {
    let holds = [
        Duration::from_secs(10),
        Duration::from_secs(600),
        Duration::from_millis(500),
    ];
    for hold in holds {
        let buffer = ResponseBuffer::new(hold);
        assert_eq!(buffer.response_sla(), Some(hold));
    }
}
/// A zero hold means no SLA at all.
#[test]
fn test_response_sla_zero_hold_is_none() {
    let buffer = ResponseBuffer::new(Duration::ZERO);
    let sla = buffer.response_sla();
    assert!(sla.is_none());
}
/// `set_response_sla` overrides the SLA seeded by the constructor.
#[test]
fn test_set_response_sla() {
    let buffer = ResponseBuffer::new(Duration::from_secs(10));
    let new_sla = Duration::from_secs(600);
    buffer.set_response_sla(new_sla);
    assert_eq!(buffer.response_sla(), Some(new_sla));
}
/// The SLA is stored exactly as given, and zero turns it off.
#[test]
fn test_set_response_sla_uses_exact_value() {
    let buffer = ResponseBuffer::new(Duration::from_secs(10));

    let thirty_seconds = Duration::from_secs(30);
    buffer.set_response_sla(thirty_seconds);
    assert_eq!(buffer.response_sla(), Some(thirty_seconds));

    let one_milli = Duration::from_millis(1);
    buffer.set_response_sla(one_milli);
    assert_eq!(buffer.response_sla(), Some(one_milli));

    buffer.set_response_sla(Duration::ZERO);
    assert_eq!(buffer.response_sla(), None, "zero means passthrough");
}
/// With auto-approve off, an entry is held until `task_received + SLA`.
#[tokio::test]
async fn test_push_with_deadline_sla() {
    let buffer = ResponseBuffer::new(Duration::from_secs(10));
    buffer.set_auto_approve(false);
    buffer.set_response_sla(Duration::from_secs(60));

    let received_at = Instant::now();
    let pending = make_entry("sla-1", "propose", "job-1", Duration::from_secs(10));
    buffer.push_with_deadline(pending, received_at).await;

    let released = buffer.drain_ready().await;
    assert!(
        released.is_empty(),
        "should hold for full SLA (~60s), not drain immediately"
    );
    assert_eq!(buffer.len().await, 1);
}
/// With auto-approve off and no SLA configured (zero hold), an entry pushed
/// through `push_with_deadline` is releasable immediately.
#[tokio::test]
async fn test_push_with_deadline_no_sla_fallback() {
    let buf = ResponseBuffer::new(Duration::ZERO);
    // Auto-approve defaults to on and short-circuits `push_with_deadline`;
    // turn it off so the "SLA is zero" branch is the one under test.
    buf.set_auto_approve(false);
    let task_received = Instant::now();
    let entry = make_entry("nosla-1", "propose", "job-1", Duration::ZERO);
    buf.push_with_deadline(entry, task_received).await;
    let drained = buf.drain_ready().await;
    assert_eq!(drained.len(), 1, "should drain immediately when no SLA set");
}
/// When `task_received + SLA` is already in the past, the release time is
/// clamped to "now" and the entry drains right away.
#[tokio::test]
async fn test_push_with_deadline_past_deadline_clamps() {
    let buf = ResponseBuffer::new(Duration::from_secs(60));
    // Auto-approve defaults to on and short-circuits `push_with_deadline`;
    // turn it off so the deadline-clamping branch is the one under test.
    buf.set_auto_approve(false);
    buf.set_response_sla(Duration::from_secs(600));
    // NOTE(review): `Instant - Duration` panics if the result is not
    // representable (e.g. a monotonic clock younger than 620 s on some
    // platforms) — confirm this is acceptable for the CI environment.
    let task_received = Instant::now() - Duration::from_secs(620);
    let entry = make_entry("late-1", "evaluate", "job-1", Duration::from_secs(60));
    buf.push_with_deadline(entry, task_received).await;
    let drained = buf.drain_ready().await;
    assert_eq!(
        drained.len(),
        1,
        "past-deadline entry should drain immediately"
    );
}
/// With the default SLA (equal to the 10 s hold) the remaining time reported
/// by `list` lies in `(0, 10_000]` ms.
#[tokio::test]
async fn test_short_hold_duration_uses_exact_sla() {
    let hold = Duration::from_secs(10);
    let buffer = ResponseBuffer::new(hold);
    buffer.set_auto_approve(false);

    let received_at = Instant::now();
    let pending = make_entry("p1", "propose", "job-1", hold);
    buffer.push_with_deadline(pending, received_at).await;

    let summaries = buffer.list().await;
    assert_eq!(summaries.len(), 1);
    let remaining_ms = summaries[0].release_in_ms;
    assert!(
        remaining_ms > 0 && remaining_ms <= 10_000,
        "release_in_ms should be in (0, 10_000], got {}",
        remaining_ms
    );
}
/// The constructor seeds the SLA with the hold duration.
#[tokio::test]
async fn test_response_sla_matches_hold_on_construction() {
    let hold = Duration::from_secs(10);
    let buffer = ResponseBuffer::new(hold);
    assert_eq!(buffer.response_sla(), Some(hold));
}
/// A zero-hold buffer has no SLA (pass-through mode).
#[tokio::test]
async fn test_zero_hold_is_passthrough_no_sla_floor() {
    let buffer = ResponseBuffer::new(Duration::ZERO);
    assert_eq!(
        buffer.response_sla(),
        None,
        "pass-through mode should have no SLA"
    );
}
/// `drain_stale` removes entries of other jobs and keeps the current job's.
#[tokio::test]
async fn test_drain_stale_removes_entries_from_other_jobs() {
    let hold = Duration::from_secs(300);
    let buffer = ResponseBuffer::new(hold);
    let fixtures = [
        ("a", "propose", "old-job"),
        ("b", "evaluate", "old-job"),
        ("c", "propose", "current-job"),
    ];
    for (id, action, job) in fixtures {
        buffer.push(make_entry(id, action, job, hold)).await;
    }
    assert_eq!(buffer.len().await, 3);

    let stale = buffer.drain_stale("current-job").await;
    assert_eq!(stale.len(), 2, "should drain 2 old-job entries");
    assert_eq!(buffer.len().await, 1, "should keep 1 current-job entry");

    let survivors = buffer.list().await;
    assert_eq!(survivors[0].id, "c");
    assert_eq!(survivors[0].job_id, "current-job");
}
/// Nothing is drained when every entry belongs to the current job.
#[tokio::test]
async fn test_drain_stale_no_op_when_all_current() {
    let hold = Duration::from_secs(300);
    let buffer = ResponseBuffer::new(hold);
    for (id, action) in [("a", "propose"), ("b", "evaluate")] {
        buffer.push(make_entry(id, action, "job-1", hold)).await;
    }

    let stale = buffer.drain_stale("job-1").await;
    assert!(stale.is_empty());
    assert_eq!(buffer.len().await, 2);
}
/// Draining stale entries from an empty buffer yields nothing.
#[tokio::test]
async fn test_drain_stale_empty_buffer() {
    let buffer = ResponseBuffer::new(Duration::from_secs(300));
    assert!(buffer.drain_stale("any-job").await.is_empty());
}
/// `PreAckedHandle::ack` succeeds, and does so repeatedly.
#[tokio::test]
async fn test_pre_acked_handle_is_noop() {
    let pre_acked = PreAckedHandle;
    let first = pre_acked.ack().await;
    assert!(first.is_ok(), "PreAckedHandle.ack() should always succeed");
    let second = pre_acked.ack().await;
    assert!(second.is_ok());
}
/// An entry carrying a `PreAckedHandle` drains like any other and its handle
/// can still be acked.
#[tokio::test]
async fn test_buffer_entry_with_pre_acked_handle_drains_correctly() {
    let buffer = ResponseBuffer::new(Duration::ZERO);
    let created_at = Instant::now();
    let entry = BufferedResponse {
        id: "pre-acked-1".to_string(),
        action: "propose".to_string(),
        job_id: "job-1".to_string(),
        round: 1,
        reply_subject: "nsed.job-1.result.1.agent.propose".to_string(),
        payload: b"{\"content\":\"test\"}".to_vec(),
        created_at,
        release_at: created_at,
        ack_handle: Box::new(PreAckedHandle),
        msg_id: "msg-pre-acked-1".to_string(),
        annotations: Vec::new(),
        edited: false,
        stopped: false,
    };
    buffer.push(entry).await;
    assert_eq!(buffer.len().await, 1);

    let released = buffer.drain_ready().await;
    assert_eq!(released.len(), 1);
    assert!(released[0].ack_handle.ack().await.is_ok());
    assert!(buffer.is_empty().await);
}
/// `mark_for_release` makes a far-future entry drainable right away.
#[tokio::test]
async fn test_mark_for_release_sets_release_at_to_now() {
    let buffer = ResponseBuffer::new(Duration::from_secs(600));
    let created_at = Instant::now();
    let entry = BufferedResponse {
        id: "mark-1".to_string(),
        action: "propose".to_string(),
        job_id: "job-1".to_string(),
        round: 1,
        reply_subject: "nsed.job-1.result.1.agent.propose".to_string(),
        payload: b"{}".to_vec(),
        created_at,
        release_at: created_at + Duration::from_secs(3600),
        ack_handle: Box::new(NoopAckHandle),
        msg_id: "msg-mark-1".to_string(),
        annotations: Vec::new(),
        edited: false,
        stopped: false,
    };
    buffer.push(entry).await;

    let before = buffer.drain_ready().await;
    assert!(before.is_empty());
    assert_eq!(buffer.len().await, 1);

    let marked = buffer.mark_for_release("mark-1").await;
    assert!(marked, "mark_for_release should find the entry");

    let after = buffer.drain_ready().await;
    assert_eq!(after.len(), 1);
    assert_eq!(after[0].id, "mark-1");
    assert!(buffer.is_empty().await);
}
/// Marking an unknown id reports `false`.
#[tokio::test]
async fn test_mark_for_release_unknown_id_returns_false() {
    let buffer = ResponseBuffer::new(Duration::from_secs(60));
    let marked = buffer.mark_for_release("nonexistent").await;
    assert!(
        !marked,
        "mark_for_release should return false for unknown ID"
    );
}
/// Marking one entry leaves the other entries held.
#[tokio::test]
async fn test_mark_for_release_only_affects_target_entry() {
    let buffer = ResponseBuffer::new(Duration::from_secs(600));
    let created_at = Instant::now();
    let release_at = created_at + Duration::from_secs(3600);
    for index in 0..2 {
        let entry = BufferedResponse {
            id: format!("entry-{}", index),
            action: "propose".to_string(),
            job_id: "job-1".to_string(),
            round: 1,
            reply_subject: format!("nsed.job-1.result.1.agent{}.propose", index),
            payload: b"{}".to_vec(),
            created_at,
            release_at,
            ack_handle: Box::new(NoopAckHandle),
            msg_id: format!("msg-{}", index),
            annotations: Vec::new(),
            edited: false,
            stopped: false,
        };
        buffer.push(entry).await;
    }
    assert_eq!(buffer.len().await, 2);

    buffer.mark_for_release("entry-0").await;

    let released = buffer.drain_ready().await;
    assert_eq!(released.len(), 1);
    assert_eq!(released[0].id, "entry-0");
    assert_eq!(buffer.len().await, 1);
}
/// Marking works while paused; the entry drains once the buffer resumes.
#[tokio::test]
async fn test_mark_for_release_while_paused_still_marks() {
    let buffer = ResponseBuffer::new(Duration::from_secs(600));
    let created_at = Instant::now();
    let entry = BufferedResponse {
        id: "paused-mark-1".to_string(),
        action: "evaluate".to_string(),
        job_id: "job-2".to_string(),
        round: 1,
        reply_subject: "nsed.job-2.result.1.agent.evaluate".to_string(),
        payload: b"{}".to_vec(),
        created_at,
        release_at: created_at + Duration::from_secs(3600),
        ack_handle: Box::new(NoopAckHandle),
        msg_id: "msg-paused-1".to_string(),
        annotations: Vec::new(),
        edited: false,
        stopped: false,
    };
    buffer.push(entry).await;

    buffer.pause();
    let marked = buffer.mark_for_release("paused-mark-1").await;
    assert!(marked);
    let while_paused = buffer.drain_ready().await;
    assert!(while_paused.is_empty());
    assert_eq!(buffer.len().await, 1);

    buffer.resume();
    let after_resume = buffer.drain_ready().await;
    assert_eq!(after_resume.len(), 1);
    assert_eq!(after_resume[0].id, "paused-mark-1");
}
/// `mark_for_release` does not override `stop`; the entry drains only after `unstop`.
#[tokio::test]
async fn test_mark_for_release_preserves_stopped_flag() {
    let id = "stopped-release";
    let buffer = ResponseBuffer::new(Duration::from_secs(600));
    buffer
        .push(make_entry(id, "propose", "job-1", Duration::from_secs(3600)))
        .await;

    assert!(buffer.stop(id).await);
    let after_stop = buffer.drain_ready().await;
    assert!(after_stop.is_empty(), "stopped entry should not drain");

    assert!(buffer.mark_for_release(id).await);
    let after_mark = buffer.drain_ready().await;
    assert!(
        after_mark.is_empty(),
        "stopped entry should not drain even after mark_for_release"
    );

    assert!(buffer.unstop(id).await);
    let after_unstop = buffer.drain_ready().await;
    assert_eq!(after_unstop.len(), 1);
    assert_eq!(after_unstop[0].id, "stopped-release");
}
/// `force_release` clears the stopped flag and makes the entry drainable in one call.
#[tokio::test]
async fn test_force_release_atomically_unstops_and_releases() {
    let id = "atomic-rel";
    let buffer = ResponseBuffer::new(Duration::from_secs(600));
    buffer
        .push(make_entry(id, "propose", "job-1", Duration::from_secs(3600)))
        .await;

    assert!(buffer.stop(id).await);
    let after_stop = buffer.drain_ready().await;
    assert!(after_stop.is_empty(), "stopped entry should not drain");

    assert!(buffer.force_release(id).await);
    let after_force = buffer.drain_ready().await;
    assert_eq!(after_force.len(), 1);
    assert_eq!(after_force[0].id, "atomic-rel");
}
/// Force-releasing an unknown id reports `false`.
#[tokio::test]
async fn test_force_release_nonexistent_returns_false() {
    let buffer = ResponseBuffer::new(Duration::from_secs(600));
    let found = buffer.force_release("no-such-entry").await;
    assert!(!found);
}
/// A stopped entry stays buffered even when its release time has passed.
#[tokio::test]
async fn test_stopped_entry_not_drained() {
    let buffer = ResponseBuffer::new(Duration::ZERO);
    let entry = make_entry("stop-1", "propose", "job-1", Duration::ZERO);
    buffer.push(entry).await;
    assert!(buffer.stop("stop-1").await);

    let released = buffer.drain_ready().await;
    assert!(released.is_empty(), "stopped entry should not drain");
    assert_eq!(buffer.len().await, 1, "entry should still be in buffer");
}
/// `unstop` undoes `stop`, so a due entry drains again.
#[tokio::test]
async fn test_unstop_makes_entry_drainable() {
    let buffer = ResponseBuffer::new(Duration::ZERO);
    let entry = make_entry("unstop-1", "evaluate", "job-1", Duration::ZERO);
    buffer.push(entry).await;

    assert!(buffer.stop("unstop-1").await);
    assert!(buffer.unstop("unstop-1").await);

    let released = buffer.drain_ready().await;
    assert_eq!(released.len(), 1);
    assert_eq!(released[0].id, "unstop-1");
    assert!(buffer.is_empty().await);
}
/// `stop` on an id that was never pushed is expected to return `false`.
#[tokio::test]
async fn test_stop_unknown_id_returns_false() {
    let empty_buffer = ResponseBuffer::new(Duration::from_secs(60));
    let stopped = empty_buffer.stop("nonexistent").await;
    assert!(!stopped);
}
/// `unstop` on an id that was never pushed is expected to return `false`.
#[tokio::test]
async fn test_unstop_unknown_id_returns_false() {
    let empty_buffer = ResponseBuffer::new(Duration::from_secs(60));
    let unstopped = empty_buffer.unstop("nonexistent").await;
    assert!(!unstopped);
}
/// With two entries buffered and only the first stopped, a drain should hand
/// back just the second one and leave the first in place.
#[tokio::test]
async fn test_stop_only_affects_target_entry() {
    let buffer = ResponseBuffer::new(Duration::ZERO);
    for (id, action) in [("s-1", "propose"), ("s-2", "evaluate")] {
        buffer
            .push(make_entry(id, action, "job-1", Duration::ZERO))
            .await;
    }

    assert!(buffer.stop("s-1").await);

    let released = buffer.drain_ready().await;
    assert_eq!(released.len(), 1);
    assert_eq!(released[0].id, "s-2");
    assert_eq!(buffer.len().await, 1);
}
/// The summary returned by `list` should carry `stopped == true` for an entry
/// that has been stopped.
#[tokio::test]
async fn test_stopped_entry_visible_in_list() {
    let buffer = ResponseBuffer::new(Duration::from_secs(60));
    let entry = make_entry("vis-1", "propose", "job-1", Duration::from_secs(60));
    buffer.push(entry).await;

    // The return value of `stop` is deliberately not asserted here.
    let _ = buffer.stop("vis-1").await;

    let summaries = buffer.list().await;
    assert_eq!(summaries.len(), 1);
    assert!(summaries[0].stopped, "stopped flag should be true in list");
}
/// The detail view returned by `get_detail` should expose `stopped == true`
/// through its embedded summary once the entry has been stopped.
#[tokio::test]
async fn test_stopped_entry_visible_in_detail() {
    let buffer = ResponseBuffer::new(Duration::from_secs(60));
    let entry = make_entry("vis-d-1", "propose", "job-1", Duration::from_secs(60));
    buffer.push(entry).await;

    // The return value of `stop` is deliberately not asserted here.
    let _ = buffer.stop("vis-d-1").await;

    let lookup = buffer.get_detail("vis-d-1").await;
    assert!(lookup.is_some());
    let detail = lookup.unwrap();
    assert!(
        detail.summary.stopped,
        "stopped flag should be true in detail"
    );
}
/// A stop issued while the buffer is paused should still hold the entry back
/// after `resume`, until the entry is explicitly unstopped.
#[tokio::test]
async fn test_stop_while_paused_still_stops() {
    let buffer = ResponseBuffer::new(Duration::ZERO);
    let entry = make_entry("sp-1", "propose", "job-1", Duration::ZERO);
    buffer.push(entry).await;

    // Stop the entry inside a pause/resume window.
    buffer.pause();
    assert!(buffer.stop("sp-1").await);
    buffer.resume();

    let after_resume = buffer.drain_ready().await;
    assert!(
        after_resume.is_empty(),
        "stopped entry should not drain even after resume"
    );
    assert_eq!(buffer.len().await, 1);

    // Only after unstop is the entry expected to leave the buffer.
    let _ = buffer.unstop("sp-1").await;
    let after_unstop = buffer.drain_ready().await;
    assert_eq!(after_unstop.len(), 1);
}
/// After one operator edit, the drained entry should keep its original
/// `reply_subject` while carrying the new payload, the `edited` flag and
/// exactly one annotation.
#[tokio::test]
async fn test_reply_subject_preserved_after_edit() {
    use crate::agents::{AnnotationType, OperatorAnnotation};

    let buffer = ResponseBuffer::new(Duration::ZERO);
    let pushed = make_entry("rs-1", "propose", "job-A", Duration::ZERO);
    let expected_subject = pushed.reply_subject.clone();
    buffer.push(pushed).await;

    let edit = OperatorAnnotation {
        annotation_type: AnnotationType::Edit,
        comment: "Improved wording".into(),
        timestamp: "2026-01-01T00:00:00Z".into(),
        original_content_hash: None,
    };
    let edited_payload = br#"{"content":"edited by operator"}"#.to_vec();
    let accepted = buffer
        .update_payload_with_annotation("rs-1", edited_payload.clone(), edit)
        .await;
    assert!(accepted);

    let released = buffer.drain_ready().await;
    assert_eq!(released.len(), 1);
    let entry = &released[0];
    assert_eq!(
        entry.reply_subject, expected_subject,
        "reply_subject must survive edits"
    );
    assert_eq!(entry.payload, edited_payload, "payload should be updated");
    assert!(entry.edited, "edited flag should be set");
    assert_eq!(entry.annotations.len(), 1);
}
/// Two payload edits followed by a comment: the drained entry should keep its
/// original `reply_subject`, hold the last payload, and carry all three
/// annotations.
#[tokio::test]
async fn test_reply_subject_preserved_after_multiple_edits() {
    use crate::agents::{AnnotationType, OperatorAnnotation};

    let buffer = ResponseBuffer::new(Duration::ZERO);
    let pushed = make_entry("rs-2", "evaluate", "job-B", Duration::ZERO);
    let expected_subject = pushed.reply_subject.clone();
    buffer.push(pushed).await;

    // Prepare the three annotations up front; they are applied in order below.
    let first_edit = OperatorAnnotation {
        annotation_type: AnnotationType::Edit,
        comment: "First edit".into(),
        timestamp: "t1".into(),
        original_content_hash: None,
    };
    let second_edit = OperatorAnnotation {
        annotation_type: AnnotationType::Edit,
        comment: "Second edit".into(),
        timestamp: "t2".into(),
        original_content_hash: None,
    };
    let approval_note = OperatorAnnotation {
        annotation_type: AnnotationType::Comment,
        comment: "LGTM".into(),
        timestamp: "t3".into(),
        original_content_hash: None,
    };

    buffer
        .update_payload_with_annotation("rs-2", b"v2".to_vec(), first_edit)
        .await;
    buffer
        .update_payload_with_annotation("rs-2", b"v3".to_vec(), second_edit)
        .await;
    buffer.add_comment("rs-2", approval_note).await;

    let released = buffer.drain_ready().await;
    assert_eq!(released.len(), 1);
    assert_eq!(
        released[0].reply_subject, expected_subject,
        "reply_subject must survive multiple edits"
    );
    assert_eq!(
        released[0].payload, b"v3",
        "payload should reflect last edit"
    );
    assert_eq!(released[0].annotations.len(), 3, "all annotations preserved");
}
/// Stop, edit, unstop: the entry must stay buffered while stopped, and once
/// released it should still carry the original `reply_subject` together with
/// the edited payload.
#[tokio::test]
async fn test_reply_subject_preserved_after_stop_edit_unstop() {
    use crate::agents::{AnnotationType, OperatorAnnotation};

    let buffer = ResponseBuffer::new(Duration::ZERO);
    let pushed = make_entry("rs-3", "propose", "job-C", Duration::ZERO);
    let expected_subject = pushed.reply_subject.clone();
    buffer.push(pushed).await;

    assert!(buffer.stop("rs-3").await);

    let regeneration = OperatorAnnotation {
        annotation_type: AnnotationType::Edit,
        comment: "Regenerated by operator".into(),
        timestamp: "t1".into(),
        original_content_hash: None,
    };
    let regenerated_payload = br#"{"content":"regenerated proposal"}"#.to_vec();
    buffer
        .update_payload_with_annotation("rs-3", regenerated_payload, regeneration)
        .await;

    // The edit by itself must not let the stopped entry out.
    let while_stopped = buffer.drain_ready().await;
    assert!(
        while_stopped.is_empty(),
        "stopped entry should not drain even after edit"
    );

    assert!(buffer.unstop("rs-3").await);
    let released = buffer.drain_ready().await;
    assert_eq!(released.len(), 1);
    assert_eq!(
        released[0].reply_subject, expected_subject,
        "reply_subject must survive stop→edit→unstop cycle"
    );
    let payload_text = std::str::from_utf8(&released[0].payload).unwrap();
    assert_eq!(payload_text, r#"{"content":"regenerated proposal"}"#);
}
/// Stopping the same entry twice should succeed both times, and a single
/// `unstop` afterwards should be enough to release it.
#[tokio::test]
async fn test_double_stop_is_idempotent() {
    let buffer = ResponseBuffer::new(Duration::ZERO);
    let entry = make_entry("ds-1", "propose", "j", Duration::ZERO);
    buffer.push(entry).await;

    let first_stop = buffer.stop("ds-1").await;
    assert!(first_stop);
    let second_stop = buffer.stop("ds-1").await;
    assert!(second_stop);
    assert!(buffer.drain_ready().await.is_empty());

    let unstopped = buffer.unstop("ds-1").await;
    assert!(unstopped);
    assert_eq!(buffer.drain_ready().await.len(), 1);
}
/// Unstopping an entry twice should succeed both times and leave it drainable.
#[tokio::test]
async fn test_double_unstop_is_idempotent() {
    let buffer = ResponseBuffer::new(Duration::ZERO);
    let entry = make_entry("du-1", "propose", "j", Duration::ZERO);
    buffer.push(entry).await;

    // The return value of the initial `stop` is deliberately not asserted.
    let _ = buffer.stop("du-1").await;

    let first_unstop = buffer.unstop("du-1").await;
    assert!(first_unstop);
    let second_unstop = buffer.unstop("du-1").await;
    assert!(second_unstop);
    assert_eq!(buffer.drain_ready().await.len(), 1);
}
/// Editing an id that was never pushed is expected to return `false`.
#[tokio::test]
async fn test_edit_nonexistent_returns_false() {
    use crate::agents::{AnnotationType, OperatorAnnotation};

    let empty_buffer = ResponseBuffer::new(Duration::ZERO);
    let edit = OperatorAnnotation {
        annotation_type: AnnotationType::Edit,
        comment: "".into(),
        timestamp: "t".into(),
        original_content_hash: None,
    };

    let accepted = empty_buffer
        .update_payload_with_annotation("ghost", b"new".to_vec(), edit)
        .await;
    assert!(!accepted);
}
/// Once an entry has been drained, a later edit against its id is expected to
/// be rejected.
#[tokio::test]
async fn test_edit_after_drain_returns_false() {
    use crate::agents::{AnnotationType, OperatorAnnotation};

    let buffer = ResponseBuffer::new(Duration::ZERO);
    let entry = make_entry("ed-1", "propose", "j", Duration::ZERO);
    buffer.push(entry).await;

    // Drain first; the drained entries themselves are not inspected.
    let _ = buffer.drain_ready().await;

    let late_edit = OperatorAnnotation {
        annotation_type: AnnotationType::Edit,
        comment: "".into(),
        timestamp: "t".into(),
        original_content_hash: None,
    };
    let accepted = buffer
        .update_payload_with_annotation("ed-1", b"too late".to_vec(), late_edit)
        .await;
    assert!(!accepted, "cannot edit an already-drained entry");
}
/// With three entries buffered and only the middle one stopped, a drain
/// should release the other two and leave the stopped one retrievable.
#[tokio::test]
async fn test_selective_stop_only_blocks_target() {
    let buffer = ResponseBuffer::new(Duration::ZERO);
    for (id, action) in [("m-1", "propose"), ("m-2", "evaluate"), ("m-3", "propose")] {
        buffer.push(make_entry(id, action, "j", Duration::ZERO)).await;
    }

    // The return value of `stop` is deliberately not asserted here.
    let _ = buffer.stop("m-2").await;

    let released = buffer.drain_ready().await;
    assert_eq!(released.len(), 2, "only non-stopped entries should drain");

    let was_released = |id: &str| released.iter().any(|entry| entry.id == id);
    assert!(was_released("m-1"));
    assert!(was_released("m-3"));
    assert!(!was_released("m-2"));

    assert_eq!(buffer.len().await, 1);
    assert!(buffer.get_detail("m-2").await.is_some());
}
/// Runs one entry through stop, edit, unstop and drain, then checks that
/// `job_id`, `action`, `round` and `reply_subject` come out unchanged and
/// that the `edited` flag is set.
#[tokio::test]
async fn test_job_id_and_action_preserved_through_full_lifecycle() {
    use crate::agents::{AnnotationType, OperatorAnnotation};

    let buffer = ResponseBuffer::new(Duration::ZERO);

    // Override the round and reply subject so the final checks use distinctive values.
    let mut pushed = make_entry("lc-1", "evaluate", "job-XYZ", Duration::ZERO);
    pushed.reply_subject = "nsed.job-XYZ.result.3.agent.evaluate".into();
    pushed.round = 3;
    buffer.push(pushed).await;

    // None of the intermediate return values are asserted in this test.
    let _ = buffer.stop("lc-1").await;
    let regeneration = OperatorAnnotation {
        annotation_type: AnnotationType::Edit,
        comment: "regen".into(),
        timestamp: "t".into(),
        original_content_hash: None,
    };
    buffer
        .update_payload_with_annotation("lc-1", b"edited".to_vec(), regeneration)
        .await;
    let _ = buffer.unstop("lc-1").await;

    let released = buffer.drain_ready().await;
    assert_eq!(released.len(), 1);

    let entry = &released[0];
    assert_eq!(entry.job_id, "job-XYZ");
    assert_eq!(entry.action, "evaluate");
    assert_eq!(entry.round, 3);
    assert_eq!(entry.reply_subject, "nsed.job-XYZ.result.3.agent.evaluate");
    assert!(entry.edited);
}
/// With both inputs present (0.6 and 0.25) the result should be within 0.01
/// of 0.3125.
#[test]
fn test_compute_divergence_both_signals() {
    let divergence = super::compute_divergence(Some(0.6), Some(0.25)).unwrap();
    let error = (divergence - 0.3125).abs();
    assert!(error < 0.01);
}
/// With only the first input present (1.0) the result should be within 0.01
/// of 0.25.
#[test]
fn test_compute_divergence_score_only() {
    let divergence = super::compute_divergence(Some(1.0), None).unwrap();
    let error = (divergence - 0.25).abs();
    assert!(error < 0.01);
}
/// With only the first input present and negative (-0.8) the result should be
/// within 0.02 of 0.722.
#[test]
fn test_compute_divergence_score_low() {
    let divergence = super::compute_divergence(Some(-0.8), None).unwrap();
    let error = (divergence - 0.722).abs();
    assert!(error < 0.02);
}
/// With only the second input present (0.25) the result should be within 0.01
/// of 0.25.
#[test]
fn test_compute_divergence_std_dev_only() {
    let divergence = super::compute_divergence(None, Some(0.25)).unwrap();
    let error = (divergence - 0.25).abs();
    assert!(error < 0.01);
}
/// With neither input present, no divergence value is expected.
#[test]
fn test_compute_divergence_none() {
    let outcome = super::compute_divergence(None, None);
    assert!(outcome.is_none());
}
/// Inputs 3.0 and 0.0 should produce a result within 0.01 of 0.125.
#[test]
fn test_compute_divergence_perfect_score() {
    let divergence = super::compute_divergence(Some(3.0), Some(0.0)).unwrap();
    let error = (divergence - 0.125).abs();
    assert!(error < 0.01);
}
/// Inputs -3.0 and 1.2 should produce a result within 0.01 of 1.0.
#[test]
fn test_compute_divergence_worst_score() {
    let divergence = super::compute_divergence(Some(-3.0), Some(1.2)).unwrap();
    let error = (divergence - 1.0).abs();
    assert!(error < 0.01);
}
/// A first input of 10.0 with no second input should yield a result below 0.1.
#[test]
fn test_compute_divergence_large_positive_score() {
    let divergence = super::compute_divergence(Some(10.0), None).unwrap();
    assert!(
        divergence < 0.1,
        "large positive should give low divergence"
    );
}
/// A first input of -10.0 with no second input should yield a result above 0.9.
#[test]
fn test_compute_divergence_large_negative_score() {
    let divergence = super::compute_divergence(Some(-10.0), None).unwrap();
    assert!(
        divergence > 0.9,
        "large negative should give high divergence"
    );
}
/// A freshly constructed buffer should report auto-approve as enabled.
#[test]
fn test_auto_approve_default_on() {
    let fresh_buffer = ResponseBuffer::new(Duration::from_secs(10));
    let enabled = fresh_buffer.is_auto_approve();
    assert!(enabled, "auto-approve should be ON by default");
}
/// `set_auto_approve` should be reflected by `is_auto_approve` when switching
/// off and back on.
#[test]
fn test_auto_approve_toggle() {
    let buffer = ResponseBuffer::new(Duration::from_secs(10));

    let initially_on = buffer.is_auto_approve();
    assert!(initially_on);

    buffer.set_auto_approve(false);
    let after_disable = buffer.is_auto_approve();
    assert!(!after_disable);

    buffer.set_auto_approve(true);
    let after_enable = buffer.is_auto_approve();
    assert!(after_enable);
}
/// A freshly constructed buffer should report a threshold within 0.01 of 1.0.
#[test]
fn test_auto_approve_threshold_default() {
    let fresh_buffer = ResponseBuffer::new(Duration::from_secs(10));
    let threshold = fresh_buffer.auto_approve_threshold();
    assert!(
        (threshold - 1.0).abs() < 0.01,
        "auto_approve_threshold default should be 1.0 (release everything)"
    );
}
/// With an untouched default configuration, a divergence of 1.0 should still
/// let all three buffered entries be counted as auto-released.
#[tokio::test]
async fn test_default_config_releases_every_entry_regardless_of_divergence() {
    let buffer = ResponseBuffer::new(Duration::from_secs(60));
    for (id, job) in [("a", "job-1"), ("b", "job-2"), ("c", "job-3")] {
        buffer
            .push(make_entry(id, "propose", job, Duration::from_secs(60)))
            .await;
    }

    let released = buffer.auto_release_if_eligible(Some(1.0)).await;
    assert_eq!(
        released, 3,
        "all three entries should auto-release under the default 100% threshold"
    );
}
/// Values written with `set_auto_approve_threshold` should read back within
/// 0.01 of what was set.
#[test]
fn test_auto_approve_threshold_set_and_get() {
    let buffer = ResponseBuffer::new(Duration::from_secs(10));

    buffer.set_auto_approve_threshold(0.75);
    let first_read = buffer.auto_approve_threshold();
    assert!((first_read - 0.75).abs() < 0.01);

    buffer.set_auto_approve_threshold(0.1);
    let second_read = buffer.auto_approve_threshold();
    assert!((second_read - 0.1).abs() < 0.01);
}
/// Out-of-range thresholds should read back as 0.0 (for -0.5) and 1.0
/// (for 2.0), each within 0.01.
#[test]
fn test_auto_approve_threshold_clamped() {
    let buffer = ResponseBuffer::new(Duration::from_secs(10));

    buffer.set_auto_approve_threshold(-0.5);
    let below_range = buffer.auto_approve_threshold();
    assert!((below_range - 0.0).abs() < 0.01);

    buffer.set_auto_approve_threshold(2.0);
    let above_range = buffer.auto_approve_threshold();
    assert!((above_range - 1.0).abs() < 0.01);
}
/// Auto-approve on, threshold 0.5, divergence 0.2: the single entry should be
/// counted as released and then come out of `drain_ready`.
#[tokio::test]
async fn test_auto_release_when_eligible() {
    let buffer = ResponseBuffer::new(Duration::from_secs(60));
    buffer.set_auto_approve(true);
    buffer.set_auto_approve_threshold(0.5);

    let entry = make_entry("a", "propose", "job-1", Duration::from_secs(60));
    buffer.push(entry).await;

    let released_count = buffer.auto_release_if_eligible(Some(0.2)).await;
    assert_eq!(released_count, 1);

    let released = buffer.drain_ready().await;
    assert_eq!(released.len(), 1);
}
/// With auto-approve switched off, a divergence below the threshold should
/// release nothing.
#[tokio::test]
async fn test_auto_release_skipped_when_disabled() {
    let buffer = ResponseBuffer::new(Duration::from_secs(60));
    buffer.set_auto_approve(false);
    buffer.set_auto_approve_threshold(0.5);

    let entry = make_entry("a", "propose", "job-1", Duration::from_secs(60));
    buffer.push(entry).await;

    let released_count = buffer.auto_release_if_eligible(Some(0.2)).await;
    assert_eq!(released_count, 0);

    let released = buffer.drain_ready().await;
    assert!(released.is_empty());
}
/// Threshold 0.3, divergence 0.5: nothing should be auto-released or drained.
#[tokio::test]
async fn test_auto_release_skipped_when_divergence_above_threshold() {
    let buffer = ResponseBuffer::new(Duration::from_secs(60));
    buffer.set_auto_approve(true);
    buffer.set_auto_approve_threshold(0.3);

    let entry = make_entry("a", "propose", "job-1", Duration::from_secs(60));
    buffer.push(entry).await;

    let released_count = buffer.auto_release_if_eligible(Some(0.5)).await;
    assert_eq!(released_count, 0);

    let released = buffer.drain_ready().await;
    assert!(released.is_empty());
}
/// When no divergence value is supplied (`None`) and auto-approve is on, the
/// entry should still be released.
#[tokio::test]
async fn test_auto_release_with_no_divergence_data_trusts_operator() {
    let buffer = ResponseBuffer::new(Duration::from_secs(60));
    buffer.set_auto_approve(true);
    buffer.set_auto_approve_threshold(0.5);

    let entry = make_entry("a", "propose", "job-1", Duration::from_secs(60));
    buffer.push(entry).await;

    let released_count = buffer.auto_release_if_eligible(None).await;
    assert_eq!(released_count, 1);

    let released = buffer.drain_ready().await;
    assert_eq!(released.len(), 1);
}
/// A stopped entry should not be auto-released even when the divergence
/// (0.1) is below the threshold (0.5).
#[tokio::test]
async fn test_auto_release_respects_stopped_flag() {
    let buffer = ResponseBuffer::new(Duration::from_secs(60));
    buffer.set_auto_approve(true);
    buffer.set_auto_approve_threshold(0.5);

    let entry = make_entry("a", "propose", "job-1", Duration::from_secs(60));
    buffer.push(entry).await;

    // The return value of `stop` is deliberately not asserted here.
    let _ = buffer.stop("a").await;

    let released_count = buffer.auto_release_if_eligible(Some(0.1)).await;
    assert_eq!(released_count, 0);

    let released = buffer.drain_ready().await;
    assert!(released.is_empty());
}
/// A divergence exactly equal to the threshold (0.5) should still release the
/// entry.
#[tokio::test]
async fn test_auto_release_at_exact_threshold_releases() {
    let buffer = ResponseBuffer::new(Duration::from_secs(60));
    buffer.set_auto_approve(true);
    buffer.set_auto_approve_threshold(0.5);

    let entry = make_entry("a", "propose", "job-1", Duration::from_secs(60));
    buffer.push(entry).await;

    let released_count = buffer.auto_release_if_eligible(Some(0.5)).await;
    assert_eq!(released_count, 1);
}
/// A divergence just above the threshold (0.51 against 0.5) should release
/// nothing.
#[tokio::test]
async fn test_auto_release_above_threshold_does_not_release() {
    let buffer = ResponseBuffer::new(Duration::from_secs(60));
    buffer.set_auto_approve(true);
    buffer.set_auto_approve_threshold(0.5);

    let entry = make_entry("a", "propose", "job-1", Duration::from_secs(60));
    buffer.push(entry).await;

    let released_count = buffer.auto_release_if_eligible(Some(0.51)).await;
    assert_eq!(released_count, 0);
}
/// Three entries across two jobs should all be auto-released and drained when
/// the divergence (0.2) is below the threshold (0.5).
#[tokio::test]
async fn test_auto_release_multiple_entries() {
    let buffer = ResponseBuffer::new(Duration::from_secs(60));
    buffer.set_auto_approve(true);
    buffer.set_auto_approve_threshold(0.5);

    let fixtures = [
        ("a", "propose", "job-1"),
        ("b", "evaluate", "job-1"),
        ("c", "propose", "job-2"),
    ];
    for (id, action, job) in fixtures {
        buffer
            .push(make_entry(id, action, job, Duration::from_secs(60)))
            .await;
    }

    let released_count = buffer.auto_release_if_eligible(Some(0.2)).await;
    assert_eq!(released_count, 3);

    let released = buffer.drain_ready().await;
    assert_eq!(released.len(), 3);
}
/// Immediately after `push_with_deadline` with a task received "now", the
/// listed `release_in_ms` must not be negative.
#[tokio::test]
async fn test_invariant_release_in_ms_non_negative_after_push() {
    let buffer = ResponseBuffer::new(Duration::from_secs(60));
    buffer.set_response_sla(Duration::from_secs(600));

    let received_at = Instant::now();
    let entry = make_entry("inv-1", "propose", "job-1", Duration::from_secs(60));
    buffer.push_with_deadline(entry, received_at).await;

    let summaries = buffer.list().await;
    assert_eq!(summaries.len(), 1);
    let remaining_ms = summaries[0].release_in_ms;
    assert!(
        remaining_ms >= 0,
        "invariant: buffered item must have release_in_ms >= 0, got {}",
        remaining_ms,
    );
}
/// Simulates a task received 700 s ago against a 600 s response SLA and
/// checks two things: the listed `release_in_ms` is never negative, and the
/// entry drains on the first `drain_ready` call.
///
/// The test returns early (without asserting) on hosts where an `Instant`
/// 700 s in the past cannot be represented.
#[tokio::test]
async fn test_invariant_slow_agent_still_non_negative() {
    let buf = ResponseBuffer::new(Duration::from_secs(60));
    buf.set_response_sla(Duration::from_secs(600));

    // `Instant - Duration` panics when the result is not representable, which
    // can happen when the platform's monotonic clock origin is more recent
    // than the offset (e.g. a freshly booted machine). Use the checked form
    // so that case is an explicit skip instead of a spurious panic.
    let task_received = match Instant::now().checked_sub(Duration::from_secs(700)) {
        Some(instant) => instant,
        None => {
            eprintln!(
                "skipping test_invariant_slow_agent_still_non_negative: \
                 monotonic clock cannot represent an instant 700s in the past"
            );
            return;
        }
    };

    let entry = make_entry("inv-2", "propose", "job-1", Duration::from_secs(60));
    buf.push_with_deadline(entry, task_received).await;

    let list = buf.list().await;
    assert_eq!(list.len(), 1);
    assert!(
        list[0].release_in_ms >= 0,
        "invariant: even for slow agents, release_in_ms must be >= 0, got {}",
        list[0].release_in_ms,
    );

    let drained = buf.drain_ready().await;
    assert_eq!(
        drained.len(),
        1,
        "past-deadline entry should drain immediately"
    );
}
/// While the buffer is paused, `drain_ready` must return nothing and the
/// entry must remain visible through both `len` and `list`.
#[tokio::test]
async fn test_invariant_paused_entry_stays_in_buffer() {
    let buffer = ResponseBuffer::new(Duration::ZERO);
    let entry = make_entry("inv-3", "propose", "job-1", Duration::ZERO);
    buffer.push(entry).await;
    buffer.pause();

    let released = buffer.drain_ready().await;
    assert!(
        released.is_empty(),
        "paused buffer must not drain — entry stays visible in rainfall"
    );
    assert_eq!(buffer.len().await, 1, "entry must remain in buffer");

    let summaries = buffer.list().await;
    assert_eq!(summaries.len(), 1);
}
/// A stopped entry in a zero-hold buffer must not drain and must still be
/// counted by `len`.
#[tokio::test]
async fn test_invariant_stopped_entry_stays_in_buffer() {
    let buffer = ResponseBuffer::new(Duration::ZERO);
    let entry = make_entry("inv-4", "propose", "job-1", Duration::ZERO);
    buffer.push(entry).await;

    // The return value of `stop` is deliberately not asserted here.
    let _ = buffer.stop("inv-4").await;

    let released = buffer.drain_ready().await;
    assert!(
        released.is_empty(),
        "stopped entry must not drain even though release_at passed"
    );
    assert_eq!(buffer.len().await, 1);
}
/// With auto-approve off and a 600 s SLA, an entry pushed with a deadline
/// based on "now" should show a strictly positive `release_in_ms` in both
/// `list` and `get_detail`, and remain buffered.
#[tokio::test]
async fn test_invariant_full_lifecycle_no_surprise_overdue() {
    let buffer = ResponseBuffer::new(Duration::from_secs(60));
    buffer.set_auto_approve(false);
    buffer.set_response_sla(Duration::from_secs(600));

    let received_at = Instant::now();
    let entry = make_entry("inv-5", "propose", "job-1", Duration::from_secs(60));
    buffer.push_with_deadline(entry, received_at).await;

    let listing = buffer.list().await;
    assert!(listing[0].release_in_ms > 0, "snap1: should be positive");

    let detail = buffer.get_detail("inv-5").await.unwrap();
    let remaining_ms = detail.summary.release_in_ms;
    assert!(
        remaining_ms > 0,
        "get_detail: should be positive immediately after push"
    );

    assert_eq!(buffer.len().await, 1, "entry still buffered");
}
/// Two zero-hold entries: stopping the first should let only the second
/// drain; unstopping it should then drain the first and empty the buffer.
#[tokio::test]
async fn test_buffer_stop_and_unstop() {
    let buffer = ResponseBuffer::new(Duration::ZERO);
    for (id, action, job) in [("e1", "propose", "job_1"), ("e2", "evaluate", "job_2")] {
        buffer.push(make_entry(id, action, job, Duration::ZERO)).await;
    }

    // Phase 1: e1 is held back, e2 leaves.
    assert!(buffer.stop("e1").await);
    let first_pass = buffer.drain_ready().await;
    assert_eq!(first_pass.len(), 1, "only e2 should drain");
    assert_eq!(first_pass[0].id, "e2");
    assert_eq!(buffer.len().await, 1, "e1 should still be in buffer");

    // Phase 2: e1 is let go.
    assert!(buffer.unstop("e1").await);
    let second_pass = buffer.drain_ready().await;
    assert_eq!(second_pass.len(), 1, "e1 should drain after unstop");
    assert_eq!(second_pass[0].id, "e1");
    assert!(buffer.is_empty().await);
}
/// With two entries held for 600 s, `mark_for_release` on one should make
/// only that one drainable; marking an unknown id should return `false`.
#[tokio::test]
async fn test_buffer_mark_for_release() {
    let buffer = ResponseBuffer::new(Duration::from_secs(600));
    for (id, action) in [("mr-1", "propose"), ("mr-2", "evaluate")] {
        buffer
            .push(make_entry(id, action, "job-1", Duration::from_secs(600)))
            .await;
    }

    // Nothing is ready before any entry has been marked.
    let before_mark = buffer.drain_ready().await;
    assert!(before_mark.is_empty());

    let marked = buffer.mark_for_release("mr-1").await;
    assert!(marked);

    let released = buffer.drain_ready().await;
    assert_eq!(released.len(), 1);
    assert_eq!(released[0].id, "mr-1");
    assert_eq!(buffer.len().await, 1, "mr-2 should still be held");

    let marked_unknown = buffer.mark_for_release("nonexistent").await;
    assert!(!marked_unknown);
}
/// Spot-checks `compute_divergence` at three points: a low result for
/// (3.0, 0.0), a result within 0.01 of 1.0 for (-2.0, 1.5), and `None` when
/// both inputs are absent.
#[test]
fn test_buffer_compute_divergence() {
    let low = super::compute_divergence(Some(3.0), Some(0.0)).unwrap();
    assert!(
        low < 0.2,
        "strong endorsement should have low divergence, got {}",
        low
    );

    let high = super::compute_divergence(Some(-2.0), Some(1.5)).unwrap();
    assert!(
        (high - 1.0).abs() < 0.01,
        "rejected + high stddev should saturate divergence, got {}",
        high
    );

    let absent = super::compute_divergence(None, None);
    assert!(absent.is_none(), "empty recent scores should return None");
}
/// End-to-end check of `auto_release_if_eligible` with a 0.4 threshold:
/// a low divergence (0.1) should release every non-stopped entry, and a high
/// divergence (0.6) should release nothing.
#[tokio::test]
async fn test_buffer_auto_release_if_eligible() {
    let hold = Duration::from_secs(600);
    let buffer = ResponseBuffer::new(hold);
    buffer.set_auto_approve(true);
    buffer.set_auto_approve_threshold(0.4);

    // ar-2 is stopped before ar-3 is pushed, matching the order the entries arrive in.
    buffer
        .push(make_entry("ar-1", "propose", "job-1", hold))
        .await;
    buffer
        .push(make_entry("ar-2", "evaluate", "job-1", hold))
        .await;
    let _ = buffer.stop("ar-2").await;
    buffer
        .push(make_entry("ar-3", "propose", "job-2", hold))
        .await;

    // Low divergence: both non-stopped entries go out.
    let released_count = buffer.auto_release_if_eligible(Some(0.1)).await;
    assert_eq!(
        released_count, 2,
        "only non-stopped entries should be auto-released"
    );
    let released = buffer.drain_ready().await;
    assert_eq!(released.len(), 2);
    let was_released = |id: &str| released.iter().any(|entry| entry.id == id);
    assert!(was_released("ar-1"));
    assert!(was_released("ar-3"));
    assert!(!was_released("ar-2"));
    assert_eq!(buffer.len().await, 1, "ar-2 should remain (stopped)");

    // High divergence: a fresh entry stays put.
    buffer
        .push(make_entry("ar-4", "propose", "job-3", hold))
        .await;
    let released_when_high = buffer.auto_release_if_eligible(Some(0.6)).await;
    assert_eq!(
        released_when_high, 0,
        "high divergence should not auto-release any entries"
    );
    let drained_when_high = buffer.drain_ready().await;
    assert!(
        drained_when_high.is_empty(),
        "no entries should drain when divergence is above threshold"
    );
}
}