1use std::cmp::Ordering;
4
5use eyeball_im::{
6 VectorDiff, VectorSubscriber, VectorSubscriberBatchedStream, VectorSubscriberStream,
7};
8use futures_core::Stream;
9use imbl::Vector;
10
11use super::{
12 ops::{
13 VecVectorDiffFamily, VectorDiffContainerFamily, VectorDiffContainerOps, VectorDiffFamily,
14 },
15 EmptyCountStream, EmptyLimitStream, Filter, FilterMap, Head, Skip, Sort, SortBy, SortByKey,
16 Tail,
17};
18
19pub trait VectorDiffContainer:
22 VectorDiffContainerOps<Self::Element, Family = <Self as VectorDiffContainer>::Family>
23{
24 type Element: Clone + 'static;
27
28 #[doc(hidden)]
29 type Family: VectorDiffContainerFamily<Member<Self::Element> = Self>;
30}
31
32impl<T: Clone + 'static> VectorDiffContainer for VectorDiff<T> {
33 type Element = T;
34 type Family = VectorDiffFamily;
35}
36
37impl<T: Clone + 'static> VectorDiffContainer for Vec<VectorDiff<T>> {
38 type Element = T;
39 type Family = VecVectorDiffFamily;
40}
41
42pub trait VectorSubscriberExt<T> {
44 fn batched(self) -> BatchedVectorSubscriber<T>;
46}
47
48impl<T> VectorSubscriberExt<T> for VectorSubscriber<T> {
49 fn batched(self) -> BatchedVectorSubscriber<T> {
50 BatchedVectorSubscriber { inner: self }
51 }
52}
53
54#[derive(Debug)]
57pub struct BatchedVectorSubscriber<T> {
58 inner: VectorSubscriber<T>,
59}
60
61pub trait VectorObserver<T>: Sized {
66 #[doc(hidden)]
67 type Stream: Stream;
68
69 #[doc(hidden)]
70 fn into_parts(self) -> (Vector<T>, Self::Stream);
71}
72
73impl<T: Clone + 'static> VectorObserver<T> for VectorSubscriber<T> {
74 type Stream = VectorSubscriberStream<T>;
75
76 fn into_parts(self) -> (Vector<T>, Self::Stream) {
77 self.into_values_and_stream()
78 }
79}
80
81impl<T: Clone + 'static> VectorObserver<T> for BatchedVectorSubscriber<T> {
82 type Stream = VectorSubscriberBatchedStream<T>;
83
84 fn into_parts(self) -> (Vector<T>, Self::Stream) {
85 self.inner.into_values_and_batched_stream()
86 }
87}
88
89impl<T, S> VectorObserver<T> for (Vector<T>, S)
90where
91 S: Stream,
92 S::Item: VectorDiffContainer,
93{
94 type Stream = S;
95
96 fn into_parts(self) -> (Vector<T>, Self::Stream) {
97 self
98 }
99}
100
101pub trait VectorObserverExt<T>: VectorObserver<T>
105where
106 T: Clone + 'static,
107 <Self::Stream as Stream>::Item: VectorDiffContainer<Element = T>,
108{
109 fn filter<F>(self, f: F) -> (Vector<T>, Filter<Self::Stream, F>)
111 where
112 F: Fn(&T) -> bool,
113 {
114 let (items, stream) = self.into_parts();
115 Filter::new(items, stream, f)
116 }
117
118 fn filter_map<U, F>(self, f: F) -> (Vector<U>, FilterMap<Self::Stream, F>)
120 where
121 U: Clone,
122 F: Fn(T) -> Option<U>,
123 {
124 let (items, stream) = self.into_parts();
125 FilterMap::new(items, stream, f)
126 }
127
128 fn head(self, limit: usize) -> (Vector<T>, Head<Self::Stream, EmptyLimitStream>) {
132 let (items, stream) = self.into_parts();
133 Head::new(items, stream, limit)
134 }
135
136 fn dynamic_head<L>(self, limit_stream: L) -> Head<Self::Stream, L>
141 where
142 L: Stream<Item = usize>,
143 {
144 let (items, stream) = self.into_parts();
145 Head::dynamic(items, stream, limit_stream)
146 }
147
148 fn dynamic_head_with_initial_value<L>(
153 self,
154 initial_limit: usize,
155 limit_stream: L,
156 ) -> (Vector<T>, Head<Self::Stream, L>)
157 where
158 L: Stream<Item = usize>,
159 {
160 let (items, stream) = self.into_parts();
161 Head::dynamic_with_initial_limit(items, stream, initial_limit, limit_stream)
162 }
163
164 fn tail(self, limit: usize) -> (Vector<T>, Tail<Self::Stream, EmptyLimitStream>) {
168 let (items, stream) = self.into_parts();
169 Tail::new(items, stream, limit)
170 }
171
172 fn dynamic_tail<L>(self, limit_stream: L) -> Tail<Self::Stream, L>
177 where
178 L: Stream<Item = usize>,
179 {
180 let (items, stream) = self.into_parts();
181 Tail::dynamic(items, stream, limit_stream)
182 }
183
184 fn dynamic_tail_with_initial_value<L>(
189 self,
190 initial_limit: usize,
191 limit_stream: L,
192 ) -> (Vector<T>, Tail<Self::Stream, L>)
193 where
194 L: Stream<Item = usize>,
195 {
196 let (items, stream) = self.into_parts();
197 Tail::dynamic_with_initial_limit(items, stream, initial_limit, limit_stream)
198 }
199
200 fn skip(self, count: usize) -> (Vector<T>, Skip<Self::Stream, EmptyCountStream>) {
204 let (items, stream) = self.into_parts();
205 Skip::new(items, stream, count)
206 }
207
208 fn dynamic_skip<C>(self, count_stream: C) -> Skip<Self::Stream, C>
213 where
214 C: Stream<Item = usize>,
215 {
216 let (items, stream) = self.into_parts();
217 Skip::dynamic(items, stream, count_stream)
218 }
219
220 fn dynamic_skip_with_initial_count<C>(
225 self,
226 initial_count: usize,
227 count_stream: C,
228 ) -> (Vector<T>, Skip<Self::Stream, C>)
229 where
230 C: Stream<Item = usize>,
231 {
232 let (items, stream) = self.into_parts();
233 Skip::dynamic_with_initial_count(items, stream, initial_count, count_stream)
234 }
235
236 fn sort(self) -> (Vector<T>, Sort<Self::Stream>)
240 where
241 T: Ord,
242 {
243 let (items, stream) = self.into_parts();
244 Sort::new(items, stream)
245 }
246
247 fn sort_by<F>(self, compare: F) -> (Vector<T>, SortBy<Self::Stream, F>)
251 where
252 F: Fn(&T, &T) -> Ordering,
253 {
254 let (items, stream) = self.into_parts();
255 SortBy::new(items, stream, compare)
256 }
257
258 fn sort_by_key<F, K>(self, key_fn: F) -> (Vector<T>, SortByKey<Self::Stream, F>)
262 where
263 F: Fn(&T) -> K,
264 K: Ord,
265 {
266 let (items, stream) = self.into_parts();
267 SortByKey::new(items, stream, key_fn)
268 }
269}
270
271impl<T, O> VectorObserverExt<T> for O
272where
273 T: Clone + 'static,
274 O: VectorObserver<T>,
275 <Self::Stream as Stream>::Item: VectorDiffContainer<Element = T>,
276{
277}