// langchain_rust/document_loaders/document_loader.rs

1use std::pin::Pin;
2
3use async_stream::stream;
4use async_trait::async_trait;
5use futures::Stream;
6use futures_util::{pin_mut, StreamExt};
7
8use crate::{schemas::Document, text_splitter::TextSplitter};
9
10use super::LoaderError;
11
12#[async_trait]
13pub trait Loader: Send + Sync {
14    async fn load(
15        self,
16    ) -> Result<
17        Pin<Box<dyn Stream<Item = Result<Document, LoaderError>> + Send + 'static>>,
18        LoaderError,
19    >;
20    async fn load_and_split<TS: TextSplitter + 'static>(
21        self,
22        splitter: TS,
23    ) -> Result<
24        Pin<Box<dyn Stream<Item = Result<Document, LoaderError>> + Send + 'static>>,
25        LoaderError,
26    >;
27}
28
29pub(crate) async fn process_doc_stream<TS: TextSplitter + 'static>(
30    doc_stream: Pin<Box<dyn Stream<Item = Result<Document, LoaderError>> + Send>>,
31    splitter: TS,
32) -> impl Stream<Item = Result<Document, LoaderError>> {
33    stream! {
34        pin_mut!(doc_stream);
35        while let Some(doc_result) = doc_stream.next().await {
36            match doc_result {
37                Ok(doc) => {
38                    match splitter.split_documents(&[doc]).await {
39                        Ok(docs) => {
40                            for doc in docs {
41                                yield Ok(doc);
42                            }
43                        },
44                        Err(e) => yield Err(LoaderError::TextSplitterError(e)),
45                    }
46                }
47                Err(e) => yield Err(e),
48            }
49        }
50    }
51}