Skip to main content

infinite_db/bulk/
record.rs

1//! Bulk record import for translation-scale loads.
2
3use crate::infinitedb_core::address::{DimensionVector, RevisionId, SpaceId};
4
5use super::session::{BulkSessionCore, BulkWriteOptions, BulkWriteResult};
6use super::super::InfiniteDb;
7
/// Bulk insert/delete session for raw records in one space.
///
/// Only one bulk session may be active per [`InfiniteDb`] at a time.
/// Parallelize parsing upstream; use a single thread for `push`.
///
/// Values are obtained from [`InfiniteDb::begin_record_import`] or
/// [`InfiniteDb::begin_record_import_with_options`]; call
/// [`BulkRecordImport::finish`] to obtain the [`BulkWriteResult`].
pub struct BulkRecordImport<'a> {
    /// Underlying bulk session; `push`, `push_delete` and `finish` delegate to it.
    session: BulkSessionCore<'a>,
    /// Space passed along with every write and tombstone pushed through this import.
    space: SpaceId,
}
16
17impl<'a> BulkRecordImport<'a> {
18    pub fn space(&self) -> SpaceId {
19        self.space
20    }
21
22    pub fn push(&mut self, point: DimensionVector, data: Vec<u8>) -> std::io::Result<RevisionId> {
23        self.session.push_write(self.space, point, data)
24    }
25
26    pub fn push_delete(&mut self, point: DimensionVector) -> std::io::Result<RevisionId> {
27        self.session.push_tombstone(self.space, point)
28    }
29
30    pub fn finish(self) -> std::io::Result<BulkWriteResult> {
31        self.session.finish()
32    }
33}
34
35impl InfiniteDb {
36    /// Begin a buffered bulk record import into `space`.
37    pub fn begin_record_import(&mut self, space: SpaceId) -> std::io::Result<BulkRecordImport<'_>> {
38        self.begin_record_import_with_options(space, BulkWriteOptions::default())
39    }
40
41    /// Begin bulk record import with explicit WAL/flush tuning.
42    pub fn begin_record_import_with_options(
43        &mut self,
44        space: SpaceId,
45        options: BulkWriteOptions,
46    ) -> std::io::Result<BulkRecordImport<'_>> {
47        Ok(BulkRecordImport {
48            session: BulkSessionCore::begin(self, options)?,
49            space,
50        })
51    }
52
53    /// Import many records in one bulk session.
54    pub fn insert_records_bulk<I>(
55        &mut self,
56        space: SpaceId,
57        rows: I,
58    ) -> std::io::Result<BulkWriteResult>
59    where
60        I: IntoIterator<Item = (DimensionVector, Vec<u8>)>,
61    {
62        let mut import = self.begin_record_import(space)?;
63        for (point, data) in rows {
64            import.push(point, data)?;
65        }
66        import.finish()
67    }
68
69    /// Tombstone many record addresses in one bulk session.
70    pub fn delete_records_bulk<I>(
71        &mut self,
72        space: SpaceId,
73        points: I,
74    ) -> std::io::Result<BulkWriteResult>
75    where
76        I: IntoIterator<Item = DimensionVector>,
77    {
78        let mut import = self.begin_record_import(space)?;
79        for point in points {
80            import.push_delete(point)?;
81        }
82        import.finish()
83    }
84}