Skip to main content

logforth_append_async/
append.rs

1// Copyright 2024 FastLabs Developers
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use logforth_core::Append;
16use logforth_core::Diagnostic;
17use logforth_core::Error;
18use logforth_core::Trap;
19use logforth_core::kv;
20use logforth_core::kv::Visitor;
21use logforth_core::record::Record;
22use logforth_core::trap::BestEffortTrap;
23
24use crate::Overflow;
25use crate::Task;
26use crate::channel::channel;
27use crate::state::AsyncState;
28use crate::worker::Worker;
29
30/// A composable appender, logging and flushing asynchronously.
31///
32/// # Examples
33///
34/// ```
35/// use logforth_append_async::AsyncBuilder;
36/// use logforth_core::append::Stderr;
37///
38/// let async_append = AsyncBuilder::new("logforth-async-append")
39///     .overflow_drop_incoming()
40///     // for demonstration purposes; in practice, this can be a file appender, etc.
41///     .append(Stderr::default())
42///     .build();
43/// ```
44///
45/// # Caveats
46///
47/// The caller or application should ensure that the `flush` method is called before the program
48/// exits to write out any buffered events, especially when this appender is used in a global
49/// context.
50///
51/// The drop glue will also flush the appender. But, in Rust, static items do not call `drop`
52/// at the end of the program.
53#[derive(Debug)]
54pub struct Async {
55    state: AsyncState,
56}
57
58impl Append for Async {
59    fn append(&self, record: &Record, diags: &[Box<dyn Diagnostic>]) -> Result<(), Error> {
60        let mut diagnostics = vec![];
61
62        let mut collector = DiagnosticCollector(&mut diagnostics);
63        for d in diags {
64            d.visit(&mut collector)?;
65        }
66
67        let task = Task::Log {
68            record: Box::new(record.to_owned()),
69            diags: diagnostics,
70        };
71        self.state.send_task(task)
72    }
73
74    fn flush(&self) -> Result<(), Error> {
75        let (done_tx, done_rx) = oneshot::channel();
76
77        let task = Task::Flush { done: done_tx };
78        self.state.send_task(task)?;
79
80        done_rx
81            .recv()
82            .map_err(|err| Error::new("worker exited before completing flush").with_source(err))
83    }
84}
85
86/// A builder for configuring an async appender.
87pub struct AsyncBuilder {
88    thread_name: String,
89    appends: Vec<Box<dyn Append>>,
90    buffered_lines_limit: Option<usize>,
91    trap: Box<dyn Trap>,
92    overflow: Overflow,
93}
94
95impl AsyncBuilder {
96    /// Create a new async appender builder.
97    pub fn new(thread_name: impl Into<String>) -> AsyncBuilder {
98        AsyncBuilder {
99            thread_name: thread_name.into(),
100            appends: vec![],
101            buffered_lines_limit: None,
102            trap: Box::new(BestEffortTrap::default()),
103            overflow: Overflow::Block,
104        }
105    }
106
107    /// Set the buffer size of pending messages.
108    pub fn buffered_lines_limit(mut self, buffered_lines_limit: Option<usize>) -> Self {
109        self.buffered_lines_limit = buffered_lines_limit;
110        self
111    }
112
113    /// Set the overflow policy to block when the buffer is full.
114    pub fn overflow_block(mut self) -> Self {
115        self.overflow = Overflow::Block;
116        self
117    }
118
119    /// Set the overflow policy to drop incoming messages when the buffer is full.
120    pub fn overflow_drop_incoming(mut self) -> Self {
121        self.overflow = Overflow::DropIncoming;
122        self
123    }
124
125    /// Set the trap for this async appender.
126    pub fn trap(mut self, trap: impl Into<Box<dyn Trap>>) -> Self {
127        self.trap = trap.into();
128        self
129    }
130
131    /// Add an appender to this async appender.
132    pub fn append(mut self, append: impl Into<Box<dyn Append>>) -> Self {
133        self.appends.push(append.into());
134        self
135    }
136
137    /// Build the async appender.
138    pub fn build(self) -> Async {
139        let Self {
140            thread_name,
141            appends,
142            buffered_lines_limit,
143            trap,
144            overflow,
145        } = self;
146
147        let (sender, receiver) = channel(buffered_lines_limit);
148        let worker = Worker::new(appends, receiver, trap);
149        let thread_handle = std::thread::Builder::new()
150            .name(thread_name)
151            .spawn(move || worker.run())
152            .expect("failed to spawn async appender thread");
153
154        let state = AsyncState::new(overflow, sender, thread_handle);
155        Async { state }
156    }
157}
158
159struct DiagnosticCollector<'a>(&'a mut Vec<(kv::KeyOwned, kv::ValueOwned)>);
160
161impl<'a> Visitor for DiagnosticCollector<'a> {
162    fn visit(&mut self, key: kv::KeyView, value: kv::ValueView) -> Result<(), Error> {
163        self.0.push((key.to_owned(), value.to_owned()));
164        Ok(())
165    }
166}