logforth_append_async/
append.rs1use std::sync::Arc;
16
17use logforth_core::Append;
18use logforth_core::Diagnostic;
19use logforth_core::Error;
20use logforth_core::Trap;
21use logforth_core::kv;
22use logforth_core::kv::Visitor;
23use logforth_core::record::Record;
24use logforth_core::trap::BestEffortTrap;
25
26use crate::Overflow;
27use crate::Task;
28use crate::state::AsyncState;
29use crate::worker::Worker;
30
31#[derive(Debug)]
33pub struct Async {
34 appends: Arc<[Box<dyn Append>]>,
35 overflow: Overflow,
36 state: AsyncState,
37 trap: Arc<dyn Trap>,
38}
39
40impl Append for Async {
41 fn append(&self, record: &Record, diags: &[Box<dyn Diagnostic>]) -> Result<(), Error> {
42 let mut diagnostics = vec![];
43
44 let mut collector = DiagnosticCollector(&mut diagnostics);
45 for d in diags {
46 d.visit(&mut collector)?;
47 }
48
49 let overflow = self.overflow;
50 let task = Task::Log {
51 appends: self.appends.clone(),
52 record: Box::new(record.to_owned()),
53 diags: diagnostics,
54 };
55 self.state.send_task(task, overflow)
56 }
57
58 fn flush(&self) -> Result<(), Error> {
59 let overflow = self.overflow;
60 let task = Task::Flush {
61 appends: self.appends.clone(),
62 };
63 self.state.send_task(task, overflow)
64 }
65
66 fn exit(&self) -> Result<(), Error> {
67 self.state.destroy();
75 for append in self.appends.iter() {
76 if let Err(err) = append.exit() {
77 self.trap.trap(&err);
78 }
79 }
80 Ok(())
81 }
82}
83
84pub struct AsyncBuilder {
86 thread_name: String,
87 appends: Vec<Box<dyn Append>>,
88 buffered_lines_limit: Option<usize>,
89 trap: Arc<dyn Trap>,
90 overflow: Overflow,
91}
92
93impl AsyncBuilder {
94 pub fn new(thread_name: impl Into<String>) -> AsyncBuilder {
96 AsyncBuilder {
97 thread_name: thread_name.into(),
98 appends: vec![],
99 buffered_lines_limit: None,
100 trap: Arc::new(BestEffortTrap::default()),
101 overflow: Overflow::Block,
102 }
103 }
104
105 pub fn buffered_lines_limit(mut self, buffered_lines_limit: Option<usize>) -> Self {
107 self.buffered_lines_limit = buffered_lines_limit;
108 self
109 }
110
111 pub fn overflow_block(mut self) -> Self {
113 self.overflow = Overflow::Block;
114 self
115 }
116
117 pub fn overflow_drop_incoming(mut self) -> Self {
119 self.overflow = Overflow::DropIncoming;
120 self
121 }
122
123 pub fn trap(mut self, trap: impl Into<Box<dyn Trap>>) -> Self {
125 let trap = trap.into();
126 self.trap = trap.into();
127 self
128 }
129
130 pub fn append(mut self, append: impl Into<Box<dyn Append>>) -> Self {
132 self.appends.push(append.into());
133 self
134 }
135
136 pub fn build(self) -> Async {
138 let Self {
139 thread_name,
140 appends,
141 buffered_lines_limit,
142 trap,
143 overflow,
144 } = self;
145
146 let appends = appends.into_boxed_slice().into();
147
148 let (sender, receiver) = match buffered_lines_limit {
149 Some(limit) => crossbeam_channel::bounded(limit),
150 None => crossbeam_channel::unbounded(),
151 };
152
153 let worker = Worker::new(receiver, trap.clone());
154 let thread_handle = std::thread::Builder::new()
155 .name(thread_name)
156 .spawn(move || worker.run())
157 .expect("failed to spawn async appender thread");
158 let state = AsyncState::new(sender, thread_handle);
159
160 Async {
161 appends,
162 overflow,
163 state,
164 trap,
165 }
166 }
167}
168
169struct DiagnosticCollector<'a>(&'a mut Vec<(kv::KeyOwned, kv::ValueOwned)>);
170
171impl<'a> Visitor for DiagnosticCollector<'a> {
172 fn visit(&mut self, key: kv::Key, value: kv::Value) -> Result<(), Error> {
173 self.0.push((key.to_owned(), value.to_owned()));
174 Ok(())
175 }
176}