use serde::de::DeserializeOwned;
use crate::{
cursor::{BatchValue, NextInBatchFuture},
error::Result,
ClientSession,
SessionCursor,
};
use super::{
event::{ChangeStreamEvent, ResumeToken},
get_resume_token,
ChangeStreamData,
WatchArgs,
};
/// A change stream that is driven through an explicitly supplied [`ClientSession`].
///
/// The iteration methods (`next`, `next_if_any`) take the session as a parameter on every
/// call. Decoded items are of type `T`.
pub struct SessionChangeStream<T>
where
T: DeserializeOwned + Unpin,
{
/// The session cursor the stream reads its batches from.
cursor: SessionCursor<T>,
/// The arguments of the watch call (pipeline, options, target); `next_if_any` clones and
/// replays them when it re-issues the watch after a resumable error.
args: WatchArgs,
/// Bookkeeping for the stream: the latest resume token, whether a document has been
/// returned, and whether a resume attempt is currently in progress.
data: ChangeStreamData,
}
impl<T> SessionChangeStream<T>
where
T: DeserializeOwned + Unpin + Send + Sync,
{
pub(crate) fn new(cursor: SessionCursor<T>, args: WatchArgs, data: ChangeStreamData) -> Self {
Self { cursor, args, data }
}
pub fn resume_token(&self) -> Option<ResumeToken> {
self.data.resume_token.clone()
}
pub fn with_type<D: DeserializeOwned + Unpin + Send + Sync>(self) -> SessionChangeStream<D> {
SessionChangeStream::new(self.cursor.with_type(), self.args, self.data)
}
pub async fn next(&mut self, session: &mut ClientSession) -> Result<Option<T>> {
loop {
let maybe_next = self.next_if_any(session).await?;
match maybe_next {
Some(t) => return Ok(Some(t)),
None if self.is_alive() => continue,
None => return Ok(None),
}
}
}
pub fn is_alive(&self) -> bool {
!self.cursor.is_exhausted()
}
pub async fn next_if_any(&mut self, session: &mut ClientSession) -> Result<Option<T>> {
loop {
let (next, post_batch_token, client) = {
let mut stream = self.cursor.stream(session);
let next = NextInBatchFuture::new(&mut stream).await;
let post_batch_token = stream.post_batch_resume_token().cloned();
let client = stream.client().clone();
(next, post_batch_token, client)
};
match next {
Ok(bv) => {
if let Some(token) = get_resume_token(&bv, post_batch_token.as_ref())? {
self.data.resume_token = Some(token);
}
match bv {
BatchValue::Some { doc, .. } => {
self.data.document_returned = true;
return Ok(Some(bson::from_slice(doc.as_bytes())?));
}
BatchValue::Empty | BatchValue::Exhausted => return Ok(None),
}
}
Err(e) if e.is_resumable() && !self.data.resume_attempted => {
self.data.resume_attempted = true;
let args = self.args.clone();
let new_stream: SessionChangeStream<ChangeStreamEvent<()>> = client
.execute_watch_with_session(
args.pipeline,
args.options,
args.target,
Some(self.data.take()),
session,
)
.await?;
let new_stream = new_stream.with_type::<T>();
self.cursor
.set_drop_address(new_stream.cursor.address().clone());
self.cursor = new_stream.cursor;
self.args = new_stream.args;
self.data.resume_attempted = false;
continue;
}
Err(e) => return Err(e),
}
}
}
}