1use std::{fmt, ops};
2
3use imbl::Vector;
4use tokio::sync::broadcast::{self, Sender};
5
6mod entry;
7mod subscriber;
8mod transaction;
9
10pub use self::{
11 entry::{ObservableVectorEntries, ObservableVectorEntry},
12 subscriber::{VectorSubscriber, VectorSubscriberBatchedStream, VectorSubscriberStream},
13 transaction::{
14 ObservableVectorTransaction, ObservableVectorTransactionEntries,
15 ObservableVectorTransactionEntry,
16 },
17};
18
19pub struct ObservableVector<T> {
21 values: Vector<T>,
22 sender: Sender<BroadcastMessage<T>>,
23}
24
25impl<T: Clone + 'static> ObservableVector<T> {
26 pub fn new() -> Self {
35 Self::with_capacity(16)
36 }
37
38 pub fn with_capacity(capacity: usize) -> Self {
51 let (sender, _) = broadcast::channel(capacity);
52 Self { values: Vector::new(), sender }
53 }
54
55 pub fn into_inner(self) -> Vector<T> {
57 self.values
58 }
59
60 pub fn subscribe(&self) -> VectorSubscriber<T> {
67 let rx = self.sender.subscribe();
68 VectorSubscriber::new(self.values.clone(), rx)
69 }
70
71 pub fn append(&mut self, values: Vector<T>) {
74 #[cfg(feature = "tracing")]
75 tracing::debug!(target: "eyeball_im::vector::update", "append(len = {})", values.len());
76
77 self.values.append(values.clone());
78 self.broadcast_diff(VectorDiff::Append { values });
79 }
80
81 pub fn clear(&mut self) {
83 let already_empty = self.values.is_empty();
84
85 #[cfg(feature = "tracing")]
86 tracing::debug!(
87 target: "eyeball_im::vector::update",
88 nop = already_empty.then_some(true),
89 "clear"
90 );
91
92 if !already_empty {
93 self.values.clear();
94 self.broadcast_diff(VectorDiff::Clear);
95 }
96 }
97
98 pub fn push_front(&mut self, value: T) {
100 #[cfg(feature = "tracing")]
101 tracing::debug!(target: "eyeball_im::vector::update", "push_front");
102
103 self.values.push_front(value.clone());
104 self.broadcast_diff(VectorDiff::PushFront { value });
105 }
106
107 pub fn push_back(&mut self, value: T) {
109 #[cfg(feature = "tracing")]
110 tracing::debug!(target: "eyeball_im::vector::update", "push_back");
111
112 self.values.push_back(value.clone());
113 self.broadcast_diff(VectorDiff::PushBack { value });
114 }
115
116 pub fn pop_front(&mut self) -> Option<T> {
121 let value = self.values.pop_front();
122 if value.is_some() {
123 #[cfg(feature = "tracing")]
124 tracing::debug!(target: "eyeball_im::vector::update", "pop_front");
125
126 self.broadcast_diff(VectorDiff::PopFront);
127 }
128 value
129 }
130
131 pub fn pop_back(&mut self) -> Option<T> {
136 let value = self.values.pop_back();
137 if value.is_some() {
138 #[cfg(feature = "tracing")]
139 tracing::debug!(target: "eyeball_im::vector::update", "pop_back");
140
141 self.broadcast_diff(VectorDiff::PopBack);
142 }
143 value
144 }
145
146 #[track_caller]
152 pub fn insert(&mut self, index: usize, value: T) {
153 let len = self.values.len();
154 if index <= len {
155 #[cfg(feature = "tracing")]
156 tracing::debug!(target: "eyeball_im::vector::update", "insert(index = {index})");
157
158 self.values.insert(index, value.clone());
159 self.broadcast_diff(VectorDiff::Insert { index, value });
160 } else {
161 panic!("index out of bounds: the length is {len} but the index is {index}");
162 }
163 }
164
165 #[track_caller]
172 pub fn set(&mut self, index: usize, value: T) -> T {
173 let len = self.values.len();
174 if index < len {
175 #[cfg(feature = "tracing")]
176 tracing::debug!(target: "eyeball_im::vector::update", "set(index = {index})");
177
178 let old_value = self.values.set(index, value.clone());
179 self.broadcast_diff(VectorDiff::Set { index, value });
180 old_value
181 } else {
182 panic!("index out of bounds: the length is {len} but the index is {index}");
183 }
184 }
185
186 #[track_caller]
193 pub fn remove(&mut self, index: usize) -> T {
194 let len = self.values.len();
195 if index < len {
196 #[cfg(feature = "tracing")]
197 tracing::debug!(target: "eyeball_im::vector::update", "remove(index = {index})");
198
199 let value = self.values.remove(index);
200 self.broadcast_diff(VectorDiff::Remove { index });
201 value
202 } else {
203 panic!("index out of bounds: the length is {len} but the index is {index}");
204 }
205 }
206
207 pub fn truncate(&mut self, len: usize) {
212 if len < self.len() {
213 #[cfg(feature = "tracing")]
214 tracing::debug!(target: "eyeball_im::vector::update", "truncate(len = {len})");
215
216 self.values.truncate(len);
217 self.broadcast_diff(VectorDiff::Truncate { length: len });
218 }
219 }
220
221 #[track_caller]
228 pub fn entry(&mut self, index: usize) -> ObservableVectorEntry<'_, T> {
229 let len = self.values.len();
230 if index < len {
231 ObservableVectorEntry::new(self, index)
232 } else {
233 panic!("index out of bounds: the length is {len} but the index is {index}");
234 }
235 }
236
237 pub fn for_each(&mut self, mut f: impl FnMut(ObservableVectorEntry<'_, T>)) {
242 let mut entries = self.entries();
243 while let Some(entry) = entries.next() {
244 f(entry);
245 }
246 }
247
248 pub fn entries(&mut self) -> ObservableVectorEntries<'_, T> {
269 ObservableVectorEntries::new(self)
270 }
271
272 pub fn transaction(&mut self) -> ObservableVectorTransaction<'_, T> {
276 ObservableVectorTransaction::new(self)
277 }
278
279 fn broadcast_diff(&self, diff: VectorDiff<T>) {
280 if self.sender.receiver_count() != 0 {
281 let msg =
282 BroadcastMessage { diffs: OneOrManyDiffs::One(diff), state: self.values.clone() };
283 let _num_receivers = self.sender.send(msg).unwrap_or(0);
284 #[cfg(feature = "tracing")]
285 tracing::debug!(
286 target: "eyeball_im::vector::broadcast",
287 "New observable value broadcast to {_num_receivers} receivers"
288 );
289 }
290 }
291}
292
293impl<T: Clone + 'static> Default for ObservableVector<T> {
294 fn default() -> Self {
295 Self::new()
296 }
297}
298
299impl<T> fmt::Debug for ObservableVector<T>
300where
301 T: fmt::Debug,
302{
303 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
304 f.debug_struct("ObservableVector").field("values", &self.values).finish_non_exhaustive()
305 }
306}
307
308impl<T> ops::Deref for ObservableVector<T> {
311 type Target = Vector<T>;
312
313 fn deref(&self) -> &Self::Target {
314 &self.values
315 }
316}
317
318impl<T: Clone + 'static> From<Vector<T>> for ObservableVector<T> {
319 fn from(values: Vector<T>) -> Self {
320 let mut this = Self::new();
321 this.append(values);
322 this
323 }
324}
325
326#[derive(Clone)]
327struct BroadcastMessage<T> {
328 diffs: OneOrManyDiffs<T>,
329 state: Vector<T>,
330}
331
332#[derive(Clone)]
333enum OneOrManyDiffs<T> {
334 One(VectorDiff<T>),
335 Many(Vec<VectorDiff<T>>),
336}
337
338impl<T> OneOrManyDiffs<T> {
339 fn into_vec(self) -> Vec<VectorDiff<T>> {
340 match self {
341 OneOrManyDiffs::One(diff) => vec![diff],
342 OneOrManyDiffs::Many(diffs) => diffs,
343 }
344 }
345}
346
347#[derive(Clone, Debug, PartialEq, Eq)]
349pub enum VectorDiff<T> {
350 Append {
352 values: Vector<T>,
354 },
355 Clear,
357 PushFront {
359 value: T,
361 },
362 PushBack {
364 value: T,
366 },
367 PopFront,
369 PopBack,
371 Insert {
373 index: usize,
378 value: T,
380 },
381 Set {
383 index: usize,
385 value: T,
387 },
388 Remove {
390 index: usize,
392 },
393 Truncate {
395 length: usize,
397 },
398 Reset {
401 values: Vector<T>,
403 },
404}
405
406impl<T: Clone> VectorDiff<T> {
407 pub fn map<U: Clone>(self, mut f: impl FnMut(T) -> U) -> VectorDiff<U> {
410 match self {
411 VectorDiff::Append { values } => VectorDiff::Append { values: vector_map(values, f) },
412 VectorDiff::Clear => VectorDiff::Clear,
413 VectorDiff::PushFront { value } => VectorDiff::PushFront { value: f(value) },
414 VectorDiff::PushBack { value } => VectorDiff::PushBack { value: f(value) },
415 VectorDiff::PopFront => VectorDiff::PopFront,
416 VectorDiff::PopBack => VectorDiff::PopBack,
417 VectorDiff::Insert { index, value } => VectorDiff::Insert { index, value: f(value) },
418 VectorDiff::Set { index, value } => VectorDiff::Set { index, value: f(value) },
419 VectorDiff::Remove { index } => VectorDiff::Remove { index },
420 VectorDiff::Truncate { length } => VectorDiff::Truncate { length },
421 VectorDiff::Reset { values } => VectorDiff::Reset { values: vector_map(values, f) },
422 }
423 }
424
425 pub fn apply(self, vec: &mut Vector<T>) {
434 match self {
435 VectorDiff::Append { values } => {
436 vec.append(values);
437 }
438 VectorDiff::Clear => {
439 vec.clear();
440 }
441 VectorDiff::PushFront { value } => {
442 vec.push_front(value);
443 }
444 VectorDiff::PushBack { value } => {
445 vec.push_back(value);
446 }
447 VectorDiff::PopFront => {
448 vec.pop_front();
449 }
450 VectorDiff::PopBack => {
451 vec.pop_back();
452 }
453 VectorDiff::Insert { index, value } => {
454 vec.insert(index, value);
455 }
456 VectorDiff::Set { index, value } => {
457 vec.set(index, value);
458 }
459 VectorDiff::Remove { index } => {
460 vec.remove(index);
461 }
462 VectorDiff::Truncate { length } => {
463 vec.truncate(length);
464 }
465 VectorDiff::Reset { values } => {
466 *vec = values;
467 }
468 }
469 }
470}
471
472#[cfg(feature = "serde")]
473impl<T> serde::Serialize for VectorDiff<T>
474where
475 T: serde::Serialize + Clone,
476{
477 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
478 where
479 S: serde::Serializer,
480 {
481 use serde::ser::SerializeStructVariant;
482
483 const SELF_NAME: &str = "VectorDiff";
484
485 match self {
486 Self::Append { values } => {
487 let mut state = serializer.serialize_struct_variant(SELF_NAME, 0, "Append", 1)?;
488 state.serialize_field("values", values)?;
489 state.end()
490 }
491 VectorDiff::Clear => {
492 serializer.serialize_struct_variant(SELF_NAME, 1, "Clear", 0)?.end()
493 }
494 VectorDiff::PushFront { value } => {
495 let mut state =
496 serializer.serialize_struct_variant(SELF_NAME, 2, "PushFront", 1)?;
497 state.serialize_field("value", value)?;
498 state.end()
499 }
500 VectorDiff::PushBack { value } => {
501 let mut state = serializer.serialize_struct_variant(SELF_NAME, 3, "PushBack", 1)?;
502 state.serialize_field("value", value)?;
503 state.end()
504 }
505 VectorDiff::PopFront => {
506 serializer.serialize_struct_variant(SELF_NAME, 4, "PopFront", 0)?.end()
507 }
508 VectorDiff::PopBack => {
509 serializer.serialize_struct_variant(SELF_NAME, 5, "PopBack", 0)?.end()
510 }
511 VectorDiff::Insert { index, value } => {
512 let mut state = serializer.serialize_struct_variant(SELF_NAME, 6, "Insert", 2)?;
513 state.serialize_field("index", index)?;
514 state.serialize_field("value", value)?;
515 state.end()
516 }
517 VectorDiff::Set { index, value } => {
518 let mut state = serializer.serialize_struct_variant(SELF_NAME, 7, "Set", 2)?;
519 state.serialize_field("index", index)?;
520 state.serialize_field("value", value)?;
521 state.end()
522 }
523 VectorDiff::Remove { index } => {
524 let mut state = serializer.serialize_struct_variant(SELF_NAME, 8, "Remove", 1)?;
525 state.serialize_field("index", index)?;
526 state.end()
527 }
528 VectorDiff::Truncate { length } => {
529 let mut state = serializer.serialize_struct_variant(SELF_NAME, 9, "Truncate", 1)?;
530 state.serialize_field("length", length)?;
531 state.end()
532 }
533 VectorDiff::Reset { values } => {
534 let mut state = serializer.serialize_struct_variant(SELF_NAME, 10, "Reset", 1)?;
535 state.serialize_field("values", values)?;
536 state.end()
537 }
538 }
539 }
540}
541
542fn vector_map<T: Clone, U: Clone>(v: Vector<T>, f: impl FnMut(T) -> U) -> Vector<U> {
543 v.into_iter().map(f).collect()
544}