mod common;
#[allow(dead_code)]
mod session;
use std::{
pin::Pin,
task::{Context, Poll},
};
use futures::{future::BoxFuture, Stream};
use crate::{
bson::Document,
client::ClientSession,
error::Result,
operation::GetMore,
results::GetMoreResult,
Client,
RUNTIME,
};
pub(crate) use common::{CursorInformation, CursorSpecification};
use common::{GenericCursor, GetMoreProvider, GetMoreProviderResult};
/// A stream of documents backed by an `ImplicitSessionCursor`.
///
/// Polling the stream yields `Result<Document>` items by delegating to the wrapped cursor.
/// If a `Cursor` is dropped before the wrapped cursor reports exhaustion, a `kill_cursor`
/// call for its id is handed to `RUNTIME`.
#[derive(Debug)]
pub struct Cursor {
// Used on drop to reach the collection named by the cursor's namespace.
client: Client,
// The underlying cursor; supplies the items, exhaustion state, namespace and id.
wrapped_cursor: ImplicitSessionCursor,
}
impl Cursor {
    /// Builds a `Cursor` from `spec`.
    ///
    /// `session`, when present, is handed to the get-more provider, which uses it for the
    /// get-more operations it starts. One `Client` handle is kept on the `Cursor` itself and
    /// another is given to the wrapped cursor.
    pub(crate) fn new(
        client: Client,
        spec: CursorSpecification,
        session: Option<ClientSession>,
    ) -> Self {
        // The provider only borrows `spec`, so it has to be created before `spec` is moved
        // into the wrapped cursor.
        let get_more_provider = ImplicitSessionGetMoreProvider::new(&spec, session);
        let wrapped_cursor = ImplicitSessionCursor::new(client.clone(), spec, get_more_provider);

        Self {
            client,
            wrapped_cursor,
        }
    }
}
impl Stream for Cursor {
    type Item = Result<Document>;

    /// Forwards the poll to the wrapped cursor and returns its answer unchanged.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Obtain a plain `&mut Cursor` so the inner cursor can be re-pinned on its own.
        let this = self.get_mut();
        Pin::new(&mut this.wrapped_cursor).poll_next(cx)
    }
}
impl Drop for Cursor {
    /// Unless the wrapped cursor is already exhausted, hands a `kill_cursor` call for this
    /// cursor's id to `RUNTIME.execute`.
    ///
    /// `drop` cannot await, so the call is passed on as a future rather than run inline.
    fn drop(&mut self) {
        if !self.wrapped_cursor.is_exhausted() {
            let namespace = self.wrapped_cursor.namespace();
            let collection = self
                .client
                .database(namespace.db.as_str())
                .collection(namespace.coll.as_str());
            let id = self.wrapped_cursor.id();

            RUNTIME.execute(async move { collection.kill_cursor(id).await });
        }
    }
}
/// `GenericCursor` specialised to the `ImplicitSessionGetMoreProvider` defined in this file.
type ImplicitSessionCursor = GenericCursor<ImplicitSessionGetMoreProvider>;
/// Output of one get-more future: the operation's outcome together with the session (if any)
/// that the future was given, handed back so the provider can store it again.
struct ImplicitSessionGetMoreResult {
// Outcome of the get-more operation.
get_more_result: Result<GetMoreResult>,
// The session that was moved into the future, returned to the provider afterwards.
session: Option<ClientSession>,
}
impl GetMoreProviderResult for ImplicitSessionGetMoreResult {
    /// Mutably borrows the successful get-more result, or returns a clone of the stored error.
    fn as_mut(&mut self) -> Result<&mut GetMoreResult> {
        match &mut self.get_more_result {
            Ok(reply) => Ok(reply),
            Err(error) => Err(error.clone()),
        }
    }

    /// Borrows the successful get-more result, or returns a clone of the stored error.
    fn as_ref(&self) -> Result<&GetMoreResult> {
        match &self.get_more_result {
            Ok(reply) => Ok(reply),
            Err(error) => Err(error.clone()),
        }
    }
}
/// State machine that issues get-more operations and keeps hold of the optional session
/// between them.
enum ImplicitSessionGetMoreProvider {
// Holds the boxed future created by `start_execution`; it resolves to the get-more outcome
// plus the session that was moved into it.
Executing(BoxFuture<'static, ImplicitSessionGetMoreResult>),
// No future is stored; carries the optional session for the next get-more.
Idle(Option<ClientSession>),
// Terminal state: `start_execution` leaves it untouched and no session is retained.
Done,
}
impl ImplicitSessionGetMoreProvider {
    /// Picks the initial state from the cursor id in `spec`.
    ///
    /// An id of `0` starts the provider in `Done` (dropping `session`); any other id starts
    /// it in `Idle`, holding `session` for the first get-more.
    fn new(spec: &CursorSpecification, session: Option<ClientSession>) -> Self {
        let has_nonzero_id = spec.id() != 0;
        if has_nonzero_id {
            Self::Idle(session)
        } else {
            Self::Done
        }
    }
}
impl GetMoreProvider for ImplicitSessionGetMoreProvider {
    type GetMoreResult = ImplicitSessionGetMoreResult;
    type GetMoreFuture = BoxFuture<'static, ImplicitSessionGetMoreResult>;

    /// Returns the stored future while in the `Executing` state, and `None` otherwise.
    fn executing_future(&mut self) -> Option<&mut Self::GetMoreFuture> {
        if let Self::Executing(pending) = self {
            Some(pending)
        } else {
            None
        }
    }

    /// Leaves the `Executing` state once a get-more future has produced `result`.
    ///
    /// A successful result flagged as exhausted moves the provider to `Done`, and the session
    /// is dropped with `result`. An error, or a result that is not exhausted, moves it to
    /// `Idle` with the session stored for the next get-more.
    fn clear_execution(&mut self, result: Self::GetMoreResult) {
        let exhausted = result
            .get_more_result
            .map(|reply| reply.exhausted)
            .unwrap_or(false);

        *self = if exhausted {
            Self::Done
        } else {
            Self::Idle(result.session)
        };
    }

    /// Starts a get-more if the provider is idle; does nothing in `Executing` or `Done`.
    ///
    /// The new future builds a `GetMore` from `info` and runs it on `client`, using the stored
    /// session when there is one. It resolves to the outcome together with that session.
    fn start_execution(&mut self, info: CursorInformation, client: Client) {
        // `take_mut::take` lets the closure consume the current state by value, so the
        // session can be moved into the future, and writes the returned state back.
        take_mut::take(self, |state| match state {
            Self::Idle(mut session) => Self::Executing(Box::pin(async move {
                let operation = GetMore::new(info);
                let get_more_result = match session.as_mut() {
                    Some(active_session) => {
                        client
                            .execute_operation_with_session(operation, active_session)
                            .await
                    }
                    None => client.execute_operation(operation).await,
                };

                ImplicitSessionGetMoreResult {
                    get_more_result,
                    session,
                }
            })),
            Self::Executing(_) | Self::Done => state,
        })
    }
}