1use std::fmt;
9use std::fs;
10use std::fs::File;
11use std::path::Path;
12use std::path::PathBuf;
13
14use byteorder::BigEndian;
15use byteorder::ReadBytesExt;
16use fs2::FileExt;
17use indexedlog::log;
18use vlqencoding::VLQDecode;
19use vlqencoding::VLQEncode;
20
21use super::mem_idmap::CoreMemIdMap;
22use super::IdMapWrite;
23use crate::errors::bug;
24use crate::errors::programming;
25use crate::errors::NotFoundError;
26use crate::id::Group;
27use crate::id::Id;
28use crate::id::Vertex;
29use crate::ops::IdConvert;
30use crate::ops::Persist;
31use crate::ops::PrefixLookup;
32use crate::ops::TryClone;
33use crate::Result;
34use crate::VerLink;
35
36pub struct IdMap {
40 pub(crate) log: log::Log,
41 path: PathBuf,
42 map_id: String,
43 map_version: VerLink,
44 virtual_map: CoreMemIdMap,
46}
47
48impl IdMap {
49 const INDEX_ID_TO_NAME: usize = 0;
59 const INDEX_GROUP_NAME_TO_ID: usize = 1;
60
61 const MAGIC_CLEAR_NON_MASTER: &'static [u8] = b"CLRNM";
65
66 const MAGIC_DELETION_PREFIX: &'static [u8] = &u64::MAX.to_be_bytes();
69
70 const NAME_OFFSET: usize = 8 + Group::BYTES;
72
73 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
78 let path = path.as_ref();
79 let log = Self::log_open_options().open(path)?;
80 Self::open_from_log(log)
81 }
82}
83
84impl TryClone for IdMap {
85 fn try_clone(&self) -> Result<Self> {
86 let result = Self {
87 log: self.log.try_clone()?,
88 path: self.path.clone(),
89 map_id: self.map_id.clone(),
90 map_version: self.map_version.clone(),
91 virtual_map: self.virtual_map.clone(),
92 };
93 Ok(result)
94 }
95}
96
97impl IdMap {
98 pub(crate) fn open_from_log(log: log::Log) -> Result<Self> {
99 let path = log.path().as_opt_path().unwrap().to_path_buf();
100 let map_id = format!("ilog:{}", path.display());
101 let map_version = VerLink::from_storage_version_or_new(&map_id, log.version());
102 Ok(Self {
103 log,
104 path,
105 map_id,
106 map_version,
107 virtual_map: Default::default(),
108 })
109 }
110
111 pub(crate) fn log_open_options() -> log::OpenOptions {
112 assert!(Self::MAGIC_DELETION_PREFIX > &Id::MAX.0.to_be_bytes()[..]);
113 log::OpenOptions::new()
114 .create(true)
115 .index("id", |data| {
116 assert!(Self::MAGIC_CLEAR_NON_MASTER.len() < 8);
117 assert!(Group::BITS == 8);
118 if data.starts_with(Self::MAGIC_DELETION_PREFIX) {
119 let items =
120 decode_deletion_entry(data).expect("deletion entry should be valid");
121 items
122 .into_iter()
123 .map(|(id, _name)| log::IndexOutput::Remove(id.0.to_be_bytes().into()))
124 .collect()
125 } else if data.len() < 8 {
126 if data == Self::MAGIC_CLEAR_NON_MASTER {
127 vec![log::IndexOutput::RemovePrefix(Box::new([
128 Group::NON_MASTER.0 as u8,
129 ]))]
130 } else {
131 panic!("bug: invalid segment {:?}", &data);
132 }
133 } else {
134 let slice = &data[..8];
135 let slice: [u8; 8] = slice.try_into().unwrap(); let id = Id(u64::from_be_bytes(slice));
137 assert!(
138 !id.is_virtual(),
139 "bug: VIRTUAL group should never be written to disk"
140 );
141 vec![log::IndexOutput::Reference(0..8)]
142 }
143 })
144 .index("group-name", |data| {
145 if data.starts_with(Self::MAGIC_DELETION_PREFIX) {
146 let items =
147 decode_deletion_entry(data).expect("deletion entry should be valid");
148 items
149 .into_iter()
150 .map(|(id, name)| {
151 let mut key = Vec::with_capacity(name.len() + 1);
152 key.extend_from_slice(&id.group().bytes());
153 key.extend_from_slice(name);
154 log::IndexOutput::Remove(key.into())
155 })
156 .collect()
157 } else if data.len() >= 8 {
158 vec![log::IndexOutput::Reference(8..(data.len() as u64))]
159 } else if data == Self::MAGIC_CLEAR_NON_MASTER {
160 vec![log::IndexOutput::RemovePrefix(Box::new([
161 Group::NON_MASTER.0 as u8,
162 ]))]
163 } else {
164 panic!("bug: invalid segment {:?}", &data);
165 }
166 })
167 .flush_filter(Some(|_, _| {
168 panic!("programming error: idmap changed by other process")
169 }))
170 }
171
172 pub fn find_name_by_id(&self, id: Id) -> Result<Option<&[u8]>> {
174 if id.is_virtual() {
175 return Ok(self.virtual_map.lookup_vertex_name(id).map(|v| v.as_ref()));
176 }
177 let key = id.0.to_be_bytes();
178 let key = self.log.lookup(Self::INDEX_ID_TO_NAME, key)?.nth(0);
179 match key {
180 Some(Ok(entry)) => {
181 if entry.len() < 8 {
182 return bug("index key should have 8 bytes at least");
183 }
184 Ok(Some(&entry[Self::NAME_OFFSET..]))
185 }
186 None => Ok(None),
187 Some(Err(err)) => Err(err.into()),
188 }
189 }
190
191 pub fn find_vertex_name_by_id(&self, id: Id) -> Result<Option<Vertex>> {
193 if !id.is_valid() {
194 return Ok(None);
195 }
196 if id.is_virtual() {
197 return Ok(self.virtual_map.lookup_vertex_name(id).cloned());
198 }
199 self.find_name_by_id(id)
200 .map(|v| v.map(|n| Vertex(self.log.slice_to_bytes(n))))
201 }
202
203 pub fn find_id_by_name(&self, name: &[u8]) -> Result<Option<Id>> {
205 for group in Group::ALL.iter() {
206 if *group == Group::VIRTUAL {
207 if let Some(found_id) = self.virtual_map.lookup_vertex_id(&Vertex::copy_from(name))
208 {
209 return Ok(Some(found_id));
210 }
211 } else {
212 let mut group_name = Vec::with_capacity(Group::BYTES + name.len());
213 group_name.extend_from_slice(&group.bytes());
214 group_name.extend_from_slice(name);
215 let key = self
216 .log
217 .lookup(Self::INDEX_GROUP_NAME_TO_ID, group_name)?
218 .nth(0);
219 match key {
220 Some(Ok(mut entry)) => {
221 if entry.len() < 8 {
222 return bug("index key should have 8 bytes at least");
223 }
224 let id = Id(entry.read_u64::<BigEndian>().unwrap());
225 return Ok(Some(id));
226 }
227 None => {}
228 Some(Err(err)) => return Err(err.into()),
229 }
230 }
231 }
232 Ok(None)
233 }
234
235 pub fn find_id_by_name_with_max_group(
237 &self,
238 name: &[u8],
239 max_group: Group,
240 ) -> Result<Option<Id>> {
241 Ok(self.find_id_by_name(name)?.and_then(|id| {
242 if id.group() <= max_group {
243 Some(id)
244 } else {
245 None
246 }
247 }))
248 }
249
250 pub fn insert(&mut self, id: Id, name: &[u8]) -> Result<()> {
254 let existing_name = self.find_name_by_id(id)?;
255 if let Some(existing_name) = existing_name {
256 if existing_name == name {
257 return Ok(());
258 } else {
259 return bug(format!(
260 "new entry {} = {:?} conflicts with an existing entry {} = {:?}",
261 id, name, id, existing_name
262 ));
263 }
264 }
265 let existing_id = self.find_id_by_name(name)?;
266 if let Some(existing_id) = existing_id {
267 if existing_id == id {
273 return Ok(());
274 } else {
275 return bug(format!(
276 "new entry {} = {:?} conflicts with an existing entry {} = {:?}",
277 id, name, existing_id, name
278 ));
279 }
280 }
281
282 if id.is_virtual() {
283 self.virtual_map
284 .insert_vertex_id_name(id, Vertex::copy_from(name));
285 return Ok(());
286 }
287
288 let mut data = Vec::with_capacity(8 + Group::BYTES + name.len());
289 data.extend_from_slice(&id.0.to_be_bytes());
290 data.extend_from_slice(&id.group().bytes());
291 data.extend_from_slice(name);
292 self.log.append(data)?;
293 self.map_version.bump();
294 #[cfg(debug_assertions)]
295 {
296 let items = self.find_range(id, id).unwrap();
297 assert_eq!(items[0], (id, name));
298 }
299 Ok(())
300 }
301
302 fn find_range(&self, low: Id, high: Id) -> Result<Vec<(Id, &[u8])>> {
304 let low_bytes = low.0.to_be_bytes();
305 let high_bytes = high.0.to_be_bytes();
306 let range = &low_bytes[..]..=&high_bytes[..];
307 let mut items = Vec::new();
308 if !low.is_virtual() {
309 for entry in self.log.lookup_range(Self::INDEX_ID_TO_NAME, range)? {
310 let (key, values) = entry?;
311 let key: [u8; 8] = match key.as_ref().try_into() {
312 Ok(key) => key,
313 Err(_) => {
314 return bug("find_range got non-u64 keys in INDEX_ID_TO_NAME");
315 }
316 };
317 let id = Id(u64::from_be_bytes(key));
318 for value in values {
319 let value = value?;
320 if value.len() < 8 {
321 return bug(format!(
322 "find_range got entry {:?} shorter than expected",
323 &value
324 ));
325 }
326 let name: &[u8] = &value[9..];
327 items.push((id, name));
328 }
329 }
330 }
331 if high.is_virtual() {
332 for (id, name) in self.virtual_map.lookup_range(low, high) {
333 items.push((*id, name.as_ref()))
334 }
335 }
336 Ok(items)
337 }
338
339 fn remove_range(&mut self, low: Id, high: Id) -> Result<Vec<Vertex>> {
340 let mut names = Vec::new();
341 if !low.is_virtual() {
342 debug_assert!(Group::ALL.contains(&(Group::VIRTUAL - 1)));
344 let items = self.find_range(low, (Group::VIRTUAL - 1).max_id().min(high))?;
345 names.extend(items.iter().map(|(_, bytes)| Vertex::copy_from(bytes)));
346 let data = encode_deletion_entry(&items);
350 self.log.append(data)?;
351 self.map_version = VerLink::new();
354 }
355 if high.is_virtual() {
357 names.extend(self.virtual_map.remove_range(low, high)?);
358 }
359 Ok(names)
360 }
361
362 fn find_names_by_hex_prefix(&self, hex_prefix: &[u8], limit: usize) -> Result<Vec<Vertex>> {
364 let mut result = Vec::with_capacity(limit);
365 for group in Group::ALL.iter().rev() {
366 if result.len() >= limit {
367 break;
368 }
369 if *group == Group::VIRTUAL {
370 result.extend(self.virtual_map.lookup_vertexes_by_hex_prefix(
371 hex_prefix,
372 limit.saturating_sub(result.len()),
373 )?);
374 } else {
375 let mut prefix = Vec::with_capacity(Group::BYTES * 2 + hex_prefix.len());
376 prefix.extend_from_slice(&group.hex_bytes());
377 prefix.extend_from_slice(hex_prefix);
378 for entry in self
379 .log
380 .lookup_prefix_hex(Self::INDEX_GROUP_NAME_TO_ID, prefix)?
381 {
382 if result.len() >= limit {
383 break;
384 }
385 let (k, _v) = entry?;
386 let vertex = Vertex(self.log.slice_to_bytes(&k[Group::BYTES..]));
387 if !result.contains(&vertex) {
388 result.push(vertex);
389 }
390 }
391 }
392 }
393 Ok(result)
394 }
395}
396
397fn encode_deletion_entry(items: &[(Id, &[u8])]) -> Vec<u8> {
401 let len = IdMap::MAGIC_DELETION_PREFIX.len() + 9 + items.len() * 30;
403 let mut data = Vec::with_capacity(len);
404 data.extend_from_slice(IdMap::MAGIC_DELETION_PREFIX);
405 data.write_vlq(items.len()).unwrap();
406 for (id, name) in items {
407 data.write_vlq(id.0).unwrap();
408 data.write_vlq(name.len()).unwrap();
409 data.extend_from_slice(name);
410 }
411 data
412}
413
414fn decode_deletion_entry(data: &[u8]) -> Result<Vec<(Id, &[u8])>> {
417 assert!(data.starts_with(IdMap::MAGIC_DELETION_PREFIX));
418 let mut data = &data[IdMap::MAGIC_DELETION_PREFIX.len()..];
419 let n = data.read_vlq()?;
420 let mut items = Vec::with_capacity(n);
421 for _ in 0..n {
422 let id: u64 = data.read_vlq()?;
423 let id = Id(id);
424 let name_len: usize = data.read_vlq()?;
425 if name_len > data.len() {
426 return bug("decode_deletion_id_names got incomplete input");
427 }
428 let (name, rest) = data.split_at(name_len);
429 data = rest;
430 items.push((id, name));
431 }
432 Ok(items)
433}
434
435impl fmt::Debug for IdMap {
436 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
437 write!(f, "IdMap {{\n")?;
438 let vec = self.find_range(Id::MIN, Id::MAX).map_err(|_| fmt::Error)?;
439 for (id, name) in vec {
440 let name = if name.len() >= 20 {
441 Vertex::copy_from(name).to_hex()
442 } else {
443 String::from_utf8_lossy(&name).to_string()
444 };
445 write!(f, " {}: {},\n", name, id)?;
446 }
447 write!(f, "}}\n")?;
448 Ok(())
449 }
450}
451
452#[async_trait::async_trait]
453impl IdConvert for IdMap {
454 async fn vertex_id(&self, name: Vertex) -> Result<Id> {
455 self.find_id_by_name(name.as_ref())?
456 .ok_or_else(|| name.not_found_error())
457 }
458 async fn vertex_id_with_max_group(
459 &self,
460 name: &Vertex,
461 max_group: Group,
462 ) -> Result<Option<Id>> {
463 self.find_id_by_name_with_max_group(name.as_ref(), max_group)
464 }
465 async fn vertex_name(&self, id: Id) -> Result<Vertex> {
466 self.find_vertex_name_by_id(id)?
467 .ok_or_else(|| id.not_found_error())
468 }
469 async fn contains_vertex_name(&self, name: &Vertex) -> Result<bool> {
470 Ok(self.find_id_by_name(name.as_ref())?.is_some())
471 }
472 async fn contains_vertex_id_locally(&self, ids: &[Id]) -> Result<Vec<bool>> {
473 let mut list = Vec::with_capacity(ids.len());
474 for &id in ids {
475 list.push(self.find_name_by_id(id)?.is_some());
476 }
477 Ok(list)
478 }
479 async fn contains_vertex_name_locally(&self, names: &[Vertex]) -> Result<Vec<bool>> {
480 let mut list = Vec::with_capacity(names.len());
481 for name in names {
482 let contains = self.find_id_by_name(name.as_ref())?.is_some();
483 tracing::trace!("contains_vertex_name_locally({:?}) = {}", name, contains);
484 list.push(contains);
485 }
486 Ok(list)
487 }
488 fn map_id(&self) -> &str {
489 &self.map_id
490 }
491 fn map_version(&self) -> &VerLink {
492 &self.map_version
493 }
494}
495
496#[async_trait::async_trait]
497impl IdMapWrite for IdMap {
498 async fn insert(&mut self, id: Id, name: &[u8]) -> Result<()> {
499 IdMap::insert(self, id, name)
500 }
501 async fn remove_range(&mut self, low: Id, high: Id) -> Result<Vec<Vertex>> {
502 IdMap::remove_range(self, low, high)
503 }
504}
505
506impl Persist for IdMap {
507 type Lock = File;
508
509 fn lock(&mut self) -> Result<Self::Lock> {
510 if self.log.iter_dirty().next().is_some() {
511 return programming("lock() must be called without dirty in-memory entries");
512 }
513 let lock_file = {
514 let path = self.path.join("wlock");
515 fs::OpenOptions::new()
518 .read(true)
519 .write(true)
520 .create(true)
521 .open(&path)?
522 };
523 lock_file.lock_exclusive()?;
524 Ok(lock_file)
525 }
526
527 fn reload(&mut self, _lock: &Self::Lock) -> Result<()> {
528 self.log.clear_dirty()?;
529 self.log.sync()?;
530 Ok(())
531 }
532
533 fn persist(&mut self, _lock: &Self::Lock) -> Result<()> {
534 self.log.sync()?;
535 self.map_version
536 .associate_storage_version(self.map_id.clone(), self.log.version());
537 Ok(())
538 }
539}
540
541#[async_trait::async_trait]
542impl PrefixLookup for IdMap {
543 async fn vertexes_by_hex_prefix(&self, hex_prefix: &[u8], limit: usize) -> Result<Vec<Vertex>> {
544 self.find_names_by_hex_prefix(hex_prefix, limit)
545 }
546}
547
#[cfg(test)]
mod tests {
    use super::*;

    /// A deletion entry must decode back to exactly the items it was
    /// encoded from.
    #[test]
    fn test_encode_decode_deletion_entry() {
        let expected: &[(Id, &[u8])] = &[
            (Id(0), b"a"),
            (Id(1), b"bb"),
            (Id(10), b"ccc"),
            (Id(20), b"dd"),
        ];
        let encoded = encode_deletion_entry(expected);
        let roundtrip = decode_deletion_entry(&encoded).unwrap();
        assert_eq!(roundtrip.as_slice(), expected);
    }
}