Skip to main content

tsoracle_server/
server.rs

1//
2//  ░▀█▀░█▀▀░█▀█░█▀▄░█▀█░█▀▀░█░░░█▀▀
3//  ░░█░░▀▀█░█░█░█▀▄░█▀█░█░░░█░░░█▀▀
4//  ░░▀░░▀▀▀░▀▀▀░▀░▀░▀░▀░▀▀▀░▀▀▀░▀▀▀
5//
6//  tsoracle — Distributed Timestamp Oracle
7//
8//  Copyright (c) 2026 Prisma Risk
9//  Licensed under the Apache License, Version 2.0
10//  https://github.com/prisma-risk/tsoracle
11//
12
13use core::time::Duration;
14use parking_lot::Mutex;
15use std::future::Future;
16use std::net::SocketAddr;
17use std::sync::Arc;
18use tokio::sync::watch;
19use tonic::service::Routes;
20use tonic::transport::Server as TonicServer;
21use tsoracle_consensus::ConsensusDriver;
22use tsoracle_core::{Allocator, Bt, Clock, Epoch, SystemClock};
23#[cfg(any(test, feature = "test-fakes"))]
24use tsoracle_core::{CoreError, WindowGrant};
25use tsoracle_proto::v1::tso_service_server::TsoServiceServer;
26
27use crate::service::TsoServiceImpl;
28
29#[derive(Debug, thiserror::Error)]
30pub enum BuildError {
31    #[error("consensus_driver is required")]
32    MissingConsensusDriver,
33    /// Surfaced when [`tsoracle_proto::v1::LEADER_HINT_TRAILER_KEY`] fails [`crate::leader_hint::validate_key`].
34    /// Today the key is a valid `const &'static str`, so this variant is
35    /// developer-error insurance: a future edit that breaks the key triggers
36    /// a startup failure rather than silently stripping the trailer from
37    /// every NOT_LEADER response.
38    #[error("invalid leader-hint metadata key: {0}")]
39    InvalidLeaderHintKey(#[from] tonic::metadata::errors::InvalidMetadataKey),
40}
41
42#[derive(Debug, thiserror::Error)]
43pub enum ServerError {
44    #[error("transport: {0}")]
45    Transport(#[from] tonic::transport::Error),
46    #[error("consensus: {0}")]
47    Consensus(#[from] tsoracle_consensus::ConsensusError),
48    #[error("core: {0}")]
49    Core(#[from] tsoracle_core::CoreError),
50    /// The leader-watch task panicked. Distinct from a clean error return so
51    /// operators can tell "driver returned Err" (recoverable design) from
52    /// "task panicked" (programming bug).
53    #[error("leader-watch task panicked: {payload}{bt}")]
54    WatchPanic { payload: String, bt: Bt },
55    /// The consensus driver's `leadership_events()` stream ended cleanly while
56    /// the leader-watch task was running. The stream is contracted to live for
57    /// the life of the server, so its end is anomalous (driver shutdown, lost
58    /// session, etc.) — distinct from a `Consensus` error returned mid-fence.
59    /// The watch task publishes `ServingState::NotServing` before returning
60    /// this variant so embedders who never observe the `JoinHandle` still get
61    /// the documented fail-safe behavior.
62    #[error("consensus driver leadership stream closed")]
63    WatchStreamClosed,
64    /// The embedded protobuf descriptor set failed to decode while building the
65    /// gRPC reflection service. `tsoracle-proto`'s `build.rs` emits these bytes
66    /// from checked-in `.proto` sources, so a failure here signals build-artifact
67    /// drift (a corrupt or stale descriptor) rather than a runtime condition —
68    /// surfaced as a diagnosable startup error instead of a process panic.
69    #[cfg(feature = "reflection")]
70    #[error("failed to build gRPC reflection service from embedded descriptor set: {0}")]
71    ReflectionInit(#[source] tonic_reflection::server::Error),
72}
73
74#[derive(Clone, Debug)]
75pub enum ServingState {
76    NotServing {
77        leader_endpoint: Option<String>,
78        leader_epoch: Option<Epoch>,
79    },
80    Serving,
81}
82
83pub struct ServerBuilder {
84    consensus: Option<Arc<dyn ConsensusDriver>>,
85    clock: Option<Arc<dyn Clock>>,
86    window_ahead: Duration,
87    failover_advance: Duration,
88    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
89    tls_config: Option<tonic::transport::ServerTlsConfig>,
90}
91
92impl Default for ServerBuilder {
93    fn default() -> Self {
94        ServerBuilder {
95            consensus: None,
96            clock: None,
97            window_ahead: Duration::from_secs(3),
98            failover_advance: Duration::from_secs(1),
99            #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
100            tls_config: None,
101        }
102    }
103}
104
105impl ServerBuilder {
106    pub fn consensus_driver(mut self, driver: Arc<dyn ConsensusDriver>) -> Self {
107        self.consensus = Some(driver);
108        self
109    }
110    pub fn clock(mut self, clock: Arc<dyn Clock>) -> Self {
111        self.clock = Some(clock);
112        self
113    }
114    pub fn window_ahead(mut self, window_ahead: Duration) -> Self {
115        self.window_ahead = window_ahead;
116        self
117    }
118    pub fn failover_advance(mut self, failover_advance: Duration) -> Self {
119        self.failover_advance = failover_advance;
120        self
121    }
122
123    /// Configure TLS termination for this server. Applied inside
124    /// [`Server::serve`], [`Server::serve_with_shutdown`], and
125    /// [`Server::serve_with_listener`]. Not applied to [`Server::into_router`] —
126    /// embedders mounting tsoracle alongside their own services control TLS
127    /// on their own tonic builder.
128    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
129    pub fn tls_config(mut self, cfg: tonic::transport::ServerTlsConfig) -> Self {
130        self.tls_config = Some(cfg);
131        self
132    }
133
134    pub fn build(self) -> Result<Server, BuildError> {
135        crate::leader_hint::validate_key()?;
136        let consensus = self.consensus.ok_or(BuildError::MissingConsensusDriver)?;
137        let clock = self.clock.unwrap_or_else(|| Arc::new(SystemClock));
138        let (state_tx, state_rx) = watch::channel(ServingState::NotServing {
139            leader_endpoint: None,
140            leader_epoch: None,
141        });
142        Ok(Server {
143            consensus,
144            clock,
145            window_ahead: self.window_ahead,
146            failover_advance: self.failover_advance,
147            allocator: Arc::new(Mutex::new(Allocator::new())),
148            state_tx,
149            state_rx,
150            extension_lock: Arc::new(tokio::sync::Mutex::new(())),
151            extension_gate: Arc::new(tokio::sync::RwLock::new(())),
152            #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
153            tls_config: self.tls_config,
154        })
155    }
156}
157
158pub struct Server {
159    pub(crate) consensus: Arc<dyn ConsensusDriver>,
160    pub(crate) clock: Arc<dyn Clock>,
161    pub(crate) window_ahead: Duration,
162    pub(crate) failover_advance: Duration,
163    pub(crate) allocator: Arc<Mutex<Allocator>>,
164    pub(crate) state_tx: watch::Sender<ServingState>,
165    pub state_rx: watch::Receiver<ServingState>,
166    /// Serializes window extensions so a stampeding burst of `WindowExhausted`
167    /// requests resolves to a single `persist_high_water` round-trip. Acquired
168    /// before `extension_gate`; combined with a recheck-after-acquire inside
169    /// `extend_window`, latecomers find the window already extended and
170    /// return without contacting consensus.
171    pub(crate) extension_lock: Arc<tokio::sync::Mutex<()>>,
172    /// Read-locked by window-extension calls for the duration of their
173    /// prepare → persist → commit dance. The leader-watch task takes a
174    /// write lock between clearing serving and calling load_high_water,
175    /// which drains all in-flight extensions started under the prior epoch
176    /// before the fence proceeds.
177    pub(crate) extension_gate: Arc<tokio::sync::RwLock<()>>,
178    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
179    pub(crate) tls_config: Option<tonic::transport::ServerTlsConfig>,
180}
181
182impl Server {
183    pub fn builder() -> ServerBuilder {
184        ServerBuilder::default()
185    }
186
187    /// Single transition API used in response to evidence that the current
188    /// epoch is no longer valid: consensus rejection (NotLeader/Fenced
189    /// during persist), leader-watch task termination, or other authoritative
190    /// signals of leadership loss.
191    ///
192    /// Clears the allocator BEFORE publishing `NotServing` so a racing
193    /// `try_grant` either (a) observes `CoreError::NotLeader` rather than
194    /// handing out a stale-epoch grant, or (b) succeeds against a still-valid
195    /// epoch and the *next* call observes `NotServing`. Either ordering is
196    /// safe; what is not safe is the inverse (publish first, clear later)
197    /// because a request between those two steps would see `Serving` AND a
198    /// still-leader allocator.
199    ///
200    /// Does NOT take `extension_gate.write()`. That would deadlock against
201    /// in-flight extensions currently holding the read lock and awaiting
202    /// `persist_high_water`. Those in-flights will either complete cleanly
203    /// (the next `try_grant` then sees `NotServing`) or fail with
204    /// NotLeader/Fenced (and reach this helper themselves — it is idempotent).
205    ///
206    /// `leader_epoch` carries the authoritative new-leader epoch when the
207    /// rejection reveals it — e.g. `ConsensusError::Fenced { current, .. }`
208    /// names the epoch that fenced us. Publishing it lets the resulting
209    /// `NotServing` hint redirect clients with an epoch to validate against;
210    /// callers with no epoch to offer (stream closure, panic) pass `None`.
211    pub(crate) fn step_down_due_to_consensus_rejection(
212        &self,
213        leader_endpoint: Option<String>,
214        leader_epoch: Option<Epoch>,
215    ) {
216        self.allocator.lock().on_leadership_lost();
217        let _ = self.state_tx.send(ServingState::NotServing {
218            leader_endpoint,
219            leader_epoch,
220        });
221    }
222}
223
224impl Server {
225    /// Return the configured `TsoServiceServer<TsoServiceImpl>` as a tonic
226    /// `Routes` value plus a `JoinHandle` for the spawned leader-watch task,
227    /// so callers can mount tsoracle's service alongside their own services
228    /// on a shared tonic listener instead of binding a dedicated port.
229    ///
230    /// The `JoinHandle` payload is `Result<(), ServerError>` so embedders
231    /// can observe leader-watch termination. The task never returns
232    /// `Ok(())`: every termination — driver error, panic, or clean EOF on
233    /// the leadership stream (surfaced as `ServerError::WatchStreamClosed`)
234    /// — publishes `ServingState::NotServing { leader_endpoint: None }`
235    /// before returning, so all subsequent RPCs fail fast with
236    /// `FAILED_PRECONDITION`. Embedders who never inspect the handle still
237    /// get fail-safe behavior.
238    ///
239    /// The `Server::serve()` method is a thin wrapper over this — it calls
240    /// `into_router`, builds a tonic `Server`, and binds a listener.
241    ///
242    /// Returns `Err(ServerError::ReflectionInit)` (only reachable under the
243    /// `reflection` feature) if the embedded descriptor set fails to decode.
244    /// That decode happens before the leader-watch task is spawned, so a failure
245    /// leaves nothing running to clean up.
246    pub fn into_router(
247        self,
248    ) -> Result<(Routes, tokio::task::JoinHandle<Result<(), ServerError>>), ServerError> {
249        // Build the reflection service first: a descriptor-decode failure must
250        // surface before we spawn the leader-watch task below, so an error path
251        // never leaks a running task.
252        #[cfg(feature = "reflection")]
253        let reflection = build_reflection_service(tsoracle_proto::FILE_DESCRIPTOR_SET)?;
254
255        let server = Arc::new(self);
256
257        let watch_server = server.clone();
258        let watch_handle = tokio::spawn(async move {
259            use futures::FutureExt;
260            // catch_unwind so a panic in run_leader_watch still routes through
261            // the poisoning path. Without this, embedders who mount into_router
262            // directly and never observe the JoinHandle would see
263            // ServingState::Serving remain published while the watch task is
264            // dead — the inverse of the fail-safe guarantee documented above.
265            // The panic is re-raised after poisoning so serve / serve_with_*
266            // continue to translate it into ServerError::WatchPanic via
267            // join_to_server_result.
268            let outcome =
269                std::panic::AssertUnwindSafe(crate::fence::run_leader_watch(watch_server.clone()))
270                    .catch_unwind()
271                    .await;
272            match outcome {
273                Ok(result) => {
274                    if let Err(ref _e) = result {
275                        // Poison BEFORE returning so embedders who do not observe
276                        // the JoinHandle still get fail-safe behavior.
277                        watch_server.step_down_due_to_consensus_rejection(None, None);
278                        #[cfg(feature = "tracing")]
279                        tracing::error!(error = %_e, "leader-watch terminated; serving disabled");
280                    }
281                    result
282                }
283                Err(panic_payload) => {
284                    // Mirror the Err branch: poison BEFORE re-raising so
285                    // handle-dropping embedders still observe NotServing.
286                    watch_server.step_down_due_to_consensus_rejection(None, None);
287                    #[cfg(feature = "tracing")]
288                    tracing::error!("leader-watch panicked; serving disabled");
289                    std::panic::resume_unwind(panic_payload);
290                }
291            }
292        });
293
294        let service = TsoServiceImpl { server };
295        #[allow(unused_mut)]
296        let mut routes = Routes::new(TsoServiceServer::new(service));
297        #[cfg(feature = "reflection")]
298        {
299            routes = routes.add_service(reflection);
300        }
301        Ok((routes, watch_handle))
302    }
303
304    pub async fn serve(self, addr: SocketAddr) -> Result<(), ServerError> {
305        self.serve_with_shutdown(addr, futures::future::pending())
306            .await
307    }
308
309    /// Run the gRPC server until either the caller's `shutdown` fires or the
310    /// leader-watch task terminates.
311    ///
312    /// Three outcomes:
313    /// 1. `shutdown` fires first → tonic drains in-flights and returns Ok.
314    ///    The watch handle is aborted; any error it had been about to return
315    ///    is forfeited (the process is shutting down anyway).
316    /// 2. Watch returns `Ok(Err(e))` → poisoned state is already published;
317    ///    `cancel_tx` triggers tonic's graceful shutdown; in-flight `GetTs`
318    ///    calls whose `try_grant` already succeeded complete with the
319    ///    timestamps they were allocated; new calls fail fast. Returns `Err(e)`
320    ///    — the watch error wins even if the drain itself also errors (see
321    ///    `combine_watch_and_drain`); a drain error is surfaced only when the
322    ///    watch ended cleanly.
323    /// 3. Watch task panics → returns `Err(ServerError::WatchPanic{..})`
324    ///    with the panic payload stringified. Same drain semantics as (2).
325    pub async fn serve_with_shutdown(
326        self,
327        addr: SocketAddr,
328        shutdown: impl Future<Output = ()> + Send + 'static,
329    ) -> Result<(), ServerError> {
330        #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
331        let tls_config = self.tls_config.clone();
332
333        let (routes, mut watch_handle) = self.into_router()?;
334        let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
335
336        // tonic stops when EITHER the user's shutdown fires OR we cancel
337        // because the watch task terminated.
338        let combined_shutdown = async move {
339            tokio::select! {
340                _ = shutdown => {}
341                _ = cancel_rx => {}
342            }
343        };
344
345        let mut tonic = TonicServer::builder();
346        #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
347        if let Some(cfg) = tls_config {
348            tonic = tonic.tls_config(cfg).map_err(ServerError::Transport)?;
349        }
350        let serve = tonic
351            .add_routes(routes)
352            .serve_with_shutdown(addr, combined_shutdown);
353        tokio::pin!(serve);
354
355        tokio::select! {
356            // Bias toward the watch arm: if both are ready in the same poll
357            // (rare but possible — graceful shutdown completed in the same
358            // tick the watch returned), we want to surface the watch error
359            // rather than report a clean shutdown.
360            biased;
361
362            watch_result = &mut watch_handle => {
363                // Watch terminated. State is already poisoned (see watch
364                // task body in into_router). Trigger tonic drain, wait for
365                // it to finish, then report the watch's outcome — preferring
366                // it over any drain error, which surfaces only if the watch
367                // itself ended cleanly.
368                let _ = cancel_tx.send(());
369                let drain_result = serve.await;
370                combine_watch_and_drain(watch_result, drain_result)
371            }
372            serve_result = &mut serve => {
373                // User shutdown fired (or our cancel — but watch arm has
374                // `biased` priority, so reaching here means user shutdown).
375                // The watch task may still be running; aborting it loses
376                // any error it was about to report, but the process is
377                // shutting down so that's acceptable.
378                watch_handle.abort();
379                serve_result?;
380                Ok(())
381            }
382        }
383    }
384
385    /// Run the gRPC server on a caller-provided `TcpListener` until either
386    /// the caller-provided `shutdown` fires or the leader-watch task terminates.
387    ///
388    /// Use this instead of [`Self::serve_with_shutdown`] when you need to
389    /// observe the OS-picked port (`127.0.0.1:0`) before clients connect, or
390    /// when you want to wrap the listener in an outer adapter before passing it
391    /// in. The listening socket is owned by the caller and passed here; tsoracle
392    /// starts accepting on it immediately.
393    ///
394    /// Three outcomes:
395    /// 1. `shutdown` fires first → tonic drains in-flights and returns `Ok`.
396    ///    The watch handle is aborted; any error it had been about to return
397    ///    is forfeited (the process is shutting down anyway).
398    /// 2. Watch returns `Ok(Err(e))` → poisoned state is already published;
399    ///    the caller-provided shutdown is cancelled internally so tonic begins
400    ///    graceful shutdown; in-flight `GetTs` calls whose `try_grant` already
401    ///    succeeded complete with the timestamps they were allocated; new calls
402    ///    fail fast. Returns `Err(e)` — the watch error wins even if the drain
403    ///    itself also errors (see `combine_watch_and_drain`); a drain error is
404    ///    surfaced only when the watch ended cleanly.
405    /// 3. Watch task panics → returns `Err(ServerError::WatchPanic{..})`
406    ///    with the panic payload stringified. Same drain semantics as (2).
407    pub async fn serve_with_listener(
408        self,
409        listener: tokio::net::TcpListener,
410        shutdown: impl Future<Output = ()> + Send + 'static,
411    ) -> Result<(), ServerError> {
412        #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
413        let tls_config = self.tls_config.clone();
414
415        let (routes, mut watch_handle) = self.into_router()?;
416        let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
417
418        let combined_shutdown = async move {
419            tokio::select! {
420                _ = shutdown => {}
421                _ = cancel_rx => {}
422            }
423        };
424
425        let incoming = tonic::transport::server::TcpIncoming::from(listener);
426
427        let mut tonic = TonicServer::builder();
428        #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
429        if let Some(cfg) = tls_config {
430            tonic = tonic.tls_config(cfg).map_err(ServerError::Transport)?;
431        }
432        let serve = tonic
433            .add_routes(routes)
434            .serve_with_incoming_shutdown(incoming, combined_shutdown);
435        tokio::pin!(serve);
436
437        tokio::select! {
438            biased;
439
440            watch_result = &mut watch_handle => {
441                let _ = cancel_tx.send(());
442                let drain_result = serve.await;
443                combine_watch_and_drain(watch_result, drain_result)
444            }
445            serve_result = &mut serve => {
446                watch_handle.abort();
447                serve_result?;
448                Ok(())
449            }
450        }
451    }
452}
453
454/// Convert a `JoinHandle` result into a `ServerError`-typed result.
455///
456/// - `Ok(Ok(()))` — unreachable in production: `run_leader_watch` only
457///   returns from its loop via the `WatchStreamClosed` branch. Forwarded
458///   verbatim so the conversion remains total for test helpers that spawn
459///   no-op join futures.
460/// - `Ok(Err(e))` — task returned an error (including `WatchStreamClosed`
461///   from a clean EOF). Forward verbatim.
462/// - `Err(JoinError)` — task was cancelled or panicked. Cancellation maps to
463///   Ok (we asked for it); panic maps to `WatchPanic` with payload.
464fn join_to_server_result(
465    join_result: Result<Result<(), ServerError>, tokio::task::JoinError>,
466) -> Result<(), ServerError> {
467    match join_result {
468        Ok(inner) => inner,
469        Err(join_err) if join_err.is_panic() => {
470            let payload = panic_payload_to_string(join_err.into_panic());
471            Err(ServerError::WatchPanic {
472                payload,
473                bt: Bt::capture(),
474            })
475        }
476        Err(_cancelled) => Ok(()),
477    }
478}
479
480/// Combine the leader-watch outcome with the tonic graceful-drain outcome
481/// after the watch arm fired.
482///
483/// When the watch task terminates first we trigger the drain and then must
484/// report a single result. The watch error is the root cause — poisoned
485/// serving state was already published before the task returned — so it wins
486/// when both fail. A drain error (port stolen, resource exhaustion) is only
487/// surfaced when the watch outcome is otherwise `Ok`; previously it was
488/// discarded via `let _ = serve.await`, hiding a failed drain behind a clean
489/// shutdown report.
490///
491/// Generic over the drain error so the precedence logic is unit-testable
492/// without fabricating a `tonic::transport::Error` (which has no public
493/// constructor): production passes `Result<(), tonic::transport::Error>`,
494/// tests pass `Result<(), ServerError>` via the reflexive `From` impl.
495fn combine_watch_and_drain<E>(
496    watch_result: Result<Result<(), ServerError>, tokio::task::JoinError>,
497    drain_result: Result<(), E>,
498) -> Result<(), ServerError>
499where
500    ServerError: From<E>,
501{
502    match join_to_server_result(watch_result) {
503        Err(watch_err) => Err(watch_err),
504        Ok(()) => drain_result.map_err(ServerError::from),
505    }
506}
507
/// Construct the gRPC reflection service from an encoded protobuf descriptor
/// set.
///
/// Kept separate from [`Server::into_router`] so the decode-failure path can
/// be unit tested: production supplies
/// [`tsoracle_proto::FILE_DESCRIPTOR_SET`], and tests can pass deliberately
/// corrupt bytes to exercise the error mapping.
///
/// # Errors
///
/// A descriptor set that fails to decode yields
/// [`ServerError::ReflectionInit`] instead of a panic.
#[cfg(feature = "reflection")]
fn build_reflection_service(
    descriptor_set: &[u8],
) -> Result<
    tonic_reflection::server::v1::ServerReflectionServer<
        impl tonic_reflection::server::v1::ServerReflection,
    >,
    ServerError,
> {
    let builder = tonic_reflection::server::Builder::configure()
        .register_encoded_file_descriptor_set(descriptor_set);
    match builder.build_v1() {
        Ok(service) => Ok(service),
        Err(decode_err) => Err(ServerError::ReflectionInit(decode_err)),
    }
}
528
/// Render a panic payload as text.
///
/// `&'static str` and `String` payloads are returned verbatim; any other
/// payload type yields a fixed placeholder message.
fn panic_payload_to_string(panic: Box<dyn std::any::Any + Send>) -> String {
    let from_literal = panic
        .downcast_ref::<&'static str>()
        .map(|text| (*text).to_string());

    from_literal
        .or_else(|| panic.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "watch task panicked with non-string payload".to_string())
}
538
#[cfg(any(test, feature = "test-fakes"))]
impl Server {
    /// Runs the leader-watch loop directly, for tests only. Reachable from
    /// integration tests through the `test-fakes` feature and excluded from
    /// the stable public API.
    #[doc(hidden)]
    pub async fn run_leader_watch_for_tests(self: Arc<Self>) -> Result<(), ServerError> {
        let outcome = crate::fence::run_leader_watch(self).await;
        outcome
    }

    /// Requests a window grant straight from the in-memory allocator,
    /// bypassing the gRPC service; for tests only. Regression tests use it to
    /// observe the behavioral fence directly (no timestamp at or below the
    /// previous leader's high-water).
    #[doc(hidden)]
    pub fn try_grant_for_tests(&self, count: u32) -> Result<WindowGrant, CoreError> {
        // Kept as a single expression on purpose: the allocator lock is
        // acquired before the clock is read.
        self.allocator.lock().try_grant(self.clock.now_ms(), count)
    }
}
557
#[cfg(test)]
mod tests {
    use super::*;

    /// Shape of the payload accepted by `panic_payload_to_string`.
    type PanicPayload = Box<dyn std::any::Any + Send>;

    /// Join result of the leader-watch task, as seen by the helpers under test.
    type WatchJoin = Result<Result<(), ServerError>, tokio::task::JoinError>;

    /// Synthetic `WatchPanic` error carrying the given payload text.
    fn watch_panic(payload: &str) -> ServerError {
        ServerError::WatchPanic {
            payload: payload.into(),
            bt: Bt::capture(),
        }
    }

    /// Spawn a task that finishes immediately with `outcome` and await it.
    async fn joined(outcome: Result<(), ServerError>) -> WatchJoin {
        tokio::spawn(async move { outcome }).await
    }

    #[test]
    fn panic_payload_to_string_recovers_static_str() {
        // A `panic!("literal")` payload is a `&'static str`; operators should
        // see exactly what the watch task said.
        let payload: PanicPayload = Box::new("watch boom");
        assert_eq!(panic_payload_to_string(payload), "watch boom");
    }

    #[test]
    fn panic_payload_to_string_recovers_owned_string() {
        // A `panic!("{var}")` payload is a `String` formatted at panic time;
        // that downcast branch must work as well.
        let payload: PanicPayload = Box::new(String::from("formatted"));
        assert_eq!(panic_payload_to_string(payload), "formatted");
    }

    #[test]
    fn panic_payload_to_string_falls_back_for_other_types() {
        // Anything else (e.g. panic!(MyType { .. })) lands in the fallback.
        struct Custom;
        let payload: PanicPayload = Box::new(Custom);
        let rendered = panic_payload_to_string(payload);
        assert_eq!(rendered, "watch task panicked with non-string payload");
    }

    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
    #[test]
    fn builder_stores_tls_config() {
        // The serve_* paths take the TLS settings from `Server`, not from the
        // builder, so the field has to survive the builder → Server hand-off.
        use crate::test_fakes::InMemoryDriver;

        let server = Server::builder()
            .consensus_driver(Arc::new(InMemoryDriver::new()))
            .tls_config(tonic::transport::ServerTlsConfig::new())
            .build()
            .expect("build with tls_config must succeed");
        assert!(server.tls_config.is_some());
    }

    #[tokio::test]
    async fn join_to_server_result_passes_through_clean_outcome() {
        // Ok(Ok(())): the task finished cleanly and is forwarded as-is.
        let join = joined(Ok(())).await;
        assert!(matches!(join_to_server_result(join), Ok(())));
    }

    #[tokio::test]
    async fn join_to_server_result_forwards_inner_error() {
        // Ok(Err(e)): the task's own error is forwarded.
        let join = joined(Err(watch_panic("synthetic"))).await;
        match join_to_server_result(join) {
            Err(ServerError::WatchPanic { payload, .. }) => assert_eq!(payload, "synthetic"),
            other => panic!("expected forwarded WatchPanic, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn join_to_server_result_translates_panic_to_watch_panic() {
        // Err(JoinError) with is_panic(): reported as WatchPanic, with the
        // payload rendered by `panic_payload_to_string`.
        let join: WatchJoin = tokio::spawn(async {
            panic!("intentional");
            #[allow(unreachable_code)]
            Ok::<(), ServerError>(())
        })
        .await;
        match join_to_server_result(join) {
            Err(ServerError::WatchPanic { payload, .. }) => {
                assert!(payload.contains("intentional"))
            }
            other => panic!("expected WatchPanic, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn join_to_server_result_treats_cancellation_as_clean_exit() {
        // Err(JoinError) with is_cancelled(): the abort was requested by the
        // caller, so it maps to Ok.
        let handle = tokio::spawn(futures::future::pending::<Result<(), ServerError>>());
        handle.abort();
        let join = handle.await;
        assert!(matches!(join_to_server_result(join), Ok(())));
    }

    #[tokio::test]
    async fn combine_watch_and_drain_surfaces_drain_error_when_watch_ok() {
        // Clean watch, failed drain (port stolen, resource exhaustion): the
        // drain error has to come through.
        let watch = joined(Ok(())).await;
        let drain: Result<(), ServerError> = Err(ServerError::WatchStreamClosed);
        let combined = combine_watch_and_drain(watch, drain);
        assert!(matches!(combined, Err(ServerError::WatchStreamClosed)));
    }

    #[tokio::test]
    async fn combine_watch_and_drain_returns_ok_when_both_succeed() {
        // Clean watch and clean drain: the one fully-Ok combination.
        let watch = joined(Ok(())).await;
        let drain: Result<(), ServerError> = Ok(());
        assert!(matches!(combine_watch_and_drain(watch, drain), Ok(())));
    }

    #[tokio::test]
    async fn combine_watch_and_drain_prefers_watch_error_over_drain_error() {
        // Both sides failed. The watch error is the root cause (poisoned
        // state was already published) and wins; the drain error is dropped.
        let watch = joined(Err(watch_panic("watch"))).await;
        let drain: Result<(), ServerError> = Err(ServerError::WatchStreamClosed);
        match combine_watch_and_drain(watch, drain) {
            Err(ServerError::WatchPanic { payload, .. }) => assert_eq!(payload, "watch"),
            other => panic!("expected watch error to win, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn combine_watch_and_drain_returns_watch_error_when_drain_ok() {
        // Failed watch, clean drain: the watch error is forwarded unchanged.
        let watch = joined(Err(watch_panic("watch"))).await;
        let drain: Result<(), ServerError> = Ok(());
        match combine_watch_and_drain(watch, drain) {
            Err(ServerError::WatchPanic { payload, .. }) => assert_eq!(payload, "watch"),
            other => panic!("expected forwarded watch error, got {other:?}"),
        }
    }

    #[cfg(feature = "reflection")]
    #[test]
    fn build_reflection_service_accepts_embedded_descriptor_set() {
        // Production happy path: the descriptor set emitted by
        // `tsoracle-proto`'s build.rs must decode without error.
        let built = build_reflection_service(tsoracle_proto::FILE_DESCRIPTOR_SET);
        assert!(built.is_ok());
    }

    #[cfg(feature = "reflection")]
    #[test]
    fn build_reflection_service_maps_corrupt_descriptor_to_typed_error() {
        // Undecodable input (build-artifact drift) must produce a typed
        // `ServerError::ReflectionInit` rather than a panic. These bytes are
        // not a valid encoded `FileDescriptorSet`.
        let corrupt = b"\xff\xff\xff\xff not a descriptor set";
        // The Ok payload is a reflection service without a `Debug` impl, so
        // collapse it to `()` before matching on the error variant.
        match build_reflection_service(corrupt).map(|_| ()) {
            Err(ServerError::ReflectionInit(_)) => {}
            other => panic!("expected ReflectionInit error, got {other:?}"),
        }
    }
}