1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
use std::any::TypeId;
use std::path::PathBuf;
use heed::types::DecodeIgnore;
use heed::{PutFlags, RoTxn, RwTxn};
use rand::{Rng, SeedableRng};
use roaring::RoaringBitmap;
use steppe::NoProgress;
use tracing::{debug, error};
use crate::distance::Distance;
use crate::hnsw::HnswBuilder;
use crate::internals::KeyCodec;
use crate::item_iter::ItemIter;
use crate::node::{Item, ItemIds, NodeCodec};
use crate::progress::HannoyBuild;
use crate::reader::get_item;
use crate::unaligned_vector::UnalignedVector;
use crate::update_status::{UpdateStatus, UpdateStatusCodec};
use crate::version::{Version, VersionCodec};
use crate::{
Database, Error, ItemId, Key, Metadata, MetadataCodec, Node, Prefix, PrefixCodec, Result,
CANCELLATION_PROBING,
};
/// The options available when configuring the hannoy database.
pub struct HannoyBuilder<'a, D: Distance, R: Rng + SeedableRng, P> {
    /// The writer the configured build will run against.
    writer: &'a Writer<D>,
    /// Random source forwarded to the build process.
    rng: &'a mut R,
    /// The accumulated build options handed to the writer's build methods.
    inner: BuildOption<'a, P>,
}
/// The options available when building the hannoy database.
pub(crate) struct BuildOption<'a, P> {
    /// Search range used when inserting a new item (see [`HannoyBuilder::ef_construction`]).
    pub(crate) ef_construction: usize,
    /// Link-creation tolerance hyperparameter (see [`HannoyBuilder::alpha`]).
    pub(crate) alpha: f32,
    /// Memory budget for the build; currently unused in hannoy (see the
    /// commented-out `available_memory` setter on the builder).
    pub(crate) available_memory: Option<usize>,
    /// Closure probed periodically during the build to cancel it early.
    pub(crate) cancel: Box<dyn Fn() -> bool + 'a + Sync + Send>,
    /// Progress reporter notified of each build step.
    pub(crate) progress: P,
    /// An optimization that allows for faster relinking of all items.
    ///
    /// Avoids marking all the items as "updated" and
    /// let the rebuild function take them all.
    pub(crate) relink_all_items: bool,
}
impl Default for BuildOption<'_, NoProgress> {
fn default() -> Self {
Self {
ef_construction: 100,
alpha: 1.0,
available_memory: None,
cancel: Box::new(|| false),
progress: NoProgress,
relink_all_items: false,
}
}
}
impl<'a, D: Distance, R: Rng + SeedableRng, P> HannoyBuilder<'a, D, R, P> {
    // NOTE: unused in hannoy — kept commented out, presumably for API parity
    // with arroy's builder; confirm before removing for good.
    // pub fn available_memory(&mut self, memory: usize) -> &mut Self {
    //     self.inner.available_memory = Some(memory);
    //     self
    // }
    /// Provides a closure that can cancel the indexing process early if needed.
    /// There is no guarantee on when the process is going to cancel itself, but
    /// hannoy will try to stop as soon as possible once the closure returns `true`.
    ///
    /// Since the closure is not mutable and will be called from multiple threads
    /// at the same time it's encouraged to make it quick to execute. A common
    /// way to use it is to fetch an `AtomicBool` inside it that can be set
    /// from another thread without lock.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use hannoy::{Writer, distances::Euclidean};
    /// # let (writer, wtxn): (Writer<Euclidean>, heed::RwTxn) = todo!();
    /// use rand::rngs::StdRng;
    /// use rand::SeedableRng;
    /// use std::sync::atomic::{AtomicBool, Ordering};
    ///
    /// let stops_after = AtomicBool::new(false);
    ///
    /// // Cancel the task after one minute
    /// std::thread::spawn(|| {
    ///     let one_minute = std::time::Duration::from_secs(60);
    ///     std::thread::sleep(one_minute);
    ///     stops_after.store(true, Ordering::Relaxed);
    /// });
    ///
    /// let mut rng = StdRng::seed_from_u64(92);
    /// writer.builder(&mut rng).cancel(|| stops_after.load(Ordering::Relaxed)).build::<16,32>(&mut wtxn);
    /// ```
    pub fn cancel(&mut self, cancel: impl Fn() -> bool + 'a + Sync + Send) -> &mut Self {
        // Boxed so `BuildOption` can store any caller-provided closure type.
        self.inner.cancel = Box::new(cancel);
        self
    }
    /// The provided object handles reporting build steps.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use hannoy::{Writer, distances::Euclidean};
    /// # let (writer, wtxn): (Writer<Euclidean>, heed::RwTxn) = todo!();
    /// use rand::rngs::StdRng;
    /// use rand::SeedableRng;
    /// use std::sync::atomic::{AtomicBool, Ordering};
    /// use steppe::NoProgress;
    ///
    /// let mut rng = StdRng::seed_from_u64(4729);
    /// writer.builder(&mut rng).progress(NoProgress).build::<16,32>(&mut wtxn);
    /// ```
    pub fn progress<NP: steppe::Progress>(self, progress: NP) -> HannoyBuilder<'a, D, R, NP> {
        // Swapping the progress reporter changes the builder's `P` type
        // parameter, so we cannot mutate in place: destructure the old builder
        // entirely and rebuild it field by field around the new reporter.
        let HannoyBuilder {
            writer,
            rng,
            inner:
                BuildOption {
                    ef_construction,
                    available_memory,
                    cancel,
                    progress: _,
                    alpha,
                    relink_all_items,
                },
        } = self;
        HannoyBuilder {
            writer,
            rng,
            inner: BuildOption {
                ef_construction,
                available_memory,
                cancel,
                progress,
                alpha,
                relink_all_items,
            },
        }
    }
    /// Controls the search range when inserting a new item into the graph. This value must be
    /// greater than or equal to the `M` used in [`Self::build<M,M0>`]
    ///
    /// Typical values range from 50 to 500, with larger `ef_construction` producing higher
    /// quality hnsw graphs at the expense of longer builds. The default value used in hannoy is
    /// 100.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use hannoy::{Writer, distances::Euclidean};
    /// # let (writer, wtxn): (Writer<Euclidean>, heed::RwTxn) = todo!();
    /// use rand::rngs::StdRng;
    /// use rand::SeedableRng;
    ///
    /// let mut rng = StdRng::seed_from_u64(4729);
    /// writer.builder(&mut rng).ef_construction(100).build::<16,32>(&mut wtxn);
    /// ```
    pub fn ef_construction(&mut self, ef_construction: usize) -> &mut Self {
        // Stored as-is: the `>= M` requirement above is not validated here.
        self.inner.ef_construction = ef_construction;
        self
    }
    /// Tunable hyperparameter for the graph building process. Alpha decreases the tolerance for
    /// link creation during index time. Alpha = 1 is the normal HNSW build while alpha > 1 is
    /// more similar to DiskANN. Increasing alpha increases indexing times as more neighbours are
    /// considered per linking step, but results in higher recall.
    ///
    /// DiskANN authors suggest using alpha=1.1 or alpha=1.2. By default alpha=1.0 in hannoy.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use hannoy::{Writer, distances::Euclidean};
    /// # let (writer, wtxn): (Writer<Euclidean>, heed::RwTxn) = todo!();
    /// use rand::rngs::StdRng;
    /// use rand::SeedableRng;
    ///
    /// let mut rng = StdRng::seed_from_u64(4729);
    /// writer.builder(&mut rng).alpha(1.1).build::<16,32>(&mut wtxn);
    /// ```
    pub fn alpha(&mut self, alpha: f32) -> &mut Self {
        // Stored as-is; no range validation is performed here.
        self.inner.alpha = alpha;
        self
    }
    /// Generates an HNSW graph with max `M` links per node in layers > 0 and max `M0` links in layer 0.
    ///
    /// A general rule of thumb is to take `M0`= 2*`M`, with `M` >=3. Some common choices for
    /// `M` include : 8, 12, 16, 32. Note that increasing `M` produces a denser graph at the cost
    /// of longer build times.
    ///
    /// This function is using rayon to spawn threads. It can be configured by using the
    /// [`rayon::ThreadPoolBuilder`].
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use hannoy::{Writer, distances::Euclidean};
    /// # let (writer, wtxn): (Writer<Euclidean>, heed::RwTxn) = todo!();
    /// use rayon;
    /// use rand::rngs::StdRng;
    /// use rand::SeedableRng;
    ///
    /// // configure global threadpool if you want!
    /// rayon::ThreadPoolBuilder::new().num_threads(4).build_global().unwrap();
    ///
    /// let mut rng = StdRng::seed_from_u64(4729);
    /// writer.builder(&mut rng).build::<16,32>(&mut wtxn);
    /// ```
    pub fn build<const M: usize, const M0: usize>(&mut self, wtxn: &mut RwTxn) -> Result<()>
    where
        P: steppe::Progress,
    {
        // Delegate to the writer's private build with the accumulated options.
        self.writer.build::<R, P, M, M0>(wtxn, self.rng, &self.inner)
    }
/// Rebuilds an HNSW graph from scratch.
///
/// Assumes you've previously built one or more times. This function will drop all graph edges
/// from previous builds and reconstruct the hnsw with the vectors found in the db.
///
/// Standard builds work by first adding or deleting some nodes, here we're marking all
/// vectors found on disk as updated to force a rebuild. When in doubt prefer [`Self::build<M,M0>`] over
/// this method.
///
/// # Example
///
/// ```no_run
/// # use hannoy::{Writer, distances::Euclidean};
/// # let (writer, wtxn): (Writer<Euclidean>, heed::RwTxn) = todo!();
/// use rayon;
/// use rand::rngs::StdRng;
/// use rand::SeedableRng;
///
/// // configure global threadpool if you want!
/// rayon::ThreadPoolBuilder::new().num_threads(4).build_global().unwrap();
///
/// let mut rng = StdRng::seed_from_u64(4729);
/// writer.builder(&mut rng).force_rebuild::<16,32>(&mut wtxn);
/// ```
pub fn force_rebuild<const M: usize, const M0: usize>(&mut self, wtxn: &mut RwTxn) -> Result<()>
where
P: steppe::Progress,
{
// Use this option to mark all nodes as updated
self.inner.relink_all_items = true;
self.writer.force_rebuild::<R, P, M, M0>(wtxn, self.rng, &self.inner)?;
// As this builder can be reused, we need to reset this parameter
self.inner.relink_all_items = false;
Ok(())
}
    /// Converts an arroy db into a hannoy one.
    #[cfg(any(test, feature = "arroy"))]
    #[cfg_attr(docsrs, doc(cfg(feature = "arroy")))]
    pub fn prepare_arroy_conversion(&self, wtxn: &mut RwTxn) -> Result<()>
    where
        P: steppe::Progress,
    {
        // Delegates to the writer, forwarding the configured build options.
        self.writer.prepare_arroy_conversion(wtxn, &self.inner)
    }
}
/// A writer to store new items, remove existing ones, and build the search
/// index to query the nearest neighbors to items or vectors.
#[derive(Debug)]
pub struct Writer<D: Distance> {
    /// The heed database all entries of this index live in.
    database: Database<D>,
    /// Identifier distinguishing this index from others stored in the same database.
    index: u16,
    /// Number of dimensions of the vectors accepted by this writer.
    dimensions: usize,
    /// The folder in which tempfile will write its temporary files.
    tmpdir: Option<PathBuf>,
}
impl<D: Distance> Writer<D> {
    /// Creates a new writer from a database, index and dimensions.
    pub fn new(database: Database<D>, index: u16, dimensions: usize) -> Writer<D> {
        // No tmpdir by default; see `set_tmpdir` to override.
        Writer { database, index, dimensions, tmpdir: None }
    }
/// After opening an arroy database this function will prepare it for conversion,
/// cleanup the arroy database and only keep the items/vectors entries.
#[cfg(any(test, feature = "arroy"))]
pub(crate) fn prepare_arroy_conversion<P: steppe::Progress>(
&self,
wtxn: &mut RwTxn,
options: &BuildOption<P>,
) -> Result<()> {
use crate::node_id::{NodeId, NodeMode};
use crate::unaligned_vector::UnalignedVectorCodec;
debug!("Preparing dumpless upgrade from arroy to hannoy");
options.progress.update(HannoyBuild::ConvertingArroyToHannoy);
let mut iter = self
.database
.remap_key_type::<PrefixCodec>()
.prefix_iter_mut(wtxn, &Prefix::all(self.index))?
.remap_key_type::<KeyCodec>();
// binary quantized have len vec.len().div_ceil(64)*64 >= vec.len()
let word_size = <D::VectorCodec as UnalignedVectorCodec>::word_size();
let on_disk_dim: usize = self.dimensions.div_ceil(word_size) * word_size;
let mut new_items = RoaringBitmap::new();
while let Some(result) = iter.next() {
match result {
Ok((
Key { index: _, node: NodeId { mode: NodeMode::Item, item, .. } },
Node::Item(Item { header: _, vector }),
)) => {
// We only take care of the entries that can be decoded as Node Items (vectors) and
// mark them as newly inserted so the Writer::build method can compute the links for them.
new_items.insert(item);
if vector.len() != on_disk_dim {
return Err(Error::InvalidVecDimension {
expected: on_disk_dim,
received: vector.len(),
});
}
}
Ok((Key { .. }, _)) | Err(heed::Error::Decoding(_)) => unsafe {
// Every other entry that fails to decode can be considered as something
// else than an item, is useless for the conversion and is deleted.
// SAFETY: Safe because we don't keep any references to the entry
iter.del_current()?;
},
// If there is another error (lmdb...), it is returned.
Err(e) => return Err(e.into()),
}
}
drop(iter);
// We mark all the items as updated so
// the Writer::build method can handle them.
for item in new_items {
self.database.remap_data_type::<UpdateStatusCodec>().put(
wtxn,
&Key::updated(self.index, item),
&UpdateStatus::Updated,
)?;
}
Ok(())
}
    /// Returns a writer after having deleted the tree nodes and rewrote all the items
    /// for the new [`Distance`] format to be able to modify items safely.
    pub fn prepare_changing_distance<ND: Distance>(self, wtxn: &mut RwTxn) -> Result<Writer<ND>> {
        // If source and target distances are the same type there is nothing to convert.
        if TypeId::of::<ND>() != TypeId::of::<D>() {
            // If we are moving from a distance to the same but binary quantized
            // distance we do not need to clear links, otherwise we do.
            if ND::name()
                .strip_prefix("binary quantized ")
                .is_none_or(|raw_name| raw_name != D::name())
            {
                clear_links(wtxn, self.database, self.index)?;
                self.database.delete(wtxn, &Key::metadata(self.index))?;
            }
            // Rewrite every item in place with a header recomputed for the new distance.
            let mut cursor = self
                .database
                .remap_key_type::<PrefixCodec>()
                .prefix_iter_mut(wtxn, &Prefix::item(self.index))?
                .remap_key_type::<KeyCodec>();
            let mut updated_items = RoaringBitmap::new();
            while let Some((item_id, node)) = cursor.next().transpose()? {
                match node {
                    Node::Item(Item { header: _, vector }) => {
                        updated_items.insert(item_id.node.item);
                        // Copy the vector out of LMDB before overwriting the entry.
                        let vector = vector.to_vec();
                        let vector = UnalignedVector::from_vec(vector);
                        let new_leaf = Node::Item(Item { header: ND::new_header(&vector), vector });
                        unsafe {
                            // safety: We do not keep a reference to the current value, we own it.
                            cursor.put_current_with_options::<NodeCodec<ND>>(
                                PutFlags::empty(),
                                &item_id,
                                &new_leaf,
                            )?
                        };
                    }
                    // The iteration is restricted to the item prefix, so links cannot appear.
                    Node::Links(_) => unreachable!("Node must not be a link"),
                }
            }
            // Release the cursor's borrow on the transaction before the puts below.
            drop(cursor);
            // Mark every rewritten item as updated so the next build relinks it.
            for item in updated_items {
                self.database.remap_types::<KeyCodec, UpdateStatusCodec>().put(
                    wtxn,
                    &Key::updated(self.index, item),
                    &UpdateStatus::Updated,
                )?;
            }
        }
        // Rebuild the writer with the database remapped to the new distance codec.
        let Writer { database, index, dimensions, tmpdir } = self;
        Ok(Writer { database: database.remap_data_type(), index, dimensions, tmpdir })
    }
/// Sets the path to the temporary directory where files are written.
pub fn set_tmpdir(&mut self, path: impl Into<PathBuf>) {
self.tmpdir = Some(path.into());
}
/// Returns `true` if the index is empty.
pub fn is_empty(&self, rtxn: &RoTxn) -> Result<bool> {
self.iter(rtxn).map(|mut iter| iter.next().is_none())
}
/// Returns `true` if the index needs to be built before being able to read in it.
pub fn need_build(&self, rtxn: &RoTxn) -> Result<bool> {
Ok(self
.database
.remap_types::<PrefixCodec, DecodeIgnore>()
.prefix_iter(rtxn, &Prefix::updated(self.index))?
.remap_key_type::<KeyCodec>()
.next()
.is_some()
|| self
.database
.remap_data_type::<DecodeIgnore>()
.get(rtxn, &Key::metadata(self.index))?
.is_none())
}
/// Returns an `Option`al vector previous stored in this database.
pub fn item_vector(&self, rtxn: &RoTxn, item: ItemId) -> Result<Option<Vec<f32>>> {
Ok(get_item(self.database, self.index, rtxn, item)?.map(|item| {
let mut vec = item.vector.to_vec();
vec.truncate(self.dimensions);
vec
}))
}
/// Returns `true` if the database contains the given item.
pub fn contains_item(&self, rtxn: &RoTxn, item: ItemId) -> Result<bool> {
self.database
.remap_data_type::<DecodeIgnore>()
.get(rtxn, &Key::item(self.index, item))
.map(|opt| opt.is_some())
.map_err(Into::into)
}
/// Returns an iterator over the items vector.
pub fn iter<'t>(&self, rtxn: &'t RoTxn) -> Result<ItemIter<'t, D>> {
Ok(ItemIter::new(self.database, self.index, self.dimensions, rtxn)?)
}
/// Add an item associated to a vector in the database.
pub fn add_item(&self, wtxn: &mut RwTxn, item: ItemId, vector: &[f32]) -> Result<()> {
if vector.len() != self.dimensions {
return Err(Error::InvalidVecDimension {
expected: self.dimensions,
received: vector.len(),
});
}
let vector = UnalignedVector::from_slice(vector);
let db_item = Item { header: D::new_header(&vector), vector };
self.database.put(wtxn, &Key::item(self.index, item), &Node::Item(db_item))?;
self.database.remap_data_type::<UpdateStatusCodec>().put(
wtxn,
&Key::updated(self.index, item),
&UpdateStatus::Updated,
)?;
Ok(())
}
/// Deletes an item stored in this database and returns `true` if it existed.
pub fn del_item(&self, wtxn: &mut RwTxn, item: ItemId) -> Result<bool> {
if self.database.delete(wtxn, &Key::item(self.index, item))? {
self.database.remap_data_type::<UpdateStatusCodec>().put(
wtxn,
&Key::updated(self.index, item),
&UpdateStatus::Removed,
)?;
Ok(true)
} else {
Ok(false)
}
}
    /// Removes everything in the database, user items and internal graph links.
    pub fn clear(&self, wtxn: &mut RwTxn) -> Result<()> {
        // Iterate over every entry of this index; keys and values are never
        // decoded since we only delete.
        let mut cursor = self
            .database
            .remap_key_type::<PrefixCodec>()
            .prefix_iter_mut(wtxn, &Prefix::all(self.index))?
            .remap_types::<DecodeIgnore, DecodeIgnore>();
        while let Some((_id, _node)) = cursor.next().transpose()? {
            // SAFETY: Safe because we don't keep any references to the entry
            unsafe { cursor.del_current() }?;
        }
        Ok(())
    }
/// Returns an [`HannoyBuilder`] to configure the available options to build the database.
pub fn builder<'a, R>(&'a self, rng: &'a mut R) -> HannoyBuilder<'a, D, R, NoProgress>
where
R: Rng + SeedableRng,
{
HannoyBuilder { writer: self, rng, inner: BuildOption::default() }
}
    /// Incrementally builds the HNSW graph for this index and persists the
    /// updated metadata and version entries.
    fn build<R, P, const M: usize, const M0: usize>(
        &self,
        wtxn: &mut RwTxn,
        rng: &mut R,
        options: &BuildOption<P>,
    ) -> Result<()>
    where
        R: Rng + SeedableRng,
        P: steppe::Progress,
    {
        // Get the list of items we already registered in the metadata
        let indexed_items = self
            .database
            .remap_data_type::<MetadataCodec>()
            .get(wtxn, &Key::metadata(self.index))?
            .map_or_else(RoaringBitmap::default, |m| m.items);
        // In case we have to rebuild all links we can skip the deletion step.
        let (item_indices, to_delete, to_insert) = if options.relink_all_items {
            (indexed_items.clone(), RoaringBitmap::new(), indexed_items)
        } else {
            // updated items can be an update, an addition or a removed item
            // they are identified by a "updated" stone key
            let (all_updated_items, deleted_items) =
                self.reset_and_retrieve_updated_items(wtxn, options)?;
            // Item indices corresponds to all items, known ones and updates ones
            let updated_items = &all_updated_items - &deleted_items;
            let item_indices = (&updated_items | indexed_items) - &deleted_items;
            // to_delete: updated items no longer part of the index;
            // to_insert: updated items that remain part of the index.
            let to_delete = all_updated_items.clone() - &item_indices;
            let to_insert = &item_indices & all_updated_items;
            (item_indices, to_delete, to_insert)
        };
        // Resume from the previous graph's entry points and max level, if any.
        let metadata = self
            .database
            .remap_data_type::<MetadataCodec>()
            .get(wtxn, &Key::metadata(self.index))?;
        let (entry_points, max_level) = metadata.as_ref().map_or_else(
            || (Vec::new(), usize::MIN),
            |metadata| (metadata.entry_points.iter().collect(), metadata.max_level as usize),
        );
        // we should not keep a reference to the metadata since they're going to be moved by LMDB
        drop(metadata);
        let mut hnsw = HnswBuilder::<D, M, M0>::new(options)
            .with_entry_points(entry_points)
            .with_max_level(max_level);
        let stats =
            hnsw.build(to_insert, &to_delete, self.database, self.index, wtxn, rng, options)?;
        debug!("{stats:?}");
        // Remove deleted links from lmdb AFTER build; in DiskANN we use a deleted item's
        // neighbours when filling in the "gaps" left in the graph from deletions. See
        // [`HnswBuilder::maybe_patch_old_links`] for more details.
        self.delete_links_from_db(&to_delete, wtxn, options)?;
        debug!("write the metadata...");
        options.progress.update(HannoyBuild::WriteTheMetadata);
        self.database.remap_data_type::<MetadataCodec>().put(
            wtxn,
            &Key::metadata(self.index),
            &Metadata {
                dimensions: self.dimensions.try_into().unwrap(),
                items: item_indices,
                entry_points: ItemIds::from_slice(&hnsw.entry_points),
                max_level: hnsw.max_level as u8,
                distance: D::name(),
            },
        )?;
        self.database.remap_data_type::<VersionCodec>().put(
            wtxn,
            &Key::version(self.index),
            &Version::current(),
        )?;
        Ok(())
    }
    /// Kinda like clear and create, but only for links
    ///
    /// You must ensure that no items were inserted that are not
    /// part of the metadata. Which means that a build must have
    /// been performed or you'll see item leaking.
    fn force_rebuild<R, P, const M: usize, const M0: usize>(
        &self,
        wtxn: &mut RwTxn,
        rng: &mut R,
        options: &BuildOption<P>,
    ) -> Result<()>
    where
        R: Rng + SeedableRng,
        P: steppe::Progress,
    {
        // 1. Ensure we have the right settings
        assert!(
            options.relink_all_items,
            "forcing relinking of all items requires the relink_all_items option to be set to true"
        );
        // 2. Fetch the list of items from the metadata
        // NOTE(review): this panics when no build was ever performed, matching
        // the documented precondition above.
        let Metadata { items: item_ids, .. } = self
            .database
            .remap_data_type::<MetadataCodec>()
            .get(wtxn, &Key::metadata(self.index))?
            .expect("The metadata must be there");
        // 3. delete all links
        self.delete_links_from_db(&item_ids, wtxn, options)?;
        // 4. trigger build
        self.build::<R, P, M, M0>(wtxn, rng, options)
    }
    /// Removes all the "updated" stones from the database
    /// and returns the list of updated and deleted items.
    ///
    /// The updated items corresponds to all the items modified, inserted or deleted.
    /// The deleted items corresponds to all the items deleted.
    fn reset_and_retrieve_updated_items<P>(
        &self,
        wtxn: &mut RwTxn,
        options: &BuildOption<P>,
    ) -> Result<(RoaringBitmap, RoaringBitmap), Error>
    where
        P: steppe::Progress,
    {
        debug!("reset and retrieve the updated items...");
        options.progress.update(HannoyBuild::RetrieveTheUpdatedItems);
        let mut updated_items = RoaringBitmap::new();
        let mut deleted_items = RoaringBitmap::new();
        let mut updated_iter = self
            .database
            .remap_types::<PrefixCodec, UpdateStatusCodec>()
            .prefix_iter_mut(wtxn, &Prefix::updated(self.index))?
            .remap_key_type::<KeyCodec>();
        // Counter so the (potentially expensive) cancel callback is only
        // probed once every CANCELLATION_PROBING iterations.
        let mut index = 0;
        while let Some((key, update_status)) = updated_iter.next().transpose()? {
            if index % CANCELLATION_PROBING == 0 && (options.cancel)() {
                return Err(Error::BuildCancelled);
            }
            let inserted = updated_items.insert(key.node.item);
            debug_assert!(inserted, "The keys should be sorted by LMDB");
            // A "Removed" status records a deleted item; every status counts as updated.
            if update_status == UpdateStatus::Removed {
                let inserted = deleted_items.insert(key.node.item);
                debug_assert!(inserted, "The keys should be sorted by LMDB");
            }
            // SAFETY: Safe because we don't hold any reference to the database currently
            let did_delete = unsafe { updated_iter.del_current()? };
            if !did_delete {
                error!(item = key.node.item, "failed to remove item")
            }
            index += 1;
        }
        Ok((updated_items, deleted_items))
    }
    // Iterates over links in lmdb and deletes those in `to_delete`. There can be several links
    // with the same NodeId.item, each differing by their layer
    fn delete_links_from_db<P>(
        &self,
        to_delete: &RoaringBitmap,
        wtxn: &mut RwTxn,
        options: &BuildOption<P>,
    ) -> Result<()>
    where
        P: steppe::Progress,
    {
        debug!("started deleting the links...");
        options.progress.update(HannoyBuild::DeletingTheLinks);
        // Walk every link entry of this index; values are never decoded since
        // only the key's item id is needed to decide on deletion.
        let mut cursor = self
            .database
            .remap_key_type::<PrefixCodec>()
            .prefix_iter_mut(wtxn, &Prefix::links(self.index))?
            .remap_types::<KeyCodec, DecodeIgnore>();
        while let Some((key, _)) = cursor.next().transpose()? {
            if to_delete.contains(key.node.item) {
                // SAFETY: Safe because we don't keep any references to the entry
                unsafe { cursor.del_current() }?;
            }
        }
        Ok(())
    }
}
/// Clears all the links. Starts from the last node and stops at the first item.
fn clear_links<D: Distance>(wtxn: &mut RwTxn, database: Database<D>, index: u16) -> Result<()> {
    // Restrict the cursor to the links prefix; keys and values are never
    // decoded because every matched entry is deleted.
    let mut cursor = database
        .remap_types::<PrefixCodec, DecodeIgnore>()
        .prefix_iter_mut(wtxn, &Prefix::links(index))?
        .remap_key_type::<DecodeIgnore>();
    while let Some((_id, _node)) = cursor.next().transpose()? {
        // SAFETY: Safe because we don't keep any references to the entry
        unsafe { cursor.del_current()? };
    }
    Ok(())
}