Skip to main content

tsoracle_server/
server.rs

1//
2//  ░▀█▀░█▀▀░█▀█░█▀▄░█▀█░█▀▀░█░░░█▀▀
3//  ░░█░░▀▀█░█░█░█▀▄░█▀█░█░░░█░░░█▀▀
4//  ░░▀░░▀▀▀░▀▀▀░▀░▀░▀░▀░▀▀▀░▀▀▀░▀▀▀
5//
6//  tsoracle — Distributed Timestamp Oracle
7//
8//  Copyright (c) 2026 Prisma Risk
9//  Licensed under the Apache License, Version 2.0
10//  https://github.com/prisma-risk/tsoracle
11//
12
13use core::time::Duration;
14use parking_lot::Mutex;
15use std::future::Future;
16use std::net::SocketAddr;
17use std::sync::Arc;
18use tokio::sync::watch;
19use tonic::service::Routes;
20use tonic::transport::Server as TonicServer;
21use tsoracle_consensus::ConsensusDriver;
22use tsoracle_core::{Allocator, Bt, Clock, SystemClock};
23#[cfg(any(test, feature = "test-fakes"))]
24use tsoracle_core::{CoreError, WindowGrant};
25use tsoracle_proto::v1::tso_service_server::TsoServiceServer;
26
27use crate::service::TsoServiceImpl;
28
/// Errors returned by [`ServerBuilder::build`] when the builder cannot be
/// turned into a [`Server`].
#[derive(Debug, thiserror::Error)]
pub enum BuildError {
    /// `build` was called without a consensus driver having been supplied
    /// through [`ServerBuilder::consensus_driver`]. The driver is the only
    /// builder input with no default.
    #[error("consensus_driver is required")]
    MissingConsensusDriver,
    /// Surfaced when [`crate::leader_hint::KEY`] fails [`crate::leader_hint::validate_key`].
    /// Today the key is a valid `const &'static str`, so this variant is
    /// developer-error insurance: a future edit that breaks the key triggers
    /// a startup failure rather than silently stripping the trailer from
    /// every NOT_LEADER response.
    #[error("invalid leader-hint metadata key: {0}")]
    InvalidLeaderHintKey(#[from] tonic::metadata::errors::InvalidMetadataKey),
}
41
/// Errors reported by the `serve*` entry points and carried in the
/// `JoinHandle` payload returned by [`Server::into_router`].
#[derive(Debug, thiserror::Error)]
pub enum ServerError {
    /// Failure reported by tonic's transport layer. In this file it arises
    /// from applying the TLS configuration or from the tonic serve future
    /// itself.
    #[error("transport: {0}")]
    Transport(#[from] tonic::transport::Error),
    /// Error converted from `tsoracle_consensus::ConsensusError`.
    /// NOTE(review): presumably returned by the leader-watch task when the
    /// consensus driver fails; the producing code lives outside this file —
    /// confirm.
    #[error("consensus: {0}")]
    Consensus(#[from] tsoracle_consensus::ConsensusError),
    /// Error converted from `tsoracle_core::CoreError`.
    /// NOTE(review): the producing call sites are outside this file — confirm
    /// which core operations can surface here.
    #[error("core: {0}")]
    Core(#[from] tsoracle_core::CoreError),
    /// The leader-watch task panicked. Distinct from a clean error return so
    /// operators can tell "driver returned Err" (recoverable design) from
    /// "task panicked" (programming bug).
    #[error("leader-watch task panicked: {payload}{bt}")]
    WatchPanic { payload: String, bt: Bt },
    /// The consensus driver's `leadership_events()` stream ended cleanly while
    /// the leader-watch task was running. The stream is contracted to live for
    /// the life of the server, so its end is anomalous (driver shutdown, lost
    /// session, etc.) — distinct from a `Consensus` error returned mid-fence.
    /// The watch task publishes `ServingState::NotServing` before returning
    /// this variant so embedders who never observe the `JoinHandle` still get
    /// the documented fail-safe behavior.
    #[error("consensus driver leadership stream closed")]
    WatchStreamClosed,
}
65
/// Serving status carried on the server's `watch` channel
/// (`Server::state_tx` / `Server::state_rx`).
#[derive(Clone, Debug)]
pub enum ServingState {
    /// The server is not handing out timestamps. This is the channel's
    /// initial value (with `leader_endpoint: None`) and the value published
    /// by `Server::step_down_due_to_consensus_rejection`.
    ///
    /// NOTE(review): `leader_endpoint` presumably carries the address of the
    /// current leader so clients can be redirected; how it is consumed is
    /// outside this file — confirm.
    NotServing { leader_endpoint: Option<String> },
    /// The server is serving requests.
    /// NOTE(review): presumably published by the leader-watch task once this
    /// node holds leadership; the publishing code is outside this file —
    /// confirm.
    Serving,
}
71
/// Builder for [`Server`]. Obtain one with [`Server::builder`] (or
/// `ServerBuilder::default()`), apply the setters, then call
/// [`ServerBuilder::build`].
pub struct ServerBuilder {
    /// Consensus driver. Required: `build` fails with
    /// `BuildError::MissingConsensusDriver` while this is `None`.
    consensus: Option<Arc<dyn ConsensusDriver>>,
    /// Optional clock override; `build` falls back to `SystemClock` when unset.
    clock: Option<Arc<dyn Clock>>,
    /// Defaults to 3 seconds.
    /// NOTE(review): presumably how far ahead of the clock a persisted
    /// window reaches; the consumer is outside this file — confirm.
    window_ahead: Duration,
    /// Defaults to 1 second.
    /// NOTE(review): presumably the safety margin added when a new leader
    /// takes over; the consumer is outside this file — confirm.
    failover_advance: Duration,
    /// Optional TLS configuration, only present when a TLS feature is
    /// enabled. Handed to the built `Server` unchanged.
    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
    tls_config: Option<tonic::transport::ServerTlsConfig>,
}
80
81impl Default for ServerBuilder {
82    fn default() -> Self {
83        ServerBuilder {
84            consensus: None,
85            clock: None,
86            window_ahead: Duration::from_secs(3),
87            failover_advance: Duration::from_secs(1),
88            #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
89            tls_config: None,
90        }
91    }
92}
93
94impl ServerBuilder {
95    pub fn consensus_driver(mut self, driver: Arc<dyn ConsensusDriver>) -> Self {
96        self.consensus = Some(driver);
97        self
98    }
99    pub fn clock(mut self, clock: Arc<dyn Clock>) -> Self {
100        self.clock = Some(clock);
101        self
102    }
103    pub fn window_ahead(mut self, window_ahead: Duration) -> Self {
104        self.window_ahead = window_ahead;
105        self
106    }
107    pub fn failover_advance(mut self, failover_advance: Duration) -> Self {
108        self.failover_advance = failover_advance;
109        self
110    }
111
112    /// Configure TLS termination for this server. Applied inside
113    /// [`Server::serve`], [`Server::serve_with_shutdown`], and
114    /// [`Server::serve_with_listener`]. Not applied to [`Server::into_router`] —
115    /// embedders mounting tsoracle alongside their own services control TLS
116    /// on their own tonic builder.
117    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
118    pub fn tls_config(mut self, cfg: tonic::transport::ServerTlsConfig) -> Self {
119        self.tls_config = Some(cfg);
120        self
121    }
122
123    pub fn build(self) -> Result<Server, BuildError> {
124        crate::leader_hint::validate_key()?;
125        let consensus = self.consensus.ok_or(BuildError::MissingConsensusDriver)?;
126        let clock = self.clock.unwrap_or_else(|| Arc::new(SystemClock));
127        let (state_tx, state_rx) = watch::channel(ServingState::NotServing {
128            leader_endpoint: None,
129        });
130        Ok(Server {
131            consensus,
132            clock,
133            window_ahead: self.window_ahead,
134            failover_advance: self.failover_advance,
135            allocator: Arc::new(Mutex::new(Allocator::new())),
136            state_tx,
137            state_rx,
138            extension_lock: Arc::new(tokio::sync::Mutex::new(())),
139            extension_gate: Arc::new(tokio::sync::RwLock::new(())),
140            #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
141            tls_config: self.tls_config,
142        })
143    }
144}
145
/// A configured timestamp-oracle server, assembled by
/// [`ServerBuilder::build`]. Consume it with [`Server::into_router`] or one of
/// the `serve*` methods.
pub struct Server {
    /// Consensus driver supplied through [`ServerBuilder::consensus_driver`].
    pub(crate) consensus: Arc<dyn ConsensusDriver>,
    /// Clock supplied through [`ServerBuilder::clock`], or `SystemClock` when
    /// none was set.
    pub(crate) clock: Arc<dyn Clock>,
    /// Copied from the builder (3 seconds unless overridden).
    /// NOTE(review): consumed outside this file — confirm exact semantics.
    pub(crate) window_ahead: Duration,
    /// Copied from the builder (1 second unless overridden).
    /// NOTE(review): consumed outside this file — confirm exact semantics.
    pub(crate) failover_advance: Duration,
    /// In-memory allocator, created fresh by `build` and guarded by a
    /// synchronous mutex. `step_down_due_to_consensus_rejection` calls
    /// `on_leadership_lost` on it.
    pub(crate) allocator: Arc<Mutex<Allocator>>,
    /// Sending half of the serving-state channel. The channel's initial
    /// value is `ServingState::NotServing { leader_endpoint: None }`.
    pub(crate) state_tx: watch::Sender<ServingState>,
    /// Receiving half of the serving-state channel, created together with
    /// `state_tx`. Unlike the other fields this one is fully public.
    pub state_rx: watch::Receiver<ServingState>,
    /// Serializes window extensions so a stampeding burst of `WindowExhausted`
    /// requests resolves to a single `persist_high_water` round-trip. Acquired
    /// before `extension_gate`; combined with a recheck-after-acquire inside
    /// `extend_window`, latecomers find the window already extended and
    /// return without contacting consensus.
    pub(crate) extension_lock: Arc<tokio::sync::Mutex<()>>,
    /// Read-locked by window-extension calls for the duration of their
    /// prepare → persist → commit dance. The leader-watch task takes a
    /// write lock between clearing serving and calling load_high_water,
    /// which drains all in-flight extensions started under the prior epoch
    /// before the fence proceeds.
    pub(crate) extension_gate: Arc<tokio::sync::RwLock<()>>,
    /// TLS configuration carried over from the builder; read by the `serve*`
    /// paths. Only present when a TLS feature is enabled.
    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
    pub(crate) tls_config: Option<tonic::transport::ServerTlsConfig>,
}
169
170impl Server {
171    pub fn builder() -> ServerBuilder {
172        ServerBuilder::default()
173    }
174
175    /// Single transition API used in response to evidence that the current
176    /// epoch is no longer valid: consensus rejection (NotLeader/Fenced
177    /// during persist), leader-watch task termination, or other authoritative
178    /// signals of leadership loss.
179    ///
180    /// Clears the allocator BEFORE publishing `NotServing` so a racing
181    /// `try_grant` either (a) observes `CoreError::NotLeader` rather than
182    /// handing out a stale-epoch grant, or (b) succeeds against a still-valid
183    /// epoch and the *next* call observes `NotServing`. Either ordering is
184    /// safe; what is not safe is the inverse (publish first, clear later)
185    /// because a request between those two steps would see `Serving` AND a
186    /// still-leader allocator.
187    ///
188    /// Does NOT take `extension_gate.write()`. That would deadlock against
189    /// in-flight extensions currently holding the read lock and awaiting
190    /// `persist_high_water`. Those in-flights will either complete cleanly
191    /// (the next `try_grant` then sees `NotServing`) or fail with
192    /// NotLeader/Fenced (and reach this helper themselves — it is idempotent).
193    pub(crate) fn step_down_due_to_consensus_rejection(&self, leader_endpoint: Option<String>) {
194        self.allocator.lock().on_leadership_lost();
195        let _ = self
196            .state_tx
197            .send(ServingState::NotServing { leader_endpoint });
198    }
199}
200
201impl Server {
202    /// Return the configured `TsoServiceServer<TsoServiceImpl>` as a tonic
203    /// `Routes` value plus a `JoinHandle` for the spawned leader-watch task,
204    /// so callers can mount tsoracle's service alongside their own services
205    /// on a shared tonic listener instead of binding a dedicated port.
206    ///
207    /// The `JoinHandle` payload is `Result<(), ServerError>` so embedders
208    /// can observe leader-watch termination. The task never returns
209    /// `Ok(())`: every termination — driver error, panic, or clean EOF on
210    /// the leadership stream (surfaced as `ServerError::WatchStreamClosed`)
211    /// — publishes `ServingState::NotServing { leader_endpoint: None }`
212    /// before returning, so all subsequent RPCs fail fast with
213    /// `FAILED_PRECONDITION`. Embedders who never inspect the handle still
214    /// get fail-safe behavior.
215    ///
216    /// The `Server::serve()` method is a thin wrapper over this — it calls
217    /// `into_router`, builds a tonic `Server`, and binds a listener.
218    pub fn into_router(self) -> (Routes, tokio::task::JoinHandle<Result<(), ServerError>>) {
219        let server = Arc::new(self);
220
221        let watch_server = server.clone();
222        let watch_handle = tokio::spawn(async move {
223            use futures::FutureExt;
224            // catch_unwind so a panic in run_leader_watch still routes through
225            // the poisoning path. Without this, embedders who mount into_router
226            // directly and never observe the JoinHandle would see
227            // ServingState::Serving remain published while the watch task is
228            // dead — the inverse of the fail-safe guarantee documented above.
229            // The panic is re-raised after poisoning so serve / serve_with_*
230            // continue to translate it into ServerError::WatchPanic via
231            // join_to_server_result.
232            let outcome =
233                std::panic::AssertUnwindSafe(crate::fence::run_leader_watch(watch_server.clone()))
234                    .catch_unwind()
235                    .await;
236            match outcome {
237                Ok(result) => {
238                    if let Err(ref _e) = result {
239                        // Poison BEFORE returning so embedders who do not observe
240                        // the JoinHandle still get fail-safe behavior.
241                        watch_server.step_down_due_to_consensus_rejection(None);
242                        #[cfg(feature = "tracing")]
243                        tracing::error!(error = %_e, "leader-watch terminated; serving disabled");
244                    }
245                    result
246                }
247                Err(panic_payload) => {
248                    // Mirror the Err branch: poison BEFORE re-raising so
249                    // handle-dropping embedders still observe NotServing.
250                    watch_server.step_down_due_to_consensus_rejection(None);
251                    #[cfg(feature = "tracing")]
252                    tracing::error!("leader-watch panicked; serving disabled");
253                    std::panic::resume_unwind(panic_payload);
254                }
255            }
256        });
257
258        let service = TsoServiceImpl { server };
259        #[allow(unused_mut)]
260        let mut routes = Routes::new(TsoServiceServer::new(service));
261        #[cfg(feature = "reflection")]
262        {
263            #[expect(
264                clippy::expect_used,
265                reason = "`FILE_DESCRIPTOR_SET` is generated by `tsoracle-proto`'s `build.rs` from checked-in `.proto` sources; if it ever fails to decode, the build itself is broken. Tracked by #9."
266            )]
267            let reflection = tonic_reflection::server::Builder::configure()
268                .register_encoded_file_descriptor_set(tsoracle_proto::FILE_DESCRIPTOR_SET)
269                .build_v1()
270                .expect("FILE_DESCRIPTOR_SET emitted by build.rs is always valid");
271            routes = routes.add_service(reflection);
272        }
273        (routes, watch_handle)
274    }
275
276    pub async fn serve(self, addr: SocketAddr) -> Result<(), ServerError> {
277        self.serve_with_shutdown(addr, futures::future::pending())
278            .await
279    }
280
281    /// Run the gRPC server until either the caller's `shutdown` fires or the
282    /// leader-watch task terminates.
283    ///
284    /// Three outcomes:
285    /// 1. `shutdown` fires first → tonic drains in-flights and returns Ok.
286    ///    The watch handle is aborted; any error it had been about to return
287    ///    is forfeited (the process is shutting down anyway).
288    /// 2. Watch returns `Ok(Err(e))` → poisoned state is already published;
289    ///    `cancel_tx` triggers tonic's graceful shutdown; in-flight `GetTs`
290    ///    calls whose `try_grant` already succeeded complete with the
291    ///    timestamps they were allocated; new calls fail fast. Returns `Err(e)`.
292    /// 3. Watch task panics → returns `Err(ServerError::WatchPanic{..})`
293    ///    with the panic payload stringified. Same drain semantics as (2).
294    pub async fn serve_with_shutdown(
295        self,
296        addr: SocketAddr,
297        shutdown: impl Future<Output = ()> + Send + 'static,
298    ) -> Result<(), ServerError> {
299        #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
300        let tls_config = self.tls_config.clone();
301
302        let (routes, mut watch_handle) = self.into_router();
303        let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
304
305        // tonic stops when EITHER the user's shutdown fires OR we cancel
306        // because the watch task terminated.
307        let combined_shutdown = async move {
308            tokio::select! {
309                _ = shutdown => {}
310                _ = cancel_rx => {}
311            }
312        };
313
314        let mut tonic = TonicServer::builder();
315        #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
316        if let Some(cfg) = tls_config {
317            tonic = tonic.tls_config(cfg).map_err(ServerError::Transport)?;
318        }
319        let serve = tonic
320            .add_routes(routes)
321            .serve_with_shutdown(addr, combined_shutdown);
322        tokio::pin!(serve);
323
324        tokio::select! {
325            // Bias toward the watch arm: if both are ready in the same poll
326            // (rare but possible — graceful shutdown completed in the same
327            // tick the watch returned), we want to surface the watch error
328            // rather than report a clean shutdown.
329            biased;
330
331            watch_result = &mut watch_handle => {
332                // Watch terminated. State is already poisoned (see watch
333                // task body in into_router). Trigger tonic drain and wait
334                // for it to finish, then report the watch's outcome.
335                let _ = cancel_tx.send(());
336                let _ = serve.await;
337                join_to_server_result(watch_result)
338            }
339            serve_result = &mut serve => {
340                // User shutdown fired (or our cancel — but watch arm has
341                // `biased` priority, so reaching here means user shutdown).
342                // The watch task may still be running; aborting it loses
343                // any error it was about to report, but the process is
344                // shutting down so that's acceptable.
345                watch_handle.abort();
346                serve_result?;
347                Ok(())
348            }
349        }
350    }
351
352    /// Run the gRPC server on a caller-provided `TcpListener` until either
353    /// the caller-provided `shutdown` fires or the leader-watch task terminates.
354    ///
355    /// Use this instead of [`Self::serve_with_shutdown`] when you need to
356    /// observe the OS-picked port (`127.0.0.1:0`) before clients connect, or
357    /// when you want to wrap the listener in an outer adapter before passing it
358    /// in. The listening socket is owned by the caller and passed here; tsoracle
359    /// starts accepting on it immediately.
360    ///
361    /// Three outcomes:
362    /// 1. `shutdown` fires first → tonic drains in-flights and returns `Ok`.
363    ///    The watch handle is aborted; any error it had been about to return
364    ///    is forfeited (the process is shutting down anyway).
365    /// 2. Watch returns `Ok(Err(e))` → poisoned state is already published;
366    ///    the caller-provided shutdown is cancelled internally so tonic begins
367    ///    graceful shutdown; in-flight `GetTs` calls whose `try_grant` already
368    ///    succeeded complete with the timestamps they were allocated; new calls
369    ///    fail fast. Returns `Err(e)`.
370    /// 3. Watch task panics → returns `Err(ServerError::WatchPanic{..})`
371    ///    with the panic payload stringified. Same drain semantics as (2).
372    pub async fn serve_with_listener(
373        self,
374        listener: tokio::net::TcpListener,
375        shutdown: impl Future<Output = ()> + Send + 'static,
376    ) -> Result<(), ServerError> {
377        #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
378        let tls_config = self.tls_config.clone();
379
380        let (routes, mut watch_handle) = self.into_router();
381        let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
382
383        let combined_shutdown = async move {
384            tokio::select! {
385                _ = shutdown => {}
386                _ = cancel_rx => {}
387            }
388        };
389
390        let incoming = tonic::transport::server::TcpIncoming::from(listener);
391
392        let mut tonic = TonicServer::builder();
393        #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
394        if let Some(cfg) = tls_config {
395            tonic = tonic.tls_config(cfg).map_err(ServerError::Transport)?;
396        }
397        let serve = tonic
398            .add_routes(routes)
399            .serve_with_incoming_shutdown(incoming, combined_shutdown);
400        tokio::pin!(serve);
401
402        tokio::select! {
403            biased;
404
405            watch_result = &mut watch_handle => {
406                let _ = cancel_tx.send(());
407                let _ = serve.await;
408                join_to_server_result(watch_result)
409            }
410            serve_result = &mut serve => {
411                watch_handle.abort();
412                serve_result?;
413                Ok(())
414            }
415        }
416    }
417}
418
419/// Convert a `JoinHandle` result into a `ServerError`-typed result.
420///
421/// - `Ok(Ok(()))` — unreachable in production: `run_leader_watch` only
422///   returns from its loop via the `WatchStreamClosed` branch. Forwarded
423///   verbatim so the conversion remains total for test helpers that spawn
424///   no-op join futures.
425/// - `Ok(Err(e))` — task returned an error (including `WatchStreamClosed`
426///   from a clean EOF). Forward verbatim.
427/// - `Err(JoinError)` — task was cancelled or panicked. Cancellation maps to
428///   Ok (we asked for it); panic maps to `WatchPanic` with payload.
429fn join_to_server_result(
430    join_result: Result<Result<(), ServerError>, tokio::task::JoinError>,
431) -> Result<(), ServerError> {
432    match join_result {
433        Ok(inner) => inner,
434        Err(join_err) if join_err.is_panic() => {
435            let payload = panic_payload_to_string(join_err.into_panic());
436            Err(ServerError::WatchPanic {
437                payload,
438                bt: Bt::capture(),
439            })
440        }
441        Err(_cancelled) => Ok(()),
442    }
443}
444
/// Render a panic payload as text for `ServerError::WatchPanic`.
///
/// `&'static str` and `String` payloads are returned verbatim; any other
/// payload type yields a fixed placeholder message.
fn panic_payload_to_string(panic: Box<dyn std::any::Any + Send>) -> String {
    // Owned downcasts hand the box back on a type mismatch, so each attempt
    // can fall through to the next without borrowing.
    match panic.downcast::<&'static str>() {
        Ok(literal) => (*literal).to_owned(),
        Err(not_a_literal) => match not_a_literal.downcast::<String>() {
            Ok(formatted) => *formatted,
            Err(_) => String::from("watch task panicked with non-string payload"),
        },
    }
}
454
// Helpers compiled only for unit tests or when the `test-fakes` feature is
// enabled; hidden from rustdoc.
#[cfg(any(test, feature = "test-fakes"))]
impl Server {
    /// Test-only entry point for the leader-watch loop. Exposed to integration
    /// tests via the `test-fakes` feature; not part of the stable public API.
    ///
    /// Calls `crate::fence::run_leader_watch` directly, without the
    /// panic-catching and state-poisoning wrapper that `into_router` adds.
    #[doc(hidden)]
    pub async fn run_leader_watch_for_tests(self: Arc<Self>) -> Result<(), ServerError> {
        crate::fence::run_leader_watch(self).await
    }

    /// Test-only allocator probe. Issues a window grant against the current
    /// in-memory state without going through the gRPC service. Used by
    /// regression tests that need to observe the behavioral fence (no
    /// timestamp at or below the prior leader's high-water) directly.
    #[doc(hidden)]
    pub fn try_grant_for_tests(&self, count: u32) -> Result<WindowGrant, CoreError> {
        // The allocator lock is taken first (receiver expression); the clock
        // is read afterwards, while the lock is held.
        self.allocator.lock().try_grant(self.clock.now_ms(), count)
    }
}
473
#[cfg(test)]
mod tests {
    use super::*;

    /// Type-erases `value` into the boxed form panic payloads arrive in.
    fn boxed_payload<T: std::any::Any + Send>(value: T) -> Box<dyn std::any::Any + Send> {
        Box::new(value)
    }

    #[test]
    fn panic_payload_to_string_recovers_static_str() {
        // A literal `panic!("...")` carries a `&'static str`; operators should
        // see exactly what the watch task said.
        let rendered = panic_payload_to_string(boxed_payload("watch boom"));
        assert_eq!(rendered, "watch boom");
    }

    #[test]
    fn panic_payload_to_string_recovers_owned_string() {
        // A formatted `panic!("{var}")` carries a `String` built at panic
        // time; that branch must be recognised as well.
        let rendered = panic_payload_to_string(boxed_payload(String::from("formatted")));
        assert_eq!(rendered, "formatted");
    }

    #[test]
    fn panic_payload_to_string_falls_back_for_other_types() {
        // Any other payload type lands in the placeholder branch.
        struct Custom;
        let rendered = panic_payload_to_string(boxed_payload(Custom));
        assert_eq!(rendered, "watch task panicked with non-string payload");
    }

    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
    #[test]
    fn builder_stores_tls_config() {
        // The serve_* paths read `tls_config` from `Server` (not the builder)
        // before `into_router` consumes `self`, so the field has to survive
        // the builder → Server hand-off, not merely the builder setter.
        use crate::test_fakes::InMemoryDriver;

        let built = Server::builder()
            .consensus_driver(Arc::new(InMemoryDriver::new()))
            .tls_config(tonic::transport::ServerTlsConfig::new())
            .build()
            .expect("build with tls_config must succeed");
        assert!(built.tls_config.is_some());
    }

    #[tokio::test]
    async fn join_to_server_result_passes_through_clean_outcome() {
        // Ok(Ok(())): the task finished cleanly and is forwarded untouched.
        let joined = tokio::spawn(async { Ok::<(), ServerError>(()) }).await;
        assert!(join_to_server_result(joined).is_ok());
    }

    #[tokio::test]
    async fn join_to_server_result_forwards_inner_error() {
        // Ok(Err(e)): the task's own error comes back unchanged.
        let joined = tokio::spawn(async {
            Err::<(), ServerError>(ServerError::WatchPanic {
                payload: "synthetic".into(),
                bt: Bt::capture(),
            })
        })
        .await;
        match join_to_server_result(joined) {
            Err(ServerError::WatchPanic { payload, .. }) => assert_eq!(payload, "synthetic"),
            other => panic!("expected forwarded WatchPanic, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn join_to_server_result_translates_panic_to_watch_panic() {
        // A panicking task yields a JoinError with `is_panic()`; it must
        // surface as WatchPanic with the payload rendered by
        // `panic_payload_to_string`.
        let joined = tokio::spawn(async {
            panic!("intentional");
            #[allow(unreachable_code)]
            Ok::<(), ServerError>(())
        })
        .await;
        match join_to_server_result(joined) {
            Err(ServerError::WatchPanic { payload, .. }) => {
                assert!(payload.contains("intentional"))
            }
            other => panic!("expected WatchPanic, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn join_to_server_result_treats_cancellation_as_clean_exit() {
        // An aborted task yields a cancellation JoinError; the abort was
        // requested, so it maps to Ok.
        let never_finishes: tokio::task::JoinHandle<Result<(), ServerError>> =
            tokio::spawn(async { futures::future::pending().await });
        never_finishes.abort();
        let joined = never_finishes.await;
        assert!(join_to_server_result(joined).is_ok());
    }
}