// vertigo_cli/watch/watch_run.rs
use notify::RecursiveMode;
2use poem::http::Method;
3use poem::middleware::Cors;
4use poem::{get, listener::TcpListener, EndpointExt, Route, Server};
5use std::{path::Path, process::exit, sync::Arc, time::Duration};
6use tokio::sync::{watch::Sender, Notify};
7use tokio::time::sleep;
8use tokio_retry::{strategy::FibonacciBackoff, Retry};
9
10use crate::build::{get_workspace, Workspace};
11use crate::commons::{spawn::SpawnOwner, ErrorCode};
12use crate::watch::ignore_agent::IgnoreAgents;
13use crate::watch::sse::handler_sse;
14
15use super::is_http_server_listening::is_http_server_listening;
16use super::watch_opts::WatchOpts;
17
/// Build status published to browser clients.
///
/// Values are sent through a `tokio::sync::watch` channel whose receiver is
/// handed to the `/events` route (see `run`).
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum Status {
    /// A build is about to start or is in progress. Sent at the start of every
    /// watch-loop iteration; also the initial value of the channel.
    #[default]
    Building,
    /// The build with the given version counter succeeded and the HTTP server
    /// check completed.
    Version(u32),
    /// The last build failed.
    Errors,
}
25
26pub async fn run(mut opts: WatchOpts) -> Result<(), ErrorCode> {
27 log::info!("watch params => {opts:#?}");
28
29 let ws = match get_workspace() {
30 Ok(ws) => ws,
31 Err(err) => {
32 log::error!("Can't read workspace");
33 return Err(err);
34 }
35 };
36
37 let package_name = match opts.build.package_name.as_deref() {
38 Some(name) => name.to_string(),
39 None => match ws.infer_package_name() {
40 Some(name) => {
41 log::info!("Inferred package name = {name}");
42 opts.build.package_name = Some(name.clone());
43 name
44 }
45 None => {
46 log::error!(
47 "Can't find vertigo project in {} (no cdylib member)",
48 ws.get_root_dir()
49 );
50 return Err(ErrorCode::CantFindCdylibMember);
51 }
52 },
53 };
54
55 log::info!("package_name ==> {package_name:?}");
56
57 let root = ws.find_package_path(&package_name);
58 log::info!("path ==> {root:?}");
59
60 let Some(root) = root else {
61 log::error!("package not found ==> {:?}", opts.build.package_name);
62 return Err(ErrorCode::PackageNameNotFound);
63 };
64
65 if opts.build.release_mode.is_none() {
67 opts.build.release_mode = Some(false);
68 }
69 if opts.build.wasm_opt.is_none() {
70 opts.build.wasm_opt = Some(false);
71 }
72
73 let excludes = [root.join("target"), root.join(opts.common.dest_dir.clone())];
74
75 let notify_build = Arc::new(Notify::new());
76
77 let watch_result = notify::recommended_watcher({
78 let notify_build = notify_build.clone();
79
80 let ignore_agents = IgnoreAgents::new(&ws.get_root_dir().into(), &opts);
82
83 move |res: Result<notify::Event, _>| match res {
84 Ok(event) => {
85 if event.paths.iter().all(|path| {
86 for exclude_path in &excludes {
88 if path.starts_with(exclude_path) {
89 return true;
90 }
91 }
92 if ignore_agents.should_be_ignored(path) {
94 return true;
95 }
96 false
97 }) {
98 return;
99 }
100 notify_build.notify_one();
101 }
102 Err(err) => {
103 log::error!("watch error: {err:?}");
104 }
105 }
106 });
107
108 let mut watcher = match watch_result {
109 Ok(watcher) => watcher,
110 Err(error) => {
111 log::error!("error watcher => {error}");
112 return Err(ErrorCode::WatcherError);
113 }
114 };
115
116 let (tx, rx) = tokio::sync::watch::channel(Status::default());
117 let tx = Arc::new(tx);
118
119 tokio::spawn({
120 let cors_middleware = Cors::new()
121 .allow_methods(vec![Method::GET, Method::POST])
122 .max_age(3600);
123
124 let app = Route::new()
125 .at("/events", get(handler_sse))
126 .with(cors_middleware)
127 .data(rx);
128
129 async move {
130 Server::new(TcpListener::bind("127.0.0.1:5555"))
131 .run(app)
132 .await
133 }
134 });
135
136 use notify::Watcher;
137 watcher
138 .watch(&root, RecursiveMode::Recursive)
139 .map_err(|err| {
140 log::error!("Can't watch root dir {}: {err}", root.to_string_lossy());
141 ErrorCode::CantAddWatchDir
142 })?;
143
144 for watch_path in &opts.add_watch_path {
145 match watcher.watch(Path::new(watch_path), RecursiveMode::Recursive) {
146 Ok(()) => {
147 log::info!("Added `{watch_path}` to watched directories");
148 }
149 Err(err) => {
150 log::error!("Error adding watch dir `{watch_path}`: {err}");
151 return Err(ErrorCode::CantAddWatchDir);
152 }
153 }
154 }
155 let mut version = 0;
156
157 loop {
158 version += 1;
159
160 if let Err(err) = tx.send(Status::Building) {
161 log::error!("Can't contact the browser: {err} (Other watch process already running?)");
162 return Err(ErrorCode::OtherProcessAlreadyRunning);
163 };
164
165 sleep(Duration::from_millis(200)).await;
166
167 log::info!("Build run...");
168
169 let spawn = build_and_watch(version, tx.clone(), &opts, &ws);
170 notify_build.notified().await;
171 spawn.off();
172 }
173}
174
175fn build_and_watch(
176 version: u32,
177 tx: Arc<Sender<Status>>,
178 opts: &WatchOpts,
179 ws: &Workspace,
180) -> SpawnOwner {
181 let opts = opts.clone();
182 let ws = ws.clone();
183 SpawnOwner::new(async move {
184 match crate::build::run_with_ws(opts.to_build_opts(), &ws, true) {
185 Ok(()) => {
186 log::info!("Build successful.");
187
188 let check_spawn = SpawnOwner::new(async move {
189 let _ = Retry::spawn(
190 FibonacciBackoff::from_millis(100).max_delay(Duration::from_secs(4)),
191 || is_http_server_listening(opts.serve.port),
192 )
193 .await;
194
195 let Ok(()) = tx.send(Status::Version(version)) else {
196 exit(ErrorCode::WatchPipeBroken as i32)
197 };
198 });
199
200 let opts = opts.clone();
201
202 log::info!("Spawning serve command...");
203 let (serve_params, port_watch) = opts.to_serve_opts();
204
205 if let Err(error_code) = crate::serve::run(serve_params, Some(port_watch)).await {
206 log::error!("Error {} while running server", error_code as i32);
207 exit(error_code as i32)
208 }
209
210 check_spawn.off();
211 }
212 Err(_) => {
213 log::error!("Build run failed. Waiting for changes...");
214
215 let Ok(()) = tx.send(Status::Errors) else {
216 exit(ErrorCode::WatchPipeBroken as i32)
217 };
218 }
219 };
220 })
221}