use std::collections::BTreeMap;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::{Acquire, Release};
use std::sync::{Arc, Weak};
use std::time::Duration;
use parking_lot::Mutex;
use tracing::warn;
use super::{ChannelDescriptor, ChannelId};
use crate::log_sink_set::LogSinkSet;
use crate::sink::SmallSinkVec;
use crate::throttler::Throttler;
use crate::{Context, Metadata, PartialMetadata, Schema, SinkId, nanoseconds_since_epoch};
/// Interval handed to the [`Throttler`] that gates the "closed channel" warning emitted by
/// [`RawChannel::log_warn_if_closed`].
///
/// NOTE(review): presumably the minimum spacing between two emitted warnings; confirm against
/// `Throttler::new`.
static WARN_THROTTLER_INTERVAL: Duration = Duration::from_secs(10);
/// A channel for logging pre-encoded (`&[u8]`) messages to the sinks attached to it.
///
/// Instances are created through `RawChannel::new`, which returns the channel wrapped in an
/// `Arc`.
pub struct RawChannel {
/// Identity and static properties of the channel: id, topic, message encoding, schema and
/// metadata.
descriptor: ChannelDescriptor,
/// Weak handle to the context the channel was created with; upgraded in `close` to ask the
/// context to remove this channel.
context: Weak<Context>,
/// The set of sinks that logged messages are dispatched to.
sinks: LogSinkSet,
/// Set to `true` (with `Release` ordering) by `remove_from_context`; read with `Acquire`
/// ordering by `is_closed`.
closed: AtomicBool,
/// Throttler consulted before emitting the "closed channel" warning.
warn_throttler: Mutex<Throttler>,
}
impl RawChannel {
    /// Builds a new channel associated with `context` and returns it behind an [`Arc`].
    ///
    /// A fresh id is taken from `ChannelId::next()`. The channel starts out open, with an
    /// empty sink set, and holds only a weak reference to `context`.
    pub(crate) fn new(
        context: &Arc<Context>,
        topic: String,
        message_encoding: String,
        schema: Option<Schema>,
        metadata: BTreeMap<String, String>,
    ) -> Arc<Self> {
        let descriptor = ChannelDescriptor::new(
            ChannelId::next(),
            topic,
            message_encoding,
            metadata,
            schema,
        );
        let channel = Self {
            descriptor,
            context: Arc::downgrade(context),
            sinks: LogSinkSet::new(),
            closed: AtomicBool::new(false),
            warn_throttler: Mutex::new(Throttler::new(WARN_THROTTLER_INTERVAL)),
        };
        Arc::new(channel)
    }

    /// Borrows the channel's descriptor.
    pub(crate) fn descriptor(&self) -> &ChannelDescriptor {
        &self.descriptor
    }

    /// Returns the channel id stored in the descriptor.
    pub fn id(&self) -> ChannelId {
        self.descriptor.id()
    }

    /// Returns the topic stored in the descriptor.
    pub fn topic(&self) -> &str {
        self.descriptor.topic()
    }

    /// Returns the schema stored in the descriptor, if there is one.
    pub fn schema(&self) -> Option<&Schema> {
        self.descriptor.schema()
    }

    /// Returns the message encoding stored in the descriptor.
    pub fn message_encoding(&self) -> &str {
        self.descriptor.message_encoding()
    }

    /// Returns the metadata map stored in the descriptor.
    pub fn metadata(&self) -> &BTreeMap<String, String> {
        self.descriptor.metadata()
    }

    /// Reports whether this channel's descriptor matches `other`'s descriptor, as decided by
    /// `ChannelDescriptor::matches`.
    pub(crate) fn matches(&self, other: &Self) -> bool {
        self.descriptor.matches(&other.descriptor)
    }

    /// Closes the channel by asking its context to remove it.
    ///
    /// Does nothing when the channel is already marked closed, or when the context has
    /// already been dropped (the weak reference can no longer be upgraded).
    pub fn close(&self) {
        if self.is_closed() {
            return;
        }
        if let Some(ctx) = self.context.upgrade() {
            ctx.remove_channel(self.descriptor.id());
        }
    }

    /// Marks the channel as closed and then empties its sink set.
    pub(crate) fn remove_from_context(&self) {
        // The flag is published first, then the sinks are dropped from the set.
        self.closed.store(true, Release);
        self.sinks.clear();
    }

    /// Reads the closed flag (`Acquire` ordering, pairing with the `Release` store in
    /// `remove_from_context`).
    fn is_closed(&self) -> bool {
        self.closed.load(Acquire)
    }

    /// Emits a warning naming the topic when the channel is closed, provided the warning
    /// throttler grants it.
    ///
    /// The throttler is not consulted at all while the channel is still open.
    pub(crate) fn log_warn_if_closed(&self) {
        if !self.is_closed() {
            return;
        }
        // The lock guard is a temporary of this statement, so it is released before `warn!`.
        let allowed = self.warn_throttler.lock().try_acquire();
        if allowed {
            warn!("Cannot log on closed channel for {}", self.topic());
        }
    }

    /// Replaces the channel's sink set with `sinks`.
    pub(crate) fn update_sinks(&self, sinks: SmallSinkVec) {
        self.sinks.store(sinks);
    }

    /// Returns `true` when the sink set is not empty.
    pub fn has_sinks(&self) -> bool {
        let no_sinks = self.sinks.is_empty();
        !no_sinks
    }

    /// Returns the number of sinks in the sink set (test builds with the `websocket` feature
    /// only).
    #[cfg(all(test, feature = "websocket"))]
    pub(crate) fn num_sinks(&self) -> usize {
        self.sinks.len()
    }

    /// Logs `msg` to every sink, using default metadata.
    pub fn log(&self, msg: &[u8]) {
        self.log_with_meta_to_sink(msg, PartialMetadata::default(), None);
    }

    /// Logs `msg` with default metadata; when `sink_id` is `Some`, only sinks with that id
    /// receive it.
    pub fn log_to_sink(&self, msg: &[u8], sink_id: Option<SinkId>) {
        let opts = PartialMetadata::default();
        self.log_with_meta_to_sink(msg, opts, sink_id);
    }

    /// Logs `msg` to every sink, using the caller-supplied partial metadata `opts`.
    pub fn log_with_meta(&self, msg: &[u8], opts: PartialMetadata) {
        self.log_with_meta_to_sink(msg, opts, None);
    }

    /// Logs `msg` with the caller-supplied partial metadata `opts`; when `sink_id` is `Some`,
    /// only sinks with that id receive it.
    ///
    /// When the sink set is empty the message is not dispatched; instead a (throttled) warning
    /// may be emitted if the channel has been closed.
    pub fn log_with_meta_to_sink(
        &self,
        msg: &[u8],
        opts: PartialMetadata,
        sink_id: Option<SinkId>,
    ) {
        if !self.has_sinks() {
            self.log_warn_if_closed();
            return;
        }
        self.log_to_sinks(msg, opts, sink_id);
    }

    /// Dispatches `msg` to the sinks without first checking whether any are attached.
    ///
    /// `opts.log_time` is used as the log time when present; otherwise
    /// `nanoseconds_since_epoch()` is called to obtain one. With `sink_id == Some(id)` only
    /// sinks whose `id()` equals `id` are invoked; with `None` every sink is invoked.
    pub(crate) fn log_to_sinks(&self, msg: &[u8], opts: PartialMetadata, sink_id: Option<SinkId>) {
        let log_time = opts.log_time.unwrap_or_else(nanoseconds_since_epoch);
        let metadata = Metadata { log_time };

        if let Some(id) = sink_id {
            self.sinks.for_each_filtered(
                |sink| sink.id() == id,
                |sink| sink.log(self, msg, &metadata),
            );
        } else {
            self.sinks.for_each(|sink| sink.log(self, msg, &metadata));
        }
    }
}
/// Test-only equality: two channels compare equal exactly when [`RawChannel::matches`] says
/// their descriptors match.
#[cfg(test)]
impl PartialEq for RawChannel {
    fn eq(&self, other: &Self) -> bool {
        RawChannel::matches(self, other)
    }
}
// Test-only marker impl so that `RawChannel` can be used where `Eq` is required; the actual
// comparison is the one provided by the `PartialEq` impl.
#[cfg(test)]
impl Eq for RawChannel {}
/// Debug output lists the descriptor's properties under the struct name `Channel`; the
/// remaining fields are omitted and the output is marked as non-exhaustive.
impl std::fmt::Debug for RawChannel {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let desc = &self.descriptor;
        let mut builder = f.debug_struct("Channel");
        builder.field("id", &desc.id());
        builder.field("topic", &desc.topic());
        builder.field("message_encoding", &desc.message_encoding());
        builder.field("schema", &desc.schema());
        builder.field("metadata", &desc.metadata());
        builder.finish_non_exhaustive()
    }
}