1use std::str::FromStr;
2
3use crate::{
4 api::{route::RouteUrl, v0alpha1, v0alpha2, StatusCode},
5 error::NoosphereError,
6 stream::{from_car_stream, memo_history_stream, put_block_stream, to_car_stream},
7};
8use anyhow::{anyhow, Result};
9use async_stream::try_stream;
10use bytes::Bytes;
11use cid::Cid;
12use iroh_car::CarReader;
13use libipld_cbor::DagCborCodec;
14use noosphere_common::{ConditionalSend, ConditionalSync, UnsharedStream};
15
16use crate::{
17 authority::{generate_capability, Author, SphereAbility, SphereReference},
18 data::{Link, MemoIpld},
19};
20use noosphere_storage::{block_deserialize, block_serialize, BlockStore};
21use reqwest::header::HeaderMap;
22use tokio_stream::{Stream, StreamExt};
23use tokio_util::io::StreamReader;
24use ucan::{
25 builder::UcanBuilder,
26 capability::CapabilityView,
27 crypto::{did::DidParser, KeyMaterial},
28 store::{UcanJwtStore, UcanStore},
29 ucan::Ucan,
30};
31use url::Url;
32
33#[cfg(doc)]
34use crate::data::Did;
35
36use super::v0alpha1::ReplicationMode;
37
/// A client for talking to a Noosphere gateway over HTTP.
///
/// A [`Client`] is obtained from [`Client::identify`], which performs the
/// initial handshake with the gateway and records the gateway's response as
/// the client's `session`.
pub struct Client<K, S>
where
    K: KeyMaterial + Clone + 'static,
    S: UcanStore + BlockStore + 'static,
{
    /// The verified response the gateway returned to the identify request
    /// made when this client was created.
    pub session: v0alpha1::IdentifyResponse,

    /// The identity of the sphere this client acts on behalf of; it is used
    /// to generate the capability claimed by every request.
    pub sphere_identity: String,

    /// The base URL of the gateway API; routes are resolved against it.
    pub api_base: Url,

    /// The author whose key signs bearer tokens and whose authorization is
    /// attached to them as a proof.
    pub author: Author<K>,

    /// The backing store: proof tokens are looked up in it, pushed blocks are
    /// read from it, and blocks returned by a push are written to it.
    pub store: S,

    /// The underlying HTTP client used for requests.
    client: reqwest::Client,
}
65
66impl<K, S> Client<K, S>
67where
68 K: KeyMaterial + Clone + 'static,
69 S: UcanStore + BlockStore + 'static,
70{
71 pub async fn identify(
78 sphere_identity: &str,
79 api_base: &Url,
80 author: &Author<K>,
81 did_parser: &mut DidParser,
82 store: S,
83 ) -> Result<Client<K, S>> {
84 debug!("Initializing Noosphere API client");
85 debug!("Client represents sphere {}", sphere_identity);
86 debug!("Client targetting API at {}", api_base);
87
88 let client = reqwest::Client::new();
89
90 let did_response = {
91 let mut url = api_base.clone();
92 url.set_path(&v0alpha1::Route::Did.to_string());
93 client.get(url).send().await?
94 };
95
96 match translate_status_code(did_response.status())? {
97 StatusCode::OK => (),
98 _ => return Err(anyhow!("Unable to look up gateway identity")),
99 };
100
101 let gateway_identity = did_response.text().await?;
102
103 let mut url = api_base.clone();
104 url.set_path(&v0alpha1::Route::Identify.to_string());
105
106 let (jwt, ucan_headers) = Self::make_bearer_token(
107 &gateway_identity,
108 author,
109 &generate_capability(sphere_identity, SphereAbility::Fetch),
110 &store,
111 )
112 .await?;
113
114 let identify_response: v0alpha1::IdentifyResponse = client
115 .get(url)
116 .bearer_auth(jwt)
117 .headers(ucan_headers)
118 .send()
119 .await?
120 .json()
121 .await?;
122
123 identify_response.verify(did_parser, &store).await?;
124
125 debug!(
126 "Handshake succeeded with gateway {}",
127 identify_response.gateway_identity
128 );
129
130 Ok(Client {
131 session: identify_response,
132 sphere_identity: sphere_identity.into(),
133 api_base: api_base.clone(),
134 author: author.clone(),
135 store,
136 client,
137 })
138 }
139
140 async fn make_bearer_token(
141 gateway_identity: &str,
142 author: &Author<K>,
143 capability: &CapabilityView<SphereReference, SphereAbility>,
144 store: &S,
145 ) -> Result<(String, HeaderMap)> {
146 let mut signable = UcanBuilder::default()
147 .issued_by(&author.key)
148 .for_audience(gateway_identity)
149 .with_lifetime(120)
150 .claiming_capability(capability)
151 .with_nonce()
152 .build()?;
153
154 let mut ucan_headers = HeaderMap::new();
155
156 let authorization = author.require_authorization()?;
157 let authorization_cid = Cid::try_from(authorization)?;
158
159 match authorization.as_ucan(store).await {
160 Ok(ucan) => {
161 if let Some(ucan_proofs) = ucan.proofs() {
162 let mut proofs_to_search: Vec<String> = ucan_proofs.clone();
164
165 debug!("Making bearer token... {:?}", proofs_to_search);
166
167 while let Some(cid_string) = proofs_to_search.pop() {
168 let cid = Cid::from_str(cid_string.as_str())?;
169 let jwt = store.require_token(&cid).await?;
170 let ucan = Ucan::from_str(&jwt)?;
171
172 debug!("Adding UCAN header for {}", cid);
173
174 if let Some(ucan_proofs) = ucan.proofs() {
175 proofs_to_search.extend(ucan_proofs.clone().into_iter());
176 }
177
178 ucan_headers.append("ucan", format!("{cid} {jwt}").parse()?);
179 }
180 }
181
182 ucan_headers.append(
183 "ucan",
184 format!("{} {}", authorization_cid, ucan.encode()?).parse()?,
185 );
186 }
187 _ => {
188 debug!(
189 "Unable to resolve authorization to a UCAN; it will be used as a blind proof"
190 )
191 }
192 };
193
194 signable
196 .proofs
197 .push(Cid::try_from(authorization)?.to_string());
198
199 let jwt = signable.sign().await?.encode()?;
200
201 Ok((jwt, ucan_headers))
207 }
208
209 pub async fn replicate<R>(
224 &self,
225 mode: R,
226 params: Option<&v0alpha1::ReplicateParameters>,
227 ) -> Result<(Cid, impl Stream<Item = Result<(Cid, Vec<u8>)>>)>
228 where
229 R: Into<ReplicationMode>,
230 {
231 let mode: ReplicationMode = mode.into();
232 let url = Url::try_from(RouteUrl(
233 &self.api_base,
234 v0alpha1::Route::Replicate(Some(mode.clone())),
235 params,
236 ))?;
237
238 debug!("Client replicating {} from {}", mode, url);
239
240 let capability = generate_capability(&self.sphere_identity, SphereAbility::Fetch);
241
242 let (token, ucan_headers) = Self::make_bearer_token(
243 &self.session.gateway_identity,
244 &self.author,
245 &capability,
246 &self.store,
247 )
248 .await?;
249
250 let response = self
251 .client
252 .get(url)
253 .bearer_auth(token)
254 .headers(ucan_headers)
255 .send()
256 .await?;
257
258 let reader = CarReader::new(StreamReader::new(response.bytes_stream().map(
259 |item| match item {
260 Ok(item) => Ok(item),
261 Err(error) => {
262 error!("Failed to read CAR stream: {}", error);
263 Err(std::io::Error::from(std::io::ErrorKind::BrokenPipe))
264 }
265 },
266 )))
267 .await?;
268
269 let root = reader.header().roots().first().cloned().ok_or_else(|| {
270 anyhow!(NoosphereError::UnexpectedGatewayResponse(
271 "Missing replication root".into()
272 ))
273 })?;
274
275 Ok((
276 root,
277 reader.stream().map(|block| match block {
278 Ok(block) => Ok(block),
279 Err(error) => Err(anyhow!(NoosphereError::UnexpectedGatewayResponse(format!(
280 "Replication stream ended prematurely: {}",
281 error
282 )))),
283 }),
284 ))
285 }
286
287 pub async fn fetch(
291 &self,
292 params: &v0alpha1::FetchParameters,
293 ) -> Result<Option<(Link<MemoIpld>, impl Stream<Item = Result<(Cid, Vec<u8>)>>)>> {
294 let url = Url::try_from(RouteUrl(
295 &self.api_base,
296 v0alpha1::Route::Fetch,
297 Some(params),
298 ))?;
299
300 debug!("Client fetching blocks from {}", url);
301
302 let capability = generate_capability(&self.sphere_identity, SphereAbility::Fetch);
303 let (token, ucan_headers) = Self::make_bearer_token(
304 &self.session.gateway_identity,
305 &self.author,
306 &capability,
307 &self.store,
308 )
309 .await?;
310
311 let response = self
312 .client
313 .get(url)
314 .bearer_auth(token)
315 .headers(ucan_headers)
316 .send()
317 .await?;
318
319 let reader = CarReader::new(StreamReader::new(response.bytes_stream().map(
320 |item| match item {
321 Ok(item) => Ok(item),
322 Err(error) => {
323 error!("Failed to read CAR stream: {}", error);
324 Err(std::io::Error::from(std::io::ErrorKind::BrokenPipe))
325 }
326 },
327 )))
328 .await?;
329
330 let tip = reader.header().roots().first().cloned();
331
332 if let Some(tip) = tip {
333 Ok(match tip.codec() {
334 0 => None,
336 _ => Some((
337 tip.into(),
338 reader.stream().map(|block| match block {
339 Ok(block) => Ok(block),
340 Err(error) => Err(anyhow!(error)),
341 }),
342 )),
343 })
344 } else {
345 Ok(None)
346 }
347 }
348
    /// Builds the request body for a push as a stream of CAR-encoded bytes.
    ///
    /// The CAR's single root is `push_body.local_tip`. Its blocks are, in
    /// order: the `push_body` itself serialized with [`DagCborCodec`],
    /// followed by every item produced by [`memo_history_stream`] for the
    /// range described by `push_body.local_tip` and `push_body.local_base`,
    /// read from `store`.
    ///
    /// Any error raised while serializing the body or walking the history
    /// ends the block stream with that error.
    fn make_push_request_stream(
        store: S,
        push_body: v0alpha2::PushBody,
    ) -> impl Stream<Item = Result<Bytes, std::io::Error>> + ConditionalSync + 'static {
        // The CAR root is the tip of the history being pushed.
        let root = push_body.local_tip.into();
        trace!("Creating push stream...");

        let block_stream = try_stream! {

            // The history stream is created before `push_body` is handed to
            // `block_serialize` below, while its fields can still be borrowed.
            // NOTE(review): the meaning of the trailing `true` flag is defined
            // by `memo_history_stream` and is not visible here — confirm.
            let history_stream = memo_history_stream(
                store,
                &push_body.local_tip,
                push_body.local_base.as_ref(),
                true
            );

            // The serialized push body is always the first block in the CAR.
            yield block_serialize::<DagCborCodec, _>(push_body)?;

            for await item in history_stream {
                yield item?;
            };
        };

        // NOTE(review): the `UnsharedStream` wrapper is presumably what lets
        // the result satisfy the `ConditionalSync` bound — confirm.
        UnsharedStream::new(Box::pin(to_car_stream(vec![root], block_stream)))
    }
378
379 #[cfg(target_arch = "wasm32")]
380 async fn make_push_request(
381 &self,
382 url: Url,
383 ucan_headers: HeaderMap,
384 token: &str,
385 push_body: &v0alpha2::PushBody,
386 ) -> Result<impl Stream<Item = Result<(Cid, Vec<u8>)>> + ConditionalSend, v0alpha2::PushError>
387 {
388 use gloo_net::http::Headers;
394 use gloo_net::http::Method;
395 use gloo_net::http::RequestBuilder;
396 use js_sys::{JsString, Uint8Array};
397 use wasm_bindgen::JsValue;
398 use wasm_streams::ReadableStream;
399
400 let headers = Headers::new();
401 headers.append("Authorization", &format!("Bearer {}", token));
402
403 for (name, value) in ucan_headers {
404 if let (Some(name), Ok(value)) = (name, value.to_str()) {
405 headers.append(name.as_str(), value);
406 }
407 }
408
409 let stream = Self::make_push_request_stream(self.store.clone(), push_body.clone());
410
411 let readable_stream = ReadableStream::from_stream(stream.map(|result| match result {
412 Ok(bytes) => Ok(JsValue::from(Uint8Array::from(bytes.as_ref()))),
413 Err(error) => Err(JsValue::from(JsString::from(error.to_string()))),
414 }));
415
416 let request = RequestBuilder::new(url.as_str())
417 .method(Method::PUT)
418 .headers(headers)
419 .body(JsValue::from(readable_stream.as_raw()))
420 .map_err(|error| v0alpha2::PushError::Internal(Some(error.to_string())))?;
421
422 let response = request.send().await.map_err(|error| {
423 warn!("Push request failed: {}", error);
424 v0alpha2::PushError::BrokenUpstream
425 })?;
426
427 let body_stream = response
428 .body()
429 .ok_or_else(|| v0alpha2::PushError::UnexpectedBody)?;
430 let body_stream = ReadableStream::from_raw(wasm_bindgen::JsCast::unchecked_into::<
431 wasm_streams::readable::sys::ReadableStream,
432 >(JsValue::from(body_stream)));
433
434 let car_stream = body_stream.into_stream().map(|result| match result {
435 Ok(value) => match Uint8Array::try_from(value) {
436 Ok(array) => Ok(Bytes::from(array.to_vec())),
437 Err(_) => Err(std::io::Error::new(
438 std::io::ErrorKind::Other,
439 v0alpha2::PushError::UnexpectedBody,
440 )),
441 },
442 Err(error) => Err(std::io::Error::new(
443 std::io::ErrorKind::Other,
444 error.as_string().unwrap_or_default(),
445 )),
446 });
447
448 Ok(from_car_stream(car_stream))
449 }
450
451 #[cfg(not(target_arch = "wasm32"))]
452 async fn make_push_request(
453 &self,
454 url: Url,
455 ucan_headers: HeaderMap,
456 token: &str,
457 push_body: &v0alpha2::PushBody,
458 ) -> Result<impl Stream<Item = Result<(Cid, Vec<u8>)>> + ConditionalSend, v0alpha2::PushError>
459 {
460 use reqwest::Body;
461
462 let stream = Self::make_push_request_stream(self.store.clone(), push_body.clone());
463
464 let response = self
465 .client
466 .put(url)
467 .bearer_auth(token)
468 .headers(ucan_headers)
469 .header("Content-Type", "application/octet-stream")
470 .body(Body::wrap_stream(stream))
471 .send()
472 .await
473 .map_err(|error| {
474 warn!("Push request failed: {}", error);
475 v0alpha2::PushError::BrokenUpstream
476 })?;
477
478 let status = translate_status_code(response.status())?;
479 trace!("Checking response ({})...", status);
480
481 if status == StatusCode::CONFLICT {
482 return Err(v0alpha2::PushError::Conflict);
483 }
484
485 trace!("Fielding response...");
486
487 Ok(from_car_stream(response.bytes_stream()))
488 }
489
490 pub async fn push(
492 &self,
493 push_body: &v0alpha2::PushBody,
494 ) -> Result<v0alpha2::PushResponse, v0alpha2::PushError> {
495 let url = Url::try_from(RouteUrl::<_, ()>(
496 &self.api_base,
497 v0alpha2::Route::Push,
498 None,
499 ))?;
500 debug!(
501 "Client pushing changes for sphere {} to {}",
502 push_body.sphere, url
503 );
504 let capability = generate_capability(&self.sphere_identity, SphereAbility::Push);
505 let (token, ucan_headers) = Self::make_bearer_token(
506 &self.session.gateway_identity,
507 &self.author,
508 &capability,
509 &self.store,
510 )
511 .await?;
512
513 let block_stream = self
514 .make_push_request(url, ucan_headers, &token, push_body)
515 .await?;
516
517 tokio::pin!(block_stream);
518
519 let push_response = match block_stream.try_next().await? {
520 Some((_, bytes)) => block_deserialize::<DagCborCodec, v0alpha2::PushResponse>(&bytes)?,
521 _ => return Err(v0alpha2::PushError::UnexpectedBody),
522 };
523
524 put_block_stream(self.store.clone(), block_stream)
525 .await
526 .map_err(|error| {
527 warn!("Failed to store blocks from gateway: {}", error);
528 v0alpha2::PushError::BrokenDownstream
529 })?;
530
531 Ok(push_response)
532 }
533}
534
535fn translate_status_code(reqwest_code: reqwest::StatusCode) -> Result<StatusCode> {
550 let code: u16 = reqwest_code.into();
551 Ok(code.try_into()?)
552}