sigwake 0.0.1

A thread-safe signal-based state management library that integrates with Rust's async programming model
Documentation
use std::mem::transmute;
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
use std::time::Instant;
use std::{future::poll_fn, sync::Mutex};

use crate::time::{SpawnAtTask, spawn_at};
use crate::utils::Action;
use ::futures::{Stream, stream};
use derive_ex::Ex;

use crate::utils::bipartite_graph::*;
use crate::utils::inf_vec::*;
use crate::utils::usize_set::USizeSet;

#[derive(Debug)]
struct StateGraph {
    g: BipartiteGraph,
    wakers: InfVec<Option<Action>>,
    wake_at: Option<Instant>,
    source_set: USizeSet,
    source_remove: Arc<Mutex<Vec<XKey>>>,
}
impl StateGraph {
    pub fn new() -> Self {
        Self {
            g: BipartiteGraph::new(),
            wakers: InfVec::new(),
            wake_at: None,
            source_set: USizeSet::new(),
            source_remove: Arc::new(Mutex::new(Vec::new())),
        }
    }

    fn set_source(&mut self, x: XKey) {
        self.source_set.insert(x.0);
    }
    fn remove_target(&mut self, y: YKey) {
        self.g.remove_y(y);
        self.wakers[y.0] = None;
    }
    fn apply_source_remove(&mut self) {
        let source_removing = self.source_remove.clone();
        let mut xs = source_removing.lock().unwrap();
        for x in xs.drain(..) {
            self.g.remove_x(x);
        }
    }

    fn wake(&mut self, x: XKey) {
        for (y, _) in self.g.ys_from_x(x) {
            wake(&mut self.wakers, y);
        }
    }
    pub fn context(&mut self) -> &mut StateContext {
        self.apply_source_remove();
        self.wake_at = None;
        StateContext::new(self)
    }
    fn commit_target<A: Into<Action>>(
        &mut self,
        waker: impl Fn() -> A,
    ) -> (Option<YKey>, Option<SpawnAtTask>) {
        let y = self.g.insert_y(());
        for x in self.source_set.iter() {
            self.g.insert_edge(XKey(x), y, ());
        }
        self.wakers[y.0] = Some(waker().into());
        let task = self.wake_at.map(|at| spawn_at(waker(), at));
        (Some(y), task)
    }
}
fn wake(wakers: &mut InfVec<Option<Action>>, y: YKey) {
    if let Some(waker) = wakers[y.0].take() {
        waker.call();
    }
}

#[derive(Ex)]
#[derive_ex(Debug)]
pub struct StateKey {
    x: XKey,
    #[debug(ignore)]
    source_remove: Arc<Mutex<Vec<XKey>>>,
}
impl StateKey {
    pub fn new(cx: &mut StateContext) -> Self {
        let x = cx.0.g.insert_x(());
        Self {
            x,
            source_remove: cx.0.source_remove.clone(),
        }
    }
    pub fn watch(&self, cx: &mut StateContext) {
        cx.0.set_source(self.x);
    }
    pub fn notify(&self, cx: &mut StateContext) {
        cx.0.wake(self.x);
    }
}
impl Drop for StateKey {
    fn drop(&mut self) {
        self.source_remove.lock().unwrap().push(self.x);
    }
}

#[repr(transparent)]
pub struct StateContext(StateGraph);

impl StateContext {
    fn new(g: &mut StateGraph) -> &mut Self {
        unsafe { transmute(g) }
    }
    pub fn notify_at(&mut self, at: Instant) {
        if let Some(wake_at) = self.0.wake_at {
            if wake_at <= at {
                return;
            }
        }
        self.0.wake_at = Some(at);
    }
}

struct RawStateContainer<St> {
    g: StateGraph,
    st: St,
}

#[derive(Ex)]
#[derive_ex(Clone, bound())]
pub struct StateContainer<St>(Arc<Mutex<RawStateContainer<St>>>);

impl<St> StateContainer<St> {
    pub fn new(f: impl FnOnce(&mut StateContext) -> St) -> Self {
        let mut g = StateGraph::new();
        let st = f(g.context());
        Self(Arc::new(Mutex::new(RawStateContainer { g, st })))
    }

    pub async fn poll_fn<U>(&self, mut f: impl FnMut(&mut St, &mut StateContext) -> Poll<U>) -> U {
        let mut t = Target::new(self);
        poll_fn(|cx| t.poll_fn(&mut f, cx)).await
    }
    pub fn poll_fn_stream<U>(
        &self,
        mut f: impl FnMut(&mut St, &mut StateContext) -> Poll<Option<U>> + 'static,
    ) -> impl Stream<Item = U> + 'static
    where
        St: 'static,
    {
        let mut t = Target::new(self);
        stream::poll_fn(move |cx| t.poll_fn(&mut f, cx))
    }
    pub fn subscribe<U>(
        &self,
        mut f: impl FnMut(&mut St, &mut StateContext) -> U + 'static,
    ) -> impl Stream<Item = U> + 'static
    where
        St: Sync + Send + 'static,
    {
        struct WatchState {
            waker: Option<Waker>,
            age: usize,
            is_dirty: bool,
        }
        let ws = WatchState {
            waker: None,
            age: 0,
            is_dirty: true,
        };
        fn wake(ws: Arc<Mutex<WatchState>>, age: usize) {
            let mut ws = ws.lock().unwrap();
            if ws.age == age {
                ws.is_dirty = true;
                let waker = ws.waker.take();
                if let Some(waker) = waker {
                    drop(ws);
                    waker.wake();
                }
            }
        }

        let ws_arc = Arc::new(Mutex::new(ws));
        let mut t = Target::new(self);
        stream::poll_fn(move |cx| {
            let st = &mut *t.st.0.lock().unwrap();
            let mut ws = ws_arc.lock().unwrap();
            if ws.is_dirty {
                if let Some(key) = t.key {
                    st.g.remove_target(key);
                    t.key = None;
                }
                st.g.source_set.clear();
                t.sleep = None;
                ws.age = ws.age.wrapping_add(1);
                ws.is_dirty = false;
                let value = f(&mut st.st, st.g.context());
                (t.key, t.sleep) =
                    st.g.commit_target(|| Action::from_arc_fn_usize(ws_arc.clone(), wake, ws.age));
                Poll::Ready(Some(value))
            } else {
                ws.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        })
    }

    pub fn update<T>(&self, f: impl FnOnce(&mut St, &mut StateContext) -> T) -> T {
        let ss = &mut *self.0.lock().unwrap();
        f(&mut ss.st, ss.g.context())
    }
}

struct Target<St> {
    key: Option<YKey>,
    st: StateContainer<St>,
    sleep: Option<SpawnAtTask>,
}
impl<St> Target<St> {
    pub fn new(st: &StateContainer<St>) -> Self {
        Self {
            key: None,
            st: st.clone(),
            sleep: None,
        }
    }
    fn poll_fn<T>(
        &mut self,
        mut f: impl FnMut(&mut St, &mut StateContext) -> Poll<T>,
        cx: &mut Context,
    ) -> Poll<T> {
        let st = &mut *self.st.0.lock().unwrap();
        if let Some(y) = self.key.take() {
            st.g.remove_target(y);
        }
        st.g.source_set.clear();
        self.sleep = None;
        match f(&mut st.st, st.g.context()) {
            Poll::Ready(value) => Poll::Ready(value),
            Poll::Pending => {
                (self.key, self.sleep) = st.g.commit_target(|| cx.waker());
                Poll::Pending
            }
        }
    }
}

impl<St> Drop for Target<St> {
    fn drop(&mut self) {
        if let Some(key) = self.key.take() {
            self.st.0.lock().unwrap().g.remove_target(key);
        }
    }
}