use std::convert::TryFrom;
use std::fmt::{Debug, Display};

use eventually_core::store::{
    AppendError, EventStream as StoreEventStream, Expected, Persisted, Select,
};
use eventually_core::subscription::EventStream as SubscriberEventStream;

use futures::future::BoxFuture;
use futures::stream::{Stream, StreamExt, TryStreamExt};

use lazy_static::lazy_static;

use redis::streams::{StreamId, StreamRangeReply};
use redis::{AsyncCommands, RedisError, RedisResult};

use serde::{Deserialize, Serialize};

/// Default number of events requested per page when reading from a Redis Stream.
pub const STREAM_PAGE_DEFAULT: usize = 128;

static APPEND_TO_STORE_SOURCE: &str = std::include_str!("append_to_store.lua");
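// Sketch of the script's expected interface, inferred from the `append`
// implementation below rather than from `append_to_store.lua` itself: the
// script is invoked with the store stream name and the source id as KEYS,
// the expected version (-1 meaning "any") followed by the JSON-encoded
// events as ARGV, and it is expected to reply with the new stream version.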
lazy_static! {
    static ref APPEND_TO_STORE_SCRIPT: redis::Script = redis::Script::new(APPEND_TO_STORE_SOURCE);
}
/// Builder type for [`EventStore`] and [`EventSubscriber`] instances.
#[derive(Clone)]
pub struct EventStoreBuilder {
    client: redis::Client,
    stream_page_size: Option<usize>,
}

impl EventStoreBuilder {
    /// Creates a new builder instance using the specified Redis client.
    pub fn new(client: redis::Client) -> Self {
        Self {
            client,
            stream_page_size: None,
        }
    }

    /// Changes the page size used when streaming events from Redis.
    pub fn stream_page_size(mut self, size: usize) -> Self {
        self.stream_page_size = Some(size);
        self
    }

    /// Builds a new [`EventStore`] instance for the given stream name.
    pub async fn build_store<Id, Event>(
        &self,
        stream_name: &'static str,
    ) -> RedisResult<EventStore<Id, Event>> {
        Ok(EventStore {
            stream_name,
            conn: self.client.get_multiplexed_async_connection().await?,
            stream_page_size: self.stream_page_size.unwrap_or(STREAM_PAGE_DEFAULT),
            id: std::marker::PhantomData,
            event: std::marker::PhantomData,
        })
    }

    /// Builds a new [`EventSubscriber`] instance for the given stream name.
    pub fn build_subscriber<Id, Event>(
        &self,
        stream_name: &'static str,
    ) -> EventSubscriber<Id, Event> {
        EventSubscriber {
            stream_name,
            client: self.client.clone(),
            id: std::marker::PhantomData,
            event: std::marker::PhantomData,
        }
    }
}
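// A minimal usage sketch for the builder (illustrative only; the stream name,
// aggregate id type and `MyEvent` event type are placeholders, and error
// handling is elided):
//
//     let client = redis::Client::open("redis://127.0.0.1:6379/")?;
//     let builder = EventStoreBuilder::new(client).stream_page_size(256);
//
//     let store = builder.build_store::<String, MyEvent>("orders").await?;
//     let subscriber = builder.build_subscriber::<String, MyEvent>("orders");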
/// Result type returned by [`EventStore`] operations.
pub type StoreResult<T> = Result<T, StoreError>;

/// Error types returned by the [`EventStore`] implementation.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    #[error("failed to encode events: {0}")]
    EncodeEvents(#[source] serde_json::Error),

    #[error("failed to decode events: {0}")]
    DecodeEvents(#[source] serde_json::Error),

    #[error("failed while reading stream from Redis: {0}")]
    Stream(#[source] RedisError),

    #[error("no key from Redis result: `{0}`")]
    NoKey(&'static str),

    #[error("failed to decode source_id from Redis entry: {0}")]
    DecodeSourceId(#[source] anyhow::Error),
}

impl AppendError for StoreError {
    #[inline]
    fn is_conflict_error(&self) -> bool {
        false
    }
}
/// Redis-backed implementation of [`eventually_core::store::EventStore`].
#[derive(Clone)]
pub struct EventStore<Id, Event> {
    stream_name: &'static str,
    conn: redis::aio::MultiplexedConnection,
    stream_page_size: usize,
    id: std::marker::PhantomData<Id>,
    event: std::marker::PhantomData<Event>,
}

impl<Id, Event> eventually_core::store::EventStore for EventStore<Id, Event>
where
    Id: TryFrom<String> + Display + Eq + Clone + Send + Sync,
    <Id as TryFrom<String>>::Error: std::error::Error + Send + Sync + 'static,
    Event: Serialize + Send + Sync,
    for<'de> Event: Deserialize<'de>,
{
    type SourceId = Id;
    type Event = Event;
    type Error = StoreError;

    fn append(
        &mut self,
        id: Self::SourceId,
        version: Expected,
        events: Vec<Self::Event>,
    ) -> BoxFuture<StoreResult<u32>> {
        let fut = async move {
            let events = events
                .iter()
                .map(serde_json::to_string)
                .collect::<Result<Vec<_>, _>>()
                .map_err(StoreError::EncodeEvents)?;

            // NOTE: any error raised while invoking the Lua script is unwrapped
            // here and results in a panic rather than an error value.
            Ok(APPEND_TO_STORE_SCRIPT
                .key(self.stream_name)
                .key(id.to_string())
                .arg(match version {
                    Expected::Any => -1,
                    Expected::Exact(v) => v as i64,
                })
                .arg(events)
                .invoke_async(&mut self.conn)
                .await
                .unwrap())
        };

        Box::pin(fut)
    }

    fn stream(
        &self,
        id: Self::SourceId,
        select: Select,
    ) -> BoxFuture<StoreResult<StoreEventStream<Self>>> {
        let fut = async move {
            let stream_name = format!("{}.{}", self.stream_name, id);

            let paginator = RedisPaginatedStream {
                conn: self.conn.clone(),
                stream_name,
                page_size: self.stream_page_size,
                from: match select {
                    Select::All => 0,
                    Select::From(v) => v as usize,
                },
            };

            Ok(paginator
                .into_stream()
                .map_err(StoreError::Stream)
                .map(move |res| res.map(|v| (id.clone(), v)))
                .and_then(move |(id, entry)| async move {
                    let event: Vec<u8> = entry
                        .get("event")
                        .ok_or_else(|| StoreError::NoKey("event"))?;
                    let event: Event =
                        serde_json::from_slice(&event).map_err(StoreError::DecodeEvents)?;

                    // In the per-aggregate stream, the first part of the entry id is
                    // used as the version and the second as the sequence number.
                    let (version, sequence_number) = parse_entry_id(&entry.id);

                    Ok(Persisted::from(id, event)
                        .sequence_number(sequence_number as u32)
                        .version(version as u32))
                })
                .boxed())
        };

        Box::pin(fut)
    }

    fn stream_all(&self, select: Select) -> BoxFuture<StoreResult<StoreEventStream<Self>>> {
        let fut = async move {
            let paginator = RedisPaginatedStream {
                conn: self.conn.clone(),
                stream_name: self.stream_name.to_owned(),
                page_size: self.stream_page_size,
                from: match select {
                    Select::All => 0,
                    Select::From(v) => v as usize,
                },
            };

            Ok(paginator
                .into_stream()
                .map_err(StoreError::Stream)
                .and_then(|entry| async move {
                    let source_id: String = entry
                        .get("source_id")
                        .ok_or_else(|| StoreError::NoKey("source_id"))?;

                    let source_id: Id = Id::try_from(source_id)
                        .map_err(anyhow::Error::from)
                        .map_err(StoreError::DecodeSourceId)?;

                    let event: Vec<u8> = entry
                        .get("event")
                        .ok_or_else(|| StoreError::NoKey("event"))?;
                    let event: Event =
                        serde_json::from_slice(&event).map_err(StoreError::DecodeEvents)?;

                    // In the global stream the order is reversed: the first part of the
                    // entry id is used as the sequence number and the second as the version.
                    let (sequence_number, version) = parse_entry_id(&entry.id);

                    Ok(Persisted::from(source_id, event)
                        .sequence_number(sequence_number as u32)
                        .version(version as u32))
                })
                .boxed())
        };

        Box::pin(fut)
    }

    fn remove(&mut self, _id: Self::SourceId) -> BoxFuture<StoreResult<()>> {
        unimplemented!()
    }
}
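// A minimal usage sketch for the store (illustrative only; `store`, `MyEvent`
// and the aggregate id are placeholders, and error handling is elided):
//
//     use eventually_core::store::{EventStore as _, Expected, Select};
//
//     let new_version = store
//         .append("order-123".to_string(), Expected::Any, vec![MyEvent::Created])
//         .await?;
//
//     // Read back all events recorded for that aggregate, from the beginning.
//     let mut stream = store.stream("order-123".to_string(), Select::All).await?;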
/// Result type returned by [`EventSubscriber`] operations.
pub type SubscriberResult<T> = Result<T, SubscriberError>;

/// Error types returned by the [`EventSubscriber`] implementation.
#[derive(Debug, thiserror::Error)]
pub enum SubscriberError {
    #[error("failed to establish connection with Redis: {0}")]
    Connection(#[source] RedisError),

    #[error("failed to get payload from message: {0}")]
    Payload(#[source] RedisError),

    #[error("failed to decode published message: {0}")]
    DecodeMessage(#[source] serde_json::Error),

    #[error("failed to subscribe to stream events: {0}")]
    Subscribe(#[source] RedisError),

    #[error("failed to decode source_id from published message: {0}")]
    DecodeSourceId(#[source] anyhow::Error),
}

/// Redis Pub/Sub-backed implementation of [`eventually_core::subscription::EventSubscriber`].
pub struct EventSubscriber<Id, Event> {
    stream_name: &'static str,
    client: redis::Client,
    id: std::marker::PhantomData<Id>,
    event: std::marker::PhantomData<Event>,
}

impl<Id, Event> eventually_core::subscription::EventSubscriber for EventSubscriber<Id, Event>
where
    Id: TryFrom<String> + Eq + Send + Sync,
    <Id as TryFrom<String>>::Error: std::error::Error + Send + Sync + 'static,
    Event: Send + Sync,
    for<'de> Event: Deserialize<'de>,
{
    type SourceId = Id;
    type Event = Event;
    type Error = SubscriberError;

    fn subscribe_all(&self) -> BoxFuture<SubscriberResult<SubscriberEventStream<Self>>> {
        #[derive(Deserialize)]
        struct SubscribeMessage<Event> {
            source_id: String,
            sequence_number: u32,
            version: u32,
            event: Event,
        }

        let fut = async move {
            let mut pubsub = self
                .client
                .get_async_connection()
                .await
                .map_err(SubscriberError::Connection)?
                .into_pubsub();

            pubsub
                .subscribe(self.stream_name)
                .await
                .map_err(SubscriberError::Subscribe)?;

            Ok(pubsub
                .into_on_message()
                .map(|msg| msg.get_payload::<Vec<u8>>())
                .map_err(SubscriberError::Payload)
                .and_then(|payload| async move {
                    let msg: SubscribeMessage<Event> =
                        serde_json::from_slice(&payload).map_err(SubscriberError::DecodeMessage)?;

                    let source_id = Id::try_from(msg.source_id)
                        .map_err(anyhow::Error::from)
                        .map_err(SubscriberError::DecodeSourceId)?;

                    Ok(Persisted::from(source_id, msg.event)
                        .sequence_number(msg.sequence_number)
                        .version(msg.version))
                })
                .boxed())
        };

        Box::pin(fut)
    }
}
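// A minimal usage sketch for the subscriber (illustrative only; the types mirror
// the builder example above and error handling is elided):
//
//     use eventually_core::subscription::EventSubscriber as _;
//     use futures::TryStreamExt;
//
//     let mut stream = subscriber.subscribe_all().await?;
//     while let Some(persisted) = stream.try_next().await? {
//         // handle the persisted event here
//     }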
/// Paginated [`Stream`] over the entries of a Redis Stream, reading
/// `page_size` entries at a time using `XRANGE ... COUNT`.
struct RedisPaginatedStream {
    conn: redis::aio::MultiplexedConnection,
    stream_name: String,
    page_size: usize,
    from: usize,
}

impl RedisPaginatedStream {
    /// Consumes the paginator and returns a [`Stream`] of all [`StreamId`]
    /// entries from the `from` position onwards, fetched one page at a time.
    fn into_stream(mut self) -> impl Stream<Item = RedisResult<StreamId>> + 'static {
        async_stream::try_stream! {
            let mut from = self.from;

            loop {
                let result: StreamRangeReply = self.conn.xrange_count(&self.stream_name, from, "+", self.page_size).await?;
                let ids = result.ids;
                let size = ids.len();

                for id in ids {
                    // Resume the next page right after the last entry seen.
                    from = parse_entry_id(&id.id).0 + 1;
                    yield id;
                }

                if size < self.page_size {
                    break;
                }
            }
        }
    }
}
/// Parses a Redis Stream entry id of the form `"<number>-<number>"` into its
/// two numeric parts.
fn parse_entry_id(id: &str) -> (usize, usize) {
    let parts: Vec<&str> = id.split('-').collect();
    (parts[0].parse().unwrap(), parts[1].parse().unwrap())
}
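// A small sanity check for `parse_entry_id`, written against the assumed
// "<first>-<second>" entry id layout used above rather than as exhaustive tests.
#[cfg(test)]
mod tests {
    use super::parse_entry_id;

    #[test]
    fn parse_entry_id_splits_both_numeric_parts() {
        assert_eq!(parse_entry_id("1-0"), (1, 0));
        assert_eq!(parse_entry_id("1234567890-42"), (1234567890, 42));
    }
}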