// Copyright 2018 Google LLC
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
//! *Disclaimer*: This is not an official Google product.
//!
//! tarpc is an RPC framework for Rust with a focus on ease of use. Defining a
//! service can be done in just a few lines of code, and most of the boilerplate of
//! writing a server is taken care of for you.
//!
//! [Documentation](https://docs.rs/crate/tarpc/)
//!
//! ## What is an RPC framework?
//! "RPC" stands for "Remote Procedure Call," a function call where the work of
//! producing the return value is being done somewhere else. When an RPC function is
//! invoked, behind the scenes the function contacts some other process somewhere
//! and asks it to evaluate the function instead. The original function then
//! returns the value produced by the other process.
//!
//! RPC frameworks are a fundamental building block of most microservices-oriented
//! architectures. Two well-known ones are [gRPC](http://www.grpc.io) and
//! [Cap'n Proto](https://capnproto.org/).
//!
//! tarpc differentiates itself from other RPC frameworks by defining the schema in code,
//! rather than in a separate language such as .proto. This means there's no separate compilation
//! process, and no context switching between different languages.
//!
//! Some other features of tarpc:
//! - Pluggable transport: any type implementing `Stream<Item = Request> + Sink<Response>` can be
//!   used as a transport to connect the client and server.
//! - `Send + 'static` optional: if the transport doesn't require it, neither does tarpc!
//! - Cascading cancellation: dropping a request will send a cancellation message to the server.
//!   The server will cease any unfinished work on the request, subsequently cancelling any of its
//!   own requests, repeating for the entire chain of transitive dependencies.
//! - Configurable deadlines and deadline propagation: request deadlines default to 10s if
//!   unspecified. The server will automatically cease work when the deadline has passed. Any
//!   requests sent by the server that use the request context will propagate the request deadline.
//!   For example, if a server is handling a request with a 10s deadline, does 2s of work, then
//!   sends a request to another server, that server will see an 8s deadline.
//! - Distributed tracing: tarpc is instrumented with
//!   [tracing](https://github.com/tokio-rs/tracing) primitives extended with
//!   [OpenTelemetry](https://opentelemetry.io/) traces. Using a compatible tracing subscriber like
//!   [OTLP](https://github.com/open-telemetry/opentelemetry-rust/tree/main/opentelemetry-otlp),
//!   each RPC can be traced through the client, server, and other dependencies downstream of the
//!   server. Even for applications not connected to a distributed tracing collector, the
//!   instrumentation can also be ingested by regular loggers like
//!   [env_logger](https://github.com/env-logger-rs/env_logger/).
//! - Serde serialization: enabling the `serde1` Cargo feature will make service requests and
//!   responses `Serialize + Deserialize`. It's entirely optional, though: in-memory transports can
//!   be used, as well, so the price of serialization doesn't have to be paid when it's not needed.
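/*!
The deadline arithmetic in the list above (a 10s deadline, 2s of work, 8s left for the
downstream request) can be sketched with the standard library alone. This is a minimal
illustration under stated assumptions, not tarpc's implementation: `remaining_budget` is a
hypothetical helper, and the 2s of work is simulated by offsetting an `Instant` rather than by
sleeping.

```rust
use std::time::{Duration, Instant};

/// Hypothetical helper: the time budget left for a downstream request that
/// inherits `deadline`, measured at `now`. Saturates to zero once the
/// deadline has passed.
fn remaining_budget(deadline: Instant, now: Instant) -> Duration {
    deadline.saturating_duration_since(now)
}

fn main() {
    let received_at = Instant::now();
    // The request arrives with a 10s deadline, as in the example above.
    let deadline = received_at + Duration::from_secs(10);
    // Simulate 2s of work by moving a logical clock forward.
    let after_work = received_at + Duration::from_secs(2);

    let budget = remaining_budget(deadline, after_work);
    println!("downstream budget: {budget:?}");
}
```

Treating the deadline as an absolute point in time means the remaining budget shrinks on its
own as work is done; nothing has to be subtracted by hand at each hop.
*/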
//!
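/*!
The cancellation feature listed above hinges on one idea: dropping a pending request is itself
the signal. The sketch below shows that idea with a `Drop` impl and a standard-library channel.
It is an illustration only, not tarpc's internals; `Message`, `InFlightRequest`, and
`drop_and_collect` are hypothetical names introduced for this example.

```rust
use std::sync::mpsc::{channel, Sender};

/// Hypothetical wire message, loosely modeled on a cancel command.
#[derive(Debug, PartialEq)]
enum Message {
    Cancel { request_id: u64 },
}

/// Hypothetical handle for one in-flight request.
struct InFlightRequest {
    request_id: u64,
    cancellations: Sender<Message>,
}

impl Drop for InFlightRequest {
    fn drop(&mut self) {
        // Ignore send errors: if the receiver is gone, nobody is left to notify.
        let _ = self.cancellations.send(Message::Cancel {
            request_id: self.request_id,
        });
    }
}

/// Creates a request, drops it before completion, and returns every message
/// the receiving side observed.
fn drop_and_collect(request_id: u64) -> Vec<Message> {
    let (tx, rx) = channel();
    let request = InFlightRequest {
        request_id,
        cancellations: tx,
    };
    drop(request);
    rx.try_iter().collect()
}

fn main() {
    println!("{:?}", drop_and_collect(7));
}
```

A server that holds its own outgoing requests inside the handler it stops would, by the same
mechanism, drop them too, which is what makes the cancellation cascade.
*/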
//! ## Usage
//! Add to your `Cargo.toml` dependencies:
//!
//! ```toml
//! tarpc = "0.36"
//! ```
//!
//! The `tarpc::service` attribute expands to a collection of items that form an RPC service.
//! These generated types make it easy and ergonomic to write servers with less boilerplate.
//! Simply implement the generated service trait, and you're off to the races!
//!
//! ## Example
//!
//! This example uses [tokio](https://tokio.rs), so add the following dependencies to
//! your `Cargo.toml`:
//!
//! ```toml
//! anyhow = "1.0"
//! futures = "0.3"
//! tarpc = { version = "0.36", features = ["tokio1"] }
//! tokio = { version = "1.0", features = ["macros"] }
//! ```
//!
//! In the following example, we use an in-process channel for communication between
//! client and server. In real code, you will likely communicate over the network.
//! For a more real-world example, see [example-service](example-service).
//!
//! First, let's set up the dependencies and service definition.
//!
//! ```rust
//! # extern crate futures;
//!
//! use futures::{
//!     prelude::*,
//! };
//! use tarpc::{
//!     client, context,
//!     server::{self, incoming::Incoming, Channel},
//! };
//!
//! // This is the service definition. It looks a lot like a trait definition.
//! // It defines one RPC, hello, which takes one arg, name, and returns a String.
//! #[tarpc::service]
//! trait World {
//!     /// Returns a greeting for name.
//!     async fn hello(name: String) -> String;
//! }
//! ```
//!
//! This service definition generates a trait called `World`. Next we need to
//! implement it for our Server struct.
//!
//! ```rust
//! # extern crate futures;
//! # use futures::{
//! #     prelude::*,
//! # };
//! # use tarpc::{
//! #     client, context,
//! #     server::{self, incoming::Incoming},
//! # };
//! # // This is the service definition. It looks a lot like a trait definition.
//! # // It defines one RPC, hello, which takes one arg, name, and returns a String.
//! # #[tarpc::service]
//! # trait World {
//! #     /// Returns a greeting for name.
//! #     async fn hello(name: String) -> String;
//! # }
//! // This is the type that implements the generated World trait. It is the business logic
//! // and is used to start the server.
//! #[derive(Clone)]
//! struct HelloServer;
//!
//! impl World for HelloServer {
//!     // Each defined rpc generates an async fn that serves the RPC
//!     async fn hello(self, _: context::Context, name: String) -> String {
//!         format!("Hello, {name}!")
//!     }
//! }
//! ```
//!
//! Lastly, let's write our `main` that will start the server. While this example uses an
//! [in-process channel](transport::channel), tarpc also ships a generic [`serde_transport`]
//! behind the `serde-transport` feature, with additional [TCP](serde_transport::tcp) functionality
//! available behind the `tcp` feature.
//!
//! ```rust
//! # extern crate futures;
//! # use futures::{
//! #     prelude::*,
//! # };
//! # use tarpc::{
//! #     client, context,
//! #     server::{self, Channel},
//! # };
//! # // This is the service definition. It looks a lot like a trait definition.
//! # // It defines one RPC, hello, which takes one arg, name, and returns a String.
//! # #[tarpc::service]
//! # trait World {
//! #     /// Returns a greeting for name.
//! #     async fn hello(name: String) -> String;
//! # }
//! # // This is the type that implements the generated World trait. It is the business logic
//! # // and is used to start the server.
//! # #[derive(Clone)]
//! # struct HelloServer;
//! # impl World for HelloServer {
//! #     // Each defined rpc generates an async fn that serves the RPC
//! #     async fn hello(self, _: context::Context, name: String) -> String {
//! #         format!("Hello, {name}!")
//! #     }
//! # }
//! # #[cfg(not(feature = "tokio1"))]
//! # fn main() {}
//! # #[cfg(feature = "tokio1")]
//! #[tokio::main]
//! async fn main() -> anyhow::Result<()> {
//!     let (client_transport, server_transport) = tarpc::transport::channel::unbounded();
//!
//!     let server = server::BaseChannel::with_defaults(server_transport);
//!     tokio::spawn(
//!         server.execute(HelloServer.serve())
//!             // Handle all requests concurrently.
//!             .for_each(|response| async move {
//!                 tokio::spawn(response);
//!             }));
//!
//!     // WorldClient is generated by the #[tarpc::service] attribute. It has a constructor `new`
//!     // that takes a config and any Transport as input.
//!     let mut client = WorldClient::new(client::Config::default(), client_transport).spawn();
//!
//!     // The client has an RPC method for each RPC defined in the annotated trait. It takes the same
//!     // args as defined, with the addition of a Context, which is always the first arg. The Context
//!     // specifies a deadline and trace information which can be helpful in debugging requests.
//!     let hello = client.hello(context::current(), "Stim".to_string()).await?;
//!
//!     println!("{hello}");
//!
//!     Ok(())
//! }
//! ```
//!
//! ## Service Documentation
//!
//! Use `cargo doc` as you normally would to see the documentation created for all
//! items expanded by a `#[tarpc::service]` invocation.

#![deny(missing_docs)]
#![allow(clippy::type_complexity)]
#![cfg_attr(docsrs, feature(doc_cfg))]

#[cfg(feature = "serde1")]
#[doc(hidden)]
pub use serde;

#[cfg(feature = "serde-transport")]
pub use {tokio_serde, tokio_util};

#[cfg(feature = "serde-transport")]
#[cfg_attr(docsrs, doc(cfg(feature = "serde-transport")))]
pub mod serde_transport;

pub mod trace;

#[cfg(feature = "serde1")]
pub use tarpc_plugins::derive_serde;

/// The main macro that creates RPC services.
///
/// RPC methods are specified, mirroring trait syntax:
///
/// ```
/// #[tarpc::service]
/// trait Service {
///     /// Say hello
///     async fn hello(name: String) -> String;
/// }
/// ```
///
/// Attributes can be attached to each RPC. These attributes
/// will then be attached to the generated service traits'
/// corresponding `fn`s, as well as to the client stubs' RPCs.
///
/// The following items are expanded in the enclosing module:
///
/// * `trait Service` -- defines the RPC service.
///   * `fn serve` -- turns a service impl into a request handler.
/// * `ServiceClient` -- a client stub with a fn for each RPC.
///   * `fn new` -- creates a new client stub.
pub use tarpc_plugins::service;

pub(crate) mod cancellations;
pub mod client;
pub mod context;
pub mod server;
pub mod transport;
pub(crate) mod util;

pub use crate::transport::sealed::Transport;

use std::{any::Any, error::Error, io, sync::Arc, time::Instant};

/// A message from a client to a server.
#[derive(Debug)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub enum ClientMessage<T> {
    /// A request initiated by a user. The server responds to a request by invoking a
    /// service-provided request handler. The handler completes with a [`response`](Response), which
    /// the server sends back to the client.
    Request(Request<T>),
    /// A command to cancel an in-flight request, automatically sent by the client when a response
    /// future is dropped.
    ///
    /// When received, the server will immediately cancel the main task (top-level future) of the
    /// request handler for the associated request. Any tasks spawned by the request handler will
    /// not be canceled, because the framework layer does not know about them.
    Cancel {
        /// The trace context associates the message with a specific chain of causally-related actions,
        /// possibly orchestrated across many distributed systems.
        #[cfg_attr(feature = "serde1", serde(default))]
        trace_context: trace::Context,
        /// The ID of the request to cancel.
        request_id: u64,
    },
}

/// A request from a client to a server.
#[derive(Clone, Copy, Debug)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub struct Request<T> {
    /// Trace context, deadline, and other cross-cutting concerns.
    pub context: context::Context,
    /// Uniquely identifies the request across all requests sent over a single channel.
    pub id: u64,
    /// The request body.
    pub message: T,
}

/// Implemented by the request types generated by `tarpc::service`.
pub trait RequestName {
    /// The name of a request.
    fn name(&self) -> &str;
}

impl<Req> RequestName for Arc<Req>
where
    Req: RequestName,
{
    fn name(&self) -> &str {
        self.as_ref().name()
    }
}

impl<Req> RequestName for Box<Req>
where
    Req: RequestName,
{
    fn name(&self) -> &str {
        self.as_ref().name()
    }
}

/// Impls for common std types for testing.
impl RequestName for String {
    fn name(&self) -> &str {
        "string"
    }
}

impl RequestName for char {
    fn name(&self) -> &str {
        "char"
    }
}

impl RequestName for () {
    fn name(&self) -> &str {
        "unit"
    }
}

impl RequestName for i32 {
    fn name(&self) -> &str {
        "i32"
    }
}

impl RequestName for u32 {
    fn name(&self) -> &str {
        "u32"
    }
}

impl RequestName for i64 {
    fn name(&self) -> &str {
        "i64"
    }
}

impl RequestName for u64 {
    fn name(&self) -> &str {
        "u64"
    }
}

/// A response from a server to a client.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub struct Response<T> {
    /// The ID of the request being responded to.
    pub request_id: u64,
    /// The response body, or an error if the request failed.
    pub message: Result<T, ServerError>,
}

/// An error indicating the server aborted the request early, e.g., due to request throttling.
#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq, Hash)]
#[error("{kind:?}: {detail}")]
#[non_exhaustive]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub struct ServerError {
    #[cfg_attr(
        feature = "serde1",
        serde(serialize_with = "util::serde::serialize_io_error_kind_as_u32")
    )]
    #[cfg_attr(
        feature = "serde1",
        serde(deserialize_with = "util::serde::deserialize_io_error_kind_from_u32")
    )]
    /// The type of error that occurred to fail the request.
    pub kind: io::ErrorKind,
    /// A message describing more detail about the error that occurred.
    pub detail: String,
}

/// Critical errors that result in a Channel disconnecting.
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
pub enum ChannelError<E>
where
    E: ?Sized,
{
    /// Could not read from the transport.
    #[error("could not read from the transport")]
    Read(#[source] Arc<E>),
    /// Could not ready the transport for writes.
    #[error("could not ready the transport for writes")]
    Ready(#[source] Arc<E>),
    /// Could not write to the transport.
    #[error("could not write to the transport")]
    Write(#[source] Arc<E>),
    /// Could not flush the transport.
    #[error("could not flush the transport")]
    Flush(#[source] Arc<E>),
    /// Could not close the write end of the transport.
    #[error("could not close the write end of the transport")]
    Close(#[source] Arc<E>),
}

impl<E> Clone for ChannelError<E>
where
    E: ?Sized,
{
    fn clone(&self) -> Self {
        use ChannelError::*;
        match self {
            Read(e) => Read(e.clone()),
            Ready(e) => Ready(e.clone()),
            Write(e) => Write(e.clone()),
            Flush(e) => Flush(e.clone()),
            Close(e) => Close(e.clone()),
        }
    }
}

impl<E> ChannelError<E>
where
    E: Error + Send + Sync + 'static,
{
    /// Converts the ChannelError's source error type to a dyn Error. This is useful in type-erased
    /// contexts, for example, storing a ChannelError in a non-generic type like
    /// [`client::RpcError`].
    fn upcast_error(self) -> ChannelError<dyn Error + Send + Sync + 'static> {
        use ChannelError::*;
        match self {
            Read(e) => Read(e),
            Ready(e) => Ready(e),
            Write(e) => Write(e),
            Flush(e) => Flush(e),
            Close(e) => Close(e),
        }
    }
}

impl<E> ChannelError<E>
where
    E: Send + Sync + 'static,
{
    /// Converts the ChannelError's source error type to a dyn Any. This is useful in type-erased
    /// contexts, for example, storing a ChannelError in a non-generic type like
    /// [`client::RpcError`].
    fn upcast_any(self) -> ChannelError<dyn Any + Send + Sync + 'static> {
        use ChannelError::*;
        match self {
            Read(e) => Read(e),
            Ready(e) => Ready(e),
            Write(e) => Write(e),
            Flush(e) => Flush(e),
            Close(e) => Close(e),
        }
    }
}

impl ChannelError<dyn Any + Send + Sync + 'static> {
    /// Converts the ChannelError's source error type to a concrete type. This is useful in
    /// type-erased contexts, for example, recovering a ChannelError stored in a non-generic type
    /// like [`client::RpcError`].
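    /**
The type-erasure round trip this method relies on can be shown with the standard library
alone. The sketch below is an illustration, not part of tarpc: `erase` and `recover` are
hypothetical helpers that wrap the unsized coercion to `Arc<dyn Any + Send + Sync>` and the
matching `Arc::downcast` call.

```rust
use std::any::Any;
use std::sync::Arc;

/// Hypothetical helper: hides the concrete type of `value` behind `dyn Any`.
fn erase<T: Any + Send + Sync>(value: T) -> Arc<dyn Any + Send + Sync> {
    // `Arc<T>` coerces to `Arc<dyn Any + Send + Sync>` at the return site.
    Arc::new(value)
}

/// Hypothetical helper: tries to recover a concrete `T`. On a type mismatch
/// the erased value is handed back unchanged in the `Err` variant.
fn recover<T: Any + Send + Sync>(
    erased: Arc<dyn Any + Send + Sync>,
) -> Result<Arc<T>, Arc<dyn Any + Send + Sync>> {
    erased.downcast::<T>()
}

fn main() {
    let erased = erase("read failed");
    // Asking for the wrong type fails without losing the value.
    assert!(recover::<String>(erased.clone()).is_err());
    // Asking for the original type succeeds.
    if let Ok(message) = recover::<&'static str>(erased) {
        println!("{}", *message);
    }
}
```

Returning the erased value on failure is what lets a caller try several candidate types in
turn.
    */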
    fn downcast<E>(self) -> Result<ChannelError<E>, Self>
    where
        E: Any + Send + Sync,
    {
        use ChannelError::*;
        match self {
            Read(e) => e.downcast::<E>().map(Read).map_err(Read),
            Ready(e) => e.downcast::<E>().map(Ready).map_err(Ready),
            Write(e) => e.downcast::<E>().map(Write).map_err(Write),
            Flush(e) => e.downcast::<E>().map(Flush).map_err(Flush),
            Close(e) => e.downcast::<E>().map(Close).map_err(Close),
        }
    }
}

impl ServerError {
    /// Returns a new server error with `kind` and `detail`.
    pub fn new(kind: io::ErrorKind, detail: String) -> ServerError {
        Self { kind, detail }
    }
}

impl<T> Request<T> {
    /// Returns the deadline for this request.
    pub fn deadline(&self) -> &Instant {
        &self.context.deadline
    }
}

#[test]
fn test_channel_any_casts() {
    use assert_matches::assert_matches;
    let any = ChannelError::Read(Arc::new("")).upcast_any();
    assert_matches!(any, ChannelError::Read(_));
    assert_matches!(any.downcast::<&'static str>(), Ok(ChannelError::Read(_)));

    let any = ChannelError::Ready(Arc::new("")).upcast_any();
    assert_matches!(any, ChannelError::Ready(_));
    assert_matches!(any.downcast::<&'static str>(), Ok(ChannelError::Ready(_)));

    let any = ChannelError::Write(Arc::new("")).upcast_any();
    assert_matches!(any, ChannelError::Write(_));
    assert_matches!(any.downcast::<&'static str>(), Ok(ChannelError::Write(_)));

    let any = ChannelError::Flush(Arc::new("")).upcast_any();
    assert_matches!(any, ChannelError::Flush(_));
    assert_matches!(any.downcast::<&'static str>(), Ok(ChannelError::Flush(_)));

    let any = ChannelError::Close(Arc::new("")).upcast_any();
    assert_matches!(any, ChannelError::Close(_));
    assert_matches!(any.downcast::<&'static str>(), Ok(ChannelError::Close(_)));
}

#[test]
fn test_channel_error_upcast() {
    use assert_matches::assert_matches;
    use std::fmt;

    #[derive(Debug)]
    struct E;
    impl fmt::Display for E {
        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
            write!(f, "E")
        }
    }
    impl Error for E {}
    assert_matches!(
        ChannelError::Read(Arc::new(E)).upcast_error(),
        ChannelError::Read(_)
    );
    assert_matches!(
        ChannelError::Ready(Arc::new(E)).upcast_error(),
        ChannelError::Ready(_)
    );
    assert_matches!(
        ChannelError::Write(Arc::new(E)).upcast_error(),
        ChannelError::Write(_)
    );
    assert_matches!(
        ChannelError::Flush(Arc::new(E)).upcast_error(),
        ChannelError::Flush(_)
    );
    assert_matches!(
        ChannelError::Close(Arc::new(E)).upcast_error(),
        ChannelError::Close(_)
    );
}