server_fn/request/
actix.rs1use crate::{
2 error::{FromServerFnError, IntoAppError, ServerFnErrorErr},
3 request::Req,
4 response::actix::ActixResponse,
5};
6use actix_web::{web::Payload, HttpRequest};
7use actix_ws::Message;
8use bytes::Bytes;
9use futures::{FutureExt, Stream, StreamExt};
10use send_wrapper::SendWrapper;
11use std::{borrow::Cow, future::Future};
12
13pub struct ActixRequest(pub(crate) SendWrapper<(HttpRequest, Payload)>);
19
20impl ActixRequest {
21 pub fn take(self) -> (HttpRequest, Payload) {
23 self.0.take()
24 }
25
26 fn header(&self, name: &str) -> Option<Cow<'_, str>> {
27 self.0
28 .0
29 .headers()
30 .get(name)
31 .map(|h| String::from_utf8_lossy(h.as_bytes()))
32 }
33}
34
35impl From<(HttpRequest, Payload)> for ActixRequest {
36 fn from(value: (HttpRequest, Payload)) -> Self {
37 ActixRequest(SendWrapper::new(value))
38 }
39}
40
41impl<Error, InputStreamError, OutputStreamError>
42 Req<Error, InputStreamError, OutputStreamError> for ActixRequest
43where
44 Error: FromServerFnError + Send,
45 InputStreamError: FromServerFnError + Send,
46 OutputStreamError: FromServerFnError + Send,
47{
48 type WebsocketResponse = ActixResponse;
49
50 fn as_query(&self) -> Option<&str> {
51 self.0 .0.uri().query()
52 }
53
54 fn to_content_type(&self) -> Option<Cow<'_, str>> {
55 self.header("Content-Type")
56 }
57
58 fn accepts(&self) -> Option<Cow<'_, str>> {
59 self.header("Accept")
60 }
61
62 fn referer(&self) -> Option<Cow<'_, str>> {
63 self.header("Referer")
64 }
65
66 fn try_into_bytes(
67 self,
68 ) -> impl Future<Output = Result<Bytes, Error>> + Send {
69 SendWrapper::new(async move {
72 let payload = self.0.take().1;
73 payload.to_bytes().await.map_err(|e| {
74 ServerFnErrorErr::Deserialization(e.to_string())
75 .into_app_error()
76 })
77 })
78 }
79
80 fn try_into_string(
81 self,
82 ) -> impl Future<Output = Result<String, Error>> + Send {
83 SendWrapper::new(async move {
86 let payload = self.0.take().1;
87 let bytes = payload.to_bytes().await.map_err(|e| {
88 Error::from_server_fn_error(ServerFnErrorErr::Deserialization(
89 e.to_string(),
90 ))
91 })?;
92 String::from_utf8(bytes.into()).map_err(|e| {
93 Error::from_server_fn_error(ServerFnErrorErr::Deserialization(
94 e.to_string(),
95 ))
96 })
97 })
98 }
99
100 fn try_into_stream(
101 self,
102 ) -> Result<impl Stream<Item = Result<Bytes, Bytes>> + Send, Error> {
103 let payload = self.0.take().1;
104 let stream = payload.map(|res| {
105 res.map_err(|e| {
106 Error::from_server_fn_error(ServerFnErrorErr::Deserialization(
107 e.to_string(),
108 ))
109 .ser()
110 })
111 });
112 Ok(SendWrapper::new(stream))
113 }
114
115 async fn try_into_websocket(
116 self,
117 ) -> Result<
118 (
119 impl Stream<Item = Result<Bytes, Bytes>> + Send + 'static,
120 impl futures::Sink<Bytes> + Send + 'static,
121 Self::WebsocketResponse,
122 ),
123 Error,
124 > {
125 let (request, payload) = self.0.take();
126 let (response, mut session, mut msg_stream) =
127 actix_ws::handle(&request, payload).map_err(|e| {
128 Error::from_server_fn_error(ServerFnErrorErr::Request(
129 e.to_string(),
130 ))
131 })?;
132
133 let (mut response_stream_tx, response_stream_rx) =
134 futures::channel::mpsc::channel(2048);
135 let (response_sink_tx, mut response_sink_rx) =
136 futures::channel::mpsc::channel::<Bytes>(2048);
137
138 actix_web::rt::spawn(async move {
139 loop {
140 futures::select! {
141 incoming = response_sink_rx.next() => {
142 let Some(incoming) = incoming else {
143 break;
144 };
145 if let Err(err) = session.binary(incoming).await {
146 _ = response_stream_tx.start_send(Err(InputStreamError::from_server_fn_error(ServerFnErrorErr::Request(err.to_string())).ser()));
147 }
148 },
149 outgoing = msg_stream.next().fuse() => {
150 let Some(outgoing) = outgoing else {
151 break;
152 };
153 match outgoing {
154 Ok(Message::Ping(bytes)) => {
155 if session.pong(&bytes).await.is_err() {
156 break;
157 }
158 }
159 Ok(Message::Binary(bytes)) => {
160 _ = response_stream_tx
161 .start_send(
162 Ok(bytes),
163 );
164 }
165 Ok(Message::Text(text)) => {
166 _ = response_stream_tx.start_send(Ok(text.into_bytes()));
167 }
168 Ok(Message::Close(_)) => {
169 break;
170 }
171 Ok(_other) => {
172 }
173 Err(e) => {
174 _ = response_stream_tx.start_send(Err(InputStreamError::from_server_fn_error(ServerFnErrorErr::Response(e.to_string())).ser()));
175 }
176 }
177 }
178 }
179 }
180 let _ = session.close(None).await;
181 });
182
183 Ok((
184 response_stream_rx,
185 response_sink_tx,
186 ActixResponse::from(response),
187 ))
188 }
189}