Skip to main content

zerodds_dlrl/
query.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3
4//! Query engine — DDS 1.4 §B.7.
5//!
6//! Spec §B.7: DLRL offers object-centric queries with filter, order,
7//! and limit over the object cache.
8
9use alloc::boxed::Box;
10use alloc::string::String;
11use alloc::vec::Vec;
12
13use crate::object_cache::{ObjectCache, ObjectRef};
14
15/// Sort order.
16#[derive(Debug, Clone, Copy, PartialEq, Eq)]
17pub enum SortOrder {
18    /// Ascending.
19    Ascending,
20    /// Descending.
21    Descending,
22}
23
24/// Query error.
25#[derive(Debug, Clone, PartialEq, Eq)]
26pub enum QueryError {
27    /// Limit exceeds u32::MAX.
28    LimitTooLarge,
29    /// Topic filter is an empty string (the spec forbids that).
30    EmptyTopic,
31}
32
33impl core::fmt::Display for QueryError {
34    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
35        match self {
36            Self::LimitTooLarge => f.write_str("limit exceeds u32::MAX"),
37            Self::EmptyTopic => f.write_str("topic filter must be non-empty"),
38        }
39    }
40}
41
42#[cfg(feature = "std")]
43impl std::error::Error for QueryError {}
44
45/// Query result — slice-like list of ObjectRefs.
46pub type QueryResult = Vec<ObjectRef>;
47
48/// Custom filter function.
49pub type FilterFn = Box<dyn Fn(&ObjectRef) -> bool + Send + Sync>;
50
51/// Sort-key extraction function.
52pub type SortKeyFn = Box<dyn Fn(&ObjectRef) -> Vec<u8> + Send + Sync>;
53
54/// Query builder. Spec §B.7.
55pub struct Query {
56    topic_filter: Option<String>,
57    state_filter: Option<crate::object_cache::ObjectState>,
58    custom_filter: Option<FilterFn>,
59    sort: Option<(SortOrder, SortKeyFn)>,
60    limit: Option<usize>,
61}
62
63impl core::fmt::Debug for Query {
64    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
65        f.debug_struct("Query")
66            .field("topic_filter", &self.topic_filter)
67            .field("state_filter", &self.state_filter)
68            .field("has_custom_filter", &self.custom_filter.is_some())
69            .field("has_sort", &self.sort.is_some())
70            .field("limit", &self.limit)
71            .finish()
72    }
73}
74
75impl Default for Query {
76    fn default() -> Self {
77        Self::new()
78    }
79}
80
81impl Query {
82    /// Constructor.
83    #[must_use]
84    pub fn new() -> Self {
85        Self {
86            topic_filter: None,
87            state_filter: None,
88            custom_filter: None,
89            sort: None,
90            limit: None,
91        }
92    }
93
94    /// Topic filter — only objects of this topic.
95    ///
96    /// # Errors
97    /// `EmptyTopic` if the filter is an empty string.
98    pub fn topic(mut self, topic: &str) -> Result<Self, QueryError> {
99        if topic.is_empty() {
100            return Err(QueryError::EmptyTopic);
101        }
102        self.topic_filter = Some(topic.into());
103        Ok(self)
104    }
105
106    /// State filter.
107    #[must_use]
108    pub fn state(mut self, state: crate::object_cache::ObjectState) -> Self {
109        self.state_filter = Some(state);
110        self
111    }
112
113    /// Custom filter.
114    #[must_use]
115    pub fn filter<F>(mut self, f: F) -> Self
116    where
117        F: Fn(&ObjectRef) -> bool + Send + Sync + 'static,
118    {
119        self.custom_filter = Some(Box::new(f));
120        self
121    }
122
123    /// Sorting — the caller supplies a key-extraction function.
124    #[must_use]
125    pub fn order_by<F>(mut self, order: SortOrder, key_fn: F) -> Self
126    where
127        F: Fn(&ObjectRef) -> Vec<u8> + Send + Sync + 'static,
128    {
129        self.sort = Some((order, Box::new(key_fn)));
130        self
131    }
132
133    /// Limit.
134    ///
135    /// # Errors
136    /// `LimitTooLarge` if `limit > u32::MAX`.
137    pub fn limit(mut self, limit: usize) -> Result<Self, QueryError> {
138        if limit > u32::MAX as usize {
139            return Err(QueryError::LimitTooLarge);
140        }
141        self.limit = Some(limit);
142        Ok(self)
143    }
144
145    /// Spec §B.7.1 — execute the query against an ObjectCache.
146    pub fn execute(&self, cache: &ObjectCache) -> QueryResult {
147        let mut out: Vec<ObjectRef> = cache
148            .iter()
149            .filter(|o| match &self.topic_filter {
150                Some(t) => o.id.topic == *t,
151                None => true,
152            })
153            .filter(|o| match &self.state_filter {
154                Some(s) => o.lifecycle == *s,
155                None => true,
156            })
157            .filter(|o| match &self.custom_filter {
158                Some(f) => f(o),
159                None => true,
160            })
161            .cloned()
162            .collect();
163        if let Some((order, key_fn)) = &self.sort {
164            out.sort_by_key(|o| key_fn(o));
165            if matches!(order, SortOrder::Descending) {
166                out.reverse();
167            }
168        }
169        if let Some(n) = self.limit {
170            out.truncate(n);
171        }
172        out
173    }
174}
175
176#[cfg(test)]
177#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
178mod tests {
179    use super::*;
180    use crate::object_cache::{ObjectCache, ObjectId};
181
182    fn populate(c: &mut ObjectCache) {
183        c.register(
184            ObjectId::new("Trade".into(), b"AAPL".to_vec()),
185            alloc::vec![3],
186        );
187        c.register(
188            ObjectId::new("Trade".into(), b"GOOG".to_vec()),
189            alloc::vec![1],
190        );
191        c.register(
192            ObjectId::new("Quote".into(), b"AAPL".to_vec()),
193            alloc::vec![2],
194        );
195    }
196
197    #[test]
198    fn empty_query_returns_all() {
199        let mut c = ObjectCache::new();
200        populate(&mut c);
201        let r = Query::new().execute(&c);
202        assert_eq!(r.len(), 3);
203    }
204
205    #[test]
206    fn topic_filter_narrows_result() {
207        let mut c = ObjectCache::new();
208        populate(&mut c);
209        let r = Query::new().topic("Trade").unwrap().execute(&c);
210        assert_eq!(r.len(), 2);
211        assert!(r.iter().all(|o| o.id.topic == "Trade"));
212    }
213
214    #[test]
215    fn empty_topic_rejected() {
216        let err = Query::new().topic("").unwrap_err();
217        assert_eq!(err, QueryError::EmptyTopic);
218    }
219
220    #[test]
221    fn limit_caps_result_size() {
222        let mut c = ObjectCache::new();
223        populate(&mut c);
224        let r = Query::new().limit(2).unwrap().execute(&c);
225        assert_eq!(r.len(), 2);
226    }
227
228    #[test]
229    fn order_by_sorts_ascending() {
230        let mut c = ObjectCache::new();
231        populate(&mut c);
232        let r = Query::new()
233            .order_by(SortOrder::Ascending, |o| o.id.key.clone())
234            .execute(&c);
235        assert_eq!(r[0].id.key, b"AAPL"); // appears in both Trade & Quote, stable sort
236    }
237
238    #[test]
239    fn order_by_descending_reverses() {
240        let mut c = ObjectCache::new();
241        populate(&mut c);
242        let r_asc = Query::new()
243            .order_by(SortOrder::Ascending, |o| o.id.key.clone())
244            .execute(&c);
245        let r_desc = Query::new()
246            .order_by(SortOrder::Descending, |o| o.id.key.clone())
247            .execute(&c);
248        assert_eq!(r_asc[0].id.key, r_desc[r_desc.len() - 1].id.key);
249    }
250
251    #[test]
252    fn custom_filter_applied() {
253        let mut c = ObjectCache::new();
254        populate(&mut c);
255        let r = Query::new()
256            .filter(|o| o.state == alloc::vec![3])
257            .execute(&c);
258        assert_eq!(r.len(), 1);
259        assert_eq!(r[0].id.key, b"AAPL");
260    }
261
262    #[test]
263    fn limit_too_large_rejected() {
264        let err = Query::new().limit(u32::MAX as usize + 1).unwrap_err();
265        assert_eq!(err, QueryError::LimitTooLarge);
266    }
267
268    #[test]
269    fn state_filter_only_returns_matching() {
270        let mut c = ObjectCache::new();
271        populate(&mut c);
272        let r = Query::new()
273            .state(crate::object_cache::ObjectState::New)
274            .execute(&c);
275        assert_eq!(r.len(), 3);
276    }
277}