use futures::future::{Future, IntoFuture};
use futures::stream::{self, Stream};
use header::Trailer;
use read::{JsonLineDecoder, LineDecoder, StreamReader};
use request::{self, ApiRequest};
use response::{self, Error, ErrorKind};
use hyper::{self, Chunk, Request, Response, Uri, Method, StatusCode};
use hyper::client::{Client, Config, HttpConnector};
use hyper_multipart::client::multipart;
use serde::{Deserialize, Serialize};
use serde_json;
use std::io::Read;
use tokio_core::reactor::Handle;
use tokio_io::codec::{Decoder, FramedRead};
type AsyncResponse<T> = Box<Future<Item = T, Error = Error>>;
type AsyncStreamResponse<T> = Box<Stream<Item = T, Error = Error>>;
pub struct IpfsClient {
base: Uri,
client: Client<HttpConnector, multipart::Body>,
}
impl IpfsClient {
#[inline]
pub fn new(
handle: &Handle,
host: &str,
port: u16,
) -> Result<IpfsClient, hyper::error::UriError> {
let base_path = IpfsClient::build_base_path(host, port)?;
Ok(IpfsClient {
base: base_path,
client: Config::default()
.body::<multipart::Body>()
.keep_alive(true)
.build(handle),
})
}
pub fn default(handle: &Handle) -> IpfsClient {
IpfsClient::new(handle, "localhost", 5001).unwrap()
}
fn build_base_path(host: &str, port: u16) -> Result<Uri, hyper::error::UriError> {
format!("http://{}:{}/api/v0", host, port).parse()
}
fn build_base_request<Req>(
&self,
req: &Req,
form: Option<multipart::Form>,
) -> Result<Request<multipart::Body>, Error>
where
Req: ApiRequest + Serialize,
{
let url = format!(
"{}{}?{}",
self.base,
Req::path(),
::serde_urlencoded::to_string(req)?
);
url.parse::<Uri>()
.map(move |url| {
let mut req = Request::new(Method::Get, url);
if let Some(form) = form {
form.set_body(&mut req);
}
req
})
.map_err(From::from)
}
#[inline]
fn build_error_from_body(chunk: Chunk) -> Error {
match serde_json::from_slice(&chunk) {
Ok(e) => ErrorKind::Api(e).into(),
Err(_) => {
match String::from_utf8(chunk.to_vec()) {
Ok(s) => ErrorKind::Uncategorized(s).into(),
Err(e) => e.into(),
}
}
}
}
fn process_json_response<Res>(status: StatusCode, chunk: Chunk) -> Result<Res, Error>
where
for<'de> Res: 'static + Deserialize<'de>,
{
match status {
StatusCode::Ok => serde_json::from_slice(&chunk).map_err(From::from),
_ => Err(Self::build_error_from_body(chunk)),
}
}
fn process_stream_response<D, Res>(
res: Response,
decoder: D,
) -> Box<Stream<Item = Res, Error = Error>>
where
D: 'static + Decoder<Item = Res, Error = Error>,
Res: 'static,
{
let stream = FramedRead::new(StreamReader::new(res.body().from_err()), decoder);
Box::new(stream)
}
fn request_raw<Req>(
&self,
req: &Req,
form: Option<multipart::Form>,
) -> AsyncResponse<(StatusCode, Chunk)>
where
Req: ApiRequest + Serialize,
{
match self.build_base_request(req, form) {
Ok(req) => {
let res = self.client
.request(req)
.and_then(|res| {
let status = res.status();
res.body().concat2().map(move |chunk| (status, chunk))
})
.from_err();
Box::new(res)
}
Err(e) => Box::new(Err(e).into_future()),
}
}
fn request_stream<Req, Res, F>(
&self,
req: &Req,
form: Option<multipart::Form>,
process: F,
) -> AsyncStreamResponse<Res>
where
Req: ApiRequest + Serialize,
Res: 'static,
F: 'static + Fn(hyper::Response) -> AsyncStreamResponse<Res>,
{
match self.build_base_request(req, form) {
Ok(req) => {
let res = self.client
.request(req)
.from_err()
.map(move |res| {
let stream: Box<Stream<Item = Res, Error = _>> = match res.status() {
StatusCode::Ok => process(res),
_ => Box::new(
res.body()
.concat2()
.from_err()
.and_then(|chunk| Err(Self::build_error_from_body(chunk)))
.into_stream(),
),
};
stream
})
.flatten_stream();
Box::new(res)
}
Err(e) => Box::new(stream::once(Err(e))),
}
}
fn request<Req, Res>(&self, req: &Req, form: Option<multipart::Form>) -> AsyncResponse<Res>
where
Req: ApiRequest + Serialize,
for<'de> Res: 'static + Deserialize<'de>,
{
let res = self.request_raw(req, form).and_then(|(status, chunk)| {
IpfsClient::process_json_response(status, chunk)
});
Box::new(res)
}
fn request_empty<Req>(&self, req: &Req, form: Option<multipart::Form>) -> AsyncResponse<()>
where
Req: ApiRequest + Serialize,
{
let res = self.request_raw(req, form).and_then(
|(status, chunk)| match status {
StatusCode::Ok => Ok(()),
_ => Err(Self::build_error_from_body(chunk)),
},
);
Box::new(res)
}
fn request_string<Req>(&self, req: &Req, form: Option<multipart::Form>) -> AsyncResponse<String>
where
Req: ApiRequest + Serialize,
{
let res = self.request_raw(req, form).and_then(
|(status, chunk)| match status {
StatusCode::Ok => String::from_utf8(chunk.to_vec()).map_err(From::from),
_ => Err(Self::build_error_from_body(chunk)),
},
);
Box::new(res)
}
fn request_stream_bytes<Req>(
&self,
req: &Req,
form: Option<multipart::Form>,
) -> AsyncStreamResponse<Chunk>
where
Req: ApiRequest + Serialize,
{
self.request_stream(req, form, |res| Box::new(res.body().from_err()))
}
fn request_stream_json<Req, Res>(
&self,
req: &Req,
form: Option<multipart::Form>,
) -> AsyncStreamResponse<Res>
where
Req: ApiRequest + Serialize,
for<'de> Res: 'static + Deserialize<'de>,
{
self.request_stream(req, form, |res| {
let parse_stream_error = if let Some(trailer) = res.headers().get() {
match trailer {
&Trailer::StreamError => true,
}
} else {
false
};
Box::new(IpfsClient::process_stream_response(
res,
JsonLineDecoder::new(parse_stream_error),
))
})
}
}
impl IpfsClient {
#[inline]
pub fn add<R>(&self, data: R) -> AsyncResponse<response::AddResponse>
where
R: 'static + Read + Send,
{
let mut form = multipart::Form::default();
form.add_reader("path", data);
self.request(&request::Add, Some(form))
}
#[inline]
pub fn bitswap_ledger(&self, peer: &str) -> AsyncResponse<response::BitswapLedgerResponse> {
self.request(&request::BitswapLedger { peer }, None)
}
#[inline]
pub fn bitswap_stat(&self) -> AsyncResponse<response::BitswapStatResponse> {
self.request(&request::BitswapStat, None)
}
#[inline]
pub fn bitswap_unwant(&self, key: &str) -> AsyncResponse<response::BitswapUnwantResponse> {
self.request_empty(&request::BitswapUnwant { key }, None)
}
#[inline]
pub fn bitswap_wantlist(
&self,
peer: Option<&str>,
) -> AsyncResponse<response::BitswapWantlistResponse> {
self.request(&request::BitswapWantlist { peer }, None)
}
#[inline]
pub fn block_get(&self, hash: &str) -> AsyncStreamResponse<Chunk> {
self.request_stream_bytes(&request::BlockGet { hash }, None)
}
#[inline]
pub fn block_put<R>(&self, data: R) -> AsyncResponse<response::BlockPutResponse>
where
R: 'static + Read + Send,
{
let mut form = multipart::Form::default();
form.add_reader("data", data);
self.request(&request::BlockPut, Some(form))
}
#[inline]
pub fn block_rm(&self, hash: &str) -> AsyncResponse<response::BlockRmResponse> {
self.request(&request::BlockRm { hash }, None)
}
#[inline]
pub fn block_stat(&self, hash: &str) -> AsyncResponse<response::BlockStatResponse> {
self.request(&request::BlockStat { hash }, None)
}
#[inline]
pub fn bootstrap_add_default(&self) -> AsyncResponse<response::BootstrapAddDefaultResponse> {
self.request(&request::BootstrapAddDefault, None)
}
#[inline]
pub fn bootstrap_list(&self) -> AsyncResponse<response::BootstrapListResponse> {
self.request(&request::BootstrapList, None)
}
#[inline]
pub fn bootstrap_rm_all(&self) -> AsyncResponse<response::BootstrapRmAllResponse> {
self.request(&request::BootstrapRmAll, None)
}
#[inline]
pub fn cat(&self, path: &str) -> AsyncStreamResponse<Chunk> {
self.request_stream_bytes(&request::Cat { path }, None)
}
#[inline]
pub fn commands(&self) -> AsyncResponse<response::CommandsResponse> {
self.request(&request::Commands, None)
}
#[inline]
pub fn config_edit(&self) -> AsyncResponse<response::ConfigEditResponse> {
self.request(&request::ConfigEdit, None)
}
#[inline]
pub fn config_replace<R>(&self, data: R) -> AsyncResponse<response::ConfigReplaceResponse>
where
R: 'static + Read + Send,
{
let mut form = multipart::Form::default();
form.add_reader("file", data);
self.request_empty(&request::ConfigReplace, Some(form))
}
#[inline]
pub fn config_show(&self) -> AsyncResponse<response::ConfigShowResponse> {
self.request_string(&request::ConfigShow, None)
}
#[inline]
pub fn dag_get(&self, path: &str) -> AsyncResponse<response::DagGetResponse> {
self.request(&request::DagGet { path }, None)
}
#[inline]
pub fn dht_findpeer(&self, peer: &str) -> AsyncStreamResponse<response::DhtFindPeerResponse> {
self.request_stream_json(&request::DhtFindPeer { peer }, None)
}
#[inline]
pub fn dht_findprovs(&self, key: &str) -> AsyncStreamResponse<response::DhtFindProvsResponse> {
self.request_stream_json(&request::DhtFindProvs { key }, None)
}
#[inline]
pub fn dht_get(&self, key: &str) -> AsyncStreamResponse<response::DhtGetResponse> {
self.request_stream_json(&request::DhtGet { key }, None)
}
#[inline]
pub fn dht_provide(&self, key: &str) -> AsyncStreamResponse<response::DhtProvideResponse> {
self.request_stream_json(&request::DhtProvide { key }, None)
}
#[inline]
pub fn dht_put(&self, key: &str, value: &str) -> AsyncStreamResponse<response::DhtPutResponse> {
self.request_stream_json(&request::DhtPut { key, value }, None)
}
#[inline]
pub fn dht_query(&self, peer: &str) -> AsyncStreamResponse<response::DhtQueryResponse> {
self.request_stream_json(&request::DhtQuery { peer }, None)
}
#[inline]
pub fn diag_cmds_clear(&self) -> AsyncResponse<response::DiagCmdsClearResponse> {
self.request_empty(&request::DiagCmdsClear, None)
}
#[inline]
pub fn diag_cmds_set_time(
&self,
time: &str,
) -> AsyncResponse<response::DiagCmdsSetTimeResponse> {
self.request_empty(&request::DiagCmdsSetTime { time }, None)
}
#[inline]
pub fn diag_sys(&self) -> AsyncResponse<response::DiagSysResponse> {
self.request_string(&request::DiagSys, None)
}
#[inline]
pub fn dns(&self, link: &str, recursive: bool) -> AsyncResponse<response::DnsResponse> {
self.request(&request::Dns { link, recursive }, None)
}
#[inline]
pub fn file_ls(&self, path: &str) -> AsyncResponse<response::FileLsResponse> {
self.request(&request::FileLs { path }, None)
}
#[inline]
pub fn files_cp(&self, path: &str, dest: &str) -> AsyncResponse<response::FilesCpResponse> {
self.request_empty(&request::FilesCp { path, dest }, None)
}
#[inline]
pub fn files_flush(&self, path: &Option<&str>) -> AsyncResponse<response::FilesFlushResponse> {
self.request_empty(&request::FilesFlush { path }, None)
}
#[inline]
pub fn files_ls(&self, path: &Option<&str>) -> AsyncResponse<response::FilesLsResponse> {
self.request(&request::FilesLs { path }, None)
}
#[inline]
pub fn files_mkdir(
&self,
path: &str,
parents: bool,
) -> AsyncResponse<response::FilesMkdirResponse> {
self.request_empty(&request::FilesMkdir { path, parents }, None)
}
#[inline]
pub fn files_mv(&self, path: &str, dest: &str) -> AsyncResponse<response::FilesMvResponse> {
self.request_empty(&request::FilesMv { path, dest }, None)
}
#[inline]
pub fn files_read(&self, path: &str) -> AsyncStreamResponse<Chunk> {
self.request_stream_bytes(&request::FilesRead { path }, None)
}
#[inline]
pub fn files_rm(
&self,
path: &str,
recursive: bool,
) -> AsyncResponse<response::FilesRmResponse> {
self.request_empty(&request::FilesRm { path, recursive }, None)
}
#[inline]
pub fn files_stat(&self, path: &str) -> AsyncResponse<response::FilesStatResponse> {
self.request(&request::FilesStat { path }, None)
}
#[inline]
pub fn files_write<R>(
&self,
path: &str,
create: bool,
truncate: bool,
data: R,
) -> AsyncResponse<response::FilesWriteResponse>
where
R: 'static + Read + Send,
{
let mut form = multipart::Form::default();
form.add_reader("data", data);
self.request_empty(
&request::FilesWrite {
path,
create,
truncate,
},
Some(form),
)
}
#[inline]
pub fn filestore_dups(&self) -> AsyncStreamResponse<response::FilestoreDupsResponse> {
self.request_stream_json(&request::FilestoreDups, None)
}
#[inline]
pub fn filestore_ls(
&self,
cid: &Option<&str>,
) -> AsyncStreamResponse<response::FilestoreLsResponse> {
self.request_stream_json(&request::FilestoreLs { cid }, None)
}
#[inline]
pub fn filestore_verify(
&self,
cid: &Option<&str>,
) -> AsyncStreamResponse<response::FilestoreVerifyResponse> {
self.request_stream_json(&request::FilestoreVerify { cid }, None)
}
#[inline]
pub fn get(&self, path: &str) -> AsyncStreamResponse<Chunk> {
self.request_stream_bytes(&request::Get { path }, None)
}
#[inline]
pub fn id(&self, peer: &Option<&str>) -> AsyncResponse<response::IdResponse> {
self.request(&request::Id { peer }, None)
}
#[inline]
pub fn key_gen(
&self,
name: &str,
kind: request::KeyType,
size: &Option<i32>,
) -> AsyncResponse<response::KeyGenResponse> {
self.request(&request::KeyGen { name, kind, size }, None)
}
#[inline]
pub fn key_list(&self) -> AsyncResponse<response::KeyListResponse> {
self.request(&request::KeyList, None)
}
#[inline]
pub fn log_level(
&self,
logger: request::Logger,
level: request::LoggingLevel,
) -> AsyncResponse<response::LogLevelResponse> {
self.request(&request::LogLevel { logger, level }, None)
}
#[inline]
pub fn log_ls(&self) -> AsyncResponse<response::LogLsResponse> {
self.request(&request::LogLs, None)
}
pub fn log_tail(&self) -> AsyncStreamResponse<String> {
let res = self.build_base_request(&request::LogTail, None)
.map(|req| self.client.request(req).from_err())
.into_future()
.flatten()
.map(|res| IpfsClient::process_stream_response(res, LineDecoder))
.flatten_stream();
Box::new(res)
}
#[inline]
pub fn ls(&self, path: &Option<&str>) -> AsyncResponse<response::LsResponse> {
self.request(&request::Ls { path }, None)
}
#[inline]
pub fn object_diff(
&self,
key0: &str,
key1: &str,
) -> AsyncResponse<response::ObjectDiffResponse> {
self.request(&request::ObjectDiff { key0, key1 }, None)
}
#[inline]
pub fn object_get(&self, key: &str) -> AsyncResponse<response::ObjectGetResponse> {
self.request(&request::ObjectGet { key }, None)
}
#[inline]
pub fn object_links(&self, key: &str) -> AsyncResponse<response::ObjectLinksResponse> {
self.request(&request::ObjectLinks { key }, None)
}
#[inline]
pub fn object_stat(&self, key: &str) -> AsyncResponse<response::ObjectStatResponse> {
self.request(&request::ObjectStat { key }, None)
}
#[inline]
pub fn pin_ls(
&self,
key: &Option<&str>,
typ: &Option<&str>,
) -> AsyncResponse<response::PinLsResponse> {
self.request(&request::PinLs { key, typ }, None)
}
#[inline]
pub fn pin_rm(&self, key: &str, recursive: bool) -> AsyncResponse<response::PinRmResponse> {
self.request(&request::PinRm { key, recursive }, None)
}
#[inline]
pub fn ping(
&self,
peer: &str,
count: &Option<usize>,
) -> AsyncStreamResponse<response::PingResponse> {
self.request_stream_json(&request::Ping { peer, count }, None)
}
#[inline]
pub fn pubsub_ls(&self) -> AsyncResponse<response::PubsubLsResponse> {
self.request(&request::PubsubLs, None)
}
#[inline]
pub fn pubsub_peers(
&self,
topic: &Option<&str>,
) -> AsyncResponse<response::PubsubPeersResponse> {
self.request(&request::PubsubPeers { topic }, None)
}
#[inline]
pub fn pubsub_pub(
&self,
topic: &str,
payload: &str,
) -> AsyncResponse<response::PubsubPubResponse> {
self.request_empty(&request::PubsubPub { topic, payload }, None)
}
#[inline]
pub fn pubsub_sub(
&self,
topic: &str,
discover: bool,
) -> AsyncStreamResponse<response::PubsubSubResponse> {
self.request_stream_json(&request::PubsubSub { topic, discover }, None)
}
#[inline]
pub fn refs_local(&self) -> AsyncStreamResponse<response::RefsLocalResponse> {
self.request_stream_json(&request::RefsLocal, None)
}
#[inline]
pub fn stats_bitswap(&self) -> AsyncResponse<response::StatsBitswapResponse> {
self.request(&request::StatsBitswap, None)
}
#[inline]
pub fn stats_bw(&self) -> AsyncResponse<response::StatsBwResponse> {
self.request(&request::StatsBw, None)
}
#[inline]
pub fn stats_repo(&self) -> AsyncResponse<response::StatsRepoResponse> {
self.request(&request::StatsRepo, None)
}
#[inline]
pub fn swarm_addrs_local(&self) -> AsyncResponse<response::SwarmAddrsLocalResponse> {
self.request(&request::SwarmAddrsLocal, None)
}
#[inline]
pub fn swarm_peers(&self) -> AsyncResponse<response::SwarmPeersResponse> {
self.request(&request::SwarmPeers, None)
}
#[inline]
pub fn tar_add<R>(&self, data: R) -> AsyncResponse<response::TarAddResponse>
where
R: 'static + Read + Send,
{
let mut form = multipart::Form::default();
form.add_reader("file", data);
self.request(&request::TarAdd, Some(form))
}
#[inline]
pub fn tar_cat(&self, path: &str) -> AsyncStreamResponse<Chunk> {
self.request_stream_bytes(&request::TarCat { path }, None)
}
#[inline]
pub fn version(&self) -> AsyncResponse<response::VersionResponse> {
self.request(&request::Version, None)
}
}