tarpc/lib.rs
1// Copyright 2018 Google LLC
2//
3// Use of this source code is governed by an MIT-style
4// license that can be found in the LICENSE file or at
5// https://opensource.org/licenses/MIT.
6//! *Disclaimer*: This is not an official Google product.
7//!
8//! tarpc is an RPC framework for rust with a focus on ease of use. Defining a
9//! service can be done in just a few lines of code, and most of the boilerplate of
10//! writing a server is taken care of for you.
11//!
12//! [Documentation](https://docs.rs/crate/tarpc/)
13//!
14//! ## What is an RPC framework?
15//! "RPC" stands for "Remote Procedure Call," a function call where the work of
16//! producing the return value is being done somewhere else. When an rpc function is
17//! invoked, behind the scenes the function contacts some other process somewhere
18//! and asks them to evaluate the function instead. The original function then
19//! returns the value produced by the other process.
20//!
21//! RPC frameworks are a fundamental building block of most microservices-oriented
22//! architectures. Two well-known ones are [gRPC](http://www.grpc.io) and
23//! [Cap'n Proto](https://capnproto.org/).
24//!
25//! tarpc differentiates itself from other RPC frameworks by defining the schema in code,
26//! rather than in a separate language such as .proto. This means there's no separate compilation
27//! process, and no context switching between different languages.
28//!
29//! Some other features of tarpc:
30//! - Pluggable transport: any type implementing `Stream<Item = Request> + Sink<Response>` can be
31//! used as a transport to connect the client and server.
32//! - `Send + 'static` optional: if the transport doesn't require it, neither does tarpc!
33//! - Cascading cancellation: dropping a request will send a cancellation message to the server.
34//! The server will cease any unfinished work on the request, subsequently cancelling any of its
35//! own requests, repeating for the entire chain of transitive dependencies.
36//! - Configurable deadlines and deadline propagation: request deadlines default to 10s if
37//! unspecified. The server will automatically cease work when the deadline has passed. Any
38//! requests sent by the server that use the request context will propagate the request deadline.
39//! For example, if a server is handling a request with a 10s deadline, does 2s of work, then
40//! sends a request to another server, that server will see an 8s deadline.
41//! - Distributed tracing: tarpc is instrumented with
42//! [tracing](https://github.com/tokio-rs/tracing) primitives extended with
43//! [OpenTelemetry](https://opentelemetry.io/) traces. Using a compatible tracing subscriber like
44//! [OTLP](https://github.com/open-telemetry/opentelemetry-rust/tree/main/opentelemetry-otlp),
45//! each RPC can be traced through the client, server, and other dependencies downstream of the
46//! server. Even for applications not connected to a distributed tracing collector, the
47//! instrumentation can also be ingested by regular loggers like
48//! [env_logger](https://github.com/env-logger-rs/env_logger/).
49//! - Serde serialization: enabling the `serde1` Cargo feature will make service requests and
50//! responses `Serialize + Deserialize`. It's entirely optional, though: in-memory transports can
51//! be used, as well, so the price of serialization doesn't have to be paid when it's not needed.
52//!
53//! ## Usage
54//! Add to your `Cargo.toml` dependencies:
55//!
56//! ```toml
57//! tarpc = "0.36"
58//! ```
59//!
60//! The `tarpc::service` attribute expands to a collection of items that form an rpc service.
61//! These generated types make it easy and ergonomic to write servers with less boilerplate.
62//! Simply implement the generated service trait, and you're off to the races!
63//!
64//! ## Example
65//!
66//! This example uses [tokio](https://tokio.rs), so add the following dependencies to
67//! your `Cargo.toml`:
68//!
69//! ```toml
70//! anyhow = "1.0"
71//! futures = "0.3"
72//! tarpc = { version = "0.36", features = ["tokio1"] }
73//! tokio = { version = "1.0", features = ["macros"] }
74//! ```
75//!
76//! In the following example, we use an in-process channel for communication between
77//! client and server. In real code, you will likely communicate over the network.
78//! For a more real-world example, see [example-service](example-service).
79//!
80//! First, let's set up the dependencies and service definition.
81//!
82//! ```rust
83//! # extern crate futures;
84//!
85//! use futures::{
86//! prelude::*,
87//! };
88//! use tarpc::{
89//! client, context,
90//! server::{self, incoming::Incoming, Channel},
91//! };
92//!
93//! // This is the service definition. It looks a lot like a trait definition.
94//! // It defines one RPC, hello, which takes one arg, name, and returns a String.
95//! #[tarpc::service]
96//! trait World {
97//! /// Returns a greeting for name.
98//! async fn hello(name: String) -> String;
99//! }
100//! ```
101//!
102//! This service definition generates a trait called `World`. Next we need to
103//! implement it for our Server struct.
104//!
105//! ```rust
106//! # extern crate futures;
107//! # use futures::{
108//! # prelude::*,
109//! # };
110//! # use tarpc::{
111//! # client, context,
112//! # server::{self, incoming::Incoming},
113//! # };
114//! # // This is the service definition. It looks a lot like a trait definition.
115//! # // It defines one RPC, hello, which takes one arg, name, and returns a String.
116//! # #[tarpc::service]
117//! # trait World {
118//! # /// Returns a greeting for name.
119//! # async fn hello(name: String) -> String;
120//! # }
121//! // This is the type that implements the generated World trait. It is the business logic
122//! // and is used to start the server.
123//! #[derive(Clone)]
124//! struct HelloServer;
125//!
126//! impl World for HelloServer {
127//! // Each defined rpc generates an async fn that serves the RPC
128//! async fn hello(self, _: context::Context, name: String) -> String {
129//! format!("Hello, {name}!")
130//! }
131//! }
132//! ```
133//!
134//! Lastly let's write our `main` that will start the server. While this example uses an
135//! [in-process channel](transport::channel), tarpc also ships a generic [`serde_transport`]
136//! behind the `serde-transport` feature, with additional [TCP](serde_transport::tcp) functionality
137//! available behind the `tcp` feature.
138//!
139//! ```rust
140//! # extern crate futures;
141//! # use futures::{
142//! # prelude::*,
143//! # };
144//! # use tarpc::{
145//! # client, context,
146//! # server::{self, Channel},
147//! # };
148//! # // This is the service definition. It looks a lot like a trait definition.
149//! # // It defines one RPC, hello, which takes one arg, name, and returns a String.
150//! # #[tarpc::service]
151//! # trait World {
152//! # /// Returns a greeting for name.
153//! # async fn hello(name: String) -> String;
154//! # }
155//! # // This is the type that implements the generated World trait. It is the business logic
156//! # // and is used to start the server.
157//! # #[derive(Clone)]
158//! # struct HelloServer;
159//! # impl World for HelloServer {
160//! // Each defined rpc generates an async fn that serves the RPC
161//! # async fn hello(self, _: context::Context, name: String) -> String {
162//! # format!("Hello, {name}!")
163//! # }
164//! # }
165//! # #[cfg(not(feature = "tokio1"))]
166//! # fn main() {}
167//! # #[cfg(feature = "tokio1")]
168//! #[tokio::main]
169//! async fn main() -> anyhow::Result<()> {
170//! let (client_transport, server_transport) = tarpc::transport::channel::unbounded();
171//!
172//! let server = server::BaseChannel::with_defaults(server_transport);
173//! tokio::spawn(
174//! server.execute(HelloServer.serve())
175//! // Handle all requests concurrently.
176//! .for_each(|response| async move {
177//! tokio::spawn(response);
178//! }));
179//!
180//! // WorldClient is generated by the #[tarpc::service] attribute. It has a constructor `new`
181//! // that takes a config and any Transport as input.
182//! let mut client = WorldClient::new(client::Config::default(), client_transport).spawn();
183//!
184//! // The client has an RPC method for each RPC defined in the annotated trait. It takes the same
185//! // args as defined, with the addition of a Context, which is always the first arg. The Context
186//! // specifies a deadline and trace information which can be helpful in debugging requests.
187//! let hello = client.hello(context::current(), "Stim".to_string()).await?;
188//!
189//! println!("{hello}");
190//!
191//! Ok(())
192//! }
193//! ```
194//!
195//! ## Service Documentation
196//!
197//! Use `cargo doc` as you normally would to see the documentation created for all
198//! items expanded by a `service!` invocation.
199
200#![deny(missing_docs)]
201#![allow(clippy::type_complexity)]
202#![cfg_attr(docsrs, feature(doc_cfg))]
203
204#[cfg(feature = "serde1")]
205#[doc(hidden)]
206pub use serde;
207
208#[cfg(feature = "serde-transport")]
209pub use {tokio_serde, tokio_util};
210
211#[cfg(feature = "serde-transport")]
212#[cfg_attr(docsrs, doc(cfg(feature = "serde-transport")))]
213pub mod serde_transport;
214
215pub mod trace;
216
217#[cfg(feature = "serde1")]
218pub use tarpc_plugins::derive_serde;
219
220/// The main macro that creates RPC services.
221///
222/// Rpc methods are specified, mirroring trait syntax:
223///
224/// ```
225/// #[tarpc::service]
226/// trait Service {
227/// /// Say hello
228/// async fn hello(name: String) -> String;
229/// }
230/// ```
231///
232/// Attributes can be attached to each rpc. These attributes
233/// will then be attached to the generated service traits'
234/// corresponding `fn`s, as well as to the client stubs' RPCs.
235///
236/// The following items are expanded in the enclosing module:
237///
238/// * `trait Service` -- defines the RPC service.
239/// * `fn serve` -- turns a service impl into a request handler.
240/// * `Client` -- a client stub with a fn for each RPC.
241/// * `fn new_stub` -- creates a new Client stub.
242pub use tarpc_plugins::service;
243
244pub(crate) mod cancellations;
245pub mod client;
246pub mod context;
247pub mod server;
248pub mod transport;
249pub(crate) mod util;
250
251pub use crate::transport::sealed::Transport;
252
253use std::{any::Any, error::Error, io, sync::Arc, time::Instant};
254
255/// A message from a client to a server.
256#[derive(Debug)]
257#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
258#[non_exhaustive]
259pub enum ClientMessage<T> {
260 /// A request initiated by a user. The server responds to a request by invoking a
261 /// service-provided request handler. The handler completes with a [`response`](Response), which
262 /// the server sends back to the client.
263 Request(Request<T>),
264 /// A command to cancel an in-flight request, automatically sent by the client when a response
265 /// future is dropped.
266 ///
267 /// When received, the server will immediately cancel the main task (top-level future) of the
268 /// request handler for the associated request. Any tasks spawned by the request handler will
269 /// not be canceled, because the framework layer does not
270 /// know about them.
271 Cancel {
272 /// The trace context associates the message with a specific chain of causally-related actions,
273 /// possibly orchestrated across many distributed systems.
274 #[cfg_attr(feature = "serde1", serde(default))]
275 trace_context: trace::Context,
276 /// The ID of the request to cancel.
277 request_id: u64,
278 },
279}
280
281/// A request from a client to a server.
282#[derive(Clone, Copy, Debug)]
283#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
284pub struct Request<T> {
285 /// Trace context, deadline, and other cross-cutting concerns.
286 pub context: context::Context,
287 /// Uniquely identifies the request across all requests sent over a single channel.
288 pub id: u64,
289 /// The request body.
290 pub message: T,
291}
292
293/// Implemented by the request types generated by tarpc::service.
294pub trait RequestName {
295 /// The name of a request.
296 fn name(&self) -> &str;
297}
298
299impl<Req> RequestName for Arc<Req>
300where
301 Req: RequestName,
302{
303 fn name(&self) -> &str {
304 self.as_ref().name()
305 }
306}
307
308impl<Req> RequestName for Box<Req>
309where
310 Req: RequestName,
311{
312 fn name(&self) -> &str {
313 self.as_ref().name()
314 }
315}
316
317/// Impls for common std types for testing.
318impl RequestName for String {
319 fn name(&self) -> &str {
320 "string"
321 }
322}
323
324impl RequestName for char {
325 fn name(&self) -> &str {
326 "char"
327 }
328}
329
330impl RequestName for () {
331 fn name(&self) -> &str {
332 "unit"
333 }
334}
335
336impl RequestName for i32 {
337 fn name(&self) -> &str {
338 "i32"
339 }
340}
341
342impl RequestName for u32 {
343 fn name(&self) -> &str {
344 "u32"
345 }
346}
347
348impl RequestName for i64 {
349 fn name(&self) -> &str {
350 "i64"
351 }
352}
353
354impl RequestName for u64 {
355 fn name(&self) -> &str {
356 "u64"
357 }
358}
359
360/// A response from a server to a client.
361#[derive(Clone, Debug, PartialEq, Eq, Hash)]
362#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
363pub struct Response<T> {
364 /// The ID of the request being responded to.
365 pub request_id: u64,
366 /// The response body, or an error if the request failed.
367 pub message: Result<T, ServerError>,
368}
369
370/// An error indicating the server aborted the request early, e.g., due to request throttling.
371#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq, Hash)]
372#[error("{kind:?}: {detail}")]
373#[non_exhaustive]
374#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
375pub struct ServerError {
376 #[cfg_attr(
377 feature = "serde1",
378 serde(serialize_with = "util::serde::serialize_io_error_kind_as_u32")
379 )]
380 #[cfg_attr(
381 feature = "serde1",
382 serde(deserialize_with = "util::serde::deserialize_io_error_kind_from_u32")
383 )]
384 /// The type of error that occurred to fail the request.
385 pub kind: io::ErrorKind,
386 /// A message describing more detail about the error that occurred.
387 pub detail: String,
388}
389
390/// Critical errors that result in a Channel disconnecting.
391#[derive(thiserror::Error, Debug, PartialEq, Eq)]
392pub enum ChannelError<E>
393where
394 E: ?Sized,
395{
396 /// Could not read from the transport.
397 #[error("could not read from the transport")]
398 Read(#[source] Arc<E>),
399 /// Could not ready the transport for writes.
400 #[error("could not ready the transport for writes")]
401 Ready(#[source] Arc<E>),
402 /// Could not write to the transport.
403 #[error("could not write to the transport")]
404 Write(#[source] Arc<E>),
405 /// Could not flush the transport.
406 #[error("could not flush the transport")]
407 Flush(#[source] Arc<E>),
408 /// Could not close the write end of the transport.
409 #[error("could not close the write end of the transport")]
410 Close(#[source] Arc<E>),
411}
412
413impl<E> Clone for ChannelError<E>
414where
415 E: ?Sized,
416{
417 fn clone(&self) -> Self {
418 use ChannelError::*;
419 match self {
420 Read(e) => Read(e.clone()),
421 Ready(e) => Ready(e.clone()),
422 Write(e) => Write(e.clone()),
423 Flush(e) => Flush(e.clone()),
424 Close(e) => Close(e.clone()),
425 }
426 }
427}
428
429impl<E> ChannelError<E>
430where
431 E: Error + Send + Sync + 'static,
432{
433 /// Converts the ChannelError's source error type to a dyn Error. This is useful in type-erased
434 /// contexts, for example, storing a ChannelError in a non-generic type like
435 /// [`client::RpcError`].
436 fn upcast_error(self) -> ChannelError<dyn Error + Send + Sync + 'static> {
437 use ChannelError::*;
438 match self {
439 Read(e) => Read(e),
440 Ready(e) => Ready(e),
441 Write(e) => Write(e),
442 Flush(e) => Flush(e),
443 Close(e) => Close(e),
444 }
445 }
446}
447
448impl<E> ChannelError<E>
449where
450 E: Send + Sync + 'static,
451{
452 /// Converts the ChannelError's source error type to a dyn Any. This is useful in type-erased
453 /// contexts, for example, storing a ChannelError in a non-generic type like
454 /// [`client::RpcError`].
455 fn upcast_any(self) -> ChannelError<dyn Any + Send + Sync + 'static> {
456 use ChannelError::*;
457 match self {
458 Read(e) => Read(e),
459 Ready(e) => Ready(e),
460 Write(e) => Write(e),
461 Flush(e) => Flush(e),
462 Close(e) => Close(e),
463 }
464 }
465}
466
467impl ChannelError<dyn Any + Send + Sync + 'static> {
468 /// Converts the ChannelError's source error type to a concrete type. This is useful in
469 /// type-erased contexts, for example, storing a ChannelError in a non-generic type like
470 /// [`Client::RpcError`].
471 fn downcast<E>(self) -> Result<ChannelError<E>, Self>
472 where
473 E: Any + Send + Sync,
474 {
475 use ChannelError::*;
476 match self {
477 Read(e) => e.downcast::<E>().map(Read).map_err(Read),
478 Ready(e) => e.downcast::<E>().map(Ready).map_err(Ready),
479 Write(e) => e.downcast::<E>().map(Write).map_err(Write),
480 Flush(e) => e.downcast::<E>().map(Flush).map_err(Flush),
481 Close(e) => e.downcast::<E>().map(Close).map_err(Close),
482 }
483 }
484}
485
486impl ServerError {
487 /// Returns a new server error with `kind` and `detail`.
488 pub fn new(kind: io::ErrorKind, detail: String) -> ServerError {
489 Self { kind, detail }
490 }
491}
492
493impl<T> Request<T> {
494 /// Returns the deadline for this request.
495 pub fn deadline(&self) -> &Instant {
496 &self.context.deadline
497 }
498}
499
500#[test]
501fn test_channel_any_casts() {
502 use assert_matches::assert_matches;
503 let any = ChannelError::Read(Arc::new("")).upcast_any();
504 assert_matches!(any, ChannelError::Read(_));
505 assert_matches!(any.downcast::<&'static str>(), Ok(ChannelError::Read(_)));
506
507 let any = ChannelError::Ready(Arc::new("")).upcast_any();
508 assert_matches!(any, ChannelError::Ready(_));
509 assert_matches!(any.downcast::<&'static str>(), Ok(ChannelError::Ready(_)));
510
511 let any = ChannelError::Write(Arc::new("")).upcast_any();
512 assert_matches!(any, ChannelError::Write(_));
513 assert_matches!(any.downcast::<&'static str>(), Ok(ChannelError::Write(_)));
514
515 let any = ChannelError::Flush(Arc::new("")).upcast_any();
516 assert_matches!(any, ChannelError::Flush(_));
517 assert_matches!(any.downcast::<&'static str>(), Ok(ChannelError::Flush(_)));
518
519 let any = ChannelError::Close(Arc::new("")).upcast_any();
520 assert_matches!(any, ChannelError::Close(_));
521 assert_matches!(any.downcast::<&'static str>(), Ok(ChannelError::Close(_)));
522}
523
524#[test]
525fn test_channel_error_upcast() {
526 use assert_matches::assert_matches;
527 use std::fmt;
528
529 #[derive(Debug)]
530 struct E;
531 impl fmt::Display for E {
532 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
533 write!(f, "E")
534 }
535 }
536 impl Error for E {}
537 assert_matches!(
538 ChannelError::Read(Arc::new(E)).upcast_error(),
539 ChannelError::Read(_)
540 );
541 assert_matches!(
542 ChannelError::Ready(Arc::new(E)).upcast_error(),
543 ChannelError::Ready(_)
544 );
545 assert_matches!(
546 ChannelError::Write(Arc::new(E)).upcast_error(),
547 ChannelError::Write(_)
548 );
549 assert_matches!(
550 ChannelError::Flush(Arc::new(E)).upcast_error(),
551 ChannelError::Flush(_)
552 );
553 assert_matches!(
554 ChannelError::Close(Arc::new(E)).upcast_error(),
555 ChannelError::Close(_)
556 );
557}