langchain_rust/document_loaders/pandoc_loader/pandoc_loader.rs

1use std::{fmt, path::Path, pin::Pin, process::Stdio};
2
3use async_trait::async_trait;
4use futures_util::{stream, Stream};
5use tokio::{
6    fs::File,
7    io::{AsyncRead, AsyncWriteExt, BufReader},
8    process::Command,
9};
10
11use crate::{
12    document_loaders::{process_doc_stream, Loader, LoaderError},
13    schemas::Document,
14    text_splitter::TextSplitter,
15};
16
/// Source formats that [`PandocLoader`] can ask pandoc to read.
///
/// The string form of each variant (from [`InputFormat::as_str`], `Display`, or
/// `to_string()`) is the pandoc reader name passed to `pandoc -f <format>`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum InputFormat {
    /// Microsoft Word document (`docx`).
    Docx,
    /// EPUB e-book (`epub`).
    Epub,
    /// HTML (`html`).
    Html,
    /// Jupyter notebook (`ipynb`). The variant name's spelling is kept as-is for
    /// API compatibility.
    JuypterNotebook,
    /// Markdown (`markdown`).
    Markdown,
    /// MediaWiki markup (`mediawiki`).
    MediaWiki,
    /// Rich Text Format (`rtf`).
    RichTextFormat,
    /// Typst (`typst`).
    Typst,
    /// VimWiki markup (`vimwiki`).
    VimWiki,
}

impl InputFormat {
    /// Returns the pandoc reader name for this format without allocating.
    pub fn as_str(&self) -> &'static str {
        match self {
            InputFormat::Docx => "docx",
            InputFormat::Epub => "epub",
            InputFormat::Html => "html",
            InputFormat::JuypterNotebook => "ipynb",
            InputFormat::MediaWiki => "mediawiki",
            InputFormat::Markdown => "markdown",
            InputFormat::RichTextFormat => "rtf",
            InputFormat::Typst => "typst",
            InputFormat::VimWiki => "vimwiki",
        }
    }
}

// Implementing `Display` (rather than `ToString` directly) keeps `.to_string()`
// available through the standard blanket impl and also allows `format!("{}", ..)`.
impl fmt::Display for InputFormat {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}
45
/// Document loader that converts its input to plain text by piping it through
/// the pandoc executable (`pandoc -f <input_format> -t plain`).
///
/// `R` is the byte source that gets written to pandoc's stdin.
pub struct PandocLoader<R> {
    /// Path to, or name of, the pandoc executable to spawn.
    pandoc_path: String,
    /// Pandoc reader name passed to `-f`, e.g. `"docx"` or `"markdown"`.
    input_format: String,
    /// Raw document bytes fed to pandoc's stdin.
    input: R,
}

// Written by hand so that `R` does not need to implement `Debug`; the input
// reader is intentionally left out of the output.
impl<R> fmt::Debug for PandocLoader<R> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            pandoc_path,
            input_format,
            ..
        } = self;
        let mut builder = f.debug_struct("PandocLoader");
        builder.field("pandoc_path", pandoc_path);
        builder.field("input_format", input_format);
        builder.finish()
    }
}
60
61impl<R: AsyncRead + Send + Sync + Unpin + 'static> PandocLoader<R> {
62    pub fn new<S: Into<String>>(pandoc_path: S, input_format: S, input: R) -> Self {
63        PandocLoader {
64            pandoc_path: pandoc_path.into(),
65            input_format: input_format.into(),
66            input,
67        }
68    }
69
70    pub fn new_from_reader<S: Into<String>>(input_format: S, input: R) -> Self {
71        PandocLoader::new("pandoc".into(), input_format.into(), input)
72    }
73
74    pub fn with_pandoc_path<S: Into<String>>(mut self, pandoc_path: S) -> Self {
75        self.pandoc_path = pandoc_path.into();
76        self
77    }
78}
79
80impl PandocLoader<BufReader<File>> {
81    pub async fn from_path<P: AsRef<Path>, S: Into<String>>(
82        input_format: S,
83        path: P,
84    ) -> Result<Self, LoaderError> {
85        let file = File::open(path).await?;
86        let reader = BufReader::new(file);
87
88        Ok(Self::new("pandoc".into(), input_format.into(), reader))
89    }
90}
91
92#[async_trait]
93impl<R: AsyncRead + Send + Sync + Unpin + 'static> Loader for PandocLoader<R> {
94    async fn load(
95        mut self,
96    ) -> Result<
97        Pin<Box<dyn Stream<Item = Result<Document, LoaderError>> + Send + 'static>>,
98        LoaderError,
99    > {
100        // echo "# Heading1 \n ## Heading 2 \n this is a markdown" | pandoc -f markdown -t plain
101        // cat test.md | pandoc -f markdown -t plain
102
103        let mut process = Command::new(self.pandoc_path)
104            .arg("-f")
105            .arg(self.input_format)
106            .arg("-t")
107            .arg("plain")
108            .stdin(Stdio::piped())
109            .stdout(Stdio::piped())
110            .spawn()?;
111
112        // safe to unwrap since stdout/stdout has been configured.
113        let mut stdin = process.stdin.take().unwrap();
114        let mut stdout = process.stdout.take().unwrap();
115
116        tokio::spawn(async move {
117            match tokio::io::copy(&mut self.input, &mut stdin).await {
118                Ok(_) => {}
119                Err(e) => {
120                    log::error!("pandoc stdin error: {}", e.to_string());
121                }
122            }
123            stdin.flush().await.unwrap();
124            stdin.shutdown().await.unwrap();
125        });
126
127        let stdout_task = tokio::spawn(async move {
128            let mut buffer = Vec::new();
129            match tokio::io::copy(&mut stdout, &mut buffer).await {
130                Ok(_) => Ok(buffer),
131                Err(e) => Err(e),
132            }
133        });
134
135        let _exit_status = process.wait().await?;
136        let stdout_result = stdout_task.await?.unwrap();
137        let stdout_string = String::from_utf8(stdout_result).map_err(|e| {
138            LoaderError::OtherError(format!("Failed to convert to utf8 string: {}", e))
139        })?;
140
141        let doc = Document::new(stdout_string);
142        let stream = stream::iter(vec![Ok(doc)]);
143        Ok(Box::pin(stream))
144    }
145
146    async fn load_and_split<TS: TextSplitter + 'static>(
147        mut self,
148        splitter: TS,
149    ) -> Result<
150        Pin<Box<dyn Stream<Item = Result<Document, LoaderError>> + Send + 'static>>,
151        LoaderError,
152    > {
153        let doc_stream = self.load().await?;
154        let stream = process_doc_stream(doc_stream, splitter).await;
155        Ok(Box::pin(stream))
156    }
157}
158
#[cfg(test)]
mod tests {
    use futures_util::StreamExt;

    use super::*;

    #[test]
    fn test_input_format_to_string() {
        assert_eq!(InputFormat::Docx.to_string(), "docx");
        assert_eq!(InputFormat::JuypterNotebook.to_string(), "ipynb");
        assert_eq!(InputFormat::RichTextFormat.to_string(), "rtf");
    }

    #[tokio::test]
    async fn test_missing_pandoc_binary_is_an_error() {
        let loader = PandocLoader::new_from_reader("markdown", &b"# Heading"[..])
            .with_pandoc_path("./this-pandoc-binary-does-not-exist");

        assert!(loader.load().await.is_err());
    }

    #[tokio::test]
    async fn test_pandoc_loader() {
        let path = "./src/document_loaders/test_data/sample.docx";

        let loader = PandocLoader::from_path(InputFormat::Docx.to_string(), path)
            .await
            .expect("Failed to create PandocLoader");

        let docs = loader
            .load()
            .await
            .unwrap()
            .map(|d| d.unwrap())
            .collect::<Vec<_>>()
            .await;

        // Check the count first so an empty result fails with a clear message
        // instead of an index-out-of-bounds panic.
        assert_eq!(docs.len(), 1);
        // Only the opening words are checked; `starts_with` avoids slicing at a
        // byte offset that might not fall on a char boundary.
        assert!(
            docs[0].page_content.starts_with("Lorem ipsum dolor sit amet,"),
            "unexpected document content: {:?}",
            docs[0].page_content
        );
    }
}