restate_sdk/http_server.rs
1//! # Serving
2//! Restate services run as an HTTP endpoint.
3//!
4//! ## Creating an HTTP endpoint
5//! 1. Create the endpoint
6//! 2. Bind one or multiple services to it.
7//! 3. Listen on the specified port (default `9080`) for connections and requests.
8//!
9//! ```rust,no_run
10//! # #[path = "../examples/services/mod.rs"]
11//! # mod services;
12//! # use services::my_service::{MyService, MyServiceImpl};
13//! # use services::my_virtual_object::{MyVirtualObject, MyVirtualObjectImpl};
14//! # use services::my_workflow::{MyWorkflow, MyWorkflowImpl};
15//! use restate_sdk::endpoint::Endpoint;
16//! use restate_sdk::http_server::HttpServer;
17//!
18//! #[tokio::main]
19//! async fn main() {
20//! HttpServer::new(
21//! Endpoint::builder()
22//! .bind(MyServiceImpl.serve())
23//! .bind(MyVirtualObjectImpl.serve())
24//! .bind(MyWorkflowImpl.serve())
25//! .build(),
26//! )
27//! .listen_and_serve("0.0.0.0:9080".parse().unwrap())
28//! .await;
29//! }
30//! ```
31//!
32//!
33//! ## Validating request identity
34//!
35//! SDKs can validate that incoming requests come from a particular Restate
36//! instance. You can find out more about request identity in the [Security docs](https://docs.restate.dev/operate/security#locking-down-service-access).
37//! Add the identity key to your endpoint as follows:
38//!
39//! ```rust,no_run
40//! # #[path = "../examples/services/mod.rs"]
41//! # mod services;
42//! # use services::my_service::{MyService, MyServiceImpl};
43//! # use restate_sdk::endpoint::Endpoint;
44//! # use restate_sdk::http_server::HttpServer;
45//! #
46//! # #[tokio::main]
47//! # async fn main() {
48//! HttpServer::new(
49//! Endpoint::builder()
50//! .bind(MyServiceImpl.serve())
51//! .identity_key("publickeyv1_w7YHemBctH5Ck2nQRQ47iBBqhNHy4FV7t2Usbye2A6f")
52//! .unwrap()
53//! .build(),
54//! )
55//! .listen_and_serve("0.0.0.0:9080".parse().unwrap())
56//! .await;
57//! # }
58//! ```
59
60use crate::endpoint::Endpoint;
61use crate::hyper::HyperEndpoint;
62use futures::FutureExt;
63use hyper::server::conn::http2;
64use hyper_util::rt::{TokioExecutor, TokioIo};
65use std::future::Future;
66use std::net::SocketAddr;
67use std::time::Duration;
68use tokio::net::TcpListener;
69use tracing::{info, warn};
70
71/// Http server to expose your Restate services.
72pub struct HttpServer {
73 endpoint: Endpoint,
74}
75
76impl From<Endpoint> for HttpServer {
77 fn from(endpoint: Endpoint) -> Self {
78 Self { endpoint }
79 }
80}
81
82impl HttpServer {
83 /// Create new [`HttpServer`] from an [`Endpoint`].
84 pub fn new(endpoint: Endpoint) -> Self {
85 Self { endpoint }
86 }
87
88 /// Listen on the given address and serve.
89 ///
90 /// The future will be completed once `SIGTERM` is sent to the process.
91 pub async fn listen_and_serve(self, addr: SocketAddr) {
92 let listener = TcpListener::bind(addr).await.expect("listener can bind");
93 self.serve(listener).await;
94 }
95
96 /// Serve on the given listener.
97 ///
98 /// The future will be completed once `SIGTERM` is sent to the process.
99 pub async fn serve(self, listener: TcpListener) {
100 self.serve_with_cancel(listener, tokio::signal::ctrl_c().map(|_| ()))
101 .await;
102 }
103
104 /// Serve on the given listener, and cancel the execution with the given future.
105 pub async fn serve_with_cancel(self, listener: TcpListener, cancel_signal_future: impl Future) {
106 let endpoint = HyperEndpoint::new(self.endpoint);
107 let graceful = hyper_util::server::graceful::GracefulShutdown::new();
108
109 // when this signal completes, start shutdown
110 let mut signal = std::pin::pin!(cancel_signal_future);
111
112 info!("Starting listening on {}", listener.local_addr().unwrap());
113
114 // Our server accept loop
115 loop {
116 tokio::select! {
117 Ok((stream, remote)) = listener.accept() => {
118 let endpoint = endpoint.clone();
119
120 let conn = http2::Builder::new(TokioExecutor::default())
121 .serve_connection(TokioIo::new(stream), endpoint);
122
123 let fut = graceful.watch(conn);
124
125 tokio::spawn(async move {
126 if let Err(e) = fut.await {
127 warn!("Error serving connection {remote}: {:?}", e);
128 }
129 });
130 },
131 _ = &mut signal => {
132 info!("Shutting down");
133 // stop the accept loop
134 break;
135 }
136 }
137 }
138
139 // Wait graceful shutdown
140 tokio::select! {
141 _ = graceful.shutdown() => {},
142 _ = tokio::time::sleep(Duration::from_secs(10)) => {
143 warn!("Timed out waiting for all connections to close");
144 }
145 }
146 }
147}