aerosocket_server/
handler.rs1use aerosocket_core::{Message, Result};
6use std::future::Future;
7#[cfg(feature = "wasm-handlers")]
8use std::path::PathBuf;
9use std::pin::Pin;
10#[cfg(feature = "wasm-handlers")]
11use wasmtime::{Engine, Instance, Module, Store};
12
13pub trait Handler: Send + Sync + 'static {
15 fn handle<'a>(
17 &'a self,
18 connection: crate::connection::ConnectionHandle,
19 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
20
21 fn clone_box(&self) -> Box<dyn Handler>;
23}
24
25impl Clone for Box<dyn Handler> {
26 fn clone(&self) -> Box<dyn Handler> {
27 self.clone_box()
28 }
29}
30
31pub type BoxedHandler = Box<dyn Handler>;
33
34#[derive(Debug, Clone)]
36pub struct DefaultHandler;
37
38impl DefaultHandler {
39 pub fn new() -> Self {
41 Self
42 }
43}
44
45impl Default for DefaultHandler {
46 fn default() -> Self {
47 Self::new()
48 }
49}
50
51impl Handler for DefaultHandler {
52 fn handle<'a>(
53 &'a self,
54 connection: crate::connection::ConnectionHandle,
55 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
56 Box::pin(async move {
57 let mut conn = connection.try_lock().await.map_err(|_| {
59 aerosocket_core::Error::Other("Failed to lock connection".to_string())
60 })?;
61
62 while let Some(msg) = conn.next().await? {
63 match msg {
64 Message::Text(text) => {
65 conn.send_text(text.as_str()).await?;
66 }
67 Message::Binary(data) => {
68 conn.send_binary(data.as_bytes().to_vec()).await?;
69 }
70 Message::Ping(_) => {
71 conn.pong(None).await?;
72 }
73 Message::Close(close_msg) => {
74 let code = close_msg.code();
75 let reason = close_msg.reason();
76 conn.close(code, Some(reason)).await?;
77 break;
78 }
79 Message::Pong(_) => {
80 }
82 }
83 }
84
85 Ok(())
86 })
87 }
88
89 fn clone_box(&self) -> Box<dyn Handler> {
90 Box::new(self.clone())
91 }
92}
93
94#[derive(Debug, Clone)]
96pub struct EchoHandler;
97
98impl EchoHandler {
99 pub fn new() -> Self {
101 Self
102 }
103}
104
105impl Default for EchoHandler {
106 fn default() -> Self {
107 Self::new()
108 }
109}
110
111impl Handler for EchoHandler {
112 fn handle<'a>(
113 &'a self,
114 connection: crate::connection::ConnectionHandle,
115 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
116 Box::pin(async move {
117 let mut conn = connection.try_lock().await.map_err(|_| {
119 aerosocket_core::Error::Other("Failed to lock connection".to_string())
120 })?;
121
122 while let Some(msg) = conn.next().await? {
123 match msg {
124 Message::Text(text) => {
125 let echo_text = format!("Echo: {}", text.as_str());
126 conn.send_text(&echo_text).await?;
127 }
128 Message::Binary(data) => {
129 conn.send_binary(data.as_bytes().to_vec()).await?;
130 }
131 Message::Ping(_) => {
132 conn.pong(None).await?;
133 }
134 Message::Close(close_msg) => {
135 let code = close_msg.code();
136 let reason = close_msg.reason();
137 conn.close(code, Some(reason)).await?;
138 break;
139 }
140 Message::Pong(_) => {
141 }
143 }
144 }
145
146 Ok(())
147 })
148 }
149
150 fn clone_box(&self) -> Box<dyn Handler> {
151 Box::new(self.clone())
152 }
153}
154
155#[derive(Clone)]
157pub struct FnHandler<F> {
158 f: F,
159}
160
161impl<F> FnHandler<F> {
162 pub fn new(f: F) -> Self {
164 Self { f }
165 }
166}
167
168impl<F> Handler for FnHandler<F>
169where
170 F: Fn(crate::connection::ConnectionHandle) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>
171 + Send
172 + Sync
173 + Clone
174 + 'static,
175{
176 fn handle<'a>(
177 &'a self,
178 connection: crate::connection::ConnectionHandle,
179 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
180 Box::pin((self.f)(connection))
181 }
182
183 fn clone_box(&self) -> Box<dyn Handler> {
184 Box::new(self.clone())
185 }
186}
187
188impl<F> std::fmt::Debug for FnHandler<F> {
189 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
190 f.debug_struct("FnHandler")
191 .field("f", &"<function>")
192 .finish()
193 }
194}
195
196pub fn from_fn<F, Fut>(f: F) -> FnHandler<F>
198where
199 F: Fn(crate::connection::ConnectionHandle) -> Fut + Send + Sync + Clone + 'static,
200 Fut: Future<Output = Result<()>> + Send + 'static,
201{
202 FnHandler::new(f)
203}
204
205#[cfg(feature = "wasm-handlers")]
206#[derive(Clone)]
207pub struct WasmHandler {
208 engine: Engine,
209 module: Module,
210 module_path: PathBuf,
211}
212
213#[cfg(feature = "wasm-handlers")]
214impl WasmHandler {
215 pub fn from_file(path: impl Into<PathBuf>) -> aerosocket_core::Result<Self> {
216 let path_buf = path.into();
217 let engine = Engine::default();
218 let module = Module::from_file(&engine, &path_buf).map_err(|e| {
219 aerosocket_core::Error::Other(format!("Failed to load WASM module: {}", e))
220 })?;
221
222 Ok(Self {
223 engine,
224 module,
225 module_path: path_buf,
226 })
227 }
228}
229
230#[cfg(feature = "wasm-handlers")]
231impl Handler for WasmHandler {
232 fn handle<'a>(
233 &'a self,
234 connection: crate::connection::ConnectionHandle,
235 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
236 Box::pin(async move {
237 let mut conn = connection.try_lock().await.map_err(|_| {
238 aerosocket_core::Error::Other("Failed to lock connection".to_string())
239 })?;
240
241 let mut store = Store::new(&self.engine, ());
242 let instance = Instance::new(&mut store, &self.module, &[]).map_err(|e| {
243 aerosocket_core::Error::Other(format!("Failed to instantiate WASM module: {}", e))
244 })?;
245
246 let memory = instance.get_memory(&mut store, "memory").ok_or_else(|| {
247 aerosocket_core::Error::Other("WASM module missing `memory` export".to_string())
248 })?;
249
250 let func = instance
251 .get_typed_func::<(i32, i32), i32>(&mut store, "on_message")
252 .map_err(|e| {
253 aerosocket_core::Error::Other(format!(
254 "Failed to get WASM function `on_message`: {}",
255 e
256 ))
257 })?;
258
259 let capacity = memory.data_size(&store);
260
261 while let Some(msg) = conn.next().await? {
262 match msg {
263 Message::Text(text) => {
264 let bytes = text.as_bytes();
265 if bytes.len() > capacity {
266 return Err(aerosocket_core::Error::Other(
267 "WASM memory too small for incoming message".to_string(),
268 ));
269 }
270
271 memory.write(&mut store, 0, bytes).map_err(|e| {
272 aerosocket_core::Error::Other(format!(
273 "Failed to write to WASM memory: {}",
274 e
275 ))
276 })?;
277
278 let out_len =
279 func.call(&mut store, (0, bytes.len() as i32))
280 .map_err(|e| {
281 aerosocket_core::Error::Other(format!(
282 "WASM `on_message` call failed: {}",
283 e
284 ))
285 })?;
286
287 if out_len > 0 {
288 let mut out = vec![0u8; out_len as usize];
289 memory.read(&mut store, 0, &mut out).map_err(|e| {
290 aerosocket_core::Error::Other(format!(
291 "Failed to read from WASM memory: {}",
292 e
293 ))
294 })?;
295
296 let out_text = String::from_utf8_lossy(&out).to_string();
297 conn.send_text(&out_text).await?;
298 }
299 }
300 Message::Binary(data) => {
301 conn.send_binary(data.as_bytes().to_vec()).await?;
302 }
303 Message::Ping(_) => {
304 conn.pong(None).await?;
305 }
306 Message::Close(close_msg) => {
307 let code = close_msg.code();
308 let reason = close_msg.reason();
309 conn.close(code, Some(reason)).await?;
310 break;
311 }
312 Message::Pong(_) => {}
313 }
314 }
315
316 Ok(())
317 })
318 }
319
320 fn clone_box(&self) -> Box<dyn Handler> {
321 Box::new(self.clone())
322 }
323}
324
325#[cfg(test)]
326mod tests {
327 use super::*;
328
329 #[tokio::test]
330 async fn test_default_handler() {
331 let _handler = DefaultHandler::new();
332 let remote = "127.0.0.1:12345".parse().unwrap();
333 let local = "127.0.0.1:8080".parse().unwrap();
334 let _connection = crate::connection::Connection::new(remote, local);
335
336 }
339
340 #[tokio::test]
341 async fn test_echo_handler() {
342 let _handler = EchoHandler::new();
343 let remote = "127.0.0.1:12345".parse().unwrap();
344 let local = "127.0.0.1:8080".parse().unwrap();
345 let _connection = crate::connection::Connection::new(remote, local);
346
347 }
350
351 #[tokio::test]
352 async fn test_fn_handler() {
353 let _handler = from_fn(|_conn| async { Ok(()) });
354
355 let remote = "127.0.0.1:12345".parse().unwrap();
356 let local = "127.0.0.1:8080".parse().unwrap();
357 let _connection = crate::connection::Connection::new(remote, local);
358
359 }
362}