rusty_files/server/
websocket.rs

1use actix_web::{web, Error, HttpRequest, HttpResponse};
2use actix_ws::Message;
3use futures::StreamExt;
4use serde::Deserialize;
5use std::path::PathBuf;
6use tracing::info;
7
8use crate::server::models::FileEventType;
9use crate::server::state::AppState;
10
11pub async fn websocket_handler(
12    req: HttpRequest,
13    stream: web::Payload,
14    state: web::Data<AppState>,
15) -> Result<HttpResponse, Error> {
16    info!("WebSocket connection request");
17
18    let (res, mut session, mut stream) = actix_ws::handle(&req, stream)?;
19
20    // Subscribe to file change events
21    let mut event_rx = state.event_tx.subscribe();
22
23    // Spawn task to forward events to WebSocket
24    actix_web::rt::spawn(async move {
25        let close_reason = loop {
26            tokio::select! {
27                // Handle incoming WebSocket messages
28                Some(Ok(msg)) = stream.next() => {
29                    match msg {
30                        Message::Text(text) => {
31                            // Handle client messages (e.g., filter events)
32                            if let Ok(filter) = serde_json::from_str::<EventFilter>(&text.to_string()) {
33                                info!("Received filter: {:?}", filter);
34                                // Apply filter logic (for future enhancement)
35                            }
36                        }
37                        Message::Ping(bytes) => {
38                            if session.pong(&bytes).await.is_err() {
39                                break None;
40                            }
41                        }
42                        Message::Close(reason) => {
43                            break reason;
44                        }
45                        _ => {}
46                    }
47                }
48                // Forward file change events to client
49                Ok(event) = event_rx.recv() => {
50                    if let Ok(json) = serde_json::to_string(&event) {
51                        if session.text(json).await.is_err() {
52                            break None;
53                        }
54                    }
55                }
56                else => break None
57            }
58        };
59
60        let _ = session.close(close_reason).await;
61        info!("WebSocket connection closed");
62    });
63
64    Ok(res)
65}
66
67#[derive(Debug, Deserialize)]
68#[allow(dead_code)]
69struct EventFilter {
70    paths: Option<Vec<PathBuf>>,
71    event_types: Option<Vec<FileEventType>>,
72}