ankurah_core/
resultset.rs

1use crate::indexing::{encode_tuple_values_with_key_spec, KeySpec};
2use crate::{entity::Entity, model::View, reactor::AbstractEntity};
3use ankurah_proto as proto;
4use ankurah_signals::{
5    broadcast::{Broadcast, BroadcastId},
6    signal::{Listener, ListenerGuard},
7    subscribe::IntoSubscribeListener,
8    CurrentObserver, Get, Peek, Signal, Subscribe, SubscriptionGuard,
9};
10use std::{
11    collections::HashMap,
12    ops::Deref,
13    sync::{
14        atomic::{AtomicBool, Ordering},
15        Arc,
16    },
17};
18
19/// Efficient storage for sort keys - uses fixed array for small keys, Vec for larger ones
20#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
21enum IVec {
22    /// Keys <= 16 bytes stored in zero-padded fixed array
23    Small([u8; 16]),
24    /// Keys > 16 bytes stored in Vec
25    Large(Vec<u8>),
26}
27
28impl IVec {
29    /// Create from byte slice
30    fn from_slice(bytes: &[u8]) -> Self {
31        if bytes.len() <= 16 {
32            let mut data = [0u8; 16];
33            data[..bytes.len()].copy_from_slice(bytes);
34            Self::Small(data)
35        } else {
36            Self::Large(bytes.to_vec())
37        }
38    }
39}
40
41impl From<Vec<u8>> for IVec {
42    fn from(vec: Vec<u8>) -> Self { Self::from_slice(&vec) }
43}
44
45#[derive(Debug, Clone)]
46pub struct EntityResultSet<E: AbstractEntity = Entity>(Arc<Inner<E>>);
47
48/// View-typed ResultSet
49#[derive(Debug)]
50pub struct ResultSet<R: View>(EntityResultSet<Entity>, std::marker::PhantomData<R>);
51
52impl<R: View> Deref for ResultSet<R> {
53    type Target = EntityResultSet<Entity>;
54    fn deref(&self) -> &Self::Target { &self.0 }
55}
56
57impl<R: View> ResultSet<R> {
58    pub fn by_id(&self, id: &proto::EntityId) -> Option<R> { self.0.by_id(id).map(|e| R::from_entity(e)) }
59}
60
61#[derive(Debug)]
62struct Inner<E: AbstractEntity> {
63    // Order preserving set of entities
64    state: std::sync::Mutex<State<E>>,
65    loaded: AtomicBool,
66    broadcast: Broadcast<()>,
67}
68
69#[derive(Debug)]
70struct State<E: AbstractEntity> {
71    order: Vec<EntityEntry<E>>,
72    index: HashMap<proto::EntityId, usize>,
73    // Ordering configuration
74    key_spec: Option<KeySpec>,
75    limit: Option<usize>,
76    gap_dirty: bool, // Set when we remove entities and go from =LIMIT to < LIMIT
77}
78
79#[derive(Debug, Clone)]
80struct EntityEntry<E: AbstractEntity> {
81    entity: E,
82    sort_key: Option<IVec>,
83    dirty: bool,
84}
85// TODO - figure out how to maintain ordering of entities
86
87/// A write guard for making atomic changes to a ResultSet
88/// Holds the mutex guard to ensure all changes happen atomically
89/// Sends a single notification when dropped (if any changes were made)
90pub struct ResultSetWrite<'a, E: AbstractEntity = Entity> {
91    resultset: &'a EntityResultSet<E>,
92    changed: bool,
93    guard: Option<std::sync::MutexGuard<'a, State<E>>>,
94}
95
96/// A read guard for read-only access to a ResultSet
97/// Holds the mutex guard to ensure consistent reads
98pub struct ResultSetRead<'a, E: AbstractEntity = Entity> {
99    guard: std::sync::MutexGuard<'a, State<E>>,
100}
101
102// TODO - build unit tests for this
103impl<'a, E: AbstractEntity> ResultSetWrite<'a, E> {
104    /// Add an entity to the result set
105    pub fn add(&mut self, entity: E) -> bool {
106        let guard = self.guard.as_mut().expect("write guard already dropped");
107        let id = *entity.id();
108        if guard.index.contains_key(&id) {
109            return false; // Already present
110        }
111
112        // Compute sort key if ordering is configured
113        let sort_key = guard.key_spec.as_ref().map(|key_spec| Self::compute_sort_key(&entity, key_spec));
114
115        let entry = EntityEntry { entity, sort_key, dirty: false };
116
117        // Insert in correct position (always sort by entity ID, with optional key spec first)
118        let pos = guard
119            .order
120            .binary_search_by(|existing| {
121                match (&existing.sort_key, &entry.sort_key) {
122                    (Some(existing_key), Some(entry_key)) => {
123                        // Both have sort keys - compare keys first, then entity ID for tie-breaking
124                        existing_key.cmp(entry_key).then_with(|| existing.entity.id().cmp(entry.entity.id()))
125                    }
126                    (Some(_), None) => std::cmp::Ordering::Less, // Keyed entries sort before unkeyed
127                    (None, Some(_)) => std::cmp::Ordering::Greater, // Unkeyed entries sort after keyed
128                    (None, None) => existing.entity.id().cmp(entry.entity.id()), // Both unkeyed - sort by entity ID
129                }
130            })
131            .unwrap_or_else(|pos| pos);
132
133        guard.order.insert(pos, entry);
134        guard.index.insert(id, pos);
135
136        // Fix indices for all entries after the insertion point
137        for i in (pos + 1)..guard.order.len() {
138            let entry_id = *guard.order[i].entity.id();
139            guard.index.insert(entry_id, i);
140        }
141
142        // Apply limit if configured
143        if let Some(limit) = guard.limit {
144            if guard.order.len() > limit {
145                // Remove the last entry (beyond limit)
146                if let Some(removed_entry) = guard.order.pop() {
147                    let removed_id = *removed_entry.entity.id();
148                    guard.index.remove(&removed_id);
149                    // TODO: Return the evicted entity ID for the caller to handle
150                }
151            }
152        }
153
154        self.changed = true;
155        true
156    }
157
158    /// Remove an entity from the result set
159    pub fn remove(&mut self, id: proto::EntityId) -> bool {
160        let guard = self.guard.as_mut().expect("write guard already dropped");
161        if let Some(idx) = guard.index.remove(&id) {
162            // Check if we were at limit before removal
163            if guard.limit.is_some_and(|limit| guard.order.len() == limit) {
164                guard.gap_dirty = true;
165            }
166
167            guard.order.remove(idx);
168            if idx < guard.order.len() {
169                fix_from(guard, idx);
170            }
171
172            self.changed = true;
173            true
174        } else {
175            false
176        }
177    }
178
179    /// Check if an entity exists
180    pub fn contains(&self, id: &proto::EntityId) -> bool {
181        self.guard.as_ref().expect("write guard already dropped").index.contains_key(id)
182    }
183
184    /// Iterate over all entities
185    /// Returns an iterator over (entity_id, entity) pairs
186    pub fn iter_entities(&self) -> impl Iterator<Item = (proto::EntityId, &E)> {
187        let guard = self.guard.as_ref().expect("write guard already dropped");
188        guard.order.iter().map(|entry| (*entry.entity.id(), &entry.entity))
189    }
190
191    /// Mark all entities as dirty for re-evaluation
192    pub fn mark_all_dirty(&mut self) {
193        let guard = self.guard.as_mut().expect("write guard already dropped");
194        for entry in &mut guard.order {
195            entry.dirty = true;
196        }
197        self.changed = true;
198    }
199
200    /// Retain only dirty entities that pass the closure, removing those that don't
201    pub fn retain_dirty<F>(&mut self, mut should_retain: F) -> Vec<proto::EntityId>
202    where F: FnMut(&E) -> bool {
203        let guard = self.guard.as_mut().expect("write guard already dropped");
204        let mut removed_ids = Vec::new();
205        let mut i = 0;
206
207        // Check if we were at limit before any removals
208        let was_at_limit = guard.limit.is_some_and(|limit| guard.order.len() == limit);
209
210        while i < guard.order.len() {
211            if guard.order[i].dirty {
212                let should_keep = should_retain(&guard.order[i].entity);
213                if should_keep {
214                    // Entity should be retained - recompute sort key and mark clean
215                    let key_spec = guard.key_spec.clone();
216                    if let Some(key_spec) = key_spec {
217                        guard.order[i].sort_key = Some(Self::compute_sort_key(&guard.order[i].entity, &key_spec));
218                    }
219                    guard.order[i].dirty = false;
220                    i += 1;
221                } else {
222                    // Entity should be removed
223                    let removed_entry = guard.order.remove(i);
224                    let removed_id = *removed_entry.entity.id();
225                    guard.index.remove(&removed_id);
226                    removed_ids.push(removed_id);
227                    // Don't increment i since we removed an element
228                }
229            } else {
230                i += 1;
231            }
232        }
233
234        // Fix indices after removals (no re-sorting needed)
235        guard.index.clear();
236        let index_updates: Vec<_> = guard.order.iter().enumerate().map(|(i, entry)| (*entry.entity.id(), i)).collect();
237        for (id, i) in index_updates {
238            guard.index.insert(id, i);
239        }
240
241        if !removed_ids.is_empty() {
242            self.changed = true;
243
244            // Set gap_dirty if we went from LIMIT to < LIMIT
245            if (!guard.gap_dirty) && was_at_limit && guard.limit.is_some_and(|limit| guard.order.len() < limit) {
246                guard.gap_dirty = true;
247            }
248        }
249
250        removed_ids
251    }
252
253    /// Replace all entities in the result set with proper sorting
254    pub fn replace_all(&mut self, entities: Vec<E>) {
255        let guard = self.guard.as_mut().expect("write guard already dropped");
256
257        // Clear existing data
258        guard.order.clear();
259        guard.index.clear();
260
261        // Add all entities with proper sorting
262        for entity in entities {
263            // Compute sort key if ordering is configured
264            let sort_key = guard.key_spec.as_ref().map(|key_spec| Self::compute_sort_key(&entity, key_spec));
265
266            let entry = EntityEntry { entity, sort_key, dirty: false };
267            guard.order.push(entry);
268        }
269
270        // Sort all entries if we have ordering configured
271        if guard.key_spec.is_some() {
272            guard.order.sort_by(|a, b| {
273                match (&a.sort_key, &b.sort_key) {
274                    (Some(key_a), Some(key_b)) => {
275                        // Compare keys first, then entity ID for tie-breaking
276                        key_a.cmp(key_b).then_with(|| a.entity.id().cmp(b.entity.id()))
277                    }
278                    (Some(_), None) => std::cmp::Ordering::Less,
279                    (None, Some(_)) => std::cmp::Ordering::Greater,
280                    (None, None) => a.entity.id().cmp(b.entity.id()),
281                }
282            });
283        } else {
284            // Sort by entity ID only if no key spec
285            guard.order.sort_by(|a, b| a.entity.id().cmp(b.entity.id()));
286        }
287
288        // Apply limit if configured
289        if let Some(limit) = guard.limit {
290            if guard.order.len() > limit {
291                guard.order.truncate(limit);
292            }
293        }
294
295        // Rebuild index
296        let index_updates: Vec<_> = guard.order.iter().enumerate().map(|(i, entry)| (*entry.entity.id(), i)).collect();
297        for (id, i) in index_updates {
298            guard.index.insert(id, i);
299        }
300
301        self.changed = true;
302    }
303
304    /// Compute sort key for an entity using the current key spec
305    fn compute_sort_key(entity: &E, key_spec: &KeySpec) -> IVec {
306        let mut values = Vec::new();
307
308        // Extract values for each key part
309        for keypart in &key_spec.keyparts {
310            let value = AbstractEntity::value(entity, &keypart.column);
311            // TODO: Handle NULLs properly - for now we'll get encoding errors on NULLs
312            // which will cause unwrap_or_default() to return empty key (sorts first)
313            if let Some(v) = value {
314                values.push(v);
315            } else {
316                // Skip this entity for now if any field is NULL
317                return IVec::from_slice(&[]); // Empty key sorts first
318            }
319        }
320
321        // Encode the tuple - if this fails, return empty key (will sort first)
322        let encoded = encode_tuple_values_with_key_spec(&values, key_spec).unwrap_or_default();
323        IVec::from(encoded)
324    }
325}
326
327impl<'a, E: AbstractEntity> Drop for ResultSetWrite<'a, E> {
328    fn drop(&mut self) {
329        if self.changed {
330            // Drop the guard first to release the lock before broadcasting
331            drop(self.guard.take());
332            self.resultset.0.broadcast.send(());
333        }
334    }
335}
336
337impl<'a, E: AbstractEntity> ResultSetRead<'a, E> {
338    /// Check if an entity exists
339    pub fn contains(&self, id: &proto::EntityId) -> bool { self.guard.index.contains_key(id) }
340
341    /// Iterate over all entities
342    /// Returns an iterator over (entity_id, entity) pairs
343    pub fn iter_entities(&self) -> impl Iterator<Item = (proto::EntityId, &E)> {
344        self.guard.order.iter().map(|entity| (*entity.entity.id(), &entity.entity))
345    }
346
347    /// Get the number of entities
348    pub fn len(&self) -> usize { self.guard.order.len() }
349
350    /// Check if the result set is empty
351    pub fn is_empty(&self) -> bool { self.guard.order.is_empty() }
352}
353
354impl<E: AbstractEntity> EntityResultSet<E> {
355    pub fn from_vec(entities: Vec<E>, loaded: bool) -> Self {
356        let mut index = HashMap::new();
357        let mut order = Vec::new();
358
359        for (i, entity) in entities.into_iter().enumerate() {
360            index.insert(*entity.id(), i);
361            order.push(EntityEntry { entity, sort_key: None, dirty: false });
362        }
363
364        let state = State { order, index, key_spec: None, limit: None, gap_dirty: false };
365        Self(Arc::new(Inner { state: std::sync::Mutex::new(state), loaded: AtomicBool::new(loaded), broadcast: Broadcast::new() }))
366    }
367    pub fn empty() -> Self {
368        let state = State { order: Vec::new(), index: HashMap::new(), key_spec: None, limit: None, gap_dirty: false };
369        Self(Arc::new(Inner { state: std::sync::Mutex::new(state), loaded: AtomicBool::new(false), broadcast: Broadcast::new() }))
370    }
371    pub fn single(entity: E) -> Self {
372        let entry = EntityEntry { entity: entity.clone(), sort_key: None, dirty: false };
373        let mut state = State { order: vec![entry], index: HashMap::new(), key_spec: None, limit: None, gap_dirty: false };
374        state.index.insert(*entity.id(), 0);
375        Self(Arc::new(Inner { state: std::sync::Mutex::new(state), loaded: AtomicBool::new(false), broadcast: Broadcast::new() }))
376    }
377
378    /// Begin a write operation for atomic changes to the resultset
379    /// All mutations happen through the returned write guard
380    /// A single notification is sent when the guard is dropped (if changes were made)
381    pub fn write(&self) -> ResultSetWrite<'_, E> {
382        let guard = self.0.state.lock().unwrap();
383        ResultSetWrite { resultset: self, changed: false, guard: Some(guard) }
384    }
385
386    /// Get a read guard for consistent read-only access to the resultset
387    pub fn read(&self) -> ResultSetRead<'_, E> {
388        let guard = self.0.state.lock().unwrap();
389        ResultSetRead { guard }
390    }
391    pub fn set_loaded(&self, loaded: bool) {
392        self.0.loaded.store(loaded, Ordering::Relaxed);
393        self.0.broadcast.send(());
394    }
395    pub fn is_loaded(&self) -> bool {
396        CurrentObserver::track(&self);
397        self.0.loaded.load(Ordering::Relaxed)
398    }
399
400    pub fn clear(&self) {
401        let mut st = self.0.state.lock().unwrap();
402        st.order.clear();
403        st.index.clear();
404        drop(st);
405        self.0.broadcast.send(());
406    }
407
408    /// Get an iterator over entity IDs without cloning entities
409    pub fn keys(&self) -> EntityResultSetKeyIterator {
410        // TODO make a signal trait for tracked keys
411        CurrentObserver::track(&self);
412        let st = self.0.state.lock().unwrap();
413        let keys: Vec<proto::EntityId> = st.order.iter().map(|e| *e.entity.id()).collect();
414        EntityResultSetKeyIterator::new(keys)
415    }
416
417    /// Check if an entity with the given ID exists
418    pub fn contains_key(&self, id: &proto::EntityId) -> bool {
419        // TODO make a signal trait for tracked contains_key
420        CurrentObserver::track(&self);
421        let st = self.0.state.lock().unwrap();
422        st.index.contains_key(id)
423    }
424
425    pub fn by_id(&self, id: &proto::EntityId) -> Option<E> {
426        // TODO make a signal trait for tracked by_id
427        CurrentObserver::track(self);
428        let st = self.0.state.lock().unwrap();
429        st.index.get(id).map(|&i| st.order[i].entity.clone())
430    }
431
432    pub fn len(&self) -> usize {
433        CurrentObserver::track(&self);
434        let st = self.0.state.lock().unwrap();
435        st.order.len()
436    }
437
438    /// Check if this result set needs gap filling
439    pub(crate) fn is_gap_dirty(&self) -> bool {
440        let st = self.0.state.lock().unwrap();
441        st.gap_dirty
442    }
443
444    /// Clear the gap_dirty flag (called after gap filling is complete)
445    pub(crate) fn clear_gap_dirty(&self) {
446        let mut st = self.0.state.lock().unwrap();
447        st.gap_dirty = false;
448    }
449
450    /// Get the current limit for this result set
451    pub fn get_limit(&self) -> Option<usize> {
452        let st = self.0.state.lock().unwrap();
453        st.limit
454    }
455
456    /// Get the last entity for gap filling continuation
457    pub(crate) fn last_entity(&self) -> Option<E> {
458        let st = self.0.state.lock().unwrap();
459        st.order.last().map(|entry| entry.entity.clone())
460    }
461
462    /// Configure ordering for this result set
463    pub(crate) fn order_by(&self, key_spec: Option<KeySpec>) {
464        let mut st = self.0.state.lock().unwrap();
465
466        // Check if the key spec actually changed
467        if st.key_spec == key_spec {
468            return; // No change, no-op
469        }
470
471        st.key_spec = key_spec.clone();
472
473        // Recompute sort keys for all entries
474        for entry in &mut st.order {
475            entry.sort_key = if let Some(ref ks) = key_spec {
476                Some(ResultSetWrite::compute_sort_key(&entry.entity, ks))
477            } else {
478                None // No ORDER BY, sort by entity ID only
479            };
480        }
481
482        // Sort by the new keys
483        st.order.sort_by(|a, b| {
484            match (&a.sort_key, &b.sort_key) {
485                (Some(key_a), Some(key_b)) => {
486                    // First compare by sort key
487                    match key_a.cmp(key_b) {
488                        std::cmp::Ordering::Equal => a.entity.id().cmp(b.entity.id()), // Tie-break by entity ID
489                        other => other,
490                    }
491                }
492                (Some(_), None) => std::cmp::Ordering::Greater,
493                (None, Some(_)) => std::cmp::Ordering::Less,
494                (None, None) => a.entity.id().cmp(b.entity.id()),
495            }
496        });
497
498        // Rebuild index after sorting
499        st.index.clear();
500        let index_updates: Vec<_> = st.order.iter().enumerate().map(|(i, entry)| (*entry.entity.id(), i)).collect();
501        for (id, i) in index_updates {
502            st.index.insert(id, i);
503        }
504
505        drop(st);
506        self.0.broadcast.send(());
507    }
508
509    /// Set the limit for this result set
510    pub(crate) fn limit(&self, limit: Option<usize>) {
511        let mut st = self.0.state.lock().unwrap();
512
513        // Check if the limit actually changed
514        if st.limit == limit {
515            return; // No change, no-op
516        }
517
518        st.limit = limit;
519
520        // Apply the new limit by truncating if necessary
521        let mut entities_removed = false;
522        if let Some(limit) = limit {
523            if st.order.len() > limit {
524                st.order.truncate(limit);
525                entities_removed = true;
526
527                // Rebuild index after truncation
528                st.index.clear();
529                let index_updates: Vec<_> = st.order.iter().enumerate().map(|(i, entry)| (*entry.entity.id(), i)).collect();
530                for (id, i) in index_updates {
531                    st.index.insert(id, i);
532                }
533            }
534        }
535
536        drop(st);
537
538        // Only broadcast if entities were actually removed
539        if entities_removed {
540            self.0.broadcast.send(());
541        }
542    }
543}
544
545#[cfg(test)]
546mod tests {
547    use super::*;
548    use crate::indexing::{IndexDirection, IndexKeyPart, KeySpec, NullsOrder};
549    use crate::value::{Value, ValueType};
550    use ankurah_proto as proto;
551    use std::collections::HashMap;
552
553    #[derive(Debug, Clone)]
554    struct TestEntity {
555        id: proto::EntityId,
556        collection: proto::CollectionId,
557        properties: HashMap<String, Value>,
558    }
559
560    impl TestEntity {
561        fn new(id: u8, properties: HashMap<String, Value>) -> Self {
562            let mut id_bytes = [0u8; 16];
563            id_bytes[15] = id;
564            Self { id: proto::EntityId::from_bytes(id_bytes), collection: proto::CollectionId::fixed_name("test"), properties }
565        }
566    }
567
568    impl AbstractEntity for TestEntity {
569        fn collection(&self) -> proto::CollectionId { self.collection.clone() }
570
571        fn id(&self) -> &proto::EntityId { &self.id }
572
573        fn value(&self, field: &str) -> Option<Value> {
574            if field == "id" {
575                Some(Value::EntityId(self.id.clone()))
576            } else {
577                self.properties.get(field).cloned()
578            }
579        }
580    }
581
582    #[test]
583    fn test_entity_id_ordering() {
584        let resultset = EntityResultSet::empty();
585        let mut write = resultset.write();
586
587        // Create entities with different IDs (bytes sort chronologically)
588        let entity1 = TestEntity::new(1, HashMap::new());
589        let entity2 = TestEntity::new(2, HashMap::new());
590        let entity3 = TestEntity::new(3, HashMap::new());
591
592        // Add in reverse order
593        write.add(entity3.clone());
594        write.add(entity1.clone());
595        write.add(entity2.clone());
596
597        drop(write);
598
599        // Should be sorted by entity ID
600        let read_guard = resultset.read();
601        let entities: Vec<_> = read_guard.iter_entities().collect();
602        assert_eq!(entities.len(), 3);
603        assert_eq!(entities[0].0, entity1.id);
604        assert_eq!(entities[1].0, entity2.id);
605        assert_eq!(entities[2].0, entity3.id);
606    }
607
608    #[test]
609    fn test_order_by_with_tie_breaking() {
610        let resultset = EntityResultSet::empty();
611
612        // Create entities with same name but different IDs
613        let mut props1 = HashMap::new();
614        props1.insert("name".to_string(), Value::String("Alice".to_string()));
615        let entity1 = TestEntity::new(1, props1);
616
617        let mut props2 = HashMap::new();
618        props2.insert("name".to_string(), Value::String("Alice".to_string()));
619        let entity2 = TestEntity::new(2, props2);
620
621        let mut props3 = HashMap::new();
622        props3.insert("name".to_string(), Value::String("Bob".to_string()));
623        let entity3 = TestEntity::new(3, props3);
624
625        // Set up ordering by name
626        let key_spec = KeySpec {
627            keyparts: vec![IndexKeyPart {
628                column: "name".to_string(),
629                direction: IndexDirection::Asc,
630                nulls: Some(NullsOrder::Last),
631                collation: None,
632                value_type: ValueType::String,
633            }],
634        };
635        resultset.order_by(Some(key_spec));
636
637        let mut write = resultset.write();
638        write.add(entity2.clone());
639        write.add(entity3.clone());
640        write.add(entity1.clone());
641        drop(write);
642
643        // Should be sorted by name, then by entity ID for tie-breaking
644        let read_guard = resultset.read();
645        let entities: Vec<_> = read_guard.iter_entities().collect();
646        assert_eq!(entities.len(), 3);
647        // Both Alice entities should come first (sorted by ID), then Bob
648        assert_eq!(entities[0].0, entity1.id); // Alice (earlier ID)
649        assert_eq!(entities[1].0, entity2.id); // Alice (later ID)
650        assert_eq!(entities[2].0, entity3.id); // Bob
651    }
652
653    #[test]
654    fn test_limit_functionality() {
655        let resultset = EntityResultSet::empty();
656
657        // Add some entities
658        let mut write = resultset.write();
659        for i in 0..5u8 {
660            let mut props = HashMap::new();
661            props.insert("value".to_string(), Value::I32(i as i32));
662            let entity = TestEntity::new(i, props);
663            write.add(entity);
664        }
665        drop(write);
666
667        assert_eq!(resultset.len(), 5);
668
669        // Apply limit
670        resultset.limit(Some(3));
671        assert_eq!(resultset.len(), 3);
672
673        // Remove limit
674        resultset.limit(None);
675        assert_eq!(resultset.len(), 3); // Should stay truncated
676    }
677
678    #[test]
679    fn test_dirty_tracking() {
680        let resultset = EntityResultSet::empty();
681
682        let mut props = HashMap::new();
683        props.insert("active".to_string(), Value::Bool(true));
684        let entity1 = TestEntity::new(1, props);
685
686        let mut props = HashMap::new();
687        props.insert("active".to_string(), Value::Bool(false));
688        let entity2 = TestEntity::new(2, props);
689
690        let mut write = resultset.write();
691        write.add(entity1.clone());
692        write.add(entity2.clone());
693
694        // Mark all dirty
695        write.mark_all_dirty();
696
697        // Retain only active entities
698        let removed = write.retain_dirty(|entity| entity.value("active") == Some(Value::Bool(true)));
699
700        drop(write);
701
702        assert_eq!(removed.len(), 1);
703        assert_eq!(removed[0], entity2.id);
704        assert_eq!(resultset.len(), 1);
705        assert_eq!(resultset.read().iter_entities().next().unwrap().0, entity1.id);
706    }
707
708    #[test]
709    fn test_write_guard_atomic_operations() {
710        let resultset = EntityResultSet::empty();
711
712        // Multiple operations in one write guard should be atomic
713        {
714            let mut write = resultset.write();
715            let entity1 = TestEntity::new(1, HashMap::new());
716            let entity2 = TestEntity::new(2, HashMap::new());
717
718            write.add(entity1);
719            write.add(entity2);
720
721            // Operations are visible within the same write guard
722            assert_eq!(write.iter_entities().count(), 2);
723            // Notification sent when write guard is dropped
724        }
725
726        // Operations should be visible after guard is dropped
727        assert_eq!(resultset.len(), 2);
728    }
729
730    #[test]
731    fn test_ivec_small_keys() {
732        // Test that small keys (<=16 bytes) use the Small variant
733        let small_key = IVec::from_slice(b"hello");
734        let another_small = IVec::from_slice(b"world");
735        let empty_key = IVec::from_slice(b"");
736
737        // Verify ordering works correctly with zero padding
738        assert!(small_key < another_small); // "hello" < "world"
739        assert!(empty_key < small_key); // empty sorts first
740
741        // Test that zero padding doesn't affect comparison
742        let key_ab = IVec::from_slice(b"ab");
743        let key_abc = IVec::from_slice(b"abc");
744        assert!(key_ab < key_abc); // "ab" < "abc" even with zero padding
745    }
746
747    #[test]
748    fn test_ivec_large_keys() {
749        // Test that large keys (>16 bytes) use the Large variant
750        let large_key = IVec::from_slice(&[1u8; 20]); // 20 bytes
751        let small_key = IVec::from_slice(&[1u8; 10]); // 10 bytes
752
753        // Verify they can be compared
754        assert!(small_key < large_key); // smaller array should sort first
755    }
756
757    #[test]
758    fn test_ivec_boundary() {
759        // Test the 16-byte boundary
760        let exactly_16 = IVec::from_slice(&[1u8; 16]);
761        let exactly_17 = IVec::from_slice(&[1u8; 17]);
762
763        // Both should work and be comparable
764        assert!(exactly_16 < exactly_17);
765
766        // Verify 16-byte keys use Small variant (this is implicit in the implementation)
767        match exactly_16 {
768            IVec::Small(_) => (), // Expected
769            IVec::Large(_) => panic!("16-byte key should use Small variant"),
770        }
771
772        match exactly_17 {
773            IVec::Large(_) => (), // Expected
774            IVec::Small(_) => panic!("17-byte key should use Large variant"),
775        }
776    }
777}
778
779fn fix_from<E: AbstractEntity>(st: &mut State<E>, start: usize) {
780    // Recompute indices for shifted tail
781    for i in start..st.order.len() {
782        let id = *st.order[i].entity.id();
783        st.index.insert(id, i);
784    }
785}
786
787impl<E: View> ResultSet<E> {
788    pub fn iter(&self) -> ResultSetIter<E> { ResultSetIter::new(self.clone()) }
789}
790
791impl<E: View> Clone for ResultSet<E> {
792    fn clone(&self) -> Self { Self(self.0.clone(), std::marker::PhantomData) }
793}
794
795impl<E: View> Default for ResultSet<E> {
796    fn default() -> Self {
797        let entity_resultset = EntityResultSet::empty();
798        Self(entity_resultset, std::marker::PhantomData)
799    }
800}
801
802impl<E: AbstractEntity> Signal for EntityResultSet<E> {
803    fn listen(&self, listener: Listener) -> ListenerGuard { ListenerGuard::new(self.0.broadcast.reference().listen(listener)) }
804    fn broadcast_id(&self) -> BroadcastId { self.0.broadcast.id() }
805}
806
807impl<R: View> Signal for ResultSet<R> {
808    fn listen(&self, listener: Listener) -> ListenerGuard { ListenerGuard::new(self.0 .0.broadcast.reference().listen(listener)) }
809
810    fn broadcast_id(&self) -> BroadcastId { self.0 .0.broadcast.id() }
811}
812
813impl<E: View + Clone + 'static> Get<Vec<E>> for ResultSet<E> {
814    fn get(&self) -> Vec<E> {
815        use ankurah_signals::CurrentObserver;
816        CurrentObserver::track(self);
817        self.0 .0.state.lock().unwrap().order.iter().map(|e| E::from_entity(e.entity.clone())).collect()
818    }
819}
820
821impl<E: View + Clone + 'static> Peek<Vec<E>> for ResultSet<E> {
822    fn peek(&self) -> Vec<E> { self.0 .0.state.lock().unwrap().order.iter().map(|e| E::from_entity(e.entity.clone())).collect() }
823}
824
825impl<E: View + Clone + 'static> Subscribe<Vec<E>> for ResultSet<E> {
826    fn subscribe<F>(&self, listener: F) -> SubscriptionGuard
827    where F: IntoSubscribeListener<Vec<E>> {
828        let listener = listener.into_subscribe_listener();
829        let me = self.clone();
830        let guard: ankurah_signals::broadcast::ListenerGuard<()> = self.0 .0.broadcast.reference().listen(move |_| {
831            let entities: Vec<E> = me.0 .0.state.lock().unwrap().order.iter().map(|e| E::from_entity(e.entity.clone())).collect();
832            listener(entities);
833        });
834        SubscriptionGuard::new(ListenerGuard::new(guard))
835    }
836}
837
838#[derive(Debug)]
839pub struct ResultSetIter<E: View> {
840    resultset: ResultSet<E>,
841    index: usize,
842}
843
844impl<E: View> ResultSetIter<E> {
845    fn new(resultset: ResultSet<E>) -> Self { Self { resultset, index: 0 } }
846}
847
848impl<E: View + Clone> Iterator for ResultSetIter<E> {
849    type Item = E;
850
851    fn next(&mut self) -> Option<Self::Item> {
852        // Track the underlying resultset using the CurrentObserver when iterating
853        use ankurah_signals::CurrentObserver;
854        CurrentObserver::track(&self.resultset);
855
856        let state = self.resultset.0 .0.state.lock().unwrap();
857        if self.index < state.order.len() {
858            let entity = &state.order[self.index].entity;
859            let view = E::from_entity(entity.clone());
860            self.index += 1;
861            Some(view)
862        } else {
863            None
864        }
865    }
866}
867
868impl<E: View + Clone> IntoIterator for ResultSet<E> {
869    type Item = E;
870    type IntoIter = ResultSetIter<E>;
871
872    fn into_iter(self) -> Self::IntoIter { ResultSetIter::new(self) }
873}
874
875impl<E: View + Clone> IntoIterator for &ResultSet<E> {
876    type Item = E;
877    type IntoIter = ResultSetIter<E>;
878
879    fn into_iter(self) -> Self::IntoIter { ResultSetIter::new(self.clone()) }
880}
881
882#[derive(Debug)]
883pub struct EntityResultSetKeyIterator {
884    keys: Vec<proto::EntityId>,
885    index: usize,
886}
887
888impl EntityResultSetKeyIterator {
889    fn new(keys: Vec<proto::EntityId>) -> Self { Self { keys, index: 0 } }
890}
891
892impl Iterator for EntityResultSetKeyIterator {
893    type Item = proto::EntityId;
894
895    fn next(&mut self) -> Option<Self::Item> {
896        if self.index < self.keys.len() {
897            let key = self.keys[self.index];
898            self.index += 1;
899            Some(key)
900        } else {
901            None
902        }
903    }
904}
905
906// Specific implementation for EntityResultSet<Entity> to provide map method
907impl EntityResultSet<Entity> {
908    pub fn wrap<R: View>(&self) -> ResultSet<R> { ResultSet(self.clone(), std::marker::PhantomData) }
909}