use std::{path::PathBuf, pin::Pin};
use async_stream::stream;
use futures::Stream;
use tokio::fs as tokio_fs;
use crate::Result;
/// Streams the ids of the documents stored directly inside `collection_path`.
///
/// A document is a non-directory entry whose file name is valid UTF-8, ends with
/// `.json`, and does not start with `.`. Its id is the file name with the `.json`
/// suffix removed, so `doc1.json` yields `"doc1"`. Entries whose names are not valid
/// UTF-8 are skipped silently. Ids are yielded in whatever order the directory
/// listing produces them; no sorting is applied.
///
/// # Errors
///
/// Failures are yielded as `Err` items, converted with `.into()` into the crate's
/// error type. The stream behaves as follows:
/// - If the directory cannot be opened, that error is the only item and the stream ends.
/// - If reading the next directory entry fails, the error is yielded and the stream
///   ends. Retrying the listing is not guaranteed to make progress.
/// - If metadata for a candidate `.json` entry cannot be read (for example, because
///   the file was removed after it was listed), the error is yielded, that entry is
///   skipped, and listing continues.
pub fn stream_document_ids(
    collection_path: PathBuf,
) -> Pin<Box<dyn Stream<Item = Result<String>> + Send>> {
    Box::pin(stream! {
        let mut entries = match tokio_fs::read_dir(&collection_path).await {
            Ok(entries) => entries,
            Err(e) => {
                yield Err(e.into());
                return;
            }
        };

        loop {
            let entry = match entries.next_entry().await {
                Ok(Some(entry)) => entry,
                Ok(None) => break,
                Err(e) => {
                    // Stop after a listing error. Calling `next_entry` again is not
                    // guaranteed to advance, and `continue` could re-yield the same
                    // failure forever.
                    yield Err(e.into());
                    break;
                }
            };

            let path = entry.path();

            // Filter on the name before touching the filesystem. Only candidate
            // documents pay for a metadata lookup, and unrelated entries can no
            // longer surface errors.
            let Some(id) = path
                .file_name()
                .and_then(|name| name.to_str())
                .filter(|name| !name.starts_with('.'))
                .and_then(|name| name.strip_suffix(".json"))
            else {
                continue;
            };

            let metadata = match tokio_fs::metadata(&path).await {
                Ok(metadata) => metadata,
                Err(e) => {
                    yield Err(e.into());
                    continue;
                }
            };

            // `tokio::fs::metadata` follows symlinks. A link to a directory is
            // therefore skipped, while a link to a regular file is reported.
            if !metadata.is_dir() {
                yield Ok(id.to_owned());
            }
        }
    })
}
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use tempfile::TempDir;
    use tokio::fs as tokio_fs;

    use super::*;

    /// Drains `stream_document_ids(path)` and returns the sorted ids plus the number
    /// of `Err` items. Sorting is needed because directory order is unspecified.
    async fn collect_ids(path: std::path::PathBuf) -> (Vec<String>, usize) {
        let mut ids = Vec::new();
        let mut errors = 0;
        let mut stream = stream_document_ids(path);
        while let Some(item) = stream.next().await {
            match item {
                Ok(id) => ids.push(id),
                Err(_) => errors += 1,
            }
        }
        ids.sort();
        (ids, errors)
    }

    #[tokio::test]
    async fn test_stream_document_ids_empty_directory() {
        let temp_dir = TempDir::new().unwrap();
        let collection_path = temp_dir.path().join("collection");
        tokio_fs::create_dir(&collection_path).await.unwrap();

        let (ids, errors) = collect_ids(collection_path).await;
        assert!(ids.is_empty(), "unexpected ids: {ids:?}");
        assert_eq!(errors, 0);
    }

    #[tokio::test]
    async fn test_stream_document_ids_lists_json_documents() {
        let temp_dir = TempDir::new().unwrap();
        let collection_path = temp_dir.path().join("collection");
        tokio_fs::create_dir(&collection_path).await.unwrap();
        for id in ["doc1", "doc2"] {
            tokio_fs::write(collection_path.join(format!("{id}.json")), b"{}")
                .await
                .unwrap();
        }

        let (ids, errors) = collect_ids(collection_path).await;
        assert_eq!(ids, ["doc1", "doc2"]);
        assert_eq!(errors, 0);
    }

    #[tokio::test]
    async fn test_stream_document_ids_skips_non_documents() {
        let temp_dir = TempDir::new().unwrap();
        let collection_path = temp_dir.path().join("collection");
        tokio_fs::create_dir(&collection_path).await.unwrap();
        for name in ["doc.json", ".hidden.json", "notes.txt", "archive.json.bak"] {
            tokio_fs::write(collection_path.join(name), b"{}").await.unwrap();
        }
        // A directory whose name looks like a document must not be reported.
        tokio_fs::create_dir(collection_path.join("nested.json"))
            .await
            .unwrap();

        let (ids, errors) = collect_ids(collection_path).await;
        assert_eq!(ids, ["doc"]);
        assert_eq!(errors, 0);
    }

    #[tokio::test]
    async fn test_stream_document_ids_with_directory_removal() {
        let temp_dir = TempDir::new().unwrap();
        let collection_path = temp_dir.path().join("collection");
        tokio_fs::create_dir(&collection_path).await.unwrap();
        let doc_ids = ["doc1", "doc2"];
        for id in doc_ids {
            tokio_fs::write(collection_path.join(format!("{id}.json")), b"{}")
                .await
                .unwrap();
        }

        let mut stream = stream_document_ids(collection_path.clone());
        let first_id = match stream.next().await {
            Some(Ok(id)) => id,
            Some(Err(_)) => panic!("Expected first item to succeed"),
            None => panic!("Expected at least one item"),
        };

        // Removing the directory mid-stream must not hang or panic. Entries that
        // vanished may surface as errors, which this test deliberately ignores.
        tokio_fs::remove_dir_all(&collection_path).await.unwrap();
        let mut found_ids = vec![first_id];
        while let Some(item) = stream.next().await {
            if let Ok(id) = item {
                found_ids.push(id);
            }
        }

        assert!(
            found_ids.iter().all(|id| doc_ids.contains(&id.as_str())),
            "unexpected ids: {found_ids:?}"
        );
    }

    #[tokio::test]
    async fn test_stream_document_ids_with_invalid_path() {
        let temp_dir = TempDir::new().unwrap();
        // A path that does not exist on any platform; opening it must fail.
        let missing_path = temp_dir.path().join("does_not_exist");

        let (ids, errors) = collect_ids(missing_path).await;
        assert!(ids.is_empty(), "unexpected ids: {ids:?}");
        assert_eq!(errors, 1, "Expected exactly one error when path is invalid");
    }
}