sodium-rust 2.1.2

Sodium FRP (Functional Reactive Programming)
Documentation
use crate::impl_::node::IsNode;
use crate::impl_::sodium_ctx::SodiumCtx;
use crate::impl_::sodium_ctx::SodiumCtxData;
use crate::impl_::stream::Stream;
use crate::impl_::stream::WeakStream;

/// A sink that owns a [`Stream`] of `A` values together with the
/// [`SodiumCtx`] it was created in.
///
/// Values are pushed through `send`, and the underlying stream is handed out
/// by `stream`.
pub struct StreamSink<A> {
    // The stream that values passed to `send` are forwarded to.
    stream: Stream<A>,
    // Context used by `send` to open a transaction and record changed nodes.
    sodium_ctx: SodiumCtx,
}

/// Weak counterpart of [`StreamSink`]: holds a [`WeakStream`] instead of a
/// [`Stream`], plus a clone of the sink's [`SodiumCtx`].
///
/// Produced by `StreamSink::downgrade`; `upgrade` turns it back into a
/// `StreamSink` when the weak stream can still be upgraded.
pub struct WeakStreamSink<A> {
    // Weak handle to the sink's stream.
    stream: WeakStream<A>,
    // Context carried over so an upgraded sink can be rebuilt.
    sodium_ctx: SodiumCtx,
}

impl<A> Clone for StreamSink<A> {
    fn clone(&self) -> Self {
        StreamSink {
            stream: self.stream.clone(),
            sodium_ctx: self.sodium_ctx.clone(),
        }
    }
}

impl<A: Send + 'static> StreamSink<A> {
    /// Creates a sink backed by a fresh stream built with `Stream::new` in
    /// the given context. The context is cloned and kept by the sink.
    pub fn new(sodium_ctx: &SodiumCtx) -> StreamSink<A> {
        let stream = Stream::new(sodium_ctx);
        let sodium_ctx = sodium_ctx.clone();
        Self { stream, sodium_ctx }
    }

    /// Creates a sink whose stream is built with
    /// `Stream::_new_with_coalescer`, passing `coalescer` through unchanged.
    ///
    /// NOTE(review): the coalescer is presumably used to combine values sent
    /// more than once within a single transaction — confirm against `Stream`.
    pub fn new_with_coalescer<COALESCER: FnMut(&A, &A) -> A + Send + 'static>(
        sodium_ctx: &SodiumCtx,
        coalescer: COALESCER,
    ) -> StreamSink<A> {
        let stream = Stream::_new_with_coalescer(sodium_ctx, coalescer);
        let sodium_ctx = sodium_ctx.clone();
        Self { stream, sodium_ctx }
    }

    /// Returns a clone of the stream handle this sink feeds.
    pub fn stream(&self) -> Stream<A> {
        Stream::clone(&self.stream)
    }

    /// Sends `a` into the sink's stream.
    ///
    /// The whole operation runs inside one transaction on the sink's context.
    /// It flags the stream's node as changed, registers that node in the
    /// context's `changed_nodes`, and finally forwards `a` via `_send`.
    ///
    /// # Panics
    ///
    /// Panics if obtaining write access to the node's `changed` flag fails
    /// (the result of `write()` is unwrapped).
    pub fn send(&self, a: A) {
        let ctx = &self.sodium_ctx;
        ctx.transaction(|| {
            let stream_node = self.stream.clone();
            // The write guard is a temporary of this statement, so it is
            // released before the context data is touched below.
            *stream_node.data().changed.write().unwrap() = true;
            ctx.with_data(|ctx_data: &mut SodiumCtxData| {
                ctx_data.changed_nodes.push(stream_node.box_clone());
            });
            self.stream._send(a);
        });
    }

    /// Builds a [`WeakStreamSink`] from `this` by downgrading its stream and
    /// cloning its context.
    pub fn downgrade(this: &Self) -> WeakStreamSink<A> {
        let stream = Stream::downgrade(&this.stream);
        let sodium_ctx = this.sodium_ctx.clone();
        WeakStreamSink { stream, sodium_ctx }
    }
}

impl<A> WeakStreamSink<A> {
    /// Attempts to recover a [`StreamSink`].
    ///
    /// Returns `None` when upgrading the weak stream yields `None`; otherwise
    /// returns a sink made of the upgraded stream and a clone of the stored
    /// context.
    pub fn upgrade(&self) -> Option<StreamSink<A>> {
        // The context is cloned up front, before the upgrade attempt.
        let sodium_ctx = self.sodium_ctx.clone();
        let stream: Stream<A> = self.stream.upgrade()?;
        Some(StreamSink { stream, sodium_ctx })
    }
}