1use std::cmp::Ordering;
2use std::collections::{BinaryHeap, HashMap};
3use std::fmt;
4use std::time::Duration;
5
6use message::IhaveMessage;
7use time::{Clock, NodeTime};
8use System;
9
10pub struct MissingMessages<T: System> {
11 timeout_queue: BinaryHeap<QueueItem<T>>,
12 ihaves: HashMap<T::MessageId, IhaveEntry<T::NodeId>>,
13 entry_seqno: u64,
14}
15impl<T: System> fmt::Debug for MissingMessages<T>
16where
17 T::NodeId: fmt::Debug,
18 T::MessageId: fmt::Debug,
19{
20 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
21 write!(
22 f,
23 "MissingMessages {{ timeout_queue: {:?}, ihaves: {:?}, entry_seqno: {:?} }}",
24 self.timeout_queue, self.ihaves, self.entry_seqno
25 )
26 }
27}
28impl<T: System> MissingMessages<T> {
29 pub fn new() -> Self {
30 MissingMessages {
31 timeout_queue: BinaryHeap::new(),
32 ihaves: HashMap::new(),
33 entry_seqno: 0,
34 }
35 }
36
37 pub fn push(&mut self, ihave: IhaveMessage<T>, clock: &Clock, timeout: Duration) {
38 let seqno = self.entry_seqno;
39 let entry = self.ihaves
40 .entry(ihave.message_id.clone())
41 .or_insert_with(|| {
42 let mut expiry_time = clock.now();
43 if !ihave.realtime {
44 expiry_time += timeout;
45 }
46 IhaveEntry {
47 seqno,
48 head_round: ihave.round,
49 head_owner: ihave.sender.clone(),
50 owners: 0,
51 next_expiry_time: expiry_time,
52 }
53 });
54
55 let expiry_time = entry.next_expiry_time;
56 entry.next_expiry_time += timeout;
57 entry.owners += 1;
58 if entry.owners == 1 {
59 self.entry_seqno += 1;
60 }
61
62 self.timeout_queue.push(QueueItem::Message {
63 expiry_time,
64 ihave,
65 entry_seqno: entry.seqno,
66 });
67 }
68
69 pub fn pop_expired(&mut self, clock: &Clock) -> Option<IhaveMessage<T>> {
70 let is_expired = |x: &QueueItem<_>| x.expiry_time() <= clock.now();
71 while self.timeout_queue.peek().map_or(false, is_expired) {
72 let item = self.timeout_queue.pop().expect("never fails");
73 match self.ihaves.get(item.message_id()) {
74 None => {
75 continue;
77 }
78 Some(entry) if entry.seqno != item.entry_seqno() => {
79 continue;
81 }
82 _ => {}
83 }
84
85 match item {
86 QueueItem::Message { ihave, .. } => {
87 let entry = self.ihaves.get_mut(&ihave.message_id).expect("never fails");
88 assert_ne!(entry.owners, 0);
89
90 entry.owners -= 1;
91 entry.head_round = ihave.round;
92 entry.head_owner = ihave.sender.clone();
93 if entry.owners == 0 {
94 self.timeout_queue.push(QueueItem::Entry {
95 expiry_time: entry.next_expiry_time,
96 entry_seqno: entry.seqno,
97 message_id: ihave.message_id.clone(),
98 });
99 }
100 return Some(ihave);
101 }
102 QueueItem::Entry { message_id, .. } => {
103 let expired = self.ihaves.get(&message_id).map(|e| e.owners) == Some(0);
104 if expired {
105 self.ihaves.remove(&message_id);
106 }
107 }
108 }
109 }
110 None
111 }
112
113 pub fn remove(&mut self, message_id: &T::MessageId) {
114 self.ihaves.remove(message_id);
115 }
116
117 pub fn waiting_messages(&self) -> usize {
118 self.ihaves.len()
119 }
120
121 pub fn next_expiry_time(&self) -> Option<NodeTime> {
122 self.timeout_queue.peek().map(|x| x.expiry_time())
123 }
124
125 pub fn get_ihave(&self, message_id: &T::MessageId) -> Option<(u16, &T::NodeId)> {
126 self.ihaves
127 .get(message_id)
128 .map(|e| (e.head_round, &e.head_owner))
129 }
130}
131
132#[derive(Debug)]
133struct IhaveEntry<N> {
134 seqno: u64,
135 head_round: u16,
136 head_owner: N,
137 owners: usize,
138 next_expiry_time: NodeTime,
139}
140
141enum QueueItem<T: System> {
142 Message {
143 expiry_time: NodeTime,
144 entry_seqno: u64,
145 ihave: IhaveMessage<T>,
146 },
147 Entry {
148 expiry_time: NodeTime,
149 entry_seqno: u64,
150 message_id: T::MessageId,
151 },
152}
153impl<T: System> QueueItem<T> {
154 fn expiry_time(&self) -> NodeTime {
155 match self {
156 QueueItem::Message { expiry_time, .. } | QueueItem::Entry { expiry_time, .. } => {
157 *expiry_time
158 }
159 }
160 }
161
162 fn entry_seqno(&self) -> u64 {
163 match self {
164 QueueItem::Message { entry_seqno, .. } | QueueItem::Entry { entry_seqno, .. } => {
165 *entry_seqno
166 }
167 }
168 }
169
170 fn message_id(&self) -> &T::MessageId {
171 match self {
172 QueueItem::Message { ihave, .. } => &ihave.message_id,
173 QueueItem::Entry { message_id, .. } => message_id,
174 }
175 }
176}
177impl<T: System> PartialEq for QueueItem<T> {
178 fn eq(&self, other: &Self) -> bool {
179 self.expiry_time() == other.expiry_time()
180 }
181}
182impl<T: System> Eq for QueueItem<T> {}
183impl<T: System> PartialOrd for QueueItem<T> {
184 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
185 other.expiry_time().partial_cmp(&self.expiry_time())
186 }
187}
188impl<T: System> Ord for QueueItem<T> {
189 fn cmp(&self, other: &Self) -> Ordering {
190 other.expiry_time().cmp(&self.expiry_time())
191 }
192}
193impl<T: System> fmt::Debug for QueueItem<T>
194where
195 T::NodeId: fmt::Debug,
196 T::MessageId: fmt::Debug,
197{
198 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
199 write!(
200 f,
201 "QueueItem {{ expiry_time: {:?}, message_id: {:?}, .. }}",
202 self.expiry_time(),
203 self.message_id()
204 )
205 }
206}