covert_framework/
sync_service.rs1use std::task::Poll;
2
3use covert_types::error::ApiError;
4use futures::future::BoxFuture;
5use tokio::sync::{mpsc, oneshot};
6use tower::{Service, ServiceExt};
7
8struct Message<Req, Res, Err> {
9 request: Req,
10 tx: oneshot::Sender<Result<Res, Err>>,
11}
12
13#[derive(Debug)]
14pub struct SyncService<Req, Res> {
15 tx: mpsc::UnboundedSender<Message<Req, Res, ApiError>>,
16}
17
18impl<Req, Res> Clone for SyncService<Req, Res> {
19 fn clone(&self) -> Self {
20 Self {
21 tx: self.tx.clone(),
22 }
23 }
24}
25
26impl<Req, Res> SyncService<Req, Res> {
27 pub fn new<T>(service: T) -> Self
28 where
29 T: Service<Req, Response = Res, Error = ApiError> + Send + Clone + 'static,
30 T::Future: Send,
31 Req: Send + 'static,
32 Res: Send + 'static,
33 {
34 let (tx, mut rx) = mpsc::unbounded_channel::<Message<Req, Res, ApiError>>();
35
36 tokio::spawn(async move {
37 while let Some(message) = rx.recv().await {
38 let svc = service.clone();
39 tokio::spawn(async move {
41 let resp = svc.oneshot(message.request).await;
42 if message.tx.send(resp).is_err() {
43 tracing::error!(
44 "Failed to notify sync service of the response from the worker"
45 );
46 }
47 });
48 }
49 });
50
51 Self { tx }
52 }
53}
54
55impl<Req, Res> Service<Req> for SyncService<Req, Res>
56where
57 Req: Send + 'static,
58 Res: Send + 'static,
59{
60 type Response = Res;
61
62 type Error = ApiError;
63
64 type Future = BoxFuture<'static, Result<Res, ApiError>>;
65
66 fn poll_ready(
67 &mut self,
68 _cx: &mut std::task::Context<'_>,
69 ) -> std::task::Poll<Result<(), Self::Error>> {
70 Poll::Ready(Ok(()))
71 }
72
73 fn call(&mut self, req: Req) -> Self::Future {
74 let this = self.clone();
75 Box::pin(async move {
76 let (tx, rx) = oneshot::channel();
77 this.tx
78 .send(Message { request: req, tx })
79 .map_err(|_| ApiError::internal_error())?;
80
81 rx.await.map_err(|_| ApiError::internal_error())?
82 })
83 }
84}
85
/// Marker trait with no methods; every implementor must be `Send + Sync`.
///
/// An `impl Handler for T` therefore only compiles when `T` is safe to share
/// and move across threads.
trait Handler
where
    Self: Send + Sync,
{
}
89
90impl<Req: Send, Res: Send> Handler for SyncService<Req, Res> {}
91
92struct PostgresBackend(SyncService<hyper::Request<hyper::Body>, hyper::Response<hyper::Body>>);
93
94impl Handler for PostgresBackend {}