// Copyright 2018 Google LLC
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
//! *Disclaimer*: This is not an official Google product.
//!
//! tarpc is an RPC framework for Rust with a focus on ease of use. Defining a
//! service can be done in just a few lines of code, and most of the boilerplate of
//! writing a server is taken care of for you.
//!
//! [Documentation](https://docs.rs/crate/tarpc/)
//!
//! ## What is an RPC framework?
//! "RPC" stands for "Remote Procedure Call," a function call where the work of
//! producing the return value is being done somewhere else. When an RPC function is
//! invoked, behind the scenes the function contacts some other process somewhere
//! and asks it to evaluate the function instead. The original function then
//! returns the value produced by the other process.
//!
//! RPC frameworks are a fundamental building block of most microservices-oriented
//! architectures. Two well-known ones are [gRPC](http://www.grpc.io) and
//! [Cap'n Proto](https://capnproto.org/).
//!
//! tarpc differentiates itself from other RPC frameworks by defining the schema in code,
//! rather than in a separate language such as .proto. This means there's no separate compilation
//! process, and no context switching between different languages.
//!
//! Some other features of tarpc:
//! - Pluggable transport: any type implementing `Stream<Item = Request> + Sink<Response>` can be
//!   used as a transport to connect the client and server.
//! - `Send + 'static` optional: if the transport doesn't require it, neither does tarpc!
//! - Cascading cancellation: dropping a request will send a cancellation message to the server.
//!   The server will cease any unfinished work on the request, subsequently cancelling any of its
//!   own requests, repeating for the entire chain of transitive dependencies.
//! - Configurable deadlines and deadline propagation: request deadlines default to 10s if
//!   unspecified. The server will automatically cease work when the deadline has passed. Any
//!   requests sent by the server that use the request context will propagate the request deadline.
//!   For example, if a server is handling a request with a 10s deadline, does 2s of work, then
//!   sends a request to another server, that server will see an 8s deadline.
//! - Distributed tracing: tarpc is instrumented with
//!   [tracing](https://github.com/tokio-rs/tracing) primitives extended with
//!   [OpenTelemetry](https://opentelemetry.io/) traces. Using a compatible tracing subscriber like
//!   [Jaeger](https://github.com/open-telemetry/opentelemetry-rust/tree/main/opentelemetry-jaeger),
//!   each RPC can be traced through the client, server, and other dependencies downstream of the
//!   server. Even for applications not connected to a distributed tracing collector, the
//!   instrumentation can also be ingested by regular loggers like
//!   [env_logger](https://github.com/env-logger-rs/env_logger/).
//! - Serde serialization: enabling the `serde1` Cargo feature will make service requests and
//!   responses `Serialize + Deserialize`. It's entirely optional, though: in-memory transports can
//!   be used, as well, so the price of serialization doesn't have to be paid when it's not needed.
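/*!

The 10s/2s/8s deadline example in the list above can be checked with a short sketch that uses
only the standard library. It is not tarpc's implementation: `remaining_budget` is a
hypothetical helper, and the 2s of work is simulated by advancing a timestamp rather than by
sleeping. The only fact taken from this crate is that a request's deadline is a `SystemTime`.

```rust
use std::time::{Duration, SystemTime};

/// Hypothetical helper: the time left until `deadline`, as seen at `now`.
/// Returns zero if the deadline has already passed.
fn remaining_budget(deadline: SystemTime, now: SystemTime) -> Duration {
    deadline.duration_since(now).unwrap_or(Duration::ZERO)
}

fn main() {
    let start = SystemTime::now();
    // The incoming request carries a deadline 10s after it was sent.
    let deadline = start + Duration::from_secs(10);
    // Simulate the server doing 2s of work before calling a downstream server.
    let after_work = start + Duration::from_secs(2);
    // The downstream request reuses the same absolute deadline, so it sees 8s.
    let downstream = remaining_budget(deadline, after_work);
    assert_eq!(downstream, Duration::from_secs(8));
    println!("downstream budget: {downstream:?}");
}
```

Because the deadline is an absolute point in time, each hop in this sketch only has to compare
it with its own clock to learn how much budget is left.
*/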
//!
//! ## Usage
//! Add to your `Cargo.toml` dependencies:
//!
//! ```toml
//! tarpc = "0.29"
//! ```
//!
//! The `tarpc::service` attribute expands to a collection of items that form an RPC service.
//! These generated types make it easy and ergonomic to write servers with less boilerplate.
//! Simply implement the generated service trait, and you're off to the races!
//!
//! ## Example
//!
//! This example uses [tokio](https://tokio.rs), so add the following dependencies to
//! your `Cargo.toml`:
//!
//! ```toml
//! anyhow = "1.0"
//! futures = "0.3"
//! tarpc = { version = "0.29", features = ["tokio1"] }
//! tokio = { version = "1.0", features = ["macros"] }
//! ```
//!
//! In the following example, we use an in-process channel for communication between
//! client and server. In real code, you will likely communicate over the network.
//! For a more real-world example, see [example-service](example-service).
//!
//! First, let's set up the dependencies and service definition.
//!
//! ```rust
//! # extern crate futures;
//!
//! use futures::{
//!     future::{self, Ready},
//!     prelude::*,
//! };
//! use tarpc::{
//!     client, context,
//!     server::{self, incoming::Incoming, Channel},
//! };
//!
//! // This is the service definition. It looks a lot like a trait definition.
//! // It defines one RPC, hello, which takes one arg, name, and returns a String.
//! #[tarpc::service]
//! trait World {
//!     /// Returns a greeting for name.
//!     async fn hello(name: String) -> String;
//! }
//! ```
//!
//! This service definition generates a trait called `World`. Next we need to
//! implement it for our server struct.
//!
//! ```rust
//! # extern crate futures;
//! # use futures::{
//! #     future::{self, Ready},
//! #     prelude::*,
//! # };
//! # use tarpc::{
//! #     client, context,
//! #     server::{self, incoming::Incoming},
//! # };
//! # // This is the service definition. It looks a lot like a trait definition.
//! # // It defines one RPC, hello, which takes one arg, name, and returns a String.
//! # #[tarpc::service]
//! # trait World {
//! #     /// Returns a greeting for name.
//! #     async fn hello(name: String) -> String;
//! # }
//! // This is the type that implements the generated World trait. It is the business logic
//! // and is used to start the server.
//! #[derive(Clone)]
//! struct HelloServer;
//!
//! impl World for HelloServer {
//!     // Each defined RPC generates two items in the trait: a fn that serves the RPC, and
//!     // an associated type representing the future output by the fn.
//!
//!     type HelloFut = Ready<String>;
//!
//!     fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
//!         future::ready(format!("Hello, {name}!"))
//!     }
//! }
//! ```
//!
//! Lastly, let's write our `main` that will start the server. While this example uses an
//! [in-process channel](transport::channel), tarpc also ships a generic [`serde_transport`]
//! behind the `serde-transport` feature, with additional [TCP](serde_transport::tcp) functionality
//! available behind the `tcp` feature.
//!
//! ```rust
//! # extern crate futures;
//! # use futures::{
//! #     future::{self, Ready},
//! #     prelude::*,
//! # };
//! # use tarpc::{
//! #     client, context,
//! #     server::{self, Channel},
//! # };
//! # // This is the service definition. It looks a lot like a trait definition.
//! # // It defines one RPC, hello, which takes one arg, name, and returns a String.
//! # #[tarpc::service]
//! # trait World {
//! #     /// Returns a greeting for name.
//! #     async fn hello(name: String) -> String;
//! # }
//! # // This is the type that implements the generated World trait. It is the business logic
//! # // and is used to start the server.
//! # #[derive(Clone)]
//! # struct HelloServer;
//! # impl World for HelloServer {
//! #     // Each defined RPC generates two items in the trait: a fn that serves the RPC, and
//! #     // an associated type representing the future output by the fn.
//! #     type HelloFut = Ready<String>;
//! #     fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
//! #         future::ready(format!("Hello, {name}!"))
//! #     }
//! # }
//! # #[cfg(not(feature = "tokio1"))]
//! # fn main() {}
//! # #[cfg(feature = "tokio1")]
//! #[tokio::main]
//! async fn main() -> anyhow::Result<()> {
//!     let (client_transport, server_transport) = tarpc::transport::channel::unbounded();
//!
//!     let server = server::BaseChannel::with_defaults(server_transport);
//!     tokio::spawn(server.execute(HelloServer.serve()));
//!
//!     // WorldClient is generated by the #[tarpc::service] attribute. It has a constructor `new`
//!     // that takes a config and any Transport as input.
//!     let mut client = WorldClient::new(client::Config::default(), client_transport).spawn();
//!
//!     // The client has an RPC method for each RPC defined in the annotated trait. It takes the same
//!     // args as defined, with the addition of a Context, which is always the first arg. The Context
//!     // specifies a deadline and trace information which can be helpful in debugging requests.
//!     let hello = client.hello(context::current(), "Stim".to_string()).await?;
//!
//!     println!("{hello}");
//!
//!     Ok(())
//! }
//! ```
//!
//! ## Service Documentation
//!
//! Use `cargo doc` as you normally would to see the documentation created for all
//! items expanded by a `#[tarpc::service]` invocation.
#![deny(missing_docs)]
#![allow(clippy::type_complexity)]
#![cfg_attr(docsrs, feature(doc_cfg))]

#[cfg(feature = "serde1")]
#[doc(hidden)]
pub use serde;

#[cfg(feature = "serde-transport")]
pub use {tokio_serde, tokio_util};

#[cfg(feature = "serde-transport")]
#[cfg_attr(docsrs, doc(cfg(feature = "serde-transport")))]
pub mod serde_transport;

pub mod trace;

#[cfg(feature = "serde1")]
pub use tarpc_plugins::derive_serde;

/// The main macro that creates RPC services.
///
/// RPC methods are specified, mirroring trait syntax:
///
/// ```
/// #[tarpc::service]
/// trait Service {
///     /// Say hello
///     async fn hello(name: String) -> String;
/// }
/// ```
///
/// Attributes can be attached to each RPC. These attributes
/// will then be attached to the generated service trait's
/// corresponding `fn`s, as well as to the client stub's RPCs.
///
/// The following items are expanded in the enclosing module:
///
/// * `trait Service` -- defines the RPC service.
///   * `fn serve` -- turns a service impl into a request handler.
/// * `ServiceClient` -- a client stub with a fn for each RPC.
///   * `fn new` -- creates a new client stub.
pub use tarpc_plugins::service;

/// A utility macro that can be used for RPC server implementations.
///
/// Syntactic sugar to make using async functions in the server implementation
/// easier. It does this by rewriting code like this, which would normally not
/// compile because async functions are disallowed in trait implementations:
///
/// ```rust
/// # use tarpc::context;
/// # use std::net::SocketAddr;
/// #[tarpc::service]
/// trait World {
///     async fn hello(name: String) -> String;
/// }
///
/// #[derive(Clone)]
/// struct HelloServer(SocketAddr);
///
/// #[tarpc::server]
/// impl World for HelloServer {
///     async fn hello(self, _: context::Context, name: String) -> String {
///         format!("Hello, {name}! You are connected from {:?}.", self.0)
///     }
/// }
/// ```
///
/// Into code like this, which matches the service trait definition:
///
/// ```rust
/// # use tarpc::context;
/// # use std::pin::Pin;
/// # use futures::Future;
/// # use std::net::SocketAddr;
/// #[derive(Clone)]
/// struct HelloServer(SocketAddr);
///
/// #[tarpc::service]
/// trait World {
///     async fn hello(name: String) -> String;
/// }
///
/// impl World for HelloServer {
///     type HelloFut = Pin<Box<dyn Future<Output = String> + Send>>;
///
///     fn hello(self, _: context::Context, name: String) -> Pin<Box<dyn Future<Output = String>
///     + Send>> {
///         Box::pin(async move {
///             format!("Hello, {name}! You are connected from {:?}.", self.0)
///         })
///     }
/// }
/// ```
///
/// Note that this won't touch functions unless they have been annotated with
/// `async`, meaning that this should not break existing code.
pub use tarpc_plugins::server;

pub(crate) mod cancellations;
pub mod client;
pub mod context;
pub mod server;
pub mod transport;
pub(crate) mod util;

pub use crate::transport::sealed::Transport;

use anyhow::Context as _;
use futures::task::*;
use std::{error::Error, fmt::Display, io, time::SystemTime};

/// A message from a client to a server.
#[derive(Debug)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub enum ClientMessage<T> {
    /// A request initiated by a user. The server responds to a request by invoking a
    /// service-provided request handler. The handler completes with a [`response`](Response), which
    /// the server sends back to the client.
    Request(Request<T>),
    /// A command to cancel an in-flight request, automatically sent by the client when a response
    /// future is dropped.
    ///
    /// When received, the server will immediately cancel the main task (top-level future) of the
    /// request handler for the associated request. Any tasks spawned by the request handler will
    /// not be canceled, because the framework layer does not know about them.
    Cancel {
        /// The trace context associates the message with a specific chain of causally-related actions,
        /// possibly orchestrated across many distributed systems.
        #[cfg_attr(feature = "serde1", serde(default))]
        trace_context: trace::Context,
        /// The ID of the request to cancel.
        request_id: u64,
    },
}
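
/*
A simplified, standard-library-only model of how a `Cancel` message relates to an earlier
`Request` by ID. Everything here is hypothetical and for illustration only: `SketchMessage`
and `InFlight` are not tarpc types, and tarpc's real server also has to drop the handler
future and deal with trace contexts, which this sketch leaves out.

```rust
use std::collections::HashSet;

// Hypothetical, simplified message type; the real `ClientMessage` also carries
// a request context and a trace context.
enum SketchMessage {
    Request { id: u64, body: String },
    Cancel { request_id: u64 },
}

// Hypothetical server-side bookkeeping: the set of request IDs still in flight.
#[derive(Default)]
struct InFlight {
    ids: HashSet<u64>,
}

impl InFlight {
    // Returns true if the message changed the in-flight set.
    fn handle(&mut self, message: SketchMessage) -> bool {
        match message {
            // A new request is tracked by its channel-unique ID.
            SketchMessage::Request { id, body: _ } => self.ids.insert(id),
            // A cancellation only has an effect if the request is still running.
            SketchMessage::Cancel { request_id } => self.ids.remove(&request_id),
        }
    }
}

fn main() {
    let mut in_flight = InFlight::default();
    let request = SketchMessage::Request { id: 1, body: "hello".to_string() };
    assert!(in_flight.handle(request));
    assert!(in_flight.handle(SketchMessage::Cancel { request_id: 1 }));
    // Cancelling an unknown or already finished request is a no-op.
    assert!(!in_flight.handle(SketchMessage::Cancel { request_id: 1 }));
}
```

The sketch only tracks IDs; it says nothing about tasks spawned by a handler, which, as noted
in the `Cancel` documentation, the framework layer does not know about.
*/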

/// A request from a client to a server.
#[derive(Clone, Copy, Debug)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub struct Request<T> {
    /// Trace context, deadline, and other cross-cutting concerns.
    pub context: context::Context,
    /// Uniquely identifies the request across all requests sent over a single channel.
    pub id: u64,
    /// The request body.
    pub message: T,
}

/// A response from a server to a client.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub struct Response<T> {
    /// The ID of the request being responded to.
    pub request_id: u64,
    /// The response body, or an error if the request failed.
    pub message: Result<T, ServerError>,
}

/// An error indicating the server aborted the request early, e.g., due to request throttling.
#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq, Hash)]
#[error("{kind:?}: {detail}")]
#[non_exhaustive]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub struct ServerError {
    #[cfg_attr(
        feature = "serde1",
        serde(serialize_with = "util::serde::serialize_io_error_kind_as_u32")
    )]
    #[cfg_attr(
        feature = "serde1",
        serde(deserialize_with = "util::serde::deserialize_io_error_kind_from_u32")
    )]
    /// The kind of error that caused the request to fail.
    pub kind: io::ErrorKind,
    /// A message describing the error in more detail.
    pub detail: String,
}
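
/*
A sketch of what the `#[error("{kind:?}: {detail}")]` format string above produces, written
against the standard library only. `SketchServerError` is a hypothetical stand-in with a
hand-written `Display` impl in place of the thiserror and serde derives; the error text used
below is made up for the example.

```rust
use std::{fmt, io};

// Hypothetical stand-in for `ServerError`, without the thiserror/serde derives.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct SketchServerError {
    kind: io::ErrorKind,
    detail: String,
}

impl fmt::Display for SketchServerError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Mirrors the `{kind:?}: {detail}` format string: Debug for the kind,
        // Display for the detail.
        write!(f, "{:?}: {}", self.kind, self.detail)
    }
}

impl std::error::Error for SketchServerError {}

fn main() {
    let err = SketchServerError {
        kind: io::ErrorKind::TimedOut,
        detail: "deadline exceeded".to_string(),
    };
    assert_eq!(err.to_string(), "TimedOut: deadline exceeded");

    // Wrapping the error in an `io::Error` keeps the kind for callers that match on it.
    let io_err = io::Error::new(err.kind, err.clone());
    assert_eq!(io_err.kind(), io::ErrorKind::TimedOut);
}
```
*/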

impl<T> Request<T> {
    /// Returns the deadline for this request.
    pub fn deadline(&self) -> &SystemTime {
        &self.context.deadline
    }
}

pub(crate) trait PollContext<T> {
    fn context<C>(self, context: C) -> Poll<Option<anyhow::Result<T>>>
    where
        C: Display + Send + Sync + 'static;

    fn with_context<C, F>(self, f: F) -> Poll<Option<anyhow::Result<T>>>
    where
        C: Display + Send + Sync + 'static,
        F: FnOnce() -> C;
}

impl<T, E> PollContext<T> for Poll<Option<Result<T, E>>>
where
    E: Error + Send + Sync + 'static,
{
    fn context<C>(self, context: C) -> Poll<Option<anyhow::Result<T>>>
    where
        C: Display + Send + Sync + 'static,
    {
        self.map(|o| o.map(|r| r.context(context)))
    }

    fn with_context<C, F>(self, f: F) -> Poll<Option<anyhow::Result<T>>>
    where
        C: Display + Send + Sync + 'static,
        F: FnOnce() -> C,
    {
        self.map(|o| o.map(|r| r.with_context(f)))
    }
}
421}