// tsoracle_server/server.rs

//
//  ░▀█▀░█▀▀░█▀█░█▀▄░█▀█░█▀▀░█░░░█▀▀
//  ░░█░░▀▀█░█░█░█▀▄░█▀█░█░░░█░░░█▀▀
//  ░░▀░░▀▀▀░▀▀▀░▀░▀░▀░▀░▀▀▀░▀▀▀░▀▀▀
//
//  tsoracle — Distributed Timestamp Oracle
//
//  Copyright (c) 2026 Prisma Risk
//  Licensed under the Apache License, Version 2.0
//  https://github.com/prisma-risk/tsoracle
//
12
13use core::time::Duration;
14use parking_lot::Mutex;
15use std::future::Future;
16use std::net::SocketAddr;
17use std::sync::Arc;
18use tokio::sync::watch;
19use tonic::service::Routes;
20use tonic::transport::Server as TonicServer;
21use tsoracle_consensus::ConsensusDriver;
22use tsoracle_core::{Allocator, Clock, SystemClock};
23#[cfg(any(test, feature = "test-fakes"))]
24use tsoracle_core::{CoreError, WindowGrant};
25use tsoracle_proto::v1::tso_service_server::TsoServiceServer;
26
27use crate::service::TsoServiceImpl;
28
29#[derive(Debug, thiserror::Error)]
30pub enum BuildError {
31    #[error("consensus_driver is required")]
32    MissingConsensusDriver,
33    /// Surfaced when [`crate::leader_hint::KEY`] fails [`crate::leader_hint::validate_key`].
34    /// Today the key is a valid `const &'static str`, so this variant is
35    /// developer-error insurance: a future edit that breaks the key triggers
36    /// a startup failure rather than silently stripping the trailer from
37    /// every NOT_LEADER response.
38    #[error("invalid leader-hint metadata key: {0}")]
39    InvalidLeaderHintKey(#[from] tonic::metadata::errors::InvalidMetadataKey),
40}
41
42#[derive(Debug, thiserror::Error)]
43pub enum ServerError {
44    #[error("transport: {0}")]
45    Transport(#[from] tonic::transport::Error),
46    #[error("consensus: {0}")]
47    Consensus(#[from] tsoracle_consensus::ConsensusError),
48    #[error("core: {0}")]
49    Core(#[from] tsoracle_core::CoreError),
50    /// The leader-watch task panicked. Distinct from a clean error return so
51    /// operators can tell "driver returned Err" (recoverable design) from
52    /// "task panicked" (programming bug).
53    #[error("leader-watch task panicked: {payload}")]
54    WatchPanic { payload: String },
55}
56
/// Serving status published on the `watch` channel owned by [`Server`].
///
/// A freshly built server starts out as `NotServing { leader_endpoint: None }`.
#[derive(Clone, Debug)]
pub enum ServingState {
    /// This node is not serving. `leader_endpoint` is an optional endpoint
    /// string (presumably the current leader's address — confirm against
    /// the code that publishes it).
    NotServing { leader_endpoint: Option<String> },
    /// This node is serving.
    Serving,
}
62
63pub struct ServerBuilder {
64    consensus: Option<Arc<dyn ConsensusDriver>>,
65    clock: Option<Arc<dyn Clock>>,
66    window_ahead: Duration,
67    failover_advance: Duration,
68    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
69    tls_config: Option<tonic::transport::ServerTlsConfig>,
70}
71
72impl Default for ServerBuilder {
73    fn default() -> Self {
74        ServerBuilder {
75            consensus: None,
76            clock: None,
77            window_ahead: Duration::from_secs(3),
78            failover_advance: Duration::from_secs(1),
79            #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
80            tls_config: None,
81        }
82    }
83}
84
85impl ServerBuilder {
86    pub fn consensus_driver(mut self, driver: Arc<dyn ConsensusDriver>) -> Self {
87        self.consensus = Some(driver);
88        self
89    }
90    pub fn clock(mut self, clock: Arc<dyn Clock>) -> Self {
91        self.clock = Some(clock);
92        self
93    }
94    pub fn window_ahead(mut self, window_ahead: Duration) -> Self {
95        self.window_ahead = window_ahead;
96        self
97    }
98    pub fn failover_advance(mut self, failover_advance: Duration) -> Self {
99        self.failover_advance = failover_advance;
100        self
101    }
102
103    /// Configure TLS termination for this server. Applied inside
104    /// [`Server::serve`], [`Server::serve_with_shutdown`], and
105    /// [`Server::serve_with_listener`]. Not applied to [`Server::into_router`] —
106    /// embedders mounting tsoracle alongside their own services control TLS
107    /// on their own tonic builder.
108    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
109    pub fn tls_config(mut self, cfg: tonic::transport::ServerTlsConfig) -> Self {
110        self.tls_config = Some(cfg);
111        self
112    }
113
114    pub fn build(self) -> Result<Server, BuildError> {
115        crate::leader_hint::validate_key()?;
116        let consensus = self.consensus.ok_or(BuildError::MissingConsensusDriver)?;
117        let clock = self.clock.unwrap_or_else(|| Arc::new(SystemClock));
118        let (state_tx, state_rx) = watch::channel(ServingState::NotServing {
119            leader_endpoint: None,
120        });
121        Ok(Server {
122            consensus,
123            clock,
124            window_ahead: self.window_ahead,
125            failover_advance: self.failover_advance,
126            allocator: Arc::new(Mutex::new(Allocator::new())),
127            state_tx,
128            state_rx,
129            extension_lock: Arc::new(tokio::sync::Mutex::new(())),
130            extension_gate: Arc::new(tokio::sync::RwLock::new(())),
131            #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
132            tls_config: self.tls_config,
133        })
134    }
135}
136
137pub struct Server {
138    pub(crate) consensus: Arc<dyn ConsensusDriver>,
139    pub(crate) clock: Arc<dyn Clock>,
140    pub(crate) window_ahead: Duration,
141    pub(crate) failover_advance: Duration,
142    pub(crate) allocator: Arc<Mutex<Allocator>>,
143    pub(crate) state_tx: watch::Sender<ServingState>,
144    pub state_rx: watch::Receiver<ServingState>,
145    /// Serializes window extensions so a stampeding burst of `WindowExhausted`
146    /// requests resolves to a single `persist_high_water` round-trip. Acquired
147    /// before `extension_gate`; combined with a recheck-after-acquire inside
148    /// `extend_window`, latecomers find the window already extended and
149    /// return without contacting consensus.
150    pub(crate) extension_lock: Arc<tokio::sync::Mutex<()>>,
151    /// Read-locked by window-extension calls for the duration of their
152    /// prepare → persist → commit dance. The leader-watch task takes a
153    /// write lock between clearing serving and calling load_high_water,
154    /// which drains all in-flight extensions started under the prior epoch
155    /// before the fence proceeds.
156    pub(crate) extension_gate: Arc<tokio::sync::RwLock<()>>,
157    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
158    pub(crate) tls_config: Option<tonic::transport::ServerTlsConfig>,
159}
160
161impl Server {
162    pub fn builder() -> ServerBuilder {
163        ServerBuilder::default()
164    }
165
166    /// Single transition API used in response to evidence that the current
167    /// epoch is no longer valid: consensus rejection (NotLeader/Fenced
168    /// during persist), leader-watch task termination, or other authoritative
169    /// signals of leadership loss.
170    ///
171    /// Clears the allocator BEFORE publishing `NotServing` so a racing
172    /// `try_grant` either (a) observes `CoreError::NotLeader` rather than
173    /// handing out a stale-epoch grant, or (b) succeeds against a still-valid
174    /// epoch and the *next* call observes `NotServing`. Either ordering is
175    /// safe; what is not safe is the inverse (publish first, clear later)
176    /// because a request between those two steps would see `Serving` AND a
177    /// still-leader allocator.
178    ///
179    /// Does NOT take `extension_gate.write()`. That would deadlock against
180    /// in-flight extensions currently holding the read lock and awaiting
181    /// `persist_high_water`. Those in-flights will either complete cleanly
182    /// (the next `try_grant` then sees `NotServing`) or fail with
183    /// NotLeader/Fenced (and reach this helper themselves — it is idempotent).
184    pub(crate) fn step_down_due_to_consensus_rejection(&self, leader_endpoint: Option<String>) {
185        self.allocator.lock().on_leadership_lost();
186        let _ = self
187            .state_tx
188            .send(ServingState::NotServing { leader_endpoint });
189    }
190}
191
192impl Server {
193    /// Return the configured `TsoServiceServer<TsoServiceImpl>` as a tonic
194    /// `Routes` value plus a `JoinHandle` for the spawned leader-watch task,
195    /// so callers can mount tsoracle's service alongside their own services
196    /// on a shared tonic listener instead of binding a dedicated port.
197    ///
198    /// The `JoinHandle` payload is `Result<(), ServerError>` so embedders
199    /// can observe leader-watch termination. Before returning an error, the
200    /// task publishes `ServingState::NotServing { leader_endpoint: None }`
201    /// so all subsequent RPCs fail fast with `FAILED_PRECONDITION` — even
202    /// embedders who never inspect the handle get fail-safe behavior.
203    ///
204    /// The `Server::serve()` method is a thin wrapper over this — it calls
205    /// `into_router`, builds a tonic `Server`, and binds a listener.
206    pub fn into_router(self) -> (Routes, tokio::task::JoinHandle<Result<(), ServerError>>) {
207        let server = Arc::new(self);
208
209        let watch_server = server.clone();
210        let watch_handle = tokio::spawn(async move {
211            use futures::FutureExt;
212            // catch_unwind so a panic in run_leader_watch still routes through
213            // the poisoning path. Without this, embedders who mount into_router
214            // directly and never observe the JoinHandle would see
215            // ServingState::Serving remain published while the watch task is
216            // dead — the inverse of the fail-safe guarantee documented above.
217            // The panic is re-raised after poisoning so serve / serve_with_*
218            // continue to translate it into ServerError::WatchPanic via
219            // join_to_server_result.
220            let outcome =
221                std::panic::AssertUnwindSafe(crate::fence::run_leader_watch(watch_server.clone()))
222                    .catch_unwind()
223                    .await;
224            match outcome {
225                Ok(result) => {
226                    if let Err(ref _e) = result {
227                        // Poison BEFORE returning so embedders who do not observe
228                        // the JoinHandle still get fail-safe behavior.
229                        watch_server.step_down_due_to_consensus_rejection(None);
230                        #[cfg(feature = "tracing")]
231                        tracing::error!(error = %_e, "leader-watch terminated; serving disabled");
232                    }
233                    result
234                }
235                Err(panic_payload) => {
236                    // Mirror the Err branch: poison BEFORE re-raising so
237                    // handle-dropping embedders still observe NotServing.
238                    watch_server.step_down_due_to_consensus_rejection(None);
239                    #[cfg(feature = "tracing")]
240                    tracing::error!("leader-watch panicked; serving disabled");
241                    std::panic::resume_unwind(panic_payload);
242                }
243            }
244        });
245
246        let service = TsoServiceImpl { server };
247        #[allow(unused_mut)]
248        let mut routes = Routes::new(TsoServiceServer::new(service));
249        #[cfg(feature = "reflection")]
250        {
251            #[expect(
252                clippy::expect_used,
253                reason = "`FILE_DESCRIPTOR_SET` is generated by `tsoracle-proto`'s `build.rs` from checked-in `.proto` sources; if it ever fails to decode, the build itself is broken. Tracked by #9."
254            )]
255            let reflection = tonic_reflection::server::Builder::configure()
256                .register_encoded_file_descriptor_set(tsoracle_proto::FILE_DESCRIPTOR_SET)
257                .build_v1()
258                .expect("FILE_DESCRIPTOR_SET emitted by build.rs is always valid");
259            routes = routes.add_service(reflection);
260        }
261        (routes, watch_handle)
262    }
263
264    pub async fn serve(self, addr: SocketAddr) -> Result<(), ServerError> {
265        self.serve_with_shutdown(addr, futures::future::pending())
266            .await
267    }
268
269    /// Run the gRPC server until either the caller's `shutdown` fires or the
270    /// leader-watch task terminates.
271    ///
272    /// Three outcomes:
273    /// 1. `shutdown` fires first → tonic drains in-flights and returns Ok.
274    ///    The watch handle is aborted; any error it had been about to return
275    ///    is forfeited (the process is shutting down anyway).
276    /// 2. Watch returns `Ok(Err(e))` → poisoned state is already published;
277    ///    `cancel_tx` triggers tonic's graceful shutdown; in-flight `GetTs`
278    ///    calls whose `try_grant` already succeeded complete with the
279    ///    timestamps they were allocated; new calls fail fast. Returns `Err(e)`.
280    /// 3. Watch task panics → returns `Err(ServerError::WatchPanic{..})`
281    ///    with the panic payload stringified. Same drain semantics as (2).
282    pub async fn serve_with_shutdown(
283        self,
284        addr: SocketAddr,
285        shutdown: impl Future<Output = ()> + Send + 'static,
286    ) -> Result<(), ServerError> {
287        #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
288        let tls_config = self.tls_config.clone();
289
290        let (routes, mut watch_handle) = self.into_router();
291        let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
292
293        // tonic stops when EITHER the user's shutdown fires OR we cancel
294        // because the watch task terminated.
295        let combined_shutdown = async move {
296            tokio::select! {
297                _ = shutdown => {}
298                _ = cancel_rx => {}
299            }
300        };
301
302        let mut tonic = TonicServer::builder();
303        #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
304        if let Some(cfg) = tls_config {
305            tonic = tonic.tls_config(cfg).map_err(ServerError::Transport)?;
306        }
307        let serve = tonic
308            .add_routes(routes)
309            .serve_with_shutdown(addr, combined_shutdown);
310        tokio::pin!(serve);
311
312        tokio::select! {
313            // Bias toward the watch arm: if both are ready in the same poll
314            // (rare but possible — graceful shutdown completed in the same
315            // tick the watch returned), we want to surface the watch error
316            // rather than report a clean shutdown.
317            biased;
318
319            watch_result = &mut watch_handle => {
320                // Watch terminated. State is already poisoned (see watch
321                // task body in into_router). Trigger tonic drain and wait
322                // for it to finish, then report the watch's outcome.
323                let _ = cancel_tx.send(());
324                let _ = serve.await;
325                join_to_server_result(watch_result)
326            }
327            serve_result = &mut serve => {
328                // User shutdown fired (or our cancel — but watch arm has
329                // `biased` priority, so reaching here means user shutdown).
330                // The watch task may still be running; aborting it loses
331                // any error it was about to report, but the process is
332                // shutting down so that's acceptable.
333                watch_handle.abort();
334                serve_result?;
335                Ok(())
336            }
337        }
338    }
339
340    /// Run the gRPC server on a caller-provided `TcpListener` until either
341    /// the caller-provided `shutdown` fires or the leader-watch task terminates.
342    ///
343    /// Use this instead of [`Self::serve_with_shutdown`] when you need to
344    /// observe the OS-picked port (`127.0.0.1:0`) before clients connect, or
345    /// when you want to wrap the listener in an outer adapter before passing it
346    /// in. The listening socket is owned by the caller and passed here; tsoracle
347    /// starts accepting on it immediately.
348    ///
349    /// Three outcomes:
350    /// 1. `shutdown` fires first → tonic drains in-flights and returns `Ok`.
351    ///    The watch handle is aborted; any error it had been about to return
352    ///    is forfeited (the process is shutting down anyway).
353    /// 2. Watch returns `Ok(Err(e))` → poisoned state is already published;
354    ///    the caller-provided shutdown is cancelled internally so tonic begins
355    ///    graceful shutdown; in-flight `GetTs` calls whose `try_grant` already
356    ///    succeeded complete with the timestamps they were allocated; new calls
357    ///    fail fast. Returns `Err(e)`.
358    /// 3. Watch task panics → returns `Err(ServerError::WatchPanic{..})`
359    ///    with the panic payload stringified. Same drain semantics as (2).
360    pub async fn serve_with_listener(
361        self,
362        listener: tokio::net::TcpListener,
363        shutdown: impl Future<Output = ()> + Send + 'static,
364    ) -> Result<(), ServerError> {
365        #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
366        let tls_config = self.tls_config.clone();
367
368        let (routes, mut watch_handle) = self.into_router();
369        let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
370
371        let combined_shutdown = async move {
372            tokio::select! {
373                _ = shutdown => {}
374                _ = cancel_rx => {}
375            }
376        };
377
378        let incoming = tonic::transport::server::TcpIncoming::from(listener);
379
380        let mut tonic = TonicServer::builder();
381        #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
382        if let Some(cfg) = tls_config {
383            tonic = tonic.tls_config(cfg).map_err(ServerError::Transport)?;
384        }
385        let serve = tonic
386            .add_routes(routes)
387            .serve_with_incoming_shutdown(incoming, combined_shutdown);
388        tokio::pin!(serve);
389
390        tokio::select! {
391            biased;
392
393            watch_result = &mut watch_handle => {
394                let _ = cancel_tx.send(());
395                let _ = serve.await;
396                join_to_server_result(watch_result)
397            }
398            serve_result = &mut serve => {
399                watch_handle.abort();
400                serve_result?;
401                Ok(())
402            }
403        }
404    }
405}
406
407/// Convert a `JoinHandle` result into a `ServerError`-typed result.
408///
409/// - `Ok(Ok(()))` — task ended cleanly (driver stream closed). Caller decides
410///   whether this is normal (shutdown) or anomalous.
411/// - `Ok(Err(e))` — task returned an error. Forward verbatim.
412/// - `Err(JoinError)` — task was cancelled or panicked. Cancellation maps to
413///   Ok (we asked for it); panic maps to `WatchPanic` with payload.
414fn join_to_server_result(
415    join_result: Result<Result<(), ServerError>, tokio::task::JoinError>,
416) -> Result<(), ServerError> {
417    match join_result {
418        Ok(inner) => inner,
419        Err(join_err) if join_err.is_panic() => {
420            let payload = panic_payload_to_string(join_err.into_panic());
421            Err(ServerError::WatchPanic { payload })
422        }
423        Err(_cancelled) => Ok(()),
424    }
425}
426
/// Render a panic payload as text. `&'static str` and `String` payloads are
/// returned verbatim; any other payload type yields a fixed placeholder.
fn panic_payload_to_string(panic: Box<dyn std::any::Any + Send>) -> String {
    panic
        .downcast_ref::<&'static str>()
        .map(|literal| (*literal).to_owned())
        .or_else(|| panic.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "watch task panicked with non-string payload".to_owned())
}
436
#[cfg(any(test, feature = "test-fakes"))]
impl Server {
    /// Runs the leader-watch loop directly, for tests only. Integration
    /// tests reach it through the `test-fakes` feature; it is not part of
    /// the stable public API.
    #[doc(hidden)]
    pub async fn run_leader_watch_for_tests(self: Arc<Self>) -> Result<(), ServerError> {
        let watch_loop = crate::fence::run_leader_watch(self);
        watch_loop.await
    }

    /// Allocator probe, for tests only. Requests a window grant of `count`
    /// against the current in-memory state, bypassing the gRPC service.
    /// Regression tests use it to observe the behavioral fence directly
    /// (no timestamp at or below the prior leader's high-water).
    #[doc(hidden)]
    pub fn try_grant_for_tests(&self, count: u32) -> Result<WindowGrant, CoreError> {
        // Take the allocator lock first and read the clock while holding it.
        let mut allocator = self.allocator.lock();
        let now_ms = self.clock.now_ms();
        allocator.try_grant(now_ms, count)
    }
}
455
#[cfg(test)]
mod tests {
    use super::*;

    /// Boxed panic payload in the shape `panic_payload_to_string` accepts.
    type Payload = Box<dyn std::any::Any + Send>;

    /// Handle type of the spawned tasks fed into `join_to_server_result`.
    type WatchHandle = tokio::task::JoinHandle<Result<(), ServerError>>;

    #[test]
    fn panic_payload_to_string_recovers_static_str() {
        // A `panic!("literal")` carries a `&'static str`; operators should
        // see exactly what the watch task said.
        let payload: Payload = Box::new("watch boom");
        let rendered = panic_payload_to_string(payload);
        assert_eq!(rendered, "watch boom");
    }

    #[test]
    fn panic_payload_to_string_recovers_owned_string() {
        // A `panic!("{var}")` carries a `String` formatted at panic time;
        // that branch must be downcast as well.
        let payload: Payload = Box::new(String::from("formatted"));
        let rendered = panic_payload_to_string(payload);
        assert_eq!(rendered, "formatted");
    }

    #[test]
    fn panic_payload_to_string_falls_back_for_other_types() {
        // Any custom payload type lands in the catch-all branch.
        struct Custom;
        let payload: Payload = Box::new(Custom);
        assert_eq!(
            panic_payload_to_string(payload),
            "watch task panicked with non-string payload",
        );
    }

    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
    #[test]
    fn builder_stores_tls_config() {
        use crate::test_fakes::InMemoryDriver;

        let server = Server::builder()
            .consensus_driver(Arc::new(InMemoryDriver::new()))
            .tls_config(tonic::transport::ServerTlsConfig::new())
            .build()
            .expect("build with tls_config must succeed");
        assert!(
            server.tls_config.is_some(),
            "tls_config must be stored on Server"
        );
    }

    #[tokio::test]
    async fn join_to_server_result_passes_through_clean_outcome() {
        // Ok(Ok(())): the task finished cleanly and is forwarded as-is.
        let handle: WatchHandle = tokio::spawn(async { Ok(()) });
        let outcome = join_to_server_result(handle.await);
        assert!(matches!(outcome, Ok(())));
    }

    #[tokio::test]
    async fn join_to_server_result_forwards_inner_error() {
        // Ok(Err(e)): the task's own error comes back unchanged.
        let handle: WatchHandle = tokio::spawn(async {
            Err(ServerError::WatchPanic {
                payload: "synthetic".into(),
            })
        });
        match join_to_server_result(handle.await) {
            Err(ServerError::WatchPanic { payload }) => assert_eq!(payload, "synthetic"),
            other => panic!("expected forwarded WatchPanic, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn join_to_server_result_translates_panic_to_watch_panic() {
        // Err(JoinError) with is_panic(): reported as WatchPanic, with the
        // payload rendered by `panic_payload_to_string`.
        let handle = tokio::spawn(async {
            panic!("intentional");
            #[allow(unreachable_code)]
            Ok::<(), ServerError>(())
        });
        match join_to_server_result(handle.await) {
            Err(ServerError::WatchPanic { payload }) => assert!(payload.contains("intentional")),
            other => panic!("expected WatchPanic, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn join_to_server_result_treats_cancellation_as_clean_exit() {
        // Err(JoinError) from an abort we requested ourselves maps to Ok.
        let handle: WatchHandle = tokio::spawn(async { futures::future::pending().await });
        handle.abort();
        let outcome = join_to_server_result(handle.await);
        assert!(matches!(outcome, Ok(())));
    }
}