use serde::{Deserialize, Serialize};
use std::{
collections::{HashSet, VecDeque},
fmt,
iter::{Enumerate, FusedIterator},
mem::take,
ops::{Deref, DerefMut},
sync::Arc,
};
use tokio::sync::{RwLock, RwLockReadGuard, oneshot, watch};
use tracing::Instrument;
use super::{ChangeNotifier, ChangeSender, RecvError, SendError, default_on_err, send_event};
use crate::{exec, prelude::*};
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum VecDequeEvent<T> {
PushBack(T),
PushFront(T),
PopBack,
PopFront,
Insert(usize, T),
Set(usize, T),
Remove(usize),
SwapRemoveBack(usize),
SwapRemoveFront(usize),
Resize(usize, T),
Truncate(usize),
Retain(HashSet<usize>),
RetainNot(HashSet<usize>),
Clear,
ShrinkToFit,
Done,
#[serde(skip)]
InitialComplete,
}
pub struct ObservableVecDeque<T, Codec = crate::codec::Default> {
v: VecDeque<T>,
tx: rch::broadcast::Sender<VecDequeEvent<T>, Codec>,
change: ChangeSender,
on_err: Arc<dyn Fn(SendError) + Send + Sync>,
done: bool,
}
impl<T, Codec> fmt::Debug for ObservableVecDeque<T, Codec>
where
T: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.v.fmt(f)
}
}
impl<T, Codec> From<VecDeque<T>> for ObservableVecDeque<T, Codec>
where
T: Clone + RemoteSend,
Codec: crate::codec::Codec,
{
fn from(v: VecDeque<T>) -> Self {
let (tx, _rx) = rch::broadcast::channel::<_, _, { rch::DEFAULT_BUFFER }>(1);
Self { v, tx, change: ChangeSender::new(), on_err: Arc::new(default_on_err), done: false }
}
}
impl<T, Codec> From<ObservableVecDeque<T, Codec>> for VecDeque<T> {
fn from(ovd: ObservableVecDeque<T, Codec>) -> Self {
ovd.v
}
}
impl<T, Codec> Default for ObservableVecDeque<T, Codec>
where
T: Clone + RemoteSend,
Codec: crate::codec::Codec,
{
fn default() -> Self {
Self::from(VecDeque::new())
}
}
impl<T, Codec> ObservableVecDeque<T, Codec>
where
T: Clone + RemoteSend,
Codec: crate::codec::Codec,
{
pub fn new() -> Self {
Self::default()
}
pub fn with_capacity(capacity: usize) -> Self {
Self::from(VecDeque::with_capacity(capacity))
}
pub fn set_error_handler<E>(&mut self, on_err: E)
where
E: Fn(SendError) + Send + Sync + 'static,
{
self.on_err = Arc::new(on_err);
}
pub fn subscribe(&self, buffer: usize) -> VecDequeSubscription<T, Codec> {
VecDequeSubscription::new(
VecDequeInitialValue::new_value(self.v.clone()),
if self.done { None } else { Some(self.tx.subscribe(buffer)) },
)
}
pub fn subscribe_incremental(&self, buffer: usize) -> VecDequeSubscription<T, Codec> {
VecDequeSubscription::new(
VecDequeInitialValue::new_incremental(self.v.clone(), self.on_err.clone()),
if self.done { None } else { Some(self.tx.subscribe(buffer)) },
)
}
pub fn subscriber_count(&self) -> usize {
self.tx.receiver_count()
}
pub fn notifier(&self) -> ChangeNotifier {
self.change.subscribe()
}
pub fn push_back(&mut self, value: T) {
self.assert_not_done();
self.change.notify();
send_event(&self.tx, &*self.on_err, VecDequeEvent::PushBack(value.clone()));
self.v.push_back(value);
}
pub fn push_front(&mut self, value: T) {
self.assert_not_done();
self.change.notify();
send_event(&self.tx, &*self.on_err, VecDequeEvent::PushFront(value.clone()));
self.v.push_front(value);
}
pub fn pop_back(&mut self) -> Option<T> {
self.assert_not_done();
match self.v.pop_back() {
Some(value) => {
self.change.notify();
send_event(&self.tx, &*self.on_err, VecDequeEvent::PopBack);
Some(value)
}
None => None,
}
}
pub fn pop_front(&mut self) -> Option<T> {
self.assert_not_done();
match self.v.pop_front() {
Some(value) => {
self.change.notify();
send_event(&self.tx, &*self.on_err, VecDequeEvent::PopFront);
Some(value)
}
None => None,
}
}
pub fn get_mut(&mut self, index: usize) -> Option<RefMut<'_, T, Codec>> {
self.assert_not_done();
match self.v.get_mut(index) {
Some(value) => Some(RefMut {
index,
value,
changed: false,
tx: &self.tx,
change: &self.change,
on_err: &*self.on_err,
}),
None => None,
}
}
pub fn iter_mut(&mut self) -> IterMut<'_, T, Codec> {
self.assert_not_done();
IterMut {
inner: self.v.iter_mut().enumerate(),
tx: &self.tx,
change: &self.change,
on_err: &*self.on_err,
}
}
pub fn insert(&mut self, index: usize, value: T) {
self.assert_not_done();
let value_event = value.clone();
self.v.insert(index, value);
self.change.notify();
send_event(&self.tx, &*self.on_err, VecDequeEvent::Insert(index, value_event));
}
pub fn remove(&mut self, index: usize) -> Option<T> {
self.assert_not_done();
let value = self.v.remove(index);
if value.is_some() {
self.change.notify();
send_event(&self.tx, &*self.on_err, VecDequeEvent::Remove(index));
}
value
}
pub fn swap_remove_back(&mut self, index: usize) -> Option<T> {
self.assert_not_done();
let value = self.v.swap_remove_back(index);
if value.is_some() {
self.change.notify();
send_event(&self.tx, &*self.on_err, VecDequeEvent::SwapRemoveBack(index));
}
value
}
pub fn swap_remove_front(&mut self, index: usize) -> Option<T> {
self.assert_not_done();
let value = self.v.swap_remove_front(index);
if value.is_some() {
self.change.notify();
send_event(&self.tx, &*self.on_err, VecDequeEvent::SwapRemoveFront(index));
}
value
}
pub fn resize(&mut self, new_len: usize, value: T) {
self.assert_not_done();
if new_len != self.v.len() {
self.change.notify();
send_event(&self.tx, &*self.on_err, VecDequeEvent::Resize(new_len, value.clone()));
self.v.resize(new_len, value);
}
}
pub fn truncate(&mut self, new_len: usize) {
self.assert_not_done();
if new_len < self.len() {
self.change.notify();
send_event(&self.tx, &*self.on_err, VecDequeEvent::Truncate(new_len));
self.v.truncate(new_len);
}
}
pub fn clear(&mut self) {
self.assert_not_done();
if !self.v.is_empty() {
self.v.clear();
self.change.notify();
send_event(&self.tx, &*self.on_err, VecDequeEvent::Clear);
}
}
pub fn retain<F>(&mut self, mut f: F)
where
F: FnMut(&T) -> bool,
{
self.assert_not_done();
let mut keep = HashSet::new();
let mut remove = HashSet::new();
let mut pos = 0;
self.v.retain(|v| {
let keep_this = f(v);
if keep_this {
keep.insert(pos);
} else {
remove.insert(pos);
}
pos += 1;
keep_this
});
if !remove.is_empty() {
self.change.notify();
if keep.len() < remove.len() {
send_event(&self.tx, &*self.on_err, VecDequeEvent::Retain(keep));
} else {
send_event(&self.tx, &*self.on_err, VecDequeEvent::RetainNot(remove));
}
}
}
pub fn shrink_to_fit(&mut self) {
self.assert_not_done();
send_event(&self.tx, &*self.on_err, VecDequeEvent::ShrinkToFit);
self.v.shrink_to_fit()
}
fn assert_not_done(&self) {
if self.done {
panic!("observable VecDeque cannot be changed after done has been called");
}
}
pub fn done(&mut self) {
if !self.done {
send_event(&self.tx, &*self.on_err, VecDequeEvent::Done);
self.done = true;
}
}
pub fn is_done(&self) -> bool {
self.done
}
pub fn into_inner(self) -> VecDeque<T> {
self.into()
}
}
impl<T, Codec> Deref for ObservableVecDeque<T, Codec> {
type Target = VecDeque<T>;
fn deref(&self) -> &Self::Target {
&self.v
}
}
impl<T, Codec> Extend<T> for ObservableVecDeque<T, Codec>
where
T: RemoteSend + Clone,
Codec: crate::codec::Codec,
{
fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
for value in iter {
self.push_back(value);
}
}
}
pub struct RefMut<'a, T, Codec>
where
T: Clone + RemoteSend,
Codec: crate::codec::Codec,
{
index: usize,
value: &'a mut T,
changed: bool,
tx: &'a rch::broadcast::Sender<VecDequeEvent<T>, Codec>,
change: &'a ChangeSender,
on_err: &'a dyn Fn(SendError),
}
impl<T, Codec> Deref for RefMut<'_, T, Codec>
where
T: Clone + RemoteSend,
Codec: crate::codec::Codec,
{
type Target = T;
fn deref(&self) -> &Self::Target {
self.value
}
}
impl<T, Codec> DerefMut for RefMut<'_, T, Codec>
where
T: Clone + RemoteSend,
Codec: crate::codec::Codec,
{
fn deref_mut(&mut self) -> &mut Self::Target {
self.changed = true;
self.value
}
}
impl<T, Codec> Drop for RefMut<'_, T, Codec>
where
T: Clone + RemoteSend,
Codec: crate::codec::Codec,
{
fn drop(&mut self) {
if self.changed {
self.change.notify();
send_event(self.tx, self.on_err, VecDequeEvent::Set(self.index, self.value.clone()));
}
}
}
pub struct IterMut<'a, T, Codec> {
inner: Enumerate<std::collections::vec_deque::IterMut<'a, T>>,
tx: &'a rch::broadcast::Sender<VecDequeEvent<T>, Codec>,
change: &'a ChangeSender,
on_err: &'a dyn Fn(SendError),
}
impl<'a, T, Codec> Iterator for IterMut<'a, T, Codec>
where
T: Clone + RemoteSend,
Codec: crate::codec::Codec,
{
type Item = RefMut<'a, T, Codec>;
fn next(&mut self) -> Option<Self::Item> {
match self.inner.next() {
Some((index, value)) => Some(RefMut {
index,
value,
changed: false,
tx: self.tx,
change: self.change,
on_err: self.on_err,
}),
None => None,
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}
impl<T, Codec> ExactSizeIterator for IterMut<'_, T, Codec>
where
T: Clone + RemoteSend,
Codec: crate::codec::Codec,
{
fn len(&self) -> usize {
self.inner.len()
}
}
impl<T, Codec> DoubleEndedIterator for IterMut<'_, T, Codec>
where
T: Clone + RemoteSend,
Codec: crate::codec::Codec,
{
fn next_back(&mut self) -> Option<Self::Item> {
match self.inner.next_back() {
Some((index, value)) => Some(RefMut {
index,
value,
changed: false,
tx: self.tx,
change: self.change,
on_err: self.on_err,
}),
None => None,
}
}
}
impl<T, Codec> FusedIterator for IterMut<'_, T, Codec>
where
T: Clone + RemoteSend,
Codec: crate::codec::Codec,
{
}
struct MirroredVecDequeInner<T> {
v: VecDeque<T>,
complete: bool,
done: bool,
error: Option<RecvError>,
max_size: usize,
}
impl<T> MirroredVecDequeInner<T>
where
T: Clone,
{
fn handle_event(&mut self, event: VecDequeEvent<T>) -> Result<(), RecvError> {
match event {
VecDequeEvent::InitialComplete => {
self.complete = true;
}
VecDequeEvent::PushBack(v) => {
self.v.push_back(v);
if self.v.len() > self.max_size {
return Err(RecvError::MaxSizeExceeded(self.max_size));
}
}
VecDequeEvent::PushFront(v) => {
self.v.push_front(v);
if self.v.len() > self.max_size {
return Err(RecvError::MaxSizeExceeded(self.max_size));
}
}
VecDequeEvent::PopBack => {
self.v.pop_back();
}
VecDequeEvent::PopFront => {
self.v.pop_front();
}
VecDequeEvent::Insert(i, v) => {
if i > self.v.len() {
return Err(RecvError::InvalidIndex(i));
}
self.v.insert(i, v);
}
VecDequeEvent::Set(i, v) => {
if i >= self.v.len() {
return Err(RecvError::InvalidIndex(i));
}
self.v[i] = v;
}
VecDequeEvent::Remove(i) => {
if i >= self.v.len() {
return Err(RecvError::InvalidIndex(i));
}
self.v.remove(i);
}
VecDequeEvent::SwapRemoveBack(i) => {
if i >= self.v.len() {
return Err(RecvError::InvalidIndex(i));
}
self.v.swap_remove_back(i);
}
VecDequeEvent::SwapRemoveFront(i) => {
if i >= self.v.len() {
return Err(RecvError::InvalidIndex(i));
}
self.v.swap_remove_front(i);
}
VecDequeEvent::Resize(l, v) => {
self.v.resize(l, v);
}
VecDequeEvent::Truncate(l) => {
self.v.truncate(l);
}
VecDequeEvent::Retain(r) => {
let mut pos = 0;
self.v.retain(|_| {
let keep_this = r.contains(&pos);
pos += 1;
keep_this
});
}
VecDequeEvent::RetainNot(nr) => {
let mut pos = 0;
self.v.retain(|_| {
let keep_this = !nr.contains(&pos);
pos += 1;
keep_this
});
}
VecDequeEvent::Clear => {
self.v.clear();
}
VecDequeEvent::ShrinkToFit => {
self.v.shrink_to_fit();
}
VecDequeEvent::Done => {
self.done = true;
}
}
Ok(())
}
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(bound(serialize = "T: RemoteSend, Codec: crate::codec::Codec"))]
#[serde(bound(deserialize = "T: RemoteSend, Codec: crate::codec::Codec"))]
enum VecDequeInitialValue<T, Codec = crate::codec::Default> {
Value(VecDeque<T>),
Incremental {
len: usize,
rx: rch::mpsc::Receiver<T, Codec>,
},
}
impl<T, Codec> VecDequeInitialValue<T, Codec>
where
T: RemoteSend + Clone,
Codec: crate::codec::Codec,
{
fn new_value(v: VecDeque<T>) -> Self {
Self::Value(v)
}
fn new_incremental(v: VecDeque<T>, on_err: Arc<dyn Fn(SendError) + Send + Sync>) -> Self {
let (tx, rx) = rch::mpsc::channel(128);
let len = v.len();
exec::spawn(
async move {
for item in v.into_iter() {
match tx.send(item).await {
Ok(_) => (),
Err(err) if err.is_disconnected() => break,
Err(err) => match err.try_into() {
Ok(err) => (on_err)(err),
Err(_) => unreachable!(),
},
}
}
}
.in_current_span(),
);
Self::Incremental { len, rx }
}
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(bound(serialize = "T: RemoteSend, Codec: crate::codec::Codec"))]
#[serde(bound(deserialize = "T: RemoteSend, Codec: crate::codec::Codec"))]
pub struct VecDequeSubscription<T, Codec = crate::codec::Default> {
initial: VecDequeInitialValue<T, Codec>,
#[serde(skip, default)]
complete: bool,
events: Option<rch::broadcast::Receiver<VecDequeEvent<T>, Codec>>,
#[serde(skip, default)]
done: bool,
}
impl<T, Codec> VecDequeSubscription<T, Codec>
where
T: RemoteSend + Clone,
Codec: crate::codec::Codec,
{
fn new(
initial: VecDequeInitialValue<T, Codec>,
events: Option<rch::broadcast::Receiver<VecDequeEvent<T>, Codec>>,
) -> Self {
Self { initial, complete: false, events, done: false }
}
pub fn is_incremental(&self) -> bool {
matches!(self.initial, VecDequeInitialValue::Incremental { .. })
}
pub fn is_complete(&self) -> bool {
self.complete
}
pub fn is_done(&self) -> bool {
self.events.is_none() || self.done
}
pub fn take_initial(&mut self) -> Option<VecDeque<T>> {
match &mut self.initial {
VecDequeInitialValue::Value(value) if !self.complete => {
self.complete = true;
Some(take(value))
}
_ => None,
}
}
pub async fn recv(&mut self) -> Result<Option<VecDequeEvent<T>>, RecvError> {
if !self.complete {
match &mut self.initial {
VecDequeInitialValue::Incremental { len, rx } => {
if *len > 0 {
match rx.recv().await? {
Some(v) => {
*len -= 1;
return Ok(Some(VecDequeEvent::PushBack(v)));
}
None => return Err(RecvError::Closed),
}
} else {
self.complete = true;
return Ok(Some(VecDequeEvent::InitialComplete));
}
}
VecDequeInitialValue::Value(_) => {
panic!("take_initial must be called before recv for non-incremental subscription");
}
}
}
if let Some(rx) = &mut self.events {
match rx.recv().await? {
VecDequeEvent::Done => self.events = None,
evt => return Ok(Some(evt)),
}
}
if self.done {
Ok(None)
} else {
self.done = true;
Ok(Some(VecDequeEvent::Done))
}
}
}
impl<T, Codec> VecDequeSubscription<T, Codec>
where
T: RemoteSend + Clone + Sync,
Codec: crate::codec::Codec,
{
pub fn mirror(mut self, max_size: usize) -> MirroredVecDeque<T, Codec> {
let (tx, _rx) = rch::broadcast::channel::<_, _, { rch::DEFAULT_BUFFER }>(1);
let (changed_tx, changed_rx) = watch::channel(());
let (dropped_tx, mut dropped_rx) = oneshot::channel();
let inner = Arc::new(RwLock::new(Some(MirroredVecDequeInner {
v: self.take_initial().unwrap_or_default(),
complete: self.is_complete(),
done: self.is_done(),
error: None,
max_size,
})));
let inner_task = inner.clone();
let tx_send = tx.clone();
exec::spawn(
async move {
loop {
let event = tokio::select! {
event = self.recv() => event,
_ = &mut dropped_rx => return,
};
let mut inner = inner_task.write().await;
let inner = match inner.as_mut() {
Some(inner) => inner,
None => return,
};
changed_tx.send_replace(());
match event {
Ok(Some(event)) => {
if tx_send.receiver_count() > 0 {
let _ = tx_send.send(event.clone());
}
if let Err(err) = inner.handle_event(event) {
inner.error = Some(err);
return;
}
if inner.done {
break;
}
}
Ok(None) => break,
Err(err) => {
inner.error = Some(err);
return;
}
}
}
}
.in_current_span(),
);
MirroredVecDeque { inner, tx, changed_rx, _dropped_tx: dropped_tx }
}
}
pub struct MirroredVecDeque<T, Codec = crate::codec::Default> {
inner: Arc<RwLock<Option<MirroredVecDequeInner<T>>>>,
tx: rch::broadcast::Sender<VecDequeEvent<T>, Codec>,
changed_rx: watch::Receiver<()>,
_dropped_tx: oneshot::Sender<()>,
}
impl<T, Codec> fmt::Debug for MirroredVecDeque<T, Codec> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("MirroredVecDeque").finish()
}
}
impl<T, Codec> MirroredVecDeque<T, Codec>
where
T: RemoteSend + Clone,
Codec: crate::codec::Codec,
{
pub async fn borrow(&self) -> Result<MirroredVecDequeRef<'_, T>, RecvError> {
let inner = self.inner.read().await;
let inner = RwLockReadGuard::map(inner, |inner| inner.as_ref().unwrap());
match &inner.error {
None => Ok(MirroredVecDequeRef(inner)),
Some(err) => Err(err.clone()),
}
}
pub async fn borrow_and_update(&mut self) -> Result<MirroredVecDequeRef<'_, T>, RecvError> {
let inner = self.inner.read().await;
self.changed_rx.borrow_and_update();
let inner = RwLockReadGuard::map(inner, |inner| inner.as_ref().unwrap());
match &inner.error {
None => Ok(MirroredVecDequeRef(inner)),
Some(err) => Err(err.clone()),
}
}
pub async fn detach(self) -> VecDeque<T> {
let mut inner = self.inner.write().await;
inner.take().unwrap().v
}
pub async fn changed(&mut self) {
let _ = self.changed_rx.changed().await;
}
pub async fn subscribe(&self, buffer: usize) -> Result<VecDequeSubscription<T, Codec>, RecvError> {
let view = self.borrow().await?;
let initial = view.clone();
let events = if view.is_done() { None } else { Some(self.tx.subscribe(buffer)) };
Ok(VecDequeSubscription::new(VecDequeInitialValue::new_value(initial), events))
}
pub async fn subscribe_incremental(
&self, buffer: usize,
) -> Result<VecDequeSubscription<T, Codec>, RecvError> {
let view = self.borrow().await?;
let initial = view.clone();
let events = if view.is_done() { None } else { Some(self.tx.subscribe(buffer)) };
Ok(VecDequeSubscription::new(
VecDequeInitialValue::new_incremental(initial, Arc::new(default_on_err)),
events,
))
}
}
impl<T, Codec> Drop for MirroredVecDeque<T, Codec> {
fn drop(&mut self) {
}
}
pub struct MirroredVecDequeRef<'a, T>(RwLockReadGuard<'a, MirroredVecDequeInner<T>>);
impl<T> MirroredVecDequeRef<'_, T> {
pub fn is_complete(&self) -> bool {
self.0.complete
}
pub fn is_done(&self) -> bool {
self.0.done
}
}
impl<T> fmt::Debug for MirroredVecDequeRef<'_, T>
where
T: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.0.v.fmt(f)
}
}
impl<T> Deref for MirroredVecDequeRef<'_, T> {
type Target = VecDeque<T>;
fn deref(&self) -> &Self::Target {
&self.0.v
}
}
impl<T> Drop for MirroredVecDequeRef<'_, T> {
fn drop(&mut self) {
}
}