use std::{
cell::{Cell, RefCell},
cmp::Ordering,
collections::BinaryHeap,
};
use rxrust::prelude::*;
use super::AppCtx;
use crate::window::WindowId;
#[derive(Default)]
pub struct PriorityTaskQueue {
queue: RefCell<BinaryHeap<QueueEntry>>,
next_seq: Cell<u64>,
}
pub struct QueueEntry {
priority: i64,
seq: u64,
task: PriorityTask,
}
impl PartialEq for QueueEntry {
fn eq(&self, other: &Self) -> bool { self.priority == other.priority && self.seq == other.seq }
}
impl Eq for QueueEntry {}
impl Ord for QueueEntry {
fn cmp(&self, other: &Self) -> Ordering {
other
.priority
.cmp(&self.priority)
.then_with(|| other.seq.cmp(&self.seq))
}
}
impl PartialOrd for QueueEntry {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> { Some(self.cmp(other)) }
}
pub struct PriorOp<S, P> {
source: S,
priority: P,
}
pub struct PriorityTask(Box<dyn FnOnce()>);
pub struct PriorObserver<O, P> {
observer: O,
priority: P,
}
pub trait PriorityObservable: Observable + Sized {
type Item: Clone + 'static;
type Err: Clone + 'static;
fn value_priority(
self, priority: i64, wnd_id: WindowId,
) -> Self::With<PriorOp<Self::Inner, WindowPriority<impl FnMut() -> i64>>> {
self.fn_priority(move || priority, wnd_id)
}
fn fn_priority(
self, f: impl FnMut() -> i64 + 'static, wnd_id: WindowId,
) -> Self::With<PriorOp<Self::Inner, WindowPriority<impl FnMut() -> i64>>> {
self.priority(WindowPriority { wnd_id, priority: f })
}
fn priority<P: Priority + 'static>(self, priority: P) -> Self::With<PriorOp<Self::Inner, P>> {
self.transform(|source| PriorOp { source, priority })
}
}
pub trait Priority {
fn priority(&mut self) -> i64;
fn queue(&mut self) -> Option<&PriorityTaskQueue>;
}
pub struct WindowPriority<P> {
wnd_id: WindowId,
priority: P,
}
impl<P> WindowPriority<P> {
pub fn new(wnd_id: WindowId, priority: P) -> Self { Self { wnd_id, priority } }
}
impl<P: FnMut() -> i64> Priority for WindowPriority<P> {
fn priority(&mut self) -> i64 { (self.priority)() }
fn queue(&mut self) -> Option<&PriorityTaskQueue> {
AppCtx::get_window(self.wnd_id).map(|wnd| {
let queue = wnd.priority_task_queue();
unsafe { std::mem::transmute(queue) }
})
}
}
impl Priority for Box<dyn Priority> {
fn priority(&mut self) -> i64 { (**self).priority() }
fn queue(&mut self) -> Option<&PriorityTaskQueue> { (**self).queue() }
}
impl<T> PriorityObservable for T
where
T: Observable + Sized + 'static,
T::Err: Clone + 'static,
for<'a> T::Item<'a>: Clone + 'static,
{
type Item = T::Item<'static>;
type Err = T::Err;
}
impl<S, P> ObservableType for PriorOp<S, P>
where
S: ObservableType,
{
type Item<'a>
= S::Item<'a>
where
Self: 'a;
type Err = S::Err;
}
type PriorObserverCtx<C, P> =
<C as Context>::With<PriorObserver<<C as Context>::RcMut<Option<<C as Context>::Inner>>, P>>;
impl<C, S, P> CoreObservable<C> for PriorOp<S, P>
where
C: Context,
S: CoreObservable<PriorObserverCtx<C, P>> + 'static,
PrioritySubscription<C::RcMut<Option<C::Inner>>>: Subscription,
{
type Unsub = SourceWithHandle<S::Unsub, PrioritySubscription<C::RcMut<Option<C::Inner>>>>;
fn subscribe(self, context: C) -> Self::Unsub {
let Self { source, priority } = self;
let rc_observer = C::RcMut::from(None);
let context = context.transform(|observer| {
*rc_observer.rc_deref_mut() = Some(observer);
PriorObserver { observer: rc_observer.clone(), priority }
});
let source = source.subscribe(context);
SourceWithHandle { source, handle: PrioritySubscription(rc_observer) }
}
}
impl<RcO, P, Item: 'static, Err: 'static> Observer<Item, Err> for PriorObserver<RcO, P>
where
RcO: Observer<Item, Err> + Clone + 'static,
P: Priority + 'static,
{
fn next(&mut self, value: Item) {
let priority = self.priority.priority();
if let Some(queue) = self.priority.queue() {
let mut observer = self.observer.clone();
let task: Box<dyn FnOnce()> = Box::new(move || observer.next(value));
queue.add(PriorityTask(task), priority);
} else {
self.observer.next(value)
}
}
fn error(mut self, err: Err) {
let priority = self.priority.priority();
if let Some(queue) = self.priority.queue() {
let observer = self.observer.clone();
let task: Box<dyn FnOnce()> = Box::new(move || observer.error(err));
queue.add(PriorityTask(task), priority + 1);
} else {
self.observer.error(err)
}
}
fn complete(mut self) {
let priority = self.priority.priority();
if let Some(queue) = self.priority.queue() {
let observer = self.observer.clone();
let task: Box<dyn FnOnce()> = Box::new(move || {
observer.complete();
});
queue.add(PriorityTask(task), priority + 1);
} else {
self.observer.complete()
}
}
fn is_closed(&self) -> bool { self.observer.is_closed() }
}
pub struct PrioritySubscription<RcO>(RcO);
impl<RcO, O> Subscription for PrioritySubscription<RcO>
where
RcO: RcDerefMut<Target = Option<O>>,
{
fn unsubscribe(self) { self.0.rc_deref_mut().take(); }
fn is_closed(&self) -> bool { self.0.rc_deref().is_none() }
}
impl PriorityTaskQueue {
pub fn is_empty(&self) -> bool { self.queue.borrow().is_empty() }
pub fn pop(&self) -> Option<(PriorityTask, i64)> {
self
.queue
.borrow_mut()
.pop()
.map(|entry| (entry.task, entry.priority))
}
pub fn add(&self, task: PriorityTask, priority: i64) {
let seq = self.next_seq.get();
self.next_seq.set(seq + 1);
self
.queue
.borrow_mut()
.push(QueueEntry { priority, seq, task });
}
}
impl PriorityTask {
pub fn new(f: impl FnOnce() + 'static) -> Self { PriorityTask(Box::new(f)) }
pub fn run(self) { (self.0)() }
}
#[cfg(test)]
mod tests {
use super::*;
#[cfg(target_arch = "wasm32")]
use crate::test_helper::wasm_bindgen_test;
use crate::{prelude::*, reset_test_env, test_helper::TestWindow};
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
#[test]
fn prior_smoke() {
reset_test_env!();
let wnd = TestWindow::from_widget(fn_widget!(Void::default()));
let wnd_id = wnd.id();
let r = Stateful::new(Vec::new());
let result = r.clone_writer();
Local::of(2)
.value_priority(2, wnd_id)
.subscribe(move |v: i32| result.write().push(v));
let result = r.clone_writer();
Local::of(1)
.value_priority(1, wnd_id)
.subscribe(move |v| result.write().push(v));
let result = r.clone_writer();
Local::of(3)
.value_priority(3, wnd_id)
.subscribe(move |v| result.write().push(v));
wnd.draw_frame();
assert_eq!(*r.read(), vec![1, 2, 3]);
}
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
#[test]
fn prior_by_smoke() {
reset_test_env!();
let r = Stateful::new(Vec::new());
let wnd = TestWindow::from_widget(fn_widget!(Void::default()));
let wnd_id = wnd.id();
let result = r.clone_writer();
Local::of(2)
.fn_priority(|| 2, wnd_id)
.subscribe(move |v| result.write().push(v));
let result = r.clone_writer();
Local::of(1)
.fn_priority(|| 1, wnd_id)
.subscribe(move |v| result.write().push(v));
let result = r.clone_writer();
Local::of(3)
.fn_priority(|| 3, wnd_id)
.subscribe(move |v| result.write().push(v));
wnd.draw_frame();
assert_eq!(*r.read(), vec![1, 2, 3]);
}
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
#[test]
fn same_priority_keeps_insertion_order() {
reset_test_env!();
let r = Stateful::new(Vec::new());
let wnd = TestWindow::from_widget(fn_widget!(Void::default()));
let wnd_id = wnd.id();
for value in [1, 2, 3] {
let result = r.clone_writer();
Local::of(value)
.value_priority(1, wnd_id)
.subscribe(move |v| result.write().push(v));
}
wnd.draw_frame();
assert_eq!(*r.read(), vec![1, 2, 3]);
}
}