logforth_append_async/
append.rs

1// Copyright 2024 FastLabs Developers
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use logforth_core::Append;
18use logforth_core::Diagnostic;
19use logforth_core::Error;
20use logforth_core::Trap;
21use logforth_core::kv;
22use logforth_core::kv::Visitor;
23use logforth_core::record::Record;
24use logforth_core::trap::BestEffortTrap;
25
26use crate::Overflow;
27use crate::Task;
28use crate::state::AsyncState;
29use crate::worker::Worker;
30
31/// A composable appender, logging and flushing asynchronously.
32#[derive(Debug)]
33pub struct Async {
34    appends: Arc<[Box<dyn Append>]>,
35    overflow: Overflow,
36    state: AsyncState,
37    trap: Arc<dyn Trap>,
38}
39
40impl Append for Async {
41    fn append(&self, record: &Record, diags: &[Box<dyn Diagnostic>]) -> Result<(), Error> {
42        let mut diagnostics = vec![];
43
44        let mut collector = DiagnosticCollector(&mut diagnostics);
45        for d in diags {
46            d.visit(&mut collector)?;
47        }
48
49        let overflow = self.overflow;
50        let task = Task::Log {
51            appends: self.appends.clone(),
52            record: Box::new(record.to_owned()),
53            diags: diagnostics,
54        };
55        self.state.send_task(task, overflow)
56    }
57
58    fn flush(&self) -> Result<(), Error> {
59        let overflow = self.overflow;
60        let task = Task::Flush {
61            appends: self.appends.clone(),
62        };
63        self.state.send_task(task, overflow)
64    }
65
66    fn exit(&self) -> Result<(), Error> {
67        // If the program is tearing down, this will be the final flush. `crossbeam`
68        // uses thread-local internally, which is not supported in `atexit` callback.
69        // This can be bypassed by flushing sinks directly on the current thread, but
70        // before we do that we have to join the thread to ensure that any pending log
71        // tasks are completed.
72        //
73        // @see https://github.com/SpriteOvO/spdlog-rs/issues/64
74        self.state.destroy();
75        for append in self.appends.iter() {
76            if let Err(err) = append.exit() {
77                self.trap.trap(&err);
78            }
79        }
80        Ok(())
81    }
82}
83
84/// A builder for configuring an async appender.
85pub struct AsyncBuilder {
86    thread_name: String,
87    appends: Vec<Box<dyn Append>>,
88    buffered_lines_limit: Option<usize>,
89    trap: Arc<dyn Trap>,
90    overflow: Overflow,
91}
92
93impl AsyncBuilder {
94    /// Create a new async appender builder.
95    pub fn new(thread_name: impl Into<String>) -> AsyncBuilder {
96        AsyncBuilder {
97            thread_name: thread_name.into(),
98            appends: vec![],
99            buffered_lines_limit: None,
100            trap: Arc::new(BestEffortTrap::default()),
101            overflow: Overflow::Block,
102        }
103    }
104
105    /// Set the buffer size of pending messages.
106    pub fn buffered_lines_limit(mut self, buffered_lines_limit: Option<usize>) -> Self {
107        self.buffered_lines_limit = buffered_lines_limit;
108        self
109    }
110
111    /// Set the overflow policy to block when the buffer is full.
112    pub fn overflow_block(mut self) -> Self {
113        self.overflow = Overflow::Block;
114        self
115    }
116
117    /// Set the overflow policy to drop incoming messages when the buffer is full.
118    pub fn overflow_drop_incoming(mut self) -> Self {
119        self.overflow = Overflow::DropIncoming;
120        self
121    }
122
123    /// Set the trap for this async appender.
124    pub fn trap(mut self, trap: impl Into<Box<dyn Trap>>) -> Self {
125        let trap = trap.into();
126        self.trap = trap.into();
127        self
128    }
129
130    /// Add an appender to this async appender.
131    pub fn append(mut self, append: impl Into<Box<dyn Append>>) -> Self {
132        self.appends.push(append.into());
133        self
134    }
135
136    /// Build the async appender.
137    pub fn build(self) -> Async {
138        let Self {
139            thread_name,
140            appends,
141            buffered_lines_limit,
142            trap,
143            overflow,
144        } = self;
145
146        let appends = appends.into_boxed_slice().into();
147
148        let (sender, receiver) = match buffered_lines_limit {
149            Some(limit) => crossbeam_channel::bounded(limit),
150            None => crossbeam_channel::unbounded(),
151        };
152
153        let worker = Worker::new(receiver, trap.clone());
154        let thread_handle = std::thread::Builder::new()
155            .name(thread_name)
156            .spawn(move || worker.run())
157            .expect("failed to spawn async appender thread");
158        let state = AsyncState::new(sender, thread_handle);
159
160        Async {
161            appends,
162            overflow,
163            state,
164            trap,
165        }
166    }
167}
168
169struct DiagnosticCollector<'a>(&'a mut Vec<(kv::KeyOwned, kv::ValueOwned)>);
170
171impl<'a> Visitor for DiagnosticCollector<'a> {
172    fn visit(&mut self, key: kv::Key, value: kv::Value) -> Result<(), Error> {
173        self.0.push((key.to_owned(), value.to_owned()));
174        Ok(())
175    }
176}