#![cfg_attr(docsrs, feature(doc_cfg))]
use std::{fmt::Debug, sync::Mutex};
use log::{LevelFilter, Log, Metadata, Record, SetLoggerError};
use redis::{Connection, ConnectionLike};
#[cfg_attr(docsrs, doc(cfg(feature = "default_encoders")))]
#[cfg(feature = "default_encoders")]
mod defaults;
#[cfg(feature = "default_encoders")]
pub use defaults::*;
#[cfg(test)]
mod lib_tests;
pub trait PubSubEncoder: Send + Sync + Sized {
fn encode(&self, record: &Record) -> Vec<u8>;
}
pub trait StreamEncoder: Send + Sync + Sized {
fn encode(&self, record: &Record) -> Vec<(String, Vec<u8>)>;
}
#[derive(Debug)]
#[doc(hidden)]
#[non_exhaustive]
pub struct DummyPubSubEncoder {}
#[doc(hidden)]
impl PubSubEncoder for DummyPubSubEncoder {
fn encode(&self, _record: &Record) -> Vec<u8> {
panic!()
}
}
#[derive(Debug)]
#[doc(hidden)]
#[non_exhaustive]
pub struct DummyStreamEncoder {}
#[doc(hidden)]
impl StreamEncoder for DummyStreamEncoder {
fn encode(&self, _record: &Record) -> Vec<(String, Vec<u8>)> {
panic!()
}
}
#[derive(Debug)]
pub struct RedisLogger<PUBSUB, STREAM>
where
PUBSUB: PubSubEncoder,
STREAM: StreamEncoder,
{
level: LevelFilter,
config: RedisLoggerConfig<PUBSUB, STREAM>,
}
impl<PUBSUB, STREAM> RedisLogger<PUBSUB, STREAM>
where
PUBSUB: PubSubEncoder + 'static,
STREAM: StreamEncoder + 'static,
{
pub fn new(level: LevelFilter, config: RedisLoggerConfig<PUBSUB, STREAM>) -> Box<Self> {
Box::new(Self { level, config })
}
pub fn init(level: LevelFilter, config: RedisLoggerConfig<PUBSUB, STREAM>) -> Result<(), SetLoggerError> {
let redis_logger = Self::new(level, config);
log::set_max_level(level);
log::set_boxed_logger(redis_logger)?;
Ok(())
}
}
impl<PUBSUB, STREAM> Log for RedisLogger<PUBSUB, STREAM>
where
PUBSUB: PubSubEncoder,
STREAM: StreamEncoder,
{
fn enabled(&self, metadata: &Metadata) -> bool {
metadata.level() <= self.level
}
fn log(&self, record: &Record) {
if self.enabled(record.metadata()) {
let config = &self.config;
let mut pipe = redis::pipe();
if let Some((channels, encoder)) = &config.channels {
let message = encoder.encode(record);
for channel in channels {
pipe.publish(channel, &message);
}
}
if let Some((streams, encoder)) = &config.streams {
let message = encoder.encode(record);
let message = message.as_slice();
for stream in streams {
pipe.xadd(stream, "*", message);
}
}
if let Err(e) = pipe.query::<()>(&mut config.connection.lock().unwrap()) {
eprintln!("Error logging to Redis: {e}");
}
}
}
fn flush(&self) {}
}
struct DebuggableConnection {
connection: redis::Connection,
}
impl ConnectionLike for DebuggableConnection {
fn req_packed_command(&mut self, cmd: &[u8]) -> redis::RedisResult<redis::Value> {
self.connection.req_packed_command(cmd)
}
fn req_packed_commands(&mut self, cmd: &[u8], offset: usize, count: usize) -> redis::RedisResult<Vec<redis::Value>> {
self.connection.req_packed_commands(cmd, offset, count)
}
fn get_db(&self) -> i64 {
self.connection.get_db()
}
fn check_connection(&mut self) -> bool {
self.connection.check_connection()
}
fn is_open(&self) -> bool {
self.connection.is_open()
}
}
impl Debug for DebuggableConnection {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "DebuggableConnection")
}
}
#[derive(Debug)]
pub struct RedisLoggerConfig<PUBSUB, STREAM>
where
PUBSUB: PubSubEncoder,
STREAM: StreamEncoder,
{
connection: Mutex<DebuggableConnection>,
channels: Option<(Vec<String>, PUBSUB)>,
streams: Option<(Vec<String>, STREAM)>,
}
impl<PUBSUB, STREAM> RedisLoggerConfig<PUBSUB, STREAM>
where
PUBSUB: PubSubEncoder,
STREAM: StreamEncoder,
{
pub fn new(connection: Connection, channels: Option<(Vec<String>, PUBSUB)>, streams: Option<(Vec<String>, STREAM)>) -> Self {
Self {
connection: Mutex::new(DebuggableConnection { connection }),
channels,
streams,
}
}
}
#[derive(Debug)]
pub struct RedisLoggerConfigTemp<PUBSUB, STREAM>
where
PUBSUB: PubSubEncoder,
STREAM: StreamEncoder,
{
connection_str: String,
channels: Option<(Vec<String>, PUBSUB)>,
streams: Option<(Vec<String>, STREAM)>,
}
impl<PUBSUB, STREAM> RedisLoggerConfigTemp<PUBSUB, STREAM>
where
PUBSUB: PubSubEncoder,
STREAM: StreamEncoder,
{
pub fn new(connection_str: String, channels: Option<(Vec<String>, PUBSUB)>, streams: Option<(Vec<String>, STREAM)>) -> Self {
Self {
connection_str,
channels,
streams,
}
}
pub fn build(self) -> RedisLoggerConfig<PUBSUB, STREAM> {
let client = redis::Client::open(self.connection_str).unwrap();
let connection = client.get_connection().unwrap();
RedisLoggerConfig::new(connection, self.channels, self.streams)
}
}
#[derive(Debug)]
#[non_exhaustive]
pub struct RedisLoggerConfigBuilder {}
impl RedisLoggerConfigBuilder {
pub fn with_pubsub<PUBSUB>(
connection_str: String,
channels: Vec<String>,
encoder: PUBSUB,
) -> RedisLoggerConfigTemp<PUBSUB, DummyStreamEncoder>
where
PUBSUB: PubSubEncoder,
{
Self::check_args(!channels.is_empty());
RedisLoggerConfigTemp::new(connection_str, Some((channels, encoder)), None)
}
#[cfg(feature = "default_encoders")]
pub fn with_pubsub_default(
connection_str: String,
channels: Vec<String>,
) -> RedisLoggerConfigTemp<DefaultPubSubEncoder, DummyStreamEncoder> {
Self::check_args(!channels.is_empty());
RedisLoggerConfigTemp::new(connection_str, Some((channels, DefaultPubSubEncoder::new())), None)
}
pub fn with_streams<STREAM>(
connection_str: String,
streams: Vec<String>,
encoder: STREAM,
) -> RedisLoggerConfigTemp<DummyPubSubEncoder, STREAM>
where
STREAM: StreamEncoder,
{
Self::check_args(!streams.is_empty());
RedisLoggerConfigTemp::new(connection_str, None, Some((streams, encoder)))
}
#[cfg(feature = "default_encoders")]
pub fn with_streams_default(
connection_str: String,
streams: Vec<String>,
) -> RedisLoggerConfigTemp<DummyPubSubEncoder, DefaultStreamEncoder> {
Self::check_args(!streams.is_empty());
RedisLoggerConfigTemp::new(connection_str, None, Some((streams, DefaultStreamEncoder::new())))
}
pub fn with_pubsub_and_streams<PUBSUB, STREAM>(
connection_str: String,
channels: Vec<String>,
pubsub_encoder: PUBSUB,
streams: Vec<String>,
stream_encoder: STREAM,
) -> RedisLoggerConfigTemp<PUBSUB, STREAM>
where
PUBSUB: PubSubEncoder,
STREAM: StreamEncoder,
{
Self::check_args(!channels.is_empty() && !streams.is_empty());
RedisLoggerConfigTemp::new(
connection_str,
Some((channels, pubsub_encoder)),
Some((streams, stream_encoder)),
)
}
#[cfg(feature = "default_encoders")]
pub fn with_pubsub_and_streams_default(
connection_str: String,
channels: Vec<String>,
streams: Vec<String>,
) -> RedisLoggerConfigTemp<DefaultPubSubEncoder, DefaultStreamEncoder> {
Self::check_args(!channels.is_empty() && !streams.is_empty());
RedisLoggerConfigTemp::new(
connection_str,
Some((channels, DefaultPubSubEncoder::new())),
Some((streams, DefaultStreamEncoder::new())),
)
}
const fn check_args(value: bool) {
assert!(
value,
"Channels not set in RedisLogger. Set at least one pub/sub channel and/or one stream channel."
);
}
}
#[cfg_attr(docsrs, doc(cfg(feature = "shared_logger")))]
#[cfg(feature = "shared_logger")]
impl<PUBSUB, STREAM> simplelog::SharedLogger for RedisLogger<PUBSUB, STREAM>
where
PUBSUB: PubSubEncoder + 'static,
STREAM: StreamEncoder + 'static,
{
fn level(&self) -> log::LevelFilter {
self.level
}
fn config(&self) -> Option<&simplelog::Config> {
None
}
fn as_log(self: Box<Self>) -> Box<dyn Log> {
self
}
}