1pub mod conflict;
16pub mod delivery;
17pub mod learnings;
18pub mod messages;
19pub mod publish;
20pub mod server;
21pub mod watcher;
22
23use std::collections::{HashMap, HashSet};
24use std::path::PathBuf;
25use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
26use std::sync::{Arc, OnceLock, RwLock};
27use std::thread::JoinHandle;
28use std::time::Instant;
29
30use serde::Serialize;
31
32use crate::config::{BrokerConfig, ConflictConfig};
33pub use messages::BrokerMessage;
34
/// A worktree the broker watches on behalf of one agent.
///
/// `start_broker_with` hands each target to `watcher::watch_worktree`, and
/// both it and `BrokerState::register_watch_target` use the fields below to
/// seed the shared broker state.
#[derive(Debug, Clone)]
pub struct WatchTarget {
    /// Agent identifier; used as the key for the agent's CLI label and inbox queue.
    pub agent_id: String,
    /// CLI label recorded for the agent. May be empty, in which case
    /// `register_watch_target` records no label.
    pub cli: String,
    /// Worktree path; recorded in the set of watched paths.
    pub worktree_path: PathBuf,
}
47
/// Per-agent record stored in `BrokerStateInner::agents`.
///
/// NOTE(review): nothing in this module writes these fields; they are
/// presumably maintained by the delivery path — confirm against `delivery`.
#[derive(Debug, Clone)]
pub struct AgentRecord {
    /// Identifier of the agent this record belongs to.
    pub agent_id: String,
    /// Status label for the agent (the set of valid values is not visible here).
    pub status: String,
    /// Monotonic timestamp; presumably the last time the agent was heard from — confirm.
    pub last_seen: Instant,
    /// Presumably the most recent message received from the agent, if any — confirm.
    pub last_message: Option<BrokerMessage>,
    /// Presumably the time of the agent's most recent commit, if any — confirm.
    pub last_committed_at: Option<Instant>,
}
68
/// Serializable view of one agent's status.
///
/// `last_seen` is excluded from the serialized form and `phase` is omitted
/// when it is `None`; every other field is always emitted.
#[derive(Debug, Clone, Serialize)]
pub struct AgentStatusEntry {
    /// Identifier of the agent.
    pub agent_id: String,
    /// CLI label of the agent.
    pub cli: String,
    /// Status label of the agent.
    pub status: String,
    /// Presumably the number of seconds since the agent was last seen — confirm at the producer.
    pub last_seen_seconds: u64,
    /// Free-form summary text.
    pub summary: String,
    /// Raw monotonic timestamp; never serialized.
    #[serde(skip)]
    pub last_seen: Instant,
    /// Optional phase label; left out of the serialized output when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub phase: Option<String>,
}
92
/// Mutable broker data guarded by the `RwLock` inside [`BrokerState`].
#[derive(Debug)]
pub struct BrokerStateInner {
    /// Agent records keyed by agent id.
    pub agents: HashMap<String, AgentRecord>,
    /// CLI label per agent id.
    pub agent_clis: HashMap<String, String>,
    /// Per-agent inbox queues keyed by agent id; the `u64` is presumably the
    /// message sequence number — confirm against `delivery`.
    pub queues: HashMap<String, Vec<(u64, BrokerMessage)>>,
    /// Log of messages; presumably (sequence number, wall-clock time, message) — confirm.
    pub message_log: Vec<(u64, std::time::SystemTime, BrokerMessage)>,
    /// TTL value; initialised from `WatcherConfig::DEFAULT_REPUBLISH_TTL_SECONDS`
    /// and replaced through `BrokerState::set_republish_working_ttl`.
    pub republish_working_ttl: std::time::Duration,
    /// Worktree paths currently registered for watching.
    pub watched_paths: HashSet<PathBuf>,
}
119
/// Shared broker state: lock-guarded data plus configuration fixed at start-up.
#[derive(Debug)]
pub struct BrokerState {
    /// Lock-guarded mutable data; reached through `read()` / `write()`.
    inner: RwLock<BrokerStateInner>,
    /// Counter behind `next_seq()`; starts at 0 so the first value handed out is 1.
    next_seq: AtomicU64,
    /// Optional log file path; the flush thread is only spawned when this is set.
    pub log_path: Option<PathBuf>,
    /// Creation time of this state; basis for `uptime_seconds()`.
    started_at: Instant,
    /// Optional learnings aggregator; the learnings thread is only spawned when this is set.
    pub learnings: Option<learnings::SharedLearnings>,
    /// Flag set via `with_verify_on_commit_nudge`; its consumers are outside this module.
    pub verify_on_commit_nudge: bool,
    /// Optional role-gating context set via `with_role_gating`.
    pub role_gating: Option<crate::opsx::RoleGatingContext>,
    /// Watcher shutdown receiver; can be set once, later `set` calls are ignored.
    watcher_shutdown_rx: OnceLock<tokio::sync::watch::Receiver<bool>>,
}
160
161impl BrokerState {
162 pub fn new(log_path: Option<PathBuf>) -> Self {
164 Self {
165 inner: RwLock::new(BrokerStateInner {
166 agents: HashMap::new(),
167 agent_clis: HashMap::new(),
168 queues: HashMap::new(),
169 message_log: Vec::new(),
170 republish_working_ttl: std::time::Duration::from_secs(
171 crate::config::WatcherConfig::DEFAULT_REPUBLISH_TTL_SECONDS,
172 ),
173 watched_paths: HashSet::new(),
174 }),
175 next_seq: AtomicU64::new(0),
176 log_path,
177 started_at: Instant::now(),
178 learnings: None,
179 verify_on_commit_nudge: false,
183 role_gating: None,
184 watcher_shutdown_rx: OnceLock::new(),
185 }
186 }
187
188 #[must_use]
193 pub fn with_role_gating(mut self, ctx: crate::opsx::RoleGatingContext) -> Self {
194 self.role_gating = Some(ctx);
195 self
196 }
197
198 #[must_use]
202 pub fn with_verify_on_commit_nudge(mut self, enabled: bool) -> Self {
203 self.verify_on_commit_nudge = enabled;
204 self
205 }
206
207 #[must_use]
218 pub fn with_seeded_cli(self, agent_id: &str, cli: &str) -> Self {
219 if !cli.is_empty()
220 && let Ok(mut inner) = self.inner.write()
221 {
222 inner
223 .agent_clis
224 .insert(agent_id.to_string(), cli.to_string());
225 }
226 self
227 }
228
229 pub fn attach_learnings(&mut self, aggregator: learnings::SharedLearnings) {
233 self.learnings = Some(aggregator);
234 }
235
236 pub fn set_republish_working_ttl(&self, ttl: std::time::Duration) {
240 self.write().republish_working_ttl = ttl;
241 }
242
243 pub fn set_watcher_shutdown_rx(&self, rx: tokio::sync::watch::Receiver<bool>) {
248 let _ = self.watcher_shutdown_rx.set(rx);
249 }
250
251 #[must_use]
256 pub fn watcher_shutdown_rx(&self) -> Option<tokio::sync::watch::Receiver<bool>> {
257 self.watcher_shutdown_rx.get().cloned()
258 }
259
260 pub fn register_watch_target(&self, target: &WatchTarget) -> bool {
270 let mut inner = self.write();
271 if !inner.watched_paths.insert(target.worktree_path.clone()) {
272 return false;
273 }
274 if !target.cli.is_empty() {
275 inner
276 .agent_clis
277 .insert(target.agent_id.clone(), target.cli.clone());
278 }
279 inner.queues.entry(target.agent_id.clone()).or_default();
280 true
281 }
282
283 pub fn forget_watch_target(&self, worktree_path: &std::path::Path) {
288 self.write().watched_paths.remove(worktree_path);
289 }
290
291 pub fn read(&self) -> std::sync::RwLockReadGuard<'_, BrokerStateInner> {
297 self.inner.read().expect("broker state lock poisoned")
298 }
299
300 pub fn write(&self) -> std::sync::RwLockWriteGuard<'_, BrokerStateInner> {
306 self.inner.write().expect("broker state lock poisoned")
307 }
308
309 pub fn next_seq(&self) -> u64 {
311 self.next_seq.fetch_add(1, Ordering::Relaxed) + 1
312 }
313
314 pub fn uptime_seconds(&self) -> u64 {
318 self.started_at.elapsed().as_secs()
319 }
320}
321
/// Errors returned when starting the broker.
#[derive(Debug, thiserror::Error)]
pub enum BrokerError {
    /// The start-up probe found a server on the port that is not a broker.
    #[error(
        "port {port} is already in use by another process — change [broker] port in .git-paw/config.toml"
    )]
    PortInUse {
        /// Configured broker port.
        port: u16,
        /// Underlying I/O error (thiserror exposes a field named `source` as the error source).
        source: std::io::Error,
    },

    /// The start-up probe connected but got no usable answer.
    #[error("broker probe timed out on port {port} — check for stuck processes on this port")]
    ProbeTimeout {
        /// Configured broker port.
        port: u16,
    },

    /// The bind address was invalid, or creating/binding/listening on the socket failed.
    #[error("failed to bind broker: {0}")]
    BindFailed(std::io::Error),

    /// The tokio runtime could not be built.
    #[error("failed to create broker runtime: {0}")]
    RuntimeFailed(std::io::Error),
}
351
/// Owner of a running (or re-attached) broker; dropping it shuts the broker down.
///
/// A handle produced by re-attaching to an already-running broker holds no
/// runtime, channels or threads — only the state and the URL.
pub struct BrokerHandle {
    /// Shared broker state.
    pub state: Arc<BrokerState>,
    /// Tokio runtime hosting the server and watcher tasks; `None` when re-attached.
    runtime: Option<tokio::runtime::Runtime>,
    /// Fires the HTTP server's graceful shutdown; `None` when re-attached.
    shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
    /// Sender of the watcher shutdown signal (`true` is sent on drop); `None` when re-attached.
    watcher_shutdown: Option<tokio::sync::watch::Sender<bool>>,
    /// Broker URL as produced by `BrokerConfig::url`.
    pub url: String,
    /// Flag the background threads poll; set on drop to make them exit.
    stop_flag: Arc<AtomicBool>,
    /// Log flush thread; present only when the state has a log path.
    flush_thread: Option<JoinHandle<()>>,
    /// Learnings flush thread; present only when an aggregator is attached.
    learnings_thread: Option<JoinHandle<()>>,
}
377
378impl BrokerHandle {
379 fn reattached(url: String, state: Arc<BrokerState>) -> Self {
381 Self {
382 state,
383 runtime: None,
384 shutdown_tx: None,
385 watcher_shutdown: None,
386 url,
387 stop_flag: Arc::new(AtomicBool::new(false)),
388 flush_thread: None,
389 learnings_thread: None,
390 }
391 }
392}
393
394impl Drop for BrokerHandle {
395 fn drop(&mut self) {
396 self.stop_flag.store(true, Ordering::Release);
398 if let Some(handle) = self.flush_thread.take() {
399 let _ = handle.join();
400 }
401 if let Some(handle) = self.learnings_thread.take() {
404 let _ = handle.join();
405 }
406 if let Some(tx) = self.watcher_shutdown.take() {
408 let _ = tx.send(true);
409 }
410 if let Some(tx) = self.shutdown_tx.take() {
412 let _ = tx.send(());
413 }
414 if let Some(rt) = self.runtime.take() {
416 rt.shutdown_timeout(std::time::Duration::from_secs(2));
417 }
418 }
419}
420
/// Outcome of probing a broker URL.
#[derive(Debug, PartialEq, Eq)]
pub enum ProbeResult {
    /// The address could not be resolved, or the TCP connection failed.
    NoListener,
    /// The response carried the `"git_paw":true` marker.
    LiveBroker,
    /// Something answered with an HTTP response that lacks the marker.
    ForeignServer,
    /// A connection was made but the request could not be written, or no
    /// recognisable HTTP response came back.
    Timeout,
}

/// Probes `url` for a running broker. Public entry point for
/// the private `probe_existing_broker` helper.
pub fn probe_broker(url: &str) -> ProbeResult {
    probe_existing_broker(url)
}

/// Sends `GET /status` to the host and port named by `url` and classifies the reply.
///
/// `url` may be `host:port` or `http://host:port`, optionally followed by a
/// path (for example a trailing `/`); only the `host:port` part is used.
/// Connect, read and write each time out after 500 ms.
fn probe_existing_broker(url: &str) -> ProbeResult {
    use std::io::{Read, Write};
    use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
    use std::time::Duration;

    const IO_TIMEOUT: Duration = Duration::from_millis(500);

    let without_scheme = url.strip_prefix("http://").unwrap_or(url);
    // Keep only the authority. A trailing slash or path would otherwise make
    // both the parse and the DNS lookup fail, misreporting a live broker as
    // `NoListener`.
    let addr = without_scheme.split('/').next().unwrap_or(without_scheme);

    // Fast path: a literal socket address. Otherwise resolve the host name
    // and take the first result.
    let socket_addr: SocketAddr = match addr.parse() {
        Ok(parsed) => parsed,
        Err(_) => match addr.to_socket_addrs().ok().and_then(|mut found| found.next()) {
            Some(resolved) => resolved,
            None => return ProbeResult::NoListener,
        },
    };

    let Ok(mut stream) = TcpStream::connect_timeout(&socket_addr, IO_TIMEOUT) else {
        return ProbeResult::NoListener;
    };

    // Best-effort: a failure to set a timeout is not fatal for the probe.
    stream.set_read_timeout(Some(IO_TIMEOUT)).ok();
    stream.set_write_timeout(Some(IO_TIMEOUT)).ok();

    let request = format!("GET /status HTTP/1.1\r\nHost: {addr}\r\nConnection: close\r\n\r\n");
    if stream.write_all(request.as_bytes()).is_err() {
        return ProbeResult::Timeout;
    }

    // A read error after some data arrived is tolerated: whatever was
    // received is still classified below.
    let mut response = String::new();
    if stream.read_to_string(&mut response).is_err() && response.is_empty() {
        return ProbeResult::Timeout;
    }

    if response.contains("\"git_paw\":true") || response.contains("\"git_paw\": true") {
        ProbeResult::LiveBroker
    } else if response.starts_with("HTTP/") {
        ProbeResult::ForeignServer
    } else {
        ProbeResult::Timeout
    }
}
497
498pub fn start_broker(
511 config: &BrokerConfig,
512 state: BrokerState,
513 watch_targets: Vec<WatchTarget>,
514) -> Result<BrokerHandle, BrokerError> {
515 start_broker_with(config, state, watch_targets, None, 60)
516}
517
518#[allow(clippy::too_many_lines)]
531pub fn start_broker_with(
532 config: &BrokerConfig,
533 state: BrokerState,
534 watch_targets: Vec<WatchTarget>,
535 conflict: Option<ConflictConfig>,
536 learnings_flush_interval_seconds: u64,
537) -> Result<BrokerHandle, BrokerError> {
538 let url = config.url();
539 let state = Arc::new(state);
540 state.set_republish_working_ttl(std::time::Duration::from_secs(
543 config.watcher.republish_working_ttl_seconds(),
544 ));
545 let stop_flag = Arc::new(AtomicBool::new(false));
546
547 match probe_existing_broker(&url) {
548 ProbeResult::LiveBroker => return Ok(BrokerHandle::reattached(url, state)),
549 ProbeResult::ForeignServer => {
550 return Err(BrokerError::PortInUse {
551 port: config.port,
552 source: std::io::Error::new(
553 std::io::ErrorKind::AddrInUse,
554 "port occupied by non-broker process",
555 ),
556 });
557 }
558 ProbeResult::Timeout => {
559 return Err(BrokerError::ProbeTimeout { port: config.port });
560 }
561 ProbeResult::NoListener => {}
562 }
563
564 let flush_thread = if state.log_path.is_some() {
566 let s = Arc::clone(&state);
567 let f = Arc::clone(&stop_flag);
568 Some(std::thread::spawn(move || {
569 delivery::flush_loop(&s, &f);
570 }))
571 } else {
572 None
573 };
574
575 let learnings_thread = if state.learnings.is_some() {
579 let s = Arc::clone(&state);
580 let f = Arc::clone(&stop_flag);
581 Some(std::thread::spawn(move || {
582 learnings_flush_loop(&s, &f, learnings_flush_interval_seconds);
583 }))
584 } else {
585 None
586 };
587
588 let runtime = tokio::runtime::Builder::new_multi_thread()
589 .enable_all()
590 .build()
591 .map_err(BrokerError::RuntimeFailed)?;
592
593 let addr: std::net::SocketAddr = format!("{}:{}", config.bind, config.port).parse().map_err(
594 |e: std::net::AddrParseError| {
595 BrokerError::BindFailed(std::io::Error::new(std::io::ErrorKind::InvalidInput, e))
596 },
597 )?;
598
599 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
600
601 let router = server::router(Arc::clone(&state));
602
603 let listener = runtime.block_on(async {
604 let socket = tokio::net::TcpSocket::new_v4().map_err(BrokerError::BindFailed)?;
605 socket
606 .set_reuseaddr(true)
607 .map_err(BrokerError::BindFailed)?;
608 socket.bind(addr).map_err(BrokerError::BindFailed)?;
609 socket.listen(1024).map_err(BrokerError::BindFailed)
610 })?;
611
612 runtime.spawn(async {
615 let _ = tokio::signal::ctrl_c().await;
616 });
617
618 runtime.spawn(async move {
619 axum::serve(listener, router)
620 .with_graceful_shutdown(async {
621 let _ = shutdown_rx.await;
622 })
623 .await
624 .ok();
625 });
626
627 {
633 let mut inner = state.write();
634 for target in &watch_targets {
635 inner
636 .agent_clis
637 .insert(target.agent_id.clone(), target.cli.clone());
638 inner.queues.entry(target.agent_id.clone()).or_default();
639 inner.watched_paths.insert(target.worktree_path.clone());
641 }
642 }
643
644 let (watcher_tx, watcher_rx) = tokio::sync::watch::channel(false);
649 state.set_watcher_shutdown_rx(watcher_rx.clone());
651 for target in watch_targets {
652 let s = Arc::clone(&state);
653 let rx = watcher_rx.clone();
654 runtime.spawn(watcher::watch_worktree(s, target, rx));
655 }
656 if let Some(conflict_cfg) = conflict {
657 let s = Arc::clone(&state);
658 let rx = watcher_rx.clone();
659 runtime.spawn(conflict::run_detector_loop(s, conflict_cfg, rx));
660 }
661
662 Ok(BrokerHandle {
663 state,
664 runtime: Some(runtime),
665 shutdown_tx: Some(shutdown_tx),
666 watcher_shutdown: Some(watcher_tx),
667 url,
668 stop_flag,
669 flush_thread,
670 learnings_thread,
671 })
672}
673
674fn learnings_flush_loop(
680 state: &Arc<BrokerState>,
681 stop: &Arc<AtomicBool>,
682 flush_interval_seconds: u64,
683) {
684 let Some(aggregator) = state.learnings.clone() else {
685 return;
686 };
687 let interval = std::time::Duration::from_secs(flush_interval_seconds.max(1));
688 let tick = std::time::Duration::from_millis(100);
689
690 loop {
691 let mut elapsed = std::time::Duration::ZERO;
692 while elapsed < interval {
693 if stop.load(Ordering::Acquire) {
694 if let Ok(mut agg) = aggregator.lock() {
695 let _ = agg.flush_at_shutdown();
696 }
697 publish_pending_learnings(state, &aggregator);
698 return;
699 }
700 std::thread::sleep(tick);
701 elapsed += tick;
702 }
703 if let Ok(mut agg) = aggregator.lock() {
704 let _ = agg.flush();
705 }
706 publish_pending_learnings(state, &aggregator);
707 }
708}
709
710fn publish_pending_learnings(state: &Arc<BrokerState>, aggregator: &learnings::SharedLearnings) {
719 let records = match aggregator.lock() {
720 Ok(mut agg) => agg.take_pending_publish(),
721 Err(_) => return,
722 };
723 for record in &records {
724 delivery::publish_message(state, &BrokerMessage::from(record));
725 }
726}
727
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh state has no agents, queues or logged messages.
    #[test]
    fn broker_state_new_is_empty() {
        let state = BrokerState::new(None);
        let inner = state.read();
        assert!(inner.agents.is_empty());
        assert!(inner.queues.is_empty());
        assert!(inner.message_log.is_empty());
    }

    /// Registering the same target twice records it once and seeds the CLI
    /// label and the inbox queue.
    #[test]
    fn register_watch_target_is_idempotent_and_seeds_roster() {
        let state = BrokerState::new(None);
        let target = WatchTarget {
            agent_id: "feat-hot".to_string(),
            cli: "claude".to_string(),
            worktree_path: PathBuf::from("/tmp/feat-hot"),
        };
        assert!(
            state.register_watch_target(&target),
            "first registration must return true"
        );
        assert!(
            !state.register_watch_target(&target),
            "duplicate registration must return false"
        );
        let inner = state.read();
        assert_eq!(inner.watched_paths.len(), 1, "path recorded exactly once");
        assert_eq!(
            inner.agent_clis.get("feat-hot").map(String::as_str),
            Some("claude"),
            "registration seeds the CLI label"
        );
        assert!(
            inner.queues.contains_key("feat-hot"),
            "registration seeds the inbox queue"
        );
    }

    /// Forgetting a path makes a later registration of it count as new.
    #[test]
    fn forget_watch_target_allows_re_registration() {
        let state = BrokerState::new(None);
        let target = WatchTarget {
            agent_id: "feat-hot".to_string(),
            cli: "claude".to_string(),
            worktree_path: PathBuf::from("/tmp/feat-hot"),
        };
        assert!(state.register_watch_target(&target));
        state.forget_watch_target(&target.worktree_path);
        assert!(
            state.register_watch_target(&target),
            "after forgetting, the same path registers fresh again"
        );
    }

    /// Sequence numbers start at 1 and increase by one per call.
    #[test]
    fn next_seq_starts_at_one() {
        let state = BrokerState::new(None);
        assert_eq!(state.next_seq(), 1);
        assert_eq!(state.next_seq(), 2);
        assert_eq!(state.next_seq(), 3);
    }

    /// Probing a port nobody listens on reports `NoListener`.
    #[test]
    fn probe_no_listener() {
        let result = probe_existing_broker("http://127.0.0.1:19999");
        assert_eq!(result, ProbeResult::NoListener);
    }

    /// A re-attached handle owns no runtime, shutdown channel or flush thread.
    #[test]
    fn reattached_handle_has_no_runtime() {
        let state = Arc::new(BrokerState::new(None));
        let h = BrokerHandle::reattached("http://127.0.0.1:9119".into(), state);
        assert!(h.runtime.is_none());
        assert!(h.shutdown_tx.is_none());
        assert!(h.flush_thread.is_none());
    }

    /// Starting on a pid-derived port yields a handle whose URL names that
    /// port. A start-up failure (e.g. the port is taken) is tolerated.
    #[test]
    fn start_broker_on_free_port() {
        let config = BrokerConfig {
            enabled: true,
            // Range 19_300..=19_899: disjoint from the 19_100.. and 19_200..
            // ranges of the other start tests and from port 19_999 that
            // `probe_no_listener` expects to be closed. Tests run in
            // parallel, and a shared port would let one test re-attach to
            // another test's broker.
            #[allow(clippy::cast_possible_truncation)]
            port: 19_300 + (std::process::id() as u16 % 600),
            bind: "127.0.0.1".to_string(),
            ..Default::default()
        };
        let state = BrokerState::new(None);
        let handle = start_broker(&config, state, Vec::new());
        if let Ok(h) = handle {
            assert!(h.url.contains(&config.port.to_string()));
            drop(h);
        }
    }

    /// Without a log path no flush thread is spawned.
    #[test]
    fn start_broker_no_log_path_no_flush_thread() {
        let config = BrokerConfig {
            enabled: true,
            #[allow(clippy::cast_possible_truncation)]
            port: 19_100 + (std::process::id() as u16 % 100),
            bind: "127.0.0.1".to_string(),
            ..Default::default()
        };
        let state = BrokerState::new(None);
        if let Ok(handle) = start_broker(&config, state, Vec::new()) {
            assert!(handle.flush_thread.is_none());
            drop(handle);
        }
    }

    /// With a log path the flush thread is spawned.
    #[test]
    fn start_broker_with_log_path_spawns_flush_thread() {
        let tmp = tempfile::tempdir().unwrap();
        let log_path = tmp.path().join("broker.log");
        let config = BrokerConfig {
            enabled: true,
            #[allow(clippy::cast_possible_truncation)]
            port: 19_200 + (std::process::id() as u16 % 100),
            bind: "127.0.0.1".to_string(),
            ..Default::default()
        };
        let state = BrokerState::new(Some(log_path));
        if let Ok(handle) = start_broker(&config, state, Vec::new()) {
            assert!(handle.flush_thread.is_some());
            drop(handle);
        }
    }

    /// Builds a supervisor feedback message addressed to `target` whose
    /// single error line reports a conflict with `other`.
    fn conflict_feedback(target: &str, other: &str) -> BrokerMessage {
        BrokerMessage::Feedback {
            agent_id: target.to_string(),
            payload: messages::FeedbackPayload {
                from: "supervisor".to_string(),
                errors: vec![format!(
                    "[conflict-detector] in-flight conflict with {other} on src/a.rs"
                )],
            },
        }
    }

    /// Collects the payload of every `Learning` message in the message log.
    fn learning_payloads(state: &Arc<BrokerState>) -> Vec<messages::LearningPayload> {
        state
            .read()
            .message_log
            .iter()
            .filter_map(|(_, _, m)| match m {
                BrokerMessage::Learning { payload } => Some(payload.clone()),
                _ => None,
            })
            .collect()
    }

    /// Runs one flush-and-publish cycle by hand, without the background thread.
    fn tick(state: &Arc<BrokerState>) {
        let aggregator = state.learnings.clone().expect("aggregator attached");
        if let Ok(mut a) = aggregator.lock() {
            a.flush().unwrap();
        }
        publish_pending_learnings(state, &aggregator);
    }

    /// Builds a state with an aggregator writing to `path`, broker publishing
    /// set to `broker_publish`, and agents `feat-x` and `feat-y` registered.
    fn state_with_aggregator(path: PathBuf, broker_publish: bool) -> Arc<BrokerState> {
        let mut agg = learnings::LearningsAggregator::new(path);
        agg.set_broker_publish(broker_publish);
        agg.register_agent("feat-x");
        agg.register_agent("feat-y");
        let mut state = BrokerState::new(None);
        state.attach_learnings(Arc::new(std::sync::Mutex::new(agg)));
        Arc::new(state)
    }

    /// With broker publishing on, a conflict ends up both in the markdown
    /// file and as exactly one `Learning` message with a matching title.
    #[test]
    fn dual_output_publishes_learning_and_writes_file_when_broker_on() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("session-learnings.md");
        let state = state_with_aggregator(path.clone(), true);

        delivery::publish_message(&state, &conflict_feedback("feat-x", "feat-y"));
        tick(&state);

        let md = std::fs::read_to_string(&path).unwrap();
        assert!(
            md.contains("### Conflict events"),
            "file missing conflict:\n{md}"
        );

        let learnings = learning_payloads(&state);
        assert_eq!(learnings.len(), 1, "expected one agent.learning record");
        assert_eq!(learnings[0].category, "conflict_event");
        assert_eq!(learnings[0].id.len(), 16);
        assert!(
            md.contains(&learnings[0].title),
            "file title must match broker record title: {}",
            learnings[0].title
        );
    }

    /// With broker publishing off, the file is still written but no
    /// `Learning` message is logged.
    #[test]
    fn no_broker_publish_when_disabled_but_file_still_written() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("session-learnings.md");
        let state = state_with_aggregator(path.clone(), false);

        delivery::publish_message(&state, &conflict_feedback("feat-x", "feat-y"));
        tick(&state);

        let md = std::fs::read_to_string(&path).unwrap();
        assert!(md.contains("### Conflict events"));
        assert!(
            learning_payloads(&state).is_empty(),
            "no agent.learning record should be published when broker publish is off"
        );
    }

    /// A second cycle with nothing new must not publish the record again.
    #[test]
    fn re_ticking_does_not_duplicate_learning_records() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("session-learnings.md");
        let state = state_with_aggregator(path, true);

        delivery::publish_message(&state, &conflict_feedback("feat-x", "feat-y"));
        tick(&state);
        tick(&state);
        assert_eq!(
            learning_payloads(&state).len(),
            1,
            "the conflict record must be published exactly once"
        );
    }

    /// A `stuck_duration` learning produced for `feat-x` (blocked, then an
    /// artifact reported) is delivered to `feat-x`'s own inbox.
    #[test]
    fn branch_scoped_learning_is_routed_to_branch_inbox() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("session-learnings.md");
        let state = state_with_aggregator(path, true);

        delivery::publish_message(
            &state,
            &BrokerMessage::Blocked {
                agent_id: "feat-x".to_string(),
                payload: messages::BlockedPayload {
                    needs: "types".to_string(),
                    from: "feat-y".to_string(),
                },
            },
        );
        delivery::publish_message(
            &state,
            &BrokerMessage::Artifact {
                agent_id: "feat-x".to_string(),
                payload: messages::ArtifactPayload {
                    status: "done".to_string(),
                    exports: vec![],
                    modified_files: vec![],
                },
            },
        );
        tick(&state);

        let (msgs, _) = delivery::poll_messages(&state, "feat-x", 0);
        assert!(
            msgs.iter().any(|m| matches!(
                m,
                BrokerMessage::Learning { payload } if payload.category == "stuck_duration"
            )),
            "stuck_duration learning should land in feat-x's inbox"
        );
    }
}