pub(crate) mod common;
pub mod event;
pub(crate) mod options;
pub mod session;
#[cfg(test)]
use std::collections::VecDeque;
use std::{
pin::Pin,
task::{Context, Poll},
};
use crate::error::Error;
use derive_where::derive_where;
use futures_core::{future::BoxFuture, Stream};
use futures_util::FutureExt;
use serde::de::DeserializeOwned;
#[cfg(test)]
use tokio::sync::oneshot;
use crate::{change_stream::event::ResumeToken, error::Result, Cursor};
use common::{ChangeStreamData, WatchArgs};
#[derive_where(Debug)]
pub struct ChangeStream<T>
where
T: DeserializeOwned,
{
inner: StreamState<T>,
}
impl<T> ChangeStream<T>
where
T: DeserializeOwned,
{
pub(crate) fn new(cursor: Cursor<()>, args: WatchArgs, data: ChangeStreamData) -> Self {
Self {
inner: StreamState::Idle(CursorWrapper::new(cursor, args, data)),
}
}
pub fn resume_token(&self) -> Option<ResumeToken> {
self.inner.state().data.resume_token.clone()
}
pub fn with_type<D: DeserializeOwned>(self) -> ChangeStream<D> {
ChangeStream {
inner: StreamState::Idle(self.inner.take_state()),
}
}
pub fn is_alive(&self) -> bool {
!self.inner.state().cursor.raw().is_exhausted()
}
pub async fn next_if_any(&mut self) -> Result<Option<T>> {
self.inner.state_mut().next_if_any(&mut ()).await
}
#[cfg(test)]
pub(crate) fn set_kill_watcher(&mut self, tx: oneshot::Sender<()>) {
self.inner.state_mut().cursor.raw_mut().set_kill_watcher(tx);
}
#[cfg(test)]
pub(crate) fn current_batch(&self) -> &VecDeque<crate::bson::RawDocumentBuf> {
self.inner.state().cursor.batch()
}
#[cfg(test)]
pub(crate) fn client(&self) -> &crate::Client {
self.inner.state().cursor.raw().client()
}
}
impl<T> Stream for ChangeStream<T>
where
T: DeserializeOwned,
{
type Item = Result<T>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.inner).poll_next(cx)
}
}
type CursorWrapper = common::CursorWrapper<Cursor<()>>;
#[derive_where(Debug)]
enum StreamState<T: DeserializeOwned> {
Idle(CursorWrapper),
Polling,
Next(#[derive_where(skip)] BoxFuture<'static, NextDone<T>>),
}
struct NextDone<T> {
state: CursorWrapper,
out: Result<Option<T>>,
}
impl<T: DeserializeOwned> StreamState<T> {
fn state(&self) -> &CursorWrapper {
match self {
Self::Idle(st) => st,
_ => panic!("invalid change stream state access"),
}
}
fn state_mut(&mut self) -> &mut CursorWrapper {
match self {
Self::Idle(st) => st,
_ => panic!("invalid change stream state access"),
}
}
fn take_state(self) -> CursorWrapper {
match self {
Self::Idle(st) => st,
_ => panic!("invalid change stream state access"),
}
}
}
impl<T: DeserializeOwned> Stream for StreamState<T> {
type Item = Result<T>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
match std::mem::replace(&mut *self, StreamState::Polling) {
StreamState::Idle(mut state) => {
*self = StreamState::Next(
async move {
let out = state.next_if_any(&mut ()).await;
NextDone { state, out }
}
.boxed(),
);
continue;
}
StreamState::Next(mut fut) => match fut.poll_unpin(cx) {
Poll::Pending => {
*self = StreamState::Next(fut);
return Poll::Pending;
}
Poll::Ready(NextDone { state, out }) => {
*self = StreamState::Idle(state);
match out {
Ok(Some(v)) => return Poll::Ready(Some(Ok(v))),
Ok(None) => continue,
Err(e) => return Poll::Ready(Some(Err(e))),
}
}
},
StreamState::Polling => {
return Poll::Ready(Some(Err(Error::internal(
"attempt to poll change stream already in polling state",
))))
}
}
}
}
}
impl common::InnerCursor for Cursor<()> {
type Session = ();
async fn try_advance(&mut self, _session: &mut Self::Session) -> Result<bool> {
self.try_advance().await
}
fn get_resume_token(&self) -> Result<Option<ResumeToken>> {
common::get_resume_token(self.batch(), self.raw().post_batch_resume_token())
}
fn current(&self) -> &crate::bson::RawDocument {
self.current()
}
async fn execute_watch(
&mut self,
args: WatchArgs,
mut data: ChangeStreamData,
_session: &mut Self::Session,
) -> Result<(Self, WatchArgs)> {
data.implicit_session = self.raw_mut().take_implicit_session();
let new_stream: ChangeStream<event::ChangeStreamEvent<()>> = self
.raw()
.client()
.execute_watch(args.pipeline, args.options, args.target, Some(data))
.await?;
let new_wrapper = new_stream.inner.take_state();
Ok((new_wrapper.cursor, new_wrapper.args))
}
fn set_drop_address(&mut self, from: &Self) {
self.raw_mut()
.set_drop_address(from.raw().address().clone());
}
}