DuplexService

Struct DuplexService 

pub struct DuplexService<Request, Response, S: Service<ServiceRequest>, ServiceRequest> { /* private fields */ }

A tower::Service that implements a server and a client simultaneously over a bi-directional channel. As a server it can process RPC calls from a remote client, and as a client it can make RPC calls to a remote server. This is convenient in a system that requires asynchronous communication in both directions.
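To make the dual role concrete, the sketch below shows one way a single endpoint might route traffic arriving on a shared channel: incoming requests are answered by the local service, while incoming responses complete calls made earlier. `Frame`, `handle_frame`, and the `id` field are hypothetical names used only for illustration; they are not part of tower_duplex, and the crate's actual message type and dispatch logic may differ.

```rust
use std::collections::HashMap;

/// Hypothetical frame type: one channel carries both kinds of message.
#[derive(Debug, PartialEq)]
enum Frame {
    /// A call made by the remote peer, to be served locally.
    Request { id: u64, body: String },
    /// The remote peer's answer to a call made locally.
    Response { id: u64, body: String },
}

/// Route one incoming frame (illustrative only).
///
/// A request is passed to the local `service` and the reply frame is
/// returned so the caller can send it back. A response is recorded in
/// `completed` under its id and nothing is sent back.
fn handle_frame(
    frame: Frame,
    service: impl Fn(&str) -> String,
    completed: &mut HashMap<u64, String>,
) -> Option<Frame> {
    match frame {
        Frame::Request { id, body } => Some(Frame::Response {
            id,
            body: service(&body),
        }),
        Frame::Response { id, body } => {
            completed.insert(id, body);
            None
        }
    }
}
```

Tagging each frame with its direction is what lets one connection carry both roles; the `id` is an assumed way to match a response to the call that produced it.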

Implementations

impl<Request, Response, S: Service<ServiceRequest>, ServiceRequest> DuplexService<Request, Response, S, ServiceRequest>

pub fn new_pair(service: S) -> (Self, DuplexClient<Request, Response>)

Creates a new server instance together with an associated client handle. The server stops once all client handles have been dropped. To start the server, call the run or run_with method.

Example
use core::task::{Context, Poll};

use tower_duplex::DuplexService;

/// A Service that converts requests to lower or upper case
enum ChangeCase {
    ToLower,
    ToUpper,
}

impl tower::Service<String> for ChangeCase {
    type Response = String;
    type Error = ();
    type Future = std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>,
    >;

    fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: String) -> Self::Future {
        let to_upper = matches!(self, ChangeCase::ToUpper);
        Box::pin(async move {
            if to_upper {
                Ok(req.to_uppercase())
            } else {
                Ok(req.to_lowercase())
            }
        })
    }
}

let (server, client): (DuplexService<String, String, _, _>, _) =
    DuplexService::new_pair(ChangeCase::ToUpper);
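
The shutdown rule stated for new_pair (the server stops once every client handle is dropped) resembles the way a standard-library channel closes. The following is only an analogy built on std::sync::mpsc, not the crate's implementation; `serve_until_handles_dropped` and `run_demo` are hypothetical names.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical server loop: keeps serving until every `Sender` clone is gone.
/// Returns the number of requests it handled.
fn serve_until_handles_dropped(rx: mpsc::Receiver<String>) -> usize {
    let mut served = 0;
    // `recv` returns `Err` once all senders are dropped and the queue is empty.
    while let Ok(_request) = rx.recv() {
        served += 1;
    }
    served
}

/// Spawn the loop, send through two handles, then drop both handles.
fn run_demo() -> usize {
    let (handle, rx) = mpsc::channel::<String>();
    let second_handle = handle.clone();
    let server = thread::spawn(move || serve_until_handles_dropped(rx));

    handle.send("one".to_string()).unwrap();
    second_handle.send("two".to_string()).unwrap();

    // Dropping the last handle is what lets the server loop finish.
    drop(handle);
    drop(second_handle);
    server.join().unwrap()
}
```

If either handle were kept alive, `join` would block, which mirrors why a running server task only completes after its last client handle goes away.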

pub async fn run_with<Rcv, Snd, RcvErr, SndErr>(
    self,
    receiver: Rcv,
    sender: Snd,
) -> Result<(), DuplexError<RcvErr, SndErr>>
where
    Rcv: TryStream<Ok = DuplexValue<ServiceRequest, Response>, Error = RcvErr> + Unpin,
    Snd: Sink<DuplexValue<Request, <<S as Service<ServiceRequest>>::Future as Future>::Output>, Error = SndErr> + Unpin,

Run the server loop with the provided TryStream and Sink. The server loop serves remote RPC calls, and handles local RPC calls from client handles.

impl<Request, Response, S: Service<ServiceRequest>, ServiceRequest> DuplexService<Request, Response, S, ServiceRequest>
where
    for<'de> ServiceRequest: Serialize + Deserialize<'de>,
    for<'de> <<S as Service<ServiceRequest>>::Future as Future>::Output: Serialize + Deserialize<'de>,
    for<'de> Request: Serialize + Deserialize<'de>,
    for<'de> Response: Serialize + Deserialize<'de>,

pub async fn run<R: AsyncRead + Unpin, W: AsyncWrite + Unpin>(
    self,
    reader: R,
    writer: W,
) -> Result<(), DuplexError<Error, Error>>

Run the server loop with the provided AsyncRead and AsyncWrite. The server loop serves remote RPC calls, and handles local RPC calls from client handles.

Example
use core::task::{Context, Poll};

use tower::Service;
use tower_duplex::DuplexService;

/// A Service that converts requests to lower or upper case
enum ChangeCase {
    ToLower,
    ToUpper,
}

impl tower::Service<String> for ChangeCase {
    type Response = String;
    type Error = ();
    type Future = std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>,
    >;

    fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: String) -> Self::Future {
        let to_upper = matches!(self, ChangeCase::ToUpper);
        Box::pin(async move {
            if to_upper {
                Ok(req.to_uppercase())
            } else {
                Ok(req.to_lowercase())
            }
        })
    }
}

#[tokio::main]
async fn main() {
    // `server1` serves requests from `server2`, converting strings to upper case.
    // It also forwards requests from `client1` to `server2`.
    let (server1, mut client1): (DuplexService<String, Result<String, ()>, _, _>, _) =
        DuplexService::new_pair(ChangeCase::ToUpper);
    // `server2` serves requests from `server1`, converting strings to lower case.
    // It also forwards requests from `client2` to `server1`.
    let (server2, mut client2): (DuplexService<String, Result<String, ()>, _, _>, _) =
        DuplexService::new_pair(ChangeCase::ToLower);

    let ((r1, w1), (r2, w2)) = tokio::net::UnixStream::pair()
        .map(|(a, b)| (a.into_split(), b.into_split()))
        .unwrap();

    tokio::spawn(server1.run(r1, w1));
    tokio::spawn(server2.run(r2, w2));

    assert_eq!(
        client2.call("String".to_string()).await.unwrap().unwrap(),
        "STRING"
    );

    assert_eq!(
        client1.call("String".to_string()).await.unwrap().unwrap(),
        "string"
    );
}

Auto Trait Implementations

impl<Request, Response, S, ServiceRequest> Freeze for DuplexService<Request, Response, S, ServiceRequest>
where S: Freeze,

impl<Request, Response, S, ServiceRequest> RefUnwindSafe for DuplexService<Request, Response, S, ServiceRequest>
where S: RefUnwindSafe, ServiceRequest: RefUnwindSafe,

impl<Request, Response, S, ServiceRequest> Send for DuplexService<Request, Response, S, ServiceRequest>
where S: Send, ServiceRequest: Send, Request: Send, Response: Send,

impl<Request, Response, S, ServiceRequest> Sync for DuplexService<Request, Response, S, ServiceRequest>
where S: Sync, ServiceRequest: Sync, Request: Send, Response: Send,

impl<Request, Response, S, ServiceRequest> Unpin for DuplexService<Request, Response, S, ServiceRequest>
where S: Unpin, ServiceRequest: Unpin,

impl<Request, Response, S, ServiceRequest> UnwindSafe for DuplexService<Request, Response, S, ServiceRequest>
where S: UnwindSafe, ServiceRequest: UnwindSafe,

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.