1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#[macro_use]
extern crate bitflags;
extern crate libc;
extern crate time;

#[macro_use]
mod macros;

pub mod cell;
pub mod error;
mod redis;

use cell::store;
use error::CellError;
use libc::c_int;
use redis::Command;
use redis::raw;

const MODULE_NAME: &str = "redis-cell";
const MODULE_VERSION: c_int = 1;

// ThrottleCommand provides GCRA rate limiting as a command in Redis.
struct ThrottleCommand {}

impl Command for ThrottleCommand {
    // Should return the name of the command to be registered.
    fn name(&self) -> &'static str {
        "cl.throttle"
    }

    // Run the command.
    fn run(&self, r: redis::Redis, args: &[&str]) -> Result<(), CellError> {
        if args.len() != 5 && args.len() != 6 {
            return Err(error!(
                "Usage: {} <key> <max_burst> <count per period> \
                 <period> [<quantity>]",
                self.name()
            ));
        }

        // the first argument is command name "cl.throttle" (ignore it)
        let key = args[1];
        let max_burst = parse_i64(args[2])?;
        let count = parse_i64(args[3])?;
        let period = parse_i64(args[4])?;
        let quantity = match args.get(5) {
            Some(n) => parse_i64(n)?,
            None => 1,
        };

        // We reinitialize a new store and rate limiter every time this command
        // is run, but these structures don't have a huge overhead to them so
        // it's not that big of a problem.
        let mut store = store::InternalRedisStore::new(&r);
        let rate = cell::Rate::per_period(count, time::Duration::seconds(period));
        let mut limiter = cell::RateLimiter::new(
            &mut store,
            &cell::RateQuota {
                max_burst,
                max_rate: rate,
            },
        );

        let (throttled, rate_limit_result) = limiter.rate_limit(key, quantity)?;

        // Reply with an array containing rate limiting results. Note that
        // Redis' support for interesting data types is quite weak, so we have
        // to jam a few square pegs into round holes. It's a little messy, but
        // the interface comes out as pretty workable.
        r.reply_array(5)?;
        r.reply_integer(if throttled { 1 } else { 0 })?;
        r.reply_integer(rate_limit_result.limit)?;
        r.reply_integer(rate_limit_result.remaining)?;
        r.reply_integer(rate_limit_result.retry_after.num_seconds())?;
        r.reply_integer(rate_limit_result.reset_after.num_seconds())?;

        Ok(())
    }

    // Should return any flags to be registered with the name as a string
    // separated list. See the Redis module API documentation for a complete
    // list of the ones that are available.
    fn str_flags(&self) -> &'static str {
        "write"
    }
}

#[allow(non_snake_case)]
#[allow(unused_variables)]
#[no_mangle]
pub extern "C" fn Throttle_RedisCommand(
    ctx: *mut raw::RedisModuleCtx,
    argv: *mut *mut raw::RedisModuleString,
    argc: c_int,
) -> raw::Status {
    Command::harness(&ThrottleCommand {}, ctx, argv, argc)
}

#[allow(non_snake_case)]
#[allow(unused_variables)]
#[no_mangle]
pub extern "C" fn RedisModule_OnLoad(
    ctx: *mut raw::RedisModuleCtx,
    argv: *mut *mut raw::RedisModuleString,
    argc: c_int,
) -> raw::Status {
    if raw::init(
        ctx,
        format!("{}\0", MODULE_NAME).as_ptr(),
        MODULE_VERSION,
        raw::REDISMODULE_APIVER_1,
    ) == raw::Status::Err
    {
        return raw::Status::Err;
    }

    let command = ThrottleCommand {};
    if raw::create_command(
        ctx,
        format!("{}\0", command.name()).as_ptr(),
        Some(Throttle_RedisCommand),
        format!("{}\0", command.str_flags()).as_ptr(),
        0,
        0,
        0,
    ) == raw::Status::Err
    {
        return raw::Status::Err;
    }

    raw::Status::Ok
}

fn parse_i64(arg: &str) -> Result<i64, CellError> {
    arg.parse::<i64>()
        .map_err(|_| error!("Couldn't parse as integer: {}", arg))
}