use crate::core::{PlottingError, Result};
use std::ops::Deref;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, Weak};
/// Boxed subscriber callback: takes no arguments, returns nothing, and may be
/// sent to and called from other threads.
pub type SubscriberCallback = Box<dyn Fn() + Send + Sync>;
/// Reference-counted form of a subscriber callback as stored in `Subscriber`.
/// The `Arc` lets a callback be cloned out of a subscriber list and invoked
/// after the list's lock has been released.
type SharedSubscriberCallback = Arc<dyn Fn() + Send + Sync>;
/// Handle returned when a callback is registered; pass it back to the matching
/// `unsubscribe` method to remove that callback again.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SubscriberId(u64);
/// Handle identifying a drop hook registered on an `ObservableLifecycle`, used
/// to remove the hook before it has run.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct DropHookId(u64);
/// One registered callback together with the id used to unsubscribe it.
struct Subscriber {
/// Handle that was returned to the caller at subscription time.
id: SubscriberId,
/// The callback itself; shared so it can be cloned out of the list before
/// being invoked.
callback: SharedSubscriberCallback,
}
/// One registered drop hook together with the id used to remove it.
struct DropHookEntry {
/// Handle that was returned when the hook was registered.
id: DropHookId,
/// The hook; `FnOnce`, so it can run at most one time.
hook: Box<dyn FnOnce() + Send + 'static>,
}
/// Lifecycle state of an observable: a list of one-shot hooks that run when
/// this value is dropped.
struct ObservableLifecycle {
    /// Hooks that have not run yet, in registration order.
    drop_hooks: Mutex<Vec<DropHookEntry>>,
    /// Counter that supplies the next [`DropHookId`].
    next_drop_hook_id: AtomicU64,
}

impl ObservableLifecycle {
    /// Creates a lifecycle with no hooks registered.
    fn new() -> Self {
        Self {
            drop_hooks: Mutex::new(Vec::new()),
            next_drop_hook_id: AtomicU64::new(0),
        }
    }

    /// Registers `hook` to run once when this lifecycle is dropped and
    /// returns the id that can be used to remove it again.
    ///
    /// # Panics
    ///
    /// Panics if the hook list lock is poisoned.
    fn add_drop_hook<F>(&self, hook: F) -> DropHookId
    where
        F: FnOnce() + Send + 'static,
    {
        let id = DropHookId(self.next_drop_hook_id.fetch_add(1, Ordering::Relaxed));
        self.drop_hooks
            .lock()
            .expect("Observable lifecycle lock poisoned")
            .push(DropHookEntry {
                id,
                hook: Box::new(hook),
            });
        id
    }

    /// Removes the hook registered under `id` without running it.
    ///
    /// Returns `true` if a hook was removed, `false` if no hook with that id
    /// is registered (for example because it was already removed).
    ///
    /// # Panics
    ///
    /// Panics if the hook list lock is poisoned.
    fn remove_drop_hook(&self, id: DropHookId) -> bool {
        let mut hooks = self
            .drop_hooks
            .lock()
            .expect("Observable lifecycle lock poisoned");
        match hooks.iter().position(|entry| entry.id == id) {
            Some(pos) => {
                hooks.remove(pos);
                true
            }
            None => false,
        }
    }

    /// Number of hooks currently registered (test builds only).
    #[cfg(test)]
    fn hook_count(&self) -> usize {
        self.drop_hooks
            .lock()
            .expect("Observable lifecycle lock poisoned")
            .len()
    }
}

impl Drop for ObservableLifecycle {
    /// Runs every remaining hook once, in registration order.
    fn drop(&mut self) {
        // `&mut self` already proves exclusive access, so the mutex does not
        // need to be locked. A poisoned mutex is recovered rather than
        // unwrapped: panicking inside `drop` while another panic is unwinding
        // aborts the process, and the registered hooks should still run.
        let hooks = std::mem::take(
            self.drop_hooks
                .get_mut()
                .unwrap_or_else(std::sync::PoisonError::into_inner),
        );
        // The list has been moved out, so hooks run without any lock held.
        for entry in hooks {
            (entry.hook)();
        }
    }
}
/// Takes a snapshot of every callback currently in `subscribers`.
///
/// The read lock is held only while the `Arc`s are cloned, so the returned
/// callbacks can be invoked without the list being locked.
///
/// # Panics
///
/// Panics with `lock_error` if the subscriber lock is poisoned.
fn collect_subscriber_callbacks(
    subscribers: &RwLock<Vec<Subscriber>>,
    lock_error: &str,
) -> Vec<SharedSubscriberCallback> {
    let registered = subscribers.read().expect(lock_error);
    let mut snapshot = Vec::with_capacity(registered.len());
    for subscriber in registered.iter() {
        snapshot.push(Arc::clone(&subscriber.callback));
    }
    snapshot
}
/// Thread-safe value container with a version counter and change
/// notification.
///
/// Every field sits behind an `Arc`, so cloning an `Observable` produces
/// another handle to the same value, version, subscribers and lifecycle.
pub struct Observable<T> {
/// The observed value.
data: Arc<RwLock<T>>,
/// Change counter, incremented each time the value is updated.
version: Arc<AtomicU64>,
/// Callbacks invoked after each update.
subscribers: Arc<RwLock<Vec<Subscriber>>>,
/// Counter that supplies the next `SubscriberId`.
next_subscriber_id: Arc<AtomicU64>,
/// Drop hooks that run once the last handle to this observable is gone.
lifecycle: Arc<ObservableLifecycle>,
}
impl<T> Clone for Observable<T> {
    /// Returns another handle to the same observable; the value itself is
    /// not copied, only the reference counts are increased.
    fn clone(&self) -> Self {
        let Self {
            data,
            version,
            subscribers,
            next_subscriber_id,
            lifecycle,
        } = self;
        Self {
            data: Arc::clone(data),
            version: Arc::clone(version),
            subscribers: Arc::clone(subscribers),
            next_subscriber_id: Arc::clone(next_subscriber_id),
            lifecycle: Arc::clone(lifecycle),
        }
    }
}
impl<T: std::fmt::Debug> std::fmt::Debug for Observable<T> {
    /// Formats the value, the current version and the number of subscribers.
    /// A poisoned subscriber lock is reported as a count of `0` instead of
    /// panicking.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Observable");
        out.field("data", &self.data);
        out.field("version", &self.version.load(Ordering::Relaxed));
        let subscriber_count = match self.subscribers.read() {
            Ok(registered) => registered.len(),
            Err(_) => 0,
        };
        out.field("subscriber_count", &subscriber_count);
        out.finish()
    }
}
impl<T> Observable<T> {
fn reserve_subscriber_id(&self) -> SubscriberId {
SubscriberId(self.next_subscriber_id.fetch_add(1, Ordering::Relaxed))
}
fn add_subscriber_with_id(&self, id: SubscriberId, callback: SharedSubscriberCallback) {
let subscriber = Subscriber { id, callback };
self.subscribers
.write()
.expect("Subscribers lock poisoned")
.push(subscriber);
}
pub fn new(value: T) -> Self {
Self {
data: Arc::new(RwLock::new(value)),
version: Arc::new(AtomicU64::new(0)),
subscribers: Arc::new(RwLock::new(Vec::new())),
next_subscriber_id: Arc::new(AtomicU64::new(0)),
lifecycle: Arc::new(ObservableLifecycle::new()),
}
}
pub fn version(&self) -> u64 {
self.version.load(Ordering::Acquire)
}
fn bump_version(&self) {
self.version.fetch_add(1, Ordering::Release);
self.notify_subscribers();
}
pub fn read(&self) -> std::sync::RwLockReadGuard<'_, T> {
self.data.read().expect("Observable lock poisoned")
}
pub fn try_read(&self) -> Option<std::sync::RwLockReadGuard<'_, T>> {
self.data.try_read().ok()
}
pub fn update<F>(&self, f: F)
where
F: FnOnce(&mut T),
{
{
let mut guard = self.data.write().expect("Observable lock poisoned");
f(&mut *guard);
}
self.bump_version();
}
pub fn update_with<F, R>(&self, f: F) -> R
where
F: FnOnce(&mut T) -> R,
{
let result = {
let mut guard = self.data.write().expect("Observable lock poisoned");
f(&mut *guard)
};
self.bump_version();
result
}
pub fn set(&self, value: T) {
{
let mut guard = self.data.write().expect("Observable lock poisoned");
*guard = value;
}
self.bump_version();
}
pub fn subscribe<F>(&self, callback: F) -> SubscriberId
where
F: Fn() + Send + Sync + 'static,
{
let id = self.reserve_subscriber_id();
self.add_subscriber_with_id(id, Arc::new(callback));
id
}
pub fn unsubscribe(&self, id: SubscriberId) -> bool {
let mut subscribers = self.subscribers.write().expect("Subscribers lock poisoned");
if let Some(pos) = subscribers.iter().position(|s| s.id == id) {
subscribers.remove(pos);
true
} else {
false
}
}
pub fn subscriber_count(&self) -> usize {
self.subscribers
.read()
.expect("Subscribers lock poisoned")
.len()
}
fn notify_subscribers(&self) {
let callbacks =
collect_subscriber_callbacks(&self.subscribers, "Subscribers lock poisoned");
for callback in callbacks {
callback();
}
}
fn on_last_drop<F>(&self, hook: F) -> DropHookId
where
F: FnOnce() + Send + 'static,
{
self.lifecycle.add_drop_hook(hook)
}
fn remove_drop_hook(&self, id: DropHookId) -> bool {
self.lifecycle.remove_drop_hook(id)
}
#[cfg(test)]
fn lifecycle_hook_count(&self) -> usize {
self.lifecycle.hook_count()
}
pub fn downgrade(&self) -> WeakObservable<T> {
WeakObservable {
data: Arc::downgrade(&self.data),
version: Arc::downgrade(&self.version),
subscribers: Arc::downgrade(&self.subscribers),
next_subscriber_id: Arc::downgrade(&self.next_subscriber_id),
lifecycle: Arc::downgrade(&self.lifecycle),
}
}
}
impl<T: Clone> Observable<T> {
    /// Returns a clone of the current value.
    ///
    /// # Panics
    ///
    /// Panics if the value lock is poisoned.
    pub fn get(&self) -> T {
        let guard = self.read();
        T::clone(&guard)
    }
}
impl<T: Default> Default for Observable<T> {
    /// Creates an observable that holds `T`'s default value.
    fn default() -> Self {
        let initial: T = Default::default();
        Self::new(initial)
    }
}
/// Non-owning handle to an [`Observable`]; it does not keep the observable
/// alive and has to be upgraded before use.
pub struct WeakObservable<T> {
    data: Weak<RwLock<T>>,
    version: Weak<AtomicU64>,
    subscribers: Weak<RwLock<Vec<Subscriber>>>,
    next_subscriber_id: Weak<AtomicU64>,
    lifecycle: Weak<ObservableLifecycle>,
}

impl<T> Clone for WeakObservable<T> {
    /// Returns another weak handle to the same observable.
    fn clone(&self) -> Self {
        let Self {
            data,
            version,
            subscribers,
            next_subscriber_id,
            lifecycle,
        } = self;
        Self {
            data: Weak::clone(data),
            version: Weak::clone(version),
            subscribers: Weak::clone(subscribers),
            next_subscriber_id: Weak::clone(next_subscriber_id),
            lifecycle: Weak::clone(lifecycle),
        }
    }
}

impl<T> WeakObservable<T> {
    /// Attempts to obtain a strong handle.
    ///
    /// Returns `None` as soon as any of the shared parts has already been
    /// dropped.
    pub fn upgrade(&self) -> Option<Observable<T>> {
        let data = self.data.upgrade()?;
        let version = self.version.upgrade()?;
        let subscribers = self.subscribers.upgrade()?;
        let next_subscriber_id = self.next_subscriber_id.upgrade()?;
        let lifecycle = self.lifecycle.upgrade()?;
        let strong = Observable {
            data,
            version,
            subscribers,
            next_subscriber_id,
            lifecycle,
        };
        Some(strong)
    }

    /// Whether at least one strong handle to the observed value still
    /// exists. Only the value's reference count is consulted.
    pub fn is_alive(&self) -> bool {
        self.data.strong_count() != 0
    }
}
pub struct BatchUpdate<'a> {
observables: Vec<&'a dyn BatchNotifier>,
}
pub trait BatchNotifier {
fn notify(&self);
}
impl<T> BatchNotifier for Observable<T> {
fn notify(&self) {
self.notify_subscribers();
}
}
impl<'a> BatchUpdate<'a> {
pub fn new() -> Self {
Self {
observables: Vec::new(),
}
}
pub fn add<T>(&mut self, observable: &'a Observable<T>) {
self.observables.push(observable);
}
}
impl<'a> Default for BatchUpdate<'a> {
fn default() -> Self {
Self::new()
}
}
impl<'a> Drop for BatchUpdate<'a> {
fn drop(&mut self) {
for obs in &self.observables {
obs.notify();
}
}
}
/// Observable vector that keeps only the most recent `max_size` elements.
pub struct SlidingWindowObservable<T> {
    /// The observable that actually stores the window contents.
    inner: Observable<Vec<T>>,
    /// Upper bound on the number of retained elements.
    max_size: usize,
}

impl<T: Clone> SlidingWindowObservable<T> {
    /// Creates an empty window that retains at most `max_size` elements.
    ///
    /// A `max_size` of 0 is allowed; such a window never retains anything.
    pub fn new(max_size: usize) -> Self {
        Self {
            inner: Observable::new(Vec::with_capacity(max_size)),
            max_size,
        }
    }

    /// Appends `value`, evicting the oldest elements so that the window
    /// holds at most `max_size` elements afterwards.
    ///
    /// Every call counts as one update of the underlying observable, even
    /// for a zero-sized window where `value` is discarded.
    pub fn push(&self, value: T) {
        let max_size = self.max_size;
        self.inner.update(|data| {
            if max_size == 0 {
                // Nothing can be retained; evicting from an empty vector
                // would panic, so just make sure the window is empty.
                data.clear();
                return;
            }
            // Free exactly enough room for one new element. With
            // `max_size >= 1` the excess never exceeds `data.len()`.
            let excess = (data.len() + 1).saturating_sub(max_size);
            data.drain(..excess);
            data.push(value);
        });
    }

    /// Appends all `values` as a single update, keeping only the newest
    /// `max_size` elements.
    ///
    /// The batch is appended first and the overflow is removed from the
    /// front in one pass, so the cost is linear in window size plus batch
    /// size rather than one front-removal per pushed element.
    pub fn push_many(&self, values: impl IntoIterator<Item = T>) {
        let max_size = self.max_size;
        self.inner.update(|data| {
            data.extend(values);
            let excess = data.len().saturating_sub(max_size);
            data.drain(..excess);
        });
    }

    /// Removes every element (one update of the underlying observable).
    pub fn clear(&self) {
        self.inner.update(|data| data.clear());
    }

    /// Version of the underlying observable.
    pub fn version(&self) -> u64 {
        self.inner.version()
    }

    /// Read guard over the current window contents, oldest element first.
    pub fn read(&self) -> std::sync::RwLockReadGuard<'_, Vec<T>> {
        self.inner.read()
    }

    /// Subscribes `callback` to updates of the underlying observable.
    pub fn subscribe<F>(&self, callback: F) -> SubscriberId
    where
        F: Fn() + Send + Sync + 'static,
    {
        self.inner.subscribe(callback)
    }

    /// Removes a subscription; returns whether it was registered.
    pub fn unsubscribe(&self, id: SubscriberId) -> bool {
        self.inner.unsubscribe(id)
    }

    /// The underlying observable.
    pub fn as_observable(&self) -> &Observable<Vec<T>> {
        &self.inner
    }

    /// Maximum number of retained elements.
    pub fn max_size(&self) -> usize {
        self.max_size
    }

    /// Number of elements currently in the window.
    pub fn len(&self) -> usize {
        self.inner.read().len()
    }

    /// Whether the window currently holds no elements.
    pub fn is_empty(&self) -> bool {
        self.inner.read().is_empty()
    }
}

impl<T: Clone> Clone for SlidingWindowObservable<T> {
    /// Returns another handle to the same window; contents are shared.
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
            max_size: self.max_size,
        }
    }
}
/// Conversion of an owned value into an `Observable<T>`.
pub trait IntoObservable<T> {
/// Consumes `self` and wraps the resulting value in a new observable.
fn into_observable(self) -> Observable<T>;
}
pub fn lift<T, U, F>(source: &Observable<T>, f: F) -> Observable<U>
where
T: Clone + Send + Sync + 'static,
U: Send + Sync + 'static,
F: Fn(T) -> U + Send + Sync + 'static,
{
let initial = f(source.get());
let derived = Observable::new(initial);
let f = Arc::new(f);
let weak_source = source.downgrade();
let weak_derived = derived.downgrade();
let id = source.reserve_subscriber_id();
source.add_subscriber_with_id(
id,
Arc::new(move || {
let Some(source) = weak_source.upgrade() else {
return;
};
let Some(derived) = weak_derived.upgrade() else {
source.unsubscribe(id);
return;
};
let new_value = f(source.get());
{
let mut guard = derived.data.write().expect("Lock poisoned");
*guard = new_value;
}
derived.bump_version();
}),
);
let weak_source_for_drop = source.downgrade();
derived.on_last_drop(move || {
if let Some(source) = weak_source_for_drop.upgrade() {
source.unsubscribe(id);
}
});
derived
}
/// Creates an observable whose value is `f` applied to the values of
/// `source1` and `source2`.
///
/// The derived value is computed once immediately and recomputed whenever
/// either source notifies its subscribers. The subscriptions capture only
/// weak handles. Drop hooks remove the subscriptions again when one of the
/// sources or the derived observable loses its last handle.
pub fn lift2<T1, T2, U, F>(
source1: &Observable<T1>,
source2: &Observable<T2>,
f: F,
) -> Observable<U>
where
T1: Clone + Send + Sync + 'static,
T2: Clone + Send + Sync + 'static,
U: Send + Sync + 'static,
F: Fn(T1, T2) -> U + Send + Sync + 'static,
{
let initial = f(source1.get(), source2.get());
let derived = Observable::new(initial);
// `f` is shared between the two subscription closures below.
let f = Arc::new(f);
let weak_derived = derived.downgrade();
let weak_s1 = source1.downgrade();
let weak_s2 = source2.downgrade();
// Ids are reserved before the callbacks are built so that each callback can
// capture its own id and unsubscribe itself.
let source1_id = source1.reserve_subscriber_id();
let source2_id = source2.reserve_subscriber_id();
// Subscription on `source1`.
{
let f_clone = Arc::clone(&f);
let weak_derived = weak_derived.clone();
let weak_s1 = weak_s1.clone();
let weak_s2 = weak_s2.clone();
source1.add_subscriber_with_id(
source1_id,
Arc::new(move || {
let Some(source1) = weak_s1.upgrade() else {
return;
};
// Without the other source or the derived observable there is nothing
// left to compute, so this subscription removes itself.
let Some(source2) = weak_s2.upgrade() else {
source1.unsubscribe(source1_id);
return;
};
let Some(derived) = weak_derived.upgrade() else {
source1.unsubscribe(source1_id);
return;
};
let new_value = f_clone(source1.get(), source2.get());
// Scope the write guard so the lock is released before notifying.
{
let mut guard = derived.data.write().expect("Lock poisoned");
*guard = new_value;
}
derived.bump_version();
}),
);
}
// Subscription on `source2`; mirrors the one above.
{
let f_clone = Arc::clone(&f);
let weak_derived = weak_derived.clone();
let weak_s1 = weak_s1.clone();
let weak_s2 = weak_s2.clone();
source2.add_subscriber_with_id(
source2_id,
Arc::new(move || {
// If `source1` is gone, remove this subscription from `source2`
// (when `source2` can still be reached).
let Some(source1) = weak_s1.upgrade() else {
if let Some(source2) = weak_s2.upgrade() {
source2.unsubscribe(source2_id);
}
return;
};
let Some(source2) = weak_s2.upgrade() else {
return;
};
let Some(derived) = weak_derived.upgrade() else {
source2.unsubscribe(source2_id);
return;
};
let new_value = f_clone(source1.get(), source2.get());
// Scope the write guard so the lock is released before notifying.
{
let mut guard = derived.data.write().expect("Lock poisoned");
*guard = new_value;
}
derived.bump_version();
}),
);
}
// When `source2` loses its last handle, drop the subscription on `source1`.
let weak_source1_for_source2_drop = source1.downgrade();
let source2_drop_hook_id = source2.on_last_drop(move || {
if let Some(source1) = weak_source1_for_source2_drop.upgrade() {
source1.unsubscribe(source1_id);
}
});
// When `source1` loses its last handle, drop the subscription on `source2`.
let weak_source2_for_source1_drop = source2.downgrade();
let source1_drop_hook_id = source1.on_last_drop(move || {
if let Some(source2) = weak_source2_for_source1_drop.upgrade() {
source2.unsubscribe(source2_id);
}
});
// When the derived observable loses its last handle, remove both
// subscriptions and the two hooks registered on the sources above.
let weak_source1_for_derived_drop = source1.downgrade();
let weak_source2_for_derived_drop = source2.downgrade();
derived.on_last_drop(move || {
if let Some(source1) = weak_source1_for_derived_drop.upgrade() {
source1.unsubscribe(source1_id);
source1.remove_drop_hook(source1_drop_hook_id);
}
if let Some(source2) = weak_source2_for_derived_drop.upgrade() {
source2.unsubscribe(source2_id);
source2.remove_drop_hook(source2_drop_hook_id);
}
});
derived
}
/// Creates an observable derived from `source` by applying `f` to its value.
///
/// This simply forwards to `lift`.
pub fn map<T, U, F>(source: &Observable<T>, f: F) -> Observable<U>
where
T: Clone + Send + Sync + 'static,
U: Send + Sync + 'static,
F: Fn(T) -> U + Send + Sync + 'static,
{
lift(source, f)
}
/// Tracks the versions of a set of observables to detect whether any of them
/// changed since the last acknowledgement.
///
/// Clones share the same tracking state.
#[derive(Clone)]
pub struct ReactiveDataHandle {
    /// Version of each tracked observable as of tracking or the last
    /// `mark_updated` call.
    last_versions: Arc<RwLock<Vec<u64>>>,
    /// Live version counters of the tracked observables, index-aligned with
    /// `last_versions`.
    current_versions: Arc<RwLock<Vec<Arc<AtomicU64>>>>,
}

impl ReactiveDataHandle {
    /// Creates a handle that tracks nothing yet.
    pub fn new() -> Self {
        Self {
            last_versions: Arc::default(),
            current_versions: Arc::default(),
        }
    }

    /// Starts tracking `observable`, recording its current version as
    /// already seen.
    ///
    /// # Panics
    ///
    /// Panics if one of the handle's locks is poisoned.
    pub fn track<T>(&self, observable: &Observable<T>) {
        let mut seen = self.last_versions.write().expect("Lock poisoned");
        let mut counters = self.current_versions.write().expect("Lock poisoned");
        seen.push(observable.version());
        counters.push(Arc::clone(&observable.version));
    }

    /// Whether any tracked observable has a version different from the one
    /// last recorded for it.
    ///
    /// # Panics
    ///
    /// Panics if one of the handle's locks is poisoned.
    pub fn has_changes(&self) -> bool {
        let seen = self.last_versions.read().expect("Lock poisoned");
        let counters = self.current_versions.read().expect("Lock poisoned");
        counters
            .iter()
            .zip(seen.iter())
            .any(|(counter, &recorded)| counter.load(Ordering::Acquire) != recorded)
    }

    /// Records the current version of every tracked observable as seen.
    ///
    /// # Panics
    ///
    /// Panics if one of the handle's locks is poisoned.
    pub fn mark_updated(&self) {
        let mut seen = self.last_versions.write().expect("Lock poisoned");
        let counters = self.current_versions.read().expect("Lock poisoned");
        for (recorded, counter) in seen.iter_mut().zip(counters.iter()) {
            *recorded = counter.load(Ordering::Acquire);
        }
    }
}

impl Default for ReactiveDataHandle {
    /// Same as [`ReactiveDataHandle::new`].
    fn default() -> Self {
        ReactiveDataHandle::new()
    }
}
impl<T> IntoObservable<Vec<T>> for Vec<T> {
/// Moves the vector into a new observable; no elements are copied.
fn into_observable(self) -> Observable<Vec<T>> {
Observable::new(self)
}
}
impl<T: Clone, const N: usize> IntoObservable<Vec<T>> for [T; N] {
    /// Converts the array into a `Vec` and wraps it in a new observable.
    ///
    /// The array is owned, so its elements are moved into the vector rather
    /// than cloned. The `Clone` bound is kept only so the impl's signature
    /// stays unchanged.
    fn into_observable(self) -> Observable<Vec<T>> {
        Observable::new(Vec::from(self))
    }
}
/// Borrowed view of a buffer's raw slots.
///
/// The view owns a read guard, so the underlying lock stays read-locked for
/// as long as the view is alive.
pub struct StreamingBufferView<'a, T> {
    guard: RwLockReadGuard<'a, Vec<Option<T>>>,
}

impl<T> Deref for StreamingBufferView<'_, T> {
    type Target = [Option<T>];

    /// Exposes the guarded slots as a slice, in storage order.
    fn deref(&self) -> &Self::Target {
        self.guard.as_slice()
    }
}
/// How a renderer has to react to the data appended since the last render
/// mark.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StreamingRenderState {
    /// Nothing was appended.
    Unchanged,
    /// Data was only appended; `visible_appended` new elements are visible.
    AppendOnly { visible_appended: usize },
    /// An incremental render is not possible; everything must be redrawn.
    FullRedrawRequired,
}

impl StreamingRenderState {
    /// Number of newly visible elements; 0 unless the state is `AppendOnly`.
    pub fn visible_appended(self) -> usize {
        if let Self::AppendOnly { visible_appended } = self {
            visible_appended
        } else {
            0
        }
    }

    /// Whether the state is `AppendOnly`, i.e. only the new elements need to
    /// be drawn.
    pub fn can_incrementally_render(self) -> bool {
        match self {
            Self::AppendOnly { .. } => true,
            Self::Unchanged | Self::FullRedrawRequired => false,
        }
    }
}
/// Fixed-capacity ring buffer that keeps the most recently pushed values and
/// notifies subscribers on every change.
///
/// All state is behind `Arc`s, so clones are handles to the same buffer.
pub struct StreamingBuffer<T> {
    /// Slot storage; always exactly `capacity` slots long.
    data: Arc<RwLock<Vec<Option<T>>>>,
    /// Number of slots; at least 1.
    capacity: usize,
    /// Index of the slot the next push writes to; always below `capacity`.
    write_pos: Arc<std::sync::atomic::AtomicUsize>,
    /// Number of values pushed since creation or the last `clear`.
    total_written: Arc<AtomicU64>,
    /// Change counter, incremented by every push, batch push and clear.
    version: Arc<AtomicU64>,
    /// Number of values pushed since the last `mark_rendered` or `clear`.
    appended_since_render: Arc<std::sync::atomic::AtomicUsize>,
    /// Callbacks invoked after every change.
    subscribers: Arc<RwLock<Vec<Subscriber>>>,
    /// Counter that supplies the next `SubscriberId`.
    next_subscriber_id: Arc<AtomicU64>,
}

impl<T: Clone> StreamingBuffer<T> {
    /// Creates a buffer with room for `capacity` values; a capacity of 0 is
    /// silently replaced by 1.
    pub fn new(capacity: usize) -> Self {
        Self::try_new(capacity).unwrap_or_else(|_| Self::with_capacity(1))
    }

    /// Creates a buffer with room for `capacity` values.
    ///
    /// # Errors
    ///
    /// Returns `PlottingError::InvalidInput` if `capacity` is 0.
    pub fn try_new(capacity: usize) -> Result<Self> {
        if capacity == 0 {
            return Err(PlottingError::InvalidInput(
                "StreamingBuffer capacity must be at least 1".to_string(),
            ));
        }
        Ok(Self::with_capacity(capacity))
    }

    /// Builds the buffer; callers guarantee `capacity >= 1`.
    fn with_capacity(capacity: usize) -> Self {
        let mut data = Vec::with_capacity(capacity);
        data.resize_with(capacity, || None);
        Self {
            data: Arc::new(RwLock::new(data)),
            capacity,
            write_pos: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
            total_written: Arc::new(AtomicU64::new(0)),
            version: Arc::new(AtomicU64::new(0)),
            appended_since_render: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
            subscribers: Arc::new(RwLock::new(Vec::new())),
            next_subscriber_id: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Number of values retained after `total` values have been written.
    fn visible_len(&self, total: u64) -> usize {
        // Clamp while still in `u64`: casting `total` to `usize` first would
        // truncate counts above `usize::MAX` on 32-bit targets. The result
        // never exceeds `capacity`, so the cast back is lossless.
        std::cmp::min(total, self.capacity as u64) as usize
    }

    /// Adds `count` to the written and appended counters (saturating).
    /// Callers hold the data write lock.
    fn record_appended(&self, count: usize) {
        let total = self.total_written.load(Ordering::Relaxed);
        self.total_written
            .store(total.saturating_add(count as u64), Ordering::Release);
        let appended = self.appended_since_render.load(Ordering::Relaxed);
        self.appended_since_render
            .store(appended.saturating_add(count), Ordering::Release);
    }

    /// Appends `value`, overwriting the oldest value when the buffer is
    /// full, then bumps the version and notifies subscribers.
    ///
    /// # Panics
    ///
    /// Panics if a buffer lock is poisoned.
    pub fn push(&self, value: T) {
        {
            let mut data = self.data.write().expect("Lock poisoned");
            let pos = self.write_pos.load(Ordering::Relaxed) % self.capacity;
            data[pos] = Some(value);
            // Keep the stored position below `capacity` so it can never
            // overflow or jump when a free-running counter would wrap.
            self.write_pos
                .store((pos + 1) % self.capacity, Ordering::Release);
            self.record_appended(1);
        }
        self.bump_version();
    }

    /// Appends all `values` as one change (a single version bump and
    /// notification). An empty input does nothing.
    ///
    /// # Panics
    ///
    /// Panics if a buffer lock is poisoned.
    pub fn push_many(&self, values: impl IntoIterator<Item = T>) {
        // Collect first so the write lock is not held while the caller's
        // iterator runs.
        let values: Vec<T> = values.into_iter().collect();
        let count = values.len();
        if count == 0 {
            return;
        }
        {
            let mut data = self.data.write().expect("Lock poisoned");
            let mut pos = self.write_pos.load(Ordering::Relaxed) % self.capacity;
            for value in values {
                data[pos] = Some(value);
                pos = (pos + 1) % self.capacity;
            }
            self.write_pos.store(pos, Ordering::Release);
            self.record_appended(count);
        }
        self.bump_version();
    }

    /// Returns clones of the retained values, oldest first.
    ///
    /// # Panics
    ///
    /// Panics if the data lock is poisoned.
    pub fn read(&self) -> Vec<T> {
        let data = self.data.read().expect("Lock poisoned");
        let total = self.total_written.load(Ordering::Acquire);
        let write_pos = self.write_pos.load(Ordering::Acquire);
        let len = self.visible_len(total);
        if len == 0 {
            return Vec::new();
        }
        // Until the buffer has wrapped, the oldest value sits in slot 0;
        // afterwards it is the slot the next push will overwrite.
        let start = if total <= self.capacity as u64 {
            0
        } else {
            write_pos % self.capacity
        };
        let mut result = Vec::with_capacity(len);
        result.extend((0..len).filter_map(|offset| data[(start + offset) % self.capacity].clone()));
        result
    }

    /// Returns a view of the raw slots in storage order (not oldest-first);
    /// the view holds the read lock while it is alive.
    ///
    /// # Panics
    ///
    /// Panics if the data lock is poisoned.
    pub fn read_view(&self) -> StreamingBufferView<'_, T> {
        StreamingBufferView {
            guard: self.data.read().expect("Lock poisoned"),
        }
    }

    /// Returns clones of the values appended since the last render mark,
    /// oldest first, limited to what the buffer still retains.
    ///
    /// # Panics
    ///
    /// Panics if the data lock is poisoned.
    pub fn read_appended(&self) -> Vec<T> {
        let data = self.data.read().expect("Lock poisoned");
        let appended = self.appended_since_render.load(Ordering::Acquire);
        let write_pos = self.write_pos.load(Ordering::Acquire);
        if appended == 0 {
            return Vec::new();
        }
        let count = std::cmp::min(appended, self.capacity);
        // Reduce the write position first so the sum below stays under
        // `2 * capacity` and cannot overflow.
        let end = write_pos % self.capacity;
        let mut result = Vec::with_capacity(count);
        for i in 0..count {
            let idx = (end + self.capacity - count + i) % self.capacity;
            if let Some(ref value) = data[idx] {
                result.push(value.clone());
            }
        }
        result
    }

    /// Number of values pushed since the last `mark_rendered` or `clear`.
    pub fn appended_since_mark(&self) -> usize {
        self.appended_since_render.load(Ordering::Acquire)
    }

    /// Resets the appended-since-render counter to 0.
    pub fn mark_rendered(&self) {
        self.appended_since_render.store(0, Ordering::Release);
    }

    /// Classifies the changes since the last render mark.
    ///
    /// Returns `Unchanged` when nothing was appended, `AppendOnly` when the
    /// appended values did not overwrite previously visible ones, and
    /// `FullRedrawRequired` otherwise.
    pub fn render_state(&self) -> StreamingRenderState {
        let appended = self.appended_since_render.load(Ordering::Acquire);
        if appended == 0 {
            return StreamingRenderState::Unchanged;
        }
        let total_written = self.total_written.load(Ordering::Acquire);
        let visible_after = self.visible_len(total_written);
        let total_before = total_written.saturating_sub(appended as u64);
        let visible_before = self.visible_len(total_before);
        if visible_before == 0 {
            // Nothing was visible before, so everything visible now is new.
            return StreamingRenderState::AppendOnly {
                visible_appended: visible_after,
            };
        }
        if visible_before.saturating_add(appended) <= self.capacity {
            return StreamingRenderState::AppendOnly {
                visible_appended: appended,
            };
        }
        StreamingRenderState::FullRedrawRequired
    }

    /// Whether `render_state` currently reports `AppendOnly`.
    pub fn can_partial_render(&self) -> bool {
        self.render_state().can_incrementally_render()
    }

    /// Current version of the buffer.
    pub fn version(&self) -> u64 {
        self.version.load(Ordering::Acquire)
    }

    /// Number of slots in the buffer.
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Number of values currently retained (at most `capacity`).
    pub fn len(&self) -> usize {
        self.visible_len(self.total_written.load(Ordering::Acquire))
    }

    /// Whether nothing has been written since creation or the last `clear`.
    pub fn is_empty(&self) -> bool {
        self.total_written.load(Ordering::Acquire) == 0
    }

    /// Whether at least `capacity` values have been written.
    pub fn is_full(&self) -> bool {
        self.total_written.load(Ordering::Acquire) >= self.capacity as u64
    }

    /// Number of values written since creation or the last `clear`.
    pub fn total_written(&self) -> u64 {
        self.total_written.load(Ordering::Acquire)
    }

    /// Empties every slot, resets all counters except the version, then
    /// bumps the version and notifies subscribers.
    ///
    /// # Panics
    ///
    /// Panics if a buffer lock is poisoned.
    pub fn clear(&self) {
        {
            let mut data = self.data.write().expect("Lock poisoned");
            for slot in data.iter_mut() {
                *slot = None;
            }
            self.write_pos.store(0, Ordering::Release);
            self.total_written.store(0, Ordering::Release);
            self.appended_since_render.store(0, Ordering::Release);
        }
        self.bump_version();
    }

    /// Registers `callback` to run after every change and returns the id
    /// needed to unsubscribe it.
    ///
    /// # Panics
    ///
    /// Panics if the subscriber lock is poisoned.
    pub fn subscribe<F>(&self, callback: F) -> SubscriberId
    where
        F: Fn() + Send + Sync + 'static,
    {
        let id = SubscriberId(self.next_subscriber_id.fetch_add(1, Ordering::Relaxed));
        let subscriber = Subscriber {
            id,
            callback: Arc::new(callback),
        };
        self.subscribers
            .write()
            .expect("Lock poisoned")
            .push(subscriber);
        id
    }

    /// Removes the subscriber registered under `id`; returns whether one was
    /// found.
    ///
    /// # Panics
    ///
    /// Panics if the subscriber lock is poisoned.
    pub fn unsubscribe(&self, id: SubscriberId) -> bool {
        let mut subscribers = self.subscribers.write().expect("Lock poisoned");
        match subscribers.iter().position(|s| s.id == id) {
            Some(pos) => {
                subscribers.remove(pos);
                true
            }
            None => false,
        }
    }

    /// Increments the version and invokes a snapshot of the subscriber
    /// callbacks; the subscriber lock is not held while they run.
    fn bump_version(&self) {
        self.version.fetch_add(1, Ordering::Release);
        let callbacks = collect_subscriber_callbacks(&self.subscribers, "Lock poisoned");
        for callback in callbacks {
            callback();
        }
    }
}

impl<T: Clone> Clone for StreamingBuffer<T> {
    /// Returns another handle to the same buffer; contents are shared.
    fn clone(&self) -> Self {
        Self {
            data: Arc::clone(&self.data),
            capacity: self.capacity,
            write_pos: Arc::clone(&self.write_pos),
            total_written: Arc::clone(&self.total_written),
            version: Arc::clone(&self.version),
            appended_since_render: Arc::clone(&self.appended_since_render),
            subscribers: Arc::clone(&self.subscribers),
            next_subscriber_id: Arc::clone(&self.next_subscriber_id),
        }
    }
}
/// Pair of streaming buffers holding the x and y coordinates of a series,
/// plus a subscriber list of its own that is notified after both buffers have
/// been changed.
pub struct StreamingXY {
/// Buffer of x coordinates.
x: StreamingBuffer<f64>,
/// Buffer of y coordinates.
y: StreamingBuffer<f64>,
/// Callbacks registered through `subscribe_paired`.
subscribers: Arc<RwLock<Vec<Subscriber>>>,
/// Counter that supplies the next paired `SubscriberId`.
next_subscriber_id: Arc<AtomicU64>,
}
impl StreamingXY {
    /// Creates a pair of buffers that each hold `capacity` values.
    pub fn new(capacity: usize) -> Self {
        let x = StreamingBuffer::new(capacity);
        let y = StreamingBuffer::new(capacity);
        Self {
            x,
            y,
            subscribers: Arc::new(RwLock::new(Vec::new())),
            next_subscriber_id: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Appends one point, then notifies the paired subscribers. Each buffer
    /// also notifies its own subscribers as it is pushed to.
    pub fn push(&self, x: f64, y: f64) {
        self.x.push(x);
        self.y.push(y);
        self.notify_subscribers();
    }

    /// Appends every point one by one, then notifies the paired subscribers
    /// once. An empty input does nothing.
    pub fn push_many(&self, points: impl IntoIterator<Item = (f64, f64)>) {
        let mut remaining = points.into_iter().peekable();
        if remaining.peek().is_none() {
            return;
        }
        for (x, y) in remaining {
            self.x.push(x);
            self.y.push(y);
        }
        self.notify_subscribers();
    }

    /// The x buffer.
    pub fn x(&self) -> &StreamingBuffer<f64> {
        &self.x
    }

    /// The y buffer.
    pub fn y(&self) -> &StreamingBuffer<f64> {
        &self.y
    }

    /// Retained x values, oldest first.
    pub fn read_x(&self) -> Vec<f64> {
        self.x.read()
    }

    /// Retained y values, oldest first.
    pub fn read_y(&self) -> Vec<f64> {
        self.y.read()
    }

    /// Raw slot view of the x buffer.
    pub fn read_view_x(&self) -> StreamingBufferView<'_, f64> {
        self.x.read_view()
    }

    /// Raw slot view of the y buffer.
    pub fn read_view_y(&self) -> StreamingBufferView<'_, f64> {
        self.y.read_view()
    }

    /// Raw slot views of both buffers, x first; the x view is acquired
    /// before the y view.
    pub fn read_view(&self) -> (StreamingBufferView<'_, f64>, StreamingBufferView<'_, f64>) {
        let x_view = self.x.read_view();
        let y_view = self.y.read_view();
        (x_view, y_view)
    }

    /// X values appended since the last render mark.
    pub fn read_appended_x(&self) -> Vec<f64> {
        self.x.read_appended()
    }

    /// Y values appended since the last render mark.
    pub fn read_appended_y(&self) -> Vec<f64> {
        self.y.read_appended()
    }

    /// Number of values appended since the last render mark, as reported by
    /// the x buffer.
    pub fn appended_count(&self) -> usize {
        self.x.appended_since_mark()
    }

    /// Marks both buffers as rendered.
    pub fn mark_rendered(&self) {
        self.x.mark_rendered();
        self.y.mark_rendered();
    }

    /// Whether `render_state` currently reports `AppendOnly`.
    pub fn can_partial_render(&self) -> bool {
        self.render_state().can_incrementally_render()
    }

    /// Combines the render states of both buffers.
    ///
    /// `Unchanged` if both are unchanged; `AppendOnly` with the smaller
    /// visible count if both are append-only; `FullRedrawRequired` for any
    /// other combination.
    pub fn render_state(&self) -> StreamingRenderState {
        let x_state = self.x.render_state();
        let y_state = self.y.render_state();
        match (x_state, y_state) {
            (StreamingRenderState::Unchanged, StreamingRenderState::Unchanged) => {
                StreamingRenderState::Unchanged
            }
            (
                StreamingRenderState::AppendOnly {
                    visible_appended: from_x,
                },
                StreamingRenderState::AppendOnly {
                    visible_appended: from_y,
                },
            ) => StreamingRenderState::AppendOnly {
                visible_appended: std::cmp::min(from_x, from_y),
            },
            _ => StreamingRenderState::FullRedrawRequired,
        }
    }

    /// The larger of the two buffer versions.
    pub fn version(&self) -> u64 {
        self.x.version().max(self.y.version())
    }

    /// Number of retained points, as reported by the x buffer.
    pub fn len(&self) -> usize {
        self.x.len()
    }

    /// Whether the x buffer is empty.
    pub fn is_empty(&self) -> bool {
        self.x.is_empty()
    }

    /// Clears both buffers, then notifies the paired subscribers.
    pub fn clear(&self) {
        self.x.clear();
        self.y.clear();
        self.notify_subscribers();
    }

    /// Registers `callback` to run after `push`, `push_many` and `clear`
    /// have updated both buffers.
    ///
    /// # Panics
    ///
    /// Panics if the subscriber lock is poisoned.
    pub(crate) fn subscribe_paired<F>(&self, callback: F) -> SubscriberId
    where
        F: Fn() + Send + Sync + 'static,
    {
        let raw = self.next_subscriber_id.fetch_add(1, Ordering::Relaxed);
        let id = SubscriberId(raw);
        let entry = Subscriber {
            id,
            callback: Arc::new(callback),
        };
        let mut registered = self.subscribers.write().expect("Lock poisoned");
        registered.push(entry);
        id
    }

    /// Removes a paired subscriber; returns whether it was registered.
    ///
    /// # Panics
    ///
    /// Panics if the subscriber lock is poisoned.
    pub(crate) fn unsubscribe_paired(&self, id: SubscriberId) -> bool {
        let mut registered = self.subscribers.write().expect("Lock poisoned");
        match registered.iter().position(|entry| entry.id == id) {
            Some(index) => {
                registered.remove(index);
                true
            }
            None => false,
        }
    }

    /// Invokes a snapshot of the paired subscriber callbacks; the subscriber
    /// lock is not held while they run.
    fn notify_subscribers(&self) {
        collect_subscriber_callbacks(&self.subscribers, "Lock poisoned")
            .into_iter()
            .for_each(|callback| callback());
    }
}
impl Clone for StreamingXY {
    /// Returns another handle to the same pair of buffers and the same
    /// paired subscriber list.
    fn clone(&self) -> Self {
        let Self {
            x,
            y,
            subscribers,
            next_subscriber_id,
        } = self;
        Self {
            x: x.clone(),
            y: y.clone(),
            subscribers: Arc::clone(subscribers),
            next_subscriber_id: Arc::clone(next_subscriber_id),
        }
    }
}
impl std::fmt::Debug for StreamingXY {
    /// Formats the current length and version; point data is not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("StreamingXY");
        out.field("len", &self.len());
        out.field("version", &self.version());
        out.finish()
    }
}
#[cfg(test)]
mod tests;