use std::collections::VecDeque;
use std::sync::Arc;
use chrono::{DateTime, Utc};
use parking_lot::RwLock;
use serde::Serialize;
use tracing::field::{Field, Visit};
use tracing::span;
use tracing_subscriber::Layer;
use tracing_subscriber::layer::Context;
const MAX_LOG_ENTRIES: usize = 2000;
#[derive(Debug, Clone, Serialize, serde::Deserialize)]
pub struct LogEntry {
pub timestamp: DateTime<Utc>,
pub level: String,
pub target: String,
pub message: String,
pub job_id: Option<String>,
pub seq: u64,
}
#[derive(Clone)]
pub struct LogBuffer {
inner: Arc<RwLock<LogBufferInner>>,
}
struct LogBufferInner {
entries: VecDeque<LogEntry>,
next_seq: u64,
}
impl Default for LogBuffer {
fn default() -> Self {
Self::new()
}
}
impl LogBuffer {
pub fn new() -> Self {
Self {
inner: Arc::new(RwLock::new(LogBufferInner {
entries: VecDeque::with_capacity(MAX_LOG_ENTRIES),
next_seq: 0,
})),
}
}
fn push(&self, mut entry: LogEntry) {
let mut inner = self.inner.write();
entry.seq = inner.next_seq;
inner.next_seq += 1;
if inner.entries.len() >= MAX_LOG_ENTRIES {
inner.entries.pop_front();
}
inner.entries.push_back(entry);
}
pub fn get_entries(
&self,
job_id: Option<&str>,
after_seq: Option<u64>,
level: Option<&str>,
limit: usize,
) -> Vec<LogEntry> {
let inner = self.inner.read();
inner
.entries
.iter()
.filter(|e| {
if let Some(after) = after_seq
&& e.seq <= after
{
return false;
}
if let Some(jid) = job_id
&& e.job_id.as_deref() != Some(jid)
{
return false;
}
if let Some(lvl) = level
&& !e.level.eq_ignore_ascii_case(lvl)
{
return false;
}
true
})
.rev()
.take(limit)
.cloned()
.collect::<Vec<_>>()
.into_iter()
.rev()
.collect()
}
pub fn latest_seq(&self) -> u64 {
let inner = self.inner.read();
if inner.next_seq > 0 {
inner.next_seq - 1
} else {
0
}
}
}
struct FieldVisitor {
message: String,
job_id: Option<String>,
}
impl Visit for FieldVisitor {
fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
if field.name() == "message" {
self.message = format!("{value:?}");
} else if field.name() == "job_id" {
self.job_id = Some(format!("{value:?}"));
} else {
if !self.message.is_empty() {
self.message.push(' ');
}
self.message
.push_str(&format!("{}={:?}", field.name(), value));
}
}
fn record_str(&mut self, field: &Field, value: &str) {
if field.name() == "message" {
self.message = value.to_string();
} else if field.name() == "job_id" {
self.job_id = Some(value.to_string());
} else {
if !self.message.is_empty() {
self.message.push(' ');
}
self.message
.push_str(&format!("{}={}", field.name(), value));
}
}
}
pub struct LogBufferLayer {
buffer: LogBuffer,
}
impl LogBufferLayer {
pub fn new(buffer: LogBuffer) -> Self {
Self { buffer }
}
}
impl<S> Layer<S> for LogBufferLayer
where
S: tracing::Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
{
fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) {
let metadata = event.metadata();
let mut visitor = FieldVisitor {
message: String::new(),
job_id: None,
};
event.record(&mut visitor);
if visitor.job_id.is_none()
&& let Some(scope) = ctx.event_scope(event)
{
for span in scope {
let extensions = span.extensions();
if let Some(fields) = extensions.get::<SpanFields>()
&& visitor.job_id.is_none()
{
visitor.job_id = fields.job_id.clone();
}
}
}
let entry = LogEntry {
timestamp: Utc::now(),
level: metadata.level().to_string(),
target: metadata.target().to_string(),
message: visitor.message,
job_id: visitor.job_id,
seq: 0, };
self.buffer.push(entry);
}
fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
let mut visitor = FieldVisitor {
message: String::new(),
job_id: None,
};
attrs.record(&mut visitor);
if visitor.job_id.is_some()
&& let Some(span) = ctx.span(id)
{
let mut extensions = span.extensions_mut();
extensions.insert(SpanFields {
job_id: visitor.job_id,
});
}
}
}
struct SpanFields {
job_id: Option<String>,
}