use crate::*;
impl BroadcastTypeTrait for String {}
impl BroadcastTypeTrait for &str {}
impl BroadcastTypeTrait for char {}
impl BroadcastTypeTrait for bool {}
impl BroadcastTypeTrait for i8 {}
impl BroadcastTypeTrait for i16 {}
impl BroadcastTypeTrait for i32 {}
impl BroadcastTypeTrait for i64 {}
impl BroadcastTypeTrait for i128 {}
impl BroadcastTypeTrait for isize {}
impl BroadcastTypeTrait for u8 {}
impl BroadcastTypeTrait for u16 {}
impl BroadcastTypeTrait for u32 {}
impl BroadcastTypeTrait for u64 {}
impl BroadcastTypeTrait for u128 {}
impl BroadcastTypeTrait for usize {}
impl BroadcastTypeTrait for f32 {}
impl BroadcastTypeTrait for f64 {}
impl BroadcastTypeTrait for IpAddr {}
impl BroadcastTypeTrait for Ipv4Addr {}
impl BroadcastTypeTrait for Ipv6Addr {}
impl BroadcastTypeTrait for SocketAddr {}
impl BroadcastTypeTrait for NonZeroU8 {}
impl BroadcastTypeTrait for NonZeroU16 {}
impl BroadcastTypeTrait for NonZeroU32 {}
impl BroadcastTypeTrait for NonZeroU64 {}
impl BroadcastTypeTrait for NonZeroU128 {}
impl BroadcastTypeTrait for NonZeroUsize {}
impl BroadcastTypeTrait for NonZeroI8 {}
impl BroadcastTypeTrait for NonZeroI16 {}
impl BroadcastTypeTrait for NonZeroI32 {}
impl BroadcastTypeTrait for NonZeroI64 {}
impl BroadcastTypeTrait for NonZeroI128 {}
impl BroadcastTypeTrait for NonZeroIsize {}
impl BroadcastTypeTrait for Infallible {}
impl BroadcastTypeTrait for &String {}
impl BroadcastTypeTrait for &&str {}
impl BroadcastTypeTrait for &char {}
impl BroadcastTypeTrait for &bool {}
impl BroadcastTypeTrait for &i8 {}
impl BroadcastTypeTrait for &i16 {}
impl BroadcastTypeTrait for &i32 {}
impl BroadcastTypeTrait for &i64 {}
impl BroadcastTypeTrait for &i128 {}
impl BroadcastTypeTrait for &isize {}
impl BroadcastTypeTrait for &u8 {}
impl BroadcastTypeTrait for &u16 {}
impl BroadcastTypeTrait for &u32 {}
impl BroadcastTypeTrait for &u128 {}
impl BroadcastTypeTrait for &usize {}
impl BroadcastTypeTrait for &f32 {}
impl BroadcastTypeTrait for &f64 {}
impl BroadcastTypeTrait for &IpAddr {}
impl BroadcastTypeTrait for &Ipv4Addr {}
impl BroadcastTypeTrait for &Ipv6Addr {}
impl BroadcastTypeTrait for &SocketAddr {}
impl BroadcastTypeTrait for &NonZeroU8 {}
impl BroadcastTypeTrait for &NonZeroU16 {}
impl BroadcastTypeTrait for &NonZeroU32 {}
impl BroadcastTypeTrait for &NonZeroU64 {}
impl BroadcastTypeTrait for &NonZeroU128 {}
impl BroadcastTypeTrait for &NonZeroUsize {}
impl BroadcastTypeTrait for &NonZeroI8 {}
impl BroadcastTypeTrait for &NonZeroI16 {}
impl BroadcastTypeTrait for &NonZeroI32 {}
impl BroadcastTypeTrait for &NonZeroI64 {}
impl BroadcastTypeTrait for &NonZeroI128 {}
impl BroadcastTypeTrait for &NonZeroIsize {}
impl BroadcastTypeTrait for &Infallible {}
impl<B> Default for BroadcastType<B>
where
B: BroadcastTypeTrait,
{
#[inline(always)]
fn default() -> Self {
BroadcastType::Unknown
}
}
impl<B> BroadcastType<B>
where
B: BroadcastTypeTrait,
{
#[inline(always)]
pub fn get_key(broadcast_type: BroadcastType<B>) -> String {
match broadcast_type {
BroadcastType::PointToPoint(key1, key2) => {
let (first_key, second_key) = if key1 <= key2 {
(key1, key2)
} else {
(key2, key1)
};
format!(
"{}-{}-{}",
POINT_TO_POINT_KEY,
first_key.to_string(),
second_key.to_string()
)
}
BroadcastType::PointToGroup(key) => {
format!("{}-{}", POINT_TO_GROUP_KEY, key.to_string())
}
BroadcastType::Unknown => String::new(),
}
}
}
impl<'a, B> WebSocketConfig<'a, B>
where
B: BroadcastTypeTrait,
{
#[inline(always)]
pub fn new(context: &'a mut Context) -> Self {
Self {
context,
capacity: DEFAULT_BROADCAST_SENDER_CAPACITY,
broadcast_type: BroadcastType::default(),
connected_hook: default_server_hook_handler(),
request_hook: default_server_hook_handler(),
sended_hook: default_server_hook_handler(),
closed_hook: default_server_hook_handler(),
}
}
}
impl<'a, B> WebSocketConfig<'a, B>
where
B: BroadcastTypeTrait,
{
#[inline(always)]
pub fn set_capacity(mut self, capacity: Capacity) -> Self {
self.capacity = capacity;
self
}
#[inline(always)]
pub fn set_context(mut self, context: &'a mut Context) -> Self {
self.context = context;
self
}
#[inline(always)]
pub fn set_broadcast_type(mut self, broadcast_type: BroadcastType<B>) -> Self {
self.broadcast_type = broadcast_type;
self
}
#[inline(always)]
pub fn get_context(&mut self) -> &mut Context {
self.context
}
#[inline(always)]
pub fn get_capacity(&self) -> Capacity {
self.capacity
}
#[inline(always)]
pub fn get_broadcast_type(&self) -> &BroadcastType<B> {
&self.broadcast_type
}
#[inline(always)]
pub fn set_connected_hook<S>(mut self) -> Self
where
S: ServerHook,
{
self.connected_hook = server_hook_factory::<S>();
self
}
#[inline(always)]
pub fn set_request_hook<S>(mut self) -> Self
where
S: ServerHook,
{
self.request_hook = server_hook_factory::<S>();
self
}
#[inline(always)]
pub fn set_sended_hook<S>(mut self) -> Self
where
S: ServerHook,
{
self.sended_hook = server_hook_factory::<S>();
self
}
#[inline(always)]
pub fn set_closed_hook<S>(mut self) -> Self
where
S: ServerHook,
{
self.closed_hook = server_hook_factory::<S>();
self
}
#[inline(always)]
pub fn get_connected_hook(&self) -> &ServerHookHandler {
&self.connected_hook
}
#[inline(always)]
pub fn get_request_hook(&self) -> &ServerHookHandler {
&self.request_hook
}
#[inline(always)]
pub fn get_sended_hook(&self) -> &ServerHookHandler {
&self.sended_hook
}
#[inline(always)]
pub fn get_closed_hook(&self) -> &ServerHookHandler {
&self.closed_hook
}
}
impl WebSocket {
#[inline(always)]
pub fn new() -> Self {
Self::default()
}
#[inline(always)]
fn subscribe_unwrap_or_insert<B>(
&self,
broadcast_type: BroadcastType<B>,
capacity: Capacity,
) -> BroadcastMapReceiver<Vec<u8>>
where
B: BroadcastTypeTrait,
{
let key: String = BroadcastType::get_key(broadcast_type);
self.broadcast_map.subscribe_or_insert(&key, capacity)
}
#[inline(always)]
fn point_to_point<B>(
&self,
key1: &B,
key2: &B,
capacity: Capacity,
) -> BroadcastMapReceiver<Vec<u8>>
where
B: BroadcastTypeTrait,
{
self.subscribe_unwrap_or_insert(
BroadcastType::PointToPoint(key1.clone(), key2.clone()),
capacity,
)
}
#[inline(always)]
fn point_to_group<B>(&self, key: &B, capacity: Capacity) -> BroadcastMapReceiver<Vec<u8>>
where
B: BroadcastTypeTrait,
{
self.subscribe_unwrap_or_insert(BroadcastType::PointToGroup(key.clone()), capacity)
}
#[inline(always)]
pub fn receiver_count<B>(&self, broadcast_type: BroadcastType<B>) -> ReceiverCount
where
B: BroadcastTypeTrait,
{
let key: String = BroadcastType::get_key(broadcast_type);
self.broadcast_map.receiver_count(&key).unwrap_or(0)
}
#[inline(always)]
pub fn receiver_count_before_connected<B>(
&self,
broadcast_type: BroadcastType<B>,
) -> ReceiverCount
where
B: BroadcastTypeTrait,
{
let count: ReceiverCount = self.receiver_count(broadcast_type);
count.clamp(0, ReceiverCount::MAX - 1) + 1
}
#[inline(always)]
pub fn receiver_count_after_closed<B>(&self, broadcast_type: BroadcastType<B>) -> ReceiverCount
where
B: BroadcastTypeTrait,
{
let count: ReceiverCount = self.receiver_count(broadcast_type);
count.clamp(1, ReceiverCount::MAX) - 1
}
#[inline(always)]
pub fn try_send<T, B>(
&self,
broadcast_type: BroadcastType<B>,
data: T,
) -> Result<Option<ReceiverCount>, SendError<Vec<u8>>>
where
T: Into<Vec<u8>>,
B: BroadcastTypeTrait,
{
let key: String = BroadcastType::get_key(broadcast_type);
self.broadcast_map.try_send(&key, data.into())
}
#[inline(always)]
pub fn send<T, B>(&self, broadcast_type: BroadcastType<B>, data: T) -> Option<ReceiverCount>
where
T: Into<Vec<u8>>,
B: BroadcastTypeTrait,
{
self.try_send(broadcast_type, data).unwrap()
}
pub async fn run<B>(&self, mut websocket_config: WebSocketConfig<'_, B>)
where
B: BroadcastTypeTrait,
{
let capacity: Capacity = websocket_config.get_capacity();
let broadcast_type: BroadcastType<B> = websocket_config.get_broadcast_type().clone();
let connected_hook: ServerHookHandler = websocket_config.get_connected_hook().clone();
let sended_hook: ServerHookHandler = websocket_config.get_sended_hook().clone();
let request_hook: ServerHookHandler = websocket_config.get_request_hook().clone();
let closed_hook: ServerHookHandler = websocket_config.get_closed_hook().clone();
let ctx: &mut Context = websocket_config.get_context();
let mut receiver: Receiver<Vec<u8>> = match &broadcast_type {
BroadcastType::PointToPoint(key1, key2) => self.point_to_point(key1, key2, capacity),
BroadcastType::PointToGroup(key) => self.point_to_group(key, capacity),
BroadcastType::Unknown => panic!("BroadcastType must be PointToPoint or PointToGroup"),
};
let key: String = BroadcastType::get_key(broadcast_type);
connected_hook(ctx).await;
loop {
tokio::select! {
request_res = ctx.ws_from_stream() => {
let mut is_err: bool = false;
if request_res.is_ok() {
request_hook(ctx).await;
} else {
is_err = true;
closed_hook(ctx).await;
}
if ctx.get_aborted() {
continue;
}
if ctx.get_closed() {
break;
}
let body: ResponseBody = ctx.get_response().get_body().clone();
is_err = self.broadcast_map.try_send(&key, body).is_err() || is_err;
sended_hook(ctx).await;
if is_err || ctx.get_closed() {
break;
}
},
msg_res = receiver.recv() => {
if let Ok(msg) = &msg_res {
if ctx.try_send_body_list_with_data(&WebSocketFrame::create_frame_list(msg)).await.is_ok() {
continue;
} else {
break;
}
}
break;
}
}
}
ctx.set_aborted(true).set_closed(true);
}
}