use crate::catalog::schema::TableSchema;
use crate::catalog::types::Value;
use crate::commit::tx::{ReadBound, ReadKey, ReadRange, ReadRangeEntry, ReadSet, ReadSetEntry};
use crate::storage::encoded_key::EncodedKey;
use crate::storage::keyspace::{KeyspaceSnapshot, TableData};
#[derive(Debug, Default)]
pub struct ReadSetCollector {
set: ReadSet,
}
impl ReadSetCollector {
pub fn new() -> Self {
Self::default()
}
pub fn into_inner(self) -> ReadSet {
self.set
}
pub fn record_point(
&mut self,
snapshot: &KeyspaceSnapshot,
project_id: &str,
scope_id: &str,
table_name: &str,
primary_key: Vec<Value>,
) {
let encoded = EncodedKey::from_values(&primary_key);
let version_at_read = snapshot
.table(project_id, scope_id, table_name)
.and_then(|t| t.row_versions.get(&encoded).copied())
.unwrap_or(0);
self.set.points.push(ReadSetEntry {
key: ReadKey::TableRow {
project_id: project_id.to_string(),
scope_id: scope_id.to_string(),
table_name: table_name.to_string(),
primary_key,
},
version_at_read,
});
}
pub fn record_touched_pks(
&mut self,
snapshot: &KeyspaceSnapshot,
schema: &TableSchema,
project_id: &str,
scope_id: &str,
table_name: &str,
pks: &[EncodedKey],
) {
let table = snapshot.table(project_id, scope_id, table_name);
let pk_column_indices = pk_column_indices_in_schema(schema);
for pk in pks {
let version_at_read = table
.and_then(|t| t.row_versions.get(pk).copied())
.unwrap_or(0);
let primary_key = extract_pk_values(table, pk, &pk_column_indices);
self.set.points.push(ReadSetEntry {
key: ReadKey::TableRow {
project_id: project_id.to_string(),
scope_id: scope_id.to_string(),
table_name: table_name.to_string(),
primary_key,
},
version_at_read,
});
}
}
pub fn record_full_table_scan(
&mut self,
snapshot: &KeyspaceSnapshot,
project_id: &str,
scope_id: &str,
table_name: &str,
) {
let table = snapshot.table(project_id, scope_id, table_name);
let (max_version, structural_version) = match table {
Some(t) => (
t.row_versions.values().copied().max().unwrap_or(0),
t.structural_version,
),
None => (0, 0),
};
self.set.ranges.push(ReadRangeEntry {
range: ReadRange::TableRange {
project_id: project_id.to_string(),
scope_id: scope_id.to_string(),
table_name: table_name.to_string(),
start: ReadBound::Unbounded,
end: ReadBound::Unbounded,
},
max_version_at_read: max_version,
structural_version_at_read: structural_version,
});
}
}
fn pk_column_indices_in_schema(schema: &TableSchema) -> Vec<usize> {
schema
.primary_key
.iter()
.filter_map(|pk| schema.columns.iter().position(|c| c.name == *pk))
.collect()
}
fn extract_pk_values(
table: Option<&TableData>,
pk_encoded: &EncodedKey,
pk_column_indices: &[usize],
) -> Vec<Value> {
if let Some(t) = table
&& let Some(row) = t.rows.get(pk_encoded)
{
return pk_column_indices
.iter()
.filter_map(|i| row.values.get(*i).cloned())
.collect();
}
vec![Value::Blob(pk_encoded.as_slice().to_vec())]
}