1use crate::indexing::{encode_tuple_values_with_key_spec, KeySpec};
2use crate::{entity::Entity, model::View, reactor::AbstractEntity};
3use ankurah_proto as proto;
4use ankurah_signals::{
5 broadcast::{Broadcast, BroadcastId},
6 signal::{Listener, ListenerGuard},
7 subscribe::IntoSubscribeListener,
8 CurrentObserver, Get, Peek, Signal, Subscribe, SubscriptionGuard,
9};
10use std::{
11 collections::HashMap,
12 ops::Deref,
13 sync::{
14 atomic::{AtomicBool, Ordering},
15 Arc,
16 },
17};
18
/// Compact byte string used as a cached sort key.
///
/// Keys of at most 16 bytes are stored inline, zero-padded to 16 bytes; longer
/// keys are heap allocated.
///
/// Equality and ordering are defined on the stored bytes (lexicographic byte
/// order), independent of which variant holds them. Deriving `Ord` would
/// compare the variant first and therefore sort every inline key before every
/// heap key regardless of content.
///
/// Because inline keys are zero-padded and their original length is not
/// recorded, an inline key compares equal to the same key with trailing zero
/// bytes appended.
#[derive(Debug, Clone)]
enum IVec {
    /// Inline storage: the key bytes followed by zero padding.
    Small([u8; 16]),
    /// Heap storage for keys longer than 16 bytes.
    Large(Vec<u8>),
}

impl IVec {
    /// Builds a key from `bytes`, storing it inline when it fits in 16 bytes.
    fn from_slice(bytes: &[u8]) -> Self {
        if bytes.len() <= 16 {
            let mut data = [0u8; 16];
            data[..bytes.len()].copy_from_slice(bytes);
            Self::Small(data)
        } else {
            Self::Large(bytes.to_vec())
        }
    }

    /// Returns the stored bytes (for inline keys this includes the zero padding).
    fn as_bytes(&self) -> &[u8] {
        match self {
            Self::Small(data) => &data[..],
            Self::Large(data) => &data[..],
        }
    }
}

impl PartialEq for IVec {
    fn eq(&self, other: &Self) -> bool { self.as_bytes() == other.as_bytes() }
}

impl Eq for IVec {}

impl PartialOrd for IVec {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { Some(self.cmp(other)) }
}

impl Ord for IVec {
    /// Lexicographic comparison of the stored bytes.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering { self.as_bytes().cmp(other.as_bytes()) }
}

impl From<Vec<u8>> for IVec {
    /// Converts an owned buffer, reusing its allocation when it is too long to inline.
    fn from(vec: Vec<u8>) -> Self {
        if vec.len() > 16 {
            Self::Large(vec)
        } else {
            Self::from_slice(&vec)
        }
    }
}
44
/// Shared handle to an ordered set of entities.
///
/// The state lives behind an `Arc`, so cloning the handle yields another
/// reference to the same underlying set.
#[derive(Debug, Clone)]
pub struct EntityResultSet<E: AbstractEntity = Entity>(Arc<Inner<E>>);
47
/// Typed wrapper over an `EntityResultSet<Entity>` that hands out its entities as views of type `R`.
///
/// `R` only appears in the `PhantomData` marker; the stored data is the wrapped entity set.
#[derive(Debug)]
pub struct ResultSet<R: View>(EntityResultSet<Entity>, std::marker::PhantomData<R>);
51
52impl<R: View> Deref for ResultSet<R> {
53 type Target = EntityResultSet<Entity>;
54 fn deref(&self) -> &Self::Target { &self.0 }
55}
56
57impl<R: View> ResultSet<R> {
58 pub fn by_id(&self, id: &proto::EntityId) -> Option<R> { self.0.by_id(id).map(|e| R::from_entity(e)) }
59}
60
/// Shared interior of an `EntityResultSet`.
#[derive(Debug)]
struct Inner<E: AbstractEntity> {
    /// Ordered entries plus lookup index and ordering/limit settings, guarded by a mutex.
    state: std::sync::Mutex<State<E>>,
    /// Flag stored and loaded with relaxed ordering by `set_loaded` / `is_loaded`.
    loaded: AtomicBool,
    /// Channel that `()` is sent on after the set has been modified.
    broadcast: Broadcast<()>,
}
68
/// Mutable contents of a result set; always accessed through the mutex in `Inner`.
#[derive(Debug)]
struct State<E: AbstractEntity> {
    /// Entries in result order.
    order: Vec<EntityEntry<E>>,
    /// Maps an entity id to its position in `order`.
    index: HashMap<proto::EntityId, usize>,
    /// When set, sort keys are computed from it for every entry; when `None`, entries carry no sort key.
    key_spec: Option<KeySpec>,
    /// Maximum number of entries kept, if any.
    limit: Option<usize>,
    /// Set when entries are removed while the set was exactly at its limit; reset by `clear_gap_dirty`.
    gap_dirty: bool,
}
78
/// One slot of the ordered result set.
#[derive(Debug, Clone)]
struct EntityEntry<E: AbstractEntity> {
    /// The stored entity.
    entity: E,
    /// Cached sort key computed from the active key spec; `None` when no key spec was set at computation time.
    sort_key: Option<IVec>,
    /// Set by `mark_all_dirty`; cleared by `retain_dirty` for entries that are kept.
    dirty: bool,
}
/// Write handle that holds the state lock for the duration of a batch of modifications.
///
/// On drop, if anything was changed, the lock is released and a notification is
/// sent on the set's broadcast.
pub struct ResultSetWrite<'a, E: AbstractEntity = Entity> {
    /// The set being modified (used for the `loaded` flag and the broadcast).
    resultset: &'a EntityResultSet<E>,
    /// Whether any operation through this handle modified the set.
    changed: bool,
    /// The held lock; wrapped in `Option` so `Drop` can release it before broadcasting.
    guard: Option<std::sync::MutexGuard<'a, State<E>>>,
}
95
/// Read handle that holds the state lock while the set is inspected.
pub struct ResultSetRead<'a, E: AbstractEntity = Entity> {
    /// The held lock on the set's state.
    guard: std::sync::MutexGuard<'a, State<E>>,
}
101
102impl<'a, E: AbstractEntity> ResultSetWrite<'a, E> {
104 pub fn add(&mut self, entity: E) -> bool {
106 let guard = self.guard.as_mut().expect("write guard already dropped");
107 let id = *entity.id();
108 if guard.index.contains_key(&id) {
109 return false; }
111
112 let sort_key = guard.key_spec.as_ref().map(|key_spec| Self::compute_sort_key(&entity, key_spec));
114
115 let entry = EntityEntry { entity, sort_key, dirty: false };
116
117 let pos = guard
119 .order
120 .binary_search_by(|existing| {
121 match (&existing.sort_key, &entry.sort_key) {
122 (Some(existing_key), Some(entry_key)) => {
123 existing_key.cmp(entry_key).then_with(|| existing.entity.id().cmp(entry.entity.id()))
125 }
126 (Some(_), None) => std::cmp::Ordering::Less, (None, Some(_)) => std::cmp::Ordering::Greater, (None, None) => existing.entity.id().cmp(entry.entity.id()), }
130 })
131 .unwrap_or_else(|pos| pos);
132
133 guard.order.insert(pos, entry);
134 guard.index.insert(id, pos);
135
136 for i in (pos + 1)..guard.order.len() {
138 let entry_id = *guard.order[i].entity.id();
139 guard.index.insert(entry_id, i);
140 }
141
142 if let Some(limit) = guard.limit {
144 if guard.order.len() > limit {
145 if let Some(removed_entry) = guard.order.pop() {
147 let removed_id = *removed_entry.entity.id();
148 guard.index.remove(&removed_id);
149 }
151 }
152 }
153
154 self.changed = true;
155 true
156 }
157
158 pub fn remove(&mut self, id: proto::EntityId) -> bool {
160 let guard = self.guard.as_mut().expect("write guard already dropped");
161 if let Some(idx) = guard.index.remove(&id) {
162 if guard.limit.is_some_and(|limit| guard.order.len() == limit) {
164 guard.gap_dirty = true;
165 }
166
167 guard.order.remove(idx);
168 if idx < guard.order.len() {
169 fix_from(guard, idx);
170 }
171
172 self.changed = true;
173 true
174 } else {
175 false
176 }
177 }
178
179 pub fn contains(&self, id: &proto::EntityId) -> bool {
181 self.guard.as_ref().expect("write guard already dropped").index.contains_key(id)
182 }
183
184 pub fn iter_entities(&self) -> impl Iterator<Item = (proto::EntityId, &E)> {
187 let guard = self.guard.as_ref().expect("write guard already dropped");
188 guard.order.iter().map(|entry| (*entry.entity.id(), &entry.entity))
189 }
190
191 pub fn mark_all_dirty(&mut self) {
193 let guard = self.guard.as_mut().expect("write guard already dropped");
194 for entry in &mut guard.order {
195 entry.dirty = true;
196 }
197 self.changed = true;
198 }
199
200 pub fn retain_dirty<F>(&mut self, mut should_retain: F) -> Vec<proto::EntityId>
202 where F: FnMut(&E) -> bool {
203 let guard = self.guard.as_mut().expect("write guard already dropped");
204 let mut removed_ids = Vec::new();
205 let mut i = 0;
206
207 let was_at_limit = guard.limit.is_some_and(|limit| guard.order.len() == limit);
209
210 while i < guard.order.len() {
211 if guard.order[i].dirty {
212 let should_keep = should_retain(&guard.order[i].entity);
213 if should_keep {
214 let key_spec = guard.key_spec.clone();
216 if let Some(key_spec) = key_spec {
217 guard.order[i].sort_key = Some(Self::compute_sort_key(&guard.order[i].entity, &key_spec));
218 }
219 guard.order[i].dirty = false;
220 i += 1;
221 } else {
222 let removed_entry = guard.order.remove(i);
224 let removed_id = *removed_entry.entity.id();
225 guard.index.remove(&removed_id);
226 removed_ids.push(removed_id);
227 }
229 } else {
230 i += 1;
231 }
232 }
233
234 guard.index.clear();
236 let index_updates: Vec<_> = guard.order.iter().enumerate().map(|(i, entry)| (*entry.entity.id(), i)).collect();
237 for (id, i) in index_updates {
238 guard.index.insert(id, i);
239 }
240
241 if !removed_ids.is_empty() {
242 self.changed = true;
243
244 if (!guard.gap_dirty) && was_at_limit && guard.limit.is_some_and(|limit| guard.order.len() < limit) {
246 guard.gap_dirty = true;
247 }
248 }
249
250 removed_ids
251 }
252
253 pub fn replace_all(&mut self, entities: Vec<E>) {
255 let guard = self.guard.as_mut().expect("write guard already dropped");
256
257 guard.order.clear();
259 guard.index.clear();
260
261 for entity in entities {
263 let sort_key = guard.key_spec.as_ref().map(|key_spec| Self::compute_sort_key(&entity, key_spec));
265
266 let entry = EntityEntry { entity, sort_key, dirty: false };
267 guard.order.push(entry);
268 }
269
270 if guard.key_spec.is_some() {
272 guard.order.sort_by(|a, b| {
273 match (&a.sort_key, &b.sort_key) {
274 (Some(key_a), Some(key_b)) => {
275 key_a.cmp(key_b).then_with(|| a.entity.id().cmp(b.entity.id()))
277 }
278 (Some(_), None) => std::cmp::Ordering::Less,
279 (None, Some(_)) => std::cmp::Ordering::Greater,
280 (None, None) => a.entity.id().cmp(b.entity.id()),
281 }
282 });
283 } else {
284 guard.order.sort_by(|a, b| a.entity.id().cmp(b.entity.id()));
286 }
287
288 if let Some(limit) = guard.limit {
290 if guard.order.len() > limit {
291 guard.order.truncate(limit);
292 }
293 }
294
295 let index_updates: Vec<_> = guard.order.iter().enumerate().map(|(i, entry)| (*entry.entity.id(), i)).collect();
297 for (id, i) in index_updates {
298 guard.index.insert(id, i);
299 }
300
301 self.changed = true;
302 }
303
304 fn compute_sort_key(entity: &E, key_spec: &KeySpec) -> IVec {
306 let mut values = Vec::new();
307
308 for keypart in &key_spec.keyparts {
310 let value = AbstractEntity::value(entity, &keypart.column);
311 if let Some(v) = value {
314 values.push(v);
315 } else {
316 return IVec::from_slice(&[]); }
319 }
320
321 let encoded = encode_tuple_values_with_key_spec(&values, key_spec).unwrap_or_default();
323 IVec::from(encoded)
324 }
325
326 pub fn set_loaded(&mut self, loaded: bool) {
330 self.resultset.0.loaded.store(loaded, std::sync::atomic::Ordering::Relaxed);
331 self.changed = true; }
333}
334
335impl<'a, E: AbstractEntity> Drop for ResultSetWrite<'a, E> {
336 fn drop(&mut self) {
337 if self.changed {
338 drop(self.guard.take());
340 self.resultset.0.broadcast.send(());
341 }
342 }
343}
344
345impl<'a, E: AbstractEntity> ResultSetRead<'a, E> {
346 pub fn contains(&self, id: &proto::EntityId) -> bool { self.guard.index.contains_key(id) }
348
349 pub fn iter_entities(&self) -> impl Iterator<Item = (proto::EntityId, &E)> {
352 self.guard.order.iter().map(|entity| (*entity.entity.id(), &entity.entity))
353 }
354
355 pub fn len(&self) -> usize { self.guard.order.len() }
357
358 pub fn is_empty(&self) -> bool { self.guard.order.is_empty() }
360}
361
362impl<E: AbstractEntity> EntityResultSet<E> {
363 pub fn from_vec(entities: Vec<E>, loaded: bool) -> Self {
364 let mut index = HashMap::new();
365 let mut order = Vec::new();
366
367 for (i, entity) in entities.into_iter().enumerate() {
368 index.insert(*entity.id(), i);
369 order.push(EntityEntry { entity, sort_key: None, dirty: false });
370 }
371
372 let state = State { order, index, key_spec: None, limit: None, gap_dirty: false };
373 Self(Arc::new(Inner { state: std::sync::Mutex::new(state), loaded: AtomicBool::new(loaded), broadcast: Broadcast::new() }))
374 }
375 pub fn empty() -> Self {
376 let state = State { order: Vec::new(), index: HashMap::new(), key_spec: None, limit: None, gap_dirty: false };
377 Self(Arc::new(Inner { state: std::sync::Mutex::new(state), loaded: AtomicBool::new(false), broadcast: Broadcast::new() }))
378 }
379 pub fn single(entity: E) -> Self {
380 let entry = EntityEntry { entity: entity.clone(), sort_key: None, dirty: false };
381 let mut state = State { order: vec![entry], index: HashMap::new(), key_spec: None, limit: None, gap_dirty: false };
382 state.index.insert(*entity.id(), 0);
383 Self(Arc::new(Inner { state: std::sync::Mutex::new(state), loaded: AtomicBool::new(false), broadcast: Broadcast::new() }))
384 }
385
386 pub fn write(&self) -> ResultSetWrite<'_, E> {
390 let guard = self.0.state.lock().unwrap();
391 ResultSetWrite { resultset: self, changed: false, guard: Some(guard) }
392 }
393
394 pub fn read(&self) -> ResultSetRead<'_, E> {
396 let guard = self.0.state.lock().unwrap();
397 ResultSetRead { guard }
398 }
399 pub fn set_loaded(&self, loaded: bool) {
400 self.0.loaded.store(loaded, Ordering::Relaxed);
401 self.0.broadcast.send(());
402 }
403 pub fn is_loaded(&self) -> bool {
404 CurrentObserver::track(&self);
405 self.0.loaded.load(Ordering::Relaxed)
406 }
407
408 pub fn clear(&self) {
409 let mut st = self.0.state.lock().unwrap();
410 st.order.clear();
411 st.index.clear();
412 drop(st);
413 self.0.broadcast.send(());
414 }
415
416 pub fn keys(&self) -> EntityResultSetKeyIterator {
418 CurrentObserver::track(&self);
420 let st = self.0.state.lock().unwrap();
421 let keys: Vec<proto::EntityId> = st.order.iter().map(|e| *e.entity.id()).collect();
422 EntityResultSetKeyIterator::new(keys)
423 }
424
425 pub fn contains_key(&self, id: &proto::EntityId) -> bool {
427 CurrentObserver::track(&self);
429 let st = self.0.state.lock().unwrap();
430 st.index.contains_key(id)
431 }
432
433 pub fn by_id(&self, id: &proto::EntityId) -> Option<E> {
434 CurrentObserver::track(self);
436 let st = self.0.state.lock().unwrap();
437 st.index.get(id).map(|&i| st.order[i].entity.clone())
438 }
439
440 pub fn len(&self) -> usize {
441 CurrentObserver::track(&self);
442 let st = self.0.state.lock().unwrap();
443 st.order.len()
444 }
445
446 pub(crate) fn is_gap_dirty(&self) -> bool {
448 let st = self.0.state.lock().unwrap();
449 st.gap_dirty
450 }
451
452 pub(crate) fn clear_gap_dirty(&self) {
454 let mut st = self.0.state.lock().unwrap();
455 st.gap_dirty = false;
456 }
457
458 pub fn get_limit(&self) -> Option<usize> {
460 let st = self.0.state.lock().unwrap();
461 st.limit
462 }
463
464 pub(crate) fn last_entity(&self) -> Option<E> {
466 let st = self.0.state.lock().unwrap();
467 st.order.last().map(|entry| entry.entity.clone())
468 }
469
470 pub(crate) fn order_by(&self, key_spec: Option<KeySpec>) {
472 let mut st = self.0.state.lock().unwrap();
473
474 if st.key_spec == key_spec {
476 return; }
478
479 st.key_spec = key_spec.clone();
480
481 for entry in &mut st.order {
483 entry.sort_key = if let Some(ref ks) = key_spec {
484 Some(ResultSetWrite::compute_sort_key(&entry.entity, ks))
485 } else {
486 None };
488 }
489
490 st.order.sort_by(|a, b| {
492 match (&a.sort_key, &b.sort_key) {
493 (Some(key_a), Some(key_b)) => {
494 match key_a.cmp(key_b) {
496 std::cmp::Ordering::Equal => a.entity.id().cmp(b.entity.id()), other => other,
498 }
499 }
500 (Some(_), None) => std::cmp::Ordering::Greater,
501 (None, Some(_)) => std::cmp::Ordering::Less,
502 (None, None) => a.entity.id().cmp(b.entity.id()),
503 }
504 });
505
506 st.index.clear();
508 let index_updates: Vec<_> = st.order.iter().enumerate().map(|(i, entry)| (*entry.entity.id(), i)).collect();
509 for (id, i) in index_updates {
510 st.index.insert(id, i);
511 }
512
513 drop(st);
514 self.0.broadcast.send(());
515 }
516
517 pub(crate) fn limit(&self, limit: Option<usize>) {
519 let mut st = self.0.state.lock().unwrap();
520
521 if st.limit == limit {
523 return; }
525
526 st.limit = limit;
527
528 let mut entities_removed = false;
530 if let Some(limit) = limit {
531 if st.order.len() > limit {
532 st.order.truncate(limit);
533 entities_removed = true;
534
535 st.index.clear();
537 let index_updates: Vec<_> = st.order.iter().enumerate().map(|(i, entry)| (*entry.entity.id(), i)).collect();
538 for (id, i) in index_updates {
539 st.index.insert(id, i);
540 }
541 }
542 }
543
544 drop(st);
545
546 if entities_removed {
548 self.0.broadcast.send(());
549 }
550 }
551}
552
553#[cfg(test)]
554mod tests {
555 use super::*;
556 use crate::indexing::{IndexDirection, IndexKeyPart, KeySpec, NullsOrder};
557 use crate::value::{Value, ValueType};
558 use ankurah_proto as proto;
559 use std::collections::HashMap;
560
    /// Minimal in-memory entity used by the tests in this module.
    #[derive(Debug, Clone)]
    struct TestEntity {
        /// Entity id.
        id: proto::EntityId,
        /// Collection the entity reports itself as belonging to.
        collection: proto::CollectionId,
        /// Property values looked up by field name.
        properties: HashMap<String, Value>,
    }
567
568 impl TestEntity {
569 fn new(id: u8, properties: HashMap<String, Value>) -> Self {
570 let mut id_bytes = [0u8; 16];
571 id_bytes[15] = id;
572 Self { id: proto::EntityId::from_bytes(id_bytes), collection: proto::CollectionId::fixed_name("test"), properties }
573 }
574 }
575
576 impl AbstractEntity for TestEntity {
577 fn collection(&self) -> proto::CollectionId { self.collection.clone() }
578
579 fn id(&self) -> &proto::EntityId { &self.id }
580
581 fn value(&self, field: &str) -> Option<Value> {
582 if field == "id" {
583 Some(Value::EntityId(self.id.clone()))
584 } else {
585 self.properties.get(field).cloned()
586 }
587 }
588 }
589
    /// Without a key spec, entities added in arbitrary order come back ordered by id.
    #[test]
    fn test_entity_id_ordering() {
        let resultset = EntityResultSet::empty();
        let mut write = resultset.write();

        let entity1 = TestEntity::new(1, HashMap::new());
        let entity2 = TestEntity::new(2, HashMap::new());
        let entity3 = TestEntity::new(3, HashMap::new());

        // Insert out of order on purpose.
        write.add(entity3.clone());
        write.add(entity1.clone());
        write.add(entity2.clone());

        // Release the write lock before taking the read lock.
        drop(write);

        let read_guard = resultset.read();
        let entities: Vec<_> = read_guard.iter_entities().collect();
        assert_eq!(entities.len(), 3);
        assert_eq!(entities[0].0, entity1.id);
        assert_eq!(entities[1].0, entity2.id);
        assert_eq!(entities[2].0, entity3.id);
    }
615
    /// With a key spec on "name", entities sort by name and equal names are ordered by id.
    #[test]
    fn test_order_by_with_tie_breaking() {
        let resultset = EntityResultSet::empty();

        // Two entities share the name "Alice"; the third is "Bob".
        let mut props1 = HashMap::new();
        props1.insert("name".to_string(), Value::String("Alice".to_string()));
        let entity1 = TestEntity::new(1, props1);

        let mut props2 = HashMap::new();
        props2.insert("name".to_string(), Value::String("Alice".to_string()));
        let entity2 = TestEntity::new(2, props2);

        let mut props3 = HashMap::new();
        props3.insert("name".to_string(), Value::String("Bob".to_string()));
        let entity3 = TestEntity::new(3, props3);

        let key_spec = KeySpec {
            keyparts: vec![IndexKeyPart {
                column: "name".to_string(),
                sub_path: None,
                direction: IndexDirection::Asc,
                nulls: Some(NullsOrder::Last),
                collation: None,
                value_type: ValueType::String,
            }],
        };
        resultset.order_by(Some(key_spec));

        // Insert out of order on purpose.
        let mut write = resultset.write();
        write.add(entity2.clone());
        write.add(entity3.clone());
        write.add(entity1.clone());
        drop(write);

        let read_guard = resultset.read();
        let entities: Vec<_> = read_guard.iter_entities().collect();
        assert_eq!(entities.len(), 3);
        // Both "Alice" entries first, ordered by id, then "Bob".
        assert_eq!(entities[0].0, entity1.id);
        assert_eq!(entities[1].0, entity2.id);
        assert_eq!(entities[2].0, entity3.id);
    }
661
    /// Lowering the limit truncates the set; clearing the limit afterwards does not bring entities back.
    #[test]
    fn test_limit_functionality() {
        let resultset = EntityResultSet::empty();

        let mut write = resultset.write();
        for i in 0..5u8 {
            let mut props = HashMap::new();
            props.insert("value".to_string(), Value::I32(i as i32));
            let entity = TestEntity::new(i, props);
            write.add(entity);
        }
        drop(write);

        assert_eq!(resultset.len(), 5);

        resultset.limit(Some(3));
        assert_eq!(resultset.len(), 3);

        // Removing the limit leaves the already-truncated content as is.
        resultset.limit(None);
        assert_eq!(resultset.len(), 3);
    }
686
    /// `mark_all_dirty` followed by `retain_dirty` removes exactly the entities rejected by the predicate.
    #[test]
    fn test_dirty_tracking() {
        let resultset = EntityResultSet::empty();

        let mut props = HashMap::new();
        props.insert("active".to_string(), Value::Bool(true));
        let entity1 = TestEntity::new(1, props);

        let mut props = HashMap::new();
        props.insert("active".to_string(), Value::Bool(false));
        let entity2 = TestEntity::new(2, props);

        let mut write = resultset.write();
        write.add(entity1.clone());
        write.add(entity2.clone());

        write.mark_all_dirty();

        // Keep only entities whose "active" property is true.
        let removed = write.retain_dirty(|entity| entity.value("active") == Some(Value::Bool(true)));

        drop(write);

        assert_eq!(removed.len(), 1);
        assert_eq!(removed[0], entity2.id);
        assert_eq!(resultset.len(), 1);
        assert_eq!(resultset.read().iter_entities().next().unwrap().0, entity1.id);
    }
716
    /// Changes made through a write handle are visible through the handle itself
    /// and through the set once the handle has gone out of scope.
    #[test]
    fn test_write_guard_atomic_operations() {
        let resultset = EntityResultSet::empty();

        {
            let mut write = resultset.write();
            let entity1 = TestEntity::new(1, HashMap::new());
            let entity2 = TestEntity::new(2, HashMap::new());

            write.add(entity1);
            write.add(entity2);

            assert_eq!(write.iter_entities().count(), 2);
            // `write` is dropped at the end of this scope, releasing the lock.
        }

        assert_eq!(resultset.len(), 2);
    }
738
    /// Ordering of keys short enough to be stored inline.
    #[test]
    fn test_ivec_small_keys() {
        let small_key = IVec::from_slice(b"hello");
        let another_small = IVec::from_slice(b"world");
        let empty_key = IVec::from_slice(b"");

        // "hello" sorts before "world".
        assert!(small_key < another_small);
        // The empty key sorts before a non-empty key.
        assert!(empty_key < small_key);
        // A prefix sorts before the longer key that extends it.
        let key_ab = IVec::from_slice(b"ab");
        let key_abc = IVec::from_slice(b"abc");
        assert!(key_ab < key_abc);
    }
755
    /// A 10-byte key of ones sorts before a 20-byte key of ones.
    #[test]
    fn test_ivec_large_keys() {
        // 20 bytes: too long for inline storage.
        let large_key = IVec::from_slice(&[1u8; 20]);
        // 10 bytes: stored inline.
        let small_key = IVec::from_slice(&[1u8; 10]);
        assert!(small_key < large_key);
    }
765
    /// Keys of exactly 16 bytes are stored inline, keys of 17 bytes on the heap,
    /// and the 16-byte key sorts before the 17-byte key with the same prefix.
    #[test]
    fn test_ivec_boundary() {
        let exactly_16 = IVec::from_slice(&[1u8; 16]);
        let exactly_17 = IVec::from_slice(&[1u8; 17]);

        assert!(exactly_16 < exactly_17);

        match exactly_16 {
            IVec::Small(_) => (),
            IVec::Large(_) => panic!("16-byte key should use Small variant"),
        }

        match exactly_17 {
            IVec::Large(_) => (),
            IVec::Small(_) => panic!("17-byte key should use Large variant"),
        }
    }
786}
787
788fn fix_from<E: AbstractEntity>(st: &mut State<E>, start: usize) {
789 for i in start..st.order.len() {
791 let id = *st.order[i].entity.id();
792 st.index.insert(id, i);
793 }
794}
795
796impl<E: View> ResultSet<E> {
797 pub fn iter(&self) -> ResultSetIter<E> { ResultSetIter::new(self.clone()) }
798}
799
800impl<E: View> Clone for ResultSet<E> {
801 fn clone(&self) -> Self { Self(self.0.clone(), std::marker::PhantomData) }
802}
803
804impl<E: View> Default for ResultSet<E> {
805 fn default() -> Self {
806 let entity_resultset = EntityResultSet::empty();
807 Self(entity_resultset, std::marker::PhantomData)
808 }
809}
810
811impl<E: AbstractEntity> Signal for EntityResultSet<E> {
812 fn listen(&self, listener: Listener) -> ListenerGuard { ListenerGuard::new(self.0.broadcast.reference().listen(listener)) }
813 fn broadcast_id(&self) -> BroadcastId { self.0.broadcast.id() }
814}
815
816impl<R: View> Signal for ResultSet<R> {
817 fn listen(&self, listener: Listener) -> ListenerGuard { ListenerGuard::new(self.0 .0.broadcast.reference().listen(listener)) }
818
819 fn broadcast_id(&self) -> BroadcastId { self.0 .0.broadcast.id() }
820}
821
822impl<E: View + Clone + 'static> Get<Vec<E>> for ResultSet<E> {
823 fn get(&self) -> Vec<E> {
824 use ankurah_signals::CurrentObserver;
825 CurrentObserver::track(self);
826 self.0 .0.state.lock().unwrap().order.iter().map(|e| E::from_entity(e.entity.clone())).collect()
827 }
828}
829
830impl<E: View + Clone + 'static> Peek<Vec<E>> for ResultSet<E> {
831 fn peek(&self) -> Vec<E> { self.0 .0.state.lock().unwrap().order.iter().map(|e| E::from_entity(e.entity.clone())).collect() }
832}
833
834impl<E: View + Clone + 'static> Subscribe<Vec<E>> for ResultSet<E> {
835 fn subscribe<F>(&self, listener: F) -> SubscriptionGuard
836 where F: IntoSubscribeListener<Vec<E>> {
837 let listener = listener.into_subscribe_listener();
838 let me = self.clone();
839 let guard: ankurah_signals::broadcast::ListenerGuard<()> = self.0 .0.broadcast.reference().listen(move |_| {
840 let entities: Vec<E> = me.0 .0.state.lock().unwrap().order.iter().map(|e| E::from_entity(e.entity.clone())).collect();
841 listener(entities);
842 });
843 SubscriptionGuard::new(ListenerGuard::new(guard))
844 }
845}
846
/// Iterator over the views of a `ResultSet`.
///
/// Holds a handle to the set and re-locks it on every step, so it observes the
/// set's content at the time of each `next` call.
#[derive(Debug)]
pub struct ResultSetIter<E: View> {
    /// Handle to the set being iterated.
    resultset: ResultSet<E>,
    /// Position of the next entry to yield.
    index: usize,
}
852
853impl<E: View> ResultSetIter<E> {
854 fn new(resultset: ResultSet<E>) -> Self { Self { resultset, index: 0 } }
855}
856
857impl<E: View + Clone> Iterator for ResultSetIter<E> {
858 type Item = E;
859
860 fn next(&mut self) -> Option<Self::Item> {
861 use ankurah_signals::CurrentObserver;
863 CurrentObserver::track(&self.resultset);
864
865 let state = self.resultset.0 .0.state.lock().unwrap();
866 if self.index < state.order.len() {
867 let entity = &state.order[self.index].entity;
868 let view = E::from_entity(entity.clone());
869 self.index += 1;
870 Some(view)
871 } else {
872 None
873 }
874 }
875}
876
877impl<E: View + Clone> IntoIterator for ResultSet<E> {
878 type Item = E;
879 type IntoIter = ResultSetIter<E>;
880
881 fn into_iter(self) -> Self::IntoIter { ResultSetIter::new(self) }
882}
883
884impl<E: View + Clone> IntoIterator for &ResultSet<E> {
885 type Item = E;
886 type IntoIter = ResultSetIter<E>;
887
888 fn into_iter(self) -> Self::IntoIter { ResultSetIter::new(self.clone()) }
889}
890
/// Iterator over a snapshot of entity ids taken from an `EntityResultSet`.
#[derive(Debug)]
pub struct EntityResultSetKeyIterator {
    /// The ids to yield, in order.
    keys: Vec<proto::EntityId>,
    /// Position of the next id to yield.
    index: usize,
}
896
897impl EntityResultSetKeyIterator {
898 fn new(keys: Vec<proto::EntityId>) -> Self { Self { keys, index: 0 } }
899}
900
901impl Iterator for EntityResultSetKeyIterator {
902 type Item = proto::EntityId;
903
904 fn next(&mut self) -> Option<Self::Item> {
905 if self.index < self.keys.len() {
906 let key = self.keys[self.index];
907 self.index += 1;
908 Some(key)
909 } else {
910 None
911 }
912 }
913}
914
915impl EntityResultSet<Entity> {
917 pub fn wrap<R: View>(&self) -> ResultSet<R> { ResultSet(self.clone(), std::marker::PhantomData) }
918}