plumtree/
missing.rs

1use std::cmp::Ordering;
2use std::collections::{BinaryHeap, HashMap};
3use std::fmt;
4use std::time::Duration;
5
6use message::IhaveMessage;
7use time::{Clock, NodeTime};
8use System;
9
10pub struct MissingMessages<T: System> {
11    timeout_queue: BinaryHeap<QueueItem<T>>,
12    ihaves: HashMap<T::MessageId, IhaveEntry<T::NodeId>>,
13    entry_seqno: u64,
14}
15impl<T: System> fmt::Debug for MissingMessages<T>
16where
17    T::NodeId: fmt::Debug,
18    T::MessageId: fmt::Debug,
19{
20    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
21        write!(
22            f,
23            "MissingMessages {{ timeout_queue: {:?}, ihaves: {:?}, entry_seqno: {:?} }}",
24            self.timeout_queue, self.ihaves, self.entry_seqno
25        )
26    }
27}
28impl<T: System> MissingMessages<T> {
29    pub fn new() -> Self {
30        MissingMessages {
31            timeout_queue: BinaryHeap::new(),
32            ihaves: HashMap::new(),
33            entry_seqno: 0,
34        }
35    }
36
37    pub fn push(&mut self, ihave: IhaveMessage<T>, clock: &Clock, timeout: Duration) {
38        let seqno = self.entry_seqno;
39        let entry = self.ihaves
40            .entry(ihave.message_id.clone())
41            .or_insert_with(|| {
42                let mut expiry_time = clock.now();
43                if !ihave.realtime {
44                    expiry_time += timeout;
45                }
46                IhaveEntry {
47                    seqno,
48                    head_round: ihave.round,
49                    head_owner: ihave.sender.clone(),
50                    owners: 0,
51                    next_expiry_time: expiry_time,
52                }
53            });
54
55        let expiry_time = entry.next_expiry_time;
56        entry.next_expiry_time += timeout;
57        entry.owners += 1;
58        if entry.owners == 1 {
59            self.entry_seqno += 1;
60        }
61
62        self.timeout_queue.push(QueueItem::Message {
63            expiry_time,
64            ihave,
65            entry_seqno: entry.seqno,
66        });
67    }
68
69    pub fn pop_expired(&mut self, clock: &Clock) -> Option<IhaveMessage<T>> {
70        let is_expired = |x: &QueueItem<_>| x.expiry_time() <= clock.now();
71        while self.timeout_queue.peek().map_or(false, is_expired) {
72            let item = self.timeout_queue.pop().expect("never fails");
73            match self.ihaves.get(item.message_id()) {
74                None => {
75                    // (a) The entry has been removed due to reception of the associated GOSSIP message
76                    continue;
77                }
78                Some(entry) if entry.seqno != item.entry_seqno() => {
79                    // (b) Like `(a)`, but the message has been forgot before receiving new IHAVE messages
80                    continue;
81                }
82                _ => {}
83            }
84
85            match item {
86                QueueItem::Message { ihave, .. } => {
87                    let entry = self.ihaves.get_mut(&ihave.message_id).expect("never fails");
88                    assert_ne!(entry.owners, 0);
89
90                    entry.owners -= 1;
91                    entry.head_round = ihave.round;
92                    entry.head_owner = ihave.sender.clone();
93                    if entry.owners == 0 {
94                        self.timeout_queue.push(QueueItem::Entry {
95                            expiry_time: entry.next_expiry_time,
96                            entry_seqno: entry.seqno,
97                            message_id: ihave.message_id.clone(),
98                        });
99                    }
100                    return Some(ihave);
101                }
102                QueueItem::Entry { message_id, .. } => {
103                    let expired = self.ihaves.get(&message_id).map(|e| e.owners) == Some(0);
104                    if expired {
105                        self.ihaves.remove(&message_id);
106                    }
107                }
108            }
109        }
110        None
111    }
112
113    pub fn remove(&mut self, message_id: &T::MessageId) {
114        self.ihaves.remove(message_id);
115    }
116
117    pub fn waiting_messages(&self) -> usize {
118        self.ihaves.len()
119    }
120
121    pub fn next_expiry_time(&self) -> Option<NodeTime> {
122        self.timeout_queue.peek().map(|x| x.expiry_time())
123    }
124
125    pub fn get_ihave(&self, message_id: &T::MessageId) -> Option<(u16, &T::NodeId)> {
126        self.ihaves
127            .get(message_id)
128            .map(|e| (e.head_round, &e.head_owner))
129    }
130}
131
132#[derive(Debug)]
133struct IhaveEntry<N> {
134    seqno: u64,
135    head_round: u16,
136    head_owner: N,
137    owners: usize,
138    next_expiry_time: NodeTime,
139}
140
141enum QueueItem<T: System> {
142    Message {
143        expiry_time: NodeTime,
144        entry_seqno: u64,
145        ihave: IhaveMessage<T>,
146    },
147    Entry {
148        expiry_time: NodeTime,
149        entry_seqno: u64,
150        message_id: T::MessageId,
151    },
152}
153impl<T: System> QueueItem<T> {
154    fn expiry_time(&self) -> NodeTime {
155        match self {
156            QueueItem::Message { expiry_time, .. } | QueueItem::Entry { expiry_time, .. } => {
157                *expiry_time
158            }
159        }
160    }
161
162    fn entry_seqno(&self) -> u64 {
163        match self {
164            QueueItem::Message { entry_seqno, .. } | QueueItem::Entry { entry_seqno, .. } => {
165                *entry_seqno
166            }
167        }
168    }
169
170    fn message_id(&self) -> &T::MessageId {
171        match self {
172            QueueItem::Message { ihave, .. } => &ihave.message_id,
173            QueueItem::Entry { message_id, .. } => message_id,
174        }
175    }
176}
177impl<T: System> PartialEq for QueueItem<T> {
178    fn eq(&self, other: &Self) -> bool {
179        self.expiry_time() == other.expiry_time()
180    }
181}
182impl<T: System> Eq for QueueItem<T> {}
183impl<T: System> PartialOrd for QueueItem<T> {
184    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
185        other.expiry_time().partial_cmp(&self.expiry_time())
186    }
187}
188impl<T: System> Ord for QueueItem<T> {
189    fn cmp(&self, other: &Self) -> Ordering {
190        other.expiry_time().cmp(&self.expiry_time())
191    }
192}
193impl<T: System> fmt::Debug for QueueItem<T>
194where
195    T::NodeId: fmt::Debug,
196    T::MessageId: fmt::Debug,
197{
198    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
199        write!(
200            f,
201            "QueueItem {{ expiry_time: {:?}, message_id: {:?}, .. }}",
202            self.expiry_time(),
203            self.message_id()
204        )
205    }
206}