use futures::Stream;
use intrepid_core::Frame;
use tower::Service;
use super::{EventLog, EventRepo, EventRepoError, IntoEvent};
pub struct EventConnection<Repo> {
pub repo: Repo,
pub own_stream_name: String,
pub target_stream_name: String,
}
impl<Repo> EventConnection<Repo>
where
Repo: EventRepo + Send + Sync + 'static,
{
pub fn new(
repo: Repo,
target_stream_name: impl AsRef<str>,
own_stream_name: impl AsRef<str>,
) -> Self {
Self {
repo,
own_stream_name: own_stream_name.as_ref().to_owned(),
target_stream_name: target_stream_name.as_ref().to_owned(),
}
}
pub fn into_stream(self) -> impl Stream<Item = Result<Frame, EventRepoError>> {
async_stream::try_stream! {
while let events = self.pending_events().await? {
for event in events {
yield event.into();
}
}
}
}
pub async fn publish<EventCandidate>(&self, event: EventCandidate) -> Result<(), EventRepoError>
where
EventCandidate: IntoEvent + Send,
{
self.repo.publish(event).await
}
pub async fn process<FrameService>(
&self,
mut service: FrameService,
) -> Result<(), EventRepoError>
where
FrameService: Service<Frame> + Send,
{
let events = self.pending_events().await?;
if let Some(last_message) = events.last() {
for event in events.clone() {
let _ = service.call(event.into()).await;
}
self.repo
.log_last_read(
self.target_stream_name.clone(),
self.own_stream_name.clone(),
last_message.position,
)
.await?;
}
Ok(())
}
async fn pending_events(&self) -> Result<EventLog, EventRepoError> {
let last_read = self
.repo
.last_read_position(self.own_stream_name.clone())
.await;
let events = self
.repo
.entries_since_position(self.target_stream_name.clone(), last_read)
.await?;
if let Some(message) = events.last() {
self.repo
.log_last_read(
self.target_stream_name.clone(),
self.own_stream_name.clone(),
message.position,
)
.await?;
}
Ok(events)
}
}