mod common;
pub(crate) mod session;
#[cfg(test)]
use std::collections::VecDeque;
use std::{
pin::Pin,
task::{Context, Poll},
};
use bson::RawDocument;
#[cfg(test)]
use bson::RawDocumentBuf;
use derive_where::derive_where;
use futures_core::Stream;
use serde::{de::DeserializeOwned, Deserialize};
#[cfg(test)]
use tokio::sync::oneshot;
use crate::{
change_stream::event::ResumeToken,
client::{options::ServerAddress, AsyncDropToken},
cmap::conn::PinnedConnectionHandle,
cursor::common::ImplicitClientSessionHandle,
error::{Error, Result},
Client,
ClientSession,
};
use common::{kill_cursor, GenericCursor};
pub(crate) use common::{
stream_poll_next,
BatchValue,
CursorInformation,
CursorSpecification,
CursorStream,
NextInBatchFuture,
PinnedConnection,
};
#[derive_where(Debug)]
pub struct Cursor<T> {
client: Client,
drop_token: AsyncDropToken,
wrapped_cursor: Option<ImplicitSessionCursor>,
drop_address: Option<ServerAddress>,
#[cfg(test)]
kill_watcher: Option<oneshot::Sender<()>>,
#[derive_where(skip)]
_phantom: std::marker::PhantomData<fn() -> T>,
}
impl<T> Cursor<T> {
pub(crate) fn new(
client: Client,
spec: CursorSpecification,
session: Option<ClientSession>,
pin: Option<PinnedConnectionHandle>,
) -> Self {
Self {
client: client.clone(),
drop_token: client.register_async_drop(),
wrapped_cursor: Some(ImplicitSessionCursor::with_implicit_session(
client,
spec,
PinnedConnection::new(pin),
ImplicitClientSessionHandle(session),
)),
drop_address: None,
#[cfg(test)]
kill_watcher: None,
_phantom: Default::default(),
}
}
pub(crate) fn post_batch_resume_token(&self) -> Option<&ResumeToken> {
self.wrapped_cursor
.as_ref()
.and_then(|c| c.post_batch_resume_token())
}
pub(crate) fn is_exhausted(&self) -> bool {
self.wrapped_cursor.as_ref().unwrap().is_exhausted()
}
pub(crate) fn has_next(&self) -> bool {
!self.is_exhausted()
|| !self
.wrapped_cursor
.as_ref()
.unwrap()
.state()
.buffer
.is_empty()
}
pub(crate) fn client(&self) -> &Client {
&self.client
}
pub(crate) fn address(&self) -> &ServerAddress {
self.wrapped_cursor.as_ref().unwrap().address()
}
pub(crate) fn set_drop_address(&mut self, address: ServerAddress) {
self.drop_address = Some(address);
}
pub(crate) fn take_implicit_session(&mut self) -> Option<ClientSession> {
self.wrapped_cursor
.as_mut()
.and_then(|c| c.take_implicit_session())
}
pub async fn advance(&mut self) -> Result<bool> {
self.wrapped_cursor.as_mut().unwrap().advance().await
}
#[cfg(test)]
pub(crate) async fn try_advance(&mut self) -> Result<()> {
self.wrapped_cursor
.as_mut()
.unwrap()
.try_advance()
.await
.map(|_| ())
}
pub fn current(&self) -> &RawDocument {
self.wrapped_cursor.as_ref().unwrap().current().unwrap()
}
pub fn deserialize_current<'a>(&'a self) -> Result<T>
where
T: Deserialize<'a>,
{
bson::from_slice(self.current().as_bytes()).map_err(Error::from)
}
pub fn with_type<'a, D>(mut self) -> Cursor<D>
where
D: Deserialize<'a>,
{
Cursor {
client: self.client.clone(),
drop_token: self.drop_token.take(),
wrapped_cursor: self.wrapped_cursor.take(),
drop_address: self.drop_address.take(),
#[cfg(test)]
kill_watcher: self.kill_watcher.take(),
_phantom: Default::default(),
}
}
#[cfg(test)]
pub(crate) fn set_kill_watcher(&mut self, tx: oneshot::Sender<()>) {
assert!(
self.kill_watcher.is_none(),
"cursor already has a kill_watcher"
);
self.kill_watcher = Some(tx);
}
#[cfg(test)]
pub(crate) fn current_batch(&self) -> &VecDeque<RawDocumentBuf> {
self.wrapped_cursor.as_ref().unwrap().current_batch()
}
}
impl<T> CursorStream for Cursor<T>
where
T: DeserializeOwned,
{
fn poll_next_in_batch(&mut self, cx: &mut Context<'_>) -> Poll<Result<BatchValue>> {
self.wrapped_cursor.as_mut().unwrap().poll_next_in_batch(cx)
}
}
impl<T> Stream for Cursor<T>
where
T: DeserializeOwned,
{
type Item = Result<T>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
stream_poll_next(self.wrapped_cursor.as_mut().unwrap(), cx)
}
}
impl<T> Drop for Cursor<T> {
fn drop(&mut self) {
let wrapped_cursor = match &self.wrapped_cursor {
None => return,
Some(c) => c,
};
if wrapped_cursor.is_exhausted() {
return;
}
kill_cursor(
self.client.clone(),
&mut self.drop_token,
wrapped_cursor.namespace(),
wrapped_cursor.id(),
wrapped_cursor.pinned_connection().replicate(),
self.drop_address.take(),
#[cfg(test)]
self.kill_watcher.take(),
);
}
}
type ImplicitSessionCursor = GenericCursor<'static, ImplicitClientSessionHandle>;