use std::sync::Arc;
use qbase::{
self,
error::{AppError, QuicError},
frame::ConnectionCloseFrame,
};
use qevent::quic::connectivity::BaseConnectionStates;
use tokio::sync::mpsc;
use crate::state::ArcConnState;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Event {
Handshaked,
Failed(QuicError),
ApplicationClose(AppError),
Closed(ConnectionCloseFrame),
StatelessReset,
Terminated,
}
pub trait EmitEvent: Send + Sync {
fn emit(&self, event: Event);
}
#[derive(Clone)]
pub struct ArcEventBroker {
conn_state: ArcConnState,
raw_broker: Arc<dyn EmitEvent>,
}
impl ArcEventBroker {
pub fn new<E: EmitEvent + 'static>(conn_state: ArcConnState, event_broker: E) -> Self {
Self {
conn_state,
raw_broker: Arc::new(event_broker),
}
}
}
impl EmitEvent for ArcEventBroker {
fn emit(&self, event: Event) {
match &event {
Event::Handshaked => {
if self.conn_state.enter_handshaked().is_none() {
return;
}
}
Event::Failed(error) => {
if self.conn_state.enter_closing(error).is_none() {
return;
}
}
Event::ApplicationClose(error) => {
if self.conn_state.enter_closing(error).is_none() {
return;
}
}
Event::Closed(ccf) => {
if self.conn_state.enter_draining(ccf).is_none() {
return;
}
}
Event::Terminated => {
let terminated_state = BaseConnectionStates::Closed;
self.conn_state.update(terminated_state.into());
}
Event::StatelessReset => todo!("unsupported"),
};
tracing::debug!(target: "quic", new_state = ?event, "connection state changed");
self.raw_broker.emit(event);
}
}
impl EmitEvent for mpsc::UnboundedSender<Event> {
fn emit(&self, event: Event) {
_ = self.send(event);
}
}
#[cfg(test)]
mod tests {
use tokio::sync::mpsc;
use super::*;
#[test]
fn test_emit_event() {
let (tx, mut rx) = mpsc::unbounded_channel();
tx.emit(Event::Handshaked);
assert_eq!(rx.try_recv().unwrap(), Event::Handshaked);
}
}