use std::str::FromStr;
use crate::{
data::{FetchParameters, FetchResponse, IdentifyResponse, PushBody, PushResponse},
route::{Route, RouteUrl},
};
use anyhow::{anyhow, Result};
use cid::Cid;
use libipld_cbor::DagCborCodec;
use noosphere_car::CarReader;
use noosphere_core::authority::{Author, SphereAction, SphereReference};
use noosphere_storage::{block_deserialize, block_serialize};
use reqwest::{header::HeaderMap, Body, StatusCode};
use tokio_stream::{Stream, StreamExt};
use tokio_util::io::StreamReader;
use ucan::{
builder::UcanBuilder,
capability::{Capability, Resource, With},
crypto::{did::DidParser, KeyMaterial},
store::{UcanJwtStore, UcanStore},
ucan::Ucan,
};
use url::Url;
/// A client for a Noosphere gateway's HTTP API, bound to one sphere and one
/// author.
///
/// Instances are created with `Client::identify`, which performs the initial
/// handshake with the gateway and records the verified result in `session`.
pub struct Client<K, S>
where
K: KeyMaterial + Clone + 'static,
S: UcanStore,
{
/// The gateway's response to the identify handshake; its
/// `gateway_identity` is used as the audience of every bearer token.
pub session: IdentifyResponse,
/// DID of the sphere that this client's capabilities are scoped to.
pub sphere_identity: String,
/// Base URL of the gateway API; route paths are resolved against it.
pub api_base: Url,
/// The author whose key signs bearer tokens and whose authorization is
/// attached to them as proof.
pub author: Author<K>,
/// UCAN store used to resolve the author's authorization and to look up
/// the tokens in its proof chain.
pub store: S,
// HTTP client created during the handshake and reused for every request.
client: reqwest::Client,
}
impl<K, S> Client<K, S>
where
K: KeyMaterial + Clone + 'static,
S: UcanStore,
{
pub async fn identify(
sphere_identity: &str,
api_base: &Url,
author: &Author<K>,
did_parser: &mut DidParser,
store: S,
) -> Result<Client<K, S>> {
debug!("Initializing Noosphere API client");
debug!("Client represents sphere {}", sphere_identity);
debug!("Client targetting API at {}", api_base);
let client = reqwest::Client::new();
let mut url = api_base.clone();
url.set_path(&Route::Did.to_string());
let did_response = client.get(url).send().await?;
match did_response.status() {
StatusCode::OK => (),
_ => return Err(anyhow!("Unable to look up gateway identity")),
};
let gateway_identity = did_response.text().await?;
let mut url = api_base.clone();
url.set_path(&Route::Identify.to_string());
let (jwt, ucan_headers) = Self::make_bearer_token(
&gateway_identity,
author,
&Capability {
with: With::Resource {
kind: Resource::Scoped(SphereReference {
did: sphere_identity.to_string(),
}),
},
can: SphereAction::Fetch,
},
&store,
)
.await?;
let identify_response: IdentifyResponse = client
.get(url)
.bearer_auth(jwt)
.headers(ucan_headers)
.send()
.await?
.json()
.await?;
identify_response.verify(did_parser, &store).await?;
debug!(
"Handshake succeeded with gateway {}",
identify_response.gateway_identity
);
Ok(Client {
session: identify_response,
sphere_identity: sphere_identity.into(),
api_base: api_base.clone(),
author: author.clone(),
store,
client,
})
}
async fn make_bearer_token(
gateway_identity: &str,
author: &Author<K>,
capability: &Capability<SphereReference, SphereAction>,
store: &S,
) -> Result<(String, HeaderMap)> {
let mut signable = UcanBuilder::default()
.issued_by(&author.key)
.for_audience(gateway_identity)
.with_lifetime(120)
.claiming_capability(capability)
.with_nonce()
.build()?;
let mut ucan_headers = HeaderMap::new();
let authorization = author.require_authorization()?;
let authorization_cid = Cid::try_from(authorization)?;
match authorization.resolve_ucan(store).await {
Ok(ucan) => {
let mut proofs_to_search: Vec<String> = ucan.proofs().clone();
debug!("Making bearer token... {:?}", proofs_to_search);
while let Some(cid_string) = proofs_to_search.pop() {
let cid = Cid::from_str(cid_string.as_str())?;
let jwt = store.require_token(&cid).await?;
let ucan = Ucan::from_str(&jwt)?;
debug!("Adding UCAN header for {}", cid);
proofs_to_search.extend(ucan.proofs().clone().into_iter());
ucan_headers.append("ucan", format!("{cid} {jwt}").parse()?);
}
ucan_headers.append(
"ucan",
format!("{} {}", authorization_cid, ucan.encode()?).parse()?,
);
}
_ => {
warn!("Unable to resolve authorization to a UCAN; it will be used as a blind proof")
}
};
signable
.proofs
.push(Cid::try_from(authorization)?.to_string());
let jwt = signable.sign().await?.encode()?;
Ok((jwt, ucan_headers))
}
pub async fn replicate(
&self,
memo_version: &Cid,
) -> Result<impl Stream<Item = Result<(Cid, Vec<u8>)>>> {
let url = Url::try_from(RouteUrl::<()>(
&self.api_base,
Route::Replicate(Some(*memo_version)),
None,
))?;
debug!("Client replicating memo from {}", url);
let capability = Capability {
with: With::Resource {
kind: Resource::Scoped(SphereReference {
did: self.sphere_identity.clone(),
}),
},
can: SphereAction::Fetch,
};
let (token, ucan_headers) = Self::make_bearer_token(
&self.session.gateway_identity,
&self.author,
&capability,
&self.store,
)
.await?;
let response = self
.client
.get(url)
.bearer_auth(token)
.headers(ucan_headers)
.send()
.await?;
Ok(
CarReader::new(StreamReader::new(response.bytes_stream().map(
|item| match item {
Ok(item) => Ok(item),
Err(error) => {
error!("Failed to read CAR stream: {}", error);
Err(std::io::Error::from(std::io::ErrorKind::BrokenPipe))
}
},
)))
.await?
.stream()
.map(|block| match block {
Ok(block) => Ok(block),
Err(error) => Err(anyhow!(error)),
}),
)
}
pub async fn fetch(&self, params: &FetchParameters) -> Result<FetchResponse> {
let url = Url::try_from(RouteUrl(&self.api_base, Route::Fetch, Some(params)))?;
debug!("Client fetching blocks from {}", url);
let capability = Capability {
with: With::Resource {
kind: Resource::Scoped(SphereReference {
did: self.sphere_identity.clone(),
}),
},
can: SphereAction::Fetch,
};
let (token, ucan_headers) = Self::make_bearer_token(
&self.session.gateway_identity,
&self.author,
&capability,
&self.store,
)
.await?;
let bytes = self
.client
.get(url)
.bearer_auth(token)
.headers(ucan_headers)
.send()
.await?
.bytes()
.await?;
block_deserialize::<DagCborCodec, _>(&bytes)
}
pub async fn push(&self, push_body: &PushBody) -> Result<PushResponse> {
let url = Url::try_from(RouteUrl::<()>(&self.api_base, Route::Push, None))?;
debug!(
"Client pushing {} blocks for sphere {} to {}",
push_body.blocks.len(),
push_body.sphere,
url
);
let capability = Capability {
with: With::Resource {
kind: Resource::Scoped(SphereReference {
did: self.sphere_identity.clone(),
}),
},
can: SphereAction::Push,
};
let (token, ucan_headers) = Self::make_bearer_token(
&self.session.gateway_identity,
&self.author,
&capability,
&self.store,
)
.await?;
let (_, push_body_bytes) = block_serialize::<DagCborCodec, _>(push_body)?;
let bytes = self
.client
.put(url)
.bearer_auth(token)
.headers(ucan_headers)
.header("Content-Type", "application/octet-stream")
.body(Body::from(push_body_bytes))
.send()
.await?
.bytes()
.await?;
block_deserialize::<DagCborCodec, _>(bytes.as_ref())
}
}