1use std::{marker::PhantomData, sync::Arc};
111
112use actix::{prelude::SendError, Actor, AsyncContext, Recipient, StreamHandler};
113use actix_web::{web, Error, Handler, HttpRequest, HttpResponse};
114use actix_web_actors::ws::{self, CloseCode, CloseReason};
115use sod::{MutService, Service};
116
117use crate::sealed::SettableFuture;
118
119pub struct WsSessionFactory<O, S, F, E>
124where
125 O: Into<Option<WsMessage>> + Unpin + 'static,
126 S: MutService<Input = WsSessionEvent, Output = O> + Unpin + 'static,
127 F: Fn(&HttpRequest) -> Result<S, Error> + 'static,
128 E: Fn(&mut S, S::Error) -> Result<(), S::Error> + Unpin + 'static,
129{
130 factory: Arc<F>,
131 error_handler: Arc<E>,
132 _phantom: PhantomData<fn(O, S)>,
133}
134impl<O, S, F, E> WsSessionFactory<O, S, F, E>
135where
136 O: Into<Option<WsMessage>> + Unpin + 'static,
137 S: MutService<Input = WsSessionEvent, Output = O> + Unpin + 'static,
138 F: Fn(&HttpRequest) -> Result<S, Error> + 'static,
139 E: Fn(&mut S, S::Error) -> Result<(), S::Error> + Unpin + 'static,
140{
141 pub fn new(factory: F, error_handler: E) -> Self {
145 Self {
146 factory: Arc::new(factory),
147 error_handler: Arc::new(error_handler),
148 _phantom: PhantomData,
149 }
150 }
151}
152impl<O, S, F, E> Clone for WsSessionFactory<O, S, F, E>
153where
154 O: Into<Option<WsMessage>> + Unpin + 'static,
155 S: MutService<Input = WsSessionEvent, Output = O> + Unpin + 'static,
156 F: Fn(&HttpRequest) -> Result<S, Error> + 'static,
157 E: Fn(&mut S, S::Error) -> Result<(), S::Error> + Unpin + 'static,
158{
159 fn clone(&self) -> Self {
160 Self {
161 factory: Arc::clone(&self.factory),
162 error_handler: Arc::clone(&self.error_handler),
163 _phantom: PhantomData,
164 }
165 }
166}
167impl<O, S, F, E> Handler<(HttpRequest, web::Payload)> for WsSessionFactory<O, S, F, E>
168where
169 O: Into<Option<WsMessage>> + Unpin + 'static,
170 S: MutService<Input = WsSessionEvent, Output = O> + Unpin + 'static,
171 F: Fn(&HttpRequest) -> Result<S, Error> + 'static,
172 E: Fn(&mut S, S::Error) -> Result<(), S::Error> + Unpin + 'static,
173{
174 type Output = Result<HttpResponse, Error>;
175 type Future = SettableFuture<Result<HttpResponse, Error>>;
176 fn call(&self, (req, stream): (HttpRequest, web::Payload)) -> Self::Future {
177 let result = match (self.factory)(&req) {
178 Ok(service) => ws::start(
179 WsActor::new(service, Arc::clone(&self.error_handler)),
180 &req,
181 stream,
182 ),
183 Err(err) => Err(err),
184 };
185 SettableFuture::new().set(result)
186 }
187}
188
189#[derive(Debug)]
195pub struct WsSendService {
196 recipient: Recipient<WsMessage>,
197}
198impl WsSendService {
199 fn new(recipient: Recipient<WsMessage>) -> Self {
200 Self { recipient }
201 }
202}
203impl Service for WsSendService {
204 type Input = WsMessage;
205 type Output = ();
206 type Error = SendError<WsMessage>;
207 fn process(&self, msg: WsMessage) -> Result<Self::Output, Self::Error> {
208 self.recipient.try_send(msg)
209 }
210}
211
212#[derive(Debug)]
219pub enum WsSessionEvent {
220 Started(WsSendService),
221 Message(WsMessage),
222 Error(ws::ProtocolError),
223 Stopped,
224}
225impl WsSessionEvent {
226 fn from_actix_result(result: Result<ws::Message, ws::ProtocolError>) -> Option<Self> {
227 match result {
228 Ok(message) => match WsMessage::from_actix_ws_message(message) {
229 None => None,
230 Some(message) => Some(WsSessionEvent::Message(message)),
231 },
232 Err(err) => Some(Self::Error(err)),
233 }
234 }
235}
236
237#[derive(Debug)]
239pub enum WsMessage {
240 Ping(Vec<u8>),
241 Pong(Vec<u8>),
242 Binary(Vec<u8>),
243 Text(String),
244 Close(Option<CloseReason>),
245}
246impl WsMessage {
247 fn from_actix_ws_message(src: ws::Message) -> Option<Self> {
248 match src {
249 ws::Message::Binary(data) => Some(Self::Binary(data.into())),
250 ws::Message::Ping(data) => Some(Self::Ping(data.into())),
251 ws::Message::Pong(data) => Some(Self::Pong(data.into())),
252 ws::Message::Close(reason) => Some(Self::Close(reason)),
253 ws::Message::Text(text) => Some(Self::Text(text.into())),
254 ws::Message::Continuation(_) => None,
255 ws::Message::Nop => None,
256 }
257 }
258}
259impl From<WsMessage> for ws::Message {
260 fn from(value: WsMessage) -> Self {
261 match value {
262 WsMessage::Ping(data) => ws::Message::Ping(data.into()),
263 WsMessage::Pong(data) => ws::Message::Pong(data.into()),
264 WsMessage::Binary(data) => ws::Message::Binary(data.into()),
265 WsMessage::Text(text) => ws::Message::Text(text.into()),
266 WsMessage::Close(reason) => ws::Message::Close(reason),
267 }
268 }
269}
270impl actix::Message for WsMessage {
271 type Result = ();
272}
273
274struct WsActor<O, S, E>
276where
277 O: Unpin + 'static,
278 S: MutService<Input = WsSessionEvent, Output = O> + Unpin + 'static,
279 E: Fn(&mut S, S::Error) -> Result<(), S::Error> + Unpin + 'static,
280{
281 service: S,
282 error_handler: Arc<E>,
283 _phantom: PhantomData<fn(O)>,
284}
285impl<O, S, E> WsActor<O, S, E>
286where
287 O: Into<Option<WsMessage>> + Unpin + 'static,
288 S: MutService<Input = WsSessionEvent, Output = O> + Unpin + 'static,
289 E: Fn(&mut S, S::Error) -> Result<(), S::Error> + Unpin + 'static,
290{
291 fn new(service: S, error_handler: Arc<E>) -> Self {
292 Self {
293 service,
294 error_handler,
295 _phantom: PhantomData,
296 }
297 }
298}
299impl<O, S, E> Actor for WsActor<O, S, E>
300where
301 O: Into<Option<WsMessage>> + Unpin + 'static,
302 S: MutService<Input = WsSessionEvent, Output = O> + Unpin + 'static,
303 E: Fn(&mut S, S::Error) -> Result<(), S::Error> + Unpin + 'static,
304{
305 type Context = ws::WebsocketContext<Self>;
306 fn started(&mut self, ctx: &mut Self::Context) {
307 match self
308 .service
309 .process(WsSessionEvent::Started(WsSendService::new(
310 ctx.address().recipient(),
311 ))) {
312 Ok(send) => {
313 if let Some(send) = send.into() {
314 ctx.write_raw(send.into());
315 }
316 }
317 Err(err) => {
318 if let Err(_) = (self.error_handler)(&mut self.service, err) {
319 ctx.close(Some(CloseReason::from(CloseCode::Error)));
320 }
321 }
322 }
323 }
324 fn stopped(&mut self, _ctx: &mut Self::Context) {
325 if let Err(err) = self.service.process(WsSessionEvent::Stopped) {
326 (self.error_handler)(&mut self.service, err).ok();
327 }
328 }
329}
330impl<O, S, E> actix::Handler<WsMessage> for WsActor<O, S, E>
331where
332 O: Into<Option<WsMessage>> + Unpin + 'static,
333 S: MutService<Input = WsSessionEvent, Output = O> + Unpin + 'static,
334 E: Fn(&mut S, S::Error) -> Result<(), S::Error> + Unpin + 'static,
335{
336 type Result = ();
337 fn handle(&mut self, msg: WsMessage, ctx: &mut Self::Context) -> Self::Result {
338 match msg {
339 WsMessage::Ping(data) => ctx.ping(&data),
340 WsMessage::Pong(data) => ctx.pong(&data),
341 WsMessage::Binary(data) => ctx.binary(data),
342 WsMessage::Text(text) => ctx.text(text),
343 WsMessage::Close(reason) => ctx.close(reason),
344 }
345 }
346}
347impl<O, S, E> StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsActor<O, S, E>
348where
349 O: Into<Option<WsMessage>> + Unpin + 'static,
350 S: MutService<Input = WsSessionEvent, Output = O> + Unpin + 'static,
351 E: Fn(&mut S, S::Error) -> Result<(), S::Error> + Unpin + 'static,
352{
353 fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
354 if let Some(msg) = WsSessionEvent::from_actix_result(msg) {
355 if let WsSessionEvent::Message(WsMessage::Ping(data)) = &msg {
356 ctx.pong(data);
357 }
358 match self.service.process(msg) {
359 Ok(send) => {
360 if let Some(send) = send.into() {
361 ctx.write_raw(send.into());
362 }
363 }
364 Err(err) => {
365 if let Err(_) = (self.error_handler)(&mut self.service, err) {
366 ctx.close(Some(CloseCode::Error.into()));
367 }
368 }
369 }
370 }
371 }
372}