1use std::{
2 io::{self, IsTerminal as _},
3 sync::Arc,
4 time::Duration,
5};
6
7use metrics::increment_counter;
8use tokio::{
9 fs::{File, OpenOptions},
10 io::AsyncWriteExt,
11};
12use tracing::Metadata;
13
14use elfo_core::{
15 message,
16 messages::{ConfigUpdated, Terminate},
17 msg,
18 signal::{Signal, SignalKind},
19 ActorGroup, Blueprint, Context, RestartParams, RestartPolicy, TerminationPolicy,
20};
21
22use crate::{
23 config::{Colorization, Config, Sink},
24 formatters::Formatter,
25 line_buffer::LineBuffer,
26 line_transaction::{FailOnUnfit, Line as _, LineFactory, TruncateOnUnfit},
27 scope_filter::ScopeFilter,
28 theme, PreparedEvent, Shared,
29};
30
31pub(crate) struct Logger {
32 ctx: Context<Config>,
33 shared: Arc<Shared>,
34 scope_filter: ScopeFilter,
35
36 buffer: LineBuffer,
37}
38
39#[message]
41#[derive(Default)]
42#[non_exhaustive]
43pub struct ReopenLogFile {}
44
45impl Logger {
46 #[allow(clippy::new_ret_no_self)]
48 pub(crate) fn blueprint(shared: Arc<Shared>, scope_filter: ScopeFilter) -> Blueprint {
49 ActorGroup::new()
50 .config::<Config>()
51 .termination_policy(TerminationPolicy::manually())
52 .restart_policy(RestartPolicy::on_failure(RestartParams::new(
53 Duration::from_secs(5),
54 Duration::from_secs(30),
55 )))
56 .stop_order(105)
57 .exec(move |ctx| Logger::new(ctx, shared.clone(), scope_filter.clone()).main())
58 }
59
60 fn new(ctx: Context<Config>, shared: Arc<Shared>, scope_filter: ScopeFilter) -> Self {
61 scope_filter.configure(&ctx.config().targets);
62 let buffer = LineBuffer::with_capacity(1024, {
63 let cfg = ctx.config();
64 cfg.max_line_size.0 as _
65 });
66
67 Self {
68 ctx,
69 shared,
70 scope_filter,
71 buffer,
72 }
73 }
74
75 async fn main(mut self) {
76 let mut file = open_file(self.ctx.config()).await;
77 let mut use_colors = can_use_colors(self.ctx.config());
78
79 self.ctx.attach(Signal::new(
80 SignalKind::UnixHangup,
81 ReopenLogFile::default(),
82 ));
83
84 loop {
87 tokio::select! {
88 event = self.shared.channel.receive() => {
89 let event = ward!(event, break);
90 self.buffer.clear();
91
92 self.format_event(use_colors, event);
93
94 if let Some(file) = file.as_mut() {
95 file.write_all(self.buffer.as_str().as_bytes()).await.expect("cannot write to the config file");
97 } else {
98 print!("{}", self.buffer.as_str());
99 }
100
101 increment_counter!("elfo_written_events_total");
102 },
103 envelope = self.ctx.recv() => {
104 let envelope = ward!(envelope, break);
105 msg!(match envelope {
106 ReopenLogFile => {
107 file = open_file(self.ctx.config()).await;
108 use_colors = can_use_colors(self.ctx.config());
109 },
110 ConfigUpdated => {
111 file = open_file(self.ctx.config()).await;
112 use_colors = can_use_colors(self.ctx.config());
113 self.scope_filter.configure(&self.ctx.config().targets);
114 self.buffer.configure(self.ctx.config().max_line_size.0 as _);
115 },
116 Terminate => {
117 self.shared.channel.close();
119 },
120 });
121 },
122 }
123 }
124
125 if let Some(mut file) = file {
126 file.flush().await.expect("cannot flush the log file");
127 file.sync_all().await.expect("cannot sync the log file");
128 }
129 }
130
131 fn format_event(&mut self, use_colors: bool, event: PreparedEvent) {
132 let successful = if use_colors {
134 self.do_format_event::<theme::ColoredTheme, FailOnUnfit>(&event)
135 || self.do_format_event::<theme::ColoredTheme, TruncateOnUnfit>(&event)
136 } else {
137 self.do_format_event::<theme::PlainTheme, FailOnUnfit>(&event)
138 || self.do_format_event::<theme::PlainTheme, TruncateOnUnfit>(&event)
139 };
140
141 if successful {
142 self.shared.pool.clear(event.payload_id);
143 } else {
144 unreachable!("truncation must succeed")
145 }
146 }
147
148 fn do_format_event<T: theme::Theme, F: LineFactory>(&mut self, event: &PreparedEvent) -> bool {
149 let config = self.ctx.config();
150 let mut line = F::create_line(&mut self.buffer);
151
152 let payload = self
153 .shared
154 .pool
155 .get(event.payload_id)
156 .expect("unknown string");
157
158 T::Timestamp::fmt(line.meta_mut(), &event.timestamp);
161 line.meta_mut().push(' ');
162 T::Level::fmt(line.meta_mut(), event.metadata.level());
163 line.meta_mut().push_str(" [");
164 T::TraceId::fmt(line.meta_mut(), &event.trace_id);
165 line.meta_mut().push_str("] ");
166 T::ActorMeta::fmt(line.payload_mut(), &event.object);
167 line.payload_mut().push_str(" - ");
168 T::Payload::fmt(line.payload_mut(), &payload);
169
170 let mut span_id = event.span_id.clone();
172
173 {
174 let payload_buffer = line.payload_mut();
175 while let Some(data) = span_id
176 .as_ref()
177 .and_then(|span_id| self.shared.spans.get(span_id))
178 {
179 span_id.clone_from(&data.parent_id);
180
181 let payload = self
182 .shared
183 .pool
184 .get(data.payload_id)
185 .expect("unknown string");
186
187 T::Payload::fmt(payload_buffer, &payload);
188 }
189 }
190
191 if config.format.with_location {
192 if let Some(location) = extract_location(event.metadata) {
193 let fields_buffer = line.fields_mut();
194 fields_buffer.push('\t');
195 T::Location::fmt(line.fields_mut(), &location);
196 }
197 }
198
199 if config.format.with_module {
200 if let Some(module) = event.metadata.module_path() {
201 let fields_buffer = line.fields_mut();
202 fields_buffer.push('\t');
203 T::Module::fmt(fields_buffer, module);
204 }
205 }
206
207 line.try_commit()
208 }
209}
210
211async fn open_file(config: &Config) -> Option<File> {
212 if config.sink == Sink::Stdout {
213 return None;
214 }
215
216 let path = config
218 .path
219 .as_ref()
220 .expect("the config path must be provided");
221
222 let file = OpenOptions::new()
223 .create(true)
224 .append(true)
225 .open(path)
226 .await
227 .expect("cannot open the config file");
228
229 Some(file)
230}
231
232fn can_use_colors(config: &Config) -> bool {
233 match config.format.colorization {
234 Colorization::Always => true,
235 Colorization::Never => false,
236 Colorization::Auto => config.sink == Sink::Stdout && io::stdout().is_terminal(),
237 }
238}
239
240fn extract_location(metadata: &Metadata<'static>) -> Option<(&'static str, u32)> {
241 metadata
242 .file()
243 .map(|file| (file, metadata.line().unwrap_or_default()))
244}