vertigo_cli/watch/
watch_run.rs1use notify::RecursiveMode;
2use poem::http::Method;
3use poem::middleware::Cors;
4use poem::{get, listener::TcpListener, EndpointExt, Route, Server};
5use std::path::Path;
6use std::process::exit;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::watch::Sender;
10use tokio::sync::Notify;
11use tokio::time::sleep;
12use tokio_retry::{strategy::FibonacciBackoff, Retry};
13
14use crate::build::{get_workspace, Workspace};
15use crate::commons::{spawn::SpawnOwner, ErrorCode};
16use crate::watch::sse::handler_sse;
17
18use super::is_http_server_listening::is_http_server_listening;
19use super::watch_opts::WatchOpts;
20
/// Build state published to listeners through the `tokio::sync::watch` channel
/// created in [`run`].
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum Status {
    /// A build has been started; sent at the top of every watch-loop iteration.
    #[default]
    Building,
    /// A build succeeded; carries the watch-loop iteration counter of that build.
    Version(u32),
    /// The last build failed.
    Errors,
}
28
29pub async fn run(mut opts: WatchOpts) -> Result<(), ErrorCode> {
30 log::info!("watch params => {opts:#?}");
31
32 let ws = match get_workspace() {
33 Ok(ws) => ws,
34 Err(err) => {
35 log::error!("Can't read workspace");
36 return Err(err);
37 }
38 };
39
40 let package_name = match opts.build.package_name.as_deref() {
41 Some(name) => name.to_string(),
42 None => match ws.infer_package_name() {
43 Some(name) => {
44 log::info!("Inferred package name = {}", name);
45 opts.build.package_name = Some(name.clone());
46 name
47 }
48 None => {
49 log::error!(
50 "Can't find vertigo project in {} (no cdylib member)",
51 ws.get_root_dir()
52 );
53 return Err(ErrorCode::CantFindCdylibMember);
54 }
55 },
56 };
57
58 log::info!("package_name ==> {package_name:?}");
59
60 let path = ws.find_package_path(&package_name);
61 log::info!("path ==> {path:?}");
62
63 let Some(path) = path else {
64 log::error!("package not found ==> {:?}", opts.build.package_name);
65 return Err(ErrorCode::PackageNameNotFound);
66 };
67
68 let excludes = [path.join("target"), path.join(opts.common.dest_dir.clone())];
69
70 let notify_build = Arc::new(Notify::new());
71
72 let watch_result = notify::RecommendedWatcher::new(
73 {
74 let notify_build = notify_build.clone();
75
76 move |res: Result<notify::Event, _>| match res {
77 Ok(event) => {
78 if event.paths.iter().all(|path| {
79 for exclude_path in &excludes {
80 if path.starts_with(exclude_path) {
81 return true;
82 }
83 }
84 false
85 }) {
86 return;
87 }
88 log::info!("event: {:?}", event);
89 notify_build.notify_one();
90 }
91 Err(e) => {
92 log::error!("watch error: {:?}", e);
93 }
94 }
95 },
96 notify::Config::default().with_poll_interval(std::time::Duration::from_millis(200)),
97 );
98
99 let mut watcher = match watch_result {
100 Ok(watcher) => watcher,
101 Err(error) => {
102 log::error!("error watcher => {error}");
103 return Err(ErrorCode::WatcherError);
104 }
105 };
106
107 let (tx, rx) = tokio::sync::watch::channel(Status::default());
108 let tx = Arc::new(tx);
109
110 tokio::spawn({
111 let cors_middleware = Cors::new()
112 .allow_methods(vec![Method::GET, Method::POST])
113 .max_age(3600);
114
115 let app = Route::new()
116 .at("/events", get(handler_sse))
117 .with(cors_middleware)
118 .data(rx);
119
120 async move {
121 Server::new(TcpListener::bind("127.0.0.1:5555"))
122 .run(app)
123 .await
124 }
125 });
126
127 use notify::Watcher;
128 watcher.watch(&path, RecursiveMode::Recursive).unwrap();
129
130 for watch_path in &opts.add_watch_path {
131 match watcher.watch(Path::new(watch_path), RecursiveMode::Recursive) {
132 Ok(()) => {
133 log::info!("Added `{watch_path}` to watched directories");
134 }
135 Err(err) => {
136 log::error!("Error adding watch dir `{watch_path}`: {err}");
137 return Err(ErrorCode::CantAddWatchDir);
138 }
139 }
140 }
141 let mut version = 0;
142
143 loop {
144 version += 1;
145
146 if let Err(err) = tx.send(Status::Building) {
147 log::error!("Can't contact the browser: {err} (Other watch process already running?)");
148 return Err(ErrorCode::OtherProcessAlreadyRunning);
149 };
150
151 log::info!("build run ...");
152
153 let spawn = build_and_watch(version, tx.clone(), &opts, &ws);
154 notify_build.notified().await;
155 spawn.off();
156 }
157}
158
159fn build_and_watch(
160 version: u32,
161 tx: Arc<Sender<Status>>,
162 opts: &WatchOpts,
163 ws: &Workspace,
164) -> SpawnOwner {
165 let opts = opts.clone();
166 let ws = ws.clone();
167 SpawnOwner::new(async move {
168 sleep(Duration::from_millis(200)).await;
169
170 match crate::build::run_with_ws(opts.to_build_opts(), &ws, true) {
171 Ok(()) => {
172 log::info!("Build successful.");
173
174 let check_spawn = SpawnOwner::new(async move {
175 let _ = Retry::spawn(
176 FibonacciBackoff::from_millis(100).max_delay(Duration::from_secs(4)),
177 || is_http_server_listening(opts.serve.port),
178 )
179 .await;
180
181 let Ok(()) = tx.send(Status::Version(version)) else {
182 exit(ErrorCode::WatchPipeBroken as i32)
183 };
184 });
185
186 let opts = opts.clone();
187
188 log::info!("Spawning serve command...");
189 let (serve_params, port_watch) = opts.to_serve_opts();
190
191 if let Err(error_code) = crate::serve::run(serve_params, Some(port_watch)).await {
192 log::error!("Error {} while running server", error_code as i32);
193 exit(error_code as i32)
194 }
195
196 check_spawn.off();
197 }
198 Err(_) => {
199 log::error!("Build run failed. Waiting for changes...");
200
201 let Ok(()) = tx.send(Status::Errors) else {
202 exit(ErrorCode::WatchPipeBroken as i32)
203 };
204 }
205 };
206 })
207}