use std::path::{Path, PathBuf};
use std::time::Duration;
use apikit::payload::{ErrorResponse, MessageResponse};
use bytes::Bytes;
use futures::{Stream, TryStreamExt};
use header::HeaderName;
use interface::{BlobMeta, MetadataList, Query, QueryResponse, RoutingConfig};
use hyper::{header, StatusCode};
use protocol::directory::{auth::*, blobmeta::*, routing::*, storage::*};
use protocol::storage::PutResponse;
use reqwest::{Client as ReqwestClient, Request};
use reqwest::Body;
use serde::de::DeserializeOwned;
use snafu::prelude::*;
use tokio_util::codec::{BytesCodec, FramedRead};
use crate::{ClientBuilder, Meta, Parameters};
#[derive(Debug, Snafu)]
pub enum ClientError {
#[snafu(display("failed to build reqwest client: {}", source))]
ClientBuildError { source: reqwest::Error },
#[snafu(display("failed to fetch response body: {}", source))]
FetchBodyError { source: reqwest::Error },
#[snafu(display("file [{:?}] does not exist", path))]
FileDoesNotExist { path: PathBuf },
#[snafu(display("failed to load the metadata for '{:?}': {}", path, source))]
FileMetadataError {
source: std::io::Error,
path: PathBuf,
},
#[snafu(display("failed to serialize metadata [{:?}]: {}", meta, source))]
MetaSerializationError {
source: serde_json::Error,
meta: Meta,
},
#[snafu(display("the redirect limit of {} was exceeded", limit))]
RedirectLimitExceeded { limit: u32 },
#[snafu(display("failed to build request: {}", source))]
RequestBuildError { source: reqwest::Error },
#[snafu(display("failed to execute request: {}", source))]
RequestExecutionError { source: reqwest::Error },
#[snafu(display("failed to deserialize response: {}", source))]
ResponseDeserializationError { source: serde_json::Error },
#[snafu(display("server returned an error: {}", message))]
ServerReturnedError { message: String },
#[snafu(display("did not get a redirect when expected"))]
MissingRedirect,
#[snafu(display("did not receive a request id when expected"))]
MissingRequestId,
#[snafu(display("too many retries"))]
TooManyRetries,
#[snafu(display("{}", message))]
UnknownError { message: String },
}
fn encode_metadata(meta: Meta) -> Result<String> {
let serialized_meta = serde_json::to_vec(&meta).context(MetaSerializationSnafu { meta })?;
Ok(base64::encode(&serialized_meta))
}
async fn extract_body<T: DeserializeOwned>(response: reqwest::Response) -> Result<T> {
let body_bytes = response.bytes().await.context(FetchBodySnafu)?;
tracing::debug!("body: {}", String::from_utf8_lossy(&body_bytes));
serde_json::from_slice(body_bytes.as_ref()).context(ResponseDeserializationSnafu)
}
async fn extract_error(response: reqwest::Response) -> ClientError {
match extract_body::<ErrorResponse>(response).await {
Ok(e) => ClientError::ServerReturnedError { message: e.error },
Err(e) => ClientError::UnknownError {
message: e.to_string(),
},
}
}
async fn extract<T: DeserializeOwned>(response: reqwest::Response) -> Result<T> {
let status = response.status();
if status.is_success() {
extract_body(response).await
} else {
Err(extract_error(response).await)
}
}
struct RedirectResponse {
pub location: String,
pub request_id: String,
}
#[derive(Clone)]
pub struct Client {
client: ReqwestClient,
host: String,
token: String,
}
type Result<T> = std::result::Result<T, ClientError>;
impl Client {
pub async fn new<S: Into<String>, U: Into<String>, P: Into<String>>(
directory_host: S,
username: U,
password: P,
) -> Result<Self> {
Client::new_with_params(Parameters {
host: directory_host.into(),
username: username.into(),
password: password.into(),
pool_idle_timeout: Duration::from_secs(5),
request_timeout: Duration::from_secs(60),
})
.await
}
pub fn builder() -> ClientBuilder {
ClientBuilder::default()
}
pub(crate) async fn new_with_params(params: Parameters) -> Result<Self> {
let client = ReqwestClient::builder()
.pool_idle_timeout(params.pool_idle_timeout)
.timeout(params.request_timeout)
.redirect(reqwest::redirect::Policy::none())
.build()
.context(ClientBuildSnafu)?;
let token =
Client::login(&client, ¶ms.host, ¶ms.username, ¶ms.password).await?;
Ok(Self {
host: params.host,
client,
token,
})
}
async fn prepare_push_request<P: AsRef<Path>>(
&self,
url: &str,
request_id: &str,
path: P,
encoded_meta: &str,
file_length: u64,
) -> Result<reqwest::Request> {
let mut request_builder = self
.client
.post(url)
.bearer_auth(&self.token)
.header(header::HeaderName::from_static("x-blob-meta"), encoded_meta)
.header(header::HeaderName::from_static("x-request-id"), request_id);
if path.as_ref().is_file() {
let file = tokio::fs::File::open(path.as_ref()).await.unwrap();
let stream = FramedRead::new(file, BytesCodec::new());
request_builder = request_builder
.body(Body::wrap_stream(stream))
.header(HeaderName::from_static("x-blob-size"), file_length);
} else {
request_builder = request_builder.header(HeaderName::from_static("x-blob-size"), 0_u64);
}
request_builder.build().context(RequestBuildSnafu)
}
async fn request_with_redirect(&self, request: Request) -> Result<RedirectResponse> {
let response = self
.client
.execute(request)
.await
.context(RequestExecutionSnafu)?;
ensure!(
response.status() == StatusCode::TEMPORARY_REDIRECT,
MissingRedirectSnafu
);
let new_location = response
.headers()
.get(header::LOCATION)
.ok_or(ClientError::MissingRedirect)?;
let request_id = response
.headers()
.get("x-request-id")
.ok_or(ClientError::MissingRequestId)?;
let new_url = new_location
.to_str()
.expect("redirect location is not UTF-8");
tracing::debug!("redirect to {}", new_url);
Ok(RedirectResponse {
location: new_url.to_string(),
request_id: String::from_utf8(request_id.as_bytes().to_vec()).unwrap(), })
}
async fn login(
client: &ReqwestClient,
host: &str,
username: &str,
password: &str,
) -> Result<String> {
let url = format!("{}/auth/login", host);
let response = client
.post(&url)
.json(&LoginRequest {
username: username.to_string(),
password: password.to_string(),
})
.send()
.await
.context(RequestExecutionSnafu)?;
let resp: LoginResponse = extract(response).await?;
Ok(resp.token)
}
pub async fn register(&self, username: &str, password: &str) -> Result<String> {
let url = format!("{}/auth/register", self.host);
let response = self
.client
.post(&url)
.bearer_auth(&self.token)
.json(&RegisterRequest {
username: username.to_string(),
password: password.to_string(),
})
.send()
.await
.context(RequestExecutionSnafu)?;
let resp: LoginResponse = extract(response).await?;
Ok(resp.token)
}
pub async fn create_empty(&self, meta: Meta) -> Result<String> {
let url = format!("{}/blob", self.host);
let meta_b64 = encode_metadata(meta)?;
let redirect_req = self
.client
.post(&url)
.bearer_auth(&self.token)
.header(HeaderName::from_static("x-blob-meta"), meta_b64.clone())
.header(HeaderName::from_static("x-blob-size"), 0_u64)
.build()
.context(RequestBuildSnafu)?;
let RedirectResponse {
location,
request_id,
} = self.request_with_redirect(redirect_req).await?;
let response = self
.client
.post(&location)
.bearer_auth(&self.token)
.header(HeaderName::from_static("x-blob-meta"), &meta_b64)
.header(HeaderName::from_static("x-blob-size"), 0_u64)
.header(HeaderName::from_static("x-request-id"), &request_id)
.send()
.await
.context(RequestExecutionSnafu)?;
let put_response: PutResponse = extract(response).await?;
Ok(put_response.id)
}
async fn push_internal<P: AsRef<Path>>(
&self,
path: P,
meta: Meta,
base_url: String,
) -> Result<String> {
ensure!(
path.as_ref().exists(),
FileDoesNotExistSnafu {
path: PathBuf::from(path.as_ref())
}
);
let url = base_url;
let meta_b64 = encode_metadata(meta)?;
let file_length = path
.as_ref()
.metadata()
.context(FileMetadataSnafu {
path: PathBuf::from(path.as_ref()),
})?
.len();
let initial_redirect_request = self
.client
.post(&url)
.bearer_auth(&self.token)
.header(
header::HeaderName::from_static("x-blob-meta"),
meta_b64.clone(),
)
.header(HeaderName::from_static("x-blob-size"), file_length)
.build()
.context(RequestBuildSnafu)?;
let RedirectResponse {
location,
request_id,
} = self.request_with_redirect(initial_redirect_request).await?;
let request = self
.prepare_push_request(
&location,
&request_id,
path.as_ref(),
&meta_b64,
file_length,
)
.await?;
let response = self
.client
.execute(request)
.await
.context(RequestExecutionSnafu)?;
let put_response: PutResponse = extract(response).await?;
Ok(put_response.id)
}
pub async fn health(&self) -> Result<String> {
let url = format!("{}/health", self.host);
let response = self
.client
.get(&url)
.send()
.await
.context(RequestExecutionSnafu)?;
let status = response.status();
if status.is_success() {
let msg: MessageResponse = extract_body(response).await?;
Ok(msg.message)
} else {
Err(extract_error(response).await)
}
}
pub async fn list_storage_nodes(&self) -> Result<ListStorageNodesResponse> {
let url = format!("{}/node/storage", self.host);
let response = self
.client
.get(&url)
.bearer_auth(&self.token)
.send()
.await
.context(RequestExecutionSnafu)?;
extract_body(response).await
}
pub async fn push<P: AsRef<Path>>(&self, path: P, meta: Meta) -> Result<String> {
self.push_internal(path, meta, format!("{}/blob", self.host))
.await
}
pub async fn update_blob<P: AsRef<Path>>(
&self,
blob_id: &str,
path: P,
meta: Meta,
) -> Result<String> {
self.push_internal(path, meta, format!("{}/blob/{}", self.host, blob_id))
.await
}
pub async fn list_meta(
&self,
tags: Option<Vec<String>>,
meta_keys: Option<Vec<String>>,
) -> Result<MetadataList> {
let url = format!("{}/metadata", &self.host);
let response = self
.client
.get(&url)
.bearer_auth(&self.token)
.json(&ListMetadataRequest {
tags,
fields: meta_keys,
})
.send()
.await
.context(RequestExecutionSnafu)?;
extract(response).await
}
pub async fn update_meta(&self, blob_id: &str, meta: Meta) -> Result<()> {
let url = format!("{}/blob/{}/metadata", self.host, blob_id);
let request = self
.client
.put(&url)
.bearer_auth(&self.token)
.json(&meta)
.build()
.context(RequestBuildSnafu)?;
let RedirectResponse {
location,
request_id,
} = self.request_with_redirect(request).await?;
let response = self
.client
.put(&location)
.bearer_auth(&self.token)
.header(HeaderName::from_static("x-request-id"), request_id)
.json(&meta)
.send()
.await
.context(RequestExecutionSnafu)?;
if response.status().is_success() {
Ok(())
} else {
Err(extract_error(response).await)
}
}
pub async fn fsync(&self, blob_id: &str) -> Result<()> {
let url = format!("{}/blob/{}/fsync", self.host, blob_id);
let request = self
.client
.post(&url)
.bearer_auth(&self.token)
.build()
.context(RequestBuildSnafu)?;
let RedirectResponse {
location,
request_id,
} = self.request_with_redirect(request).await?;
let response = self
.client
.post(&location)
.bearer_auth(&self.token)
.header(HeaderName::from_static("x-request-id"), request_id)
.send()
.await
.context(RequestBuildSnafu)?;
if response.status().is_success() {
Ok(())
} else {
Err(extract_error(response).await)
}
}
pub async fn write(&self, blob_id: &str, offset: u64, buffer: Bytes) -> Result<()> {
let url = format!("{}/blob/{}", self.host, blob_id);
let request = self
.client
.put(&url)
.bearer_auth(&self.token)
.header(
header::RANGE,
&format!("bytes={}-{}", offset, offset + (buffer.len() - 1) as u64),
)
.build()
.context(RequestBuildSnafu)?;
let RedirectResponse {
location,
request_id,
} = self.request_with_redirect(request).await?;
let response = self
.client
.put(&location)
.bearer_auth(&self.token)
.header(
header::RANGE,
&format!("bytes={}-{}", offset, offset + (buffer.len() - 1) as u64),
)
.header(HeaderName::from_static("x-request-id"), request_id)
.body(buffer.clone())
.send()
.await
.context(RequestExecutionSnafu)?;
let status = response.status();
if status.is_success() {
Ok(())
} else {
Err(extract_error(response).await)
}
}
pub async fn get_meta(&self, blob_id: &str) -> Result<Option<BlobMeta>> {
let url = format!("{}/blob/{}/metadata", self.host, blob_id);
let response = self
.client
.get(&url)
.bearer_auth(&self.token)
.send()
.await
.context(RequestExecutionSnafu)?;
let resp: GetMetaResponse = extract(response).await?;
Ok(resp.meta)
}
pub async fn get_file(&self, blob_id: &str) -> Result<impl Stream<Item = Result<Bytes>>> {
let url = format!("{}/blob/{}", self.host, blob_id);
let redirect_request = self
.client
.get(&url)
.bearer_auth(&self.token)
.build()
.context(RequestBuildSnafu)?;
let RedirectResponse {
location,
request_id,
} = self.request_with_redirect(redirect_request).await?;
let response = self
.client
.get(&location)
.bearer_auth(&self.token)
.header(HeaderName::from_static("x-request-id"), request_id)
.send()
.await
.context(RequestExecutionSnafu)?;
if response.status().is_success() {
Ok(response
.bytes_stream()
.map_err(|e| ClientError::UnknownError {
message: e.to_string(),
}))
} else {
Err(extract_error(response).await)
}
}
pub async fn read_range(&self, blob_id: &str, range: (u64, u64)) -> Result<Vec<u8>> {
let url = format!("{}/blob/{}", self.host, blob_id);
let request = self
.client
.get(&url)
.bearer_auth(&self.token)
.header(header::RANGE, &format!("bytes={}-{}", range.0, range.1))
.build()
.context(RequestBuildSnafu)?;
let RedirectResponse {
location,
request_id,
} = self.request_with_redirect(request).await?;
let response = self
.client
.get(&location)
.header(header::RANGE, &format!("bytes={}-{}", range.0, range.1))
.header(HeaderName::from_static("x-request-id"), request_id)
.bearer_auth(&self.token)
.send()
.await
.context(RequestExecutionSnafu)?;
let status = response.status();
if status.is_success() {
let resp_bytes = response.bytes().await.context(FetchBodySnafu)?;
Ok(resp_bytes.to_vec())
} else {
Err(extract_error(response).await)
}
}
pub async fn query(&self, query: Query) -> Result<QueryResponse> {
let url = format!("{}/query", self.host);
let response = self
.client
.post(&url)
.bearer_auth(&self.token)
.json(&query)
.send()
.await
.context(RequestExecutionSnafu)?;
extract(response).await
}
pub async fn delete(&self, blob_id: String) -> Result<()> {
let url = format!("{}/blob/{}", self.host, blob_id);
let request = self
.client
.delete(&url)
.bearer_auth(&self.token)
.build()
.context(RequestBuildSnafu)?;
let RedirectResponse {
location,
request_id,
} = self.request_with_redirect(request).await?;
let response = self
.client
.delete(&location)
.bearer_auth(&self.token)
.header(HeaderName::from_static("x-request-id"), request_id)
.send()
.await
.context(RequestExecutionSnafu)?;
let status = response.status();
if status.is_success() {
Ok(())
} else {
Err(extract_error(response).await)
}
}
pub async fn get_routing_config(&self) -> Result<Option<RoutingConfig>> {
let url = format!("{}/routing", self.host);
let response = self
.client
.get(&url)
.bearer_auth(&self.token)
.send()
.await
.context(RequestExecutionSnafu)?;
let response: GetRoutingConfigResponse = extract(response).await?;
Ok(response.routing_config)
}
pub async fn set_routing_config(&self, routing_config: &RoutingConfig) -> Result<()> {
let url = format!("{}/routing", self.host);
let response = self
.client
.put(&url)
.bearer_auth(&self.token)
.json(&SetRoutingConfigRequest {
routing_config: routing_config.clone(),
})
.send()
.await
.context(RequestExecutionSnafu)?;
if response.status().is_success() {
Ok(())
} else {
Err(extract_error(response).await)
}
}
pub async fn delete_routing_config(&self) -> Result<()> {
let url = format!("{}/routing", self.host);
let response = self
.client
.delete(&url)
.bearer_auth(&self.token)
.send()
.await
.context(RequestExecutionSnafu)?;
if response.status().is_success() {
Ok(())
} else {
Err(extract_error(response).await)
}
}
}