use super::*;
use crate::small_string;
use ::sanakirja::*;
use byteorder::{ByteOrder, LittleEndian};
use rand::rngs::ThreadRng;
use rand::thread_rng;
use std::cell::RefCell;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
pub struct Pristine {
pub env: Arc<::sanakirja::Env<::sanakirja::Exclusive>>,
}
impl Pristine {
pub fn new<P: AsRef<Path>>(name: P) -> Result<Self, anyhow::Error> {
Self::new_with_size(name, 1 << 20)
}
pub unsafe fn new_nolock<P: AsRef<Path>>(name: P) -> Result<Self, anyhow::Error> {
Self::new_with_size_nolock(name, 1 << 20)
}
pub fn new_with_size<P: AsRef<Path>>(name: P, size: u64) -> Result<Self, anyhow::Error> {
let env = sanakirja::Env::try_new(name, size);
match env {
Ok(env) => Ok(Pristine { env: Arc::new(env) }),
Err(sanakirja::Error::IO(e)) => {
if let std::io::ErrorKind::WouldBlock = e.kind() {
Err(crate::Error::PristineLocked.into())
} else {
Err(e.into())
}
}
Err(e) => Err(e.into()),
}
}
pub unsafe fn new_with_size_nolock<P: AsRef<Path>>(
name: P,
size: u64,
) -> Result<Self, anyhow::Error> {
Ok(Pristine {
env: Arc::new(sanakirja::Env::new_nolock(name, size)?),
})
}
pub fn new_anon() -> Result<Self, anyhow::Error> {
Self::new_anon_with_size(1 << 20)
}
pub fn new_anon_with_size(size: u64) -> Result<Self, anyhow::Error> {
Ok(Pristine {
env: Arc::new(sanakirja::Env::new_anon(size)?),
})
}
}
#[derive(Debug, PartialEq, Clone, Copy)]
#[repr(usize)]
enum Root {
Tree,
RevTree,
Inodes,
RevInodes,
Internal,
External,
RevDep,
Channels,
TouchedFiles,
Dep,
RevTouchedFiles,
Partials,
Remotes,
}
impl Pristine {
pub fn txn_begin(&self) -> Result<Txn, anyhow::Error> {
let txn = ::sanakirja::Env::txn_begin(self.env.clone())?;
fn begin(
txn: ::sanakirja::Txn<
::sanakirja::Exclusive,
Arc<::sanakirja::Env<::sanakirja::Exclusive>>,
>,
) -> Option<Txn> {
Some(Txn {
channels: txn.root(Root::Channels as usize)?,
external: txn.root(Root::External as usize)?,
internal: txn.root(Root::Internal as usize)?,
inodes: txn.root(Root::Inodes as usize)?,
revinodes: txn.root(Root::RevInodes as usize)?,
tree: txn.root(Root::Tree as usize)?,
revtree: txn.root(Root::RevTree as usize)?,
revdep: txn.root(Root::RevDep as usize)?,
touched_files: txn.root(Root::TouchedFiles as usize)?,
rev_touched_files: txn.root(Root::RevTouchedFiles as usize)?,
partials: txn.root(Root::Partials as usize)?,
dep: txn.root(Root::Dep as usize)?,
remotes: txn.root(Root::Remotes as usize)?,
rng: thread_rng(),
open_channels: RefCell::new(HashMap::new()),
open_remotes: RefCell::new(HashMap::new()),
txn,
})
}
if let Some(txn) = begin(txn) {
Ok(txn)
} else {
Err(crate::Error::PristineCorrupt.into())
}
}
pub fn mut_txn_begin(&self) -> MutTxn<()> {
let mut txn = ::sanakirja::Env::mut_txn_begin(self.env.clone()).unwrap();
MutTxn {
channels: txn
.root(Root::Channels as usize)
.unwrap_or_else(|| txn.create_db().unwrap()),
external: txn
.root(Root::External as usize)
.unwrap_or_else(|| txn.create_db().unwrap()),
internal: txn
.root(Root::Internal as usize)
.unwrap_or_else(|| txn.create_db().unwrap()),
inodes: txn
.root(Root::Inodes as usize)
.unwrap_or_else(|| txn.create_db().unwrap()),
revinodes: txn
.root(Root::RevInodes as usize)
.unwrap_or_else(|| txn.create_db().unwrap()),
tree: txn
.root(Root::Tree as usize)
.unwrap_or_else(|| txn.create_db().unwrap()),
revtree: txn
.root(Root::RevTree as usize)
.unwrap_or_else(|| txn.create_db().unwrap()),
revdep: txn
.root(Root::RevDep as usize)
.unwrap_or_else(|| txn.create_db().unwrap()),
dep: txn
.root(Root::Dep as usize)
.unwrap_or_else(|| txn.create_db().unwrap()),
touched_files: txn
.root(Root::TouchedFiles as usize)
.unwrap_or_else(|| txn.create_db().unwrap()),
rev_touched_files: txn
.root(Root::RevTouchedFiles as usize)
.unwrap_or_else(|| txn.create_db().unwrap()),
partials: txn
.root(Root::Partials as usize)
.unwrap_or_else(|| txn.create_db().unwrap()),
remotes: txn
.root(Root::Remotes as usize)
.unwrap_or_else(|| txn.create_db().unwrap()),
rng: thread_rng(),
open_channels: RefCell::new(HashMap::new()),
open_remotes: RefCell::new(HashMap::new()),
txn,
}
}
}
type Graph = ::sanakirja::Db<Vertex<ChangeId>, Edge>;
type ChangeSet = ::sanakirja::Db<ChangeId, ApplyTimestamp>;
type RevChangeSet = ::sanakirja::Db<ApplyTimestamp, (ChangeId, Merkle)>;
type ChannelStates = ::sanakirja::Db<Merkle, u64>;
pub type Txn = GenericTxn<
::sanakirja::Txn<::sanakirja::Exclusive, Arc<::sanakirja::Env<::sanakirja::Exclusive>>>,
>;
pub type MutTxn<T> =
GenericTxn<::sanakirja::MutTxn<Arc<::sanakirja::Env<::sanakirja::Exclusive>>, T>>;
pub struct GenericTxn<T: ::sanakirja::Transaction> {
txn: T,
internal: Db<Hash, ChangeId>,
external: Db<ChangeId, Hash>,
inodes: Db<Inode, Position<ChangeId>>,
revinodes: Db<Position<ChangeId>, Inode>,
tree: Db<UnsafePathId, Inode>,
revtree: Db<Inode, UnsafePathId>,
revdep: Db<ChangeId, ChangeId>,
dep: Db<ChangeId, ChangeId>,
touched_files: Db<Position<ChangeId>, ChangeId>,
rev_touched_files: Db<ChangeId, Position<ChangeId>>,
partials: Db<UnsafeSmallStr, Position<ChangeId>>,
channels: Db<UnsafeSmallStr, (Graph, ChangeSet, RevChangeSet, ChannelStates, u64, u64)>,
remotes: Db<UnsafeSmallStr, (Db<u64, (Hash, Merkle)>, Db<Hash, u64>, Db<Merkle, u64>)>,
rng: ThreadRng,
open_channels: RefCell<HashMap<SmallString, ChannelRef<Self>>>,
open_remotes: RefCell<HashMap<SmallString, RemoteRef<Self>>>,
}
unsafe impl<T: ::sanakirja::Transaction> Send for GenericTxn<T> {}
#[derive(Debug)]
pub struct DatabaseReport {
pub refs: usize,
pub stats: ::sanakirja::Statistics,
}
impl Txn {
pub fn check_database(&self) -> DatabaseReport {
let mut refs = HashMap::new();
self.txn.references(&mut refs, self.internal);
info!("refs = {:?}", refs);
self.txn.references(&mut refs, self.external);
info!("refs = {:?}", refs);
self.txn.references(&mut refs, self.inodes);
info!("refs = {:?}", refs);
self.txn.references(&mut refs, self.revinodes);
info!("refs = {:?}", refs);
self.txn.references(&mut refs, self.tree);
info!("refs = {:?}", refs);
self.txn.references(&mut refs, self.revtree);
info!("refs = {:?}", refs);
self.txn.references(&mut refs, self.revdep);
info!("refs = {:?}", refs);
self.txn.references(&mut refs, self.dep);
info!("refs = {:?}", refs);
self.txn.references(&mut refs, self.touched_files);
info!("refs = {:?}", refs);
self.txn.references(&mut refs, self.rev_touched_files);
info!("refs = {:?}", refs);
self.txn.references(&mut refs, self.partials);
info!("refs = {:?}", refs);
self.txn.references(&mut refs, self.channels);
info!("refs = {:?}", refs);
for (a, (g, c, r, s, _, _)) in self.txn.iter(&self.channels, None) {
info!("channel = {:?}", a);
self.txn.references(&mut refs, g);
info!("refs = {:?}", refs);
self.txn.references(&mut refs, c);
info!("refs = {:?}", refs);
self.txn.references(&mut refs, r);
info!("refs = {:?}", refs);
self.txn.references(&mut refs, s);
info!("refs = {:?}", refs);
}
self.txn.references(&mut refs, self.remotes);
info!("refs = {:?}", refs);
for (a, (u, v, w)) in self.txn.iter(&self.remotes, None) {
info!("remote = {:?}", a);
self.txn.references(&mut refs, u);
info!("refs = {:?}", refs);
self.txn.references(&mut refs, v);
info!("refs = {:?}", refs);
self.txn.references(&mut refs, w);
info!("refs = {:?}", refs);
}
let stats = self.txn.statistics();
let report = DatabaseReport {
refs: refs.len(),
stats: stats.clone(),
};
let mut channel_roots: Vec<UnsafeDb> = Vec::new();
for (a, (g, c, r, s, _, _)) in self.txn.iter(&self.channels, None) {
info!("channel: {:?}", a);
unsafe {
channel_roots.push(std::mem::transmute(g));
channel_roots.push(std::mem::transmute(c));
channel_roots.push(std::mem::transmute(r));
channel_roots.push(std::mem::transmute(s));
}
}
::sanakirja::debug_(&self.txn, &channel_roots[..], "debug_sanakirja", true);
self.txn.check_references(&mut refs);
info!("stats = {:?}", stats);
let occupied_pages =
stats.total_pages - stats.free_pages.len() - stats.bookkeeping_pages.len();
for i in 1..(stats.total_pages as u64) {
let p = i * 4096;
if !refs.contains_key(&p)
&& !stats.free_pages.contains(&p)
&& !stats.bookkeeping_pages.contains(&p)
{
panic!("does not contain {:?} ({:?})", i, p);
}
}
for (r, _) in refs.iter() {
if stats.free_pages.contains(r) {
panic!("referenced page is free: {:?}", r);
}
if stats.bookkeeping_pages.contains(r) {
panic!("referenced page is a bookkeeping page: {:?}", r);
}
}
for p in stats.free_pages.iter() {
if stats.bookkeeping_pages.contains(p) {
panic!("bookkeeping inter free: {:?}", p);
}
}
assert_eq!(1 + refs.len(), occupied_pages);
report
}
}
impl<T: ::sanakirja::Transaction> TxnT for GenericTxn<T> {
type Graph = Db<Vertex<ChangeId>, Edge>;
sanakirja_cursor_ref!(graph, Vertex<ChangeId>, Edge);
sanakirja_get!(graph, Vertex<ChangeId>, Edge);
fn get_external(&self, p: ChangeId) -> Option<Hash> {
if p.is_root() {
Some(Hash::None)
} else {
self.txn.get(&self.external, p, None)
}
}
fn get_internal(&self, p: Hash) -> Option<ChangeId> {
if let Hash::None = p {
Some(ChangeId::ROOT)
} else {
self.txn.get(&self.internal, p, None)
}
}
fn hash_from_prefix(&self, s: &str) -> Result<(Hash, ChangeId), anyhow::Error> {
let h = if let Some(h) = Hash::from_prefix(s) {
h
} else {
return Err((crate::Error::ParseError { s: s.to_string() }).into());
};
let mut result = None;
debug!("h = {:?}", h);
for (e, i) in self.txn.iter(&self.internal, Some((h, None))) {
debug!("{:?} {:?}", e, i);
if e < h {
continue;
} else {
let b32 = e.to_base32();
debug!("{:?}", b32);
let (b32, _) = b32.split_at(s.len().min(b32.len()));
if b32 != s {
break;
} else if result.is_none() {
result = Some((e, i))
} else {
return Err((crate::Error::AmbiguousHashPrefix {
prefix: s.to_string(),
})
.into());
}
}
}
if let Some(result) = result {
Ok(result)
} else {
Err((crate::Error::ChangeNotFound {
hash: s.to_string(),
})
.into())
}
}
fn hash_from_prefix_remote<'txn>(
&'txn self,
remote: &RemoteRef<Self>,
s: &str,
) -> Result<Hash, anyhow::Error> {
let remote = remote.borrow();
let h = if let Some(h) = Hash::from_prefix(s) {
h
} else {
return Err((crate::Error::ParseError { s: s.to_string() }).into());
};
let mut result = None;
debug!("h = {:?}", h);
for (e, _) in self.txn.iter(&remote.rev, Some((h, None))) {
debug!("{:?}", e);
if e < h {
continue;
} else {
let b32 = e.to_base32();
debug!("{:?}", b32);
let (b32, _) = b32.split_at(s.len().min(b32.len()));
if b32 != s {
break;
} else if result.is_none() {
result = Some(e)
} else {
return Err((crate::Error::AmbiguousHashPrefix {
prefix: s.to_string(),
})
.into());
}
}
}
if let Some(result) = result {
Ok(result)
} else {
Err((crate::Error::ChangeNotFound {
hash: s.to_string(),
})
.into())
}
}
type Inodes = Db<Inode, Position<ChangeId>>;
type Revinodes = Db<Position<ChangeId>, Inode>;
sanakirja_table_get!(inodes, Inode, Position<ChangeId>);
sanakirja_table_get!(revinodes, Position<ChangeId>, Inode);
sanakirja_cursor!(inodes, Inode, Position<ChangeId>);
#[cfg(debug_assertions)]
sanakirja_cursor!(revinodes, Position<ChangeId>, Inode);
type Tree = Db<UnsafePathId, Inode>;
sanakirja_table_get!(tree, PathId, Inode, (UnsafePathId::from_fileid(key), value),);
sanakirja_iter!(
tree,
OwnedPathId,
Inode,
if let Some((ref k, ref v)) = pos {
info!("tree iter {:?} {:?}", k, v);
Some((UnsafePathId::from_fileid(k.as_file_id()), *v))
} else {
None
},
map(|(k, v): (UnsafePathId, Inode)| (unsafe { k.to_fileid().to_owned() }, v))
);
sanakirja_iter!(
revtree,
Inode,
OwnedPathId,
if let Some((ref k, ref v)) = pos {
let v = if let Some(ref v) = *v {
Some(UnsafePathId::from_fileid(v.as_file_id()))
} else {
None
};
Some((*k, v))
} else {
None
},
map(|(k, v): (Inode, UnsafePathId)| (k, unsafe { v.to_fileid().to_owned() }))
);
type Revtree = Db<Inode, UnsafePathId>;
sanakirja_table_get!(
revtree,
Inode,
PathId,
(
key,
if let Some(value) = value {
Some(UnsafePathId::from_fileid(value))
} else {
None
}
),
map(|value| unsafe { value.to_fileid() })
);
type Changeset = Db<ChangeId, u64>;
type Revchangeset = Db<u64, (ChangeId, Merkle)>;
type Channelstates = Db<Merkle, u64>;
sanakirja_get!(changeset, ChangeId, u64);
sanakirja_get!(revchangeset, u64, (ChangeId, Merkle));
sanakirja_cursor!(changeset, ChangeId, u64);
sanakirja_cursor_ref!(revchangeset, u64, (ChangeId, Merkle));
sanakirja_rev_cursor!(revchangeset, u64, (ChangeId, Merkle));
type Dep = Db<ChangeId, ChangeId>;
type Revdep = Db<ChangeId, ChangeId>;
sanakirja_table_get!(dep, ChangeId, ChangeId);
sanakirja_table_get!(revdep, ChangeId, ChangeId);
sanakirja_cursor_ref!(dep, ChangeId, ChangeId);
sanakirja_table_get!(touched_files, Position<ChangeId>, ChangeId);
sanakirja_table_get!(rev_touched_files, ChangeId, Position<ChangeId>);
fn iter_dep_ref<RT: std::ops::Deref<Target = Self> + Clone>(
txn: RT,
p: ChangeId,
) -> super::Cursor<Self, RT, Self::DepCursor, ChangeId, ChangeId> {
let curs = Self::cursor_dep_ref(txn.clone(), &txn.dep, Some((p, None)));
curs
}
type Touched_files = Db<Position<ChangeId>, ChangeId>;
type Rev_touched_files = Db<ChangeId, Position<ChangeId>>;
sanakirja_iter!(touched_files, Position<ChangeId>, ChangeId);
sanakirja_iter!(rev_touched_files, ChangeId, Position<ChangeId>);
type Partials = Db<UnsafeSmallStr, Position<ChangeId>>;
sanakirja_cursor!(
partials,
SmallString,
Position<ChangeId>,
if let Some((ref k, ref v)) = pos {
Some((UnsafeSmallStr::from_small_str(k.as_small_str()), *v))
} else {
None
},
map(|(k, v): (UnsafeSmallStr, Position<ChangeId>)| (
unsafe { k.to_small_str().to_owned() },
v
))
);
fn load_channel(&self, name: &str) -> Option<ChannelRef<Self>> {
let name = SmallString::from_str(name);
match self.open_channels.borrow_mut().entry(name.clone()) {
Entry::Vacant(v) => {
if let Some((channel, changes, revchanges, states, counter, last_modified)) =
self.txn.get(
&self.channels,
UnsafeSmallStr::from_small_str(name.as_small_str()),
None,
)
{
let r = ChannelRef {
r: Rc::new(RefCell::new(Channel {
graph: channel,
changes,
revchanges,
apply_counter: counter,
states,
name: name.clone(),
last_modified,
})),
};
v.insert(r);
} else {
return None;
}
}
Entry::Occupied(_) => {}
}
self.open_channels.borrow().get(&name).map(|x| x.clone())
}
fn load_remote(&self, name: &str) -> Option<RemoteRef<Self>> {
let name = SmallString::from_str(name);
match self.open_remotes.borrow_mut().entry(name.clone()) {
Entry::Vacant(v) => {
if let Some(remote) = self.txn.get(
&self.remotes,
UnsafeSmallStr::from_small_str(name.as_small_str()),
None,
) {
let r = RemoteRef {
db: Rc::new(RefCell::new(Remote {
remote: remote.0,
rev: remote.1,
states: remote.2,
})),
name: name.clone(),
};
v.insert(r);
} else {
return None;
}
}
Entry::Occupied(_) => {}
}
self.open_remotes.borrow().get(&name).map(|x| x.clone())
}
type Channels = Db<UnsafeSmallStr, (u64, u64, u64, u64, u64, u64)>;
sanakirja_cursor!(
channels,
SmallString,
(u64, u64, u64, u64, u64, u64),
if let Some((ref k, ref v)) = pos {
Some((UnsafeSmallStr::from_small_str(k.as_small_str()), *v))
} else {
None
},
map(|(k, v): (UnsafeSmallStr, (u64, u64, u64, u64, u64, u64))| (
unsafe { k.to_small_str().to_owned() },
v
))
);
fn iter_channels<'txn>(&'txn self, start: &str) -> ChannelIterator<'txn, Self> {
let name = SmallString::from_str(start);
let name = UnsafeSmallStr::from_small_str(name.as_small_str());
ChannelIterator {
cursor: self.txn.set_cursors(&self.channels, Some((name, None))).0,
txn: self,
}
}
type Remotes = Db<UnsafeSmallStr, (u64, u64, u64)>;
sanakirja_cursor!(
remotes,
SmallString,
(u64, u64, u64),
if let Some((ref k, ref v)) = pos {
Some((UnsafeSmallStr::from_small_str(k.as_small_str()), *v))
} else {
None
},
map(|(k, v): (UnsafeSmallStr, (u64, u64, u64))| (
unsafe { k.to_small_str().to_owned() },
v
))
);
fn iter_remotes<'txn>(&'txn self, start: &str) -> RemotesIterator<'txn, Self> {
let name = SmallString::from_str(start);
let name = UnsafeSmallStr::from_small_str(name.as_small_str());
RemotesIterator {
cursor: self.txn.set_cursors(&self.remotes, Some((name, None))).0,
txn: self,
}
}
fn iter_inodes<'txn>(
&'txn self,
) -> super::Cursor<Self, &'txn Self, Self::InodesCursor, Inode, Position<ChangeId>> {
self.cursor_inodes(&self.inodes, None)
}
#[cfg(debug_assertions)]
fn iter_revinodes<'txn>(
&'txn self,
) -> super::Cursor<Self, &'txn Self, Self::RevinodesCursor, Position<ChangeId>, Inode> {
self.cursor_revinodes(&self.revinodes, None)
}
fn iter_revdep<'txn>(
&'txn self,
k: ChangeId,
) -> super::Cursor<Self, &'txn Self, Self::DepCursor, ChangeId, ChangeId> {
self.cursor_dep(&self.revdep, Some((k, None)))
}
fn iter_dep<'txn>(
&'txn self,
k: ChangeId,
) -> super::Cursor<Self, &'txn Self, Self::DepCursor, ChangeId, ChangeId> {
self.cursor_dep(&self.dep, Some((k, None)))
}
fn iter_touched<'txn>(
&'txn self,
k: Position<ChangeId>,
) -> super::Cursor<Self, &'txn Self, Self::Touched_filesCursor, Position<ChangeId>, ChangeId>
{
self.cursor_touched_files(&self.touched_files, Some((k, None)))
}
fn iter_rev_touched<'txn>(
&'txn self,
k: ChangeId,
) -> super::Cursor<Self, &'txn Self, Self::Rev_touched_filesCursor, ChangeId, Position<ChangeId>>
{
self.cursor_rev_touched_files(&self.rev_touched_files, Some((k, None)))
}
fn iter_partials<'txn>(
&'txn self,
k: &str,
) -> super::Cursor<Self, &'txn Self, Self::PartialsCursor, SmallString, Position<ChangeId>>
{
let k0 = SmallString::from_str(k);
self.cursor_partials(&self.partials, Some((k0, None)))
}
type Remote = Db<u64, (Hash, Merkle)>;
type Revremote = Db<Hash, u64>;
type Remotestates = Db<Merkle, u64>;
sanakirja_cursor!(remote, u64, (Hash, Merkle));
sanakirja_rev_cursor!(remote, u64, (Hash, Merkle));
fn iter_remote<'txn>(
&'txn self,
remote: &Self::Remote,
k: u64,
) -> super::Cursor<Self, &'txn Self, Self::RemoteCursor, u64, (Hash, Merkle)> {
self.cursor_remote(remote, Some((k, None)))
}
fn iter_rev_remote<'txn>(
&'txn self,
remote: &Self::Remote,
k: Option<u64>,
) -> super::RevCursor<Self, &'txn Self, Self::RemoteCursor, u64, (Hash, Merkle)> {
self.rev_cursor_remote(remote, k.map(|k| (k, None)))
}
fn get_remote(&mut self, name: &str) -> Option<RemoteRef<Self>> {
let name = SmallString::from_str(name);
match self.open_remotes.borrow_mut().entry(name.clone()) {
Entry::Vacant(v) => {
if let Some(remote) = self.txn.get(
&self.remotes,
UnsafeSmallStr::from_small_str(name.as_small_str()),
None,
) {
let r = RemoteRef {
db: Rc::new(RefCell::new(Remote {
remote: remote.0,
rev: remote.1,
states: remote.2,
})),
name: name.clone(),
};
v.insert(r);
} else {
return None;
}
}
Entry::Occupied(_) => {}
}
self.open_remotes.borrow().get(&name).map(|x| x.clone())
}
fn last_remote(&self, remote: &Self::Remote) -> Option<(u64, (Hash, Merkle))> {
self.txn.rev_iter(remote, None).next()
}
fn get_remote_state(&self, remote: &Self::Remote, n: u64) -> Option<(u64, (Hash, Merkle))> {
self.txn
.iter(remote, Some((n, None)))
.filter(|(k, _)| *k >= n)
.next()
}
fn remote_has_change(&self, remote: &RemoteRef<Self>, hash: Hash) -> bool {
self.txn.get(&remote.db.borrow().rev, hash, None).is_some()
}
fn remote_has_state(&self, remote: &RemoteRef<Self>, m: Merkle) -> bool {
self.txn.get(&remote.db.borrow().states, m, None).is_some()
}
fn channel_has_state(&self, channel: &ChannelRef<Self>, m: Merkle) -> bool {
self.txn.get(&channel.borrow().states, m, None).is_some()
}
}
impl MutTxnT for MutTxn<()> {
sanakirja_put_del!(internal, Hash, ChangeId);
sanakirja_put_del!(external, ChangeId, Hash);
sanakirja_put_del!(inodes, Inode, Position<ChangeId>);
sanakirja_put_del!(revinodes, Position<ChangeId>, Inode);
sanakirja_put_del!(tree, PathId, Inode, UnsafePathId::from_fileid(k), v);
sanakirja_put_del!(revtree, Inode, PathId, k, UnsafePathId::from_fileid(v),);
sanakirja_put_del!(dep, ChangeId, ChangeId);
sanakirja_put_del!(revdep, ChangeId, ChangeId);
sanakirja_put_del!(touched_files, Position<ChangeId>, ChangeId);
sanakirja_put_del!(rev_touched_files, ChangeId, Position<ChangeId>);
fn put_graph(
&mut self,
graph: &mut Self::Graph,
k: Vertex<ChangeId>,
e: Edge,
) -> Result<bool, anyhow::Error> {
Ok(self.txn.put(&mut self.rng, graph, k, e)?)
}
fn del_graph(
&mut self,
graph: &mut Self::Graph,
k: Vertex<ChangeId>,
e: Option<Edge>,
) -> Result<bool, anyhow::Error> {
debug!("del_graph {:?} {:?}", k, e);
Ok(self.txn.del(&mut self.rng, graph, k, e)?)
}
fn put_partials(&mut self, k: &str, e: Position<ChangeId>) -> Result<bool, anyhow::Error> {
let k = SmallString::from_str(k);
Ok(self.txn.put(
&mut self.rng,
&mut self.partials,
UnsafeSmallStr::from_small_str(k.as_small_str()),
e,
)?)
}
fn del_partials(
&mut self,
k: &str,
e: Option<Position<ChangeId>>,
) -> Result<bool, anyhow::Error> {
let k = SmallString::from_str(k);
Ok(self.txn.del(
&mut self.rng,
&mut self.partials,
UnsafeSmallStr::from_small_str(k.as_small_str()),
e,
)?)
}
fn put_changes(
&mut self,
channel: &mut Channel<Self>,
p: ChangeId,
t: ApplyTimestamp,
h: &Hash,
) -> Result<Option<Merkle>, anyhow::Error> {
if self.get_changeset(&channel.changes, p, None).is_none() {
channel.apply_counter += 1;
debug!("put_changes {:?} {:?}", t, p);
let m = if let Some((_, (_, m))) = self.txn.rev_iter(&channel.revchanges, None).next() {
m
} else {
Merkle::zero()
};
let m = m.next(h);
assert!(self
.get_revchangeset(&channel.revchanges, t, None)
.is_none());
assert!(self.txn.put(&mut self.rng, &mut channel.changes, p, t)?);
assert!(self
.txn
.put(&mut self.rng, &mut channel.revchanges, t, (p, m))?);
Ok(Some(m))
} else {
Ok(None)
}
}
fn del_changes(
&mut self,
channel: &mut Channel<Self>,
p: ChangeId,
t: ApplyTimestamp,
) -> Result<bool, anyhow::Error> {
let mut repl = Vec::new();
for (t_, (p, _)) in self.txn.iter(&channel.revchanges, Some((t, None))) {
if t_ >= t {
repl.push((t_, p))
}
}
let mut m = Merkle::zero();
for (t_, (_, m_)) in self.txn.rev_iter(&channel.revchanges, Some((t, None))) {
if t_ < t {
m = m_;
break;
}
}
for (t_, p) in repl.iter() {
debug!("del_changes {:?} {:?}", t_, p);
self.txn
.del(&mut self.rng, &mut channel.revchanges, *t_, None)?;
if *t_ > t {
m = m.next(&self.get_external(*p).unwrap());
self.txn
.put(&mut self.rng, &mut channel.revchanges, *t_, (*p, m))?;
}
}
Ok(self
.txn
.del(&mut self.rng, &mut channel.changes, p, Some(t))?)
}
fn put_remote(
&mut self,
remote: &mut RemoteRef<Self>,
k: u64,
v: (Hash, Merkle),
) -> Result<bool, anyhow::Error> {
let mut remote = remote.borrow_mut();
self.txn.put(&mut self.rng, &mut remote.remote, k, v)?;
self.txn.put(&mut self.rng, &mut remote.states, v.1, k)?;
Ok(self.txn.put(&mut self.rng, &mut remote.rev, v.0, k)?)
}
fn del_remote(&mut self, remote: &mut RemoteRef<Self>, k: u64) -> Result<bool, anyhow::Error> {
let mut remote = remote.borrow_mut();
if let Some((h, m)) = self.txn.get(&remote.remote, k, None) {
self.txn.del(&mut self.rng, &mut remote.rev, h, None)?;
self.txn.del(&mut self.rng, &mut remote.states, m, None)?;
Ok(self.txn.del(&mut self.rng, &mut remote.remote, k, None)?)
} else {
Ok(false)
}
}
fn open_or_create_channel(&mut self, name: &str) -> Result<ChannelRef<Self>, anyhow::Error> {
let name = small_string::SmallString::from_str(name);
let mut commit = None;
match self.open_channels.borrow_mut().entry(name.clone()) {
Entry::Vacant(v) => {
let r = if let Some((
graph,
changes,
revchanges,
states,
apply_counter,
last_modified,
)) = self.txn.get(
&self.channels,
UnsafeSmallStr::from_small_str(name.as_small_str()),
None,
) {
ChannelRef {
r: Rc::new(RefCell::new(Channel {
graph,
changes,
revchanges,
states,
apply_counter,
name: name.clone(),
last_modified,
})),
}
} else {
let br = ChannelRef {
r: Rc::new(RefCell::new(Channel {
graph: self.txn.create_db()?,
changes: self.txn.create_db()?,
revchanges: self.txn.create_db()?,
states: self.txn.create_db()?,
apply_counter: 0,
name: name.clone(),
last_modified: 0,
})),
};
commit = Some(br.clone());
br
};
v.insert(r);
}
Entry::Occupied(_) => {}
}
if let Some(commit) = commit {
self.put_channel(commit.clone())?;
}
Ok(self.open_channels.borrow().get(&name).unwrap().clone())
}
fn fork(
&mut self,
channel: &ChannelRef<Self>,
new_name: &str,
) -> Result<ChannelRef<Self>, anyhow::Error> {
let channel = channel.r.borrow();
let name = SmallString::from_str(new_name);
if self
.txn
.get(
&self.channels,
UnsafeSmallStr::from_small_str(name.as_small_str()),
None,
)
.is_none()
{
let br = ChannelRef {
r: Rc::new(RefCell::new(Channel {
graph: self.txn.fork(&mut self.rng, &channel.graph)?,
changes: self.txn.fork(&mut self.rng, &channel.changes)?,
revchanges: self.txn.fork(&mut self.rng, &channel.revchanges)?,
states: self.txn.fork(&mut self.rng, &channel.states)?,
name: name.clone(),
apply_counter: channel.apply_counter,
last_modified: channel.last_modified,
})),
};
self.open_channels
.borrow_mut()
.insert(name.clone(), br.clone());
Ok(br)
} else {
Err((crate::Error::ChannelNameExists {
name: new_name.to_string(),
})
.into())
}
}
fn rename_channel(
&mut self,
channel: &mut ChannelRef<Self>,
new_name: &str,
) -> Result<(), anyhow::Error> {
let name = SmallString::from_str(new_name);
if self
.txn
.get(
&self.channels,
UnsafeSmallStr::from_small_str(name.as_small_str()),
None,
)
.is_none()
{
std::mem::drop(
self.open_channels
.borrow_mut()
.remove(&channel.borrow().name)
.unwrap(),
);
std::cell::RefCell::borrow_mut(&std::rc::Rc::get_mut(&mut channel.r).unwrap()).name =
name.clone();
self.open_channels
.borrow_mut()
.insert(name, channel.clone());
Ok(())
} else {
Err((crate::Error::ChannelNameExists {
name: new_name.to_string(),
})
.into())
}
}
fn drop_channel(&mut self, name: &str) -> Result<bool, anyhow::Error> {
let name = SmallString::from_str(name);
self.open_channels.borrow_mut().remove(&name);
debug!("drop_channel {:?}", name);
let result = self.txn.del(
&mut self.rng,
&mut self.channels,
UnsafeSmallStr::from_small_str(name.as_small_str()),
None,
)?;
debug!("/drop_channel {:?}: {:?}", name, result);
Ok(result)
}
fn open_or_create_remote(&mut self, name: &str) -> Result<RemoteRef<Self>, anyhow::Error> {
let name = small_string::SmallString::from_str(name);
let mut commit = None;
match self.open_remotes.borrow_mut().entry(name.clone()) {
Entry::Vacant(v) => {
let r = if let Some(remote) = self.txn.get(
&self.remotes,
UnsafeSmallStr::from_small_str(name.as_small_str()),
None,
) {
RemoteRef {
db: Rc::new(RefCell::new(Remote {
remote: remote.0,
rev: remote.1,
states: remote.2,
})),
name: name.clone(),
}
} else {
let br = RemoteRef {
db: Rc::new(RefCell::new(Remote {
remote: self.txn.create_db()?,
rev: self.txn.create_db()?,
states: self.txn.create_db()?,
})),
name: name.clone(),
};
commit = Some(br.clone());
br
};
v.insert(r);
}
Entry::Occupied(_) => {}
}
if let Some(commit) = commit {
self.put_remotes(commit.clone())?;
}
Ok(self.open_remotes.borrow().get(&name).unwrap().clone())
}
fn drop_remote(&mut self, remote: RemoteRef<Self>) -> Result<bool, anyhow::Error> {
let name = remote.name.clone();
let r = self.open_remotes.borrow_mut().remove(&name).unwrap();
std::mem::drop(remote);
assert_eq!(Rc::strong_count(&r.db), 1);
Ok(self.txn.del(
&mut self.rng,
&mut self.remotes,
UnsafeSmallStr::from_small_str(name.as_small_str()),
None,
)?)
}
fn drop_named_remote(&mut self, name: &str) -> Result<bool, anyhow::Error> {
let name = SmallString::from_str(name);
if let Some(r) = self.open_remotes.borrow_mut().remove(&name) {
assert_eq!(Rc::strong_count(&r.db), 1);
}
Ok(self.txn.del(
&mut self.rng,
&mut self.remotes,
UnsafeSmallStr::from_small_str(name.as_small_str()),
None,
)?)
}
fn commit(mut self) -> Result<(), anyhow::Error> {
use std::ops::DerefMut;
{
let open_channels =
std::mem::replace(self.open_channels.borrow_mut().deref_mut(), HashMap::new());
for (name, channel) in open_channels {
debug!("commit_channel {:?}", name);
self.commit_channel(channel)?
}
}
{
let open_remotes =
std::mem::replace(self.open_remotes.borrow_mut().deref_mut(), HashMap::new());
for (_, remote) in open_remotes {
self.commit_remote(remote)?
}
}
self.txn.set_root(Root::Tree as usize, self.tree);
self.txn.set_root(Root::RevTree as usize, self.revtree);
self.txn.set_root(Root::Inodes as usize, self.inodes);
self.txn.set_root(Root::RevInodes as usize, self.revinodes);
self.txn.set_root(Root::Internal as usize, self.internal);
self.txn.set_root(Root::External as usize, self.external);
self.txn.set_root(Root::RevDep as usize, self.revdep);
self.txn.set_root(Root::Channels as usize, self.channels);
self.txn.set_root(Root::Remotes as usize, self.remotes);
self.txn
.set_root(Root::TouchedFiles as usize, self.touched_files);
self.txn.set_root(Root::Dep as usize, self.dep);
self.txn
.set_root(Root::RevTouchedFiles as usize, self.rev_touched_files);
self.txn.set_root(Root::Partials as usize, self.partials);
self.txn.commit()?;
Ok(())
}
}
impl Txn {
pub fn load_const_channel(&self, name: &str) -> Option<Channel<Self>> {
let name = SmallString::from_str(name);
if let Some((channel, changes, revchanges, states, counter, last_modified)) = self.txn.get(
&self.channels,
UnsafeSmallStr::from_small_str(name.as_small_str()),
None,
) {
Some(Channel {
graph: channel,
changes: changes,
revchanges: revchanges,
states,
apply_counter: counter,
name: name.clone(),
last_modified,
})
} else {
None
}
}
}
impl<T> MutTxn<T> {
fn put_channel(&mut self, channel: ChannelRef<Self>) -> Result<(), anyhow::Error> {
debug!("Commit_channel. This is not too safe.");
let channel = channel.r.try_borrow()?;
let mut dbs_channels: ::sanakirja::Db<UnsafeSmallStr, (u64, u64, u64, u64, u64, u64)> =
unsafe { std::mem::transmute(self.channels) };
debug!("Commit_channel, dbs_channels = {:?}", dbs_channels);
self.txn.del(
&mut self.rng,
&mut dbs_channels,
UnsafeSmallStr::from_small_str(channel.name.as_small_str()),
None,
)?;
debug!("Commit_channel, dbs_channels = {:?}", dbs_channels);
self.channels = unsafe { std::mem::transmute(dbs_channels) };
self.txn.put(
&mut self.rng,
&mut self.channels,
UnsafeSmallStr::from_small_str(channel.name.as_small_str()),
(
channel.graph,
channel.changes,
channel.revchanges,
channel.states,
channel.apply_counter,
channel.last_modified,
),
)?;
debug!("Commit_channel, self.dbs.channels = {:?}", self.channels);
Ok(())
}
fn commit_channel(&mut self, channel: ChannelRef<Self>) -> Result<(), anyhow::Error> {
debug!("Commit_channel. This is not too safe.");
std::mem::drop(
self.open_channels
.borrow_mut()
.remove(&channel.r.borrow().name),
);
self.put_channel(channel)
}
fn put_remotes(&mut self, remote: RemoteRef<Self>) -> Result<(), anyhow::Error> {
let mut dbs_remotes: ::sanakirja::Db<UnsafeSmallStr, (u64, u64, u64)> =
unsafe { std::mem::transmute(self.remotes) };
debug!("Commit_remote, dbs_remotes = {:?}", dbs_remotes);
self.txn.del(
&mut self.rng,
&mut dbs_remotes,
UnsafeSmallStr::from_small_str(remote.name.as_small_str()),
None,
)?;
debug!("Commit_remote, dbs_remotes = {:?}", dbs_remotes);
self.remotes = unsafe { std::mem::transmute(dbs_remotes) };
let r = remote.db.borrow();
self.txn.put(
&mut self.rng,
&mut self.remotes,
UnsafeSmallStr::from_small_str(remote.name.as_small_str()),
(r.remote, r.rev, r.states),
)?;
debug!("Commit_remote, self.dbs.remotes = {:?}", self.remotes);
Ok(())
}
fn commit_remote(&mut self, remote: RemoteRef<Self>) -> Result<(), anyhow::Error> {
std::mem::drop(self.open_remotes.borrow_mut().remove(&remote.name));
self.put_remotes(remote)
}
}
const CHANGE_ID_SIZE: usize = 8;
impl Representable for ChangeId {
fn alignment() -> Alignment {
Alignment::B8
}
fn onpage_size(&self) -> u16 {
CHANGE_ID_SIZE as u16
}
unsafe fn write_value(&self, p: *mut u8) {
LittleEndian::write_u64(std::slice::from_raw_parts_mut(p, 8), self.0)
}
unsafe fn read_value(p: *const u8) -> Self {
ChangeId(LittleEndian::read_u64(std::slice::from_raw_parts(p, 8)))
}
unsafe fn cmp_value<T>(&self, _: &T, x: Self) -> std::cmp::Ordering {
self.0.cmp(&x.0)
}
type PageOffsets = std::iter::Empty<u64>;
fn page_offsets(&self) -> Self::PageOffsets {
std::iter::empty()
}
}
const VERTEX_SIZE: usize = CHANGE_ID_SIZE + 16;
impl Representable for Vertex<ChangeId> {
fn alignment() -> Alignment {
Alignment::B1
}
fn onpage_size(&self) -> u16 {
VERTEX_SIZE as u16
}
unsafe fn write_value(&self, p: *mut u8) {
let p = std::slice::from_raw_parts_mut(p, VERTEX_SIZE);
LittleEndian::write_u64(p, self.change.0);
LittleEndian::write_u64(&mut p[CHANGE_ID_SIZE..], self.start.0);
LittleEndian::write_u64(&mut p[CHANGE_ID_SIZE + 8..], self.end.0);
}
unsafe fn read_value(p: *const u8) -> Self {
let p = std::slice::from_raw_parts(p, VERTEX_SIZE);
let change = LittleEndian::read_u64(p);
let start = LittleEndian::read_u64(&p[CHANGE_ID_SIZE..]);
let end = LittleEndian::read_u64(&p[CHANGE_ID_SIZE + 8..]);
Vertex {
change: ChangeId(change),
start: ChangePosition(start),
end: ChangePosition(end),
}
}
unsafe fn cmp_value<T>(&self, _: &T, x: Self) -> std::cmp::Ordering {
self.cmp(&x)
}
type PageOffsets = std::iter::Empty<u64>;
fn page_offsets(&self) -> Self::PageOffsets {
std::iter::empty()
}
}
impl Representable for Position<ChangeId> {
fn alignment() -> Alignment {
Alignment::B1
}
fn onpage_size(&self) -> u16 {
(CHANGE_ID_SIZE + 8) as u16
}
unsafe fn write_value(&self, p: *mut u8) {
let p = std::slice::from_raw_parts_mut(p, CHANGE_ID_SIZE + 8);
LittleEndian::write_u64(p, self.change.0);
LittleEndian::write_u64(&mut p[CHANGE_ID_SIZE..], self.pos.0);
}
unsafe fn read_value(p: *const u8) -> Self {
let p = std::slice::from_raw_parts(p, CHANGE_ID_SIZE + 8);
let change = LittleEndian::read_u64(p);
let pos = LittleEndian::read_u64(&p[CHANGE_ID_SIZE..]);
Position {
change: ChangeId(change),
pos: ChangePosition(pos),
}
}
unsafe fn cmp_value<T>(&self, _: &T, x: Self) -> std::cmp::Ordering {
self.cmp(&x)
}
type PageOffsets = std::iter::Empty<u64>;
fn page_offsets(&self) -> Self::PageOffsets {
std::iter::empty()
}
}
impl Representable for Edge {
fn alignment() -> Alignment {
Alignment::B1
}
fn onpage_size(&self) -> u16 {
25
}
unsafe fn write_value(&self, p: *mut u8) {
let s = std::slice::from_raw_parts_mut(p, 25);
s[0] = (*self).flag.bits();
LittleEndian::write_u64(&mut s[1..], (*self).dest.change.0);
LittleEndian::write_u64(&mut s[9..], (*self).dest.pos.0);
LittleEndian::write_u64(&mut s[17..], (*self).introduced_by.0);
}
unsafe fn read_value(p: *const u8) -> Self {
let s = std::slice::from_raw_parts(p, 25);
Edge {
flag: if let Some(b) = EdgeFlags::from_bits(s[0]) {
b
} else {
panic!("read_value, edge = {:?}", s);
},
dest: Position {
change: ChangeId(LittleEndian::read_u64(&s[1..])),
pos: ChangePosition(LittleEndian::read_u64(&s[9..])),
},
introduced_by: ChangeId(LittleEndian::read_u64(&s[17..])),
}
}
unsafe fn cmp_value<T>(&self, _: &T, x: Self) -> std::cmp::Ordering {
let a: &Edge = self;
let b: &Edge = &x;
a.cmp(b)
}
type PageOffsets = std::iter::Empty<u64>;
fn page_offsets(&self) -> Self::PageOffsets {
std::iter::empty()
}
}
#[derive(Clone, Copy, Debug)]
pub struct UnsafePathId {
parent_inode: Inode,
basename: UnsafeSmallStr,
}
impl UnsafePathId {
pub fn from_fileid(f: PathId) -> UnsafePathId {
UnsafePathId {
parent_inode: f.parent_inode,
basename: UnsafeSmallStr::from_small_str(f.basename),
}
}
pub unsafe fn to_fileid<'a>(&self) -> PathId<'a> {
PathId {
parent_inode: self.parent_inode,
basename: self.basename.to_small_str(),
}
}
}
impl Representable for UnsafePathId {
fn alignment() -> Alignment {
Alignment::B1
}
fn onpage_size(&self) -> u16 {
INODE_SIZE + self.basename.onpage_size()
}
unsafe fn write_value(&self, p: *mut u8) {
self.parent_inode.write_value(p);
self.basename.write_value(p.offset(INODE_SIZE as isize));
}
unsafe fn read_value(p: *const u8) -> Self {
UnsafePathId {
parent_inode: Inode::read_value(p),
basename: UnsafeSmallStr::read_value(p.offset(INODE_SIZE as isize)),
}
}
unsafe fn cmp_value<T>(&self, _: &T, x: Self) -> std::cmp::Ordering {
let a: PathId = self.to_fileid();
let b: PathId = x.to_fileid();
a.cmp(&b)
}
type PageOffsets = std::iter::Empty<u64>;
fn page_offsets(&self) -> Self::PageOffsets {
std::iter::empty()
}
}
const INODE_SIZE: u16 = 8;
impl Representable for Inode {
fn alignment() -> Alignment {
Alignment::B8
}
fn onpage_size(&self) -> u16 {
INODE_SIZE
}
unsafe fn write_value(&self, p: *mut u8) {
LittleEndian::write_u64(std::slice::from_raw_parts_mut(p, 8), self.0)
}
unsafe fn read_value(p: *const u8) -> Self {
Inode(LittleEndian::read_u64(std::slice::from_raw_parts(p, 8)))
}
unsafe fn cmp_value<T>(&self, _: &T, x: Self) -> std::cmp::Ordering {
self.0.cmp(&x.0)
}
type PageOffsets = std::iter::Empty<u64>;
fn page_offsets(&self) -> Self::PageOffsets {
std::iter::empty()
}
}
impl Representable for Hash {
fn alignment() -> Alignment {
Alignment::B1
}
fn onpage_size(&self) -> u16 {
1 + (match *self {
Hash::Blake3(_) => 32,
Hash::None => 0,
})
}
unsafe fn write_value(&self, p: *mut u8) {
match *self {
Hash::Blake3(q) => {
*p = HashAlgorithm::Blake3 as u8;
std::ptr::copy(q.as_ptr(), p.offset(1), 32)
}
Hash::None => *p = HashAlgorithm::None as u8,
}
}
unsafe fn read_value(p: *const u8) -> Self {
assert!(*p <= HashAlgorithm::Blake3 as u8);
match std::mem::transmute(*p) {
HashAlgorithm::Blake3 => {
let mut h = [0; BLAKE3_BYTES];
std::ptr::copy(p.offset(1), h.as_mut_ptr(), BLAKE3_BYTES);
Hash::Blake3(h)
}
HashAlgorithm::None => Hash::None,
}
}
unsafe fn cmp_value<T>(&self, _: &T, x: Self) -> std::cmp::Ordering {
self.cmp(&x)
}
type PageOffsets = std::iter::Empty<u64>;
fn page_offsets(&self) -> Self::PageOffsets {
std::iter::empty()
}
}
impl Representable for Merkle {
fn alignment() -> Alignment {
Alignment::B1
}
fn onpage_size(&self) -> u16 {
33
}
unsafe fn write_value(&self, p: *mut u8) {
match *self {
Merkle::Ed25519(q) => {
*p = MerkleAlgorithm::Ed25519 as u8;
assert_eq!(*p, 1);
let q = q.compress();
let q = q.as_bytes();
std::ptr::copy(q.as_ptr(), p.offset(1), 32);
}
}
}
unsafe fn read_value(p: *const u8) -> Self {
assert_eq!(*p, MerkleAlgorithm::Ed25519 as u8);
let slice = std::slice::from_raw_parts(p.offset(1), 32);
Merkle::Ed25519(
curve25519_dalek::edwards::CompressedEdwardsY::from_slice(slice)
.decompress()
.unwrap(),
)
}
unsafe fn cmp_value<T>(&self, _: &T, x: Self) -> std::cmp::Ordering {
self.to_bytes().cmp(&x.to_bytes())
}
type PageOffsets = std::iter::Empty<u64>;
fn page_offsets(&self) -> Self::PageOffsets {
std::iter::empty()
}
}