use futures::{Future, Stream};
use futures::sync::mpsc;
use rmpv::{self, Value};
use serde::{Deserialize, Serialize};
use {Error, Request, Sender, Service};
use dispatch::{PrimitiveDispatch, StreamingDispatch};
use hpack::RawHeader;
use protocol::Flatten;
/// Version tag that accompanies values in the `put`, `get`, `del` and subscription APIs below;
/// an alias for `i64`.
pub type Version = i64;
/// Handle handed out together with a subscription stream.
///
/// When the handle is dropped (explicitly through `close` or implicitly by going out of
/// scope) a request of type `0` with an empty argument list is sent through the stored sender.
#[derive(Debug)]
pub struct Close {
// Sender produced by the `Service::call` that opened the subscription.
sender: Sender,
}
impl Close {
    /// Consumes the handle.
    ///
    /// All the work happens in the `Drop` implementation of `Close`; this method only
    /// gives callers an explicit, self-documenting way to trigger it.
    pub fn close(self) {
        drop(self);
    }
}
impl Drop for Close {
    /// Sends a request of type `0` with an empty argument list through the stored sender.
    ///
    /// NOTE(review): presumably message type `0` is what tells the remote side to stop the
    /// subscription - confirm against the protocol description.
    fn drop(&mut self) {
        // A panic raised from `drop` while the thread is already unwinding aborts the whole
        // process, so a failure to build the request is skipped rather than unwrapped.
        if let Ok(request) = Request::new(0, &[0; 0]) {
            self.sender.send(request);
        }
    }
}
/// Methods that can be requested from the service.
///
/// Each variant maps to the numeric request type defined by the `From<Method> for u64`
/// conversion below.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Method {
    Subscribe,
    ChildrenSubscribe,
    Put,
    Get,
    Create,
    Del,
}

/// Converts a method into the numeric request type passed to `Request::new`.
///
/// `From` is implemented rather than `Into`: the standard blanket implementation still
/// provides `Into<u64>` for `Method`, so existing `.into()` call sites keep working.
impl From<Method> for u64 {
    #[inline]
    fn from(method: Method) -> u64 {
        // Exhaustive on purpose: adding a variant must force a decision about its number.
        match method {
            Method::Subscribe => 0,
            Method::ChildrenSubscribe => 1,
            Method::Put => 2,
            Method::Get => 3,
            Method::Create => 4,
            Method::Del => 5,
        }
    }
}
/// Client wrapper that issues the `create`, `put`, `get`, `del`, `subscribe` and
/// `children_subscribe` requests through a `Service`.
#[derive(Clone, Debug)]
pub struct Unicorn {
// Service handle every request is sent through.
service: Service,
}
impl Unicorn {
pub fn new(service: Service) -> Self {
Self { service }
}
pub fn create<T, H>(&self, path: &str, value: &T, headers: H) ->
impl Future<Item=bool, Error=Error>
where
T: Serialize,
H: IntoIterator<Item=RawHeader>
{
let (dispatch, future) = PrimitiveDispatch::pair();
let request = Request::new(Method::Create.into(), &(path, value))
.unwrap()
.add_headers(headers);
self.service.call(request, dispatch);
future.and_then(|val: Value| {
match rmpv::ext::deserialize_from(val) {
Ok(val) => Ok(val),
Err(err) => Err(Error::InvalidDataFraming(err.to_string())),
}
})
}
pub fn put<T, H>(&self, path: &str, value: &T, headers: H) ->
impl Future<Item=(bool, Version), Error=Error>
where
T: Serialize,
H: IntoIterator<Item=RawHeader>
{
let (dispatch, future) = PrimitiveDispatch::pair();
let request = Request::new(Method::Put.into(), &(path, value))
.unwrap()
.add_headers(headers);
self.service.call(request, dispatch);
future.and_then(|(val, version): (Value, Version)| {
match rmpv::ext::deserialize_from(val) {
Ok(val) => Ok((val, version)),
Err(err) => Err(Error::InvalidDataFraming(err.to_string())),
}
})
}
pub fn del<H>(&self, path: &str, version: &Version, headers: H) ->
impl Future<Item=bool, Error=Error>
where
H: IntoIterator<Item=RawHeader>
{
let (dispatch, future) = PrimitiveDispatch::pair();
let request = Request::new(Method::Del.into(), &(path, version))
.unwrap()
.add_headers(headers);
self.service.call(request, dispatch);
future.and_then(|val: Value| {
match rmpv::ext::deserialize_from(val) {
Ok(val) => Ok(val),
Err(err) => Err(Error::InvalidDataFraming(err.to_string())),
}
})
}
pub fn get<T, H>(&self, path: &str, headers: H) ->
impl Future<Item=(Option<T>, Version), Error=Error>
where
T: for<'de> Deserialize<'de>,
H: Into<Option<Vec<RawHeader>>>
{
let (dispatch, future) = PrimitiveDispatch::pair();
let headers = headers.into().unwrap_or_default();
let request = Request::new(Method::Get.into(), &[path])
.unwrap()
.add_headers(headers);
self.service.call(request, dispatch);
future.and_then(|(val, version): (Value, Version)| {
match rmpv::ext::deserialize_from(val) {
Ok(val) => Ok((val, version)),
Err(err) => Err(Error::InvalidDataFraming(err.to_string())),
}
})
}
pub fn subscribe<'a, T, H>(&self, path: &str, headers: H) ->
impl Future<Item=(Close, Box<Stream<Item=(Option<T>, Version), Error=Error> + Send + 'a>), Error=Error>
where
T: for<'de> Deserialize<'de> + Send + 'static,
H: Into<Option<Vec<RawHeader>>>
{
let (tx, rx) = mpsc::unbounded();
let dispatch = StreamingDispatch::new(tx);
let headers = headers.into().unwrap_or_default();
let request = Request::new(Method::Subscribe.into(), &[path]).unwrap()
.add_headers(headers);
self.service.call(request, dispatch).and_then(|sender| {
let handle = Close { sender };
let stream = rx.map_err(|()| Error::Canceled)
.then(Flatten::flatten)
.and_then(|(val, version): (Value, Version)| {
match rmpv::ext::deserialize_from(val) {
Ok(val) => Ok((val, version)),
Err(err) => Err(Error::InvalidDataFraming(err.to_string())),
}
});
let stream = Box::new(stream) as Box<Stream<Item=(Option<T>, Version), Error=Error> + Send>;
Ok((handle, stream))
})
}
pub fn children_subscribe<H>(&self, path: &str, headers: H) ->
impl Future<Item=(Close, Box<Stream<Item=(Version, Vec<String>), Error=Error> + Send>), Error=Error>
where
H: Into<Option<Vec<RawHeader>>>
{
let (tx, rx) = mpsc::unbounded();
let dispatch = StreamingDispatch::new(tx);
let headers = headers.into().unwrap_or_default();
let request = Request::new(Method::ChildrenSubscribe.into(), &[path])
.unwrap()
.add_headers(headers);
self.service.call(request, dispatch).and_then(|sender| {
let handle = Close { sender };
let stream = rx.map_err(|()| Error::Canceled)
.then(Flatten::flatten);
let stream = Box::new(stream) as Box<Stream<Item=(Version, Vec<String>), Error=Error> + Send>;
Ok((handle, stream))
})
}
}