vertigo_cli/watch/
watch_run.rs

1use notify::RecursiveMode;
2use poem::http::Method;
3use poem::middleware::Cors;
4use poem::{get, listener::TcpListener, EndpointExt, Route, Server};
5use std::path::Path;
6use std::process::exit;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::watch::Sender;
10use tokio::sync::Notify;
11use tokio::time::sleep;
12use tokio_retry::{strategy::FibonacciBackoff, Retry};
13
14use crate::build::{get_workspace, Workspace};
15use crate::commons::{spawn::SpawnOwner, ErrorCode};
16use crate::watch::ignore_agent::IgnoreAgents;
17use crate::watch::sse::handler_sse;
18
19use super::is_http_server_listening::is_http_server_listening;
20use super::watch_opts::WatchOpts;
21
/// Build status published over the `tokio::sync::watch` channel created in [`run`].
///
/// The receiving half of that channel is attached as shared data to the `/events` route.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum Status {
    /// A build has been started; sent at the top of every watch-loop iteration.
    #[default]
    Building,
    /// A build succeeded; carries the watch-loop counter of that build.
    Version(u32),
    /// The build failed.
    Errors,
}
29
30pub async fn run(mut opts: WatchOpts) -> Result<(), ErrorCode> {
31    log::info!("watch params => {opts:#?}");
32
33    let ws = match get_workspace() {
34        Ok(ws) => ws,
35        Err(err) => {
36            log::error!("Can't read workspace");
37            return Err(err);
38        }
39    };
40
41    let package_name = match opts.build.package_name.as_deref() {
42        Some(name) => name.to_string(),
43        None => match ws.infer_package_name() {
44            Some(name) => {
45                log::info!("Inferred package name = {name}");
46                opts.build.package_name = Some(name.clone());
47                name
48            }
49            None => {
50                log::error!(
51                    "Can't find vertigo project in {} (no cdylib member)",
52                    ws.get_root_dir()
53                );
54                return Err(ErrorCode::CantFindCdylibMember);
55            }
56        },
57    };
58
59    log::info!("package_name ==> {package_name:?}");
60
61    let root = ws.find_package_path(&package_name);
62    log::info!("path ==> {root:?}");
63
64    let Some(root) = root else {
65        log::error!("package not found ==> {:?}", opts.build.package_name);
66        return Err(ErrorCode::PackageNameNotFound);
67    };
68
69    // If optimization params not provided, default to false
70    if opts.build.release_mode.is_none() {
71        opts.build.release_mode = Some(false);
72    }
73    if opts.build.wasm_opt.is_none() {
74        opts.build.wasm_opt = Some(false);
75    }
76
77    let excludes = [root.join("target"), root.join(opts.common.dest_dir.clone())];
78
79    let notify_build = Arc::new(Notify::new());
80
81    let watch_result = notify::recommended_watcher({
82        let notify_build = notify_build.clone();
83
84        // Generate one Gitignore instance per every watched directory
85        let ignore_agents = IgnoreAgents::new(&ws.get_root_dir().into(), &opts);
86
87        move |res: Result<notify::Event, _>| match res {
88            Ok(event) => {
89                if event.paths.iter().all(|path| {
90                    // Check against hardcoded excludes
91                    for exclude_path in &excludes {
92                        if path.starts_with(exclude_path) {
93                            return true;
94                        }
95                    }
96                    // Check against ignore lists and custom excludes
97                    if ignore_agents.should_be_ignored(path) {
98                        return true;
99                    }
100                    false
101                }) {
102                    return;
103                }
104                notify_build.notify_one();
105            }
106            Err(err) => {
107                log::error!("watch error: {err:?}");
108            }
109        }
110    });
111
112    let mut watcher = match watch_result {
113        Ok(watcher) => watcher,
114        Err(error) => {
115            log::error!("error watcher => {error}");
116            return Err(ErrorCode::WatcherError);
117        }
118    };
119
120    let (tx, rx) = tokio::sync::watch::channel(Status::default());
121    let tx = Arc::new(tx);
122
123    tokio::spawn({
124        let cors_middleware = Cors::new()
125            .allow_methods(vec![Method::GET, Method::POST])
126            .max_age(3600);
127
128        let app = Route::new()
129            .at("/events", get(handler_sse))
130            .with(cors_middleware)
131            .data(rx);
132
133        async move {
134            Server::new(TcpListener::bind("127.0.0.1:5555"))
135                .run(app)
136                .await
137        }
138    });
139
140    use notify::Watcher;
141    watcher.watch(&root, RecursiveMode::Recursive).unwrap();
142
143    for watch_path in &opts.add_watch_path {
144        match watcher.watch(Path::new(watch_path), RecursiveMode::Recursive) {
145            Ok(()) => {
146                log::info!("Added `{watch_path}` to watched directories");
147            }
148            Err(err) => {
149                log::error!("Error adding watch dir `{watch_path}`: {err}");
150                return Err(ErrorCode::CantAddWatchDir);
151            }
152        }
153    }
154    let mut version = 0;
155
156    loop {
157        version += 1;
158
159        if let Err(err) = tx.send(Status::Building) {
160            log::error!("Can't contact the browser: {err} (Other watch process already running?)");
161            return Err(ErrorCode::OtherProcessAlreadyRunning);
162        };
163
164        sleep(Duration::from_millis(200)).await;
165
166        log::info!("Build run...");
167
168        let spawn = build_and_watch(version, tx.clone(), &opts, &ws);
169        notify_build.notified().await;
170        spawn.off();
171    }
172}
173
174fn build_and_watch(
175    version: u32,
176    tx: Arc<Sender<Status>>,
177    opts: &WatchOpts,
178    ws: &Workspace,
179) -> SpawnOwner {
180    let opts = opts.clone();
181    let ws = ws.clone();
182    SpawnOwner::new(async move {
183        match crate::build::run_with_ws(opts.to_build_opts(), &ws, true) {
184            Ok(()) => {
185                log::info!("Build successful.");
186
187                let check_spawn = SpawnOwner::new(async move {
188                    let _ = Retry::spawn(
189                        FibonacciBackoff::from_millis(100).max_delay(Duration::from_secs(4)),
190                        || is_http_server_listening(opts.serve.port),
191                    )
192                    .await;
193
194                    let Ok(()) = tx.send(Status::Version(version)) else {
195                        exit(ErrorCode::WatchPipeBroken as i32)
196                    };
197                });
198
199                let opts = opts.clone();
200
201                log::info!("Spawning serve command...");
202                let (serve_params, port_watch) = opts.to_serve_opts();
203
204                if let Err(error_code) = crate::serve::run(serve_params, Some(port_watch)).await {
205                    log::error!("Error {} while running server", error_code as i32);
206                    exit(error_code as i32)
207                }
208
209                check_spawn.off();
210            }
211            Err(_) => {
212                log::error!("Build run failed. Waiting for changes...");
213
214                let Ok(()) = tx.send(Status::Errors) else {
215                    exit(ErrorCode::WatchPipeBroken as i32)
216                };
217            }
218        };
219    })
220}