1use std::path::{Path, PathBuf};
7
8use graphify_security::validate_url;
9use regex::Regex;
10use reqwest::Client;
11use thiserror::Error;
12use tracing::info;
13
/// Errors returned by the ingestion functions and by `save_query_result`.
///
/// The `#[from]` attributes generate `From` impls, which is what lets the
/// functions in this module propagate these underlying errors with `?`.
#[derive(Debug, Error)]
pub enum IngestError {
    /// A `reqwest` failure: sending the request or reading/decoding the
    /// response body.
    #[error("HTTP error: {0}")]
    Http(#[from] reqwest::Error),

    /// A filesystem failure, such as creating the output directory or writing
    /// the resulting file.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// An error from `graphify_security`; presumably raised when
    /// `validate_url` rejects a URL — confirm against that crate.
    #[error("security error: {0}")]
    Security(#[from] graphify_security::SecurityError),

    /// Free-form failure described by a message (not constructed by the
    /// functions visible in this module).
    #[error("ingest error: {0}")]
    Other(String),
}
29
30pub async fn ingest_url(url: &str, output_dir: &Path) -> Result<PathBuf, IngestError> {
35 let validated = validate_url(url)?;
36 let client = Client::new();
37
38 let url_str = validated.as_str();
39 if url_str.contains("arxiv.org") {
40 ingest_arxiv(&client, url_str, output_dir).await
41 } else if url_str.contains("twitter.com") || url_str.contains("x.com") {
42 ingest_tweet(&client, url_str, output_dir).await
43 } else if url_str.ends_with(".pdf") {
44 ingest_pdf(&client, url_str, output_dir).await
45 } else {
46 ingest_webpage(&client, url_str, output_dir).await
47 }
48}
49
50async fn ingest_arxiv(client: &Client, url: &str, out: &Path) -> Result<PathBuf, IngestError> {
52 let abs_url = url.replace("/pdf/", "/abs/");
53
54 let response = client.get(&abs_url).send().await?;
55 let html = response.text().await?;
56
57 let arxiv_id = abs_url
58 .split('/')
59 .next_back()
60 .unwrap_or("unknown")
61 .trim_end_matches(".pdf");
62
63 let title = extract_between(&html, "<title>", "</title>")
64 .unwrap_or_else(|| format!("arXiv:{arxiv_id}"));
65 let title = strip_html_tags(&title).trim().to_string();
66
67 let abstract_text = extract_between(
68 &html,
69 "<blockquote class=\"abstract mathjax\">",
70 "</blockquote>",
71 )
72 .or_else(|| extract_between(&html, "Abstract:</span>", "</blockquote>"))
73 .unwrap_or_default();
74 let abstract_text = strip_html_tags(&abstract_text).trim().to_string();
75
76 let filename = format!("arxiv_{}.md", sanitize_filename(arxiv_id));
77 let path = out.join(&filename);
78 std::fs::create_dir_all(out)?;
79
80 let content = format!(
81 "---\nsource: {url}\ntype: arxiv\narxiv_id: {arxiv_id}\ntitle: \"{title}\"\n---\n\n# {title}\n\n## Abstract\n\n{abstract_text}\n"
82 );
83 std::fs::write(&path, content)?;
84
85 info!("Ingested arXiv paper: {} -> {}", arxiv_id, path.display());
86 Ok(path)
87}
88
89async fn ingest_tweet(client: &Client, url: &str, out: &Path) -> Result<PathBuf, IngestError> {
91 let oembed_url = format!(
92 "https://publish.twitter.com/oembed?url={}&omit_script=true",
93 urlencoding::encode(url)
94 );
95
96 let response = client.get(&oembed_url).send().await?;
97
98 let (author, text) = if response.status().is_success() {
99 let json: serde_json::Value = response.json().await?;
100 let author = json
101 .get("author_name")
102 .and_then(|v| v.as_str())
103 .unwrap_or("unknown")
104 .to_string();
105 let html_content = json
106 .get("html")
107 .and_then(|v| v.as_str())
108 .unwrap_or("")
109 .to_string();
110 let text = strip_html_tags(&html_content);
111 (author, text)
112 } else {
113 ("unknown".to_string(), format!("Tweet from: {url}"))
114 };
115
116 let tweet_id = url
117 .split('/')
118 .next_back()
119 .unwrap_or("unknown")
120 .split('?')
121 .next()
122 .unwrap_or("unknown");
123
124 let filename = format!("tweet_{}.md", sanitize_filename(tweet_id));
125 let path = out.join(&filename);
126 std::fs::create_dir_all(out)?;
127
128 let content = format!(
129 "---\nsource: {}\ntype: tweet\nauthor: \"{}\"\ntweet_id: {}\n---\n\n{}\n",
130 url,
131 author,
132 tweet_id,
133 text.trim()
134 );
135 std::fs::write(&path, content)?;
136
137 info!("Ingested tweet: {} -> {}", tweet_id, path.display());
138 Ok(path)
139}
140
141async fn ingest_pdf(client: &Client, url: &str, out: &Path) -> Result<PathBuf, IngestError> {
143 let response = client.get(url).send().await?;
144 let bytes = response.bytes().await?;
145
146 let filename = url.split('/').next_back().unwrap_or("document.pdf");
147 let filename = if filename.ends_with(".pdf") {
148 filename.to_string()
149 } else {
150 format!("{filename}.pdf")
151 };
152
153 let path = out.join(&filename);
154 std::fs::create_dir_all(out)?;
155 std::fs::write(&path, &bytes)?;
156
157 info!(
158 "Ingested PDF: {} ({} bytes) -> {}",
159 url,
160 bytes.len(),
161 path.display()
162 );
163 Ok(path)
164}
165
166async fn ingest_webpage(client: &Client, url: &str, out: &Path) -> Result<PathBuf, IngestError> {
168 let response = client.get(url).send().await?;
169 let html = response.text().await?;
170
171 let title = extract_between(&html, "<title>", "</title>")
172 .map(|t| strip_html_tags(&t))
173 .unwrap_or_default();
174
175 let text = strip_scripts_and_styles(&html);
176 let text = strip_html_tags(&text);
177 let text = collapse_whitespace(&text);
178
179 let filename = sanitize_filename(url);
180 let path = out.join(format!("{filename}.md"));
181 std::fs::create_dir_all(out)?;
182
183 let content = format!(
184 "---\nsource: {}\ntype: webpage\ntitle: \"{}\"\n---\n\n# {}\n\n{}\n",
185 url,
186 title.trim(),
187 title.trim(),
188 text.trim()
189 );
190 std::fs::write(&path, content)?;
191
192 info!("Ingested webpage: {} -> {}", url, path.display());
193 Ok(path)
194}
195
196pub fn save_query_result(
201 question: &str,
202 answer: &str,
203 memory_dir: &Path,
204 query_type: &str,
205 source_nodes: Option<&[String]>,
206) -> Result<PathBuf, IngestError> {
207 std::fs::create_dir_all(memory_dir)?;
208
209 let timestamp = std::time::SystemTime::now()
210 .duration_since(std::time::UNIX_EPOCH)
211 .unwrap_or_default()
212 .as_secs();
213
214 let filename = format!("{query_type}_{timestamp}.md");
215 let path = memory_dir.join(&filename);
216
217 let nodes_str = source_nodes.map(|n| n.join(", ")).unwrap_or_default();
218
219 let content = format!(
220 "---\ntype: {query_type}\ntimestamp: {timestamp}\nnodes: [{nodes_str}]\n---\n\n## Question\n\n{question}\n\n## Answer\n\n{answer}\n"
221 );
222 std::fs::write(&path, content)?;
223
224 info!("Saved query result: {} -> {}", query_type, path.display());
225 Ok(path)
226}
227
/// Returns the text between the first occurrence of `start` and the first
/// occurrence of `end` after it.
///
/// Returns `None` when either marker is missing. Matching is a plain,
/// case-sensitive substring search; nesting is not considered.
fn extract_between(haystack: &str, start: &str, end: &str) -> Option<String> {
    let (_, tail) = haystack.split_once(start)?;
    let (inner, _) = tail.split_once(end)?;
    Some(inner.to_owned())
}
234
235fn strip_scripts_and_styles(html: &str) -> String {
237 static RE_SCRIPT: std::sync::LazyLock<Regex> = std::sync::LazyLock::new(|| {
238 Regex::new(r"(?is)<script[^>]*>.*?</script>").expect("valid regex")
239 });
240 static RE_STYLE: std::sync::LazyLock<Regex> = std::sync::LazyLock::new(|| {
241 Regex::new(r"(?is)<style[^>]*>.*?</style>").expect("valid regex")
242 });
243 let result = RE_SCRIPT.replace_all(html, "");
244 RE_STYLE.replace_all(&result, "").to_string()
245}
246
247fn strip_html_tags(html: &str) -> String {
249 static RE: std::sync::LazyLock<Regex> =
250 std::sync::LazyLock::new(|| Regex::new(r"<[^>]+>").expect("valid regex"));
251 RE.replace_all(html, "").to_string()
252}
253
254fn collapse_whitespace(text: &str) -> String {
256 static RE_WS: std::sync::LazyLock<Regex> =
257 std::sync::LazyLock::new(|| Regex::new(r"[ \t]+").expect("valid regex"));
258 static RE_NL: std::sync::LazyLock<Regex> =
259 std::sync::LazyLock::new(|| Regex::new(r"\n{3,}").expect("valid regex"));
260 let result = RE_WS.replace_all(text, " ");
261 RE_NL.replace_all(&result, "\n\n").to_string()
262}
263
/// Turns an arbitrary string (typically a URL) into a conservative file-name
/// stem.
///
/// The transformation is:
/// - every `https://` and then every `http://` occurrence is removed;
/// - `/ ? & = #` and spaces become `_`;
/// - any other character that is not alphanumeric, `_`, `-` or `.` is dropped;
/// - the result is truncated to at most 80 characters (not bytes).
fn sanitize_filename(input: &str) -> String {
    let without_scheme = input.replace("https://", "").replace("http://", "");
    without_scheme
        .chars()
        .filter_map(|ch| match ch {
            '/' | '?' | '&' | '=' | '#' | ' ' => Some('_'),
            _ if ch.is_alphanumeric() || matches!(ch, '_' | '-' | '.') => Some(ch),
            _ => None,
        })
        .take(80)
        .collect()
}
275
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_strip_html_tags() {
        let cases = [
            ("<p>Hello <b>world</b></p>", "Hello world"),
            ("no tags", "no tags"),
            ("<br/>", ""),
        ];
        for (input, expected) in cases {
            assert_eq!(strip_html_tags(input), expected, "input: {input:?}");
        }
    }

    #[test]
    fn test_strip_scripts_and_styles() {
        let cases = [
            (
                "<p>Before</p><script>alert(1)</script><p>After</p>",
                "<p>Before</p><p>After</p>",
            ),
            ("<style>.x{color:red}</style><p>Content</p>", "<p>Content</p>"),
        ];
        for (html, expected) in cases {
            assert_eq!(strip_scripts_and_styles(html), expected, "input: {html:?}");
        }
    }

    #[test]
    fn test_sanitize_filename() {
        let cases = [
            ("https://example.com/page?q=1", "example.com_page_q_1"),
            ("simple", "simple"),
        ];
        for (input, expected) in cases {
            assert_eq!(sanitize_filename(input), expected, "input: {input:?}");
        }
    }

    #[test]
    fn test_sanitize_filename_max_length() {
        let sanitized = sanitize_filename(&"a".repeat(200));
        assert!(sanitized.len() <= 80);
    }

    #[test]
    fn test_extract_between() {
        let found = extract_between("<title>Hello</title>", "<title>", "</title>");
        assert_eq!(found, Some("Hello".to_string()));

        let missing = extract_between("no markers", "<a>", "</a>");
        assert_eq!(missing, None);
    }

    #[test]
    fn test_collapse_whitespace() {
        let cases = [("a b c", "a b c"), ("a\n\n\n\nb", "a\n\nb")];
        for (input, expected) in cases {
            assert_eq!(collapse_whitespace(input), expected, "input: {input:?}");
        }
    }

    #[test]
    fn test_save_query_result() {
        let tmp = tempfile::tempdir().unwrap();
        let nodes = ["node1".to_string(), "node2".to_string()];
        let path = save_query_result(
            "What is Rust?",
            "A systems programming language.",
            tmp.path(),
            "query",
            Some(nodes.as_slice()),
        )
        .unwrap();

        assert!(path.exists());
        let content = std::fs::read_to_string(&path).unwrap();
        for needle in [
            "What is Rust?",
            "systems programming language",
            "node1, node2",
            "type: query",
        ] {
            assert!(content.contains(needle), "missing {needle:?} in:\n{content}");
        }
    }

    #[test]
    fn test_save_query_result_no_nodes() {
        let tmp = tempfile::tempdir().unwrap();
        let path = save_query_result("question", "answer", tmp.path(), "chat", None).unwrap();

        let content = std::fs::read_to_string(&path).unwrap();
        assert!(content.contains("nodes: []"));
    }
}
354}