use std::collections::HashMap;
use crate::sql::db::table::Table;
use super::{ActiveTxRegistry, MvccClock, TxHandle};
/// State owned by one in-flight concurrent transaction.
///
/// Built by [`ConcurrentTx::begin`], which registers the transaction and
/// deep-copies every live table so the transaction works on private data.
#[derive(Debug)]
pub struct ConcurrentTx {
/// Handle returned by `ActiveTxRegistry::register`; supplies the begin
/// timestamp reported by [`ConcurrentTx::begin_ts`].
// NOTE(review): the tests in this file show the registry's active count
// dropping when the transaction is dropped, presumably via this handle's
// `Drop` impl — confirm in `TxHandle`.
pub handle: TxHandle,
/// Deep copies of the live tables taken at begin, keyed by table name.
// NOTE(review): presumably the working set the transaction mutates — confirm
// against the executor.
pub tables: HashMap<String, Table>,
/// A second, separate deep copy of the live tables taken at begin.
// NOTE(review): presumably kept untouched as the baseline to diff `tables`
// against at commit — confirm against the commit path.
pub tables_at_begin: HashMap<String, Table>,
/// Names of the live tables at begin, sorted ascending; compared against
/// the current table names by [`ConcurrentTx::schema_unchanged`].
pub schema_at_begin: Vec<String>,
}
impl ConcurrentTx {
    /// Starts a concurrent transaction over `live_tables`.
    ///
    /// The transaction is first registered with `registry` (passing `clock`),
    /// which yields the handle stored in the result. Two independent deep
    /// copies of every live table are then taken — one for `tables`, one for
    /// `tables_at_begin` — and the table names are recorded in sorted order
    /// as `schema_at_begin`.
    pub fn begin(
        clock: &MvccClock,
        registry: &ActiveTxRegistry,
        live_tables: &HashMap<String, Table>,
    ) -> Self {
        // Registration happens before any table is copied.
        let handle = registry.register(clock);

        // Each call produces a fresh, fully independent copy of the live tables.
        let snapshot = || -> HashMap<String, Table> {
            live_tables
                .iter()
                .map(|(name, table)| (name.clone(), table.deep_clone()))
                .collect()
        };
        let tables = snapshot();
        let tables_at_begin = snapshot();

        // HashMap iteration order is arbitrary, so sort the names to get a
        // canonical form that can be compared later.
        let mut schema_at_begin: Vec<String> = live_tables.keys().cloned().collect();
        schema_at_begin.sort();

        Self {
            handle,
            tables,
            tables_at_begin,
            schema_at_begin,
        }
    }

    /// Returns the begin timestamp reported by this transaction's handle.
    pub fn begin_ts(&self) -> u64 {
        self.handle.begin_ts()
    }

    /// Reports whether `live_tables` has exactly the table names recorded in
    /// `schema_at_begin`.
    ///
    /// Only the names are compared; the contents of the tables are not
    /// inspected by this check.
    pub fn schema_unchanged(&self, live_tables: &HashMap<String, Table>) -> bool {
        let mut live_names: Vec<&String> = live_tables.keys().collect();
        live_names.sort();
        // `Iterator::eq` is true only when both sequences have the same length
        // and are element-wise equal.
        live_names.into_iter().eq(self.schema_at_begin.iter())
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for [`ConcurrentTx`]: registration, snapshot isolation and
    //! schema-change detection.

    use super::*;
    use crate::sql::db::table::Table;
    use crate::sql::parser::create::CreateQuery;
    use std::collections::HashMap;

    /// Builds an empty two-column table called `name` by parsing a
    /// `CREATE TABLE` statement with the project's SQL dialect.
    fn empty_table(name: &str) -> Table {
        use crate::sql::dialect::SqlriteDialect;
        use sqlparser::parser::Parser;

        let sql = format!(
            "CREATE TABLE {name} (id INTEGER PRIMARY KEY, v TEXT);",
            name = name,
        );
        let dialect = SqlriteDialect::new();
        let mut ast = Parser::parse_sql(&dialect, &sql).expect("fixture SQL must parse");
        let stmt = ast.pop().expect("fixture SQL yields one statement");
        let q = CreateQuery::new(&stmt).expect("fixture statement is a valid CREATE TABLE");
        Table::new(q)
    }

    /// Returns a live-table map holding a single empty table called `name`.
    fn live_with_one_table(name: &str) -> HashMap<String, Table> {
        let mut m = HashMap::new();
        m.insert(name.to_string(), empty_table(name));
        m
    }

    #[test]
    fn begin_clones_tables_and_advances_clock() {
        let clock = MvccClock::new(0);
        let registry = ActiveTxRegistry::new();
        let live = live_with_one_table("t");
        let tx = ConcurrentTx::begin(&clock, &registry, &live);
        assert_eq!(clock.now(), 1);
        assert_eq!(tx.begin_ts(), 1);
        assert!(tx.tables.contains_key("t"));
        assert_eq!(tx.schema_at_begin, vec!["t".to_string()]);
        assert_eq!(registry.active_count(), 1);
    }

    #[test]
    fn dropping_tx_unregisters() {
        let clock = MvccClock::new(0);
        let registry = ActiveTxRegistry::new();
        let live = live_with_one_table("t");
        let tx = ConcurrentTx::begin(&clock, &registry, &live);
        assert_eq!(registry.active_count(), 1);
        drop(tx);
        assert_eq!(registry.active_count(), 0);
    }

    #[test]
    fn clone_is_independent_of_live_tables() {
        let clock = MvccClock::new(0);
        let registry = ActiveTxRegistry::new();
        let mut live = live_with_one_table("t");
        let tx = ConcurrentTx::begin(&clock, &registry, &live);
        // A table added to the live map after begin must not appear in the
        // transaction's snapshot, and must be reported as a schema change.
        live.insert("u".to_string(), empty_table("u"));
        assert_eq!(tx.tables.len(), 1);
        assert!(tx.tables.contains_key("t"));
        assert!(!tx.tables.contains_key("u"));
        assert!(!tx.schema_unchanged(&live));
    }

    #[test]
    fn schema_unchanged_recognises_identical_set() {
        let clock = MvccClock::new(0);
        let registry = ActiveTxRegistry::new();
        let live = live_with_one_table("t");
        let tx = ConcurrentTx::begin(&clock, &registry, &live);
        assert!(tx.schema_unchanged(&live));
    }
}