use super::{
Image, InteractivePlotSession, Plot,
data::{ReactiveTeardown, SharedReactiveCallback},
};
use crate::core::Result;
use std::fmt;
use std::sync::{Arc, Mutex};
/// Cache key describing the inputs that produced a cached frame.
///
/// `PreparedPlot` compares a freshly built key against the stored one; any
/// field mismatch means the cached image is stale.
#[derive(Clone, Debug, PartialEq, Eq)]
struct PreparedFrameKey {
/// Requested frame size in pixels.
size_px: (u32, u32),
/// Bit pattern of the scale factor after `Plot::sanitize_prepared_scale_factor`.
scale_bits: u32,
/// Bit pattern of the frame time, or `None` when the plot reports no temporal sources.
time_bits: Option<u64>,
/// Values returned by `Plot::collect_reactive_versions` when the key was built.
versions: Vec<u64>,
}
/// Single cached frame: the rendered image together with the key it was rendered for.
#[derive(Clone, Debug)]
struct PreparedFrameCache {
/// Inputs the cached image corresponds to.
key: PreparedFrameKey,
/// Image returned by the render that populated this entry.
image: Image,
}
/// Handle owning the teardown callbacks collected by `PreparedPlot::subscribe_reactive`.
///
/// Dropping the handle invokes every stored teardown once, in insertion order.
#[derive(Default)]
pub struct ReactiveSubscription {
/// Teardown callbacks run when the handle is dropped.
teardowns: Vec<ReactiveTeardown>,
}
impl fmt::Debug for ReactiveSubscription {
    /// Formats the handle as a struct exposing only how many teardowns it holds;
    /// the callbacks themselves are not printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let subscription_count = self.teardowns.len();
        let mut builder = f.debug_struct("ReactiveSubscription");
        builder.field("subscription_count", &subscription_count);
        builder.finish()
    }
}
impl Drop for ReactiveSubscription {
    /// Runs every stored teardown callback exactly once, in insertion order.
    fn drop(&mut self) {
        self.teardowns
            .iter_mut()
            .for_each(|teardown| teardown());
    }
}
impl ReactiveSubscription {
    /// Returns `true` when the handle holds no teardown callbacks.
    pub fn is_empty(&self) -> bool {
        let registered = &self.teardowns;
        registered.is_empty()
    }
}
/// A `Plot` paired with a single-entry cache of its most recently rendered frame.
///
/// The cache sits behind a `Mutex`, so rendering and invalidation work through `&self`.
#[derive(Debug)]
pub struct PreparedPlot {
/// The plot that frames are rendered from.
plot: Plot,
/// Last rendered frame and its key; `None` until a frame is rendered or after `invalidate`.
cache: Mutex<Option<PreparedFrameCache>>,
}
impl Clone for PreparedPlot {
fn clone(&self) -> Self {
let cache = self
.cache
.lock()
.expect("PreparedPlot cache lock poisoned")
.clone();
Self {
plot: self.plot.clone(),
cache: Mutex::new(cache),
}
}
}
impl PreparedPlot {
pub(crate) fn new(plot: Plot) -> Self {
Self {
plot,
cache: Mutex::new(None),
}
}
pub fn plot(&self) -> &Plot {
&self.plot
}
pub fn invalidate(&self) {
*self.cache.lock().expect("PreparedPlot cache lock poisoned") = None;
}
pub fn is_dirty(&self, size_px: (u32, u32), scale_factor: f32, time: f64) -> bool {
let key = self.frame_key(size_px, scale_factor, time);
self.cache
.lock()
.expect("PreparedPlot cache lock poisoned")
.as_ref()
.is_none_or(|cached| cached.key != key)
}
pub fn render_frame(&self, size_px: (u32, u32), scale_factor: f32, time: f64) -> Result<Image> {
let key = self.frame_key(size_px, scale_factor, time);
if let Some(image) = self
.cache
.lock()
.expect("PreparedPlot cache lock poisoned")
.as_ref()
.and_then(|cached| (cached.key == key).then(|| cached.image.clone()))
{
return Ok(image);
}
let image = self
.plot
.prepared_frame_plot(size_px, scale_factor, time)
.render()?;
self.plot.mark_reactive_sources_rendered();
*self.cache.lock().expect("PreparedPlot cache lock poisoned") = Some(PreparedFrameCache {
key,
image: image.clone(),
});
Ok(image)
}
pub fn subscribe_reactive<F>(&self, callback: F) -> ReactiveSubscription
where
F: Fn() + Send + Sync + 'static,
{
let callback: SharedReactiveCallback = Arc::new(callback);
let mut subscription = ReactiveSubscription::default();
self.plot
.subscribe_push_updates(callback, &mut subscription.teardowns);
subscription
}
fn frame_key(&self, size_px: (u32, u32), scale_factor: f32, time: f64) -> PreparedFrameKey {
PreparedFrameKey {
size_px,
scale_bits: Plot::sanitize_prepared_scale_factor(scale_factor).to_bits(),
time_bits: self.plot.has_temporal_sources().then_some(time.to_bits()),
versions: self.plot.collect_reactive_versions(),
}
}
pub fn into_interactive(self) -> InteractivePlotSession {
InteractivePlotSession::new(self)
}
}
impl From<Plot> for PreparedPlot {
fn from(plot: Plot) -> Self {
Self::new(plot)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data::{Observable, StreamingXY};
    use crate::render::Color;
    use std::sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    };

    /// Frame size used by every render in these tests.
    const FRAME_SIZE: (u32, u32) = (320, 240);

    /// Returns a shared hit counter and a callback that increments it once per call.
    fn hit_counter() -> (Arc<AtomicUsize>, impl Fn() + Send + Sync + 'static) {
        let hits = Arc::new(AtomicUsize::new(0));
        let hits_for_callback = Arc::clone(&hits);
        let callback = move || {
            hits_for_callback.fetch_add(1, Ordering::Relaxed);
        };
        (hits, callback)
    }

    // A prepared plot is dirty before its first render, clean right after it,
    // and dirty again once an observed data source changes.
    #[test]
    fn test_prepared_plot_is_dirty_until_first_render() {
        let y = Observable::new(vec![0.0, 1.0, 4.0]);
        let plot: Plot = Plot::new()
            .line_source(vec![0.0, 1.0, 2.0], y.clone())
            .into();
        let prepared = plot.prepare();

        assert!(prepared.is_dirty(FRAME_SIZE, 1.0, 0.0));

        prepared
            .render_frame(FRAME_SIZE, 1.0, 0.0)
            .expect("prepared plot should render");
        assert!(!prepared.is_dirty(FRAME_SIZE, 1.0, 0.0));

        y.set(vec![0.0, 1.0, 9.0]);
        assert!(prepared.is_dirty(FRAME_SIZE, 1.0, 0.0));
    }

    // Setting an observed data source fires the subscribed callback once.
    #[test]
    fn test_prepared_plot_subscribe_reactive_observable() {
        let y = Observable::new(vec![0.0, 1.0, 4.0]);
        let plot: Plot = Plot::new()
            .line_source(vec![0.0, 1.0, 2.0], y.clone())
            .into();
        let prepared = plot.prepare();

        let (hits, on_update) = hit_counter();
        let _subscription = prepared.subscribe_reactive(on_update);

        y.set(vec![0.0, 1.0, 9.0]);
        assert_eq!(hits.load(Ordering::Relaxed), 1);
    }

    // Changing an observed color marks the cached frame as stale.
    #[test]
    fn test_prepared_plot_is_dirty_after_reactive_color_change() {
        let color = Observable::new(Color::RED);
        let plot: Plot = Plot::new()
            .line(&[0.0, 1.0, 2.0], &[0.0, 1.0, 0.5])
            .color_source(color.clone())
            .into();
        let prepared = plot.prepare();

        prepared
            .render_frame(FRAME_SIZE, 1.0, 0.0)
            .expect("prepared plot should render");
        assert!(!prepared.is_dirty(FRAME_SIZE, 1.0, 0.0));

        color.set(Color::BLUE);
        assert!(prepared.is_dirty(FRAME_SIZE, 1.0, 0.0));
    }

    // Setting an observed color fires the subscribed callback once.
    #[test]
    fn test_prepared_plot_subscribe_reactive_color_observable() {
        let color = Observable::new(Color::RED);
        let plot: Plot = Plot::new()
            .line(&[0.0, 1.0, 2.0], &[0.0, 1.0, 0.5])
            .color_source(color.clone())
            .into();
        let prepared = plot.prepare();

        let (hits, on_update) = hit_counter();
        let _subscription = prepared.subscribe_reactive(on_update);

        color.set(Color::BLUE);
        assert_eq!(hits.load(Ordering::Relaxed), 1);
    }

    // Pushing a point into a streaming source fires the subscribed callback once.
    #[test]
    fn test_prepared_plot_subscribe_reactive_streaming() {
        let stream = StreamingXY::new(32);
        let plot: Plot = Plot::new().line_streaming(&stream).into();
        let prepared = plot.prepare();

        let (hits, on_update) = hit_counter();
        let _subscription = prepared.subscribe_reactive(on_update);

        stream.push(1.0, 1.0);
        assert_eq!(hits.load(Ordering::Relaxed), 1);
    }

    // Rendering from inside the streaming callback must succeed; any render
    // error is collected and fails the final assertion.
    #[test]
    fn test_prepared_plot_streaming_callback_can_render_immediately() {
        let stream = StreamingXY::new(32);
        let plot: Plot = Plot::new()
            .line_streaming(&stream)
            .xlim(0.0, 4.0)
            .ylim(0.0, 4.0)
            .into();
        let prepared = plot.prepare();
        let prepared_for_callback = prepared.clone();

        let errors = Arc::new(Mutex::new(Vec::new()));
        let errors_for_callback = Arc::clone(&errors);
        let _subscription = prepared.subscribe_reactive(move || {
            let outcome = prepared_for_callback.render_frame(FRAME_SIZE, 1.0, 0.0);
            if let Err(err) = outcome {
                errors_for_callback
                    .lock()
                    .expect("error lock poisoned")
                    .push(err.to_string());
            }
        });

        stream.push(1.0, 1.0);

        let no_errors = errors.lock().expect("error lock poisoned").is_empty();
        assert!(
            no_errors,
            "streaming rerender in callback should not observe mismatched x/y data"
        );
    }

    // Scale factors 0.0 and -1.0 are expected to sanitize to the same value
    // and therefore hit the same cache entry.
    #[test]
    fn test_prepared_plot_normalizes_invalid_scale_factor_in_cache_key() {
        let plot: Plot = Plot::new().line(&[0.0, 1.0], &[0.0, 1.0]).into();
        let prepared = plot.prepare();

        prepared
            .render_frame(FRAME_SIZE, 0.0, 0.0)
            .expect("prepared plot should render with sanitized scale factor");

        assert!(
            !prepared.is_dirty(FRAME_SIZE, -1.0, 0.0),
            "equivalent sanitized scale factors should reuse the cached frame"
        );
    }
}