use std::collections::BTreeMap;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering::Relaxed;
use delegate::delegate;
use serde::{Deserialize, Serialize};
use smallbytes::SmallBytes;
use crate::{ChannelBuilder, Encode, PartialMetadata, Schema, SinkId, metadata::ToUnixNanos};
mod channel_descriptor;
mod lazy_channel;
mod raw_channel;
pub use channel_descriptor::ChannelDescriptor;
pub use lazy_channel::{LazyChannel, LazyRawChannel};
pub use raw_channel::RawChannel;
/// Size parameter, in bytes, of the `SmallBytes` buffer that messages are encoded into
/// before being handed to the raw channel: 256 KiB.
const STACK_BUFFER_SIZE: usize = 256 << 10;
/// 2^53: exclusive upper bound enforced on IDs allocated by `ChannelId::next`.
///
/// Every integer below this bound is exactly representable as an IEEE 754 double.
const MAX_SAFE_DOUBLE_INTEGER_VALUE: u64 = 1_u64 << 53;
/// Identifier of a channel: a newtype wrapper around a `u64`.
///
/// A value is either supplied by the caller (`ChannelId::new`, `From<u64>`) or allocated
/// from a process-wide counter by the crate-internal `ChannelId::next`.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, Deserialize, Serialize)]
pub struct ChannelId(u64);
impl ChannelId {
pub fn new(id: u64) -> Self {
Self(id)
}
pub(crate) fn next() -> Self {
static NEXT_ID: AtomicU64 = AtomicU64::new(1);
let id = NEXT_ID.fetch_add(1, Relaxed);
assert!(
id < MAX_SAFE_DOUBLE_INTEGER_VALUE,
"ChannelId overflow, you win the prize!"
);
Self(id)
}
}
impl From<ChannelId> for u64 {
fn from(id: ChannelId) -> u64 {
id.0
}
}
impl From<u64> for ChannelId {
fn from(value: u64) -> Self {
ChannelId::new(value)
}
}
impl std::fmt::Display for ChannelId {
    /// Formats the ID as the decimal representation of its `u64` value.
    ///
    /// Delegates to `u64`'s own `Display` implementation with the caller's formatter,
    /// so width, fill, alignment and zero-padding flags (e.g. `{:>8}` or `{:04}`) are
    /// honored. Going through `write!(f, "{}", ..)` instead would start from fresh
    /// format arguments and silently drop those flags.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Display::fmt(&self.0, f)
    }
}
/// A typed handle for logging messages of type `T` on a channel.
///
/// Wraps a shared [`RawChannel`]. `T` determines how a message is turned into bytes
/// (through its [`Encode`] implementation) before the bytes are passed to the raw channel.
#[derive(Debug)]
pub struct Channel<T: Encode> {
// The underlying untyped channel that every method of this type forwards to.
inner: Arc<RawChannel>,
// Zero-sized marker that ties the handle to `T`; no value of type `T` is stored.
_phantom: std::marker::PhantomData<T>,
}
impl<T: Encode> Channel<T> {
    /// Creates a channel for `topic` from a [`ChannelBuilder`] with no further configuration.
    ///
    /// Shorthand for `ChannelBuilder::new(topic).build()`.
    pub fn new(topic: impl Into<String>) -> Self {
        let builder = ChannelBuilder::new(topic);
        builder.build()
    }

    /// Wraps an existing raw channel in a typed handle.
    #[doc(hidden)]
    pub fn from_raw_channel(raw_channel: Arc<RawChannel>) -> Self {
        let inner = raw_channel;
        let _phantom = std::marker::PhantomData;
        Self { inner, _phantom }
    }

    /// Consumes the typed handle and returns the raw channel it wraps.
    #[doc(hidden)]
    pub fn into_inner(self) -> Arc<RawChannel> {
        let Self { inner, .. } = self;
        inner
    }

    // Each accessor below forwards unchanged to the method of the same name on the
    // wrapped `RawChannel`.
    delegate! { to self.inner {
        /// The channel's ID, as reported by the underlying [`RawChannel`].
        pub fn id(&self) -> ChannelId;
        /// The channel's topic, as reported by the underlying [`RawChannel`].
        pub fn topic(&self) -> &str;
        /// The channel's schema, if the underlying [`RawChannel`] has one.
        pub fn schema(&self) -> Option<&Schema>;
        /// The channel's message encoding, as reported by the underlying [`RawChannel`].
        pub fn message_encoding(&self) -> &str;
        /// The channel's metadata map, as reported by the underlying [`RawChannel`].
        pub fn metadata(&self) -> &BTreeMap<String, String>;
        /// Forwards to the underlying [`RawChannel`]'s `has_sinks`.
        pub fn has_sinks(&self) -> bool;
        /// Forwards to the underlying [`RawChannel`]'s `close`.
        pub fn close(&self);
    } }

    /// Logs `msg` with default metadata and no sink restriction.
    ///
    /// # Panics
    ///
    /// Panics if the channel has sinks and encoding `msg` fails.
    pub fn log(&self, msg: &T) {
        let metadata = PartialMetadata::default();
        self.log_with_meta(msg, metadata);
    }

    /// Logs `msg` with default metadata, passing `sink_id` through to the raw channel.
    ///
    /// # Panics
    ///
    /// Panics if the channel has sinks and encoding `msg` fails.
    pub fn log_to_sink(&self, msg: &T, sink_id: Option<SinkId>) {
        let metadata = PartialMetadata::default();
        self.log_with_meta_to_sink(msg, metadata, sink_id);
    }

    /// Logs `msg` with the given `metadata` and no sink restriction.
    ///
    /// # Panics
    ///
    /// Panics if the channel has sinks and encoding `msg` fails.
    pub fn log_with_meta(&self, msg: &T, metadata: PartialMetadata) {
        let no_sink_filter: Option<SinkId> = None;
        self.log_with_meta_to_sink(msg, metadata, no_sink_filter);
    }

    /// Logs `msg` with the given `metadata`, passing `sink_id` through to the raw channel.
    ///
    /// The message is only encoded when the channel reports that it has sinks. Otherwise
    /// nothing is encoded and the raw channel's `log_warn_if_closed` is invoked instead.
    ///
    /// # Panics
    ///
    /// Panics if the channel has sinks and encoding `msg` fails.
    pub fn log_with_meta_to_sink(
        &self,
        msg: &T,
        metadata: PartialMetadata,
        sink_id: Option<SinkId>,
    ) {
        if !self.has_sinks() {
            // Nobody is listening: skip the encoding work entirely.
            self.inner.log_warn_if_closed();
            return;
        }
        self.log_to_sinks(msg, metadata, sink_id);
    }

    /// Logs `msg` with metadata built by `PartialMetadata::with_log_time(timestamp)`.
    ///
    /// # Panics
    ///
    /// Panics if the channel has sinks and encoding `msg` fails.
    pub fn log_with_time(&self, msg: &T, timestamp: impl ToUnixNanos) {
        let metadata = PartialMetadata::with_log_time(timestamp);
        self.log_with_meta(msg, metadata);
    }

    /// Encodes `msg` into a `SmallBytes` buffer and passes the bytes to the raw channel.
    ///
    /// # Panics
    ///
    /// Panics if `msg.encode` returns an error.
    fn log_to_sinks(&self, msg: &T, metadata: PartialMetadata, sink_id: Option<SinkId>) {
        let mut encoded = SmallBytes::<STACK_BUFFER_SIZE>::new();
        // Reserve up front when the message can report its encoded size.
        if let Some(size_hint) = msg.encoded_len() {
            encoded.reserve(size_hint);
        }
        msg.encode(&mut encoded).unwrap();
        self.inner.log_to_sinks(&encoded, metadata, sink_id);
    }
}
// Unit tests for channel construction, logging, closing, and per-sink routing.
// They exercise `RawChannel` handles built through `ChannelBuilder::build_raw`.
// `logs_contain` is brought into each test function by `#[traced_test]`.
#[cfg(test)]
mod test {
use crate::channel_builder::ChannelBuilder;
use crate::log_sink_set::ERROR_LOGGING_MESSAGE;
use crate::testutil::RecordingSink;
use crate::{Context, FoxgloveError, RawChannel, Schema, Sink};
use std::sync::Arc;
use tracing_test::traced_test;
// Test fixture: builds a raw channel on "/topic" attached to `ctx`, with a fixed message
// encoding, a small schema, and a single metadata entry.
fn new_test_channel(ctx: &Arc<Context>) -> Result<Arc<RawChannel>, FoxgloveError> {
ChannelBuilder::new("/topic")
.context(ctx)
.message_encoding("message_encoding")
.schema(Schema::new(
"name",
"encoding",
br#"{
"type": "object",
"properties": {
"msg": {"type": "string"},
"count": {"type": "number"},
},
}"#,
))
.metadata(maplit::btreemap! {"key".to_string() => "value".to_string()})
.build_raw()
}
// A channel built with explicit topic, encoding, schema and metadata exposes exactly
// those values through its accessors and can be looked up in its context by topic.
#[test]
fn test_channel_new() {
let ctx = Context::new();
let topic = "topic";
let message_encoding = "message_encoding";
let schema = Schema::new("schema_name", "schema_encoding", &[1, 2, 3]);
let metadata = maplit::btreemap! {"key".to_string() => "value".to_string()};
let channel = ChannelBuilder::new(topic)
.message_encoding(message_encoding)
.schema(schema.clone())
.metadata(metadata.clone())
.context(&ctx)
.build_raw()
.expect("Failed to create channel");
// The assigned ID must be non-zero.
assert!(u64::from(channel.id()) > 0);
assert_eq!(channel.topic(), topic);
assert_eq!(channel.message_encoding(), message_encoding);
assert_eq!(channel.schema(), Some(&schema));
assert_eq!(channel.metadata(), &metadata);
assert_eq!(ctx.get_channel_by_topic(topic), Some(channel));
}
// Logging on a channel whose context has no sinks added must not emit the sink error log.
#[traced_test]
#[test]
fn test_channel_log_msg() {
let ctx = Context::new();
let channel = new_test_channel(&ctx).unwrap();
let msg = vec![1, 2, 3];
channel.log(&msg);
assert!(!logs_contain(ERROR_LOGGING_MESSAGE));
}
// A logged message reaches a sink added to the context, carrying the channel ID, the
// exact payload, and a log time.
#[traced_test]
#[test]
fn test_log_msg_success() {
let ctx = Context::new();
let recording_sink = Arc::new(RecordingSink::new());
assert!(ctx.add_sink(recording_sink.clone()));
let channel = new_test_channel(&ctx).unwrap();
let msg = b"test_message";
channel.log(msg);
assert!(!logs_contain(ERROR_LOGGING_MESSAGE));
let messages = recording_sink.take_messages();
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].channel_id, channel.id());
assert_eq!(messages[0].msg, msg.to_vec());
// Sanity check that a real log time was filled in: it must exceed a hard-coded lower
// bound (presumably a Unix timestamp in nanoseconds from late November 2024).
assert!(messages[0].metadata.log_time > 1732847588055322395);
}
// A zero-length payload is still delivered to the sink rather than being dropped.
#[traced_test]
#[test]
fn test_log_empty_message_reaches_sink() {
let ctx = Context::new();
let recording_sink = Arc::new(RecordingSink::new());
assert!(ctx.add_sink(recording_sink.clone()));
let channel = new_test_channel(&ctx).unwrap();
channel.log(b"");
assert!(!logs_contain(ERROR_LOGGING_MESSAGE));
let messages = recording_sink.take_messages();
assert_eq!(messages.len(), 1, "empty message should still be delivered");
assert_eq!(messages[0].channel_id, channel.id());
assert!(
messages[0].msg.is_empty(),
"sink should receive the zero-length payload",
);
}
// The closed-channel warning must appear only for a log call made after `close()`.
#[traced_test]
#[test]
fn test_channel_close() {
let ctx = Context::new();
let ch = new_test_channel(&ctx).unwrap();
// Before closing: logging must not warn.
ch.log(b"");
assert!(!logs_contain("Cannot log on closed channel for /topic"));
ch.close();
// After closing: the same call must warn.
ch.log(b"");
assert!(logs_contain("Cannot log on closed channel for /topic"));
}
// Dropping the context has the same observable effect as closing the channel
// explicitly: the next log call emits the closed-channel warning.
#[traced_test]
#[test]
fn test_channel_closed_by_context() {
let ctx = Context::new();
let ch = new_test_channel(&ctx).unwrap();
ch.log(b"");
assert!(!logs_contain("Cannot log on closed channel for /topic"));
drop(ctx);
ch.log(b"");
assert!(logs_contain("Cannot log on closed channel for /topic"));
}
// `log` delivers to every sink, `log_to_sink(.., Some(id))` delivers only to the sink
// with that ID, and an ID that matches no sink delivers to nobody without an error log.
#[traced_test]
#[test]
fn test_log_to_specific_sink() {
let ctx = Context::new();
let sink1 = Arc::new(RecordingSink::new());
let sink2 = Arc::new(RecordingSink::new());
let sink3 = Arc::new(RecordingSink::new());
assert!(ctx.add_sink(sink1.clone()));
assert!(ctx.add_sink(sink2.clone()));
assert!(ctx.add_sink(sink3.clone()));
let channel = ChannelBuilder::new("/test_topic")
.context(&ctx)
.message_encoding("raw")
.build_raw()
.expect("Failed to create channel");
// One broadcast message, then one targeted message each for sink2 and sink3.
let msg_all = b"message for all sinks";
channel.log(msg_all);
let msg_sink2_only = b"message for sink2 only";
channel.log_to_sink(msg_sink2_only, Some(sink2.id()));
let msg_sink3_only = b"message for sink3 only";
channel.log_to_sink(msg_sink3_only, Some(sink3.id()));
// Drain the sinks; the later zero-length checks rely on them being empty from here on.
let sink1_messages = sink1.take_messages();
let sink2_messages = sink2.take_messages();
let sink3_messages = sink3.take_messages();
// sink1 was never targeted, so it only saw the broadcast.
assert_eq!(sink1_messages.len(), 1);
assert_eq!(sink1_messages[0].msg, msg_all.to_vec());
// sink2 and sink3 each saw the broadcast followed by their own targeted message.
assert_eq!(sink2_messages.len(), 2);
assert_eq!(sink2_messages[0].msg, msg_all.to_vec());
assert_eq!(sink2_messages[1].msg, msg_sink2_only.to_vec());
assert_eq!(sink3_messages.len(), 2);
assert_eq!(sink3_messages[0].msg, msg_all.to_vec());
assert_eq!(sink3_messages[1].msg, msg_sink3_only.to_vec());
// A freshly allocated sink ID was never added to the context, so nothing may arrive.
let non_existent_id = crate::SinkId::next();
channel.log_to_sink(b"message to nowhere", Some(non_existent_id));
assert_eq!(sink1.take_messages().len(), 0);
assert_eq!(sink2.take_messages().len(), 0);
assert_eq!(sink3.take_messages().len(), 0);
assert!(!logs_contain(ERROR_LOGGING_MESSAGE));
}
}