vertigo_cli/watch/
watch_run.rs1use actix_cors::Cors;
2use actix_web::{App, HttpServer, rt::System, web};
3use notify::RecursiveMode;
4use std::{path::Path, process::exit, sync::Arc, time::Duration};
5use tokio::{
6 sync::{Notify, watch::Sender},
7 time::sleep,
8};
9use tokio_retry::{Retry, strategy::FibonacciBackoff};
10
11use crate::build::{Workspace, get_workspace};
12use crate::commons::{
13 ErrorCode,
14 spawn::{SpawnOwner, term_signal},
15};
16
17use super::{
18 ignore_agent::IgnoreAgents, is_http_server_listening::is_http_server_listening,
19 sse::handler_sse, watch_opts::WatchOpts,
20};
21
22#[derive(Clone, Debug, Default, PartialEq)]
23pub enum Status {
24 #[default]
25 Building,
26 Version(u32),
27 Errors,
28}
29
30pub async fn run(mut opts: WatchOpts) -> Result<(), ErrorCode> {
31 log::info!("watch params => {opts:#?}");
32
33 let ws = match get_workspace() {
34 Ok(ws) => ws,
35 Err(err) => {
36 log::error!("Can't read workspace");
37 return Err(err);
38 }
39 };
40
41 let package_name = match opts.build.package_name.as_deref() {
42 Some(name) => name.to_string(),
43 None => match ws.infer_package_name() {
44 Some(name) => {
45 log::info!("Inferred package name = {name}");
46 opts.build.package_name = Some(name.clone());
47 name
48 }
49 None => {
50 log::error!(
51 "Can't find vertigo project in {} (no cdylib member)",
52 ws.get_root_dir()
53 );
54 return Err(ErrorCode::CantFindCdylibMember);
55 }
56 },
57 };
58
59 log::info!("package_name ==> {package_name:?}");
60
61 let root = ws.find_package_path(&package_name);
62 log::info!("path ==> {root:?}");
63
64 let Some(root) = root else {
65 log::error!("package not found ==> {:?}", opts.build.package_name);
66 return Err(ErrorCode::PackageNameNotFound);
67 };
68
69 if opts.build.release_mode.is_none() {
71 opts.build.release_mode = Some(false);
72 }
73 if opts.build.wasm_opt.is_none() {
74 opts.build.wasm_opt = Some(false);
75 }
76
77 let excludes = [root.join("target"), root.join(opts.common.dest_dir.clone())];
78
79 let notify_build = Arc::new(Notify::new());
80
81 let watch_result = notify::recommended_watcher({
82 let notify_build = notify_build.clone();
83
84 let ignore_agents = IgnoreAgents::new(&ws.get_root_dir().into(), &opts);
86
87 move |res: Result<notify::Event, _>| match res {
88 Ok(event) => {
89 if let notify::EventKind::Access(_) = event.kind {
90 return;
91 }
92
93 if event.paths.iter().all(|path| {
94 for exclude_path in &excludes {
96 if path.starts_with(exclude_path) {
97 return true;
98 }
99 }
100 if ignore_agents.should_be_ignored(path) {
102 return true;
103 }
104 false
105 }) {
106 return;
107 }
108 notify_build.notify_one();
109 }
110 Err(err) => {
111 log::error!("watch error: {err:?}");
112 }
113 }
114 });
115
116 let mut watcher = match watch_result {
117 Ok(watcher) => watcher,
118 Err(error) => {
119 log::error!("error watcher => {error}");
120 return Err(ErrorCode::WatcherError);
121 }
122 };
123
124 let (tx, rx) = tokio::sync::watch::channel(Status::default());
125 let tx = Arc::new(tx);
126
127 let watch_server = HttpServer::new(move || {
128 App::new()
129 .wrap(Cors::permissive())
130 .service(web::resource("/events").route(web::get().to(handler_sse)))
131 .app_data(web::Data::new(rx.clone()))
132 })
133 .workers(1)
134 .bind("127.0.0.1:5555")
135 .map_err(|err| {
136 log::error!("Watch server bind error: {err}");
137 ErrorCode::WatcherError
138 })?
139 .disable_signals()
140 .client_disconnect_timeout(Duration::from_secs(1))
141 .shutdown_timeout(1)
142 .run();
143
144 let watch_handle = watch_server.handle();
145
146 std::thread::spawn(move || System::new().block_on(watch_server));
147
148 use notify::Watcher;
149 watcher
150 .watch(&root, RecursiveMode::Recursive)
151 .map_err(|err| {
152 log::error!("Can't watch root dir {}: {err}", root.to_string_lossy());
153 ErrorCode::CantAddWatchDir
154 })?;
155
156 for watch_path in &opts.add_watch_path {
157 match watcher.watch(Path::new(watch_path), RecursiveMode::Recursive) {
158 Ok(()) => {
159 log::info!("Added `{watch_path}` to watched directories");
160 }
161 Err(err) => {
162 log::error!("Error adding watch dir `{watch_path}`: {err}");
163 return Err(ErrorCode::CantAddWatchDir);
164 }
165 }
166 }
167 let mut version = 0;
168
169 loop {
170 version += 1;
171
172 if let Err(err) = tx.send(Status::Building) {
173 log::error!("Can't contact the browser: {err} (Other watch process already running?)");
174 return Err(ErrorCode::OtherProcessAlreadyRunning);
175 };
176
177 sleep(Duration::from_millis(200)).await;
178
179 log::info!("Build run...");
180
181 let spawn = build_and_watch(version, tx.clone(), &opts, &ws);
182
183 tokio::select! {
184 msg = term_signal() => {
185 log::info!("{msg} received, shutting down");
186 spawn.off();
187 watch_handle.stop(true).await;
188 return Ok(());
189 }
190 _ = notify_build.notified() => {
191 log::info!("Notify build received, shutting down");
192 spawn.off();
193 }
194 }
195 }
196}
197
198fn build_and_watch(
199 version: u32,
200 tx: Arc<Sender<Status>>,
201 opts: &WatchOpts,
202 ws: &Workspace,
203) -> SpawnOwner {
204 let opts = opts.clone();
205 let ws = ws.clone();
206 SpawnOwner::new(async move {
207 match crate::build::run_with_ws(opts.to_build_opts(), &ws, true) {
208 Ok(()) => {
209 log::info!("Build successful.");
210
211 let check_spawn = SpawnOwner::new(async move {
212 let _ = Retry::spawn(
213 FibonacciBackoff::from_millis(100).max_delay(Duration::from_secs(4)),
214 || is_http_server_listening(opts.serve.port),
215 )
216 .await;
217
218 let Ok(()) = tx.send(Status::Version(version)) else {
219 exit(ErrorCode::WatchPipeBroken as i32)
220 };
221 });
222
223 let opts = opts.clone();
224
225 log::info!("Spawning serve command...");
226 let (mut serve_params, port_watch) = opts.to_serve_opts();
227
228 if serve_params.inner.threads.is_none() {
229 serve_params.inner.threads = Some(2);
230 }
231
232 if let Err(error_code) = crate::serve::run(serve_params, Some(port_watch)).await {
233 log::error!("Error {} while running server", error_code as i32);
234 exit(error_code as i32)
235 }
236
237 check_spawn.off();
238 }
239 Err(_) => {
240 log::error!("Build run failed. Waiting for changes...");
241
242 let Ok(()) = tx.send(Status::Errors) else {
243 exit(ErrorCode::WatchPipeBroken as i32)
244 };
245 }
246 };
247 })
248}