selium_server/sink/
fanout_many.rs1use std::{
4 borrow::{Borrow, BorrowMut},
5 fmt::Debug,
6 hash::Hash,
7 pin::Pin,
8 slice::IterMut,
9 task::{Context, Poll},
10};
11
12use anyhow::Result;
13use futures::Sink;
14use log::error;
15use tokio::pin;
16
17#[must_use = "sinks do nothing unless you poll them"]
18pub struct FanoutMany<K, V> {
19 entries: Vec<(K, V)>,
20}
21
22impl<K, V> FanoutMany<K, V> {
23 pub fn new() -> Self {
24 Self { entries: vec![] }
25 }
26
27 pub fn with_capacity(capacity: usize) -> Self {
28 Self {
29 entries: Vec::with_capacity(capacity),
30 }
31 }
32
33 pub fn iter_mut(&mut self) -> IterMut<'_, (K, V)> {
34 self.entries.iter_mut()
35 }
36
37 pub fn insert(&mut self, k: K, sink: V) -> Option<V>
38 where
39 K: Hash + Eq,
40 {
41 let ret = self.remove(&k);
42 self.entries.push((k, sink));
43
44 ret
45 }
46
47 pub fn remove<Q: ?Sized>(&mut self, k: &Q) -> Option<V>
48 where
49 K: Borrow<Q>,
50 Q: Hash + Eq,
51 {
52 for i in 0..self.entries.len() {
53 if self.entries[i].0.borrow() == k {
54 return Some(self.entries.swap_remove(i).1);
55 }
56 }
57
58 None
59 }
60}
61
62impl<K, V> Default for FanoutMany<K, V> {
63 fn default() -> Self {
64 Self::new()
65 }
66}
67
68impl<K, V, Item> Sink<Item> for FanoutMany<K, V>
69where
70 K: Unpin,
71 V: Sink<Item> + Unpin,
72 V::Error: Debug,
73 Item: Clone + Unpin,
74{
75 type Error = V::Error;
76
77 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
78 let mut idx = 0;
79 while idx < self.entries.len() {
80 let (_, sink) = self.entries[idx].borrow_mut();
81 pin!(sink);
82 match sink.poll_ready(cx) {
83 Poll::Pending => return Poll::Pending,
84 Poll::Ready(Err(_)) => {
85 self.entries.swap_remove(idx);
86 }
87 Poll::Ready(Ok(())) => idx += 1,
88 }
89 }
90
91 Poll::Ready(Ok(()))
92 }
93
94 fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
95 let mut idx = 0;
96 let len = self.entries.len();
97 while idx < len {
98 let (_, sink) = self.entries[idx].borrow_mut();
99 pin!(sink);
100 if idx == len - 1 {
101 if let Err(e) = sink.start_send(item) {
102 error!("Evicting broken sink from FanoutMany::start_send with err: {e:?}");
103 self.entries.swap_remove(idx);
104 }
105 break;
106 };
107
108 if let Err(e) = sink.start_send(item.clone()) {
109 error!("Evicting broken sink from FanoutMany::start_send with err: {e:?}");
110 self.entries.swap_remove(idx);
111 } else {
112 idx += 1;
113 }
114 }
115
116 Ok(())
117 }
118
119 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
120 let mut idx = 0;
121 while idx < self.entries.len() {
122 let (_, sink) = self.entries[idx].borrow_mut();
123 pin!(sink);
124 match sink.poll_flush(cx) {
125 Poll::Pending => return Poll::Pending,
126 Poll::Ready(Err(_)) => {
127 self.entries.swap_remove(idx);
128 }
129 Poll::Ready(Ok(())) => idx += 1,
130 }
131 }
132
133 Poll::Ready(Ok(()))
134 }
135
136 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
137 let mut idx = 0;
138 while idx < self.entries.len() {
139 let (_, sink) = self.entries[idx].borrow_mut();
140 pin!(sink);
141 match sink.poll_close(cx) {
142 Poll::Pending => return Poll::Pending,
143 Poll::Ready(Err(_)) => {
144 self.entries.swap_remove(idx);
145 }
146 Poll::Ready(Ok(())) => idx += 1,
147 }
148 }
149
150 Poll::Ready(Ok(()))
151 }
152}
153
154impl<K, V> FromIterator<(K, V)> for FanoutMany<K, V>
155where
156 K: Hash + Eq,
157{
158 fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
159 let iterator = iter.into_iter();
160 let (lower_bound, _) = iterator.size_hint();
161 let mut sink_map = Self::with_capacity(lower_bound);
162
163 for (key, value) in iterator {
164 sink_map.insert(key, value);
165 }
166
167 sink_map
168 }
169}
170
171impl<K, V> Extend<(K, V)> for FanoutMany<K, V> {
172 fn extend<T>(&mut self, iter: T)
173 where
174 T: IntoIterator<Item = (K, V)>,
175 {
176 self.entries.extend(iter);
177 }
178}