Skip to main content

mnem_transport/
client.rs

1//! Async HTTP `RemoteClient`.
2//!
3//! Behind the `client` feature so the default `mnem-transport` build
4//! stays WASM-clean (no tokio, no reqwest). This module ships the
5//! full four-verb surface: `list_refs` + capability negotiation, plus
6//! the CAR-body verbs `fetch_blocks`, `push_blocks`, `advance_head`
7//! (B3.3).
8//!
9//! ## Trait
10//!
11//! [`RemoteClient`] is the async surface mnem-cli / mnem http talk to.
12//! It is async-trait free: the associated-type-position `-> impl
13//! Future` shape needs `trait-return-impl-trait` which is stable but
14//! noisy on older toolchains; we use a concrete `Pin<Box<dyn
15//! Future>>` return shape and an inherent-method-per-verb pattern on
16//! [`HttpRemoteClient`] until async-fn-in-trait stabilises in the
17//! workspace MSRV.
18//!
19//! ## HTTP semantics (frozen here so B3.3 can fill bodies without
20//! drift)
21//!
22//! | Verb | Method + path | Auth required? | `mnem-protocol` | `mnem-capabilities` |
23//! |---|---|---|---|---|
24//! | `list_refs`    | `GET /remote/v1/refs`              | no  | 1 | advertised |
25//! | `fetch_blocks` | `POST /remote/v1/fetch-blocks`     | no  | 1 | agreed |
26//! | `push_blocks`  | `POST /remote/v1/push-blocks`      | yes | 1 | agreed |
27//! | `advance_head` | `POST /remote/v1/advance-head`     | yes | 1 | agreed |
28//!
29//! Bearer tokens are injected ONLY on push endpoints. The `Authorization`
30//! header never goes out on `list_refs` / `fetch_blocks`. This is a
31//! defence-in-depth choice: read-side requests MUST stay usable from
32//! unauthenticated contexts (caches, mirrors) and leaking a write
33//! token on a GET is a known mis-use of bearer auth.
34
35#![cfg(feature = "client")]
36#![allow(
37    clippy::missing_errors_doc,
38    clippy::module_name_repetitions,
39    clippy::too_long_first_doc_paragraph
40)]
41
42use std::future::Future;
43use std::pin::Pin;
44
45use bytes::Bytes;
46use mnem_core::id::Cid;
47use reqwest::{Client, StatusCode};
48use serde::{Deserialize, Serialize};
49
50use crate::error::ClientError;
51use crate::have_set::{BloomHaveSet, HaveSet};
52use crate::protocol::{
53    CAPABILITIES_HEADER, Capability, CapabilitySet, PROTOCOL_HEADER, PROTOCOL_VERSION,
54};
55use crate::remote::RemoteConfig;
56use crate::secret_token::SecretToken;
57
58/// Boxed-future alias so the trait methods stay object-safe on
59/// stable Rust without pulling in `async-trait`.
60type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
61
62/// Async surface mnem-cli / mnem http talk to when speaking to a
63/// remote peer. Every method corresponds to one of the four wire
64/// verbs documented in the module-level docs.
65///
66/// This trait is deliberately narrow: it carries only the bytes the
67/// caller needs and folds all error kinds into [`ClientError`]. The
68/// CAR body on the wire is a `Bytes` blob for now; streaming framing
69/// lands under B3.3 when the CAR framing module is spec-pinned.
70pub trait RemoteClient: Send + Sync {
71    /// `GET /remote/v1/refs` - enumerate the server's current refs
72    /// and the capability set it will negotiate against.
73    fn list_refs(&self) -> BoxFuture<'_, Result<RefsResponse, ClientError>>;
74
75    /// `POST /remote/v1/fetch-blocks` - request a CAR body containing
76    /// every block in `wants` and its transitive graph, minus
77    /// anything in `have_set`.
78    fn fetch_blocks(
79        &self,
80        wants: Vec<Cid>,
81        have_set: BloomHaveSet,
82    ) -> BoxFuture<'_, Result<Bytes, ClientError>>;
83
84    /// `POST /remote/v1/push-blocks` - upload a CAR body. Requires
85    /// the bearer token.
86    fn push_blocks(&self, car_body: Bytes) -> BoxFuture<'_, Result<PushResponse, ClientError>>;
87
88    /// `POST /remote/v1/advance-head` - atomic compare-and-swap on a
89    /// named ref. Requires the bearer token. Returns
90    /// [`ClientError::CasMismatch`] on 409.
91    fn advance_head(
92        &self,
93        old: Cid,
94        new: Cid,
95        ref_name: String,
96    ) -> BoxFuture<'_, Result<(), ClientError>>;
97}
98
99/// `GET /remote/v1/refs` response body.
100///
101/// The on-wire shape matches the server's [`RefsResponse`] in
102/// `crates/mnem-http/src/routes/remote.rs`:
103///
104/// ```json
105/// {
106///   "head": "bafy..." | null,
107///   "refs": { "HEAD": "bafy...", "main": "bafy..." },
108///   "capabilities": ["have-set-bloom", "atomic-push", ...]
109/// }
110/// ```
111///
112/// The client deserialises via a private DTO (`RefsWireBody`) so
113/// invalid CIDs become a `Protocol` error instead of a `Deserialize`
114/// panic, and unknown capability strings are silently dropped via
115/// [`parse_wire_capabilities`].
116#[derive(Clone, Debug, Eq, PartialEq)]
117pub struct RefsResponse {
118    /// Canonical default-branch head, or `None` on a fresh server.
119    /// Mirrored under the `"HEAD"` key of `refs` when present; this
120    /// top-level field exists so clients that only care about the
121    /// default branch can index a single well-known slot.
122    pub head: Option<Cid>,
123    /// Map of ref name -> current head CID, as the server sees it.
124    pub refs: std::collections::BTreeMap<String, Cid>,
125    /// Capabilities the server is willing to speak for this session.
126    /// The client intersects with its own advertisement; the result
127    /// is the agreed set. Any unknown capability strings the server
128    /// advertised are dropped here - see [`parse_wire_capabilities`].
129    pub capabilities: Vec<Capability>,
130}
131
132/// Private on-wire DTO for `GET /remote/v1/refs`. Kept separate from
133/// the public [`RefsResponse`] so we can (a) tolerate unknown
134/// capability strings without failing the deserialise, and (b)
135/// surface invalid CIDs as a `Protocol` error rather than a generic
136/// deserialise failure.
137#[derive(Debug, Deserialize)]
138struct RefsWireBody {
139    #[serde(default)]
140    head: Option<String>,
141    #[serde(default)]
142    refs: std::collections::BTreeMap<String, String>,
143    #[serde(default)]
144    capabilities: Vec<String>,
145}
146
147/// Parse a slice of wire-form capability strings into `Capability`s,
148/// silently dropping any value unknown to this build. This is the
149/// forward-compat policy requires: the server may advertise
150/// capabilities added in a minor release; older clients MUST ignore
151/// them rather than failing the handshake.
152#[must_use]
153pub fn parse_wire_capabilities(raw: &[String]) -> Vec<Capability> {
154    let mut out: Vec<Capability> = raw
155        .iter()
156        .filter_map(|s| s.parse::<Capability>().ok())
157        .collect();
158    // Sort-dedupe for determinism; the server already emits sorted
159    // but nothing in the wire contract guarantees it.
160    out.sort_by_key(Capability::as_wire_str);
161    out.dedup();
162    out
163}
164
165/// `POST /remote/v1/push-blocks` response body. The server echoes
166/// the CID count it accepted; the client cross-checks against the
167/// CAR roots it sent.
168#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
169pub struct PushResponse {
170    /// Number of blocks the server stored from this push.
171    pub accepted: u64,
172    /// The CID of the root block after the push, echoed for the
173    /// caller's optimistic cache invalidation.
174    pub root: Cid,
175}
176
177/// Reference HTTP [`RemoteClient`] implementation. A single instance
178/// is tied to exactly one [`RemoteConfig`] and one base URL.
179#[derive(Debug)]
180pub struct HttpRemoteClient {
181    client: Client,
182    base_url: String,
183    token: Option<SecretToken>,
184    /// Capabilities advertised by the local peer. After a successful
185    /// [`Self::negotiate_capabilities`] this is narrowed to the
186    /// intersection with the server's advertised set.
187    capabilities: CapabilitySet,
188}
189
190impl HttpRemoteClient {
191    /// Build a client from a parsed [`RemoteConfig`]. A fresh
192    /// [`reqwest::Client`] is created per call; callers who want to
193    /// pool connections across remotes should build one client
194    /// explicitly and share it via the future `with_client`
195    /// constructor (deferred, tracked in B3.3).
196    #[must_use]
197    pub fn new(cfg: RemoteConfig) -> Self {
198        let capabilities = if cfg.capabilities.is_empty() {
199            CapabilitySet::all_known()
200        } else {
201            CapabilitySet::with_caps(cfg.capabilities.iter().copied())
202        };
203        Self {
204            client: Client::new(),
205            base_url: cfg.url.trim_end_matches('/').to_owned(),
206            token: cfg.token,
207            capabilities,
208        }
209    }
210
211    /// Negotiate capabilities with the remote by calling `list_refs`
212    /// and intersecting the server's advertised set with the local
213    /// one. After this call, [`Self::capabilities`] returns the
214    /// agreed set that all subsequent verbs must operate under.
215    pub async fn negotiate_capabilities(&mut self) -> Result<(), ClientError> {
216        let refs = self.list_refs_impl().await?;
217        let server_caps = CapabilitySet::with_caps(refs.capabilities.iter().copied());
218        self.capabilities = self.capabilities.intersect(&server_caps);
219        Ok(())
220    }
221
222    /// The agreed-upon capability set. Before
223    /// [`Self::negotiate_capabilities`] has been called this is the
224    /// local advertisement; after, it's the intersection with the
225    /// server.
226    #[must_use]
227    pub const fn capabilities(&self) -> &CapabilitySet {
228        &self.capabilities
229    }
230
231    // -- inherent impls per verb -----------------------------------------
232
233    async fn list_refs_impl(&self) -> Result<RefsResponse, ClientError> {
234        // `list_refs` is read-side: token MUST NOT be attached.
235        let url = format!("{}/remote/v1/refs", self.base_url);
236        let req = self
237            .client
238            .get(&url)
239            .header(PROTOCOL_HEADER, PROTOCOL_VERSION.to_string())
240            .header(CAPABILITIES_HEADER, self.capabilities.serialize());
241        let resp = req.send().await?;
242        let status = resp.status();
243        if status == StatusCode::UNAUTHORIZED || status == StatusCode::FORBIDDEN {
244            return Err(ClientError::Auth(format!(
245                "list_refs rejected with {status}"
246            )));
247        }
248        if !status.is_success() {
249            return Err(ClientError::Protocol(format!(
250                "list_refs: unexpected status {status}"
251            )));
252        }
253        let body = resp.bytes().await?;
254        let wire: RefsWireBody = serde_json::from_slice(&body)?;
255        // Parse the wire strings into strongly-typed values. Invalid
256        // CIDs are a protocol error; unknown capability strings are
257        // dropped forward-compat rules.
258        let head =
259            match wire.head {
260                None => None,
261                Some(ref s) => Some(Cid::parse_str(s).map_err(|e| {
262                    ClientError::Protocol(format!("list_refs: invalid head CID: {e}"))
263                })?),
264            };
265        let mut refs = std::collections::BTreeMap::new();
266        for (name, cid_str) in wire.refs {
267            let cid = Cid::parse_str(&cid_str).map_err(|e| {
268                ClientError::Protocol(format!("list_refs: invalid CID for ref `{name}`: {e}"))
269            })?;
270            refs.insert(name, cid);
271        }
272        let capabilities = parse_wire_capabilities(&wire.capabilities);
273        Ok(RefsResponse {
274            head,
275            refs,
276            capabilities,
277        })
278    }
279
280    /// Build the bearer-auth `Authorization` header value, if a
281    /// token is configured. Returns `None` when the client holds no
282    /// token; callers MUST NOT fall through to an unauthenticated
283    /// push in that case.
284    fn bearer_header(&self) -> Option<String> {
285        self.token
286            .as_ref()
287            .map(|t| format!("Bearer {}", t.expose()))
288    }
289}
290
291impl HttpRemoteClient {
292    /// `POST /remote/v1/fetch-blocks`. Read-side; no bearer.
293    async fn fetch_blocks_impl(
294        &self,
295        wants: Vec<Cid>,
296        have_set: BloomHaveSet,
297    ) -> Result<Bytes, ClientError> {
298        let url = format!("{}/remote/v1/fetch-blocks", self.base_url);
299        let wants_str: Vec<String> = wants.iter().map(Cid::to_string).collect();
300        let body = serde_json::json!({
301            "wants": wants_str,
302            "have_set": have_set.serialize(),
303        });
304        let resp = self
305            .client
306            .post(&url)
307            .header(PROTOCOL_HEADER, PROTOCOL_VERSION.to_string())
308            .header(CAPABILITIES_HEADER, self.capabilities.serialize())
309            .json(&body)
310            .send()
311            .await?;
312        let status = resp.status();
313        if status == StatusCode::UNAUTHORIZED || status == StatusCode::FORBIDDEN {
314            return Err(ClientError::Auth(format!(
315                "fetch_blocks rejected with {status}"
316            )));
317        }
318        if !status.is_success() {
319            return Err(ClientError::Protocol(format!(
320                "fetch_blocks: unexpected status {status}"
321            )));
322        }
323        let bytes = resp.bytes().await?;
324        Ok(bytes)
325    }
326
327    /// `POST /remote/v1/push-blocks`. Bearer-required.
328    async fn push_blocks_impl(&self, car_body: Bytes) -> Result<PushResponse, ClientError> {
329        let url = format!("{}/remote/v1/push-blocks", self.base_url);
330        let auth = self
331            .bearer_header()
332            .ok_or_else(|| ClientError::Auth("push_blocks: no bearer token configured".into()))?;
333        let resp = self
334            .client
335            .post(&url)
336            .header(PROTOCOL_HEADER, PROTOCOL_VERSION.to_string())
337            .header(CAPABILITIES_HEADER, self.capabilities.serialize())
338            .header(reqwest::header::AUTHORIZATION, auth)
339            .header(reqwest::header::CONTENT_TYPE, "application/vnd.ipld.car")
340            .body(car_body)
341            .send()
342            .await?;
343        let status = resp.status();
344        if status == StatusCode::UNAUTHORIZED || status == StatusCode::FORBIDDEN {
345            return Err(ClientError::Auth(format!(
346                "push_blocks rejected with {status}"
347            )));
348        }
349        if !status.is_success() {
350            return Err(ClientError::Protocol(format!(
351                "push_blocks: unexpected status {status}"
352            )));
353        }
354        // Server response carries `{staged, blocks_accepted}`; we
355        // project into `PushResponse { accepted, root }` for the
356        // client shape. `staged` is optional server-side but the
357        // importer rejects empty-root CARs, so in practice it's
358        // always present.
359        #[derive(Deserialize)]
360        struct Wire {
361            staged: Option<String>,
362            blocks_accepted: u64,
363        }
364        let body = resp.bytes().await?;
365        let wire: Wire = serde_json::from_slice(&body)?;
366        let root_str = wire.staged.ok_or_else(|| {
367            ClientError::Protocol("push_blocks: server returned null staged root".into())
368        })?;
369        let root = Cid::parse_str(&root_str).map_err(|e| {
370            ClientError::Protocol(format!("push_blocks: server staged root parse: {e}"))
371        })?;
372        Ok(PushResponse {
373            accepted: wire.blocks_accepted,
374            root,
375        })
376    }
377
378    /// `POST /remote/v1/advance-head`. Bearer-required. Maps 409 to
379    /// [`ClientError::CasMismatch`].
380    async fn advance_head_impl(
381        &self,
382        old: Cid,
383        new: Cid,
384        ref_name: String,
385    ) -> Result<(), ClientError> {
386        let url = format!("{}/remote/v1/advance-head", self.base_url);
387        let auth = self
388            .bearer_header()
389            .ok_or_else(|| ClientError::Auth("advance_head: no bearer token configured".into()))?;
390        let body = serde_json::json!({
391            "old": old.to_string(),
392            "new": new.to_string(),
393            "ref": ref_name,
394        });
395        let resp = self
396            .client
397            .post(&url)
398            .header(PROTOCOL_HEADER, PROTOCOL_VERSION.to_string())
399            .header(CAPABILITIES_HEADER, self.capabilities.serialize())
400            .header(reqwest::header::AUTHORIZATION, auth)
401            .json(&body)
402            .send()
403            .await?;
404        let status = resp.status();
405        if status == StatusCode::UNAUTHORIZED || status == StatusCode::FORBIDDEN {
406            return Err(ClientError::Auth(format!(
407                "advance_head rejected with {status}"
408            )));
409        }
410        if status == StatusCode::CONFLICT {
411            // Server replies with `{current: <cid>}` on CAS
412            // mismatch. If parsing fails we still surface a
413            // mismatch with the client's `old` echoed back, since
414            // that is what the caller needs to retry.
415            #[derive(Deserialize)]
416            struct CurrentBody {
417                current: Option<String>,
418            }
419            let bytes = resp.bytes().await.unwrap_or_default();
420            let actual = serde_json::from_slice::<CurrentBody>(&bytes)
421                .ok()
422                .and_then(|c| c.current)
423                .and_then(|s| Cid::parse_str(&s).ok())
424                .unwrap_or_else(|| old.clone());
425            return Err(ClientError::CasMismatch {
426                ref_name,
427                expected: old,
428                actual,
429            });
430        }
431        if !status.is_success() {
432            return Err(ClientError::Protocol(format!(
433                "advance_head: unexpected status {status}"
434            )));
435        }
436        let _ = new;
437        Ok(())
438    }
439}
440
441impl RemoteClient for HttpRemoteClient {
442    fn list_refs(&self) -> BoxFuture<'_, Result<RefsResponse, ClientError>> {
443        Box::pin(self.list_refs_impl())
444    }
445
446    fn fetch_blocks(
447        &self,
448        wants: Vec<Cid>,
449        have_set: BloomHaveSet,
450    ) -> BoxFuture<'_, Result<Bytes, ClientError>> {
451        Box::pin(self.fetch_blocks_impl(wants, have_set))
452    }
453
454    fn push_blocks(&self, car_body: Bytes) -> BoxFuture<'_, Result<PushResponse, ClientError>> {
455        Box::pin(self.push_blocks_impl(car_body))
456    }
457
458    fn advance_head(
459        &self,
460        old: Cid,
461        new: Cid,
462        ref_name: String,
463    ) -> BoxFuture<'_, Result<(), ClientError>> {
464        Box::pin(self.advance_head_impl(old, new, ref_name))
465    }
466}
467
468#[cfg(test)]
469mod tests {
470    use super::*;
471    use httpmock::prelude::*;
472
473    #[tokio::test]
474    async fn list_refs_omits_authorization_header() {
475        let server = MockServer::start_async().await;
476        let mock = server
477            .mock_async(|when, then| {
478                // Authorization MUST NOT be present on
479                // list_refs: read-side verb, no token.
480                when.method(GET)
481                    .path("/remote/v1/refs")
482                    .header_missing("authorization");
483                then.status(200)
484                    .header("content-type", "application/json")
485                    .body(r#"{"refs":{},"capabilities":["have-set-bloom","atomic-push"]}"#);
486            })
487            .await;
488
489        let cfg = RemoteConfig::new("origin", server.base_url())
490            .with_token(SecretToken::new("unit-test-token"));
491        let client = HttpRemoteClient::new(cfg);
492        let refs = client.list_refs_impl().await.expect("list_refs ok");
493        assert!(refs.capabilities.contains(&Capability::HaveSetBloom));
494        assert!(refs.capabilities.contains(&Capability::AtomicPush));
495        mock.assert_async().await;
496    }
497
498    #[tokio::test]
499    async fn negotiate_capabilities_intersects() {
500        let server = MockServer::start_async().await;
501        let _mock = server
502            .mock_async(|when, then| {
503                when.method(GET).path("/remote/v1/refs");
504                then.status(200)
505                    .header("content-type", "application/json")
506                    // Server knows have-set-bloom + atomic-push.
507                    .body(r#"{"refs":{},"capabilities":["have-set-bloom","atomic-push"]}"#);
508            })
509            .await;
510
511        // Local advertises have-set-bloom + push-negotiate.
512        let cfg = RemoteConfig::new("origin", server.base_url())
513            .with_capability(Capability::HaveSetBloom)
514            .with_capability(Capability::PushNegotiate);
515        let mut client = HttpRemoteClient::new(cfg);
516        client.negotiate_capabilities().await.expect("negotiate ok");
517        // Intersection is {have-set-bloom}. atomic-push was
518        // server-only, push-negotiate was client-only.
519        let agreed = client.capabilities();
520        assert!(agreed.contains(Capability::HaveSetBloom));
521        assert!(!agreed.contains(Capability::AtomicPush));
522        assert!(!agreed.contains(Capability::PushNegotiate));
523    }
524
525    #[test]
526    fn bearer_header_includes_token_when_present() {
527        let cfg = RemoteConfig::new("origin", "https://example.com")
528            .with_token(SecretToken::new("tok-abc"));
529        let client = HttpRemoteClient::new(cfg);
530        assert_eq!(client.bearer_header().as_deref(), Some("Bearer tok-abc"));
531    }
532
533    #[test]
534    fn bearer_header_none_when_no_token() {
535        let cfg = RemoteConfig::new("origin", "https://example.com");
536        let client = HttpRemoteClient::new(cfg);
537        assert!(client.bearer_header().is_none());
538    }
539}