slog_extra/
lib.rs

1//! Standard slog-rs extensions.
2#![warn(missing_docs)]
3
4extern crate slog;
5extern crate thread_local;
6
7use slog::Drain;
8
9use std::sync::{mpsc, Mutex};
10use std::fmt;
11use std::{io, thread};
12use slog::{Record, RecordStatic, Level};
13use slog::ser::{self, Serialize, Serializer};
14
15use slog::OwnedKeyValueList;
16
17
/// `Async` drain
///
/// `Async` will send all the logging records to a wrapped drain running in another thread.
///
/// Note: Dropping `Async` waits for its worker thread to finish (thus handling all previous
/// requests). If you can't tolerate the delay, make sure you drop the `Async` drain instance
/// e.g. in another thread.
pub struct Async {
    // Shared sender; it is cloned under the lock to create each thread's own sender.
    ref_sender: Mutex<mpsc::Sender<AsyncMsg>>,
    // Per-thread clones of `ref_sender`, filled in on first use by `get_sender`.
    tl_sender: thread_local::ThreadLocal<mpsc::Sender<AsyncMsg>>,
    // Handle of the worker thread; taken out and joined when `Async` is dropped.
    join: Mutex<Option<thread::JoinHandle<()>>>,
}
30
31impl Async {
32    /// Create `Async` drain
33    ///
34    /// The wrapped drain must handle all error conditions (`Drain<Error=Never>`). See
35    /// `slog::DrainExt::fuse()` and `slog::DrainExt::ignore_err()` for typical error handling
36    /// strategies.
37    pub fn new<D: slog::Drain<Error=slog::Never> + Send + 'static>(drain: D) -> Self {
38        let (tx, rx) = mpsc::channel();
39        let join = thread::spawn(move || {
40                loop {
41                    match rx.recv().unwrap() {
42                        AsyncMsg::Record(r) => {
43                            let rs = RecordStatic {
44                                level: r.level,
45                                file: r.file,
46                                line: r.line,
47                                column: r.column,
48                                function: r.function,
49                                module: r.module,
50                                target: &r.target,
51                            };
52                            let record_values: Vec<_> = r.record_values
53                                .iter()
54                                .map(|&(k, ref v)| (k, v as &Serialize))
55                                .collect();
56
57                            drain.log(
58                                &Record::new(&rs,
59                                             format_args!("{}", r.msg),
60                                             record_values.as_slice()
61                                            ),
62                                            &r.logger_values
63                                            ).unwrap();
64                        }
65                        AsyncMsg::Finish => return,
66                    }
67                }
68        });
69
70        Async{
71            ref_sender: Mutex::new(tx),
72            tl_sender: thread_local::ThreadLocal::new(),
73            join: Mutex::new(Some(join)),
74        }
75    }
76
77    fn get_sender(&self) -> &mpsc::Sender<AsyncMsg> {
78        self.tl_sender.get_or(|| {
79            // TODO: Change to `get_or_try` https://github.com/Amanieu/thread_local-rs/issues/2
80            Box::new(self.ref_sender.lock().unwrap().clone())
81        })
82    }
83
84    /// Send `AsyncRecord` to a worker thread.
85    fn send(&self, r: AsyncRecord) -> io::Result<()> {
86        let sender = self.get_sender();
87
88        sender.send(AsyncMsg::Record(r))
89            .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "Send failed"))
90    }
91
92}
93
94type RecordValues = Vec<(&'static str, Box<Serialize + Send>)>;
95
96struct ToSendSerializer {
97    record_values: RecordValues,
98}
99
100impl ToSendSerializer {
101    fn new() -> Self {
102        ToSendSerializer { record_values: Vec::new() }
103    }
104
105    fn finish(self) -> RecordValues {
106        self.record_values
107    }
108}
109
110impl Serializer for ToSendSerializer {
111    fn emit_bool(&mut self, key: &'static str, val: bool) -> ser::Result {
112        self.record_values.push((key, Box::new(val)));
113        Ok(())
114    }
115    fn emit_unit(&mut self, key: &'static str) -> ser::Result {
116        self.record_values.push((key, Box::new(())));
117        Ok(())
118    }
119    fn emit_none(&mut self, key: &'static str) -> ser::Result {
120        let val: Option<()> = None;
121        self.record_values.push((key, Box::new(val)));
122        Ok(())
123    }
124    fn emit_char(&mut self, key: &'static str, val: char) -> ser::Result {
125        self.record_values.push((key, Box::new(val)));
126        Ok(())
127    }
128    fn emit_u8(&mut self, key: &'static str, val: u8) -> ser::Result {
129        self.record_values.push((key, Box::new(val)));
130        Ok(())
131    }
132    fn emit_i8(&mut self, key: &'static str, val: i8) -> ser::Result {
133        self.record_values.push((key, Box::new(val)));
134        Ok(())
135    }
136    fn emit_u16(&mut self, key: &'static str, val: u16) -> ser::Result {
137        self.record_values.push((key, Box::new(val)));
138        Ok(())
139    }
140    fn emit_i16(&mut self, key: &'static str, val: i16) -> ser::Result {
141        self.record_values.push((key, Box::new(val)));
142        Ok(())
143    }
144    fn emit_u32(&mut self, key: &'static str, val: u32) -> ser::Result {
145        self.record_values.push((key, Box::new(val)));
146        Ok(())
147    }
148    fn emit_i32(&mut self, key: &'static str, val: i32) -> ser::Result {
149        self.record_values.push((key, Box::new(val)));
150        Ok(())
151    }
152    fn emit_f32(&mut self, key: &'static str, val: f32) -> ser::Result {
153        self.record_values.push((key, Box::new(val)));
154        Ok(())
155    }
156    fn emit_u64(&mut self, key: &'static str, val: u64) -> ser::Result {
157        self.record_values.push((key, Box::new(val)));
158        Ok(())
159    }
160    fn emit_i64(&mut self, key: &'static str, val: i64) -> ser::Result {
161        self.record_values.push((key, Box::new(val)));
162        Ok(())
163    }
164    fn emit_f64(&mut self, key: &'static str, val: f64) -> ser::Result {
165        self.record_values.push((key, Box::new(val)));
166        Ok(())
167    }
168    fn emit_usize(&mut self, key: &'static str, val: usize) -> ser::Result {
169        self.record_values.push((key, Box::new(val)));
170        Ok(())
171    }
172    fn emit_isize(&mut self, key: &'static str, val: isize) -> ser::Result {
173        self.record_values.push((key, Box::new(val)));
174        Ok(())
175    }
176    fn emit_str(&mut self, key: &'static str, val: &str) -> ser::Result {
177        self.record_values.push((key, Box::new(String::from(val))));
178        Ok(())
179    }
180    fn emit_arguments(&mut self, key: &'static str, val: &fmt::Arguments) -> ser::Result {
181        self.record_values.push((key, Box::new(fmt::format(*val))));
182        Ok(())
183    }
184}
185
186
187impl Drain for Async {
188    type Error = io::Error;
189
190    fn log(&self, record: &Record, logger_values: &OwnedKeyValueList) -> io::Result<()> {
191
192        let mut ser = ToSendSerializer::new();
193        for &(k, v) in record.values() {
194            try!(v.serialize(record, k, &mut ser))
195        }
196
197        self.send(AsyncRecord {
198            msg: fmt::format(record.msg()),
199            level: record.level(),
200            file: record.file(),
201            line: record.line(),
202            column: record.column(),
203            function: record.function(),
204            module: record.module(),
205            target: String::from(record.target()),
206            logger_values: logger_values.clone(),
207            record_values: ser.finish(),
208        })
209    }
210}
211
212struct AsyncRecord {
213    msg: String,
214    level: Level,
215    file: &'static str,
216    line: u32,
217    column: u32,
218    function: &'static str,
219    module: &'static str,
220    target: String,
221    logger_values: OwnedKeyValueList,
222    record_values: Vec<(&'static str, Box<Serialize + Send>)>,
223}
224
/// Message sent over the channel from `Async` to its worker thread.
enum AsyncMsg {
    /// A record for the worker to pass to the wrapped drain.
    Record(AsyncRecord),
    /// Tells the worker to stop; sent when `Async` is dropped.
    Finish,
}
229
230impl Drop for Async {
231    fn drop(&mut self) {
232        let sender = self.get_sender();
233
234        let _ = sender.send(AsyncMsg::Finish);
235        let _ = self.join.lock().unwrap().take().unwrap().join();
236    }
237}