audis/
lib.rs

//! Audis is an implementation of a multi-index audit log,
//! built atop the Redis key-value store.
//!
//! An audit log consists of zero or more Event objects,
//! each indexed against one or more subjects.
//! The correspondence between Redis databases and audit
//! logs is 1:1 -- each audit log inhabits precisely one
//! Redis instance, and an instance can only house a single
//! audit log.
//!
//! ## Example: Logging Events
//!
//! The simplest way to use audis is to point it at a Redis
//! instance and start logging to it:
//!
//! ```rust,no_run
//! extern crate audis;
//!
//! fn main() {
//!     let client = audis::Client::connect("redis://127.0.0.1:6379").unwrap();
//!
//!     client.log(&audis::Event{
//!         id: "foo1".to_string(),
//!         data: "{\"some\":\"data\"}".to_string(),
//!         subjects: vec![
//!             "system".to_string(),
//!             "user:42".to_string(),
//!         ],
//!     }).unwrap();
//!
//!     // ... etc ...
//! }
//! ```
//!
//! ## Retrieving The Audit Log
//!
//! What good is an audit log if you can't ever review it?
//!
//! ```rust,no_run
//! extern crate audis;
//!
//! fn main() {
//!     let client = audis::Client::connect("redis://127.0.0.1:6379").unwrap();
//!
//!     for subject in &client.subjects().unwrap() {
//!         println!("## {} ######################", subject);
//!         for event in &client.retrieve(subject).unwrap() {
//!             println!("  {}", event.data);
//!         }
//!         println!();
//!     }
//! }
//! ```
//!
//! ## Distributed Audit Logging via Threads
//!
//! A common pattern with audis is to delegate a single thread
//! to the task of shunting event logs into Redis, allowing other
//! threads to focus on their work, without being slowed down by
//! momentary hiccups in the auditing layer.
//!
//! You can do this via the `background()` function, which returns
//! the sending half of a buffered channel you can send events to,
//! and the join handle of the executing background thread:
//!
//! ```rust,no_run
//! extern crate audis;
//!
//! fn main() {
//!     let client = audis::Client::connect("redis://127.0.0.1:6379").unwrap();
//!
//!     // buffer 50 events
//!     let (tx, thread) = client.background(50).unwrap();
//!
//!     tx.send(audis::Event{
//!         id: "foo1".to_string(),
//!         data: "{\"some\":\"data\"}".to_string(),
//!         subjects: vec![
//!             "system".to_string(),
//!             "user:42".to_string(),
//!         ],
//!     }).unwrap();
//!
//!     // ... etc ...
//!
//!     // drop the sender so the background thread can finish;
//!     // joining while `tx` is still alive would block forever
//!     drop(tx);
//!     thread.join().unwrap();
//! }
//! ```
//!
//! ## Implementation Details
//!
//! Audis uses four (4) types of objects: A) the events
//! themselves, B) reference counts for inserted events,
//! C) per-subject lists of events, sorted in insertion order,
//! and D) a master list of all known subjects.
//!
//! Each audit event is stored as an opaque blob, usually
//! JSON -- for the purposes of this library, the exact contents
//! of events are immaterial.  Each event gets its own Redis key,
//! derived from its globally unique ID, in the form `audit:$id`.
//! This makes retrieving events an _O(1)_ operation, using the
//! Redis `GET` command.
//!
//! Accompanying each event object is a reference count, kept
//! under a parallel keying structure that appends `:ref` to the
//! main event object key.  These reference counts are integers
//! that track how many different subjects are currently still
//! referencing the given event.  For example, `audit:ae2:ref`
//! is the reference count key for `audit:ae2`.
//!
//! Each subject in the audit log maintains its own list of
//! event IDs that are relevant to it.  Each list is stored
//! under the subject string itself, used verbatim as the Redis
//! key, so a subject must not collide with the other keys in
//! use (`subjects` and `audit:...`).  Callers are strongly urged
//! to ensure that subject names are as unique as they need to
//! be for analysis.
//!
//! Finally, a single Redis Set, called `subjects`, exists to
//! track the complete set of known subject strings.  This
//! facilitates discovery of the different subsets of the audit
//! log.
//!
//! Here is some pseudocode for the insertion logic of the
//! `LOG(e)` operation, where `e` is an event object:
//!
//! ```redis-pseudo-code
//! LOG(e):
//!     var id = $e[id]
//!     SETNX "audit:$id" $e[data]
//!     for s in $e[subjects]:
//!         SADD "subjects" "$s"
//!         RPUSH "$s" "$id"
//!         INCR "audit:$id:ref"
//! ```
//!
//! Technically speaking, `LOG(e)` runs in _O(n)_ time, linear
//! in the number of subjects that the audit log event applies
//! to.  However, given that this `n` is usually very small
//! (almost always < 100), `LOG(e)` performs well.
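//!
//! As a rough illustration of this bookkeeping, here is a minimal
//! in-memory sketch of `LOG(e)`.  The `Store` type and its fields
//! are hypothetical stand-ins for the Redis keyspace and are not
//! part of audis.  It assumes, as the Rust `log()` method does,
//! that a duplicate event ID aborts the operation:
//!
//! ```rust
//! use std::collections::{BTreeSet, HashMap};
//!
//! /// Hypothetical in-memory stand-in for the Redis keyspace.
//! #[derive(Default)]
//! struct Store {
//!     blobs: HashMap<String, String>,      // "audit:$id"     -> event data
//!     refs: HashMap<String, i64>,          // "audit:$id:ref" -> reference count
//!     lists: HashMap<String, Vec<String>>, // "$subject"      -> event IDs, oldest first
//!     subjects: BTreeSet<String>,          // the "subjects" set
//! }
//!
//! impl Store {
//!     /// Mirrors LOG(e); returns false if the ID is already taken.
//!     fn log(&mut self, id: &str, data: &str, subjects: &[&str]) -> bool {
//!         let key = format!("audit:{}", id);
//!         if self.blobs.contains_key(&key) {
//!             return false; // SETNX would refuse to overwrite
//!         }
//!         self.blobs.insert(key.clone(), data.to_string());
//!         for s in subjects {
//!             self.subjects.insert(s.to_string());                              // SADD
//!             self.lists.entry(s.to_string()).or_default().push(id.to_string()); // RPUSH
//!             *self.refs.entry(format!("{}:ref", key)).or_insert(0) += 1;       // INCR
//!         }
//!         true
//!     }
//! }
//!
//! fn main() {
//!     let mut store = Store::default();
//!     assert!(store.log("foo1", "{}", &["system", "user:42"]));
//!     // one reference per subject the event was indexed under
//!     assert_eq!(store.refs["audit:foo1:ref"], 2);
//!     assert_eq!(store.lists["user:42"], vec!["foo1".to_string()]);
//!     // a second event with the same ID is rejected
//!     assert!(!store.log("foo1", "{}", &["system"]));
//! }
//! ```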
//!
//! `RETR(s)` is straightforward: iterate over the subject list
//! in Redis via `LRANGE` and then `GET` the referenced event
//! objects:
//!
//! ```redis-pseudo-code
//! RETR(s):
//!     var log = []
//!     for id in LRANGE "$s" 0 -1:
//!         $log.append( GET "audit:$id" )
//!     return $log
//! ```
//!
//! Since `LOG(e)` only ever adds to our audit log dataset,
//! and `RETR(s)` is a read-only operation, our Redis footprint
//! will forever grow, unless we define operations to clear out
//! old log entries.  Deleting parts of our audit log seems
//! wrong and counter-productive -- the whole point is to know
//! what happened!  Storage (especially memory), however, isn't
//! unlimited, and for debugging purposes (at least), audit log
//! events become less relevant over time.
//!
//! For these reasons, we define two pruning operations:
//! `TRUNC(s,n)`, for truncating a subject eventset to the most
//! recent `n` audit events, and `PURGE(s,last)`, for deleting
//! events from a subject eventset, oldest first, until a given
//! ID is found.  (That ID is removed as well.)
//!
//! These two operations allow us to define a cleanup routine,
//! outside of the audis library, which can do things like
//! render and persist audit events to a log file in a filesystem
//! or external blobstore (e.g. S3), before ultimately deleting
//! them from Redis.
//!
//! Here is the pseudo-code for `TRUNC(s,n)`:
//!
//! ```redis-pseudo-code
//! TRUNC(s,n):
//!     var end = 0 - n - 1
//!     for id in LRANGE "$s" 0 $end:
//!         LPOP "$s"
//!         DECR "audit:$id:ref"
//!         if GET "audit:$id:ref" <= 0:
//!             DEL "audit:$id:ref" "audit:$id"
//! ```
//!
//! As events are truncated from the subject's index, the
//! associated reference counts are checked to determine if any
//! larger cleanup (via `DEL`) needs to be performed.
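//!
//! To make the reference counting concrete, here is a small
//! in-memory sketch of `TRUNC(s,n)`.  The `Store` type is a
//! hypothetical model, not part of audis, and for brevity it keys
//! reference counts by bare event ID rather than `audit:$id:ref`.
//! It shows that an event shared with another subject survives
//! truncation of one of its lists:
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! /// Hypothetical model: per-subject ID lists plus reference counts.
//! struct Store {
//!     lists: HashMap<String, Vec<String>>,
//!     refs: HashMap<String, i64>,
//! }
//!
//! impl Store {
//!     /// Mirrors TRUNC(s,n): keep the newest `n` IDs of subject `s`.
//!     /// Returns the IDs whose count reached zero (what `DEL` would remove).
//!     fn truncate(&mut self, s: &str, n: usize) -> Vec<String> {
//!         let mut deleted = Vec::new();
//!         let list = match self.lists.get_mut(s) {
//!             Some(list) => list,
//!             None => return deleted,
//!         };
//!         // LRANGE "$s" 0 -(n+1) selects all but the last n entries.
//!         let excess = list.len().saturating_sub(n);
//!         for id in list.drain(..excess) {
//!             let count = self.refs.entry(id.clone()).or_insert(0);
//!             *count -= 1;
//!             let dead = *count <= 0;
//!             if dead {
//!                 self.refs.remove(&id);
//!                 deleted.push(id);
//!             }
//!         }
//!         deleted
//!     }
//! }
//!
//! fn main() {
//!     let mut store = Store { lists: HashMap::new(), refs: HashMap::new() };
//!     // e1 is indexed under two subjects; e2 and e3 only under "system".
//!     store.lists.insert("system".into(), vec!["e1".into(), "e2".into(), "e3".into()]);
//!     store.lists.insert("user:42".into(), vec!["e1".into()]);
//!     store.refs.insert("e1".into(), 2);
//!     store.refs.insert("e2".into(), 1);
//!     store.refs.insert("e3".into(), 1);
//!
//!     // Keep only the newest event under "system".
//!     let deleted = store.truncate("system", 1);
//!     assert_eq!(deleted, vec!["e2".to_string()]); // e1 lives on via "user:42"
//!     assert_eq!(store.lists["system"], vec!["e3".to_string()]);
//!     assert_eq!(store.refs["e1"], 1);
//! }
//! ```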
//!
//! `PURGE(s,last)` is similar:
//!
//! ```redis-pseudo-code
//! PURGE(s,last):
//!     for id in LRANGE "$s" 0 -1:
//!         LPOP "$s"
//!         DECR "audit:$id:ref"
//!         if GET "audit:$id:ref" <= 0:
//!             DEL "audit:$id:ref" "audit:$id"
//!         if $id == $last:
//!             break
//! ```
//!
//! Neither of these operations is atomic: the `LRANGE` snapshot
//! and the subsequent `LPOP`/`DECR`/`GET`/`DEL` commands are
//! issued separately.  Two pruning calls running concurrently
//! against the same subject (either operation with the other, or
//! with itself) can therefore pop entries the other call has
//! already processed and decrement the wrong reference counts.
//! A future version of this library will correct this, by the
//! judicious use of `LOCK()`/`UNLOCK()` primitives implemented
//! inside of the same Redis database.
//!

use redis;
use std::sync::mpsc::{sync_channel, SyncSender};
use std::thread::{spawn, JoinHandle};

macro_rules! id {
    ($x:expr) => {
        format!("audit:{}", $x)
    };
}

macro_rules! idref {
    ($x:expr) => {
        format!("audit:{}:ref", $x)
    };
}

pub type AudisResult<T> = redis::RedisResult<T>;

/// A single Redis endpoint housing an audit log.
pub struct Client {
    url: String,
    redis: redis::Client,
}

/// An event, suitable for logging in the audit log.
pub struct Event {
    pub id: String,
    pub data: String,
    pub subjects: Vec<String>,
}

impl Client {
    /// Connect to a Redis instance, by URL.
    ///
    /// This implementation understands the same URL formats
    /// that the underlying `redis` crate understands.
    /// Primarily, this means the following should work:
    ///
    ///  - redis://127.0.0.1:6379
    ///  - redis://localhost
    ///  - unix:/path/to/redis.sock
    ///
    pub fn connect(url: &str) -> AudisResult<Client> {
        let c = Client {
            url: url.to_string(),
            redis: redis::Client::open(url)?,
        };
        // Fail fast if the server cannot be reached.
        c.ping()?;
        Ok(c)
    }

    /// Delegate event logging to a background thread.
    ///
    /// This function spins up a new thread, with a copy of the
    /// audis Client object, and returns a channel for sending
    /// new audis::Event objects to be logged, and the thread
    /// JoinHandle for waiting on the thread to finish.
    ///
    /// The sending channel is buffered, and will have enough
    /// space to keep `n` Event objects in memory.  If `n` is
    /// passed as zero, a default of 100 is used instead.
    ///
    /// If the background thread encounters an error while trying
    /// to log an Event to the Redis backend, it prints the error
    /// and moves on to the next Event; the failed Event is not
    /// retried.
    ///
    /// To shut down the background thread, drop the returned
    /// SyncSender<Event> object and then join the thread's
    /// JoinHandle.
    ///
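    /// The shutdown order matters, so here is a minimal,
    /// Redis-free sketch of it using only the standard library.
    /// `drain_in_background` is a hypothetical stand-in for the
    /// audis worker that counts events instead of logging them:
    ///
    /// ```rust
    /// use std::sync::mpsc::sync_channel;
    /// use std::thread::spawn;
    ///
    /// /// Hypothetical stand-in: sends `events` to a worker, then shuts it down.
    /// fn drain_in_background(events: Vec<String>) -> usize {
    ///     let (tx, rx) = sync_channel::<String>(4);
    ///     let worker = spawn(move || {
    ///         let mut seen = 0usize;
    ///         // Like the audis worker loop, this ends once every sender is dropped.
    ///         for _event in rx {
    ///             seen += 1;
    ///         }
    ///         seen
    ///     });
    ///     for e in events {
    ///         tx.send(e).unwrap();
    ///     }
    ///     drop(tx); // hang up first; joining while `tx` is alive would block forever
    ///     worker.join().unwrap()
    /// }
    ///
    /// fn main() {
    ///     let events = vec!["foo1".to_string(), "foo2".to_string()];
    ///     assert_eq!(drain_in_background(events), 2);
    /// }
    /// ```
    ///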
    pub fn background(&self, n: usize) -> AudisResult<(SyncSender<Event>, JoinHandle<()>)> {
        let c = Client {
            url: self.url.to_string(),
            redis: redis::Client::open(self.url.as_str())?,
        };
        let (tx, rx) = sync_channel(if n == 0 { 100 } else { n });

        let t = spawn(move || {
            // Runs until every sender has been dropped.
            for e in rx {
                if let Err(err) = c.log(&e) {
                    println!("audis failed to log event {}: {}", e.id, err);
                }
            }
        });

        Ok((tx, t))
    }

    /// Return the list of all known subjects.
    pub fn subjects(&self) -> AudisResult<Vec<String>> {
        self.smembers("subjects")
    }

    /// Log an event to the audit log.
    pub fn log(&self, e: &Event) -> AudisResult<&Client> {
        self.setnx(&id!(e.id), &e.data)?;
        for s in &e.subjects {
            // Bump `audit:$id:ref`, the key that `deref()` later decrements.
            self.sadd("subjects", s)?
                .rpush(s, &e.id)?
                .incr(&idref!(e.id))?;
        }
        Ok(self)
    }

    /// Retrieve the full list of events for the given subject.
    ///
    /// The `subjects` field of each returned Event is left empty.
    pub fn retrieve(&self, log: &str) -> AudisResult<Vec<Event>> {
        let mut events: Vec<Event> = vec![];
        for id in self.lrange(log, "0", "-1")? {
            let data = self.get(&id!(id))?;
            events.push(Event {
                id,
                data,
                subjects: vec![],
            })
        }

        Ok(events)
    }

    /// Truncate a subject so that it only contains the `n` most recent Events.
    pub fn truncate(&self, log: &str, n: u32) -> AudisResult<&Client> {
        for id in self.lrange(log, "0", &format!("-{}", n + 1))? {
            self.lpop(log)?.deref(&id)?;
        }
        Ok(self)
    }

    /// Delete the Event `last` and all prior events from a given subject.
    pub fn purge(&self, log: &str, last: &str) -> AudisResult<&Client> {
        for id in self.lrange(log, "0", "-1")? {
            self.lpop(log)?.deref(&id)?;
            if id == last {
                break;
            }
        }

        Ok(self)
    }

    // Run a command on a fresh connection.  Callers that discard the
    // reply name `()` explicitly instead of relying on inference fallback.
    fn query<T: redis::FromRedisValue>(&self, cmd: &mut redis::Cmd) -> AudisResult<T> {
        cmd.query(&mut self.redis.get_connection()?)
    }

    fn ping(&self) -> AudisResult<&Client> {
        self.query::<()>(&mut redis::cmd("PING"))?;
        Ok(self)
    }

    fn lrange(&self, key: &str, a: &str, b: &str) -> AudisResult<Vec<String>> {
        self.query(redis::cmd("LRANGE").arg(key).arg(a).arg(b))
    }

    fn smembers(&self, key: &str) -> AudisResult<Vec<String>> {
        self.query(redis::cmd("SMEMBERS").arg(key))
    }

    fn rpush(&self, log: &str, id: &str) -> AudisResult<&Client> {
        self.query::<()>(redis::cmd("RPUSH").arg(log).arg(id))?;
        Ok(self)
    }

    fn lpop(&self, log: &str) -> AudisResult<&Client> {
        self.query::<()>(redis::cmd("LPOP").arg(log))?;
        Ok(self)
    }

    fn decr(&self, key: &str) -> AudisResult<&Client> {
        self.query::<()>(redis::cmd("DECR").arg(key))?;
        Ok(self)
    }

    fn incr(&self, key: &str) -> AudisResult<&Client> {
        self.query::<()>(redis::cmd("INCR").arg(key))?;
        Ok(self)
    }

    fn setnx(&self, key: &str, data: &str) -> AudisResult<&Client> {
        // SETNX replies 1 if the key was set, 0 if it already existed.
        let s: i32 = self.query(redis::cmd("SETNX").arg(key).arg(data))?;
        if s == 1 {
            Ok(self)
        } else {
            Err(redis::RedisError::from((
                redis::ErrorKind::IoError,
                "duplicate key detected",
            )))
        }
    }

    fn sadd(&self, key: &str, data: &str) -> AudisResult<&Client> {
        self.query::<()>(redis::cmd("SADD").arg(key).arg(data))?;
        Ok(self)
    }

    fn get(&self, key: &str) -> AudisResult<String> {
        self.query(redis::cmd("GET").arg(key))
    }

    // Delete an event blob and its reference count, given the bare event ID.
    fn del(&self, id: &str) -> AudisResult<&Client> {
        self.query::<()>(redis::cmd("DEL").arg(id!(id)).arg(idref!(id)))?;
        Ok(self)
    }

    // Dereference (and possibly delete) an audit event.
    fn deref(&self, id: &str) -> AudisResult<&Client> {
        if self.decr(&idref!(id))?.get(&idref!(id))? == "0" {
            self.del(id)?;
        }
        Ok(self)
    }
}