use crate::{read::LineDecoder, request, response, Backend, BoxStream};
use async_trait::async_trait;
use bytes::Bytes;
use common_multipart_rfc7578::client::multipart;
use futures::{future, AsyncRead, FutureExt, TryStreamExt};
use std::{
fs::File,
io::{Cursor, Read},
path::{Path, PathBuf},
};
/// Maximum number of files that `IpfsApi::add_path` attaches to a multipart
/// form as open `File` handles; files beyond this count are read into memory
/// before being attached.
const FILE_DESCRIPTOR_LIMIT: usize = 127;
/// Builds a base request and turns it into a response stream.
///
/// Both forms take `(self, request, form)` and call `build_base_request` on
/// them. If building fails, the expansion evaluates to a boxed stream made
/// from a future that resolves to that error.
///
/// * `(..) => method` forwards the built request to `self.method(req)`.
/// * `(..) |req| => { .. }` binds the built request to `req` and evaluates the
///   supplied block.
macro_rules! impl_stream_api_response {
// Shorthand arm: rewrites itself into the block form below.
(($self:ident, $req:expr, $form:expr) => $call:ident) => {
impl_stream_api_response! {
($self, $req, $form) |req| => { $self.$call(req) }
}
};
// Block arm: the caller decides how the built request is consumed.
(($self:ident, $req:expr, $form:expr) |$var:ident| => $impl:block) => {
match $self.build_base_request($req, $form) {
Ok($var) => $impl,
Err(e) => Box::new(future::err(e).into_stream()),
}
};
}
#[cfg_attr(feature = "with-send-sync", async_trait)]
#[cfg_attr(not(feature = "with-send-sync"), async_trait(?Send))]
pub trait IpfsApi: Backend {
/// Uploads the contents of a synchronous reader with the default
/// `request::Add` options; shorthand for `add_with_options`.
async fn add<R>(&self, data: R) -> Result<response::AddResponse, Self::Error>
where
    R: 'static + Read + Send + Sync + Unpin,
{
    let options = request::Add::default();
    self.add_with_options(data, options).await
}
/// Uploads the contents of an asynchronous reader with the default
/// `request::Add` options; shorthand for `add_async_with_options`.
async fn add_async<R>(&self, data: R) -> Result<response::AddResponse, Self::Error>
where
    R: 'static + AsyncRead + Send + Sync + Unpin,
{
    let options = request::Add::default();
    self.add_async_with_options(data, options).await
}
/// Uploads the contents of a synchronous reader using the supplied `add`
/// options.
///
/// `data` is attached to a multipart form as the part named `"path"`.
async fn add_with_options<R>(
    &self,
    data: R,
    add: request::Add<'_>,
) -> Result<response::AddResponse, Self::Error>
where
    R: 'static + Read + Send + Sync + Unpin,
{
    let form = {
        let mut body = multipart::Form::default();
        body.add_reader("path", data);
        body
    };
    self.request(add, Some(form)).await
}
/// Uploads the contents of an asynchronous reader using the supplied `add`
/// options.
///
/// `data` is attached to a multipart form as the part named `"path"`.
async fn add_async_with_options<R>(
    &self,
    data: R,
    add: request::Add<'_>,
) -> Result<response::AddResponse, Self::Error>
where
    R: 'static + AsyncRead + Send + Sync + Unpin,
{
    let form = {
        let mut body = multipart::Form::default();
        body.add_async_reader("path", data);
        body
    };
    self.request(add, Some(form)).await
}
/// Sends a caller-built multipart form with the supplied `add` options and
/// collects every streamed `AddResponse` into a vector.
async fn add_with_form<F>(
    &self,
    form: F,
    add: request::Add<'_>,
) -> Result<Vec<response::AddResponse>, Self::Error>
where
    F: Into<multipart::Form<'static>> + Send,
{
    let form: multipart::Form<'static> = form.into();
    let req = self.build_base_request(add, Some(form))?;

    self.request_stream_json(req).try_collect().await
}
async fn add_path<P>(&self, path: P) -> Result<Vec<response::AddResponse>, Self::Error>
where
P: AsRef<Path> + Send,
{
let prefix = path.as_ref().parent();
let mut paths_to_add: Vec<(PathBuf, u64)> = vec![];
for path in walkdir::WalkDir::new(path.as_ref()) {
match path {
Ok(entry) if entry.file_type().is_file() => {
let file_size = entry
.metadata()
.map(|metadata| metadata.len())
.map_err(|e| crate::Error::Io(e.into()))?;
paths_to_add.push((entry.path().to_path_buf(), file_size));
}
Ok(_) => (),
Err(e) => return Err(crate::Error::Io(e.into()).into()),
}
}
paths_to_add.sort_unstable_by(|(_, a), (_, b)| a.cmp(b).reverse());
let mut it = 0;
let mut form = multipart::Form::default();
for (path, file_size) in paths_to_add {
let mut file = File::open(&path).map_err(crate::Error::Io)?;
let file_name = match prefix {
Some(prefix) => path.strip_prefix(prefix).unwrap(),
None => path.as_path(),
}
.to_string_lossy();
if it < FILE_DESCRIPTOR_LIMIT {
form.add_reader_file("path", file, file_name);
it += 1;
} else {
let mut buf = Vec::with_capacity(file_size as usize);
let _ = file.read_to_end(&mut buf).map_err(crate::Error::Io)?;
form.add_reader_file("path", Cursor::new(buf), file_name);
}
}
let req = self.build_base_request(request::Add::default(), Some(form))?;
self.request_stream_json(req).try_collect().await
}
/// Sends a `request::BitswapLedger` for `peer` and returns the backend's
/// `BitswapLedgerResponse`.
async fn bitswap_ledger(
    &self,
    peer: &str,
) -> Result<response::BitswapLedgerResponse, Self::Error> {
    let query = request::BitswapLedger { peer };
    self.request(query, None).await
}
/// Issues a `request::BitswapReprovide` through `request_empty`.
async fn bitswap_reprovide(&self) -> Result<response::BitswapReprovideResponse, Self::Error> {
    let query = request::BitswapReprovide;
    self.request_empty(query, None).await
}
/// Sends a `request::BitswapStat` and returns the backend's
/// `BitswapStatResponse`.
async fn bitswap_stat(&self) -> Result<response::BitswapStatResponse, Self::Error> {
    let query = request::BitswapStat;
    self.request(query, None).await
}
/// Issues a `request::BitswapUnwant` for `key` through `request_empty`.
async fn bitswap_unwant(
    &self,
    key: &str,
) -> Result<response::BitswapUnwantResponse, Self::Error> {
    let query = request::BitswapUnwant { key };
    self.request_empty(query, None).await
}
/// Sends a `request::BitswapWantlist`, optionally restricted to `peer`, and
/// returns the backend's `BitswapWantlistResponse`.
async fn bitswap_wantlist(
    &self,
    peer: Option<&str>,
) -> Result<response::BitswapWantlistResponse, Self::Error> {
    let query = request::BitswapWantlist { peer };
    self.request(query, None).await
}
fn block_get(&self, hash: &str) -> BoxStream<Bytes, Self::Error> {
impl_stream_api_response! {
(self, request::BlockGet { hash }, None) => request_stream_bytes
}
}
/// Uploads `data` with the default `request::BlockPut` options; shorthand for
/// `block_put_with_options`.
async fn block_put<R>(&self, data: R) -> Result<response::BlockPutResponse, Self::Error>
where
    R: 'static + Read + Send + Sync + Unpin,
{
    let options = request::BlockPut::default();
    self.block_put_with_options(data, options).await
}
async fn block_put_with_options<R>(
&self,
data: R,
options: request::BlockPut<'async_trait>,
) -> Result<response::BlockPutResponse, Self::Error>
where
R: 'static + Read + Send + Sync + Unpin,
{
let mut form = multipart::Form::default();
form.add_reader("data", data);
self.request(options, Some(form)).await
}
/// Sends a `request::BlockRm` for `hash` and returns the backend's
/// `BlockRmResponse`.
async fn block_rm(&self, hash: &str) -> Result<response::BlockRmResponse, Self::Error> {
    let query = request::BlockRm { hash };
    self.request(query, None).await
}
/// Sends a `request::BlockStat` for `hash` and returns the backend's
/// `BlockStatResponse`.
async fn block_stat(&self, hash: &str) -> Result<response::BlockStatResponse, Self::Error> {
    let query = request::BlockStat { hash };
    self.request(query, None).await
}
/// Sends a `request::BootstrapAddDefault` and returns the backend's
/// `BootstrapAddDefaultResponse`.
async fn bootstrap_add_default(
    &self,
) -> Result<response::BootstrapAddDefaultResponse, Self::Error> {
    let query = request::BootstrapAddDefault;
    self.request(query, None).await
}
/// Sends a `request::BootstrapList` and returns the backend's
/// `BootstrapListResponse`.
async fn bootstrap_list(&self) -> Result<response::BootstrapListResponse, Self::Error> {
    let query = request::BootstrapList;
    self.request(query, None).await
}
/// Sends a `request::BootstrapRmAll` and returns the backend's
/// `BootstrapRmAllResponse`.
async fn bootstrap_rm_all(&self) -> Result<response::BootstrapRmAllResponse, Self::Error> {
    let query = request::BootstrapRmAll;
    self.request(query, None).await
}
fn cat(&self, path: &str) -> BoxStream<Bytes, Self::Error> {
let offset = None;
let length = None;
impl_stream_api_response! {
(self, request::Cat { path, offset, length }, None) => request_stream_bytes
}
}
fn cat_range(
&self,
path: &str,
_offset: usize,
_length: usize,
) -> BoxStream<Bytes, Self::Error> {
let offset = Some(_offset);
let length = Some(_length);
impl_stream_api_response! {
(self, request::Cat { path, offset, length }, None) => request_stream_bytes
}
}
/// Sends a `request::Commands` and returns the backend's `CommandsResponse`.
async fn commands(&self) -> Result<response::CommandsResponse, Self::Error> {
    let query = request::Commands;
    self.request(query, None).await
}
/// Sends a `request::Config` for `key` with no value and no type flags set,
/// and returns the backend's `ConfigResponse`.
async fn config_get_string(&self, key: &str) -> Result<response::ConfigResponse, Self::Error> {
    let query = request::Config {
        key,
        value: None,
        boolean: None,
        stringified_json: None,
    };
    self.request(query, None).await
}
/// Sends a `request::Config` for `key` with no value and no type flags set,
/// and returns the backend's `ConfigResponse`.
async fn config_get_bool(&self, key: &str) -> Result<response::ConfigResponse, Self::Error> {
    let query = request::Config {
        key,
        value: None,
        boolean: None,
        stringified_json: None,
    };
    self.request(query, None).await
}
/// Sends a `request::Config` for `key` with no value and no type flags set,
/// and returns the backend's `ConfigResponse`.
async fn config_get_json(&self, key: &str) -> Result<response::ConfigResponse, Self::Error> {
    let query = request::Config {
        key,
        value: None,
        boolean: None,
        stringified_json: None,
    };
    self.request(query, None).await
}
/// Sends a `request::Config` that carries `value` for `key`, with neither the
/// `boolean` nor the `stringified_json` flag set.
async fn config_set_string(
    &self,
    key: &str,
    value: &str,
) -> Result<response::ConfigResponse, Self::Error> {
    let query = request::Config {
        key,
        value: Some(value),
        boolean: None,
        stringified_json: None,
    };
    self.request(query, None).await
}
/// Sends a `request::Config` that carries `value` (rendered with
/// `to_string`) for `key`, with the `boolean` flag set to `true`.
async fn config_set_bool(
    &self,
    key: &str,
    value: bool,
) -> Result<response::ConfigResponse, Self::Error> {
    let rendered = value.to_string();
    let query = request::Config {
        key,
        value: Some(rendered.as_str()),
        boolean: Some(true),
        stringified_json: None,
    };
    self.request(query, None).await
}
/// Sends a `request::Config` that carries `value` for `key`, with the
/// `stringified_json` flag set to `true`.
async fn config_set_json(
    &self,
    key: &str,
    value: &str,
) -> Result<response::ConfigResponse, Self::Error> {
    let query = request::Config {
        key,
        value: Some(value),
        boolean: None,
        stringified_json: Some(true),
    };
    self.request(query, None).await
}
/// Sends a `request::ConfigEdit` and returns the backend's
/// `ConfigEditResponse`.
async fn config_edit(&self) -> Result<response::ConfigEditResponse, Self::Error> {
    let query = request::ConfigEdit;
    self.request(query, None).await
}
/// Issues a `request::ConfigReplace` through `request_empty`, attaching
/// `data` to a multipart form as the part named `"file"`.
async fn config_replace<R>(
    &self,
    data: R,
) -> Result<response::ConfigReplaceResponse, Self::Error>
where
    R: 'static + Read + Send + Sync + Unpin,
{
    let form = {
        let mut body = multipart::Form::default();
        body.add_reader("file", data);
        body
    };
    self.request_empty(request::ConfigReplace, Some(form)).await
}
/// Issues a `request::ConfigShow` through `request_string`.
async fn config_show(&self) -> Result<response::ConfigShowResponse, Self::Error> {
    let query = request::ConfigShow;
    self.request_string(query, None).await
}
/// Streams the bytes returned for a `request::DagGet` on `path`, leaving every
/// other option at its default; shorthand for `dag_get_with_options`.
fn dag_get(&self, path: &str) -> BoxStream<Bytes, Self::Error> {
    let options = request::DagGet {
        path,
        ..Default::default()
    };
    self.dag_get_with_options(options)
}
fn dag_get_with_options(&self, options: request::DagGet) -> BoxStream<Bytes, Self::Error> {
impl_stream_api_response! {
(self, options, None) => request_stream_bytes
}
}
/// Uploads `data` with the default `request::DagPut` options; shorthand for
/// `dag_put_with_options`.
async fn dag_put<R>(&self, data: R) -> Result<response::DagPutResponse, Self::Error>
where
    R: 'static + Read + Send + Sync + Unpin,
{
    let options = request::DagPut::default();
    self.dag_put_with_options(data, options).await
}
/// Uploads `data` using the supplied `request::DagPut` options.
///
/// `data` is attached to a multipart form as the part named `"object data"`.
async fn dag_put_with_options<'a, R>(
    &self,
    data: R,
    options: request::DagPut<'a>,
) -> Result<response::DagPutResponse, Self::Error>
where
    R: 'static + Read + Send + Sync + Unpin,
{
    let form = {
        let mut body = multipart::Form::default();
        body.add_reader("object data", data);
        body
    };
    self.request(options, Some(form)).await
}
fn dht_findpeer(&self, peer: &str) -> BoxStream<response::DhtFindPeerResponse, Self::Error> {
impl_stream_api_response! {
(self, request::DhtFindPeer { peer }, None) => request_stream_json
}
}
fn dht_findprovs(&self, key: &str) -> BoxStream<response::DhtFindProvsResponse, Self::Error> {
impl_stream_api_response! {
(self, request::DhtFindProvs { key }, None) => request_stream_json
}
}
fn dht_get(&self, key: &str) -> BoxStream<response::DhtGetResponse, Self::Error> {
impl_stream_api_response! {
(self, request::DhtGet { key }, None) => request_stream_json
}
}
fn dht_provide(&self, key: &str) -> BoxStream<response::DhtProvideResponse, Self::Error> {
impl_stream_api_response! {
(self, request::DhtProvide { key }, None) => request_stream_json
}
}
fn dht_put(&self, key: &str, value: &str) -> BoxStream<response::DhtPutResponse, Self::Error> {
impl_stream_api_response! {
(self, request::DhtPut { key, value }, None) => request_stream_json
}
}
fn dht_query(&self, peer: &str) -> BoxStream<response::DhtQueryResponse, Self::Error> {
impl_stream_api_response! {
(self, request::DhtQuery { peer }, None) => request_stream_json
}
}
/// Issues a `request::DiagCmdsClear` through `request_empty`.
async fn diag_cmds_clear(&self) -> Result<response::DiagCmdsClearResponse, Self::Error> {
    let query = request::DiagCmdsClear;
    self.request_empty(query, None).await
}
/// Issues a `request::DiagCmdsSetTime` carrying `time` through
/// `request_empty`.
async fn diag_cmds_set_time(
    &self,
    time: &str,
) -> Result<response::DiagCmdsSetTimeResponse, Self::Error> {
    let query = request::DiagCmdsSetTime { time };
    self.request_empty(query, None).await
}
/// Issues a `request::DiagSys` through `request_string`.
async fn diag_sys(&self) -> Result<response::DiagSysResponse, Self::Error> {
    let query = request::DiagSys;
    self.request_string(query, None).await
}
/// Sends a `request::Dns` with the given `link` and `recursive` flag and
/// returns the backend's `DnsResponse`.
async fn dns(&self, link: &str, recursive: bool) -> Result<response::DnsResponse, Self::Error> {
    let query = request::Dns { link, recursive };
    self.request(query, None).await
}
/// Sends a `request::FileLs` for `path` and returns the backend's
/// `FileLsResponse`.
async fn file_ls(&self, path: &str) -> Result<response::FileLsResponse, Self::Error> {
    let query = request::FileLs { path };
    self.request(query, None).await
}
/// Builds a `request::FilesCp` from `path` and `dest`, leaving every other
/// option at its default, and forwards it to `files_cp_with_options`.
async fn files_cp(
    &self,
    path: &str,
    dest: &str,
) -> Result<response::FilesCpResponse, Self::Error> {
    let options = request::FilesCp {
        path,
        dest,
        ..Default::default()
    };
    self.files_cp_with_options(options).await
}
/// Issues the supplied `request::FilesCp` through `request_empty`.
async fn files_cp_with_options(
    &self,
    options: request::FilesCp<'_>,
) -> Result<response::FilesCpResponse, Self::Error> {
    let pending = self.request_empty(options, None);
    pending.await
}
/// Issues a `request::FilesFlush`, optionally for `path`, through
/// `request_empty`.
async fn files_flush(
    &self,
    path: Option<&str>,
) -> Result<response::FilesFlushResponse, Self::Error> {
    let query = request::FilesFlush { path };
    self.request_empty(query, None).await
}
/// Builds a `request::FilesLs` from the optional `path`, leaving every other
/// option at its default, and forwards it to `files_ls_with_options`.
async fn files_ls(&self, path: Option<&str>) -> Result<response::FilesLsResponse, Self::Error> {
    let options = request::FilesLs {
        path,
        ..Default::default()
    };
    self.files_ls_with_options(options).await
}
/// Sends the supplied `request::FilesLs` and returns the backend's
/// `FilesLsResponse`.
async fn files_ls_with_options(
    &self,
    options: request::FilesLs<'_>,
) -> Result<response::FilesLsResponse, Self::Error> {
    let pending = self.request(options, None);
    pending.await
}
/// Builds a `request::FilesMkdir` from `path` and `parents`, leaving every
/// other option at its default, and forwards it to
/// `files_mkdir_with_options`.
async fn files_mkdir(
    &self,
    path: &str,
    parents: bool,
) -> Result<response::FilesMkdirResponse, Self::Error> {
    let options = request::FilesMkdir {
        path,
        parents: Some(parents),
        ..Default::default()
    };
    self.files_mkdir_with_options(options).await
}
/// Issues the supplied `request::FilesMkdir` through `request_empty`.
async fn files_mkdir_with_options(
    &self,
    options: request::FilesMkdir<'_>,
) -> Result<response::FilesMkdirResponse, Self::Error> {
    let pending = self.request_empty(options, None);
    pending.await
}
/// Builds a `request::FilesMv` from `path` and `dest`, leaving every other
/// option at its default, and forwards it to `files_mv_with_options`.
async fn files_mv(
    &self,
    path: &str,
    dest: &str,
) -> Result<response::FilesMvResponse, Self::Error> {
    let options = request::FilesMv {
        path,
        dest,
        ..Default::default()
    };
    self.files_mv_with_options(options).await
}
/// Issues the supplied `request::FilesMv` through `request_empty`.
async fn files_mv_with_options(
    &self,
    options: request::FilesMv<'_>,
) -> Result<response::FilesMvResponse, Self::Error> {
    let pending = self.request_empty(options, None);
    pending.await
}
/// Streams the bytes returned for a `request::FilesRead` on `path`, leaving
/// every other option at its default; shorthand for
/// `files_read_with_options`.
fn files_read(&self, path: &str) -> BoxStream<Bytes, Self::Error> {
    let options = request::FilesRead {
        path,
        ..Default::default()
    };
    self.files_read_with_options(options)
}
fn files_read_with_options(
&self,
options: request::FilesRead,
) -> BoxStream<Bytes, Self::Error> {
impl_stream_api_response! { (self, options, None) => request_stream_bytes }
}
/// Builds a `request::FilesRm` from `path` and `recursive`, leaving every
/// other option at its default, and forwards it to `files_rm_with_options`.
async fn files_rm(
    &self,
    path: &str,
    recursive: bool,
) -> Result<response::FilesRmResponse, Self::Error> {
    let options = request::FilesRm {
        path,
        recursive: Some(recursive),
        ..Default::default()
    };
    self.files_rm_with_options(options).await
}
/// Issues the supplied `request::FilesRm` through `request_empty`.
async fn files_rm_with_options(
    &self,
    options: request::FilesRm<'_>,
) -> Result<response::FilesRmResponse, Self::Error> {
    let pending = self.request_empty(options, None);
    pending.await
}
/// Builds a `request::FilesStat` from `path`, leaving every other option at
/// its default, and forwards it to `files_stat_with_options`.
async fn files_stat(&self, path: &str) -> Result<response::FilesStatResponse, Self::Error> {
    let options = request::FilesStat {
        path,
        ..Default::default()
    };
    self.files_stat_with_options(options).await
}
/// Sends the supplied `request::FilesStat` and returns the backend's
/// `FilesStatResponse`.
async fn files_stat_with_options(
    &self,
    options: request::FilesStat<'_>,
) -> Result<response::FilesStatResponse, Self::Error> {
    let pending = self.request(options, None);
    pending.await
}
/// Builds a `request::FilesWrite` from `path`, `create` and `truncate`,
/// leaving every other option at its default, and forwards it together with
/// `data` to `files_write_with_options`.
async fn files_write<R>(
    &self,
    path: &str,
    create: bool,
    truncate: bool,
    data: R,
) -> Result<response::FilesWriteResponse, Self::Error>
where
    R: 'static + Read + Send + Sync + Unpin,
{
    self.files_write_with_options(
        request::FilesWrite {
            path,
            create: Some(create),
            truncate: Some(truncate),
            ..request::FilesWrite::default()
        },
        data,
    )
    .await
}
/// Issues the supplied `request::FilesWrite` through `request_empty`,
/// attaching `data` to a multipart form as the part named `"data"`.
async fn files_write_with_options<R>(
    &self,
    options: request::FilesWrite<'_>,
    data: R,
) -> Result<response::FilesWriteResponse, Self::Error>
where
    R: 'static + Read + Send + Sync + Unpin,
{
    let form = {
        let mut body = multipart::Form::default();
        body.add_reader("data", data);
        body
    };
    self.request_empty(options, Some(form)).await
}
/// Builds a `request::FilesChcid` with `path` and `cid_version` set, leaving
/// every other option at its default.
///
/// Delegates to `files_chcid_with_options`, matching how the other `files_*`
/// shorthands forward to their `_with_options` counterpart; the request that
/// is sent is unchanged.
async fn files_chcid(
    &self,
    path: &str,
    cid_version: i32,
) -> Result<response::FilesChcidResponse, Self::Error> {
    self.files_chcid_with_options(request::FilesChcid {
        path: Some(path),
        cid_version: Some(cid_version),
        ..Default::default()
    })
    .await
}
/// Issues the supplied `request::FilesChcid` through `request_empty`.
async fn files_chcid_with_options(
    &self,
    options: request::FilesChcid<'_>,
) -> Result<response::FilesChcidResponse, Self::Error> {
    let pending = self.request_empty(options, None);
    pending.await
}
fn filestore_dups(&self) -> BoxStream<response::FilestoreDupsResponse, Self::Error> {
impl_stream_api_response! {
(self, request::FilestoreDups, None) => request_stream_json
}
}
fn filestore_ls(
&self,
cid: Option<&str>,
) -> BoxStream<response::FilestoreLsResponse, Self::Error> {
impl_stream_api_response! {
(self, request::FilestoreLs { cid }, None) => request_stream_json
}
}
fn filestore_verify(
&self,
cid: Option<&str>,
) -> BoxStream<response::FilestoreVerifyResponse, Self::Error> {
impl_stream_api_response! {
(self, request::FilestoreVerify{ cid }, None) => request_stream_json
}
}
fn get(&self, path: &str) -> BoxStream<Bytes, Self::Error> {
impl_stream_api_response! {
(self, request::Get { path }, None) => request_stream_bytes
}
}
/// Sends a `request::Id`, optionally for `peer`, and returns the backend's
/// `IdResponse`.
async fn id(&self, peer: Option<&str>) -> Result<response::IdResponse, Self::Error> {
    let query = request::Id { peer };
    self.request(query, None).await
}
/// Sends a `request::KeyGen` built from `name`, `kind` and `size` and returns
/// the backend's `KeyGenResponse`.
async fn key_gen(
    &self,
    name: &str,
    kind: request::KeyType,
    size: i32,
) -> Result<response::KeyGenResponse, Self::Error> {
    let query = request::KeyGen { name, kind, size };
    self.request(query, None).await
}
/// Sends a `request::KeyList` and returns the backend's `KeyListResponse`.
async fn key_list(&self) -> Result<response::KeyListResponse, Self::Error> {
    let query = request::KeyList;
    self.request(query, None).await
}
/// Sends a `request::KeyRename` built from `name`, `new` and `force` and
/// returns the backend's `KeyRenameResponse`.
async fn key_rename(
    &self,
    name: &str,
    new: &str,
    force: bool,
) -> Result<response::KeyRenameResponse, Self::Error> {
    let query = request::KeyRename { name, new, force };
    self.request(query, None).await
}
/// Sends a `request::KeyRm` for `name` and returns the backend's
/// `KeyRmResponse`.
async fn key_rm(&self, name: &str) -> Result<response::KeyRmResponse, Self::Error> {
    let query = request::KeyRm { name };
    self.request(query, None).await
}
/// Sends a `request::LogLevel` pairing `logger` with `level` and returns the
/// backend's `LogLevelResponse`.
async fn log_level(
    &self,
    logger: request::Logger<'_>,
    level: request::LoggingLevel,
) -> Result<response::LogLevelResponse, Self::Error> {
    let query = request::LogLevel { logger, level };
    self.request(query, None).await
}
/// Sends a `request::LogLs` and returns the backend's `LogLsResponse`.
async fn log_ls(&self) -> Result<response::LogLsResponse, Self::Error> {
    let query = request::LogLs;
    self.request(query, None).await
}
/// Streams the `String` items produced for a `request::LogTail`.
///
/// The response is run through `process_stream_response` with `LineDecoder`,
/// and each error from that stream is converted into `Self::Error`.
fn log_tail(&self) -> BoxStream<String, Self::Error> {
impl_stream_api_response! {
(self, request::LogTail, None) |req| => {
// Uses the block form of the macro because the response needs a custom
// decoder rather than `request_stream_json` / `request_stream_bytes`.
self.request_stream(req, |res| {
Box::new(Self::process_stream_response(res, LineDecoder).map_err(Self::Error::from))
})
}
}
}
/// Sends a `request::Ls` for `path`, leaving every other option at its
/// default, and returns the backend's `LsResponse`.
async fn ls(&self, path: &str) -> Result<response::LsResponse, Self::Error> {
    let query = request::Ls {
        path,
        ..Default::default()
    };
    self.request(query, None).await
}
fn ls_with_options(
&self,
options: request::Ls<'_>,
) -> BoxStream<response::LsResponse, Self::Error> {
impl_stream_api_response! {
(self, options, None) => request_stream_json
}
}
/// Sends a `request::NamePublish` built from `path`, `resolve`, `lifetime`,
/// `ttl` and `key` and returns the backend's `NamePublishResponse`.
async fn name_publish(
    &self,
    path: &str,
    resolve: bool,
    lifetime: Option<&str>,
    ttl: Option<&str>,
    key: Option<&str>,
) -> Result<response::NamePublishResponse, Self::Error> {
    let query = request::NamePublish {
        path,
        resolve,
        lifetime,
        ttl,
        key,
    };
    self.request(query, None).await
}
/// Sends a `request::NameResolve` built from `name`, `recursive` and
/// `nocache` and returns the backend's `NameResolveResponse`.
async fn name_resolve(
    &self,
    name: Option<&str>,
    recursive: bool,
    nocache: bool,
) -> Result<response::NameResolveResponse, Self::Error> {
    let query = request::NameResolve {
        name,
        recursive,
        nocache,
    };
    self.request(query, None).await
}
fn object_data(&self, key: &str) -> BoxStream<Bytes, Self::Error> {
impl_stream_api_response! {
(self, request::ObjectData { key }, None) => request_stream_bytes
}
}
/// Sends a `request::ObjectDiff` for `key0` and `key1` and returns the
/// backend's `ObjectDiffResponse`.
async fn object_diff(
    &self,
    key0: &str,
    key1: &str,
) -> Result<response::ObjectDiffResponse, Self::Error> {
    let query = request::ObjectDiff { key0, key1 };
    self.request(query, None).await
}
/// Sends a `request::ObjectGet` for `key` and returns the backend's
/// `ObjectGetResponse`.
async fn object_get(&self, key: &str) -> Result<response::ObjectGetResponse, Self::Error> {
    let query = request::ObjectGet { key };
    self.request(query, None).await
}
/// Sends a `request::ObjectLinks` for `key` and returns the backend's
/// `ObjectLinksResponse`.
async fn object_links(&self, key: &str) -> Result<response::ObjectLinksResponse, Self::Error> {
    let query = request::ObjectLinks { key };
    self.request(query, None).await
}
/// Sends a `request::ObjectNew`, optionally carrying `template`, and returns
/// the backend's `ObjectNewResponse`.
async fn object_new(
    &self,
    template: Option<request::ObjectTemplate>,
) -> Result<response::ObjectNewResponse, Self::Error> {
    let query = request::ObjectNew { template };
    self.request(query, None).await
}
/// Sends a `request::ObjectPatchAddLink` built from `folder`, `name`, `key`
/// and `create` and returns the backend's `ObjectPatchAddLinkResponse`.
async fn object_patch_add_link(
    &self,
    folder: &str,
    name: &str,
    key: &str,
    create: bool,
) -> Result<response::ObjectPatchAddLinkResponse, Self::Error> {
    let query = request::ObjectPatchAddLink {
        folder,
        name,
        key,
        create,
    };
    self.request(query, None).await
}
/// Sends a `request::ObjectStat` for `key` and returns the backend's
/// `ObjectStatResponse`.
async fn object_stat(&self, key: &str) -> Result<response::ObjectStatResponse, Self::Error> {
    let query = request::ObjectStat { key };
    self.request(query, None).await
}
/// Sends a `request::PinAdd` for `key` with the given `recursive` flag and
/// `progress` fixed to `false`, and returns the backend's `PinAddResponse`.
async fn pin_add(
    &self,
    key: &str,
    recursive: bool,
) -> Result<response::PinAddResponse, Self::Error> {
    let query = request::PinAdd {
        key,
        recursive: Some(recursive),
        progress: false,
    };
    self.request(query, None).await
}
/// Sends a `request::PinLs` with the optional `key` and `typ` filters and
/// returns the backend's `PinLsResponse`.
async fn pin_ls(
    &self,
    key: Option<&str>,
    typ: Option<&str>,
) -> Result<response::PinLsResponse, Self::Error> {
    let query = request::PinLs { key, typ };
    self.request(query, None).await
}
/// Sends a `request::PinRm` for `key` with the given `recursive` flag and
/// returns the backend's `PinRmResponse`.
async fn pin_rm(
    &self,
    key: &str,
    recursive: bool,
) -> Result<response::PinRmResponse, Self::Error> {
    let query = request::PinRm { key, recursive };
    self.request(query, None).await
}
fn ping(
&self,
peer: &str,
count: Option<i32>,
) -> BoxStream<response::PingResponse, Self::Error> {
impl_stream_api_response! {
(self, request::Ping { peer, count }, None) => request_stream_json
}
}
/// Sends a `request::PubsubLs` and returns the backend's `PubsubLsResponse`.
async fn pubsub_ls(&self) -> Result<response::PubsubLsResponse, Self::Error> {
    let query = request::PubsubLs;
    self.request(query, None).await
}
/// Sends a `request::PubsubPeers`, optionally restricted to `topic`, and
/// returns the backend's `PubsubPeersResponse`.
///
/// The topic is passed to the request as a borrowed byte slice.
async fn pubsub_peers<T>(
    &self,
    topic: Option<T>,
) -> Result<response::PubsubPeersResponse, Self::Error>
where
    T: AsRef<[u8]> + Send + Sync,
{
    // Borrow the bytes out of the owned topic; `None` stays `None`.
    let topic = topic.as_ref().map(|name| name.as_ref());

    self.request(request::PubsubPeers { topic }, None).await
}
/// Issues a `request::PubsubPub` for `topic` through `request_empty`,
/// attaching `data` to a multipart form as the part named `"data"`.
async fn pubsub_pub<T, R>(
    &self,
    topic: T,
    data: R,
) -> Result<response::PubsubPubResponse, Self::Error>
where
    T: AsRef<[u8]> + Send + Sync,
    R: 'static + Read + Send + Sync + Unpin,
{
    let form = {
        let mut body = multipart::Form::default();
        body.add_reader("data", data);
        body
    };
    let query = request::PubsubPub {
        topic: topic.as_ref(),
    };

    self.request_empty(query, Some(form)).await
}
fn pubsub_sub<T>(&self, topic: T) -> BoxStream<response::PubsubSubResponse, Self::Error>
where
T: AsRef<[u8]>,
{
impl_stream_api_response! {
(self, request::PubsubSub { topic: topic.as_ref() }, None) => request_stream_json
}
}
fn refs_local(&self) -> BoxStream<response::RefsLocalResponse, Self::Error> {
impl_stream_api_response! {
(self, request::RefsLocal, None) => request_stream_json
}
}
/// Issues a `request::Shutdown` through `request_empty`.
async fn shutdown(&self) -> Result<response::ShutdownResponse, Self::Error> {
    let query = request::Shutdown;
    self.request_empty(query, None).await
}
/// Sends a `request::StatsBitswap` and returns the backend's
/// `StatsBitswapResponse`.
async fn stats_bitswap(&self) -> Result<response::StatsBitswapResponse, Self::Error> {
    let query = request::StatsBitswap;
    self.request(query, None).await
}
/// Sends a `request::StatsBw` and returns the backend's `StatsBwResponse`.
async fn stats_bw(&self) -> Result<response::StatsBwResponse, Self::Error> {
    let query = request::StatsBw;
    self.request(query, None).await
}
/// Sends a `request::StatsRepo` and returns the backend's
/// `StatsRepoResponse`.
async fn stats_repo(&self) -> Result<response::StatsRepoResponse, Self::Error> {
    let query = request::StatsRepo;
    self.request(query, None).await
}
/// Sends a `request::SwarmAddrsLocal` and returns the backend's
/// `SwarmAddrsLocalResponse`.
async fn swarm_addrs_local(&self) -> Result<response::SwarmAddrsLocalResponse, Self::Error> {
    let query = request::SwarmAddrsLocal;
    self.request(query, None).await
}
/// Sends a `request::SwarmConnect` for `peer` and returns the backend's
/// `SwarmConnectResponse`.
async fn swarm_connect(
    &self,
    peer: &str,
) -> Result<response::SwarmConnectResponse, Self::Error> {
    let query = request::SwarmConnect { peer };
    self.request(query, None).await
}
/// Sends a `request::SwarmPeers` and returns the backend's
/// `SwarmPeersResponse`.
async fn swarm_peers(&self) -> Result<response::SwarmPeersResponse, Self::Error> {
    let query = request::SwarmPeers;
    self.request(query, None).await
}
/// Sends a `request::TarAdd`, attaching `data` to a multipart form as the
/// part named `"file"`, and returns the backend's `TarAddResponse`.
async fn tar_add<R>(&self, data: R) -> Result<response::TarAddResponse, Self::Error>
where
    R: 'static + Read + Send + Sync + Unpin,
{
    let form = {
        let mut body = multipart::Form::default();
        body.add_reader("file", data);
        body
    };
    self.request(request::TarAdd, Some(form)).await
}
fn tar_cat(&self, path: &str) -> BoxStream<Bytes, Self::Error> {
impl_stream_api_response! {
(self, request::TarCat { path }, None) => request_stream_bytes
}
}
/// Sends a `request::Version` and returns the backend's `VersionResponse`.
async fn version(&self) -> Result<response::VersionResponse, Self::Error> {
    let query = request::Version;
    self.request(query, None).await
}
}
/// Blanket implementation: every `Backend` gets the whole `IpfsApi` surface
/// through the trait's provided methods.
impl<B: Backend> IpfsApi for B {}