use std::ops::{Deref, DerefMut};
use std::rc::Rc;
use std::sync::mpsc::{channel, Receiver, RecvError, SendError, Sender, TryRecvError};
use std::sync::{Mutex, PoisonError};
use std::thread::JoinHandle;
pub struct Store<A, S>
where
A: Send + Clone + Eq + PartialEq + 'static,
S: Send + Clone + 'static,
{
sender: Sender<A>,
receiver: Receiver<A>,
state: &'static Mutex<S>,
connection: std::cell::Cell<Option<Sender<A>>>,
callbacks: Mutex<Vec<Box<dyn Fn(A, &S) + 'static>>>,
}
impl<A, S> Store<A, S>
where
A: Send + Clone + Eq + PartialEq + 'static,
S: Send + Clone + 'static,
{
pub fn new(state: &'static Mutex<S>) -> Self {
let (sender, receiver) = channel();
Store {
sender,
receiver,
state,
connection: std::cell::Cell::new(None),
callbacks: Mutex::new(Vec::new()),
}
}
pub fn send(&self, action: A) {
if let Some(sender) = self.connection.take() {
sender.send(action).expect("Could not send action.");
self.connection.set(Some(sender));
}
}
pub fn receive(&self, callback: impl Fn(A, &S) + 'static) {
match self.callbacks.lock() {
Ok(mut cb) => {
cb.deref_mut().push(Box::new(callback));
}
Err(_) => {}
}
}
pub fn ui_send(&self, action: A) {
match self.callbacks.lock() {
Ok(callbacks_lock) => match self.state.lock() {
Ok(state) => {
for callback in callbacks_lock.deref() {
callback(action.clone(), state.deref());
}
}
Err(_) => {}
},
Err(_) => {}
}
}
pub fn ui_try_receive(&self) -> Result<A, TryRecvError> {
self.receiver.try_recv()
}
fn connect(&self) -> (Sender<A>, Receiver<A>) {
let (s, r) = channel();
self.connection.set(Some(s.clone()));
(self.sender.clone(), r)
}
}
#[derive(Debug)]
pub struct StoreError {
message: String,
cause: String,
}
impl StoreError {
pub fn message(&self) -> &str {
&self.message
}
pub fn cause(&self) -> &str {
&self.cause
}
}
impl From<RecvError> for StoreError {
fn from(re: RecvError) -> Self {
StoreError {
message: "Error in gstores channel communication.".to_string(),
cause: format!("RecvError: {}", re),
}
}
}
impl<T> From<SendError<T>> for StoreError {
fn from(re: SendError<T>) -> Self {
StoreError {
message: "Error in gstores channel communication.".to_string(),
cause: format!("SendError: {}", re),
}
}
}
impl<T> From<PoisonError<T>> for StoreError {
fn from(e: PoisonError<T>) -> Self {
StoreError {
message: "".to_string(),
cause: format!("{:?}", e),
}
}
}
pub fn combine_reducers<A, S>(
store: Rc<Store<A, S>>,
state: &'static Mutex<S>,
reducer: impl Fn(A, S) -> Option<S> + Send + 'static,
) -> JoinHandle<Result<(), StoreError>>
where
A: Send + Clone + Eq + PartialEq + 'static,
S: Send + Clone + 'static,
{
let (sender, receiver) = store.connect();
return std::thread::spawn(move || {
loop {
let action = receiver.recv()?;
let mut state = state.lock()?;
let next_state = reducer(action.clone(), state.clone());
if next_state.is_none() {
break;
}
*state = next_state.unwrap();
sender.send(action.clone())?;
}
return Ok(());
});
}
#[cfg(test)]
#[macro_use]
extern crate lazy_static;
#[cfg(test)]
mod tests {
use std::rc::Rc;
use std::sync::Mutex;
use crate::{combine_reducers, Store};
#[derive(Debug, Clone)]
struct State {
count: u32,
}
#[derive(Debug, Clone, Eq, PartialEq)]
enum Action {
Increment,
Decrement,
Shutdown,
}
lazy_static! {
static ref STATE: Mutex<State> = Mutex::new(State { count: 0 });
}
#[test]
fn test_counter() {
let store: Rc<Store<Action, State>> = Rc::new(Store::new(&STATE));
let join = combine_reducers(store.clone(), &STATE, |action, state| match action {
Action::Increment => Some(State {
count: state.count + 1,
}),
Action::Decrement => Some(State {
count: state.count - 1,
}),
Action::Shutdown => None,
});
store.send(Action::Increment);
store.send(Action::Increment);
store.send(Action::Increment);
store.send(Action::Decrement);
store.send(Action::Shutdown);
join.join().unwrap().expect("Error during Store handling.");
assert_eq!(STATE.lock().unwrap().count, 2);
}
}