use super::{follow_stream_driver::FollowStreamDriverHandle, follow_stream_unpin::BlockRef};
use crate::{
config::{Config, HashFor},
error::{BackendError, RpcError},
};
use futures::{FutureExt, Stream, StreamExt};
use pezkuwi_subxt_rpcs::methods::chain_head::{
ChainHeadRpcMethods, FollowEvent, MethodResponse, StorageQuery, StorageResult,
};
use std::{
collections::VecDeque,
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
/// A stream of [`StorageResult`]s belonging to a single storage operation.
///
/// The [`Stream`] implementation watches the follow events for those carrying this
/// operation's ID, buffers the items they deliver, and issues "continue" requests
/// when the operation reports that it is waiting for one.
pub struct StorageItems<T: Config> {
// Once `true`, polling yields `None` without doing any further work.
done: bool,
// ID of the operation; follow events with a different operation ID are ignored.
operation_id: Arc<str>,
// Items taken from a storage-items event that have not been yielded yet.
buffered_responses: VecDeque<StorageResult>,
// Builds a fresh future that asks for the operation to be continued.
continue_call: ContinueFutGetter,
// The in-flight continue request, if one is currently being driven.
continue_fut: Option<ContinueFut>,
// Source of follow events; polled whenever nothing is buffered and no continue
// request is pending.
follow_event_stream: FollowEventStream<HashFor<T>>,
}
impl<T: Config> StorageItems<T> {
    /// Start a storage operation for `queries` at block `at` and return a stream over the
    /// results it produces.
    ///
    /// `follow_handle` supplies both the subscription ID and the follow events the stream
    /// listens to; `methods` is used for the initial storage call and for any later
    /// "continue" calls.
    ///
    /// # Errors
    ///
    /// Returns an error if the subscription ID cannot be obtained, if the storage call
    /// fails, or ([`RpcError::LimitReached`]) if the call responds with
    /// `MethodResponse::LimitReached`.
    pub async fn from_methods(
        queries: impl Iterator<Item = StorageQuery<&[u8]>>,
        at: HashFor<T>,
        follow_handle: &FollowStreamDriverHandle<HashFor<T>>,
        methods: ChainHeadRpcMethods<T>,
    ) -> Result<Self, BackendError> {
        let sub_id = super::get_subscription_id(follow_handle).await?;

        // Subscribe to follow events before issuing the storage call (presumably so that
        // no event belonging to the new operation can be missed).
        let follow_events = follow_handle.subscribe().events();

        let response = methods.chainhead_v1_storage(&sub_id, at, queries, None).await?;
        let operation_id: Arc<str> = match response {
            MethodResponse::Started(started) => started.operation_id.into(),
            MethodResponse::LimitReached => return Err(RpcError::LimitReached.into()),
        };

        // The getter owns its own handles so that every invocation can hand fresh clones
        // to a new `'static` future.
        let continue_operation_id = Arc::clone(&operation_id);
        let continue_call: ContinueFutGetter = Box::new(move || {
            let rpc = methods.clone();
            let subscription = sub_id.clone();
            let operation = Arc::clone(&continue_operation_id);
            Box::pin(async move {
                rpc.chainhead_v1_continue(&subscription, &operation).await?;
                Ok(())
            })
        });

        Ok(Self::new(operation_id, continue_call, Box::pin(follow_events)))
    }

    /// Assemble a stream in its initial state: not done, nothing buffered and no continue
    /// request in flight.
    fn new(
        operation_id: Arc<str>,
        continue_call: ContinueFutGetter,
        follow_event_stream: FollowEventStream<HashFor<T>>,
    ) -> Self {
        Self {
            operation_id,
            continue_call,
            follow_event_stream,
            continue_fut: None,
            buffered_responses: VecDeque::new(),
            done: false,
        }
    }
}
/// A boxed, pinned, sendable stream of follow events over block references.
pub type FollowEventStream<Hash> =
Pin<Box<dyn Stream<Item = FollowEvent<BlockRef<Hash>>> + Send + 'static>>;
/// A boxed callback that produces a new [`ContinueFut`] each time it is invoked.
pub type ContinueFutGetter = Box<dyn Fn() -> ContinueFut + Send + 'static>;
/// A boxed, pinned, sendable future that resolves to `Ok(())` or a [`BackendError`].
pub type ContinueFut = Pin<Box<dyn Future<Output = Result<(), BackendError>> + Send + 'static>>;
impl<T: Config> Stream for StorageItems<T> {
    type Item = Result<StorageResult, BackendError>;

    /// Yield the next storage result for this operation.
    ///
    /// Each pass of the loop, in order:
    /// 1. returns `None` if the stream has already finished;
    /// 2. yields a buffered item if one is available;
    /// 3. drives any in-flight "continue" request to completion;
    /// 4. otherwise pulls the next follow event and reacts to it if it carries this
    ///    operation's ID.
    ///
    /// After an error is yielded, or after the operation or the underlying follow stream
    /// ends, every later poll returns `None`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            if self.done {
                return Poll::Ready(None);
            }

            // Drain anything already received before asking for more.
            if let Some(item) = self.buffered_responses.pop_front() {
                return Poll::Ready(Some(Ok(item)));
            }

            // A continue request is in flight; it must resolve before we look at further
            // follow events.
            if let Some(mut fut) = self.continue_fut.take() {
                match fut.poll_unpin(cx) {
                    Poll::Pending => {
                        // Put the future back so that the next poll resumes it.
                        self.continue_fut = Some(fut);
                        return Poll::Pending;
                    },
                    Poll::Ready(Err(e)) => {
                        if e.is_disconnected_will_reconnect() {
                            // Build a fresh continue request and loop round to poll it.
                            self.continue_fut = Some((self.continue_call)());
                            continue;
                        }
                        self.done = true;
                        return Poll::Ready(Some(Err(e)));
                    },
                    Poll::Ready(Ok(())) => {
                        // Request completed; fall through to wait for follow events.
                    },
                }
            }

            let ev = match self.follow_event_stream.poll_next_unpin(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(None) => {
                    // The follow stream has ended. Mark ourselves as done, like every
                    // other terminal path, so that the exhausted inner stream is never
                    // polled again (polling a finished stream has unspecified behaviour).
                    self.done = true;
                    return Poll::Ready(None);
                },
                Poll::Ready(Some(ev)) => ev,
            };

            match ev {
                FollowEvent::OperationWaitingForContinue(id)
                    if id.operation_id == *self.operation_id =>
                {
                    // The operation is waiting on us: start a continue request, which is
                    // polled on the next pass of the loop.
                    self.continue_fut = Some((self.continue_call)());
                    continue;
                },
                FollowEvent::OperationStorageDone(id) if id.operation_id == *self.operation_id => {
                    self.done = true;
                    return Poll::Ready(None);
                },
                FollowEvent::OperationStorageItems(items)
                    if items.operation_id == *self.operation_id =>
                {
                    // The buffer is empty at this point (checked above), so replacing it
                    // outright drops nothing; the items are yielded on following passes.
                    self.buffered_responses = items.items;
                    continue;
                },
                FollowEvent::OperationError(err) if err.operation_id == *self.operation_id => {
                    self.done = true;
                    return Poll::Ready(Some(Err(BackendError::Other(err.error))));
                },
                _ => {
                    // Not an event for this operation; wait for the next one.
                    continue;
                },
            }
        }
    }
}