Skip to main content

ankurah_core/reactor/
fetch_gap.rs

1use crate::{
2    context::NodeAndContext,
3    error::RetrievalError,
4    node::{MatchArgs, Node, NodeInner},
5    policy::PolicyAgent,
6    reactor::AbstractEntity,
7    storage::StorageEngine,
8    value::{Value, ValueType},
9};
10use ankurah_proto as proto;
11use async_trait::async_trait;
12use std::sync::{Arc, Weak};
13
14/// Trait for fetching entities to fill gaps when LIMIT causes entities to be evicted
15#[async_trait]
16pub trait GapFetcher<E: AbstractEntity>: Send + Sync + 'static {
17    /// Fetch entities to fill a gap in a limited result set
18    ///
19    /// # Arguments
20    /// * `collection_id` - The collection to fetch from
21    /// * `selection` - The original selection (predicate, order_by, limit)
22    /// * `last_entity` - The last entity in the current result set (used to build continuation predicate)
23    /// * `gap_size` - Number of entities needed to fill the gap
24    ///
25    /// # Returns
26    /// Vector of entities that match the selection and come after `last_entity` in sort order
27    async fn fetch_gap(
28        &self,
29        collection_id: &proto::CollectionId,
30        selection: &ankql::ast::Selection,
31        last_entity: Option<&E>,
32        gap_size: usize,
33    ) -> Result<Vec<E>, RetrievalError>;
34}
35
36/// Concrete implementation of GapFetcher using a WeakNode and typed ContextData
37#[derive(Clone)]
38pub struct QueryGapFetcher<SE, PA>
39where
40    SE: StorageEngine,
41    PA: PolicyAgent,
42{
43    weak_node: Weak<NodeInner<SE, PA>>,
44    cdata: PA::ContextData,
45}
46
47impl<SE, PA> QueryGapFetcher<SE, PA>
48where
49    SE: StorageEngine,
50    PA: PolicyAgent,
51{
52    pub fn new(node: &Node<SE, PA>, cdata: PA::ContextData) -> Self { Self { weak_node: Arc::downgrade(&node.0), cdata } }
53}
54
55#[async_trait]
56impl<SE, PA> GapFetcher<crate::entity::Entity> for QueryGapFetcher<SE, PA>
57where
58    SE: StorageEngine + 'static,
59    PA: PolicyAgent + 'static,
60{
61    async fn fetch_gap(
62        &self,
63        collection_id: &proto::CollectionId,
64        selection: &ankql::ast::Selection,
65        last_entity: Option<&crate::entity::Entity>,
66        gap_size: usize,
67    ) -> Result<Vec<crate::entity::Entity>, RetrievalError> {
68        // Try to upgrade the weak reference to the node
69        let node_inner = self
70            .weak_node
71            .upgrade()
72            .ok_or_else(|| RetrievalError::storage(std::io::Error::other("Node has been dropped, cannot fill gap")))?;
73
74        // Create a Node wrapper and NodeAndContext
75        let node = Node(node_inner);
76        let node_context = NodeAndContext { node, cdata: self.cdata.clone() };
77
78        // Build gap predicate if we have a last entity
79        let gap_selection = if let Some(last) = last_entity {
80            let gap_predicate = if let Some(ref order_by) = selection.order_by {
81                build_continuation_predicate(&selection.predicate, order_by, last)
82                    .map_err(|e| RetrievalError::storage(std::io::Error::other(e)))?
83            } else {
84                selection.predicate.clone()
85            };
86
87            ankql::ast::Selection { predicate: gap_predicate, order_by: selection.order_by.clone(), limit: Some(gap_size as u64) }
88        } else {
89            // No last entity, just use original selection with gap_size limit
90            ankql::ast::Selection {
91                predicate: selection.predicate.clone(),
92                order_by: selection.order_by.clone(),
93                limit: Some(gap_size as u64),
94            }
95        };
96
97        let match_args = MatchArgs { selection: gap_selection, cached: false };
98
99        node_context.fetch_entities(collection_id, match_args).await
100    }
101}
102
103/// Build a supplemental predicate to fetch entities after the last entity in sort order
104///
105/// For ORDER BY a ASC, b DESC with last entity having a=5, b=10:
106/// Returns: a >= 5 AND b <= 10 AND NOT (id = last_entity.id)
107pub fn build_continuation_predicate<E: AbstractEntity>(
108    original_predicate: &ankql::ast::Predicate,
109    order_by: &[ankql::ast::OrderByItem],
110    last_entity: &E,
111) -> Result<ankql::ast::Predicate, String> {
112    use ankql::ast::{ComparisonOperator, Expr, Literal, OrderDirection, PathExpr, Predicate};
113
114    let mut gap_conditions = Vec::new();
115
116    // Add original predicate
117    gap_conditions.push(original_predicate.clone());
118
119    // Add ORDER BY continuation conditions
120    for order_item in order_by {
121        let field_name = order_item.path.property();
122
123        // Get the field value from the last entity
124        if let Some(field_value) = last_entity.value(field_name) {
125            let literal = match field_value {
126                Value::String(s) => Literal::String(s),
127                Value::I16(i) => Literal::I16(i),
128                Value::I32(i) => Literal::I32(i),
129                Value::I64(i) => Literal::I64(i),
130                Value::F64(f) => Literal::F64(f),
131                Value::Bool(b) => Literal::Bool(b),
132                Value::EntityId(id) => Literal::EntityId(id.into()),
133                // Skip Object, Binary, and Json for now - they're not commonly used in ORDER BY
134                Value::Object(_) | Value::Binary(_) | Value::Json(_) => continue,
135            };
136
137            let operator = match order_item.direction {
138                OrderDirection::Asc => ComparisonOperator::GreaterThanOrEqual,
139                OrderDirection::Desc => ComparisonOperator::LessThanOrEqual,
140            };
141
142            let condition = Predicate::Comparison {
143                left: Box::new(Expr::Path(order_item.path.clone())),
144                operator,
145                right: Box::new(Expr::Literal(literal)),
146            };
147
148            gap_conditions.push(condition);
149        }
150    }
151
152    // Add entity ID exclusion to avoid fetching the last entity again
153    let id_exclusion = Predicate::Comparison {
154        left: Box::new(Expr::Path(PathExpr::simple("id"))),
155        operator: ComparisonOperator::NotEqual,
156        right: Box::new(Expr::Literal(Literal::EntityId((*last_entity.id()).into()))),
157    };
158    gap_conditions.push(id_exclusion);
159
160    // Combine all conditions with AND
161    let result =
162        gap_conditions.into_iter().reduce(|acc, condition| Predicate::And(Box::new(acc), Box::new(condition))).unwrap_or(Predicate::True);
163
164    Ok(result)
165}
166
167/// Infer ValueType from the first non-null value in a collection of entities
168pub fn infer_value_type_for_field<E: AbstractEntity>(entities: &[E], field_name: &str) -> ValueType {
169    for entity in entities {
170        if let Some(value) = entity.value(field_name) {
171            return ValueType::of(&value);
172        }
173    }
174
175    // TODO: Get type from system catalog instead of defaulting to String
176    ValueType::String
177}
178
#[cfg(test)]
mod tests {
    use super::*;
    use crate::value::Value;
    use ankql::ast::{OrderByItem, OrderDirection, PathExpr, Predicate};
    use ankurah_derive::selection;
    use ankurah_proto as proto;
    use maplit::hashmap;
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};

    /// Minimal in-memory `AbstractEntity` used to drive the predicate builders.
    #[derive(Debug, Clone)]
    struct TestEntity {
        id: proto::EntityId,
        collection: proto::CollectionId,
        data: Arc<Mutex<HashMap<String, Value>>>,
    }

    impl TestEntity {
        /// Create an entity in the "test" collection whose 16-byte id is all
        /// zeroes except for the final byte, which is set to `id`.
        fn new(id: u8, data: HashMap<String, Value>) -> Self {
            let id_bytes: [u8; 16] = std::array::from_fn(|i| if i == 15 { id } else { 0 });
            let collection = proto::CollectionId::fixed_name("test");
            Self { id: proto::EntityId::from_bytes(id_bytes), collection, data: Arc::new(Mutex::new(data)) }
        }
    }

    impl AbstractEntity for TestEntity {
        fn collection(&self) -> proto::CollectionId { self.collection.clone() }

        fn id(&self) -> &proto::EntityId { &self.id }

        fn value(&self, field: &str) -> Option<Value> {
            let fields = self.data.lock().unwrap();
            fields.get(field).cloned()
        }
    }

    /// Shorthand for an owned string `Value`.
    fn text(s: &str) -> Value { Value::String(s.to_string()) }

    #[test]
    fn test_build_gap_predicate_single_column_asc() {
        let entity = TestEntity::new(1, hashmap! { "name".to_string() => text("John") });
        let order_by = [OrderByItem { path: PathExpr::simple("name"), direction: OrderDirection::Asc }];

        let actual = build_continuation_predicate(&Predicate::True, &order_by, &entity).unwrap();
        let expected = selection!("true AND name >= 'John' AND id != {}", entity.id()).predicate;

        assert_eq!(actual, expected);
    }

    #[test]
    fn test_build_gap_predicate_multi_column() {
        let entity = TestEntity::new(
            2,
            hashmap! {
                "name".to_string() => text("John"),
                "age".to_string() => Value::I32(30),
            },
        );
        let order_by = [
            OrderByItem { path: PathExpr::simple("name"), direction: OrderDirection::Asc },
            OrderByItem { path: PathExpr::simple("age"), direction: OrderDirection::Desc },
        ];

        let actual = build_continuation_predicate(&Predicate::True, &order_by, &entity).unwrap();
        let expected = selection!("true AND name >= 'John' AND age <= 30 AND id != {}", entity.id()).predicate;

        assert_eq!(actual, expected);
    }

    #[test]
    fn test_infer_value_type_for_field() {
        let entities = [
            TestEntity::new(1, hashmap! { "name".to_string() => text("Alice") }),
            TestEntity::new(2, hashmap! { "age".to_string() => Value::I32(25) }),
        ];

        let cases = [("name", ValueType::String), ("age", ValueType::I32), ("nonexistent", ValueType::String)];
        for (field, expected) in cases {
            assert_eq!(infer_value_type_for_field(&entities, field), expected);
        }
    }
}