Skip to main content

tsoracle_server/
server.rs

1//
2//  ░▀█▀░█▀▀░█▀█░█▀▄░█▀█░█▀▀░█░░░█▀▀
3//  ░░█░░▀▀█░█░█░█▀▄░█▀█░█░░░█░░░█▀▀
4//  ░░▀░░▀▀▀░▀▀▀░▀░▀░▀░▀░▀▀▀░▀▀▀░▀▀▀
5//
6//  tsoracle — Distributed Timestamp Oracle
7//  https://www.tsoracle.rs
8//
9//  Copyright (c) 2026 Prisma Risk
10//
11//  Licensed under the Apache License, Version 2.0 (the "License");
12//  you may not use this file except in compliance with the License.
13//  You may obtain a copy of the License at
14//
15//      https://www.apache.org/licenses/LICENSE-2.0
16//
17//  Unless required by applicable law or agreed to in writing, software
18//  distributed under the License is distributed on an "AS IS" BASIS,
19//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20//  See the License for the specific language governing permissions and
21//  limitations under the License.
22//
23
24use core::time::Duration;
25use std::future::Future;
26use std::net::SocketAddr;
27use std::sync::Arc;
28use tokio::sync::watch;
29use tonic::service::Routes;
30use tonic::transport::Server as TonicServer;
31use tsoracle_consensus::ConsensusDriver;
32#[cfg(any(test, feature = "test-fakes"))]
33use tsoracle_core::{CoreError, WindowGrant};
34use tsoracle_core::{Epoch, PeerEndpoint};
35use tsoracle_proto::v1::tso_service_server::TsoServiceServer;
36
37use crate::bt::Bt;
38use crate::clock::{Clock, SystemClock};
39use crate::service::TsoServiceImpl;
40use crate::serving_core::ServingCore;
41
/// Errors returned by [`ServerBuilder::build`].
#[derive(Debug, thiserror::Error)]
pub enum BuildError {
    /// [`ServerBuilder::consensus_driver`] was never called; `build` refuses
    /// to produce a [`Server`] without a consensus driver.
    #[error("consensus_driver is required")]
    MissingConsensusDriver,
}
47
/// Errors returned by [`Server::into_router`], the `serve_*` methods,
/// [`WatchGuard::shutdown`], and the leader-watch task itself.
#[derive(Debug, thiserror::Error)]
pub enum ServerError {
    /// A tonic transport error — for example a TLS configuration rejected by
    /// the tonic builder inside the `serve_*` methods.
    #[error("transport: {0}")]
    Transport(#[from] tonic::transport::Error),
    /// An error reported by the consensus driver.
    #[error("consensus: {0}")]
    Consensus(#[from] tsoracle_consensus::ConsensusError),
    /// An error reported by `tsoracle_core`.
    #[error("core: {0}")]
    Core(#[from] tsoracle_core::CoreError),
    /// The leader-watch task panicked. Distinct from a clean error return so
    /// operators can tell "driver returned Err" (recoverable design) from
    /// "task panicked" (programming bug).
    #[error("leader-watch task panicked: {payload}{bt}")]
    WatchPanic { payload: String, bt: Bt },
    /// The consensus driver's `leadership_events()` stream ended cleanly while
    /// the leader-watch task was running. The stream is contracted to live for
    /// the life of the server, so its end is anomalous (driver shutdown, lost
    /// session, etc.) — distinct from a `Consensus` error returned mid-fence.
    /// The watch task publishes `ServingState::NotServing` before returning
    /// this variant so embedders who never observe the [`WatchGuard`] still get
    /// the documented fail-safe behavior.
    #[error("consensus driver leadership stream closed")]
    WatchStreamClosed,
    /// The embedded protobuf descriptor set failed to decode while building the
    /// gRPC reflection service. `tsoracle-proto`'s `build.rs` emits these bytes
    /// from checked-in `.proto` sources, so a failure here signals build-artifact
    /// drift (a corrupt or stale descriptor) rather than a runtime condition —
    /// surfaced as a diagnosable startup error instead of a process panic.
    #[cfg(feature = "reflection")]
    #[error("failed to build gRPC reflection service from embedded descriptor set: {0}")]
    ReflectionInit(#[source] tonic_reflection::server::Error),
}
79
/// Serving state of this node, published on a `watch` channel and observed
/// through [`Server::subscribe`].
#[derive(Clone, Debug)]
pub enum ServingState {
    /// This node is not serving; per the [`Server::into_router`] docs, RPCs
    /// fail fast with `FAILED_PRECONDITION` in this state. When the
    /// leader-watch task terminates (error, panic, stream EOF, cancellation)
    /// this is published with `leader_endpoint: None`.
    NotServing {
        /// Presumably the endpoint of the currently known leader, `None` when
        /// unknown. NOTE(review): the non-`None` publisher lives outside this
        /// file — confirm.
        leader_endpoint: Option<PeerEndpoint>,
        /// Presumably the epoch of the currently known leader, `None` when
        /// unknown. NOTE(review): confirm against the publisher outside this
        /// file.
        leader_epoch: Option<Epoch>,
    },
    /// This node is serving RPCs.
    Serving,
}
88
/// Out-of-the-box value for [`ServerBuilder::shutdown_grace`]: ten seconds.
///
/// A graceful shutdown first asks the leader-watch task to stop on its own and
/// only aborts it once this budget runs out. The abort exists purely as a
/// safety net for a consensus driver stuck inside `load_high_water` /
/// `persist_high_water` — [`ConsensusDriver`] promises no latency bound for
/// those calls. Ten seconds leaves room under Kubernetes' default 30s
/// `terminationGracePeriodSeconds` for the abort, the tonic drain, and process
/// exit to finish before the kubelet sends SIGKILL.
const DEFAULT_SHUTDOWN_GRACE: Duration = Duration::from_millis(10_000);
97
/// Builder for [`Server`]; obtain one with [`Server::builder`] and finish with
/// [`ServerBuilder::build`]. Only the consensus driver is mandatory.
pub struct ServerBuilder {
    /// Required; `build` fails with [`BuildError::MissingConsensusDriver`]
    /// while this is `None`.
    consensus: Option<Arc<dyn ConsensusDriver>>,
    /// Optional; `build` falls back to `SystemClock` when `None`.
    clock: Option<Arc<dyn Clock>>,
    /// Defaults to 3s; handed to `ServingCore::new` by `build`.
    window_ahead: Duration,
    /// Defaults to 1s.
    failover_advance: Duration,
    /// Defaults to [`DEFAULT_SHUTDOWN_GRACE`] (10s).
    shutdown_grace: Duration,
    /// Defaults to 10s; `Duration::ZERO` disables the heartbeat task.
    heartbeat_interval: Duration,
    /// Defaults to `None` (plaintext).
    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
    tls_config: Option<tonic::transport::ServerTlsConfig>,
}
108
109impl Default for ServerBuilder {
110    fn default() -> Self {
111        ServerBuilder {
112            consensus: None,
113            clock: None,
114            window_ahead: Duration::from_secs(3),
115            failover_advance: Duration::from_secs(1),
116            shutdown_grace: DEFAULT_SHUTDOWN_GRACE,
117            heartbeat_interval: Duration::from_secs(10),
118            #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
119            tls_config: None,
120        }
121    }
122}
123
124impl ServerBuilder {
125    pub fn consensus_driver(mut self, driver: Arc<dyn ConsensusDriver>) -> Self {
126        self.consensus = Some(driver);
127        self
128    }
129    pub fn clock(mut self, clock: Arc<dyn Clock>) -> Self {
130        self.clock = Some(clock);
131        self
132    }
133    pub fn window_ahead(mut self, window_ahead: Duration) -> Self {
134        self.window_ahead = window_ahead;
135        self
136    }
137    pub fn failover_advance(mut self, failover_advance: Duration) -> Self {
138        self.failover_advance = failover_advance;
139        self
140    }
141
142    /// Bound on how long a graceful shutdown waits for the leader-watch task to
143    /// stop cooperatively before forcibly aborting it.
144    ///
145    /// On shutdown the server drops the watch task's cancel signal and waits for
146    /// it to publish `NotServing` and return. That wait is normally
147    /// near-instant, but the task observes cancellation only at its `select!`
148    /// boundaries — never inside a fence attempt. A consensus driver whose
149    /// `load_high_water` / `persist_high_water` never returns (the trait places
150    /// no latency bound) would otherwise park the task mid-fence and block
151    /// process exit indefinitely, leading to a SIGKILL on a Kubernetes drain.
152    /// Once `shutdown_grace` elapses the server aborts the task so exit always
153    /// makes progress. Set this comfortably below your deployment's
154    /// `terminationGracePeriodSeconds`. Defaults to 10s. A value of zero aborts
155    /// immediately without waiting for a cooperative stop.
156    pub fn shutdown_grace(mut self, shutdown_grace: Duration) -> Self {
157        self.shutdown_grace = shutdown_grace;
158        self
159    }
160
161    /// Interval between heartbeat log lines emitted at `target = "tsoracle::heartbeat"`.
162    /// Defaults to 10 seconds. Pass `Duration::ZERO` to disable the heartbeat task entirely.
163    ///
164    /// The heartbeat surfaces serving role, current epoch, requests served,
165    /// timestamps issued, and key error counters every interval — proof-of-life
166    /// for production deployments that may not have a metrics exporter installed.
167    ///
168    /// Requires `feature = "tracing"` to emit output; with `tracing` off the
169    /// setter is accepted but no task is spawned (no subscriber to log to).
170    pub fn heartbeat_interval(mut self, interval: Duration) -> Self {
171        self.heartbeat_interval = interval;
172        self
173    }
174
175    /// Configure TLS termination for this server. Applied inside
176    /// [`Server::serve`], [`Server::serve_with_shutdown`], and
177    /// [`Server::serve_with_listener`]. Not applied to [`Server::into_router`] —
178    /// embedders mounting tsoracle alongside their own services control TLS
179    /// on their own tonic builder.
180    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
181    pub fn tls_config(mut self, cfg: tonic::transport::ServerTlsConfig) -> Self {
182        self.tls_config = Some(cfg);
183        self
184    }
185
186    pub fn build(self) -> Result<Server, BuildError> {
187        let consensus = self.consensus.ok_or(BuildError::MissingConsensusDriver)?;
188        let clock = self.clock.unwrap_or_else(|| Arc::new(SystemClock));
189        Ok(Server {
190            consensus,
191            clock,
192            window_ahead: self.window_ahead,
193            failover_advance: self.failover_advance,
194            shutdown_grace: self.shutdown_grace,
195            heartbeat_interval: self.heartbeat_interval,
196            core: Arc::new(ServingCore::new(self.window_ahead)),
197            reporter: Arc::new(crate::reporter::Reporter::new()),
198            #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
199            tls_config: self.tls_config,
200        })
201    }
202}
203
/// A configured tsoracle server, produced by [`ServerBuilder::build`].
///
/// Run it standalone with [`Server::serve`], [`Server::serve_with_shutdown`],
/// or [`Server::serve_with_listener`], or mount it on an existing tonic
/// listener with [`Server::into_router`].
pub struct Server {
    /// Consensus driver supplied via [`ServerBuilder::consensus_driver`].
    pub(crate) consensus: Arc<dyn ConsensusDriver>,
    /// Clock supplied via [`ServerBuilder::clock`]; `SystemClock` when unset.
    pub(crate) clock: Arc<dyn Clock>,
    /// Copied from [`ServerBuilder::window_ahead`]; `build` also passes the
    /// same value to `ServingCore::new` when creating `core`.
    pub(crate) window_ahead: Duration,
    /// Copied from [`ServerBuilder::failover_advance`]. NOTE(review): its
    /// readers live outside this file (presumably the fence logic) — confirm.
    pub(crate) failover_advance: Duration,
    /// Bound on the graceful-shutdown wait for the leader-watch task before a
    /// forced abort. See [`ServerBuilder::shutdown_grace`].
    pub(crate) shutdown_grace: Duration,
    /// Interval between periodic heartbeat log lines. See [`ServerBuilder::heartbeat_interval`].
    ///
    /// The only reader is the `cfg(feature = "tracing")` spawn block in
    /// `into_router_parts`; without `tracing` there is no subscriber to log to
    /// and the spawn arm is compiled out, so the field is genuinely unread.
    #[cfg_attr(not(feature = "tracing"), allow(dead_code))]
    pub(crate) heartbeat_interval: Duration,
    /// Owns the allocator, serving-state channel, and both extension locks, with
    /// the lock-ordering and step-down invariants private behind its methods.
    ///
    /// Held behind an `Arc` so the leader-watch task, the gRPC service, and the
    /// [`WatchGuard`] / [`serve_inner`] shutdown paths can all reach the same
    /// core. The guard and the serve loop use their clone to close the serving
    /// gate *synchronously* at shutdown, leaving the watch task's later
    /// `step_down` a harmless idempotent repeat.
    pub(crate) core: Arc<ServingCore>,
    /// Metrics reporter shared with the heartbeat task and with the shutdown
    /// paths, which record `shutdown_watch_aborted` when the grace-bounded
    /// reap fires.
    pub(crate) reporter: Arc<crate::reporter::Reporter>,
    /// TLS settings from [`ServerBuilder::tls_config`]; applied by the
    /// `serve_*` methods, not by [`Server::into_router`].
    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
    pub(crate) tls_config: Option<tonic::transport::ServerTlsConfig>,
}
232
/// Raw parts produced by [`Server::into_router_parts`]: the gRPC `Routes`, the
/// leader-watch task's cooperative-cancel sender (dropping it stops the task),
/// the task's join handle, and the optional heartbeat task's cancel sender /
/// join handle. [`Server::into_router`] wraps these into a [`WatchGuard`]; the
/// `serve_*` methods consume them directly via [`serve_inner`].
///
/// The heartbeat fields are `None` when the heartbeat is disabled — either by
/// `ServerBuilder::heartbeat_interval(Duration::ZERO)` or by building without
/// the `tracing` feature (there is no subscriber to log to).
pub(crate) struct RouterParts {
    /// The tsoracle gRPC service, plus reflection under `feature = "reflection"`.
    pub routes: Routes,
    /// Sending on, or dropping, this sender cancels the leader-watch task.
    pub cancel_tx: tokio::sync::oneshot::Sender<()>,
    /// Join handle of the leader-watch task; yields the task's final result.
    pub watch_handle: tokio::task::JoinHandle<Result<(), ServerError>>,
    /// Sending on, or dropping, this sender stops the heartbeat task.
    pub heartbeat_cancel_tx: Option<tokio::sync::oneshot::Sender<()>>,
    /// Join handle of the heartbeat task; its panics are caught in-task.
    pub heartbeat_handle: Option<tokio::task::JoinHandle<()>>,
}
249
250impl Server {
251    pub fn builder() -> ServerBuilder {
252        ServerBuilder::default()
253    }
254
255    /// Subscribe to serving-state transitions.
256    ///
257    /// Returns a fresh `watch::Receiver` observing the same `ServingState`
258    /// the server publishes as leadership comes and goes. Embedders use this
259    /// to gate their own startup on `ServingState::Serving` (see the
260    /// `embedded_router` and piggyback examples). Because `into_router`
261    /// consumes the `Server`, capture the receiver before mounting.
262    ///
263    /// This method is the stable observation API: the receiver is minted from
264    /// the server's `watch::Sender`, so the receiver's type can evolve (e.g. a
265    /// future newtype around `ServingState`) without breaking embedders that
266    /// go through it.
267    pub fn subscribe(&self) -> watch::Receiver<ServingState> {
268        self.core.subscribe()
269    }
270}
271
272impl Server {
273    /// Return the configured `TsoServiceServer<TsoServiceImpl>` as a tonic
274    /// `Routes` value plus a [`WatchGuard`] for the spawned leader-watch task,
275    /// so callers can mount tsoracle's service alongside their own services
276    /// on a shared tonic listener instead of binding a dedicated port.
277    ///
278    /// The returned [`WatchGuard`] owns the leader-watch task. **Keep it alive
279    /// for as long as the mounted `Routes` should serve**: the watch task holds
280    /// an `Arc<Server>` (and the consensus driver) and maintains serving state
281    /// across leadership transitions. Dropping the guard — or calling
282    /// [`WatchGuard::shutdown`] — cooperatively stops the task at the embedder's
283    /// own shutdown. Without the guard the task would keep `Arc<Server>` alive
284    /// until the leadership stream happened to close.
285    ///
286    /// Every termination of the task — cooperative cancellation, driver error,
287    /// panic, or clean EOF on the leadership stream (surfaced as
288    /// `ServerError::WatchStreamClosed`) — publishes
289    /// `ServingState::NotServing { leader_endpoint: None }` before returning, so
290    /// all subsequent RPCs fail fast with `FAILED_PRECONDITION`. Embedders who
291    /// drop the guard without awaiting still get fail-safe behavior.
292    ///
293    /// The `Server::serve()` method is a thin wrapper over this — it calls
294    /// `into_router`, builds a tonic `Server`, and binds a listener.
295    ///
296    /// Returns `Err(ServerError::ReflectionInit)` (only reachable under the
297    /// `reflection` feature) if the embedded descriptor set fails to decode.
298    /// That decode happens before the leader-watch task is spawned, so a failure
299    /// leaves nothing running to clean up.
300    pub fn into_router(self) -> Result<(Routes, WatchGuard), ServerError> {
301        // Read the `Copy` grace before `into_router_parts` consumes `self`, so
302        // the returned guard can bound its own shutdown wait identically to the
303        // `serve_*` paths.
304        let shutdown_grace = self.shutdown_grace;
305        // Clone the shared core and reporter before `into_router_parts` consumes
306        // `self`, so the guard can close the serving gate synchronously on drop /
307        // shutdown rather than relying on the watch task's later publish, and can
308        // record the shutdown_watch_aborted counter if the grace-bounded reap fires.
309        let core = self.core.clone();
310        let reporter = self.reporter.clone();
311        let parts = self.into_router_parts()?;
312        Ok((
313            parts.routes,
314            WatchGuard {
315                cancel_tx: Some(parts.cancel_tx),
316                handle: Some(parts.watch_handle),
317                shutdown_grace,
318                core,
319                reporter,
320                heartbeat_cancel_tx: parts.heartbeat_cancel_tx,
321                heartbeat_handle: parts.heartbeat_handle,
322            },
323        ))
324    }
325
326    /// Spawn the leader-watch task and assemble the gRPC `Routes`, returning
327    /// the raw parts: the routes, the task's cooperative-cancel sender, and its
328    /// `JoinHandle`. [`Self::into_router`] wraps these into a [`WatchGuard`] for
329    /// embedders; the `serve_*` methods drive the parts directly via
330    /// [`serve_inner`], so neither path needs to unwrap the guard's `Option`
331    /// fields.
332    fn into_router_parts(self) -> Result<RouterParts, ServerError> {
333        // Build the reflection service first: a descriptor-decode failure must
334        // surface before we spawn the leader-watch task below, so an error path
335        // never leaks a running task.
336        #[cfg(feature = "reflection")]
337        let reflection = build_reflection_service(tsoracle_proto::FILE_DESCRIPTOR_SET)?;
338
339        let server = Arc::new(self);
340
341        // Cooperative cancellation channel. The `WatchGuard` holds the sender;
342        // the task's `cancel` future resolves on either an explicit send or a
343        // sender drop, so dropping the guard is sufficient to stop the task.
344        let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
345
346        let watch_server = server.clone();
347        let watch_handle = tokio::spawn(async move {
348            use futures::FutureExt;
349            // Resolves when the WatchGuard signals cancellation or is dropped.
350            let cancel = async move {
351                let _ = cancel_rx.await;
352            };
353            // catch_unwind so a panic in run_leader_watch still routes through
354            // the poisoning path. Without this, embedders who mount into_router
355            // directly and never observe the guard would see
356            // ServingState::Serving remain published while the watch task is
357            // dead — the inverse of the fail-safe guarantee documented above.
358            // The panic is re-raised after poisoning so serve / serve_with_*
359            // continue to translate it into ServerError::WatchPanic via
360            // join_to_server_result.
361            let outcome = std::panic::AssertUnwindSafe(crate::fence::run_leader_watch(
362                watch_server.clone(),
363                cancel,
364            ))
365            .catch_unwind()
366            .await;
367            match outcome {
368                Ok(result) => {
369                    if let Err(ref _e) = result {
370                        // Poison BEFORE returning so embedders who do not observe
371                        // the guard still get fail-safe behavior.
372                        watch_server.core.step_down(None, None);
373                        #[cfg(feature = "tracing")]
374                        tracing::error!(error = %_e, "leader-watch terminated; serving disabled");
375                    }
376                    result
377                }
378                Err(panic_payload) => {
379                    // Mirror the Err branch: poison BEFORE re-raising so
380                    // guard-dropping embedders still observe NotServing.
381                    watch_server.core.step_down(None, None);
382                    #[cfg(feature = "tracing")]
383                    tracing::error!("leader-watch panicked; serving disabled");
384                    std::panic::resume_unwind(panic_payload);
385                }
386            }
387        });
388
389        // Spawn the heartbeat task, if enabled. Gated on `feature = "tracing"`
390        // because the heartbeat module is only compiled with `tracing`
391        // (no subscriber to emit to without it) — and on a non-zero interval,
392        // since `Duration::ZERO` is the documented opt-out.
393        //
394        // The task body is wrapped in `AssertUnwindSafe(...).catch_unwind()`
395        // mirroring the leader-watch spawn above: on panic we bump the
396        // `heartbeat_task_panicked` counter and log at error level, then let
397        // the task end (no restart — the heartbeat is observability, not
398        // correctness, so a panicked task must not be allowed to thrash).
399        let (heartbeat_cancel_tx, heartbeat_handle) = {
400            #[cfg(feature = "tracing")]
401            {
402                if server.heartbeat_interval.is_zero() {
403                    (None, None)
404                } else {
405                    use futures::FutureExt;
406                    let (htx, hrx) = tokio::sync::oneshot::channel::<()>();
407                    let hb_reporter = server.reporter.clone();
408                    let hb_core = server.core.clone();
409                    let hb_interval = server.heartbeat_interval;
410                    let handle = tokio::spawn(async move {
411                        let outcome =
412                            std::panic::AssertUnwindSafe(crate::heartbeat::run_heartbeat(
413                                hb_interval,
414                                hb_core,
415                                hb_reporter.clone(),
416                                hrx,
417                            ))
418                            .catch_unwind()
419                            .await;
420                        if outcome.is_err() {
421                            hb_reporter.heartbeat_task_panicked.increment(1);
422                            tracing::error!(
423                                target: "tsoracle::heartbeat",
424                                "heartbeat task panicked; liveness logs disabled until restart"
425                            );
426                        }
427                    });
428                    (Some(htx), Some(handle))
429                }
430            }
431            #[cfg(not(feature = "tracing"))]
432            {
433                (None, None)
434            }
435        };
436
437        let service = TsoServiceImpl { server };
438        #[allow(unused_mut)]
439        let mut routes = Routes::new(TsoServiceServer::new(service));
440        #[cfg(feature = "reflection")]
441        {
442            routes = routes.add_service(reflection);
443        }
444        Ok(RouterParts {
445            routes,
446            cancel_tx,
447            watch_handle,
448            heartbeat_cancel_tx,
449            heartbeat_handle,
450        })
451    }
452
453    pub async fn serve(self, addr: SocketAddr) -> Result<(), ServerError> {
454        self.serve_with_shutdown(addr, futures::future::pending())
455            .await
456    }
457
458    /// Run the gRPC server until either the caller's `shutdown` fires or the
459    /// leader-watch task terminates.
460    ///
461    /// Three outcomes:
462    /// 1. `shutdown` fires first → tonic drains in-flights and returns Ok.
463    ///    The watch task is then stopped cooperatively, bounded by
464    ///    `shutdown_grace` and forcibly aborted if it overruns (e.g. parked in a
465    ///    wedged consensus-driver call); any error it had been about to return
466    ///    is forfeited (the process is shutting down anyway).
467    /// 2. Watch returns `Ok(Err(e))` → poisoned state is already published;
468    ///    `cancel_tx` triggers tonic's graceful shutdown; in-flight `GetTs`
469    ///    calls whose `try_grant` already succeeded complete with the
470    ///    timestamps they were allocated; new calls fail fast. Returns `Err(e)`
471    ///    — the watch error wins even if the drain itself also errors (see
472    ///    `combine_watch_and_drain`); a drain error is surfaced only when the
473    ///    watch ended cleanly.
474    /// 3. Watch task panics → returns `Err(ServerError::WatchPanic{..})`
475    ///    with the panic payload stringified. Same drain semantics as (2).
476    pub async fn serve_with_shutdown(
477        self,
478        addr: SocketAddr,
479        shutdown: impl Future<Output = ()> + Send + 'static,
480    ) -> Result<(), ServerError> {
481        #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
482        let tls_config = self.tls_config.clone();
483
484        // Read the `Copy` grace and clone the shared core and reporter before
485        // `into_router_parts` consumes `self`.
486        let shutdown_grace = self.shutdown_grace;
487        let core = self.core.clone();
488        let reporter = self.reporter.clone();
489        let parts = self.into_router_parts()?;
490        let (combined_shutdown, cancel_tx) = combined_shutdown_with_cancel(shutdown);
491
492        let mut tonic = TonicServer::builder();
493        #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
494        if let Some(cfg) = tls_config {
495            tonic = tonic.tls_config(cfg).map_err(ServerError::Transport)?;
496        }
497        let serve = tonic
498            .add_routes(parts.routes)
499            .serve_with_shutdown(addr, combined_shutdown);
500
501        serve_inner(
502            parts.cancel_tx,
503            parts.watch_handle,
504            parts.heartbeat_cancel_tx,
505            parts.heartbeat_handle,
506            serve,
507            cancel_tx,
508            shutdown_grace,
509            core,
510            reporter,
511        )
512        .await
513    }
514
515    /// Run the gRPC server on a caller-provided `TcpListener` until either
516    /// the caller-provided `shutdown` fires or the leader-watch task terminates.
517    ///
518    /// Use this instead of [`Self::serve_with_shutdown`] when you need to
519    /// observe the OS-picked port (`127.0.0.1:0`) before clients connect, or
520    /// when you want to wrap the listener in an outer adapter before passing it
521    /// in. The listening socket is owned by the caller and passed here; tsoracle
522    /// starts accepting on it immediately.
523    ///
524    /// Three outcomes:
525    /// 1. `shutdown` fires first → tonic drains in-flights and returns `Ok`.
526    ///    The watch handle is aborted; any error it had been about to return
527    ///    is forfeited (the process is shutting down anyway).
528    /// 2. Watch returns `Ok(Err(e))` → poisoned state is already published;
529    ///    the caller-provided shutdown is cancelled internally so tonic begins
530    ///    graceful shutdown; in-flight `GetTs` calls whose `try_grant` already
531    ///    succeeded complete with the timestamps they were allocated; new calls
532    ///    fail fast. Returns `Err(e)` — the watch error wins even if the drain
533    ///    itself also errors (see `combine_watch_and_drain`); a drain error is
534    ///    surfaced only when the watch ended cleanly.
535    /// 3. Watch task panics → returns `Err(ServerError::WatchPanic{..})`
536    ///    with the panic payload stringified. Same drain semantics as (2).
537    pub async fn serve_with_listener(
538        self,
539        listener: tokio::net::TcpListener,
540        shutdown: impl Future<Output = ()> + Send + 'static,
541    ) -> Result<(), ServerError> {
542        #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
543        let tls_config = self.tls_config.clone();
544
545        // Read the `Copy` grace and clone the shared core and reporter before
546        // `into_router_parts` consumes `self`.
547        let shutdown_grace = self.shutdown_grace;
548        let core = self.core.clone();
549        let reporter = self.reporter.clone();
550        let parts = self.into_router_parts()?;
551        let (combined_shutdown, cancel_tx) = combined_shutdown_with_cancel(shutdown);
552
553        let incoming = tonic::transport::server::TcpIncoming::from(listener);
554
555        let mut tonic = TonicServer::builder();
556        #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
557        if let Some(cfg) = tls_config {
558            tonic = tonic.tls_config(cfg).map_err(ServerError::Transport)?;
559        }
560        let serve = tonic
561            .add_routes(parts.routes)
562            .serve_with_incoming_shutdown(incoming, combined_shutdown);
563
564        serve_inner(
565            parts.cancel_tx,
566            parts.watch_handle,
567            parts.heartbeat_cancel_tx,
568            parts.heartbeat_handle,
569            serve,
570            cancel_tx,
571            shutdown_grace,
572            core,
573            reporter,
574        )
575        .await
576    }
577}
578
579/// RAII handle to the leader-watch task spawned by [`Server::into_router`].
580///
581/// The watch task holds an `Arc<Server>` (and thus the consensus driver) and
582/// maintains serving state across leadership transitions. This guard ties the
583/// task's lifetime to the guard's: dropping it cooperatively cancels the task,
584/// and the task publishes [`ServingState::NotServing`] before it stops, so any
585/// `Routes` an embedder still has mounted fails subsequent RPCs fast.
586///
587/// Cancellation is cooperative — the task stops at its next await boundary and
588/// never mid-fence, so it is never torn down while holding internal locks, in
589/// contrast to a raw [`tokio::task::JoinHandle::abort`].
590pub struct WatchGuard {
591    // `Option` so `Drop` and the consuming `shutdown` / `abort` methods can
592    // each take a field without a partial-move conflict against the `Drop`
593    // impl. Dropping the sender (rather than sending) is itself the cancel
594    // signal: the task's `cancel` future resolves on sender-drop too.
595    cancel_tx: Option<tokio::sync::oneshot::Sender<()>>,
596    handle: Option<tokio::task::JoinHandle<Result<(), ServerError>>>,
597    /// Bound on the cooperative-stop wait in [`Self::shutdown`] before a forced
598    /// abort. Inherited from [`ServerBuilder::shutdown_grace`].
599    shutdown_grace: Duration,
600    /// Shared serving core, cloned from the `Server`. Lets `Drop` (and the
601    /// consuming `shutdown` / `abort`, which trigger `Drop` on return) close the
602    /// serving gate synchronously at the drop site, instead of waiting for the
603    /// watch task to observe cancellation and publish `NotServing` on its own
604    /// timeline — a window during which the fast gate would still admit RPCs.
605    core: Arc<ServingCore>,
606    /// Metrics reporter, cloned from the `Server`. Used to record the
607    /// `shutdown_watch_aborted` counter if the grace-bounded reap fires.
608    reporter: Arc<crate::reporter::Reporter>,
609    /// Cooperative-cancel sender for the heartbeat task. `None` when the
610    /// heartbeat is disabled (interval == 0 or built without `tracing`).
611    /// Dropping the sender resolves the task's cancel future.
612    heartbeat_cancel_tx: Option<tokio::sync::oneshot::Sender<()>>,
613    /// Join handle for the heartbeat task. `None` when the heartbeat is
614    /// disabled. Output is `()` because the task never returns an error —
615    /// panics are caught inside the task body and recorded via the
616    /// `heartbeat_task_panicked` counter.
617    heartbeat_handle: Option<tokio::task::JoinHandle<()>>,
618}
619
620impl WatchGuard {
621    /// Signal the leader-watch task to stop, wait for it to drain, and report
622    /// its outcome.
623    ///
624    /// A cooperatively cancelled task returns `Ok(())` — the stop was
625    /// requested, so it is not an error. If the task had already terminated on
626    /// its own (driver error, stream EOF, or panic) the original outcome is
627    /// surfaced verbatim: `Err(e)` or [`ServerError::WatchPanic`]. Either way
628    /// serving state is `NotServing` once this returns.
629    ///
630    /// The cooperative wait is bounded by the configured
631    /// [`ServerBuilder::shutdown_grace`]: if the task is parked in a
632    /// consensus-driver call that never returns it is aborted once the grace
633    /// elapses (still reported as `Ok(())`), so an embedder's shutdown can never
634    /// wedge behind a hung driver.
635    pub async fn shutdown(mut self) -> Result<(), ServerError> {
636        // Dropping the senders fires each task's cancel future. The heartbeat
637        // task is reaped first because it is bounded by `tokio::time::sleep`
638        // (cooperative stop is fast and never wedges on a driver call), so its
639        // reap returns quickly and leaves the grace budget for the watch task
640        // — which may be parked in a wedged consensus-driver call.
641        self.heartbeat_cancel_tx.take();
642        self.cancel_tx.take();
643        if let Some(mut hb_handle) = self.heartbeat_handle.take() {
644            match tokio::time::timeout(self.shutdown_grace, &mut hb_handle).await {
645                Ok(Ok(())) => {}
646                // Task panicked — already counted + logged via catch_unwind in
647                // the task body. Nothing more to do here.
648                Ok(Err(_join_err)) => {}
649                // Grace overrun — sleep + select! should always observe a
650                // dropped cancel sender, so this is a backstop. Abort and
651                // reap; no separate metric (the heartbeat is observability
652                // only — its lateness is not a serving correctness signal).
653                Err(_elapsed) => {
654                    hb_handle.abort();
655                    let _ = (&mut hb_handle).await;
656                }
657            }
658        }
659        match self.handle.take() {
660            Some(mut handle) => join_to_server_result(
661                await_watch_within_grace(&mut handle, self.shutdown_grace, &self.reporter).await,
662            ),
663            None => Ok(()),
664        }
665    }
666
667    /// Hard-abort the leader-watch task without waiting for a cooperative stop.
668    ///
669    /// Prefer [`Self::shutdown`] or simply dropping the guard; both let the
670    /// task stop at a safe point. This is an escape hatch for callers that
671    /// cannot await and accept that the task may be torn down mid-fence.
672    pub fn abort(mut self) {
673        if let Some(handle) = self.handle.take() {
674            handle.abort();
675        }
676        // Hard-abort the heartbeat task too — leaving it running after the
677        // watch is torn down would publish heartbeats describing a stale
678        // (typically `NotServing`) view until the Arc<Reporter> is dropped.
679        if let Some(hb_handle) = self.heartbeat_handle.take() {
680            hb_handle.abort();
681        }
682    }
683
684    /// Whether the leader-watch task has finished — terminated for any reason
685    /// (cooperative cancel, driver error, stream EOF, or panic).
686    ///
687    /// A read-only liveness probe that neither consumes the guard nor disturbs
688    /// its cancel-on-drop behavior, so an embedder can poll task health while
689    /// keeping the guard alive.
690    pub fn is_finished(&self) -> bool {
691        self.handle
692            .as_ref()
693            .is_none_or(|handle| handle.is_finished())
694    }
695}
696
697impl Drop for WatchGuard {
698    fn drop(&mut self) {
699        // Close the serving gate synchronously, here at the drop site. Dropping
700        // the cancel sender below only *requests* the watch task to stop; the
701        // task publishes `NotServing` later, on its own timeline. Between this
702        // drop and the task's next poll the fast gate would still read `Serving`
703        // (and the allocator would still grant), so an RPC could be admitted by a
704        // server that has already been told to stop serving. `step_down` clears
705        // the allocator and publishes `NotServing` now, before any await; the
706        // watch task's later `step_down(None, None)` on cooperative cancel (see
707        // `fence::run_leader_watch`) republishes the identical state, so the
708        // double-close is harmless and idempotent.
709        self.core.step_down(None, None);
710        // Dropping the sender (if `shutdown` / `abort` did not already take it)
711        // resolves the task's cancel future; the task then publishes
712        // `NotServing` and returns. The `JoinHandle` is dropped here too,
713        // detaching the task to finish its cooperative shutdown on its own.
714        self.cancel_tx.take();
715        // Same treatment for the heartbeat task. `Drop` is sync so we cannot
716        // await the cooperative stop; instead we drop the cancel sender (the
717        // task will observe it at its next select! boundary) and hard-abort
718        // the handle so it cannot outlive the guard and publish heartbeats
719        // describing a stale view if the runtime keeps the Arc alive.
720        self.heartbeat_cancel_tx.take();
721        if let Some(hb_handle) = self.heartbeat_handle.take() {
722            hb_handle.abort();
723        }
724    }
725}
726
727/// Merge the caller's `shutdown` future with an internal cancellation signal.
728///
729/// Both [`Server::serve_with_shutdown`] and [`Server::serve_with_listener`]
730/// need tonic to stop when EITHER the caller's `shutdown` fires OR the
731/// leader-watch task terminates (signalled by firing the returned
732/// `oneshot::Sender`). This builds that merged shutdown future and hands back
733/// the sender so [`serve_inner`] can trip it from the watch arm.
734fn combined_shutdown_with_cancel(
735    shutdown: impl Future<Output = ()> + Send + 'static,
736) -> (
737    impl Future<Output = ()> + Send + 'static,
738    tokio::sync::oneshot::Sender<()>,
739) {
740    let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
741    let combined_shutdown = async move {
742        tokio::select! {
743            _ = shutdown => {}
744            _ = cancel_rx => {}
745        }
746    };
747    (combined_shutdown, cancel_tx)
748}
749
750/// Wait for the leader-watch task to stop cooperatively, but no longer than
751/// `grace`, then forcibly abort it if it is still running.
752///
753/// The watch task observes its cancel signal only at the `select!` boundaries
754/// in [`crate::fence::run_leader_watch`], never inside a fence attempt. A
755/// consensus driver whose `load_high_water` / `persist_high_water` never
756/// returns (the trait places no latency bound; see
757/// [`tsoracle_consensus::ConsensusDriver`]) therefore parks the task upstream
758/// of any cancel-observing await, so dropping the cancel sender cannot stop it.
759/// Left unbounded, the shutdown wait would block process exit until the kubelet
760/// escalates to SIGKILL on a drain. Bounding the wait by `grace` and aborting
761/// on expiry guarantees forward progress: `tokio` tears a suspended task (and
762/// the wedged driver future it holds) down at the abort, dropping its
763/// drain-barrier guard.
764///
765/// Returns the task's join result. A clean cooperative stop forwards its real
766/// outcome verbatim; an aborted task surfaces as a cancelled `JoinError`, which
767/// [`join_to_server_result`] maps to `Ok(())` — the stop was requested, so a
768/// forced abort during shutdown is not an error. A `grace` of zero aborts
769/// immediately (the `timeout` future is already elapsed on first poll).
770async fn await_watch_within_grace(
771    watch_handle: &mut tokio::task::JoinHandle<Result<(), ServerError>>,
772    grace: Duration,
773    reporter: &Arc<crate::reporter::Reporter>,
774) -> Result<Result<(), ServerError>, tokio::task::JoinError> {
775    match tokio::time::timeout(grace, &mut *watch_handle).await {
776        Ok(join_result) => join_result,
777        Err(_elapsed) => {
778            reporter.shutdown_watch_aborted.increment(1);
779            #[cfg(feature = "tracing")]
780            tracing::warn!(
781                grace_ms = grace.as_millis() as u64,
782                "leader-watch task did not stop within the shutdown grace; aborting it (a consensus-driver call likely exceeded its latency bound)"
783            );
784            watch_handle.abort();
785            // Reap the aborted task so its Drop (releasing any held drain-barrier
786            // guard) runs before we report shutdown complete. Bounded: an aborted
787            // task resolves at its next poll.
788            (&mut *watch_handle).await
789        }
790    }
791}
792
/// Drive the gRPC `serve_future` against the leader-watch task, shared by
/// [`Server::serve_with_shutdown`] and [`Server::serve_with_listener`].
///
/// The two public methods differ only in how `serve_future` is assembled
/// (address-bound via `serve_with_shutdown` vs listener-bound via
/// `serve_with_incoming_shutdown`); everything downstream — the biased select,
/// the cooperative-cancel path, and the drain/translate logic — is identical
/// and lives here so a future change need only be made once.
///
/// `tonic_cancel_tx` is the cancellation half paired with the `serve_future`'s
/// shutdown signal (see [`combined_shutdown_with_cancel`]); firing it begins
/// tonic's graceful drain when the watch task terminates first. `watch_cancel_tx`
/// is the leader-watch task's own cooperative-cancel sender (the same one a
/// [`WatchGuard`] holds for embedders); dropping it stops the task. Taking the
/// raw parts rather than a `WatchGuard` keeps this path free of the guard's
/// `Option` fields — neither the watch handle nor the cancel sender is optional
/// here. `heartbeat_cancel_tx` / `heartbeat_handle` are the heartbeat task's
/// cancel sender and join handle (`None` when the heartbeat is disabled); the
/// heartbeat is stopped on every exit path once the watch outcome is settled,
/// and aborted if it overruns the grace. `shutdown_grace` bounds the
/// user-shutdown arm's wait for the watch task (see
/// [`await_watch_within_grace`]) and the heartbeat reap. `reporter` receives
/// the `shutdown_watch_aborted` count if the watch reap has to abort. `core` is
/// the shared serving core (the same one the watch task and the gRPC service
/// hold): the user-shutdown arm closes the gate on it synchronously, before the
/// grace-bounded reap, so the published state ends up `NotServing` even if the
/// watch task is aborted before it can publish that itself.
///
/// # Errors
///
/// When the watch task terminates first, returns its error (or
/// [`ServerError::WatchPanic`]); a tonic drain error is surfaced only if the
/// watch ended cleanly. When `serve_future` resolves first, returns its
/// transport error, if any.
// Private serve helper. The wide signature is the cost of being the single
// merge point for the two public `serve_*` paths and the leader-watch +
// heartbeat task pair: bundling these into a struct just to placate clippy
// would obscure the lifecycle (every parameter is consumed exactly once and
// has no shared identity worth naming). Keep the arguments visible.
#[allow(clippy::too_many_arguments)]
async fn serve_inner<S>(
    watch_cancel_tx: tokio::sync::oneshot::Sender<()>,
    mut watch_handle: tokio::task::JoinHandle<Result<(), ServerError>>,
    heartbeat_cancel_tx: Option<tokio::sync::oneshot::Sender<()>>,
    heartbeat_handle: Option<tokio::task::JoinHandle<()>>,
    serve_future: S,
    tonic_cancel_tx: tokio::sync::oneshot::Sender<()>,
    shutdown_grace: Duration,
    core: Arc<ServingCore>,
    reporter: Arc<crate::reporter::Reporter>,
) -> Result<(), ServerError>
where
    S: Future<Output = Result<(), tonic::transport::Error>>,
{
    tokio::pin!(serve_future);

    let outcome = tokio::select! {
        // Bias toward the watch arm: if both are ready in the same poll
        // (rare but possible — graceful shutdown completed in the same
        // tick the watch returned), we want to surface the watch error
        // rather than report a clean shutdown.
        biased;

        watch_result = &mut watch_handle => {
            // Watch terminated. State is already poisoned (see watch
            // task body in into_router; the loop itself is
            // `crate::fence::run_leader_watch`). Trigger tonic drain, wait for
            // it to finish, then report the watch's outcome — preferring
            // it over any drain error, which surfaces only if the watch
            // itself ended cleanly.
            let _ = tonic_cancel_tx.send(());
            let drain_result = serve_future.await;
            combine_watch_and_drain(watch_result, drain_result)
        }
        serve_result = &mut serve_future => {
            // `serve_future` resolved on its own: either the user's shutdown
            // fired (our own cancel is only sent from the watch arm, which
            // has `biased` priority), or tonic failed with a transport error,
            // in which case `serve_result` is `Err` and is propagated below.
            // Prefer a cooperative stop: dropping the cancel sender resolves
            // the task's cancel future so it stops at its next `select!`
            // boundary, having published `NotServing` and never torn down
            // mid-fence while holding `extension_gate.write()`. But a
            // cooperative stop is only observed at those boundaries, never
            // inside a fence attempt — a consensus-driver call that never
            // returns (the trait places no latency bound) would park the task
            // upstream of any cancel point and block process exit until a
            // kubelet SIGKILL. `await_watch_within_grace` therefore bounds the
            // wait by `shutdown_grace` and aborts the task if it overruns. The
            // task's own outcome (a clean `Ok(())` on cooperative stop, a
            // cancelled `JoinError` on abort) is discarded; the user-requested
            // shutdown result wins.
            //
            // Close the serving gate synchronously first: dropping the sender
            // only *requests* the stop, and a task aborted on grace expiry (or
            // simply not yet rescheduled) may never reach its own `step_down`,
            // so without this the published state could be left at `Serving`.
            // `serve_future` has already resolved by the time this arm runs,
            // so this close does not gate RPCs during tonic's drain on this
            // path. NOTE(review): if RPCs arriving *during* the graceful drain
            // must also be rejected, the gate has to be closed when the
            // caller's shutdown signal fires, not here — confirm intent.
            // `step_down` is idempotent with the watch task's own
            // cooperative-cancel publish.
            core.step_down(None, None);
            drop(watch_cancel_tx);
            let _ = await_watch_within_grace(&mut watch_handle, shutdown_grace, &reporter).await;
            serve_result?;
            Ok(())
        }
    };

    // Stop the heartbeat task on every exit path. Done after the watch reap so
    // the watch-arm `combine_watch_and_drain` already saw the watch outcome,
    // and the user-shutdown arm has finished its grace-bounded wait. Dropping
    // the cancel sender breaks the task's `tokio::select! { biased; cancel,
    // sleep }` loop on the next poll; if the task is wedged for any reason we
    // hard-abort on grace overrun. The task's outcome is observability only
    // and cannot influence serving correctness, so its join result is dropped.
    drop(heartbeat_cancel_tx);
    if let Some(mut hb_handle) = heartbeat_handle {
        match tokio::time::timeout(shutdown_grace, &mut hb_handle).await {
            Ok(Ok(())) => {}
            Ok(Err(_join_err)) => {} // panic — already counted via catch_unwind
            Err(_elapsed) => {
                // Abort, then reap so the task is fully torn down before we
                // return.
                hb_handle.abort();
                let _ = (&mut hb_handle).await;
            }
        }
    }

    outcome
}
905
906/// Convert a `JoinHandle` result into a `ServerError`-typed result.
907///
908/// - `Ok(Ok(()))` — cooperative cancellation: `run_leader_watch` observed its
909///   cancel signal (the `WatchGuard` was dropped, `WatchGuard::shutdown` was
910///   called, or `serve_inner` cancelled it on user shutdown), published
911///   `NotServing`, and returned cleanly. Forwarded verbatim as `Ok(())`.
912/// - `Ok(Err(e))` — task returned an error (including `WatchStreamClosed`
913///   from a clean EOF). Forward verbatim.
914/// - `Err(JoinError)` — task was aborted or panicked. An abort
915///   (`WatchGuard::abort` or `JoinHandle::abort`) maps to Ok (we asked for it);
916///   a panic maps to `WatchPanic` with payload.
917fn join_to_server_result(
918    join_result: Result<Result<(), ServerError>, tokio::task::JoinError>,
919) -> Result<(), ServerError> {
920    match join_result {
921        Ok(inner) => inner,
922        Err(join_err) if join_err.is_panic() => {
923            let payload = panic_payload_to_string(join_err.into_panic());
924            Err(ServerError::WatchPanic {
925                payload,
926                bt: Bt::capture(),
927            })
928        }
929        Err(_cancelled) => Ok(()),
930    }
931}
932
933/// Combine the leader-watch outcome with the tonic graceful-drain outcome
934/// after the watch arm fired.
935///
936/// When the watch task terminates first we trigger the drain and then must
937/// report a single result. The watch error is the root cause — poisoned
938/// serving state was already published before the task returned — so it wins
939/// when both fail. A drain error (port stolen, resource exhaustion) is only
940/// surfaced when the watch outcome is otherwise `Ok`; previously it was
941/// discarded via `let _ = serve.await`, hiding a failed drain behind a clean
942/// shutdown report.
943///
944/// Generic over the drain error so the precedence logic is unit-testable
945/// without fabricating a `tonic::transport::Error` (which has no public
946/// constructor): production passes `Result<(), tonic::transport::Error>`,
947/// tests pass `Result<(), ServerError>` via the reflexive `From` impl.
948fn combine_watch_and_drain<E>(
949    watch_result: Result<Result<(), ServerError>, tokio::task::JoinError>,
950    drain_result: Result<(), E>,
951) -> Result<(), ServerError>
952where
953    ServerError: From<E>,
954{
955    match join_to_server_result(watch_result) {
956        Err(watch_err) => Err(watch_err),
957        Ok(()) => drain_result.map_err(ServerError::from),
958    }
959}
960
/// Construct the gRPC reflection service (v1) from an encoded protobuf
/// descriptor set.
///
/// Kept separate from [`Server::into_router`] so the decode-failure path can be
/// unit-tested. Production passes [`tsoracle_proto::FILE_DESCRIPTOR_SET`]; tests
/// feed deliberately corrupt bytes. A decode failure is returned as
/// [`ServerError::ReflectionInit`] instead of panicking.
#[cfg(feature = "reflection")]
fn build_reflection_service(
    descriptor_set: &[u8],
) -> Result<
    tonic_reflection::server::v1::ServerReflectionServer<
        impl tonic_reflection::server::v1::ServerReflection,
    >,
    ServerError,
> {
    let builder = tonic_reflection::server::Builder::configure()
        .register_encoded_file_descriptor_set(descriptor_set);
    match builder.build_v1() {
        Ok(service) => Ok(service),
        Err(decode_err) => Err(ServerError::ReflectionInit(decode_err)),
    }
}
981
/// Render a panic payload as text for [`ServerError::WatchPanic`].
///
/// `panic!("{x}")` carries a `String` and `panic!("literal")` carries a
/// `&'static str`. Both are returned verbatim. Any other payload type yields a
/// fixed placeholder message.
fn panic_payload_to_string(panic: Box<dyn std::any::Any + Send>) -> String {
    // Owned payload: move the `String` out of the box instead of copying it.
    let panic = match panic.downcast::<String>() {
        Ok(owned) => return *owned,
        Err(not_a_string) => not_a_string,
    };
    match panic.downcast_ref::<&'static str>() {
        Some(text) => (*text).to_string(),
        None => "watch task panicked with non-string payload".to_string(),
    }
}
991
#[cfg(any(test, feature = "test-fakes"))]
impl Server {
    /// Run the leader-watch loop directly, bypassing the `serve_*` paths.
    /// Available to integration tests through the `test-fakes` feature; not
    /// part of the stable public API.
    #[doc(hidden)]
    pub async fn run_leader_watch_for_tests(self: Arc<Self>) -> Result<(), ServerError> {
        // These tests end the loop through leadership events or
        // `JoinHandle::abort`, never a cooperative cancel, so the loop gets a
        // cancel future that can never resolve.
        let never_cancelled = futures::future::pending();
        crate::fence::run_leader_watch(self, never_cancelled).await
    }

    /// Ask the allocator for a window grant at the current clock reading,
    /// without going through the gRPC service. Regression tests use it to
    /// observe the behavioral fence directly: no timestamp at or below the
    /// prior leader's high-water.
    #[doc(hidden)]
    pub fn try_grant_for_tests(&self, count: u32) -> Result<WindowGrant, CoreError> {
        let now_ms = self.clock.now_ms();
        self.core.try_grant(now_ms, count)
    }
}
1012
// Unit tests for the private helpers above (panic-payload decoding,
// join/drain result translation, reflection-service construction) and for the
// synchronous gate-closing guarantees of `WatchGuard`'s `Drop` and
// `serve_inner`'s user-shutdown arm.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn panic_payload_to_string_recovers_static_str() {
        // `panic!("literal")` produces a `&'static str` payload; we want the
        // verbatim text so operators see what the watch task said.
        let payload: Box<dyn std::any::Any + Send> = Box::new("watch boom");
        assert_eq!(panic_payload_to_string(payload), "watch boom");
    }

    #[test]
    fn panic_payload_to_string_recovers_owned_string() {
        // `panic!("{var}")` produces a `String` payload (formatted at panic
        // time); the helper must downcast that branch too.
        let payload: Box<dyn std::any::Any + Send> = Box::new(String::from("formatted"));
        assert_eq!(panic_payload_to_string(payload), "formatted");
    }

    #[test]
    fn panic_payload_to_string_falls_back_for_other_types() {
        // Custom payloads (panic!(MyType { .. })) hit the catch-all branch.
        struct Custom;
        let payload: Box<dyn std::any::Any + Send> = Box::new(Custom);
        assert_eq!(
            panic_payload_to_string(payload),
            "watch task panicked with non-string payload",
        );
    }

    #[test]
    fn serving_transitions_publish_through_core() {
        // The Server delegates serving-state transitions to its ServingCore; a
        // step_down on a freshly built Server lands as NotServing with the hint.
        // (The #346 send_replace-with-zero-receivers regression is pinned by the
        // ServingCore unit tests, which can observe `receiver_count` directly.)
        let server = Server::builder()
            .consensus_driver(Arc::new(crate::test_fakes::InMemoryDriver::new()))
            .build()
            .expect("build must succeed");

        let hint = PeerEndpoint::try_from("new-leader:9000").unwrap();
        server.core.step_down(Some(hint.clone()), Some(Epoch(7)));

        match server.core.serving_state() {
            ServingState::NotServing {
                leader_endpoint,
                leader_epoch,
            } => {
                assert_eq!(leader_endpoint, Some(hint));
                assert_eq!(leader_epoch, Some(Epoch(7)));
            }
            ServingState::Serving => panic!("expected NotServing after step_down"),
        }
    }

    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
    #[test]
    fn builder_stores_tls_config() {
        // The serve_* paths read `tls_config` from `Server` (not the builder)
        // after `into_router` consumes self — so the field must survive the
        // builder → Server hand-off, not just the builder method.
        use crate::test_fakes::InMemoryDriver;

        let driver = Arc::new(InMemoryDriver::new());
        let cfg = tonic::transport::ServerTlsConfig::new();
        let server = Server::builder()
            .consensus_driver(driver)
            .tls_config(cfg)
            .build()
            .expect("build with tls_config must succeed");
        assert!(server.tls_config.is_some());
    }

    #[tokio::test]
    async fn join_to_server_result_passes_through_clean_outcome() {
        // Ok(Ok(())) — task finished cleanly; forward verbatim.
        let handle = tokio::spawn(async { Ok::<(), ServerError>(()) });
        let join = handle.await;
        assert!(matches!(join_to_server_result(join), Ok(())));
    }

    #[tokio::test]
    async fn join_to_server_result_forwards_inner_error() {
        // Ok(Err(e)) — task returned an error; forward it.
        let handle = tokio::spawn(async {
            Err::<(), ServerError>(ServerError::WatchPanic {
                payload: "synthetic".into(),
                bt: Bt::capture(),
            })
        });
        let join = handle.await;
        match join_to_server_result(join) {
            Err(ServerError::WatchPanic { payload, .. }) => assert_eq!(payload, "synthetic"),
            other => panic!("expected forwarded WatchPanic, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn join_to_server_result_translates_panic_to_watch_panic() {
        // Err(JoinError::is_panic) — task panicked; surface as WatchPanic with
        // the payload stringified by `panic_payload_to_string`. The unreachable
        // `Ok` tail only fixes the async block's output type so the join result
        // matches what `join_to_server_result` accepts.
        let handle = tokio::spawn(async {
            panic!("intentional");
            #[allow(unreachable_code)]
            Ok::<(), ServerError>(())
        });
        let join = handle.await;
        match join_to_server_result(join) {
            Err(ServerError::WatchPanic { payload, .. }) => {
                assert!(payload.contains("intentional"))
            }
            other => panic!("expected WatchPanic, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn join_to_server_result_treats_cancellation_as_clean_exit() {
        // Err(JoinError::is_cancelled) — caller aborted the task; we asked
        // for that, so map to Ok. The explicit `JoinHandle` annotation pins the
        // output type that `pending()` cannot infer on its own.
        let handle: tokio::task::JoinHandle<Result<(), ServerError>> =
            tokio::spawn(async { futures::future::pending().await });
        handle.abort();
        let join = handle.await;
        assert!(matches!(join_to_server_result(join), Ok(())));
    }

    #[tokio::test]
    async fn combine_watch_and_drain_surfaces_drain_error_when_watch_ok() {
        // Watch ended cleanly but the graceful drain failed (port stolen,
        // resource exhaustion). The drain error must not be swallowed.
        // `WatchStreamClosed` is just a convenient stand-in for a drain error;
        // the reflexive `From<ServerError>` satisfies the generic bound.
        let watch = tokio::spawn(async { Ok::<(), ServerError>(()) }).await;
        let drain: Result<(), ServerError> = Err(ServerError::WatchStreamClosed);
        assert!(matches!(
            combine_watch_and_drain(watch, drain),
            Err(ServerError::WatchStreamClosed)
        ));
    }

    #[tokio::test]
    async fn combine_watch_and_drain_returns_ok_when_both_succeed() {
        // Watch clean, drain clean — the only fully-Ok outcome.
        let watch = tokio::spawn(async { Ok::<(), ServerError>(()) }).await;
        let drain: Result<(), ServerError> = Ok(());
        assert!(matches!(combine_watch_and_drain(watch, drain), Ok(())));
    }

    #[tokio::test]
    async fn combine_watch_and_drain_prefers_watch_error_over_drain_error() {
        // Both failed. The watch error is the root cause (poisoned state was
        // already published), so it wins; the drain error is dropped.
        let watch = tokio::spawn(async {
            Err::<(), ServerError>(ServerError::WatchPanic {
                payload: "watch".into(),
                bt: Bt::capture(),
            })
        })
        .await;
        let drain: Result<(), ServerError> = Err(ServerError::WatchStreamClosed);
        match combine_watch_and_drain(watch, drain) {
            Err(ServerError::WatchPanic { payload, .. }) => assert_eq!(payload, "watch"),
            other => panic!("expected watch error to win, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn combine_watch_and_drain_returns_watch_error_when_drain_ok() {
        // Watch failed, drain succeeded — forward the watch error verbatim.
        let watch = tokio::spawn(async {
            Err::<(), ServerError>(ServerError::WatchPanic {
                payload: "watch".into(),
                bt: Bt::capture(),
            })
        })
        .await;
        let drain: Result<(), ServerError> = Ok(());
        match combine_watch_and_drain(watch, drain) {
            Err(ServerError::WatchPanic { payload, .. }) => assert_eq!(payload, "watch"),
            other => panic!("expected forwarded watch error, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn dropping_watch_guard_closes_serving_gate_synchronously() {
        // Regression: dropping the guard must close the serving gate at the
        // drop site, not on the watch task's later timeline. Build a guard whose
        // watch handle never touches the core (so the ONLY thing that can flip
        // the state to NotServing is `Drop`), publish `Serving`, then drop the
        // guard and read the state with NO await in between. On the current-thread
        // test runtime no other task can run between the synchronous `drop` and
        // the synchronous `serving_state` read, so observing `NotServing` proves
        // `Drop` closed the gate synchronously rather than the watch task.
        let core = Arc::new(ServingCore::new(Duration::from_secs(3)));
        core.publish_serving();

        let handle: tokio::task::JoinHandle<Result<(), ServerError>> =
            tokio::spawn(async { Ok(()) });
        let (cancel_tx, _cancel_rx) = tokio::sync::oneshot::channel::<()>();
        let guard = WatchGuard {
            cancel_tx: Some(cancel_tx),
            handle: Some(handle),
            shutdown_grace: Duration::from_secs(10),
            core: core.clone(),
            reporter: Arc::new(crate::reporter::Reporter::new()),
            heartbeat_cancel_tx: None,
            heartbeat_handle: None,
        };

        drop(guard);

        assert!(
            matches!(core.serving_state(), ServingState::NotServing { .. }),
            "dropping the WatchGuard must close the serving gate synchronously",
        );
    }

    #[tokio::test]
    async fn serve_inner_closes_serving_gate_on_user_shutdown() {
        // Regression for the serve path: when the caller's shutdown fires,
        // `serve_inner` drops the watch cancel sender and waits out the grace,
        // forcibly aborting the watch task if it overruns. A task parked upstream
        // of any cancel-observing await (modelled here by a never-resolving
        // future) is aborted before it can publish `NotServing`, so the gate
        // would stay open unless `serve_inner` closes it itself. Seed `Serving`,
        // run the user-shutdown arm with a zero grace (immediate abort), and
        // assert the gate is closed on return.
        let core = Arc::new(ServingCore::new(Duration::from_secs(3)));
        core.publish_serving();

        let watch_handle: tokio::task::JoinHandle<Result<(), ServerError>> =
            tokio::spawn(async { futures::future::pending().await });
        let (watch_cancel_tx, _watch_cancel_rx) = tokio::sync::oneshot::channel::<()>();
        let (tonic_cancel_tx, _tonic_cancel_rx) = tokio::sync::oneshot::channel::<()>();

        // A serve future that is immediately ready models the user's shutdown
        // having fired; with the biased select preferring the (pending) watch arm,
        // control reaches the user-shutdown arm.
        let serve_future = async { Ok::<(), tonic::transport::Error>(()) };

        let result = serve_inner(
            watch_cancel_tx,
            watch_handle,
            None, // heartbeat_cancel_tx — heartbeat disabled in this regression test
            None, // heartbeat_handle
            serve_future,
            tonic_cancel_tx,
            Duration::from_millis(0),
            core.clone(),
            Arc::new(crate::reporter::Reporter::new()),
        )
        .await;

        assert!(
            result.is_ok(),
            "user shutdown must return Ok, got {result:?}"
        );
        assert!(
            matches!(core.serving_state(), ServingState::NotServing { .. }),
            "serve_inner's user-shutdown arm must close the serving gate synchronously",
        );
    }

    #[cfg(feature = "reflection")]
    #[test]
    fn build_reflection_service_accepts_embedded_descriptor_set() {
        // The descriptor set emitted by `tsoracle-proto`'s build.rs must decode
        // cleanly — this is the production happy path that previously sat behind
        // an `expect`.
        assert!(build_reflection_service(tsoracle_proto::FILE_DESCRIPTOR_SET).is_ok());
    }

    #[cfg(feature = "reflection")]
    #[test]
    fn build_reflection_service_maps_corrupt_descriptor_to_typed_error() {
        // A descriptor set that fails to decode (build artifact drift) must
        // surface as a typed `ServerError::ReflectionInit`, not a panic. The
        // bytes below are not a valid encoded `FileDescriptorSet`.
        let corrupt = b"\xff\xff\xff\xff not a descriptor set";
        // The Ok variant wraps a reflection service that is not `Debug`, so map
        // to a unit result before asserting on the error variant.
        match build_reflection_service(corrupt).map(|_| ()) {
            Err(ServerError::ReflectionInit(_)) => {}
            other => panic!("expected ReflectionInit error, got {other:?}"),
        }
    }
}