sentinel_dbms/collection/streaming.rs
1use async_stream::stream;
2use tokio::fs as tokio_fs;
3use tokio_stream::Stream;
4use tracing::{debug, trace};
5
6use crate::{streaming::stream_document_ids, Document, Result, SentinelError};
7use super::coll::Collection;
8
9#[allow(
10 clippy::multiple_inherent_impl,
11 reason = "multiple impl blocks for Collection are intentional for organization"
12)]
13impl Collection {
14 /// Lists all document IDs in the collection.
15 ///
16 /// Returns a stream of document IDs from the collection directory.
17 /// IDs are streamed as they are discovered, without guaranteed ordering.
18 /// For sorted results, collect the stream and sort manually.
19 ///
20 /// # Returns
21 ///
22 /// Returns a stream of document IDs (filenames without the .json extension),
23 /// or a `SentinelError` if the operation fails due to filesystem errors.
24 ///
25 /// # Example
26 ///
27 /// ```rust
28 /// use sentinel_dbms::{Store, Collection};
29 /// use serde_json::json;
30 /// use futures::TryStreamExt;
31 ///
32 /// # async fn example() -> sentinel_dbms::Result<()> {
33 /// let store = Store::new("/path/to/data", None).await?;
34 /// let collection = store.collection("users").await?;
35 ///
36 /// // Insert some documents
37 /// collection.insert("user-123", json!({"name": "Alice"})).await?;
38 /// collection.insert("user-456", json!({"name": "Bob"})).await?;
39 ///
40 /// // Stream all document IDs
41 /// let ids: Vec<_> = collection.list().try_collect().await?;
42 /// assert_eq!(ids.len(), 2);
43 /// assert!(ids.contains(&"user-123".to_string()));
44 /// assert!(ids.contains(&"user-456".to_string()));
45 /// # Ok(())
46 /// # }
47 /// ```
48 pub fn list(&self) -> std::pin::Pin<Box<dyn Stream<Item = Result<String>> + Send>> {
49 trace!("Streaming document IDs from collection: {}", self.name());
50 stream_document_ids(self.path.clone())
51 }
52
53 /// Filters documents in the collection using a predicate function.
54 ///
55 /// This method performs streaming filtering by loading and checking documents
56 /// one by one, keeping only matching documents in memory. This approach
57 /// minimizes memory usage while maintaining good performance for most use cases.
58 ///
59 /// By default, this method verifies both hash and signature with strict mode.
60 /// Use `filter_with_verification()` to customize verification behavior.
61 ///
62 /// # Arguments
63 ///
64 /// * `predicate` - A function that takes a `&Document` and returns `true` if the document
65 /// should be included in the results.
66 ///
67 /// # Returns
68 ///
69 /// Returns a stream of documents that match the predicate.
70 ///
71 /// # Example
72 ///
73 /// ```rust
74 /// use sentinel_dbms::{Store, Collection};
75 /// use serde_json::json;
76 /// use futures::stream::StreamExt;
77 ///
78 /// # async fn example() -> sentinel_dbms::Result<()> {
79 /// let store = Store::new("/path/to/data", None).await?;
80 /// let collection = store.collection("users").await?;
81 ///
82 /// // Insert some test data
83 /// collection.insert("user-1", json!({"name": "Alice", "age": 25})).await?;
84 /// collection.insert("user-2", json!({"name": "Bob", "age": 30})).await?;
85 ///
86 /// // Filter for users older than 26
87 /// let mut adults = collection.filter(|doc| {
88 /// doc.data().get("age")
89 /// .and_then(|v| v.as_i64())
90 /// .map_or(false, |age| age > 26)
91 /// });
92 ///
93 /// let mut count = 0;
94 /// while let Some(doc) = adults.next().await {
95 /// let doc = doc?;
96 /// assert_eq!(doc.id(), "user-2");
97 /// count += 1;
98 /// }
99 /// assert_eq!(count, 1);
100 /// # Ok(())
101 /// # }
102 /// ```
103 pub fn filter<F>(&self, predicate: F) -> std::pin::Pin<Box<dyn Stream<Item = Result<Document>> + Send>>
104 where
105 F: Fn(&Document) -> bool + Send + Sync + 'static,
106 {
107 self.filter_with_verification(predicate, &crate::VerificationOptions::default())
108 }
109
110 /// Filters documents in the collection using a predicate function with custom verification
111 /// options.
112 ///
113 /// This method performs streaming filtering by loading and checking documents
114 /// one by one, keeping only matching documents in memory. This approach
115 /// minimizes memory usage while maintaining good performance for most use cases.
116 ///
117 /// # Arguments
118 ///
119 /// * `predicate` - A function that takes a `&Document` and returns `true` if the document
120 /// should be included in the results.
121 /// * `options` - Verification options controlling hash and signature verification.
122 ///
123 /// # Returns
124 ///
125 /// Returns a stream of documents that match the predicate.
126 ///
127 /// # Example
128 ///
129 /// ```rust
130 /// use sentinel_dbms::{Store, Collection, VerificationOptions};
131 /// use serde_json::json;
132 /// use futures::stream::StreamExt;
133 ///
134 /// # async fn example() -> sentinel_dbms::Result<()> {
135 /// let store = Store::new("/path/to/data", None).await?;
136 /// let collection = store.collection("users").await?;
137 ///
138 /// // Insert some test data
139 /// collection.insert("user-1", json!({"name": "Alice", "age": 25})).await?;
140 /// collection.insert("user-2", json!({"name": "Bob", "age": 30})).await?;
141 ///
142 /// // Filter with warnings enabled
143 /// let options = VerificationOptions::warn();
144 /// let mut adults = collection.filter_with_verification(
145 /// |doc| {
146 /// doc.data().get("age")
147 /// .and_then(|v| v.as_i64())
148 /// .map_or(false, |age| age > 26)
149 /// },
150 /// &options
151 /// );
152 ///
153 /// let mut count = 0;
154 /// while let Some(doc) = adults.next().await {
155 /// let doc = doc?;
156 /// assert_eq!(doc.id(), "user-2");
157 /// count += 1;
158 /// }
159 /// assert_eq!(count, 1);
160 /// # Ok(())
161 /// # }
162 /// ```
163 pub fn filter_with_verification<F>(
164 &self,
165 predicate: F,
166 options: &crate::VerificationOptions,
167 ) -> std::pin::Pin<Box<dyn Stream<Item = Result<Document>> + Send>>
168 where
169 F: Fn(&Document) -> bool + Send + Sync + 'static,
170 {
171 let collection_path = self.path.clone();
172 let signing_key = self.signing_key.clone();
173 let options = *options;
174
175 Box::pin(stream! {
176 trace!(
177 "Streaming filter on collection (verification enabled: {})",
178 options.verify_signature || options.verify_hash
179 );
180 let mut entries = match tokio_fs::read_dir(&collection_path).await {
181 Ok(entries) => entries,
182 Err(e) => {
183 yield Err(e.into());
184 return;
185 }
186 };
187
188 loop {
189 let entry = match entries.next_entry().await {
190 Ok(Some(entry)) => entry,
191 Ok(None) => break,
192 Err(e) => {
193 yield Err(e.into());
194 continue;
195 }
196 };
197
198 let path = entry.path();
199 if !tokio_fs::metadata(&path).await.map(|m| m.is_dir()).unwrap_or(false)
200 && let Some(file_name) = path.file_name().and_then(|n| n.to_str())
201 && file_name.ends_with(".json") && !file_name.starts_with('.') {
202 let id = file_name.strip_suffix(".json").unwrap();
203 let file_path = collection_path.join(format!("{}.json", id));
204 match tokio_fs::read_to_string(&file_path).await {
205 Ok(content) => {
206 match serde_json::from_str::<Document>(&content) {
207 Ok(mut doc) => {
208 doc.id = id.to_owned();
209
210 let collection_ref = Self {
211 path: collection_path.clone(),
212 created_at: chrono::Utc::now(),
213 updated_at: std::sync::RwLock::new(chrono::Utc::now()),
214 last_checkpoint_at: std::sync::RwLock::new(None),
215 total_documents: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
216 total_size_bytes: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
217 signing_key: signing_key.clone(),
218 wal_manager: None,
219 stored_wal_config: sentinel_wal::CollectionWalConfig::default(),
220 wal_config: sentinel_wal::CollectionWalConfig::default(),
221 event_sender: None,
222 event_task: None,
223 recovery_mode: std::sync::atomic::AtomicBool::new(false),
224 };
225
226 if let Err(e) = collection_ref.verify_document(&doc, &options).await {
227 if matches!(e, SentinelError::HashVerificationFailed { .. } | SentinelError::SignatureVerificationFailed { .. }) {
228 if options.hash_verification_mode == crate::VerificationMode::Strict
229 || options.signature_verification_mode == crate::VerificationMode::Strict
230 {
231 yield Err(e);
232 continue;
233 }
234 } else {
235 yield Err(e);
236 continue;
237 }
238 }
239
240 if predicate(&doc) {
241 yield Ok(doc);
242 }
243 }
244 Err(e) => yield Err(e.into()),
245 }
246 }
247 Err(e) => yield Err(e.into()),
248 }
249 }
250 }
251 debug!("Streaming filter completed");
252 })
253 }
254
255 /// Streams all documents in the collection.
256 ///
257 /// This method performs streaming by loading documents one by one,
258 /// minimizing memory usage.
259 ///
260 /// By default, this method verifies both hash and signature with strict mode.
261 /// Use `all_with_verification()` to customize verification behavior.
262 ///
263 /// # Returns
264 ///
265 /// Returns a stream of all documents in the collection.
266 ///
267 /// # Example
268 ///
269 /// ```rust
270 /// use sentinel_dbms::{Collection, Store};
271 /// use futures::stream::StreamExt;
272 ///
273 /// # async fn example() -> sentinel_dbms::Result<()> {
274 /// let store = Store::new("/path/to/data", None).await?;
275 /// let collection = store.collection("users").await?;
276 ///
277 /// // Stream all documents
278 /// let mut all_docs = collection.all();
279 /// while let Some(doc) = all_docs.next().await {
280 /// let doc = doc?;
281 /// println!("Document: {}", doc.id());
282 /// }
283 /// # Ok(())
284 /// # }
285 /// ```
286 pub fn all(&self) -> std::pin::Pin<Box<dyn Stream<Item = Result<Document>> + Send>> {
287 self.all_with_verification(&crate::VerificationOptions::default())
288 }
289
290 /// Streams all documents in the collection with custom verification options.
291 ///
292 /// This method performs streaming by loading documents one by one,
293 /// minimizing memory usage.
294 ///
295 /// # Arguments
296 ///
297 /// * `options` - Verification options controlling hash and signature verification.
298 ///
299 /// # Returns
300 ///
301 /// Returns a stream of all documents in the collection.
302 ///
303 /// # Example
304 ///
305 /// ```rust
306 /// use sentinel_dbms::{Collection, Store, VerificationOptions};
307 /// use futures::stream::StreamExt;
308 ///
309 /// # async fn example() -> sentinel_dbms::Result<()> {
310 /// let store = Store::new("/path/to/data", None).await?;
311 /// let collection = store.collection("users").await?;
312 ///
313 /// // Stream all documents with warnings instead of errors
314 /// let options = VerificationOptions::warn();
315 /// let mut all_docs = collection.all_with_verification(&options);
316 /// while let Some(doc) = all_docs.next().await {
317 /// let doc = doc?;
318 /// println!("Document: {}", doc.id());
319 /// }
320 /// # Ok(())
321 /// # }
322 /// ```
323 pub fn all_with_verification(
324 &self,
325 options: &crate::VerificationOptions,
326 ) -> std::pin::Pin<Box<dyn Stream<Item = Result<Document>> + Send>> {
327 let collection_path = self.path.clone();
328 let signing_key = self.signing_key.clone();
329 let options = *options;
330
331 Box::pin(stream! {
332 trace!(
333 "Streaming all documents on collection (verification enabled: {})",
334 options.verify_signature || options.verify_hash
335 );
336 let mut entries = match tokio_fs::read_dir(&collection_path).await {
337 Ok(entries) => entries,
338 Err(e) => {
339 yield Err(e.into());
340 return;
341 }
342 };
343
344 loop {
345 let entry = match entries.next_entry().await {
346 Ok(Some(entry)) => entry,
347 Ok(None) => break,
348 Err(e) => {
349 yield Err(e.into());
350 continue;
351 }
352 };
353
354 let path = entry.path();
355 if !tokio_fs::metadata(&path).await.map(|m| m.is_dir()).unwrap_or(false)
356 && let Some(file_name) = path.file_name().and_then(|n| n.to_str())
357 && file_name.ends_with(".json") && !file_name.starts_with('.') {
358 let id = file_name.strip_suffix(".json").unwrap();
359 let file_path = collection_path.join(format!("{}.json", id));
360 match tokio_fs::read_to_string(&file_path).await {
361 Ok(content) => {
362 match serde_json::from_str::<Document>(&content) {
363 Ok(mut doc) => {
364 doc.id = id.to_owned();
365
366 let collection_ref = Self {
367 path: collection_path.clone(),
368 created_at: chrono::Utc::now(),
369 updated_at: std::sync::RwLock::new(chrono::Utc::now()),
370 last_checkpoint_at: std::sync::RwLock::new(None),
371 total_documents: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
372 total_size_bytes: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
373 signing_key: signing_key.clone(),
374 wal_manager: None,
375 stored_wal_config: sentinel_wal::CollectionWalConfig::default(),
376 wal_config: sentinel_wal::CollectionWalConfig::default(),
377 event_sender: None,
378 event_task: None,
379 recovery_mode: std::sync::atomic::AtomicBool::new(false),
380 };
381
382 if let Err(e) = collection_ref.verify_document(&doc, &options).await {
383 if matches!(e, SentinelError::HashVerificationFailed { .. } | SentinelError::SignatureVerificationFailed { .. }) {
384 if options.hash_verification_mode == crate::VerificationMode::Strict
385 || options.signature_verification_mode == crate::VerificationMode::Strict
386 {
387 yield Err(e);
388 continue;
389 }
390 } else {
391 yield Err(e);
392 continue;
393 }
394 }
395
396 yield Ok(doc);
397 }
398 Err(e) => yield Err(e.into()),
399 }
400 }
401 Err(e) => yield Err(e.into()),
402 }
403 }
404 }
405 debug!("Streaming all completed");
406 })
407 }
408}