Skip to main content

aerosocket_server/
handler.rs

1//! WebSocket connection handlers
2//!
3//! This module provides handler abstractions for processing WebSocket connections.
4
5use aerosocket_core::{Message, Result};
6use std::future::Future;
7#[cfg(feature = "wasm-handlers")]
8use std::path::PathBuf;
9use std::pin::Pin;
10#[cfg(feature = "wasm-handlers")]
11use wasmtime::{Engine, Instance, Module, Store};
12
/// Trait for handling WebSocket connections.
///
/// Implementors must be `Send + Sync + 'static` (supertrait bounds), and the
/// trait is used as a trait object through `Box<dyn Handler>`.
pub trait Handler: Send + Sync + 'static {
    /// Handle a new connection.
    ///
    /// Takes the connection handle by value and returns a boxed, pinned,
    /// `Send` future that may borrow `self` for `'a` and resolves to
    /// `Result<()>`.
    fn handle<'a>(
        &'a self,
        connection: crate::connection::ConnectionHandle,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;

    /// Clone the handler into a new boxed trait object.
    ///
    /// This is what the `Clone` implementation for `Box<dyn Handler>` calls.
    fn clone_box(&self) -> Box<dyn Handler>;
}
24
25impl Clone for Box<dyn Handler> {
26    fn clone(&self) -> Box<dyn Handler> {
27        self.clone_box()
28    }
29}
30
/// Boxed handler type: an owned, heap-allocated `Handler` trait object.
pub type BoxedHandler = Box<dyn Handler>;
33
/// Default handler implementation.
///
/// A stateless unit struct; every instance is interchangeable.
#[derive(Debug, Clone)]
pub struct DefaultHandler;

impl DefaultHandler {
    /// Create a new default handler.
    pub fn new() -> Self {
        DefaultHandler
    }
}

impl Default for DefaultHandler {
    /// Same as [`DefaultHandler::new`].
    fn default() -> Self {
        DefaultHandler::new()
    }
}
50
51impl Handler for DefaultHandler {
52    fn handle<'a>(
53        &'a self,
54        connection: crate::connection::ConnectionHandle,
55    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
56        Box::pin(async move {
57            // Get the connection from the handle
58            let mut conn = connection.try_lock().await.map_err(|_| {
59                aerosocket_core::Error::Other("Failed to lock connection".to_string())
60            })?;
61
62            while let Some(msg) = conn.next().await? {
63                match msg {
64                    Message::Text(text) => {
65                        conn.send_text(text.as_str()).await?;
66                    }
67                    Message::Binary(data) => {
68                        conn.send_binary(data.as_bytes().to_vec()).await?;
69                    }
70                    Message::Ping(_) => {
71                        conn.pong(None).await?;
72                    }
73                    Message::Close(close_msg) => {
74                        let code = close_msg.code();
75                        let reason = close_msg.reason();
76                        conn.close(code, Some(reason)).await?;
77                        break;
78                    }
79                    Message::Pong(_) => {
80                        // Ignore pong messages
81                    }
82                }
83            }
84
85            Ok(())
86        })
87    }
88
89    fn clone_box(&self) -> Box<dyn Handler> {
90        Box::new(self.clone())
91    }
92}
93
/// Echo handler implementation.
///
/// A stateless unit struct; every instance is interchangeable.
#[derive(Debug, Clone)]
pub struct EchoHandler;

impl EchoHandler {
    /// Create a new echo handler.
    pub fn new() -> Self {
        EchoHandler
    }
}

impl Default for EchoHandler {
    /// Same as [`EchoHandler::new`].
    fn default() -> Self {
        EchoHandler::new()
    }
}
110
111impl Handler for EchoHandler {
112    fn handle<'a>(
113        &'a self,
114        connection: crate::connection::ConnectionHandle,
115    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
116        Box::pin(async move {
117            // Get the connection from the handle
118            let mut conn = connection.try_lock().await.map_err(|_| {
119                aerosocket_core::Error::Other("Failed to lock connection".to_string())
120            })?;
121
122            while let Some(msg) = conn.next().await? {
123                match msg {
124                    Message::Text(text) => {
125                        let echo_text = format!("Echo: {}", text.as_str());
126                        conn.send_text(&echo_text).await?;
127                    }
128                    Message::Binary(data) => {
129                        conn.send_binary(data.as_bytes().to_vec()).await?;
130                    }
131                    Message::Ping(_) => {
132                        conn.pong(None).await?;
133                    }
134                    Message::Close(close_msg) => {
135                        let code = close_msg.code();
136                        let reason = close_msg.reason();
137                        conn.close(code, Some(reason)).await?;
138                        break;
139                    }
140                    Message::Pong(_) => {
141                        // Ignore pong messages
142                    }
143                }
144            }
145
146            Ok(())
147        })
148    }
149
150    fn clone_box(&self) -> Box<dyn Handler> {
151        Box::new(self.clone())
152    }
153}
154
/// Function-based handler.
///
/// Thin wrapper that stores a function or closure of type `F`.
#[derive(Clone)]
pub struct FnHandler<F> {
    /// The wrapped callable.
    f: F,
}

impl<F> FnHandler<F> {
    /// Create a new function-based handler wrapping `f`.
    pub fn new(f: F) -> Self {
        FnHandler { f }
    }
}
167
168impl<F> Handler for FnHandler<F>
169where
170    F: Fn(crate::connection::ConnectionHandle) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>
171        + Send
172        + Sync
173        + Clone
174        + 'static,
175{
176    fn handle<'a>(
177        &'a self,
178        connection: crate::connection::ConnectionHandle,
179    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
180        Box::pin((self.f)(connection))
181    }
182
183    fn clone_box(&self) -> Box<dyn Handler> {
184        Box::new(self.clone())
185    }
186}
187
188impl<F> std::fmt::Debug for FnHandler<F> {
189    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
190        f.debug_struct("FnHandler")
191            .field("f", &"<function>")
192            .finish()
193    }
194}
195
196/// Create a handler from a function
197pub fn from_fn<F, Fut>(f: F) -> FnHandler<F>
198where
199    F: Fn(crate::connection::ConnectionHandle) -> Fut + Send + Sync + Clone + 'static,
200    Fut: Future<Output = Result<()>> + Send + 'static,
201{
202    FnHandler::new(f)
203}
204
/// Handler backed by a WebAssembly module loaded from a file.
///
/// Holds the engine, the compiled module, and the path the module was
/// loaded from.
#[cfg(feature = "wasm-handlers")]
#[derive(Clone)]
pub struct WasmHandler {
    /// Engine the module was compiled with.
    engine: Engine,
    /// Compiled module, instantiated when a connection is handled.
    module: Module,
    /// Path the module was loaded from.
    module_path: PathBuf,
}

#[cfg(feature = "wasm-handlers")]
impl WasmHandler {
    /// Load a WASM module from `path` using a default-configured engine.
    ///
    /// # Errors
    ///
    /// Returns `Error::Other` with the loader's message if the module cannot
    /// be loaded from the file.
    pub fn from_file(path: impl Into<PathBuf>) -> aerosocket_core::Result<Self> {
        let module_path: PathBuf = path.into();
        let engine = Engine::default();

        match Module::from_file(&engine, &module_path) {
            Ok(module) => Ok(Self {
                engine,
                module,
                module_path,
            }),
            Err(e) => Err(aerosocket_core::Error::Other(format!(
                "Failed to load WASM module: {}",
                e
            ))),
        }
    }
}
229
#[cfg(feature = "wasm-handlers")]
impl Handler for WasmHandler {
    /// Serve one connection, delegating text messages to the WASM module.
    ///
    /// A fresh store and instance are created for each call. The module must
    /// export a memory named `memory` and a function
    /// `on_message(i32, i32) -> i32`.
    ///
    /// For every text message the payload is written at offset 0 of the
    /// module's memory and `on_message(0, len)` is called. A positive return
    /// value is taken as the number of reply bytes to read back from offset 0;
    /// they are sent as text (invalid UTF-8 is replaced lossily). A zero or
    /// negative return value sends no reply.
    ///
    /// Binary messages are sent back unchanged, pings are answered with a
    /// pong, pongs are ignored, and a close message is answered with a close
    /// carrying the same code and reason before the loop stops.
    ///
    /// # Errors
    ///
    /// Returns `Error::Other` if the connection cannot be locked, the module
    /// cannot be instantiated, a required export is missing, a message does
    /// not fit in the module's memory, the guest call fails, or the guest
    /// reports a reply length larger than its memory. Connection errors are
    /// propagated unchanged.
    fn handle<'a>(
        &'a self,
        connection: crate::connection::ConnectionHandle,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
        Box::pin(async move {
            let mut conn = connection.try_lock().await.map_err(|_| {
                aerosocket_core::Error::Other("Failed to lock connection".to_string())
            })?;

            let mut store = Store::new(&self.engine, ());
            let instance = Instance::new(&mut store, &self.module, &[]).map_err(|e| {
                aerosocket_core::Error::Other(format!("Failed to instantiate WASM module: {}", e))
            })?;

            let memory = instance.get_memory(&mut store, "memory").ok_or_else(|| {
                aerosocket_core::Error::Other("WASM module missing `memory` export".to_string())
            })?;

            let func = instance
                .get_typed_func::<(i32, i32), i32>(&mut store, "on_message")
                .map_err(|e| {
                    aerosocket_core::Error::Other(format!(
                        "Failed to get WASM function `on_message`: {}",
                        e
                    ))
                })?;

            while let Some(msg) = conn.next().await? {
                match msg {
                    Message::Text(text) => {
                        let bytes = text.as_bytes();

                        // Query the size per message: the guest may have grown its
                        // memory during an earlier call, so a value sampled once
                        // before the loop can be stale.
                        if bytes.len() > memory.data_size(&store) {
                            return Err(aerosocket_core::Error::Other(
                                "WASM memory too small for incoming message".to_string(),
                            ));
                        }

                        memory.write(&mut store, 0, bytes).map_err(|e| {
                            aerosocket_core::Error::Other(format!(
                                "Failed to write to WASM memory: {}",
                                e
                            ))
                        })?;

                        // NOTE(review): this is a synchronous guest call inside an
                        // async task; a long-running `on_message` will occupy the
                        // executor thread until it returns.
                        // The length is passed as the i32 the export expects; the
                        // cast wraps for lengths above `i32::MAX`.
                        let out_len =
                            func.call(&mut store, (0, bytes.len() as i32))
                                .map_err(|e| {
                                    aerosocket_core::Error::Other(format!(
                                        "WASM `on_message` call failed: {}",
                                        e
                                    ))
                                })?;

                        if out_len > 0 {
                            // Positive i32, so the conversion to usize is lossless.
                            let out_len = out_len as usize;

                            // Validate the guest-supplied length before allocating,
                            // so a bogus return value cannot trigger a huge buffer.
                            if out_len > memory.data_size(&store) {
                                return Err(aerosocket_core::Error::Other(
                                    "WASM `on_message` returned a length larger than its memory"
                                        .to_string(),
                                ));
                            }

                            let mut out = vec![0u8; out_len];
                            memory.read(&mut store, 0, &mut out).map_err(|e| {
                                aerosocket_core::Error::Other(format!(
                                    "Failed to read from WASM memory: {}",
                                    e
                                ))
                            })?;

                            let out_text = String::from_utf8_lossy(&out).into_owned();
                            conn.send_text(&out_text).await?;
                        }
                    }
                    Message::Binary(data) => {
                        // Binary payloads bypass the module and are sent back as-is.
                        conn.send_binary(data.as_bytes().to_vec()).await?;
                    }
                    Message::Ping(_) => {
                        conn.pong(None).await?;
                    }
                    Message::Close(close_msg) => {
                        let code = close_msg.code();
                        let reason = close_msg.reason();
                        conn.close(code, Some(reason)).await?;
                        break;
                    }
                    Message::Pong(_) => {
                        // Pongs need no reply.
                    }
                }
            }

            Ok(())
        })
    }

    fn clone_box(&self) -> Box<dyn Handler> {
        Box::new(self.clone())
    }
}
324
#[cfg(test)]
mod tests {
    use super::*;

    /// `DefaultHandler` can be built, debug-printed, boxed as a trait object
    /// and cloned through `clone_box`.
    #[tokio::test]
    async fn test_default_handler() {
        let handler = DefaultHandler::new();
        let remote = "127.0.0.1:12345".parse().unwrap();
        let local = "127.0.0.1:8080".parse().unwrap();
        let _connection = crate::connection::Connection::new(remote, local);

        assert_eq!(format!("{:?}", handler), "DefaultHandler");

        // Exercise the trait-object clone path (`Clone for Box<dyn Handler>`).
        let boxed: BoxedHandler = Box::new(handler);
        let _cloned = boxed.clone();

        // Note: `handle` is not driven here; doing so needs a connection that
        // can actually receive and send messages.
    }

    /// `EchoHandler` can be built, debug-printed, boxed as a trait object and
    /// cloned through `clone_box`.
    #[tokio::test]
    async fn test_echo_handler() {
        let handler = EchoHandler::new();
        let remote = "127.0.0.1:12345".parse().unwrap();
        let local = "127.0.0.1:8080".parse().unwrap();
        let _connection = crate::connection::Connection::new(remote, local);

        assert_eq!(format!("{:?}", handler), "EchoHandler");

        // Exercise the trait-object clone path (`Clone for Box<dyn Handler>`).
        let boxed: BoxedHandler = Box::new(handler);
        let _cloned = boxed.clone();

        // Note: `handle` is not driven here; doing so needs a connection that
        // can actually receive and send messages.
    }

    /// `from_fn` accepts a plain async closure; the wrapper is cloneable and
    /// hides the closure in its debug output.
    #[tokio::test]
    async fn test_fn_handler() {
        let handler = from_fn(|_conn| async { Ok(()) });

        let remote = "127.0.0.1:12345".parse().unwrap();
        let local = "127.0.0.1:8080".parse().unwrap();
        let _connection = crate::connection::Connection::new(remote, local);

        assert_eq!(format!("{:?}", handler), "FnHandler { f: \"<function>\" }");
        let _cloned = handler.clone();

        // Note: the wrapped function is not invoked here; doing so needs a
        // connection handle to pass in.
    }
}