ruchei 0.1.2

Utilities for working with many streams
Documentation
//! [`ruchei::group_concurrent`] and [`ruchei::timeout_unused`]

use std::marker::PhantomData;

use async_net::TcpListener;
use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender, unbounded};
use futures_util::{StreamExt, future::ready};
use ruchei::{
    concurrent::ConcurrentExt,
    connection_item::ConnectionItemExt,
    echo::buffered::EchoBuffered,
    liveness::{
        group_concurrent::{Group, GroupConcurrent},
        timeout_unused::TimeoutUnused,
    },
    multicast::replay::MulticastReplay,
    poll_on_wake::PollOnWakeExt,
};

struct ChannelGroup<Item>(PhantomData<Item>);

impl<Item> Group for ChannelGroup<Item> {
    type Item = Item;

    type Sender = UnboundedSender<Item>;

    type Receiver = UnboundedReceiver<Item>;

    fn send(&mut self, sender: &mut Self::Sender, item: Self::Item) {
        let _ = sender.unbounded_send(item);
    }

    fn pair(&mut self) -> (Self::Sender, Self::Receiver) {
        unbounded()
    }
}

#[async_std::main]
async fn main() {
    TcpListener::bind("127.0.0.1:8080")
        .await
        .unwrap()
        .incoming()
        .poll_on_wake()
        .filter_map(|r| async { r.ok() })
        .map(|stream| async move {
            let mut stream = async_tungstenite::accept_async(stream).await.ok()?;
            let group = stream.next().await?.ok()?;
            Some((group, stream))
        })
        .concurrent()
        .filter_map(|o| async move { o.map(|(group, s)| (group.into_data(), s.poll_on_wake())) })
        .group_concurrent(ChannelGroup(PhantomData))
        .for_each_concurrent(None, |(receiver, guard)| async move {
            let _guard = guard;
            receiver
                .timeout_unused(|| ready(()))
                .multicast_replay()
                .connection_item_ignore()
                .echo_buffered()
                .await
                .unwrap();
        })
        .await;
}