Skip to main content

rush_sync_server/server/
watchdog.rs

1use crate::core::prelude::*;
2use actix::ActorContext;
3use actix::{Actor, AsyncContext, Handler, Message, StreamHandler};
4use actix_web::{web, HttpRequest, HttpResponse};
5use actix_web_actors::ws;
6use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::sync::{Arc, RwLock};
10use std::time::Duration;
11use tokio::sync::broadcast;
12
13#[derive(Debug, Clone, Serialize, Deserialize, Message)]
14#[rtype(result = "()")]
15pub struct FileChangeEvent {
16    pub event_type: String,
17    pub file_path: String,
18    pub server_name: String,
19    pub port: u16,
20    pub timestamp: u64,
21    pub file_extension: Option<String>,
22}
23
24#[derive(Debug)]
25pub struct WatchdogManager {
26    watchers: Arc<RwLock<HashMap<String, RecommendedWatcher>>>,
27    sender: broadcast::Sender<FileChangeEvent>,
28}
29
30impl Default for WatchdogManager {
31    fn default() -> Self {
32        let (sender, _) = broadcast::channel(1000);
33        Self {
34            watchers: Arc::new(RwLock::new(HashMap::new())),
35            sender,
36        }
37    }
38}
39
40impl WatchdogManager {
41    pub fn new() -> Self {
42        Self::default()
43    }
44
45    pub fn subscribe(&self) -> broadcast::Receiver<FileChangeEvent> {
46        self.sender.subscribe()
47    }
48
49    pub fn start_watching(&self, server_name: &str, port: u16) -> Result<()> {
50        let base_dir = crate::core::helpers::get_base_dir()?;
51
52        let watch_path = base_dir
53            .join("www")
54            .join(format!("{}-[{}]", server_name, port));
55
56        if !watch_path.exists() {
57            return Err(AppError::Validation(format!(
58                "Server directory does not exist: {:?}",
59                watch_path
60            )));
61        }
62
63        let server_key = format!("{}:{}", server_name, port);
64        let sender = self.sender.clone();
65        let server_name_owned = server_name.to_owned();
66
67        let mut watcher =
68            notify::recommended_watcher(move |res: notify::Result<Event>| match res {
69                Ok(event) => {
70                    if let Err(e) = handle_file_event(&event, &server_name_owned, port, &sender) {
71                        log::error!("Error handling file event: {}", e);
72                    }
73                }
74                Err(e) => log::error!("Watch error: {:?}", e),
75            })
76            .map_err(|e| AppError::Validation(format!("Failed to create watcher: {}", e)))?;
77
78        watcher
79            .watch(&watch_path, RecursiveMode::Recursive)
80            .map_err(|e| AppError::Validation(format!("Failed to start watching: {}", e)))?;
81
82        let mut watchers = self.watchers.write().unwrap_or_else(|p| p.into_inner());
83        watchers.insert(server_key.clone(), watcher);
84
85        log::info!(
86            "Started file watching for server {} on port {} at {:?}",
87            server_name,
88            port,
89            watch_path
90        );
91        Ok(())
92    }
93
94    pub fn stop_watching(&self, server_name: &str, port: u16) -> Result<()> {
95        let server_key = format!("{}:{}", server_name, port);
96        let mut watchers = self.watchers.write().unwrap_or_else(|p| p.into_inner());
97
98        if let Some(_watcher) = watchers.remove(&server_key) {
99            log::info!(
100                "Stopped file watching for server {} on port {}",
101                server_name,
102                port
103            );
104        }
105
106        Ok(())
107    }
108
109    pub fn get_active_watchers(&self) -> Vec<String> {
110        let watchers = self.watchers.read().unwrap_or_else(|p| p.into_inner());
111        watchers.keys().cloned().collect()
112    }
113}
114
115fn handle_file_event(
116    event: &Event,
117    server_name: &str,
118    port: u16,
119    sender: &broadcast::Sender<FileChangeEvent>,
120) -> Result<()> {
121    // Only process relevant events
122    let event_type = match event.kind {
123        EventKind::Create(_) => "created",
124        EventKind::Modify(_) => "modified",
125        EventKind::Remove(_) => "deleted",
126        _ => return Ok(()),
127    };
128
129    for path in &event.paths {
130        // Skip temporary files and backups
131        if let Some(file_name) = path.file_name() {
132            let name = file_name.to_string_lossy();
133            if name.starts_with('.')
134                || name.ends_with('~')
135                || name.contains(".tmp")
136                || name.contains(".swp")
137            {
138                continue;
139            }
140        }
141
142        let file_extension = path
143            .extension()
144            .and_then(|ext| ext.to_str())
145            .map(|s| s.to_string());
146
147        // Only web-relevant file types
148        if let Some(ref ext) = file_extension {
149            if ![
150                "html", "css", "js", "json", "txt", "md", "svg", "png", "jpg", "jpeg", "gif", "ico",
151            ]
152            .contains(&ext.as_str())
153            {
154                continue;
155            }
156        }
157
158        let change_event = FileChangeEvent {
159            event_type: event_type.to_string(),
160            file_path: path.to_string_lossy().to_string(),
161            server_name: server_name.to_string(),
162            port,
163            timestamp: std::time::SystemTime::now()
164                .duration_since(std::time::UNIX_EPOCH)
165                .unwrap_or_default()
166                .as_secs(),
167            file_extension,
168        };
169
170        if let Err(e) = sender.send(change_event) {
171            log::error!("Failed to send file change event: {}", e);
172        }
173    }
174
175    Ok(())
176}
177
178// WebSocket actor for hot reload
179pub struct HotReloadWs {
180    receiver: Option<broadcast::Receiver<FileChangeEvent>>,
181    server_filter: Option<String>, // Format: "name:port"
182}
183
184impl Actor for HotReloadWs {
185    type Context = ws::WebsocketContext<Self>;
186
187    fn started(&mut self, ctx: &mut Self::Context) {
188        log::debug!("WebSocket connection established for hot reload");
189
190        if let Some(mut receiver) = self.receiver.take() {
191            let addr = ctx.address();
192
193            tokio::spawn(async move {
194                loop {
195                    match receiver.recv().await {
196                        Ok(event) => {
197                            addr.do_send(event);
198                        }
199                        Err(broadcast::error::RecvError::Lagged(skipped)) => {
200                            log::warn!("WebSocket lagged, skipped {} events", skipped);
201                        }
202                        Err(broadcast::error::RecvError::Closed) => {
203                            log::debug!("WebSocket event channel closed");
204                            break;
205                        }
206                    }
207                }
208            });
209        }
210
211        // Ping every 30 seconds to keep connection alive
212        ctx.run_interval(Duration::from_secs(30), |_, ctx| {
213            ctx.ping(b"");
214        });
215    }
216}
217
218impl StreamHandler<std::result::Result<ws::Message, ws::ProtocolError>> for HotReloadWs {
219    fn handle(
220        &mut self,
221        msg: std::result::Result<ws::Message, ws::ProtocolError>,
222        ctx: &mut Self::Context,
223    ) {
224        match msg {
225            Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
226            Ok(ws::Message::Pong(_)) => {}
227            Ok(ws::Message::Text(text)) => {
228                log::debug!("WebSocket received: {}", text);
229            }
230            Ok(ws::Message::Close(reason)) => {
231                log::debug!("WebSocket closing: {:?}", reason);
232                ctx.stop();
233            }
234            _ => ctx.stop(),
235        }
236    }
237}
238
239impl Handler<FileChangeEvent> for HotReloadWs {
240    type Result = ();
241
242    fn handle(&mut self, msg: FileChangeEvent, ctx: &mut Self::Context) -> Self::Result {
243        // Filter by server if a filter is set
244        if let Some(ref filter) = self.server_filter {
245            let event_key = format!("{}:{}", msg.server_name, msg.port);
246            if *filter != event_key {
247                return;
248            }
249        }
250
251        let json = match serde_json::to_string(&msg) {
252            Ok(json) => json,
253            Err(e) => {
254                log::error!("Failed to serialize file change event: {}", e);
255                return;
256            }
257        };
258
259        ctx.text(json);
260    }
261}
262
263// WebSocket Endpoint Handler
264// Note: app_data registers Data::from(Arc<WatchdogManager>) which yields Data<WatchdogManager>,
265// so the parameter type must match exactly (not Data<Arc<WatchdogManager>>).
266pub async fn ws_hot_reload(
267    req: HttpRequest,
268    stream: web::Payload,
269    data: web::Data<WatchdogManager>,
270) -> std::result::Result<HttpResponse, actix_web::Error> {
271    let server_filter = req
272        .query_string()
273        .split('&')
274        .find_map(|param| {
275            if param.starts_with("server=") {
276                param.strip_prefix("server=")
277            } else {
278                None
279            }
280        })
281        .map(|s| s.to_string());
282
283    let ws_actor = HotReloadWs {
284        receiver: Some(data.subscribe()),
285        server_filter,
286    };
287
288    ws::start(ws_actor, &req, stream)
289}
290
291// Global watchdog manager singleton
292use std::sync::OnceLock;
293static WATCHDOG_MANAGER: OnceLock<Arc<WatchdogManager>> = OnceLock::new();
294
295pub fn get_watchdog_manager() -> &'static Arc<WatchdogManager> {
296    WATCHDOG_MANAGER.get_or_init(|| Arc::new(WatchdogManager::new()))
297}
298
299pub fn start_server_watching(server_name: &str, port: u16) -> Result<()> {
300    get_watchdog_manager().start_watching(server_name, port)
301}
302
303pub fn stop_server_watching(server_name: &str, port: u16) -> Result<()> {
304    get_watchdog_manager().stop_watching(server_name, port)
305}