tsoracle_server/server.rs
1use core::time::Duration;
2use parking_lot::Mutex;
3use std::future::Future;
4use std::net::SocketAddr;
5use std::sync::Arc;
6use tokio::sync::watch;
7use tonic::service::Routes;
8use tonic::transport::Server as TonicServer;
9use tsoracle_consensus::ConsensusDriver;
10use tsoracle_core::{Allocator, Clock, SystemClock};
11#[cfg(any(test, feature = "test-fakes"))]
12use tsoracle_core::{CoreError, WindowGrant};
13use tsoracle_proto::v1::tso_service_server::TsoServiceServer;
14
15use crate::service::TsoServiceImpl;
16
17#[derive(Debug, thiserror::Error)]
18pub enum BuildError {
19 #[error("consensus_driver is required")]
20 MissingConsensusDriver,
21 /// Surfaced when [`crate::leader_hint::KEY`] fails [`crate::leader_hint::validate_key`].
22 /// Today the key is a valid `const &'static str`, so this variant is
23 /// developer-error insurance: a future edit that breaks the key triggers
24 /// a startup failure rather than silently stripping the trailer from
25 /// every NOT_LEADER response.
26 #[error("invalid leader-hint metadata key: {0}")]
27 InvalidLeaderHintKey(#[from] tonic::metadata::errors::InvalidMetadataKey),
28}
29
30#[derive(Debug, thiserror::Error)]
31pub enum ServerError {
32 #[error("transport: {0}")]
33 Transport(#[from] tonic::transport::Error),
34 #[error("consensus: {0}")]
35 Consensus(#[from] tsoracle_consensus::ConsensusError),
36 #[error("core: {0}")]
37 Core(#[from] tsoracle_core::CoreError),
38 /// The leader-watch task panicked. Distinct from a clean error return so
39 /// operators can tell "driver returned Err" (recoverable design) from
40 /// "task panicked" (programming bug).
41 #[error("leader-watch task panicked: {payload}")]
42 WatchPanic { payload: String },
43}
44
/// Serving status published on the server's `watch` channel.
#[derive(Clone, Debug)]
pub enum ServingState {
    /// This node is not serving. This is the state a freshly built server
    /// starts in (with `leader_endpoint: None`).
    ///
    /// NOTE(review): `leader_endpoint` presumably carries the address of the
    /// current leader when one is known — confirm against the fence module.
    NotServing { leader_endpoint: Option<String> },
    /// This node is serving.
    Serving,
}
50
51pub struct ServerBuilder {
52 consensus: Option<Arc<dyn ConsensusDriver>>,
53 clock: Option<Arc<dyn Clock>>,
54 window_ahead: Duration,
55 failover_advance: Duration,
56}
57
58impl Default for ServerBuilder {
59 fn default() -> Self {
60 ServerBuilder {
61 consensus: None,
62 clock: None,
63 window_ahead: Duration::from_secs(3),
64 failover_advance: Duration::from_secs(1),
65 }
66 }
67}
68
69impl ServerBuilder {
70 pub fn consensus_driver(mut self, driver: Arc<dyn ConsensusDriver>) -> Self {
71 self.consensus = Some(driver);
72 self
73 }
74 pub fn clock(mut self, clock: Arc<dyn Clock>) -> Self {
75 self.clock = Some(clock);
76 self
77 }
78 pub fn window_ahead(mut self, window_ahead: Duration) -> Self {
79 self.window_ahead = window_ahead;
80 self
81 }
82 pub fn failover_advance(mut self, failover_advance: Duration) -> Self {
83 self.failover_advance = failover_advance;
84 self
85 }
86 pub fn build(self) -> Result<Server, BuildError> {
87 crate::leader_hint::validate_key()?;
88 let consensus = self.consensus.ok_or(BuildError::MissingConsensusDriver)?;
89 let clock = self.clock.unwrap_or_else(|| Arc::new(SystemClock));
90 let (state_tx, state_rx) = watch::channel(ServingState::NotServing {
91 leader_endpoint: None,
92 });
93 Ok(Server {
94 consensus,
95 clock,
96 window_ahead: self.window_ahead,
97 failover_advance: self.failover_advance,
98 allocator: Arc::new(Mutex::new(Allocator::new())),
99 state_tx,
100 state_rx,
101 extension_lock: Arc::new(tokio::sync::Mutex::new(())),
102 extension_gate: Arc::new(tokio::sync::RwLock::new(())),
103 })
104 }
105}
106
107pub struct Server {
108 pub(crate) consensus: Arc<dyn ConsensusDriver>,
109 pub(crate) clock: Arc<dyn Clock>,
110 pub(crate) window_ahead: Duration,
111 pub(crate) failover_advance: Duration,
112 pub(crate) allocator: Arc<Mutex<Allocator>>,
113 pub(crate) state_tx: watch::Sender<ServingState>,
114 pub state_rx: watch::Receiver<ServingState>,
115 /// Serializes window extensions so a stampeding burst of `WindowExhausted`
116 /// requests resolves to a single `persist_high_water` round-trip. Acquired
117 /// before `extension_gate`; combined with a recheck-after-acquire inside
118 /// `extend_window`, latecomers find the window already extended and
119 /// return without contacting consensus.
120 pub(crate) extension_lock: Arc<tokio::sync::Mutex<()>>,
121 /// Read-locked by window-extension calls for the duration of their
122 /// prepare → persist → commit dance. The leader-watch task takes a
123 /// write lock between clearing serving and calling load_high_water,
124 /// which drains all in-flight extensions started under the prior epoch
125 /// before the fence proceeds.
126 pub(crate) extension_gate: Arc<tokio::sync::RwLock<()>>,
127}
128
129impl Server {
130 pub fn builder() -> ServerBuilder {
131 ServerBuilder::default()
132 }
133
134 /// Single transition API used in response to evidence that the current
135 /// epoch is no longer valid: consensus rejection (NotLeader/Fenced
136 /// during persist), leader-watch task termination, or other authoritative
137 /// signals of leadership loss.
138 ///
139 /// Clears the allocator BEFORE publishing `NotServing` so a racing
140 /// `try_grant` either (a) observes `CoreError::NotLeader` rather than
141 /// handing out a stale-epoch grant, or (b) succeeds against a still-valid
142 /// epoch and the *next* call observes `NotServing`. Either ordering is
143 /// safe; what is not safe is the inverse (publish first, clear later)
144 /// because a request between those two steps would see `Serving` AND a
145 /// still-leader allocator.
146 ///
147 /// Does NOT take `extension_gate.write()`. That would deadlock against
148 /// in-flight extensions currently holding the read lock and awaiting
149 /// `persist_high_water`. Those in-flights will either complete cleanly
150 /// (the next `try_grant` then sees `NotServing`) or fail with
151 /// NotLeader/Fenced (and reach this helper themselves — it is idempotent).
152 pub(crate) fn step_down_due_to_consensus_rejection(&self, leader_endpoint: Option<String>) {
153 self.allocator.lock().on_leadership_lost();
154 let _ = self
155 .state_tx
156 .send(ServingState::NotServing { leader_endpoint });
157 }
158}
159
160impl Server {
161 /// Return the configured `TsoServiceServer<TsoServiceImpl>` as a tonic
162 /// `Routes` value plus a `JoinHandle` for the spawned leader-watch task,
163 /// so callers can mount tsoracle's service alongside their own services
164 /// on a shared tonic listener instead of binding a dedicated port.
165 ///
166 /// The `JoinHandle` payload is `Result<(), ServerError>` so embedders
167 /// can observe leader-watch termination. Before returning an error, the
168 /// task publishes `ServingState::NotServing { leader_endpoint: None }`
169 /// so all subsequent RPCs fail fast with `FAILED_PRECONDITION` — even
170 /// embedders who never inspect the handle get fail-safe behavior.
171 ///
172 /// The `Server::serve()` method is a thin wrapper over this — it calls
173 /// `into_router`, builds a tonic `Server`, and binds a listener.
174 pub fn into_router(self) -> (Routes, tokio::task::JoinHandle<Result<(), ServerError>>) {
175 let server = Arc::new(self);
176
177 let watch_server = server.clone();
178 let watch_handle = tokio::spawn(async move {
179 use futures::FutureExt;
180 // catch_unwind so a panic in run_leader_watch still routes through
181 // the poisoning path. Without this, embedders who mount into_router
182 // directly and never observe the JoinHandle would see
183 // ServingState::Serving remain published while the watch task is
184 // dead — the inverse of the fail-safe guarantee documented above.
185 // The panic is re-raised after poisoning so serve / serve_with_*
186 // continue to translate it into ServerError::WatchPanic via
187 // join_to_server_result.
188 let outcome =
189 std::panic::AssertUnwindSafe(crate::fence::run_leader_watch(watch_server.clone()))
190 .catch_unwind()
191 .await;
192 match outcome {
193 Ok(result) => {
194 if let Err(ref _e) = result {
195 // Poison BEFORE returning so embedders who do not observe
196 // the JoinHandle still get fail-safe behavior.
197 watch_server.step_down_due_to_consensus_rejection(None);
198 #[cfg(feature = "tracing")]
199 tracing::error!(error = %_e, "leader-watch terminated; serving disabled");
200 }
201 result
202 }
203 Err(panic_payload) => {
204 // Mirror the Err branch: poison BEFORE re-raising so
205 // handle-dropping embedders still observe NotServing.
206 watch_server.step_down_due_to_consensus_rejection(None);
207 #[cfg(feature = "tracing")]
208 tracing::error!("leader-watch panicked; serving disabled");
209 std::panic::resume_unwind(panic_payload);
210 }
211 }
212 });
213
214 let service = TsoServiceImpl { server };
215 #[allow(unused_mut)]
216 let mut routes = Routes::new(TsoServiceServer::new(service));
217 #[cfg(feature = "reflection")]
218 {
219 #[expect(
220 clippy::expect_used,
221 reason = "`FILE_DESCRIPTOR_SET` is generated by `tsoracle-proto`'s `build.rs` from checked-in `.proto` sources; if it ever fails to decode, the build itself is broken. Tracked by #9."
222 )]
223 let reflection = tonic_reflection::server::Builder::configure()
224 .register_encoded_file_descriptor_set(tsoracle_proto::FILE_DESCRIPTOR_SET)
225 .build_v1()
226 .expect("FILE_DESCRIPTOR_SET emitted by build.rs is always valid");
227 routes = routes.add_service(reflection);
228 }
229 (routes, watch_handle)
230 }
231
232 pub async fn serve(self, addr: SocketAddr) -> Result<(), ServerError> {
233 self.serve_with_shutdown(addr, futures::future::pending())
234 .await
235 }
236
237 /// Run the gRPC server until either the caller's `shutdown` fires or the
238 /// leader-watch task terminates.
239 ///
240 /// Three outcomes:
241 /// 1. `shutdown` fires first → tonic drains in-flights and returns Ok.
242 /// The watch handle is aborted; any error it had been about to return
243 /// is forfeited (the process is shutting down anyway).
244 /// 2. Watch returns `Ok(Err(e))` → poisoned state is already published;
245 /// `cancel_tx` triggers tonic's graceful shutdown; in-flight `GetTs`
246 /// calls whose `try_grant` already succeeded complete with the
247 /// timestamps they were allocated; new calls fail fast. Returns `Err(e)`.
248 /// 3. Watch task panics → returns `Err(ServerError::WatchPanic{..})`
249 /// with the panic payload stringified. Same drain semantics as (2).
250 pub async fn serve_with_shutdown(
251 self,
252 addr: SocketAddr,
253 shutdown: impl Future<Output = ()> + Send + 'static,
254 ) -> Result<(), ServerError> {
255 let (routes, mut watch_handle) = self.into_router();
256 let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
257
258 // tonic stops when EITHER the user's shutdown fires OR we cancel
259 // because the watch task terminated.
260 let combined_shutdown = async move {
261 tokio::select! {
262 _ = shutdown => {}
263 _ = cancel_rx => {}
264 }
265 };
266
267 let serve = TonicServer::builder()
268 .add_routes(routes)
269 .serve_with_shutdown(addr, combined_shutdown);
270 tokio::pin!(serve);
271
272 tokio::select! {
273 // Bias toward the watch arm: if both are ready in the same poll
274 // (rare but possible — graceful shutdown completed in the same
275 // tick the watch returned), we want to surface the watch error
276 // rather than report a clean shutdown.
277 biased;
278
279 watch_result = &mut watch_handle => {
280 // Watch terminated. State is already poisoned (see watch
281 // task body in into_router). Trigger tonic drain and wait
282 // for it to finish, then report the watch's outcome.
283 let _ = cancel_tx.send(());
284 let _ = serve.await;
285 join_to_server_result(watch_result)
286 }
287 serve_result = &mut serve => {
288 // User shutdown fired (or our cancel — but watch arm has
289 // `biased` priority, so reaching here means user shutdown).
290 // The watch task may still be running; aborting it loses
291 // any error it was about to report, but the process is
292 // shutting down so that's acceptable.
293 watch_handle.abort();
294 serve_result?;
295 Ok(())
296 }
297 }
298 }
299
300 /// Run the gRPC server on a caller-provided `TcpListener` until either
301 /// the caller-provided `shutdown` fires or the leader-watch task terminates.
302 ///
303 /// Use this instead of [`Self::serve_with_shutdown`] when you need to
304 /// observe the OS-picked port (`127.0.0.1:0`) before clients connect, or
305 /// when you want to wrap the listener in an outer adapter before passing it
306 /// in. The listening socket is owned by the caller and passed here; tsoracle
307 /// starts accepting on it immediately.
308 ///
309 /// Three outcomes:
310 /// 1. `shutdown` fires first → tonic drains in-flights and returns `Ok`.
311 /// The watch handle is aborted; any error it had been about to return
312 /// is forfeited (the process is shutting down anyway).
313 /// 2. Watch returns `Ok(Err(e))` → poisoned state is already published;
314 /// the caller-provided shutdown is cancelled internally so tonic begins
315 /// graceful shutdown; in-flight `GetTs` calls whose `try_grant` already
316 /// succeeded complete with the timestamps they were allocated; new calls
317 /// fail fast. Returns `Err(e)`.
318 /// 3. Watch task panics → returns `Err(ServerError::WatchPanic{..})`
319 /// with the panic payload stringified. Same drain semantics as (2).
320 pub async fn serve_with_listener(
321 self,
322 listener: tokio::net::TcpListener,
323 shutdown: impl Future<Output = ()> + Send + 'static,
324 ) -> Result<(), ServerError> {
325 let (routes, mut watch_handle) = self.into_router();
326 let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
327
328 let combined_shutdown = async move {
329 tokio::select! {
330 _ = shutdown => {}
331 _ = cancel_rx => {}
332 }
333 };
334
335 let incoming = tonic::transport::server::TcpIncoming::from(listener);
336
337 let serve = TonicServer::builder()
338 .add_routes(routes)
339 .serve_with_incoming_shutdown(incoming, combined_shutdown);
340 tokio::pin!(serve);
341
342 tokio::select! {
343 biased;
344
345 watch_result = &mut watch_handle => {
346 let _ = cancel_tx.send(());
347 let _ = serve.await;
348 join_to_server_result(watch_result)
349 }
350 serve_result = &mut serve => {
351 watch_handle.abort();
352 serve_result?;
353 Ok(())
354 }
355 }
356 }
357}
358
359/// Convert a `JoinHandle` result into a `ServerError`-typed result.
360///
361/// - `Ok(Ok(()))` — task ended cleanly (driver stream closed). Caller decides
362/// whether this is normal (shutdown) or anomalous.
363/// - `Ok(Err(e))` — task returned an error. Forward verbatim.
364/// - `Err(JoinError)` — task was cancelled or panicked. Cancellation maps to
365/// Ok (we asked for it); panic maps to `WatchPanic` with payload.
366fn join_to_server_result(
367 join_result: Result<Result<(), ServerError>, tokio::task::JoinError>,
368) -> Result<(), ServerError> {
369 match join_result {
370 Ok(inner) => inner,
371 Err(join_err) if join_err.is_panic() => {
372 let payload = panic_payload_to_string(join_err.into_panic());
373 Err(ServerError::WatchPanic { payload })
374 }
375 Err(_cancelled) => Ok(()),
376 }
377}
378
/// Renders a panic payload as text.
///
/// `&'static str` and `String` payloads (what `panic!` produces for literal
/// and formatted messages respectively) are returned verbatim; any other
/// payload type yields a fixed placeholder message.
fn panic_payload_to_string(panic: Box<dyn std::any::Any + Send>) -> String {
    panic
        .downcast_ref::<&'static str>()
        .map(|literal| (*literal).to_owned())
        .or_else(|| panic.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "watch task panicked with non-string payload".to_owned())
}
388
#[cfg(any(test, feature = "test-fakes"))]
impl Server {
    /// Drives the leader-watch loop directly. Available to integration tests
    /// through the `test-fakes` feature only; it is not part of the stable
    /// public API.
    #[doc(hidden)]
    pub async fn run_leader_watch_for_tests(self: Arc<Self>) -> Result<(), ServerError> {
        let watch_loop = crate::fence::run_leader_watch(self);
        watch_loop.await
    }

    /// Asks the in-memory allocator for a grant of `count`, bypassing the
    /// gRPC service. Regression tests use this to observe the behavioral
    /// fence directly (no timestamp at or below the prior leader's
    /// high-water).
    #[doc(hidden)]
    pub fn try_grant_for_tests(&self, count: u32) -> Result<WindowGrant, CoreError> {
        // Take the lock first and read the clock while holding it, so the
        // timestamp passed to the allocator is sampled under the lock.
        let mut allocator = self.allocator.lock();
        let now_ms = self.clock.now_ms();
        allocator.try_grant(now_ms, count)
    }
}