1use std::sync::Arc;
2
3use anyhow::{Context, Result};
4use tokio::sync::Mutex;
5use tokio_util::sync::CancellationToken;
6
7use super::{GlobalOpts, OnArgs};
8use crate::{
9 config::{Config, IssueSource},
10 github::GhClient,
11 issues::{IssueProvider, github::GithubIssueProvider, local::LocalIssueProvider},
12 pipeline::{executor::PipelineExecutor, runner},
13 process::RealCommandRunner,
14};
15
16pub async fn run(args: OnArgs, global: &GlobalOpts) -> Result<()> {
17 let project_dir = std::env::current_dir().context("getting current directory")?;
18 let config = Config::load(&project_dir)?;
19
20 let run_id = args.run_id.clone().unwrap_or_else(crate::pipeline::executor::generate_run_id);
21
22 if args.detached {
24 return spawn_detached(&project_dir, &args, &run_id);
25 }
26
27 let log_dir = project_dir.join(".oven").join("logs").join(&run_id);
29 std::fs::create_dir_all(&log_dir)
30 .with_context(|| format!("creating log dir: {}", log_dir.display()))?;
31 let _guard = crate::logging::init_with_file(&log_dir, global.verbose);
32
33 println!("{run_id}");
34
35 let cancel_token = CancellationToken::new();
36 let cancel_for_signal = cancel_token.clone();
37
38 tokio::spawn(async move {
40 if tokio::signal::ctrl_c().await.is_ok() {
41 tracing::info!("received ctrl-c, shutting down");
42 cancel_for_signal.cancel();
43 }
44 });
45
46 let runner = Arc::new(RealCommandRunner);
47 let github = Arc::new(GhClient::new(RealCommandRunner, &project_dir));
48 let db_path = project_dir.join(".oven").join("oven.db");
49 let conn = crate::db::open(&db_path)?;
50 let db = Arc::new(Mutex::new(conn));
51
52 let issues: Arc<dyn IssueProvider> = match config.project.issue_source {
54 IssueSource::Github => {
55 Arc::new(GithubIssueProvider::new(Arc::clone(&github), &config.multi_repo.target_field))
56 }
57 IssueSource::Local => Arc::new(LocalIssueProvider::new(&project_dir)),
58 };
59
60 let executor = Arc::new(PipelineExecutor {
61 runner,
62 github,
63 issues: Arc::clone(&issues),
64 db,
65 config: config.clone(),
66 cancel_token: cancel_token.clone(),
67 repo_dir: project_dir,
68 });
69
70 if let Some(ids_str) = &args.ids {
71 let ids = parse_issue_ids(ids_str)?;
73 let mut fetched = Vec::new();
74 for id in ids {
75 let issue = issues.get_issue(id).await?;
76 fetched.push(issue);
77 }
78
79 if !args.trust && config.project.issue_source == IssueSource::Github {
81 let current_user = executor.github.get_current_user().await?;
82 validate_issue_authors(&fetched, ¤t_user)?;
83 }
84
85 runner::run_batch(&executor, fetched, config.pipeline.max_parallel as usize, args.merge)
86 .await?;
87 } else {
88 runner::polling_loop(executor, args.merge, cancel_token).await?;
90 }
91
92 Ok(())
93}
94
95fn validate_issue_authors(
96 issues: &[crate::issues::PipelineIssue],
97 current_user: &str,
98) -> Result<()> {
99 let mut mismatches = Vec::new();
100 for issue in issues {
101 match &issue.author {
102 Some(author) if author != current_user => {
103 mismatches.push((issue.number, author.as_str()));
104 }
105 None => {
106 mismatches.push((issue.number, "<unknown>"));
107 }
108 _ => {}
109 }
110 }
111
112 if !mismatches.is_empty() {
113 let details: Vec<String> = mismatches
114 .iter()
115 .map(|(num, author)| {
116 format!("issue #{num} was created by \"{author}\", not \"{current_user}\"")
117 })
118 .collect();
119 anyhow::bail!("{}. Use --trust to override.", details.join("; "));
120 }
121
122 Ok(())
123}
124
125fn parse_issue_ids(ids: &str) -> Result<Vec<u32>> {
126 ids.split(',')
127 .map(|s| s.trim().parse::<u32>().with_context(|| format!("invalid issue number: {s}")))
128 .collect()
129}
130
131fn spawn_detached(project_dir: &std::path::Path, args: &OnArgs, run_id: &str) -> Result<()> {
132 use std::io::Write;
133 let exe = std::env::current_exe().context("finding current executable")?;
134
135 let mut cmd_args = vec!["on".to_string()];
136 if let Some(ref ids) = args.ids {
137 cmd_args.push(ids.clone());
138 }
139 if args.merge {
140 cmd_args.push("-m".to_string());
141 }
142 if args.trust {
143 cmd_args.push("--trust".to_string());
144 }
145 cmd_args.extend(["--run-id".to_string(), run_id.to_string()]);
146
147 let log_dir = project_dir.join(".oven").join("logs");
148 std::fs::create_dir_all(&log_dir).context("creating log dir for detached")?;
149
150 let stdout = std::fs::File::create(log_dir.join("detached.stdout"))
151 .context("creating detached stdout")?;
152 let stderr = std::fs::File::create(log_dir.join("detached.stderr"))
153 .context("creating detached stderr")?;
154
155 let child = std::process::Command::new(exe)
156 .args(&cmd_args)
157 .stdout(stdout)
158 .stderr(stderr)
159 .spawn()
160 .context("spawning detached process")?;
161
162 let pid_path = project_dir.join(".oven").join("oven.pid");
163 match std::fs::File::create_new(&pid_path) {
164 Ok(mut f) => {
165 write!(f, "{}", child.id()).context("writing PID file")?;
166 }
167 Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
168 if let Ok(existing) = std::fs::read_to_string(&pid_path) {
170 if let Ok(existing_pid) = existing.trim().parse::<u32>() {
171 let alive = std::process::Command::new("kill")
172 .args(["-0", &existing_pid.to_string()])
173 .status()
174 .is_ok_and(|s| s.success());
175 if alive {
176 anyhow::bail!(
177 "oven is already running (pid {existing_pid}). Use 'oven off' to stop it."
178 );
179 }
180 }
181 }
182 std::fs::remove_file(&pid_path).ok();
184 let mut f = std::fs::File::create_new(&pid_path).context("writing PID file")?;
185 write!(f, "{}", child.id()).context("writing PID to file")?;
186 }
187 Err(e) => return Err(e).context("creating PID file"),
188 }
189
190 println!("{run_id}");
191 Ok(())
192}
193
194#[cfg(test)]
195mod tests {
196 use super::*;
197
198 #[test]
199 fn parse_single_id() {
200 let ids = parse_issue_ids("42").unwrap();
201 assert_eq!(ids, vec![42]);
202 }
203
204 #[test]
205 fn parse_multiple_ids() {
206 let ids = parse_issue_ids("1,2,3").unwrap();
207 assert_eq!(ids, vec![1, 2, 3]);
208 }
209
210 #[test]
211 fn parse_ids_with_spaces() {
212 let ids = parse_issue_ids("1, 2, 3").unwrap();
213 assert_eq!(ids, vec![1, 2, 3]);
214 }
215
216 #[test]
217 fn parse_invalid_id_fails() {
218 let result = parse_issue_ids("1,abc,3");
219 assert!(result.is_err());
220 }
221
222 fn make_pipeline_issue(number: u32, author: Option<&str>) -> crate::issues::PipelineIssue {
223 crate::issues::PipelineIssue {
224 number,
225 title: format!("Issue #{number}"),
226 body: String::new(),
227 source: crate::issues::IssueOrigin::Github,
228 target_repo: None,
229 author: author.map(String::from),
230 }
231 }
232
233 #[test]
234 fn validate_authors_passes_when_all_match() {
235 let issues =
236 vec![make_pipeline_issue(1, Some("alice")), make_pipeline_issue(2, Some("alice"))];
237 assert!(validate_issue_authors(&issues, "alice").is_ok());
238 }
239
240 #[test]
241 fn validate_authors_fails_on_mismatch() {
242 let issues =
243 vec![make_pipeline_issue(1, Some("alice")), make_pipeline_issue(2, Some("bob"))];
244 let err = validate_issue_authors(&issues, "alice").unwrap_err();
245 let msg = err.to_string();
246 assert!(msg.contains("issue #2"));
247 assert!(msg.contains("bob"));
248 assert!(msg.contains("alice"));
249 assert!(msg.contains("--trust"));
250 }
251
252 #[test]
253 fn validate_authors_fails_fast_on_multiple_mismatches() {
254 let issues =
255 vec![make_pipeline_issue(1, Some("eve")), make_pipeline_issue(2, Some("mallory"))];
256 let err = validate_issue_authors(&issues, "alice").unwrap_err();
257 let msg = err.to_string();
258 assert!(msg.contains("issue #1"));
259 assert!(msg.contains("issue #2"));
260 }
261
262 #[test]
263 fn validate_authors_rejects_none_author() {
264 let issues = vec![make_pipeline_issue(1, None)];
265 let err = validate_issue_authors(&issues, "alice").unwrap_err();
266 let msg = err.to_string();
267 assert!(msg.contains("issue #1"));
268 assert!(msg.contains("<unknown>"));
269 assert!(msg.contains("--trust"));
270 }
271
272 #[test]
273 fn validate_authors_empty_issues() {
274 assert!(validate_issue_authors(&[], "alice").is_ok());
275 }
276}