#![warn(missing_docs)]
use std::time::Duration;
use async_stream::stream;
use futures::pin_mut;
use futures::select;
use futures::FutureExt;
use futures::Stream;
use futures_timer::Delay;
use serde_json::json;
use crate::prelude::http;
use crate::prelude::rings_core::inspect::SwarmInspect;
use crate::prelude::rings_core::session::SessionSk;
use crate::prelude::rings_rpc::client::Client as RpcClient;
use crate::prelude::rings_rpc::types::Timeout;
use crate::seed::Seed;
use crate::util::loader::ResourceLoader;
type Output<T> = anyhow::Result<ClientOutput<T>>;
pub struct Client {
client: RpcClient,
}
pub struct ClientOutput<T> {
pub result: T,
display: String,
}
impl Client {
pub fn new(endpoint_url: &str, session_sk: SessionSk) -> anyhow::Result<Self> {
let rpc_client = RpcClient::new(endpoint_url, Some(session_sk));
Ok(Self { client: rpc_client })
}
pub async fn connect_peer_via_http(&mut self, http_url: &str) -> Output<String> {
let peer_did = self
.client
.connect_peer_via_http(http_url)
.await
.map_err(|e| anyhow::anyhow!("{}", e))?;
ClientOutput::ok(format!("Remote did: {}", peer_did), peer_did)
}
pub async fn connect_with_seed(&mut self, source: &str) -> Output<()> {
let seed = Seed::load(source).await?;
let seed_v = serde_json::to_value(seed).map_err(|_| anyhow::anyhow!("serialize failed"))?;
self.client
.connect_with_seed(&[seed_v])
.await
.map_err(|e| anyhow::anyhow!("{}", e))?;
ClientOutput::ok("Successful!".to_string(), ())
}
pub async fn connect_with_did(&mut self, did: &str) -> Output<()> {
self.client
.connect_with_did(did)
.await
.map_err(|e| anyhow::anyhow!("{}", e))?;
ClientOutput::ok("Successful!".to_owned(), ())
}
pub async fn list_peers(&mut self) -> Output<()> {
let peers = self
.client
.list_peers()
.await
.map_err(|e| anyhow::anyhow!("{}", e))?;
let mut display = String::new();
display.push_str("Did, TransportId, Status\n");
display.push_str(
peers
.iter()
.map(|peer| format!("{}, {}, {}", peer.did, peer.did, peer.state))
.collect::<Vec<_>>()
.join("\n")
.as_str(),
);
ClientOutput::ok(display, ())
}
pub async fn disconnect(&mut self, did: &str) -> Output<()> {
self.client
.disconnect(did)
.await
.map_err(|e| anyhow::anyhow!("{}", e))?;
ClientOutput::ok("Done.".into(), ())
}
pub async fn send_message(&self, did: &str, text: &str) -> Output<()> {
let mut params = serde_json::Map::new();
params.insert("destination".to_owned(), json!(did));
params.insert("text".to_owned(), json!(text));
self.client
.send_message(did, text)
.await
.map_err(|e| anyhow::anyhow!("{}", e))?;
ClientOutput::ok("Done.".into(), ())
}
pub async fn send_custom_message(
&self,
did: &str,
message_type: u16,
data: &str,
) -> Output<()> {
self.client
.send_custom_message(did, message_type, data)
.await
.map_err(|e| anyhow::anyhow!("{}", e))?;
ClientOutput::ok("Done.".into(), ())
}
#[allow(clippy::too_many_arguments)]
pub async fn send_http_request_message(
&self,
did: &str,
name: &str,
method: http::Method,
url: &str,
timeout: Timeout,
headers: &[(&str, &str)],
body: Option<String>,
) -> Output<()> {
self.client
.send_http_request_message(did, name, method, url, timeout, headers, body)
.await
.map_err(|e| anyhow::anyhow!("{}", e))?;
ClientOutput::ok("Done.".into(), ())
}
pub async fn send_simple_text_message(&self, did: &str, text: &str) -> Output<()> {
self.client
.send_simple_text_message(did, text)
.await
.map_err(|e| anyhow::anyhow!("{}", e))?;
ClientOutput::ok("Done.".into(), ())
}
pub async fn register_service(&self, name: &str) -> Output<()> {
self.client
.register_service(name)
.await
.map_err(|e| anyhow::anyhow!("{}", e))?;
ClientOutput::ok("Done.".into(), ())
}
pub async fn lookup_service(&self, name: &str) -> Output<()> {
let dids = self
.client
.lookup_service(name)
.await
.map_err(|e| anyhow::anyhow!("{}", e))?;
ClientOutput::ok(dids.join("\n"), ())
}
pub async fn publish_message_to_topic(&self, topic: &str, data: &str) -> Output<()> {
self.client
.publish_message_to_topic(topic, data)
.await
.map_err(|e| anyhow::anyhow!("{}", e))?;
ClientOutput::ok("Done.".into(), ())
}
pub async fn subscribe_topic<'a, 'b>(
&'a self,
topic: String,
) -> impl Stream<Item = String> + 'b
where
'a: 'b,
{
let mut index = 0;
stream! {
loop {
let timeout = Delay::new(Duration::from_secs(5)).fuse();
pin_mut!(timeout);
select! {
_ = timeout => {
let result = self
.client
.fetch_topic_messages(topic.as_str(), index)
.await;
if let Err(e) = result {
tracing::error!("Failed to fetch messages of topic: {}, {}", topic, e);
continue;
}
let messages = result.unwrap();
for msg in messages.iter().cloned() {
yield msg
}
index += messages.len();
}
}
}
}
}
pub async fn inspect(&self) -> Output<SwarmInspect> {
let info = self
.client
.inspect()
.await
.map_err(|e| anyhow::anyhow!("{}", e))?;
let display =
serde_json::to_string_pretty(&info.swarm).map_err(|e| anyhow::anyhow!("{}", e))?;
ClientOutput::ok(display, info.swarm)
}
}
impl<T> ClientOutput<T> {
pub fn ok(display: String, result: T) -> anyhow::Result<Self> {
Ok(Self { result, display })
}
pub fn display(&self) {
println!("{}", self.display);
}
}