noosphere_core/api/
client.rs

1use std::str::FromStr;
2
3use crate::{
4    api::{route::RouteUrl, v0alpha1, v0alpha2, StatusCode},
5    error::NoosphereError,
6    stream::{from_car_stream, memo_history_stream, put_block_stream, to_car_stream},
7};
8use anyhow::{anyhow, Result};
9use async_stream::try_stream;
10use bytes::Bytes;
11use cid::Cid;
12use iroh_car::CarReader;
13use libipld_cbor::DagCborCodec;
14use noosphere_common::{ConditionalSend, ConditionalSync, UnsharedStream};
15
16use crate::{
17    authority::{generate_capability, Author, SphereAbility, SphereReference},
18    data::{Link, MemoIpld},
19};
20use noosphere_storage::{block_deserialize, block_serialize, BlockStore};
21use reqwest::header::HeaderMap;
22use tokio_stream::{Stream, StreamExt};
23use tokio_util::io::StreamReader;
24use ucan::{
25    builder::UcanBuilder,
26    capability::CapabilityView,
27    crypto::{did::DidParser, KeyMaterial},
28    store::{UcanJwtStore, UcanStore},
29    ucan::Ucan,
30};
31use url::Url;
32
33#[cfg(doc)]
34use crate::data::Did;
35
36use super::v0alpha1::ReplicationMode;
37
38/// A [Client] is a simple, portable HTTP client for the Noosphere gateway REST
39/// API. It embodies the intended usage of the REST API, which includes an
40/// opening handshake (with associated key verification) and various
41/// UCAN-authorized verbs over sphere data.
42pub struct Client<K, S>
43where
44    K: KeyMaterial + Clone + 'static,
45    S: UcanStore + BlockStore + 'static,
46{
47    /// The [v0alpha1::IdentifyResponse] that was received from the gateway when
48    /// the [Client] was initialized
49    pub session: v0alpha1::IdentifyResponse,
50
51    /// The [Did] of the sphere represented by this [Client]
52    pub sphere_identity: String,
53
54    /// The [Url] for the gateway API being used by this [Client]
55    pub api_base: Url,
56
57    /// The [Author] that is wielding this [Client] to interact with the gateway API
58    pub author: Author<K>,
59
60    /// The backing [BlockStore] (also used as a [UcanStore]) for this [Client]
61    pub store: S,
62
63    client: reqwest::Client,
64}
65
66impl<K, S> Client<K, S>
67where
68    K: KeyMaterial + Clone + 'static,
69    S: UcanStore + BlockStore + 'static,
70{
71    /// Initialize the [Client] by perfoming an "identification" handshake with
72    /// a gateway whose API presumably lives at the specified URL. The request
73    /// is authorized (so the provided [Author] must have the appropriate
74    /// credentials), and the gateway responds with a
75    /// [v0alpha1::IdentifyResponse] to verify its own credentials for the
76    /// client.
77    pub async fn identify(
78        sphere_identity: &str,
79        api_base: &Url,
80        author: &Author<K>,
81        did_parser: &mut DidParser,
82        store: S,
83    ) -> Result<Client<K, S>> {
84        debug!("Initializing Noosphere API client");
85        debug!("Client represents sphere {}", sphere_identity);
86        debug!("Client targetting API at {}", api_base);
87
88        let client = reqwest::Client::new();
89
90        let did_response = {
91            let mut url = api_base.clone();
92            url.set_path(&v0alpha1::Route::Did.to_string());
93            client.get(url).send().await?
94        };
95
96        match translate_status_code(did_response.status())? {
97            StatusCode::OK => (),
98            _ => return Err(anyhow!("Unable to look up gateway identity")),
99        };
100
101        let gateway_identity = did_response.text().await?;
102
103        let mut url = api_base.clone();
104        url.set_path(&v0alpha1::Route::Identify.to_string());
105
106        let (jwt, ucan_headers) = Self::make_bearer_token(
107            &gateway_identity,
108            author,
109            &generate_capability(sphere_identity, SphereAbility::Fetch),
110            &store,
111        )
112        .await?;
113
114        let identify_response: v0alpha1::IdentifyResponse = client
115            .get(url)
116            .bearer_auth(jwt)
117            .headers(ucan_headers)
118            .send()
119            .await?
120            .json()
121            .await?;
122
123        identify_response.verify(did_parser, &store).await?;
124
125        debug!(
126            "Handshake succeeded with gateway {}",
127            identify_response.gateway_identity
128        );
129
130        Ok(Client {
131            session: identify_response,
132            sphere_identity: sphere_identity.into(),
133            api_base: api_base.clone(),
134            author: author.clone(),
135            store,
136            client,
137        })
138    }
139
140    async fn make_bearer_token(
141        gateway_identity: &str,
142        author: &Author<K>,
143        capability: &CapabilityView<SphereReference, SphereAbility>,
144        store: &S,
145    ) -> Result<(String, HeaderMap)> {
146        let mut signable = UcanBuilder::default()
147            .issued_by(&author.key)
148            .for_audience(gateway_identity)
149            .with_lifetime(120)
150            .claiming_capability(capability)
151            .with_nonce()
152            .build()?;
153
154        let mut ucan_headers = HeaderMap::new();
155
156        let authorization = author.require_authorization()?;
157        let authorization_cid = Cid::try_from(authorization)?;
158
159        match authorization.as_ucan(store).await {
160            Ok(ucan) => {
161                if let Some(ucan_proofs) = ucan.proofs() {
162                    // TODO(ucan-wg/rs-ucan#37): We should integrate a helper for this kind of stuff into rs-ucan
163                    let mut proofs_to_search: Vec<String> = ucan_proofs.clone();
164
165                    debug!("Making bearer token... {:?}", proofs_to_search);
166
167                    while let Some(cid_string) = proofs_to_search.pop() {
168                        let cid = Cid::from_str(cid_string.as_str())?;
169                        let jwt = store.require_token(&cid).await?;
170                        let ucan = Ucan::from_str(&jwt)?;
171
172                        debug!("Adding UCAN header for {}", cid);
173
174                        if let Some(ucan_proofs) = ucan.proofs() {
175                            proofs_to_search.extend(ucan_proofs.clone().into_iter());
176                        }
177
178                        ucan_headers.append("ucan", format!("{cid} {jwt}").parse()?);
179                    }
180                }
181
182                ucan_headers.append(
183                    "ucan",
184                    format!("{} {}", authorization_cid, ucan.encode()?).parse()?,
185                );
186            }
187            _ => {
188                debug!(
189                    "Unable to resolve authorization to a UCAN; it will be used as a blind proof"
190                )
191            }
192        };
193
194        // TODO(ucan-wg/rs-ucan#32): This is kind of a hack until we can add proofs by CID
195        signable
196            .proofs
197            .push(Cid::try_from(authorization)?.to_string());
198
199        let jwt = signable.sign().await?.encode()?;
200
201        // TODO: It is inefficient to send the same UCANs with every request,
202        // we should probably establish a conventional flow for syncing UCANs
203        // this way only once when pairing a gateway. For now, this is about the
204        // same efficiency as what we had before when UCANs were all inlined to
205        // a single token.
206        Ok((jwt, ucan_headers))
207    }
208
209    /// Replicate content from Noosphere, streaming its blocks from the
210    /// configured gateway.
211    ///
212    /// If [v0alpha1::ReplicateParameters] are specified, then the replication
213    /// will represent incremental history going back to the `since` version.
214    ///
215    /// Otherwise, the full [crate::data::SphereIpld] will be replicated
216    /// (excluding any history).
217    ///
218    /// If the gateway doesn't have the desired content, it will look it up from
219    /// other sources such as IPFS if they are available. Note that this means
220    /// this call can potentially block on upstream access to an IPFS node
221    /// (which, depending on the node's network configuration and peering
222    /// status, can be quite slow).
223    pub async fn replicate<R>(
224        &self,
225        mode: R,
226        params: Option<&v0alpha1::ReplicateParameters>,
227    ) -> Result<(Cid, impl Stream<Item = Result<(Cid, Vec<u8>)>>)>
228    where
229        R: Into<ReplicationMode>,
230    {
231        let mode: ReplicationMode = mode.into();
232        let url = Url::try_from(RouteUrl(
233            &self.api_base,
234            v0alpha1::Route::Replicate(Some(mode.clone())),
235            params,
236        ))?;
237
238        debug!("Client replicating {} from {}", mode, url);
239
240        let capability = generate_capability(&self.sphere_identity, SphereAbility::Fetch);
241
242        let (token, ucan_headers) = Self::make_bearer_token(
243            &self.session.gateway_identity,
244            &self.author,
245            &capability,
246            &self.store,
247        )
248        .await?;
249
250        let response = self
251            .client
252            .get(url)
253            .bearer_auth(token)
254            .headers(ucan_headers)
255            .send()
256            .await?;
257
258        let reader = CarReader::new(StreamReader::new(response.bytes_stream().map(
259            |item| match item {
260                Ok(item) => Ok(item),
261                Err(error) => {
262                    error!("Failed to read CAR stream: {}", error);
263                    Err(std::io::Error::from(std::io::ErrorKind::BrokenPipe))
264                }
265            },
266        )))
267        .await?;
268
269        let root = reader.header().roots().first().cloned().ok_or_else(|| {
270            anyhow!(NoosphereError::UnexpectedGatewayResponse(
271                "Missing replication root".into()
272            ))
273        })?;
274
275        Ok((
276            root,
277            reader.stream().map(|block| match block {
278                Ok(block) => Ok(block),
279                Err(error) => Err(anyhow!(NoosphereError::UnexpectedGatewayResponse(format!(
280                    "Replication stream ended prematurely: {}",
281                    error
282                )))),
283            }),
284        ))
285    }
286
287    /// Fetch the latest, canonical history of the client's sphere from the
288    /// gateway, which serves as the aggregation point for history across many
289    /// clients.
290    pub async fn fetch(
291        &self,
292        params: &v0alpha1::FetchParameters,
293    ) -> Result<Option<(Link<MemoIpld>, impl Stream<Item = Result<(Cid, Vec<u8>)>>)>> {
294        let url = Url::try_from(RouteUrl(
295            &self.api_base,
296            v0alpha1::Route::Fetch,
297            Some(params),
298        ))?;
299
300        debug!("Client fetching blocks from {}", url);
301
302        let capability = generate_capability(&self.sphere_identity, SphereAbility::Fetch);
303        let (token, ucan_headers) = Self::make_bearer_token(
304            &self.session.gateway_identity,
305            &self.author,
306            &capability,
307            &self.store,
308        )
309        .await?;
310
311        let response = self
312            .client
313            .get(url)
314            .bearer_auth(token)
315            .headers(ucan_headers)
316            .send()
317            .await?;
318
319        let reader = CarReader::new(StreamReader::new(response.bytes_stream().map(
320            |item| match item {
321                Ok(item) => Ok(item),
322                Err(error) => {
323                    error!("Failed to read CAR stream: {}", error);
324                    Err(std::io::Error::from(std::io::ErrorKind::BrokenPipe))
325                }
326            },
327        )))
328        .await?;
329
330        let tip = reader.header().roots().first().cloned();
331
332        if let Some(tip) = tip {
333            Ok(match tip.codec() {
334                // Identity codec = no changes
335                0 => None,
336                _ => Some((
337                    tip.into(),
338                    reader.stream().map(|block| match block {
339                        Ok(block) => Ok(block),
340                        Err(error) => Err(anyhow!(error)),
341                    }),
342                )),
343            })
344        } else {
345            Ok(None)
346        }
347    }
348
349    fn make_push_request_stream(
350        store: S,
351        push_body: v0alpha2::PushBody,
352    ) -> impl Stream<Item = Result<Bytes, std::io::Error>> + ConditionalSync + 'static {
353        let root = push_body.local_tip.into();
354        trace!("Creating push stream...");
355
356        let block_stream = try_stream! {
357
358            let history_stream = memo_history_stream(
359                store,
360                &push_body.local_tip,
361                push_body.local_base.as_ref(),
362                true
363            );
364
365            yield block_serialize::<DagCborCodec, _>(push_body)?;
366
367            for await item in history_stream {
368                yield item?;
369            };
370        };
371
372        // Safety: this stream is not shared by us, or by its consumer (reqwest
373        // on native targets, gloo-net on web) to others; the [Unshared] is required
374        // in order for the wrapped [Stream] to satisfy a `Sync` bound.
375        // See: https://github.com/seanmonstar/reqwest/issues/1969
376        UnsharedStream::new(Box::pin(to_car_stream(vec![root], block_stream)))
377    }
378
379    #[cfg(target_arch = "wasm32")]
380    async fn make_push_request(
381        &self,
382        url: Url,
383        ucan_headers: HeaderMap,
384        token: &str,
385        push_body: &v0alpha2::PushBody,
386    ) -> Result<impl Stream<Item = Result<(Cid, Vec<u8>)>> + ConditionalSend, v0alpha2::PushError>
387    {
388        // Implementation note: currently reqwest does not support streaming
389        // request bodies under wasm32 targets even though it is technically
390        // feasiable via [ReadableStream]. So, we jury rig a one-off streaming
391        // request here using wasm-bindgen and wasm-streams:
392
393        use gloo_net::http::Headers;
394        use gloo_net::http::Method;
395        use gloo_net::http::RequestBuilder;
396        use js_sys::{JsString, Uint8Array};
397        use wasm_bindgen::JsValue;
398        use wasm_streams::ReadableStream;
399
400        let headers = Headers::new();
401        headers.append("Authorization", &format!("Bearer {}", token));
402
403        for (name, value) in ucan_headers {
404            if let (Some(name), Ok(value)) = (name, value.to_str()) {
405                headers.append(name.as_str(), value);
406            }
407        }
408
409        let stream = Self::make_push_request_stream(self.store.clone(), push_body.clone());
410
411        let readable_stream = ReadableStream::from_stream(stream.map(|result| match result {
412            Ok(bytes) => Ok(JsValue::from(Uint8Array::from(bytes.as_ref()))),
413            Err(error) => Err(JsValue::from(JsString::from(error.to_string()))),
414        }));
415
416        let request = RequestBuilder::new(url.as_str())
417            .method(Method::PUT)
418            .headers(headers)
419            .body(JsValue::from(readable_stream.as_raw()))
420            .map_err(|error| v0alpha2::PushError::Internal(Some(error.to_string())))?;
421
422        let response = request.send().await.map_err(|error| {
423            warn!("Push request failed: {}", error);
424            v0alpha2::PushError::BrokenUpstream
425        })?;
426
427        let body_stream = response
428            .body()
429            .ok_or_else(|| v0alpha2::PushError::UnexpectedBody)?;
430        let body_stream = ReadableStream::from_raw(wasm_bindgen::JsCast::unchecked_into::<
431            wasm_streams::readable::sys::ReadableStream,
432        >(JsValue::from(body_stream)));
433
434        let car_stream = body_stream.into_stream().map(|result| match result {
435            Ok(value) => match Uint8Array::try_from(value) {
436                Ok(array) => Ok(Bytes::from(array.to_vec())),
437                Err(_) => Err(std::io::Error::new(
438                    std::io::ErrorKind::Other,
439                    v0alpha2::PushError::UnexpectedBody,
440                )),
441            },
442            Err(error) => Err(std::io::Error::new(
443                std::io::ErrorKind::Other,
444                error.as_string().unwrap_or_default(),
445            )),
446        });
447
448        Ok(from_car_stream(car_stream))
449    }
450
451    #[cfg(not(target_arch = "wasm32"))]
452    async fn make_push_request(
453        &self,
454        url: Url,
455        ucan_headers: HeaderMap,
456        token: &str,
457        push_body: &v0alpha2::PushBody,
458    ) -> Result<impl Stream<Item = Result<(Cid, Vec<u8>)>> + ConditionalSend, v0alpha2::PushError>
459    {
460        use reqwest::Body;
461
462        let stream = Self::make_push_request_stream(self.store.clone(), push_body.clone());
463
464        let response = self
465            .client
466            .put(url)
467            .bearer_auth(token)
468            .headers(ucan_headers)
469            .header("Content-Type", "application/octet-stream")
470            .body(Body::wrap_stream(stream))
471            .send()
472            .await
473            .map_err(|error| {
474                warn!("Push request failed: {}", error);
475                v0alpha2::PushError::BrokenUpstream
476            })?;
477
478        let status = translate_status_code(response.status())?;
479        trace!("Checking response ({})...", status);
480
481        if status == StatusCode::CONFLICT {
482            return Err(v0alpha2::PushError::Conflict);
483        }
484
485        trace!("Fielding response...");
486
487        Ok(from_car_stream(response.bytes_stream()))
488    }
489
490    /// Push the latest local history of this client to the gateway
491    pub async fn push(
492        &self,
493        push_body: &v0alpha2::PushBody,
494    ) -> Result<v0alpha2::PushResponse, v0alpha2::PushError> {
495        let url = Url::try_from(RouteUrl::<_, ()>(
496            &self.api_base,
497            v0alpha2::Route::Push,
498            None,
499        ))?;
500        debug!(
501            "Client pushing changes for sphere {} to {}",
502            push_body.sphere, url
503        );
504        let capability = generate_capability(&self.sphere_identity, SphereAbility::Push);
505        let (token, ucan_headers) = Self::make_bearer_token(
506            &self.session.gateway_identity,
507            &self.author,
508            &capability,
509            &self.store,
510        )
511        .await?;
512
513        let block_stream = self
514            .make_push_request(url, ucan_headers, &token, push_body)
515            .await?;
516
517        tokio::pin!(block_stream);
518
519        let push_response = match block_stream.try_next().await? {
520            Some((_, bytes)) => block_deserialize::<DagCborCodec, v0alpha2::PushResponse>(&bytes)?,
521            _ => return Err(v0alpha2::PushError::UnexpectedBody),
522        };
523
524        put_block_stream(self.store.clone(), block_stream)
525            .await
526            .map_err(|error| {
527                warn!("Failed to store blocks from gateway: {}", error);
528                v0alpha2::PushError::BrokenDownstream
529            })?;
530
531        Ok(push_response)
532    }
533}
534
535/// Both `reqwest` and `axum` re-export `StatusCode` from the `http` crate.
536///
537/// We're stuck on reqwest@0.11.20 [1] that uses an older version
538/// of `http::StatusCode`, whereas axum >= 0.7 uses the 1.0 release
539/// of several HTTP libraries (`http`, `http-body`, `hyper`) [2], which
540/// we'd like to use as our canonical representation.
541///
542/// This utility converts between the old `reqwest::StatusCode` to the
543/// >=1.0 implementation. Notably, we do not pull in all of `axum`
544/// into the `noosphere-core` crate, only the common underlying
545/// crate `http@1.0.0` (or greater).
546///
547/// [1] https://github.com/subconsciousnetwork/noosphere/issues/686
548/// [2] https://github.com/tokio-rs/axum/blob/5b6204168a676497d2f4188af603546d9ebfe20a/axum/CHANGELOG.md#070-27-november-2023
549fn translate_status_code(reqwest_code: reqwest::StatusCode) -> Result<StatusCode> {
550    let code: u16 = reqwest_code.into();
551    Ok(code.try_into()?)
552}