healslut 0.1.0

Activates intimate hardware in response to screen contents.
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

use async_std::sync::Mutex;

use futures::channel::oneshot::{channel, Sender};
use futures::stream::{Stream, StreamExt as _};

use std::collections::HashMap;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

struct Helper<T> {
    id: usize,
    inner: Arc<Mutex<Inner>>,
    stream: T,

impl<T> Drop for Helper<T> {
    fn drop(&mut self) {
        let inner = self.inner.clone();
        let id = self.id;

        async_std::task::spawn(async move {
            let mut locked = inner.lock().await;

impl<T> Stream for Helper<T>
    T: Stream,
    type Item = T::Item;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let projected = unsafe { self.map_unchecked_mut(|h| &mut h.stream) };


    fn size_hint(&self) -> (usize, Option<usize>) {

pub struct Inner {
    count: usize,
    senders: HashMap<usize, Sender<()>>,

#[derive(Debug, Clone)]
pub struct Exit {
    exited: Arc<AtomicBool>,
    inner: Arc<Mutex<Inner>>,

impl Default for Exit {
    fn default() -> Self {

impl Exit {
    pub fn new() -> Self {
        Self {
            exited: Default::default(),
            inner: Arc::new(Mutex::new(Inner {
                count: 0,
                senders: HashMap::new(),

    pub async fn exit(&self) {
        if self.exited.load(Ordering::SeqCst) {

        let mut inner = self.inner.lock().await;

        if self.exited.fetch_or(true, Ordering::SeqCst) {

        for (_, sender) in inner.senders.drain() {

        // TODO: Unpark threads that used the interval method.

    pub async fn from<S>(
        stream: S,
    ) -> impl Stream<Item = S::Item> + Unpin
        S: Stream + Unpin,
        let id;
        let receiver;

            let mut inner = self.inner.lock().await;

            if self.exited.load(Ordering::SeqCst) {
                todo!("exit already triggered");

            let (sender, r) = channel();
            receiver = r;

            id = inner.count;
            inner.count += 1;

            inner.senders.insert(id, sender);

        let out = stream.take_until(receiver);

        Helper {
            stream: out,
            inner: self.inner.clone(),

    pub fn interval(&self, duration: Duration) -> Interval {
        Interval {
            exited: self.exited.clone(),
            next: Instant::now() + duration,
            no_send: Default::default(),

pub struct Interval {
    next: Instant,
    duration: Duration,
    exited: Arc<AtomicBool>,

    no_send: std::marker::PhantomData<*const ()>,

impl Drop for Interval {
    fn drop(&mut self) {
        // TODO: Remove this interval's thread from inner's hashmap.

impl Interval {
    pub fn next(&mut self) -> Option<()> {
        loop {
            let now = Instant::now();
            if self.exited.load(Ordering::SeqCst) {
                return None;
            } else if now >= self.next {
                self.next = now + self.duration;
                return Some(());
            } else {
                std::thread::park_timeout(self.next - now);