use crate::types::connection::QueryOperation;
use crate::{Connection, Context, Cursor, DataSource, FieldResult, ObjectType};
use futures::{stream::BoxStream, StreamExt};
use std::collections::VecDeque;
/// Lets a boxed stream of `(Cursor, E, T)` tuples act as a [`DataSource`].
///
/// Each item's first component is the cursor that the `after` / `before`
/// arguments of a [`QueryOperation`] are compared against.
#[async_trait::async_trait]
impl<'a, T, E> DataSource for BoxStream<'a, (Cursor, E, T)>
where
    T: Send + 'a,
    E: ObjectType + Send + 'a,
{
    type Element = T;
    type EdgeFieldsObj = E;

    /// Drains the stream and builds the connection selected by `operation`.
    ///
    /// The stream is always read to exhaustion, because the number of items
    /// it yielded is handed to `Connection::new` as `Some(count)`. While
    /// draining:
    ///
    /// * Reaching the `after` cursor discards everything collected so far
    ///   (that item included) and sets the previous-page flag.
    /// * Reaching the `before` cursor stops collection (that item excluded)
    ///   and sets the next-page flag; later items are only counted.
    /// * `First*` operations keep the first `limit` in-range edges and set
    ///   the next-page flag when a further in-range edge is skipped.
    /// * `Last*` operations keep the final `limit` in-range edges and set
    ///   the previous-page flag when an earlier in-range edge is evicted.
    ///   A `limit` of zero yields no edges.
    /// * Every other operation keeps all in-range edges.
    ///
    /// `_ctx` is not used by this implementation.
    async fn query_operation(
        &mut self,
        _ctx: &Context<'_>,
        operation: &QueryOperation,
    ) -> FieldResult<Connection<Self::Element, Self::EdgeFieldsObj>> {
        // The bounding cursors do not change while the stream is drained, so
        // extract them from `operation` once instead of once per item.
        let after_cursor = match operation {
            QueryOperation::After { after }
            | QueryOperation::Between { after, .. }
            | QueryOperation::FirstAfter { after, .. }
            | QueryOperation::FirstBetween { after, .. }
            | QueryOperation::LastAfter { after, .. }
            | QueryOperation::LastBetween { after, .. } => Some(after),
            _ => None,
        };
        let before_cursor = match operation {
            QueryOperation::Before { before }
            | QueryOperation::Between { before, .. }
            | QueryOperation::FirstBefore { before, .. }
            | QueryOperation::FirstBetween { before, .. }
            | QueryOperation::LastBefore { before, .. }
            | QueryOperation::LastBetween { before, .. } => Some(before),
            _ => None,
        };

        let mut count: usize = 0;
        let mut has_seen_before = false;
        let mut has_prev_page = false;
        let mut has_next_page = false;
        let mut edges = VecDeque::new();

        while let Some(edge) = self.next().await {
            count += 1;

            // Past the `before` cursor nothing can be collected any more; the
            // remaining items are consumed only so that `count` is complete.
            if has_seen_before {
                continue;
            }

            if let Some(after) = after_cursor {
                if *after == edge.0 {
                    // The window starts strictly after this item: drop what
                    // was gathered up to here and reset the next-page flag,
                    // which may have been raised by those discarded edges.
                    has_prev_page = true;
                    has_next_page = false;
                    edges.clear();
                    continue;
                }
            }

            if let Some(before) = before_cursor {
                if *before == edge.0 {
                    // The window ends strictly before this item.
                    has_seen_before = true;
                    has_next_page = true;
                    continue;
                }
            }

            match operation {
                QueryOperation::First { limit }
                | QueryOperation::FirstAfter { limit, .. }
                | QueryOperation::FirstBefore { limit, .. }
                | QueryOperation::FirstBetween { limit, .. } => {
                    if edges.len() < *limit {
                        edges.push_back(edge);
                    } else {
                        has_next_page = true;
                    }
                }
                QueryOperation::Last { limit }
                | QueryOperation::LastAfter { limit, .. }
                | QueryOperation::LastBefore { limit, .. }
                | QueryOperation::LastBetween { limit, .. } => {
                    // Push first, then trim from the front. Trimming before
                    // the push would leave one edge behind when `limit` is 0.
                    edges.push_back(edge);
                    while edges.len() > *limit {
                        edges.pop_front();
                        has_prev_page = true;
                    }
                }
                _ => {
                    edges.push_back(edge);
                }
            }
        }

        Ok(Connection::new(
            Some(count),
            has_prev_page,
            has_next_page,
            edges.into(),
        ))
    }
}