use serde::{Deserialize, Serialize};
use std::{
collections::HashSet,
fmt,
iter::{Enumerate, FusedIterator},
mem::take,
ops::{Deref, DerefMut},
sync::Arc,
};
use tokio::sync::{RwLock, RwLockReadGuard, oneshot, watch};
use tracing::Instrument;
use super::{ChangeNotifier, ChangeSender, RecvError, SendError, default_on_err, send_event};
use crate::{exec, prelude::*};
/// A change event for an observable vector.
///
/// Events are broadcast by `ObservableVec` whenever it is modified and are
/// replayed by `MirroredVecInner::handle_event` to rebuild the same contents
/// on the receiving side.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum VecEvent<T> {
/// A value was appended to the end of the vector.
Push(T),
/// The last value was removed.
Pop,
/// A value was inserted at the given index.
Insert(usize, T),
/// The value at the given index was replaced.
Set(usize, T),
/// The value at the given index was removed, shifting later elements.
Remove(usize),
/// The value at the given index was removed via `Vec::swap_remove`.
SwapRemove(usize),
/// Every element was overwritten with the given value.
Fill(T),
/// The vector was resized to the given length, using the value for new slots.
Resize(usize, T),
/// The vector was truncated to the given length.
Truncate(usize),
/// Only the elements at the given (pre-removal) positions were kept.
Retain(HashSet<usize>),
/// The elements at the given (pre-removal) positions were removed.
RetainNot(HashSet<usize>),
/// All elements were removed.
Clear,
/// The capacity of the vector was shrunk as much as possible.
ShrinkToFit,
/// The vector has been marked done; no further events follow.
Done,
/// The incremental transfer of the initial value has finished.
///
/// Generated locally by `VecSubscription::recv`; it is excluded from
/// (de)serialization.
#[serde(skip)]
InitialComplete,
}
/// A vector that broadcasts a [`VecEvent`] for every modification made
/// through its methods.
pub struct ObservableVec<T, Codec = crate::codec::Default> {
// Current contents of the vector.
v: Vec<T>,
// Broadcast sender over which change events are delivered to subscribers.
tx: rch::broadcast::Sender<VecEvent<T>, Codec>,
// Local change notification source; see `notifier`.
change: ChangeSender,
// Handler passed to `send_event` together with every event.
// NOTE(review): presumably invoked when sending an event fails — confirm in `send_event`.
on_err: Arc<dyn Fn(SendError) + Send + Sync>,
// Set by `done`; mutating methods panic once this is true.
done: bool,
}
impl<T, Codec> fmt::Debug for ObservableVec<T, Codec>
where
    T: fmt::Debug,
{
    /// Formats the observable vector exactly like its underlying `Vec`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let contents = &self.v;
        fmt::Debug::fmt(contents, f)
    }
}
impl<T, Codec> From<Vec<T>> for ObservableVec<T, Codec>
where
    T: Clone + RemoteSend,
    Codec: crate::codec::Codec,
{
    /// Wraps an existing vector, starting with no subscribers, the default
    /// error handler and the `done` flag cleared.
    fn from(hs: Vec<T>) -> Self {
        // The receiver half is not needed; subscribers are created on demand
        // through `tx.subscribe`.
        let (tx, _rx) = rch::broadcast::channel::<_, _, { rch::DEFAULT_BUFFER }>(1);
        let change = ChangeSender::new();
        Self {
            v: hs,
            tx,
            change,
            on_err: Arc::new(default_on_err),
            done: false,
        }
    }
}
impl<T, Codec> From<ObservableVec<T, Codec>> for Vec<T> {
fn from(ohs: ObservableVec<T, Codec>) -> Self {
ohs.v
}
}
impl<T, Codec> Default for ObservableVec<T, Codec>
where
    T: Clone + RemoteSend,
    Codec: crate::codec::Codec,
{
    /// Creates an empty observable vector.
    fn default() -> Self {
        let empty: Vec<T> = Vec::new();
        Self::from(empty)
    }
}
impl<T, Codec> ObservableVec<T, Codec>
where
T: Clone + RemoteSend,
Codec: crate::codec::Codec,
{
pub fn new() -> Self {
Self::default()
}
pub fn set_error_handler<E>(&mut self, on_err: E)
where
E: Fn(SendError) + Send + Sync + 'static,
{
self.on_err = Arc::new(on_err);
}
pub fn subscribe(&self, buffer: usize) -> VecSubscription<T, Codec> {
VecSubscription::new(
VecInitialValue::new_value(self.v.clone()),
if self.done { None } else { Some(self.tx.subscribe(buffer)) },
)
}
pub fn subscribe_incremental(&self, buffer: usize) -> VecSubscription<T, Codec> {
VecSubscription::new(
VecInitialValue::new_incremental(self.v.clone(), self.on_err.clone()),
if self.done { None } else { Some(self.tx.subscribe(buffer)) },
)
}
pub fn subscriber_count(&self) -> usize {
self.tx.receiver_count()
}
pub fn notifier(&self) -> ChangeNotifier {
self.change.subscribe()
}
pub fn push(&mut self, value: T) {
self.assert_not_done();
self.change.notify();
send_event(&self.tx, &*self.on_err, VecEvent::Push(value.clone()));
self.v.push(value);
}
pub fn pop(&mut self) -> Option<T> {
self.assert_not_done();
match self.v.pop() {
Some(value) => {
self.change.notify();
send_event(&self.tx, &*self.on_err, VecEvent::Pop);
Some(value)
}
None => None,
}
}
pub fn get_mut(&mut self, index: usize) -> Option<RefMut<'_, T, Codec>> {
self.assert_not_done();
match self.v.get_mut(index) {
Some(value) => Some(RefMut {
index,
value,
changed: false,
tx: &self.tx,
change: &self.change,
on_err: &*self.on_err,
}),
None => None,
}
}
pub fn iter_mut(&mut self) -> IterMut<'_, T, Codec> {
self.assert_not_done();
IterMut {
inner: self.v.iter_mut().enumerate(),
tx: &self.tx,
change: &self.change,
on_err: &*self.on_err,
}
}
pub fn insert(&mut self, index: usize, value: T) {
self.assert_not_done();
let value_event = value.clone();
self.v.insert(index, value);
self.change.notify();
send_event(&self.tx, &*self.on_err, VecEvent::Insert(index, value_event));
}
pub fn remove(&mut self, index: usize) -> T {
self.assert_not_done();
let value = self.v.remove(index);
self.change.notify();
send_event(&self.tx, &*self.on_err, VecEvent::Remove(index));
value
}
pub fn swap_remove(&mut self, index: usize) -> T {
self.assert_not_done();
let value = self.v.swap_remove(index);
self.change.notify();
send_event(&self.tx, &*self.on_err, VecEvent::SwapRemove(index));
value
}
pub fn fill(&mut self, value: T) {
self.assert_not_done();
self.change.notify();
send_event(&self.tx, &*self.on_err, VecEvent::Fill(value.clone()));
self.v.fill(value);
}
pub fn resize(&mut self, new_len: usize, value: T) {
self.assert_not_done();
if new_len != self.v.len() {
self.change.notify();
send_event(&self.tx, &*self.on_err, VecEvent::Resize(new_len, value.clone()));
self.v.resize(new_len, value);
}
}
pub fn truncate(&mut self, new_len: usize) {
self.assert_not_done();
if new_len < self.len() {
self.change.notify();
send_event(&self.tx, &*self.on_err, VecEvent::Truncate(new_len));
self.v.truncate(new_len);
}
}
pub fn clear(&mut self) {
self.assert_not_done();
if !self.v.is_empty() {
self.v.clear();
self.change.notify();
send_event(&self.tx, &*self.on_err, VecEvent::Clear);
}
}
pub fn retain<F>(&mut self, mut f: F)
where
F: FnMut(&T) -> bool,
{
self.assert_not_done();
let mut keep = HashSet::new();
let mut remove = HashSet::new();
let mut pos = 0;
self.v.retain(|v| {
let keep_this = f(v);
if keep_this {
keep.insert(pos);
} else {
remove.insert(pos);
}
pos += 1;
keep_this
});
if !remove.is_empty() {
self.change.notify();
if keep.len() < remove.len() {
send_event(&self.tx, &*self.on_err, VecEvent::Retain(keep));
} else {
send_event(&self.tx, &*self.on_err, VecEvent::RetainNot(remove));
}
}
}
pub fn shrink_to_fit(&mut self) {
self.assert_not_done();
send_event(&self.tx, &*self.on_err, VecEvent::ShrinkToFit);
self.v.shrink_to_fit()
}
fn assert_not_done(&self) {
if self.done {
panic!("observable vector cannot be changed after done has been called");
}
}
pub fn done(&mut self) {
if !self.done {
send_event(&self.tx, &*self.on_err, VecEvent::Done);
self.done = true;
}
}
pub fn is_done(&self) -> bool {
self.done
}
pub fn into_inner(self) -> Vec<T> {
self.into()
}
}
impl<T, Codec> Deref for ObservableVec<T, Codec> {
    type Target = Vec<T>;

    /// Gives read-only access to the underlying vector.
    fn deref(&self) -> &Vec<T> {
        let contents = &self.v;
        contents
    }
}
impl<T, Codec> Extend<T> for ObservableVec<T, Codec>
where
    T: RemoteSend + Clone,
    Codec: crate::codec::Codec,
{
    /// Pushes every item of `iter` individually, so each one is announced
    /// with its own push event.
    fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
        iter.into_iter().for_each(|value| self.push(value));
    }
}
/// A mutable reference to one element of an `ObservableVec`.
///
/// When the guard is dropped after having been mutably dereferenced, a
/// [`VecEvent::Set`] carrying the element's current value is sent.
pub struct RefMut<'a, T, Codec>
where
T: Clone + RemoteSend,
Codec: crate::codec::Codec,
{
// Position of the referenced element within the vector.
index: usize,
// The referenced element.
value: &'a mut T,
// Set by `deref_mut`; decides whether an event is sent on drop.
changed: bool,
// Event channel of the owning vector.
tx: &'a rch::broadcast::Sender<VecEvent<T>, Codec>,
// Change notification source of the owning vector.
change: &'a ChangeSender,
// Handler passed to `send_event` along with the event.
on_err: &'a dyn Fn(SendError),
}
impl<T, Codec> Deref for RefMut<'_, T, Codec>
where
    T: Clone + RemoteSend,
    Codec: crate::codec::Codec,
{
    type Target = T;

    /// Read access; does not mark the element as changed.
    fn deref(&self) -> &T {
        &*self.value
    }
}
impl<T, Codec> DerefMut for RefMut<'_, T, Codec>
where
    T: Clone + RemoteSend,
    Codec: crate::codec::Codec,
{
    /// Write access; marks the element as changed so that dropping the guard
    /// sends an event, whether or not the value is actually modified.
    fn deref_mut(&mut self) -> &mut T {
        self.changed = true;
        &mut *self.value
    }
}
impl<T, Codec> Drop for RefMut<'_, T, Codec>
where
    T: Clone + RemoteSend,
    Codec: crate::codec::Codec,
{
    /// Announces the element's current value if the guard was used mutably.
    fn drop(&mut self) {
        if !self.changed {
            return;
        }
        self.change.notify();
        let event = VecEvent::Set(self.index, self.value.clone());
        send_event(self.tx, self.on_err, event);
    }
}
/// A mutable iterator over the elements of an `ObservableVec`.
///
/// Yields a `RefMut` guard per element, built from the references stored here.
pub struct IterMut<'a, T, Codec> {
// Underlying slice iterator paired with each element's index.
inner: Enumerate<std::slice::IterMut<'a, T>>,
// Event channel handed to every yielded guard.
tx: &'a rch::broadcast::Sender<VecEvent<T>, Codec>,
// Change notification source handed to every yielded guard.
change: &'a ChangeSender,
// Error handler handed to every yielded guard.
on_err: &'a dyn Fn(SendError),
}
impl<'a, T, Codec> Iterator for IterMut<'a, T, Codec>
where
    T: Clone + RemoteSend,
    Codec: crate::codec::Codec,
{
    type Item = RefMut<'a, T, Codec>;

    /// Yields a change-tracking guard for the next element from the front.
    fn next(&mut self) -> Option<Self::Item> {
        let (tx, change, on_err) = (self.tx, self.change, self.on_err);
        self.inner.next().map(move |(index, value)| RefMut {
            index,
            value,
            changed: false,
            tx,
            change,
            on_err,
        })
    }

    /// Delegates to the underlying enumerated slice iterator.
    fn size_hint(&self) -> (usize, Option<usize>) {
        Iterator::size_hint(&self.inner)
    }
}
impl<T, Codec> ExactSizeIterator for IterMut<'_, T, Codec>
where
    T: Clone + RemoteSend,
    Codec: crate::codec::Codec,
{
    /// Number of elements that have not been yielded yet.
    fn len(&self) -> usize {
        ExactSizeIterator::len(&self.inner)
    }
}
impl<T, Codec> DoubleEndedIterator for IterMut<'_, T, Codec>
where
    T: Clone + RemoteSend,
    Codec: crate::codec::Codec,
{
    /// Yields a change-tracking guard for the next element from the back.
    fn next_back(&mut self) -> Option<Self::Item> {
        let (tx, change, on_err) = (self.tx, self.change, self.on_err);
        self.inner.next_back().map(move |(index, value)| RefMut {
            index,
            value,
            changed: false,
            tx,
            change,
            on_err,
        })
    }
}
// Marker impl with no methods: `next` and `next_back` forward directly to
// the wrapped `Enumerate<std::slice::IterMut>`.
impl<T, Codec> FusedIterator for IterMut<'_, T, Codec>
where
T: Clone + RemoteSend,
Codec: crate::codec::Codec,
{
}
// State shared between a `MirroredVec` handle and its background task.
struct MirroredVecInner<T> {
// Mirrored contents, updated by `handle_event`.
v: Vec<T>,
// Set once `VecEvent::InitialComplete` has been handled.
complete: bool,
// Set once `VecEvent::Done` has been handled.
done: bool,
// Error recorded by the mirror task; the `borrow*` methods of `MirroredVec` return it instead of the data.
error: Option<RecvError>,
// Limit on the length of `v`; checked in `handle_event`.
max_size: usize,
}
impl<T> MirroredVecInner<T>
where
    T: Clone,
{
    /// Applies a received change event to the mirrored vector.
    ///
    /// # Errors
    ///
    /// * `RecvError::InvalidIndex` if an `Insert`, `Set`, `Remove` or
    ///   `SwapRemove` event refers to a position outside the mirrored vector.
    /// * `RecvError::MaxSizeExceeded` if a `Push`, `Insert` or `Resize` event
    ///   would make the vector longer than `max_size`.
    ///
    /// After an error the mirrored contents must be considered out of sync
    /// with the source.
    fn handle_event(&mut self, event: VecEvent<T>) -> Result<(), RecvError> {
        match event {
            VecEvent::InitialComplete => {
                self.complete = true;
            }
            VecEvent::Push(v) => {
                self.v.push(v);
                if self.v.len() > self.max_size {
                    return Err(RecvError::MaxSizeExceeded(self.max_size));
                }
            }
            VecEvent::Pop => {
                self.v.pop();
            }
            VecEvent::Insert(i, v) => {
                if i > self.v.len() {
                    return Err(RecvError::InvalidIndex(i));
                }
                self.v.insert(i, v);
                // An insert grows the vector just like a push does, so it is
                // subject to the same size limit.
                if self.v.len() > self.max_size {
                    return Err(RecvError::MaxSizeExceeded(self.max_size));
                }
            }
            VecEvent::Set(i, v) => {
                if i >= self.v.len() {
                    return Err(RecvError::InvalidIndex(i));
                }
                self.v[i] = v;
            }
            VecEvent::Remove(i) => {
                if i >= self.v.len() {
                    return Err(RecvError::InvalidIndex(i));
                }
                self.v.remove(i);
            }
            VecEvent::SwapRemove(i) => {
                if i >= self.v.len() {
                    return Err(RecvError::InvalidIndex(i));
                }
                self.v.swap_remove(i);
            }
            VecEvent::Fill(v) => {
                self.v.fill(v);
            }
            VecEvent::Resize(l, v) => {
                // Check before resizing: the requested length comes from the
                // sending side and must not trigger an unbounded allocation.
                if l > self.max_size {
                    return Err(RecvError::MaxSizeExceeded(self.max_size));
                }
                self.v.resize(l, v);
            }
            VecEvent::Truncate(l) => {
                self.v.truncate(l);
            }
            VecEvent::Retain(r) => {
                // `r` holds the positions (before removal) to keep.
                let mut pos = 0;
                self.v.retain(|_| {
                    let keep_this = r.contains(&pos);
                    pos += 1;
                    keep_this
                });
            }
            VecEvent::RetainNot(nr) => {
                // `nr` holds the positions (before removal) to drop.
                let mut pos = 0;
                self.v.retain(|_| {
                    let keep_this = !nr.contains(&pos);
                    pos += 1;
                    keep_this
                });
            }
            VecEvent::Clear => {
                self.v.clear();
            }
            VecEvent::ShrinkToFit => {
                self.v.shrink_to_fit();
            }
            VecEvent::Done => {
                self.done = true;
            }
        }
        Ok(())
    }
}
// Initial contents handed to a subscriber: either the complete vector or a
// channel over which the elements are delivered one at a time.
#[derive(Debug, Serialize, Deserialize)]
#[serde(bound(serialize = "T: RemoteSend, Codec: crate::codec::Codec"))]
#[serde(bound(deserialize = "T: RemoteSend, Codec: crate::codec::Codec"))]
enum VecInitialValue<T, Codec = crate::codec::Default> {
// The complete initial contents.
Value(Vec<T>),
// The initial contents, transferred element by element.
Incremental {
// Number of elements that are still to be received from `rx`.
len: usize,
// Channel delivering the initial elements in order.
rx: rch::mpsc::Receiver<T, Codec>,
},
}
impl<T, Codec> VecInitialValue<T, Codec>
where
T: RemoteSend + Clone,
Codec: crate::codec::Codec,
{
fn new_value(hs: Vec<T>) -> Self {
Self::Value(hs)
}
fn new_incremental(hs: Vec<T>, on_err: Arc<dyn Fn(SendError) + Send + Sync>) -> Self {
let (tx, rx) = rch::mpsc::channel(128);
let len = hs.len();
exec::spawn(
async move {
for v in hs.into_iter() {
match tx.send(v).await {
Ok(_) => (),
Err(err) if err.is_disconnected() => break,
Err(err) => match err.try_into() {
Ok(err) => (on_err)(err),
Err(_) => unreachable!(),
},
}
}
}
.in_current_span(),
);
Self::Incremental { len, rx }
}
}
/// A subscription to the contents and change events of an observable vector.
///
/// The `complete` and `done` flags are skipped during (de)serialization and
/// start out as their default (`false`) after deserialization.
#[derive(Debug, Serialize, Deserialize)]
#[serde(bound(serialize = "T: RemoteSend, Codec: crate::codec::Codec"))]
#[serde(bound(deserialize = "T: RemoteSend, Codec: crate::codec::Codec"))]
pub struct VecSubscription<T, Codec = crate::codec::Default> {
// Initial contents, either as a whole or as an element stream.
initial: VecInitialValue<T, Codec>,
// Whether the initial value has been taken or fully received.
#[serde(skip, default)]
complete: bool,
// Receiver for change events; `None` if no further events are available.
events: Option<rch::broadcast::Receiver<VecEvent<T>, Codec>>,
// Whether the final `Done` event has been handed out by `recv`.
#[serde(skip, default)]
done: bool,
}
impl<T, Codec> VecSubscription<T, Codec>
where
    T: RemoteSend + Clone,
    Codec: crate::codec::Codec,
{
    /// Builds a subscription from its initial value and optional event receiver.
    fn new(
        initial: VecInitialValue<T, Codec>,
        events: Option<rch::broadcast::Receiver<VecEvent<T>, Codec>>,
    ) -> Self {
        Self {
            initial,
            events,
            complete: false,
            done: false,
        }
    }

    /// Returns whether the initial value is delivered element by element.
    pub fn is_incremental(&self) -> bool {
        match self.initial {
            VecInitialValue::Incremental { .. } => true,
            VecInitialValue::Value(_) => false,
        }
    }

    /// Returns whether the initial value has been taken or fully received.
    pub fn is_complete(&self) -> bool {
        self.complete
    }

    /// Returns whether no further change events can arrive.
    pub fn is_done(&self) -> bool {
        self.done || self.events.is_none()
    }

    /// Takes the initial value of a non-incremental subscription.
    ///
    /// Returns `None` for incremental subscriptions and on every call after
    /// the first successful one.
    pub fn take_initial(&mut self) -> Option<Vec<T>> {
        if self.complete {
            return None;
        }
        match &mut self.initial {
            VecInitialValue::Value(value) => {
                self.complete = true;
                Some(take(value))
            }
            VecInitialValue::Incremental { .. } => None,
        }
    }

    /// Receives the next event.
    ///
    /// For an incremental subscription the initial elements are delivered
    /// first as [`VecEvent::Push`] events, followed by
    /// [`VecEvent::InitialComplete`]. Afterwards change events are forwarded.
    /// When the source is done, [`VecEvent::Done`] is returned once and
    /// `Ok(None)` on every later call.
    ///
    /// # Errors
    /// Returns `RecvError::Closed` if the initial element stream ends early,
    /// and propagates receive errors of the underlying channels.
    ///
    /// # Panics
    /// Panics if the subscription is not incremental and `take_initial` has
    /// not been called yet.
    pub async fn recv(&mut self) -> Result<Option<VecEvent<T>>, RecvError> {
        // Phase 1: hand out the initial value.
        if !self.complete {
            let (len, rx) = match &mut self.initial {
                VecInitialValue::Incremental { len, rx } => (len, rx),
                VecInitialValue::Value(_) => {
                    panic!("take_initial must be called before recv for non-incremental subscription");
                }
            };

            if *len == 0 {
                self.complete = true;
                return Ok(Some(VecEvent::InitialComplete));
            }

            return match rx.recv().await? {
                Some(v) => {
                    *len -= 1;
                    Ok(Some(VecEvent::Push(v)))
                }
                None => Err(RecvError::Closed),
            };
        }

        // Phase 2: forward change events until the source reports `Done`.
        if let Some(rx) = &mut self.events {
            match rx.recv().await? {
                VecEvent::Done => self.events = None,
                evt => return Ok(Some(evt)),
            }
        }

        // Phase 3: report `Done` exactly once, then signal the end.
        if self.done {
            return Ok(None);
        }
        self.done = true;
        Ok(Some(VecEvent::Done))
    }
}
impl<T, Codec> VecSubscription<T, Codec>
where
T: RemoteSend + Clone + Sync,
Codec: crate::codec::Codec,
{
/// Consumes the subscription and mirrors the observed vector locally.
///
/// A background task is spawned that receives events from this
/// subscription, applies them to the shared mirror state and re-broadcasts
/// them to subscribers of the returned `MirroredVec`.
///
/// `max_size` is stored in the mirror state as the limit on its length;
/// a violation reported by the event handler puts the mirror into an error
/// state and stops the task.
pub fn mirror(mut self, max_size: usize) -> MirroredVec<T, Codec> {
// Channel for re-broadcasting events, watch channel for change
// notifications, and a oneshot whose sender is stored in the handle.
let (tx, _rx) = rch::broadcast::channel::<_, _, { rch::DEFAULT_BUFFER }>(1);
let (changed_tx, changed_rx) = watch::channel(());
let (dropped_tx, mut dropped_rx) = oneshot::channel();
// Initial state: the full initial value if one is available, otherwise
// an empty vector that is filled by the incoming events.
let inner = Arc::new(RwLock::new(Some(MirroredVecInner {
v: self.take_initial().unwrap_or_default(),
complete: self.is_complete(),
done: self.is_done(),
error: None,
max_size,
})));
let inner_task = inner.clone();
let tx_send = tx.clone();
exec::spawn(
async move {
loop {
// Wait for the next event; stop when the oneshot receiver resolves.
// NOTE(review): presumably that happens when the `MirroredVec` holding the sender is dropped — confirm.
let event = tokio::select! {
event = self.recv() => event,
_ = &mut dropped_rx => return,
};
// The state is `None` after `detach` took it; nothing left to update then.
let mut inner = inner_task.write().await;
let inner = match inner.as_mut() {
Some(inner) => inner,
None => return,
};
// Signal the change while still holding the write lock.
changed_tx.send_replace(());
match event {
Ok(Some(event)) => {
// Forward the event only if somebody is listening.
if tx_send.receiver_count() > 0 {
let _ = tx_send.send(event.clone());
}
if let Err(err) = inner.handle_event(event) {
inner.error = Some(err);
return;
}
if inner.done {
break;
}
}
Ok(None) => break,
Err(err) => {
inner.error = Some(err);
return;
}
}
}
}
.in_current_span(),
);
MirroredVec { inner, tx, changed_rx, _dropped_tx: dropped_tx }
}
}
/// A local mirror of an observed vector, kept up to date by a background task.
pub struct MirroredVec<T, Codec = crate::codec::Default> {
// Shared mirror state; `None` once `detach` has taken the contents.
inner: Arc<RwLock<Option<MirroredVecInner<T>>>>,
// Channel over which received events are re-broadcast to subscribers.
tx: rch::broadcast::Sender<VecEvent<T>, Codec>,
// Receives a notification whenever the background task processes an event.
changed_rx: watch::Receiver<()>,
// Sender half of the oneshot the background task watches; never sent on.
// NOTE(review): presumably dropping it is what stops the task — confirm.
_dropped_tx: oneshot::Sender<()>,
}
impl<T, Codec> fmt::Debug for MirroredVec<T, Codec> {
    /// Prints only the type name; the contents sit behind an async lock.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("MirroredVec");
        out.finish()
    }
}
impl<T, Codec> MirroredVec<T, Codec>
where
    T: RemoteSend + Clone,
    Codec: crate::codec::Codec,
{
    /// Borrows the current contents of the mirrored vector.
    ///
    /// # Errors
    /// Returns the recorded error if the mirror task has failed.
    pub async fn borrow(&self) -> Result<MirroredVecRef<'_, T>, RecvError> {
        let guard = self.inner.read().await;
        let state = RwLockReadGuard::map(guard, |slot| slot.as_ref().unwrap());
        if let Some(err) = &state.error {
            return Err(err.clone());
        }
        Ok(MirroredVecRef(state))
    }

    /// Borrows the current contents and marks the latest change as seen, so
    /// that `changed` waits for the next one.
    ///
    /// # Errors
    /// Returns the recorded error if the mirror task has failed.
    pub async fn borrow_and_update(&mut self) -> Result<MirroredVecRef<'_, T>, RecvError> {
        let guard = self.inner.read().await;
        // Mark the change as seen only after the read lock has been acquired.
        self.changed_rx.borrow_and_update();
        let state = RwLockReadGuard::map(guard, |slot| slot.as_ref().unwrap());
        if let Some(err) = &state.error {
            return Err(err.clone());
        }
        Ok(MirroredVecRef(state))
    }

    /// Stops mirroring and returns the contents as they are right now.
    pub async fn detach(self) -> Vec<T> {
        let mut slot = self.inner.write().await;
        // Taking the state leaves `None` behind, which makes the task exit.
        let state = slot.take().unwrap();
        state.v
    }

    /// Waits for the next change notification from the mirror task.
    pub async fn changed(&mut self) {
        // A closed watch channel is not reported to the caller.
        let _ = self.changed_rx.changed().await;
    }

    /// Subscribes to the mirrored vector, with the full current contents as
    /// the initial value.
    ///
    /// # Errors
    /// Returns the recorded error if the mirror task has failed.
    pub async fn subscribe(&self, buffer: usize) -> Result<VecSubscription<T, Codec>, RecvError> {
        let view = self.borrow().await?;
        let initial: Vec<T> = (*view).clone();
        let events = match view.is_done() {
            true => None,
            false => Some(self.tx.subscribe(buffer)),
        };
        let initial = VecInitialValue::new_value(initial);
        Ok(VecSubscription::new(initial, events))
    }

    /// Subscribes to the mirrored vector, streaming the current contents
    /// element by element.
    ///
    /// # Errors
    /// Returns the recorded error if the mirror task has failed.
    pub async fn subscribe_incremental(&self, buffer: usize) -> Result<VecSubscription<T, Codec>, RecvError> {
        let view = self.borrow().await?;
        let initial: Vec<T> = (*view).clone();
        let events = match view.is_done() {
            true => None,
            false => Some(self.tx.subscribe(buffer)),
        };
        let initial = VecInitialValue::new_incremental(initial, Arc::new(default_on_err));
        Ok(VecSubscription::new(initial, events))
    }
}
// Deliberately empty: the fields are dropped as usual after this runs.
// NOTE(review): presumably this impl exists only for its effect on how the type may be moved out of or dropped — confirm the intent before removing it.
impl<T, Codec> Drop for MirroredVec<T, Codec> {
fn drop(&mut self) {
}
}
/// A borrowed view of a mirrored vector; wraps the read guard on the mirror state.
pub struct MirroredVecRef<'a, T>(RwLockReadGuard<'a, MirroredVecInner<T>>);
impl<T> MirroredVecRef<'_, T> {
pub fn is_complete(&self) -> bool {
self.0.complete
}
pub fn is_done(&self) -> bool {
self.0.done
}
}
impl<T> fmt::Debug for MirroredVecRef<'_, T>
where
    T: fmt::Debug,
{
    /// Formats the view exactly like the mirrored `Vec`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let contents = &self.0.v;
        fmt::Debug::fmt(contents, f)
    }
}
impl<T> Deref for MirroredVecRef<'_, T> {
type Target = Vec<T>;
fn deref(&self) -> &Self::Target {
&self.0.v
}
}
// Deliberately empty: the wrapped read guard is released by its own drop.
// NOTE(review): presumably this impl exists only for its effect on how the type may be moved out of or dropped — confirm the intent before removing it.
impl<T> Drop for MirroredVecRef<'_, T> {
fn drop(&mut self) {
}
}