sentinel-dbms 2.1.1

Cyberpath Sentinel is a filesystem-backed document DBMS written in Rust.
Documentation
//! Streaming utilities for document processing.

use std::{path::PathBuf, pin::Pin};

use async_stream::stream;
use futures::Stream;
use tokio::fs as tokio_fs;

use crate::Result;

/// Streams document IDs from a collection directory.
pub fn stream_document_ids(collection_path: PathBuf) -> Pin<Box<dyn Stream<Item = Result<String>> + Send>> {
    Box::pin(stream! {
        let mut entries = match tokio_fs::read_dir(&collection_path).await {
            Ok(entries) => entries,
            Err(e) => {
                yield Err(e.into());
                return;
            }
        };

        loop {
            let entry = match entries.next_entry().await {
                Ok(Some(entry)) => entry,
                Ok(None) => break,
                Err(e) => {
                    yield Err(e.into());
                    continue;
                }
            };

            let path = entry.path();
            let metadata = match tokio_fs::metadata(&path).await {
                Ok(metadata) => metadata,
                Err(e) => {
                    yield Err(e.into());
                    continue;
                }
            };
            if !metadata.is_dir()
                && let Some(file_name) = path.file_name().and_then(|n| n.to_str())
                && file_name.ends_with(".json") && !file_name.starts_with('.') {
                let id = file_name.strip_suffix(".json").unwrap();
                yield Ok(id.to_owned());
            }
        }
    })
}

#[cfg(test)]
mod tests {
    use tempfile::TempDir;
    use tokio::fs as tokio_fs;
    use futures::StreamExt;

    use super::*;

    #[tokio::test]
    async fn test_stream_document_ids_empty_directory() {
        let temp_dir = TempDir::new().unwrap();
        let collection_path = temp_dir.path().join("collection");
        tokio_fs::create_dir(&collection_path).await.unwrap();
        // No files in directory
        let mut stream = stream_document_ids(collection_path);
        let mut found_ids = Vec::new();
        while let Some(result) = stream.next().await {
            let id = result.unwrap();
            found_ids.push(id);
        }
        assert!(found_ids.is_empty());
    }

    #[tokio::test]
    async fn test_stream_document_ids_with_directory_removal() {
        let temp_dir = TempDir::new().unwrap();
        let collection_path = temp_dir.path().join("collection");
        tokio_fs::create_dir(&collection_path).await.unwrap();
        let doc_ids = vec!["doc1", "doc2"];
        for id in &doc_ids {
            let file_path = collection_path.join(format!("{}.json", id));
            tokio_fs::write(&file_path, b"{}").await.unwrap();
        }

        let mut stream = stream_document_ids(collection_path.clone());

        // Consume one item first to ensure read_dir() succeeds
        let first_result = stream.next().await;
        assert!(first_result.is_some());
        let first_id = match first_result.unwrap() {
            Ok(id) => id,
            Err(_) => panic!("Expected first item to succeed"),
        };

        // Now remove the directory during iteration
        tokio_fs::remove_dir_all(&collection_path).await.unwrap();

        let mut found_ids = vec![first_id]; // already consumed
        while let Some(result) = stream.next().await {
            match result {
                Ok(id) => found_ids.push(id),
                Err(_) => {}, // Error handling is tested by not panicking
            }
        }

        // The behavior may vary by platform - some may continue reading, others may error
        // The important thing is that the error handling doesn't panic
        assert!(
            !found_ids.is_empty(),
            "Expected to find at least one document id"
        );
        // Error count may be 0 or more depending on platform
    }

    #[tokio::test]
    async fn test_stream_document_ids_with_invalid_path() {
        // Test with a path that is not a directory to trigger read_dir error
        let invalid_path = std::path::PathBuf::from("/dev/null/nonexistent");
        let mut stream = stream_document_ids(invalid_path);
        let mut error_count = 0;
        while let Some(result) = stream.next().await {
            match result {
                Ok(_) => {},
                Err(_) => error_count += 1,
            }
        }
        assert!(error_count > 0, "Expected error when path is invalid");
    }

    #[tokio::test]
    async fn test_stream_document_ids_with_next_entry_error() {
        // Test next_entry error path (line 26-27)
        use futures::StreamExt;

        let temp_dir = tempfile::tempdir().unwrap();
        let collection_path = temp_dir.path().join("test_collection");
        tokio::fs::create_dir_all(&collection_path).await.unwrap();

        // Create some valid documents
        tokio::fs::write(collection_path.join("doc1.json"), "{}")
            .await
            .unwrap();
        tokio::fs::write(collection_path.join("doc2.json"), "{}")
            .await
            .unwrap();

        let mut stream = stream_document_ids(collection_path.clone());
        let mut count = 0;

        while let Some(result) = stream.next().await {
            match result {
                Ok(_) => count += 1,
                Err(_) => {},
            }
        }

        // Should have found the two documents
        assert_eq!(count, 2);
    }
}