mod common;
pub(crate) mod session;
use std::{
pin::Pin,
task::{Context, Poll},
};
use futures_core::{future::BoxFuture, Stream};
use serde::de::DeserializeOwned;
use crate::{
bson::Document,
error::{Error, Result},
operation::GetMore,
results::GetMoreResult,
Client,
ClientSession,
RUNTIME,
};
pub(crate) use common::{CursorInformation, CursorSpecification};
use common::{GenericCursor, GetMoreProvider, GetMoreProviderResult};
#[derive(Debug)]
pub struct Cursor<T>
where
T: DeserializeOwned + Unpin + Send + Sync,
{
client: Client,
wrapped_cursor: ImplicitSessionCursor<T>,
_phantom: std::marker::PhantomData<T>,
}
impl<T> Cursor<T>
where
T: DeserializeOwned + Unpin + Send + Sync,
{
pub(crate) fn new(
client: Client,
spec: CursorSpecification<T>,
session: Option<ClientSession>,
) -> Self {
let provider = ImplicitSessionGetMoreProvider::new(&spec, session);
Self {
client: client.clone(),
wrapped_cursor: ImplicitSessionCursor::new(client, spec, provider),
_phantom: Default::default(),
}
}
}
impl<T> Stream for Cursor<T>
where
T: DeserializeOwned + Unpin + Send + Sync,
{
type Item = Result<T>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.wrapped_cursor).poll_next(cx)
}
}
impl<T> Drop for Cursor<T>
where
T: DeserializeOwned + Unpin + Send + Sync,
{
fn drop(&mut self) {
if self.wrapped_cursor.is_exhausted() {
return;
}
let ns = self.wrapped_cursor.namespace();
let coll = self
.client
.database(ns.db.as_str())
.collection::<Document>(ns.coll.as_str());
let cursor_id = self.wrapped_cursor.id();
RUNTIME.execute(async move { coll.kill_cursor(cursor_id).await });
}
}
type ImplicitSessionCursor<T> = GenericCursor<ImplicitSessionGetMoreProvider<T>, T>;
struct ImplicitSessionGetMoreResult<T> {
get_more_result: Result<GetMoreResult<T>>,
session: Option<Box<ClientSession>>,
}
impl<T> GetMoreProviderResult for ImplicitSessionGetMoreResult<T> {
type Session = Option<Box<ClientSession>>;
type DocumentType = T;
fn as_ref(&self) -> std::result::Result<&GetMoreResult<T>, &Error> {
self.get_more_result.as_ref()
}
fn into_parts(self) -> (Result<GetMoreResult<T>>, Self::Session) {
(self.get_more_result, self.session)
}
}
enum ImplicitSessionGetMoreProvider<T> {
Executing(BoxFuture<'static, ImplicitSessionGetMoreResult<T>>),
Idle(Option<Box<ClientSession>>),
Done,
}
impl<T> ImplicitSessionGetMoreProvider<T> {
fn new(spec: &CursorSpecification<T>, session: Option<ClientSession>) -> Self {
if spec.id() == 0 {
Self::Done
} else {
Self::Idle(session.map(Box::new))
}
}
}
impl<T: Send + Sync + DeserializeOwned> GetMoreProvider for ImplicitSessionGetMoreProvider<T> {
type DocumentType = T;
type ResultType = ImplicitSessionGetMoreResult<T>;
type GetMoreFuture = BoxFuture<'static, ImplicitSessionGetMoreResult<T>>;
fn executing_future(&mut self) -> Option<&mut Self::GetMoreFuture> {
match self {
Self::Executing(ref mut future) => Some(future),
Self::Idle(_) | Self::Done => None,
}
}
fn clear_execution(&mut self, session: Option<Box<ClientSession>>, exhausted: bool) {
if exhausted {
*self = Self::Done;
} else {
*self = Self::Idle(session)
}
}
fn start_execution(&mut self, info: CursorInformation, client: Client) {
take_mut::take(self, |self_| match self_ {
Self::Idle(mut session) => {
let future = Box::pin(async move {
let get_more = GetMore::new(info);
let get_more_result = client
.execute_operation(get_more, session.as_mut().map(|b| b.as_mut()))
.await;
ImplicitSessionGetMoreResult {
get_more_result,
session,
}
});
Self::Executing(future)
}
Self::Executing(_) | Self::Done => self_,
})
}
}