pub mod event;
pub(crate) mod options;
pub mod session;
#[cfg(test)]
use std::collections::VecDeque;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
#[cfg(test)]
use bson::RawDocumentBuf;
use bson::{Document, Timestamp};
use derive_where::derive_where;
use futures_core::{future::BoxFuture, Stream};
use serde::de::DeserializeOwned;
#[cfg(test)]
use tokio::sync::oneshot;
use crate::{
change_stream::event::{ChangeStreamEvent, ResumeToken},
cursor::{stream_poll_next, BatchValue, CursorStream, NextInBatchFuture},
error::{ErrorKind, Result},
operation::aggregate::AggregateTarget,
ClientSession,
Cursor,
};
/// A stream of values of type `T` read from an underlying [`Cursor`], bundled with the state
/// needed to start the watch operation again after a resumable error.
///
/// Values can be consumed through the [`Stream`] implementation (items are `Result<T>`) or
/// pulled one at a time with `next_if_any`.
#[derive_where(Debug)]
pub struct ChangeStream<T>
where
T: DeserializeOwned,
{
/// The cursor that batch values are polled from.
cursor: Cursor<T>,
/// The pipeline, options and target that are handed to `execute_watch` on resume.
args: WatchArgs,
/// Resume bookkeeping: cached resume token, status flags and the implicit session.
data: ChangeStreamData,
/// A resume attempt that is currently in flight, if any. Excluded from the derived `Debug`
/// output.
#[derive_where(skip)]
pending_resume: Option<BoxFuture<'static, Result<ChangeStream<T>>>>,
}
impl<T> ChangeStream<T>
where
    T: DeserializeOwned,
{
    /// Builds a change stream around `cursor` with no resume attempt in flight.
    pub(crate) fn new(cursor: Cursor<T>, args: WatchArgs, data: ChangeStreamData) -> Self {
        Self {
            cursor,
            args,
            data,
            pending_resume: None,
        }
    }

    /// Returns a copy of the currently cached resume token, or `None` when no token has been
    /// cached.
    pub fn resume_token(&self) -> Option<ResumeToken> {
        let cached = &self.data.resume_token;
        cached.clone()
    }

    /// Converts this stream into one that yields values of type `D` instead of `T`.
    ///
    /// The cursor is re-typed, the arguments and resume data are carried over unchanged, and
    /// the returned stream starts without a pending resume.
    pub fn with_type<D: DeserializeOwned>(self) -> ChangeStream<D> {
        let Self {
            cursor, args, data, ..
        } = self;
        ChangeStream {
            cursor: cursor.with_type(),
            args,
            data,
            pending_resume: None,
        }
    }

    /// Returns `true` as long as the underlying cursor has not been exhausted.
    pub fn is_alive(&self) -> bool {
        let exhausted = self.cursor.is_exhausted();
        !exhausted
    }

    /// Retrieves the next value in the current batch, if there is one.
    ///
    /// Returns `Ok(Some(value))` when a document was available and deserialized into `T`, and
    /// `Ok(None)` when the batch was empty or the cursor is exhausted.
    ///
    /// # Errors
    ///
    /// Fails if fetching the next batch value fails or if the document cannot be deserialized
    /// into `T`.
    pub async fn next_if_any(&mut self) -> Result<Option<T>> {
        let batch_value = NextInBatchFuture::new(self).await?;
        match batch_value {
            BatchValue::Some { doc, .. } => {
                let value = bson::from_slice(doc.as_bytes())?;
                Ok(Some(value))
            }
            BatchValue::Empty | BatchValue::Exhausted => Ok(None),
        }
    }

    /// Test hook: forwards the kill watcher to the underlying cursor.
    #[cfg(test)]
    pub(crate) fn set_kill_watcher(&mut self, tx: oneshot::Sender<()>) {
        self.cursor.set_kill_watcher(tx);
    }

    /// Test hook: exposes the underlying cursor's current batch.
    #[cfg(test)]
    pub(crate) fn current_batch(&self) -> &VecDeque<RawDocumentBuf> {
        self.cursor.current_batch()
    }

    /// Test hook: exposes the client of the underlying cursor.
    #[cfg(test)]
    pub(crate) fn client(&self) -> &crate::Client {
        self.cursor.client()
    }
}
/// The arguments a change stream is constructed with. They are cloned and passed to
/// `execute_watch` again when the stream attempts a resume.
#[derive(Debug, Clone)]
pub(crate) struct WatchArgs {
/// The pipeline documents for the watch operation.
pub(crate) pipeline: Vec<Document>,
/// The aggregate target the watch operation runs against.
pub(crate) target: AggregateTarget,
/// The change stream options, if any were supplied.
pub(crate) options: Option<options::ChangeStreamOptions>,
}
/// Bookkeeping state that a change stream carries across resume attempts.
#[derive(Debug, Default)]
pub(crate) struct ChangeStreamData {
/// NOTE(review): presumably the operation time reported by the initial aggregate command;
/// it is not written anywhere in this file — confirm against `execute_watch`.
pub(crate) initial_operation_time: Option<Timestamp>,
/// The most recently cached resume token. Updated while polling and exposed through
/// `ChangeStream::resume_token`.
pub(crate) resume_token: Option<ResumeToken>,
/// Set when a resume is started after a resumable error and cleared once a resume succeeds;
/// while set, further resumable errors are returned instead of triggering another resume.
pub(crate) resume_attempted: bool,
/// Set once polling has produced a `BatchValue::Some`.
pub(crate) document_returned: bool,
/// The implicit session, if any. On resume it is replaced by the one taken from the old
/// cursor.
pub(crate) implicit_session: Option<ClientSession>,
}
impl ChangeStreamData {
    /// Produces a new `ChangeStreamData` from `self`.
    ///
    /// The operation time and both flags are copied and the resume token is cloned, so `self`
    /// keeps those values. The implicit session is moved out, leaving `None` behind in `self`.
    fn take(&mut self) -> Self {
        let initial_operation_time = self.initial_operation_time;
        let resume_token = self.resume_token.clone();
        let resume_attempted = self.resume_attempted;
        let document_returned = self.document_returned;
        // The session is moved out of `self`, not duplicated.
        let implicit_session = self.implicit_session.take();
        Self {
            initial_operation_time,
            resume_token,
            resume_attempted,
            document_returned,
            implicit_session,
        }
    }
}
/// Works out which resume token should be cached after `batch_value` was produced.
///
/// * For a document: the document's own `_id` is the token, unless the document is the last
///   one in its batch and `batch_token` is present, in which case `batch_token` wins.
/// * For an empty batch: `batch_token`, if present.
/// * Otherwise: `None`.
///
/// # Errors
///
/// Returns `ErrorKind::MissingResumeToken` when a document has no `_id` field, and propagates
/// any error from reading that field.
fn get_resume_token(
    batch_value: &BatchValue,
    batch_token: Option<&ResumeToken>,
) -> Result<Option<ResumeToken>> {
    let token = match batch_value {
        BatchValue::Some { doc, is_last } => {
            // The `_id` is validated and converted for every document, even when the batch
            // token ends up being used.
            let id = match doc.get("_id")? {
                None => return Err(ErrorKind::MissingResumeToken.into()),
                Some(id) => id,
            };
            let doc_token = ResumeToken(id.to_raw_bson());
            match batch_token {
                Some(post_batch) if *is_last => Some(post_batch.clone()),
                _ => Some(doc_token),
            }
        }
        BatchValue::Empty => batch_token.cloned(),
        _ => None,
    };
    Ok(token)
}
/// Batch-level polling for a change stream: delegates to the underlying cursor, keeps the
/// cached resume token up to date, and attempts a resume when the cursor reports a resumable
/// error.
impl<T> CursorStream for ChangeStream<T>
where
T: DeserializeOwned,
{
/// Polls for the next value in the current batch.
///
/// If a resume is in flight it is driven first. Otherwise the underlying cursor is polled;
/// a successful result updates the cached resume token, and a resumable error starts a resume
/// unless one has already been attempted since the last successful resume. Any other outcome
/// is returned unchanged.
fn poll_next_in_batch(&mut self, cx: &mut Context<'_>) -> Poll<Result<BatchValue>> {
loop {
// A resume is in flight: drive it to completion before touching the cursor.
if let Some(mut pending) = self.pending_resume.take() {
match Pin::new(&mut pending).poll(cx) {
Poll::Pending => {
// Not done yet; put the future back so the next poll continues it.
self.pending_resume = Some(pending);
return Poll::Pending;
}
Poll::Ready(Ok(new_stream)) => {
// Record the new cursor's address on the old cursor before the old one is
// replaced (and thereby dropped).
// NOTE(review): presumably this directs the old cursor's cleanup at that
// address — confirm against `Cursor::set_drop_address`.
self.cursor
.set_drop_address(new_stream.cursor.address().clone());
self.cursor = new_stream.cursor;
self.args = new_stream.args;
// The resume succeeded, so a later resumable error may trigger another one.
self.data.resume_attempted = false;
// Go around again to poll the new cursor.
continue;
}
// The resume itself failed; surface that error to the caller.
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
}
}
let out = self.cursor.poll_next_in_batch(cx);
match &out {
Poll::Ready(Ok(bv)) => {
// Cache the resume token that applies to this batch value, if any. A failure to
// extract the token is returned to the caller via `?`.
if let Some(token) =
get_resume_token(bv, self.cursor.post_batch_resume_token())?
{
self.data.resume_token = Some(token);
}
if matches!(bv, BatchValue::Some { .. }) {
self.data.document_returned = true;
}
}
// Only start a resume if none has been attempted since the last successful one;
// otherwise the error falls through to the catch-all arm and is returned.
Poll::Ready(Err(e)) if e.is_resumable() && !self.data.resume_attempted => {
self.data.resume_attempted = true;
let client = self.cursor.client().clone();
let args = self.args.clone();
let mut data = self.data.take();
// Hand the old cursor's implicit session over to the resumed stream.
data.implicit_session = self.cursor.take_implicit_session();
self.pending_resume = Some(Box::pin(async move {
// `execute_watch` yields a stream of `ChangeStreamEvent<()>`; it is re-typed
// to `T` below.
let new_stream: Result<ChangeStream<ChangeStreamEvent<()>>> = client
.execute_watch(args.pipeline, args.options, args.target, Some(data))
.await;
new_stream.map(|cs| cs.with_type::<T>())
}));
// Loop so the freshly created future is polled within this call; a future that
// has never been polled cannot schedule a wake-up.
continue;
}
// `Pending`, or an error that does not qualify for a resume: pass it through as is.
_ => {}
}
return out;
}
}
}
/// Exposes the change stream as a [`Stream`] whose items are `Result<T>`.
impl<T> Stream for ChangeStream<T>
where
    T: DeserializeOwned,
{
    type Item = Result<T>;

    /// Unwraps the pin and delegates to `stream_poll_next`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this: &mut Self = self.get_mut();
        stream_poll_next(this, cx)
    }
}