use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::convert::AsRef;
use std::io;
use std::rc::Rc;
use std::str;
use lber::structure::StructureTag;
use lber::structures::{Boolean, Enumerated, Integer, OctetString, Sequence, Tag};
use lber::common::TagClass::*;
use futures::{Async, Future, Poll, Stream};
use futures::sync::{mpsc, oneshot};
use tokio_proto::multiplex::RequestId;
use tokio_service::Service;
use controls::Control;
use filter::parse;
use ldap::{bundle, next_search_options, next_req_controls};
use ldap::{Ldap, LdapOp};
use protocol::{LdapResult, ProtoBundle};
/// Scope of a search operation.
///
/// The discriminants are the numeric values written into the `scope`
/// field of the search request (the enum is cast to an integer when the
/// request is built), so they must not be changed.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Scope {
    /// Examine only the base object itself (`baseObject` in RFC 4511).
    Base = 0,
    /// Examine the immediate subordinates of the base object
    /// (`singleLevel` in RFC 4511).
    OneLevel = 1,
    /// Examine the base object and everything below it
    /// (`wholeSubtree` in RFC 4511).
    Subtree = 2,
}
/// Alias dereferencing policy for a search operation.
///
/// The discriminants are the numeric values written into the
/// `derefAliases` field of the search request (the enum is cast to an
/// integer when the request is built), so they must not be changed.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DerefAliases {
    /// Never dereference aliases (`neverDerefAliases` in RFC 4511).
    Never = 0,
    /// Dereference aliases found while searching below the base object
    /// (`derefInSearching` in RFC 4511).
    Searching = 1,
    /// Dereference aliases only when locating the base object
    /// (`derefFindingBaseObj` in RFC 4511).
    Finding = 2,
    /// Dereference aliases both when locating the base object and while
    /// searching (`derefAlways` in RFC 4511).
    Always = 3,
}
/// One message delivered to a [`SearchStream`] over its internal channel.
pub enum SearchItem {
    /// A result entry; the stream yields the contained tag to its consumer.
    Entry(StructureTag),
    /// A referral message; the stream collects its URIs instead of
    /// yielding it, and attaches them to the final result.
    Referral(StructureTag),
    /// End of the search: the request id, the overall result and any
    /// controls that accompanied it.
    Done(RequestId, LdapResult, Vec<Control>),
}
/// Stream of raw search result entries for a single search request.
///
/// Entries are yielded as `StructureTag`s; the overall result of the
/// search is delivered separately through a oneshot channel once the
/// stream finishes.
pub struct SearchStream {
    /// Request id of the search this stream belongs to.
    id: RequestId,
    /// Protocol state shared with the connection; consulted for
    /// abandonment and cleaned up when the search ends.
    bundle: Rc<RefCell<ProtoBundle>>,
    /// Sender half of the item channel. It is never read in this module;
    /// presumably it is held so the channel stays open for the lifetime
    /// of the stream (TODO confirm).
    _tx_i: mpsc::UnboundedSender<SearchItem>,
    /// Receiver half of the item channel, polled for entries, referrals
    /// and the final result.
    rx_i: mpsc::UnboundedReceiver<SearchItem>,
    /// Sender for the final result; `None` once the result has been sent.
    tx_r: Option<oneshot::Sender<(LdapResult, Vec<Control>)>>,
    /// Referral URI sets collected so far, one set per referral message.
    refs: Vec<HashSet<String>>,
}
impl SearchStream {
    /// Returns the request id of the search this stream belongs to.
    pub fn id(&self) -> RequestId {
        self.id
    }

    /// Removes this search's bookkeeping from the shared protocol bundle.
    ///
    /// The search helper registered under this stream's request id is
    /// removed, and the message id it recorded is removed from the id map.
    /// If no helper is registered, nothing is changed.
    fn update_maps(&mut self) {
        let mut bundle = self.bundle.borrow_mut();
        let removed = bundle.search_helpers.remove(&self.id);
        if let Some(helper) = removed {
            bundle.id_map.remove(&helper.msgid);
        }
    }
}
impl Stream for SearchStream {
type Item = StructureTag;
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if self.bundle.borrow().abandoned.contains(&self.id) {
if let Some(tx_r) = self.tx_r.take() {
let cancel_res = LdapResult {
rc: 80,
matched: "".to_owned(),
text: "search abandoned".to_owned(),
refs: vec![]
};
self.update_maps();
tx_r.send((cancel_res, vec![])).map_err(|_e| io::Error::new(io::ErrorKind::Other, "send result"))?;
}
return Ok(Async::Ready(None));
}
loop {
let item = try_ready!(self.rx_i.poll().map_err(|_e| io::Error::new(io::ErrorKind::Other, "poll search stream")));
match item {
Some(SearchItem::Done(_id, mut result, controls)) => {
result.refs.extend(self.refs.drain(..));
self.update_maps();
let tx_r = self.tx_r.take().expect("oneshot tx");
tx_r.send((result, controls)).map_err(|_e| io::Error::new(io::ErrorKind::Other, "send result"))?;
return Ok(Async::Ready(None));
},
Some(SearchItem::Entry(tag)) => return Ok(Async::Ready(Some(tag))),
Some(SearchItem::Referral(tag)) => {
self.refs.push(tag.expect_constructed().expect("referrals").into_iter()
.map(|t| t.expect_primitive().expect("octet string"))
.map(String::from_utf8)
.map(|s| s.expect("uri"))
.collect());
continue;
},
None => return Ok(Async::Ready(None)),
}
}
}
}
/// A search result entry decoded into owned Rust collections.
#[derive(Debug)]
pub struct SearchEntry {
    /// Distinguished name of the entry.
    pub dn: String,
    /// Attributes whose values were all valid UTF-8, keyed by attribute type.
    pub attrs: HashMap<String, Vec<String>>,
    /// Attributes that had at least one value which was not valid UTF-8,
    /// keyed by attribute type; every value of such an attribute is kept
    /// here as raw bytes.
    pub bin_attrs: HashMap<String, Vec<Vec<u8>>>,
}
impl SearchEntry {
    /// Decodes a raw search result entry into a `SearchEntry`.
    ///
    /// The tag must have id 4 and contain the DN followed by a list of
    /// partial attributes, each consisting of an attribute type and a list
    /// of values.
    ///
    /// For each attribute, if every value is valid UTF-8 the values are
    /// stored as strings in `attrs`. If any value is not valid UTF-8, all
    /// values of that attribute go to `bin_attrs` as raw bytes: the
    /// non-UTF-8 values first, followed by the UTF-8 ones.
    ///
    /// # Panics
    ///
    /// Panics if the tag does not have the expected structure, or if the
    /// DN or an attribute type is not valid UTF-8.
    pub fn construct(tag: StructureTag) -> SearchEntry {
        let mut elements = tag.match_id(4)
            .and_then(|t| t.expect_constructed())
            .expect("entry")
            .into_iter();

        let dn_bytes = elements.next().expect("element").expect_primitive().expect("octet string");
        let dn = String::from_utf8(dn_bytes).expect("dn");

        let mut text_attrs: HashMap<String, Vec<String>> = HashMap::new();
        let mut binary_attrs: HashMap<String, Vec<Vec<u8>>> = HashMap::new();

        let attr_list = elements.next().expect("element").expect_constructed().expect("attrs");
        for partial in attr_list {
            let mut parts = partial.expect_constructed().expect("partial attribute").into_iter();
            let name_bytes = parts.next().expect("element").expect_primitive().expect("octet string");
            let name = String::from_utf8(name_bytes).expect("attribute type");
            let raw_values = parts.next().expect("element").expect_constructed().expect("values");

            // Split the values into UTF-8 strings and everything else.
            let mut text = Vec::new();
            let mut binary = Vec::new();
            for value in raw_values {
                let bytes = value.expect_primitive().expect("octet string");
                match String::from_utf8(bytes) {
                    Ok(s) => text.push(s),
                    // The error hands back the original, unmodified bytes.
                    Err(e) => binary.push(e.into_bytes()),
                }
            }

            if binary.is_empty() {
                text_attrs.insert(name, text);
            } else {
                // One non-UTF-8 value makes the whole attribute binary:
                // raw values first, then the textual ones as bytes.
                let slot = binary_attrs.entry(name).or_insert_with(Vec::new);
                slot.extend(binary);
                slot.extend(text.into_iter().map(String::into_bytes));
            }
        }

        SearchEntry {
            dn: dn,
            attrs: text_attrs,
            bin_attrs: binary_attrs,
        }
    }
}
/// Additional parameters for a search request.
///
/// Each field is copied into the corresponding field of the search
/// request when the request is built.
pub struct SearchOptions {
    /// Alias dereferencing policy.
    deref: DerefAliases,
    /// Value of the request's `typesOnly` flag.
    typesonly: bool,
    /// Time limit sent with the request; in RFC 4511 a value of 0 means
    /// that no client-requested limit is in effect.
    timelimit: i32,
    /// Size limit sent with the request; in RFC 4511 a value of 0 means
    /// that no client-requested limit is in effect.
    sizelimit: i32,
}
impl SearchOptions {
    /// Creates options with aliases never dereferenced, `typesonly` off
    /// and both limits set to 0.
    #[cfg_attr(feature="cargo-clippy", allow(new_without_default))]
    pub fn new() -> Self {
        SearchOptions {
            sizelimit: 0,
            timelimit: 0,
            typesonly: false,
            deref: DerefAliases::Never,
        }
    }

    /// Returns the options with the alias dereferencing policy replaced.
    pub fn deref(self, d: DerefAliases) -> Self {
        SearchOptions { deref: d, ..self }
    }

    /// Returns the options with the `typesonly` flag replaced.
    pub fn typesonly(self, typesonly: bool) -> Self {
        SearchOptions { typesonly: typesonly, ..self }
    }

    /// Returns the options with the time limit replaced.
    pub fn timelimit(self, timelimit: i32) -> Self {
        SearchOptions { timelimit: timelimit, ..self }
    }

    /// Returns the options with the size limit replaced.
    pub fn sizelimit(self, sizelimit: i32) -> Self {
        SearchOptions { sizelimit: sizelimit, ..self }
    }
}
impl Ldap {
#[cfg_attr(feature="cargo-clippy", allow(type_complexity))]
pub fn search<S: AsRef<str>>(&self, base: &str, scope: Scope, filter: &str, attrs: Vec<S>) ->
Box<Future<Item=(SearchStream, oneshot::Receiver<(LdapResult, Vec<Control>)>), Error=io::Error>> {
let opts = match next_search_options(self) {
Some(opts) => opts,
None => SearchOptions::new(),
};
let req = Tag::Sequence(Sequence {
id: 3,
class: Application,
inner: vec![
Tag::OctetString(OctetString {
inner: Vec::from(base.as_bytes()),
.. Default::default()
}),
Tag::Enumerated(Enumerated {
inner: scope as i64,
.. Default::default()
}),
Tag::Enumerated(Enumerated {
inner: opts.deref as i64,
.. Default::default()
}),
Tag::Integer(Integer {
inner: opts.sizelimit as i64,
.. Default::default()
}),
Tag::Integer(Integer {
inner: opts.timelimit as i64,
.. Default::default()
}),
Tag::Boolean(Boolean {
inner: opts.typesonly,
.. Default::default()
}),
parse(filter).unwrap(),
Tag::Sequence(Sequence {
inner: attrs.into_iter().map(|s|
Tag::OctetString(OctetString { inner: Vec::from(s.as_ref()), ..Default::default() })).collect(),
.. Default::default()
})
],
});
let (tx_i, rx_i) = mpsc::unbounded::<SearchItem>();
let (tx_r, rx_r) = oneshot::channel::<(LdapResult, Vec<Control>)>();
let bundle = bundle(self);
let fut = self.call(LdapOp::Multi(req, tx_i.clone(), next_req_controls(self))).and_then(move |res| {
let id = match res {
(Tag::Integer(Integer { inner, .. }), _) => inner,
_ => unimplemented!(),
};
Ok((SearchStream {
id: id as RequestId,
bundle: bundle,
_tx_i: tx_i,
rx_i: rx_i,
tx_r: Some(tx_r),
refs: Vec::new(),
}, rx_r))
});
Box::new(fut)
}
}