use intrepid_core::Frame;
use tower::Service;
use crate::{Event, EventKind, EventLog, EventRepo, EventRepoError};
use super::{
SubscriptionMarker, SubscriptionName, SubscriptionStream, SubscriptionTopic, SubscriptionWorker,
};
#[derive(Clone, Debug)]
pub struct Subscription<Repo> {
pub repo: Repo,
pub id: uuid::Uuid,
pub name: SubscriptionName,
pub target_stream: SubscriptionName,
}
impl<Repo> Subscription<Repo>
where
Repo: EventRepo + Clone,
{
pub fn init(repo: &Repo, name: impl SubscriptionTopic) -> Result<Self, String> {
let id = uuid::Uuid::new_v4();
Self::resume(repo, id, name)
}
pub fn resume(
repo: &Repo,
id: uuid::Uuid,
name: impl SubscriptionTopic,
) -> Result<Self, String> {
let target_stream = name.stream_name();
let own_name: SubscriptionName = format!("{target_stream}-{id}").parse()?;
Ok(Self {
name: own_name.clone(),
target_stream,
repo: repo.clone(),
id,
})
}
pub fn stream(&self) -> SubscriptionStream
where
Repo: Clone + Send + Sync + 'static,
{
SubscriptionStream::new(self.clone())
}
pub fn worker<FrameService>(
&self,
service: FrameService,
) -> Result<SubscriptionWorker, EventRepoError>
where
Repo: Clone + Send + Sync + 'static,
FrameService: Service<Frame> + Send + Sync + 'static,
FrameService::Future: Send,
<FrameService as Service<Frame>>::Response: Into<Frame>,
<FrameService as Service<Frame>>::Error: Into<Frame>,
{
Ok(SubscriptionWorker::new(self.clone(), service))
}
pub async fn call_with_service<FrameService>(
&self,
mut service: FrameService,
) -> Result<(), EventRepoError>
where
Repo: Clone + Send + Sync + 'static,
FrameService: Service<Frame> + Send + Sync + 'static,
FrameService::Future: Send,
{
for event in self.poll_events().await? {
let _ = service.call(event.into()).await;
}
Ok(())
}
pub async fn last_read_position(&self) -> i64 {
match self.repo.last(&self.name, EventKind::Marker).await {
Some(message) => {
let marker = serde_json::from_slice::<SubscriptionMarker>(&message.data).unwrap();
marker.position
}
None => -1_i64,
}
}
pub async fn pending_events(&self) -> Result<EventLog, EventRepoError> {
let last_read = self.last_read_position().await;
let events = self
.repo
.entries_since_position(&self.target_stream, last_read)
.await?;
Ok(events)
}
pub async fn poll_events(&self) -> Result<EventLog, EventRepoError>
where
Repo: Clone + Send + Sync + 'static,
{
let events = self.pending_events().await?;
if let Some(last_message) = events.last() {
self.log_last_read(last_message.position).await?;
return Ok(events);
}
Ok(vec![])
}
pub async fn log_last_read(&self, position: i64) -> Result<(), EventRepoError> {
let marker = Event::read_marker(&self.name, &self.target_stream, position);
self.repo.publish(marker).await?;
Ok(())
}
}