covert_framework/
sync_service.rs

1use std::task::Poll;
2
3use covert_types::error::ApiError;
4use futures::future::BoxFuture;
5use tokio::sync::{mpsc, oneshot};
6use tower::{Service, ServiceExt};
7
8struct Message<Req, Res, Err> {
9    request: Req,
10    tx: oneshot::Sender<Result<Res, Err>>,
11}
12
13#[derive(Debug)]
14pub struct SyncService<Req, Res> {
15    tx: mpsc::UnboundedSender<Message<Req, Res, ApiError>>,
16}
17
18impl<Req, Res> Clone for SyncService<Req, Res> {
19    fn clone(&self) -> Self {
20        Self {
21            tx: self.tx.clone(),
22        }
23    }
24}
25
26impl<Req, Res> SyncService<Req, Res> {
27    pub fn new<T>(service: T) -> Self
28    where
29        T: Service<Req, Response = Res, Error = ApiError> + Send + Clone + 'static,
30        T::Future: Send,
31        Req: Send + 'static,
32        Res: Send + 'static,
33    {
34        let (tx, mut rx) = mpsc::unbounded_channel::<Message<Req, Res, ApiError>>();
35
36        tokio::spawn(async move {
37            while let Some(message) = rx.recv().await {
38                let svc = service.clone();
39                // Ensure that a slow response does not block other requests
40                tokio::spawn(async move {
41                    let resp = svc.oneshot(message.request).await;
42                    if message.tx.send(resp).is_err() {
43                        tracing::error!(
44                            "Failed to notify sync service of the response from the worker"
45                        );
46                    }
47                });
48            }
49        });
50
51        Self { tx }
52    }
53}
54
55impl<Req, Res> Service<Req> for SyncService<Req, Res>
56where
57    Req: Send + 'static,
58    Res: Send + 'static,
59{
60    type Response = Res;
61
62    type Error = ApiError;
63
64    type Future = BoxFuture<'static, Result<Res, ApiError>>;
65
66    fn poll_ready(
67        &mut self,
68        _cx: &mut std::task::Context<'_>,
69    ) -> std::task::Poll<Result<(), Self::Error>> {
70        Poll::Ready(Ok(()))
71    }
72
73    fn call(&mut self, req: Req) -> Self::Future {
74        let this = self.clone();
75        Box::pin(async move {
76            let (tx, rx) = oneshot::channel();
77            this.tx
78                .send(Message { request: req, tx })
79                .map_err(|_| ApiError::internal_error())?;
80
81            rx.await.map_err(|_| ApiError::internal_error())?
82        })
83    }
84}
85
86// THE TEST
87
88trait Handler: Send + Sync {}
89
90impl<Req: Send, Res: Send> Handler for SyncService<Req, Res> {}
91
92struct PostgresBackend(SyncService<hyper::Request<hyper::Body>, hyper::Response<hyper::Body>>);
93
94impl Handler for PostgresBackend {}