selium_server/sink/
fanout_many.rs

1//! FanoutMany is based on tokio-stream::StreamMap
2
3use std::{
4    borrow::{Borrow, BorrowMut},
5    fmt::Debug,
6    hash::Hash,
7    pin::Pin,
8    slice::IterMut,
9    task::{Context, Poll},
10};
11
12use anyhow::Result;
13use futures::Sink;
14use log::error;
15use tokio::pin;
16
17#[must_use = "sinks do nothing unless you poll them"]
18pub struct FanoutMany<K, V> {
19    entries: Vec<(K, V)>,
20}
21
22impl<K, V> FanoutMany<K, V> {
23    pub fn new() -> Self {
24        Self { entries: vec![] }
25    }
26
27    pub fn with_capacity(capacity: usize) -> Self {
28        Self {
29            entries: Vec::with_capacity(capacity),
30        }
31    }
32
33    pub fn iter_mut(&mut self) -> IterMut<'_, (K, V)> {
34        self.entries.iter_mut()
35    }
36
37    pub fn insert(&mut self, k: K, sink: V) -> Option<V>
38    where
39        K: Hash + Eq,
40    {
41        let ret = self.remove(&k);
42        self.entries.push((k, sink));
43
44        ret
45    }
46
47    pub fn remove<Q: ?Sized>(&mut self, k: &Q) -> Option<V>
48    where
49        K: Borrow<Q>,
50        Q: Hash + Eq,
51    {
52        for i in 0..self.entries.len() {
53            if self.entries[i].0.borrow() == k {
54                return Some(self.entries.swap_remove(i).1);
55            }
56        }
57
58        None
59    }
60}
61
62impl<K, V> Default for FanoutMany<K, V> {
63    fn default() -> Self {
64        Self::new()
65    }
66}
67
68impl<K, V, Item> Sink<Item> for FanoutMany<K, V>
69where
70    K: Unpin,
71    V: Sink<Item> + Unpin,
72    V::Error: Debug,
73    Item: Clone + Unpin,
74{
75    type Error = V::Error;
76
77    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
78        let mut idx = 0;
79        while idx < self.entries.len() {
80            let (_, sink) = self.entries[idx].borrow_mut();
81            pin!(sink);
82            match sink.poll_ready(cx) {
83                Poll::Pending => return Poll::Pending,
84                Poll::Ready(Err(_)) => {
85                    self.entries.swap_remove(idx);
86                }
87                Poll::Ready(Ok(())) => idx += 1,
88            }
89        }
90
91        Poll::Ready(Ok(()))
92    }
93
94    fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
95        let mut idx = 0;
96        let len = self.entries.len();
97        while idx < len {
98            let (_, sink) = self.entries[idx].borrow_mut();
99            pin!(sink);
100            if idx == len - 1 {
101                if let Err(e) = sink.start_send(item) {
102                    error!("Evicting broken sink from FanoutMany::start_send with err: {e:?}");
103                    self.entries.swap_remove(idx);
104                }
105                break;
106            };
107
108            if let Err(e) = sink.start_send(item.clone()) {
109                error!("Evicting broken sink from FanoutMany::start_send with err: {e:?}");
110                self.entries.swap_remove(idx);
111            } else {
112                idx += 1;
113            }
114        }
115
116        Ok(())
117    }
118
119    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
120        let mut idx = 0;
121        while idx < self.entries.len() {
122            let (_, sink) = self.entries[idx].borrow_mut();
123            pin!(sink);
124            match sink.poll_flush(cx) {
125                Poll::Pending => return Poll::Pending,
126                Poll::Ready(Err(_)) => {
127                    self.entries.swap_remove(idx);
128                }
129                Poll::Ready(Ok(())) => idx += 1,
130            }
131        }
132
133        Poll::Ready(Ok(()))
134    }
135
136    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
137        let mut idx = 0;
138        while idx < self.entries.len() {
139            let (_, sink) = self.entries[idx].borrow_mut();
140            pin!(sink);
141            match sink.poll_close(cx) {
142                Poll::Pending => return Poll::Pending,
143                Poll::Ready(Err(_)) => {
144                    self.entries.swap_remove(idx);
145                }
146                Poll::Ready(Ok(())) => idx += 1,
147            }
148        }
149
150        Poll::Ready(Ok(()))
151    }
152}
153
154impl<K, V> FromIterator<(K, V)> for FanoutMany<K, V>
155where
156    K: Hash + Eq,
157{
158    fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
159        let iterator = iter.into_iter();
160        let (lower_bound, _) = iterator.size_hint();
161        let mut sink_map = Self::with_capacity(lower_bound);
162
163        for (key, value) in iterator {
164            sink_map.insert(key, value);
165        }
166
167        sink_map
168    }
169}
170
171impl<K, V> Extend<(K, V)> for FanoutMany<K, V> {
172    fn extend<T>(&mut self, iter: T)
173    where
174        T: IntoIterator<Item = (K, V)>,
175    {
176        self.entries.extend(iter);
177    }
178}