ztopic 0.3.18

A topic-based pub/sub library for Rust
Documentation
use std::{cmp::Ordering, hash::Hash, marker::Send, sync::Arc};

use futures::{stream::BoxStream, Future, StreamExt};
use tokio::{sync::Notify, task::JoinSet};

use crate::{storages::StorageManager, FlowGroup, Storage};

pub struct Flow<K, V, S, F, Fut>
where
    K: Clone + Default + Hash + Eq + Ord,
    S: Storage<V>,
    F: Fn(FlowGroup<K, V, S>) -> Fut + Send + 'static,
    Fut: Future<Output = ()> + Send + 'static,
{
    max_load: usize,
    storage: StorageManager<K, V, S>,
    new_group: F,
    join_sets: Vec<JoinSet<()>>,
    refreshed: Arc<Notify>,
}

impl<K, V, S, F, Fut> Flow<K, V, S, F, Fut>
where
    K: Clone + Default + Hash + Eq + Ord + Send + 'static,
    V: Send + 'static,
    S: Storage<V> + Send + Sync + 'static,
    F: Fn(FlowGroup<K, V, S>) -> Fut + Send + 'static,
    Fut: Future<Output = ()> + Send + 'static,
{
    pub fn new(max_load: usize, storage: StorageManager<K, V, S>, new_group: F) -> Self {
        Self {
            max_load,
            storage,
            new_group,
            join_sets: vec![],
            refreshed: Arc::new(Notify::new()),
        }
    }

    pub fn to_stream(mut self) -> BoxStream<'static, ()> {
        async_stream::stream! {
            loop {
                self.refresh();
                yield;
                self.storage.registry_changed().await;
            }
        }
        .boxed()
    }

    fn refresh(&mut self) {
        let keys = self.storage.registry_keys();
        let group_size = keys.len() / self.max_load + 1;
        loop {
            match self.join_sets.len().cmp(&group_size) {
                Ordering::Less => {
                    let mut join_set = JoinSet::new();
                    join_set.spawn((self.new_group)(FlowGroup::new(
                        self.join_sets.len(),
                        self.max_load,
                        self.refreshed.clone(),
                        self.storage.clone(),
                    )));
                    self.join_sets.push(join_set);
                }
                Ordering::Greater => {
                    self.join_sets.pop();
                }
                Ordering::Equal => break,
            }
        }
        self.refreshed.notify_waiters();
    }
}