Skip to main content

tsoracle_client/
lib.rs

1//
2//  ░▀█▀░█▀▀░█▀█░█▀▄░█▀█░█▀▀░█░░░█▀▀
3//  ░░█░░▀▀█░█░█░█▀▄░█▀█░█░░░█░░░█▀▀
4//  ░░▀░░▀▀▀░▀▀▀░▀░▀░▀░▀░▀▀▀░▀▀▀░▀▀▀
5//
6//  tsoracle — Distributed Timestamp Oracle
7//  https://www.tsoracle.rs
8//
9//  Copyright (c) 2026 Prisma Risk
10//
11//  Licensed under the Apache License, Version 2.0 (the "License");
12//  you may not use this file except in compliance with the License.
13//  You may obtain a copy of the License at
14//
15//      https://www.apache.org/licenses/LICENSE-2.0
16//
17//  Unless required by applicable law or agreed to in writing, software
18//  distributed under the License is distributed on an "AS IS" BASIS,
19//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20//  See the License for the specific language governing permissions and
21//  limitations under the License.
22//
23
24#![doc = include_str!("../README.md")]
25// Panic policy (see CONTRIBUTING.md). `cfg_attr(not(test), ...)` skips the lint
26// for the lib's own unit tests; integration tests are separate compilation units.
27#![cfg_attr(not(test), warn(clippy::unwrap_used, clippy::expect_used))]
28
29mod attempt;
30mod budget;
31mod channel_pool;
32mod driver;
33mod driver_supervisor;
34mod error;
35mod leader_hint;
36mod response;
37mod retry;
38mod retry_policy;
39mod transport;
40mod worklist;
41
42#[cfg(test)]
43mod test_support;
44
45pub use error::ClientError;
46pub use retry_policy::RetryPolicy;
47pub use transport::BoxError;
48
49use std::sync::Arc;
50use std::time::Duration;
51use tsoracle_core::{Epoch, LOGICAL_MAX, Timestamp};
52
/// The server's per-call cap on requested timestamps, fixed by the 18-bit
/// logical width. Callers asking for more than this can't be served by any
/// single RPC; the client rejects them up-front rather than burning a queue
/// slot and round-trip to learn the same thing from the server.
///
/// Enforced in `Client::get_ts_batch`, which answers `ClientError::InvalidCount`
/// for a count above this value (and for a count of zero).
// NOTE(review): presumably `LOGICAL_MAX` is the largest logical counter value,
// so `+ 1` is the number of distinct values — confirm against `tsoracle_core`.
pub(crate) const MAX_TIMESTAMPS_PER_RPC: u32 = LOGICAL_MAX + 1;
58
59use crate::channel_pool::ChannelPool;
60
61pub struct ClientBuilder {
62    endpoints: Vec<String>,
63    flush_interval: Duration,
64    connector: Option<Arc<crate::transport::ChannelConnector>>,
65    tls_required: bool,
66    retry_policy: RetryPolicy,
67}
68
69impl ClientBuilder {
70    pub fn endpoints(endpoints: Vec<String>) -> Self {
71        ClientBuilder {
72            endpoints,
73            flush_interval: Duration::from_millis(1),
74            connector: None,
75            tls_required: false,
76            retry_policy: RetryPolicy::default(),
77        }
78    }
79
80    pub fn batch_flush_interval(mut self, flush_interval: Duration) -> Self {
81        self.flush_interval = flush_interval;
82        self
83    }
84
85    /// Override the default [`RetryPolicy`].
86    ///
87    /// The policy controls per-attempt deadlines, the overall deadline
88    /// across all candidate endpoints, the cap on attempts, and the
89    /// jittered backoff base. The per-attempt deadline is also pushed
90    /// down to `tonic::transport::Endpoint::connect_timeout` and
91    /// `Endpoint::timeout` for the built-in default and TLS transport
92    /// paths so a blackholed peer fails fast at the transport layer.
93    /// User-supplied [`Self::channel_connector`] closures own their
94    /// own Endpoint config; the policy still bounds the retry loop's
95    /// outer `tokio::time::timeout` around them.
96    pub fn retry_policy(mut self, policy: RetryPolicy) -> Self {
97        self.retry_policy = policy;
98        self
99    }
100
101    /// Configure the client to dial bare endpoints with TLS. Bare `host:port`
102    /// becomes `https://host:port`; explicit `http://...` endpoints supplied
103    /// in [`Self::endpoints`] remain plaintext; explicit `https://...`
104    /// endpoints use the provided TLS config.
105    ///
106    /// Wire-supplied `http://...` leader-hint trailers are NOT honored under
107    /// `tls_config` — they are dropped to prevent a contacted peer from
108    /// downgrading the transport. Operator-supplied configuration still wins;
109    /// untrusted wire input does not.
110    ///
111    /// Setting both [`Self::channel_connector`] and `tls_config` is allowed;
112    /// the last call wins (standard builder semantics). Calling
113    /// `channel_connector` after `tls_config` also clears the
114    /// reject-plaintext-hint policy, since the caller-owned connector owns
115    /// its own scheme policy.
116    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
117    pub fn tls_config(mut self, cfg: tonic::transport::ClientTlsConfig) -> Self {
118        self.connector = Some(crate::transport::tls_connector(
119            cfg,
120            self.retry_policy.clone(),
121        ));
122        self.tls_required = true;
123        self
124    }
125
126    /// Replace the default plaintext channel construction with a caller-owned
127    /// closure. The closure is invoked on first use of each endpoint —
128    /// configured endpoints and leader-hint redirects alike. Errors returned
129    /// from the closure surface as [`ClientError::Connector`].
130    ///
131    /// See module docs for the interaction with [`Self::tls_config`]
132    /// (last-wins) and the scheme matrix. A caller-owned connector replaces
133    /// the built-in TLS plumbing entirely, including the
134    /// reject-plaintext-leader-hint policy — the closure is responsible for
135    /// whatever scheme policy it wants to enforce.
136    pub fn channel_connector<F, Fut>(mut self, connector: F) -> Self
137    where
138        F: Fn(&str) -> Fut + Send + Sync + 'static,
139        Fut: std::future::Future<Output = Result<tonic::transport::Channel, crate::BoxError>>
140            + Send
141            + 'static,
142    {
143        let wrapped: Arc<crate::transport::ChannelConnector> = Arc::new(move |endpoint: &str| {
144            let fut = connector(endpoint);
145            Box::pin(async move { fut.await.map_err(ClientError::Connector) })
146        });
147        self.connector = Some(wrapped);
148        self.tls_required = false;
149        self
150    }
151
152    pub async fn build(self) -> Result<Client, ClientError> {
153        if self.endpoints.is_empty() {
154            return Err(ClientError::NoReachableEndpoints);
155        }
156        let pool = Arc::new(ChannelPool::new(
157            self.endpoints,
158            self.connector,
159            self.tls_required,
160            self.retry_policy,
161        ));
162        let pool_for_rpc = pool.clone();
163        let driver = driver::Driver::spawn(
164            move |count| {
165                let pool = pool_for_rpc.clone();
166                Box::pin(async move { retry::issue_rpc(&pool, count).await })
167            },
168            self.flush_interval,
169        );
170        Ok(Client { pool, driver })
171    }
172}
173
174pub struct Client {
175    pool: Arc<ChannelPool>,
176    driver: driver::Driver,
177}
178
179impl Client {
180    pub async fn connect(endpoints: Vec<String>) -> Result<Self, ClientError> {
181        ClientBuilder::endpoints(endpoints).build().await
182    }
183
184    /// The endpoint the client currently believes is the leader, or `None`
185    /// if no leader has been observed yet or the cached entry has aged past
186    /// the configured `leader_ttl`.
187    ///
188    /// Read-only diagnostic surface for ops dashboards and integration tests
189    /// asserting that a client has converged to the expected leader. It
190    /// reflects the cache as last updated by a completed RPC — it neither
191    /// triggers nor waits on any network round-trip, and the TTL check is
192    /// lazy (an expired entry reads as `None`).
193    pub fn cached_leader(&self) -> Option<String> {
194        self.pool.cached_leader()
195    }
196
197    pub async fn get_ts(&self) -> Result<Timestamp, ClientError> {
198        Ok(self.driver.request(1).await?[0])
199    }
200
201    pub async fn get_ts_batch(&self, count: u32) -> Result<Vec<Timestamp>, ClientError> {
202        if count == 0 || count > MAX_TIMESTAMPS_PER_RPC {
203            return Err(ClientError::InvalidCount(count));
204        }
205        self.driver.request(count).await
206    }
207
208    /// Read the leader's current safe-point in physical-millisecond units.
209    ///
210    /// Targets the cached leader if known, otherwise the first configured
211    /// endpoint. Followers return `MaxSafe::max_safe_physical_ms == 0` rather
212    /// than erroring, matching the proto contract; pollers needing freshness
213    /// should target the leader endpoint.
214    pub async fn get_current_max_safe(&self) -> Result<MaxSafe, ClientError> {
215        let endpoint = self
216            .pool
217            .cached_leader()
218            .or_else(|| self.pool.iter_round_robin().into_iter().next())
219            .ok_or(ClientError::NoReachableEndpoints)?;
220        let (mut svc, _cell) = self.pool.client_with_cell(&endpoint).await?;
221        let resp = svc
222            .get_current_max_safe(tsoracle_proto::v1::GetCurrentMaxSafeRequest {})
223            .await
224            .map_err(ClientError::Rpc)?;
225        let inner = resp.into_inner();
226        Ok(MaxSafe {
227            max_safe_physical_ms: inner.max_safe_physical_ms,
228            epoch: Epoch::from_wire(inner.epoch_hi, inner.epoch_lo),
229        })
230    }
231}
232
/// The leader's view of the durable safe-point, returned by
/// [`Client::get_current_max_safe`].
///
/// A plain value type: it is `Copy` and compares field-by-field, so
/// successive readings can be stored and compared directly.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct MaxSafe {
    /// Safe-point in physical-millisecond units; `0` is the cold-start sentinel
    /// (also the value any follower returns).
    pub max_safe_physical_ms: u64,
    /// Leader epoch that issued this view. Decoded from the response's
    /// `epoch_hi` / `epoch_lo` pair via `Epoch::from_wire`.
    pub epoch: Epoch,
}
243
244#[cfg(test)]
245mod tests {
246    use super::*;
247
248    #[tokio::test]
249    async fn cached_leader_is_none_before_any_rpc() {
250        // A freshly built client has issued no RPC, so the channel pool's
251        // leader cache is empty and `cached_leader()` reports `None`. This
252        // pins the "nothing observed yet" branch of the diagnostic accessor
253        // without needing a server; the post-RPC `Some(addr)` case is covered
254        // end-to-end in `tsoracle-tests`.
255        let client = Client::connect(vec!["http://127.0.0.1:1".into()])
256            .await
257            .expect("build with a non-empty endpoint list must succeed");
258        assert_eq!(client.cached_leader(), None);
259    }
260
261    #[tokio::test]
262    async fn build_rejects_empty_endpoint_list() {
263        // Validation prevents a Client whose `pool` has no endpoints to try;
264        // every RPC would fail-fast with `NoReachableEndpoints` and burn no
265        // network roundtrips at all, so reject up-front instead.
266        match ClientBuilder::endpoints(Vec::new()).build().await {
267            Err(ClientError::NoReachableEndpoints) => {}
268            Err(other) => panic!("expected NoReachableEndpoints, got {other:?}"),
269            Ok(_) => panic!("expected Err, got Ok(Client)"),
270        }
271    }
272
273    #[tokio::test]
274    async fn channel_connector_error_surfaces_as_connector_variant() {
275        let builder = ClientBuilder::endpoints(vec!["a:1".into()]).channel_connector(
276            |_endpoint: &str| async move {
277                Err::<tonic::transport::Channel, crate::BoxError>(
278                    std::io::Error::other("boom").into(),
279                )
280            },
281        );
282        let client = builder.build().await.expect("build must not fail");
283        let result = client.get_ts().await;
284        match result {
285            Err(ClientError::Connector(inner)) => {
286                assert!(inner.to_string().contains("boom"));
287            }
288            other => panic!("expected ClientError::Connector, got {other:?}"),
289        }
290    }
291
292    // Marker payload used by both last-wins tests. The free function holds
293    // the body in one source location so coverage credit flows from the test
294    // where the closure DOES run; the test that asserts "this never runs"
295    // just calls the same helper.
296    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
297    async fn marker_connector_failure() -> Result<tonic::transport::Channel, crate::BoxError> {
298        Err("MARKER".into())
299    }
300
301    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
302    #[tokio::test]
303    async fn tls_config_then_channel_connector_last_wins() {
304        // channel_connector is set LAST, so its path runs on get_ts. The
305        // marker error surfaces as `ClientError::Connector`, proving the
306        // builder did not silently keep the prior tls_config.
307        let builder = ClientBuilder::endpoints(vec!["a:1".into()])
308            .tls_config(tonic::transport::ClientTlsConfig::new())
309            .channel_connector(|_endpoint: &str| marker_connector_failure());
310        let client = builder.build().await.expect("build must not fail");
311        match client.get_ts().await {
312            Err(ClientError::Connector(inner)) => {
313                assert!(inner.to_string().contains("MARKER"));
314            }
315            other => panic!("expected Connector(MARKER), got {other:?}"),
316        }
317    }
318
319    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
320    #[tokio::test]
321    async fn channel_connector_then_tls_config_last_wins() {
322        // tls_config is set LAST, so the connector path is replaced and its
323        // marker error must NOT surface. The tls_config path produces a
324        // transport-level failure (or NoReachableEndpoints / Rpc) instead.
325        let builder = ClientBuilder::endpoints(vec!["a:1".into()])
326            .channel_connector(|_endpoint: &str| marker_connector_failure())
327            .tls_config(tonic::transport::ClientTlsConfig::new());
328        let client = builder.build().await.expect("build must not fail");
329        let result = client.get_ts().await;
330        if let Err(ClientError::Connector(inner)) = &result
331            && inner.to_string().contains("MARKER")
332        {
333            panic!("tls_config set last must overwrite the prior channel_connector");
334        }
335    }
336
337    #[tokio::test]
338    async fn batch_flush_interval_overrides_default() {
339        // The builder's `batch_flush_interval` knob feeds the driver's
340        // coalescing window; without a test it could silently revert to the
341        // default and no-one would notice from black-box behavior. We
342        // confirm the override path by reaching into the builder fields.
343        let custom = Duration::from_millis(25);
344        let builder = ClientBuilder::endpoints(vec!["http://127.0.0.1:1".into()])
345            .batch_flush_interval(custom);
346        assert_eq!(builder.flush_interval, custom);
347    }
348
349    #[tokio::test]
350    async fn retry_policy_override_propagates_to_builder() {
351        // The builder field is what `build` hands to the pool and retry
352        // loop. If `retry_policy()` ever silently stops storing the
353        // override, the loop reverts to defaults; this test pins the
354        // override path against that.
355        let policy = RetryPolicy {
356            max_attempts: 7,
357            per_attempt_deadline: Duration::from_millis(11),
358            overall_deadline: Duration::from_millis(13),
359            base_backoff: Duration::from_millis(17),
360            leader_ttl: Duration::from_millis(19),
361        };
362        let builder = ClientBuilder::endpoints(vec!["http://127.0.0.1:1".into()])
363            .retry_policy(policy.clone());
364        assert_eq!(builder.retry_policy.max_attempts, policy.max_attempts);
365        assert_eq!(
366            builder.retry_policy.per_attempt_deadline,
367            policy.per_attempt_deadline
368        );
369        assert_eq!(
370            builder.retry_policy.overall_deadline,
371            policy.overall_deadline
372        );
373        assert_eq!(builder.retry_policy.base_backoff, policy.base_backoff);
374        assert_eq!(builder.retry_policy.leader_ttl, policy.leader_ttl);
375    }
376
377    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
378    async fn get_ts_returns_within_overall_deadline_when_all_endpoints_unreachable() {
379        // End-to-end test of the issue's acceptance criterion: with no
380        // listener bound at the configured endpoints, a `get_ts` call
381        // must return well before the OS-default TCP timeout (~75 s on
382        // Linux). The bound here is generous enough to absorb CI
383        // scheduler jitter — the assertion is "fast", not "exactly the
384        // configured deadline".
385        let policy = RetryPolicy {
386            max_attempts: 3,
387            per_attempt_deadline: Duration::from_millis(100),
388            overall_deadline: Duration::from_millis(300),
389            base_backoff: Duration::ZERO,
390            leader_ttl: Duration::from_secs(30),
391        };
392        let client = ClientBuilder::endpoints(vec![
393            "http://127.0.0.1:1".into(),
394            "http://127.0.0.1:2".into(),
395            "http://127.0.0.1:3".into(),
396        ])
397        .retry_policy(policy)
398        .build()
399        .await
400        .expect("builder must accept the policy");
401        let start = std::time::Instant::now();
402        let result = client.get_ts().await;
403        let elapsed = start.elapsed();
404        assert!(result.is_err(), "no listener can reply: {result:?}");
405        assert!(
406            elapsed < Duration::from_secs(2),
407            "deadline must short-circuit; took {elapsed:?}"
408        );
409    }
410}