1use crate::core::prelude::*;
2use actix::ActorContext;
3use actix::{Actor, AsyncContext, Handler, Message, StreamHandler};
4use actix_web::{web, HttpRequest, HttpResponse};
5use actix_web_actors::ws;
6use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::sync::{Arc, RwLock};
10use std::time::Duration;
11use tokio::sync::broadcast;
12
/// A single file-system change observed under a watched server directory.
///
/// The type is serde-serializable and is also an actix message whose handler
/// result type is `()`.
#[derive(Debug, Clone, Serialize, Deserialize, Message)]
#[rtype(result = "()")]
pub struct FileChangeEvent {
    /// Kind of change; `handle_file_event` emits `"created"`, `"modified"` or `"deleted"`.
    pub event_type: String,
    /// Path of the affected file, converted lossily to a string.
    pub file_path: String,
    /// Name of the server whose directory produced the event.
    pub server_name: String,
    /// Port of the server whose directory produced the event.
    pub port: u16,
    /// Time the event was built, in whole seconds since the Unix epoch.
    pub timestamp: u64,
    /// Extension of the affected file, if it has one that is valid UTF-8.
    pub file_extension: Option<String>,
}
23
/// Owns the file-system watchers of all watched servers and the broadcast
/// channel their change events are published on.
#[derive(Debug)]
pub struct WatchdogManager {
    /// Active watchers, keyed by `"<server_name>:<port>"`.
    watchers: Arc<RwLock<HashMap<String, RecommendedWatcher>>>,
    /// Sending half of the broadcast channel; receivers are handed out by `subscribe`.
    sender: broadcast::Sender<FileChangeEvent>,
}
29
30impl Default for WatchdogManager {
31 fn default() -> Self {
32 let (sender, _) = broadcast::channel(1000);
33 Self {
34 watchers: Arc::new(RwLock::new(HashMap::new())),
35 sender,
36 }
37 }
38}
39
40impl WatchdogManager {
41 pub fn new() -> Self {
42 Self::default()
43 }
44
45 pub fn subscribe(&self) -> broadcast::Receiver<FileChangeEvent> {
46 self.sender.subscribe()
47 }
48
49 pub fn start_watching(&self, server_name: &str, port: u16) -> Result<()> {
50 let exe_path = std::env::current_exe().map_err(AppError::Io)?;
51 let base_dir = exe_path.parent().ok_or_else(|| {
52 AppError::Validation("Cannot determine executable directory".to_string())
53 })?;
54
55 let watch_path = base_dir
56 .join("www")
57 .join(format!("{}-[{}]", server_name, port));
58
59 if !watch_path.exists() {
60 return Err(AppError::Validation(format!(
61 "Server directory does not exist: {:?}",
62 watch_path
63 )));
64 }
65
66 let server_key = format!("{}:{}", server_name, port);
67 let sender = self.sender.clone();
68 let server_name_owned = server_name.to_owned();
69
70 let mut watcher =
71 notify::recommended_watcher(move |res: notify::Result<Event>| match res {
72 Ok(event) => {
73 if let Err(e) = handle_file_event(&event, &server_name_owned, port, &sender) {
74 log::error!("Error handling file event: {}", e);
75 }
76 }
77 Err(e) => log::error!("Watch error: {:?}", e),
78 })
79 .map_err(|e| AppError::Validation(format!("Failed to create watcher: {}", e)))?;
80
81 watcher
82 .watch(&watch_path, RecursiveMode::Recursive)
83 .map_err(|e| AppError::Validation(format!("Failed to start watching: {}", e)))?;
84
85 let mut watchers = self.watchers.write().unwrap();
86 watchers.insert(server_key.clone(), watcher);
87
88 log::info!(
89 "Started file watching for server {} on port {} at {:?}",
90 server_name,
91 port,
92 watch_path
93 );
94 Ok(())
95 }
96
97 pub fn stop_watching(&self, server_name: &str, port: u16) -> Result<()> {
98 let server_key = format!("{}:{}", server_name, port);
99 let mut watchers = self.watchers.write().unwrap();
100
101 if let Some(_watcher) = watchers.remove(&server_key) {
102 log::info!(
103 "Stopped file watching for server {} on port {}",
104 server_name,
105 port
106 );
107 }
108
109 Ok(())
110 }
111
112 pub fn get_active_watchers(&self) -> Vec<String> {
113 let watchers = self.watchers.read().unwrap();
114 watchers.keys().cloned().collect()
115 }
116}
117
118fn handle_file_event(
119 event: &Event,
120 server_name: &str,
121 port: u16,
122 sender: &broadcast::Sender<FileChangeEvent>,
123) -> Result<()> {
124 let event_type = match event.kind {
126 EventKind::Create(_) => "created",
127 EventKind::Modify(_) => "modified",
128 EventKind::Remove(_) => "deleted",
129 _ => return Ok(()), };
131
132 for path in &event.paths {
133 if let Some(file_name) = path.file_name() {
135 let name = file_name.to_string_lossy();
136 if name.starts_with('.')
137 || name.ends_with('~')
138 || name.contains(".tmp")
139 || name.contains(".swp")
140 {
141 continue;
142 }
143 }
144
145 let file_extension = path
146 .extension()
147 .and_then(|ext| ext.to_str())
148 .map(|s| s.to_string());
149
150 if let Some(ref ext) = file_extension {
152 if ![
153 "html", "css", "js", "json", "txt", "md", "svg", "png", "jpg", "jpeg", "gif", "ico",
154 ]
155 .contains(&ext.as_str())
156 {
157 continue;
158 }
159 }
160
161 let change_event = FileChangeEvent {
162 event_type: event_type.to_string(),
163 file_path: path.to_string_lossy().to_string(),
164 server_name: server_name.to_string(),
165 port,
166 timestamp: std::time::SystemTime::now()
167 .duration_since(std::time::UNIX_EPOCH)
168 .unwrap_or_default()
169 .as_secs(),
170 file_extension,
171 };
172
173 if let Err(e) = sender.send(change_event) {
174 log::error!("Failed to send file change event: {}", e);
175 }
176 }
177
178 Ok(())
179}
180
/// WebSocket actor that pushes file change events to one connected client.
pub struct HotReloadWs {
    /// Event subscription; taken out of the struct when the actor starts.
    receiver: Option<broadcast::Receiver<FileChangeEvent>>,
    /// When set, only events whose `"<server_name>:<port>"` key equals this
    /// value are sent to the client; `None` sends every event.
    server_filter: Option<String>,
}
186
187impl Actor for HotReloadWs {
188 type Context = ws::WebsocketContext<Self>;
189
190 fn started(&mut self, ctx: &mut Self::Context) {
191 log::debug!("WebSocket connection established for hot reload");
192
193 if let Some(mut receiver) = self.receiver.take() {
194 let addr = ctx.address();
195
196 tokio::spawn(async move {
197 loop {
198 match receiver.recv().await {
199 Ok(event) => {
200 addr.do_send(event);
201 }
202 Err(broadcast::error::RecvError::Lagged(skipped)) => {
203 log::warn!("WebSocket lagged, skipped {} events", skipped);
204 }
205 Err(broadcast::error::RecvError::Closed) => {
206 log::debug!("WebSocket event channel closed");
207 break;
208 }
209 }
210 }
211 });
212 }
213
214 ctx.run_interval(Duration::from_secs(30), |_, ctx| {
216 ctx.ping(b"");
217 });
218 }
219}
220
221impl StreamHandler<std::result::Result<ws::Message, ws::ProtocolError>> for HotReloadWs {
222 fn handle(
223 &mut self,
224 msg: std::result::Result<ws::Message, ws::ProtocolError>,
225 ctx: &mut Self::Context,
226 ) {
227 match msg {
228 Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
229 Ok(ws::Message::Pong(_)) => {}
230 Ok(ws::Message::Text(text)) => {
231 log::debug!("WebSocket received: {}", text);
232 }
233 Ok(ws::Message::Close(reason)) => {
234 log::debug!("WebSocket closing: {:?}", reason);
235 ctx.stop();
236 }
237 _ => ctx.stop(),
238 }
239 }
240}
241
242impl Handler<FileChangeEvent> for HotReloadWs {
243 type Result = ();
244
245 fn handle(&mut self, msg: FileChangeEvent, ctx: &mut Self::Context) -> Self::Result {
246 if let Some(ref filter) = self.server_filter {
248 let event_key = format!("{}:{}", msg.server_name, msg.port);
249 if *filter != event_key {
250 return;
251 }
252 }
253
254 let json = match serde_json::to_string(&msg) {
255 Ok(json) => json,
256 Err(e) => {
257 log::error!("Failed to serialize file change event: {}", e);
258 return;
259 }
260 };
261
262 ctx.text(json);
263 }
264}
265
266pub async fn ws_hot_reload(
268 req: HttpRequest,
269 stream: web::Payload,
270 data: web::Data<Arc<WatchdogManager>>,
271) -> std::result::Result<HttpResponse, actix_web::Error> {
272 let server_filter = req
273 .query_string()
274 .split('&')
275 .find_map(|param| {
276 if param.starts_with("server=") {
277 param.strip_prefix("server=")
278 } else {
279 None
280 }
281 })
282 .map(|s| s.to_string());
283
284 let ws_actor = HotReloadWs {
285 receiver: Some(data.subscribe()),
286 server_filter,
287 };
288
289 ws::start(ws_actor, &req, stream)
290}
291
292use std::sync::OnceLock;
/// Process-wide `WatchdogManager`, created on first use by `get_watchdog_manager`.
static WATCHDOG_MANAGER: OnceLock<Arc<WatchdogManager>> = OnceLock::new();
295
296pub fn get_watchdog_manager() -> &'static Arc<WatchdogManager> {
297 WATCHDOG_MANAGER.get_or_init(|| Arc::new(WatchdogManager::new()))
298}
299
300pub fn start_server_watching(server_name: &str, port: u16) -> Result<()> {
301 get_watchdog_manager().start_watching(server_name, port)
302}
303
304pub fn stop_server_watching(server_name: &str, port: u16) -> Result<()> {
305 get_watchdog_manager().stop_watching(server_name, port)
306}