use futures_util::stream::StreamExt;
use serde::de::{Deserialize, DeserializeOwned};
use crate::{
bson::{Document, RawDocument},
error::Result,
sync::ClientSession,
};
use super::{
session::{SessionCursor as AsyncSessionCursor, SessionCursorStream},
Cursor as AsyncCursor,
};
#[derive(Debug)]
pub struct Cursor<T> {
async_cursor: AsyncCursor<T>,
}
impl<T> Cursor<T> {
pub(crate) fn new(async_cursor: AsyncCursor<T>) -> Self {
Self { async_cursor }
}
}
impl<T> Cursor<T> {
pub fn advance(&mut self) -> Result<bool> {
crate::sync::TOKIO_RUNTIME.block_on(self.async_cursor.advance())
}
pub fn current(&self) -> &RawDocument {
self.async_cursor.current()
}
pub fn has_next(&self) -> bool {
self.async_cursor.has_next()
}
pub fn deserialize_current<'a>(&'a self) -> Result<T>
where
T: Deserialize<'a>,
{
self.async_cursor.deserialize_current()
}
}
impl<T> Iterator for Cursor<T>
where
T: DeserializeOwned + Unpin + Send + Sync,
{
type Item = Result<T>;
fn next(&mut self) -> Option<Self::Item> {
crate::sync::TOKIO_RUNTIME.block_on(self.async_cursor.next())
}
}
#[derive(Debug)]
pub struct SessionCursor<T> {
async_cursor: AsyncSessionCursor<T>,
}
impl<T> SessionCursor<T> {
pub(crate) fn new(async_cursor: AsyncSessionCursor<T>) -> Self {
Self { async_cursor }
}
pub fn advance(&mut self, session: &mut ClientSession) -> Result<bool> {
crate::sync::TOKIO_RUNTIME
.block_on(self.async_cursor.advance(&mut session.async_client_session))
}
pub fn current(&self) -> &RawDocument {
self.async_cursor.current()
}
pub fn deserialize_current<'a>(&'a self) -> Result<T>
where
T: Deserialize<'a>,
{
self.async_cursor.deserialize_current()
}
}
impl<T> SessionCursor<T>
where
T: DeserializeOwned + Unpin + Send + Sync,
{
pub fn iter<'session>(
&mut self,
session: &'session mut ClientSession,
) -> SessionCursorIter<'_, 'session, T> {
SessionCursorIter {
async_stream: self.async_cursor.stream(&mut session.async_client_session),
}
}
pub fn next(&mut self, session: &mut ClientSession) -> Option<Result<T>> {
self.iter(session).next()
}
}
pub struct SessionCursorIter<'cursor, 'session, T = Document>
where
T: DeserializeOwned + Unpin + Send + Sync,
{
async_stream: SessionCursorStream<'cursor, 'session, T>,
}
impl<'cursor, 'session, T> Iterator for SessionCursorIter<'cursor, 'session, T>
where
T: DeserializeOwned + Unpin + Send + Sync,
'session: 'cursor,
{
type Item = Result<T>;
fn next(&mut self) -> Option<Self::Item> {
crate::sync::TOKIO_RUNTIME.block_on(self.async_stream.next())
}
}