Skip to main content

tsoracle_server/
server.rs

1use core::time::Duration;
2use parking_lot::Mutex;
3use std::future::Future;
4use std::net::SocketAddr;
5use std::sync::Arc;
6use tokio::sync::watch;
7use tonic::service::Routes;
8use tonic::transport::Server as TonicServer;
9use tsoracle_consensus::ConsensusDriver;
10use tsoracle_core::{Allocator, Clock, SystemClock};
11#[cfg(any(test, feature = "test-fakes"))]
12use tsoracle_core::{CoreError, WindowGrant};
13use tsoracle_proto::v1::tso_service_server::TsoServiceServer;
14
15use crate::service::TsoServiceImpl;
16
17#[derive(Debug, thiserror::Error)]
18pub enum BuildError {
19    #[error("consensus_driver is required")]
20    MissingConsensusDriver,
21}
22
23#[derive(Debug, thiserror::Error)]
24pub enum ServerError {
25    #[error("transport: {0}")]
26    Transport(#[from] tonic::transport::Error),
27    #[error("consensus: {0}")]
28    Consensus(#[from] tsoracle_consensus::ConsensusError),
29    #[error("core: {0}")]
30    Core(#[from] tsoracle_core::CoreError),
31    /// The leader-watch task panicked. Distinct from a clean error return so
32    /// operators can tell "driver returned Err" (recoverable design) from
33    /// "task panicked" (programming bug).
34    #[error("leader-watch task panicked: {payload}")]
35    WatchPanic { payload: String },
36}
37
38#[derive(Clone, Debug)]
39pub enum ServingState {
40    NotServing { leader_endpoint: Option<String> },
41    Serving,
42}
43
44pub struct ServerBuilder {
45    consensus: Option<Arc<dyn ConsensusDriver>>,
46    clock: Option<Arc<dyn Clock>>,
47    window_ahead: Duration,
48    failover_advance: Duration,
49}
50
51impl Default for ServerBuilder {
52    fn default() -> Self {
53        ServerBuilder {
54            consensus: None,
55            clock: None,
56            window_ahead: Duration::from_secs(3),
57            failover_advance: Duration::from_secs(1),
58        }
59    }
60}
61
62impl ServerBuilder {
63    pub fn consensus_driver(mut self, b: Arc<dyn ConsensusDriver>) -> Self {
64        self.consensus = Some(b);
65        self
66    }
67    pub fn clock(mut self, c: Arc<dyn Clock>) -> Self {
68        self.clock = Some(c);
69        self
70    }
71    pub fn window_ahead(mut self, d: Duration) -> Self {
72        self.window_ahead = d;
73        self
74    }
75    pub fn failover_advance(mut self, d: Duration) -> Self {
76        self.failover_advance = d;
77        self
78    }
79    pub fn build(self) -> Result<Server, BuildError> {
80        let consensus = self.consensus.ok_or(BuildError::MissingConsensusDriver)?;
81        let clock = self.clock.unwrap_or_else(|| Arc::new(SystemClock));
82        let (state_tx, state_rx) = watch::channel(ServingState::NotServing {
83            leader_endpoint: None,
84        });
85        Ok(Server {
86            consensus,
87            clock,
88            window_ahead: self.window_ahead,
89            failover_advance: self.failover_advance,
90            allocator: Arc::new(Mutex::new(Allocator::new())),
91            state_tx,
92            state_rx,
93            extension_lock: Arc::new(tokio::sync::Mutex::new(())),
94            extension_gate: Arc::new(tokio::sync::RwLock::new(())),
95        })
96    }
97}
98
99pub struct Server {
100    pub(crate) consensus: Arc<dyn ConsensusDriver>,
101    pub(crate) clock: Arc<dyn Clock>,
102    pub(crate) window_ahead: Duration,
103    pub(crate) failover_advance: Duration,
104    pub(crate) allocator: Arc<Mutex<Allocator>>,
105    pub(crate) state_tx: watch::Sender<ServingState>,
106    pub state_rx: watch::Receiver<ServingState>,
107    /// Serializes window extensions so a stampeding burst of `WindowExhausted`
108    /// requests resolves to a single `persist_high_water` round-trip. Acquired
109    /// before `extension_gate`; combined with a recheck-after-acquire inside
110    /// `extend_window`, latecomers find the window already extended and
111    /// return without contacting consensus.
112    pub(crate) extension_lock: Arc<tokio::sync::Mutex<()>>,
113    /// Read-locked by window-extension calls for the duration of their
114    /// prepare → persist → commit dance. The leader-watch task takes a
115    /// write lock between clearing serving and calling load_high_water,
116    /// which drains all in-flight extensions started under the prior epoch
117    /// before the fence proceeds.
118    pub(crate) extension_gate: Arc<tokio::sync::RwLock<()>>,
119}
120
121impl Server {
122    pub fn builder() -> ServerBuilder {
123        ServerBuilder::default()
124    }
125
126    /// Single transition API used in response to evidence that the current
127    /// epoch is no longer valid: consensus rejection (NotLeader/Fenced
128    /// during persist), leader-watch task termination, or other authoritative
129    /// signals of leadership loss.
130    ///
131    /// Clears the allocator BEFORE publishing `NotServing` so a racing
132    /// `try_grant` either (a) observes `CoreError::NotLeader` rather than
133    /// handing out a stale-epoch grant, or (b) succeeds against a still-valid
134    /// epoch and the *next* call observes `NotServing`. Either ordering is
135    /// safe; what is not safe is the inverse (publish first, clear later)
136    /// because a request between those two steps would see `Serving` AND a
137    /// still-leader allocator.
138    ///
139    /// Does NOT take `extension_gate.write()`. That would deadlock against
140    /// in-flight extensions currently holding the read lock and awaiting
141    /// `persist_high_water`. Those in-flights will either complete cleanly
142    /// (the next `try_grant` then sees `NotServing`) or fail with
143    /// NotLeader/Fenced (and reach this helper themselves — it is idempotent).
144    pub(crate) fn step_down_due_to_consensus_rejection(&self, leader_endpoint: Option<String>) {
145        self.allocator.lock().on_leadership_lost();
146        let _ = self
147            .state_tx
148            .send(ServingState::NotServing { leader_endpoint });
149    }
150}
151
152impl Server {
153    /// Return the configured `TsoServiceServer<TsoServiceImpl>` as a tonic
154    /// `Routes` value plus a `JoinHandle` for the spawned leader-watch task,
155    /// so callers can mount tsoracle's service alongside their own services
156    /// on a shared tonic listener instead of binding a dedicated port.
157    ///
158    /// The `JoinHandle` payload is `Result<(), ServerError>` so embedders
159    /// can observe leader-watch termination. Before returning an error, the
160    /// task publishes `ServingState::NotServing { leader_endpoint: None }`
161    /// so all subsequent RPCs fail fast with `FAILED_PRECONDITION` — even
162    /// embedders who never inspect the handle get fail-safe behavior.
163    ///
164    /// The `Server::serve()` method is a thin wrapper over this — it calls
165    /// `into_router`, builds a tonic `Server`, and binds a listener.
166    pub fn into_router(self) -> (Routes, tokio::task::JoinHandle<Result<(), ServerError>>) {
167        let server = Arc::new(self);
168
169        let watch_server = server.clone();
170        let watch_handle = tokio::spawn(async move {
171            let result = crate::fence::run_leader_watch(watch_server.clone()).await;
172            if let Err(ref _e) = result {
173                // Poison BEFORE returning so embedders who do not observe
174                // the JoinHandle still get fail-safe behavior.
175                watch_server.step_down_due_to_consensus_rejection(None);
176                #[cfg(feature = "tracing")]
177                tracing::error!(error = %_e, "leader-watch terminated; serving disabled");
178            }
179            result
180        });
181
182        let service = TsoServiceImpl { server };
183        let routes = Routes::new(TsoServiceServer::new(service));
184        (routes, watch_handle)
185    }
186
187    pub async fn serve(self, addr: SocketAddr) -> Result<(), ServerError> {
188        self.serve_with_shutdown(addr, futures::future::pending())
189            .await
190    }
191
192    /// Run the gRPC server until either the caller's `shutdown` fires or the
193    /// leader-watch task terminates.
194    ///
195    /// Three outcomes:
196    /// 1. `shutdown` fires first → tonic drains in-flights and returns Ok.
197    ///    The watch handle is aborted; any error it had been about to return
198    ///    is forfeited (the process is shutting down anyway).
199    /// 2. Watch returns `Ok(Err(e))` → poisoned state is already published;
200    ///    `cancel_tx` triggers tonic's graceful shutdown; in-flight `GetTs`
201    ///    calls whose `try_grant` already succeeded complete with the
202    ///    timestamps they were allocated; new calls fail fast. Returns `Err(e)`.
203    /// 3. Watch task panics → returns `Err(ServerError::WatchPanic{..})`
204    ///    with the panic payload stringified. Same drain semantics as (2).
205    pub async fn serve_with_shutdown(
206        self,
207        addr: SocketAddr,
208        shutdown: impl Future<Output = ()> + Send + 'static,
209    ) -> Result<(), ServerError> {
210        let (routes, mut watch_handle) = self.into_router();
211        let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
212
213        // tonic stops when EITHER the user's shutdown fires OR we cancel
214        // because the watch task terminated.
215        let combined_shutdown = async move {
216            tokio::select! {
217                _ = shutdown => {}
218                _ = cancel_rx => {}
219            }
220        };
221
222        let serve = TonicServer::builder()
223            .add_routes(routes)
224            .serve_with_shutdown(addr, combined_shutdown);
225        tokio::pin!(serve);
226
227        tokio::select! {
228            // Bias toward the watch arm: if both are ready in the same poll
229            // (rare but possible — graceful shutdown completed in the same
230            // tick the watch returned), we want to surface the watch error
231            // rather than report a clean shutdown.
232            biased;
233
234            watch_result = &mut watch_handle => {
235                // Watch terminated. State is already poisoned (see watch
236                // task body in into_router). Trigger tonic drain and wait
237                // for it to finish, then report the watch's outcome.
238                let _ = cancel_tx.send(());
239                let _ = serve.await;
240                join_to_server_result(watch_result)
241            }
242            serve_result = &mut serve => {
243                // User shutdown fired (or our cancel — but watch arm has
244                // `biased` priority, so reaching here means user shutdown).
245                // The watch task may still be running; aborting it loses
246                // any error it was about to report, but the process is
247                // shutting down so that's acceptable.
248                watch_handle.abort();
249                serve_result?;
250                Ok(())
251            }
252        }
253    }
254}
255
256/// Convert a `JoinHandle` result into a `ServerError`-typed result.
257///
258/// - `Ok(Ok(()))` — task ended cleanly (driver stream closed). Caller decides
259///   whether this is normal (shutdown) or anomalous.
260/// - `Ok(Err(e))` — task returned an error. Forward verbatim.
261/// - `Err(JoinError)` — task was cancelled or panicked. Cancellation maps to
262///   Ok (we asked for it); panic maps to `WatchPanic` with payload.
263fn join_to_server_result(
264    r: Result<Result<(), ServerError>, tokio::task::JoinError>,
265) -> Result<(), ServerError> {
266    match r {
267        Ok(inner) => inner,
268        Err(join_err) if join_err.is_panic() => {
269            let payload = panic_payload_to_string(join_err.into_panic());
270            Err(ServerError::WatchPanic { payload })
271        }
272        Err(_cancelled) => Ok(()),
273    }
274}
275
276fn panic_payload_to_string(panic: Box<dyn std::any::Any + Send>) -> String {
277    if let Some(s) = panic.downcast_ref::<&'static str>() {
278        (*s).to_string()
279    } else if let Some(s) = panic.downcast_ref::<String>() {
280        s.clone()
281    } else {
282        "watch task panicked with non-string payload".to_string()
283    }
284}
285
286#[cfg(any(test, feature = "test-fakes"))]
287impl Server {
288    /// Test-only entry point for the leader-watch loop. Exposed to integration
289    /// tests via the `test-fakes` feature; not part of the stable public API.
290    #[doc(hidden)]
291    pub async fn run_leader_watch_for_tests(self: Arc<Self>) -> Result<(), ServerError> {
292        crate::fence::run_leader_watch(self).await
293    }
294
295    /// Test-only allocator probe. Issues a window grant against the current
296    /// in-memory state without going through the gRPC service. Used by
297    /// regression tests that need to observe the behavioral fence (no
298    /// timestamp at or below the prior leader's high-water) directly.
299    #[doc(hidden)]
300    pub fn try_grant_for_tests(&self, count: u32) -> Result<WindowGrant, CoreError> {
301        self.allocator.lock().try_grant(self.clock.now_ms(), count)
302    }
303}