vertigo_cli/watch/
watch_run.rs

1use notify::RecursiveMode;
2use poem::http::Method;
3use poem::middleware::Cors;
4use poem::{get, listener::TcpListener, EndpointExt, Route, Server};
5use std::path::Path;
6use std::process::exit;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::watch::Sender;
10use tokio::sync::Notify;
11use tokio::time::sleep;
12use tokio_retry::{strategy::FibonacciBackoff, Retry};
13
14use crate::build::{get_workspace, Workspace};
15use crate::commons::{spawn::SpawnOwner, ErrorCode};
16use crate::watch::sse::handler_sse;
17
18use super::is_http_server_listening::is_http_server_listening;
19use super::watch_opts::WatchOpts;
20
21#[derive(Clone, Debug, Default, PartialEq)]
22pub enum Status {
23    #[default]
24    Building,
25    Version(u32),
26    Errors,
27}
28
29pub async fn run(mut opts: WatchOpts) -> Result<(), ErrorCode> {
30    log::info!("watch params => {opts:#?}");
31
32    let ws = match get_workspace() {
33        Ok(ws) => ws,
34        Err(err) => {
35            log::error!("Can't read workspace");
36            return Err(err);
37        }
38    };
39
40    let package_name = match opts.build.package_name.as_deref() {
41        Some(name) => name.to_string(),
42        None => match ws.infer_package_name() {
43            Some(name) => {
44                log::info!("Inferred package name = {name}");
45                opts.build.package_name = Some(name.clone());
46                name
47            }
48            None => {
49                log::error!(
50                    "Can't find vertigo project in {} (no cdylib member)",
51                    ws.get_root_dir()
52                );
53                return Err(ErrorCode::CantFindCdylibMember);
54            }
55        },
56    };
57
58    log::info!("package_name ==> {package_name:?}");
59
60    let path = ws.find_package_path(&package_name);
61    log::info!("path ==> {path:?}");
62
63    let Some(path) = path else {
64        log::error!("package not found ==> {:?}", opts.build.package_name);
65        return Err(ErrorCode::PackageNameNotFound);
66    };
67
68    let excludes = [path.join("target"), path.join(opts.common.dest_dir.clone())];
69
70    let notify_build = Arc::new(Notify::new());
71
72    let watch_result = notify::recommended_watcher({
73        let notify_build = notify_build.clone();
74
75        move |res: Result<notify::Event, _>| match res {
76            Ok(event) => {
77                if event.paths.iter().all(|path| {
78                    for exclude_path in &excludes {
79                        if path.starts_with(exclude_path) {
80                            return true;
81                        }
82                    }
83                    false
84                }) {
85                    return;
86                }
87                notify_build.notify_one();
88            }
89            Err(err) => {
90                log::error!("watch error: {err:?}");
91            }
92        }
93    });
94
95    let mut watcher = match watch_result {
96        Ok(watcher) => watcher,
97        Err(error) => {
98            log::error!("error watcher => {error}");
99            return Err(ErrorCode::WatcherError);
100        }
101    };
102
103    let (tx, rx) = tokio::sync::watch::channel(Status::default());
104    let tx = Arc::new(tx);
105
106    tokio::spawn({
107        let cors_middleware = Cors::new()
108            .allow_methods(vec![Method::GET, Method::POST])
109            .max_age(3600);
110
111        let app = Route::new()
112            .at("/events", get(handler_sse))
113            .with(cors_middleware)
114            .data(rx);
115
116        async move {
117            Server::new(TcpListener::bind("127.0.0.1:5555"))
118                .run(app)
119                .await
120        }
121    });
122
123    use notify::Watcher;
124    watcher.watch(&path, RecursiveMode::Recursive).unwrap();
125
126    for watch_path in &opts.add_watch_path {
127        match watcher.watch(Path::new(watch_path), RecursiveMode::Recursive) {
128            Ok(()) => {
129                log::info!("Added `{watch_path}` to watched directories");
130            }
131            Err(err) => {
132                log::error!("Error adding watch dir `{watch_path}`: {err}");
133                return Err(ErrorCode::CantAddWatchDir);
134            }
135        }
136    }
137    let mut version = 0;
138
139    loop {
140        version += 1;
141
142        if let Err(err) = tx.send(Status::Building) {
143            log::error!("Can't contact the browser: {err} (Other watch process already running?)");
144            return Err(ErrorCode::OtherProcessAlreadyRunning);
145        };
146
147        sleep(Duration::from_millis(200)).await;
148
149        log::info!("Build run...");
150
151        let spawn = build_and_watch(version, tx.clone(), &opts, &ws);
152        notify_build.notified().await;
153        spawn.off();
154    }
155}
156
157fn build_and_watch(
158    version: u32,
159    tx: Arc<Sender<Status>>,
160    opts: &WatchOpts,
161    ws: &Workspace,
162) -> SpawnOwner {
163    let opts = opts.clone();
164    let ws = ws.clone();
165    SpawnOwner::new(async move {
166        match crate::build::run_with_ws(opts.to_build_opts(), &ws, true) {
167            Ok(()) => {
168                log::info!("Build successful.");
169
170                let check_spawn = SpawnOwner::new(async move {
171                    let _ = Retry::spawn(
172                        FibonacciBackoff::from_millis(100).max_delay(Duration::from_secs(4)),
173                        || is_http_server_listening(opts.serve.port),
174                    )
175                    .await;
176
177                    let Ok(()) = tx.send(Status::Version(version)) else {
178                        exit(ErrorCode::WatchPipeBroken as i32)
179                    };
180                });
181
182                let opts = opts.clone();
183
184                log::info!("Spawning serve command...");
185                let (serve_params, port_watch) = opts.to_serve_opts();
186
187                if let Err(error_code) = crate::serve::run(serve_params, Some(port_watch)).await {
188                    log::error!("Error {} while running server", error_code as i32);
189                    exit(error_code as i32)
190                }
191
192                check_spawn.off();
193            }
194            Err(_) => {
195                log::error!("Build run failed. Waiting for changes...");
196
197                let Ok(()) = tx.send(Status::Errors) else {
198                    exit(ErrorCode::WatchPipeBroken as i32)
199                };
200            }
201        };
202    })
203}