// Event connections: cursor-tracking readers over a single stream of an event repo.
use futures::Stream;
use intrepid_core::Frame;
use tower::Service;
use super::{EventLog, EventRepo, EventRepoError, IntoEvent};
/// A connection to an event repo. Event connections are meant to represent long running
/// or repeated access to a log of events. They keep track of the last event they read
/// and will only read events that have occurred since that event. This allows them to
/// be used in a streaming context, where they can be run in the background or polled
/// for new events.
///
/// The read position itself is not stored in this struct: it is looked up from, and
/// written back to, `repo` (via `last_read_position` and `log_last_read`) every time
/// pending events are fetched.
pub struct EventConnection<Repo> {
/// The underlying repository. All reads, publishes and read-position bookkeeping
/// go through it.
pub repo: Repo,
/// The name of the stream that this connection marks reads in. This is essentially
/// the identity of the connection. Changing this will cause the connection to
/// read from the beginning of the target stream.
///
/// This is the name handed to `last_read_position` when looking up where to resume.
pub own_stream_name: String,
/// The name of the stream that this connection is reading from. The connection will
/// ignore events from other streams.
///
/// This is the name handed to `entries_since_position` when fetching new events.
pub target_stream_name: String,
}
impl<Repo> EventConnection<Repo>
where
    Repo: EventRepo + Send + Sync + 'static,
{
    /// Create a new connection.
    ///
    /// `target_stream_name` is the stream events are read from, and
    /// `own_stream_name` is the stream under which this connection records how
    /// far it has read. Note the argument order: target first, then own.
    pub fn new(
        repo: Repo,
        target_stream_name: impl AsRef<str>,
        own_stream_name: impl AsRef<str>,
    ) -> Self {
        Self {
            repo,
            own_stream_name: own_stream_name.as_ref().to_owned(),
            target_stream_name: target_stream_name.as_ref().to_owned(),
        }
    }

    /// Consume the connection and return an async stream of events.
    ///
    /// The stream never completes on its own: it repeatedly fetches the pending
    /// events and yields each of them as a [`Frame`]. It only ends after yielding
    /// an `Err`, which happens when the repo reports an error.
    ///
    /// NOTE(review): there is no delay between fetches, so an idle target stream
    /// is re-queried as fast as the repo answers. Also, a batch is recorded as
    /// read before its events are yielded, so dropping the stream mid-batch
    /// skips the remaining events of that batch.
    pub fn into_stream(self) -> impl Stream<Item = Result<Frame, EventRepoError>> {
        async_stream::try_stream! {
            // Deliberately endless; an explicit `loop` replaces the former
            // irrefutable `while let`, which behaved the same way.
            loop {
                let batch = self.pending_events().await?;
                for event in batch {
                    yield event.into();
                }
            }
        }
    }

    /// Publish an event through the underlying repo.
    ///
    /// The event is handed to the repo as-is; this connection's stream names are
    /// not involved, so where the event lands is decided by the event and the repo.
    ///
    /// # Errors
    ///
    /// Returns whatever error the repo reports for the publish.
    pub async fn publish<EventCandidate>(&self, event: EventCandidate) -> Result<(), EventRepoError>
    where
        EventCandidate: IntoEvent + Send,
    {
        self.repo.publish(event).await
    }

    /// Process all pending events in the target stream.
    ///
    /// Every pending event is converted into a [`Frame`] and sent to `service`,
    /// in order. Dispatch is best-effort: errors returned by the service for an
    /// individual event are ignored. If the service reports that it can no longer
    /// become ready, dispatch stops early. Afterwards the position of the last
    /// fetched event is recorded as read.
    ///
    /// # Errors
    ///
    /// Returns an error only when the repo fails, either while fetching the
    /// pending events or while recording the read position.
    pub async fn process<FrameService>(
        &self,
        mut service: FrameService,
    ) -> Result<(), EventRepoError>
    where
        FrameService: Service<Frame> + Send,
    {
        let events = self.pending_events().await?;

        // Copy the position out up front so the log can be consumed below
        // without cloning it.
        let last_position = match events.last().map(|event| event.position) {
            Some(position) => position,
            // Nothing pending: nothing to dispatch, nothing to record.
            None => return Ok(()),
        };

        for event in events {
            // The tower contract requires `poll_ready` to succeed before each
            // `call`; services are allowed to panic otherwise.
            let ready = std::future::poll_fn(|cx| service.poll_ready(cx)).await;
            if ready.is_err() {
                // A service that failed readiness must not be called again.
                break;
            }
            // Best-effort dispatch: per-event service errors are intentionally ignored.
            let _ = service.call(event.into()).await;
        }

        // NOTE(review): `pending_events` has already recorded this same position;
        // the write is kept to preserve the existing behavior.
        self.repo
            .log_last_read(
                self.target_stream_name.clone(),
                self.own_stream_name.clone(),
                last_position,
            )
            .await?;

        Ok(())
    }

    /// Get all pending events in the target stream.
    ///
    /// Looks up this connection's last read position, fetches every entry of the
    /// target stream since that position and, when at least one entry came back,
    /// records the position of the last one as read before returning the batch.
    async fn pending_events(&self) -> Result<EventLog, EventRepoError> {
        let last_read = self
            .repo
            .last_read_position(self.own_stream_name.clone())
            .await;

        let events = self
            .repo
            .entries_since_position(self.target_stream_name.clone(), last_read)
            .await?;

        // Advance the cursor only when something new was actually read.
        if let Some(newest) = events.last() {
            self.repo
                .log_last_read(
                    self.target_stream_name.clone(),
                    self.own_stream_name.clone(),
                    newest.position,
                )
                .await?;
        }

        Ok(events)
    }
}