tsoracle_server/server.rs
1//
2// ░▀█▀░█▀▀░█▀█░█▀▄░█▀█░█▀▀░█░░░█▀▀
3// ░░█░░▀▀█░█░█░█▀▄░█▀█░█░░░█░░░█▀▀
4// ░░▀░░▀▀▀░▀▀▀░▀░▀░▀░▀░▀▀▀░▀▀▀░▀▀▀
5//
6// tsoracle — Distributed Timestamp Oracle
7// https://www.tsoracle.rs
8//
9// Copyright (c) 2026 Prisma Risk
10//
11// Licensed under the Apache License, Version 2.0 (the "License");
12// you may not use this file except in compliance with the License.
13// You may obtain a copy of the License at
14//
15// https://www.apache.org/licenses/LICENSE-2.0
16//
17// Unless required by applicable law or agreed to in writing, software
18// distributed under the License is distributed on an "AS IS" BASIS,
19// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20// See the License for the specific language governing permissions and
21// limitations under the License.
22//
23
24use core::time::Duration;
25use std::future::Future;
26use std::net::SocketAddr;
27use std::sync::Arc;
28use tokio::sync::watch;
29use tonic::service::Routes;
30use tonic::transport::Server as TonicServer;
31use tsoracle_consensus::ConsensusDriver;
32#[cfg(any(test, feature = "test-fakes"))]
33use tsoracle_core::{CoreError, WindowGrant};
34use tsoracle_core::{Epoch, PeerEndpoint};
35use tsoracle_proto::v1::tso_service_server::TsoServiceServer;
36
37use crate::bt::Bt;
38use crate::clock::{Clock, SystemClock};
39use crate::service::TsoServiceImpl;
40use crate::serving_core::ServingCore;
41
/// Errors returned by [`ServerBuilder::build`] when the builder is missing
/// required configuration.
#[derive(Debug, thiserror::Error)]
pub enum BuildError {
    /// No consensus driver was supplied via
    /// [`ServerBuilder::consensus_driver`]. It is the only builder setting
    /// without a default, so `build` cannot proceed without it.
    #[error("consensus_driver is required")]
    MissingConsensusDriver,
}
47
/// Runtime errors surfaced by [`Server::into_router`], the `serve_*` methods,
/// and [`WatchGuard::shutdown`].
#[derive(Debug, thiserror::Error)]
pub enum ServerError {
    /// A tonic transport failure. In this file it is produced explicitly when
    /// applying the TLS configuration fails; the `#[from]` conversion also
    /// lets `?` lift any other `tonic::transport::Error`.
    #[error("transport: {0}")]
    Transport(#[from] tonic::transport::Error),
    /// An error reported by the consensus driver, converted via `#[from]`.
    // NOTE(review): presumably raised by the leader-watch task (e.g. mid-fence,
    // as the `WatchStreamClosed` docs below imply) — the producing code is not
    // in this file; confirm against `crate::fence`.
    #[error("consensus: {0}")]
    Consensus(#[from] tsoracle_consensus::ConsensusError),
    /// An error from `tsoracle_core`, converted via `#[from]`.
    // NOTE(review): the producing call sites are not in this file; confirm
    // which core operations can surface here.
    #[error("core: {0}")]
    Core(#[from] tsoracle_core::CoreError),
    /// The leader-watch task panicked. Distinct from a clean error return so
    /// operators can tell "driver returned Err" (recoverable design) from
    /// "task panicked" (programming bug).
    #[error("leader-watch task panicked: {payload}{bt}")]
    WatchPanic { payload: String, bt: Bt },
    /// The consensus driver's `leadership_events()` stream ended cleanly while
    /// the leader-watch task was running. The stream is contracted to live for
    /// the life of the server, so its end is anomalous (driver shutdown, lost
    /// session, etc.) — distinct from a `Consensus` error returned mid-fence.
    /// The watch task publishes `ServingState::NotServing` before returning
    /// this variant so embedders who never observe the [`WatchGuard`] still get
    /// the documented fail-safe behavior.
    #[error("consensus driver leadership stream closed")]
    WatchStreamClosed,
    /// The embedded protobuf descriptor set failed to decode while building the
    /// gRPC reflection service. `tsoracle-proto`'s `build.rs` emits these bytes
    /// from checked-in `.proto` sources, so a failure here signals build-artifact
    /// drift (a corrupt or stale descriptor) rather than a runtime condition —
    /// surfaced as a diagnosable startup error instead of a process panic.
    #[cfg(feature = "reflection")]
    #[error("failed to build gRPC reflection service from embedded descriptor set: {0}")]
    ReflectionInit(#[source] tonic_reflection::server::Error),
}
79
/// Serving state observed through the receiver returned by
/// [`Server::subscribe`]. Embedders typically gate their own startup on
/// [`ServingState::Serving`].
#[derive(Clone, Debug)]
pub enum ServingState {
    /// This node is not serving timestamps.
    ///
    /// The shutdown and failure paths in this file call
    /// `step_down(None, None)` on the serving core.
    // NOTE(review): presumably that publishes this variant with both fields
    // `None` — confirm against `ServingCore::step_down`.
    NotServing {
        /// Endpoint of the current leader, when known.
        // NOTE(review): presumably a redirect hint for clients — confirm.
        leader_endpoint: Option<PeerEndpoint>,
        /// Epoch associated with that leader, when known.
        leader_epoch: Option<Epoch>,
    },
    /// This node is serving timestamps.
    // NOTE(review): presumably only while it holds leadership — confirm
    // against `crate::fence::run_leader_watch`.
    Serving,
}
88
/// Default bound on how long a graceful shutdown waits for the leader-watch
/// task to stop cooperatively before forcibly aborting it. The abort is a
/// last-resort safety net for a consensus driver whose `load_high_water` /
/// `persist_high_water` is wedged (the trait places no latency bound; see
/// [`ConsensusDriver`]). Chosen to sit comfortably under a typical Kubernetes
/// `terminationGracePeriodSeconds` (30s) so the abort, the tonic drain, and
/// process exit all complete before the kubelet escalates to SIGKILL.
///
/// This is only the value `ServerBuilder::default()` starts from; override it
/// per server with [`ServerBuilder::shutdown_grace`].
const DEFAULT_SHUTDOWN_GRACE: Duration = Duration::from_secs(10);
97
/// Builder for [`Server`]. Obtain one from [`Server::builder`] (or
/// `ServerBuilder::default()`), chain the setters, then call
/// [`ServerBuilder::build`]. Every setting except the consensus driver has a
/// default.
pub struct ServerBuilder {
    /// Consensus driver. Required: `build` fails with
    /// [`BuildError::MissingConsensusDriver`] when this is still `None`.
    consensus: Option<Arc<dyn ConsensusDriver>>,
    /// Clock override. `build` falls back to [`SystemClock`] when `None`.
    clock: Option<Arc<dyn Clock>>,
    /// Passed to `ServingCore::new` by `build`. Defaults to 3 seconds.
    // NOTE(review): presumably how far ahead the persisted timestamp window is
    // extended — confirm against `ServingCore`.
    window_ahead: Duration,
    /// Defaults to 1 second.
    // NOTE(review): not read in this file; presumably consumed by the fencing
    // logic on leadership change — confirm against `crate::fence`.
    failover_advance: Duration,
    /// Bound on the graceful-shutdown wait for the leader-watch task before a
    /// forced abort. Defaults to `DEFAULT_SHUTDOWN_GRACE` (10 seconds).
    shutdown_grace: Duration,
    /// Interval between heartbeat log lines; `Duration::ZERO` disables the
    /// heartbeat task. Defaults to 10 seconds.
    heartbeat_interval: Duration,
    /// Optional TLS configuration applied by the `serve_*` methods. Only
    /// present when a TLS feature is enabled.
    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
    tls_config: Option<tonic::transport::ServerTlsConfig>,
}
108
109impl Default for ServerBuilder {
110 fn default() -> Self {
111 ServerBuilder {
112 consensus: None,
113 clock: None,
114 window_ahead: Duration::from_secs(3),
115 failover_advance: Duration::from_secs(1),
116 shutdown_grace: DEFAULT_SHUTDOWN_GRACE,
117 heartbeat_interval: Duration::from_secs(10),
118 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
119 tls_config: None,
120 }
121 }
122}
123
124impl ServerBuilder {
125 pub fn consensus_driver(mut self, driver: Arc<dyn ConsensusDriver>) -> Self {
126 self.consensus = Some(driver);
127 self
128 }
129 pub fn clock(mut self, clock: Arc<dyn Clock>) -> Self {
130 self.clock = Some(clock);
131 self
132 }
133 pub fn window_ahead(mut self, window_ahead: Duration) -> Self {
134 self.window_ahead = window_ahead;
135 self
136 }
137 pub fn failover_advance(mut self, failover_advance: Duration) -> Self {
138 self.failover_advance = failover_advance;
139 self
140 }
141
142 /// Bound on how long a graceful shutdown waits for the leader-watch task to
143 /// stop cooperatively before forcibly aborting it.
144 ///
145 /// On shutdown the server drops the watch task's cancel signal and waits for
146 /// it to publish `NotServing` and return. That wait is normally
147 /// near-instant, but the task observes cancellation only at its `select!`
148 /// boundaries — never inside a fence attempt. A consensus driver whose
149 /// `load_high_water` / `persist_high_water` never returns (the trait places
150 /// no latency bound) would otherwise park the task mid-fence and block
151 /// process exit indefinitely, leading to a SIGKILL on a Kubernetes drain.
152 /// Once `shutdown_grace` elapses the server aborts the task so exit always
153 /// makes progress. Set this comfortably below your deployment's
154 /// `terminationGracePeriodSeconds`. Defaults to 10s. A value of zero aborts
155 /// immediately without waiting for a cooperative stop.
156 pub fn shutdown_grace(mut self, shutdown_grace: Duration) -> Self {
157 self.shutdown_grace = shutdown_grace;
158 self
159 }
160
161 /// Interval between heartbeat log lines emitted at `target = "tsoracle::heartbeat"`.
162 /// Defaults to 10 seconds. Pass `Duration::ZERO` to disable the heartbeat task entirely.
163 ///
164 /// The heartbeat surfaces serving role, current epoch, requests served,
165 /// timestamps issued, and key error counters every interval — proof-of-life
166 /// for production deployments that may not have a metrics exporter installed.
167 ///
168 /// Requires `feature = "tracing"` to emit output; with `tracing` off the
169 /// setter is accepted but no task is spawned (no subscriber to log to).
170 pub fn heartbeat_interval(mut self, interval: Duration) -> Self {
171 self.heartbeat_interval = interval;
172 self
173 }
174
175 /// Configure TLS termination for this server. Applied inside
176 /// [`Server::serve`], [`Server::serve_with_shutdown`], and
177 /// [`Server::serve_with_listener`]. Not applied to [`Server::into_router`] —
178 /// embedders mounting tsoracle alongside their own services control TLS
179 /// on their own tonic builder.
180 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
181 pub fn tls_config(mut self, cfg: tonic::transport::ServerTlsConfig) -> Self {
182 self.tls_config = Some(cfg);
183 self
184 }
185
186 pub fn build(self) -> Result<Server, BuildError> {
187 let consensus = self.consensus.ok_or(BuildError::MissingConsensusDriver)?;
188 let clock = self.clock.unwrap_or_else(|| Arc::new(SystemClock));
189 Ok(Server {
190 consensus,
191 clock,
192 window_ahead: self.window_ahead,
193 failover_advance: self.failover_advance,
194 shutdown_grace: self.shutdown_grace,
195 heartbeat_interval: self.heartbeat_interval,
196 core: Arc::new(ServingCore::new(self.window_ahead)),
197 reporter: Arc::new(crate::reporter::Reporter::new()),
198 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
199 tls_config: self.tls_config,
200 })
201 }
202}
203
/// A configured timestamp-oracle server, produced by [`ServerBuilder::build`].
///
/// Consume it with [`Server::into_router`] to mount the gRPC service on an
/// existing tonic listener, or with one of the `serve_*` methods to run a
/// dedicated listener.
pub struct Server {
    /// The consensus driver supplied to the builder.
    pub(crate) consensus: Arc<dyn ConsensusDriver>,
    /// The clock supplied to the builder, or [`SystemClock`] by default.
    pub(crate) clock: Arc<dyn Clock>,
    /// The same value that `build` passed to `ServingCore::new`.
    pub(crate) window_ahead: Duration,
    /// See [`ServerBuilder::failover_advance`].
    // NOTE(review): not read in this file; presumably consumed by
    // `crate::fence` — confirm.
    pub(crate) failover_advance: Duration,
    /// Bound on the graceful-shutdown wait for the leader-watch task before a
    /// forced abort. See [`ServerBuilder::shutdown_grace`].
    pub(crate) shutdown_grace: Duration,
    /// Interval between periodic heartbeat log lines. See [`ServerBuilder::heartbeat_interval`].
    ///
    /// The only reader is the `cfg(feature = "tracing")` spawn block in
    /// `into_router_parts`; without `tracing` there is no subscriber to log to
    /// and the spawn arm is compiled out, so the field is genuinely unread.
    #[cfg_attr(not(feature = "tracing"), allow(dead_code))]
    pub(crate) heartbeat_interval: Duration,
    /// Owns the allocator, serving-state channel, and both extension locks, with
    /// the lock-ordering and step-down invariants private behind its methods.
    ///
    /// Held behind an `Arc` so the leader-watch task, the gRPC service, and the
    /// [`WatchGuard`] / [`serve_inner`] shutdown paths can all reach the same
    /// core. The guard and the serve loop use their clone to close the serving
    /// gate *synchronously* at shutdown, leaving the watch task's later
    /// `step_down` a harmless idempotent repeat.
    pub(crate) core: Arc<ServingCore>,
    /// Shared metrics reporter. In this file it is cloned into the heartbeat
    /// task, the [`WatchGuard`], and the `serve_*` shutdown path.
    pub(crate) reporter: Arc<crate::reporter::Reporter>,
    /// Optional TLS configuration, applied only by the `serve_*` methods. See
    /// [`ServerBuilder::tls_config`].
    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
    pub(crate) tls_config: Option<tonic::transport::ServerTlsConfig>,
}
232
/// Raw parts produced by [`Server::into_router_parts`]: the gRPC `Routes`, the
/// leader-watch task's cooperative-cancel sender (dropping it stops the task),
/// the task's join handle, and the optional heartbeat task's cancel sender /
/// join handle. [`Server::into_router`] wraps these into a [`WatchGuard`]; the
/// `serve_*` methods consume them directly via [`serve_inner`].
///
/// The heartbeat fields are `None` when the heartbeat is disabled — either by
/// `ServerBuilder::heartbeat_interval(Duration::ZERO)` or by building without
/// the `tracing` feature (there is no subscriber to log to).
pub(crate) struct RouterParts {
    /// The gRPC routes: the TSO service, plus the reflection service when the
    /// `reflection` feature is enabled.
    pub routes: Routes,
    /// Cooperative-cancel sender for the leader-watch task. Sending on it or
    /// dropping it resolves the task's cancel future.
    pub cancel_tx: tokio::sync::oneshot::Sender<()>,
    /// Join handle for the leader-watch task. Yields the task's own result;
    /// a panic inside the task is re-raised after poisoning, so it surfaces
    /// here as a join error.
    pub watch_handle: tokio::task::JoinHandle<Result<(), ServerError>>,
    /// Cooperative-cancel sender for the heartbeat task, if one was spawned.
    pub heartbeat_cancel_tx: Option<tokio::sync::oneshot::Sender<()>>,
    /// Join handle for the heartbeat task, if one was spawned.
    pub heartbeat_handle: Option<tokio::task::JoinHandle<()>>,
}
249
250impl Server {
251 pub fn builder() -> ServerBuilder {
252 ServerBuilder::default()
253 }
254
255 /// Subscribe to serving-state transitions.
256 ///
257 /// Returns a fresh `watch::Receiver` observing the same `ServingState`
258 /// the server publishes as leadership comes and goes. Embedders use this
259 /// to gate their own startup on `ServingState::Serving` (see the
260 /// `embedded_router` and piggyback examples). Because `into_router`
261 /// consumes the `Server`, capture the receiver before mounting.
262 ///
263 /// This method is the stable observation API: the receiver is minted from
264 /// the server's `watch::Sender`, so the receiver's type can evolve (e.g. a
265 /// future newtype around `ServingState`) without breaking embedders that
266 /// go through it.
267 pub fn subscribe(&self) -> watch::Receiver<ServingState> {
268 self.core.subscribe()
269 }
270}
271
272impl Server {
273 /// Return the configured `TsoServiceServer<TsoServiceImpl>` as a tonic
274 /// `Routes` value plus a [`WatchGuard`] for the spawned leader-watch task,
275 /// so callers can mount tsoracle's service alongside their own services
276 /// on a shared tonic listener instead of binding a dedicated port.
277 ///
278 /// The returned [`WatchGuard`] owns the leader-watch task. **Keep it alive
279 /// for as long as the mounted `Routes` should serve**: the watch task holds
280 /// an `Arc<Server>` (and the consensus driver) and maintains serving state
281 /// across leadership transitions. Dropping the guard — or calling
282 /// [`WatchGuard::shutdown`] — cooperatively stops the task at the embedder's
283 /// own shutdown. Without the guard the task would keep `Arc<Server>` alive
284 /// until the leadership stream happened to close.
285 ///
286 /// Every termination of the task — cooperative cancellation, driver error,
287 /// panic, or clean EOF on the leadership stream (surfaced as
288 /// `ServerError::WatchStreamClosed`) — publishes
289 /// `ServingState::NotServing { leader_endpoint: None }` before returning, so
290 /// all subsequent RPCs fail fast with `FAILED_PRECONDITION`. Embedders who
291 /// drop the guard without awaiting still get fail-safe behavior.
292 ///
293 /// The `Server::serve()` method is a thin wrapper over this — it calls
294 /// `into_router`, builds a tonic `Server`, and binds a listener.
295 ///
296 /// Returns `Err(ServerError::ReflectionInit)` (only reachable under the
297 /// `reflection` feature) if the embedded descriptor set fails to decode.
298 /// That decode happens before the leader-watch task is spawned, so a failure
299 /// leaves nothing running to clean up.
300 pub fn into_router(self) -> Result<(Routes, WatchGuard), ServerError> {
301 // Read the `Copy` grace before `into_router_parts` consumes `self`, so
302 // the returned guard can bound its own shutdown wait identically to the
303 // `serve_*` paths.
304 let shutdown_grace = self.shutdown_grace;
305 // Clone the shared core and reporter before `into_router_parts` consumes
306 // `self`, so the guard can close the serving gate synchronously on drop /
307 // shutdown rather than relying on the watch task's later publish, and can
308 // record the shutdown_watch_aborted counter if the grace-bounded reap fires.
309 let core = self.core.clone();
310 let reporter = self.reporter.clone();
311 let parts = self.into_router_parts()?;
312 Ok((
313 parts.routes,
314 WatchGuard {
315 cancel_tx: Some(parts.cancel_tx),
316 handle: Some(parts.watch_handle),
317 shutdown_grace,
318 core,
319 reporter,
320 heartbeat_cancel_tx: parts.heartbeat_cancel_tx,
321 heartbeat_handle: parts.heartbeat_handle,
322 },
323 ))
324 }
325
326 /// Spawn the leader-watch task and assemble the gRPC `Routes`, returning
327 /// the raw parts: the routes, the task's cooperative-cancel sender, and its
328 /// `JoinHandle`. [`Self::into_router`] wraps these into a [`WatchGuard`] for
329 /// embedders; the `serve_*` methods drive the parts directly via
330 /// [`serve_inner`], so neither path needs to unwrap the guard's `Option`
331 /// fields.
332 fn into_router_parts(self) -> Result<RouterParts, ServerError> {
333 // Build the reflection service first: a descriptor-decode failure must
334 // surface before we spawn the leader-watch task below, so an error path
335 // never leaks a running task.
336 #[cfg(feature = "reflection")]
337 let reflection = build_reflection_service(tsoracle_proto::FILE_DESCRIPTOR_SET)?;
338
339 let server = Arc::new(self);
340
341 // Cooperative cancellation channel. The `WatchGuard` holds the sender;
342 // the task's `cancel` future resolves on either an explicit send or a
343 // sender drop, so dropping the guard is sufficient to stop the task.
344 let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
345
346 let watch_server = server.clone();
347 let watch_handle = tokio::spawn(async move {
348 use futures::FutureExt;
349 // Resolves when the WatchGuard signals cancellation or is dropped.
350 let cancel = async move {
351 let _ = cancel_rx.await;
352 };
353 // catch_unwind so a panic in run_leader_watch still routes through
354 // the poisoning path. Without this, embedders who mount into_router
355 // directly and never observe the guard would see
356 // ServingState::Serving remain published while the watch task is
357 // dead — the inverse of the fail-safe guarantee documented above.
358 // The panic is re-raised after poisoning so serve / serve_with_*
359 // continue to translate it into ServerError::WatchPanic via
360 // join_to_server_result.
361 let outcome = std::panic::AssertUnwindSafe(crate::fence::run_leader_watch(
362 watch_server.clone(),
363 cancel,
364 ))
365 .catch_unwind()
366 .await;
367 match outcome {
368 Ok(result) => {
369 if let Err(ref _e) = result {
370 // Poison BEFORE returning so embedders who do not observe
371 // the guard still get fail-safe behavior.
372 watch_server.core.step_down(None, None);
373 #[cfg(feature = "tracing")]
374 tracing::error!(error = %_e, "leader-watch terminated; serving disabled");
375 }
376 result
377 }
378 Err(panic_payload) => {
379 // Mirror the Err branch: poison BEFORE re-raising so
380 // guard-dropping embedders still observe NotServing.
381 watch_server.core.step_down(None, None);
382 #[cfg(feature = "tracing")]
383 tracing::error!("leader-watch panicked; serving disabled");
384 std::panic::resume_unwind(panic_payload);
385 }
386 }
387 });
388
389 // Spawn the heartbeat task, if enabled. Gated on `feature = "tracing"`
390 // because the heartbeat module is only compiled with `tracing`
391 // (no subscriber to emit to without it) — and on a non-zero interval,
392 // since `Duration::ZERO` is the documented opt-out.
393 //
394 // The task body is wrapped in `AssertUnwindSafe(...).catch_unwind()`
395 // mirroring the leader-watch spawn above: on panic we bump the
396 // `heartbeat_task_panicked` counter and log at error level, then let
397 // the task end (no restart — the heartbeat is observability, not
398 // correctness, so a panicked task must not be allowed to thrash).
399 let (heartbeat_cancel_tx, heartbeat_handle) = {
400 #[cfg(feature = "tracing")]
401 {
402 if server.heartbeat_interval.is_zero() {
403 (None, None)
404 } else {
405 use futures::FutureExt;
406 let (htx, hrx) = tokio::sync::oneshot::channel::<()>();
407 let hb_reporter = server.reporter.clone();
408 let hb_core = server.core.clone();
409 let hb_interval = server.heartbeat_interval;
410 let handle = tokio::spawn(async move {
411 let outcome =
412 std::panic::AssertUnwindSafe(crate::heartbeat::run_heartbeat(
413 hb_interval,
414 hb_core,
415 hb_reporter.clone(),
416 hrx,
417 ))
418 .catch_unwind()
419 .await;
420 if outcome.is_err() {
421 hb_reporter.heartbeat_task_panicked.increment(1);
422 tracing::error!(
423 target: "tsoracle::heartbeat",
424 "heartbeat task panicked; liveness logs disabled until restart"
425 );
426 }
427 });
428 (Some(htx), Some(handle))
429 }
430 }
431 #[cfg(not(feature = "tracing"))]
432 {
433 (None, None)
434 }
435 };
436
437 let service = TsoServiceImpl { server };
438 #[allow(unused_mut)]
439 let mut routes = Routes::new(TsoServiceServer::new(service));
440 #[cfg(feature = "reflection")]
441 {
442 routes = routes.add_service(reflection);
443 }
444 Ok(RouterParts {
445 routes,
446 cancel_tx,
447 watch_handle,
448 heartbeat_cancel_tx,
449 heartbeat_handle,
450 })
451 }
452
453 pub async fn serve(self, addr: SocketAddr) -> Result<(), ServerError> {
454 self.serve_with_shutdown(addr, futures::future::pending())
455 .await
456 }
457
458 /// Run the gRPC server until either the caller's `shutdown` fires or the
459 /// leader-watch task terminates.
460 ///
461 /// Three outcomes:
462 /// 1. `shutdown` fires first → tonic drains in-flights and returns Ok.
463 /// The watch task is then stopped cooperatively, bounded by
464 /// `shutdown_grace` and forcibly aborted if it overruns (e.g. parked in a
465 /// wedged consensus-driver call); any error it had been about to return
466 /// is forfeited (the process is shutting down anyway).
467 /// 2. Watch returns `Ok(Err(e))` → poisoned state is already published;
468 /// `cancel_tx` triggers tonic's graceful shutdown; in-flight `GetTs`
469 /// calls whose `try_grant` already succeeded complete with the
470 /// timestamps they were allocated; new calls fail fast. Returns `Err(e)`
471 /// — the watch error wins even if the drain itself also errors (see
472 /// `combine_watch_and_drain`); a drain error is surfaced only when the
473 /// watch ended cleanly.
474 /// 3. Watch task panics → returns `Err(ServerError::WatchPanic{..})`
475 /// with the panic payload stringified. Same drain semantics as (2).
476 pub async fn serve_with_shutdown(
477 self,
478 addr: SocketAddr,
479 shutdown: impl Future<Output = ()> + Send + 'static,
480 ) -> Result<(), ServerError> {
481 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
482 let tls_config = self.tls_config.clone();
483
484 // Read the `Copy` grace and clone the shared core and reporter before
485 // `into_router_parts` consumes `self`.
486 let shutdown_grace = self.shutdown_grace;
487 let core = self.core.clone();
488 let reporter = self.reporter.clone();
489 let parts = self.into_router_parts()?;
490 let (combined_shutdown, cancel_tx) = combined_shutdown_with_cancel(shutdown);
491
492 let mut tonic = TonicServer::builder();
493 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
494 if let Some(cfg) = tls_config {
495 tonic = tonic.tls_config(cfg).map_err(ServerError::Transport)?;
496 }
497 let serve = tonic
498 .add_routes(parts.routes)
499 .serve_with_shutdown(addr, combined_shutdown);
500
501 serve_inner(
502 parts.cancel_tx,
503 parts.watch_handle,
504 parts.heartbeat_cancel_tx,
505 parts.heartbeat_handle,
506 serve,
507 cancel_tx,
508 shutdown_grace,
509 core,
510 reporter,
511 )
512 .await
513 }
514
515 /// Run the gRPC server on a caller-provided `TcpListener` until either
516 /// the caller-provided `shutdown` fires or the leader-watch task terminates.
517 ///
518 /// Use this instead of [`Self::serve_with_shutdown`] when you need to
519 /// observe the OS-picked port (`127.0.0.1:0`) before clients connect, or
520 /// when you want to wrap the listener in an outer adapter before passing it
521 /// in. The listening socket is owned by the caller and passed here; tsoracle
522 /// starts accepting on it immediately.
523 ///
524 /// Three outcomes:
525 /// 1. `shutdown` fires first → tonic drains in-flights and returns `Ok`.
526 /// The watch handle is aborted; any error it had been about to return
527 /// is forfeited (the process is shutting down anyway).
528 /// 2. Watch returns `Ok(Err(e))` → poisoned state is already published;
529 /// the caller-provided shutdown is cancelled internally so tonic begins
530 /// graceful shutdown; in-flight `GetTs` calls whose `try_grant` already
531 /// succeeded complete with the timestamps they were allocated; new calls
532 /// fail fast. Returns `Err(e)` — the watch error wins even if the drain
533 /// itself also errors (see `combine_watch_and_drain`); a drain error is
534 /// surfaced only when the watch ended cleanly.
535 /// 3. Watch task panics → returns `Err(ServerError::WatchPanic{..})`
536 /// with the panic payload stringified. Same drain semantics as (2).
537 pub async fn serve_with_listener(
538 self,
539 listener: tokio::net::TcpListener,
540 shutdown: impl Future<Output = ()> + Send + 'static,
541 ) -> Result<(), ServerError> {
542 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
543 let tls_config = self.tls_config.clone();
544
545 // Read the `Copy` grace and clone the shared core and reporter before
546 // `into_router_parts` consumes `self`.
547 let shutdown_grace = self.shutdown_grace;
548 let core = self.core.clone();
549 let reporter = self.reporter.clone();
550 let parts = self.into_router_parts()?;
551 let (combined_shutdown, cancel_tx) = combined_shutdown_with_cancel(shutdown);
552
553 let incoming = tonic::transport::server::TcpIncoming::from(listener);
554
555 let mut tonic = TonicServer::builder();
556 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
557 if let Some(cfg) = tls_config {
558 tonic = tonic.tls_config(cfg).map_err(ServerError::Transport)?;
559 }
560 let serve = tonic
561 .add_routes(parts.routes)
562 .serve_with_incoming_shutdown(incoming, combined_shutdown);
563
564 serve_inner(
565 parts.cancel_tx,
566 parts.watch_handle,
567 parts.heartbeat_cancel_tx,
568 parts.heartbeat_handle,
569 serve,
570 cancel_tx,
571 shutdown_grace,
572 core,
573 reporter,
574 )
575 .await
576 }
577}
578
/// RAII handle to the leader-watch task spawned by [`Server::into_router`].
///
/// The watch task holds an `Arc<Server>` (and thus the consensus driver) and
/// maintains serving state across leadership transitions. This guard ties the
/// task's lifetime to the guard's: dropping it cooperatively cancels the task,
/// and the task publishes [`ServingState::NotServing`] before it stops, so any
/// `Routes` an embedder still has mounted fails subsequent RPCs fast.
///
/// Cancellation on drop is cooperative — the task stops at its next await
/// boundary and never mid-fence, so it is never torn down while holding
/// internal locks, in contrast to a raw [`tokio::task::JoinHandle::abort`].
/// The two exceptions are explicit: [`Self::abort`] hard-aborts on request,
/// and [`Self::shutdown`] falls back to an abort only after the configured
/// grace has elapsed. The optional heartbeat task is observability-only and
/// is hard-aborted on drop.
pub struct WatchGuard {
    // `Option` so `Drop` and the consuming `shutdown` / `abort` methods can
    // each take a field without a partial-move conflict against the `Drop`
    // impl. Dropping the sender (rather than sending) is itself the cancel
    // signal: the task's `cancel` future resolves on sender-drop too.
    cancel_tx: Option<tokio::sync::oneshot::Sender<()>>,
    // Join handle for the leader-watch task; `None` once `shutdown` or
    // `abort` has taken it.
    handle: Option<tokio::task::JoinHandle<Result<(), ServerError>>>,
    /// Bound on the cooperative-stop wait in [`Self::shutdown`] before a forced
    /// abort. Inherited from [`ServerBuilder::shutdown_grace`].
    shutdown_grace: Duration,
    /// Shared serving core, cloned from the `Server`. Lets `Drop` (and the
    /// consuming `shutdown` / `abort`, which trigger `Drop` on return) close the
    /// serving gate synchronously at the drop site, instead of waiting for the
    /// watch task to observe cancellation and publish `NotServing` on its own
    /// timeline — a window during which the fast gate would still admit RPCs.
    core: Arc<ServingCore>,
    /// Metrics reporter, cloned from the `Server`. Used to record the
    /// `shutdown_watch_aborted` counter if the grace-bounded reap fires.
    reporter: Arc<crate::reporter::Reporter>,
    /// Cooperative-cancel sender for the heartbeat task. `None` when the
    /// heartbeat is disabled (interval == 0 or built without `tracing`).
    /// Dropping the sender resolves the task's cancel future.
    heartbeat_cancel_tx: Option<tokio::sync::oneshot::Sender<()>>,
    /// Join handle for the heartbeat task. `None` when the heartbeat is
    /// disabled. Output is `()` because the task never returns an error —
    /// panics are caught inside the task body and recorded via the
    /// `heartbeat_task_panicked` counter.
    heartbeat_handle: Option<tokio::task::JoinHandle<()>>,
}
619
620impl WatchGuard {
621 /// Signal the leader-watch task to stop, wait for it to drain, and report
622 /// its outcome.
623 ///
624 /// A cooperatively cancelled task returns `Ok(())` — the stop was
625 /// requested, so it is not an error. If the task had already terminated on
626 /// its own (driver error, stream EOF, or panic) the original outcome is
627 /// surfaced verbatim: `Err(e)` or [`ServerError::WatchPanic`]. Either way
628 /// serving state is `NotServing` once this returns.
629 ///
630 /// The cooperative wait is bounded by the configured
631 /// [`ServerBuilder::shutdown_grace`]: if the task is parked in a
632 /// consensus-driver call that never returns it is aborted once the grace
633 /// elapses (still reported as `Ok(())`), so an embedder's shutdown can never
634 /// wedge behind a hung driver.
635 pub async fn shutdown(mut self) -> Result<(), ServerError> {
636 // Dropping the senders fires each task's cancel future. The heartbeat
637 // task is reaped first because it is bounded by `tokio::time::sleep`
638 // (cooperative stop is fast and never wedges on a driver call), so its
639 // reap returns quickly and leaves the grace budget for the watch task
640 // — which may be parked in a wedged consensus-driver call.
641 self.heartbeat_cancel_tx.take();
642 self.cancel_tx.take();
643 if let Some(mut hb_handle) = self.heartbeat_handle.take() {
644 match tokio::time::timeout(self.shutdown_grace, &mut hb_handle).await {
645 Ok(Ok(())) => {}
646 // Task panicked — already counted + logged via catch_unwind in
647 // the task body. Nothing more to do here.
648 Ok(Err(_join_err)) => {}
649 // Grace overrun — sleep + select! should always observe a
650 // dropped cancel sender, so this is a backstop. Abort and
651 // reap; no separate metric (the heartbeat is observability
652 // only — its lateness is not a serving correctness signal).
653 Err(_elapsed) => {
654 hb_handle.abort();
655 let _ = (&mut hb_handle).await;
656 }
657 }
658 }
659 match self.handle.take() {
660 Some(mut handle) => join_to_server_result(
661 await_watch_within_grace(&mut handle, self.shutdown_grace, &self.reporter).await,
662 ),
663 None => Ok(()),
664 }
665 }
666
667 /// Hard-abort the leader-watch task without waiting for a cooperative stop.
668 ///
669 /// Prefer [`Self::shutdown`] or simply dropping the guard; both let the
670 /// task stop at a safe point. This is an escape hatch for callers that
671 /// cannot await and accept that the task may be torn down mid-fence.
672 pub fn abort(mut self) {
673 if let Some(handle) = self.handle.take() {
674 handle.abort();
675 }
676 // Hard-abort the heartbeat task too — leaving it running after the
677 // watch is torn down would publish heartbeats describing a stale
678 // (typically `NotServing`) view until the Arc<Reporter> is dropped.
679 if let Some(hb_handle) = self.heartbeat_handle.take() {
680 hb_handle.abort();
681 }
682 }
683
684 /// Whether the leader-watch task has finished — terminated for any reason
685 /// (cooperative cancel, driver error, stream EOF, or panic).
686 ///
687 /// A read-only liveness probe that neither consumes the guard nor disturbs
688 /// its cancel-on-drop behavior, so an embedder can poll task health while
689 /// keeping the guard alive.
690 pub fn is_finished(&self) -> bool {
691 self.handle
692 .as_ref()
693 .is_none_or(|handle| handle.is_finished())
694 }
695}
696
697impl Drop for WatchGuard {
698 fn drop(&mut self) {
699 // Close the serving gate synchronously, here at the drop site. Dropping
700 // the cancel sender below only *requests* the watch task to stop; the
701 // task publishes `NotServing` later, on its own timeline. Between this
702 // drop and the task's next poll the fast gate would still read `Serving`
703 // (and the allocator would still grant), so an RPC could be admitted by a
704 // server that has already been told to stop serving. `step_down` clears
705 // the allocator and publishes `NotServing` now, before any await; the
706 // watch task's later `step_down(None, None)` on cooperative cancel (see
707 // `fence::run_leader_watch`) republishes the identical state, so the
708 // double-close is harmless and idempotent.
709 self.core.step_down(None, None);
710 // Dropping the sender (if `shutdown` / `abort` did not already take it)
711 // resolves the task's cancel future; the task then publishes
712 // `NotServing` and returns. The `JoinHandle` is dropped here too,
713 // detaching the task to finish its cooperative shutdown on its own.
714 self.cancel_tx.take();
715 // Same treatment for the heartbeat task. `Drop` is sync so we cannot
716 // await the cooperative stop; instead we drop the cancel sender (the
717 // task will observe it at its next select! boundary) and hard-abort
718 // the handle so it cannot outlive the guard and publish heartbeats
719 // describing a stale view if the runtime keeps the Arc alive.
720 self.heartbeat_cancel_tx.take();
721 if let Some(hb_handle) = self.heartbeat_handle.take() {
722 hb_handle.abort();
723 }
724 }
725}
726
727/// Merge the caller's `shutdown` future with an internal cancellation signal.
728///
729/// Both [`Server::serve_with_shutdown`] and [`Server::serve_with_listener`]
730/// need tonic to stop when EITHER the caller's `shutdown` fires OR the
731/// leader-watch task terminates (signalled by firing the returned
732/// `oneshot::Sender`). This builds that merged shutdown future and hands back
733/// the sender so [`serve_inner`] can trip it from the watch arm.
734fn combined_shutdown_with_cancel(
735 shutdown: impl Future<Output = ()> + Send + 'static,
736) -> (
737 impl Future<Output = ()> + Send + 'static,
738 tokio::sync::oneshot::Sender<()>,
739) {
740 let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
741 let combined_shutdown = async move {
742 tokio::select! {
743 _ = shutdown => {}
744 _ = cancel_rx => {}
745 }
746 };
747 (combined_shutdown, cancel_tx)
748}
749
750/// Wait for the leader-watch task to stop cooperatively, but no longer than
751/// `grace`, then forcibly abort it if it is still running.
752///
753/// The watch task observes its cancel signal only at the `select!` boundaries
754/// in [`crate::fence::run_leader_watch`], never inside a fence attempt. A
755/// consensus driver whose `load_high_water` / `persist_high_water` never
756/// returns (the trait places no latency bound; see
757/// [`tsoracle_consensus::ConsensusDriver`]) therefore parks the task upstream
758/// of any cancel-observing await, so dropping the cancel sender cannot stop it.
759/// Left unbounded, the shutdown wait would block process exit until the kubelet
760/// escalates to SIGKILL on a drain. Bounding the wait by `grace` and aborting
761/// on expiry guarantees forward progress: `tokio` tears a suspended task (and
762/// the wedged driver future it holds) down at the abort, dropping its
763/// drain-barrier guard.
764///
765/// Returns the task's join result. A clean cooperative stop forwards its real
766/// outcome verbatim; an aborted task surfaces as a cancelled `JoinError`, which
767/// [`join_to_server_result`] maps to `Ok(())` — the stop was requested, so a
768/// forced abort during shutdown is not an error. A `grace` of zero aborts
769/// immediately (the `timeout` future is already elapsed on first poll).
770async fn await_watch_within_grace(
771 watch_handle: &mut tokio::task::JoinHandle<Result<(), ServerError>>,
772 grace: Duration,
773 reporter: &Arc<crate::reporter::Reporter>,
774) -> Result<Result<(), ServerError>, tokio::task::JoinError> {
775 match tokio::time::timeout(grace, &mut *watch_handle).await {
776 Ok(join_result) => join_result,
777 Err(_elapsed) => {
778 reporter.shutdown_watch_aborted.increment(1);
779 #[cfg(feature = "tracing")]
780 tracing::warn!(
781 grace_ms = grace.as_millis() as u64,
782 "leader-watch task did not stop within the shutdown grace; aborting it (a consensus-driver call likely exceeded its latency bound)"
783 );
784 watch_handle.abort();
785 // Reap the aborted task so its Drop (releasing any held drain-barrier
786 // guard) runs before we report shutdown complete. Bounded: an aborted
787 // task resolves at its next poll.
788 (&mut *watch_handle).await
789 }
790 }
791}
792
793/// Drive the gRPC `serve_future` against the leader-watch task, shared by
794/// [`Server::serve_with_shutdown`] and [`Server::serve_with_listener`].
795///
796/// The two public methods differ only in how `serve_future` is assembled
797/// (address-bound via `serve_with_shutdown` vs listener-bound via
798/// `serve_with_incoming_shutdown`); everything downstream — the biased select,
799/// the cooperative-cancel path, and the drain/translate logic — is identical
800/// and lives here so a future change need only be made once.
801///
802/// `tonic_cancel_tx` is the cancellation half paired with the `serve_future`'s
803/// shutdown signal (see [`combined_shutdown_with_cancel`]); firing it begins
804/// tonic's graceful drain when the watch task terminates first. `watch_cancel_tx`
805/// is the leader-watch task's own cooperative-cancel sender (the same one a
806/// [`WatchGuard`] holds for embedders); dropping it stops the task. Taking the
807/// raw parts rather than a `WatchGuard` keeps this path free of the guard's
808/// `Option` fields — neither the watch handle nor the cancel sender is optional
809/// here. `shutdown_grace` bounds the user-shutdown arm's wait for the watch task
810/// (see [`await_watch_within_grace`]). `core` is the shared serving core (the
811/// same one the watch task and the gRPC service hold): the user-shutdown arm
812/// closes the gate on it synchronously so no RPC is admitted in the window
813/// before the watch task observes cancellation and publishes `NotServing`.
814// Private serve helper. The wide signature is the cost of being the single
815// merge point for the two public `serve_*` paths and the leader-watch +
816// heartbeat task pair: bundling these into a struct just to placate clippy
817// would obscure the lifecycle (every parameter is consumed exactly once and
818// has no shared identity worth naming). Keep the arguments visible.
819#[allow(clippy::too_many_arguments)]
820async fn serve_inner<S>(
821 watch_cancel_tx: tokio::sync::oneshot::Sender<()>,
822 mut watch_handle: tokio::task::JoinHandle<Result<(), ServerError>>,
823 heartbeat_cancel_tx: Option<tokio::sync::oneshot::Sender<()>>,
824 heartbeat_handle: Option<tokio::task::JoinHandle<()>>,
825 serve_future: S,
826 tonic_cancel_tx: tokio::sync::oneshot::Sender<()>,
827 shutdown_grace: Duration,
828 core: Arc<ServingCore>,
829 reporter: Arc<crate::reporter::Reporter>,
830) -> Result<(), ServerError>
831where
832 S: Future<Output = Result<(), tonic::transport::Error>>,
833{
834 tokio::pin!(serve_future);
835
836 let outcome = tokio::select! {
837 // Bias toward the watch arm: if both are ready in the same poll
838 // (rare but possible — graceful shutdown completed in the same
839 // tick the watch returned), we want to surface the watch error
840 // rather than report a clean shutdown.
841 biased;
842
843 watch_result = &mut watch_handle => {
844 // Watch terminated. State is already poisoned (see watch
845 // task body in into_router). Trigger tonic drain, wait for
846 // it to finish, then report the watch's outcome — preferring
847 // it over any drain error, which surfaces only if the watch
848 // itself ended cleanly.
849 let _ = tonic_cancel_tx.send(());
850 let drain_result = serve_future.await;
851 combine_watch_and_drain(watch_result, drain_result)
852 }
853 serve_result = &mut serve_future => {
854 // User shutdown fired (or our cancel — but watch arm has
855 // `biased` priority, so reaching here means user shutdown).
856 // Prefer a cooperative stop: dropping the cancel sender resolves
857 // the task's cancel future so it stops at its next `select!`
858 // boundary, having published `NotServing` and never torn down
859 // mid-fence while holding `extension_gate.write()`. But a
860 // cooperative stop is only observed at those boundaries, never
861 // inside a fence attempt — a consensus-driver call that never
862 // returns (the trait places no latency bound) would park the task
863 // upstream of any cancel point and block process exit until a
864 // kubelet SIGKILL. `await_watch_within_grace` therefore bounds the
865 // wait by `shutdown_grace` and aborts the task if it overruns. The
866 // task's own outcome (a clean `Ok(())` on cooperative stop, a
867 // cancelled `JoinError` on abort) is discarded; the user-requested
868 // shutdown result wins.
869 //
870 // Close the serving gate synchronously first: dropping the sender
871 // only *requests* the stop, and a task aborted on grace expiry (or
872 // simply not yet rescheduled) may never reach its own `step_down`. So
873 // a `GetTs` arriving during the drain would still be admitted unless
874 // we close the gate here. `step_down` is idempotent with the watch
875 // task's own cooperative-cancel publish.
876 core.step_down(None, None);
877 drop(watch_cancel_tx);
878 let _ = await_watch_within_grace(&mut watch_handle, shutdown_grace, &reporter).await;
879 serve_result?;
880 Ok(())
881 }
882 };
883
884 // Stop the heartbeat task on every exit path. Done after the watch reap so
885 // the watch-arm `combine_watch_and_drain` already saw the watch outcome,
886 // and the user-shutdown arm has finished its grace-bounded wait. Dropping
887 // the cancel sender breaks the task's `tokio::select! { biased; cancel,
888 // sleep }` loop on the next poll; if the task is wedged for any reason we
889 // hard-abort on grace overrun. The task's outcome is observability only
890 // and cannot influence serving correctness, so its join result is dropped.
891 drop(heartbeat_cancel_tx);
892 if let Some(mut hb_handle) = heartbeat_handle {
893 match tokio::time::timeout(shutdown_grace, &mut hb_handle).await {
894 Ok(Ok(())) => {}
895 Ok(Err(_join_err)) => {} // panic — already counted via catch_unwind
896 Err(_elapsed) => {
897 hb_handle.abort();
898 let _ = (&mut hb_handle).await;
899 }
900 }
901 }
902
903 outcome
904}
905
906/// Convert a `JoinHandle` result into a `ServerError`-typed result.
907///
908/// - `Ok(Ok(()))` — cooperative cancellation: `run_leader_watch` observed its
909/// cancel signal (the `WatchGuard` was dropped, `WatchGuard::shutdown` was
910/// called, or `serve_inner` cancelled it on user shutdown), published
911/// `NotServing`, and returned cleanly. Forwarded verbatim as `Ok(())`.
912/// - `Ok(Err(e))` — task returned an error (including `WatchStreamClosed`
913/// from a clean EOF). Forward verbatim.
914/// - `Err(JoinError)` — task was aborted or panicked. An abort
915/// (`WatchGuard::abort` or `JoinHandle::abort`) maps to Ok (we asked for it);
916/// a panic maps to `WatchPanic` with payload.
917fn join_to_server_result(
918 join_result: Result<Result<(), ServerError>, tokio::task::JoinError>,
919) -> Result<(), ServerError> {
920 match join_result {
921 Ok(inner) => inner,
922 Err(join_err) if join_err.is_panic() => {
923 let payload = panic_payload_to_string(join_err.into_panic());
924 Err(ServerError::WatchPanic {
925 payload,
926 bt: Bt::capture(),
927 })
928 }
929 Err(_cancelled) => Ok(()),
930 }
931}
932
933/// Combine the leader-watch outcome with the tonic graceful-drain outcome
934/// after the watch arm fired.
935///
936/// When the watch task terminates first we trigger the drain and then must
937/// report a single result. The watch error is the root cause — poisoned
938/// serving state was already published before the task returned — so it wins
939/// when both fail. A drain error (port stolen, resource exhaustion) is only
940/// surfaced when the watch outcome is otherwise `Ok`; previously it was
941/// discarded via `let _ = serve.await`, hiding a failed drain behind a clean
942/// shutdown report.
943///
944/// Generic over the drain error so the precedence logic is unit-testable
945/// without fabricating a `tonic::transport::Error` (which has no public
946/// constructor): production passes `Result<(), tonic::transport::Error>`,
947/// tests pass `Result<(), ServerError>` via the reflexive `From` impl.
948fn combine_watch_and_drain<E>(
949 watch_result: Result<Result<(), ServerError>, tokio::task::JoinError>,
950 drain_result: Result<(), E>,
951) -> Result<(), ServerError>
952where
953 ServerError: From<E>,
954{
955 match join_to_server_result(watch_result) {
956 Err(watch_err) => Err(watch_err),
957 Ok(()) => drain_result.map_err(ServerError::from),
958 }
959}
960
/// Construct the gRPC reflection service from an encoded protobuf descriptor
/// set.
///
/// Split out of [`Server::into_router`] so the failure path can be unit
/// tested: production supplies [`tsoracle_proto::FILE_DESCRIPTOR_SET`], while
/// tests can pass deliberately corrupt bytes.
///
/// # Errors
///
/// Any error returned while building the service is wrapped in
/// [`ServerError::ReflectionInit`] instead of panicking.
#[cfg(feature = "reflection")]
fn build_reflection_service(
    descriptor_set: &[u8],
) -> Result<
    tonic_reflection::server::v1::ServerReflectionServer<
        impl tonic_reflection::server::v1::ServerReflection,
    >,
    ServerError,
> {
    let builder = tonic_reflection::server::Builder::configure()
        .register_encoded_file_descriptor_set(descriptor_set);

    match builder.build_v1() {
        Ok(reflection_service) => Ok(reflection_service),
        Err(build_err) => Err(ServerError::ReflectionInit(build_err)),
    }
}
981
/// Render a panic payload as text for inclusion in an error.
///
/// Payloads that are a `&'static str` or a `String` are returned as-is (the
/// `&'static str` is tried first). Any other payload type yields a fixed
/// placeholder message, since its contents cannot be inspected.
fn panic_payload_to_string(panic: Box<dyn std::any::Any + Send>) -> String {
    panic
        .downcast_ref::<&'static str>()
        .map(|literal| (*literal).to_string())
        .or_else(|| panic.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "watch task panicked with non-string payload".to_string())
}
991
#[cfg(any(test, feature = "test-fakes"))]
impl Server {
    /// Runs the leader-watch loop directly, for tests only.
    ///
    /// Reachable from integration tests through the `test-fakes` feature and
    /// not part of the stable public API. The loop is given a cancel future
    /// that never resolves: callers end it through leadership events or
    /// `JoinHandle::abort`, not cooperative cancel.
    #[doc(hidden)]
    pub async fn run_leader_watch_for_tests(self: Arc<Self>) -> Result<(), ServerError> {
        let never_cancelled = futures::future::pending();
        crate::fence::run_leader_watch(self, never_cancelled).await
    }

    /// Requests a window grant of `count` straight from the serving core, for
    /// tests only.
    ///
    /// Bypasses the gRPC service and stamps the request with the server
    /// clock's current time. Regression tests use it to observe the behavioral
    /// fence directly (no timestamp at or below the prior leader's
    /// high-water).
    #[doc(hidden)]
    pub fn try_grant_for_tests(&self, count: u32) -> Result<WindowGrant, CoreError> {
        let now_ms = self.clock.now_ms();
        self.core.try_grant(now_ms, count)
    }
}
1012
#[cfg(test)]
mod tests {
    use super::*;

    /// Join result of a leader-watch task, as `JoinHandle::await` yields it.
    type WatchJoin = Result<Result<(), ServerError>, tokio::task::JoinError>;

    /// Build a `WatchPanic` error carrying `payload`, for use as a fixture.
    fn watch_panic(payload: &str) -> ServerError {
        ServerError::WatchPanic {
            payload: payload.into(),
            bt: Bt::capture(),
        }
    }

    /// Spawn a task that immediately returns `outcome` and hand back its join
    /// result, giving tests a genuine `Ok(..)` join value to feed the helpers.
    async fn finished_watch(outcome: Result<(), ServerError>) -> WatchJoin {
        tokio::spawn(async move { outcome }).await
    }

    #[test]
    fn panic_payload_to_string_recovers_static_str() {
        // A literal `panic!("...")` carries a `&'static str`; operators should
        // see exactly what the watch task said.
        let boxed: Box<dyn std::any::Any + Send> = Box::new("watch boom");
        let rendered = panic_payload_to_string(boxed);
        assert_eq!(rendered, "watch boom");
    }

    #[test]
    fn panic_payload_to_string_recovers_owned_string() {
        // A formatted `panic!("{var}")` carries an owned `String`; that
        // downcast branch has to work as well.
        let boxed: Box<dyn std::any::Any + Send> = Box::new(String::from("formatted"));
        let rendered = panic_payload_to_string(boxed);
        assert_eq!(rendered, "formatted");
    }

    #[test]
    fn panic_payload_to_string_falls_back_for_other_types() {
        // Anything that is neither string type lands on the placeholder.
        struct Custom;
        let boxed: Box<dyn std::any::Any + Send> = Box::new(Custom);
        let rendered = panic_payload_to_string(boxed);
        assert_eq!(rendered, "watch task panicked with non-string payload");
    }

    #[test]
    fn serving_transitions_publish_through_core() {
        // Serving-state transitions go through the Server's ServingCore: after
        // a `step_down` on a freshly built Server the state reads back as
        // `NotServing` with the supplied hint and epoch. (The #346
        // send_replace-with-zero-receivers regression is covered by the
        // ServingCore unit tests, which can inspect `receiver_count`.)
        let driver = Arc::new(crate::test_fakes::InMemoryDriver::new());
        let server = Server::builder()
            .consensus_driver(driver)
            .build()
            .expect("build must succeed");

        let hint = PeerEndpoint::try_from("new-leader:9000").unwrap();
        server.core.step_down(Some(hint.clone()), Some(Epoch(7)));

        let ServingState::NotServing {
            leader_endpoint,
            leader_epoch,
        } = server.core.serving_state()
        else {
            panic!("expected NotServing after step_down")
        };
        assert_eq!(leader_endpoint, Some(hint));
        assert_eq!(leader_epoch, Some(Epoch(7)));
    }

    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
    #[test]
    fn builder_stores_tls_config() {
        // The serve_* paths read `tls_config` off the `Server`, not the
        // builder, once `into_router` has consumed self. The value therefore
        // has to survive the builder → Server hand-off.
        let server = Server::builder()
            .consensus_driver(Arc::new(crate::test_fakes::InMemoryDriver::new()))
            .tls_config(tonic::transport::ServerTlsConfig::new())
            .build()
            .expect("build with tls_config must succeed");
        assert!(server.tls_config.is_some());
    }

    #[tokio::test]
    async fn join_to_server_result_passes_through_clean_outcome() {
        // Ok(Ok(())): a clean finish is forwarded unchanged.
        let join = finished_watch(Ok(())).await;
        assert!(matches!(join_to_server_result(join), Ok(())));
    }

    #[tokio::test]
    async fn join_to_server_result_forwards_inner_error() {
        // Ok(Err(e)): the task's own error is forwarded unchanged.
        let join = tokio::spawn(async { Err::<(), ServerError>(watch_panic("synthetic")) }).await;
        match join_to_server_result(join) {
            Err(ServerError::WatchPanic { payload, .. }) => assert_eq!(payload, "synthetic"),
            other => panic!("expected forwarded WatchPanic, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn join_to_server_result_translates_panic_to_watch_panic() {
        // Err(JoinError) with is_panic: reported as WatchPanic, with the
        // payload rendered by `panic_payload_to_string`.
        let panicking_task = tokio::spawn(async {
            panic!("intentional");
            #[allow(unreachable_code)]
            Ok::<(), ServerError>(())
        });
        let join = panicking_task.await;
        match join_to_server_result(join) {
            Err(ServerError::WatchPanic { payload, .. }) => {
                assert!(payload.contains("intentional"))
            }
            other => panic!("expected WatchPanic, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn join_to_server_result_treats_cancellation_as_clean_exit() {
        // Err(JoinError) with is_cancelled: the caller requested the abort, so
        // it counts as a clean exit.
        let parked_task: tokio::task::JoinHandle<Result<(), ServerError>> =
            tokio::spawn(futures::future::pending::<Result<(), ServerError>>());
        parked_task.abort();
        let join = parked_task.await;
        assert!(matches!(join_to_server_result(join), Ok(())));
    }

    #[tokio::test]
    async fn combine_watch_and_drain_surfaces_drain_error_when_watch_ok() {
        // Clean watch, failed graceful drain (port stolen, resource
        // exhaustion): the drain error has to come through.
        let watch = finished_watch(Ok(())).await;
        let drain: Result<(), ServerError> = Err(ServerError::WatchStreamClosed);
        let combined = combine_watch_and_drain(watch, drain);
        assert!(matches!(combined, Err(ServerError::WatchStreamClosed)));
    }

    #[tokio::test]
    async fn combine_watch_and_drain_returns_ok_when_both_succeed() {
        // Both sides clean: the one and only all-Ok outcome.
        let watch = finished_watch(Ok(())).await;
        let drain: Result<(), ServerError> = Ok(());
        let combined = combine_watch_and_drain(watch, drain);
        assert!(matches!(combined, Ok(())));
    }

    #[tokio::test]
    async fn combine_watch_and_drain_prefers_watch_error_over_drain_error() {
        // Both sides failed. The watch error is the root cause (poisoned state
        // was already published), so it is reported and the drain error lost.
        let watch = finished_watch(Err(watch_panic("watch"))).await;
        let drain: Result<(), ServerError> = Err(ServerError::WatchStreamClosed);
        match combine_watch_and_drain(watch, drain) {
            Err(ServerError::WatchPanic { payload, .. }) => assert_eq!(payload, "watch"),
            other => panic!("expected watch error to win, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn combine_watch_and_drain_returns_watch_error_when_drain_ok() {
        // Failed watch, clean drain: the watch error is forwarded unchanged.
        let watch = finished_watch(Err(watch_panic("watch"))).await;
        let drain: Result<(), ServerError> = Ok(());
        match combine_watch_and_drain(watch, drain) {
            Err(ServerError::WatchPanic { payload, .. }) => assert_eq!(payload, "watch"),
            other => panic!("expected forwarded watch error, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn dropping_watch_guard_closes_serving_gate_synchronously() {
        // Regression: the serving gate must close at the point the guard is
        // dropped, not later on the watch task's timeline. The guard here
        // wraps a watch handle that never touches the core, leaving `Drop` as
        // the only code able to flip the state. We publish `Serving`, drop the
        // guard, and read the state back with no await in between. On the
        // current-thread test runtime no other task can run between those two
        // synchronous steps, so reading `NotServing` proves `Drop` itself
        // closed the gate.
        let core = Arc::new(ServingCore::new(Duration::from_secs(3)));
        core.publish_serving();

        let idle_watch: tokio::task::JoinHandle<Result<(), ServerError>> =
            tokio::spawn(async { Ok(()) });
        let (watch_tx, _keep_watch_rx) = tokio::sync::oneshot::channel::<()>();

        drop(WatchGuard {
            cancel_tx: Some(watch_tx),
            handle: Some(idle_watch),
            shutdown_grace: Duration::from_secs(10),
            core: core.clone(),
            reporter: Arc::new(crate::reporter::Reporter::new()),
            heartbeat_cancel_tx: None,
            heartbeat_handle: None,
        });

        assert!(
            matches!(core.serving_state(), ServingState::NotServing { .. }),
            "dropping the WatchGuard must close the serving gate synchronously",
        );
    }

    #[tokio::test]
    async fn serve_inner_closes_serving_gate_on_user_shutdown() {
        // Regression for the serve path. When the caller's shutdown fires,
        // `serve_inner` drops the watch cancel sender, waits out the grace, and
        // aborts the watch task on overrun. A task parked upstream of every
        // cancel-observing await (a never-resolving future stands in for it
        // here) is aborted before it can publish `NotServing`, so the gate
        // stays open unless `serve_inner` closes it. We seed `Serving`, run
        // the user-shutdown arm with zero grace (immediate abort), and check
        // that the gate is closed afterwards.
        let core = Arc::new(ServingCore::new(Duration::from_secs(3)));
        core.publish_serving();

        let parked_watch: tokio::task::JoinHandle<Result<(), ServerError>> =
            tokio::spawn(futures::future::pending::<Result<(), ServerError>>());
        let (watch_tx, _keep_watch_rx) = tokio::sync::oneshot::channel::<()>();
        let (tonic_tx, _keep_tonic_rx) = tokio::sync::oneshot::channel::<()>();

        // An already-ready serve future stands in for "the user's shutdown
        // fired". The biased select polls the watch arm first, finds it
        // pending, and falls through to the user-shutdown arm.
        let ready_serve = async { Ok::<(), tonic::transport::Error>(()) };

        let result = serve_inner(
            watch_tx,
            parked_watch,
            None, // heartbeat cancel sender: heartbeat is off in this test
            None, // heartbeat join handle
            ready_serve,
            tonic_tx,
            Duration::from_millis(0),
            core.clone(),
            Arc::new(crate::reporter::Reporter::new()),
        )
        .await;

        assert!(
            result.is_ok(),
            "user shutdown must return Ok, got {result:?}"
        );
        assert!(
            matches!(core.serving_state(), ServingState::NotServing { .. }),
            "serve_inner's user-shutdown arm must close the serving gate synchronously",
        );
    }

    #[cfg(feature = "reflection")]
    #[test]
    fn build_reflection_service_accepts_embedded_descriptor_set() {
        // Production happy path: the descriptor set emitted by
        // `tsoracle-proto`'s build.rs has to decode cleanly. This used to sit
        // behind an `expect`.
        let built = build_reflection_service(tsoracle_proto::FILE_DESCRIPTOR_SET);
        assert!(built.is_ok());
    }

    #[cfg(feature = "reflection")]
    #[test]
    fn build_reflection_service_maps_corrupt_descriptor_to_typed_error() {
        // An undecodable descriptor set (build artifact drift) has to come
        // back as a typed `ServerError::ReflectionInit`, never a panic. These
        // bytes are not a valid encoded `FileDescriptorSet`.
        let corrupt = b"\xff\xff\xff\xff not a descriptor set";
        // The Ok payload (the reflection service) is not `Debug`, so collapse
        // it to `()` before matching so the failure message can print it.
        let outcome = build_reflection_service(corrupt).map(|_| ());
        match outcome {
            Err(ServerError::ReflectionInit(_)) => {}
            other => panic!("expected ReflectionInit error, got {other:?}"),
        }
    }
}