Skip to main content

oven_cli/cli/
on.rs

1use std::sync::Arc;
2
3use anyhow::{Context, Result};
4use tokio::sync::Mutex;
5use tokio_util::sync::CancellationToken;
6
7use super::{GlobalOpts, OnArgs};
8use crate::{
9    config::{Config, IssueSource},
10    github::GhClient,
11    issues::{IssueProvider, github::GithubIssueProvider, local::LocalIssueProvider},
12    pipeline::{executor::PipelineExecutor, runner},
13    process::RealCommandRunner,
14};
15
16pub async fn run(args: OnArgs, global: &GlobalOpts) -> Result<()> {
17    let project_dir = std::env::current_dir().context("getting current directory")?;
18    let config = Config::load(&project_dir)?;
19
20    let run_id = args.run_id.clone().unwrap_or_else(crate::pipeline::executor::generate_run_id);
21
22    // Detached mode: re-spawn self without -d flag
23    if args.detached {
24        return spawn_detached(&project_dir, &args, &run_id);
25    }
26
27    // Set up logging
28    let log_dir = project_dir.join(".oven").join("logs").join(&run_id);
29    std::fs::create_dir_all(&log_dir)
30        .with_context(|| format!("creating log dir: {}", log_dir.display()))?;
31    let _guard = crate::logging::init_with_file(&log_dir, global.verbose);
32
33    println!("{run_id}");
34
35    let cancel_token = CancellationToken::new();
36    let cancel_for_signal = cancel_token.clone();
37
38    // Signal handler
39    tokio::spawn(async move {
40        if tokio::signal::ctrl_c().await.is_ok() {
41            tracing::info!("received ctrl-c, shutting down");
42            cancel_for_signal.cancel();
43        }
44    });
45
46    let runner = Arc::new(RealCommandRunner);
47    let github = Arc::new(GhClient::new(RealCommandRunner, &project_dir));
48    let db_path = project_dir.join(".oven").join("oven.db");
49    let conn = crate::db::open(&db_path)?;
50    let db = Arc::new(Mutex::new(conn));
51
52    // Build the issue provider based on config
53    let issues: Arc<dyn IssueProvider> = match config.project.issue_source {
54        IssueSource::Github => {
55            Arc::new(GithubIssueProvider::new(Arc::clone(&github), &config.multi_repo.target_field))
56        }
57        IssueSource::Local => Arc::new(LocalIssueProvider::new(&project_dir)),
58    };
59
60    let executor = Arc::new(PipelineExecutor {
61        runner,
62        github,
63        issues: Arc::clone(&issues),
64        db,
65        config: config.clone(),
66        cancel_token: cancel_token.clone(),
67        repo_dir: project_dir,
68    });
69
70    if let Some(ids_str) = &args.ids {
71        // Run specific issues
72        let ids = parse_issue_ids(ids_str)?;
73        let mut fetched = Vec::new();
74        for id in ids {
75            let issue = issues.get_issue(id).await?;
76            fetched.push(issue);
77        }
78
79        runner::run_batch(&executor, fetched, config.pipeline.max_parallel as usize, args.merge)
80            .await?;
81    } else {
82        // Polling mode
83        runner::polling_loop(executor, args.merge, cancel_token).await?;
84    }
85
86    Ok(())
87}
88
89fn parse_issue_ids(ids: &str) -> Result<Vec<u32>> {
90    ids.split(',')
91        .map(|s| s.trim().parse::<u32>().with_context(|| format!("invalid issue number: {s}")))
92        .collect()
93}
94
95fn spawn_detached(project_dir: &std::path::Path, args: &OnArgs, run_id: &str) -> Result<()> {
96    let exe = std::env::current_exe().context("finding current executable")?;
97
98    let mut cmd_args = vec!["on".to_string()];
99    if let Some(ref ids) = args.ids {
100        cmd_args.push(ids.clone());
101    }
102    if args.merge {
103        cmd_args.push("-m".to_string());
104    }
105    cmd_args.extend(["--run-id".to_string(), run_id.to_string()]);
106
107    let log_dir = project_dir.join(".oven").join("logs");
108    std::fs::create_dir_all(&log_dir).context("creating log dir for detached")?;
109
110    let stdout = std::fs::File::create(log_dir.join("detached.stdout"))
111        .context("creating detached stdout")?;
112    let stderr = std::fs::File::create(log_dir.join("detached.stderr"))
113        .context("creating detached stderr")?;
114
115    let child = std::process::Command::new(exe)
116        .args(&cmd_args)
117        .stdout(stdout)
118        .stderr(stderr)
119        .spawn()
120        .context("spawning detached process")?;
121
122    let pid_path = project_dir.join(".oven").join("oven.pid");
123    std::fs::write(&pid_path, child.id().to_string()).context("writing PID file")?;
124
125    println!("{run_id}");
126    Ok(())
127}
128
#[cfg(test)]
mod tests {
    use super::*;

    // A lone number yields a one-element list.
    #[test]
    fn parse_single_id() {
        let expected: Vec<u32> = vec![42];
        assert_eq!(parse_issue_ids("42").unwrap(), expected);
    }

    // Comma-separated numbers keep their input order.
    #[test]
    fn parse_multiple_ids() {
        let expected: Vec<u32> = vec![1, 2, 3];
        assert_eq!(parse_issue_ids("1,2,3").unwrap(), expected);
    }

    // Whitespace around each number is ignored.
    #[test]
    fn parse_ids_with_spaces() {
        let expected: Vec<u32> = vec![1, 2, 3];
        assert_eq!(parse_issue_ids("1, 2, 3").unwrap(), expected);
    }

    // A single non-numeric segment makes the whole parse fail.
    #[test]
    fn parse_invalid_id_fails() {
        assert!(parse_issue_ids("1,abc,3").is_err());
    }
}
156}