timely_logging/lib.rs
1
2use std::rc::Rc;
3use std::cell::RefCell;
4use std::any::Any;
5use std::collections::HashMap;
6use std::time::{Instant, Duration};
7use std::fmt::{self, Debug};
8
9pub struct Registry<Id> {
10 /// A worker-specific identifier.
11 id: Id,
12 /// A map from names to typed loggers.
13 map: HashMap<String, (Box<dyn Any>, Box<dyn Flush>)>,
14 /// An instant common to all logging statements.
15 time: Instant,
16}
17
18impl<Id: Clone+'static> Registry<Id> {
19 /// Binds a log name to an action on log event batches.
20 ///
21 /// This method also returns any pre-installed action, rather than overwriting it
22 /// and pivoting the logging destination mid-stream. New loggers with this name will
23 /// use the new destination, and existing loggers will use the old destination.
24 ///
25 /// The action should respond to a sequence of events with non-decreasing timestamps
26 /// (Durations) and well as a timestamp that lower bounds the next event that could be
27 /// seen (likely greater or equal to the timestamp of the last event). The end of a
28 /// logging stream is indicated only by dropping the associated action, which can be
29 /// accomplished with `remove` (or a call to insert, though this is not recommended).
30 pub fn insert<T: 'static, F: FnMut(&Duration, &mut Vec<(Duration, Id, T)>)+'static>(
31 &mut self,
32 name: &str,
33 action: F) -> Option<Box<dyn Any>>
34 {
35 let logger = Logger::<T, Id>::new(self.time, Duration::default(), self.id.clone(), action);
36 self.insert_logger(name, logger)
37 }
38
39 /// Binds a log name to a logger.
40 pub fn insert_logger<T: 'static>(
41 &mut self,
42 name: &str,
43 logger: Logger<T, Id>) -> Option<Box<dyn Any>>
44 {
45 self.map.insert(name.to_owned(), (Box::new(logger.clone()), Box::new(logger))).map(|x| x.0)
46 }
47
48 /// Removes a bound logger.
49 ///
50 /// This is intended primarily to close a logging stream and let the associated writer
51 /// communicate that the stream is closed to any consumers. If a binding is not removed,
52 /// then the stream cannot be complete as in principle anyone could acquire a handle to
53 /// the logger and start further logging.
54 pub fn remove(&mut self, name: &str) -> Option<Box<dyn Any>> {
55 self.map.remove(name).map(|x| x.0)
56 }
57
58 /// Retrieves a shared logger, if one has been inserted.
59 pub fn get<T: 'static>(&self, name: &str) -> Option<Logger<T, Id>> {
60 self.map
61 .get(name)
62 .and_then(|entry| entry.0.downcast_ref::<Logger<T, Id>>())
63 .map(|x| (*x).clone())
64 }
65
66 /// Creates a new logger registry.
67 pub fn new(time: Instant, id: Id) -> Self {
68 Registry {
69 id,
70 time,
71 map: HashMap::new(),
72 }
73 }
74
75 /// Flushes all registered logs.
76 pub fn flush(&mut self) {
77 <Self as Flush>::flush(self);
78 }
79}
80
81impl<Id> Flush for Registry<Id> {
82 fn flush(&mut self) {
83 for value in self.map.values_mut() {
84 value.1.flush();
85 }
86 }
87}
88
89/// A buffering logger.
90#[derive(Debug)]
91pub struct Logger<T, E> {
92 inner: Rc<RefCell<LoggerInner<T, E, dyn FnMut(&Duration, &mut Vec<(Duration, E, T)>)>>>,
93}
94
95impl<T, E: Clone> Clone for Logger<T, E> {
96 fn clone(&self) -> Self {
97 Self {
98 inner: self.inner.clone()
99 }
100 }
101}
102
103struct LoggerInner<T, E, A: ?Sized + FnMut(&Duration, &mut Vec<(Duration, E, T)>)> {
104 id: E,
105 /// common instant used for all loggers.
106 time: Instant,
107 /// offset to allow re-calibration.
108 offset: Duration,
109 /// shared buffer of accumulated log events
110 buffer: Vec<(Duration, E, T)>,
111 /// action to take on full log buffers.
112 action: A,
113}
114
115impl<T, E: Clone> Logger<T, E> {
116 /// Allocates a new shareable logger bound to a write destination.
117 pub fn new<F>(time: Instant, offset: Duration, id: E, action: F) -> Self
118 where
119 F: FnMut(&Duration, &mut Vec<(Duration, E, T)>)+'static
120 {
121 let inner = LoggerInner {
122 id,
123 time,
124 offset,
125 action,
126 buffer: Vec::with_capacity(LoggerInner::<T, E, F>::buffer_capacity()),
127 };
128 let inner = Rc::new(RefCell::new(inner));
129 Logger { inner }
130 }
131
132 /// Logs an event.
133 ///
134 /// The event has its timestamp recorded at the moment of logging, but it may be delayed
135 /// due to buffering. It will be written when the logger is next flushed, either due to
136 /// the buffer reaching capacity or a direct call to flush.
137 ///
138 /// This implementation borrows a shared (but thread-local) buffer of log events, to ensure
139 /// that the `action` only sees one stream of events with increasing timestamps. This may
140 /// have a cost that we don't entirely understand.
141 pub fn log<S: Into<T>>(&self, event: S) {
142 self.log_many(Some(event));
143 }
144
145 /// Logs multiple events.
146 ///
147 /// The event has its timestamp recorded at the moment of logging, but it may be delayed
148 /// due to buffering. It will be written when the logger is next flushed, either due to
149 /// the buffer reaching capacity or a direct call to flush.
150 ///
151 /// All events in this call will have the same timestamp. This can be more performant due
152 /// to fewer `time.elapsed()` calls, but it also allows some logged events to appear to be
153 /// "transactional", occurring at the same moment.
154 ///
155 /// This implementation borrows a shared (but thread-local) buffer of log events, to ensure
156 /// that the `action` only sees one stream of events with increasing timestamps. This may
157 /// have a cost that we don't entirely understand.
158 pub fn log_many<I>(&self, events: I)
159 where I: IntoIterator, I::Item: Into<T>
160 {
161 self.inner.borrow_mut().log_many(events)
162 }
163
164 /// Flushes logged messages and communicates the new minimal timestamp.
165 pub fn flush(&mut self) {
166 <Self as Flush>::flush(self);
167 }
168}
169
170impl<T, E: Clone, A: ?Sized + FnMut(&Duration, &mut Vec<(Duration, E, T)>)> LoggerInner<T, E, A> {
171
172 /// The upper limit for buffers to allocate, size in bytes. [Self::buffer_capacity] converts
173 /// this to size in elements.
174 const BUFFER_SIZE_BYTES: usize = 1 << 13;
175
176 /// The maximum buffer capacity in elements. Returns a number between [Self::BUFFER_SIZE_BYTES]
177 /// and 1, inclusively.
178 // TODO: This fn is not const because it cannot depend on non-Sized generic parameters
179 fn buffer_capacity() -> usize {
180 let size = ::std::mem::size_of::<(Duration, E, T)>();
181 if size == 0 {
182 Self::BUFFER_SIZE_BYTES
183 } else if size <= Self::BUFFER_SIZE_BYTES {
184 Self::BUFFER_SIZE_BYTES / size
185 } else {
186 1
187 }
188 }
189
190 pub fn log_many<I>(&mut self, events: I)
191 where I: IntoIterator, I::Item: Into<T>
192 {
193 let elapsed = self.time.elapsed() + self.offset;
194 for event in events {
195 self.buffer.push((elapsed, self.id.clone(), event.into()));
196 if self.buffer.len() == self.buffer.capacity() {
197 // Would call `self.flush()`, but for `RefCell` panic.
198 (self.action)(&elapsed, &mut self.buffer);
199 // The buffer clear could plausibly be removed, changing the semantics but allowing users
200 // to do in-place updates without forcing them to take ownership.
201 self.buffer.clear();
202 let buffer_capacity = self.buffer.capacity();
203 if buffer_capacity < Self::buffer_capacity() {
204 self.buffer.reserve((buffer_capacity+1).next_power_of_two());
205 }
206 }
207 }
208 }
209}
210
211/// Flush on the *last* drop of a logger.
212impl<T, E, A: ?Sized + FnMut(&Duration, &mut Vec<(Duration, E, T)>)> Drop for LoggerInner<T, E, A> {
213 fn drop(&mut self) {
214 // Avoid sending out empty buffers just because of drops.
215 if !self.buffer.is_empty() {
216 self.flush();
217 }
218 }
219}
220
221impl<T, E, A: ?Sized + FnMut(&Duration, &mut Vec<(Duration, E, T)>)> Debug for LoggerInner<T, E, A>
222where
223 E: Debug,
224 T: Debug,
225{
226 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
227 f.debug_struct("LoggerInner")
228 .field("id", &self.id)
229 .field("time", &self.time)
230 .field("offset", &self.offset)
231 .field("action", &"FnMut")
232 .field("buffer", &self.buffer)
233 .finish()
234 }
235}
236
237/// Types that can be flushed.
238trait Flush {
239 /// Flushes buffered data.
240 fn flush(&mut self);
241}
242
243impl<T, E> Flush for Logger<T, E> {
244 fn flush(&mut self) {
245 self.inner.borrow_mut().flush()
246 }
247}
248
249impl<T, E, A: ?Sized + FnMut(&Duration, &mut Vec<(Duration, E, T)>)> Flush for LoggerInner<T, E, A> {
250 fn flush(&mut self) {
251 let elapsed = self.time.elapsed() + self.offset;
252 if !self.buffer.is_empty() {
253 (self.action)(&elapsed, &mut self.buffer);
254 self.buffer.clear();
255 // NB: This does not re-allocate any specific size if the buffer has been
256 // taken. The intent is that the geometric growth in `log_many` should be
257 // enough to ensure that we do not send too many small buffers, nor do we
258 // allocate too large buffers when they are not needed.
259 }
260 else {
261 // Avoid swapping resources for empty buffers.
262 (self.action)(&elapsed, &mut Vec::new());
263 }
264 }
265}