use std::collections::HashMap;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::time::Duration;
use anyhow::{Context, Result, bail};
use crossterm::event::{Event, KeyCode, KeyModifiers};
use crossterm::terminal;
use serde::{Deserialize, Serialize};
use tokio::time::sleep;
use crate::agents::{self, AgentDef};
use crate::dispatch::{self, DispatchDecision};
use crate::display::input::{InputAction, InputHandler};
use crate::display::renderer::Renderer;
use crate::fork::{self, ForkConfig};
use crate::session::runner::SessionConfig;
use crate::session::state::SessionState;
use crate::vcr::{Io, IoEvent, VcrContext};
use crate::worker_state;
use crate::worktree::{self, SpawnOptions};
use super::session_loop::{self, SessionOutcome};
/// Borrowed handles shared by every worker phase, bundled so phase functions
/// take a single context argument instead of five.
struct PhaseContext<'a, W: Write> {
/// Renderer used for terminal output and window-title updates.
renderer: &'a mut Renderer<W>,
/// Keyboard input handler handed to sessions and to the sleep wait.
input: &'a mut InputHandler,
/// Event source polled through `Io::next_event`.
io: &'a mut Io,
/// VCR context that external calls are routed through.
vcr: &'a VcrContext,
/// Fork configuration; when `Some`, sessions get the fork system prompt appended.
fork_config: Option<&'a ForkConfig>,
}
/// Caller-supplied settings for [`worker`].
#[derive(Debug, Clone)]
pub struct WorkerConfig {
    /// Forwarded to the renderer's `set_show_thinking`.
    pub show_thinking: bool,
    /// Branch passed to `worktree::spawn`; `None` is forwarded as-is.
    pub branch: Option<String>,
    /// Base directory passed to `worktree::spawn` as `base_path`.
    pub worktree_base: PathBuf,
    /// Extra arguments forwarded to every session. `worker` appends
    /// `--permission-mode acceptEdits` when no permission mode is present.
    pub extra_args: Vec<String>,
    /// Repository directory; the current directory is used when `None`.
    pub working_dir: Option<PathBuf>,
    /// Passed to `ForkConfig::if_enabled` to decide whether forking is configured.
    pub fork: bool,
}
/// Result of the `worker_paths` VCR call and argument of the `worktree::spawn`
/// VCR call: the resolved paths and branch used to create the worktree.
#[derive(Serialize, Deserialize, PartialEq, Debug)]
struct SpawnArgs {
/// Repository path: the configured working directory, or the current directory.
repo_path: String,
/// Requested branch, if any.
branch: Option<String>,
/// Base directory for the worktree, taken from `WorkerConfig::worktree_base`.
base_path: String,
}
/// Argument bundle for the `worker_state::update` VCR call.
#[derive(Serialize, Deserialize, PartialEq, Debug)]
struct WorkerUpdateArgs {
/// Worktree path, as a string.
path: String,
/// Branch of the worker.
branch: String,
/// Agent currently assigned; `None` when sleeping or between agents.
agent: Option<String>,
/// Arguments the agent was dispatched with.
args: HashMap<String, String>,
}
/// Runs a worker from start to finish: spawns a worktree, loops over
/// dispatch/agent phases, then tears everything down.
///
/// Steps, in order:
/// 1. Append `--permission-mode acceptEdits` unless the caller already chose a
///    permission mode (either `--permission-mode X` or `--permission-mode=X`).
/// 2. Resolve the repository path (`config.working_dir` or the current
///    directory) and spawn the worktree.
/// 3. Enable terminal raw mode when the VCR context is live.
/// 4. Register the worker, run [`worker_loop`], then disable raw mode, clear
///    the title, deregister, and remove the worktree.
///
/// A failure to remove the worktree is reported as a warning only.
///
/// # Errors
///
/// Returns the error from `worker_loop`, or any setup/teardown error raised
/// through `?` before that result is returned.
pub async fn worker<W: Write>(
    mut config: WorkerConfig,
    io: &mut Io,
    vcr: &VcrContext,
    writer: W,
) -> Result<()> {
    /// Disables raw mode on drop while `armed`, so early returns (and a dropped
    /// future) do not leave the user's terminal in raw mode.
    struct RawModeGuard {
        armed: bool,
    }

    impl Drop for RawModeGuard {
        fn drop(&mut self) {
            if self.armed {
                // Best effort: there is nobody left to report a failure to.
                let _ = terminal::disable_raw_mode();
            }
        }
    }

    // Accept both spellings so an explicit `--permission-mode=...` is not
    // overridden by a second, conflicting flag.
    let has_permission_mode = config
        .extra_args
        .iter()
        .any(|a| a == "--permission-mode" || a.starts_with("--permission-mode="));
    if !has_permission_mode {
        config
            .extra_args
            .extend(["--permission-mode".to_string(), "acceptEdits".to_string()]);
    }

    let configured_dir = config.working_dir.as_ref().map(|d| d.display().to_string());
    let configured_base = config.worktree_base.display().to_string();
    let spawn_args: SpawnArgs = vcr
        .call("worker_paths", (), async |(): &()| {
            let repo_path = match configured_dir {
                Some(s) => s,
                None => std::env::current_dir()?.display().to_string(),
            };
            Ok(SpawnArgs {
                repo_path,
                branch: config.branch.clone(),
                base_path: configured_base,
            })
        })
        .await?;
    let spawn_result = vcr
        .call_typed_err("worktree::spawn", spawn_args, async |a: &SpawnArgs| {
            worktree::spawn(&SpawnOptions {
                repo_path: Path::new(&a.repo_path),
                branch: a.branch.as_deref(),
                base_path: Path::new(&a.base_path),
            })
        })
        .await??;

    let mut raw_mode = RawModeGuard { armed: false };
    if vcr.is_live() {
        terminal::enable_raw_mode()?;
        raw_mode.armed = true;
    }

    let mut renderer = Renderer::with_writer(writer);
    renderer.set_show_thinking(config.show_thinking);
    renderer.render_help();
    let mut input = InputHandler::new();
    let mut total_cost = 0.0;
    let wt_str = spawn_result.worktree_path.display().to_string();
    let own_pid: u32 = vcr
        .call("process_id", (), async |(): &()| Ok(std::process::id()))
        .await?;
    vcr.call(
        "worker_state::register",
        (wt_str.clone(), spawn_result.branch.clone()),
        async |a: &(String, String)| worker_state::register(Path::new(&a.0), &a.1),
    )
    .await?;
    renderer.set_title(&format!("coven: {}", spawn_result.branch));
    renderer.write_raw(&format!(
        "\r\nWorker started: {} ({})\r\n",
        spawn_result.branch,
        spawn_result.worktree_path.display()
    ));

    let fork_config = ForkConfig::if_enabled(config.fork, &config.extra_args, &config.working_dir);
    let mut ctx = PhaseContext {
        renderer: &mut renderer,
        input: &mut input,
        io,
        vcr,
        fork_config: fork_config.as_ref(),
    };
    // Keep the loop's result aside: teardown below must run before it is returned.
    let result = worker_loop(
        &config,
        &spawn_result.worktree_path,
        &spawn_result.branch,
        own_pid,
        &mut ctx,
        &mut total_cost,
    )
    .await;

    if vcr.is_live() {
        // Disarm first so the guard does not repeat the call after this one.
        raw_mode.armed = false;
        terminal::disable_raw_mode()?;
    }
    renderer.set_title("");
    vcr.call(
        "worker_state::deregister",
        wt_str.clone(),
        async |p: &String| -> Result<()> {
            worker_state::deregister(Path::new(p));
            Ok(())
        },
    )
    .await?;
    renderer.write_raw("\r\nRemoving worktree...\r\n");
    if let Err(e) = vcr
        .call_typed_err("worktree::remove", wt_str, async |p: &String| {
            worktree::remove(Path::new(p))
        })
        .await?
    {
        renderer.write_raw(&format!("Warning: failed to remove worktree: {e}\r\n"));
    }
    result
}
/// What a completed dispatch phase hands back to the worker loop.
struct DispatchResult {
/// Parsed decision: sleep, or run a named agent with arguments.
decision: DispatchDecision,
/// Agent definitions loaded for this dispatch; used to look up the chosen agent.
agent_defs: Vec<AgentDef>,
/// Cost of the dispatch session, including the retry session if one ran.
cost: f64,
}
/// Runs the `dispatch` agent and parses its decision.
///
/// Loads the agent definitions from the worktree, renders the `dispatch`
/// agent's prompt with the agent catalog and the worker status, and runs it as
/// a session. If the output cannot be parsed and the session reported an ID,
/// that session is resumed once with a corrective prompt.
///
/// Returns `Ok(None)` when the user exited during either session.
///
/// # Errors
///
/// Fails when no agent definitions exist, when there is no `dispatch` agent,
/// when the prompt cannot be rendered, or when the decision is still
/// unparseable (after the retry, or immediately if no session ID is available).
async fn run_dispatch<W: Write>(
    worktree_path: &Path,
    branch: &str,
    extra_args: &[String],
    worker_status: &str,
    ctx: &mut PhaseContext<'_, W>,
) -> Result<Option<DispatchResult>> {
    ctx.renderer
        .set_title(&format!("coven: {branch} \u{2014} dispatch"));
    ctx.renderer.write_raw("\r\n=== Dispatch ===\r\n\r\n");

    let agents_dir = worktree_path.join(agents::AGENTS_DIR);
    let agent_defs = ctx
        .vcr
        .call(
            "agents::load_agents",
            agents_dir.display().to_string(),
            async |dir: &String| agents::load_agents(Path::new(dir)),
        )
        .await?;
    if agent_defs.is_empty() {
        bail!("no agent definitions found in {}", agents_dir.display());
    }

    let dispatch_prompt = {
        let dispatcher = agent_defs
            .iter()
            .find(|def| def.name == "dispatch")
            .context("no dispatch.md agent definition found")?;
        let template_args = HashMap::from([
            (
                "agent_catalog".to_string(),
                dispatch::format_agent_catalog(&agent_defs),
            ),
            ("worker_status".to_string(), worker_status.to_string()),
        ]);
        dispatcher.render(&template_args)?
    };

    // First attempt.
    let (first_text, first_cost, session_id) =
        match run_phase_session(&dispatch_prompt, worktree_path, extra_args, None, ctx).await? {
            PhaseOutcome::Completed {
                result_text,
                cost,
                session_id,
            } => (result_text, cost, session_id),
            PhaseOutcome::Exited => return Ok(None),
        };
    let parse_err = match dispatch::parse_decision(&first_text) {
        Ok(decision) => {
            return Ok(Some(DispatchResult {
                decision,
                agent_defs,
                cost: first_cost,
            }));
        }
        Err(err) => err,
    };

    // A retry needs a session to resume; without one the parse error is final.
    let Some(session_id) = session_id else {
        return Err(parse_err).context("failed to parse dispatch decision");
    };
    ctx.renderer.write_raw(&format!(
        "\r\nDispatch output could not be parsed: {parse_err}\r\nRetrying...\r\n\r\n"
    ));
    let retry_prompt = format!(
        "Your previous output could not be parsed: {parse_err}\n\n\
         Please output your decision inside a <dispatch> tag containing YAML. \
         For example:\n\n\
         <dispatch>\nagent: plan\nissue: issues/example.md\n</dispatch>\n\n\
         Or to sleep:\n\n\
         <dispatch>\nsleep: true\n</dispatch>"
    );
    let (retry_text, retry_cost) = match run_phase_session(
        &retry_prompt,
        worktree_path,
        extra_args,
        Some(&session_id),
        ctx,
    )
    .await?
    {
        PhaseOutcome::Completed {
            result_text, cost, ..
        } => (result_text, cost),
        PhaseOutcome::Exited => return Ok(None),
    };

    let decision = dispatch::parse_decision(&retry_text)
        .context("failed to parse dispatch decision after retry")?;
    Ok(Some(DispatchResult {
        decision,
        agent_defs,
        cost: first_cost + retry_cost,
    }))
}
/// Main worker cycle: sync, dispatch, then either sleep or run the chosen agent.
///
/// Each iteration:
/// 1. Syncs the worktree to main.
/// 2. Takes the dispatch lock, reads all worker states, and runs dispatch.
/// 3. Records this worker's new assignment and only then releases the lock.
/// 4. On `Sleep`, waits for new commits on main; on `RunAgent`, renders and
///    runs the agent, then clears this worker's recorded assignment.
///
/// `total_cost` accumulates the cost of dispatch and agent sessions.
///
/// Returns `Ok(())` when the user exits from any phase.
///
/// # Errors
///
/// Propagates sync, lock, dispatch, render, and session errors, and fails when
/// dispatch names an agent that has no definition.
async fn worker_loop<W: Write>(
    config: &WorkerConfig,
    worktree_path: &Path,
    branch: &str,
    own_pid: u32,
    ctx: &mut PhaseContext<'_, W>,
    total_cost: &mut f64,
) -> Result<()> {
    // Loop-invariant values, built once instead of on every iteration.
    let wt_str = worktree_path.display().to_string();
    let no_args: HashMap<String, String> = HashMap::new();

    loop {
        ctx.vcr
            .call_typed_err(
                "worktree::sync_to_main",
                wt_str.clone(),
                async |p: &String| worktree::sync_to_main(Path::new(p)),
            )
            .await?
            .context("failed to sync worktree to main")?;

        // Held from before the status read until this worker's assignment is
        // recorded — presumably so concurrent workers dispatch one at a time
        // against an up-to-date view (confirm against `worker_state`).
        let lock = ctx
            .vcr
            .call(
                "worker_state::acquire_dispatch_lock",
                wt_str.clone(),
                async |p: &String| worker_state::acquire_dispatch_lock(Path::new(p)),
            )
            .await?;
        let all_workers = ctx
            .vcr
            .call(
                "worker_state::read_all",
                wt_str.clone(),
                async |p: &String| worker_state::read_all(Path::new(p)),
            )
            .await?;
        let worker_status = worker_state::format_status(&all_workers, own_pid);
        let Some(dispatch) = run_dispatch(
            worktree_path,
            branch,
            &config.extra_args,
            &worker_status,
            ctx,
        )
        .await?
        else {
            return Ok(());
        };

        let (agent_name, agent_args) = match &dispatch.decision {
            DispatchDecision::Sleep => (None, &no_args),
            DispatchDecision::RunAgent { agent, args } => (Some(agent.as_str()), args),
        };
        vcr_update_worker_state(ctx.vcr, &wt_str, branch, agent_name, agent_args).await?;
        drop(lock);

        *total_cost += dispatch.cost;
        ctx.renderer
            .write_raw(&format!(" Total cost: ${total_cost:.2}\r\n"));

        match dispatch.decision {
            DispatchDecision::Sleep => {
                ctx.renderer
                    .set_title(&format!("coven: {branch} \u{2014} sleeping"));
                ctx.renderer
                    .write_raw("\r\nDispatch: sleep — waiting for new commits...\r\n");
                let wait =
                    wait_for_new_commits(worktree_path, ctx.renderer, ctx.input, ctx.io, ctx.vcr);
                if matches!(wait.await?, WaitOutcome::Exited) {
                    return Ok(());
                }
            }
            DispatchDecision::RunAgent { agent, args } => {
                // `HashMap` iteration order is unspecified; sort by key so the
                // log line and the terminal title are the same on every run.
                let mut sorted_args: Vec<_> = args.iter().collect();
                sorted_args.sort_unstable();
                let args_display = sorted_args
                    .iter()
                    .map(|(k, v)| format!("{k}={v}"))
                    .collect::<Vec<_>>()
                    .join(" ");
                ctx.renderer
                    .write_raw(&format!("\r\nDispatch: {agent} {args_display}\r\n"));

                let agent_def = dispatch
                    .agent_defs
                    .iter()
                    .find(|a| a.name == agent)
                    .with_context(|| format!("dispatch chose unknown agent: {agent}"))?;
                let agent_prompt = agent_def.render(&args)?;
                ctx.renderer
                    .write_raw(&format!("\r\n=== Agent: {agent} ===\r\n\r\n"));
                let title_suffix = if args_display.is_empty() {
                    agent
                } else {
                    format!("{agent} {args_display}")
                };
                ctx.renderer
                    .set_title(&format!("coven: {branch} \u{2014} {title_suffix}"));

                let should_exit = run_agent(
                    &agent_prompt,
                    worktree_path,
                    &config.extra_args,
                    ctx,
                    total_cost,
                )
                .await?;
                if should_exit {
                    return Ok(());
                }
                // The agent is done: clear the recorded assignment.
                vcr_update_worker_state(ctx.vcr, &wt_str, branch, None, &no_args).await?;
            }
        }
    }
}
/// Runs one agent session and, if it produced commits, lands them.
///
/// After the session the worktree is cleaned, [`ensure_commits`] checks for
/// (or asks for) commits, and [`land_or_resolve`] lands them when present.
///
/// Returns `Ok(true)` when the worker should exit (the user quit at some
/// stage) and `Ok(false)` when the worker loop should continue.
async fn run_agent<W: Write>(
    prompt: &str,
    worktree_path: &Path,
    extra_args: &[String],
    ctx: &mut PhaseContext<'_, W>,
    total_cost: &mut f64,
) -> Result<bool> {
    let PhaseOutcome::Completed {
        cost,
        session_id: agent_session_id,
        ..
    } = run_phase_session(prompt, worktree_path, extra_args, None, ctx).await?
    else {
        return Ok(true);
    };
    *total_cost += cost;
    ctx.renderer
        .write_raw(&format!(" Total cost: ${total_cost:.2}\r\n"));

    warn_clean(worktree_path, ctx.renderer, ctx.vcr).await?;

    match ensure_commits(worktree_path, agent_session_id, extra_args, ctx, total_cost).await? {
        CommitCheck::Exited => Ok(true),
        CommitCheck::NoCommits => {
            ctx.renderer
                .write_raw("Agent produced no commits — skipping land.\r\n");
            Ok(false)
        }
        // The landing step's own verdict decides whether the worker exits.
        CommitCheck::HasCommits { session_id } => {
            land_or_resolve(
                worktree_path,
                session_id.as_deref(),
                extra_args,
                ctx,
                total_cost,
            )
            .await
        }
    }
}
/// Result of checking whether an agent left commits to land.
enum CommitCheck {
/// The branch has unique commits; `session_id` is the session to resume if
/// landing needs conflict resolution.
HasCommits { session_id: Option<String> },
/// Nothing to land, even after asking the agent to commit (or no session to ask).
NoCommits,
/// The user exited during the follow-up session.
Exited,
}
/// Checks for commits unique to the worktree branch, nudging the agent once
/// if there are none.
///
/// When the branch already has unique commits the agent's session ID is passed
/// through. Otherwise, if a session ID exists, that session is resumed with a
/// request to commit, the worktree is cleaned, and the check is repeated.
/// Without a session ID the result is `NoCommits` straight away.
///
/// The follow-up session's cost is added to `total_cost`.
async fn ensure_commits<W: Write>(
    worktree_path: &Path,
    agent_session_id: Option<String>,
    extra_args: &[String],
    ctx: &mut PhaseContext<'_, W>,
    total_cost: &mut f64,
) -> Result<CommitCheck> {
    const COMMIT_REQUEST: &str = "You finished without committing anything. \
        If you have changes worth keeping, please commit them now. \
        If there's nothing to commit, just confirm that.";

    let wt_str = worktree_path.display().to_string();

    let already_committed = vcr_has_unique_commits(ctx.vcr, wt_str.clone()).await??;
    if already_committed {
        return Ok(CommitCheck::HasCommits {
            session_id: agent_session_id,
        });
    }

    let Some(sid) = agent_session_id.as_deref() else {
        ctx.renderer
            .write_raw("Agent produced no commits and no session to resume.\r\n");
        return Ok(CommitCheck::NoCommits);
    };

    ctx.renderer
        .write_raw("Agent produced no commits — resuming session to ask for a commit.\r\n\r\n");
    let follow_up =
        run_phase_session(COMMIT_REQUEST, worktree_path, extra_args, Some(sid), ctx).await?;
    let PhaseOutcome::Completed {
        cost, session_id, ..
    } = follow_up
    else {
        return Ok(CommitCheck::Exited);
    };

    *total_cost += cost;
    ctx.renderer
        .write_raw(&format!(" Total cost: ${total_cost:.2}\r\n"));
    warn_clean(worktree_path, ctx.renderer, ctx.vcr).await?;

    let verdict = if vcr_has_unique_commits(ctx.vcr, wt_str).await?? {
        CommitCheck::HasCommits { session_id }
    } else {
        CommitCheck::NoCommits
    };
    Ok(verdict)
}
/// Lands the worktree branch, resuming the agent session to resolve rebase
/// conflicts when they occur.
///
/// Repeats `worktree::land` until it succeeds or the user exits:
/// - **Rebase conflict**: the session identified by the current resume ID is
///   asked to resolve it, then landing is retried.
/// - **Fast-forward failure**: landing is retried immediately.
/// - **Any other error**: the rebase is aborted (abort result ignored) and the
///   worker pauses until Enter is pressed.
///
/// Conflict rounds and fast-forward retries share one `attempts` counter. Once
/// it exceeds `MAX_ATTEMPTS` the worker pauses for Enter and the counter resets.
///
/// Conflict-resolution session costs are added to `total_cost`.
///
/// Returns `Ok(false)` once landed and `Ok(true)` when the user chose to exit.
///
/// # Errors
///
/// Fails when a conflict occurs but no session ID is available to resume, and
/// propagates VCR, abort, and session errors.
async fn land_or_resolve<W: Write>(
worktree_path: &Path,
session_id: Option<&str>,
extra_args: &[String],
ctx: &mut PhaseContext<'_, W>,
total_cost: &mut f64,
) -> Result<bool> {
const MAX_ATTEMPTS: u32 = 5;
ctx.renderer.write_raw("\r\n=== Landing ===\r\n");
// Session to resume for conflict resolution; replaced by the ID each
// resolution round reports back.
let mut resume_session_id = session_id.map(String::from);
let mut attempts: u32 = 0;
let wt_str = worktree_path.display().to_string();
loop {
// Every arm except `RebaseConflict` either returns or `continue`s, so code
// after this `match` only runs with a list of conflicting files.
let conflict_files = match ctx
.vcr
.call_typed_err("worktree::land", wt_str.clone(), async |p: &String| {
worktree::land(Path::new(p))
})
.await?
{
Ok(result) => {
ctx.renderer.write_raw(&format!(
"Landed {} onto {}\r\n",
result.branch, result.main_branch
));
return Ok(false);
}
Err(worktree::WorktreeError::RebaseConflict(files)) => files,
Err(worktree::WorktreeError::FastForwardFailed) => {
attempts += 1;
if attempts > MAX_ATTEMPTS {
ctx.renderer.write_raw(&format!(
"Fast-forward failed after {MAX_ATTEMPTS} attempts \
— pausing worker. Press Enter to retry.\r\n",
));
if wait_for_enter_or_exit(ctx.io).await? {
return Ok(true);
}
attempts = 0;
continue;
}
ctx.renderer
.write_raw("Main advanced during land — retrying...\r\n");
continue;
}
Err(e) => {
// Best-effort abort: only the VCR-level error propagates; the abort's
// own outcome is deliberately discarded.
let _ = vcr_abort_rebase(ctx.vcr, wt_str.clone()).await?;
ctx.renderer.write_raw(&format!(
"Land failed: {e} — pausing worker. Press Enter to retry.\r\n",
));
if wait_for_enter_or_exit(ctx.io).await? {
return Ok(true);
}
continue;
}
};
attempts += 1;
if attempts > MAX_ATTEMPTS {
// Unlike the generic error arm above, a failed abort here is an error.
vcr_abort_rebase(ctx.vcr, wt_str.clone()).await??;
ctx.renderer.write_raw(&format!(
"Conflict resolution failed after {MAX_ATTEMPTS} attempts \
— pausing worker. Press Enter to retry.\r\n",
));
if wait_for_enter_or_exit(ctx.io).await? {
return Ok(true);
}
attempts = 0;
continue;
}
let files_display = conflict_files.join(", ");
let Some(sid) = resume_session_id.as_deref() else {
// Abort the in-progress rebase before bailing out.
vcr_abort_rebase(ctx.vcr, wt_str.clone()).await??;
bail!(
"Rebase conflict in {files_display} but no session ID available \
— this should be impossible"
);
};
ctx.renderer.write_raw(&format!(
"Rebase conflict in: {files_display} — resuming session to resolve.\r\n"
));
ctx.renderer
.write_raw("\r\n=== Conflict Resolution ===\r\n\r\n");
let prompt = format!(
"The rebase onto main hit conflicts in: {files_display}\n\n\
Resolve the conflicts in those files, stage them with `git add`, \
and run `git rebase --continue`. If more conflicts appear after \
continuing, resolve those too until the rebase completes."
);
match resolve_conflict(&prompt, worktree_path, sid, extra_args, ctx).await? {
// Both non-exit outcomes loop back to another land attempt; they differ
// only in the message shown.
ResolveOutcome::Resolved { session_id, cost } => {
*total_cost += cost;
ctx.renderer
.write_raw("Conflict resolution complete, retrying land...\r\n");
resume_session_id = session_id;
}
ResolveOutcome::Incomplete { session_id, cost } => {
*total_cost += cost;
ctx.renderer.write_raw("Retrying land...\r\n");
resume_session_id = session_id;
}
ResolveOutcome::Exited => return Ok(true),
}
}
}
/// Result of one conflict-resolution round.
enum ResolveOutcome {
/// No rebase is in progress after the round.
Resolved {
/// Session ID reported by the last session that ran.
session_id: Option<String>,
/// Combined cost of the sessions run in this round.
cost: f64,
},
/// The rebase was still in progress after the nudge and has been aborted.
Incomplete {
/// Session ID reported by the nudge session.
session_id: Option<String>,
/// Combined cost of the sessions run in this round.
cost: f64,
},
/// The user exited during a session; the rebase was aborted and the worktree reset.
Exited,
}
/// Runs one conflict-resolution round by resuming session `sid` with `prompt`.
///
/// If a rebase is still in progress after the first session, the session is
/// resumed once more with a nudge to run `git rebase --continue`. If it is
/// still in progress after that, the rebase is aborted and the round is
/// reported as `Incomplete`.
///
/// When the user exits during either session, the rebase is aborted, the
/// worktree is reset to main, and `Exited` is returned.
///
/// An error from the "is a rebase in progress" check is treated as "no rebase
/// in progress".
async fn resolve_conflict<W: Write>(
    prompt: &str,
    worktree_path: &Path,
    sid: &str,
    extra_args: &[String],
    ctx: &mut PhaseContext<'_, W>,
) -> Result<ResolveOutcome> {
    let wt_str = worktree_path.display().to_string();

    let (first_cost, first_session) =
        match run_phase_session(prompt, worktree_path, extra_args, Some(sid), ctx).await? {
            PhaseOutcome::Completed {
                cost, session_id, ..
            } => (cost, session_id),
            PhaseOutcome::Exited => {
                abort_and_reset(worktree_path, ctx.renderer, ctx.vcr).await?;
                return Ok(ResolveOutcome::Exited);
            }
        };
    warn_clean(worktree_path, ctx.renderer, ctx.vcr).await?;

    let rebase_pending = vcr_is_rebase_in_progress(ctx.vcr, wt_str.clone())
        .await?
        .unwrap_or(false);
    if !rebase_pending {
        let same_session = first_session.as_deref() == Some(sid);
        if !same_session {
            ctx.renderer.write_raw(
                "Warning: resolution session returned a different session ID than expected.\r\n",
            );
        }
        return Ok(ResolveOutcome::Resolved {
            session_id: first_session,
            cost: first_cost,
        });
    }

    ctx.renderer
        .write_raw("Rebase still in progress — nudging session to complete it.\r\n\r\n");
    // Prefer the ID the first session reported; fall back to the one we resumed.
    let nudge_sid = first_session.as_deref().unwrap_or(sid);
    let (nudge_cost, nudge_session) = match run_phase_session(
        "The rebase is still in progress — please run `git rebase --continue` to complete it.",
        worktree_path,
        extra_args,
        Some(nudge_sid),
        ctx,
    )
    .await?
    {
        PhaseOutcome::Completed {
            cost, session_id, ..
        } => (cost, session_id),
        PhaseOutcome::Exited => {
            abort_and_reset(worktree_path, ctx.renderer, ctx.vcr).await?;
            return Ok(ResolveOutcome::Exited);
        }
    };
    let round_cost = first_cost + nudge_cost;
    warn_clean(worktree_path, ctx.renderer, ctx.vcr).await?;

    let rebase_pending = vcr_is_rebase_in_progress(ctx.vcr, wt_str.clone())
        .await?
        .unwrap_or(false);
    if !rebase_pending {
        return Ok(ResolveOutcome::Resolved {
            session_id: nudge_session,
            cost: round_cost,
        });
    }

    ctx.renderer
        .write_raw("Rebase still in progress after nudge — aborting this attempt.\r\n");
    vcr_abort_rebase(ctx.vcr, wt_str).await??;
    Ok(ResolveOutcome::Incomplete {
        session_id: nudge_session,
        cost: round_cost,
    })
}
/// Blocks until the user presses Enter or asks to quit.
///
/// Returns `Ok(false)` on Enter and `Ok(true)` on Ctrl-C or Ctrl-D. Every
/// other event is ignored.
async fn wait_for_enter_or_exit(io: &mut Io) -> Result<bool> {
    loop {
        let IoEvent::Terminal(Event::Key(key)) = io.next_event().await? else {
            continue;
        };
        let ctrl_held = key.modifiers.contains(KeyModifiers::CONTROL);
        match key.code {
            KeyCode::Enter => return Ok(false),
            KeyCode::Char('c' | 'd') if ctrl_held => return Ok(true),
            _ => {}
        }
    }
}
/// Abandons in-flight work: aborts any rebase, resets the worktree to main,
/// then cleans it.
///
/// The abort's own outcome is ignored (only a VCR-level failure propagates);
/// a failed reset is an error; a failed clean is reported as a warning by
/// [`warn_clean`].
async fn abort_and_reset<W: Write>(
    worktree_path: &Path,
    renderer: &mut Renderer<W>,
    vcr: &VcrContext,
) -> Result<()> {
    let wt_str = worktree_path.display().to_string();

    // Best effort: the abort result itself is deliberately discarded.
    let _ = vcr
        .call_typed_err(
            "worktree::abort_rebase",
            wt_str.clone(),
            async |dir: &String| worktree::abort_rebase(Path::new(dir)),
        )
        .await?;

    let reset_outcome = vcr
        .call_typed_err("worktree::reset_to_main", wt_str, async |dir: &String| {
            worktree::reset_to_main(Path::new(dir))
        })
        .await?;
    reset_outcome?;

    warn_clean(worktree_path, renderer, vcr).await
}
/// Cleans the worktree, downgrading a clean failure to an on-screen warning.
///
/// # Errors
///
/// Only a VCR-level failure is returned; a `worktree::clean` error is printed
/// and otherwise ignored.
async fn warn_clean<W: Write>(
    worktree_path: &Path,
    renderer: &mut Renderer<W>,
    vcr: &VcrContext,
) -> Result<()> {
    let clean_outcome = vcr
        .call_typed_err(
            "worktree::clean",
            worktree_path.display().to_string(),
            async |dir: &String| worktree::clean(Path::new(dir)),
        )
        .await?;
    if let Err(err) = clean_outcome {
        renderer.write_raw(&format!("Warning: worktree clean failed: {err}\r\n"));
    }
    Ok(())
}
/// Calls `worktree::abort_rebase` through the VCR under the label
/// `worktree::abort_rebase`.
///
/// The outer `Result` is the VCR call itself; the inner one is the abort's
/// own outcome.
async fn vcr_abort_rebase(
    vcr: &VcrContext,
    wt_str: String,
) -> Result<Result<(), worktree::WorktreeError>> {
    vcr.call_typed_err("worktree::abort_rebase", wt_str, async |dir: &String| {
        let worktree_dir = Path::new(dir);
        worktree::abort_rebase(worktree_dir)
    })
    .await
}
/// Calls `worktree::has_unique_commits` through the VCR under the label
/// `worktree::has_unique_commits`.
///
/// The outer `Result` is the VCR call itself; the inner one carries the answer
/// or the worktree error.
async fn vcr_has_unique_commits(
    vcr: &VcrContext,
    wt_str: String,
) -> Result<Result<bool, worktree::WorktreeError>> {
    vcr.call_typed_err(
        "worktree::has_unique_commits",
        wt_str,
        async |dir: &String| {
            let worktree_dir = Path::new(dir);
            worktree::has_unique_commits(worktree_dir)
        },
    )
    .await
}
/// Calls `worktree::is_rebase_in_progress` through the VCR under the label
/// `worktree::is_rebase_in_progress`.
///
/// The outer `Result` is the VCR call itself; the inner one carries the answer
/// or the worktree error.
async fn vcr_is_rebase_in_progress(
    vcr: &VcrContext,
    wt_str: String,
) -> Result<Result<bool, worktree::WorktreeError>> {
    vcr.call_typed_err(
        "worktree::is_rebase_in_progress",
        wt_str,
        async |dir: &String| {
            let worktree_dir = Path::new(dir);
            worktree::is_rebase_in_progress(worktree_dir)
        },
    )
    .await
}
/// Calls [`main_head_sha`] through the VCR under the label `main_head_sha`.
async fn vcr_main_head_sha(vcr: &VcrContext, wt_str: String) -> Result<String> {
    vcr.call("main_head_sha", wt_str, async |dir: &String| {
        let worktree_dir = Path::new(dir);
        main_head_sha(worktree_dir)
    })
    .await
}
/// Records this worker's current assignment by calling `worker_state::update`
/// through the VCR under the label `worker_state::update`.
///
/// The borrowed inputs are copied into an owned [`WorkerUpdateArgs`], which is
/// what the VCR call receives.
async fn vcr_update_worker_state(
    vcr: &VcrContext,
    path: &str,
    branch: &str,
    agent: Option<&str>,
    args: &HashMap<String, String>,
) -> Result<()> {
    let update = WorkerUpdateArgs {
        path: path.to_owned(),
        branch: branch.to_owned(),
        agent: agent.map(str::to_owned),
        args: args.clone(),
    };
    vcr.call(
        "worker_state::update",
        update,
        async |u: &WorkerUpdateArgs| {
            worker_state::update(Path::new(&u.path), &u.branch, u.agent.as_deref(), &u.args)
        },
    )
    .await
}
/// How a phase session ended.
enum PhaseOutcome {
/// The session ran to completion.
Completed {
/// Final result text of the session.
result_text: String,
/// Cost in USD reported in the session state.
cost: f64,
/// Session ID from the session state, if one was reported.
session_id: Option<String>,
},
/// The session ended without a result: the process exited, or the user
/// interrupted and did not continue.
Exited,
}
/// Runs one session to completion, letting the user interrupt and redirect it.
///
/// Spawns a session in `working_dir` with `prompt` (resuming `resume` when
/// given) and drives it until it completes. After every run the session's
/// input is closed and the process is awaited (the wait result is ignored).
///
/// On interrupt the pending events are cleared. If a session ID is known, the
/// user is asked for follow-up input and the session is resumed with it, with
/// a fresh `SessionState`. With no session ID, or no input, the phase is
/// reported as `Exited`.
///
/// When a fork configuration is present, the fork system prompt is appended to
/// every session that is spawned.
async fn run_phase_session<W: Write>(
    prompt: &str,
    working_dir: &Path,
    extra_args: &[String],
    resume: Option<&str>,
    ctx: &mut PhaseContext<'_, W>,
) -> Result<PhaseOutcome> {
    // Copy the `Option<&ForkConfig>` out so the builder below does not borrow `ctx`.
    let fork_config = ctx.fork_config;
    // Shared by the initial launch and by resumes after an interrupt.
    let build_config = |prompt_text: String, resume_id: Option<String>| SessionConfig {
        prompt: Some(prompt_text),
        extra_args: extra_args.to_vec(),
        append_system_prompt: fork_config.map(|_| fork::fork_system_prompt().to_string()),
        resume: resume_id,
        working_dir: Some(working_dir.to_path_buf()),
    };

    let initial_config = build_config(prompt.to_string(), resume.map(String::from));
    let mut session = session_loop::spawn_session(initial_config, ctx.io, ctx.vcr).await?;
    let mut session_state = SessionState::default();

    loop {
        let outcome = session_loop::run_session(
            &mut session,
            &mut session_state,
            ctx.renderer,
            ctx.input,
            ctx.io,
            ctx.vcr,
            ctx.fork_config,
        )
        .await?;
        session.close_input();
        let _ = session.wait().await;

        let interrupted_id = match outcome {
            SessionOutcome::Completed { result_text } => {
                return Ok(PhaseOutcome::Completed {
                    result_text,
                    cost: session_state.total_cost_usd,
                    session_id: session_state.session_id.clone(),
                });
            }
            SessionOutcome::ProcessExited => return Ok(PhaseOutcome::Exited),
            SessionOutcome::Interrupted => {
                ctx.io.clear_event_channel();
                match session_state.session_id.take() {
                    Some(id) => id,
                    // Nothing to resume, so an interrupt ends the phase.
                    None => return Ok(PhaseOutcome::Exited),
                }
            }
        };

        ctx.renderer.render_interrupted();
        let Some(follow_up) =
            session_loop::wait_for_user_input(ctx.input, ctx.renderer, ctx.io, ctx.vcr).await?
        else {
            return Ok(PhaseOutcome::Exited);
        };

        let resume_config = build_config(follow_up, Some(interrupted_id));
        session = session_loop::spawn_session(resume_config, ctx.io, ctx.vcr).await?;
        session_state = SessionState::default();
    }
}
/// Why the sleep wait ended.
enum WaitOutcome {
/// The main branch's HEAD changed while waiting.
NewCommits,
/// The user interrupted or ended the session while waiting.
Exited,
}
async fn wait_for_new_commits<W: Write>(
worktree_path: &Path,
renderer: &mut Renderer<W>,
input: &mut InputHandler,
io: &mut Io,
vcr: &VcrContext,
) -> Result<WaitOutcome> {
let wt_str = worktree_path.display().to_string();
let initial_head = vcr_main_head_sha(vcr, wt_str.clone()).await?;
loop {
tokio::select! {
() = sleep(Duration::from_secs(10)) => {
let current = vcr_main_head_sha(vcr, wt_str.clone()).await?;
if current != initial_head {
renderer.write_raw("New commits detected on main.\r\n");
return Ok(WaitOutcome::NewCommits);
}
}
event = vcr.call("next_event", (), async |(): &()| io.next_event().await) => {
let event = event?;
if let IoEvent::Terminal(Event::Key(key_event)) = event {
let action = input.handle_key(&key_event);
if matches!(action, InputAction::Interrupt | InputAction::EndSession) {
return Ok(WaitOutcome::Exited);
}
}
}
}
}
}
fn main_head_sha(worktree_path: &Path) -> Result<String> {
let output = std::process::Command::new("git")
.arg("-C")
.arg(worktree_path)
.args(["worktree", "list", "--porcelain"])
.output()
.context("failed to run git worktree list")?;
let stdout = String::from_utf8_lossy(&output.stdout);
let main_branch = stdout
.lines()
.find_map(|line| line.strip_prefix("branch refs/heads/"))
.context("could not find main branch in worktree list")?;
let output = std::process::Command::new("git")
.arg("-C")
.arg(worktree_path)
.args(["rev-parse", main_branch])
.output()
.context("failed to run git rev-parse")?;
Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
}