use super::address::*;
use super::cksumvfs;
use super::index;
use super::xid::*;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::path::Path;
/// Handle to the on-disk send log, backed by a single SQLite connection.
///
/// Obtained from [`SendLog::open`]. Every query issued by this type and by
/// [`SendLogSession`] goes through `db_conn`.
pub struct SendLog {
/// The SQLite connection opened (and configured) by `SendLog::open`.
db_conn: rusqlite::Connection,
}
/// A transactional session over a [`SendLog`].
///
/// Created by [`SendLog::session`], which issues `begin immediate;` first.
/// If the session is dropped while `tx_active` is still set, the `Drop`
/// implementation issues `rollback;`.
pub struct SendLogSession<'a> {
/// GC generation written to `GCGeneration` on rows this session inserts and
/// compared against in `perform_cache_invalidations`.
gc_generation: Xid,
/// Id generated with `Xid::new()` when the session is created; written to
/// `LatestSessionId` on every row the session inserts or touches.
session_id: Xid,
/// Tracks whether the session considers its SQLite transaction open.
/// Cleared before `commit;` is issued and set again once `checkpoint`
/// has started a new transaction.
tx_active: bool,
/// The underlying log, mutably borrowed for the lifetime of the session.
log: &'a mut SendLog,
}
/// Payload stored in the `Cached` column of the `StatCache` table.
///
/// Encoded with `serde_bare::to_vec` in `add_stat_cache_data` and decoded with
/// `serde_bare::from_slice` in `stat_cache_lookup_and_update`. Each field is a
/// `Cow`, so an entry may borrow its slices or own them.
#[derive(Serialize, Deserialize)]
pub struct StatCacheEntry<'a, 'b, 'c> {
/// Addresses associated with the entry; on a cache hit each of them is
/// passed to `add_address` again.
pub addresses: Cow<'a, [Address]>,
/// Data cursors stored alongside the addresses. Not interpreted by this
/// module.
pub data_cursors: Cow<'b, [index::RelativeDataCursor]>,
/// Content hashes stored alongside the addresses. Not interpreted by this
/// module.
pub hashes: Cow<'c, [index::ContentCryptoHash]>,
}
// Version of the table layout created by `SendLog::open`. When the
// `schema-version` value stored in `LogMeta` differs from this constant, `open`
// drops and recreates all tables.
const SCHEMA_VERSION: i64 = 6;
impl SendLog {
/// Opens the send log database at `p`, (re)initializing its tables when needed.
///
/// Steps, in order:
/// 1. Open the connection, enable checksums through `cksumvfs`, and apply the
///    cache size, exclusive locking mode and busy timeout settings.
/// 2. Probe `LogMeta` for `schema-version` to decide whether the tables must be
///    dropped and recreated.
/// 3. If so, recreate `LogMeta`, `Sent` and `StatCache` in one immediate
///    transaction, then vacuum and run `pragma integrity_check`.
/// 4. Read and bump the `sequence-number` counter, vacuuming whenever the value
///    read is a multiple of 10.
///
/// # Errors
/// * `send log is busy` when the schema probe reports the database as busy or
///   locked.
/// * `send log integrity check failed` when the integrity check after
///   initialization returns anything other than `ok`.
/// * Any other rusqlite / `cksumvfs` error is propagated.
pub fn open(p: &Path) -> Result<SendLog, anyhow::Error> {
let mut db_conn = rusqlite::Connection::open(p)?;
cksumvfs::enable_sqlite_checksums(&db_conn)?;
// A negative cache_size is a size in KiB rather than pages (see the SQLite
// pragma documentation).
db_conn.execute("pragma cache_size = -20000;", [])?;
// This pragma returns a row, so it is run with query_row and the row ignored.
db_conn.query_row("pragma locking_mode = EXCLUSIVE;", [], |_r| Ok(()))?;
// Busy timeout of 7 days, expressed in seconds.
db_conn.busy_timeout(std::time::Duration::new(7 * 24 * 60 * 60, 0))?;
// Decide whether the schema has to be (re)created.
let needs_init = match db_conn.query_row(
"select Value from LogMeta where Key = 'schema-version';",
[],
|r| {
let v: i64 = r.get(0)?;
Ok(v)
},
) {
// Table is readable: reinitialize only on a version mismatch.
Ok(v) => v != SCHEMA_VERSION,
// NOTE(review): presumably this is how a checksum mismatch from cksumvfs
// surfaces — confirm. Verification is switched off so the file can be
// vacuumed, then switched back on, and the tables are rebuilt.
Err(rusqlite::Error::SqliteFailure(err, _))
if err.code == rusqlite::ErrorCode::SystemIoFailure =>
{
db_conn.query_row("pragma checksum_verification=OFF;", [], |_r| Ok(()))?;
db_conn.execute("vacuum;", [])?;
db_conn.query_row("pragma checksum_verification=ON;", [], |_r| Ok(()))?;
true
}
// Busy or locked database: give up instead of reinitializing.
Err(rusqlite::Error::SqliteFailure(err, _))
if err.code == rusqlite::ErrorCode::DatabaseBusy
|| err.code == rusqlite::ErrorCode::DatabaseLocked =>
{
anyhow::bail!("send log is busy")
}
// Every other failure of the probe is treated as "not initialized yet".
Err(_) => true,
};
if needs_init {
// Drop and recreate everything atomically.
let tx = db_conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
tx.execute("drop table if exists LogMeta;", [])?;
tx.execute("drop table if exists Sent;", [])?;
tx.execute("drop table if exists StatCache;", [])?;
tx.execute(
"create table LogMeta(Key primary key, Value) without rowid;",
[],
)?;
tx.execute(
"insert into LogMeta(Key, Value) values('schema-version', ?);",
[&SCHEMA_VERSION],
)?;
tx.execute(
"insert into LogMeta(Key, Value) values('sequence-number', 1);",
[],
)?;
tx.execute(
"create table Sent(Address primary key, GCGeneration, LatestSessionId, ItemId) without rowid;",
[],
)?;
tx.execute(
"create table StatCache(Hash primary key, Cached, GCGeneration, LatestSessionId, ItemId) without rowid; ",
[],
)?;
tx.commit()?;
// Vacuum after the transaction has been committed, then verify the result.
db_conn.execute("vacuum;", [])?;
let integrity_check = db_conn.query_row("pragma integrity_check;", [], |r| {
let v: String = r.get(0)?;
Ok(v)
})?;
if integrity_check != "ok" {
anyhow::bail!("send log integrity check failed")
};
}
// Read the sequence counter and advance it in the same transaction.
let tx = db_conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
let sequence_number = match tx.query_row(
"select Value from LogMeta where Key = 'sequence-number';",
[],
|r| r.get(0),
) {
// Counter exists: store value + 1 and keep the value that was read.
Ok(n) => {
tx.execute(
"update LogMeta set Value = Value + 1 where Key = 'sequence-number';",
[],
)?;
n
}
// Counter row is missing: create it with the value 1.
Err(rusqlite::Error::QueryReturnedNoRows) => {
tx.execute(
"insert into LogMeta(Key, Value) values('sequence-number', 1);",
[],
)?;
1
}
Err(err) => return Err(err.into()),
};
tx.commit()?;
// Vacuum whenever the counter value read above is a multiple of 10.
if sequence_number % 10 == 0 {
db_conn.execute("vacuum;", [])?;
}
Ok(SendLog { db_conn })
}
/// Starts an immediate transaction and wraps it in a new [`SendLogSession`].
///
/// The session records `gc_generation`, receives a freshly generated
/// `session_id`, and starts out with `tx_active` set.
///
/// # Errors
/// Propagates the rusqlite error if `begin immediate;` fails.
pub fn session(&mut self, gc_generation: Xid) -> Result<SendLogSession, anyhow::Error> {
    self.db_conn.execute("begin immediate;", [])?;
    // The id is generated only once the transaction is known to be open.
    let session_id = Xid::new();
    let session = SendLogSession {
        log: self,
        session_id,
        gc_generation,
        tx_active: true,
    };
    Ok(session)
}
/// Returns the id stored under `last-send-id` in `LogMeta`, if there is one.
///
/// # Errors
/// A missing row is reported as `Ok(None)`; every other rusqlite error is
/// propagated.
pub fn last_send_id(&self) -> Result<Option<Xid>, anyhow::Error> {
    let lookup = self.db_conn.query_row(
        "select value from LogMeta where key = 'last-send-id';",
        [],
        |row| row.get::<_, Xid>(0),
    );
    match lookup {
        Ok(found) => Ok(Some(found)),
        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
        Err(other) => Err(other.into()),
    }
}
}
impl<'a> SendLogSession<'a> {
/// Returns the `last-send-id` recorded in the underlying log, if any.
///
/// Delegates directly to [`SendLog::last_send_id`]; it does not check
/// `tx_active`.
pub fn last_send_id(&self) -> Result<Option<Xid>, anyhow::Error> {
self.log.last_send_id()
}
/// Purges `Sent` and `StatCache` rows that belong to a different GC generation.
///
/// * When `had_send_id` is true and the log has a recorded `last-send-id`,
///   only rows whose `GCGeneration` differs from this session's generation
///   **and** whose `ItemId` differs from that id are deleted.
/// * Otherwise every row whose `GCGeneration` differs is deleted. This also
///   covers `had_send_id == true` with no `last-send-id` in the log: no row
///   can be tied to a known send, so the stricter purge is applied instead of
///   panicking on the missing id.
///
/// # Errors
/// Fails with `no active transaction` if the session's transaction is not
/// active, and propagates any rusqlite error from the lookup or the deletes.
pub fn perform_cache_invalidations(&self, had_send_id: bool) -> Result<(), anyhow::Error> {
    if !self.tx_active {
        anyhow::bail!("no active transaction");
    };
    // Looked up unconditionally so a failing metadata query is always reported.
    let last_send_id = self.last_send_id()?;
    let db_conn = &self.log.db_conn;
    match last_send_id {
        Some(item_id) if had_send_id => {
            db_conn.execute(
                "delete from Sent where (GCGeneration != ?) and (ItemId != ?);",
                rusqlite::params![self.gc_generation, item_id],
            )?;
            db_conn.execute(
                "delete from StatCache where (GCGeneration != ?) and (ItemId != ?);",
                rusqlite::params![self.gc_generation, item_id],
            )?;
        }
        _ => {
            db_conn.execute(
                "delete from Sent where GCGeneration != ?;",
                [self.gc_generation],
            )?;
            db_conn.execute(
                "delete from StatCache where GCGeneration != ?;",
                [self.gc_generation],
            )?;
        }
    }
    Ok(())
}
/// Records `addr` in the `Sent` table for this session.
///
/// A new row is stamped with the session's GC generation and session id. If
/// the address is already present, only its `LatestSessionId` is updated.
///
/// # Errors
/// Fails with `no active transaction` if the session's transaction is not
/// active, and propagates rusqlite errors.
pub fn add_address(&self, addr: &Address) -> Result<(), anyhow::Error> {
    const UPSERT_SENT: &str = concat!(
        "insert into Sent(GCGeneration, LatestSessionId, Address) values(?, ?, ?)",
        "on conflict(Address) do update set LatestSessionId=excluded.LatestSessionId;",
    );
    if !self.tx_active {
        anyhow::bail!("no active transaction");
    }
    let address_bytes = &addr.bytes[..];
    let mut upsert = self.log.db_conn.prepare_cached(UPSERT_SENT)?;
    upsert.execute(rusqlite::params![
        self.gc_generation,
        self.session_id,
        address_bytes
    ])?;
    Ok(())
}
pub fn add_address_if_cached(&self, addr: &Address) -> Result<bool, anyhow::Error> {
if !self.tx_active {
anyhow::bail!("no active transaction");
};
let mut stmt = self
.log
.db_conn
.prepare_cached("update Sent set LatestSessionId = ? where Address = ? returning 1;")?;
let updated = match stmt
.query_row(rusqlite::params![self.session_id, &addr.bytes[..]], |_r| {
Ok(())
}) {
Ok(_) => true,
Err(rusqlite::Error::QueryReturnedNoRows) => false,
Err(err) => return Err(err.into()),
};
Ok(updated)
}
pub fn cached_address(&self, addr: &Address) -> Result<bool, anyhow::Error> {
let mut stmt = self
.log
.db_conn
.prepare_cached("select 1 from Sent where Address = ?;")?;
let hit = match stmt.query_row([&addr.bytes[..]], |_r| Ok(())) {
Ok(_) => true,
Err(rusqlite::Error::QueryReturnedNoRows) => false,
Err(err) => return Err(err.into()),
};
Ok(hit)
}
pub fn add_stat_cache_data(
&self,
hash: &[u8],
data: &StatCacheEntry,
) -> Result<(), anyhow::Error> {
if !self.tx_active {
anyhow::bail!("no active transaction");
};
let mut stmt = self.log.db_conn.prepare_cached(
"insert into StatCache(GCGeneration, LatestSessionId, Hash, Cached) Values(?1, ?2, ?3, ?4) \
on conflict(Hash) do update set LatestSessionId = ?2;"
)?;
stmt.execute(rusqlite::params![
self.gc_generation,
self.session_id,
hash,
serde_bare::to_vec(data)?,
])?;
Ok(())
}
/// Looks up the stat cache entry stored under `hash` and marks it as used by
/// this session.
///
/// On a hit the row's `LatestSessionId` is set to this session's id, the
/// payload is decoded, and every address in the entry is passed to
/// `add_address`. On a miss `Ok(None)` is returned.
///
/// # Errors
/// Fails with `no active transaction` if the session's transaction is not
/// active. The lookup itself is an `update`, so, like the other writing
/// methods, it must not run outside the session's transaction. Also
/// propagates decoding errors and rusqlite errors other than "no rows".
pub fn stat_cache_lookup_and_update(
    &self,
    hash: &[u8],
) -> Result<Option<StatCacheEntry<'static, 'static, 'static>>, anyhow::Error> {
    // Guard first so the update below can never be auto-committed on its own.
    if !self.tx_active {
        anyhow::bail!("no active transaction");
    };
    let mut stmt = self.log.db_conn.prepare_cached(
        "update StatCache set LatestSessionId = ?1 where Hash = ?2 returning Cached;",
    )?;
    match stmt.query_row(rusqlite::params![self.session_id, hash], |r| {
        let data: Vec<u8> = r.get(0)?;
        Ok(data)
    }) {
        Ok(cached) => {
            let entry: StatCacheEntry = serde_bare::from_slice(&cached)?;
            // Keep the addresses referenced by the entry alive for this session.
            for address in entry.addresses.iter() {
                self.add_address(address)?;
            }
            Ok(Some(entry))
        }
        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
        Err(err) => Err(err.into()),
    }
}
/// Commits the work done so far and immediately opens a new transaction.
///
/// `tx_active` is cleared before `commit;` is issued and set again only after
/// `begin immediate;` succeeds. If either statement fails, the session is left
/// marked inactive.
///
/// # Errors
/// Fails with `no active transaction` if the session's transaction is not
/// active, with `unable to checkpoint send log changes: …` if the commit
/// fails, and propagates the error if the new transaction cannot be started.
pub fn checkpoint(&mut self) -> Result<(), anyhow::Error> {
    if !self.tx_active {
        anyhow::bail!("no active transaction");
    }
    self.tx_active = false;
    let db_conn = &self.log.db_conn;
    if let Err(err) = db_conn.execute("commit;", []) {
        return Err(anyhow::format_err!(
            "unable to checkpoint send log changes: {}",
            err
        ));
    }
    db_conn.execute("begin immediate;", [])?;
    self.tx_active = true;
    Ok(())
}
/// Finalizes the session under item id `id` and commits the transaction.
///
/// In order:
/// 1. Rows in `StatCache` and `Sent` whose `LatestSessionId` is not this
///    session's id are deleted.
/// 2. The remaining rows of this session get `ItemId` set to `id`.
/// 3. `id` is stored in `LogMeta` under `last-send-id`.
/// 4. `tx_active` is cleared and `commit;` is issued.
///
/// # Errors
/// Fails with `no active transaction` if the session's transaction is not
/// active, with `unable to commit send log changes: …` if the final commit
/// fails, and propagates rusqlite errors from the earlier statements.
pub fn commit(&mut self, id: &Xid) -> Result<(), anyhow::Error> {
    if !self.tx_active {
        anyhow::bail!("no active transaction");
    }
    let db_conn = &self.log.db_conn;

    // Drop everything this session did not touch.
    for purge_sql in [
        "delete from StatCache where LatestSessionId != ?;",
        "delete from Sent where LatestSessionId != ?;",
    ] {
        db_conn.execute(purge_sql, [&self.session_id])?;
    }

    // Tag what is left with the item id of this send.
    for tag_sql in [
        "update StatCache set ItemId = ? where LatestSessionId = ?;",
        "update Sent set ItemId = ? where LatestSessionId = ?;",
    ] {
        db_conn.execute(tag_sql, [id, &self.session_id])?;
    }

    db_conn.execute(
        "insert or replace into LogMeta(Key, Value) Values('last-send-id', ?);",
        [id],
    )?;

    // Marked inactive before the commit is attempted, as in `checkpoint`.
    self.tx_active = false;
    if let Err(err) = db_conn.execute("commit;", []) {
        return Err(anyhow::format_err!(
            "unable to commit send log changes: {}",
            err
        ));
    }
    Ok(())
}
}
/// Rolls back the session's transaction if it is still marked active.
///
/// # Panics
/// Panics if `rollback;` fails, unless the thread is already unwinding from
/// another panic. A second panic inside `drop` would abort the process and
/// hide the original failure, so in that case the rollback error is dropped.
impl<'a> Drop for SendLogSession<'a> {
    fn drop(&mut self) {
        if !self.tx_active {
            return;
        }
        if let Err(err) = self.log.db_conn.execute("rollback;", []) {
            if !std::thread::panicking() {
                panic!("unable to roll back send log transaction: {}", err);
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    // Stat cache key shared by every test: a zero-length byte array.
    const HASH: [u8; 0] = [32; 0];

    // Location of the send log inside a test's temporary directory.
    fn log_path_in(dir: &tempfile::TempDir) -> PathBuf {
        dir.path().join("send.log")
    }

    // A stat cache entry with no addresses, cursors or hashes.
    fn empty_entry() -> StatCacheEntry<'static, 'static, 'static> {
        StatCacheEntry {
            addresses: Cow::Owned(Vec::new()),
            data_cursors: Cow::Owned(Vec::new()),
            hashes: Cow::Owned(Vec::new()),
        }
    }

    // Adds `addr` to the sent set and an empty entry under `HASH`.
    fn populate(session: &SendLogSession<'_>, addr: &Address) {
        session.add_address(addr).unwrap();
        session.add_stat_cache_data(&HASH, &empty_entry()).unwrap();
    }

    // Asserts that both the address and the stat cache entry are (or are not)
    // present, checking the address first.
    fn assert_cached(session: &SendLogSession<'_>, addr: &Address, expected: bool) {
        assert_eq!(session.cached_address(addr).unwrap(), expected);
        assert_eq!(
            session
                .stat_cache_lookup_and_update(&HASH)
                .unwrap()
                .is_some(),
            expected
        );
    }

    #[test]
    fn cache_commit() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let log_path = log_path_in(&tmp_dir);
        let gc_generation = Xid::new();
        let id = Xid::new();
        let addr = Address::default();

        // Populate an empty log and commit it under `id`.
        let mut sendlog = SendLog::open(&log_path).unwrap();
        {
            let mut session = sendlog.session(gc_generation).unwrap();
            assert_cached(&session, &addr, false);
            populate(&session, &addr);
            assert_cached(&session, &addr, true);
            session.commit(&id).unwrap();
        }
        drop(sendlog);

        // Same generation, no send id: the committed data survives.
        let mut sendlog = SendLog::open(&log_path).unwrap();
        {
            let session = sendlog.session(gc_generation).unwrap();
            assert_eq!(session.last_send_id().unwrap(), Some(id));
            session.perform_cache_invalidations(false).unwrap();
            assert_cached(&session, &addr, true);
        }
        drop(sendlog);

        // Same generation, with send id: the committed data survives.
        let mut sendlog = SendLog::open(&log_path).unwrap();
        {
            let session = sendlog.session(gc_generation).unwrap();
            assert_eq!(session.last_send_id().unwrap(), Some(id));
            session.perform_cache_invalidations(true).unwrap();
            assert_cached(&session, &addr, true);
        }
        drop(sendlog);

        // New generation, no send id: everything is invalidated.
        let mut sendlog = SendLog::open(&log_path).unwrap();
        {
            let session = sendlog.session(Xid::new()).unwrap();
            assert_eq!(session.last_send_id().unwrap(), Some(id));
            session.perform_cache_invalidations(false).unwrap();
            assert_cached(&session, &addr, false);
        }
        drop(sendlog);
    }

    #[test]
    fn cache_checkpoint() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let log_path = log_path_in(&tmp_dir);
        let gc_generation = Xid::new();
        let addr = Address::default();

        // Populate and checkpoint without ever committing.
        let mut sendlog = SendLog::open(&log_path).unwrap();
        {
            let mut session = sendlog.session(gc_generation).unwrap();
            populate(&session, &addr);
            session.checkpoint().unwrap();
        }
        drop(sendlog);

        // Checkpointed data is visible on the next two opens of the same
        // generation.
        let mut sendlog = SendLog::open(&log_path).unwrap();
        {
            let session = sendlog.session(gc_generation).unwrap();
            session.perform_cache_invalidations(false).unwrap();
            assert_cached(&session, &addr, true);
        }
        drop(sendlog);

        let mut sendlog = SendLog::open(&log_path).unwrap();
        {
            let session = sendlog.session(gc_generation).unwrap();
            session.perform_cache_invalidations(false).unwrap();
            assert_cached(&session, &addr, true);
        }
        drop(sendlog);

        // A new generation invalidates it.
        let mut sendlog = SendLog::open(&log_path).unwrap();
        {
            let session = sendlog.session(Xid::new()).unwrap();
            session.perform_cache_invalidations(false).unwrap();
            assert_cached(&session, &addr, false);
        }
        drop(sendlog);
    }

    #[test]
    fn cache_commit_then_checkpoint() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let log_path = log_path_in(&tmp_dir);
        let gc_generation = Xid::new();
        let id = Xid::new();
        let addr = Address::default();

        // Commit once.
        let mut sendlog = SendLog::open(&log_path).unwrap();
        {
            let mut session = sendlog.session(gc_generation).unwrap();
            populate(&session, &addr);
            session.commit(&id).unwrap();
        }
        drop(sendlog);

        // Touch the same data again, but only checkpoint.
        let mut sendlog = SendLog::open(&log_path).unwrap();
        {
            let mut session = sendlog.session(gc_generation).unwrap();
            populate(&session, &addr);
            session.checkpoint().unwrap();
        }
        drop(sendlog);

        // An empty session that is checkpointed.
        let mut sendlog = SendLog::open(&log_path).unwrap();
        {
            let mut session = sendlog.session(gc_generation).unwrap();
            session.checkpoint().unwrap();
        }
        drop(sendlog);

        // The data survives both flavours of invalidation in the same
        // generation.
        let mut sendlog = SendLog::open(&log_path).unwrap();
        {
            let session = sendlog.session(gc_generation).unwrap();
            assert_cached(&session, &addr, true);
            session.perform_cache_invalidations(true).unwrap();
            assert_cached(&session, &addr, true);
            session.perform_cache_invalidations(false).unwrap();
            assert_cached(&session, &addr, true);
        }
        drop(sendlog);

        // A new generation invalidates it.
        let mut sendlog = SendLog::open(&log_path).unwrap();
        {
            let session = sendlog.session(Xid::new()).unwrap();
            session.perform_cache_invalidations(false).unwrap();
            assert_cached(&session, &addr, false);
        }
        drop(sendlog);
    }
}