use std::collections::HashMap;
use slim_datapath::{api::ProtoMessage as Message, api::ProtoName};
pub struct ProducerBuffer {
capacity: usize,
next: usize,
buffer: Vec<Option<Message>>,
map: HashMap<usize, usize>,
destination_name: ProtoName,
destination_id: Option<u64>,
}
impl ProducerBuffer {
pub fn with_capacity(capacity: usize) -> Self {
ProducerBuffer {
capacity,
next: 0,
buffer: vec![None; capacity],
map: HashMap::new(),
destination_name: ProtoName::from_strings(["unknown", "unknown", "unknown"]),
destination_id: None,
}
}
pub fn get_capacity(&self) -> usize {
self.capacity
}
pub fn get_destination_name(&self) -> &ProtoName {
&self.destination_name
}
pub fn get_destination_id(&self) -> Option<u64> {
self.destination_id
}
pub fn push(&mut self, msg: Message) -> bool {
if self.map.is_empty() {
self.destination_name = msg.get_dst();
}
let id = msg.get_id() as usize;
if self.map.contains_key(&id) {
return true;
}
if let Some(message) = &self.buffer[self.next] {
let to_remove = message.get_id() as usize;
self.map.remove(&to_remove);
}
self.buffer[self.next] = Some(msg);
self.map.insert(id, self.next);
self.next = (self.next + 1) % self.capacity;
true
}
pub fn clear(&mut self) {
self.buffer = vec![None; self.capacity];
self.next = 0;
self.map.clear();
}
pub fn get(&self, id: usize) -> Option<Message> {
match self.map.get(&id) {
None => None,
Some(index) => self.buffer[*index].clone(),
}
}
pub fn iter(&self) -> impl Iterator<Item = &Message> {
self.buffer.iter().filter_map(|msg| {
msg.as_ref()
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use slim_datapath::api::ProtoName as Name;
use slim_datapath::api::{
ProtoSessionMessageType, ProtoSessionType, SessionHeader, SlimHeader,
};
#[test]
fn test_producer_buffer() {
let mut buffer = ProducerBuffer::with_capacity(3);
assert_eq!(buffer.get_capacity(), 3);
let src = Name::from_strings(["org", "ns", "type"]).with_id(0);
let src_id = src.to_string();
let name_type = Name::from_strings(["org", "ns", "type"]).with_id(1);
let slim_header = SlimHeader::new(src, name_type, &src_id, None);
let h0 = SessionHeader::new(
ProtoSessionType::PointToPoint.into(),
ProtoSessionMessageType::Msg.into(),
0,
0,
);
let h1 = SessionHeader::new(
ProtoSessionType::PointToPoint.into(),
ProtoSessionMessageType::Msg.into(),
0,
1,
);
let h2 = SessionHeader::new(
ProtoSessionType::PointToPoint.into(),
ProtoSessionMessageType::Msg.into(),
0,
2,
);
let h3 = SessionHeader::new(
ProtoSessionType::PointToPoint.into(),
ProtoSessionMessageType::Msg.into(),
0,
3,
);
let h4 = SessionHeader::new(
ProtoSessionType::PointToPoint.into(),
ProtoSessionMessageType::Msg.into(),
0,
4,
);
let p0 = Message::builder()
.with_slim_header(slim_header.clone())
.with_session_header(h0)
.application_payload("", vec![])
.build_publish()
.unwrap();
let p1 = Message::builder()
.with_slim_header(slim_header.clone())
.with_session_header(h1)
.application_payload("", vec![])
.build_publish()
.unwrap();
let p2 = Message::builder()
.with_slim_header(slim_header.clone())
.with_session_header(h2)
.application_payload("", vec![])
.build_publish()
.unwrap();
let p3 = Message::builder()
.with_slim_header(slim_header.clone())
.with_session_header(h3)
.application_payload("", vec![])
.build_publish()
.unwrap();
let p4 = Message::builder()
.with_slim_header(slim_header.clone())
.with_session_header(h4)
.application_payload("", vec![])
.build_publish()
.unwrap();
assert!(buffer.push(p0.clone()));
assert_eq!(buffer.get(0).unwrap(), p0);
assert_eq!(buffer.get(0).unwrap(), p0);
assert_eq!(buffer.get(0).unwrap(), p0);
assert_eq!(buffer.get(1), None);
assert!(buffer.push(p0.clone()));
assert!(buffer.push(p1.clone()));
assert!(buffer.push(p2.clone()));
assert_eq!(buffer.get(0).unwrap(), p0);
assert_eq!(buffer.get(1).unwrap(), p1);
assert_eq!(buffer.get(2).unwrap(), p2);
assert_eq!(buffer.get(3), None);
assert!(buffer.push(p3.clone()));
assert_eq!(buffer.get(0), None);
assert_eq!(buffer.get(1).unwrap(), p1);
assert_eq!(buffer.get(2).unwrap(), p2);
assert_eq!(buffer.get(3).unwrap(), p3);
assert_eq!(buffer.get(4), None);
assert!(buffer.push(p4.clone()));
assert_eq!(buffer.get(0), None);
assert_eq!(buffer.get(1), None);
assert_eq!(buffer.get(2).unwrap(), p2);
assert_eq!(buffer.get(3).unwrap(), p3);
assert_eq!(buffer.get(4).unwrap(), p4);
buffer.clear();
assert_eq!(buffer.get(0), None);
assert_eq!(buffer.get(1), None);
assert_eq!(buffer.get(2), None);
assert_eq!(buffer.get(3), None);
assert_eq!(buffer.get(4), None);
assert!(buffer.push(p0.clone()));
assert!(buffer.push(p1.clone()));
assert!(buffer.push(p2.clone()));
assert!(buffer.push(p3.clone()));
assert!(buffer.push(p4.clone()));
assert_eq!(buffer.get(0), None);
assert_eq!(buffer.get(1), None);
assert_eq!(buffer.get(2).unwrap(), p2);
assert_eq!(buffer.get(3).unwrap(), p3);
assert_eq!(buffer.get(4).unwrap(), p4);
}
#[test]
fn test_iter_producer_buffer() {
let src = Name::from_strings(["org", "ns", "type"]).with_id(0);
let src_id = src.to_string();
let name_type = Name::from_strings(["org", "ns", "type"]).with_id(1);
let slim_header = SlimHeader::new(src, name_type, &src_id, None);
let h = SessionHeader::new(
ProtoSessionType::PointToPoint.into(),
ProtoSessionMessageType::Msg.into(),
0,
0,
);
let mut p = Message::builder()
.with_slim_header(slim_header)
.with_session_header(h)
.application_payload("", vec![])
.build_publish()
.unwrap();
let mut b = ProducerBuffer::with_capacity(30);
b.push(p.clone()); p.set_message_id(1);
b.push(p.clone()); p.set_message_id(2);
b.push(p.clone()); p.set_message_id(5);
b.push(p.clone()); p.set_message_id(6);
b.push(p.clone()); p.set_message_id(10);
b.push(p.clone());
let expected = [0, 1, 2, 5, 6, 10];
for (i, m) in b.iter().enumerate() {
assert_eq!(m.get_id(), expected[i]);
}
}
}