Skip to main content

agent_procs/cli/
up.rs

1use crate::cli;
2use crate::config::{load_config, resolve_session};
3use crate::protocol::{Request, Response, RestartPolicy, WatchConfig};
4use futures::future::join_all;
5
6pub async fn execute(
7    cli_session: Option<&str>,
8    only: Option<&str>,
9    config_path: Option<&str>,
10    proxy: bool,
11) -> i32 {
12    let (path, config) = match load_config(config_path) {
13        Ok(c) => c,
14        Err(e) => {
15            eprintln!("error: {}", e);
16            return 1;
17        }
18    };
19
20    let session = resolve_session(cli_session, config.session.as_deref());
21
22    let only_set: Option<Vec<&str>> = only.map(|s| s.split(',').collect());
23
24    let groups = match config.startup_order() {
25        Ok(g) => g,
26        Err(e) => {
27            eprintln!("error: {}", e);
28            return 1;
29        }
30    };
31
32    let enable_proxy = proxy || config.proxy.unwrap_or(false);
33
34    if enable_proxy && let Some(code) = cli::enable_proxy(session, config.proxy_port).await {
35        return code;
36    }
37
38    for group in &groups {
39        let names: Vec<&String> = group
40            .iter()
41            .filter(|name| {
42                only_set
43                    .as_ref()
44                    .is_none_or(|only| only.contains(&name.as_str()))
45            })
46            .collect();
47
48        // Start all processes in this group concurrently
49        let start_futures: Vec<_> = names
50            .iter()
51            .map(|name| {
52                let def = &config.processes[*name];
53
54                let resolved_cwd = def.cwd.as_ref().map(|c| {
55                    let p = std::path::Path::new(c);
56                    if p.is_relative() {
57                        path.parent()
58                            .unwrap_or(std::path::Path::new("."))
59                            .join(p)
60                            .to_string_lossy()
61                            .to_string()
62                    } else {
63                        c.clone()
64                    }
65                });
66
67                let env = if def.env.is_empty() {
68                    None
69                } else {
70                    Some(def.env.clone())
71                };
72
73                let restart = def
74                    .autorestart
75                    .as_ref()
76                    .map(|m| RestartPolicy::from_args(m, def.max_restarts, def.restart_delay));
77
78                let watch = def.watch.as_ref().and_then(|paths| {
79                    WatchConfig::from_args(
80                        paths.clone(),
81                        def.watch_ignore.clone().unwrap_or_default(),
82                    )
83                });
84
85                let req = Request::Run {
86                    command: def.cmd.clone(),
87                    name: Some((*name).clone()),
88                    cwd: resolved_cwd,
89                    env,
90                    port: def.port,
91                    restart,
92                    watch,
93                };
94                let name = (*name).clone();
95                async move {
96                    let result = cli::request(session, &req, true).await;
97                    (name, result)
98                }
99            })
100            .collect();
101
102        let results = join_all(start_futures).await;
103
104        for (name, result) in &results {
105            match result {
106                Ok(Response::RunOk {
107                    name, id, pid, url, ..
108                }) => match url {
109                    Some(u) => println!("started {} (id: {}, pid: {}, {})", name, id, pid, u),
110                    None => println!("started {} (id: {}, pid: {})", name, id, pid),
111                },
112                Ok(Response::Error { code, message }) => {
113                    eprintln!("error starting {}: {}", name, message);
114                    return code.exit_code();
115                }
116                _ => return 1,
117            }
118        }
119
120        // Wait for ready patterns concurrently within the group
121        let ready_futures: Vec<_> = names
122            .iter()
123            .filter_map(|name| {
124                let def = &config.processes[*name];
125                def.ready.as_ref().map(|ready| {
126                    let req = Request::Wait {
127                        target: (*name).clone(),
128                        until: Some(ready.clone()),
129                        regex: false,
130                        exit: false,
131                        timeout_secs: Some(30),
132                    };
133                    let name = (*name).clone();
134                    async move {
135                        let result = cli::request(session, &req, false).await;
136                        (name, result)
137                    }
138                })
139            })
140            .collect();
141
142        let ready_results = join_all(ready_futures).await;
143
144        for (name, result) in &ready_results {
145            match result {
146                Ok(Response::WaitMatch { .. }) => println!("{} is ready", name),
147                Ok(Response::WaitTimeout) => {
148                    eprintln!("warning: {} did not become ready within 30s", name);
149                }
150                Ok(Response::Error { message, .. }) => {
151                    eprintln!("error waiting for {}: {}", name, message);
152                    return 1;
153                }
154                _ => {}
155            }
156        }
157    }
158
159    println!("all processes started");
160    0
161}