1use crate::core::prelude::*;
2use actix::ActorContext;
3use actix::{Actor, AsyncContext, Handler, Message, StreamHandler};
4use actix_web::{web, HttpRequest, HttpResponse};
5use actix_web_actors::ws;
6use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::sync::{Arc, RwLock};
10use std::time::Duration;
11use tokio::sync::broadcast;
12
13#[derive(Debug, Clone, Serialize, Deserialize, Message)]
14#[rtype(result = "()")]
15pub struct FileChangeEvent {
16 pub event_type: String,
17 pub file_path: String,
18 pub server_name: String,
19 pub port: u16,
20 pub timestamp: u64,
21 pub file_extension: Option<String>,
22}
23
24#[derive(Debug)]
25pub struct WatchdogManager {
26 watchers: Arc<RwLock<HashMap<String, RecommendedWatcher>>>,
27 sender: broadcast::Sender<FileChangeEvent>,
28}
29
30impl Default for WatchdogManager {
31 fn default() -> Self {
32 let (sender, _) = broadcast::channel(1000);
33 Self {
34 watchers: Arc::new(RwLock::new(HashMap::new())),
35 sender,
36 }
37 }
38}
39
40impl WatchdogManager {
41 pub fn new() -> Self {
42 Self::default()
43 }
44
45 pub fn subscribe(&self) -> broadcast::Receiver<FileChangeEvent> {
46 self.sender.subscribe()
47 }
48
49 pub fn start_watching(&self, server_name: &str, port: u16) -> Result<()> {
50 let base_dir = crate::core::helpers::get_base_dir()?;
51
52 let watch_path = base_dir
53 .join("www")
54 .join(format!("{}-[{}]", server_name, port));
55
56 if !watch_path.exists() {
57 return Err(AppError::Validation(format!(
58 "Server directory does not exist: {:?}",
59 watch_path
60 )));
61 }
62
63 let server_key = format!("{}:{}", server_name, port);
64 let sender = self.sender.clone();
65 let server_name_owned = server_name.to_owned();
66
67 let mut watcher =
68 notify::recommended_watcher(move |res: notify::Result<Event>| match res {
69 Ok(event) => {
70 if let Err(e) = handle_file_event(&event, &server_name_owned, port, &sender) {
71 log::error!("Error handling file event: {}", e);
72 }
73 }
74 Err(e) => log::error!("Watch error: {:?}", e),
75 })
76 .map_err(|e| AppError::Validation(format!("Failed to create watcher: {}", e)))?;
77
78 watcher
79 .watch(&watch_path, RecursiveMode::Recursive)
80 .map_err(|e| AppError::Validation(format!("Failed to start watching: {}", e)))?;
81
82 let mut watchers = self.watchers.write().unwrap_or_else(|p| p.into_inner());
83 watchers.insert(server_key.clone(), watcher);
84
85 log::info!(
86 "Started file watching for server {} on port {} at {:?}",
87 server_name,
88 port,
89 watch_path
90 );
91 Ok(())
92 }
93
94 pub fn stop_watching(&self, server_name: &str, port: u16) -> Result<()> {
95 let server_key = format!("{}:{}", server_name, port);
96 let mut watchers = self.watchers.write().unwrap_or_else(|p| p.into_inner());
97
98 if let Some(_watcher) = watchers.remove(&server_key) {
99 log::info!(
100 "Stopped file watching for server {} on port {}",
101 server_name,
102 port
103 );
104 }
105
106 Ok(())
107 }
108
109 pub fn get_active_watchers(&self) -> Vec<String> {
110 let watchers = self.watchers.read().unwrap_or_else(|p| p.into_inner());
111 watchers.keys().cloned().collect()
112 }
113}
114
115fn handle_file_event(
116 event: &Event,
117 server_name: &str,
118 port: u16,
119 sender: &broadcast::Sender<FileChangeEvent>,
120) -> Result<()> {
121 let event_type = match event.kind {
123 EventKind::Create(_) => "created",
124 EventKind::Modify(_) => "modified",
125 EventKind::Remove(_) => "deleted",
126 _ => return Ok(()),
127 };
128
129 for path in &event.paths {
130 if let Some(file_name) = path.file_name() {
132 let name = file_name.to_string_lossy();
133 if name.starts_with('.')
134 || name.ends_with('~')
135 || name.contains(".tmp")
136 || name.contains(".swp")
137 {
138 continue;
139 }
140 }
141
142 let file_extension = path
143 .extension()
144 .and_then(|ext| ext.to_str())
145 .map(|s| s.to_string());
146
147 if let Some(ref ext) = file_extension {
149 if ![
150 "html", "css", "js", "json", "txt", "md", "svg", "png", "jpg", "jpeg", "gif", "ico",
151 ]
152 .contains(&ext.as_str())
153 {
154 continue;
155 }
156 }
157
158 let change_event = FileChangeEvent {
159 event_type: event_type.to_string(),
160 file_path: path.to_string_lossy().to_string(),
161 server_name: server_name.to_string(),
162 port,
163 timestamp: std::time::SystemTime::now()
164 .duration_since(std::time::UNIX_EPOCH)
165 .unwrap_or_default()
166 .as_secs(),
167 file_extension,
168 };
169
170 if let Err(e) = sender.send(change_event) {
171 log::error!("Failed to send file change event: {}", e);
172 }
173 }
174
175 Ok(())
176}
177
178pub struct HotReloadWs {
180 receiver: Option<broadcast::Receiver<FileChangeEvent>>,
181 server_filter: Option<String>, }
183
184impl Actor for HotReloadWs {
185 type Context = ws::WebsocketContext<Self>;
186
187 fn started(&mut self, ctx: &mut Self::Context) {
188 log::debug!("WebSocket connection established for hot reload");
189
190 if let Some(mut receiver) = self.receiver.take() {
191 let addr = ctx.address();
192
193 tokio::spawn(async move {
194 loop {
195 match receiver.recv().await {
196 Ok(event) => {
197 addr.do_send(event);
198 }
199 Err(broadcast::error::RecvError::Lagged(skipped)) => {
200 log::warn!("WebSocket lagged, skipped {} events", skipped);
201 }
202 Err(broadcast::error::RecvError::Closed) => {
203 log::debug!("WebSocket event channel closed");
204 break;
205 }
206 }
207 }
208 });
209 }
210
211 ctx.run_interval(Duration::from_secs(30), |_, ctx| {
213 ctx.ping(b"");
214 });
215 }
216}
217
218impl StreamHandler<std::result::Result<ws::Message, ws::ProtocolError>> for HotReloadWs {
219 fn handle(
220 &mut self,
221 msg: std::result::Result<ws::Message, ws::ProtocolError>,
222 ctx: &mut Self::Context,
223 ) {
224 match msg {
225 Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
226 Ok(ws::Message::Pong(_)) => {}
227 Ok(ws::Message::Text(text)) => {
228 log::debug!("WebSocket received: {}", text);
229 }
230 Ok(ws::Message::Close(reason)) => {
231 log::debug!("WebSocket closing: {:?}", reason);
232 ctx.stop();
233 }
234 _ => ctx.stop(),
235 }
236 }
237}
238
239impl Handler<FileChangeEvent> for HotReloadWs {
240 type Result = ();
241
242 fn handle(&mut self, msg: FileChangeEvent, ctx: &mut Self::Context) -> Self::Result {
243 if let Some(ref filter) = self.server_filter {
245 let event_key = format!("{}:{}", msg.server_name, msg.port);
246 if *filter != event_key {
247 return;
248 }
249 }
250
251 let json = match serde_json::to_string(&msg) {
252 Ok(json) => json,
253 Err(e) => {
254 log::error!("Failed to serialize file change event: {}", e);
255 return;
256 }
257 };
258
259 ctx.text(json);
260 }
261}
262
263pub async fn ws_hot_reload(
267 req: HttpRequest,
268 stream: web::Payload,
269 data: web::Data<WatchdogManager>,
270) -> std::result::Result<HttpResponse, actix_web::Error> {
271 let server_filter = req
272 .query_string()
273 .split('&')
274 .find_map(|param| {
275 if param.starts_with("server=") {
276 param.strip_prefix("server=")
277 } else {
278 None
279 }
280 })
281 .map(|s| s.to_string());
282
283 let ws_actor = HotReloadWs {
284 receiver: Some(data.subscribe()),
285 server_filter,
286 };
287
288 ws::start(ws_actor, &req, stream)
289}
290
291use std::sync::OnceLock;
293static WATCHDOG_MANAGER: OnceLock<Arc<WatchdogManager>> = OnceLock::new();
294
295pub fn get_watchdog_manager() -> &'static Arc<WatchdogManager> {
296 WATCHDOG_MANAGER.get_or_init(|| Arc::new(WatchdogManager::new()))
297}
298
299pub fn start_server_watching(server_name: &str, port: u16) -> Result<()> {
300 get_watchdog_manager().start_watching(server_name, port)
301}
302
303pub fn stop_server_watching(server_name: &str, port: u16) -> Result<()> {
304 get_watchdog_manager().stop_watching(server_name, port)
305}