1#![warn(missing_docs)]
3
4extern crate slog;
5extern crate thread_local;
6
7use slog::Drain;
8
9use std::sync::{mpsc, Mutex};
10use std::fmt;
11use std::{io, thread};
12use slog::{Record, RecordStatic, Level};
13use slog::ser::{self, Serialize, Serializer};
14
15use slog::OwnedKeyValueList;
16
17
18pub struct Async {
26 ref_sender: Mutex<mpsc::Sender<AsyncMsg>>,
27 tl_sender: thread_local::ThreadLocal<mpsc::Sender<AsyncMsg>>,
28 join: Mutex<Option<thread::JoinHandle<()>>>,
29}
30
31impl Async {
32 pub fn new<D: slog::Drain<Error=slog::Never> + Send + 'static>(drain: D) -> Self {
38 let (tx, rx) = mpsc::channel();
39 let join = thread::spawn(move || {
40 loop {
41 match rx.recv().unwrap() {
42 AsyncMsg::Record(r) => {
43 let rs = RecordStatic {
44 level: r.level,
45 file: r.file,
46 line: r.line,
47 column: r.column,
48 function: r.function,
49 module: r.module,
50 target: &r.target,
51 };
52 let record_values: Vec<_> = r.record_values
53 .iter()
54 .map(|&(k, ref v)| (k, v as &Serialize))
55 .collect();
56
57 drain.log(
58 &Record::new(&rs,
59 format_args!("{}", r.msg),
60 record_values.as_slice()
61 ),
62 &r.logger_values
63 ).unwrap();
64 }
65 AsyncMsg::Finish => return,
66 }
67 }
68 });
69
70 Async{
71 ref_sender: Mutex::new(tx),
72 tl_sender: thread_local::ThreadLocal::new(),
73 join: Mutex::new(Some(join)),
74 }
75 }
76
77 fn get_sender(&self) -> &mpsc::Sender<AsyncMsg> {
78 self.tl_sender.get_or(|| {
79 Box::new(self.ref_sender.lock().unwrap().clone())
81 })
82 }
83
84 fn send(&self, r: AsyncRecord) -> io::Result<()> {
86 let sender = self.get_sender();
87
88 sender.send(AsyncMsg::Record(r))
89 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "Send failed"))
90 }
91
92}
93
94type RecordValues = Vec<(&'static str, Box<Serialize + Send>)>;
95
96struct ToSendSerializer {
97 record_values: RecordValues,
98}
99
100impl ToSendSerializer {
101 fn new() -> Self {
102 ToSendSerializer { record_values: Vec::new() }
103 }
104
105 fn finish(self) -> RecordValues {
106 self.record_values
107 }
108}
109
110impl Serializer for ToSendSerializer {
111 fn emit_bool(&mut self, key: &'static str, val: bool) -> ser::Result {
112 self.record_values.push((key, Box::new(val)));
113 Ok(())
114 }
115 fn emit_unit(&mut self, key: &'static str) -> ser::Result {
116 self.record_values.push((key, Box::new(())));
117 Ok(())
118 }
119 fn emit_none(&mut self, key: &'static str) -> ser::Result {
120 let val: Option<()> = None;
121 self.record_values.push((key, Box::new(val)));
122 Ok(())
123 }
124 fn emit_char(&mut self, key: &'static str, val: char) -> ser::Result {
125 self.record_values.push((key, Box::new(val)));
126 Ok(())
127 }
128 fn emit_u8(&mut self, key: &'static str, val: u8) -> ser::Result {
129 self.record_values.push((key, Box::new(val)));
130 Ok(())
131 }
132 fn emit_i8(&mut self, key: &'static str, val: i8) -> ser::Result {
133 self.record_values.push((key, Box::new(val)));
134 Ok(())
135 }
136 fn emit_u16(&mut self, key: &'static str, val: u16) -> ser::Result {
137 self.record_values.push((key, Box::new(val)));
138 Ok(())
139 }
140 fn emit_i16(&mut self, key: &'static str, val: i16) -> ser::Result {
141 self.record_values.push((key, Box::new(val)));
142 Ok(())
143 }
144 fn emit_u32(&mut self, key: &'static str, val: u32) -> ser::Result {
145 self.record_values.push((key, Box::new(val)));
146 Ok(())
147 }
148 fn emit_i32(&mut self, key: &'static str, val: i32) -> ser::Result {
149 self.record_values.push((key, Box::new(val)));
150 Ok(())
151 }
152 fn emit_f32(&mut self, key: &'static str, val: f32) -> ser::Result {
153 self.record_values.push((key, Box::new(val)));
154 Ok(())
155 }
156 fn emit_u64(&mut self, key: &'static str, val: u64) -> ser::Result {
157 self.record_values.push((key, Box::new(val)));
158 Ok(())
159 }
160 fn emit_i64(&mut self, key: &'static str, val: i64) -> ser::Result {
161 self.record_values.push((key, Box::new(val)));
162 Ok(())
163 }
164 fn emit_f64(&mut self, key: &'static str, val: f64) -> ser::Result {
165 self.record_values.push((key, Box::new(val)));
166 Ok(())
167 }
168 fn emit_usize(&mut self, key: &'static str, val: usize) -> ser::Result {
169 self.record_values.push((key, Box::new(val)));
170 Ok(())
171 }
172 fn emit_isize(&mut self, key: &'static str, val: isize) -> ser::Result {
173 self.record_values.push((key, Box::new(val)));
174 Ok(())
175 }
176 fn emit_str(&mut self, key: &'static str, val: &str) -> ser::Result {
177 self.record_values.push((key, Box::new(String::from(val))));
178 Ok(())
179 }
180 fn emit_arguments(&mut self, key: &'static str, val: &fmt::Arguments) -> ser::Result {
181 self.record_values.push((key, Box::new(fmt::format(*val))));
182 Ok(())
183 }
184}
185
186
187impl Drain for Async {
188 type Error = io::Error;
189
190 fn log(&self, record: &Record, logger_values: &OwnedKeyValueList) -> io::Result<()> {
191
192 let mut ser = ToSendSerializer::new();
193 for &(k, v) in record.values() {
194 try!(v.serialize(record, k, &mut ser))
195 }
196
197 self.send(AsyncRecord {
198 msg: fmt::format(record.msg()),
199 level: record.level(),
200 file: record.file(),
201 line: record.line(),
202 column: record.column(),
203 function: record.function(),
204 module: record.module(),
205 target: String::from(record.target()),
206 logger_values: logger_values.clone(),
207 record_values: ser.finish(),
208 })
209 }
210}
211
212struct AsyncRecord {
213 msg: String,
214 level: Level,
215 file: &'static str,
216 line: u32,
217 column: u32,
218 function: &'static str,
219 module: &'static str,
220 target: String,
221 logger_values: OwnedKeyValueList,
222 record_values: Vec<(&'static str, Box<Serialize + Send>)>,
223}
224
225enum AsyncMsg {
226 Record(AsyncRecord),
227 Finish,
228}
229
230impl Drop for Async {
231 fn drop(&mut self) {
232 let sender = self.get_sender();
233
234 let _ = sender.send(AsyncMsg::Finish);
235 let _ = self.join.lock().unwrap().take().unwrap().join();
236 }
237}