tsoracle_server/server.rs
1//
2// ░▀█▀░█▀▀░█▀█░█▀▄░█▀█░█▀▀░█░░░█▀▀
3// ░░█░░▀▀█░█░█░█▀▄░█▀█░█░░░█░░░█▀▀
4// ░░▀░░▀▀▀░▀▀▀░▀░▀░▀░▀░▀▀▀░▀▀▀░▀▀▀
5//
6// tsoracle — Distributed Timestamp Oracle
7//
8// Copyright (c) 2026 Prisma Risk
9// Licensed under the Apache License, Version 2.0
10// https://github.com/prisma-risk/tsoracle
11//
12
13use core::time::Duration;
14use parking_lot::Mutex;
15use std::future::Future;
16use std::net::SocketAddr;
17use std::sync::Arc;
18use tokio::sync::watch;
19use tonic::service::Routes;
20use tonic::transport::Server as TonicServer;
21use tsoracle_consensus::ConsensusDriver;
22use tsoracle_core::{Allocator, Bt, Clock, SystemClock};
23#[cfg(any(test, feature = "test-fakes"))]
24use tsoracle_core::{CoreError, WindowGrant};
25use tsoracle_proto::v1::tso_service_server::TsoServiceServer;
26
27use crate::service::TsoServiceImpl;
28
29#[derive(Debug, thiserror::Error)]
30pub enum BuildError {
31 #[error("consensus_driver is required")]
32 MissingConsensusDriver,
33 /// Surfaced when [`crate::leader_hint::KEY`] fails [`crate::leader_hint::validate_key`].
34 /// Today the key is a valid `const &'static str`, so this variant is
35 /// developer-error insurance: a future edit that breaks the key triggers
36 /// a startup failure rather than silently stripping the trailer from
37 /// every NOT_LEADER response.
38 #[error("invalid leader-hint metadata key: {0}")]
39 InvalidLeaderHintKey(#[from] tonic::metadata::errors::InvalidMetadataKey),
40}
41
42#[derive(Debug, thiserror::Error)]
43pub enum ServerError {
44 #[error("transport: {0}")]
45 Transport(#[from] tonic::transport::Error),
46 #[error("consensus: {0}")]
47 Consensus(#[from] tsoracle_consensus::ConsensusError),
48 #[error("core: {0}")]
49 Core(#[from] tsoracle_core::CoreError),
50 /// The leader-watch task panicked. Distinct from a clean error return so
51 /// operators can tell "driver returned Err" (recoverable design) from
52 /// "task panicked" (programming bug).
53 #[error("leader-watch task panicked: {payload}{bt}")]
54 WatchPanic { payload: String, bt: Bt },
55 /// The consensus driver's `leadership_events()` stream ended cleanly while
56 /// the leader-watch task was running. The stream is contracted to live for
57 /// the life of the server, so its end is anomalous (driver shutdown, lost
58 /// session, etc.) — distinct from a `Consensus` error returned mid-fence.
59 /// The watch task publishes `ServingState::NotServing` before returning
60 /// this variant so embedders who never observe the `JoinHandle` still get
61 /// the documented fail-safe behavior.
62 #[error("consensus driver leadership stream closed")]
63 WatchStreamClosed,
64}
65
/// Whether this node is currently willing to hand out timestamps.
///
/// Published through the `watch` channel owned by [`Server`]. A freshly built
/// server starts in `NotServing { leader_endpoint: None }`.
///
/// `PartialEq`/`Eq` are derived so observers can compare states directly
/// (for example in assertions or change-detection predicates).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ServingState {
    /// This node is not serving.
    ///
    /// `leader_endpoint` is `None` when no leader is known.
    /// NOTE(review): presumably it carries the address clients should retry
    /// against when a leader is known — confirm against the service layer.
    NotServing { leader_endpoint: Option<String> },
    /// This node is serving requests.
    Serving,
}
71
72pub struct ServerBuilder {
73 consensus: Option<Arc<dyn ConsensusDriver>>,
74 clock: Option<Arc<dyn Clock>>,
75 window_ahead: Duration,
76 failover_advance: Duration,
77 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
78 tls_config: Option<tonic::transport::ServerTlsConfig>,
79}
80
81impl Default for ServerBuilder {
82 fn default() -> Self {
83 ServerBuilder {
84 consensus: None,
85 clock: None,
86 window_ahead: Duration::from_secs(3),
87 failover_advance: Duration::from_secs(1),
88 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
89 tls_config: None,
90 }
91 }
92}
93
94impl ServerBuilder {
95 pub fn consensus_driver(mut self, driver: Arc<dyn ConsensusDriver>) -> Self {
96 self.consensus = Some(driver);
97 self
98 }
99 pub fn clock(mut self, clock: Arc<dyn Clock>) -> Self {
100 self.clock = Some(clock);
101 self
102 }
103 pub fn window_ahead(mut self, window_ahead: Duration) -> Self {
104 self.window_ahead = window_ahead;
105 self
106 }
107 pub fn failover_advance(mut self, failover_advance: Duration) -> Self {
108 self.failover_advance = failover_advance;
109 self
110 }
111
112 /// Configure TLS termination for this server. Applied inside
113 /// [`Server::serve`], [`Server::serve_with_shutdown`], and
114 /// [`Server::serve_with_listener`]. Not applied to [`Server::into_router`] —
115 /// embedders mounting tsoracle alongside their own services control TLS
116 /// on their own tonic builder.
117 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
118 pub fn tls_config(mut self, cfg: tonic::transport::ServerTlsConfig) -> Self {
119 self.tls_config = Some(cfg);
120 self
121 }
122
123 pub fn build(self) -> Result<Server, BuildError> {
124 crate::leader_hint::validate_key()?;
125 let consensus = self.consensus.ok_or(BuildError::MissingConsensusDriver)?;
126 let clock = self.clock.unwrap_or_else(|| Arc::new(SystemClock));
127 let (state_tx, state_rx) = watch::channel(ServingState::NotServing {
128 leader_endpoint: None,
129 });
130 Ok(Server {
131 consensus,
132 clock,
133 window_ahead: self.window_ahead,
134 failover_advance: self.failover_advance,
135 allocator: Arc::new(Mutex::new(Allocator::new())),
136 state_tx,
137 state_rx,
138 extension_lock: Arc::new(tokio::sync::Mutex::new(())),
139 extension_gate: Arc::new(tokio::sync::RwLock::new(())),
140 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
141 tls_config: self.tls_config,
142 })
143 }
144}
145
146pub struct Server {
147 pub(crate) consensus: Arc<dyn ConsensusDriver>,
148 pub(crate) clock: Arc<dyn Clock>,
149 pub(crate) window_ahead: Duration,
150 pub(crate) failover_advance: Duration,
151 pub(crate) allocator: Arc<Mutex<Allocator>>,
152 pub(crate) state_tx: watch::Sender<ServingState>,
153 pub state_rx: watch::Receiver<ServingState>,
154 /// Serializes window extensions so a stampeding burst of `WindowExhausted`
155 /// requests resolves to a single `persist_high_water` round-trip. Acquired
156 /// before `extension_gate`; combined with a recheck-after-acquire inside
157 /// `extend_window`, latecomers find the window already extended and
158 /// return without contacting consensus.
159 pub(crate) extension_lock: Arc<tokio::sync::Mutex<()>>,
160 /// Read-locked by window-extension calls for the duration of their
161 /// prepare → persist → commit dance. The leader-watch task takes a
162 /// write lock between clearing serving and calling load_high_water,
163 /// which drains all in-flight extensions started under the prior epoch
164 /// before the fence proceeds.
165 pub(crate) extension_gate: Arc<tokio::sync::RwLock<()>>,
166 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
167 pub(crate) tls_config: Option<tonic::transport::ServerTlsConfig>,
168}
169
170impl Server {
171 pub fn builder() -> ServerBuilder {
172 ServerBuilder::default()
173 }
174
175 /// Single transition API used in response to evidence that the current
176 /// epoch is no longer valid: consensus rejection (NotLeader/Fenced
177 /// during persist), leader-watch task termination, or other authoritative
178 /// signals of leadership loss.
179 ///
180 /// Clears the allocator BEFORE publishing `NotServing` so a racing
181 /// `try_grant` either (a) observes `CoreError::NotLeader` rather than
182 /// handing out a stale-epoch grant, or (b) succeeds against a still-valid
183 /// epoch and the *next* call observes `NotServing`. Either ordering is
184 /// safe; what is not safe is the inverse (publish first, clear later)
185 /// because a request between those two steps would see `Serving` AND a
186 /// still-leader allocator.
187 ///
188 /// Does NOT take `extension_gate.write()`. That would deadlock against
189 /// in-flight extensions currently holding the read lock and awaiting
190 /// `persist_high_water`. Those in-flights will either complete cleanly
191 /// (the next `try_grant` then sees `NotServing`) or fail with
192 /// NotLeader/Fenced (and reach this helper themselves — it is idempotent).
193 pub(crate) fn step_down_due_to_consensus_rejection(&self, leader_endpoint: Option<String>) {
194 self.allocator.lock().on_leadership_lost();
195 let _ = self
196 .state_tx
197 .send(ServingState::NotServing { leader_endpoint });
198 }
199}
200
201impl Server {
202 /// Return the configured `TsoServiceServer<TsoServiceImpl>` as a tonic
203 /// `Routes` value plus a `JoinHandle` for the spawned leader-watch task,
204 /// so callers can mount tsoracle's service alongside their own services
205 /// on a shared tonic listener instead of binding a dedicated port.
206 ///
207 /// The `JoinHandle` payload is `Result<(), ServerError>` so embedders
208 /// can observe leader-watch termination. The task never returns
209 /// `Ok(())`: every termination — driver error, panic, or clean EOF on
210 /// the leadership stream (surfaced as `ServerError::WatchStreamClosed`)
211 /// — publishes `ServingState::NotServing { leader_endpoint: None }`
212 /// before returning, so all subsequent RPCs fail fast with
213 /// `FAILED_PRECONDITION`. Embedders who never inspect the handle still
214 /// get fail-safe behavior.
215 ///
216 /// The `Server::serve()` method is a thin wrapper over this — it calls
217 /// `into_router`, builds a tonic `Server`, and binds a listener.
218 pub fn into_router(self) -> (Routes, tokio::task::JoinHandle<Result<(), ServerError>>) {
219 let server = Arc::new(self);
220
221 let watch_server = server.clone();
222 let watch_handle = tokio::spawn(async move {
223 use futures::FutureExt;
224 // catch_unwind so a panic in run_leader_watch still routes through
225 // the poisoning path. Without this, embedders who mount into_router
226 // directly and never observe the JoinHandle would see
227 // ServingState::Serving remain published while the watch task is
228 // dead — the inverse of the fail-safe guarantee documented above.
229 // The panic is re-raised after poisoning so serve / serve_with_*
230 // continue to translate it into ServerError::WatchPanic via
231 // join_to_server_result.
232 let outcome =
233 std::panic::AssertUnwindSafe(crate::fence::run_leader_watch(watch_server.clone()))
234 .catch_unwind()
235 .await;
236 match outcome {
237 Ok(result) => {
238 if let Err(ref _e) = result {
239 // Poison BEFORE returning so embedders who do not observe
240 // the JoinHandle still get fail-safe behavior.
241 watch_server.step_down_due_to_consensus_rejection(None);
242 #[cfg(feature = "tracing")]
243 tracing::error!(error = %_e, "leader-watch terminated; serving disabled");
244 }
245 result
246 }
247 Err(panic_payload) => {
248 // Mirror the Err branch: poison BEFORE re-raising so
249 // handle-dropping embedders still observe NotServing.
250 watch_server.step_down_due_to_consensus_rejection(None);
251 #[cfg(feature = "tracing")]
252 tracing::error!("leader-watch panicked; serving disabled");
253 std::panic::resume_unwind(panic_payload);
254 }
255 }
256 });
257
258 let service = TsoServiceImpl { server };
259 #[allow(unused_mut)]
260 let mut routes = Routes::new(TsoServiceServer::new(service));
261 #[cfg(feature = "reflection")]
262 {
263 #[expect(
264 clippy::expect_used,
265 reason = "`FILE_DESCRIPTOR_SET` is generated by `tsoracle-proto`'s `build.rs` from checked-in `.proto` sources; if it ever fails to decode, the build itself is broken. Tracked by #9."
266 )]
267 let reflection = tonic_reflection::server::Builder::configure()
268 .register_encoded_file_descriptor_set(tsoracle_proto::FILE_DESCRIPTOR_SET)
269 .build_v1()
270 .expect("FILE_DESCRIPTOR_SET emitted by build.rs is always valid");
271 routes = routes.add_service(reflection);
272 }
273 (routes, watch_handle)
274 }
275
276 pub async fn serve(self, addr: SocketAddr) -> Result<(), ServerError> {
277 self.serve_with_shutdown(addr, futures::future::pending())
278 .await
279 }
280
281 /// Run the gRPC server until either the caller's `shutdown` fires or the
282 /// leader-watch task terminates.
283 ///
284 /// Three outcomes:
285 /// 1. `shutdown` fires first → tonic drains in-flights and returns Ok.
286 /// The watch handle is aborted; any error it had been about to return
287 /// is forfeited (the process is shutting down anyway).
288 /// 2. Watch returns `Ok(Err(e))` → poisoned state is already published;
289 /// `cancel_tx` triggers tonic's graceful shutdown; in-flight `GetTs`
290 /// calls whose `try_grant` already succeeded complete with the
291 /// timestamps they were allocated; new calls fail fast. Returns `Err(e)`.
292 /// 3. Watch task panics → returns `Err(ServerError::WatchPanic{..})`
293 /// with the panic payload stringified. Same drain semantics as (2).
294 pub async fn serve_with_shutdown(
295 self,
296 addr: SocketAddr,
297 shutdown: impl Future<Output = ()> + Send + 'static,
298 ) -> Result<(), ServerError> {
299 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
300 let tls_config = self.tls_config.clone();
301
302 let (routes, mut watch_handle) = self.into_router();
303 let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
304
305 // tonic stops when EITHER the user's shutdown fires OR we cancel
306 // because the watch task terminated.
307 let combined_shutdown = async move {
308 tokio::select! {
309 _ = shutdown => {}
310 _ = cancel_rx => {}
311 }
312 };
313
314 let mut tonic = TonicServer::builder();
315 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
316 if let Some(cfg) = tls_config {
317 tonic = tonic.tls_config(cfg).map_err(ServerError::Transport)?;
318 }
319 let serve = tonic
320 .add_routes(routes)
321 .serve_with_shutdown(addr, combined_shutdown);
322 tokio::pin!(serve);
323
324 tokio::select! {
325 // Bias toward the watch arm: if both are ready in the same poll
326 // (rare but possible — graceful shutdown completed in the same
327 // tick the watch returned), we want to surface the watch error
328 // rather than report a clean shutdown.
329 biased;
330
331 watch_result = &mut watch_handle => {
332 // Watch terminated. State is already poisoned (see watch
333 // task body in into_router). Trigger tonic drain and wait
334 // for it to finish, then report the watch's outcome.
335 let _ = cancel_tx.send(());
336 let _ = serve.await;
337 join_to_server_result(watch_result)
338 }
339 serve_result = &mut serve => {
340 // User shutdown fired (or our cancel — but watch arm has
341 // `biased` priority, so reaching here means user shutdown).
342 // The watch task may still be running; aborting it loses
343 // any error it was about to report, but the process is
344 // shutting down so that's acceptable.
345 watch_handle.abort();
346 serve_result?;
347 Ok(())
348 }
349 }
350 }
351
352 /// Run the gRPC server on a caller-provided `TcpListener` until either
353 /// the caller-provided `shutdown` fires or the leader-watch task terminates.
354 ///
355 /// Use this instead of [`Self::serve_with_shutdown`] when you need to
356 /// observe the OS-picked port (`127.0.0.1:0`) before clients connect, or
357 /// when you want to wrap the listener in an outer adapter before passing it
358 /// in. The listening socket is owned by the caller and passed here; tsoracle
359 /// starts accepting on it immediately.
360 ///
361 /// Three outcomes:
362 /// 1. `shutdown` fires first → tonic drains in-flights and returns `Ok`.
363 /// The watch handle is aborted; any error it had been about to return
364 /// is forfeited (the process is shutting down anyway).
365 /// 2. Watch returns `Ok(Err(e))` → poisoned state is already published;
366 /// the caller-provided shutdown is cancelled internally so tonic begins
367 /// graceful shutdown; in-flight `GetTs` calls whose `try_grant` already
368 /// succeeded complete with the timestamps they were allocated; new calls
369 /// fail fast. Returns `Err(e)`.
370 /// 3. Watch task panics → returns `Err(ServerError::WatchPanic{..})`
371 /// with the panic payload stringified. Same drain semantics as (2).
372 pub async fn serve_with_listener(
373 self,
374 listener: tokio::net::TcpListener,
375 shutdown: impl Future<Output = ()> + Send + 'static,
376 ) -> Result<(), ServerError> {
377 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
378 let tls_config = self.tls_config.clone();
379
380 let (routes, mut watch_handle) = self.into_router();
381 let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
382
383 let combined_shutdown = async move {
384 tokio::select! {
385 _ = shutdown => {}
386 _ = cancel_rx => {}
387 }
388 };
389
390 let incoming = tonic::transport::server::TcpIncoming::from(listener);
391
392 let mut tonic = TonicServer::builder();
393 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
394 if let Some(cfg) = tls_config {
395 tonic = tonic.tls_config(cfg).map_err(ServerError::Transport)?;
396 }
397 let serve = tonic
398 .add_routes(routes)
399 .serve_with_incoming_shutdown(incoming, combined_shutdown);
400 tokio::pin!(serve);
401
402 tokio::select! {
403 biased;
404
405 watch_result = &mut watch_handle => {
406 let _ = cancel_tx.send(());
407 let _ = serve.await;
408 join_to_server_result(watch_result)
409 }
410 serve_result = &mut serve => {
411 watch_handle.abort();
412 serve_result?;
413 Ok(())
414 }
415 }
416 }
417}
418
419/// Convert a `JoinHandle` result into a `ServerError`-typed result.
420///
421/// - `Ok(Ok(()))` — unreachable in production: `run_leader_watch` only
422/// returns from its loop via the `WatchStreamClosed` branch. Forwarded
423/// verbatim so the conversion remains total for test helpers that spawn
424/// no-op join futures.
425/// - `Ok(Err(e))` — task returned an error (including `WatchStreamClosed`
426/// from a clean EOF). Forward verbatim.
427/// - `Err(JoinError)` — task was cancelled or panicked. Cancellation maps to
428/// Ok (we asked for it); panic maps to `WatchPanic` with payload.
429fn join_to_server_result(
430 join_result: Result<Result<(), ServerError>, tokio::task::JoinError>,
431) -> Result<(), ServerError> {
432 match join_result {
433 Ok(inner) => inner,
434 Err(join_err) if join_err.is_panic() => {
435 let payload = panic_payload_to_string(join_err.into_panic());
436 Err(ServerError::WatchPanic {
437 payload,
438 bt: Bt::capture(),
439 })
440 }
441 Err(_cancelled) => Ok(()),
442 }
443}
444
/// Render a panic payload as text.
///
/// `&'static str` and `String` payloads are returned verbatim; any other
/// payload type yields a fixed fallback message.
fn panic_payload_to_string(panic: Box<dyn std::any::Any + Send>) -> String {
    let from_literal = panic
        .downcast_ref::<&'static str>()
        .map(|literal| (*literal).to_string());

    from_literal
        .or_else(|| panic.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "watch task panicked with non-string payload".to_string())
}
454
#[cfg(any(test, feature = "test-fakes"))]
impl Server {
    /// Runs the leader-watch loop directly. Test-only: reachable from
    /// integration tests through the `test-fakes` feature and not part of the
    /// stable public API.
    #[doc(hidden)]
    pub async fn run_leader_watch_for_tests(self: Arc<Self>) -> Result<(), ServerError> {
        let server = self;
        crate::fence::run_leader_watch(server).await
    }

    /// Asks the in-memory allocator for a grant of `count` without going
    /// through the gRPC service. Test-only: lets regression tests observe the
    /// behavioral fence (no timestamp at or below the prior leader's
    /// high-water) directly.
    #[doc(hidden)]
    pub fn try_grant_for_tests(&self, count: u32) -> Result<WindowGrant, CoreError> {
        // Kept as one expression on purpose: the allocator lock is taken
        // before the clock is read, and is held for the whole call.
        self.allocator
            .lock()
            .try_grant(self.clock.now_ms(), count)
    }
}
473
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for the boxed payload type carried by a panic.
    type PanicPayload = Box<dyn std::any::Any + Send>;

    #[test]
    fn panic_payload_to_string_recovers_static_str() {
        // A `panic!("literal")` carries a `&'static str`; operators should
        // see exactly the text the watch task panicked with.
        let boxed: PanicPayload = Box::new("watch boom");
        assert_eq!(panic_payload_to_string(boxed), "watch boom");
    }

    #[test]
    fn panic_payload_to_string_recovers_owned_string() {
        // A `panic!("{var}")` is formatted at panic time and carries a
        // `String`, so that downcast branch must work as well.
        let boxed: PanicPayload = Box::new(String::from("formatted"));
        assert_eq!(panic_payload_to_string(boxed), "formatted");
    }

    #[test]
    fn panic_payload_to_string_falls_back_for_other_types() {
        // Anything else (e.g. panic!(MyType { .. })) lands in the fallback.
        struct Custom;
        let boxed: PanicPayload = Box::new(Custom);
        assert_eq!(
            panic_payload_to_string(boxed),
            "watch task panicked with non-string payload",
        );
    }

    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
    #[test]
    fn builder_stores_tls_config() {
        // The serve_* paths read `tls_config` from the built `Server`, not
        // from the builder, so the value has to survive the
        // builder → Server hand-off and not merely the setter call.
        use crate::test_fakes::InMemoryDriver;

        let tls = tonic::transport::ServerTlsConfig::new();
        let built = Server::builder()
            .consensus_driver(Arc::new(InMemoryDriver::new()))
            .tls_config(tls)
            .build()
            .expect("build with tls_config must succeed");
        assert!(built.tls_config.is_some());
    }

    #[tokio::test]
    async fn join_to_server_result_passes_through_clean_outcome() {
        // Ok(Ok(())): the task completed without error and is forwarded as-is.
        let joined = tokio::spawn(async { Ok::<(), ServerError>(()) }).await;
        assert!(matches!(join_to_server_result(joined), Ok(())));
    }

    #[tokio::test]
    async fn join_to_server_result_forwards_inner_error() {
        // Ok(Err(e)): the task's own error must come back untouched.
        let task = tokio::spawn(async {
            Err::<(), ServerError>(ServerError::WatchPanic {
                payload: "synthetic".into(),
                bt: Bt::capture(),
            })
        });
        let joined = task.await;
        match join_to_server_result(joined) {
            Err(ServerError::WatchPanic { payload, .. }) => assert_eq!(payload, "synthetic"),
            other => panic!("expected forwarded WatchPanic, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn join_to_server_result_translates_panic_to_watch_panic() {
        // Err(JoinError) with is_panic(): reported as WatchPanic, with the
        // payload rendered by `panic_payload_to_string`.
        let task = tokio::spawn(async {
            panic!("intentional");
            #[allow(unreachable_code)]
            Ok::<(), ServerError>(())
        });
        let joined = task.await;
        match join_to_server_result(joined) {
            Err(ServerError::WatchPanic { payload, .. }) => {
                assert!(payload.contains("intentional"))
            }
            other => panic!("expected WatchPanic, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn join_to_server_result_treats_cancellation_as_clean_exit() {
        // Err(JoinError) with is_cancelled(): the abort was requested by the
        // caller, so it maps to Ok.
        let task: tokio::task::JoinHandle<Result<(), ServerError>> =
            tokio::spawn(async { futures::future::pending().await });
        task.abort();
        let joined = task.await;
        assert!(matches!(join_to_server_result(joined), Ok(())));
    }
}