use chrono::Utc;
use crossbeam_channel::{Receiver, Sender};
use std::cell::RefCell;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use tracing::{Event, Level, Subscriber};
use tracing_subscriber::layer::Context;
use tracing_subscriber::Layer;
use crate::core::Value;
use crate::storage::mvcc::engine::MVCCEngine;
use crate::storage::traits::Engine;
thread_local! {
pub static IS_LOG_FLUSHER: RefCell<bool> = const { RefCell::new(false) };
}
#[derive(Debug, Clone)]
pub struct LogEntry {
pub level: String,
pub target: String,
pub message: String,
pub timestamp: chrono::DateTime<Utc>,
}
pub struct InternalLogLayer {
sender: Sender<LogEntry>,
}
impl InternalLogLayer {
pub fn new(sender: Sender<LogEntry>) -> Self {
Self { sender }
}
}
impl<S> Layer<S> for InternalLogLayer
where
S: Subscriber,
{
fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
let is_flusher = IS_LOG_FLUSHER.with(|f| *f.borrow());
if is_flusher {
return;
}
let level = event.metadata().level();
if level > &Level::INFO {
return;
}
let mut visitor = LogVisitor::default();
event.record(&mut visitor);
let entry = LogEntry {
level: level.to_string(),
target: event.metadata().target().to_string(),
message: visitor.message,
timestamp: Utc::now(),
};
let _ = self.sender.try_send(entry);
}
}
#[derive(Default)]
struct LogVisitor {
message: String,
}
impl tracing::field::Visit for LogVisitor {
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
if field.name() == "message" {
self.message = format!("{:?}", value);
if self.message.starts_with('"') && self.message.ends_with('"') {
self.message = self.message[1..self.message.len() - 1].to_string();
}
}
}
}
pub fn start_log_flusher(
engine: Arc<MVCCEngine>,
receiver: Receiver<LogEntry>,
) -> Arc<std::sync::atomic::AtomicBool> {
let shutdown_flag = Arc::new(std::sync::atomic::AtomicBool::new(false));
let flag_clone = Arc::clone(&shutdown_flag);
thread::Builder::new()
.name("oxibase-log-flusher".to_string())
.spawn(move || {
IS_LOG_FLUSHER.with(|f| *f.borrow_mut() = true);
let batch_size = 100;
let timeout = Duration::from_secs(1);
loop {
if flag_clone.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
let mut entries = Vec::new();
match receiver.recv_timeout(timeout) {
Ok(entry) => {
entries.push(entry);
while entries.len() < batch_size {
match receiver.try_recv() {
Ok(entry) => entries.push(entry),
Err(_) => break,
}
}
}
Err(_) => continue, }
if entries.is_empty() {
continue;
}
if let Err(e) = insert_log_batch(&engine, &entries) {
eprintln!("Failed to flush internal logs: {}", e);
}
}
})
.expect("Failed to spawn log flusher thread");
shutdown_flag
}
fn insert_log_batch(engine: &MVCCEngine, entries: &[LogEntry]) -> crate::core::Result<()> {
let mut tx = engine.begin_transaction()?;
let mut table = match tx.get_table("system.logs") {
Ok(t) => t,
Err(_) => {
tx.rollback()?;
return Ok(());
}
};
for entry in entries {
let ts_value = Value::Timestamp(entry.timestamp);
let level_value = Value::Text(entry.level.clone().into());
let target_value = Value::Text(entry.target.clone().into());
let msg_value = Value::Text(entry.message.clone().into());
let json_value = Value::null_unknown();
let row = vec![
Value::null_unknown(), ts_value, level_value, target_value, msg_value, json_value, ];
table.insert(row.into())?;
}
tx.commit()?;
Ok(())
}