use std::collections::HashMap;
use std::os::unix::net::UnixStream as StdUnixStream;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use futures::channel::mpsc;
use futures::prelude::*;
use futures::{SinkExt, StreamExt};
use futures_codec::{Framed, JsonCodec};
use async_std::future::timeout;
use async_std::os::unix::net::UnixStream;
use async_std::task::{self, JoinHandle};
use humantime::Duration as HumanDuration;
use structopt::StructOpt;
use tracing::{span, Level};
use dsf_rpc::*;
use dsf_rpc::{Request as RpcRequest, Response as RpcResponse};
use dsf_core::api::*;
use crate::error::Error;
/// Shared, mutex-protected table of in-flight requests.
///
/// Maps a request ID to the channel sender used to deliver the
/// `ResponseKind` values received for that request.
type RequestMap = Arc<Mutex<HashMap<u64, mpsc::Sender<ResponseKind>>>>;
// Connection options for `Client`, derivable from the command line and
// environment through StructOpt.
// NOTE(review): plain `//` comments are used on purpose in this block; `///`
// doc comments on a StructOpt type are expected to end up in the generated
// CLI help text — confirm before converting them.
#[derive(Clone, Debug, StructOpt)]
pub struct Options {
// Path of the daemon's unix socket. Set with `-d` / `--daemon-socket` or the
// `DSF_SOCK` environment variable; defaults to `/var/run/dsfd/dsf.sock`.
#[structopt(
short = "d",
long = "daemon-socket",
default_value = "/var/run/dsfd/dsf.sock",
env = "DSF_SOCK"
)]
pub daemon_socket: String,
// Timeout in humantime syntax, defaulting to `10s`. `Client` uses it as the
// maximum wait for the first response to a request.
#[structopt(long, default_value = "10s")]
pub timeout: HumanDuration,
}
impl Options {
pub fn new(address: &str, timeout: Duration) -> Self {
Self {
daemon_socket: address.to_string(),
timeout: timeout.into(),
}
}
}
/// RPC client talking to the daemon over a unix socket.
///
/// Requests are queued on `sink` and written to the socket by one background
/// task; a second background task reads responses from the socket and routes
/// each one to the waiting caller through `requests`.
#[derive(Debug)]
pub struct Client {
/// Socket path this client connected to; used to label tracing spans.
addr: String,
/// Sending half of the internal queue that feeds the socket writer task.
sink: mpsc::Sender<RpcRequest>,
/// In-flight requests, keyed by request ID.
requests: RequestMap,
/// Maximum time to wait for the first response to a request.
timeout: Duration,
/// Handle of the task forwarding queued requests to the socket.
tx_handle: JoinHandle<()>,
/// Handle of the task reading responses from the socket.
rx_handle: JoinHandle<()>,
}
impl Client {
pub fn new(options: &Options) -> Result<Self, Error> {
let span = span!(Level::DEBUG, "client", "{}", options.daemon_socket);
let _enter = span.enter();
info!("Client connecting (address: {})", options.daemon_socket);
let stream = StdUnixStream::connect(&options.daemon_socket)?;
let stream = UnixStream::from(stream);
let codec = JsonCodec::<RpcRequest, RpcResponse>::new();
let framed = Framed::new(stream, codec);
let (mut unix_sink, mut unix_stream) = framed.split();
let (internal_sink, mut internal_stream) = mpsc::channel::<RpcRequest>(0);
let tx_handle = task::spawn(async move {
trace!("started client tx listener");
while let Some(msg) = internal_stream.next().await {
unix_sink.send(msg).await.unwrap();
}
});
let requests = Arc::new(Mutex::new(HashMap::new()));
let reqs = requests.clone();
let rx_handle = task::spawn(async move {
trace!("started client rx listener");
while let Some(Ok(resp)) = unix_stream.next().await {
Self::handle(&reqs, resp).await.unwrap();
}
});
Ok(Client {
sink: internal_sink,
addr: options.daemon_socket.to_owned(),
requests,
timeout: *options.timeout,
rx_handle,
tx_handle,
})
}
pub async fn request(&mut self, rk: RequestKind) -> Result<ResponseKind, Error> {
let span = span!(Level::DEBUG, "client", "{}", self.addr);
let _enter = span.enter();
debug!("Issuing request: {:?}", rk);
let resp = self.do_request(rk).await.map(|(v, _)| v)?;
debug!("Received response: {:?}", resp);
Ok(resp)
}
async fn do_request(
&mut self,
rk: RequestKind,
) -> Result<(ResponseKind, mpsc::Receiver<ResponseKind>), Error> {
let (tx, mut rx) = mpsc::channel(0);
let req = RpcRequest::new(rk);
let id = req.req_id();
trace!("request add lock");
self.requests.lock().unwrap().insert(id, tx);
self.sink.send(req).await.unwrap();
let res = timeout(self.timeout, rx.next()).await;
let res = match res {
Ok(Some(v)) => Ok(v),
Ok(None) => {
error!("No response received");
Err(Error::None(()))
}
Err(e) => {
error!("Response error: {:?}", e);
Err(Error::Timeout)
}
};
if let Err(_e) = &res {
trace!("request failure lock");
self.requests.lock().unwrap().remove(&id);
}
res.map(|v| (v, rx))
}
/// Routes one response from the socket to the request waiting for it.
///
/// The sender registered under the response's request ID is cloned out of
/// the map, so the mutex is released before awaiting, and the response kind
/// is sent on it. A failed delivery is logged and otherwise ignored.
///
/// # Errors
///
/// Returns `Error::Unknown` when no request is registered under that ID.
async fn handle(requests: &RequestMap, resp: RpcResponse) -> Result<(), Error> {
    let id = resp.req_id();

    trace!("receive request lock");
    let pending = requests.lock().unwrap().get_mut(&id).map(|tx| tx.clone());

    let mut tx = match pending {
        Some(tx) => tx,
        None => {
            error!("Unix RX with no matching request ID");
            return Err(Error::Unknown);
        }
    };

    if let Err(e) = tx.send(resp.kind()).await {
        error!("client send error: {:?}", e);
    }

    Ok(())
}
pub async fn status(&mut self) -> Result<StatusInfo, Error> {
let req = RequestKind::Status;
let resp = self.request(req).await?;
match resp {
ResponseKind::Status(info) => Ok(info),
_ => Err(Error::UnrecognizedResult),
}
}
pub async fn connect(
&mut self,
options: peer::ConnectOptions,
) -> Result<peer::ConnectInfo, Error> {
let req = RequestKind::Peer(peer::PeerCommands::Connect(options));
let resp = self.request(req).await?;
match resp {
ResponseKind::Connected(info) => Ok(info),
_ => Err(Error::UnrecognizedResult),
}
}
/// Sends a `PeerCommands::Search` request and returns the `PeerInfo` carried
/// by the `Peer` response.
///
/// # Errors
///
/// Propagates request errors; any other response kind yields
/// `Error::UnrecognizedResult`.
pub async fn find(&mut self, options: peer::SearchOptions) -> Result<peer::PeerInfo, Error> {
    let req = RequestKind::Peer(peer::PeerCommands::Search(options));

    let resp = self.request(req).await?;

    match resp {
        // `info` is already owned here, so it is returned as-is rather than
        // cloned and then dropped.
        ResponseKind::Peer(info) => Ok(info),
        _ => Err(Error::UnrecognizedResult),
    }
}
/// Sends a `ServiceCommands::List` request and returns the service list
/// carried by the `Services` response.
///
/// # Errors
///
/// Propagates request errors; any other response kind yields
/// `Error::UnrecognizedResult`.
pub async fn list(
    &mut self,
    options: service::ListOptions,
) -> Result<Vec<service::ServiceInfo>, Error> {
    let command = service::ServiceCommands::List(options);

    match self.request(RequestKind::Service(command)).await? {
        ResponseKind::Services(services) => Ok(services),
        _ => Err(Error::UnrecognizedResult),
    }
}
pub async fn info(
&mut self,
options: service::InfoOptions,
) -> Result<(ServiceHandle, ServiceInfo), Error> {
let req = RequestKind::Service(service::ServiceCommands::Info(options));
let resp = self.request(req).await?;
match resp {
ResponseKind::Service(info) => Ok((ServiceHandle::new(info.id.clone()), info)),
_ => Err(Error::UnrecognizedResult),
}
}
/// Sends a `ServiceCommands::Create` request and returns a `ServiceHandle`
/// built from the ID in the `Service` response.
///
/// # Errors
///
/// Propagates request errors; any other response kind yields
/// `Error::UnrecognizedResult`.
pub async fn create(
    &mut self,
    options: service::CreateOptions,
) -> Result<ServiceHandle, Error> {
    let command = service::ServiceCommands::Create(options);

    match self.request(RequestKind::Service(command)).await? {
        ResponseKind::Service(info) => {
            let handle = ServiceHandle::new(info.id.clone());
            Ok(handle)
        }
        _ => Err(Error::UnrecognizedResult),
    }
}
pub async fn register(
&mut self,
options: RegisterOptions,
) -> Result<dsf_rpc::service::RegisterInfo, Error> {
let req = RequestKind::Service(dsf_rpc::service::ServiceCommands::Register(options));
let resp = self.request(req).await?;
match resp {
ResponseKind::Registered(info) => Ok(info),
_ => Err(Error::UnrecognizedResult),
}
}
/// Sends a `ServiceCommands::Locate` request and returns the `LocateInfo`
/// from the `Located` response, with a `ServiceHandle` for the requested ID.
///
/// # Errors
///
/// Propagates request errors; any other response kind yields
/// `Error::UnrecognizedResult`.
pub async fn locate(
    &mut self,
    options: LocateOptions,
) -> Result<(ServiceHandle, LocateInfo), Error> {
    // Keep a copy of the ID: `options` is moved into the request below.
    let id = options.id.clone();
    let req = RequestKind::Service(dsf_rpc::service::ServiceCommands::Locate(options));

    let resp = self.request(req).await?;

    match resp {
        ResponseKind::Located(info) => {
            // `id` is already an owned copy, so move it into the handle
            // instead of cloning it a second time.
            let handle = ServiceHandle { id };
            Ok((handle, info))
        }
        _ => Err(Error::UnrecognizedResult),
    }
}
/// Sends a `DataCommands::Publish` request and returns the `PublishInfo`
/// carried by the `Published` response.
///
/// # Errors
///
/// Propagates request errors; any other response kind yields
/// `Error::UnrecognizedResult`.
pub async fn publish(&mut self, options: PublishOptions) -> Result<PublishInfo, Error> {
    let command = DataCommands::Publish(options);

    match self.request(RequestKind::Data(command)).await? {
        ResponseKind::Published(info) => Ok(info),
        _ => Err(Error::UnrecognizedResult),
    }
}
pub async fn subscribe(
&mut self,
options: SubscribeOptions,
) -> Result<impl Stream<Item = ResponseKind>, Error> {
let req = RequestKind::Service(ServiceCommands::Subscribe(options));
let (resp, rx) = self.do_request(req).await?;
match resp {
ResponseKind::Subscribed(_info) => Ok(rx),
_ => Err(Error::UnrecognizedResult),
}
}
pub async fn data(&mut self, options: data::ListOptions) -> Result<Vec<DataInfo>, Error> {
let req = RequestKind::Data(DataCommands::List(options));
let resp = self.request(req).await?;
match resp {
ResponseKind::Data(info) => Ok(info),
_ => Err(Error::UnrecognizedResult),
}
}
}