1use std::fmt;
9use std::fs;
10use std::fs::File;
11use std::io::Read;
12use std::path::Path;
13use std::path::PathBuf;
14
15use byteorder::BigEndian;
16use byteorder::ReadBytesExt;
17use fs2::FileExt;
18use indexedlog::log;
19use vlqencoding::VLQDecode;
20use vlqencoding::VLQEncode;
21
22use super::IdMapWrite;
23use crate::errors::bug;
24use crate::errors::programming;
25use crate::errors::NotFoundError;
26use crate::id::Group;
27use crate::id::Id;
28use crate::id::VertexName;
29use crate::ops::IdConvert;
30use crate::ops::Persist;
31use crate::ops::PrefixLookup;
32use crate::ops::TryClone;
33use crate::Result;
34use crate::VerLink;
35
36pub struct IdMap {
40 pub(crate) log: log::Log,
41 path: PathBuf,
42 map_id: String,
43 map_version: VerLink,
44}
45
46impl IdMap {
47 const INDEX_ID_TO_NAME: usize = 0;
57 const INDEX_GROUP_NAME_TO_ID: usize = 1;
58
59 const MAGIC_CLEAR_NON_MASTER: &'static [u8] = b"CLRNM";
63
64 const MAGIC_DELETION_PREFIX: &'static [u8] = &u64::MAX.to_be_bytes();
67
68 const NAME_OFFSET: usize = 8 + Group::BYTES;
70
71 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
76 let path = path.as_ref();
77 let log = Self::log_open_options().open(path)?;
78 Self::open_from_log(log)
79 }
80}
81
82impl TryClone for IdMap {
83 fn try_clone(&self) -> Result<Self> {
84 let result = Self {
85 log: self.log.try_clone()?,
86 path: self.path.clone(),
87 map_id: self.map_id.clone(),
88 map_version: self.map_version.clone(),
89 };
90 Ok(result)
91 }
92}
93
94impl IdMap {
95 pub(crate) fn open_from_log(log: log::Log) -> Result<Self> {
96 let path = log.path().as_opt_path().unwrap().to_path_buf();
97 let map_id = format!("ilog:{}", path.display());
98 Ok(Self {
99 log,
100 path,
101 map_id,
102 map_version: VerLink::new(),
103 })
104 }
105
106 pub(crate) fn log_open_options() -> log::OpenOptions {
107 assert!(Self::MAGIC_DELETION_PREFIX > &Id::MAX.0.to_be_bytes()[..]);
108 log::OpenOptions::new()
109 .create(true)
110 .index("id", |data| {
111 assert!(Self::MAGIC_CLEAR_NON_MASTER.len() < 8);
112 assert!(Group::BITS == 8);
113 if data.starts_with(Self::MAGIC_DELETION_PREFIX) {
114 let items =
115 decode_deletion_entry(data).expect("deletion entry should be valid");
116 items
117 .into_iter()
118 .map(|(id, _name)| log::IndexOutput::Remove(id.0.to_be_bytes().into()))
119 .collect()
120 } else if data.len() < 8 {
121 if data == Self::MAGIC_CLEAR_NON_MASTER {
122 vec![log::IndexOutput::RemovePrefix(Box::new([
123 Group::NON_MASTER.0 as u8,
124 ]))]
125 } else {
126 panic!("bug: invalid segment {:?}", &data);
127 }
128 } else {
129 vec![log::IndexOutput::Reference(0..8)]
130 }
131 })
132 .index("group-name", |data| {
133 if data.starts_with(Self::MAGIC_DELETION_PREFIX) {
134 let items =
135 decode_deletion_entry(data).expect("deletion entry should be valid");
136 items
137 .into_iter()
138 .map(|(id, name)| {
139 let mut key = Vec::with_capacity(name.len() + 1);
140 key.extend_from_slice(&id.group().bytes());
141 key.extend_from_slice(name);
142 log::IndexOutput::Remove(key.into())
143 })
144 .collect()
145 } else if data.len() >= 8 {
146 vec![log::IndexOutput::Reference(8..(data.len() as u64))]
147 } else {
148 if data == Self::MAGIC_CLEAR_NON_MASTER {
149 vec![log::IndexOutput::RemovePrefix(Box::new([
150 Group::NON_MASTER.0 as u8,
151 ]))]
152 } else {
153 panic!("bug: invalid segment {:?}", &data);
154 }
155 }
156 })
157 .flush_filter(Some(|_, _| {
158 panic!("programming error: idmap changed by other process")
159 }))
160 }
161
162 pub fn find_name_by_id(&self, id: Id) -> Result<Option<&[u8]>> {
164 let key = id.0.to_be_bytes();
165 let key = self.log.lookup(Self::INDEX_ID_TO_NAME, &key)?.nth(0);
166 match key {
167 Some(Ok(entry)) => {
168 if entry.len() < 8 {
169 return bug("index key should have 8 bytes at least");
170 }
171 Ok(Some(&entry[Self::NAME_OFFSET..]))
172 }
173 None => Ok(None),
174 Some(Err(err)) => Err(err.into()),
175 }
176 }
177
178 pub fn find_vertex_name_by_id(&self, id: Id) -> Result<Option<VertexName>> {
180 self.find_name_by_id(id)
181 .map(|v| v.map(|n| VertexName(self.log.slice_to_bytes(n))))
182 }
183
184 pub fn find_id_by_name(&self, name: &[u8]) -> Result<Option<Id>> {
186 for group in Group::ALL.iter() {
187 let mut group_name = Vec::with_capacity(Group::BYTES + name.len());
188 group_name.extend_from_slice(&group.bytes());
189 group_name.extend_from_slice(name);
190 let key = self
191 .log
192 .lookup(Self::INDEX_GROUP_NAME_TO_ID, group_name)?
193 .nth(0);
194 match key {
195 Some(Ok(mut entry)) => {
196 if entry.len() < 8 {
197 return bug("index key should have 8 bytes at least");
198 }
199 let id = Id(entry.read_u64::<BigEndian>().unwrap());
200 return Ok(Some(id));
201 }
202 None => {}
203 Some(Err(err)) => return Err(err.into()),
204 }
205 }
206 Ok(None)
207 }
208
209 pub fn find_id_by_name_with_max_group(
211 &self,
212 name: &[u8],
213 max_group: Group,
214 ) -> Result<Option<Id>> {
215 Ok(self.find_id_by_name(name)?.and_then(|id| {
216 if id.group() <= max_group {
217 Some(id)
218 } else {
219 None
220 }
221 }))
222 }
223
224 pub fn insert(&mut self, id: Id, name: &[u8]) -> Result<()> {
228 let existing_name = self.find_name_by_id(id)?;
229 if let Some(existing_name) = existing_name {
230 if existing_name == name {
231 return Ok(());
232 } else {
233 return bug(format!(
234 "new entry {} = {:?} conflicts with an existing entry {} = {:?}",
235 id, name, id, existing_name
236 ));
237 }
238 }
239 let existing_id = self.find_id_by_name(name)?;
240 if let Some(existing_id) = existing_id {
241 if existing_id == id {
247 return Ok(());
248 } else {
249 return bug(format!(
250 "new entry {} = {:?} conflicts with an existing entry {} = {:?}",
251 id, name, existing_id, name
252 ));
253 }
254 }
255
256 let mut data = Vec::with_capacity(8 + Group::BYTES + name.len());
257 data.extend_from_slice(&id.0.to_be_bytes());
258 data.extend_from_slice(&id.group().bytes());
259 data.extend_from_slice(&name);
260 self.log.append(data)?;
261 self.map_version.bump();
262 #[cfg(debug_assertions)]
263 {
264 let items = self.find_range(id, id).unwrap();
265 assert_eq!(items[0], (id, name));
266 }
267 Ok(())
268 }
269
270 fn find_range(&self, low: Id, high: Id) -> Result<Vec<(Id, &[u8])>> {
272 let low = low.0.to_be_bytes();
273 let high = high.0.to_be_bytes();
274 let range = &low[..]..=&high[..];
275 let mut items = Vec::new();
276 for entry in self.log.lookup_range(Self::INDEX_ID_TO_NAME, range)? {
277 let (key, values) = entry?;
278 let key: [u8; 8] = match key.as_ref().try_into() {
279 Ok(key) => key,
280 Err(_) => {
281 return bug("find_range got non-u64 keys in INDEX_ID_TO_NAME");
282 }
283 };
284 let id = Id(u64::from_be_bytes(key));
285 for value in values {
286 let value = value?;
287 if value.len() < 8 {
288 return bug(format!(
289 "find_range got entry {:?} shorter than expected",
290 &value
291 ));
292 }
293 let name: &[u8] = &value[9..];
294 items.push((id, name));
295 }
296 }
297 Ok(items)
298 }
299
300 fn remove_range(&mut self, low: Id, high: Id) -> Result<Vec<VertexName>> {
301 let items = self.find_range(low, high)?;
303 let names = items
304 .iter()
305 .map(|(_, bytes)| VertexName::copy_from(bytes))
306 .collect();
307 let data = encode_deletion_entry(&items);
311 self.log.append(data)?;
312 self.map_version = VerLink::new();
315 Ok(names)
316 }
317
318 fn find_names_by_hex_prefix(&self, hex_prefix: &[u8], limit: usize) -> Result<Vec<VertexName>> {
320 let mut result = Vec::with_capacity(limit);
321 for group in Group::ALL.iter().rev() {
322 let mut prefix = Vec::with_capacity(Group::BYTES * 2 + hex_prefix.len());
323 prefix.extend_from_slice(&group.hex_bytes());
324 prefix.extend_from_slice(hex_prefix);
325 for entry in self
326 .log
327 .lookup_prefix_hex(Self::INDEX_GROUP_NAME_TO_ID, prefix)?
328 {
329 let (k, _v) = entry?;
330 let vertex = VertexName(self.log.slice_to_bytes(&k[Group::BYTES..]));
331 if !result.contains(&vertex) {
332 result.push(vertex);
333 }
334 if result.len() >= limit {
335 return Ok(result);
336 }
337 }
338 }
339 Ok(result)
340 }
341}
342
343fn encode_deletion_entry(items: &[(Id, &[u8])]) -> Vec<u8> {
347 let len = IdMap::MAGIC_DELETION_PREFIX.len() + 9 + items.len() * 30;
349 let mut data = Vec::with_capacity(len);
350 data.extend_from_slice(IdMap::MAGIC_DELETION_PREFIX);
351 data.write_vlq(items.len()).unwrap();
352 for (id, name) in items {
353 data.write_vlq(id.0).unwrap();
354 data.write_vlq(name.len()).unwrap();
355 data.extend_from_slice(name);
356 }
357 data
358}
359
360fn decode_deletion_entry(data: &[u8]) -> Result<Vec<(Id, &[u8])>> {
363 assert!(data.starts_with(IdMap::MAGIC_DELETION_PREFIX));
364 let mut data = &data[IdMap::MAGIC_DELETION_PREFIX.len()..];
365 let n = data.read_vlq()?;
366 let mut items = Vec::with_capacity(n);
367 for _ in 0..n {
368 let id: u64 = data.read_vlq()?;
369 let id = Id(id);
370 let name_len: usize = data.read_vlq()?;
371 if name_len > data.len() {
372 return bug("decode_deletion_id_names got incomplete input");
373 }
374 let (name, rest) = data.split_at(name_len);
375 data = rest;
376 items.push((id, name));
377 }
378 Ok(items)
379}
380
381impl fmt::Debug for IdMap {
382 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
383 write!(f, "IdMap {{\n")?;
384 for data in self.log.iter() {
385 if let Ok(mut data) = data {
386 let id = data.read_u64::<BigEndian>().unwrap();
387 let _group = data.read_u8().unwrap();
388 let mut name = Vec::with_capacity(20);
389 data.read_to_end(&mut name).unwrap();
390 let name = if name.len() >= 20 {
391 VertexName::from(name).to_hex()
392 } else {
393 String::from_utf8_lossy(&name).to_string()
394 };
395 let id = Id(id);
396 write!(f, " {}: {},\n", name, id)?;
397 }
398 }
399 write!(f, "}}\n")?;
400 Ok(())
401 }
402}
403
404#[async_trait::async_trait]
405impl IdConvert for IdMap {
406 async fn vertex_id(&self, name: VertexName) -> Result<Id> {
407 self.find_id_by_name(name.as_ref())?
408 .ok_or_else(|| name.not_found_error())
409 }
410 async fn vertex_id_with_max_group(
411 &self,
412 name: &VertexName,
413 max_group: Group,
414 ) -> Result<Option<Id>> {
415 self.find_id_by_name_with_max_group(name.as_ref(), max_group)
416 }
417 async fn vertex_name(&self, id: Id) -> Result<VertexName> {
418 self.find_vertex_name_by_id(id)?
419 .ok_or_else(|| id.not_found_error())
420 }
421 async fn contains_vertex_name(&self, name: &VertexName) -> Result<bool> {
422 Ok(self.find_id_by_name(name.as_ref())?.is_some())
423 }
424 async fn contains_vertex_id_locally(&self, ids: &[Id]) -> Result<Vec<bool>> {
425 let mut list = Vec::with_capacity(ids.len());
426 for &id in ids {
427 list.push(self.find_name_by_id(id)?.is_some());
428 }
429 Ok(list)
430 }
431 async fn contains_vertex_name_locally(&self, names: &[VertexName]) -> Result<Vec<bool>> {
432 let mut list = Vec::with_capacity(names.len());
433 for name in names {
434 let contains = self.find_id_by_name(name.as_ref())?.is_some();
435 tracing::trace!("contains_vertex_name_locally({:?}) = {}", name, contains);
436 list.push(contains);
437 }
438 Ok(list)
439 }
440 fn map_id(&self) -> &str {
441 &self.map_id
442 }
443 fn map_version(&self) -> &VerLink {
444 &self.map_version
445 }
446}
447
448#[async_trait::async_trait]
449impl IdMapWrite for IdMap {
450 async fn insert(&mut self, id: Id, name: &[u8]) -> Result<()> {
451 IdMap::insert(self, id, name)
452 }
453 async fn remove_range(&mut self, low: Id, high: Id) -> Result<Vec<VertexName>> {
454 IdMap::remove_range(self, low, high)
455 }
456}
457
458impl Persist for IdMap {
459 type Lock = File;
460
461 fn lock(&mut self) -> Result<Self::Lock> {
462 if self.log.iter_dirty().next().is_some() {
463 return programming("lock() must be called without dirty in-memory entries");
464 }
465 let lock_file = {
466 let mut path = self.path.clone();
467 path.push("wlock");
468 File::open(&path).or_else(|_| {
469 fs::OpenOptions::new()
470 .write(true)
471 .create_new(true)
472 .open(&path)
473 })?
474 };
475 lock_file.lock_exclusive()?;
476 Ok(lock_file)
477 }
478
479 fn reload(&mut self, _lock: &Self::Lock) -> Result<()> {
480 self.log.clear_dirty()?;
481 self.log.sync()?;
482 Ok(())
483 }
484
485 fn persist(&mut self, _lock: &Self::Lock) -> Result<()> {
486 self.log.sync()?;
487 Ok(())
488 }
489}
490
491#[async_trait::async_trait]
492impl PrefixLookup for IdMap {
493 async fn vertexes_by_hex_prefix(
494 &self,
495 hex_prefix: &[u8],
496 limit: usize,
497 ) -> Result<Vec<VertexName>> {
498 self.find_names_by_hex_prefix(hex_prefix, limit)
499 }
500}
501
#[cfg(test)]
mod tests {
    use super::*;

    /// A non-empty list of deletions survives an encode/decode round trip.
    #[test]
    fn test_encode_decode_deletion_entry() {
        let items: &[(Id, &[u8])] = &[
            (Id(0), b"a"),
            (Id(1), b"bb"),
            (Id(10), b"ccc"),
            (Id(20), b"dd"),
        ];
        let data = encode_deletion_entry(items);
        let decoded = decode_deletion_entry(&data).unwrap();
        assert_eq!(&decoded, items);
    }

    /// An empty deletion list is still a well-formed, prefixed entry.
    #[test]
    fn test_encode_decode_empty_deletion_entry() {
        let items: &[(Id, &[u8])] = &[];
        let data = encode_deletion_entry(items);
        assert!(data.starts_with(IdMap::MAGIC_DELETION_PREFIX));
        let decoded = decode_deletion_entry(&data).unwrap();
        assert!(decoded.is_empty());
    }

    /// Every strict truncation of an encoded entry is rejected with an error
    /// rather than decoded partially or panicking.
    #[test]
    fn test_decode_truncated_deletion_entry() {
        let items: &[(Id, &[u8])] = &[(Id(1), b"bb"), (Id(20), b"dd")];
        let data = encode_deletion_entry(items);
        for len in IdMap::MAGIC_DELETION_PREFIX.len()..data.len() {
            assert!(decode_deletion_entry(&data[..len]).is_err());
        }
    }
}