Skip to main content

git_paw/
merge_loop.rs

1//! Supervisor merge loop and topological dependency ordering.
2//!
3//! Walks the agent set in topological merge order, merges each branch into
4//! the current branch, and runs the supervisor's configured test command (if
5//! any) between merges. The captured per-branch outcomes are returned for
6//! [`crate::summary::write_supervisor_summary`] to render into the session
7//! summary.
8//!
9//! Pulled out of `main.rs` so integration tests can drive the merge loop
10//! without spawning the binary or depending on the supervisor CLI.
11
12use std::collections::HashMap;
13use std::path::Path;
14
15use crate::broker::messages::BrokerMessage;
16use crate::broker::publish::{build_status_message, publish_to_broker_http};
17use crate::config::BrokerConfig;
18use crate::error::PawError;
19use crate::git;
20use crate::session::Session;
21use crate::summary::TestResult;
22
23/// Target branch for the supervisor merge loop.
24const MERGE_TARGET_BRANCH: &str = "main";
25
26/// Builds a dependency graph from broker messages.
27///
28/// An edge `B → A` means "A was blocked on B" — so B must merge before A.
29/// The returned map is keyed by the dependency (B) and lists its dependents
30/// (A's). Used by [`topological_merge_order`] to pick a safe merge order.
31pub fn build_dependency_graph(messages: &[(u64, BrokerMessage)]) -> HashMap<String, Vec<String>> {
32    let mut graph: HashMap<String, Vec<String>> = HashMap::new();
33    for (_, msg) in messages {
34        if let BrokerMessage::Blocked { agent_id, payload } = msg {
35            let dep = payload.from.clone();
36            let dependent = agent_id.clone();
37            graph.entry(dep).or_default().push(dependent);
38        }
39    }
40    graph
41}
42
43/// Topological sort of the dependency graph. Returns a merge order where
44/// agents with no dependents come first (so agents that are depended upon
45/// merge before the agents that depend on them).
46///
47/// On cycle detection, logs a warning and falls back to returning all agents
48/// in arbitrary order so the caller can still proceed.
49pub fn topological_merge_order<S: std::hash::BuildHasher>(
50    graph: &HashMap<String, Vec<String>, S>,
51    all_agents: &[String],
52) -> Vec<String> {
53    // Compute in-degree of each agent (number of deps it has).
54    let mut in_degree: HashMap<String, usize> = HashMap::new();
55    for agent in all_agents {
56        in_degree.entry(agent.clone()).or_insert(0);
57    }
58    for dependents in graph.values() {
59        for dependent in dependents {
60            *in_degree.entry(dependent.clone()).or_insert(0) += 1;
61        }
62    }
63
64    // Kahn's algorithm — seed with all zero-in-degree agents.
65    let mut queue: Vec<String> = in_degree
66        .iter()
67        .filter_map(|(k, v)| if *v == 0 { Some(k.clone()) } else { None })
68        .collect();
69    queue.sort();
70
71    let mut order = Vec::new();
72    while let Some(node) = queue.pop() {
73        order.push(node.clone());
74        if let Some(dependents) = graph.get(&node) {
75            for dep in dependents {
76                if let Some(deg) = in_degree.get_mut(dep) {
77                    *deg = deg.saturating_sub(1);
78                    if *deg == 0 {
79                        queue.push(dep.clone());
80                        queue.sort();
81                    }
82                }
83            }
84        }
85    }
86
87    if order.len() == all_agents.len() {
88        order
89    } else {
90        let cycle_members: Vec<String> = in_degree
91            .iter()
92            .filter_map(|(k, v)| if *v > 0 { Some(k.clone()) } else { None })
93            .collect();
94        eprintln!(
95            "warning: dependency cycle detected among agents {cycle_members:?}; \
96             falling back to sorted merge order"
97        );
98        // Sort so the fallback order is deterministic regardless of the
99        // caller's input ordering — otherwise tests (and operators) get
100        // different results from the same cyclic graph.
101        let mut fallback = all_agents.to_vec();
102        fallback.sort();
103        fallback
104    }
105}
106
107/// Results of running the merge loop.
108///
109/// `merge_order` lists the branches in the order they were attempted (matches
110/// `topological_merge_order`'s return). `test_results` records, per branch,
111/// whether the merge + test succeeded plus the captured stdout (or a synthetic
112/// "Merge failed: ..." / "No test command configured" line).
113#[derive(Debug, Clone)]
114pub struct MergeResults {
115    /// Branches in the order they were processed.
116    pub merge_order: Vec<String>,
117    /// Per-branch outcome (keyed by branch name).
118    pub test_results: HashMap<String, TestResult>,
119}
120
121/// Runs the configured test command via `sh -c` and captures stdout.
122///
123/// Returns a [`TestResult`] whose `success` reflects the shell exit status
124/// and whose `output` is the captured stdout (stderr is intentionally not
125/// captured here — the supervisor summary surfaces stdout to the operator).
126pub fn run_test_command(repo_root: &Path, test_command: &str) -> Result<TestResult, PawError> {
127    let output = std::process::Command::new("sh")
128        .current_dir(repo_root)
129        .arg("-c")
130        .arg(test_command)
131        .output()
132        .map_err(|e| PawError::SessionError(format!("failed to run test command: {e}")))?;
133
134    let success = output.status.success();
135    let output_str = String::from_utf8_lossy(&output.stdout).to_string();
136
137    Ok(TestResult {
138        success,
139        output: output_str,
140    })
141}
142
143/// Walks the agent set in topological order, merging each branch into the
144/// current branch, optionally running `test_command` between merges, and
145/// returning the per-branch outcomes.
146///
147/// `dep_graph` is the dependency map produced by [`build_dependency_graph`]
148/// from the broker's `agent.blocked` messages. An empty map (or `None`)
149/// means no dependencies are known, in which case [`topological_merge_order`]
150/// falls back to a sorted alphabetical merge.
151///
152/// Per-branch and final supervisor status messages are dispatched via
153/// `publisher`. The production callsite injects a closure that POSTs to the
154/// broker over HTTP; tests inject a closure that calls
155/// [`crate::broker::delivery::publish_message`] against a synthetic state so
156/// they can assert on the resulting broker state without a live HTTP server.
157#[allow(clippy::unnecessary_wraps)]
158pub fn run_merge_loop_with_publisher<S: std::hash::BuildHasher>(
159    repo_root: &Path,
160    session: &Session,
161    test_command: Option<&String>,
162    dep_graph: &HashMap<String, Vec<String>, S>,
163    publisher: &dyn Fn(&BrokerMessage),
164) -> Result<MergeResults, PawError> {
165    let agents: Vec<String> = session.worktrees.iter().map(|w| w.branch.clone()).collect();
166    let merge_order = topological_merge_order(dep_graph, &agents);
167
168    let mut test_results: HashMap<String, TestResult> = HashMap::new();
169    let mut n_ok: usize = 0;
170    let mut n_fail: usize = 0;
171
172    let _ = std::process::Command::new("git")
173        .current_dir(repo_root)
174        .args(["checkout", MERGE_TARGET_BRANCH])
175        .status();
176
177    for branch in &merge_order {
178        println!("Merging branch: {branch}");
179
180        if let Err(e) = git::merge_branch(repo_root, branch) {
181            eprintln!("Warning: Failed to merge branch {branch}: {e}");
182            let reason = format!("merge failed: {e}");
183            test_results.insert(
184                branch.clone(),
185                TestResult {
186                    success: false,
187                    output: format!("Merge failed: {e}"),
188                },
189            );
190            publisher(&build_status_message(branch, "merge_failed", Some(reason)));
191            n_fail += 1;
192            continue;
193        }
194
195        // `git::merge_branch` uses `--no-commit`, so even a clean merge leaves
196        // MERGE_HEAD set and blocks the next merge. Finalize by committing
197        // with the default merge message; if there is nothing to commit
198        // (e.g. the branch was already merged), `git commit` exits non-zero
199        // and we treat that as a no-op.
200        let _ = std::process::Command::new("git")
201            .current_dir(repo_root)
202            .args(["commit", "--no-edit", "--allow-empty"])
203            .output();
204
205        let merged_msg = format!("merged into {MERGE_TARGET_BRANCH}");
206
207        if let Some(cmd) = test_command {
208            println!("Running test command: {cmd}");
209            match run_test_command(repo_root, cmd) {
210                Ok(result) => {
211                    let success = result.success;
212                    test_results.insert(branch.clone(), result);
213                    if success {
214                        println!("\u{2713} Tests passed for {branch}");
215                        publisher(&build_status_message(
216                            branch,
217                            "merged",
218                            Some(merged_msg.clone()),
219                        ));
220                        n_ok += 1;
221                    } else {
222                        println!("\u{2717} Tests failed for {branch}");
223                        publisher(&build_status_message(
224                            branch,
225                            "merge_failed",
226                            Some(format!("test command failed for {branch}")),
227                        ));
228                        n_fail += 1;
229                    }
230                }
231                Err(e) => {
232                    eprintln!("Warning: Test command failed for {branch}: {e}");
233                    let reason = format!("test execution failed: {e}");
234                    test_results.insert(
235                        branch.clone(),
236                        TestResult {
237                            success: false,
238                            output: format!("Test execution failed: {e}"),
239                        },
240                    );
241                    publisher(&build_status_message(branch, "merge_failed", Some(reason)));
242                    n_fail += 1;
243                }
244            }
245        } else {
246            println!("\u{2713} Merged {branch} (no test command configured)");
247            test_results.insert(
248                branch.clone(),
249                TestResult {
250                    success: true,
251                    output: "No test command configured".to_string(),
252                },
253            );
254            publisher(&build_status_message(branch, "merged", Some(merged_msg)));
255            n_ok += 1;
256        }
257    }
258
259    publisher(&build_status_message(
260        "supervisor",
261        "working",
262        Some(format!("merge loop done: {n_ok} merged, {n_fail} failed")),
263    ));
264
265    Ok(MergeResults {
266        merge_order,
267        test_results,
268    })
269}
270
271/// Production wrapper around [`run_merge_loop_with_publisher`].
272///
273/// Publishes merge results to the broker over HTTP using `broker_config.url()`
274/// when the broker is enabled; otherwise the publisher is a no-op. The
275/// dependency graph is sourced from the broker's `agent.blocked` messages
276/// fetched via `GET /log` so the merge order honours real cross-agent
277/// dependencies. If the fetch fails, falls back to an empty graph (sorted
278/// alphabetical merge) with a warning to stderr.
279pub fn run_merge_loop(
280    repo_root: &Path,
281    session: &Session,
282    test_command: Option<&String>,
283    broker_config: &BrokerConfig,
284) -> Result<MergeResults, PawError> {
285    let dep_graph = if broker_config.enabled {
286        match crate::broker::publish::fetch_log_over_http(&broker_config.url()) {
287            Ok(messages) => {
288                let pairs: Vec<(u64, BrokerMessage)> = messages
289                    .into_iter()
290                    .enumerate()
291                    .map(|(i, m)| (i as u64, m))
292                    .collect();
293                build_dependency_graph(&pairs)
294            }
295            Err(e) => {
296                eprintln!(
297                    "warning: failed to fetch broker /log for merge dependency graph: {e}; \
298                     falling back to alphabetical merge order"
299                );
300                HashMap::new()
301            }
302        }
303    } else {
304        HashMap::new()
305    };
306
307    let publisher: Box<dyn Fn(&BrokerMessage)> = if broker_config.enabled {
308        let url = broker_config.url();
309        Box::new(move |msg: &BrokerMessage| {
310            if let Err(e) = publish_to_broker_http(&url, msg) {
311                eprintln!("warning: failed to publish merge status to broker: {e}");
312            }
313        })
314    } else {
315        Box::new(|_msg: &BrokerMessage| {})
316    };
317    run_merge_loop_with_publisher(repo_root, session, test_command, &dep_graph, &*publisher)
318}
319
320#[cfg(test)]
321mod tests {
322    //! Behavioral tests for `run_merge_loop_with_publisher`. The publisher
323    //! closure routes through `broker::delivery::publish_message` against a
324    //! synthetic in-process broker state so we can assert on the resulting
325    //! status records without spinning up an HTTP server.
326
327    use std::collections::HashMap;
328    use std::path::PathBuf;
329    use std::process::Command;
330    use std::sync::Arc;
331    use std::time::SystemTime;
332
333    use crate::broker;
334    use crate::broker::delivery;
335    use crate::session::{Session, SessionStatus, WorktreeEntry};
336
337    use super::run_merge_loop_with_publisher;
338
339    fn init_repo(dir: &std::path::Path) {
340        let git = which::which("git").expect("git on PATH");
341        Command::new(&git)
342            .current_dir(dir)
343            .args(["init", "-b", "main"])
344            .output()
345            .expect("git init");
346        Command::new(&git)
347            .current_dir(dir)
348            .args(["config", "user.email", "test@test.com"])
349            .output()
350            .expect("git config email");
351        Command::new(&git)
352            .current_dir(dir)
353            .args(["config", "user.name", "Test"])
354            .output()
355            .expect("git config name");
356        std::fs::write(dir.join("README.md"), "# test\n").unwrap();
357        Command::new(&git)
358            .current_dir(dir)
359            .args(["add", "README.md"])
360            .output()
361            .expect("git add");
362        Command::new(&git)
363            .current_dir(dir)
364            .args(["commit", "-m", "init"])
365            .output()
366            .expect("git commit");
367    }
368
369    fn synthetic_session(branches: &[&str]) -> Session {
370        Session {
371            session_name: "paw-test".to_string(),
372            repo_path: PathBuf::from("/tmp"),
373            project_name: "test".to_string(),
374            created_at: SystemTime::now(),
375            status: SessionStatus::Active,
376            worktrees: branches
377                .iter()
378                .map(|b| WorktreeEntry {
379                    branch: (*b).to_string(),
380                    worktree_path: PathBuf::from("/tmp"),
381                    cli: "claude".to_string(),
382                    branch_created: false,
383                })
384                .collect(),
385            broker_port: None,
386            broker_bind: None,
387            broker_log_path: None,
388        }
389    }
390
391    #[test]
392    fn merge_loop_publishes_final_supervisor_status() {
393        let tmp = tempfile::tempdir().expect("tempdir");
394        init_repo(tmp.path());
395
396        let state = Arc::new(broker::BrokerState::new(None));
397        let publisher_state = Arc::clone(&state);
398        let publisher = move |msg: &broker::messages::BrokerMessage| {
399            delivery::publish_message(&publisher_state, msg);
400        };
401
402        let session = synthetic_session(&["feat-a", "feat-b"]);
403        let _ =
404            run_merge_loop_with_publisher(tmp.path(), &session, None, &HashMap::new(), &publisher);
405
406        let inner = state.read();
407        let supervisor = inner
408            .agents
409            .get("supervisor")
410            .expect("supervisor record published at end of merge loop");
411        assert_eq!(supervisor.status, "working");
412        let last_msg = supervisor
413            .last_message
414            .as_ref()
415            .expect("supervisor last_message recorded");
416        match last_msg {
417            broker::messages::BrokerMessage::Status { payload, .. } => {
418                let body = payload.message.as_deref().unwrap_or("");
419                assert!(
420                    body.starts_with("merge loop done:"),
421                    "expected 'merge loop done: ...' message, got: {body}"
422                );
423            }
424            other => panic!("expected Status, got {other:?}"),
425        }
426    }
427
428    #[test]
429    fn merge_loop_publishes_merge_failed_when_test_command_fails() {
430        let tmp = tempfile::tempdir().expect("tempdir");
431        init_repo(tmp.path());
432
433        let git = which::which("git").expect("git on PATH");
434        Command::new(&git)
435            .current_dir(tmp.path())
436            .args(["checkout", "-b", "feat-broken"])
437            .output()
438            .expect("checkout -b feat-broken");
439        std::fs::write(tmp.path().join("broken.txt"), "broken\n").unwrap();
440        Command::new(&git)
441            .current_dir(tmp.path())
442            .args(["add", "broken.txt"])
443            .output()
444            .expect("git add");
445        Command::new(&git)
446            .current_dir(tmp.path())
447            .args(["commit", "-m", "broken"])
448            .output()
449            .expect("git commit");
450        Command::new(&git)
451            .current_dir(tmp.path())
452            .args(["checkout", "main"])
453            .output()
454            .expect("checkout main");
455
456        let state = Arc::new(broker::BrokerState::new(None));
457        let publisher_state = Arc::clone(&state);
458        let publisher = move |msg: &broker::messages::BrokerMessage| {
459            delivery::publish_message(&publisher_state, msg);
460        };
461
462        let session = synthetic_session(&["feat-broken"]);
463        let test_cmd = "exit 1".to_string();
464        let _ = run_merge_loop_with_publisher(
465            tmp.path(),
466            &session,
467            Some(&test_cmd),
468            &HashMap::new(),
469            &publisher,
470        );
471
472        let inner = state.read();
473        let record = inner
474            .agents
475            .get("feat-broken")
476            .expect("feat-broken status published");
477        assert_eq!(
478            record.status, "merge_failed",
479            "branch should publish merge_failed when test command fails",
480        );
481
482        let supervisor = inner.agents.get("supervisor").expect("supervisor row");
483        let last_msg = supervisor
484            .last_message
485            .as_ref()
486            .expect("supervisor last message recorded");
487        if let broker::messages::BrokerMessage::Status { payload, .. } = last_msg {
488            let body = payload.message.as_deref().unwrap_or("");
489            assert!(
490                body.contains("0 merged") && body.contains("1 failed"),
491                "expected '0 merged, 1 failed' in body, got: {body}",
492            );
493        } else {
494            panic!("expected Status message");
495        }
496    }
497
498    #[test]
499    fn merge_loop_publishes_merged_status_when_branch_exists() {
500        let tmp = tempfile::tempdir().expect("tempdir");
501        init_repo(tmp.path());
502
503        let git = which::which("git").expect("git on PATH");
504        Command::new(&git)
505            .current_dir(tmp.path())
506            .args(["checkout", "-b", "feat-ok"])
507            .output()
508            .expect("checkout -b feat-ok");
509        std::fs::write(tmp.path().join("feature.txt"), "feature\n").unwrap();
510        Command::new(&git)
511            .current_dir(tmp.path())
512            .args(["add", "feature.txt"])
513            .output()
514            .expect("git add feature.txt");
515        Command::new(&git)
516            .current_dir(tmp.path())
517            .args(["commit", "-m", "feat"])
518            .output()
519            .expect("git commit");
520        Command::new(&git)
521            .current_dir(tmp.path())
522            .args(["checkout", "main"])
523            .output()
524            .expect("checkout main");
525
526        let state = Arc::new(broker::BrokerState::new(None));
527        let publisher_state = Arc::clone(&state);
528        let publisher = move |msg: &broker::messages::BrokerMessage| {
529            delivery::publish_message(&publisher_state, msg);
530        };
531
532        let session = synthetic_session(&["feat-ok"]);
533        let _ =
534            run_merge_loop_with_publisher(tmp.path(), &session, None, &HashMap::new(), &publisher);
535
536        let inner = state.read();
537        let record = inner
538            .agents
539            .get("feat-ok")
540            .expect("feat-ok status published");
541        assert_eq!(record.status, "merged");
542        let last_msg = record
543            .last_message
544            .as_ref()
545            .expect("feat-ok last message recorded");
546        if let broker::messages::BrokerMessage::Status { payload, .. } = last_msg {
547            assert_eq!(payload.message.as_deref(), Some("merged into main"));
548        } else {
549            panic!("expected Status message");
550        }
551    }
552
553    #[test]
554    fn topological_merge_order_cycle_fallback_is_deterministic() {
555        use std::collections::HashMap;
556
557        use super::topological_merge_order;
558
559        // 2-cycle: a depends on b, b depends on a.
560        let mut graph: HashMap<String, Vec<String>> = HashMap::new();
561        graph.insert("a".into(), vec!["b".into()]);
562        graph.insert("b".into(), vec!["a".into()]);
563
564        // Caller passes agents in a non-sorted order. The cycle fallback must
565        // sort them so the result is deterministic regardless of input order.
566        let all_agents: Vec<String> = vec!["c".into(), "a".into(), "b".into()];
567        let order = topological_merge_order(&graph, &all_agents);
568        assert_eq!(
569            order,
570            vec!["a".to_string(), "b".to_string(), "c".to_string()]
571        );
572
573        // Same agents in another order produce the same fallback.
574        let alt: Vec<String> = vec!["b".into(), "c".into(), "a".into()];
575        let order_alt = topological_merge_order(&graph, &alt);
576        assert_eq!(order_alt, order);
577    }
578}