use std::collections::HashMap;
use std::net::SocketAddr;
use futures::{Future, Stream};
use futures::sync::mpsc;
use {Error, Request, Service};
use dispatch::{PrimitiveDispatch, StreamingDispatch};
use protocol::Flatten;
use resolve::ResolveInfo;
pub type HashRing = Vec<(u64, String)>;
enum Method {
Resolve,
Routing,
}
impl Into<u64> for Method {
#[inline]
fn into(self) -> u64 {
match self {
Method::Resolve => 0,
Method::Routing => 5,
}
}
}
#[derive(Clone, Debug)]
pub struct Locator {
service: Service,
}
impl Locator {
pub fn new(service: Service) -> Self {
Self { service }
}
pub fn resolve(&self, name: &str) -> impl Future<Item = ResolveInfo<SocketAddr>, Error = Error> {
let (dispatch, future) = PrimitiveDispatch::pair();
self.service.call(Request::new(Method::Resolve.into(), &[name]).unwrap(), dispatch);
future.map(|ResolveInfo{addrs, version, methods}| {
let addrs = addrs.into_iter()
.map(|(ip, port)| SocketAddr::new(ip, port))
.collect();
ResolveInfo {
addrs: addrs,
version: version,
methods: methods,
}
})
}
pub fn routing(&self, uuid: &str) ->
impl Stream<Item = HashMap<String, HashRing>, Error = Error>
{
let (tx, rx) = mpsc::unbounded();
let dispatch = StreamingDispatch::new(tx);
self.service.call(Request::new(Method::Routing.into(), &[uuid]).unwrap(), dispatch);
rx.map_err(|()| Error::Canceled).then(Flatten::flatten)
}
}