use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::convert::AsRef;
use std::io;
use std::rc::Rc;
use std::str;
use std::time::Duration;
use std::u64;
use lber::structure::StructureTag;
use lber::structures::{Boolean, Enumerated, Integer, OctetString, Sequence, Tag};
use lber::common::TagClass::*;
use futures::{Async, Future, IntoFuture, Poll, Stream};
use futures::sync::{mpsc, oneshot};
use tokio_core::reactor::Timeout;
use tokio_proto::multiplex::RequestId;
use tokio_service::Service;
use controls::Control;
use filter::parse;
use ldap::{bundle, next_search_options, next_req_controls, next_timeout};
use ldap::{Ldap, LdapOp, LdapResponse};
use protocol::ProtoBundle;
use result::{LdapResult, SearchResult};
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Scope {
Base = 0,
OneLevel = 1,
Subtree = 2,
}
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum DerefAliases {
Never = 0,
Searching = 1,
Finding = 2,
Always = 3,
}
pub enum SearchItem {
Entry(StructureTag),
Referral(StructureTag),
Done(RequestId, LdapResult, Vec<Control>),
}
#[derive(PartialEq, Eq)]
enum AbandonState {
Idle,
AwaitingCmd,
AwaitingOp,
}
#[derive(Debug,Clone)]
pub struct ResultEntry(StructureTag);
impl ResultEntry {
#[doc(hidden)]
pub fn new(st: StructureTag) -> ResultEntry {
ResultEntry(st)
}
}
pub struct SearchStream {
id: RequestId,
initial_timeout: bool,
ldap: Ldap,
bundle: Rc<RefCell<ProtoBundle>>,
_tx_i: mpsc::UnboundedSender<SearchItem>,
rx_i: mpsc::UnboundedReceiver<SearchItem>,
tx_r: Option<oneshot::Sender<LdapResult>>,
rx_r: Option<oneshot::Receiver<LdapResult>>,
refs: Vec<HashSet<String>>,
timeout: Option<Duration>,
entry_timeout: Option<Timeout>,
abandon_state: AbandonState,
rx_a: Option<mpsc::UnboundedReceiver<()>>,
}
impl SearchStream {
pub fn get_abandon_channel(&mut self) -> io::Result<mpsc::UnboundedSender<()>> {
if self.abandon_state != AbandonState::Idle {
return Err(io::Error::new(io::ErrorKind::Other, "bad abandon state"));
}
let (tx_a, rx_a) = mpsc::unbounded::<()>();
self.rx_a = Some(rx_a);
self.abandon_state = AbandonState::AwaitingCmd;
Ok(tx_a)
}
pub fn get_result_rx(&mut self) -> io::Result<oneshot::Receiver<LdapResult>> {
self.rx_r.take().ok_or_else(|| io::Error::new(io::ErrorKind::Other, "channel already retrieved"))
}
fn update_maps(&mut self, cause: EndCause) {
let mut bundle = self.bundle.borrow_mut();
if let Some(helper) = bundle.search_helpers.remove(&self.id) {
if cause == EndCause::InitialTimeout {
bundle.solo_ops.push_back(helper.msgid);
} else {
bundle.id_map.remove(&helper.msgid);
}
}
}
}
#[derive(PartialEq, Eq)]
enum EndCause {
Regular,
InitialTimeout,
SubsequentTimeout,
Abandoned,
}
impl Stream for SearchStream {
type Item = ResultEntry;
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
if self.initial_timeout {
self.update_maps(EndCause::InitialTimeout);
return Err(io::Error::new(io::ErrorKind::Other, "timeout"));
}
let (abandon_req, abandon_done) = if let Some(ref mut rx_a) = self.rx_a {
match rx_a.poll() {
Ok(Async::Ready(_)) => match self.abandon_state {
AbandonState::AwaitingCmd => (true, false),
AbandonState::AwaitingOp => (false, true),
_ => panic!("invalid abandon_state"),
},
Ok(Async::NotReady) => match self.abandon_state {
AbandonState::AwaitingCmd => (false, false),
AbandonState::AwaitingOp => return Ok(Async::NotReady),
_ => panic!("invalid abandon_state"),
},
Err(_e) =>
match self.abandon_state {
AbandonState::AwaitingCmd => return Err(io::Error::new(io::ErrorKind::Other, "poll abandon channel")),
AbandonState::AwaitingOp => (false, true),
_ => panic!("invalid abandon_state"),
}
}
} else {
(false, false)
};
if abandon_done {
if let Some(tx_r) = self.tx_r.take() {
let result = LdapResult {
rc: 88,
matched: "".to_owned(),
text: "search abandoned".to_owned(),
refs: vec![],
ctrls: vec![],
};
self.update_maps(EndCause::Abandoned);
tx_r.send(result).map_err(|_e| io::Error::new(io::ErrorKind::Other, "send result"))?;
}
return Ok(Async::Ready(None))
}
if abandon_req {
let (tx_a, rx_a) = mpsc::unbounded::<()>();
self.abandon_state = AbandonState::AwaitingOp;
self.rx_a = Some(rx_a);
let handle = self.bundle.borrow().handle.clone();
let ldap = self.ldap.clone();
let msgid = match self.bundle.borrow().search_helpers.get(&self.id) {
Some(helper) => helper.msgid,
None => return Err(io::Error::new(io::ErrorKind::Other, format!("helper not found for: {}", self.id))),
};
let abandon = if let Some(ref timeout) = self.timeout {
ldap.with_timeout(*timeout).abandon(msgid)
} else {
ldap.abandon(msgid)
};
handle.spawn(abandon.then(move |_r| {
tx_a.send(()).map_err(|_e| ())
}));
continue;
}
if let Some(ref timeout) = self.timeout {
if self.entry_timeout.is_none() {
self.entry_timeout = Some(Timeout::new(*timeout, &self.bundle.borrow().handle)?);
}
}
let timeout_fired = if let Some(ref mut timeout) = self.entry_timeout {
match timeout.poll() {
Ok(Async::Ready(_)) => true,
Ok(Async::NotReady) => false,
Err(e) => return Err(e),
}
} else {
false
};
if timeout_fired {
self.update_maps(EndCause::SubsequentTimeout);
return Err(io::Error::new(io::ErrorKind::Other, "timeout"));
}
let item = try_ready!(self.rx_i.poll().map_err(|_e| io::Error::new(io::ErrorKind::Other, "poll search stream")));
match item {
Some(SearchItem::Done(_id, mut result, controls)) => {
result.refs.extend(self.refs.drain(..));
self.update_maps(EndCause::Regular);
result.ctrls = controls;
let tx_r = self.tx_r.take().expect("oneshot tx");
tx_r.send(result).map_err(|_e| io::Error::new(io::ErrorKind::Other, "send result"))?;
return Ok(Async::Ready(None));
},
Some(SearchItem::Entry(tag)) => {
self.entry_timeout.take();
return Ok(Async::Ready(Some(ResultEntry(tag))))
},
Some(SearchItem::Referral(tag)) => {
self.refs.push(tag.expect_constructed().expect("referrals").into_iter()
.map(|t| t.expect_primitive().expect("octet string"))
.map(String::from_utf8)
.map(|s| s.expect("uri"))
.collect());
self.entry_timeout.take();
continue;
},
None => return Ok(Async::Ready(None)),
}
}
}
}
#[derive(Debug,Clone)]
pub struct SearchEntry {
pub dn: String,
pub attrs: HashMap<String, Vec<String>>,
pub bin_attrs: HashMap<String, Vec<Vec<u8>>>,
}
impl SearchEntry {
pub fn construct(re: ResultEntry) -> SearchEntry {
let mut tags = re.0.match_id(4).and_then(|t| t.expect_constructed()).expect("entry").into_iter();
let dn = String::from_utf8(tags.next().expect("element").expect_primitive().expect("octet string"))
.expect("dn");
let mut attr_vals = HashMap::new();
let mut bin_attr_vals = HashMap::new();
let attrs = tags.next().expect("element").expect_constructed().expect("attrs").into_iter();
for a_v in attrs {
let mut part_attr = a_v.expect_constructed().expect("partial attribute").into_iter();
let a_type = String::from_utf8(part_attr.next().expect("element").expect_primitive().expect("octet string"))
.expect("attribute type");
let mut any_binary = false;
let values = part_attr.next().expect("element").expect_constructed().expect("values").into_iter()
.map(|t| t.expect_primitive().expect("octet string"))
.filter_map(|s| {
if let Ok(s) = str::from_utf8(s.as_ref()) {
return Some(s.to_owned());
}
bin_attr_vals.entry(a_type.clone()).or_insert_with(|| vec![]).push(s);
any_binary = true;
None
})
.collect::<Vec<String>>();
if any_binary {
bin_attr_vals.get_mut(&a_type).expect("bin vector").extend(
values.into_iter().map(String::into_bytes).collect::<Vec<Vec<u8>>>()
);
} else {
attr_vals.insert(a_type, values);
}
}
SearchEntry {
dn: dn,
attrs: attr_vals,
bin_attrs: bin_attr_vals,
}
}
}
pub struct SearchOptions {
deref: DerefAliases,
typesonly: bool,
timelimit: i32,
sizelimit: i32,
}
impl SearchOptions {
#[cfg_attr(feature="cargo-clippy", allow(new_without_default))]
pub fn new() -> Self {
SearchOptions {
deref: DerefAliases::Never,
typesonly: false,
timelimit: 0,
sizelimit: 0,
}
}
pub fn deref(mut self, d: DerefAliases) -> Self {
self.deref = d;
self
}
pub fn typesonly(mut self, typesonly: bool) -> Self {
self.typesonly = typesonly;
self
}
pub fn timelimit(mut self, timelimit: i32) -> Self {
self.timelimit = timelimit;
self
}
pub fn sizelimit(mut self, sizelimit: i32) -> Self {
self.sizelimit = sizelimit;
self
}
}
impl Ldap {
pub fn search<S: AsRef<str>>(&self, base: &str, scope: Scope, filter: &str, attrs: Vec<S>) ->
Box<Future<Item=SearchResult, Error=io::Error>>
{
let srch = self
.streaming_search(base, scope, filter, attrs)
.and_then(|mut strm| strm
.get_result_rx()
.into_future()
.and_then(|rx_r| rx_r
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
.join(strm.collect())
)
.map(|(result, result_set)| SearchResult(result_set, result))
);
Box::new(srch)
}
pub fn streaming_search<S: AsRef<str>>(&self, base: &str, scope: Scope, filter: &str, attrs: Vec<S>) ->
Box<Future<Item=SearchStream, Error=io::Error>>
{
let opts = match next_search_options(self) {
Some(opts) => opts,
None => SearchOptions::new(),
};
let req = Tag::Sequence(Sequence {
id: 3,
class: Application,
inner: vec![
Tag::OctetString(OctetString {
inner: Vec::from(base.as_bytes()),
.. Default::default()
}),
Tag::Enumerated(Enumerated {
inner: scope as i64,
.. Default::default()
}),
Tag::Enumerated(Enumerated {
inner: opts.deref as i64,
.. Default::default()
}),
Tag::Integer(Integer {
inner: opts.sizelimit as i64,
.. Default::default()
}),
Tag::Integer(Integer {
inner: opts.timelimit as i64,
.. Default::default()
}),
Tag::Boolean(Boolean {
inner: opts.typesonly,
.. Default::default()
}),
parse(filter).unwrap(),
Tag::Sequence(Sequence {
inner: attrs.into_iter().map(|s|
Tag::OctetString(OctetString { inner: Vec::from(s.as_ref()), ..Default::default() })).collect(),
.. Default::default()
})
],
});
let (tx_i, rx_i) = mpsc::unbounded::<SearchItem>();
let (tx_r, rx_r) = oneshot::channel::<LdapResult>();
let bundle = bundle(self);
let timeout = next_timeout(self);
if let Some(ref timeout) = timeout {
self.with_timeout(*timeout);
}
let ldap = self.clone();
let fut = self.call(LdapOp::Multi(req, tx_i.clone(), next_req_controls(self))).and_then(move |res| {
let (id, initial_timeout) = match res {
LdapResponse(Tag::Integer(Integer { inner, .. }), _) => (inner as u64, false),
LdapResponse(Tag::Enumerated(Enumerated { inner, .. }), _) => (inner as u64, true),
_ => unimplemented!(),
};
Ok(SearchStream {
id: id,
initial_timeout: initial_timeout,
ldap: ldap,
bundle: bundle,
_tx_i: tx_i,
rx_i: rx_i,
tx_r: Some(tx_r),
rx_r: Some(rx_r),
refs: Vec::new(),
timeout: timeout,
entry_timeout: None,
abandon_state: AbandonState::Idle,
rx_a: None,
})
});
Box::new(fut)
}
}