1use crate::indexing::{encode_tuple_values_with_key_spec, KeySpec};
2use crate::{entity::Entity, model::View, reactor::AbstractEntity};
3use ankurah_proto as proto;
4use ankurah_signals::{
5 broadcast::{Broadcast, BroadcastId},
6 signal::{Listener, ListenerGuard},
7 subscribe::IntoSubscribeListener,
8 CurrentObserver, Get, Peek, Signal, Subscribe, SubscriptionGuard,
9};
10use std::{
11 collections::HashMap,
12 ops::Deref,
13 sync::{
14 atomic::{AtomicBool, Ordering},
15 Arc,
16 },
17};
18
/// Byte-string sort key with inline storage for short keys.
///
/// Keys of at most 16 bytes are stored in `Small`, zero-padded to the full
/// array (the original length is not recorded). Longer keys are stored in
/// `Large`.
///
/// Ordering is bytewise across both variants. It is written by hand because a
/// derived `Ord` compares the variant first, which would place every `Small`
/// key before every `Large` key no matter what bytes they contain.
#[derive(Debug, Clone, PartialEq, Eq)]
enum IVec {
    Small([u8; 16]),
    Large(Vec<u8>),
}

/// Compares an inline key against a heap key.
///
/// Zero is the smallest byte, so comparing the padded array with the leading
/// bytes of the heap key is a faithful prefix comparison. On a tie the inline
/// key sorts first, which also keeps the result consistent with `Eq` (a
/// `Small` is never equal to a `Large`).
fn cmp_inline_to_heap(inline: &[u8; 16], heap: &[u8]) -> std::cmp::Ordering {
    let shared = heap.len().min(inline.len());
    inline[..shared].cmp(&heap[..shared]).then(std::cmp::Ordering::Less)
}

impl Ord for IVec {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        match (self, other) {
            (Self::Small(a), Self::Small(b)) => a.cmp(b),
            (Self::Large(a), Self::Large(b)) => a.cmp(b),
            (Self::Small(a), Self::Large(b)) => cmp_inline_to_heap(a, b),
            (Self::Large(a), Self::Small(b)) => cmp_inline_to_heap(b, a).reverse(),
        }
    }
}

impl PartialOrd for IVec {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { Some(self.cmp(other)) }
}
27
28impl IVec {
29 fn from_slice(bytes: &[u8]) -> Self {
31 if bytes.len() <= 16 {
32 let mut data = [0u8; 16];
33 data[..bytes.len()].copy_from_slice(bytes);
34 Self::Small(data)
35 } else {
36 Self::Large(bytes.to_vec())
37 }
38 }
39}
40
41impl From<Vec<u8>> for IVec {
42 fn from(vec: Vec<u8>) -> Self { Self::from_slice(&vec) }
43}
44
/// Shared handle to an ordered collection of entities.
///
/// The handle wraps an `Arc`, so cloning it yields another handle to the same
/// underlying state and broadcast.
#[derive(Debug, Clone)]
pub struct EntityResultSet<E: AbstractEntity = Entity>(Arc<Inner<E>>);
47
/// Typed facade over an `EntityResultSet<Entity>` whose items are surfaced as views of type `R`.
///
/// The `PhantomData<R>` only records the view type; no `R` value is stored.
#[derive(Debug)]
pub struct ResultSet<R: View>(EntityResultSet<Entity>, std::marker::PhantomData<R>);
51
52impl<R: View> Deref for ResultSet<R> {
53 type Target = EntityResultSet<Entity>;
54 fn deref(&self) -> &Self::Target { &self.0 }
55}
56
57impl<R: View> ResultSet<R> {
58 pub fn by_id(&self, id: &proto::EntityId) -> Option<R> { self.0.by_id(id).map(|e| R::from_entity(e)) }
59}
60
/// State shared by every clone of an `EntityResultSet`.
#[derive(Debug)]
struct Inner<E: AbstractEntity> {
    /// Mutex-protected contents: entries, id index, ordering and limit settings.
    state: std::sync::Mutex<State<E>>,
    /// Flag written by `set_loaded` and read by `is_loaded`.
    loaded: AtomicBool,
    /// Notification channel; a `()` is sent after mutations such as a changed
    /// write guard being dropped, `clear`, `set_loaded` and `order_by`.
    broadcast: Broadcast<()>,
}
68
/// The lock-protected contents of a result set.
#[derive(Debug)]
struct State<E: AbstractEntity> {
    /// Entries in result order.
    order: Vec<EntityEntry<E>>,
    /// Maps each entity id to its position in `order`.
    index: HashMap<proto::EntityId, usize>,
    /// Specification used to compute sort keys; with `None`, inserted entries
    /// carry no sort key and are positioned by entity id.
    key_spec: Option<KeySpec>,
    /// Maximum number of entries to keep, if any.
    limit: Option<usize>,
    /// Set when an entry is removed while the set holds exactly `limit`
    /// entries; reset by `clear_gap_dirty`.
    gap_dirty: bool,
}
78
/// One slot of the result set: an entity plus its ordering bookkeeping.
#[derive(Debug, Clone)]
struct EntityEntry<E: AbstractEntity> {
    /// The stored entity.
    entity: E,
    /// Encoded sort key; `None` when no key spec was in effect when the key was last computed.
    sort_key: Option<IVec>,
    /// Marks the entry for re-evaluation by `retain_dirty`.
    dirty: bool,
}
/// Write guard over an `EntityResultSet`.
///
/// Holds the state lock while alive. When dropped after a recorded change it
/// releases the lock and sends one notification on the set's broadcast.
pub struct ResultSetWrite<'a, E: AbstractEntity = Entity> {
    /// The set being modified; its broadcast is notified on drop when `changed` is set.
    resultset: &'a EntityResultSet<E>,
    /// Whether any mutation was recorded through this guard.
    changed: bool,
    /// The held state lock; wrapped in `Option` so `Drop` can release it before broadcasting.
    guard: Option<std::sync::MutexGuard<'a, State<E>>>,
}
95
/// Read guard over an `EntityResultSet`; holds the state lock for as long as it is alive.
pub struct ResultSetRead<'a, E: AbstractEntity = Entity> {
    /// The held state lock.
    guard: std::sync::MutexGuard<'a, State<E>>,
}
101
102impl<'a, E: AbstractEntity> ResultSetWrite<'a, E> {
104 pub fn add(&mut self, entity: E) -> bool {
106 let guard = self.guard.as_mut().expect("write guard already dropped");
107 let id = *entity.id();
108 if guard.index.contains_key(&id) {
109 return false; }
111
112 let sort_key = guard.key_spec.as_ref().map(|key_spec| Self::compute_sort_key(&entity, key_spec));
114
115 let entry = EntityEntry { entity, sort_key, dirty: false };
116
117 let pos = guard
119 .order
120 .binary_search_by(|existing| {
121 match (&existing.sort_key, &entry.sort_key) {
122 (Some(existing_key), Some(entry_key)) => {
123 existing_key.cmp(entry_key).then_with(|| existing.entity.id().cmp(entry.entity.id()))
125 }
126 (Some(_), None) => std::cmp::Ordering::Less, (None, Some(_)) => std::cmp::Ordering::Greater, (None, None) => existing.entity.id().cmp(entry.entity.id()), }
130 })
131 .unwrap_or_else(|pos| pos);
132
133 guard.order.insert(pos, entry);
134 guard.index.insert(id, pos);
135
136 for i in (pos + 1)..guard.order.len() {
138 let entry_id = *guard.order[i].entity.id();
139 guard.index.insert(entry_id, i);
140 }
141
142 if let Some(limit) = guard.limit {
144 if guard.order.len() > limit {
145 if let Some(removed_entry) = guard.order.pop() {
147 let removed_id = *removed_entry.entity.id();
148 guard.index.remove(&removed_id);
149 }
151 }
152 }
153
154 self.changed = true;
155 true
156 }
157
158 pub fn remove(&mut self, id: proto::EntityId) -> bool {
160 let guard = self.guard.as_mut().expect("write guard already dropped");
161 if let Some(idx) = guard.index.remove(&id) {
162 if guard.limit.is_some_and(|limit| guard.order.len() == limit) {
164 guard.gap_dirty = true;
165 }
166
167 guard.order.remove(idx);
168 if idx < guard.order.len() {
169 fix_from(guard, idx);
170 }
171
172 self.changed = true;
173 true
174 } else {
175 false
176 }
177 }
178
179 pub fn contains(&self, id: &proto::EntityId) -> bool {
181 self.guard.as_ref().expect("write guard already dropped").index.contains_key(id)
182 }
183
184 pub fn iter_entities(&self) -> impl Iterator<Item = (proto::EntityId, &E)> {
187 let guard = self.guard.as_ref().expect("write guard already dropped");
188 guard.order.iter().map(|entry| (*entry.entity.id(), &entry.entity))
189 }
190
191 pub fn mark_all_dirty(&mut self) {
193 let guard = self.guard.as_mut().expect("write guard already dropped");
194 for entry in &mut guard.order {
195 entry.dirty = true;
196 }
197 self.changed = true;
198 }
199
200 pub fn retain_dirty<F>(&mut self, mut should_retain: F) -> Vec<proto::EntityId>
202 where F: FnMut(&E) -> bool {
203 let guard = self.guard.as_mut().expect("write guard already dropped");
204 let mut removed_ids = Vec::new();
205 let mut i = 0;
206
207 let was_at_limit = guard.limit.is_some_and(|limit| guard.order.len() == limit);
209
210 while i < guard.order.len() {
211 if guard.order[i].dirty {
212 let should_keep = should_retain(&guard.order[i].entity);
213 if should_keep {
214 let key_spec = guard.key_spec.clone();
216 if let Some(key_spec) = key_spec {
217 guard.order[i].sort_key = Some(Self::compute_sort_key(&guard.order[i].entity, &key_spec));
218 }
219 guard.order[i].dirty = false;
220 i += 1;
221 } else {
222 let removed_entry = guard.order.remove(i);
224 let removed_id = *removed_entry.entity.id();
225 guard.index.remove(&removed_id);
226 removed_ids.push(removed_id);
227 }
229 } else {
230 i += 1;
231 }
232 }
233
234 guard.index.clear();
236 let index_updates: Vec<_> = guard.order.iter().enumerate().map(|(i, entry)| (*entry.entity.id(), i)).collect();
237 for (id, i) in index_updates {
238 guard.index.insert(id, i);
239 }
240
241 if !removed_ids.is_empty() {
242 self.changed = true;
243
244 if (!guard.gap_dirty) && was_at_limit && guard.limit.is_some_and(|limit| guard.order.len() < limit) {
246 guard.gap_dirty = true;
247 }
248 }
249
250 removed_ids
251 }
252
253 pub fn replace_all(&mut self, entities: Vec<E>) {
255 let guard = self.guard.as_mut().expect("write guard already dropped");
256
257 guard.order.clear();
259 guard.index.clear();
260
261 for entity in entities {
263 let sort_key = guard.key_spec.as_ref().map(|key_spec| Self::compute_sort_key(&entity, key_spec));
265
266 let entry = EntityEntry { entity, sort_key, dirty: false };
267 guard.order.push(entry);
268 }
269
270 if guard.key_spec.is_some() {
272 guard.order.sort_by(|a, b| {
273 match (&a.sort_key, &b.sort_key) {
274 (Some(key_a), Some(key_b)) => {
275 key_a.cmp(key_b).then_with(|| a.entity.id().cmp(b.entity.id()))
277 }
278 (Some(_), None) => std::cmp::Ordering::Less,
279 (None, Some(_)) => std::cmp::Ordering::Greater,
280 (None, None) => a.entity.id().cmp(b.entity.id()),
281 }
282 });
283 } else {
284 guard.order.sort_by(|a, b| a.entity.id().cmp(b.entity.id()));
286 }
287
288 if let Some(limit) = guard.limit {
290 if guard.order.len() > limit {
291 guard.order.truncate(limit);
292 }
293 }
294
295 let index_updates: Vec<_> = guard.order.iter().enumerate().map(|(i, entry)| (*entry.entity.id(), i)).collect();
297 for (id, i) in index_updates {
298 guard.index.insert(id, i);
299 }
300
301 self.changed = true;
302 }
303
304 fn compute_sort_key(entity: &E, key_spec: &KeySpec) -> IVec {
306 let mut values = Vec::new();
307
308 for keypart in &key_spec.keyparts {
310 let value = AbstractEntity::value(entity, &keypart.column);
311 if let Some(v) = value {
314 values.push(v);
315 } else {
316 return IVec::from_slice(&[]); }
319 }
320
321 let encoded = encode_tuple_values_with_key_spec(&values, key_spec).unwrap_or_default();
323 IVec::from(encoded)
324 }
325}
326
327impl<'a, E: AbstractEntity> Drop for ResultSetWrite<'a, E> {
328 fn drop(&mut self) {
329 if self.changed {
330 drop(self.guard.take());
332 self.resultset.0.broadcast.send(());
333 }
334 }
335}
336
337impl<'a, E: AbstractEntity> ResultSetRead<'a, E> {
338 pub fn contains(&self, id: &proto::EntityId) -> bool { self.guard.index.contains_key(id) }
340
341 pub fn iter_entities(&self) -> impl Iterator<Item = (proto::EntityId, &E)> {
344 self.guard.order.iter().map(|entity| (*entity.entity.id(), &entity.entity))
345 }
346
347 pub fn len(&self) -> usize { self.guard.order.len() }
349
350 pub fn is_empty(&self) -> bool { self.guard.order.is_empty() }
352}
353
354impl<E: AbstractEntity> EntityResultSet<E> {
355 pub fn from_vec(entities: Vec<E>, loaded: bool) -> Self {
356 let mut index = HashMap::new();
357 let mut order = Vec::new();
358
359 for (i, entity) in entities.into_iter().enumerate() {
360 index.insert(*entity.id(), i);
361 order.push(EntityEntry { entity, sort_key: None, dirty: false });
362 }
363
364 let state = State { order, index, key_spec: None, limit: None, gap_dirty: false };
365 Self(Arc::new(Inner { state: std::sync::Mutex::new(state), loaded: AtomicBool::new(loaded), broadcast: Broadcast::new() }))
366 }
367 pub fn empty() -> Self {
368 let state = State { order: Vec::new(), index: HashMap::new(), key_spec: None, limit: None, gap_dirty: false };
369 Self(Arc::new(Inner { state: std::sync::Mutex::new(state), loaded: AtomicBool::new(false), broadcast: Broadcast::new() }))
370 }
371 pub fn single(entity: E) -> Self {
372 let entry = EntityEntry { entity: entity.clone(), sort_key: None, dirty: false };
373 let mut state = State { order: vec![entry], index: HashMap::new(), key_spec: None, limit: None, gap_dirty: false };
374 state.index.insert(*entity.id(), 0);
375 Self(Arc::new(Inner { state: std::sync::Mutex::new(state), loaded: AtomicBool::new(false), broadcast: Broadcast::new() }))
376 }
377
378 pub fn write(&self) -> ResultSetWrite<'_, E> {
382 let guard = self.0.state.lock().unwrap();
383 ResultSetWrite { resultset: self, changed: false, guard: Some(guard) }
384 }
385
386 pub fn read(&self) -> ResultSetRead<'_, E> {
388 let guard = self.0.state.lock().unwrap();
389 ResultSetRead { guard }
390 }
391 pub fn set_loaded(&self, loaded: bool) {
392 self.0.loaded.store(loaded, Ordering::Relaxed);
393 self.0.broadcast.send(());
394 }
395 pub fn is_loaded(&self) -> bool {
396 CurrentObserver::track(&self);
397 self.0.loaded.load(Ordering::Relaxed)
398 }
399
400 pub fn clear(&self) {
401 let mut st = self.0.state.lock().unwrap();
402 st.order.clear();
403 st.index.clear();
404 drop(st);
405 self.0.broadcast.send(());
406 }
407
408 pub fn keys(&self) -> EntityResultSetKeyIterator {
410 CurrentObserver::track(&self);
412 let st = self.0.state.lock().unwrap();
413 let keys: Vec<proto::EntityId> = st.order.iter().map(|e| *e.entity.id()).collect();
414 EntityResultSetKeyIterator::new(keys)
415 }
416
417 pub fn contains_key(&self, id: &proto::EntityId) -> bool {
419 CurrentObserver::track(&self);
421 let st = self.0.state.lock().unwrap();
422 st.index.contains_key(id)
423 }
424
425 pub fn by_id(&self, id: &proto::EntityId) -> Option<E> {
426 CurrentObserver::track(self);
428 let st = self.0.state.lock().unwrap();
429 st.index.get(id).map(|&i| st.order[i].entity.clone())
430 }
431
432 pub fn len(&self) -> usize {
433 CurrentObserver::track(&self);
434 let st = self.0.state.lock().unwrap();
435 st.order.len()
436 }
437
438 pub(crate) fn is_gap_dirty(&self) -> bool {
440 let st = self.0.state.lock().unwrap();
441 st.gap_dirty
442 }
443
444 pub(crate) fn clear_gap_dirty(&self) {
446 let mut st = self.0.state.lock().unwrap();
447 st.gap_dirty = false;
448 }
449
450 pub fn get_limit(&self) -> Option<usize> {
452 let st = self.0.state.lock().unwrap();
453 st.limit
454 }
455
456 pub(crate) fn last_entity(&self) -> Option<E> {
458 let st = self.0.state.lock().unwrap();
459 st.order.last().map(|entry| entry.entity.clone())
460 }
461
462 pub(crate) fn order_by(&self, key_spec: Option<KeySpec>) {
464 let mut st = self.0.state.lock().unwrap();
465
466 if st.key_spec == key_spec {
468 return; }
470
471 st.key_spec = key_spec.clone();
472
473 for entry in &mut st.order {
475 entry.sort_key = if let Some(ref ks) = key_spec {
476 Some(ResultSetWrite::compute_sort_key(&entry.entity, ks))
477 } else {
478 None };
480 }
481
482 st.order.sort_by(|a, b| {
484 match (&a.sort_key, &b.sort_key) {
485 (Some(key_a), Some(key_b)) => {
486 match key_a.cmp(key_b) {
488 std::cmp::Ordering::Equal => a.entity.id().cmp(b.entity.id()), other => other,
490 }
491 }
492 (Some(_), None) => std::cmp::Ordering::Greater,
493 (None, Some(_)) => std::cmp::Ordering::Less,
494 (None, None) => a.entity.id().cmp(b.entity.id()),
495 }
496 });
497
498 st.index.clear();
500 let index_updates: Vec<_> = st.order.iter().enumerate().map(|(i, entry)| (*entry.entity.id(), i)).collect();
501 for (id, i) in index_updates {
502 st.index.insert(id, i);
503 }
504
505 drop(st);
506 self.0.broadcast.send(());
507 }
508
509 pub(crate) fn limit(&self, limit: Option<usize>) {
511 let mut st = self.0.state.lock().unwrap();
512
513 if st.limit == limit {
515 return; }
517
518 st.limit = limit;
519
520 let mut entities_removed = false;
522 if let Some(limit) = limit {
523 if st.order.len() > limit {
524 st.order.truncate(limit);
525 entities_removed = true;
526
527 st.index.clear();
529 let index_updates: Vec<_> = st.order.iter().enumerate().map(|(i, entry)| (*entry.entity.id(), i)).collect();
530 for (id, i) in index_updates {
531 st.index.insert(id, i);
532 }
533 }
534 }
535
536 drop(st);
537
538 if entities_removed {
540 self.0.broadcast.send(());
541 }
542 }
543}
544
#[cfg(test)]
mod tests {
    use super::*;
    use crate::indexing::{IndexDirection, IndexKeyPart, KeySpec, NullsOrder};
    use crate::value::{Value, ValueType};
    use ankurah_proto as proto;
    use std::collections::HashMap;

    /// Minimal in-memory `AbstractEntity` used to exercise the result set.
    #[derive(Debug, Clone)]
    struct TestEntity {
        id: proto::EntityId,
        collection: proto::CollectionId,
        properties: HashMap<String, Value>,
    }

    impl TestEntity {
        /// Builds an entity whose id is all zeros except for the last byte, which is `id`.
        fn new(id: u8, properties: HashMap<String, Value>) -> Self {
            let mut raw_id = [0u8; 16];
            raw_id[15] = id;
            let id = proto::EntityId::from_bytes(raw_id);
            let collection = proto::CollectionId::fixed_name("test");
            Self { id, collection, properties }
        }

        /// Builds an entity carrying a single property.
        fn with_property(id: u8, field: &str, value: Value) -> Self {
            Self::new(id, HashMap::from([(field.to_string(), value)]))
        }
    }

    impl AbstractEntity for TestEntity {
        fn collection(&self) -> proto::CollectionId { self.collection.clone() }

        fn id(&self) -> &proto::EntityId { &self.id }

        fn value(&self, field: &str) -> Option<Value> {
            match field {
                "id" => Some(Value::EntityId(self.id.clone())),
                other => self.properties.get(other).cloned(),
            }
        }
    }

    /// Snapshot of the ids currently in `resultset`, in result order.
    fn ordered_ids(resultset: &EntityResultSet<TestEntity>) -> Vec<proto::EntityId> {
        let read_guard = resultset.read();
        let ids: Vec<_> = read_guard.iter_entities().map(|(id, _)| id).collect();
        ids
    }

    #[test]
    fn test_entity_id_ordering() {
        let resultset: EntityResultSet<TestEntity> = EntityResultSet::empty();

        let entity1 = TestEntity::new(1, HashMap::new());
        let entity2 = TestEntity::new(2, HashMap::new());
        let entity3 = TestEntity::new(3, HashMap::new());

        {
            // Insert out of order; without a key spec the set orders by id.
            let mut write = resultset.write();
            for entity in [&entity3, &entity1, &entity2] {
                write.add(entity.clone());
            }
        }

        assert_eq!(ordered_ids(&resultset), vec![entity1.id, entity2.id, entity3.id]);
    }

    #[test]
    fn test_order_by_with_tie_breaking() {
        let resultset: EntityResultSet<TestEntity> = EntityResultSet::empty();

        let entity1 = TestEntity::with_property(1, "name", Value::String("Alice".to_string()));
        let entity2 = TestEntity::with_property(2, "name", Value::String("Alice".to_string()));
        let entity3 = TestEntity::with_property(3, "name", Value::String("Bob".to_string()));

        let name_ascending = IndexKeyPart {
            column: "name".to_string(),
            direction: IndexDirection::Asc,
            nulls: Some(NullsOrder::Last),
            collation: None,
            value_type: ValueType::String,
        };
        resultset.order_by(Some(KeySpec { keyparts: vec![name_ascending] }));

        {
            let mut write = resultset.write();
            for entity in [&entity2, &entity3, &entity1] {
                write.add(entity.clone());
            }
        }

        // Both "Alice" entries precede "Bob"; the tie between them is broken by id.
        assert_eq!(ordered_ids(&resultset), vec![entity1.id, entity2.id, entity3.id]);
    }

    #[test]
    fn test_limit_functionality() {
        let resultset: EntityResultSet<TestEntity> = EntityResultSet::empty();

        {
            let mut write = resultset.write();
            for i in 0..5u8 {
                write.add(TestEntity::with_property(i, "value", Value::I32(i32::from(i))));
            }
        }
        assert_eq!(resultset.len(), 5);

        resultset.limit(Some(3));
        assert_eq!(resultset.len(), 3);

        // Lifting the limit does not bring truncated entries back.
        resultset.limit(None);
        assert_eq!(resultset.len(), 3);
    }

    #[test]
    fn test_dirty_tracking() {
        let resultset: EntityResultSet<TestEntity> = EntityResultSet::empty();

        let entity1 = TestEntity::with_property(1, "active", Value::Bool(true));
        let entity2 = TestEntity::with_property(2, "active", Value::Bool(false));

        let removed = {
            let mut write = resultset.write();
            write.add(entity1.clone());
            write.add(entity2.clone());
            write.mark_all_dirty();
            write.retain_dirty(|entity| entity.value("active") == Some(Value::Bool(true)))
        };

        assert_eq!(removed, vec![entity2.id]);
        assert_eq!(resultset.len(), 1);
        assert_eq!(ordered_ids(&resultset).first().copied(), Some(entity1.id));
    }

    #[test]
    fn test_write_guard_atomic_operations() {
        let resultset: EntityResultSet<TestEntity> = EntityResultSet::empty();

        {
            let mut write = resultset.write();
            write.add(TestEntity::new(1, HashMap::new()));
            write.add(TestEntity::new(2, HashMap::new()));

            // Both additions are visible through the guard itself.
            assert_eq!(write.iter_entities().count(), 2);
        }

        assert_eq!(resultset.len(), 2);
    }

    #[test]
    fn test_ivec_small_keys() {
        let hello = IVec::from_slice(b"hello");
        let world = IVec::from_slice(b"world");
        let empty = IVec::from_slice(b"");
        assert!(hello < world);
        assert!(empty < hello);

        // A strict prefix sorts before the longer key.
        let ab = IVec::from_slice(b"ab");
        let abc = IVec::from_slice(b"abc");
        assert!(ab < abc);
    }

    #[test]
    fn test_ivec_large_keys() {
        let twenty_ones = IVec::from_slice(&[1u8; 20]);
        let ten_ones = IVec::from_slice(&[1u8; 10]);
        assert!(ten_ones < twenty_ones);
    }

    #[test]
    fn test_ivec_boundary() {
        let exactly_16 = IVec::from_slice(&[1u8; 16]);
        let exactly_17 = IVec::from_slice(&[1u8; 17]);

        assert!(exactly_16 < exactly_17);

        if let IVec::Large(_) = exactly_16 {
            panic!("16-byte key should use Small variant");
        }
        if let IVec::Small(_) = exactly_17 {
            panic!("17-byte key should use Large variant");
        }
    }
}
778
779fn fix_from<E: AbstractEntity>(st: &mut State<E>, start: usize) {
780 for i in start..st.order.len() {
782 let id = *st.order[i].entity.id();
783 st.index.insert(id, i);
784 }
785}
786
787impl<E: View> ResultSet<E> {
788 pub fn iter(&self) -> ResultSetIter<E> { ResultSetIter::new(self.clone()) }
789}
790
791impl<E: View> Clone for ResultSet<E> {
792 fn clone(&self) -> Self { Self(self.0.clone(), std::marker::PhantomData) }
793}
794
795impl<E: View> Default for ResultSet<E> {
796 fn default() -> Self {
797 let entity_resultset = EntityResultSet::empty();
798 Self(entity_resultset, std::marker::PhantomData)
799 }
800}
801
802impl<E: AbstractEntity> Signal for EntityResultSet<E> {
803 fn listen(&self, listener: Listener) -> ListenerGuard { ListenerGuard::new(self.0.broadcast.reference().listen(listener)) }
804 fn broadcast_id(&self) -> BroadcastId { self.0.broadcast.id() }
805}
806
807impl<R: View> Signal for ResultSet<R> {
808 fn listen(&self, listener: Listener) -> ListenerGuard { ListenerGuard::new(self.0 .0.broadcast.reference().listen(listener)) }
809
810 fn broadcast_id(&self) -> BroadcastId { self.0 .0.broadcast.id() }
811}
812
813impl<E: View + Clone + 'static> Get<Vec<E>> for ResultSet<E> {
814 fn get(&self) -> Vec<E> {
815 use ankurah_signals::CurrentObserver;
816 CurrentObserver::track(self);
817 self.0 .0.state.lock().unwrap().order.iter().map(|e| E::from_entity(e.entity.clone())).collect()
818 }
819}
820
821impl<E: View + Clone + 'static> Peek<Vec<E>> for ResultSet<E> {
822 fn peek(&self) -> Vec<E> { self.0 .0.state.lock().unwrap().order.iter().map(|e| E::from_entity(e.entity.clone())).collect() }
823}
824
825impl<E: View + Clone + 'static> Subscribe<Vec<E>> for ResultSet<E> {
826 fn subscribe<F>(&self, listener: F) -> SubscriptionGuard
827 where F: IntoSubscribeListener<Vec<E>> {
828 let listener = listener.into_subscribe_listener();
829 let me = self.clone();
830 let guard: ankurah_signals::broadcast::ListenerGuard<()> = self.0 .0.broadcast.reference().listen(move |_| {
831 let entities: Vec<E> = me.0 .0.state.lock().unwrap().order.iter().map(|e| E::from_entity(e.entity.clone())).collect();
832 listener(entities);
833 });
834 SubscriptionGuard::new(ListenerGuard::new(guard))
835 }
836}
837
/// Iterator over the views of a `ResultSet`.
///
/// It keeps a handle to the set and a cursor, and reads the set's current
/// state on every call to `next` rather than iterating a snapshot.
#[derive(Debug)]
pub struct ResultSetIter<E: View> {
    /// Handle to the set being iterated.
    resultset: ResultSet<E>,
    /// Position of the next entry to yield.
    index: usize,
}
843
844impl<E: View> ResultSetIter<E> {
845 fn new(resultset: ResultSet<E>) -> Self { Self { resultset, index: 0 } }
846}
847
848impl<E: View + Clone> Iterator for ResultSetIter<E> {
849 type Item = E;
850
851 fn next(&mut self) -> Option<Self::Item> {
852 use ankurah_signals::CurrentObserver;
854 CurrentObserver::track(&self.resultset);
855
856 let state = self.resultset.0 .0.state.lock().unwrap();
857 if self.index < state.order.len() {
858 let entity = &state.order[self.index].entity;
859 let view = E::from_entity(entity.clone());
860 self.index += 1;
861 Some(view)
862 } else {
863 None
864 }
865 }
866}
867
868impl<E: View + Clone> IntoIterator for ResultSet<E> {
869 type Item = E;
870 type IntoIter = ResultSetIter<E>;
871
872 fn into_iter(self) -> Self::IntoIter { ResultSetIter::new(self) }
873}
874
875impl<E: View + Clone> IntoIterator for &ResultSet<E> {
876 type Item = E;
877 type IntoIter = ResultSetIter<E>;
878
879 fn into_iter(self) -> Self::IntoIter { ResultSetIter::new(self.clone()) }
880}
881
/// Iterator over an owned list of entity ids.
#[derive(Debug)]
pub struct EntityResultSetKeyIterator {
    /// The ids to yield, in order.
    keys: Vec<proto::EntityId>,
    /// Position of the next id to yield.
    index: usize,
}
887
888impl EntityResultSetKeyIterator {
889 fn new(keys: Vec<proto::EntityId>) -> Self { Self { keys, index: 0 } }
890}
891
892impl Iterator for EntityResultSetKeyIterator {
893 type Item = proto::EntityId;
894
895 fn next(&mut self) -> Option<Self::Item> {
896 if self.index < self.keys.len() {
897 let key = self.keys[self.index];
898 self.index += 1;
899 Some(key)
900 } else {
901 None
902 }
903 }
904}
905
906impl EntityResultSet<Entity> {
908 pub fn wrap<R: View>(&self) -> ResultSet<R> { ResultSet(self.clone(), std::marker::PhantomData) }
909}