1use std::time::{Duration, Instant};
2
3use anyhow::Result;
4use tracing::{debug, info, warn};
5
6use super::config::RoleType;
7use super::daemon::TeamDaemon;
8use super::errors::DeliveryError;
9use super::inbox;
10use super::message;
11use super::retry::{RetryConfig, retry_sync};
12use crate::tmux;
13
14pub(super) const DELIVERY_VERIFICATION_CAPTURE_LINES: u32 = 50;
15pub(super) const DELIVERY_VERIFICATION_CAPTURE_LINES_RECENTLY_READY: u32 = 100;
18pub(super) const FAILED_DELIVERY_RETRY_DELAY: Duration = Duration::from_secs(30);
19pub(super) const FAILED_DELIVERY_MAX_ATTEMPTS: u32 = 3;
20const TELEGRAM_DELIVERY_CIRCUIT_BREAKER_THRESHOLD: u32 = 5;
21const TELEGRAM_DELIVERY_CIRCUIT_BREAKER_COOLDOWN: Duration = Duration::from_secs(300);
22
23pub(super) fn is_agent_ready(pane_id: &str) -> bool {
26 match tmux::capture_pane_recent(pane_id, 20) {
27 Ok(capture) => super::watcher::is_at_agent_prompt(&capture),
28 Err(_) => false,
29 }
30}
31
32#[derive(Debug, Clone)]
33pub(super) struct PendingMessage {
34 pub(super) from: String,
35 pub(super) body: String,
36 #[allow(dead_code)] pub(super) queued_at: Instant,
38}
39
40#[derive(Debug, Clone)]
41pub(super) struct FailedDelivery {
42 pub(super) recipient: String,
43 pub(super) from: String,
44 pub(super) body: String,
45 pub(super) attempts: u32,
46 pub(super) last_attempt: Instant,
47}
48
49impl FailedDelivery {
50 pub(super) fn new(recipient: &str, from: &str, body: &str) -> Self {
51 Self {
52 recipient: recipient.to_string(),
53 from: from.to_string(),
54 body: body.to_string(),
55 attempts: 1,
56 last_attempt: Instant::now(),
57 }
58 }
59
60 pub(super) fn message_marker(&self) -> String {
61 message_delivery_marker(&self.from)
62 }
63
64 fn is_ready_for_retry(&self, now: Instant) -> bool {
65 now.duration_since(self.last_attempt) >= FAILED_DELIVERY_RETRY_DELAY
66 }
67
68 fn has_attempts_remaining(&self) -> bool {
69 self.attempts < FAILED_DELIVERY_MAX_ATTEMPTS
70 }
71}
72
73#[derive(Debug, Clone, Copy, PartialEq, Eq)]
74pub(super) enum MessageDelivery {
75 Channel,
76 LivePane,
77 InboxQueued,
78 DeferredPending,
79 SkippedUnknownRecipient,
80}
81
82impl TeamDaemon {
83 fn telegram_failure_key(recipient: &str) -> String {
84 format!("telegram-delivery-failures::{recipient}")
85 }
86
87 fn telegram_circuit_breaker_key(recipient: &str) -> String {
88 format!("telegram-delivery-breaker::{recipient}")
89 }
90
91 fn telegram_retry_config() -> RetryConfig {
92 RetryConfig {
93 max_retries: 3,
94 base_delay_ms: 100,
95 max_delay_ms: 1_000,
96 jitter: false,
97 }
98 }
99
100 fn telegram_channel_paused(&self, recipient: &str) -> bool {
101 self.intervention_cooldowns
102 .get(&Self::telegram_circuit_breaker_key(recipient))
103 .is_some_and(|opened_at| {
104 opened_at.elapsed() < TELEGRAM_DELIVERY_CIRCUIT_BREAKER_COOLDOWN
105 })
106 }
107
108 fn clear_telegram_delivery_failures(&mut self, recipient: &str) {
109 self.retry_counts
110 .remove(&Self::telegram_failure_key(recipient));
111 self.intervention_cooldowns
112 .remove(&Self::telegram_circuit_breaker_key(recipient));
113 }
114
115 fn increment_telegram_delivery_failures(&mut self, recipient: &str) -> u32 {
116 let failures = self
117 .retry_counts
118 .entry(Self::telegram_failure_key(recipient))
119 .or_insert(0);
120 *failures += 1;
121 *failures
122 }
123
124 fn alert_telegram_delivery_paused(
125 &mut self,
126 recipient: &str,
127 from: &str,
128 detail: &str,
129 ) -> Result<()> {
130 let Some(manager) = self.failed_delivery_escalation_recipient(recipient) else {
131 warn!(
132 recipient,
133 from, detail, "telegram delivery paused without escalation target"
134 );
135 return Ok(());
136 };
137
138 let body = format!(
139 "Telegram delivery paused after repeated failures.\nRecipient: {recipient}\nFrom: {from}\nLast error: {detail}"
140 );
141 let root = inbox::inboxes_root(&self.config.project_root);
142 let msg = inbox::InboxMessage::new_send("daemon", &manager, &body);
143 inbox::deliver_to_inbox(&root, &msg)?;
144 self.record_message_routed("daemon", &manager);
145 Ok(())
146 }
147
148 fn deliver_channel_message(
149 &mut self,
150 from: &str,
151 recipient: &str,
152 body: &str,
153 ) -> Result<MessageDelivery> {
154 let channel_type = self
155 .channels
156 .get(recipient)
157 .map(|channel| channel.channel_type().to_string())
158 .unwrap_or_default();
159
160 if !channel_type.starts_with("telegram") {
161 self.channels
162 .get(recipient)
163 .expect("channel presence checked by caller")
164 .send(body)?;
165 self.record_message_routed(from, recipient);
166 return Ok(MessageDelivery::Channel);
167 }
168
169 if self.telegram_channel_paused(recipient) {
170 return Err(DeliveryError::ChannelSend {
171 recipient: recipient.to_string(),
172 detail: "telegram delivery circuit breaker is open".to_string(),
173 }
174 .into());
175 }
176
177 let send_result = {
178 let channel = self
179 .channels
180 .get(recipient)
181 .expect("channel presence checked by caller");
182 retry_sync(&Self::telegram_retry_config(), || channel.send(body))
183 };
184
185 match send_result {
186 Ok(()) => {
187 self.clear_telegram_delivery_failures(recipient);
188 self.record_message_routed(from, recipient);
189 Ok(MessageDelivery::Channel)
190 }
191 Err(error) => {
192 let failure_count = self.increment_telegram_delivery_failures(recipient);
193 if failure_count >= TELEGRAM_DELIVERY_CIRCUIT_BREAKER_THRESHOLD {
194 self.intervention_cooldowns.insert(
195 Self::telegram_circuit_breaker_key(recipient),
196 Instant::now(),
197 );
198 self.alert_telegram_delivery_paused(recipient, from, &error.to_string())?;
199 }
200 Err(error.into())
201 }
202 }
203 }
204
205 fn verify_message_content_in_pane(&self, pane_id: &str, message_marker: &str) -> bool {
206 self.verify_message_content_in_pane_lines(
207 pane_id,
208 message_marker,
209 DELIVERY_VERIFICATION_CAPTURE_LINES,
210 )
211 }
212
213 fn verify_message_content_in_pane_lines(
214 &self,
215 pane_id: &str,
216 message_marker: &str,
217 capture_lines: u32,
218 ) -> bool {
219 match tmux::capture_pane_recent(pane_id, capture_lines) {
220 Ok(capture) => capture_contains_message_marker(&capture, message_marker),
221 Err(error) => {
222 warn!(
223 pane_id,
224 error = %error,
225 "failed to capture pane for content-based delivery verification"
226 );
227 false
228 }
229 }
230 }
231
232 fn record_failed_delivery(&mut self, recipient: &str, from: &str, body: &str) {
233 if let Some(existing) = self.failed_deliveries.iter_mut().find(|delivery| {
234 delivery.recipient == recipient && delivery.from == from && delivery.body == body
235 }) {
236 existing.last_attempt = Instant::now();
237 return;
238 }
239
240 self.failed_deliveries
241 .push(FailedDelivery::new(recipient, from, body));
242 self.record_delivery_failed(recipient, from, "message delivery failed after retries");
243 }
244
245 fn clear_failed_delivery(&mut self, recipient: &str, from: &str, body: &str) {
246 self.failed_deliveries.retain(|delivery| {
247 delivery.recipient != recipient || delivery.from != from || delivery.body != body
248 });
249 }
250
251 fn failed_delivery_escalation_recipient(&self, recipient: &str) -> Option<String> {
252 self.config
253 .members
254 .iter()
255 .find(|member| member.name == recipient)
256 .and_then(|member| member.reports_to.clone())
257 .or_else(|| {
258 self.config
259 .members
260 .iter()
261 .find(|member| {
262 member.role_type == RoleType::Manager && member.name != recipient
263 })
264 .map(|member| member.name.clone())
265 })
266 .or_else(|| {
267 let sender = self.automation_sender_for(recipient);
268 (sender != recipient
269 && self
270 .config
271 .members
272 .iter()
273 .any(|member| member.name == sender))
274 .then_some(sender)
275 })
276 }
277
278 fn escalate_failed_delivery(&mut self, delivery: &FailedDelivery) -> Result<()> {
279 let Some(manager) = self.failed_delivery_escalation_recipient(&delivery.recipient) else {
280 warn!(
281 recipient = %delivery.recipient,
282 from = %delivery.from,
283 "failed delivery exhausted retries without escalation target"
284 );
285 return Ok(());
286 };
287
288 let body = format!(
289 "Live message delivery failed after {} attempts.\nRecipient: {}\nFrom: {}\nMarker: {}\nMessage body:\n{}",
290 delivery.attempts,
291 delivery.recipient,
292 delivery.from,
293 delivery.message_marker(),
294 delivery.body
295 );
296 let root = inbox::inboxes_root(&self.config.project_root);
297 let msg = inbox::InboxMessage::new_send("daemon", &manager, &body);
298 inbox::deliver_to_inbox(&root, &msg)?;
299 self.record_message_routed("daemon", &manager);
300 warn!(
301 recipient = %delivery.recipient,
302 from = %delivery.from,
303 escalation_target = %manager,
304 attempts = delivery.attempts,
305 "failed delivery escalated to manager inbox"
306 );
307 Ok(())
308 }
309
310 pub(super) fn retry_failed_deliveries(&mut self) -> Result<()> {
311 if self.failed_deliveries.is_empty() {
312 return Ok(());
313 }
314
315 let now = Instant::now();
316 let pending = std::mem::take(&mut self.failed_deliveries);
317 for mut delivery in pending {
318 if !delivery.is_ready_for_retry(now) {
319 self.failed_deliveries.push(delivery);
320 continue;
321 }
322
323 let is_ready = self
324 .watchers
325 .get(&delivery.recipient)
326 .map(|watcher| {
327 matches!(
328 watcher.state,
329 super::watcher::WatcherState::Ready | super::watcher::WatcherState::Idle
330 )
331 })
332 .unwrap_or(true);
333 if !is_ready {
334 self.failed_deliveries.push(delivery);
335 continue;
336 }
337
338 let Some(pane_id) = self.config.pane_map.get(&delivery.recipient).cloned() else {
339 self.escalate_failed_delivery(&delivery)?;
340 continue;
341 };
342
343 delivery.attempts += 1;
344 delivery.last_attempt = now;
345 info!(
346 recipient = %delivery.recipient,
347 from = %delivery.from,
348 attempts = delivery.attempts,
349 "retrying failed live delivery"
350 );
351
352 let injected = match message::inject_message(&pane_id, &delivery.from, &delivery.body) {
353 Ok(()) => true,
354 Err(error) => {
355 warn!(
356 recipient = %delivery.recipient,
357 from = %delivery.from,
358 attempts = delivery.attempts,
359 error = %error,
360 "failed to re-inject message during delivery retry"
361 );
362 false
363 }
364 };
365
366 if injected
367 && self.verify_message_delivered(
368 &delivery.from,
369 &delivery.recipient,
370 &delivery.body,
371 3,
372 false,
373 )
374 {
375 continue;
376 }
377
378 if delivery.has_attempts_remaining() {
379 self.failed_deliveries.push(delivery);
380 } else {
381 let agent_still_starting = self
384 .watchers
385 .get(&delivery.recipient)
386 .is_some_and(|w| !w.is_ready_for_delivery());
387 if agent_still_starting {
388 debug!(
389 recipient = %delivery.recipient,
390 "agent still starting; suppressing escalation"
391 );
392 self.failed_deliveries.push(delivery);
393 } else {
394 self.escalate_failed_delivery(&delivery)?;
395 }
396 }
397 }
398
399 Ok(())
400 }
401
402 fn verify_message_delivered(
403 &mut self,
404 from: &str,
405 recipient: &str,
406 body: &str,
407 max_attempts: u32,
408 record_failure: bool,
409 ) -> bool {
410 let Some(pane_id) = self.config.pane_map.get(recipient).cloned() else {
411 return true;
412 };
413 let message_marker = message_delivery_marker(from);
414
415 for attempt in 1..=max_attempts {
416 std::thread::sleep(Duration::from_secs(2));
417
418 if self.verify_message_content_in_pane(&pane_id, &message_marker) {
419 self.clear_failed_delivery(recipient, from, body);
420 debug!(
421 recipient,
422 attempt,
423 marker = %message_marker,
424 "message delivery verified: marker found in pane"
425 );
426 return true;
427 }
428
429 if self.agent_went_active_after_injection(&pane_id, recipient) {
432 self.clear_failed_delivery(recipient, from, body);
433 info!(
434 recipient,
435 attempt,
436 marker = %message_marker,
437 "message delivery inferred: agent active after injection (marker scrolloff)"
438 );
439 return true;
440 }
441
442 warn!(
443 recipient,
444 attempt,
445 marker = %message_marker,
446 "message marker missing after injection; resending Enter"
447 );
448 if let Err(error) = tmux::send_keys(&pane_id, "", true) {
449 warn!(recipient, error = %error, "failed to resend Enter");
450 }
451 }
452
453 if record_failure {
454 self.record_failed_delivery(recipient, from, body);
455 warn!(
456 recipient,
457 max_attempts,
458 marker = %message_marker,
459 "message delivery failed after retries; queued for daemon retry"
460 );
461 }
462
463 false
464 }
465
466 fn verify_message_delivered_with_lines(
467 &mut self,
468 from: &str,
469 recipient: &str,
470 body: &str,
471 max_attempts: u32,
472 record_failure: bool,
473 capture_lines: u32,
474 ) -> bool {
475 let Some(pane_id) = self.config.pane_map.get(recipient).cloned() else {
476 return true;
477 };
478 let message_marker = message_delivery_marker(from);
479
480 for attempt in 1..=max_attempts {
481 std::thread::sleep(Duration::from_secs(2));
482
483 if self.verify_message_content_in_pane_lines(&pane_id, &message_marker, capture_lines) {
484 self.clear_failed_delivery(recipient, from, body);
485 debug!(
486 recipient,
487 attempt,
488 capture_lines,
489 marker = %message_marker,
490 "message delivery verified: marker found in pane"
491 );
492 return true;
493 }
494
495 if self.agent_went_active_after_injection(&pane_id, recipient) {
498 self.clear_failed_delivery(recipient, from, body);
499 info!(
500 recipient,
501 attempt,
502 capture_lines,
503 marker = %message_marker,
504 "message delivery inferred: agent active after injection (marker scrolloff)"
505 );
506 return true;
507 }
508
509 warn!(
510 recipient,
511 attempt,
512 marker = %message_marker,
513 "message marker missing after injection; resending Enter"
514 );
515 if let Err(error) = tmux::send_keys(&pane_id, "", true) {
516 warn!(recipient, error = %error, "failed to resend Enter");
517 }
518 }
519
520 if record_failure {
521 self.record_failed_delivery(recipient, from, body);
522 warn!(
523 recipient,
524 max_attempts,
525 marker = %message_marker,
526 "message delivery failed after retries; queued for daemon retry"
527 );
528 }
529
530 false
531 }
532
533 fn check_agent_ready(&mut self, recipient: &str, pane_id: &str) -> bool {
544 if self
546 .watchers
547 .get(recipient)
548 .is_some_and(|w| w.is_ready_for_delivery())
549 {
550 return true;
551 }
552
553 if is_agent_ready(pane_id) {
555 if let Some(watcher) = self.watchers.get_mut(recipient) {
556 watcher.confirm_ready();
557 }
558 info!(
559 recipient,
560 pane_id, "agent readiness confirmed via live check"
561 );
562 return true;
563 }
564
565 debug!(recipient, pane_id, "agent not ready; deferring delivery");
566 false
567 }
568
569 fn delivery_capture_lines_for(&self, recipient: &str) -> u32 {
573 let recently_ready = self
574 .watchers
575 .get(recipient)
576 .is_some_and(|w| matches!(w.state, super::watcher::WatcherState::Ready));
577 if recently_ready {
578 DELIVERY_VERIFICATION_CAPTURE_LINES_RECENTLY_READY
579 } else {
580 DELIVERY_VERIFICATION_CAPTURE_LINES
581 }
582 }
583
584 fn agent_went_active_after_injection(&self, pane_id: &str, recipient: &str) -> bool {
590 let was_ready = self
593 .watchers
594 .get(recipient)
595 .is_some_and(|w| w.is_ready_for_delivery());
596 if !was_ready {
597 return false;
598 }
599 !is_agent_ready(pane_id)
601 }
602
603 pub(super) fn drain_pending_queue(&mut self, recipient: &str) -> Result<()> {
606 let messages = self
607 .pending_delivery_queue
608 .remove(recipient)
609 .unwrap_or_default();
610 if messages.is_empty() {
611 return Ok(());
612 }
613 info!(
614 recipient,
615 count = messages.len(),
616 "draining pending delivery queue after agent became ready"
617 );
618 for msg in messages {
619 self.deliver_message(&msg.from, recipient, &msg.body)?;
620 }
621 Ok(())
622 }
623
624 pub(super) fn queue_daemon_message(
625 &mut self,
626 recipient: &str,
627 body: &str,
628 ) -> Result<MessageDelivery> {
629 let visible_sender = self.automation_sender_for(recipient);
630 self.deliver_message(&visible_sender, recipient, body)
631 }
632
633 pub(super) fn queue_message(&mut self, from: &str, recipient: &str, body: &str) -> Result<()> {
634 self.deliver_message(from, recipient, body).map(|_| ())
635 }
636
637 fn deliver_message(
638 &mut self,
639 from: &str,
640 recipient: &str,
641 body: &str,
642 ) -> Result<MessageDelivery> {
643 if let Some(channel) = self.channels.get(recipient) {
644 let _ = channel;
645 return self.deliver_channel_message(from, recipient, body);
646 }
647
648 let known_recipient = self.config.pane_map.contains_key(recipient)
649 || self
650 .config
651 .members
652 .iter()
653 .any(|member| member.name == recipient);
654 if !known_recipient {
655 debug!(from, recipient, "skipping message for unknown recipient");
656 return Ok(MessageDelivery::SkippedUnknownRecipient);
657 }
658
659 if let Some(pane_id) = self.config.pane_map.get(recipient).cloned() {
660 if !self.check_agent_ready(recipient, &pane_id) {
662 let never_been_ready = self
667 .watchers
668 .get(recipient)
669 .is_some_and(|w| !w.is_ready_for_delivery());
670 if never_been_ready {
671 info!(
672 from,
673 to = recipient,
674 pane_id = pane_id.as_str(),
675 "agent still starting; deferring to pending queue"
676 );
677 self.pending_delivery_queue
678 .entry(recipient.to_string())
679 .or_default()
680 .push(PendingMessage {
681 from: from.to_string(),
682 body: body.to_string(),
683 queued_at: Instant::now(),
684 });
685 return Ok(MessageDelivery::DeferredPending);
686 }
687 info!(
688 from,
689 to = recipient,
690 pane_id = pane_id.as_str(),
691 "agent not ready after timeout; deferring to inbox"
692 );
693 } else {
695 match message::inject_message(&pane_id, from, body) {
696 Ok(()) => {
697 self.record_message_routed(from, recipient);
698 let capture_lines = self.delivery_capture_lines_for(recipient);
699 self.verify_message_delivered_with_lines(
700 from,
701 recipient,
702 body,
703 3,
704 true,
705 capture_lines,
706 );
707 return Ok(MessageDelivery::LivePane);
708 }
709 Err(error) => {
710 warn!(
711 from,
712 to = recipient,
713 pane_id = pane_id.as_str(),
714 error = %error,
715 "live message delivery failed; queueing to inbox"
716 );
717 let _typed_error = DeliveryError::PaneInject {
718 recipient: recipient.to_string(),
719 pane_id,
720 detail: error.to_string(),
721 };
722 }
723 }
724 }
725 }
726
727 let root = inbox::inboxes_root(&self.config.project_root);
728 let msg = inbox::InboxMessage::new_send(from, recipient, body);
729 inbox::deliver_to_inbox(&root, &msg).map_err(|error| DeliveryError::InboxQueue {
730 recipient: recipient.to_string(),
731 detail: error.to_string(),
732 })?;
733 self.record_message_routed(from, recipient);
734 Ok(MessageDelivery::InboxQueued)
735 }
736
737 pub(super) fn drain_legacy_command_queue(&mut self) -> Result<()> {
738 let queue_path = message::command_queue_path(&self.config.project_root);
739 let commands = message::read_command_queue(&queue_path)?;
740 if commands.is_empty() {
741 return Ok(());
742 }
743
744 let root = inbox::inboxes_root(&self.config.project_root);
745 let mut remaining_commands = Vec::new();
746 for cmd in commands {
747 let result: Result<()> = (|| match &cmd {
748 message::QueuedCommand::Send {
749 from,
750 to,
751 message: msg,
752 } => {
753 let is_user =
754 self.config.team_config.roles.iter().any(|role| {
755 role.name == to.as_str() && role.role_type == RoleType::User
756 });
757
758 if is_user {
759 if let Some(channel) = self.channels.get(to.as_str()) {
760 let formatted = format!("[From {from}]\n{msg}");
761 let _ = channel;
762 self.deliver_channel_message(from, to, &formatted)?;
763 }
764 } else {
765 let inbox_msg = inbox::InboxMessage::new_send(from, to, msg);
766 inbox::deliver_to_inbox(&root, &inbox_msg)?;
767 debug!(from, to, "legacy command routed to inbox");
768 }
769 Ok(())
770 }
771 message::QueuedCommand::Assign {
772 from,
773 engineer,
774 task,
775 } => {
776 let msg = inbox::InboxMessage::new_assign(from, engineer, task);
777 inbox::deliver_to_inbox(&root, &msg)?;
778 debug!(engineer, "legacy assign routed to inbox");
779 Ok(())
780 }
781 })();
782
783 if let Err(error) = result {
784 warn!(error = %error, "failed to process legacy command; preserving in queue");
785 remaining_commands.push(cmd);
786 }
787 }
788
789 message::write_command_queue(&queue_path, &remaining_commands)?;
790 Ok(())
791 }
792
793 pub(super) fn deliver_inbox_messages(&mut self) -> Result<()> {
794 let root = inbox::inboxes_root(&self.config.project_root);
795 let member_names: Vec<String> = self.config.pane_map.keys().cloned().collect();
796
797 for name in &member_names {
798 let is_ready = self
799 .watchers
800 .get(name)
801 .map(|watcher| {
802 matches!(
803 watcher.state,
804 super::watcher::WatcherState::Ready | super::watcher::WatcherState::Idle
805 )
806 })
807 .unwrap_or(true);
808
809 if !is_ready {
810 continue;
811 }
812
813 let messages = match inbox::pending_messages(&root, name) {
814 Ok(msgs) => msgs,
815 Err(error) => {
816 debug!(member = %name, error = %error, "failed to read inbox");
817 continue;
818 }
819 };
820
821 if messages.is_empty() {
822 continue;
823 }
824
825 let Some(pane_id) = self.config.pane_map.get(name).cloned() else {
826 continue;
827 };
828
829 let mut delivered_any = false;
830 for msg in &messages {
831 let from_role = self.resolve_role_name(&msg.from);
832 let to_role = self.resolve_role_name(name);
833 if !self.config.team_config.can_talk(&from_role, &to_role) {
834 warn!(
835 from = %msg.from, from_role, to = %name, to_role,
836 "blocked message: routing not allowed"
837 );
838 let _ = inbox::mark_delivered(&root, name, &msg.id);
839 continue;
840 }
841
842 let is_send = matches!(msg.msg_type, inbox::MessageType::Send);
843 let delivery_result = match msg.msg_type {
844 inbox::MessageType::Send => {
845 info!(from = %msg.from, to = %name, id = %msg.id, "delivering inbox message");
846 message::inject_message(&pane_id, &msg.from, &msg.body)
847 }
848 inbox::MessageType::Assign => {
849 info!(to = %name, id = %msg.id, "delivering inbox assignment");
850 self.manual_assign_cooldowns
851 .insert(name.to_string(), Instant::now());
852 self.assign_task(name, &msg.body).map(|launch| {
853 self.record_assignment_success(name, &msg.id, &msg.body, &launch);
854 self.notify_assignment_sender_success(
855 &msg.from, name, &msg.id, &msg.body, &launch,
856 );
857 })
858 }
859 };
860
861 let mut mark_delivered = false;
862 match delivery_result {
863 Ok(()) => {
864 delivered_any = true;
865 mark_delivered = true;
866 if is_send {
867 self.verify_message_delivered(&msg.from, name, &msg.body, 3, true);
868 }
869 }
870 Err(error) => {
871 warn!(
872 from = %msg.from,
873 to = %name,
874 id = %msg.id,
875 error = %error,
876 "failed to deliver inbox message"
877 );
878 if matches!(msg.msg_type, inbox::MessageType::Assign) {
879 mark_delivered = true;
880 self.record_assignment_failure(name, &msg.id, &msg.body, &error);
881 self.notify_assignment_sender_failure(
882 &msg.from, name, &msg.id, &msg.body, &error,
883 );
884 }
885 }
886 }
887
888 if !mark_delivered {
889 continue;
890 }
891
892 if let Err(error) = inbox::mark_delivered(&root, name, &msg.id) {
893 warn!(
894 member = %name,
895 id = %msg.id,
896 error = %error,
897 "failed to mark delivered"
898 );
899 } else {
900 self.record_message_routed(&msg.from, name);
901 }
902
903 std::thread::sleep(Duration::from_secs(1));
904 }
905
906 if delivered_any {
907 self.mark_member_working(name);
908 }
909 }
910
911 Ok(())
912 }
913
914 fn resolve_role_name(&self, member_name: &str) -> String {
915 if member_name == "human" || member_name == "daemon" {
916 return member_name.to_string();
917 }
918 self.config
919 .members
920 .iter()
921 .find(|member| member.name == member_name)
922 .map(|member| member.role_name.clone())
923 .unwrap_or_else(|| member_name.to_string())
924 }
925}
926
927pub(super) fn message_delivery_marker(sender: &str) -> String {
928 format!("--- Message from {sender} ---")
929}
930
931pub(super) fn capture_contains_message_marker(capture: &str, message_marker: &str) -> bool {
932 capture.contains(message_marker)
933}
934
935#[cfg(test)]
936mod tests {
937 use super::*;
938 use std::collections::VecDeque;
939 use std::collections::{HashMap, HashSet};
940 use std::io;
941 use std::sync::{Arc, Mutex};
942
943 use crate::team::AssignmentResultStatus;
944 use crate::team::comms::Channel;
945 use crate::team::config::OrchestratorPosition;
946 use crate::team::config::{
947 AutomationConfig, BoardConfig, ChannelConfig, RoleDef, StandupConfig, WorkflowMode,
948 WorkflowPolicy,
949 };
950 use crate::team::daemon::{DaemonConfig, TeamDaemon};
951 use crate::team::errors::DeliveryError;
952 use crate::team::events::EventSink;
953 use crate::team::failure_patterns::FailureTracker;
954 use crate::team::hierarchy::MemberInstance;
955
956 struct RecordingChannel {
957 messages: Arc<Mutex<Vec<String>>>,
958 }
959
960 impl Channel for RecordingChannel {
961 fn send(&self, message: &str) -> std::result::Result<(), DeliveryError> {
962 self.messages.lock().unwrap().push(message.to_string());
963 Ok(())
964 }
965
966 fn channel_type(&self) -> &str {
967 "test"
968 }
969 }
970
971 struct FailingChannel;
972
973 impl Channel for FailingChannel {
974 fn send(&self, _message: &str) -> std::result::Result<(), DeliveryError> {
975 Err(DeliveryError::ChannelSend {
976 recipient: "test-recipient".to_string(),
977 detail: "synthetic channel failure".to_string(),
978 })
979 }
980
981 fn channel_type(&self) -> &str {
982 "test-failing"
983 }
984 }
985
986 struct SequencedTelegramChannel {
987 results: Arc<Mutex<VecDeque<std::result::Result<(), DeliveryError>>>>,
988 attempts: Arc<Mutex<u32>>,
989 }
990
991 impl Channel for SequencedTelegramChannel {
992 fn send(&self, _message: &str) -> std::result::Result<(), DeliveryError> {
993 *self.attempts.lock().unwrap() += 1;
994 self.results.lock().unwrap().pop_front().unwrap_or(Ok(()))
995 }
996
997 fn channel_type(&self) -> &str {
998 "telegram-test"
999 }
1000 }
1001
1002 struct FailingWriter;
1003
1004 impl io::Write for FailingWriter {
1005 fn write(&mut self, _buf: &[u8]) -> io::Result<usize> {
1006 Err(io::Error::other("synthetic event sink failure"))
1007 }
1008
1009 fn flush(&mut self) -> io::Result<()> {
1010 Err(io::Error::other("synthetic event sink failure"))
1011 }
1012 }
1013
1014 fn empty_legacy_daemon(tmp: &tempfile::TempDir) -> TeamDaemon {
1015 TeamDaemon {
1016 config: DaemonConfig {
1017 project_root: tmp.path().to_path_buf(),
1018 team_config: super::super::config::TeamConfig {
1019 name: "test".to_string(),
1020 agent: None,
1021 workflow_mode: WorkflowMode::Legacy,
1022 workflow_policy: WorkflowPolicy::default(),
1023 board: BoardConfig::default(),
1024 standup: StandupConfig::default(),
1025 automation: AutomationConfig::default(),
1026 automation_sender: None,
1027 external_senders: Vec::new(),
1028 orchestrator_pane: true,
1029 orchestrator_position: OrchestratorPosition::Bottom,
1030 layout: None,
1031 cost: Default::default(),
1032 event_log_max_bytes: crate::team::DEFAULT_EVENT_LOG_MAX_BYTES,
1033 retro_min_duration_secs: 60,
1034 roles: Vec::new(),
1035 },
1036 session: "test".to_string(),
1037 members: Vec::new(),
1038 pane_map: HashMap::new(),
1039 },
1040 watchers: HashMap::new(),
1041 states: HashMap::new(),
1042 idle_started_at: HashMap::new(),
1043 active_tasks: HashMap::new(),
1044 retry_counts: HashMap::new(),
1045 dispatch_queue: Vec::new(),
1046 triage_idle_epochs: HashMap::new(),
1047 triage_interventions: HashMap::new(),
1048 owned_task_interventions: HashMap::new(),
1049 intervention_cooldowns: HashMap::new(),
1050 channels: HashMap::new(),
1051 nudges: HashMap::new(),
1052 telegram_bot: None,
1053 failure_tracker: FailureTracker::new(20),
1054 event_sink: EventSink::new(&tmp.path().join("events.jsonl")).unwrap(),
1055 paused_standups: HashSet::new(),
1056 last_standup: HashMap::new(),
1057 last_board_rotation: Instant::now(),
1058 last_auto_archive: Instant::now(),
1059 last_auto_dispatch: Instant::now(),
1060 pipeline_starvation_fired: false,
1061 pipeline_starvation_last_fired: None,
1062 retro_generated: false,
1063 failed_deliveries: Vec::new(),
1064 review_first_seen: HashMap::new(),
1065 review_nudge_sent: HashSet::new(),
1066 poll_interval: Duration::from_secs(5),
1067 is_git_repo: false,
1068 subsystem_error_counts: HashMap::new(),
1069 auto_merge_overrides: HashMap::new(),
1070 recent_dispatches: HashMap::new(),
1071 telemetry_db: None,
1072 manual_assign_cooldowns: HashMap::new(),
1073 backend_health: HashMap::new(),
1074 last_health_check: Instant::now(),
1075 last_uncommitted_warn: HashMap::new(),
1076 pending_delivery_queue: HashMap::new(),
1077 }
1078 }
1079
1080 fn failed_delivery_test_daemon(tmp: &tempfile::TempDir) -> TeamDaemon {
1081 let manager = MemberInstance {
1082 name: "manager".to_string(),
1083 role_name: "manager".to_string(),
1084 role_type: RoleType::Manager,
1085 agent: Some("claude".to_string()),
1086 prompt: None,
1087 reports_to: Some("architect".to_string()),
1088 use_worktrees: false,
1089 };
1090 let engineer = MemberInstance {
1091 name: "eng-1".to_string(),
1092 role_name: "eng".to_string(),
1093 role_type: RoleType::Engineer,
1094 agent: Some("codex".to_string()),
1095 prompt: None,
1096 reports_to: Some("manager".to_string()),
1097 use_worktrees: false,
1098 };
1099 let architect = MemberInstance {
1100 name: "architect".to_string(),
1101 role_name: "architect".to_string(),
1102 role_type: RoleType::Architect,
1103 agent: Some("claude".to_string()),
1104 prompt: None,
1105 reports_to: None,
1106 use_worktrees: false,
1107 };
1108
1109 TeamDaemon {
1110 config: DaemonConfig {
1111 project_root: tmp.path().to_path_buf(),
1112 team_config: super::super::config::TeamConfig {
1113 name: "test".to_string(),
1114 agent: None,
1115 workflow_mode: WorkflowMode::Legacy,
1116 workflow_policy: WorkflowPolicy::default(),
1117 board: BoardConfig::default(),
1118 standup: StandupConfig::default(),
1119 automation: AutomationConfig::default(),
1120 automation_sender: None,
1121 external_senders: Vec::new(),
1122 orchestrator_pane: true,
1123 orchestrator_position: OrchestratorPosition::Bottom,
1124 layout: None,
1125 cost: Default::default(),
1126 event_log_max_bytes: crate::team::DEFAULT_EVENT_LOG_MAX_BYTES,
1127 retro_min_duration_secs: 60,
1128 roles: Vec::new(),
1129 },
1130 session: "test".to_string(),
1131 members: vec![architect, manager, engineer],
1132 pane_map: HashMap::from([("eng-1".to_string(), "%9999999".to_string())]),
1133 },
1134 ..empty_legacy_daemon(tmp)
1135 }
1136 }
1137
1138 #[test]
1139 fn queue_daemon_message_routes_to_channel_for_user_roles() {
1140 let tmp = tempfile::tempdir().unwrap();
1141 let mut daemon = empty_legacy_daemon(&tmp);
1142 let sent = Arc::new(Mutex::new(Vec::new()));
1143 daemon.channels.insert(
1144 "human".to_string(),
1145 Box::new(RecordingChannel {
1146 messages: Arc::clone(&sent),
1147 }),
1148 );
1149
1150 daemon
1151 .queue_daemon_message("human", "Assignment delivered.")
1152 .unwrap();
1153
1154 assert_eq!(sent.lock().unwrap().as_slice(), ["Assignment delivered."]);
1155 }
1156
1157 #[test]
1158 fn queue_daemon_message_ignores_event_sink_failure() {
1159 let tmp = tempfile::tempdir().unwrap();
1160 let mut daemon = empty_legacy_daemon(&tmp);
1161 daemon.event_sink = EventSink::from_writer(
1162 tmp.path().join("broken-events.jsonl").as_path(),
1163 FailingWriter,
1164 );
1165
1166 let sent = Arc::new(Mutex::new(Vec::new()));
1167 daemon.channels.insert(
1168 "human".to_string(),
1169 Box::new(RecordingChannel {
1170 messages: Arc::clone(&sent),
1171 }),
1172 );
1173
1174 daemon
1175 .queue_daemon_message("human", "Event sink can fail without breaking delivery.")
1176 .unwrap();
1177
1178 assert_eq!(
1179 sent.lock().unwrap().as_slice(),
1180 ["Event sink can fail without breaking delivery."]
1181 );
1182 }
1183
1184 #[test]
1185 fn telegram_delivery_retries_transient_channel_failures() {
1186 let tmp = tempfile::tempdir().unwrap();
1187 let mut daemon = empty_legacy_daemon(&tmp);
1188 let attempts = Arc::new(Mutex::new(0));
1189 daemon.channels.insert(
1190 "human".to_string(),
1191 Box::new(SequencedTelegramChannel {
1192 results: Arc::new(Mutex::new(VecDeque::from([
1193 Err(DeliveryError::ChannelSend {
1194 recipient: "human".to_string(),
1195 detail: "429 too many requests".to_string(),
1196 }),
1197 Err(DeliveryError::ChannelSend {
1198 recipient: "human".to_string(),
1199 detail: "timeout while sending".to_string(),
1200 }),
1201 Ok(()),
1202 ]))),
1203 attempts: Arc::clone(&attempts),
1204 }),
1205 );
1206
1207 daemon
1208 .queue_daemon_message("human", "Assignment delivered.")
1209 .unwrap();
1210
1211 assert_eq!(*attempts.lock().unwrap(), 3);
1212 assert_eq!(
1213 daemon
1214 .retry_counts
1215 .get(&TeamDaemon::telegram_failure_key("human")),
1216 None
1217 );
1218 }
1219
1220 #[test]
1221 fn telegram_delivery_circuit_breaker_alerts_manager_after_repeated_failures() {
1222 let tmp = tempfile::tempdir().unwrap();
1223 let manager = MemberInstance {
1224 name: "manager".to_string(),
1225 role_name: "manager".to_string(),
1226 role_type: RoleType::Manager,
1227 agent: Some("claude".to_string()),
1228 prompt: None,
1229 reports_to: Some("architect".to_string()),
1230 use_worktrees: false,
1231 };
1232 let mut daemon = TeamDaemon {
1233 config: DaemonConfig {
1234 project_root: tmp.path().to_path_buf(),
1235 team_config: super::super::config::TeamConfig {
1236 name: "test".to_string(),
1237 agent: None,
1238 workflow_mode: WorkflowMode::Legacy,
1239 workflow_policy: WorkflowPolicy::default(),
1240 board: BoardConfig::default(),
1241 standup: StandupConfig::default(),
1242 automation: AutomationConfig::default(),
1243 automation_sender: None,
1244 external_senders: Vec::new(),
1245 orchestrator_pane: true,
1246 orchestrator_position: OrchestratorPosition::Bottom,
1247 layout: None,
1248 cost: Default::default(),
1249 event_log_max_bytes: crate::team::DEFAULT_EVENT_LOG_MAX_BYTES,
1250 retro_min_duration_secs: 60,
1251 roles: Vec::new(),
1252 },
1253 session: "test".to_string(),
1254 members: vec![manager],
1255 pane_map: HashMap::new(),
1256 },
1257 ..empty_legacy_daemon(&tmp)
1258 };
1259 let attempts = Arc::new(Mutex::new(0));
1260 daemon.channels.insert(
1261 "human".to_string(),
1262 Box::new(SequencedTelegramChannel {
1263 results: Arc::new(Mutex::new(VecDeque::from(
1264 (0..32)
1265 .map(|_| {
1266 Err(DeliveryError::ChannelSend {
1267 recipient: "human".to_string(),
1268 detail: "429 too many requests".to_string(),
1269 })
1270 })
1271 .collect::<Vec<_>>(),
1272 ))),
1273 attempts: Arc::clone(&attempts),
1274 }),
1275 );
1276
1277 for _ in 0..TELEGRAM_DELIVERY_CIRCUIT_BREAKER_THRESHOLD {
1278 assert!(
1279 daemon
1280 .queue_daemon_message("human", "Still failing")
1281 .is_err()
1282 );
1283 }
1284
1285 assert!(daemon.telegram_channel_paused("human"));
1286 let pending = inbox::pending_messages(&inbox::inboxes_root(tmp.path()), "manager").unwrap();
1287 assert_eq!(pending.len(), 1);
1288 assert!(pending[0].body.contains("Telegram delivery paused"));
1289
1290 let before = *attempts.lock().unwrap();
1291 assert!(
1292 daemon
1293 .queue_daemon_message("human", "Breaker open")
1294 .is_err()
1295 );
1296 assert_eq!(*attempts.lock().unwrap(), before);
1297 }
1298
1299 #[test]
1300 fn drain_legacy_command_queue_preserves_failed_commands() {
1301 let tmp = tempfile::tempdir().unwrap();
1302 let queue_path = message::command_queue_path(tmp.path());
1303 message::enqueue_command(
1304 &queue_path,
1305 &message::QueuedCommand::Send {
1306 from: "architect".into(),
1307 to: "human".into(),
1308 message: "status".into(),
1309 },
1310 )
1311 .unwrap();
1312 message::enqueue_command(
1313 &queue_path,
1314 &message::QueuedCommand::Assign {
1315 from: "manager".into(),
1316 engineer: "eng-1".into(),
1317 task: "Task #7: recover".into(),
1318 },
1319 )
1320 .unwrap();
1321
1322 let mut daemon = TeamDaemon {
1323 config: DaemonConfig {
1324 project_root: tmp.path().to_path_buf(),
1325 team_config: super::super::config::TeamConfig {
1326 name: "test".to_string(),
1327 agent: None,
1328 workflow_mode: WorkflowMode::Legacy,
1329 workflow_policy: WorkflowPolicy::default(),
1330 board: BoardConfig::default(),
1331 standup: StandupConfig::default(),
1332 automation: AutomationConfig::default(),
1333 automation_sender: None,
1334 external_senders: Vec::new(),
1335 orchestrator_pane: true,
1336 orchestrator_position: OrchestratorPosition::Bottom,
1337 layout: None,
1338 cost: Default::default(),
1339 event_log_max_bytes: crate::team::DEFAULT_EVENT_LOG_MAX_BYTES,
1340 retro_min_duration_secs: 60,
1341 roles: vec![RoleDef {
1342 name: "human".to_string(),
1343 role_type: RoleType::User,
1344 agent: None,
1345 instances: 1,
1346 prompt: None,
1347 talks_to: vec![],
1348 channel: Some("telegram".to_string()),
1349 channel_config: Some(ChannelConfig {
1350 target: "123".to_string(),
1351 provider: "fake".to_string(),
1352 bot_token: None,
1353 allowed_user_ids: vec![],
1354 }),
1355 nudge_interval_secs: None,
1356 receives_standup: None,
1357 standup_interval_secs: None,
1358 owns: Vec::new(),
1359 use_worktrees: false,
1360 }],
1361 },
1362 session: "test".to_string(),
1363 members: vec![MemberInstance {
1364 name: "eng-1".to_string(),
1365 role_name: "eng-1".to_string(),
1366 role_type: RoleType::Engineer,
1367 agent: Some("claude".to_string()),
1368 prompt: None,
1369 reports_to: None,
1370 use_worktrees: false,
1371 }],
1372 pane_map: HashMap::new(),
1373 },
1374 channels: HashMap::from([(
1375 "human".to_string(),
1376 Box::new(FailingChannel) as Box<dyn Channel>,
1377 )]),
1378 ..empty_legacy_daemon(&tmp)
1379 };
1380
1381 daemon.drain_legacy_command_queue().unwrap();
1382
1383 let remaining = message::read_command_queue(&queue_path).unwrap();
1384 assert_eq!(remaining.len(), 1);
1385 match &remaining[0] {
1386 message::QueuedCommand::Send { to, message, .. } => {
1387 assert_eq!(to, "human");
1388 assert_eq!(message, "status");
1389 }
1390 other => panic!("expected failed send command to remain queued, got {other:?}"),
1391 }
1392
1393 let engineer_pending =
1394 inbox::pending_messages(&inbox::inboxes_root(tmp.path()), "eng-1").unwrap();
1395 assert_eq!(engineer_pending.len(), 1);
1396 assert_eq!(engineer_pending[0].from, "manager");
1397 assert!(engineer_pending[0].body.contains("Task #7: recover"));
1398 }
1399
1400 #[test]
1401 fn deliver_inbox_messages_reports_failed_assignment_without_crashing() {
1402 let tmp = tempfile::tempdir().unwrap();
1403 let roles = vec![
1404 RoleDef {
1405 name: "manager".to_string(),
1406 role_type: RoleType::Manager,
1407 agent: Some("claude".to_string()),
1408 instances: 1,
1409 prompt: None,
1410 talks_to: vec![],
1411 channel: None,
1412 channel_config: None,
1413 nudge_interval_secs: None,
1414 receives_standup: None,
1415 standup_interval_secs: None,
1416 owns: Vec::new(),
1417 use_worktrees: false,
1418 },
1419 RoleDef {
1420 name: "eng-1".to_string(),
1421 role_type: RoleType::Engineer,
1422 agent: Some("claude".to_string()),
1423 instances: 1,
1424 prompt: None,
1425 talks_to: vec![],
1426 channel: None,
1427 channel_config: None,
1428 nudge_interval_secs: None,
1429 receives_standup: None,
1430 standup_interval_secs: None,
1431 owns: Vec::new(),
1432 use_worktrees: false,
1433 },
1434 ];
1435 let members = vec![
1436 MemberInstance {
1437 name: "manager".to_string(),
1438 role_name: "manager".to_string(),
1439 role_type: RoleType::Manager,
1440 agent: Some("claude".to_string()),
1441 prompt: None,
1442 reports_to: None,
1443 use_worktrees: false,
1444 },
1445 MemberInstance {
1446 name: "eng-1".to_string(),
1447 role_name: "eng-1".to_string(),
1448 role_type: RoleType::Engineer,
1449 agent: Some("claude".to_string()),
1450 prompt: None,
1451 reports_to: Some("manager".to_string()),
1452 use_worktrees: false,
1453 },
1454 ];
1455
1456 let mut pane_map = HashMap::new();
1457 pane_map.insert("eng-1".to_string(), "%999".to_string());
1458
1459 let mut daemon = TeamDaemon::new(DaemonConfig {
1460 project_root: tmp.path().to_path_buf(),
1461 team_config: super::super::config::TeamConfig {
1462 name: "test".to_string(),
1463 agent: None,
1464 workflow_mode: WorkflowMode::Legacy,
1465 workflow_policy: WorkflowPolicy::default(),
1466 board: BoardConfig::default(),
1467 standup: StandupConfig::default(),
1468 automation: AutomationConfig::default(),
1469 automation_sender: None,
1470 external_senders: Vec::new(),
1471 orchestrator_pane: true,
1472 orchestrator_position: OrchestratorPosition::Bottom,
1473 layout: None,
1474 cost: Default::default(),
1475 event_log_max_bytes: crate::team::DEFAULT_EVENT_LOG_MAX_BYTES,
1476 retro_min_duration_secs: 60,
1477 roles,
1478 },
1479 session: "test".to_string(),
1480 members,
1481 pane_map,
1482 })
1483 .unwrap();
1484
1485 let root = inbox::inboxes_root(tmp.path());
1486 let assign = inbox::InboxMessage::new_assign("manager", "eng-1", "Task #13: fix it");
1487 let id = inbox::deliver_to_inbox(&root, &assign).unwrap();
1488
1489 daemon.deliver_inbox_messages().unwrap();
1490
1491 let engineer_pending = inbox::pending_messages(&root, "eng-1").unwrap();
1492 assert!(engineer_pending.is_empty());
1493
1494 let engineer_all = inbox::all_messages(&root, "eng-1").unwrap();
1495 assert!(
1496 engineer_all
1497 .iter()
1498 .any(|(msg, delivered)| msg.id == id && *delivered)
1499 );
1500
1501 let manager_pending = inbox::pending_messages(&root, "manager").unwrap();
1502 assert_eq!(manager_pending.len(), 1);
1503 assert_eq!(manager_pending[0].from, "daemon");
1504 assert!(manager_pending[0].body.contains("Assignment failed."));
1505 assert!(manager_pending[0].body.contains("Engineer: eng-1"));
1506 assert!(manager_pending[0].body.contains("Message ID:"));
1507 let result = crate::team::load_assignment_result(tmp.path(), &id)
1508 .unwrap()
1509 .unwrap();
1510 assert_eq!(result.status, AssignmentResultStatus::Failed);
1511 assert_eq!(result.engineer, "eng-1");
1512 assert_eq!(daemon.states.get("eng-1"), None);
1513 }
1514
1515 #[test]
1516 fn queue_message_falls_back_to_inbox_when_live_delivery_fails() {
1517 let tmp = tempfile::tempdir().unwrap();
1518 let manager = MemberInstance {
1519 name: "manager".to_string(),
1520 role_name: "manager".to_string(),
1521 role_type: RoleType::Manager,
1522 agent: Some("claude".to_string()),
1523 prompt: None,
1524 reports_to: None,
1525 use_worktrees: false,
1526 };
1527 let mut daemon = TeamDaemon {
1528 config: DaemonConfig {
1529 project_root: tmp.path().to_path_buf(),
1530 team_config: super::super::config::TeamConfig {
1531 name: "test".to_string(),
1532 agent: None,
1533 workflow_mode: WorkflowMode::Legacy,
1534 workflow_policy: WorkflowPolicy::default(),
1535 board: BoardConfig::default(),
1536 standup: StandupConfig::default(),
1537 automation: AutomationConfig::default(),
1538 automation_sender: None,
1539 external_senders: Vec::new(),
1540 orchestrator_pane: true,
1541 orchestrator_position: OrchestratorPosition::Bottom,
1542 layout: None,
1543 cost: Default::default(),
1544 event_log_max_bytes: crate::team::DEFAULT_EVENT_LOG_MAX_BYTES,
1545 retro_min_duration_secs: 60,
1546 roles: Vec::new(),
1547 },
1548 session: "test".to_string(),
1549 members: vec![manager],
1550 pane_map: HashMap::from([("manager".to_string(), "%999".to_string())]),
1551 },
1552 ..empty_legacy_daemon(&tmp)
1553 };
1554
1555 daemon
1556 .queue_message("eng-1", "manager", "Need review on merge handling.")
1557 .unwrap();
1558
1559 let messages =
1560 inbox::pending_messages(&inbox::inboxes_root(tmp.path()), "manager").unwrap();
1561 assert_eq!(messages.len(), 1);
1562 assert_eq!(messages[0].from, "eng-1");
1563 assert!(messages[0].body.contains("Need review on merge handling."));
1564 }
1565
1566 #[test]
1567 fn delivery_confirm_marker_detection_matches_captured_text() {
1568 let marker = message_delivery_marker("manager");
1569 let capture = format!("prompt\n{marker}\nbody\n");
1570 assert!(capture_contains_message_marker(&capture, &marker));
1571 assert!(!capture_contains_message_marker("prompt only", &marker));
1572 }
1573
1574 #[test]
1575 fn delivery_confirm_marker_generation_uses_sender_header() {
1576 assert_eq!(
1577 message_delivery_marker("eng-1-4"),
1578 "--- Message from eng-1-4 ---"
1579 );
1580 }
1581
1582 #[test]
1583 fn failed_delivery_new_sets_expected_fields() {
1584 let delivery = FailedDelivery::new("eng-1", "manager", "Please retry this.");
1585 assert_eq!(delivery.recipient, "eng-1");
1586 assert_eq!(delivery.from, "manager");
1587 assert_eq!(delivery.body, "Please retry this.");
1588 assert_eq!(delivery.attempts, 1);
1589 assert_eq!(delivery.message_marker(), "--- Message from manager ---");
1590 assert!(delivery.has_attempts_remaining());
1591 }
1592
1593 #[test]
1594 fn failed_delivery_emits_single_health_event_per_unique_message() {
1595 let tmp = tempfile::tempdir().unwrap();
1596 let mut daemon = failed_delivery_test_daemon(&tmp);
1597
1598 daemon.record_failed_delivery("eng-1", "manager", "Please retry this.");
1599 daemon.record_failed_delivery("eng-1", "manager", "Please retry this.");
1600
1601 let events = super::super::events::read_events(&tmp.path().join("events.jsonl")).unwrap();
1602 let delivery_failed = events
1603 .into_iter()
1604 .filter(|event| event.event == "delivery_failed")
1605 .collect::<Vec<_>>();
1606 assert_eq!(delivery_failed.len(), 1);
1607 assert_eq!(delivery_failed[0].role.as_deref(), Some("eng-1"));
1608 assert_eq!(delivery_failed[0].from.as_deref(), Some("manager"));
1609 }
1610
1611 #[test]
1612 fn failed_delivery_retry_requeues_before_attempt_cap() {
1613 let tmp = tempfile::tempdir().unwrap();
1614 let mut daemon = failed_delivery_test_daemon(&tmp);
1615 let mut delivery = FailedDelivery::new("eng-1", "manager", "Please retry this.");
1616 delivery.attempts = 1;
1617 delivery.last_attempt = Instant::now() - FAILED_DELIVERY_RETRY_DELAY;
1618 daemon.failed_deliveries.push(delivery);
1619
1620 daemon.retry_failed_deliveries().unwrap();
1621
1622 assert_eq!(daemon.failed_deliveries.len(), 1);
1623 assert_eq!(daemon.failed_deliveries[0].attempts, 2);
1624 let messages =
1625 inbox::pending_messages(&inbox::inboxes_root(tmp.path()), "manager").unwrap();
1626 assert!(messages.is_empty());
1627 }
1628
1629 #[test]
1630 fn failed_delivery_retry_respects_attempt_cap_and_escalates() {
1631 let tmp = tempfile::tempdir().unwrap();
1632 let mut daemon = failed_delivery_test_daemon(&tmp);
1633 let mut delivery = FailedDelivery::new("eng-1", "manager", "Please retry this.");
1634 delivery.attempts = FAILED_DELIVERY_MAX_ATTEMPTS - 1;
1635 delivery.last_attempt = Instant::now() - FAILED_DELIVERY_RETRY_DELAY;
1636 daemon.failed_deliveries.push(delivery);
1637
1638 daemon.retry_failed_deliveries().unwrap();
1639
1640 assert!(daemon.failed_deliveries.is_empty());
1641 let messages =
1642 inbox::pending_messages(&inbox::inboxes_root(tmp.path()), "manager").unwrap();
1643 assert_eq!(messages.len(), 1);
1644 assert_eq!(messages[0].from, "daemon");
1645 assert!(
1646 messages[0]
1647 .body
1648 .contains("Live message delivery failed after 3 attempts.")
1649 );
1650 assert!(messages[0].body.contains("Recipient: eng-1"));
1651 }
1652
1653 #[test]
1654 fn external_sender_delivery() {
1655 let tmp = tempfile::tempdir().unwrap();
1658 let mut daemon = empty_legacy_daemon(&tmp);
1659
1660 daemon.config.team_config.external_senders = vec!["email-router".to_string()];
1662 daemon.config.team_config.roles = vec![RoleDef {
1663 name: "manager".to_string(),
1664 role_type: RoleType::Manager,
1665 agent: Some("claude".to_string()),
1666 instances: 1,
1667 prompt: None,
1668 talks_to: vec![],
1669 channel: None,
1670 channel_config: None,
1671 nudge_interval_secs: None,
1672 receives_standup: None,
1673 standup_interval_secs: None,
1674 owns: Vec::new(),
1675 use_worktrees: false,
1676 }];
1677 daemon.config.members = vec![MemberInstance {
1678 name: "manager".to_string(),
1679 role_name: "manager".to_string(),
1680 role_type: RoleType::Manager,
1681 agent: Some("claude".to_string()),
1682 prompt: None,
1683 reports_to: None,
1684 use_worktrees: false,
1685 }];
1686
1687 daemon
1689 .queue_message("email-router", "manager", "New email from user@example.com")
1690 .unwrap();
1691
1692 let root = inbox::inboxes_root(tmp.path());
1694 let messages = inbox::pending_messages(&root, "manager").unwrap();
1695 assert_eq!(messages.len(), 1);
1696 assert_eq!(messages[0].from, "email-router");
1697 assert!(messages[0].body.contains("New email from user@example.com"));
1698
1699 assert!(
1701 daemon
1702 .config
1703 .team_config
1704 .can_talk("email-router", "manager")
1705 );
1706 }
1707
1708 #[test]
1711 fn is_agent_ready_returns_false_for_nonexistent_pane() {
1712 assert!(!is_agent_ready("%99999999"));
1713 }
1714
1715 #[test]
1716 fn delivery_capture_lines_default_for_idle_agent() {
1717 let tmp = tempfile::tempdir().unwrap();
1718 let mut daemon = failed_delivery_test_daemon(&tmp);
1719 daemon.watchers.insert(
1721 "eng-1".to_string(),
1722 crate::team::watcher::SessionWatcher::new("%9999999", "eng-1", 300, None),
1723 );
1724 assert_eq!(
1725 daemon.delivery_capture_lines_for("eng-1"),
1726 DELIVERY_VERIFICATION_CAPTURE_LINES
1727 );
1728 }
1729
1730 #[test]
1731 fn delivery_capture_lines_increased_for_recently_ready_agent() {
1732 let tmp = tempfile::tempdir().unwrap();
1733 let mut daemon = failed_delivery_test_daemon(&tmp);
1734 let mut watcher = crate::team::watcher::SessionWatcher::new("%9999999", "eng-1", 300, None);
1735 watcher.confirm_ready();
1736 assert_eq!(watcher.state, crate::team::watcher::WatcherState::Ready);
1737 daemon.watchers.insert("eng-1".to_string(), watcher);
1738 assert_eq!(
1739 daemon.delivery_capture_lines_for("eng-1"),
1740 DELIVERY_VERIFICATION_CAPTURE_LINES_RECENTLY_READY
1741 );
1742 }
1743
1744 #[test]
1745 fn delivery_capture_lines_default_for_unknown_agent() {
1746 let tmp = tempfile::tempdir().unwrap();
1747 let daemon = failed_delivery_test_daemon(&tmp);
1748 assert_eq!(
1750 daemon.delivery_capture_lines_for("unknown-agent"),
1751 DELIVERY_VERIFICATION_CAPTURE_LINES
1752 );
1753 }
1754
1755 #[test]
1756 fn check_agent_ready_returns_true_when_watcher_confirmed() {
1757 let tmp = tempfile::tempdir().unwrap();
1758 let mut daemon = failed_delivery_test_daemon(&tmp);
1759 let mut watcher = crate::team::watcher::SessionWatcher::new("%9999999", "eng-1", 300, None);
1760 watcher.activate(); watcher.deactivate();
1762 daemon.watchers.insert("eng-1".to_string(), watcher);
1763 assert!(daemon.check_agent_ready("eng-1", "%9999999"));
1765 }
1766
1767 #[test]
1768 fn check_agent_ready_returns_false_for_unready_nonexistent_pane() {
1769 let tmp = tempfile::tempdir().unwrap();
1770 let mut daemon = failed_delivery_test_daemon(&tmp);
1771 let watcher = crate::team::watcher::SessionWatcher::new("%99999999", "eng-1", 300, None);
1772 daemon.watchers.insert("eng-1".to_string(), watcher);
1773 assert!(
1785 !daemon
1786 .watchers
1787 .get("eng-1")
1788 .unwrap()
1789 .is_ready_for_delivery()
1790 );
1791 }
1792
1793 #[test]
1794 fn deliver_inbox_skips_agents_not_ready() {
1795 let tmp = tempfile::tempdir().unwrap();
1796 let mut daemon = failed_delivery_test_daemon(&tmp);
1797
1798 let root = inbox::inboxes_root(tmp.path());
1800 let msg = inbox::InboxMessage::new_send("manager", "eng-1", "test assignment");
1801 inbox::deliver_to_inbox(&root, &msg).unwrap();
1802
1803 let mut watcher = crate::team::watcher::SessionWatcher::new("%9999999", "eng-1", 300, None);
1805 watcher.activate();
1806 daemon.watchers.insert("eng-1".to_string(), watcher);
1807
1808 daemon.deliver_inbox_messages().unwrap();
1810
1811 let pending = inbox::pending_messages(&root, "eng-1").unwrap();
1813 assert_eq!(
1814 pending.len(),
1815 1,
1816 "message should remain pending for active agent"
1817 );
1818 }
1819
1820 #[test]
1821 fn deliver_inbox_delivers_to_ready_agents() {
1822 let tmp = tempfile::tempdir().unwrap();
1823 let mut daemon = failed_delivery_test_daemon(&tmp);
1824
1825 let root = inbox::inboxes_root(tmp.path());
1827 let msg = inbox::InboxMessage::new_send("manager", "eng-1", "test assignment");
1828 inbox::deliver_to_inbox(&root, &msg).unwrap();
1829
1830 let mut watcher = crate::team::watcher::SessionWatcher::new("%9999999", "eng-1", 300, None);
1833 watcher.confirm_ready();
1834 daemon.watchers.insert("eng-1".to_string(), watcher);
1835
1836 daemon.deliver_inbox_messages().unwrap();
1839
1840 let pending = inbox::pending_messages(&root, "eng-1").unwrap();
1844 let _ = pending;
1848 }
1849
1850 #[test]
1851 fn retry_failed_delivery_skips_non_ready_watcher() {
1852 let tmp = tempfile::tempdir().unwrap();
1853 let mut daemon = failed_delivery_test_daemon(&tmp);
1854
1855 let mut delivery = FailedDelivery::new("eng-1", "manager", "test message");
1857 delivery.last_attempt = Instant::now() - Duration::from_secs(60);
1858 daemon.failed_deliveries.push(delivery);
1859
1860 let mut watcher = crate::team::watcher::SessionWatcher::new("%9999999", "eng-1", 300, None);
1862 watcher.activate();
1863 daemon.watchers.insert("eng-1".to_string(), watcher);
1864
1865 daemon.retry_failed_deliveries().unwrap();
1866
1867 assert_eq!(daemon.failed_deliveries.len(), 1);
1869 }
1870
1871 #[test]
1872 fn retry_failed_delivery_attempts_ready_watcher() {
1873 let tmp = tempfile::tempdir().unwrap();
1874 let mut daemon = failed_delivery_test_daemon(&tmp);
1875
1876 let mut delivery = FailedDelivery::new("eng-1", "manager", "test message");
1878 delivery.last_attempt = Instant::now() - Duration::from_secs(60);
1879 daemon.failed_deliveries.push(delivery);
1880
1881 let mut watcher = crate::team::watcher::SessionWatcher::new("%9999999", "eng-1", 300, None);
1883 watcher.confirm_ready();
1884 daemon.watchers.insert("eng-1".to_string(), watcher);
1885
1886 daemon.retry_failed_deliveries().unwrap();
1887
1888 assert!(
1892 daemon.failed_deliveries.len() <= 1,
1893 "delivery should have been attempted"
1894 );
1895 }
1896
1897 #[test]
1900 fn failed_delivery_is_not_ready_for_retry_when_recent() {
1901 let delivery = FailedDelivery::new("eng-1", "manager", "test");
1902 assert!(!delivery.is_ready_for_retry(Instant::now()));
1904 }
1905
1906 #[test]
1907 fn failed_delivery_is_ready_for_retry_after_delay() {
1908 let mut delivery = FailedDelivery::new("eng-1", "manager", "test");
1909 delivery.last_attempt =
1910 Instant::now() - FAILED_DELIVERY_RETRY_DELAY - Duration::from_secs(1);
1911 assert!(delivery.is_ready_for_retry(Instant::now()));
1912 }
1913
1914 #[test]
1915 fn failed_delivery_has_attempts_remaining_at_boundary() {
1916 let mut delivery = FailedDelivery::new("eng-1", "manager", "test");
1917 delivery.attempts = FAILED_DELIVERY_MAX_ATTEMPTS - 1;
1918 assert!(delivery.has_attempts_remaining());
1919 delivery.attempts = FAILED_DELIVERY_MAX_ATTEMPTS;
1920 assert!(!delivery.has_attempts_remaining());
1921 }
1922
1923 #[test]
1924 fn failed_delivery_message_marker_uses_from_field() {
1925 let delivery = FailedDelivery::new("eng-1", "architect", "body");
1926 assert_eq!(delivery.message_marker(), "--- Message from architect ---");
1927 }
1928
1929 #[test]
1932 fn message_delivery_variants_are_distinct() {
1933 assert_ne!(MessageDelivery::Channel, MessageDelivery::LivePane);
1934 assert_ne!(MessageDelivery::LivePane, MessageDelivery::InboxQueued);
1935 assert_ne!(
1936 MessageDelivery::InboxQueued,
1937 MessageDelivery::SkippedUnknownRecipient
1938 );
1939 assert_eq!(MessageDelivery::Channel, MessageDelivery::Channel);
1940 }
1941
1942 #[test]
1945 fn telegram_failure_key_contains_recipient() {
1946 let key = TeamDaemon::telegram_failure_key("human");
1947 assert_eq!(key, "telegram-delivery-failures::human");
1948 }
1949
1950 #[test]
1951 fn telegram_circuit_breaker_key_contains_recipient() {
1952 let key = TeamDaemon::telegram_circuit_breaker_key("user-1");
1953 assert_eq!(key, "telegram-delivery-breaker::user-1");
1954 }
1955
1956 #[test]
1959 fn telegram_retry_config_has_expected_defaults() {
1960 let config = TeamDaemon::telegram_retry_config();
1961 assert_eq!(config.max_retries, 3);
1962 assert_eq!(config.base_delay_ms, 100);
1963 assert_eq!(config.max_delay_ms, 1_000);
1964 assert!(!config.jitter);
1965 }
1966
1967 #[test]
1970 fn telegram_channel_not_paused_by_default() {
1971 let tmp = tempfile::tempdir().unwrap();
1972 let daemon = empty_legacy_daemon(&tmp);
1973 assert!(!daemon.telegram_channel_paused("human"));
1974 }
1975
1976 #[test]
1977 fn telegram_channel_paused_when_breaker_open() {
1978 let tmp = tempfile::tempdir().unwrap();
1979 let mut daemon = empty_legacy_daemon(&tmp);
1980 daemon.intervention_cooldowns.insert(
1981 TeamDaemon::telegram_circuit_breaker_key("human"),
1982 Instant::now(),
1983 );
1984 assert!(daemon.telegram_channel_paused("human"));
1985 }
1986
1987 #[test]
1988 fn telegram_channel_not_paused_after_cooldown_expires() {
1989 let tmp = tempfile::tempdir().unwrap();
1990 let mut daemon = empty_legacy_daemon(&tmp);
1991 daemon.intervention_cooldowns.insert(
1992 TeamDaemon::telegram_circuit_breaker_key("human"),
1993 Instant::now() - TELEGRAM_DELIVERY_CIRCUIT_BREAKER_COOLDOWN - Duration::from_secs(1),
1994 );
1995 assert!(!daemon.telegram_channel_paused("human"));
1996 }
1997
1998 #[test]
2001 fn clear_telegram_delivery_failures_removes_keys() {
2002 let tmp = tempfile::tempdir().unwrap();
2003 let mut daemon = empty_legacy_daemon(&tmp);
2004 daemon
2005 .retry_counts
2006 .insert(TeamDaemon::telegram_failure_key("human"), 3);
2007 daemon.intervention_cooldowns.insert(
2008 TeamDaemon::telegram_circuit_breaker_key("human"),
2009 Instant::now(),
2010 );
2011
2012 daemon.clear_telegram_delivery_failures("human");
2013
2014 assert!(
2015 !daemon
2016 .retry_counts
2017 .contains_key(&TeamDaemon::telegram_failure_key("human"))
2018 );
2019 assert!(
2020 !daemon
2021 .intervention_cooldowns
2022 .contains_key(&TeamDaemon::telegram_circuit_breaker_key("human"))
2023 );
2024 }
2025
2026 #[test]
2029 fn increment_telegram_delivery_failures_starts_at_one() {
2030 let tmp = tempfile::tempdir().unwrap();
2031 let mut daemon = empty_legacy_daemon(&tmp);
2032 let count = daemon.increment_telegram_delivery_failures("human");
2033 assert_eq!(count, 1);
2034 }
2035
2036 #[test]
2037 fn increment_telegram_delivery_failures_accumulates() {
2038 let tmp = tempfile::tempdir().unwrap();
2039 let mut daemon = empty_legacy_daemon(&tmp);
2040 daemon.increment_telegram_delivery_failures("human");
2041 daemon.increment_telegram_delivery_failures("human");
2042 let count = daemon.increment_telegram_delivery_failures("human");
2043 assert_eq!(count, 3);
2044 }
2045
2046 #[test]
2049 fn deliver_message_skips_unknown_recipient() {
2050 let tmp = tempfile::tempdir().unwrap();
2051 let mut daemon = empty_legacy_daemon(&tmp);
2052 let result = daemon
2053 .deliver_message("manager", "nonexistent-role", "hello")
2054 .unwrap();
2055 assert_eq!(result, MessageDelivery::SkippedUnknownRecipient);
2056 }
2057
2058 #[test]
2061 fn deliver_message_to_member_without_pane_goes_to_inbox() {
2062 let tmp = tempfile::tempdir().unwrap();
2063 let mut daemon = empty_legacy_daemon(&tmp);
2064 daemon.config.members.push(MemberInstance {
2066 name: "eng-2".to_string(),
2067 role_name: "eng".to_string(),
2068 role_type: RoleType::Engineer,
2069 agent: Some("claude".to_string()),
2070 prompt: None,
2071 reports_to: Some("manager".to_string()),
2072 use_worktrees: false,
2073 });
2074
2075 let result = daemon
2076 .deliver_message("manager", "eng-2", "Go fix the bug")
2077 .unwrap();
2078 assert_eq!(result, MessageDelivery::InboxQueued);
2079
2080 let root = inbox::inboxes_root(tmp.path());
2081 let messages = inbox::pending_messages(&root, "eng-2").unwrap();
2082 assert_eq!(messages.len(), 1);
2083 assert_eq!(messages[0].from, "manager");
2084 assert!(messages[0].body.contains("Go fix the bug"));
2085 }
2086
2087 #[test]
2090 fn deliver_channel_message_records_routing_event() {
2091 let tmp = tempfile::tempdir().unwrap();
2092 let mut daemon = empty_legacy_daemon(&tmp);
2093 let sent = Arc::new(Mutex::new(Vec::new()));
2094 daemon.channels.insert(
2095 "user".to_string(),
2096 Box::new(RecordingChannel {
2097 messages: Arc::clone(&sent),
2098 }),
2099 );
2100
2101 let result = daemon
2102 .deliver_channel_message("eng-1", "user", "Status update")
2103 .unwrap();
2104 assert_eq!(result, MessageDelivery::Channel);
2105 assert_eq!(sent.lock().unwrap().as_slice(), ["Status update"]);
2106 }
2107
2108 #[test]
2111 fn record_failed_delivery_deduplicates_same_message() {
2112 let tmp = tempfile::tempdir().unwrap();
2113 let mut daemon = failed_delivery_test_daemon(&tmp);
2114
2115 daemon.record_failed_delivery("eng-1", "manager", "test msg");
2116 let first_attempt_time = daemon.failed_deliveries[0].last_attempt;
2117 std::thread::sleep(Duration::from_millis(10));
2118 daemon.record_failed_delivery("eng-1", "manager", "test msg");
2119
2120 assert_eq!(daemon.failed_deliveries.len(), 1);
2122 assert!(daemon.failed_deliveries[0].last_attempt >= first_attempt_time);
2124 }
2125
2126 #[test]
2127 fn record_failed_delivery_tracks_different_messages_separately() {
2128 let tmp = tempfile::tempdir().unwrap();
2129 let mut daemon = failed_delivery_test_daemon(&tmp);
2130
2131 daemon.record_failed_delivery("eng-1", "manager", "msg A");
2132 daemon.record_failed_delivery("eng-1", "manager", "msg B");
2133
2134 assert_eq!(daemon.failed_deliveries.len(), 2);
2135 }
2136
2137 #[test]
2138 fn record_failed_delivery_tracks_different_recipients_separately() {
2139 let tmp = tempfile::tempdir().unwrap();
2140 let mut daemon = failed_delivery_test_daemon(&tmp);
2141 daemon
2143 .config
2144 .pane_map
2145 .insert("eng-2".to_string(), "%8888888".to_string());
2146
2147 daemon.record_failed_delivery("eng-1", "manager", "same msg");
2148 daemon.record_failed_delivery("eng-2", "manager", "same msg");
2149
2150 assert_eq!(daemon.failed_deliveries.len(), 2);
2151 }
2152
2153 #[test]
2156 fn clear_failed_delivery_removes_matching_entry() {
2157 let tmp = tempfile::tempdir().unwrap();
2158 let mut daemon = failed_delivery_test_daemon(&tmp);
2159 daemon
2160 .failed_deliveries
2161 .push(FailedDelivery::new("eng-1", "manager", "msg A"));
2162 daemon
2163 .failed_deliveries
2164 .push(FailedDelivery::new("eng-1", "manager", "msg B"));
2165
2166 daemon.clear_failed_delivery("eng-1", "manager", "msg A");
2167
2168 assert_eq!(daemon.failed_deliveries.len(), 1);
2169 assert_eq!(daemon.failed_deliveries[0].body, "msg B");
2170 }
2171
2172 #[test]
2173 fn clear_failed_delivery_no_op_when_not_found() {
2174 let tmp = tempfile::tempdir().unwrap();
2175 let mut daemon = failed_delivery_test_daemon(&tmp);
2176 daemon
2177 .failed_deliveries
2178 .push(FailedDelivery::new("eng-1", "manager", "msg A"));
2179
2180 daemon.clear_failed_delivery("eng-1", "manager", "nonexistent");
2181
2182 assert_eq!(daemon.failed_deliveries.len(), 1);
2183 }
2184
2185 #[test]
2188 fn escalation_recipient_uses_reports_to() {
2189 let tmp = tempfile::tempdir().unwrap();
2190 let daemon = failed_delivery_test_daemon(&tmp);
2191 let target = daemon.failed_delivery_escalation_recipient("eng-1");
2193 assert_eq!(target.as_deref(), Some("manager"));
2194 }
2195
2196 #[test]
2197 fn escalation_recipient_falls_back_to_any_manager() {
2198 let tmp = tempfile::tempdir().unwrap();
2199 let mut daemon = failed_delivery_test_daemon(&tmp);
2200 daemon.config.members.push(MemberInstance {
2202 name: "standalone".to_string(),
2203 role_name: "standalone".to_string(),
2204 role_type: RoleType::Engineer,
2205 agent: Some("claude".to_string()),
2206 prompt: None,
2207 reports_to: None,
2208 use_worktrees: false,
2209 });
2210
2211 let target = daemon.failed_delivery_escalation_recipient("standalone");
2212 assert_eq!(target.as_deref(), Some("manager"));
2213 }
2214
2215 #[test]
2216 fn escalation_recipient_none_for_unknown_member_without_managers() {
2217 let tmp = tempfile::tempdir().unwrap();
2218 let daemon = empty_legacy_daemon(&tmp);
2219 let target = daemon.failed_delivery_escalation_recipient("unknown");
2221 assert!(target.is_none());
2223 }
2224
2225 #[test]
2228 fn escalate_failed_delivery_sends_to_manager_inbox() {
2229 let tmp = tempfile::tempdir().unwrap();
2230 let mut daemon = failed_delivery_test_daemon(&tmp);
2231 let delivery = FailedDelivery::new("eng-1", "architect", "critical update");
2232
2233 daemon.escalate_failed_delivery(&delivery).unwrap();
2234
2235 let root = inbox::inboxes_root(tmp.path());
2236 let messages = inbox::pending_messages(&root, "manager").unwrap();
2238 assert_eq!(messages.len(), 1);
2239 assert_eq!(messages[0].from, "daemon");
2240 assert!(messages[0].body.contains("Live message delivery failed"));
2241 assert!(messages[0].body.contains("Recipient: eng-1"));
2242 assert!(messages[0].body.contains("From: architect"));
2243 assert!(messages[0].body.contains("critical update"));
2244 }
2245
2246 #[test]
2247 fn escalate_failed_delivery_no_crash_without_target() {
2248 let tmp = tempfile::tempdir().unwrap();
2249 let mut daemon = empty_legacy_daemon(&tmp);
2250 let delivery = FailedDelivery::new("orphan", "ghost", "lost message");
2251
2252 daemon.escalate_failed_delivery(&delivery).unwrap();
2254 }
2255
2256 #[test]
2259 fn retry_failed_deliveries_noop_when_empty() {
2260 let tmp = tempfile::tempdir().unwrap();
2261 let mut daemon = failed_delivery_test_daemon(&tmp);
2262 daemon.retry_failed_deliveries().unwrap();
2263 assert!(daemon.failed_deliveries.is_empty());
2264 }
2265
2266 #[test]
2267 fn retry_failed_deliveries_skips_too_recent() {
2268 let tmp = tempfile::tempdir().unwrap();
2269 let mut daemon = failed_delivery_test_daemon(&tmp);
2270 let delivery = FailedDelivery::new("eng-1", "manager", "recent msg");
2272 daemon.failed_deliveries.push(delivery);
2273
2274 daemon.retry_failed_deliveries().unwrap();
2275
2276 assert_eq!(daemon.failed_deliveries.len(), 1);
2278 assert_eq!(daemon.failed_deliveries[0].attempts, 1); }
2280
2281 #[test]
2282 fn retry_failed_deliveries_escalates_without_pane() {
2283 let tmp = tempfile::tempdir().unwrap();
2284 let mut daemon = failed_delivery_test_daemon(&tmp);
2285 daemon.config.pane_map.clear();
2287 let mut delivery = FailedDelivery::new("eng-1", "manager", "no pane msg");
2288 delivery.last_attempt =
2289 Instant::now() - FAILED_DELIVERY_RETRY_DELAY - Duration::from_secs(1);
2290 daemon.failed_deliveries.push(delivery);
2291
2292 daemon.retry_failed_deliveries().unwrap();
2293
2294 assert!(daemon.failed_deliveries.is_empty());
2296 let root = inbox::inboxes_root(tmp.path());
2298 let messages = inbox::pending_messages(&root, "manager").unwrap();
2299 assert_eq!(messages.len(), 1);
2300 assert!(messages[0].body.contains("Live message delivery failed"));
2301 }
2302
2303 #[test]
2306 fn resolve_role_name_returns_human_for_human() {
2307 let tmp = tempfile::tempdir().unwrap();
2308 let daemon = failed_delivery_test_daemon(&tmp);
2309 assert_eq!(daemon.resolve_role_name("human"), "human");
2310 }
2311
2312 #[test]
2313 fn resolve_role_name_returns_daemon_for_daemon() {
2314 let tmp = tempfile::tempdir().unwrap();
2315 let daemon = failed_delivery_test_daemon(&tmp);
2316 assert_eq!(daemon.resolve_role_name("daemon"), "daemon");
2317 }
2318
2319 #[test]
2320 fn resolve_role_name_maps_member_to_role_name() {
2321 let tmp = tempfile::tempdir().unwrap();
2322 let daemon = failed_delivery_test_daemon(&tmp);
2323 assert_eq!(daemon.resolve_role_name("eng-1"), "eng");
2325 }
2326
2327 #[test]
2328 fn resolve_role_name_returns_input_for_unknown() {
2329 let tmp = tempfile::tempdir().unwrap();
2330 let daemon = failed_delivery_test_daemon(&tmp);
2331 assert_eq!(daemon.resolve_role_name("unknown-member"), "unknown-member");
2332 }
2333
2334 #[test]
2337 fn capture_contains_marker_empty_capture() {
2338 assert!(!capture_contains_message_marker(
2339 "",
2340 "--- Message from x ---"
2341 ));
2342 }
2343
2344 #[test]
2345 fn capture_contains_marker_partial_match_fails() {
2346 let marker = message_delivery_marker("manager");
2347 assert!(!capture_contains_message_marker(
2348 "--- Message from",
2349 &marker
2350 ));
2351 }
2352
2353 #[test]
2354 fn capture_contains_marker_multiline_capture() {
2355 let marker = message_delivery_marker("eng-1");
2356 let capture = "line1\nline2\n--- Message from eng-1 ---\nline4\n";
2357 assert!(capture_contains_message_marker(capture, &marker));
2358 }
2359
2360 #[test]
2363 fn queue_daemon_message_to_unknown_skips() {
2364 let tmp = tempfile::tempdir().unwrap();
2365 let mut daemon = empty_legacy_daemon(&tmp);
2366 let result = daemon.queue_daemon_message("nobody", "test msg").unwrap();
2367 assert_eq!(result, MessageDelivery::SkippedUnknownRecipient);
2368 }
2369
2370 #[test]
2373 fn deliver_channel_message_failing_non_telegram_channel_returns_error() {
2374 let tmp = tempfile::tempdir().unwrap();
2375 let mut daemon = empty_legacy_daemon(&tmp);
2376 daemon
2377 .channels
2378 .insert("user".to_string(), Box::new(FailingChannel));
2379
2380 let result = daemon.deliver_channel_message("eng-1", "user", "test");
2381 assert!(result.is_err());
2382 }
2383
2384 #[test]
2387 fn telegram_delivery_blocked_when_circuit_breaker_open() {
2388 let tmp = tempfile::tempdir().unwrap();
2389 let mut daemon = empty_legacy_daemon(&tmp);
2390 let sent = Arc::new(Mutex::new(Vec::new()));
2391 daemon.channels.insert(
2392 "user".to_string(),
2393 Box::new(RecordingChannel {
2394 messages: Arc::clone(&sent),
2395 }),
2396 );
2397 daemon.channels.insert(
2399 "tg-user".to_string(),
2400 Box::new(SequencedTelegramChannel {
2401 results: Arc::new(Mutex::new(VecDeque::from([
2402 Ok(()),
2403 Ok(()),
2404 Ok(()),
2405 Ok(()),
2406 Ok(()),
2407 ]))),
2408 attempts: Arc::new(Mutex::new(0)),
2409 }),
2410 );
2411 daemon.intervention_cooldowns.insert(
2413 TeamDaemon::telegram_circuit_breaker_key("tg-user"),
2414 Instant::now(),
2415 );
2416
2417 let result = daemon.deliver_channel_message("eng-1", "tg-user", "blocked msg");
2418 assert!(result.is_err());
2419 let err = result.unwrap_err().to_string();
2420 assert!(err.contains("circuit breaker is open"));
2421 }
2422
2423 #[test]
2426 fn delivery_verification_constants_are_sane() {
2427 const {
2428 assert!(
2429 DELIVERY_VERIFICATION_CAPTURE_LINES_RECENTLY_READY
2430 > DELIVERY_VERIFICATION_CAPTURE_LINES
2431 );
2432 assert!(DELIVERY_VERIFICATION_CAPTURE_LINES > 0);
2433 assert!(FAILED_DELIVERY_MAX_ATTEMPTS >= 2);
2434 }
2435 assert!(FAILED_DELIVERY_RETRY_DELAY >= Duration::from_secs(1));
2436 }
2437
2438 #[test]
2441 fn verify_message_content_in_pane_returns_false_for_nonexistent_pane() {
2442 let tmp = tempfile::tempdir().unwrap();
2443 let daemon = failed_delivery_test_daemon(&tmp);
2444 assert!(!daemon.verify_message_content_in_pane("%99999999", "--- Message from test ---"));
2445 }
2446
2447 #[test]
2448 fn verify_message_content_in_pane_lines_returns_false_for_nonexistent_pane() {
2449 let tmp = tempfile::tempdir().unwrap();
2450 let daemon = failed_delivery_test_daemon(&tmp);
2451 assert!(!daemon.verify_message_content_in_pane_lines(
2452 "%99999999",
2453 "--- Message from test ---",
2454 100
2455 ));
2456 }
2457
2458 #[test]
2461 fn failed_delivery_not_ready_for_immediate_retry() {
2462 let fd = FailedDelivery::new("eng-1", "manager", "test message");
2463 assert!(!fd.is_ready_for_retry(Instant::now()));
2465 }
2466
2467 #[test]
2468 fn failed_delivery_ready_after_delay() {
2469 let mut fd = FailedDelivery::new("eng-1", "manager", "test message");
2470 fd.last_attempt = Instant::now() - FAILED_DELIVERY_RETRY_DELAY - Duration::from_secs(1);
2472 assert!(fd.is_ready_for_retry(Instant::now()));
2473 }
2474
2475 #[test]
2476 fn failed_delivery_has_attempts_remaining() {
2477 let mut fd = FailedDelivery::new("eng-1", "manager", "test message");
2478 assert!(fd.has_attempts_remaining()); fd.attempts = FAILED_DELIVERY_MAX_ATTEMPTS;
2480 assert!(!fd.has_attempts_remaining());
2481 }
2482
2483 #[test]
2484 fn failed_delivery_message_marker_format() {
2485 let fd = FailedDelivery::new("eng-1", "manager", "test message");
2486 let marker = fd.message_marker();
2487 assert!(marker.contains("manager"));
2488 }
2489
2490 #[test]
2491 fn failed_delivery_fields_preserved() {
2492 let fd = FailedDelivery::new("eng-1", "manager", "hello world");
2493 assert_eq!(fd.recipient, "eng-1");
2494 assert_eq!(fd.from, "manager");
2495 assert_eq!(fd.body, "hello world");
2496 assert_eq!(fd.attempts, 1);
2497 }
2498
2499 #[test]
2500 fn telegram_circuit_breaker_key_format() {
2501 let key = TeamDaemon::telegram_circuit_breaker_key("eng-1");
2502 assert!(key.contains("eng-1"));
2503 assert!(key.starts_with("telegram-delivery-breaker::"));
2504 }
2505
2506 #[test]
2507 fn telegram_failure_key_format() {
2508 let key = TeamDaemon::telegram_failure_key("manager");
2509 assert!(key.contains("manager"));
2510 assert!(key.starts_with("telegram-delivery-failures::"));
2511 }
2512
2513 #[test]
2514 fn telegram_retry_config_has_sensible_defaults() {
2515 let config = TeamDaemon::telegram_retry_config();
2516 assert!(config.max_retries >= 1);
2517 assert!(config.max_delay_ms > config.base_delay_ms);
2518 }
2519
2520 #[test]
2521 fn telegram_channel_not_paused_initially() {
2522 let tmp = tempfile::tempdir().unwrap();
2523 let daemon = empty_legacy_daemon(&tmp);
2524 assert!(!daemon.telegram_channel_paused("eng-1"));
2525 }
2526
2527 #[test]
2530 fn pending_queue_buffers_message_when_agent_not_ready() {
2531 let tmp = tempfile::tempdir().unwrap();
2532 let mut daemon = failed_delivery_test_daemon(&tmp);
2533 let watcher = crate::team::watcher::SessionWatcher::new("%9999999", "eng-1", 300, None);
2535 assert!(!watcher.is_ready_for_delivery());
2536 daemon.watchers.insert("eng-1".to_string(), watcher);
2537
2538 let result = daemon
2539 .deliver_message("manager", "eng-1", "task assignment")
2540 .unwrap();
2541
2542 assert_eq!(
2543 result,
2544 MessageDelivery::DeferredPending,
2545 "message to starting agent must be deferred to pending queue"
2546 );
2547 let queue = daemon.pending_delivery_queue.get("eng-1").unwrap();
2548 assert_eq!(queue.len(), 1);
2549 assert_eq!(queue[0].from, "manager");
2550 assert_eq!(queue[0].body, "task assignment");
2551 }
2552
2553 #[test]
2554 fn drain_pending_queue_delivers_when_agent_ready() {
2555 let tmp = tempfile::tempdir().unwrap();
2556 let mut daemon = failed_delivery_test_daemon(&tmp);
2557
2558 daemon
2560 .pending_delivery_queue
2561 .entry("eng-1".to_string())
2562 .or_default()
2563 .push(PendingMessage {
2564 from: "manager".to_string(),
2565 body: "queued assignment".to_string(),
2566 queued_at: Instant::now(),
2567 });
2568
2569 let mut watcher = crate::team::watcher::SessionWatcher::new("%9999999", "eng-1", 300, None);
2571 watcher.confirm_ready();
2572 daemon.watchers.insert("eng-1".to_string(), watcher);
2573
2574 daemon.drain_pending_queue("eng-1").unwrap();
2575
2576 assert!(
2578 daemon
2579 .pending_delivery_queue
2580 .get("eng-1")
2581 .map(|q| q.is_empty())
2582 .unwrap_or(true),
2583 "pending queue must be empty after drain"
2584 );
2585
2586 let root = inbox::inboxes_root(tmp.path());
2588 let messages = inbox::pending_messages(&root, "eng-1").unwrap();
2589 assert_eq!(messages.len(), 1);
2590 assert_eq!(messages[0].body, "queued assignment");
2591 }
2592
2593 #[test]
2594 fn drain_pending_queue_noop_when_empty() {
2595 let tmp = tempfile::tempdir().unwrap();
2596 let mut daemon = failed_delivery_test_daemon(&tmp);
2597 daemon.drain_pending_queue("eng-1").unwrap();
2599 assert!(
2600 daemon
2601 .pending_delivery_queue
2602 .get("eng-1")
2603 .map(|q| q.is_empty())
2604 .unwrap_or(true)
2605 );
2606 }
2607
2608 #[test]
2609 fn escalation_skipped_for_starting_agents() {
2610 let tmp = tempfile::tempdir().unwrap();
2611 let mut daemon = failed_delivery_test_daemon(&tmp);
2612
2613 let watcher = crate::team::watcher::SessionWatcher::new("%9999999", "eng-1", 300, None);
2615 assert!(!watcher.is_ready_for_delivery());
2616 daemon.watchers.insert("eng-1".to_string(), watcher);
2617
2618 let mut delivery = FailedDelivery::new("eng-1", "manager", "assignment");
2621 delivery.attempts = FAILED_DELIVERY_MAX_ATTEMPTS - 1;
2622 delivery.last_attempt =
2623 Instant::now() - FAILED_DELIVERY_RETRY_DELAY - Duration::from_secs(1);
2624 daemon.failed_deliveries.push(delivery);
2625
2626 daemon.retry_failed_deliveries().unwrap();
2627
2628 let root = inbox::inboxes_root(tmp.path());
2631 let architect_inbox = inbox::pending_messages(&root, "architect").unwrap();
2632 assert!(
2633 architect_inbox.is_empty(),
2634 "no escalation expected while agent is starting"
2635 );
2636 assert_eq!(
2638 daemon.failed_deliveries.len(),
2639 1,
2640 "failed delivery must stay in queue for starting agent"
2641 );
2642 }
2643
2644 #[test]
2645 fn escalation_happens_for_ready_agents_with_failed_delivery() {
2646 let tmp = tempfile::tempdir().unwrap();
2647 let mut daemon = failed_delivery_test_daemon(&tmp);
2648
2649 let mut watcher = crate::team::watcher::SessionWatcher::new("%9999999", "eng-1", 300, None);
2651 watcher.confirm_ready();
2652 daemon.watchers.insert("eng-1".to_string(), watcher);
2653
2654 let mut delivery = FailedDelivery::new("eng-1", "manager", "assignment");
2656 delivery.attempts = FAILED_DELIVERY_MAX_ATTEMPTS - 1;
2657 delivery.last_attempt =
2658 Instant::now() - FAILED_DELIVERY_RETRY_DELAY - Duration::from_secs(1);
2659 daemon.failed_deliveries.push(delivery);
2660
2661 daemon.retry_failed_deliveries().unwrap();
2662
2663 let root = inbox::inboxes_root(tmp.path());
2666 let manager_inbox = inbox::pending_messages(&root, "manager").unwrap();
2667 assert!(
2668 !manager_inbox.is_empty(),
2669 "failed delivery for ready agent must be escalated"
2670 );
2671 }
2672
2673 #[test]
2674 fn multiple_messages_queued_and_drained_in_order() {
2675 let tmp = tempfile::tempdir().unwrap();
2676 let mut daemon = failed_delivery_test_daemon(&tmp);
2677
2678 let watcher = crate::team::watcher::SessionWatcher::new("%9999999", "eng-1", 300, None);
2680 daemon.watchers.insert("eng-1".to_string(), watcher);
2681
2682 for i in 1..=3u32 {
2683 let result = daemon
2684 .deliver_message("manager", "eng-1", &format!("msg-{i}"))
2685 .unwrap();
2686 assert_eq!(result, MessageDelivery::DeferredPending);
2687 }
2688
2689 let queue = daemon.pending_delivery_queue.get("eng-1").unwrap();
2690 assert_eq!(queue.len(), 3);
2691 assert_eq!(queue[0].body, "msg-1");
2693 assert_eq!(queue[1].body, "msg-2");
2694 assert_eq!(queue[2].body, "msg-3");
2695
2696 daemon.watchers.get_mut("eng-1").unwrap().confirm_ready();
2698 daemon.drain_pending_queue("eng-1").unwrap();
2699
2700 assert!(
2702 daemon
2703 .pending_delivery_queue
2704 .get("eng-1")
2705 .map(|q| q.is_empty())
2706 .unwrap_or(true)
2707 );
2708
2709 let root = inbox::inboxes_root(tmp.path());
2711 let inbox_msgs = inbox::pending_messages(&root, "eng-1").unwrap();
2712 assert_eq!(inbox_msgs.len(), 3, "all queued messages must be delivered");
2713 let mut bodies: Vec<&str> = inbox_msgs.iter().map(|m| m.body.as_str()).collect();
2715 bodies.sort();
2716 assert_eq!(bodies, vec!["msg-1", "msg-2", "msg-3"]);
2717 }
2718
2719 #[test]
2722 fn pending_queue_full_lifecycle_buffer_transition_drain_verify() {
2723 let tmp = tempfile::tempdir().unwrap();
2724 let mut daemon = failed_delivery_test_daemon(&tmp);
2725
2726 let watcher = crate::team::watcher::SessionWatcher::new("%9999999", "eng-1", 300, None);
2728 assert!(!watcher.is_ready_for_delivery());
2729 daemon.watchers.insert("eng-1".to_string(), watcher);
2730
2731 let result = daemon
2733 .deliver_message("manager", "eng-1", "Task #42: implement feature")
2734 .unwrap();
2735 assert_eq!(
2736 result,
2737 MessageDelivery::DeferredPending,
2738 "message to starting agent must be deferred"
2739 );
2740 let queue = daemon.pending_delivery_queue.get("eng-1").unwrap();
2741 assert_eq!(
2742 queue.len(),
2743 1,
2744 "pending queue must contain exactly one message"
2745 );
2746 assert_eq!(queue[0].from, "manager");
2747 assert_eq!(queue[0].body, "Task #42: implement feature");
2748
2749 daemon.watchers.get_mut("eng-1").unwrap().confirm_ready();
2751 assert!(
2752 daemon
2753 .watchers
2754 .get("eng-1")
2755 .unwrap()
2756 .is_ready_for_delivery()
2757 );
2758
2759 daemon.drain_pending_queue("eng-1").unwrap();
2761
2762 assert!(
2764 daemon
2765 .pending_delivery_queue
2766 .get("eng-1")
2767 .map(|q| q.is_empty())
2768 .unwrap_or(true),
2769 "pending queue must be empty after drain"
2770 );
2771
2772 let root = inbox::inboxes_root(tmp.path());
2774 let inbox_msgs = inbox::pending_messages(&root, "eng-1").unwrap();
2775 assert_eq!(
2776 inbox_msgs.len(),
2777 1,
2778 "message must arrive in inbox after drain"
2779 );
2780 assert_eq!(inbox_msgs[0].body, "Task #42: implement feature");
2781 assert_eq!(inbox_msgs[0].from, "manager");
2782 }
2783
2784 #[test]
2787 fn marker_scrolloff_detected_as_delivered_when_agent_active() {
2788 let tmp = tempfile::tempdir().unwrap();
2792 let mut daemon = failed_delivery_test_daemon(&tmp);
2793 let mut watcher = crate::team::watcher::SessionWatcher::new("%9999999", "eng-1", 300, None);
2794 watcher.activate(); daemon.watchers.insert("eng-1".to_string(), watcher);
2796
2797 assert!(daemon.agent_went_active_after_injection("%9999999", "eng-1"));
2800 }
2801
2802 #[test]
2803 fn marker_scrolloff_not_inferred_when_watcher_never_ready() {
2804 let tmp = tempfile::tempdir().unwrap();
2807 let mut daemon = failed_delivery_test_daemon(&tmp);
2808 let watcher = crate::team::watcher::SessionWatcher::new("%9999999", "eng-1", 300, None);
2809 assert!(!watcher.is_ready_for_delivery());
2810 daemon.watchers.insert("eng-1".to_string(), watcher);
2811
2812 assert!(!daemon.agent_went_active_after_injection("%9999999", "eng-1"));
2813 }
2814
2815 #[test]
2816 fn marker_scrolloff_not_inferred_without_watcher() {
2817 let tmp = tempfile::tempdir().unwrap();
2819 let daemon = failed_delivery_test_daemon(&tmp);
2820 assert!(!daemon.agent_went_active_after_injection("%9999999", "unknown-agent"));
2821 }
2822
2823 #[test]
2824 fn state_transition_confirms_delivery_after_activate() {
2825 let tmp = tempfile::tempdir().unwrap();
2829 let mut daemon = failed_delivery_test_daemon(&tmp);
2830 let mut watcher = crate::team::watcher::SessionWatcher::new("%9999999", "eng-1", 300, None);
2831 watcher.confirm_ready();
2832 assert!(watcher.is_ready_for_delivery());
2833 watcher.activate(); assert!(watcher.is_ready_for_delivery()); daemon.watchers.insert("eng-1".to_string(), watcher);
2836
2837 assert!(daemon.agent_went_active_after_injection("%9999999", "eng-1"));
2839 }
2840
2841 #[test]
2842 fn state_transition_ready_to_active_clears_failed_delivery() {
2843 let tmp = tempfile::tempdir().unwrap();
2846 let mut daemon = failed_delivery_test_daemon(&tmp);
2847 let mut watcher = crate::team::watcher::SessionWatcher::new("%9999999", "eng-1", 300, None);
2848 watcher.activate();
2849 daemon.watchers.insert("eng-1".to_string(), watcher);
2850
2851 daemon
2853 .failed_deliveries
2854 .push(FailedDelivery::new("eng-1", "manager", "test message"));
2855 assert_eq!(daemon.failed_deliveries.len(), 1);
2856
2857 daemon.clear_failed_delivery("eng-1", "manager", "test message");
2860 assert!(daemon.failed_deliveries.is_empty());
2861 }
2862}