rush_sync_server/server/
watchdog.rs

1use crate::core::prelude::*;
2use actix::ActorContext;
3use actix::{Actor, AsyncContext, Handler, Message, StreamHandler};
4use actix_web::{web, HttpRequest, HttpResponse};
5use actix_web_actors::ws;
6use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::sync::{Arc, RwLock};
10use std::time::Duration;
11use tokio::sync::broadcast;
12
/// A single file-system change, broadcast to hot-reload subscribers and
/// delivered to WebSocket actors as an actix message (result type `()`).
#[derive(Debug, Clone, Serialize, Deserialize, Message)]
#[rtype(result = "()")]
pub struct FileChangeEvent {
    /// Kind of change: `"created"`, `"modified"` or `"deleted"`.
    pub event_type: String,
    /// Path of the affected file, converted lossily to a string.
    pub file_path: String,
    /// Name of the server whose directory is being watched.
    pub server_name: String,
    /// Port of the server whose directory is being watched.
    pub port: u16,
    /// Time the event was built, in whole seconds since the Unix epoch.
    pub timestamp: u64,
    /// Extension of the affected file, if it has one that is valid UTF-8.
    pub file_extension: Option<String>,
}
23
/// Owns the file watchers of all servers and the broadcast channel their
/// change events are published on.
#[derive(Debug)]
pub struct WatchdogManager {
    /// Active watchers, keyed by `"name:port"`. A watcher is dropped when its
    /// entry is removed or replaced.
    watchers: Arc<RwLock<HashMap<String, RecommendedWatcher>>>,
    /// Sending half of the change-event channel; cloned into every watcher callback.
    sender: broadcast::Sender<FileChangeEvent>,
}
29
30impl Default for WatchdogManager {
31    fn default() -> Self {
32        let (sender, _) = broadcast::channel(1000);
33        Self {
34            watchers: Arc::new(RwLock::new(HashMap::new())),
35            sender,
36        }
37    }
38}
39
40impl WatchdogManager {
41    pub fn new() -> Self {
42        Self::default()
43    }
44
45    pub fn subscribe(&self) -> broadcast::Receiver<FileChangeEvent> {
46        self.sender.subscribe()
47    }
48
49    pub fn start_watching(&self, server_name: &str, port: u16) -> Result<()> {
50        let exe_path = std::env::current_exe().map_err(AppError::Io)?;
51        let base_dir = exe_path.parent().ok_or_else(|| {
52            AppError::Validation("Cannot determine executable directory".to_string())
53        })?;
54
55        let watch_path = base_dir
56            .join("www")
57            .join(format!("{}-[{}]", server_name, port));
58
59        if !watch_path.exists() {
60            return Err(AppError::Validation(format!(
61                "Server directory does not exist: {:?}",
62                watch_path
63            )));
64        }
65
66        let server_key = format!("{}:{}", server_name, port);
67        let sender = self.sender.clone();
68        let server_name_owned = server_name.to_owned();
69
70        let mut watcher =
71            notify::recommended_watcher(move |res: notify::Result<Event>| match res {
72                Ok(event) => {
73                    if let Err(e) = handle_file_event(&event, &server_name_owned, port, &sender) {
74                        log::error!("Error handling file event: {}", e);
75                    }
76                }
77                Err(e) => log::error!("Watch error: {:?}", e),
78            })
79            .map_err(|e| AppError::Validation(format!("Failed to create watcher: {}", e)))?;
80
81        watcher
82            .watch(&watch_path, RecursiveMode::Recursive)
83            .map_err(|e| AppError::Validation(format!("Failed to start watching: {}", e)))?;
84
85        let mut watchers = self.watchers.write().unwrap();
86        watchers.insert(server_key.clone(), watcher);
87
88        log::info!(
89            "Started file watching for server {} on port {} at {:?}",
90            server_name,
91            port,
92            watch_path
93        );
94        Ok(())
95    }
96
97    pub fn stop_watching(&self, server_name: &str, port: u16) -> Result<()> {
98        let server_key = format!("{}:{}", server_name, port);
99        let mut watchers = self.watchers.write().unwrap();
100
101        if let Some(_watcher) = watchers.remove(&server_key) {
102            log::info!(
103                "Stopped file watching for server {} on port {}",
104                server_name,
105                port
106            );
107        }
108
109        Ok(())
110    }
111
112    pub fn get_active_watchers(&self) -> Vec<String> {
113        let watchers = self.watchers.read().unwrap();
114        watchers.keys().cloned().collect()
115    }
116}
117
118fn handle_file_event(
119    event: &Event,
120    server_name: &str,
121    port: u16,
122    sender: &broadcast::Sender<FileChangeEvent>,
123) -> Result<()> {
124    // Nur relevante Events verarbeiten
125    let event_type = match event.kind {
126        EventKind::Create(_) => "created",
127        EventKind::Modify(_) => "modified",
128        EventKind::Remove(_) => "deleted",
129        _ => return Ok(()), // Ignore andere Events
130    };
131
132    for path in &event.paths {
133        // Skip temporäre Dateien und Backups
134        if let Some(file_name) = path.file_name() {
135            let name = file_name.to_string_lossy();
136            if name.starts_with('.')
137                || name.ends_with('~')
138                || name.contains(".tmp")
139                || name.contains(".swp")
140            {
141                continue;
142            }
143        }
144
145        let file_extension = path
146            .extension()
147            .and_then(|ext| ext.to_str())
148            .map(|s| s.to_string());
149
150        // Nur Web-relevante Dateien
151        if let Some(ref ext) = file_extension {
152            if ![
153                "html", "css", "js", "json", "txt", "md", "svg", "png", "jpg", "jpeg", "gif", "ico",
154            ]
155            .contains(&ext.as_str())
156            {
157                continue;
158            }
159        }
160
161        let change_event = FileChangeEvent {
162            event_type: event_type.to_string(),
163            file_path: path.to_string_lossy().to_string(),
164            server_name: server_name.to_string(),
165            port,
166            timestamp: std::time::SystemTime::now()
167                .duration_since(std::time::UNIX_EPOCH)
168                .unwrap_or_default()
169                .as_secs(),
170            file_extension,
171        };
172
173        if let Err(e) = sender.send(change_event) {
174            log::error!("Failed to send file change event: {}", e);
175        }
176    }
177
178    Ok(())
179}
180
// WebSocket actor for hot reload
/// Per-connection actor that forwards [`FileChangeEvent`]s to a WebSocket client.
pub struct HotReloadWs {
    /// Broadcast receiver for change events; taken out when the actor starts.
    receiver: Option<broadcast::Receiver<FileChangeEvent>>,
    /// When set, only events whose `"name:port"` key equals this value are forwarded.
    server_filter: Option<String>, // Format: "name:port"
}
186
187impl Actor for HotReloadWs {
188    type Context = ws::WebsocketContext<Self>;
189
190    fn started(&mut self, ctx: &mut Self::Context) {
191        log::debug!("WebSocket connection established for hot reload");
192
193        if let Some(mut receiver) = self.receiver.take() {
194            let addr = ctx.address();
195
196            tokio::spawn(async move {
197                loop {
198                    match receiver.recv().await {
199                        Ok(event) => {
200                            addr.do_send(event);
201                        }
202                        Err(broadcast::error::RecvError::Lagged(skipped)) => {
203                            log::warn!("WebSocket lagged, skipped {} events", skipped);
204                        }
205                        Err(broadcast::error::RecvError::Closed) => {
206                            log::debug!("WebSocket event channel closed");
207                            break;
208                        }
209                    }
210                }
211            });
212        }
213
214        // Ping alle 30 Sekunden
215        ctx.run_interval(Duration::from_secs(30), |_, ctx| {
216            ctx.ping(b"");
217        });
218    }
219}
220
221impl StreamHandler<std::result::Result<ws::Message, ws::ProtocolError>> for HotReloadWs {
222    fn handle(
223        &mut self,
224        msg: std::result::Result<ws::Message, ws::ProtocolError>,
225        ctx: &mut Self::Context,
226    ) {
227        match msg {
228            Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
229            Ok(ws::Message::Pong(_)) => {}
230            Ok(ws::Message::Text(text)) => {
231                log::debug!("WebSocket received: {}", text);
232            }
233            Ok(ws::Message::Close(reason)) => {
234                log::debug!("WebSocket closing: {:?}", reason);
235                ctx.stop();
236            }
237            _ => ctx.stop(),
238        }
239    }
240}
241
242impl Handler<FileChangeEvent> for HotReloadWs {
243    type Result = ();
244
245    fn handle(&mut self, msg: FileChangeEvent, ctx: &mut Self::Context) -> Self::Result {
246        // Filter nach Server wenn gesetzt
247        if let Some(ref filter) = self.server_filter {
248            let event_key = format!("{}:{}", msg.server_name, msg.port);
249            if *filter != event_key {
250                return;
251            }
252        }
253
254        let json = match serde_json::to_string(&msg) {
255            Ok(json) => json,
256            Err(e) => {
257                log::error!("Failed to serialize file change event: {}", e);
258                return;
259            }
260        };
261
262        ctx.text(json);
263    }
264}
265
266// WebSocket Endpoint Handler
267pub async fn ws_hot_reload(
268    req: HttpRequest,
269    stream: web::Payload,
270    data: web::Data<Arc<WatchdogManager>>,
271) -> std::result::Result<HttpResponse, actix_web::Error> {
272    let server_filter = req
273        .query_string()
274        .split('&')
275        .find_map(|param| {
276            if param.starts_with("server=") {
277                param.strip_prefix("server=")
278            } else {
279                None
280            }
281        })
282        .map(|s| s.to_string());
283
284    let ws_actor = HotReloadWs {
285        receiver: Some(data.subscribe()),
286        server_filter,
287    };
288
289    ws::start(ws_actor, &req, stream)
290}
291
// Static global for the manager: initialized lazily, at most once, and shared
// by the free functions below.
use std::sync::OnceLock;
static WATCHDOG_MANAGER: OnceLock<Arc<WatchdogManager>> = OnceLock::new();
295
296pub fn get_watchdog_manager() -> &'static Arc<WatchdogManager> {
297    WATCHDOG_MANAGER.get_or_init(|| Arc::new(WatchdogManager::new()))
298}
299
300pub fn start_server_watching(server_name: &str, port: u16) -> Result<()> {
301    get_watchdog_manager().start_watching(server_name, port)
302}
303
304pub fn stop_server_watching(server_name: &str, port: u16) -> Result<()> {
305    get_watchdog_manager().stop_watching(server_name, port)
306}