use crossbeam_channel::{unbounded, Sender as CbSender};
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, OnceLock, RwLock};
#[cfg(target_os = "linux")]
use quanta::Instant;
#[cfg(not(target_os = "linux"))]
use std::time::Instant;
pub mod guard;
pub use guard::{StreamsGuard, StreamsGuardBuilder};
pub(crate) mod wrapper;
use crate::http_server::HTTP_SERVER_PORT;
pub use crate::json::{ChannelState, LogEntry, SerializableStreamStats, StreamLogs, StreamsJson};
pub use crate::Format;
/// Statistics record for a single instrumented stream.
///
/// A record is inserted into the shared stats map when a
/// `StreamEvent::Created` event is processed and is then updated by the
/// `Yielded` and `Completed` events that carry the same id.
#[derive(Debug, Clone)]
pub(crate) struct StreamStats {
    /// Stream identifier; also the key this record is stored under in the stats map.
    pub(crate) id: u64,
    /// Source identifier of the instrumentation site. The `stream!` macro
    /// passes a `file:line` string built with `file!()` and `line!()`.
    pub(crate) source: &'static str,
    /// Custom label supplied at instrumentation time, if any.
    pub(crate) label: Option<String>,
    /// Lifecycle state: `Active` on creation, set to `Closed` by a `Completed` event.
    // NOTE: `items_yielded` shares this source line; it counts the `Yielded`
    // events applied to this stream so far.
    pub(crate) state: ChannelState, pub(crate) items_yielded: u64,
    /// Type name reported by the `Created` event.
    // NOTE(review): presumably the name of the stream's item type - confirm in `wrapper`.
    pub(crate) type_name: &'static str,
    /// Type size reported by the `Created` event.
    // NOTE(review): presumably `size_of` the item type, in bytes - confirm in `wrapper`.
    pub(crate) type_size: usize,
    /// Most recent log entries, oldest first. One entry is appended per
    /// `Yielded` event; the oldest entry is dropped first once the length
    /// has reached `crate::channels::get_log_limit()`.
    pub(crate) logs: VecDeque<LogEntry>,
    /// Number of streams with the same `source` that were already registered
    /// when this one was created (0 for the first).
    pub(crate) iter: u32,
}
impl From<&StreamStats> for SerializableStreamStats {
fn from(stream_stats: &StreamStats) -> Self {
let label = crate::channels::resolve_label(
stream_stats.source,
stream_stats.label.as_deref(),
Some(stream_stats.iter),
);
Self {
id: stream_stats.id,
source: stream_stats.source.to_string(),
label,
has_custom_label: stream_stats.label.is_some(),
state: stream_stats.state,
items_yielded: stream_stats.items_yielded,
type_name: stream_stats.type_name.to_string(),
type_size: stream_stats.type_size,
iter: stream_stats.iter,
}
}
}
impl StreamStats {
fn new(
id: u64,
source: &'static str,
label: Option<String>,
type_name: &'static str,
type_size: usize,
iter: u32,
) -> Self {
Self {
id,
source,
label,
state: ChannelState::Active,
items_yielded: 0,
type_name,
type_size,
logs: VecDeque::new(),
iter,
}
}
}
/// Events sent over the stats channel and applied to the shared stats map by
/// the collector thread spawned in `init_streams_state`.
#[derive(Debug)]
pub(crate) enum StreamEvent {
    /// Registers a new stream: a fresh `StreamStats` record is inserted under `id`.
    Created {
        /// Identifier of the new stream.
        id: u64,
        /// Source identifier of the instrumentation site.
        source: &'static str,
        /// Optional custom label; stored as the record's `label`.
        display_label: Option<String>,
        /// Stored as the record's `type_name`.
        type_name: &'static str,
        /// Stored as the record's `type_size`.
        type_size: usize,
    },
    /// Records one yielded item: increments the stream's `items_yielded` and
    /// appends a log entry. Ignored if `id` is not registered.
    Yielded {
        /// Identifier of the stream that yielded.
        id: u64,
        /// Optional log text passed through to the new `LogEntry`.
        log: Option<String>,
        /// Instant of the yield; converted with `crate::channels::timestamp_nanos`.
        timestamp: Instant,
    },
    /// Marks the stream as `Closed`. Ignored if `id` is not registered.
    Completed {
        /// Identifier of the stream that completed.
        id: u64,
    },
}
/// Global stream-statistics state: the sender half of the event channel and
/// the shared map of per-stream records, keyed by stream id.
pub(crate) type StreamStatsState = (
    // Sender used to submit `StreamEvent`s to the collector thread.
    CbSender<StreamEvent>,
    // Stats map written by the collector thread and read by the getters below.
    Arc<RwLock<HashMap<u64, StreamStats>>>,
);
/// Lazily initialised global state; populated on the first call to `init_streams_state`.
static STREAMS_STATE: OnceLock<StreamStatsState> = OnceLock::new();
/// Crate-wide counter starting at 0.
// NOTE(review): presumably used to hand out stream ids; the code that
// increments it is not in this part of the file - confirm in `wrapper`.
pub(crate) static STREAM_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
/// Returns the process-wide stream statistics state, creating it on first use.
///
/// The first call performs, in order:
/// 1. ensures `crate::channels::START_TIME` is set,
/// 2. creates an unbounded event channel and an empty shared stats map,
/// 3. spawns the `hp-streams` thread, which applies every received
///    [`StreamEvent`] to the map for as long as the channel stays connected,
/// 4. calls `crate::http_server::start_metrics_server_once` with `HTTP_SERVER_PORT`.
///
/// Every later call returns the same `(sender, stats map)` pair.
///
/// # Panics
///
/// Panics if the collector thread cannot be spawned.
pub(crate) fn init_streams_state() -> &'static StreamStatsState {
    STREAMS_STATE.get_or_init(|| {
        crate::channels::START_TIME.get_or_init(Instant::now);

        let (event_tx, event_rx) = unbounded::<StreamEvent>();
        let shared_stats = Arc::new(RwLock::new(HashMap::<u64, StreamStats>::new()));
        let collector_stats = Arc::clone(&shared_stats);

        // Body of the collector thread: drains the channel until it is
        // disconnected, applying one event at a time.
        let collect_events = move || {
            for event in event_rx.iter() {
                // The write lock is taken per event and released when the
                // iteration ends, so readers can interleave between events.
                let mut registry = collector_stats.write().unwrap();
                match event {
                    StreamEvent::Created {
                        id,
                        source,
                        display_label,
                        type_name,
                        type_size,
                    } => {
                        // Index of this stream among those sharing its source.
                        let iter = registry
                            .values()
                            .filter(|known| known.source == source)
                            .count() as u32;
                        let created = StreamStats::new(
                            id,
                            source,
                            display_label,
                            type_name,
                            type_size,
                            iter,
                        );
                        registry.insert(id, created);
                    }
                    StreamEvent::Yielded { id, log, timestamp } => {
                        // Events for unknown ids are ignored.
                        let Some(entry) = registry.get_mut(&id) else {
                            continue;
                        };
                        entry.items_yielded += 1;

                        // Drop the oldest entry once the buffer has reached the limit.
                        let limit = crate::channels::get_log_limit();
                        if entry.logs.len() >= limit {
                            entry.logs.pop_front();
                        }

                        let index = entry.items_yielded;
                        let at = crate::channels::timestamp_nanos(timestamp);
                        entry.logs.push_back(LogEntry::new(index, at, log, None));
                    }
                    StreamEvent::Completed { id } => {
                        if let Some(entry) = registry.get_mut(&id) {
                            entry.state = ChannelState::Closed;
                        }
                    }
                }
            }
        };

        std::thread::Builder::new()
            .name(String::from("hp-streams"))
            .spawn(collect_events)
            .expect("Failed to spawn stream-stats-collector thread");

        crate::http_server::start_metrics_server_once(*HTTP_SERVER_PORT);

        (event_tx, shared_stats)
    })
}
/// Extension trait that wraps a value in an instrumented stream.
///
/// The `stream!` macro calls this for its forms without `log = true`. This
/// file implements it for every `futures_util::Stream`.
#[doc(hidden)]
pub trait InstrumentStream {
    /// Type produced by wrapping `Self`.
    type Output;
    /// Wraps `self`, tagging it with `source` and an optional custom `label`.
    fn instrument_stream(self, source: &'static str, label: Option<String>) -> Self::Output;
}
/// Extension trait that wraps a value in an instrumented, logging stream.
///
/// The `stream!` macro calls this for its `log = true` forms. This file
/// implements it for every `futures_util::Stream` whose `Item` is `Debug`.
#[doc(hidden)]
pub trait InstrumentStreamLog {
    /// Type produced by wrapping `Self`.
    type Output;
    /// Wraps `self`, tagging it with `source` and an optional custom `label`.
    fn instrument_stream_log(self, source: &'static str, label: Option<String>) -> Self::Output;
}
/// Blanket implementation: any `futures_util::Stream` can be wrapped in an
/// `InstrumentedStream`.
impl<S: futures_util::Stream> InstrumentStream for S {
    type Output = crate::streams::wrapper::InstrumentedStream<S>;

    fn instrument_stream(self, source: &'static str, label: Option<String>) -> Self::Output {
        use crate::streams::wrapper::InstrumentedStream;

        InstrumentedStream::new(self, source, label)
    }
}
/// Blanket implementation: any `futures_util::Stream` whose items implement
/// `Debug` can be wrapped in an `InstrumentedStreamLog`.
impl<S: futures_util::Stream> InstrumentStreamLog for S
where
    S::Item: std::fmt::Debug,
{
    type Output = crate::streams::wrapper::InstrumentedStreamLog<S>;

    fn instrument_stream_log(self, source: &'static str, label: Option<String>) -> Self::Output {
        use crate::streams::wrapper::InstrumentedStreamLog;

        InstrumentedStreamLog::new(self, source, label)
    }
}
/// Wraps a stream expression in an instrumented stream.
///
/// The call site (`file!()`, a colon, `line!()`) is passed as the stream's
/// source identifier.
///
/// Accepted forms:
///
/// - `stream!(expr)` - instrument via `InstrumentStream`, no custom label.
/// - `stream!(expr, label = l)` - as above, with `l.to_string()` as label.
/// - `stream!(expr, log = true)` - instrument via `InstrumentStreamLog`.
/// - `stream!(expr, label = l, log = true)` and
///   `stream!(expr, log = true, label = l)` - logging variant with a label.
///
/// The source identifier is passed as an inline `concat!` literal rather than
/// through a macro-defined `const`. Items defined by `macro_rules!` are not
/// hygienic, so such a `const` would shadow a caller identifier of the same
/// name used inside `expr`.
#[macro_export]
macro_rules! stream {
    ($expr:expr) => {
        $crate::InstrumentStream::instrument_stream($expr, concat!(file!(), ":", line!()), None)
    };
    ($expr:expr, label = $label:expr) => {
        $crate::InstrumentStream::instrument_stream(
            $expr,
            concat!(file!(), ":", line!()),
            Some($label.to_string()),
        )
    };
    ($expr:expr, log = true) => {
        $crate::InstrumentStreamLog::instrument_stream_log(
            $expr,
            concat!(file!(), ":", line!()),
            None,
        )
    };
    ($expr:expr, label = $label:expr, log = true) => {
        $crate::InstrumentStreamLog::instrument_stream_log(
            $expr,
            concat!(file!(), ":", line!()),
            Some($label.to_string()),
        )
    };
    ($expr:expr, log = true, label = $label:expr) => {
        $crate::InstrumentStreamLog::instrument_stream_log(
            $expr,
            concat!(file!(), ":", line!()),
            Some($label.to_string()),
        )
    };
}
/// Returns a cloned snapshot of every stream's statistics, keyed by stream id.
///
/// Yields an empty map when the global state has not been initialised yet.
fn get_all_stream_stats() -> HashMap<u64, StreamStats> {
    let Some((_, stats_map)) = STREAMS_STATE.get() else {
        return HashMap::new();
    };
    // Clone under the read lock so callers never hold it.
    let guard = stats_map.read().unwrap();
    (*guard).clone()
}
/// Total ordering used to present streams.
///
/// - Streams with a custom label sort before streams without one.
/// - Two labelled streams compare by label; two unlabelled ones by source.
/// - Ties are broken by `iter`, then by `id`.
///
/// The final `id` tie-break makes the order deterministic when two labelled
/// streams from different sources share the same label and `iter`.
fn compare_stream_stats(a: &StreamStats, b: &StreamStats) -> std::cmp::Ordering {
    use std::cmp::Ordering;

    let primary = match (a.label.as_deref(), b.label.as_deref()) {
        (Some(_), None) => return Ordering::Less,
        (None, Some(_)) => return Ordering::Greater,
        (Some(a_label), Some(b_label)) => a_label.cmp(b_label),
        (None, None) => a.source.cmp(b.source),
    };

    primary
        .then_with(|| a.iter.cmp(&b.iter))
        .then_with(|| a.id.cmp(&b.id))
}
/// Returns a snapshot of all stream statistics, ordered by `compare_stream_stats`.
pub(crate) fn get_sorted_stream_stats() -> Vec<StreamStats> {
    let snapshot = get_all_stream_stats();
    let mut ordered = snapshot.into_values().collect::<Vec<_>>();
    ordered.sort_by(compare_stream_stats);
    ordered
}
/// Builds the JSON-serializable snapshot of all instrumented streams.
///
/// `streams` holds one entry per known stream in presentation order (see
/// `get_sorted_stream_stats`). `current_elapsed_ns` is the time elapsed since
/// `crate::channels::START_TIME`, in nanoseconds.
///
/// If `START_TIME` has not been set yet, `current_elapsed_ns` is reported as
/// 0 instead of panicking. This matches the stats lookup, which returns no
/// streams when the state is uninitialised.
pub fn get_streams_json() -> StreamsJson {
    let streams = get_sorted_stream_stats()
        .iter()
        .map(SerializableStreamStats::from)
        .collect();

    // `as u64` only truncates after roughly 584 years of uptime.
    let current_elapsed_ns = crate::channels::START_TIME
        .get()
        .map_or(0, |start| start.elapsed().as_nanos() as u64);

    StreamsJson {
        current_elapsed_ns,
        streams,
    }
}
pub fn get_stream_logs(stream_id: &str) -> Option<StreamLogs> {
let id = stream_id.parse::<u64>().ok()?;
let stats = get_all_stream_stats();
stats.get(&id).map(|stream_stats| {
let mut yielded_logs: Vec<LogEntry> = stream_stats.logs.iter().cloned().collect();
yielded_logs.sort_by(|a, b| b.index.cmp(&a.index));
StreamLogs {
id: stream_id.to_string(),
logs: yielded_logs,
}
})
}