1use std::{pin::Pin, sync::Arc};
2
3use crate::proto;
4
5#[derive(Debug, thiserror::Error)]
6pub enum ServerError {
7 #[error(transparent)]
8 Type(#[from] crate::types::TypeError),
9 #[error(transparent)]
10 Io(#[from] std::io::Error),
11 #[error("Expected message")]
12 MessageExpected,
13 #[error("Unexpected message type")]
14 UnexpectedMessageType,
15 #[error("Channel send error: {0}")]
16 Send(String),
17 #[error(transparent)]
18 Json(#[from] serde_json::Error),
19 #[error("Unauthorized")]
20 Unauthorized,
21 #[error(transparent)]
22 SystemTime(#[from] std::time::SystemTimeError),
23 #[error(transparent)]
24 Jwt(#[from] jsonwebtoken::errors::Error),
25}
26
27impl From<ServerError> for tonic::Status {
28 fn from(value: ServerError) -> Self {
29 match value {
30 ServerError::Type(_) | ServerError::Json(_) => {
31 tonic::Status::invalid_argument(value.to_string())
32 }
33 ServerError::Io(_)
34 | ServerError::MessageExpected
35 | ServerError::UnexpectedMessageType
36 | ServerError::Send(_)
37 | ServerError::Jwt(_)
38 | ServerError::SystemTime(_) => tonic::Status::internal(value.to_string()),
39 ServerError::Unauthorized => tonic::Status::unauthenticated(value.to_string()),
40 }
41 }
42}
43
44#[async_trait::async_trait]
45pub trait Server {
46 async fn auth(
47 &self,
48 mut rx_req: tokio::sync::mpsc::Receiver<crate::types::auth::AuthenticateRequest>,
49 tx_res: tokio::sync::mpsc::Sender<crate::types::auth::AuthenticateResponse>,
50 ) -> Result<(), ServerError>;
51
52 async fn status(
53 &self,
54 request: crate::types::status::StatusRequest,
55 auth_token: Option<String>,
56 ) -> Result<crate::types::status::StatusResponse, ServerError>;
57
58 async fn list(
59 &self,
60 auth_token: Option<String>,
61 ) -> Result<Vec<crate::types::Metadata>, ServerError>;
62
63 async fn download(
64 &self,
65 request: crate::types::download::DownloadRequest,
66 auth_token: Option<String>,
67 ) -> Result<tokio::sync::mpsc::Receiver<crate::types::download::DownloadResponse>, ServerError>;
68
69 async fn upload(
70 &self,
71 mut rx: tokio::sync::mpsc::Receiver<crate::types::upload::UploadRequest>,
72 auth_token: Option<String>,
73 ) -> Result<(), ServerError>;
74}
75
76pub struct BiursServer {
77 server_impl: Arc<dyn Server + Send + Sync + 'static>,
78}
79
80impl BiursServer {
81 pub fn new(server: impl Server + Send + Sync + 'static) -> Self {
82 Self {
83 server_impl: Arc::new(server),
84 }
85 }
86}
87
88#[async_trait::async_trait]
89impl proto::biurs_v1::back_it_up_server::BackItUp for BiursServer {
90 type AuthenticateStream = Pin<
91 Box<
92 dyn futures::Stream<Item = Result<proto::biurs_v1::AuthenticateResponse, tonic::Status>>
93 + Send,
94 >,
95 >;
96
97 type DownloadStream = Pin<
98 Box<
99 dyn futures::Stream<Item = Result<proto::biurs_v1::DownloadResponse, tonic::Status>>
100 + Send,
101 >,
102 >;
103
104 async fn authenticate(
105 &self,
106 request: tonic::Request<tonic::Streaming<proto::biurs_v1::AuthenticateRequest>>,
107 ) -> Result<tonic::Response<Self::AuthenticateStream>, tonic::Status> {
108 use tokio_stream::StreamExt;
109
110 let mut request_stream = request.into_inner();
111 let (tx_req, rx_req) =
112 tokio::sync::mpsc::channel::<crate::types::auth::AuthenticateRequest>(64);
113 let (tx_res, rx_res) =
114 tokio::sync::mpsc::channel::<crate::types::auth::AuthenticateResponse>(64);
115
116 tokio::spawn(async move {
117 while let Some(result) = request_stream.next().await {
118 let result = result?;
119 let result: crate::types::auth::AuthenticateRequest = result.try_into()?;
120 tx_req
121 .send(result)
122 .await
123 .map_err(|err| tonic::Status::internal(err.to_string()))?;
124 }
125 Ok::<(), tonic::Status>(())
126 });
127
128 let server_impl = self.server_impl.clone();
129 tokio::spawn(async move {
130 server_impl.auth(rx_req, tx_res).await?;
131 Ok::<(), tonic::Status>(())
132 });
133
134 let stream = tokio_stream::wrappers::ReceiverStream::new(rx_res);
135 Ok(tonic::Response::new(
136 Box::pin(stream.map(|i| Ok(i.into()))) as Self::AuthenticateStream
137 ))
138 }
139
140 async fn status(
141 &self,
142 request: tonic::Request<proto::biurs_v1::StatusRequest>,
143 ) -> Result<tonic::Response<proto::biurs_v1::StatusResponse>, tonic::Status> {
144 let auth_token = request
145 .metadata()
146 .get("authentication")
147 .map(|value| value.to_str())
148 .transpose()
149 .map_err(|err| tonic::Status::internal(err.to_string()))?
150 .map(|s| s.to_owned());
151
152 let request = request.into_inner().try_into()?;
153 let response = self.server_impl.status(request, auth_token).await?;
154 Ok(tonic::Response::new(response.into()))
155 }
156
157 async fn list(
158 &self,
159 request: tonic::Request<proto::biurs_v1::ListRequest>,
160 ) -> Result<tonic::Response<proto::biurs_v1::ListResponse>, tonic::Status> {
161 let auth_token = request
162 .metadata()
163 .get("authentication")
164 .map(|value| value.to_str())
165 .transpose()
166 .map_err(|err| tonic::Status::internal(err.to_string()))?
167 .map(|s| s.to_owned());
168
169 let response = self.server_impl.list(auth_token).await?;
170 Ok(tonic::Response::new(response.into()))
171 }
172
173 async fn download(
174 &self,
175 request: tonic::Request<proto::biurs_v1::DownloadRequest>,
176 ) -> Result<tonic::Response<Self::DownloadStream>, tonic::Status> {
177 use tokio_stream::StreamExt;
178
179 let auth_token = request
180 .metadata()
181 .get("authentication")
182 .map(|value| value.to_str())
183 .transpose()
184 .map_err(|err| tonic::Status::internal(err.to_string()))?
185 .map(|s| s.to_owned());
186
187 let request = request.into_inner().into();
188 let rx = self.server_impl.download(request, auth_token).await?;
189 let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
190
191 Ok(tonic::Response::new(
192 Box::pin(stream.map(|i| Ok(i.into()))) as Self::DownloadStream
193 ))
194 }
195
196 async fn upload(
197 &self,
198 request: tonic::Request<tonic::Streaming<proto::biurs_v1::UploadRequest>>,
199 ) -> Result<tonic::Response<proto::biurs_v1::UploadResponse>, tonic::Status> {
200 use tokio_stream::StreamExt;
201
202 let auth_token = request
203 .metadata()
204 .get("authentication")
205 .map(|value| value.to_str())
206 .transpose()
207 .map_err(|err| tonic::Status::internal(err.to_string()))?
208 .map(|s| s.to_owned());
209
210 let mut stream = request.into_inner();
211 let (tx, rx) = tokio::sync::mpsc::channel::<crate::types::upload::UploadRequest>(64);
212
213 tokio::spawn(async move {
214 while let Some(result) = stream.next().await {
215 let result = result?;
216 let result: crate::types::upload::UploadRequest = result.try_into()?;
217 tx.send(result)
218 .await
219 .map_err(|err| tonic::Status::internal(err.to_string()))?;
220 }
221 Ok::<(), tonic::Status>(())
222 });
223
224 self.server_impl.upload(rx, auth_token).await?;
225
226 Ok(tonic::Response::new(proto::biurs_v1::UploadResponse {}))
227 }
228}