use crate::stream::JsonStream;
use crate::{Result, WireError};
use futures::stream::Stream;
use serde::de::DeserializeOwned;
use serde_json::Value;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
type Predicate = Box<dyn Fn(&Value) -> bool + Send>;
pub struct QueryStream<T: DeserializeOwned + Unpin> {
inner: JsonStream,
predicate: Option<Predicate>,
_phantom: PhantomData<T>,
}
impl<T: DeserializeOwned + Unpin> QueryStream<T> {
pub fn new(inner: JsonStream, predicate: Option<Predicate>) -> Self {
Self {
inner,
predicate,
_phantom: PhantomData,
}
}
pub async fn pause(&mut self) -> Result<()> {
self.inner.pause().await
}
pub async fn resume(&mut self) -> Result<()> {
self.inner.resume().await
}
pub fn stats(&self) -> crate::stream::StreamStats {
self.inner.stats()
}
pub fn state_snapshot(&self) -> crate::stream::StreamState {
self.inner.state_snapshot()
}
pub fn paused_occupancy(&self) -> usize {
self.inner.paused_occupancy()
}
pub async fn pause_with_reason(&mut self, reason: &str) -> Result<()> {
self.inner.pause_with_reason(reason).await
}
fn deserialize_value(value: Value) -> Result<T> {
match serde_json::from_value::<T>(value) {
Ok(result) => Ok(result),
Err(e) => Err(WireError::Deserialization {
type_name: std::any::type_name::<T>().to_string(),
details: e.to_string(),
}),
}
}
}
impl<T: DeserializeOwned + Unpin> Stream for QueryStream<T> {
type Item = Result<T>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
match Pin::new(&mut self.inner).poll_next(cx) {
Poll::Ready(Some(Ok(value))) => {
if let Some(ref predicate) = self.predicate {
if !predicate(&value) {
continue;
}
}
return Poll::Ready(Some(Self::deserialize_value(value)));
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(Err(e)));
}
Poll::Ready(None) => {
return Poll::Ready(None);
}
Poll::Pending => {
return Poll::Pending;
}
}
}
}
}
impl<T: DeserializeOwned + Unpin> Unpin for QueryStream<T> {}