use chrono::Utc;
use crossbeam_channel::{Receiver, Sender};
use std::cell::RefCell;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use tracing::{Event, Level, Subscriber};
use tracing_subscriber::layer::Context;
use tracing_subscriber::Layer;
use crate::core::Value;
use crate::storage::mvcc::engine::MVCCEngine;
use crate::storage::traits::Engine;
thread_local! {
/// Per-thread marker that is `true` only on the log flusher thread.
///
/// `start_log_flusher` sets it on the thread it spawns, and
/// `InternalLogLayer::on_event` returns early when it is set, so events
/// emitted on the flusher thread are never queued again (presumably to avoid
/// a feedback loop where persisting logs generates more logs).
pub static IS_LOG_FLUSHER: RefCell<bool> = const { RefCell::new(false) };
}
/// A single captured log event, queued on a channel until the flusher thread
/// persists it.
#[derive(Debug, Clone)]
pub struct LogEntry {
/// Severity of the event, rendered as text from the event's `Level`.
pub level: String,
/// Target reported by the event's metadata.
pub target: String,
/// Contents of the event's `message` field (empty if none was recorded).
pub message: String,
/// Wall-clock time (UTC) at which the layer observed the event.
pub timestamp: chrono::DateTime<Utc>,
/// Trace identifier taken from the current span's extensions, if present.
pub trace_id: Option<String>,
/// Span identifier taken from the current span's extensions, if present.
pub span_id: Option<String>,
/// All non-`message` fields as `(name, rendered value)` pairs, or `None`
/// when the event carried no such fields.
pub json_fields: Option<Vec<(String, String)>>,
}
/// `tracing` layer that converts events into [`LogEntry`] values and hands
/// them to a channel without blocking the emitting thread.
pub struct InternalLogLayer {
// Sending half of the channel; the receiving half is consumed by the
// thread started in `start_log_flusher`.
sender: Sender<LogEntry>,
}
impl InternalLogLayer {
pub fn new(sender: Sender<LogEntry>) -> Self {
Self { sender }
}
}
impl<S> Layer<S> for InternalLogLayer
where
    S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
{
    /// Captures an event as a [`LogEntry`] and queues it for the flusher.
    ///
    /// Events are skipped when they originate on the flusher thread or when
    /// they are more verbose than `INFO`. Queuing never blocks: if the
    /// channel cannot accept the entry it is silently dropped.
    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
        // Shape of the per-span record looked up in the span extensions. It
        // has to match the stored type exactly; elements 5 and 6 are read as
        // the trace id and span id respectively.
        type SpanData = (
            std::time::Instant,
            chrono::DateTime<Utc>,
            String,
            String,
            Vec<(String, String)>,
            String,
            String,
        );

        // Never re-queue events produced by the flusher thread itself.
        if IS_LOG_FLUSHER.with(|flag| *flag.borrow()) {
            return;
        }

        let metadata = event.metadata();
        let level = metadata.level();
        // `Level` orders more verbose levels as greater, so this drops
        // DEBUG and TRACE.
        if *level > Level::INFO {
            return;
        }

        let mut visitor = LogVisitor::default();
        event.record(&mut visitor);

        let span_ids = ctx.lookup_current().and_then(|span| {
            let extensions = span.extensions();
            let ids = extensions
                .get::<SpanData>()
                .map(|data| (data.5.clone(), data.6.clone()));
            ids
        });
        let (trace_id, span_id) = match span_ids {
            Some((trace, span)) => (Some(trace), Some(span)),
            None => (None, None),
        };

        // An event without extra fields is stored with no attribute list.
        let json_fields = Some(visitor.attributes).filter(|attrs| !attrs.is_empty());

        let entry = LogEntry {
            level: level.to_string(),
            target: metadata.target().to_string(),
            message: visitor.message,
            timestamp: Utc::now(),
            trace_id,
            span_id,
            json_fields,
        };

        // Best effort: a full or disconnected channel must not affect the
        // code that emitted the event.
        let _ = self.sender.try_send(entry);
    }
}
/// Field visitor that splits an event into its `message` text and a list of
/// the remaining fields rendered as strings.
#[derive(Default)]
struct LogVisitor {
// Value of the field named `message`; stays empty if none is recorded.
message: String,
// Every other recorded field as `(field name, rendered value)`.
attributes: Vec<(String, String)>,
}
impl tracing::field::Visit for LogVisitor {
    /// Records a field through its `Debug` rendering.
    ///
    /// A rendering wrapped in double quotes has that one pair of quotes
    /// removed. The field named `message` becomes the entry's message; every
    /// other field is appended to the attribute list.
    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
        let rendered = strip_debug_quotes(format!("{:?}", value));
        if field.name() == "message" {
            self.message = rendered;
        } else {
            self.attributes.push((field.name().to_string(), rendered));
        }
    }

    /// Records a floating-point field as an attribute.
    fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
        self.attributes
            .push((field.name().to_string(), value.to_string()));
    }

    /// Records a signed integer field as an attribute.
    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
        self.attributes
            .push((field.name().to_string(), value.to_string()));
    }

    /// Records an unsigned integer field as an attribute.
    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
        self.attributes
            .push((field.name().to_string(), value.to_string()));
    }

    /// Records a boolean field as an attribute.
    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
        self.attributes
            .push((field.name().to_string(), value.to_string()));
    }

    /// Records a string field verbatim: `message` becomes the entry's
    /// message, anything else is appended to the attribute list.
    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
        if field.name() == "message" {
            self.message = value.to_string();
        } else {
            self.attributes
                .push((field.name().to_string(), value.to_string()));
        }
    }
}

/// Removes one pair of surrounding double quotes from a `Debug` rendering.
///
/// The text is returned unchanged unless it both starts and ends with `"`
/// as two distinct characters. In particular a rendering that is exactly
/// `"` is kept as is; slicing `[1..len - 1]` on it would panic.
fn strip_debug_quotes(rendered: String) -> String {
    match rendered
        .strip_prefix('"')
        .and_then(|rest| rest.strip_suffix('"'))
    {
        Some(inner) => inner.to_string(),
        None => rendered,
    }
}
pub fn start_log_flusher(
engine: Arc<MVCCEngine>,
receiver: Receiver<LogEntry>,
) -> (
Arc<std::sync::atomic::AtomicBool>,
std::thread::JoinHandle<()>,
) {
let shutdown_flag = Arc::new(std::sync::atomic::AtomicBool::new(false));
let flag_clone = Arc::clone(&shutdown_flag);
let handle = thread::Builder::new()
.name("oxibase-log-flusher".to_string())
.spawn(move || {
IS_LOG_FLUSHER.with(|f| *f.borrow_mut() = true);
crate::common::tracing::IS_TELEMETRY_THREAD.with(|f| *f.borrow_mut() = true);
crate::common::metrics::IS_METRICS_THREAD.with(|f| *f.borrow_mut() = true);
let batch_size = 100;
let timeout = Duration::from_secs(1);
loop {
if flag_clone.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
let mut entries = Vec::new();
match receiver.recv_timeout(timeout) {
Ok(entry) => {
entries.push(entry);
while entries.len() < batch_size {
match receiver.try_recv() {
Ok(entry) => entries.push(entry),
Err(_) => break,
}
}
}
Err(_) => continue, }
if entries.is_empty() {
continue;
}
if let Err(e) = insert_log_batch(&engine, &entries) {
eprintln!("Failed to flush internal logs: {}", e);
}
}
})
.expect("Failed to spawn log flusher thread");
(shutdown_flag, handle)
}
fn insert_log_batch(engine: &MVCCEngine, entries: &[LogEntry]) -> crate::core::Result<()> {
let mut tx = engine.begin_transaction()?;
let mut table = match tx.get_table("system.logs") {
Ok(t) => t,
Err(_) => {
tx.rollback()?;
return Ok(());
}
};
for entry in entries {
let ts_value = Value::Timestamp(entry.timestamp);
let level_value = Value::Text(entry.level.clone().into());
let target_value = Value::Text(entry.target.clone().into());
let msg_value = Value::Text(entry.message.clone().into());
let json_value =
entry
.json_fields
.clone()
.map_or(Value::Null(crate::core::DataType::Text), |json| {
let mut map = serde_json::Map::new();
for (k, v) in json {
map.insert(k, serde_json::Value::String(v));
}
Value::Text(
serde_json::to_string(&map)
.unwrap_or_else(|_| "{}".to_string())
.into(),
)
});
let trace_id_value = entry
.trace_id
.clone()
.map_or(Value::Null(crate::core::DataType::Text), |id| {
Value::Text(id.into())
});
let span_id_value = entry
.span_id
.clone()
.map_or(Value::Null(crate::core::DataType::Text), |id| {
Value::Text(id.into())
});
let row = vec![
Value::null_unknown(), ts_value, level_value, target_value, msg_value, json_value, trace_id_value, span_id_value, ];
table.insert(row.into())?;
}
tx.commit()?;
Ok(())
}