use derive_where::derive_where;
use serde::{de::DeserializeOwned, Deserialize};
use futures_util::StreamExt;
use crate::{
bson::{Document, RawDocument},
cursor::raw_batch::SessionRawBatchCursor,
error::Result,
raw_batch_cursor::SessionRawBatchCursorStream,
ClientSession,
};
use super::{common, stream};
#[derive_where(Debug)]
pub struct SessionCursor<T> {
buffer: Option<stream::BatchBuffer<()>>,
raw: SessionRawBatchCursor,
_phantom: std::marker::PhantomData<T>,
}
impl<T> crate::cursor::NewCursor for SessionCursor<T> {
fn generic_new(
client: crate::Client,
spec: common::CursorSpecification,
implicit_session: Option<ClientSession>,
pinned: Option<crate::cmap::conn::PinnedConnectionHandle>,
) -> Result<Self> {
let raw = SessionRawBatchCursor::generic_new(client, spec, implicit_session, pinned)?;
Ok(Self {
buffer: Some(stream::BatchBuffer::new(())),
raw,
_phantom: std::marker::PhantomData,
})
}
}
impl<T> SessionCursor<T> {
pub fn stream<'session>(
&mut self,
session: &'session mut ClientSession,
) -> SessionCursorStream<'_, 'session, T> {
let raw_stream = self.raw.stream(session);
let stream = stream::Stream::from_cursor(self.buffer.take().unwrap().map(|_| raw_stream));
SessionCursorStream {
parent: &mut self.buffer,
stream,
}
}
pub async fn next(&mut self, session: &mut ClientSession) -> Option<Result<T>>
where
T: DeserializeOwned,
{
self.stream(session).next().await
}
pub async fn advance(&mut self, session: &mut ClientSession) -> Result<bool> {
self.stream(session).stream.buffer_mut().advance().await
}
pub(crate) async fn try_advance(&mut self, session: &mut ClientSession) -> Result<bool> {
self.stream(session).stream.buffer_mut().try_advance().await
}
fn buffer(&self) -> &stream::BatchBuffer<()> {
self.buffer.as_ref().unwrap()
}
pub(crate) fn batch(&self) -> &std::collections::VecDeque<crate::bson::RawDocumentBuf> {
self.buffer().batch()
}
pub fn current(&self) -> &RawDocument {
self.buffer().current()
}
pub fn deserialize_current<'a>(&'a self) -> Result<T>
where
T: Deserialize<'a>,
{
self.buffer().deserialize_current()
}
pub fn with_type<'a, D>(self) -> SessionCursor<D>
where
D: Deserialize<'a>,
{
SessionCursor {
buffer: self.buffer,
raw: self.raw,
_phantom: std::marker::PhantomData,
}
}
pub(crate) fn raw(&self) -> &SessionRawBatchCursor {
&self.raw
}
pub(crate) fn raw_mut(&mut self) -> &mut SessionRawBatchCursor {
&mut self.raw
}
}
pub struct SessionCursorStream<'cursor, 'session, T = Document> {
parent: &'cursor mut Option<stream::BatchBuffer<()>>,
stream: stream::Stream<'cursor, SessionRawBatchCursorStream<'cursor, 'session>, T>,
}
impl<T> Drop for SessionCursorStream<'_, '_, T> {
fn drop(&mut self) {
*self.parent = Some(self.stream.take_buffer().map(|_| ()));
}
}
impl<'cursor, 'session, T> futures_core::Stream for SessionCursorStream<'cursor, 'session, T>
where
T: DeserializeOwned,
'session: 'cursor,
{
type Item = Result<T>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.stream.poll_next_unpin(cx)
}
}