pub struct DuplexService<Request, Response, S: Service<ServiceRequest>, ServiceRequest> { /* private fields */ }Expand description
A tower::Service that implements a server and a client simultaneously over a bi-directional
channel. As a server it is able to process RPC calls from a remote client, and as a client it is
capable of making RPC calls into a remote server. It is very convinient in a system that
requires asynchronous communication in both directions.
Implementations§
Source§impl<Request, Response, S: Service<ServiceRequest>, ServiceRequest> DuplexService<Request, Response, S, ServiceRequest>
impl<Request, Response, S: Service<ServiceRequest>, ServiceRequest> DuplexService<Request, Response, S, ServiceRequest>
Sourcepub fn new_pair(service: S) -> (Self, DuplexClient<Request, Response>)
pub fn new_pair(service: S) -> (Self, DuplexClient<Request, Response>)
Create a new server instance, with an associated client handle. The server stops if all the
client handles are dropped. To start the server use the [run] or [run_with] methods.
§Example
use core::task::{Context, Poll};
use tower_duplex::DuplexService;
/// A Service that converts requests to lower or upper case
enum ChangeCase {
ToLower,
ToUpper,
}
impl tower::Service<String> for ChangeCase {
type Response = String;
type Error = ();
type Future = std::pin::Pin<
Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>,
>;
fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: String) -> Self::Future {
let to_upper = matches!(self, ChangeCase::ToUpper);
Box::pin(async move {
if to_upper {
Ok(req.to_uppercase())
} else {
Ok(req.to_lowercase())
}
})
}
}
let (server, client): (DuplexService<String, String, _, _>, _) =
DuplexService::new_pair(ChangeCase::ToUpper);Sourcepub async fn run_with<Rcv, Snd, RcvErr, SndErr>(
self,
receiver: Rcv,
sender: Snd,
) -> Result<(), DuplexError<RcvErr, SndErr>>where
Rcv: TryStream<Ok = DuplexValue<ServiceRequest, Response>, Error = RcvErr> + Unpin,
Snd: Sink<DuplexValue<Request, <<S as Service<ServiceRequest>>::Future as Future>::Output>, Error = SndErr> + Unpin,
pub async fn run_with<Rcv, Snd, RcvErr, SndErr>(
self,
receiver: Rcv,
sender: Snd,
) -> Result<(), DuplexError<RcvErr, SndErr>>where
Rcv: TryStream<Ok = DuplexValue<ServiceRequest, Response>, Error = RcvErr> + Unpin,
Snd: Sink<DuplexValue<Request, <<S as Service<ServiceRequest>>::Future as Future>::Output>, Error = SndErr> + Unpin,
Source§impl<Request, Response, S: Service<ServiceRequest>, ServiceRequest> DuplexService<Request, Response, S, ServiceRequest>where
for<'de> ServiceRequest: Serialize + Deserialize<'de>,
for<'de> <<S as Service<ServiceRequest>>::Future as Future>::Output: Serialize + Deserialize<'de>,
for<'de> Request: Serialize + Deserialize<'de>,
for<'de> Response: Serialize + Deserialize<'de>,
impl<Request, Response, S: Service<ServiceRequest>, ServiceRequest> DuplexService<Request, Response, S, ServiceRequest>where
for<'de> ServiceRequest: Serialize + Deserialize<'de>,
for<'de> <<S as Service<ServiceRequest>>::Future as Future>::Output: Serialize + Deserialize<'de>,
for<'de> Request: Serialize + Deserialize<'de>,
for<'de> Response: Serialize + Deserialize<'de>,
Sourcepub async fn run<R: AsyncRead + Unpin, W: AsyncWrite + Unpin>(
self,
reader: R,
writer: W,
) -> Result<(), DuplexError<Error, Error>>
pub async fn run<R: AsyncRead + Unpin, W: AsyncWrite + Unpin>( self, reader: R, writer: W, ) -> Result<(), DuplexError<Error, Error>>
Run the server loop with the provided AsyncRead and AsyncWrite. The server loop
serves remote RPC calls, and handles local RPC calls from client handles.
§Example
use core::task::{Context, Poll};
use tokio::sync::mpsc;
use tower::Service;
use tower_duplex::DuplexService;
/// A Service that converts requests to lower or upper case
enum ChangeCase {
ToLower,
ToUpper,
}
impl tower::Service<String> for ChangeCase {
type Response = String;
type Error = ();
type Future = std::pin::Pin<
Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>,
>;
fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: String) -> Self::Future {
let to_upper = matches!(self, ChangeCase::ToUpper);
Box::pin(async move {
if to_upper {
Ok(req.to_uppercase())
} else {
Ok(req.to_lowercase())
}
})
}
}
#[tokio::main]
async fn main() {
// `server1` handles serves requests from `server2` and converts strings to upper case.
// It also forwards requests from `client1` to `server2`.
let (server1, mut client1): (DuplexService<String, Result<String, ()>, _, _>, _) =
DuplexService::new_pair(ChangeCase::ToUpper);
// `server2` handles serves requests from `server1` and converts strings to lower case.
// It also forwards requests from `client2` to `server1`.
let (server2, mut client2): (DuplexService<String, Result<String, ()>, _, _>, _) =
DuplexService::new_pair(ChangeCase::ToLower);
let ((r1, w1), (r2, w2)) = tokio::net::UnixStream::pair()
.map(|(a, b)| (a.into_split(), b.into_split()))
.unwrap();
tokio::spawn(server1.run(r1, w1));
tokio::spawn(server2.run(r2, w2));
assert_eq!(
client2.call("String".to_string()).await.unwrap().unwrap(),
"STRING"
);
assert_eq!(
client1.call("String".to_string()).await.unwrap().unwrap(),
"string"
);
}Auto Trait Implementations§
impl<Request, Response, S, ServiceRequest> Freeze for DuplexService<Request, Response, S, ServiceRequest>where
S: Freeze,
impl<Request, Response, S, ServiceRequest> RefUnwindSafe for DuplexService<Request, Response, S, ServiceRequest>where
S: RefUnwindSafe,
ServiceRequest: RefUnwindSafe,
impl<Request, Response, S, ServiceRequest> Send for DuplexService<Request, Response, S, ServiceRequest>
impl<Request, Response, S, ServiceRequest> Sync for DuplexService<Request, Response, S, ServiceRequest>
impl<Request, Response, S, ServiceRequest> Unpin for DuplexService<Request, Response, S, ServiceRequest>
impl<Request, Response, S, ServiceRequest> UnwindSafe for DuplexService<Request, Response, S, ServiceRequest>where
S: UnwindSafe,
ServiceRequest: UnwindSafe,
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more