Skip to main content

tsoracle_server/
server.rs

1//
2//  ░▀█▀░█▀▀░█▀█░█▀▄░█▀█░█▀▀░█░░░█▀▀
3//  ░░█░░▀▀█░█░█░█▀▄░█▀█░█░░░█░░░█▀▀
4//  ░░▀░░▀▀▀░▀▀▀░▀░▀░▀░▀░▀▀▀░▀▀▀░▀▀▀
5//
6//  tsoracle — Distributed Timestamp Oracle
7//  https://www.tsoracle.rs
8//
9//  Copyright (c) 2026 Prisma Risk
10//
11//  Licensed under the Apache License, Version 2.0 (the "License");
12//  you may not use this file except in compliance with the License.
13//  You may obtain a copy of the License at
14//
15//      https://www.apache.org/licenses/LICENSE-2.0
16//
17//  Unless required by applicable law or agreed to in writing, software
18//  distributed under the License is distributed on an "AS IS" BASIS,
19//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20//  See the License for the specific language governing permissions and
21//  limitations under the License.
22//
23
24use core::time::Duration;
25use std::future::Future;
26use std::net::SocketAddr;
27use std::sync::Arc;
28use tokio::sync::watch;
29use tonic::service::Routes;
30use tonic::transport::Server as TonicServer;
31use tsoracle_consensus::ConsensusDriver;
32#[cfg(any(test, feature = "test-fakes"))]
33use tsoracle_core::{CoreError, WindowGrant};
34use tsoracle_core::{Epoch, PeerEndpoint};
35use tsoracle_proto::v1::tso_service_server::TsoServiceServer;
36
37use crate::bt::Bt;
38use crate::clock::{Clock, SystemClock};
39use crate::service::TsoServiceImpl;
40use crate::serving_core::ServingCore;
41
/// Configuration errors reported by [`ServerBuilder::build`].
///
/// All variants are detected by validating the builder's fields; the
/// leader-watch and heartbeat tasks are only spawned later, by
/// [`Server::into_router`] or the `serve*` methods.
#[derive(Debug, thiserror::Error)]
pub enum BuildError {
    /// No consensus driver was supplied via
    /// [`ServerBuilder::consensus_driver`]. The driver is mandatory; there is
    /// no default implementation to fall back on.
    #[error("consensus_driver is required")]
    MissingConsensusDriver,
    /// `max_seq_count` was set to 0. Every positive `count` would then be
    /// rejected as `SeqCountTooLarge` (the `count >= 1` floor leaves no valid
    /// value), silently disabling `GetSeq`. Rejected at build time so the
    /// misconfiguration surfaces immediately rather than at first request.
    #[error("max_seq_count must be >= 1 (0 would reject every GetSeq request)")]
    ZeroMaxSeqCount,
    /// `max_seq_batch_keys` was set to 0. Every `GetSeqBatch` request would
    /// then be rejected as exceeding the batch-key cap (the minimum is 1
    /// entry), silently disabling `GetSeqBatch`. Rejected at build time so the
    /// misconfiguration surfaces immediately rather than at first request.
    #[error("max_seq_batch_keys must be >= 1 (0 would reject every GetSeqBatch request)")]
    ZeroMaxSeqBatchKeys,
}
59
/// Errors produced while running the server.
///
/// Returned by [`Server::into_router`] and the `serve*` methods, and used as
/// the output type of the leader-watch task.
#[derive(Debug, thiserror::Error)]
pub enum ServerError {
    /// A tonic transport failure, e.g. the TLS configuration being rejected
    /// when the `serve*` methods configure the tonic builder.
    #[error("transport: {0}")]
    Transport(#[from] tonic::transport::Error),
    /// An error reported by the consensus driver (for example one returned
    /// mid-fence), propagated out of the leader-watch task.
    #[error("consensus: {0}")]
    Consensus(#[from] tsoracle_consensus::ConsensusError),
    /// An error surfaced from `tsoracle_core`.
    #[error("core: {0}")]
    Core(#[from] tsoracle_core::CoreError),
    /// The leader-watch task panicked. Distinct from a clean error return so
    /// operators can tell "driver returned Err" (recoverable design) from
    /// "task panicked" (programming bug).
    #[error("leader-watch task panicked: {payload}{bt}")]
    WatchPanic { payload: String, bt: Bt },
    /// The consensus driver's `leadership_events()` stream ended cleanly while
    /// the leader-watch task was running. The stream is contracted to live for
    /// the life of the server, so its end is anomalous (driver shutdown, lost
    /// session, etc.) — distinct from a `Consensus` error returned mid-fence.
    /// The watch task publishes `ServingState::NotServing` before returning
    /// this variant so embedders who never observe the [`WatchGuard`] still get
    /// the documented fail-safe behavior.
    #[error("consensus driver leadership stream closed")]
    WatchStreamClosed,
    /// The embedded protobuf descriptor set failed to decode while building the
    /// gRPC reflection service. `tsoracle-proto`'s `build.rs` emits these bytes
    /// from checked-in `.proto` sources, so a failure here signals build-artifact
    /// drift (a corrupt or stale descriptor) rather than a runtime condition —
    /// surfaced as a diagnosable startup error instead of a process panic.
    #[cfg(feature = "reflection")]
    #[error("failed to build gRPC reflection service from embedded descriptor set: {0}")]
    ReflectionInit(#[source] tonic_reflection::server::Error),
}
91
/// Externally observable serving state of this node.
///
/// Published on a `watch` channel; obtain a receiver with
/// [`Server::subscribe`]. Embedders typically wait for
/// [`ServingState::Serving`] before starting dependent work.
#[derive(Clone, Debug)]
pub enum ServingState {
    /// The serving gate is closed: RPCs fail fast with `FAILED_PRECONDITION`.
    ///
    /// `leader_endpoint` is `None` once the leader-watch task has terminated
    /// (cooperative shutdown, driver error, panic, or stream close).
    /// NOTE(review): presumably both fields carry the current leader's
    /// endpoint and epoch when another node is known to lead — confirm
    /// against `ServingCore::step_down` and the fence logic.
    NotServing {
        leader_endpoint: Option<PeerEndpoint>,
        leader_epoch: Option<Epoch>,
    },
    /// The serving gate is open and this node admits RPCs.
    Serving,
}
100
/// Default upper bound (10 seconds) on the graceful-shutdown wait for the
/// leader-watch task.
///
/// When it expires, the task is forcibly aborted. The abort is only a
/// backstop for a [`ConsensusDriver`] whose `load_high_water` /
/// `persist_high_water` call never returns, since the trait imposes no
/// latency bound.
///
/// Ten seconds leaves ample headroom inside the common Kubernetes
/// `terminationGracePeriodSeconds` of 30s. That headroom covers the abort,
/// the tonic drain, and process exit, all before the kubelet escalates to
/// SIGKILL.
const DEFAULT_SHUTDOWN_GRACE: Duration = Duration::from_millis(10_000);
109
/// Builder for [`Server`].
///
/// Obtain one via [`Server::builder`] (or `ServerBuilder::default()`), chain
/// the setters, then call [`ServerBuilder::build`]. Only the consensus driver
/// is mandatory; every other field has a default.
pub struct ServerBuilder {
    /// Required; `build` fails with [`BuildError::MissingConsensusDriver`]
    /// when this is still `None`.
    consensus: Option<Arc<dyn ConsensusDriver>>,
    /// Optional clock override; `build` substitutes `SystemClock` when `None`.
    clock: Option<Arc<dyn Clock>>,
    /// Allocation-window lookahead, also handed to `ServingCore::new` by
    /// `build`. Default: 3s.
    window_ahead: Duration,
    /// Copied onto the built `Server`. Default: 1s.
    failover_advance: Duration,
    /// Cooperative-stop wait before the watch task is aborted on shutdown.
    /// Default: `DEFAULT_SHUTDOWN_GRACE`.
    shutdown_grace: Duration,
    /// Heartbeat log interval; `Duration::ZERO` disables it. Default: 10s.
    heartbeat_interval: Duration,
    /// Per-call `GetSeq` count ceiling; `build` rejects 0.
    max_seq_count: u32,
    /// Per-request `GetSeqBatch` entry cap; `build` rejects 0.
    max_seq_batch_keys: u32,
    /// Optional TLS termination, applied only by the `serve*` methods.
    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
    tls_config: Option<tonic::transport::ServerTlsConfig>,
}
122
123impl Default for ServerBuilder {
124    fn default() -> Self {
125        ServerBuilder {
126            consensus: None,
127            clock: None,
128            window_ahead: Duration::from_secs(3),
129            failover_advance: Duration::from_secs(1),
130            shutdown_grace: DEFAULT_SHUTDOWN_GRACE,
131            heartbeat_interval: Duration::from_secs(10),
132            max_seq_count: tsoracle_core::DEFAULT_MAX_SEQ_COUNT,
133            max_seq_batch_keys: tsoracle_core::DEFAULT_MAX_SEQ_BATCH_KEYS,
134            #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
135            tls_config: None,
136        }
137    }
138}
139
140impl ServerBuilder {
141    pub fn consensus_driver(mut self, driver: Arc<dyn ConsensusDriver>) -> Self {
142        self.consensus = Some(driver);
143        self
144    }
145    pub fn clock(mut self, clock: Arc<dyn Clock>) -> Self {
146        self.clock = Some(clock);
147        self
148    }
149    pub fn window_ahead(mut self, window_ahead: Duration) -> Self {
150        self.window_ahead = window_ahead;
151        self
152    }
153    pub fn failover_advance(mut self, failover_advance: Duration) -> Self {
154        self.failover_advance = failover_advance;
155        self
156    }
157
158    /// Bound on how long a graceful shutdown waits for the leader-watch task to
159    /// stop cooperatively before forcibly aborting it.
160    ///
161    /// On shutdown the server drops the watch task's cancel signal and waits for
162    /// it to publish `NotServing` and return. That wait is normally
163    /// near-instant, but the task observes cancellation only at its `select!`
164    /// boundaries — never inside a fence attempt. A consensus driver whose
165    /// `load_high_water` / `persist_high_water` never returns (the trait places
166    /// no latency bound) would otherwise park the task mid-fence and block
167    /// process exit indefinitely, leading to a SIGKILL on a Kubernetes drain.
168    /// Once `shutdown_grace` elapses the server aborts the task so exit always
169    /// makes progress. Set this comfortably below your deployment's
170    /// `terminationGracePeriodSeconds`. Defaults to 10s. A value of zero aborts
171    /// immediately without waiting for a cooperative stop.
172    pub fn shutdown_grace(mut self, shutdown_grace: Duration) -> Self {
173        self.shutdown_grace = shutdown_grace;
174        self
175    }
176
177    /// Interval between heartbeat log lines emitted at `target = "tsoracle::heartbeat"`.
178    /// Defaults to 10 seconds. Pass `Duration::ZERO` to disable the heartbeat task entirely.
179    ///
180    /// The heartbeat surfaces serving role, current epoch, requests served,
181    /// timestamps issued, and key error counters every interval — proof-of-life
182    /// for production deployments that may not have a metrics exporter installed.
183    ///
184    /// Requires `feature = "tracing"` to emit output; with `tracing` off the
185    /// setter is accepted but no task is spawned (no subscriber to log to).
186    pub fn heartbeat_interval(mut self, interval: Duration) -> Self {
187        self.heartbeat_interval = interval;
188        self
189    }
190
191    /// Configure TLS termination for this server. Applied inside
192    /// [`Server::serve`], [`Server::serve_with_shutdown`], and
193    /// [`Server::serve_with_listener`]. Not applied to [`Server::into_router`] —
194    /// embedders mounting tsoracle alongside their own services control TLS
195    /// on their own tonic builder.
196    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
197    pub fn tls_config(mut self, cfg: tonic::transport::ServerTlsConfig) -> Self {
198        self.tls_config = Some(cfg);
199        self
200    }
201
202    /// Per-call ceiling on `GetSeq`'s `count` — the largest contiguous block a
203    /// single dense-sequence request may reserve. Defaults to
204    /// [`tsoracle_core::DEFAULT_MAX_SEQ_COUNT`] (`65_536`).
205    ///
206    /// This is a soft anti-abuse guardrail, not a representational limit: a
207    /// dense block is permanently consumed (the gapless counter only moves
208    /// forward), so the cap bounds how much one call can irrevocably reserve.
209    /// Raise it for batch-allocation workloads or lower it to tighten abuse
210    /// control. The server is the sole authority — clients no longer pre-check
211    /// `count` against a fixed constant, so changing this needs no client
212    /// rebuild; an over-cap request is rejected with `INVALID_ARGUMENT`. The
213    /// `count >= 1` floor is always enforced regardless of this value.
214    ///
215    /// A value of `0` is rejected by [`Self::build`] with
216    /// [`BuildError::ZeroMaxSeqCount`]: it would leave no valid `count` (the
217    /// floor is 1), silently disabling `GetSeq`, so the misconfiguration is
218    /// surfaced at build time rather than at first request.
219    pub fn max_seq_count(mut self, max_seq_count: u32) -> Self {
220        self.max_seq_count = max_seq_count;
221        self
222    }
223
224    /// Cap on the number of `(key, count)` entries accepted in one
225    /// `GetSeqBatch` request. Defaults to
226    /// [`tsoracle_core::DEFAULT_MAX_SEQ_BATCH_KEYS`] (`128`).
227    ///
228    /// This is a soft anti-abuse guardrail bounding fan-out and the size of
229    /// one atomic consensus entry. A value of `0` is rejected by
230    /// [`Self::build`] with [`BuildError::ZeroMaxSeqBatchKeys`]: it would
231    /// make every `GetSeqBatch` call fail, silently disabling the RPC, so the
232    /// misconfiguration is surfaced at build time. Deployment-specific tuning;
233    /// clients need no rebuild after changing this.
234    pub fn max_seq_batch_keys(mut self, max_seq_batch_keys: u32) -> Self {
235        self.max_seq_batch_keys = max_seq_batch_keys;
236        self
237    }
238
239    pub fn build(self) -> Result<Server, BuildError> {
240        let consensus = self.consensus.ok_or(BuildError::MissingConsensusDriver)?;
241        if self.max_seq_count == 0 {
242            return Err(BuildError::ZeroMaxSeqCount);
243        }
244        if self.max_seq_batch_keys == 0 {
245            return Err(BuildError::ZeroMaxSeqBatchKeys);
246        }
247        let clock = self.clock.unwrap_or_else(|| Arc::new(SystemClock));
248        Ok(Server {
249            consensus,
250            clock,
251            window_ahead: self.window_ahead,
252            failover_advance: self.failover_advance,
253            shutdown_grace: self.shutdown_grace,
254            heartbeat_interval: self.heartbeat_interval,
255            core: Arc::new(ServingCore::new(
256                self.window_ahead,
257                self.max_seq_count,
258                self.max_seq_batch_keys,
259            )),
260            reporter: Arc::new(crate::reporter::Reporter::new()),
261            #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
262            tls_config: self.tls_config,
263        })
264    }
265}
266
/// A configured tsoracle timestamp-oracle server, produced by
/// [`ServerBuilder::build`].
///
/// There are two ways to run it:
/// - standalone, with [`Server::serve`], [`Server::serve_with_shutdown`] or
///   [`Server::serve_with_listener`];
/// - mounted into an existing tonic server, with [`Server::into_router`].
pub struct Server {
    /// Consensus driver consulted by the leader-watch task.
    pub(crate) consensus: Arc<dyn ConsensusDriver>,
    /// Clock source (`SystemClock` unless overridden on the builder).
    pub(crate) clock: Arc<dyn Clock>,
    /// Allocation-window lookahead; the same value was passed to
    /// `ServingCore::new` at build time.
    pub(crate) window_ahead: Duration,
    /// See [`ServerBuilder::failover_advance`].
    pub(crate) failover_advance: Duration,
    /// Bound on the graceful-shutdown wait for the leader-watch task before a
    /// forced abort. See [`ServerBuilder::shutdown_grace`].
    pub(crate) shutdown_grace: Duration,
    /// Interval between periodic heartbeat log lines. See [`ServerBuilder::heartbeat_interval`].
    ///
    /// The only reader is the `cfg(feature = "tracing")` spawn block in
    /// `into_router_parts`; without `tracing` there is no subscriber to log to
    /// and the spawn arm is compiled out, so the field is genuinely unread.
    #[cfg_attr(not(feature = "tracing"), allow(dead_code))]
    pub(crate) heartbeat_interval: Duration,
    /// Owns the allocator, serving-state channel, and both extension locks, with
    /// the lock-ordering and step-down invariants private behind its methods.
    ///
    /// Held behind an `Arc` so the leader-watch task, the gRPC service, and the
    /// [`WatchGuard`] / [`serve_inner`] shutdown paths can all reach the same
    /// core. The guard and the serve loop use their clone to close the serving
    /// gate *synchronously* at shutdown, leaving the watch task's later
    /// `step_down` a harmless idempotent repeat.
    pub(crate) core: Arc<ServingCore>,
    /// Metrics reporter shared with the heartbeat task and the shutdown paths.
    /// The shutdown paths use it to record `shutdown_watch_aborted`.
    pub(crate) reporter: Arc<crate::reporter::Reporter>,
    /// TLS settings applied by the `serve*` methods (never by `into_router`).
    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
    pub(crate) tls_config: Option<tonic::transport::ServerTlsConfig>,
}
295
/// Raw parts produced by [`Server::into_router_parts`]: the gRPC `Routes`, the
/// leader-watch task's cooperative-cancel sender (dropping it stops the task),
/// the task's join handle, and the optional heartbeat task's cancel sender /
/// join handle. [`Server::into_router`] wraps these into a [`WatchGuard`]; the
/// `serve_*` methods consume them directly via [`serve_inner`].
///
/// The heartbeat fields are `None` when the heartbeat is disabled — either by
/// `ServerBuilder::heartbeat_interval(Duration::ZERO)` or by building without
/// the `tracing` feature (there is no subscriber to log to).
pub(crate) struct RouterParts {
    /// The tsoracle gRPC service, plus the reflection service when the
    /// `reflection` feature is enabled.
    pub routes: Routes,
    /// Sending on, or dropping, this sender cooperatively cancels the
    /// leader-watch task.
    pub cancel_tx: tokio::sync::oneshot::Sender<()>,
    /// Handle to the leader-watch task. A panic inside the task is re-raised
    /// after `NotServing` is published, so it surfaces here as a `JoinError`.
    pub watch_handle: tokio::task::JoinHandle<Result<(), ServerError>>,
    /// Cooperative-cancel sender for the heartbeat task; `None` when disabled.
    pub heartbeat_cancel_tx: Option<tokio::sync::oneshot::Sender<()>>,
    /// Handle to the heartbeat task; `None` when disabled.
    pub heartbeat_handle: Option<tokio::task::JoinHandle<()>>,
}
312
313impl Server {
314    pub fn builder() -> ServerBuilder {
315        ServerBuilder::default()
316    }
317
318    /// Subscribe to serving-state transitions.
319    ///
320    /// Returns a fresh `watch::Receiver` observing the same `ServingState`
321    /// the server publishes as leadership comes and goes. Embedders use this
322    /// to gate their own startup on `ServingState::Serving` (see the
323    /// `embedded_router` and piggyback examples). Because `into_router`
324    /// consumes the `Server`, capture the receiver before mounting.
325    ///
326    /// This method is the stable observation API: the receiver is minted from
327    /// the server's `watch::Sender`, so the receiver's type can evolve (e.g. a
328    /// future newtype around `ServingState`) without breaking embedders that
329    /// go through it.
330    pub fn subscribe(&self) -> watch::Receiver<ServingState> {
331        self.core.subscribe()
332    }
333}
334
335impl Server {
336    /// Return the configured `TsoServiceServer<TsoServiceImpl>` as a tonic
337    /// `Routes` value plus a [`WatchGuard`] for the spawned leader-watch task,
338    /// so callers can mount tsoracle's service alongside their own services
339    /// on a shared tonic listener instead of binding a dedicated port.
340    ///
341    /// The returned [`WatchGuard`] owns the leader-watch task. **Keep it alive
342    /// for as long as the mounted `Routes` should serve**: the watch task holds
343    /// an `Arc<Server>` (and the consensus driver) and maintains serving state
344    /// across leadership transitions. Dropping the guard — or calling
345    /// [`WatchGuard::shutdown`] — cooperatively stops the task at the embedder's
346    /// own shutdown. Without the guard the task would keep `Arc<Server>` alive
347    /// until the leadership stream happened to close.
348    ///
349    /// Every termination of the task — cooperative cancellation, driver error,
350    /// panic, or clean EOF on the leadership stream (surfaced as
351    /// `ServerError::WatchStreamClosed`) — publishes
352    /// `ServingState::NotServing { leader_endpoint: None }` before returning, so
353    /// all subsequent RPCs fail fast with `FAILED_PRECONDITION`. Embedders who
354    /// drop the guard without awaiting still get fail-safe behavior.
355    ///
356    /// The `Server::serve()` method is a thin wrapper over this — it calls
357    /// `into_router`, builds a tonic `Server`, and binds a listener.
358    ///
359    /// Returns `Err(ServerError::ReflectionInit)` (only reachable under the
360    /// `reflection` feature) if the embedded descriptor set fails to decode.
361    /// That decode happens before the leader-watch task is spawned, so a failure
362    /// leaves nothing running to clean up.
363    pub fn into_router(self) -> Result<(Routes, WatchGuard), ServerError> {
364        // Read the `Copy` grace before `into_router_parts` consumes `self`, so
365        // the returned guard can bound its own shutdown wait identically to the
366        // `serve_*` paths.
367        let shutdown_grace = self.shutdown_grace;
368        // Clone the shared core and reporter before `into_router_parts` consumes
369        // `self`, so the guard can close the serving gate synchronously on drop /
370        // shutdown rather than relying on the watch task's later publish, and can
371        // record the shutdown_watch_aborted counter if the grace-bounded reap fires.
372        let core = self.core.clone();
373        let reporter = self.reporter.clone();
374        let parts = self.into_router_parts()?;
375        Ok((
376            parts.routes,
377            WatchGuard {
378                cancel_tx: Some(parts.cancel_tx),
379                handle: Some(parts.watch_handle),
380                shutdown_grace,
381                core,
382                reporter,
383                heartbeat_cancel_tx: parts.heartbeat_cancel_tx,
384                heartbeat_handle: parts.heartbeat_handle,
385            },
386        ))
387    }
388
389    /// Spawn the leader-watch task and assemble the gRPC `Routes`, returning
390    /// the raw parts: the routes, the task's cooperative-cancel sender, and its
391    /// `JoinHandle`. [`Self::into_router`] wraps these into a [`WatchGuard`] for
392    /// embedders; the `serve_*` methods drive the parts directly via
393    /// [`serve_inner`], so neither path needs to unwrap the guard's `Option`
394    /// fields.
395    fn into_router_parts(self) -> Result<RouterParts, ServerError> {
396        // Build the reflection service first: a descriptor-decode failure must
397        // surface before we spawn the leader-watch task below, so an error path
398        // never leaks a running task.
399        #[cfg(feature = "reflection")]
400        let reflection = build_reflection_service(tsoracle_proto::FILE_DESCRIPTOR_SET)?;
401
402        let server = Arc::new(self);
403
404        // Cooperative cancellation channel. The `WatchGuard` holds the sender;
405        // the task's `cancel` future resolves on either an explicit send or a
406        // sender drop, so dropping the guard is sufficient to stop the task.
407        let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
408
409        let watch_server = server.clone();
410        let watch_handle = tokio::spawn(async move {
411            use futures::FutureExt;
412            // Resolves when the WatchGuard signals cancellation or is dropped.
413            let cancel = async move {
414                let _ = cancel_rx.await;
415            };
416            // catch_unwind so a panic in run_leader_watch still routes through
417            // the poisoning path. Without this, embedders who mount into_router
418            // directly and never observe the guard would see
419            // ServingState::Serving remain published while the watch task is
420            // dead — the inverse of the fail-safe guarantee documented above.
421            // The panic is re-raised after poisoning so serve / serve_with_*
422            // continue to translate it into ServerError::WatchPanic via
423            // join_to_server_result.
424            let outcome = std::panic::AssertUnwindSafe(crate::fence::run_leader_watch(
425                watch_server.clone(),
426                cancel,
427            ))
428            .catch_unwind()
429            .await;
430            match outcome {
431                Ok(result) => {
432                    if let Err(ref _e) = result {
433                        // Poison BEFORE returning so embedders who do not observe
434                        // the guard still get fail-safe behavior.
435                        watch_server.core.step_down(None, None);
436                        #[cfg(feature = "tracing")]
437                        tracing::error!(error = %_e, "leader-watch terminated; serving disabled");
438                    }
439                    result
440                }
441                Err(panic_payload) => {
442                    // Mirror the Err branch: poison BEFORE re-raising so
443                    // guard-dropping embedders still observe NotServing.
444                    watch_server.core.step_down(None, None);
445                    #[cfg(feature = "tracing")]
446                    tracing::error!("leader-watch panicked; serving disabled");
447                    std::panic::resume_unwind(panic_payload);
448                }
449            }
450        });
451
452        // Spawn the heartbeat task, if enabled. Gated on `feature = "tracing"`
453        // because the heartbeat module is only compiled with `tracing`
454        // (no subscriber to emit to without it) — and on a non-zero interval,
455        // since `Duration::ZERO` is the documented opt-out.
456        //
457        // The task body is wrapped in `AssertUnwindSafe(...).catch_unwind()`
458        // mirroring the leader-watch spawn above: on panic we bump the
459        // `heartbeat_task_panicked` counter and log at error level, then let
460        // the task end (no restart — the heartbeat is observability, not
461        // correctness, so a panicked task must not be allowed to thrash).
462        let (heartbeat_cancel_tx, heartbeat_handle) = {
463            #[cfg(feature = "tracing")]
464            {
465                if server.heartbeat_interval.is_zero() {
466                    (None, None)
467                } else {
468                    use futures::FutureExt;
469                    let (htx, hrx) = tokio::sync::oneshot::channel::<()>();
470                    let hb_reporter = server.reporter.clone();
471                    let hb_core = server.core.clone();
472                    let hb_interval = server.heartbeat_interval;
473                    let handle = tokio::spawn(async move {
474                        let outcome =
475                            std::panic::AssertUnwindSafe(crate::heartbeat::run_heartbeat(
476                                hb_interval,
477                                hb_core,
478                                hb_reporter.clone(),
479                                hrx,
480                            ))
481                            .catch_unwind()
482                            .await;
483                        if outcome.is_err() {
484                            hb_reporter.heartbeat_task_panicked.increment(1);
485                            tracing::error!(
486                                target: "tsoracle::heartbeat",
487                                "heartbeat task panicked; liveness logs disabled until restart"
488                            );
489                        }
490                    });
491                    (Some(htx), Some(handle))
492                }
493            }
494            #[cfg(not(feature = "tracing"))]
495            {
496                (None, None)
497            }
498        };
499
500        let service = TsoServiceImpl { server };
501        #[allow(unused_mut)]
502        let mut routes = Routes::new(TsoServiceServer::new(service));
503        #[cfg(feature = "reflection")]
504        {
505            routes = routes.add_service(reflection);
506        }
507        Ok(RouterParts {
508            routes,
509            cancel_tx,
510            watch_handle,
511            heartbeat_cancel_tx,
512            heartbeat_handle,
513        })
514    }
515
516    pub async fn serve(self, addr: SocketAddr) -> Result<(), ServerError> {
517        self.serve_with_shutdown(addr, futures::future::pending())
518            .await
519    }
520
521    /// Run the gRPC server until either the caller's `shutdown` fires or the
522    /// leader-watch task terminates.
523    ///
524    /// Three outcomes:
525    /// 1. `shutdown` fires first → tonic drains in-flights and returns Ok.
526    ///    The watch task is then stopped cooperatively, bounded by
527    ///    `shutdown_grace` and forcibly aborted if it overruns (e.g. parked in a
528    ///    wedged consensus-driver call); any error it had been about to return
529    ///    is forfeited (the process is shutting down anyway).
530    /// 2. Watch returns `Ok(Err(e))` → poisoned state is already published;
531    ///    `cancel_tx` triggers tonic's graceful shutdown; in-flight `GetTs`
532    ///    calls whose `try_grant` already succeeded complete with the
533    ///    timestamps they were allocated; new calls fail fast. Returns `Err(e)`
534    ///    — the watch error wins even if the drain itself also errors (see
535    ///    `combine_watch_and_drain`); a drain error is surfaced only when the
536    ///    watch ended cleanly.
537    /// 3. Watch task panics → returns `Err(ServerError::WatchPanic{..})`
538    ///    with the panic payload stringified. Same drain semantics as (2).
539    pub async fn serve_with_shutdown(
540        self,
541        addr: SocketAddr,
542        shutdown: impl Future<Output = ()> + Send + 'static,
543    ) -> Result<(), ServerError> {
544        #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
545        let tls_config = self.tls_config.clone();
546
547        // Read the `Copy` grace and clone the shared core and reporter before
548        // `into_router_parts` consumes `self`.
549        let shutdown_grace = self.shutdown_grace;
550        let core = self.core.clone();
551        let reporter = self.reporter.clone();
552        let parts = self.into_router_parts()?;
553        let (combined_shutdown, cancel_tx) = combined_shutdown_with_cancel(shutdown);
554
555        let mut tonic = TonicServer::builder();
556        #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
557        if let Some(cfg) = tls_config {
558            tonic = tonic.tls_config(cfg).map_err(ServerError::Transport)?;
559        }
560        let serve = tonic
561            .add_routes(parts.routes)
562            .serve_with_shutdown(addr, combined_shutdown);
563
564        serve_inner(
565            parts.cancel_tx,
566            parts.watch_handle,
567            parts.heartbeat_cancel_tx,
568            parts.heartbeat_handle,
569            serve,
570            cancel_tx,
571            shutdown_grace,
572            core,
573            reporter,
574        )
575        .await
576    }
577
578    /// Run the gRPC server on a caller-provided `TcpListener` until either
579    /// the caller-provided `shutdown` fires or the leader-watch task terminates.
580    ///
581    /// Use this instead of [`Self::serve_with_shutdown`] when you need to
582    /// observe the OS-picked port (`127.0.0.1:0`) before clients connect, or
583    /// when you want to wrap the listener in an outer adapter before passing it
584    /// in. The listening socket is owned by the caller and passed here; tsoracle
585    /// starts accepting on it immediately.
586    ///
587    /// Three outcomes:
588    /// 1. `shutdown` fires first → tonic drains in-flights and returns `Ok`.
589    ///    The watch handle is aborted; any error it had been about to return
590    ///    is forfeited (the process is shutting down anyway).
591    /// 2. Watch returns `Ok(Err(e))` → poisoned state is already published;
592    ///    the caller-provided shutdown is cancelled internally so tonic begins
593    ///    graceful shutdown; in-flight `GetTs` calls whose `try_grant` already
594    ///    succeeded complete with the timestamps they were allocated; new calls
595    ///    fail fast. Returns `Err(e)` — the watch error wins even if the drain
596    ///    itself also errors (see `combine_watch_and_drain`); a drain error is
597    ///    surfaced only when the watch ended cleanly.
598    /// 3. Watch task panics → returns `Err(ServerError::WatchPanic{..})`
599    ///    with the panic payload stringified. Same drain semantics as (2).
600    pub async fn serve_with_listener(
601        self,
602        listener: tokio::net::TcpListener,
603        shutdown: impl Future<Output = ()> + Send + 'static,
604    ) -> Result<(), ServerError> {
605        #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
606        let tls_config = self.tls_config.clone();
607
608        // Read the `Copy` grace and clone the shared core and reporter before
609        // `into_router_parts` consumes `self`.
610        let shutdown_grace = self.shutdown_grace;
611        let core = self.core.clone();
612        let reporter = self.reporter.clone();
613        let parts = self.into_router_parts()?;
614        let (combined_shutdown, cancel_tx) = combined_shutdown_with_cancel(shutdown);
615
616        let incoming = tonic::transport::server::TcpIncoming::from(listener);
617
618        let mut tonic = TonicServer::builder();
619        #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
620        if let Some(cfg) = tls_config {
621            tonic = tonic.tls_config(cfg).map_err(ServerError::Transport)?;
622        }
623        let serve = tonic
624            .add_routes(parts.routes)
625            .serve_with_incoming_shutdown(incoming, combined_shutdown);
626
627        serve_inner(
628            parts.cancel_tx,
629            parts.watch_handle,
630            parts.heartbeat_cancel_tx,
631            parts.heartbeat_handle,
632            serve,
633            cancel_tx,
634            shutdown_grace,
635            core,
636            reporter,
637        )
638        .await
639    }
640}
641
642/// RAII handle to the leader-watch task spawned by [`Server::into_router`].
643///
644/// The watch task holds an `Arc<Server>` (and thus the consensus driver) and
645/// maintains serving state across leadership transitions. This guard ties the
646/// task's lifetime to the guard's: dropping it cooperatively cancels the task,
647/// and the task publishes [`ServingState::NotServing`] before it stops, so any
648/// `Routes` an embedder still has mounted fails subsequent RPCs fast.
649///
650/// Cancellation is cooperative — the task stops at its next await boundary and
651/// never mid-fence, so it is never torn down while holding internal locks, in
652/// contrast to a raw [`tokio::task::JoinHandle::abort`].
653pub struct WatchGuard {
654    // `Option` so `Drop` and the consuming `shutdown` / `abort` methods can
655    // each take a field without a partial-move conflict against the `Drop`
656    // impl. Dropping the sender (rather than sending) is itself the cancel
657    // signal: the task's `cancel` future resolves on sender-drop too.
658    cancel_tx: Option<tokio::sync::oneshot::Sender<()>>,
659    handle: Option<tokio::task::JoinHandle<Result<(), ServerError>>>,
660    /// Bound on the cooperative-stop wait in [`Self::shutdown`] before a forced
661    /// abort. Inherited from [`ServerBuilder::shutdown_grace`].
662    shutdown_grace: Duration,
663    /// Shared serving core, cloned from the `Server`. Lets `Drop` (and the
664    /// consuming `shutdown` / `abort`, which trigger `Drop` on return) close the
665    /// serving gate synchronously at the drop site, instead of waiting for the
666    /// watch task to observe cancellation and publish `NotServing` on its own
667    /// timeline — a window during which the fast gate would still admit RPCs.
668    core: Arc<ServingCore>,
669    /// Metrics reporter, cloned from the `Server`. Used to record the
670    /// `shutdown_watch_aborted` counter if the grace-bounded reap fires.
671    reporter: Arc<crate::reporter::Reporter>,
672    /// Cooperative-cancel sender for the heartbeat task. `None` when the
673    /// heartbeat is disabled (interval == 0 or built without `tracing`).
674    /// Dropping the sender resolves the task's cancel future.
675    heartbeat_cancel_tx: Option<tokio::sync::oneshot::Sender<()>>,
676    /// Join handle for the heartbeat task. `None` when the heartbeat is
677    /// disabled. Output is `()` because the task never returns an error —
678    /// panics are caught inside the task body and recorded via the
679    /// `heartbeat_task_panicked` counter.
680    heartbeat_handle: Option<tokio::task::JoinHandle<()>>,
681}
682
683impl WatchGuard {
684    /// Signal the leader-watch task to stop, wait for it to drain, and report
685    /// its outcome.
686    ///
687    /// A cooperatively cancelled task returns `Ok(())` — the stop was
688    /// requested, so it is not an error. If the task had already terminated on
689    /// its own (driver error, stream EOF, or panic) the original outcome is
690    /// surfaced verbatim: `Err(e)` or [`ServerError::WatchPanic`]. Either way
691    /// serving state is `NotServing` once this returns.
692    ///
693    /// The cooperative wait is bounded by the configured
694    /// [`ServerBuilder::shutdown_grace`]: if the task is parked in a
695    /// consensus-driver call that never returns it is aborted once the grace
696    /// elapses (still reported as `Ok(())`), so an embedder's shutdown can never
697    /// wedge behind a hung driver.
698    pub async fn shutdown(mut self) -> Result<(), ServerError> {
699        // Dropping the senders fires each task's cancel future. The heartbeat
700        // task is reaped first because it is bounded by `tokio::time::sleep`
701        // (cooperative stop is fast and never wedges on a driver call), so its
702        // reap returns quickly and leaves the grace budget for the watch task
703        // — which may be parked in a wedged consensus-driver call.
704        self.heartbeat_cancel_tx.take();
705        self.cancel_tx.take();
706        if let Some(mut hb_handle) = self.heartbeat_handle.take() {
707            match tokio::time::timeout(self.shutdown_grace, &mut hb_handle).await {
708                Ok(Ok(())) => {}
709                // Task panicked — already counted + logged via catch_unwind in
710                // the task body. Nothing more to do here.
711                Ok(Err(_join_err)) => {}
712                // Grace overrun — sleep + select! should always observe a
713                // dropped cancel sender, so this is a backstop. Abort and
714                // reap; no separate metric (the heartbeat is observability
715                // only — its lateness is not a serving correctness signal).
716                Err(_elapsed) => {
717                    hb_handle.abort();
718                    let _ = (&mut hb_handle).await;
719                }
720            }
721        }
722        match self.handle.take() {
723            Some(mut handle) => join_to_server_result(
724                await_watch_within_grace(&mut handle, self.shutdown_grace, &self.reporter).await,
725            ),
726            None => Ok(()),
727        }
728    }
729
730    /// Hard-abort the leader-watch task without waiting for a cooperative stop.
731    ///
732    /// Prefer [`Self::shutdown`] or simply dropping the guard; both let the
733    /// task stop at a safe point. This is an escape hatch for callers that
734    /// cannot await and accept that the task may be torn down mid-fence.
735    pub fn abort(mut self) {
736        if let Some(handle) = self.handle.take() {
737            handle.abort();
738        }
739        // Hard-abort the heartbeat task too — leaving it running after the
740        // watch is torn down would publish heartbeats describing a stale
741        // (typically `NotServing`) view until the Arc<Reporter> is dropped.
742        if let Some(hb_handle) = self.heartbeat_handle.take() {
743            hb_handle.abort();
744        }
745    }
746
747    /// Whether the leader-watch task has finished — terminated for any reason
748    /// (cooperative cancel, driver error, stream EOF, or panic).
749    ///
750    /// A read-only liveness probe that neither consumes the guard nor disturbs
751    /// its cancel-on-drop behavior, so an embedder can poll task health while
752    /// keeping the guard alive.
753    pub fn is_finished(&self) -> bool {
754        self.handle
755            .as_ref()
756            .is_none_or(|handle| handle.is_finished())
757    }
758}
759
760impl Drop for WatchGuard {
761    fn drop(&mut self) {
762        // Close the serving gate synchronously, here at the drop site. Dropping
763        // the cancel sender below only *requests* the watch task to stop; the
764        // task publishes `NotServing` later, on its own timeline. Between this
765        // drop and the task's next poll the fast gate would still read `Serving`
766        // (and the allocator would still grant), so an RPC could be admitted by a
767        // server that has already been told to stop serving. `step_down` clears
768        // the allocator and publishes `NotServing` now, before any await; the
769        // watch task's later `step_down(None, None)` on cooperative cancel (see
770        // `fence::run_leader_watch`) republishes the identical state, so the
771        // double-close is harmless and idempotent.
772        self.core.step_down(None, None);
773        // Dropping the sender (if `shutdown` / `abort` did not already take it)
774        // resolves the task's cancel future; the task then publishes
775        // `NotServing` and returns. The `JoinHandle` is dropped here too,
776        // detaching the task to finish its cooperative shutdown on its own.
777        self.cancel_tx.take();
778        // Same treatment for the heartbeat task. `Drop` is sync so we cannot
779        // await the cooperative stop; instead we drop the cancel sender (the
780        // task will observe it at its next select! boundary) and hard-abort
781        // the handle so it cannot outlive the guard and publish heartbeats
782        // describing a stale view if the runtime keeps the Arc alive.
783        self.heartbeat_cancel_tx.take();
784        if let Some(hb_handle) = self.heartbeat_handle.take() {
785            hb_handle.abort();
786        }
787    }
788}
789
790/// Merge the caller's `shutdown` future with an internal cancellation signal.
791///
792/// Both [`Server::serve_with_shutdown`] and [`Server::serve_with_listener`]
793/// need tonic to stop when EITHER the caller's `shutdown` fires OR the
794/// leader-watch task terminates (signalled by firing the returned
795/// `oneshot::Sender`). This builds that merged shutdown future and hands back
796/// the sender so [`serve_inner`] can trip it from the watch arm.
797fn combined_shutdown_with_cancel(
798    shutdown: impl Future<Output = ()> + Send + 'static,
799) -> (
800    impl Future<Output = ()> + Send + 'static,
801    tokio::sync::oneshot::Sender<()>,
802) {
803    let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
804    let combined_shutdown = async move {
805        tokio::select! {
806            _ = shutdown => {}
807            _ = cancel_rx => {}
808        }
809    };
810    (combined_shutdown, cancel_tx)
811}
812
813/// Wait for the leader-watch task to stop cooperatively, but no longer than
814/// `grace`, then forcibly abort it if it is still running.
815///
816/// The watch task observes its cancel signal only at the `select!` boundaries
817/// in [`crate::fence::run_leader_watch`], never inside a fence attempt. A
818/// consensus driver whose `load_high_water` / `persist_high_water` never
819/// returns (the trait places no latency bound; see
820/// [`tsoracle_consensus::ConsensusDriver`]) therefore parks the task upstream
821/// of any cancel-observing await, so dropping the cancel sender cannot stop it.
822/// Left unbounded, the shutdown wait would block process exit until the kubelet
823/// escalates to SIGKILL on a drain. Bounding the wait by `grace` and aborting
824/// on expiry guarantees forward progress: `tokio` tears a suspended task (and
825/// the wedged driver future it holds) down at the abort, dropping its
826/// drain-barrier guard.
827///
828/// Returns the task's join result. A clean cooperative stop forwards its real
829/// outcome verbatim; an aborted task surfaces as a cancelled `JoinError`, which
830/// [`join_to_server_result`] maps to `Ok(())` — the stop was requested, so a
831/// forced abort during shutdown is not an error. A `grace` of zero aborts
832/// immediately (the `timeout` future is already elapsed on first poll).
833async fn await_watch_within_grace(
834    watch_handle: &mut tokio::task::JoinHandle<Result<(), ServerError>>,
835    grace: Duration,
836    reporter: &Arc<crate::reporter::Reporter>,
837) -> Result<Result<(), ServerError>, tokio::task::JoinError> {
838    match tokio::time::timeout(grace, &mut *watch_handle).await {
839        Ok(join_result) => join_result,
840        Err(_elapsed) => {
841            reporter.shutdown_watch_aborted.increment(1);
842            #[cfg(feature = "tracing")]
843            tracing::warn!(
844                grace_ms = grace.as_millis() as u64,
845                "leader-watch task did not stop within the shutdown grace; aborting it (a consensus-driver call likely exceeded its latency bound)"
846            );
847            watch_handle.abort();
848            // Reap the aborted task so its Drop (releasing any held drain-barrier
849            // guard) runs before we report shutdown complete. Bounded: an aborted
850            // task resolves at its next poll.
851            (&mut *watch_handle).await
852        }
853    }
854}
855
856/// Drive the gRPC `serve_future` against the leader-watch task, shared by
857/// [`Server::serve_with_shutdown`] and [`Server::serve_with_listener`].
858///
859/// The two public methods differ only in how `serve_future` is assembled
860/// (address-bound via `serve_with_shutdown` vs listener-bound via
861/// `serve_with_incoming_shutdown`); everything downstream — the biased select,
862/// the cooperative-cancel path, and the drain/translate logic — is identical
863/// and lives here so a future change need only be made once.
864///
865/// `tonic_cancel_tx` is the cancellation half paired with the `serve_future`'s
866/// shutdown signal (see [`combined_shutdown_with_cancel`]); firing it begins
867/// tonic's graceful drain when the watch task terminates first. `watch_cancel_tx`
868/// is the leader-watch task's own cooperative-cancel sender (the same one a
869/// [`WatchGuard`] holds for embedders); dropping it stops the task. Taking the
870/// raw parts rather than a `WatchGuard` keeps this path free of the guard's
871/// `Option` fields — neither the watch handle nor the cancel sender is optional
872/// here. `shutdown_grace` bounds the user-shutdown arm's wait for the watch task
873/// (see [`await_watch_within_grace`]). `core` is the shared serving core (the
874/// same one the watch task and the gRPC service hold): the user-shutdown arm
875/// closes the gate on it synchronously so no RPC is admitted in the window
876/// before the watch task observes cancellation and publishes `NotServing`.
877// Private serve helper. The wide signature is the cost of being the single
878// merge point for the two public `serve_*` paths and the leader-watch +
879// heartbeat task pair: bundling these into a struct just to placate clippy
880// would obscure the lifecycle (every parameter is consumed exactly once and
881// has no shared identity worth naming). Keep the arguments visible.
882#[allow(clippy::too_many_arguments)]
883async fn serve_inner<S>(
884    watch_cancel_tx: tokio::sync::oneshot::Sender<()>,
885    mut watch_handle: tokio::task::JoinHandle<Result<(), ServerError>>,
886    heartbeat_cancel_tx: Option<tokio::sync::oneshot::Sender<()>>,
887    heartbeat_handle: Option<tokio::task::JoinHandle<()>>,
888    serve_future: S,
889    tonic_cancel_tx: tokio::sync::oneshot::Sender<()>,
890    shutdown_grace: Duration,
891    core: Arc<ServingCore>,
892    reporter: Arc<crate::reporter::Reporter>,
893) -> Result<(), ServerError>
894where
895    S: Future<Output = Result<(), tonic::transport::Error>>,
896{
897    tokio::pin!(serve_future);
898
899    let outcome = tokio::select! {
900        // Bias toward the watch arm: if both are ready in the same poll
901        // (rare but possible — graceful shutdown completed in the same
902        // tick the watch returned), we want to surface the watch error
903        // rather than report a clean shutdown.
904        biased;
905
906        watch_result = &mut watch_handle => {
907            // Watch terminated. State is already poisoned (see watch
908            // task body in into_router). Trigger tonic drain, wait for
909            // it to finish, then report the watch's outcome — preferring
910            // it over any drain error, which surfaces only if the watch
911            // itself ended cleanly.
912            let _ = tonic_cancel_tx.send(());
913            let drain_result = serve_future.await;
914            combine_watch_and_drain(watch_result, drain_result)
915        }
916        serve_result = &mut serve_future => {
917            // User shutdown fired (or our cancel — but watch arm has
918            // `biased` priority, so reaching here means user shutdown).
919            // Prefer a cooperative stop: dropping the cancel sender resolves
920            // the task's cancel future so it stops at its next `select!`
921            // boundary, having published `NotServing` and never torn down
922            // mid-fence while holding `extension_gate.write()`. But a
923            // cooperative stop is only observed at those boundaries, never
924            // inside a fence attempt — a consensus-driver call that never
925            // returns (the trait places no latency bound) would park the task
926            // upstream of any cancel point and block process exit until a
927            // kubelet SIGKILL. `await_watch_within_grace` therefore bounds the
928            // wait by `shutdown_grace` and aborts the task if it overruns. The
929            // task's own outcome (a clean `Ok(())` on cooperative stop, a
930            // cancelled `JoinError` on abort) is discarded; the user-requested
931            // shutdown result wins.
932            //
933            // Close the serving gate synchronously first: dropping the sender
934            // only *requests* the stop, and a task aborted on grace expiry (or
935            // simply not yet rescheduled) may never reach its own `step_down`. So
936            // a `GetTs` arriving during the drain would still be admitted unless
937            // we close the gate here. `step_down` is idempotent with the watch
938            // task's own cooperative-cancel publish.
939            core.step_down(None, None);
940            drop(watch_cancel_tx);
941            let _ = await_watch_within_grace(&mut watch_handle, shutdown_grace, &reporter).await;
942            serve_result?;
943            Ok(())
944        }
945    };
946
947    // Stop the heartbeat task on every exit path. Done after the watch reap so
948    // the watch-arm `combine_watch_and_drain` already saw the watch outcome,
949    // and the user-shutdown arm has finished its grace-bounded wait. Dropping
950    // the cancel sender breaks the task's `tokio::select! { biased; cancel,
951    // sleep }` loop on the next poll; if the task is wedged for any reason we
952    // hard-abort on grace overrun. The task's outcome is observability only
953    // and cannot influence serving correctness, so its join result is dropped.
954    drop(heartbeat_cancel_tx);
955    if let Some(mut hb_handle) = heartbeat_handle {
956        match tokio::time::timeout(shutdown_grace, &mut hb_handle).await {
957            Ok(Ok(())) => {}
958            Ok(Err(_join_err)) => {} // panic — already counted via catch_unwind
959            Err(_elapsed) => {
960                hb_handle.abort();
961                let _ = (&mut hb_handle).await;
962            }
963        }
964    }
965
966    outcome
967}
968
969/// Convert a `JoinHandle` result into a `ServerError`-typed result.
970///
971/// - `Ok(Ok(()))` — cooperative cancellation: `run_leader_watch` observed its
972///   cancel signal (the `WatchGuard` was dropped, `WatchGuard::shutdown` was
973///   called, or `serve_inner` cancelled it on user shutdown), published
974///   `NotServing`, and returned cleanly. Forwarded verbatim as `Ok(())`.
975/// - `Ok(Err(e))` — task returned an error (including `WatchStreamClosed`
976///   from a clean EOF). Forward verbatim.
977/// - `Err(JoinError)` — task was aborted or panicked. An abort
978///   (`WatchGuard::abort` or `JoinHandle::abort`) maps to Ok (we asked for it);
979///   a panic maps to `WatchPanic` with payload.
980fn join_to_server_result(
981    join_result: Result<Result<(), ServerError>, tokio::task::JoinError>,
982) -> Result<(), ServerError> {
983    match join_result {
984        Ok(inner) => inner,
985        Err(join_err) if join_err.is_panic() => {
986            let payload = panic_payload_to_string(join_err.into_panic());
987            Err(ServerError::WatchPanic {
988                payload,
989                bt: Bt::capture(),
990            })
991        }
992        Err(_cancelled) => Ok(()),
993    }
994}
995
996/// Combine the leader-watch outcome with the tonic graceful-drain outcome
997/// after the watch arm fired.
998///
999/// When the watch task terminates first we trigger the drain and then must
1000/// report a single result. The watch error is the root cause — poisoned
1001/// serving state was already published before the task returned — so it wins
1002/// when both fail. A drain error (port stolen, resource exhaustion) is only
1003/// surfaced when the watch outcome is otherwise `Ok`; previously it was
1004/// discarded via `let _ = serve.await`, hiding a failed drain behind a clean
1005/// shutdown report.
1006///
1007/// Generic over the drain error so the precedence logic is unit-testable
1008/// without fabricating a `tonic::transport::Error` (which has no public
1009/// constructor): production passes `Result<(), tonic::transport::Error>`,
1010/// tests pass `Result<(), ServerError>` via the reflexive `From` impl.
1011fn combine_watch_and_drain<E>(
1012    watch_result: Result<Result<(), ServerError>, tokio::task::JoinError>,
1013    drain_result: Result<(), E>,
1014) -> Result<(), ServerError>
1015where
1016    ServerError: From<E>,
1017{
1018    match join_to_server_result(watch_result) {
1019        Err(watch_err) => Err(watch_err),
1020        Ok(()) => drain_result.map_err(ServerError::from),
1021    }
1022}
1023
/// Construct the gRPC reflection service from an encoded protobuf descriptor
/// set.
///
/// This is kept separate from [`Server::into_router`] so the decode-failure
/// path can be unit tested. Production passes
/// [`tsoracle_proto::FILE_DESCRIPTOR_SET`]; tests can supply deliberately
/// corrupt bytes to exercise the error mapping. A descriptor that fails to
/// decode is reported as [`ServerError::ReflectionInit`] instead of panicking.
#[cfg(feature = "reflection")]
fn build_reflection_service(
    descriptor_set: &[u8],
) -> Result<
    tonic_reflection::server::v1::ServerReflectionServer<
        impl tonic_reflection::server::v1::ServerReflection,
    >,
    ServerError,
> {
    let builder = tonic_reflection::server::Builder::configure()
        .register_encoded_file_descriptor_set(descriptor_set);
    builder.build_v1().map_err(ServerError::ReflectionInit)
}
1044
/// Render a task's panic payload as text for [`ServerError::WatchPanic`].
///
/// A literal `panic!("...")` carries a `&'static str` and a formatted panic
/// carries a `String`; both are returned verbatim. Any other payload type is
/// reported with a fixed placeholder.
fn panic_payload_to_string(panic: Box<dyn std::any::Any + Send>) -> String {
    match panic.downcast::<&'static str>() {
        Ok(text) => (*text).to_owned(),
        Err(panic) => match panic.downcast::<String>() {
            Ok(text) => *text,
            Err(_) => "watch task panicked with non-string payload".to_string(),
        },
    }
}
1054
#[cfg(any(test, feature = "test-fakes"))]
impl Server {
    /// Run the leader-watch loop directly, for integration tests.
    ///
    /// Reachable from integration tests through the `test-fakes` feature; it is
    /// not part of the stable public API.
    #[doc(hidden)]
    pub async fn run_leader_watch_for_tests(self: Arc<Self>) -> Result<(), ServerError> {
        // The cancel future never resolves: these tests end the loop through
        // leadership events or `JoinHandle::abort`, never cooperative cancel.
        let never_cancelled = futures::future::pending();
        crate::fence::run_leader_watch(self, never_cancelled).await
    }

    /// Grant a timestamp window against the current in-memory state, bypassing
    /// the gRPC service.
    ///
    /// Regression tests use this to observe the behavioral fence directly: no
    /// timestamp at or below the prior leader's high-water may be granted.
    #[doc(hidden)]
    pub fn try_grant_for_tests(&self, count: u32) -> Result<WindowGrant, CoreError> {
        let now_ms = self.clock.now_ms();
        self.core.try_grant(now_ms, count)
    }
}
1075
1076#[cfg(test)]
1077mod tests {
1078    use super::*;
1079
1080    #[test]
1081    fn panic_payload_to_string_recovers_static_str() {
1082        // `panic!("literal")` produces a `&'static str` payload; we want the
1083        // verbatim text so operators see what the watch task said.
1084        let payload: Box<dyn std::any::Any + Send> = Box::new("watch boom");
1085        assert_eq!(panic_payload_to_string(payload), "watch boom");
1086    }
1087
1088    #[test]
1089    fn panic_payload_to_string_recovers_owned_string() {
1090        // `panic!("{var}")` produces a `String` payload (formatted at panic
1091        // time); the helper must downcast that branch too.
1092        let payload: Box<dyn std::any::Any + Send> = Box::new(String::from("formatted"));
1093        assert_eq!(panic_payload_to_string(payload), "formatted");
1094    }
1095
1096    #[test]
1097    fn panic_payload_to_string_falls_back_for_other_types() {
1098        // Custom payloads (panic!(MyType { .. })) hit the catch-all branch.
1099        struct Custom;
1100        let payload: Box<dyn std::any::Any + Send> = Box::new(Custom);
1101        assert_eq!(
1102            panic_payload_to_string(payload),
1103            "watch task panicked with non-string payload",
1104        );
1105    }
1106
1107    #[test]
1108    fn serving_transitions_publish_through_core() {
1109        // The Server delegates serving-state transitions to its ServingCore; a
1110        // step_down on a freshly built Server lands as NotServing with the hint.
1111        // (The #346 send_replace-with-zero-receivers regression is pinned by the
1112        // ServingCore unit tests, which can observe `receiver_count` directly.)
1113        let server = Server::builder()
1114            .consensus_driver(Arc::new(crate::test_fakes::InMemoryDriver::new()))
1115            .build()
1116            .expect("build must succeed");
1117
1118        let hint = PeerEndpoint::try_from("new-leader:9000").unwrap();
1119        server.core.step_down(Some(hint.clone()), Some(Epoch(7)));
1120
1121        match server.core.serving_state() {
1122            ServingState::NotServing {
1123                leader_endpoint,
1124                leader_epoch,
1125            } => {
1126                assert_eq!(leader_endpoint, Some(hint));
1127                assert_eq!(leader_epoch, Some(Epoch(7)));
1128            }
1129            ServingState::Serving => panic!("expected NotServing after step_down"),
1130        }
1131    }
1132
1133    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
1134    #[test]
1135    fn builder_stores_tls_config() {
1136        // The serve_* paths read `tls_config` from `Server` (not the builder)
1137        // after `into_router` consumes self — so the field must survive the
1138        // builder → Server hand-off, not just the builder method.
1139        use crate::test_fakes::InMemoryDriver;
1140
1141        let driver = Arc::new(InMemoryDriver::new());
1142        let cfg = tonic::transport::ServerTlsConfig::new();
1143        let server = Server::builder()
1144            .consensus_driver(driver)
1145            .tls_config(cfg)
1146            .build()
1147            .expect("build with tls_config must succeed");
1148        assert!(server.tls_config.is_some());
1149    }
1150
1151    #[test]
1152    fn build_rejects_zero_max_seq_count() {
1153        // max_seq_count(0) makes every positive `count` fail as
1154        // SeqCountTooLarge{max:0} (the count>=1 floor leaves no valid value) —
1155        // GetSeq is silently dead. build() must fail-fast rather than ship a
1156        // server where GetSeq can never succeed.
1157        // `Server` is not `Debug`, so match rather than `expect_err`.
1158        match Server::builder()
1159            .consensus_driver(Arc::new(crate::test_fakes::InMemoryDriver::new()))
1160            .max_seq_count(0)
1161            .build()
1162        {
1163            Err(BuildError::ZeroMaxSeqCount) => {}
1164            Err(other) => panic!("expected ZeroMaxSeqCount, got {other:?}"),
1165            Ok(_) => panic!("max_seq_count(0) must be rejected at build, but build succeeded"),
1166        }
1167    }
1168
1169    #[test]
1170    fn build_accepts_max_seq_count_of_one() {
1171        // 1 is the smallest usable cap: a single-ordinal block is valid, so the
1172        // floor and the cap coincide and GetSeq still works. Must build.
1173        Server::builder()
1174            .consensus_driver(Arc::new(crate::test_fakes::InMemoryDriver::new()))
1175            .max_seq_count(1)
1176            .build()
1177            .expect("max_seq_count(1) must build");
1178    }
1179
1180    #[test]
1181    fn build_accepts_max_seq_batch_keys_of_one() {
1182        // 1 is the smallest usable batch cap: a single-entry batch is valid, so
1183        // the cap must build (the guard rejects only 0).
1184        Server::builder()
1185            .consensus_driver(Arc::new(crate::test_fakes::InMemoryDriver::new()))
1186            .max_seq_batch_keys(1)
1187            .build()
1188            .expect("max_seq_batch_keys(1) must build");
1189    }
1190
1191    #[test]
1192    fn zero_max_seq_batch_keys_is_rejected() {
1193        // max_seq_batch_keys(0) makes every GetSeqBatch call fail as
1194        // "batch has N entries; the maximum is 0" — the RPC is silently dead.
1195        // build() must fail-fast rather than ship a server where GetSeqBatch
1196        // can never succeed.
1197        match Server::builder()
1198            .consensus_driver(Arc::new(crate::test_fakes::InMemoryDriver::new()))
1199            .max_seq_batch_keys(0)
1200            .build()
1201        {
1202            Err(BuildError::ZeroMaxSeqBatchKeys) => {}
1203            Err(other) => panic!("expected ZeroMaxSeqBatchKeys, got {other:?}"),
1204            Ok(_) => panic!("max_seq_batch_keys(0) must be rejected at build, but build succeeded"),
1205        }
1206    }
1207
1208    #[tokio::test]
1209    async fn join_to_server_result_passes_through_clean_outcome() {
1210        // Ok(Ok(())) — task finished cleanly; forward verbatim.
1211        let handle = tokio::spawn(async { Ok::<(), ServerError>(()) });
1212        let join = handle.await;
1213        assert!(matches!(join_to_server_result(join), Ok(())));
1214    }
1215
1216    #[tokio::test]
1217    async fn join_to_server_result_forwards_inner_error() {
1218        // Ok(Err(e)) — task returned an error; forward it.
1219        let handle = tokio::spawn(async {
1220            Err::<(), ServerError>(ServerError::WatchPanic {
1221                payload: "synthetic".into(),
1222                bt: Bt::capture(),
1223            })
1224        });
1225        let join = handle.await;
1226        match join_to_server_result(join) {
1227            Err(ServerError::WatchPanic { payload, .. }) => assert_eq!(payload, "synthetic"),
1228            other => panic!("expected forwarded WatchPanic, got {other:?}"),
1229        }
1230    }
1231
1232    #[tokio::test]
1233    async fn join_to_server_result_translates_panic_to_watch_panic() {
1234        // Err(JoinError::is_panic) — task panicked; surface as WatchPanic with
1235        // the payload stringified by `panic_payload_to_string`.
1236        let handle = tokio::spawn(async {
1237            panic!("intentional");
1238            #[allow(unreachable_code)]
1239            Ok::<(), ServerError>(())
1240        });
1241        let join = handle.await;
1242        match join_to_server_result(join) {
1243            Err(ServerError::WatchPanic { payload, .. }) => {
1244                assert!(payload.contains("intentional"))
1245            }
1246            other => panic!("expected WatchPanic, got {other:?}"),
1247        }
1248    }
1249
1250    #[tokio::test]
1251    async fn join_to_server_result_treats_cancellation_as_clean_exit() {
1252        // Err(JoinError::is_cancelled) — caller aborted the task; we asked
1253        // for that, so map to Ok.
1254        let handle: tokio::task::JoinHandle<Result<(), ServerError>> =
1255            tokio::spawn(async { futures::future::pending().await });
1256        handle.abort();
1257        let join = handle.await;
1258        assert!(matches!(join_to_server_result(join), Ok(())));
1259    }
1260
1261    #[tokio::test]
1262    async fn combine_watch_and_drain_surfaces_drain_error_when_watch_ok() {
1263        // Watch ended cleanly but the graceful drain failed (port stolen,
1264        // resource exhaustion). The drain error must not be swallowed.
1265        let watch = tokio::spawn(async { Ok::<(), ServerError>(()) }).await;
1266        let drain: Result<(), ServerError> = Err(ServerError::WatchStreamClosed);
1267        assert!(matches!(
1268            combine_watch_and_drain(watch, drain),
1269            Err(ServerError::WatchStreamClosed)
1270        ));
1271    }
1272
1273    #[tokio::test]
1274    async fn combine_watch_and_drain_returns_ok_when_both_succeed() {
1275        // Watch clean, drain clean — the only fully-Ok outcome.
1276        let watch = tokio::spawn(async { Ok::<(), ServerError>(()) }).await;
1277        let drain: Result<(), ServerError> = Ok(());
1278        assert!(matches!(combine_watch_and_drain(watch, drain), Ok(())));
1279    }
1280
1281    #[tokio::test]
1282    async fn combine_watch_and_drain_prefers_watch_error_over_drain_error() {
1283        // Both failed. The watch error is the root cause (poisoned state was
1284        // already published), so it wins; the drain error is dropped.
1285        let watch = tokio::spawn(async {
1286            Err::<(), ServerError>(ServerError::WatchPanic {
1287                payload: "watch".into(),
1288                bt: Bt::capture(),
1289            })
1290        })
1291        .await;
1292        let drain: Result<(), ServerError> = Err(ServerError::WatchStreamClosed);
1293        match combine_watch_and_drain(watch, drain) {
1294            Err(ServerError::WatchPanic { payload, .. }) => assert_eq!(payload, "watch"),
1295            other => panic!("expected watch error to win, got {other:?}"),
1296        }
1297    }
1298
1299    #[tokio::test]
1300    async fn combine_watch_and_drain_returns_watch_error_when_drain_ok() {
1301        // Watch failed, drain succeeded — forward the watch error verbatim.
1302        let watch = tokio::spawn(async {
1303            Err::<(), ServerError>(ServerError::WatchPanic {
1304                payload: "watch".into(),
1305                bt: Bt::capture(),
1306            })
1307        })
1308        .await;
1309        let drain: Result<(), ServerError> = Ok(());
1310        match combine_watch_and_drain(watch, drain) {
1311            Err(ServerError::WatchPanic { payload, .. }) => assert_eq!(payload, "watch"),
1312            other => panic!("expected forwarded watch error, got {other:?}"),
1313        }
1314    }
1315
1316    #[tokio::test]
1317    async fn dropping_watch_guard_closes_serving_gate_synchronously() {
1318        // Regression: dropping the guard must close the serving gate at the
1319        // drop site, not on the watch task's later timeline. Build a guard whose
1320        // watch handle never touches the core (so the ONLY thing that can flip
1321        // the state to NotServing is `Drop`), publish `Serving`, then drop the
1322        // guard and read the state with NO await in between. On the current-thread
1323        // test runtime no other task can run between the synchronous `drop` and
1324        // the synchronous `serving_state` read, so observing `NotServing` proves
1325        // `Drop` closed the gate synchronously rather than the watch task.
1326        let core = Arc::new(ServingCore::new(
1327            Duration::from_secs(3),
1328            tsoracle_core::DEFAULT_MAX_SEQ_COUNT,
1329            tsoracle_core::DEFAULT_MAX_SEQ_BATCH_KEYS,
1330        ));
1331        core.publish_serving();
1332
1333        let handle: tokio::task::JoinHandle<Result<(), ServerError>> =
1334            tokio::spawn(async { Ok(()) });
1335        let (cancel_tx, _cancel_rx) = tokio::sync::oneshot::channel::<()>();
1336        let guard = WatchGuard {
1337            cancel_tx: Some(cancel_tx),
1338            handle: Some(handle),
1339            shutdown_grace: Duration::from_secs(10),
1340            core: core.clone(),
1341            reporter: Arc::new(crate::reporter::Reporter::new()),
1342            heartbeat_cancel_tx: None,
1343            heartbeat_handle: None,
1344        };
1345
1346        drop(guard);
1347
1348        assert!(
1349            matches!(core.serving_state(), ServingState::NotServing { .. }),
1350            "dropping the WatchGuard must close the serving gate synchronously",
1351        );
1352    }
1353
1354    #[tokio::test]
1355    async fn serve_inner_closes_serving_gate_on_user_shutdown() {
1356        // Regression for the serve path: when the caller's shutdown fires,
1357        // `serve_inner` drops the watch cancel sender and waits out the grace,
1358        // forcibly aborting the watch task if it overruns. A task parked upstream
1359        // of any cancel-observing await (modelled here by a never-resolving
1360        // future) is aborted before it can publish `NotServing`, so the gate
1361        // would stay open unless `serve_inner` closes it itself. Seed `Serving`,
1362        // run the user-shutdown arm with a zero grace (immediate abort), and
1363        // assert the gate is closed on return.
1364        let core = Arc::new(ServingCore::new(
1365            Duration::from_secs(3),
1366            tsoracle_core::DEFAULT_MAX_SEQ_COUNT,
1367            tsoracle_core::DEFAULT_MAX_SEQ_BATCH_KEYS,
1368        ));
1369        core.publish_serving();
1370
1371        let watch_handle: tokio::task::JoinHandle<Result<(), ServerError>> =
1372            tokio::spawn(async { futures::future::pending().await });
1373        let (watch_cancel_tx, _watch_cancel_rx) = tokio::sync::oneshot::channel::<()>();
1374        let (tonic_cancel_tx, _tonic_cancel_rx) = tokio::sync::oneshot::channel::<()>();
1375
1376        // A serve future that is immediately ready models the user's shutdown
1377        // having fired; with the biased select preferring the (pending) watch arm,
1378        // control reaches the user-shutdown arm.
1379        let serve_future = async { Ok::<(), tonic::transport::Error>(()) };
1380
1381        let result = serve_inner(
1382            watch_cancel_tx,
1383            watch_handle,
1384            None, // heartbeat_cancel_tx — heartbeat disabled in this regression test
1385            None, // heartbeat_handle
1386            serve_future,
1387            tonic_cancel_tx,
1388            Duration::from_millis(0),
1389            core.clone(),
1390            Arc::new(crate::reporter::Reporter::new()),
1391        )
1392        .await;
1393
1394        assert!(
1395            result.is_ok(),
1396            "user shutdown must return Ok, got {result:?}"
1397        );
1398        assert!(
1399            matches!(core.serving_state(), ServingState::NotServing { .. }),
1400            "serve_inner's user-shutdown arm must close the serving gate synchronously",
1401        );
1402    }
1403
1404    #[cfg(feature = "reflection")]
1405    #[test]
1406    fn build_reflection_service_accepts_embedded_descriptor_set() {
1407        // The descriptor set emitted by `tsoracle-proto`'s build.rs must decode
1408        // cleanly — this is the production happy path that previously sat behind
1409        // an `expect`.
1410        assert!(build_reflection_service(tsoracle_proto::FILE_DESCRIPTOR_SET).is_ok());
1411    }
1412
1413    #[cfg(feature = "reflection")]
1414    #[test]
1415    fn build_reflection_service_maps_corrupt_descriptor_to_typed_error() {
1416        // A descriptor set that fails to decode (build artifact drift) must
1417        // surface as a typed `ServerError::ReflectionInit`, not a panic. The
1418        // bytes below are not a valid encoded `FileDescriptorSet`.
1419        let corrupt = b"\xff\xff\xff\xff not a descriptor set";
1420        // The Ok variant wraps a reflection service that is not `Debug`, so map
1421        // to a unit result before asserting on the error variant.
1422        match build_reflection_service(corrupt).map(|_| ()) {
1423            Err(ServerError::ReflectionInit(_)) => {}
1424            other => panic!("expected ReflectionInit error, got {other:?}"),
1425        }
1426    }
1427}