1use std::collections::HashMap;
13use std::path::Path;
14
15use crate::broker::messages::BrokerMessage;
16use crate::broker::publish::{build_status_message, publish_to_broker_http};
17use crate::config::BrokerConfig;
18use crate::error::PawError;
19use crate::git;
20use crate::session::Session;
21use crate::summary::TestResult;
22
/// Name of the branch that agent branches are merged into: the merge loop checks this
/// branch out before merging and names it in the "merged into ..." status message.
const MERGE_TARGET_BRANCH: &str = "main";
25
26pub fn build_dependency_graph(messages: &[(u64, BrokerMessage)]) -> HashMap<String, Vec<String>> {
32 let mut graph: HashMap<String, Vec<String>> = HashMap::new();
33 for (_, msg) in messages {
34 if let BrokerMessage::Blocked { agent_id, payload } = msg {
35 let dep = payload.from.clone();
36 let dependent = agent_id.clone();
37 graph.entry(dep).or_default().push(dependent);
38 }
39 }
40 graph
41}
42
/// Orders agents so that every agent comes after the agents it depends on.
///
/// `graph` maps a dependency to the agents that depend on it; `all_agents` lists the agents
/// to order.
///
/// * Edges whose dependency or dependent is not listed in `all_agents` are ignored, so a
///   stray name in the graph can neither hold back a real agent nor leak into the result.
/// * When several agents are ready at the same time they are emitted in ascending
///   (alphabetical) order, the same order the cycle fallback uses.
/// * Duplicate names in `all_agents` are collapsed to a single entry.
///
/// If the listed agents contain a dependency cycle, a warning is written to stderr and the
/// agents are returned in plain sorted order instead of a topological one.
pub fn topological_merge_order<S: std::hash::BuildHasher>(
    graph: &HashMap<String, Vec<String>, S>,
    all_agents: &[String],
) -> Vec<String> {
    use std::cmp::Reverse;
    use std::collections::BinaryHeap;

    // Every listed agent starts with zero unmet dependencies. Keying by name also
    // de-duplicates the input, which keeps the completeness check below honest.
    let mut in_degree: HashMap<&str, usize> =
        all_agents.iter().map(|agent| (agent.as_str(), 0)).collect();

    // Count only edges whose two endpoints are both listed agents. An edge coming from an
    // unlisted dependency could never be released and would look like a cycle.
    for (dependency, dependents) in graph {
        if !in_degree.contains_key(dependency.as_str()) {
            continue;
        }
        for dependent in dependents {
            if let Some(degree) = in_degree.get_mut(dependent.as_str()) {
                *degree += 1;
            }
        }
    }

    // Min-heap (via `Reverse`) so the alphabetically smallest ready agent is taken first,
    // without re-sorting the whole queue after every insertion.
    let mut ready: BinaryHeap<Reverse<&str>> = in_degree
        .iter()
        .filter(|(_, degree)| **degree == 0)
        .map(|(agent, _)| Reverse(*agent))
        .collect();

    let mut order: Vec<String> = Vec::with_capacity(in_degree.len());
    while let Some(Reverse(agent)) = ready.pop() {
        order.push(agent.to_owned());
        if let Some(dependents) = graph.get(agent) {
            for dependent in dependents {
                // Unlisted dependents are skipped here exactly as they were when counting.
                if let Some(degree) = in_degree.get_mut(dependent.as_str()) {
                    *degree = degree.saturating_sub(1);
                    if *degree == 0 {
                        ready.push(Reverse(dependent.as_str()));
                    }
                }
            }
        }
    }

    if order.len() == in_degree.len() {
        return order;
    }

    // Anything still waiting is on a cycle or downstream of one. Sort the names so the
    // warning text does not depend on hash-map iteration order.
    let mut cycle_members: Vec<&str> = in_degree
        .iter()
        .filter(|(_, degree)| **degree > 0)
        .map(|(agent, _)| *agent)
        .collect();
    cycle_members.sort_unstable();
    eprintln!(
        "warning: dependency cycle detected among agents {cycle_members:?}; \
         falling back to sorted merge order"
    );

    let mut fallback = all_agents.to_vec();
    fallback.sort();
    fallback.dedup();
    fallback
}
106
/// Summary returned by the merge loop.
#[derive(Debug, Clone)]
pub struct MergeResults {
    /// Branch names in the order the merge loop processed them.
    pub merge_order: Vec<String>,
    /// Per-branch outcome, keyed by branch name. The merge loop records a failed entry when
    /// the merge itself or the test command fails, and a successful placeholder entry when
    /// no test command is configured.
    pub test_results: HashMap<String, TestResult>,
}
120
121pub fn run_test_command(repo_root: &Path, test_command: &str) -> Result<TestResult, PawError> {
127 let output = std::process::Command::new("sh")
128 .current_dir(repo_root)
129 .arg("-c")
130 .arg(test_command)
131 .output()
132 .map_err(|e| PawError::SessionError(format!("failed to run test command: {e}")))?;
133
134 let success = output.status.success();
135 let output_str = String::from_utf8_lossy(&output.stdout).to_string();
136
137 Ok(TestResult {
138 success,
139 output: output_str,
140 })
141}
142
143#[allow(clippy::unnecessary_wraps)]
158pub fn run_merge_loop_with_publisher<S: std::hash::BuildHasher>(
159 repo_root: &Path,
160 session: &Session,
161 test_command: Option<&String>,
162 dep_graph: &HashMap<String, Vec<String>, S>,
163 publisher: &dyn Fn(&BrokerMessage),
164) -> Result<MergeResults, PawError> {
165 let agents: Vec<String> = session.worktrees.iter().map(|w| w.branch.clone()).collect();
166 let merge_order = topological_merge_order(dep_graph, &agents);
167
168 let mut test_results: HashMap<String, TestResult> = HashMap::new();
169 let mut n_ok: usize = 0;
170 let mut n_fail: usize = 0;
171
172 let _ = std::process::Command::new("git")
173 .current_dir(repo_root)
174 .args(["checkout", MERGE_TARGET_BRANCH])
175 .status();
176
177 for branch in &merge_order {
178 println!("Merging branch: {branch}");
179
180 if let Err(e) = git::merge_branch(repo_root, branch) {
181 eprintln!("Warning: Failed to merge branch {branch}: {e}");
182 let reason = format!("merge failed: {e}");
183 test_results.insert(
184 branch.clone(),
185 TestResult {
186 success: false,
187 output: format!("Merge failed: {e}"),
188 },
189 );
190 publisher(&build_status_message(branch, "merge_failed", Some(reason)));
191 n_fail += 1;
192 continue;
193 }
194
195 let _ = std::process::Command::new("git")
201 .current_dir(repo_root)
202 .args(["commit", "--no-edit", "--allow-empty"])
203 .output();
204
205 let merged_msg = format!("merged into {MERGE_TARGET_BRANCH}");
206
207 if let Some(cmd) = test_command {
208 println!("Running test command: {cmd}");
209 match run_test_command(repo_root, cmd) {
210 Ok(result) => {
211 let success = result.success;
212 test_results.insert(branch.clone(), result);
213 if success {
214 println!("\u{2713} Tests passed for {branch}");
215 publisher(&build_status_message(
216 branch,
217 "merged",
218 Some(merged_msg.clone()),
219 ));
220 n_ok += 1;
221 } else {
222 println!("\u{2717} Tests failed for {branch}");
223 publisher(&build_status_message(
224 branch,
225 "merge_failed",
226 Some(format!("test command failed for {branch}")),
227 ));
228 n_fail += 1;
229 }
230 }
231 Err(e) => {
232 eprintln!("Warning: Test command failed for {branch}: {e}");
233 let reason = format!("test execution failed: {e}");
234 test_results.insert(
235 branch.clone(),
236 TestResult {
237 success: false,
238 output: format!("Test execution failed: {e}"),
239 },
240 );
241 publisher(&build_status_message(branch, "merge_failed", Some(reason)));
242 n_fail += 1;
243 }
244 }
245 } else {
246 println!("\u{2713} Merged {branch} (no test command configured)");
247 test_results.insert(
248 branch.clone(),
249 TestResult {
250 success: true,
251 output: "No test command configured".to_string(),
252 },
253 );
254 publisher(&build_status_message(branch, "merged", Some(merged_msg)));
255 n_ok += 1;
256 }
257 }
258
259 publisher(&build_status_message(
260 "supervisor",
261 "working",
262 Some(format!("merge loop done: {n_ok} merged, {n_fail} failed")),
263 ));
264
265 Ok(MergeResults {
266 merge_order,
267 test_results,
268 })
269}
270
271pub fn run_merge_loop(
280 repo_root: &Path,
281 session: &Session,
282 test_command: Option<&String>,
283 broker_config: &BrokerConfig,
284) -> Result<MergeResults, PawError> {
285 let dep_graph = if broker_config.enabled {
286 match crate::broker::publish::fetch_log_over_http(&broker_config.url()) {
287 Ok(messages) => {
288 let pairs: Vec<(u64, BrokerMessage)> = messages
289 .into_iter()
290 .enumerate()
291 .map(|(i, m)| (i as u64, m))
292 .collect();
293 build_dependency_graph(&pairs)
294 }
295 Err(e) => {
296 eprintln!(
297 "warning: failed to fetch broker /log for merge dependency graph: {e}; \
298 falling back to alphabetical merge order"
299 );
300 HashMap::new()
301 }
302 }
303 } else {
304 HashMap::new()
305 };
306
307 let publisher: Box<dyn Fn(&BrokerMessage)> = if broker_config.enabled {
308 let url = broker_config.url();
309 Box::new(move |msg: &BrokerMessage| {
310 if let Err(e) = publish_to_broker_http(&url, msg) {
311 eprintln!("warning: failed to publish merge status to broker: {e}");
312 }
313 })
314 } else {
315 Box::new(|_msg: &BrokerMessage| {})
316 };
317 run_merge_loop_with_publisher(repo_root, session, test_command, &dep_graph, &*publisher)
318}
319
320#[cfg(test)]
321mod tests {
322 use std::collections::HashMap;
328 use std::path::PathBuf;
329 use std::process::Command;
330 use std::sync::Arc;
331 use std::time::SystemTime;
332
333 use crate::broker;
334 use crate::broker::delivery;
335 use crate::session::{Session, SessionStatus, WorktreeEntry};
336
337 use super::run_merge_loop_with_publisher;
338
339 fn init_repo(dir: &std::path::Path) {
340 let git = which::which("git").expect("git on PATH");
341 Command::new(&git)
342 .current_dir(dir)
343 .args(["init", "-b", "main"])
344 .output()
345 .expect("git init");
346 Command::new(&git)
347 .current_dir(dir)
348 .args(["config", "user.email", "test@test.com"])
349 .output()
350 .expect("git config email");
351 Command::new(&git)
352 .current_dir(dir)
353 .args(["config", "user.name", "Test"])
354 .output()
355 .expect("git config name");
356 std::fs::write(dir.join("README.md"), "# test\n").unwrap();
357 Command::new(&git)
358 .current_dir(dir)
359 .args(["add", "README.md"])
360 .output()
361 .expect("git add");
362 Command::new(&git)
363 .current_dir(dir)
364 .args(["commit", "-m", "init"])
365 .output()
366 .expect("git commit");
367 }
368
369 fn synthetic_session(branches: &[&str]) -> Session {
370 Session {
371 session_name: "paw-test".to_string(),
372 repo_path: PathBuf::from("/tmp"),
373 project_name: "test".to_string(),
374 created_at: SystemTime::now(),
375 status: SessionStatus::Active,
376 worktrees: branches
377 .iter()
378 .map(|b| WorktreeEntry {
379 branch: (*b).to_string(),
380 worktree_path: PathBuf::from("/tmp"),
381 cli: "claude".to_string(),
382 branch_created: false,
383 })
384 .collect(),
385 broker_port: None,
386 broker_bind: None,
387 broker_log_path: None,
388 }
389 }
390
391 #[test]
392 fn merge_loop_publishes_final_supervisor_status() {
393 let tmp = tempfile::tempdir().expect("tempdir");
394 init_repo(tmp.path());
395
396 let state = Arc::new(broker::BrokerState::new(None));
397 let publisher_state = Arc::clone(&state);
398 let publisher = move |msg: &broker::messages::BrokerMessage| {
399 delivery::publish_message(&publisher_state, msg);
400 };
401
402 let session = synthetic_session(&["feat-a", "feat-b"]);
403 let _ =
404 run_merge_loop_with_publisher(tmp.path(), &session, None, &HashMap::new(), &publisher);
405
406 let inner = state.read();
407 let supervisor = inner
408 .agents
409 .get("supervisor")
410 .expect("supervisor record published at end of merge loop");
411 assert_eq!(supervisor.status, "working");
412 let last_msg = supervisor
413 .last_message
414 .as_ref()
415 .expect("supervisor last_message recorded");
416 match last_msg {
417 broker::messages::BrokerMessage::Status { payload, .. } => {
418 let body = payload.message.as_deref().unwrap_or("");
419 assert!(
420 body.starts_with("merge loop done:"),
421 "expected 'merge loop done: ...' message, got: {body}"
422 );
423 }
424 other => panic!("expected Status, got {other:?}"),
425 }
426 }
427
428 #[test]
429 fn merge_loop_publishes_merge_failed_when_test_command_fails() {
430 let tmp = tempfile::tempdir().expect("tempdir");
431 init_repo(tmp.path());
432
433 let git = which::which("git").expect("git on PATH");
434 Command::new(&git)
435 .current_dir(tmp.path())
436 .args(["checkout", "-b", "feat-broken"])
437 .output()
438 .expect("checkout -b feat-broken");
439 std::fs::write(tmp.path().join("broken.txt"), "broken\n").unwrap();
440 Command::new(&git)
441 .current_dir(tmp.path())
442 .args(["add", "broken.txt"])
443 .output()
444 .expect("git add");
445 Command::new(&git)
446 .current_dir(tmp.path())
447 .args(["commit", "-m", "broken"])
448 .output()
449 .expect("git commit");
450 Command::new(&git)
451 .current_dir(tmp.path())
452 .args(["checkout", "main"])
453 .output()
454 .expect("checkout main");
455
456 let state = Arc::new(broker::BrokerState::new(None));
457 let publisher_state = Arc::clone(&state);
458 let publisher = move |msg: &broker::messages::BrokerMessage| {
459 delivery::publish_message(&publisher_state, msg);
460 };
461
462 let session = synthetic_session(&["feat-broken"]);
463 let test_cmd = "exit 1".to_string();
464 let _ = run_merge_loop_with_publisher(
465 tmp.path(),
466 &session,
467 Some(&test_cmd),
468 &HashMap::new(),
469 &publisher,
470 );
471
472 let inner = state.read();
473 let record = inner
474 .agents
475 .get("feat-broken")
476 .expect("feat-broken status published");
477 assert_eq!(
478 record.status, "merge_failed",
479 "branch should publish merge_failed when test command fails",
480 );
481
482 let supervisor = inner.agents.get("supervisor").expect("supervisor row");
483 let last_msg = supervisor
484 .last_message
485 .as_ref()
486 .expect("supervisor last message recorded");
487 if let broker::messages::BrokerMessage::Status { payload, .. } = last_msg {
488 let body = payload.message.as_deref().unwrap_or("");
489 assert!(
490 body.contains("0 merged") && body.contains("1 failed"),
491 "expected '0 merged, 1 failed' in body, got: {body}",
492 );
493 } else {
494 panic!("expected Status message");
495 }
496 }
497
498 #[test]
499 fn merge_loop_publishes_merged_status_when_branch_exists() {
500 let tmp = tempfile::tempdir().expect("tempdir");
501 init_repo(tmp.path());
502
503 let git = which::which("git").expect("git on PATH");
504 Command::new(&git)
505 .current_dir(tmp.path())
506 .args(["checkout", "-b", "feat-ok"])
507 .output()
508 .expect("checkout -b feat-ok");
509 std::fs::write(tmp.path().join("feature.txt"), "feature\n").unwrap();
510 Command::new(&git)
511 .current_dir(tmp.path())
512 .args(["add", "feature.txt"])
513 .output()
514 .expect("git add feature.txt");
515 Command::new(&git)
516 .current_dir(tmp.path())
517 .args(["commit", "-m", "feat"])
518 .output()
519 .expect("git commit");
520 Command::new(&git)
521 .current_dir(tmp.path())
522 .args(["checkout", "main"])
523 .output()
524 .expect("checkout main");
525
526 let state = Arc::new(broker::BrokerState::new(None));
527 let publisher_state = Arc::clone(&state);
528 let publisher = move |msg: &broker::messages::BrokerMessage| {
529 delivery::publish_message(&publisher_state, msg);
530 };
531
532 let session = synthetic_session(&["feat-ok"]);
533 let _ =
534 run_merge_loop_with_publisher(tmp.path(), &session, None, &HashMap::new(), &publisher);
535
536 let inner = state.read();
537 let record = inner
538 .agents
539 .get("feat-ok")
540 .expect("feat-ok status published");
541 assert_eq!(record.status, "merged");
542 let last_msg = record
543 .last_message
544 .as_ref()
545 .expect("feat-ok last message recorded");
546 if let broker::messages::BrokerMessage::Status { payload, .. } = last_msg {
547 assert_eq!(payload.message.as_deref(), Some("merged into main"));
548 } else {
549 panic!("expected Status message");
550 }
551 }
552
553 #[test]
554 fn topological_merge_order_cycle_fallback_is_deterministic() {
555 use std::collections::HashMap;
556
557 use super::topological_merge_order;
558
559 let mut graph: HashMap<String, Vec<String>> = HashMap::new();
561 graph.insert("a".into(), vec!["b".into()]);
562 graph.insert("b".into(), vec!["a".into()]);
563
564 let all_agents: Vec<String> = vec!["c".into(), "a".into(), "b".into()];
567 let order = topological_merge_order(&graph, &all_agents);
568 assert_eq!(
569 order,
570 vec!["a".to_string(), "b".to_string(), "c".to_string()]
571 );
572
573 let alt: Vec<String> = vec!["b".into(), "c".into(), "a".into()];
575 let order_alt = topological_merge_order(&graph, &alt);
576 assert_eq!(order_alt, order);
577 }
578}