1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
//
// ░▀█▀░█▀▀░█▀█░█▀▄░█▀█░█▀▀░█░░░█▀▀
// ░░█░░▀▀█░█░█░█▀▄░█▀█░█░░░█░░░█▀▀
// ░░▀░░▀▀▀░▀▀▀░▀░▀░▀░▀░▀▀▀░▀▀▀░▀▀▀
//
// tsoracle — Distributed Timestamp Oracle
//
// Copyright (c) 2026 Prisma Risk
// Licensed under the Apache License, Version 2.0
// https://github.com/prisma-risk/tsoracle
//
// #[PerformanceCriticalPath]
use std::sync::Arc;
use tonic::{Request, Response, Status};
use tsoracle_consensus::ConsensusError;
use tsoracle_core::CoreError;
use tsoracle_proto::v1::{GetTsRequest, GetTsResponse, LeaderHint, tso_service_server::TsoService};
use crate::leader_hint::not_leader_status;
use crate::server::{Server, ServingState};
/// Build a [`LeaderHint`] from whatever `state_rx` currently holds.
///
/// When this node is `NotServing`, the advertised leader endpoint (if any) is
/// copied into the hint. When it is `Serving`, no endpoint is reported.
/// `leader_epoch` is always left unset. Service-layer code paths that must
/// answer with a `FAILED_PRECONDITION` "not leader" status use this so their
/// hint has the same shape as the one the fast NOT_LEADER gate in `get_ts`
/// emits.
fn leader_hint_from(server: &Server) -> LeaderHint {
    // Borrow the current state and clone only the endpoint, not the whole
    // `ServingState`.
    let leader_endpoint = match &*server.state_rx.borrow() {
        ServingState::NotServing { leader_endpoint } => leader_endpoint.clone(),
        ServingState::Serving => None,
    };
    LeaderHint {
        leader_endpoint,
        leader_epoch: None,
    }
}
fn core_status(error: CoreError) -> Status {
match error {
CoreError::NotLeader => Status::failed_precondition("not leader"),
CoreError::WindowExhausted => Status::internal("window exhausted"),
CoreError::InvalidCount(count) => {
Status::invalid_argument(format!("invalid count: {count}"))
}
CoreError::PhysicalMsOutOfRange(physical_ms) => Status::out_of_range(format!(
"physical_ms {physical_ms} exceeds 46-bit timestamp field"
)),
CoreError::InvalidLeadershipWindow {
fence_floor,
committed_ceiling,
} => Status::internal(format!(
"invalid leadership window: fence_floor {fence_floor} exceeds committed_ceiling {committed_ceiling}"
)),
}
}
/// gRPC `TsoService` implementation that serves timestamp requests from a
/// shared [`Server`].
pub struct TsoServiceImpl {
    /// Shared server handle. `get_ts` and `extend_window` read from it:
    /// serving-state watch, allocator, clock, consensus driver,
    /// `window_ahead`, and the extension lock/gate.
    pub(crate) server: Arc<Server>,
}
#[tonic::async_trait]
impl TsoService for TsoServiceImpl {
    /// Grant a batch of `count` timestamps from the allocator.
    ///
    /// On success the response carries the grant's `physical_ms`,
    /// `logical_start`, `count`, and `epoch`.
    ///
    /// # Errors
    ///
    /// - `INVALID_ARGUMENT` when `count == 0`.
    /// - A leader redirect built by `not_leader_status` when this node is
    ///   `NotServing`, or when the allocator reports `NotLeader`.
    /// - If the window is exhausted, the window is extended once via
    ///   `extend_window` (its errors are propagated). A second exhaustion is
    ///   reported as `INTERNAL`.
    /// - Any other allocator error is mapped through `core_status`, which is
    ///   the single mapping table for `CoreError`.
    async fn get_ts(&self, req: Request<GetTsRequest>) -> Result<Response<GetTsResponse>, Status> {
        crate::failpoint!("server::service::before_allocate");
        let count = req.into_inner().count;
        if count == 0 {
            return Err(Status::invalid_argument("count must be >= 1"));
        }
        // Fast NOT_LEADER gate. Snapshot into a local first so the watch read
        // guard is released at the end of this statement, not held while the
        // error status is built.
        let state = self.server.state_rx.borrow().clone();
        if let ServingState::NotServing { leader_endpoint } = state {
            return Err(not_leader_status(LeaderHint {
                leader_endpoint,
                leader_epoch: None,
            }));
        }
        // At most two grant attempts. The first may hit WindowExhausted, in
        // which case we extend and retry once. A second WindowExhausted is a
        // driver bug.
        let mut extended = false;
        loop {
            let outcome = {
                let mut allocator = self.server.allocator.lock();
                allocator.try_grant(self.server.clock.now_ms(), count)
            };
            match outcome {
                Ok(grant) => {
                    #[cfg(feature = "metrics")]
                    {
                        metrics::counter!("tsoracle.get_ts.total").increment(1);
                        metrics::counter!("tsoracle.get_ts.timestamps_issued")
                            .increment(u64::from(grant.count));
                    }
                    return Ok(Response::new(GetTsResponse {
                        physical_ms: grant.physical_ms,
                        logical_start: grant.logical_start,
                        count: grant.count,
                        epoch: grant.epoch.0,
                    }));
                }
                // Redirect with the best-known leader hint, not the bare
                // FAILED_PRECONDITION that core_status would produce.
                Err(CoreError::NotLeader) => {
                    return Err(not_leader_status(leader_hint_from(&self.server)));
                }
                Err(CoreError::WindowExhausted) if !extended => {
                    self.extend_window(count).await?;
                    extended = true;
                }
                Err(CoreError::WindowExhausted) => {
                    return Err(Status::internal("window still exhausted after extension"));
                }
                // Every remaining variant uses the shared mapping table so the
                // codes and messages cannot drift from core_status.
                Err(other) => return Err(core_status(other)),
            }
        }
    }
}
impl TsoServiceImpl {
/// Extend the window with single-flight coalescing.
///
/// `extension_lock` (a `tokio::sync::Mutex`) is acquired first so only one
/// caller in any concurrent burst proceeds into the prepare/persist/commit
/// sequence. After acquiring, the caller rechecks whether the window has
/// already been extended enough to satisfy its own `count` — if yes, it
/// returns without contacting consensus. `count` is the caller's own
/// request count, used so the recheck mirrors the outer loop's next
/// `try_grant` exactly (a coarser check could skip an extension that the
/// outer retry still actually needs).
async fn extend_window(&self, count: u32) -> Result<(), Status> {
// Single-flight gate: serialize peer extenders so consensus is hit
// once per stampede, not once per stampeder.
let _extension_lock = self.server.extension_lock.lock().await;
// Recheck-after-acquire: a peer extender may have run prepare →
// persist → commit while we waited for the lock. If the outer
// try_grant retry would now succeed, skip the consensus round-trip.
// Reading now_ms fresh inside the lock keeps the predicate aligned
// with what the outer loop's next try_grant will observe.
if self
.server
.allocator
.lock()
.would_grant(self.server.clock.now_ms(), count)
{
return Ok(());
}
// Drain barrier: leader-watch's write() waits behind this read until
// our commit applies (or is silently dropped by the epoch check).
let _gate = self.server.extension_gate.read().await;
crate::failpoint!("server::service::extension_gate_held");
let (requested, epoch) = {
let allocator = self.server.allocator.lock();
let Some(epoch) = allocator.epoch() else {
// Lost leadership between the outer fast-gate check and here.
// Surface as a leader redirect (with the hint state_rx knows
// about), not a bare FAILED_PRECONDITION without metadata.
return Err(not_leader_status(leader_hint_from(&self.server)));
};
let now = self.server.clock.now_ms();
let target = allocator
.try_prepare_window_extension(now, self.server.window_ahead.as_millis() as u64)
.map_err(core_status)?;
(target, epoch)
};
// Count and time only the consensus round-trip itself: the
// recheck-after-acquire short-circuit above skips it, and operators
// tuning `window_ahead` care about how often a stampede actually
// reached persist + how long that took (success or failure).
#[cfg(feature = "metrics")]
let extension_started_at = std::time::Instant::now();
let persist_outcome = self
.server
.consensus
.persist_high_water(requested, epoch)
.await;
#[cfg(feature = "metrics")]
{
metrics::counter!("tsoracle.window.extensions.total").increment(1);
metrics::histogram!("tsoracle.window.extension_latency")
.record(extension_started_at.elapsed().as_secs_f64());
}
let actual = match persist_outcome {
Ok(v) => v,
// NotLeader / Fenced are authoritative proof from the consensus
// driver that this node's epoch is stale. Step down immediately
// — letting subsequent try_grant calls keep serving from a
// fenced epoch, even briefly, is the wrong tradeoff for a TSO.
// The step_down helper clears the allocator and publishes
// NotServing under the single transition API; the hint we pass
// back is whatever state_rx most recently knew about.
Err(ConsensusError::NotLeader { .. }) | Err(ConsensusError::Fenced { .. }) => {
self.server.step_down_due_to_consensus_rejection(None);
return Err(not_leader_status(leader_hint_from(&self.server)));
}
// Transient driver failure: storage hiccup, peer transport flap,
// quorum momentarily lost. Tell the client it MAY retry.
Err(ConsensusError::TransientDriver(e)) => {
return Err(Status::unavailable(format!("persist: {e}")));
}
// Permanent driver failure: read-only filesystem, corruption,
// gone storage device, invariant violation. Surface honestly so
// clients do not silently retry into a tarpit.
Err(ConsensusError::PermanentDriver(e)) => {
return Err(Status::internal(format!("persist: {e}")));
}
};
self.server
.allocator
.lock()
.try_commit_window_extension(actual, epoch)
.map_err(core_status)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn core_status_maps_each_variant_to_documented_code() {
        // Pin both the gRPC code and the message for every CoreError variant.
        // Codes alone are not unique: WindowExhausted and
        // InvalidLeadershipWindow both map to INTERNAL. The message checks
        // therefore keep the variants distinguishable, and a dropped or
        // rewired branch fails here.
        let not_leader = core_status(CoreError::NotLeader);
        assert_eq!(not_leader.code(), tonic::Code::FailedPrecondition);
        assert_eq!(not_leader.message(), "not leader");

        let exhausted = core_status(CoreError::WindowExhausted);
        assert_eq!(exhausted.code(), tonic::Code::Internal);
        assert_eq!(exhausted.message(), "window exhausted");

        let invalid = core_status(CoreError::InvalidCount(7));
        assert_eq!(invalid.code(), tonic::Code::InvalidArgument);
        assert_eq!(invalid.message(), "invalid count: 7");

        let oor = core_status(CoreError::PhysicalMsOutOfRange(1 << 47));
        assert_eq!(oor.code(), tonic::Code::OutOfRange);
        assert_eq!(
            oor.message(),
            format!("physical_ms {} exceeds 46-bit timestamp field", 1u64 << 47)
        );

        let invalid_window = core_status(CoreError::InvalidLeadershipWindow {
            fence_floor: 9,
            committed_ceiling: 4,
        });
        assert_eq!(invalid_window.code(), tonic::Code::Internal);
        assert_eq!(
            invalid_window.message(),
            "invalid leadership window: fence_floor 9 exceeds committed_ceiling 4"
        );
    }

    #[test]
    fn leader_hint_from_returns_endpoint_when_not_serving() {
        // Build a Server with the in-memory fake driver so we can mutate
        // ServingState directly. The helper only snapshots state_rx.
        let server = Server::builder()
            .consensus_driver(std::sync::Arc::new(crate::test_fakes::InMemoryDriver::new()))
            .clock(std::sync::Arc::new(tsoracle_core::SystemClock))
            .build()
            .unwrap();
        let _ = server.state_tx.send(ServingState::NotServing {
            leader_endpoint: Some("http://other-node:9000".into()),
        });
        let hint = leader_hint_from(&server);
        assert_eq!(
            hint.leader_endpoint.as_deref(),
            Some("http://other-node:9000")
        );
        assert!(hint.leader_epoch.is_none());
        // The Serving branch flips the endpoint to None. This exercises the
        // race-window path that is otherwise only reachable when an extension
        // observes a mid-transition state_rx.
        let _ = server.state_tx.send(ServingState::Serving);
        let hint = leader_hint_from(&server);
        assert!(hint.leader_endpoint.is_none());
        assert!(hint.leader_epoch.is_none());
    }
}