1use std::fs::File;
6use std::io::{BufRead, BufReader, Seek};
7use std::path::Path;
8
9use chrono::DateTime;
10use serde::Deserialize;
11
12use crate::Message;
13use crate::error::ChatpackError;
14
15use super::{MessageIterator, StreamingConfig, StreamingError, StreamingParser, StreamingResult};
16
17pub struct DiscordStreamingParser {
22 config: StreamingConfig,
23}
24
25impl DiscordStreamingParser {
26 pub fn new() -> Self {
28 Self {
29 config: StreamingConfig::default(),
30 }
31 }
32
33 pub fn with_config(config: StreamingConfig) -> Self {
35 Self { config }
36 }
37
38 fn is_jsonl(first_line: &str) -> bool {
40 let trimmed = first_line.trim();
41 trimmed.starts_with('{')
44 && !trimmed.contains("\"messages\"")
45 && !trimmed.contains("\"guild\"")
46 }
47}
48
49impl Default for DiscordStreamingParser {
50 fn default() -> Self {
51 Self::new()
52 }
53}
54
55impl StreamingParser for DiscordStreamingParser {
56 fn name(&self) -> &'static str {
57 "Discord (Streaming)"
58 }
59
60 fn stream(&self, file_path: &str) -> Result<Box<dyn MessageIterator>, ChatpackError> {
61 let path = Path::new(file_path);
62 let file = File::open(path)?;
63 let file_size = file.metadata()?.len();
64
65 let mut reader = BufReader::with_capacity(self.config.buffer_size, file);
66
67 let mut first_line = String::new();
69 reader.read_line(&mut first_line)?;
70 reader.seek(std::io::SeekFrom::Start(0))?;
71
72 if Self::is_jsonl(&first_line) {
73 let iterator = DiscordJsonlIterator::new(reader, file_size, self.config);
74 Ok(Box::new(iterator))
75 } else {
76 let iterator = DiscordJsonIterator::new(reader, file_size, self.config)?;
78 Ok(Box::new(iterator))
79 }
80 }
81}
82
83pub struct DiscordJsonlIterator<R: BufRead> {
85 reader: R,
86 file_size: u64,
87 bytes_read: u64,
88 config: StreamingConfig,
89 line_buffer: String,
90}
91
92impl<R: BufRead> DiscordJsonlIterator<R> {
93 fn new(reader: R, file_size: u64, config: StreamingConfig) -> Self {
94 Self {
95 reader,
96 file_size,
97 bytes_read: 0,
98 config,
99 line_buffer: String::with_capacity(4096),
100 }
101 }
102
103 fn parse_line(line: &str) -> StreamingResult<Option<Message>> {
104 let trimmed = line.trim();
105 if trimmed.is_empty() {
106 return Ok(None);
107 }
108
109 let msg: DiscordRawMessage = serde_json::from_str(trimmed)?;
110
111 if msg.content.trim().is_empty() {
112 return Ok(None);
113 }
114
115 let sender = msg.author.nickname.unwrap_or(msg.author.name);
116
117 let timestamp = DateTime::parse_from_rfc3339(&msg.timestamp)
118 .ok()
119 .map(|dt| dt.to_utc());
120
121 let edited = msg
122 .timestamp_edited
123 .as_ref()
124 .and_then(|ts| DateTime::parse_from_rfc3339(ts).ok())
125 .map(|dt| dt.to_utc());
126
127 let id = msg.id.parse::<u64>().ok();
128
129 Ok(Some(Message::with_metadata(
130 sender,
131 msg.content,
132 timestamp,
133 id,
134 None,
135 edited,
136 )))
137 }
138}
139
140impl<R: BufRead + Send> MessageIterator for DiscordJsonlIterator<R> {
141 fn progress(&self) -> Option<f64> {
142 if self.file_size == 0 {
143 return None;
144 }
145 Some((self.bytes_read as f64 / self.file_size as f64) * 100.0)
146 }
147
148 fn bytes_processed(&self) -> u64 {
149 self.bytes_read
150 }
151
152 fn total_bytes(&self) -> Option<u64> {
153 Some(self.file_size)
154 }
155}
156
157impl<R: BufRead + Send> Iterator for DiscordJsonlIterator<R> {
158 type Item = StreamingResult<Message>;
159
160 fn next(&mut self) -> Option<Self::Item> {
161 loop {
162 self.line_buffer.clear();
163 match self.reader.read_line(&mut self.line_buffer) {
164 Ok(0) => return None, Ok(n) => {
166 self.bytes_read += n as u64;
167 match Self::parse_line(&self.line_buffer) {
168 Ok(Some(msg)) => return Some(Ok(msg)),
169 Ok(None) => {}
170 Err(_) if self.config.skip_invalid => {}
171 Err(e) => return Some(Err(e)),
172 }
173 }
174 Err(e) => return Some(Err(e.into())),
175 }
176 }
177 }
178}
179
180pub struct DiscordJsonIterator<R: BufRead + Seek> {
182 reader: R,
183 file_size: u64,
184 bytes_read: u64,
185 config: StreamingConfig,
186 buffer: String,
187 finished: bool,
188 brace_depth: i32,
189}
190
191impl<R: BufRead + Seek> DiscordJsonIterator<R> {
192 fn new(mut reader: R, file_size: u64, config: StreamingConfig) -> StreamingResult<Self> {
193 let mut buffer = String::with_capacity(config.buffer_size);
194 let mut total_read = 0u64;
195
196 loop {
198 buffer.clear();
199 let bytes = reader.read_line(&mut buffer)?;
200 if bytes == 0 {
201 return Err(StreamingError::InvalidFormat(
202 "Could not find 'messages' array".into(),
203 ));
204 }
205 total_read += bytes as u64;
206
207 if buffer.contains("\"messages\"") && buffer.contains('[') {
208 break;
209 }
210
211 if total_read > 10 * 1024 * 1024 {
212 return Err(StreamingError::InvalidFormat(
213 "File header too large".into(),
214 ));
215 }
216 }
217
218 Ok(Self {
219 reader,
220 file_size,
221 bytes_read: total_read,
222 config,
223 buffer: String::with_capacity(config.max_message_size),
224 finished: false,
225 brace_depth: 0,
226 })
227 }
228
229 fn read_next_object(&mut self) -> StreamingResult<Option<String>> {
230 self.buffer.clear();
231 self.brace_depth = 0;
232 let mut found_start = false;
233
234 loop {
235 let mut line = String::new();
236 let bytes = self.reader.read_line(&mut line)?;
237
238 if bytes == 0 {
239 self.finished = true;
240 return Ok(None);
241 }
242
243 self.bytes_read += bytes as u64;
244
245 if !found_start && line.trim().starts_with(']') {
246 self.finished = true;
247 return Ok(None);
248 }
249
250 let trimmed = line.trim();
251 if !found_start && (trimmed.is_empty() || trimmed == ",") {
252 continue;
253 }
254
255 for ch in line.chars() {
256 match ch {
257 '{' => {
258 if !found_start {
259 found_start = true;
260 }
261 self.brace_depth += 1;
262 }
263 '}' => self.brace_depth -= 1,
264 _ => {}
265 }
266 }
267
268 if found_start {
269 self.buffer.push_str(&line);
270
271 if self.buffer.len() > self.config.max_message_size {
272 return Err(StreamingError::BufferOverflow {
273 max_size: self.config.max_message_size,
274 actual_size: self.buffer.len(),
275 });
276 }
277
278 if self.brace_depth == 0 {
279 return Ok(Some(self.buffer.trim().trim_end_matches(',').to_string()));
280 }
281 }
282 }
283 }
284
285 fn parse_message(json_str: &str) -> StreamingResult<Option<Message>> {
286 let msg: DiscordRawMessage = serde_json::from_str(json_str)?;
287
288 let content = msg.content;
289
290 if content.trim().is_empty() {
292 return Ok(None);
293 }
294
295 let sender = msg.author.nickname.unwrap_or(msg.author.name);
296
297 let timestamp = DateTime::parse_from_rfc3339(&msg.timestamp)
298 .ok()
299 .map(|dt| dt.to_utc());
300
301 let edited = msg
302 .timestamp_edited
303 .as_ref()
304 .and_then(|ts| DateTime::parse_from_rfc3339(ts).ok())
305 .map(|dt| dt.to_utc());
306
307 let id = msg.id.parse::<u64>().ok();
308
309 let reply_to = msg
310 .reference
311 .and_then(|r| r.message_id)
312 .and_then(|id| id.parse::<u64>().ok());
313
314 Ok(Some(Message::with_metadata(
315 sender, content, timestamp, id, reply_to, edited,
316 )))
317 }
318}
319
320impl<R: BufRead + Seek + Send> MessageIterator for DiscordJsonIterator<R> {
321 fn progress(&self) -> Option<f64> {
322 if self.file_size == 0 {
323 return None;
324 }
325 Some((self.bytes_read as f64 / self.file_size as f64) * 100.0)
326 }
327
328 fn bytes_processed(&self) -> u64 {
329 self.bytes_read
330 }
331
332 fn total_bytes(&self) -> Option<u64> {
333 Some(self.file_size)
334 }
335}
336
337impl<R: BufRead + Seek + Send> Iterator for DiscordJsonIterator<R> {
338 type Item = StreamingResult<Message>;
339
340 fn next(&mut self) -> Option<Self::Item> {
341 if self.finished {
342 return None;
343 }
344
345 loop {
346 match self.read_next_object() {
347 Ok(Some(json_str)) => match Self::parse_message(&json_str) {
348 Ok(Some(msg)) => return Some(Ok(msg)),
349 Ok(None) => {}
350 Err(_) if self.config.skip_invalid => {}
351 Err(e) => return Some(Err(e)),
352 },
353 Ok(None) => return None,
354 Err(_) if self.config.skip_invalid => {}
355 Err(e) => return Some(Err(e)),
356 }
357 }
358 }
359}
360
361#[derive(Debug, Deserialize)]
363#[serde(rename_all = "camelCase")]
364struct DiscordRawMessage {
365 id: String,
366 timestamp: String,
367 timestamp_edited: Option<String>,
368 content: String,
369 author: DiscordAuthor,
370 reference: Option<DiscordReference>,
371}
372
373#[derive(Debug, Deserialize)]
374#[serde(rename_all = "camelCase")]
375struct DiscordAuthor {
376 name: String,
377 nickname: Option<String>,
378}
379
380#[derive(Debug, Deserialize)]
381#[serde(rename_all = "camelCase")]
382struct DiscordReference {
383 message_id: Option<String>,
384}
385
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// In-memory reader type used by every iterator under test.
    type TestReader = Cursor<Vec<u8>>;

    const PARSER_NAME: &str = "Discord (Streaming)";

    /// Builds a JSONL iterator over `input`, reporting its byte length as the file size.
    fn jsonl_iter(input: &str, config: StreamingConfig) -> DiscordJsonlIterator<TestReader> {
        DiscordJsonlIterator::new(
            Cursor::new(input.as_bytes().to_vec()),
            input.len() as u64,
            config,
        )
    }

    /// Builds a JSON-export iterator over `input` with the default config and given size.
    fn json_iter(input: &str, file_size: u64) -> StreamingResult<DiscordJsonIterator<TestReader>> {
        DiscordJsonIterator::new(
            Cursor::new(input.as_bytes().to_vec()),
            file_size,
            StreamingConfig::default(),
        )
    }

    /// Drains an iterator, keeping only successfully parsed messages.
    fn collect_ok(iter: impl Iterator<Item = StreamingResult<Message>>) -> Vec<Message> {
        iter.filter_map(Result::ok).collect()
    }

    /// Parses a single JSONL line.
    fn parse(line: &str) -> StreamingResult<Option<Message>> {
        DiscordJsonlIterator::<TestReader>::parse_line(line)
    }

    // --- parser construction ---

    #[test]
    fn test_parser_new() {
        assert_eq!(DiscordStreamingParser::new().name(), PARSER_NAME);
    }

    #[test]
    fn test_parser_default() {
        assert_eq!(DiscordStreamingParser::default().name(), PARSER_NAME);
    }

    #[test]
    fn test_parser_with_config() {
        let config = StreamingConfig::new()
            .with_buffer_size(128 * 1024)
            .with_skip_invalid(true);
        assert_eq!(DiscordStreamingParser::with_config(config).name(), PARSER_NAME);
    }

    #[test]
    fn test_parser_name() {
        assert_eq!(DiscordStreamingParser::new().name(), PARSER_NAME);
    }

    // --- format detection ---

    #[test]
    fn test_is_jsonl_detection() {
        let jsonl_line =
            r#"{"id":"1","timestamp":"2024-01-01T00:00:00Z","content":"hi","author":{"name":"bob"}}"#;
        let export_header = r#"{"guild":{"id":"123"},"messages":["#;

        assert!(DiscordStreamingParser::is_jsonl(jsonl_line));
        assert!(!DiscordStreamingParser::is_jsonl(export_header));
    }

    #[test]
    fn test_is_jsonl_with_messages_key() {
        assert!(!DiscordStreamingParser::is_jsonl(r#"{"messages":[]}"#));
    }

    #[test]
    fn test_is_jsonl_with_guild_key() {
        assert!(!DiscordStreamingParser::is_jsonl(r#"{"guild":{"id":"123"}}"#));
    }

    #[test]
    fn test_is_jsonl_whitespace() {
        let padded =
            r#" {"id":"1","timestamp":"2024-01-01T00:00:00Z","content":"hi","author":{"name":"bob"}}"#;
        assert!(DiscordStreamingParser::is_jsonl(padded));
    }

    // --- JSONL iterator ---

    #[test]
    fn test_jsonl_iterator_basic() {
        let jsonl = r#"{"id":"1","timestamp":"2024-01-01T00:00:00Z","content":"Hello","author":{"name":"Alice"}}
{"id":"2","timestamp":"2024-01-01T00:01:00Z","content":"Hi there","author":{"name":"Bob"}}"#;
        let mut iter = jsonl_iter(jsonl, StreamingConfig::default());

        let first = iter.next().expect("should have message").expect("parse ok");
        assert_eq!(first.sender, "Alice");
        assert_eq!(first.content, "Hello");

        let second = iter.next().expect("should have message").expect("parse ok");
        assert_eq!(second.sender, "Bob");
        assert_eq!(second.content, "Hi there");

        assert!(iter.next().is_none());
    }

    #[test]
    fn test_jsonl_iterator_with_nickname() {
        let jsonl = r#"{"id":"1","timestamp":"2024-01-01T00:00:00Z","content":"Hello","author":{"name":"alice123","nickname":"Alice"}}"#;
        let mut iter = jsonl_iter(jsonl, StreamingConfig::default());

        let message = iter.next().expect("should have message").expect("parse ok");
        assert_eq!(message.sender, "Alice");
    }

    #[test]
    fn test_jsonl_iterator_skips_empty_content() {
        let jsonl = r#"{"id":"1","timestamp":"2024-01-01T00:00:00Z","content":"Hello","author":{"name":"Alice"}}
{"id":"2","timestamp":"2024-01-01T00:01:00Z","content":" ","author":{"name":"Bob"}}
{"id":"3","timestamp":"2024-01-01T00:02:00Z","content":"World","author":{"name":"Charlie"}}"#;
        let messages = collect_ok(jsonl_iter(jsonl, StreamingConfig::default()));

        assert_eq!(messages.len(), 2);
        assert_eq!(messages[0].sender, "Alice");
        assert_eq!(messages[1].sender, "Charlie");
    }

    #[test]
    fn test_jsonl_iterator_skips_empty_lines() {
        let jsonl = r#"{"id":"1","timestamp":"2024-01-01T00:00:00Z","content":"Hello","author":{"name":"Alice"}}

{"id":"2","timestamp":"2024-01-01T00:01:00Z","content":"World","author":{"name":"Bob"}}"#;
        let messages = collect_ok(jsonl_iter(jsonl, StreamingConfig::default()));

        assert_eq!(messages.len(), 2);
    }

    #[test]
    fn test_jsonl_iterator_with_edited_timestamp() {
        let jsonl = r#"{"id":"1","timestamp":"2024-01-01T00:00:00Z","timestampEdited":"2024-01-01T00:05:00Z","content":"Edited","author":{"name":"Alice"}}"#;
        let mut iter = jsonl_iter(jsonl, StreamingConfig::default());

        let message = iter.next().expect("should have message").expect("parse ok");
        assert!(message.edited.is_some());
    }

    #[test]
    fn test_jsonl_iterator_progress() {
        let jsonl = r#"{"id":"1","timestamp":"2024-01-01T00:00:00Z","content":"Hello","author":{"name":"Alice"}}"#;
        let iter = jsonl_iter(jsonl, StreamingConfig::default());

        assert_eq!(iter.total_bytes(), Some(jsonl.len() as u64));
        assert_eq!(iter.bytes_processed(), 0);
        assert!(iter.progress().is_some());
    }

    #[test]
    fn test_jsonl_iterator_zero_file_size() {
        let iter = jsonl_iter("", StreamingConfig::default());
        assert!(iter.progress().is_none());
    }

    #[test]
    fn test_jsonl_iterator_skip_invalid() {
        let jsonl = r#"{"id":"1","timestamp":"2024-01-01T00:00:00Z","content":"Hello","author":{"name":"Alice"}}
invalid json line
{"id":"2","timestamp":"2024-01-01T00:01:00Z","content":"World","author":{"name":"Bob"}}"#;
        let config = StreamingConfig::new().with_skip_invalid(true);
        let messages = collect_ok(jsonl_iter(jsonl, config));

        assert_eq!(messages.len(), 2);
    }

    #[test]
    fn test_jsonl_iterator_error_on_invalid() {
        let jsonl = r#"{"id":"1","timestamp":"2024-01-01T00:00:00Z","content":"Hello","author":{"name":"Alice"}}
invalid json line"#;
        let config = StreamingConfig::new().with_skip_invalid(false);
        let mut iter = jsonl_iter(jsonl, config);

        let _ = iter.next();
        assert!(matches!(iter.next(), Some(Err(_))));
    }

    // --- JSON export iterator ---

    #[test]
    fn test_json_iterator_basic() {
        let json = r#"{"guild":{"id":"123"},"messages":[
{"id":"1","timestamp":"2024-01-01T00:00:00Z","content":"Hello","author":{"name":"Alice"}},
{"id":"2","timestamp":"2024-01-01T00:01:00Z","content":"Hi","author":{"name":"Bob"}}
]}"#;
        let iter = json_iter(json, json.len() as u64).expect("create iterator");
        let messages = collect_ok(iter);

        assert_eq!(messages.len(), 2);
        assert_eq!(messages[0].sender, "Alice");
        assert_eq!(messages[1].sender, "Bob");
    }

    #[test]
    fn test_json_iterator_with_reference() {
        let json = r#"{"messages":[
{"id":"1","timestamp":"2024-01-01T00:00:00Z","content":"Hello","author":{"name":"Alice"}},
{"id":"2","timestamp":"2024-01-01T00:01:00Z","content":"Reply","author":{"name":"Bob"},"reference":{"messageId":"1"}}
]}"#;
        let iter = json_iter(json, json.len() as u64).expect("create iterator");
        let messages = collect_ok(iter);

        assert_eq!(messages.len(), 2);
        assert_eq!(messages[1].reply_to, Some(1));
    }

    #[test]
    fn test_json_iterator_skips_empty_content() {
        let json = r#"{"messages":[
{"id":"1","timestamp":"2024-01-01T00:00:00Z","content":"Hello","author":{"name":"Alice"}},
{"id":"2","timestamp":"2024-01-01T00:01:00Z","content":"","author":{"name":"Bob"}},
{"id":"3","timestamp":"2024-01-01T00:02:00Z","content":"World","author":{"name":"Charlie"}}
]}"#;
        let iter = json_iter(json, json.len() as u64).expect("create iterator");

        assert_eq!(collect_ok(iter).len(), 2);
    }

    #[test]
    fn test_json_iterator_missing_messages_array() {
        let json = r#"{"guild":{"id":"123"}}"#;
        assert!(json_iter(json, json.len() as u64).is_err());
    }

    #[test]
    fn test_json_iterator_progress() {
        let json = r#"{"messages":[
{"id":"1","timestamp":"2024-01-01T00:00:00Z","content":"Hello","author":{"name":"Alice"}}
]}"#;
        let file_size = json.len() as u64;
        let iter = json_iter(json, file_size).expect("create iterator");

        assert_eq!(iter.total_bytes(), Some(file_size));
        assert!(iter.bytes_processed() > 0);
    }

    #[test]
    fn test_json_iterator_zero_file_size() {
        let json = r#"{"messages":[]}"#;
        let iter = json_iter(json, 0).expect("create iterator");

        assert!(iter.progress().is_none());
    }

    // --- single-line parsing ---

    #[test]
    fn test_parse_line_valid() {
        let line = r#"{"id":"1","timestamp":"2024-01-01T00:00:00Z","content":"Hello","author":{"name":"Alice"}}"#;
        let message = parse(line)
            .expect("valid line should parse")
            .expect("non-empty content should yield a message");
        assert_eq!(message.sender, "Alice");
    }

    #[test]
    fn test_parse_line_empty() {
        assert!(matches!(parse(""), Ok(None)));
    }

    #[test]
    fn test_parse_line_whitespace_only() {
        assert!(matches!(parse(" "), Ok(None)));
    }

    #[test]
    fn test_parse_line_empty_content() {
        let line = r#"{"id":"1","timestamp":"2024-01-01T00:00:00Z","content":"","author":{"name":"Alice"}}"#;
        assert!(matches!(parse(line), Ok(None)));
    }

    #[test]
    fn test_parse_line_invalid_json() {
        assert!(parse("not json").is_err());
    }
}