1use alloc::boxed::Box;
10use alloc::string::String;
11use alloc::vec::Vec;
12
13use crate::object_cache::{ObjectCache, ObjectRef};
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq)]
17pub enum SortOrder {
18 Ascending,
20 Descending,
22}
23
24#[derive(Debug, Clone, PartialEq, Eq)]
26pub enum QueryError {
27 LimitTooLarge,
29 EmptyTopic,
31}
32
33impl core::fmt::Display for QueryError {
34 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
35 match self {
36 Self::LimitTooLarge => f.write_str("limit exceeds u32::MAX"),
37 Self::EmptyTopic => f.write_str("topic filter must be non-empty"),
38 }
39 }
40}
41
42#[cfg(feature = "std")]
43impl std::error::Error for QueryError {}
44
45pub type QueryResult = Vec<ObjectRef>;
47
48pub type FilterFn = Box<dyn Fn(&ObjectRef) -> bool + Send + Sync>;
50
51pub type SortKeyFn = Box<dyn Fn(&ObjectRef) -> Vec<u8> + Send + Sync>;
53
54pub struct Query {
56 topic_filter: Option<String>,
57 state_filter: Option<crate::object_cache::ObjectState>,
58 custom_filter: Option<FilterFn>,
59 sort: Option<(SortOrder, SortKeyFn)>,
60 limit: Option<usize>,
61}
62
63impl core::fmt::Debug for Query {
64 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
65 f.debug_struct("Query")
66 .field("topic_filter", &self.topic_filter)
67 .field("state_filter", &self.state_filter)
68 .field("has_custom_filter", &self.custom_filter.is_some())
69 .field("has_sort", &self.sort.is_some())
70 .field("limit", &self.limit)
71 .finish()
72 }
73}
74
75impl Default for Query {
76 fn default() -> Self {
77 Self::new()
78 }
79}
80
81impl Query {
82 #[must_use]
84 pub fn new() -> Self {
85 Self {
86 topic_filter: None,
87 state_filter: None,
88 custom_filter: None,
89 sort: None,
90 limit: None,
91 }
92 }
93
94 pub fn topic(mut self, topic: &str) -> Result<Self, QueryError> {
99 if topic.is_empty() {
100 return Err(QueryError::EmptyTopic);
101 }
102 self.topic_filter = Some(topic.into());
103 Ok(self)
104 }
105
106 #[must_use]
108 pub fn state(mut self, state: crate::object_cache::ObjectState) -> Self {
109 self.state_filter = Some(state);
110 self
111 }
112
113 #[must_use]
115 pub fn filter<F>(mut self, f: F) -> Self
116 where
117 F: Fn(&ObjectRef) -> bool + Send + Sync + 'static,
118 {
119 self.custom_filter = Some(Box::new(f));
120 self
121 }
122
123 #[must_use]
125 pub fn order_by<F>(mut self, order: SortOrder, key_fn: F) -> Self
126 where
127 F: Fn(&ObjectRef) -> Vec<u8> + Send + Sync + 'static,
128 {
129 self.sort = Some((order, Box::new(key_fn)));
130 self
131 }
132
133 pub fn limit(mut self, limit: usize) -> Result<Self, QueryError> {
138 if limit > u32::MAX as usize {
139 return Err(QueryError::LimitTooLarge);
140 }
141 self.limit = Some(limit);
142 Ok(self)
143 }
144
145 pub fn execute(&self, cache: &ObjectCache) -> QueryResult {
147 let mut out: Vec<ObjectRef> = cache
148 .iter()
149 .filter(|o| match &self.topic_filter {
150 Some(t) => o.id.topic == *t,
151 None => true,
152 })
153 .filter(|o| match &self.state_filter {
154 Some(s) => o.lifecycle == *s,
155 None => true,
156 })
157 .filter(|o| match &self.custom_filter {
158 Some(f) => f(o),
159 None => true,
160 })
161 .cloned()
162 .collect();
163 if let Some((order, key_fn)) = &self.sort {
164 out.sort_by_key(|o| key_fn(o));
165 if matches!(order, SortOrder::Descending) {
166 out.reverse();
167 }
168 }
169 if let Some(n) = self.limit {
170 out.truncate(n);
171 }
172 out
173 }
174}
175
176#[cfg(test)]
177#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
178mod tests {
179 use super::*;
180 use crate::object_cache::{ObjectCache, ObjectId};
181
182 fn populate(c: &mut ObjectCache) {
183 c.register(
184 ObjectId::new("Trade".into(), b"AAPL".to_vec()),
185 alloc::vec![3],
186 );
187 c.register(
188 ObjectId::new("Trade".into(), b"GOOG".to_vec()),
189 alloc::vec![1],
190 );
191 c.register(
192 ObjectId::new("Quote".into(), b"AAPL".to_vec()),
193 alloc::vec![2],
194 );
195 }
196
197 #[test]
198 fn empty_query_returns_all() {
199 let mut c = ObjectCache::new();
200 populate(&mut c);
201 let r = Query::new().execute(&c);
202 assert_eq!(r.len(), 3);
203 }
204
205 #[test]
206 fn topic_filter_narrows_result() {
207 let mut c = ObjectCache::new();
208 populate(&mut c);
209 let r = Query::new().topic("Trade").unwrap().execute(&c);
210 assert_eq!(r.len(), 2);
211 assert!(r.iter().all(|o| o.id.topic == "Trade"));
212 }
213
214 #[test]
215 fn empty_topic_rejected() {
216 let err = Query::new().topic("").unwrap_err();
217 assert_eq!(err, QueryError::EmptyTopic);
218 }
219
220 #[test]
221 fn limit_caps_result_size() {
222 let mut c = ObjectCache::new();
223 populate(&mut c);
224 let r = Query::new().limit(2).unwrap().execute(&c);
225 assert_eq!(r.len(), 2);
226 }
227
228 #[test]
229 fn order_by_sorts_ascending() {
230 let mut c = ObjectCache::new();
231 populate(&mut c);
232 let r = Query::new()
233 .order_by(SortOrder::Ascending, |o| o.id.key.clone())
234 .execute(&c);
235 assert_eq!(r[0].id.key, b"AAPL"); }
237
238 #[test]
239 fn order_by_descending_reverses() {
240 let mut c = ObjectCache::new();
241 populate(&mut c);
242 let r_asc = Query::new()
243 .order_by(SortOrder::Ascending, |o| o.id.key.clone())
244 .execute(&c);
245 let r_desc = Query::new()
246 .order_by(SortOrder::Descending, |o| o.id.key.clone())
247 .execute(&c);
248 assert_eq!(r_asc[0].id.key, r_desc[r_desc.len() - 1].id.key);
249 }
250
251 #[test]
252 fn custom_filter_applied() {
253 let mut c = ObjectCache::new();
254 populate(&mut c);
255 let r = Query::new()
256 .filter(|o| o.state == alloc::vec![3])
257 .execute(&c);
258 assert_eq!(r.len(), 1);
259 assert_eq!(r[0].id.key, b"AAPL");
260 }
261
262 #[test]
263 fn limit_too_large_rejected() {
264 let err = Query::new().limit(u32::MAX as usize + 1).unwrap_err();
265 assert_eq!(err, QueryError::LimitTooLarge);
266 }
267
268 #[test]
269 fn state_filter_only_returns_matching() {
270 let mut c = ObjectCache::new();
271 populate(&mut c);
272 let r = Query::new()
273 .state(crate::object_cache::ObjectState::New)
274 .execute(&c);
275 assert_eq!(r.len(), 3);
276 }
277}