#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum ResyncMarker {
ResyncStart,
ResyncEnd,
}
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum ResyncedEvent<T> {
Event(T),
Resynced(T),
Marker(ResyncMarker),
}
impl<T> ResyncedEvent<T> {
pub fn is_resync_start(&self) -> bool {
matches!(self, Self::Marker(ResyncMarker::ResyncStart))
}
pub fn is_resync_end(&self) -> bool {
matches!(self, Self::Marker(ResyncMarker::ResyncEnd))
}
pub fn into_inner(self) -> Option<T> {
match self {
Self::Event(t) | Self::Resynced(t) => Some(t),
Self::Marker(_) => None,
}
}
pub fn as_inner(&self) -> Option<&T> {
match self {
Self::Event(t) | Self::Resynced(t) => Some(t),
Self::Marker(_) => None,
}
}
}
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio_stream::Stream;
pub type ConnectionFuture<P> =
Pin<Box<dyn Future<Output = crate::Result<crate::Connection<P>>> + Send + 'static>>;
pub type ConnectionFactory<P> =
Arc<dyn Fn() -> ConnectionFuture<P> + Send + Sync + 'static>;
enum ResyncState<'a, T> {
Forwarding,
RunningSnapshot(Pin<Box<dyn Future<Output = crate::Result<Vec<T>>> + Send + 'a>>),
Replaying {
items: VecDeque<T>,
did_emit_start: bool,
},
Done,
#[doc(hidden)]
_Phantom(std::marker::PhantomData<&'a ()>),
}
#[must_use = "streams do nothing unless polled"]
#[non_exhaustive]
pub struct ResyncStream<'a, S, T, F>
where
S: Stream<Item = crate::Result<T>>,
F: FnMut() -> Pin<Box<dyn Future<Output = crate::Result<Vec<T>>> + Send + 'a>>,
{
inner: S,
resync: F,
state: ResyncState<'a, T>,
}
impl<'a, S, T, F> Stream for ResyncStream<'a, S, T, F>
where
S: Stream<Item = crate::Result<T>> + Unpin,
F: FnMut() -> Pin<Box<dyn Future<Output = crate::Result<Vec<T>>> + Send + 'a>> + Unpin,
T: Unpin,
{
type Item = crate::Result<ResyncedEvent<T>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
loop {
let state = std::mem::replace(&mut this.state, ResyncState::Done);
match state {
ResyncState::Done => return Poll::Ready(None),
ResyncState::_Phantom(_) => return Poll::Ready(None),
ResyncState::Forwarding => {
match Pin::new(&mut this.inner).poll_next(cx) {
Poll::Ready(Some(Ok(item))) => {
this.state = ResyncState::Forwarding;
return Poll::Ready(Some(Ok(ResyncedEvent::Event(item))));
}
Poll::Ready(Some(Err(e))) if e.is_no_buffer_space() => {
let fut = (this.resync)();
this.state = ResyncState::RunningSnapshot(fut);
}
Poll::Ready(Some(Err(e))) => {
this.state = ResyncState::Done;
return Poll::Ready(Some(Err(e)));
}
Poll::Ready(None) => {
this.state = ResyncState::Done;
return Poll::Ready(None);
}
Poll::Pending => {
this.state = ResyncState::Forwarding;
return Poll::Pending;
}
}
}
ResyncState::RunningSnapshot(mut fut) => {
match fut.as_mut().poll(cx) {
Poll::Ready(Ok(items)) => {
this.state = ResyncState::Replaying {
items: items.into(),
did_emit_start: false,
};
}
Poll::Ready(Err(e)) => {
this.state = ResyncState::Done;
return Poll::Ready(Some(Err(e)));
}
Poll::Pending => {
this.state = ResyncState::RunningSnapshot(fut);
return Poll::Pending;
}
}
}
ResyncState::Replaying {
mut items,
did_emit_start,
} => {
if !did_emit_start {
this.state = ResyncState::Replaying {
items,
did_emit_start: true,
};
return Poll::Ready(Some(Ok(ResyncedEvent::Marker(
ResyncMarker::ResyncStart,
))));
}
if let Some(item) = items.pop_front() {
this.state = ResyncState::Replaying {
items,
did_emit_start: true,
};
return Poll::Ready(Some(Ok(ResyncedEvent::Resynced(item))));
}
this.state = ResyncState::Forwarding;
return Poll::Ready(Some(Ok(ResyncedEvent::Marker(
ResyncMarker::ResyncEnd,
))));
}
}
}
}
}
pub fn events_with_resync<'a, S, T, F>(
events: S,
resync: F,
) -> ResyncStream<'a, S, T, F>
where
S: Stream<Item = crate::Result<T>> + Unpin,
F: FnMut() -> Pin<Box<dyn Future<Output = crate::Result<Vec<T>>> + Send + 'a>> + Unpin,
T: Unpin,
{
ResyncStream {
inner: events,
resync,
state: ResyncState::Forwarding,
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn marker_predicates() {
let start: ResyncedEvent<u32> = ResyncedEvent::Marker(ResyncMarker::ResyncStart);
let end: ResyncedEvent<u32> = ResyncedEvent::Marker(ResyncMarker::ResyncEnd);
let event = ResyncedEvent::Event(42u32);
let resynced = ResyncedEvent::Resynced(7u32);
assert!(start.is_resync_start());
assert!(!start.is_resync_end());
assert!(end.is_resync_end());
assert!(!end.is_resync_start());
assert!(!event.is_resync_start());
assert!(!resynced.is_resync_end());
}
#[test]
fn inner_extraction_skips_markers() {
let start: ResyncedEvent<u32> = ResyncedEvent::Marker(ResyncMarker::ResyncStart);
let event = ResyncedEvent::Event(42u32);
let resynced = ResyncedEvent::Resynced(7u32);
assert_eq!(start.clone().into_inner(), None);
assert_eq!(event.clone().into_inner(), Some(42));
assert_eq!(resynced.clone().into_inner(), Some(7));
assert_eq!(start.as_inner(), None);
assert_eq!(event.as_inner(), Some(&42));
assert_eq!(resynced.as_inner(), Some(&7));
}
use tokio_stream::StreamExt;
struct ScriptedStream {
items: VecDeque<crate::Result<u32>>,
}
impl Stream for ScriptedStream {
type Item = crate::Result<u32>;
fn poll_next(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
Poll::Ready(self.items.pop_front())
}
}
fn enobufs() -> crate::Error {
crate::Error::from_errno(-libc::ENOBUFS)
}
#[tokio::test]
async fn resync_stream_passes_events_through() {
let s = ScriptedStream {
items: vec![Ok(1u32), Ok(2), Ok(3)].into(),
};
let mut stream = events_with_resync(s, || {
Box::pin(async move { Ok::<Vec<u32>, crate::Error>(vec![]) })
});
let mut got = Vec::new();
while let Some(item) = stream.next().await {
got.push(item.unwrap());
}
assert_eq!(got.len(), 3);
assert!(matches!(got[0], ResyncedEvent::Event(1)));
assert!(matches!(got[1], ResyncedEvent::Event(2)));
assert!(matches!(got[2], ResyncedEvent::Event(3)));
}
#[tokio::test]
async fn resync_stream_handles_enobufs_with_replay() {
let s = ScriptedStream {
items: vec![Ok(1u32), Err(enobufs()), Ok(99)].into(),
};
let mut stream = events_with_resync(s, || {
Box::pin(async move { Ok::<Vec<u32>, crate::Error>(vec![10, 20, 30]) })
});
let mut got = Vec::new();
while let Some(item) = stream.next().await {
got.push(item.unwrap());
}
assert_eq!(got.len(), 7);
assert!(matches!(got[0], ResyncedEvent::Event(1)));
assert!(got[1].is_resync_start());
assert!(matches!(got[2], ResyncedEvent::Resynced(10)));
assert!(matches!(got[3], ResyncedEvent::Resynced(20)));
assert!(matches!(got[4], ResyncedEvent::Resynced(30)));
assert!(got[5].is_resync_end());
assert!(matches!(got[6], ResyncedEvent::Event(99)));
}
#[tokio::test]
async fn resync_stream_replay_with_empty_snapshot_still_emits_markers() {
let s = ScriptedStream {
items: vec![Err(enobufs()), Ok(1u32)].into(),
};
let mut stream = events_with_resync(s, || {
Box::pin(async move { Ok::<Vec<u32>, crate::Error>(vec![]) })
});
let mut got = Vec::new();
while let Some(item) = stream.next().await {
got.push(item.unwrap());
}
assert_eq!(got.len(), 3);
assert!(got[0].is_resync_start());
assert!(got[1].is_resync_end());
assert!(matches!(got[2], ResyncedEvent::Event(1)));
}
#[tokio::test]
async fn resync_stream_propagates_non_enobufs_error_and_fuses() {
let s = ScriptedStream {
items: vec![
Ok(1u32),
Err(crate::Error::from_errno(-libc::EPERM)),
Ok(99), ]
.into(),
};
let mut stream = events_with_resync(s, || {
Box::pin(async move { Ok::<Vec<u32>, crate::Error>(vec![]) })
});
let mut results = Vec::new();
while let Some(item) = stream.next().await {
results.push(item);
}
assert_eq!(results.len(), 2);
assert!(matches!(results[0].as_ref().unwrap(), ResyncedEvent::Event(1)));
assert!(results[1].as_ref().unwrap_err().is_permission_denied());
}
#[tokio::test]
async fn resync_stream_propagates_snapshot_failure_and_fuses() {
let s = ScriptedStream {
items: vec![Err(enobufs())].into(),
};
let mut stream = events_with_resync(s, || {
Box::pin(async move {
Err::<Vec<u32>, crate::Error>(crate::Error::from_errno(-libc::ENODEV))
})
});
let mut results = Vec::new();
while let Some(item) = stream.next().await {
results.push(item);
}
assert_eq!(results.len(), 1);
assert!(results[0].as_ref().unwrap_err().errno() == Some(libc::ENODEV));
}
#[tokio::test]
async fn resync_stream_handles_multiple_enobufs_recoveries() {
let s = ScriptedStream {
items: vec![
Ok(1u32),
Err(enobufs()),
Ok(2),
Err(enobufs()),
Ok(3),
]
.into(),
};
let mut call_count = 0;
let mut stream = events_with_resync(s, move || {
call_count += 1;
let count = call_count;
Box::pin(async move { Ok::<Vec<u32>, crate::Error>(vec![count * 100]) })
});
let mut got = Vec::new();
while let Some(item) = stream.next().await {
got.push(item.unwrap());
}
assert_eq!(got.len(), 9);
assert!(matches!(got[0], ResyncedEvent::Event(1)));
assert!(got[1].is_resync_start());
assert!(matches!(got[2], ResyncedEvent::Resynced(100)));
assert!(got[3].is_resync_end());
assert!(matches!(got[4], ResyncedEvent::Event(2)));
assert!(got[5].is_resync_start());
assert!(matches!(got[6], ResyncedEvent::Resynced(200)));
assert!(got[7].is_resync_end());
assert!(matches!(got[8], ResyncedEvent::Event(3)));
}
}