use std::convert::TryFrom;
use std::fmt::{Debug, Display};
use eventually_core::store::{
AppendError, EventStream as StoreEventStream, Expected, Persisted, Select,
};
use eventually_core::subscription::EventStream as SubscriberEventStream;
use futures::future::BoxFuture;
use futures::stream::{Stream, StreamExt, TryStreamExt};
use lazy_static::lazy_static;
use redis::streams::{StreamId, StreamRangeReply};
use redis::{AsyncCommands, RedisError, RedisResult};
use serde::{Deserialize, Serialize};
/// Default number of entries fetched per page when streaming events
/// out of a Redis Stream.
pub const STREAM_PAGE_DEFAULT: usize = 128;

// Lua source for the append operation, embedded at compile time from the
// file next to this module.
static APPEND_TO_STORE_SOURCE: &str = std::include_str!("append_to_store.lua");

lazy_static! {
    // Script compiled once and reused by every `EventStore::append` call.
    static ref APPEND_TO_STORE_SCRIPT: redis::Script = redis::Script::new(APPEND_TO_STORE_SOURCE);
}
/// Builder type for [`EventStore`] and [`EventSubscriber`] instances
/// sharing a single Redis client.
#[derive(Clone)]
pub struct EventStoreBuilder {
    // Client used to open connections for built stores and subscribers.
    client: redis::Client,
    // Optional page-size override; `None` falls back to `STREAM_PAGE_DEFAULT`.
    stream_page_size: Option<usize>,
}
impl EventStoreBuilder {
    /// Creates a new builder from the given Redis client.
    pub fn new(client: redis::Client) -> Self {
        Self {
            client,
            stream_page_size: None,
        }
    }

    /// Overrides how many entries are fetched per page when streaming
    /// events out of the store.
    pub fn stream_page_size(mut self, size: usize) -> Self {
        self.stream_page_size = Some(size);
        self
    }

    /// Builds a new [`EventStore`] over the named stream, opening a
    /// multiplexed async connection from the builder's client.
    pub async fn build_store<Id, Event>(
        &self,
        stream_name: &'static str,
    ) -> RedisResult<EventStore<Id, Event>> {
        let conn = self.client.get_multiplexed_async_connection().await?;
        let stream_page_size = self.stream_page_size.unwrap_or(STREAM_PAGE_DEFAULT);

        Ok(EventStore {
            stream_name,
            conn,
            stream_page_size,
            id: std::marker::PhantomData,
            event: std::marker::PhantomData,
        })
    }

    /// Builds a new [`EventSubscriber`] on the named stream, cloning the
    /// builder's client (connections are opened lazily on subscribe).
    pub fn build_subscriber<Id, Event>(
        &self,
        stream_name: &'static str,
    ) -> EventSubscriber<Id, Event> {
        EventSubscriber {
            stream_name,
            client: self.client.clone(),
            id: std::marker::PhantomData,
            event: std::marker::PhantomData,
        }
    }
}
/// Result type returned by the [`EventStore`] API.
pub type StoreResult<T> = Result<T, StoreError>;

/// Error returned by the [`EventStore`] API.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    // JSON serialization of domain events failed during `append`.
    #[error("failed to encode events: {0}")]
    EncodeEvents(#[source] serde_json::Error),
    // JSON deserialization of a stored event payload failed while streaming.
    #[error("failed to decode events: {0}")]
    DecodeEvents(#[source] serde_json::Error),
    // Underlying Redis error surfaced while reading a stream.
    #[error("failed while reading stream from Redis: {0}")]
    Stream(#[source] RedisError),
    // An expected field (e.g. "event", "source_id") was missing from a
    // Redis Stream entry; carries the missing key's name.
    #[error("no key from Redis result: `{0}`")]
    NoKey(&'static str),
    // The stored source id string could not be converted into `Id`.
    #[error("failed to decode source_id from Redis entry: {0}")]
    DecodeSourceId(#[source] anyhow::Error),
}

impl AppendError for StoreError {
    // NOTE(review): always returning `false` means version conflicts raised
    // by the append script are never reported as conflicts to callers —
    // confirm whether conflict detection is intentionally unimplemented.
    #[inline]
    fn is_conflict_error(&self) -> bool {
        false
    }
}
/// Redis-backed implementation of [`eventually_core::store::EventStore`].
///
/// Events are appended through a Lua script and read back via paginated
/// XRANGE calls over either the global stream or a per-source stream.
#[derive(Clone)]
pub struct EventStore<Id, Event> {
    // Name of the global Redis Stream; per-source streams are derived
    // from it as `{stream_name}.{source_id}`.
    stream_name: &'static str,
    // Multiplexed async connection; cheap to clone, shared by clones.
    conn: redis::aio::MultiplexedConnection,
    // Number of entries fetched per XRANGE page while streaming.
    stream_page_size: usize,
    // Zero-sized markers binding the generic `Id` and `Event` parameters.
    id: std::marker::PhantomData<Id>,
    event: std::marker::PhantomData<Event>,
}
impl<Id, Event> eventually_core::store::EventStore for EventStore<Id, Event>
where
Id: TryFrom<String> + Display + Eq + Clone + Send + Sync,
<Id as TryFrom<String>>::Error: std::error::Error + Send + Sync + 'static,
Event: Serialize + Send + Sync,
for<'de> Event: Deserialize<'de>,
{
type SourceId = Id;
type Event = Event;
type Error = StoreError;
fn append(
&mut self,
id: Self::SourceId,
version: Expected,
events: Vec<Self::Event>,
) -> BoxFuture<StoreResult<u32>> {
let fut = async move {
let events = events
.iter()
.map(serde_json::to_string)
.collect::<Result<Vec<_>, _>>()
.map_err(StoreError::EncodeEvents)?;
Ok(APPEND_TO_STORE_SCRIPT
.key(self.stream_name)
.key(id.to_string())
.arg(match version {
Expected::Any => -1,
Expected::Exact(v) => v as i64,
})
.arg(events)
.invoke_async(&mut self.conn)
.await
.unwrap())
};
Box::pin(fut)
}
fn stream(
&self,
id: Self::SourceId,
select: Select,
) -> BoxFuture<StoreResult<StoreEventStream<Self>>> {
let fut = async move {
let stream_name = format!("{}.{}", self.stream_name, id.to_string());
let paginator = RedisPaginatedStream {
conn: self.conn.clone(),
stream_name,
page_size: self.stream_page_size,
from: match select {
Select::All => 0,
Select::From(v) => v as usize,
},
};
Ok(paginator
.into_stream()
.map_err(StoreError::Stream)
.map(move |res| res.map(|v| (id.clone(), v)))
.and_then(move |(id, entry)| async move {
let event: Vec<u8> = entry
.get("event")
.ok_or_else(|| StoreError::NoKey("event"))?;
let event: Event =
serde_json::from_slice(&event).map_err(StoreError::DecodeEvents)?;
let (version, sequence_number) = parse_entry_id(&entry.id);
Ok(Persisted::from(id, event)
.sequence_number(sequence_number as u32)
.version(version as u32))
})
.boxed())
};
Box::pin(fut)
}
fn stream_all(&self, select: Select) -> BoxFuture<StoreResult<StoreEventStream<Self>>> {
let fut = async move {
let paginator = RedisPaginatedStream {
conn: self.conn.clone(),
stream_name: self.stream_name.to_owned(),
page_size: self.stream_page_size,
from: match select {
Select::All => 0,
Select::From(v) => v as usize,
},
};
Ok(paginator
.into_stream()
.map_err(StoreError::Stream)
.and_then(|entry| async move {
let source_id: String = entry
.get("source_id")
.ok_or_else(|| StoreError::NoKey("source_id"))?;
let source_id: Id = Id::try_from(source_id)
.map_err(anyhow::Error::from)
.map_err(StoreError::DecodeSourceId)?;
let event: Vec<u8> = entry
.get("event")
.ok_or_else(|| StoreError::NoKey("event"))?;
let event: Event =
serde_json::from_slice(&event).map_err(StoreError::DecodeEvents)?;
let (sequence_number, version) = parse_entry_id(&entry.id);
Ok(Persisted::from(source_id, event)
.sequence_number(sequence_number as u32)
.version(version as u32))
})
.boxed())
};
Box::pin(fut)
}
fn remove(&mut self, _id: Self::SourceId) -> BoxFuture<StoreResult<()>> {
unimplemented!()
}
}
pub type SubscriberResult<T> = Result<T, SubscriberError>;
#[derive(Debug, thiserror::Error)]
pub enum SubscriberError {
#[error("failed to establish connection with Redis: {0}")]
Connection(#[source] RedisError),
#[error("failed to get payload from message: {0}")]
Payload(#[source] RedisError),
#[error("failed to decode published message: {0}")]
DecodeMessage(#[source] serde_json::Error),
#[error("failed to subscriber to stream events: {0}")]
Subscribe(#[source] RedisError),
#[error("failed to decode source_id from published message: {0}")]
DecodeSourceId(#[source] anyhow::Error),
}
/// Redis Pub/Sub-backed implementation of
/// [`eventually_core::subscription::EventSubscriber`].
pub struct EventSubscriber<Id, Event> {
    // Pub/Sub channel name to subscribe to.
    stream_name: &'static str,
    // Client used to open a fresh connection per subscription.
    client: redis::Client,
    // Zero-sized markers binding the generic `Id` and `Event` parameters.
    id: std::marker::PhantomData<Id>,
    event: std::marker::PhantomData<Event>,
}
impl<Id, Event> eventually_core::subscription::EventSubscriber for EventSubscriber<Id, Event>
where
    Id: TryFrom<String> + Eq + Send + Sync,
    <Id as TryFrom<String>>::Error: std::error::Error + Send + Sync + 'static,
    Event: Send + Sync,
    for<'de> Event: Deserialize<'de>,
{
    type SourceId = Id;
    type Event = Event;
    type Error = SubscriberError;

    /// Subscribes to the store's Pub/Sub channel and yields every
    /// published event as a [`Persisted`] value, decoding payloads as JSON.
    fn subscribe_all(&self) -> BoxFuture<SubscriberResult<SubscriberEventStream<Self>>> {
        // Wire format of the messages published on the channel.
        #[derive(Deserialize)]
        struct SubscribeMessage<Event> {
            source_id: String,
            sequence_number: u32,
            version: u32,
            event: Event,
        }

        let future = async move {
            // One dedicated connection per subscription, switched to Pub/Sub mode.
            let conn = self
                .client
                .get_async_connection()
                .await
                .map_err(SubscriberError::Connection)?;
            let mut pubsub = conn.into_pubsub();

            pubsub
                .subscribe(self.stream_name)
                .await
                .map_err(SubscriberError::Subscribe)?;

            let stream = pubsub
                .into_on_message()
                // Pull the raw bytes out of each message, then decode.
                .map(|msg| msg.get_payload::<Vec<u8>>())
                .map_err(SubscriberError::Payload)
                .and_then(|payload| async move {
                    let msg: SubscribeMessage<Event> = serde_json::from_slice(&payload)
                        .map_err(SubscriberError::DecodeMessage)?;

                    let source_id = Id::try_from(msg.source_id)
                        .map_err(anyhow::Error::from)
                        .map_err(SubscriberError::DecodeSourceId)?;

                    Ok(Persisted::from(source_id, msg.event)
                        .sequence_number(msg.sequence_number)
                        .version(msg.version))
                })
                .boxed();

            Ok(stream)
        };

        Box::pin(future)
    }
}
/// Paginated reader over a Redis Stream: fetches `page_size` entries per
/// XRANGE call, starting from the `from` cursor, until a short page signals
/// the end of the stream.
struct RedisPaginatedStream {
    conn: redis::aio::MultiplexedConnection,
    stream_name: String,
    page_size: usize,
    from: usize,
}
impl RedisPaginatedStream {
    /// Consumes the paginator and returns a stream yielding every entry of
    /// the underlying Redis Stream, fetched one XRANGE page at a time.
    /// Redis errors terminate the stream with an `Err` item.
    fn into_stream(mut self) -> impl Stream<Item = RedisResult<StreamId>> + 'static {
        async_stream::try_stream! {
            let mut from = self.from;
            loop {
                // Fetch the next page of at most `page_size` entries,
                // from the current cursor to the end of the stream ("+").
                let result: StreamRangeReply = self.conn.xrange_count(&self.stream_name, from, "+", self.page_size).await?;
                let ids = result.ids;
                let size = ids.len();
                for id in ids {
                    // Advance the cursor past the entry being yielded: the
                    // first numeric component of the entry id, plus one, so
                    // the next XRANGE starts strictly after it.
                    from = parse_entry_id(&id.id).0 + 1;
                    yield id;
                }
                // A page shorter than `page_size` means the stream is exhausted.
                if size < self.page_size {
                    break;
                }
            }
        }
    }
}
/// Splits a Redis Stream entry id of the form `"<left>-<right>"` into its
/// two numeric components.
///
/// # Panics
///
/// Panics if the id has no `-` separator or if either component does not
/// parse as `usize`.
fn parse_entry_id(id: &str) -> (usize, usize) {
    let mut parts = id.split('-');
    let left = parts.next().unwrap().parse().unwrap();
    let right = parts.next().unwrap().parse().unwrap();
    (left, right)
}