1use crate::{
2 context::NodeAndContext,
3 error::RetrievalError,
4 node::{MatchArgs, Node, NodeInner},
5 policy::PolicyAgent,
6 reactor::AbstractEntity,
7 storage::StorageEngine,
8 value::{Value, ValueType},
9};
10use ankurah_proto as proto;
11use async_trait::async_trait;
12use std::sync::{Arc, Weak};
13
14#[async_trait]
16pub trait GapFetcher<E: AbstractEntity>: Send + Sync + 'static {
17 async fn fetch_gap(
28 &self,
29 collection_id: &proto::CollectionId,
30 selection: &ankql::ast::Selection,
31 last_entity: Option<&E>,
32 gap_size: usize,
33 ) -> Result<Vec<E>, RetrievalError>;
34}
35
36#[derive(Clone)]
38pub struct QueryGapFetcher<SE, PA>
39where
40 SE: StorageEngine,
41 PA: PolicyAgent,
42{
43 weak_node: Weak<NodeInner<SE, PA>>,
44 cdata: PA::ContextData,
45}
46
47impl<SE, PA> QueryGapFetcher<SE, PA>
48where
49 SE: StorageEngine,
50 PA: PolicyAgent,
51{
52 pub fn new(node: &Node<SE, PA>, cdata: PA::ContextData) -> Self { Self { weak_node: Arc::downgrade(&node.0), cdata } }
53}
54
55#[async_trait]
56impl<SE, PA> GapFetcher<crate::entity::Entity> for QueryGapFetcher<SE, PA>
57where
58 SE: StorageEngine + 'static,
59 PA: PolicyAgent + 'static,
60{
61 async fn fetch_gap(
62 &self,
63 collection_id: &proto::CollectionId,
64 selection: &ankql::ast::Selection,
65 last_entity: Option<&crate::entity::Entity>,
66 gap_size: usize,
67 ) -> Result<Vec<crate::entity::Entity>, RetrievalError> {
68 let node_inner = self
70 .weak_node
71 .upgrade()
72 .ok_or_else(|| RetrievalError::storage(std::io::Error::other("Node has been dropped, cannot fill gap")))?;
73
74 let node = Node(node_inner);
76 let node_context = NodeAndContext { node, cdata: self.cdata.clone() };
77
78 let gap_selection = if let Some(last) = last_entity {
80 let gap_predicate = if let Some(ref order_by) = selection.order_by {
81 build_continuation_predicate(&selection.predicate, order_by, last)
82 .map_err(|e| RetrievalError::storage(std::io::Error::other(e)))?
83 } else {
84 selection.predicate.clone()
85 };
86
87 ankql::ast::Selection { predicate: gap_predicate, order_by: selection.order_by.clone(), limit: Some(gap_size as u64) }
88 } else {
89 ankql::ast::Selection {
91 predicate: selection.predicate.clone(),
92 order_by: selection.order_by.clone(),
93 limit: Some(gap_size as u64),
94 }
95 };
96
97 let match_args = MatchArgs { selection: gap_selection, cached: false };
98
99 node_context.fetch_entities(collection_id, match_args).await
100 }
101}
102
103pub fn build_continuation_predicate<E: AbstractEntity>(
108 original_predicate: &ankql::ast::Predicate,
109 order_by: &[ankql::ast::OrderByItem],
110 last_entity: &E,
111) -> Result<ankql::ast::Predicate, String> {
112 use ankql::ast::{ComparisonOperator, Expr, Literal, OrderDirection, PathExpr, Predicate};
113
114 let mut gap_conditions = Vec::new();
115
116 gap_conditions.push(original_predicate.clone());
118
119 for order_item in order_by {
121 let field_name = order_item.path.property();
122
123 if let Some(field_value) = last_entity.value(field_name) {
125 let literal = match field_value {
126 Value::String(s) => Literal::String(s),
127 Value::I16(i) => Literal::I16(i),
128 Value::I32(i) => Literal::I32(i),
129 Value::I64(i) => Literal::I64(i),
130 Value::F64(f) => Literal::F64(f),
131 Value::Bool(b) => Literal::Bool(b),
132 Value::EntityId(id) => Literal::EntityId(id.into()),
133 Value::Object(_) | Value::Binary(_) | Value::Json(_) => continue,
135 };
136
137 let operator = match order_item.direction {
138 OrderDirection::Asc => ComparisonOperator::GreaterThanOrEqual,
139 OrderDirection::Desc => ComparisonOperator::LessThanOrEqual,
140 };
141
142 let condition = Predicate::Comparison {
143 left: Box::new(Expr::Path(order_item.path.clone())),
144 operator,
145 right: Box::new(Expr::Literal(literal)),
146 };
147
148 gap_conditions.push(condition);
149 }
150 }
151
152 let id_exclusion = Predicate::Comparison {
154 left: Box::new(Expr::Path(PathExpr::simple("id"))),
155 operator: ComparisonOperator::NotEqual,
156 right: Box::new(Expr::Literal(Literal::EntityId((*last_entity.id()).into()))),
157 };
158 gap_conditions.push(id_exclusion);
159
160 let result =
162 gap_conditions.into_iter().reduce(|acc, condition| Predicate::And(Box::new(acc), Box::new(condition))).unwrap_or(Predicate::True);
163
164 Ok(result)
165}
166
167pub fn infer_value_type_for_field<E: AbstractEntity>(entities: &[E], field_name: &str) -> ValueType {
169 for entity in entities {
170 if let Some(value) = entity.value(field_name) {
171 return ValueType::of(&value);
172 }
173 }
174
175 ValueType::String
177}
178
#[cfg(test)]
mod tests {
    use super::*;
    use crate::value::Value;
    use ankql::ast::{OrderByItem, OrderDirection, PathExpr, Predicate};
    use ankurah_derive::selection;
    use ankurah_proto as proto;
    use maplit::hashmap;
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};

    /// Minimal in-memory `AbstractEntity` used to drive the helpers under test.
    #[derive(Debug, Clone)]
    struct TestEntity {
        id: proto::EntityId,
        collection: proto::CollectionId,
        data: Arc<Mutex<HashMap<String, Value>>>,
    }

    impl TestEntity {
        /// Builds an entity in the "test" collection whose id is all zero bytes except the
        /// last one, which is `id`.
        fn new(id: u8, data: HashMap<String, Value>) -> Self {
            let id_bytes: [u8; 16] = std::array::from_fn(|index| if index == 15 { id } else { 0 });
            Self {
                id: proto::EntityId::from_bytes(id_bytes),
                collection: proto::CollectionId::fixed_name("test"),
                data: Arc::new(Mutex::new(data)),
            }
        }
    }

    impl AbstractEntity for TestEntity {
        fn collection(&self) -> proto::CollectionId { self.collection.clone() }

        fn id(&self) -> &proto::EntityId { &self.id }

        fn value(&self, field: &str) -> Option<Value> {
            let fields = self.data.lock().unwrap();
            fields.get(field).cloned()
        }
    }

    /// Shorthand for an ORDER BY item on a simple (single-segment) path.
    fn order_item(field: &str, direction: OrderDirection) -> OrderByItem { OrderByItem { path: PathExpr::simple(field), direction } }

    #[test]
    fn test_build_gap_predicate_single_column_asc() {
        let fields = hashmap!("name".to_string() => Value::String("John".to_string()));
        let entity = TestEntity::new(1, fields);
        let order_by = vec![order_item("name", OrderDirection::Asc)];

        let actual = build_continuation_predicate(&Predicate::True, &order_by, &entity).unwrap();

        // Ascending order yields an inclusive lower bound plus the id exclusion.
        let expected = selection!("true AND name >= 'John' AND id != {}", entity.id()).predicate;
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_build_gap_predicate_multi_column() {
        let fields = hashmap!("name".to_string() => Value::String("John".to_string()), "age".to_string() => Value::I32(30));
        let entity = TestEntity::new(2, fields);
        let order_by = vec![order_item("name", OrderDirection::Asc), order_item("age", OrderDirection::Desc)];

        let actual = build_continuation_predicate(&Predicate::True, &order_by, &entity).unwrap();

        // One bound per ORDER BY column, in ORDER BY order; descending flips the operator.
        let expected = selection!("true AND name >= 'John' AND age <= 30 AND id != {}", entity.id()).predicate;
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_infer_value_type_for_field() {
        let with_name = TestEntity::new(1, hashmap!("name".to_string() => Value::String("Alice".to_string())));
        let with_age = TestEntity::new(2, hashmap!("age".to_string() => Value::I32(25)));
        let entities = vec![with_name, with_age];

        let expectations = [("name", ValueType::String), ("age", ValueType::I32), ("nonexistent", ValueType::String)];
        for (field, expected) in expectations {
            assert_eq!(infer_value_type_for_field(&entities, field), expected);
        }
    }
}