vertigo_cli/watch/
watch_run.rs1use notify::RecursiveMode;
2use poem::http::Method;
3use poem::middleware::Cors;
4use poem::{get, listener::TcpListener, EndpointExt, Route, Server};
5use std::{path::Path, process::exit, sync::Arc, time::Duration};
6use tokio::sync::{watch::Sender, Notify};
7use tokio::time::sleep;
8use tokio_retry::{strategy::FibonacciBackoff, Retry};
9
10use crate::build::{get_workspace, Workspace};
11use crate::commons::{spawn::SpawnOwner, ErrorCode};
12use crate::watch::ignore_agent::IgnoreAgents;
13use crate::watch::sse::handler_sse;
14
15use super::is_http_server_listening::is_http_server_listening;
16use super::watch_opts::WatchOpts;
17
/// State of the watch/build cycle that `run` and `build_and_watch` publish
/// through the `tokio::sync::watch` channel.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum Status {
    /// A build is in progress. This is the initial value of the channel.
    #[default]
    Building,
    /// A build succeeded; the payload is the build counter kept by `run`.
    Version(u32),
    /// The last build failed.
    Errors,
}
25
26pub async fn run(mut opts: WatchOpts) -> Result<(), ErrorCode> {
27 log::info!("watch params => {opts:#?}");
28
29 let ws = match get_workspace() {
30 Ok(ws) => ws,
31 Err(err) => {
32 log::error!("Can't read workspace");
33 return Err(err);
34 }
35 };
36
37 let package_name = match opts.build.package_name.as_deref() {
38 Some(name) => name.to_string(),
39 None => match ws.infer_package_name() {
40 Some(name) => {
41 log::info!("Inferred package name = {name}");
42 opts.build.package_name = Some(name.clone());
43 name
44 }
45 None => {
46 log::error!(
47 "Can't find vertigo project in {} (no cdylib member)",
48 ws.get_root_dir()
49 );
50 return Err(ErrorCode::CantFindCdylibMember);
51 }
52 },
53 };
54
55 log::info!("package_name ==> {package_name:?}");
56
57 let root = ws.find_package_path(&package_name);
58 log::info!("path ==> {root:?}");
59
60 let Some(root) = root else {
61 log::error!("package not found ==> {:?}", opts.build.package_name);
62 return Err(ErrorCode::PackageNameNotFound);
63 };
64
65 if opts.build.release_mode.is_none() {
67 opts.build.release_mode = Some(false);
68 }
69 if opts.build.wasm_opt.is_none() {
70 opts.build.wasm_opt = Some(false);
71 }
72
73 let excludes = [root.join("target"), root.join(opts.common.dest_dir.clone())];
74
75 let notify_build = Arc::new(Notify::new());
76
77 let watch_result = notify::recommended_watcher({
78 let notify_build = notify_build.clone();
79
80 let ignore_agents = IgnoreAgents::new(&ws.get_root_dir().into(), &opts);
82
83 move |res: Result<notify::Event, _>| match res {
84 Ok(event) => {
85 if event.paths.iter().all(|path| {
86 for exclude_path in &excludes {
88 if path.starts_with(exclude_path) {
89 return true;
90 }
91 }
92 if ignore_agents.should_be_ignored(path) {
94 return true;
95 }
96 false
97 }) {
98 return;
99 }
100 notify_build.notify_one();
101 }
102 Err(err) => {
103 log::error!("watch error: {err:?}");
104 }
105 }
106 });
107
108 let mut watcher = match watch_result {
109 Ok(watcher) => watcher,
110 Err(error) => {
111 log::error!("error watcher => {error}");
112 return Err(ErrorCode::WatcherError);
113 }
114 };
115
116 let (tx, rx) = tokio::sync::watch::channel(Status::default());
117 let tx = Arc::new(tx);
118
119 tokio::spawn({
120 let cors_middleware = Cors::new()
121 .allow_methods(vec![Method::GET, Method::POST])
122 .max_age(3600);
123
124 let app = Route::new()
125 .at("/events", get(handler_sse))
126 .with(cors_middleware)
127 .data(rx);
128
129 async move {
130 Server::new(TcpListener::bind("127.0.0.1:5555"))
131 .run(app)
132 .await
133 }
134 });
135
136 use notify::Watcher;
137 watcher.watch(&root, RecursiveMode::Recursive).unwrap();
138
139 for watch_path in &opts.add_watch_path {
140 match watcher.watch(Path::new(watch_path), RecursiveMode::Recursive) {
141 Ok(()) => {
142 log::info!("Added `{watch_path}` to watched directories");
143 }
144 Err(err) => {
145 log::error!("Error adding watch dir `{watch_path}`: {err}");
146 return Err(ErrorCode::CantAddWatchDir);
147 }
148 }
149 }
150 let mut version = 0;
151
152 loop {
153 version += 1;
154
155 if let Err(err) = tx.send(Status::Building) {
156 log::error!("Can't contact the browser: {err} (Other watch process already running?)");
157 return Err(ErrorCode::OtherProcessAlreadyRunning);
158 };
159
160 sleep(Duration::from_millis(200)).await;
161
162 log::info!("Build run...");
163
164 let spawn = build_and_watch(version, tx.clone(), &opts, &ws);
165 notify_build.notified().await;
166 spawn.off();
167 }
168}
169
170fn build_and_watch(
171 version: u32,
172 tx: Arc<Sender<Status>>,
173 opts: &WatchOpts,
174 ws: &Workspace,
175) -> SpawnOwner {
176 let opts = opts.clone();
177 let ws = ws.clone();
178 SpawnOwner::new(async move {
179 match crate::build::run_with_ws(opts.to_build_opts(), &ws, true) {
180 Ok(()) => {
181 log::info!("Build successful.");
182
183 let check_spawn = SpawnOwner::new(async move {
184 let _ = Retry::spawn(
185 FibonacciBackoff::from_millis(100).max_delay(Duration::from_secs(4)),
186 || is_http_server_listening(opts.serve.port),
187 )
188 .await;
189
190 let Ok(()) = tx.send(Status::Version(version)) else {
191 exit(ErrorCode::WatchPipeBroken as i32)
192 };
193 });
194
195 let opts = opts.clone();
196
197 log::info!("Spawning serve command...");
198 let (serve_params, port_watch) = opts.to_serve_opts();
199
200 if let Err(error_code) = crate::serve::run(serve_params, Some(port_watch)).await {
201 log::error!("Error {} while running server", error_code as i32);
202 exit(error_code as i32)
203 }
204
205 check_spawn.off();
206 }
207 Err(_) => {
208 log::error!("Build run failed. Waiting for changes...");
209
210 let Ok(()) = tx.send(Status::Errors) else {
211 exit(ErrorCode::WatchPipeBroken as i32)
212 };
213 }
214 };
215 })
216}