use std::{
io::{self, IsTerminal as _},
sync::Arc,
time::Duration,
};
use metrics::increment_counter;
use tokio::{
fs::{File, OpenOptions},
io::AsyncWriteExt,
};
use tracing::Metadata;
use elfo_core::{
message,
messages::{ConfigUpdated, Terminate},
msg,
signal::{Signal, SignalKind},
ActorGroup, Blueprint, Context, RestartParams, RestartPolicy, TerminationPolicy,
};
use crate::{
config::{Colorization, Config, Sink},
formatters::Formatter,
line_buffer::LineBuffer,
line_transaction::{FailOnUnfit, Line as _, LineFactory, TruncateOnUnfit},
scope_filter::ScopeFilter,
theme, PreparedEvent, Shared,
};
pub(crate) struct Logger {
ctx: Context<Config>,
shared: Arc<Shared>,
scope_filter: ScopeFilter,
buffer: LineBuffer,
}
#[message]
#[derive(Default)]
#[non_exhaustive]
pub struct ReopenLogFile {}
impl Logger {
#[allow(clippy::new_ret_no_self)]
pub(crate) fn blueprint(shared: Arc<Shared>, scope_filter: ScopeFilter) -> Blueprint {
ActorGroup::new()
.config::<Config>()
.termination_policy(TerminationPolicy::manually())
.restart_policy(RestartPolicy::on_failure(RestartParams::new(
Duration::from_secs(5),
Duration::from_secs(30),
)))
.stop_order(105)
.exec(move |ctx| Logger::new(ctx, shared.clone(), scope_filter.clone()).main())
}
fn new(ctx: Context<Config>, shared: Arc<Shared>, scope_filter: ScopeFilter) -> Self {
scope_filter.configure(&ctx.config().targets);
let buffer = LineBuffer::with_capacity(1024, {
let cfg = ctx.config();
cfg.max_line_size.0 as _
});
Self {
ctx,
shared,
scope_filter,
buffer,
}
}
async fn main(mut self) {
let mut file = open_file(self.ctx.config()).await;
let mut use_colors = can_use_colors(self.ctx.config());
self.ctx.attach(Signal::new(
SignalKind::UnixHangup,
ReopenLogFile::default(),
));
loop {
tokio::select! {
event = self.shared.channel.receive() => {
let event = ward!(event, break);
self.buffer.clear();
self.format_event(use_colors, event);
if let Some(file) = file.as_mut() {
file.write_all(self.buffer.as_str().as_bytes()).await.expect("cannot write to the config file");
} else {
print!("{}", self.buffer.as_str());
}
increment_counter!("elfo_written_events_total");
},
envelope = self.ctx.recv() => {
let envelope = ward!(envelope, break);
msg!(match envelope {
ReopenLogFile => {
file = open_file(self.ctx.config()).await;
use_colors = can_use_colors(self.ctx.config());
},
ConfigUpdated => {
file = open_file(self.ctx.config()).await;
use_colors = can_use_colors(self.ctx.config());
self.scope_filter.configure(&self.ctx.config().targets);
self.buffer.configure(self.ctx.config().max_line_size.0 as _);
},
Terminate => {
self.shared.channel.close();
},
});
},
}
}
if let Some(mut file) = file {
file.flush().await.expect("cannot flush the log file");
file.sync_all().await.expect("cannot sync the log file");
}
}
fn format_event(&mut self, use_colors: bool, event: PreparedEvent) {
let successful = if use_colors {
self.do_format_event::<theme::ColoredTheme, FailOnUnfit>(&event)
|| self.do_format_event::<theme::ColoredTheme, TruncateOnUnfit>(&event)
} else {
self.do_format_event::<theme::PlainTheme, FailOnUnfit>(&event)
|| self.do_format_event::<theme::PlainTheme, TruncateOnUnfit>(&event)
};
if successful {
self.shared.pool.clear(event.payload_id);
} else {
unreachable!("truncation must succeed")
}
}
fn do_format_event<T: theme::Theme, F: LineFactory>(&mut self, event: &PreparedEvent) -> bool {
let config = self.ctx.config();
let mut line = F::create_line(&mut self.buffer);
let payload = self
.shared
.pool
.get(event.payload_id)
.expect("unknown string");
T::Timestamp::fmt(line.meta_mut(), &event.timestamp);
line.meta_mut().push(' ');
T::Level::fmt(line.meta_mut(), event.metadata.level());
line.meta_mut().push_str(" [");
T::TraceId::fmt(line.meta_mut(), &event.trace_id);
line.meta_mut().push_str("] ");
T::ActorMeta::fmt(line.payload_mut(), &event.object);
line.payload_mut().push_str(" - ");
T::Payload::fmt(line.payload_mut(), &payload);
let mut span_id = event.span_id.clone();
{
let payload_buffer = line.payload_mut();
while let Some(data) = span_id
.as_ref()
.and_then(|span_id| self.shared.spans.get(span_id))
{
span_id.clone_from(&data.parent_id);
let payload = self
.shared
.pool
.get(data.payload_id)
.expect("unknown string");
T::Payload::fmt(payload_buffer, &payload);
}
}
if config.format.with_location {
if let Some(location) = extract_location(event.metadata) {
let fields_buffer = line.fields_mut();
fields_buffer.push('\t');
T::Location::fmt(line.fields_mut(), &location);
}
}
if config.format.with_module {
if let Some(module) = event.metadata.module_path() {
let fields_buffer = line.fields_mut();
fields_buffer.push('\t');
T::Module::fmt(fields_buffer, module);
}
}
line.try_commit()
}
}
async fn open_file(config: &Config) -> Option<File> {
if config.sink == Sink::Stdout {
return None;
}
let path = config
.path
.as_ref()
.expect("the config path must be provided");
let file = OpenOptions::new()
.create(true)
.append(true)
.open(path)
.await
.expect("cannot open the config file");
Some(file)
}
fn can_use_colors(config: &Config) -> bool {
match config.format.colorization {
Colorization::Always => true,
Colorization::Never => false,
Colorization::Auto => config.sink == Sink::Stdout && io::stdout().is_terminal(),
}
}
fn extract_location(metadata: &Metadata<'static>) -> Option<(&'static str, u32)> {
metadata
.file()
.map(|file| (file, metadata.line().unwrap_or_default()))
}