eyeball_im/
vector.rs

1use std::{fmt, ops};
2
3use imbl::Vector;
4use tokio::sync::broadcast::{self, Sender};
5
6mod entry;
7mod subscriber;
8mod transaction;
9
10pub use self::{
11    entry::{ObservableVectorEntries, ObservableVectorEntry},
12    subscriber::{VectorSubscriber, VectorSubscriberBatchedStream, VectorSubscriberStream},
13    transaction::{
14        ObservableVectorTransaction, ObservableVectorTransactionEntries,
15        ObservableVectorTransactionEntry,
16    },
17};
18
/// An ordered list of elements that broadcasts any changes made to it.
pub struct ObservableVector<T> {
    /// The current elements of the list.
    values: Vector<T>,
    /// Sending half of the broadcast channel over which updates are delivered
    /// to subscribers.
    sender: Sender<BroadcastMessage<T>>,
}
24
25impl<T: Clone + 'static> ObservableVector<T> {
26    /// Create a new `ObservableVector`.
27    ///
28    /// As of the time of writing, this is equivalent to
29    /// `ObservableVector::with_capacity(16)`, but the internal buffer capacity
30    /// is subject to change in non-breaking releases.
31    ///
32    /// See [`with_capacity`][Self::with_capacity] for details about the buffer
33    /// capacity.
34    pub fn new() -> Self {
35        Self::with_capacity(16)
36    }
37
38    /// Create a new `ObservableVector` with the given capacity for the inner
39    /// buffer.
40    ///
41    /// Up to `capacity` updates that have not been received by all of the
42    /// subscribers yet will be retained in the inner buffer. If an update
43    /// happens while the buffer is at capacity, the oldest update is discarded
44    /// from it and all subscribers that have not yet received it will instead
45    /// see [`VectorDiff::Reset`] as the next update.
46    ///
47    /// # Panics
48    ///
49    /// Panics if the capacity is `0`, or overflows.
50    pub fn with_capacity(capacity: usize) -> Self {
51        let (sender, _) = broadcast::channel(capacity);
52        Self { values: Vector::new(), sender }
53    }
54
55    /// Turn the `ObservableVector` back into a regular `Vector`.
56    pub fn into_inner(self) -> Vector<T> {
57        self.values
58    }
59
60    /// Obtain a new subscriber.
61    ///
62    /// If you put the `ObservableVector` behind a lock, it is highly
63    /// recommended to make access of the elements and subscribing one
64    /// operation. Otherwise, the values could be altered in between the
65    /// reading of the values and subscribing to changes.
66    pub fn subscribe(&self) -> VectorSubscriber<T> {
67        let rx = self.sender.subscribe();
68        VectorSubscriber::new(self.values.clone(), rx)
69    }
70
71    /// Append the given elements at the end of the `Vector` and notify
72    /// subscribers.
73    pub fn append(&mut self, values: Vector<T>) {
74        #[cfg(feature = "tracing")]
75        tracing::debug!(target: "eyeball_im::vector::update", "append(len = {})", values.len());
76
77        self.values.append(values.clone());
78        self.broadcast_diff(VectorDiff::Append { values });
79    }
80
81    /// Clear out all of the elements in this `Vector` and notify subscribers.
82    pub fn clear(&mut self) {
83        let already_empty = self.values.is_empty();
84
85        #[cfg(feature = "tracing")]
86        tracing::debug!(
87            target: "eyeball_im::vector::update",
88            nop = already_empty.then_some(true),
89            "clear"
90        );
91
92        if !already_empty {
93            self.values.clear();
94            self.broadcast_diff(VectorDiff::Clear);
95        }
96    }
97
98    /// Add an element at the front of the list and notify subscribers.
99    pub fn push_front(&mut self, value: T) {
100        #[cfg(feature = "tracing")]
101        tracing::debug!(target: "eyeball_im::vector::update", "push_front");
102
103        self.values.push_front(value.clone());
104        self.broadcast_diff(VectorDiff::PushFront { value });
105    }
106
107    /// Add an element at the back of the list and notify subscribers.
108    pub fn push_back(&mut self, value: T) {
109        #[cfg(feature = "tracing")]
110        tracing::debug!(target: "eyeball_im::vector::update", "push_back");
111
112        self.values.push_back(value.clone());
113        self.broadcast_diff(VectorDiff::PushBack { value });
114    }
115
116    /// Remove the first element, notify subscribers and return the element.
117    ///
118    /// If there are no elements, subscribers will not be notified and this
119    /// method will return `None`.
120    pub fn pop_front(&mut self) -> Option<T> {
121        let value = self.values.pop_front();
122        if value.is_some() {
123            #[cfg(feature = "tracing")]
124            tracing::debug!(target: "eyeball_im::vector::update", "pop_front");
125
126            self.broadcast_diff(VectorDiff::PopFront);
127        }
128        value
129    }
130
131    /// Remove the last element, notify subscribers and return the element.
132    ///
133    /// If there are no elements, subscribers will not be notified and this
134    /// method will return `None`.
135    pub fn pop_back(&mut self) -> Option<T> {
136        let value = self.values.pop_back();
137        if value.is_some() {
138            #[cfg(feature = "tracing")]
139            tracing::debug!(target: "eyeball_im::vector::update", "pop_back");
140
141            self.broadcast_diff(VectorDiff::PopBack);
142        }
143        value
144    }
145
146    /// Insert an element at the given position and notify subscribers.
147    ///
148    /// # Panics
149    ///
150    /// Panics if `index > len`.
151    #[track_caller]
152    pub fn insert(&mut self, index: usize, value: T) {
153        let len = self.values.len();
154        if index <= len {
155            #[cfg(feature = "tracing")]
156            tracing::debug!(target: "eyeball_im::vector::update", "insert(index = {index})");
157
158            self.values.insert(index, value.clone());
159            self.broadcast_diff(VectorDiff::Insert { index, value });
160        } else {
161            panic!("index out of bounds: the length is {len} but the index is {index}");
162        }
163    }
164
165    /// Replace the element at the given position, notify subscribers and return
166    /// the previous element at that position.
167    ///
168    /// # Panics
169    ///
170    /// Panics if `index >= len`.
171    #[track_caller]
172    pub fn set(&mut self, index: usize, value: T) -> T {
173        let len = self.values.len();
174        if index < len {
175            #[cfg(feature = "tracing")]
176            tracing::debug!(target: "eyeball_im::vector::update", "set(index = {index})");
177
178            let old_value = self.values.set(index, value.clone());
179            self.broadcast_diff(VectorDiff::Set { index, value });
180            old_value
181        } else {
182            panic!("index out of bounds: the length is {len} but the index is {index}");
183        }
184    }
185
186    /// Remove the element at the given position, notify subscribers and return
187    /// the element.
188    ///
189    /// # Panics
190    ///
191    /// Panics if `index >= len`.
192    #[track_caller]
193    pub fn remove(&mut self, index: usize) -> T {
194        let len = self.values.len();
195        if index < len {
196            #[cfg(feature = "tracing")]
197            tracing::debug!(target: "eyeball_im::vector::update", "remove(index = {index})");
198
199            let value = self.values.remove(index);
200            self.broadcast_diff(VectorDiff::Remove { index });
201            value
202        } else {
203            panic!("index out of bounds: the length is {len} but the index is {index}");
204        }
205    }
206
207    /// Truncate the vector to `len` elements and notify subscribers.
208    ///
209    /// Does nothing if `len` is greater or equal to the vector's current
210    /// length.
211    pub fn truncate(&mut self, len: usize) {
212        if len < self.len() {
213            #[cfg(feature = "tracing")]
214            tracing::debug!(target: "eyeball_im::vector::update", "truncate(len = {len})");
215
216            self.values.truncate(len);
217            self.broadcast_diff(VectorDiff::Truncate { length: len });
218        }
219    }
220
221    /// Gets an entry for the given index, through which only the element at
222    /// that index alone can be updated or removed.
223    ///
224    /// # Panics
225    ///
226    /// Panics if `index >= len`.
227    #[track_caller]
228    pub fn entry(&mut self, index: usize) -> ObservableVectorEntry<'_, T> {
229        let len = self.values.len();
230        if index < len {
231            ObservableVectorEntry::new(self, index)
232        } else {
233            panic!("index out of bounds: the length is {len} but the index is {index}");
234        }
235    }
236
237    /// Call the given closure for every element in this `ObservableVector`,
238    /// with an entry struct that allows updating or removing that element.
239    ///
240    /// Iteration happens in order, i.e. starting at index `0`.
241    pub fn for_each(&mut self, mut f: impl FnMut(ObservableVectorEntry<'_, T>)) {
242        let mut entries = self.entries();
243        while let Some(entry) = entries.next() {
244            f(entry);
245        }
246    }
247
    /// Get an iterator over all the entries in this `ObservableVector`.
    ///
    /// This is a more flexible, but less convenient alternative to
    /// [`for_each`][Self::for_each]. If you don't need to use special control
    /// flow like `.await` or `break` when iterating, it's recommended to use
    /// that method instead.
    ///
    /// Because `std`'s `Iterator` trait does not allow iterator items to borrow
    /// from the iterator itself, the returned type does not implement the
    /// `Iterator` trait and thus cannot be used with a `for` loop. Instead,
    /// you have to call its `.next()` method directly, as in:
    ///
    /// ```rust
    /// # use eyeball_im::ObservableVector;
    /// # let mut ob = ObservableVector::<u8>::new();
    /// let mut entries = ob.entries();
    /// while let Some(entry) = entries.next() {
    ///     // use entry
    /// }
    /// ```
    pub fn entries(&mut self) -> ObservableVectorEntries<'_, T> {
        ObservableVectorEntries::new(self)
    }
271
    /// Start a new transaction to make multiple updates as one unit.
    ///
    /// See [`ObservableVectorTransaction`]'s documentation for more details.
    pub fn transaction(&mut self) -> ObservableVectorTransaction<'_, T> {
        ObservableVectorTransaction::new(self)
    }
278
279    fn broadcast_diff(&self, diff: VectorDiff<T>) {
280        if self.sender.receiver_count() != 0 {
281            let msg =
282                BroadcastMessage { diffs: OneOrManyDiffs::One(diff), state: self.values.clone() };
283            let _num_receivers = self.sender.send(msg).unwrap_or(0);
284            #[cfg(feature = "tracing")]
285            tracing::debug!(
286                target: "eyeball_im::vector::broadcast",
287                "New observable value broadcast to {_num_receivers} receivers"
288            );
289        }
290    }
291}
292
293impl<T: Clone + 'static> Default for ObservableVector<T> {
294    fn default() -> Self {
295        Self::new()
296    }
297}
298
299impl<T> fmt::Debug for ObservableVector<T>
300where
301    T: fmt::Debug,
302{
303    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
304        f.debug_struct("ObservableVector").field("values", &self.values).finish_non_exhaustive()
305    }
306}
307
308// Note: No DerefMut because all mutating must go through inherent methods that
309// notify subscribers
310impl<T> ops::Deref for ObservableVector<T> {
311    type Target = Vector<T>;
312
313    fn deref(&self) -> &Self::Target {
314        &self.values
315    }
316}
317
318impl<T: Clone + 'static> From<Vector<T>> for ObservableVector<T> {
319    fn from(values: Vector<T>) -> Self {
320        let mut this = Self::new();
321        this.append(values);
322        this
323    }
324}
325
/// The message sent over the broadcast channel for every update.
#[derive(Clone)]
struct BroadcastMessage<T> {
    /// The change(s) this message describes.
    diffs: OneOrManyDiffs<T>,
    /// The elements of the vector; `broadcast_diff` fills this with a clone
    /// of the values taken when the message is built.
    state: Vector<T>,
}
331
/// Either a single [`VectorDiff`] or a batch of them.
#[derive(Clone)]
enum OneOrManyDiffs<T> {
    /// Exactly one diff; this is what `broadcast_diff` sends.
    One(VectorDiff<T>),
    /// Several diffs sent together.
    // NOTE(review): presumably produced by transactions — confirm in the
    // `transaction` module.
    Many(Vec<VectorDiff<T>>),
}
337
338impl<T> OneOrManyDiffs<T> {
339    fn into_vec(self) -> Vec<VectorDiff<T>> {
340        match self {
341            OneOrManyDiffs::One(diff) => vec![diff],
342            OneOrManyDiffs::Many(diffs) => diffs,
343        }
344    }
345}
346
/// A change to an [`ObservableVector`].
///
/// Each variant describes one kind of update; see [`VectorDiff::apply`] for
/// how a diff is replayed on a `Vector`.
// NOTE: the variant order is mirrored by the hard-coded variant indices in the
// `serde::Serialize` impl; keep both in sync when adding or moving variants.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum VectorDiff<T> {
    /// Multiple elements were appended.
    Append {
        /// The appended elements.
        values: Vector<T>,
    },
    /// The vector was cleared.
    Clear,
    /// An element was added at the front.
    PushFront {
        /// The new element.
        value: T,
    },
    /// An element was added at the back.
    PushBack {
        /// The new element.
        value: T,
    },
    /// The element at the front was removed.
    PopFront,
    /// The element at the back was removed.
    PopBack,
    /// An element was inserted at the given position.
    Insert {
        /// The index of the new element.
        ///
        /// The element that was previously at that index as well as all the
        /// ones after it were shifted to the right.
        index: usize,
        /// The new element.
        value: T,
    },
    /// A replacement of the previous value at the given position.
    Set {
        /// The index of the element that was replaced.
        index: usize,
        /// The new element.
        value: T,
    },
    /// Removal of an element.
    Remove {
        /// The index that the removed element had.
        index: usize,
    },
    /// Truncation of the vector.
    Truncate {
        /// The number of elements that remain.
        length: usize,
    },
    /// The subscriber lagged too far behind, and the next update that should
    /// have been received has already been discarded from the internal buffer.
    Reset {
        /// The full list of elements.
        values: Vector<T>,
    },
}
405
406impl<T: Clone> VectorDiff<T> {
407    /// Transform `VectorDiff<T>` into `VectorDiff<U>` by applying the given
408    /// function to any contained items.
409    pub fn map<U: Clone>(self, mut f: impl FnMut(T) -> U) -> VectorDiff<U> {
410        match self {
411            VectorDiff::Append { values } => VectorDiff::Append { values: vector_map(values, f) },
412            VectorDiff::Clear => VectorDiff::Clear,
413            VectorDiff::PushFront { value } => VectorDiff::PushFront { value: f(value) },
414            VectorDiff::PushBack { value } => VectorDiff::PushBack { value: f(value) },
415            VectorDiff::PopFront => VectorDiff::PopFront,
416            VectorDiff::PopBack => VectorDiff::PopBack,
417            VectorDiff::Insert { index, value } => VectorDiff::Insert { index, value: f(value) },
418            VectorDiff::Set { index, value } => VectorDiff::Set { index, value: f(value) },
419            VectorDiff::Remove { index } => VectorDiff::Remove { index },
420            VectorDiff::Truncate { length } => VectorDiff::Truncate { length },
421            VectorDiff::Reset { values } => VectorDiff::Reset { values: vector_map(values, f) },
422        }
423    }
424
425    /// Applies this [`VectorDiff`] to a vector.
426    ///
427    /// This is useful to keep two vectors in sync, with potentially one
428    /// containing data [`map`](Self::map)ped from the other.
429    ///
430    /// # Panics
431    ///
432    /// When inserting/setting/removing elements past the end.
433    pub fn apply(self, vec: &mut Vector<T>) {
434        match self {
435            VectorDiff::Append { values } => {
436                vec.append(values);
437            }
438            VectorDiff::Clear => {
439                vec.clear();
440            }
441            VectorDiff::PushFront { value } => {
442                vec.push_front(value);
443            }
444            VectorDiff::PushBack { value } => {
445                vec.push_back(value);
446            }
447            VectorDiff::PopFront => {
448                vec.pop_front();
449            }
450            VectorDiff::PopBack => {
451                vec.pop_back();
452            }
453            VectorDiff::Insert { index, value } => {
454                vec.insert(index, value);
455            }
456            VectorDiff::Set { index, value } => {
457                vec.set(index, value);
458            }
459            VectorDiff::Remove { index } => {
460                vec.remove(index);
461            }
462            VectorDiff::Truncate { length } => {
463                vec.truncate(length);
464            }
465            VectorDiff::Reset { values } => {
466                *vec = values;
467            }
468        }
469    }
470}
471
#[cfg(feature = "serde")]
impl<T> serde::Serialize for VectorDiff<T>
where
    T: serde::Serialize + Clone,
{
    /// Serialize the diff as a struct variant of an enum named `VectorDiff`.
    ///
    /// Every variant, including the ones without fields, is written with
    /// `serialize_struct_variant`; the variant index follows the declaration
    /// order of the enum.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        use serde::ser::SerializeStructVariant;

        const SELF_NAME: &str = "VectorDiff";

        match self {
            Self::Append { values } => {
                let mut out = serializer.serialize_struct_variant(SELF_NAME, 0, "Append", 1)?;
                out.serialize_field("values", values)?;
                out.end()
            }
            Self::Clear => serializer.serialize_struct_variant(SELF_NAME, 1, "Clear", 0)?.end(),
            Self::PushFront { value } => {
                let mut out = serializer.serialize_struct_variant(SELF_NAME, 2, "PushFront", 1)?;
                out.serialize_field("value", value)?;
                out.end()
            }
            Self::PushBack { value } => {
                let mut out = serializer.serialize_struct_variant(SELF_NAME, 3, "PushBack", 1)?;
                out.serialize_field("value", value)?;
                out.end()
            }
            Self::PopFront => {
                serializer.serialize_struct_variant(SELF_NAME, 4, "PopFront", 0)?.end()
            }
            Self::PopBack => {
                serializer.serialize_struct_variant(SELF_NAME, 5, "PopBack", 0)?.end()
            }
            Self::Insert { index, value } => {
                let mut out = serializer.serialize_struct_variant(SELF_NAME, 6, "Insert", 2)?;
                out.serialize_field("index", index)?;
                out.serialize_field("value", value)?;
                out.end()
            }
            Self::Set { index, value } => {
                let mut out = serializer.serialize_struct_variant(SELF_NAME, 7, "Set", 2)?;
                out.serialize_field("index", index)?;
                out.serialize_field("value", value)?;
                out.end()
            }
            Self::Remove { index } => {
                let mut out = serializer.serialize_struct_variant(SELF_NAME, 8, "Remove", 1)?;
                out.serialize_field("index", index)?;
                out.end()
            }
            Self::Truncate { length } => {
                let mut out = serializer.serialize_struct_variant(SELF_NAME, 9, "Truncate", 1)?;
                out.serialize_field("length", length)?;
                out.end()
            }
            Self::Reset { values } => {
                let mut out = serializer.serialize_struct_variant(SELF_NAME, 10, "Reset", 1)?;
                out.serialize_field("values", values)?;
                out.end()
            }
        }
    }
}
541
542fn vector_map<T: Clone, U: Clone>(v: Vector<T>, f: impl FnMut(T) -> U) -> Vector<U> {
543    v.into_iter().map(f).collect()
544}