use crate::{
header::TRAILER,
read::{JsonLineDecoder, LineDecoder, StreamReader},
request::{self, ApiRequest},
response::{self, Error},
Client, Request, Response,
};
#[cfg(feature = "actix")]
use actix_multipart::client::multipart;
use bytes::Bytes;
use futures::{future, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
use http::{
uri::{InvalidUri, Uri},
StatusCode,
};
#[cfg(feature = "hyper")]
use hyper::{body, client::Builder};
#[cfg(feature = "hyper")]
use hyper_multipart::client::multipart;
#[cfg(feature = "hyper")]
use hyper_tls::HttpsConnector;
use parity_multiaddr::Protocol;
use serde::{Deserialize, Serialize};
use serde_json;
use std::{
fs,
io::{Cursor, Read},
net::{IpAddr, SocketAddr},
path::{Path, PathBuf},
};
use tokio_util::codec::{Decoder, FramedRead};
/// Maximum number of files that `add_path` attaches to a multipart form as open
/// `fs::File` handles; files beyond this count are read fully into memory instead.
const FILE_DESCRIPTOR_LIMIT: usize = 127;
/// Client for an IPFS HTTP API endpoint.
///
/// Holds the API base URI and the feature-selected HTTP client used to send
/// requests.
#[derive(Clone)]
pub struct IpfsClient {
/// Base URI that each request's `ApiRequest::PATH` is appended to.
base: Uri,
/// HTTP client used to send requests.
client: Client,
}
impl Default for IpfsClient {
    /// Builds a client from the address stored in the `.ipfs/api` file of the
    /// user's home directory, falling back to `localhost:5001` when that file is
    /// missing, unreadable, or does not yield an IP address plus TCP port.
    ///
    /// NOTE(review): the file contents are parsed with
    /// `parity_multiaddr::from_url`; confirm this matches the format the daemon
    /// actually writes to that file.
    fn default() -> IpfsClient {
        // Reads the API file and extracts an IP/TCP socket address from it.
        fn api_file_socket_addr() -> Option<SocketAddr> {
            let api_file = dirs::home_dir()?.join(".ipfs").join("api");
            let contents = fs::read_to_string(&api_file).ok()?;
            let multiaddr = parity_multiaddr::from_url(&contents).ok()?;

            let mut ip: Option<IpAddr> = None;
            let mut tcp_port: Option<u16> = None;
            for component in multiaddr.iter() {
                match component {
                    Protocol::Ip4(v4) => ip = Some(v4.into()),
                    Protocol::Ip6(v6) => ip = Some(v6.into()),
                    Protocol::Tcp(number) => tcp_port = Some(number),
                    // Any other protocol component disqualifies the address.
                    _ => return None,
                }
            }

            Some(SocketAddr::new(ip?, tcp_port?))
        }

        match api_file_socket_addr() {
            Some(socket_addr) => IpfsClient::from(socket_addr),
            None => IpfsClient::new("localhost", 5001).unwrap(),
        }
    }
}
impl From<SocketAddr> for IpfsClient {
fn from(socket_addr: SocketAddr) -> Self {
IpfsClient::new(&socket_addr.ip().to_string(), socket_addr.port()).unwrap()
}
}
impl IpfsClient {
/// Creates a client for the plain-HTTP endpoint `http://{host}:{port}`.
///
/// Returns `InvalidUri` when the combined address does not parse as a URI.
#[inline]
pub fn new(host: &str, port: u16) -> Result<IpfsClient, InvalidUri> {
    let uri = format!("http://{}:{}", host, port);
    Self::new_from_uri(&uri)
}
#[inline]
pub fn new_from_uri(uri: &str) -> Result<IpfsClient, InvalidUri> {
let base_path = IpfsClient::build_base_path(uri)?;
let client = {
#[cfg(feature = "hyper")]
{
Builder::default()
.pool_max_idle_per_host(0)
.build(HttpsConnector::new())
}
#[cfg(feature = "actix")]
{
Client::default()
}
};
Ok(IpfsClient {
base: base_path,
client,
})
}
/// Builds the API base URI (`{uri}/api/v0`) for a daemon address.
///
/// Trailing slashes on `uri` are dropped first, so `http://host:5001/` and
/// `http://host:5001` both produce `http://host:5001/api/v0` instead of a
/// path that starts with `//`.
///
/// Returns `InvalidUri` when the resulting string is not a valid URI.
fn build_base_path(uri: &str) -> Result<Uri, InvalidUri> {
    format!("{}/api/v0", uri.trim_end_matches('/')).parse()
}
/// Builds the HTTP request for an API call.
///
/// The URL is `{base}{Req::PATH}?{query}`, where the query string is `req`
/// serialized with `serde_urlencoded`. With the `hyper` feature the optional
/// multipart `form` becomes the request body (an empty body is used when there
/// is none). With the `actix` feature a 90-second timeout is applied.
///
/// Returns an error if `req` cannot be URL-encoded or, with `hyper`, if the URL
/// or the request itself cannot be built.
fn build_base_request<Req>(
&self,
req: Req,
form: Option<multipart::Form<'static>>,
) -> Result<Request, Error>
where
Req: ApiRequest + Serialize,
{
let url = format!(
"{}{}?{}",
self.base,
Req::PATH,
::serde_urlencoded::to_string(req)?
);
#[cfg(feature = "hyper")]
{
url.parse::<Uri>().map_err(From::from).and_then(move |url| {
let builder = http::Request::builder();
let builder = builder.method(Req::METHOD.clone()).uri(url);
// Attach the multipart form as the body when present, else send no body.
let req = if let Some(form) = form {
form.set_body_convert::<hyper::Body, multipart::Body>(builder)
} else {
builder.body(hyper::Body::empty())
};
req.map_err(From::from)
})
}
#[cfg(feature = "actix")]
{
// NOTE(review): only the form's content type is applied in this branch; the
// form body itself is not attached here — confirm where (or whether) it is sent.
let req = if let Some(form) = form {
self.client
.request(Req::METHOD.clone(), url)
.content_type(form.content_type())
} else {
self.client.request(Req::METHOD.clone(), url)
};
Ok(req.timeout(std::time::Duration::from_secs(90)))
}
}
#[inline]
fn process_error_from_body(body: Bytes) -> Error {
match serde_json::from_slice(&body) {
Ok(e) => Error::Api(e),
Err(_) => match String::from_utf8(body.to_vec()) {
Ok(s) => Error::Uncategorized(s),
Err(e) => e.into(),
},
}
}
fn process_json_response<Res>(status: StatusCode, body: Bytes) -> Result<Res, Error>
where
for<'de> Res: 'static + Deserialize<'de>,
{
match status {
StatusCode::OK => serde_json::from_slice(&body).map_err(From::from),
_ => Err(Self::process_error_from_body(body)),
}
}
fn process_stream_response<D, Res>(
res: Response,
decoder: D,
) -> impl Stream<Item = Result<Res, Error>>
where
D: Decoder<Item = Res, Error = Error> + Send,
{
#[cfg(feature = "hyper")]
{
FramedRead::new(StreamReader::new(res.into_body()), decoder)
}
#[cfg(feature = "actix")]
{
FramedRead::new(StreamReader::new(res), decoder)
}
}
/// Sends an API request and returns the response status together with the
/// fully buffered response body.
///
/// Fails if the request cannot be built, sent, or its body cannot be read.
async fn request_raw<Req>(
    &self,
    req: Req,
    form: Option<multipart::Form<'static>>,
) -> Result<(StatusCode, Bytes), Error>
where
    Req: ApiRequest + Serialize,
{
    let http_request = self.build_base_request(req, form)?;

    #[cfg(feature = "hyper")]
    {
        let response = self.client.request(http_request).await?;
        let code = response.status();
        let payload = body::to_bytes(response.into_body()).await?;
        Ok((code, payload))
    }
    #[cfg(feature = "actix")]
    {
        let mut response = http_request.send().await?;
        let code = response.status();
        let payload = response.body().await?;
        Ok((code, payload))
    }
}
async fn request<Req, Res>(
&self,
req: Req,
form: Option<multipart::Form<'static>>,
) -> Result<Res, Error>
where
Req: ApiRequest + Serialize,
for<'de> Res: 'static + Deserialize<'de>,
{
let (status, chunk) = self.request_raw(req, form).await?;
IpfsClient::process_json_response(status, chunk)
}
/// Sends an API request whose response body is ignored on success.
///
/// Returns `Ok(())` for 200 OK; any other status is converted into an error
/// from the response body.
async fn request_empty<Req>(
    &self,
    req: Req,
    form: Option<multipart::Form<'static>>,
) -> Result<(), Error>
where
    Req: ApiRequest + Serialize,
{
    let (status, body) = self.request_raw(req, form).await?;
    if status == StatusCode::OK {
        Ok(())
    } else {
        Err(Self::process_error_from_body(body))
    }
}
async fn request_string<Req>(
&self,
req: Req,
form: Option<multipart::Form<'static>>,
) -> Result<String, Error>
where
Req: ApiRequest + Serialize,
{
let (status, chunk) = self.request_raw(req, form).await?;
match status {
StatusCode::OK => String::from_utf8(chunk.to_vec()).map_err(From::from),
_ => Err(Self::process_error_from_body(chunk)),
}
}
}
impl IpfsClient {
/// Sends `req` and returns the response as a stream of `Res` items.
///
/// On 200 OK the response is handed to `process`, whose stream is returned.
/// For any other status the whole body is read and a single error item, built
/// by `process_error_from_body`, is yielded. A failure to send the request or
/// to read the error body is yielded as an error item as well.
fn request_stream<Res, F, OutStream>(
&self,
req: Request,
process: F,
) -> impl Stream<Item = Result<Res, Error>>
where
OutStream: Stream<Item = Result<Res, Error>>,
F: 'static + Fn(Response) -> OutStream,
{
#[cfg(feature = "hyper")]
{
self.client
.request(req)
.err_into()
.map_ok(move |res| {
match res.status() {
StatusCode::OK => process(res).right_stream(),
// Non-OK: buffer the body and surface it as one error item.
_ => body::to_bytes(res.into_body())
.boxed()
.map(|maybe_body| match maybe_body {
Ok(body) => Err(Self::process_error_from_body(body)),
Err(e) => Err(e.into()),
})
.into_stream()
.left_stream(),
}
})
// Flatten the future-of-stream into a single stream.
.try_flatten_stream()
}
#[cfg(feature = "actix")]
{
req.send()
.err_into()
.map_ok(move |mut res| {
match res.status() {
StatusCode::OK => process(res).right_stream(),
// Non-OK: buffer the body and surface it as one error item.
_ => res
.body()
.map(|maybe_body| match maybe_body {
Ok(body) => Err(Self::process_error_from_body(body)),
Err(e) => Err(e.into()),
})
.into_stream()
.left_stream(),
}
})
// Flatten the future-of-stream into a single stream.
.try_flatten_stream()
}
}
/// Sends `req` and streams the raw response body as `Bytes` chunks, with
/// transport errors converted into `Error`.
fn request_stream_bytes(&self, req: Request) -> impl Stream<Item = Result<Bytes, Error>> {
    #[cfg(feature = "hyper")]
    {
        self.request_stream(req, |response| response.into_body().err_into())
    }
    #[cfg(feature = "actix")]
    {
        self.request_stream(req, |response| response.err_into())
    }
}
/// Sends `req` and streams the response decoded by `JsonLineDecoder`.
///
/// The `TRAILER` response header selects the decoder mode: absent gives
/// `JsonLineDecoder::new(false)`, the value `X-Stream-Error` gives
/// `JsonLineDecoder::new(true)`, and any other value makes the stream yield a
/// single `Error::UnrecognizedTrailerHeader`.
fn request_stream_json<Res>(&self, req: Request) -> impl Stream<Item = Result<Res, Error>>
where
    for<'de> Res: 'static + Deserialize<'de> + Send,
{
    self.request_stream(req, |response| {
        let parse_stream_error = match response.headers().get(TRAILER) {
            None => false,
            Some(trailer) if trailer == "X-Stream-Error" => true,
            Some(unexpected) => {
                let err = Error::UnrecognizedTrailerHeader(
                    String::from_utf8_lossy(unexpected.as_ref()).into(),
                );
                return future::err(err).into_stream().left_stream();
            }
        };

        IpfsClient::process_stream_response(response, JsonLineDecoder::new(parse_stream_error))
            .right_stream()
    })
}
}
// Shared body for the streaming API methods.
//
// Builds the base request from `$req` and `$form` via `$self.build_base_request`.
// On success the request is bound to `$var` and the stream produced by `$impl`
// is returned as the right side of an `Either` stream; on failure the macro
// `return`s a one-item stream carrying the error as the left side. Because of
// that `return`, it must be used inside a function that returns the stream.
//
// The first form is shorthand for calling the method `$call` on `$self` with
// the built request.
macro_rules! impl_stream_api_response {
(($self:ident, $req:expr, $form:expr) => $call:ident) => {
impl_stream_api_response! {
($self, $req, $form) |req| => { $self.$call(req) }
}
};
(($self:ident, $req:expr, $form:expr) |$var:ident| => $impl:block) => {
match $self.build_base_request($req, $form) {
Ok($var) => $impl.right_stream(),
Err(e) => return future::err(e).into_stream().left_stream(),
}
};
}
impl IpfsClient {
/// Uploads everything readable from `data` as the multipart field `"path"` of
/// a `request::Add` call and deserializes the JSON reply.
#[inline]
pub async fn add<R>(&self, data: R) -> Result<response::AddResponse, Error>
where
    R: 'static + Read + Send + Sync,
{
    let upload = {
        let mut form = multipart::Form::default();
        form.add_reader("path", data);
        form
    };
    self.request(request::Add, Some(upload)).await
}
#[inline]
pub async fn add_path<P>(&self, path: P) -> Result<Vec<response::AddResponse>, Error>
where
P: AsRef<Path>,
{
let prefix = path.as_ref().parent();
let mut paths_to_add: Vec<(PathBuf, u64)> = vec![];
for path in walkdir::WalkDir::new(path.as_ref()) {
match path {
Ok(entry) if entry.file_type().is_file() => {
if entry.file_type().is_file() {
let file_size = entry
.metadata()
.map(|metadata| metadata.len())
.map_err(|e| Error::Io(e.into()))?;
paths_to_add.push((entry.path().to_path_buf(), file_size));
}
}
Ok(_) => (),
Err(err) => return Err(Error::Io(err.into())),
}
}
paths_to_add.sort_unstable_by(|(_, a), (_, b)| a.cmp(b).reverse());
let mut it = 0;
let mut form = multipart::Form::default();
for (path, file_size) in paths_to_add {
let mut file = fs::File::open(&path)?;
let file_name = match prefix {
Some(prefix) => path.strip_prefix(prefix).unwrap(),
None => path.as_path(),
}
.to_string_lossy();
if it < FILE_DESCRIPTOR_LIMIT {
form.add_reader_file("path", file, file_name);
it += 1;
} else {
let mut buf = Vec::with_capacity(file_size as usize);
let _ = file.read_to_end(&mut buf)?;
form.add_reader_file("path", Cursor::new(buf), file_name);
}
}
let req = self.build_base_request(request::Add, Some(form))?;
self.request_stream_json(req).try_collect().await
}
/// Sends a `request::BitswapLedger` call for `peer` and deserializes the JSON
/// reply.
#[inline]
pub async fn bitswap_ledger(
    &self,
    peer: &str,
) -> Result<response::BitswapLedgerResponse, Error> {
    let query = request::BitswapLedger { peer };
    self.request(query, None).await
}
/// Sends a `request::BitswapReprovide` call; succeeds on HTTP 200 and ignores
/// the response body.
#[inline]
pub async fn bitswap_reprovide(&self) -> Result<response::BitswapReprovideResponse, Error> {
    let query = request::BitswapReprovide;
    self.request_empty(query, None).await
}
/// Sends a `request::BitswapStat` call and deserializes the JSON reply.
#[inline]
pub async fn bitswap_stat(&self) -> Result<response::BitswapStatResponse, Error> {
    let query = request::BitswapStat;
    self.request(query, None).await
}
/// Sends a `request::BitswapUnwant` call for `key`; succeeds on HTTP 200 and
/// ignores the response body.
#[inline]
pub async fn bitswap_unwant(
    &self,
    key: &str,
) -> Result<response::BitswapUnwantResponse, Error> {
    let query = request::BitswapUnwant { key };
    self.request_empty(query, None).await
}
/// Sends a `request::BitswapWantlist` call, optionally for `peer`, and
/// deserializes the JSON reply.
#[inline]
pub async fn bitswap_wantlist(
    &self,
    peer: Option<&str>,
) -> Result<response::BitswapWantlistResponse, Error> {
    let query = request::BitswapWantlist { peer };
    self.request(query, None).await
}
/// Sends a `request::BlockGet` call for `hash` and streams the raw response
/// body as `Bytes` chunks.
///
/// If the request cannot be built, the stream yields that error as its only item.
#[inline]
pub fn block_get(&self, hash: &str) -> impl Stream<Item = Result<Bytes, Error>> {
    impl_stream_api_response! {
        (self, request::BlockGet { hash }, None) |prepared| => {
            self.request_stream_bytes(prepared)
        }
    }
}
/// Uploads everything readable from `data` as the multipart field `"data"` of
/// a `request::BlockPut` call and deserializes the JSON reply.
#[inline]
pub async fn block_put<R>(&self, data: R) -> Result<response::BlockPutResponse, Error>
where
    R: 'static + Read + Send + Sync,
{
    let upload = {
        let mut form = multipart::Form::default();
        form.add_reader("data", data);
        form
    };
    self.request(request::BlockPut, Some(upload)).await
}
/// Sends a `request::BlockRm` call for `hash` and deserializes the JSON reply.
#[inline]
pub async fn block_rm(&self, hash: &str) -> Result<response::BlockRmResponse, Error> {
    let query = request::BlockRm { hash };
    self.request(query, None).await
}
/// Sends a `request::BlockStat` call for `hash` and deserializes the JSON reply.
#[inline]
pub async fn block_stat(&self, hash: &str) -> Result<response::BlockStatResponse, Error> {
    let query = request::BlockStat { hash };
    self.request(query, None).await
}
/// Sends a `request::BootstrapAddDefault` call and deserializes the JSON reply.
#[inline]
pub async fn bootstrap_add_default(
    &self,
) -> Result<response::BootstrapAddDefaultResponse, Error> {
    let query = request::BootstrapAddDefault;
    self.request(query, None).await
}
/// Sends a `request::BootstrapList` call and deserializes the JSON reply.
#[inline]
pub async fn bootstrap_list(&self) -> Result<response::BootstrapListResponse, Error> {
    let query = request::BootstrapList;
    self.request(query, None).await
}
/// Sends a `request::BootstrapRmAll` call and deserializes the JSON reply.
#[inline]
pub async fn bootstrap_rm_all(&self) -> Result<response::BootstrapRmAllResponse, Error> {
    let query = request::BootstrapRmAll;
    self.request(query, None).await
}
/// Sends a `request::Cat` call for `path` and streams the raw response body as
/// `Bytes` chunks.
///
/// If the request cannot be built, the stream yields that error as its only item.
#[inline]
pub fn cat(&self, path: &str) -> impl Stream<Item = Result<Bytes, Error>> {
    impl_stream_api_response! {
        (self, request::Cat { path }, None) |prepared| => {
            self.request_stream_bytes(prepared)
        }
    }
}
/// Sends a `request::Commands` call and deserializes the JSON reply.
#[inline]
pub async fn commands(&self) -> Result<response::CommandsResponse, Error> {
    let query = request::Commands;
    self.request(query, None).await
}
/// Sends a `request::ConfigEdit` call and deserializes the JSON reply.
#[inline]
pub async fn config_edit(&self) -> Result<response::ConfigEditResponse, Error> {
    let query = request::ConfigEdit;
    self.request(query, None).await
}
/// Uploads everything readable from `data` as the multipart field `"file"` of
/// a `request::ConfigReplace` call; succeeds on HTTP 200 and ignores the
/// response body.
#[inline]
pub async fn config_replace<R>(&self, data: R) -> Result<response::ConfigReplaceResponse, Error>
where
    R: 'static + Read + Send + Sync,
{
    let upload = {
        let mut form = multipart::Form::default();
        form.add_reader("file", data);
        form
    };
    self.request_empty(request::ConfigReplace, Some(upload)).await
}
/// Sends a `request::ConfigShow` call and returns the response body as a
/// UTF-8 string.
#[inline]
pub async fn config_show(&self) -> Result<response::ConfigShowResponse, Error> {
    let query = request::ConfigShow;
    self.request_string(query, None).await
}
/// Sends a `request::DagGet` call for `path` and deserializes the JSON reply.
#[inline]
pub async fn dag_get(&self, path: &str) -> Result<response::DagGetResponse, Error> {
    let query = request::DagGet { path };
    self.request(query, None).await
}
/// Sends a `request::DhtFindPeer` call for `peer` and streams the reply as
/// decoded `response::DhtFindPeerResponse` items.
///
/// If the request cannot be built, the stream yields that error as its only item.
#[inline]
pub fn dht_findpeer(
    &self,
    peer: &str,
) -> impl Stream<Item = Result<response::DhtFindPeerResponse, Error>> {
    impl_stream_api_response! {
        (self, request::DhtFindPeer { peer }, None) |prepared| => {
            self.request_stream_json(prepared)
        }
    }
}
/// Sends a `request::DhtFindProvs` call for `key` and streams the reply as
/// decoded `response::DhtFindProvsResponse` items.
///
/// If the request cannot be built, the stream yields that error as its only item.
#[inline]
pub fn dht_findprovs(
    &self,
    key: &str,
) -> impl Stream<Item = Result<response::DhtFindProvsResponse, Error>> {
    impl_stream_api_response! {
        (self, request::DhtFindProvs { key }, None) |prepared| => {
            self.request_stream_json(prepared)
        }
    }
}
/// Sends a `request::DhtGet` call for `key` and streams the reply as decoded
/// `response::DhtGetResponse` items.
///
/// If the request cannot be built, the stream yields that error as its only item.
#[inline]
pub fn dht_get(
    &self,
    key: &str,
) -> impl Stream<Item = Result<response::DhtGetResponse, Error>> {
    impl_stream_api_response! {
        (self, request::DhtGet { key }, None) |prepared| => {
            self.request_stream_json(prepared)
        }
    }
}
/// Sends a `request::DhtProvide` call for `key` and streams the reply as
/// decoded `response::DhtProvideResponse` items.
///
/// If the request cannot be built, the stream yields that error as its only item.
#[inline]
pub fn dht_provide(
    &self,
    key: &str,
) -> impl Stream<Item = Result<response::DhtProvideResponse, Error>> {
    impl_stream_api_response! {
        (self, request::DhtProvide { key }, None) |prepared| => {
            self.request_stream_json(prepared)
        }
    }
}
/// Sends a `request::DhtPut` call with `key` and `value` and streams the reply
/// as decoded `response::DhtPutResponse` items.
///
/// If the request cannot be built, the stream yields that error as its only item.
#[inline]
pub fn dht_put(
    &self,
    key: &str,
    value: &str,
) -> impl Stream<Item = Result<response::DhtPutResponse, Error>> {
    impl_stream_api_response! {
        (self, request::DhtPut { key, value }, None) |prepared| => {
            self.request_stream_json(prepared)
        }
    }
}
/// Sends a `request::DhtQuery` call for `peer` and streams the reply as
/// decoded `response::DhtQueryResponse` items.
///
/// If the request cannot be built, the stream yields that error as its only item.
#[inline]
pub fn dht_query(
    &self,
    peer: &str,
) -> impl Stream<Item = Result<response::DhtQueryResponse, Error>> {
    impl_stream_api_response! {
        (self, request::DhtQuery { peer }, None) |prepared| => {
            self.request_stream_json(prepared)
        }
    }
}
/// Sends a `request::DiagCmdsClear` call; succeeds on HTTP 200 and ignores the
/// response body.
#[inline]
pub async fn diag_cmds_clear(&self) -> Result<response::DiagCmdsClearResponse, Error> {
    let query = request::DiagCmdsClear;
    self.request_empty(query, None).await
}
/// Sends a `request::DiagCmdsSetTime` call with `time`; succeeds on HTTP 200
/// and ignores the response body.
#[inline]
pub async fn diag_cmds_set_time(
    &self,
    time: &str,
) -> Result<response::DiagCmdsSetTimeResponse, Error> {
    let query = request::DiagCmdsSetTime { time };
    self.request_empty(query, None).await
}
/// Sends a `request::DiagSys` call and returns the response body as a UTF-8
/// string.
#[inline]
pub async fn diag_sys(&self) -> Result<response::DiagSysResponse, Error> {
    let query = request::DiagSys;
    self.request_string(query, None).await
}
/// Sends a `request::Dns` call with `link` and the `recursive` flag and
/// deserializes the JSON reply.
#[inline]
pub async fn dns(&self, link: &str, recursive: bool) -> Result<response::DnsResponse, Error> {
    let query = request::Dns { link, recursive };
    self.request(query, None).await
}
/// Sends a `request::FileLs` call for `path` and deserializes the JSON reply.
#[inline]
pub async fn file_ls(&self, path: &str) -> Result<response::FileLsResponse, Error> {
    let query = request::FileLs { path };
    self.request(query, None).await
}
/// Sends a `request::FilesCp` call with `path` and `dest`; succeeds on HTTP
/// 200 and ignores the response body.
#[inline]
pub async fn files_cp(
    &self,
    path: &str,
    dest: &str,
) -> Result<response::FilesCpResponse, Error> {
    let query = request::FilesCp { path, dest };
    self.request_empty(query, None).await
}
/// Sends a `request::FilesFlush` call, optionally for `path`; succeeds on HTTP
/// 200 and ignores the response body.
#[inline]
pub async fn files_flush(
    &self,
    path: Option<&str>,
) -> Result<response::FilesFlushResponse, Error> {
    let query = request::FilesFlush { path };
    self.request_empty(query, None).await
}
/// Sends a `request::FilesLs` call, optionally for `path`, and deserializes
/// the JSON reply.
#[inline]
pub async fn files_ls(&self, path: Option<&str>) -> Result<response::FilesLsResponse, Error> {
    let query = request::FilesLs { path };
    self.request(query, None).await
}
/// Sends a `request::FilesMkdir` call with `path` and the `parents` flag;
/// succeeds on HTTP 200 and ignores the response body.
#[inline]
pub async fn files_mkdir(
    &self,
    path: &str,
    parents: bool,
) -> Result<response::FilesMkdirResponse, Error> {
    let query = request::FilesMkdir { path, parents };
    self.request_empty(query, None).await
}
/// Sends a `request::FilesMv` call with `path` and `dest`; succeeds on HTTP
/// 200 and ignores the response body.
#[inline]
pub async fn files_mv(
    &self,
    path: &str,
    dest: &str,
) -> Result<response::FilesMvResponse, Error> {
    let query = request::FilesMv { path, dest };
    self.request_empty(query, None).await
}
/// Sends a `request::FilesRead` call for `path` and streams the raw response
/// body as `Bytes` chunks.
///
/// If the request cannot be built, the stream yields that error as its only item.
#[inline]
pub fn files_read(&self, path: &str) -> impl Stream<Item = Result<Bytes, Error>> {
    impl_stream_api_response! {
        (self, request::FilesRead { path }, None) |prepared| => {
            self.request_stream_bytes(prepared)
        }
    }
}
/// Sends a `request::FilesRm` call with `path` and the `recursive` flag;
/// succeeds on HTTP 200 and ignores the response body.
#[inline]
pub async fn files_rm(
    &self,
    path: &str,
    recursive: bool,
) -> Result<response::FilesRmResponse, Error> {
    let query = request::FilesRm { path, recursive };
    self.request_empty(query, None).await
}
/// Sends a `request::FilesStat` call for `path` and deserializes the JSON reply.
#[inline]
pub async fn files_stat(&self, path: &str) -> Result<response::FilesStatResponse, Error> {
    let query = request::FilesStat { path };
    self.request(query, None).await
}
/// Uploads everything readable from `data` as the multipart field `"data"` of
/// a `request::FilesWrite` call carrying `path`, `create` and `truncate`;
/// succeeds on HTTP 200 and ignores the response body.
#[inline]
pub async fn files_write<R>(
    &self,
    path: &str,
    create: bool,
    truncate: bool,
    data: R,
) -> Result<response::FilesWriteResponse, Error>
where
    R: 'static + Read + Send + Sync,
{
    let upload = {
        let mut form = multipart::Form::default();
        form.add_reader("data", data);
        form
    };
    let query = request::FilesWrite {
        path,
        create,
        truncate,
    };
    self.request_empty(query, Some(upload)).await
}
/// Sends a `request::FilestoreDups` call and streams the reply as decoded
/// `response::FilestoreDupsResponse` items.
///
/// If the request cannot be built, the stream yields that error as its only item.
#[inline]
pub fn filestore_dups(
    &self,
) -> impl Stream<Item = Result<response::FilestoreDupsResponse, Error>> {
    impl_stream_api_response! {
        (self, request::FilestoreDups, None) |prepared| => {
            self.request_stream_json(prepared)
        }
    }
}
/// Sends a `request::FilestoreLs` call, optionally for `cid`, and streams the
/// reply as decoded `response::FilestoreLsResponse` items.
///
/// If the request cannot be built, the stream yields that error as its only item.
#[inline]
pub fn filestore_ls(
    &self,
    cid: Option<&str>,
) -> impl Stream<Item = Result<response::FilestoreLsResponse, Error>> {
    impl_stream_api_response! {
        (self, request::FilestoreLs { cid }, None) |prepared| => {
            self.request_stream_json(prepared)
        }
    }
}
/// Sends a `request::FilestoreVerify` call, optionally for `cid`, and streams
/// the reply as decoded `response::FilestoreVerifyResponse` items.
///
/// If the request cannot be built, the stream yields that error as its only item.
#[inline]
pub fn filestore_verify(
    &self,
    cid: Option<&str>,
) -> impl Stream<Item = Result<response::FilestoreVerifyResponse, Error>> {
    impl_stream_api_response! {
        (self, request::FilestoreVerify { cid }, None) |prepared| => {
            self.request_stream_json(prepared)
        }
    }
}
/// Sends a `request::Get` call for `path` and streams the raw response body as
/// `Bytes` chunks.
///
/// If the request cannot be built, the stream yields that error as its only item.
#[inline]
pub fn get(&self, path: &str) -> impl Stream<Item = Result<Bytes, Error>> {
    impl_stream_api_response! {
        (self, request::Get { path }, None) |prepared| => {
            self.request_stream_bytes(prepared)
        }
    }
}
/// Sends a `request::Id` call, optionally for `peer`, and deserializes the
/// JSON reply.
#[inline]
pub async fn id(&self, peer: Option<&str>) -> Result<response::IdResponse, Error> {
    let query = request::Id { peer };
    self.request(query, None).await
}
/// Sends a `request::KeyGen` call with `name`, `kind` and `size` and
/// deserializes the JSON reply.
#[inline]
pub async fn key_gen(
    &self,
    name: &str,
    kind: request::KeyType,
    size: i32,
) -> Result<response::KeyGenResponse, Error> {
    let query = request::KeyGen { name, kind, size };
    self.request(query, None).await
}
/// Sends a `request::KeyList` call and deserializes the JSON reply.
#[inline]
pub async fn key_list(&self) -> Result<response::KeyListResponse, Error> {
    let query = request::KeyList;
    self.request(query, None).await
}
/// Sends a `request::KeyRename` call with `name`, `new` and the `force` flag
/// and deserializes the JSON reply.
#[inline]
pub async fn key_rename(
    &self,
    name: &str,
    new: &str,
    force: bool,
) -> Result<response::KeyRenameResponse, Error> {
    let query = request::KeyRename { name, new, force };
    self.request(query, None).await
}
/// Sends a `request::KeyRm` call for `name` and deserializes the JSON reply.
#[inline]
pub async fn key_rm(&self, name: &str) -> Result<response::KeyRmResponse, Error> {
    let query = request::KeyRm { name };
    self.request(query, None).await
}
/// Sends a `request::LogLevel` call with `logger` and `level` and deserializes
/// the JSON reply.
#[inline]
pub async fn log_level(
    &self,
    logger: request::Logger<'_>,
    level: request::LoggingLevel,
) -> Result<response::LogLevelResponse, Error> {
    let query = request::LogLevel { logger, level };
    self.request(query, None).await
}
/// Sends a `request::LogLs` call and deserializes the JSON reply.
#[inline]
pub async fn log_ls(&self) -> Result<response::LogLsResponse, Error> {
    let query = request::LogLs;
    self.request(query, None).await
}
/// Sends a `request::LogTail` call and streams the reply as `String` items
/// produced by `LineDecoder`.
///
/// If the request cannot be built, the stream yields that error as its only item.
pub fn log_tail(&self) -> impl Stream<Item = Result<String, Error>> {
    impl_stream_api_response! {
        (self, request::LogTail, None) |prepared| => {
            self.request_stream(prepared, |response| {
                IpfsClient::process_stream_response(response, LineDecoder)
            })
        }
    }
}
/// Sends a `request::Ls` call, optionally for `path`, and deserializes the
/// JSON reply.
#[inline]
pub async fn ls(&self, path: Option<&str>) -> Result<response::LsResponse, Error> {
    let query = request::Ls { path };
    self.request(query, None).await
}
/// Sends a `request::NamePublish` call with `path`, the `resolve` flag and the
/// optional `lifetime`, `ttl` and `key` values, and deserializes the JSON reply.
pub async fn name_publish(
    &self,
    path: &str,
    resolve: bool,
    lifetime: Option<&str>,
    ttl: Option<&str>,
    key: Option<&str>,
) -> Result<response::NamePublishResponse, Error> {
    let query = request::NamePublish {
        path,
        resolve,
        lifetime,
        ttl,
        key,
    };
    self.request(query, None).await
}
/// Sends a `request::NameResolve` call with the optional `name` and the
/// `recursive` and `nocache` flags, and deserializes the JSON reply.
pub async fn name_resolve(
    &self,
    name: Option<&str>,
    recursive: bool,
    nocache: bool,
) -> Result<response::NameResolveResponse, Error> {
    let query = request::NameResolve {
        name,
        recursive,
        nocache,
    };
    self.request(query, None).await
}
/// Sends a `request::ObjectData` call for `key` and streams the raw response
/// body as `Bytes` chunks.
///
/// If the request cannot be built, the stream yields that error as its only item.
#[inline]
pub fn object_data(&self, key: &str) -> impl Stream<Item = Result<Bytes, Error>> {
    impl_stream_api_response! {
        (self, request::ObjectData { key }, None) |prepared| => {
            self.request_stream_bytes(prepared)
        }
    }
}
/// Sends a `request::ObjectDiff` call with `key0` and `key1` and deserializes
/// the JSON reply.
#[inline]
pub async fn object_diff(
    &self,
    key0: &str,
    key1: &str,
) -> Result<response::ObjectDiffResponse, Error> {
    let query = request::ObjectDiff { key0, key1 };
    self.request(query, None).await
}
/// Sends a `request::ObjectGet` call for `key` and deserializes the JSON reply.
#[inline]
pub async fn object_get(&self, key: &str) -> Result<response::ObjectGetResponse, Error> {
    let query = request::ObjectGet { key };
    self.request(query, None).await
}
/// Sends a `request::ObjectLinks` call for `key` and deserializes the JSON reply.
#[inline]
pub async fn object_links(&self, key: &str) -> Result<response::ObjectLinksResponse, Error> {
    let query = request::ObjectLinks { key };
    self.request(query, None).await
}
/// Sends a `request::ObjectNew` call with the optional `template` and
/// deserializes the JSON reply.
#[inline]
pub async fn object_new(
    &self,
    template: Option<request::ObjectTemplate>,
) -> Result<response::ObjectNewResponse, Error> {
    let query = request::ObjectNew { template };
    self.request(query, None).await
}
/// Sends a `request::ObjectStat` call for `key` and deserializes the JSON reply.
#[inline]
pub async fn object_stat(&self, key: &str) -> Result<response::ObjectStatResponse, Error> {
    let query = request::ObjectStat { key };
    self.request(query, None).await
}
/// Sends a `request::PinAdd` call for `key` and deserializes the JSON reply.
///
/// `recursive` is always sent explicitly (as `Some(recursive)`), and
/// `progress` is fixed to `false`.
#[inline]
pub async fn pin_add(
    &self,
    key: &str,
    recursive: bool,
) -> Result<response::PinAddResponse, Error> {
    let query = request::PinAdd {
        key,
        recursive: Some(recursive),
        progress: false,
    };
    self.request(query, None).await
}
/// Sends a `request::PinLs` call with the optional `key` and `typ` values and
/// deserializes the JSON reply.
#[inline]
pub async fn pin_ls(
    &self,
    key: Option<&str>,
    typ: Option<&str>,
) -> Result<response::PinLsResponse, Error> {
    let query = request::PinLs { key, typ };
    self.request(query, None).await
}
/// Sends a `request::PinRm` call with `key` and the `recursive` flag and
/// deserializes the JSON reply.
#[inline]
pub async fn pin_rm(
    &self,
    key: &str,
    recursive: bool,
) -> Result<response::PinRmResponse, Error> {
    let query = request::PinRm { key, recursive };
    self.request(query, None).await
}
/// Sends a `request::Ping` call for `peer` with the optional `count` and
/// streams the reply as decoded `response::PingResponse` items.
///
/// If the request cannot be built, the stream yields that error as its only item.
#[inline]
pub fn ping(
    &self,
    peer: &str,
    count: Option<i32>,
) -> impl Stream<Item = Result<response::PingResponse, Error>> {
    impl_stream_api_response! {
        (self, request::Ping { peer, count }, None) |prepared| => {
            self.request_stream_json(prepared)
        }
    }
}
/// Sends a `request::PubsubLs` call and deserializes the JSON reply.
#[inline]
pub async fn pubsub_ls(&self) -> Result<response::PubsubLsResponse, Error> {
    let query = request::PubsubLs;
    self.request(query, None).await
}
/// Sends a `request::PubsubPeers` call, optionally for `topic`, and
/// deserializes the JSON reply.
#[inline]
pub async fn pubsub_peers(
    &self,
    topic: Option<&str>,
) -> Result<response::PubsubPeersResponse, Error> {
    let query = request::PubsubPeers { topic };
    self.request(query, None).await
}
/// Sends a `request::PubsubPub` call with `topic` and `payload`; succeeds on
/// HTTP 200 and ignores the response body.
#[inline]
pub async fn pubsub_pub(
    &self,
    topic: &str,
    payload: &str,
) -> Result<response::PubsubPubResponse, Error> {
    let query = request::PubsubPub { topic, payload };
    self.request_empty(query, None).await
}
/// Sends a `request::PubsubSub` call with `topic` and the `discover` flag and
/// streams the reply as decoded `response::PubsubSubResponse` items.
///
/// If the request cannot be built, the stream yields that error as its only item.
#[inline]
pub fn pubsub_sub(
    &self,
    topic: &str,
    discover: bool,
) -> impl Stream<Item = Result<response::PubsubSubResponse, Error>> {
    impl_stream_api_response! {
        (self, request::PubsubSub { topic, discover }, None) |prepared| => {
            self.request_stream_json(prepared)
        }
    }
}
/// Sends a `request::RefsLocal` call and streams the reply as decoded
/// `response::RefsLocalResponse` items.
///
/// If the request cannot be built, the stream yields that error as its only item.
#[inline]
pub fn refs_local(&self) -> impl Stream<Item = Result<response::RefsLocalResponse, Error>> {
    impl_stream_api_response! {
        (self, request::RefsLocal, None) |prepared| => {
            self.request_stream_json(prepared)
        }
    }
}
/// Sends a `request::Shutdown` call; succeeds on HTTP 200 and ignores the
/// response body.
pub async fn shutdown(&self) -> Result<response::ShutdownResponse, Error> {
    let query = request::Shutdown;
    self.request_empty(query, None).await
}
/// Sends a `request::StatsBitswap` call and deserializes the JSON reply.
#[inline]
pub async fn stats_bitswap(&self) -> Result<response::StatsBitswapResponse, Error> {
    let query = request::StatsBitswap;
    self.request(query, None).await
}
/// Sends a `request::StatsBw` call and deserializes the JSON reply.
#[inline]
pub async fn stats_bw(&self) -> Result<response::StatsBwResponse, Error> {
    let query = request::StatsBw;
    self.request(query, None).await
}
/// Sends a `request::StatsRepo` call and deserializes the JSON reply.
#[inline]
pub async fn stats_repo(&self) -> Result<response::StatsRepoResponse, Error> {
    let query = request::StatsRepo;
    self.request(query, None).await
}
/// Sends a `request::SwarmAddrsLocal` call and deserializes the JSON reply.
#[inline]
pub async fn swarm_addrs_local(&self) -> Result<response::SwarmAddrsLocalResponse, Error> {
    let query = request::SwarmAddrsLocal;
    self.request(query, None).await
}
/// Sends a `request::SwarmPeers` call and deserializes the JSON reply.
#[inline]
pub async fn swarm_peers(&self) -> Result<response::SwarmPeersResponse, Error> {
    let query = request::SwarmPeers;
    self.request(query, None).await
}
/// Uploads everything readable from `data` as the multipart field `"file"` of
/// a `request::TarAdd` call and deserializes the JSON reply.
#[inline]
pub async fn tar_add<R>(&self, data: R) -> Result<response::TarAddResponse, Error>
where
    R: 'static + Read + Send + Sync,
{
    let upload = {
        let mut form = multipart::Form::default();
        form.add_reader("file", data);
        form
    };
    self.request(request::TarAdd, Some(upload)).await
}
/// Sends a `request::TarCat` call for `path` and streams the raw response body
/// as `Bytes` chunks.
///
/// If the request cannot be built, the stream yields that error as its only item.
#[inline]
pub fn tar_cat(&self, path: &str) -> impl Stream<Item = Result<Bytes, Error>> {
    impl_stream_api_response! {
        (self, request::TarCat { path }, None) |prepared| => {
            self.request_stream_bytes(prepared)
        }
    }
}
/// Sends a `request::Version` call and deserializes the JSON reply.
#[inline]
pub async fn version(&self) -> Result<response::VersionResponse, Error> {
    let query = request::Version;
    self.request(query, None).await
}
}