sentinel_dbms/collection/query.rs
1use async_stream::stream;
2use futures::{StreamExt as _, TryStreamExt as _};
3use serde_json::Value;
4use tokio::fs as tokio_fs;
5use tokio_stream::Stream;
6use tracing::{debug, trace};
7
8use crate::{
9 comparison::compare_values,
10 filtering::matches_filters,
11 projection::project_document,
12 streaming::stream_document_ids,
13 Document,
14 Result,
15 SentinelError,
16};
17use super::coll::Collection;
18
19#[allow(
20 clippy::multiple_inherent_impl,
21 reason = "multiple impl blocks for Collection are intentional for organization"
22)]
23impl Collection {
24 /// Executes a structured query against the collection.
25 ///
26 /// This method supports complex filtering, sorting, pagination, and field projection.
27 /// For optimal performance and memory usage:
28 /// - Queries without sorting use streaming processing with early limit application
29 /// - Queries with sorting collect filtered documents in memory for sorting
30 /// - Projection is applied only to final results to minimize memory usage
31 ///
32 /// By default, this method verifies both hash and signature with strict mode.
33 /// Use `query_with_verification()` to customize verification behavior.
34 ///
35 /// # Arguments
36 ///
37 /// * `query` - The query to execute
38 ///
39 /// # Returns
40 ///
41 /// Returns a `QueryResult` containing the matching documents and metadata.
42 ///
43 /// # Example
44 ///
45 /// ```rust
46 /// use sentinel_dbms::{Store, Collection, QueryBuilder, Operator, SortOrder};
47 /// use serde_json::json;
48 ///
49 /// # async fn example() -> sentinel_dbms::Result<()> {
50 /// let store = Store::new("/path/to/data", None).await?;
51 /// let collection = store.collection("users").await?;
52 ///
53 /// // Insert test data
54 /// collection.insert("user-1", json!({"name": "Alice", "age": 25, "city": "NYC"})).await?;
55 /// collection.insert("user-2", json!({"name": "Bob", "age": 30, "city": "LA"})).await?;
56 /// collection.insert("user-3", json!({"name": "Charlie", "age": 35, "city": "NYC"})).await?;
57 ///
58 /// // Query for users in NYC, sorted by age, limit 2
59 /// let query = QueryBuilder::new()
60 /// .filter("city", Operator::Equals, json!("NYC"))
61 /// .sort("age", SortOrder::Ascending)
62 /// .limit(2)
63 /// .projection(vec!["name", "age"])
64 /// .build();
65 ///
66 /// let result = collection.query(query).await?;
67 /// let documents: Vec<_> = futures::TryStreamExt::try_collect(result.documents).await?;
68 /// assert_eq!(documents.len(), 2);
69 /// # Ok(())
70 /// # }
71 /// ```
72 pub async fn query(&self, query: crate::Query) -> Result<crate::QueryResult> {
73 self.query_with_verification(query, &crate::VerificationOptions::default())
74 .await
75 }
76
77 /// Executes a structured query against the collection with custom verification options.
78 ///
79 /// This method supports complex filtering, sorting, pagination, and field projection.
80 /// For optimal performance and memory usage:
81 /// - Queries without sorting use streaming processing with early limit application
82 /// - Queries with sorting collect filtered documents in memory for sorting
83 /// - Projection is applied only to final results to minimize memory usage
84 ///
85 /// # Arguments
86 ///
87 /// * `query` - The query to execute
88 /// * `options` - Verification options controlling hash and signature verification.
89 ///
90 /// # Returns
91 ///
92 /// Returns a `QueryResult` containing the matching documents and metadata.
93 ///
94 /// # Example
95 ///
96 /// ```rust
97 /// use sentinel_dbms::{Store, Collection, QueryBuilder, Operator, SortOrder, VerificationOptions, VerificationMode};
98 /// use serde_json::json;
99 ///
100 /// # async fn example() -> sentinel_dbms::Result<()> {
101 /// let store = Store::new("/path/to/data", None).await?;
102 /// let collection = store.collection("users").await?;
103 ///
104 /// // Insert test data
105 /// collection.insert("user-1", json!({"name": "Alice", "age": 25, "city": "NYC"})).await?;
106 /// collection.insert("user-2", json!({"name": "Bob", "age": 30, "city": "LA"})).await?;
107 /// collection.insert("user-3", json!({"name": "Charlie", "age": 35, "city": "NYC"})).await?;
108 ///
109 /// // Query with warning mode
110 /// let options = VerificationOptions::warn();
111 /// let query = QueryBuilder::new()
112 /// .filter("city", Operator::Equals, json!("NYC"))
113 /// .sort("age", SortOrder::Ascending)
114 /// .limit(2)
115 /// .projection(vec!["name", "age"])
116 /// .build();
117 ///
118 /// let result = collection.query_with_verification(query, &options).await?;
119 /// let documents: Vec<_> = futures::TryStreamExt::try_collect(result.documents).await?;
120 /// assert_eq!(documents.len(), 2);
121 /// # Ok(())
122 /// # }
123 /// ```
124 pub async fn query_with_verification(
125 &self,
126 query: crate::Query,
127 options: &crate::VerificationOptions,
128 ) -> Result<crate::QueryResult> {
129 use std::time::Instant;
130 let start_time = Instant::now();
131
132 trace!(
133 "Executing query on collection: {} (verification enabled: {})",
134 self.name(),
135 options.verify_signature || options.verify_hash
136 );
137
138 // Get all document IDs - but for full streaming, we should avoid this
139 // However, for sorted queries, we need to know all IDs to collect
140 // For non-sorted, we can stream without knowing all IDs
141 let documents_stream = if query.sort.is_some() {
142 // For sorted queries, we need to collect all matching documents
143 let all_ids: Vec<String> = self.list().try_collect().await?;
144 let docs = self
145 .execute_sorted_query_with_verification(&all_ids, &query, options)
146 .await?;
147 let stream = tokio_stream::iter(docs.into_iter().map(Ok));
148 Box::pin(stream) as std::pin::Pin<Box<dyn Stream<Item = Result<Document>> + Send>>
149 }
150 else {
151 // For non-sorted queries, use streaming
152 self.execute_streaming_query_with_verification(&query, options)
153 .await?
154 };
155
156 let execution_time = start_time.elapsed();
157 debug!("Query completed in {:?}", execution_time);
158
159 Ok(crate::QueryResult {
160 documents: documents_stream,
161 total_count: None, // For streaming, we don't know the total count upfront
162 execution_time,
163 })
164 }
165
166 /// Executes a query that requires sorting by collecting all matching documents first with
167 /// verification.
168 async fn execute_sorted_query_with_verification(
169 &self,
170 all_ids: &[String],
171 query: &crate::Query,
172 options: &crate::VerificationOptions,
173 ) -> Result<Vec<Document>> {
174 // For sorted queries, we need to collect all matching documents to sort them
175 // But we can optimize by only keeping document IDs and sort values during filtering
176 let mut matching_docs = Vec::new();
177
178 // Precompute filter references to avoid allocating a new Vec for each document
179 let filter_refs: Vec<_> = query.filters.iter().collect();
180
181 for id in all_ids {
182 if let Some(doc) = self.get_with_verification(id, options).await? &&
183 matches_filters(&doc, &filter_refs)
184 {
185 matching_docs.push(doc);
186 }
187 }
188
189 if let Some(ref inner) = query.sort {
190 let field = &inner.0;
191 let order = &inner.1;
192 matching_docs.sort_by(|a, b| {
193 let a_val = a.data().get(field.as_str());
194 let b_val = b.data().get(field.as_str());
195 if *order == crate::SortOrder::Ascending {
196 self.compare_values(a_val, b_val)
197 }
198 else {
199 self.compare_values(b_val, a_val)
200 }
201 });
202 }
203
204 // Apply offset and limit
205 let offset = query.offset.unwrap_or(0);
206 let start_idx = offset.min(matching_docs.len());
207 let end_idx = query.limit.map_or(matching_docs.len(), |limit| {
208 start_idx.saturating_add(limit).min(matching_docs.len())
209 });
210
211 // Apply projection to the final results
212 let mut final_docs = Vec::new();
213 for doc in matching_docs
214 .into_iter()
215 .skip(start_idx)
216 .take(end_idx.saturating_sub(start_idx))
217 {
218 let projected_doc = if let Some(ref fields) = query.projection {
219 self.project_document(&doc, fields).await?
220 }
221 else {
222 doc
223 };
224 final_docs.push(projected_doc);
225 }
226
227 Ok(final_docs)
228 }
229
230 /// Executes a query without sorting, allowing streaming with early limit application and
231 /// verification.
232 async fn execute_streaming_query_with_verification(
233 &self,
234 query: &crate::Query,
235 options: &crate::VerificationOptions,
236 ) -> Result<std::pin::Pin<Box<dyn Stream<Item = Result<Document>> + Send>>> {
237 let collection_path = self.path.clone();
238 let signing_key = self.signing_key.clone();
239 let filters = query.filters.clone();
240 let projection_fields = query.projection.clone();
241 let limit = query.limit.unwrap_or(usize::MAX);
242 let offset = query.offset.unwrap_or(0);
243 let options = *options;
244
245 Ok(Box::pin(stream! {
246 let mut id_stream = stream_document_ids(collection_path.clone());
247 let mut yielded = 0;
248 let mut skipped = 0;
249
250 // Precompute filter references to avoid allocating a new Vec for each document
251 let filter_refs: Vec<_> = filters.iter().collect();
252
253 while let Some(id_result) = id_stream.next().await {
254 let id = match id_result {
255 Ok(id) => id,
256 Err(e) => {
257 yield Err(e);
258 continue;
259 }
260 };
261
262 // Load document
263 let file_path = collection_path.join(format!("{}.json", id));
264 let content = match tokio_fs::read_to_string(&file_path).await {
265 Ok(content) => content,
266 Err(e) => {
267 yield Err(e.into());
268 continue;
269 }
270 };
271
272 let doc = match serde_json::from_str::<Document>(&content) {
273 Ok(doc) => {
274 // Create a new document with the correct ID
275 let mut doc_with_id = doc;
276 doc_with_id.id = id.clone();
277
278 let collection_ref = Self {
279 path: collection_path.clone(),
280 created_at: chrono::Utc::now(),
281 updated_at: std::sync::RwLock::new(chrono::Utc::now()),
282 last_checkpoint_at: std::sync::RwLock::new(None),
283 total_documents: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
284 total_size_bytes: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
285 signing_key: signing_key.clone(),
286 stored_wal_config: sentinel_wal::CollectionWalConfig::default(),
287 wal_manager: None,
288 wal_config: sentinel_wal::CollectionWalConfig::default(),
289 event_sender: None,
290 event_task: None,
291 recovery_mode: std::sync::atomic::AtomicBool::new(false),
292 };
293
294 if let Err(e) = collection_ref.verify_document(&doc_with_id, &options).await {
295 if matches!(e, SentinelError::HashVerificationFailed { .. } | SentinelError::SignatureVerificationFailed { .. }) {
296 if options.hash_verification_mode == crate::VerificationMode::Strict
297 || options.signature_verification_mode == crate::VerificationMode::Strict
298 || options.empty_signature_mode == crate::VerificationMode::Strict
299 {
300 yield Err(e);
301 continue;
302 }
303 } else {
304 yield Err(e);
305 continue;
306 }
307 }
308
309 doc_with_id
310 }
311 Err(e) => {
312 yield Err(e.into());
313 continue;
314 }
315 };
316
317 if matches_filters(&doc, &filter_refs) {
318 if skipped < offset {
319 skipped = skipped.saturating_add(1);
320 continue;
321 }
322 if yielded >= limit {
323 break;
324 }
325 let final_doc = if let Some(ref fields) = projection_fields {
326 project_document(&doc, fields).await?
327 } else {
328 doc
329 };
330 yield Ok(final_doc);
331 yielded = yielded.saturating_add(1);
332 }
333 }
334 }))
335 }
336
337 /// Compares two values for sorting purposes.
338 fn compare_values(&self, a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering { compare_values(a, b) }
339
340 /// Projects a document to include only specified fields.
341 async fn project_document(&self, doc: &Document, fields: &[String]) -> Result<Document> {
342 project_document(doc, fields).await
343 }
344}