1use std::sync::Arc;
2
3use anyhow::{Context, Result};
4use tokio::sync::Mutex;
5use tokio_util::sync::CancellationToken;
6
7use super::{GlobalOpts, OnArgs};
8use crate::{
9 config::{Config, IssueSource},
10 github::GhClient,
11 issues::{IssueProvider, github::GithubIssueProvider, local::LocalIssueProvider},
12 pipeline::{executor::PipelineExecutor, runner},
13 process::RealCommandRunner,
14};
15
16pub async fn run(args: OnArgs, global: &GlobalOpts) -> Result<()> {
17 let project_dir = std::env::current_dir().context("getting current directory")?;
18 let config = Config::load(&project_dir)?;
19
20 let run_id = args.run_id.clone().unwrap_or_else(crate::pipeline::executor::generate_run_id);
21
22 if args.detached {
24 return spawn_detached(&project_dir, &args, &run_id);
25 }
26
27 let log_dir = project_dir.join(".oven").join("logs").join(&run_id);
29 std::fs::create_dir_all(&log_dir)
30 .with_context(|| format!("creating log dir: {}", log_dir.display()))?;
31 let _guard = crate::logging::init_with_file(&log_dir, global.verbose);
32
33 println!("{run_id}");
34
35 let cancel_token = CancellationToken::new();
36 let cancel_for_signal = cancel_token.clone();
37
38 tokio::spawn(async move {
40 if tokio::signal::ctrl_c().await.is_ok() {
41 tracing::info!("received ctrl-c, shutting down");
42 cancel_for_signal.cancel();
43 }
44 });
45
46 let runner = Arc::new(RealCommandRunner);
47 let github = Arc::new(GhClient::new(RealCommandRunner, &project_dir));
48 let db_path = project_dir.join(".oven").join("oven.db");
49 let conn = crate::db::open(&db_path)?;
50 let db = Arc::new(Mutex::new(conn));
51
52 let issues: Arc<dyn IssueProvider> = match config.project.issue_source {
54 IssueSource::Github => {
55 Arc::new(GithubIssueProvider::new(Arc::clone(&github), &config.multi_repo.target_field))
56 }
57 IssueSource::Local => Arc::new(LocalIssueProvider::new(&project_dir)),
58 };
59
60 let executor = Arc::new(PipelineExecutor {
61 runner,
62 github,
63 issues: Arc::clone(&issues),
64 db,
65 config: config.clone(),
66 cancel_token: cancel_token.clone(),
67 repo_dir: project_dir,
68 });
69
70 if let Some(ids_str) = &args.ids {
71 let ids = parse_issue_ids(ids_str)?;
73 let mut fetched = Vec::new();
74 for id in ids {
75 let issue = issues.get_issue(id).await?;
76 fetched.push(issue);
77 }
78
79 runner::run_batch(&executor, fetched, config.pipeline.max_parallel as usize, args.merge)
80 .await?;
81 } else {
82 runner::polling_loop(executor, args.merge, cancel_token).await?;
84 }
85
86 Ok(())
87}
88
89fn parse_issue_ids(ids: &str) -> Result<Vec<u32>> {
90 ids.split(',')
91 .map(|s| s.trim().parse::<u32>().with_context(|| format!("invalid issue number: {s}")))
92 .collect()
93}
94
95fn spawn_detached(project_dir: &std::path::Path, args: &OnArgs, run_id: &str) -> Result<()> {
96 use std::io::Write;
97 let exe = std::env::current_exe().context("finding current executable")?;
98
99 let mut cmd_args = vec!["on".to_string()];
100 if let Some(ref ids) = args.ids {
101 cmd_args.push(ids.clone());
102 }
103 if args.merge {
104 cmd_args.push("-m".to_string());
105 }
106 cmd_args.extend(["--run-id".to_string(), run_id.to_string()]);
107
108 let log_dir = project_dir.join(".oven").join("logs");
109 std::fs::create_dir_all(&log_dir).context("creating log dir for detached")?;
110
111 let stdout = std::fs::File::create(log_dir.join("detached.stdout"))
112 .context("creating detached stdout")?;
113 let stderr = std::fs::File::create(log_dir.join("detached.stderr"))
114 .context("creating detached stderr")?;
115
116 let child = std::process::Command::new(exe)
117 .args(&cmd_args)
118 .stdout(stdout)
119 .stderr(stderr)
120 .spawn()
121 .context("spawning detached process")?;
122
123 let pid_path = project_dir.join(".oven").join("oven.pid");
124 match std::fs::File::create_new(&pid_path) {
125 Ok(mut f) => {
126 write!(f, "{}", child.id()).context("writing PID file")?;
127 }
128 Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
129 if let Ok(existing) = std::fs::read_to_string(&pid_path) {
131 if let Ok(existing_pid) = existing.trim().parse::<u32>() {
132 let alive = std::process::Command::new("kill")
133 .args(["-0", &existing_pid.to_string()])
134 .status()
135 .is_ok_and(|s| s.success());
136 if alive {
137 anyhow::bail!(
138 "oven is already running (pid {existing_pid}). Use 'oven off' to stop it."
139 );
140 }
141 }
142 }
143 std::fs::remove_file(&pid_path).ok();
145 let mut f = std::fs::File::create_new(&pid_path).context("writing PID file")?;
146 write!(f, "{}", child.id()).context("writing PID to file")?;
147 }
148 Err(e) => return Err(e).context("creating PID file"),
149 }
150
151 println!("{run_id}");
152 Ok(())
153}
154
#[cfg(test)]
mod tests {
    use super::*;

    /// A lone number yields a one-element list.
    #[test]
    fn parse_single_id() {
        assert_eq!(parse_issue_ids("42").unwrap(), [42]);
    }

    /// Comma-separated numbers are returned in input order.
    #[test]
    fn parse_multiple_ids() {
        assert_eq!(parse_issue_ids("1,2,3").unwrap(), [1, 2, 3]);
    }

    /// Whitespace around entries is ignored.
    #[test]
    fn parse_ids_with_spaces() {
        assert_eq!(parse_issue_ids("1, 2, 3").unwrap(), [1, 2, 3]);
    }

    /// A single non-numeric entry makes the whole parse fail.
    #[test]
    fn parse_invalid_id_fails() {
        assert!(parse_issue_ids("1,abc,3").is_err());
    }
}