use futures::Stream;
use intrepid_core::Frame;
use std::{str::FromStr, time::Duration};
use crate::EventRepo;
use super::{Subscription, SubscriptionName, SubscriptionWorker};
/// Configuration describing a single subscription: which stream it reads,
/// under which identifier, and how its output is throttled.
#[derive(Clone)]
pub struct SubscriptionConfig {
/// Identifier of the subscription; rendered before the `@` in `stream_id()`.
pub id: uuid::Uuid,
/// Name of the subscribed stream; rendered after the `@` in `stream_id()`.
pub stream_name: SubscriptionName,
/// Duration handed to `throttle` for the streams built by `stream()` and `worker()`.
pub frequency: Duration,
}
impl SubscriptionConfig {
pub fn new(id: uuid::Uuid, stream_name: SubscriptionName, frequency: Duration) -> Self {
Self {
id,
stream_name,
frequency,
}
}
pub fn stream_id(&self) -> String {
format!(
"{id}@{stream_name}",
id = self.id,
stream_name = self.stream_name
)
}
pub fn subscription<Repo>(&self, repo: &Repo) -> Result<Subscription<Repo>, String>
where
Repo: EventRepo + Clone + Send + Sync + 'static,
{
Subscription::resume(repo, self.id, self.stream_name.clone())
}
pub fn stream<Repo>(&self, repo: &Repo) -> impl Stream<Item = Frame>
where
Repo: EventRepo + Clone + Send + Sync + 'static,
{
let stream = self.subscription(repo).unwrap().stream();
tokio_stream::StreamExt::throttle(stream, self.frequency)
}
pub fn worker(&self, worker: SubscriptionWorker) -> impl Stream<Item = Frame> {
tokio_stream::StreamExt::throttle(worker, self.frequency)
}
fn from_str<T: Into<SubscriptionName>>(stream_name: T) -> Self {
let id = uuid::Uuid::from_str("0588c89e-b8e3-48af-ab2b-3a8d1961dcef").unwrap();
Self::new(id, stream_name.into(), Duration::from_secs(1))
}
}
/// Builds a [`SubscriptionConfig`] from anything that can be viewed as a
/// string slice, treating the text as the subscription name.
///
/// # Panics
///
/// Panics if `SubscriptionName::parse_string` rejects the given text.
impl<T> From<T> for SubscriptionConfig
where
    T: AsRef<str>,
{
    fn from(inner: T) -> Self {
        let raw_name = inner.as_ref();
        let parsed_name = SubscriptionName::parse_string(raw_name).unwrap();
        Self::from_str(parsed_name)
    }
}