batpak 0.9.0

Event sourcing with causal graphs and caller-defined gates. Sync API, no async runtime.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
//! Sealed-segment compaction with off-side index rebuild and single swap point.

use super::sync;
use crate::coordinate::Coordinate;
use crate::event::{Event, EventKind, StoredEvent};
use crate::store::file_classification::StoreFileKind;
use crate::store::lifecycle_close::write_cold_start_artifacts_on_close;
use crate::store::platform::fs::StoreFs;
use crate::store::segment::scan as reader;
use crate::store::segment::{self, Active, FramePayload};
use crate::store::{CompactionConfig, CompactionStrategy, Open, Store, StoreError};

pub(crate) fn compact(
    store: &Store<Open>,
    config: &CompactionConfig,
) -> Result<
    (
        segment::CompactionResult,
        crate::store::compaction_report::CompactionReportBody,
    ),
    StoreError,
> {
    tracing::debug!(target: "batpak::flow", flow = "compact");
    let fs = store.config.fs();
    let _lifecycle = store.lifecycle_gate.lock();
    sync(store)?;

    let mut all_segments: Vec<(u64, std::path::PathBuf)> = Vec::new();
    for entry in fs
        .read_dir(&store.config.data_dir)
        .map_err(StoreError::Io)?
    {
        let entry = entry.map_err(StoreError::Io)?;
        let path = entry.path();
        let seg_id = match StoreFileKind::from_path(&path) {
            StoreFileKind::Segment(segment_id) => segment_id.as_u64(),
            StoreFileKind::MalformedSegment(error) => {
                tracing::warn!(
                    path = %path.display(),
                    %error,
                    "skipping malformed segment filename"
                );
                continue;
            }
            StoreFileKind::VisibilityRanges
            | StoreFileKind::Checkpoint
            | StoreFileKind::MmapIndex
            | StoreFileKind::IdempotencyStore
            | StoreFileKind::PendingCompactionMarker
            | StoreFileKind::CompactSource
            | StoreFileKind::CursorDirectory
            | StoreFileKind::Keyset
            | StoreFileKind::Other => continue,
        };
        all_segments.push((seg_id, path));
    }
    all_segments.sort_by_key(|(id, _)| *id);

    let active_segment_id = all_segments.last().map(|(id, _)| *id).unwrap_or(0);
    let mut sealed: Vec<(u64, std::path::PathBuf)> = all_segments
        .into_iter()
        .filter(|(id, _)| *id < active_segment_id)
        .collect();

    if sealed.len() < config.min_segments {
        let result = segment::CompactionResult {
            outcome: segment::CompactionOutcome::Skipped,
            segments_removed: 0,
            bytes_reclaimed: 0,
        };
        let report =
            crate::store::compaction_report::report_skipped(config, active_segment_id, &sealed)?;
        return Ok((result, report));
    }

    let merged_id = sealed[0].0;
    let merged_path = store
        .config
        .data_dir
        .join(segment::segment_filename(merged_id));
    let source_segment_ids: Vec<u64> = sealed.iter().map(|(seg_id, _)| *seg_id).collect();
    let mut compact_source_path = None;

    crate::store::cold_start::rebuild::write_pending_compaction(
        &store.config.data_dir,
        merged_id,
        &source_segment_ids,
        fs.as_ref(),
    )?;

    let fresh_index = match materialize_compacted_segment(
        store,
        &config.strategy,
        &mut sealed,
        merged_id,
        &merged_path,
        &mut compact_source_path,
    )
    .and_then(|_| rebuild_fresh_compaction_index(store))
    {
        Ok(fresh_index) => fresh_index,
        Err(error) => {
            return failed_compaction_with_rollback(&FailedCompactionCtx {
                config,
                active_segment_id,
                sealed: &sealed,
                merged_segment_id: merged_id,
                data_dir: &store.config.data_dir,
                merged_path: &merged_path,
                compact_source_path: compact_source_path.as_deref(),
                error: &error,
                context: "compaction pre-swap phase failed",
                fs: fs.as_ref(),
            });
        }
    };

    store.index.replace_contents_from_fresh(fresh_index)?;

    let mut bytes_reclaimed = 0_u64;
    let mut segments_removed = 0_usize;
    for (_, path) in &sealed {
        if let Ok(meta) = fs.metadata(path) {
            bytes_reclaimed += meta.len();
        }
        fs.remove_file(path).map_err(StoreError::Io)?;
        segments_removed += 1;
    }

    if let Some(temp_source_path) = compact_source_path {
        fs.remove_file_if_present(&temp_source_path)
            .map_err(StoreError::Io)?;
    }
    crate::store::cold_start::rebuild::clear_pending_compaction(
        &store.config.data_dir,
        fs.as_ref(),
    )?;

    let frontier = store.index.global_sequence();
    store.index.mark_idemp_evicted_against_live();
    let eviction = store.index.idemp.evict(frontier);
    tracing::debug!(
        target: "batpak::idemp",
        flow = "compact",
        frontier,
        aged_out = eviction.aged_out,
        cap_trimmed = eviction.cap_trimmed_out_of_window,
        within_window_exceeds_cap = eviction.within_window_exceeds_cap,
        remaining = eviction.remaining,
        "applied window-priority idempotency eviction after compaction"
    );

    store
        .index
        .idemp
        .flush(&store.config.data_dir, fs.as_ref())?;

    if let Err(e) = write_cold_start_artifacts_on_close(store) {
        tracing::warn!("post-compaction cold-start artifact write failed: {e}");
    }

    let result = segment::CompactionResult {
        outcome: segment::CompactionOutcome::Performed,
        segments_removed,
        bytes_reclaimed,
    };
    let report = crate::store::compaction_report::report_for_run(
        config,
        active_segment_id,
        &sealed,
        Some(merged_id),
        &result,
        Some(&merged_path),
    )?;
    Ok((result, report))
}

/// Restore the pre-compaction disk layout after a failed pre-swap phase.
///
/// The merged output is written at `merged_path`, which is also the filename
/// of the lowest sealed source segment. That source is renamed aside to
/// `compact_source_path` before the merged output is created. Two cases
/// follow:
/// - `Some(temp)`: the original bytes are safe at `temp`. Whatever sits at
///   `merged_path` is partial merged output, so it is deleted and the
///   original is renamed back.
/// - `None`: the source was never moved aside (its relocation failed or never
///   ran). `merged_path` may therefore still hold the ORIGINAL sealed segment
///   and is left untouched; deleting it would lose data.
///
/// Finally the pending-compaction marker is cleared.
///
/// # Errors
/// Propagates I/O errors from the file operations. If restoring the source
/// fails, the function returns before the marker is cleared.
fn rollback_compaction_disk_state(
    data_dir: &std::path::Path,
    merged_path: &std::path::Path,
    compact_source_path: Option<&std::path::Path>,
    fs: &dyn StoreFs,
) -> Result<(), StoreError> {
    if let Some(temp_source_path) = compact_source_path {
        // The original is parked at `temp_source_path`, so `merged_path` can
        // only hold (partial) merged output: discard it, then restore.
        fs.remove_file_if_present(merged_path)
            .map_err(StoreError::Io)?;
        fs.rename(temp_source_path, merged_path)
            .map_err(StoreError::Io)?;
    }
    crate::store::cold_start::rebuild::clear_pending_compaction(data_dir, fs)?;
    Ok(())
}

/// Inputs needed to undo a failed pre-swap compaction and to build its
/// `Failed` result and report.
struct FailedCompactionCtx<'a> {
    // --- What went wrong ---
    /// The error that aborted the pre-swap phase.
    error: &'a StoreError,
    /// Prefix prepended to the failure reason.
    context: &'a str,

    // --- Disk state to roll back ---
    /// Filesystem abstraction used for the rollback operations.
    fs: &'a dyn StoreFs,
    /// Store data directory (location of the pending-compaction marker).
    data_dir: &'a std::path::Path,
    /// Path of the merged output segment.
    merged_path: &'a std::path::Path,
    /// Where the merged-id source segment was moved aside, if it was.
    compact_source_path: Option<&'a std::path::Path>,

    // --- Report inputs ---
    config: &'a CompactionConfig,
    active_segment_id: u64,
    sealed: &'a [(u64, std::path::PathBuf)],
    merged_segment_id: u64,
}

fn failed_compaction_with_rollback(
    ctx: &FailedCompactionCtx<'_>,
) -> Result<
    (
        segment::CompactionResult,
        crate::store::compaction_report::CompactionReportBody,
    ),
    StoreError,
> {
    rollback_compaction_disk_state(
        ctx.data_dir,
        ctx.merged_path,
        ctx.compact_source_path,
        ctx.fs,
    )?;
    let reason = format!("{}; disk layout rolled back: {}", ctx.context, ctx.error);
    tracing::error!(target: "batpak::flow", flow = "compact", error = %ctx.error, "{reason}");
    let result = segment::CompactionResult {
        outcome: segment::CompactionOutcome::Failed {
            reason: reason.clone(),
        },
        segments_removed: 0,
        bytes_reclaimed: 0,
    };
    let report = crate::store::compaction_report::report_for_run(
        ctx.config,
        ctx.active_segment_id,
        ctx.sealed,
        Some(ctx.merged_segment_id),
        &result,
        None,
    )?;
    Ok((result, report))
}

/// Scan every sealed segment in order and concatenate their entries.
///
/// Stops at the first segment whose scan fails and returns that error.
fn scan_sealed_entries(
    store: &Store<Open>,
    sealed: &[(u64, std::path::PathBuf)],
) -> Result<Vec<reader::ScannedEntry>, StoreError> {
    let mut scanned = Vec::new();
    for segment_path in sealed.iter().map(|(_, p)| p) {
        let entries = store.reader.scan_segment(segment_path)?;
        scanned.extend(entries);
    }
    Ok(scanned)
}

/// Build a `StoredEvent<Value>` view of a scanned entry.
///
/// The coordinate is taken from the entry's entity/scope, and the event is a
/// clone of the entry's decoded event. Fails if the coordinate is invalid.
fn scanned_entry_as_stored_event(
    entry: &reader::ScannedEntry,
) -> Result<StoredEvent<serde_json::Value>, StoreError> {
    let coordinate = Coordinate::new(&entry.entity, &entry.scope)?;
    let event = entry.event.clone();
    Ok(StoredEvent { coordinate, event })
}

/// Decide whether a Retention/Tombstone predicate KEEPS an event.
///
/// Semantics of the result:
/// - Retention: `true` means the event survives.
/// - Tombstone: `true` means the event is NOT rewritten as a tombstone.
///
/// With the `payload-encryption` feature, encrypted events are judged on their
/// DECRYPTED plaintext view. A crypto-shredded event can no longer be read, so
/// it is conservatively kept: compaction never drops data the operator cannot
/// see and did not explicitly erase. In every case the writer re-emits the
/// original ciphertext bytes, so a survivor's `event_hash` is unchanged.
fn compaction_predicate_keeps(
    store: &Store<Open>,
    entry: &reader::ScannedEntry,
    predicate: &crate::store::RetentionPredicate,
) -> Result<bool, StoreError> {
    #[cfg(feature = "payload-encryption")]
    {
        if entry.event.header.payload_encryption.is_some() {
            let keeps = match decrypt_compaction_payload(store, entry)? {
                Some(plaintext_view) => predicate(&plaintext_view),
                // Shredded: nothing left to evaluate, so keep it.
                None => true,
            };
            return Ok(keeps);
        }
    }
    #[cfg(not(feature = "payload-encryption"))]
    let _ = store;
    let plaintext_view = scanned_entry_as_stored_event(entry)?;
    Ok(predicate(&plaintext_view))
}

/// Produce the plaintext `StoredEvent<Value>` view of a scanned entry, for use
/// by compaction predicates.
///
/// Returns:
/// - `Some(view)` for an entry with no encryption metadata, using its
///   already-decoded value;
/// - `Some(view)` for an encrypted entry whose payload opens and decodes;
/// - `None` when `open_encrypted_payload_bytes` reports the scope as shredded.
#[cfg(feature = "payload-encryption")]
fn decrypt_compaction_payload(
    store: &Store<Open>,
    entry: &reader::ScannedEntry,
) -> Result<Option<StoredEvent<serde_json::Value>>, StoreError> {
    let coordinate = Coordinate::new(&entry.entity, &entry.scope)?;
    let header = &entry.event.header;
    let encryption_meta = match header.payload_encryption.as_ref() {
        Some(found) => found,
        // The caller only routes encrypted entries here; a plaintext entry
        // already carries its decoded value.
        None => return scanned_entry_as_stored_event(entry).map(Some),
    };
    let opened = store.open_encrypted_payload_bytes(
        &coordinate,
        header.event_kind,
        header.event_id,
        encryption_meta,
        &entry.payload_bytes,
    )?;
    let plaintext = match opened {
        crate::store::read_api::PayloadPlaintext::Shredded => return Ok(None),
        crate::store::read_api::PayloadPlaintext::Plaintext(bytes) => bytes,
    };
    let decoded = crate::encoding::from_bytes::<serde_json::Value>(&plaintext)
        .map_err(|e| StoreError::Serialization(Box::new(e)))?;
    Ok(Some(StoredEvent {
        coordinate,
        event: Event {
            header: header.clone(),
            payload: decoded,
            hash_chain: entry.event.hash_chain.clone(),
        },
    }))
}

/// Append one surviving scanned entry to the merged segment as a frame.
///
/// The frame's payload is the entry's ORIGINAL `payload_bytes`, never the
/// decoded `serde_json::Value`. That `Value` exists only for predicate
/// evaluation. Serializing it would write a msgpack map where the reader's
/// `FramePayload<Vec<u8>>` decode expects raw bytes, and every survivor would
/// then fail with "invalid type: map, expected a sequence".
///
/// Only the outer frame envelope is re-encoded; the user payload is carried
/// through verbatim. A kept frame therefore has the same payload bytes, so
/// its `event_hash` over the payload is unchanged by compaction. A Tombstone
/// rewrite travels through the (possibly mutated) `entry.event.header`.
fn write_scanned_entry(
    merged_segment: &mut segment::Segment<Active>,
    entry: reader::ScannedEntry,
) -> Result<(), StoreError> {
    let frame_payload = FramePayload {
        event: Event {
            header: entry.event.header,
            payload: entry.payload_bytes,
            hash_chain: entry.event.hash_chain,
        },
        entity: entry.entity,
        scope: entry.scope,
        receipt_extensions: entry.receipt_extensions,
    };
    let encoded = segment::frame_encode(&frame_payload)?;
    merged_segment.write_frame(&encoded)?;
    Ok(())
}

/// Move the sealed segment that shares `merged_id` aside, so the merged output
/// can be written under that id.
///
/// The segment is renamed to `<merged_id:06>.<ext>.compact-src`, after any
/// stale file at that path is removed. Both the matching `sealed` entry and
/// `compact_source_path` are updated to the new path. If no sealed entry has
/// `merged_id`, nothing happens.
fn relocate_merged_source_if_present(
    store: &Store<Open>,
    sealed: &mut [(u64, std::path::PathBuf)],
    merged_id: u64,
    compact_source_path: &mut Option<std::path::PathBuf>,
) -> Result<(), StoreError> {
    let Some(slot) = sealed.iter_mut().find(|(seg_id, _)| *seg_id == merged_id) else {
        return Ok(());
    };
    let fs = store.config.fs();
    let parked_path = store.config.data_dir.join(format!(
        "{merged_id:06}.{}.compact-src",
        segment::SEGMENT_EXTENSION
    ));
    fs.remove_file_if_present(&parked_path)
        .map_err(StoreError::Io)?;
    fs.rename(&slot.1, &parked_path)
        .map_err(StoreError::Io)?;
    slot.1 = parked_path.clone();
    *compact_source_path = Some(parked_path);
    Ok(())
}

/// Write the merged segment for `merged_id` from the sealed sources.
///
/// Steps: evict the sealed segments from the reader, then move the
/// merged-id source aside and remove any existing file at `merged_path`.
/// Next, create a fresh active segment and fill it according to `strategy`:
/// - `Merge` appends every source's frames unchanged;
/// - `Retention` writes only the entries the predicate keeps;
/// - `Tombstone` writes every entry, re-kinding rejected ones to
///   `EventKind::TOMBSTONE`.
///
/// Finally the segment is synced with the configured mode and sealed.
fn materialize_compacted_segment(
    store: &Store<Open>,
    strategy: &CompactionStrategy,
    sealed: &mut [(u64, std::path::PathBuf)],
    merged_id: u64,
    merged_path: &std::path::Path,
    compact_source_path: &mut Option<std::path::PathBuf>,
) -> Result<(), StoreError> {
    sealed.iter().for_each(|(seg_id, _)| {
        store.reader.evict_segment(*seg_id);
    });

    relocate_merged_source_if_present(store, sealed, merged_id, compact_source_path)?;

    store
        .config
        .fs()
        .remove_file_if_present(merged_path)
        .map_err(StoreError::Io)?;
    let mut merged_segment = segment::Segment::<Active>::create_with_created_ns_on(
        &store.config.data_dir,
        merged_id,
        store.runtime.now_wall_ns(),
        store.config.fs(),
    )?;

    // Compaction intentionally destroys NO payload keys.
    //
    // Retention drops and Tombstone marks act per EVENT, but a crypto-shred
    // key is per SCOPE. Under a coarse granularity (the default PerEntity),
    // one key covers every event of an entity. Shredding a scope's key because
    // a predicate rejected SOME of its events would also destroy the
    // plaintext of the still-live siblings (over-shred).
    //
    // Tracking "was this the scope's last event" would be fragile and would
    // still be an implicit shred nobody requested. Erasure therefore remains
    // the single explicit `Store::shred_scope` operation, which works at any
    // granularity and never over-shreds. The keyset is left untouched, so a
    // surviving event of a partially compacted scope still decrypts.
    match strategy {
        CompactionStrategy::Merge => {
            for (_, source_path) in sealed.iter() {
                merged_segment.append_frames_from_segment(source_path)?;
            }
        }
        CompactionStrategy::Retention(predicate) => {
            for scanned in scan_sealed_entries(store, sealed)? {
                let keep = compaction_predicate_keeps(store, &scanned, predicate)?;
                if keep {
                    write_scanned_entry(&mut merged_segment, scanned)?;
                }
            }
        }
        CompactionStrategy::Tombstone(predicate) => {
            for mut scanned in scan_sealed_entries(store, sealed)? {
                let keep = compaction_predicate_keeps(store, &scanned, predicate)?;
                if !keep {
                    scanned.event.header.event_kind = EventKind::TOMBSTONE;
                }
                write_scanned_entry(&mut merged_segment, scanned)?;
            }
        }
    }

    merged_segment.sync_with_mode(&store.config.sync.mode)?;
    let _sealed_segment = merged_segment.seal();
    Ok(())
}

/// Build a brand-new `StoreIndex` from the segments currently on disk.
///
/// The store is synced first. The new index uses the store's index config,
/// is populated by `rebuild_from_segments`, and then has any persisted
/// cancelled visibility ranges restored. The live index is not touched.
fn rebuild_fresh_compaction_index(
    store: &Store<Open>,
) -> Result<crate::store::index::StoreIndex, StoreError> {
    sync(store)?;

    let data_dir = &store.config.data_dir;
    let rebuilt = crate::store::index::StoreIndex::with_config(&store.config.index);
    crate::store::cold_start::rebuild::rebuild_from_segments(&rebuilt, &store.reader, data_dir)?;

    let cancelled = crate::store::hidden_ranges::load_cancelled_ranges(data_dir)?;
    if let Some(ranges) = cancelled {
        rebuilt.restore_cancelled_visibility_ranges(ranges);
    }

    Ok(rebuilt)
}

// Test-only module for this file, loaded from a sibling source file via
// `#[path]` (by its name, presumably mutation-kill tests — confirm).
#[cfg(test)]
#[path = "lifecycle_compact_mutation_kill.rs"]
mod lifecycle_compact_mutation_kill;