vertigo_cli/watch/
watch_run.rs

1use notify::RecursiveMode;
2use poem::http::Method;
3use poem::middleware::Cors;
4use poem::{get, listener::TcpListener, EndpointExt, Route, Server};
5use std::path::Path;
6use std::process::exit;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::watch::Sender;
10use tokio::sync::Notify;
11use tokio::time::sleep;
12use tokio_retry::{strategy::FibonacciBackoff, Retry};
13
14use crate::build::{get_workspace, Workspace};
15use crate::commons::{spawn::SpawnOwner, ErrorCode};
16use crate::watch::sse::handler_sse;
17
18use super::is_http_server_listening::is_http_server_listening;
19use super::watch_opts::WatchOpts;
20
/// State of the watch pipeline, published through a `tokio::sync::watch` channel.
///
/// The receiving half of that channel is attached to the `/events` route set up in
/// [`run`], so every value sent here is what the SSE handler gets to observe.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum Status {
    /// A build has been started. This is also the initial value of the channel.
    #[default]
    Building,
    /// A build succeeded; carries the build counter of that run
    /// (incremented once per iteration of the watch loop, starting at 1).
    Version(u32),
    /// The last build failed; the watcher is waiting for further changes.
    Errors,
}
28
29pub async fn run(mut opts: WatchOpts) -> Result<(), ErrorCode> {
30    log::info!("watch params => {opts:#?}");
31
32    let ws = match get_workspace() {
33        Ok(ws) => ws,
34        Err(err) => {
35            log::error!("Can't read workspace");
36            return Err(err);
37        }
38    };
39
40    let package_name = match opts.build.package_name.as_deref() {
41        Some(name) => name.to_string(),
42        None => match ws.infer_package_name() {
43            Some(name) => {
44                log::info!("Inferred package name = {}", name);
45                opts.build.package_name = Some(name.clone());
46                name
47            }
48            None => {
49                log::error!(
50                    "Can't find vertigo project in {} (no cdylib member)",
51                    ws.get_root_dir()
52                );
53                return Err(ErrorCode::CantFindCdylibMember);
54            }
55        },
56    };
57
58    log::info!("package_name ==> {package_name:?}");
59
60    let path = ws.find_package_path(&package_name);
61    log::info!("path ==> {path:?}");
62
63    let Some(path) = path else {
64        log::error!("package not found ==> {:?}", opts.build.package_name);
65        return Err(ErrorCode::PackageNameNotFound);
66    };
67
68    let excludes = [path.join("target"), path.join(opts.common.dest_dir.clone())];
69
70    let notify_build = Arc::new(Notify::new());
71
72    let watch_result = notify::RecommendedWatcher::new(
73        {
74            let notify_build = notify_build.clone();
75
76            move |res: Result<notify::Event, _>| match res {
77                Ok(event) => {
78                    if event.paths.iter().all(|path| {
79                        for exclude_path in &excludes {
80                            if path.starts_with(exclude_path) {
81                                return true;
82                            }
83                        }
84                        false
85                    }) {
86                        return;
87                    }
88                    log::info!("event: {:?}", event);
89                    notify_build.notify_one();
90                }
91                Err(e) => {
92                    log::error!("watch error: {:?}", e);
93                }
94            }
95        },
96        notify::Config::default().with_poll_interval(std::time::Duration::from_millis(200)),
97    );
98
99    let mut watcher = match watch_result {
100        Ok(watcher) => watcher,
101        Err(error) => {
102            log::error!("error watcher => {error}");
103            return Err(ErrorCode::WatcherError);
104        }
105    };
106
107    let (tx, rx) = tokio::sync::watch::channel(Status::default());
108    let tx = Arc::new(tx);
109
110    tokio::spawn({
111        let cors_middleware = Cors::new()
112            .allow_methods(vec![Method::GET, Method::POST])
113            .max_age(3600);
114
115        let app = Route::new()
116            .at("/events", get(handler_sse))
117            .with(cors_middleware)
118            .data(rx);
119
120        async move {
121            Server::new(TcpListener::bind("127.0.0.1:5555"))
122                .run(app)
123                .await
124        }
125    });
126
127    use notify::Watcher;
128    watcher.watch(&path, RecursiveMode::Recursive).unwrap();
129
130    for watch_path in &opts.add_watch_path {
131        match watcher.watch(Path::new(watch_path), RecursiveMode::Recursive) {
132            Ok(()) => {
133                log::info!("Added `{watch_path}` to watched directories");
134            }
135            Err(err) => {
136                log::error!("Error adding watch dir `{watch_path}`: {err}");
137                return Err(ErrorCode::CantAddWatchDir);
138            }
139        }
140    }
141    let mut version = 0;
142
143    loop {
144        version += 1;
145
146        if let Err(err) = tx.send(Status::Building) {
147            log::error!("Can't contact the browser: {err} (Other watch process already running?)");
148            return Err(ErrorCode::OtherProcessAlreadyRunning);
149        };
150
151        log::info!("build run ...");
152
153        let spawn = build_and_watch(version, tx.clone(), &opts, &ws);
154        notify_build.notified().await;
155        spawn.off();
156    }
157}
158
159fn build_and_watch(
160    version: u32,
161    tx: Arc<Sender<Status>>,
162    opts: &WatchOpts,
163    ws: &Workspace,
164) -> SpawnOwner {
165    let opts = opts.clone();
166    let ws = ws.clone();
167    SpawnOwner::new(async move {
168        sleep(Duration::from_millis(200)).await;
169
170        match crate::build::run_with_ws(opts.to_build_opts(), &ws, true) {
171            Ok(()) => {
172                log::info!("Build successful.");
173
174                let check_spawn = SpawnOwner::new(async move {
175                    let _ = Retry::spawn(
176                        FibonacciBackoff::from_millis(100).max_delay(Duration::from_secs(4)),
177                        || is_http_server_listening(opts.serve.port),
178                    )
179                    .await;
180
181                    let Ok(()) = tx.send(Status::Version(version)) else {
182                        exit(ErrorCode::WatchPipeBroken as i32)
183                    };
184                });
185
186                let opts = opts.clone();
187
188                log::info!("Spawning serve command...");
189                let (serve_params, port_watch) = opts.to_serve_opts();
190
191                if let Err(error_code) = crate::serve::run(serve_params, Some(port_watch)).await {
192                    log::error!("Error {} while running server", error_code as i32);
193                    exit(error_code as i32)
194                }
195
196                check_spawn.off();
197            }
198            Err(_) => {
199                log::error!("Build run failed. Waiting for changes...");
200
201                let Ok(()) = tx.send(Status::Errors) else {
202                    exit(ErrorCode::WatchPipeBroken as i32)
203                };
204            }
205        };
206    })
207}