use crate::id::{Group, Id, VertexName};
use anyhow::{bail, ensure, format_err, Result};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use fs2::FileExt;
use indexedlog::log;
use std::borrow::Cow;
use std::fmt;
use std::fs::{self, File};
use std::io::{Cursor, Read, Write};
use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf};
use std::sync::atomic::{self, AtomicU64};
/// Bi-directional mapping between integer ids and byte-string names,
/// stored in an `indexedlog` log with one index per lookup direction.
pub struct IdMap {
/// Backing log. Each regular entry is an 8-byte big-endian id followed by
/// the name bytes.
log: log::Log,
/// Filesystem path reported by the log; the `wlock` lock file is created
/// underneath it.
path: PathBuf,
/// Per-group cache of the next free id. A value of 0 means "not computed
/// yet" and triggers a lookup in the log.
cached_next_free_ids: [AtomicU64; Group::COUNT],
/// Set by `insert` when a name is re-assigned from a higher group to a
/// lower one; cleared by `remove_non_master`. `SyncableIdMap::sync`
/// refuses to run while it is set.
pub(crate) need_rebuild_non_master: bool,
}
/// An [`IdMap`] borrowed mutably together with its exclusively locked lock
/// file. Created by `IdMap::prepare_filesystem_sync`.
pub struct SyncableIdMap<'a> {
/// The map being modified.
map: &'a mut IdMap,
/// Handle of the `wlock` file that was locked exclusively on creation.
/// It is never read; it is kept as a field so it lives as long as `self`.
lock_file: File,
}
impl IdMap {
const INDEX_ID_TO_NAME: usize = 0;
const INDEX_NAME_TO_ID: usize = 1;
const MAGIC_CLEAR_NON_MASTER: &'static [u8] = b"CLRNM";
/// Open the id map stored at `path`, creating the backing log when it does
/// not exist yet (see [`IdMap::log_open_options`]).
///
/// # Errors
///
/// Fails when the log cannot be opened or when wrapping it in an `IdMap`
/// fails.
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
    let dir: &Path = path.as_ref();
    let opened = Self::log_open_options().open(dir)?;
    Self::open_from_log(opened)
}
/// Create an independent `IdMap` over a clone of the backing log.
///
/// The clone starts with an empty next-free-id cache and inherits the
/// current `need_rebuild_non_master` flag.
///
/// # Errors
///
/// Fails when the log itself cannot be cloned.
pub(crate) fn try_clone(&self) -> Result<Self> {
    let log = self.log.try_clone()?;
    let path = self.path.clone();
    Ok(Self {
        log,
        path,
        // The cache is cheap to rebuild; start from "unknown".
        cached_next_free_ids: Default::default(),
        need_rebuild_non_master: self.need_rebuild_non_master,
    })
}
pub(crate) fn open_from_log(log: log::Log) -> Result<Self> {
let path = log.path().as_opt_path().unwrap().to_path_buf();
Ok(Self {
log,
path,
cached_next_free_ids: Default::default(),
need_rebuild_non_master: false,
})
}
/// Build the `OpenOptions` used for the backing log.
///
/// Two indexes are registered, in the order matching `INDEX_ID_TO_NAME`
/// and `INDEX_NAME_TO_ID`:
///
/// * `"id"`: keyed by the first 8 bytes of an entry. The special
///   `MAGIC_CLEAR_NON_MASTER` entry instead emits a `RemovePrefix` for the
///   non-master group byte. Any other entry shorter than 8 bytes panics.
/// * `"name"`: keyed by everything after the first 8 bytes; entries
///   shorter than 8 bytes contribute no key.
///
/// The flush filter panics unconditionally with a message stating that the
/// id map was changed by another process.
pub(crate) fn log_open_options() -> log::OpenOptions {
    log::OpenOptions::new()
        .create(true)
        .index("id", |entry| {
            // The marker must be distinguishable from a regular entry by
            // length, and the group must occupy exactly the leading byte.
            assert!(Self::MAGIC_CLEAR_NON_MASTER.len() < 8);
            assert!(Group::BITS == 8);
            if entry.len() >= 8 {
                return vec![log::IndexOutput::Reference(0..8)];
            }
            if entry != Self::MAGIC_CLEAR_NON_MASTER {
                panic!("bug: invalid segment {:?}", &entry);
            }
            let prefix: Box<[u8]> = Box::new([Group::NON_MASTER.0 as u8]);
            vec![log::IndexOutput::RemovePrefix(prefix)]
        })
        .index("name", |entry| match entry.len() {
            len if len >= 8 => vec![log::IndexOutput::Reference(8..len as u64)],
            _ => Vec::new(),
        })
        .flush_filter(Some(|_, _| {
            panic!("programming error: idmap changed by other process")
        }))
}
/// Take the exclusive `wlock` file lock, reload the map, and return a
/// [`SyncableIdMap`] through which changes can be written and synced.
///
/// # Errors
///
/// Fails when there are dirty in-memory entries, when the lock file cannot
/// be opened/created or locked, or when reloading fails.
pub fn prepare_filesystem_sync(&mut self) -> Result<SyncableIdMap> {
    ensure!(
        self.log.iter_dirty().next().is_none(),
        "programming error: prepare_filesystem_sync must be called without dirty in-memory entries",
    );
    let lock_file = {
        let mut path = self.path.clone();
        path.push("wlock");
        // Prefer a plain read-only open of an existing lock file. If that
        // fails, open it for writing and create it when missing.
        // `create(true)` (rather than `create_new(true)`) keeps this from
        // failing with "already exists" when another process creates the
        // file between the two attempts.
        File::open(&path).or_else(|_| {
            fs::OpenOptions::new()
                .write(true)
                .create(true)
                .open(&path)
        })?
    };
    lock_file.lock_exclusive()?;
    // Reload only after the lock is held.
    self.reload()?;
    Ok(SyncableIdMap {
        map: self,
        lock_file,
    })
}
/// Clear the log's dirty entries, sync the log, and invalidate the cached
/// next-free ids so they are recomputed on next use.
///
/// # Errors
///
/// Propagates failures from the log's `clear_dirty` or `sync`.
pub fn reload(&mut self) -> Result<()> {
    let log = &mut self.log;
    log.clear_dirty()?;
    log.sync()?;
    // 0 in every slot means "unknown"; see `next_free_id`.
    self.cached_next_free_ids = Default::default();
    Ok(())
}
/// Look up the name stored for `id`.
///
/// Returns `Ok(None)` when the id index has no entry for `id`. The returned
/// slice borrows from the log.
///
/// # Errors
///
/// Fails when the index lookup fails or the matching entry is shorter than
/// the 8-byte id header.
pub fn find_name_by_id(&self, id: Id) -> Result<Option<&[u8]>> {
    // Index keys are the big-endian encoding of the id.
    let key = id.0.to_be_bytes();
    let entry = match self.log.lookup(Self::INDEX_ID_TO_NAME, key)?.next() {
        None => return Ok(None),
        Some(found) => found?,
    };
    ensure!(entry.len() >= 8, "index key should have 8 bytes at least");
    Ok(Some(&entry[8..]))
}
/// Like [`IdMap::find_name_by_id`], but returns the name as an owned
/// [`VertexName`] built via the log's `slice_to_bytes`.
pub fn find_vertex_name_by_id(&self, id: Id) -> Result<Option<VertexName>> {
    let found = self.find_name_by_id(id)?;
    Ok(found.map(|bytes| VertexName(self.log.slice_to_bytes(bytes))))
}
/// Look up the id assigned to `name`.
///
/// Returns `Ok(None)` when the name index has no entry, or when the entry
/// found belongs to a non-master group and its id is not below that
/// group's next free id (such an entry is treated as not assigned).
///
/// # Errors
///
/// Fails when the index lookup fails, when the entry is shorter than 8
/// bytes, or when the next free id cannot be determined.
pub fn find_id_by_name(&self, name: &[u8]) -> Result<Option<Id>> {
    let mut entry = match self.log.lookup(Self::INDEX_NAME_TO_ID, name)?.next() {
        None => return Ok(None),
        Some(found) => found?,
    };
    ensure!(entry.len() >= 8, "index key should have 8 bytes at least");
    let id = Id(entry.read_u64::<BigEndian>().unwrap());
    let group = id.group();
    // Double-check against the id index for non-master groups: an id at or
    // beyond the next free id is not considered assigned.
    let stale = group != Group::MASTER && self.next_free_id(group)? <= id;
    Ok(if stale { None } else { Some(id) })
}
/// Like [`IdMap::find_id_by_name`], but ids whose group is greater than
/// `max_group` are reported as `None`.
pub fn find_id_by_name_with_max_group(
    &self,
    name: &[u8],
    max_group: Group,
) -> Result<Option<Id>> {
    let found = self.find_id_by_name(name)?;
    Ok(found.filter(|id| id.group() <= max_group))
}
/// Record that `id` maps to `name`.
///
/// * Inserting a pair that already exists is a successful no-op.
/// * An `id` already bound to a different name is an error.
/// * A `name` already bound to a different id is an error, unless the
///   existing id is in a strictly higher group than `id`'s group. In that
///   case the new entry is appended and `need_rebuild_non_master` is set.
///
/// On success the cached next free id of the group is advanced past `id`.
///
/// # Errors
///
/// Fails on the conflicts described above or when a lookup/append on the
/// log fails.
pub fn insert(&mut self, id: Id, name: &[u8]) -> Result<()> {
    let group = id.group();

    // Only ids below the next free id can already be taken.
    if id < self.next_free_id(group)? {
        match self.find_name_by_id(id)? {
            Some(current) if current == name => return Ok(()),
            Some(current) => {
                bail!(
                    "logic error: new entry {} = {:?} conflicts with an existing entry {} = {:?}",
                    id,
                    name,
                    id,
                    current
                );
            }
            None => {}
        }
    }

    match self.find_id_by_name(name)? {
        Some(current) if current == id => return Ok(()),
        Some(current) if current.group() <= group => {
            bail!(
                "logic error: new entry {} = {:?} conflicts with an existing entry {} = {:?}",
                id,
                name,
                current,
                name
            );
        }
        // The name moves from a higher group to a lower one: allowed, but
        // syncing is blocked until the flag is cleared again.
        Some(_) => self.need_rebuild_non_master = true,
        None => {}
    }

    // Entry layout: 8-byte big-endian id followed by the raw name.
    let mut record = Vec::with_capacity(8 + name.len());
    record.extend_from_slice(&id.0.to_be_bytes());
    record.extend_from_slice(name);
    self.log.append(record)?;

    let cached = self.cached_next_free_ids[group.0].get_mut();
    if *cached <= id.0 {
        *cached = id.0 + 1;
    }
    Ok(())
}
/// Return the next unused id in `group`.
///
/// The value is served from a per-group cache. A cached 0 means "unknown"
/// and causes the value to be read from the log and stored back.
///
/// # Errors
///
/// Fails when the value has to be read from the log and that read fails.
pub fn next_free_id(&self, group: Group) -> Result<Id> {
    let slot = &self.cached_next_free_ids[group.0];
    match slot.load(atomic::Ordering::SeqCst) {
        0 => {
            let fresh = Self::get_next_free_id(&self.log, group)?;
            slot.store(fresh.0, atomic::Ordering::SeqCst);
            Ok(fresh)
        }
        cached => Ok(Id(cached)),
    }
}
/// Return at most `limit` names from the name index whose hexadecimal form
/// starts with `hex_prefix`.
///
/// # Errors
///
/// Fails when the prefix lookup fails or any yielded entry is an error.
pub fn find_names_by_hex_prefix(
    &self,
    hex_prefix: &[u8],
    limit: usize,
) -> Result<Vec<Cow<[u8]>>> {
    let matches = self
        .log
        .lookup_prefix_hex(Self::INDEX_NAME_TO_ID, hex_prefix)?;
    let mut names = Vec::new();
    for item in matches.take(limit) {
        // Only the index key (the name) is of interest here.
        let (name, _) = item?;
        names.push(name);
    }
    Ok(names)
}
/// Compute the next free id of `group` directly from the id index of `log`.
///
/// The id index is scanned backwards over the group's `[min_id, max_id]`
/// range. The result is one past the first key yielded, or the group's
/// minimum id when the range is empty.
///
/// # Errors
///
/// Fails when the range lookup fails, the first yielded item is an error,
/// or the key cannot be decoded as a big-endian `u64`.
fn get_next_free_id(log: &log::Log, group: Group) -> Result<Id> {
    let min_id = group.min_id();
    let max_id = group.max_id();
    let low = min_id.to_bytearray();
    let high = max_id.to_bytearray();
    let bounds = &low[..]..=&high[..];
    let mut backwards = log.lookup_range(Self::INDEX_ID_TO_NAME, bounds)?.rev();
    let id = match backwards.next() {
        Some(Ok((key, _))) => Id(Cursor::new(key).read_u64::<BigEndian>()? + 1),
        Some(Err(_)) => bail!("cannot read next_free_id for group {}", group),
        None => min_id,
    };
    debug_assert!(id >= min_id);
    debug_assert!(id <= max_id);
    Ok(id)
}
}
impl IdMap {
/// Make sure `head` and all of its ancestors have ids in `group` (or a
/// lower group), assigning fresh ids where needed, and return `head`'s id.
///
/// `parents_by_name` yields the parents of a vertex. The traversal is an
/// explicit-stack depth-first walk: a vertex is scheduled for assignment
/// before its unassigned parents are pushed, so the parents are popped and
/// assigned before the vertex itself.
///
/// # Errors
///
/// Propagates failures from `parents_by_name`, lookups and `insert`.
///
/// # Panics
///
/// Panics if `head` still has no id after the traversal.
pub fn assign_head<F>(
    &mut self,
    head: VertexName,
    parents_by_name: F,
    group: Group,
) -> Result<Id>
where
    F: Fn(VertexName) -> Result<Vec<VertexName>>,
{
    enum Step {
        Visit(VertexName),
        Assign(VertexName),
    }

    let mut pending: Vec<Step> = vec![Step::Visit(head.clone())];
    while let Some(step) = pending.pop() {
        match step {
            Step::Visit(name) => {
                // Already assigned within the allowed groups: nothing to do.
                if self
                    .find_id_by_name_with_max_group(name.as_ref(), group)?
                    .is_some()
                {
                    continue;
                }
                pending.push(Step::Assign(name.clone()));
                let parents = parents_by_name(name)?;
                // Pushed in reverse so the first parent is popped first.
                for parent in parents.into_iter().rev() {
                    let assigned =
                        match self.find_id_by_name_with_max_group(parent.as_ref(), group) {
                            Ok(Some(_)) => true,
                            _ => false,
                        };
                    if !assigned {
                        pending.push(Step::Visit(parent));
                    }
                }
            }
            Step::Assign(name) => {
                // Re-check: the vertex may have been assigned meanwhile via
                // another path.
                if self
                    .find_id_by_name_with_max_group(name.as_ref(), group)?
                    .is_none()
                {
                    let id = self.next_free_id(group)?;
                    self.insert(id, name.as_ref())?;
                }
            }
        }
    }

    let assigned = self.find_id_by_name(head.as_ref())?;
    Ok(assigned.expect("head should be assigned now"))
}
pub fn build_get_parents_by_id<'a>(
&'a self,
get_parents_by_name: &'a dyn Fn(VertexName) -> Result<Vec<VertexName>>,
) -> impl Fn(Id) -> Result<Vec<Id>> + 'a {
let func = move |id: Id| -> Result<Vec<Id>> {
let name = match self.find_vertex_name_by_id(id)? {
Some(name) => name,
None => {
let name = match self.find_name_by_id(id) {
Ok(Some(name)) => format!("{} ({:?})", id, name),
_ => format!("{}", id),
};
bail!("logic error: {} is referred but not assigned", name)
}
};
let parent_names: Vec<VertexName> = get_parents_by_name(name.clone())?;
let mut result = Vec::with_capacity(parent_names.len());
for parent_name in parent_names {
if let Some(parent_id) = self.find_id_by_name(parent_name.as_ref())? {
ensure!(
parent_id < id,
"parent {} {:?} should <= {} {:?}",
parent_id,
&parent_name,
id,
&name
);
result.push(parent_id);
} else {
bail!("logic error: ancestor ids must be available");
}
}
Ok(result)
};
func
}
}
impl IdMap {
pub fn remove_non_master(&mut self) -> Result<()> {
self.log.append(IdMap::MAGIC_CLEAR_NON_MASTER)?;
self.need_rebuild_non_master = false;
self.cached_next_free_ids = Default::default();
ensure!(
self.next_free_id(Group::NON_MASTER)? == Group::NON_MASTER.min_id(),
"bug: remove_non_master did not take effect"
);
Ok(())
}
}
impl<'a> SyncableIdMap<'a> {
pub fn sync(&mut self) -> Result<()> {
ensure!(
!self.need_rebuild_non_master,
"bug: cannot sync with re-assigned ids unresolved"
);
self.map.log.sync()?;
Ok(())
}
}
/// Dumps the log entries as `name: id` lines, in log iteration order.
///
/// Names of 20 bytes or more are printed as hex; shorter names are printed
/// as lossy UTF-8. Entries that cannot be read and entries shorter than the
/// 8-byte id header (such as the `MAGIC_CLEAR_NON_MASTER` marker appended
/// by `remove_non_master`) are skipped instead of causing a panic.
impl fmt::Debug for IdMap {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "IdMap {{\n")?;
        for data in self.log.iter() {
            if let Ok(mut data) = data {
                // Not a regular `id + name` entry: it has no id to decode.
                if data.len() < 8 {
                    continue;
                }
                let id = data
                    .read_u64::<BigEndian>()
                    .expect("entry has at least 8 bytes");
                // `read_u64` advanced the slice; the rest is the name.
                let name: Vec<u8> = data.to_vec();
                let name = if name.len() >= 20 {
                    VertexName::from(name).to_hex()
                } else {
                    String::from_utf8_lossy(&name).to_string()
                };
                let id = Id(id);
                write!(f, " {}: {},\n", name, id)?;
            }
        }
        write!(f, "}}\n")?;
        Ok(())
    }
}
/// Gives read access to every `IdMap` method through a `SyncableIdMap`.
impl<'a> Deref for SyncableIdMap<'a> {
    type Target = IdMap;

    fn deref(&self) -> &IdMap {
        &*self.map
    }
}
impl<'a> DerefMut for SyncableIdMap<'a> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.map
}
}
/// Minimal lookup interface between vertex names and ids.
///
/// The `IdMap` implementation in this file returns an error when nothing is
/// found.
pub trait IdMapLike {
/// Return the id assigned to `name`.
fn vertex_id(&self, name: VertexName) -> Result<Id>;
/// Return the name assigned to `id`.
fn vertex_name(&self, id: Id) -> Result<VertexName>;
}
/// Lookups backed by the log indexes; a missing entry becomes an error.
impl IdMapLike for IdMap {
    /// Return the id of `name`, or a "not found" error.
    fn vertex_id(&self, name: VertexName) -> Result<Id> {
        match self.find_id_by_name(name.as_ref())? {
            Some(id) => Ok(id),
            None => Err(format_err!("{:?} not found", name)),
        }
    }

    /// Return the name of `id`, or a "not found" error.
    fn vertex_name(&self, id: Id) -> Result<VertexName> {
        match self.find_vertex_name_by_id(id)? {
            Some(name) => Ok(name),
            None => Err(format_err!("{} not found", id)),
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
// End-to-end check of insert / lookup / sync on a map in a temp directory.
#[test]
fn test_basic_operations() {
let dir = tempdir().unwrap();
let mut map = IdMap::open(dir.path()).unwrap();
let mut map = map.prepare_filesystem_sync().unwrap();
// A fresh map has id 0 free in the master group.
assert_eq!(map.next_free_id(Group::MASTER).unwrap().0, 0);
// The next free id is always one past the largest inserted id, even
// when ids are skipped.
map.insert(Id(1), b"abc").unwrap();
assert_eq!(map.next_free_id(Group::MASTER).unwrap().0, 2);
map.insert(Id(2), b"def").unwrap();
assert_eq!(map.next_free_id(Group::MASTER).unwrap().0, 3);
map.insert(Id(10), b"ghi").unwrap();
assert_eq!(map.next_free_id(Group::MASTER).unwrap().0, 11);
// "ghi" already has an id in the same group: rejected.
map.insert(Id(11), b"ghi").unwrap_err();
// Id 10 already has a different name: rejected.
map.insert(Id(10), b"ghi2").unwrap_err();
// Same rules within the non-master group.
let id = map.next_free_id(Group::NON_MASTER).unwrap();
map.insert(id, b"jkl").unwrap();
// Re-inserting the identical pair succeeds without effect.
map.insert(id, b"jkl").unwrap();
map.insert(id, b"jkl2").unwrap_err();
map.insert(id + 1, b"jkl2").unwrap();
map.insert(id + 2, b"jkl2").unwrap_err();
// Re-assigning "jkl2" from the non-master group to master id 15 is
// accepted.
map.insert(Id(15), b"jkl2").unwrap();
// "abc" has a master id; giving it a non-master id is rejected.
map.insert(id + 3, b"abc").unwrap_err();
assert_eq!(map.next_free_id(Group::NON_MASTER).unwrap(), id + 2);
// Hex prefix lookup: 0x6a is the byte 'j'.
assert_eq!(0x6a, b'j');
assert_eq!(
map.find_names_by_hex_prefix(b"6a", 3).unwrap(),
[&b"jkl"[..], b"jkl2"]
);
assert_eq!(
map.find_names_by_hex_prefix(b"6a", 1).unwrap(),
[&b"jkl"[..]]
);
assert!(map.find_names_by_hex_prefix(b"6b", 1).unwrap().is_empty());
// The lookups run twice: once before the first `sync` and once after it.
for _ in 0..=1 {
assert_eq!(map.find_name_by_id(Id(1)).unwrap().unwrap(), b"abc");
assert_eq!(map.find_name_by_id(Id(2)).unwrap().unwrap(), b"def");
assert!(map.find_name_by_id(Id(3)).unwrap().is_none());
assert_eq!(map.find_name_by_id(Id(10)).unwrap().unwrap(), b"ghi");
assert_eq!(map.find_id_by_name(b"abc").unwrap().unwrap().0, 1);
assert_eq!(map.find_id_by_name(b"def").unwrap().unwrap().0, 2);
assert_eq!(map.find_id_by_name(b"ghi").unwrap().unwrap().0, 10);
assert_eq!(map.find_id_by_name(b"jkl").unwrap().unwrap(), id);
assert_eq!(map.find_id_by_name(b"jkl2").unwrap().unwrap().0, 15);
assert!(map.find_id_by_name(b"jkl3").unwrap().is_none());
// Moving "jkl2" between groups set the rebuild flag, and `sync` refuses
// to run while it is set; clear it by hand for this test.
map.need_rebuild_non_master = false;
map.sync().unwrap();
}
// The Debug dump lists every appended entry, including both ids recorded
// for "jkl2".
assert_eq!(
format!("{:?}", map.deref()),
r#"IdMap {
abc: 1,
def: 2,
ghi: 10,
jkl: N0,
jkl2: N1,
jkl2: 15,
}
"#
);
}
}