Skip to main content

infinite_db/bulk/
hyperedge.rs

1//! Bulk hyperedge import and delete.
2
3use bincode::{config::standard, encode_to_vec};
4
5use crate::infinitedb_core::{
6    address::{Address, RevisionId, SpaceId},
7    block::Record,
8    endpoint_index::{
9        encode_hyperedge_id, endpoint_index_point, edge_endpoints, ENDPOINT_INDEX_SPACE,
10    },
11    hyperedge::{Hyperedge, HyperedgeId},
12};
13use crate::infinitedb_storage::wal::WalEntry;
14
15use super::session::{
16    BulkImportResult, BulkSessionCore, BulkWriteOptions, BulkWriteResult, DEFAULT_BULK_FLUSH_THRESHOLD,
17    DEFAULT_BULK_SYNC_EVERY,
18};
19use super::super::{hyperedge_point, locator_point, InfiniteDb, HYPEREDGE_LOCATOR_SPACE};
20
/// Tuning for bulk hyperedge import.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BulkHyperedgeImportOptions {
    /// Forwarded unchanged as `BulkWriteOptions::sync_every` by `write_options`.
    pub sync_every: usize,
    /// Forwarded unchanged as `BulkWriteOptions::flush_threshold` by `write_options`.
    pub flush_threshold: usize,
    /// When `true`, endpoint-index rows are staged together with each pushed edge.
    /// When `false`, indexing is deferred: pushed edges are buffered and indexed by
    /// `BulkHyperedgeImport::build_endpoint_index` (called automatically by `finish`).
    pub build_endpoint_index: bool,
}
28
29impl Default for BulkHyperedgeImportOptions {
30    fn default() -> Self {
31        Self {
32            sync_every: DEFAULT_BULK_SYNC_EVERY,
33            flush_threshold: DEFAULT_BULK_FLUSH_THRESHOLD,
34            build_endpoint_index: true,
35        }
36    }
37}
38
39impl BulkHyperedgeImportOptions {
40    pub fn write_options(&self) -> BulkWriteOptions {
41        BulkWriteOptions {
42            sync_every: self.sync_every,
43            flush_threshold: self.flush_threshold,
44        }
45    }
46}
47
48/// Bulk hyperedge insert/delete session.
49///
50/// Only one bulk session per [`InfiniteDb`] at a time. Call [`Self::finish`] before drop.
51pub struct BulkHyperedgeImport<'a> {
52    session: BulkSessionCore<'a>,
53    space: SpaceId,
54    build_endpoint_index: bool,
55    pending_index_edges: Vec<Hyperedge>,
56}
57
58impl<'a> BulkHyperedgeImport<'a> {
59    pub fn space(&self) -> SpaceId {
60        self.space
61    }
62
63    pub fn push(&mut self, mut edge: Hyperedge) -> std::io::Result<RevisionId> {
64        edge.validate().map_err(|e| {
65            std::io::Error::new(std::io::ErrorKind::InvalidInput, format!("{:?}", e))
66        })?;
67        let rev = push_hyperedge_bulk_session(
68            &mut self.session,
69            self.space,
70            &mut edge,
71            self.build_endpoint_index,
72        )?;
73        if !self.build_endpoint_index {
74            self.pending_index_edges.push(edge);
75        }
76        Ok(rev)
77    }
78
79    pub fn push_delete(&mut self, id: HyperedgeId) -> std::io::Result<RevisionId> {
80        delete_hyperedge_bulk_session(&mut self.session, self.space, id)
81    }
82
83    pub fn build_endpoint_index(&mut self) -> std::io::Result<usize> {
84        let edges = std::mem::take(&mut self.pending_index_edges);
85        let mut index_rows = 0usize;
86        for edge in &edges {
87            index_rows += index_hyperedge_endpoints_session(&mut self.session, edge)?;
88            self.session.touch_space(ENDPOINT_INDEX_SPACE);
89        }
90        Ok(index_rows)
91    }
92
93    pub fn finish(mut self) -> std::io::Result<BulkImportResult> {
94        if !self.pending_index_edges.is_empty() {
95            self.build_endpoint_index()?;
96        }
97        self.session.finish()
98    }
99}
100
101fn push_hyperedge_bulk_session(
102    session: &mut BulkSessionCore<'_>,
103    space: SpaceId,
104    edge: &mut Hyperedge,
105    build_endpoint_index: bool,
106) -> std::io::Result<RevisionId> {
107    let db = &mut *session.db;
108    if build_endpoint_index {
109        db.ensure_endpoint_index_space()?;
110    }
111    let (_, is_centroid) = db.edge_storage_point(space, edge);
112    if is_centroid {
113        db.ensure_locator_space()?;
114    }
115    let rows = db.prepare_hyperedge_writes(space, edge, build_endpoint_index)?;
116    session.touch_space(space);
117    if is_centroid {
118        session.touch_space(HYPEREDGE_LOCATOR_SPACE);
119    }
120    if build_endpoint_index {
121        session.touch_space(ENDPOINT_INDEX_SPACE);
122    }
123    let rev = session.push_rows(rows)?;
124    edge.valid_from = rev;
125    #[cfg(feature = "sync")]
126    session.defer_sync(crate::infinitedb_sync::transport::SyncOperation::WriteHyperedge {
127        space,
128        edge: edge.clone(),
129        revision: rev,
130    });
131    Ok(rev)
132}
133
134fn delete_hyperedge_bulk_session(
135    session: &mut BulkSessionCore<'_>,
136    space: SpaceId,
137    id: HyperedgeId,
138) -> std::io::Result<RevisionId> {
139    let (rows, touch) = {
140        let db = &mut *session.db;
141        let edge = db.fetch_hyperedge_by_id(space, id, None)?;
142        let point = match &edge {
143            Some(e) => db.edge_storage_point(space, e).0,
144            None => hyperedge_point(id),
145        };
146        let mut rows = Vec::new();
147        let mut touch = vec![space];
148        if let Some(e) = &edge {
149            db.ensure_endpoint_index_space()?;
150            for ep in edge_endpoints(e) {
151                let idx_point = endpoint_index_point(ep, e.id);
152                let rev = db.next_revision();
153                let address = Address::new(ENDPOINT_INDEX_SPACE, idx_point);
154                rows.push((
155                    WalEntry::Tombstone {
156                        address: address.clone(),
157                        revision: rev,
158                    },
159                    Record {
160                        address,
161                        revision: rev,
162                        data: vec![],
163                        tombstone: true,
164                    },
165                ));
166            }
167            touch.push(ENDPOINT_INDEX_SPACE);
168        }
169        let main_rev = db.next_revision();
170        let main_addr = Address::new(space, point);
171        rows.push((
172            WalEntry::Tombstone {
173                address: main_addr.clone(),
174                revision: main_rev,
175            },
176            Record {
177                address: main_addr,
178                revision: main_rev,
179                data: vec![],
180                tombstone: true,
181            },
182        ));
183        if edge.is_some() && db.uses_centroid_keying(space) {
184            db.ensure_locator_space()?;
185            let loc_rev = db.next_revision();
186            let loc_point = locator_point(space, id);
187            let loc_addr = Address::new(HYPEREDGE_LOCATOR_SPACE, loc_point);
188            rows.push((
189                WalEntry::Tombstone {
190                    address: loc_addr.clone(),
191                    revision: loc_rev,
192                },
193                Record {
194                    address: loc_addr,
195                    revision: loc_rev,
196                    data: vec![],
197                    tombstone: true,
198                },
199            ));
200            touch.push(HYPEREDGE_LOCATOR_SPACE);
201        }
202        (rows, touch)
203    };
204    let n = rows.len();
205    let rev = session.push_rows_only(rows)?;
206    session.record_operation(rev, n);
207    for s in touch {
208        session.touch_space(s);
209    }
210    #[cfg(feature = "sync")]
211    session.defer_sync(crate::infinitedb_sync::transport::SyncOperation::DeleteHyperedge {
212        space,
213        edge_id: id,
214        revision: rev,
215    });
216    Ok(rev)
217}
218
219fn index_hyperedge_endpoints_session(
220    session: &mut BulkSessionCore<'_>,
221    edge: &Hyperedge,
222) -> std::io::Result<usize> {
223    let db = &mut *session.db;
224    db.ensure_endpoint_index_space()?;
225    let mut rows = Vec::new();
226    for ep in edge_endpoints(edge) {
227        let idx_point = endpoint_index_point(ep, edge.id);
228        let idx_data = encode_hyperedge_id(edge.id);
229        let idx_addr = Address::new(ENDPOINT_INDEX_SPACE, idx_point);
230        let idx_rev = db.next_revision();
231        rows.push((
232            WalEntry::Write {
233                address: idx_addr.clone(),
234                revision: idx_rev,
235                data: idx_data.clone(),
236            },
237            Record {
238                address: idx_addr,
239                revision: idx_rev,
240                data: idx_data,
241                tombstone: false,
242            },
243        ));
244    }
245    if rows.is_empty() {
246        return Ok(0);
247    }
248    let n = rows.len();
249    session.push_rows(rows)?;
250    Ok(n)
251}
252
253impl InfiniteDb {
254    pub fn begin_hyperedge_import(
255        &mut self,
256        space: SpaceId,
257    ) -> std::io::Result<BulkHyperedgeImport<'_>> {
258        self.begin_hyperedge_import_with_options(space, BulkHyperedgeImportOptions::default())
259    }
260
261    pub fn begin_hyperedge_import_with_options(
262        &mut self,
263        space: SpaceId,
264        options: BulkHyperedgeImportOptions,
265    ) -> std::io::Result<BulkHyperedgeImport<'_>> {
266        let build_endpoint_index = options.build_endpoint_index;
267        let session = BulkSessionCore::begin(self, options.write_options())?;
268        Ok(BulkHyperedgeImport {
269            session,
270            space,
271            build_endpoint_index,
272            pending_index_edges: Vec::new(),
273        })
274    }
275
276    pub fn insert_hyperedges_bulk<I>(
277        &mut self,
278        space: SpaceId,
279        edges: I,
280    ) -> std::io::Result<BulkWriteResult>
281    where
282        I: IntoIterator<Item = Hyperedge>,
283    {
284        let mut import = self.begin_hyperedge_import(space)?;
285        for edge in edges {
286            import.push(edge)?;
287        }
288        import.finish()
289    }
290
291    pub fn insert_hyperedges_bulk_with_options<I>(
292        &mut self,
293        space: SpaceId,
294        edges: I,
295        options: BulkHyperedgeImportOptions,
296    ) -> std::io::Result<BulkWriteResult>
297    where
298        I: IntoIterator<Item = Hyperedge>,
299    {
300        let mut import = self.begin_hyperedge_import_with_options(space, options)?;
301        for edge in edges {
302            import.push(edge)?;
303        }
304        import.finish()
305    }
306
307    pub fn delete_hyperedges_bulk<I>(
308        &mut self,
309        space: SpaceId,
310        ids: I,
311    ) -> std::io::Result<BulkWriteResult>
312    where
313        I: IntoIterator<Item = HyperedgeId>,
314    {
315        let mut import = self.begin_hyperedge_import(space)?;
316        for id in ids {
317            import.push_delete(id)?;
318        }
319        import.finish()
320    }
321
322    pub(crate) fn prepare_hyperedge_writes(
323        &self,
324        space: SpaceId,
325        edge: &Hyperedge,
326        build_endpoint_index: bool,
327    ) -> std::io::Result<Vec<(WalEntry, Record)>> {
328        let (point, is_centroid) = self.edge_storage_point(space, edge);
329        let data = encode_to_vec(edge, standard())
330            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
331        let rev = self.next_revision();
332        let address = Address::new(space, point.clone());
333        let mut rows = vec![(
334            WalEntry::Write {
335                address: address.clone(),
336                revision: rev,
337                data: data.clone(),
338            },
339            Record {
340                address,
341                revision: rev,
342                data,
343                tombstone: false,
344            },
345        )];
346
347        if is_centroid {
348            let locator_data = encode_to_vec(&point, standard())
349                .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
350            let loc_point = locator_point(space, edge.id);
351            let loc_addr = Address::new(HYPEREDGE_LOCATOR_SPACE, loc_point);
352            let loc_rev = self.next_revision();
353            rows.push((
354                WalEntry::Write {
355                    address: loc_addr.clone(),
356                    revision: loc_rev,
357                    data: locator_data.clone(),
358                },
359                Record {
360                    address: loc_addr,
361                    revision: loc_rev,
362                    data: locator_data,
363                    tombstone: false,
364                },
365            ));
366        }
367
368        if build_endpoint_index {
369            for ep in edge_endpoints(edge) {
370                let idx_point = endpoint_index_point(ep, edge.id);
371                let idx_data = encode_hyperedge_id(edge.id);
372                let idx_addr = Address::new(ENDPOINT_INDEX_SPACE, idx_point);
373                let idx_rev = self.next_revision();
374                rows.push((
375                    WalEntry::Write {
376                        address: idx_addr.clone(),
377                        revision: idx_rev,
378                        data: idx_data.clone(),
379                    },
380                    Record {
381                        address: idx_addr,
382                        revision: idx_rev,
383                        data: idx_data,
384                        tombstone: false,
385                    },
386                ));
387            }
388        }
389
390        Ok(rows)
391    }
392
393}