tsoracle_server/server.rs
1//
2// ░▀█▀░█▀▀░█▀█░█▀▄░█▀█░█▀▀░█░░░█▀▀
3// ░░█░░▀▀█░█░█░█▀▄░█▀█░█░░░█░░░█▀▀
4// ░░▀░░▀▀▀░▀▀▀░▀░▀░▀░▀░▀▀▀░▀▀▀░▀▀▀
5//
6// tsoracle — Distributed Timestamp Oracle
7//
8// Copyright (c) 2026 Prisma Risk
9// Licensed under the Apache License, Version 2.0
10// https://github.com/prisma-risk/tsoracle
11//
12
13use core::time::Duration;
14use parking_lot::Mutex;
15use std::future::Future;
16use std::net::SocketAddr;
17use std::sync::Arc;
18use tokio::sync::watch;
19use tonic::service::Routes;
20use tonic::transport::Server as TonicServer;
21use tsoracle_consensus::ConsensusDriver;
22use tsoracle_core::{Allocator, Bt, Clock, Epoch, SystemClock};
23#[cfg(any(test, feature = "test-fakes"))]
24use tsoracle_core::{CoreError, WindowGrant};
25use tsoracle_proto::v1::tso_service_server::TsoServiceServer;
26
27use crate::service::TsoServiceImpl;
28
29#[derive(Debug, thiserror::Error)]
30pub enum BuildError {
31 #[error("consensus_driver is required")]
32 MissingConsensusDriver,
33 /// Surfaced when [`tsoracle_proto::v1::LEADER_HINT_TRAILER_KEY`] fails [`crate::leader_hint::validate_key`].
34 /// Today the key is a valid `const &'static str`, so this variant is
35 /// developer-error insurance: a future edit that breaks the key triggers
36 /// a startup failure rather than silently stripping the trailer from
37 /// every NOT_LEADER response.
38 #[error("invalid leader-hint metadata key: {0}")]
39 InvalidLeaderHintKey(#[from] tonic::metadata::errors::InvalidMetadataKey),
40}
41
42#[derive(Debug, thiserror::Error)]
43pub enum ServerError {
44 #[error("transport: {0}")]
45 Transport(#[from] tonic::transport::Error),
46 #[error("consensus: {0}")]
47 Consensus(#[from] tsoracle_consensus::ConsensusError),
48 #[error("core: {0}")]
49 Core(#[from] tsoracle_core::CoreError),
50 /// The leader-watch task panicked. Distinct from a clean error return so
51 /// operators can tell "driver returned Err" (recoverable design) from
52 /// "task panicked" (programming bug).
53 #[error("leader-watch task panicked: {payload}{bt}")]
54 WatchPanic { payload: String, bt: Bt },
55 /// The consensus driver's `leadership_events()` stream ended cleanly while
56 /// the leader-watch task was running. The stream is contracted to live for
57 /// the life of the server, so its end is anomalous (driver shutdown, lost
58 /// session, etc.) — distinct from a `Consensus` error returned mid-fence.
59 /// The watch task publishes `ServingState::NotServing` before returning
60 /// this variant so embedders who never observe the `JoinHandle` still get
61 /// the documented fail-safe behavior.
62 #[error("consensus driver leadership stream closed")]
63 WatchStreamClosed,
64 /// The embedded protobuf descriptor set failed to decode while building the
65 /// gRPC reflection service. `tsoracle-proto`'s `build.rs` emits these bytes
66 /// from checked-in `.proto` sources, so a failure here signals build-artifact
67 /// drift (a corrupt or stale descriptor) rather than a runtime condition —
68 /// surfaced as a diagnosable startup error instead of a process panic.
69 #[cfg(feature = "reflection")]
70 #[error("failed to build gRPC reflection service from embedded descriptor set: {0}")]
71 ReflectionInit(#[source] tonic_reflection::server::Error),
72}
73
74#[derive(Clone, Debug)]
75pub enum ServingState {
76 NotServing {
77 leader_endpoint: Option<String>,
78 leader_epoch: Option<Epoch>,
79 },
80 Serving,
81}
82
83pub struct ServerBuilder {
84 consensus: Option<Arc<dyn ConsensusDriver>>,
85 clock: Option<Arc<dyn Clock>>,
86 window_ahead: Duration,
87 failover_advance: Duration,
88 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
89 tls_config: Option<tonic::transport::ServerTlsConfig>,
90}
91
92impl Default for ServerBuilder {
93 fn default() -> Self {
94 ServerBuilder {
95 consensus: None,
96 clock: None,
97 window_ahead: Duration::from_secs(3),
98 failover_advance: Duration::from_secs(1),
99 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
100 tls_config: None,
101 }
102 }
103}
104
105impl ServerBuilder {
106 pub fn consensus_driver(mut self, driver: Arc<dyn ConsensusDriver>) -> Self {
107 self.consensus = Some(driver);
108 self
109 }
110 pub fn clock(mut self, clock: Arc<dyn Clock>) -> Self {
111 self.clock = Some(clock);
112 self
113 }
114 pub fn window_ahead(mut self, window_ahead: Duration) -> Self {
115 self.window_ahead = window_ahead;
116 self
117 }
118 pub fn failover_advance(mut self, failover_advance: Duration) -> Self {
119 self.failover_advance = failover_advance;
120 self
121 }
122
123 /// Configure TLS termination for this server. Applied inside
124 /// [`Server::serve`], [`Server::serve_with_shutdown`], and
125 /// [`Server::serve_with_listener`]. Not applied to [`Server::into_router`] —
126 /// embedders mounting tsoracle alongside their own services control TLS
127 /// on their own tonic builder.
128 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
129 pub fn tls_config(mut self, cfg: tonic::transport::ServerTlsConfig) -> Self {
130 self.tls_config = Some(cfg);
131 self
132 }
133
134 pub fn build(self) -> Result<Server, BuildError> {
135 crate::leader_hint::validate_key()?;
136 let consensus = self.consensus.ok_or(BuildError::MissingConsensusDriver)?;
137 let clock = self.clock.unwrap_or_else(|| Arc::new(SystemClock));
138 let (state_tx, state_rx) = watch::channel(ServingState::NotServing {
139 leader_endpoint: None,
140 leader_epoch: None,
141 });
142 Ok(Server {
143 consensus,
144 clock,
145 window_ahead: self.window_ahead,
146 failover_advance: self.failover_advance,
147 allocator: Arc::new(Mutex::new(Allocator::new())),
148 state_tx,
149 state_rx,
150 extension_lock: Arc::new(tokio::sync::Mutex::new(())),
151 extension_gate: Arc::new(tokio::sync::RwLock::new(())),
152 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
153 tls_config: self.tls_config,
154 })
155 }
156}
157
158pub struct Server {
159 pub(crate) consensus: Arc<dyn ConsensusDriver>,
160 pub(crate) clock: Arc<dyn Clock>,
161 pub(crate) window_ahead: Duration,
162 pub(crate) failover_advance: Duration,
163 pub(crate) allocator: Arc<Mutex<Allocator>>,
164 pub(crate) state_tx: watch::Sender<ServingState>,
165 pub state_rx: watch::Receiver<ServingState>,
166 /// Serializes window extensions so a stampeding burst of `WindowExhausted`
167 /// requests resolves to a single `persist_high_water` round-trip. Acquired
168 /// before `extension_gate`; combined with a recheck-after-acquire inside
169 /// `extend_window`, latecomers find the window already extended and
170 /// return without contacting consensus.
171 pub(crate) extension_lock: Arc<tokio::sync::Mutex<()>>,
172 /// Read-locked by window-extension calls for the duration of their
173 /// prepare → persist → commit dance. The leader-watch task takes a
174 /// write lock between clearing serving and calling load_high_water,
175 /// which drains all in-flight extensions started under the prior epoch
176 /// before the fence proceeds.
177 pub(crate) extension_gate: Arc<tokio::sync::RwLock<()>>,
178 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
179 pub(crate) tls_config: Option<tonic::transport::ServerTlsConfig>,
180}
181
182impl Server {
183 pub fn builder() -> ServerBuilder {
184 ServerBuilder::default()
185 }
186
187 /// Single transition API used in response to evidence that the current
188 /// epoch is no longer valid: consensus rejection (NotLeader/Fenced
189 /// during persist), leader-watch task termination, or other authoritative
190 /// signals of leadership loss.
191 ///
192 /// Clears the allocator BEFORE publishing `NotServing` so a racing
193 /// `try_grant` either (a) observes `CoreError::NotLeader` rather than
194 /// handing out a stale-epoch grant, or (b) succeeds against a still-valid
195 /// epoch and the *next* call observes `NotServing`. Either ordering is
196 /// safe; what is not safe is the inverse (publish first, clear later)
197 /// because a request between those two steps would see `Serving` AND a
198 /// still-leader allocator.
199 ///
200 /// Does NOT take `extension_gate.write()`. That would deadlock against
201 /// in-flight extensions currently holding the read lock and awaiting
202 /// `persist_high_water`. Those in-flights will either complete cleanly
203 /// (the next `try_grant` then sees `NotServing`) or fail with
204 /// NotLeader/Fenced (and reach this helper themselves — it is idempotent).
205 ///
206 /// `leader_epoch` carries the authoritative new-leader epoch when the
207 /// rejection reveals it — e.g. `ConsensusError::Fenced { current, .. }`
208 /// names the epoch that fenced us. Publishing it lets the resulting
209 /// `NotServing` hint redirect clients with an epoch to validate against;
210 /// callers with no epoch to offer (stream closure, panic) pass `None`.
211 pub(crate) fn step_down_due_to_consensus_rejection(
212 &self,
213 leader_endpoint: Option<String>,
214 leader_epoch: Option<Epoch>,
215 ) {
216 self.allocator.lock().on_leadership_lost();
217 let _ = self.state_tx.send(ServingState::NotServing {
218 leader_endpoint,
219 leader_epoch,
220 });
221 }
222}
223
224impl Server {
225 /// Return the configured `TsoServiceServer<TsoServiceImpl>` as a tonic
226 /// `Routes` value plus a `JoinHandle` for the spawned leader-watch task,
227 /// so callers can mount tsoracle's service alongside their own services
228 /// on a shared tonic listener instead of binding a dedicated port.
229 ///
230 /// The `JoinHandle` payload is `Result<(), ServerError>` so embedders
231 /// can observe leader-watch termination. The task never returns
232 /// `Ok(())`: every termination — driver error, panic, or clean EOF on
233 /// the leadership stream (surfaced as `ServerError::WatchStreamClosed`)
234 /// — publishes `ServingState::NotServing { leader_endpoint: None }`
235 /// before returning, so all subsequent RPCs fail fast with
236 /// `FAILED_PRECONDITION`. Embedders who never inspect the handle still
237 /// get fail-safe behavior.
238 ///
239 /// The `Server::serve()` method is a thin wrapper over this — it calls
240 /// `into_router`, builds a tonic `Server`, and binds a listener.
241 ///
242 /// Returns `Err(ServerError::ReflectionInit)` (only reachable under the
243 /// `reflection` feature) if the embedded descriptor set fails to decode.
244 /// That decode happens before the leader-watch task is spawned, so a failure
245 /// leaves nothing running to clean up.
246 pub fn into_router(
247 self,
248 ) -> Result<(Routes, tokio::task::JoinHandle<Result<(), ServerError>>), ServerError> {
249 // Build the reflection service first: a descriptor-decode failure must
250 // surface before we spawn the leader-watch task below, so an error path
251 // never leaks a running task.
252 #[cfg(feature = "reflection")]
253 let reflection = build_reflection_service(tsoracle_proto::FILE_DESCRIPTOR_SET)?;
254
255 let server = Arc::new(self);
256
257 let watch_server = server.clone();
258 let watch_handle = tokio::spawn(async move {
259 use futures::FutureExt;
260 // catch_unwind so a panic in run_leader_watch still routes through
261 // the poisoning path. Without this, embedders who mount into_router
262 // directly and never observe the JoinHandle would see
263 // ServingState::Serving remain published while the watch task is
264 // dead — the inverse of the fail-safe guarantee documented above.
265 // The panic is re-raised after poisoning so serve / serve_with_*
266 // continue to translate it into ServerError::WatchPanic via
267 // join_to_server_result.
268 let outcome =
269 std::panic::AssertUnwindSafe(crate::fence::run_leader_watch(watch_server.clone()))
270 .catch_unwind()
271 .await;
272 match outcome {
273 Ok(result) => {
274 if let Err(ref _e) = result {
275 // Poison BEFORE returning so embedders who do not observe
276 // the JoinHandle still get fail-safe behavior.
277 watch_server.step_down_due_to_consensus_rejection(None, None);
278 #[cfg(feature = "tracing")]
279 tracing::error!(error = %_e, "leader-watch terminated; serving disabled");
280 }
281 result
282 }
283 Err(panic_payload) => {
284 // Mirror the Err branch: poison BEFORE re-raising so
285 // handle-dropping embedders still observe NotServing.
286 watch_server.step_down_due_to_consensus_rejection(None, None);
287 #[cfg(feature = "tracing")]
288 tracing::error!("leader-watch panicked; serving disabled");
289 std::panic::resume_unwind(panic_payload);
290 }
291 }
292 });
293
294 let service = TsoServiceImpl { server };
295 #[allow(unused_mut)]
296 let mut routes = Routes::new(TsoServiceServer::new(service));
297 #[cfg(feature = "reflection")]
298 {
299 routes = routes.add_service(reflection);
300 }
301 Ok((routes, watch_handle))
302 }
303
304 pub async fn serve(self, addr: SocketAddr) -> Result<(), ServerError> {
305 self.serve_with_shutdown(addr, futures::future::pending())
306 .await
307 }
308
309 /// Run the gRPC server until either the caller's `shutdown` fires or the
310 /// leader-watch task terminates.
311 ///
312 /// Three outcomes:
313 /// 1. `shutdown` fires first → tonic drains in-flights and returns Ok.
314 /// The watch handle is aborted; any error it had been about to return
315 /// is forfeited (the process is shutting down anyway).
316 /// 2. Watch returns `Ok(Err(e))` → poisoned state is already published;
317 /// `cancel_tx` triggers tonic's graceful shutdown; in-flight `GetTs`
318 /// calls whose `try_grant` already succeeded complete with the
319 /// timestamps they were allocated; new calls fail fast. Returns `Err(e)`
320 /// — the watch error wins even if the drain itself also errors (see
321 /// `combine_watch_and_drain`); a drain error is surfaced only when the
322 /// watch ended cleanly.
323 /// 3. Watch task panics → returns `Err(ServerError::WatchPanic{..})`
324 /// with the panic payload stringified. Same drain semantics as (2).
325 pub async fn serve_with_shutdown(
326 self,
327 addr: SocketAddr,
328 shutdown: impl Future<Output = ()> + Send + 'static,
329 ) -> Result<(), ServerError> {
330 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
331 let tls_config = self.tls_config.clone();
332
333 let (routes, mut watch_handle) = self.into_router()?;
334 let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
335
336 // tonic stops when EITHER the user's shutdown fires OR we cancel
337 // because the watch task terminated.
338 let combined_shutdown = async move {
339 tokio::select! {
340 _ = shutdown => {}
341 _ = cancel_rx => {}
342 }
343 };
344
345 let mut tonic = TonicServer::builder();
346 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
347 if let Some(cfg) = tls_config {
348 tonic = tonic.tls_config(cfg).map_err(ServerError::Transport)?;
349 }
350 let serve = tonic
351 .add_routes(routes)
352 .serve_with_shutdown(addr, combined_shutdown);
353 tokio::pin!(serve);
354
355 tokio::select! {
356 // Bias toward the watch arm: if both are ready in the same poll
357 // (rare but possible — graceful shutdown completed in the same
358 // tick the watch returned), we want to surface the watch error
359 // rather than report a clean shutdown.
360 biased;
361
362 watch_result = &mut watch_handle => {
363 // Watch terminated. State is already poisoned (see watch
364 // task body in into_router). Trigger tonic drain, wait for
365 // it to finish, then report the watch's outcome — preferring
366 // it over any drain error, which surfaces only if the watch
367 // itself ended cleanly.
368 let _ = cancel_tx.send(());
369 let drain_result = serve.await;
370 combine_watch_and_drain(watch_result, drain_result)
371 }
372 serve_result = &mut serve => {
373 // User shutdown fired (or our cancel — but watch arm has
374 // `biased` priority, so reaching here means user shutdown).
375 // The watch task may still be running; aborting it loses
376 // any error it was about to report, but the process is
377 // shutting down so that's acceptable.
378 watch_handle.abort();
379 serve_result?;
380 Ok(())
381 }
382 }
383 }
384
385 /// Run the gRPC server on a caller-provided `TcpListener` until either
386 /// the caller-provided `shutdown` fires or the leader-watch task terminates.
387 ///
388 /// Use this instead of [`Self::serve_with_shutdown`] when you need to
389 /// observe the OS-picked port (`127.0.0.1:0`) before clients connect, or
390 /// when you want to wrap the listener in an outer adapter before passing it
391 /// in. The listening socket is owned by the caller and passed here; tsoracle
392 /// starts accepting on it immediately.
393 ///
394 /// Three outcomes:
395 /// 1. `shutdown` fires first → tonic drains in-flights and returns `Ok`.
396 /// The watch handle is aborted; any error it had been about to return
397 /// is forfeited (the process is shutting down anyway).
398 /// 2. Watch returns `Ok(Err(e))` → poisoned state is already published;
399 /// the caller-provided shutdown is cancelled internally so tonic begins
400 /// graceful shutdown; in-flight `GetTs` calls whose `try_grant` already
401 /// succeeded complete with the timestamps they were allocated; new calls
402 /// fail fast. Returns `Err(e)` — the watch error wins even if the drain
403 /// itself also errors (see `combine_watch_and_drain`); a drain error is
404 /// surfaced only when the watch ended cleanly.
405 /// 3. Watch task panics → returns `Err(ServerError::WatchPanic{..})`
406 /// with the panic payload stringified. Same drain semantics as (2).
407 pub async fn serve_with_listener(
408 self,
409 listener: tokio::net::TcpListener,
410 shutdown: impl Future<Output = ()> + Send + 'static,
411 ) -> Result<(), ServerError> {
412 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
413 let tls_config = self.tls_config.clone();
414
415 let (routes, mut watch_handle) = self.into_router()?;
416 let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
417
418 let combined_shutdown = async move {
419 tokio::select! {
420 _ = shutdown => {}
421 _ = cancel_rx => {}
422 }
423 };
424
425 let incoming = tonic::transport::server::TcpIncoming::from(listener);
426
427 let mut tonic = TonicServer::builder();
428 #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
429 if let Some(cfg) = tls_config {
430 tonic = tonic.tls_config(cfg).map_err(ServerError::Transport)?;
431 }
432 let serve = tonic
433 .add_routes(routes)
434 .serve_with_incoming_shutdown(incoming, combined_shutdown);
435 tokio::pin!(serve);
436
437 tokio::select! {
438 biased;
439
440 watch_result = &mut watch_handle => {
441 let _ = cancel_tx.send(());
442 let drain_result = serve.await;
443 combine_watch_and_drain(watch_result, drain_result)
444 }
445 serve_result = &mut serve => {
446 watch_handle.abort();
447 serve_result?;
448 Ok(())
449 }
450 }
451 }
452}
453
454/// Convert a `JoinHandle` result into a `ServerError`-typed result.
455///
456/// - `Ok(Ok(()))` — unreachable in production: `run_leader_watch` only
457/// returns from its loop via the `WatchStreamClosed` branch. Forwarded
458/// verbatim so the conversion remains total for test helpers that spawn
459/// no-op join futures.
460/// - `Ok(Err(e))` — task returned an error (including `WatchStreamClosed`
461/// from a clean EOF). Forward verbatim.
462/// - `Err(JoinError)` — task was cancelled or panicked. Cancellation maps to
463/// Ok (we asked for it); panic maps to `WatchPanic` with payload.
464fn join_to_server_result(
465 join_result: Result<Result<(), ServerError>, tokio::task::JoinError>,
466) -> Result<(), ServerError> {
467 match join_result {
468 Ok(inner) => inner,
469 Err(join_err) if join_err.is_panic() => {
470 let payload = panic_payload_to_string(join_err.into_panic());
471 Err(ServerError::WatchPanic {
472 payload,
473 bt: Bt::capture(),
474 })
475 }
476 Err(_cancelled) => Ok(()),
477 }
478}
479
480/// Combine the leader-watch outcome with the tonic graceful-drain outcome
481/// after the watch arm fired.
482///
483/// When the watch task terminates first we trigger the drain and then must
484/// report a single result. The watch error is the root cause — poisoned
485/// serving state was already published before the task returned — so it wins
486/// when both fail. A drain error (port stolen, resource exhaustion) is only
487/// surfaced when the watch outcome is otherwise `Ok`; previously it was
488/// discarded via `let _ = serve.await`, hiding a failed drain behind a clean
489/// shutdown report.
490///
491/// Generic over the drain error so the precedence logic is unit-testable
492/// without fabricating a `tonic::transport::Error` (which has no public
493/// constructor): production passes `Result<(), tonic::transport::Error>`,
494/// tests pass `Result<(), ServerError>` via the reflexive `From` impl.
495fn combine_watch_and_drain<E>(
496 watch_result: Result<Result<(), ServerError>, tokio::task::JoinError>,
497 drain_result: Result<(), E>,
498) -> Result<(), ServerError>
499where
500 ServerError: From<E>,
501{
502 match join_to_server_result(watch_result) {
503 Err(watch_err) => Err(watch_err),
504 Ok(()) => drain_result.map_err(ServerError::from),
505 }
506}
507
/// Decode an encoded protobuf descriptor set into a gRPC reflection (v1)
/// service.
///
/// Kept separate from [`Server::into_router`] so the failure path can be unit
/// tested. Production feeds [`tsoracle_proto::FILE_DESCRIPTOR_SET`]; tests feed
/// deliberately corrupt bytes.
///
/// # Errors
///
/// Any decode failure is returned as [`ServerError::ReflectionInit`] instead
/// of panicking.
#[cfg(feature = "reflection")]
fn build_reflection_service(
    descriptor_set: &[u8],
) -> Result<
    tonic_reflection::server::v1::ServerReflectionServer<
        impl tonic_reflection::server::v1::ServerReflection,
    >,
    ServerError,
> {
    let configured = tonic_reflection::server::Builder::configure()
        .register_encoded_file_descriptor_set(descriptor_set);
    match configured.build_v1() {
        Ok(service) => Ok(service),
        Err(decode_err) => Err(ServerError::ReflectionInit(decode_err)),
    }
}
528
/// Convert a panic payload into human-readable text.
///
/// `panic!("literal")` yields a `&'static str` payload and a formatted
/// `panic!` yields a `String`; both are returned verbatim. Any other payload
/// type becomes a fixed placeholder message.
fn panic_payload_to_string(panic: Box<dyn std::any::Any + Send>) -> String {
    // Downcast by value: a `String` payload is moved out instead of cloned.
    match panic.downcast::<&'static str>() {
        Ok(literal) => (*literal).to_owned(),
        Err(not_a_literal) => match not_a_literal.downcast::<String>() {
            Ok(formatted) => *formatted,
            Err(_) => String::from("watch task panicked with non-string payload"),
        },
    }
}
538
/// Hooks that let tests reach `Server` internals directly.
///
/// Compiled only for this crate's unit tests or with the `test-fakes` feature.
/// They are hidden from rustdoc and are not part of the stable public API.
#[cfg(any(test, feature = "test-fakes"))]
impl Server {
    /// Run the leader-watch loop inline on the caller's task, for integration
    /// tests built with `test-fakes`.
    ///
    /// Unlike the task spawned by [`Server::into_router`], this adds no
    /// `catch_unwind` wrapper and does not step down when the loop exits.
    #[doc(hidden)]
    pub async fn run_leader_watch_for_tests(self: Arc<Self>) -> Result<(), ServerError> {
        crate::fence::run_leader_watch(self).await
    }

    /// Request a window grant straight from the in-memory allocator, bypassing
    /// the gRPC service.
    ///
    /// `count` is forwarded to `Allocator::try_grant` together with the clock's
    /// current reading. This lets regression tests check the behavioral fence
    /// (no timestamp at or below the prior leader's high-water) directly.
    #[doc(hidden)]
    pub fn try_grant_for_tests(&self, count: u32) -> Result<WindowGrant, CoreError> {
        // The allocator lock is acquired first; the clock is read while it is held.
        self.allocator
            .lock()
            .try_grant(self.clock.now_ms(), count)
    }
}
557
#[cfg(test)]
mod tests {
    //! Unit tests for the error-translation helpers and builder plumbing in
    //! this module.

    use super::*;

    #[test]
    fn panic_payload_to_string_recovers_static_str() {
        // `panic!("literal")` produces a `&'static str` payload; we want the
        // verbatim text so operators see what the watch task said.
        let payload: Box<dyn std::any::Any + Send> = Box::new("watch boom");
        assert_eq!(panic_payload_to_string(payload), "watch boom");
    }

    #[test]
    fn panic_payload_to_string_recovers_owned_string() {
        // `panic!("{var}")` produces a `String` payload (formatted at panic
        // time); the helper must downcast that branch too.
        let payload: Box<dyn std::any::Any + Send> = Box::new(String::from("formatted"));
        assert_eq!(panic_payload_to_string(payload), "formatted");
    }

    #[test]
    fn panic_payload_to_string_falls_back_for_other_types() {
        // Custom payloads (panic!(MyType { .. })) hit the catch-all branch.
        struct Custom;
        let payload: Box<dyn std::any::Any + Send> = Box::new(Custom);
        assert_eq!(
            panic_payload_to_string(payload),
            "watch task panicked with non-string payload",
        );
    }

    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
    #[test]
    fn builder_stores_tls_config() {
        // The serve_* paths read `tls_config` from `Server` (not the builder),
        // so the field must survive the builder → Server hand-off, not just
        // the builder method.
        use crate::test_fakes::InMemoryDriver;

        let driver = Arc::new(InMemoryDriver::new());
        let cfg = tonic::transport::ServerTlsConfig::new();
        let server = Server::builder()
            .consensus_driver(driver)
            .tls_config(cfg)
            .build()
            .expect("build with tls_config must succeed");
        assert!(server.tls_config.is_some());
    }

    #[tokio::test]
    async fn join_to_server_result_passes_through_clean_outcome() {
        // Ok(Ok(())): task finished cleanly; forward verbatim.
        let handle = tokio::spawn(async { Ok::<(), ServerError>(()) });
        let join = handle.await;
        assert!(matches!(join_to_server_result(join), Ok(())));
    }

    #[tokio::test]
    async fn join_to_server_result_forwards_inner_error() {
        // Ok(Err(e)): task returned an error; forward it.
        let handle = tokio::spawn(async {
            Err::<(), ServerError>(ServerError::WatchPanic {
                payload: "synthetic".into(),
                bt: Bt::capture(),
            })
        });
        let join = handle.await;
        match join_to_server_result(join) {
            Err(ServerError::WatchPanic { payload, .. }) => assert_eq!(payload, "synthetic"),
            other => panic!("expected forwarded WatchPanic, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn join_to_server_result_translates_panic_to_watch_panic() {
        // Err(JoinError::is_panic): task panicked; surface as WatchPanic with
        // the payload stringified by `panic_payload_to_string`.
        let handle = tokio::spawn(async {
            panic!("intentional");
            #[allow(unreachable_code)]
            Ok::<(), ServerError>(())
        });
        let join = handle.await;
        match join_to_server_result(join) {
            Err(ServerError::WatchPanic { payload, .. }) => {
                assert!(payload.contains("intentional"))
            }
            other => panic!("expected WatchPanic, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn join_to_server_result_treats_cancellation_as_clean_exit() {
        // Err(JoinError::is_cancelled): caller aborted the task; we asked
        // for that, so map to Ok.
        let handle: tokio::task::JoinHandle<Result<(), ServerError>> =
            tokio::spawn(async { futures::future::pending().await });
        handle.abort();
        let join = handle.await;
        assert!(matches!(join_to_server_result(join), Ok(())));
    }

    #[tokio::test]
    async fn combine_watch_and_drain_surfaces_drain_error_when_watch_ok() {
        // Watch ended cleanly but the graceful drain failed (port stolen,
        // resource exhaustion). The drain error must not be swallowed.
        let watch = tokio::spawn(async { Ok::<(), ServerError>(()) }).await;
        let drain: Result<(), ServerError> = Err(ServerError::WatchStreamClosed);
        assert!(matches!(
            combine_watch_and_drain(watch, drain),
            Err(ServerError::WatchStreamClosed)
        ));
    }

    #[tokio::test]
    async fn combine_watch_and_drain_returns_ok_when_both_succeed() {
        // Watch clean, drain clean: the only fully-Ok outcome.
        let watch = tokio::spawn(async { Ok::<(), ServerError>(()) }).await;
        let drain: Result<(), ServerError> = Ok(());
        assert!(matches!(combine_watch_and_drain(watch, drain), Ok(())));
    }

    #[tokio::test]
    async fn combine_watch_and_drain_prefers_watch_error_over_drain_error() {
        // Both failed. The watch error is the root cause (poisoned state was
        // already published), so it wins; the drain error is dropped.
        let watch = tokio::spawn(async {
            Err::<(), ServerError>(ServerError::WatchPanic {
                payload: "watch".into(),
                bt: Bt::capture(),
            })
        })
        .await;
        let drain: Result<(), ServerError> = Err(ServerError::WatchStreamClosed);
        match combine_watch_and_drain(watch, drain) {
            Err(ServerError::WatchPanic { payload, .. }) => assert_eq!(payload, "watch"),
            other => panic!("expected watch error to win, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn combine_watch_and_drain_returns_watch_error_when_drain_ok() {
        // Watch failed, drain succeeded: forward the watch error verbatim.
        let watch = tokio::spawn(async {
            Err::<(), ServerError>(ServerError::WatchPanic {
                payload: "watch".into(),
                bt: Bt::capture(),
            })
        })
        .await;
        let drain: Result<(), ServerError> = Ok(());
        match combine_watch_and_drain(watch, drain) {
            Err(ServerError::WatchPanic { payload, .. }) => assert_eq!(payload, "watch"),
            other => panic!("expected forwarded watch error, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn combine_watch_and_drain_translates_watch_panic_over_drain_error() {
        // The watch task panicked (JoinError::is_panic) and the drain also
        // failed. The panic is the root cause, so it must surface as
        // WatchPanic; the drain error is dropped.
        let watch = tokio::spawn(async {
            panic!("watch exploded");
            #[allow(unreachable_code)]
            Ok::<(), ServerError>(())
        })
        .await;
        let drain: Result<(), ServerError> = Err(ServerError::WatchStreamClosed);
        match combine_watch_and_drain(watch, drain) {
            Err(ServerError::WatchPanic { payload, .. }) => {
                assert!(payload.contains("watch exploded"))
            }
            other => panic!("expected WatchPanic to win, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn combine_watch_and_drain_surfaces_drain_error_when_watch_cancelled() {
        // A cancelled watch task maps to Ok, so a failed drain is the only
        // error left to report and must not be swallowed.
        let handle: tokio::task::JoinHandle<Result<(), ServerError>> =
            tokio::spawn(async { futures::future::pending().await });
        handle.abort();
        let watch = handle.await;
        let drain: Result<(), ServerError> = Err(ServerError::WatchStreamClosed);
        assert!(matches!(
            combine_watch_and_drain(watch, drain),
            Err(ServerError::WatchStreamClosed)
        ));
    }

    #[cfg(feature = "reflection")]
    #[test]
    fn build_reflection_service_accepts_embedded_descriptor_set() {
        // The descriptor set emitted by `tsoracle-proto`'s build.rs must decode
        // cleanly; this is the production happy path that previously sat behind
        // an `expect`.
        assert!(build_reflection_service(tsoracle_proto::FILE_DESCRIPTOR_SET).is_ok());
    }

    #[cfg(feature = "reflection")]
    #[test]
    fn build_reflection_service_maps_corrupt_descriptor_to_typed_error() {
        // A descriptor set that fails to decode (build artifact drift) must
        // surface as a typed `ServerError::ReflectionInit`, not a panic. The
        // bytes below are not a valid encoded `FileDescriptorSet`.
        let corrupt = b"\xff\xff\xff\xff not a descriptor set";
        // The Ok variant wraps a reflection service that is not `Debug`, so map
        // to a unit result before asserting on the error variant.
        match build_reflection_service(corrupt).map(|_| ()) {
            Err(ServerError::ReflectionInit(_)) => {}
            other => panic!("expected ReflectionInit error, got {other:?}"),
        }
    }
}