1use bincode::{config::standard, encode_to_vec};
4
5use crate::infinitedb_core::{
6 address::{Address, RevisionId, SpaceId},
7 block::Record,
8 endpoint_index::{
9 encode_hyperedge_id, endpoint_index_point, edge_endpoints, ENDPOINT_INDEX_SPACE,
10 },
11 hyperedge::{Hyperedge, HyperedgeId},
12};
13use crate::infinitedb_storage::wal::WalEntry;
14
15use super::session::{
16 BulkImportResult, BulkSessionCore, BulkWriteOptions, BulkWriteResult, DEFAULT_BULK_FLUSH_THRESHOLD,
17 DEFAULT_BULK_SYNC_EVERY,
18};
19use super::super::{hyperedge_point, locator_point, InfiniteDb, HYPEREDGE_LOCATOR_SPACE};
20
/// Settings for a bulk hyperedge import.
///
/// `sync_every` and `flush_threshold` are copied verbatim into the
/// `BulkWriteOptions` produced by `write_options`; their precise semantics
/// are defined by the underlying bulk session.
#[derive(Clone, Debug)]
pub struct BulkHyperedgeImportOptions {
    /// Value forwarded as the bulk session's `sync_every` setting.
    pub sync_every: usize,
    /// Value forwarded as the bulk session's `flush_threshold` setting.
    pub flush_threshold: usize,
    /// `true`: endpoint-index rows are written together with every pushed edge.
    /// `false`: pushed edges are buffered and their index rows are written
    /// later, either by an explicit index build or when the import finishes.
    pub build_endpoint_index: bool,
}
28
29impl Default for BulkHyperedgeImportOptions {
30 fn default() -> Self {
31 Self {
32 sync_every: DEFAULT_BULK_SYNC_EVERY,
33 flush_threshold: DEFAULT_BULK_FLUSH_THRESHOLD,
34 build_endpoint_index: true,
35 }
36 }
37}
38
39impl BulkHyperedgeImportOptions {
40 pub fn write_options(&self) -> BulkWriteOptions {
41 BulkWriteOptions {
42 sync_every: self.sync_every,
43 flush_threshold: self.flush_threshold,
44 }
45 }
46}
47
48pub struct BulkHyperedgeImport<'a> {
52 session: BulkSessionCore<'a>,
53 space: SpaceId,
54 build_endpoint_index: bool,
55 pending_index_edges: Vec<Hyperedge>,
56}
57
58impl<'a> BulkHyperedgeImport<'a> {
59 pub fn space(&self) -> SpaceId {
60 self.space
61 }
62
63 pub fn push(&mut self, mut edge: Hyperedge) -> std::io::Result<RevisionId> {
64 edge.validate().map_err(|e| {
65 std::io::Error::new(std::io::ErrorKind::InvalidInput, format!("{:?}", e))
66 })?;
67 let rev = push_hyperedge_bulk_session(
68 &mut self.session,
69 self.space,
70 &mut edge,
71 self.build_endpoint_index,
72 )?;
73 if !self.build_endpoint_index {
74 self.pending_index_edges.push(edge);
75 }
76 Ok(rev)
77 }
78
79 pub fn push_delete(&mut self, id: HyperedgeId) -> std::io::Result<RevisionId> {
80 delete_hyperedge_bulk_session(&mut self.session, self.space, id)
81 }
82
83 pub fn build_endpoint_index(&mut self) -> std::io::Result<usize> {
84 let edges = std::mem::take(&mut self.pending_index_edges);
85 let mut index_rows = 0usize;
86 for edge in &edges {
87 index_rows += index_hyperedge_endpoints_session(&mut self.session, edge)?;
88 self.session.touch_space(ENDPOINT_INDEX_SPACE);
89 }
90 Ok(index_rows)
91 }
92
93 pub fn finish(mut self) -> std::io::Result<BulkImportResult> {
94 if !self.pending_index_edges.is_empty() {
95 self.build_endpoint_index()?;
96 }
97 self.session.finish()
98 }
99}
100
101fn push_hyperedge_bulk_session(
102 session: &mut BulkSessionCore<'_>,
103 space: SpaceId,
104 edge: &mut Hyperedge,
105 build_endpoint_index: bool,
106) -> std::io::Result<RevisionId> {
107 let db = &mut *session.db;
108 if build_endpoint_index {
109 db.ensure_endpoint_index_space()?;
110 }
111 let (_, is_centroid) = db.edge_storage_point(space, edge);
112 if is_centroid {
113 db.ensure_locator_space()?;
114 }
115 let rows = db.prepare_hyperedge_writes(space, edge, build_endpoint_index)?;
116 session.touch_space(space);
117 if is_centroid {
118 session.touch_space(HYPEREDGE_LOCATOR_SPACE);
119 }
120 if build_endpoint_index {
121 session.touch_space(ENDPOINT_INDEX_SPACE);
122 }
123 let rev = session.push_rows(rows)?;
124 edge.valid_from = rev;
125 #[cfg(feature = "sync")]
126 session.defer_sync(crate::infinitedb_sync::transport::SyncOperation::WriteHyperedge {
127 space,
128 edge: edge.clone(),
129 revision: rev,
130 });
131 Ok(rev)
132}
133
134fn delete_hyperedge_bulk_session(
135 session: &mut BulkSessionCore<'_>,
136 space: SpaceId,
137 id: HyperedgeId,
138) -> std::io::Result<RevisionId> {
139 let (rows, touch) = {
140 let db = &mut *session.db;
141 let edge = db.fetch_hyperedge_by_id(space, id, None)?;
142 let point = match &edge {
143 Some(e) => db.edge_storage_point(space, e).0,
144 None => hyperedge_point(id),
145 };
146 let mut rows = Vec::new();
147 let mut touch = vec![space];
148 if let Some(e) = &edge {
149 db.ensure_endpoint_index_space()?;
150 for ep in edge_endpoints(e) {
151 let idx_point = endpoint_index_point(ep, e.id);
152 let rev = db.next_revision();
153 let address = Address::new(ENDPOINT_INDEX_SPACE, idx_point);
154 rows.push((
155 WalEntry::Tombstone {
156 address: address.clone(),
157 revision: rev,
158 },
159 Record {
160 address,
161 revision: rev,
162 data: vec![],
163 tombstone: true,
164 },
165 ));
166 }
167 touch.push(ENDPOINT_INDEX_SPACE);
168 }
169 let main_rev = db.next_revision();
170 let main_addr = Address::new(space, point);
171 rows.push((
172 WalEntry::Tombstone {
173 address: main_addr.clone(),
174 revision: main_rev,
175 },
176 Record {
177 address: main_addr,
178 revision: main_rev,
179 data: vec![],
180 tombstone: true,
181 },
182 ));
183 if edge.is_some() && db.uses_centroid_keying(space) {
184 db.ensure_locator_space()?;
185 let loc_rev = db.next_revision();
186 let loc_point = locator_point(space, id);
187 let loc_addr = Address::new(HYPEREDGE_LOCATOR_SPACE, loc_point);
188 rows.push((
189 WalEntry::Tombstone {
190 address: loc_addr.clone(),
191 revision: loc_rev,
192 },
193 Record {
194 address: loc_addr,
195 revision: loc_rev,
196 data: vec![],
197 tombstone: true,
198 },
199 ));
200 touch.push(HYPEREDGE_LOCATOR_SPACE);
201 }
202 (rows, touch)
203 };
204 let n = rows.len();
205 let rev = session.push_rows_only(rows)?;
206 session.record_operation(rev, n);
207 for s in touch {
208 session.touch_space(s);
209 }
210 #[cfg(feature = "sync")]
211 session.defer_sync(crate::infinitedb_sync::transport::SyncOperation::DeleteHyperedge {
212 space,
213 edge_id: id,
214 revision: rev,
215 });
216 Ok(rev)
217}
218
219fn index_hyperedge_endpoints_session(
220 session: &mut BulkSessionCore<'_>,
221 edge: &Hyperedge,
222) -> std::io::Result<usize> {
223 let db = &mut *session.db;
224 db.ensure_endpoint_index_space()?;
225 let mut rows = Vec::new();
226 for ep in edge_endpoints(edge) {
227 let idx_point = endpoint_index_point(ep, edge.id);
228 let idx_data = encode_hyperedge_id(edge.id);
229 let idx_addr = Address::new(ENDPOINT_INDEX_SPACE, idx_point);
230 let idx_rev = db.next_revision();
231 rows.push((
232 WalEntry::Write {
233 address: idx_addr.clone(),
234 revision: idx_rev,
235 data: idx_data.clone(),
236 },
237 Record {
238 address: idx_addr,
239 revision: idx_rev,
240 data: idx_data,
241 tombstone: false,
242 },
243 ));
244 }
245 if rows.is_empty() {
246 return Ok(0);
247 }
248 let n = rows.len();
249 session.push_rows(rows)?;
250 Ok(n)
251}
252
253impl InfiniteDb {
254 pub fn begin_hyperedge_import(
255 &mut self,
256 space: SpaceId,
257 ) -> std::io::Result<BulkHyperedgeImport<'_>> {
258 self.begin_hyperedge_import_with_options(space, BulkHyperedgeImportOptions::default())
259 }
260
261 pub fn begin_hyperedge_import_with_options(
262 &mut self,
263 space: SpaceId,
264 options: BulkHyperedgeImportOptions,
265 ) -> std::io::Result<BulkHyperedgeImport<'_>> {
266 let build_endpoint_index = options.build_endpoint_index;
267 let session = BulkSessionCore::begin(self, options.write_options())?;
268 Ok(BulkHyperedgeImport {
269 session,
270 space,
271 build_endpoint_index,
272 pending_index_edges: Vec::new(),
273 })
274 }
275
276 pub fn insert_hyperedges_bulk<I>(
277 &mut self,
278 space: SpaceId,
279 edges: I,
280 ) -> std::io::Result<BulkWriteResult>
281 where
282 I: IntoIterator<Item = Hyperedge>,
283 {
284 let mut import = self.begin_hyperedge_import(space)?;
285 for edge in edges {
286 import.push(edge)?;
287 }
288 import.finish()
289 }
290
291 pub fn insert_hyperedges_bulk_with_options<I>(
292 &mut self,
293 space: SpaceId,
294 edges: I,
295 options: BulkHyperedgeImportOptions,
296 ) -> std::io::Result<BulkWriteResult>
297 where
298 I: IntoIterator<Item = Hyperedge>,
299 {
300 let mut import = self.begin_hyperedge_import_with_options(space, options)?;
301 for edge in edges {
302 import.push(edge)?;
303 }
304 import.finish()
305 }
306
307 pub fn delete_hyperedges_bulk<I>(
308 &mut self,
309 space: SpaceId,
310 ids: I,
311 ) -> std::io::Result<BulkWriteResult>
312 where
313 I: IntoIterator<Item = HyperedgeId>,
314 {
315 let mut import = self.begin_hyperedge_import(space)?;
316 for id in ids {
317 import.push_delete(id)?;
318 }
319 import.finish()
320 }
321
322 pub(crate) fn prepare_hyperedge_writes(
323 &self,
324 space: SpaceId,
325 edge: &Hyperedge,
326 build_endpoint_index: bool,
327 ) -> std::io::Result<Vec<(WalEntry, Record)>> {
328 let (point, is_centroid) = self.edge_storage_point(space, edge);
329 let data = encode_to_vec(edge, standard())
330 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
331 let rev = self.next_revision();
332 let address = Address::new(space, point.clone());
333 let mut rows = vec![(
334 WalEntry::Write {
335 address: address.clone(),
336 revision: rev,
337 data: data.clone(),
338 },
339 Record {
340 address,
341 revision: rev,
342 data,
343 tombstone: false,
344 },
345 )];
346
347 if is_centroid {
348 let locator_data = encode_to_vec(&point, standard())
349 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
350 let loc_point = locator_point(space, edge.id);
351 let loc_addr = Address::new(HYPEREDGE_LOCATOR_SPACE, loc_point);
352 let loc_rev = self.next_revision();
353 rows.push((
354 WalEntry::Write {
355 address: loc_addr.clone(),
356 revision: loc_rev,
357 data: locator_data.clone(),
358 },
359 Record {
360 address: loc_addr,
361 revision: loc_rev,
362 data: locator_data,
363 tombstone: false,
364 },
365 ));
366 }
367
368 if build_endpoint_index {
369 for ep in edge_endpoints(edge) {
370 let idx_point = endpoint_index_point(ep, edge.id);
371 let idx_data = encode_hyperedge_id(edge.id);
372 let idx_addr = Address::new(ENDPOINT_INDEX_SPACE, idx_point);
373 let idx_rev = self.next_revision();
374 rows.push((
375 WalEntry::Write {
376 address: idx_addr.clone(),
377 revision: idx_rev,
378 data: idx_data.clone(),
379 },
380 Record {
381 address: idx_addr,
382 revision: idx_rev,
383 data: idx_data,
384 tombstone: false,
385 },
386 ));
387 }
388 }
389
390 Ok(rows)
391 }
392
393}