vertigo_cli/watch/
watch_run.rs1use notify::RecursiveMode;
2use poem::http::Method;
3use poem::middleware::Cors;
4use poem::{get, listener::TcpListener, EndpointExt, Route, Server};
5use std::path::Path;
6use std::process::exit;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::watch::Sender;
10use tokio::sync::Notify;
11use tokio::time::sleep;
12use tokio_retry::{strategy::FibonacciBackoff, Retry};
13
14use crate::build::{get_workspace, Workspace};
15use crate::commons::{spawn::SpawnOwner, ErrorCode};
16use crate::watch::ignore_agent::IgnoreAgents;
17use crate::watch::sse::handler_sse;
18
19use super::is_http_server_listening::is_http_server_listening;
20use super::watch_opts::WatchOpts;
21
22#[derive(Clone, Debug, Default, PartialEq)]
23pub enum Status {
24 #[default]
25 Building,
26 Version(u32),
27 Errors,
28}
29
30pub async fn run(mut opts: WatchOpts) -> Result<(), ErrorCode> {
31 log::info!("watch params => {opts:#?}");
32
33 let ws = match get_workspace() {
34 Ok(ws) => ws,
35 Err(err) => {
36 log::error!("Can't read workspace");
37 return Err(err);
38 }
39 };
40
41 let package_name = match opts.build.package_name.as_deref() {
42 Some(name) => name.to_string(),
43 None => match ws.infer_package_name() {
44 Some(name) => {
45 log::info!("Inferred package name = {name}");
46 opts.build.package_name = Some(name.clone());
47 name
48 }
49 None => {
50 log::error!(
51 "Can't find vertigo project in {} (no cdylib member)",
52 ws.get_root_dir()
53 );
54 return Err(ErrorCode::CantFindCdylibMember);
55 }
56 },
57 };
58
59 log::info!("package_name ==> {package_name:?}");
60
61 let root = ws.find_package_path(&package_name);
62 log::info!("path ==> {root:?}");
63
64 let Some(root) = root else {
65 log::error!("package not found ==> {:?}", opts.build.package_name);
66 return Err(ErrorCode::PackageNameNotFound);
67 };
68
69 if opts.build.release_mode.is_none() {
71 opts.build.release_mode = Some(false);
72 }
73 if opts.build.wasm_opt.is_none() {
74 opts.build.wasm_opt = Some(false);
75 }
76
77 let excludes = [root.join("target"), root.join(opts.common.dest_dir.clone())];
78
79 let notify_build = Arc::new(Notify::new());
80
81 let watch_result = notify::recommended_watcher({
82 let notify_build = notify_build.clone();
83
84 let ignore_agents = IgnoreAgents::new(&ws.get_root_dir().into(), &opts);
86
87 move |res: Result<notify::Event, _>| match res {
88 Ok(event) => {
89 if event.paths.iter().all(|path| {
90 for exclude_path in &excludes {
92 if path.starts_with(exclude_path) {
93 return true;
94 }
95 }
96 if ignore_agents.should_be_ignored(path) {
98 return true;
99 }
100 false
101 }) {
102 return;
103 }
104 notify_build.notify_one();
105 }
106 Err(err) => {
107 log::error!("watch error: {err:?}");
108 }
109 }
110 });
111
112 let mut watcher = match watch_result {
113 Ok(watcher) => watcher,
114 Err(error) => {
115 log::error!("error watcher => {error}");
116 return Err(ErrorCode::WatcherError);
117 }
118 };
119
120 let (tx, rx) = tokio::sync::watch::channel(Status::default());
121 let tx = Arc::new(tx);
122
123 tokio::spawn({
124 let cors_middleware = Cors::new()
125 .allow_methods(vec![Method::GET, Method::POST])
126 .max_age(3600);
127
128 let app = Route::new()
129 .at("/events", get(handler_sse))
130 .with(cors_middleware)
131 .data(rx);
132
133 async move {
134 Server::new(TcpListener::bind("127.0.0.1:5555"))
135 .run(app)
136 .await
137 }
138 });
139
140 use notify::Watcher;
141 watcher.watch(&root, RecursiveMode::Recursive).unwrap();
142
143 for watch_path in &opts.add_watch_path {
144 match watcher.watch(Path::new(watch_path), RecursiveMode::Recursive) {
145 Ok(()) => {
146 log::info!("Added `{watch_path}` to watched directories");
147 }
148 Err(err) => {
149 log::error!("Error adding watch dir `{watch_path}`: {err}");
150 return Err(ErrorCode::CantAddWatchDir);
151 }
152 }
153 }
154 let mut version = 0;
155
156 loop {
157 version += 1;
158
159 if let Err(err) = tx.send(Status::Building) {
160 log::error!("Can't contact the browser: {err} (Other watch process already running?)");
161 return Err(ErrorCode::OtherProcessAlreadyRunning);
162 };
163
164 sleep(Duration::from_millis(200)).await;
165
166 log::info!("Build run...");
167
168 let spawn = build_and_watch(version, tx.clone(), &opts, &ws);
169 notify_build.notified().await;
170 spawn.off();
171 }
172}
173
174fn build_and_watch(
175 version: u32,
176 tx: Arc<Sender<Status>>,
177 opts: &WatchOpts,
178 ws: &Workspace,
179) -> SpawnOwner {
180 let opts = opts.clone();
181 let ws = ws.clone();
182 SpawnOwner::new(async move {
183 match crate::build::run_with_ws(opts.to_build_opts(), &ws, true) {
184 Ok(()) => {
185 log::info!("Build successful.");
186
187 let check_spawn = SpawnOwner::new(async move {
188 let _ = Retry::spawn(
189 FibonacciBackoff::from_millis(100).max_delay(Duration::from_secs(4)),
190 || is_http_server_listening(opts.serve.port),
191 )
192 .await;
193
194 let Ok(()) = tx.send(Status::Version(version)) else {
195 exit(ErrorCode::WatchPipeBroken as i32)
196 };
197 });
198
199 let opts = opts.clone();
200
201 log::info!("Spawning serve command...");
202 let (serve_params, port_watch) = opts.to_serve_opts();
203
204 if let Err(error_code) = crate::serve::run(serve_params, Some(port_watch)).await {
205 log::error!("Error {} while running server", error_code as i32);
206 exit(error_code as i32)
207 }
208
209 check_spawn.off();
210 }
211 Err(_) => {
212 log::error!("Build run failed. Waiting for changes...");
213
214 let Ok(()) = tx.send(Status::Errors) else {
215 exit(ErrorCode::WatchPipeBroken as i32)
216 };
217 }
218 };
219 })
220}