use std::collections::VecDeque;
use crate::{
bson::{Document, RawDocument, RawDocumentBuf, Timestamp},
bson_compat::deserialize_from_slice,
error::Error,
operation::OperationTarget,
};
use serde::de::DeserializeOwned;
#[cfg(feature = "bson-3")]
use crate::bson_compat::RawBsonRefExt as _;
use crate::{
change_stream::event::ResumeToken,
error::{ErrorKind, Result},
ClientSession,
};
/// Arguments describing a watch operation.
///
/// `CursorWrapper::next_if_any` clones this value and passes it to
/// `InnerCursor::execute_watch` when it re-runs the watch after a resumable error, then
/// stores the (possibly updated) arguments returned by that call.
#[derive(Debug, Clone)]
pub(crate) struct WatchArgs {
/// Pipeline documents for the watch (presumably aggregation stages; confirm against callers).
pub(crate) pipeline: Vec<Document>,
/// The target the watch operation is run against.
pub(crate) target: OperationTarget,
/// Optional change stream options supplied with the watch.
pub(crate) options: Option<super::options::ChangeStreamOptions>,
}
/// Mutable state tracked alongside a change stream cursor.
///
/// A snapshot of this state (see `ChangeStreamData::take`) is handed to
/// `InnerCursor::execute_watch` when the stream is re-established.
#[derive(Debug, Default)]
pub(crate) struct ChangeStreamData {
/// Operation time associated with the initial operation, if any.
/// NOTE(review): never written in this file; presumably populated by whoever creates the stream.
pub(crate) initial_operation_time: Option<Timestamp>,
/// Most recent resume token; refreshed from the cursor after every successful advance in
/// `CursorWrapper::next_if_any`.
pub(crate) resume_token: Option<ResumeToken>,
/// Set to `true` once `CursorWrapper::next_if_any` has produced at least one document.
pub(crate) document_returned: bool,
/// Session owned by the stream itself, if any. `take` moves it out, leaving `None` behind.
pub(crate) implicit_session: Option<ClientSession>,
}
impl ChangeStreamData {
    /// Produces a snapshot of this state suitable for handing to a new watch operation.
    ///
    /// The operation time, resume token and `document_returned` flag are copied/cloned, so
    /// `self` keeps them. The implicit session is *moved* into the returned value, leaving
    /// `self.implicit_session` as `None`.
    pub(super) fn take(&mut self) -> Self {
        // Destructure exhaustively so that adding a field forces this method to be revisited.
        let Self {
            initial_operation_time,
            resume_token,
            document_returned,
            implicit_session,
        } = self;

        Self {
            initial_operation_time: *initial_operation_time,
            resume_token: resume_token.clone(),
            document_returned: *document_returned,
            implicit_session: implicit_session.take(),
        }
    }
}
/// Pairs an inner cursor with the watch arguments and stream state needed to re-run the
/// watch when advancing the cursor fails with a resumable error.
#[derive(Debug)]
pub(super) struct CursorWrapper<Inner> {
/// The cursor currently being advanced; replaced wholesale when the watch is re-run.
pub(super) cursor: Inner,
/// Arguments used to (re-)execute the watch.
pub(super) args: WatchArgs,
/// Resume token and other bookkeeping for the stream.
pub(super) data: ChangeStreamData,
}
impl<Inner> CursorWrapper<Inner> {
    /// Bundles a cursor with the watch arguments and stream state that accompany it.
    pub(super) fn new(cursor: Inner, args: WatchArgs, data: ChangeStreamData) -> Self {
        Self { cursor, args, data }
    }

    /// Advances the cursor once and returns the document it lands on, if any.
    ///
    /// When advancing fails with a resumable error, the watch is re-executed with a clone of
    /// the stored arguments and a snapshot of the stream state, the wrapped cursor and
    /// arguments are swapped for the new ones, and the advance is attempted again. This
    /// repeats for as long as the failures are resumable.
    ///
    /// After a successful advance the stored resume token is refreshed from the cursor,
    /// regardless of whether a document was produced.
    ///
    /// # Returns
    ///
    /// * `Ok(Some(value))` - the current document, deserialized into `T`.
    /// * `Ok(None)` - the advance succeeded but no document is available.
    ///
    /// # Errors
    ///
    /// Returns the error from advancing if it is not resumable, any error from re-executing
    /// the watch, any error from reading the resume token, or a deserialization error.
    pub(super) async fn next_if_any<T: DeserializeOwned>(
        &mut self,
        session: &mut Inner::Session,
    ) -> Result<Option<T>>
    where
        Inner: InnerCursor,
    {
        // Keep trying until an advance succeeds or a non-recoverable error surfaces.
        let has_document = loop {
            let failure = match self.cursor.try_advance(session).await {
                Ok(advanced) => break advanced,
                Err(failure) => failure,
            };
            if !failure.is_resumable() {
                return Err(failure);
            }

            let watch_args = self.args.clone();
            let stream_data = self.data.take();
            let (replacement, replacement_args) = self
                .cursor
                .execute_watch(watch_args, stream_data, session)
                .await?;
            // The outgoing cursor is told about its replacement before being dropped by the
            // assignment below.
            self.cursor.set_drop_address(&replacement);
            self.cursor = replacement;
            self.args = replacement_args;
        };

        self.data.resume_token = self.cursor.get_resume_token()?;
        if !has_document {
            return Ok(None);
        }

        self.data.document_returned = true;
        let raw_bytes = self.cursor.current().as_bytes();
        deserialize_from_slice(raw_bytes)
            .map_err(Error::from)
            .map(Some)
    }
}
pub(super) fn get_resume_token(
batch: &VecDeque<RawDocumentBuf>,
batch_token: Option<&ResumeToken>,
) -> Result<Option<ResumeToken>> {
Ok(match batch.front() {
Some(doc) => {
let doc_token = doc
.get("_id")?
.ok_or_else(|| Error::from(ErrorKind::MissingResumeToken))?;
Some(ResumeToken(doc_token.to_raw_bson()))
}
None => batch_token.cloned(),
})
}
/// Operations `CursorWrapper` requires from the cursor it wraps.
pub(super) trait InnerCursor: Sized {
/// Session type threaded through `try_advance` and `execute_watch`.
type Session;
/// Attempts to advance the cursor. `CursorWrapper::next_if_any` treats `Ok(true)` as
/// "a document is available via `current`" and `Ok(false)` as "nothing to return".
async fn try_advance(&mut self, session: &mut Self::Session) -> Result<bool>;
/// Returns the resume token for the cursor's present position, if there is one.
fn get_resume_token(&self) -> Result<Option<ResumeToken>>;
/// Returns the raw document the cursor is currently positioned on.
/// NOTE(review): only read by the wrapper after `try_advance` returned `Ok(true)`;
/// behavior otherwise is implementation-defined — confirm with implementors.
fn current(&self) -> &RawDocument;
/// Re-runs the watch with the given arguments and stream state, yielding a new cursor
/// together with the arguments to store for it.
async fn execute_watch(
&mut self,
args: WatchArgs,
data: ChangeStreamData,
session: &mut Self::Session,
) -> Result<(Self, WatchArgs)>;
/// Called on the outgoing cursor with a reference to its replacement, just before the
/// outgoing cursor is dropped.
/// NOTE(review): presumably redirects where the old cursor's cleanup is sent — confirm
/// with implementors.
fn set_drop_address(&mut self, from: &Self);
}