audis/lib.rs
//! Audis is an implementation of a multi-index audit log,
//! built atop the Redis key-value store.
//!
//! An audit log consists of zero or more Event objects,
//! each indexed against one or more subjects.
//! The correspondence between Redis databases and audit
//! logs is 1:1 -- each audit log inhabits precisely one
//! Redis instance, and an instance can only house a single
//! audit log.
//!
//! ## Example: Logging Events
//!
//! The simplest way to use audis is to point it at a Redis
//! instance and start logging to it:
//!
//! ```rust,no_run
//! extern crate audis;
//!
//! fn main() {
//!     let client = audis::Client::connect("redis://127.0.0.1:6379").unwrap();
//!
//!     client.log(&audis::Event{
//!         id: "foo1".to_string(),
//!         data: "{\"some\":\"data\"}".to_string(),
//!         subjects: vec![
//!             "system".to_string(),
//!             "user:42".to_string(),
//!         ],
//!     }).unwrap();
//!
//!     // ... etc ...
//! }
//! ```
//!
//! ## Retrieving The Audit Log
//!
//! What good is an audit log if you can't ever review it?
//!
//! ```rust,no_run
//! extern crate audis;
//!
//! fn main() {
//!     let client = audis::Client::connect("redis://127.0.0.1:6379").unwrap();
//!
//!     for subject in &client.subjects().unwrap() {
//!         println!("## {} ######################", subject);
//!         for event in &client.retrieve(subject).unwrap() {
//!             println!("  {}", event.data);
//!         }
//!         println!("");
//!     }
//! }
//! ```
//!
//! ## Distributed Audit Logging via Threads
//!
//! A common pattern with audis is to delegate a single thread
//! to the task of shunting event logs into Redis, allowing other
//! threads to focus on their work, without being slowed down by
//! momentary hiccups in the auditing layer.
//!
//! You can do this via the `background()` function, which returns
//! a buffered channel you can send events to, and the join handle
//! of the executing background thread:
//!
//! ```rust,no_run
//! extern crate audis;
//!
//! fn main() {
//!     let client = audis::Client::connect("redis://127.0.0.1:6379").unwrap();
//!
//!     // buffer 50 events
//!     let (tx, thread) = client.background(50).unwrap();
//!
//!     tx.send(audis::Event{
//!         id: "foo1".to_string(),
//!         data: "{\"some\":\"data\"}".to_string(),
//!         subjects: vec![
//!             "system".to_string(),
//!             "user:42".to_string(),
//!         ],
//!     }).unwrap();
//!
//!     // ... etc ...
//!
//!     // the background thread only exits once the sender is dropped
//!     drop(tx);
//!     thread.join().unwrap();
//! }
//! ```
//!
//! ## Implementation Details
//!
//! Audis uses four (4) types of objects: A) the events
//! themselves, B) reference counts for inserted events,
//! C) per-subject lists of events, sorted in insertion order,
//! and D) a master list of all known subjects.
//!
//! Each audit event is stored as an opaque blob, usually
//! JSON -- for the purposes of this library, the exact contents
//! of events are immaterial. Each event gets its own Redis key,
//! derived from its globally unique ID, in the form `audit:$id`.
//! This makes retrieving events an _O(1)_ operation, using the
//! Redis `GET` command.
//!
//! Accompanying each event object is a reference count, kept
//! under a parallel keying structure that appends `:ref` to the
//! main event object key. These reference counts are integers
//! that track how many different subjects are currently still
//! referencing the given event. For example, `audit:ae2:ref`
//! is the reference count key for `audit:ae2`.
//!
//! Each subject in the audit log maintains its own list of
//! event IDs that are relevant to it. These lists are stored
//! under the subject string itself, used verbatim as the Redis
//! key. Callers are strongly urged to ensure that subject
//! names are as unique as they need to be for analysis, and
//! that they do not collide with the `subjects` key or the
//! `audit:` prefix.
//!
//! Finally, a single Redis Set, called `subjects`, exists to
//! track the complete set of known subject strings. This
//! facilitates discovery of the different subsets of the audit
//! log.
//!
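//! As a minimal sketch of the key layout described above, the
//! two functions below derive the event key and the reference
//! count key from an event ID. The names `event_key` and
//! `refcount_key` are hypothetical and exist only for this
//! illustration; they are not part of the audis API.
//!
//! ```rust
//! /// Key under which an event blob is stored (hypothetical helper).
//! fn event_key(id: &str) -> String {
//!     format!("audit:{}", id)
//! }
//!
//! /// Key under which the event's reference count is stored:
//! /// the event key with `:ref` appended (hypothetical helper).
//! fn refcount_key(id: &str) -> String {
//!     format!("{}:ref", event_key(id))
//! }
//! ```
//!
//! Under this scheme, `refcount_key("ae2")` yields `audit:ae2:ref`,
//! matching the example above.
//!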
//! Here is some pseudo-code for the insertion logic of the
//! `LOG(e)` operation, where `e` is an event object:
//!
//! ```redis-pseudo-code
//! LOG(e):
//!   var id = $e[id]
//!   SETNX "audit:$id" $e[data]
//!   for s in $e[subjects]:
//!     SADD "subjects" "$s"
//!     RPUSH "$s" "$id"
//!     INCR "audit:$id:ref"
//! ```
//!
//! Technically speaking, `LOG(e)` runs in _O(n)_, linear in
//! the number of subjects that the audit log event applies
//! to. However, given that this `n` is usually very small
//! (almost always < 100), `LOG(e)` performs well.
//!
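//! To make the bookkeeping concrete, here is a rough in-memory
//! model of `LOG(e)` that runs without Redis. The `Model` type
//! and its fields are assumptions made for this sketch, standing
//! in for the Redis keyspace. Like `Client::log`, it refuses to
//! log an ID that already exists.
//!
//! ```rust
//! use std::collections::{BTreeSet, HashMap, VecDeque};
//!
//! /// In-memory stand-in for the Redis keyspace (illustration only).
//! #[derive(Default)]
//! struct Model {
//!     events: HashMap<String, String>,          // "audit:$id"     -> data
//!     refs: HashMap<String, i64>,               // "audit:$id:ref" -> count
//!     lists: HashMap<String, VecDeque<String>>, // "$s"            -> event IDs
//!     subjects: BTreeSet<String>,               // the "subjects" set
//! }
//!
//! impl Model {
//!     /// Mirrors LOG(e); returns false if the ID is already taken.
//!     fn log(&mut self, id: &str, data: &str, subjects: &[&str]) -> bool {
//!         // SETNX: only store the event if the key does not exist yet
//!         if self.events.contains_key(id) {
//!             return false;
//!         }
//!         self.events.insert(id.to_string(), data.to_string());
//!         for s in subjects {
//!             // SADD "subjects" "$s"
//!             self.subjects.insert(s.to_string());
//!             // RPUSH "$s" "$id"
//!             self.lists.entry(s.to_string()).or_default().push_back(id.to_string());
//!             // INCR "audit:$id:ref"
//!             *self.refs.entry(id.to_string()).or_insert(0) += 1;
//!         }
//!         true
//!     }
//! }
//! ```
//!
//! Logging one event against two subjects leaves its reference
//! count at 2, with one entry in each subject's list.
//!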
//! `RETR(s)` is straightforward: iterate over the subject list
//! in Redis via `LRANGE` and then `GET` the referenced event
//! objects:
//!
//! ```redis-pseudo-code
//! RETR(s):
//!   var log = []
//!   for id in LRANGE "$s" 0 -1:
//!     $log.append( GET "audit:$id" )
//!   return $log
//! ```
//!
//! Since `LOG(e)` only ever adds to our audit log dataset,
//! and `RETR(s)` is a read-only operation, our Redis footprint
//! will grow forever unless we define operations to clear out
//! old log entries. Deleting parts of our audit log seems
//! wrong and counter-productive -- the whole point is to know
//! what happened! Storage (especially memory), however, isn't
//! unlimited, and for debugging purposes (at least), audit log
//! events become less relevant over time.
//!
//! For these reasons, we define two pruning operations:
//! `TRUNC(s,n)`, for truncating a subject eventset to the most
//! recent `n` audit events, and `PURGE(s,last)`, for deleting
//! events from the head of a subject eventset until a given ID
//! is found. (That ID is removed as well.)
//!
//! These two operations allow us to define a cleanup routine,
//! outside of the audis library, which can do things like
//! render and persist audit events to a log file in a filesystem
//! or external blobstore (e.g. S3), before ultimately deleting
//! them from Redis.
//!
//! Here is the pseudo-code for `TRUNC(s,n)`:
//!
//! ```redis-pseudo-code
//! TRUNC(s,n):
//!   var end = 0 - n - 1
//!   for id in LRANGE "$s" 0 $end:
//!     LPOP "$s"
//!     DECR "audit:$id:ref"
//!     if GET "audit:$id:ref" <= 0:
//!       DEL "audit:$id:ref" "audit:$id"
//! ```
//!
//! As events are truncated from the subject's index, the
//! associated reference counts are checked to determine if any
//! larger cleanup (via `DEL`) needs to be performed.
//!
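//! The negative stop index is the subtle part of `TRUNC(s,n)`.
//! The sketch below works through that arithmetic on an
//! in-memory list, assuming the usual Redis rule that a
//! negative index `-k` refers to position `len - k`. Both
//! function names are hypothetical and only serve this
//! illustration.
//!
//! ```rust
//! use std::collections::VecDeque;
//!
//! /// Number of entries that `LRANGE key 0 -(n+1)` yields for a
//! /// list of length `len` (hypothetical helper).
//! fn entries_to_drop(len: usize, n: usize) -> usize {
//!     // A negative index -k resolves to len - k; a stop index that
//!     // is still negative afterwards produces an empty range.
//!     let stop = len as i64 - (n as i64 + 1);
//!     if stop < 0 { 0 } else { stop as usize + 1 }
//! }
//!
//! /// Pops the oldest entries so that at most `n` remain, returning
//! /// the popped IDs in the order they were removed.
//! fn truncate(list: &mut VecDeque<String>, n: usize) -> Vec<String> {
//!     let count = entries_to_drop(list.len(), n);
//!     list.drain(..count).collect()
//! }
//! ```
//!
//! For a five-entry list and `n = 2`, the range covers the three
//! oldest entries; when `n` is at least the list length, the
//! range is empty and nothing is popped.
//!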
//! `PURGE(s,last)` is similar:
//!
//! ```redis-pseudo-code
//! PURGE(s,last):
//!   for id in LRANGE "$s" 0 -1:
//!     LPOP "$s"
//!     DECR "audit:$id:ref"
//!     if GET "audit:$id:ref" <= 0:
//!       DEL "audit:$id:ref" "audit:$id"
//!     if $id == $last:
//!       break
//! ```
//!
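//! The interaction between `PURGE(s,last)` and the reference
//! counts can be sketched in memory as follows. This is an
//! illustration under assumptions, not the library's code: the
//! free function `purge` and its parameters are hypothetical,
//! with a `VecDeque` standing in for the subject list and a
//! `HashMap` for the `audit:$id:ref` keys.
//!
//! ```rust
//! use std::collections::{HashMap, VecDeque};
//!
//! /// Pops IDs from the head of `list` up to and including `last`,
//! /// decrementing each ID's reference count. Returns the IDs whose
//! /// count dropped to zero, i.e. the ones `DEL` would remove.
//! fn purge(
//!     list: &mut VecDeque<String>,
//!     refs: &mut HashMap<String, i64>,
//!     last: &str,
//! ) -> Vec<String> {
//!     let mut deleted = Vec::new();
//!     // LPOP "$s"
//!     while let Some(id) = list.pop_front() {
//!         let done = id == last;
//!         // DECR "audit:$id:ref"
//!         let count = refs.entry(id.clone()).or_insert(0);
//!         *count -= 1;
//!         if *count <= 0 {
//!             // DEL "audit:$id:ref" "audit:$id"
//!             refs.remove(&id);
//!             deleted.push(id);
//!         }
//!         if done {
//!             break;
//!         }
//!     }
//!     deleted
//! }
//! ```
//!
//! An event that is still referenced by another subject keeps a
//! positive count and survives the purge. As in the pseudo-code,
//! if `last` never appears the whole list is emptied.
//!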
//! Neither operation is safe to run concurrently with the
//! other, or with another call to itself: each reads the list
//! with `LRANGE` and then pops entries one command at a time,
//! so overlapping calls can pop more entries than intended and
//! decrement the wrong reference counts. A future version of
//! this library will correct this, by the judicious use of
//! `LOCK()`/`UNLOCK()` primitives implemented inside of the
//! same Redis database.
//!

use redis;
use std::sync::mpsc::{sync_channel, SyncSender};
use std::thread::{spawn, JoinHandle};

// Build the Redis key for an event object (`audit:$id`).
macro_rules! id {
    ($x:expr) => {
        format!("audit:{}", $x)
    };
}

// Build the Redis key for an event's reference count (`audit:$id:ref`).
macro_rules! idref {
    ($x:expr) => {
        format!("audit:{}:ref", $x)
    };
}

/// Result type for all audis operations; errors are `redis::RedisError`s.
pub type AudisResult<T> = redis::RedisResult<T>;

/// A single Redis endpoint housing an audit log.
pub struct Client {
    url: String,
    redis: redis::Client,
}

/// An event, suitable for logging in the audit log.
pub struct Event {
    /// Globally unique identifier; the event is stored under `audit:$id`.
    pub id: String,
    /// Opaque payload, usually JSON.
    pub data: String,
    /// Subjects whose event lists should reference this event.
    pub subjects: Vec<String>,
}

impl Client {
    /// Connect to a Redis instance, by URL.
    ///
    /// This implementation understands the same URL formats
    /// that the underlying `redis` crate understands.
    /// Primarily, this means the following should work:
    ///
    ///   - redis://127.0.0.1:6379
    ///   - redis://localhost
    ///   - unix:/path/to/redis.sock
    ///
    pub fn connect(url: &str) -> AudisResult<Client> {
        let c = Client {
            url: url.to_string(),
            redis: redis::Client::open(url)?,
        };
        // Fail early if the server does not answer a PING.
        c.ping()?;
        Ok(c)
    }

    /// Delegate event logging to a background thread.
    ///
    /// This function spins up a new thread, with its own audis
    /// Client connected to the same URL, and returns a channel
    /// for sending new `audis::Event` objects to be logged, and
    /// the thread `JoinHandle` for waiting on the thread to finish.
    ///
    /// The sending channel is buffered, and will have enough
    /// space to keep `n` Event objects in memory. If `n` is
    /// passed as zero, a default of 100 is used instead.
    ///
    /// If the background thread encounters an error while trying
    /// to log an Event to the Redis backend, it prints the error
    /// and moves on to the next Event; the failed Event is not
    /// retried.
    ///
    /// To shut down the background thread, drop the returned
    /// `SyncSender<Event>` object and then join the thread's
    /// `JoinHandle`.
    ///
    pub fn background(&self, n: usize) -> AudisResult<(SyncSender<Event>, JoinHandle<()>)> {
        let c = Client {
            url: self.url.to_string(),
            redis: redis::Client::open(self.url.as_str())?,
        };
        let (tx, rx) = sync_channel(if n == 0 { 100 } else { n });

        let t = spawn(move || {
            // This loop ends once every SyncSender has been dropped.
            for e in rx {
                if let Err(err) = c.log(&e) {
                    println!("audis failed to log event {}: {}", e.id, err);
                }
            }
        });

        Ok((tx, t))
    }

    /// Return the list of all known subjects.
    pub fn subjects(&self) -> AudisResult<Vec<String>> {
        self.smembers("subjects")
    }

    /// Log an event to the audit log.
    ///
    /// Returns an error if an event with the same ID was already logged.
    pub fn log(&self, e: &Event) -> AudisResult<&Client> {
        self.setnx(&id!(e.id), &e.data)?;
        for s in &e.subjects {
            // INCR targets `audit:$id:ref`, the same key deref() later DECRs.
            self.sadd("subjects", s)?
                .rpush(s, &e.id)?
                .incr(&idref!(e.id))?;
        }
        Ok(self)
    }

    /// Retrieve the full list of events for the given subject.
    ///
    /// The `subjects` field of each returned Event is left empty.
    pub fn retrieve(&self, log: &str) -> AudisResult<Vec<Event>> {
        let mut events: Vec<Event> = vec![];
        for id in self.lrange(log, "0", "-1")? {
            let data = self.get(&id!(id))?;
            events.push(Event {
                id,
                data,
                subjects: vec![],
            })
        }

        Ok(events)
    }

    /// Truncate a subject so that it only contains the `n` most recent Events.
    pub fn truncate(&self, log: &str, n: u32) -> AudisResult<&Client> {
        // LRANGE 0 -(n+1) selects everything except the last `n` entries.
        for id in self.lrange(log, "0", &format!("-{}", n + 1))? {
            self.lpop(log)?.deref(&id)?;
        }
        Ok(self)
    }

    /// Delete the Event `last` and all prior events from a given subject.
    ///
    /// If `last` is not in the subject's list, the whole list is emptied.
    pub fn purge(&self, log: &str, last: &str) -> AudisResult<&Client> {
        for id in self.lrange(log, "0", "-1")? {
            self.lpop(log)?.deref(&id)?;
            if id == last {
                break;
            }
        }

        Ok(self)
    }

    // Run a command on a fresh connection and convert the reply to `T`.
    fn query<T: redis::FromRedisValue>(&self, cmd: &mut redis::Cmd) -> AudisResult<T> {
        cmd.query(&mut self.redis.get_connection()?)
    }

    fn ping(&self) -> AudisResult<&Client> {
        self.query::<()>(&mut redis::cmd("PING"))?;
        Ok(self)
    }

    fn lrange(&self, key: &str, a: &str, b: &str) -> AudisResult<Vec<String>> {
        self.query(redis::cmd("LRANGE").arg(key).arg(a).arg(b))
    }

    fn smembers(&self, key: &str) -> AudisResult<Vec<String>> {
        self.query(redis::cmd("SMEMBERS").arg(key))
    }

    fn rpush(&self, log: &str, id: &str) -> AudisResult<&Client> {
        self.query::<()>(redis::cmd("RPUSH").arg(log).arg(id))?;
        Ok(self)
    }

    fn lpop(&self, log: &str) -> AudisResult<&Client> {
        self.query::<()>(redis::cmd("LPOP").arg(log))?;
        Ok(self)
    }

    fn decr(&self, key: &str) -> AudisResult<&Client> {
        self.query::<()>(redis::cmd("DECR").arg(key))?;
        Ok(self)
    }

    fn incr(&self, key: &str) -> AudisResult<&Client> {
        self.query::<()>(redis::cmd("INCR").arg(key))?;
        Ok(self)
    }

    fn setnx(&self, key: &str, data: &str) -> AudisResult<&Client> {
        // SETNX replies 1 if the key was set, 0 if it already existed.
        let s: i32 = self.query(redis::cmd("SETNX").arg(key).arg(data))?;
        if s == 1 {
            Ok(self)
        } else {
            Err(redis::RedisError::from((
                redis::ErrorKind::IoError,
                "duplicate key detected",
            )))
        }
    }

    fn sadd(&self, key: &str, data: &str) -> AudisResult<&Client> {
        self.query::<()>(redis::cmd("SADD").arg(key).arg(data))?;
        Ok(self)
    }

    fn get(&self, key: &str) -> AudisResult<String> {
        self.query(redis::cmd("GET").arg(key))
    }

    // Delete both the event object and its reference count.
    fn del(&self, id: &str) -> AudisResult<&Client> {
        self.query::<()>(redis::cmd("DEL").arg(id!(id)).arg(idref!(id)))?;
        Ok(self)
    }

    // Dereference (and possibly delete) an audit event.
    fn deref(&self, id: &str) -> AudisResult<&Client> {
        if self.decr(&idref!(id))?.get(&idref!(id))? == "0" {
            self.del(id)?;
        }
        Ok(self)
    }
}