1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
//! Channel for users to send messages when calling [`Shard::send`] isn't
//! possible.
//!
//! [`Shard::send`]: crate::Shard::send

use crate::{
    command::{self, Command},
    error::{SendError, SendErrorType},
    CloseFrame,
};
use tokio::sync::mpsc::{self, error::TrySendError};

/// Channel between a user and shard for sending outgoing gateway messages.
#[derive(Debug)]
pub struct MessageChannel {
    /// Receiving half for shards to receive users' close frames.
    pub close_rx: mpsc::Receiver<CloseFrame<'static>>,
    /// Sending half for users to send close frames via shards.
    close_tx: mpsc::Sender<CloseFrame<'static>>,
    /// Receiving half for shards to receive users' commands.
    pub command_rx: mpsc::UnboundedReceiver<String>,
    /// Sending half for users to send commands via shards.
    command_tx: mpsc::UnboundedSender<String>,
}

impl MessageChannel {
    /// Initialize a new message channel.
    pub fn new() -> Self {
        let (command_tx, command_rx) = mpsc::unbounded_channel();
        let (close_tx, close_rx) = mpsc::channel(1);

        Self {
            close_rx,
            close_tx,
            command_rx,
            command_tx,
        }
    }

    /// Clone of the senders.
    pub fn sender(&self) -> MessageSender {
        MessageSender {
            close: self.close_tx.clone(),
            command: self.command_tx.clone(),
        }
    }
}

/// Channel to send messages over a [`Shard`] to the Discord gateway.
///
/// Unlike the methods on [`Shard`], messages queued up through this are
/// conditionally sent when not ratelimited and identified (except for close
/// frames which are sent as long as the shard is connected to the Websocket).
///
/// [`Shard`]: crate::Shard
#[derive(Clone, Debug)]
pub struct MessageSender {
    /// Sending half of the close channel.
    close: mpsc::Sender<CloseFrame<'static>>,
    /// Sending half of the command channel.
    command: mpsc::UnboundedSender<String>,
}

impl MessageSender {
    /// Whether the channel is closed.
    ///
    /// The channel will only be closed if the associated shard has been
    /// dropped.
    pub fn is_closed(&self) -> bool {
        self.command.is_closed()
    }

    /// Send a command to the associated shard.
    ///
    /// # Errors
    ///
    /// Returns a [`SendErrorType::Sending`] error type if the channel is
    /// closed.
    ///
    /// Returns a [`SendErrorType::Serializing`] error type if the provided
    /// command failed to serialize.
    pub fn command(&self, command: &impl Command) -> Result<(), SendError> {
        let json = command::prepare(command)?;

        self.send(json)
    }

    /// Send a JSON encoded gateway event to the associated shard.
    ///
    /// # Errors
    ///
    /// Returns a [`SendErrorType::Sending`] error type if the channel is
    /// closed.
    pub fn send(&self, json: String) -> Result<(), SendError> {
        self.command.send(json).map_err(|_| SendError {
            kind: SendErrorType::Sending,
            source: None,
        })
    }

    /// Send a Websocket close frame to the associated shard.
    ///
    /// Subsequent calls may be queued up to be sent once the shard's
    /// reestablished a Websocket connection or ignored if the queue is full.
    /// The internal queue capacity is currently `1`.
    ///
    /// See the [`Shard::close`] docs for further information.
    ///
    /// # Errors
    ///
    /// Returns a [`SendErrorType::Sending`] error type if the channel is
    /// closed.
    ///
    /// [`Shard::close`]: crate::Shard::close
    pub fn close(&self, close_frame: CloseFrame<'static>) -> Result<(), SendError> {
        match self.close.try_send(close_frame) {
            Ok(()) | Err(TrySendError::Full(_)) => Ok(()),
            _ => Err(SendError {
                kind: SendErrorType::Sending,
                source: None,
            }),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::{MessageChannel, MessageSender};
    use crate::json;
    use static_assertions::assert_impl_all;
    use std::{error::Error, fmt::Debug};
    use twilight_model::{
        gateway::{
            payload::outgoing::{Heartbeat, RequestGuildMembers},
            CloseFrame,
        },
        id::Id,
    };

    assert_impl_all!(MessageChannel: Debug, Send, Sync);
    assert_impl_all!(MessageSender: Clone, Debug, Send, Sync);

    #[test]
    fn channel_sending() -> Result<(), Box<dyn Error>> {
        let mut channel = MessageChannel::new();
        let sender = channel.sender();
        assert!(channel.command_rx.try_recv().is_err());
        assert!(channel.close_rx.try_recv().is_err());

        let frame = CloseFrame::NORMAL;
        let request = RequestGuildMembers::builder(Id::new(1)).query("", None);
        let heartbeat = Heartbeat::new(Some(30_000));
        let heartbeat_string = json::to_string(&heartbeat)?;
        assert!(sender.command(&request).is_ok());
        assert!(sender.send(heartbeat_string.clone()).is_ok());
        assert!(sender.close(frame.clone()).is_ok());
        assert!(sender.close(frame.clone()).is_ok());

        assert_eq!(request, json::from_str(&channel.command_rx.try_recv()?)?);
        assert_eq!(heartbeat_string, channel.command_rx.try_recv()?);
        assert_eq!(frame, channel.close_rx.try_recv()?);
        assert!(channel.close_rx.try_recv().is_err());

        assert!(!sender.is_closed());
        drop(channel);
        assert!(sender.is_closed());

        assert!(sender.command(&request).is_err());
        assert!(sender.send(heartbeat_string).is_err());
        assert!(sender.close(frame).is_err());

        Ok(())
    }
}