use std::sync::{Arc, Weak, atomic::{AtomicBool, AtomicUsize, Ordering}};
use crate::{Signal, ConnectHandle};
use crate::combiner::Combiner;
/// Where a newly connected slot is placed relative to the slots already in its group.
///
/// Passed to the `connect_*position*` methods of the `ConnectN` traits; the methods that
/// take no position use [`Position::Back`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Position {
    /// Place the slot at the front.
    Front,
    /// Place the slot at the back.
    Back,
}
/// Identifies the group a slot is connected to.
///
/// The comparison traits are derived, so values order by variant declaration first:
/// `Front` sorts before every `Named(_)`, every `Named(_)` sorts before `Back`, and two
/// `Named` values compare by their inner key.
#[derive(Ord, PartialOrd, Eq, PartialEq)]
pub enum Group<G: Ord + Send + Sync> {
    /// The unnamed group that orders before all named groups.
    Front,
    /// A group identified by a user-supplied, ordered key.
    Named(G),
    /// The unnamed group that orders after all named groups.
    Back,
}
/// Generates one `ConnectN` trait for a fixed slot arity, together with its implementations
/// for `Signal` and `ConnectHandle`.
///
/// Invocation shape: `impl_connect!(TraitName; T0 T1 ...; a b ...)`. The first list names the
/// slot argument types; the second list names the bindings used to destructure the argument
/// tuple before calling the user's closure, so both lists are expected to have the same length.
macro_rules! impl_connect {
    ($name:ident; $($args:ident)*; $($params:ident)*) => {
        /// Connection API for signals whose slots take this trait's argument list.
        ///
        /// `R` is the slot return type, `C` the combiner type and `G` the key type used by
        /// `Group::Named`. Only the two `*_group_position*` methods are required; every other
        /// method forwards to one of them with `Group::Back` and/or `Position::Back` filled in.
        pub trait $name<R, C, G, $($args),*>
        where
            ($($args,)*): Clone + 'static,
            R: 'static,
            C: Combiner<R> + 'static,
            G: Ord + Send + Sync
        {
            /// Connects `f` in `group` at `pos` and returns a `Connection` for the new slot.
            fn connect_group_position<F>(&self, f: F, group: Group<G>, pos: Position) -> Connection
            where
                F: Fn($($args,)*) -> R + Send + Sync + 'static;

            /// Like `connect_group_position`, but `f` additionally receives a `Connection`
            /// as its first argument (presumably the slot's own connection -- confirm against
            /// the core's `connect_extended`).
            fn connect_group_position_extended<F>(&self, f: F, group: Group<G>, pos: Position) -> Connection
            where
                F: Fn(Connection, $($args,)*) -> R + Send + Sync + 'static;

            /// Connects `f` in `group` using `Position::Back`.
            fn connect_group<F>(&self, f: F, group: Group<G>) -> Connection
            where
                F: Fn($($args,)*) -> R + Send + Sync + 'static
            {
                self.connect_group_position(f, group, Position::Back)
            }

            /// Connects `f` in `Group::Back` at `pos`.
            fn connect_position<F>(&self, f: F, pos: Position) -> Connection
            where
                F: Fn($($args,)*) -> R + Send + Sync + 'static
            {
                self.connect_group_position(f, Group::Back, pos)
            }

            /// Connects `f` in `Group::Back` using `Position::Back`.
            fn connect<F>(&self, f: F) -> Connection
            where
                F: Fn($($args,)*) -> R + Send + Sync + 'static
            {
                self.connect_group_position(f, Group::Back, Position::Back)
            }

            /// Extended-slot variant of `connect_group`.
            fn connect_group_extended<F>(&self, f: F, group: Group<G>) -> Connection
            where
                F: Fn(Connection, $($args,)*) -> R + Send + Sync + 'static
            {
                self.connect_group_position_extended(f, group, Position::Back)
            }

            /// Extended-slot variant of `connect_position`.
            fn connect_position_extended<F>(&self, f: F, pos: Position) -> Connection
            where
                F: Fn(Connection, $($args,)*) -> R + Send + Sync + 'static
            {
                self.connect_group_position_extended(f, Group::Back, pos)
            }

            /// Extended-slot variant of `connect`.
            fn connect_extended<F>(&self, f: F) -> Connection
            where
                F: Fn(Connection, $($args,)*) -> R + Send + Sync + 'static
            {
                self.connect_group_position_extended(f, Group::Back, Position::Back)
            }
        }

        impl<R, C, G, $($args,)*> $name<R, C, G, $($args,)*> for Signal<($($args,)*), R, C, G>
        where
            ($($args,)*): Clone + 'static,
            R: 'static,
            C: Combiner<R> + 'static,
            G: Ord + Send + Sync + 'static,
        {
            fn connect_group_position<F>(&self, f: F, group: Group<G>, pos: Position) -> Connection
            where
                F: Fn($($args,)*) -> R + Send + Sync + 'static
            {
                // The connection only holds a weak reference to the core, so an outstanding
                // `Connection` does not keep the signal's state alive.
                let weak_core = Arc::downgrade(&self.core);
                // Invoked by `Connection::disconnect`.
                let cleanup = move || {
                    // Nothing to do once the core has been dropped.
                    if let Some(core) = weak_core.upgrade() {
                        // `unwrap` panics if `write()` reports an error.
                        let mut lock = core.write().unwrap();
                        // Run `cleanup` on a copy of the current core and swap the copy in,
                        // instead of mutating the shared value in place.
                        let mut core_clone = (**lock).clone();
                        core_clone.cleanup();
                        *lock = Arc::new(core_clone);
                    }
                };
                // Handed to the core so it can build the `Connection` from the slot's
                // `connected` flag and blocker counter.
                let make_conn = move |connected, blocker_count| {
                    Connection::new(connected, blocker_count, Arc::new(cleanup))
                };
                let mut lock = self.core.write().unwrap();
                let mut core_clone = (**lock).clone();
                // The core passes the arguments as one tuple; unpack it for the user's closure.
                let wrapped_f = move |($($params,)*)| f($($params,)*);
                let conn = core_clone.connect(wrapped_f, group, pos, make_conn);
                *lock = Arc::new(core_clone);
                conn
            }

            fn connect_group_position_extended<F>(&self, f: F, group: Group<G>, pos: Position) -> Connection
            where
                F: Fn(Connection, $($args,)*) -> R + Send + Sync + 'static
            {
                // Same procedure as `connect_group_position`; only the slot wrapper and the
                // core method (`connect_extended`) differ.
                let weak_core = Arc::downgrade(&self.core);
                let cleanup = move || {
                    if let Some(core) = weak_core.upgrade() {
                        let mut lock = core.write().unwrap();
                        let mut core_clone = (**lock).clone();
                        core_clone.cleanup();
                        *lock = Arc::new(core_clone);
                    }
                };
                let make_conn = move |connected, blocker_count| {
                    Connection::new(connected, blocker_count, Arc::new(cleanup))
                };
                let mut lock = self.core.write().unwrap();
                let mut core_clone = (**lock).clone();
                // Forward the connection handle, then the unpacked argument tuple.
                let wrapped_f = move |conn, ($($params,)*)| f(conn, $($params,)*);
                let conn = core_clone.connect_extended(wrapped_f, group, pos, make_conn);
                *lock = Arc::new(core_clone);
                conn
            }
        }

        impl<R, C, G, $($args,)*> $name<R, C, G, $($args,)*> for ConnectHandle<($($args,)*), R, C, G>
        where
            ($($args,)*): Clone + 'static,
            R: 'static,
            C: Combiner<R> + 'static,
            G: Ord + Send + Sync + 'static,
        {
            fn connect_group_position<F>(&self, f: F, group: Group<G>, pos: Position) -> Connection
            where
                F: Fn($($args,)*) -> R + Send + Sync + 'static
            {
                // Forward to the signal if it still exists. The empty connection is built
                // lazily so no allocation happens on the common (signal alive) path.
                self.weak_sig
                    .upgrade()
                    .map(|sig| sig.connect_group_position(f, group, pos))
                    .unwrap_or_else(Connection::empty)
            }

            fn connect_group_position_extended<F>(&self, f: F, group: Group<G>, pos: Position) -> Connection
            where
                F: Fn(Connection, $($args,)*) -> R + Send + Sync + 'static
            {
                // See `connect_group_position` above: empty connection only if the signal is gone.
                self.weak_sig
                    .upgrade()
                    .map(|sig| sig.connect_group_position_extended(f, group, pos))
                    .unwrap_or_else(Connection::empty)
            }
        }
    };
}
// Instantiate the connection traits `Connect0` through `Connect12`, one per slot arity
// from zero to twelve arguments. In each invocation the first list gives the argument type
// parameters and the second list gives the binding names used to unpack the argument tuple.
impl_connect!(Connect0;;);
impl_connect!(Connect1; T0; a);
impl_connect!(Connect2; T0 T1; a b);
impl_connect!(Connect3; T0 T1 T2; a b c);
impl_connect!(Connect4; T0 T1 T2 T3; a b c d);
impl_connect!(Connect5; T0 T1 T2 T3 T4; a b c d e);
impl_connect!(Connect6; T0 T1 T2 T3 T4 T5; a b c d e f);
impl_connect!(Connect7; T0 T1 T2 T3 T4 T5 T6; a b c d e f g);
impl_connect!(Connect8; T0 T1 T2 T3 T4 T5 T6 T7; a b c d e f g h);
impl_connect!(Connect9; T0 T1 T2 T3 T4 T5 T6 T7 T8; a b c d e f g h i);
impl_connect!(Connect10; T0 T1 T2 T3 T4 T5 T6 T7 T8 T9; a b c d e f g h i j);
impl_connect!(Connect11; T0 T1 T2 T3 T4 T5 T6 T7 T8 T9 T10; a b c d e f g h i j k);
impl_connect!(Connect12; T0 T1 T2 T3 T4 T5 T6 T7 T8 T9 T10 T11; a b c d e f g h i j k l);
/// Handle to a single slot connection.
///
/// The `SCOPED` const parameter selects the drop behaviour: when it is `true`, dropping the
/// handle disconnects the slot; when it is `false`, dropping the handle does nothing.
#[derive(Clone)]
pub struct ConnectionImpl<const SCOPED: bool> {
    /// Weak handle to the flag reported by `connected()` and cleared by `disconnect()`.
    weak_connected: Weak<AtomicBool>,
    /// Weak handle to the counter reported by `blocker_count()` and adjusted by shared blocks.
    weak_blocker_count: Weak<AtomicUsize>,
    /// Callback run by `disconnect()` after the flag has been cleared.
    cleanup: Arc<dyn Fn() + Send + Sync>,
}
impl<const SCOPED: bool> ConnectionImpl<SCOPED> {
    /// Assembles a handle from the slot's shared state and the callback to run on disconnect.
    fn new(
        weak_connected: Weak<AtomicBool>,
        weak_blocker_count: Weak<AtomicUsize>,
        cleanup: Arc<dyn Fn() + Send + Sync>,
    ) -> Self {
        Self { weak_connected, weak_blocker_count, cleanup }
    }

    /// Creates a handle that refers to no slot: both weak handles are dangling and the
    /// cleanup callback does nothing.
    fn empty() -> Self {
        Self::new(Weak::new(), Weak::new(), Arc::new(|| ()))
    }

    /// Returns the current value of the connection flag, or `false` if the flag no longer exists.
    pub fn connected(&self) -> bool {
        match self.weak_connected.upgrade() {
            Some(flag) => flag.load(Ordering::SeqCst),
            None => false,
        }
    }

    /// Clears the connection flag and then runs the cleanup callback.
    ///
    /// Does nothing if the flag no longer exists.
    pub fn disconnect(&self) {
        let flag = match self.weak_connected.upgrade() {
            Some(flag) => flag,
            None => return,
        };
        flag.store(false, Ordering::SeqCst);
        (self.cleanup)();
    }

    /// Returns `true` if the blocker count is non-zero, or if the counter no longer exists.
    pub fn blocked(&self) -> bool {
        match self.weak_blocker_count.upgrade() {
            Some(count) => count.load(Ordering::SeqCst) != 0usize,
            None => true,
        }
    }

    /// Returns the current blocker count, or `usize::MAX` if the counter no longer exists.
    pub fn blocker_count(&self) -> usize {
        self.weak_blocker_count
            .upgrade()
            .map_or(usize::MAX, |count| count.load(Ordering::SeqCst))
    }

    /// Creates a `SharedConnectionBlock` over this connection's blocker counter.
    ///
    /// `initially_blocking` is forwarded to the block's constructor and decides whether the
    /// block starts out blocking.
    #[must_use="shared connection blocks are automatically unblocked when dropped"]
    pub fn shared_block(&self, initially_blocking: bool) -> SharedConnectionBlock {
        let counter = Weak::clone(&self.weak_blocker_count);
        SharedConnectionBlock::new(counter, initially_blocking)
    }
}
impl<const SCOPED: bool> Drop for ConnectionImpl<SCOPED> {
    /// Disconnects on drop when `SCOPED` is `true`; otherwise dropping has no effect.
    fn drop(&mut self) {
        if !SCOPED {
            return;
        }
        self.disconnect();
    }
}
impl ConnectionImpl<false> {
#[must_use="ScopedConnection automatically disconnects when dropped"]
pub fn scoped(self) -> ScopedConnection {
ScopedConnection::new(self.weak_connected.clone(), self.weak_blocker_count.clone(), self.cleanup.clone())
}
}
/// Connection handle with `SCOPED = false`: dropping it does not disconnect the slot.
pub type Connection = ConnectionImpl<false>;
/// Connection handle with `SCOPED = true`: dropping it calls `disconnect()`.
pub type ScopedConnection = ConnectionImpl<true>;
/// A blocker for a connection, backed by the connection's shared blocker counter.
///
/// While a block is blocking it contributes exactly one to the counter. Cloning produces an
/// independent block over the same counter, and dropping a block unblocks it.
pub struct SharedConnectionBlock {
    /// Weak handle to the connection's blocker counter.
    weak_blocker_count: Weak<AtomicUsize>,
    /// Whether this particular block currently counts towards the counter.
    blocking: AtomicBool,
}

impl SharedConnectionBlock {
    /// Creates a block over `weak_blocker_count`, blocking immediately if
    /// `initially_blocking` is `true`.
    fn new(weak_blocker_count: Weak<AtomicUsize>, initially_blocking: bool) -> Self {
        let shared_block = Self {
            weak_blocker_count,
            blocking: AtomicBool::new(false),
        };
        if initially_blocking {
            shared_block.block_impl(true);
        }
        shared_block
    }

    /// Starts blocking. Has no effect if this block is already blocking.
    pub fn block(&self) {
        self.block_impl(true);
    }

    /// Stops blocking. Has no effect if this block is not blocking.
    pub fn unblock(&self) {
        self.block_impl(false);
    }

    /// Returns whether this block is currently blocking.
    pub fn blocking(&self) -> bool {
        self.blocking.load(Ordering::SeqCst)
    }

    /// Moves this block into the requested state and keeps the shared counter in step.
    ///
    /// The flag is flipped with a single atomic `swap`, so when several threads request the
    /// same state concurrently only the one that actually changes the flag adjusts the
    /// counter. A separate load followed by a store would let two threads both increment the
    /// counter while a later unblock decrements it only once.
    ///
    /// If `block()` and `unblock()` race with each other, the counter may be decremented
    /// before the matching increment lands; `fetch_sub` wraps, so the value is momentarily
    /// off but settles once both calls finish.
    fn block_impl(&self, block: bool) {
        let previous = self.blocking.swap(block, Ordering::SeqCst);
        if previous == block {
            // Already in the requested state; the counter must not be touched again.
            return;
        }
        // If the counter is gone there is nothing left to adjust; the flag still records
        // the requested state.
        if let Some(blocker_count) = self.weak_blocker_count.upgrade() {
            if block {
                blocker_count.fetch_add(1, Ordering::SeqCst);
            } else {
                blocker_count.fetch_sub(1, Ordering::SeqCst);
            }
        }
    }
}

impl Clone for SharedConnectionBlock {
    /// Creates a new block over the same counter that starts in this block's current state.
    ///
    /// If this block is blocking, the clone increments the counter again for itself.
    fn clone(&self) -> Self {
        SharedConnectionBlock::new(self.weak_blocker_count.clone(), self.blocking())
    }
}

impl Drop for SharedConnectionBlock {
    /// Releases this block's contribution to the counter, if it has one.
    fn drop(&mut self) {
        self.unblock();
    }
}