1use std::str::FromStr;
2
3use crate::{
4 data::{
5 FetchParameters, IdentifyResponse, PushBody, PushError, PushResponse, ReplicateParameters,
6 },
7 route::{Route, RouteUrl},
8};
9
10use anyhow::{anyhow, Result};
11use cid::Cid;
12use iroh_car::CarReader;
13use libipld_cbor::DagCborCodec;
14
15use noosphere_core::{
16 authority::{generate_capability, Author, SphereAbility, SphereReference},
17 data::{Link, MemoIpld},
18};
19use noosphere_storage::{block_deserialize, block_serialize};
20use reqwest::{header::HeaderMap, Body, StatusCode};
21use tokio_stream::{Stream, StreamExt};
22use tokio_util::io::StreamReader;
23use ucan::{
24 builder::UcanBuilder,
25 capability::CapabilityView,
26 crypto::{did::DidParser, KeyMaterial},
27 store::{UcanJwtStore, UcanStore},
28 ucan::Ucan,
29};
30use url::Url;
31
32pub struct Client<K, S>
37where
38 K: KeyMaterial + Clone + 'static,
39 S: UcanStore,
40{
41 pub session: IdentifyResponse,
42 pub sphere_identity: String,
43 pub api_base: Url,
44 pub author: Author<K>,
45 pub store: S,
46 client: reqwest::Client,
47}
48
49impl<K, S> Client<K, S>
50where
51 K: KeyMaterial + Clone + 'static,
52 S: UcanStore,
53{
54 pub async fn identify(
55 sphere_identity: &str,
56 api_base: &Url,
57 author: &Author<K>,
58 did_parser: &mut DidParser,
59 store: S,
60 ) -> Result<Client<K, S>> {
61 debug!("Initializing Noosphere API client");
62 debug!("Client represents sphere {}", sphere_identity);
63 debug!("Client targetting API at {}", api_base);
64
65 let client = reqwest::Client::new();
66
67 let mut url = api_base.clone();
68 url.set_path(&Route::Did.to_string());
69
70 let did_response = client.get(url).send().await?;
71
72 match did_response.status() {
73 StatusCode::OK => (),
74 _ => return Err(anyhow!("Unable to look up gateway identity")),
75 };
76
77 let gateway_identity = did_response.text().await?;
78
79 let mut url = api_base.clone();
80 url.set_path(&Route::Identify.to_string());
81
82 let (jwt, ucan_headers) = Self::make_bearer_token(
83 &gateway_identity,
84 author,
85 &generate_capability(sphere_identity, SphereAbility::Fetch),
86 &store,
87 )
88 .await?;
89
90 let identify_response: IdentifyResponse = client
91 .get(url)
92 .bearer_auth(jwt)
93 .headers(ucan_headers)
94 .send()
95 .await?
96 .json()
97 .await?;
98
99 identify_response.verify(did_parser, &store).await?;
100
101 debug!(
102 "Handshake succeeded with gateway {}",
103 identify_response.gateway_identity
104 );
105
106 Ok(Client {
107 session: identify_response,
108 sphere_identity: sphere_identity.into(),
109 api_base: api_base.clone(),
110 author: author.clone(),
111 store,
112 client,
113 })
114 }
115
116 async fn make_bearer_token(
117 gateway_identity: &str,
118 author: &Author<K>,
119 capability: &CapabilityView<SphereReference, SphereAbility>,
120 store: &S,
121 ) -> Result<(String, HeaderMap)> {
122 let mut signable = UcanBuilder::default()
123 .issued_by(&author.key)
124 .for_audience(gateway_identity)
125 .with_lifetime(120)
126 .claiming_capability(capability)
127 .with_nonce()
128 .build()?;
129
130 let mut ucan_headers = HeaderMap::new();
131
132 let authorization = author.require_authorization()?;
133 let authorization_cid = Cid::try_from(authorization)?;
134
135 match authorization.as_ucan(store).await {
136 Ok(ucan) => {
137 if let Some(ucan_proofs) = ucan.proofs() {
138 let mut proofs_to_search: Vec<String> = ucan_proofs.clone();
140
141 debug!("Making bearer token... {:?}", proofs_to_search);
142
143 while let Some(cid_string) = proofs_to_search.pop() {
144 let cid = Cid::from_str(cid_string.as_str())?;
145 let jwt = store.require_token(&cid).await?;
146 let ucan = Ucan::from_str(&jwt)?;
147
148 debug!("Adding UCAN header for {}", cid);
149
150 if let Some(ucan_proofs) = ucan.proofs() {
151 proofs_to_search.extend(ucan_proofs.clone().into_iter());
152 }
153
154 ucan_headers.append("ucan", format!("{cid} {jwt}").parse()?);
155 }
156 }
157
158 ucan_headers.append(
159 "ucan",
160 format!("{} {}", authorization_cid, ucan.encode()?).parse()?,
161 );
162 }
163 _ => {
164 debug!(
165 "Unable to resolve authorization to a UCAN; it will be used as a blind proof"
166 )
167 }
168 };
169
170 signable
172 .proofs
173 .push(Cid::try_from(authorization)?.to_string());
174
175 let jwt = signable.sign().await?.encode()?;
176
177 Ok((jwt, ucan_headers))
183 }
184
185 pub async fn replicate(
192 &self,
193 memo_version: &Cid,
194 params: Option<&ReplicateParameters>,
195 ) -> Result<impl Stream<Item = Result<(Cid, Vec<u8>)>>> {
196 let url = Url::try_from(RouteUrl(
197 &self.api_base,
198 Route::Replicate(Some(*memo_version)),
199 params,
200 ))?;
201
202 debug!("Client replicating {} from {}", memo_version, url);
203
204 let capability = generate_capability(&self.sphere_identity, SphereAbility::Fetch);
205
206 let (token, ucan_headers) = Self::make_bearer_token(
207 &self.session.gateway_identity,
208 &self.author,
209 &capability,
210 &self.store,
211 )
212 .await?;
213
214 let response = self
215 .client
216 .get(url)
217 .bearer_auth(token)
218 .headers(ucan_headers)
219 .send()
220 .await?;
221
222 Ok(
223 CarReader::new(StreamReader::new(response.bytes_stream().map(
224 |item| match item {
225 Ok(item) => Ok(item),
226 Err(error) => {
227 error!("Failed to read CAR stream: {}", error);
228 Err(std::io::Error::from(std::io::ErrorKind::BrokenPipe))
229 }
230 },
231 )))
232 .await?
233 .stream()
234 .map(|block| match block {
235 Ok(block) => Ok(block),
236 Err(error) => Err(anyhow!(error)),
237 }),
238 )
239 }
240
241 pub async fn fetch(
242 &self,
243 params: &FetchParameters,
244 ) -> Result<Option<(Link<MemoIpld>, impl Stream<Item = Result<(Cid, Vec<u8>)>>)>> {
245 let url = Url::try_from(RouteUrl(&self.api_base, Route::Fetch, Some(params)))?;
246
247 debug!("Client fetching blocks from {}", url);
248
249 let capability = generate_capability(&self.sphere_identity, SphereAbility::Fetch);
250 let (token, ucan_headers) = Self::make_bearer_token(
251 &self.session.gateway_identity,
252 &self.author,
253 &capability,
254 &self.store,
255 )
256 .await?;
257
258 let response = self
259 .client
260 .get(url)
261 .bearer_auth(token)
262 .headers(ucan_headers)
263 .send()
264 .await?;
265
266 let reader = CarReader::new(StreamReader::new(response.bytes_stream().map(
267 |item| match item {
268 Ok(item) => Ok(item),
269 Err(error) => {
270 error!("Failed to read CAR stream: {}", error);
271 Err(std::io::Error::from(std::io::ErrorKind::BrokenPipe))
272 }
273 },
274 )))
275 .await?;
276
277 let tip = reader.header().roots().first().cloned();
278
279 if let Some(tip) = tip {
280 Ok(match tip.codec() {
281 0 => None,
283 _ => Some((
284 tip.into(),
285 reader.stream().map(|block| match block {
286 Ok(block) => Ok(block),
287 Err(error) => Err(anyhow!(error)),
288 }),
289 )),
290 })
291 } else {
292 Ok(None)
293 }
294 }
295
296 pub async fn push(&self, push_body: &PushBody) -> Result<PushResponse, PushError> {
297 let url = Url::try_from(RouteUrl::<()>(&self.api_base, Route::Push, None))?;
298 debug!(
299 "Client pushing {} blocks for sphere {} to {}",
300 push_body.blocks.len(),
301 push_body.sphere,
302 url
303 );
304 let capability = generate_capability(&self.sphere_identity, SphereAbility::Push);
305 let (token, ucan_headers) = Self::make_bearer_token(
306 &self.session.gateway_identity,
307 &self.author,
308 &capability,
309 &self.store,
310 )
311 .await?;
312
313 let (_, push_body_bytes) = block_serialize::<DagCborCodec, _>(push_body)?;
314
315 let response = self
316 .client
317 .put(url)
318 .bearer_auth(token)
319 .headers(ucan_headers)
320 .header("Content-Type", "application/octet-stream")
321 .body(Body::from(push_body_bytes))
322 .send()
323 .await
324 .map_err(|err| PushError::Internal(anyhow!(err)))?;
325
326 if response.status() == StatusCode::CONFLICT {
327 return Err(PushError::Conflict);
328 }
329
330 let bytes = response
331 .bytes()
332 .await
333 .map_err(|err| PushError::Internal(anyhow!(err)))?;
334 Ok(block_deserialize::<DagCborCodec, _>(bytes.as_ref())?)
335 }
336}