Skip to main content

tsoracle_server/
server.rs

1use core::time::Duration;
2use parking_lot::Mutex;
3use std::future::Future;
4use std::net::SocketAddr;
5use std::sync::Arc;
6use tokio::sync::watch;
7use tonic::service::Routes;
8use tonic::transport::Server as TonicServer;
9use tsoracle_consensus::ConsensusDriver;
10use tsoracle_core::{Allocator, Clock, SystemClock};
11#[cfg(any(test, feature = "test-fakes"))]
12use tsoracle_core::{CoreError, WindowGrant};
13use tsoracle_proto::v1::tso_service_server::TsoServiceServer;
14
15use crate::service::TsoServiceImpl;
16
17#[derive(Debug, thiserror::Error)]
18pub enum BuildError {
19    #[error("consensus_driver is required")]
20    MissingConsensusDriver,
21    /// Surfaced when [`crate::leader_hint::KEY`] fails [`crate::leader_hint::validate_key`].
22    /// Today the key is a valid `const &'static str`, so this variant is
23    /// developer-error insurance: a future edit that breaks the key triggers
24    /// a startup failure rather than silently stripping the trailer from
25    /// every NOT_LEADER response.
26    #[error("invalid leader-hint metadata key: {0}")]
27    InvalidLeaderHintKey(#[from] tonic::metadata::errors::InvalidMetadataKey),
28}
29
30#[derive(Debug, thiserror::Error)]
31pub enum ServerError {
32    #[error("transport: {0}")]
33    Transport(#[from] tonic::transport::Error),
34    #[error("consensus: {0}")]
35    Consensus(#[from] tsoracle_consensus::ConsensusError),
36    #[error("core: {0}")]
37    Core(#[from] tsoracle_core::CoreError),
38    /// The leader-watch task panicked. Distinct from a clean error return so
39    /// operators can tell "driver returned Err" (recoverable design) from
40    /// "task panicked" (programming bug).
41    #[error("leader-watch task panicked: {payload}")]
42    WatchPanic { payload: String },
43}
44
/// Whether this node is currently serving requests. Published through the
/// `tokio::sync::watch` channel owned by [`Server`].
#[derive(Debug, Clone)]
pub enum ServingState {
    /// The node is not serving.
    ///
    /// `leader_endpoint` is `None` when no endpoint is known.
    /// NOTE(review): presumably the address clients should retry against —
    /// confirm with the code that reads this field.
    NotServing { leader_endpoint: Option<String> },
    /// The node is serving.
    Serving,
}
50
51pub struct ServerBuilder {
52    consensus: Option<Arc<dyn ConsensusDriver>>,
53    clock: Option<Arc<dyn Clock>>,
54    window_ahead: Duration,
55    failover_advance: Duration,
56}
57
58impl Default for ServerBuilder {
59    fn default() -> Self {
60        ServerBuilder {
61            consensus: None,
62            clock: None,
63            window_ahead: Duration::from_secs(3),
64            failover_advance: Duration::from_secs(1),
65        }
66    }
67}
68
69impl ServerBuilder {
70    pub fn consensus_driver(mut self, driver: Arc<dyn ConsensusDriver>) -> Self {
71        self.consensus = Some(driver);
72        self
73    }
74    pub fn clock(mut self, clock: Arc<dyn Clock>) -> Self {
75        self.clock = Some(clock);
76        self
77    }
78    pub fn window_ahead(mut self, window_ahead: Duration) -> Self {
79        self.window_ahead = window_ahead;
80        self
81    }
82    pub fn failover_advance(mut self, failover_advance: Duration) -> Self {
83        self.failover_advance = failover_advance;
84        self
85    }
86    pub fn build(self) -> Result<Server, BuildError> {
87        crate::leader_hint::validate_key()?;
88        let consensus = self.consensus.ok_or(BuildError::MissingConsensusDriver)?;
89        let clock = self.clock.unwrap_or_else(|| Arc::new(SystemClock));
90        let (state_tx, state_rx) = watch::channel(ServingState::NotServing {
91            leader_endpoint: None,
92        });
93        Ok(Server {
94            consensus,
95            clock,
96            window_ahead: self.window_ahead,
97            failover_advance: self.failover_advance,
98            allocator: Arc::new(Mutex::new(Allocator::new())),
99            state_tx,
100            state_rx,
101            extension_lock: Arc::new(tokio::sync::Mutex::new(())),
102            extension_gate: Arc::new(tokio::sync::RwLock::new(())),
103        })
104    }
105}
106
107pub struct Server {
108    pub(crate) consensus: Arc<dyn ConsensusDriver>,
109    pub(crate) clock: Arc<dyn Clock>,
110    pub(crate) window_ahead: Duration,
111    pub(crate) failover_advance: Duration,
112    pub(crate) allocator: Arc<Mutex<Allocator>>,
113    pub(crate) state_tx: watch::Sender<ServingState>,
114    pub state_rx: watch::Receiver<ServingState>,
115    /// Serializes window extensions so a stampeding burst of `WindowExhausted`
116    /// requests resolves to a single `persist_high_water` round-trip. Acquired
117    /// before `extension_gate`; combined with a recheck-after-acquire inside
118    /// `extend_window`, latecomers find the window already extended and
119    /// return without contacting consensus.
120    pub(crate) extension_lock: Arc<tokio::sync::Mutex<()>>,
121    /// Read-locked by window-extension calls for the duration of their
122    /// prepare → persist → commit dance. The leader-watch task takes a
123    /// write lock between clearing serving and calling load_high_water,
124    /// which drains all in-flight extensions started under the prior epoch
125    /// before the fence proceeds.
126    pub(crate) extension_gate: Arc<tokio::sync::RwLock<()>>,
127}
128
129impl Server {
130    pub fn builder() -> ServerBuilder {
131        ServerBuilder::default()
132    }
133
134    /// Single transition API used in response to evidence that the current
135    /// epoch is no longer valid: consensus rejection (NotLeader/Fenced
136    /// during persist), leader-watch task termination, or other authoritative
137    /// signals of leadership loss.
138    ///
139    /// Clears the allocator BEFORE publishing `NotServing` so a racing
140    /// `try_grant` either (a) observes `CoreError::NotLeader` rather than
141    /// handing out a stale-epoch grant, or (b) succeeds against a still-valid
142    /// epoch and the *next* call observes `NotServing`. Either ordering is
143    /// safe; what is not safe is the inverse (publish first, clear later)
144    /// because a request between those two steps would see `Serving` AND a
145    /// still-leader allocator.
146    ///
147    /// Does NOT take `extension_gate.write()`. That would deadlock against
148    /// in-flight extensions currently holding the read lock and awaiting
149    /// `persist_high_water`. Those in-flights will either complete cleanly
150    /// (the next `try_grant` then sees `NotServing`) or fail with
151    /// NotLeader/Fenced (and reach this helper themselves — it is idempotent).
152    pub(crate) fn step_down_due_to_consensus_rejection(&self, leader_endpoint: Option<String>) {
153        self.allocator.lock().on_leadership_lost();
154        let _ = self
155            .state_tx
156            .send(ServingState::NotServing { leader_endpoint });
157    }
158}
159
160impl Server {
161    /// Return the configured `TsoServiceServer<TsoServiceImpl>` as a tonic
162    /// `Routes` value plus a `JoinHandle` for the spawned leader-watch task,
163    /// so callers can mount tsoracle's service alongside their own services
164    /// on a shared tonic listener instead of binding a dedicated port.
165    ///
166    /// The `JoinHandle` payload is `Result<(), ServerError>` so embedders
167    /// can observe leader-watch termination. Before returning an error, the
168    /// task publishes `ServingState::NotServing { leader_endpoint: None }`
169    /// so all subsequent RPCs fail fast with `FAILED_PRECONDITION` — even
170    /// embedders who never inspect the handle get fail-safe behavior.
171    ///
172    /// The `Server::serve()` method is a thin wrapper over this — it calls
173    /// `into_router`, builds a tonic `Server`, and binds a listener.
174    pub fn into_router(self) -> (Routes, tokio::task::JoinHandle<Result<(), ServerError>>) {
175        let server = Arc::new(self);
176
177        let watch_server = server.clone();
178        let watch_handle = tokio::spawn(async move {
179            use futures::FutureExt;
180            // catch_unwind so a panic in run_leader_watch still routes through
181            // the poisoning path. Without this, embedders who mount into_router
182            // directly and never observe the JoinHandle would see
183            // ServingState::Serving remain published while the watch task is
184            // dead — the inverse of the fail-safe guarantee documented above.
185            // The panic is re-raised after poisoning so serve / serve_with_*
186            // continue to translate it into ServerError::WatchPanic via
187            // join_to_server_result.
188            let outcome =
189                std::panic::AssertUnwindSafe(crate::fence::run_leader_watch(watch_server.clone()))
190                    .catch_unwind()
191                    .await;
192            match outcome {
193                Ok(result) => {
194                    if let Err(ref _e) = result {
195                        // Poison BEFORE returning so embedders who do not observe
196                        // the JoinHandle still get fail-safe behavior.
197                        watch_server.step_down_due_to_consensus_rejection(None);
198                        #[cfg(feature = "tracing")]
199                        tracing::error!(error = %_e, "leader-watch terminated; serving disabled");
200                    }
201                    result
202                }
203                Err(panic_payload) => {
204                    // Mirror the Err branch: poison BEFORE re-raising so
205                    // handle-dropping embedders still observe NotServing.
206                    watch_server.step_down_due_to_consensus_rejection(None);
207                    #[cfg(feature = "tracing")]
208                    tracing::error!("leader-watch panicked; serving disabled");
209                    std::panic::resume_unwind(panic_payload);
210                }
211            }
212        });
213
214        let service = TsoServiceImpl { server };
215        #[allow(unused_mut)]
216        let mut routes = Routes::new(TsoServiceServer::new(service));
217        #[cfg(feature = "reflection")]
218        {
219            #[expect(
220                clippy::expect_used,
221                reason = "`FILE_DESCRIPTOR_SET` is generated by `tsoracle-proto`'s `build.rs` from checked-in `.proto` sources; if it ever fails to decode, the build itself is broken. Tracked by #9."
222            )]
223            let reflection = tonic_reflection::server::Builder::configure()
224                .register_encoded_file_descriptor_set(tsoracle_proto::FILE_DESCRIPTOR_SET)
225                .build_v1()
226                .expect("FILE_DESCRIPTOR_SET emitted by build.rs is always valid");
227            routes = routes.add_service(reflection);
228        }
229        (routes, watch_handle)
230    }
231
232    pub async fn serve(self, addr: SocketAddr) -> Result<(), ServerError> {
233        self.serve_with_shutdown(addr, futures::future::pending())
234            .await
235    }
236
237    /// Run the gRPC server until either the caller's `shutdown` fires or the
238    /// leader-watch task terminates.
239    ///
240    /// Three outcomes:
241    /// 1. `shutdown` fires first → tonic drains in-flights and returns Ok.
242    ///    The watch handle is aborted; any error it had been about to return
243    ///    is forfeited (the process is shutting down anyway).
244    /// 2. Watch returns `Ok(Err(e))` → poisoned state is already published;
245    ///    `cancel_tx` triggers tonic's graceful shutdown; in-flight `GetTs`
246    ///    calls whose `try_grant` already succeeded complete with the
247    ///    timestamps they were allocated; new calls fail fast. Returns `Err(e)`.
248    /// 3. Watch task panics → returns `Err(ServerError::WatchPanic{..})`
249    ///    with the panic payload stringified. Same drain semantics as (2).
250    pub async fn serve_with_shutdown(
251        self,
252        addr: SocketAddr,
253        shutdown: impl Future<Output = ()> + Send + 'static,
254    ) -> Result<(), ServerError> {
255        let (routes, mut watch_handle) = self.into_router();
256        let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
257
258        // tonic stops when EITHER the user's shutdown fires OR we cancel
259        // because the watch task terminated.
260        let combined_shutdown = async move {
261            tokio::select! {
262                _ = shutdown => {}
263                _ = cancel_rx => {}
264            }
265        };
266
267        let serve = TonicServer::builder()
268            .add_routes(routes)
269            .serve_with_shutdown(addr, combined_shutdown);
270        tokio::pin!(serve);
271
272        tokio::select! {
273            // Bias toward the watch arm: if both are ready in the same poll
274            // (rare but possible — graceful shutdown completed in the same
275            // tick the watch returned), we want to surface the watch error
276            // rather than report a clean shutdown.
277            biased;
278
279            watch_result = &mut watch_handle => {
280                // Watch terminated. State is already poisoned (see watch
281                // task body in into_router). Trigger tonic drain and wait
282                // for it to finish, then report the watch's outcome.
283                let _ = cancel_tx.send(());
284                let _ = serve.await;
285                join_to_server_result(watch_result)
286            }
287            serve_result = &mut serve => {
288                // User shutdown fired (or our cancel — but watch arm has
289                // `biased` priority, so reaching here means user shutdown).
290                // The watch task may still be running; aborting it loses
291                // any error it was about to report, but the process is
292                // shutting down so that's acceptable.
293                watch_handle.abort();
294                serve_result?;
295                Ok(())
296            }
297        }
298    }
299
300    /// Run the gRPC server on a caller-provided `TcpListener` until either
301    /// the caller-provided `shutdown` fires or the leader-watch task terminates.
302    ///
303    /// Use this instead of [`Self::serve_with_shutdown`] when you need to
304    /// observe the OS-picked port (`127.0.0.1:0`) before clients connect, or
305    /// when you want to wrap the listener in an outer adapter before passing it
306    /// in. The listening socket is owned by the caller and passed here; tsoracle
307    /// starts accepting on it immediately.
308    ///
309    /// Three outcomes:
310    /// 1. `shutdown` fires first → tonic drains in-flights and returns `Ok`.
311    ///    The watch handle is aborted; any error it had been about to return
312    ///    is forfeited (the process is shutting down anyway).
313    /// 2. Watch returns `Ok(Err(e))` → poisoned state is already published;
314    ///    the caller-provided shutdown is cancelled internally so tonic begins
315    ///    graceful shutdown; in-flight `GetTs` calls whose `try_grant` already
316    ///    succeeded complete with the timestamps they were allocated; new calls
317    ///    fail fast. Returns `Err(e)`.
318    /// 3. Watch task panics → returns `Err(ServerError::WatchPanic{..})`
319    ///    with the panic payload stringified. Same drain semantics as (2).
320    pub async fn serve_with_listener(
321        self,
322        listener: tokio::net::TcpListener,
323        shutdown: impl Future<Output = ()> + Send + 'static,
324    ) -> Result<(), ServerError> {
325        let (routes, mut watch_handle) = self.into_router();
326        let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
327
328        let combined_shutdown = async move {
329            tokio::select! {
330                _ = shutdown => {}
331                _ = cancel_rx => {}
332            }
333        };
334
335        let incoming = tonic::transport::server::TcpIncoming::from(listener);
336
337        let serve = TonicServer::builder()
338            .add_routes(routes)
339            .serve_with_incoming_shutdown(incoming, combined_shutdown);
340        tokio::pin!(serve);
341
342        tokio::select! {
343            biased;
344
345            watch_result = &mut watch_handle => {
346                let _ = cancel_tx.send(());
347                let _ = serve.await;
348                join_to_server_result(watch_result)
349            }
350            serve_result = &mut serve => {
351                watch_handle.abort();
352                serve_result?;
353                Ok(())
354            }
355        }
356    }
357}
358
359/// Convert a `JoinHandle` result into a `ServerError`-typed result.
360///
361/// - `Ok(Ok(()))` — task ended cleanly (driver stream closed). Caller decides
362///   whether this is normal (shutdown) or anomalous.
363/// - `Ok(Err(e))` — task returned an error. Forward verbatim.
364/// - `Err(JoinError)` — task was cancelled or panicked. Cancellation maps to
365///   Ok (we asked for it); panic maps to `WatchPanic` with payload.
366fn join_to_server_result(
367    join_result: Result<Result<(), ServerError>, tokio::task::JoinError>,
368) -> Result<(), ServerError> {
369    match join_result {
370        Ok(inner) => inner,
371        Err(join_err) if join_err.is_panic() => {
372            let payload = panic_payload_to_string(join_err.into_panic());
373            Err(ServerError::WatchPanic { payload })
374        }
375        Err(_cancelled) => Ok(()),
376    }
377}
378
/// Renders a panic payload as text.
///
/// Payloads of type `&'static str` or `String` (what `panic!` produces for
/// literal and formatted messages) are returned as-is. Any other payload
/// type yields a fixed placeholder message.
fn panic_payload_to_string(panic: Box<dyn std::any::Any + Send>) -> String {
    panic
        .downcast_ref::<&'static str>()
        .map(|literal| (*literal).to_string())
        .or_else(|| panic.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "watch task panicked with non-string payload".to_string())
}
388
#[cfg(any(test, feature = "test-fakes"))]
impl Server {
    /// Test-only way to drive the leader-watch loop directly. Integration
    /// tests reach it through the `test-fakes` feature; it is not part of
    /// the stable public API.
    #[doc(hidden)]
    pub async fn run_leader_watch_for_tests(self: Arc<Self>) -> Result<(), ServerError> {
        use crate::fence::run_leader_watch;

        run_leader_watch(self).await
    }

    /// Test-only allocator probe. Requests a grant of `count` from the
    /// current in-memory allocator state, bypassing the gRPC service.
    /// Regression tests use it to observe the behavioral fence directly: no
    /// timestamp at or below the prior leader's high-water may be issued.
    #[doc(hidden)]
    pub fn try_grant_for_tests(&self, count: u32) -> Result<WindowGrant, CoreError> {
        let allocator = &self.allocator;
        // The lock is acquired first and the clock is read while it is held.
        allocator.lock().try_grant(self.clock.now_ms(), count)
    }
}