ztopic 0.3.18

A topic-based pub/sub library for Rust
Documentation
use std::{
    hash::{DefaultHasher, Hash, Hasher},
    ops::Deref,
    sync::Arc,
};

use tokio::sync::Notify;

use crate::{storages::StorageManager, Storage};

pub struct FlowGroup<K, V, S>
where
    K: Clone + Default + Hash + Eq + Ord,
    S: Storage<V>,
{
    index: usize,
    max_load: usize,
    refreshed: Arc<Notify>,
    manager: StorageManager<K, V, S>,
}

impl<K, V, S> Deref for FlowGroup<K, V, S>
where
    K: Clone + Default + Hash + Eq + Ord,
    S: Storage<V>,
{
    type Target = StorageManager<K, V, S>;

    fn deref(&self) -> &Self::Target {
        &self.manager
    }
}

impl<K, V, S> FlowGroup<K, V, S>
where
    K: Clone + Default + Hash + Eq + Ord,
    S: Storage<V>,
{
    pub fn new(index: usize, max_load: usize, refreshed: Arc<Notify>, manager: StorageManager<K, V, S>) -> Self {
        Self {
            index,
            max_load,
            refreshed,
            manager,
        }
    }

    pub async fn refreshed(&self) {
        self.refreshed.notified().await
    }

    pub fn difference(&self, current: &Vec<K>) -> (Vec<K>, Vec<K>) {
        let keys = self.manager.registry_keys();

        let group_size = keys.len() / self.max_load + 1;

        let group_keys = keys
            .iter()
            .filter_map(|key| {
                let mut hasher = DefaultHasher::new();
                key.hash(&mut hasher);
                let hash = hasher.finish() as usize;
                if hash % group_size == self.index {
                    Some(key.clone())
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();

        let mut removing = vec![];
        let mut adding = vec![];

        for c in current {
            if !group_keys.contains(c) {
                removing.push(c.clone());
            }
        }
        for c in &group_keys {
            if !current.contains(c) {
                adding.push(c.clone());
            }
        }

        (removing, adding)
    }
}