1use std::sync::Arc;
2
3use anyhow::{Context, Result};
4use tokio::sync::Mutex;
5use tokio_util::sync::CancellationToken;
6
7use super::{GlobalOpts, OnArgs};
8use crate::{
9 config::{Config, IssueSource},
10 github::GhClient,
11 issues::{IssueProvider, github::GithubIssueProvider, local::LocalIssueProvider},
12 pipeline::{executor::PipelineExecutor, runner},
13 process::RealCommandRunner,
14};
15
16pub async fn run(args: OnArgs, global: &GlobalOpts) -> Result<()> {
17 let project_dir = std::env::current_dir().context("getting current directory")?;
18 let config = Config::load(&project_dir)?;
19
20 let run_id = args.run_id.clone().unwrap_or_else(crate::pipeline::executor::generate_run_id);
21
22 if args.detached {
24 return spawn_detached(&project_dir, &args, &run_id);
25 }
26
27 let log_dir = project_dir.join(".oven").join("logs").join(&run_id);
29 std::fs::create_dir_all(&log_dir)
30 .with_context(|| format!("creating log dir: {}", log_dir.display()))?;
31 let _guard = crate::logging::init_with_file(&log_dir, global.verbose);
32
33 println!("{run_id}");
34
35 let cancel_token = CancellationToken::new();
36 let cancel_for_signal = cancel_token.clone();
37
38 tokio::spawn(async move {
40 if tokio::signal::ctrl_c().await.is_ok() {
41 tracing::info!("received ctrl-c, shutting down");
42 cancel_for_signal.cancel();
43 }
44 });
45
46 let runner = Arc::new(RealCommandRunner);
47 let github = Arc::new(GhClient::new(RealCommandRunner, &project_dir));
48 let db_path = project_dir.join(".oven").join("oven.db");
49 let conn = crate::db::open(&db_path)?;
50 let db = Arc::new(Mutex::new(conn));
51
52 let issues: Arc<dyn IssueProvider> = match config.project.issue_source {
54 IssueSource::Github => {
55 Arc::new(GithubIssueProvider::new(Arc::clone(&github), &config.multi_repo.target_field))
56 }
57 IssueSource::Local => Arc::new(LocalIssueProvider::new(&project_dir)),
58 };
59
60 let executor = Arc::new(PipelineExecutor {
61 runner,
62 github,
63 issues: Arc::clone(&issues),
64 db,
65 config: config.clone(),
66 cancel_token: cancel_token.clone(),
67 repo_dir: project_dir,
68 });
69
70 if let Some(ids_str) = &args.ids {
71 let ids = parse_issue_ids(ids_str)?;
73 let mut fetched = Vec::new();
74 for id in ids {
75 let issue = issues.get_issue(id).await?;
76 fetched.push(issue);
77 }
78
79 runner::run_batch(&executor, fetched, config.pipeline.max_parallel as usize, args.merge)
80 .await?;
81 } else {
82 runner::polling_loop(executor, args.merge, cancel_token).await?;
84 }
85
86 Ok(())
87}
88
89fn parse_issue_ids(ids: &str) -> Result<Vec<u32>> {
90 ids.split(',')
91 .map(|s| s.trim().parse::<u32>().with_context(|| format!("invalid issue number: {s}")))
92 .collect()
93}
94
95fn spawn_detached(project_dir: &std::path::Path, args: &OnArgs, run_id: &str) -> Result<()> {
96 let exe = std::env::current_exe().context("finding current executable")?;
97
98 let mut cmd_args = vec!["on".to_string()];
99 if let Some(ref ids) = args.ids {
100 cmd_args.push(ids.clone());
101 }
102 if args.merge {
103 cmd_args.push("-m".to_string());
104 }
105 cmd_args.extend(["--run-id".to_string(), run_id.to_string()]);
106
107 let log_dir = project_dir.join(".oven").join("logs");
108 std::fs::create_dir_all(&log_dir).context("creating log dir for detached")?;
109
110 let stdout = std::fs::File::create(log_dir.join("detached.stdout"))
111 .context("creating detached stdout")?;
112 let stderr = std::fs::File::create(log_dir.join("detached.stderr"))
113 .context("creating detached stderr")?;
114
115 let child = std::process::Command::new(exe)
116 .args(&cmd_args)
117 .stdout(stdout)
118 .stderr(stderr)
119 .spawn()
120 .context("spawning detached process")?;
121
122 let pid_path = project_dir.join(".oven").join("oven.pid");
123 std::fs::write(&pid_path, child.id().to_string()).context("writing PID file")?;
124
125 println!("{run_id}");
126 Ok(())
127}
128
#[cfg(test)]
mod tests {
    use super::*;

    /// A lone number parses to a one-element list.
    #[test]
    fn parse_single_id() {
        assert_eq!(parse_issue_ids("42").unwrap(), vec![42]);
    }

    /// Comma-separated numbers are returned in input order.
    #[test]
    fn parse_multiple_ids() {
        assert_eq!(parse_issue_ids("1,2,3").unwrap(), vec![1, 2, 3]);
    }

    /// Whitespace around individual entries is ignored.
    #[test]
    fn parse_ids_with_spaces() {
        assert_eq!(parse_issue_ids("1, 2, 3").unwrap(), vec![1, 2, 3]);
    }

    /// A single non-numeric entry makes the whole parse fail.
    #[test]
    fn parse_invalid_id_fails() {
        assert!(parse_issue_ids("1,abc,3").is_err());
    }
}