1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
//! Document-transaction execution methods for `PersistentARTrieChar<V, S>`.
//!
//! Split out of char `dict_impl_char.rs` (lines ~502-829, ~328 LOC)
//! as a Phase-6 char sub-module, mirroring the byte
//! `super::document_tx` split. Methods covered:
//!
//! - `begin_document` — constructs a `CharDocumentTransaction` (no BeginTx WAL record is written)
//! - `tx_insert` / `tx_insert_chars` / `tx_insert_bytes` — buffer terms
//! - `tx_increment` / `tx_increment_bytes` (plus the checked `try_tx_increment` /
//!   `try_tx_increment_bytes` variants) — buffer increment operations
//! - `commit_document` — apply all buffered operations (per-op durable, not batch-atomic)
//! - `abort_document` — discard buffered terms
//!
//! The `CharDocumentTransaction<V>` data type lives in
//! `super::transactions`.
use std::collections::HashMap;
use std::sync::atomic::Ordering as AtomicOrdering;
use crate::persistent_artrie::block_storage::BlockStorage;
use crate::persistent_artrie::error::{PersistentARTrieError, Result};
use crate::persistent_artrie_core::key_encoding::CharKey;
use crate::persistent_artrie_core::overlay::durable_write::DurableOverlayWrite;
use crate::value::DictionaryValue;
use super::transactions::CharDocumentTransaction;
use crate::persistent_artrie::TransactionState;
impl<V: DictionaryValue, S: BlockStorage> super::PersistentARTrieChar<V, S> {
/// Start a new document transaction for `document_id`.
///
/// The returned [`CharDocumentTransaction`] is a purely in-memory buffer: it
/// starts with no buffered SETs, no buffered increments, no recorded failure,
/// and in the `Active` state. Nothing is appended to the WAL by this call.
///
/// The transaction ID is the current `next_lsn` value XOR-ed with the low
/// 64 bits of the wall-clock nanoseconds since the Unix epoch (0 is used if
/// the clock reads earlier than the epoch).
///
/// # Errors
///
/// This implementation has no failing path and always returns `Ok`.
pub fn begin_document(&self, document_id: &str) -> Result<CharDocumentTransaction<V>> {
    let lsn_snapshot = self.next_lsn.load(AtomicOrdering::Acquire);
    let epoch_nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_nanos());

    // The low 8 little-endian bytes of the u128 nanos equal what a u64
    // truncation would yield. They are extracted with `from_le_bytes` (a
    // NON-counter value) rather than a numeric cast so the counter-codec gate
    // stays clean for this file.
    let mut low_bytes = [0_u8; 8];
    low_bytes.copy_from_slice(&epoch_nanos.to_le_bytes()[..8]);
    let tx_id = lsn_snapshot ^ u64::from_le_bytes(low_bytes);

    // L3.3: deliberately no BeginTx WAL append. The overlay `commit_document`
    // is per-op durable (NOT bracketed), and an orphan BeginTx would burn an
    // un-`mark_committed` LSN that stalls the committed watermark and thus
    // checkpoint reclaim.
    let transaction = CharDocumentTransaction {
        tx_id,
        document_id: document_id.to_string(),
        shadow_terms: Vec::new(),
        increments: Vec::new(),
        failure: None,
        state: TransactionState::Active,
    };
    Ok(transaction)
}
/// Queue a SET of `term` inside an open document transaction.
///
/// Only the in-memory buffer of `tx` changes: the UTF-8 bytes of `term`,
/// paired with `value`, are appended to its list of pending SETs. The trie is
/// not touched here; buffered SETs are applied by `commit_document()`.
///
/// # Arguments
///
/// * `tx` - The active transaction that receives the term
/// * `term` - The term to insert, as a string
/// * `value` - Optional value to associate with the term
///
/// # Panics
///
/// Panics if `tx` is no longer active (it was already committed or aborted).
pub fn tx_insert(&self, tx: &mut CharDocumentTransaction<V>, term: &str, value: Option<V>) {
    if !tx.is_active() {
        let state_name = match tx.state {
            TransactionState::Active => unreachable!(),
            TransactionState::Aborted => "aborted",
            TransactionState::Committed => "committed",
        };
        panic!("Cannot insert into a {} transaction", state_name);
    }
    let encoded_term = term.as_bytes().to_vec();
    tx.shadow_terms.push((encoded_term, value));
}
/// Queue a SET of a term given as a slice of `char`s.
///
/// Convenient when the caller already holds pre-parsed Unicode scalar values.
/// The characters are encoded to UTF-8 here, and the resulting bytes, paired
/// with `value`, are appended to the transaction's list of pending SETs.
///
/// The trie is not touched by this call; buffered SETs are applied by
/// `commit_document()`.
///
/// # Arguments
///
/// * `tx` - The active transaction that receives the term
/// * `chars` - The characters making up the term
/// * `value` - Optional value to associate with the term
///
/// # Panics
///
/// Panics if `tx` is no longer active (it was already committed or aborted).
pub fn tx_insert_chars(
    &self,
    tx: &mut CharDocumentTransaction<V>,
    chars: &[char],
    value: Option<V>,
) {
    if !tx.is_active() {
        let state_name = match tx.state {
            TransactionState::Active => unreachable!(),
            TransactionState::Aborted => "aborted",
            TransactionState::Committed => "committed",
        };
        panic!("Cannot insert into a {} transaction", state_name);
    }
    // Buffered terms are kept as UTF-8 bytes, so encode the chars up front.
    let encoded_term = chars.iter().collect::<String>().into_bytes();
    tx.shadow_terms.push((encoded_term, value));
}
/// Queue a SET of a term given as raw bytes.
///
/// The bytes are copied as-is into the transaction's list of pending SETs,
/// paired with `value`. The trie is not touched by this call; buffered SETs
/// are applied by `commit_document()`.
///
/// The bytes are expected to be valid UTF-8, but this method does not check
/// them. Note that `commit_document()` converts a value-less term with
/// `String::from_utf8_lossy`, so invalid sequences there are replaced rather
/// than rejected.
///
/// # Arguments
///
/// * `tx` - The active transaction that receives the term
/// * `term_bytes` - The term bytes to insert (expected to be valid UTF-8)
/// * `value` - Optional value to associate with the term
///
/// # Panics
///
/// Panics if `tx` is no longer active (it was already committed or aborted).
pub fn tx_insert_bytes(
    &self,
    tx: &mut CharDocumentTransaction<V>,
    term_bytes: &[u8],
    value: Option<V>,
) {
    if !tx.is_active() {
        let state_name = match tx.state {
            TransactionState::Active => unreachable!(),
            TransactionState::Aborted => "aborted",
            TransactionState::Committed => "committed",
        };
        panic!("Cannot insert into a {} transaction", state_name);
    }
    let owned_term = Vec::from(term_bytes);
    tx.shadow_terms.push((owned_term, value));
}
/// Queue an increment of `term` by `delta` inside an open document transaction.
///
/// Where `tx_insert()` buffers a SET, this buffers a delta. Deltas buffered for
/// the same term are summed when the transaction is committed. An individual
/// `delta` may be negative, but `commit_document()` rejects any term whose
/// summed delta is negative.
///
/// This is the non-returning wrapper around [`Self::try_tx_increment`]: an
/// error from the checked variant is not propagated but recorded on the
/// transaction through `mark_failed`. `commit_document()` refuses a
/// transaction that reports a failure reason.
///
/// # Arguments
///
/// * `tx` - The active transaction that receives the increment
/// * `term` - The term to increment
/// * `delta` - The amount to add
///
/// # Panics
///
/// Panics if `tx` is no longer active (it was already committed or aborted).
///
/// # Example
///
/// ```text
/// let mut tx = trie.begin_document("file1")?;
/// trie.tx_increment(&mut tx, "the|quick", 100);
/// trie.tx_increment(&mut tx, "the|quick", 50);  // Accumulates: will add 150
/// trie.commit_document(tx)?;  // Adds 150 to existing value
/// ```
pub fn tx_increment(&self, tx: &mut CharDocumentTransaction<V>, term: &str, delta: i64) {
    if !tx.is_active() {
        let state_name = match tx.state {
            TransactionState::Active => unreachable!(),
            TransactionState::Aborted => "aborted",
            TransactionState::Committed => "committed",
        };
        panic!("Cannot increment in a {} transaction", state_name);
    }
    match self.try_tx_increment(tx, term, delta) {
        Ok(()) => {}
        Err(error) => {
            tx.mark_failed(error.to_string());
        }
    }
}
/// Fallible form of [`Self::tx_increment`] taking the term as a `&str`.
///
/// Forwards the term's UTF-8 bytes to [`Self::try_tx_increment_bytes`] and
/// returns that call's result unchanged.
pub fn try_tx_increment(
    &self,
    tx: &mut CharDocumentTransaction<V>,
    term: &str,
    delta: i64,
) -> Result<()> {
    let term_bytes = term.as_bytes();
    self.try_tx_increment_bytes(tx, term_bytes, delta)
}
/// Queue an increment of a term given as raw bytes.
///
/// Byte-slice counterpart of `tx_increment()`. It is the non-returning wrapper
/// around [`Self::try_tx_increment_bytes`]: an error from the checked variant
/// is not propagated but recorded on the transaction through `mark_failed`.
///
/// # Arguments
///
/// * `tx` - The active transaction that receives the increment
/// * `term_bytes` - The term bytes to increment (expected to be valid UTF-8;
///   not validated here)
/// * `delta` - The amount to add (an individual delta may be negative)
///
/// # Panics
///
/// Panics if `tx` is no longer active (it was already committed or aborted).
pub fn tx_increment_bytes(
    &self,
    tx: &mut CharDocumentTransaction<V>,
    term_bytes: &[u8],
    delta: i64,
) {
    if !tx.is_active() {
        let state_name = match tx.state {
            TransactionState::Active => unreachable!(),
            TransactionState::Aborted => "aborted",
            TransactionState::Committed => "committed",
        };
        panic!("Cannot increment in a {} transaction", state_name);
    }
    match self.try_tx_increment_bytes(tx, term_bytes, delta) {
        Ok(()) => {}
        Err(error) => {
            tx.mark_failed(error.to_string());
        }
    }
}
/// Fallible form of [`Self::tx_increment_bytes`].
///
/// Appends `(term_bytes, delta)` to the transaction's buffered increments after
/// checking that the running sum for this term still fits in an `i64`. The
/// check re-scans every increment already buffered in `tx`, so its cost grows
/// with the number of buffered increments.
///
/// # Errors
///
/// Returns `PersistentARTrieError::InvalidOperation` when:
/// - the transaction is already committed or aborted;
/// - the transaction already reports a failure reason;
/// - adding `delta` to the deltas already buffered for this term overflows
///   `i64` (in this case the transaction is also marked failed, and the
///   increment is not buffered).
pub fn try_tx_increment_bytes(
    &self,
    tx: &mut CharDocumentTransaction<V>,
    term_bytes: &[u8],
    delta: i64,
) -> Result<()> {
    let terminal_state = match tx.state {
        TransactionState::Active => None,
        TransactionState::Committed => Some("committed"),
        TransactionState::Aborted => Some("aborted"),
    };
    if let Some(state_name) = terminal_state {
        return Err(PersistentARTrieError::InvalidOperation(format!(
            "Cannot increment in a {} transaction",
            state_name
        )));
    }

    if let Some(reason) = tx.failure_reason() {
        return Err(PersistentARTrieError::InvalidOperation(format!(
            "Cannot increment in failed transaction {}: {}",
            tx.document_id(),
            reason
        )));
    }

    // Sum of the deltas already buffered for this exact term; `None` means the
    // buffered deltas alone already overflow.
    let pending = tx
        .increments
        .iter()
        .filter(|(buffered_term, _)| buffered_term.as_slice() == term_bytes)
        .try_fold(0_i64, |sum, (_, buffered_delta)| {
            sum.checked_add(*buffered_delta)
        });

    let fits = pending.and_then(|sum| sum.checked_add(delta)).is_some();
    if !fits {
        let reason = format!(
            "transaction increment aggregate overflow for term {:?}",
            String::from_utf8_lossy(term_bytes)
        );
        tx.mark_failed(reason.clone());
        return Err(PersistentARTrieError::InvalidOperation(reason));
    }

    tx.increments.push((term_bytes.to_vec(), delta));
    Ok(())
}
/// Commit a document transaction, applying every buffered operation.
///
/// The commit is per-op durable and NOT batch-atomic: no BeginTx/CommitTx
/// bracket is written, and each applied primitive writes its own durable
/// record. The steps are, in order:
///
/// 1. Reject a transaction that is not active or that reports a failure.
/// 2. Sum the buffered increments per term, rejecting an `i64` overflow or a
///    negative sum. This happens before any SET is applied, so a commit
///    rejected at this step applies nothing.
/// 3. Apply buffered SETs in buffer order: a valued term is upserted, a
///    value-less term is inserted as a membership entry.
/// 4. Apply each non-zero per-term sum through the overlay counter increment.
///
/// An error in step 3 or 4 returns immediately; operations applied before it
/// are not rolled back by this method.
///
/// # Arguments
///
/// * `tx` - The transaction to commit (consumed)
///
/// # Returns
///
/// The number of buffered operations: buffered SETs plus buffered increment
/// entries (counted before per-term aggregation).
///
/// # Errors
///
/// Returns an error if:
/// - the transaction is already committed or aborted;
/// - the transaction reports a failure reason;
/// - a per-term increment sum overflows `i64` or is negative;
/// - a durable SET or increment write fails;
/// - increments are buffered (with a non-zero sum) but the value type is not a
///   counter (`u64`).
///
/// Takes `&self` (not `&mut self`), so a shared `Arc<PersistentARTrieChar>`
/// can commit chunked transactions without exclusive access. This is needed by
/// lock-free embedders that also arm `enable_eviction`, which requires a bare
/// `Arc` rather than `&mut`.
pub fn commit_document(&self, mut tx: CharDocumentTransaction<V>) -> Result<usize>
where
    V: Clone,
{
    let terminal_state = match tx.state {
        TransactionState::Active => None,
        TransactionState::Committed => Some("committed"),
        TransactionState::Aborted => Some("aborted"),
    };
    if let Some(state_name) = terminal_state {
        return Err(PersistentARTrieError::InvalidOperation(format!(
            "Cannot commit a {} transaction",
            state_name
        )));
    }
    if let Some(reason) = tx.failure_reason() {
        return Err(PersistentARTrieError::InvalidOperation(format!(
            "Cannot commit failed transaction {}: {}",
            tx.document_id(),
            reason
        )));
    }

    // L3.3: the overlay is the sole representation. Per-op durable, NOT
    // batch-atomic. No BeginTx/CommitTx/sync: each primitive writes its own
    // durable, ranked record (matches owned recovery, which ignored tx brackets
    // on replay).
    let increment_count = tx.increments.len();
    let total_operations = tx.shadow_terms.len() + increment_count;

    // Preflight: sum increments per term BEFORE applying any SET, so that a
    // rejected commit applies nothing.
    let mut aggregated: HashMap<Vec<u8>, i64> = HashMap::with_capacity(increment_count);
    for (term_bytes, delta) in &tx.increments {
        let slot = aggregated.entry(term_bytes.clone()).or_insert(0);
        *slot = match slot.checked_add(*delta) {
            Some(sum) => sum,
            None => {
                return Err(PersistentARTrieError::InvalidOperation(format!(
                    "transaction increment aggregate overflow for term {:?}",
                    String::from_utf8_lossy(term_bytes)
                )));
            }
        };
    }

    // The overlay counter is add-only, so a negative per-term sum is rejected
    // (the buffering-time check covers overflow only, not sign).
    if let Some((term_bytes, agg)) = aggregated.iter().find(|(_, agg)| **agg < 0) {
        return Err(PersistentARTrieError::InvalidOperation(format!(
            "overlay document-tx increment aggregate for term {:?} is negative \
             ({}); the overlay counter is add-only",
            String::from_utf8_lossy(term_bytes),
            agg
        )));
    }

    // Apply SETs: upsert when a value is present, membership insert otherwise.
    for (term_bytes, value) in tx.shadow_terms.drain(..) {
        if let Some(v) = value {
            <Self as DurableOverlayWrite<CharKey, V, S>>::upsert_cas_durable_default(
                self,
                &term_bytes,
                v,
            )?;
        } else {
            let term_str = String::from_utf8_lossy(&term_bytes).into_owned();
            self.insert_cas_durable(&term_str)?;
        }
    }

    // Apply increments (counter-monomorph only; route_increment downcasts to
    // u64 and returns None for a non-counter V). Zero sums are skipped.
    for (term_bytes, agg) in aggregated {
        if agg == 0 {
            continue;
        }
        let term_str = String::from_utf8_lossy(&term_bytes).into_owned();
        let routed = super::lockfree_value_route::route_increment(self, &term_str, agg)
            .ok_or_else(|| {
                PersistentARTrieError::InvalidOperation(
                    "overlay document-tx increments require a counter value type (u64)"
                        .to_string(),
                )
            })?;
        routed?;
    }

    tx.increments.clear();
    tx.state = TransactionState::Committed;
    Ok(total_operations)
}
/// Abort a document transaction, discarding everything it buffered.
///
/// Nothing is written to the WAL and nothing is inserted into the trie: the
/// transaction is marked `Aborted` and then dropped together with its buffered
/// operations.
///
/// # Arguments
///
/// * `tx` - The transaction to abort (consumed)
///
/// # Errors
///
/// Returns `PersistentARTrieError::InvalidOperation` if the transaction is
/// already committed or aborted.
pub fn abort_document(&self, mut tx: CharDocumentTransaction<V>) -> Result<()> {
    let terminal_state = match tx.state {
        TransactionState::Active => None,
        TransactionState::Committed => Some("committed"),
        TransactionState::Aborted => Some("aborted"),
    };
    match terminal_state {
        Some(state_name) => Err(PersistentARTrieError::InvalidOperation(format!(
            "Cannot abort a {} transaction",
            state_name
        ))),
        None => {
            // L3.3: the overlay tx buffered nothing visible (no BeginTx was
            // written), so there is nothing to bracket-abort. Flip the state
            // and let the consumed `tx` drop its shadow buffers.
            tx.state = TransactionState::Aborted;
            Ok(())
        }
    }
}
}