pub(crate) mod common;
pub mod raw_batch;
pub(crate) mod session;
mod stream;
#[cfg(feature = "sync")]
pub(crate) mod sync;
use std::task::Poll;
use derive_where::derive_where;
use futures_core::Stream as AsyncStream;
use futures_util::stream::StreamExt;
use serde::{de::DeserializeOwned, Deserialize};
use crate::{
bson::RawDocument,
cmap::conn::PinnedConnectionHandle,
error::Result,
Client,
ClientSession,
};
#[derive_where(Debug)]
pub struct Cursor<T> {
stream: stream::Stream<'static, raw_batch::RawBatchCursor, T>,
}
impl<T> Cursor<T> {
pub async fn advance(&mut self) -> Result<bool> {
self.stream.buffer_mut().advance().await
}
pub fn current(&self) -> &RawDocument {
self.stream.buffer().current()
}
pub fn has_next(&self) -> bool {
let state = self.stream.buffer();
!state.batch().is_empty() || state.raw.has_next()
}
pub fn deserialize_current<'a>(&'a self) -> Result<T>
where
T: Deserialize<'a>,
{
self.stream.buffer().deserialize_current()
}
pub fn with_type<'a, D>(self) -> Cursor<D>
where
D: Deserialize<'a>,
{
Cursor {
stream: self.stream.with_type(),
}
}
pub(crate) fn raw(&self) -> &raw_batch::RawBatchCursor {
&self.stream.buffer().raw
}
pub(crate) fn raw_mut(&mut self) -> &mut raw_batch::RawBatchCursor {
&mut self.stream.buffer_mut().raw
}
pub(crate) async fn try_advance(&mut self) -> Result<bool> {
self.stream.buffer_mut().try_advance().await
}
pub(crate) fn batch(&self) -> &std::collections::VecDeque<crate::bson::RawDocumentBuf> {
self.stream.buffer().batch()
}
}
pub(crate) trait NewCursor: Sized {
fn generic_new(
client: Client,
spec: common::CursorSpecification,
implicit_session: Option<ClientSession>,
pinned: Option<PinnedConnectionHandle>,
) -> Result<Self>;
}
impl<T> NewCursor for Cursor<T> {
fn generic_new(
client: Client,
spec: common::CursorSpecification,
implicit_session: Option<ClientSession>,
pinned: Option<PinnedConnectionHandle>,
) -> Result<Self> {
let raw = crate::cursor::raw_batch::RawBatchCursor::generic_new(
client,
spec,
implicit_session,
pinned,
)?;
Ok(Self {
stream: stream::Stream::new(raw),
})
}
}
impl<T> AsyncStream for Cursor<T>
where
T: DeserializeOwned,
{
type Item = Result<T>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
self.stream.poll_next_unpin(cx)
}
}