#![cfg(feature = "client")]
#![allow(
clippy::missing_errors_doc,
clippy::module_name_repetitions,
clippy::too_long_first_doc_paragraph
)]
use std::future::Future;
use std::pin::Pin;
use bytes::Bytes;
use mnem_core::id::Cid;
use reqwest::{Client, StatusCode};
use serde::{Deserialize, Serialize};
use crate::error::ClientError;
use crate::have_set::{BloomHaveSet, HaveSet};
use crate::protocol::{
CAPABILITIES_HEADER, Capability, CapabilitySet, PROTOCOL_HEADER, PROTOCOL_VERSION,
};
use crate::remote::RemoteConfig;
use crate::secret_token::SecretToken;
/// Pinned, heap-allocated, type-erased future that is `Send` and may borrow
/// data for the lifetime `'a`.
///
/// Every [`RemoteClient`] method returns this alias.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
/// Asynchronous operations against a remote.
///
/// Each method returns a boxed future (see `BoxFuture`) that borrows `self`
/// and takes its other arguments by value. Implementors must be
/// `Send + Sync`. [`HttpRemoteClient`] is the implementation in this file.
pub trait RemoteClient: Send + Sync {
/// Resolves to the remote's [`RefsResponse`]: its head, its named refs and
/// the capabilities it listed.
fn list_refs(&self) -> BoxFuture<'_, Result<RefsResponse, ClientError>>;
/// Requests the blocks identified by `wants`, sending `have_set` along with
/// the request, and resolves to a byte payload.
///
/// The HTTP implementation returns the response body unmodified.
// NOTE(review): `have_set` presumably describes blocks the caller already
// holds so the server can skip them - confirm against `BloomHaveSet`.
fn fetch_blocks(
&self,
wants: Vec<Cid>,
have_set: BloomHaveSet,
) -> BoxFuture<'_, Result<Bytes, ClientError>>;
/// Uploads `car_body` and resolves to the server's [`PushResponse`].
///
/// The HTTP implementation sends the bytes as-is with content type
/// `application/vnd.ipld.car`.
fn push_blocks(&self, car_body: Bytes) -> BoxFuture<'_, Result<PushResponse, ClientError>>;
/// Asks the remote to move `ref_name` from `old` to `new`.
///
/// The HTTP implementation reports a `409 Conflict` reply as
/// `ClientError::CasMismatch`.
fn advance_head(
&self,
old: Cid,
new: Cid,
ref_name: String,
) -> BoxFuture<'_, Result<(), ClientError>>;
}
/// Decoded result of a refs listing.
///
/// [`HttpRemoteClient`] builds this from the JSON wire body after parsing
/// every CID string and capability string.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct RefsResponse {
/// Head CID, or `None` when the wire body carried no `head` value.
pub head: Option<Cid>,
/// Mapping from ref name to the CID it points at.
pub refs: std::collections::BTreeMap<String, Cid>,
/// Capabilities listed by the server. When produced by
/// [`HttpRemoteClient`] this is the output of [`parse_wire_capabilities`].
pub capabilities: Vec<Capability>,
}
/// JSON shape of the refs endpoint response, before CIDs and capabilities are
/// parsed out of their string form.
///
/// Every field is marked `#[serde(default)]`, so a body that omits a field
/// still deserializes (to `None` or an empty collection).
#[derive(Debug, Deserialize)]
struct RefsWireBody {
/// Head CID in string form, if present.
#[serde(default)]
head: Option<String>,
/// Ref name to CID string.
#[serde(default)]
refs: std::collections::BTreeMap<String, String>,
/// Capability names as sent by the server.
#[serde(default)]
capabilities: Vec<String>,
}
/// Converts capability names received on the wire into [`Capability`] values.
///
/// Strings that do not parse as a [`Capability`] are skipped rather than
/// reported. The result is ordered by each capability's wire string and
/// contains no consecutive duplicates after that ordering.
#[must_use]
pub fn parse_wire_capabilities(raw: &[String]) -> Vec<Capability> {
    let mut recognised: Vec<Capability> = Vec::new();
    for token in raw {
        // Unparseable names are dropped on purpose; only known ones are kept.
        if let Ok(capability) = token.parse::<Capability>() {
            recognised.push(capability);
        }
    }
    // Sort first so that `dedup` (which only removes adjacent repeats) sees
    // equal capabilities next to each other.
    recognised.sort_by_key(Capability::as_wire_str);
    recognised.dedup();
    recognised
}
/// Outcome of a successful block push.
///
/// Note that the HTTP client does not deserialize the server reply directly
/// into this type; it reads a separate wire struct and copies the values over.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct PushResponse {
/// Number of blocks the server reported as accepted (wire field
/// `blocks_accepted` in the HTTP client).
pub accepted: u64,
/// Root CID the server reported (wire field `staged` in the HTTP client).
pub root: Cid,
}
/// [`RemoteClient`] implementation that issues HTTP requests to
/// `{base_url}/remote/v1/...` endpoints through `reqwest`.
#[derive(Debug)]
pub struct HttpRemoteClient {
/// `reqwest` client used for every request.
client: Client,
/// Remote URL with any trailing `/` characters removed.
base_url: String,
/// Optional bearer token. `push_blocks` and `advance_head` refuse to run
/// without one; `list_refs` and `fetch_blocks` never send it.
token: Option<SecretToken>,
/// Capabilities sent in the capabilities header on every request; narrowed
/// by `negotiate_capabilities`.
capabilities: CapabilitySet,
}
impl HttpRemoteClient {
#[must_use]
pub fn new(cfg: RemoteConfig) -> Self {
let capabilities = if cfg.capabilities.is_empty() {
CapabilitySet::all_known()
} else {
CapabilitySet::with_caps(cfg.capabilities.iter().copied())
};
Self {
client: Client::new(),
base_url: cfg.url.trim_end_matches('/').to_owned(),
token: cfg.token,
capabilities,
}
}
pub async fn negotiate_capabilities(&mut self) -> Result<(), ClientError> {
let refs = self.list_refs_impl().await?;
let server_caps = CapabilitySet::with_caps(refs.capabilities.iter().copied());
self.capabilities = self.capabilities.intersect(&server_caps);
Ok(())
}
#[must_use]
pub const fn capabilities(&self) -> &CapabilitySet {
&self.capabilities
}
async fn list_refs_impl(&self) -> Result<RefsResponse, ClientError> {
let url = format!("{}/remote/v1/refs", self.base_url);
let req = self
.client
.get(&url)
.header(PROTOCOL_HEADER, PROTOCOL_VERSION.to_string())
.header(CAPABILITIES_HEADER, self.capabilities.serialize());
let resp = req.send().await?;
let status = resp.status();
if status == StatusCode::UNAUTHORIZED || status == StatusCode::FORBIDDEN {
return Err(ClientError::Auth(format!(
"list_refs rejected with {status}"
)));
}
if !status.is_success() {
return Err(ClientError::Protocol(format!(
"list_refs: unexpected status {status}"
)));
}
let body = resp.bytes().await?;
let wire: RefsWireBody = serde_json::from_slice(&body)?;
let head =
match wire.head {
None => None,
Some(ref s) => Some(Cid::parse_str(s).map_err(|e| {
ClientError::Protocol(format!("list_refs: invalid head CID: {e}"))
})?),
};
let mut refs = std::collections::BTreeMap::new();
for (name, cid_str) in wire.refs {
let cid = Cid::parse_str(&cid_str).map_err(|e| {
ClientError::Protocol(format!("list_refs: invalid CID for ref `{name}`: {e}"))
})?;
refs.insert(name, cid);
}
let capabilities = parse_wire_capabilities(&wire.capabilities);
Ok(RefsResponse {
head,
refs,
capabilities,
})
}
fn bearer_header(&self) -> Option<String> {
self.token
.as_ref()
.map(|t| format!("Bearer {}", t.expose()))
}
}
impl HttpRemoteClient {
async fn fetch_blocks_impl(
&self,
wants: Vec<Cid>,
have_set: BloomHaveSet,
) -> Result<Bytes, ClientError> {
let url = format!("{}/remote/v1/fetch-blocks", self.base_url);
let wants_str: Vec<String> = wants.iter().map(Cid::to_string).collect();
let body = serde_json::json!({
"wants": wants_str,
"have_set": have_set.serialize(),
});
let resp = self
.client
.post(&url)
.header(PROTOCOL_HEADER, PROTOCOL_VERSION.to_string())
.header(CAPABILITIES_HEADER, self.capabilities.serialize())
.json(&body)
.send()
.await?;
let status = resp.status();
if status == StatusCode::UNAUTHORIZED || status == StatusCode::FORBIDDEN {
return Err(ClientError::Auth(format!(
"fetch_blocks rejected with {status}"
)));
}
if !status.is_success() {
return Err(ClientError::Protocol(format!(
"fetch_blocks: unexpected status {status}"
)));
}
let bytes = resp.bytes().await?;
Ok(bytes)
}
async fn push_blocks_impl(&self, car_body: Bytes) -> Result<PushResponse, ClientError> {
let url = format!("{}/remote/v1/push-blocks", self.base_url);
let auth = self
.bearer_header()
.ok_or_else(|| ClientError::Auth("push_blocks: no bearer token configured".into()))?;
let resp = self
.client
.post(&url)
.header(PROTOCOL_HEADER, PROTOCOL_VERSION.to_string())
.header(CAPABILITIES_HEADER, self.capabilities.serialize())
.header(reqwest::header::AUTHORIZATION, auth)
.header(reqwest::header::CONTENT_TYPE, "application/vnd.ipld.car")
.body(car_body)
.send()
.await?;
let status = resp.status();
if status == StatusCode::UNAUTHORIZED || status == StatusCode::FORBIDDEN {
return Err(ClientError::Auth(format!(
"push_blocks rejected with {status}"
)));
}
if !status.is_success() {
return Err(ClientError::Protocol(format!(
"push_blocks: unexpected status {status}"
)));
}
#[derive(Deserialize)]
struct Wire {
staged: Option<String>,
blocks_accepted: u64,
}
let body = resp.bytes().await?;
let wire: Wire = serde_json::from_slice(&body)?;
let root_str = wire.staged.ok_or_else(|| {
ClientError::Protocol("push_blocks: server returned null staged root".into())
})?;
let root = Cid::parse_str(&root_str).map_err(|e| {
ClientError::Protocol(format!("push_blocks: server staged root parse: {e}"))
})?;
Ok(PushResponse {
accepted: wire.blocks_accepted,
root,
})
}
async fn advance_head_impl(
&self,
old: Cid,
new: Cid,
ref_name: String,
) -> Result<(), ClientError> {
let url = format!("{}/remote/v1/advance-head", self.base_url);
let auth = self
.bearer_header()
.ok_or_else(|| ClientError::Auth("advance_head: no bearer token configured".into()))?;
let body = serde_json::json!({
"old": old.to_string(),
"new": new.to_string(),
"ref": ref_name,
});
let resp = self
.client
.post(&url)
.header(PROTOCOL_HEADER, PROTOCOL_VERSION.to_string())
.header(CAPABILITIES_HEADER, self.capabilities.serialize())
.header(reqwest::header::AUTHORIZATION, auth)
.json(&body)
.send()
.await?;
let status = resp.status();
if status == StatusCode::UNAUTHORIZED || status == StatusCode::FORBIDDEN {
return Err(ClientError::Auth(format!(
"advance_head rejected with {status}"
)));
}
if status == StatusCode::CONFLICT {
#[derive(Deserialize)]
struct CurrentBody {
current: Option<String>,
}
let bytes = resp.bytes().await.unwrap_or_default();
let actual = serde_json::from_slice::<CurrentBody>(&bytes)
.ok()
.and_then(|c| c.current)
.and_then(|s| Cid::parse_str(&s).ok())
.unwrap_or_else(|| old.clone());
return Err(ClientError::CasMismatch {
ref_name,
expected: old,
actual,
});
}
if !status.is_success() {
return Err(ClientError::Protocol(format!(
"advance_head: unexpected status {status}"
)));
}
let _ = new;
Ok(())
}
}
/// Trait front-end for [`HttpRemoteClient`]: each method creates the future of
/// the matching inherent `*_impl` method and returns it pinned on the heap.
impl RemoteClient for HttpRemoteClient {
    /// Boxes the future returned by `list_refs_impl`.
    fn list_refs(&self) -> BoxFuture<'_, Result<RefsResponse, ClientError>> {
        let pending = self.list_refs_impl();
        Box::pin(pending)
    }

    /// Boxes the future returned by `fetch_blocks_impl`.
    fn fetch_blocks(
        &self,
        wants: Vec<Cid>,
        have_set: BloomHaveSet,
    ) -> BoxFuture<'_, Result<Bytes, ClientError>> {
        let pending = self.fetch_blocks_impl(wants, have_set);
        Box::pin(pending)
    }

    /// Boxes the future returned by `push_blocks_impl`.
    fn push_blocks(&self, car_body: Bytes) -> BoxFuture<'_, Result<PushResponse, ClientError>> {
        let pending = self.push_blocks_impl(car_body);
        Box::pin(pending)
    }

    /// Boxes the future returned by `advance_head_impl`.
    fn advance_head(
        &self,
        old: Cid,
        new: Cid,
        ref_name: String,
    ) -> BoxFuture<'_, Result<(), ClientError>> {
        let pending = self.advance_head_impl(old, new, ref_name);
        Box::pin(pending)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use httpmock::prelude::*;

    /// JSON body served by the mocked refs endpoint.
    const REFS_JSON: &str = r#"{"refs":{},"capabilities":["have-set-bloom","atomic-push"]}"#;

    /// A configured token must not leak into the refs request: the mock only
    /// matches when the `authorization` header is absent.
    #[tokio::test]
    async fn list_refs_omits_authorization_header() {
        let server = MockServer::start_async().await;
        let refs_mock = server
            .mock_async(|when, then| {
                when.method(GET)
                    .path("/remote/v1/refs")
                    .header_missing("authorization");
                then.status(200)
                    .header("content-type", "application/json")
                    .body(REFS_JSON);
            })
            .await;

        let token = SecretToken::new("unit-test-token");
        let cfg = RemoteConfig::new("origin", server.base_url()).with_token(token);
        let client = HttpRemoteClient::new(cfg);

        let listed = client.list_refs_impl().await.expect("list_refs ok");
        let caps = &listed.capabilities;
        assert!(caps.contains(&Capability::HaveSetBloom));
        assert!(caps.contains(&Capability::AtomicPush));
        refs_mock.assert_async().await;
    }

    /// After negotiation only capabilities present on both sides remain.
    #[tokio::test]
    async fn negotiate_capabilities_intersects() {
        let server = MockServer::start_async().await;
        let _refs_mock = server
            .mock_async(|when, then| {
                when.method(GET).path("/remote/v1/refs");
                then.status(200)
                    .header("content-type", "application/json")
                    .body(REFS_JSON);
            })
            .await;

        let cfg = RemoteConfig::new("origin", server.base_url())
            .with_capability(Capability::HaveSetBloom)
            .with_capability(Capability::PushNegotiate);
        let mut client = HttpRemoteClient::new(cfg);
        client.negotiate_capabilities().await.expect("negotiate ok");

        let agreed = client.capabilities();
        // Shared by client and server.
        assert!(agreed.contains(Capability::HaveSetBloom));
        // Server-only.
        assert!(!agreed.contains(Capability::AtomicPush));
        // Client-only.
        assert!(!agreed.contains(Capability::PushNegotiate));
    }

    /// With a token configured the header value is `Bearer <token>`.
    #[test]
    fn bearer_header_includes_token_when_present() {
        let token = SecretToken::new("tok-abc");
        let cfg = RemoteConfig::new("origin", "https://example.com").with_token(token);
        let header = HttpRemoteClient::new(cfg).bearer_header();
        assert_eq!(header.as_deref(), Some("Bearer tok-abc"));
    }

    /// Without a token there is no header value at all.
    #[test]
    fn bearer_header_none_when_no_token() {
        let cfg = RemoteConfig::new("origin", "https://example.com");
        let header = HttpRemoteClient::new(cfg).bearer_header();
        assert!(header.is_none());
    }
}