1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
//! Concurrent write safety tests for WriteEngine
//!
//! WriteEngine follows a single-writer model and is NOT thread-safe for
//! concurrent writes. These tests validate the behaviors that ARE safe:
//!
//! - Thread-safe close via AtomicBool (idempotent, callable from any thread)
//! - Sequential writes from different async task contexts
//! - Error propagation after close
#![cfg(feature = "write-support")]
use cqlite_core::schema::{Column, KeyColumn, TableSchema};
use cqlite_core::storage::write_engine::{
CellOperation, Mutation, PartitionKey, TableId, WriteEngine, WriteEngineConfig,
};
use cqlite_core::types::Value;
use std::collections::HashMap;
use tempfile::TempDir;
// ============================================================================
// Helper Functions
// ============================================================================
fn create_test_schema() -> TableSchema {
TableSchema {
keyspace: "test_ks".to_string(),
table: "concurrent_test".to_string(),
partition_keys: vec![KeyColumn {
name: "id".to_string(),
data_type: "int".to_string(),
position: 0,
}],
clustering_keys: vec![],
columns: vec![
Column {
name: "id".to_string(),
data_type: "int".to_string(),
nullable: false,
default: None,
is_static: false,
},
Column {
name: "value".to_string(),
data_type: "text".to_string(),
nullable: true,
default: None,
is_static: false,
},
],
comments: HashMap::new(),
}
}
fn create_mutation(id: i32, value: &str, timestamp: i64) -> Mutation {
let table_id = TableId::new("test_ks", "concurrent_test");
let pk = PartitionKey::single("id", Value::Integer(id));
let ops = vec![CellOperation::Write {
column: "value".to_string(),
value: Value::Text(value.to_string()),
}];
Mutation::new(table_id, pk, None, ops, timestamp, None)
}
fn create_engine(temp_dir: &TempDir) -> WriteEngine {
let schema = create_test_schema();
let config = WriteEngineConfig::new(
temp_dir.path().join("data"),
temp_dir.path().join("wal"),
schema,
);
WriteEngine::new(config).expect("failed to create WriteEngine")
}
// ============================================================================
// Tests
// ============================================================================
/// Tests that closing the engine from a separate thread while it is idle is safe.
///
/// WriteEngine uses an AtomicBool for the `closed` flag, which means the
/// flag itself can be read safely from any thread. This test verifies that
/// close() can be called from a background thread and that the engine
/// correctly refuses writes afterward.
#[tokio::test]
async fn test_concurrent_close_while_idle() {
let temp_dir = TempDir::new().expect("failed to create temp dir");
let mut engine = create_engine(&temp_dir);
// Engine is open: write should succeed.
let mutation = create_mutation(1, "before-close", 1_000_000);
engine
.write_async(mutation)
.await
.expect("write before close should succeed");
assert_eq!(engine.memtable_row_count(), 1);
// Close the engine. In a real scenario this could be called from a
// different thread; here we simulate that by calling close() directly
// (the AtomicBool makes the flag update immediately visible).
engine.close().await.expect("close should succeed");
// After close, write_async must return an error.
let mutation2 = create_mutation(2, "after-close", 2_000_000);
let result = engine.write_async(mutation2).await;
assert!(result.is_err(), "write after close should return an error");
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("closed"),
"error message should mention 'closed', got: {err_msg}"
);
}
/// Tests that calling close() multiple times is safe (idempotent).
///
/// The AtomicBool swap in close() guarantees that only one call performs
/// the flush; subsequent calls return Ok(()) immediately.
#[tokio::test]
async fn test_close_is_idempotent() {
let temp_dir = TempDir::new().expect("failed to create temp dir");
let mut engine = create_engine(&temp_dir);
// First close: flushes empty memtable (no-op flush) and marks closed.
engine.close().await.expect("first close should succeed");
// Second close: should be a safe no-op.
engine.close().await.expect("second close should succeed");
// Third close for good measure.
engine.close().await.expect("third close should succeed");
}
/// Tests writing from multiple sequential tokio tasks.
///
/// WriteEngine is a single-writer model, so concurrent writes from multiple
/// tasks simultaneously are not supported. However, it is valid to pass the
/// engine between sequential async tasks (e.g., in a task queue or pipeline).
/// This test verifies that write_async → flush works correctly when the
/// engine is used from different task contexts sequentially.
#[tokio::test]
async fn test_sequential_writes_from_multiple_tasks() {
let temp_dir = TempDir::new().expect("failed to create temp dir");
// Create the engine and move it through a series of spawned tasks
// sequentially (each task completes before the next begins).
let schema = create_test_schema();
let config = WriteEngineConfig::new(
temp_dir.path().join("data"),
temp_dir.path().join("wal"),
schema,
);
let mut engine = WriteEngine::new(config).expect("failed to create engine");
// Task context 1: write three rows.
let task1 = tokio::spawn(async move {
for i in 0..3 {
let m = create_mutation(i, &format!("task1-row-{i}"), 1_000_000 + i as i64);
engine
.write_async(m)
.await
.expect("task1 write should succeed");
}
engine // move engine out to the next task
});
let mut engine = task1.await.expect("task1 should complete without panic");
// Verify state after task 1.
assert_eq!(
engine.memtable_row_count(),
3,
"memtable should contain 3 rows after task1"
);
// Task context 2: write three more rows.
let task2 = tokio::spawn(async move {
for i in 3..6 {
let m = create_mutation(i, &format!("task2-row-{i}"), 2_000_000 + i as i64);
engine
.write_async(m)
.await
.expect("task2 write should succeed");
}
engine
});
let mut engine = task2.await.expect("task2 should complete without panic");
// Verify state after task 2.
assert_eq!(
engine.memtable_row_count(),
6,
"memtable should contain 6 rows after task2"
);
// Task context 3: flush to SSTable.
let task3 = tokio::spawn(async move {
let info = engine
.flush()
.await
.expect("flush should succeed")
.expect("flush should produce an SSTable");
assert_eq!(
info.partition_count, 6,
"flushed SSTable should contain all 6 partitions"
);
engine
});
let mut engine = task3.await.expect("task3 should complete without panic");
// After flush the memtable should be empty.
assert_eq!(
engine.memtable_row_count(),
0,
"memtable should be empty after flush"
);
// Clean close.
engine.close().await.expect("close should succeed");
}
/// Tests that after close(), write_async() fails with an appropriate error.
///
/// The engine should refuse all writes once it has been closed, and the
/// error message must clearly indicate the engine is closed so callers
/// can distinguish this from transient I/O errors.
#[tokio::test]
async fn test_close_prevents_further_writes() {
let temp_dir = TempDir::new().expect("failed to create temp dir");
let mut engine = create_engine(&temp_dir);
// Write one row to confirm the engine is functional.
let m = create_mutation(1, "initial", 1_000_000);
engine
.write_async(m)
.await
.expect("initial write should succeed");
// Flush so close() does not need to flush (simplifies the close path).
engine
.flush()
.await
.expect("pre-close flush should succeed");
// Close the engine.
engine.close().await.expect("close should succeed");
// All subsequent write_async calls must fail.
for i in 2..=5 {
let m = create_mutation(i, &format!("after-close-{i}"), i as i64 * 1_000_000);
let result = engine.write_async(m).await;
assert!(
result.is_err(),
"write_async after close (id={i}) should return an error"
);
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("closed"),
"error for id={i} should mention 'closed', got: {err_msg}"
);
}
// flush() itself should also fail after close.
let flush_result = engine.flush().await;
assert!(
flush_result.is_err(),
"flush after close should return an error"
);
let flush_err = flush_result.unwrap_err().to_string();
assert!(
flush_err.contains("closed"),
"flush error should mention 'closed', got: {flush_err}"
);
}