Skip to main content

sentinel_dbms/collection/
streaming.rs

1use async_stream::stream;
2use tokio::fs as tokio_fs;
3use tokio_stream::Stream;
4use tracing::{debug, trace};
5
6use crate::{streaming::stream_document_ids, Document, Result, SentinelError};
7use super::coll::Collection;
8
9#[allow(
10    clippy::multiple_inherent_impl,
11    reason = "multiple impl blocks for Collection are intentional for organization"
12)]
13impl Collection {
14    /// Lists all document IDs in the collection.
15    ///
16    /// Returns a stream of document IDs from the collection directory.
17    /// IDs are streamed as they are discovered, without guaranteed ordering.
18    /// For sorted results, collect the stream and sort manually.
19    ///
20    /// # Returns
21    ///
22    /// Returns a stream of document IDs (filenames without the .json extension),
23    /// or a `SentinelError` if the operation fails due to filesystem errors.
24    ///
25    /// # Example
26    ///
27    /// ```rust
28    /// use sentinel_dbms::{Store, Collection};
29    /// use serde_json::json;
30    /// use futures::TryStreamExt;
31    ///
32    /// # async fn example() -> sentinel_dbms::Result<()> {
33    /// let store = Store::new("/path/to/data", None).await?;
34    /// let collection = store.collection("users").await?;
35    ///
36    /// // Insert some documents
37    /// collection.insert("user-123", json!({"name": "Alice"})).await?;
38    /// collection.insert("user-456", json!({"name": "Bob"})).await?;
39    ///
40    /// // Stream all document IDs
41    /// let ids: Vec<_> = collection.list().try_collect().await?;
42    /// assert_eq!(ids.len(), 2);
43    /// assert!(ids.contains(&"user-123".to_string()));
44    /// assert!(ids.contains(&"user-456".to_string()));
45    /// # Ok(())
46    /// # }
47    /// ```
48    pub fn list(&self) -> std::pin::Pin<Box<dyn Stream<Item = Result<String>> + Send>> {
49        trace!("Streaming document IDs from collection: {}", self.name());
50        stream_document_ids(self.path.clone())
51    }
52
53    /// Filters documents in the collection using a predicate function.
54    ///
55    /// This method performs streaming filtering by loading and checking documents
56    /// one by one, keeping only matching documents in memory. This approach
57    /// minimizes memory usage while maintaining good performance for most use cases.
58    ///
59    /// By default, this method verifies both hash and signature with strict mode.
60    /// Use `filter_with_verification()` to customize verification behavior.
61    ///
62    /// # Arguments
63    ///
64    /// * `predicate` - A function that takes a `&Document` and returns `true` if the document
65    ///   should be included in the results.
66    ///
67    /// # Returns
68    ///
69    /// Returns a stream of documents that match the predicate.
70    ///
71    /// # Example
72    ///
73    /// ```rust
74    /// use sentinel_dbms::{Store, Collection};
75    /// use serde_json::json;
76    /// use futures::stream::StreamExt;
77    ///
78    /// # async fn example() -> sentinel_dbms::Result<()> {
79    /// let store = Store::new("/path/to/data", None).await?;
80    /// let collection = store.collection("users").await?;
81    ///
82    /// // Insert some test data
83    /// collection.insert("user-1", json!({"name": "Alice", "age": 25})).await?;
84    /// collection.insert("user-2", json!({"name": "Bob", "age": 30})).await?;
85    ///
86    /// // Filter for users older than 26
87    /// let mut adults = collection.filter(|doc| {
88    ///     doc.data().get("age")
89    ///         .and_then(|v| v.as_i64())
90    ///         .map_or(false, |age| age > 26)
91    /// });
92    ///
93    /// let mut count = 0;
94    /// while let Some(doc) = adults.next().await {
95    ///     let doc = doc?;
96    ///     assert_eq!(doc.id(), "user-2");
97    ///     count += 1;
98    /// }
99    /// assert_eq!(count, 1);
100    /// # Ok(())
101    /// # }
102    /// ```
103    pub fn filter<F>(&self, predicate: F) -> std::pin::Pin<Box<dyn Stream<Item = Result<Document>> + Send>>
104    where
105        F: Fn(&Document) -> bool + Send + Sync + 'static,
106    {
107        self.filter_with_verification(predicate, &crate::VerificationOptions::default())
108    }
109
110    /// Filters documents in the collection using a predicate function with custom verification
111    /// options.
112    ///
113    /// This method performs streaming filtering by loading and checking documents
114    /// one by one, keeping only matching documents in memory. This approach
115    /// minimizes memory usage while maintaining good performance for most use cases.
116    ///
117    /// # Arguments
118    ///
119    /// * `predicate` - A function that takes a `&Document` and returns `true` if the document
120    ///   should be included in the results.
121    /// * `options` - Verification options controlling hash and signature verification.
122    ///
123    /// # Returns
124    ///
125    /// Returns a stream of documents that match the predicate.
126    ///
127    /// # Example
128    ///
129    /// ```rust
130    /// use sentinel_dbms::{Store, Collection, VerificationOptions};
131    /// use serde_json::json;
132    /// use futures::stream::StreamExt;
133    ///
134    /// # async fn example() -> sentinel_dbms::Result<()> {
135    /// let store = Store::new("/path/to/data", None).await?;
136    /// let collection = store.collection("users").await?;
137    ///
138    /// // Insert some test data
139    /// collection.insert("user-1", json!({"name": "Alice", "age": 25})).await?;
140    /// collection.insert("user-2", json!({"name": "Bob", "age": 30})).await?;
141    ///
142    /// // Filter with warnings enabled
143    /// let options = VerificationOptions::warn();
144    /// let mut adults = collection.filter_with_verification(
145    ///     |doc| {
146    ///         doc.data().get("age")
147    ///             .and_then(|v| v.as_i64())
148    ///             .map_or(false, |age| age > 26)
149    ///     },
150    ///     &options
151    /// );
152    ///
153    /// let mut count = 0;
154    /// while let Some(doc) = adults.next().await {
155    ///     let doc = doc?;
156    ///     assert_eq!(doc.id(), "user-2");
157    ///     count += 1;
158    /// }
159    /// assert_eq!(count, 1);
160    /// # Ok(())
161    /// # }
162    /// ```
163    pub fn filter_with_verification<F>(
164        &self,
165        predicate: F,
166        options: &crate::VerificationOptions,
167    ) -> std::pin::Pin<Box<dyn Stream<Item = Result<Document>> + Send>>
168    where
169        F: Fn(&Document) -> bool + Send + Sync + 'static,
170    {
171        let collection_path = self.path.clone();
172        let signing_key = self.signing_key.clone();
173        let options = *options;
174
175        Box::pin(stream! {
176            trace!(
177                "Streaming filter on collection (verification enabled: {})",
178                options.verify_signature || options.verify_hash
179            );
180            let mut entries = match tokio_fs::read_dir(&collection_path).await {
181                Ok(entries) => entries,
182                Err(e) => {
183                    yield Err(e.into());
184                    return;
185                }
186            };
187
188            loop {
189                let entry = match entries.next_entry().await {
190                    Ok(Some(entry)) => entry,
191                    Ok(None) => break,
192                    Err(e) => {
193                        yield Err(e.into());
194                        continue;
195                    }
196                };
197
198                let path = entry.path();
199                if !tokio_fs::metadata(&path).await.map(|m| m.is_dir()).unwrap_or(false)
200                    && let Some(file_name) = path.file_name().and_then(|n| n.to_str())
201                        && file_name.ends_with(".json") && !file_name.starts_with('.') {
202                            let id = file_name.strip_suffix(".json").unwrap();
203                            let file_path = collection_path.join(format!("{}.json", id));
204                            match tokio_fs::read_to_string(&file_path).await {
205                                Ok(content) => {
206                                    match serde_json::from_str::<Document>(&content) {
207                                        Ok(mut doc) => {
208                                            doc.id = id.to_owned();
209
210                                            let collection_ref = Self {
211                                                path: collection_path.clone(),
212                                                created_at: chrono::Utc::now(),
213                                                updated_at: std::sync::RwLock::new(chrono::Utc::now()),
214                                                last_checkpoint_at: std::sync::RwLock::new(None),
215                                                total_documents: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
216                                                total_size_bytes: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
217                                                signing_key: signing_key.clone(),
218                                                wal_manager: None,
219                                                stored_wal_config: sentinel_wal::CollectionWalConfig::default(),
220                                                wal_config: sentinel_wal::CollectionWalConfig::default(),
221                                                event_sender: None,
222                                                event_task: None,
223                                                recovery_mode: std::sync::atomic::AtomicBool::new(false),
224                                            };
225
226                                            if let Err(e) = collection_ref.verify_document(&doc, &options).await {
227                                                if matches!(e, SentinelError::HashVerificationFailed { .. } | SentinelError::SignatureVerificationFailed { .. }) {
228                                                    if options.hash_verification_mode == crate::VerificationMode::Strict
229                                                        || options.signature_verification_mode == crate::VerificationMode::Strict
230                                                    {
231                                                        yield Err(e);
232                                                        continue;
233                                                    }
234                                                } else {
235                                                    yield Err(e);
236                                                    continue;
237                                                }
238                                            }
239
240                                            if predicate(&doc) {
241                                                yield Ok(doc);
242                                            }
243                                        }
244                                        Err(e) => yield Err(e.into()),
245                                    }
246                                }
247                                Err(e) => yield Err(e.into()),
248                            }
249                        }
250            }
251            debug!("Streaming filter completed");
252        })
253    }
254
255    /// Streams all documents in the collection.
256    ///
257    /// This method performs streaming by loading documents one by one,
258    /// minimizing memory usage.
259    ///
260    /// By default, this method verifies both hash and signature with strict mode.
261    /// Use `all_with_verification()` to customize verification behavior.
262    ///
263    /// # Returns
264    ///
265    /// Returns a stream of all documents in the collection.
266    ///
267    /// # Example
268    ///
269    /// ```rust
270    /// use sentinel_dbms::{Collection, Store};
271    /// use futures::stream::StreamExt;
272    ///
273    /// # async fn example() -> sentinel_dbms::Result<()> {
274    /// let store = Store::new("/path/to/data", None).await?;
275    /// let collection = store.collection("users").await?;
276    ///
277    /// // Stream all documents
278    /// let mut all_docs = collection.all();
279    /// while let Some(doc) = all_docs.next().await {
280    ///     let doc = doc?;
281    ///     println!("Document: {}", doc.id());
282    /// }
283    /// # Ok(())
284    /// # }
285    /// ```
286    pub fn all(&self) -> std::pin::Pin<Box<dyn Stream<Item = Result<Document>> + Send>> {
287        self.all_with_verification(&crate::VerificationOptions::default())
288    }
289
290    /// Streams all documents in the collection with custom verification options.
291    ///
292    /// This method performs streaming by loading documents one by one,
293    /// minimizing memory usage.
294    ///
295    /// # Arguments
296    ///
297    /// * `options` - Verification options controlling hash and signature verification.
298    ///
299    /// # Returns
300    ///
301    /// Returns a stream of all documents in the collection.
302    ///
303    /// # Example
304    ///
305    /// ```rust
306    /// use sentinel_dbms::{Collection, Store, VerificationOptions};
307    /// use futures::stream::StreamExt;
308    ///
309    /// # async fn example() -> sentinel_dbms::Result<()> {
310    /// let store = Store::new("/path/to/data", None).await?;
311    /// let collection = store.collection("users").await?;
312    ///
313    /// // Stream all documents with warnings instead of errors
314    /// let options = VerificationOptions::warn();
315    /// let mut all_docs = collection.all_with_verification(&options);
316    /// while let Some(doc) = all_docs.next().await {
317    ///     let doc = doc?;
318    ///     println!("Document: {}", doc.id());
319    /// }
320    /// # Ok(())
321    /// # }
322    /// ```
323    pub fn all_with_verification(
324        &self,
325        options: &crate::VerificationOptions,
326    ) -> std::pin::Pin<Box<dyn Stream<Item = Result<Document>> + Send>> {
327        let collection_path = self.path.clone();
328        let signing_key = self.signing_key.clone();
329        let options = *options;
330
331        Box::pin(stream! {
332            trace!(
333                "Streaming all documents on collection (verification enabled: {})",
334                options.verify_signature || options.verify_hash
335            );
336            let mut entries = match tokio_fs::read_dir(&collection_path).await {
337                Ok(entries) => entries,
338                Err(e) => {
339                    yield Err(e.into());
340                    return;
341                }
342            };
343
344            loop {
345                let entry = match entries.next_entry().await {
346                    Ok(Some(entry)) => entry,
347                    Ok(None) => break,
348                    Err(e) => {
349                        yield Err(e.into());
350                        continue;
351                    }
352                };
353
354                let path = entry.path();
355                if !tokio_fs::metadata(&path).await.map(|m| m.is_dir()).unwrap_or(false)
356                    && let Some(file_name) = path.file_name().and_then(|n| n.to_str())
357                        && file_name.ends_with(".json") && !file_name.starts_with('.') {
358                            let id = file_name.strip_suffix(".json").unwrap();
359                            let file_path = collection_path.join(format!("{}.json", id));
360                            match tokio_fs::read_to_string(&file_path).await {
361                                Ok(content) => {
362                                    match serde_json::from_str::<Document>(&content) {
363                                        Ok(mut doc) => {
364                                            doc.id = id.to_owned();
365
366                                            let collection_ref = Self {
367                                                path: collection_path.clone(),
368                                                created_at: chrono::Utc::now(),
369                                                updated_at: std::sync::RwLock::new(chrono::Utc::now()),
370                                                last_checkpoint_at: std::sync::RwLock::new(None),
371                                                total_documents: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
372                                                total_size_bytes: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
373                                                signing_key: signing_key.clone(),
374                                                wal_manager: None,
375                                                stored_wal_config: sentinel_wal::CollectionWalConfig::default(),
376                                                wal_config: sentinel_wal::CollectionWalConfig::default(),
377                                                event_sender: None,
378                                                event_task: None,
379                                                recovery_mode: std::sync::atomic::AtomicBool::new(false),
380                                            };
381
382                                            if let Err(e) = collection_ref.verify_document(&doc, &options).await {
383                                                if matches!(e, SentinelError::HashVerificationFailed { .. } | SentinelError::SignatureVerificationFailed { .. }) {
384                                                    if options.hash_verification_mode == crate::VerificationMode::Strict
385                                                        || options.signature_verification_mode == crate::VerificationMode::Strict
386                                                    {
387                                                        yield Err(e);
388                                                        continue;
389                                                    }
390                                                } else {
391                                                    yield Err(e);
392                                                    continue;
393                                                }
394                                            }
395
396                                            yield Ok(doc);
397                                        }
398                                        Err(e) => yield Err(e.into()),
399                                    }
400                                }
401                                Err(e) => yield Err(e.into()),
402                            }
403                        }
404            }
405            debug!("Streaming all completed");
406        })
407    }
408}