biurs_core/
server.rs

1use std::{pin::Pin, sync::Arc};
2
3use crate::proto;
4
5#[derive(Debug, thiserror::Error)]
6pub enum ServerError {
7    #[error(transparent)]
8    Type(#[from] crate::types::TypeError),
9    #[error(transparent)]
10    Io(#[from] std::io::Error),
11    #[error("Expected message")]
12    MessageExpected,
13    #[error("Unexpected message type")]
14    UnexpectedMessageType,
15    #[error("Channel send error: {0}")]
16    Send(String),
17    #[error(transparent)]
18    Json(#[from] serde_json::Error),
19    #[error("Unauthorized")]
20    Unauthorized,
21    #[error(transparent)]
22    SystemTime(#[from] std::time::SystemTimeError),
23    #[error(transparent)]
24    Jwt(#[from] jsonwebtoken::errors::Error),
25}
26
27impl From<ServerError> for tonic::Status {
28    fn from(value: ServerError) -> Self {
29        match value {
30            ServerError::Type(_) | ServerError::Json(_) => {
31                tonic::Status::invalid_argument(value.to_string())
32            }
33            ServerError::Io(_)
34            | ServerError::MessageExpected
35            | ServerError::UnexpectedMessageType
36            | ServerError::Send(_)
37            | ServerError::Jwt(_)
38            | ServerError::SystemTime(_) => tonic::Status::internal(value.to_string()),
39            ServerError::Unauthorized => tonic::Status::unauthenticated(value.to_string()),
40        }
41    }
42}
43
44#[async_trait::async_trait]
45pub trait Server {
46    async fn auth(
47        &self,
48        mut rx_req: tokio::sync::mpsc::Receiver<crate::types::auth::AuthenticateRequest>,
49        tx_res: tokio::sync::mpsc::Sender<crate::types::auth::AuthenticateResponse>,
50    ) -> Result<(), ServerError>;
51
52    async fn status(
53        &self,
54        request: crate::types::status::StatusRequest,
55        auth_token: Option<String>,
56    ) -> Result<crate::types::status::StatusResponse, ServerError>;
57
58    async fn list(
59        &self,
60        auth_token: Option<String>,
61    ) -> Result<Vec<crate::types::Metadata>, ServerError>;
62
63    async fn download(
64        &self,
65        request: crate::types::download::DownloadRequest,
66        auth_token: Option<String>,
67    ) -> Result<tokio::sync::mpsc::Receiver<crate::types::download::DownloadResponse>, ServerError>;
68
69    async fn upload(
70        &self,
71        mut rx: tokio::sync::mpsc::Receiver<crate::types::upload::UploadRequest>,
72        auth_token: Option<String>,
73    ) -> Result<(), ServerError>;
74}
75
76pub struct BiursServer {
77    server_impl: Arc<dyn Server + Send + Sync + 'static>,
78}
79
80impl BiursServer {
81    pub fn new(server: impl Server + Send + Sync + 'static) -> Self {
82        Self {
83            server_impl: Arc::new(server),
84        }
85    }
86}
87
88#[async_trait::async_trait]
89impl proto::biurs_v1::back_it_up_server::BackItUp for BiursServer {
90    type AuthenticateStream = Pin<
91        Box<
92            dyn futures::Stream<Item = Result<proto::biurs_v1::AuthenticateResponse, tonic::Status>>
93                + Send,
94        >,
95    >;
96
97    type DownloadStream = Pin<
98        Box<
99            dyn futures::Stream<Item = Result<proto::biurs_v1::DownloadResponse, tonic::Status>>
100                + Send,
101        >,
102    >;
103
104    async fn authenticate(
105        &self,
106        request: tonic::Request<tonic::Streaming<proto::biurs_v1::AuthenticateRequest>>,
107    ) -> Result<tonic::Response<Self::AuthenticateStream>, tonic::Status> {
108        use tokio_stream::StreamExt;
109
110        let mut request_stream = request.into_inner();
111        let (tx_req, rx_req) =
112            tokio::sync::mpsc::channel::<crate::types::auth::AuthenticateRequest>(64);
113        let (tx_res, rx_res) =
114            tokio::sync::mpsc::channel::<crate::types::auth::AuthenticateResponse>(64);
115
116        tokio::spawn(async move {
117            while let Some(result) = request_stream.next().await {
118                let result = result?;
119                let result: crate::types::auth::AuthenticateRequest = result.try_into()?;
120                tx_req
121                    .send(result)
122                    .await
123                    .map_err(|err| tonic::Status::internal(err.to_string()))?;
124            }
125            Ok::<(), tonic::Status>(())
126        });
127
128        let server_impl = self.server_impl.clone();
129        tokio::spawn(async move {
130            server_impl.auth(rx_req, tx_res).await?;
131            Ok::<(), tonic::Status>(())
132        });
133
134        let stream = tokio_stream::wrappers::ReceiverStream::new(rx_res);
135        Ok(tonic::Response::new(
136            Box::pin(stream.map(|i| Ok(i.into()))) as Self::AuthenticateStream
137        ))
138    }
139
140    async fn status(
141        &self,
142        request: tonic::Request<proto::biurs_v1::StatusRequest>,
143    ) -> Result<tonic::Response<proto::biurs_v1::StatusResponse>, tonic::Status> {
144        let auth_token = request
145            .metadata()
146            .get("authentication")
147            .map(|value| value.to_str())
148            .transpose()
149            .map_err(|err| tonic::Status::internal(err.to_string()))?
150            .map(|s| s.to_owned());
151
152        let request = request.into_inner().try_into()?;
153        let response = self.server_impl.status(request, auth_token).await?;
154        Ok(tonic::Response::new(response.into()))
155    }
156
157    async fn list(
158        &self,
159        request: tonic::Request<proto::biurs_v1::ListRequest>,
160    ) -> Result<tonic::Response<proto::biurs_v1::ListResponse>, tonic::Status> {
161        let auth_token = request
162            .metadata()
163            .get("authentication")
164            .map(|value| value.to_str())
165            .transpose()
166            .map_err(|err| tonic::Status::internal(err.to_string()))?
167            .map(|s| s.to_owned());
168
169        let response = self.server_impl.list(auth_token).await?;
170        Ok(tonic::Response::new(response.into()))
171    }
172
173    async fn download(
174        &self,
175        request: tonic::Request<proto::biurs_v1::DownloadRequest>,
176    ) -> Result<tonic::Response<Self::DownloadStream>, tonic::Status> {
177        use tokio_stream::StreamExt;
178
179        let auth_token = request
180            .metadata()
181            .get("authentication")
182            .map(|value| value.to_str())
183            .transpose()
184            .map_err(|err| tonic::Status::internal(err.to_string()))?
185            .map(|s| s.to_owned());
186
187        let request = request.into_inner().into();
188        let rx = self.server_impl.download(request, auth_token).await?;
189        let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
190
191        Ok(tonic::Response::new(
192            Box::pin(stream.map(|i| Ok(i.into()))) as Self::DownloadStream
193        ))
194    }
195
196    async fn upload(
197        &self,
198        request: tonic::Request<tonic::Streaming<proto::biurs_v1::UploadRequest>>,
199    ) -> Result<tonic::Response<proto::biurs_v1::UploadResponse>, tonic::Status> {
200        use tokio_stream::StreamExt;
201
202        let auth_token = request
203            .metadata()
204            .get("authentication")
205            .map(|value| value.to_str())
206            .transpose()
207            .map_err(|err| tonic::Status::internal(err.to_string()))?
208            .map(|s| s.to_owned());
209
210        let mut stream = request.into_inner();
211        let (tx, rx) = tokio::sync::mpsc::channel::<crate::types::upload::UploadRequest>(64);
212
213        tokio::spawn(async move {
214            while let Some(result) = stream.next().await {
215                let result = result?;
216                let result: crate::types::upload::UploadRequest = result.try_into()?;
217                tx.send(result)
218                    .await
219                    .map_err(|err| tonic::Status::internal(err.to_string()))?;
220            }
221            Ok::<(), tonic::Status>(())
222        });
223
224        self.server_impl.upload(rx, auth_token).await?;
225
226        Ok(tonic::Response::new(proto::biurs_v1::UploadResponse {}))
227    }
228}