use crate::header::TRAILER;
#[cfg(feature = "hyper")]
use crate::hyper_multipart::client::multipart;
use crate::read::{JsonLineDecoder, LineDecoder, StreamReader};
use crate::request::{self, ApiRequest};
use crate::response::{self, Error};
#[cfg(feature = "actix")]
use actix_http::{encoding, Payload, PayloadStream};
#[cfg(feature = "actix")]
use actix_multipart::client::multipart;
use bytes::Bytes;
use futures::{
future,
stream::{self, Stream},
Future, IntoFuture,
};
use http::uri::{InvalidUri, Uri};
use http::StatusCode;
#[cfg(feature = "hyper")]
use hyper::client::{self, Builder, HttpConnector};
#[cfg(feature = "hyper")]
use hyper_tls::HttpsConnector;
use multiaddr::{AddrComponent, ToMultiaddr};
use serde::{Deserialize, Serialize};
use serde_json;
use std::{
fs,
io::Read,
net::{IpAddr, SocketAddr},
path::{Path, PathBuf},
};
use tokio_codec::{Decoder, FramedRead};
#[cfg(feature = "actix")]
type AsyncResponse<T> = Box<Future<Item = T, Error = Error> + 'static>;
#[cfg(feature = "hyper")]
type AsyncResponse<T> = Box<dyn Future<Item = T, Error = Error> + Send + 'static>;
#[cfg(feature = "actix")]
type AsyncStreamResponse<T> = Box<Stream<Item = T, Error = Error> + 'static>;
#[cfg(feature = "hyper")]
type AsyncStreamResponse<T> = Box<dyn Stream<Item = T, Error = Error> + Send + 'static>;
#[cfg(feature = "actix")]
type Request = awc::ClientRequest;
#[cfg(feature = "hyper")]
type Request = http::Request<hyper::Body>;
#[cfg(feature = "actix")]
type Response = awc::ClientResponse<encoding::Decoder<Payload<PayloadStream>>>;
#[cfg(feature = "hyper")]
type Response = http::Response<hyper::Body>;
#[cfg(feature = "actix")]
type Client = awc::Client;
#[cfg(feature = "hyper")]
type Client = client::Client<HttpsConnector<HttpConnector>, hyper::Body>;
#[derive(Clone)]
pub struct IpfsClient {
base: Uri,
client: Client,
}
impl Default for IpfsClient {
fn default() -> IpfsClient {
dirs::home_dir()
.map(|home_dir| home_dir.join(".ipfs").join("api"))
.and_then(|multiaddr_path| fs::read_to_string(&multiaddr_path).ok())
.and_then(|multiaddr_str| multiaddr_str.to_multiaddr().ok())
.and_then(|multiaddr| {
let mut addr: Option<IpAddr> = None;
let mut port: Option<u16> = None;
for addr_component in multiaddr.iter() {
match addr_component {
AddrComponent::IP4(v4addr) => addr = Some(v4addr.into()),
AddrComponent::IP6(v6addr) => addr = Some(v6addr.into()),
AddrComponent::TCP(tcpport) => port = Some(tcpport),
_ => {
return None;
}
}
}
if let (Some(addr), Some(port)) = (addr, port) {
Some(SocketAddr::new(addr, port))
} else {
None
}
})
.map(IpfsClient::from)
.unwrap_or_else(|| IpfsClient::new("localhost", 5001).unwrap())
}
}
impl From<SocketAddr> for IpfsClient {
fn from(socket_addr: SocketAddr) -> Self {
IpfsClient::new(&socket_addr.ip().to_string(), socket_addr.port()).unwrap()
}
}
impl IpfsClient {
#[inline]
pub fn new(host: &str, port: u16) -> Result<IpfsClient, InvalidUri> {
Self::new_from_uri(format!("http://{}:{}", host, port).as_str())
}
#[inline]
pub fn new_from_uri(uri: &str) -> Result<IpfsClient, InvalidUri> {
let base_path = IpfsClient::build_base_path(uri)?;
Ok(IpfsClient {
base: base_path,
#[cfg(feature = "hyper")]
client: {
let connector = HttpsConnector::new(4).unwrap();
Builder::default().keep_alive(false).build(connector)
},
#[cfg(feature = "actix")]
client: Client::default(),
})
}
fn build_base_path(uri: &str) -> Result<Uri, InvalidUri> {
format!("{}/api/v0", uri).parse()
}
fn build_base_request<Req>(
&self,
req: &Req,
form: Option<multipart::Form<'static>>,
) -> Result<Request, Error>
where
Req: ApiRequest + Serialize,
{
let url = format!(
"{}{}?{}",
self.base,
Req::PATH,
::serde_urlencoded::to_string(req)?
);
#[cfg(feature = "hyper")]
let req = url.parse::<Uri>().map_err(From::from).and_then(move |url| {
let mut builder = http::Request::builder();
let mut builder = builder.method(Req::METHOD.clone()).uri(url);
let req = if let Some(form) = form {
form.set_body_convert::<hyper::Body, multipart::Body>(&mut builder)
} else {
builder.body(hyper::Body::empty())
};
req.map_err(From::from)
});
#[cfg(feature = "actix")]
let req = if let Some(form) = form {
Ok(self
.client
.request(Req::METHOD.clone(), url)
.content_type(form.content_type()))
} else {
Ok(self.client.request(Req::METHOD.clone(), url))
};
req
}
#[inline]
fn build_error_from_body(chunk: Bytes) -> Error {
match serde_json::from_slice(&chunk) {
Ok(e) => Error::Api(e),
Err(_) => match String::from_utf8(chunk.to_vec()) {
Ok(s) => Error::Uncategorized(s),
Err(e) => e.into(),
},
}
}
fn process_json_response<Res>(status: StatusCode, chunk: Bytes) -> Result<Res, Error>
where
for<'de> Res: 'static + Deserialize<'de>,
{
match status {
StatusCode::OK => serde_json::from_slice(&chunk).map_err(From::from),
_ => Err(Self::build_error_from_body(chunk)),
}
}
fn process_stream_response<D, Res>(res: Response, decoder: D) -> AsyncStreamResponse<Res>
where
D: 'static + Decoder<Item = Res, Error = Error> + Send,
Res: 'static,
{
#[cfg(feature = "hyper")]
let stream = FramedRead::new(
StreamReader::new(res.into_body().map(|c| c.into_bytes()).from_err()),
decoder,
);
#[cfg(feature = "actix")]
let stream = FramedRead::new(StreamReader::new(res.from_err()), decoder);
Box::new(stream)
}
fn request_raw<Req>(
&self,
req: &Req,
form: Option<multipart::Form<'static>>,
) -> AsyncResponse<(StatusCode, Bytes)>
where
Req: ApiRequest + Serialize,
{
match self.build_base_request(req, form) {
Ok(req) => {
#[cfg(feature = "hyper")]
let res = self
.client
.request(req)
.and_then(|res| {
let status = res.status();
res.into_body()
.concat2()
.map(move |chunk| (status, chunk.into_bytes()))
})
.from_err();
#[cfg(feature = "actix")]
let res = req
.timeout(std::time::Duration::from_secs(90))
.send()
.from_err()
.and_then(|mut x| {
let status = x.status();
x.body().map(move |body| (status, body)).from_err()
});
Box::new(res)
}
Err(e) => Box::new(Err(e).into_future()),
}
}
fn request_stream<Req, Res, F>(
&self,
req: &Req,
form: Option<multipart::Form<'static>>,
process: F,
) -> AsyncStreamResponse<Res>
where
Req: ApiRequest + Serialize,
Res: 'static + Send,
F: 'static + Fn(Response) -> AsyncStreamResponse<Res> + Send,
{
#[cfg(feature = "hyper")]
match self.build_base_request(req, form) {
Ok(req) => {
let res = self
.client
.request(req)
.from_err()
.map(move |res| {
let stream: Box<dyn Stream<Item = Res, Error = _> + Send + 'static> =
match res.status() {
StatusCode::OK => process(res),
_ => Box::new(
res.into_body()
.concat2()
.from_err()
.and_then(|chunk| {
Err(Self::build_error_from_body(chunk.into_bytes()))
})
.into_stream(),
),
};
stream
})
.flatten_stream();
Box::new(res)
}
Err(e) => Box::new(stream::once(Err(e))),
}
#[cfg(feature = "actix")]
match self.build_base_request(req, form) {
Ok(req) => {
let res = req
.timeout(std::time::Duration::from_secs(90))
.send()
.from_err();
Box::new(res.map(process).flatten_stream())
}
Err(e) => Box::new(stream::once(Err(e))),
}
}
fn request<Req, Res>(
&self,
req: &Req,
form: Option<multipart::Form<'static>>,
) -> AsyncResponse<Res>
where
Req: ApiRequest + Serialize,
for<'de> Res: 'static + Deserialize<'de> + Send,
{
let res = self
.request_raw(req, form)
.and_then(|(status, chunk)| IpfsClient::process_json_response(status, chunk));
Box::new(res)
}
fn request_empty<Req>(
&self,
req: &Req,
form: Option<multipart::Form<'static>>,
) -> AsyncResponse<()>
where
Req: ApiRequest + Serialize,
{
let res = self
.request_raw(req, form)
.and_then(|(status, chunk)| match status {
StatusCode::OK => Ok(()),
_ => Err(Self::build_error_from_body(chunk)),
});
Box::new(res)
}
fn request_string<Req>(
&self,
req: &Req,
form: Option<multipart::Form<'static>>,
) -> AsyncResponse<String>
where
Req: ApiRequest + Serialize,
{
let res = self
.request_raw(req, form)
.and_then(|(status, chunk)| match status {
StatusCode::OK => String::from_utf8(chunk.to_vec()).map_err(From::from),
_ => Err(Self::build_error_from_body(chunk)),
});
Box::new(res)
}
fn request_stream_bytes<Req>(
&self,
req: &Req,
form: Option<multipart::Form<'static>>,
) -> AsyncStreamResponse<Bytes>
where
Req: ApiRequest + Serialize,
{
#[cfg(feature = "hyper")]
let res = self.request_stream(req, form, |res| {
Box::new(res.into_body().from_err().map(|c| c.into_bytes()))
});
#[cfg(feature = "actix")]
let res = self.request_stream(req, form, |res| Box::new(res.from_err()));
res
}
fn request_stream_json<Req, Res>(
&self,
req: &Req,
form: Option<multipart::Form<'static>>,
) -> AsyncStreamResponse<Res>
where
Req: ApiRequest + Serialize,
for<'de> Res: 'static + Deserialize<'de> + Send,
{
self.request_stream(req, form, |res| {
let parse_stream_error = if let Some(trailer) = res.headers().get(TRAILER) {
if trailer == "X-Stream-Error" {
true
} else {
let err = Error::UnrecognizedTrailerHeader(
String::from_utf8_lossy(trailer.as_ref()).into(),
);
return Box::new(stream::once(Err(err)));
}
} else {
false
};
Box::new(IpfsClient::process_stream_response(
res,
JsonLineDecoder::new(parse_stream_error),
))
})
}
}
impl IpfsClient {
#[inline]
pub fn add<R>(&self, data: R) -> AsyncResponse<response::AddResponse>
where
R: 'static + Read + Send,
{
let mut form = multipart::Form::default();
form.add_reader("path", data);
self.request(&request::Add, Some(form))
}
#[inline]
pub fn add_path<P>(&self, path: P) -> AsyncResponse<response::AddResponse>
where
P: AsRef<Path>,
{
let mut form = multipart::Form::default();
let prefix = path.as_ref().parent();
let mut paths_to_add: Vec<(PathBuf, u64)> = vec![];
for path in walkdir::WalkDir::new(path.as_ref()) {
match path {
Ok(entry) => {
if entry.file_type().is_file() {
let file_size =
entry.metadata().map(|metadata| metadata.len()).unwrap_or(0);
paths_to_add.push((entry.path().to_path_buf(), file_size));
}
}
Err(err) => {
return Box::new(future::err(Error::Io(err.into())));
}
}
}
paths_to_add.sort_unstable_by(|(_, a), (_, b)| a.cmp(b).reverse());
let mut it = 0;
const FILE_DESCRIPTOR_LIMIT: usize = 127;
for (path, file_size) in paths_to_add {
let file = std::fs::File::open(&path);
if let Err(err) = file {
return Box::new(future::err(err.into()));
}
let file_name = match prefix {
Some(prefix) => path.strip_prefix(prefix).unwrap(),
None => path.as_path(),
}
.to_string_lossy();
if it < FILE_DESCRIPTOR_LIMIT {
form.add_reader_file("path", file.unwrap(), file_name);
it += 1;
} else {
let mut buf = Vec::with_capacity(file_size as usize);
if let Err(err) = file.unwrap().read_to_end(&mut buf) {
return Box::new(future::err(err.into()));
}
form.add_reader_file("path", std::io::Cursor::new(buf), file_name);
}
}
Box::new(
self.request_stream_json(&request::Add, Some(form))
.collect()
.map(|mut responses: Vec<response::AddResponse>| responses.pop().unwrap()),
)
}
#[inline]
pub fn bitswap_ledger(&self, peer: &str) -> AsyncResponse<response::BitswapLedgerResponse> {
self.request(&request::BitswapLedger { peer }, None)
}
#[inline]
pub fn bitswap_reprovide(&self) -> AsyncResponse<response::BitswapReprovideResponse> {
self.request_empty(&request::BitswapReprovide, None)
}
#[inline]
pub fn bitswap_stat(&self) -> AsyncResponse<response::BitswapStatResponse> {
self.request(&request::BitswapStat, None)
}
#[inline]
pub fn bitswap_unwant(&self, key: &str) -> AsyncResponse<response::BitswapUnwantResponse> {
self.request_empty(&request::BitswapUnwant { key }, None)
}
#[inline]
pub fn bitswap_wantlist(
&self,
peer: Option<&str>,
) -> AsyncResponse<response::BitswapWantlistResponse> {
self.request(&request::BitswapWantlist { peer }, None)
}
#[inline]
pub fn block_get(&self, hash: &str) -> AsyncStreamResponse<Bytes> {
self.request_stream_bytes(&request::BlockGet { hash }, None)
}
#[inline]
pub fn block_put<R>(&self, data: R) -> AsyncResponse<response::BlockPutResponse>
where
R: 'static + Read + Send,
{
let mut form = multipart::Form::default();
form.add_reader("data", data);
self.request(&request::BlockPut, Some(form))
}
#[inline]
pub fn block_rm(&self, hash: &str) -> AsyncResponse<response::BlockRmResponse> {
self.request(&request::BlockRm { hash }, None)
}
#[inline]
pub fn block_stat(&self, hash: &str) -> AsyncResponse<response::BlockStatResponse> {
self.request(&request::BlockStat { hash }, None)
}
#[inline]
pub fn bootstrap_add_default(&self) -> AsyncResponse<response::BootstrapAddDefaultResponse> {
self.request(&request::BootstrapAddDefault, None)
}
#[inline]
pub fn bootstrap_list(&self) -> AsyncResponse<response::BootstrapListResponse> {
self.request(&request::BootstrapList, None)
}
#[inline]
pub fn bootstrap_rm_all(&self) -> AsyncResponse<response::BootstrapRmAllResponse> {
self.request(&request::BootstrapRmAll, None)
}
#[inline]
pub fn cat(&self, path: &str) -> AsyncStreamResponse<Bytes> {
self.request_stream_bytes(&request::Cat { path }, None)
}
#[inline]
pub fn commands(&self) -> AsyncResponse<response::CommandsResponse> {
self.request(&request::Commands, None)
}
#[inline]
pub fn config_edit(&self) -> AsyncResponse<response::ConfigEditResponse> {
self.request(&request::ConfigEdit, None)
}
#[inline]
pub fn config_replace<R>(&self, data: R) -> AsyncResponse<response::ConfigReplaceResponse>
where
R: 'static + Read + Send,
{
let mut form = multipart::Form::default();
form.add_reader("file", data);
self.request_empty(&request::ConfigReplace, Some(form))
}
#[inline]
pub fn config_show(&self) -> AsyncResponse<response::ConfigShowResponse> {
self.request_string(&request::ConfigShow, None)
}
#[inline]
pub fn dag_get(&self, path: &str) -> AsyncResponse<response::DagGetResponse> {
self.request(&request::DagGet { path }, None)
}
#[inline]
pub fn dht_findpeer(&self, peer: &str) -> AsyncStreamResponse<response::DhtFindPeerResponse> {
self.request_stream_json(&request::DhtFindPeer { peer }, None)
}
#[inline]
pub fn dht_findprovs(&self, key: &str) -> AsyncStreamResponse<response::DhtFindProvsResponse> {
self.request_stream_json(&request::DhtFindProvs { key }, None)
}
#[inline]
pub fn dht_get(&self, key: &str) -> AsyncStreamResponse<response::DhtGetResponse> {
self.request_stream_json(&request::DhtGet { key }, None)
}
#[inline]
pub fn dht_provide(&self, key: &str) -> AsyncStreamResponse<response::DhtProvideResponse> {
self.request_stream_json(&request::DhtProvide { key }, None)
}
#[inline]
pub fn dht_put(&self, key: &str, value: &str) -> AsyncStreamResponse<response::DhtPutResponse> {
self.request_stream_json(&request::DhtPut { key, value }, None)
}
#[inline]
pub fn dht_query(&self, peer: &str) -> AsyncStreamResponse<response::DhtQueryResponse> {
self.request_stream_json(&request::DhtQuery { peer }, None)
}
#[inline]
pub fn diag_cmds_clear(&self) -> AsyncResponse<response::DiagCmdsClearResponse> {
self.request_empty(&request::DiagCmdsClear, None)
}
#[inline]
pub fn diag_cmds_set_time(
&self,
time: &str,
) -> AsyncResponse<response::DiagCmdsSetTimeResponse> {
self.request_empty(&request::DiagCmdsSetTime { time }, None)
}
#[inline]
pub fn diag_sys(&self) -> AsyncResponse<response::DiagSysResponse> {
self.request_string(&request::DiagSys, None)
}
#[inline]
pub fn dns(&self, link: &str, recursive: bool) -> AsyncResponse<response::DnsResponse> {
self.request(&request::Dns { link, recursive }, None)
}
#[inline]
pub fn file_ls(&self, path: &str) -> AsyncResponse<response::FileLsResponse> {
self.request(&request::FileLs { path }, None)
}
#[inline]
pub fn files_cp(&self, path: &str, dest: &str) -> AsyncResponse<response::FilesCpResponse> {
self.request_empty(&request::FilesCp { path, dest }, None)
}
#[inline]
pub fn files_flush(&self, path: Option<&str>) -> AsyncResponse<response::FilesFlushResponse> {
self.request_empty(&request::FilesFlush { path }, None)
}
#[inline]
pub fn files_ls(&self, path: Option<&str>) -> AsyncResponse<response::FilesLsResponse> {
self.request(&request::FilesLs { path }, None)
}
#[inline]
pub fn files_mkdir(
&self,
path: &str,
parents: bool,
) -> AsyncResponse<response::FilesMkdirResponse> {
self.request_empty(&request::FilesMkdir { path, parents }, None)
}
#[inline]
pub fn files_mv(&self, path: &str, dest: &str) -> AsyncResponse<response::FilesMvResponse> {
self.request_empty(&request::FilesMv { path, dest }, None)
}
#[inline]
pub fn files_read(&self, path: &str) -> AsyncStreamResponse<Bytes> {
self.request_stream_bytes(&request::FilesRead { path }, None)
}
#[inline]
pub fn files_rm(
&self,
path: &str,
recursive: bool,
) -> AsyncResponse<response::FilesRmResponse> {
self.request_empty(&request::FilesRm { path, recursive }, None)
}
#[inline]
pub fn files_stat(&self, path: &str) -> AsyncResponse<response::FilesStatResponse> {
self.request(&request::FilesStat { path }, None)
}
#[inline]
pub fn files_write<R>(
&self,
path: &str,
create: bool,
truncate: bool,
data: R,
) -> AsyncResponse<response::FilesWriteResponse>
where
R: 'static + Read + Send,
{
let mut form = multipart::Form::default();
form.add_reader("data", data);
self.request_empty(
&request::FilesWrite {
path,
create,
truncate,
},
Some(form),
)
}
#[inline]
pub fn filestore_dups(&self) -> AsyncStreamResponse<response::FilestoreDupsResponse> {
self.request_stream_json(&request::FilestoreDups, None)
}
#[inline]
pub fn filestore_ls(
&self,
cid: Option<&str>,
) -> AsyncStreamResponse<response::FilestoreLsResponse> {
self.request_stream_json(&request::FilestoreLs { cid }, None)
}
#[inline]
pub fn filestore_verify(
&self,
cid: Option<&str>,
) -> AsyncStreamResponse<response::FilestoreVerifyResponse> {
self.request_stream_json(&request::FilestoreVerify { cid }, None)
}
#[inline]
pub fn get(&self, path: &str) -> AsyncStreamResponse<Bytes> {
self.request_stream_bytes(&request::Get { path }, None)
}
#[inline]
pub fn id(&self, peer: Option<&str>) -> AsyncResponse<response::IdResponse> {
self.request(&request::Id { peer }, None)
}
#[inline]
pub fn key_gen(
&self,
name: &str,
kind: request::KeyType,
size: i32,
) -> AsyncResponse<response::KeyGenResponse> {
self.request(&request::KeyGen { name, kind, size }, None)
}
#[inline]
pub fn key_list(&self) -> AsyncResponse<response::KeyListResponse> {
self.request(&request::KeyList, None)
}
#[inline]
pub fn key_rename(
&self,
name: &str,
new: &str,
force: bool,
) -> AsyncResponse<response::KeyRenameResponse> {
self.request(&request::KeyRename { name, new, force }, None)
}
#[inline]
pub fn key_rm(&self, name: &str) -> AsyncResponse<response::KeyRmResponse> {
self.request(&request::KeyRm { name }, None)
}
#[inline]
pub fn log_level(
&self,
logger: request::Logger,
level: request::LoggingLevel,
) -> AsyncResponse<response::LogLevelResponse> {
self.request(&request::LogLevel { logger, level }, None)
}
#[inline]
pub fn log_ls(&self) -> AsyncResponse<response::LogLsResponse> {
self.request(&request::LogLs, None)
}
pub fn log_tail(&self) -> AsyncStreamResponse<String> {
#[cfg(feature = "hyper")]
let res = self
.build_base_request(&request::LogTail, None)
.map(|req| self.client.request(req).from_err())
.into_future()
.flatten()
.map(|res| IpfsClient::process_stream_response(res, LineDecoder))
.flatten_stream();
#[cfg(feature = "actix")]
let res = self
.build_base_request(&request::LogTail, None)
.into_future()
.and_then(|req| {
req.timeout(std::time::Duration::from_secs(90))
.send()
.from_err()
})
.map(|res| IpfsClient::process_stream_response(res, LineDecoder))
.flatten_stream();
Box::new(res)
}
#[inline]
pub fn ls(&self, path: Option<&str>) -> AsyncResponse<response::LsResponse> {
self.request(&request::Ls { path }, None)
}
pub fn name_publish(
&self,
path: &str,
resolve: bool,
lifetime: Option<&str>,
ttl: Option<&str>,
key: Option<&str>,
) -> AsyncResponse<response::NamePublishResponse> {
self.request(
&request::NamePublish {
path,
resolve,
lifetime,
ttl,
key,
},
None,
)
}
pub fn name_resolve(
&self,
name: Option<&str>,
recursive: bool,
nocache: bool,
) -> AsyncResponse<response::NameResolveResponse> {
self.request(
&request::NameResolve {
name,
recursive,
nocache,
},
None,
)
}
#[inline]
pub fn object_data(&self, key: &str) -> AsyncStreamResponse<Bytes> {
self.request_stream_bytes(&request::ObjectData { key }, None)
}
#[inline]
pub fn object_diff(
&self,
key0: &str,
key1: &str,
) -> AsyncResponse<response::ObjectDiffResponse> {
self.request(&request::ObjectDiff { key0, key1 }, None)
}
#[inline]
pub fn object_get(&self, key: &str) -> AsyncResponse<response::ObjectGetResponse> {
self.request(&request::ObjectGet { key }, None)
}
#[inline]
pub fn object_links(&self, key: &str) -> AsyncResponse<response::ObjectLinksResponse> {
self.request(&request::ObjectLinks { key }, None)
}
#[inline]
pub fn object_new(
&self,
template: Option<request::ObjectTemplate>,
) -> AsyncResponse<response::ObjectNewResponse> {
self.request(&request::ObjectNew { template }, None)
}
#[inline]
pub fn object_stat(&self, key: &str) -> AsyncResponse<response::ObjectStatResponse> {
self.request(&request::ObjectStat { key }, None)
}
#[inline]
pub fn pin_add(&self, key: &str, recursive: bool) -> AsyncResponse<response::PinAddResponse> {
self.request(
&request::PinAdd {
key,
recursive: Some(recursive),
progress: false,
},
None,
)
}
#[inline]
pub fn pin_ls(
&self,
key: Option<&str>,
typ: Option<&str>,
) -> AsyncResponse<response::PinLsResponse> {
self.request(&request::PinLs { key, typ }, None)
}
#[inline]
pub fn pin_rm(&self, key: &str, recursive: bool) -> AsyncResponse<response::PinRmResponse> {
self.request(&request::PinRm { key, recursive }, None)
}
#[inline]
pub fn ping(
&self,
peer: &str,
count: Option<i32>,
) -> AsyncStreamResponse<response::PingResponse> {
self.request_stream_json(&request::Ping { peer, count }, None)
}
#[inline]
pub fn pubsub_ls(&self) -> AsyncResponse<response::PubsubLsResponse> {
self.request(&request::PubsubLs, None)
}
#[inline]
pub fn pubsub_peers(
&self,
topic: Option<&str>,
) -> AsyncResponse<response::PubsubPeersResponse> {
self.request(&request::PubsubPeers { topic }, None)
}
#[inline]
pub fn pubsub_pub(
&self,
topic: &str,
payload: &str,
) -> AsyncResponse<response::PubsubPubResponse> {
self.request_empty(&request::PubsubPub { topic, payload }, None)
}
#[inline]
pub fn pubsub_sub(
&self,
topic: &str,
discover: bool,
) -> AsyncStreamResponse<response::PubsubSubResponse> {
self.request_stream_json(&request::PubsubSub { topic, discover }, None)
}
#[inline]
pub fn refs_local(&self) -> AsyncStreamResponse<response::RefsLocalResponse> {
self.request_stream_json(&request::RefsLocal, None)
}
pub fn shutdown(&self) -> AsyncResponse<response::ShutdownResponse> {
self.request_empty(&request::Shutdown, None)
}
#[inline]
pub fn stats_bitswap(&self) -> AsyncResponse<response::StatsBitswapResponse> {
self.request(&request::StatsBitswap, None)
}
#[inline]
pub fn stats_bw(&self) -> AsyncResponse<response::StatsBwResponse> {
self.request(&request::StatsBw, None)
}
#[inline]
pub fn stats_repo(&self) -> AsyncResponse<response::StatsRepoResponse> {
self.request(&request::StatsRepo, None)
}
#[inline]
pub fn swarm_addrs_local(&self) -> AsyncResponse<response::SwarmAddrsLocalResponse> {
self.request(&request::SwarmAddrsLocal, None)
}
#[inline]
pub fn swarm_peers(&self) -> AsyncResponse<response::SwarmPeersResponse> {
self.request(&request::SwarmPeers, None)
}
#[inline]
pub fn tar_add<R>(&self, data: R) -> AsyncResponse<response::TarAddResponse>
where
R: 'static + Read + Send,
{
let mut form = multipart::Form::default();
form.add_reader("file", data);
self.request(&request::TarAdd, Some(form))
}
#[inline]
pub fn tar_cat(&self, path: &str) -> AsyncStreamResponse<Bytes> {
self.request_stream_bytes(&request::TarCat { path }, None)
}
#[inline]
pub fn version(&self) -> AsyncResponse<response::VersionResponse> {
self.request(&request::Version, None)
}
}