use std::collections::BTreeMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use reqwest::{Method, StatusCode};
use serde::de::DeserializeOwned;
use serde::Serialize;
use crate::error::Error;
use super::config::{CallOptions, Consistency, Prefer, Routing, SeedAuth, SeedTls, Timeouts};
use super::discovery::{DiscoveredPeer, Discovery};
use super::error as seed_err;
use super::health::HealthHandle;
use super::peers::{Endpoint, Peer, PeerErrorClass, PeerSet};
use super::resources::{
CustodyResource, MeshResource, OtaResource, PairResource, StoreResource, WitnessResource,
};
use super::retry;
use super::session::SeedSession;
use super::token_book::{SharedTokenBook, TokenBook};
static INSECURE_WARN: AtomicBool = AtomicBool::new(false);
#[derive(Debug, Clone)]
pub struct SeedClient {
inner: Arc<SeedInner>,
}
#[derive(Debug)]
pub(crate) struct SeedInner {
pub(crate) http: reqwest::Client,
pub(crate) peers: Arc<Mutex<PeerSet>>,
pub(crate) auth: SeedAuth,
pub(crate) timeouts: Timeouts,
pub(crate) max_retries: u32,
#[allow(dead_code)]
pub(crate) routing: Routing,
pub(crate) token_book: SharedTokenBook,
#[allow(dead_code)]
pub(crate) health: Option<HealthHandle>,
pub(crate) auth_failure_counts: Mutex<BTreeMap<String, u32>>,
pub(crate) discovery: Option<Arc<dyn Discovery>>,
}
impl SeedClient {
pub fn builder() -> SeedClientBuilder {
SeedClientBuilder::default()
}
pub fn pair(&self) -> PairResource<'_> {
PairResource { client: self }
}
pub fn store(&self) -> StoreResource<'_> {
StoreResource { client: self }
}
pub fn witness(&self) -> WitnessResource<'_> {
WitnessResource { client: self }
}
pub fn custody(&self) -> CustodyResource<'_> {
CustodyResource { client: self }
}
pub fn ota(&self) -> OtaResource<'_> {
OtaResource { client: self }
}
pub fn mesh(&self) -> MeshResource<'_> {
MeshResource { client: self }
}
pub async fn rediscover(&self) -> Result<(), Error> {
if let Some(ref d) = self.inner.discovery {
let new_peers = d.discover().await?;
if new_peers.is_empty() {
return Err(Error::Validation(
"seed: Discovery returned zero peers; refusing to reset PeerSet".into(),
));
}
self.rebuild_peer_set(&new_peers)?;
} else {
let mut guard = self.inner.peers.lock().map_err(|_| Error::Api {
code: 0,
message: "seed: peers lock poisoned".into(),
})?;
guard.rediscover();
}
Ok(())
}
fn rebuild_peer_set(&self, discovered: &[DiscoveredPeer]) -> Result<(), Error> {
let endpoints = discovered
.iter()
.map(|p| Endpoint::parse(&p.url))
.collect::<Result<Vec<_>, _>>()?;
let new_set = PeerSet::new(endpoints)?;
let mut guard = self.inner.peers.lock().map_err(|_| Error::Api {
code: 0,
message: "seed: peers lock poisoned".into(),
})?;
*guard = new_set;
Ok(())
}
pub fn session(&self) -> SeedSession<'_> {
let pinned_peer = {
let guard = self.inner.peers.lock().expect("peers lock poisoned");
guard.pick().endpoint.key()
};
SeedSession {
client: self,
pinned_peer,
}
}
pub fn peers(&self) -> Vec<Peer> {
let guard = self.inner.peers.lock().expect("peers lock poisoned");
guard.peers().to_vec()
}
#[doc(hidden)]
pub fn token_for_peer(&self, peer_key: &str) -> Option<String> {
self.inner
.token_book
.get(peer_key)
.map(|s| s.as_str().to_owned())
}
#[doc(hidden)]
pub fn trust_score_failures(&self, peer_key: &str) -> u32 {
self.inner
.auth_failure_counts
.lock()
.ok()
.and_then(|g| g.get(peer_key).copied())
.unwrap_or(0)
}
#[doc(hidden)]
pub fn reset_trust_score(&self, peer_url: Option<&str>) {
if let Ok(mut guard) = self.inner.auth_failure_counts.lock() {
match peer_url {
Some(key) => {
guard.remove(key);
}
None => guard.clear(),
}
}
}
pub async fn status(&self) -> Result<super::models::Status, Error> {
self.request_get("/status").await
}
pub async fn status_with(&self, opts: CallOptions) -> Result<super::models::Status, Error> {
self.request_get_opts("/status", &opts).await
}
pub async fn identity(&self) -> Result<super::models::Identity, Error> {
self.request_get("/identity").await
}
pub async fn identity_with(&self, opts: CallOptions) -> Result<super::models::Identity, Error> {
self.request_get_opts("/identity", &opts).await
}
#[allow(dead_code)]
pub(crate) fn inner(&self) -> &SeedInner {
&self.inner
}
pub(crate) async fn request_get<T: DeserializeOwned>(&self, path: &str) -> Result<T, Error> {
self.request::<T, ()>(Method::GET, path, None, false, None)
.await
}
pub(crate) async fn request_post<T, B>(
&self,
path: &str,
body: &B,
idempotent: bool,
) -> Result<T, Error>
where
T: DeserializeOwned,
B: Serialize + ?Sized,
{
self.request::<T, &B>(Method::POST, path, Some(body), idempotent, None)
.await
}
pub(crate) async fn request_delete<T: DeserializeOwned>(&self, path: &str) -> Result<T, Error> {
self.request::<T, ()>(Method::DELETE, path, None, false, None)
.await
}
pub(crate) async fn request_on_peer_get<T: DeserializeOwned>(
&self,
path: &str,
pinned: Option<&str>,
) -> Result<T, Error> {
self.request::<T, ()>(Method::GET, path, None, false, pinned)
.await
}
pub(crate) async fn request_on_peer_post<T, B>(
&self,
path: &str,
body: &B,
idempotent: bool,
pinned: Option<&str>,
) -> Result<T, Error>
where
T: DeserializeOwned,
B: Serialize + ?Sized,
{
self.request::<T, &B>(Method::POST, path, Some(body), idempotent, pinned)
.await
}
pub(crate) async fn request_get_opts<T: DeserializeOwned>(
&self,
path: &str,
opts: &CallOptions,
) -> Result<T, Error> {
let pin = self.resolve_call_options(opts)?;
self.request::<T, ()>(Method::GET, path, None, false, pin.as_deref())
.await
}
pub(crate) async fn request_post_opts<T, B>(
&self,
path: &str,
body: &B,
idempotent: bool,
opts: &CallOptions,
) -> Result<T, Error>
where
T: DeserializeOwned,
B: Serialize + ?Sized,
{
let pin = self.resolve_call_options(opts)?;
self.request::<T, &B>(Method::POST, path, Some(body), idempotent, pin.as_deref())
.await
}
fn resolve_call_options(&self, opts: &CallOptions) -> Result<Option<String>, Error> {
if let Some(Consistency::Strong) = opts.consistency {
return Err(seed_err::unsupported(
"strong consistency unsupported; seed has no quorum",
));
}
if let Some(peer) = opts.peer.as_deref() {
let guard = self.inner.peers.lock().map_err(|_| Error::Api {
code: 0,
message: "seed: peers lock poisoned".into(),
})?;
let key = match guard.find_by_key(peer) {
Some(p) => p.endpoint.key(),
None => {
return Err(seed_err::config(&format!("peer not in mesh: {peer}")));
}
};
return Ok(Some(key));
}
if let Some(prefer) = opts.prefer {
let guard = self.inner.peers.lock().map_err(|_| Error::Api {
code: 0,
message: "seed: peers lock poisoned".into(),
})?;
let key = match prefer {
Prefer::Closest | Prefer::Any => guard.pick().endpoint.key(),
Prefer::LocalFirst => guard.pick_local_first().endpoint.key(),
Prefer::Random => guard.pick_random().endpoint.key(),
};
return Ok(Some(key));
}
Ok(None)
}
fn pick_peer(&self, pinned: Option<&str>) -> Result<Endpoint, Error> {
let guard = self.inner.peers.lock().map_err(|_| Error::Api {
code: 0,
message: "seed: peers lock poisoned".into(),
})?;
let ep = match pinned.and_then(|k| guard.find_by_key(k)) {
Some(p) => p.endpoint.clone(),
None => guard.pick().endpoint.clone(),
};
Ok(ep)
}
fn next_peer(&self, failed_key: &str) -> Option<Endpoint> {
let guard = self.inner.peers.lock().ok()?;
let failed_peer = guard.find_by_key(failed_key)?.clone();
guard.next_after(&failed_peer).map(|p| p.endpoint.clone())
}
async fn request<T, B>(
&self,
method: Method,
path: &str,
body: Option<B>,
idempotent: bool,
pinned: Option<&str>,
) -> Result<T, Error>
where
T: DeserializeOwned,
B: Serialize,
{
let started = Instant::now();
let mut attempt: u32 = 0;
let mut peer = self.pick_peer(pinned)?;
let total_peers = self.peer_count();
let mut peers_tried: usize = 0;
let mut last_err: Option<Error> = None;
let body_bytes: Option<Vec<u8>> = match body.as_ref() {
Some(b) => Some(serde_json::to_vec(b).map_err(Error::from)?),
None => None,
};
loop {
if started.elapsed() > self.inner.timeouts.total {
return Err(last_err.unwrap_or(Error::Api {
code: 0,
message: "seed: total deadline exceeded".into(),
}));
}
let peer_key = peer.key();
let url = peer.join_api(path)?;
let call_started = Instant::now();
let mut req = self.inner.http.request(method.clone(), url.clone());
if let Some(tok) = self.inner.token_book.get(&peer_key) {
req = req.header("X-Pairing-Token", tok.as_str());
} else if let SeedAuth::PairingToken(tok) = &self.inner.auth {
req = req.header("X-Pairing-Token", tok.as_str());
}
req = req.header(reqwest::header::ACCEPT, "application/json");
if let Some(bytes) = body_bytes.as_ref() {
req = req
.header(reqwest::header::CONTENT_TYPE, "application/json")
.body(bytes.clone());
}
let send_result = req.send().await;
match send_result {
Ok(response) => {
let status = response.status();
if status.is_success() {
self.mark_peer_success(&peer_key, call_started.elapsed());
self.reset_auth_failures(&peer_key);
return parse_success::<T>(response).await;
}
let headers = response.headers().clone();
let body_text = response.text().await.unwrap_or_default();
if let Some(class) = classify_status(status) {
self.mark_peer_failure(&peer_key, class);
}
if is_auth_status(status) {
let count = self.bump_auth_failures(&peer_key);
if count >= TRUST_SCORE_THRESHOLD {
return Err(seed_err::trust_score_blocked(&peer_key));
}
}
match dispatch_status_outcome(status) {
StatusOutcome::Cycle => {
peers_tried += 1;
last_err = Some(seed_err::from_response(status, &body_text, path));
if peers_tried >= total_peers {
if retry::should_retry(&method, status, idempotent)
&& attempt < self.inner.max_retries
{
let hint = retry::parse_retry_after(&headers, &body_text);
let delay = delay_for(attempt, hint);
if started.elapsed() + delay > self.inner.timeouts.total {
return Err(last_err.take().unwrap_or_else(|| {
seed_err::from_response(status, &body_text, path)
}));
}
tokio::time::sleep(delay).await;
attempt += 1;
continue;
}
return Err(last_err.take().unwrap_or_else(|| {
seed_err::from_response(status, &body_text, path)
}));
}
match self.next_peer(&peer_key) {
Some(next) => {
peer = next;
continue;
}
None => {
return Err(seed_err::from_response(status, &body_text, path));
}
}
}
StatusOutcome::PinAndBackoff => {
if retry::should_retry(&method, status, idempotent)
&& attempt < self.inner.max_retries
{
let hint = retry::parse_retry_after(&headers, &body_text);
let delay = delay_for(attempt, hint);
if started.elapsed() + delay > self.inner.timeouts.total {
return Err(seed_err::from_response(status, &body_text, path));
}
tokio::time::sleep(delay).await;
attempt += 1;
continue;
}
return Err(seed_err::from_response(status, &body_text, path));
}
StatusOutcome::Surface => {
return Err(seed_err::from_response(status, &body_text, path));
}
}
}
Err(e) => {
let class = if e.is_timeout() {
PeerErrorClass::Timeout
} else {
PeerErrorClass::Network
};
self.mark_peer_failure(&peer_key, class);
last_err = Some(Error::from(reqwest_error_into_transport(e)));
peers_tried += 1;
if peers_tried < total_peers {
if let Some(next) = self.next_peer(&peer_key) {
peer = next;
continue;
}
}
if attempt < self.inner.max_retries {
let delay = retry::compute_delay(
attempt,
retry::DEFAULT_BASE_MS,
retry::DEFAULT_CAP_MS,
);
if started.elapsed() + delay > self.inner.timeouts.total {
return Err(last_err.take().unwrap_or(Error::Api {
code: 0,
message: "seed: transport exhausted".into(),
}));
}
tokio::time::sleep(delay).await;
attempt += 1;
peers_tried = 0; continue;
}
return Err(last_err.take().unwrap_or(Error::Api {
code: 0,
message: "seed: transport exhausted".into(),
}));
}
}
}
}
fn peer_count(&self) -> usize {
self.inner.peers.lock().map(|g| g.len()).unwrap_or(1)
}
fn mark_peer_success(&self, peer_key: &str, latency: Duration) {
if let Ok(mut guard) = self.inner.peers.lock() {
guard.mark_success(peer_key, latency);
}
}
fn mark_peer_failure(&self, peer_key: &str, class: PeerErrorClass) {
if let Ok(mut guard) = self.inner.peers.lock() {
guard.mark_failure(peer_key, class);
}
}
fn reset_auth_failures(&self, peer_key: &str) {
if let Ok(mut guard) = self.inner.auth_failure_counts.lock() {
guard.remove(peer_key);
}
}
fn bump_auth_failures(&self, peer_key: &str) -> u32 {
match self.inner.auth_failure_counts.lock() {
Ok(mut guard) => {
let n = guard.entry(peer_key.to_owned()).or_insert(0);
*n = n.saturating_add(1);
*n
}
Err(_) => 0,
}
}
}
enum StatusOutcome {
Cycle,
PinAndBackoff,
Surface,
}
fn dispatch_status_outcome(status: StatusCode) -> StatusOutcome {
match status.as_u16() {
500 | 502 | 503 | 504 => StatusOutcome::Cycle,
429 => StatusOutcome::PinAndBackoff,
_ => StatusOutcome::Surface,
}
}
fn is_auth_status(status: StatusCode) -> bool {
matches!(status.as_u16(), 401 | 403)
}
const TRUST_SCORE_THRESHOLD: u32 = 3;
fn classify_status(status: StatusCode) -> Option<PeerErrorClass> {
match status.as_u16() {
503 => Some(PeerErrorClass::ServiceUnavailable),
500 | 502 | 504 => Some(PeerErrorClass::Server5xx),
_ => None,
}
}
fn reqwest_error_into_transport(e: reqwest::Error) -> reqwest::Error {
e
}
fn delay_for(attempt: u32, server_hint: Option<Duration>) -> Duration {
let computed = retry::compute_delay(attempt, retry::DEFAULT_BASE_MS, retry::DEFAULT_CAP_MS);
match server_hint {
Some(hint) => {
std::cmp::max(hint, computed).min(Duration::from_millis(retry::DEFAULT_CAP_MS))
}
None => computed,
}
}
async fn parse_success<T: DeserializeOwned>(response: reqwest::Response) -> Result<T, Error> {
let status = response.status();
let text = response.text().await?;
if status == StatusCode::NO_CONTENT || text.is_empty() {
return serde_json::from_str::<T>("null")
.or_else(|_| serde_json::from_str::<T>("{}"))
.map_err(Error::from);
}
serde_json::from_str::<T>(&text).map_err(Error::from)
}
#[derive(Default)]
pub struct SeedClientBuilder {
endpoints: Vec<String>,
auth: SeedAuth,
tls: SeedTls,
timeouts: Timeouts,
routing: Routing,
max_retries: Option<u32>,
token_book: Option<Box<dyn TokenBook>>,
health_interval: Option<Duration>,
discovery: Option<Arc<dyn Discovery>>,
}
impl std::fmt::Debug for SeedClientBuilder {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SeedClientBuilder")
.field("endpoints", &self.endpoints)
.field("auth", &self.auth)
.field("tls", &self.tls)
.field("timeouts", &self.timeouts)
.field("routing", &self.routing)
.field("max_retries", &self.max_retries)
.field("token_book", &self.token_book.is_some())
.field("health_interval", &self.health_interval)
.field("discovery", &self.discovery.is_some())
.finish()
}
}
impl SeedClientBuilder {
pub fn endpoint(mut self, url: impl Into<String>) -> Self {
self.endpoints = vec![url.into()];
self
}
pub fn endpoints<S: AsRef<str>>(mut self, urls: &[S]) -> Self {
self.endpoints = urls.iter().map(|u| u.as_ref().to_owned()).collect();
self
}
pub fn auth(mut self, auth: SeedAuth) -> Self {
self.auth = auth;
self
}
pub fn tls(mut self, tls: SeedTls) -> Self {
self.tls = tls;
self
}
pub fn timeouts(mut self, timeouts: Timeouts) -> Self {
self.timeouts = timeouts;
self
}
pub fn routing(mut self, routing: Routing) -> Self {
self.routing = routing;
self
}
pub fn max_retries(mut self, n: u32) -> Self {
self.max_retries = Some(n);
self
}
pub fn token_book<B: TokenBook + 'static>(mut self, book: B) -> Self {
self.token_book = Some(Box::new(book));
self
}
pub fn health_interval(mut self, interval: Duration) -> Self {
self.health_interval = Some(interval);
self
}
pub fn discovery<D: Discovery + 'static>(mut self, discovery: D) -> Self {
self.discovery = Some(Arc::new(discovery));
self
}
pub fn discovery_arc(mut self, discovery: Arc<dyn Discovery>) -> Self {
self.discovery = Some(discovery);
self
}
pub fn build(self) -> Result<SeedClient, Error> {
let mut endpoints_str = self.endpoints.clone();
let mut discovered_peers: Vec<DiscoveredPeer> = Vec::new();
if endpoints_str.is_empty() {
if let Some(ref d) = self.discovery {
let discovered = block_on_discover(d.as_ref())?;
if discovered.is_empty() {
return Err(Error::Validation(
"SeedClient: Discovery returned zero peers at build time".into(),
));
}
endpoints_str = discovered.iter().map(|p| p.url.clone()).collect();
discovered_peers = discovered;
}
}
if endpoints_str.is_empty() {
return Err(Error::Validation(
"SeedClient: at least one .endpoint(...) or .discovery(...) is required".into(),
));
}
let endpoints = endpoints_str
.iter()
.map(|s| Endpoint::parse(s))
.collect::<Result<Vec<_>, _>>()?;
let peer_set = PeerSet::new(endpoints)?;
let peers = Arc::new(Mutex::new(peer_set));
let http = build_http_client(&self.tls, &self.timeouts, &discovered_peers)?;
let token_book = match self.token_book {
Some(book) => SharedTokenBook::new_boxed(book),
None => SharedTokenBook::default(),
};
if let SeedAuth::PairingToken(tok) = &self.auth {
let guard = peers.lock().expect("peers lock poisoned");
for p in guard.peers() {
if token_book.get(&p.endpoint.key()).is_none() {
token_book.set(&p.endpoint.key(), tok.clone());
}
}
}
let health = self
.health_interval
.map(|interval| HealthHandle::spawn(http.clone(), Arc::clone(&peers), interval));
Ok(SeedClient {
inner: Arc::new(SeedInner {
http,
peers,
auth: self.auth,
timeouts: self.timeouts,
max_retries: self.max_retries.unwrap_or(3),
routing: self.routing,
token_book,
health,
auth_failure_counts: Mutex::new(BTreeMap::new()),
discovery: self.discovery,
}),
})
}
}
fn block_on_discover(d: &dyn Discovery) -> Result<Vec<DiscoveredPeer>, Error> {
std::thread::scope(|scope| {
scope
.spawn(|| {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|e| Error::Validation(format!("seed: rt build: {e}")))?;
rt.block_on(d.discover())
})
.join()
.map_err(|_| Error::Api {
code: 0,
message: "seed: discovery thread panicked".into(),
})?
})
}
fn build_http_client(
tls: &SeedTls,
timeouts: &Timeouts,
discovered: &[DiscoveredPeer],
) -> Result<reqwest::Client, Error> {
let mut builder = reqwest::Client::builder()
.connect_timeout(timeouts.connect)
.timeout(timeouts.read)
.redirect(reqwest::redirect::Policy::none())
.pool_idle_timeout(Some(Duration::from_secs(60)));
let pins = super::tls_pin::build_pin_map(discovered)
.map_err(|e| Error::Validation(format!("seed: invalid peer URL during pin map: {e}")))?;
match tls {
SeedTls::Pinned(pem) => {
let cert = reqwest::Certificate::from_pem(pem)
.map_err(|e| Error::Validation(format!("invalid seed trust root PEM: {e}")))?;
builder = builder
.tls_built_in_root_certs(false)
.add_root_certificate(cert);
}
SeedTls::System if !pins.is_empty() => {
let verifier = super::tls_pin::FingerprintPinVerifier::with_webpki_roots(pins)
.map_err(|e| Error::Validation(format!("seed: fingerprint verifier: {e}")))?;
builder = install_rustls_verifier(builder, verifier)?;
}
SeedTls::Insecure if !pins.is_empty() => {
if !INSECURE_WARN.swap(true, Ordering::Relaxed) {
eprintln!(
"cognitum-rs seed: TLS verification is DISABLED via \
SeedTls::Insecure. Never use this in production — \
prefer SeedTls::Pinned for self-signed seeds (ADR-0007)."
);
}
let verifier = super::tls_pin::FingerprintPinVerifier::with_insecure_fallback(pins);
builder = install_rustls_verifier(builder, verifier)?;
}
SeedTls::System => {}
SeedTls::Insecure => {
if !INSECURE_WARN.swap(true, Ordering::Relaxed) {
eprintln!(
"cognitum-rs seed: TLS verification is DISABLED via \
SeedTls::Insecure. Never use this in production — \
prefer SeedTls::Pinned for self-signed seeds (ADR-0007)."
);
}
builder = builder.danger_accept_invalid_certs(true);
}
}
builder
.build()
.map_err(|e| Error::Validation(format!("seed http client: {e}")))
}
fn install_rustls_verifier(
builder: reqwest::ClientBuilder,
verifier: super::tls_pin::FingerprintPinVerifier,
) -> Result<reqwest::ClientBuilder, Error> {
use rustls::crypto::CryptoProvider;
if CryptoProvider::get_default().is_none() {
let _ = rustls::crypto::ring::default_provider().install_default();
}
let config = rustls::ClientConfig::builder()
.dangerous()
.with_custom_certificate_verifier(Arc::new(verifier))
.with_no_client_auth();
Ok(builder.use_preconfigured_tls(config))
}
impl SharedTokenBook {
pub(crate) fn new_boxed(book: Box<dyn TokenBook>) -> Self {
Self::from_boxed(book)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn builder_rejects_empty_endpoints() {
let err = SeedClient::builder().build().unwrap_err();
assert!(matches!(err, Error::Validation(_)));
}
#[test]
fn builder_accepts_session_routing_with_mesh() {
let client = SeedClient::builder()
.endpoints(&["https://s1:8443", "https://s2:8443"])
.routing(Routing::Session)
.build()
.expect("mesh builds");
assert_eq!(client.inner().peers.lock().unwrap().len(), 2);
}
#[test]
fn builder_accepts_multi_endpoint_phase_1_5() {
let client = SeedClient::builder()
.endpoints(&["https://s1:8443", "https://s2:8443", "https://s3:8443"])
.build()
.expect("three-peer mesh builds");
assert_eq!(client.peers().len(), 3);
}
#[test]
fn builder_builds_with_system_tls() {
let client = SeedClient::builder()
.endpoint("https://cognitum.local:8443")
.tls(SeedTls::System)
.build()
.expect("system-TLS seed client should build");
assert_eq!(client.peers().len(), 1);
}
#[test]
fn builder_builds_with_insecure_tls() {
let client = SeedClient::builder()
.endpoint("https://localhost:18443")
.tls(SeedTls::Insecure)
.build()
.expect("insecure seed client should build");
assert_eq!(client.peers().len(), 1);
}
#[test]
fn builder_rejects_invalid_endpoint() {
let err = SeedClient::builder()
.endpoint("not a url")
.build()
.unwrap_err();
assert!(matches!(err, Error::Validation(_)));
}
#[test]
fn builder_accepts_pinned_tls_bytes() {
let result = SeedClient::builder()
.endpoint("https://seed:8443")
.tls(SeedTls::Pinned(b"not a real pem".to_vec()))
.build();
match result {
Ok(_) => {}
Err(Error::Validation(msg)) => {
assert!(msg.contains("trust root"), "got: {msg}");
}
Err(other) => panic!("unexpected: {other:?}"),
}
}
#[test]
fn builder_seeds_token_book_from_single_pairing_token() {
let client = SeedClient::builder()
.endpoints(&["https://a:8443", "https://b:8443"])
.auth(SeedAuth::pairing_token("shared"))
.build()
.expect("build");
assert_eq!(
client
.inner()
.token_book
.get("https://a:8443")
.unwrap()
.as_str(),
"shared"
);
assert_eq!(
client
.inner()
.token_book
.get("https://b:8443")
.unwrap()
.as_str(),
"shared"
);
}
#[test]
fn session_pins_peer_key() {
let client = SeedClient::builder()
.endpoints(&["https://a:8443", "https://b:8443"])
.build()
.unwrap();
let session = client.session();
assert!(
session.pinned_peer() == "https://a:8443" || session.pinned_peer() == "https://b:8443"
);
}
#[tokio::test]
async fn post_body_serialized_once_across_retries() {
use std::sync::atomic::{AtomicUsize, Ordering as SerdeOrd};
use std::sync::Arc as StdArc;
use wiremock::matchers::{method as wmethod, path as wpath};
use wiremock::{Mock, MockServer, ResponseTemplate};
struct CountingBody {
counter: StdArc<AtomicUsize>,
}
impl serde::Serialize for CountingBody {
fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
self.counter.fetch_add(1, SerdeOrd::Relaxed);
use serde::ser::SerializeMap;
let mut m = s.serialize_map(Some(1))?;
m.serialize_entry("probe", "1")?;
m.end()
}
}
let server = MockServer::start().await;
Mock::given(wmethod("POST"))
.and(wpath("/api/v1/store/query"))
.respond_with(ResponseTemplate::new(503).set_body_string("{}"))
.up_to_n_times(2)
.mount(&server)
.await;
Mock::given(wmethod("POST"))
.and(wpath("/api/v1/store/query"))
.respond_with(
ResponseTemplate::new(200)
.set_body_json(serde_json::json!({"results": [], "query_ms": 0.0})),
)
.expect(1)
.mount(&server)
.await;
let client = SeedClient::builder()
.endpoint(server.uri())
.tls(SeedTls::System)
.max_retries(3)
.build()
.unwrap();
let counter = StdArc::new(AtomicUsize::new(0));
let body = CountingBody {
counter: StdArc::clone(&counter),
};
let _: serde_json::Value = client
.request_post("/store/query", &body, true)
.await
.expect("eventual success");
assert_eq!(
counter.load(SerdeOrd::Relaxed),
1,
"body serialized more than once — retry loop re-serialized \
per attempt (regression on #23)"
);
}
}