tsoracle_server/server.rs
1use core::time::Duration;
2use parking_lot::Mutex;
3use std::future::Future;
4use std::net::SocketAddr;
5use std::sync::Arc;
6use tokio::sync::watch;
7use tonic::service::Routes;
8use tonic::transport::Server as TonicServer;
9use tsoracle_consensus::ConsensusDriver;
10use tsoracle_core::{Allocator, Clock, SystemClock};
11#[cfg(any(test, feature = "test-fakes"))]
12use tsoracle_core::{CoreError, WindowGrant};
13use tsoracle_proto::v1::tso_service_server::TsoServiceServer;
14
15use crate::service::TsoServiceImpl;
16
17#[derive(Debug, thiserror::Error)]
18pub enum BuildError {
19 #[error("consensus_driver is required")]
20 MissingConsensusDriver,
21}
22
23#[derive(Debug, thiserror::Error)]
24pub enum ServerError {
25 #[error("transport: {0}")]
26 Transport(#[from] tonic::transport::Error),
27 #[error("consensus: {0}")]
28 Consensus(#[from] tsoracle_consensus::ConsensusError),
29 #[error("core: {0}")]
30 Core(#[from] tsoracle_core::CoreError),
31 /// The leader-watch task panicked. Distinct from a clean error return so
32 /// operators can tell "driver returned Err" (recoverable design) from
33 /// "task panicked" (programming bug).
34 #[error("leader-watch task panicked: {payload}")]
35 WatchPanic { payload: String },
36}
37
/// Serving status published on the server's `watch` channel.
///
/// A freshly built [`Server`] starts as
/// `NotServing { leader_endpoint: None }`.
#[derive(Clone, Debug)]
pub enum ServingState {
    /// This node must not answer timestamp requests.
    NotServing {
        /// Endpoint of the current leader, when one is known.
        /// NOTE(review): presumably a redirect hint for clients — confirm
        /// against the service implementation.
        leader_endpoint: Option<String>,
    },
    /// This node is serving requests.
    Serving,
}
43
44pub struct ServerBuilder {
45 consensus: Option<Arc<dyn ConsensusDriver>>,
46 clock: Option<Arc<dyn Clock>>,
47 window_ahead: Duration,
48 failover_advance: Duration,
49}
50
51impl Default for ServerBuilder {
52 fn default() -> Self {
53 ServerBuilder {
54 consensus: None,
55 clock: None,
56 window_ahead: Duration::from_secs(3),
57 failover_advance: Duration::from_secs(1),
58 }
59 }
60}
61
62impl ServerBuilder {
63 pub fn consensus_driver(mut self, b: Arc<dyn ConsensusDriver>) -> Self {
64 self.consensus = Some(b);
65 self
66 }
67 pub fn clock(mut self, c: Arc<dyn Clock>) -> Self {
68 self.clock = Some(c);
69 self
70 }
71 pub fn window_ahead(mut self, d: Duration) -> Self {
72 self.window_ahead = d;
73 self
74 }
75 pub fn failover_advance(mut self, d: Duration) -> Self {
76 self.failover_advance = d;
77 self
78 }
79 pub fn build(self) -> Result<Server, BuildError> {
80 let consensus = self.consensus.ok_or(BuildError::MissingConsensusDriver)?;
81 let clock = self.clock.unwrap_or_else(|| Arc::new(SystemClock));
82 let (state_tx, state_rx) = watch::channel(ServingState::NotServing {
83 leader_endpoint: None,
84 });
85 Ok(Server {
86 consensus,
87 clock,
88 window_ahead: self.window_ahead,
89 failover_advance: self.failover_advance,
90 allocator: Arc::new(Mutex::new(Allocator::new())),
91 state_tx,
92 state_rx,
93 extension_lock: Arc::new(tokio::sync::Mutex::new(())),
94 extension_gate: Arc::new(tokio::sync::RwLock::new(())),
95 })
96 }
97}
98
99pub struct Server {
100 pub(crate) consensus: Arc<dyn ConsensusDriver>,
101 pub(crate) clock: Arc<dyn Clock>,
102 pub(crate) window_ahead: Duration,
103 pub(crate) failover_advance: Duration,
104 pub(crate) allocator: Arc<Mutex<Allocator>>,
105 pub(crate) state_tx: watch::Sender<ServingState>,
106 pub state_rx: watch::Receiver<ServingState>,
107 /// Serializes window extensions so a stampeding burst of `WindowExhausted`
108 /// requests resolves to a single `persist_high_water` round-trip. Acquired
109 /// before `extension_gate`; combined with a recheck-after-acquire inside
110 /// `extend_window`, latecomers find the window already extended and
111 /// return without contacting consensus.
112 pub(crate) extension_lock: Arc<tokio::sync::Mutex<()>>,
113 /// Read-locked by window-extension calls for the duration of their
114 /// prepare → persist → commit dance. The leader-watch task takes a
115 /// write lock between clearing serving and calling load_high_water,
116 /// which drains all in-flight extensions started under the prior epoch
117 /// before the fence proceeds.
118 pub(crate) extension_gate: Arc<tokio::sync::RwLock<()>>,
119}
120
121impl Server {
122 pub fn builder() -> ServerBuilder {
123 ServerBuilder::default()
124 }
125
126 /// Single transition API used in response to evidence that the current
127 /// epoch is no longer valid: consensus rejection (NotLeader/Fenced
128 /// during persist), leader-watch task termination, or other authoritative
129 /// signals of leadership loss.
130 ///
131 /// Clears the allocator BEFORE publishing `NotServing` so a racing
132 /// `try_grant` either (a) observes `CoreError::NotLeader` rather than
133 /// handing out a stale-epoch grant, or (b) succeeds against a still-valid
134 /// epoch and the *next* call observes `NotServing`. Either ordering is
135 /// safe; what is not safe is the inverse (publish first, clear later)
136 /// because a request between those two steps would see `Serving` AND a
137 /// still-leader allocator.
138 ///
139 /// Does NOT take `extension_gate.write()`. That would deadlock against
140 /// in-flight extensions currently holding the read lock and awaiting
141 /// `persist_high_water`. Those in-flights will either complete cleanly
142 /// (the next `try_grant` then sees `NotServing`) or fail with
143 /// NotLeader/Fenced (and reach this helper themselves — it is idempotent).
144 pub(crate) fn step_down_due_to_consensus_rejection(&self, leader_endpoint: Option<String>) {
145 self.allocator.lock().on_leadership_lost();
146 let _ = self
147 .state_tx
148 .send(ServingState::NotServing { leader_endpoint });
149 }
150}
151
152impl Server {
153 /// Return the configured `TsoServiceServer<TsoServiceImpl>` as a tonic
154 /// `Routes` value plus a `JoinHandle` for the spawned leader-watch task,
155 /// so callers can mount tsoracle's service alongside their own services
156 /// on a shared tonic listener instead of binding a dedicated port.
157 ///
158 /// The `JoinHandle` payload is `Result<(), ServerError>` so embedders
159 /// can observe leader-watch termination. Before returning an error, the
160 /// task publishes `ServingState::NotServing { leader_endpoint: None }`
161 /// so all subsequent RPCs fail fast with `FAILED_PRECONDITION` — even
162 /// embedders who never inspect the handle get fail-safe behavior.
163 ///
164 /// The `Server::serve()` method is a thin wrapper over this — it calls
165 /// `into_router`, builds a tonic `Server`, and binds a listener.
166 pub fn into_router(self) -> (Routes, tokio::task::JoinHandle<Result<(), ServerError>>) {
167 let server = Arc::new(self);
168
169 let watch_server = server.clone();
170 let watch_handle = tokio::spawn(async move {
171 let result = crate::fence::run_leader_watch(watch_server.clone()).await;
172 if let Err(ref _e) = result {
173 // Poison BEFORE returning so embedders who do not observe
174 // the JoinHandle still get fail-safe behavior.
175 watch_server.step_down_due_to_consensus_rejection(None);
176 #[cfg(feature = "tracing")]
177 tracing::error!(error = %_e, "leader-watch terminated; serving disabled");
178 }
179 result
180 });
181
182 let service = TsoServiceImpl { server };
183 let routes = Routes::new(TsoServiceServer::new(service));
184 (routes, watch_handle)
185 }
186
187 pub async fn serve(self, addr: SocketAddr) -> Result<(), ServerError> {
188 self.serve_with_shutdown(addr, futures::future::pending())
189 .await
190 }
191
192 /// Run the gRPC server until either the caller's `shutdown` fires or the
193 /// leader-watch task terminates.
194 ///
195 /// Three outcomes:
196 /// 1. `shutdown` fires first → tonic drains in-flights and returns Ok.
197 /// The watch handle is aborted; any error it had been about to return
198 /// is forfeited (the process is shutting down anyway).
199 /// 2. Watch returns `Ok(Err(e))` → poisoned state is already published;
200 /// `cancel_tx` triggers tonic's graceful shutdown; in-flight `GetTs`
201 /// calls whose `try_grant` already succeeded complete with the
202 /// timestamps they were allocated; new calls fail fast. Returns `Err(e)`.
203 /// 3. Watch task panics → returns `Err(ServerError::WatchPanic{..})`
204 /// with the panic payload stringified. Same drain semantics as (2).
205 pub async fn serve_with_shutdown(
206 self,
207 addr: SocketAddr,
208 shutdown: impl Future<Output = ()> + Send + 'static,
209 ) -> Result<(), ServerError> {
210 let (routes, mut watch_handle) = self.into_router();
211 let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
212
213 // tonic stops when EITHER the user's shutdown fires OR we cancel
214 // because the watch task terminated.
215 let combined_shutdown = async move {
216 tokio::select! {
217 _ = shutdown => {}
218 _ = cancel_rx => {}
219 }
220 };
221
222 let serve = TonicServer::builder()
223 .add_routes(routes)
224 .serve_with_shutdown(addr, combined_shutdown);
225 tokio::pin!(serve);
226
227 tokio::select! {
228 // Bias toward the watch arm: if both are ready in the same poll
229 // (rare but possible — graceful shutdown completed in the same
230 // tick the watch returned), we want to surface the watch error
231 // rather than report a clean shutdown.
232 biased;
233
234 watch_result = &mut watch_handle => {
235 // Watch terminated. State is already poisoned (see watch
236 // task body in into_router). Trigger tonic drain and wait
237 // for it to finish, then report the watch's outcome.
238 let _ = cancel_tx.send(());
239 let _ = serve.await;
240 join_to_server_result(watch_result)
241 }
242 serve_result = &mut serve => {
243 // User shutdown fired (or our cancel — but watch arm has
244 // `biased` priority, so reaching here means user shutdown).
245 // The watch task may still be running; aborting it loses
246 // any error it was about to report, but the process is
247 // shutting down so that's acceptable.
248 watch_handle.abort();
249 serve_result?;
250 Ok(())
251 }
252 }
253 }
254}
255
256/// Convert a `JoinHandle` result into a `ServerError`-typed result.
257///
258/// - `Ok(Ok(()))` — task ended cleanly (driver stream closed). Caller decides
259/// whether this is normal (shutdown) or anomalous.
260/// - `Ok(Err(e))` — task returned an error. Forward verbatim.
261/// - `Err(JoinError)` — task was cancelled or panicked. Cancellation maps to
262/// Ok (we asked for it); panic maps to `WatchPanic` with payload.
263fn join_to_server_result(
264 r: Result<Result<(), ServerError>, tokio::task::JoinError>,
265) -> Result<(), ServerError> {
266 match r {
267 Ok(inner) => inner,
268 Err(join_err) if join_err.is_panic() => {
269 let payload = panic_payload_to_string(join_err.into_panic());
270 Err(ServerError::WatchPanic { payload })
271 }
272 Err(_cancelled) => Ok(()),
273 }
274}
275
/// Render a panic payload as text for `ServerError::WatchPanic`.
///
/// Handles the two string payload types, `String` and `&'static str`; any
/// other payload type yields a fixed placeholder message.
fn panic_payload_to_string(panic: Box<dyn std::any::Any + Send>) -> String {
    match panic.downcast::<String>() {
        // Owned string payload: take it out of the box as-is.
        Ok(owned) => *owned,
        Err(other) => match other.downcast_ref::<&'static str>() {
            Some(literal) => (*literal).to_owned(),
            None => "watch task panicked with non-string payload".to_owned(),
        },
    }
}
285
#[cfg(any(test, feature = "test-fakes"))]
impl Server {
    /// Drives the leader-watch loop directly. Test-only: reachable from
    /// integration tests through the `test-fakes` feature and not part of
    /// the stable public API.
    #[doc(hidden)]
    pub async fn run_leader_watch_for_tests(self: Arc<Self>) -> Result<(), ServerError> {
        let watch_loop = crate::fence::run_leader_watch(self);
        watch_loop.await
    }

    /// Test-only probe into the allocator. Requests a window grant of
    /// `count` against the current in-memory state, bypassing the gRPC
    /// service. Regression tests use it to observe the behavioral fence
    /// directly (no timestamp at or below the prior leader's high-water).
    #[doc(hidden)]
    pub fn try_grant_for_tests(&self, count: u32) -> Result<WindowGrant, CoreError> {
        // Take the lock first and read the clock while holding it.
        let mut allocator = self.allocator.lock();
        let now_ms = self.clock.now_ms();
        allocator.try_grant(now_ms, count)
    }
}