use std::sync::{Arc, Mutex};
use uuid::Uuid;
use crate::{
pipeline::{Definite, MaterializeDefinite, Pipeline, PipelineInstall, PipelineSeed},
signal::Signal,
subscription::SubscriptionGuard,
traits::CellValue,
};
trait UpstreamHandle<T>: Send + Sync + 'static {
fn seed(&self) -> T;
fn install_upstream(&self, sink: Arc<dyn Fn(&Signal<T>) + Send + Sync>) -> SubscriptionGuard;
}
struct UpstreamWrap<P>(P);
impl<T, P> UpstreamHandle<T> for UpstreamWrap<P>
where
T: CellValue,
P: Pipeline<T, Definite> + PipelineSeed<T>,
{
fn seed(&self) -> T {
self.0.seed()
}
fn install_upstream(&self, sink: Arc<dyn Fn(&Signal<T>) + Send + Sync>) -> SubscriptionGuard {
self.0.install(sink)
}
}
type Subscriber<T> = Arc<dyn Fn(&Signal<T>) + Send + Sync>;
pub(crate) struct SharedPipelineInner<T: CellValue> {
upstream: Arc<dyn UpstreamHandle<T>>,
upstream_guard: Mutex<Option<SubscriptionGuard>>,
subscribers: parking_lot::Mutex<Arc<Vec<(Uuid, Subscriber<T>)>>>,
}
impl<T: CellValue> SharedPipelineInner<T> {
fn add_subscriber(&self, id: Uuid, cb: Subscriber<T>) {
let _old = {
let mut guard = self.subscribers.lock();
let mut next: Vec<(Uuid, Subscriber<T>)> = (**guard).clone();
next.push((id, cb));
std::mem::replace(&mut *guard, Arc::new(next))
};
}
fn remove_subscriber(&self, id: Uuid) -> usize {
let (remaining, _old) = {
let mut guard = self.subscribers.lock();
let mut next: Vec<(Uuid, Subscriber<T>)> = (**guard)
.iter()
.filter(|(i, _)| *i != id)
.cloned()
.collect();
let remaining = next.len();
next.shrink_to_fit();
(remaining, std::mem::replace(&mut *guard, Arc::new(next)))
};
remaining
}
}
pub struct SharedPipeline<T: CellValue> {
inner: Arc<SharedPipelineInner<T>>,
}
impl<T: CellValue> Clone for SharedPipeline<T> {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}
impl<T: CellValue> SharedPipeline<T> {
#[allow(private_bounds)]
pub fn new<P: Pipeline<T, Definite> + PipelineSeed<T>>(p: P) -> Self {
let upstream: Arc<dyn UpstreamHandle<T>> = Arc::new(UpstreamWrap(p));
Self {
inner: Arc::new(SharedPipelineInner {
upstream,
upstream_guard: Mutex::new(None),
subscribers: parking_lot::Mutex::new(Arc::new(Vec::new())),
}),
}
}
}
impl<T: CellValue> PipelineSeed<T> for SharedPipeline<T> {
fn seed(&self) -> T {
self.inner.upstream.seed()
}
}
impl<T: CellValue> PipelineInstall<T> for SharedPipeline<T> {
fn install(&self, callback: Arc<dyn Fn(&Signal<T>) + Send + Sync>) -> SubscriptionGuard {
let id = Uuid::new_v4();
self.inner.add_subscriber(id, callback);
{
let mut guard_slot = self
.inner
.upstream_guard
.lock()
.expect("share upstream_guard poisoned");
if guard_slot.is_none() {
let weak = Arc::downgrade(&self.inner);
let fanout: Arc<dyn Fn(&Signal<T>) + Send + Sync> =
Arc::new(move |signal: &Signal<T>| {
let Some(inner) = weak.upgrade() else {
return;
};
let subs = Arc::clone(&*inner.subscribers.lock());
for (_, cb) in subs.iter() {
cb(signal);
}
});
let upstream_guard = self.inner.upstream.install_upstream(fanout);
*guard_slot = Some(upstream_guard);
}
}
let weak = Arc::downgrade(&self.inner);
SubscriptionGuard::from_callback(move || {
let Some(inner) = weak.upgrade() else {
return;
};
let remaining = inner.remove_subscriber(id);
if remaining == 0 {
let mut slot = inner
.upstream_guard
.lock()
.expect("share upstream_guard poisoned");
let _drop_outside_lock = slot.take();
drop(slot);
drop(_drop_outside_lock);
}
})
}
}
#[allow(private_bounds)]
impl<T: CellValue> Pipeline<T, Definite> for SharedPipeline<T> {}
impl<T: CellValue> MaterializeDefinite<T> for SharedPipeline<T> {
}
#[allow(private_bounds)]
pub trait PipelineShareExt<T: CellValue>: Pipeline<T, Definite> + PipelineSeed<T> {
fn share(self) -> SharedPipeline<T> {
SharedPipeline::new(self)
}
}
impl<T: CellValue, P: Pipeline<T, Definite> + PipelineSeed<T>> PipelineShareExt<T> for P {}