1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
use futures_util::{sink::SinkExt, stream::StreamExt};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use warp::ws::{Message as SocketMessage, WebSocket};
use warp::Filter;
#[derive(Serialize, Deserialize, Debug)]
pub struct GenericMessage {
pub msg_type: String,
pub payload: Value,
}
type UserId = String;
type Channel = String;
type ClientSender = mpsc::UnboundedSender<SocketMessage>;
#[derive(Debug, Clone)]
pub struct Client {
user_id: UserId,
sender: ClientSender,
subscriptions: HashSet<Channel>,
}
#[derive(Debug, Clone)]
pub struct CnctdSocket {
clients: Arc<RwLock<HashMap<UserId, Client>>>,
subscriptions: Arc<RwLock<HashMap<Channel, HashSet<UserId>>>>,
}
impl CnctdSocket {
pub fn new() -> Self {
Self {
clients: Arc::new(RwLock::new(HashMap::new())),
subscriptions: Arc::new(RwLock::new(HashMap::new())),
}
}
// pub async fn start(port: &str) -> anyhow::Result<()> {
// let socket_server = Arc::new(Self::new());
// let ws_route = warp::path("ws")
// .and(warp::ws())
// .map(move |ws: warp::ws::Ws| {
// let server_clone = Arc::clone(&socket_server);
// ws.on_upgrade(move |socket| server_clone.handle_ws_connection(socket))
// });
// let ip_address: [u8; 4] = [0, 0, 0, 0];
// let parsed_port = port.parse::<u16>()?;
// let socket_addr = std::net::SocketAddr::from((ip_address, parsed_port));
// warp::serve(ws_route)
// .run(socket_addr)
// .await;
// Ok(())
// }
// async fn handle_ws_connection(&self, ws: WebSocket) {
// let (mut sender, mut receiver) = ws.split();
// while let Some(result) = receiver.next().await {
// match result {
// Ok(msg) if msg.is_text() => {
// if let Ok(text) = msg.to_str() {
// if let Ok(generic_msg) = serde_json::from_str::<GenericMessage>(text) {
// println!("Received message: {:?}", generic_msg);
// let response = GenericMessage {
// msg_type: "response".to_string(),
// payload: Value::String("Processed your message".to_string()),
// };
// if let Ok(response_text) = serde_json::to_string(&response) {
// let _ = sender.send(SocketMessage::text(response_text)).await;
// // Optionally handle send error here
// }
// }
// }
// },
// Err(e) => {
// eprintln!("WebSocket error: {:?}", e);
// break;
// },
// _ => {} // Optionally handle binary messages or close messages here
// }
// }
// }
}