use futures_util::stream::StreamExt;
use serde::de::DeserializeOwned;
use crate::{
change_stream::{
event::ResumeToken,
session::SessionChangeStream as AsyncSessionChangeStream,
ChangeStream as AsyncChangeStream,
},
error::Result,
runtime,
};
use super::ClientSession;
/// Blocking wrapper around the async change stream ([`AsyncChangeStream`]).
///
/// Every operation is forwarded to the wrapped async stream; operations that
/// are asynchronous there are driven to completion with `runtime::block_on`,
/// so they block the calling thread. Events are deserialized into `T`.
pub struct ChangeStream<T>
where
T: DeserializeOwned + Unpin + Send + Sync,
{
// The async stream that all methods on this type delegate to.
async_stream: AsyncChangeStream<T>,
}
impl<T> ChangeStream<T>
where
    T: DeserializeOwned + Unpin + Send + Sync,
{
    /// Builds the blocking wrapper around an already-created async stream.
    pub(crate) fn new(async_stream: AsyncChangeStream<T>) -> Self {
        ChangeStream { async_stream }
    }

    /// Returns the resume token currently reported by the wrapped async
    /// stream, or `None` if it reports none.
    pub fn resume_token(&self) -> Option<ResumeToken> {
        let inner = &self.async_stream;
        inner.resume_token()
    }

    /// Consumes this stream and returns one that yields events as `D`
    /// instead of `T`.
    ///
    /// The conversion itself is performed by the wrapped async stream's
    /// `with_type`; this method only re-wraps the result.
    pub fn with_type<D: DeserializeOwned + Unpin + Send + Sync>(self) -> ChangeStream<D> {
        let async_stream = self.async_stream.with_type();
        ChangeStream { async_stream }
    }

    /// Reports whether the wrapped async stream considers itself alive.
    pub fn is_alive(&self) -> bool {
        let inner = &self.async_stream;
        inner.is_alive()
    }

    /// Blocks the calling thread on the wrapped stream's `next_if_any`
    /// future and returns its result.
    ///
    /// NOTE(review): `Ok(None)` presumably means that no event was available
    /// for this attempt rather than that the stream ended; confirm against
    /// the async stream's documentation.
    ///
    /// # Errors
    ///
    /// Propagates any error produced by the wrapped async stream.
    pub fn next_if_any(&mut self) -> Result<Option<T>> {
        let pending = self.async_stream.next_if_any();
        runtime::block_on(pending)
    }
}
/// Lets a [`ChangeStream`] be used as a blocking iterator over its events.
impl<T> Iterator for ChangeStream<T>
where
    T: DeserializeOwned + Unpin + Send + Sync,
{
    type Item = Result<T>;

    /// Blocks the calling thread until the wrapped async stream produces its
    /// next item, then returns that item unchanged.
    fn next(&mut self) -> Option<Self::Item> {
        // The future borrows the inner stream mutably; it is consumed by
        // `block_on` before this method returns.
        let upcoming = self.async_stream.next();
        runtime::block_on(upcoming)
    }
}
/// Blocking wrapper around the session-bound async change stream
/// ([`AsyncSessionChangeStream`]).
///
/// Unlike [`ChangeStream`], the methods that fetch events take the
/// [`ClientSession`] to use as an explicit argument on each call.
///
/// NOTE(review): the bounds here (`DeserializeOwned + Unpin`) are looser than
/// those on this type's inherent impl block (`+ Send + Sync`); confirm whether
/// that difference is intentional.
pub struct SessionChangeStream<T>
where
T: DeserializeOwned + Unpin,
{
// The async stream that all methods on this type delegate to.
async_stream: AsyncSessionChangeStream<T>,
}
impl<T> SessionChangeStream<T>
where
    T: DeserializeOwned + Unpin + Send + Sync,
{
    /// Builds the blocking wrapper around an already-created async
    /// session change stream.
    pub(crate) fn new(async_stream: AsyncSessionChangeStream<T>) -> Self {
        SessionChangeStream { async_stream }
    }

    /// Returns the resume token currently reported by the wrapped async
    /// stream, or `None` if it reports none.
    pub fn resume_token(&self) -> Option<ResumeToken> {
        let inner = &self.async_stream;
        inner.resume_token()
    }

    /// Reports whether the wrapped async stream considers itself alive.
    pub fn is_alive(&self) -> bool {
        let inner = &self.async_stream;
        inner.is_alive()
    }

    /// Consumes this stream and returns one that yields events as `D`
    /// instead of `T`.
    ///
    /// The conversion itself is performed by the wrapped async stream's
    /// `with_type`; this method only re-wraps the result.
    pub fn with_type<D: DeserializeOwned + Unpin + Send + Sync>(self) -> SessionChangeStream<D> {
        let async_stream = self.async_stream.with_type();
        SessionChangeStream { async_stream }
    }

    /// Blocks the calling thread on the wrapped stream's `next` future,
    /// run with the async session held by `session`, and returns its result.
    ///
    /// # Errors
    ///
    /// Propagates any error produced by the wrapped async stream.
    pub fn next(&mut self, session: &mut ClientSession) -> Result<Option<T>> {
        // The async stream works on the async session held inside the
        // blocking `ClientSession` wrapper.
        let async_session = &mut session.async_client_session;
        let pending = self.async_stream.next(async_session);
        runtime::block_on(pending)
    }

    /// Blocks the calling thread on the wrapped stream's `next_if_any`
    /// future, run with the async session held by `session`, and returns
    /// its result.
    ///
    /// NOTE(review): `Ok(None)` presumably means that no event was available
    /// for this attempt rather than that the stream ended; confirm against
    /// the async stream's documentation.
    ///
    /// # Errors
    ///
    /// Propagates any error produced by the wrapped async stream.
    pub fn next_if_any(&mut self, session: &mut ClientSession) -> Result<Option<T>> {
        let async_session = &mut session.async_client_session;
        let pending = self.async_stream.next_if_any(async_session);
        runtime::block_on(pending)
    }
}