logforth_append_async/
append.rs1use logforth_core::Append;
16use logforth_core::Diagnostic;
17use logforth_core::Error;
18use logforth_core::Trap;
19use logforth_core::kv;
20use logforth_core::kv::Visitor;
21use logforth_core::record::Record;
22use logforth_core::trap::BestEffortTrap;
23
24use crate::Overflow;
25use crate::Task;
26use crate::channel::channel;
27use crate::state::AsyncState;
28use crate::worker::Worker;
29
30#[derive(Debug)]
54pub struct Async {
55 state: AsyncState,
56}
57
58impl Append for Async {
59 fn append(&self, record: &Record, diags: &[Box<dyn Diagnostic>]) -> Result<(), Error> {
60 let mut diagnostics = vec![];
61
62 let mut collector = DiagnosticCollector(&mut diagnostics);
63 for d in diags {
64 d.visit(&mut collector)?;
65 }
66
67 let task = Task::Log {
68 record: Box::new(record.to_owned()),
69 diags: diagnostics,
70 };
71 self.state.send_task(task)
72 }
73
74 fn flush(&self) -> Result<(), Error> {
75 let (done_tx, done_rx) = oneshot::channel();
76
77 let task = Task::Flush { done: done_tx };
78 self.state.send_task(task)?;
79
80 done_rx
81 .recv()
82 .map_err(|err| Error::new("worker exited before completing flush").with_source(err))
83 }
84}
85
86pub struct AsyncBuilder {
88 thread_name: String,
89 appends: Vec<Box<dyn Append>>,
90 buffered_lines_limit: Option<usize>,
91 trap: Box<dyn Trap>,
92 overflow: Overflow,
93}
94
95impl AsyncBuilder {
96 pub fn new(thread_name: impl Into<String>) -> AsyncBuilder {
98 AsyncBuilder {
99 thread_name: thread_name.into(),
100 appends: vec![],
101 buffered_lines_limit: None,
102 trap: Box::new(BestEffortTrap::default()),
103 overflow: Overflow::Block,
104 }
105 }
106
107 pub fn buffered_lines_limit(mut self, buffered_lines_limit: Option<usize>) -> Self {
109 self.buffered_lines_limit = buffered_lines_limit;
110 self
111 }
112
113 pub fn overflow_block(mut self) -> Self {
115 self.overflow = Overflow::Block;
116 self
117 }
118
119 pub fn overflow_drop_incoming(mut self) -> Self {
121 self.overflow = Overflow::DropIncoming;
122 self
123 }
124
125 pub fn trap(mut self, trap: impl Into<Box<dyn Trap>>) -> Self {
127 self.trap = trap.into();
128 self
129 }
130
131 pub fn append(mut self, append: impl Into<Box<dyn Append>>) -> Self {
133 self.appends.push(append.into());
134 self
135 }
136
137 pub fn build(self) -> Async {
139 let Self {
140 thread_name,
141 appends,
142 buffered_lines_limit,
143 trap,
144 overflow,
145 } = self;
146
147 let (sender, receiver) = channel(buffered_lines_limit);
148 let worker = Worker::new(appends, receiver, trap);
149 let thread_handle = std::thread::Builder::new()
150 .name(thread_name)
151 .spawn(move || worker.run())
152 .expect("failed to spawn async appender thread");
153
154 let state = AsyncState::new(overflow, sender, thread_handle);
155 Async { state }
156 }
157}
158
159struct DiagnosticCollector<'a>(&'a mut Vec<(kv::KeyOwned, kv::ValueOwned)>);
160
161impl<'a> Visitor for DiagnosticCollector<'a> {
162 fn visit(&mut self, key: kv::KeyView, value: kv::ValueView) -> Result<(), Error> {
163 self.0.push((key.to_owned(), value.to_owned()));
164 Ok(())
165 }
166}