plcbundle/
mempool.rs

1//! Persistent pre-bundle operation store with strict chronological validation, CID deduplication, incremental saving, and fast DID lookups
2// src/mempool.rs
use crate::constants;
use crate::format::format_std_duration_ms;
use crate::operations::Operation;
use anyhow::{Context, Result, bail};
use chrono::{DateTime, Utc};
use log::{debug, info};
use std::collections::{HashMap, HashSet};
use std::fs::{self, File, OpenOptions};
use std::io::{BufRead, BufReader, BufWriter, Write};
use std::path::{Path, PathBuf};
13
14/// Mempool stores operations waiting to be bundled
15/// Operations must be strictly chronological
16pub struct Mempool {
17    operations: Vec<Operation>,
18    // Index by DID for fast lookups (maintained in sync with operations)
19    did_index: HashMap<String, Vec<usize>>, // DID -> indices in operations vec
20    target_bundle: u32,
21    min_timestamp: DateTime<Utc>,
22    file: PathBuf,
23    validated: bool,
24    dirty: bool,
25    verbose: bool,
26
27    // Incremental save tracking
28    last_saved_len: usize,
29    last_save_time: std::time::Instant,
30    save_threshold: usize,
31    save_interval: std::time::Duration,
32}
33
34impl Mempool {
35    /// Creates a new mempool for a specific bundle number
36    pub fn new(
37        bundle_dir: &Path,
38        target_bundle: u32,
39        min_timestamp: DateTime<Utc>,
40        verbose: bool,
41    ) -> Result<Self> {
42        let filename = format!(
43            "{}{:06}.jsonl",
44            constants::MEMPOOL_FILE_PREFIX,
45            target_bundle
46        );
47        let file = bundle_dir.join(filename);
48
49        let mut mempool = Self {
50            operations: Vec::new(),
51            did_index: HashMap::new(),
52            target_bundle,
53            min_timestamp,
54            file,
55            validated: false,
56            dirty: false,
57            verbose,
58            last_saved_len: 0,
59            last_save_time: std::time::Instant::now(),
60            save_threshold: 100,
61            save_interval: std::time::Duration::from_secs(15),
62        };
63
64        // Try to load existing mempool
65        if mempool.file.exists() {
66            mempool.load()?;
67        }
68
69        Ok(mempool)
70    }
71
72    /// Add operations to the mempool with strict validation
73    pub fn add(&mut self, ops: Vec<Operation>) -> Result<usize> {
74        if ops.is_empty() {
75            return Ok(0);
76        }
77
78        // Build existing CID set
79        let mut existing_cids: HashSet<String> = self
80            .operations
81            .iter()
82            .filter_map(|op| op.cid.clone())
83            .collect();
84
85        let mut new_ops = Vec::new();
86        let total_in = ops.len();
87        let mut skipped_no_cid = 0usize;
88        let mut skipped_dupe = 0usize;
89
90        // Start from last operation time if we have any
91        let mut last_time = if !self.operations.is_empty() {
92            self.parse_timestamp(&self.operations.last().unwrap().created_at)?
93        } else {
94            self.min_timestamp
95        };
96
97        let start_add = std::time::Instant::now();
98        let mut first_added_time: Option<DateTime<Utc>> = None;
99        let mut last_added_time: Option<DateTime<Utc>> = None;
100        for op in ops {
101            // Skip if no CID
102            let cid = match &op.cid {
103                Some(c) => c,
104                None => {
105                    skipped_no_cid += 1;
106                    continue;
107                }
108            };
109
110            // Skip duplicates
111            if existing_cids.contains(cid) {
112                skipped_dupe += 1;
113                continue;
114            }
115
116            let op_time = self.parse_timestamp(&op.created_at)?;
117
118            // CRITICAL: Validate chronological order
119            if op_time < last_time {
120                bail!(
121                    "chronological violation: operation {} at {} is before {}",
122                    cid,
123                    op.created_at,
124                    last_time.to_rfc3339()
125                );
126            }
127
128            // Validate operation is after minimum timestamp
129            if op_time < self.min_timestamp {
130                bail!(
131                    "operation {} at {} is before minimum timestamp {} (belongs in earlier bundle)",
132                    cid,
133                    op.created_at,
134                    self.min_timestamp.to_rfc3339()
135                );
136            }
137
138            new_ops.push(op.clone());
139            existing_cids.insert(cid.clone());
140            last_time = op_time;
141            if first_added_time.is_none() {
142                first_added_time = Some(op_time);
143            }
144            last_added_time = Some(op_time);
145        }
146
147        let added = new_ops.len();
148
149        // Add new operations and update DID index
150        let start_idx = self.operations.len();
151        self.operations.extend(new_ops);
152
153        // Update DID index for new operations
154        for (offset, op) in self.operations[start_idx..].iter().enumerate() {
155            let idx = start_idx + offset;
156            self.did_index.entry(op.did.clone()).or_default().push(idx);
157        }
158
159        self.validated = true;
160        self.dirty = true;
161
162        if self.verbose {
163            let dur = start_add.elapsed();
164            info!(
165                "mempool add: +{} unique from {} ({} no-cid, {} dupes) in {} • total {}",
166                added,
167                total_in,
168                skipped_no_cid,
169                skipped_dupe,
170                format_std_duration_ms(dur),
171                self.operations.len()
172            );
173            if let (Some(f), Some(l)) = (first_added_time, last_added_time) {
174                debug!(
175                    "mempool add range: {} → {}",
176                    f.format("%Y-%m-%d %H:%M:%S"),
177                    l.format("%Y-%m-%d %H:%M:%S")
178                );
179            }
180            if added == 0 && (skipped_no_cid > 0 || skipped_dupe > 0) {
181                debug!("mempool add made no progress");
182            }
183        }
184
185        Ok(added)
186    }
187
188    /// Validate performs a full chronological validation of all operations
189    pub fn validate(&self) -> Result<()> {
190        if self.operations.is_empty() {
191            return Ok(());
192        }
193
194        // Check all operations are after minimum timestamp
195        for (i, op) in self.operations.iter().enumerate() {
196            let op_time = self.parse_timestamp(&op.created_at)?;
197            if op_time < self.min_timestamp {
198                bail!(
199                    "operation {} (CID: {:?}) at {} is before minimum timestamp {}",
200                    i,
201                    op.cid,
202                    op.created_at,
203                    self.min_timestamp.to_rfc3339()
204                );
205            }
206        }
207
208        // Check chronological order
209        for i in 1..self.operations.len() {
210            let prev = &self.operations[i - 1];
211            let curr = &self.operations[i];
212
213            let prev_time = self.parse_timestamp(&prev.created_at)?;
214            let curr_time = self.parse_timestamp(&curr.created_at)?;
215
216            if curr_time < prev_time {
217                bail!(
218                    "chronological violation at index {}: {:?} ({}) is before {:?} ({})",
219                    i,
220                    curr.cid,
221                    curr.created_at,
222                    prev.cid,
223                    prev.created_at
224                );
225            }
226        }
227
228        // Check for duplicate CIDs
229        let mut cid_map: HashMap<String, usize> = HashMap::new();
230        for (i, op) in self.operations.iter().enumerate() {
231            if let Some(cid) = &op.cid {
232                if let Some(prev_idx) = cid_map.get(cid) {
233                    bail!("duplicate CID {} at indices {} and {}", cid, prev_idx, i);
234                }
235                cid_map.insert(cid.clone(), i);
236            }
237        }
238
239        Ok(())
240    }
241
242    /// Count returns the number of operations in mempool
243    pub fn count(&self) -> usize {
244        self.operations.len()
245    }
246
247    /// Take removes and returns up to n operations from the front
248    pub fn take(&mut self, n: usize) -> Result<Vec<Operation>> {
249        // Validate before taking
250        self.validate_locked()?;
251
252        let take_count = n.min(self.operations.len());
253
254        let result: Vec<Operation> = self.operations.drain(..take_count).collect();
255
256        // Rebuild DID index after removing operations (indices have shifted)
257        self.rebuild_did_index();
258
259        // ALWAYS reset tracking after Take
260        // Take() means we're consuming these ops for a bundle
261        // Any remaining ops are "new" and unsaved
262        self.last_saved_len = 0;
263        self.last_save_time = std::time::Instant::now();
264
265        // Mark dirty only if ops remain
266        self.dirty = !self.operations.is_empty();
267        self.validated = false;
268
269        Ok(result)
270    }
271
272    /// validateLocked performs validation (internal helper)
273    fn validate_locked(&mut self) -> Result<()> {
274        if self.validated {
275            return Ok(());
276        }
277
278        if self.operations.is_empty() {
279            return Ok(());
280        }
281
282        // Check chronological order
283        let mut last_time = self.min_timestamp;
284        for (i, op) in self.operations.iter().enumerate() {
285            let op_time = self.parse_timestamp(&op.created_at)?;
286            if op_time < last_time {
287                bail!(
288                    "chronological violation at index {}: {} is before {}",
289                    i,
290                    op.created_at,
291                    last_time.to_rfc3339()
292                );
293            }
294            last_time = op_time;
295        }
296
297        self.validated = true;
298        Ok(())
299    }
300
301    /// Peek returns up to n operations without removing them
302    pub fn peek(&self, n: usize) -> Vec<Operation> {
303        let count = n.min(self.operations.len());
304        self.operations[..count].to_vec()
305    }
306
307    /// Clear removes all operations
308    pub fn clear(&mut self) {
309        let prev = self.operations.len();
310        self.operations.clear();
311        self.did_index.clear();
312        self.validated = false;
313        self.dirty = true;
314        if self.verbose {
315            info!("mempool clear: removed {} ops", prev);
316        }
317    }
318
319    /// ShouldSave checks if threshold/interval is met
320    pub fn should_save(&self) -> bool {
321        if !self.dirty {
322            return false;
323        }
324
325        let new_ops = self.operations.len().saturating_sub(self.last_saved_len);
326        let time_since_last_save = self.last_save_time.elapsed();
327
328        new_ops >= self.save_threshold || time_since_last_save >= self.save_interval
329    }
330
331    /// SaveIfNeeded saves only if threshold is met
332    pub fn save_if_needed(&mut self) -> Result<()> {
333        if !self.should_save() {
334            return Ok(());
335        }
336        self.save()
337    }
338
339    /// Save - always append-only since mempool only grows
340    pub fn save(&mut self) -> Result<()> {
341        if !self.dirty {
342            return Ok(());
343        }
344
345        if self.operations.is_empty() {
346            // Remove file if empty
347            if self.file.exists() {
348                fs::remove_file(&self.file)?;
349            }
350            self.last_saved_len = 0;
351            self.last_save_time = std::time::Instant::now();
352            self.dirty = false;
353            return Ok(());
354        }
355
356        // Validate before saving
357        self.validate_locked()?;
358
359        // Bounds check to prevent panic
360        if self.last_saved_len > self.operations.len() {
361            if self.verbose {
362                eprintln!(
363                    "Warning: lastSavedLen ({}) > operations ({}), resetting to 0",
364                    self.last_saved_len,
365                    self.operations.len()
366                );
367            }
368            self.last_saved_len = 0;
369        }
370
371        // Get only new operations since last save
372        let new_ops = &self.operations[self.last_saved_len..];
373
374        if new_ops.is_empty() {
375            // Nothing new to save
376            self.dirty = false;
377            return Ok(());
378        }
379
380        // Open for append (or create if first save)
381        let file = OpenOptions::new()
382            .create(true)
383            .append(true)
384            .open(&self.file)?;
385
386        let mut writer = BufWriter::new(file);
387
388        // Write only new operations
389        // CRITICAL: Use raw_json if available to preserve exact byte content
390        // This is required for deterministic content_hash calculation.
391        // Re-serialization would change field order/whitespace and break hash verification.
392        let mut bytes_written = 0usize;
393        let mut appended = 0usize;
394        for op in new_ops {
395            let json = if let Some(ref raw) = op.raw_json {
396                raw.clone()
397            } else {
398                // Fallback: Re-serialize if raw_json is not available
399                // WARNING: This may produce different content_hash than the original!
400                sonic_rs::to_string(op)?
401            };
402            writeln!(writer, "{}", json)?;
403            bytes_written += json.len() + 1;
404            appended += 1;
405        }
406
407        writer.flush()?;
408
409        // Get the underlying file to sync
410        let file = writer.into_inner()?;
411        file.sync_all()?;
412
413        self.last_saved_len = self.operations.len();
414        self.last_save_time = std::time::Instant::now();
415        self.dirty = false;
416
417        if self.verbose {
418            info!(
419                "mempool save: appended {} ops ({} bytes) to {}",
420                appended,
421                bytes_written,
422                self.get_filename()
423            );
424        }
425
426        Ok(())
427    }
428
429    /// Load reads mempool from disk and validates it
430    pub fn load(&mut self) -> Result<()> {
431        let start = std::time::Instant::now();
432        let file = File::open(&self.file)?;
433        let reader = BufReader::new(file);
434
435        self.operations.clear();
436        self.did_index.clear();
437
438        for line in reader.lines() {
439            let line = line?;
440            if line.trim().is_empty() {
441                continue;
442            }
443
444            // CRITICAL: Preserve raw JSON for content hash calculation
445            // This is required by the V1 specification (docs/specification.md § 4.2)
446            // to ensure content_hash remains reproducible.
447            // Without this, re-serialization would change the hash.
448            // Use Operation::from_json (sonic_rs) instead of serde deserialization
449            let op = Operation::from_json(&line)?;
450            let idx = self.operations.len();
451            self.operations.push(op);
452
453            // Update DID index
454            let did = self.operations[idx].did.clone();
455            self.did_index.entry(did).or_default().push(idx);
456        }
457
458        // Validate loaded data
459        self.validate_locked()?;
460
461        // Mark as saved (just loaded from disk)
462        self.last_saved_len = self.operations.len();
463        self.last_save_time = std::time::Instant::now();
464        self.dirty = false;
465
466        if self.verbose {
467            info!(
468                "mempool load: {} ops for bundle {:06} in {}",
469                self.operations.len(),
470                self.target_bundle,
471                format_std_duration_ms(start.elapsed())
472            );
473        }
474
475        Ok(())
476    }
477
478    /// GetFirstTime returns the created_at of the first operation
479    pub fn get_first_time(&self) -> Option<String> {
480        self.operations.first().map(|op| op.created_at.clone())
481    }
482
483    /// GetLastTime returns the created_at of the last operation
484    pub fn get_last_time(&self) -> Option<String> {
485        self.operations.last().map(|op| op.created_at.clone())
486    }
487
488    /// GetTargetBundle returns the bundle number this mempool is for
489    pub fn get_target_bundle(&self) -> u32 {
490        self.target_bundle
491    }
492
493    /// GetMinTimestamp returns the minimum timestamp for operations
494    pub fn get_min_timestamp(&self) -> DateTime<Utc> {
495        self.min_timestamp
496    }
497
498    /// Stats returns mempool statistics
499    pub fn stats(&self) -> MempoolStats {
500        let count = self.operations.len();
501
502        let mut stats = MempoolStats {
503            count,
504            can_create_bundle: count >= constants::BUNDLE_SIZE,
505            target_bundle: self.target_bundle,
506            min_timestamp: self.min_timestamp,
507            validated: self.validated,
508            first_time: None,
509            last_time: None,
510            size_bytes: None,
511            did_count: None,
512        };
513
514        if count > 0 {
515            stats.first_time = self
516                .operations
517                .first()
518                .and_then(|op| self.parse_timestamp(&op.created_at).ok());
519            stats.last_time = self
520                .operations
521                .last()
522                .and_then(|op| self.parse_timestamp(&op.created_at).ok());
523
524            let mut total_size = 0;
525            for op in &self.operations {
526                if let Ok(json) = serde_json::to_string(op) {
527                    total_size += json.len();
528                }
529            }
530
531            stats.size_bytes = Some(total_size);
532            stats.did_count = Some(self.did_index.len());
533        }
534
535        stats
536    }
537
538    /// Delete removes the mempool file
539    pub fn delete(&self) -> Result<()> {
540        if self.file.exists() {
541            fs::remove_file(&self.file)?;
542        }
543        Ok(())
544    }
545
546    /// GetFilename returns the mempool filename
547    pub fn get_filename(&self) -> String {
548        self.file
549            .file_name()
550            .and_then(|n| n.to_str())
551            .unwrap_or("")
552            .to_string()
553    }
554
555    /// FindDIDOperations searches for operations matching a DID
556    /// Uses DID index for O(1) lookup instead of linear scan
557    pub fn find_did_operations(&self, did: &str) -> Vec<Operation> {
558        if let Some(indices) = self.did_index.get(did) {
559            // Fast path: use index
560            indices
561                .iter()
562                .map(|&idx| self.operations[idx].clone())
563                .collect()
564        } else {
565            // DID not in index (shouldn't happen if index is maintained correctly)
566            Vec::new()
567        }
568    }
569
570    /// Rebuild DID index from operations (used after take/clear)
571    fn rebuild_did_index(&mut self) {
572        self.did_index.clear();
573        for (idx, op) in self.operations.iter().enumerate() {
574            self.did_index.entry(op.did.clone()).or_default().push(idx);
575        }
576    }
577
578    /// FindLatestDIDOperation finds the most recent non-nullified operation for a DID
579    /// Returns the operation and its position/index in the mempool
580    /// Uses DID index for O(1) lookup, then finds latest by index (operations are chronologically sorted)
581    pub fn find_latest_did_operation(&self, did: &str) -> Option<(Operation, usize)> {
582        if let Some(indices) = self.did_index.get(did) {
583            // Operations are in chronological order, so highest index = latest
584            // Find the highest index that's not nullified
585            indices.iter().rev().find_map(|&idx| {
586                let op = &self.operations[idx];
587                if !op.nullified {
588                    Some((op.clone(), idx))
589                } else {
590                    None
591                }
592            })
593        } else {
594            None
595        }
596    }
597
598    /// Get all operations (for dump command)
599    pub fn get_operations(&self) -> &[Operation] {
600        &self.operations
601    }
602
603    /// Parse timestamp string to DateTime
604    fn parse_timestamp(&self, s: &str) -> Result<DateTime<Utc>> {
605        Ok(DateTime::parse_from_rfc3339(s)?.with_timezone(&Utc))
606    }
607}
608
609#[derive(Debug, Clone)]
610pub struct MempoolStats {
611    pub count: usize,
612    pub can_create_bundle: bool,
613    pub target_bundle: u32,
614    pub min_timestamp: DateTime<Utc>,
615    pub validated: bool,
616    pub first_time: Option<DateTime<Utc>>,
617    pub last_time: Option<DateTime<Utc>>,
618    pub size_bytes: Option<usize>,
619    pub did_count: Option<usize>,
620}
621
#[cfg(test)]
mod tests {
    use super::*;
    use crate::operations::Operation;
    use sonic_rs::Value;
    use tempfile::TempDir;

    /// Minimum timestamp shared by all tests: 2024-01-01T00:00:00Z.
    fn min_time() -> DateTime<Utc> {
        DateTime::parse_from_rfc3339("2024-01-01T00:00:00Z")
            .unwrap()
            .with_timezone(&Utc)
    }

    /// RFC 3339 timestamp `secs` seconds after `min_time()` (valid for secs < 3600).
    fn ts(secs: u32) -> String {
        format!("2024-01-01T00:{:02}:{:02}Z", secs / 60, secs % 60)
    }

    /// Creates an empty mempool in a fresh temp dir. The `TempDir` is returned
    /// so the caller keeps the directory alive for the mempool's lifetime.
    fn new_mempool(target_bundle: u32) -> (TempDir, Mempool) {
        let tmp = TempDir::new().unwrap();
        let mempool = Mempool::new(tmp.path(), target_bundle, min_time(), false).unwrap();
        (tmp, mempool)
    }

    fn create_test_operation(did: &str, cid: &str, created_at: &str) -> Operation {
        Operation {
            did: did.to_string(),
            operation: Value::new(),
            cid: Some(cid.to_string()),
            nullified: false,
            created_at: created_at.to_string(),
            extra: Value::new(),
            raw_json: None,
        }
    }

    #[test]
    fn test_mempool_count() {
        let (_tmp, mut mempool) = new_mempool(1);
        assert_eq!(mempool.count(), 0);

        let ops = vec![
            create_test_operation("did:plc:test1", "cid1", "2024-01-01T00:00:01Z"),
            create_test_operation("did:plc:test2", "cid2", "2024-01-01T00:00:02Z"),
        ];
        mempool.add(ops).unwrap();
        assert_eq!(mempool.count(), 2);
    }

    #[test]
    fn test_mempool_get_target_bundle() {
        let (_tmp, mempool) = new_mempool(42);
        assert_eq!(mempool.get_target_bundle(), 42);
    }

    #[test]
    fn test_mempool_get_min_timestamp() {
        let (_tmp, mempool) = new_mempool(1);
        assert_eq!(mempool.get_min_timestamp(), min_time());
    }

    #[test]
    fn test_mempool_get_first_time() {
        let (_tmp, mut mempool) = new_mempool(1);
        assert_eq!(mempool.get_first_time(), None);

        let ops = vec![create_test_operation(
            "did:plc:test1",
            "cid1",
            "2024-01-01T00:00:01Z",
        )];
        mempool.add(ops).unwrap();
        assert_eq!(
            mempool.get_first_time(),
            Some("2024-01-01T00:00:01Z".to_string())
        );
    }

    #[test]
    fn test_mempool_get_last_time() {
        let (_tmp, mut mempool) = new_mempool(1);
        assert_eq!(mempool.get_last_time(), None);

        let ops = vec![
            create_test_operation("did:plc:test1", "cid1", "2024-01-01T00:00:01Z"),
            create_test_operation("did:plc:test2", "cid2", "2024-01-01T00:00:02Z"),
        ];
        mempool.add(ops).unwrap();
        assert_eq!(
            mempool.get_last_time(),
            Some("2024-01-01T00:00:02Z".to_string())
        );
    }

    #[test]
    fn test_mempool_peek() {
        let (_tmp, mut mempool) = new_mempool(1);
        let ops = vec![
            create_test_operation("did:plc:test1", "cid1", "2024-01-01T00:00:01Z"),
            create_test_operation("did:plc:test2", "cid2", "2024-01-01T00:00:02Z"),
            create_test_operation("did:plc:test3", "cid3", "2024-01-01T00:00:03Z"),
        ];
        mempool.add(ops).unwrap();

        let peeked = mempool.peek(2);
        assert_eq!(peeked.len(), 2);
        assert_eq!(peeked[0].cid, Some("cid1".to_string()));
        assert_eq!(peeked[1].cid, Some("cid2".to_string()));

        // Peek should not remove operations
        assert_eq!(mempool.count(), 3);
    }

    #[test]
    fn test_mempool_peek_more_than_available() {
        let (_tmp, mut mempool) = new_mempool(1);
        let ops = vec![create_test_operation(
            "did:plc:test1",
            "cid1",
            "2024-01-01T00:00:01Z",
        )];
        mempool.add(ops).unwrap();

        let peeked = mempool.peek(10);
        assert_eq!(peeked.len(), 1);
    }

    #[test]
    fn test_mempool_clear() {
        let (_tmp, mut mempool) = new_mempool(1);
        let ops = vec![
            create_test_operation("did:plc:test1", "cid1", "2024-01-01T00:00:01Z"),
            create_test_operation("did:plc:test2", "cid2", "2024-01-01T00:00:02Z"),
        ];
        mempool.add(ops).unwrap();
        assert_eq!(mempool.count(), 2);

        mempool.clear();
        assert_eq!(mempool.count(), 0);
    }

    #[test]
    fn test_mempool_find_did_operations() {
        let (_tmp, mut mempool) = new_mempool(1);
        let ops = vec![
            create_test_operation("did:plc:test1", "cid1", "2024-01-01T00:00:01Z"),
            create_test_operation("did:plc:test1", "cid2", "2024-01-01T00:00:02Z"),
            create_test_operation("did:plc:test2", "cid3", "2024-01-01T00:00:03Z"),
        ];
        mempool.add(ops).unwrap();

        let found = mempool.find_did_operations("did:plc:test1");
        assert_eq!(found.len(), 2);
        assert_eq!(found[0].cid, Some("cid1".to_string()));
        assert_eq!(found[1].cid, Some("cid2".to_string()));

        let found = mempool.find_did_operations("did:plc:test2");
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].cid, Some("cid3".to_string()));

        let found = mempool.find_did_operations("did:plc:nonexistent");
        assert_eq!(found.len(), 0);
    }

    #[test]
    fn test_mempool_stats_empty() {
        let (_tmp, mempool) = new_mempool(1);
        let stats = mempool.stats();
        assert_eq!(stats.count, 0);
        assert!(!stats.can_create_bundle);
        assert_eq!(stats.target_bundle, 1);
    }

    #[test]
    fn test_mempool_stats_with_operations() {
        let (_tmp, mut mempool) = new_mempool(1);
        let ops = vec![
            create_test_operation("did:plc:test1", "cid1", "2024-01-01T00:00:01Z"),
            create_test_operation("did:plc:test2", "cid2", "2024-01-01T00:00:02Z"),
        ];
        mempool.add(ops).unwrap();

        let stats = mempool.stats();
        assert_eq!(stats.count, 2);
        assert!(!stats.can_create_bundle); // Need BUNDLE_SIZE (10000) ops
        assert_eq!(stats.target_bundle, 1);
        assert!(stats.first_time.is_some());
        assert!(stats.last_time.is_some());
        assert_eq!(stats.did_count, Some(2));
    }

    #[test]
    fn test_mempool_get_filename() {
        let (_tmp, mempool) = new_mempool(42);
        let filename = mempool.get_filename();
        assert!(filename.contains("plc_mempool_"));
        assert!(filename.contains("000042"));
        assert!(filename.ends_with(".jsonl"));
    }

    #[test]
    fn test_mempool_add_allows_equal_timestamps() {
        let (_tmp, mut mempool) = new_mempool(1);
        let ops = vec![
            create_test_operation("did:plc:test1", "cid1", "2024-01-01T00:00:01Z"),
            create_test_operation("did:plc:test2", "cid2", "2024-01-01T00:00:01Z"),
        ];
        assert_eq!(mempool.add(ops).unwrap(), 2);
        assert!(mempool.validate().is_ok());
    }

    #[test]
    fn test_mempool_add_rejects_out_of_order_batch() {
        let (_tmp, mut mempool) = new_mempool(1);
        let ops = vec![
            create_test_operation("did:plc:test1", "cid1", "2024-01-01T00:00:02Z"),
            create_test_operation("did:plc:test2", "cid2", "2024-01-01T00:00:01Z"),
        ];
        assert!(mempool.add(ops).is_err());
        // The batch is rejected as a whole.
        assert_eq!(mempool.count(), 0);
    }

    #[test]
    fn test_mempool_add_rejects_op_older_than_existing() {
        let (_tmp, mut mempool) = new_mempool(1);
        mempool
            .add(vec![create_test_operation(
                "did:plc:test1",
                "cid1",
                "2024-01-01T00:00:02Z",
            )])
            .unwrap();

        let older = vec![create_test_operation(
            "did:plc:test2",
            "cid2",
            "2024-01-01T00:00:01Z",
        )];
        assert!(mempool.add(older).is_err());
        assert_eq!(mempool.count(), 1);
    }

    #[test]
    fn test_mempool_add_rejects_op_before_min_timestamp() {
        let (_tmp, mut mempool) = new_mempool(1);
        let ops = vec![create_test_operation(
            "did:plc:test1",
            "cid1",
            "2023-12-31T23:59:59Z",
        )];
        assert!(mempool.add(ops).is_err());
        assert_eq!(mempool.count(), 0);
    }

    #[test]
    fn test_mempool_add_skips_duplicates_and_missing_cids() {
        let (_tmp, mut mempool) = new_mempool(1);
        mempool
            .add(vec![create_test_operation(
                "did:plc:test1",
                "cid1",
                "2024-01-01T00:00:01Z",
            )])
            .unwrap();

        let mut no_cid = create_test_operation("did:plc:test2", "unused", "2024-01-01T00:00:02Z");
        no_cid.cid = None;
        let ops = vec![
            // Already held.
            create_test_operation("did:plc:test1", "cid1", "2024-01-01T00:00:01Z"),
            no_cid,
            create_test_operation("did:plc:test3", "cid3", "2024-01-01T00:00:03Z"),
            // Duplicate within the same batch.
            create_test_operation("did:plc:test3", "cid3", "2024-01-01T00:00:03Z"),
        ];
        assert_eq!(mempool.add(ops).unwrap(), 1);
        assert_eq!(mempool.count(), 2);
    }

    #[test]
    fn test_mempool_take() {
        let (_tmp, mut mempool) = new_mempool(1);
        let ops = vec![
            create_test_operation("did:plc:test1", "cid1", "2024-01-01T00:00:01Z"),
            create_test_operation("did:plc:test2", "cid2", "2024-01-01T00:00:02Z"),
            create_test_operation("did:plc:test3", "cid3", "2024-01-01T00:00:03Z"),
        ];
        mempool.add(ops).unwrap();

        let taken = mempool.take(2).unwrap();
        assert_eq!(taken.len(), 2);
        assert_eq!(taken[0].cid, Some("cid1".to_string()));
        assert_eq!(taken[1].cid, Some("cid2".to_string()));
        assert_eq!(mempool.count(), 1);

        // The DID index must reflect the shifted positions.
        assert!(mempool.find_did_operations("did:plc:test1").is_empty());
        let rest = mempool.find_did_operations("did:plc:test3");
        assert_eq!(rest.len(), 1);
        assert_eq!(rest[0].cid, Some("cid3".to_string()));
    }

    #[test]
    fn test_mempool_take_more_than_available() {
        let (_tmp, mut mempool) = new_mempool(1);
        mempool
            .add(vec![create_test_operation(
                "did:plc:test1",
                "cid1",
                "2024-01-01T00:00:01Z",
            )])
            .unwrap();

        let taken = mempool.take(10).unwrap();
        assert_eq!(taken.len(), 1);
        assert_eq!(mempool.count(), 0);
    }

    #[test]
    fn test_mempool_find_latest_did_operation_skips_nullified() {
        let (_tmp, mut mempool) = new_mempool(1);
        let mut nullified =
            create_test_operation("did:plc:test1", "cid3", "2024-01-01T00:00:03Z");
        nullified.nullified = true;
        let ops = vec![
            create_test_operation("did:plc:test1", "cid1", "2024-01-01T00:00:01Z"),
            create_test_operation("did:plc:test1", "cid2", "2024-01-01T00:00:02Z"),
            nullified,
        ];
        mempool.add(ops).unwrap();

        let (op, idx) = mempool.find_latest_did_operation("did:plc:test1").unwrap();
        assert_eq!(op.cid, Some("cid2".to_string()));
        assert_eq!(idx, 1);
        assert!(mempool
            .find_latest_did_operation("did:plc:nonexistent")
            .is_none());
    }

    #[test]
    fn test_mempool_should_save_threshold() {
        let (_tmp, mut mempool) = new_mempool(1);
        // Nothing dirty yet.
        assert!(!mempool.should_save());

        mempool
            .add(vec![create_test_operation("did:plc:test1", "cid1", &ts(1))])
            .unwrap();
        // One unsaved op: below the threshold and the interval has not elapsed.
        assert!(!mempool.should_save());

        let ops: Vec<Operation> = (2..=100u32)
            .map(|i| {
                create_test_operation(
                    &format!("did:plc:test{}", i),
                    &format!("cid{}", i),
                    &ts(i),
                )
            })
            .collect();
        mempool.add(ops).unwrap();
        // 100 unsaved ops reach the threshold.
        assert!(mempool.should_save());
    }

    #[test]
    fn test_mempool_save_appends_raw_json() {
        let (tmp, mut mempool) = new_mempool(1);

        let mut op1 = create_test_operation("did:plc:test1", "cid1", "2024-01-01T00:00:01Z");
        op1.raw_json = Some(r#"{"cid":"cid1"}"#.to_string());
        let mut op2 = create_test_operation("did:plc:test2", "cid2", "2024-01-01T00:00:02Z");
        op2.raw_json = Some(r#"{"cid":"cid2"}"#.to_string());

        mempool.add(vec![op1]).unwrap();
        mempool.save().unwrap();
        mempool.add(vec![op2]).unwrap();
        mempool.save().unwrap();

        let path = tmp.path().join(mempool.get_filename());
        let contents = std::fs::read_to_string(path).unwrap();
        assert_eq!(contents, "{\"cid\":\"cid1\"}\n{\"cid\":\"cid2\"}\n");
        assert!(!mempool.should_save());
    }

    #[test]
    fn test_mempool_save_empty_removes_file() {
        let (tmp, mut mempool) = new_mempool(1);
        let mut op = create_test_operation("did:plc:test1", "cid1", "2024-01-01T00:00:01Z");
        op.raw_json = Some(r#"{"cid":"cid1"}"#.to_string());
        mempool.add(vec![op]).unwrap();
        mempool.save().unwrap();

        let path = tmp.path().join(mempool.get_filename());
        assert!(path.exists());

        mempool.clear();
        mempool.save().unwrap();
        assert!(!path.exists());
    }
}