tsoracle_server/server.rs
1//
2// ░▀█▀░█▀▀░█▀█░█▀▄░█▀█░█▀▀░█░░░█▀▀
3// ░░█░░▀▀█░█░█░█▀▄░█▀█░█░░░█░░░█▀▀
4// ░░▀░░▀▀▀░▀▀▀░▀░▀░▀░▀░▀▀▀░▀▀▀░▀▀▀
5//
6// tsoracle — Distributed Timestamp Oracle
7// https://www.tsoracle.rs
8//
9// Copyright (c) 2026 Prisma Risk
10//
11// Licensed under the Apache License, Version 2.0 (the "License");
12// you may not use this file except in compliance with the License.
13// You may obtain a copy of the License at
14//
15// https://www.apache.org/licenses/LICENSE-2.0
16//
17// Unless required by applicable law or agreed to in writing, software
18// distributed under the License is distributed on an "AS IS" BASIS,
19// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20// See the License for the specific language governing permissions and
21// limitations under the License.
22//
23
24use core::time::Duration;
25use std::future::Future;
26use std::net::SocketAddr;
27use std::sync::Arc;
28use tokio::sync::watch;
29use tonic::service::Routes;
30use tonic::transport::Server as TonicServer;
31use tsoracle_consensus::ConsensusDriver;
32#[cfg(any(test, feature = "test-fakes"))]
33use tsoracle_core::{CoreError, WindowGrant};
34use tsoracle_core::{Epoch, PeerEndpoint};
35use tsoracle_proto::v1::tso_service_server::TsoServiceServer;
36
37use crate::bt::Bt;
38use crate::clock::{Clock, SystemClock};
39use crate::service::TsoServiceImpl;
40use crate::serving_core::ServingCore;
41
42#[derive(Debug, thiserror::Error)]
43pub enum BuildError {
44 #[error("consensus_driver is required")]
45 MissingConsensusDriver,
46 /// `max_seq_count` was set to 0. Every positive `count` would then be
47 /// rejected as `SeqCountTooLarge` (the `count >= 1` floor leaves no valid
48 /// value), silently disabling `GetSeq`. Rejected at build time so the
49 /// misconfiguration surfaces immediately rather than at first request.
50 #[error("max_seq_count must be >= 1 (0 would reject every GetSeq request)")]
51 ZeroMaxSeqCount,
52 /// `max_seq_batch_keys` was set to 0. Every `GetSeqBatch` request would
53 /// then be rejected as exceeding the batch-key cap (the minimum is 1
54 /// entry), silently disabling `GetSeqBatch`. Rejected at build time so the
55 /// misconfiguration surfaces immediately rather than at first request.
56 #[error("max_seq_batch_keys must be >= 1 (0 would reject every GetSeqBatch request)")]
57 ZeroMaxSeqBatchKeys,
58}
59
60#[derive(Debug, thiserror::Error)]
61pub enum ServerError {
62 #[error("transport: {0}")]
63 Transport(#[from] tonic::transport::Error),
64 #[error("consensus: {0}")]
65 Consensus(#[from] tsoracle_consensus::ConsensusError),
66 #[error("core: {0}")]
67 Core(#[from] tsoracle_core::CoreError),
68 /// The leader-watch task panicked. Distinct from a clean error return so
69 /// operators can tell "driver returned Err" (recoverable design) from
70 /// "task panicked" (programming bug).
71 #[error("leader-watch task panicked: {payload}{bt}")]
72 WatchPanic { payload: String, bt: Bt },
73 /// The consensus driver's `leadership_events()` stream ended cleanly while
74 /// the leader-watch task was running. The stream is contracted to live for
75 /// the life of the server, so its end is anomalous (driver shutdown, lost
76 /// session, etc.) — distinct from a `Consensus` error returned mid-fence.
77 /// The watch task publishes `ServingState::NotServing` before returning
78 /// this variant so embedders who never observe the [`WatchGuard`] still get
79 /// the documented fail-safe behavior.
80 #[error("consensus driver leadership stream closed")]
81 WatchStreamClosed,
82 /// The embedded protobuf descriptor set failed to decode while building the
83 /// gRPC reflection service. `tsoracle-proto`'s `build.rs` emits these bytes
84 /// from checked-in `.proto` sources, so a failure here signals build-artifact
85 /// drift (a corrupt or stale descriptor) rather than a runtime condition —
86 /// surfaced as a diagnosable startup error instead of a process panic.
87 #[cfg(feature = "reflection")]
88 #[error("failed to build gRPC reflection service from embedded descriptor set: {0}")]
89 ReflectionInit(#[source] tonic_reflection::server::Error),
90}
91
92#[derive(Clone, Debug)]
93pub enum ServingState {
94 NotServing {
95 leader_endpoint: Option<PeerEndpoint>,
96 leader_epoch: Option<Epoch>,
97 },
98 Serving,
99}
100
101/// Default bound on how long a graceful shutdown waits for the leader-watch
102/// task to stop cooperatively before forcibly aborting it. The abort is a
103/// last-resort safety net for a consensus driver whose `load_high_water` /
104/// `persist_high_water` is wedged (the trait places no latency bound; see
105/// [`ConsensusDriver`]). Chosen to sit comfortably under a typical Kubernetes
106/// `terminationGracePeriodSeconds` (30s) so the abort, the tonic drain, and
107/// process exit all complete before the kubelet escalates to SIGKILL.
108const DEFAULT_SHUTDOWN_GRACE: Duration = Duration::from_secs(10);
109
110pub struct ServerBuilder {
111 consensus: Option<Arc<dyn ConsensusDriver>>,
112 clock: Option<Arc<dyn Clock>>,
113 window_ahead: Duration,
114 failover_advance: Duration,
115 shutdown_grace: Duration,
116 heartbeat_interval: Duration,
117 max_seq_count: u32,
118 max_seq_batch_keys: u32,
119 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
120 tls_config: Option<tonic::transport::ServerTlsConfig>,
121}
122
123impl Default for ServerBuilder {
124 fn default() -> Self {
125 ServerBuilder {
126 consensus: None,
127 clock: None,
128 window_ahead: Duration::from_secs(3),
129 failover_advance: Duration::from_secs(1),
130 shutdown_grace: DEFAULT_SHUTDOWN_GRACE,
131 heartbeat_interval: Duration::from_secs(10),
132 max_seq_count: tsoracle_core::DEFAULT_MAX_SEQ_COUNT,
133 max_seq_batch_keys: tsoracle_core::DEFAULT_MAX_SEQ_BATCH_KEYS,
134 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
135 tls_config: None,
136 }
137 }
138}
139
140impl ServerBuilder {
141 pub fn consensus_driver(mut self, driver: Arc<dyn ConsensusDriver>) -> Self {
142 self.consensus = Some(driver);
143 self
144 }
145 pub fn clock(mut self, clock: Arc<dyn Clock>) -> Self {
146 self.clock = Some(clock);
147 self
148 }
149 pub fn window_ahead(mut self, window_ahead: Duration) -> Self {
150 self.window_ahead = window_ahead;
151 self
152 }
153 pub fn failover_advance(mut self, failover_advance: Duration) -> Self {
154 self.failover_advance = failover_advance;
155 self
156 }
157
158 /// Bound on how long a graceful shutdown waits for the leader-watch task to
159 /// stop cooperatively before forcibly aborting it.
160 ///
161 /// On shutdown the server drops the watch task's cancel signal and waits for
162 /// it to publish `NotServing` and return. That wait is normally
163 /// near-instant, but the task observes cancellation only at its `select!`
164 /// boundaries — never inside a fence attempt. A consensus driver whose
165 /// `load_high_water` / `persist_high_water` never returns (the trait places
166 /// no latency bound) would otherwise park the task mid-fence and block
167 /// process exit indefinitely, leading to a SIGKILL on a Kubernetes drain.
168 /// Once `shutdown_grace` elapses the server aborts the task so exit always
169 /// makes progress. Set this comfortably below your deployment's
170 /// `terminationGracePeriodSeconds`. Defaults to 10s. A value of zero aborts
171 /// immediately without waiting for a cooperative stop.
172 pub fn shutdown_grace(mut self, shutdown_grace: Duration) -> Self {
173 self.shutdown_grace = shutdown_grace;
174 self
175 }
176
177 /// Interval between heartbeat log lines emitted at `target = "tsoracle::heartbeat"`.
178 /// Defaults to 10 seconds. Pass `Duration::ZERO` to disable the heartbeat task entirely.
179 ///
180 /// The heartbeat surfaces serving role, current epoch, requests served,
181 /// timestamps issued, and key error counters every interval — proof-of-life
182 /// for production deployments that may not have a metrics exporter installed.
183 ///
184 /// Requires `feature = "tracing"` to emit output; with `tracing` off the
185 /// setter is accepted but no task is spawned (no subscriber to log to).
186 pub fn heartbeat_interval(mut self, interval: Duration) -> Self {
187 self.heartbeat_interval = interval;
188 self
189 }
190
191 /// Configure TLS termination for this server. Applied inside
192 /// [`Server::serve`], [`Server::serve_with_shutdown`], and
193 /// [`Server::serve_with_listener`]. Not applied to [`Server::into_router`] —
194 /// embedders mounting tsoracle alongside their own services control TLS
195 /// on their own tonic builder.
196 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
197 pub fn tls_config(mut self, cfg: tonic::transport::ServerTlsConfig) -> Self {
198 self.tls_config = Some(cfg);
199 self
200 }
201
202 /// Per-call ceiling on `GetSeq`'s `count` — the largest contiguous block a
203 /// single dense-sequence request may reserve. Defaults to
204 /// [`tsoracle_core::DEFAULT_MAX_SEQ_COUNT`] (`65_536`).
205 ///
206 /// This is a soft anti-abuse guardrail, not a representational limit: a
207 /// dense block is permanently consumed (the gapless counter only moves
208 /// forward), so the cap bounds how much one call can irrevocably reserve.
209 /// Raise it for batch-allocation workloads or lower it to tighten abuse
210 /// control. The server is the sole authority — clients no longer pre-check
211 /// `count` against a fixed constant, so changing this needs no client
212 /// rebuild; an over-cap request is rejected with `INVALID_ARGUMENT`. The
213 /// `count >= 1` floor is always enforced regardless of this value.
214 ///
215 /// A value of `0` is rejected by [`Self::build`] with
216 /// [`BuildError::ZeroMaxSeqCount`]: it would leave no valid `count` (the
217 /// floor is 1), silently disabling `GetSeq`, so the misconfiguration is
218 /// surfaced at build time rather than at first request.
219 pub fn max_seq_count(mut self, max_seq_count: u32) -> Self {
220 self.max_seq_count = max_seq_count;
221 self
222 }
223
224 /// Cap on the number of `(key, count)` entries accepted in one
225 /// `GetSeqBatch` request. Defaults to
226 /// [`tsoracle_core::DEFAULT_MAX_SEQ_BATCH_KEYS`] (`128`).
227 ///
228 /// This is a soft anti-abuse guardrail bounding fan-out and the size of
229 /// one atomic consensus entry. A value of `0` is rejected by
230 /// [`Self::build`] with [`BuildError::ZeroMaxSeqBatchKeys`]: it would
231 /// make every `GetSeqBatch` call fail, silently disabling the RPC, so the
232 /// misconfiguration is surfaced at build time. Deployment-specific tuning;
233 /// clients need no rebuild after changing this.
234 pub fn max_seq_batch_keys(mut self, max_seq_batch_keys: u32) -> Self {
235 self.max_seq_batch_keys = max_seq_batch_keys;
236 self
237 }
238
239 pub fn build(self) -> Result<Server, BuildError> {
240 let consensus = self.consensus.ok_or(BuildError::MissingConsensusDriver)?;
241 if self.max_seq_count == 0 {
242 return Err(BuildError::ZeroMaxSeqCount);
243 }
244 if self.max_seq_batch_keys == 0 {
245 return Err(BuildError::ZeroMaxSeqBatchKeys);
246 }
247 let clock = self.clock.unwrap_or_else(|| Arc::new(SystemClock));
248 Ok(Server {
249 consensus,
250 clock,
251 window_ahead: self.window_ahead,
252 failover_advance: self.failover_advance,
253 shutdown_grace: self.shutdown_grace,
254 heartbeat_interval: self.heartbeat_interval,
255 core: Arc::new(ServingCore::new(
256 self.window_ahead,
257 self.max_seq_count,
258 self.max_seq_batch_keys,
259 )),
260 reporter: Arc::new(crate::reporter::Reporter::new()),
261 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
262 tls_config: self.tls_config,
263 })
264 }
265}
266
267pub struct Server {
268 pub(crate) consensus: Arc<dyn ConsensusDriver>,
269 pub(crate) clock: Arc<dyn Clock>,
270 pub(crate) window_ahead: Duration,
271 pub(crate) failover_advance: Duration,
272 /// Bound on the graceful-shutdown wait for the leader-watch task before a
273 /// forced abort. See [`ServerBuilder::shutdown_grace`].
274 pub(crate) shutdown_grace: Duration,
275 /// Interval between periodic heartbeat log lines. See [`ServerBuilder::heartbeat_interval`].
276 ///
277 /// The only reader is the `cfg(feature = "tracing")` spawn block in
278 /// `into_router_parts`; without `tracing` there is no subscriber to log to
279 /// and the spawn arm is compiled out, so the field is genuinely unread.
280 #[cfg_attr(not(feature = "tracing"), allow(dead_code))]
281 pub(crate) heartbeat_interval: Duration,
282 /// Owns the allocator, serving-state channel, and both extension locks, with
283 /// the lock-ordering and step-down invariants private behind its methods.
284 ///
285 /// Held behind an `Arc` so the leader-watch task, the gRPC service, and the
286 /// [`WatchGuard`] / [`serve_inner`] shutdown paths can all reach the same
287 /// core. The guard and the serve loop use their clone to close the serving
288 /// gate *synchronously* at shutdown, leaving the watch task's later
289 /// `step_down` a harmless idempotent repeat.
290 pub(crate) core: Arc<ServingCore>,
291 pub(crate) reporter: Arc<crate::reporter::Reporter>,
292 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
293 pub(crate) tls_config: Option<tonic::transport::ServerTlsConfig>,
294}
295
296/// Raw parts produced by [`Server::into_router_parts`]: the gRPC `Routes`, the
297/// leader-watch task's cooperative-cancel sender (dropping it stops the task),
298/// the task's join handle, and the optional heartbeat task's cancel sender /
299/// join handle. [`Server::into_router`] wraps these into a [`WatchGuard`]; the
300/// `serve_*` methods consume them directly via [`serve_inner`].
301///
302/// The heartbeat fields are `None` when the heartbeat is disabled — either by
303/// `ServerBuilder::heartbeat_interval(Duration::ZERO)` or by building without
304/// the `tracing` feature (there is no subscriber to log to).
305pub(crate) struct RouterParts {
306 pub routes: Routes,
307 pub cancel_tx: tokio::sync::oneshot::Sender<()>,
308 pub watch_handle: tokio::task::JoinHandle<Result<(), ServerError>>,
309 pub heartbeat_cancel_tx: Option<tokio::sync::oneshot::Sender<()>>,
310 pub heartbeat_handle: Option<tokio::task::JoinHandle<()>>,
311}
312
313impl Server {
314 pub fn builder() -> ServerBuilder {
315 ServerBuilder::default()
316 }
317
318 /// Subscribe to serving-state transitions.
319 ///
320 /// Returns a fresh `watch::Receiver` observing the same `ServingState`
321 /// the server publishes as leadership comes and goes. Embedders use this
322 /// to gate their own startup on `ServingState::Serving` (see the
323 /// `embedded_router` and piggyback examples). Because `into_router`
324 /// consumes the `Server`, capture the receiver before mounting.
325 ///
326 /// This method is the stable observation API: the receiver is minted from
327 /// the server's `watch::Sender`, so the receiver's type can evolve (e.g. a
328 /// future newtype around `ServingState`) without breaking embedders that
329 /// go through it.
330 pub fn subscribe(&self) -> watch::Receiver<ServingState> {
331 self.core.subscribe()
332 }
333}
334
335impl Server {
336 /// Return the configured `TsoServiceServer<TsoServiceImpl>` as a tonic
337 /// `Routes` value plus a [`WatchGuard`] for the spawned leader-watch task,
338 /// so callers can mount tsoracle's service alongside their own services
339 /// on a shared tonic listener instead of binding a dedicated port.
340 ///
341 /// The returned [`WatchGuard`] owns the leader-watch task. **Keep it alive
342 /// for as long as the mounted `Routes` should serve**: the watch task holds
343 /// an `Arc<Server>` (and the consensus driver) and maintains serving state
344 /// across leadership transitions. Dropping the guard — or calling
345 /// [`WatchGuard::shutdown`] — cooperatively stops the task at the embedder's
346 /// own shutdown. Without the guard the task would keep `Arc<Server>` alive
347 /// until the leadership stream happened to close.
348 ///
349 /// Every termination of the task — cooperative cancellation, driver error,
350 /// panic, or clean EOF on the leadership stream (surfaced as
351 /// `ServerError::WatchStreamClosed`) — publishes
352 /// `ServingState::NotServing { leader_endpoint: None }` before returning, so
353 /// all subsequent RPCs fail fast with `FAILED_PRECONDITION`. Embedders who
354 /// drop the guard without awaiting still get fail-safe behavior.
355 ///
356 /// The `Server::serve()` method is a thin wrapper over this — it calls
357 /// `into_router`, builds a tonic `Server`, and binds a listener.
358 ///
359 /// Returns `Err(ServerError::ReflectionInit)` (only reachable under the
360 /// `reflection` feature) if the embedded descriptor set fails to decode.
361 /// That decode happens before the leader-watch task is spawned, so a failure
362 /// leaves nothing running to clean up.
363 pub fn into_router(self) -> Result<(Routes, WatchGuard), ServerError> {
364 // Read the `Copy` grace before `into_router_parts` consumes `self`, so
365 // the returned guard can bound its own shutdown wait identically to the
366 // `serve_*` paths.
367 let shutdown_grace = self.shutdown_grace;
368 // Clone the shared core and reporter before `into_router_parts` consumes
369 // `self`, so the guard can close the serving gate synchronously on drop /
370 // shutdown rather than relying on the watch task's later publish, and can
371 // record the shutdown_watch_aborted counter if the grace-bounded reap fires.
372 let core = self.core.clone();
373 let reporter = self.reporter.clone();
374 let parts = self.into_router_parts()?;
375 Ok((
376 parts.routes,
377 WatchGuard {
378 cancel_tx: Some(parts.cancel_tx),
379 handle: Some(parts.watch_handle),
380 shutdown_grace,
381 core,
382 reporter,
383 heartbeat_cancel_tx: parts.heartbeat_cancel_tx,
384 heartbeat_handle: parts.heartbeat_handle,
385 },
386 ))
387 }
388
389 /// Spawn the leader-watch task and assemble the gRPC `Routes`, returning
390 /// the raw parts: the routes, the task's cooperative-cancel sender, and its
391 /// `JoinHandle`. [`Self::into_router`] wraps these into a [`WatchGuard`] for
392 /// embedders; the `serve_*` methods drive the parts directly via
393 /// [`serve_inner`], so neither path needs to unwrap the guard's `Option`
394 /// fields.
395 fn into_router_parts(self) -> Result<RouterParts, ServerError> {
396 // Build the reflection service first: a descriptor-decode failure must
397 // surface before we spawn the leader-watch task below, so an error path
398 // never leaks a running task.
399 #[cfg(feature = "reflection")]
400 let reflection = build_reflection_service(tsoracle_proto::FILE_DESCRIPTOR_SET)?;
401
402 let server = Arc::new(self);
403
404 // Cooperative cancellation channel. The `WatchGuard` holds the sender;
405 // the task's `cancel` future resolves on either an explicit send or a
406 // sender drop, so dropping the guard is sufficient to stop the task.
407 let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
408
409 let watch_server = server.clone();
410 let watch_handle = tokio::spawn(async move {
411 use futures::FutureExt;
412 // Resolves when the WatchGuard signals cancellation or is dropped.
413 let cancel = async move {
414 let _ = cancel_rx.await;
415 };
416 // catch_unwind so a panic in run_leader_watch still routes through
417 // the poisoning path. Without this, embedders who mount into_router
418 // directly and never observe the guard would see
419 // ServingState::Serving remain published while the watch task is
420 // dead — the inverse of the fail-safe guarantee documented above.
421 // The panic is re-raised after poisoning so serve / serve_with_*
422 // continue to translate it into ServerError::WatchPanic via
423 // join_to_server_result.
424 let outcome = std::panic::AssertUnwindSafe(crate::fence::run_leader_watch(
425 watch_server.clone(),
426 cancel,
427 ))
428 .catch_unwind()
429 .await;
430 match outcome {
431 Ok(result) => {
432 if let Err(ref _e) = result {
433 // Poison BEFORE returning so embedders who do not observe
434 // the guard still get fail-safe behavior.
435 watch_server.core.step_down(None, None);
436 #[cfg(feature = "tracing")]
437 tracing::error!(error = %_e, "leader-watch terminated; serving disabled");
438 }
439 result
440 }
441 Err(panic_payload) => {
442 // Mirror the Err branch: poison BEFORE re-raising so
443 // guard-dropping embedders still observe NotServing.
444 watch_server.core.step_down(None, None);
445 #[cfg(feature = "tracing")]
446 tracing::error!("leader-watch panicked; serving disabled");
447 std::panic::resume_unwind(panic_payload);
448 }
449 }
450 });
451
452 // Spawn the heartbeat task, if enabled. Gated on `feature = "tracing"`
453 // because the heartbeat module is only compiled with `tracing`
454 // (no subscriber to emit to without it) — and on a non-zero interval,
455 // since `Duration::ZERO` is the documented opt-out.
456 //
457 // The task body is wrapped in `AssertUnwindSafe(...).catch_unwind()`
458 // mirroring the leader-watch spawn above: on panic we bump the
459 // `heartbeat_task_panicked` counter and log at error level, then let
460 // the task end (no restart — the heartbeat is observability, not
461 // correctness, so a panicked task must not be allowed to thrash).
462 let (heartbeat_cancel_tx, heartbeat_handle) = {
463 #[cfg(feature = "tracing")]
464 {
465 if server.heartbeat_interval.is_zero() {
466 (None, None)
467 } else {
468 use futures::FutureExt;
469 let (htx, hrx) = tokio::sync::oneshot::channel::<()>();
470 let hb_reporter = server.reporter.clone();
471 let hb_core = server.core.clone();
472 let hb_interval = server.heartbeat_interval;
473 let handle = tokio::spawn(async move {
474 let outcome =
475 std::panic::AssertUnwindSafe(crate::heartbeat::run_heartbeat(
476 hb_interval,
477 hb_core,
478 hb_reporter.clone(),
479 hrx,
480 ))
481 .catch_unwind()
482 .await;
483 if outcome.is_err() {
484 hb_reporter.heartbeat_task_panicked.increment(1);
485 tracing::error!(
486 target: "tsoracle::heartbeat",
487 "heartbeat task panicked; liveness logs disabled until restart"
488 );
489 }
490 });
491 (Some(htx), Some(handle))
492 }
493 }
494 #[cfg(not(feature = "tracing"))]
495 {
496 (None, None)
497 }
498 };
499
500 let service = TsoServiceImpl { server };
501 #[allow(unused_mut)]
502 let mut routes = Routes::new(TsoServiceServer::new(service));
503 #[cfg(feature = "reflection")]
504 {
505 routes = routes.add_service(reflection);
506 }
507 Ok(RouterParts {
508 routes,
509 cancel_tx,
510 watch_handle,
511 heartbeat_cancel_tx,
512 heartbeat_handle,
513 })
514 }
515
516 pub async fn serve(self, addr: SocketAddr) -> Result<(), ServerError> {
517 self.serve_with_shutdown(addr, futures::future::pending())
518 .await
519 }
520
521 /// Run the gRPC server until either the caller's `shutdown` fires or the
522 /// leader-watch task terminates.
523 ///
524 /// Three outcomes:
525 /// 1. `shutdown` fires first → tonic drains in-flights and returns Ok.
526 /// The watch task is then stopped cooperatively, bounded by
527 /// `shutdown_grace` and forcibly aborted if it overruns (e.g. parked in a
528 /// wedged consensus-driver call); any error it had been about to return
529 /// is forfeited (the process is shutting down anyway).
530 /// 2. Watch returns `Ok(Err(e))` → poisoned state is already published;
531 /// `cancel_tx` triggers tonic's graceful shutdown; in-flight `GetTs`
532 /// calls whose `try_grant` already succeeded complete with the
533 /// timestamps they were allocated; new calls fail fast. Returns `Err(e)`
534 /// — the watch error wins even if the drain itself also errors (see
535 /// `combine_watch_and_drain`); a drain error is surfaced only when the
536 /// watch ended cleanly.
537 /// 3. Watch task panics → returns `Err(ServerError::WatchPanic{..})`
538 /// with the panic payload stringified. Same drain semantics as (2).
539 pub async fn serve_with_shutdown(
540 self,
541 addr: SocketAddr,
542 shutdown: impl Future<Output = ()> + Send + 'static,
543 ) -> Result<(), ServerError> {
544 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
545 let tls_config = self.tls_config.clone();
546
547 // Read the `Copy` grace and clone the shared core and reporter before
548 // `into_router_parts` consumes `self`.
549 let shutdown_grace = self.shutdown_grace;
550 let core = self.core.clone();
551 let reporter = self.reporter.clone();
552 let parts = self.into_router_parts()?;
553 let (combined_shutdown, cancel_tx) = combined_shutdown_with_cancel(shutdown);
554
555 let mut tonic = TonicServer::builder();
556 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
557 if let Some(cfg) = tls_config {
558 tonic = tonic.tls_config(cfg).map_err(ServerError::Transport)?;
559 }
560 let serve = tonic
561 .add_routes(parts.routes)
562 .serve_with_shutdown(addr, combined_shutdown);
563
564 serve_inner(
565 parts.cancel_tx,
566 parts.watch_handle,
567 parts.heartbeat_cancel_tx,
568 parts.heartbeat_handle,
569 serve,
570 cancel_tx,
571 shutdown_grace,
572 core,
573 reporter,
574 )
575 .await
576 }
577
578 /// Run the gRPC server on a caller-provided `TcpListener` until either
579 /// the caller-provided `shutdown` fires or the leader-watch task terminates.
580 ///
581 /// Use this instead of [`Self::serve_with_shutdown`] when you need to
582 /// observe the OS-picked port (`127.0.0.1:0`) before clients connect, or
583 /// when you want to wrap the listener in an outer adapter before passing it
584 /// in. The listening socket is owned by the caller and passed here; tsoracle
585 /// starts accepting on it immediately.
586 ///
587 /// Three outcomes:
588 /// 1. `shutdown` fires first → tonic drains in-flights and returns `Ok`.
589 /// The watch handle is aborted; any error it had been about to return
590 /// is forfeited (the process is shutting down anyway).
591 /// 2. Watch returns `Ok(Err(e))` → poisoned state is already published;
592 /// the caller-provided shutdown is cancelled internally so tonic begins
593 /// graceful shutdown; in-flight `GetTs` calls whose `try_grant` already
594 /// succeeded complete with the timestamps they were allocated; new calls
595 /// fail fast. Returns `Err(e)` — the watch error wins even if the drain
596 /// itself also errors (see `combine_watch_and_drain`); a drain error is
597 /// surfaced only when the watch ended cleanly.
598 /// 3. Watch task panics → returns `Err(ServerError::WatchPanic{..})`
599 /// with the panic payload stringified. Same drain semantics as (2).
600 pub async fn serve_with_listener(
601 self,
602 listener: tokio::net::TcpListener,
603 shutdown: impl Future<Output = ()> + Send + 'static,
604 ) -> Result<(), ServerError> {
605 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
606 let tls_config = self.tls_config.clone();
607
608 // Read the `Copy` grace and clone the shared core and reporter before
609 // `into_router_parts` consumes `self`.
610 let shutdown_grace = self.shutdown_grace;
611 let core = self.core.clone();
612 let reporter = self.reporter.clone();
613 let parts = self.into_router_parts()?;
614 let (combined_shutdown, cancel_tx) = combined_shutdown_with_cancel(shutdown);
615
616 let incoming = tonic::transport::server::TcpIncoming::from(listener);
617
618 let mut tonic = TonicServer::builder();
619 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
620 if let Some(cfg) = tls_config {
621 tonic = tonic.tls_config(cfg).map_err(ServerError::Transport)?;
622 }
623 let serve = tonic
624 .add_routes(parts.routes)
625 .serve_with_incoming_shutdown(incoming, combined_shutdown);
626
627 serve_inner(
628 parts.cancel_tx,
629 parts.watch_handle,
630 parts.heartbeat_cancel_tx,
631 parts.heartbeat_handle,
632 serve,
633 cancel_tx,
634 shutdown_grace,
635 core,
636 reporter,
637 )
638 .await
639 }
640}
641
642/// RAII handle to the leader-watch task spawned by [`Server::into_router`].
643///
644/// The watch task holds an `Arc<Server>` (and thus the consensus driver) and
645/// maintains serving state across leadership transitions. This guard ties the
646/// task's lifetime to the guard's: dropping it cooperatively cancels the task,
647/// and the task publishes [`ServingState::NotServing`] before it stops, so any
648/// `Routes` an embedder still has mounted fails subsequent RPCs fast.
649///
650/// Cancellation is cooperative — the task stops at its next await boundary and
651/// never mid-fence, so it is never torn down while holding internal locks, in
652/// contrast to a raw [`tokio::task::JoinHandle::abort`].
653pub struct WatchGuard {
654 // `Option` so `Drop` and the consuming `shutdown` / `abort` methods can
655 // each take a field without a partial-move conflict against the `Drop`
656 // impl. Dropping the sender (rather than sending) is itself the cancel
657 // signal: the task's `cancel` future resolves on sender-drop too.
658 cancel_tx: Option<tokio::sync::oneshot::Sender<()>>,
659 handle: Option<tokio::task::JoinHandle<Result<(), ServerError>>>,
660 /// Bound on the cooperative-stop wait in [`Self::shutdown`] before a forced
661 /// abort. Inherited from [`ServerBuilder::shutdown_grace`].
662 shutdown_grace: Duration,
663 /// Shared serving core, cloned from the `Server`. Lets `Drop` (and the
664 /// consuming `shutdown` / `abort`, which trigger `Drop` on return) close the
665 /// serving gate synchronously at the drop site, instead of waiting for the
666 /// watch task to observe cancellation and publish `NotServing` on its own
667 /// timeline — a window during which the fast gate would still admit RPCs.
668 core: Arc<ServingCore>,
669 /// Metrics reporter, cloned from the `Server`. Used to record the
670 /// `shutdown_watch_aborted` counter if the grace-bounded reap fires.
671 reporter: Arc<crate::reporter::Reporter>,
672 /// Cooperative-cancel sender for the heartbeat task. `None` when the
673 /// heartbeat is disabled (interval == 0 or built without `tracing`).
674 /// Dropping the sender resolves the task's cancel future.
675 heartbeat_cancel_tx: Option<tokio::sync::oneshot::Sender<()>>,
676 /// Join handle for the heartbeat task. `None` when the heartbeat is
677 /// disabled. Output is `()` because the task never returns an error —
678 /// panics are caught inside the task body and recorded via the
679 /// `heartbeat_task_panicked` counter.
680 heartbeat_handle: Option<tokio::task::JoinHandle<()>>,
681}
682
683impl WatchGuard {
684 /// Signal the leader-watch task to stop, wait for it to drain, and report
685 /// its outcome.
686 ///
687 /// A cooperatively cancelled task returns `Ok(())` — the stop was
688 /// requested, so it is not an error. If the task had already terminated on
689 /// its own (driver error, stream EOF, or panic) the original outcome is
690 /// surfaced verbatim: `Err(e)` or [`ServerError::WatchPanic`]. Either way
691 /// serving state is `NotServing` once this returns.
692 ///
693 /// The cooperative wait is bounded by the configured
694 /// [`ServerBuilder::shutdown_grace`]: if the task is parked in a
695 /// consensus-driver call that never returns it is aborted once the grace
696 /// elapses (still reported as `Ok(())`), so an embedder's shutdown can never
697 /// wedge behind a hung driver.
698 pub async fn shutdown(mut self) -> Result<(), ServerError> {
699 // Dropping the senders fires each task's cancel future. The heartbeat
700 // task is reaped first because it is bounded by `tokio::time::sleep`
701 // (cooperative stop is fast and never wedges on a driver call), so its
702 // reap returns quickly and leaves the grace budget for the watch task
703 // — which may be parked in a wedged consensus-driver call.
704 self.heartbeat_cancel_tx.take();
705 self.cancel_tx.take();
706 if let Some(mut hb_handle) = self.heartbeat_handle.take() {
707 match tokio::time::timeout(self.shutdown_grace, &mut hb_handle).await {
708 Ok(Ok(())) => {}
709 // Task panicked — already counted + logged via catch_unwind in
710 // the task body. Nothing more to do here.
711 Ok(Err(_join_err)) => {}
712 // Grace overrun — sleep + select! should always observe a
713 // dropped cancel sender, so this is a backstop. Abort and
714 // reap; no separate metric (the heartbeat is observability
715 // only — its lateness is not a serving correctness signal).
716 Err(_elapsed) => {
717 hb_handle.abort();
718 let _ = (&mut hb_handle).await;
719 }
720 }
721 }
722 match self.handle.take() {
723 Some(mut handle) => join_to_server_result(
724 await_watch_within_grace(&mut handle, self.shutdown_grace, &self.reporter).await,
725 ),
726 None => Ok(()),
727 }
728 }
729
730 /// Hard-abort the leader-watch task without waiting for a cooperative stop.
731 ///
732 /// Prefer [`Self::shutdown`] or simply dropping the guard; both let the
733 /// task stop at a safe point. This is an escape hatch for callers that
734 /// cannot await and accept that the task may be torn down mid-fence.
735 pub fn abort(mut self) {
736 if let Some(handle) = self.handle.take() {
737 handle.abort();
738 }
739 // Hard-abort the heartbeat task too — leaving it running after the
740 // watch is torn down would publish heartbeats describing a stale
741 // (typically `NotServing`) view until the Arc<Reporter> is dropped.
742 if let Some(hb_handle) = self.heartbeat_handle.take() {
743 hb_handle.abort();
744 }
745 }
746
747 /// Whether the leader-watch task has finished — terminated for any reason
748 /// (cooperative cancel, driver error, stream EOF, or panic).
749 ///
750 /// A read-only liveness probe that neither consumes the guard nor disturbs
751 /// its cancel-on-drop behavior, so an embedder can poll task health while
752 /// keeping the guard alive.
753 pub fn is_finished(&self) -> bool {
754 self.handle
755 .as_ref()
756 .is_none_or(|handle| handle.is_finished())
757 }
758}
759
760impl Drop for WatchGuard {
761 fn drop(&mut self) {
762 // Close the serving gate synchronously, here at the drop site. Dropping
763 // the cancel sender below only *requests* the watch task to stop; the
764 // task publishes `NotServing` later, on its own timeline. Between this
765 // drop and the task's next poll the fast gate would still read `Serving`
766 // (and the allocator would still grant), so an RPC could be admitted by a
767 // server that has already been told to stop serving. `step_down` clears
768 // the allocator and publishes `NotServing` now, before any await; the
769 // watch task's later `step_down(None, None)` on cooperative cancel (see
770 // `fence::run_leader_watch`) republishes the identical state, so the
771 // double-close is harmless and idempotent.
772 self.core.step_down(None, None);
773 // Dropping the sender (if `shutdown` / `abort` did not already take it)
774 // resolves the task's cancel future; the task then publishes
775 // `NotServing` and returns. The `JoinHandle` is dropped here too,
776 // detaching the task to finish its cooperative shutdown on its own.
777 self.cancel_tx.take();
778 // Same treatment for the heartbeat task. `Drop` is sync so we cannot
779 // await the cooperative stop; instead we drop the cancel sender (the
780 // task will observe it at its next select! boundary) and hard-abort
781 // the handle so it cannot outlive the guard and publish heartbeats
782 // describing a stale view if the runtime keeps the Arc alive.
783 self.heartbeat_cancel_tx.take();
784 if let Some(hb_handle) = self.heartbeat_handle.take() {
785 hb_handle.abort();
786 }
787 }
788}
789
790/// Merge the caller's `shutdown` future with an internal cancellation signal.
791///
792/// Both [`Server::serve_with_shutdown`] and [`Server::serve_with_listener`]
793/// need tonic to stop when EITHER the caller's `shutdown` fires OR the
794/// leader-watch task terminates (signalled by firing the returned
795/// `oneshot::Sender`). This builds that merged shutdown future and hands back
796/// the sender so [`serve_inner`] can trip it from the watch arm.
797fn combined_shutdown_with_cancel(
798 shutdown: impl Future<Output = ()> + Send + 'static,
799) -> (
800 impl Future<Output = ()> + Send + 'static,
801 tokio::sync::oneshot::Sender<()>,
802) {
803 let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
804 let combined_shutdown = async move {
805 tokio::select! {
806 _ = shutdown => {}
807 _ = cancel_rx => {}
808 }
809 };
810 (combined_shutdown, cancel_tx)
811}
812
813/// Wait for the leader-watch task to stop cooperatively, but no longer than
814/// `grace`, then forcibly abort it if it is still running.
815///
816/// The watch task observes its cancel signal only at the `select!` boundaries
817/// in [`crate::fence::run_leader_watch`], never inside a fence attempt. A
818/// consensus driver whose `load_high_water` / `persist_high_water` never
819/// returns (the trait places no latency bound; see
820/// [`tsoracle_consensus::ConsensusDriver`]) therefore parks the task upstream
821/// of any cancel-observing await, so dropping the cancel sender cannot stop it.
822/// Left unbounded, the shutdown wait would block process exit until the kubelet
823/// escalates to SIGKILL on a drain. Bounding the wait by `grace` and aborting
824/// on expiry guarantees forward progress: `tokio` tears a suspended task (and
825/// the wedged driver future it holds) down at the abort, dropping its
826/// drain-barrier guard.
827///
828/// Returns the task's join result. A clean cooperative stop forwards its real
829/// outcome verbatim; an aborted task surfaces as a cancelled `JoinError`, which
830/// [`join_to_server_result`] maps to `Ok(())` — the stop was requested, so a
831/// forced abort during shutdown is not an error. A `grace` of zero aborts
832/// immediately (the `timeout` future is already elapsed on first poll).
833async fn await_watch_within_grace(
834 watch_handle: &mut tokio::task::JoinHandle<Result<(), ServerError>>,
835 grace: Duration,
836 reporter: &Arc<crate::reporter::Reporter>,
837) -> Result<Result<(), ServerError>, tokio::task::JoinError> {
838 match tokio::time::timeout(grace, &mut *watch_handle).await {
839 Ok(join_result) => join_result,
840 Err(_elapsed) => {
841 reporter.shutdown_watch_aborted.increment(1);
842 #[cfg(feature = "tracing")]
843 tracing::warn!(
844 grace_ms = grace.as_millis() as u64,
845 "leader-watch task did not stop within the shutdown grace; aborting it (a consensus-driver call likely exceeded its latency bound)"
846 );
847 watch_handle.abort();
848 // Reap the aborted task so its Drop (releasing any held drain-barrier
849 // guard) runs before we report shutdown complete. Bounded: an aborted
850 // task resolves at its next poll.
851 (&mut *watch_handle).await
852 }
853 }
854}
855
856/// Drive the gRPC `serve_future` against the leader-watch task, shared by
857/// [`Server::serve_with_shutdown`] and [`Server::serve_with_listener`].
858///
859/// The two public methods differ only in how `serve_future` is assembled
860/// (address-bound via `serve_with_shutdown` vs listener-bound via
861/// `serve_with_incoming_shutdown`); everything downstream — the biased select,
862/// the cooperative-cancel path, and the drain/translate logic — is identical
863/// and lives here so a future change need only be made once.
864///
865/// `tonic_cancel_tx` is the cancellation half paired with the `serve_future`'s
866/// shutdown signal (see [`combined_shutdown_with_cancel`]); firing it begins
867/// tonic's graceful drain when the watch task terminates first. `watch_cancel_tx`
868/// is the leader-watch task's own cooperative-cancel sender (the same one a
869/// [`WatchGuard`] holds for embedders); dropping it stops the task. Taking the
870/// raw parts rather than a `WatchGuard` keeps this path free of the guard's
871/// `Option` fields — neither the watch handle nor the cancel sender is optional
872/// here. `shutdown_grace` bounds the user-shutdown arm's wait for the watch task
873/// (see [`await_watch_within_grace`]). `core` is the shared serving core (the
874/// same one the watch task and the gRPC service hold): the user-shutdown arm
875/// closes the gate on it synchronously so no RPC is admitted in the window
876/// before the watch task observes cancellation and publishes `NotServing`.
877// Private serve helper. The wide signature is the cost of being the single
878// merge point for the two public `serve_*` paths and the leader-watch +
879// heartbeat task pair: bundling these into a struct just to placate clippy
880// would obscure the lifecycle (every parameter is consumed exactly once and
881// has no shared identity worth naming). Keep the arguments visible.
882#[allow(clippy::too_many_arguments)]
883async fn serve_inner<S>(
884 watch_cancel_tx: tokio::sync::oneshot::Sender<()>,
885 mut watch_handle: tokio::task::JoinHandle<Result<(), ServerError>>,
886 heartbeat_cancel_tx: Option<tokio::sync::oneshot::Sender<()>>,
887 heartbeat_handle: Option<tokio::task::JoinHandle<()>>,
888 serve_future: S,
889 tonic_cancel_tx: tokio::sync::oneshot::Sender<()>,
890 shutdown_grace: Duration,
891 core: Arc<ServingCore>,
892 reporter: Arc<crate::reporter::Reporter>,
893) -> Result<(), ServerError>
894where
895 S: Future<Output = Result<(), tonic::transport::Error>>,
896{
897 tokio::pin!(serve_future);
898
899 let outcome = tokio::select! {
900 // Bias toward the watch arm: if both are ready in the same poll
901 // (rare but possible — graceful shutdown completed in the same
902 // tick the watch returned), we want to surface the watch error
903 // rather than report a clean shutdown.
904 biased;
905
906 watch_result = &mut watch_handle => {
907 // Watch terminated. State is already poisoned (see watch
908 // task body in into_router). Trigger tonic drain, wait for
909 // it to finish, then report the watch's outcome — preferring
910 // it over any drain error, which surfaces only if the watch
911 // itself ended cleanly.
912 let _ = tonic_cancel_tx.send(());
913 let drain_result = serve_future.await;
914 combine_watch_and_drain(watch_result, drain_result)
915 }
916 serve_result = &mut serve_future => {
917 // User shutdown fired (or our cancel — but watch arm has
918 // `biased` priority, so reaching here means user shutdown).
919 // Prefer a cooperative stop: dropping the cancel sender resolves
920 // the task's cancel future so it stops at its next `select!`
921 // boundary, having published `NotServing` and never torn down
922 // mid-fence while holding `extension_gate.write()`. But a
923 // cooperative stop is only observed at those boundaries, never
924 // inside a fence attempt — a consensus-driver call that never
925 // returns (the trait places no latency bound) would park the task
926 // upstream of any cancel point and block process exit until a
927 // kubelet SIGKILL. `await_watch_within_grace` therefore bounds the
928 // wait by `shutdown_grace` and aborts the task if it overruns. The
929 // task's own outcome (a clean `Ok(())` on cooperative stop, a
930 // cancelled `JoinError` on abort) is discarded; the user-requested
931 // shutdown result wins.
932 //
933 // Close the serving gate synchronously first: dropping the sender
934 // only *requests* the stop, and a task aborted on grace expiry (or
935 // simply not yet rescheduled) may never reach its own `step_down`. So
936 // a `GetTs` arriving during the drain would still be admitted unless
937 // we close the gate here. `step_down` is idempotent with the watch
938 // task's own cooperative-cancel publish.
939 core.step_down(None, None);
940 drop(watch_cancel_tx);
941 let _ = await_watch_within_grace(&mut watch_handle, shutdown_grace, &reporter).await;
942 serve_result?;
943 Ok(())
944 }
945 };
946
947 // Stop the heartbeat task on every exit path. Done after the watch reap so
948 // the watch-arm `combine_watch_and_drain` already saw the watch outcome,
949 // and the user-shutdown arm has finished its grace-bounded wait. Dropping
950 // the cancel sender breaks the task's `tokio::select! { biased; cancel,
951 // sleep }` loop on the next poll; if the task is wedged for any reason we
952 // hard-abort on grace overrun. The task's outcome is observability only
953 // and cannot influence serving correctness, so its join result is dropped.
954 drop(heartbeat_cancel_tx);
955 if let Some(mut hb_handle) = heartbeat_handle {
956 match tokio::time::timeout(shutdown_grace, &mut hb_handle).await {
957 Ok(Ok(())) => {}
958 Ok(Err(_join_err)) => {} // panic — already counted via catch_unwind
959 Err(_elapsed) => {
960 hb_handle.abort();
961 let _ = (&mut hb_handle).await;
962 }
963 }
964 }
965
966 outcome
967}
968
969/// Convert a `JoinHandle` result into a `ServerError`-typed result.
970///
971/// - `Ok(Ok(()))` — cooperative cancellation: `run_leader_watch` observed its
972/// cancel signal (the `WatchGuard` was dropped, `WatchGuard::shutdown` was
973/// called, or `serve_inner` cancelled it on user shutdown), published
974/// `NotServing`, and returned cleanly. Forwarded verbatim as `Ok(())`.
975/// - `Ok(Err(e))` — task returned an error (including `WatchStreamClosed`
976/// from a clean EOF). Forward verbatim.
977/// - `Err(JoinError)` — task was aborted or panicked. An abort
978/// (`WatchGuard::abort` or `JoinHandle::abort`) maps to Ok (we asked for it);
979/// a panic maps to `WatchPanic` with payload.
980fn join_to_server_result(
981 join_result: Result<Result<(), ServerError>, tokio::task::JoinError>,
982) -> Result<(), ServerError> {
983 match join_result {
984 Ok(inner) => inner,
985 Err(join_err) if join_err.is_panic() => {
986 let payload = panic_payload_to_string(join_err.into_panic());
987 Err(ServerError::WatchPanic {
988 payload,
989 bt: Bt::capture(),
990 })
991 }
992 Err(_cancelled) => Ok(()),
993 }
994}
995
996/// Combine the leader-watch outcome with the tonic graceful-drain outcome
997/// after the watch arm fired.
998///
999/// When the watch task terminates first we trigger the drain and then must
1000/// report a single result. The watch error is the root cause — poisoned
1001/// serving state was already published before the task returned — so it wins
1002/// when both fail. A drain error (port stolen, resource exhaustion) is only
1003/// surfaced when the watch outcome is otherwise `Ok`; previously it was
1004/// discarded via `let _ = serve.await`, hiding a failed drain behind a clean
1005/// shutdown report.
1006///
1007/// Generic over the drain error so the precedence logic is unit-testable
1008/// without fabricating a `tonic::transport::Error` (which has no public
1009/// constructor): production passes `Result<(), tonic::transport::Error>`,
1010/// tests pass `Result<(), ServerError>` via the reflexive `From` impl.
1011fn combine_watch_and_drain<E>(
1012 watch_result: Result<Result<(), ServerError>, tokio::task::JoinError>,
1013 drain_result: Result<(), E>,
1014) -> Result<(), ServerError>
1015where
1016 ServerError: From<E>,
1017{
1018 match join_to_server_result(watch_result) {
1019 Err(watch_err) => Err(watch_err),
1020 Ok(()) => drain_result.map_err(ServerError::from),
1021 }
1022}
1023
/// Construct the gRPC reflection service from an encoded protobuf descriptor
/// set.
///
/// This is kept separate from [`Server::into_router`] so the decode-failure
/// path can be unit-tested. Production passes
/// [`tsoracle_proto::FILE_DESCRIPTOR_SET`]; tests can supply deliberately
/// corrupt bytes. A descriptor that fails to decode is reported as
/// [`ServerError::ReflectionInit`] instead of panicking.
#[cfg(feature = "reflection")]
fn build_reflection_service(
    descriptor_set: &[u8],
) -> Result<
    tonic_reflection::server::v1::ServerReflectionServer<
        impl tonic_reflection::server::v1::ServerReflection,
    >,
    ServerError,
> {
    let builder = tonic_reflection::server::Builder::configure()
        .register_encoded_file_descriptor_set(descriptor_set);
    builder.build_v1().map_err(ServerError::ReflectionInit)
}
1044
/// Render a panic payload as text for `ServerError::WatchPanic`.
///
/// `panic!("literal")` carries a `&'static str` and `panic!("{x}")` carries a
/// `String`; both are returned verbatim. Any other payload type yields a fixed
/// placeholder message.
fn panic_payload_to_string(panic: Box<dyn std::any::Any + Send>) -> String {
    match panic.downcast::<&'static str>() {
        Ok(literal) => (*literal).to_string(),
        Err(not_literal) => match not_literal.downcast::<String>() {
            Ok(owned) => *owned,
            Err(_other) => "watch task panicked with non-string payload".to_string(),
        },
    }
}
1054
#[cfg(any(test, feature = "test-fakes"))]
impl Server {
    /// Run the leader-watch loop directly, for integration tests.
    ///
    /// Available to integration tests through the `test-fakes` feature. Not
    /// part of the stable public API.
    #[doc(hidden)]
    pub async fn run_leader_watch_for_tests(self: Arc<Self>) -> Result<(), ServerError> {
        // These tests end the loop through leadership events or
        // `JoinHandle::abort`. Cooperative cancel is never requested, so the
        // cancel future is one that never resolves.
        let never_cancelled = futures::future::pending();
        crate::fence::run_leader_watch(self, never_cancelled).await
    }

    /// Ask the allocator for a window grant against the current in-memory
    /// state, bypassing the gRPC service.
    ///
    /// Regression tests use this to observe the behavioral fence directly (no
    /// timestamp at or below the prior leader's high-water).
    #[doc(hidden)]
    pub fn try_grant_for_tests(&self, count: u32) -> Result<WindowGrant, CoreError> {
        let now_ms = self.clock.now_ms();
        self.core.try_grant(now_ms, count)
    }
}
1075
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn panic_payload_to_string_recovers_static_str() {
        // `panic!("literal")` produces a `&'static str` payload; we want the
        // verbatim text so operators see what the watch task said.
        let payload: Box<dyn std::any::Any + Send> = Box::new("watch boom");
        assert_eq!(panic_payload_to_string(payload), "watch boom");
    }

    #[test]
    fn panic_payload_to_string_recovers_owned_string() {
        // `panic!("{var}")` produces a `String` payload (formatted at panic
        // time); the helper must downcast that branch too.
        let payload: Box<dyn std::any::Any + Send> = Box::new(String::from("formatted"));
        assert_eq!(panic_payload_to_string(payload), "formatted");
    }

    #[test]
    fn panic_payload_to_string_falls_back_for_other_types() {
        // Custom payloads (panic!(MyType { .. })) hit the catch-all branch.
        struct Custom;
        let payload: Box<dyn std::any::Any + Send> = Box::new(Custom);
        assert_eq!(
            panic_payload_to_string(payload),
            "watch task panicked with non-string payload",
        );
    }

    #[test]
    fn serving_transitions_publish_through_core() {
        // The Server delegates serving-state transitions to its ServingCore; a
        // step_down on a freshly built Server lands as NotServing with the hint.
        // (The #346 send_replace-with-zero-receivers regression is pinned by the
        // ServingCore unit tests, which can observe `receiver_count` directly.)
        let server = Server::builder()
            .consensus_driver(Arc::new(crate::test_fakes::InMemoryDriver::new()))
            .build()
            .expect("build must succeed");

        let hint = PeerEndpoint::try_from("new-leader:9000").unwrap();
        server.core.step_down(Some(hint.clone()), Some(Epoch(7)));

        match server.core.serving_state() {
            ServingState::NotServing {
                leader_endpoint,
                leader_epoch,
            } => {
                assert_eq!(leader_endpoint, Some(hint));
                assert_eq!(leader_epoch, Some(Epoch(7)));
            }
            ServingState::Serving => panic!("expected NotServing after step_down"),
        }
    }

    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
    #[test]
    fn builder_stores_tls_config() {
        // The serve_* paths read `tls_config` from `Server` (not the builder)
        // after `into_router` consumes self — so the field must survive the
        // builder → Server hand-off, not just the builder method.
        use crate::test_fakes::InMemoryDriver;

        let driver = Arc::new(InMemoryDriver::new());
        let cfg = tonic::transport::ServerTlsConfig::new();
        let server = Server::builder()
            .consensus_driver(driver)
            .tls_config(cfg)
            .build()
            .expect("build with tls_config must succeed");
        assert!(server.tls_config.is_some());
    }

    #[test]
    fn build_rejects_zero_max_seq_count() {
        // max_seq_count(0) makes every positive `count` fail as
        // SeqCountTooLarge{max:0} (the count>=1 floor leaves no valid value) —
        // GetSeq is silently dead. build() must fail-fast rather than ship a
        // server where GetSeq can never succeed.
        // `Server` is not `Debug`, so match rather than `expect_err`.
        match Server::builder()
            .consensus_driver(Arc::new(crate::test_fakes::InMemoryDriver::new()))
            .max_seq_count(0)
            .build()
        {
            Err(BuildError::ZeroMaxSeqCount) => {}
            Err(other) => panic!("expected ZeroMaxSeqCount, got {other:?}"),
            Ok(_) => panic!("max_seq_count(0) must be rejected at build, but build succeeded"),
        }
    }

    #[test]
    fn build_accepts_max_seq_count_of_one() {
        // 1 is the smallest usable cap: a single-ordinal block is valid, so the
        // floor and the cap coincide and GetSeq still works. Must build.
        Server::builder()
            .consensus_driver(Arc::new(crate::test_fakes::InMemoryDriver::new()))
            .max_seq_count(1)
            .build()
            .expect("max_seq_count(1) must build");
    }

    #[test]
    fn build_accepts_max_seq_batch_keys_of_one() {
        // 1 is the smallest usable batch cap: a single-entry batch is valid, so
        // the cap must build (the guard rejects only 0).
        Server::builder()
            .consensus_driver(Arc::new(crate::test_fakes::InMemoryDriver::new()))
            .max_seq_batch_keys(1)
            .build()
            .expect("max_seq_batch_keys(1) must build");
    }

    #[test]
    fn zero_max_seq_batch_keys_is_rejected() {
        // max_seq_batch_keys(0) makes every GetSeqBatch call fail as
        // "batch has N entries; the maximum is 0" — the RPC is silently dead.
        // build() must fail-fast rather than ship a server where GetSeqBatch
        // can never succeed.
        match Server::builder()
            .consensus_driver(Arc::new(crate::test_fakes::InMemoryDriver::new()))
            .max_seq_batch_keys(0)
            .build()
        {
            Err(BuildError::ZeroMaxSeqBatchKeys) => {}
            Err(other) => panic!("expected ZeroMaxSeqBatchKeys, got {other:?}"),
            Ok(_) => panic!("max_seq_batch_keys(0) must be rejected at build, but build succeeded"),
        }
    }

    #[tokio::test]
    async fn join_to_server_result_passes_through_clean_outcome() {
        // Ok(Ok(())) — task finished cleanly; forward verbatim.
        let handle = tokio::spawn(async { Ok::<(), ServerError>(()) });
        let join = handle.await;
        assert!(matches!(join_to_server_result(join), Ok(())));
    }

    #[tokio::test]
    async fn join_to_server_result_forwards_inner_error() {
        // Ok(Err(e)) — task returned an error; forward it.
        let handle = tokio::spawn(async {
            Err::<(), ServerError>(ServerError::WatchPanic {
                payload: "synthetic".into(),
                bt: Bt::capture(),
            })
        });
        let join = handle.await;
        match join_to_server_result(join) {
            Err(ServerError::WatchPanic { payload, .. }) => assert_eq!(payload, "synthetic"),
            other => panic!("expected forwarded WatchPanic, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn join_to_server_result_translates_panic_to_watch_panic() {
        // Err(JoinError::is_panic) — task panicked; surface as WatchPanic with
        // the payload stringified by `panic_payload_to_string`.
        let handle = tokio::spawn(async {
            panic!("intentional");
            #[allow(unreachable_code)]
            Ok::<(), ServerError>(())
        });
        let join = handle.await;
        match join_to_server_result(join) {
            Err(ServerError::WatchPanic { payload, .. }) => {
                assert!(payload.contains("intentional"))
            }
            other => panic!("expected WatchPanic, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn join_to_server_result_treats_cancellation_as_clean_exit() {
        // Err(JoinError::is_cancelled) — caller aborted the task; we asked
        // for that, so map to Ok.
        let handle: tokio::task::JoinHandle<Result<(), ServerError>> =
            tokio::spawn(async { futures::future::pending().await });
        handle.abort();
        let join = handle.await;
        assert!(matches!(join_to_server_result(join), Ok(())));
    }

    #[tokio::test]
    async fn combine_watch_and_drain_surfaces_drain_error_when_watch_ok() {
        // Watch ended cleanly but the graceful drain failed (port stolen,
        // resource exhaustion). The drain error must not be swallowed.
        let watch = tokio::spawn(async { Ok::<(), ServerError>(()) }).await;
        let drain: Result<(), ServerError> = Err(ServerError::WatchStreamClosed);
        assert!(matches!(
            combine_watch_and_drain(watch, drain),
            Err(ServerError::WatchStreamClosed)
        ));
    }

    #[tokio::test]
    async fn combine_watch_and_drain_returns_ok_when_both_succeed() {
        // Watch clean, drain clean — the only fully-Ok outcome.
        let watch = tokio::spawn(async { Ok::<(), ServerError>(()) }).await;
        let drain: Result<(), ServerError> = Ok(());
        assert!(matches!(combine_watch_and_drain(watch, drain), Ok(())));
    }

    #[tokio::test]
    async fn combine_watch_and_drain_prefers_watch_error_over_drain_error() {
        // Both failed. The watch error is the root cause (poisoned state was
        // already published), so it wins; the drain error is dropped.
        let watch = tokio::spawn(async {
            Err::<(), ServerError>(ServerError::WatchPanic {
                payload: "watch".into(),
                bt: Bt::capture(),
            })
        })
        .await;
        let drain: Result<(), ServerError> = Err(ServerError::WatchStreamClosed);
        match combine_watch_and_drain(watch, drain) {
            Err(ServerError::WatchPanic { payload, .. }) => assert_eq!(payload, "watch"),
            other => panic!("expected watch error to win, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn combine_watch_and_drain_returns_watch_error_when_drain_ok() {
        // Watch failed, drain succeeded — forward the watch error verbatim.
        let watch = tokio::spawn(async {
            Err::<(), ServerError>(ServerError::WatchPanic {
                payload: "watch".into(),
                bt: Bt::capture(),
            })
        })
        .await;
        let drain: Result<(), ServerError> = Ok(());
        match combine_watch_and_drain(watch, drain) {
            Err(ServerError::WatchPanic { payload, .. }) => assert_eq!(payload, "watch"),
            other => panic!("expected forwarded watch error, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn dropping_watch_guard_closes_serving_gate_synchronously() {
        // Regression: dropping the guard must close the serving gate at the
        // drop site, not on the watch task's later timeline. Build a guard whose
        // watch handle never touches the core (so the ONLY thing that can flip
        // the state to NotServing is `Drop`), publish `Serving`, then drop the
        // guard and read the state with NO await in between. On the current-thread
        // test runtime no other task can run between the synchronous `drop` and
        // the synchronous `serving_state` read, so observing `NotServing` proves
        // `Drop` closed the gate synchronously rather than the watch task.
        let core = Arc::new(ServingCore::new(
            Duration::from_secs(3),
            tsoracle_core::DEFAULT_MAX_SEQ_COUNT,
            tsoracle_core::DEFAULT_MAX_SEQ_BATCH_KEYS,
        ));
        core.publish_serving();

        let handle: tokio::task::JoinHandle<Result<(), ServerError>> =
            tokio::spawn(async { Ok(()) });
        let (cancel_tx, _cancel_rx) = tokio::sync::oneshot::channel::<()>();
        let guard = WatchGuard {
            cancel_tx: Some(cancel_tx),
            handle: Some(handle),
            shutdown_grace: Duration::from_secs(10),
            core: core.clone(),
            reporter: Arc::new(crate::reporter::Reporter::new()),
            heartbeat_cancel_tx: None,
            heartbeat_handle: None,
        };

        drop(guard);

        assert!(
            matches!(core.serving_state(), ServingState::NotServing { .. }),
            "dropping the WatchGuard must close the serving gate synchronously",
        );
    }

    #[tokio::test]
    async fn serve_inner_closes_serving_gate_on_user_shutdown() {
        // Regression for the serve path: when the caller's shutdown fires,
        // `serve_inner` drops the watch cancel sender and waits out the grace,
        // forcibly aborting the watch task if it overruns. A task parked upstream
        // of any cancel-observing await (modelled here by a never-resolving
        // future) is aborted before it can publish `NotServing`, so the gate
        // would stay open unless `serve_inner` closes it itself. Seed `Serving`,
        // run the user-shutdown arm with a zero grace (immediate abort), and
        // assert the gate is closed on return.
        let core = Arc::new(ServingCore::new(
            Duration::from_secs(3),
            tsoracle_core::DEFAULT_MAX_SEQ_COUNT,
            tsoracle_core::DEFAULT_MAX_SEQ_BATCH_KEYS,
        ));
        core.publish_serving();

        let watch_handle: tokio::task::JoinHandle<Result<(), ServerError>> =
            tokio::spawn(async { futures::future::pending().await });
        let (watch_cancel_tx, _watch_cancel_rx) = tokio::sync::oneshot::channel::<()>();
        let (tonic_cancel_tx, _tonic_cancel_rx) = tokio::sync::oneshot::channel::<()>();

        // A serve future that is immediately ready models the user's shutdown
        // having fired; with the biased select preferring the (pending) watch arm,
        // control reaches the user-shutdown arm.
        let serve_future = async { Ok::<(), tonic::transport::Error>(()) };

        let result = serve_inner(
            watch_cancel_tx,
            watch_handle,
            None, // heartbeat_cancel_tx — heartbeat disabled in this regression test
            None, // heartbeat_handle
            serve_future,
            tonic_cancel_tx,
            Duration::from_millis(0),
            core.clone(),
            Arc::new(crate::reporter::Reporter::new()),
        )
        .await;

        assert!(
            result.is_ok(),
            "user shutdown must return Ok, got {result:?}"
        );
        assert!(
            matches!(core.serving_state(), ServingState::NotServing { .. }),
            "serve_inner's user-shutdown arm must close the serving gate synchronously",
        );
    }

    /// Fresh serving core already published as `Serving`, for the lifecycle
    /// tests below.
    fn serving_core() -> Arc<ServingCore> {
        let core = Arc::new(ServingCore::new(
            Duration::from_secs(3),
            tsoracle_core::DEFAULT_MAX_SEQ_COUNT,
            tsoracle_core::DEFAULT_MAX_SEQ_BATCH_KEYS,
        ));
        core.publish_serving();
        core
    }

    /// A `WatchGuard` wrapping `handle`, with the heartbeat task disabled.
    fn guard_for(
        handle: Option<tokio::task::JoinHandle<Result<(), ServerError>>>,
        shutdown_grace: Duration,
        core: &Arc<ServingCore>,
    ) -> WatchGuard {
        let (cancel_tx, _cancel_rx) = tokio::sync::oneshot::channel::<()>();
        WatchGuard {
            cancel_tx: Some(cancel_tx),
            handle,
            shutdown_grace,
            core: Arc::clone(core),
            reporter: Arc::new(crate::reporter::Reporter::new()),
            heartbeat_cancel_tx: None,
            heartbeat_handle: None,
        }
    }

    #[tokio::test]
    async fn watch_guard_shutdown_surfaces_task_error() {
        // A watch task that already terminated on its own (here: stream EOF)
        // must have its outcome forwarded verbatim, and the gate must read
        // `NotServing` once `shutdown` returns.
        let core = serving_core();
        let handle =
            tokio::spawn(async { Err::<(), ServerError>(ServerError::WatchStreamClosed) });
        let guard = guard_for(Some(handle), Duration::from_secs(5), &core);

        let result = guard.shutdown().await;

        assert!(
            matches!(result, Err(ServerError::WatchStreamClosed)),
            "expected forwarded WatchStreamClosed, got {result:?}"
        );
        assert!(
            matches!(core.serving_state(), ServingState::NotServing { .. }),
            "serving state must be NotServing once shutdown returns",
        );
    }

    #[tokio::test]
    async fn watch_guard_shutdown_aborts_wedged_task_within_grace() {
        // A task parked upstream of any cancel point (a never-resolving
        // future) must not wedge shutdown. With a zero grace it is aborted at
        // once, and the requested stop reports `Ok(())`.
        let core = serving_core();
        let handle: tokio::task::JoinHandle<Result<(), ServerError>> =
            tokio::spawn(async { futures::future::pending().await });
        let guard = guard_for(Some(handle), Duration::ZERO, &core);

        let result = tokio::time::timeout(Duration::from_secs(5), guard.shutdown())
            .await
            .expect("shutdown must not wedge behind a hung watch task");

        assert!(result.is_ok(), "forced abort on shutdown must be Ok, got {result:?}");
        assert!(
            matches!(core.serving_state(), ServingState::NotServing { .. }),
            "serving state must be NotServing once shutdown returns",
        );
    }

    #[tokio::test]
    async fn watch_guard_is_finished_tracks_task_liveness() {
        // A still-running task reads as not finished. A guard whose handle
        // was already taken reads as finished.
        let core = serving_core();
        let running = guard_for(
            Some(tokio::spawn(futures::future::pending::<Result<(), ServerError>>())),
            Duration::from_secs(5),
            &core,
        );
        assert!(!running.is_finished(), "a pending watch task is not finished");
        running.abort();

        let taken = guard_for(None, Duration::from_secs(5), &core);
        assert!(taken.is_finished(), "a guard without a handle reads as finished");
    }

    #[tokio::test]
    async fn await_watch_within_grace_forwards_cooperative_outcome() {
        // A task that stops within the grace has its real outcome forwarded.
        let reporter = Arc::new(crate::reporter::Reporter::new());
        let mut handle =
            tokio::spawn(async { Err::<(), ServerError>(ServerError::WatchStreamClosed) });

        let join = await_watch_within_grace(&mut handle, Duration::from_secs(5), &reporter).await;

        assert!(
            matches!(join, Ok(Err(ServerError::WatchStreamClosed))),
            "expected the task's own error, got {join:?}"
        );
    }

    #[tokio::test]
    async fn await_watch_within_grace_aborts_on_overrun() {
        // A task that never stops is aborted once the grace (zero here)
        // elapses. The result is a cancelled JoinError, and the task has been
        // reaped by the time the helper returns.
        let reporter = Arc::new(crate::reporter::Reporter::new());
        let mut handle: tokio::task::JoinHandle<Result<(), ServerError>> =
            tokio::spawn(async { futures::future::pending().await });

        match await_watch_within_grace(&mut handle, Duration::ZERO, &reporter).await {
            Err(join_err) => assert!(
                join_err.is_cancelled(),
                "expected a cancelled JoinError, got {join_err:?}"
            ),
            Ok(outcome) => panic!("expected the overrunning task to be aborted, got {outcome:?}"),
        }
        assert!(handle.is_finished(), "the aborted task must be reaped before returning");
    }

    #[tokio::test]
    async fn combined_shutdown_resolves_on_internal_cancel() {
        // The watch arm of `serve_inner` fires the returned sender. tonic's
        // shutdown future must then resolve even though the caller's never
        // does.
        let (combined, cancel_tx) =
            combined_shutdown_with_cancel(futures::future::pending::<()>());
        cancel_tx
            .send(())
            .expect("the receiver lives inside the unpolled combined future");
        tokio::time::timeout(Duration::from_secs(5), combined)
            .await
            .expect("combined shutdown must resolve once the cancel sender fires");
    }

    #[tokio::test]
    async fn combined_shutdown_resolves_when_cancel_sender_dropped() {
        // Dropping the sender without sending also resolves the merged future,
        // so tonic stops rather than serving with nobody left to cancel it.
        let (combined, cancel_tx) =
            combined_shutdown_with_cancel(futures::future::pending::<()>());
        drop(cancel_tx);
        tokio::time::timeout(Duration::from_secs(5), combined)
            .await
            .expect("combined shutdown must resolve once the cancel sender is dropped");
    }

    #[tokio::test]
    async fn combined_shutdown_resolves_on_caller_shutdown() {
        // The caller's shutdown alone is enough; the internal sender is kept
        // alive (and never fired) for the whole test.
        let (combined, _cancel_tx) = combined_shutdown_with_cancel(async {});
        tokio::time::timeout(Duration::from_secs(5), combined)
            .await
            .expect("combined shutdown must resolve when the caller's shutdown fires");
    }

    #[cfg(feature = "reflection")]
    #[test]
    fn build_reflection_service_accepts_embedded_descriptor_set() {
        // The descriptor set emitted by `tsoracle-proto`'s build.rs must decode
        // cleanly — this is the production happy path that previously sat behind
        // an `expect`.
        assert!(build_reflection_service(tsoracle_proto::FILE_DESCRIPTOR_SET).is_ok());
    }

    #[cfg(feature = "reflection")]
    #[test]
    fn build_reflection_service_maps_corrupt_descriptor_to_typed_error() {
        // A descriptor set that fails to decode (build artifact drift) must
        // surface as a typed `ServerError::ReflectionInit`, not a panic. The
        // bytes below are not a valid encoded `FileDescriptorSet`.
        let corrupt = b"\xff\xff\xff\xff not a descriptor set";
        // The Ok variant wraps a reflection service that is not `Debug`, so map
        // to a unit result before asserting on the error variant.
        match build_reflection_service(corrupt).map(|_| ()) {
            Err(ServerError::ReflectionInit(_)) => {}
            other => panic!("expected ReflectionInit error, got {other:?}"),
        }
    }
}