use std::{
fmt, ops,
pin::Pin,
task::{Context, Poll},
};
use futures_core::Stream;
use im::Vector;
use tokio::sync::broadcast::{self, Sender};
use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};
/// A vector whose mutations can be observed by subscribers.
///
/// Wraps an `im::Vector<T>` together with the sending half of a tokio
/// broadcast channel. Mutating methods on this type update the vector and
/// then publish a matching [`VectorDiff`] to every current subscriber.
pub struct ObservableVector<T: Clone> {
/// Current contents of the vector.
values: Vector<T>,
/// Sending half of the broadcast channel used to publish mutations.
sender: Sender<BroadcastMessage<T>>,
}
impl<T: Clone + Send + Sync + 'static> ObservableVector<T> {
    /// Creates an empty `ObservableVector` whose broadcast channel holds up
    /// to 16 pending messages.
    pub fn new() -> Self {
        Self::with_capacity(16)
    }

    /// Creates an empty `ObservableVector` whose broadcast channel is created
    /// with the given `capacity`.
    ///
    /// The capacity concerns the channel (how many unread messages it
    /// retains), not the vector itself. A subscriber that falls behind is
    /// brought back in sync with a [`VectorDiff::Reset`].
    ///
    /// # Panics
    ///
    /// tokio's `broadcast::channel` is documented to panic when `capacity`
    /// is `0` or greater than `usize::MAX / 2`.
    pub fn with_capacity(capacity: usize) -> Self {
        // The initial receiver is dropped right away; subscribers are created
        // on demand through `subscribe`.
        let (sender, _) = broadcast::channel(capacity);
        let values = Vector::new();
        Self { values, sender }
    }

    /// Consumes the observable and returns the wrapped vector.
    pub fn into_inner(self) -> Vector<T> {
        let Self { values, .. } = self;
        values
    }

    /// Creates a new subscriber backed by a fresh receiver of the broadcast
    /// channel.
    ///
    /// Per tokio's broadcast semantics the receiver only sees messages sent
    /// after this call.
    pub fn subscribe(&self) -> VectorSubscriber<T> {
        VectorSubscriber::new(BroadcastStream::new(self.sender.subscribe()))
    }

    /// Appends `values` to the vector and publishes [`VectorDiff::Append`].
    pub fn append(&mut self, values: Vector<T>) {
        let appended = values.clone();
        self.values.append(appended);
        self.broadcast_diff(VectorDiff::Append { values });
    }

    /// Removes every element and publishes [`VectorDiff::Clear`].
    pub fn clear(&mut self) {
        self.values.clear();
        self.broadcast_diff(VectorDiff::Clear);
    }

    /// Adds `value` at the front and publishes [`VectorDiff::PushFront`].
    pub fn push_front(&mut self, value: T) {
        let stored = value.clone();
        self.values.push_front(stored);
        self.broadcast_diff(VectorDiff::PushFront { value });
    }

    /// Adds `value` at the back and publishes [`VectorDiff::PushBack`].
    pub fn push_back(&mut self, value: T) {
        let stored = value.clone();
        self.values.push_back(stored);
        self.broadcast_diff(VectorDiff::PushBack { value });
    }

    /// Removes and returns the first element, publishing
    /// [`VectorDiff::PopFront`].
    ///
    /// Returns `None`, and publishes nothing, when the vector is empty.
    pub fn pop_front(&mut self) -> Option<T> {
        let popped = self.values.pop_front()?;
        self.broadcast_diff(VectorDiff::PopFront);
        Some(popped)
    }

    /// Removes and returns the last element, publishing
    /// [`VectorDiff::PopBack`].
    ///
    /// Returns `None`, and publishes nothing, when the vector is empty.
    pub fn pop_back(&mut self) -> Option<T> {
        let popped = self.values.pop_back()?;
        self.broadcast_diff(VectorDiff::PopBack);
        Some(popped)
    }

    /// Inserts `value` at position `index` and publishes
    /// [`VectorDiff::Insert`].
    ///
    /// # Panics
    ///
    /// Panics if `index > len`.
    #[track_caller]
    pub fn insert(&mut self, index: usize, value: T) {
        let len = self.values.len();
        // `index == len` is allowed: it inserts after the last element.
        if index > len {
            panic!("index out of bounds: the length is {len} but the index is {index}");
        }
        self.values.insert(index, value.clone());
        self.broadcast_diff(VectorDiff::Insert { index, value });
    }

    /// Replaces the element at `index` with `value`, publishes
    /// [`VectorDiff::Set`], and returns the element that was replaced.
    ///
    /// # Panics
    ///
    /// Panics if `index >= len`.
    #[track_caller]
    pub fn set(&mut self, index: usize, value: T) -> T {
        let len = self.values.len();
        if index >= len {
            panic!("index out of bounds: the length is {len} but the index is {index}");
        }
        let previous = self.values.set(index, value.clone());
        self.broadcast_diff(VectorDiff::Set { index, value });
        previous
    }

    /// Removes the element at `index`, publishes [`VectorDiff::Remove`], and
    /// returns the removed element.
    ///
    /// # Panics
    ///
    /// Panics if `index >= len`.
    #[track_caller]
    pub fn remove(&mut self, index: usize) -> T {
        let len = self.values.len();
        if index >= len {
            panic!("index out of bounds: the length is {len} but the index is {index}");
        }
        let removed = self.values.remove(index);
        self.broadcast_diff(VectorDiff::Remove { index });
        removed
    }

    /// Publishes `diff`, together with a snapshot of the current contents,
    /// to all subscribers.
    ///
    /// Must be called after the mutation has been applied so the snapshot
    /// reflects the new state.
    fn broadcast_diff(&self, diff: VectorDiff<T>) {
        // With no receivers there is nobody to notify, so skip building the
        // message altogether.
        if self.sender.receiver_count() == 0 {
            return;
        }
        let state = self.values.clone();
        // A send error is treated as "zero receivers".
        let _num_receivers = self.sender.send(BroadcastMessage { diff, state }).unwrap_or(0);
        #[cfg(feature = "tracing")]
        tracing::debug!("New observable value broadcast to {_num_receivers} receivers");
    }
}
impl<T: Clone + Send + Sync + 'static> Default for ObservableVector<T> {
fn default() -> Self {
Self::new()
}
}
impl<T: Clone + fmt::Debug> fmt::Debug for ObservableVector<T> {
    /// Formats the observable showing only its `values`; the broadcast
    /// sender is omitted and the output is marked as non-exhaustive.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("ObservableVector");
        builder.field("values", &self.values);
        builder.finish_non_exhaustive()
    }
}
impl<T> ops::Deref for ObservableVector<T>
where
    T: Clone,
{
    type Target = Vector<T>;

    /// Gives read-only access to the wrapped vector.
    ///
    /// Only `Deref` is implemented here, so this path offers shared access
    /// only; mutations go through the methods that publish a diff.
    fn deref(&self) -> &Vector<T> {
        &self.values
    }
}
impl<T: Clone + Send + Sync + 'static> From<Vector<T>> for ObservableVector<T> {
    /// Creates an observable vector pre-populated with `values`, using the
    /// same channel configuration as [`ObservableVector::new`].
    ///
    /// Nothing is broadcast for the initial contents: the observable is
    /// brand new, so no subscriber can exist yet.
    fn from(values: Vector<T>) -> Self {
        // Move the vector in directly instead of going through `append`,
        // which would clone it and check for receivers that cannot exist on
        // a freshly created channel.
        Self { values, ..Self::new() }
    }
}
/// Message sent over the broadcast channel for every published mutation.
#[derive(Clone)]
struct BroadcastMessage<T: Clone> {
/// The change that was applied to the vector.
diff: VectorDiff<T>,
/// Snapshot of the whole vector taken after the change was applied; a
/// subscriber that lagged uses it to emit `VectorDiff::Reset`.
state: Vector<T>,
}
/// A stream of [`VectorDiff`]s, obtained from
/// [`ObservableVector::subscribe`].
#[derive(Debug)]
pub struct VectorSubscriber<T: Clone> {
/// Broadcast receiver wrapped as a stream.
inner: BroadcastStream<BroadcastMessage<T>>,
/// Set once the receiver reported that it lagged; the next item is then
/// yielded as `VectorDiff::Reset` instead of the individual diff.
must_reset: bool,
}
impl<T: Clone> VectorSubscriber<T> {
    /// Wraps a broadcast stream in a subscriber that starts out with no
    /// pending reset.
    const fn new(inner: BroadcastStream<BroadcastMessage<T>>) -> Self {
        let must_reset = false;
        Self { inner, must_reset }
    }
}
impl<T: Clone + Send + Sync + 'static> Stream for VectorSubscriber<T> {
    type Item = VectorDiff<T>;

    /// Yields the next diff published by the observed vector.
    ///
    /// When the underlying receiver reports that it lagged, the individual
    /// diffs that were missed cannot be replayed. The subscriber instead
    /// remembers that fact and turns the next message it does receive into a
    /// [`VectorDiff::Reset`] carrying that message's full state snapshot.
    /// The stream ends (`None`) when the underlying broadcast stream ends.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            match Pin::new(&mut self.inner).poll_next(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(_)))) => {
                    // Remember the lag and poll again right away for the
                    // next available message.
                    self.must_reset = true;
                }
                Poll::Ready(Some(Ok(msg))) => {
                    // Consume the flag: it is cleared whether or not it was set.
                    let diff = if std::mem::take(&mut self.must_reset) {
                        VectorDiff::Reset { values: msg.state }
                    } else {
                        msg.diff
                    };
                    return Poll::Ready(Some(diff));
                }
            }
        }
    }
}
/// A single change to an [`ObservableVector`], as delivered to its
/// subscribers.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum VectorDiff<T: Clone> {
/// Multiple elements were appended (published by `ObservableVector::append`).
Append {
/// The elements that were appended.
values: Vector<T>,
},
/// The vector was cleared.
Clear,
/// An element was added at the front.
PushFront {
/// The new element.
value: T,
},
/// An element was added at the back.
PushBack {
/// The new element.
value: T,
},
/// The element at the front was removed.
PopFront,
/// The element at the back was removed.
PopBack,
/// An element was inserted at the given position.
Insert {
/// The position at which the element was inserted.
index: usize,
/// The new element.
value: T,
},
/// The element at the given position was replaced.
Set {
/// The position of the replaced element.
index: usize,
/// The new element.
value: T,
},
/// The element at the given position was removed.
Remove {
/// The position the element was removed from.
index: usize,
},
/// The subscriber lagged behind and missed some diffs; its view must be
/// replaced wholesale.
Reset {
/// The full contents of the vector to replace the subscriber's view with.
values: Vector<T>,
},
}