server_fn/request/
actix.rs

1use crate::{
2    error::{FromServerFnError, IntoAppError, ServerFnErrorErr},
3    request::Req,
4    response::actix::ActixResponse,
5};
6use actix_web::{web::Payload, HttpRequest};
7use actix_ws::Message;
8use bytes::Bytes;
9use futures::{FutureExt, Stream, StreamExt};
10use send_wrapper::SendWrapper;
11use std::{borrow::Cow, future::Future};
12
13/// A wrapped Actix request.
14///
15/// This uses a [`SendWrapper`] that allows the Actix `HttpRequest` type to be `Send`, but panics
16/// if it it is ever sent to another thread. Actix pins request handling to a single thread, so this
17/// is necessary to be compatible with traits that require `Send` but should never panic in actual use.
18pub struct ActixRequest(pub(crate) SendWrapper<(HttpRequest, Payload)>);
19
20impl ActixRequest {
21    /// Returns the raw Actix request, and its body.
22    pub fn take(self) -> (HttpRequest, Payload) {
23        self.0.take()
24    }
25
26    fn header(&self, name: &str) -> Option<Cow<'_, str>> {
27        self.0
28             .0
29            .headers()
30            .get(name)
31            .map(|h| String::from_utf8_lossy(h.as_bytes()))
32    }
33}
34
35impl From<(HttpRequest, Payload)> for ActixRequest {
36    fn from(value: (HttpRequest, Payload)) -> Self {
37        ActixRequest(SendWrapper::new(value))
38    }
39}
40
41impl<Error, InputStreamError, OutputStreamError>
42    Req<Error, InputStreamError, OutputStreamError> for ActixRequest
43where
44    Error: FromServerFnError + Send,
45    InputStreamError: FromServerFnError + Send,
46    OutputStreamError: FromServerFnError + Send,
47{
48    type WebsocketResponse = ActixResponse;
49
50    fn as_query(&self) -> Option<&str> {
51        self.0 .0.uri().query()
52    }
53
54    fn to_content_type(&self) -> Option<Cow<'_, str>> {
55        self.header("Content-Type")
56    }
57
58    fn accepts(&self) -> Option<Cow<'_, str>> {
59        self.header("Accept")
60    }
61
62    fn referer(&self) -> Option<Cow<'_, str>> {
63        self.header("Referer")
64    }
65
66    fn try_into_bytes(
67        self,
68    ) -> impl Future<Output = Result<Bytes, Error>> + Send {
69        // Actix is going to keep this on a single thread anyway so it's fine to wrap it
70        // with SendWrapper, which makes it `Send` but will panic if it moves to another thread
71        SendWrapper::new(async move {
72            let payload = self.0.take().1;
73            payload.to_bytes().await.map_err(|e| {
74                ServerFnErrorErr::Deserialization(e.to_string())
75                    .into_app_error()
76            })
77        })
78    }
79
80    fn try_into_string(
81        self,
82    ) -> impl Future<Output = Result<String, Error>> + Send {
83        // Actix is going to keep this on a single thread anyway so it's fine to wrap it
84        // with SendWrapper, which makes it `Send` but will panic if it moves to another thread
85        SendWrapper::new(async move {
86            let payload = self.0.take().1;
87            let bytes = payload.to_bytes().await.map_err(|e| {
88                Error::from_server_fn_error(ServerFnErrorErr::Deserialization(
89                    e.to_string(),
90                ))
91            })?;
92            String::from_utf8(bytes.into()).map_err(|e| {
93                Error::from_server_fn_error(ServerFnErrorErr::Deserialization(
94                    e.to_string(),
95                ))
96            })
97        })
98    }
99
100    fn try_into_stream(
101        self,
102    ) -> Result<impl Stream<Item = Result<Bytes, Bytes>> + Send, Error> {
103        let payload = self.0.take().1;
104        let stream = payload.map(|res| {
105            res.map_err(|e| {
106                Error::from_server_fn_error(ServerFnErrorErr::Deserialization(
107                    e.to_string(),
108                ))
109                .ser()
110            })
111        });
112        Ok(SendWrapper::new(stream))
113    }
114
115    async fn try_into_websocket(
116        self,
117    ) -> Result<
118        (
119            impl Stream<Item = Result<Bytes, Bytes>> + Send + 'static,
120            impl futures::Sink<Bytes> + Send + 'static,
121            Self::WebsocketResponse,
122        ),
123        Error,
124    > {
125        let (request, payload) = self.0.take();
126        let (response, mut session, mut msg_stream) =
127            actix_ws::handle(&request, payload).map_err(|e| {
128                Error::from_server_fn_error(ServerFnErrorErr::Request(
129                    e.to_string(),
130                ))
131            })?;
132
133        let (mut response_stream_tx, response_stream_rx) =
134            futures::channel::mpsc::channel(2048);
135        let (response_sink_tx, mut response_sink_rx) =
136            futures::channel::mpsc::channel::<Bytes>(2048);
137
138        actix_web::rt::spawn(async move {
139            loop {
140                futures::select! {
141                    incoming = response_sink_rx.next() => {
142                        let Some(incoming) = incoming else {
143                            break;
144                        };
145                                if let Err(err) = session.binary(incoming).await {
146                                    _ = response_stream_tx.start_send(Err(InputStreamError::from_server_fn_error(ServerFnErrorErr::Request(err.to_string())).ser()));
147                                }
148                    },
149                    outgoing = msg_stream.next().fuse() => {
150                        let Some(outgoing) = outgoing else {
151                            break;
152                        };
153                        match outgoing {
154                            Ok(Message::Ping(bytes)) => {
155                                if session.pong(&bytes).await.is_err() {
156                                    break;
157                                }
158                            }
159                            Ok(Message::Binary(bytes)) => {
160                                _ = response_stream_tx
161                                    .start_send(
162                                        Ok(bytes),
163                                    );
164                            }
165                            Ok(Message::Text(text)) => {
166                                _ = response_stream_tx.start_send(Ok(text.into_bytes()));
167                            }
168                            Ok(Message::Close(_)) => {
169                                break;
170                            }
171                            Ok(_other) => {
172                            }
173                            Err(e) => {
174                                _ = response_stream_tx.start_send(Err(InputStreamError::from_server_fn_error(ServerFnErrorErr::Response(e.to_string())).ser()));
175                            }
176                        }
177                    }
178                }
179            }
180            let _ = session.close(None).await;
181        });
182
183        Ok((
184            response_stream_rx,
185            response_sink_tx,
186            ActixResponse::from(response),
187        ))
188    }
189}