noosphere_api/
client.rs

1use std::str::FromStr;
2
3use crate::{
4    data::{
5        FetchParameters, IdentifyResponse, PushBody, PushError, PushResponse, ReplicateParameters,
6    },
7    route::{Route, RouteUrl},
8};
9
10use anyhow::{anyhow, Result};
11use cid::Cid;
12use iroh_car::CarReader;
13use libipld_cbor::DagCborCodec;
14
15use noosphere_core::{
16    authority::{generate_capability, Author, SphereAbility, SphereReference},
17    data::{Link, MemoIpld},
18};
19use noosphere_storage::{block_deserialize, block_serialize};
20use reqwest::{header::HeaderMap, Body, StatusCode};
21use tokio_stream::{Stream, StreamExt};
22use tokio_util::io::StreamReader;
23use ucan::{
24    builder::UcanBuilder,
25    capability::CapabilityView,
26    crypto::{did::DidParser, KeyMaterial},
27    store::{UcanJwtStore, UcanStore},
28    ucan::Ucan,
29};
30use url::Url;
31
32/// A [Client] is a simple, portable HTTP client for the Noosphere gateway REST
33/// API. It embodies the intended usage of the REST API, which includes an
34/// opening handshake (with associated key verification) and various
35/// UCAN-authorized verbs over sphere data.
36pub struct Client<K, S>
37where
38    K: KeyMaterial + Clone + 'static,
39    S: UcanStore,
40{
41    pub session: IdentifyResponse,
42    pub sphere_identity: String,
43    pub api_base: Url,
44    pub author: Author<K>,
45    pub store: S,
46    client: reqwest::Client,
47}
48
49impl<K, S> Client<K, S>
50where
51    K: KeyMaterial + Clone + 'static,
52    S: UcanStore,
53{
54    pub async fn identify(
55        sphere_identity: &str,
56        api_base: &Url,
57        author: &Author<K>,
58        did_parser: &mut DidParser,
59        store: S,
60    ) -> Result<Client<K, S>> {
61        debug!("Initializing Noosphere API client");
62        debug!("Client represents sphere {}", sphere_identity);
63        debug!("Client targetting API at {}", api_base);
64
65        let client = reqwest::Client::new();
66
67        let mut url = api_base.clone();
68        url.set_path(&Route::Did.to_string());
69
70        let did_response = client.get(url).send().await?;
71
72        match did_response.status() {
73            StatusCode::OK => (),
74            _ => return Err(anyhow!("Unable to look up gateway identity")),
75        };
76
77        let gateway_identity = did_response.text().await?;
78
79        let mut url = api_base.clone();
80        url.set_path(&Route::Identify.to_string());
81
82        let (jwt, ucan_headers) = Self::make_bearer_token(
83            &gateway_identity,
84            author,
85            &generate_capability(sphere_identity, SphereAbility::Fetch),
86            &store,
87        )
88        .await?;
89
90        let identify_response: IdentifyResponse = client
91            .get(url)
92            .bearer_auth(jwt)
93            .headers(ucan_headers)
94            .send()
95            .await?
96            .json()
97            .await?;
98
99        identify_response.verify(did_parser, &store).await?;
100
101        debug!(
102            "Handshake succeeded with gateway {}",
103            identify_response.gateway_identity
104        );
105
106        Ok(Client {
107            session: identify_response,
108            sphere_identity: sphere_identity.into(),
109            api_base: api_base.clone(),
110            author: author.clone(),
111            store,
112            client,
113        })
114    }
115
116    async fn make_bearer_token(
117        gateway_identity: &str,
118        author: &Author<K>,
119        capability: &CapabilityView<SphereReference, SphereAbility>,
120        store: &S,
121    ) -> Result<(String, HeaderMap)> {
122        let mut signable = UcanBuilder::default()
123            .issued_by(&author.key)
124            .for_audience(gateway_identity)
125            .with_lifetime(120)
126            .claiming_capability(capability)
127            .with_nonce()
128            .build()?;
129
130        let mut ucan_headers = HeaderMap::new();
131
132        let authorization = author.require_authorization()?;
133        let authorization_cid = Cid::try_from(authorization)?;
134
135        match authorization.as_ucan(store).await {
136            Ok(ucan) => {
137                if let Some(ucan_proofs) = ucan.proofs() {
138                    // TODO(ucan-wg/rs-ucan#37): We should integrate a helper for this kind of stuff into rs-ucan
139                    let mut proofs_to_search: Vec<String> = ucan_proofs.clone();
140
141                    debug!("Making bearer token... {:?}", proofs_to_search);
142
143                    while let Some(cid_string) = proofs_to_search.pop() {
144                        let cid = Cid::from_str(cid_string.as_str())?;
145                        let jwt = store.require_token(&cid).await?;
146                        let ucan = Ucan::from_str(&jwt)?;
147
148                        debug!("Adding UCAN header for {}", cid);
149
150                        if let Some(ucan_proofs) = ucan.proofs() {
151                            proofs_to_search.extend(ucan_proofs.clone().into_iter());
152                        }
153
154                        ucan_headers.append("ucan", format!("{cid} {jwt}").parse()?);
155                    }
156                }
157
158                ucan_headers.append(
159                    "ucan",
160                    format!("{} {}", authorization_cid, ucan.encode()?).parse()?,
161                );
162            }
163            _ => {
164                debug!(
165                    "Unable to resolve authorization to a UCAN; it will be used as a blind proof"
166                )
167            }
168        };
169
170        // TODO(ucan-wg/rs-ucan#32): This is kind of a hack until we can add proofs by CID
171        signable
172            .proofs
173            .push(Cid::try_from(authorization)?.to_string());
174
175        let jwt = signable.sign().await?.encode()?;
176
177        // TODO: It is inefficient to send the same UCANs with every request,
178        // we should probably establish a conventional flow for syncing UCANs
179        // this way only once when pairing a gateway. For now, this is about the
180        // same efficiency as what we had before when UCANs were all inlined to
181        // a single token.
182        Ok((jwt, ucan_headers))
183    }
184
185    /// Replicate content from Noosphere, streaming its blocks from the
186    /// configured gateway. If the gateway doesn't have the desired content, it
187    /// will look it up from other sources such as IPFS if they are available.
188    /// Note that this means this call can potentially block on upstream
189    /// access to an IPFS node (which, depending on the node's network
190    /// configuration and peering status, can be quite slow).
191    pub async fn replicate(
192        &self,
193        memo_version: &Cid,
194        params: Option<&ReplicateParameters>,
195    ) -> Result<impl Stream<Item = Result<(Cid, Vec<u8>)>>> {
196        let url = Url::try_from(RouteUrl(
197            &self.api_base,
198            Route::Replicate(Some(*memo_version)),
199            params,
200        ))?;
201
202        debug!("Client replicating {} from {}", memo_version, url);
203
204        let capability = generate_capability(&self.sphere_identity, SphereAbility::Fetch);
205
206        let (token, ucan_headers) = Self::make_bearer_token(
207            &self.session.gateway_identity,
208            &self.author,
209            &capability,
210            &self.store,
211        )
212        .await?;
213
214        let response = self
215            .client
216            .get(url)
217            .bearer_auth(token)
218            .headers(ucan_headers)
219            .send()
220            .await?;
221
222        Ok(
223            CarReader::new(StreamReader::new(response.bytes_stream().map(
224                |item| match item {
225                    Ok(item) => Ok(item),
226                    Err(error) => {
227                        error!("Failed to read CAR stream: {}", error);
228                        Err(std::io::Error::from(std::io::ErrorKind::BrokenPipe))
229                    }
230                },
231            )))
232            .await?
233            .stream()
234            .map(|block| match block {
235                Ok(block) => Ok(block),
236                Err(error) => Err(anyhow!(error)),
237            }),
238        )
239    }
240
241    pub async fn fetch(
242        &self,
243        params: &FetchParameters,
244    ) -> Result<Option<(Link<MemoIpld>, impl Stream<Item = Result<(Cid, Vec<u8>)>>)>> {
245        let url = Url::try_from(RouteUrl(&self.api_base, Route::Fetch, Some(params)))?;
246
247        debug!("Client fetching blocks from {}", url);
248
249        let capability = generate_capability(&self.sphere_identity, SphereAbility::Fetch);
250        let (token, ucan_headers) = Self::make_bearer_token(
251            &self.session.gateway_identity,
252            &self.author,
253            &capability,
254            &self.store,
255        )
256        .await?;
257
258        let response = self
259            .client
260            .get(url)
261            .bearer_auth(token)
262            .headers(ucan_headers)
263            .send()
264            .await?;
265
266        let reader = CarReader::new(StreamReader::new(response.bytes_stream().map(
267            |item| match item {
268                Ok(item) => Ok(item),
269                Err(error) => {
270                    error!("Failed to read CAR stream: {}", error);
271                    Err(std::io::Error::from(std::io::ErrorKind::BrokenPipe))
272                }
273            },
274        )))
275        .await?;
276
277        let tip = reader.header().roots().first().cloned();
278
279        if let Some(tip) = tip {
280            Ok(match tip.codec() {
281                // Identity codec = no changes
282                0 => None,
283                _ => Some((
284                    tip.into(),
285                    reader.stream().map(|block| match block {
286                        Ok(block) => Ok(block),
287                        Err(error) => Err(anyhow!(error)),
288                    }),
289                )),
290            })
291        } else {
292            Ok(None)
293        }
294    }
295
296    pub async fn push(&self, push_body: &PushBody) -> Result<PushResponse, PushError> {
297        let url = Url::try_from(RouteUrl::<()>(&self.api_base, Route::Push, None))?;
298        debug!(
299            "Client pushing {} blocks for sphere {} to {}",
300            push_body.blocks.len(),
301            push_body.sphere,
302            url
303        );
304        let capability = generate_capability(&self.sphere_identity, SphereAbility::Push);
305        let (token, ucan_headers) = Self::make_bearer_token(
306            &self.session.gateway_identity,
307            &self.author,
308            &capability,
309            &self.store,
310        )
311        .await?;
312
313        let (_, push_body_bytes) = block_serialize::<DagCborCodec, _>(push_body)?;
314
315        let response = self
316            .client
317            .put(url)
318            .bearer_auth(token)
319            .headers(ucan_headers)
320            .header("Content-Type", "application/octet-stream")
321            .body(Body::from(push_body_bytes))
322            .send()
323            .await
324            .map_err(|err| PushError::Internal(anyhow!(err)))?;
325
326        if response.status() == StatusCode::CONFLICT {
327            return Err(PushError::Conflict);
328        }
329
330        let bytes = response
331            .bytes()
332            .await
333            .map_err(|err| PushError::Internal(anyhow!(err)))?;
334        Ok(block_deserialize::<DagCborCodec, _>(bytes.as_ref())?)
335    }
336}