1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
use std::{
    mem,
    sync::{
        RwLock,
        Mutex,
    },
};

use futures::sync::oneshot;

struct Listener<T> {
    sender: oneshot::Sender<(u32, Vec<T>)>,
    cursor: u32,
}

fn fire_listener_if_ready<T: Clone>(messages: &[T], listener: Listener<T>) -> Result<(), Listener<T>> {
    let current_cursor = messages.len() as u32;

    if listener.cursor < current_cursor {
        let new_messages = messages[(listener.cursor as usize)..].to_vec();
        let _ = listener.sender.send((current_cursor, new_messages));
        Ok(())
    } else {
        Err(listener)
    }
}

/// A message queue with persistent history that can be subscribed to.
///
/// Definitely non-optimal. This would ideally be a lockless mpmc queue.
#[derive(Default)]
pub struct MessageQueue<T> {
   messages: RwLock<Vec<T>>,
   message_listeners: Mutex<Vec<Listener<T>>>,
}

impl<T: Clone> MessageQueue<T> {
    pub fn new() -> MessageQueue<T> {
        MessageQueue {
            messages: RwLock::new(Vec::new()),
            message_listeners: Mutex::new(Vec::new()),
        }
    }

    pub fn push_messages(&self, new_messages: &[T]) {
        let mut message_listeners = self.message_listeners.lock().unwrap();
        let mut messages = self.messages.write().unwrap();
        messages.extend_from_slice(new_messages);

        let mut remaining_listeners = Vec::new();

        for listener in message_listeners.drain(..) {
            match fire_listener_if_ready(&messages, listener) {
                Ok(_) => {}
                Err(listener) => remaining_listeners.push(listener)
            }
        }

        // Without this annotation, Rust gets confused since the first argument
        // is a MutexGuard, but the second is a Vec.
        mem::replace::<Vec<_>>(&mut message_listeners, remaining_listeners);
    }

    pub fn subscribe(&self, cursor: u32, sender: oneshot::Sender<(u32, Vec<T>)>) {
        let listener = {
            let listener = Listener {
                sender,
                cursor,
            };

            let messages = self.messages.read().unwrap();

            match fire_listener_if_ready(&messages, listener) {
                Ok(_) => return,
                Err(listener) => listener
            }
        };

        let mut message_listeners = self.message_listeners.lock().unwrap();
        message_listeners.push(listener);
    }

    pub fn get_message_cursor(&self) -> u32 {
        self.messages.read().unwrap().len() as u32
    }

    pub fn get_messages_since(&self, cursor: u32) -> (u32, Vec<T>) {
        let messages = self.messages.read().unwrap();

        let current_cursor = messages.len() as u32;

        // Cursor is out of bounds or there are no new messages
        if cursor >= current_cursor {
            return (current_cursor, Vec::new());
        }

        (current_cursor, messages[(cursor as usize)..].to_vec())
    }
}