use std::collections::HashMap;
use std::sync::Arc;
use parking_lot::{Mutex, RwLock};
use tokio::sync::Notify;
use crate::event::AstridEvent;
use crate::route::entry::{MAX_SUBSCRIPTION_BUDGET_BYTES, RouteEntry, RouteKey};
pub const METRIC_ROUTE_ACTIVE_PRINCIPALS: &str = "astrid_capsule_route_active_principals";
pub const METRIC_ROUTE_BUDGET_BYTES_IN_USE: &str = "astrid_capsule_route_budget_bytes_in_use";
pub struct RoutedEventReceiver {
pub(crate) route_key: RouteKey,
pub(crate) route_entry: Arc<Mutex<RouteEntry>>,
pub(crate) notify: Arc<Notify>,
pub(crate) routes: Arc<RwLock<HashMap<RouteKey, Arc<Mutex<RouteEntry>>>>>,
pub(crate) lagged_count: u64,
pub(crate) subscriber: &'static str,
}
impl std::fmt::Debug for RoutedEventReceiver {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RoutedEventReceiver")
.field("route_key", &self.route_key)
.field("subscriber", &self.subscriber)
.finish_non_exhaustive()
}
}
impl RoutedEventReceiver {
pub async fn recv(&mut self, timeout: Option<std::time::Duration>) -> Option<Arc<AstridEvent>> {
loop {
{
let mut out: Vec<Arc<AstridEvent>> = Vec::with_capacity(1);
let mut entry = self.route_entry.lock();
let _ = entry.drr_drain(&mut out, MAX_SUBSCRIPTION_BUDGET_BYTES);
if let Some(first) = out.into_iter().next() {
return Some(first);
}
}
match timeout {
Some(dur) => {
if tokio::time::timeout(dur, self.notify.notified())
.await
.is_err()
{
return None;
}
},
None => {
self.notify.notified().await;
},
}
}
}
pub fn try_drain(&mut self, budget: usize) -> Vec<Arc<AstridEvent>> {
let mut out = Vec::new();
let mut entry = self.route_entry.lock();
let _ = entry.drr_drain(&mut out, budget);
out
}
pub fn drain_lagged(&mut self) -> u64 {
std::mem::take(&mut self.lagged_count)
}
#[must_use]
pub fn subscriber(&self) -> &'static str {
self.subscriber
}
#[must_use]
pub fn active_principals(&self) -> usize {
self.route_entry.lock().active_principals()
}
#[must_use]
pub fn total_bytes(&self) -> usize {
self.route_entry.lock().total_bytes
}
}
impl Drop for RoutedEventReceiver {
fn drop(&mut self) {
let mut routes = self.routes.write();
routes.remove(&self.route_key);
}
}