//! # Batch Write
//!
//! This module adds batch write functionality to DbInner. Prior to this feature,
//! writes were performed directly in DbInner's `put_with_options` and
//! `delete_with_options` methods. For each operation, a lock was acquired on the
//! db_state to mutate the WAL or memtable. This worked fine for single writes,
//! but for batch writes, which take longer, it could create contention on the lock.
//! This is dangerous in an async runtime because it can block threads, leading to
//! starvation.
//!
//! This module spawns a separate task to handle batch writes. The task receives a
//! `WriteBatchMessage`, which contains a `WriteBatch` of Put/Delete operations, the
//! `WriteOptions` for the write, and a `oneshot::Sender`. The `Sender` is used to send
//! the result of the write (a `WriteHandle` and a durability watcher) back to the
//! caller so the caller can `.await` it. The upshot is that callers safely `.await`
//! their writes rather than holding a lock on the db_state.
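//!
//! As a rough sketch of the caller side (here `write_tx` stands in for whatever sender
//! the dispatcher exposes for `WriteBatchMessage`s; it is illustrative, not the exact API):
//!
//! ```ignore
//! // Build the batch and a oneshot channel for the reply.
//! let mut batch = WriteBatch::new();
//! batch.put(b"key", b"value");
//! let (done_tx, done_rx) = tokio::sync::oneshot::channel();
//!
//! // Hand the batch to the write task.
//! write_tx.send(WriteBatchMessage {
//!     batch,
//!     options: WriteOptions::default(),
//!     done: done_tx,
//! })?;
//!
//! // Await the reply: a `WriteHandle` plus a watcher that resolves once the write
//! // is durable.
//! let (handle, durable_watcher) = done_rx.await??;
//! ```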
//!
//! Centralizing the writes in a single event loop also provides a single location to
//! assign sequence numbers when we implement MVCC.
//!
//! [Pebble](https://github.com/cockroachdb/pebble) has a similar design and
//! [a good write-up](https://github.com/cockroachdb/pebble/blob/master/docs/rocksdb.md#commit-pipeline)
//! describing its benefits.
//!
//! _Note: The `write_batch` loop still holds a lock on the db_state. There can still
//! be contention between `get`s, which hold a lock, and the write loop._
use async_trait::async_trait;
use fail_parallel::fail_point;
use futures::stream::BoxStream;
use futures::StreamExt;
use log::warn;
use std::sync::Arc;
use std::time::Duration;
use tracing::instrument;
use crate::config::WriteOptions;
use crate::dispatcher::MessageHandler;
use crate::types::RowEntry;
use crate::utils::WatchableOnceCellReader;
use crate::{batch::WriteBatch, db::DbInner, db::WriteHandle, error::SlateDBError};
use slatedb_common::clock::SystemClock;
pub(crate) const WRITE_BATCH_TASK_NAME: &str = "writer";
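/// The result sent back to a batch writer: a `WriteHandle` for the committed batch and a
/// watcher that resolves once the write is durable, or a `SlateDBError` if the write failed.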
pub(crate) type WriteBatchResult = Result<
(
WriteHandle,
WatchableOnceCellReader<Result<(), SlateDBError>>,
),
SlateDBError,
>;
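/// A batch write request sent to the write task: the `WriteBatch` to apply, the
/// `WriteOptions` to apply it with, and a oneshot channel used to send the
/// `WriteBatchResult` back to the caller.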
pub(crate) struct WriteBatchMessage {
pub(crate) batch: WriteBatch,
pub(crate) options: WriteOptions,
pub(crate) done: tokio::sync::oneshot::Sender<WriteBatchResult>,
}
impl std::fmt::Debug for WriteBatchMessage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let WriteBatchMessage { batch, options, .. } = self;
f.debug_struct("WriteBatchMessage")
.field("batch", batch)
.field("options", options)
.finish()
}
}
pub(crate) struct WriteBatchEventHandler {
db_inner: Arc<DbInner>,
is_first_write: bool,
}
impl WriteBatchEventHandler {
pub(crate) fn new(db_inner: Arc<DbInner>) -> Self {
Self {
db_inner,
is_first_write: true,
}
}
}
#[async_trait]
impl MessageHandler<WriteBatchMessage> for WriteBatchEventHandler {
async fn handle(&mut self, message: WriteBatchMessage) -> Result<(), SlateDBError> {
let WriteBatchMessage {
batch,
options,
done,
} = message;
let result = self.db_inner.write_batch(batch, &options).await;
// if this is the first write and the WAL is disabled, make sure users are flushing
// their memtables in a timely manner.
if self.is_first_write && !self.db_inner.wal_enabled && options.await_durable {
if let Ok((_, this_watcher)) = &result {
let this_watcher = this_watcher.clone();
let this_clock = self.db_inner.system_clock.clone();
tokio::spawn(async move {
monitor_first_write(this_watcher, this_clock).await;
});
}
}
self.is_first_write = false;
_ = done.send(result);
Ok(())
}
async fn cleanup(
&mut self,
mut messages: BoxStream<'async_trait, WriteBatchMessage>,
result: Result<(), SlateDBError>,
) -> Result<(), SlateDBError> {
let error = result.clone().err().unwrap_or(SlateDBError::Closed);
while let Some(msg) = messages.next().await {
let _ = msg.done.send(Err(error.clone()));
}
Ok(())
}
}
impl DbInner {
#[allow(clippy::panic)]
#[instrument(level = "trace", skip_all, fields(batch_size = batch.ops.len()))]
async fn write_batch(&self, batch: WriteBatch, options: &WriteOptions) -> WriteBatchResult {
let _options = options;
#[cfg(not(dst))]
let now = self.mono_clock.now().await?;
#[cfg(dst)]
// Force the current timestamp for DST operations. See #719 for details.
let now = options.now;
let commit_seq = self.oracle.next_seq();
// Check for transaction conflicts before proceeding with the write batch
// if this batch is part of a transaction.
if let Some(txn_id) = batch.txn_id.as_ref() {
if self.txn_manager.check_has_conflict(txn_id) {
return Err(SlateDBError::TransactionConflict);
}
}
// Count batch-local merge folding on the flush path so DB-side merge
// resolution uses one metric for both write batches and memtable flushes.
let entries = batch
.extract_entries(
commit_seq,
now,
self.settings.default_ttl,
self.flush_merge_operator.clone(),
)
.await?;
let durable_watcher = if self.wal_enabled {
// WAL entries must be appended to the wal buffer atomically. Otherwise,
// the WAL buffer might flush the entries in the middle of the batch, which
// would violate the guarantee that batches are written atomically. We do
// this by appending the entire entry batch in a single call to the WAL buffer,
// which holds a write lock during the append.
let wal_watcher = self.wal_buffer.append(&entries)?;
self.wal_buffer.maybe_trigger_flush()?;
// TODO: handle sync here; if sync is enabled, we can call `flush` here. Let's put this
// in another pull request.
self.write_entries_to_memtable(entries);
wal_watcher
} else {
// if the WAL is disabled, we just write the entries to the memtable.
self.write_entries_to_memtable(entries)
};
// update the last_applied_seq in the WAL buffer. once a chunk of WAL entries has been applied
// to the memtable and flushed to remote storage, the WAL buffer manager will recycle those WAL entries.
self.wal_buffer.track_last_applied_seq(commit_seq);
// insert a fail point to make it easier to test the case where the last_committed_seq is not updated.
// this is useful for testing the case where the reader is not able to see the writes.
fail_point!(
Arc::clone(&self.fp_registry),
"write-batch-pre-commit",
|_| { Err(SlateDBError::from(std::io::Error::other("oops"))) }
);
// track the recently committed txn for conflict checks. if txn_id is not supplied,
// we still treat this write as a transaction commit.
if let Some(txn_id) = &batch.txn_id {
self.txn_manager
.track_recent_committed_txn(txn_id, commit_seq);
} else {
let write_keys = batch.keys();
self.txn_manager
.track_recent_committed_write_batch(&write_keys, commit_seq);
}
// insert a fail point to make it easier to test the case where the transaction is committed
// but the remaining work hasn't been done. this is useful for testing that transaction commits
// and committed seqnums get updated in lock-step. See #1301 for details.
fail_point!(
Arc::clone(&self.fp_registry),
"write-batch-post-commit",
|_| { Err(SlateDBError::from(std::io::Error::other("oops"))) }
);
// record the memtable sequence in the memtable's sequence tracker.
self.record_memtable_sequence(commit_seq);
// maybe freeze the memtable.
self.maybe_freeze_current_memtable()?;
let write_handle = WriteHandle::new(commit_seq, now);
Ok((write_handle, durable_watcher))
}
/// Write entries to the currently active memtable. Returns a durable watcher for the memtable.
fn write_entries_to_memtable(
&self,
entries: Vec<RowEntry>,
) -> WatchableOnceCellReader<Result<(), SlateDBError>> {
let guard = self.state.read();
let memtable = guard.memtable();
entries.into_iter().for_each(|entry| memtable.put(entry));
memtable.table().durable_watcher()
}
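/// Record the commit sequence number, along with the current wall-clock time, in the
/// active memtable's sequence tracker.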
fn record_memtable_sequence(&self, seq: u64) {
let ts = self.system_clock.now();
let guard = self.state.read();
guard.memtable().record_sequence(seq, ts);
}
}
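/// Warns if the first write has not become durable within 5 seconds while the WAL is
/// disabled, since SlateDB only flushes memtables automatically once `l0_sst_size_bytes`
/// is reached.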
async fn monitor_first_write(
mut watcher: WatchableOnceCellReader<Result<(), SlateDBError>>,
system_clock: Arc<dyn SystemClock>,
) {
tokio::select! {
_ = watcher.await_value() => {}
_ = system_clock.sleep(Duration::from_secs(5)) => {
warn!("First write not durable after 5 seconds and WAL is disabled. \
SlateDB does not automatically flush memtables until `l0_sst_size_bytes` \
is reached. If writer is single threaded or has low throughput, the \
applications must call `flush` to ensure durability in a timely manner.");
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::object_store::memory::InMemory;
use crate::Db;
#[tokio::test]
async fn test_is_first_write_set_false_after_first_write() {
let object_store = Arc::new(InMemory::new());
let db = Db::open(
"/tmp/test_is_first_write_set_false_after_first_write",
object_store,
)
.await
.unwrap();
let mut handler = WriteBatchEventHandler::new(db.inner.clone());
assert!(handler.is_first_write);
let mut batch = WriteBatch::new();
batch.put(b"key", b"value");
let (done_tx, done_rx) = tokio::sync::oneshot::channel();
handler
.handle(WriteBatchMessage {
batch,
options: WriteOptions::default(),
done: done_tx,
})
.await
.unwrap();
let result = done_rx.await.unwrap();
assert!(result.is_ok());
assert!(!handler.is_first_write);
}
}