pub mod actions;
pub mod escalation;
pub mod resolve;
pub use escalation::parse_duration;
pub use resolve::status_to_reaction_key;
use crate::{
config::AoConfig,
error::Result,
events::OrchestratorEvent,
notifier::NotifierRegistry,
reactions::{EscalateAfter, ReactionAction, ReactionConfig, ReactionOutcome},
traits::{Runtime, Scm},
types::{Session, SessionId},
};
use escalation::TrackerState;
use resolve::merge_reaction_config;
use std::{
collections::{HashMap, HashSet},
sync::{Arc, Mutex},
time::Instant,
};
use tokio::sync::broadcast;
pub struct ReactionEngine {
config: HashMap<String, ReactionConfig>,
ao_config: Option<Arc<AoConfig>>,
runtime: Arc<dyn Runtime>,
events_tx: broadcast::Sender<OrchestratorEvent>,
trackers: Mutex<HashMap<(SessionId, String), TrackerState>>,
warned_parse_failures: Mutex<HashSet<String>>,
scm: Option<Arc<dyn Scm>>,
notifier_registry: Option<NotifierRegistry>,
}
impl ReactionEngine {
pub fn new(
config: HashMap<String, ReactionConfig>,
runtime: Arc<dyn Runtime>,
events_tx: broadcast::Sender<OrchestratorEvent>,
) -> Self {
Self {
config,
ao_config: None,
runtime,
events_tx,
trackers: Mutex::new(HashMap::new()),
warned_parse_failures: Mutex::new(HashSet::new()),
scm: None,
notifier_registry: None,
}
}
pub fn new_with_config(
ao_config: Arc<AoConfig>,
runtime: Arc<dyn Runtime>,
events_tx: broadcast::Sender<OrchestratorEvent>,
) -> Self {
Self {
config: HashMap::new(),
ao_config: Some(ao_config),
runtime,
events_tx,
trackers: Mutex::new(HashMap::new()),
warned_parse_failures: Mutex::new(HashSet::new()),
scm: None,
notifier_registry: None,
}
}
fn global_reactions(&self) -> &HashMap<String, ReactionConfig> {
self.ao_config
.as_ref()
.map(|c| &c.reactions)
.unwrap_or(&self.config)
}
pub fn resolve_reaction_config(&self, session: &Session, key: &str) -> Option<ReactionConfig> {
let global = self.global_reactions().get(key).cloned();
let project = self
.ao_config
.as_ref()
.and_then(|c| c.projects.get(&session.project_id))
.and_then(|p| p.reactions.get(key))
.cloned();
match (global, project) {
(Some(g), Some(p)) => Some(merge_reaction_config(g, p)),
(Some(g), None) => Some(g),
(None, Some(p)) => Some(p),
(None, None) => None,
}
}
pub fn with_scm(mut self, scm: Arc<dyn Scm>) -> Self {
self.scm = Some(scm);
self
}
pub fn with_notifier_registry(mut self, registry: NotifierRegistry) -> Self {
self.notifier_registry = Some(registry);
self
}
pub async fn dispatch(
&self,
session: &Session,
reaction_key: &str,
) -> Result<Option<ReactionOutcome>> {
let Some(cfg) = self.resolve_reaction_config(session, reaction_key) else {
tracing::debug!(
reaction = reaction_key,
session = %session.id,
"no reaction configured; skipping"
);
return Ok(None);
};
self.dispatch_with_cfg(session, reaction_key, cfg).await
}
pub async fn dispatch_with_message(
&self,
session: &Session,
reaction_key: &str,
message_override: String,
) -> Result<Option<ReactionOutcome>> {
let Some(mut cfg) = self.resolve_reaction_config(session, reaction_key) else {
tracing::debug!(
reaction = reaction_key,
session = %session.id,
"no reaction configured; skipping"
);
return Ok(None);
};
cfg.message = Some(message_override);
self.dispatch_with_cfg(session, reaction_key, cfg).await
}
async fn dispatch_with_cfg(
&self,
session: &Session,
reaction_key: &str,
cfg: crate::reactions::ReactionConfig,
) -> Result<Option<ReactionOutcome>> {
if !cfg.auto {
if cfg.action == ReactionAction::Notify {
let outcome = self
.dispatch_notify(session, reaction_key, &cfg, false)
.await;
return Ok(Some(outcome));
}
tracing::debug!(
reaction = reaction_key,
session = %session.id,
"reaction auto: false; skipping non-notify action"
);
return Ok(None);
}
let duration_gate: Option<std::time::Duration> = match cfg.escalate_after {
Some(EscalateAfter::Duration(ref s)) => match parse_duration(s) {
Some(d) => Some(d),
None => {
self.warn_once_parse_failure(reaction_key, "escalate_after", s);
None
}
},
_ => None,
};
let (attempts, should_escalate) = {
let mut trackers = self.trackers.lock().unwrap_or_else(|e| {
tracing::error!("reaction tracker mutex poisoned; recovering inner state: {e}");
e.into_inner()
});
let entry = trackers
.entry((session.id.clone(), reaction_key.to_string()))
.or_insert_with(|| TrackerState {
attempts: 0,
first_triggered_at: Instant::now(),
});
entry.attempts += 1;
let attempts = entry.attempts;
let max_attempts = cfg.retries;
let mut escalate = max_attempts.is_some_and(|n| attempts > n);
if let Some(EscalateAfter::Attempts(n)) = cfg.escalate_after {
if attempts > n {
escalate = true;
}
} else if let Some(d) = duration_gate {
if entry.first_triggered_at.elapsed() > d {
escalate = true;
}
}
(attempts, escalate)
};
if should_escalate {
self.emit(OrchestratorEvent::ReactionEscalated {
id: session.id.clone(),
reaction_key: reaction_key.to_string(),
attempts,
});
let outcome = self
.dispatch_notify(session, reaction_key, &cfg, true)
.await;
return Ok(Some(outcome));
}
let outcome = match cfg.action {
ReactionAction::SendToAgent => {
self.dispatch_send_to_agent(session, reaction_key, &cfg)
.await
}
ReactionAction::Notify => {
self.dispatch_notify(session, reaction_key, &cfg, false)
.await
}
ReactionAction::AutoMerge => {
self.dispatch_auto_merge(session, reaction_key, &cfg).await
}
};
Ok(Some(outcome))
}
pub fn clear_tracker(&self, session_id: &SessionId, reaction_key: &str) {
let mut trackers = self.trackers.lock().unwrap_or_else(|e| {
tracing::error!("reaction tracker mutex poisoned; recovering inner state: {e}");
e.into_inner()
});
trackers.remove(&(session_id.clone(), reaction_key.to_string()));
}
pub fn clear_all_for_session(&self, session_id: &SessionId) {
let mut trackers = self.trackers.lock().unwrap_or_else(|e| {
tracing::error!("reaction tracker mutex poisoned; recovering inner state: {e}");
e.into_inner()
});
trackers.retain(|(sid, _), _| sid != session_id);
}
pub fn attempts(&self, session_id: &SessionId, reaction_key: &str) -> u32 {
self.trackers
.lock()
.unwrap_or_else(|e| {
tracing::error!("reaction tracker mutex poisoned; recovering inner state: {e}");
e.into_inner()
})
.get(&(session_id.clone(), reaction_key.to_string()))
.map(|t| t.attempts)
.unwrap_or(0)
}
#[cfg(test)]
fn first_triggered_at(&self, session_id: &SessionId, reaction_key: &str) -> Option<Instant> {
self.trackers
.lock()
.unwrap_or_else(|e| {
tracing::error!("reaction tracker mutex poisoned; recovering inner state: {e}");
e.into_inner()
})
.get(&(session_id.clone(), reaction_key.to_string()))
.map(|t| t.first_triggered_at)
}
pub(crate) fn warn_once_parse_failure(&self, reaction_key: &str, field: &str, raw: &str) {
let key = format!("{reaction_key}.{field}");
let mut warned = self.warned_parse_failures.lock().unwrap_or_else(|e| {
tracing::error!(
"reaction warned_parse_failures mutex poisoned; recovering inner state: {e}"
);
e.into_inner()
});
if warned.insert(key) {
tracing::warn!(
reaction = reaction_key,
field = field,
value = raw,
"ignoring unparseable duration string; expected `^\\d+(s|m|h)$`"
);
}
}
fn emit(&self, event: OrchestratorEvent) {
let _ = self.events_tx.send(event);
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
config::{AoConfig, ProjectConfig},
notifier::NotificationPayload,
reactions::{EscalateAfter, EventPriority, ReactionAction, ReactionConfig},
traits::Runtime,
types::{now_ms, ActivityState, Session, SessionId, SessionStatus},
};
use async_trait::async_trait;
use std::path::{Path, PathBuf};
use std::sync::atomic::Ordering;
use std::sync::Mutex as StdMutex;
struct RecordingRuntime {
sends: StdMutex<Vec<(String, String)>>,
fail_send: std::sync::atomic::AtomicBool,
}
impl RecordingRuntime {
fn new() -> Self {
Self {
sends: StdMutex::new(Vec::new()),
fail_send: std::sync::atomic::AtomicBool::new(false),
}
}
fn sends(&self) -> Vec<(String, String)> {
self.sends.lock().unwrap().clone()
}
}
#[async_trait]
impl Runtime for RecordingRuntime {
async fn create(
&self,
_session_id: &str,
_cwd: &Path,
_launch_command: &str,
_env: &[(String, String)],
) -> Result<String> {
Ok("mock-handle".into())
}
async fn send_message(&self, handle: &str, msg: &str) -> Result<()> {
if self.fail_send.load(Ordering::SeqCst) {
return Err(crate::error::AoError::Runtime("mock send failed".into()));
}
self.sends
.lock()
.unwrap()
.push((handle.to_string(), msg.to_string()));
Ok(())
}
async fn is_alive(&self, _handle: &str) -> Result<bool> {
Ok(true)
}
async fn destroy(&self, _handle: &str) -> Result<()> {
Ok(())
}
}
fn fake_session(id: &str) -> Session {
Session {
id: SessionId(id.into()),
project_id: "demo".into(),
status: SessionStatus::CiFailed,
agent: "claude-code".into(),
agent_config: None,
branch: format!("ao-{id}"),
task: "t".into(),
workspace_path: Some(PathBuf::from("/tmp/ws")),
runtime_handle: Some(format!("handle-{id}")),
runtime: "tmux".into(),
activity: Some(ActivityState::Ready),
created_at: now_ms(),
cost: None,
issue_id: None,
issue_url: None,
claimed_pr_number: None,
claimed_pr_url: None,
initial_prompt_override: None,
spawned_by: None,
last_merge_conflict_dispatched: None,
last_review_backlog_fingerprint: None,
}
}
fn minimal_project(reactions: HashMap<String, ReactionConfig>) -> ProjectConfig {
ProjectConfig {
name: None,
repo: "test/test".into(),
path: "/tmp/ao-test-project".into(),
default_branch: "main".into(),
session_prefix: None,
branch_namespace: None,
runtime: None,
agent: None,
workspace: None,
tracker: None,
scm: None,
symlinks: vec![],
post_create: vec![],
agent_config: None,
orchestrator: None,
worker: None,
reactions,
agent_rules: None,
agent_rules_file: None,
orchestrator_rules: None,
orchestrator_session_strategy: None,
opencode_issue_session_strategy: None,
}
}
fn build(
cfg_map: HashMap<String, ReactionConfig>,
) -> (
Arc<ReactionEngine>,
Arc<RecordingRuntime>,
broadcast::Receiver<OrchestratorEvent>,
) {
let runtime = Arc::new(RecordingRuntime::new());
let (tx, rx) = broadcast::channel(32);
let engine = Arc::new(ReactionEngine::new(
cfg_map,
runtime.clone() as Arc<dyn Runtime>,
tx,
));
(engine, runtime, rx)
}
fn drain(rx: &mut broadcast::Receiver<OrchestratorEvent>) -> Vec<OrchestratorEvent> {
let mut out = Vec::new();
while let Ok(e) = rx.try_recv() {
out.push(e);
}
out
}
#[test]
fn resolve_reaction_config_merges_global_and_project() {
let mut global = ReactionConfig::new(ReactionAction::SendToAgent);
global.message = Some("global-msg".into());
global.retries = Some(3);
global.auto = true;
global.priority = Some(EventPriority::Warning);
let mut proj_cfg = ReactionConfig::new(ReactionAction::Notify);
proj_cfg.message = None;
proj_cfg.retries = None;
proj_cfg.auto = false;
proj_cfg.priority = Some(EventPriority::Urgent);
let mut reactions = HashMap::new();
reactions.insert("ci-failed".into(), global);
let mut proj_reactions = HashMap::new();
proj_reactions.insert("ci-failed".into(), proj_cfg);
let mut projects = HashMap::new();
projects.insert("demo".into(), minimal_project(proj_reactions));
let ao = AoConfig {
reactions,
projects,
..Default::default()
};
let (tx, _rx) = broadcast::channel(4);
let engine = ReactionEngine::new_with_config(
Arc::new(ao),
Arc::new(RecordingRuntime::new()) as Arc<dyn Runtime>,
tx,
);
let session = fake_session("s1");
let resolved = engine
.resolve_reaction_config(&session, "ci-failed")
.expect("merged config");
assert_eq!(resolved.action, ReactionAction::Notify);
assert!(!resolved.auto);
assert_eq!(resolved.message.as_deref(), Some("global-msg"));
assert_eq!(resolved.retries, Some(3));
assert_eq!(resolved.priority, Some(EventPriority::Urgent));
}
#[test]
fn resolve_reaction_config_project_only_key() {
let mut proj_cfg = ReactionConfig::new(ReactionAction::Notify);
proj_cfg.message = Some("project-local".into());
let mut proj_reactions = HashMap::new();
proj_reactions.insert("only-in-project".into(), proj_cfg);
let mut projects = HashMap::new();
projects.insert("demo".into(), minimal_project(proj_reactions));
let ao = AoConfig {
projects,
..Default::default()
};
let (tx, _rx) = broadcast::channel(4);
let engine = ReactionEngine::new_with_config(
Arc::new(ao),
Arc::new(RecordingRuntime::new()) as Arc<dyn Runtime>,
tx,
);
let session = fake_session("s1");
let resolved = engine
.resolve_reaction_config(&session, "only-in-project")
.expect("project-only reaction");
assert_eq!(resolved.action, ReactionAction::Notify);
assert_eq!(resolved.message.as_deref(), Some("project-local"));
}
#[tokio::test]
async fn tracker_first_triggered_at_persists_across_dispatches() {
let mut config = ReactionConfig::new(ReactionAction::Notify);
config.message = Some("hi".into());
let mut map = HashMap::new();
map.insert("ci-failed".into(), config);
let (engine, _runtime, _rx) = build(map);
let session = fake_session("s1");
assert_eq!(engine.attempts(&session.id, "ci-failed"), 0);
assert!(engine
.first_triggered_at(&session.id, "ci-failed")
.is_none());
engine.dispatch(&session, "ci-failed").await.unwrap();
assert_eq!(engine.attempts(&session.id, "ci-failed"), 1);
let first = engine
.first_triggered_at(&session.id, "ci-failed")
.expect("first dispatch must populate first_triggered_at");
tokio::time::sleep(std::time::Duration::from_millis(5)).await;
engine.dispatch(&session, "ci-failed").await.unwrap();
assert_eq!(engine.attempts(&session.id, "ci-failed"), 2);
assert_eq!(
engine.first_triggered_at(&session.id, "ci-failed"),
Some(first),
"first_triggered_at must survive subsequent dispatches"
);
}
#[tokio::test]
async fn tracker_first_triggered_at_resets_after_clear() {
let mut config = ReactionConfig::new(ReactionAction::Notify);
config.message = Some("hi".into());
let mut map = HashMap::new();
map.insert("ci-failed".into(), config);
let (engine, _runtime, _rx) = build(map);
let session = fake_session("s1");
engine.dispatch(&session, "ci-failed").await.unwrap();
let first = engine
.first_triggered_at(&session.id, "ci-failed")
.expect("populated");
engine.clear_tracker(&session.id, "ci-failed");
assert_eq!(engine.attempts(&session.id, "ci-failed"), 0);
assert!(engine
.first_triggered_at(&session.id, "ci-failed")
.is_none());
tokio::time::sleep(std::time::Duration::from_millis(5)).await;
engine.dispatch(&session, "ci-failed").await.unwrap();
let second = engine
.first_triggered_at(&session.id, "ci-failed")
.expect("repopulated");
assert!(
second > first,
"second episode must start a fresh first_triggered_at"
);
}
#[tokio::test]
async fn dispatch_unconfigured_key_is_noop() {
let (engine, runtime, mut rx) = build(HashMap::new());
let session = fake_session("s1");
let result = engine.dispatch(&session, "ci-failed").await.unwrap();
assert!(result.is_none());
assert!(runtime.sends().is_empty());
assert!(drain(&mut rx).is_empty());
}
#[tokio::test]
async fn dispatch_send_to_agent_calls_runtime_and_emits_event() {
let mut config = ReactionConfig::new(ReactionAction::SendToAgent);
config.message = Some("CI broke — please fix.".into());
let mut map = HashMap::new();
map.insert("ci-failed".into(), config);
let (engine, runtime, mut rx) = build(map);
let session = fake_session("s1");
let result = engine
.dispatch(&session, "ci-failed")
.await
.unwrap()
.unwrap();
assert!(result.success);
assert!(!result.escalated);
assert_eq!(result.action, ReactionAction::SendToAgent);
assert_eq!(runtime.sends().len(), 1);
assert_eq!(runtime.sends()[0].0, "handle-s1");
assert_eq!(runtime.sends()[0].1, "CI broke — please fix.");
let events = drain(&mut rx);
assert_eq!(events.len(), 2, "got {events:?}");
assert!(events.iter().any(|e| matches!(
e,
OrchestratorEvent::ReactionTriggered {
reaction_key,
action: ReactionAction::SendToAgent,
..
} if reaction_key == "ci-failed"
)));
assert!(events.iter().any(|e| matches!(
e,
OrchestratorEvent::UiNotification { notification } if notification.reaction_key == "ci-failed"
)));
}
#[tokio::test]
async fn dispatch_send_to_agent_without_message_fails_softly() {
let config = ReactionConfig::new(ReactionAction::SendToAgent); let mut map = HashMap::new();
map.insert("ci-failed".into(), config);
let (engine, runtime, mut rx) = build(map);
let session = fake_session("s1");
let result = engine
.dispatch(&session, "ci-failed")
.await
.unwrap()
.unwrap();
assert!(!result.success);
assert!(runtime.sends().is_empty());
assert!(drain(&mut rx).is_empty());
}
#[tokio::test]
async fn dispatch_send_to_agent_propagates_runtime_send_failure_as_soft_failure() {
let mut config = ReactionConfig::new(ReactionAction::SendToAgent);
config.message = Some("fix it".into());
let mut map = HashMap::new();
map.insert("ci-failed".into(), config);
let (engine, runtime, mut rx) = build(map);
runtime.fail_send.store(true, Ordering::SeqCst);
let session = fake_session("s1");
let result = engine
.dispatch(&session, "ci-failed")
.await
.unwrap()
.unwrap();
assert!(!result.success);
assert_eq!(engine.attempts(&session.id, "ci-failed"), 1);
assert!(drain(&mut rx).is_empty());
}
#[tokio::test]
async fn dispatch_notify_emits_event_and_succeeds() {
let mut config = ReactionConfig::new(ReactionAction::Notify);
config.message = Some("approved & green".into());
let mut map = HashMap::new();
map.insert("approved-and-green".into(), config);
let (engine, runtime, mut rx) = build(map);
let mut session = fake_session("s1");
session.status = SessionStatus::Mergeable;
let result = engine
.dispatch(&session, "approved-and-green")
.await
.unwrap()
.unwrap();
assert!(result.success);
assert_eq!(result.action, ReactionAction::Notify);
assert!(runtime.sends().is_empty());
let events = drain(&mut rx);
assert_eq!(events.len(), 2, "got {events:?}");
assert!(events.iter().any(|e| matches!(
e,
OrchestratorEvent::ReactionTriggered {
action: ReactionAction::Notify,
..
}
)));
assert!(events.iter().any(|e| matches!(
e,
OrchestratorEvent::UiNotification { notification } if notification.action == ReactionAction::Notify
)));
}
#[tokio::test]
async fn dispatch_auto_merge_without_scm_falls_back_to_phase_d_behaviour() {
let config = ReactionConfig::new(ReactionAction::AutoMerge);
let mut map = HashMap::new();
map.insert("approved-and-green".into(), config);
let (engine, _runtime, mut rx) = build(map);
let mut session = fake_session("s1");
session.status = SessionStatus::Mergeable;
let result = engine
.dispatch(&session, "approved-and-green")
.await
.unwrap()
.unwrap();
assert!(result.success);
assert_eq!(result.action, ReactionAction::AutoMerge);
let events = drain(&mut rx);
assert_eq!(events.len(), 1);
assert!(matches!(
&events[0],
OrchestratorEvent::ReactionTriggered {
action: ReactionAction::AutoMerge,
..
}
));
}
use crate::scm::{
CheckRun, CiStatus, MergeMethod, MergeReadiness, PrState, PullRequest, Review,
ReviewComment, ReviewDecision,
};
struct MergeMockScm {
pr: StdMutex<Option<PullRequest>>,
readiness: StdMutex<MergeReadiness>,
merge_calls: StdMutex<Vec<(u32, Option<MergeMethod>)>>,
detect_pr_errors: std::sync::atomic::AtomicBool,
merge_errors: std::sync::atomic::AtomicBool,
}
impl MergeMockScm {
fn new(pr: Option<PullRequest>, readiness: MergeReadiness) -> Self {
Self {
pr: StdMutex::new(pr),
readiness: StdMutex::new(readiness),
merge_calls: StdMutex::new(Vec::new()),
detect_pr_errors: std::sync::atomic::AtomicBool::new(false),
merge_errors: std::sync::atomic::AtomicBool::new(false),
}
}
fn merges(&self) -> Vec<(u32, Option<MergeMethod>)> {
self.merge_calls.lock().unwrap().clone()
}
}
#[async_trait]
impl Scm for MergeMockScm {
fn name(&self) -> &str {
"merge-mock"
}
async fn detect_pr(&self, _session: &Session) -> Result<Option<PullRequest>> {
if self.detect_pr_errors.load(Ordering::SeqCst) {
return Err(crate::error::AoError::Runtime("detect_pr".into()));
}
Ok(self.pr.lock().unwrap().clone())
}
async fn pr_state(&self, _pr: &PullRequest) -> Result<PrState> {
Ok(PrState::Open)
}
async fn ci_checks(&self, _pr: &PullRequest) -> Result<Vec<CheckRun>> {
Ok(vec![])
}
async fn ci_status(&self, _pr: &PullRequest) -> Result<CiStatus> {
Ok(CiStatus::Passing)
}
async fn reviews(&self, _pr: &PullRequest) -> Result<Vec<Review>> {
Ok(vec![])
}
async fn review_decision(&self, _pr: &PullRequest) -> Result<ReviewDecision> {
Ok(ReviewDecision::Approved)
}
async fn pending_comments(&self, _pr: &PullRequest) -> Result<Vec<ReviewComment>> {
Ok(vec![])
}
async fn mergeability(&self, _pr: &PullRequest) -> Result<MergeReadiness> {
Ok(self.readiness.lock().unwrap().clone())
}
async fn merge(&self, pr: &PullRequest, method: Option<MergeMethod>) -> Result<()> {
if self.merge_errors.load(Ordering::SeqCst) {
return Err(crate::error::AoError::Runtime("merge failed".into()));
}
self.merge_calls.lock().unwrap().push((pr.number, method));
Ok(())
}
}
fn ready_readiness() -> MergeReadiness {
MergeReadiness {
mergeable: true,
ci_passing: true,
approved: true,
no_conflicts: true,
blockers: vec![],
}
}
fn fake_pr(number: u32) -> PullRequest {
PullRequest {
number,
url: format!("https://github.com/acme/widgets/pull/{number}"),
title: "fix the widgets".into(),
owner: "acme".into(),
repo: "widgets".into(),
branch: "ao-s1".into(),
base_branch: "main".into(),
is_draft: false,
}
}
fn build_with_scm(
cfg_map: HashMap<String, ReactionConfig>,
scm: Arc<dyn Scm>,
) -> (
Arc<ReactionEngine>,
Arc<RecordingRuntime>,
broadcast::Receiver<OrchestratorEvent>,
) {
let runtime = Arc::new(RecordingRuntime::new());
let (tx, rx) = broadcast::channel(32);
let engine = Arc::new(
ReactionEngine::new(cfg_map, runtime.clone() as Arc<dyn Runtime>, tx).with_scm(scm),
);
(engine, runtime, rx)
}
#[tokio::test]
async fn dispatch_auto_merge_with_ready_pr_calls_scm_merge() {
let config = ReactionConfig::new(ReactionAction::AutoMerge);
let mut map = HashMap::new();
map.insert("approved-and-green".into(), config);
let scm = Arc::new(MergeMockScm::new(Some(fake_pr(42)), ready_readiness()));
let (engine, _runtime, mut rx) = build_with_scm(map, scm.clone() as Arc<dyn Scm>);
let mut session = fake_session("s1");
session.status = SessionStatus::Mergeable;
let result = engine
.dispatch(&session, "approved-and-green")
.await
.unwrap()
.unwrap();
assert!(result.success);
assert_eq!(result.action, ReactionAction::AutoMerge);
assert_eq!(scm.merges().len(), 1, "expected one merge call");
assert_eq!(scm.merges()[0], (42, None));
assert!(result.message.unwrap().contains("merged PR #42"));
let events = drain(&mut rx);
assert_eq!(events.len(), 1);
assert!(matches!(
&events[0],
OrchestratorEvent::ReactionTriggered {
action: ReactionAction::AutoMerge,
..
}
));
}
#[tokio::test]
async fn dispatch_auto_merge_with_stale_green_observation_does_not_merge() {
let config = ReactionConfig::new(ReactionAction::AutoMerge);
let mut map = HashMap::new();
map.insert("approved-and-green".into(), config);
let stale = MergeReadiness {
mergeable: false,
ci_passing: false,
approved: true,
no_conflicts: true,
blockers: vec!["CI is failing".into()],
};
let scm = Arc::new(MergeMockScm::new(Some(fake_pr(42)), stale));
let (engine, _runtime, mut rx) = build_with_scm(map, scm.clone() as Arc<dyn Scm>);
let mut session = fake_session("s1");
session.status = SessionStatus::Mergeable;
let result = engine
.dispatch(&session, "approved-and-green")
.await
.unwrap()
.unwrap();
assert!(!result.success, "stale observation must not merge");
assert!(scm.merges().is_empty(), "Scm::merge must not be called");
let events = drain(&mut rx);
assert!(
events.is_empty(),
"stale-green skip must not emit events, got {events:?}"
);
}
#[tokio::test]
async fn dispatch_auto_merge_with_no_pr_returns_soft_failure() {
let config = ReactionConfig::new(ReactionAction::AutoMerge);
let mut map = HashMap::new();
map.insert("approved-and-green".into(), config);
let scm = Arc::new(MergeMockScm::new(None, ready_readiness()));
let (engine, _runtime, mut rx) = build_with_scm(map, scm.clone() as Arc<dyn Scm>);
let mut session = fake_session("s1");
session.status = SessionStatus::Mergeable;
let result = engine
.dispatch(&session, "approved-and-green")
.await
.unwrap()
.unwrap();
assert!(!result.success);
assert!(scm.merges().is_empty());
let events = drain(&mut rx);
assert!(
events.is_empty(),
"no-PR skip must not emit events, got {events:?}"
);
}
#[tokio::test]
async fn dispatch_auto_merge_with_detect_pr_error_returns_soft_failure() {
let config = ReactionConfig::new(ReactionAction::AutoMerge);
let mut map = HashMap::new();
map.insert("approved-and-green".into(), config);
let scm = Arc::new(MergeMockScm::new(Some(fake_pr(42)), ready_readiness()));
scm.detect_pr_errors.store(true, Ordering::SeqCst);
let (engine, _runtime, mut rx) = build_with_scm(map, scm.clone() as Arc<dyn Scm>);
let mut session = fake_session("s1");
session.status = SessionStatus::Mergeable;
let result = engine
.dispatch(&session, "approved-and-green")
.await
.unwrap()
.unwrap();
assert!(!result.success);
assert!(scm.merges().is_empty(), "merge must not run on detect err");
let events = drain(&mut rx);
assert!(
events.is_empty(),
"detect_pr error must not emit events, got {events:?}"
);
}
#[tokio::test]
async fn dispatch_auto_merge_propagates_merge_error_as_soft_failure() {
let config = ReactionConfig::new(ReactionAction::AutoMerge);
let mut map = HashMap::new();
map.insert("approved-and-green".into(), config);
let scm = Arc::new(MergeMockScm::new(Some(fake_pr(42)), ready_readiness()));
scm.merge_errors.store(true, Ordering::SeqCst);
let (engine, _runtime, _rx) = build_with_scm(map, scm.clone() as Arc<dyn Scm>);
let mut session = fake_session("s1");
session.status = SessionStatus::Mergeable;
let result = engine
.dispatch(&session, "approved-and-green")
.await
.unwrap()
.unwrap();
assert!(!result.success);
assert!(
result.message.unwrap().contains("merge failed"),
"error message should surface"
);
}
#[tokio::test]
async fn dispatch_auto_false_skips_active_actions_but_allows_notify() {
let mut sta = ReactionConfig::new(ReactionAction::SendToAgent);
sta.auto = false;
sta.message = Some("noop".into());
let mut map = HashMap::new();
map.insert("ci-failed".into(), sta);
let mut notify = ReactionConfig::new(ReactionAction::Notify);
notify.auto = false;
map.insert("approved-and-green".into(), notify);
let (engine, runtime, mut rx) = build(map);
let s1 = fake_session("s1");
assert!(engine.dispatch(&s1, "ci-failed").await.unwrap().is_none());
assert!(runtime.sends().is_empty());
assert!(drain(&mut rx).is_empty());
let mut s2 = fake_session("s2");
s2.status = SessionStatus::Mergeable;
let result = engine
.dispatch(&s2, "approved-and-green")
.await
.unwrap()
.unwrap();
assert!(result.success);
assert_eq!(result.action, ReactionAction::Notify);
}
#[tokio::test]
async fn retries_exhausted_escalates_to_notify_and_emits_both_events() {
let mut config = ReactionConfig::new(ReactionAction::SendToAgent);
config.message = Some("fix".into());
config.retries = Some(2);
let mut map = HashMap::new();
map.insert("ci-failed".into(), config);
let (engine, runtime, mut rx) = build(map);
let session = fake_session("s1");
let r1 = engine
.dispatch(&session, "ci-failed")
.await
.unwrap()
.unwrap();
assert!(r1.success);
assert!(!r1.escalated);
let r2 = engine
.dispatch(&session, "ci-failed")
.await
.unwrap()
.unwrap();
assert!(r2.success);
assert!(!r2.escalated);
assert_eq!(runtime.sends().len(), 2);
let r3 = engine
.dispatch(&session, "ci-failed")
.await
.unwrap()
.unwrap();
assert!(r3.escalated);
assert_eq!(r3.action, ReactionAction::Notify);
assert_eq!(runtime.sends().len(), 2);
let events = drain(&mut rx);
assert_eq!(events.len(), 7, "got {events:?}");
let escalated_count = events
.iter()
.filter(|e| matches!(e, OrchestratorEvent::ReactionEscalated { .. }))
.count();
assert_eq!(escalated_count, 1);
assert!(events.iter().any(|e| matches!(
e,
OrchestratorEvent::ReactionTriggered {
action: ReactionAction::Notify,
..
}
)));
assert!(matches!(
events.last().unwrap(),
OrchestratorEvent::UiNotification { .. }
));
}
#[tokio::test]
async fn escalate_after_attempts_escalates_independently_of_retries() {
let mut config = ReactionConfig::new(ReactionAction::SendToAgent);
config.message = Some("fix".into());
config.escalate_after = Some(EscalateAfter::Attempts(1));
let mut map = HashMap::new();
map.insert("ci-failed".into(), config);
let (engine, runtime, _rx) = build(map);
let session = fake_session("s1");
let r1 = engine
.dispatch(&session, "ci-failed")
.await
.unwrap()
.unwrap();
assert!(!r1.escalated);
assert_eq!(runtime.sends().len(), 1);
let r2 = engine
.dispatch(&session, "ci-failed")
.await
.unwrap()
.unwrap();
assert!(r2.escalated);
assert_eq!(runtime.sends().len(), 1);
}
#[tokio::test]
async fn escalate_after_duration_does_not_fire_before_elapsed() {
let mut config = ReactionConfig::new(ReactionAction::SendToAgent);
config.message = Some("fix".into());
config.escalate_after = Some(EscalateAfter::Duration("10m".into()));
let mut map = HashMap::new();
map.insert("ci-failed".into(), config);
let (engine, runtime, _rx) = build(map);
let session = fake_session("s1");
for _ in 0..5 {
let r = engine
.dispatch(&session, "ci-failed")
.await
.unwrap()
.unwrap();
assert!(!r.escalated);
}
assert_eq!(runtime.sends().len(), 5);
}
#[tokio::test]
async fn escalate_after_duration_fires_once_elapsed_exceeds_threshold() {
let mut config = ReactionConfig::new(ReactionAction::Notify);
config.message = Some("stuck".into());
config.retries = None; config.escalate_after = Some(EscalateAfter::Duration("1s".into()));
let mut map = HashMap::new();
map.insert("agent-stuck".into(), config);
let (engine, _runtime, mut rx) = build(map);
let mut session = fake_session("s1");
session.status = SessionStatus::Working;
let first = engine
.dispatch(&session, "agent-stuck")
.await
.unwrap()
.unwrap();
assert!(!first.escalated);
{
let mut trackers = engine.trackers.lock().unwrap();
let key = (session.id.clone(), "agent-stuck".to_string());
let entry = trackers.get_mut(&key).expect("tracker populated");
entry.first_triggered_at = Instant::now()
.checked_sub(std::time::Duration::from_secs(2))
.expect("monotonic clock has been running >2s");
}
let second = engine
.dispatch(&session, "agent-stuck")
.await
.unwrap()
.unwrap();
assert!(second.escalated, "duration gate should have fired");
assert_eq!(second.action, ReactionAction::Notify);
let events = drain(&mut rx);
assert!(
events
.iter()
.any(|e| matches!(e, OrchestratorEvent::ReactionEscalated { .. })),
"expected ReactionEscalated, got {events:?}"
);
}
#[tokio::test]
async fn escalate_after_duration_with_garbage_string_logs_once_and_retries_gate_still_fires() {
let mut config = ReactionConfig::new(ReactionAction::Notify);
config.message = Some("stuck".into());
config.retries = Some(2);
config.escalate_after = Some(EscalateAfter::Duration("ten minutes".into()));
let mut map = HashMap::new();
map.insert("agent-stuck".into(), config);
let (engine, _runtime, _rx) = build(map);
let mut session = fake_session("s1");
session.status = SessionStatus::Working;
let r1 = engine
.dispatch(&session, "agent-stuck")
.await
.unwrap()
.unwrap();
assert!(!r1.escalated);
let r2 = engine
.dispatch(&session, "agent-stuck")
.await
.unwrap()
.unwrap();
assert!(!r2.escalated);
let r3 = engine
.dispatch(&session, "agent-stuck")
.await
.unwrap()
.unwrap();
assert!(
r3.escalated,
"retries gate must still fire even when escalate_after is garbage"
);
let warned = engine.warned_parse_failures.lock().unwrap();
assert!(warned.contains("agent-stuck.escalate_after"));
assert_eq!(
warned.len(),
1,
"only one warn should be recorded across 3 dispatches"
);
}
#[tokio::test]
async fn warn_once_parse_failure_is_idempotent_per_key() {
let (engine, _runtime, _rx) = build(HashMap::new());
engine.warn_once_parse_failure("agent-stuck", "threshold", "ten");
engine.warn_once_parse_failure("agent-stuck", "threshold", "eleven");
engine.warn_once_parse_failure("agent-stuck", "threshold", "twelve");
engine.warn_once_parse_failure("agent-stuck", "escalate_after", "frob");
let warned = engine.warned_parse_failures.lock().unwrap();
assert_eq!(warned.len(), 2);
assert!(warned.contains("agent-stuck.threshold"));
assert!(warned.contains("agent-stuck.escalate_after"));
}
#[tokio::test]
async fn clear_tracker_after_escalation_restores_real_action() {
let mut config = ReactionConfig::new(ReactionAction::SendToAgent);
config.message = Some("fix".into());
config.retries = Some(1);
let mut map = HashMap::new();
map.insert("ci-failed".into(), config);
let (engine, runtime, _rx) = build(map);
let session = fake_session("s1");
let r1 = engine
.dispatch(&session, "ci-failed")
.await
.unwrap()
.unwrap();
assert!(!r1.escalated);
let r2 = engine
.dispatch(&session, "ci-failed")
.await
.unwrap()
.unwrap();
assert!(r2.escalated);
assert_eq!(runtime.sends().len(), 1);
engine.clear_tracker(&session.id, "ci-failed");
let r3 = engine
.dispatch(&session, "ci-failed")
.await
.unwrap()
.unwrap();
assert!(r3.success);
assert!(!r3.escalated);
assert_eq!(r3.action, ReactionAction::SendToAgent);
assert_eq!(runtime.sends().len(), 2);
}
#[tokio::test]
async fn clear_all_for_session_drops_every_reaction_tracker() {
let mut ci = ReactionConfig::new(ReactionAction::SendToAgent);
ci.message = Some("fix".into());
let mut cr = ReactionConfig::new(ReactionAction::SendToAgent);
cr.message = Some("review".into());
let mut map = HashMap::new();
map.insert("ci-failed".into(), ci);
map.insert("changes-requested".into(), cr);
let (engine, _runtime, _rx) = build(map);
let a = fake_session("a");
let b = fake_session("b");
engine.dispatch(&a, "ci-failed").await.unwrap();
engine.dispatch(&a, "changes-requested").await.unwrap();
engine.dispatch(&b, "ci-failed").await.unwrap();
assert_eq!(engine.attempts(&a.id, "ci-failed"), 1);
assert_eq!(engine.attempts(&a.id, "changes-requested"), 1);
assert_eq!(engine.attempts(&b.id, "ci-failed"), 1);
engine.clear_all_for_session(&a.id);
assert_eq!(engine.attempts(&a.id, "ci-failed"), 0);
assert_eq!(engine.attempts(&a.id, "changes-requested"), 0);
assert_eq!(engine.attempts(&b.id, "ci-failed"), 1);
}
#[tokio::test]
async fn auto_false_notify_fires_once_per_transition_and_does_not_escalate() {
let mut cfg = ReactionConfig::new(ReactionAction::Notify);
cfg.auto = false;
cfg.retries = Some(0); let mut map = HashMap::new();
map.insert("approved-and-green".into(), cfg);
let (engine, _runtime, mut rx) = build(map);
let mut session = fake_session("s1");
session.status = SessionStatus::Mergeable;
for _ in 0..2 {
let r = engine
.dispatch(&session, "approved-and-green")
.await
.unwrap()
.unwrap();
assert!(r.success);
assert!(!r.escalated);
assert_eq!(r.action, ReactionAction::Notify);
}
assert_eq!(engine.attempts(&session.id, "approved-and-green"), 0);
let events = drain(&mut rx);
assert!(
!events
.iter()
.any(|e| matches!(e, OrchestratorEvent::ReactionEscalated { .. })),
"auto:false notify must not escalate, got {events:?}"
);
}
#[tokio::test]
async fn clear_tracker_resets_attempts_for_next_transition() {
let mut config = ReactionConfig::new(ReactionAction::SendToAgent);
config.message = Some("fix".into());
config.retries = Some(1);
let mut map = HashMap::new();
map.insert("ci-failed".into(), config);
let (engine, _runtime, _rx) = build(map);
let session = fake_session("s1");
engine.dispatch(&session, "ci-failed").await.unwrap();
assert_eq!(engine.attempts(&session.id, "ci-failed"), 1);
engine.clear_tracker(&session.id, "ci-failed");
assert_eq!(engine.attempts(&session.id, "ci-failed"), 0);
let r = engine
.dispatch(&session, "ci-failed")
.await
.unwrap()
.unwrap();
assert!(r.success);
assert!(!r.escalated);
}
#[tokio::test]
async fn trackers_are_scoped_per_reaction_key() {
let mut ci = ReactionConfig::new(ReactionAction::SendToAgent);
ci.message = Some("fix ci".into());
let mut cr = ReactionConfig::new(ReactionAction::SendToAgent);
cr.message = Some("address review".into());
let mut map = HashMap::new();
map.insert("ci-failed".into(), ci);
map.insert("changes-requested".into(), cr);
let (engine, _runtime, _rx) = build(map);
let session = fake_session("s1");
engine.dispatch(&session, "ci-failed").await.unwrap();
engine.dispatch(&session, "ci-failed").await.unwrap();
engine
.dispatch(&session, "changes-requested")
.await
.unwrap();
assert_eq!(engine.attempts(&session.id, "ci-failed"), 2);
assert_eq!(engine.attempts(&session.id, "changes-requested"), 1);
}
#[tokio::test]
async fn trackers_are_scoped_per_session_id() {
let mut cfg = ReactionConfig::new(ReactionAction::SendToAgent);
cfg.message = Some("fix".into());
let mut map = HashMap::new();
map.insert("ci-failed".into(), cfg);
let (engine, _runtime, _rx) = build(map);
let a = fake_session("a");
let b = fake_session("b");
engine.dispatch(&a, "ci-failed").await.unwrap();
engine.dispatch(&a, "ci-failed").await.unwrap();
engine.dispatch(&b, "ci-failed").await.unwrap();
assert_eq!(engine.attempts(&a.id, "ci-failed"), 2);
assert_eq!(engine.attempts(&b.id, "ci-failed"), 1);
}
use crate::notifier::{tests::TestNotifier, NotificationRouting, NotifierRegistry};
fn build_with_notifier(
cfg_map: HashMap<String, ReactionConfig>,
registry: NotifierRegistry,
) -> (
Arc<ReactionEngine>,
Arc<RecordingRuntime>,
broadcast::Receiver<OrchestratorEvent>,
) {
let runtime = Arc::new(RecordingRuntime::new());
let (tx, rx) = broadcast::channel(32);
let engine = Arc::new(
ReactionEngine::new(cfg_map, runtime.clone() as Arc<dyn Runtime>, tx)
.with_notifier_registry(registry),
);
(engine, runtime, rx)
}
#[tokio::test]
async fn dispatch_notify_without_registry_unchanged() {
let mut config = ReactionConfig::new(ReactionAction::Notify);
config.message = Some("approved".into());
let mut map = HashMap::new();
map.insert("approved-and-green".into(), config);
let (engine, _runtime, mut rx) = build(map);
let mut session = fake_session("s1");
session.status = SessionStatus::Mergeable;
let result = engine
.dispatch(&session, "approved-and-green")
.await
.unwrap()
.unwrap();
assert!(result.success);
assert_eq!(result.action, ReactionAction::Notify);
assert!(!result.escalated);
assert_eq!(result.message.as_deref(), Some("approved"));
let events = drain(&mut rx);
assert_eq!(events.len(), 2, "got {events:?}");
assert!(events.iter().any(|e| matches!(
e,
OrchestratorEvent::ReactionTriggered {
action: ReactionAction::Notify,
..
}
)));
assert!(events.iter().any(|e| matches!(
e,
OrchestratorEvent::UiNotification { notification } if notification.action == ReactionAction::Notify
)));
}
#[tokio::test]
async fn dispatch_notify_with_empty_routing_is_success() {
let registry = NotifierRegistry::new(NotificationRouting::default());
let config = ReactionConfig::new(ReactionAction::Notify);
let mut map = HashMap::new();
map.insert("approved-and-green".into(), config);
let (engine, _runtime, mut rx) = build_with_notifier(map, registry);
let mut session = fake_session("s1");
session.status = SessionStatus::Mergeable;
let result = engine
.dispatch(&session, "approved-and-green")
.await
.unwrap()
.unwrap();
assert!(result.success);
assert!(!result.escalated);
let events = drain(&mut rx);
assert!(events
.iter()
.any(|e| matches!(e, OrchestratorEvent::ReactionTriggered { .. })));
}
#[tokio::test]
async fn dispatch_notify_routes_to_single_plugin() {
let mut routing = HashMap::new();
routing.insert(EventPriority::Action, vec!["test".to_string()]);
let (tn, received) = TestNotifier::new("test");
let mut registry = NotifierRegistry::new(NotificationRouting::from_map(routing));
registry.register("test", Arc::new(tn));
let mut config = ReactionConfig::new(ReactionAction::Notify);
config.message = Some("PR merged".into());
let mut map = HashMap::new();
map.insert("approved-and-green".into(), config);
let (engine, _runtime, _rx) = build_with_notifier(map, registry);
let mut session = fake_session("s1");
session.status = SessionStatus::Mergeable;
let result = engine
.dispatch(&session, "approved-and-green")
.await
.unwrap()
.unwrap();
assert!(result.success);
assert_eq!(result.message.as_deref(), Some("PR merged"));
let payloads = received.lock().unwrap();
assert_eq!(payloads.len(), 1);
assert_eq!(payloads[0].reaction_key, "approved-and-green");
assert_eq!(payloads[0].priority, EventPriority::Action);
assert_eq!(payloads[0].body, "PR merged");
assert!(!payloads[0].escalated);
}
#[tokio::test]
async fn dispatch_notify_fan_out_reports_partial_failure() {
use crate::notifier::NotifierError;
struct FailNotifier;
#[async_trait::async_trait]
impl crate::notifier::Notifier for FailNotifier {
fn name(&self) -> &str {
"fail"
}
async fn send(
&self,
_payload: &NotificationPayload,
) -> std::result::Result<(), NotifierError> {
Err(NotifierError::Unavailable("offline".into()))
}
}
let mut routing = HashMap::new();
routing.insert(
EventPriority::Urgent,
vec!["ok-plugin".to_string(), "fail".to_string()],
);
let (tn, received) = TestNotifier::new("ok-plugin");
let mut registry = NotifierRegistry::new(NotificationRouting::from_map(routing));
registry.register("ok-plugin", Arc::new(tn));
registry.register("fail", Arc::new(FailNotifier));
let mut config = ReactionConfig::new(ReactionAction::Notify);
config.message = Some("something".into());
let mut map = HashMap::new();
map.insert("agent-stuck".into(), config);
let (engine, _runtime, _rx) = build_with_notifier(map, registry);
let mut session = fake_session("s1");
session.status = SessionStatus::Stuck;
let result = engine
.dispatch(&session, "agent-stuck")
.await
.unwrap()
.unwrap();
assert!(!result.success);
let msg = result.message.unwrap();
assert!(
msg.contains("fail"),
"error message should name the failing notifier, got: {msg}"
);
let payloads = received.lock().unwrap();
assert_eq!(payloads.len(), 1);
assert_eq!(payloads[0].reaction_key, "agent-stuck");
}
#[tokio::test]
async fn escalation_routes_through_notifier_registry() {
let mut routing = HashMap::new();
routing.insert(EventPriority::Urgent, vec!["test".to_string()]);
let (tn, received) = TestNotifier::new("test");
let mut registry = NotifierRegistry::new(NotificationRouting::from_map(routing));
registry.register("test", Arc::new(tn));
let mut config = ReactionConfig::new(ReactionAction::SendToAgent);
config.message = Some("fix ci".into());
config.retries = Some(1);
let mut map = HashMap::new();
map.insert("ci-failed".into(), config);
let (engine, _runtime, mut rx) = build_with_notifier(map, registry);
let session = fake_session("s1");
let r1 = engine
.dispatch(&session, "ci-failed")
.await
.unwrap()
.unwrap();
assert!(!r1.escalated);
let r2 = engine
.dispatch(&session, "ci-failed")
.await
.unwrap()
.unwrap();
assert!(r2.escalated);
assert_eq!(r2.action, ReactionAction::Notify);
let payloads = received.lock().unwrap();
assert_eq!(payloads.len(), 1);
assert!(payloads[0].escalated);
assert_eq!(payloads[0].reaction_key, "ci-failed");
assert_eq!(payloads[0].priority, EventPriority::Urgent);
let events = drain(&mut rx);
assert!(events.iter().any(|e| matches!(
e,
OrchestratorEvent::ReactionEscalated {
reaction_key,
..
} if reaction_key == "ci-failed"
)));
assert!(events.iter().any(|e| matches!(
e,
OrchestratorEvent::ReactionTriggered {
action: ReactionAction::Notify,
..
}
)));
}
#[tokio::test]
async fn auto_false_notify_still_routes_through_registry() {
let mut routing = HashMap::new();
routing.insert(EventPriority::Action, vec!["test".to_string()]);
let (tn, received) = TestNotifier::new("test");
let mut registry = NotifierRegistry::new(NotificationRouting::from_map(routing));
registry.register("test", Arc::new(tn));
let mut config = ReactionConfig::new(ReactionAction::Notify);
config.auto = false;
config.message = Some("fyi".into());
let mut map = HashMap::new();
map.insert("approved-and-green".into(), config);
let (engine, _runtime, _rx) = build_with_notifier(map, registry);
let mut session = fake_session("s1");
session.status = SessionStatus::Mergeable;
let result = engine
.dispatch(&session, "approved-and-green")
.await
.unwrap()
.unwrap();
assert!(result.success);
assert!(!result.escalated);
let payloads = received.lock().unwrap();
assert_eq!(payloads.len(), 1);
assert_eq!(payloads[0].body, "fyi");
}
}