Skip to main content

sentinel_dbms/collection/
query.rs

1use async_stream::stream;
2use futures::{StreamExt as _, TryStreamExt as _};
3use serde_json::Value;
4use tokio::fs as tokio_fs;
5use tokio_stream::Stream;
6use tracing::{debug, trace};
7
8use crate::{
9    comparison::compare_values,
10    filtering::matches_filters,
11    projection::project_document,
12    streaming::stream_document_ids,
13    Document,
14    Result,
15    SentinelError,
16};
17use super::coll::Collection;
18
19#[allow(
20    clippy::multiple_inherent_impl,
21    reason = "multiple impl blocks for Collection are intentional for organization"
22)]
23impl Collection {
24    /// Executes a structured query against the collection.
25    ///
26    /// This method supports complex filtering, sorting, pagination, and field projection.
27    /// For optimal performance and memory usage:
28    /// - Queries without sorting use streaming processing with early limit application
29    /// - Queries with sorting collect filtered documents in memory for sorting
30    /// - Projection is applied only to final results to minimize memory usage
31    ///
32    /// By default, this method verifies both hash and signature with strict mode.
33    /// Use `query_with_verification()` to customize verification behavior.
34    ///
35    /// # Arguments
36    ///
37    /// * `query` - The query to execute
38    ///
39    /// # Returns
40    ///
41    /// Returns a `QueryResult` containing the matching documents and metadata.
42    ///
43    /// # Example
44    ///
45    /// ```rust
46    /// use sentinel_dbms::{Store, Collection, QueryBuilder, Operator, SortOrder};
47    /// use serde_json::json;
48    ///
49    /// # async fn example() -> sentinel_dbms::Result<()> {
50    /// let store = Store::new("/path/to/data", None).await?;
51    /// let collection = store.collection("users").await?;
52    ///
53    /// // Insert test data
54    /// collection.insert("user-1", json!({"name": "Alice", "age": 25, "city": "NYC"})).await?;
55    /// collection.insert("user-2", json!({"name": "Bob", "age": 30, "city": "LA"})).await?;
56    /// collection.insert("user-3", json!({"name": "Charlie", "age": 35, "city": "NYC"})).await?;
57    ///
58    /// // Query for users in NYC, sorted by age, limit 2
59    /// let query = QueryBuilder::new()
60    ///     .filter("city", Operator::Equals, json!("NYC"))
61    ///     .sort("age", SortOrder::Ascending)
62    ///     .limit(2)
63    ///     .projection(vec!["name", "age"])
64    ///     .build();
65    ///
66    /// let result = collection.query(query).await?;
67    /// let documents: Vec<_> = futures::TryStreamExt::try_collect(result.documents).await?;
68    /// assert_eq!(documents.len(), 2);
69    /// # Ok(())
70    /// # }
71    /// ```
72    pub async fn query(&self, query: crate::Query) -> Result<crate::QueryResult> {
73        self.query_with_verification(query, &crate::VerificationOptions::default())
74            .await
75    }
76
77    /// Executes a structured query against the collection with custom verification options.
78    ///
79    /// This method supports complex filtering, sorting, pagination, and field projection.
80    /// For optimal performance and memory usage:
81    /// - Queries without sorting use streaming processing with early limit application
82    /// - Queries with sorting collect filtered documents in memory for sorting
83    /// - Projection is applied only to final results to minimize memory usage
84    ///
85    /// # Arguments
86    ///
87    /// * `query` - The query to execute
88    /// * `options` - Verification options controlling hash and signature verification.
89    ///
90    /// # Returns
91    ///
92    /// Returns a `QueryResult` containing the matching documents and metadata.
93    ///
94    /// # Example
95    ///
96    /// ```rust
97    /// use sentinel_dbms::{Store, Collection, QueryBuilder, Operator, SortOrder, VerificationOptions, VerificationMode};
98    /// use serde_json::json;
99    ///
100    /// # async fn example() -> sentinel_dbms::Result<()> {
101    /// let store = Store::new("/path/to/data", None).await?;
102    /// let collection = store.collection("users").await?;
103    ///
104    /// // Insert test data
105    /// collection.insert("user-1", json!({"name": "Alice", "age": 25, "city": "NYC"})).await?;
106    /// collection.insert("user-2", json!({"name": "Bob", "age": 30, "city": "LA"})).await?;
107    /// collection.insert("user-3", json!({"name": "Charlie", "age": 35, "city": "NYC"})).await?;
108    ///
109    /// // Query with warning mode
110    /// let options = VerificationOptions::warn();
111    /// let query = QueryBuilder::new()
112    ///     .filter("city", Operator::Equals, json!("NYC"))
113    ///     .sort("age", SortOrder::Ascending)
114    ///     .limit(2)
115    ///     .projection(vec!["name", "age"])
116    ///     .build();
117    ///
118    /// let result = collection.query_with_verification(query, &options).await?;
119    /// let documents: Vec<_> = futures::TryStreamExt::try_collect(result.documents).await?;
120    /// assert_eq!(documents.len(), 2);
121    /// # Ok(())
122    /// # }
123    /// ```
124    pub async fn query_with_verification(
125        &self,
126        query: crate::Query,
127        options: &crate::VerificationOptions,
128    ) -> Result<crate::QueryResult> {
129        use std::time::Instant;
130        let start_time = Instant::now();
131
132        trace!(
133            "Executing query on collection: {} (verification enabled: {})",
134            self.name(),
135            options.verify_signature || options.verify_hash
136        );
137
138        // Get all document IDs - but for full streaming, we should avoid this
139        // However, for sorted queries, we need to know all IDs to collect
140        // For non-sorted, we can stream without knowing all IDs
141        let documents_stream = if query.sort.is_some() {
142            // For sorted queries, we need to collect all matching documents
143            let all_ids: Vec<String> = self.list().try_collect().await?;
144            let docs = self
145                .execute_sorted_query_with_verification(&all_ids, &query, options)
146                .await?;
147            let stream = tokio_stream::iter(docs.into_iter().map(Ok));
148            Box::pin(stream) as std::pin::Pin<Box<dyn Stream<Item = Result<Document>> + Send>>
149        }
150        else {
151            // For non-sorted queries, use streaming
152            self.execute_streaming_query_with_verification(&query, options)
153                .await?
154        };
155
156        let execution_time = start_time.elapsed();
157        debug!("Query completed in {:?}", execution_time);
158
159        Ok(crate::QueryResult {
160            documents: documents_stream,
161            total_count: None, // For streaming, we don't know the total count upfront
162            execution_time,
163        })
164    }
165
166    /// Executes a query that requires sorting by collecting all matching documents first with
167    /// verification.
168    async fn execute_sorted_query_with_verification(
169        &self,
170        all_ids: &[String],
171        query: &crate::Query,
172        options: &crate::VerificationOptions,
173    ) -> Result<Vec<Document>> {
174        // For sorted queries, we need to collect all matching documents to sort them
175        // But we can optimize by only keeping document IDs and sort values during filtering
176        let mut matching_docs = Vec::new();
177
178        // Precompute filter references to avoid allocating a new Vec for each document
179        let filter_refs: Vec<_> = query.filters.iter().collect();
180
181        for id in all_ids {
182            if let Some(doc) = self.get_with_verification(id, options).await? &&
183                matches_filters(&doc, &filter_refs)
184            {
185                matching_docs.push(doc);
186            }
187        }
188
189        if let Some(ref inner) = query.sort {
190            let field = &inner.0;
191            let order = &inner.1;
192            matching_docs.sort_by(|a, b| {
193                let a_val = a.data().get(field.as_str());
194                let b_val = b.data().get(field.as_str());
195                if *order == crate::SortOrder::Ascending {
196                    self.compare_values(a_val, b_val)
197                }
198                else {
199                    self.compare_values(b_val, a_val)
200                }
201            });
202        }
203
204        // Apply offset and limit
205        let offset = query.offset.unwrap_or(0);
206        let start_idx = offset.min(matching_docs.len());
207        let end_idx = query.limit.map_or(matching_docs.len(), |limit| {
208            start_idx.saturating_add(limit).min(matching_docs.len())
209        });
210
211        // Apply projection to the final results
212        let mut final_docs = Vec::new();
213        for doc in matching_docs
214            .into_iter()
215            .skip(start_idx)
216            .take(end_idx.saturating_sub(start_idx))
217        {
218            let projected_doc = if let Some(ref fields) = query.projection {
219                self.project_document(&doc, fields).await?
220            }
221            else {
222                doc
223            };
224            final_docs.push(projected_doc);
225        }
226
227        Ok(final_docs)
228    }
229
230    /// Executes a query without sorting, allowing streaming with early limit application and
231    /// verification.
232    async fn execute_streaming_query_with_verification(
233        &self,
234        query: &crate::Query,
235        options: &crate::VerificationOptions,
236    ) -> Result<std::pin::Pin<Box<dyn Stream<Item = Result<Document>> + Send>>> {
237        let collection_path = self.path.clone();
238        let signing_key = self.signing_key.clone();
239        let filters = query.filters.clone();
240        let projection_fields = query.projection.clone();
241        let limit = query.limit.unwrap_or(usize::MAX);
242        let offset = query.offset.unwrap_or(0);
243        let options = *options;
244
245        Ok(Box::pin(stream! {
246            let mut id_stream = stream_document_ids(collection_path.clone());
247            let mut yielded = 0;
248            let mut skipped = 0;
249
250            // Precompute filter references to avoid allocating a new Vec for each document
251            let filter_refs: Vec<_> = filters.iter().collect();
252
253            while let Some(id_result) = id_stream.next().await {
254                let id = match id_result {
255                    Ok(id) => id,
256                    Err(e) => {
257                        yield Err(e);
258                        continue;
259                    }
260                };
261
262                // Load document
263                let file_path = collection_path.join(format!("{}.json", id));
264                let content = match tokio_fs::read_to_string(&file_path).await {
265                    Ok(content) => content,
266                    Err(e) => {
267                        yield Err(e.into());
268                        continue;
269                    }
270                };
271
272                let doc = match serde_json::from_str::<Document>(&content) {
273                    Ok(doc) => {
274                        // Create a new document with the correct ID
275                        let mut doc_with_id = doc;
276                        doc_with_id.id = id.clone();
277
278                        let collection_ref = Self {
279                            path: collection_path.clone(),
280                                                created_at: chrono::Utc::now(),
281                                                updated_at: std::sync::RwLock::new(chrono::Utc::now()),
282                                                last_checkpoint_at: std::sync::RwLock::new(None),
283                                                total_documents: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
284                                                total_size_bytes: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
285                            signing_key: signing_key.clone(),
286                                                stored_wal_config: sentinel_wal::CollectionWalConfig::default(),
287                                                wal_manager: None,
288                                                wal_config: sentinel_wal::CollectionWalConfig::default(),
289                                                event_sender: None,
290                                                event_task: None,
291                            recovery_mode: std::sync::atomic::AtomicBool::new(false),
292                        };
293
294                        if let Err(e) = collection_ref.verify_document(&doc_with_id, &options).await {
295                            if matches!(e, SentinelError::HashVerificationFailed { .. } | SentinelError::SignatureVerificationFailed { .. }) {
296                                if options.hash_verification_mode == crate::VerificationMode::Strict
297                                    || options.signature_verification_mode == crate::VerificationMode::Strict
298                                    || options.empty_signature_mode == crate::VerificationMode::Strict
299                                {
300                                    yield Err(e);
301                                    continue;
302                                }
303                            } else {
304                                yield Err(e);
305                                continue;
306                            }
307                        }
308
309                        doc_with_id
310                    }
311                    Err(e) => {
312                        yield Err(e.into());
313                        continue;
314                    }
315                };
316
317                if matches_filters(&doc, &filter_refs) {
318                    if skipped < offset {
319                        skipped = skipped.saturating_add(1);
320                        continue;
321                    }
322                    if yielded >= limit {
323                        break;
324                    }
325                    let final_doc = if let Some(ref fields) = projection_fields {
326                        project_document(&doc, fields).await?
327                    } else {
328                        doc
329                    };
330                    yield Ok(final_doc);
331                    yielded = yielded.saturating_add(1);
332                }
333            }
334        }))
335    }
336
337    /// Compares two values for sorting purposes.
338    fn compare_values(&self, a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering { compare_values(a, b) }
339
340    /// Projects a document to include only specified fields.
341    async fn project_document(&self, doc: &Document, fields: &[String]) -> Result<Document> {
342        project_document(doc, fields).await
343    }
344}