use std::cmp::PartialEq;
use std::collections::LinkedList;
use std::marker::PhantomData;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{SystemTime, UNIX_EPOCH};
#[cfg(feature = "for_futures")]
use futures::executor::ThreadPool;
#[cfg(feature = "for_futures")]
use futures::stream::Stream;
#[cfg(feature = "for_futures")]
use std::mem;
#[cfg(feature = "for_futures")]
use std::pin::Pin;
#[cfg(feature = "for_futures")]
use std::sync::{
atomic::{AtomicBool, Ordering},
Once,
};
#[cfg(feature = "for_futures")]
use std::task::{Context, Poll, Waker};
/// Cloneable handle to a `ThreadPool` kept behind an `Arc<Mutex<_>>`.
///
/// Cloning the handle clones the `Arc`, so every clone refers to the same
/// pool. Only compiled when the `for_futures` feature is enabled.
#[cfg(feature = "for_futures")]
#[derive(Clone)]
pub struct SharedThreadPoolReader {
/// The shared pool; lock the mutex to reach the `ThreadPool`.
pub inner: Arc<Mutex<ThreadPool>>,
}
/// Returns a handle to the process-wide shared `ThreadPool`.
///
/// The pool is built lazily by the first caller and is never freed by this
/// function; every call hands back a clone of the same
/// `SharedThreadPoolReader`.
///
/// # Panics
///
/// Panics with "Unable to create threadpool" if `ThreadPool::new()` fails
/// during the first call.
#[cfg(feature = "for_futures")]
pub fn shared_thread_pool() -> SharedThreadPoolReader {
    static mut SINGLETON: *const SharedThreadPoolReader = 0 as *const SharedThreadPoolReader;
    static ONCE: Once = Once::new();

    ONCE.call_once(|| {
        let pool = ThreadPool::new().expect("Unable to create threadpool");
        let reader = Box::new(SharedThreadPoolReader {
            inner: Arc::new(Mutex::new(pool)),
        });
        // SAFETY: `call_once` runs this closure at most once, and no caller
        // gets past `call_once` until it has finished, so nothing else reads
        // or writes `SINGLETON` during this store.
        unsafe {
            SINGLETON = mem::transmute(reader);
        }
    });

    // SAFETY: `call_once` has returned, so `SINGLETON` points at the boxed
    // reader stored above; that allocation is never released here.
    unsafe { (*SINGLETON).clone() }
}
/// Inserts several key/value pairs into `$target` in one invocation.
///
/// `$target` must be an identifier naming a value with an
/// `insert(key, value)` method (the tests use a `HashMap`). Each pair expands
/// to one `$target.insert(..)` statement, in the order written.
///
/// Accepted forms (the trailing comma is optional in all of them):
///
/// * `map_insert!(m, { a: 1, b: 2 })` or `map_insert!(m, [a: 1, b: 2])` —
///   keys are identifiers and are inserted as their `stringify!`-ed names.
/// * `map_insert!(m, { k1 => v1, k2 => v2 })` or the same inside `[...]` —
///   keys are arbitrary expressions.
/// * `map_insert!(m, [k1, v1, k2, v2])` — a flat list alternating key and
///   value expressions.
#[macro_export]
macro_rules! map_insert {
    // Identifier keys, brace-delimited.
    ($target:ident, {
        $($key:ident : $value:expr),* $(,)?
    }) => {
        $(
            $target.insert(stringify!($key), $value);
        )*
    };
    // Identifier keys, bracket-delimited.
    ($target:ident, [
        $($key:ident : $value:expr),* $(,)?
    ]) => {
        $(
            $target.insert(stringify!($key), $value);
        )*
    };
    // Expression keys with `=>`, brace-delimited.
    ($target:ident, {
        $($key:expr => $value:expr),* $(,)?
    }) => {
        $(
            $target.insert($key, $value);
        )*
    };
    // Expression keys with `=>`, bracket-delimited.
    ($target:ident, [
        $($key:expr => $value:expr),* $(,)?
    ]) => {
        $(
            $target.insert($key, $value);
        )*
    };
    // Flat `key, value, key, value` list.
    ($target:ident, [
        $($key:expr, $value:expr),* $(,)?
    ]) => {
        $(
            $target.insert($key, $value);
        )*
    };
}
/// Returns a mutable reference to the element of `v` at position `index`,
/// or `None` when `index` is out of bounds.
///
/// Uses the standard indexed lookup, so the cost does not depend on how far
/// into the vector `index` points.
pub fn get_mut<'a, T>(v: &'a mut Vec<T>, index: usize) -> Option<&'a mut T> {
    v.get_mut(index)
}
/// FIFO queue behind an `Arc<Mutex<LinkedList<T>>>`; clones share the same
/// underlying list.
///
/// With the `for_futures` feature the queue also carries an `alive` flag and
/// a slot for a task `Waker`, both shared between clones.
#[derive(Debug, Clone)]
pub struct LinkedListAsync<T> {
    /// The queued items.
    inner: Arc<Mutex<LinkedList<T>>>,
    /// `true` while `push_back` still accepts items.
    #[cfg(feature = "for_futures")]
    alive: Arc<Mutex<AtomicBool>>,
    /// Waker to notify on the next `push_back` / `close_stream`, if any.
    #[cfg(feature = "for_futures")]
    waker: Arc<Mutex<Option<Waker>>>,
    _t: PhantomData<T>,
}

impl<T> LinkedListAsync<T> {
    /// Creates an empty queue. With `for_futures` it starts out alive and
    /// with no waker registered.
    pub fn new() -> Self {
        LinkedListAsync {
            inner: Arc::new(Mutex::new(LinkedList::new())),
            #[cfg(feature = "for_futures")]
            alive: Arc::new(Mutex::new(AtomicBool::new(true))),
            #[cfg(feature = "for_futures")]
            waker: Arc::new(Mutex::new(None)),
            _t: PhantomData,
        }
    }

    /// Appends `input` to the back of the queue.
    ///
    /// With `for_futures`, the item is stored only while the queue is alive
    /// (otherwise it is dropped), and any registered waker is notified in
    /// either case.
    pub fn push_back(&self, input: T) {
        #[cfg(feature = "for_futures")]
        {
            // The `alive` lock is held across both the push and the wake.
            let flag = self.alive.lock().unwrap();
            if flag.load(Ordering::SeqCst) {
                self.inner.lock().unwrap().push_back(input);
            }
            self.wake();
        }
        #[cfg(not(feature = "for_futures"))]
        {
            let mut queue = self.inner.lock().unwrap();
            queue.push_back(input);
        }
    }

    /// Removes and returns the oldest queued item, or `None` when empty.
    pub fn pop_front(&self) -> Option<T> {
        let mut queue = self.inner.lock().unwrap();
        queue.pop_front()
    }

    /// Takes the registered waker out of its slot, if any, and wakes it
    /// while the slot's lock is still held.
    #[cfg(feature = "for_futures")]
    #[inline]
    fn wake(&self) {
        let mut slot = self.waker.lock().unwrap();
        if let Some(registered) = slot.take() {
            registered.wake();
        }
    }

    /// Marks the queue as alive so `push_back` stores items again.
    #[cfg(feature = "for_futures")]
    fn open_stream(&mut self) {
        let flag = self.alive.lock().unwrap();
        flag.store(true, Ordering::SeqCst);
    }

    /// Marks the queue as no longer alive and notifies any registered waker.
    #[cfg(feature = "for_futures")]
    pub fn close_stream(&mut self) {
        // The wake happens while the `alive` lock is still held.
        let flag = self.alive.lock().unwrap();
        flag.store(false, Ordering::SeqCst);
        self.wake();
    }
}
/// `Stream` view of the queue: yields queued items in FIFO order and ends
/// once the queue is both empty and no longer alive.
#[cfg(feature = "for_futures")]
impl<T> Stream for LinkedListAsync<T>
where
    T: 'static + Send + Unpin,
{
    type Item = T;

    /// Yields the next queued item if there is one. Otherwise, while the
    /// queue is alive, stores the task's waker and returns `Pending`; once
    /// it is closed, returns `Ready(None)`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `alive` is taken first, as `push_back` and `close_stream` do, so a
        // push or close cannot slip in between the emptiness check and the
        // waker registration below.
        let alive = self.alive.lock().unwrap();
        let mut waker = self.waker.lock().unwrap();

        if let Some(item) = self.pop_front() {
            return Poll::Ready(Some(item));
        }
        if alive.load(Ordering::SeqCst) {
            *waker = Some(cx.waker().clone());
            Poll::Pending
        } else {
            Poll::Ready(None)
        }
    }

    /// Reports the number of currently queued items as the lower bound.
    ///
    /// While the queue is alive more items may still be pushed, so there is
    /// no upper bound; once it is closed `push_back` stores nothing further,
    /// so the queued count is also the upper bound. The values are a
    /// snapshot taken at the time of the call.
    fn size_hint(&self) -> (usize, Option<usize>) {
        // Same `alive` -> `inner` lock order as `push_back`.
        let alive = self.alive.lock().unwrap();
        let queued = self.inner.lock().unwrap().len();
        if alive.load(Ordering::SeqCst) {
            (queued, None)
        } else {
            (queued, Some(queued))
        }
    }
}
impl<T> Default for LinkedListAsync<Arc<T>> {
fn default() -> Self {
Self::new()
}
}
/// Interface for a subject that works with observers of type `T` (each a
/// `Subscription<X>`) and values of type `X`.
///
/// NOTE(review): no implementor is visible in this file; the method notes
/// below follow the names and signatures only — confirm against the
/// implementing types.
pub trait Observable<X, T: Subscription<X>> {
/// Presumably registers `observer` with this subject.
fn add_observer(&mut self, observer: Arc<T>);
/// Presumably removes a previously added `observer`.
fn delete_observer(&mut self, observer: Arc<T>);
/// Presumably passes `x` to the registered observers.
fn notify_observers(&mut self, x: Arc<X>);
}
/// A receiver of `Arc<X>` values.
///
/// Implementors must be `Send + Sync + 'static` and expose a `String` id
/// through `UniqueId<String>`.
pub trait Subscription<X>: Send + Sync + 'static + UniqueId<String> {
/// Handles one value `x`.
fn on_next(&self, x: Arc<X>);
}
/// Exposes an identifier of type `T` for the implementing value.
pub trait UniqueId<T> {
/// Returns the identifier by value.
fn get_id(&self) -> T;
}
/// Builds an id string from the current thread's id followed by the time
/// elapsed since the Unix epoch, both rendered with `{:?}`.
///
/// # Panics
///
/// Panics with "Time went backwards" if the system clock reports a time
/// before the Unix epoch.
pub fn generate_id() -> String {
    let elapsed = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards");
    let thread_id = thread::current().id();
    format!("{:?}{:?}", thread_id, elapsed)
}
/// A subscription backed by a callback.
///
/// Holds a `String` id and the `RawReceiver` wrapping the callback. With the
/// `for_futures` feature it also keeps a `LinkedListAsync` to which
/// `on_next` pushes each `Arc<T>` it is given.
pub struct SubscriptionFunc<T> {
/// Identifier returned by `get_id` and compared by `PartialEq`.
id: String,
/// Wrapper around the callback that `on_next` invokes.
pub receiver: RawReceiver<T>,
/// Queue fed by `on_next`; `as_stream` hands out a clone of it.
#[cfg(feature = "for_futures")]
cached: LinkedListAsync<Arc<T>>,
}
impl<T> SubscriptionFunc<T> {
    /// Forwards to the module-level `generate_id` function.
    fn generate_id() -> String {
        generate_id()
    }

    /// Creates a subscription with a freshly generated id whose `on_next`
    /// invokes `func`. With `for_futures` it starts with an empty cache.
    pub fn new(func: impl FnMut(Arc<T>) + Send + Sync + 'static) -> SubscriptionFunc<T> {
        let id = Self::generate_id();
        let receiver = RawReceiver::new(func);
        Self {
            id,
            receiver,
            #[cfg(feature = "for_futures")]
            cached: LinkedListAsync::new(),
        }
    }
}
impl<T> Clone for SubscriptionFunc<T> {
fn clone(&self) -> Self {
SubscriptionFunc {
id: self.id.clone(),
receiver: RawReceiver {
func: self.receiver.func.clone(),
_t: PhantomData,
},
#[cfg(feature = "for_futures")]
cached: self.cached.clone(),
}
}
}
#[cfg(feature = "for_futures")]
impl<T> SubscriptionFunc<T> {
    /// Closes the cached queue by delegating to its `close_stream`.
    pub fn close_stream(&mut self) {
        let stream = &mut self.cached;
        stream.close_stream();
    }
}
#[cfg(feature = "for_futures")]
impl<T> SubscriptionFunc<T>
where
    T: Unpin,
{
    /// Re-opens the cached queue by delegating to its `open_stream`.
    fn open_stream(&mut self) {
        let stream = &mut self.cached;
        stream.open_stream();
    }

    /// Opens the cached queue and returns a clone of it; the clone shares
    /// the same underlying storage.
    pub fn as_stream(&mut self) -> LinkedListAsync<Arc<T>> {
        self.open_stream();
        Clone::clone(&self.cached)
    }
}
impl<T> UniqueId<String> for SubscriptionFunc<T> {
    /// Returns a copy of this subscription's id.
    fn get_id(&self) -> String {
        String::clone(&self.id)
    }
}
/// Two subscriptions are equal when their ids are equal; the callback and
/// cache are not compared.
impl<T> PartialEq for SubscriptionFunc<T> {
    fn eq(&self, other: &SubscriptionFunc<T>) -> bool {
        self.id.as_str() == other.id.as_str()
    }
}
impl<T: Send + Sync + 'static> Subscription<T> for SubscriptionFunc<T> {
    /// Invokes the callback with `x`; with `for_futures`, also pushes `x`
    /// onto the cached queue afterwards.
    fn on_next(&self, x: Arc<T>) {
        let for_callback = Arc::clone(&x);
        self.receiver.invoke(for_callback);
        #[cfg(feature = "for_futures")]
        {
            self.cached.push_back(x);
        }
    }
}
/// Shareable wrapper around a callback that takes an `Arc<T>`.
///
/// The callback lives behind an `Arc<Mutex<_>>`, so clones of the receiver
/// invoke the same closure.
pub struct RawReceiver<T> {
    func: Arc<Mutex<dyn FnMut(Arc<T>) + Send + Sync + 'static>>,
    _t: PhantomData<T>,
}

/// Cloning only copies the `Arc`, so no `T: Clone` bound is required
/// (a derived impl would add one).
impl<T> Clone for RawReceiver<T> {
    fn clone(&self) -> Self {
        RawReceiver {
            func: Arc::clone(&self.func),
            _t: PhantomData,
        }
    }
}

impl<T> RawReceiver<T> {
    /// Wraps `func` so it can be shared and invoked through `&self`.
    pub fn new(func: impl FnMut(Arc<T>) + Send + Sync + 'static) -> RawReceiver<T> {
        RawReceiver {
            func: Arc::new(Mutex::new(func)),
            _t: PhantomData,
        }
    }

    /// Calls the wrapped callback with `x` while holding its mutex.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned.
    pub fn invoke(&self, x: Arc<T>) {
        let mut callback = self.func.lock().unwrap();
        (*callback)(x);
    }
}
/// Shareable wrapper around a zero-argument callback.
///
/// The callback lives behind an `Arc<Mutex<_>>`, so clones invoke the same
/// closure.
#[derive(Clone)]
pub struct RawFunc {
    func: Arc<Mutex<dyn FnMut() + Send + Sync + 'static>>,
}

impl RawFunc {
    /// Wraps `func` so it can be shared and invoked through `&self`.
    pub fn new<T>(func: T) -> RawFunc
    where
        T: FnMut() + Send + Sync + 'static,
    {
        let shared: Arc<Mutex<dyn FnMut() + Send + Sync + 'static>> = Arc::new(Mutex::new(func));
        Self { func: shared }
    }

    /// Calls the wrapped callback while holding its mutex.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned.
    pub fn invoke(&self) {
        let mut callback = self.func.lock().unwrap();
        (*callback)();
    }
}
// Checks every accepted `map_insert!` form against maps built by hand.
#[test]
fn test_map_insert() {
    use std::collections::HashMap;

    // Identifier keys end up as their `stringify!`-ed names.
    let expected_by_ident: HashMap<&str, &str> = [("a", "2"), ("b", "4"), ("c", "6")]
        .iter()
        .cloned()
        .collect();

    let mut actual = HashMap::new();
    map_insert!(actual, [
        a : "2",
        b : "4",
        c : "6",
    ]);
    assert_eq!(expected_by_ident, actual);

    let mut actual = HashMap::new();
    map_insert!(actual, {
        a : "2",
        b : "4",
        c : "6",
    });
    assert_eq!(expected_by_ident, actual);

    // Expression keys, with `=>` and as a flat key/value list.
    let expected: HashMap<&str, &str> = [("1", "2"), ("3", "4"), ("5", "6")]
        .iter()
        .cloned()
        .collect();

    let mut actual = HashMap::new();
    map_insert!(actual, [
        "1" => "2",
        "3" => "4",
        "5" => "6",
    ]);
    assert_eq!(expected, actual);

    let mut actual = HashMap::new();
    map_insert!(actual, {
        "1" => "2",
        "3" => "4",
        "5" => "6",
    });
    assert_eq!(expected, actual);

    let mut actual = HashMap::new();
    map_insert!(actual, ["1", "2", "3", "4", "5", "6",]);
    assert_eq!(expected, actual);
}