use lru_time_cache::LruCache;
use std::collections::BTreeSet;
use messages::RoutingMessage;
use NameType;
type Set<K> = BTreeSet<K>;
pub type Bytes = Vec<u8>;
pub struct MessageAccumulator {
requests: LruCache<RoutingMessage, Set<NameType>>,
}
impl MessageAccumulator {
pub fn with_expiry_duration(duration: ::time::Duration) -> MessageAccumulator {
MessageAccumulator { requests: LruCache::with_expiry_duration(duration) }
}
pub fn add_message(&mut self,
threshold: usize,
claimant: NameType,
message: RoutingMessage)
-> Option<RoutingMessage> {
{
if threshold <= 1 {
return Some(message);
}
let claimants = self.requests.entry(message.clone())
.or_insert_with(||Set::new());
claimants.insert(claimant);
if claimants.len() < threshold {
return None;
}
debug!("Returning message, {:?}, from accumulator", message);
Some(message)
}.map(|message| {
let _ = self.requests.remove(&message);
message
})
}
}
#[cfg(test)]
mod test {
#[test]
fn add_with_fixed_threshold() {
use ::test_utils::random_trait::Random;
let threshold = 3usize;
let id = ::id::Id::new();
let routing_message = ::test_utils::messages_util::arbitrary_routing_message(
&id.signing_public_key(), &id.signing_private_key());
let mut accumulator = ::message_accumulator::MessageAccumulator::with_expiry_duration(
::time::Duration::minutes(10));
for _ in 0..threshold - 1 {
assert!(accumulator.add_message(threshold.clone(), ::NameType::generate_random(),
routing_message.clone()).is_none());
}
assert_eq!(accumulator.add_message(threshold.clone(), ::NameType::generate_random(),
routing_message.clone()), Some(routing_message.clone()));
for _ in 0..threshold - 1 {
assert!(accumulator.add_message(threshold.clone(), ::NameType::generate_random(),
routing_message.clone()).is_none());
}
assert_eq!(accumulator.add_message(threshold.clone(), ::NameType::generate_random(),
routing_message.clone()), Some(routing_message));
}
#[test]
fn add_repeat_claimants() {
use ::test_utils::random_trait::Random;
let threshold = 3usize;
let id = ::id::Id::new();
let routing_message = ::test_utils::messages_util::arbitrary_routing_message(
&id.signing_public_key(), &id.signing_private_key());
let mut accumulator = ::message_accumulator::MessageAccumulator::with_expiry_duration(
::time::Duration::minutes(10));
for _ in 0..threshold - 1 {
let claimant = ::NameType::generate_random();
assert!(accumulator.add_message(threshold.clone(), claimant.clone(),
routing_message.clone()).is_none());
assert!(accumulator.add_message(threshold.clone(), claimant.clone(),
routing_message.clone()).is_none());
}
let claimant = ::NameType::generate_random();
assert_eq!(accumulator.add_message(threshold.clone(), claimant.clone(),
routing_message.clone()), Some(routing_message.clone()));
assert!(accumulator.add_message(threshold.clone(), claimant.clone(),
routing_message.clone()).is_none());
}
#[test]
fn add_multiple_messages() {
use ::test_utils::random_trait::Random;
let threshold = 3usize;
let id = ::id::Id::new();
let routing_message1 = ::test_utils::messages_util::arbitrary_routing_message(
&id.signing_public_key(), &id.signing_private_key());
let routing_message2 = ::test_utils::messages_util::arbitrary_routing_message(
&id.signing_public_key(), &id.signing_private_key());
let mut accumulator = ::message_accumulator::MessageAccumulator::with_expiry_duration(
::time::Duration::minutes(10));
for _ in 0..threshold - 1 {
let claimant = ::NameType::generate_random();
assert!(accumulator.add_message(threshold.clone(), claimant.clone(),
routing_message1.clone()).is_none());
assert!(accumulator.add_message(threshold.clone(), claimant.clone(),
routing_message2.clone()).is_none());
}
let claimant = ::NameType::generate_random();
assert_eq!(accumulator.add_message(threshold.clone(), claimant.clone(),
routing_message1.clone()), Some(routing_message1.clone()));
assert!(accumulator.add_message(threshold.clone() + 1, claimant.clone(),
routing_message2.clone()).is_none());
assert_eq!(accumulator.add_message(threshold.clone(), ::NameType::generate_random(),
routing_message2.clone()), Some(routing_message2.clone()));
}
}