timely_logging/
lib.rs

1
2use std::rc::Rc;
3use std::cell::RefCell;
4use std::any::Any;
5use std::collections::HashMap;
6use std::time::{Instant, Duration};
7use std::fmt::{self, Debug};
8
9pub struct Registry<Id> {
10    /// A worker-specific identifier.
11    id: Id,
12    /// A map from names to typed loggers.
13    map: HashMap<String, (Box<dyn Any>, Box<dyn Flush>)>,
14    /// An instant common to all logging statements.
15    time: Instant,
16}
17
18impl<Id: Clone+'static> Registry<Id> {
19    /// Binds a log name to an action on log event batches.
20    ///
21    /// This method also returns any pre-installed action, rather than overwriting it
22    /// and pivoting the logging destination mid-stream. New loggers with this name will
23    /// use the new destination, and existing loggers will use the old destination.
24    ///
25    /// The action should respond to a sequence of events with non-decreasing timestamps
26    /// (Durations) and well as a timestamp that lower bounds the next event that could be
27    /// seen (likely greater or equal to the timestamp of the last event). The end of a
28    /// logging stream is indicated only by dropping the associated action, which can be
29    /// accomplished with `remove` (or a call to insert, though this is not recommended).
30    pub fn insert<T: 'static, F: FnMut(&Duration, &mut Vec<(Duration, Id, T)>)+'static>(
31        &mut self,
32        name: &str,
33        action: F) -> Option<Box<dyn Any>>
34    {
35        let logger = Logger::<T, Id>::new(self.time, Duration::default(), self.id.clone(), action);
36        self.insert_logger(name, logger)
37    }
38
39    /// Binds a log name to a logger.
40    pub fn insert_logger<T: 'static>(
41        &mut self,
42        name: &str,
43        logger: Logger<T, Id>) -> Option<Box<dyn Any>>
44    {
45        self.map.insert(name.to_owned(), (Box::new(logger.clone()), Box::new(logger))).map(|x| x.0)
46    }
47
48    /// Removes a bound logger.
49    ///
50    /// This is intended primarily to close a logging stream and let the associated writer
51    /// communicate that the stream is closed to any consumers. If a binding is not removed,
52    /// then the stream cannot be complete as in principle anyone could acquire a handle to
53    /// the logger and start further logging.
54    pub fn remove(&mut self, name: &str) -> Option<Box<dyn Any>> {
55        self.map.remove(name).map(|x| x.0)
56    }
57
58    /// Retrieves a shared logger, if one has been inserted.
59    pub fn get<T: 'static>(&self, name: &str) -> Option<Logger<T, Id>> {
60        self.map
61            .get(name)
62            .and_then(|entry| entry.0.downcast_ref::<Logger<T, Id>>())
63            .map(|x| (*x).clone())
64    }
65
66    /// Creates a new logger registry.
67    pub fn new(time: Instant, id: Id) -> Self {
68        Registry {
69            id,
70            time,
71            map: HashMap::new(),
72        }
73    }
74
75    /// Flushes all registered logs.
76    pub fn flush(&mut self) {
77        <Self as Flush>::flush(self);
78    }
79}
80
81impl<Id> Flush for Registry<Id> {
82    fn flush(&mut self) {
83        for value in self.map.values_mut() {
84            value.1.flush();
85        }
86    }
87}
88
89/// A buffering logger.
90#[derive(Debug)]
91pub struct Logger<T, E> {
92    inner: Rc<RefCell<LoggerInner<T, E, dyn FnMut(&Duration, &mut Vec<(Duration, E, T)>)>>>,
93}
94
95impl<T, E: Clone> Clone for Logger<T, E> {
96    fn clone(&self) -> Self {
97        Self {
98            inner: self.inner.clone()
99        }
100    }
101}
102
103struct LoggerInner<T, E, A: ?Sized + FnMut(&Duration, &mut Vec<(Duration, E, T)>)> {
104    id:     E,
105    /// common instant used for all loggers.
106    time:   Instant,
107    /// offset to allow re-calibration.
108    offset: Duration,
109    /// shared buffer of accumulated log events
110    buffer: Vec<(Duration, E, T)>,
111    /// action to take on full log buffers.
112    action: A,
113}
114
115impl<T, E: Clone> Logger<T, E> {
116    /// Allocates a new shareable logger bound to a write destination.
117    pub fn new<F>(time: Instant, offset: Duration, id: E, action: F) -> Self
118    where
119        F: FnMut(&Duration, &mut Vec<(Duration, E, T)>)+'static
120    {
121        let inner = LoggerInner {
122            id,
123            time,
124            offset,
125            action,
126            buffer: Vec::with_capacity(LoggerInner::<T, E, F>::buffer_capacity()),
127        };
128        let inner = Rc::new(RefCell::new(inner));
129        Logger { inner }
130    }
131
132    /// Logs an event.
133    ///
134    /// The event has its timestamp recorded at the moment of logging, but it may be delayed
135    /// due to buffering. It will be written when the logger is next flushed, either due to
136    /// the buffer reaching capacity or a direct call to flush.
137    ///
138    /// This implementation borrows a shared (but thread-local) buffer of log events, to ensure
139    /// that the `action` only sees one stream of events with increasing timestamps. This may
140    /// have a cost that we don't entirely understand.
141    pub fn log<S: Into<T>>(&self, event: S) {
142        self.log_many(Some(event));
143    }
144
145    /// Logs multiple events.
146    ///
147    /// The event has its timestamp recorded at the moment of logging, but it may be delayed
148    /// due to buffering. It will be written when the logger is next flushed, either due to
149    /// the buffer reaching capacity or a direct call to flush.
150    ///
151    /// All events in this call will have the same timestamp. This can be more performant due
152    /// to fewer `time.elapsed()` calls, but it also allows some logged events to appear to be
153    /// "transactional", occurring at the same moment.
154    ///
155    /// This implementation borrows a shared (but thread-local) buffer of log events, to ensure
156    /// that the `action` only sees one stream of events with increasing timestamps. This may
157    /// have a cost that we don't entirely understand.
158    pub fn log_many<I>(&self, events: I)
159    where I: IntoIterator, I::Item: Into<T>
160    {
161        self.inner.borrow_mut().log_many(events)
162    }
163
164    /// Flushes logged messages and communicates the new minimal timestamp.
165    pub fn flush(&mut self) {
166        <Self as Flush>::flush(self);
167    }
168}
169
170impl<T, E: Clone, A: ?Sized + FnMut(&Duration, &mut Vec<(Duration, E, T)>)> LoggerInner<T, E, A> {
171
172    /// The upper limit for buffers to allocate, size in bytes. [Self::buffer_capacity] converts
173    /// this to size in elements.
174    const BUFFER_SIZE_BYTES: usize = 1 << 13;
175
176    /// The maximum buffer capacity in elements. Returns a number between [Self::BUFFER_SIZE_BYTES]
177    /// and 1, inclusively.
178    // TODO: This fn is not const because it cannot depend on non-Sized generic parameters
179    fn buffer_capacity() -> usize {
180        let size =  ::std::mem::size_of::<(Duration, E, T)>();
181        if size == 0 {
182            Self::BUFFER_SIZE_BYTES
183        } else if size <= Self::BUFFER_SIZE_BYTES {
184            Self::BUFFER_SIZE_BYTES / size
185        } else {
186            1
187        }
188    }
189
190    pub fn log_many<I>(&mut self, events: I)
191        where I: IntoIterator, I::Item: Into<T>
192    {
193        let elapsed = self.time.elapsed() + self.offset;
194        for event in events {
195            self.buffer.push((elapsed, self.id.clone(), event.into()));
196            if self.buffer.len() == self.buffer.capacity() {
197                // Would call `self.flush()`, but for `RefCell` panic.
198                (self.action)(&elapsed, &mut self.buffer);
199                // The buffer clear could plausibly be removed, changing the semantics but allowing users
200                // to do in-place updates without forcing them to take ownership.
201                self.buffer.clear();
202                let buffer_capacity = self.buffer.capacity();
203                if buffer_capacity < Self::buffer_capacity() {
204                    self.buffer.reserve((buffer_capacity+1).next_power_of_two());
205                }
206            }
207        }
208    }
209}
210
211/// Flush on the *last* drop of a logger.
212impl<T, E, A: ?Sized + FnMut(&Duration, &mut Vec<(Duration, E, T)>)> Drop for LoggerInner<T, E, A> {
213    fn drop(&mut self) {
214        // Avoid sending out empty buffers just because of drops.
215        if !self.buffer.is_empty() {
216            self.flush();
217        }
218    }
219}
220
221impl<T, E, A: ?Sized + FnMut(&Duration, &mut Vec<(Duration, E, T)>)> Debug for LoggerInner<T, E, A>
222where
223    E: Debug,
224    T: Debug,
225{
226    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
227        f.debug_struct("LoggerInner")
228            .field("id", &self.id)
229            .field("time", &self.time)
230            .field("offset", &self.offset)
231            .field("action", &"FnMut")
232            .field("buffer", &self.buffer)
233            .finish()
234    }
235}
236
237/// Types that can be flushed.
238trait Flush {
239    /// Flushes buffered data.
240    fn flush(&mut self);
241}
242
243impl<T, E> Flush for Logger<T, E> {
244    fn flush(&mut self) {
245        self.inner.borrow_mut().flush()
246    }
247}
248
249impl<T, E, A: ?Sized + FnMut(&Duration, &mut Vec<(Duration, E, T)>)> Flush for LoggerInner<T, E, A> {
250    fn flush(&mut self) {
251        let elapsed = self.time.elapsed() + self.offset;
252        if !self.buffer.is_empty() {
253            (self.action)(&elapsed, &mut self.buffer);
254            self.buffer.clear();
255            // NB: This does not re-allocate any specific size if the buffer has been
256            // taken. The intent is that the geometric growth in `log_many` should be
257            // enough to ensure that we do not send too many small buffers, nor do we
258            // allocate too large buffers when they are not needed.
259        }
260        else {
261            // Avoid swapping resources for empty buffers.
262            (self.action)(&elapsed, &mut Vec::new());
263        }
264    }
265}