#![warn(missing_docs)]
use std::time::Duration;
use async_stream::stream;
use futures::pin_mut;
use futures::select;
use futures::FutureExt;
use futures::Stream;
use futures_timer::Delay;
use rings_rpc::jsonrpc::Client as RpcClient;
use rings_rpc::protos::rings_node::*;
use crate::backend::types::BackendMessage;
use crate::backend::types::HttpRequest;
use crate::backend::types::ServiceMessage;
use crate::seed::Seed;
use crate::util::loader::ResourceLoader;
/// Result type returned by most [`Client`] methods: a [`ClientOutput`] holding
/// a value of type `T` on success, or an `anyhow::Error` on failure.
type Output<T> = anyhow::Result<ClientOutput<T>>;
/// Wrapper around the `rings_rpc` JSON-RPC client.
///
/// Its methods issue RPC requests through the wrapped client; most of them
/// return the outcome as a [`ClientOutput`].
pub struct Client {
// Underlying RPC client that every method delegates to.
client: RpcClient,
}
/// Outcome of a [`Client`] call: a typed value plus a printable rendering of it.
pub struct ClientOutput<T> {
/// The value produced by the call.
pub result: T,
// Human-readable text printed by `ClientOutput::display`.
display: String,
}
impl Client {
/// Builds a client whose RPC calls are sent to `endpoint_url`.
///
/// The current implementation always returns `Ok`.
pub fn new(endpoint_url: &str) -> anyhow::Result<Self> {
    Ok(Self {
        client: RpcClient::new(endpoint_url),
    })
}
/// Issues a `connect_peer_via_http` RPC for `url`.
///
/// On success the output carries the DID found in the response, and its
/// display text is `Remote did: <did>`.
///
/// # Errors
///
/// Returns an error (the RPC error rendered as text) if the call fails.
pub async fn connect_peer_via_http(&mut self, url: &str) -> Output<String> {
    let request = ConnectPeerViaHttpRequest {
        url: url.to_string(),
    };
    let response = self
        .client
        .connect_peer_via_http(&request)
        .await
        .map_err(|e| anyhow::anyhow!("{}", e))?;
    let did = response.did;
    ClientOutput::ok(format!("Remote did: {}", did), did)
}
/// Loads a [`Seed`] from `source` and issues a `connect_with_seed` RPC built from it.
///
/// The display text on success is `Successful!`.
///
/// # Errors
///
/// Returns an error if the seed cannot be loaded or if the RPC call fails.
pub async fn connect_with_seed(&mut self, source: &str) -> Output<()> {
    let request = Seed::load(source)
        .await?
        .into_connect_with_seed_request();
    let outcome = self.client.connect_with_seed(&request).await;
    outcome.map_err(|e| anyhow::anyhow!("{}", e))?;
    ClientOutput::ok(String::from("Successful!"), ())
}
/// Issues a `connect_with_did` RPC for `did`.
///
/// The display text on success is `Successful!`.
///
/// # Errors
///
/// Returns an error (the RPC error rendered as text) if the call fails.
pub async fn connect_with_did(&mut self, did: &str) -> Output<()> {
    let request = ConnectWithDidRequest {
        did: did.to_string(),
    };
    let outcome = self.client.connect_with_did(&request).await;
    outcome.map_err(|e| anyhow::anyhow!("{}", e))?;
    ClientOutput::ok(String::from("Successful!"), ())
}
/// Issues a `list_peers` RPC and renders the returned peers as a table.
///
/// The display text is a `Did, TransportId, Status` header line followed by
/// one comma-separated line per peer. The `result` value is `()`.
///
/// # Errors
///
/// Returns an error (the RPC error rendered as text) if the call fails.
pub async fn list_peers(&mut self) -> Output<()> {
    let response = self
        .client
        .list_peers(&ListPeersRequest {})
        .await
        .map_err(|e| anyhow::anyhow!("{}", e))?;

    // NOTE(review): the second column is labelled "TransportId" but is filled
    // with `peer.did` again — confirm whether this duplication is intended.
    let rows: Vec<String> = response
        .peers
        .iter()
        .map(|peer| format!("{}, {}, {}", peer.did, peer.did, peer.state))
        .collect();

    let display = format!("Did, TransportId, Status\n{}", rows.join("\n"));
    ClientOutput::ok(display, ())
}
/// Issues a `disconnect` RPC for `did`.
///
/// The display text on success is `Done.`.
///
/// # Errors
///
/// Returns an error (the RPC error rendered as text) if the call fails.
pub async fn disconnect(&mut self, did: &str) -> Output<()> {
    let request = DisconnectRequest {
        did: did.to_string(),
    };
    let outcome = self.client.disconnect(&request).await;
    outcome.map_err(|e| anyhow::anyhow!("{}", e))?;
    ClientOutput::ok(String::from("Done."), ())
}
/// Issues a `send_custom_message` RPC with `did` as the destination and `data` as the payload.
///
/// The display text on success is `Done.`.
///
/// # Errors
///
/// Returns an error (the RPC error rendered as text) if the call fails.
pub async fn send_custom_message(&self, did: &str, data: &str) -> Output<()> {
    let request = SendCustomMessageRequest {
        destination_did: did.to_string(),
        data: data.to_string(),
    };
    let outcome = self.client.send_custom_message(&request).await;
    outcome.map_err(|e| anyhow::anyhow!("{}", e))?;
    ClientOutput::ok(String::from("Done."), ())
}
/// Wraps an HTTP request description in a backend message and sends it to `did`.
///
/// `service`, `method`, `path`, `headers`, `body` and `rid` are copied into an
/// [`HttpRequest`], which is wrapped as `ServiceMessage::HttpRequest`,
/// converted to a [`BackendMessage`] and sent with the `send_backend_message`
/// RPC. The display text on success is `Done.`.
///
/// # Errors
///
/// Returns an error if the backend message cannot be converted into an RPC
/// request for `did`, or if the RPC call fails.
#[allow(clippy::too_many_arguments)]
pub async fn send_http_request_message(
    &self,
    did: &str,
    service: &str,
    method: http::Method,
    path: &str,
    headers: Vec<(String, String)>,
    body: Option<Vec<u8>>,
    rid: Option<String>,
) -> Output<()> {
    let service_msg = ServiceMessage::HttpRequest(HttpRequest {
        service: service.to_string(),
        method: method.to_string(),
        path: path.to_string(),
        headers,
        body,
        rid,
    });

    let request = BackendMessage::from(service_msg)
        .into_send_backend_message_request(did)
        .map_err(|e| anyhow::anyhow!("{}", e))?;

    let outcome = self.client.send_backend_message(&request).await;
    outcome.map_err(|e| anyhow::anyhow!("{}", e))?;
    ClientOutput::ok(String::from("Done."), ())
}
/// Sends `text` to `did` as a `BackendMessage::PlainText` via the `send_backend_message` RPC.
///
/// The display text on success is `Done.`.
///
/// # Errors
///
/// Returns an error if the backend message cannot be converted into an RPC
/// request for `did`, or if the RPC call fails.
pub async fn send_plain_text_message(&self, did: &str, text: &str) -> Output<()> {
    let request = BackendMessage::PlainText(text.to_string())
        .into_send_backend_message_request(did)
        .map_err(|e| anyhow::anyhow!("{}", e))?;

    let outcome = self.client.send_backend_message(&request).await;
    outcome.map_err(|e| anyhow::anyhow!("{}", e))?;
    ClientOutput::ok(String::from("Done."), ())
}
/// Issues a `register_service` RPC for the service called `name`.
///
/// The display text on success is `Done.`.
///
/// # Errors
///
/// Returns an error (the RPC error rendered as text) if the call fails.
pub async fn register_service(&self, name: &str) -> Output<()> {
    let request = RegisterServiceRequest {
        name: name.to_string(),
    };
    let outcome = self.client.register_service(&request).await;
    outcome.map_err(|e| anyhow::anyhow!("{}", e))?;
    ClientOutput::ok(String::from("Done."), ())
}
/// Issues a `lookup_service` RPC for `name` and lists the returned DIDs.
///
/// The DIDs are only available through the display text, one per line; the
/// `result` value is `()`.
///
/// # Errors
///
/// Returns an error (the RPC error rendered as text) if the call fails.
pub async fn lookup_service(&self, name: &str) -> Output<()> {
    let request = LookupServiceRequest {
        name: name.to_string(),
    };
    let response = self
        .client
        .lookup_service(&request)
        .await
        .map_err(|e| anyhow::anyhow!("{}", e))?;
    let display = response.dids.join("\n");
    ClientOutput::ok(display, ())
}
/// Issues a `publish_message_to_topic` RPC carrying `data` for `topic`.
///
/// The display text on success is `Done.`.
///
/// # Errors
///
/// Returns an error (the RPC error rendered as text) if the call fails.
pub async fn publish_message_to_topic(&self, topic: &str, data: &str) -> Output<()> {
    let request = PublishMessageToTopicRequest {
        topic: topic.to_string(),
        data: data.to_string(),
    };
    let outcome = self.client.publish_message_to_topic(&request).await;
    outcome.map_err(|e| anyhow::anyhow!("{}", e))?;
    ClientOutput::ok(String::from("Done."), ())
}
pub async fn subscribe_topic<'a, 'b>(
&'a self,
topic: String,
) -> impl Stream<Item = String> + 'b
where
'a: 'b,
{
let mut skip = 0usize;
stream! {
loop {
let timeout = Delay::new(Duration::from_secs(5)).fuse();
pin_mut!(timeout);
select! {
_ = timeout => {
let result = self
.client
.fetch_topic_messages(&FetchTopicMessagesRequest {
topic: topic.clone(),
skip: skip as i64,
})
.await;
if let Err(e) = result {
tracing::error!("Failed to fetch messages of topic: {}, {}", topic, e);
continue;
}
let messages = result.unwrap().data;
for msg in messages.iter().cloned() {
yield msg
}
skip += messages.len();
}
}
}
}
}
/// Issues a `node_info` RPC and returns the swarm information from the response.
///
/// The display text is the swarm information pretty-printed as JSON.
///
/// # Errors
///
/// Returns an error if the RPC call fails, if the response carries no swarm
/// information, or if the swarm information cannot be serialized to JSON.
pub async fn inspect(&self) -> Output<SwarmInfo> {
    let node_info = self
        .client
        .node_info(&NodeInfoRequest {})
        .await
        .map_err(|e| anyhow::anyhow!("{}", e))?;

    // `swarm` is optional in the response; report its absence as an error
    // instead of panicking on data that comes from the remote node.
    let swarm_info = node_info
        .swarm
        .ok_or_else(|| anyhow::anyhow!("node info response does not contain swarm info"))?;

    let display =
        serde_json::to_string_pretty(&swarm_info).map_err(|e| anyhow::anyhow!("{}", e))?;
    ClientOutput::ok(display, swarm_info)
}
}
impl<T> ClientOutput<T> {
    /// Bundles `result` with its printable `display` text.
    ///
    /// Always returns `Ok`, so the value can be used directly as the tail
    /// expression of a function returning `anyhow::Result<ClientOutput<T>>`.
    pub fn ok(display: String, result: T) -> anyhow::Result<Self> {
        let output = Self { result, display };
        Ok(output)
    }

    /// Prints the display text to standard output, followed by a newline.
    pub fn display(&self) {
        let text: &str = &self.display;
        println!("{}", text);
    }
}