tsoracle_server/server.rs
1//
2// ░▀█▀░█▀▀░█▀█░█▀▄░█▀█░█▀▀░█░░░█▀▀
3// ░░█░░▀▀█░█░█░█▀▄░█▀█░█░░░█░░░█▀▀
4// ░░▀░░▀▀▀░▀▀▀░▀░▀░▀░▀░▀▀▀░▀▀▀░▀▀▀
5//
6// tsoracle — Distributed Timestamp Oracle
7// https://www.tsoracle.rs
8//
9// Copyright (c) 2026 Prisma Risk
10//
11// Licensed under the Apache License, Version 2.0 (the "License");
12// you may not use this file except in compliance with the License.
13// You may obtain a copy of the License at
14//
15// https://www.apache.org/licenses/LICENSE-2.0
16//
17// Unless required by applicable law or agreed to in writing, software
18// distributed under the License is distributed on an "AS IS" BASIS,
19// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20// See the License for the specific language governing permissions and
21// limitations under the License.
22//
23
24use core::time::Duration;
25use std::future::Future;
26use std::net::SocketAddr;
27use std::sync::Arc;
28use tokio::sync::watch;
29use tonic::service::Routes;
30use tonic::transport::Server as TonicServer;
31use tsoracle_consensus::ConsensusDriver;
32#[cfg(any(test, feature = "test-fakes"))]
33use tsoracle_core::{CoreError, WindowGrant};
34use tsoracle_core::{Epoch, PeerEndpoint};
35use tsoracle_proto::v1::tso_service_server::TsoServiceServer;
36
37use crate::bt::Bt;
38use crate::clock::{Clock, SystemClock};
39use crate::service::TsoServiceImpl;
40use crate::serving_core::ServingCore;
41
/// Configuration errors reported by [`ServerBuilder::build`].
#[derive(Debug, thiserror::Error)]
pub enum BuildError {
    /// No consensus driver was supplied via [`ServerBuilder::consensus_driver`].
    /// The driver has no default, so `build` cannot proceed without one.
    #[error("consensus_driver is required")]
    MissingConsensusDriver,
    /// `max_seq_count` was set to 0. Every positive `count` would then be
    /// rejected as `SeqCountTooLarge` (the `count >= 1` floor leaves no valid
    /// value), silently disabling `GetSeq`. Rejected at build time so the
    /// misconfiguration surfaces immediately rather than at first request.
    #[error("max_seq_count must be >= 1 (0 would reject every GetSeq request)")]
    ZeroMaxSeqCount,
}
53
/// Errors surfaced by the server's router/serve entry points and by
/// [`WatchGuard::shutdown`].
#[derive(Debug, thiserror::Error)]
pub enum ServerError {
    /// A tonic transport failure — e.g. a rejected TLS configuration, or an
    /// error from the underlying tonic server.
    #[error("transport: {0}")]
    Transport(#[from] tonic::transport::Error),
    /// An error returned by the consensus driver.
    #[error("consensus: {0}")]
    Consensus(#[from] tsoracle_consensus::ConsensusError),
    /// An error from `tsoracle_core`.
    #[error("core: {0}")]
    Core(#[from] tsoracle_core::CoreError),
    /// The leader-watch task panicked. Distinct from a clean error return so
    /// operators can tell "driver returned Err" (recoverable design) from
    /// "task panicked" (programming bug).
    #[error("leader-watch task panicked: {payload}{bt}")]
    WatchPanic { payload: String, bt: Bt },
    /// The consensus driver's `leadership_events()` stream ended cleanly while
    /// the leader-watch task was running. The stream is contracted to live for
    /// the life of the server, so its end is anomalous (driver shutdown, lost
    /// session, etc.) — distinct from a `Consensus` error returned mid-fence.
    /// The watch task publishes `ServingState::NotServing` before returning
    /// this variant so embedders who never observe the [`WatchGuard`] still get
    /// the documented fail-safe behavior.
    #[error("consensus driver leadership stream closed")]
    WatchStreamClosed,
    /// The embedded protobuf descriptor set failed to decode while building the
    /// gRPC reflection service. `tsoracle-proto`'s `build.rs` emits these bytes
    /// from checked-in `.proto` sources, so a failure here signals build-artifact
    /// drift (a corrupt or stale descriptor) rather than a runtime condition —
    /// surfaced as a diagnosable startup error instead of a process panic.
    #[cfg(feature = "reflection")]
    #[error("failed to build gRPC reflection service from embedded descriptor set: {0}")]
    ReflectionInit(#[source] tonic_reflection::server::Error),
}
85
/// Serving state published by the server as leadership comes and goes.
/// Observe transitions through [`Server::subscribe`].
#[derive(Clone, Debug)]
pub enum ServingState {
    /// This node is not serving. RPCs fail fast with `FAILED_PRECONDITION`.
    /// The optional fields carry hints about the current leader. They are
    /// `None` when no hint is available — presumably what
    /// `step_down(None, None)` publishes; confirm in `ServingCore`.
    NotServing {
        leader_endpoint: Option<PeerEndpoint>,
        leader_epoch: Option<Epoch>,
    },
    /// This node currently holds leadership and serves requests.
    Serving,
}
94
/// Default bound on how long a graceful shutdown waits for the leader-watch
/// task to stop cooperatively before forcibly aborting it.
///
/// The abort is a last-resort safety net for a consensus driver whose
/// `load_high_water` / `persist_high_water` is wedged (the trait places no
/// latency bound; see [`ConsensusDriver`]).
///
/// Chosen to sit comfortably under a typical Kubernetes
/// `terminationGracePeriodSeconds` (30s). That leaves room for the abort, the
/// tonic drain, and process exit to complete before the kubelet escalates to
/// SIGKILL. Override per server with [`ServerBuilder::shutdown_grace`].
const DEFAULT_SHUTDOWN_GRACE: Duration = Duration::from_secs(10);
103
/// Builder for [`Server`]. Obtain one via [`Server::builder`] (or `Default`),
/// chain the setters, then call [`ServerBuilder::build`].
pub struct ServerBuilder {
    /// Consensus driver. Required: `build` fails with
    /// [`BuildError::MissingConsensusDriver`] when this is `None`.
    consensus: Option<Arc<dyn ConsensusDriver>>,
    /// Clock source. `build` falls back to [`SystemClock`] when `None`.
    clock: Option<Arc<dyn Clock>>,
    /// Forwarded to `ServingCore::new` and stored on the `Server`.
    /// Default: 3s.
    window_ahead: Duration,
    /// Stored on the `Server` for use elsewhere in the crate. Default: 1s.
    failover_advance: Duration,
    /// Bound on the cooperative leader-watch stop before a forced abort.
    /// Default: [`DEFAULT_SHUTDOWN_GRACE`].
    shutdown_grace: Duration,
    /// Heartbeat log interval; `Duration::ZERO` disables it. Default: 10s.
    heartbeat_interval: Duration,
    /// Per-call `GetSeq` ceiling; must be >= 1 (checked in `build`).
    max_seq_count: u32,
    /// Optional TLS termination config applied by the `serve*` methods.
    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
    tls_config: Option<tonic::transport::ServerTlsConfig>,
}
115
116impl Default for ServerBuilder {
117 fn default() -> Self {
118 ServerBuilder {
119 consensus: None,
120 clock: None,
121 window_ahead: Duration::from_secs(3),
122 failover_advance: Duration::from_secs(1),
123 shutdown_grace: DEFAULT_SHUTDOWN_GRACE,
124 heartbeat_interval: Duration::from_secs(10),
125 max_seq_count: tsoracle_core::DEFAULT_MAX_SEQ_COUNT,
126 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
127 tls_config: None,
128 }
129 }
130}
131
132impl ServerBuilder {
133 pub fn consensus_driver(mut self, driver: Arc<dyn ConsensusDriver>) -> Self {
134 self.consensus = Some(driver);
135 self
136 }
137 pub fn clock(mut self, clock: Arc<dyn Clock>) -> Self {
138 self.clock = Some(clock);
139 self
140 }
141 pub fn window_ahead(mut self, window_ahead: Duration) -> Self {
142 self.window_ahead = window_ahead;
143 self
144 }
145 pub fn failover_advance(mut self, failover_advance: Duration) -> Self {
146 self.failover_advance = failover_advance;
147 self
148 }
149
150 /// Bound on how long a graceful shutdown waits for the leader-watch task to
151 /// stop cooperatively before forcibly aborting it.
152 ///
153 /// On shutdown the server drops the watch task's cancel signal and waits for
154 /// it to publish `NotServing` and return. That wait is normally
155 /// near-instant, but the task observes cancellation only at its `select!`
156 /// boundaries — never inside a fence attempt. A consensus driver whose
157 /// `load_high_water` / `persist_high_water` never returns (the trait places
158 /// no latency bound) would otherwise park the task mid-fence and block
159 /// process exit indefinitely, leading to a SIGKILL on a Kubernetes drain.
160 /// Once `shutdown_grace` elapses the server aborts the task so exit always
161 /// makes progress. Set this comfortably below your deployment's
162 /// `terminationGracePeriodSeconds`. Defaults to 10s. A value of zero aborts
163 /// immediately without waiting for a cooperative stop.
164 pub fn shutdown_grace(mut self, shutdown_grace: Duration) -> Self {
165 self.shutdown_grace = shutdown_grace;
166 self
167 }
168
169 /// Interval between heartbeat log lines emitted at `target = "tsoracle::heartbeat"`.
170 /// Defaults to 10 seconds. Pass `Duration::ZERO` to disable the heartbeat task entirely.
171 ///
172 /// The heartbeat surfaces serving role, current epoch, requests served,
173 /// timestamps issued, and key error counters every interval — proof-of-life
174 /// for production deployments that may not have a metrics exporter installed.
175 ///
176 /// Requires `feature = "tracing"` to emit output; with `tracing` off the
177 /// setter is accepted but no task is spawned (no subscriber to log to).
178 pub fn heartbeat_interval(mut self, interval: Duration) -> Self {
179 self.heartbeat_interval = interval;
180 self
181 }
182
183 /// Configure TLS termination for this server. Applied inside
184 /// [`Server::serve`], [`Server::serve_with_shutdown`], and
185 /// [`Server::serve_with_listener`]. Not applied to [`Server::into_router`] —
186 /// embedders mounting tsoracle alongside their own services control TLS
187 /// on their own tonic builder.
188 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
189 pub fn tls_config(mut self, cfg: tonic::transport::ServerTlsConfig) -> Self {
190 self.tls_config = Some(cfg);
191 self
192 }
193
194 /// Per-call ceiling on `GetSeq`'s `count` — the largest contiguous block a
195 /// single dense-sequence request may reserve. Defaults to
196 /// [`tsoracle_core::DEFAULT_MAX_SEQ_COUNT`] (`65_536`).
197 ///
198 /// This is a soft anti-abuse guardrail, not a representational limit: a
199 /// dense block is permanently consumed (the gapless counter only moves
200 /// forward), so the cap bounds how much one call can irrevocably reserve.
201 /// Raise it for batch-allocation workloads or lower it to tighten abuse
202 /// control. The server is the sole authority — clients no longer pre-check
203 /// `count` against a fixed constant, so changing this needs no client
204 /// rebuild; an over-cap request is rejected with `INVALID_ARGUMENT`. The
205 /// `count >= 1` floor is always enforced regardless of this value.
206 ///
207 /// A value of `0` is rejected by [`Self::build`] with
208 /// [`BuildError::ZeroMaxSeqCount`]: it would leave no valid `count` (the
209 /// floor is 1), silently disabling `GetSeq`, so the misconfiguration is
210 /// surfaced at build time rather than at first request.
211 pub fn max_seq_count(mut self, max_seq_count: u32) -> Self {
212 self.max_seq_count = max_seq_count;
213 self
214 }
215
216 pub fn build(self) -> Result<Server, BuildError> {
217 let consensus = self.consensus.ok_or(BuildError::MissingConsensusDriver)?;
218 if self.max_seq_count == 0 {
219 return Err(BuildError::ZeroMaxSeqCount);
220 }
221 let clock = self.clock.unwrap_or_else(|| Arc::new(SystemClock));
222 Ok(Server {
223 consensus,
224 clock,
225 window_ahead: self.window_ahead,
226 failover_advance: self.failover_advance,
227 shutdown_grace: self.shutdown_grace,
228 heartbeat_interval: self.heartbeat_interval,
229 core: Arc::new(ServingCore::new(self.window_ahead, self.max_seq_count)),
230 reporter: Arc::new(crate::reporter::Reporter::new()),
231 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
232 tls_config: self.tls_config,
233 })
234 }
235}
236
/// A configured tsoracle server, produced by [`ServerBuilder::build`].
///
/// Consumed either by [`Server::into_router`], to mount alongside other
/// services, or by one of the `serve*` methods, which own the listener.
pub struct Server {
    /// Consensus driver supplied to the builder (required there).
    pub(crate) consensus: Arc<dyn ConsensusDriver>,
    /// Clock source; the builder defaults it to [`SystemClock`].
    pub(crate) clock: Arc<dyn Clock>,
    /// Window-ahead duration; also passed to `ServingCore::new` at build time.
    pub(crate) window_ahead: Duration,
    /// Failover-advance duration from [`ServerBuilder::failover_advance`].
    pub(crate) failover_advance: Duration,
    /// Bound on the graceful-shutdown wait for the leader-watch task before a
    /// forced abort. See [`ServerBuilder::shutdown_grace`].
    pub(crate) shutdown_grace: Duration,
    /// Interval between periodic heartbeat log lines. See [`ServerBuilder::heartbeat_interval`].
    ///
    /// The only reader is the `cfg(feature = "tracing")` spawn block in
    /// `into_router_parts`; without `tracing` there is no subscriber to log to
    /// and the spawn arm is compiled out, so the field is genuinely unread.
    #[cfg_attr(not(feature = "tracing"), allow(dead_code))]
    pub(crate) heartbeat_interval: Duration,
    /// Owns the allocator, serving-state channel, and both extension locks, with
    /// the lock-ordering and step-down invariants private behind its methods.
    ///
    /// Held behind an `Arc` so the leader-watch task, the gRPC service, and the
    /// [`WatchGuard`] / [`serve_inner`] shutdown paths can all reach the same
    /// core. The guard and the serve loop use their clone to close the serving
    /// gate *synchronously* at shutdown, leaving the watch task's later
    /// `step_down` a harmless idempotent repeat.
    pub(crate) core: Arc<ServingCore>,
    /// Metrics reporter shared with the heartbeat task and the shutdown paths
    /// (which record counters such as `shutdown_watch_aborted`).
    pub(crate) reporter: Arc<crate::reporter::Reporter>,
    /// TLS termination config applied by the `serve*` methods (not by
    /// `into_router`).
    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
    pub(crate) tls_config: Option<tonic::transport::ServerTlsConfig>,
}
265
/// Raw parts produced by [`Server::into_router_parts`].
///
/// They are:
/// - the gRPC `Routes`;
/// - the leader-watch task's cooperative-cancel sender (dropping it stops the
///   task) and its join handle;
/// - the optional heartbeat task's cancel sender and join handle.
///
/// [`Server::into_router`] wraps these into a [`WatchGuard`]; the `serve_*`
/// methods consume them directly via [`serve_inner`].
///
/// The heartbeat fields are `None` when the heartbeat is disabled — either by
/// `ServerBuilder::heartbeat_interval(Duration::ZERO)` or by building without
/// the `tracing` feature (there is no subscriber to log to).
pub(crate) struct RouterParts {
    /// tsoracle's gRPC service (plus reflection when that feature is on).
    pub routes: Routes,
    /// Dropping or sending on this resolves the watch task's cancel future.
    pub cancel_tx: tokio::sync::oneshot::Sender<()>,
    /// Join handle for the leader-watch task; a panic inside it is re-raised
    /// so it shows up as a `JoinError` here.
    pub watch_handle: tokio::task::JoinHandle<Result<(), ServerError>>,
    /// Cancel sender for the heartbeat task, if one was spawned.
    pub heartbeat_cancel_tx: Option<tokio::sync::oneshot::Sender<()>>,
    /// Join handle for the heartbeat task, if one was spawned.
    pub heartbeat_handle: Option<tokio::task::JoinHandle<()>>,
}
282
283impl Server {
284 pub fn builder() -> ServerBuilder {
285 ServerBuilder::default()
286 }
287
288 /// Subscribe to serving-state transitions.
289 ///
290 /// Returns a fresh `watch::Receiver` observing the same `ServingState`
291 /// the server publishes as leadership comes and goes. Embedders use this
292 /// to gate their own startup on `ServingState::Serving` (see the
293 /// `embedded_router` and piggyback examples). Because `into_router`
294 /// consumes the `Server`, capture the receiver before mounting.
295 ///
296 /// This method is the stable observation API: the receiver is minted from
297 /// the server's `watch::Sender`, so the receiver's type can evolve (e.g. a
298 /// future newtype around `ServingState`) without breaking embedders that
299 /// go through it.
300 pub fn subscribe(&self) -> watch::Receiver<ServingState> {
301 self.core.subscribe()
302 }
303}
304
305impl Server {
306 /// Return the configured `TsoServiceServer<TsoServiceImpl>` as a tonic
307 /// `Routes` value plus a [`WatchGuard`] for the spawned leader-watch task,
308 /// so callers can mount tsoracle's service alongside their own services
309 /// on a shared tonic listener instead of binding a dedicated port.
310 ///
311 /// The returned [`WatchGuard`] owns the leader-watch task. **Keep it alive
312 /// for as long as the mounted `Routes` should serve**: the watch task holds
313 /// an `Arc<Server>` (and the consensus driver) and maintains serving state
314 /// across leadership transitions. Dropping the guard — or calling
315 /// [`WatchGuard::shutdown`] — cooperatively stops the task at the embedder's
316 /// own shutdown. Without the guard the task would keep `Arc<Server>` alive
317 /// until the leadership stream happened to close.
318 ///
319 /// Every termination of the task — cooperative cancellation, driver error,
320 /// panic, or clean EOF on the leadership stream (surfaced as
321 /// `ServerError::WatchStreamClosed`) — publishes
322 /// `ServingState::NotServing { leader_endpoint: None }` before returning, so
323 /// all subsequent RPCs fail fast with `FAILED_PRECONDITION`. Embedders who
324 /// drop the guard without awaiting still get fail-safe behavior.
325 ///
326 /// The `Server::serve()` method is a thin wrapper over this — it calls
327 /// `into_router`, builds a tonic `Server`, and binds a listener.
328 ///
329 /// Returns `Err(ServerError::ReflectionInit)` (only reachable under the
330 /// `reflection` feature) if the embedded descriptor set fails to decode.
331 /// That decode happens before the leader-watch task is spawned, so a failure
332 /// leaves nothing running to clean up.
333 pub fn into_router(self) -> Result<(Routes, WatchGuard), ServerError> {
334 // Read the `Copy` grace before `into_router_parts` consumes `self`, so
335 // the returned guard can bound its own shutdown wait identically to the
336 // `serve_*` paths.
337 let shutdown_grace = self.shutdown_grace;
338 // Clone the shared core and reporter before `into_router_parts` consumes
339 // `self`, so the guard can close the serving gate synchronously on drop /
340 // shutdown rather than relying on the watch task's later publish, and can
341 // record the shutdown_watch_aborted counter if the grace-bounded reap fires.
342 let core = self.core.clone();
343 let reporter = self.reporter.clone();
344 let parts = self.into_router_parts()?;
345 Ok((
346 parts.routes,
347 WatchGuard {
348 cancel_tx: Some(parts.cancel_tx),
349 handle: Some(parts.watch_handle),
350 shutdown_grace,
351 core,
352 reporter,
353 heartbeat_cancel_tx: parts.heartbeat_cancel_tx,
354 heartbeat_handle: parts.heartbeat_handle,
355 },
356 ))
357 }
358
359 /// Spawn the leader-watch task and assemble the gRPC `Routes`, returning
360 /// the raw parts: the routes, the task's cooperative-cancel sender, and its
361 /// `JoinHandle`. [`Self::into_router`] wraps these into a [`WatchGuard`] for
362 /// embedders; the `serve_*` methods drive the parts directly via
363 /// [`serve_inner`], so neither path needs to unwrap the guard's `Option`
364 /// fields.
365 fn into_router_parts(self) -> Result<RouterParts, ServerError> {
366 // Build the reflection service first: a descriptor-decode failure must
367 // surface before we spawn the leader-watch task below, so an error path
368 // never leaks a running task.
369 #[cfg(feature = "reflection")]
370 let reflection = build_reflection_service(tsoracle_proto::FILE_DESCRIPTOR_SET)?;
371
372 let server = Arc::new(self);
373
374 // Cooperative cancellation channel. The `WatchGuard` holds the sender;
375 // the task's `cancel` future resolves on either an explicit send or a
376 // sender drop, so dropping the guard is sufficient to stop the task.
377 let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
378
379 let watch_server = server.clone();
380 let watch_handle = tokio::spawn(async move {
381 use futures::FutureExt;
382 // Resolves when the WatchGuard signals cancellation or is dropped.
383 let cancel = async move {
384 let _ = cancel_rx.await;
385 };
386 // catch_unwind so a panic in run_leader_watch still routes through
387 // the poisoning path. Without this, embedders who mount into_router
388 // directly and never observe the guard would see
389 // ServingState::Serving remain published while the watch task is
390 // dead — the inverse of the fail-safe guarantee documented above.
391 // The panic is re-raised after poisoning so serve / serve_with_*
392 // continue to translate it into ServerError::WatchPanic via
393 // join_to_server_result.
394 let outcome = std::panic::AssertUnwindSafe(crate::fence::run_leader_watch(
395 watch_server.clone(),
396 cancel,
397 ))
398 .catch_unwind()
399 .await;
400 match outcome {
401 Ok(result) => {
402 if let Err(ref _e) = result {
403 // Poison BEFORE returning so embedders who do not observe
404 // the guard still get fail-safe behavior.
405 watch_server.core.step_down(None, None);
406 #[cfg(feature = "tracing")]
407 tracing::error!(error = %_e, "leader-watch terminated; serving disabled");
408 }
409 result
410 }
411 Err(panic_payload) => {
412 // Mirror the Err branch: poison BEFORE re-raising so
413 // guard-dropping embedders still observe NotServing.
414 watch_server.core.step_down(None, None);
415 #[cfg(feature = "tracing")]
416 tracing::error!("leader-watch panicked; serving disabled");
417 std::panic::resume_unwind(panic_payload);
418 }
419 }
420 });
421
422 // Spawn the heartbeat task, if enabled. Gated on `feature = "tracing"`
423 // because the heartbeat module is only compiled with `tracing`
424 // (no subscriber to emit to without it) — and on a non-zero interval,
425 // since `Duration::ZERO` is the documented opt-out.
426 //
427 // The task body is wrapped in `AssertUnwindSafe(...).catch_unwind()`
428 // mirroring the leader-watch spawn above: on panic we bump the
429 // `heartbeat_task_panicked` counter and log at error level, then let
430 // the task end (no restart — the heartbeat is observability, not
431 // correctness, so a panicked task must not be allowed to thrash).
432 let (heartbeat_cancel_tx, heartbeat_handle) = {
433 #[cfg(feature = "tracing")]
434 {
435 if server.heartbeat_interval.is_zero() {
436 (None, None)
437 } else {
438 use futures::FutureExt;
439 let (htx, hrx) = tokio::sync::oneshot::channel::<()>();
440 let hb_reporter = server.reporter.clone();
441 let hb_core = server.core.clone();
442 let hb_interval = server.heartbeat_interval;
443 let handle = tokio::spawn(async move {
444 let outcome =
445 std::panic::AssertUnwindSafe(crate::heartbeat::run_heartbeat(
446 hb_interval,
447 hb_core,
448 hb_reporter.clone(),
449 hrx,
450 ))
451 .catch_unwind()
452 .await;
453 if outcome.is_err() {
454 hb_reporter.heartbeat_task_panicked.increment(1);
455 tracing::error!(
456 target: "tsoracle::heartbeat",
457 "heartbeat task panicked; liveness logs disabled until restart"
458 );
459 }
460 });
461 (Some(htx), Some(handle))
462 }
463 }
464 #[cfg(not(feature = "tracing"))]
465 {
466 (None, None)
467 }
468 };
469
470 let service = TsoServiceImpl { server };
471 #[allow(unused_mut)]
472 let mut routes = Routes::new(TsoServiceServer::new(service));
473 #[cfg(feature = "reflection")]
474 {
475 routes = routes.add_service(reflection);
476 }
477 Ok(RouterParts {
478 routes,
479 cancel_tx,
480 watch_handle,
481 heartbeat_cancel_tx,
482 heartbeat_handle,
483 })
484 }
485
486 pub async fn serve(self, addr: SocketAddr) -> Result<(), ServerError> {
487 self.serve_with_shutdown(addr, futures::future::pending())
488 .await
489 }
490
491 /// Run the gRPC server until either the caller's `shutdown` fires or the
492 /// leader-watch task terminates.
493 ///
494 /// Three outcomes:
495 /// 1. `shutdown` fires first → tonic drains in-flights and returns Ok.
496 /// The watch task is then stopped cooperatively, bounded by
497 /// `shutdown_grace` and forcibly aborted if it overruns (e.g. parked in a
498 /// wedged consensus-driver call); any error it had been about to return
499 /// is forfeited (the process is shutting down anyway).
500 /// 2. Watch returns `Ok(Err(e))` → poisoned state is already published;
501 /// `cancel_tx` triggers tonic's graceful shutdown; in-flight `GetTs`
502 /// calls whose `try_grant` already succeeded complete with the
503 /// timestamps they were allocated; new calls fail fast. Returns `Err(e)`
504 /// — the watch error wins even if the drain itself also errors (see
505 /// `combine_watch_and_drain`); a drain error is surfaced only when the
506 /// watch ended cleanly.
507 /// 3. Watch task panics → returns `Err(ServerError::WatchPanic{..})`
508 /// with the panic payload stringified. Same drain semantics as (2).
509 pub async fn serve_with_shutdown(
510 self,
511 addr: SocketAddr,
512 shutdown: impl Future<Output = ()> + Send + 'static,
513 ) -> Result<(), ServerError> {
514 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
515 let tls_config = self.tls_config.clone();
516
517 // Read the `Copy` grace and clone the shared core and reporter before
518 // `into_router_parts` consumes `self`.
519 let shutdown_grace = self.shutdown_grace;
520 let core = self.core.clone();
521 let reporter = self.reporter.clone();
522 let parts = self.into_router_parts()?;
523 let (combined_shutdown, cancel_tx) = combined_shutdown_with_cancel(shutdown);
524
525 let mut tonic = TonicServer::builder();
526 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
527 if let Some(cfg) = tls_config {
528 tonic = tonic.tls_config(cfg).map_err(ServerError::Transport)?;
529 }
530 let serve = tonic
531 .add_routes(parts.routes)
532 .serve_with_shutdown(addr, combined_shutdown);
533
534 serve_inner(
535 parts.cancel_tx,
536 parts.watch_handle,
537 parts.heartbeat_cancel_tx,
538 parts.heartbeat_handle,
539 serve,
540 cancel_tx,
541 shutdown_grace,
542 core,
543 reporter,
544 )
545 .await
546 }
547
548 /// Run the gRPC server on a caller-provided `TcpListener` until either
549 /// the caller-provided `shutdown` fires or the leader-watch task terminates.
550 ///
551 /// Use this instead of [`Self::serve_with_shutdown`] when you need to
552 /// observe the OS-picked port (`127.0.0.1:0`) before clients connect, or
553 /// when you want to wrap the listener in an outer adapter before passing it
554 /// in. The listening socket is owned by the caller and passed here; tsoracle
555 /// starts accepting on it immediately.
556 ///
557 /// Three outcomes:
558 /// 1. `shutdown` fires first → tonic drains in-flights and returns `Ok`.
559 /// The watch handle is aborted; any error it had been about to return
560 /// is forfeited (the process is shutting down anyway).
561 /// 2. Watch returns `Ok(Err(e))` → poisoned state is already published;
562 /// the caller-provided shutdown is cancelled internally so tonic begins
563 /// graceful shutdown; in-flight `GetTs` calls whose `try_grant` already
564 /// succeeded complete with the timestamps they were allocated; new calls
565 /// fail fast. Returns `Err(e)` — the watch error wins even if the drain
566 /// itself also errors (see `combine_watch_and_drain`); a drain error is
567 /// surfaced only when the watch ended cleanly.
568 /// 3. Watch task panics → returns `Err(ServerError::WatchPanic{..})`
569 /// with the panic payload stringified. Same drain semantics as (2).
570 pub async fn serve_with_listener(
571 self,
572 listener: tokio::net::TcpListener,
573 shutdown: impl Future<Output = ()> + Send + 'static,
574 ) -> Result<(), ServerError> {
575 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
576 let tls_config = self.tls_config.clone();
577
578 // Read the `Copy` grace and clone the shared core and reporter before
579 // `into_router_parts` consumes `self`.
580 let shutdown_grace = self.shutdown_grace;
581 let core = self.core.clone();
582 let reporter = self.reporter.clone();
583 let parts = self.into_router_parts()?;
584 let (combined_shutdown, cancel_tx) = combined_shutdown_with_cancel(shutdown);
585
586 let incoming = tonic::transport::server::TcpIncoming::from(listener);
587
588 let mut tonic = TonicServer::builder();
589 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
590 if let Some(cfg) = tls_config {
591 tonic = tonic.tls_config(cfg).map_err(ServerError::Transport)?;
592 }
593 let serve = tonic
594 .add_routes(parts.routes)
595 .serve_with_incoming_shutdown(incoming, combined_shutdown);
596
597 serve_inner(
598 parts.cancel_tx,
599 parts.watch_handle,
600 parts.heartbeat_cancel_tx,
601 parts.heartbeat_handle,
602 serve,
603 cancel_tx,
604 shutdown_grace,
605 core,
606 reporter,
607 )
608 .await
609 }
610}
611
/// RAII handle to the leader-watch task spawned by [`Server::into_router`].
///
/// The watch task holds an `Arc<Server>` (and thus the consensus driver) and
/// maintains serving state across leadership transitions. This guard ties the
/// task's lifetime to the guard's. Dropping it closes the serving gate
/// immediately and cooperatively cancels the task. The task also publishes
/// [`ServingState::NotServing`] before it stops, so any `Routes` an embedder
/// still has mounted fails subsequent RPCs fast.
///
/// Dropping the guard also hard-aborts the heartbeat task, if one is running.
///
/// Cancellation is cooperative — the task stops at its next await boundary and
/// never mid-fence, so it is never torn down while holding internal locks, in
/// contrast to a raw [`tokio::task::JoinHandle::abort`]. The two exceptions:
/// - [`WatchGuard::shutdown`] falls back to an abort once `shutdown_grace`
///   elapses;
/// - [`WatchGuard::abort`] skips the cooperative stop entirely.
pub struct WatchGuard {
    // `Option` so `Drop` and the consuming `shutdown` / `abort` methods can
    // each take a field without a partial-move conflict against the `Drop`
    // impl. Dropping the sender (rather than sending) is itself the cancel
    // signal: the task's `cancel` future resolves on sender-drop too.
    cancel_tx: Option<tokio::sync::oneshot::Sender<()>>,
    /// Join handle for the leader-watch task. `None` once `shutdown` / `abort`
    /// has taken it.
    handle: Option<tokio::task::JoinHandle<Result<(), ServerError>>>,
    /// Bound on the cooperative-stop wait in [`Self::shutdown`] before a forced
    /// abort. Inherited from [`ServerBuilder::shutdown_grace`].
    shutdown_grace: Duration,
    /// Shared serving core, cloned from the `Server`. Lets `Drop` (and the
    /// consuming `shutdown` / `abort`, which trigger `Drop` on return) close the
    /// serving gate synchronously at the drop site, instead of waiting for the
    /// watch task to observe cancellation and publish `NotServing` on its own
    /// timeline — a window during which the fast gate would still admit RPCs.
    core: Arc<ServingCore>,
    /// Metrics reporter, cloned from the `Server`. Used to record the
    /// `shutdown_watch_aborted` counter if the grace-bounded reap fires.
    reporter: Arc<crate::reporter::Reporter>,
    /// Cooperative-cancel sender for the heartbeat task. `None` when the
    /// heartbeat is disabled (interval == 0 or built without `tracing`).
    /// Dropping the sender resolves the task's cancel future.
    heartbeat_cancel_tx: Option<tokio::sync::oneshot::Sender<()>>,
    /// Join handle for the heartbeat task. `None` when the heartbeat is
    /// disabled. Output is `()` because the task never returns an error —
    /// panics are caught inside the task body and recorded via the
    /// `heartbeat_task_panicked` counter.
    heartbeat_handle: Option<tokio::task::JoinHandle<()>>,
}
652
653impl WatchGuard {
654 /// Signal the leader-watch task to stop, wait for it to drain, and report
655 /// its outcome.
656 ///
657 /// A cooperatively cancelled task returns `Ok(())` — the stop was
658 /// requested, so it is not an error. If the task had already terminated on
659 /// its own (driver error, stream EOF, or panic) the original outcome is
660 /// surfaced verbatim: `Err(e)` or [`ServerError::WatchPanic`]. Either way
661 /// serving state is `NotServing` once this returns.
662 ///
663 /// The cooperative wait is bounded by the configured
664 /// [`ServerBuilder::shutdown_grace`]: if the task is parked in a
665 /// consensus-driver call that never returns it is aborted once the grace
666 /// elapses (still reported as `Ok(())`), so an embedder's shutdown can never
667 /// wedge behind a hung driver.
668 pub async fn shutdown(mut self) -> Result<(), ServerError> {
669 // Dropping the senders fires each task's cancel future. The heartbeat
670 // task is reaped first because it is bounded by `tokio::time::sleep`
671 // (cooperative stop is fast and never wedges on a driver call), so its
672 // reap returns quickly and leaves the grace budget for the watch task
673 // — which may be parked in a wedged consensus-driver call.
674 self.heartbeat_cancel_tx.take();
675 self.cancel_tx.take();
676 if let Some(mut hb_handle) = self.heartbeat_handle.take() {
677 match tokio::time::timeout(self.shutdown_grace, &mut hb_handle).await {
678 Ok(Ok(())) => {}
679 // Task panicked — already counted + logged via catch_unwind in
680 // the task body. Nothing more to do here.
681 Ok(Err(_join_err)) => {}
682 // Grace overrun — sleep + select! should always observe a
683 // dropped cancel sender, so this is a backstop. Abort and
684 // reap; no separate metric (the heartbeat is observability
685 // only — its lateness is not a serving correctness signal).
686 Err(_elapsed) => {
687 hb_handle.abort();
688 let _ = (&mut hb_handle).await;
689 }
690 }
691 }
692 match self.handle.take() {
693 Some(mut handle) => join_to_server_result(
694 await_watch_within_grace(&mut handle, self.shutdown_grace, &self.reporter).await,
695 ),
696 None => Ok(()),
697 }
698 }
699
700 /// Hard-abort the leader-watch task without waiting for a cooperative stop.
701 ///
702 /// Prefer [`Self::shutdown`] or simply dropping the guard; both let the
703 /// task stop at a safe point. This is an escape hatch for callers that
704 /// cannot await and accept that the task may be torn down mid-fence.
705 pub fn abort(mut self) {
706 if let Some(handle) = self.handle.take() {
707 handle.abort();
708 }
709 // Hard-abort the heartbeat task too — leaving it running after the
710 // watch is torn down would publish heartbeats describing a stale
711 // (typically `NotServing`) view until the Arc<Reporter> is dropped.
712 if let Some(hb_handle) = self.heartbeat_handle.take() {
713 hb_handle.abort();
714 }
715 }
716
717 /// Whether the leader-watch task has finished — terminated for any reason
718 /// (cooperative cancel, driver error, stream EOF, or panic).
719 ///
720 /// A read-only liveness probe that neither consumes the guard nor disturbs
721 /// its cancel-on-drop behavior, so an embedder can poll task health while
722 /// keeping the guard alive.
723 pub fn is_finished(&self) -> bool {
724 self.handle
725 .as_ref()
726 .is_none_or(|handle| handle.is_finished())
727 }
728}
729
730impl Drop for WatchGuard {
731 fn drop(&mut self) {
732 // Close the serving gate synchronously, here at the drop site. Dropping
733 // the cancel sender below only *requests* the watch task to stop; the
734 // task publishes `NotServing` later, on its own timeline. Between this
735 // drop and the task's next poll the fast gate would still read `Serving`
736 // (and the allocator would still grant), so an RPC could be admitted by a
737 // server that has already been told to stop serving. `step_down` clears
738 // the allocator and publishes `NotServing` now, before any await; the
739 // watch task's later `step_down(None, None)` on cooperative cancel (see
740 // `fence::run_leader_watch`) republishes the identical state, so the
741 // double-close is harmless and idempotent.
742 self.core.step_down(None, None);
743 // Dropping the sender (if `shutdown` / `abort` did not already take it)
744 // resolves the task's cancel future; the task then publishes
745 // `NotServing` and returns. The `JoinHandle` is dropped here too,
746 // detaching the task to finish its cooperative shutdown on its own.
747 self.cancel_tx.take();
748 // Same treatment for the heartbeat task. `Drop` is sync so we cannot
749 // await the cooperative stop; instead we drop the cancel sender (the
750 // task will observe it at its next select! boundary) and hard-abort
751 // the handle so it cannot outlive the guard and publish heartbeats
752 // describing a stale view if the runtime keeps the Arc alive.
753 self.heartbeat_cancel_tx.take();
754 if let Some(hb_handle) = self.heartbeat_handle.take() {
755 hb_handle.abort();
756 }
757 }
758}
759
760/// Merge the caller's `shutdown` future with an internal cancellation signal.
761///
762/// Both [`Server::serve_with_shutdown`] and [`Server::serve_with_listener`]
763/// need tonic to stop when EITHER the caller's `shutdown` fires OR the
764/// leader-watch task terminates (signalled by firing the returned
765/// `oneshot::Sender`). This builds that merged shutdown future and hands back
766/// the sender so [`serve_inner`] can trip it from the watch arm.
767fn combined_shutdown_with_cancel(
768 shutdown: impl Future<Output = ()> + Send + 'static,
769) -> (
770 impl Future<Output = ()> + Send + 'static,
771 tokio::sync::oneshot::Sender<()>,
772) {
773 let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
774 let combined_shutdown = async move {
775 tokio::select! {
776 _ = shutdown => {}
777 _ = cancel_rx => {}
778 }
779 };
780 (combined_shutdown, cancel_tx)
781}
782
783/// Wait for the leader-watch task to stop cooperatively, but no longer than
784/// `grace`, then forcibly abort it if it is still running.
785///
786/// The watch task observes its cancel signal only at the `select!` boundaries
787/// in [`crate::fence::run_leader_watch`], never inside a fence attempt. A
788/// consensus driver whose `load_high_water` / `persist_high_water` never
789/// returns (the trait places no latency bound; see
790/// [`tsoracle_consensus::ConsensusDriver`]) therefore parks the task upstream
791/// of any cancel-observing await, so dropping the cancel sender cannot stop it.
792/// Left unbounded, the shutdown wait would block process exit until the kubelet
793/// escalates to SIGKILL on a drain. Bounding the wait by `grace` and aborting
794/// on expiry guarantees forward progress: `tokio` tears a suspended task (and
795/// the wedged driver future it holds) down at the abort, dropping its
796/// drain-barrier guard.
797///
798/// Returns the task's join result. A clean cooperative stop forwards its real
799/// outcome verbatim; an aborted task surfaces as a cancelled `JoinError`, which
800/// [`join_to_server_result`] maps to `Ok(())` — the stop was requested, so a
801/// forced abort during shutdown is not an error. A `grace` of zero aborts
802/// immediately (the `timeout` future is already elapsed on first poll).
803async fn await_watch_within_grace(
804 watch_handle: &mut tokio::task::JoinHandle<Result<(), ServerError>>,
805 grace: Duration,
806 reporter: &Arc<crate::reporter::Reporter>,
807) -> Result<Result<(), ServerError>, tokio::task::JoinError> {
808 match tokio::time::timeout(grace, &mut *watch_handle).await {
809 Ok(join_result) => join_result,
810 Err(_elapsed) => {
811 reporter.shutdown_watch_aborted.increment(1);
812 #[cfg(feature = "tracing")]
813 tracing::warn!(
814 grace_ms = grace.as_millis() as u64,
815 "leader-watch task did not stop within the shutdown grace; aborting it (a consensus-driver call likely exceeded its latency bound)"
816 );
817 watch_handle.abort();
818 // Reap the aborted task so its Drop (releasing any held drain-barrier
819 // guard) runs before we report shutdown complete. Bounded: an aborted
820 // task resolves at its next poll.
821 (&mut *watch_handle).await
822 }
823 }
824}
825
/// Drive the gRPC `serve_future` against the leader-watch task, shared by
/// [`Server::serve_with_shutdown`] and [`Server::serve_with_listener`].
///
/// The two public methods differ only in how `serve_future` is assembled:
/// address-bound via `serve_with_shutdown`, or listener-bound via
/// `serve_with_incoming_shutdown`. Everything downstream is identical and
/// lives here, so a future change need only be made once. That covers the
/// biased select, the cooperative-cancel path, and the drain/translate logic.
///
/// Parameters:
/// - `tonic_cancel_tx` is the cancellation half paired with `serve_future`'s
///   shutdown signal (see [`combined_shutdown_with_cancel`]). Firing it starts
///   tonic's graceful drain when the watch task terminates first.
/// - `watch_cancel_tx` is the leader-watch task's own cooperative-cancel
///   sender (the same one a [`WatchGuard`] holds for embedders). Dropping it
///   stops the task. Taking the raw parts rather than a `WatchGuard` keeps
///   this path free of the guard's `Option` fields; neither the watch handle
///   nor the cancel sender is optional here.
/// - `heartbeat_cancel_tx` / `heartbeat_handle` are the optional heartbeat
///   task. It is stopped on every exit path.
/// - `shutdown_grace` bounds each wait on a background task (see
///   [`await_watch_within_grace`]).
/// - `core` is the shared serving core (the same one the watch task and the
///   gRPC service hold). The user-shutdown arm closes the gate on it
///   synchronously, so no RPC is admitted in the window before the watch task
///   observes cancellation and publishes `NotServing`.
///
/// # Shutdown timing
///
/// Only the waits on background tasks are bounded by `shutdown_grace`:
/// - Watch arm (the watch task ended first): tonic's graceful drain
///   (`serve_future.await`) is awaited with no time bound. The heartbeat task
///   then gets up to `shutdown_grace`.
/// - User-shutdown arm: the watch task gets up to `shutdown_grace`. The
///   heartbeat task then gets its own, separate `shutdown_grace`. The two
///   waits are sequential, so after tonic stops the worst case is about twice
///   the grace, plus the time to reap any aborted task.
///
/// # Errors
///
/// - Watch arm: the watch task's error wins. A drain error surfaces only when
///   the watch ended cleanly (see [`combine_watch_and_drain`]).
/// - User-shutdown arm: tonic's serve error, if any, converted into
///   `ServerError`. The watch task's own outcome is discarded.
// Private serve helper. The wide signature is the cost of being the single
// merge point for the two public `serve_*` paths and the leader-watch +
// heartbeat task pair: bundling these into a struct just to placate clippy
// would obscure the lifecycle (every parameter is consumed exactly once and
// has no shared identity worth naming). Keep the arguments visible.
#[allow(clippy::too_many_arguments)]
async fn serve_inner<S>(
    watch_cancel_tx: tokio::sync::oneshot::Sender<()>,
    mut watch_handle: tokio::task::JoinHandle<Result<(), ServerError>>,
    heartbeat_cancel_tx: Option<tokio::sync::oneshot::Sender<()>>,
    heartbeat_handle: Option<tokio::task::JoinHandle<()>>,
    serve_future: S,
    tonic_cancel_tx: tokio::sync::oneshot::Sender<()>,
    shutdown_grace: Duration,
    core: Arc<ServingCore>,
    reporter: Arc<crate::reporter::Reporter>,
) -> Result<(), ServerError>
where
    S: Future<Output = Result<(), tonic::transport::Error>>,
{
    // Pin once so both select arms (and the watch arm's drain) can poll the
    // same serve future by mutable reference.
    tokio::pin!(serve_future);

    let outcome = tokio::select! {
        // Bias toward the watch arm: if both are ready in the same poll
        // (rare but possible — graceful shutdown completed in the same
        // tick the watch returned), we want to surface the watch error
        // rather than report a clean shutdown.
        biased;

        watch_result = &mut watch_handle => {
            // Watch terminated. State is already poisoned (see watch
            // task body in into_router). Trigger tonic drain, wait for
            // it to finish, then report the watch's outcome — preferring
            // it over any drain error, which surfaces only if the watch
            // itself ended cleanly.
            //
            // The send result is ignored: if tonic's receiver is gone, the
            // serve future has already stopped and the await below returns
            // promptly. The drain await itself is not grace-bounded.
            let _ = tonic_cancel_tx.send(());
            let drain_result = serve_future.await;
            combine_watch_and_drain(watch_result, drain_result)
        }
        serve_result = &mut serve_future => {
            // User shutdown fired (or our cancel — but watch arm has
            // `biased` priority, so reaching here means user shutdown).
            // Prefer a cooperative stop: dropping the cancel sender resolves
            // the task's cancel future so it stops at its next `select!`
            // boundary, having published `NotServing` and never torn down
            // mid-fence while holding `extension_gate.write()`. But a
            // cooperative stop is only observed at those boundaries, never
            // inside a fence attempt — a consensus-driver call that never
            // returns (the trait places no latency bound) would park the task
            // upstream of any cancel point and block process exit until a
            // kubelet SIGKILL. `await_watch_within_grace` therefore bounds the
            // wait by `shutdown_grace` and aborts the task if it overruns. The
            // task's own outcome (a clean `Ok(())` on cooperative stop, a
            // cancelled `JoinError` on abort) is discarded; the user-requested
            // shutdown result wins.
            //
            // Close the serving gate synchronously first: dropping the sender
            // only *requests* the stop, and a task aborted on grace expiry (or
            // simply not yet rescheduled) may never reach its own `step_down`. So
            // a `GetTs` arriving during the drain would still be admitted unless
            // we close the gate here. `step_down` is idempotent with the watch
            // task's own cooperative-cancel publish.
            core.step_down(None, None);
            drop(watch_cancel_tx);
            let _ = await_watch_within_grace(&mut watch_handle, shutdown_grace, &reporter).await;
            // Propagate tonic's serve error (converted into `ServerError`), if any.
            serve_result?;
            Ok(())
        }
    };

    // Stop the heartbeat task on every exit path. Done after the watch reap so
    // the watch-arm `combine_watch_and_drain` already saw the watch outcome,
    // and the user-shutdown arm has finished its grace-bounded wait. Dropping
    // the cancel sender breaks the task's `tokio::select! { biased; cancel,
    // sleep }` loop on the next poll; if the task is wedged for any reason we
    // hard-abort on grace overrun. The task's outcome is observability only
    // and cannot influence serving correctness, so its join result is dropped.
    // This wait uses its own `shutdown_grace` budget, separate from the watch
    // wait above.
    drop(heartbeat_cancel_tx);
    if let Some(mut hb_handle) = heartbeat_handle {
        match tokio::time::timeout(shutdown_grace, &mut hb_handle).await {
            Ok(Ok(())) => {}
            Ok(Err(_join_err)) => {} // panic — already counted via catch_unwind
            Err(_elapsed) => {
                // Grace overrun: abort, then reap so the task is fully torn
                // down before returning.
                hb_handle.abort();
                let _ = (&mut hb_handle).await;
            }
        }
    }

    outcome
}
937}
938
939/// Convert a `JoinHandle` result into a `ServerError`-typed result.
940///
941/// - `Ok(Ok(()))` — cooperative cancellation: `run_leader_watch` observed its
942/// cancel signal (the `WatchGuard` was dropped, `WatchGuard::shutdown` was
943/// called, or `serve_inner` cancelled it on user shutdown), published
944/// `NotServing`, and returned cleanly. Forwarded verbatim as `Ok(())`.
945/// - `Ok(Err(e))` — task returned an error (including `WatchStreamClosed`
946/// from a clean EOF). Forward verbatim.
947/// - `Err(JoinError)` — task was aborted or panicked. An abort
948/// (`WatchGuard::abort` or `JoinHandle::abort`) maps to Ok (we asked for it);
949/// a panic maps to `WatchPanic` with payload.
950fn join_to_server_result(
951 join_result: Result<Result<(), ServerError>, tokio::task::JoinError>,
952) -> Result<(), ServerError> {
953 match join_result {
954 Ok(inner) => inner,
955 Err(join_err) if join_err.is_panic() => {
956 let payload = panic_payload_to_string(join_err.into_panic());
957 Err(ServerError::WatchPanic {
958 payload,
959 bt: Bt::capture(),
960 })
961 }
962 Err(_cancelled) => Ok(()),
963 }
964}
965
966/// Combine the leader-watch outcome with the tonic graceful-drain outcome
967/// after the watch arm fired.
968///
969/// When the watch task terminates first we trigger the drain and then must
970/// report a single result. The watch error is the root cause — poisoned
971/// serving state was already published before the task returned — so it wins
972/// when both fail. A drain error (port stolen, resource exhaustion) is only
973/// surfaced when the watch outcome is otherwise `Ok`; previously it was
974/// discarded via `let _ = serve.await`, hiding a failed drain behind a clean
975/// shutdown report.
976///
977/// Generic over the drain error so the precedence logic is unit-testable
978/// without fabricating a `tonic::transport::Error` (which has no public
979/// constructor): production passes `Result<(), tonic::transport::Error>`,
980/// tests pass `Result<(), ServerError>` via the reflexive `From` impl.
981fn combine_watch_and_drain<E>(
982 watch_result: Result<Result<(), ServerError>, tokio::task::JoinError>,
983 drain_result: Result<(), E>,
984) -> Result<(), ServerError>
985where
986 ServerError: From<E>,
987{
988 match join_to_server_result(watch_result) {
989 Err(watch_err) => Err(watch_err),
990 Ok(()) => drain_result.map_err(ServerError::from),
991 }
992}
993
/// Construct the gRPC reflection service from an encoded protobuf descriptor
/// set.
///
/// Kept separate from [`Server::into_router`] so the decode-failure path can
/// be unit-tested. Production passes [`tsoracle_proto::FILE_DESCRIPTOR_SET`];
/// tests can pass deliberately corrupt bytes. A descriptor set that fails to
/// decode is reported as [`ServerError::ReflectionInit`] rather than causing
/// a panic.
#[cfg(feature = "reflection")]
fn build_reflection_service(
    descriptor_set: &[u8],
) -> Result<
    tonic_reflection::server::v1::ServerReflectionServer<
        impl tonic_reflection::server::v1::ServerReflection,
    >,
    ServerError,
> {
    let builder = tonic_reflection::server::Builder::configure()
        .register_encoded_file_descriptor_set(descriptor_set);
    builder.build_v1().map_err(ServerError::ReflectionInit)
}
1013}
1014
1015fn panic_payload_to_string(panic: Box<dyn std::any::Any + Send>) -> String {
1016 if let Some(text) = panic.downcast_ref::<&'static str>() {
1017 (*text).to_string()
1018 } else if let Some(text) = panic.downcast_ref::<String>() {
1019 text.clone()
1020 } else {
1021 "watch task panicked with non-string payload".to_string()
1022 }
1023}
1024
1025#[cfg(any(test, feature = "test-fakes"))]
1026impl Server {
1027 /// Test-only entry point for the leader-watch loop. Exposed to integration
1028 /// tests via the `test-fakes` feature; not part of the stable public API.
1029 #[doc(hidden)]
1030 pub async fn run_leader_watch_for_tests(self: Arc<Self>) -> Result<(), ServerError> {
1031 // A never-resolving cancel future: these tests drive termination via
1032 // leadership events or `JoinHandle::abort`, not cooperative cancel.
1033 crate::fence::run_leader_watch(self, futures::future::pending()).await
1034 }
1035
1036 /// Test-only allocator probe. Issues a window grant against the current
1037 /// in-memory state without going through the gRPC service. Used by
1038 /// regression tests that need to observe the behavioral fence (no
1039 /// timestamp at or below the prior leader's high-water) directly.
1040 #[doc(hidden)]
1041 pub fn try_grant_for_tests(&self, count: u32) -> Result<WindowGrant, CoreError> {
1042 self.core.try_grant(self.clock.now_ms(), count)
1043 }
1044}
1045
1046#[cfg(test)]
1047mod tests {
1048 use super::*;
1049
1050 #[test]
1051 fn panic_payload_to_string_recovers_static_str() {
1052 // `panic!("literal")` produces a `&'static str` payload; we want the
1053 // verbatim text so operators see what the watch task said.
1054 let payload: Box<dyn std::any::Any + Send> = Box::new("watch boom");
1055 assert_eq!(panic_payload_to_string(payload), "watch boom");
1056 }
1057
1058 #[test]
1059 fn panic_payload_to_string_recovers_owned_string() {
1060 // `panic!("{var}")` produces a `String` payload (formatted at panic
1061 // time); the helper must downcast that branch too.
1062 let payload: Box<dyn std::any::Any + Send> = Box::new(String::from("formatted"));
1063 assert_eq!(panic_payload_to_string(payload), "formatted");
1064 }
1065
1066 #[test]
1067 fn panic_payload_to_string_falls_back_for_other_types() {
1068 // Custom payloads (panic!(MyType { .. })) hit the catch-all branch.
1069 struct Custom;
1070 let payload: Box<dyn std::any::Any + Send> = Box::new(Custom);
1071 assert_eq!(
1072 panic_payload_to_string(payload),
1073 "watch task panicked with non-string payload",
1074 );
1075 }
1076
1077 #[test]
1078 fn serving_transitions_publish_through_core() {
1079 // The Server delegates serving-state transitions to its ServingCore; a
1080 // step_down on a freshly built Server lands as NotServing with the hint.
1081 // (The #346 send_replace-with-zero-receivers regression is pinned by the
1082 // ServingCore unit tests, which can observe `receiver_count` directly.)
1083 let server = Server::builder()
1084 .consensus_driver(Arc::new(crate::test_fakes::InMemoryDriver::new()))
1085 .build()
1086 .expect("build must succeed");
1087
1088 let hint = PeerEndpoint::try_from("new-leader:9000").unwrap();
1089 server.core.step_down(Some(hint.clone()), Some(Epoch(7)));
1090
1091 match server.core.serving_state() {
1092 ServingState::NotServing {
1093 leader_endpoint,
1094 leader_epoch,
1095 } => {
1096 assert_eq!(leader_endpoint, Some(hint));
1097 assert_eq!(leader_epoch, Some(Epoch(7)));
1098 }
1099 ServingState::Serving => panic!("expected NotServing after step_down"),
1100 }
1101 }
1102
1103 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
1104 #[test]
1105 fn builder_stores_tls_config() {
1106 // The serve_* paths read `tls_config` from `Server` (not the builder)
1107 // after `into_router` consumes self — so the field must survive the
1108 // builder → Server hand-off, not just the builder method.
1109 use crate::test_fakes::InMemoryDriver;
1110
1111 let driver = Arc::new(InMemoryDriver::new());
1112 let cfg = tonic::transport::ServerTlsConfig::new();
1113 let server = Server::builder()
1114 .consensus_driver(driver)
1115 .tls_config(cfg)
1116 .build()
1117 .expect("build with tls_config must succeed");
1118 assert!(server.tls_config.is_some());
1119 }
1120
1121 #[test]
1122 fn build_rejects_zero_max_seq_count() {
1123 // max_seq_count(0) makes every positive `count` fail as
1124 // SeqCountTooLarge{max:0} (the count>=1 floor leaves no valid value) —
1125 // GetSeq is silently dead. build() must fail-fast rather than ship a
1126 // server where GetSeq can never succeed.
1127 // `Server` is not `Debug`, so match rather than `expect_err`.
1128 match Server::builder()
1129 .consensus_driver(Arc::new(crate::test_fakes::InMemoryDriver::new()))
1130 .max_seq_count(0)
1131 .build()
1132 {
1133 Err(BuildError::ZeroMaxSeqCount) => {}
1134 Err(other) => panic!("expected ZeroMaxSeqCount, got {other:?}"),
1135 Ok(_) => panic!("max_seq_count(0) must be rejected at build, but build succeeded"),
1136 }
1137 }
1138
1139 #[test]
1140 fn build_accepts_max_seq_count_of_one() {
1141 // 1 is the smallest usable cap: a single-ordinal block is valid, so the
1142 // floor and the cap coincide and GetSeq still works. Must build.
1143 Server::builder()
1144 .consensus_driver(Arc::new(crate::test_fakes::InMemoryDriver::new()))
1145 .max_seq_count(1)
1146 .build()
1147 .expect("max_seq_count(1) must build");
1148 }
1149
1150 #[tokio::test]
1151 async fn join_to_server_result_passes_through_clean_outcome() {
1152 // Ok(Ok(())) — task finished cleanly; forward verbatim.
1153 let handle = tokio::spawn(async { Ok::<(), ServerError>(()) });
1154 let join = handle.await;
1155 assert!(matches!(join_to_server_result(join), Ok(())));
1156 }
1157
1158 #[tokio::test]
1159 async fn join_to_server_result_forwards_inner_error() {
1160 // Ok(Err(e)) — task returned an error; forward it.
1161 let handle = tokio::spawn(async {
1162 Err::<(), ServerError>(ServerError::WatchPanic {
1163 payload: "synthetic".into(),
1164 bt: Bt::capture(),
1165 })
1166 });
1167 let join = handle.await;
1168 match join_to_server_result(join) {
1169 Err(ServerError::WatchPanic { payload, .. }) => assert_eq!(payload, "synthetic"),
1170 other => panic!("expected forwarded WatchPanic, got {other:?}"),
1171 }
1172 }
1173
1174 #[tokio::test]
1175 async fn join_to_server_result_translates_panic_to_watch_panic() {
1176 // Err(JoinError::is_panic) — task panicked; surface as WatchPanic with
1177 // the payload stringified by `panic_payload_to_string`.
1178 let handle = tokio::spawn(async {
1179 panic!("intentional");
1180 #[allow(unreachable_code)]
1181 Ok::<(), ServerError>(())
1182 });
1183 let join = handle.await;
1184 match join_to_server_result(join) {
1185 Err(ServerError::WatchPanic { payload, .. }) => {
1186 assert!(payload.contains("intentional"))
1187 }
1188 other => panic!("expected WatchPanic, got {other:?}"),
1189 }
1190 }
1191
1192 #[tokio::test]
1193 async fn join_to_server_result_treats_cancellation_as_clean_exit() {
1194 // Err(JoinError::is_cancelled) — caller aborted the task; we asked
1195 // for that, so map to Ok.
1196 let handle: tokio::task::JoinHandle<Result<(), ServerError>> =
1197 tokio::spawn(async { futures::future::pending().await });
1198 handle.abort();
1199 let join = handle.await;
1200 assert!(matches!(join_to_server_result(join), Ok(())));
1201 }
1202
1203 #[tokio::test]
1204 async fn combine_watch_and_drain_surfaces_drain_error_when_watch_ok() {
1205 // Watch ended cleanly but the graceful drain failed (port stolen,
1206 // resource exhaustion). The drain error must not be swallowed.
1207 let watch = tokio::spawn(async { Ok::<(), ServerError>(()) }).await;
1208 let drain: Result<(), ServerError> = Err(ServerError::WatchStreamClosed);
1209 assert!(matches!(
1210 combine_watch_and_drain(watch, drain),
1211 Err(ServerError::WatchStreamClosed)
1212 ));
1213 }
1214
1215 #[tokio::test]
1216 async fn combine_watch_and_drain_returns_ok_when_both_succeed() {
1217 // Watch clean, drain clean — the only fully-Ok outcome.
1218 let watch = tokio::spawn(async { Ok::<(), ServerError>(()) }).await;
1219 let drain: Result<(), ServerError> = Ok(());
1220 assert!(matches!(combine_watch_and_drain(watch, drain), Ok(())));
1221 }
1222
1223 #[tokio::test]
1224 async fn combine_watch_and_drain_prefers_watch_error_over_drain_error() {
1225 // Both failed. The watch error is the root cause (poisoned state was
1226 // already published), so it wins; the drain error is dropped.
1227 let watch = tokio::spawn(async {
1228 Err::<(), ServerError>(ServerError::WatchPanic {
1229 payload: "watch".into(),
1230 bt: Bt::capture(),
1231 })
1232 })
1233 .await;
1234 let drain: Result<(), ServerError> = Err(ServerError::WatchStreamClosed);
1235 match combine_watch_and_drain(watch, drain) {
1236 Err(ServerError::WatchPanic { payload, .. }) => assert_eq!(payload, "watch"),
1237 other => panic!("expected watch error to win, got {other:?}"),
1238 }
1239 }
1240
1241 #[tokio::test]
1242 async fn combine_watch_and_drain_returns_watch_error_when_drain_ok() {
1243 // Watch failed, drain succeeded — forward the watch error verbatim.
1244 let watch = tokio::spawn(async {
1245 Err::<(), ServerError>(ServerError::WatchPanic {
1246 payload: "watch".into(),
1247 bt: Bt::capture(),
1248 })
1249 })
1250 .await;
1251 let drain: Result<(), ServerError> = Ok(());
1252 match combine_watch_and_drain(watch, drain) {
1253 Err(ServerError::WatchPanic { payload, .. }) => assert_eq!(payload, "watch"),
1254 other => panic!("expected forwarded watch error, got {other:?}"),
1255 }
1256 }
1257
1258 #[tokio::test]
1259 async fn dropping_watch_guard_closes_serving_gate_synchronously() {
1260 // Regression: dropping the guard must close the serving gate at the
1261 // drop site, not on the watch task's later timeline. Build a guard whose
1262 // watch handle never touches the core (so the ONLY thing that can flip
1263 // the state to NotServing is `Drop`), publish `Serving`, then drop the
1264 // guard and read the state with NO await in between. On the current-thread
1265 // test runtime no other task can run between the synchronous `drop` and
1266 // the synchronous `serving_state` read, so observing `NotServing` proves
1267 // `Drop` closed the gate synchronously rather than the watch task.
1268 let core = Arc::new(ServingCore::new(
1269 Duration::from_secs(3),
1270 tsoracle_core::DEFAULT_MAX_SEQ_COUNT,
1271 ));
1272 core.publish_serving();
1273
1274 let handle: tokio::task::JoinHandle<Result<(), ServerError>> =
1275 tokio::spawn(async { Ok(()) });
1276 let (cancel_tx, _cancel_rx) = tokio::sync::oneshot::channel::<()>();
1277 let guard = WatchGuard {
1278 cancel_tx: Some(cancel_tx),
1279 handle: Some(handle),
1280 shutdown_grace: Duration::from_secs(10),
1281 core: core.clone(),
1282 reporter: Arc::new(crate::reporter::Reporter::new()),
1283 heartbeat_cancel_tx: None,
1284 heartbeat_handle: None,
1285 };
1286
1287 drop(guard);
1288
1289 assert!(
1290 matches!(core.serving_state(), ServingState::NotServing { .. }),
1291 "dropping the WatchGuard must close the serving gate synchronously",
1292 );
1293 }
1294
1295 #[tokio::test]
1296 async fn serve_inner_closes_serving_gate_on_user_shutdown() {
1297 // Regression for the serve path: when the caller's shutdown fires,
1298 // `serve_inner` drops the watch cancel sender and waits out the grace,
1299 // forcibly aborting the watch task if it overruns. A task parked upstream
1300 // of any cancel-observing await (modelled here by a never-resolving
1301 // future) is aborted before it can publish `NotServing`, so the gate
1302 // would stay open unless `serve_inner` closes it itself. Seed `Serving`,
1303 // run the user-shutdown arm with a zero grace (immediate abort), and
1304 // assert the gate is closed on return.
1305 let core = Arc::new(ServingCore::new(
1306 Duration::from_secs(3),
1307 tsoracle_core::DEFAULT_MAX_SEQ_COUNT,
1308 ));
1309 core.publish_serving();
1310
1311 let watch_handle: tokio::task::JoinHandle<Result<(), ServerError>> =
1312 tokio::spawn(async { futures::future::pending().await });
1313 let (watch_cancel_tx, _watch_cancel_rx) = tokio::sync::oneshot::channel::<()>();
1314 let (tonic_cancel_tx, _tonic_cancel_rx) = tokio::sync::oneshot::channel::<()>();
1315
1316 // A serve future that is immediately ready models the user's shutdown
1317 // having fired; with the biased select preferring the (pending) watch arm,
1318 // control reaches the user-shutdown arm.
1319 let serve_future = async { Ok::<(), tonic::transport::Error>(()) };
1320
1321 let result = serve_inner(
1322 watch_cancel_tx,
1323 watch_handle,
1324 None, // heartbeat_cancel_tx — heartbeat disabled in this regression test
1325 None, // heartbeat_handle
1326 serve_future,
1327 tonic_cancel_tx,
1328 Duration::from_millis(0),
1329 core.clone(),
1330 Arc::new(crate::reporter::Reporter::new()),
1331 )
1332 .await;
1333
1334 assert!(
1335 result.is_ok(),
1336 "user shutdown must return Ok, got {result:?}"
1337 );
1338 assert!(
1339 matches!(core.serving_state(), ServingState::NotServing { .. }),
1340 "serve_inner's user-shutdown arm must close the serving gate synchronously",
1341 );
1342 }
1343
1344 #[cfg(feature = "reflection")]
1345 #[test]
1346 fn build_reflection_service_accepts_embedded_descriptor_set() {
1347 // The descriptor set emitted by `tsoracle-proto`'s build.rs must decode
1348 // cleanly — this is the production happy path that previously sat behind
1349 // an `expect`.
1350 assert!(build_reflection_service(tsoracle_proto::FILE_DESCRIPTOR_SET).is_ok());
1351 }
1352
1353 #[cfg(feature = "reflection")]
1354 #[test]
1355 fn build_reflection_service_maps_corrupt_descriptor_to_typed_error() {
1356 // A descriptor set that fails to decode (build artifact drift) must
1357 // surface as a typed `ServerError::ReflectionInit`, not a panic. The
1358 // bytes below are not a valid encoded `FileDescriptorSet`.
1359 let corrupt = b"\xff\xff\xff\xff not a descriptor set";
1360 // The Ok variant wraps a reflection service that is not `Debug`, so map
1361 // to a unit result before asserting on the error variant.
1362 match build_reflection_service(corrupt).map(|_| ()) {
1363 Err(ServerError::ReflectionInit(_)) => {}
1364 other => panic!("expected ReflectionInit error, got {other:?}"),
1365 }
1366 }
1367}