Skip to main content

ankurah_core/
resultset.rs

1use crate::indexing::{encode_tuple_values_with_key_spec, KeySpec};
2use crate::{entity::Entity, model::View, reactor::AbstractEntity};
3use ankurah_proto as proto;
4use ankurah_signals::{
5    broadcast::{Broadcast, BroadcastId},
6    signal::{Listener, ListenerGuard},
7    subscribe::IntoSubscribeListener,
8    CurrentObserver, Get, Peek, Signal, Subscribe, SubscriptionGuard,
9};
10use std::{
11    collections::HashMap,
12    ops::Deref,
13    sync::{
14        atomic::{AtomicBool, Ordering},
15        Arc,
16    },
17};
18
/// Efficient storage for sort keys - uses a fixed array for small keys, `Vec` for larger ones.
///
/// Ordering is lexicographic over the stored bytes regardless of which variant holds
/// them, so a long key that starts with a small byte sorts before a short key that
/// starts with a larger byte. (A derived `Ord` would instead place every `Small` before
/// every `Large`, because derived enum ordering compares the variant first.)
///
/// Note: `Small` is zero-padded and does not record the original length, so two short
/// keys that differ only in trailing zero bytes are indistinguishable.
#[derive(Debug, Clone, PartialEq, Eq)]
enum IVec {
    /// Keys <= 16 bytes stored in zero-padded fixed array
    Small([u8; 16]),
    /// Keys > 16 bytes stored in Vec
    Large(Vec<u8>),
}

impl IVec {
    /// Create from byte slice, choosing the inline variant when the key fits in 16 bytes.
    fn from_slice(bytes: &[u8]) -> Self {
        if bytes.len() <= 16 {
            let mut data = [0u8; 16];
            data[..bytes.len()].copy_from_slice(bytes);
            Self::Small(data)
        } else {
            Self::Large(bytes.to_vec())
        }
    }

    /// The stored bytes: the full zero-padded array for `Small`, the vector contents for `Large`.
    fn as_bytes(&self) -> &[u8] {
        match self {
            Self::Small(data) => &data[..],
            Self::Large(data) => &data[..],
        }
    }

    /// Whether this key is held in the heap-allocated variant.
    fn is_large(&self) -> bool { matches!(self, Self::Large(_)) }
}

impl Ord for IVec {
    /// Compares keys byte-wise; the variant is only consulted to break an exact byte tie
    /// so that the result stays consistent with the derived `Eq`.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.as_bytes().cmp(other.as_bytes()).then_with(|| self.is_large().cmp(&other.is_large()))
    }
}

impl PartialOrd for IVec {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { Some(self.cmp(other)) }
}

impl From<Vec<u8>> for IVec {
    fn from(vec: Vec<u8>) -> Self { Self::from_slice(&vec) }
}
44
/// Handle to a set of entities, generic over the entity representation `E`.
///
/// The state lives behind an `Arc`, so cloning the handle yields another reference to
/// the same underlying set rather than a copy of its contents.
#[derive(Debug, Clone)]
pub struct EntityResultSet<E: AbstractEntity = Entity>(Arc<Inner<E>>);
47
/// View-typed ResultSet
///
/// Wraps an `EntityResultSet<Entity>` together with a `PhantomData<R>` marker naming
/// the view type that entities are converted into when read.
#[derive(Debug)]
pub struct ResultSet<R: View>(EntityResultSet<Entity>, std::marker::PhantomData<R>);
51
52impl<R: View> Deref for ResultSet<R> {
53    type Target = EntityResultSet<Entity>;
54    fn deref(&self) -> &Self::Target { &self.0 }
55}
56
57impl<R: View> ResultSet<R> {
58    pub fn by_id(&self, id: &proto::EntityId) -> Option<R> { self.0.by_id(id).map(|e| R::from_entity(e)) }
59}
60
/// Shared state behind an `EntityResultSet` handle.
#[derive(Debug)]
struct Inner<E: AbstractEntity> {
    // Order preserving set of entities
    state: std::sync::Mutex<State<E>>,
    // Flag read by `is_loaded` and written by the `set_loaded` methods
    loaded: AtomicBool,
    // Notification channel; a unit message is sent after changes to the set
    broadcast: Broadcast<()>,
}
68
/// Mutex-protected contents of a result set.
#[derive(Debug)]
struct State<E: AbstractEntity> {
    // Entries in their current sorted order
    order: Vec<EntityEntry<E>>,
    // Maps each entity id to its position in `order`
    index: HashMap<proto::EntityId, usize>,
    // Ordering configuration
    key_spec: Option<KeySpec>,
    // Maximum number of entries to keep, if any
    limit: Option<usize>,
    gap_dirty: bool, // Set when we remove entities and go from =LIMIT to < LIMIT
}
78
/// One member of a result set together with its ordering metadata.
#[derive(Debug, Clone)]
struct EntityEntry<E: AbstractEntity> {
    entity: E,
    // Encoded sort key; `None` when the entry was created without a key spec
    sort_key: Option<IVec>,
    // Set by `mark_all_dirty`, cleared by `retain_dirty` for entries that are kept
    dirty: bool,
}
85// TODO - figure out how to maintain ordering of entities
86
/// A write guard for making atomic changes to a ResultSet
/// Holds the mutex guard to ensure all changes happen atomically
/// Sends a single notification when dropped (if any changes were made)
pub struct ResultSetWrite<'a, E: AbstractEntity = Entity> {
    // The result set being modified; used to reach the broadcast and loaded flag
    resultset: &'a EntityResultSet<E>,
    // Whether any mutation happened; decides if a notification is sent on drop
    changed: bool,
    // Held in an `Option` so the drop handler can release the lock before broadcasting
    guard: Option<std::sync::MutexGuard<'a, State<E>>>,
}
95
/// A read guard for read-only access to a ResultSet
/// Holds the mutex guard to ensure consistent reads
///
/// Note: this is the same exclusive mutex used for writes, so other readers and
/// writers block for as long as the guard is alive.
pub struct ResultSetRead<'a, E: AbstractEntity = Entity> {
    guard: std::sync::MutexGuard<'a, State<E>>,
}
101
102// TODO - build unit tests for this
103impl<'a, E: AbstractEntity> ResultSetWrite<'a, E> {
104    /// Add an entity to the result set
105    pub fn add(&mut self, entity: E) -> bool {
106        let guard = self.guard.as_mut().expect("write guard already dropped");
107        let id = *entity.id();
108        if guard.index.contains_key(&id) {
109            return false; // Already present
110        }
111
112        // Compute sort key if ordering is configured
113        let sort_key = guard.key_spec.as_ref().map(|key_spec| Self::compute_sort_key(&entity, key_spec));
114
115        let entry = EntityEntry { entity, sort_key, dirty: false };
116
117        // Insert in correct position (always sort by entity ID, with optional key spec first)
118        let pos = guard
119            .order
120            .binary_search_by(|existing| {
121                match (&existing.sort_key, &entry.sort_key) {
122                    (Some(existing_key), Some(entry_key)) => {
123                        // Both have sort keys - compare keys first, then entity ID for tie-breaking
124                        existing_key.cmp(entry_key).then_with(|| existing.entity.id().cmp(entry.entity.id()))
125                    }
126                    (Some(_), None) => std::cmp::Ordering::Less, // Keyed entries sort before unkeyed
127                    (None, Some(_)) => std::cmp::Ordering::Greater, // Unkeyed entries sort after keyed
128                    (None, None) => existing.entity.id().cmp(entry.entity.id()), // Both unkeyed - sort by entity ID
129                }
130            })
131            .unwrap_or_else(|pos| pos);
132
133        guard.order.insert(pos, entry);
134        guard.index.insert(id, pos);
135
136        // Fix indices for all entries after the insertion point
137        for i in (pos + 1)..guard.order.len() {
138            let entry_id = *guard.order[i].entity.id();
139            guard.index.insert(entry_id, i);
140        }
141
142        // Apply limit if configured
143        if let Some(limit) = guard.limit {
144            if guard.order.len() > limit {
145                // Remove the last entry (beyond limit)
146                if let Some(removed_entry) = guard.order.pop() {
147                    let removed_id = *removed_entry.entity.id();
148                    guard.index.remove(&removed_id);
149                    // TODO: Return the evicted entity ID for the caller to handle
150                }
151            }
152        }
153
154        self.changed = true;
155        true
156    }
157
158    /// Remove an entity from the result set
159    pub fn remove(&mut self, id: proto::EntityId) -> bool {
160        let guard = self.guard.as_mut().expect("write guard already dropped");
161        if let Some(idx) = guard.index.remove(&id) {
162            // Check if we were at limit before removal
163            if guard.limit.is_some_and(|limit| guard.order.len() == limit) {
164                guard.gap_dirty = true;
165            }
166
167            guard.order.remove(idx);
168            if idx < guard.order.len() {
169                fix_from(guard, idx);
170            }
171
172            self.changed = true;
173            true
174        } else {
175            false
176        }
177    }
178
179    /// Check if an entity exists
180    pub fn contains(&self, id: &proto::EntityId) -> bool {
181        self.guard.as_ref().expect("write guard already dropped").index.contains_key(id)
182    }
183
184    /// Iterate over all entities
185    /// Returns an iterator over (entity_id, entity) pairs
186    pub fn iter_entities(&self) -> impl Iterator<Item = (proto::EntityId, &E)> {
187        let guard = self.guard.as_ref().expect("write guard already dropped");
188        guard.order.iter().map(|entry| (*entry.entity.id(), &entry.entity))
189    }
190
191    /// Mark all entities as dirty for re-evaluation
192    pub fn mark_all_dirty(&mut self) {
193        let guard = self.guard.as_mut().expect("write guard already dropped");
194        for entry in &mut guard.order {
195            entry.dirty = true;
196        }
197        self.changed = true;
198    }
199
200    /// Retain only dirty entities that pass the closure, removing those that don't
201    pub fn retain_dirty<F>(&mut self, mut should_retain: F) -> Vec<proto::EntityId>
202    where F: FnMut(&E) -> bool {
203        let guard = self.guard.as_mut().expect("write guard already dropped");
204        let mut removed_ids = Vec::new();
205        let mut i = 0;
206
207        // Check if we were at limit before any removals
208        let was_at_limit = guard.limit.is_some_and(|limit| guard.order.len() == limit);
209
210        while i < guard.order.len() {
211            if guard.order[i].dirty {
212                let should_keep = should_retain(&guard.order[i].entity);
213                if should_keep {
214                    // Entity should be retained - recompute sort key and mark clean
215                    let key_spec = guard.key_spec.clone();
216                    if let Some(key_spec) = key_spec {
217                        guard.order[i].sort_key = Some(Self::compute_sort_key(&guard.order[i].entity, &key_spec));
218                    }
219                    guard.order[i].dirty = false;
220                    i += 1;
221                } else {
222                    // Entity should be removed
223                    let removed_entry = guard.order.remove(i);
224                    let removed_id = *removed_entry.entity.id();
225                    guard.index.remove(&removed_id);
226                    removed_ids.push(removed_id);
227                    // Don't increment i since we removed an element
228                }
229            } else {
230                i += 1;
231            }
232        }
233
234        // Fix indices after removals (no re-sorting needed)
235        guard.index.clear();
236        let index_updates: Vec<_> = guard.order.iter().enumerate().map(|(i, entry)| (*entry.entity.id(), i)).collect();
237        for (id, i) in index_updates {
238            guard.index.insert(id, i);
239        }
240
241        if !removed_ids.is_empty() {
242            self.changed = true;
243
244            // Set gap_dirty if we went from LIMIT to < LIMIT
245            if (!guard.gap_dirty) && was_at_limit && guard.limit.is_some_and(|limit| guard.order.len() < limit) {
246                guard.gap_dirty = true;
247            }
248        }
249
250        removed_ids
251    }
252
253    /// Replace all entities in the result set with proper sorting
254    pub fn replace_all(&mut self, entities: Vec<E>) {
255        let guard = self.guard.as_mut().expect("write guard already dropped");
256
257        // Clear existing data
258        guard.order.clear();
259        guard.index.clear();
260
261        // Add all entities with proper sorting
262        for entity in entities {
263            // Compute sort key if ordering is configured
264            let sort_key = guard.key_spec.as_ref().map(|key_spec| Self::compute_sort_key(&entity, key_spec));
265
266            let entry = EntityEntry { entity, sort_key, dirty: false };
267            guard.order.push(entry);
268        }
269
270        // Sort all entries if we have ordering configured
271        if guard.key_spec.is_some() {
272            guard.order.sort_by(|a, b| {
273                match (&a.sort_key, &b.sort_key) {
274                    (Some(key_a), Some(key_b)) => {
275                        // Compare keys first, then entity ID for tie-breaking
276                        key_a.cmp(key_b).then_with(|| a.entity.id().cmp(b.entity.id()))
277                    }
278                    (Some(_), None) => std::cmp::Ordering::Less,
279                    (None, Some(_)) => std::cmp::Ordering::Greater,
280                    (None, None) => a.entity.id().cmp(b.entity.id()),
281                }
282            });
283        } else {
284            // Sort by entity ID only if no key spec
285            guard.order.sort_by(|a, b| a.entity.id().cmp(b.entity.id()));
286        }
287
288        // Apply limit if configured
289        if let Some(limit) = guard.limit {
290            if guard.order.len() > limit {
291                guard.order.truncate(limit);
292            }
293        }
294
295        // Rebuild index
296        let index_updates: Vec<_> = guard.order.iter().enumerate().map(|(i, entry)| (*entry.entity.id(), i)).collect();
297        for (id, i) in index_updates {
298            guard.index.insert(id, i);
299        }
300
301        self.changed = true;
302    }
303
304    /// Compute sort key for an entity using the current key spec
305    fn compute_sort_key(entity: &E, key_spec: &KeySpec) -> IVec {
306        let mut values = Vec::new();
307
308        // Extract values for each key part
309        for keypart in &key_spec.keyparts {
310            let value = AbstractEntity::value(entity, &keypart.column);
311            // TODO: Handle NULLs properly - for now we'll get encoding errors on NULLs
312            // which will cause unwrap_or_default() to return empty key (sorts first)
313            if let Some(v) = value {
314                values.push(v);
315            } else {
316                // Skip this entity for now if any field is NULL
317                return IVec::from_slice(&[]); // Empty key sorts first
318            }
319        }
320
321        // Encode the tuple - if this fails, return empty key (will sort first)
322        let encoded = encode_tuple_values_with_key_spec(&values, key_spec).unwrap_or_default();
323        IVec::from(encoded)
324    }
325
326    /// Set the loaded flag as part of this write transaction.
327    /// The flag is set while the lock is held, ensuring subscribers see
328    /// consistent state (both content and loaded flag) when notified.
329    pub fn set_loaded(&mut self, loaded: bool) {
330        self.resultset.0.loaded.store(loaded, std::sync::atomic::Ordering::Relaxed);
331        self.changed = true; // Ensure we broadcast on drop
332    }
333}
334
335impl<'a, E: AbstractEntity> Drop for ResultSetWrite<'a, E> {
336    fn drop(&mut self) {
337        if self.changed {
338            // Drop the guard first to release the lock before broadcasting
339            drop(self.guard.take());
340            self.resultset.0.broadcast.send(());
341        }
342    }
343}
344
345impl<'a, E: AbstractEntity> ResultSetRead<'a, E> {
346    /// Check if an entity exists
347    pub fn contains(&self, id: &proto::EntityId) -> bool { self.guard.index.contains_key(id) }
348
349    /// Iterate over all entities
350    /// Returns an iterator over (entity_id, entity) pairs
351    pub fn iter_entities(&self) -> impl Iterator<Item = (proto::EntityId, &E)> {
352        self.guard.order.iter().map(|entity| (*entity.entity.id(), &entity.entity))
353    }
354
355    /// Get the number of entities
356    pub fn len(&self) -> usize { self.guard.order.len() }
357
358    /// Check if the result set is empty
359    pub fn is_empty(&self) -> bool { self.guard.order.is_empty() }
360}
361
362impl<E: AbstractEntity> EntityResultSet<E> {
363    pub fn from_vec(entities: Vec<E>, loaded: bool) -> Self {
364        let mut index = HashMap::new();
365        let mut order = Vec::new();
366
367        for (i, entity) in entities.into_iter().enumerate() {
368            index.insert(*entity.id(), i);
369            order.push(EntityEntry { entity, sort_key: None, dirty: false });
370        }
371
372        let state = State { order, index, key_spec: None, limit: None, gap_dirty: false };
373        Self(Arc::new(Inner { state: std::sync::Mutex::new(state), loaded: AtomicBool::new(loaded), broadcast: Broadcast::new() }))
374    }
375    pub fn empty() -> Self {
376        let state = State { order: Vec::new(), index: HashMap::new(), key_spec: None, limit: None, gap_dirty: false };
377        Self(Arc::new(Inner { state: std::sync::Mutex::new(state), loaded: AtomicBool::new(false), broadcast: Broadcast::new() }))
378    }
379    pub fn single(entity: E) -> Self {
380        let entry = EntityEntry { entity: entity.clone(), sort_key: None, dirty: false };
381        let mut state = State { order: vec![entry], index: HashMap::new(), key_spec: None, limit: None, gap_dirty: false };
382        state.index.insert(*entity.id(), 0);
383        Self(Arc::new(Inner { state: std::sync::Mutex::new(state), loaded: AtomicBool::new(false), broadcast: Broadcast::new() }))
384    }
385
386    /// Begin a write operation for atomic changes to the resultset
387    /// All mutations happen through the returned write guard
388    /// A single notification is sent when the guard is dropped (if changes were made)
389    pub fn write(&self) -> ResultSetWrite<'_, E> {
390        let guard = self.0.state.lock().unwrap();
391        ResultSetWrite { resultset: self, changed: false, guard: Some(guard) }
392    }
393
394    /// Get a read guard for consistent read-only access to the resultset
395    pub fn read(&self) -> ResultSetRead<'_, E> {
396        let guard = self.0.state.lock().unwrap();
397        ResultSetRead { guard }
398    }
399    pub fn set_loaded(&self, loaded: bool) {
400        self.0.loaded.store(loaded, Ordering::Relaxed);
401        self.0.broadcast.send(());
402    }
403    pub fn is_loaded(&self) -> bool {
404        CurrentObserver::track(&self);
405        self.0.loaded.load(Ordering::Relaxed)
406    }
407
408    pub fn clear(&self) {
409        let mut st = self.0.state.lock().unwrap();
410        st.order.clear();
411        st.index.clear();
412        drop(st);
413        self.0.broadcast.send(());
414    }
415
416    /// Get an iterator over entity IDs without cloning entities
417    pub fn keys(&self) -> EntityResultSetKeyIterator {
418        // TODO make a signal trait for tracked keys
419        CurrentObserver::track(&self);
420        let st = self.0.state.lock().unwrap();
421        let keys: Vec<proto::EntityId> = st.order.iter().map(|e| *e.entity.id()).collect();
422        EntityResultSetKeyIterator::new(keys)
423    }
424
425    /// Check if an entity with the given ID exists
426    pub fn contains_key(&self, id: &proto::EntityId) -> bool {
427        // TODO make a signal trait for tracked contains_key
428        CurrentObserver::track(&self);
429        let st = self.0.state.lock().unwrap();
430        st.index.contains_key(id)
431    }
432
433    pub fn by_id(&self, id: &proto::EntityId) -> Option<E> {
434        // TODO make a signal trait for tracked by_id
435        CurrentObserver::track(self);
436        let st = self.0.state.lock().unwrap();
437        st.index.get(id).map(|&i| st.order[i].entity.clone())
438    }
439
440    pub fn len(&self) -> usize {
441        CurrentObserver::track(&self);
442        let st = self.0.state.lock().unwrap();
443        st.order.len()
444    }
445
446    /// Check if this result set needs gap filling
447    pub(crate) fn is_gap_dirty(&self) -> bool {
448        let st = self.0.state.lock().unwrap();
449        st.gap_dirty
450    }
451
452    /// Clear the gap_dirty flag (called after gap filling is complete)
453    pub(crate) fn clear_gap_dirty(&self) {
454        let mut st = self.0.state.lock().unwrap();
455        st.gap_dirty = false;
456    }
457
458    /// Get the current limit for this result set
459    pub fn get_limit(&self) -> Option<usize> {
460        let st = self.0.state.lock().unwrap();
461        st.limit
462    }
463
464    /// Get the last entity for gap filling continuation
465    pub(crate) fn last_entity(&self) -> Option<E> {
466        let st = self.0.state.lock().unwrap();
467        st.order.last().map(|entry| entry.entity.clone())
468    }
469
470    /// Configure ordering for this result set
471    pub(crate) fn order_by(&self, key_spec: Option<KeySpec>) {
472        let mut st = self.0.state.lock().unwrap();
473
474        // Check if the key spec actually changed
475        if st.key_spec == key_spec {
476            return; // No change, no-op
477        }
478
479        st.key_spec = key_spec.clone();
480
481        // Recompute sort keys for all entries
482        for entry in &mut st.order {
483            entry.sort_key = if let Some(ref ks) = key_spec {
484                Some(ResultSetWrite::compute_sort_key(&entry.entity, ks))
485            } else {
486                None // No ORDER BY, sort by entity ID only
487            };
488        }
489
490        // Sort by the new keys
491        st.order.sort_by(|a, b| {
492            match (&a.sort_key, &b.sort_key) {
493                (Some(key_a), Some(key_b)) => {
494                    // First compare by sort key
495                    match key_a.cmp(key_b) {
496                        std::cmp::Ordering::Equal => a.entity.id().cmp(b.entity.id()), // Tie-break by entity ID
497                        other => other,
498                    }
499                }
500                (Some(_), None) => std::cmp::Ordering::Greater,
501                (None, Some(_)) => std::cmp::Ordering::Less,
502                (None, None) => a.entity.id().cmp(b.entity.id()),
503            }
504        });
505
506        // Rebuild index after sorting
507        st.index.clear();
508        let index_updates: Vec<_> = st.order.iter().enumerate().map(|(i, entry)| (*entry.entity.id(), i)).collect();
509        for (id, i) in index_updates {
510            st.index.insert(id, i);
511        }
512
513        drop(st);
514        self.0.broadcast.send(());
515    }
516
517    /// Set the limit for this result set
518    pub(crate) fn limit(&self, limit: Option<usize>) {
519        let mut st = self.0.state.lock().unwrap();
520
521        // Check if the limit actually changed
522        if st.limit == limit {
523            return; // No change, no-op
524        }
525
526        st.limit = limit;
527
528        // Apply the new limit by truncating if necessary
529        let mut entities_removed = false;
530        if let Some(limit) = limit {
531            if st.order.len() > limit {
532                st.order.truncate(limit);
533                entities_removed = true;
534
535                // Rebuild index after truncation
536                st.index.clear();
537                let index_updates: Vec<_> = st.order.iter().enumerate().map(|(i, entry)| (*entry.entity.id(), i)).collect();
538                for (id, i) in index_updates {
539                    st.index.insert(id, i);
540                }
541            }
542        }
543
544        drop(st);
545
546        // Only broadcast if entities were actually removed
547        if entities_removed {
548            self.0.broadcast.send(());
549        }
550    }
551}
552
553#[cfg(test)]
554mod tests {
555    use super::*;
556    use crate::indexing::{IndexDirection, IndexKeyPart, KeySpec, NullsOrder};
557    use crate::value::{Value, ValueType};
558    use ankurah_proto as proto;
559    use std::collections::HashMap;
560
    /// Minimal in-memory entity used to exercise the result set without the real `Entity`.
    #[derive(Debug, Clone)]
    struct TestEntity {
        id: proto::EntityId,
        collection: proto::CollectionId,
        // Field name -> value, served through `AbstractEntity::value`
        properties: HashMap<String, Value>,
    }
567
    impl TestEntity {
        /// Builds a test entity in the "test" collection whose 16-byte id is all zeros
        /// except for the last byte, which is set to `id`.
        fn new(id: u8, properties: HashMap<String, Value>) -> Self {
            let mut id_bytes = [0u8; 16];
            id_bytes[15] = id;
            Self { id: proto::EntityId::from_bytes(id_bytes), collection: proto::CollectionId::fixed_name("test"), properties }
        }
    }
575
576    impl AbstractEntity for TestEntity {
577        fn collection(&self) -> proto::CollectionId { self.collection.clone() }
578
579        fn id(&self) -> &proto::EntityId { &self.id }
580
581        fn value(&self, field: &str) -> Option<Value> {
582            if field == "id" {
583                Some(Value::EntityId(self.id.clone()))
584            } else {
585                self.properties.get(field).cloned()
586            }
587        }
588    }
589
    // Without a key spec, entities added in arbitrary order end up sorted by entity ID.
    #[test]
    fn test_entity_id_ordering() {
        let resultset = EntityResultSet::empty();
        let mut write = resultset.write();

        // Create entities with different IDs (bytes sort chronologically)
        let entity1 = TestEntity::new(1, HashMap::new());
        let entity2 = TestEntity::new(2, HashMap::new());
        let entity3 = TestEntity::new(3, HashMap::new());

        // Add in reverse order
        write.add(entity3.clone());
        write.add(entity1.clone());
        write.add(entity2.clone());

        // Release the write guard (and its lock) before taking the read guard
        drop(write);

        // Should be sorted by entity ID
        let read_guard = resultset.read();
        let entities: Vec<_> = read_guard.iter_entities().collect();
        assert_eq!(entities.len(), 3);
        assert_eq!(entities[0].0, entity1.id);
        assert_eq!(entities[1].0, entity2.id);
        assert_eq!(entities[2].0, entity3.id);
    }
615
    // With an ORDER BY on "name", entities with equal names are tie-broken by entity ID.
    #[test]
    fn test_order_by_with_tie_breaking() {
        let resultset = EntityResultSet::empty();

        // Create entities with same name but different IDs
        let mut props1 = HashMap::new();
        props1.insert("name".to_string(), Value::String("Alice".to_string()));
        let entity1 = TestEntity::new(1, props1);

        let mut props2 = HashMap::new();
        props2.insert("name".to_string(), Value::String("Alice".to_string()));
        let entity2 = TestEntity::new(2, props2);

        let mut props3 = HashMap::new();
        props3.insert("name".to_string(), Value::String("Bob".to_string()));
        let entity3 = TestEntity::new(3, props3);

        // Set up ordering by name
        let key_spec = KeySpec {
            keyparts: vec![IndexKeyPart {
                column: "name".to_string(),
                sub_path: None,
                direction: IndexDirection::Asc,
                nulls: Some(NullsOrder::Last),
                collation: None,
                value_type: ValueType::String,
            }],
        };
        resultset.order_by(Some(key_spec));

        // Add out of order so the sorted insertion is actually exercised
        let mut write = resultset.write();
        write.add(entity2.clone());
        write.add(entity3.clone());
        write.add(entity1.clone());
        drop(write);

        // Should be sorted by name, then by entity ID for tie-breaking
        let read_guard = resultset.read();
        let entities: Vec<_> = read_guard.iter_entities().collect();
        assert_eq!(entities.len(), 3);
        // Both Alice entities should come first (sorted by ID), then Bob
        assert_eq!(entities[0].0, entity1.id); // Alice (earlier ID)
        assert_eq!(entities[1].0, entity2.id); // Alice (later ID)
        assert_eq!(entities[2].0, entity3.id); // Bob
    }
661
    // Applying a limit truncates the set; lifting the limit does not bring entities back.
    #[test]
    fn test_limit_functionality() {
        let resultset = EntityResultSet::empty();

        // Add some entities
        let mut write = resultset.write();
        for i in 0..5u8 {
            let mut props = HashMap::new();
            props.insert("value".to_string(), Value::I32(i as i32));
            let entity = TestEntity::new(i, props);
            write.add(entity);
        }
        drop(write);

        assert_eq!(resultset.len(), 5);

        // Apply limit
        resultset.limit(Some(3));
        assert_eq!(resultset.len(), 3);

        // Remove limit
        resultset.limit(None);
        assert_eq!(resultset.len(), 3); // Should stay truncated
    }
686
    // mark_all_dirty followed by retain_dirty removes exactly the entities the predicate rejects.
    #[test]
    fn test_dirty_tracking() {
        let resultset = EntityResultSet::empty();

        let mut props = HashMap::new();
        props.insert("active".to_string(), Value::Bool(true));
        let entity1 = TestEntity::new(1, props);

        let mut props = HashMap::new();
        props.insert("active".to_string(), Value::Bool(false));
        let entity2 = TestEntity::new(2, props);

        let mut write = resultset.write();
        write.add(entity1.clone());
        write.add(entity2.clone());

        // Mark all dirty
        write.mark_all_dirty();

        // Retain only active entities
        let removed = write.retain_dirty(|entity| entity.value("active") == Some(Value::Bool(true)));

        drop(write);

        // Only the inactive entity was removed; the active one remains
        assert_eq!(removed.len(), 1);
        assert_eq!(removed[0], entity2.id);
        assert_eq!(resultset.len(), 1);
        assert_eq!(resultset.read().iter_entities().next().unwrap().0, entity1.id);
    }
716
    // Several mutations through one write guard are visible inside the guard and after it is dropped.
    #[test]
    fn test_write_guard_atomic_operations() {
        let resultset = EntityResultSet::empty();

        // Multiple operations in one write guard should be atomic
        {
            let mut write = resultset.write();
            let entity1 = TestEntity::new(1, HashMap::new());
            let entity2 = TestEntity::new(2, HashMap::new());

            write.add(entity1);
            write.add(entity2);

            // Operations are visible within the same write guard
            assert_eq!(write.iter_entities().count(), 2);
            // Notification sent when write guard is dropped
        }

        // Operations should be visible after guard is dropped
        assert_eq!(resultset.len(), 2);
    }
738
739    #[test]
740    fn test_ivec_small_keys() {
741        // Test that small keys (<=16 bytes) use the Small variant
742        let small_key = IVec::from_slice(b"hello");
743        let another_small = IVec::from_slice(b"world");
744        let empty_key = IVec::from_slice(b"");
745
746        // Verify ordering works correctly with zero padding
747        assert!(small_key < another_small); // "hello" < "world"
748        assert!(empty_key < small_key); // empty sorts first
749
750        // Test that zero padding doesn't affect comparison
751        let key_ab = IVec::from_slice(b"ab");
752        let key_abc = IVec::from_slice(b"abc");
753        assert!(key_ab < key_abc); // "ab" < "abc" even with zero padding
754    }
755
756    #[test]
757    fn test_ivec_large_keys() {
758        // Test that large keys (>16 bytes) use the Large variant
759        let large_key = IVec::from_slice(&[1u8; 20]); // 20 bytes
760        let small_key = IVec::from_slice(&[1u8; 10]); // 10 bytes
761
762        // Verify they can be compared
763        assert!(small_key < large_key); // smaller array should sort first
764    }
765
766    #[test]
767    fn test_ivec_boundary() {
768        // Test the 16-byte boundary
769        let exactly_16 = IVec::from_slice(&[1u8; 16]);
770        let exactly_17 = IVec::from_slice(&[1u8; 17]);
771
772        // Both should work and be comparable
773        assert!(exactly_16 < exactly_17);
774
775        // Verify 16-byte keys use Small variant (this is implicit in the implementation)
776        match exactly_16 {
777            IVec::Small(_) => (), // Expected
778            IVec::Large(_) => panic!("16-byte key should use Small variant"),
779        }
780
781        match exactly_17 {
782            IVec::Large(_) => (), // Expected
783            IVec::Small(_) => panic!("17-byte key should use Large variant"),
784        }
785    }
786}
787
788fn fix_from<E: AbstractEntity>(st: &mut State<E>, start: usize) {
789    // Recompute indices for shifted tail
790    for i in start..st.order.len() {
791        let id = *st.order[i].entity.id();
792        st.index.insert(id, i);
793    }
794}
795
796impl<E: View> ResultSet<E> {
797    pub fn iter(&self) -> ResultSetIter<E> { ResultSetIter::new(self.clone()) }
798}
799
800impl<E: View> Clone for ResultSet<E> {
801    fn clone(&self) -> Self { Self(self.0.clone(), std::marker::PhantomData) }
802}
803
804impl<E: View> Default for ResultSet<E> {
805    fn default() -> Self {
806        let entity_resultset = EntityResultSet::empty();
807        Self(entity_resultset, std::marker::PhantomData)
808    }
809}
810
811impl<E: AbstractEntity> Signal for EntityResultSet<E> {
812    fn listen(&self, listener: Listener) -> ListenerGuard { ListenerGuard::new(self.0.broadcast.reference().listen(listener)) }
813    fn broadcast_id(&self) -> BroadcastId { self.0.broadcast.id() }
814}
815
816impl<R: View> Signal for ResultSet<R> {
817    fn listen(&self, listener: Listener) -> ListenerGuard { ListenerGuard::new(self.0 .0.broadcast.reference().listen(listener)) }
818
819    fn broadcast_id(&self) -> BroadcastId { self.0 .0.broadcast.id() }
820}
821
822impl<E: View + Clone + 'static> Get<Vec<E>> for ResultSet<E> {
823    fn get(&self) -> Vec<E> {
824        use ankurah_signals::CurrentObserver;
825        CurrentObserver::track(self);
826        self.0 .0.state.lock().unwrap().order.iter().map(|e| E::from_entity(e.entity.clone())).collect()
827    }
828}
829
830impl<E: View + Clone + 'static> Peek<Vec<E>> for ResultSet<E> {
831    fn peek(&self) -> Vec<E> { self.0 .0.state.lock().unwrap().order.iter().map(|e| E::from_entity(e.entity.clone())).collect() }
832}
833
834impl<E: View + Clone + 'static> Subscribe<Vec<E>> for ResultSet<E> {
835    fn subscribe<F>(&self, listener: F) -> SubscriptionGuard
836    where F: IntoSubscribeListener<Vec<E>> {
837        let listener = listener.into_subscribe_listener();
838        let me = self.clone();
839        let guard: ankurah_signals::broadcast::ListenerGuard<()> = self.0 .0.broadcast.reference().listen(move |_| {
840            let entities: Vec<E> = me.0 .0.state.lock().unwrap().order.iter().map(|e| E::from_entity(e.entity.clone())).collect();
841            listener(entities);
842        });
843        SubscriptionGuard::new(ListenerGuard::new(guard))
844    }
845}
846
/// Iterator over the views of a `ResultSet`.
///
/// Holds its own handle to the result set plus a cursor; each step re-locks the set
/// and reads the entry at the cursor, so concurrent changes to the set between steps
/// are reflected in what is yielded.
#[derive(Debug)]
pub struct ResultSetIter<E: View> {
    resultset: ResultSet<E>,
    // Position of the next entry to yield
    index: usize,
}
852
853impl<E: View> ResultSetIter<E> {
854    fn new(resultset: ResultSet<E>) -> Self { Self { resultset, index: 0 } }
855}
856
857impl<E: View + Clone> Iterator for ResultSetIter<E> {
858    type Item = E;
859
860    fn next(&mut self) -> Option<Self::Item> {
861        // Track the underlying resultset using the CurrentObserver when iterating
862        use ankurah_signals::CurrentObserver;
863        CurrentObserver::track(&self.resultset);
864
865        let state = self.resultset.0 .0.state.lock().unwrap();
866        if self.index < state.order.len() {
867            let entity = &state.order[self.index].entity;
868            let view = E::from_entity(entity.clone());
869            self.index += 1;
870            Some(view)
871        } else {
872            None
873        }
874    }
875}
876
877impl<E: View + Clone> IntoIterator for ResultSet<E> {
878    type Item = E;
879    type IntoIter = ResultSetIter<E>;
880
881    fn into_iter(self) -> Self::IntoIter { ResultSetIter::new(self) }
882}
883
884impl<E: View + Clone> IntoIterator for &ResultSet<E> {
885    type Item = E;
886    type IntoIter = ResultSetIter<E>;
887
888    fn into_iter(self) -> Self::IntoIter { ResultSetIter::new(self.clone()) }
889}
890
/// Iterator over a snapshot of entity IDs.
///
/// Owns the list of ids it was created with, so it holds no lock on the result set.
#[derive(Debug)]
pub struct EntityResultSetKeyIterator {
    keys: Vec<proto::EntityId>,
    // Position of the next id to yield
    index: usize,
}
896
897impl EntityResultSetKeyIterator {
898    fn new(keys: Vec<proto::EntityId>) -> Self { Self { keys, index: 0 } }
899}
900
901impl Iterator for EntityResultSetKeyIterator {
902    type Item = proto::EntityId;
903
904    fn next(&mut self) -> Option<Self::Item> {
905        if self.index < self.keys.len() {
906            let key = self.keys[self.index];
907            self.index += 1;
908            Some(key)
909        } else {
910            None
911        }
912    }
913}
914
915// Specific implementation for EntityResultSet<Entity> to provide map method
916impl EntityResultSet<Entity> {
917    pub fn wrap<R: View>(&self) -> ResultSet<R> { ResultSet(self.clone(), std::marker::PhantomData) }
918}