Skip to main content

infinite_db/bulk/
session.rs

1//! Shared bulk write session (buffered WAL, deferred flush).
2
3use std::collections::BTreeSet;
4use std::io;
5
6use crate::infinitedb_core::{
7    address::{Address, DimensionVector, RevisionId, SpaceId},
8    block::Record,
9};
10use crate::infinitedb_storage::wal::{WalDurability, WalEntry, WalWriter};
11
12use super::super::InfiniteDb;
13
/// Default WAL sync interval during bulk import (frames between fsyncs).
///
/// This is the value `BulkWriteOptions::default()` assigns to
/// [`BulkWriteOptions::sync_every`].
pub const DEFAULT_BULK_SYNC_EVERY: usize = 4096;

/// Default in-memory buffer size before auto-flush during bulk import.
///
/// This is the value `BulkWriteOptions::default()` assigns to
/// [`BulkWriteOptions::flush_threshold`].
// NOTE(review): the unit (buffered records vs. bytes) is not visible in this file —
// confirm against how `InfiniteDb` interprets `flush_threshold`.
pub const DEFAULT_BULK_FLUSH_THRESHOLD: usize = 8192;
19
/// Tuning for any bulk write session.
///
/// Passed to [`BulkSessionCore::begin`], which applies both values to the database for the
/// lifetime of the session and restores the previous settings afterwards.
#[derive(Debug, Clone)]
pub struct BulkWriteOptions {
    /// WAL frames between syncs while the session is active. `begin` clamps this to at
    /// least 1 before handing it to `WalDurability::Buffered`.
    pub sync_every: usize,
    /// Value installed as the database's `flush_threshold` while the session is active.
    pub flush_threshold: usize,
}
26
27impl Default for BulkWriteOptions {
28    fn default() -> Self {
29        Self {
30            sync_every: DEFAULT_BULK_SYNC_EVERY,
31            flush_threshold: DEFAULT_BULK_FLUSH_THRESHOLD,
32        }
33    }
34}
35
/// Statistics returned after a successful bulk session.
///
/// Produced by [`BulkSessionCore::result_snapshot`] and [`BulkSessionCore::finish`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BulkWriteResult {
    /// Number of logical operations recorded (one per `record_operation` call).
    pub count: usize,
    /// Total WAL frames attributed to the recorded operations.
    pub wal_frames: usize,
    /// Revision of the first recorded operation, or `None` if nothing was recorded.
    pub first_revision: Option<RevisionId>,
    /// Revision passed to the most recent `record_operation` call, or `None` if nothing
    /// was recorded.
    pub last_revision: Option<RevisionId>,
}
44
/// Backward-compatible alias for hyperedge bulk results.
///
/// Identical to [`BulkWriteResult`]; kept so existing code using the old name still compiles.
pub type BulkImportResult = BulkWriteResult;
47
/// Active bulk session state shared by record, hyperedge, and signal guards.
///
/// Holds an exclusive borrow of the database for the duration of the session. The settings
/// captured in the `saved_*` fields are written back by `finish` or, if the session is
/// dropped without finishing, by the `Drop` implementation.
pub struct BulkSessionCore<'a> {
    /// Database being written to; exclusively borrowed until the session ends.
    pub(super) db: &'a mut InfiniteDb,
    /// WAL durability mode in effect before the session started.
    saved_durability: WalDurability,
    /// Database flush threshold in effect before the session started.
    saved_flush_threshold: usize,
    /// Raw ids (`SpaceId.0`) of the spaces that `finish` will flush.
    pub(super) touched_spaces: BTreeSet<u64>,
    /// Number of logical operations recorded so far.
    pub(super) count: usize,
    /// Number of WAL frames attributed to recorded operations so far.
    pub(super) wal_frames: usize,
    /// Revision of the first recorded operation, if any.
    pub(super) first_revision: Option<RevisionId>,
    /// Revision of the most recently recorded operation, if any.
    pub(super) last_revision: Option<RevisionId>,
    /// Set once `finish` has restored the database settings, so `Drop` does not repeat it.
    finished: bool,
    /// Sync operations queued via `defer_sync`, handed to the database in `finish`.
    #[cfg(feature = "sync")]
    pub(super) pending_sync: Vec<crate::infinitedb_sync::transport::SyncOperation>,
}
62
63impl<'a> BulkSessionCore<'a> {
64    /// Start a bulk session. Only one session may be active per [`InfiniteDb`].
65    pub fn begin(db: &'a mut InfiniteDb, options: BulkWriteOptions) -> io::Result<Self> {
66        if db.bulk_session_active {
67            return Err(io::Error::new(
68                io::ErrorKind::AlreadyExists,
69                "a bulk write session is already active on this database",
70            ));
71        }
72        db.bulk_session_active = true;
73        let saved_durability = db.wal.durability();
74        let saved_flush_threshold = db.flush_threshold;
75        db.wal.set_durability(WalDurability::Buffered {
76            sync_every: options.sync_every.max(1),
77        });
78        db.flush_threshold = options.flush_threshold;
79        db.defer_auto_flush = true;
80        Ok(Self {
81            db,
82            saved_durability,
83            saved_flush_threshold,
84            touched_spaces: BTreeSet::new(),
85            count: 0,
86            wal_frames: 0,
87            first_revision: None,
88            last_revision: None,
89            finished: false,
90            #[cfg(feature = "sync")]
91            pending_sync: Vec::new(),
92        })
93    }
94
95    pub fn touch_space(&mut self, space: SpaceId) {
96        self.touched_spaces.insert(space.0);
97    }
98
99    pub fn record_operation(&mut self, rev: RevisionId, wal_frame_count: usize) {
100        self.count += 1;
101        self.wal_frames += wal_frame_count;
102        self.first_revision = Some(self.first_revision.unwrap_or(rev));
103        self.last_revision = Some(rev);
104    }
105
106    /// Append one live record using buffered WAL I/O.
107    pub fn push_write(
108        &mut self,
109        space: SpaceId,
110        point: DimensionVector,
111        data: Vec<u8>,
112    ) -> io::Result<RevisionId> {
113        let rev = self.db.next_revision();
114        let address = Address::new(space, point);
115        self.push_row_raw(
116            WalEntry::Write {
117                address: address.clone(),
118                revision: rev,
119                data: data.clone(),
120            },
121            Record {
122                address,
123                revision: rev,
124                data,
125                tombstone: false,
126            },
127        )?;
128        self.touch_space(space);
129        self.record_operation(rev, 1);
130        Ok(rev)
131    }
132
133    /// Append one tombstone using buffered WAL I/O.
134    pub fn push_tombstone(
135        &mut self,
136        space: SpaceId,
137        point: DimensionVector,
138    ) -> io::Result<RevisionId> {
139        let rev = self.db.next_revision();
140        let address = Address::new(space, point);
141        self.push_row_raw(
142            WalEntry::Tombstone {
143                address: address.clone(),
144                revision: rev,
145            },
146            Record {
147                address,
148                revision: rev,
149                data: vec![],
150                tombstone: true,
151            },
152        )?;
153        self.touch_space(space);
154        self.record_operation(rev, 1);
155        Ok(rev)
156    }
157
158    /// Append multiple WAL frames as one logical bulk operation.
159    pub fn push_rows(&mut self, rows: Vec<(WalEntry, Record)>) -> io::Result<RevisionId> {
160        if rows.is_empty() {
161            return Err(io::Error::new(
162                io::ErrorKind::InvalidInput,
163                "bulk push_rows requires at least one row",
164            ));
165        }
166        let rev = rows[0].1.revision;
167        let n = rows.len();
168        for (entry, record) in rows {
169            self.push_row_raw(entry, record)?;
170        }
171        self.record_operation(rev, n);
172        Ok(rev)
173    }
174
175    /// Append WAL frames without incrementing the logical operation counter.
176    pub fn push_rows_only(&mut self, rows: Vec<(WalEntry, Record)>) -> io::Result<RevisionId> {
177        if rows.is_empty() {
178            return Err(io::Error::new(
179                io::ErrorKind::InvalidInput,
180                "bulk push_rows_only requires at least one row",
181            ));
182        }
183        let rev = rows[0].1.revision;
184        for (entry, record) in rows {
185            self.push_row_raw(entry, record)?;
186        }
187        Ok(rev)
188    }
189
190    pub fn push_row_raw(&mut self, entry: WalEntry, record: Record) -> io::Result<()> {
191        self.db.wal.append_frame(&entry)?;
192        self.db.buffer.push(record);
193        Self::maybe_sync(self.db.wal_mut())
194    }
195
196    fn maybe_sync(wal: &mut WalWriter) -> io::Result<()> {
197        let every = wal.durability().sync_every();
198        if wal.pending_frames() >= every {
199            wal.sync()?;
200        }
201        Ok(())
202    }
203
204    #[cfg(feature = "sync")]
205    pub fn defer_sync(&mut self, op: crate::infinitedb_sync::transport::SyncOperation) {
206        self.pending_sync.push(op);
207    }
208
209    #[cfg(feature = "sync")]
210    fn flush_sync_ops(&mut self) -> io::Result<()> {
211        let ops = std::mem::take(&mut self.pending_sync);
212        for op in ops {
213            self.db.enqueue_sync(op)?;
214        }
215        Ok(())
216    }
217
218    pub fn result_snapshot(&self) -> BulkWriteResult {
219        BulkWriteResult {
220            count: self.count,
221            wal_frames: self.wal_frames,
222            first_revision: self.first_revision,
223            last_revision: self.last_revision,
224        }
225    }
226
227    pub fn finish(mut self) -> io::Result<BulkWriteResult> {
228        self.db.wal.sync()?;
229        for &space_id in self.touched_spaces.clone().iter() {
230            self.db.flush(SpaceId(space_id))?;
231        }
232        #[cfg(feature = "sync")]
233        self.flush_sync_ops()?;
234        self.restore_inner()?;
235        self.finished = true;
236        Ok(self.result_snapshot())
237    }
238
239    fn restore_inner(&mut self) -> io::Result<()> {
240        self.db.wal.set_durability(self.saved_durability);
241        self.db.flush_threshold = self.saved_flush_threshold;
242        self.db.defer_auto_flush = false;
243        self.db.bulk_session_active = false;
244        Ok(())
245    }
246}
247
248impl Drop for BulkSessionCore<'_> {
249    fn drop(&mut self) {
250        if !self.finished {
251            let _ = self.restore_inner();
252        }
253    }
254}
255
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// `begin` must refuse to start when the database is already flagged as busy.
    #[test]
    fn second_bulk_session_returns_already_exists() {
        let workdir = TempDir::new().unwrap();
        let mut db = InfiniteDb::open(workdir.path()).unwrap();
        // Set the flag directly: the borrow checker prevents holding two real sessions
        // on the same database at once.
        db.bulk_session_active = true;

        let failure_kind = BulkSessionCore::begin(&mut db, BulkWriteOptions::default())
            .err()
            .map(|e| e.kind());

        assert_eq!(failure_kind, Some(io::ErrorKind::AlreadyExists));
    }

    /// Dropping a session without calling `finish` must release the active flag.
    #[test]
    fn drop_without_finish_clears_active_flag() {
        let workdir = TempDir::new().unwrap();
        let mut db = InfiniteDb::open(workdir.path()).unwrap();
        {
            // Session goes out of scope at the end of this block, triggering `Drop`.
            let _session = BulkSessionCore::begin(&mut db, BulkWriteOptions::default()).unwrap();
        }
        assert!(!db.bulk_session_active);
    }
}
282
283impl InfiniteDb {
284    pub(crate) fn wal_mut(&mut self) -> &mut WalWriter {
285        &mut self.wal
286    }
287
288    /// Force the WAL buffer to disk (strict durability escape hatch).
289    pub fn sync_wal(&mut self) -> std::io::Result<()> {
290        self.wal.sync()
291    }
292
293    pub(crate) fn apply_prepared_writes_strict(
294        &mut self,
295        rows: Vec<(WalEntry, Record)>,
296    ) -> std::io::Result<RevisionId> {
297        if rows.is_empty() {
298            return Err(std::io::Error::new(
299                std::io::ErrorKind::InvalidInput,
300                "no rows to write",
301            ));
302        }
303        let rev = rows[0].1.revision;
304        for (entry, record) in rows {
305            self.wal.append(&entry)?;
306            self.buffer.push(record);
307        }
308        Ok(rev)
309    }
310}