langchain_rust/document_loaders/pandoc_loader/
pandoc_loader.rs1use std::{fmt, path::Path, pin::Pin, process::Stdio};
2
3use async_trait::async_trait;
4use futures_util::{stream, Stream};
5use tokio::{
6 fs::File,
7 io::{AsyncRead, AsyncWriteExt, BufReader},
8 process::Command,
9};
10
11use crate::{
12 document_loaders::{process_doc_stream, Loader, LoaderError},
13 schemas::Document,
14 text_splitter::TextSplitter,
15};
16
/// Input document formats understood by this loader.
///
/// Formatting a variant with [`fmt::Display`] (and therefore calling
/// `to_string()`) yields the format identifier that is handed to pandoc's
/// `-f` option, e.g. `InputFormat::Docx` becomes `"docx"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InputFormat {
    /// Rendered as `docx`.
    Docx,
    /// Rendered as `epub`.
    Epub,
    /// Rendered as `html`.
    Html,
    /// Rendered as `ipynb`.
    ///
    /// The variant name is misspelled ("Juypter"); it is kept as-is so that
    /// existing callers keep compiling.
    JuypterNotebook,
    /// Rendered as `markdown`.
    Markdown,
    /// Rendered as `mediawiki`.
    MediaWiki,
    /// Rendered as `rtf`.
    RichTextFormat,
    /// Rendered as `typst`.
    Typst,
    /// Rendered as `vimwiki`.
    VimWiki,
}

// `Display` is implemented instead of `ToString`: the standard library's
// blanket `impl<T: Display> ToString for T` still provides `to_string()`,
// and the type additionally becomes usable with `{}` in format strings.
impl fmt::Display for InputFormat {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            InputFormat::Docx => "docx",
            InputFormat::Epub => "epub",
            InputFormat::Html => "html",
            InputFormat::JuypterNotebook => "ipynb",
            InputFormat::MediaWiki => "mediawiki",
            InputFormat::Markdown => "markdown",
            InputFormat::RichTextFormat => "rtf",
            InputFormat::Typst => "typst",
            InputFormat::VimWiki => "vimwiki",
        };
        // `pad` honours width/alignment flags such as `{:>10}`.
        f.pad(name)
    }
}
45
/// Document loader that feeds an input reader through a pandoc process.
pub struct PandocLoader<R> {
    /// Program name or path used to launch pandoc.
    pandoc_path: String,
    /// Value passed to pandoc's `-f` (input format) option.
    input_format: String,
    /// Reader supplying the raw document bytes.
    input: R,
}

// Written by hand so that `R` does not have to implement `Debug`.
impl<R> fmt::Debug for PandocLoader<R> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("PandocLoader")
            .field("pandoc_path", &self.pandoc_path)
            .field("input_format", &self.input_format)
            // `input` is deliberately left out; the trailing `..` emitted by
            // `finish_non_exhaustive` makes that omission visible.
            .finish_non_exhaustive()
    }
}
60
61impl<R: AsyncRead + Send + Sync + Unpin + 'static> PandocLoader<R> {
62 pub fn new<S: Into<String>>(pandoc_path: S, input_format: S, input: R) -> Self {
63 PandocLoader {
64 pandoc_path: pandoc_path.into(),
65 input_format: input_format.into(),
66 input,
67 }
68 }
69
70 pub fn new_from_reader<S: Into<String>>(input_format: S, input: R) -> Self {
71 PandocLoader::new("pandoc".into(), input_format.into(), input)
72 }
73
74 pub fn with_pandoc_path<S: Into<String>>(mut self, pandoc_path: S) -> Self {
75 self.pandoc_path = pandoc_path.into();
76 self
77 }
78}
79
80impl PandocLoader<BufReader<File>> {
81 pub async fn from_path<P: AsRef<Path>, S: Into<String>>(
82 input_format: S,
83 path: P,
84 ) -> Result<Self, LoaderError> {
85 let file = File::open(path).await?;
86 let reader = BufReader::new(file);
87
88 Ok(Self::new("pandoc".into(), input_format.into(), reader))
89 }
90}
91
92#[async_trait]
93impl<R: AsyncRead + Send + Sync + Unpin + 'static> Loader for PandocLoader<R> {
94 async fn load(
95 mut self,
96 ) -> Result<
97 Pin<Box<dyn Stream<Item = Result<Document, LoaderError>> + Send + 'static>>,
98 LoaderError,
99 > {
100 let mut process = Command::new(self.pandoc_path)
104 .arg("-f")
105 .arg(self.input_format)
106 .arg("-t")
107 .arg("plain")
108 .stdin(Stdio::piped())
109 .stdout(Stdio::piped())
110 .spawn()?;
111
112 let mut stdin = process.stdin.take().unwrap();
114 let mut stdout = process.stdout.take().unwrap();
115
116 tokio::spawn(async move {
117 match tokio::io::copy(&mut self.input, &mut stdin).await {
118 Ok(_) => {}
119 Err(e) => {
120 log::error!("pandoc stdin error: {}", e.to_string());
121 }
122 }
123 stdin.flush().await.unwrap();
124 stdin.shutdown().await.unwrap();
125 });
126
127 let stdout_task = tokio::spawn(async move {
128 let mut buffer = Vec::new();
129 match tokio::io::copy(&mut stdout, &mut buffer).await {
130 Ok(_) => Ok(buffer),
131 Err(e) => Err(e),
132 }
133 });
134
135 let _exit_status = process.wait().await?;
136 let stdout_result = stdout_task.await?.unwrap();
137 let stdout_string = String::from_utf8(stdout_result).map_err(|e| {
138 LoaderError::OtherError(format!("Failed to convert to utf8 string: {}", e))
139 })?;
140
141 let doc = Document::new(stdout_string);
142 let stream = stream::iter(vec![Ok(doc)]);
143 Ok(Box::pin(stream))
144 }
145
146 async fn load_and_split<TS: TextSplitter + 'static>(
147 mut self,
148 splitter: TS,
149 ) -> Result<
150 Pin<Box<dyn Stream<Item = Result<Document, LoaderError>> + Send + 'static>>,
151 LoaderError,
152 > {
153 let doc_stream = self.load().await?;
154 let stream = process_doc_stream(doc_stream, splitter).await;
155 Ok(Box::pin(stream))
156 }
157}
158
#[cfg(test)]
mod tests {
    use futures_util::StreamExt;

    use super::*;

    /// Converts the bundled sample `.docx` and checks the start of the
    /// extracted text. The loader launches the executable named `pandoc`,
    /// so it must be available for this test to pass.
    #[tokio::test]
    async fn test_pandoc_loader() {
        let path = "./src/document_loaders/test_data/sample.docx";

        let loader = PandocLoader::from_path(InputFormat::Docx.to_string(), path)
            .await
            .expect("Failed to create PandocLoader");

        let docs = loader
            .load()
            .await
            .expect("Failed to load document through pandoc")
            .map(|d| d.expect("Document stream yielded an error"))
            .collect::<Vec<_>>()
            .await;

        // Check the count first so that an empty result fails with a clear
        // message instead of an index-out-of-bounds panic.
        assert_eq!(docs.len(), 1);
        assert!(
            docs[0]
                .page_content
                .starts_with("Lorem ipsum dolor sit amet,"),
            "unexpected document start: {:?}",
            docs[0].page_content.chars().take(40).collect::<String>()
        );
    }
}