vertigo_cli/watch/
watch_run.rs1use notify::RecursiveMode;
2use poem::http::Method;
3use poem::middleware::Cors;
4use poem::{get, listener::TcpListener, EndpointExt, Route, Server};
5use std::path::Path;
6use std::process::exit;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::watch::Sender;
10use tokio::sync::Notify;
11use tokio::time::sleep;
12use tokio_retry::{strategy::FibonacciBackoff, Retry};
13
14use crate::build::{get_workspace, Workspace};
15use crate::commons::{spawn::SpawnOwner, ErrorCode};
16use crate::watch::sse::handler_sse;
17
18use super::is_http_server_listening::is_http_server_listening;
19use super::watch_opts::WatchOpts;
20
21#[derive(Clone, Debug, Default, PartialEq)]
22pub enum Status {
23 #[default]
24 Building,
25 Version(u32),
26 Errors,
27}
28
29pub async fn run(mut opts: WatchOpts) -> Result<(), ErrorCode> {
30 log::info!("watch params => {opts:#?}");
31
32 let ws = match get_workspace() {
33 Ok(ws) => ws,
34 Err(err) => {
35 log::error!("Can't read workspace");
36 return Err(err);
37 }
38 };
39
40 let package_name = match opts.build.package_name.as_deref() {
41 Some(name) => name.to_string(),
42 None => match ws.infer_package_name() {
43 Some(name) => {
44 log::info!("Inferred package name = {name}");
45 opts.build.package_name = Some(name.clone());
46 name
47 }
48 None => {
49 log::error!(
50 "Can't find vertigo project in {} (no cdylib member)",
51 ws.get_root_dir()
52 );
53 return Err(ErrorCode::CantFindCdylibMember);
54 }
55 },
56 };
57
58 log::info!("package_name ==> {package_name:?}");
59
60 let path = ws.find_package_path(&package_name);
61 log::info!("path ==> {path:?}");
62
63 let Some(path) = path else {
64 log::error!("package not found ==> {:?}", opts.build.package_name);
65 return Err(ErrorCode::PackageNameNotFound);
66 };
67
68 let excludes = [path.join("target"), path.join(opts.common.dest_dir.clone())];
69
70 let notify_build = Arc::new(Notify::new());
71
72 let watch_result = notify::recommended_watcher({
73 let notify_build = notify_build.clone();
74
75 move |res: Result<notify::Event, _>| match res {
76 Ok(event) => {
77 if event.paths.iter().all(|path| {
78 for exclude_path in &excludes {
79 if path.starts_with(exclude_path) {
80 return true;
81 }
82 }
83 false
84 }) {
85 return;
86 }
87 notify_build.notify_one();
88 }
89 Err(err) => {
90 log::error!("watch error: {err:?}");
91 }
92 }
93 });
94
95 let mut watcher = match watch_result {
96 Ok(watcher) => watcher,
97 Err(error) => {
98 log::error!("error watcher => {error}");
99 return Err(ErrorCode::WatcherError);
100 }
101 };
102
103 let (tx, rx) = tokio::sync::watch::channel(Status::default());
104 let tx = Arc::new(tx);
105
106 tokio::spawn({
107 let cors_middleware = Cors::new()
108 .allow_methods(vec![Method::GET, Method::POST])
109 .max_age(3600);
110
111 let app = Route::new()
112 .at("/events", get(handler_sse))
113 .with(cors_middleware)
114 .data(rx);
115
116 async move {
117 Server::new(TcpListener::bind("127.0.0.1:5555"))
118 .run(app)
119 .await
120 }
121 });
122
123 use notify::Watcher;
124 watcher.watch(&path, RecursiveMode::Recursive).unwrap();
125
126 for watch_path in &opts.add_watch_path {
127 match watcher.watch(Path::new(watch_path), RecursiveMode::Recursive) {
128 Ok(()) => {
129 log::info!("Added `{watch_path}` to watched directories");
130 }
131 Err(err) => {
132 log::error!("Error adding watch dir `{watch_path}`: {err}");
133 return Err(ErrorCode::CantAddWatchDir);
134 }
135 }
136 }
137 let mut version = 0;
138
139 loop {
140 version += 1;
141
142 if let Err(err) = tx.send(Status::Building) {
143 log::error!("Can't contact the browser: {err} (Other watch process already running?)");
144 return Err(ErrorCode::OtherProcessAlreadyRunning);
145 };
146
147 sleep(Duration::from_millis(200)).await;
148
149 log::info!("Build run...");
150
151 let spawn = build_and_watch(version, tx.clone(), &opts, &ws);
152 notify_build.notified().await;
153 spawn.off();
154 }
155}
156
157fn build_and_watch(
158 version: u32,
159 tx: Arc<Sender<Status>>,
160 opts: &WatchOpts,
161 ws: &Workspace,
162) -> SpawnOwner {
163 let opts = opts.clone();
164 let ws = ws.clone();
165 SpawnOwner::new(async move {
166 match crate::build::run_with_ws(opts.to_build_opts(), &ws, true) {
167 Ok(()) => {
168 log::info!("Build successful.");
169
170 let check_spawn = SpawnOwner::new(async move {
171 let _ = Retry::spawn(
172 FibonacciBackoff::from_millis(100).max_delay(Duration::from_secs(4)),
173 || is_http_server_listening(opts.serve.port),
174 )
175 .await;
176
177 let Ok(()) = tx.send(Status::Version(version)) else {
178 exit(ErrorCode::WatchPipeBroken as i32)
179 };
180 });
181
182 let opts = opts.clone();
183
184 log::info!("Spawning serve command...");
185 let (serve_params, port_watch) = opts.to_serve_opts();
186
187 if let Err(error_code) = crate::serve::run(serve_params, Some(port_watch)).await {
188 log::error!("Error {} while running server", error_code as i32);
189 exit(error_code as i32)
190 }
191
192 check_spawn.off();
193 }
194 Err(_) => {
195 log::error!("Build run failed. Waiting for changes...");
196
197 let Ok(()) = tx.send(Status::Errors) else {
198 exit(ErrorCode::WatchPipeBroken as i32)
199 };
200 }
201 };
202 })
203}