rusty_files/server/
websocket.rs1use actix_web::{web, Error, HttpRequest, HttpResponse};
2use actix_ws::Message;
3use futures::StreamExt;
4use serde::Deserialize;
5use std::path::PathBuf;
6use tracing::info;
7
8use crate::server::models::FileEventType;
9use crate::server::state::AppState;
10
11pub async fn websocket_handler(
12 req: HttpRequest,
13 stream: web::Payload,
14 state: web::Data<AppState>,
15) -> Result<HttpResponse, Error> {
16 info!("WebSocket connection request");
17
18 let (res, mut session, mut stream) = actix_ws::handle(&req, stream)?;
19
20 let mut event_rx = state.event_tx.subscribe();
22
23 actix_web::rt::spawn(async move {
25 let close_reason = loop {
26 tokio::select! {
27 Some(Ok(msg)) = stream.next() => {
29 match msg {
30 Message::Text(text) => {
31 if let Ok(filter) = serde_json::from_str::<EventFilter>(&text.to_string()) {
33 info!("Received filter: {:?}", filter);
34 }
36 }
37 Message::Ping(bytes) => {
38 if session.pong(&bytes).await.is_err() {
39 break None;
40 }
41 }
42 Message::Close(reason) => {
43 break reason;
44 }
45 _ => {}
46 }
47 }
48 Ok(event) = event_rx.recv() => {
50 if let Ok(json) = serde_json::to_string(&event) {
51 if session.text(json).await.is_err() {
52 break None;
53 }
54 }
55 }
56 else => break None
57 }
58 };
59
60 let _ = session.close(close_reason).await;
61 info!("WebSocket connection closed");
62 });
63
64 Ok(res)
65}
66
67#[derive(Debug, Deserialize)]
68#[allow(dead_code)]
69struct EventFilter {
70 paths: Option<Vec<PathBuf>>,
71 event_types: Option<Vec<FileEventType>>,
72}