use std::io::{self, Write};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{SystemTime, UNIX_EPOCH};
use super::types::{FieldValue, LogLevel, LogRecord, TraceConfig};
/// A destination that log records are delivered to.
///
/// The `Send + Sync` bound is required of every implementor; in this file
/// sinks are held as `Box<dyn LogSink>` inside a logger that is shared
/// through an `Arc`.
pub trait LogSink: Send + Sync {
/// Handles one record. The method returns nothing, so an implementation
/// has no channel for reporting a failure back to the caller.
fn write(&self, record: &LogRecord);
}
/// Sink that renders each record as a single-line JSON object and prints it
/// to standard error (the default) or standard output.
pub struct ConsoleSink {
    /// `true` routes output to stdout; `false` routes it to stderr.
    use_stdout: bool,
}

impl ConsoleSink {
    /// Creates a sink that writes to standard error.
    pub fn new() -> Self {
        Self { use_stdout: false }
    }

    /// Creates a sink that writes to standard output.
    pub fn stdout() -> Self {
        Self { use_stdout: true }
    }

    /// Escapes `raw` so it can be placed between double quotes in a JSON
    /// document.
    ///
    /// Besides `"` and `\`, every control character below U+0020 is escaped,
    /// because JSON forbids those characters from appearing raw inside a
    /// string.
    fn escape_json(raw: &str) -> String {
        let mut escaped = String::with_capacity(raw.len());
        for ch in raw.chars() {
            match ch {
                '"' => escaped.push_str("\\\""),
                '\\' => escaped.push_str("\\\\"),
                '\n' => escaped.push_str("\\n"),
                '\r' => escaped.push_str("\\r"),
                '\t' => escaped.push_str("\\t"),
                c if (c as u32) < 0x20 => {
                    escaped.push_str(&format!("\\u{:04x}", c as u32));
                }
                c => escaped.push(c),
            }
        }
        escaped
    }

    /// Renders `record` as one JSON object:
    /// `{"level":…,"msg":…,"ts":…[,"trace_id":…][,"span_id":…][,"fields":{…}]}`.
    ///
    /// `trace_id` and `span_id` are emitted only when present, and the
    /// `fields` object is omitted entirely when the record has no fields.
    /// Field values are rendered by `FieldValue::to_json_value`; field keys
    /// are escaped here.
    pub fn format_json(record: &LogRecord) -> String {
        let mut fields_json = String::new();
        for (i, (k, v)) in record.fields.iter().enumerate() {
            if i > 0 {
                fields_json.push(',');
            }
            fields_json.push('"');
            fields_json.push_str(&Self::escape_json(k));
            fields_json.push_str("\":");
            fields_json.push_str(&v.to_json_value());
        }

        // The message is routed through `FieldValue::Str(..).to_json_value()`,
        // presumably to reuse that type's string quoting — confirm in `types`.
        let mut out = format!(
            "{{\"level\":\"{}\",\"msg\":{},\"ts\":{}",
            record.level.as_str(),
            FieldValue::Str(record.message.clone()).to_json_value(),
            record.timestamp_ns,
        );
        if let Some(tid) = record.trace_id {
            out.push_str(&format!(",\"trace_id\":{}", tid));
        }
        if let Some(sid) = record.span_id {
            out.push_str(&format!(",\"span_id\":{}", sid));
        }
        if !fields_json.is_empty() {
            out.push_str(&format!(",\"fields\":{{{}}}", fields_json));
        }
        out.push('}');
        out
    }
}
impl Default for ConsoleSink {
fn default() -> Self {
Self::new()
}
}
impl LogSink for ConsoleSink {
    /// Formats `record` as JSON and prints it, followed by a newline, to the
    /// configured stream. The stream is locked for the duration of the write
    /// and any write error is discarded.
    fn write(&self, record: &LogRecord) {
        /// Writes `line` plus a trailing newline to `target`, ignoring the result.
        fn emit_line<W: Write>(mut target: W, line: &str) {
            let _ = writeln!(target, "{}", line);
        }

        let rendered = Self::format_json(record);
        if self.use_stdout {
            emit_line(io::stdout().lock(), &rendered);
        } else {
            emit_line(io::stderr().lock(), &rendered);
        }
    }
}
/// In-memory sink that stores a clone of every record written to it.
pub struct MemorySink {
    /// Captured records, appended in the order they were written.
    records: Mutex<Vec<LogRecord>>,
}

impl MemorySink {
    /// Creates a sink with nothing captured yet.
    pub fn new() -> Self {
        let records = Mutex::new(Vec::new());
        Self { records }
    }

    /// Returns a copy of everything captured so far.
    ///
    /// Yields an empty vector when the internal lock is poisoned.
    pub fn records(&self) -> Vec<LogRecord> {
        match self.records.lock() {
            Ok(captured) => captured.to_vec(),
            Err(_) => Vec::new(),
        }
    }

    /// Discards all captured records; does nothing when the lock is poisoned.
    pub fn clear(&self) {
        if let Ok(mut captured) = self.records.lock() {
            captured.clear();
        }
    }

    /// Number of captured records; reports `0` when the lock is poisoned.
    pub fn len(&self) -> usize {
        self.records.lock().map_or(0, |captured| captured.len())
    }

    /// `true` when [`MemorySink::len`] reports zero.
    pub fn is_empty(&self) -> bool {
        let count = self.len();
        count == 0
    }
}
impl Default for MemorySink {
fn default() -> Self {
Self::new()
}
}
impl LogSink for MemorySink {
fn write(&self, record: &LogRecord) {
if let Ok(mut g) = self.records.lock() {
g.push(record.clone());
}
}
}
/// Flattened log record in the shape produced by `OtelLogSink::to_otlp`.
#[derive(Debug, Clone)]
pub struct OtlpLogRecord {
/// Timestamp copied from the source record's `timestamp_ns`.
pub time_unix_nano: u64,
/// Numeric severity, taken from `LogLevel::to_otlp_severity`.
pub severity_number: u32,
/// Severity name, taken from `LogLevel::as_str`.
pub severity_text: String,
/// The log message text.
pub body: String,
/// One `(key, value)` pair per source field; the value is the string
/// returned by `FieldValue::to_json_value`.
pub attributes: Vec<(String, String)>,
/// Trace identifier, when the source record carried one.
pub trace_id: Option<u64>,
/// Span identifier, when the source record carried one.
pub span_id: Option<u64>,
}
/// Sink that converts every record to an [`OtlpLogRecord`] and buffers the
/// result in memory.
pub struct OtelLogSink {
    /// Converted records, appended in the order they were written.
    records: Mutex<Vec<OtlpLogRecord>>,
}

impl OtelLogSink {
    /// Creates a sink with an empty buffer.
    pub fn new() -> Self {
        let records = Mutex::new(Vec::new());
        Self { records }
    }

    /// Converts `record` into its [`OtlpLogRecord`] form.
    ///
    /// Each field becomes an attribute whose value is the string returned by
    /// `to_json_value`; the level supplies both the numeric severity and the
    /// severity text.
    pub fn to_otlp(record: &LogRecord) -> OtlpLogRecord {
        let mut attributes: Vec<(String, String)> = Vec::with_capacity(record.fields.len());
        for (key, value) in &record.fields {
            attributes.push((key.clone(), value.to_json_value()));
        }

        let severity_number = record.level.to_otlp_severity();
        let severity_text = record.level.as_str().to_owned();

        OtlpLogRecord {
            time_unix_nano: record.timestamp_ns,
            severity_number,
            severity_text,
            body: record.message.clone(),
            attributes,
            trace_id: record.trace_id,
            span_id: record.span_id,
        }
    }

    /// Returns a copy of the buffered records; empty when the lock is poisoned.
    pub fn otlp_records(&self) -> Vec<OtlpLogRecord> {
        match self.records.lock() {
            Ok(buffered) => buffered.to_vec(),
            Err(_) => Vec::new(),
        }
    }
}
impl Default for OtelLogSink {
fn default() -> Self {
Self::new()
}
}
impl LogSink for OtelLogSink {
fn write(&self, record: &LogRecord) {
let otlp = Self::to_otlp(record);
if let Ok(mut g) = self.records.lock() {
g.push(otlp);
}
}
}
/// Fluent builder that accumulates fields and span context for one log
/// record and then emits it through a [`StructuredLogger`].
pub struct LogBuilder {
    /// Key/value pairs attached to the record, in insertion order.
    fields: Vec<(String, FieldValue)>,
    /// Trace identifier to attach, if any.
    trace_id: Option<u64>,
    /// Span identifier to attach, if any.
    span_id: Option<u64>,
    /// Logger that receives the finished record.
    logger: Arc<StructuredLogger>,
}

impl LogBuilder {
    /// Creates an empty builder bound to `logger`.
    fn new(logger: Arc<StructuredLogger>) -> Self {
        Self {
            fields: Vec::new(),
            trace_id: None,
            span_id: None,
            logger,
        }
    }

    /// Appends one key/value field and returns the builder for chaining.
    pub fn field(mut self, key: impl Into<String>, value: impl Into<FieldValue>) -> Self {
        self.fields.push((key.into(), value.into()));
        self
    }

    /// Sets both the trace and span identifiers, replacing any set earlier.
    pub fn span_context(mut self, trace_id: u64, span_id: u64) -> Self {
        self.trace_id = Some(trace_id);
        self.span_id = Some(span_id);
        self
    }

    /// Consumes the builder: creates a record at `level` with `message`,
    /// attaches the accumulated fields and span context, and passes it to
    /// the logger's `log` method.
    pub fn emit(self, level: LogLevel, message: impl Into<String>) {
        let mut record = LogRecord::new(level, message);
        record.fields = self.fields;
        record.trace_id = self.trace_id;
        record.span_id = self.span_id;
        self.logger.log(&record);
    }

    /// Emits the record at `LogLevel::Info`.
    pub fn info(self, message: impl Into<String>) {
        self.emit(LogLevel::Info, message);
    }

    /// Emits the record at `LogLevel::Warn`.
    pub fn warn(self, message: impl Into<String>) {
        self.emit(LogLevel::Warn, message);
    }

    /// Emits the record at `LogLevel::Error`.
    pub fn error(self, message: impl Into<String>) {
        self.emit(LogLevel::Error, message);
    }

    /// Emits the record at `LogLevel::Debug`.
    pub fn debug(self, message: impl Into<String>) {
        self.emit(LogLevel::Debug, message);
    }

    /// Emits the record at `LogLevel::Trace`, mirroring the `trace` shortcut
    /// that `StructuredLogger` offers for plain messages.
    pub fn trace(self, message: impl Into<String>) {
        self.emit(LogLevel::Trace, message);
    }
}
pub struct StructuredLogger {
min_level: LogLevel,
sinks: RwLock<Vec<Box<dyn LogSink>>>,
}
impl StructuredLogger {
pub fn new(min_level: LogLevel, sinks: Vec<Box<dyn LogSink>>) -> Self {
Self {
min_level,
sinks: RwLock::new(sinks),
}
}
pub fn log(&self, record: &LogRecord) {
if record.level < self.min_level {
return;
}
if let Ok(sinks) = self.sinks.read() {
for sink in sinks.iter() {
sink.write(record);
}
}
}
pub fn add_sink(&self, sink: Box<dyn LogSink>) {
if let Ok(mut sinks) = self.sinks.write() {
sinks.push(sink);
}
}
pub fn with_fields(self: &Arc<Self>, fields: Vec<(String, FieldValue)>) -> LogBuilder {
let mut builder = LogBuilder::new(Arc::clone(self));
builder.fields = fields;
builder
}
pub fn builder(self: &Arc<Self>) -> LogBuilder {
LogBuilder::new(Arc::clone(self))
}
pub fn info(&self, msg: &str) {
let record = LogRecord::new(LogLevel::Info, msg);
self.log(&record);
}
pub fn warn(&self, msg: &str) {
let record = LogRecord::new(LogLevel::Warn, msg);
self.log(&record);
}
pub fn error(&self, msg: &str) {
let record = LogRecord::new(LogLevel::Error, msg);
self.log(&record);
}
pub fn debug(&self, msg: &str) {
let record = LogRecord::new(LogLevel::Debug, msg);
self.log(&record);
}
pub fn trace(&self, msg: &str) {
let record = LogRecord::new(LogLevel::Trace, msg);
self.log(&record);
}
}
pub fn init_logger(_config: TraceConfig) -> Arc<StructuredLogger> {
Arc::new(StructuredLogger::new(
LogLevel::Info,
vec![Box::new(ConsoleSink::new())],
))
}
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// Yields `0` when the system clock reads earlier than the epoch, and
/// saturates at `u64::MAX` instead of wrapping if the nanosecond count no
/// longer fits in 64 bits.
pub(crate) fn now_ns() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => {
            // `as_nanos` returns a `u128`; clamp first so the narrowing cast
            // below cannot silently discard high bits.
            let clamped = elapsed.as_nanos().min(u128::from(u64::MAX));
            clamped as u64
        }
        Err(_) => 0,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Adapter that forwards to a shared `MemorySink`, so a test can keep an
    /// `Arc` handle for inspection while the logger owns the boxed sink.
    struct SharedSink(Arc<MemorySink>);

    impl LogSink for SharedSink {
        fn write(&self, record: &LogRecord) {
            self.0.write(record);
        }
    }

    /// Builds a logger filtered at `level` plus a handle to the memory sink
    /// it writes into.
    fn make_logger(level: LogLevel) -> (Arc<StructuredLogger>, Arc<MemorySink>) {
        let memory = Arc::new(MemorySink::new());
        let sinks: Vec<Box<dyn LogSink>> = vec![Box::new(SharedSink(Arc::clone(&memory)))];
        let logger = Arc::new(StructuredLogger::new(level, sinks));
        (logger, memory)
    }

    /// `with_field` attaches exactly one field under the given key.
    #[test]
    fn test_log_record_fields() {
        let built = LogRecord::new(LogLevel::Info, "test").with_field("key", FieldValue::Int(42));
        assert_eq!(built.message, "test");
        assert_eq!(built.fields.len(), 1);
        assert_eq!(built.fields[0].0, "key");
    }

    /// Records that pass the filter reach the sink in call order.
    #[test]
    fn test_memory_sink_captures() {
        let (logger, memory) = make_logger(LogLevel::Debug);
        logger.info("hello");
        logger.warn("world");

        let captured = memory.records();
        assert_eq!(captured.len(), 2);
        assert_eq!(captured[0].message, "hello");
        assert_eq!(captured[1].level, LogLevel::Warn);
    }

    /// Messages below the configured threshold never reach the sink.
    #[test]
    fn test_log_level_filter() {
        let (logger, memory) = make_logger(LogLevel::Warn);
        logger.debug("should be filtered");
        logger.info("also filtered");
        logger.warn("passes");

        let captured = memory.records();
        assert_eq!(captured.len(), 1);
        assert_eq!(captured[0].message, "passes");
    }

    /// Fields added through the builder are carried on the emitted record.
    #[test]
    fn test_log_builder_fluent() {
        let (logger, memory) = make_logger(LogLevel::Debug);
        let builder = logger.builder().field("user", "alice").field("count", 7i64);
        builder.info("user logged in");

        let captured = memory.records();
        assert_eq!(captured.len(), 1);
        assert_eq!(captured[0].fields.len(), 2);
    }

    /// The JSON rendering contains the level, the message and the field.
    #[test]
    fn test_console_sink_json_format() {
        let sample = LogRecord::new(LogLevel::Info, "msg").with_field("k", FieldValue::Int(1));
        let rendered = ConsoleSink::format_json(&sample);
        assert!(rendered.contains("\"level\":\"INFO\""));
        assert!(rendered.contains("\"msg\":\"msg\""));
        assert!(rendered.contains("\"k\":1"));
    }

    /// Conversion to the OTLP shape keeps severity, body and attributes.
    #[test]
    fn test_otlp_log_conversion() {
        let sample =
            LogRecord::new(LogLevel::Error, "boom").with_field("code", FieldValue::Int(500));
        let converted = OtelLogSink::to_otlp(&sample);
        assert_eq!(converted.severity_text, "ERROR");
        assert_eq!(converted.severity_number, 17);
        assert_eq!(converted.body, "boom");
        assert!(!converted.attributes.is_empty());
    }
}