use crate::core::{PlottingError, Result};
use std::ops::Deref;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, Weak};
/// Owned, boxed form of a change-notification callback (public API surface).
pub type SubscriberCallback = Box<dyn Fn() + Send + Sync>;
/// Shared form used internally: `Arc` lets callbacks be cloned out of the
/// subscriber list and invoked after the list's lock has been released.
type SharedSubscriberCallback = Arc<dyn Fn() + Send + Sync>;
/// Opaque handle identifying one subscription; returned by `subscribe` and
/// consumed by `unsubscribe`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SubscriberId(u64);
/// Opaque handle identifying one registered drop hook (crate-internal).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct DropHookId(u64);
/// One registered change listener: its id plus the shared callback.
struct Subscriber {
    id: SubscriberId,
    callback: SharedSubscriberCallback,
}
/// One pending drop hook: runs exactly once when the owning lifecycle drops.
struct DropHookEntry {
    id: DropHookId,
    hook: Box<dyn FnOnce() + Send + 'static>,
}
/// Registry of hooks to run when the last handle to an observable is dropped.
/// Held behind an `Arc` inside `Observable`, so its `Drop` fires only when
/// every clone of the observable is gone.
struct ObservableLifecycle {
    drop_hooks: Mutex<Vec<DropHookEntry>>,
    next_drop_hook_id: AtomicU64,
}
impl ObservableLifecycle {
    /// Creates an empty registry with no pending hooks.
    fn new() -> Self {
        Self {
            drop_hooks: Mutex::new(Vec::new()),
            next_drop_hook_id: AtomicU64::new(0),
        }
    }
    /// Registers `hook` to run at drop time and returns a handle that can
    /// later cancel it via `remove_drop_hook`.
    fn add_drop_hook<F>(&self, hook: F) -> DropHookId
    where
        F: FnOnce() + Send + 'static,
    {
        let id = DropHookId(self.next_drop_hook_id.fetch_add(1, Ordering::Relaxed));
        let entry = DropHookEntry {
            id,
            hook: Box::new(hook),
        };
        self.drop_hooks
            .lock()
            .expect("Observable lifecycle lock poisoned")
            .push(entry);
        id
    }
    /// Cancels a previously registered hook; returns whether it was present.
    /// Ids come from a monotonically increasing counter, so at most one
    /// entry can match.
    fn remove_drop_hook(&self, id: DropHookId) -> bool {
        let mut hooks = self
            .drop_hooks
            .lock()
            .expect("Observable lifecycle lock poisoned");
        let before = hooks.len();
        hooks.retain(|entry| entry.id != id);
        hooks.len() < before
    }
    /// Test-only introspection: number of currently pending hooks.
    #[cfg(test)]
    fn hook_count(&self) -> usize {
        self.drop_hooks
            .lock()
            .expect("Observable lifecycle lock poisoned")
            .len()
    }
}
impl Drop for ObservableLifecycle {
    /// Runs every pending hook exactly once. The hook list is swapped out
    /// under the lock and the lock is released before any hook executes.
    fn drop(&mut self) {
        let mut guard = self
            .drop_hooks
            .lock()
            .expect("Observable lifecycle lock poisoned");
        let hooks = std::mem::take(&mut *guard);
        drop(guard);
        for entry in hooks {
            (entry.hook)();
        }
    }
}
/// Snapshots every current subscriber's callback into an owned `Vec`.
///
/// The read lock is dropped when this function returns, so callers invoke
/// the callbacks without holding the subscriber lock — which is what lets a
/// callback subscribe or unsubscribe without deadlocking.
fn collect_subscriber_callbacks(
    subscribers: &RwLock<Vec<Subscriber>>,
    lock_error: &str,
) -> Vec<SharedSubscriberCallback> {
    let guard = subscribers.read().expect(lock_error);
    let mut callbacks = Vec::with_capacity(guard.len());
    for subscriber in guard.iter() {
        callbacks.push(Arc::clone(&subscriber.callback));
    }
    callbacks
}
/// A thread-safe, versioned value container with change notification.
///
/// All fields live behind `Arc`s, so `clone()` yields another handle to the
/// same shared state. Every mutation bumps `version` and then invokes the
/// registered subscriber callbacks.
pub struct Observable<T> {
    // Current value, guarded for concurrent read/write access.
    data: Arc<RwLock<T>>,
    // Monotonic change counter; incremented once per mutation.
    version: Arc<AtomicU64>,
    // Callbacks invoked (with the subscriber lock released) after mutations.
    subscribers: Arc<RwLock<Vec<Subscriber>>>,
    // Source of unique `SubscriberId`s for this shared state.
    next_subscriber_id: Arc<AtomicU64>,
    // Hooks that fire when the last handle drops (used by `lift`/`lift2`).
    lifecycle: Arc<ObservableLifecycle>,
}
// Manual impl (rather than `#[derive(Clone)]`) so cloning a handle does NOT
// require `T: Clone`: only the `Arc`s are cloned, never the value itself.
// Clones share data, version, subscribers, and lifecycle.
impl<T> Clone for Observable<T> {
    fn clone(&self) -> Self {
        Self {
            data: Arc::clone(&self.data),
            version: Arc::clone(&self.version),
            subscribers: Arc::clone(&self.subscribers),
            next_subscriber_id: Arc::clone(&self.next_subscriber_id),
            lifecycle: Arc::clone(&self.lifecycle),
        }
    }
}
impl<T: std::fmt::Debug> std::fmt::Debug for Observable<T> {
    /// Debug view of value, version, and subscriber count. Deliberately
    /// tolerant: a poisoned subscriber lock reports a count of 0 instead
    /// of panicking inside a formatter.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Observable")
            .field("data", &self.data)
            .field("version", &self.version.load(Ordering::Relaxed))
            .field(
                "subscriber_count",
                &self.subscribers.read().map(|s| s.len()).unwrap_or(0),
            )
            .finish()
    }
}
impl<T> Observable<T> {
    /// Hands out a unique subscriber id without registering anything.
    ///
    /// Split from `add_subscriber_with_id` so that `lift`/`lift2` can capture
    /// the id inside the very callback that will be registered under it.
    fn reserve_subscriber_id(&self) -> SubscriberId {
        SubscriberId(self.next_subscriber_id.fetch_add(1, Ordering::Relaxed))
    }
    /// Registers `callback` under a previously reserved `id`.
    fn add_subscriber_with_id(&self, id: SubscriberId, callback: SharedSubscriberCallback) {
        let subscriber = Subscriber { id, callback };
        self.subscribers
            .write()
            .expect("Subscribers lock poisoned")
            .push(subscriber);
    }
    /// Creates a new observable at version 0 with no subscribers.
    pub fn new(value: T) -> Self {
        Self {
            data: Arc::new(RwLock::new(value)),
            version: Arc::new(AtomicU64::new(0)),
            subscribers: Arc::new(RwLock::new(Vec::new())),
            next_subscriber_id: Arc::new(AtomicU64::new(0)),
            lifecycle: Arc::new(ObservableLifecycle::new()),
        }
    }
    /// Current change counter; increases by one per mutation.
    pub fn version(&self) -> u64 {
        self.version.load(Ordering::Acquire)
    }
    /// Increments the version, then notifies subscribers. Callers must have
    /// already released the data write lock, because callbacks may re-enter
    /// this observable (e.g. read it or unsubscribe).
    fn bump_version(&self) {
        self.version.fetch_add(1, Ordering::Release);
        self.notify_subscribers();
    }
    /// Shared read access to the value; the guard blocks writers while held.
    pub fn read(&self) -> std::sync::RwLockReadGuard<'_, T> {
        self.data.read().expect("Observable lock poisoned")
    }
    /// Non-blocking read; `None` if a writer currently holds the lock.
    pub fn try_read(&self) -> Option<std::sync::RwLockReadGuard<'_, T>> {
        self.data.try_read().ok()
    }
    /// Mutates the value in place, then bumps the version and notifies.
    pub fn update<F>(&self, f: F)
    where
        F: FnOnce(&mut T),
    {
        {
            // Scope the write lock so it is released before notification.
            let mut guard = self.data.write().expect("Observable lock poisoned");
            f(&mut *guard);
        }
        self.bump_version();
    }
    /// Like `update`, but forwards the closure's return value to the caller.
    pub fn update_with<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut T) -> R,
    {
        let result = {
            let mut guard = self.data.write().expect("Observable lock poisoned");
            f(&mut *guard)
        };
        self.bump_version();
        result
    }
    /// Replaces the value wholesale, then bumps the version and notifies.
    pub fn set(&self, value: T) {
        {
            let mut guard = self.data.write().expect("Observable lock poisoned");
            *guard = value;
        }
        self.bump_version();
    }
    /// Registers `callback` to run after every mutation; returns an id for
    /// later `unsubscribe`.
    pub fn subscribe<F>(&self, callback: F) -> SubscriberId
    where
        F: Fn() + Send + Sync + 'static,
    {
        let id = self.reserve_subscriber_id();
        self.add_subscriber_with_id(id, Arc::new(callback));
        id
    }
    /// Removes a subscription; returns whether `id` was registered.
    pub fn unsubscribe(&self, id: SubscriberId) -> bool {
        let mut subscribers = self.subscribers.write().expect("Subscribers lock poisoned");
        if let Some(pos) = subscribers.iter().position(|s| s.id == id) {
            subscribers.remove(pos);
            true
        } else {
            false
        }
    }
    /// Number of currently registered subscribers.
    pub fn subscriber_count(&self) -> usize {
        self.subscribers
            .read()
            .expect("Subscribers lock poisoned")
            .len()
    }
    /// Invokes every subscriber. Callbacks are snapshotted first so the
    /// subscriber lock is NOT held during invocation — a callback can
    /// therefore subscribe or unsubscribe without deadlocking.
    fn notify_subscribers(&self) {
        let callbacks =
            collect_subscriber_callbacks(&self.subscribers, "Subscribers lock poisoned");
        for callback in callbacks {
            callback();
        }
    }
    /// Runs `hook` once, when the last handle to this observable is dropped.
    /// Used by `lift`/`lift2` to detach derived subscriptions eagerly.
    fn on_last_drop<F>(&self, hook: F) -> DropHookId
    where
        F: FnOnce() + Send + 'static,
    {
        self.lifecycle.add_drop_hook(hook)
    }
    /// Cancels a hook registered via `on_last_drop`.
    fn remove_drop_hook(&self, id: DropHookId) -> bool {
        self.lifecycle.remove_drop_hook(id)
    }
    /// Test-only: number of pending drop hooks.
    #[cfg(test)]
    fn lifecycle_hook_count(&self) -> usize {
        self.lifecycle.hook_count()
    }
    /// Creates a non-owning handle; upgrading it later yields an observable
    /// sharing this one's state, subscribers included.
    pub fn downgrade(&self) -> WeakObservable<T> {
        WeakObservable {
            data: Arc::downgrade(&self.data),
            version: Arc::downgrade(&self.version),
            subscribers: Arc::downgrade(&self.subscribers),
            next_subscriber_id: Arc::downgrade(&self.next_subscriber_id),
            lifecycle: Arc::downgrade(&self.lifecycle),
        }
    }
}
impl<T: Clone> Observable<T> {
    /// Returns an owned clone of the current value; the read lock is held
    /// only for the duration of the clone.
    pub fn get(&self) -> T {
        let guard = self.read();
        guard.clone()
    }
}
impl<T: Default> Default for Observable<T> {
    /// An observable seeded with `T::default()`, at version 0.
    fn default() -> Self {
        Self::new(T::default())
    }
}
/// Non-owning handle to an `Observable`'s shared state (mirrors each `Arc`
/// field with a `Weak`). Does not keep the observable alive; used by
/// `lift`/`lift2` callbacks to avoid reference cycles.
pub struct WeakObservable<T> {
    data: Weak<RwLock<T>>,
    version: Weak<AtomicU64>,
    subscribers: Weak<RwLock<Vec<Subscriber>>>,
    next_subscriber_id: Weak<AtomicU64>,
    lifecycle: Weak<ObservableLifecycle>,
}
// Manual impl so cloning the weak handle does not require `T: Clone`
// (`Weak::clone` only bumps the weak count).
impl<T> Clone for WeakObservable<T> {
    fn clone(&self) -> Self {
        Self {
            data: Weak::clone(&self.data),
            version: Weak::clone(&self.version),
            subscribers: Weak::clone(&self.subscribers),
            next_subscriber_id: Weak::clone(&self.next_subscriber_id),
            lifecycle: Weak::clone(&self.lifecycle),
        }
    }
}
impl<T> WeakObservable<T> {
    /// Attempts to re-acquire a strong handle. Succeeds only while at least
    /// one `Observable` clone is still alive; the result shares all state
    /// (value, version, subscribers, lifecycle) with the original.
    pub fn upgrade(&self) -> Option<Observable<T>> {
        Some(Observable {
            data: self.data.upgrade()?,
            version: self.version.upgrade()?,
            subscribers: self.subscribers.upgrade()?,
            next_subscriber_id: self.next_subscriber_id.upgrade()?,
            lifecycle: self.lifecycle.upgrade()?,
        })
    }
    /// True while at least one strong handle still exists.
    pub fn is_alive(&self) -> bool {
        self.data.strong_count() > 0
    }
}
/// Collects observables and notifies all of them once when dropped.
///
/// NOTE(review): mutations through `Observable::set`/`update` still notify
/// immediately on their own; this batch only adds one extra notification per
/// registered observable at drop time — it does not suppress the per-mutation
/// notifications. Confirm that is the intended contract.
pub struct BatchUpdate<'a> {
    observables: Vec<&'a dyn BatchNotifier>,
}
/// Type-erased "notify your subscribers" hook, letting `BatchUpdate` hold
/// observables of different `T` in a single list.
pub trait BatchNotifier {
    fn notify(&self);
}
impl<T> BatchNotifier for Observable<T> {
    /// Fires the subscriber callbacks without changing value or version.
    fn notify(&self) {
        self.notify_subscribers();
    }
}
impl<'a> BatchUpdate<'a> {
pub fn new() -> Self {
Self {
observables: Vec::new(),
}
}
pub fn add<T>(&mut self, observable: &'a Observable<T>) {
self.observables.push(observable);
}
}
impl<'a> Default for BatchUpdate<'a> {
    /// Equivalent to `BatchUpdate::new()`.
    fn default() -> Self {
        Self::new()
    }
}
impl<'a> Drop for BatchUpdate<'a> {
    /// Fires one notification per registered observable when the batch
    /// goes out of scope.
    fn drop(&mut self) {
        self.observables.iter().for_each(|obs| obs.notify());
    }
}
/// Observable wrapper over a bounded FIFO window: keeps at most `max_size`
/// most-recent values and notifies subscribers on every change.
pub struct SlidingWindowObservable<T> {
    inner: Observable<Vec<T>>,
    max_size: usize,
}
impl<T: Clone> SlidingWindowObservable<T> {
    /// Creates an empty window retaining at most `max_size` recent values.
    ///
    /// NOTE(review): a `max_size` of 0 produces a window that retains
    /// nothing — pushed values are discarded immediately.
    pub fn new(max_size: usize) -> Self {
        Self {
            inner: Observable::new(Vec::with_capacity(max_size)),
            max_size,
        }
    }
    /// Appends `value`, evicting the oldest element once the window is full.
    /// Subscribers are notified once.
    pub fn push(&self, value: T) {
        self.inner.update(|data| {
            data.push(value);
            // Trim AFTER pushing: the previous order (`remove(0)` before
            // push) panicked on `max_size == 0` by removing from an empty
            // vec. For max_size >= 1 the final contents are identical.
            if data.len() > self.max_size {
                data.remove(0);
            }
        });
    }
    /// Appends every value from `values`, evicting oldest elements as
    /// needed. Subscribers are notified exactly once for the whole batch.
    pub fn push_many(&self, values: impl IntoIterator<Item = T>) {
        self.inner.update(|data| {
            data.extend(values);
            // Evict the overflow in a single O(n) drain instead of one
            // O(n) `remove(0)` per evicted element (previously O(n^2)).
            let overflow = data.len().saturating_sub(self.max_size);
            if overflow > 0 {
                data.drain(..overflow);
            }
        });
    }
    /// Empties the window and notifies subscribers.
    pub fn clear(&self) {
        self.inner.update(|data| data.clear());
    }
    /// Change counter of the underlying observable.
    pub fn version(&self) -> u64 {
        self.inner.version()
    }
    /// Read access to the window contents (oldest first).
    pub fn read(&self) -> std::sync::RwLockReadGuard<'_, Vec<T>> {
        self.inner.read()
    }
    /// Registers a change callback; see `Observable::subscribe`.
    pub fn subscribe<F>(&self, callback: F) -> SubscriberId
    where
        F: Fn() + Send + Sync + 'static,
    {
        self.inner.subscribe(callback)
    }
    /// Removes a subscription; returns whether `id` was registered.
    pub fn unsubscribe(&self, id: SubscriberId) -> bool {
        self.inner.unsubscribe(id)
    }
    /// Borrow of the underlying observable (e.g. for `lift`).
    pub fn as_observable(&self) -> &Observable<Vec<T>> {
        &self.inner
    }
    /// Maximum number of retained values.
    pub fn max_size(&self) -> usize {
        self.max_size
    }
    /// Current number of retained values.
    pub fn len(&self) -> usize {
        self.inner.read().len()
    }
    /// True when no values are retained.
    pub fn is_empty(&self) -> bool {
        self.inner.read().is_empty()
    }
}
/// Clones share the same underlying observable state (same data, version,
/// and subscriber list) — this is a handle copy, not a deep copy.
///
/// The bound is `impl<T>` rather than the previous `impl<T: Clone>`:
/// cloning the handle never clones a `T`, because `Observable<Vec<T>>` is
/// itself unconditionally cloneable. Relaxing the bound is backward
/// compatible for all existing callers.
impl<T> Clone for SlidingWindowObservable<T> {
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
            max_size: self.max_size,
        }
    }
}
/// Conversion of a plain value into an `Observable` wrapping it.
pub trait IntoObservable<T> {
    fn into_observable(self) -> Observable<T>;
}
/// Creates a derived observable whose value is `f` applied to `source`,
/// recomputed on every change of `source`.
///
/// Only weak references cross between the two observables, so neither keeps
/// the other alive:
/// - the subscriber registered on `source` holds `Weak`s and lazily
///   unsubscribes itself once the derived observable is gone;
/// - a drop hook on the derived observable eagerly removes the subscription
///   from `source` when the last derived handle is dropped (if `source`
///   is still alive).
pub fn lift<T, U, F>(source: &Observable<T>, f: F) -> Observable<U>
where
    T: Clone + Send + Sync + 'static,
    U: Send + Sync + 'static,
    F: Fn(T) -> U + Send + Sync + 'static,
{
    // Seed the derived observable with the current mapped value.
    let initial = f(source.get());
    let derived = Observable::new(initial);
    let f = Arc::new(f);
    let weak_source = source.downgrade();
    let weak_derived = derived.downgrade();
    // Reserve the id up front so the callback below can capture it and
    // unsubscribe itself by id.
    let id = source.reserve_subscriber_id();
    source.add_subscriber_with_id(
        id,
        Arc::new(move || {
            let Some(source) = weak_source.upgrade() else {
                return;
            };
            let Some(derived) = weak_derived.upgrade() else {
                // Derived side is gone: detach this subscription lazily.
                source.unsubscribe(id);
                return;
            };
            let new_value = f(source.get());
            {
                // Write under the lock, but bump/notify only after the
                // write guard is released.
                let mut guard = derived.data.write().expect("Lock poisoned");
                *guard = new_value;
            }
            derived.bump_version();
        }),
    );
    // Eager cleanup path: last derived handle dropped -> unsubscribe.
    let weak_source_for_drop = source.downgrade();
    derived.on_last_drop(move || {
        if let Some(source) = weak_source_for_drop.upgrade() {
            source.unsubscribe(id);
        }
    });
    derived
}
/// Creates a derived observable computed from two sources; recomputed when
/// either source changes.
///
/// Cleanup bookkeeping mirrors [`lift`], extended to two sources:
/// - each source's callback holds only `Weak`s and lazily detaches itself
///   when its counterpart source or the derived observable is gone;
/// - a drop hook on each source unsubscribes the *other* source's callback,
///   so a dangling subscription does not linger on the survivor;
/// - the derived observable's drop hook removes both subscriptions AND both
///   cross-source drop hooks, so repeated lift2/drop cycles do not
///   accumulate hooks on long-lived sources.
pub fn lift2<T1, T2, U, F>(
    source1: &Observable<T1>,
    source2: &Observable<T2>,
    f: F,
) -> Observable<U>
where
    T1: Clone + Send + Sync + 'static,
    T2: Clone + Send + Sync + 'static,
    U: Send + Sync + 'static,
    F: Fn(T1, T2) -> U + Send + Sync + 'static,
{
    // Seed with the current combined value.
    let initial = f(source1.get(), source2.get());
    let derived = Observable::new(initial);
    let f = Arc::new(f);
    let weak_derived = derived.downgrade();
    let weak_s1 = source1.downgrade();
    let weak_s2 = source2.downgrade();
    // Reserve both ids up front so each callback can capture its own id.
    let source1_id = source1.reserve_subscriber_id();
    let source2_id = source2.reserve_subscriber_id();
    {
        // Callback fired when source1 changes.
        let f_clone = Arc::clone(&f);
        let weak_derived = weak_derived.clone();
        let weak_s1 = weak_s1.clone();
        let weak_s2 = weak_s2.clone();
        source1.add_subscriber_with_id(
            source1_id,
            Arc::new(move || {
                let Some(source1) = weak_s1.upgrade() else {
                    return;
                };
                let Some(source2) = weak_s2.upgrade() else {
                    // Counterpart gone: this derivation can never fire
                    // usefully again, so detach from source1.
                    source1.unsubscribe(source1_id);
                    return;
                };
                let Some(derived) = weak_derived.upgrade() else {
                    source1.unsubscribe(source1_id);
                    return;
                };
                let new_value = f_clone(source1.get(), source2.get());
                {
                    // Write under the lock; notify after releasing it.
                    let mut guard = derived.data.write().expect("Lock poisoned");
                    *guard = new_value;
                }
                derived.bump_version();
            }),
        );
    }
    {
        // Callback fired when source2 changes (mirror of the above).
        let f_clone = Arc::clone(&f);
        let weak_derived = weak_derived.clone();
        let weak_s1 = weak_s1.clone();
        let weak_s2 = weak_s2.clone();
        source2.add_subscriber_with_id(
            source2_id,
            Arc::new(move || {
                let Some(source1) = weak_s1.upgrade() else {
                    if let Some(source2) = weak_s2.upgrade() {
                        source2.unsubscribe(source2_id);
                    }
                    return;
                };
                let Some(source2) = weak_s2.upgrade() else {
                    return;
                };
                let Some(derived) = weak_derived.upgrade() else {
                    source2.unsubscribe(source2_id);
                    return;
                };
                let new_value = f_clone(source1.get(), source2.get());
                {
                    let mut guard = derived.data.write().expect("Lock poisoned");
                    *guard = new_value;
                }
                derived.bump_version();
            }),
        );
    }
    // If source2 dies first, its drop hook removes source1's subscription.
    let weak_source1_for_source2_drop = source1.downgrade();
    let source2_drop_hook_id = source2.on_last_drop(move || {
        if let Some(source1) = weak_source1_for_source2_drop.upgrade() {
            source1.unsubscribe(source1_id);
        }
    });
    // And symmetrically for source1 dying first.
    let weak_source2_for_source1_drop = source2.downgrade();
    let source1_drop_hook_id = source1.on_last_drop(move || {
        if let Some(source2) = weak_source2_for_source1_drop.upgrade() {
            source2.unsubscribe(source2_id);
        }
    });
    // When the derived observable dies, remove both subscriptions and both
    // cross-source drop hooks registered above.
    let weak_source1_for_derived_drop = source1.downgrade();
    let weak_source2_for_derived_drop = source2.downgrade();
    derived.on_last_drop(move || {
        if let Some(source1) = weak_source1_for_derived_drop.upgrade() {
            source1.unsubscribe(source1_id);
            source1.remove_drop_hook(source1_drop_hook_id);
        }
        if let Some(source2) = weak_source2_for_derived_drop.upgrade() {
            source2.unsubscribe(source2_id);
            source2.remove_drop_hook(source2_drop_hook_id);
        }
    });
    derived
}
/// Alias for [`lift`]: maps `source` through `f` into a derived observable.
pub fn map<T, U, F>(source: &Observable<T>, f: F) -> Observable<U>
where
    T: Clone + Send + Sync + 'static,
    U: Send + Sync + 'static,
    F: Fn(T) -> U + Send + Sync + 'static,
{
    lift(source, f)
}
/// Polling-style change detector over a set of observables.
///
/// Keeps two parallel vectors indexed by track order: the version last seen
/// by the consumer, and a shared handle to each observable's live counter.
#[derive(Clone)]
pub struct ReactiveDataHandle {
    last_versions: Arc<RwLock<Vec<u64>>>,
    current_versions: Arc<RwLock<Vec<Arc<AtomicU64>>>>,
}
impl ReactiveDataHandle {
    /// Creates a handle tracking no observables.
    pub fn new() -> Self {
        Self {
            last_versions: Arc::new(RwLock::new(Vec::new())),
            current_versions: Arc::new(RwLock::new(Vec::new())),
        }
    }
    /// Starts tracking `observable`, recording its current version as the
    /// last-seen baseline. Both vectors are locked together so their
    /// indices stay in lockstep.
    pub fn track<T>(&self, observable: &Observable<T>) {
        let mut last = self.last_versions.write().expect("Lock poisoned");
        let mut current = self.current_versions.write().expect("Lock poisoned");
        last.push(observable.version());
        current.push(Arc::clone(&observable.version));
    }
    /// True if any tracked observable has moved past its recorded baseline.
    pub fn has_changes(&self) -> bool {
        let last = self.last_versions.read().expect("Lock poisoned");
        let current = self.current_versions.read().expect("Lock poisoned");
        current
            .iter()
            .zip(last.iter())
            .any(|(version_arc, &last_version)| {
                version_arc.load(Ordering::Acquire) != last_version
            })
    }
    /// Re-baselines every tracked observable at its current version, so
    /// `has_changes` returns false until the next mutation.
    pub fn mark_updated(&self) {
        let mut last = self.last_versions.write().expect("Lock poisoned");
        let current = self.current_versions.read().expect("Lock poisoned");
        for (slot, version_arc) in last.iter_mut().zip(current.iter()) {
            *slot = version_arc.load(Ordering::Acquire);
        }
    }
}
impl Default for ReactiveDataHandle {
    /// Equivalent to `ReactiveDataHandle::new()`.
    fn default() -> Self {
        Self::new()
    }
}
impl<T> IntoObservable<Vec<T>> for Vec<T> {
    /// Wraps the vector in a new observable (no copy; the vec is moved).
    fn into_observable(self) -> Observable<Vec<T>> {
        Observable::new(self)
    }
}
/// Moves an array's elements into an observable `Vec`.
///
/// Since the array is taken by value, `Vec::from` can move the elements
/// directly — the previous `T: Clone` bound and `to_vec()` copy were
/// unnecessary. Dropping the bound is backward compatible.
impl<T, const N: usize> IntoObservable<Vec<T>> for [T; N] {
    fn into_observable(self) -> Observable<Vec<T>> {
        Observable::new(Vec::from(self))
    }
}
/// RAII view over a streaming buffer's raw slots; holds the buffer's read
/// lock while alive, blocking writers.
pub struct StreamingBufferView<'a, T> {
    guard: RwLockReadGuard<'a, Vec<Option<T>>>,
}
impl<T> Deref for StreamingBufferView<'_, T> {
    type Target = [Option<T>];
    /// Exposes the slots in the ring's physical layout order — NOT in
    /// insertion order; use `StreamingBuffer::read` for ordered data.
    fn deref(&self) -> &Self::Target {
        &self.guard
    }
}
/// What a renderer must do to catch up with a streaming buffer.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StreamingRenderState {
    // Nothing new since the last `mark_rendered`.
    Unchanged,
    // Only `visible_appended` new points need drawing; nothing scrolled off.
    AppendOnly { visible_appended: usize },
    // Old points were evicted/overwritten; incremental drawing is invalid.
    FullRedrawRequired,
}
impl StreamingRenderState {
    /// Number of newly visible points (0 unless in the `AppendOnly` state).
    pub fn visible_appended(self) -> usize {
        if let Self::AppendOnly { visible_appended } = self {
            visible_appended
        } else {
            0
        }
    }
    /// True only for `AppendOnly`: the renderer may draw just the new tail.
    pub fn can_incrementally_render(self) -> bool {
        match self {
            Self::AppendOnly { .. } => true,
            Self::Unchanged | Self::FullRedrawRequired => false,
        }
    }
}
/// Fixed-capacity ring buffer with versioning and change notification.
///
/// All fields are `Arc`-shared, so clones are handles onto the same buffer.
pub struct StreamingBuffer<T> {
    // Ring storage; `None` marks a slot never written.
    data: Arc<RwLock<Vec<Option<T>>>>,
    // Fixed slot count (>= 1).
    capacity: usize,
    // Logical write cursor; `% capacity` gives the next slot. Wraps on
    // usize overflow (`wrapping_add`).
    write_pos: Arc<std::sync::atomic::AtomicUsize>,
    // Lifetime count of pushed values (saturating).
    total_written: Arc<AtomicU64>,
    // Change counter, bumped once per push/push_many/clear.
    version: Arc<AtomicU64>,
    // Values pushed since the last `mark_rendered` (saturating).
    appended_since_render: Arc<std::sync::atomic::AtomicUsize>,
    // Change callbacks and their id source.
    subscribers: Arc<RwLock<Vec<Subscriber>>>,
    next_subscriber_id: Arc<AtomicU64>,
}
impl<T: Clone> StreamingBuffer<T> {
    /// Creates a buffer with the given capacity, silently clamping a zero
    /// capacity to 1. Use `try_new` to surface that case as an error.
    pub fn new(capacity: usize) -> Self {
        Self::try_new(capacity).unwrap_or_else(|_| Self::with_capacity(1))
    }
    /// Fallible constructor: rejects `capacity == 0`.
    pub fn try_new(capacity: usize) -> Result<Self> {
        if capacity == 0 {
            return Err(PlottingError::InvalidInput(
                "StreamingBuffer capacity must be at least 1".to_string(),
            ));
        }
        Ok(Self::with_capacity(capacity))
    }
    /// Allocates the ring storage, pre-filled with `None` slots.
    fn with_capacity(capacity: usize) -> Self {
        let mut data = Vec::with_capacity(capacity);
        data.resize_with(capacity, || None);
        Self {
            data: Arc::new(RwLock::new(data)),
            capacity,
            write_pos: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
            total_written: Arc::new(AtomicU64::new(0)),
            version: Arc::new(AtomicU64::new(0)),
            appended_since_render: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
            subscribers: Arc::new(RwLock::new(Vec::new())),
            next_subscriber_id: Arc::new(AtomicU64::new(0)),
        }
    }
    /// Appends one value, overwriting the oldest slot once full, then bumps
    /// the version and notifies subscribers.
    ///
    /// The counters use separate load/store pairs rather than atomic RMW
    /// ops; that is race-free here because every mutation happens while
    /// holding the `data` write lock.
    pub fn push(&self, value: T) {
        {
            let mut data = self.data.write().expect("Lock poisoned");
            let write_pos = self.write_pos.load(Ordering::Relaxed);
            let pos = write_pos % self.capacity;
            data[pos] = Some(value);
            self.write_pos
                .store(write_pos.wrapping_add(1), Ordering::Release);
            let total = self.total_written.load(Ordering::Relaxed);
            self.total_written
                .store(total.saturating_add(1), Ordering::Release);
            let appended = self.appended_since_render.load(Ordering::Relaxed);
            self.appended_since_render
                .store(appended.saturating_add(1), Ordering::Release);
        }
        self.bump_version();
    }
    /// Appends every value, then bumps the version once for the batch.
    pub fn push_many(&self, values: impl IntoIterator<Item = T>) {
        // Collected up front so the count is known before taking the lock.
        let values: Vec<T> = values.into_iter().collect();
        let count = values.len();
        if count == 0 {
            return;
        }
        {
            let mut data = self.data.write().expect("Lock poisoned");
            let mut write_pos = self.write_pos.load(Ordering::Relaxed);
            for value in values {
                let pos = write_pos % self.capacity;
                data[pos] = Some(value);
                write_pos = write_pos.wrapping_add(1);
            }
            self.write_pos.store(write_pos, Ordering::Release);
            let total = self.total_written.load(Ordering::Relaxed);
            self.total_written
                .store(total.saturating_add(count as u64), Ordering::Release);
            let appended = self.appended_since_render.load(Ordering::Relaxed);
            self.appended_since_render
                .store(appended.saturating_add(count), Ordering::Release);
        }
        self.bump_version();
    }
    /// Copies the live contents out in insertion order (oldest first).
    pub fn read(&self) -> Vec<T> {
        let data = self.data.read().expect("Lock poisoned");
        let total = self.total_written.load(Ordering::Acquire);
        let write_pos = self.write_pos.load(Ordering::Acquire);
        if total == 0 {
            return Vec::new();
        }
        let len = std::cmp::min(total as usize, self.capacity);
        let mut result = Vec::with_capacity(len);
        if total <= self.capacity as u64 {
            // Not wrapped yet: slots 0..len are already in order.
            for i in 0..len {
                if let Some(ref value) = data[i] {
                    result.push(value.clone());
                }
            }
        } else {
            // Wrapped: the oldest element sits at the current write slot.
            let start = write_pos % self.capacity;
            for i in 0..self.capacity {
                let idx = (start + i) % self.capacity;
                if let Some(ref value) = data[idx] {
                    result.push(value.clone());
                }
            }
        }
        result
    }
    /// Zero-copy view of the raw slots in physical order; holds the read
    /// lock until dropped.
    pub fn read_view(&self) -> StreamingBufferView<'_, T> {
        StreamingBufferView {
            guard: self.data.read().expect("Lock poisoned"),
        }
    }
    /// Copies out only the values pushed since the last `mark_rendered`
    /// (at most `capacity` of them), oldest first.
    pub fn read_appended(&self) -> Vec<T> {
        let data = self.data.read().expect("Lock poisoned");
        let appended = self.appended_since_render.load(Ordering::Acquire);
        let write_pos = self.write_pos.load(Ordering::Acquire);
        if appended == 0 {
            return Vec::new();
        }
        let count = std::cmp::min(appended, self.capacity);
        let mut result = Vec::with_capacity(count);
        for i in 0..count {
            // Walk backward `count` slots from the write cursor, modulo
            // capacity. NOTE(review): `write_pos + self.capacity` could
            // overflow only after write_pos itself wraps usize — confirm
            // that is acceptable for the intended workloads.
            let idx = (write_pos + self.capacity - count + i) % self.capacity;
            if let Some(ref value) = data[idx] {
                result.push(value.clone());
            }
        }
        result
    }
    /// Values pushed since the last `mark_rendered`.
    pub fn appended_since_mark(&self) -> usize {
        self.appended_since_render.load(Ordering::Acquire)
    }
    /// Resets the appended counter; call after consuming `read_appended`.
    pub fn mark_rendered(&self) {
        self.appended_since_render.store(0, Ordering::Release);
    }
    /// Classifies what a renderer must do to catch up since the last mark.
    pub fn render_state(&self) -> StreamingRenderState {
        let appended = self.appended_since_render.load(Ordering::Acquire);
        if appended == 0 {
            return StreamingRenderState::Unchanged;
        }
        let total_written = self.total_written.load(Ordering::Acquire);
        let visible_after = std::cmp::min(total_written as usize, self.capacity);
        let total_before = total_written.saturating_sub(appended as u64);
        let visible_before = std::cmp::min(total_before as usize, self.capacity);
        if visible_before == 0 {
            // Everything currently visible is new.
            return StreamingRenderState::AppendOnly {
                visible_appended: visible_after,
            };
        }
        if visible_before.saturating_add(appended) <= self.capacity {
            // New points fit without displacing anything already drawn.
            return StreamingRenderState::AppendOnly {
                visible_appended: appended,
            };
        }
        // Old points were overwritten; incremental drawing is invalid.
        StreamingRenderState::FullRedrawRequired
    }
    /// Shorthand for `render_state().can_incrementally_render()`.
    pub fn can_partial_render(&self) -> bool {
        self.render_state().can_incrementally_render()
    }
    /// Change counter; bumped once per push/push_many/clear.
    pub fn version(&self) -> u64 {
        self.version.load(Ordering::Acquire)
    }
    /// Fixed slot count.
    pub fn capacity(&self) -> usize {
        self.capacity
    }
    /// Number of live values (saturates at `capacity`).
    pub fn len(&self) -> usize {
        let total = self.total_written.load(Ordering::Acquire);
        std::cmp::min(total as usize, self.capacity)
    }
    /// True before the first push.
    pub fn is_empty(&self) -> bool {
        self.total_written.load(Ordering::Acquire) == 0
    }
    /// True once every slot has been written at least once.
    pub fn is_full(&self) -> bool {
        self.total_written.load(Ordering::Acquire) >= self.capacity as u64
    }
    /// Lifetime count of pushed values (saturating).
    pub fn total_written(&self) -> u64 {
        self.total_written.load(Ordering::Acquire)
    }
    /// Empties every slot and resets all counters, then notifies.
    pub fn clear(&self) {
        {
            let mut data = self.data.write().expect("Lock poisoned");
            for slot in data.iter_mut() {
                *slot = None;
            }
            self.write_pos.store(0, Ordering::Release);
            self.total_written.store(0, Ordering::Release);
            self.appended_since_render.store(0, Ordering::Release);
        }
        self.bump_version();
    }
    /// Registers a change callback; returns an id for `unsubscribe`.
    pub fn subscribe<F>(&self, callback: F) -> SubscriberId
    where
        F: Fn() + Send + Sync + 'static,
    {
        let id = SubscriberId(self.next_subscriber_id.fetch_add(1, Ordering::Relaxed));
        let subscriber = Subscriber {
            id,
            callback: Arc::new(callback),
        };
        self.subscribers
            .write()
            .expect("Lock poisoned")
            .push(subscriber);
        id
    }
    /// Removes a subscription; returns whether `id` was registered.
    pub fn unsubscribe(&self, id: SubscriberId) -> bool {
        let mut subscribers = self.subscribers.write().expect("Lock poisoned");
        if let Some(pos) = subscribers.iter().position(|s| s.id == id) {
            subscribers.remove(pos);
            true
        } else {
            false
        }
    }
    /// Bumps the version, then invokes subscribers. Callbacks are
    /// snapshotted first so the subscriber lock is not held while they run.
    fn bump_version(&self) {
        self.version.fetch_add(1, Ordering::Release);
        let callbacks = collect_subscriber_callbacks(&self.subscribers, "Lock poisoned");
        for callback in callbacks {
            callback();
        }
    }
}
// Handle clone: all `Arc` fields are shared, so clones view and mutate the
// SAME buffer — this is not a deep copy.
impl<T: Clone> Clone for StreamingBuffer<T> {
    fn clone(&self) -> Self {
        Self {
            data: Arc::clone(&self.data),
            capacity: self.capacity,
            write_pos: Arc::clone(&self.write_pos),
            total_written: Arc::clone(&self.total_written),
            version: Arc::clone(&self.version),
            appended_since_render: Arc::clone(&self.appended_since_render),
            subscribers: Arc::clone(&self.subscribers),
            next_subscriber_id: Arc::clone(&self.next_subscriber_id),
        }
    }
}
/// A pair of equal-capacity streaming buffers for (x, y) point data, plus
/// its own subscriber list that fires once per paired push.
pub struct StreamingXY {
    x: StreamingBuffer<f64>,
    y: StreamingBuffer<f64>,
    // Paired-change subscribers, independent of x's and y's own lists.
    subscribers: Arc<RwLock<Vec<Subscriber>>>,
    next_subscriber_id: Arc<AtomicU64>,
}
impl StreamingXY {
    /// Creates two empty buffers of the same capacity.
    pub fn new(capacity: usize) -> Self {
        Self {
            x: StreamingBuffer::new(capacity),
            y: StreamingBuffer::new(capacity),
            subscribers: Arc::new(RwLock::new(Vec::new())),
            next_subscriber_id: Arc::new(AtomicU64::new(0)),
        }
    }
    /// Pushes one point and fires the paired subscribers once.
    ///
    /// NOTE(review): x and y are pushed under separate locks, so a
    /// concurrent reader can briefly observe x updated before y — confirm
    /// consumers tolerate that.
    pub fn push(&self, x: f64, y: f64) {
        self.x.push(x);
        self.y.push(y);
        self.notify_subscribers();
    }
    /// Pushes many points; paired subscribers fire once, and only if at
    /// least one point was actually pushed.
    pub fn push_many(&self, points: impl IntoIterator<Item = (f64, f64)>) {
        let mut pushed_any = false;
        for (x, y) in points {
            self.x.push(x);
            self.y.push(y);
            pushed_any = true;
        }
        if pushed_any {
            self.notify_subscribers();
        }
    }
    /// Borrow of the x buffer.
    pub fn x(&self) -> &StreamingBuffer<f64> {
        &self.x
    }
    /// Borrow of the y buffer.
    pub fn y(&self) -> &StreamingBuffer<f64> {
        &self.y
    }
    /// Copies out the x values in insertion order.
    pub fn read_x(&self) -> Vec<f64> {
        self.x.read()
    }
    /// Copies out the y values in insertion order.
    pub fn read_y(&self) -> Vec<f64> {
        self.y.read()
    }
    /// Zero-copy view of x's raw slots (physical ring order).
    pub fn read_view_x(&self) -> StreamingBufferView<'_, f64> {
        self.x.read_view()
    }
    /// Zero-copy view of y's raw slots (physical ring order).
    pub fn read_view_y(&self) -> StreamingBufferView<'_, f64> {
        self.y.read_view()
    }
    /// Both views at once; holds both read locks until dropped.
    pub fn read_view(&self) -> (StreamingBufferView<'_, f64>, StreamingBufferView<'_, f64>) {
        (self.x.read_view(), self.y.read_view())
    }
    /// X values pushed since the last `mark_rendered`.
    pub fn read_appended_x(&self) -> Vec<f64> {
        self.x.read_appended()
    }
    /// Y values pushed since the last `mark_rendered`.
    pub fn read_appended_y(&self) -> Vec<f64> {
        self.y.read_appended()
    }
    /// Points pushed since the last mark (taken from x; y tracks in
    /// lockstep since every push writes both).
    pub fn appended_count(&self) -> usize {
        self.x.appended_since_mark()
    }
    /// Resets both buffers' appended counters.
    pub fn mark_rendered(&self) {
        self.x.mark_rendered();
        self.y.mark_rendered();
    }
    /// Shorthand for `render_state().can_incrementally_render()`.
    pub fn can_partial_render(&self) -> bool {
        self.render_state().can_incrementally_render()
    }
    /// Combined render state: incremental only if BOTH buffers allow it,
    /// drawing the smaller of the two appended counts.
    pub fn render_state(&self) -> StreamingRenderState {
        match (self.x.render_state(), self.y.render_state()) {
            (StreamingRenderState::Unchanged, StreamingRenderState::Unchanged) => {
                StreamingRenderState::Unchanged
            }
            (
                StreamingRenderState::AppendOnly {
                    visible_appended: x,
                },
                StreamingRenderState::AppendOnly {
                    visible_appended: y,
                },
            ) => StreamingRenderState::AppendOnly {
                visible_appended: x.min(y),
            },
            _ => StreamingRenderState::FullRedrawRequired,
        }
    }
    /// Combined change counter (max of the two buffers' versions).
    pub fn version(&self) -> u64 {
        std::cmp::max(self.x.version(), self.y.version())
    }
    /// Number of live points (taken from x).
    pub fn len(&self) -> usize {
        self.x.len()
    }
    /// True before the first push.
    pub fn is_empty(&self) -> bool {
        self.x.is_empty()
    }
    /// Clears both buffers and fires the paired subscribers once.
    pub fn clear(&self) {
        self.x.clear();
        self.y.clear();
        self.notify_subscribers();
    }
    /// Registers a callback that fires once per paired mutation
    /// (push/push_many/clear), not once per underlying buffer change.
    pub(crate) fn subscribe_paired<F>(&self, callback: F) -> SubscriberId
    where
        F: Fn() + Send + Sync + 'static,
    {
        let id = SubscriberId(self.next_subscriber_id.fetch_add(1, Ordering::Relaxed));
        let subscriber = Subscriber {
            id,
            callback: Arc::new(callback),
        };
        self.subscribers
            .write()
            .expect("Lock poisoned")
            .push(subscriber);
        id
    }
    /// Removes a paired subscription; returns whether `id` was registered.
    pub(crate) fn unsubscribe_paired(&self, id: SubscriberId) -> bool {
        let mut subscribers = self.subscribers.write().expect("Lock poisoned");
        if let Some(pos) = subscribers
            .iter()
            .position(|subscriber| subscriber.id == id)
        {
            subscribers.remove(pos);
            true
        } else {
            false
        }
    }
    /// Invokes paired subscribers; callbacks are snapshotted first so the
    /// lock is not held while they run.
    fn notify_subscribers(&self) {
        let callbacks = collect_subscriber_callbacks(&self.subscribers, "Lock poisoned");
        for callback in callbacks {
            callback();
        }
    }
}
// Handle clone: shares both buffers and the paired subscriber list.
impl Clone for StreamingXY {
    fn clone(&self) -> Self {
        Self {
            x: self.x.clone(),
            y: self.y.clone(),
            subscribers: Arc::clone(&self.subscribers),
            next_subscriber_id: Arc::clone(&self.next_subscriber_id),
        }
    }
}
impl std::fmt::Debug for StreamingXY {
    /// Compact debug view (length + version only; the point data itself
    /// is omitted).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("StreamingXY")
            .field("len", &self.len())
            .field("version", &self.version())
            .finish()
    }
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use std::thread;
#[test]
fn test_observable_basic() {
    // A fresh observable starts at version 0 with its initial value intact.
    let obs = Observable::new(vec![1.0, 2.0, 3.0]);
    assert_eq!(obs.version(), 0);
    assert_eq!(obs.read().len(), 3);
}
#[test]
fn test_observable_update() {
    // `update` mutates in place and bumps the version.
    let obs = Observable::new(vec![1.0, 2.0, 3.0]);
    let v1 = obs.version();
    obs.update(|data| data.push(4.0));
    assert!(obs.version() > v1);
    assert_eq!(obs.read().len(), 4);
}
#[test]
fn test_observable_set() {
    // `set` replaces the value wholesale and bumps the version.
    let obs = Observable::new(42);
    let v1 = obs.version();
    obs.set(100);
    assert!(obs.version() > v1);
    assert_eq!(*obs.read(), 100);
}
#[test]
fn test_observable_subscribe() {
    // Callbacks fire once per mutation and stop firing after unsubscribe.
    let obs = Observable::new(42);
    let counter = Arc::new(AtomicUsize::new(0));
    let counter_clone = Arc::clone(&counter);
    let id = obs.subscribe(move || {
        counter_clone.fetch_add(1, AtomicOrdering::Relaxed);
    });
    obs.set(100);
    assert_eq!(counter.load(AtomicOrdering::Relaxed), 1);
    obs.update(|v| *v += 1);
    assert_eq!(counter.load(AtomicOrdering::Relaxed), 2);
    obs.unsubscribe(id);
    obs.set(200);
    assert_eq!(counter.load(AtomicOrdering::Relaxed), 2);
}
#[test]
fn test_observable_multiple_subscribers() {
    // Every registered subscriber is notified for a single mutation.
    let obs = Observable::new(0);
    let counter1 = Arc::new(AtomicUsize::new(0));
    let counter2 = Arc::new(AtomicUsize::new(0));
    let c1 = Arc::clone(&counter1);
    let c2 = Arc::clone(&counter2);
    obs.subscribe(move || {
        c1.fetch_add(1, AtomicOrdering::Relaxed);
    });
    obs.subscribe(move || {
        c2.fetch_add(1, AtomicOrdering::Relaxed);
    });
    obs.set(42);
    assert_eq!(counter1.load(AtomicOrdering::Relaxed), 1);
    assert_eq!(counter2.load(AtomicOrdering::Relaxed), 1);
}
#[test]
fn test_observable_thread_safe() {
    // Concurrent updates from two threads are serialized by the write lock:
    // no increments are lost.
    let obs = Observable::new(0i32);
    let obs_clone = obs.clone();
    let handle = thread::spawn(move || {
        for _ in 0..1000 {
            obs_clone.update(|v| *v += 1);
        }
    });
    for _ in 0..1000 {
        obs.update(|v| *v += 1);
    }
    handle.join().unwrap();
    assert_eq!(*obs.read(), 2000);
}
#[test]
fn test_observable_get_clone() {
    // `get` returns an independent clone, unaffected by later mutations.
    let obs = Observable::new(vec![1, 2, 3]);
    let cloned = obs.get();
    assert_eq!(cloned, vec![1, 2, 3]);
    obs.update(|v| v.push(4));
    assert_eq!(cloned, vec![1, 2, 3]);
    assert_eq!(obs.get(), vec![1, 2, 3, 4]);
}
#[test]
fn test_weak_observable() {
    // A weak handle upgrades while the observable lives and dies with it.
    let obs = Observable::new(42);
    let weak = obs.downgrade();
    assert!(weak.is_alive());
    assert!(weak.upgrade().is_some());
    drop(obs);
    assert!(!weak.is_alive());
}
#[test]
fn test_weak_observable_upgrade_preserves_subscribers() {
    // Upgrading a weak handle yields the SAME shared state: existing
    // subscribers remain registered and ids keep incrementing.
    let obs = Observable::new(1);
    let notifications = Arc::new(AtomicUsize::new(0));
    // Fix: the reference had been mojibake-corrupted to `¬ifications`
    // (HTML-entity decoding of `&not`); restored to `&notifications`.
    let notifications_clone = Arc::clone(&notifications);
    let first_id = obs.subscribe(move || {
        notifications_clone.fetch_add(1, AtomicOrdering::Relaxed);
    });
    let weak = obs.downgrade();
    let upgraded = weak.upgrade().expect("upgrade should preserve state");
    assert_eq!(upgraded.subscriber_count(), 1);
    let second_id = upgraded.subscribe(|| {});
    assert_ne!(first_id, second_id);
    upgraded.set(2);
    assert_eq!(notifications.load(AtomicOrdering::Relaxed), 1);
}
#[test]
fn test_observable_unsubscribe_within_callback_does_not_deadlock() {
    // A callback may unsubscribe itself: callbacks are invoked AFTER the
    // subscriber lock is released, so re-entry must not deadlock.
    let obs = Observable::new(0);
    let callback_count = Arc::new(AtomicUsize::new(0));
    let callback_count_clone = Arc::clone(&callback_count);
    let callback_id = Arc::new(Mutex::new(None));
    let callback_id_clone = Arc::clone(&callback_id);
    let obs_clone = obs.clone();
    let id = obs.subscribe(move || {
        callback_count_clone.fetch_add(1, AtomicOrdering::Relaxed);
        if let Some(id) = *callback_id_clone.lock().expect("Lock poisoned") {
            obs_clone.unsubscribe(id);
        }
    });
    *callback_id.lock().expect("Lock poisoned") = Some(id);
    obs.set(1);
    obs.set(2);
    // The callback ran once (for the first set) and then removed itself.
    assert_eq!(callback_count.load(AtomicOrdering::Relaxed), 1);
}
#[test]
fn test_sliding_window() {
    // Once full, each push evicts the oldest element (FIFO window).
    let window = SlidingWindowObservable::new(3);
    window.push(1.0);
    window.push(2.0);
    window.push(3.0);
    assert_eq!(*window.read(), vec![1.0, 2.0, 3.0]);
    window.push(4.0);
    assert_eq!(*window.read(), vec![2.0, 3.0, 4.0]);
    window.push(5.0);
    assert_eq!(*window.read(), vec![3.0, 4.0, 5.0]);
}
#[test]
fn test_sliding_window_push_many() {
    // A batch larger than the window keeps only the newest `max_size` values.
    let window = SlidingWindowObservable::new(3);
    window.push_many(vec![1.0, 2.0, 3.0, 4.0, 5.0]);
    assert_eq!(*window.read(), vec![3.0, 4.0, 5.0]);
}
#[test]
fn test_batch_update() {
    // Baseline behavior: direct set() on each observable notifies its own
    // subscriber exactly once.
    let x = Observable::new(0);
    let y = Observable::new(0);
    let counter = Arc::new(AtomicUsize::new(0));
    let c1 = Arc::clone(&counter);
    let c2 = Arc::clone(&counter);
    x.subscribe(move || {
        c1.fetch_add(1, AtomicOrdering::Relaxed);
    });
    y.subscribe(move || {
        c2.fetch_add(1, AtomicOrdering::Relaxed);
    });
    x.set(1);
    y.set(1);
    assert_eq!(counter.load(AtomicOrdering::Relaxed), 2);
}
#[test]
fn test_update_with() {
    // `update_with` forwards the closure's return value to the caller.
    let obs = Observable::new(vec![1, 2, 3]);
    let old_len = obs.update_with(|v| {
        let len = v.len();
        v.push(4);
        len
    });
    assert_eq!(old_len, 3);
    assert_eq!(obs.read().len(), 4);
}
#[test]
fn test_subscriber_count() {
    // subscriber_count tracks subscribe/unsubscribe exactly.
    let obs = Observable::new(42);
    assert_eq!(obs.subscriber_count(), 0);
    let id1 = obs.subscribe(|| {});
    assert_eq!(obs.subscriber_count(), 1);
    let id2 = obs.subscribe(|| {});
    assert_eq!(obs.subscriber_count(), 2);
    obs.unsubscribe(id1);
    assert_eq!(obs.subscriber_count(), 1);
    obs.unsubscribe(id2);
    assert_eq!(obs.subscriber_count(), 0);
}
#[test]
fn test_into_observable() {
    // Both Vec and fixed-size arrays convert into Observable<Vec<_>>.
    let v = vec![1.0, 2.0, 3.0];
    let obs = v.into_observable();
    assert_eq!(obs.read().len(), 3);
    let arr = [1.0, 2.0, 3.0];
    let obs = arr.into_observable();
    assert_eq!(obs.read().len(), 3);
}
#[test]
fn test_lift_basic() {
    // The derived observable is seeded immediately and recomputed on change.
    let x = Observable::new(3.0);
    let squared = lift(&x, |v| v * v);
    assert_eq!(*squared.read(), 9.0);
    assert_eq!(squared.version(), 0);
    x.set(4.0);
    assert_eq!(*squared.read(), 16.0);
    assert!(squared.version() > 0);
}
#[test]
fn test_lift_with_vec() {
    // Lifting an aggregate (sum) tracks updates to the source vector.
    let data = Observable::new(vec![1.0, 2.0, 3.0]);
    let sum = lift(&data, |v| v.iter().sum::<f64>());
    assert_eq!(*sum.read(), 6.0);
    data.update(|v| v.push(4.0));
    assert_eq!(*sum.read(), 10.0);
}
#[test]
fn test_lift2() {
    // The combined observable recomputes when EITHER source changes.
    let a = Observable::new(10.0);
    let b = Observable::new(5.0);
    let sum = lift2(&a, &b, |x, y| x + y);
    assert_eq!(*sum.read(), 15.0);
    a.set(20.0);
    assert_eq!(*sum.read(), 25.0);
    b.set(10.0);
    assert_eq!(*sum.read(), 30.0);
}
#[test]
fn test_map_alias() {
    // `map` is a straight alias for `lift`.
    let x = Observable::new(5.0);
    let doubled = map(&x, |v| v * 2.0);
    assert_eq!(*doubled.read(), 10.0);
    x.set(7.0);
    assert_eq!(*doubled.read(), 14.0);
}
#[test]
fn test_chained_lift() {
    // Derived observables can themselves be lifted; updates propagate
    // through the whole chain.
    let x = Observable::new(2.0);
    let doubled = lift(&x, |v| v * 2.0);
    let quadrupled = lift(&doubled, |v| v * 2.0);
    assert_eq!(*doubled.read(), 4.0);
    assert_eq!(*quadrupled.read(), 8.0);
    x.set(3.0);
    assert_eq!(*doubled.read(), 6.0);
    assert_eq!(*quadrupled.read(), 12.0);
}
#[test]
fn test_lift_releases_source_subscription_on_drop() {
    // Dropping the derived observable eagerly unsubscribes from the source
    // (via the drop hook registered in `lift`).
    let source = Observable::new(2.0);
    let derived = lift(&source, |v| v * 2.0);
    assert_eq!(source.subscriber_count(), 1);
    drop(derived);
    assert_eq!(source.subscriber_count(), 0);
}
#[test]
fn test_lift2_releases_remaining_source_subscription_when_other_source_drops() {
    // When source1 dies, its drop hook removes... actually source2's
    // callback detaches lazily on the next fire; either way the surviving
    // source must end with zero subscriptions.
    let source2 = Observable::new(5.0);
    let derived = {
        let source1 = Observable::new(10.0);
        let derived = lift2(&source1, &source2, |x, y| x + y);
        assert_eq!(source2.subscriber_count(), 1);
        derived
    };
    // Firing source2 after source1 is gone triggers the lazy detach.
    source2.set(6.0);
    assert_eq!(source2.subscriber_count(), 0);
    // The derived value stays at its last computed state.
    assert_eq!(*derived.read(), 15.0);
}
// Dropping source2 eagerly releases the derived observable's subscription
// on source1 (count is 0 immediately after the inner scope, with no further
// notification needed); the derived value stays at its last result.
#[test]
fn test_lift2_releases_source1_subscription_when_source2_drops_first() {
let source1 = Observable::new(10.0);
let derived = {
let source2 = Observable::new(5.0);
let derived = lift2(&source1, &source2, |x, y| x + y);
assert_eq!(source1.subscriber_count(), 1);
derived
};
assert_eq!(source1.subscriber_count(), 0);
assert_eq!(*derived.read(), 15.0);
}
// Repeatedly creating and dropping lift2 derivations must not leak
// lifecycle drop hooks on either source: hook counts return to 0 after
// every drop across several iterations.
#[test]
fn test_lift2_does_not_accumulate_source_drop_hooks_after_drop() {
let source1 = Observable::new(1.0);
let source2 = Observable::new(2.0);
for _ in 0..8 {
let derived = lift2(&source1, &source2, |x, y| x + y);
assert!(source1.lifecycle_hook_count() >= 1);
assert!(source2.lifecycle_hook_count() >= 1);
drop(derived);
assert_eq!(source1.lifecycle_hook_count(), 0);
assert_eq!(source2.lifecycle_hook_count(), 0);
}
}
// ReactiveDataHandle watches several observables at once: mutating any
// tracked source raises the dirty flag, and `mark_updated` clears it until
// the next change.
#[test]
fn test_reactive_data_handle() {
let x = Observable::new(vec![1.0, 2.0]);
let y = Observable::new(vec![3.0, 4.0]);
let handle = ReactiveDataHandle::new();
handle.track(&x);
handle.track(&y);
assert!(!handle.has_changes());
x.update(|v| v.push(5.0));
assert!(handle.has_changes());
handle.mark_updated();
assert!(!handle.has_changes());
y.set(vec![10.0]);
assert!(handle.has_changes());
}
// Basic StreamingBuffer behavior below capacity: emptiness flags, length,
// capacity, and in-order reads.
#[test]
fn test_streaming_buffer_basic() {
let buffer = StreamingBuffer::<f64>::new(5);
assert!(buffer.is_empty());
assert_eq!(buffer.len(), 0);
assert_eq!(buffer.capacity(), 5);
buffer.push(1.0);
buffer.push(2.0);
buffer.push(3.0);
assert!(!buffer.is_empty());
assert_eq!(buffer.len(), 3);
assert_eq!(buffer.read(), vec![1.0, 2.0, 3.0]);
}
// Ring-buffer wraparound: once full, each push evicts the oldest element
// and `read` always yields the newest `capacity` items in order.
#[test]
fn test_streaming_buffer_wrap_around() {
let buffer = StreamingBuffer::<i32>::new(3);
buffer.push(1);
buffer.push(2);
buffer.push(3);
assert_eq!(buffer.read(), vec![1, 2, 3]);
assert!(buffer.is_full());
buffer.push(4);
assert_eq!(buffer.read(), vec![2, 3, 4]);
buffer.push(5);
assert_eq!(buffer.read(), vec![3, 4, 5]);
buffer.push(6);
assert_eq!(buffer.read(), vec![4, 5, 6]);
}
// `push_many` appends a batch; when the batch overflows capacity, only the
// newest `capacity` elements remain.
#[test]
fn test_streaming_buffer_push_many() {
let buffer = StreamingBuffer::<f64>::new(5);
buffer.push_many(vec![1.0, 2.0, 3.0]);
assert_eq!(buffer.read(), vec![1.0, 2.0, 3.0]);
buffer.push_many(vec![4.0, 5.0, 6.0, 7.0]);
assert_eq!(buffer.read(), vec![3.0, 4.0, 5.0, 6.0, 7.0]);
}
// Appended-since-mark bookkeeping: `appended_since_mark`/`read_appended`
// report only data pushed after the last `mark_rendered`.
#[test]
fn test_streaming_buffer_appended_tracking() {
let buffer = StreamingBuffer::<f64>::new(10);
buffer.push(1.0);
buffer.push(2.0);
buffer.push(3.0);
assert_eq!(buffer.appended_since_mark(), 3);
assert_eq!(buffer.read_appended(), vec![1.0, 2.0, 3.0]);
buffer.mark_rendered();
assert_eq!(buffer.appended_since_mark(), 0);
assert!(buffer.read_appended().is_empty());
buffer.push(4.0);
buffer.push(5.0);
assert_eq!(buffer.appended_since_mark(), 2);
assert_eq!(buffer.read_appended(), vec![4.0, 5.0]);
}
// Render-state transitions: while new data only appends, the state is
// AppendOnly (partial render allowed); once appended data evicts points
// that were already rendered, a full redraw is required.
#[test]
fn test_streaming_buffer_partial_render() {
let buffer = StreamingBuffer::<f64>::new(5);
buffer.push_many(vec![1.0, 2.0, 3.0]);
assert!(buffer.can_partial_render());
assert_eq!(
buffer.render_state(),
StreamingRenderState::AppendOnly {
visible_appended: 3
}
);
buffer.mark_rendered();
buffer.push_many(vec![4.0, 5.0]);
assert!(buffer.can_partial_render());
assert_eq!(
buffer.render_state(),
StreamingRenderState::AppendOnly {
visible_appended: 2
}
);
// This batch wraps past capacity and overwrites rendered points.
buffer.push_many(vec![6.0, 7.0, 8.0, 9.0, 10.0]);
assert!(!buffer.can_partial_render());
assert_eq!(
buffer.render_state(),
StreamingRenderState::FullRedrawRequired
);
}
/// Every mutation — single `push` and batched `push_many` alike — must
/// advance the buffer's version counter monotonically.
#[test]
fn test_streaming_buffer_version_tracking() {
    let buf = StreamingBuffer::<f64>::new(10);
    let initial = buf.version();
    buf.push(1.0);
    let after_push = buf.version();
    assert!(after_push > initial);
    buf.push_many(vec![2.0, 3.0]);
    assert!(buf.version() > after_push);
}
/// `clear` empties the buffer completely: zero length, empty-flag set, and
/// an empty snapshot from `read`.
#[test]
fn test_streaming_buffer_clear() {
    let buf = StreamingBuffer::<f64>::new(5);
    buf.push_many(vec![1.0, 2.0, 3.0]);
    assert_eq!(buf.len(), 3);
    buf.clear();
    assert_eq!(buf.len(), 0);
    assert!(buf.is_empty());
    assert!(buf.read().is_empty());
}
// Subscribers fire once per push; after `unsubscribe` the callback is
// never invoked again.
#[test]
fn test_streaming_buffer_subscribers() {
let buffer = StreamingBuffer::<f64>::new(10);
let count = Arc::new(AtomicUsize::new(0));
let count_clone = Arc::clone(&count);
let id = buffer.subscribe(move || {
count_clone.fetch_add(1, AtomicOrdering::Relaxed);
});
buffer.push(1.0);
buffer.push(2.0);
assert_eq!(count.load(AtomicOrdering::Relaxed), 2);
buffer.unsubscribe(id);
buffer.push(3.0);
// Count stays at 2: the removed subscriber saw nothing for push(3.0).
assert_eq!(count.load(AtomicOrdering::Relaxed), 2);
}
// Concurrent pushes from a spawned thread and the main thread must not
// lose writes: all 1000 values are accounted for after joining.
#[test]
fn test_streaming_buffer_thread_safety() {
let buffer = StreamingBuffer::<i32>::new(1000);
let buffer_clone = buffer.clone();
let handle = thread::spawn(move || {
for i in 0..500 {
buffer_clone.push(i);
}
});
for i in 500..1000 {
buffer.push(i);
}
handle.join().unwrap();
assert_eq!(buffer.total_written(), 1000);
assert_eq!(buffer.len(), 1000);
}
// Basic paired XY streaming: pushes store x and y in lockstep and reads
// return each axis in insertion order.
#[test]
fn test_streaming_xy_basic() {
let xy = StreamingXY::new(100);
assert!(xy.is_empty());
xy.push(1.0, 10.0);
xy.push(2.0, 20.0);
xy.push(3.0, 30.0);
assert_eq!(xy.len(), 3);
assert_eq!(xy.read_x(), vec![1.0, 2.0, 3.0]);
assert_eq!(xy.read_y(), vec![10.0, 20.0, 30.0]);
}
/// `push_many` on the XY stream splits each (x, y) pair onto its axis,
/// preserving insertion order on both.
#[test]
fn test_streaming_xy_push_many() {
    let stream = StreamingXY::new(100);
    let points = vec![(1.0, 10.0), (2.0, 20.0), (3.0, 30.0)];
    stream.push_many(points);
    assert_eq!(stream.read_x(), vec![1.0, 2.0, 3.0]);
    assert_eq!(stream.read_y(), vec![10.0, 20.0, 30.0]);
}
// Paired subscribers fire once per mutation batch — one hit for `push`,
// one for a whole `push_many` — and stop after `unsubscribe_paired`.
#[test]
fn test_streaming_xy_paired_subscribers_fire_once_per_batch() {
let xy = StreamingXY::new(100);
let hits = Arc::new(AtomicUsize::new(0));
let hits_for_callback = Arc::clone(&hits);
let id = xy.subscribe_paired(move || {
hits_for_callback.fetch_add(1, AtomicOrdering::Relaxed);
});
xy.push(1.0, 10.0);
xy.push_many(vec![(2.0, 20.0), (3.0, 30.0)]);
assert_eq!(hits.load(AtomicOrdering::Relaxed), 2);
xy.unsubscribe_paired(id);
xy.push(4.0, 40.0);
assert_eq!(hits.load(AtomicOrdering::Relaxed), 2);
}
// Appended tracking on the XY stream mirrors StreamingBuffer: after
// `mark_rendered`, only later pushes count as appended, and the render
// state reports them as AppendOnly.
#[test]
fn test_streaming_xy_appended() {
let xy = StreamingXY::new(100);
xy.push_many(vec![(1.0, 10.0), (2.0, 20.0)]);
xy.mark_rendered();
xy.push_many(vec![(3.0, 30.0), (4.0, 40.0)]);
assert_eq!(xy.appended_count(), 2);
assert_eq!(xy.read_appended_x(), vec![3.0, 4.0]);
assert_eq!(xy.read_appended_y(), vec![30.0, 40.0]);
assert_eq!(
xy.render_state(),
StreamingRenderState::AppendOnly {
visible_appended: 2
}
);
}
/// Clearing the XY stream removes every stored point.
#[test]
fn test_streaming_xy_clear() {
    let stream = StreamingXY::new(100);
    stream.push_many(vec![(1.0, 10.0), (2.0, 20.0)]);
    assert_eq!(stream.len(), 2);
    stream.clear();
    assert!(stream.is_empty());
}
// A freshly constructed buffer reads as empty everywhere: full read,
// appended read, appended count, and the emptiness/fullness flags.
#[test]
fn test_streaming_buffer_empty_read() {
let buffer = StreamingBuffer::<f64>::new(10);
assert!(buffer.read().is_empty());
assert!(buffer.read_appended().is_empty());
assert_eq!(buffer.appended_since_mark(), 0);
assert!(buffer.is_empty());
assert!(!buffer.is_full());
}
// Degenerate capacity-1 buffer: each push replaces the single slot and
// length never exceeds 1.
#[test]
fn test_streaming_buffer_capacity_one() {
let buffer = StreamingBuffer::<i32>::new(1);
buffer.push(1);
assert_eq!(buffer.read(), vec![1]);
assert!(buffer.is_full());
buffer.push(2);
assert_eq!(buffer.read(), vec![2]);
assert_eq!(buffer.len(), 1);
buffer.push(3);
assert_eq!(buffer.read(), vec![3]);
}
/// The fallible constructor must reject a zero capacity with an error
/// instead of panicking or silently normalizing.
#[test]
fn test_streaming_buffer_try_new_rejects_zero_capacity() {
    let result = StreamingBuffer::<i32>::try_new(0);
    assert!(result.is_err());
}
// The infallible constructor normalizes a zero capacity up to 1 instead of
// panicking; the resulting buffer then behaves like a capacity-1 ring.
#[test]
fn test_streaming_buffer_new_zero_capacity_is_normalized() {
let buffer = StreamingBuffer::<i32>::new(0);
assert_eq!(buffer.capacity(), 1);
buffer.push(7);
buffer.push(8);
assert_eq!(buffer.read(), vec![8]);
}
// When more data is appended than fits: `appended_since_mark` counts every
// element pushed (5), while `read_appended` is capped at what is still
// visible (the newest 3), and the render state stays AppendOnly because
// nothing previously rendered was overwritten.
#[test]
fn test_streaming_buffer_appended_exceeds_capacity() {
let buffer = StreamingBuffer::<f64>::new(3);
buffer.push_many(vec![1.0, 2.0, 3.0, 4.0, 5.0]);
assert_eq!(buffer.appended_since_mark(), 5);
let appended = buffer.read_appended();
assert_eq!(appended.len(), 3);
assert_eq!(appended, vec![3.0, 4.0, 5.0]);
assert!(buffer.can_partial_render());
assert_eq!(
buffer.render_state(),
StreamingRenderState::AppendOnly {
visible_appended: 3
}
);
}
// Once appended data wraps the ring and evicts points that were already
// rendered (1.0 here), a partial render is no longer valid and the state
// must be FullRedrawRequired.
#[test]
fn test_streaming_buffer_render_state_requires_full_redraw_after_wrap() {
let buffer = StreamingBuffer::<f64>::new(5);
buffer.push_many(vec![1.0, 2.0, 3.0, 4.0]);
buffer.mark_rendered();
buffer.push_many(vec![5.0, 6.0]);
assert_eq!(buffer.read(), vec![2.0, 3.0, 4.0, 5.0, 6.0]);
assert_eq!(
buffer.render_state(),
StreamingRenderState::FullRedrawRequired
);
assert!(!buffer.can_partial_render());
}
// If the last render happened while the buffer was empty, wrapping cannot
// evict any rendered point, so even an overflowing batch keeps the state
// AppendOnly (with only the visible tail counted).
#[test]
fn test_streaming_buffer_render_state_stays_append_only_from_empty_cache() {
let buffer = StreamingBuffer::<f64>::new(3);
buffer.mark_rendered();
buffer.push_many(vec![1.0, 2.0, 3.0, 4.0, 5.0]);
assert_eq!(buffer.read(), vec![3.0, 4.0, 5.0]);
assert_eq!(
buffer.render_state(),
StreamingRenderState::AppendOnly {
visible_appended: 3
}
);
}
// Cloning a StreamingBuffer yields a handle onto the same shared state:
// pushes through either clone are visible through the other.
#[test]
fn test_streaming_buffer_clone_shares_state() {
let buffer1 = StreamingBuffer::<f64>::new(10);
let buffer2 = buffer1.clone();
buffer1.push(1.0);
buffer1.push(2.0);
assert_eq!(buffer2.read(), vec![1.0, 2.0]);
assert_eq!(buffer2.len(), 2);
buffer2.push(3.0);
assert_eq!(buffer1.read(), vec![1.0, 2.0, 3.0]);
}
/// After `clear`, pushes behave as on a fresh buffer and the total-written
/// counter restarts from zero rather than continuing from the old total.
#[test]
fn test_streaming_buffer_push_after_clear() {
    let buf = StreamingBuffer::<f64>::new(5);
    buf.push_many(vec![1.0, 2.0, 3.0]);
    buf.clear();
    buf.push(10.0);
    buf.push(20.0);
    assert_eq!(buf.read(), vec![10.0, 20.0]);
    assert_eq!(buf.len(), 2);
    assert_eq!(buf.total_written(), 2);
}
// The ring buffer stays consistent across several complete wrap cycles:
// each full batch replaces the window, and a subsequent single push still
// evicts exactly the oldest element.
#[test]
fn test_streaming_buffer_multiple_wrap_cycles() {
let buffer = StreamingBuffer::<i32>::new(3);
buffer.push_many(vec![1, 2, 3]);
assert_eq!(buffer.read(), vec![1, 2, 3]);
buffer.push_many(vec![4, 5, 6]);
assert_eq!(buffer.read(), vec![4, 5, 6]);
buffer.push_many(vec![7, 8, 9]);
assert_eq!(buffer.read(), vec![7, 8, 9]);
buffer.push(10);
assert_eq!(buffer.read(), vec![8, 9, 10]);
}
// `total_written` counts every element ever pushed, even those already
// evicted by wraparound, while `len` is capped at capacity.
#[test]
fn test_streaming_buffer_total_written_tracking() {
let buffer = StreamingBuffer::<f64>::new(3);
buffer.push_many(vec![1.0, 2.0, 3.0, 4.0, 5.0]);
assert_eq!(buffer.total_written(), 5);
assert_eq!(buffer.len(), 3);
buffer.push_many(vec![6.0, 7.0]);
assert_eq!(buffer.total_written(), 7);
}
/// `mark_rendered` resets only the appended-since-mark counter; the version
/// counter and the stored contents must be left untouched.
#[test]
fn test_streaming_buffer_mark_rendered_resets_only_appended() {
    let buf = StreamingBuffer::<f64>::new(10);
    buf.push_many(vec![1.0, 2.0, 3.0]);
    let version_before = buf.version();
    buf.mark_rendered();
    assert_eq!(buf.appended_since_mark(), 0);
    assert_eq!(buf.version(), version_before);
    assert_eq!(buf.len(), 3);
}
// The XY stream's version counter advances on every mutation, for both
// `push` and batched `push_many`.
#[test]
fn test_streaming_xy_version_tracking() {
let xy = StreamingXY::new(100);
let v0 = xy.version();
xy.push(1.0, 10.0);
let v1 = xy.version();
assert!(v1 > v0);
xy.push_many(vec![(2.0, 20.0), (3.0, 30.0)]);
let v2 = xy.version();
assert!(v2 > v1);
}
/// Cloning a `StreamingXY` yields a handle onto the same shared state:
/// points pushed through one clone are visible through the other.
#[test]
fn test_streaming_xy_clone_shares_state() {
    let original = StreamingXY::new(100);
    let shared = original.clone();
    original.push(1.0, 10.0);
    assert_eq!(shared.len(), 1);
    assert_eq!(shared.read_x(), vec![1.0]);
}
/// Concurrent `push` from two threads must not lose writes: all 2000
/// distinct values (0..2000, split between the threads) end up stored
/// exactly once.
#[test]
fn test_streaming_buffer_concurrent_push_many() {
    let buffer = StreamingBuffer::<i32>::new(10000);
    let buffer1 = buffer.clone();
    let buffer2 = buffer.clone();
    let handle1 = thread::spawn(move || {
        for i in 0..1000 {
            buffer1.push(i);
        }
    });
    let handle2 = thread::spawn(move || {
        for i in 1000..2000 {
            buffer2.push(i);
        }
    });
    handle1.join().unwrap();
    handle2.join().unwrap();
    assert_eq!(buffer.total_written(), 2000);
    assert_eq!(buffer.len(), 2000);
    // Sorting + deduping proves every pushed value is present exactly once
    // (2000 distinct survivors means no loss and no duplication).
    let mut sorted = buffer.read();
    sorted.sort_unstable();
    sorted.dedup();
    assert_eq!(sorted.len(), 2000);
}
/// Subscribers are notified once per mutation batch: one hit per `push`,
/// one per whole `push_many` call (not per element), and one for `clear`.
#[test]
fn test_streaming_buffer_subscriber_notification_count() {
    let buffer = StreamingBuffer::<f64>::new(10);
    let notify_count = Arc::new(AtomicUsize::new(0));
    // Fixed mojibake: `¬ify_count` was `&notify_count` corrupted through
    // the `&not` HTML entity and did not compile.
    let count_clone = Arc::clone(&notify_count);
    buffer.subscribe(move || {
        count_clone.fetch_add(1, AtomicOrdering::Relaxed);
    });
    buffer.push(1.0);
    assert_eq!(notify_count.load(AtomicOrdering::Relaxed), 1);
    buffer.push_many(vec![2.0, 3.0, 4.0]);
    assert_eq!(notify_count.load(AtomicOrdering::Relaxed), 2);
    buffer.clear();
    assert_eq!(notify_count.load(AtomicOrdering::Relaxed), 3);
}
/// A subscriber that unsubscribes itself from inside its own callback must
/// not deadlock, and must not be notified again afterwards.
#[test]
fn test_streaming_buffer_unsubscribe_within_callback_does_not_deadlock() {
    let buffer = StreamingBuffer::<i32>::new(4);
    let notify_count = Arc::new(AtomicUsize::new(0));
    // Fixed mojibake: `¬ify_count` was `&notify_count` corrupted through
    // the `&not` HTML entity and did not compile.
    let notify_count_clone = Arc::clone(&notify_count);
    // The subscriber id only exists after `subscribe` returns, so it is
    // smuggled into the callback through a shared Option filled in below.
    let callback_id = Arc::new(Mutex::new(None));
    let callback_id_clone = Arc::clone(&callback_id);
    let buffer_clone = buffer.clone();
    let id = buffer.subscribe(move || {
        notify_count_clone.fetch_add(1, AtomicOrdering::Relaxed);
        if let Some(id) = *callback_id_clone.lock().expect("Lock poisoned") {
            buffer_clone.unsubscribe(id);
        }
    });
    *callback_id.lock().expect("Lock poisoned") = Some(id);
    buffer.push(1);
    buffer.push(2);
    // The first push fires the callback, which removes the subscription;
    // the second push must therefore not fire it again.
    assert_eq!(notify_count.load(AtomicOrdering::Relaxed), 1);
}
}