use serde::de::DeserializeOwned;
use crate::{error::Result, ClientSession, SessionCursor};
use super::{
event::{ChangeStreamEvent, ResumeToken},
ChangeStreamData,
WatchArgs,
};
/// A change stream whose operations take an explicit [`ClientSession`].
///
/// The stream is a thin, typed facade over a [`CursorWrapper`]: every method
/// that fetches data forwards to the wrapper, and items are handed back to the
/// caller as `T`.
pub struct SessionChangeStream<T: DeserializeOwned + Unpin> {
    /// The wrapped session cursor together with the watch arguments and
    /// change-stream data this stream was constructed with.
    inner: CursorWrapper,
    /// Records the item type `T` without storing a value of it. The
    /// `fn() -> T` form keeps the struct's auto traits (`Send`/`Sync`)
    /// independent of `T`.
    _marker: std::marker::PhantomData<fn() -> T>,
}
/// Module-local shorthand for the shared change-stream cursor wrapper,
/// specialised to an untyped (`()`) [`SessionCursor`].
type CursorWrapper = super::common::CursorWrapper<SessionCursor<()>>;
impl<T> SessionChangeStream<T>
where
    T: DeserializeOwned + Unpin + Send + Sync,
{
    /// Creates a stream from an already-opened session cursor, the arguments
    /// of the originating watch call, and the associated change-stream data.
    pub(crate) fn new(cursor: SessionCursor<()>, args: WatchArgs, data: ChangeStreamData) -> Self {
        let inner = CursorWrapper::new(cursor, args, data);
        SessionChangeStream {
            inner,
            _marker: std::marker::PhantomData,
        }
    }

    /// Returns a copy of the resume token currently stored in this stream's
    /// change-stream data, or `None` when no token is stored.
    pub fn resume_token(&self) -> Option<ResumeToken> {
        let stored = &self.inner.data.resume_token;
        stored.clone()
    }

    /// Converts this stream into one that yields items of type `D`.
    ///
    /// The wrapped cursor state is moved over unchanged; only the item type
    /// marker is replaced.
    pub fn with_type<D: DeserializeOwned + Unpin + Send + Sync>(self) -> SessionChangeStream<D> {
        let Self { inner, _marker: _ } = self;
        SessionChangeStream {
            inner,
            _marker: std::marker::PhantomData,
        }
    }

    /// Retrieves the next item from the stream using `session`.
    ///
    /// Repeatedly calls [`next_if_any`](Self::next_if_any) until it produces
    /// an item. If a call produces nothing and [`is_alive`](Self::is_alive)
    /// reports `false`, `Ok(None)` is returned instead.
    ///
    /// # Errors
    ///
    /// Any error returned by `next_if_any` is propagated immediately.
    pub async fn next(&mut self, session: &mut ClientSession) -> Result<Option<T>> {
        loop {
            if let Some(item) = self.next_if_any(session).await? {
                return Ok(Some(item));
            }
            // Nothing was produced this round: stop only once the underlying
            // cursor is exhausted, otherwise ask again.
            if !self.is_alive() {
                return Ok(None);
            }
        }
    }

    /// Reports whether the underlying raw cursor has not yet been exhausted.
    pub fn is_alive(&self) -> bool {
        let exhausted = self.inner.cursor.raw().is_exhausted();
        !exhausted
    }

    /// Makes a single attempt to obtain an item by forwarding to the wrapped
    /// cursor's `next_if_any`, returning `Ok(None)` when that attempt yields
    /// nothing.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the wrapped cursor reports.
    pub async fn next_if_any(&mut self, session: &mut ClientSession) -> Result<Option<T>> {
        let attempt = self.inner.next_if_any(session);
        attempt.await
    }
}
impl super::common::InnerCursor for SessionCursor<()> {
type Session = ClientSession;
async fn try_advance(&mut self, session: &mut Self::Session) -> Result<bool> {
self.try_advance(session).await
}
fn get_resume_token(&self) -> Result<Option<ResumeToken>> {
super::common::get_resume_token(self.batch(), self.raw().post_batch_resume_token())
}
fn current(&self) -> &crate::bson::RawDocument {
self.current()
}
async fn execute_watch(
&mut self,
args: WatchArgs,
data: ChangeStreamData,
session: &mut Self::Session,
) -> Result<(Self, WatchArgs)> {
let new_stream: SessionChangeStream<ChangeStreamEvent<()>> = self
.raw()
.client()
.execute_watch_with_session(
args.pipeline,
args.options,
args.target,
Some(data),
session,
)
.await?;
let new_inner = new_stream.inner;
Ok((new_inner.cursor, new_inner.args))
}
fn set_drop_address(&mut self, from: &Self) {
self.raw_mut()
.set_drop_address(from.raw().address().clone());
}
}