1pub mod conflict;
16pub mod delivery;
17pub mod learnings;
18pub mod messages;
19pub mod publish;
20pub mod server;
21pub mod watcher;
22
23use std::collections::{HashMap, HashSet};
24use std::path::PathBuf;
25use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
26use std::sync::{Arc, OnceLock, RwLock};
27use std::thread::JoinHandle;
28use std::time::Instant;
29
30use serde::Serialize;
31
32use crate::config::{BrokerConfig, ConflictConfig};
33pub use messages::BrokerMessage;
34
35#[derive(Debug, Clone)]
39pub struct WatchTarget {
40 pub agent_id: String,
42 pub cli: String,
44 pub worktree_path: PathBuf,
46}
47
48#[derive(Debug, Clone)]
50pub struct AgentRecord {
51 pub agent_id: String,
53 pub status: String,
55 pub last_seen: Instant,
57 pub last_message: Option<BrokerMessage>,
59 pub last_committed_at: Option<Instant>,
67}
68
69#[derive(Debug, Clone, Serialize)]
72pub struct AgentStatusEntry {
73 pub agent_id: String,
75 pub cli: String,
77 pub status: String,
79 pub last_seen_seconds: u64,
81 #[serde(skip)]
83 pub last_seen: Instant,
84 #[serde(default, skip_serializing_if = "Option::is_none")]
88 pub phase: Option<String>,
89}
90
91#[derive(Debug)]
93pub struct BrokerStateInner {
94 pub agents: HashMap<String, AgentRecord>,
96 pub agent_clis: HashMap<String, String>,
98 pub queues: HashMap<String, Vec<(u64, BrokerMessage)>>,
100 pub message_log: Vec<(u64, std::time::SystemTime, BrokerMessage)>,
102 pub republish_working_ttl: std::time::Duration,
109 pub watched_paths: HashSet<PathBuf>,
116}
117
118#[derive(Debug)]
125pub struct BrokerState {
126 inner: RwLock<BrokerStateInner>,
128 next_seq: AtomicU64,
130 pub log_path: Option<PathBuf>,
132 started_at: Instant,
134 pub learnings: Option<learnings::SharedLearnings>,
137 pub verify_on_commit_nudge: bool,
143 pub role_gating: Option<crate::opsx::RoleGatingContext>,
149 watcher_shutdown_rx: OnceLock<tokio::sync::watch::Receiver<bool>>,
157}
158
159impl BrokerState {
160 pub fn new(log_path: Option<PathBuf>) -> Self {
162 Self {
163 inner: RwLock::new(BrokerStateInner {
164 agents: HashMap::new(),
165 agent_clis: HashMap::new(),
166 queues: HashMap::new(),
167 message_log: Vec::new(),
168 republish_working_ttl: std::time::Duration::from_secs(
169 crate::config::WatcherConfig::DEFAULT_REPUBLISH_TTL_SECONDS,
170 ),
171 watched_paths: HashSet::new(),
172 }),
173 next_seq: AtomicU64::new(0),
174 log_path,
175 started_at: Instant::now(),
176 learnings: None,
177 verify_on_commit_nudge: false,
181 role_gating: None,
182 watcher_shutdown_rx: OnceLock::new(),
183 }
184 }
185
186 #[must_use]
191 pub fn with_role_gating(mut self, ctx: crate::opsx::RoleGatingContext) -> Self {
192 self.role_gating = Some(ctx);
193 self
194 }
195
196 #[must_use]
200 pub fn with_verify_on_commit_nudge(mut self, enabled: bool) -> Self {
201 self.verify_on_commit_nudge = enabled;
202 self
203 }
204
205 #[must_use]
216 pub fn with_seeded_cli(self, agent_id: &str, cli: &str) -> Self {
217 if !cli.is_empty()
218 && let Ok(mut inner) = self.inner.write()
219 {
220 inner
221 .agent_clis
222 .insert(agent_id.to_string(), cli.to_string());
223 }
224 self
225 }
226
227 pub fn attach_learnings(&mut self, aggregator: learnings::SharedLearnings) {
231 self.learnings = Some(aggregator);
232 }
233
234 pub fn set_republish_working_ttl(&self, ttl: std::time::Duration) {
238 self.write().republish_working_ttl = ttl;
239 }
240
241 pub fn set_watcher_shutdown_rx(&self, rx: tokio::sync::watch::Receiver<bool>) {
246 let _ = self.watcher_shutdown_rx.set(rx);
247 }
248
249 #[must_use]
254 pub fn watcher_shutdown_rx(&self) -> Option<tokio::sync::watch::Receiver<bool>> {
255 self.watcher_shutdown_rx.get().cloned()
256 }
257
258 pub fn register_watch_target(&self, target: &WatchTarget) -> bool {
268 let mut inner = self.write();
269 if !inner.watched_paths.insert(target.worktree_path.clone()) {
270 return false;
271 }
272 if !target.cli.is_empty() {
273 inner
274 .agent_clis
275 .insert(target.agent_id.clone(), target.cli.clone());
276 }
277 inner.queues.entry(target.agent_id.clone()).or_default();
278 true
279 }
280
281 pub fn forget_watch_target(&self, worktree_path: &std::path::Path) {
286 self.write().watched_paths.remove(worktree_path);
287 }
288
289 pub fn read(&self) -> std::sync::RwLockReadGuard<'_, BrokerStateInner> {
295 self.inner.read().expect("broker state lock poisoned")
296 }
297
298 pub fn write(&self) -> std::sync::RwLockWriteGuard<'_, BrokerStateInner> {
304 self.inner.write().expect("broker state lock poisoned")
305 }
306
307 pub fn next_seq(&self) -> u64 {
309 self.next_seq.fetch_add(1, Ordering::Relaxed) + 1
310 }
311
312 pub fn uptime_seconds(&self) -> u64 {
316 self.started_at.elapsed().as_secs()
317 }
318}
319
320#[derive(Debug, thiserror::Error)]
322pub enum BrokerError {
323 #[error(
325 "port {port} is already in use by another process — change [broker] port in .git-paw/config.toml"
326 )]
327 PortInUse {
328 port: u16,
330 source: std::io::Error,
332 },
333
334 #[error("broker probe timed out on port {port} — check for stuck processes on this port")]
336 ProbeTimeout {
337 port: u16,
339 },
340
341 #[error("failed to bind broker: {0}")]
343 BindFailed(std::io::Error),
344
345 #[error("failed to create broker runtime: {0}")]
347 RuntimeFailed(std::io::Error),
348}
349
350pub struct BrokerHandle {
356 pub state: Arc<BrokerState>,
358 runtime: Option<tokio::runtime::Runtime>,
361 shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
363 watcher_shutdown: Option<tokio::sync::watch::Sender<bool>>,
365 pub url: String,
367 stop_flag: Arc<AtomicBool>,
369 flush_thread: Option<JoinHandle<()>>,
371 learnings_thread: Option<JoinHandle<()>>,
374}
375
376impl BrokerHandle {
377 fn reattached(url: String, state: Arc<BrokerState>) -> Self {
379 Self {
380 state,
381 runtime: None,
382 shutdown_tx: None,
383 watcher_shutdown: None,
384 url,
385 stop_flag: Arc::new(AtomicBool::new(false)),
386 flush_thread: None,
387 learnings_thread: None,
388 }
389 }
390}
391
392impl Drop for BrokerHandle {
393 fn drop(&mut self) {
394 self.stop_flag.store(true, Ordering::Release);
396 if let Some(handle) = self.flush_thread.take() {
397 let _ = handle.join();
398 }
399 if let Some(handle) = self.learnings_thread.take() {
402 let _ = handle.join();
403 }
404 if let Some(tx) = self.watcher_shutdown.take() {
406 let _ = tx.send(true);
407 }
408 if let Some(tx) = self.shutdown_tx.take() {
410 let _ = tx.send(());
411 }
412 if let Some(rt) = self.runtime.take() {
414 rt.shutdown_timeout(std::time::Duration::from_secs(2));
415 }
416 }
417}
418
419#[derive(Debug, PartialEq, Eq)]
421pub enum ProbeResult {
422 NoListener,
424 LiveBroker,
426 ForeignServer,
428 Timeout,
430}
431
432pub fn probe_broker(url: &str) -> ProbeResult {
441 probe_existing_broker(url)
442}
443
444fn probe_existing_broker(url: &str) -> ProbeResult {
445 use std::io::{Read, Write};
446 use std::net::TcpStream;
447 use std::time::Duration;
448
449 let addr = url.strip_prefix("http://").unwrap_or(url);
451
452 let socket_addr = if let Ok(a) = addr.parse() {
453 a
454 } else {
455 use std::net::ToSocketAddrs;
456 match addr.to_socket_addrs() {
457 Ok(mut addrs) => match addrs.next() {
458 Some(a) => a,
459 None => return ProbeResult::NoListener,
460 },
461 Err(_) => return ProbeResult::NoListener,
462 }
463 };
464
465 let Ok(mut stream) = TcpStream::connect_timeout(&socket_addr, Duration::from_millis(500))
466 else {
467 return ProbeResult::NoListener;
468 };
469
470 stream
471 .set_read_timeout(Some(Duration::from_millis(500)))
472 .ok();
473 stream
474 .set_write_timeout(Some(Duration::from_millis(500)))
475 .ok();
476
477 let request = format!("GET /status HTTP/1.1\r\nHost: {addr}\r\nConnection: close\r\n\r\n");
478 if stream.write_all(request.as_bytes()).is_err() {
479 return ProbeResult::Timeout;
480 }
481
482 let mut response = String::new();
483 if stream.read_to_string(&mut response).is_err() && response.is_empty() {
484 return ProbeResult::Timeout;
485 }
486
487 if response.contains("\"git_paw\":true") || response.contains("\"git_paw\": true") {
488 ProbeResult::LiveBroker
489 } else if response.starts_with("HTTP/") {
490 ProbeResult::ForeignServer
491 } else {
492 ProbeResult::Timeout
493 }
494}
495
496pub fn start_broker(
509 config: &BrokerConfig,
510 state: BrokerState,
511 watch_targets: Vec<WatchTarget>,
512) -> Result<BrokerHandle, BrokerError> {
513 start_broker_with(config, state, watch_targets, None, 60)
514}
515
516#[allow(clippy::too_many_lines)]
529pub fn start_broker_with(
530 config: &BrokerConfig,
531 state: BrokerState,
532 watch_targets: Vec<WatchTarget>,
533 conflict: Option<ConflictConfig>,
534 learnings_flush_interval_seconds: u64,
535) -> Result<BrokerHandle, BrokerError> {
536 let url = config.url();
537 let state = Arc::new(state);
538 state.set_republish_working_ttl(std::time::Duration::from_secs(
541 config.watcher.republish_working_ttl_seconds(),
542 ));
543 let stop_flag = Arc::new(AtomicBool::new(false));
544
545 match probe_existing_broker(&url) {
546 ProbeResult::LiveBroker => return Ok(BrokerHandle::reattached(url, state)),
547 ProbeResult::ForeignServer => {
548 return Err(BrokerError::PortInUse {
549 port: config.port,
550 source: std::io::Error::new(
551 std::io::ErrorKind::AddrInUse,
552 "port occupied by non-broker process",
553 ),
554 });
555 }
556 ProbeResult::Timeout => {
557 return Err(BrokerError::ProbeTimeout { port: config.port });
558 }
559 ProbeResult::NoListener => {}
560 }
561
562 let flush_thread = if state.log_path.is_some() {
564 let s = Arc::clone(&state);
565 let f = Arc::clone(&stop_flag);
566 Some(std::thread::spawn(move || {
567 delivery::flush_loop(&s, &f);
568 }))
569 } else {
570 None
571 };
572
573 let learnings_thread = if state.learnings.is_some() {
577 let s = Arc::clone(&state);
578 let f = Arc::clone(&stop_flag);
579 Some(std::thread::spawn(move || {
580 learnings_flush_loop(&s, &f, learnings_flush_interval_seconds);
581 }))
582 } else {
583 None
584 };
585
586 let runtime = tokio::runtime::Builder::new_multi_thread()
587 .enable_all()
588 .build()
589 .map_err(BrokerError::RuntimeFailed)?;
590
591 let addr: std::net::SocketAddr = format!("{}:{}", config.bind, config.port).parse().map_err(
592 |e: std::net::AddrParseError| {
593 BrokerError::BindFailed(std::io::Error::new(std::io::ErrorKind::InvalidInput, e))
594 },
595 )?;
596
597 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
598
599 let router = server::router(Arc::clone(&state));
600
601 let listener = runtime.block_on(async {
602 let socket = tokio::net::TcpSocket::new_v4().map_err(BrokerError::BindFailed)?;
603 socket
604 .set_reuseaddr(true)
605 .map_err(BrokerError::BindFailed)?;
606 socket.bind(addr).map_err(BrokerError::BindFailed)?;
607 socket.listen(1024).map_err(BrokerError::BindFailed)
608 })?;
609
610 runtime.spawn(async {
613 let _ = tokio::signal::ctrl_c().await;
614 });
615
616 runtime.spawn(async move {
617 axum::serve(listener, router)
618 .with_graceful_shutdown(async {
619 let _ = shutdown_rx.await;
620 })
621 .await
622 .ok();
623 });
624
625 {
631 let mut inner = state.write();
632 for target in &watch_targets {
633 inner
634 .agent_clis
635 .insert(target.agent_id.clone(), target.cli.clone());
636 inner.queues.entry(target.agent_id.clone()).or_default();
637 inner.watched_paths.insert(target.worktree_path.clone());
639 }
640 }
641
642 let (watcher_tx, watcher_rx) = tokio::sync::watch::channel(false);
647 state.set_watcher_shutdown_rx(watcher_rx.clone());
649 for target in watch_targets {
650 let s = Arc::clone(&state);
651 let rx = watcher_rx.clone();
652 runtime.spawn(watcher::watch_worktree(s, target, rx));
653 }
654 if let Some(conflict_cfg) = conflict {
655 let s = Arc::clone(&state);
656 let rx = watcher_rx.clone();
657 runtime.spawn(conflict::run_detector_loop(s, conflict_cfg, rx));
658 }
659
660 Ok(BrokerHandle {
661 state,
662 runtime: Some(runtime),
663 shutdown_tx: Some(shutdown_tx),
664 watcher_shutdown: Some(watcher_tx),
665 url,
666 stop_flag,
667 flush_thread,
668 learnings_thread,
669 })
670}
671
672fn learnings_flush_loop(
678 state: &Arc<BrokerState>,
679 stop: &Arc<AtomicBool>,
680 flush_interval_seconds: u64,
681) {
682 let Some(aggregator) = state.learnings.clone() else {
683 return;
684 };
685 let interval = std::time::Duration::from_secs(flush_interval_seconds.max(1));
686 let tick = std::time::Duration::from_millis(100);
687
688 loop {
689 let mut elapsed = std::time::Duration::ZERO;
690 while elapsed < interval {
691 if stop.load(Ordering::Acquire) {
692 if let Ok(mut agg) = aggregator.lock() {
693 let _ = agg.flush_at_shutdown();
694 }
695 publish_pending_learnings(state, &aggregator);
696 return;
697 }
698 std::thread::sleep(tick);
699 elapsed += tick;
700 }
701 if let Ok(mut agg) = aggregator.lock() {
702 let _ = agg.flush();
703 }
704 publish_pending_learnings(state, &aggregator);
705 }
706}
707
708fn publish_pending_learnings(state: &Arc<BrokerState>, aggregator: &learnings::SharedLearnings) {
717 let records = match aggregator.lock() {
718 Ok(mut agg) => agg.take_pending_publish(),
719 Err(_) => return,
720 };
721 for record in &records {
722 delivery::publish_message(state, &BrokerMessage::from(record));
723 }
724}
725
726#[cfg(test)]
727mod tests {
728 use super::*;
729
730 #[test]
731 fn broker_state_new_is_empty() {
732 let state = BrokerState::new(None);
733 let inner = state.read();
734 assert!(inner.agents.is_empty());
735 assert!(inner.queues.is_empty());
736 assert!(inner.message_log.is_empty());
737 }
738
739 #[test]
740 fn register_watch_target_is_idempotent_and_seeds_roster() {
741 let state = BrokerState::new(None);
742 let target = WatchTarget {
743 agent_id: "feat-hot".to_string(),
744 cli: "claude".to_string(),
745 worktree_path: PathBuf::from("/tmp/feat-hot"),
746 };
747 assert!(
749 state.register_watch_target(&target),
750 "first registration must return true"
751 );
752 assert!(
754 !state.register_watch_target(&target),
755 "duplicate registration must return false"
756 );
757 let inner = state.read();
758 assert_eq!(inner.watched_paths.len(), 1, "path recorded exactly once");
759 assert_eq!(
760 inner.agent_clis.get("feat-hot").map(String::as_str),
761 Some("claude"),
762 "registration seeds the CLI label"
763 );
764 assert!(
765 inner.queues.contains_key("feat-hot"),
766 "registration seeds the inbox queue"
767 );
768 }
769
770 #[test]
771 fn forget_watch_target_allows_re_registration() {
772 let state = BrokerState::new(None);
773 let target = WatchTarget {
774 agent_id: "feat-hot".to_string(),
775 cli: "claude".to_string(),
776 worktree_path: PathBuf::from("/tmp/feat-hot"),
777 };
778 assert!(state.register_watch_target(&target));
779 state.forget_watch_target(&target.worktree_path);
780 assert!(
781 state.register_watch_target(&target),
782 "after forgetting, the same path registers fresh again"
783 );
784 }
785
786 #[test]
787 fn next_seq_starts_at_one() {
788 let state = BrokerState::new(None);
789 assert_eq!(state.next_seq(), 1);
790 assert_eq!(state.next_seq(), 2);
791 assert_eq!(state.next_seq(), 3);
792 }
793
794 #[test]
795 fn probe_no_listener() {
796 let result = probe_existing_broker("http://127.0.0.1:19999");
798 assert_eq!(result, ProbeResult::NoListener);
799 }
800
801 #[test]
802 fn reattached_handle_has_no_runtime() {
803 let state = Arc::new(BrokerState::new(None));
804 let h = BrokerHandle::reattached("http://127.0.0.1:9119".into(), state);
805 assert!(h.runtime.is_none());
806 assert!(h.shutdown_tx.is_none());
807 assert!(h.flush_thread.is_none());
808 }
809
810 #[test]
811 fn start_broker_on_free_port() {
812 let config = BrokerConfig {
813 enabled: true,
814 #[allow(clippy::cast_possible_truncation)]
816 port: 19_000 + (std::process::id() as u16 % 1000),
817 bind: "127.0.0.1".to_string(),
818 ..Default::default()
819 };
820 let state = BrokerState::new(None);
821 let handle = start_broker(&config, state, Vec::new());
822 if let Ok(h) = handle {
824 assert!(h.url.contains(&config.port.to_string()));
825 drop(h);
826 }
827 }
828
829 #[test]
830 fn start_broker_no_log_path_no_flush_thread() {
831 let config = BrokerConfig {
832 enabled: true,
833 #[allow(clippy::cast_possible_truncation)]
834 port: 19_100 + (std::process::id() as u16 % 100),
835 bind: "127.0.0.1".to_string(),
836 ..Default::default()
837 };
838 let state = BrokerState::new(None);
839 if let Ok(handle) = start_broker(&config, state, Vec::new()) {
840 assert!(handle.flush_thread.is_none());
841 drop(handle);
842 }
843 }
844
845 #[test]
846 fn start_broker_with_log_path_spawns_flush_thread() {
847 let tmp = tempfile::tempdir().unwrap();
848 let log_path = tmp.path().join("broker.log");
849 let config = BrokerConfig {
850 enabled: true,
851 #[allow(clippy::cast_possible_truncation)]
852 port: 19_200 + (std::process::id() as u16 % 100),
853 bind: "127.0.0.1".to_string(),
854 ..Default::default()
855 };
856 let state = BrokerState::new(Some(log_path));
857 if let Ok(handle) = start_broker(&config, state, Vec::new()) {
858 assert!(handle.flush_thread.is_some());
859 drop(handle);
860 }
861 }
862
863 fn conflict_feedback(target: &str, other: &str) -> BrokerMessage {
866 BrokerMessage::Feedback {
867 agent_id: target.to_string(),
868 payload: messages::FeedbackPayload {
869 from: "supervisor".to_string(),
870 errors: vec![format!(
871 "[conflict-detector] in-flight conflict with {other} on src/a.rs"
872 )],
873 },
874 }
875 }
876
877 fn learning_payloads(state: &Arc<BrokerState>) -> Vec<messages::LearningPayload> {
878 state
879 .read()
880 .message_log
881 .iter()
882 .filter_map(|(_, _, m)| match m {
883 BrokerMessage::Learning { payload } => Some(payload.clone()),
884 _ => None,
885 })
886 .collect()
887 }
888
889 fn tick(state: &Arc<BrokerState>) {
892 let aggregator = state.learnings.clone().expect("aggregator attached");
893 if let Ok(mut a) = aggregator.lock() {
894 a.flush().unwrap();
895 }
896 publish_pending_learnings(state, &aggregator);
897 }
898
899 fn state_with_aggregator(path: PathBuf, broker_publish: bool) -> Arc<BrokerState> {
900 let mut agg = learnings::LearningsAggregator::new(path);
901 agg.set_broker_publish(broker_publish);
902 agg.register_agent("feat-x");
903 agg.register_agent("feat-y");
904 let mut state = BrokerState::new(None);
905 state.attach_learnings(Arc::new(std::sync::Mutex::new(agg)));
906 Arc::new(state)
907 }
908
909 #[test]
912 fn dual_output_publishes_learning_and_writes_file_when_broker_on() {
913 let tmp = tempfile::tempdir().unwrap();
914 let path = tmp.path().join("session-learnings.md");
915 let state = state_with_aggregator(path.clone(), true);
916
917 delivery::publish_message(&state, &conflict_feedback("feat-x", "feat-y"));
918 tick(&state);
919
920 let md = std::fs::read_to_string(&path).unwrap();
921 assert!(
922 md.contains("### Conflict events"),
923 "file missing conflict:\n{md}"
924 );
925
926 let learnings = learning_payloads(&state);
927 assert_eq!(learnings.len(), 1, "expected one agent.learning record");
928 assert_eq!(learnings[0].category, "conflict_event");
929 assert_eq!(learnings[0].id.len(), 16);
930 assert!(
931 md.contains(&learnings[0].title),
932 "file title must match broker record title: {}",
933 learnings[0].title
934 );
935 }
936
937 #[test]
940 fn no_broker_publish_when_disabled_but_file_still_written() {
941 let tmp = tempfile::tempdir().unwrap();
942 let path = tmp.path().join("session-learnings.md");
943 let state = state_with_aggregator(path.clone(), false);
944
945 delivery::publish_message(&state, &conflict_feedback("feat-x", "feat-y"));
946 tick(&state);
947
948 let md = std::fs::read_to_string(&path).unwrap();
949 assert!(md.contains("### Conflict events"));
950 assert!(
951 learning_payloads(&state).is_empty(),
952 "no agent.learning record should be published when broker publish is off"
953 );
954 }
955
956 #[test]
959 fn re_ticking_does_not_duplicate_learning_records() {
960 let tmp = tempfile::tempdir().unwrap();
961 let path = tmp.path().join("session-learnings.md");
962 let state = state_with_aggregator(path, true);
963
964 delivery::publish_message(&state, &conflict_feedback("feat-x", "feat-y"));
965 tick(&state);
966 tick(&state); assert_eq!(
969 learning_payloads(&state).len(),
970 1,
971 "the conflict record must be published exactly once"
972 );
973 }
974
975 #[test]
977 fn branch_scoped_learning_is_routed_to_branch_inbox() {
978 let tmp = tempfile::tempdir().unwrap();
979 let path = tmp.path().join("session-learnings.md");
980 let state = state_with_aggregator(path, true);
981
982 delivery::publish_message(
984 &state,
985 &BrokerMessage::Blocked {
986 agent_id: "feat-x".to_string(),
987 payload: messages::BlockedPayload {
988 needs: "types".to_string(),
989 from: "feat-y".to_string(),
990 },
991 },
992 );
993 delivery::publish_message(
994 &state,
995 &BrokerMessage::Artifact {
996 agent_id: "feat-x".to_string(),
997 payload: messages::ArtifactPayload {
998 status: "done".to_string(),
999 exports: vec![],
1000 modified_files: vec![],
1001 },
1002 },
1003 );
1004 tick(&state);
1005
1006 let (msgs, _) = delivery::poll_messages(&state, "feat-x", 0);
1007 assert!(
1008 msgs.iter().any(|m| matches!(
1009 m,
1010 BrokerMessage::Learning { payload } if payload.category == "stuck_duration"
1011 )),
1012 "stuck_duration learning should land in feat-x's inbox"
1013 );
1014 }
1015}