1use crate::constants;
4use crate::format::format_std_duration_ms;
5use crate::operations::Operation;
6use anyhow::{Result, bail};
7use chrono::{DateTime, Utc};
8use log::{debug, info};
9use std::collections::{HashMap, HashSet};
10use std::fs::{self, File, OpenOptions};
11use std::io::{BufRead, BufReader, BufWriter, Write};
12use std::path::{Path, PathBuf};
13
14pub struct Mempool {
17 operations: Vec<Operation>,
18 did_index: HashMap<String, Vec<usize>>, target_bundle: u32,
21 min_timestamp: DateTime<Utc>,
22 file: PathBuf,
23 validated: bool,
24 dirty: bool,
25 verbose: bool,
26
27 last_saved_len: usize,
29 last_save_time: std::time::Instant,
30 save_threshold: usize,
31 save_interval: std::time::Duration,
32}
33
34impl Mempool {
35 pub fn new(
37 bundle_dir: &Path,
38 target_bundle: u32,
39 min_timestamp: DateTime<Utc>,
40 verbose: bool,
41 ) -> Result<Self> {
42 let filename = format!(
43 "{}{:06}.jsonl",
44 constants::MEMPOOL_FILE_PREFIX,
45 target_bundle
46 );
47 let file = bundle_dir.join(filename);
48
49 let mut mempool = Self {
50 operations: Vec::new(),
51 did_index: HashMap::new(),
52 target_bundle,
53 min_timestamp,
54 file,
55 validated: false,
56 dirty: false,
57 verbose,
58 last_saved_len: 0,
59 last_save_time: std::time::Instant::now(),
60 save_threshold: 100,
61 save_interval: std::time::Duration::from_secs(15),
62 };
63
64 if mempool.file.exists() {
66 mempool.load()?;
67 }
68
69 Ok(mempool)
70 }
71
72 pub fn add(&mut self, ops: Vec<Operation>) -> Result<usize> {
74 if ops.is_empty() {
75 return Ok(0);
76 }
77
78 let mut existing_cids: HashSet<String> = self
80 .operations
81 .iter()
82 .filter_map(|op| op.cid.clone())
83 .collect();
84
85 let mut new_ops = Vec::new();
86 let total_in = ops.len();
87 let mut skipped_no_cid = 0usize;
88 let mut skipped_dupe = 0usize;
89
90 let mut last_time = if !self.operations.is_empty() {
92 self.parse_timestamp(&self.operations.last().unwrap().created_at)?
93 } else {
94 self.min_timestamp
95 };
96
97 let start_add = std::time::Instant::now();
98 let mut first_added_time: Option<DateTime<Utc>> = None;
99 let mut last_added_time: Option<DateTime<Utc>> = None;
100 for op in ops {
101 let cid = match &op.cid {
103 Some(c) => c,
104 None => {
105 skipped_no_cid += 1;
106 continue;
107 }
108 };
109
110 if existing_cids.contains(cid) {
112 skipped_dupe += 1;
113 continue;
114 }
115
116 let op_time = self.parse_timestamp(&op.created_at)?;
117
118 if op_time < last_time {
120 bail!(
121 "chronological violation: operation {} at {} is before {}",
122 cid,
123 op.created_at,
124 last_time.to_rfc3339()
125 );
126 }
127
128 if op_time < self.min_timestamp {
130 bail!(
131 "operation {} at {} is before minimum timestamp {} (belongs in earlier bundle)",
132 cid,
133 op.created_at,
134 self.min_timestamp.to_rfc3339()
135 );
136 }
137
138 new_ops.push(op.clone());
139 existing_cids.insert(cid.clone());
140 last_time = op_time;
141 if first_added_time.is_none() {
142 first_added_time = Some(op_time);
143 }
144 last_added_time = Some(op_time);
145 }
146
147 let added = new_ops.len();
148
149 let start_idx = self.operations.len();
151 self.operations.extend(new_ops);
152
153 for (offset, op) in self.operations[start_idx..].iter().enumerate() {
155 let idx = start_idx + offset;
156 self.did_index.entry(op.did.clone()).or_default().push(idx);
157 }
158
159 self.validated = true;
160 self.dirty = true;
161
162 if self.verbose {
163 let dur = start_add.elapsed();
164 info!(
165 "mempool add: +{} unique from {} ({} no-cid, {} dupes) in {} • total {}",
166 added,
167 total_in,
168 skipped_no_cid,
169 skipped_dupe,
170 format_std_duration_ms(dur),
171 self.operations.len()
172 );
173 if let (Some(f), Some(l)) = (first_added_time, last_added_time) {
174 debug!(
175 "mempool add range: {} → {}",
176 f.format("%Y-%m-%d %H:%M:%S"),
177 l.format("%Y-%m-%d %H:%M:%S")
178 );
179 }
180 if added == 0 && (skipped_no_cid > 0 || skipped_dupe > 0) {
181 debug!("mempool add made no progress");
182 }
183 }
184
185 Ok(added)
186 }
187
188 pub fn validate(&self) -> Result<()> {
190 if self.operations.is_empty() {
191 return Ok(());
192 }
193
194 for (i, op) in self.operations.iter().enumerate() {
196 let op_time = self.parse_timestamp(&op.created_at)?;
197 if op_time < self.min_timestamp {
198 bail!(
199 "operation {} (CID: {:?}) at {} is before minimum timestamp {}",
200 i,
201 op.cid,
202 op.created_at,
203 self.min_timestamp.to_rfc3339()
204 );
205 }
206 }
207
208 for i in 1..self.operations.len() {
210 let prev = &self.operations[i - 1];
211 let curr = &self.operations[i];
212
213 let prev_time = self.parse_timestamp(&prev.created_at)?;
214 let curr_time = self.parse_timestamp(&curr.created_at)?;
215
216 if curr_time < prev_time {
217 bail!(
218 "chronological violation at index {}: {:?} ({}) is before {:?} ({})",
219 i,
220 curr.cid,
221 curr.created_at,
222 prev.cid,
223 prev.created_at
224 );
225 }
226 }
227
228 let mut cid_map: HashMap<String, usize> = HashMap::new();
230 for (i, op) in self.operations.iter().enumerate() {
231 if let Some(cid) = &op.cid {
232 if let Some(prev_idx) = cid_map.get(cid) {
233 bail!("duplicate CID {} at indices {} and {}", cid, prev_idx, i);
234 }
235 cid_map.insert(cid.clone(), i);
236 }
237 }
238
239 Ok(())
240 }
241
242 pub fn count(&self) -> usize {
244 self.operations.len()
245 }
246
247 pub fn take(&mut self, n: usize) -> Result<Vec<Operation>> {
249 self.validate_locked()?;
251
252 let take_count = n.min(self.operations.len());
253
254 let result: Vec<Operation> = self.operations.drain(..take_count).collect();
255
256 self.rebuild_did_index();
258
259 self.last_saved_len = 0;
263 self.last_save_time = std::time::Instant::now();
264
265 self.dirty = !self.operations.is_empty();
267 self.validated = false;
268
269 Ok(result)
270 }
271
272 fn validate_locked(&mut self) -> Result<()> {
274 if self.validated {
275 return Ok(());
276 }
277
278 if self.operations.is_empty() {
279 return Ok(());
280 }
281
282 let mut last_time = self.min_timestamp;
284 for (i, op) in self.operations.iter().enumerate() {
285 let op_time = self.parse_timestamp(&op.created_at)?;
286 if op_time < last_time {
287 bail!(
288 "chronological violation at index {}: {} is before {}",
289 i,
290 op.created_at,
291 last_time.to_rfc3339()
292 );
293 }
294 last_time = op_time;
295 }
296
297 self.validated = true;
298 Ok(())
299 }
300
301 pub fn peek(&self, n: usize) -> Vec<Operation> {
303 let count = n.min(self.operations.len());
304 self.operations[..count].to_vec()
305 }
306
307 pub fn clear(&mut self) {
309 let prev = self.operations.len();
310 self.operations.clear();
311 self.did_index.clear();
312 self.validated = false;
313 self.dirty = true;
314 if self.verbose {
315 info!("mempool clear: removed {} ops", prev);
316 }
317 }
318
319 pub fn should_save(&self) -> bool {
321 if !self.dirty {
322 return false;
323 }
324
325 let new_ops = self.operations.len().saturating_sub(self.last_saved_len);
326 let time_since_last_save = self.last_save_time.elapsed();
327
328 new_ops >= self.save_threshold || time_since_last_save >= self.save_interval
329 }
330
331 pub fn save_if_needed(&mut self) -> Result<()> {
333 if !self.should_save() {
334 return Ok(());
335 }
336 self.save()
337 }
338
339 pub fn save(&mut self) -> Result<()> {
341 if !self.dirty {
342 return Ok(());
343 }
344
345 if self.operations.is_empty() {
346 if self.file.exists() {
348 fs::remove_file(&self.file)?;
349 }
350 self.last_saved_len = 0;
351 self.last_save_time = std::time::Instant::now();
352 self.dirty = false;
353 return Ok(());
354 }
355
356 self.validate_locked()?;
358
359 if self.last_saved_len > self.operations.len() {
361 if self.verbose {
362 eprintln!(
363 "Warning: lastSavedLen ({}) > operations ({}), resetting to 0",
364 self.last_saved_len,
365 self.operations.len()
366 );
367 }
368 self.last_saved_len = 0;
369 }
370
371 let new_ops = &self.operations[self.last_saved_len..];
373
374 if new_ops.is_empty() {
375 self.dirty = false;
377 return Ok(());
378 }
379
380 let file = OpenOptions::new()
382 .create(true)
383 .append(true)
384 .open(&self.file)?;
385
386 let mut writer = BufWriter::new(file);
387
388 let mut bytes_written = 0usize;
393 let mut appended = 0usize;
394 for op in new_ops {
395 let json = if let Some(ref raw) = op.raw_json {
396 raw.clone()
397 } else {
398 sonic_rs::to_string(op)?
401 };
402 writeln!(writer, "{}", json)?;
403 bytes_written += json.len() + 1;
404 appended += 1;
405 }
406
407 writer.flush()?;
408
409 let file = writer.into_inner()?;
411 file.sync_all()?;
412
413 self.last_saved_len = self.operations.len();
414 self.last_save_time = std::time::Instant::now();
415 self.dirty = false;
416
417 if self.verbose {
418 info!(
419 "mempool save: appended {} ops ({} bytes) to {}",
420 appended,
421 bytes_written,
422 self.get_filename()
423 );
424 }
425
426 Ok(())
427 }
428
429 pub fn load(&mut self) -> Result<()> {
431 let start = std::time::Instant::now();
432 let file = File::open(&self.file)?;
433 let reader = BufReader::new(file);
434
435 self.operations.clear();
436 self.did_index.clear();
437
438 for line in reader.lines() {
439 let line = line?;
440 if line.trim().is_empty() {
441 continue;
442 }
443
444 let op = Operation::from_json(&line)?;
450 let idx = self.operations.len();
451 self.operations.push(op);
452
453 let did = self.operations[idx].did.clone();
455 self.did_index.entry(did).or_default().push(idx);
456 }
457
458 self.validate_locked()?;
460
461 self.last_saved_len = self.operations.len();
463 self.last_save_time = std::time::Instant::now();
464 self.dirty = false;
465
466 if self.verbose {
467 info!(
468 "mempool load: {} ops for bundle {:06} in {}",
469 self.operations.len(),
470 self.target_bundle,
471 format_std_duration_ms(start.elapsed())
472 );
473 }
474
475 Ok(())
476 }
477
478 pub fn get_first_time(&self) -> Option<String> {
480 self.operations.first().map(|op| op.created_at.clone())
481 }
482
483 pub fn get_last_time(&self) -> Option<String> {
485 self.operations.last().map(|op| op.created_at.clone())
486 }
487
488 pub fn get_target_bundle(&self) -> u32 {
490 self.target_bundle
491 }
492
493 pub fn get_min_timestamp(&self) -> DateTime<Utc> {
495 self.min_timestamp
496 }
497
498 pub fn stats(&self) -> MempoolStats {
500 let count = self.operations.len();
501
502 let mut stats = MempoolStats {
503 count,
504 can_create_bundle: count >= constants::BUNDLE_SIZE,
505 target_bundle: self.target_bundle,
506 min_timestamp: self.min_timestamp,
507 validated: self.validated,
508 first_time: None,
509 last_time: None,
510 size_bytes: None,
511 did_count: None,
512 };
513
514 if count > 0 {
515 stats.first_time = self
516 .operations
517 .first()
518 .and_then(|op| self.parse_timestamp(&op.created_at).ok());
519 stats.last_time = self
520 .operations
521 .last()
522 .and_then(|op| self.parse_timestamp(&op.created_at).ok());
523
524 let mut total_size = 0;
525 for op in &self.operations {
526 if let Ok(json) = serde_json::to_string(op) {
527 total_size += json.len();
528 }
529 }
530
531 stats.size_bytes = Some(total_size);
532 stats.did_count = Some(self.did_index.len());
533 }
534
535 stats
536 }
537
538 pub fn delete(&self) -> Result<()> {
540 if self.file.exists() {
541 fs::remove_file(&self.file)?;
542 }
543 Ok(())
544 }
545
546 pub fn get_filename(&self) -> String {
548 self.file
549 .file_name()
550 .and_then(|n| n.to_str())
551 .unwrap_or("")
552 .to_string()
553 }
554
555 pub fn find_did_operations(&self, did: &str) -> Vec<Operation> {
558 if let Some(indices) = self.did_index.get(did) {
559 indices
561 .iter()
562 .map(|&idx| self.operations[idx].clone())
563 .collect()
564 } else {
565 Vec::new()
567 }
568 }
569
570 fn rebuild_did_index(&mut self) {
572 self.did_index.clear();
573 for (idx, op) in self.operations.iter().enumerate() {
574 self.did_index.entry(op.did.clone()).or_default().push(idx);
575 }
576 }
577
578 pub fn find_latest_did_operation(&self, did: &str) -> Option<(Operation, usize)> {
582 if let Some(indices) = self.did_index.get(did) {
583 indices.iter().rev().find_map(|&idx| {
586 let op = &self.operations[idx];
587 if !op.nullified {
588 Some((op.clone(), idx))
589 } else {
590 None
591 }
592 })
593 } else {
594 None
595 }
596 }
597
598 pub fn get_operations(&self) -> &[Operation] {
600 &self.operations
601 }
602
603 fn parse_timestamp(&self, s: &str) -> Result<DateTime<Utc>> {
605 Ok(DateTime::parse_from_rfc3339(s)?.with_timezone(&Utc))
606 }
607}
608
609#[derive(Debug, Clone)]
610pub struct MempoolStats {
611 pub count: usize,
612 pub can_create_bundle: bool,
613 pub target_bundle: u32,
614 pub min_timestamp: DateTime<Utc>,
615 pub validated: bool,
616 pub first_time: Option<DateTime<Utc>>,
617 pub last_time: Option<DateTime<Utc>>,
618 pub size_bytes: Option<usize>,
619 pub did_count: Option<usize>,
620}
621
#[cfg(test)]
mod tests {
    use super::*;
    use crate::operations::Operation;
    use sonic_rs::Value;
    use tempfile::TempDir;

    /// Lower bound used by every test mempool: 2024-01-01T00:00:00Z.
    fn base_time() -> DateTime<Utc> {
        DateTime::parse_from_rfc3339("2024-01-01T00:00:00Z")
            .unwrap()
            .with_timezone(&Utc)
    }

    /// Opens a fresh, non-verbose mempool for `bundle` inside `dir`.
    fn open_mempool(dir: &TempDir, bundle: u32) -> Mempool {
        Mempool::new(dir.path(), bundle, base_time(), false).unwrap()
    }

    /// Builds a minimal, non-nullified operation with empty payloads.
    fn make_op(did: &str, cid: &str, created_at: &str) -> Operation {
        Operation {
            did: did.to_string(),
            operation: Value::new(),
            cid: Some(cid.to_string()),
            nullified: false,
            created_at: created_at.to_string(),
            extra: Value::new(),
            raw_json: None,
        }
    }

    #[test]
    fn test_mempool_count() {
        let dir = TempDir::new().unwrap();
        let mut pool = open_mempool(&dir, 1);
        assert_eq!(pool.count(), 0);

        pool.add(vec![
            make_op("did:plc:test1", "cid1", "2024-01-01T00:00:01Z"),
            make_op("did:plc:test2", "cid2", "2024-01-01T00:00:02Z"),
        ])
        .unwrap();
        assert_eq!(pool.count(), 2);
    }

    #[test]
    fn test_mempool_get_target_bundle() {
        let dir = TempDir::new().unwrap();
        let pool = open_mempool(&dir, 42);
        assert_eq!(pool.get_target_bundle(), 42);
    }

    #[test]
    fn test_mempool_get_min_timestamp() {
        let dir = TempDir::new().unwrap();
        let pool = open_mempool(&dir, 1);
        assert_eq!(pool.get_min_timestamp(), base_time());
    }

    #[test]
    fn test_mempool_get_first_time() {
        let dir = TempDir::new().unwrap();
        let mut pool = open_mempool(&dir, 1);
        assert_eq!(pool.get_first_time(), None);

        pool.add(vec![make_op("did:plc:test1", "cid1", "2024-01-01T00:00:01Z")])
            .unwrap();
        assert_eq!(
            pool.get_first_time(),
            Some("2024-01-01T00:00:01Z".to_string())
        );
    }

    #[test]
    fn test_mempool_get_last_time() {
        let dir = TempDir::new().unwrap();
        let mut pool = open_mempool(&dir, 1);
        assert_eq!(pool.get_last_time(), None);

        pool.add(vec![
            make_op("did:plc:test1", "cid1", "2024-01-01T00:00:01Z"),
            make_op("did:plc:test2", "cid2", "2024-01-01T00:00:02Z"),
        ])
        .unwrap();
        assert_eq!(
            pool.get_last_time(),
            Some("2024-01-01T00:00:02Z".to_string())
        );
    }

    #[test]
    fn test_mempool_peek() {
        let dir = TempDir::new().unwrap();
        let mut pool = open_mempool(&dir, 1);
        pool.add(vec![
            make_op("did:plc:test1", "cid1", "2024-01-01T00:00:01Z"),
            make_op("did:plc:test2", "cid2", "2024-01-01T00:00:02Z"),
            make_op("did:plc:test3", "cid3", "2024-01-01T00:00:03Z"),
        ])
        .unwrap();

        let front = pool.peek(2);
        assert_eq!(front.len(), 2);
        assert_eq!(front[0].cid, Some("cid1".to_string()));
        assert_eq!(front[1].cid, Some("cid2".to_string()));

        // Peeking must not consume anything.
        assert_eq!(pool.count(), 3);
    }

    #[test]
    fn test_mempool_peek_more_than_available() {
        let dir = TempDir::new().unwrap();
        let mut pool = open_mempool(&dir, 1);
        pool.add(vec![make_op("did:plc:test1", "cid1", "2024-01-01T00:00:01Z")])
            .unwrap();

        assert_eq!(pool.peek(10).len(), 1);
    }

    #[test]
    fn test_mempool_clear() {
        let dir = TempDir::new().unwrap();
        let mut pool = open_mempool(&dir, 1);
        pool.add(vec![
            make_op("did:plc:test1", "cid1", "2024-01-01T00:00:01Z"),
            make_op("did:plc:test2", "cid2", "2024-01-01T00:00:02Z"),
        ])
        .unwrap();
        assert_eq!(pool.count(), 2);

        pool.clear();
        assert_eq!(pool.count(), 0);
    }

    #[test]
    fn test_mempool_find_did_operations() {
        let dir = TempDir::new().unwrap();
        let mut pool = open_mempool(&dir, 1);
        pool.add(vec![
            make_op("did:plc:test1", "cid1", "2024-01-01T00:00:01Z"),
            make_op("did:plc:test1", "cid2", "2024-01-01T00:00:02Z"),
            make_op("did:plc:test2", "cid3", "2024-01-01T00:00:03Z"),
        ])
        .unwrap();

        let first_did = pool.find_did_operations("did:plc:test1");
        assert_eq!(first_did.len(), 2);
        assert_eq!(first_did[0].cid, Some("cid1".to_string()));
        assert_eq!(first_did[1].cid, Some("cid2".to_string()));

        let second_did = pool.find_did_operations("did:plc:test2");
        assert_eq!(second_did.len(), 1);
        assert_eq!(second_did[0].cid, Some("cid3".to_string()));

        assert!(pool.find_did_operations("did:plc:nonexistent").is_empty());
    }

    #[test]
    fn test_mempool_stats_empty() {
        let dir = TempDir::new().unwrap();
        let pool = open_mempool(&dir, 1);

        let stats = pool.stats();
        assert_eq!(stats.count, 0);
        assert!(!stats.can_create_bundle);
        assert_eq!(stats.target_bundle, 1);
    }

    #[test]
    fn test_mempool_stats_with_operations() {
        let dir = TempDir::new().unwrap();
        let mut pool = open_mempool(&dir, 1);
        pool.add(vec![
            make_op("did:plc:test1", "cid1", "2024-01-01T00:00:01Z"),
            make_op("did:plc:test2", "cid2", "2024-01-01T00:00:02Z"),
        ])
        .unwrap();

        let stats = pool.stats();
        assert_eq!(stats.count, 2);
        assert!(!stats.can_create_bundle);
        assert_eq!(stats.target_bundle, 1);
        assert!(stats.first_time.is_some());
        assert!(stats.last_time.is_some());
        assert_eq!(stats.did_count, Some(2));
    }

    #[test]
    fn test_mempool_get_filename() {
        let dir = TempDir::new().unwrap();
        let pool = open_mempool(&dir, 42);

        let name = pool.get_filename();
        assert!(name.contains("plc_mempool_"));
        assert!(name.contains("000042"));
        assert!(name.ends_with(".jsonl"));
    }
}