1#![cfg(feature = "client")]
36#![allow(
37 clippy::missing_errors_doc,
38 clippy::module_name_repetitions,
39 clippy::too_long_first_doc_paragraph
40)]
41
42use std::future::Future;
43use std::pin::Pin;
44
45use bytes::Bytes;
46use mnem_core::id::Cid;
47use reqwest::{Client, StatusCode};
48use serde::{Deserialize, Serialize};
49
50use crate::error::ClientError;
51use crate::have_set::{BloomHaveSet, HaveSet};
52use crate::protocol::{
53 CAPABILITIES_HEADER, Capability, CapabilitySet, PROTOCOL_HEADER, PROTOCOL_VERSION,
54};
55use crate::remote::RemoteConfig;
56use crate::secret_token::SecretToken;
57
58type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
61
62pub trait RemoteClient: Send + Sync {
71 fn list_refs(&self) -> BoxFuture<'_, Result<RefsResponse, ClientError>>;
74
75 fn fetch_blocks(
79 &self,
80 wants: Vec<Cid>,
81 have_set: BloomHaveSet,
82 ) -> BoxFuture<'_, Result<Bytes, ClientError>>;
83
84 fn push_blocks(&self, car_body: Bytes) -> BoxFuture<'_, Result<PushResponse, ClientError>>;
87
88 fn advance_head(
92 &self,
93 old: Cid,
94 new: Cid,
95 ref_name: String,
96 ) -> BoxFuture<'_, Result<(), ClientError>>;
97}
98
99#[derive(Clone, Debug, Eq, PartialEq)]
117pub struct RefsResponse {
118 pub head: Option<Cid>,
123 pub refs: std::collections::BTreeMap<String, Cid>,
125 pub capabilities: Vec<Capability>,
130}
131
132#[derive(Debug, Deserialize)]
138struct RefsWireBody {
139 #[serde(default)]
140 head: Option<String>,
141 #[serde(default)]
142 refs: std::collections::BTreeMap<String, String>,
143 #[serde(default)]
144 capabilities: Vec<String>,
145}
146
147#[must_use]
153pub fn parse_wire_capabilities(raw: &[String]) -> Vec<Capability> {
154 let mut out: Vec<Capability> = raw
155 .iter()
156 .filter_map(|s| s.parse::<Capability>().ok())
157 .collect();
158 out.sort_by_key(Capability::as_wire_str);
161 out.dedup();
162 out
163}
164
165#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
169pub struct PushResponse {
170 pub accepted: u64,
172 pub root: Cid,
175}
176
177#[derive(Debug)]
180pub struct HttpRemoteClient {
181 client: Client,
182 base_url: String,
183 token: Option<SecretToken>,
184 capabilities: CapabilitySet,
188}
189
190impl HttpRemoteClient {
191 #[must_use]
197 pub fn new(cfg: RemoteConfig) -> Self {
198 let capabilities = if cfg.capabilities.is_empty() {
199 CapabilitySet::all_known()
200 } else {
201 CapabilitySet::with_caps(cfg.capabilities.iter().copied())
202 };
203 Self {
204 client: Client::new(),
205 base_url: cfg.url.trim_end_matches('/').to_owned(),
206 token: cfg.token,
207 capabilities,
208 }
209 }
210
211 pub async fn negotiate_capabilities(&mut self) -> Result<(), ClientError> {
216 let refs = self.list_refs_impl().await?;
217 let server_caps = CapabilitySet::with_caps(refs.capabilities.iter().copied());
218 self.capabilities = self.capabilities.intersect(&server_caps);
219 Ok(())
220 }
221
222 #[must_use]
227 pub const fn capabilities(&self) -> &CapabilitySet {
228 &self.capabilities
229 }
230
231 async fn list_refs_impl(&self) -> Result<RefsResponse, ClientError> {
234 let url = format!("{}/remote/v1/refs", self.base_url);
236 let req = self
237 .client
238 .get(&url)
239 .header(PROTOCOL_HEADER, PROTOCOL_VERSION.to_string())
240 .header(CAPABILITIES_HEADER, self.capabilities.serialize());
241 let resp = req.send().await?;
242 let status = resp.status();
243 if status == StatusCode::UNAUTHORIZED || status == StatusCode::FORBIDDEN {
244 return Err(ClientError::Auth(format!(
245 "list_refs rejected with {status}"
246 )));
247 }
248 if !status.is_success() {
249 return Err(ClientError::Protocol(format!(
250 "list_refs: unexpected status {status}"
251 )));
252 }
253 let body = resp.bytes().await?;
254 let wire: RefsWireBody = serde_json::from_slice(&body)?;
255 let head =
259 match wire.head {
260 None => None,
261 Some(ref s) => Some(Cid::parse_str(s).map_err(|e| {
262 ClientError::Protocol(format!("list_refs: invalid head CID: {e}"))
263 })?),
264 };
265 let mut refs = std::collections::BTreeMap::new();
266 for (name, cid_str) in wire.refs {
267 let cid = Cid::parse_str(&cid_str).map_err(|e| {
268 ClientError::Protocol(format!("list_refs: invalid CID for ref `{name}`: {e}"))
269 })?;
270 refs.insert(name, cid);
271 }
272 let capabilities = parse_wire_capabilities(&wire.capabilities);
273 Ok(RefsResponse {
274 head,
275 refs,
276 capabilities,
277 })
278 }
279
280 fn bearer_header(&self) -> Option<String> {
285 self.token
286 .as_ref()
287 .map(|t| format!("Bearer {}", t.expose()))
288 }
289}
290
291impl HttpRemoteClient {
292 async fn fetch_blocks_impl(
294 &self,
295 wants: Vec<Cid>,
296 have_set: BloomHaveSet,
297 ) -> Result<Bytes, ClientError> {
298 let url = format!("{}/remote/v1/fetch-blocks", self.base_url);
299 let wants_str: Vec<String> = wants.iter().map(Cid::to_string).collect();
300 let body = serde_json::json!({
301 "wants": wants_str,
302 "have_set": have_set.serialize(),
303 });
304 let resp = self
305 .client
306 .post(&url)
307 .header(PROTOCOL_HEADER, PROTOCOL_VERSION.to_string())
308 .header(CAPABILITIES_HEADER, self.capabilities.serialize())
309 .json(&body)
310 .send()
311 .await?;
312 let status = resp.status();
313 if status == StatusCode::UNAUTHORIZED || status == StatusCode::FORBIDDEN {
314 return Err(ClientError::Auth(format!(
315 "fetch_blocks rejected with {status}"
316 )));
317 }
318 if !status.is_success() {
319 return Err(ClientError::Protocol(format!(
320 "fetch_blocks: unexpected status {status}"
321 )));
322 }
323 let bytes = resp.bytes().await?;
324 Ok(bytes)
325 }
326
327 async fn push_blocks_impl(&self, car_body: Bytes) -> Result<PushResponse, ClientError> {
329 let url = format!("{}/remote/v1/push-blocks", self.base_url);
330 let auth = self
331 .bearer_header()
332 .ok_or_else(|| ClientError::Auth("push_blocks: no bearer token configured".into()))?;
333 let resp = self
334 .client
335 .post(&url)
336 .header(PROTOCOL_HEADER, PROTOCOL_VERSION.to_string())
337 .header(CAPABILITIES_HEADER, self.capabilities.serialize())
338 .header(reqwest::header::AUTHORIZATION, auth)
339 .header(reqwest::header::CONTENT_TYPE, "application/vnd.ipld.car")
340 .body(car_body)
341 .send()
342 .await?;
343 let status = resp.status();
344 if status == StatusCode::UNAUTHORIZED || status == StatusCode::FORBIDDEN {
345 return Err(ClientError::Auth(format!(
346 "push_blocks rejected with {status}"
347 )));
348 }
349 if !status.is_success() {
350 return Err(ClientError::Protocol(format!(
351 "push_blocks: unexpected status {status}"
352 )));
353 }
354 #[derive(Deserialize)]
360 struct Wire {
361 staged: Option<String>,
362 blocks_accepted: u64,
363 }
364 let body = resp.bytes().await?;
365 let wire: Wire = serde_json::from_slice(&body)?;
366 let root_str = wire.staged.ok_or_else(|| {
367 ClientError::Protocol("push_blocks: server returned null staged root".into())
368 })?;
369 let root = Cid::parse_str(&root_str).map_err(|e| {
370 ClientError::Protocol(format!("push_blocks: server staged root parse: {e}"))
371 })?;
372 Ok(PushResponse {
373 accepted: wire.blocks_accepted,
374 root,
375 })
376 }
377
378 async fn advance_head_impl(
381 &self,
382 old: Cid,
383 new: Cid,
384 ref_name: String,
385 ) -> Result<(), ClientError> {
386 let url = format!("{}/remote/v1/advance-head", self.base_url);
387 let auth = self
388 .bearer_header()
389 .ok_or_else(|| ClientError::Auth("advance_head: no bearer token configured".into()))?;
390 let body = serde_json::json!({
391 "old": old.to_string(),
392 "new": new.to_string(),
393 "ref": ref_name,
394 });
395 let resp = self
396 .client
397 .post(&url)
398 .header(PROTOCOL_HEADER, PROTOCOL_VERSION.to_string())
399 .header(CAPABILITIES_HEADER, self.capabilities.serialize())
400 .header(reqwest::header::AUTHORIZATION, auth)
401 .json(&body)
402 .send()
403 .await?;
404 let status = resp.status();
405 if status == StatusCode::UNAUTHORIZED || status == StatusCode::FORBIDDEN {
406 return Err(ClientError::Auth(format!(
407 "advance_head rejected with {status}"
408 )));
409 }
410 if status == StatusCode::CONFLICT {
411 #[derive(Deserialize)]
416 struct CurrentBody {
417 current: Option<String>,
418 }
419 let bytes = resp.bytes().await.unwrap_or_default();
420 let actual = serde_json::from_slice::<CurrentBody>(&bytes)
421 .ok()
422 .and_then(|c| c.current)
423 .and_then(|s| Cid::parse_str(&s).ok())
424 .unwrap_or_else(|| old.clone());
425 return Err(ClientError::CasMismatch {
426 ref_name,
427 expected: old,
428 actual,
429 });
430 }
431 if !status.is_success() {
432 return Err(ClientError::Protocol(format!(
433 "advance_head: unexpected status {status}"
434 )));
435 }
436 let _ = new;
437 Ok(())
438 }
439}
440
441impl RemoteClient for HttpRemoteClient {
442 fn list_refs(&self) -> BoxFuture<'_, Result<RefsResponse, ClientError>> {
443 Box::pin(self.list_refs_impl())
444 }
445
446 fn fetch_blocks(
447 &self,
448 wants: Vec<Cid>,
449 have_set: BloomHaveSet,
450 ) -> BoxFuture<'_, Result<Bytes, ClientError>> {
451 Box::pin(self.fetch_blocks_impl(wants, have_set))
452 }
453
454 fn push_blocks(&self, car_body: Bytes) -> BoxFuture<'_, Result<PushResponse, ClientError>> {
455 Box::pin(self.push_blocks_impl(car_body))
456 }
457
458 fn advance_head(
459 &self,
460 old: Cid,
461 new: Cid,
462 ref_name: String,
463 ) -> BoxFuture<'_, Result<(), ClientError>> {
464 Box::pin(self.advance_head_impl(old, new, ref_name))
465 }
466}
467
468#[cfg(test)]
469mod tests {
470 use super::*;
471 use httpmock::prelude::*;
472
473 #[tokio::test]
474 async fn list_refs_omits_authorization_header() {
475 let server = MockServer::start_async().await;
476 let mock = server
477 .mock_async(|when, then| {
478 when.method(GET)
481 .path("/remote/v1/refs")
482 .header_missing("authorization");
483 then.status(200)
484 .header("content-type", "application/json")
485 .body(r#"{"refs":{},"capabilities":["have-set-bloom","atomic-push"]}"#);
486 })
487 .await;
488
489 let cfg = RemoteConfig::new("origin", server.base_url())
490 .with_token(SecretToken::new("unit-test-token"));
491 let client = HttpRemoteClient::new(cfg);
492 let refs = client.list_refs_impl().await.expect("list_refs ok");
493 assert!(refs.capabilities.contains(&Capability::HaveSetBloom));
494 assert!(refs.capabilities.contains(&Capability::AtomicPush));
495 mock.assert_async().await;
496 }
497
498 #[tokio::test]
499 async fn negotiate_capabilities_intersects() {
500 let server = MockServer::start_async().await;
501 let _mock = server
502 .mock_async(|when, then| {
503 when.method(GET).path("/remote/v1/refs");
504 then.status(200)
505 .header("content-type", "application/json")
506 .body(r#"{"refs":{},"capabilities":["have-set-bloom","atomic-push"]}"#);
508 })
509 .await;
510
511 let cfg = RemoteConfig::new("origin", server.base_url())
513 .with_capability(Capability::HaveSetBloom)
514 .with_capability(Capability::PushNegotiate);
515 let mut client = HttpRemoteClient::new(cfg);
516 client.negotiate_capabilities().await.expect("negotiate ok");
517 let agreed = client.capabilities();
520 assert!(agreed.contains(Capability::HaveSetBloom));
521 assert!(!agreed.contains(Capability::AtomicPush));
522 assert!(!agreed.contains(Capability::PushNegotiate));
523 }
524
525 #[test]
526 fn bearer_header_includes_token_when_present() {
527 let cfg = RemoteConfig::new("origin", "https://example.com")
528 .with_token(SecretToken::new("tok-abc"));
529 let client = HttpRemoteClient::new(cfg);
530 assert_eq!(client.bearer_header().as_deref(), Some("Bearer tok-abc"));
531 }
532
533 #[test]
534 fn bearer_header_none_when_no_token() {
535 let cfg = RemoteConfig::new("origin", "https://example.com");
536 let client = HttpRemoteClient::new(cfg);
537 assert!(client.bearer_header().is_none());
538 }
539}