1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
//! Publish-subscribe mechanism for internal events.

use std::collections::VecDeque;
use std::marker::PhantomData;

/// A permanent subscription to a pub-sub queue.
#[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))]
#[derive(Clone)]
pub struct Subscription<T> {
    // Position on the cursor array.
    id: u32,
    _phantom: PhantomData<T>,
}

#[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))]
#[derive(Clone)]
struct PubSubCursor {
    // Position on the offset array.
    id: u32,
    // Index of the next message to read.
    // NOTE: Having this here is not actually necessary because
    // this value is supposed to be equal to `offsets[self.id]`.
    // However, we keep it because it lets us avoid one lookup
    // on the `offsets` array inside of message-polling loops
    // based on `read_ith`.
    next: u32,
}

impl PubSubCursor {
    fn id(&self, num_deleted: u32) -> usize {
        (self.id - num_deleted) as usize
    }

    fn next(&self, num_deleted: u32) -> usize {
        (self.next - num_deleted) as usize
    }
}

/// A pub-sub queue.
#[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))]
#[derive(Clone)]
pub struct PubSub<T> {
    deleted_messages: u32,
    deleted_offsets: u32,
    messages: VecDeque<T>,
    offsets: VecDeque<u32>,
    cursors: Vec<PubSubCursor>,
}

impl<T> PubSub<T> {
    /// Create a new empty pub-sub queue.
    pub fn new() -> Self {
        Self {
            deleted_offsets: 0,
            deleted_messages: 0,
            messages: VecDeque::new(),
            offsets: VecDeque::new(),
            cursors: Vec::new(),
        }
    }

    fn reset_shifts(&mut self) {
        for offset in &mut self.offsets {
            *offset -= self.deleted_messages;
        }

        for cursor in &mut self.cursors {
            cursor.id -= self.deleted_offsets;
            cursor.next -= self.deleted_messages;
        }

        self.deleted_offsets = 0;
        self.deleted_messages = 0;
    }

    /// Publish a new message.
    pub fn publish(&mut self, message: T) {
        if self.offsets.is_empty() {
            // No subscribers, drop the message.
            return;
        }

        self.messages.push_back(message);
    }

    /// Subscribe to the queue.
    ///
    /// A subscription cannot be cancelled.
    #[must_use]
    pub fn subscribe(&mut self) -> Subscription<T> {
        let cursor = PubSubCursor {
            next: self.messages.len() as u32 + self.deleted_messages,
            id: self.offsets.len() as u32 + self.deleted_offsets,
        };

        let subscription = Subscription {
            id: self.cursors.len() as u32,
            _phantom: PhantomData,
        };

        self.offsets.push_back(cursor.next);
        self.cursors.push(cursor);
        subscription
    }

    /// Read the i-th message not yet read by the given subsciber.
    pub fn read_ith(&self, sub: &Subscription<T>, i: usize) -> Option<&T> {
        let cursor = &self.cursors[sub.id as usize];
        self.messages
            .get(cursor.next(self.deleted_messages) as usize + i)
    }

    /// Get the messages not yet read by the given subscriber.
    pub fn read(&self, sub: &Subscription<T>) -> impl Iterator<Item = &T> {
        let cursor = &self.cursors[sub.id as usize];
        let next = cursor.next(self.deleted_messages);

        // TODO: use self.queue.range(next..) once it is stabilised.
        MessageRange {
            queue: &self.messages,
            next,
        }
    }

    /// Makes the given subscribe acknowledge all the messages in the queue.
    ///
    /// A subscriber cannot read acknowledged messages any more.
    pub fn ack(&mut self, sub: &Subscription<T>) {
        // Update the cursor.
        let cursor = &mut self.cursors[sub.id as usize];

        self.offsets[cursor.id(self.deleted_offsets)] = u32::MAX;
        cursor.id = self.offsets.len() as u32 + self.deleted_offsets;

        cursor.next = self.messages.len() as u32 + self.deleted_messages;
        self.offsets.push_back(cursor.next);

        // Now clear the messages we don't need to
        // maintain in memory anymore.
        while self.offsets.front() == Some(&u32::MAX) {
            self.offsets.pop_front();
            self.deleted_offsets += 1;
        }

        // There must be at least one offset otherwise
        // that would mean we have no subscribers.
        let next = self.offsets.front().unwrap();
        let num_to_delete = *next - self.deleted_messages;

        for _ in 0..num_to_delete {
            self.messages.pop_front();
        }

        self.deleted_messages += num_to_delete;

        if self.deleted_messages > u32::MAX / 2 || self.deleted_offsets > u32::MAX / 2 {
            // Don't let the deleted_* shifts grow indefinitely otherwise
            // they will end up overflowing, breaking everything.
            self.reset_shifts();
        }
    }
}

struct MessageRange<'a, T> {
    queue: &'a VecDeque<T>,
    next: usize,
}

impl<'a, T> Iterator for MessageRange<'a, T> {
    type Item = &'a T;

    #[inline(always)]
    fn next(&mut self) -> Option<&'a T> {
        let result = self.queue.get(self.next);
        self.next += 1;
        result
    }
}