vertigo_cli/watch/
watch_run.rs

1use actix_cors::Cors;
2use actix_web::{App, HttpServer, rt::System, web};
3use notify::RecursiveMode;
4use std::{path::Path, process::exit, sync::Arc, time::Duration};
5use tokio::{
6    sync::{Notify, watch::Sender},
7    time::sleep,
8};
9use tokio_retry::{Retry, strategy::FibonacciBackoff};
10
11use crate::build::{Workspace, get_workspace};
12use crate::commons::{
13    ErrorCode,
14    spawn::{SpawnOwner, term_signal},
15};
16
17use super::{
18    ignore_agent::IgnoreAgents, is_http_server_listening::is_http_server_listening,
19    sse::handler_sse, watch_opts::WatchOpts,
20};
21
22#[derive(Clone, Debug, Default, PartialEq)]
23pub enum Status {
24    #[default]
25    Building,
26    Version(u32),
27    Errors,
28}
29
30pub async fn run(mut opts: WatchOpts) -> Result<(), ErrorCode> {
31    log::info!("watch params => {opts:#?}");
32
33    let ws = match get_workspace() {
34        Ok(ws) => ws,
35        Err(err) => {
36            log::error!("Can't read workspace");
37            return Err(err);
38        }
39    };
40
41    let package_name = match opts.build.package_name.as_deref() {
42        Some(name) => name.to_string(),
43        None => match ws.infer_package_name() {
44            Some(name) => {
45                log::info!("Inferred package name = {name}");
46                opts.build.package_name = Some(name.clone());
47                name
48            }
49            None => {
50                log::error!(
51                    "Can't find vertigo project in {} (no cdylib member)",
52                    ws.get_root_dir()
53                );
54                return Err(ErrorCode::CantFindCdylibMember);
55            }
56        },
57    };
58
59    log::info!("package_name ==> {package_name:?}");
60
61    let root = ws.find_package_path(&package_name);
62    log::info!("path ==> {root:?}");
63
64    let Some(root) = root else {
65        log::error!("package not found ==> {:?}", opts.build.package_name);
66        return Err(ErrorCode::PackageNameNotFound);
67    };
68
69    // If optimization params not provided, default to false
70    if opts.build.release_mode.is_none() {
71        opts.build.release_mode = Some(false);
72    }
73    if opts.build.wasm_opt.is_none() {
74        opts.build.wasm_opt = Some(false);
75    }
76
77    let excludes = [root.join("target"), root.join(opts.common.dest_dir.clone())];
78
79    let notify_build = Arc::new(Notify::new());
80
81    let watch_result = notify::recommended_watcher({
82        let notify_build = notify_build.clone();
83
84        // Generate one Gitignore instance per every watched directory
85        let ignore_agents = IgnoreAgents::new(&ws.get_root_dir().into(), &opts);
86
87        move |res: Result<notify::Event, _>| match res {
88            Ok(event) => {
89                if let notify::EventKind::Access(_) = event.kind {
90                    return;
91                }
92
93                if event.paths.iter().all(|path| {
94                    // Check against hardcoded excludes
95                    for exclude_path in &excludes {
96                        if path.starts_with(exclude_path) {
97                            return true;
98                        }
99                    }
100                    // Check against ignore lists and custom excludes
101                    if ignore_agents.should_be_ignored(path) {
102                        return true;
103                    }
104                    false
105                }) {
106                    return;
107                }
108                notify_build.notify_one();
109            }
110            Err(err) => {
111                log::error!("watch error: {err:?}");
112            }
113        }
114    });
115
116    let mut watcher = match watch_result {
117        Ok(watcher) => watcher,
118        Err(error) => {
119            log::error!("error watcher => {error}");
120            return Err(ErrorCode::WatcherError);
121        }
122    };
123
124    let (tx, rx) = tokio::sync::watch::channel(Status::default());
125    let tx = Arc::new(tx);
126
127    let watch_server = HttpServer::new(move || {
128        App::new()
129            .wrap(Cors::permissive())
130            .service(web::resource("/events").route(web::get().to(handler_sse)))
131            .app_data(web::Data::new(rx.clone()))
132    })
133    .workers(1)
134    .bind("127.0.0.1:5555")
135    .map_err(|err| {
136        log::error!("Watch server bind error: {err}");
137        ErrorCode::WatcherError
138    })?
139    .disable_signals()
140    .client_disconnect_timeout(Duration::from_secs(1))
141    .shutdown_timeout(1)
142    .run();
143
144    let watch_handle = watch_server.handle();
145
146    std::thread::spawn(move || System::new().block_on(watch_server));
147
148    use notify::Watcher;
149    watcher
150        .watch(&root, RecursiveMode::Recursive)
151        .map_err(|err| {
152            log::error!("Can't watch root dir {}: {err}", root.to_string_lossy());
153            ErrorCode::CantAddWatchDir
154        })?;
155
156    for watch_path in &opts.add_watch_path {
157        match watcher.watch(Path::new(watch_path), RecursiveMode::Recursive) {
158            Ok(()) => {
159                log::info!("Added `{watch_path}` to watched directories");
160            }
161            Err(err) => {
162                log::error!("Error adding watch dir `{watch_path}`: {err}");
163                return Err(ErrorCode::CantAddWatchDir);
164            }
165        }
166    }
167    let mut version = 0;
168
169    loop {
170        version += 1;
171
172        if let Err(err) = tx.send(Status::Building) {
173            log::error!("Can't contact the browser: {err} (Other watch process already running?)");
174            return Err(ErrorCode::OtherProcessAlreadyRunning);
175        };
176
177        sleep(Duration::from_millis(200)).await;
178
179        log::info!("Build run...");
180
181        let spawn = build_and_watch(version, tx.clone(), &opts, &ws);
182
183        tokio::select! {
184            msg = term_signal() => {
185                log::info!("{msg} received, shutting down");
186                spawn.off();
187                watch_handle.stop(true).await;
188                return Ok(());
189            }
190            _ = notify_build.notified() => {
191                log::info!("Notify build received, shutting down");
192                spawn.off();
193            }
194        }
195    }
196}
197
198fn build_and_watch(
199    version: u32,
200    tx: Arc<Sender<Status>>,
201    opts: &WatchOpts,
202    ws: &Workspace,
203) -> SpawnOwner {
204    let opts = opts.clone();
205    let ws = ws.clone();
206    SpawnOwner::new(async move {
207        match crate::build::run_with_ws(opts.to_build_opts(), &ws, true) {
208            Ok(()) => {
209                log::info!("Build successful.");
210
211                let check_spawn = SpawnOwner::new(async move {
212                    let _ = Retry::spawn(
213                        FibonacciBackoff::from_millis(100).max_delay(Duration::from_secs(4)),
214                        || is_http_server_listening(opts.serve.port),
215                    )
216                    .await;
217
218                    let Ok(()) = tx.send(Status::Version(version)) else {
219                        exit(ErrorCode::WatchPipeBroken as i32)
220                    };
221                });
222
223                let opts = opts.clone();
224
225                log::info!("Spawning serve command...");
226                let (mut serve_params, port_watch) = opts.to_serve_opts();
227
228                if serve_params.inner.threads.is_none() {
229                    serve_params.inner.threads = Some(2);
230                }
231
232                if let Err(error_code) = crate::serve::run(serve_params, Some(port_watch)).await {
233                    log::error!("Error {} while running server", error_code as i32);
234                    exit(error_code as i32)
235                }
236
237                check_spawn.off();
238            }
239            Err(_) => {
240                log::error!("Build run failed. Waiting for changes...");
241
242                let Ok(()) = tx.send(Status::Errors) else {
243                    exit(ErrorCode::WatchPipeBroken as i32)
244                };
245            }
246        };
247    })
248}