restate_sdk/
http_server.rs

1//! # Serving
2//! Restate services run as an HTTP endpoint.
3//!
4//! ## Creating an HTTP endpoint
5//! 1. Create the endpoint
6//! 2. Bind one or multiple services to it.
7//! 3. Listen on the specified port (default `9080`) for connections and requests.
8//!
9//! ```rust,no_run
10//! # #[path = "../examples/services/mod.rs"]
11//! # mod services;
12//! # use services::my_service::{MyService, MyServiceImpl};
13//! # use services::my_virtual_object::{MyVirtualObject, MyVirtualObjectImpl};
14//! # use services::my_workflow::{MyWorkflow, MyWorkflowImpl};
15//! use restate_sdk::endpoint::Endpoint;
16//! use restate_sdk::http_server::HttpServer;
17//!
18//! #[tokio::main]
19//! async fn main() {
20//!     HttpServer::new(
21//!         Endpoint::builder()
22//!             .bind(MyServiceImpl.serve())
23//!             .bind(MyVirtualObjectImpl.serve())
24//!             .bind(MyWorkflowImpl.serve())
25//!             .build(),
26//!     )
27//!     .listen_and_serve("0.0.0.0:9080".parse().unwrap())
28//!     .await;
29//! }
30//! ```
31//!
32//!
33//! ## Validating request identity
34//!
35//! SDKs can validate that incoming requests come from a particular Restate
36//! instance. You can find out more about request identity in the [Security docs](https://docs.restate.dev/operate/security#locking-down-service-access).
37//! Add the identity key to your endpoint as follows:
38//!
39//! ```rust,no_run
40//! # #[path = "../examples/services/mod.rs"]
41//! # mod services;
42//! # use services::my_service::{MyService, MyServiceImpl};
43//! # use restate_sdk::endpoint::Endpoint;
44//! # use restate_sdk::http_server::HttpServer;
45//! #
46//! # #[tokio::main]
47//! # async fn main() {
48//!     HttpServer::new(
49//!         Endpoint::builder()
50//!             .bind(MyServiceImpl.serve())
51//!             .identity_key("publickeyv1_w7YHemBctH5Ck2nQRQ47iBBqhNHy4FV7t2Usbye2A6f")
52//!             .unwrap()
53//!             .build(),
54//!     )
55//!     .listen_and_serve("0.0.0.0:9080".parse().unwrap())
56//!     .await;
57//! # }
58//! ```
59
60use crate::endpoint::Endpoint;
61use crate::hyper::HyperEndpoint;
62use futures::FutureExt;
63use hyper::server::conn::http2;
64use hyper_util::rt::{TokioExecutor, TokioIo};
65use std::future::Future;
66use std::net::SocketAddr;
67use std::time::Duration;
68use tokio::net::TcpListener;
69use tracing::{info, warn};
70
71/// Http server to expose your Restate services.
72pub struct HttpServer {
73    endpoint: Endpoint,
74}
75
76impl From<Endpoint> for HttpServer {
77    fn from(endpoint: Endpoint) -> Self {
78        Self { endpoint }
79    }
80}
81
82impl HttpServer {
83    /// Create new [`HttpServer`] from an [`Endpoint`].
84    pub fn new(endpoint: Endpoint) -> Self {
85        Self { endpoint }
86    }
87
88    /// Listen on the given address and serve.
89    ///
90    /// The future will be completed once `SIGTERM` is sent to the process.
91    pub async fn listen_and_serve(self, addr: SocketAddr) {
92        let listener = TcpListener::bind(addr).await.expect("listener can bind");
93        self.serve(listener).await;
94    }
95
96    /// Serve on the given listener.
97    ///
98    /// The future will be completed once `SIGTERM` is sent to the process.
99    pub async fn serve(self, listener: TcpListener) {
100        self.serve_with_cancel(listener, tokio::signal::ctrl_c().map(|_| ()))
101            .await;
102    }
103
104    /// Serve on the given listener, and cancel the execution with the given future.
105    pub async fn serve_with_cancel(self, listener: TcpListener, cancel_signal_future: impl Future) {
106        let endpoint = HyperEndpoint::new(self.endpoint);
107        let graceful = hyper_util::server::graceful::GracefulShutdown::new();
108
109        // when this signal completes, start shutdown
110        let mut signal = std::pin::pin!(cancel_signal_future);
111
112        info!("Starting listening on {}", listener.local_addr().unwrap());
113
114        // Our server accept loop
115        loop {
116            tokio::select! {
117                Ok((stream, remote)) = listener.accept() => {
118                    let endpoint = endpoint.clone();
119
120                    let conn = http2::Builder::new(TokioExecutor::default())
121                        .serve_connection(TokioIo::new(stream), endpoint);
122
123                    let fut = graceful.watch(conn);
124
125                    tokio::spawn(async move {
126                        if let Err(e) = fut.await {
127                            warn!("Error serving connection {remote}: {:?}", e);
128                        }
129                    });
130                },
131                _ = &mut signal => {
132                    info!("Shutting down");
133                    // stop the accept loop
134                    break;
135                }
136            }
137        }
138
139        // Wait graceful shutdown
140        tokio::select! {
141            _ = graceful.shutdown() => {},
142            _ = tokio::time::sleep(Duration::from_secs(10)) => {
143                warn!("Timed out waiting for all connections to close");
144            }
145        }
146    }
147}