1use std::collections::BTreeSet;
4use std::io;
5
6use crate::infinitedb_core::{
7 address::{Address, DimensionVector, RevisionId, SpaceId},
8 block::Record,
9};
10use crate::infinitedb_storage::wal::{WalDurability, WalEntry, WalWriter};
11
12use super::super::InfiniteDb;
13
/// Default for `BulkWriteOptions::sync_every`: the number of pending WAL
/// frames at which a bulk session syncs the WAL.
pub const DEFAULT_BULK_SYNC_EVERY: usize = 4_096;
16
/// Default for `BulkWriteOptions::flush_threshold`: the value a bulk session
/// installs as the database's flush threshold while it is active.
pub const DEFAULT_BULK_FLUSH_THRESHOLD: usize = 8_192;
19
/// Tuning knobs applied to the database for the lifetime of a bulk write
/// session.
///
/// Derives `PartialEq`/`Eq` (matching `BulkWriteResult`) so option sets can be
/// compared directly in callers and tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BulkWriteOptions {
    /// Pending-frame count at which the session syncs the WAL.
    /// `BulkSessionCore::begin` clamps this to at least 1.
    pub sync_every: usize,
    /// Value written to the database's `flush_threshold` while the session is
    /// active; the previous value is restored when the session ends.
    pub flush_threshold: usize,
}
26
27impl Default for BulkWriteOptions {
28 fn default() -> Self {
29 Self {
30 sync_every: DEFAULT_BULK_SYNC_EVERY,
31 flush_threshold: DEFAULT_BULK_FLUSH_THRESHOLD,
32 }
33 }
34}
35
/// Summary of the operations recorded by a bulk write session.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BulkWriteResult {
    /// Number of recorded operations (one per `record_operation` call).
    pub count: usize,
    /// Sum of the WAL frame counts reported for every recorded operation.
    pub wal_frames: usize,
    /// Revision of the first recorded operation; `None` if nothing was recorded.
    pub first_revision: Option<RevisionId>,
    /// Revision of the most recently recorded operation; `None` if nothing was
    /// recorded.
    pub last_revision: Option<RevisionId>,
}
44
/// Alias for [`BulkWriteResult`]; both names refer to the same type.
pub type BulkImportResult = BulkWriteResult;
47
/// State of an in-progress bulk write session.
///
/// Holds the exclusive borrow of the database for the session's lifetime,
/// together with the settings that must be restored when the session ends and
/// the running totals reported by `result_snapshot`.
pub struct BulkSessionCore<'a> {
    /// Database being written to; exclusively borrowed until the session ends.
    pub(super) db: &'a mut InfiniteDb,
    /// WAL durability in effect before the session began; restored at the end.
    saved_durability: WalDurability,
    /// Database flush threshold in effect before the session began; restored at
    /// the end.
    saved_flush_threshold: usize,
    /// Raw ids of the spaces registered via `touch_space`; `finish` flushes
    /// each of them.
    pub(super) touched_spaces: BTreeSet<u64>,
    /// Number of operations recorded so far.
    pub(super) count: usize,
    /// Total WAL frames reported by recorded operations.
    pub(super) wal_frames: usize,
    /// Revision of the first recorded operation, if any.
    pub(super) first_revision: Option<RevisionId>,
    /// Revision of the latest recorded operation, if any.
    pub(super) last_revision: Option<RevisionId>,
    /// Set by `finish` after it has restored the settings, so that `Drop` does
    /// not restore them a second time.
    finished: bool,
    /// Sync operations queued by `defer_sync`, handed to the database in
    /// `finish`.
    #[cfg(feature = "sync")]
    pub(super) pending_sync: Vec<crate::infinitedb_sync::transport::SyncOperation>,
}
62
63impl<'a> BulkSessionCore<'a> {
64 pub fn begin(db: &'a mut InfiniteDb, options: BulkWriteOptions) -> io::Result<Self> {
66 if db.bulk_session_active {
67 return Err(io::Error::new(
68 io::ErrorKind::AlreadyExists,
69 "a bulk write session is already active on this database",
70 ));
71 }
72 db.bulk_session_active = true;
73 let saved_durability = db.wal.durability();
74 let saved_flush_threshold = db.flush_threshold;
75 db.wal.set_durability(WalDurability::Buffered {
76 sync_every: options.sync_every.max(1),
77 });
78 db.flush_threshold = options.flush_threshold;
79 db.defer_auto_flush = true;
80 Ok(Self {
81 db,
82 saved_durability,
83 saved_flush_threshold,
84 touched_spaces: BTreeSet::new(),
85 count: 0,
86 wal_frames: 0,
87 first_revision: None,
88 last_revision: None,
89 finished: false,
90 #[cfg(feature = "sync")]
91 pending_sync: Vec::new(),
92 })
93 }
94
95 pub fn touch_space(&mut self, space: SpaceId) {
96 self.touched_spaces.insert(space.0);
97 }
98
99 pub fn record_operation(&mut self, rev: RevisionId, wal_frame_count: usize) {
100 self.count += 1;
101 self.wal_frames += wal_frame_count;
102 self.first_revision = Some(self.first_revision.unwrap_or(rev));
103 self.last_revision = Some(rev);
104 }
105
106 pub fn push_write(
108 &mut self,
109 space: SpaceId,
110 point: DimensionVector,
111 data: Vec<u8>,
112 ) -> io::Result<RevisionId> {
113 let rev = self.db.next_revision();
114 let address = Address::new(space, point);
115 self.push_row_raw(
116 WalEntry::Write {
117 address: address.clone(),
118 revision: rev,
119 data: data.clone(),
120 },
121 Record {
122 address,
123 revision: rev,
124 data,
125 tombstone: false,
126 },
127 )?;
128 self.touch_space(space);
129 self.record_operation(rev, 1);
130 Ok(rev)
131 }
132
133 pub fn push_tombstone(
135 &mut self,
136 space: SpaceId,
137 point: DimensionVector,
138 ) -> io::Result<RevisionId> {
139 let rev = self.db.next_revision();
140 let address = Address::new(space, point);
141 self.push_row_raw(
142 WalEntry::Tombstone {
143 address: address.clone(),
144 revision: rev,
145 },
146 Record {
147 address,
148 revision: rev,
149 data: vec![],
150 tombstone: true,
151 },
152 )?;
153 self.touch_space(space);
154 self.record_operation(rev, 1);
155 Ok(rev)
156 }
157
158 pub fn push_rows(&mut self, rows: Vec<(WalEntry, Record)>) -> io::Result<RevisionId> {
160 if rows.is_empty() {
161 return Err(io::Error::new(
162 io::ErrorKind::InvalidInput,
163 "bulk push_rows requires at least one row",
164 ));
165 }
166 let rev = rows[0].1.revision;
167 let n = rows.len();
168 for (entry, record) in rows {
169 self.push_row_raw(entry, record)?;
170 }
171 self.record_operation(rev, n);
172 Ok(rev)
173 }
174
175 pub fn push_rows_only(&mut self, rows: Vec<(WalEntry, Record)>) -> io::Result<RevisionId> {
177 if rows.is_empty() {
178 return Err(io::Error::new(
179 io::ErrorKind::InvalidInput,
180 "bulk push_rows_only requires at least one row",
181 ));
182 }
183 let rev = rows[0].1.revision;
184 for (entry, record) in rows {
185 self.push_row_raw(entry, record)?;
186 }
187 Ok(rev)
188 }
189
190 pub fn push_row_raw(&mut self, entry: WalEntry, record: Record) -> io::Result<()> {
191 self.db.wal.append_frame(&entry)?;
192 self.db.buffer.push(record);
193 Self::maybe_sync(self.db.wal_mut())
194 }
195
196 fn maybe_sync(wal: &mut WalWriter) -> io::Result<()> {
197 let every = wal.durability().sync_every();
198 if wal.pending_frames() >= every {
199 wal.sync()?;
200 }
201 Ok(())
202 }
203
/// Queues `op` on the session instead of enqueuing it immediately.
///
/// `finish` hands the queued operations to the database, in the order they
/// were queued, after the touched spaces have been flushed. Dropping the
/// session without calling `finish` never enqueues them.
#[cfg(feature = "sync")]
pub fn defer_sync(&mut self, op: crate::infinitedb_sync::transport::SyncOperation) {
    self.pending_sync.push(op);
}
208
/// Hands every queued sync operation to the database, in queue order.
///
/// The queue is emptied up front, so on failure the operations that were not
/// yet enqueued are dropped along with it.
///
/// # Errors
///
/// Stops at, and propagates, the first error from `enqueue_sync`.
#[cfg(feature = "sync")]
fn flush_sync_ops(&mut self) -> io::Result<()> {
    let queued = std::mem::take(&mut self.pending_sync);
    queued.into_iter().try_for_each(|op| {
        self.db.enqueue_sync(op)?;
        Ok::<(), io::Error>(())
    })
}
217
218 pub fn result_snapshot(&self) -> BulkWriteResult {
219 BulkWriteResult {
220 count: self.count,
221 wal_frames: self.wal_frames,
222 first_revision: self.first_revision,
223 last_revision: self.last_revision,
224 }
225 }
226
227 pub fn finish(mut self) -> io::Result<BulkWriteResult> {
228 self.db.wal.sync()?;
229 for &space_id in self.touched_spaces.clone().iter() {
230 self.db.flush(SpaceId(space_id))?;
231 }
232 #[cfg(feature = "sync")]
233 self.flush_sync_ops()?;
234 self.restore_inner()?;
235 self.finished = true;
236 Ok(self.result_snapshot())
237 }
238
239 fn restore_inner(&mut self) -> io::Result<()> {
240 self.db.wal.set_durability(self.saved_durability);
241 self.db.flush_threshold = self.saved_flush_threshold;
242 self.db.defer_auto_flush = false;
243 self.db.bulk_session_active = false;
244 Ok(())
245 }
246}
247
248impl Drop for BulkSessionCore<'_> {
249 fn drop(&mut self) {
250 if !self.finished {
251 let _ = self.restore_inner();
252 }
253 }
254}
255
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// `begin` must refuse to start while the database reports an active
    /// bulk session.
    #[test]
    fn second_bulk_session_returns_already_exists() {
        let tmp = TempDir::new().unwrap();
        let mut db = InfiniteDb::open(tmp.path()).unwrap();
        // Set the flag by hand: a live first session would hold the exclusive
        // borrow of `db`, so a second `begin` could not even be written.
        db.bulk_session_active = true;

        let outcome = BulkSessionCore::begin(&mut db, BulkWriteOptions::default());
        let rejected = matches!(
            outcome,
            Err(e) if e.kind() == io::ErrorKind::AlreadyExists
        );
        assert!(rejected);
    }

    /// Dropping a session without calling `finish` must clear the active flag.
    #[test]
    fn drop_without_finish_clears_active_flag() {
        let tmp = TempDir::new().unwrap();
        let mut db = InfiniteDb::open(tmp.path()).unwrap();
        {
            // The session is dropped at the end of this scope.
            let _session =
                BulkSessionCore::begin(&mut db, BulkWriteOptions::default()).unwrap();
        }
        assert!(!db.bulk_session_active);
    }
}
282
283impl InfiniteDb {
/// Returns a mutable reference to the database's WAL writer.
pub(crate) fn wal_mut(&mut self) -> &mut WalWriter {
    &mut self.wal
}
287
/// Syncs the database's WAL by delegating to the WAL writer's `sync`.
///
/// # Errors
///
/// Propagates the error returned by the WAL writer.
// NOTE(review): presumably this forces buffered frames to durable storage;
// confirm against `WalWriter::sync`.
pub fn sync_wal(&mut self) -> std::io::Result<()> {
    self.wal.sync()
}
292
293 pub(crate) fn apply_prepared_writes_strict(
294 &mut self,
295 rows: Vec<(WalEntry, Record)>,
296 ) -> std::io::Result<RevisionId> {
297 if rows.is_empty() {
298 return Err(std::io::Error::new(
299 std::io::ErrorKind::InvalidInput,
300 "no rows to write",
301 ));
302 }
303 let rev = rows[0].1.revision;
304 for (entry, record) in rows {
305 self.wal.append(&entry)?;
306 self.buffer.push(record);
307 }
308 Ok(rev)
309 }
310}