use crossbeam_channel::{bounded, select, unbounded, Receiver as CbReceiver, Sender as CbSender};
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex, OnceLock, RwLock};
use crate::instant::Instant;
pub(crate) mod wrapper;
use crate::channels::{resolve_label, LOGS_LIMIT};
use crate::data_flow::{WORKER_BATCH_SIZE, WORKER_FLUSH_INTERVAL_MS, WORKER_SHUTDOWN_DRAIN_LIMIT};
use crate::json::JsonStreamEntry;
pub(crate) use crate::json::{ChannelState, DataFlowLogEntry, StreamLogs};
use crate::metrics_server::METRICS_SERVER_PORT;
pub use crate::Format;
/// Per-stream statistics record kept by the streams worker.
///
/// One record is stored per stream id in `StreamsInternalState::stats`.
#[derive(Debug, Clone)]
pub(crate) struct StreamStats {
/// Identifier of the stream; also the key under which this record is stored.
pub(crate) id: u32,
/// Source tag given at instrumentation time (the `stream!` macro passes `file:line`).
pub(crate) source: &'static str,
/// Custom label supplied by the caller, if any.
pub(crate) label: Option<String>,
// `state` starts as `ChannelState::Active` and becomes `Closed` on a `Completed` event;
// `items_yielded` is incremented once per processed `Yielded` event.
pub(crate) state: ChannelState, pub(crate) items_yielded: u64,
/// Type name recorded at creation (presumably the stream's item type — confirm in `wrapper`).
pub(crate) type_name: &'static str,
/// Type size recorded at creation (presumably `size_of` the item type, in bytes — confirm in `wrapper`).
pub(crate) type_size: usize,
/// Number of streams with the same `source` that were already registered when this one was created.
pub(crate) iter: u32,
}
#[cfg_attr(feature = "hotpath-meta", hotpath_meta::measure_all)]
impl StreamStats {
    /// Builds the initial record for a freshly created stream.
    ///
    /// The stream starts in the `Active` state with zero yielded items; every
    /// other field is taken verbatim from the arguments.
    fn new(
        id: u32,
        source: &'static str,
        label: Option<String>,
        type_name: &'static str,
        type_size: usize,
        iter: u32,
    ) -> Self {
        // Only these two fields are not caller-provided.
        let state = ChannelState::Active;
        let items_yielded = 0;

        Self {
            id,
            source,
            label,
            state,
            items_yielded,
            type_name,
            type_size,
            iter,
        }
    }
}
/// Buffer of log entries recorded for a single stream.
///
/// `process_stream_event` drops the oldest entry before pushing a new one
/// once the buffer length has reached `LOGS_LIMIT`.
#[derive(Debug)]
pub(crate) struct StreamStatsLogs {
/// Log entries in insertion order (oldest at the front).
pub(crate) logs: VecDeque<DataFlowLogEntry>,
}
impl StreamStatsLogs {
    /// Creates an empty log buffer with capacity reserved for `LOGS_LIMIT` entries.
    fn new() -> Self {
        let logs = VecDeque::with_capacity(*LOGS_LIMIT);
        Self { logs }
    }
}
/// Mutable bookkeeping shared between the worker thread (writer) and the
/// query functions in this module (readers), guarded by an `RwLock`.
pub(crate) struct StreamsInternalState {
/// Statistics per stream, keyed by stream id.
pub(crate) stats: HashMap<u32, StreamStats>,
/// Recorded log entries per stream, keyed by stream id.
pub(crate) logs: HashMap<u32, StreamStatsLogs>,
}
/// Converts an internal stats record into its JSON representation.
impl From<&StreamStats> for JsonStreamEntry {
    /// Copies every field of `stats`, turning borrowed strings into owned ones.
    ///
    /// The emitted `label` is whatever `resolve_label` produces from the source,
    /// the optional custom label and the `iter` counter; `has_custom_label`
    /// records whether a custom label was present.
    fn from(stats: &StreamStats) -> Self {
        let has_custom_label = stats.label.is_some();
        let display_label = resolve_label(stats.source, stats.label.as_deref(), Some(stats.iter));

        Self {
            id: stats.id,
            source: String::from(stats.source),
            label: display_label,
            has_custom_label,
            state: stats.state.as_str().to_string(),
            items_yielded: stats.items_yielded,
            type_name: String::from(stats.type_name),
            type_size: stats.type_size,
            iter: stats.iter,
        }
    }
}
/// Events consumed by the `hp-streams` worker thread and applied to the
/// shared state through `process_stream_event`.
#[derive(Debug)]
pub(crate) enum StreamEvent {
/// A stream was instrumented: registers a stats record and an empty log buffer under `id`.
Created {
id: u32,
source: &'static str,
// Stored as the `label` of the resulting `StreamStats`.
display_label: Option<String>,
type_name: &'static str,
type_size: usize,
},
/// The stream identified by `id` yielded an item.
Yielded {
id: u32,
// Optional text stored with the log entry (presumably the item's `Debug` output
// when logging is enabled — confirm in `wrapper`).
log: Option<String>,
// Converted with `crate::channels::timestamp_nanos` when the log entry is built.
timestamp: Instant,
},
/// The stream identified by `id` finished; its state becomes `Closed`.
Completed {
id: u32,
},
}
/// Handles tying together the event producers, the worker thread and the readers.
pub(crate) struct StreamsState {
/// Sending half of the unbounded event channel drained by the worker thread.
pub(crate) event_tx: CbSender<StreamEvent>,
/// Shared stats/logs state; written by the worker, read by the query functions.
pub(crate) inner: Arc<RwLock<StreamsInternalState>>,
/// Sender used to ask the worker to drain pending events and exit.
/// NOTE(review): the `Mutex<Option<..>>` presumably lets shutdown code take it exactly once — confirm at the call site.
pub(crate) shutdown_tx: Mutex<Option<CbSender<()>>>,
/// Receives a single `()` from the worker after its loop has exited.
pub(crate) completion_rx: Mutex<Option<CbReceiver<()>>>,
}
/// Alias for [`StreamsState`]; both names denote the same type.
pub(crate) type StreamStatsState = StreamsState;
/// Process-wide streams state, set on the first call to `init_streams_state`.
pub(crate) static STREAMS_STATE: OnceLock<StreamStatsState> = OnceLock::new();
/// Applies one [`StreamEvent`] to the shared stream bookkeeping.
///
/// * `Created` registers a stats record and an empty log buffer for `id`;
///   the record's `iter` is the number of already-registered streams that
///   share the same `source`.
/// * `Yielded` bumps the stream's yielded-item counter and appends a log
///   entry, dropping the oldest entry first when the buffer has reached
///   `LOGS_LIMIT`.
/// * `Completed` marks the stream as `Closed`.
///
/// Events that refer to an unknown id leave the corresponding map untouched.
#[cfg_attr(feature = "hotpath-meta", hotpath_meta::measure(log = true))]
fn process_stream_event(state: &mut StreamsInternalState, event: StreamEvent) {
    match event {
        StreamEvent::Created {
            id,
            source,
            display_label,
            type_name,
            type_size,
        } => {
            // Count siblings before inserting so the first stream of a source gets 0.
            let iter = state
                .stats
                .values()
                .filter(|existing| existing.source == source)
                .count() as u32;
            let stats = StreamStats::new(id, source, display_label, type_name, type_size, iter);
            state.stats.insert(id, stats);
            state.logs.insert(id, StreamStatsLogs::new());
        }
        StreamEvent::Yielded { id, log, timestamp } => {
            // Updated counter value, or 0 when no stats record exists for this id.
            let items_yielded = match state.stats.get_mut(&id) {
                Some(stream_stats) => {
                    stream_stats.items_yielded += 1;
                    stream_stats.items_yielded
                }
                None => 0,
            };

            let Some(entry_logs) = state.logs.get_mut(&id) else {
                return;
            };

            // Make room first so the buffer never exceeds the limit after the push.
            let limit = *crate::channels::LOGS_LIMIT;
            if entry_logs.logs.len() >= limit {
                entry_logs.logs.pop_front();
            }
            let entry = DataFlowLogEntry::new(
                items_yielded,
                crate::channels::timestamp_nanos(timestamp),
                log,
                None,
            );
            entry_logs.logs.push_back(entry);
        }
        StreamEvent::Completed { id } => {
            if let Some(stream_stats) = state.stats.get_mut(&id) {
                stream_stats.state = ChannelState::Closed;
            }
        }
    }
}
#[cfg_attr(feature = "hotpath-meta", hotpath_meta::measure)]
pub(crate) fn init_streams_state() -> &'static StreamStatsState {
STREAMS_STATE.get_or_init(|| {
crate::lib_on::START_TIME.get_or_init(Instant::now);
let (event_tx, event_rx) = unbounded::<StreamEvent>();
#[cfg(feature = "hotpath-meta")]
let (event_tx, event_rx) =
hotpath_meta::channel!((event_tx, event_rx), label = "hp-st-events", log = true);
let (shutdown_tx, shutdown_rx) = bounded::<()>(1);
#[cfg(feature = "hotpath-meta")]
let (shutdown_tx, shutdown_rx) = hotpath_meta::channel!(
(shutdown_tx, shutdown_rx),
label = "hp-st-shutdown",
log = true
);
let (completion_tx, completion_rx) = bounded::<()>(1);
#[cfg(feature = "hotpath-meta")]
let (completion_tx, completion_rx) = hotpath_meta::channel!(
(completion_tx, completion_rx),
label = "hp-st-completion",
log = true
);
let inner = Arc::new(RwLock::new(StreamsInternalState {
stats: HashMap::new(),
logs: HashMap::new(),
}));
let inner_clone = Arc::clone(&inner);
std::thread::Builder::new()
.name("hp-streams".into())
.spawn(move || {
let mut local_buffer: Vec<StreamEvent> = Vec::with_capacity(WORKER_BATCH_SIZE);
let flush_interval = std::time::Duration::from_millis(WORKER_FLUSH_INTERVAL_MS);
loop {
select! {
recv(event_rx) -> result => {
match result {
Ok(event) => {
local_buffer.push(event);
if local_buffer.len() >= WORKER_BATCH_SIZE {
if let Ok(mut shared) = inner_clone.write() {
for e in local_buffer.drain(..) {
process_stream_event(&mut shared, e);
}
}
}
}
Err(_) => {
if !local_buffer.is_empty() {
if let Ok(mut shared) = inner_clone.write() {
for e in local_buffer.drain(..) {
process_stream_event(&mut shared, e);
}
}
}
break;
}
}
}
recv(shutdown_rx) -> _ => {
let mut drained_events = Vec::with_capacity(WORKER_BATCH_SIZE);
for _ in 0..WORKER_SHUTDOWN_DRAIN_LIMIT {
match event_rx.try_recv() {
Ok(event) => drained_events.push(event),
Err(_) => break,
}
}
if let Ok(mut shared) = inner_clone.write() {
for e in local_buffer.drain(..) {
process_stream_event(&mut shared, e);
}
for event in drained_events {
process_stream_event(&mut shared, event);
}
}
break;
}
default(flush_interval) => {
if !local_buffer.is_empty() {
if let Ok(mut shared) = inner_clone.write() {
for e in local_buffer.drain(..) {
process_stream_event(&mut shared, e);
}
}
}
}
}
}
let _ = completion_tx.send(());
})
.expect("Failed to spawn stream-stats-collector thread");
crate::metrics_server::start_metrics_server_once(*METRICS_SERVER_PORT);
StreamsState {
event_tx,
inner,
shutdown_tx: Mutex::new(Some(shutdown_tx)),
completion_rx: Mutex::new(Some(completion_rx)),
}
})
}
/// Extension trait that wraps a value in an instrumented stream adapter.
///
/// Hidden from the rendered docs; the `stream!` macro reaches it through
/// `$crate::InstrumentStream`.
#[doc(hidden)]
pub trait InstrumentStream {
/// Type of the wrapper returned by `instrument_stream`.
type Output;
/// Wraps `self`, tagging it with `source` and an optional custom `label`.
fn instrument_stream(self, source: &'static str, label: Option<String>) -> Self::Output;
}
/// Logging variant of the stream instrumentation extension trait.
///
/// Hidden from the rendered docs; the `stream!` macro uses it for the
/// `log = true` forms.
#[doc(hidden)]
pub trait InstrumentStreamLog {
/// Type of the wrapper returned by `instrument_stream_log`.
type Output;
/// Wraps `self`, tagging it with `source` and an optional custom `label`.
fn instrument_stream_log(self, source: &'static str, label: Option<String>) -> Self::Output;
}
/// Blanket implementation: every `futures_util::Stream` can be instrumented.
impl<S: futures_util::Stream> InstrumentStream for S {
    type Output = crate::streams::wrapper::InstrumentedStream<S>;

    /// Delegates to `InstrumentedStream::new`.
    fn instrument_stream(self, source: &'static str, label: Option<String>) -> Self::Output {
        let instrumented = crate::streams::wrapper::InstrumentedStream::new(self, source, label);
        instrumented
    }
}
/// Blanket implementation for every `futures_util::Stream` whose items
/// implement `Debug`.
impl<S: futures_util::Stream> InstrumentStreamLog for S
where
    S::Item: std::fmt::Debug,
{
    type Output = crate::streams::wrapper::InstrumentedStreamLog<S>;

    /// Delegates to `InstrumentedStreamLog::new`.
    fn instrument_stream_log(self, source: &'static str, label: Option<String>) -> Self::Output {
        let instrumented = crate::streams::wrapper::InstrumentedStreamLog::new(self, source, label);
        instrumented
    }
}
/// Instruments a stream expression, tagging it with the call site.
///
/// The source tag passed to the instrumentation traits is `file:line` of the
/// macro invocation. Supported forms:
///
/// * `stream!(expr)`
/// * `stream!(expr, label = value)` — `value.to_string()` becomes the custom label
/// * `stream!(expr, log = true)` — uses `InstrumentStreamLog`, whose blanket
///   implementation requires the stream's items to implement `Debug`
/// * `stream!(expr, label = value, log = true)` and
///   `stream!(expr, log = true, label = value)` — both of the above
#[macro_export]
macro_rules! stream {
    ($expr:expr) => {{
        const STREAM_ID: &str = concat!(file!(), ":", line!());
        $crate::InstrumentStream::instrument_stream($expr, STREAM_ID, None)
    }};
    ($expr:expr, label = $label:expr) => {{
        const STREAM_ID: &str = concat!(file!(), ":", line!());
        $crate::InstrumentStream::instrument_stream($expr, STREAM_ID, Some($label.to_string()))
    }};
    ($expr:expr, log = true) => {{
        const STREAM_ID: &str = concat!(file!(), ":", line!());
        $crate::InstrumentStreamLog::instrument_stream_log($expr, STREAM_ID, None)
    }};
    ($expr:expr, label = $label:expr, log = true) => {{
        const STREAM_ID: &str = concat!(file!(), ":", line!());
        $crate::InstrumentStreamLog::instrument_stream_log(
            $expr,
            STREAM_ID,
            Some($label.to_string()),
        )
    }};
    // Same as the previous arm with the options swapped; forward to it.
    ($expr:expr, log = true, label = $label:expr) => {
        $crate::stream!($expr, label = $label, log = true)
    };
}
/// Ordering used when listing streams.
///
/// * Streams with a custom label sort before streams without one.
/// * Two labelled streams are ordered by label, then by `iter`.
/// * Two unlabelled streams are ordered by `source`, then by `iter`.
#[cfg_attr(feature = "hotpath-meta", hotpath_meta::measure(log = true))]
pub(crate) fn compare_stream_stats(a: &StreamStats, b: &StreamStats) -> std::cmp::Ordering {
    // Match on the labels directly instead of `is_some()` + `unwrap()`, so no
    // panicking call is needed to reach the label text.
    match (a.label.as_deref(), b.label.as_deref()) {
        (Some(_), None) => std::cmp::Ordering::Less,
        (None, Some(_)) => std::cmp::Ordering::Greater,
        (Some(left), Some(right)) => left.cmp(right).then_with(|| a.iter.cmp(&b.iter)),
        (None, None) => a.source.cmp(b.source).then_with(|| a.iter.cmp(&b.iter)),
    }
}
/// Returns a snapshot of all stream statistics, ordered by `compare_stream_stats`.
///
/// Returns an empty vector when the streams state has not been initialised.
#[cfg_attr(feature = "hotpath-meta", hotpath_meta::measure(log = true))]
pub(crate) fn get_sorted_stream_stats() -> Vec<StreamStats> {
    let Some(state) = STREAMS_STATE.get() else {
        return Vec::new();
    };

    // A poisoned lock only means some writer panicked while holding it. The
    // worker thread already tolerates that case, so this read-only query
    // recovers the guard and serves the last recorded data instead of panicking.
    let guard = state
        .inner
        .read()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    let mut stats: Vec<StreamStats> = guard.stats.values().cloned().collect();
    // Sorting works on the cloned snapshot; release the lock first so the
    // worker is not blocked while we sort.
    drop(guard);

    stats.sort_by(compare_stream_stats);
    stats
}
/// Returns the recorded log entries of one stream, newest (highest index) first.
///
/// `stream_id` is the decimal stream id. Returns `None` when it does not parse
/// as a `u32`, when the streams state has not been initialised, or when no
/// logs are registered for that id. The returned `id` echoes `stream_id` as
/// it was passed in.
#[cfg_attr(feature = "hotpath-meta", hotpath_meta::measure(log = true))]
pub(crate) fn get_stream_logs(stream_id: &str) -> Option<StreamLogs> {
    let id = stream_id.parse::<u32>().ok()?;
    let state = STREAMS_STATE.get()?;

    // A poisoned lock only means some writer panicked while holding it. The
    // worker thread already tolerates that case, so this read-only query
    // recovers the guard and serves the last recorded data instead of panicking.
    let guard = state
        .inner
        .read()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    let entry_logs = guard.logs.get(&id)?;
    let mut yielded_logs: Vec<DataFlowLogEntry> = entry_logs.logs.iter().cloned().collect();
    // The snapshot is owned from here on; release the lock before sorting.
    drop(guard);

    yielded_logs.sort_by(|a, b| b.index.cmp(&a.index));
    Some(StreamLogs {
        id: stream_id.to_string(),
        logs: yielded_logs,
    })
}