use std::sync::{
Arc,
atomic::{AtomicBool, AtomicUsize, Ordering},
};
use super::{CellValue, Gettable, Watchable};
use crate::{
cell::{Cell, CellImmutable, CellMutable},
signal::Signal,
};
pub trait MergeMapExt<T>: Watchable<T> {
#[track_caller]
fn merge_map<U, F>(&self, f: F) -> Cell<U, CellImmutable>
where
T: CellValue,
U: CellValue,
F: Fn(&T) -> Cell<U, CellImmutable> + Send + Sync + 'static,
Self: Clone + Send + Sync + 'static,
{
let first_inner = f(&self.get());
let cell = Cell::<U, CellMutable>::new(first_inner.get());
let cell = if let Some(name) = self.name() {
cell.with_name(format!("{}::merge_map", name))
} else {
cell
};
let outer_complete = Arc::new(AtomicBool::new(false));
let active_inners = Arc::new(AtomicUsize::new(1));
let weak = cell.downgrade();
let oc = outer_complete.clone();
let ai = active_inners.clone();
let first_inner_guard = first_inner.subscribe(move |signal| {
if let Some(c) = weak.upgrade() {
match signal {
Signal::Value(_) => c.notify(signal.clone()),
Signal::Complete => {
let remaining = ai.fetch_sub(1, Ordering::SeqCst) - 1;
if remaining == 0 && oc.load(Ordering::SeqCst) {
c.notify(Signal::Complete);
}
}
Signal::Error(e) => c.notify(Signal::Error(e.clone())),
}
}
});
cell.own(first_inner_guard);
let weak_outer = cell.downgrade();
let f = Arc::new(f);
let first = Arc::new(AtomicBool::new(true));
let oc2 = outer_complete.clone();
let ai2 = active_inners.clone();
let outer_guard = self.subscribe(move |signal| {
match signal {
Signal::Value(outer_value) => {
if first.swap(false, Ordering::SeqCst) {
return;
}
let Some(c) = weak_outer.upgrade() else {
return;
};
ai2.fetch_add(1, Ordering::SeqCst);
let inner = f(outer_value.as_ref());
let weak_inner = weak_outer.clone();
let oc_inner = oc2.clone();
let ai_inner = ai2.clone();
let inner_guard = inner.subscribe(move |signal| {
if let Some(c) = weak_inner.upgrade() {
match signal {
Signal::Value(_) => c.notify(signal.clone()),
Signal::Complete => {
let remaining = ai_inner.fetch_sub(1, Ordering::SeqCst) - 1;
if remaining == 0 && oc_inner.load(Ordering::SeqCst) {
c.notify(Signal::Complete);
}
}
Signal::Error(e) => c.notify(Signal::Error(e.clone())),
}
}
});
c.own(inner_guard);
}
Signal::Complete => {
outer_complete.store(true, Ordering::SeqCst);
if active_inners.load(Ordering::SeqCst) == 0
&& let Some(c) = weak_outer.upgrade()
{
c.notify(Signal::Complete);
}
}
Signal::Error(e) => {
if let Some(c) = weak_outer.upgrade() {
c.notify(Signal::Error(e.clone()));
}
}
}
});
cell.own(outer_guard);
cell.lock()
}
}
/// Blanket implementation: every type implementing [`Watchable<T>`] gets
/// `merge_map` through the trait's provided method.
impl<T, W> MergeMapExt<T> for W where W: Watchable<T> {}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{MapExt, MaterializeDefinite};

    /// Right after construction the merged cell holds the value of the inner
    /// cell produced for the source's current value (1 mapped to 10).
    #[test]
    fn test_merge_map_merges() {
        let outer = Cell::new(1u64);

        let flattened =
            outer.merge_map(|n| Cell::new(*n).map(|x| x * 10).materialize());

        let observed = flattened.get();
        assert_eq!(observed, 10);
    }
}