Skip to main content

tsoracle_server/
server.rs

1//
2//  ░▀█▀░█▀▀░█▀█░█▀▄░█▀█░█▀▀░█░░░█▀▀
3//  ░░█░░▀▀█░█░█░█▀▄░█▀█░█░░░█░░░█▀▀
4//  ░░▀░░▀▀▀░▀▀▀░▀░▀░▀░▀░▀▀▀░▀▀▀░▀▀▀
5//
6//  tsoracle — Distributed Timestamp Oracle
7//  https://www.tsoracle.rs
8//
9//  Copyright (c) 2026 Prisma Risk
10//
11//  Licensed under the Apache License, Version 2.0 (the "License");
12//  you may not use this file except in compliance with the License.
13//  You may obtain a copy of the License at
14//
15//      https://www.apache.org/licenses/LICENSE-2.0
16//
17//  Unless required by applicable law or agreed to in writing, software
18//  distributed under the License is distributed on an "AS IS" BASIS,
19//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20//  See the License for the specific language governing permissions and
21//  limitations under the License.
22//
23
24use core::time::Duration;
25use std::future::Future;
26use std::net::SocketAddr;
27use std::sync::Arc;
28use tokio::sync::watch;
29use tonic::service::Routes;
30use tonic::transport::Server as TonicServer;
31use tsoracle_consensus::ConsensusDriver;
32#[cfg(any(test, feature = "test-fakes"))]
33use tsoracle_core::{CoreError, WindowGrant};
34use tsoracle_core::{Epoch, PeerEndpoint};
35use tsoracle_proto::v1::tso_service_server::TsoServiceServer;
36
37use crate::bt::Bt;
38use crate::clock::{Clock, SystemClock};
39use crate::service::TsoServiceImpl;
40use crate::serving_core::ServingCore;
41
/// Configuration errors reported by [`ServerBuilder::build`].
#[derive(Debug, thiserror::Error)]
pub enum BuildError {
    /// [`ServerBuilder::consensus_driver`] was never called. Unlike the clock
    /// (which falls back to `SystemClock`), the consensus driver has no
    /// default, so `build` cannot proceed without one.
    #[error("consensus_driver is required")]
    MissingConsensusDriver,
    /// `max_seq_count` was set to 0. Every positive `count` would then be
    /// rejected as `SeqCountTooLarge` (the `count >= 1` floor leaves no valid
    /// value), silently disabling `GetSeq`. Rejected at build time so the
    /// misconfiguration surfaces immediately rather than at first request.
    #[error("max_seq_count must be >= 1 (0 would reject every GetSeq request)")]
    ZeroMaxSeqCount,
}
53
/// Errors returned while setting up, running, or tearing down a [`Server`]:
/// by the `serve_*` methods, [`Server::into_router`], and
/// [`WatchGuard::shutdown`].
#[derive(Debug, thiserror::Error)]
pub enum ServerError {
    /// The tonic transport layer reported an error — for example, applying
    /// the configured TLS settings failed.
    #[error("transport: {0}")]
    Transport(#[from] tonic::transport::Error),
    /// The consensus driver returned an error (e.g. mid-fence).
    #[error("consensus: {0}")]
    Consensus(#[from] tsoracle_consensus::ConsensusError),
    /// An error propagated from the `tsoracle-core` crate.
    #[error("core: {0}")]
    Core(#[from] tsoracle_core::CoreError),
    /// The leader-watch task panicked. Distinct from a clean error return so
    /// operators can tell "driver returned Err" (recoverable design) from
    /// "task panicked" (programming bug).
    #[error("leader-watch task panicked: {payload}{bt}")]
    WatchPanic { payload: String, bt: Bt },
    /// The consensus driver's `leadership_events()` stream ended cleanly while
    /// the leader-watch task was running. The stream is contracted to live for
    /// the life of the server, so its end is anomalous (driver shutdown, lost
    /// session, etc.) — distinct from a `Consensus` error returned mid-fence.
    /// The watch task publishes `ServingState::NotServing` before returning
    /// this variant so embedders who never observe the [`WatchGuard`] still get
    /// the documented fail-safe behavior.
    #[error("consensus driver leadership stream closed")]
    WatchStreamClosed,
    /// The embedded protobuf descriptor set failed to decode while building the
    /// gRPC reflection service. `tsoracle-proto`'s `build.rs` emits these bytes
    /// from checked-in `.proto` sources, so a failure here signals build-artifact
    /// drift (a corrupt or stale descriptor) rather than a runtime condition —
    /// surfaced as a diagnosable startup error instead of a process panic.
    #[cfg(feature = "reflection")]
    #[error("failed to build gRPC reflection service from embedded descriptor set: {0}")]
    ReflectionInit(#[source] tonic_reflection::server::Error),
}
85
/// Whether this node currently serves timestamp requests. The server
/// publishes it on a `watch` channel; observe it via [`Server::subscribe`].
#[derive(Clone, Debug)]
pub enum ServingState {
    /// Requests are refused: RPCs fail fast with `FAILED_PRECONDITION`.
    ///
    /// The fields presumably carry the current leader's endpoint and epoch
    /// when they are known (confirm against the fence logic). After the
    /// leader-watch task terminates, `leader_endpoint` is `None`.
    NotServing {
        leader_endpoint: Option<PeerEndpoint>,
        leader_epoch: Option<Epoch>,
    },
    /// This node accepts and serves requests. Embedders may gate their own
    /// startup on observing this state.
    Serving,
}
94
/// Default upper bound on how long a graceful shutdown waits for the
/// leader-watch task to stop on its own before it is forcibly aborted.
///
/// The abort exists only as a last-resort safety net for a consensus driver
/// stuck inside `load_high_water` / `persist_high_water`; [`ConsensusDriver`]
/// places no latency bound on those calls. Ten seconds leaves ample headroom
/// under a typical Kubernetes `terminationGracePeriodSeconds` (30s). That way
/// the abort, the tonic drain, and process exit all finish before the kubelet
/// escalates to SIGKILL.
const DEFAULT_SHUTDOWN_GRACE: Duration = Duration::new(10, 0);
103
/// Builder for [`Server`]. Obtain one with [`Server::builder`] (or
/// `ServerBuilder::default()`), chain the setters, then call
/// [`ServerBuilder::build`].
///
/// Only the consensus driver is mandatory; every other option has a default.
pub struct ServerBuilder {
    /// Required: `build` fails with [`BuildError::MissingConsensusDriver`]
    /// when this is `None`.
    consensus: Option<Arc<dyn ConsensusDriver>>,
    /// Optional: `build` falls back to [`SystemClock`] when this is `None`.
    clock: Option<Arc<dyn Clock>>,
    window_ahead: Duration,
    failover_advance: Duration,
    shutdown_grace: Duration,
    heartbeat_interval: Duration,
    /// Per-call `GetSeq` ceiling; `build` rejects 0.
    max_seq_count: u32,
    /// TLS termination applied by the `serve_*` methods (not by
    /// [`Server::into_router`]).
    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
    tls_config: Option<tonic::transport::ServerTlsConfig>,
}
115
116impl Default for ServerBuilder {
117    fn default() -> Self {
118        ServerBuilder {
119            consensus: None,
120            clock: None,
121            window_ahead: Duration::from_secs(3),
122            failover_advance: Duration::from_secs(1),
123            shutdown_grace: DEFAULT_SHUTDOWN_GRACE,
124            heartbeat_interval: Duration::from_secs(10),
125            max_seq_count: tsoracle_core::DEFAULT_MAX_SEQ_COUNT,
126            #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
127            tls_config: None,
128        }
129    }
130}
131
132impl ServerBuilder {
133    pub fn consensus_driver(mut self, driver: Arc<dyn ConsensusDriver>) -> Self {
134        self.consensus = Some(driver);
135        self
136    }
137    pub fn clock(mut self, clock: Arc<dyn Clock>) -> Self {
138        self.clock = Some(clock);
139        self
140    }
141    pub fn window_ahead(mut self, window_ahead: Duration) -> Self {
142        self.window_ahead = window_ahead;
143        self
144    }
145    pub fn failover_advance(mut self, failover_advance: Duration) -> Self {
146        self.failover_advance = failover_advance;
147        self
148    }
149
150    /// Bound on how long a graceful shutdown waits for the leader-watch task to
151    /// stop cooperatively before forcibly aborting it.
152    ///
153    /// On shutdown the server drops the watch task's cancel signal and waits for
154    /// it to publish `NotServing` and return. That wait is normally
155    /// near-instant, but the task observes cancellation only at its `select!`
156    /// boundaries — never inside a fence attempt. A consensus driver whose
157    /// `load_high_water` / `persist_high_water` never returns (the trait places
158    /// no latency bound) would otherwise park the task mid-fence and block
159    /// process exit indefinitely, leading to a SIGKILL on a Kubernetes drain.
160    /// Once `shutdown_grace` elapses the server aborts the task so exit always
161    /// makes progress. Set this comfortably below your deployment's
162    /// `terminationGracePeriodSeconds`. Defaults to 10s. A value of zero aborts
163    /// immediately without waiting for a cooperative stop.
164    pub fn shutdown_grace(mut self, shutdown_grace: Duration) -> Self {
165        self.shutdown_grace = shutdown_grace;
166        self
167    }
168
169    /// Interval between heartbeat log lines emitted at `target = "tsoracle::heartbeat"`.
170    /// Defaults to 10 seconds. Pass `Duration::ZERO` to disable the heartbeat task entirely.
171    ///
172    /// The heartbeat surfaces serving role, current epoch, requests served,
173    /// timestamps issued, and key error counters every interval — proof-of-life
174    /// for production deployments that may not have a metrics exporter installed.
175    ///
176    /// Requires `feature = "tracing"` to emit output; with `tracing` off the
177    /// setter is accepted but no task is spawned (no subscriber to log to).
178    pub fn heartbeat_interval(mut self, interval: Duration) -> Self {
179        self.heartbeat_interval = interval;
180        self
181    }
182
183    /// Configure TLS termination for this server. Applied inside
184    /// [`Server::serve`], [`Server::serve_with_shutdown`], and
185    /// [`Server::serve_with_listener`]. Not applied to [`Server::into_router`] —
186    /// embedders mounting tsoracle alongside their own services control TLS
187    /// on their own tonic builder.
188    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
189    pub fn tls_config(mut self, cfg: tonic::transport::ServerTlsConfig) -> Self {
190        self.tls_config = Some(cfg);
191        self
192    }
193
194    /// Per-call ceiling on `GetSeq`'s `count` — the largest contiguous block a
195    /// single dense-sequence request may reserve. Defaults to
196    /// [`tsoracle_core::DEFAULT_MAX_SEQ_COUNT`] (`65_536`).
197    ///
198    /// This is a soft anti-abuse guardrail, not a representational limit: a
199    /// dense block is permanently consumed (the gapless counter only moves
200    /// forward), so the cap bounds how much one call can irrevocably reserve.
201    /// Raise it for batch-allocation workloads or lower it to tighten abuse
202    /// control. The server is the sole authority — clients no longer pre-check
203    /// `count` against a fixed constant, so changing this needs no client
204    /// rebuild; an over-cap request is rejected with `INVALID_ARGUMENT`. The
205    /// `count >= 1` floor is always enforced regardless of this value.
206    ///
207    /// A value of `0` is rejected by [`Self::build`] with
208    /// [`BuildError::ZeroMaxSeqCount`]: it would leave no valid `count` (the
209    /// floor is 1), silently disabling `GetSeq`, so the misconfiguration is
210    /// surfaced at build time rather than at first request.
211    pub fn max_seq_count(mut self, max_seq_count: u32) -> Self {
212        self.max_seq_count = max_seq_count;
213        self
214    }
215
216    pub fn build(self) -> Result<Server, BuildError> {
217        let consensus = self.consensus.ok_or(BuildError::MissingConsensusDriver)?;
218        if self.max_seq_count == 0 {
219            return Err(BuildError::ZeroMaxSeqCount);
220        }
221        let clock = self.clock.unwrap_or_else(|| Arc::new(SystemClock));
222        Ok(Server {
223            consensus,
224            clock,
225            window_ahead: self.window_ahead,
226            failover_advance: self.failover_advance,
227            shutdown_grace: self.shutdown_grace,
228            heartbeat_interval: self.heartbeat_interval,
229            core: Arc::new(ServingCore::new(self.window_ahead, self.max_seq_count)),
230            reporter: Arc::new(crate::reporter::Reporter::new()),
231            #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
232            tls_config: self.tls_config,
233        })
234    }
235}
236
/// A configured tsoracle gRPC server, produced by [`ServerBuilder::build`].
///
/// Run it directly with [`Server::serve`], [`Server::serve_with_shutdown`] or
/// [`Server::serve_with_listener`]. Alternatively, mount its routes on an
/// existing tonic server via [`Server::into_router`]. Each of these consumes
/// the `Server`.
pub struct Server {
    /// The consensus driver supplied to the builder. The leader-watch task
    /// reaches it through its shared `Arc<Server>`.
    pub(crate) consensus: Arc<dyn ConsensusDriver>,
    /// The clock supplied to the builder, or [`SystemClock`] by default.
    pub(crate) clock: Arc<dyn Clock>,
    /// See [`ServerBuilder::window_ahead`]. The same value is also passed to
    /// [`ServingCore::new`] at build time.
    pub(crate) window_ahead: Duration,
    /// See [`ServerBuilder::failover_advance`].
    pub(crate) failover_advance: Duration,
    /// Bound on the graceful-shutdown wait for the leader-watch task before a
    /// forced abort. See [`ServerBuilder::shutdown_grace`].
    pub(crate) shutdown_grace: Duration,
    /// Interval between periodic heartbeat log lines. See [`ServerBuilder::heartbeat_interval`].
    ///
    /// The only reader is the `cfg(feature = "tracing")` spawn block in
    /// `into_router_parts`; without `tracing` there is no subscriber to log to
    /// and the spawn arm is compiled out, so the field is genuinely unread.
    #[cfg_attr(not(feature = "tracing"), allow(dead_code))]
    pub(crate) heartbeat_interval: Duration,
    /// Owns the allocator, serving-state channel, and both extension locks, with
    /// the lock-ordering and step-down invariants private behind its methods.
    ///
    /// Held behind an `Arc` so the leader-watch task, the gRPC service, and the
    /// [`WatchGuard`] / [`serve_inner`] shutdown paths can all reach the same
    /// core. The guard and the serve loop use their clone to close the serving
    /// gate *synchronously* at shutdown, leaving the watch task's later
    /// `step_down` a harmless idempotent repeat.
    pub(crate) core: Arc<ServingCore>,
    /// Metrics reporter, shared with the heartbeat task and with the
    /// [`WatchGuard`] / [`serve_inner`] shutdown paths. The guard uses it to
    /// record `shutdown_watch_aborted` when the grace-bounded reap fires.
    pub(crate) reporter: Arc<crate::reporter::Reporter>,
    /// TLS settings applied by the `serve_*` methods; ignored by
    /// [`Server::into_router`].
    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
    pub(crate) tls_config: Option<tonic::transport::ServerTlsConfig>,
}
265
/// Raw parts produced by [`Server::into_router_parts`]: the gRPC `Routes`, the
/// leader-watch task's cooperative-cancel sender (dropping it stops the task),
/// the task's join handle, and the optional heartbeat task's cancel sender /
/// join handle. [`Server::into_router`] wraps these into a [`WatchGuard`]; the
/// `serve_*` methods consume them directly via [`serve_inner`].
///
/// The heartbeat fields are `None` when the heartbeat is disabled — either by
/// `ServerBuilder::heartbeat_interval(Duration::ZERO)` or by building without
/// the `tracing` feature (there is no subscriber to log to).
pub(crate) struct RouterParts {
    /// The tsoracle gRPC service, plus the reflection service when the
    /// `reflection` feature is enabled.
    pub routes: Routes,
    /// Leader-watch cancel signal: sending on it, or dropping it, resolves
    /// the task's cancel future.
    pub cancel_tx: tokio::sync::oneshot::Sender<()>,
    /// Join handle for the leader-watch task. A panic inside the task is
    /// re-raised after `NotServing` is published, so it surfaces here as a
    /// `JoinError`.
    pub watch_handle: tokio::task::JoinHandle<Result<(), ServerError>>,
    /// Heartbeat cancel signal (dropping it stops the task); `None` when the
    /// heartbeat is disabled.
    pub heartbeat_cancel_tx: Option<tokio::sync::oneshot::Sender<()>>,
    /// Join handle for the heartbeat task; `None` when the heartbeat is
    /// disabled.
    pub heartbeat_handle: Option<tokio::task::JoinHandle<()>>,
}
282
283impl Server {
284    pub fn builder() -> ServerBuilder {
285        ServerBuilder::default()
286    }
287
288    /// Subscribe to serving-state transitions.
289    ///
290    /// Returns a fresh `watch::Receiver` observing the same `ServingState`
291    /// the server publishes as leadership comes and goes. Embedders use this
292    /// to gate their own startup on `ServingState::Serving` (see the
293    /// `embedded_router` and piggyback examples). Because `into_router`
294    /// consumes the `Server`, capture the receiver before mounting.
295    ///
296    /// This method is the stable observation API: the receiver is minted from
297    /// the server's `watch::Sender`, so the receiver's type can evolve (e.g. a
298    /// future newtype around `ServingState`) without breaking embedders that
299    /// go through it.
300    pub fn subscribe(&self) -> watch::Receiver<ServingState> {
301        self.core.subscribe()
302    }
303}
304
305impl Server {
306    /// Return the configured `TsoServiceServer<TsoServiceImpl>` as a tonic
307    /// `Routes` value plus a [`WatchGuard`] for the spawned leader-watch task,
308    /// so callers can mount tsoracle's service alongside their own services
309    /// on a shared tonic listener instead of binding a dedicated port.
310    ///
311    /// The returned [`WatchGuard`] owns the leader-watch task. **Keep it alive
312    /// for as long as the mounted `Routes` should serve**: the watch task holds
313    /// an `Arc<Server>` (and the consensus driver) and maintains serving state
314    /// across leadership transitions. Dropping the guard — or calling
315    /// [`WatchGuard::shutdown`] — cooperatively stops the task at the embedder's
316    /// own shutdown. Without the guard the task would keep `Arc<Server>` alive
317    /// until the leadership stream happened to close.
318    ///
319    /// Every termination of the task — cooperative cancellation, driver error,
320    /// panic, or clean EOF on the leadership stream (surfaced as
321    /// `ServerError::WatchStreamClosed`) — publishes
322    /// `ServingState::NotServing { leader_endpoint: None }` before returning, so
323    /// all subsequent RPCs fail fast with `FAILED_PRECONDITION`. Embedders who
324    /// drop the guard without awaiting still get fail-safe behavior.
325    ///
326    /// The `Server::serve()` method is a thin wrapper over this — it calls
327    /// `into_router`, builds a tonic `Server`, and binds a listener.
328    ///
329    /// Returns `Err(ServerError::ReflectionInit)` (only reachable under the
330    /// `reflection` feature) if the embedded descriptor set fails to decode.
331    /// That decode happens before the leader-watch task is spawned, so a failure
332    /// leaves nothing running to clean up.
333    pub fn into_router(self) -> Result<(Routes, WatchGuard), ServerError> {
334        // Read the `Copy` grace before `into_router_parts` consumes `self`, so
335        // the returned guard can bound its own shutdown wait identically to the
336        // `serve_*` paths.
337        let shutdown_grace = self.shutdown_grace;
338        // Clone the shared core and reporter before `into_router_parts` consumes
339        // `self`, so the guard can close the serving gate synchronously on drop /
340        // shutdown rather than relying on the watch task's later publish, and can
341        // record the shutdown_watch_aborted counter if the grace-bounded reap fires.
342        let core = self.core.clone();
343        let reporter = self.reporter.clone();
344        let parts = self.into_router_parts()?;
345        Ok((
346            parts.routes,
347            WatchGuard {
348                cancel_tx: Some(parts.cancel_tx),
349                handle: Some(parts.watch_handle),
350                shutdown_grace,
351                core,
352                reporter,
353                heartbeat_cancel_tx: parts.heartbeat_cancel_tx,
354                heartbeat_handle: parts.heartbeat_handle,
355            },
356        ))
357    }
358
359    /// Spawn the leader-watch task and assemble the gRPC `Routes`, returning
360    /// the raw parts: the routes, the task's cooperative-cancel sender, and its
361    /// `JoinHandle`. [`Self::into_router`] wraps these into a [`WatchGuard`] for
362    /// embedders; the `serve_*` methods drive the parts directly via
363    /// [`serve_inner`], so neither path needs to unwrap the guard's `Option`
364    /// fields.
365    fn into_router_parts(self) -> Result<RouterParts, ServerError> {
366        // Build the reflection service first: a descriptor-decode failure must
367        // surface before we spawn the leader-watch task below, so an error path
368        // never leaks a running task.
369        #[cfg(feature = "reflection")]
370        let reflection = build_reflection_service(tsoracle_proto::FILE_DESCRIPTOR_SET)?;
371
372        let server = Arc::new(self);
373
374        // Cooperative cancellation channel. The `WatchGuard` holds the sender;
375        // the task's `cancel` future resolves on either an explicit send or a
376        // sender drop, so dropping the guard is sufficient to stop the task.
377        let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
378
379        let watch_server = server.clone();
380        let watch_handle = tokio::spawn(async move {
381            use futures::FutureExt;
382            // Resolves when the WatchGuard signals cancellation or is dropped.
383            let cancel = async move {
384                let _ = cancel_rx.await;
385            };
386            // catch_unwind so a panic in run_leader_watch still routes through
387            // the poisoning path. Without this, embedders who mount into_router
388            // directly and never observe the guard would see
389            // ServingState::Serving remain published while the watch task is
390            // dead — the inverse of the fail-safe guarantee documented above.
391            // The panic is re-raised after poisoning so serve / serve_with_*
392            // continue to translate it into ServerError::WatchPanic via
393            // join_to_server_result.
394            let outcome = std::panic::AssertUnwindSafe(crate::fence::run_leader_watch(
395                watch_server.clone(),
396                cancel,
397            ))
398            .catch_unwind()
399            .await;
400            match outcome {
401                Ok(result) => {
402                    if let Err(ref _e) = result {
403                        // Poison BEFORE returning so embedders who do not observe
404                        // the guard still get fail-safe behavior.
405                        watch_server.core.step_down(None, None);
406                        #[cfg(feature = "tracing")]
407                        tracing::error!(error = %_e, "leader-watch terminated; serving disabled");
408                    }
409                    result
410                }
411                Err(panic_payload) => {
412                    // Mirror the Err branch: poison BEFORE re-raising so
413                    // guard-dropping embedders still observe NotServing.
414                    watch_server.core.step_down(None, None);
415                    #[cfg(feature = "tracing")]
416                    tracing::error!("leader-watch panicked; serving disabled");
417                    std::panic::resume_unwind(panic_payload);
418                }
419            }
420        });
421
422        // Spawn the heartbeat task, if enabled. Gated on `feature = "tracing"`
423        // because the heartbeat module is only compiled with `tracing`
424        // (no subscriber to emit to without it) — and on a non-zero interval,
425        // since `Duration::ZERO` is the documented opt-out.
426        //
427        // The task body is wrapped in `AssertUnwindSafe(...).catch_unwind()`
428        // mirroring the leader-watch spawn above: on panic we bump the
429        // `heartbeat_task_panicked` counter and log at error level, then let
430        // the task end (no restart — the heartbeat is observability, not
431        // correctness, so a panicked task must not be allowed to thrash).
432        let (heartbeat_cancel_tx, heartbeat_handle) = {
433            #[cfg(feature = "tracing")]
434            {
435                if server.heartbeat_interval.is_zero() {
436                    (None, None)
437                } else {
438                    use futures::FutureExt;
439                    let (htx, hrx) = tokio::sync::oneshot::channel::<()>();
440                    let hb_reporter = server.reporter.clone();
441                    let hb_core = server.core.clone();
442                    let hb_interval = server.heartbeat_interval;
443                    let handle = tokio::spawn(async move {
444                        let outcome =
445                            std::panic::AssertUnwindSafe(crate::heartbeat::run_heartbeat(
446                                hb_interval,
447                                hb_core,
448                                hb_reporter.clone(),
449                                hrx,
450                            ))
451                            .catch_unwind()
452                            .await;
453                        if outcome.is_err() {
454                            hb_reporter.heartbeat_task_panicked.increment(1);
455                            tracing::error!(
456                                target: "tsoracle::heartbeat",
457                                "heartbeat task panicked; liveness logs disabled until restart"
458                            );
459                        }
460                    });
461                    (Some(htx), Some(handle))
462                }
463            }
464            #[cfg(not(feature = "tracing"))]
465            {
466                (None, None)
467            }
468        };
469
470        let service = TsoServiceImpl { server };
471        #[allow(unused_mut)]
472        let mut routes = Routes::new(TsoServiceServer::new(service));
473        #[cfg(feature = "reflection")]
474        {
475            routes = routes.add_service(reflection);
476        }
477        Ok(RouterParts {
478            routes,
479            cancel_tx,
480            watch_handle,
481            heartbeat_cancel_tx,
482            heartbeat_handle,
483        })
484    }
485
486    pub async fn serve(self, addr: SocketAddr) -> Result<(), ServerError> {
487        self.serve_with_shutdown(addr, futures::future::pending())
488            .await
489    }
490
491    /// Run the gRPC server until either the caller's `shutdown` fires or the
492    /// leader-watch task terminates.
493    ///
494    /// Three outcomes:
495    /// 1. `shutdown` fires first → tonic drains in-flights and returns Ok.
496    ///    The watch task is then stopped cooperatively, bounded by
497    ///    `shutdown_grace` and forcibly aborted if it overruns (e.g. parked in a
498    ///    wedged consensus-driver call); any error it had been about to return
499    ///    is forfeited (the process is shutting down anyway).
500    /// 2. Watch returns `Ok(Err(e))` → poisoned state is already published;
501    ///    `cancel_tx` triggers tonic's graceful shutdown; in-flight `GetTs`
502    ///    calls whose `try_grant` already succeeded complete with the
503    ///    timestamps they were allocated; new calls fail fast. Returns `Err(e)`
504    ///    — the watch error wins even if the drain itself also errors (see
505    ///    `combine_watch_and_drain`); a drain error is surfaced only when the
506    ///    watch ended cleanly.
507    /// 3. Watch task panics → returns `Err(ServerError::WatchPanic{..})`
508    ///    with the panic payload stringified. Same drain semantics as (2).
509    pub async fn serve_with_shutdown(
510        self,
511        addr: SocketAddr,
512        shutdown: impl Future<Output = ()> + Send + 'static,
513    ) -> Result<(), ServerError> {
514        #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
515        let tls_config = self.tls_config.clone();
516
517        // Read the `Copy` grace and clone the shared core and reporter before
518        // `into_router_parts` consumes `self`.
519        let shutdown_grace = self.shutdown_grace;
520        let core = self.core.clone();
521        let reporter = self.reporter.clone();
522        let parts = self.into_router_parts()?;
523        let (combined_shutdown, cancel_tx) = combined_shutdown_with_cancel(shutdown);
524
525        let mut tonic = TonicServer::builder();
526        #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
527        if let Some(cfg) = tls_config {
528            tonic = tonic.tls_config(cfg).map_err(ServerError::Transport)?;
529        }
530        let serve = tonic
531            .add_routes(parts.routes)
532            .serve_with_shutdown(addr, combined_shutdown);
533
534        serve_inner(
535            parts.cancel_tx,
536            parts.watch_handle,
537            parts.heartbeat_cancel_tx,
538            parts.heartbeat_handle,
539            serve,
540            cancel_tx,
541            shutdown_grace,
542            core,
543            reporter,
544        )
545        .await
546    }
547
548    /// Run the gRPC server on a caller-provided `TcpListener` until either
549    /// the caller-provided `shutdown` fires or the leader-watch task terminates.
550    ///
551    /// Use this instead of [`Self::serve_with_shutdown`] when you need to
552    /// observe the OS-picked port (`127.0.0.1:0`) before clients connect, or
553    /// when you want to wrap the listener in an outer adapter before passing it
554    /// in. The listening socket is owned by the caller and passed here; tsoracle
555    /// starts accepting on it immediately.
556    ///
557    /// Three outcomes:
558    /// 1. `shutdown` fires first → tonic drains in-flights and returns `Ok`.
559    ///    The watch handle is aborted; any error it had been about to return
560    ///    is forfeited (the process is shutting down anyway).
561    /// 2. Watch returns `Ok(Err(e))` → poisoned state is already published;
562    ///    the caller-provided shutdown is cancelled internally so tonic begins
563    ///    graceful shutdown; in-flight `GetTs` calls whose `try_grant` already
564    ///    succeeded complete with the timestamps they were allocated; new calls
565    ///    fail fast. Returns `Err(e)` — the watch error wins even if the drain
566    ///    itself also errors (see `combine_watch_and_drain`); a drain error is
567    ///    surfaced only when the watch ended cleanly.
568    /// 3. Watch task panics → returns `Err(ServerError::WatchPanic{..})`
569    ///    with the panic payload stringified. Same drain semantics as (2).
570    pub async fn serve_with_listener(
571        self,
572        listener: tokio::net::TcpListener,
573        shutdown: impl Future<Output = ()> + Send + 'static,
574    ) -> Result<(), ServerError> {
575        #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
576        let tls_config = self.tls_config.clone();
577
578        // Read the `Copy` grace and clone the shared core and reporter before
579        // `into_router_parts` consumes `self`.
580        let shutdown_grace = self.shutdown_grace;
581        let core = self.core.clone();
582        let reporter = self.reporter.clone();
583        let parts = self.into_router_parts()?;
584        let (combined_shutdown, cancel_tx) = combined_shutdown_with_cancel(shutdown);
585
586        let incoming = tonic::transport::server::TcpIncoming::from(listener);
587
588        let mut tonic = TonicServer::builder();
589        #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
590        if let Some(cfg) = tls_config {
591            tonic = tonic.tls_config(cfg).map_err(ServerError::Transport)?;
592        }
593        let serve = tonic
594            .add_routes(parts.routes)
595            .serve_with_incoming_shutdown(incoming, combined_shutdown);
596
597        serve_inner(
598            parts.cancel_tx,
599            parts.watch_handle,
600            parts.heartbeat_cancel_tx,
601            parts.heartbeat_handle,
602            serve,
603            cancel_tx,
604            shutdown_grace,
605            core,
606            reporter,
607        )
608        .await
609    }
610}
611
/// RAII handle to the leader-watch task spawned by [`Server::into_router`].
///
/// The watch task holds an `Arc<Server>` (and thus the consensus driver) and
/// maintains serving state across leadership transitions. This guard ties the
/// task's lifetime to the guard's: dropping it cooperatively cancels the task,
/// and the task publishes [`ServingState::NotServing`] before it stops, so any
/// `Routes` an embedder still has mounted fails subsequent RPCs fast. The
/// guard also closes the serving gate itself, synchronously, at the drop site
/// (see the `core` field). RPCs are therefore refused even before the task
/// notices the cancellation.
///
/// Cancellation is cooperative: the task observes it only at its `select!`
/// boundaries, never inside a fence attempt. A cooperative stop therefore
/// never tears the task down while it holds internal locks, in contrast to a
/// raw [`tokio::task::JoinHandle::abort`].
///
/// The one exception is [`Self::shutdown`]. If the task has not stopped
/// within [`ServerBuilder::shutdown_grace`] (e.g. it is parked in a
/// consensus-driver call that never returns), it is aborted so shutdown
/// cannot wedge.
///
/// When the heartbeat is enabled, the guard also owns the heartbeat task. It
/// is cancelled the same way, by dropping its sender.
pub struct WatchGuard {
    // `Option` so `Drop` and the consuming `shutdown` / `abort` methods can
    // each take a field without a partial-move conflict against the `Drop`
    // impl. Dropping the sender (rather than sending) is itself the cancel
    // signal: the task's `cancel` future resolves on sender-drop too.
    cancel_tx: Option<tokio::sync::oneshot::Sender<()>>,
    handle: Option<tokio::task::JoinHandle<Result<(), ServerError>>>,
    /// Bound on the cooperative-stop wait in [`Self::shutdown`] before a forced
    /// abort. Inherited from [`ServerBuilder::shutdown_grace`].
    shutdown_grace: Duration,
    /// Shared serving core, cloned from the `Server`. Lets `Drop` (and the
    /// consuming `shutdown` / `abort`, which trigger `Drop` on return) close the
    /// serving gate synchronously at the drop site, instead of waiting for the
    /// watch task to observe cancellation and publish `NotServing` on its own
    /// timeline — a window during which the fast gate would still admit RPCs.
    core: Arc<ServingCore>,
    /// Metrics reporter, cloned from the `Server`. Used to record the
    /// `shutdown_watch_aborted` counter if the grace-bounded reap fires.
    reporter: Arc<crate::reporter::Reporter>,
    /// Cooperative-cancel sender for the heartbeat task. `None` when the
    /// heartbeat is disabled (interval == 0 or built without `tracing`).
    /// Dropping the sender resolves the task's cancel future.
    heartbeat_cancel_tx: Option<tokio::sync::oneshot::Sender<()>>,
    /// Join handle for the heartbeat task. `None` when the heartbeat is
    /// disabled. Output is `()` because the task never returns an error —
    /// panics are caught inside the task body and recorded via the
    /// `heartbeat_task_panicked` counter.
    heartbeat_handle: Option<tokio::task::JoinHandle<()>>,
}
652
653impl WatchGuard {
654    /// Signal the leader-watch task to stop, wait for it to drain, and report
655    /// its outcome.
656    ///
657    /// A cooperatively cancelled task returns `Ok(())` — the stop was
658    /// requested, so it is not an error. If the task had already terminated on
659    /// its own (driver error, stream EOF, or panic) the original outcome is
660    /// surfaced verbatim: `Err(e)` or [`ServerError::WatchPanic`]. Either way
661    /// serving state is `NotServing` once this returns.
662    ///
663    /// The cooperative wait is bounded by the configured
664    /// [`ServerBuilder::shutdown_grace`]: if the task is parked in a
665    /// consensus-driver call that never returns it is aborted once the grace
666    /// elapses (still reported as `Ok(())`), so an embedder's shutdown can never
667    /// wedge behind a hung driver.
668    pub async fn shutdown(mut self) -> Result<(), ServerError> {
669        // Dropping the senders fires each task's cancel future. The heartbeat
670        // task is reaped first because it is bounded by `tokio::time::sleep`
671        // (cooperative stop is fast and never wedges on a driver call), so its
672        // reap returns quickly and leaves the grace budget for the watch task
673        // — which may be parked in a wedged consensus-driver call.
674        self.heartbeat_cancel_tx.take();
675        self.cancel_tx.take();
676        if let Some(mut hb_handle) = self.heartbeat_handle.take() {
677            match tokio::time::timeout(self.shutdown_grace, &mut hb_handle).await {
678                Ok(Ok(())) => {}
679                // Task panicked — already counted + logged via catch_unwind in
680                // the task body. Nothing more to do here.
681                Ok(Err(_join_err)) => {}
682                // Grace overrun — sleep + select! should always observe a
683                // dropped cancel sender, so this is a backstop. Abort and
684                // reap; no separate metric (the heartbeat is observability
685                // only — its lateness is not a serving correctness signal).
686                Err(_elapsed) => {
687                    hb_handle.abort();
688                    let _ = (&mut hb_handle).await;
689                }
690            }
691        }
692        match self.handle.take() {
693            Some(mut handle) => join_to_server_result(
694                await_watch_within_grace(&mut handle, self.shutdown_grace, &self.reporter).await,
695            ),
696            None => Ok(()),
697        }
698    }
699
700    /// Hard-abort the leader-watch task without waiting for a cooperative stop.
701    ///
702    /// Prefer [`Self::shutdown`] or simply dropping the guard; both let the
703    /// task stop at a safe point. This is an escape hatch for callers that
704    /// cannot await and accept that the task may be torn down mid-fence.
705    pub fn abort(mut self) {
706        if let Some(handle) = self.handle.take() {
707            handle.abort();
708        }
709        // Hard-abort the heartbeat task too — leaving it running after the
710        // watch is torn down would publish heartbeats describing a stale
711        // (typically `NotServing`) view until the Arc<Reporter> is dropped.
712        if let Some(hb_handle) = self.heartbeat_handle.take() {
713            hb_handle.abort();
714        }
715    }
716
717    /// Whether the leader-watch task has finished — terminated for any reason
718    /// (cooperative cancel, driver error, stream EOF, or panic).
719    ///
720    /// A read-only liveness probe that neither consumes the guard nor disturbs
721    /// its cancel-on-drop behavior, so an embedder can poll task health while
722    /// keeping the guard alive.
723    pub fn is_finished(&self) -> bool {
724        self.handle
725            .as_ref()
726            .is_none_or(|handle| handle.is_finished())
727    }
728}
729
730impl Drop for WatchGuard {
731    fn drop(&mut self) {
732        // Close the serving gate synchronously, here at the drop site. Dropping
733        // the cancel sender below only *requests* the watch task to stop; the
734        // task publishes `NotServing` later, on its own timeline. Between this
735        // drop and the task's next poll the fast gate would still read `Serving`
736        // (and the allocator would still grant), so an RPC could be admitted by a
737        // server that has already been told to stop serving. `step_down` clears
738        // the allocator and publishes `NotServing` now, before any await; the
739        // watch task's later `step_down(None, None)` on cooperative cancel (see
740        // `fence::run_leader_watch`) republishes the identical state, so the
741        // double-close is harmless and idempotent.
742        self.core.step_down(None, None);
743        // Dropping the sender (if `shutdown` / `abort` did not already take it)
744        // resolves the task's cancel future; the task then publishes
745        // `NotServing` and returns. The `JoinHandle` is dropped here too,
746        // detaching the task to finish its cooperative shutdown on its own.
747        self.cancel_tx.take();
748        // Same treatment for the heartbeat task. `Drop` is sync so we cannot
749        // await the cooperative stop; instead we drop the cancel sender (the
750        // task will observe it at its next select! boundary) and hard-abort
751        // the handle so it cannot outlive the guard and publish heartbeats
752        // describing a stale view if the runtime keeps the Arc alive.
753        self.heartbeat_cancel_tx.take();
754        if let Some(hb_handle) = self.heartbeat_handle.take() {
755            hb_handle.abort();
756        }
757    }
758}
759
760/// Merge the caller's `shutdown` future with an internal cancellation signal.
761///
762/// Both [`Server::serve_with_shutdown`] and [`Server::serve_with_listener`]
763/// need tonic to stop when EITHER the caller's `shutdown` fires OR the
764/// leader-watch task terminates (signalled by firing the returned
765/// `oneshot::Sender`). This builds that merged shutdown future and hands back
766/// the sender so [`serve_inner`] can trip it from the watch arm.
767fn combined_shutdown_with_cancel(
768    shutdown: impl Future<Output = ()> + Send + 'static,
769) -> (
770    impl Future<Output = ()> + Send + 'static,
771    tokio::sync::oneshot::Sender<()>,
772) {
773    let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
774    let combined_shutdown = async move {
775        tokio::select! {
776            _ = shutdown => {}
777            _ = cancel_rx => {}
778        }
779    };
780    (combined_shutdown, cancel_tx)
781}
782
783/// Wait for the leader-watch task to stop cooperatively, but no longer than
784/// `grace`, then forcibly abort it if it is still running.
785///
786/// The watch task observes its cancel signal only at the `select!` boundaries
787/// in [`crate::fence::run_leader_watch`], never inside a fence attempt. A
788/// consensus driver whose `load_high_water` / `persist_high_water` never
789/// returns (the trait places no latency bound; see
790/// [`tsoracle_consensus::ConsensusDriver`]) therefore parks the task upstream
791/// of any cancel-observing await, so dropping the cancel sender cannot stop it.
792/// Left unbounded, the shutdown wait would block process exit until the kubelet
793/// escalates to SIGKILL on a drain. Bounding the wait by `grace` and aborting
794/// on expiry guarantees forward progress: `tokio` tears a suspended task (and
795/// the wedged driver future it holds) down at the abort, dropping its
796/// drain-barrier guard.
797///
798/// Returns the task's join result. A clean cooperative stop forwards its real
799/// outcome verbatim; an aborted task surfaces as a cancelled `JoinError`, which
800/// [`join_to_server_result`] maps to `Ok(())` — the stop was requested, so a
801/// forced abort during shutdown is not an error. A `grace` of zero aborts
802/// immediately (the `timeout` future is already elapsed on first poll).
803async fn await_watch_within_grace(
804    watch_handle: &mut tokio::task::JoinHandle<Result<(), ServerError>>,
805    grace: Duration,
806    reporter: &Arc<crate::reporter::Reporter>,
807) -> Result<Result<(), ServerError>, tokio::task::JoinError> {
808    match tokio::time::timeout(grace, &mut *watch_handle).await {
809        Ok(join_result) => join_result,
810        Err(_elapsed) => {
811            reporter.shutdown_watch_aborted.increment(1);
812            #[cfg(feature = "tracing")]
813            tracing::warn!(
814                grace_ms = grace.as_millis() as u64,
815                "leader-watch task did not stop within the shutdown grace; aborting it (a consensus-driver call likely exceeded its latency bound)"
816            );
817            watch_handle.abort();
818            // Reap the aborted task so its Drop (releasing any held drain-barrier
819            // guard) runs before we report shutdown complete. Bounded: an aborted
820            // task resolves at its next poll.
821            (&mut *watch_handle).await
822        }
823    }
824}
825
826/// Drive the gRPC `serve_future` against the leader-watch task, shared by
827/// [`Server::serve_with_shutdown`] and [`Server::serve_with_listener`].
828///
829/// The two public methods differ only in how `serve_future` is assembled
830/// (address-bound via `serve_with_shutdown` vs listener-bound via
831/// `serve_with_incoming_shutdown`); everything downstream — the biased select,
832/// the cooperative-cancel path, and the drain/translate logic — is identical
833/// and lives here so a future change need only be made once.
834///
835/// `tonic_cancel_tx` is the cancellation half paired with the `serve_future`'s
836/// shutdown signal (see [`combined_shutdown_with_cancel`]); firing it begins
837/// tonic's graceful drain when the watch task terminates first. `watch_cancel_tx`
838/// is the leader-watch task's own cooperative-cancel sender (the same one a
839/// [`WatchGuard`] holds for embedders); dropping it stops the task. Taking the
840/// raw parts rather than a `WatchGuard` keeps this path free of the guard's
841/// `Option` fields — neither the watch handle nor the cancel sender is optional
842/// here. `shutdown_grace` bounds the user-shutdown arm's wait for the watch task
843/// (see [`await_watch_within_grace`]). `core` is the shared serving core (the
844/// same one the watch task and the gRPC service hold): the user-shutdown arm
845/// closes the gate on it synchronously so no RPC is admitted in the window
846/// before the watch task observes cancellation and publishes `NotServing`.
847// Private serve helper. The wide signature is the cost of being the single
848// merge point for the two public `serve_*` paths and the leader-watch +
849// heartbeat task pair: bundling these into a struct just to placate clippy
850// would obscure the lifecycle (every parameter is consumed exactly once and
851// has no shared identity worth naming). Keep the arguments visible.
852#[allow(clippy::too_many_arguments)]
853async fn serve_inner<S>(
854    watch_cancel_tx: tokio::sync::oneshot::Sender<()>,
855    mut watch_handle: tokio::task::JoinHandle<Result<(), ServerError>>,
856    heartbeat_cancel_tx: Option<tokio::sync::oneshot::Sender<()>>,
857    heartbeat_handle: Option<tokio::task::JoinHandle<()>>,
858    serve_future: S,
859    tonic_cancel_tx: tokio::sync::oneshot::Sender<()>,
860    shutdown_grace: Duration,
861    core: Arc<ServingCore>,
862    reporter: Arc<crate::reporter::Reporter>,
863) -> Result<(), ServerError>
864where
865    S: Future<Output = Result<(), tonic::transport::Error>>,
866{
867    tokio::pin!(serve_future);
868
869    let outcome = tokio::select! {
870        // Bias toward the watch arm: if both are ready in the same poll
871        // (rare but possible — graceful shutdown completed in the same
872        // tick the watch returned), we want to surface the watch error
873        // rather than report a clean shutdown.
874        biased;
875
876        watch_result = &mut watch_handle => {
877            // Watch terminated. State is already poisoned (see watch
878            // task body in into_router). Trigger tonic drain, wait for
879            // it to finish, then report the watch's outcome — preferring
880            // it over any drain error, which surfaces only if the watch
881            // itself ended cleanly.
882            let _ = tonic_cancel_tx.send(());
883            let drain_result = serve_future.await;
884            combine_watch_and_drain(watch_result, drain_result)
885        }
886        serve_result = &mut serve_future => {
887            // User shutdown fired (or our cancel — but watch arm has
888            // `biased` priority, so reaching here means user shutdown).
889            // Prefer a cooperative stop: dropping the cancel sender resolves
890            // the task's cancel future so it stops at its next `select!`
891            // boundary, having published `NotServing` and never torn down
892            // mid-fence while holding `extension_gate.write()`. But a
893            // cooperative stop is only observed at those boundaries, never
894            // inside a fence attempt — a consensus-driver call that never
895            // returns (the trait places no latency bound) would park the task
896            // upstream of any cancel point and block process exit until a
897            // kubelet SIGKILL. `await_watch_within_grace` therefore bounds the
898            // wait by `shutdown_grace` and aborts the task if it overruns. The
899            // task's own outcome (a clean `Ok(())` on cooperative stop, a
900            // cancelled `JoinError` on abort) is discarded; the user-requested
901            // shutdown result wins.
902            //
903            // Close the serving gate synchronously first: dropping the sender
904            // only *requests* the stop, and a task aborted on grace expiry (or
905            // simply not yet rescheduled) may never reach its own `step_down`. So
906            // a `GetTs` arriving during the drain would still be admitted unless
907            // we close the gate here. `step_down` is idempotent with the watch
908            // task's own cooperative-cancel publish.
909            core.step_down(None, None);
910            drop(watch_cancel_tx);
911            let _ = await_watch_within_grace(&mut watch_handle, shutdown_grace, &reporter).await;
912            serve_result?;
913            Ok(())
914        }
915    };
916
917    // Stop the heartbeat task on every exit path. Done after the watch reap so
918    // the watch-arm `combine_watch_and_drain` already saw the watch outcome,
919    // and the user-shutdown arm has finished its grace-bounded wait. Dropping
920    // the cancel sender breaks the task's `tokio::select! { biased; cancel,
921    // sleep }` loop on the next poll; if the task is wedged for any reason we
922    // hard-abort on grace overrun. The task's outcome is observability only
923    // and cannot influence serving correctness, so its join result is dropped.
924    drop(heartbeat_cancel_tx);
925    if let Some(mut hb_handle) = heartbeat_handle {
926        match tokio::time::timeout(shutdown_grace, &mut hb_handle).await {
927            Ok(Ok(())) => {}
928            Ok(Err(_join_err)) => {} // panic — already counted via catch_unwind
929            Err(_elapsed) => {
930                hb_handle.abort();
931                let _ = (&mut hb_handle).await;
932            }
933        }
934    }
935
936    outcome
937}
938
939/// Convert a `JoinHandle` result into a `ServerError`-typed result.
940///
941/// - `Ok(Ok(()))` — cooperative cancellation: `run_leader_watch` observed its
942///   cancel signal (the `WatchGuard` was dropped, `WatchGuard::shutdown` was
943///   called, or `serve_inner` cancelled it on user shutdown), published
944///   `NotServing`, and returned cleanly. Forwarded verbatim as `Ok(())`.
945/// - `Ok(Err(e))` — task returned an error (including `WatchStreamClosed`
946///   from a clean EOF). Forward verbatim.
947/// - `Err(JoinError)` — task was aborted or panicked. An abort
948///   (`WatchGuard::abort` or `JoinHandle::abort`) maps to Ok (we asked for it);
949///   a panic maps to `WatchPanic` with payload.
950fn join_to_server_result(
951    join_result: Result<Result<(), ServerError>, tokio::task::JoinError>,
952) -> Result<(), ServerError> {
953    match join_result {
954        Ok(inner) => inner,
955        Err(join_err) if join_err.is_panic() => {
956            let payload = panic_payload_to_string(join_err.into_panic());
957            Err(ServerError::WatchPanic {
958                payload,
959                bt: Bt::capture(),
960            })
961        }
962        Err(_cancelled) => Ok(()),
963    }
964}
965
966/// Combine the leader-watch outcome with the tonic graceful-drain outcome
967/// after the watch arm fired.
968///
969/// When the watch task terminates first we trigger the drain and then must
970/// report a single result. The watch error is the root cause — poisoned
971/// serving state was already published before the task returned — so it wins
972/// when both fail. A drain error (port stolen, resource exhaustion) is only
973/// surfaced when the watch outcome is otherwise `Ok`; previously it was
974/// discarded via `let _ = serve.await`, hiding a failed drain behind a clean
975/// shutdown report.
976///
977/// Generic over the drain error so the precedence logic is unit-testable
978/// without fabricating a `tonic::transport::Error` (which has no public
979/// constructor): production passes `Result<(), tonic::transport::Error>`,
980/// tests pass `Result<(), ServerError>` via the reflexive `From` impl.
981fn combine_watch_and_drain<E>(
982    watch_result: Result<Result<(), ServerError>, tokio::task::JoinError>,
983    drain_result: Result<(), E>,
984) -> Result<(), ServerError>
985where
986    ServerError: From<E>,
987{
988    match join_to_server_result(watch_result) {
989        Err(watch_err) => Err(watch_err),
990        Ok(()) => drain_result.map_err(ServerError::from),
991    }
992}
993
/// Construct the gRPC reflection service from an encoded protobuf descriptor
/// set.
///
/// Kept separate from [`Server::into_router`] so the decode-failure path can
/// be unit-tested. Production passes [`tsoracle_proto::FILE_DESCRIPTOR_SET`];
/// tests may pass deliberately corrupt bytes. A decode failure is reported as
/// [`ServerError::ReflectionInit`] instead of panicking.
#[cfg(feature = "reflection")]
fn build_reflection_service(
    descriptor_set: &[u8],
) -> Result<
    tonic_reflection::server::v1::ServerReflectionServer<
        impl tonic_reflection::server::v1::ServerReflection,
    >,
    ServerError,
> {
    let builder = tonic_reflection::server::Builder::configure()
        .register_encoded_file_descriptor_set(descriptor_set);
    builder.build_v1().map_err(ServerError::ReflectionInit)
}
1014
/// Render a panic payload as text for error reporting.
///
/// `panic!("{x}")` yields an owned `String`; `panic!("literal")` yields a
/// `&'static str`. Either is returned verbatim. Any other payload type gets a
/// fixed placeholder message.
fn panic_payload_to_string(panic: Box<dyn std::any::Any + Send>) -> String {
    match panic.downcast::<String>() {
        // Owned payload: move the `String` out instead of cloning it.
        Ok(formatted) => *formatted,
        Err(other) => match other.downcast_ref::<&'static str>() {
            Some(literal) => (*literal).to_owned(),
            None => String::from("watch task panicked with non-string payload"),
        },
    }
}
1024
#[cfg(any(test, feature = "test-fakes"))]
impl Server {
    /// Run the leader-watch loop directly, for integration tests.
    ///
    /// Available only under `cfg(test)` or the `test-fakes` feature and hidden
    /// from rustdoc; it is not part of the stable public API.
    #[doc(hidden)]
    pub async fn run_leader_watch_for_tests(self: Arc<Self>) -> Result<(), ServerError> {
        // The cancel future never resolves, which disables cooperative
        // cancellation. Callers end the loop via leadership events or by
        // aborting the task's `JoinHandle`.
        let never_cancelled = futures::future::pending();
        crate::fence::run_leader_watch(self, never_cancelled).await
    }

    /// Grant a timestamp window straight from the in-memory allocator,
    /// bypassing the gRPC service. Test-only.
    ///
    /// Lets regression tests observe the behavioral fence directly: no
    /// timestamp at or below the prior leader's high-water.
    #[doc(hidden)]
    pub fn try_grant_for_tests(&self, count: u32) -> Result<WindowGrant, CoreError> {
        let now_ms = self.clock.now_ms();
        self.core.try_grant(now_ms, count)
    }
}
1045
1046#[cfg(test)]
1047mod tests {
1048    use super::*;
1049
1050    #[test]
1051    fn panic_payload_to_string_recovers_static_str() {
1052        // `panic!("literal")` produces a `&'static str` payload; we want the
1053        // verbatim text so operators see what the watch task said.
1054        let payload: Box<dyn std::any::Any + Send> = Box::new("watch boom");
1055        assert_eq!(panic_payload_to_string(payload), "watch boom");
1056    }
1057
1058    #[test]
1059    fn panic_payload_to_string_recovers_owned_string() {
1060        // `panic!("{var}")` produces a `String` payload (formatted at panic
1061        // time); the helper must downcast that branch too.
1062        let payload: Box<dyn std::any::Any + Send> = Box::new(String::from("formatted"));
1063        assert_eq!(panic_payload_to_string(payload), "formatted");
1064    }
1065
1066    #[test]
1067    fn panic_payload_to_string_falls_back_for_other_types() {
1068        // Custom payloads (panic!(MyType { .. })) hit the catch-all branch.
1069        struct Custom;
1070        let payload: Box<dyn std::any::Any + Send> = Box::new(Custom);
1071        assert_eq!(
1072            panic_payload_to_string(payload),
1073            "watch task panicked with non-string payload",
1074        );
1075    }
1076
1077    #[test]
1078    fn serving_transitions_publish_through_core() {
1079        // The Server delegates serving-state transitions to its ServingCore; a
1080        // step_down on a freshly built Server lands as NotServing with the hint.
1081        // (The #346 send_replace-with-zero-receivers regression is pinned by the
1082        // ServingCore unit tests, which can observe `receiver_count` directly.)
1083        let server = Server::builder()
1084            .consensus_driver(Arc::new(crate::test_fakes::InMemoryDriver::new()))
1085            .build()
1086            .expect("build must succeed");
1087
1088        let hint = PeerEndpoint::try_from("new-leader:9000").unwrap();
1089        server.core.step_down(Some(hint.clone()), Some(Epoch(7)));
1090
1091        match server.core.serving_state() {
1092            ServingState::NotServing {
1093                leader_endpoint,
1094                leader_epoch,
1095            } => {
1096                assert_eq!(leader_endpoint, Some(hint));
1097                assert_eq!(leader_epoch, Some(Epoch(7)));
1098            }
1099            ServingState::Serving => panic!("expected NotServing after step_down"),
1100        }
1101    }
1102
1103    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
1104    #[test]
1105    fn builder_stores_tls_config() {
1106        // The serve_* paths read `tls_config` from `Server` (not the builder)
1107        // after `into_router` consumes self — so the field must survive the
1108        // builder → Server hand-off, not just the builder method.
1109        use crate::test_fakes::InMemoryDriver;
1110
1111        let driver = Arc::new(InMemoryDriver::new());
1112        let cfg = tonic::transport::ServerTlsConfig::new();
1113        let server = Server::builder()
1114            .consensus_driver(driver)
1115            .tls_config(cfg)
1116            .build()
1117            .expect("build with tls_config must succeed");
1118        assert!(server.tls_config.is_some());
1119    }
1120
1121    #[test]
1122    fn build_rejects_zero_max_seq_count() {
1123        // max_seq_count(0) makes every positive `count` fail as
1124        // SeqCountTooLarge{max:0} (the count>=1 floor leaves no valid value) —
1125        // GetSeq is silently dead. build() must fail-fast rather than ship a
1126        // server where GetSeq can never succeed.
1127        // `Server` is not `Debug`, so match rather than `expect_err`.
1128        match Server::builder()
1129            .consensus_driver(Arc::new(crate::test_fakes::InMemoryDriver::new()))
1130            .max_seq_count(0)
1131            .build()
1132        {
1133            Err(BuildError::ZeroMaxSeqCount) => {}
1134            Err(other) => panic!("expected ZeroMaxSeqCount, got {other:?}"),
1135            Ok(_) => panic!("max_seq_count(0) must be rejected at build, but build succeeded"),
1136        }
1137    }
1138
1139    #[test]
1140    fn build_accepts_max_seq_count_of_one() {
1141        // 1 is the smallest usable cap: a single-ordinal block is valid, so the
1142        // floor and the cap coincide and GetSeq still works. Must build.
1143        Server::builder()
1144            .consensus_driver(Arc::new(crate::test_fakes::InMemoryDriver::new()))
1145            .max_seq_count(1)
1146            .build()
1147            .expect("max_seq_count(1) must build");
1148    }
1149
1150    #[tokio::test]
1151    async fn join_to_server_result_passes_through_clean_outcome() {
1152        // Ok(Ok(())) — task finished cleanly; forward verbatim.
1153        let handle = tokio::spawn(async { Ok::<(), ServerError>(()) });
1154        let join = handle.await;
1155        assert!(matches!(join_to_server_result(join), Ok(())));
1156    }
1157
1158    #[tokio::test]
1159    async fn join_to_server_result_forwards_inner_error() {
1160        // Ok(Err(e)) — task returned an error; forward it.
1161        let handle = tokio::spawn(async {
1162            Err::<(), ServerError>(ServerError::WatchPanic {
1163                payload: "synthetic".into(),
1164                bt: Bt::capture(),
1165            })
1166        });
1167        let join = handle.await;
1168        match join_to_server_result(join) {
1169            Err(ServerError::WatchPanic { payload, .. }) => assert_eq!(payload, "synthetic"),
1170            other => panic!("expected forwarded WatchPanic, got {other:?}"),
1171        }
1172    }
1173
1174    #[tokio::test]
1175    async fn join_to_server_result_translates_panic_to_watch_panic() {
1176        // Err(JoinError::is_panic) — task panicked; surface as WatchPanic with
1177        // the payload stringified by `panic_payload_to_string`.
1178        let handle = tokio::spawn(async {
1179            panic!("intentional");
1180            #[allow(unreachable_code)]
1181            Ok::<(), ServerError>(())
1182        });
1183        let join = handle.await;
1184        match join_to_server_result(join) {
1185            Err(ServerError::WatchPanic { payload, .. }) => {
1186                assert!(payload.contains("intentional"))
1187            }
1188            other => panic!("expected WatchPanic, got {other:?}"),
1189        }
1190    }
1191
1192    #[tokio::test]
1193    async fn join_to_server_result_treats_cancellation_as_clean_exit() {
1194        // Err(JoinError::is_cancelled) — caller aborted the task; we asked
1195        // for that, so map to Ok.
1196        let handle: tokio::task::JoinHandle<Result<(), ServerError>> =
1197            tokio::spawn(async { futures::future::pending().await });
1198        handle.abort();
1199        let join = handle.await;
1200        assert!(matches!(join_to_server_result(join), Ok(())));
1201    }
1202
1203    #[tokio::test]
1204    async fn combine_watch_and_drain_surfaces_drain_error_when_watch_ok() {
1205        // Watch ended cleanly but the graceful drain failed (port stolen,
1206        // resource exhaustion). The drain error must not be swallowed.
1207        let watch = tokio::spawn(async { Ok::<(), ServerError>(()) }).await;
1208        let drain: Result<(), ServerError> = Err(ServerError::WatchStreamClosed);
1209        assert!(matches!(
1210            combine_watch_and_drain(watch, drain),
1211            Err(ServerError::WatchStreamClosed)
1212        ));
1213    }
1214
1215    #[tokio::test]
1216    async fn combine_watch_and_drain_returns_ok_when_both_succeed() {
1217        // Watch clean, drain clean — the only fully-Ok outcome.
1218        let watch = tokio::spawn(async { Ok::<(), ServerError>(()) }).await;
1219        let drain: Result<(), ServerError> = Ok(());
1220        assert!(matches!(combine_watch_and_drain(watch, drain), Ok(())));
1221    }
1222
1223    #[tokio::test]
1224    async fn combine_watch_and_drain_prefers_watch_error_over_drain_error() {
1225        // Both failed. The watch error is the root cause (poisoned state was
1226        // already published), so it wins; the drain error is dropped.
1227        let watch = tokio::spawn(async {
1228            Err::<(), ServerError>(ServerError::WatchPanic {
1229                payload: "watch".into(),
1230                bt: Bt::capture(),
1231            })
1232        })
1233        .await;
1234        let drain: Result<(), ServerError> = Err(ServerError::WatchStreamClosed);
1235        match combine_watch_and_drain(watch, drain) {
1236            Err(ServerError::WatchPanic { payload, .. }) => assert_eq!(payload, "watch"),
1237            other => panic!("expected watch error to win, got {other:?}"),
1238        }
1239    }
1240
1241    #[tokio::test]
1242    async fn combine_watch_and_drain_returns_watch_error_when_drain_ok() {
1243        // Watch failed, drain succeeded — forward the watch error verbatim.
1244        let watch = tokio::spawn(async {
1245            Err::<(), ServerError>(ServerError::WatchPanic {
1246                payload: "watch".into(),
1247                bt: Bt::capture(),
1248            })
1249        })
1250        .await;
1251        let drain: Result<(), ServerError> = Ok(());
1252        match combine_watch_and_drain(watch, drain) {
1253            Err(ServerError::WatchPanic { payload, .. }) => assert_eq!(payload, "watch"),
1254            other => panic!("expected forwarded watch error, got {other:?}"),
1255        }
1256    }
1257
1258    #[tokio::test]
1259    async fn dropping_watch_guard_closes_serving_gate_synchronously() {
1260        // Regression: dropping the guard must close the serving gate at the
1261        // drop site, not on the watch task's later timeline. Build a guard whose
1262        // watch handle never touches the core (so the ONLY thing that can flip
1263        // the state to NotServing is `Drop`), publish `Serving`, then drop the
1264        // guard and read the state with NO await in between. On the current-thread
1265        // test runtime no other task can run between the synchronous `drop` and
1266        // the synchronous `serving_state` read, so observing `NotServing` proves
1267        // `Drop` closed the gate synchronously rather than the watch task.
1268        let core = Arc::new(ServingCore::new(
1269            Duration::from_secs(3),
1270            tsoracle_core::DEFAULT_MAX_SEQ_COUNT,
1271        ));
1272        core.publish_serving();
1273
1274        let handle: tokio::task::JoinHandle<Result<(), ServerError>> =
1275            tokio::spawn(async { Ok(()) });
1276        let (cancel_tx, _cancel_rx) = tokio::sync::oneshot::channel::<()>();
1277        let guard = WatchGuard {
1278            cancel_tx: Some(cancel_tx),
1279            handle: Some(handle),
1280            shutdown_grace: Duration::from_secs(10),
1281            core: core.clone(),
1282            reporter: Arc::new(crate::reporter::Reporter::new()),
1283            heartbeat_cancel_tx: None,
1284            heartbeat_handle: None,
1285        };
1286
1287        drop(guard);
1288
1289        assert!(
1290            matches!(core.serving_state(), ServingState::NotServing { .. }),
1291            "dropping the WatchGuard must close the serving gate synchronously",
1292        );
1293    }
1294
1295    #[tokio::test]
1296    async fn serve_inner_closes_serving_gate_on_user_shutdown() {
1297        // Regression for the serve path: when the caller's shutdown fires,
1298        // `serve_inner` drops the watch cancel sender and waits out the grace,
1299        // forcibly aborting the watch task if it overruns. A task parked upstream
1300        // of any cancel-observing await (modelled here by a never-resolving
1301        // future) is aborted before it can publish `NotServing`, so the gate
1302        // would stay open unless `serve_inner` closes it itself. Seed `Serving`,
1303        // run the user-shutdown arm with a zero grace (immediate abort), and
1304        // assert the gate is closed on return.
1305        let core = Arc::new(ServingCore::new(
1306            Duration::from_secs(3),
1307            tsoracle_core::DEFAULT_MAX_SEQ_COUNT,
1308        ));
1309        core.publish_serving();
1310
1311        let watch_handle: tokio::task::JoinHandle<Result<(), ServerError>> =
1312            tokio::spawn(async { futures::future::pending().await });
1313        let (watch_cancel_tx, _watch_cancel_rx) = tokio::sync::oneshot::channel::<()>();
1314        let (tonic_cancel_tx, _tonic_cancel_rx) = tokio::sync::oneshot::channel::<()>();
1315
1316        // A serve future that is immediately ready models the user's shutdown
1317        // having fired; with the biased select preferring the (pending) watch arm,
1318        // control reaches the user-shutdown arm.
1319        let serve_future = async { Ok::<(), tonic::transport::Error>(()) };
1320
1321        let result = serve_inner(
1322            watch_cancel_tx,
1323            watch_handle,
1324            None, // heartbeat_cancel_tx — heartbeat disabled in this regression test
1325            None, // heartbeat_handle
1326            serve_future,
1327            tonic_cancel_tx,
1328            Duration::from_millis(0),
1329            core.clone(),
1330            Arc::new(crate::reporter::Reporter::new()),
1331        )
1332        .await;
1333
1334        assert!(
1335            result.is_ok(),
1336            "user shutdown must return Ok, got {result:?}"
1337        );
1338        assert!(
1339            matches!(core.serving_state(), ServingState::NotServing { .. }),
1340            "serve_inner's user-shutdown arm must close the serving gate synchronously",
1341        );
1342    }
1343
1344    #[cfg(feature = "reflection")]
1345    #[test]
1346    fn build_reflection_service_accepts_embedded_descriptor_set() {
1347        // The descriptor set emitted by `tsoracle-proto`'s build.rs must decode
1348        // cleanly — this is the production happy path that previously sat behind
1349        // an `expect`.
1350        assert!(build_reflection_service(tsoracle_proto::FILE_DESCRIPTOR_SET).is_ok());
1351    }
1352
1353    #[cfg(feature = "reflection")]
1354    #[test]
1355    fn build_reflection_service_maps_corrupt_descriptor_to_typed_error() {
1356        // A descriptor set that fails to decode (build artifact drift) must
1357        // surface as a typed `ServerError::ReflectionInit`, not a panic. The
1358        // bytes below are not a valid encoded `FileDescriptorSet`.
1359        let corrupt = b"\xff\xff\xff\xff not a descriptor set";
1360        // The Ok variant wraps a reflection service that is not `Debug`, so map
1361        // to a unit result before asserting on the error variant.
1362        match build_reflection_service(corrupt).map(|_| ()) {
1363            Err(ServerError::ReflectionInit(_)) => {}
1364            other => panic!("expected ReflectionInit error, got {other:?}"),
1365        }
1366    }
1367}