1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
use std::{borrow::Borrow, sync::Arc};

use arc_swap::{ArcSwap, Guard};
use im::HashMap;
use std::hash::Hash;
use tokio::sync::{mpsc, oneshot};

/// A single change to the map, carried through the mutation queue.
enum MpmcMapMutationOp<K, V>
where
    K: Send + Sync + Hash + Clone + Eq + 'static,
    V: Send + Clone + Sync + 'static,
{
    /// Store the value under the key.
    Insert(K, V),
    /// Take out whatever entry is stored under the key, if any.
    Remove(K),
}

/// Reply sent back to the caller once a queued mutation has been handled.
enum MpmcMapMutationResponse<V>
where
    V: Send + Clone + Sync + 'static,
{
    /// The mutation produced no value to hand back.
    None,
    /// The mutation produced a value (the entry taken out by a removal).
    Value(V),
}

/// One queued request: the operation to perform plus the channel on which
/// its outcome is reported back to the requester.
struct MpmcMapMutation<K, V>
where
    K: Send + Sync + Hash + Clone + Eq + 'static,
    V: Send + Clone + Sync + 'static,
{
    /// What to do to the map.
    op: MpmcMapMutationOp<K, V>,
    /// One-shot sender used to deliver the result of `op`.
    response: oneshot::Sender<MpmcMapMutationResponse<V>>,
}

/// A shared map whose reads go straight to an atomically swappable snapshot
/// and whose inserts/removes are sent over a channel as queued mutations.
#[derive(Debug)]
pub struct MpmcMap<K, V>
where
    K: Send + Sync + Hash + Clone + Eq + 'static,
    V: Send + Clone + Sync + 'static,
{
    /// Current snapshot of the map, replaceable as a whole.
    inner: Arc<ArcSwap<HashMap<K, V>>>,
    /// Sending half of the mutation queue.
    sender: mpsc::Sender<MpmcMapMutation<K, V>>,
}

impl<K: Send + Sync + Hash + Clone + Eq + 'static, V: Send + Clone + Sync + 'static> Default
    for MpmcMap<K, V>
{
    fn default() -> Self {
        Self::new()
    }
}

impl<K: Send + Sync + Hash + Clone + Eq + 'static, V: Send + Clone + Sync + 'static> Clone
    for MpmcMap<K, V>
{
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
            sender: self.sender.clone(),
        }
    }
}

impl<K: Send + Sync + Hash + Clone + Eq + 'static, V: Send + Clone + Sync + 'static> MpmcMap<K, V> {
    pub fn new() -> Self {
        let (sender, receiver) = mpsc::channel(512);

        let new_self = MpmcMap {
            inner: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
            sender,
        };
        tokio::spawn(Self::updater(new_self.inner.clone(), receiver));
        new_self
    }

    async fn updater(
        map: Arc<ArcSwap<HashMap<K, V>>>,
        mut receiver: mpsc::Receiver<MpmcMapMutation<K, V>>,
    ) {
        while let Some(mutation) = receiver.recv().await {
            match mutation.op {
                MpmcMapMutationOp::Insert(key, value) => {
                    let new_map = map.load().update(key, value);
                    map.store(Arc::new(new_map));
                    mutation.response.send(MpmcMapMutationResponse::None).ok();
                }
                MpmcMapMutationOp::Remove(key) => {
                    if let Some((old_value, new_map)) = map.load().extract(&key) {
                        map.store(Arc::new(new_map));
                        mutation
                            .response
                            .send(MpmcMapMutationResponse::Value(old_value))
                            .ok();
                    } else {
                        mutation.response.send(MpmcMapMutationResponse::None).ok();
                    }
                }
            }
        }
    }

    pub async fn insert(&self, key: K, value: V) {
        let (response, receiver) = oneshot::channel::<MpmcMapMutationResponse<V>>();
        self.sender
            .send(MpmcMapMutation {
                op: MpmcMapMutationOp::Insert(key, value),
                response,
            })
            .await
            .ok()
            .expect("failed to send insert mutation");
        receiver
            .await
            .expect("failed to receive mpmc map mutation response");
    }

    pub async fn remove(&self, key: K) {
        let (response, receiver) = oneshot::channel::<MpmcMapMutationResponse<V>>();
        self.sender
            .send(MpmcMapMutation {
                op: MpmcMapMutationOp::Remove(key),
                response,
            })
            .await
            .ok()
            .expect("failed to send insert mutation");
        receiver
            .await
            .expect("failed to receive mpmc map mutation response");
    }

    pub fn get<BK: ?Sized>(&self, key: &BK) -> Option<V>
    where
        BK: Hash + Eq,
        K: Borrow<BK>,
    {
        self.inner.load().get(key).cloned()
    }

    pub fn contains_key<BK: ?Sized>(&self, key: &BK) -> bool
    where
        BK: Hash + Eq,
        K: Borrow<BK>,
    {
        self.inner.load().contains_key(key)
    }

    pub fn inner_full(&self) -> Arc<HashMap<K, V>> {
        self.inner.load_full()
    }

    pub fn inner(&self) -> Guard<Arc<HashMap<K, V>>> {
        self.inner.load()
    }

    pub fn reset(&self, value: Arc<HashMap<K, V>>) {
        self.inner.store(value);
    }

    pub fn len(&self) -> usize {
        self.inner().len()
    }

    pub fn is_empty(&self) -> bool {
        self.inner().is_empty()
    }
}