Skip to main content

elfo_logger/
actor.rs

1use std::{
2    io::{self, IsTerminal as _},
3    sync::Arc,
4    time::Duration,
5};
6
7use metrics::increment_counter;
8use tokio::{
9    fs::{File, OpenOptions},
10    io::AsyncWriteExt,
11};
12use tracing::Metadata;
13
14use elfo_core::{
15    message,
16    messages::{ConfigUpdated, Terminate},
17    msg,
18    signal::{Signal, SignalKind},
19    ActorGroup, Blueprint, Context, RestartParams, RestartPolicy, TerminationPolicy,
20};
21
22use crate::{
23    config::{Colorization, Config, Sink},
24    formatters::Formatter,
25    line_buffer::LineBuffer,
26    line_transaction::{FailOnUnfit, Line as _, LineFactory, TruncateOnUnfit},
27    scope_filter::ScopeFilter,
28    theme, PreparedEvent, Shared,
29};
30
/// State of the logger actor: receives prepared events from the shared channel,
/// formats them into a line buffer and writes them to a file or stdout.
pub(crate) struct Logger {
    /// Actor context; gives access to the current `Config` and the mailbox.
    ctx: Context<Config>,
    /// State shared with the rest of the crate: the event channel, the payload pool
    /// and the span storage used while formatting.
    shared: Arc<Shared>,
    /// Filter that is (re)configured from `Config::targets` on start and on config updates.
    scope_filter: ScopeFilter,

    /// Reusable buffer the current line is formatted into; cleared before every event.
    buffer: LineBuffer,
}
38
/// Reload a log file, usually after rotation.
///
/// The logger actor attaches this message to the `SignalKind::UnixHangup` signal and,
/// whenever the message is received, reopens the sink according to the current config.
#[message]
#[derive(Default)]
#[non_exhaustive]
pub struct ReopenLogFile {}
44
45impl Logger {
46    // TODO: rename it?
47    #[allow(clippy::new_ret_no_self)]
48    pub(crate) fn blueprint(shared: Arc<Shared>, scope_filter: ScopeFilter) -> Blueprint {
49        ActorGroup::new()
50            .config::<Config>()
51            .termination_policy(TerminationPolicy::manually())
52            .restart_policy(RestartPolicy::on_failure(RestartParams::new(
53                Duration::from_secs(5),
54                Duration::from_secs(30),
55            )))
56            .stop_order(105)
57            .exec(move |ctx| Logger::new(ctx, shared.clone(), scope_filter.clone()).main())
58    }
59
60    fn new(ctx: Context<Config>, shared: Arc<Shared>, scope_filter: ScopeFilter) -> Self {
61        scope_filter.configure(&ctx.config().targets);
62        let buffer = LineBuffer::with_capacity(1024, {
63            let cfg = ctx.config();
64            cfg.max_line_size.0 as _
65        });
66
67        Self {
68            ctx,
69            shared,
70            scope_filter,
71            buffer,
72        }
73    }
74
75    async fn main(mut self) {
76        let mut file = open_file(self.ctx.config()).await;
77        let mut use_colors = can_use_colors(self.ctx.config());
78
79        self.ctx.attach(Signal::new(
80            SignalKind::UnixHangup,
81            ReopenLogFile::default(),
82        ));
83
84        // Note that we don't use `elfo::stream::Stream` here intentionally
85        // to avoid cyclic dependences (`Context::recv()` logs all messages).
86        loop {
87            tokio::select! {
88                event = self.shared.channel.receive() => {
89                    let event = ward!(event, break);
90                    self.buffer.clear();
91
92                    self.format_event(use_colors, event);
93
94                    if let Some(file) = file.as_mut() {
95                        // TODO: what about performance here?
96                        file.write_all(self.buffer.as_str().as_bytes()).await.expect("cannot write to the config file");
97                    } else {
98                        print!("{}", self.buffer.as_str());
99                    }
100
101                    increment_counter!("elfo_written_events_total");
102                },
103                envelope = self.ctx.recv() => {
104                    let envelope = ward!(envelope, break);
105                    msg!(match envelope {
106                        ReopenLogFile => {
107                            file = open_file(self.ctx.config()).await;
108                            use_colors = can_use_colors(self.ctx.config());
109                        },
110                        ConfigUpdated => {
111                            file = open_file(self.ctx.config()).await;
112                            use_colors = can_use_colors(self.ctx.config());
113                            self.scope_filter.configure(&self.ctx.config().targets);
114                            self.buffer.configure(self.ctx.config().max_line_size.0 as _);
115                        },
116                        Terminate => {
117                            // Close the channel and wait for the rest of the events.
118                            self.shared.channel.close();
119                        },
120                    });
121                },
122            }
123        }
124
125        if let Some(mut file) = file {
126            file.flush().await.expect("cannot flush the log file");
127            file.sync_all().await.expect("cannot sync the log file");
128        }
129    }
130
131    fn format_event(&mut self, use_colors: bool, event: PreparedEvent) {
132        // boolean operator || is short-circuit
133        let successful = if use_colors {
134            self.do_format_event::<theme::ColoredTheme, FailOnUnfit>(&event)
135                || self.do_format_event::<theme::ColoredTheme, TruncateOnUnfit>(&event)
136        } else {
137            self.do_format_event::<theme::PlainTheme, FailOnUnfit>(&event)
138                || self.do_format_event::<theme::PlainTheme, TruncateOnUnfit>(&event)
139        };
140
141        if successful {
142            self.shared.pool.clear(event.payload_id);
143        } else {
144            unreachable!("truncation must succeed")
145        }
146    }
147
148    fn do_format_event<T: theme::Theme, F: LineFactory>(&mut self, event: &PreparedEvent) -> bool {
149        let config = self.ctx.config();
150        let mut line = F::create_line(&mut self.buffer);
151
152        let payload = self
153            .shared
154            .pool
155            .get(event.payload_id)
156            .expect("unknown string");
157
158        // <timestamp> <level> [<trace_id>] <object> - <message>\t<fields>
159
160        T::Timestamp::fmt(line.meta_mut(), &event.timestamp);
161        line.meta_mut().push(' ');
162        T::Level::fmt(line.meta_mut(), event.metadata.level());
163        line.meta_mut().push_str(" [");
164        T::TraceId::fmt(line.meta_mut(), &event.trace_id);
165        line.meta_mut().push_str("] ");
166        T::ActorMeta::fmt(line.payload_mut(), &event.object);
167        line.payload_mut().push_str(" - ");
168        T::Payload::fmt(line.payload_mut(), &payload);
169
170        // Add ancestors' fields.
171        let mut span_id = event.span_id.clone();
172
173        {
174            let payload_buffer = line.payload_mut();
175            while let Some(data) = span_id
176                .as_ref()
177                .and_then(|span_id| self.shared.spans.get(span_id))
178            {
179                span_id.clone_from(&data.parent_id);
180
181                let payload = self
182                    .shared
183                    .pool
184                    .get(data.payload_id)
185                    .expect("unknown string");
186
187                T::Payload::fmt(payload_buffer, &payload);
188            }
189        }
190
191        if config.format.with_location {
192            if let Some(location) = extract_location(event.metadata) {
193                let fields_buffer = line.fields_mut();
194                fields_buffer.push('\t');
195                T::Location::fmt(line.fields_mut(), &location);
196            }
197        }
198
199        if config.format.with_module {
200            if let Some(module) = event.metadata.module_path() {
201                let fields_buffer = line.fields_mut();
202                fields_buffer.push('\t');
203                T::Module::fmt(fields_buffer, module);
204            }
205        }
206
207        line.try_commit()
208    }
209}
210
211async fn open_file(config: &Config) -> Option<File> {
212    if config.sink == Sink::Stdout {
213        return None;
214    }
215
216    // TODO: rely on deserialize instead.
217    let path = config
218        .path
219        .as_ref()
220        .expect("the config path must be provided");
221
222    let file = OpenOptions::new()
223        .create(true)
224        .append(true)
225        .open(path)
226        .await
227        .expect("cannot open the config file");
228
229    Some(file)
230}
231
232fn can_use_colors(config: &Config) -> bool {
233    match config.format.colorization {
234        Colorization::Always => true,
235        Colorization::Never => false,
236        Colorization::Auto => config.sink == Sink::Stdout && io::stdout().is_terminal(),
237    }
238}
239
240fn extract_location(metadata: &Metadata<'static>) -> Option<(&'static str, u32)> {
241    metadata
242        .file()
243        .map(|file| (file, metadata.line().unwrap_or_default()))
244}