use std::borrow::Cow;
use std::error::Error as StdError;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use crate::{Entry, Error, Store, SubscribeableStore, Subscription};
use byteorder::{ByteOrder, LittleEndian};
use redis::streams::{StreamId, StreamMaxlen, StreamRangeReply, StreamReadOptions, StreamReadReply};
use redis::{Client, Cmd, Commands, Connection, ConnectionLike, IntoConnectionInfo, RedisError, Value};
use string_cache::DefaultAtom as Atom;
static STREAM_READ_BLOCK_MS: usize = 1000;
static CONN_POOL_MAX_COUNT: usize = 4;
impl From<RedisError> for Error {
fn from(err: RedisError) -> Self {
Error::Database(Box::new(err))
}
}
fn redis_channel(name: &Atom) -> String {
format!("binlog:stream:v0:{}", name)
}
fn invalid_data_err<E: Into<Box<dyn StdError + Send + Sync>>>(msg: E) -> Error {
IoError::new(IoErrorKind::InvalidData, msg).into()
}
fn unexpected_data_format() -> Error {
invalid_data_err("unexpected data format received from redis")
}
fn entry_from_stream_id(stream_id: &StreamId, name: Atom) -> Result<Entry, Error> {
let (timestamp, value) = match (stream_id.map.get("timestamp"), stream_id.map.get("value")) {
(Some(Value::Data(timestamp_bytes)), Some(Value::Data(value_bytes))) => {
(LittleEndian::read_i64(timestamp_bytes), value_bytes)
}
_ => {
return Err(unexpected_data_format());
}
};
Ok(Entry::new_with_timestamp(timestamp, name, value.clone()))
}
#[derive(Clone)]
pub struct RedisStreamStore {
client: Client,
conn_pool: Arc<Mutex<Vec<Connection>>>,
}
impl RedisStreamStore {
pub fn new_with_client(client: Client) -> Self {
Self {
client,
conn_pool: Arc::new(Mutex::new(Vec::default())),
}
}
pub fn new<T: IntoConnectionInfo>(params: T) -> Result<Self, Error> {
Ok(Self::new_with_client(Client::open(params)?))
}
fn with_connection<T, F>(&self, f: F) -> Result<T, Error>
where
F: FnOnce(&mut Connection) -> Result<T, Error>,
{
let mut conn = {
let mut conn_pool = self.conn_pool.lock().unwrap();
if let Some(conn) = conn_pool.pop() {
conn
} else {
self.client.get_connection()?
}
};
let result = f(&mut conn)?;
let mut conn_pool = self.conn_pool.lock().unwrap();
if conn_pool.len() < CONN_POOL_MAX_COUNT {
conn_pool.push(conn);
}
Ok(result)
}
}
impl Store for RedisStreamStore {
fn push(&self, entry: Cow<Entry>) -> Result<(), Error> {
let channel = redis_channel(&entry.name);
let mut timestamp_bytes = [0; 8];
LittleEndian::write_i64(&mut timestamp_bytes, entry.timestamp);
let cmd = Cmd::xadd_maxlen(
channel,
StreamMaxlen::Equals(1),
"*",
&[
("timestamp", timestamp_bytes.as_slice()),
("value", entry.value.as_slice()),
],
);
self.with_connection(|conn| {
conn.req_command(&cmd)?;
Ok(())
})
}
fn latest<A: Into<Atom>>(&self, name: A) -> Result<Option<Entry>, Error> {
let name = name.into();
let channel = redis_channel(&name);
let reply: StreamRangeReply = self.with_connection(move |conn| {
let value = conn.xrevrange_count(channel, "+", "-", 1i8)?;
Ok(value)
})?;
debug_assert!(reply.ids.len() <= 1);
if reply.ids.is_empty() {
Ok(None)
} else {
match entry_from_stream_id(&reply.ids[0], name) {
Ok(entry) => Ok(Some(entry)),
Err(err) => Err(err),
}
}
}
}
impl SubscribeableStore for RedisStreamStore {
type Subscription = RedisStreamSubscription;
fn subscribe<A: Into<Atom>>(&self, name: A) -> Result<Self::Subscription, Error> {
let conn = self.client.get_connection()?;
Ok(RedisStreamSubscription::new(conn, name.into()))
}
}
pub struct RedisStreamSubscription {
conn: Connection,
name: Atom,
last_id: String,
}
impl RedisStreamSubscription {
fn new(conn: Connection, name: Atom) -> Self {
RedisStreamSubscription {
conn,
name,
last_id: "0".to_string(),
}
}
}
impl Subscription for RedisStreamSubscription {
fn next(&mut self, timeout: Option<Duration>) -> Result<Option<Entry>, Error> {
let channels = vec![redis_channel(&self.name)];
let opts = StreamReadOptions::default().block(match timeout {
Some(timeout) => timeout.as_millis().try_into().unwrap(),
None => STREAM_READ_BLOCK_MS,
});
loop {
let reply: StreamReadReply = self.conn.xread_options(&channels, &[&self.last_id], &opts)?;
if let Some(stream_key) = reply.keys.into_iter().next() {
if let Some(stream_id) = stream_key.ids.into_iter().next() {
let value = entry_from_stream_id(&stream_id, self.name.clone())?;
self.last_id = stream_id.id;
return Ok(Some(value));
}
}
if timeout.is_some() {
return Ok(None);
}
}
}
}
#[cfg(test)]
mod tests {
use crate::{define_test, test_store_impl, test_subscribeable_store_impl, RedisStreamStore};
test_store_impl!(RedisStreamStore::new("redis://localhost:6379").unwrap());
test_subscribeable_store_impl!(RedisStreamStore::new("redis://localhost:6379").unwrap());
}
#[cfg(test)]
#[cfg(feature = "benches")]
mod benches {
use crate::{bench_store_impl, define_bench, RedisStreamStore};
bench_store_impl!(RedisStreamStore::new("redis://localhost:6379").unwrap());
}