// sodium-rust 2.1.2
//
// Sodium FRP (Functional Reactive Programming)
// Documentation
use crate::impl_::node::{IsNode, IsWeakNode, Node, WeakNode};
use crate::impl_::sodium_ctx::{SodiumCtx, SodiumCtxData};
use crate::impl_::stream::{Stream, WeakStream};
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::{Arc, RwLock};

/// Strong handle to a router node that fans values out to per-key streams.
///
/// `A` is the value type carried by the routed streams and `K` is the key
/// type used to look a stream up in the routing table.
pub struct Router<A, K> {
    /// Context the router was created in; also used when creating the
    /// per-key output streams.
    sodium_ctx: SodiumCtx,
    /// Shared routing table: key -> weak handle of the stream registered for
    /// that key. It is behind `Arc<RwLock<..>>` because clones of the router,
    /// the node's update closure and the stream cleanups all hold it.
    table: Arc<RwLock<HashMap<K, WeakStream<A>>>>,
    /// Strong handle to the underlying graph node.
    node: Node,
}

/// Counterpart of `Router` that holds its node through a `WeakNode`.
///
/// Only the node handle is weak: the context and the routing table are the
/// same kind of handles as in `Router`.
pub struct WeakRouter<A, K> {
    /// Context the originating router was created in.
    sodium_ctx: SodiumCtx,
    /// Shared routing table: key -> weak handle of the stream registered for
    /// that key.
    table: Arc<RwLock<HashMap<K, WeakStream<A>>>>,
    /// Weak handle to the underlying graph node; `upgrade` yields `None`
    /// when it can no longer be upgraded.
    node: WeakNode,
}

/// Hand-written `Clone`: a derived impl would additionally demand
/// `A: Clone` and `K: Clone`, which copying the handles does not need.
impl<A, K> Clone for Router<A, K> {
    /// Returns a new handle sharing the same context, routing table and node.
    fn clone(&self) -> Self {
        let Router {
            sodium_ctx,
            table,
            node,
        } = self;
        Self {
            sodium_ctx: sodium_ctx.clone(),
            // Shares the table; only the `Arc` is copied.
            table: Arc::clone(table),
            node: node.clone(),
        }
    }
}

/// Hand-written `Clone`: a derived impl would additionally demand
/// `A: Clone` and `K: Clone`, which copying the handles does not need.
impl<A, K> Clone for WeakRouter<A, K> {
    /// Returns a new weak handle sharing the same context, routing table and
    /// weak node.
    fn clone(&self) -> Self {
        let WeakRouter {
            sodium_ctx,
            table,
            node,
        } = self;
        Self {
            sodium_ctx: sodium_ctx.clone(),
            // Shares the table; only the `Arc` is copied.
            table: Arc::clone(table),
            node: node.clone(),
        }
    }
}

/// Lets a `Router` be used wherever a graph node is expected.
impl<A, K> IsNode for Router<A, K>
where
    A: Send + 'static,
    K: Send + Sync + 'static,
{
    /// Borrows the router's underlying node.
    fn node(&self) -> &Node {
        &self.node
    }

    /// Clones this router and returns the clone as a boxed trait object.
    fn box_clone(&self) -> Box<dyn IsNode + Send + Sync> {
        let copy: Router<A, K> = self.clone();
        Box::new(copy)
    }

    /// Builds a boxed `WeakRouter` that shares this router's context and
    /// table but holds the node through `Node::downgrade2`.
    fn downgrade(&self) -> Box<dyn IsWeakNode + Send + Sync> {
        let sodium_ctx = self.sodium_ctx.clone();
        let table = Arc::clone(&self.table);
        let node = Node::downgrade2(&self.node);
        Box::new(WeakRouter {
            sodium_ctx,
            table,
            node,
        })
    }
}

/// Lets a `WeakRouter` be used wherever a weak graph node is expected.
impl<A, K> IsWeakNode for WeakRouter<A, K>
where
    A: Send + 'static,
    K: Send + Sync + 'static,
{
    /// Borrows the weak handle to the underlying node.
    fn node(&self) -> &WeakNode {
        &self.node
    }

    /// Clones this weak router and returns the clone as a boxed trait object.
    fn box_clone(&self) -> Box<dyn IsWeakNode + Send + Sync> {
        let copy: WeakRouter<A, K> = self.clone();
        Box::new(copy)
    }

    /// Tries to recover a strong `Router`.
    ///
    /// Returns `None` when the weak node cannot be upgraded; otherwise a
    /// boxed `Router` sharing this handle's context and table.
    fn upgrade(&self) -> Option<Box<dyn IsNode + Send + Sync>> {
        // Bail out before cloning anything if the node cannot be upgraded.
        let node = self.node.upgrade2()?;
        let router = Router {
            sodium_ctx: self.sodium_ctx.clone(),
            table: Arc::clone(&self.table),
            node,
        };
        Some(Box::new(router))
    }
}

impl<A, K> Router<A, K> {
    /// Creates a router fed by `in_stream`.
    ///
    /// A node named `"Router"` is created with `in_stream` as its dependency
    /// (and as an update dependency). The closure handed to `Node::new`
    /// reads the current firing of `in_stream`; when there is one, it asks
    /// `selector` for the keys the value belongs to and, for every key:
    ///
    /// * if a live stream is registered under the key, sends it a clone of
    ///   the value and records the stream in the context's `changed_nodes`;
    /// * if the registered stream can no longer be upgraded, drops the stale
    ///   table entry;
    /// * if nothing is registered, does nothing.
    ///
    /// # Panics
    ///
    /// The update closure panics if the routing table's lock is poisoned.
    pub fn new(
        sodium_ctx: &SodiumCtx,
        in_stream: &Stream<A>,
        selector: impl Fn(&A) -> Vec<K> + Send + Sync + 'static,
    ) -> Router<A, K>
    where
        A: Clone + Send + 'static,
        K: Send + Sync + Eq + Hash + 'static,
    {
        let table = Arc::new(RwLock::new(HashMap::<K, WeakStream<A>>::new()));
        let update = {
            // Owned handles moved into the update closure.
            let ctx = sodium_ctx.clone();
            let routes = table.clone();
            let source = in_stream.clone();
            move || {
                let fired = source.with_firing_op(|firing_op: &mut Option<A>| {
                    firing_op
                        .as_ref()
                        .map(|value| (selector(value), value.clone()))
                });
                if let Some((keys, value)) = fired {
                    for key in keys {
                        // The write lock is taken per key and held until the
                        // end of this iteration.
                        let mut guard = routes.write().unwrap();
                        // Outer `Option`: is the key registered?
                        // Inner `Option`: is the registered stream still alive?
                        let target = guard.get(&key).map(|weak| weak.upgrade());
                        match target {
                            Some(Some(stream)) => {
                                stream._send(value.clone());
                                ctx.with_data(|data: &mut SodiumCtxData| {
                                    data.changed_nodes.push(stream.box_clone());
                                });
                            }
                            Some(None) => {
                                // Stale entry: the stream is gone.
                                guard.remove(&key);
                            }
                            None => {}
                        }
                    }
                }
            }
        };
        let node = Node::new(sodium_ctx, "Router", update, vec![in_stream.box_clone()]);
        <dyn IsNode>::add_update_dependencies(&node, vec![in_stream.to_dep()]);
        Router {
            sodium_ctx: sodium_ctx.clone(),
            table,
            node,
        }
    }

    /// Returns the stream registered for key `k`, creating it if needed.
    ///
    /// If the table already holds a stream for `k` that can still be
    /// upgraded, that stream is returned. Otherwise a new stream is created
    /// in the router's context, given this router as a dependency, stored
    /// (weakly) in the table under `k`, and equipped with a cleanup that
    /// removes the table entry again — but only if the entry still refers to
    /// this very stream.
    ///
    /// # Panics
    ///
    /// Panics if the routing table's lock, or the new stream's
    /// `dependencies` / `cleanups` lock, is poisoned.
    pub fn filter_matches(&self, k: &K) -> Stream<A>
    where
        A: Clone + Send + 'static,
        K: Clone + Send + Sync + Eq + Hash + 'static,
    {
        // Held for the whole call, including the early return below.
        let mut routes = self.table.write().unwrap();
        if let Some(live) = routes.get(k).and_then(|weak| weak.upgrade()) {
            return live;
        }

        let created = Stream::new(&self.sodium_ctx);
        created
            .node()
            .data()
            .dependencies
            .write()
            .unwrap()
            .push(self.box_clone());
        routes.insert(k.clone(), Stream::downgrade(&created));

        let cleanup = {
            let table_handle = self.table.clone();
            let key = k.clone();
            let weak_created = Stream::downgrade(&created);
            move || {
                // Mentioning the whole variable makes the closure capture all
                // of `weak_created`, not just the `data` field used below.
                let _ = &weak_created;
                let mut guard = table_handle.write().unwrap();
                // Only remove the entry if it still points at this stream; a
                // newer stream may have been registered under the same key.
                let is_ours = guard
                    .get(&key)
                    .map_or(false, |current| current.data.ptr_eq(&weak_created.data));
                if is_ours {
                    guard.remove(&key);
                }
            }
        };
        created
            .node()
            .data()
            .cleanups
            .write()
            .unwrap()
            .push(Box::new(cleanup));

        created
    }
}