use super::super::json::{self, GatewayEventParsingError};
use crate::{listener::Listeners, EventTypeFlags};
use std::{
convert::TryFrom,
error::Error,
fmt::{Display, Formatter, Result as FmtResult},
};
use twilight_model::gateway::event::{shard::Payload, Event};
#[derive(Debug)]
pub enum EmitJsonError {
EventTypeUnknown {
event_type: Option<String>,
op: u8,
},
Parsing {
source: GatewayEventParsingError,
},
}
impl Display for EmitJsonError {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
match self {
Self::EventTypeUnknown { event_type, op } => f.write_fmt(format_args!(
"provided event type ({:?})/op ({}) pair is unknown",
event_type, op,
)),
Self::Parsing { source } => Display::fmt(source, f),
}
}
}
impl Error for EmitJsonError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
Self::EventTypeUnknown { .. } => None,
Self::Parsing { source } => Some(source),
}
}
}
#[derive(Clone, Debug)]
pub struct Emitter {
listeners: Listeners<Event>,
}
impl Emitter {
pub fn new(listeners: Listeners<Event>) -> Self {
Self { listeners }
}
pub fn into_listeners(self) -> Listeners<Event> {
self.listeners
}
pub fn wants(&self, event_type: EventTypeFlags) -> bool {
self.listeners.event_types().contains(event_type)
}
#[tracing::instrument(level = "trace")]
pub fn bytes(&self, bytes: &[u8]) {
if !self.wants(EventTypeFlags::SHARD_PAYLOAD) {
return;
}
self.send(EventTypeFlags::SHARD_PAYLOAD, |_| {
Event::ShardPayload(Payload {
bytes: bytes.to_vec(),
})
});
}
#[tracing::instrument(level = "trace")]
pub fn event(&self, event: Event) {
let event_type = EventTypeFlags::from(event.kind());
if !self.wants(event_type) {
return;
}
let listener_count = self.listeners.len();
let mut event = Some(event);
self.send(event_type, |idx| {
if idx == listener_count {
tracing::trace!("moving event to send to listener");
event.take().unwrap()
} else {
tracing::trace!("cloning event to send to listener");
event.clone().unwrap()
}
})
}
pub fn json(
&self,
op: u8,
seq: Option<u64>,
event_type: Option<&str>,
json: &mut str,
) -> Result<(), EmitJsonError> {
let flag = EventTypeFlags::try_from((op, event_type)).map_err(|(op, event_type)| {
EmitJsonError::EventTypeUnknown {
event_type: event_type.map(ToOwned::to_owned),
op,
}
})?;
if !self.wants(flag) {
return Ok(());
}
let gateway_event = json::parse_gateway_event(op, seq, event_type, json)
.map_err(|source| EmitJsonError::Parsing { source })?;
self.event(Event::from(gateway_event));
Ok(())
}
fn send(&self, event_type: EventTypeFlags, mut f: impl FnMut(usize) -> Event) {
let listener_count = self.listeners.len();
let mut idx = 0;
let span = tracing::trace_span!(
"beginning to iterate over listeners",
?event_type,
?listener_count,
);
let _span_enter = span.enter();
self.listeners.all().retain(|id, listener| {
let span = tracing::trace_span!("sending to listener", %id, ?event_type);
let _span_enter = span.enter();
idx += 1;
if !listener.wants(event_type) {
tracing::trace!("listener doesn't want event type");
return !listener.tx.is_closed();
}
listener.tx.unbounded_send(f(idx)).is_ok()
});
}
}
#[cfg(test)]
mod tests {
use super::Emitter;
use crate::{listener::Listeners, Event, EventTypeFlags};
#[test]
fn test_bytes_send() {
let listeners = Listeners::default();
let mut rx = listeners.add(EventTypeFlags::SHARD_PAYLOAD);
let emitter = Emitter::new(listeners);
emitter.bytes(&[1]);
assert_eq!(1, emitter.listeners.len());
assert!(matches!(rx.try_next(), Ok(Some(_))));
assert!(rx.try_next().is_err());
}
#[test]
fn test_event_removes_closed_channels() {
let listeners = Listeners::default();
let _ = listeners.add(EventTypeFlags::default());
let emitter = Emitter::new(listeners);
emitter.event(Event::GatewayReconnect);
assert!(emitter.listeners.all().is_empty());
}
#[test]
fn test_event_sends_to_rxs() {
let listeners = Listeners::default();
let mut rx1 = listeners.add(EventTypeFlags::default());
let mut rx2 = listeners.add(EventTypeFlags::default());
let emitter = Emitter::new(listeners);
emitter.event(Event::GatewayReconnect);
assert_eq!(2, emitter.listeners.len());
assert!(matches!(rx1.try_next(), Ok(Some(_))));
assert!(matches!(rx2.try_next(), Ok(Some(_))));
assert!(rx1.try_next().is_err());
assert!(rx2.try_next().is_err());
}
}