use futures::{
stream::{self, Stream},
Future, IntoFuture,
};
use header::TRAILER;
use http::uri::InvalidUri;
use hyper::{
self,
client::{Client, HttpConnector},
Chunk, Request, Response, StatusCode, Uri,
};
use hyper_multipart::client::multipart;
use read::{JsonLineDecoder, LineDecoder, StreamReader};
use request::{self, ApiRequest};
use response::{self, Error};
use serde::{Deserialize, Serialize};
use serde_json;
use std::io::Read;
use tokio_codec::{Decoder, FramedRead};
/// Boxed future returned by the single-response calls in this file: it
/// resolves to one `T` or fails with `Error`, and is `Send + 'static`.
type AsyncResponse<T> = Box<Future<Item = T, Error = Error> + Send + 'static>;
/// Boxed stream returned by the streaming calls in this file: it yields `T`
/// items or fails with `Error`, and is `Send + 'static`.
type AsyncStreamResponse<T> = Box<Stream<Item = T, Error = Error> + Send + 'static>;
/// Asynchronous HTTP client for an IPFS API endpoint.
///
/// Pairs the base URI that every request path is appended to with the hyper
/// client that sends the requests.
#[derive(Clone)]
pub struct IpfsClient {
/// Base URI of the API; request paths and query strings are appended to it.
base: Uri,
/// Hyper client used to issue every request.
client: Client<HttpConnector, hyper::Body>,
}
impl Default for IpfsClient {
fn default() -> IpfsClient {
IpfsClient::new("localhost", 5001).unwrap()
}
}
impl IpfsClient {
#[inline]
pub fn new(host: &str, port: u16) -> Result<IpfsClient, InvalidUri> {
let base_path = IpfsClient::build_base_path(host, port)?;
Ok(IpfsClient {
base: base_path,
client: Client::builder().keep_alive(false).build_http(),
})
}
fn build_base_path(host: &str, port: u16) -> Result<Uri, InvalidUri> {
format!("http://{}:{}/api/v0", host, port).parse()
}
fn build_base_request<Req>(
&self,
req: &Req,
form: Option<multipart::Form>,
) -> Result<Request<hyper::Body>, Error>
where
Req: ApiRequest + Serialize,
{
let url = format!(
"{}{}?{}",
self.base,
Req::PATH,
::serde_urlencoded::to_string(req)?
);
url.parse::<Uri>().map_err(From::from).and_then(move |url| {
let mut builder = Request::builder();
let mut builder = builder.method(Req::METHOD.clone()).uri(url);
let req = if let Some(form) = form {
form.set_body(&mut builder)
} else {
builder.body(hyper::Body::empty())
};
req.map_err(From::from)
})
}
#[inline]
fn build_error_from_body(chunk: Chunk) -> Error {
match serde_json::from_slice(&chunk) {
Ok(e) => Error::Api(e).into(),
Err(_) => match String::from_utf8(chunk.to_vec()) {
Ok(s) => Error::Uncategorized(s).into(),
Err(e) => e.into(),
},
}
}
fn process_json_response<Res>(status: StatusCode, chunk: Chunk) -> Result<Res, Error>
where
for<'de> Res: 'static + Deserialize<'de>,
{
match status {
StatusCode::OK => serde_json::from_slice(&chunk).map_err(From::from),
_ => Err(Self::build_error_from_body(chunk)),
}
}
fn process_stream_response<D, Res>(
res: Response<hyper::Body>,
decoder: D,
) -> AsyncStreamResponse<Res>
where
D: 'static + Decoder<Item = Res, Error = Error> + Send,
Res: 'static,
{
let stream = FramedRead::new(StreamReader::new(res.into_body().from_err()), decoder);
Box::new(stream)
}
fn request_raw<Req>(
&self,
req: &Req,
form: Option<multipart::Form>,
) -> AsyncResponse<(StatusCode, Chunk)>
where
Req: ApiRequest + Serialize,
{
match self.build_base_request(req, form) {
Ok(req) => {
let res = self
.client
.request(req)
.and_then(|res| {
let status = res.status();
res.into_body().concat2().map(move |chunk| (status, chunk))
}).from_err();
Box::new(res)
}
Err(e) => Box::new(Err(e).into_future()),
}
}
fn request_stream<Req, Res, F>(
&self,
req: &Req,
form: Option<multipart::Form>,
process: F,
) -> AsyncStreamResponse<Res>
where
Req: ApiRequest + Serialize,
Res: 'static + Send,
F: 'static + Fn(hyper::Response<hyper::Body>) -> AsyncStreamResponse<Res> + Send,
{
match self.build_base_request(req, form) {
Ok(req) => {
let res = self
.client
.request(req)
.from_err()
.map(move |res| {
let stream: Box<
Stream<Item = Res, Error = _> + Send + 'static,
> = match res.status() {
StatusCode::OK => process(res),
_ => Box::new(
res.into_body()
.concat2()
.from_err()
.and_then(|chunk| Err(Self::build_error_from_body(chunk)))
.into_stream(),
),
};
stream
}).flatten_stream();
Box::new(res)
}
Err(e) => Box::new(stream::once(Err(e))),
}
}
fn request<Req, Res>(&self, req: &Req, form: Option<multipart::Form>) -> AsyncResponse<Res>
where
Req: ApiRequest + Serialize,
for<'de> Res: 'static + Deserialize<'de> + Send,
{
let res = self
.request_raw(req, form)
.and_then(|(status, chunk)| IpfsClient::process_json_response(status, chunk));
Box::new(res)
}
fn request_empty<Req>(&self, req: &Req, form: Option<multipart::Form>) -> AsyncResponse<()>
where
Req: ApiRequest + Serialize,
{
let res = self
.request_raw(req, form)
.and_then(|(status, chunk)| match status {
StatusCode::OK => Ok(()),
_ => Err(Self::build_error_from_body(chunk)),
});
Box::new(res)
}
fn request_string<Req>(&self, req: &Req, form: Option<multipart::Form>) -> AsyncResponse<String>
where
Req: ApiRequest + Serialize,
{
let res = self
.request_raw(req, form)
.and_then(|(status, chunk)| match status {
StatusCode::OK => String::from_utf8(chunk.to_vec()).map_err(From::from),
_ => Err(Self::build_error_from_body(chunk)),
});
Box::new(res)
}
fn request_stream_bytes<Req>(
&self,
req: &Req,
form: Option<multipart::Form>,
) -> AsyncStreamResponse<Chunk>
where
Req: ApiRequest + Serialize,
{
self.request_stream(req, form, |res| Box::new(res.into_body().from_err()))
}
fn request_stream_json<Req, Res>(
&self,
req: &Req,
form: Option<multipart::Form>,
) -> AsyncStreamResponse<Res>
where
Req: ApiRequest + Serialize,
for<'de> Res: 'static + Deserialize<'de> + Send,
{
self.request_stream(req, form, |res| {
let parse_stream_error = if let Some(trailer) = res.headers().get(TRAILER) {
if trailer == "X-Stream-Error" {
true
} else {
let err = Error::UnrecognizedTrailerHeader(
String::from_utf8_lossy(trailer.as_ref()).into(),
);
return Box::new(stream::once(Err(err.into())));
}
} else {
false
};
Box::new(IpfsClient::process_stream_response(
res,
JsonLineDecoder::new(parse_stream_error),
))
})
}
}
// Public API methods. Each one builds the matching `request::*` value and
// hands it to one of the private request helpers:
//
// * `request`              - resolves to the JSON-decoded response body,
// * `request_empty`        - resolves with no payload, ignoring the body,
// * `request_string`       - resolves to the body as a UTF-8 string,
// * `request_stream_bytes` - streams raw body chunks,
// * `request_stream_json`  - streams JSON-decoded items.
//
// In every case a reply whose status is not `200 OK` is reported as an error
// built from the response body.
impl IpfsClient {
    /// Sends `request::Add` with `data` attached as the multipart field
    /// `path`; resolves to the JSON-decoded response.
    #[inline]
    pub fn add<R>(&self, data: R) -> AsyncResponse<response::AddResponse>
    where
        R: 'static + Read + Send,
    {
        let mut form = multipart::Form::default();

        form.add_reader("path", data);

        self.request(&request::Add, Some(form))
    }

    /// Sends `request::BitswapLedger` for `peer`; resolves to the JSON-decoded
    /// response.
    #[inline]
    pub fn bitswap_ledger(&self, peer: &str) -> AsyncResponse<response::BitswapLedgerResponse> {
        self.request(&request::BitswapLedger { peer }, None)
    }

    /// Sends `request::BitswapReprovide`; resolves with no payload on success.
    #[inline]
    pub fn bitswap_reprovide(&self) -> AsyncResponse<response::BitswapReprovideResponse> {
        self.request_empty(&request::BitswapReprovide, None)
    }

    /// Sends `request::BitswapStat`; resolves to the JSON-decoded response.
    #[inline]
    pub fn bitswap_stat(&self) -> AsyncResponse<response::BitswapStatResponse> {
        self.request(&request::BitswapStat, None)
    }

    /// Sends `request::BitswapUnwant` for `key`; resolves with no payload on
    /// success.
    #[inline]
    pub fn bitswap_unwant(&self, key: &str) -> AsyncResponse<response::BitswapUnwantResponse> {
        self.request_empty(&request::BitswapUnwant { key }, None)
    }

    /// Sends `request::BitswapWantlist`, optionally for `peer`; resolves to
    /// the JSON-decoded response.
    #[inline]
    pub fn bitswap_wantlist(
        &self,
        peer: Option<&str>,
    ) -> AsyncResponse<response::BitswapWantlistResponse> {
        self.request(&request::BitswapWantlist { peer }, None)
    }

    /// Sends `request::BlockGet` for `hash`; streams the raw response body.
    #[inline]
    pub fn block_get(&self, hash: &str) -> AsyncStreamResponse<Chunk> {
        self.request_stream_bytes(&request::BlockGet { hash }, None)
    }

    /// Sends `request::BlockPut` with `data` attached as the multipart field
    /// `data`; resolves to the JSON-decoded response.
    #[inline]
    pub fn block_put<R>(&self, data: R) -> AsyncResponse<response::BlockPutResponse>
    where
        R: 'static + Read + Send,
    {
        let mut form = multipart::Form::default();

        form.add_reader("data", data);

        self.request(&request::BlockPut, Some(form))
    }

    /// Sends `request::BlockRm` for `hash`; resolves to the JSON-decoded
    /// response.
    #[inline]
    pub fn block_rm(&self, hash: &str) -> AsyncResponse<response::BlockRmResponse> {
        self.request(&request::BlockRm { hash }, None)
    }

    /// Sends `request::BlockStat` for `hash`; resolves to the JSON-decoded
    /// response.
    #[inline]
    pub fn block_stat(&self, hash: &str) -> AsyncResponse<response::BlockStatResponse> {
        self.request(&request::BlockStat { hash }, None)
    }

    /// Sends `request::BootstrapAddDefault`; resolves to the JSON-decoded
    /// response.
    #[inline]
    pub fn bootstrap_add_default(&self) -> AsyncResponse<response::BootstrapAddDefaultResponse> {
        self.request(&request::BootstrapAddDefault, None)
    }

    /// Sends `request::BootstrapList`; resolves to the JSON-decoded response.
    #[inline]
    pub fn bootstrap_list(&self) -> AsyncResponse<response::BootstrapListResponse> {
        self.request(&request::BootstrapList, None)
    }

    /// Sends `request::BootstrapRmAll`; resolves to the JSON-decoded response.
    #[inline]
    pub fn bootstrap_rm_all(&self) -> AsyncResponse<response::BootstrapRmAllResponse> {
        self.request(&request::BootstrapRmAll, None)
    }

    /// Sends `request::Cat` for `path`; streams the raw response body.
    #[inline]
    pub fn cat(&self, path: &str) -> AsyncStreamResponse<Chunk> {
        self.request_stream_bytes(&request::Cat { path }, None)
    }

    /// Sends `request::Commands`; resolves to the JSON-decoded response.
    #[inline]
    pub fn commands(&self) -> AsyncResponse<response::CommandsResponse> {
        self.request(&request::Commands, None)
    }

    /// Sends `request::ConfigEdit`; resolves to the JSON-decoded response.
    #[inline]
    pub fn config_edit(&self) -> AsyncResponse<response::ConfigEditResponse> {
        self.request(&request::ConfigEdit, None)
    }

    /// Sends `request::ConfigReplace` with `data` attached as the multipart
    /// field `file`; resolves with no payload on success.
    #[inline]
    pub fn config_replace<R>(&self, data: R) -> AsyncResponse<response::ConfigReplaceResponse>
    where
        R: 'static + Read + Send,
    {
        let mut form = multipart::Form::default();

        form.add_reader("file", data);

        self.request_empty(&request::ConfigReplace, Some(form))
    }

    /// Sends `request::ConfigShow`; resolves to the response body as a UTF-8
    /// string.
    #[inline]
    pub fn config_show(&self) -> AsyncResponse<response::ConfigShowResponse> {
        self.request_string(&request::ConfigShow, None)
    }

    /// Sends `request::DagGet` for `path`; resolves to the JSON-decoded
    /// response.
    #[inline]
    pub fn dag_get(&self, path: &str) -> AsyncResponse<response::DagGetResponse> {
        self.request(&request::DagGet { path }, None)
    }

    /// Sends `request::DhtFindPeer` for `peer`; streams JSON-decoded items.
    #[inline]
    pub fn dht_findpeer(&self, peer: &str) -> AsyncStreamResponse<response::DhtFindPeerResponse> {
        self.request_stream_json(&request::DhtFindPeer { peer }, None)
    }

    /// Sends `request::DhtFindProvs` for `key`; streams JSON-decoded items.
    #[inline]
    pub fn dht_findprovs(&self, key: &str) -> AsyncStreamResponse<response::DhtFindProvsResponse> {
        self.request_stream_json(&request::DhtFindProvs { key }, None)
    }

    /// Sends `request::DhtGet` for `key`; streams JSON-decoded items.
    #[inline]
    pub fn dht_get(&self, key: &str) -> AsyncStreamResponse<response::DhtGetResponse> {
        self.request_stream_json(&request::DhtGet { key }, None)
    }

    /// Sends `request::DhtProvide` for `key`; streams JSON-decoded items.
    #[inline]
    pub fn dht_provide(&self, key: &str) -> AsyncStreamResponse<response::DhtProvideResponse> {
        self.request_stream_json(&request::DhtProvide { key }, None)
    }

    /// Sends `request::DhtPut` with `key` and `value`; streams JSON-decoded
    /// items.
    #[inline]
    pub fn dht_put(&self, key: &str, value: &str) -> AsyncStreamResponse<response::DhtPutResponse> {
        self.request_stream_json(&request::DhtPut { key, value }, None)
    }

    /// Sends `request::DhtQuery` for `peer`; streams JSON-decoded items.
    #[inline]
    pub fn dht_query(&self, peer: &str) -> AsyncStreamResponse<response::DhtQueryResponse> {
        self.request_stream_json(&request::DhtQuery { peer }, None)
    }

    /// Sends `request::DiagCmdsClear`; resolves with no payload on success.
    #[inline]
    pub fn diag_cmds_clear(&self) -> AsyncResponse<response::DiagCmdsClearResponse> {
        self.request_empty(&request::DiagCmdsClear, None)
    }

    /// Sends `request::DiagCmdsSetTime` with `time`; resolves with no payload
    /// on success.
    #[inline]
    pub fn diag_cmds_set_time(
        &self,
        time: &str,
    ) -> AsyncResponse<response::DiagCmdsSetTimeResponse> {
        self.request_empty(&request::DiagCmdsSetTime { time }, None)
    }

    /// Sends `request::DiagSys`; resolves to the response body as a UTF-8
    /// string.
    #[inline]
    pub fn diag_sys(&self) -> AsyncResponse<response::DiagSysResponse> {
        self.request_string(&request::DiagSys, None)
    }

    /// Sends `request::Dns` with `link` and `recursive`; resolves to the
    /// JSON-decoded response.
    #[inline]
    pub fn dns(&self, link: &str, recursive: bool) -> AsyncResponse<response::DnsResponse> {
        self.request(&request::Dns { link, recursive }, None)
    }

    /// Sends `request::FileLs` for `path`; resolves to the JSON-decoded
    /// response.
    #[inline]
    pub fn file_ls(&self, path: &str) -> AsyncResponse<response::FileLsResponse> {
        self.request(&request::FileLs { path }, None)
    }

    /// Sends `request::FilesCp` with `path` and `dest`; resolves with no
    /// payload on success.
    #[inline]
    pub fn files_cp(&self, path: &str, dest: &str) -> AsyncResponse<response::FilesCpResponse> {
        self.request_empty(&request::FilesCp { path, dest }, None)
    }

    /// Sends `request::FilesFlush`, optionally for `path`; resolves with no
    /// payload on success.
    #[inline]
    pub fn files_flush(&self, path: Option<&str>) -> AsyncResponse<response::FilesFlushResponse> {
        self.request_empty(&request::FilesFlush { path }, None)
    }

    /// Sends `request::FilesLs`, optionally for `path`; resolves to the
    /// JSON-decoded response.
    #[inline]
    pub fn files_ls(&self, path: Option<&str>) -> AsyncResponse<response::FilesLsResponse> {
        self.request(&request::FilesLs { path }, None)
    }

    /// Sends `request::FilesMkdir` with `path` and `parents`; resolves with no
    /// payload on success.
    #[inline]
    pub fn files_mkdir(
        &self,
        path: &str,
        parents: bool,
    ) -> AsyncResponse<response::FilesMkdirResponse> {
        self.request_empty(&request::FilesMkdir { path, parents }, None)
    }

    /// Sends `request::FilesMv` with `path` and `dest`; resolves with no
    /// payload on success.
    #[inline]
    pub fn files_mv(&self, path: &str, dest: &str) -> AsyncResponse<response::FilesMvResponse> {
        self.request_empty(&request::FilesMv { path, dest }, None)
    }

    /// Sends `request::FilesRead` for `path`; streams the raw response body.
    #[inline]
    pub fn files_read(&self, path: &str) -> AsyncStreamResponse<Chunk> {
        self.request_stream_bytes(&request::FilesRead { path }, None)
    }

    /// Sends `request::FilesRm` with `path` and `recursive`; resolves with no
    /// payload on success.
    #[inline]
    pub fn files_rm(
        &self,
        path: &str,
        recursive: bool,
    ) -> AsyncResponse<response::FilesRmResponse> {
        self.request_empty(&request::FilesRm { path, recursive }, None)
    }

    /// Sends `request::FilesStat` for `path`; resolves to the JSON-decoded
    /// response.
    #[inline]
    pub fn files_stat(&self, path: &str) -> AsyncResponse<response::FilesStatResponse> {
        self.request(&request::FilesStat { path }, None)
    }

    /// Sends `request::FilesWrite` with `path`, `create` and `truncate`, and
    /// with `data` attached as the multipart field `data`; resolves with no
    /// payload on success.
    #[inline]
    pub fn files_write<R>(
        &self,
        path: &str,
        create: bool,
        truncate: bool,
        data: R,
    ) -> AsyncResponse<response::FilesWriteResponse>
    where
        R: 'static + Read + Send,
    {
        let mut form = multipart::Form::default();

        form.add_reader("data", data);

        self.request_empty(
            &request::FilesWrite {
                path,
                create,
                truncate,
            },
            Some(form),
        )
    }

    /// Sends `request::FilestoreDups`; streams JSON-decoded items.
    #[inline]
    pub fn filestore_dups(&self) -> AsyncStreamResponse<response::FilestoreDupsResponse> {
        self.request_stream_json(&request::FilestoreDups, None)
    }

    /// Sends `request::FilestoreLs`, optionally for `cid`; streams
    /// JSON-decoded items.
    #[inline]
    pub fn filestore_ls(
        &self,
        cid: Option<&str>,
    ) -> AsyncStreamResponse<response::FilestoreLsResponse> {
        self.request_stream_json(&request::FilestoreLs { cid }, None)
    }

    /// Sends `request::FilestoreVerify`, optionally for `cid`; streams
    /// JSON-decoded items.
    #[inline]
    pub fn filestore_verify(
        &self,
        cid: Option<&str>,
    ) -> AsyncStreamResponse<response::FilestoreVerifyResponse> {
        self.request_stream_json(&request::FilestoreVerify { cid }, None)
    }

    /// Sends `request::Get` for `path`; streams the raw response body.
    #[inline]
    pub fn get(&self, path: &str) -> AsyncStreamResponse<Chunk> {
        self.request_stream_bytes(&request::Get { path }, None)
    }

    /// Sends `request::Id`, optionally for `peer`; resolves to the
    /// JSON-decoded response.
    #[inline]
    pub fn id(&self, peer: Option<&str>) -> AsyncResponse<response::IdResponse> {
        self.request(&request::Id { peer }, None)
    }

    /// Sends `request::KeyGen` with `name`, `kind` and `size`; resolves to the
    /// JSON-decoded response.
    #[inline]
    pub fn key_gen(
        &self,
        name: &str,
        kind: request::KeyType,
        size: i32,
    ) -> AsyncResponse<response::KeyGenResponse> {
        self.request(&request::KeyGen { name, kind, size }, None)
    }

    /// Sends `request::KeyList`; resolves to the JSON-decoded response.
    #[inline]
    pub fn key_list(&self) -> AsyncResponse<response::KeyListResponse> {
        self.request(&request::KeyList, None)
    }

    /// Sends `request::KeyRename` with `name`, `new` and `force`; resolves to
    /// the JSON-decoded response.
    #[inline]
    pub fn key_rename(
        &self,
        name: &str,
        new: &str,
        force: bool,
    ) -> AsyncResponse<response::KeyRenameResponse> {
        self.request(&request::KeyRename { name, new, force }, None)
    }

    /// Sends `request::KeyRm` for `name`; resolves to the JSON-decoded
    /// response.
    #[inline]
    pub fn key_rm(&self, name: &str) -> AsyncResponse<response::KeyRmResponse> {
        self.request(&request::KeyRm { name }, None)
    }

    /// Sends `request::LogLevel` with `logger` and `level`; resolves to the
    /// JSON-decoded response.
    #[inline]
    pub fn log_level(
        &self,
        logger: request::Logger,
        level: request::LoggingLevel,
    ) -> AsyncResponse<response::LogLevelResponse> {
        self.request(&request::LogLevel { logger, level }, None)
    }

    /// Sends `request::LogLs`; resolves to the JSON-decoded response.
    #[inline]
    pub fn log_ls(&self) -> AsyncResponse<response::LogLsResponse> {
        self.request(&request::LogLs, None)
    }

    /// Sends `request::LogTail`; streams the `String` items that `LineDecoder`
    /// decodes from the response body.
    ///
    /// The call goes through `request_stream` like every other streaming
    /// method, so a reply whose status is not `200 OK` yields a single error
    /// built from the body instead of being decoded as log output.
    #[inline]
    pub fn log_tail(&self) -> AsyncStreamResponse<String> {
        self.request_stream(&request::LogTail, None, |res| {
            IpfsClient::process_stream_response(res, LineDecoder)
        })
    }

    /// Sends `request::Ls`, optionally for `path`; resolves to the
    /// JSON-decoded response.
    #[inline]
    pub fn ls(&self, path: Option<&str>) -> AsyncResponse<response::LsResponse> {
        self.request(&request::Ls { path }, None)
    }

    /// Sends `request::NamePublish` with `path`, `resolve` and the optional
    /// `lifetime`, `ttl` and `key`; resolves to the JSON-decoded response.
    #[inline]
    pub fn name_publish(
        &self,
        path: &str,
        resolve: bool,
        lifetime: Option<&str>,
        ttl: Option<&str>,
        key: Option<&str>,
    ) -> AsyncResponse<response::NamePublishResponse> {
        self.request(
            &request::NamePublish {
                path,
                resolve,
                lifetime,
                ttl,
                key,
            },
            None,
        )
    }

    /// Sends `request::NameResolve` with the optional `name` plus `recursive`
    /// and `nocache`; resolves to the JSON-decoded response.
    #[inline]
    pub fn name_resolve(
        &self,
        name: Option<&str>,
        recursive: bool,
        nocache: bool,
    ) -> AsyncResponse<response::NameResolveResponse> {
        self.request(
            &request::NameResolve {
                name,
                recursive,
                nocache,
            },
            None,
        )
    }

    /// Sends `request::ObjectData` for `key`; streams the raw response body.
    #[inline]
    pub fn object_data(&self, key: &str) -> AsyncStreamResponse<Chunk> {
        self.request_stream_bytes(&request::ObjectData { key }, None)
    }

    /// Sends `request::ObjectDiff` with `key0` and `key1`; resolves to the
    /// JSON-decoded response.
    #[inline]
    pub fn object_diff(
        &self,
        key0: &str,
        key1: &str,
    ) -> AsyncResponse<response::ObjectDiffResponse> {
        self.request(&request::ObjectDiff { key0, key1 }, None)
    }

    /// Sends `request::ObjectGet` for `key`; resolves to the JSON-decoded
    /// response.
    #[inline]
    pub fn object_get(&self, key: &str) -> AsyncResponse<response::ObjectGetResponse> {
        self.request(&request::ObjectGet { key }, None)
    }

    /// Sends `request::ObjectLinks` for `key`; resolves to the JSON-decoded
    /// response.
    #[inline]
    pub fn object_links(&self, key: &str) -> AsyncResponse<response::ObjectLinksResponse> {
        self.request(&request::ObjectLinks { key }, None)
    }

    /// Sends `request::ObjectNew`, optionally with `template`; resolves to the
    /// JSON-decoded response.
    #[inline]
    pub fn object_new(
        &self,
        template: Option<request::ObjectTemplate>,
    ) -> AsyncResponse<response::ObjectNewResponse> {
        self.request(&request::ObjectNew { template }, None)
    }

    /// Sends `request::ObjectStat` for `key`; resolves to the JSON-decoded
    /// response.
    #[inline]
    pub fn object_stat(&self, key: &str) -> AsyncResponse<response::ObjectStatResponse> {
        self.request(&request::ObjectStat { key }, None)
    }

    /// Sends `request::PinAdd` for `key` with `recursive` set explicitly and
    /// `progress` fixed to `false`; resolves to the JSON-decoded response.
    #[inline]
    pub fn pin_add(&self, key: &str, recursive: bool) -> AsyncResponse<response::PinAddResponse> {
        self.request(
            &request::PinAdd {
                key,
                recursive: Some(recursive),
                progress: false,
            },
            None,
        )
    }

    /// Sends `request::PinLs` with the optional `key` and `typ`; resolves to
    /// the JSON-decoded response.
    #[inline]
    pub fn pin_ls(
        &self,
        key: Option<&str>,
        typ: Option<&str>,
    ) -> AsyncResponse<response::PinLsResponse> {
        self.request(&request::PinLs { key, typ }, None)
    }

    /// Sends `request::PinRm` with `key` and `recursive`; resolves to the
    /// JSON-decoded response.
    #[inline]
    pub fn pin_rm(&self, key: &str, recursive: bool) -> AsyncResponse<response::PinRmResponse> {
        self.request(&request::PinRm { key, recursive }, None)
    }

    /// Sends `request::Ping` for `peer` with the optional `count`; streams
    /// JSON-decoded items.
    #[inline]
    pub fn ping(
        &self,
        peer: &str,
        count: Option<i32>,
    ) -> AsyncStreamResponse<response::PingResponse> {
        self.request_stream_json(&request::Ping { peer, count }, None)
    }

    /// Sends `request::PubsubLs`; resolves to the JSON-decoded response.
    #[inline]
    pub fn pubsub_ls(&self) -> AsyncResponse<response::PubsubLsResponse> {
        self.request(&request::PubsubLs, None)
    }

    /// Sends `request::PubsubPeers`, optionally for `topic`; resolves to the
    /// JSON-decoded response.
    #[inline]
    pub fn pubsub_peers(
        &self,
        topic: Option<&str>,
    ) -> AsyncResponse<response::PubsubPeersResponse> {
        self.request(&request::PubsubPeers { topic }, None)
    }

    /// Sends `request::PubsubPub` with `topic` and `payload`; resolves with no
    /// payload on success.
    #[inline]
    pub fn pubsub_pub(
        &self,
        topic: &str,
        payload: &str,
    ) -> AsyncResponse<response::PubsubPubResponse> {
        self.request_empty(&request::PubsubPub { topic, payload }, None)
    }

    /// Sends `request::PubsubSub` with `topic` and `discover`; streams
    /// JSON-decoded items.
    #[inline]
    pub fn pubsub_sub(
        &self,
        topic: &str,
        discover: bool,
    ) -> AsyncStreamResponse<response::PubsubSubResponse> {
        self.request_stream_json(&request::PubsubSub { topic, discover }, None)
    }

    /// Sends `request::RefsLocal`; streams JSON-decoded items.
    #[inline]
    pub fn refs_local(&self) -> AsyncStreamResponse<response::RefsLocalResponse> {
        self.request_stream_json(&request::RefsLocal, None)
    }

    /// Sends `request::Shutdown`; resolves with no payload on success.
    #[inline]
    pub fn shutdown(&self) -> AsyncResponse<response::ShutdownResponse> {
        self.request_empty(&request::Shutdown, None)
    }

    /// Sends `request::StatsBitswap`; resolves to the JSON-decoded response.
    #[inline]
    pub fn stats_bitswap(&self) -> AsyncResponse<response::StatsBitswapResponse> {
        self.request(&request::StatsBitswap, None)
    }

    /// Sends `request::StatsBw`; resolves to the JSON-decoded response.
    #[inline]
    pub fn stats_bw(&self) -> AsyncResponse<response::StatsBwResponse> {
        self.request(&request::StatsBw, None)
    }

    /// Sends `request::StatsRepo`; resolves to the JSON-decoded response.
    #[inline]
    pub fn stats_repo(&self) -> AsyncResponse<response::StatsRepoResponse> {
        self.request(&request::StatsRepo, None)
    }

    /// Sends `request::SwarmAddrsLocal`; resolves to the JSON-decoded
    /// response.
    #[inline]
    pub fn swarm_addrs_local(&self) -> AsyncResponse<response::SwarmAddrsLocalResponse> {
        self.request(&request::SwarmAddrsLocal, None)
    }

    /// Sends `request::SwarmPeers`; resolves to the JSON-decoded response.
    #[inline]
    pub fn swarm_peers(&self) -> AsyncResponse<response::SwarmPeersResponse> {
        self.request(&request::SwarmPeers, None)
    }

    /// Sends `request::TarAdd` with `data` attached as the multipart field
    /// `file`; resolves to the JSON-decoded response.
    #[inline]
    pub fn tar_add<R>(&self, data: R) -> AsyncResponse<response::TarAddResponse>
    where
        R: 'static + Read + Send,
    {
        let mut form = multipart::Form::default();

        form.add_reader("file", data);

        self.request(&request::TarAdd, Some(form))
    }

    /// Sends `request::TarCat` for `path`; streams the raw response body.
    #[inline]
    pub fn tar_cat(&self, path: &str) -> AsyncStreamResponse<Chunk> {
        self.request_stream_bytes(&request::TarCat { path }, None)
    }

    /// Sends `request::Version`; resolves to the JSON-decoded response.
    #[inline]
    pub fn version(&self) -> AsyncResponse<response::VersionResponse> {
        self.request(&request::Version, None)
    }
}