1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
//! Regression for Issue #24: `export_iceberg()` must hold a
//! `SnapshotPin` for the lifetime of the export call, otherwise
//! concurrent compaction + `gc_pending_deletions` can delete a
//! Parquet file out from under the in-flight export. Before the fix,
//! `export_iceberg` delegated directly to the catalog with no pin —
//! the same BUG-0007..0013 hazard the read path was fixed for, minus
//! the fix on this call site.
//!
//! This test is deterministic (no timing races): it builds the
//! `export_iceberg` future, polls it once to advance to the first
//! `.await` (which happens after the pin is acquired), then inspects
//! `min_pinned_snapshot()`. With the fix, the pin is observably held.
//! Without the fix, the pin is never acquired and the inspection
//! returns `None`.
use std::sync::Arc;
use merutable::engine::{EngineConfig, MeruEngine};
use merutable::types::{
schema::{ColumnDef, ColumnType, TableSchema},
value::{FieldValue, Row},
};
use tempfile::TempDir;
/// Returns the table schema used by this test: a table named `exp` with a
/// single non-nullable `Int64` column `id`, which is also the primary key
/// (column index 0). All remaining fields take their `Default` values.
fn schema() -> TableSchema {
    // The only column; it doubles as the primary key below.
    let id_column = ColumnDef {
        name: "id".into(),
        col_type: ColumnType::Int64,
        nullable: false,
        ..Default::default()
    };

    TableSchema {
        table_name: "exp".into(),
        columns: vec![id_column],
        primary_key: vec![0],
        ..Default::default()
    }
}
/// Builds an `EngineConfig` rooted in the given temporary directory.
///
/// The catalog URI and the object-store prefix both point at the temp dir
/// itself, and the WAL lives in its `wal` subdirectory. Flush and compaction
/// parallelism are both set to 0.
// NOTE(review): a parallelism of 0 presumably disables background
// flush/compaction workers so the test controls them — confirm against
// `EngineConfig`.
fn config(tmp: &TempDir) -> EngineConfig {
    let root = tmp.path();
    // Lossy conversion: non-UTF-8 path bytes would be replaced, same as the
    // two fields always did.
    let root_str = root.to_string_lossy().to_string();

    EngineConfig {
        schema: schema(),
        catalog_uri: root_str.clone(),
        object_store_prefix: root_str,
        wal_dir: root.join("wal"),
        flush_parallelism: 0,
        compaction_parallelism: 0,
        ..Default::default()
    }
}
/// Regression test for Issue #24: `export_iceberg` must keep a snapshot pin
/// alive while the export is in flight and release it once the call returns.
///
/// The export future is polled exactly once by hand. If it parks
/// (`Poll::Pending`), the pin must be visible through
/// `min_pinned_snapshot()`, and the future is then driven to completion.
/// If it completes on that first poll (`Poll::Ready`), its result is
/// consumed directly: a finished future must not be polled again, so the
/// mid-flight observation is skipped and only the post-conditions are
/// checked.
#[tokio::test]
async fn export_iceberg_holds_snapshot_pin_for_lifetime_of_call() {
    let tmp = tempfile::tempdir().unwrap();
    let engine: Arc<MeruEngine> = MeruEngine::open(config(&tmp)).await.unwrap();

    // Populate so the catalog has a non-empty snapshot — exercises
    // the full export path, not just the empty-metadata fast return.
    for i in 0..10i64 {
        engine
            .put(
                vec![FieldValue::Int64(i)],
                Row::new(vec![Some(FieldValue::Int64(i))]),
            )
            .await
            .unwrap();
    }
    engine.flush().await.unwrap();

    assert_eq!(
        engine.min_pinned_snapshot(),
        None,
        "precondition: no pin before export"
    );

    // Build the export future and pin it on the stack so it can be polled
    // by hand and awaited afterwards.
    // NOTE(review): the export is expected to park on its first internal
    // await (file I/O inside the catalog export) after the pin has been
    // acquired — that internal ordering is not visible from this test;
    // confirm against `export_iceberg`.
    let export_dir = tmp.path().join("exported");
    let fut = engine.export_iceberg(export_dir.clone());
    tokio::pin!(fut);

    // Exactly one manual poll. Control returns here either with the future
    // parked mid-export or with its final result.
    let first_poll = futures::poll!(&mut fut);
    match first_poll {
        std::task::Poll::Pending => {
            // Pin must be observably held while the future is parked.
            assert!(
                engine.min_pinned_snapshot().is_some(),
                "Issue #24 regression: export_iceberg's future parked \
                 mid-export but min_pinned_snapshot() is None — the \
                 pin was never acquired"
            );
            // Drive to completion. Only legal here: the future has not
            // finished yet.
            fut.await.expect("export_iceberg succeeded");
        }
        std::task::Poll::Ready(outcome) => {
            // Future completed on first poll — nothing parked, so there is
            // no mid-flight state to observe. Consume the result here
            // instead of awaiting again: polling a completed future
            // violates the `Future` contract (an `async fn` future panics),
            // and dropping the result would hide an export error.
            outcome.expect("export_iceberg succeeded");
        }
    }

    // After the export future has returned, the pin must be released.
    assert_eq!(
        engine.min_pinned_snapshot(),
        None,
        "pin must be released after export_iceberg returns"
    );
    engine.close().await.unwrap();
}