1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
//! Parallel-merge extension trait for `SharedARTrie<V>`.
//!
//! Split out of byte `dict_impl.rs` (lines ~6266-6399) as part of the
//! Phase-5 decomposition. Feature-gated on `parallel-merge`; uses rayon
//! to parallelize the read-and-merge phase before serializing the write
//! phase under the trie's write lock. The trait sits in its own module
//! because (a) it is the only top-level `pub trait` introduced by
//! byte's dict_impl, and (b) it is the only Phase-5 surface that does
//! not require new methods on `PersistentARTrie<V, S>` — the
//! `insert_impl` / `get_value_impl` helpers it calls just had their
//! visibility widened to `pub(super)` to permit access from this
//! sibling module.
#![cfg(feature = "parallel-merge")]
use super::SharedARTrie;
use crate::persistent_artrie::error::Result;
// F4: the `.read()/.write()` compat shim on the collapsed `Arc<PersistentARTrie>`.
use crate::persistent_artrie::core::shared_access::SharedTrieAccess;
use crate::value::DictionaryValue;
/// Extension trait for parallel merge operations on [`SharedARTrie`].
///
/// These methods require the `parallel-merge` feature and use rayon for
/// parallel processing. They are implemented as an extension trait because
/// `SharedARTrie` is a type alias for `Arc<PersistentARTrie<V>>`, and Rust
/// doesn't allow inherent `impl` blocks on type aliases that resolve to
/// external types.
///
/// # Usage
///
/// ```text
/// use libdictenstein::persistent_artrie::{SharedARTrie, SharedARTrieParallelExt};
///
/// let trie1: SharedARTrie<u32> = /* ... */;
/// let trie2: SharedARTrie<u32> = /* ... */;
///
/// // Import the trait to use the method
/// let count = trie1.merge_from_parallel(&trie2, |a, b| a + b)?;
/// ```
pub trait SharedARTrieParallelExt<V: DictionaryValue> {
/// Merge all terms from another trie using parallel processing.
///
/// This method uses rayon to parallelize the merge computation across multiple
/// cores. The parallelization strategy:
/// 1. Partition source terms by first byte (256 possible partitions)
/// 2. Process partitions in parallel: read source terms, compute merge values
/// 3. Batch-insert results sequentially (avoids write contention)
///
/// # Performance
///
/// Expected speedup: 4-6x on 8 cores for large merges (100K+ terms).
/// The speedup is limited by the sequential write phase but the parallel
/// read and merge computation phases scale well.
///
/// # Arguments
///
/// * `other` - The source trie to merge from
/// * `merge_fn` - Function to merge values when a term exists in both tries.
/// Called as `merge_fn(self_value, other_value)`.
///
/// # Returns
///
/// The number of terms processed from the source trie.
fn merge_from_parallel<F>(&self, other: &Self, merge_fn: F) -> Result<usize>
where
F: Fn(&V, &V) -> V + Sync + Send;
}
impl<V: DictionaryValue + Clone + Send + Sync> SharedARTrieParallelExt<V> for SharedARTrie<V> {
fn merge_from_parallel<F>(&self, other: &Self, merge_fn: F) -> Result<usize>
where
F: Fn(&V, &V) -> V + Sync + Send,
{
// **F4 / V11.2 — merge_lock (the merge‖merge serializer).** This is the only
// `Shared*`-reachable merge driver, so it is the single `merge_lock`
// acquisition site. It is a near-leaf in the hierarchy `CK > merge_lock > OR
// > EC`: the body takes OR (owned arm) / runs lock-free CAS (overlay arm)
// UNDER merge_lock, never the reverse, and never CK. Cloned out of a brief
// read borrow so we don't hold the handle. Kills merge‖merge livelock; other
// writers stay obstruction-free.
let merge_lock = self.read().merge_lock.clone();
let _merge_guard = merge_lock.lock();
// L3.3: the overlay is the sole representation. Snapshot `other` (releasing its
// read lock) then apply SERIALLY under `self.write()` via the shared overlay merge
// funnel. The "parallel WRITE" was illusory under the overlay (the per-key CAS
// re-reads self each iteration), and snapshot-then-write avoids the cross-instance
// AB/BA deadlock the owned parallel phase risked (holding `other.read()` per
// partition across a pending `self.write()`).
let entries: Vec<(Vec<u8>, V)> = {
let other_guard = other.read();
match other_guard.iter_prefix_with_values_and_arena(b"") {
Ok(Some(terms)) => terms.into_iter().map(|t| (t.term, t.value)).collect(),
_ => Vec::new(),
}
};
let guard = self.write();
guard.merge_entries_overlay(entries, merge_fn)
}
}