use std::collections::{BTreeMap, BTreeSet};
use fsqlite_types::{
BtreeRef, ColumnIdx, CommitSeq, IntentFootprint, IntentOp, IntentOpKind, PageNumber,
RebaseExpr, SemanticKeyKind, SemanticKeyRef, SqliteValue, StructuralEffects,
};
use crate::physical_merge::StructuredPagePatch;
/// Storage form of a single page version inside a [`CompressedPageHistory`].
#[derive(Debug, Clone, PartialEq)]
pub enum CompressedVersionData {
    /// Complete raw page image bytes.
    FullImage(Vec<u8>),
    /// Logical intent ops for this version. Currently always produced and
    /// decoded as an empty list (see `compress_page_history` / `from_bytes`).
    IntentLogPatch(Vec<IntentOp>),
    /// Cell-level structured page patch (payload not yet wire-encoded).
    StructuredPatch(StructuredPagePatch),
}
/// One version of a page, tagged with the commit that produced it.
#[derive(Debug, Clone, PartialEq)]
pub struct CompressedPageVersion {
    /// Commit sequence number that produced this version.
    pub commit_seq: CommitSeq,
    /// How the version is stored (full image, intent patch, structured patch).
    pub data: CompressedVersionData,
}
/// The compressed version chain of a single database page.
///
/// By construction (`compress_page_history`) the entry at index 0 is the
/// newest version and is stored as a full image.
#[derive(Debug, Clone, PartialEq)]
pub struct CompressedPageHistory {
    /// Page this history belongs to.
    pub pgno: PageNumber,
    /// Versions, newest first.
    pub versions: Vec<CompressedPageVersion>,
}
/// Failures raised while compressing or (de)serializing a page history.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HistoryCompressionError {
    /// No versions were supplied.
    EmptyHistory,
    /// The newest version must be stored as a full page image.
    NewestNotFullImage,
    /// The ECS byte stream was malformed; message describes where.
    DecodeError(String),
}
impl std::fmt::Display for HistoryCompressionError {
    /// Renders a short, stable, human-readable description of the failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::DecodeError(msg) => write!(f, "ECS decode error: {msg}"),
            Self::EmptyHistory => write!(f, "cannot compress empty page history"),
            Self::NewestNotFullImage => write!(f, "newest version must be a full page image"),
        }
    }
}
// Marker impl so the error can be boxed / propagated as `dyn Error`.
impl std::error::Error for HistoryCompressionError {}
#[allow(clippy::module_name_repetitions)]
/// Compresses an ordered set of full page images (newest first) into a
/// [`CompressedPageHistory`].
///
/// The newest entry is kept as a full image; every older entry currently
/// degrades to an empty intent-log patch placeholder.
///
/// # Errors
/// Returns [`HistoryCompressionError::EmptyHistory`] when `full_images` is
/// empty.
pub fn compress_page_history(
    pgno: PageNumber,
    full_images: &[(CommitSeq, Vec<u8>)],
) -> Result<CompressedPageHistory, HistoryCompressionError> {
    let Some((newest, older)) = full_images.split_first() else {
        return Err(HistoryCompressionError::EmptyHistory);
    };
    let mut versions = Vec::with_capacity(full_images.len());
    versions.push(CompressedPageVersion {
        commit_seq: newest.0,
        data: CompressedVersionData::FullImage(newest.1.clone()),
    });
    versions.extend(older.iter().map(|(seq, _)| CompressedPageVersion {
        commit_seq: *seq,
        data: CompressedVersionData::IntentLogPatch(Vec::new()),
    }));
    Ok(CompressedPageHistory { pgno, versions })
}
// One-byte wire tags identifying `CompressedVersionData` variants in
// `CompressedPageHistory::to_bytes` / `from_bytes`.
const TAG_FULL_IMAGE: u8 = 0;
const TAG_INTENT_LOG_PATCH: u8 = 1;
const TAG_STRUCTURED_PATCH: u8 = 2;
impl CompressedPageHistory {
    /// Serializes the history to the ECS wire format.
    ///
    /// Layout: `pgno: u32 LE | version_count: u32 LE`, then per version
    /// `commit_seq: u64 LE | tag: u8 | payload_len: u32 LE | payload`.
    /// `StructuredPatch` payloads are not yet encoded (length-0 placeholder).
    #[must_use]
    pub fn to_bytes(&self) -> Vec<u8> {
        let mut buf = Vec::with_capacity(128);
        buf.extend_from_slice(&self.pgno.get().to_le_bytes());
        #[allow(clippy::cast_possible_truncation)]
        let version_count = self.versions.len() as u32;
        buf.extend_from_slice(&version_count.to_le_bytes());
        for v in &self.versions {
            buf.extend_from_slice(&v.commit_seq.get().to_le_bytes());
            match &v.data {
                CompressedVersionData::FullImage(img) => {
                    buf.push(TAG_FULL_IMAGE);
                    #[allow(clippy::cast_possible_truncation)]
                    let len = img.len() as u32;
                    buf.extend_from_slice(&len.to_le_bytes());
                    buf.extend_from_slice(img);
                }
                CompressedVersionData::IntentLogPatch(ops) => {
                    buf.push(TAG_INTENT_LOG_PATCH);
                    let payload = canonical_intent_ops_bytes(ops);
                    #[allow(clippy::cast_possible_truncation)]
                    let len = payload.len() as u32;
                    buf.extend_from_slice(&len.to_le_bytes());
                    buf.extend_from_slice(&payload);
                }
                CompressedVersionData::StructuredPatch(_patch) => {
                    // Placeholder: structured patches are written with an
                    // empty payload until a canonical encoding exists.
                    buf.push(TAG_STRUCTURED_PATCH);
                    buf.extend_from_slice(&0u32.to_le_bytes());
                }
            }
        }
        buf
    }
    /// Parses a buffer produced by [`Self::to_bytes`].
    ///
    /// # Errors
    /// Returns [`HistoryCompressionError::DecodeError`] on truncated input,
    /// page number 0, or an unknown version tag. Intent-log payloads are
    /// currently discarded and decoded as an empty op list (matching the
    /// encoder placeholder).
    pub fn from_bytes(data: &[u8]) -> Result<Self, HistoryCompressionError> {
        let err = |msg: &str| HistoryCompressionError::DecodeError(msg.to_owned());
        if data.len() < 8 {
            return Err(err("buffer too short for header"));
        }
        let pgno_raw = u32::from_le_bytes(
            data[..4]
                .try_into()
                .map_err(|_| err("pgno decode failed"))?,
        );
        let pgno = PageNumber::new(pgno_raw).ok_or_else(|| err("page number 0 is invalid"))?;
        let version_count = u32::from_le_bytes(
            data[4..8]
                .try_into()
                .map_err(|_| err("version count decode failed"))?,
        );
        let mut offset = 8usize;
        // Cap the pre-allocation by what the buffer could actually hold
        // (each version occupies at least 13 bytes: 8 seq + 1 tag + 4 len),
        // so an adversarial `version_count` in an untrusted buffer cannot
        // force a multi-gigabyte up-front allocation before we hit a
        // truncation error.
        let max_plausible = data.len().saturating_sub(8) / 13;
        let mut versions = Vec::with_capacity((version_count as usize).min(max_plausible));
        for _ in 0..version_count {
            if offset + 9 > data.len() {
                return Err(err("truncated version entry"));
            }
            let commit_seq = CommitSeq::new(u64::from_le_bytes(
                data[offset..offset + 8]
                    .try_into()
                    .map_err(|_| err("commit_seq decode"))?,
            ));
            offset += 8;
            let tag = data[offset];
            offset += 1;
            if offset + 4 > data.len() {
                return Err(err("truncated payload length"));
            }
            let payload_len = u32::from_le_bytes(
                data[offset..offset + 4]
                    .try_into()
                    .map_err(|_| err("payload_len decode"))?,
            ) as usize;
            offset += 4;
            // checked_add: `offset + payload_len` must not wrap on 32-bit
            // targets before the bounds comparison.
            let payload_end = offset
                .checked_add(payload_len)
                .filter(|&end| end <= data.len())
                .ok_or_else(|| err("truncated payload"))?;
            let payload = &data[offset..payload_end];
            offset = payload_end;
            let version_data = match tag {
                TAG_FULL_IMAGE => CompressedVersionData::FullImage(payload.to_vec()),
                TAG_INTENT_LOG_PATCH => {
                    // Payload intentionally ignored for now: the encoder
                    // writes a canonical op stream but no decoder exists yet.
                    let _ = payload;
                    CompressedVersionData::IntentLogPatch(Vec::new())
                }
                TAG_STRUCTURED_PATCH => {
                    CompressedVersionData::StructuredPatch(StructuredPagePatch::default())
                }
                other => return Err(err(&format!("unknown version data tag: {other}"))),
            };
            versions.push(CompressedPageVersion {
                commit_seq,
                data: version_data,
            });
        }
        Ok(Self { pgno, versions })
    }
}
/// Encodes a slice of ops as `count: u32 LE` followed by each op's canonical
/// byte encoding.
fn canonical_intent_ops_bytes(ops: &[IntentOp]) -> Vec<u8> {
    #[allow(clippy::cast_possible_truncation)]
    let count = ops.len() as u32;
    let mut out = Vec::with_capacity(ops.len() * 32);
    out.extend_from_slice(&count.to_le_bytes());
    ops.iter()
        .for_each(|op| out.extend_from_slice(&canonical_intent_bytes(op)));
    out
}
/// Computes the 128-bit identity digest of an intent op: domain-separated
/// BLAKE3 over the op's canonical byte encoding, truncated to 16 bytes.
#[must_use]
pub fn compute_op_digest(op: &IntentOp) -> [u8; 16] {
    let mut hasher = blake3::Hasher::new();
    // Domain separation tag keeps these digests distinct from other BLAKE3
    // uses in the system.
    hasher.update(b"fsqlite:intent:v1");
    hasher.update(&canonical_intent_bytes(op));
    let mut digest = [0u8; 16];
    digest.copy_from_slice(&hasher.finalize().as_bytes()[..16]);
    digest
}
#[must_use]
pub fn compute_footprint_digest(footprints: &[&IntentFootprint]) -> [u8; 16] {
let mut hasher = blake3::Hasher::new();
hasher.update(b"fsqlite:footprint:v1");
for fp in footprints {
#[allow(clippy::cast_possible_truncation)]
let reads_len = fp.reads.len() as u32;
hasher.update(&reads_len.to_le_bytes());
for r in &fp.reads {
hasher.update(&r.key_digest);
}
#[allow(clippy::cast_possible_truncation)]
let writes_len = fp.writes.len() as u32;
hasher.update(&writes_len.to_le_bytes());
for w in &fp.writes {
hasher.update(&w.key_digest);
}
hasher.update(&fp.structural.bits().to_le_bytes());
}
let hash = hasher.finalize();
let mut digest = [0u8; 16];
digest.copy_from_slice(&hash.as_bytes()[..16]);
digest
}
#[must_use]
pub fn are_intent_ops_independent(a: &IntentOp, b: &IntentOp) -> bool {
if a.schema_epoch != b.schema_epoch {
return false;
}
if a.footprint.structural != StructuralEffects::NONE
|| b.footprint.structural != StructuralEffects::NONE
{
return false;
}
if let Some(independent) = check_update_expression_pair(&a.op, &b.op) {
return independent;
}
let writes_a: BTreeSet<&[u8; 16]> = a.footprint.writes.iter().map(|w| &w.key_digest).collect();
let writes_b: BTreeSet<&[u8; 16]> = b.footprint.writes.iter().map(|w| &w.key_digest).collect();
let reads_a: BTreeSet<&[u8; 16]> = a.footprint.reads.iter().map(|r| &r.key_digest).collect();
let reads_b: BTreeSet<&[u8; 16]> = b.footprint.reads.iter().map(|r| &r.key_digest).collect();
if !writes_a.is_disjoint(&writes_b) {
return false;
}
if !writes_a.is_disjoint(&reads_b) {
return false;
}
if !writes_b.is_disjoint(&reads_a) {
return false;
}
true
}
/// Column-level commutativity analysis for pairs involving `UpdateExpression`.
///
/// Returns `Some(true)` when the pair provably commutes, `Some(false)` when it
/// provably conflicts, and `None` when this analysis does not apply (the
/// caller falls back to footprint disjointness).
fn check_update_expression_pair(a: &IntentOpKind, b: &IntentOpKind) -> Option<bool> {
    match (a, b) {
        // Two expression-updates: compare at column granularity.
        (
            IntentOpKind::UpdateExpression {
                table: ta,
                key: ka,
                column_updates: cols_a,
            },
            IntentOpKind::UpdateExpression {
                table: tb,
                key: kb,
                column_updates: cols_b,
            },
        ) => {
            // Different rows: defer to the generic footprint check.
            if ta != tb || ka != kb {
                return None;
            }
            let written_a: BTreeSet<ColumnIdx> = cols_a.iter().map(|(c, _)| *c).collect();
            let written_b: BTreeSet<ColumnIdx> = cols_b.iter().map(|(c, _)| *c).collect();
            // Disjoint column sets on the same row always commute.
            if written_a.is_disjoint(&written_b) {
                return Some(true);
            }
            let overlap: BTreeSet<ColumnIdx> =
                written_a.intersection(&written_b).copied().collect();
            // Overlapping columns commute only if both sides are join-style
            // `MAX(col, const)` updates (idempotent, order-insensitive).
            let all_join_max = overlap.iter().all(|col_idx| {
                let a_expr = cols_a.iter().find(|(c, _)| c == col_idx).map(|(_, e)| e);
                let b_expr = cols_b.iter().find(|(c, _)| c == col_idx).map(|(_, e)| e);
                match (a_expr, b_expr) {
                    (Some(ea), Some(eb)) => {
                        is_join_max_int_update(*col_idx, ea) && is_join_max_int_update(*col_idx, eb)
                    }
                    _ => false,
                }
            });
            Some(all_join_max)
        }
        // Expression-update vs. materialized Update/Delete: same row always
        // conflicts; different rows defer to the footprint check.
        (
            IntentOpKind::UpdateExpression {
                table: ta, key: ka, ..
            },
            IntentOpKind::Update {
                table: tb, key: kb, ..
            },
        )
        | (
            IntentOpKind::Update {
                table: tb, key: kb, ..
            },
            IntentOpKind::UpdateExpression {
                table: ta, key: ka, ..
            },
        )
        | (
            IntentOpKind::UpdateExpression {
                table: ta, key: ka, ..
            },
            IntentOpKind::Delete {
                table: tb, key: kb, ..
            },
        )
        | (
            IntentOpKind::Delete {
                table: tb, key: kb, ..
            },
            IntentOpKind::UpdateExpression {
                table: ta, key: ka, ..
            },
        ) => {
            if ta == tb && ka == kb {
                Some(false)
            } else {
                None
            }
        }
        // No UpdateExpression involved: analysis does not apply.
        _ => None,
    }
}
/// Recognizes a join-style update `MAX(col, <int literal>)` on `col_idx`
/// (either argument order; function name matched case-insensitively).
#[must_use]
pub fn is_join_max_int_update(col_idx: ColumnIdx, expr: &RebaseExpr) -> bool {
    let RebaseExpr::FunctionCall { name, args } = expr else {
        return false;
    };
    if !name.eq_ignore_ascii_case("MAX") || args.len() != 2 {
        return false;
    }
    is_column_ref_and_int_literal(col_idx, &args[0], &args[1])
        || is_column_ref_and_int_literal(col_idx, &args[1], &args[0])
}
/// True iff `maybe_col` references exactly `col_idx` and `maybe_lit` is an
/// integer literal.
fn is_column_ref_and_int_literal(
    col_idx: ColumnIdx,
    maybe_col: &RebaseExpr,
    maybe_lit: &RebaseExpr,
) -> bool {
    let col_matches = match maybe_col {
        RebaseExpr::ColumnRef(c) => *c == col_idx,
        _ => false,
    };
    col_matches && matches!(maybe_lit, RebaseExpr::Literal(SqliteValue::Integer(_)))
}
/// Pulls the integer literal out of a `MAX(col, N)` / `MAX(N, col)` update on
/// `col_idx`; `None` when the expression is not of that shape.
#[must_use]
pub fn extract_join_max_constant(col_idx: ColumnIdx, expr: &RebaseExpr) -> Option<i64> {
    let RebaseExpr::FunctionCall { name, args } = expr else {
        return None;
    };
    if !name.eq_ignore_ascii_case("MAX") || args.len() != 2 {
        return None;
    }
    extract_int_constant_pair(col_idx, &args[0], &args[1])
        .or_else(|| extract_int_constant_pair(col_idx, &args[1], &args[0]))
}
/// Returns the literal when `maybe_col` references exactly `col_idx` and
/// `maybe_lit` is an integer literal; otherwise `None`.
fn extract_int_constant_pair(
    col_idx: ColumnIdx,
    maybe_col: &RebaseExpr,
    maybe_lit: &RebaseExpr,
) -> Option<i64> {
    match (maybe_col, maybe_lit) {
        (RebaseExpr::ColumnRef(c), RebaseExpr::Literal(SqliteValue::Integer(n)))
            if *c == col_idx =>
        {
            Some(*n)
        }
        _ => None,
    }
}
/// Collapses several `MAX(col, const)` updates on the same column into a
/// single `MAX(col, max_of_constants)` expression.
///
/// Expressions that are not recognized as join-max updates are ignored;
/// returns `None` when no expression is recognized.
#[must_use]
pub fn collapse_join_max_updates(col_idx: ColumnIdx, exprs: &[&RebaseExpr]) -> Option<RebaseExpr> {
    // Single pass: no intermediate Vec, and `?` on the empty-iterator case
    // replaces the earlier is_empty()-then-expect dance.
    let max_c = exprs
        .iter()
        .filter_map(|e| extract_join_max_constant(col_idx, e))
        .max()?;
    Some(RebaseExpr::FunctionCall {
        name: "MAX".to_owned(),
        args: vec![
            RebaseExpr::ColumnRef(col_idx),
            RebaseExpr::Literal(SqliteValue::Integer(max_c)),
        ],
    })
}
/// Whether an op is eligible for safe merging: only row- or index-level ops
/// with no structural side effects qualify.
#[must_use]
pub fn is_mergeable_intent(op: &IntentOp) -> bool {
    let kind_ok = matches!(
        op.op,
        IntentOpKind::Insert { .. }
            | IntentOpKind::Delete { .. }
            | IntentOpKind::Update { .. }
            | IntentOpKind::UpdateExpression { .. }
            | IntentOpKind::IndexInsert { .. }
            | IntentOpKind::IndexDelete { .. }
    );
    kind_ok && op.footprint.structural == StructuralEffects::NONE
}
/// Lexicographic key giving ops a deterministic order within a Foata layer.
///
/// Derived `Ord` compares fields top to bottom: b-tree id, kind
/// (0 = table-level, 1 = index-level), semantic-key digest, op-kind ordinal,
/// and finally the full op digest as a tie-breaker.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct FoataSortKey {
    btree_id: u64,
    kind: u8,
    key_digest: [u8; 16],
    op_kind_tag: u8,
    op_digest: [u8; 16],
}
/// Builds the deterministic [`FoataSortKey`] for an op.
fn op_sort_components(op: &IntentOp) -> FoataSortKey {
    // Ordinal used purely for ordering; independent of the serialization
    // tags in `canonical_op_kind_bytes`.
    let op_kind_tag = match &op.op {
        IntentOpKind::Insert { .. } => 0,
        IntentOpKind::Delete { .. } => 1,
        IntentOpKind::Update { .. } => 2,
        IntentOpKind::UpdateExpression { .. } => 3,
        IntentOpKind::IndexInsert { .. } => 4,
        IntentOpKind::IndexDelete { .. } => 5,
    };
    // Table-level ops sort as kind 0, index-level ops as kind 1.
    let (btree_id, kind, key_digest) = match &op.op {
        IntentOpKind::Insert { table, key, .. }
        | IntentOpKind::Delete { table, key }
        | IntentOpKind::Update { table, key, .. }
        | IntentOpKind::UpdateExpression { table, key, .. } => (
            u64::from(table.get()),
            0u8,
            SemanticKeyRef::compute_digest(
                SemanticKeyKind::TableRow,
                BtreeRef::Table(*table),
                &key.get().to_le_bytes(),
            ),
        ),
        IntentOpKind::IndexInsert { index, key, .. }
        | IntentOpKind::IndexDelete { index, key, .. } => (
            u64::from(index.get()),
            1u8,
            SemanticKeyRef::compute_digest(SemanticKeyKind::IndexEntry, BtreeRef::Index(*index), key),
        ),
    };
    FoataSortKey {
        btree_id,
        kind,
        key_digest,
        op_kind_tag,
        op_digest: compute_op_digest(op),
    }
}
/// Computes the Foata normal form of a trace of intent ops: a layered
/// topological order of the dependency DAG, with a deterministic sort inside
/// each layer, returned as the sequence of op digests.
#[must_use]
pub fn foata_normal_form(ops: &[IntentOp]) -> Vec<[u8; 16]> {
    let n = ops.len();
    if n == 0 {
        return Vec::new();
    }
    // Build the dependency DAG: edge i -> j (i < j) whenever the two ops do
    // not commute. Edges only point forward, so the graph is acyclic and the
    // layered peel below always terminates.
    let mut in_degree = vec![0u32; n];
    let mut successors: Vec<Vec<usize>> = vec![Vec::new(); n];
    for i in 0..n {
        for j in (i + 1)..n {
            if !are_intent_ops_independent(&ops[i], &ops[j]) {
                successors[i].push(j);
                in_degree[j] += 1;
            }
        }
    }
    let mut result = Vec::with_capacity(n);
    let mut remaining = in_degree;
    loop {
        // A Foata layer = every not-yet-emitted op with no unresolved deps.
        let layer: Vec<usize> = (0..n).filter(|&i| remaining[i] == 0).collect();
        if layer.is_empty() {
            break;
        }
        // u32::MAX marks "already emitted" so a node is never re-collected.
        for &i in &layer {
            remaining[i] = u32::MAX;
        }
        // Deterministic order inside the layer; equal keys fall back to the
        // ascending index stored in the tuple, matching a stable sort over
        // the index-ordered layer.
        let mut keyed: Vec<(FoataSortKey, usize)> = layer
            .iter()
            .map(|&i| (op_sort_components(&ops[i]), i))
            .collect();
        keyed.sort_unstable();
        for (key, i) in &keyed {
            // Reuse the digest already computed for the sort key instead of
            // hashing every op a second time via compute_op_digest.
            result.push(key.op_digest);
            for &j in &successors[*i] {
                if remaining[j] != u32::MAX {
                    remaining[j] -= 1;
                }
            }
        }
    }
    result
}
/// Which merge strategy produced a certificate.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum MergeKind {
    /// Intent-op rebase merge.
    Rebase,
    /// Structured page-patch merge.
    StructuredPatch,
    /// Combination of rebase and structured patch.
    RebaseAndPatch,
}
/// Post-merge state snapshot recorded inside a [`MergeCertificate`].
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct MergeCertificatePostState {
    /// Truncated 128-bit BLAKE3 hash of each affected page image.
    pub page_hashes: Vec<(PageNumber, [u8; 16])>,
    /// Hash summarizing B-tree invariants after the merge.
    pub btree_invariant_hash: [u8; 16],
}
/// Verifiable record of a merge: what was merged, in what canonical order,
/// and what state it produced. Checked by `verify_merge_certificate`.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct MergeCertificate {
    /// Strategy used for the merge.
    pub merge_kind: MergeKind,
    /// Commit sequence the merge was based on.
    pub base_commit_seq: u64,
    /// Schema epoch all merged ops belong to.
    pub schema_epoch: u64,
    /// Pages the merge touched.
    pub pages: Vec<PageNumber>,
    /// Digest of every merged intent op (order as supplied).
    pub intent_op_digests: Vec<[u8; 16]>,
    /// Digest over all op footprints.
    pub footprint_digest: [u8; 16],
    /// Foata normal form of the merged ops (canonical order of digests).
    pub normal_form: Vec<[u8; 16]>,
    /// Snapshot of the post-merge state.
    pub post_state: MergeCertificatePostState,
    /// Version of the verifier logic that must check this certificate.
    pub verifier_version: u32,
}
/// Current version of the certificate verification logic; stamped into every
/// generated [`MergeCertificate`].
pub const VERIFIER_VERSION: u32 = 1;
/// Ways a merge certificate can fail verification; each mismatch variant
/// carries the expected (certificate) and actual (recomputed) values.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CertificateVerificationError {
    /// The set of op digests does not match the certificate.
    OpDigestMismatch {
        expected: Vec<[u8; 16]>,
        actual: Vec<[u8; 16]>,
    },
    /// Recomputed footprint digest differs from the certificate's.
    FootprintDigestMismatch {
        expected: [u8; 16],
        actual: [u8; 16],
    },
    /// Recomputed Foata normal form differs from the certificate's.
    InvalidNormalForm,
    /// A post-merge page hash differs from the certificate's record.
    PageHashMismatch {
        page: PageNumber,
        expected: [u8; 16],
        actual: [u8; 16],
    },
    /// The B-tree invariant hash differs from the certificate's record.
    BtreeInvariantHashMismatch {
        expected: [u8; 16],
        actual: [u8; 16],
    },
}
impl std::fmt::Display for CertificateVerificationError {
    /// Short human-readable description; mismatch payloads are intentionally
    /// omitted here (use `Debug` for the raw digests).
    // The stale `#[allow(clippy::too_many_lines)]` was dropped: this method
    // is far below the lint's threshold, so the allow silenced nothing.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::OpDigestMismatch { .. } => f.write_str("op digest mismatch in merge certificate"),
            Self::FootprintDigestMismatch { .. } => {
                f.write_str("footprint digest mismatch in merge certificate")
            }
            Self::InvalidNormalForm => f.write_str("invalid normal form in merge certificate"),
            Self::PageHashMismatch { page, .. } => {
                write!(f, "page hash mismatch for page {page} in merge certificate")
            }
            Self::BtreeInvariantHashMismatch { .. } => {
                f.write_str("B-tree invariant hash mismatch in merge certificate")
            }
        }
    }
}
// Marker impl so the error can be boxed / propagated as `dyn Error`.
impl std::error::Error for CertificateVerificationError {}
/// Event emitted when certificate verification fails, used to trip the
/// safe-merge circuit breaker.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CircuitBreakerEvent {
    /// The verification failure that triggered the event.
    pub error: CertificateVerificationError,
    /// Fingerprint of the failing certificate (see `circuit_breaker_check`).
    pub certificate_digest: [u8; 16],
    /// Always `true` as produced by `circuit_breaker_check`.
    pub disable_safe_merge: bool,
}
/// Builds a [`MergeCertificate`] for a merge of `intent_ops` that produced
/// the given `affected_pages` (page number + full post-merge image) and
/// `btree_invariant_hash`.
///
/// # Errors
/// Currently infallible; the `Result` return type is kept for forward
/// compatibility with callers.
pub fn generate_merge_certificate(
    merge_kind: MergeKind,
    base_commit_seq: u64,
    schema_epoch: u64,
    intent_ops: &[IntentOp],
    affected_pages: &[(PageNumber, Vec<u8>)],
    btree_invariant_hash: [u8; 16],
) -> Result<MergeCertificate, CertificateVerificationError> {
    // Truncated 128-bit BLAKE3 of a page image.
    fn page_hash(bytes: &[u8]) -> [u8; 16] {
        let full = blake3::hash(bytes);
        let mut short = [0u8; 16];
        short.copy_from_slice(&full.as_bytes()[..16]);
        short
    }
    let mut pages = Vec::with_capacity(affected_pages.len());
    let mut page_hashes = Vec::with_capacity(affected_pages.len());
    for (pgno, bytes) in affected_pages {
        pages.push(*pgno);
        page_hashes.push((*pgno, page_hash(bytes)));
    }
    let footprints: Vec<&IntentFootprint> = intent_ops.iter().map(|op| &op.footprint).collect();
    Ok(MergeCertificate {
        merge_kind,
        base_commit_seq,
        schema_epoch,
        pages,
        intent_op_digests: intent_ops.iter().map(compute_op_digest).collect(),
        footprint_digest: compute_footprint_digest(&footprints),
        normal_form: foata_normal_form(intent_ops),
        post_state: MergeCertificatePostState {
            page_hashes,
            btree_invariant_hash,
        },
        verifier_version: VERIFIER_VERSION,
    })
}
/// Re-derives every certified quantity from `intent_ops` / `post_merge_pages`
/// and compares it against `certificate`.
///
/// # Errors
/// Returns the first [`CertificateVerificationError`] encountered, in check
/// order: op digests (as sets), footprint digest, normal form, per-page
/// hashes, then the B-tree invariant hash.
pub fn verify_merge_certificate(
    intent_ops: &[IntentOp],
    post_merge_pages: &[(PageNumber, Vec<u8>)],
    btree_invariant_hash: [u8; 16],
    certificate: &MergeCertificate,
) -> Result<(), CertificateVerificationError> {
    // Op digests are compared as sorted multisets: supply order is not part
    // of the certificate's claim (the normal form covers ordering).
    let mut recomputed_digests: Vec<[u8; 16]> = intent_ops.iter().map(compute_op_digest).collect();
    let mut cert_digests = certificate.intent_op_digests.clone();
    recomputed_digests.sort_unstable();
    cert_digests.sort_unstable();
    if recomputed_digests != cert_digests {
        return Err(CertificateVerificationError::OpDigestMismatch {
            expected: cert_digests,
            actual: recomputed_digests,
        });
    }
    let footprints: Vec<&IntentFootprint> = intent_ops.iter().map(|op| &op.footprint).collect();
    let recomputed_fp_digest = compute_footprint_digest(&footprints);
    if recomputed_fp_digest != certificate.footprint_digest {
        return Err(CertificateVerificationError::FootprintDigestMismatch {
            expected: certificate.footprint_digest,
            actual: recomputed_fp_digest,
        });
    }
    let expected_normal_form = foata_normal_form(intent_ops);
    if expected_normal_form != certificate.normal_form {
        return Err(CertificateVerificationError::InvalidNormalForm);
    }
    let recomputed_page_hashes: BTreeMap<PageNumber, [u8; 16]> = post_merge_pages
        .iter()
        .map(|(pgno, bytes)| {
            let hash = blake3::hash(bytes);
            let mut truncated = [0u8; 16];
            truncated.copy_from_slice(&hash.as_bytes()[..16]);
            (*pgno, truncated)
        })
        .collect();
    for &(pgno, expected_hash) in &certificate.post_state.page_hashes {
        // A page listed in the certificate but absent from `post_merge_pages`
        // must always fail. The previous code substituted an all-zero hash
        // for a missing page, which would falsely verify whenever the
        // certificate itself recorded an all-zero hash for that page.
        match recomputed_page_hashes.get(&pgno) {
            Some(actual) if *actual == expected_hash => {}
            found => {
                return Err(CertificateVerificationError::PageHashMismatch {
                    page: pgno,
                    expected: expected_hash,
                    // Missing pages are still reported with a zero `actual`
                    // to keep the error payload shape unchanged.
                    actual: found.copied().unwrap_or([0u8; 16]),
                });
            }
        }
    }
    if btree_invariant_hash != certificate.post_state.btree_invariant_hash {
        return Err(CertificateVerificationError::BtreeInvariantHashMismatch {
            expected: certificate.post_state.btree_invariant_hash,
            actual: btree_invariant_hash,
        });
    }
    Ok(())
}
/// Wraps a verification failure into a [`CircuitBreakerEvent`] that disables
/// safe merging.
///
/// The certificate fingerprint hashes base commit seq, schema epoch,
/// footprint digest, and normal form so the event can be correlated with the
/// failing merge.
#[must_use]
pub fn circuit_breaker_check(
    verification_error: CertificateVerificationError,
    certificate: &MergeCertificate,
) -> CircuitBreakerEvent {
    let mut material = Vec::with_capacity(128);
    material.extend_from_slice(&certificate.base_commit_seq.to_le_bytes());
    material.extend_from_slice(&certificate.schema_epoch.to_le_bytes());
    material.extend_from_slice(&certificate.footprint_digest);
    certificate
        .normal_form
        .iter()
        .for_each(|digest| material.extend_from_slice(digest));
    let mut certificate_digest = [0u8; 16];
    certificate_digest.copy_from_slice(&blake3::hash(&material).as_bytes()[..16]);
    CircuitBreakerEvent {
        error: verification_error,
        certificate_digest,
        disable_safe_merge: true,
    }
}
/// Canonical byte encoding of an op:
/// `schema_epoch: u64 LE | footprint | op kind`.
fn canonical_intent_bytes(op: &IntentOp) -> Vec<u8> {
    let mut out = Vec::with_capacity(64);
    out.extend_from_slice(&op.schema_epoch.to_le_bytes());
    canonical_footprint_bytes(&mut out, &op.footprint);
    canonical_op_kind_bytes(&mut out, &op.op);
    out
}
fn canonical_footprint_bytes(buf: &mut Vec<u8>, fp: &IntentFootprint) {
#[allow(clippy::cast_possible_truncation)]
let reads_len = fp.reads.len() as u32;
buf.extend_from_slice(&reads_len.to_le_bytes());
for r in &fp.reads {
buf.extend_from_slice(&r.key_digest);
}
#[allow(clippy::cast_possible_truncation)]
let writes_len = fp.writes.len() as u32;
buf.extend_from_slice(&writes_len.to_le_bytes());
for w in &fp.writes {
buf.extend_from_slice(&w.key_digest);
}
buf.extend_from_slice(&fp.structural.bits().to_le_bytes());
}
#[allow(clippy::too_many_lines)]
/// Appends the canonical encoding of an op kind: a one-byte serialization tag
/// followed by the variant's fields (integers LE, byte payloads u32-length
/// prefixed).
///
/// NOTE: these tags (Insert=0 .. UpdateExpression=5) are a separate tag space
/// from the ordering ordinals in `op_sort_components` — do not unify them.
fn canonical_op_kind_bytes(buf: &mut Vec<u8>, op: &IntentOpKind) {
    match op {
        // tag 0: table | key | record (length-prefixed)
        IntentOpKind::Insert { table, key, record } => {
            buf.push(0);
            buf.extend_from_slice(&table.get().to_le_bytes());
            buf.extend_from_slice(&key.get().to_le_bytes());
            #[allow(clippy::cast_possible_truncation)]
            let len = record.len() as u32;
            buf.extend_from_slice(&len.to_le_bytes());
            buf.extend_from_slice(record);
        }
        // tag 1: table | key
        IntentOpKind::Delete { table, key } => {
            buf.push(1);
            buf.extend_from_slice(&table.get().to_le_bytes());
            buf.extend_from_slice(&key.get().to_le_bytes());
        }
        // tag 2: table | key | new_record (length-prefixed)
        IntentOpKind::Update {
            table,
            key,
            new_record,
        } => {
            buf.push(2);
            buf.extend_from_slice(&table.get().to_le_bytes());
            buf.extend_from_slice(&key.get().to_le_bytes());
            #[allow(clippy::cast_possible_truncation)]
            let len = new_record.len() as u32;
            buf.extend_from_slice(&len.to_le_bytes());
            buf.extend_from_slice(new_record);
        }
        // tag 3: index | key (length-prefixed) | rowid
        IntentOpKind::IndexInsert { index, key, rowid } => {
            buf.push(3);
            buf.extend_from_slice(&index.get().to_le_bytes());
            #[allow(clippy::cast_possible_truncation)]
            let len = key.len() as u32;
            buf.extend_from_slice(&len.to_le_bytes());
            buf.extend_from_slice(key);
            buf.extend_from_slice(&rowid.get().to_le_bytes());
        }
        // tag 4: index | key (length-prefixed) | rowid
        IntentOpKind::IndexDelete { index, key, rowid } => {
            buf.push(4);
            buf.extend_from_slice(&index.get().to_le_bytes());
            #[allow(clippy::cast_possible_truncation)]
            let len = key.len() as u32;
            buf.extend_from_slice(&len.to_le_bytes());
            buf.extend_from_slice(key);
            buf.extend_from_slice(&rowid.get().to_le_bytes());
        }
        // tag 5: table | key | count | (column | expr)*
        IntentOpKind::UpdateExpression {
            table,
            key,
            column_updates,
        } => {
            buf.push(5);
            buf.extend_from_slice(&table.get().to_le_bytes());
            buf.extend_from_slice(&key.get().to_le_bytes());
            #[allow(clippy::cast_possible_truncation)]
            let len = column_updates.len() as u32;
            buf.extend_from_slice(&len.to_le_bytes());
            for (col, expr) in column_updates {
                buf.extend_from_slice(&col.get().to_le_bytes());
                canonical_rebase_expr_bytes(buf, expr);
            }
        }
    }
}
/// Appends the canonical encoding of a rebase expression: a one-byte variant
/// tag followed by the variant's fields, recursing into sub-expressions.
/// Optional sub-expressions are prefixed with a presence byte (1/0).
fn canonical_rebase_expr_bytes(buf: &mut Vec<u8>, expr: &RebaseExpr) {
    match expr {
        // tag 0: column index
        RebaseExpr::ColumnRef(col) => {
            buf.push(0);
            buf.extend_from_slice(&col.get().to_le_bytes());
        }
        // tag 1: literal value (see canonical_sqlite_value_bytes)
        RebaseExpr::Literal(val) => {
            buf.push(1);
            canonical_sqlite_value_bytes(buf, val);
        }
        // tag 2: operator byte | operand
        RebaseExpr::UnaryOp { op, operand } => {
            buf.push(2);
            buf.push(*op as u8);
            canonical_rebase_expr_bytes(buf, operand);
        }
        // tag 3: operator byte | left | right
        RebaseExpr::BinaryOp { op, left, right } => {
            buf.push(3);
            buf.push(*op as u8);
            canonical_rebase_expr_bytes(buf, left);
            canonical_rebase_expr_bytes(buf, right);
        }
        // tag 4: name (length-prefixed, case preserved) | arg count | args
        // NOTE(review): name case is preserved, so `MAX` and `max` encode
        // differently even though matching elsewhere is case-insensitive.
        RebaseExpr::FunctionCall { name, args } => {
            buf.push(4);
            #[allow(clippy::cast_possible_truncation)]
            let name_len = name.len() as u32;
            buf.extend_from_slice(&name_len.to_le_bytes());
            buf.extend_from_slice(name.as_bytes());
            #[allow(clippy::cast_possible_truncation)]
            let args_len = args.len() as u32;
            buf.extend_from_slice(&args_len.to_le_bytes());
            for arg in args {
                canonical_rebase_expr_bytes(buf, arg);
            }
        }
        // tag 5: expr | type name (length-prefixed)
        RebaseExpr::Cast { expr, type_name } => {
            buf.push(5);
            canonical_rebase_expr_bytes(buf, expr);
            #[allow(clippy::cast_possible_truncation)]
            let tn_len = type_name.len() as u32;
            buf.extend_from_slice(&tn_len.to_le_bytes());
            buf.extend_from_slice(type_name.as_bytes());
        }
        // tag 6: optional operand | when/then pairs | optional else
        RebaseExpr::Case {
            operand,
            when_clauses,
            else_clause,
        } => {
            buf.push(6);
            if let Some(op) = operand {
                buf.push(1);
                canonical_rebase_expr_bytes(buf, op);
            } else {
                buf.push(0);
            }
            #[allow(clippy::cast_possible_truncation)]
            let when_len = when_clauses.len() as u32;
            buf.extend_from_slice(&when_len.to_le_bytes());
            for (when, then) in when_clauses {
                canonical_rebase_expr_bytes(buf, when);
                canonical_rebase_expr_bytes(buf, then);
            }
            if let Some(el) = else_clause {
                buf.push(1);
                canonical_rebase_expr_bytes(buf, el);
            } else {
                buf.push(0);
            }
        }
        // tag 7: arg count | args
        RebaseExpr::Coalesce(args) => {
            buf.push(7);
            #[allow(clippy::cast_possible_truncation)]
            let args_len = args.len() as u32;
            buf.extend_from_slice(&args_len.to_le_bytes());
            for arg in args {
                canonical_rebase_expr_bytes(buf, arg);
            }
        }
        // tag 8: left | right
        RebaseExpr::NullIf { left, right } => {
            buf.push(8);
            canonical_rebase_expr_bytes(buf, left);
            canonical_rebase_expr_bytes(buf, right);
        }
        // tag 9: left | right
        RebaseExpr::Concat { left, right } => {
            buf.push(9);
            canonical_rebase_expr_bytes(buf, left);
            canonical_rebase_expr_bytes(buf, right);
        }
    }
}
/// Appends the canonical encoding of a SQLite value: a one-byte type tag and
/// a fixed-width or u32-length-prefixed payload.
fn canonical_sqlite_value_bytes(buf: &mut Vec<u8>, val: &SqliteValue) {
    fn put_prefixed(buf: &mut Vec<u8>, data: &[u8]) {
        #[allow(clippy::cast_possible_truncation)]
        let len = data.len() as u32;
        buf.extend_from_slice(&len.to_le_bytes());
        buf.extend_from_slice(data);
    }
    match val {
        SqliteValue::Null => buf.push(0),
        SqliteValue::Integer(i) => {
            buf.push(1);
            buf.extend_from_slice(&i.to_le_bytes());
        }
        SqliteValue::Float(x) => {
            // NOTE(review): raw LE bit pattern — NaN payloads and -0.0 vs 0.0
            // encode differently; confirm this is the intended canonical form.
            buf.push(2);
            buf.extend_from_slice(&x.to_le_bytes());
        }
        SqliteValue::Text(s) => {
            buf.push(3);
            put_prefixed(buf, s.as_bytes());
        }
        SqliteValue::Blob(b) => {
            buf.push(4);
            put_prefixed(buf, b);
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use fsqlite_types::{
BtreeRef, ColumnIdx, CommitSeq, IndexId, IntentFootprint, IntentOp, IntentOpKind,
PageNumber, RebaseExpr, RowId, SemanticKeyKind, SemanticKeyRef, SqliteValue,
StructuralEffects, TableId,
};
// Test helper: op with the given kind and an empty footprint.
fn make_op(schema_epoch: u64, kind: IntentOpKind) -> IntentOp {
    IntentOp {
        schema_epoch,
        footprint: IntentFootprint::empty(),
        op: kind,
    }
}
// Test helper: op with the given write set, no reads, no structural effects.
fn make_op_with_writes(
    schema_epoch: u64,
    kind: IntentOpKind,
    writes: Vec<SemanticKeyRef>,
) -> IntentOp {
    IntentOp {
        schema_epoch,
        footprint: IntentFootprint {
            reads: Vec::new(),
            writes,
            structural: StructuralEffects::NONE,
        },
        op: kind,
    }
}
// Test helper: semantic key for a table row, keyed by LE rowid bytes.
fn table_key(table_id: u32, rowid: i64) -> SemanticKeyRef {
    SemanticKeyRef::new(
        BtreeRef::Table(TableId::new(table_id)),
        SemanticKeyKind::TableRow,
        &rowid.to_le_bytes(),
    )
}
#[test]
// Newest entry (index 0) stays a full image; all older entries become
// (currently empty) intent-log patches.
fn test_page_history_newest_full_image_older_patches() {
    let pgno = PageNumber::new(5).unwrap();
    let images = vec![
        (CommitSeq::new(100), vec![0xAA; 4096]),
        (CommitSeq::new(50), vec![0xBB; 4096]),
        (CommitSeq::new(10), vec![0xCC; 4096]),
    ];
    let compressed = compress_page_history(pgno, &images).unwrap();
    assert_eq!(compressed.pgno, pgno);
    assert_eq!(compressed.versions.len(), 3);
    assert!(matches!(
        compressed.versions[0].data,
        CompressedVersionData::FullImage(ref img) if img == &vec![0xAA; 4096]
    ));
    assert_eq!(compressed.versions[0].commit_seq, CommitSeq::new(100));
    assert!(matches!(
        compressed.versions[1].data,
        CompressedVersionData::IntentLogPatch(_)
    ));
    assert!(matches!(
        compressed.versions[2].data,
        CompressedVersionData::IntentLogPatch(_)
    ));
}
#[test]
// to_bytes -> from_bytes round-trips pgno, version count, commit seq, and the
// newest full image.
fn test_page_history_ecs_encode_decode_roundtrip() {
    let pgno = PageNumber::new(42).unwrap();
    let images = vec![
        (CommitSeq::new(200), vec![0x11; 512]),
        (CommitSeq::new(100), vec![0x22; 512]),
    ];
    let original = compress_page_history(pgno, &images).unwrap();
    let encoded = original.to_bytes();
    let decoded = CompressedPageHistory::from_bytes(&encoded).unwrap();
    assert_eq!(original.pgno, decoded.pgno);
    assert_eq!(original.versions.len(), decoded.versions.len());
    assert_eq!(
        original.versions[0].commit_seq,
        decoded.versions[0].commit_seq
    );
    if let (CompressedVersionData::FullImage(orig), CompressedVersionData::FullImage(dec)) =
        (&original.versions[0].data, &decoded.versions[0].data)
    {
        assert_eq!(orig, dec);
    } else {
        panic!("expected FullImage for newest version");
    }
}
#[allow(clippy::too_many_lines)]
#[test]
// Exercises every branch of are_intent_ops_independent: disjoint writes,
// same-key conflict, epoch mismatch, structural effects, read/write overlap,
// and disjoint index ops.
fn test_intent_independence_relation() {
    let t1 = TableId::new(1);
    let t2 = TableId::new(2);
    let insert_table1 = make_op_with_writes(
        1,
        IntentOpKind::Insert {
            table: t1,
            key: RowId::new(10),
            record: vec![1, 2, 3],
        },
        vec![table_key(1, 10)],
    );
    let insert_table2 = make_op_with_writes(
        1,
        IntentOpKind::Insert {
            table: t2,
            key: RowId::new(20),
            record: vec![4, 5, 6],
        },
        vec![table_key(2, 20)],
    );
    // Different tables, disjoint write sets -> independent.
    assert!(are_intent_ops_independent(&insert_table1, &insert_table2));
    let insert_same_key = make_op_with_writes(
        1,
        IntentOpKind::Insert {
            table: t1,
            key: RowId::new(10),
            record: vec![7, 8, 9],
        },
        vec![table_key(1, 10)],
    );
    // Same semantic key in both write sets -> dependent.
    assert!(!are_intent_ops_independent(
        &insert_table1,
        &insert_same_key
    ));
    let different_epoch = make_op_with_writes(
        2,
        IntentOpKind::Insert {
            table: t2,
            key: RowId::new(30),
            record: vec![],
        },
        vec![table_key(2, 30)],
    );
    // Schema epoch mismatch -> dependent regardless of footprints.
    assert!(!are_intent_ops_independent(
        &insert_table1,
        &different_epoch
    ));
    let structural_insert = IntentOp {
        schema_epoch: 1,
        footprint: IntentFootprint {
            reads: Vec::new(),
            writes: vec![table_key(2, 40)],
            structural: StructuralEffects::PAGE_SPLIT,
        },
        op: IntentOpKind::Insert {
            table: t2,
            key: RowId::new(40),
            record: vec![],
        },
    };
    // Structural effects serialize against everything.
    assert!(!are_intent_ops_independent(
        &insert_table1,
        &structural_insert
    ));
    let read_write_overlap = IntentOp {
        schema_epoch: 1,
        footprint: IntentFootprint {
            reads: vec![table_key(1, 10)],
            writes: vec![table_key(2, 50)],
            structural: StructuralEffects::NONE,
        },
        op: IntentOpKind::Update {
            table: t2,
            key: RowId::new(50),
            new_record: vec![],
        },
    };
    // One op reads what the other writes -> dependent.
    assert!(!are_intent_ops_independent(
        &insert_table1,
        &read_write_overlap
    ));
    let idx_a = make_op_with_writes(
        1,
        IntentOpKind::IndexInsert {
            index: IndexId::new(1),
            key: vec![10],
            rowid: RowId::new(1),
        },
        vec![SemanticKeyRef::new(
            BtreeRef::Index(IndexId::new(1)),
            SemanticKeyKind::IndexEntry,
            &[10],
        )],
    );
    let idx_b = make_op_with_writes(
        1,
        IntentOpKind::IndexDelete {
            index: IndexId::new(2),
            key: vec![20],
            rowid: RowId::new(2),
        },
        vec![SemanticKeyRef::new(
            BtreeRef::Index(IndexId::new(2)),
            SemanticKeyKind::IndexEntry,
            &[20],
        )],
    );
    // Ops on different indexes with disjoint entries -> independent.
    assert!(are_intent_ops_independent(&idx_a, &idx_b));
}
#[test]
// Column-level analysis: same-row UpdateExpression ops commute iff their
// column sets are disjoint; mixing with materialized Update/Delete on the
// same key always conflicts.
fn test_update_expression_column_disjoint_commutativity() {
    let t1 = TableId::new(1);
    let key = RowId::new(100);
    let update_col0 = make_op(
        1,
        IntentOpKind::UpdateExpression {
            table: t1,
            key,
            column_updates: vec![(
                ColumnIdx::new(0),
                RebaseExpr::Literal(SqliteValue::Integer(42)),
            )],
        },
    );
    let update_col1 = make_op(
        1,
        IntentOpKind::UpdateExpression {
            table: t1,
            key,
            column_updates: vec![(
                ColumnIdx::new(1),
                RebaseExpr::Literal(SqliteValue::Integer(99)),
            )],
        },
    );
    // Same row, disjoint columns -> independent.
    assert!(are_intent_ops_independent(&update_col0, &update_col1));
    let overlapping_col0 = make_op(
        1,
        IntentOpKind::UpdateExpression {
            table: t1,
            key,
            column_updates: vec![(
                ColumnIdx::new(0),
                RebaseExpr::Literal(SqliteValue::Integer(77)),
            )],
        },
    );
    // Same column, non-join-max expressions -> dependent.
    assert!(!are_intent_ops_independent(&update_col0, &overlapping_col0));
    let different_key_update = make_op(
        1,
        IntentOpKind::UpdateExpression {
            table: t1,
            key: RowId::new(200),
            column_updates: vec![(
                ColumnIdx::new(0),
                RebaseExpr::Literal(SqliteValue::Integer(55)),
            )],
        },
    );
    // Different rows (empty footprints) -> independent.
    assert!(are_intent_ops_independent(
        &update_col0,
        &different_key_update
    ));
    let materialized_update = make_op(
        1,
        IntentOpKind::Update {
            table: t1,
            key,
            new_record: vec![1, 2, 3],
        },
    );
    // Expression-update vs materialized update on same key -> dependent.
    assert!(!are_intent_ops_independent(
        &update_col0,
        &materialized_update
    ));
    let delete_same_key = make_op(1, IntentOpKind::Delete { table: t1, key });
    // Expression-update vs delete on same key -> dependent.
    assert!(!are_intent_ops_independent(&update_col0, &delete_same_key));
}
#[test]
// Join-max recognition: MAX(col, N) in either argument order and any name
// case; MIN or non-integer literals rejected; collapse picks the largest
// constant; two join-max updates on the same column commute.
fn test_join_max_int_update_recognized() {
    let col = ColumnIdx::new(3);
    let expr1 = RebaseExpr::FunctionCall {
        name: "MAX".to_owned(),
        args: vec![
            RebaseExpr::ColumnRef(col),
            RebaseExpr::Literal(SqliteValue::Integer(100)),
        ],
    };
    assert!(is_join_max_int_update(col, &expr1));
    let expr2 = RebaseExpr::FunctionCall {
        name: "MAX".to_owned(),
        args: vec![
            RebaseExpr::Literal(SqliteValue::Integer(200)),
            RebaseExpr::ColumnRef(col),
        ],
    };
    // Reversed argument order is still recognized.
    assert!(is_join_max_int_update(col, &expr2));
    let expr3 = RebaseExpr::FunctionCall {
        name: "max".to_owned(),
        args: vec![
            RebaseExpr::ColumnRef(col),
            RebaseExpr::Literal(SqliteValue::Integer(50)),
        ],
    };
    // Function name matching is case-insensitive.
    assert!(is_join_max_int_update(col, &expr3));
    // Referencing a different column is not a join-max update of `col`.
    assert!(!is_join_max_int_update(ColumnIdx::new(99), &expr1));
    let expr4 = RebaseExpr::FunctionCall {
        name: "MIN".to_owned(),
        args: vec![
            RebaseExpr::ColumnRef(col),
            RebaseExpr::Literal(SqliteValue::Integer(10)),
        ],
    };
    assert!(!is_join_max_int_update(col, &expr4));
    let expr5 = RebaseExpr::FunctionCall {
        name: "MAX".to_owned(),
        args: vec![
            RebaseExpr::ColumnRef(col),
            RebaseExpr::Literal(SqliteValue::Text("hello".into())),
        ],
    };
    // Non-integer literal disqualifies.
    assert!(!is_join_max_int_update(col, &expr5));
    let expressions: Vec<&RebaseExpr> = vec![&expr1, &expr2];
    let collapsed = collapse_join_max_updates(col, &expressions).unwrap();
    // Collapse keeps the largest constant (200).
    if let RebaseExpr::FunctionCall { args, .. } = &collapsed {
        assert!(matches!(
            &args[1],
            RebaseExpr::Literal(SqliteValue::Integer(200))
        ));
    } else {
        panic!("expected FunctionCall");
    }
    let op_a = make_op(
        1,
        IntentOpKind::UpdateExpression {
            table: TableId::new(1),
            key: RowId::new(1),
            column_updates: vec![(col, expr1)],
        },
    );
    let op_b = make_op(
        1,
        IntentOpKind::UpdateExpression {
            table: TableId::new(1),
            key: RowId::new(1),
            column_updates: vec![(col, expr2)],
        },
    );
    // Two join-max updates of the same column on the same row commute.
    assert!(are_intent_ops_independent(&op_a, &op_b));
}
#[test]
fn test_merge_certificate_generation_and_verification() {
    // Two non-overlapping inserts into the same table.
    let intent_ops = vec![
        make_op_with_writes(
            1,
            IntentOpKind::Insert {
                table: TableId::new(1),
                key: RowId::new(10),
                record: vec![1, 2, 3],
            },
            vec![table_key(1, 10)],
        ),
        make_op_with_writes(
            1,
            IntentOpKind::Insert {
                table: TableId::new(1),
                key: RowId::new(20),
                record: vec![4, 5, 6],
            },
            vec![table_key(1, 20)],
        ),
    ];
    let pgno = PageNumber::new(3).unwrap();
    let affected_pages = vec![(pgno, vec![0xAA; 4096])];
    // Derive the 16-byte b-tree invariant hash from a blake3 digest prefix.
    let mut btree_inv_hash = [0u8; 16];
    btree_inv_hash.copy_from_slice(&blake3::hash(b"btree_invariant_ok").as_bytes()[..16]);
    let cert = generate_merge_certificate(
        MergeKind::StructuredPatch,
        50,
        1,
        &intent_ops,
        &affected_pages,
        btree_inv_hash,
    )
    .unwrap();
    // The certificate records the merge parameters and one digest per op.
    assert_eq!(cert.merge_kind, MergeKind::StructuredPatch);
    assert_eq!(cert.base_commit_seq, 50);
    assert_eq!(cert.schema_epoch, 1);
    assert_eq!(cert.pages, vec![pgno]);
    assert_eq!(cert.intent_op_digests.len(), 2);
    assert_eq!(cert.normal_form.len(), 2);
    assert_eq!(cert.verifier_version, VERIFIER_VERSION);
    // An untampered certificate verifies against the same inputs.
    let outcome = verify_merge_certificate(&intent_ops, &affected_pages, btree_inv_hash, &cert);
    assert!(outcome.is_ok());
}
#[test]
fn test_merge_certificate_replay_deterministic() {
    // Insert / delete / insert against the same table.
    let ops = vec![
        make_op(
            1,
            IntentOpKind::Insert {
                table: TableId::new(5),
                key: RowId::new(1),
                record: vec![10, 20],
            },
        ),
        make_op(
            1,
            IntentOpKind::Delete {
                table: TableId::new(5),
                key: RowId::new(2),
            },
        ),
        make_op(
            1,
            IntentOpKind::Insert {
                table: TableId::new(5),
                key: RowId::new(3),
                record: vec![30, 40],
            },
        ),
    ];
    let pages = vec![
        (PageNumber::new(10).unwrap(), vec![0xBB; 2048]),
        (PageNumber::new(11).unwrap(), vec![0xCC; 2048]),
    ];
    let btree_hash = [0x42u8; 16];
    // Generate twice from identical inputs: the certificate must verify and
    // every derived field must match between the two runs (replay is
    // deterministic).
    let make_cert = || {
        generate_merge_certificate(MergeKind::Rebase, 100, 1, &ops, &pages, btree_hash).unwrap()
    };
    let first = make_cert();
    let second = make_cert();
    assert!(verify_merge_certificate(&ops, &pages, btree_hash, &first).is_ok());
    assert_eq!(first.intent_op_digests, second.intent_op_digests);
    assert_eq!(first.footprint_digest, second.footprint_digest);
    assert_eq!(first.normal_form, second.normal_form);
    assert_eq!(first.post_state.page_hashes, second.post_state.page_hashes);
    assert_eq!(
        first.post_state.btree_invariant_hash,
        second.post_state.btree_invariant_hash
    );
}
#[test]
fn test_circuit_breaker_disables_merging_on_verification_failure() {
    let ops = vec![make_op(
        1,
        IntentOpKind::Insert {
            table: TableId::new(1),
            key: RowId::new(1),
            record: vec![1],
        },
    )];
    let pages = vec![(PageNumber::new(1).unwrap(), vec![0xFF; 4096])];
    let btree_hash = [0x00u8; 16];
    let cert =
        generate_merge_certificate(MergeKind::StructuredPatch, 10, 1, &ops, &pages, btree_hash)
            .unwrap();
    // Tampered b-tree invariant hash: verification fails with the matching
    // error, and the circuit breaker disables safe merging in response.
    let tampered_btree_hash = [0xFFu8; 16];
    let err = verify_merge_certificate(&ops, &pages, tampered_btree_hash, &cert).unwrap_err();
    assert!(matches!(
        err,
        CertificateVerificationError::BtreeInvariantHashMismatch { .. }
    ));
    let event = circuit_breaker_check(err, &cert);
    assert!(event.disable_safe_merge);
    assert!(matches!(
        event.error,
        CertificateVerificationError::BtreeInvariantHashMismatch { .. }
    ));
    // Tampered page contents: PageHashMismatch.
    let tampered_pages = vec![(PageNumber::new(1).unwrap(), vec![0x00; 4096])];
    assert!(matches!(
        verify_merge_certificate(&ops, &tampered_pages, btree_hash, &cert).unwrap_err(),
        CertificateVerificationError::PageHashMismatch { .. }
    ));
    // Tampered intent op (different record bytes): OpDigestMismatch.
    let tampered_ops = vec![make_op(
        1,
        IntentOpKind::Insert {
            table: TableId::new(1),
            key: RowId::new(1),
            record: vec![99],
        },
    )];
    assert!(matches!(
        verify_merge_certificate(&tampered_ops, &pages, btree_hash, &cert).unwrap_err(),
        CertificateVerificationError::OpDigestMismatch { .. }
    ));
}
#[test]
fn test_e2e_history_compression_preserves_query_results() {
    let pgno = PageNumber::new(7).unwrap();
    // Newest-first history: commit seqs 100 down to 1, each page stamped
    // with its own sequence number so the images are distinguishable.
    let full_images: Vec<(CommitSeq, Vec<u8>)> = (1u64..=100)
        .rev()
        .map(|seq| {
            let mut page = vec![0u8; 256];
            page[..8].copy_from_slice(&seq.to_le_bytes());
            #[allow(clippy::cast_possible_truncation)]
            {
                page[8] = (seq % 251) as u8;
            }
            (CommitSeq::new(seq), page)
        })
        .collect();
    // Compress, then round-trip through the wire encoding.
    let compressed = compress_page_history(pgno, &full_images).unwrap();
    let roundtripped = CompressedPageHistory::from_bytes(&compressed.to_bytes()).unwrap();
    // Only snapshots at or above the newest retained commit are answerable.
    let retention_low = full_images[0].0;
    // Snapshot query answered from the raw image list.
    let query_uncompressed = |snapshot: CommitSeq| -> Option<Vec<u8>> {
        if snapshot.get() < retention_low.get() {
            return None;
        }
        full_images
            .iter()
            .find_map(|(seq, page)| (seq.get() <= snapshot.get()).then(|| page.clone()))
    };
    // Same query answered from the compressed, round-tripped history; only a
    // full image is directly materializable here.
    let query_compressed = |snapshot: CommitSeq| -> Option<Vec<u8>> {
        if snapshot.get() < retention_low.get() {
            return None;
        }
        let version = roundtripped
            .versions
            .iter()
            .find(|version| version.commit_seq.get() <= snapshot.get())?;
        match &version.data {
            CompressedVersionData::FullImage(page) => Some(page.clone()),
            CompressedVersionData::IntentLogPatch(_)
            | CompressedVersionData::StructuredPatch(_) => None,
        }
    };
    // Probe below, just below, at, and above the newest retained commit.
    for raw in [50u64, 99, 100, 101] {
        let snapshot = CommitSeq::new(raw);
        assert_eq!(
            query_uncompressed(snapshot),
            query_compressed(snapshot),
            "retained query mismatch at commit_seq={}",
            snapshot.get()
        );
    }
}
#[test]
fn test_op_digest_deterministic() {
    // Digesting the same op twice yields the same value.
    let insert_42 = make_op(
        1,
        IntentOpKind::Insert {
            table: TableId::new(1),
            key: RowId::new(42),
            record: vec![1, 2, 3],
        },
    );
    assert_eq!(
        compute_op_digest(&insert_42),
        compute_op_digest(&insert_42),
        "op digest must be deterministic"
    );
    // Changing only the key changes the digest.
    let insert_43 = make_op(
        1,
        IntentOpKind::Insert {
            table: TableId::new(1),
            key: RowId::new(43),
            record: vec![1, 2, 3],
        },
    );
    assert_ne!(compute_op_digest(&insert_42), compute_op_digest(&insert_43));
}
#[test]
fn test_foata_normal_form_independent_ops_single_layer() {
    // Three inserts touching distinct keys are pairwise independent, so the
    // Foata normal form collapses to a single layer holding all three.
    let first = make_op_with_writes(
        1,
        IntentOpKind::Insert {
            table: TableId::new(1),
            key: RowId::new(1),
            record: vec![],
        },
        vec![table_key(1, 1)],
    );
    let second = make_op_with_writes(
        1,
        IntentOpKind::Insert {
            table: TableId::new(1),
            key: RowId::new(2),
            record: vec![],
        },
        vec![table_key(1, 2)],
    );
    let third = make_op_with_writes(
        1,
        IntentOpKind::Insert {
            table: TableId::new(1),
            key: RowId::new(3),
            record: vec![],
        },
        vec![table_key(1, 3)],
    );
    let ops = vec![first, second, third];
    let nf = foata_normal_form(&ops);
    assert_eq!(nf.len(), 3);
    // Ordering within a layer is unspecified; compare as digest sets.
    let expected: BTreeSet<[u8; 16]> = ops.iter().map(compute_op_digest).collect();
    let actual: BTreeSet<[u8; 16]> = nf.into_iter().collect();
    assert_eq!(expected, actual);
}
#[test]
fn test_foata_normal_form_dependent_ops_multiple_layers() {
    // An insert followed by an update of the same key conflict, so the
    // normal form must keep them ordered in two separate layers.
    let insert = make_op_with_writes(
        1,
        IntentOpKind::Insert {
            table: TableId::new(1),
            key: RowId::new(1),
            record: vec![1],
        },
        vec![table_key(1, 1)],
    );
    let update = make_op_with_writes(
        1,
        IntentOpKind::Update {
            table: TableId::new(1),
            key: RowId::new(1),
            new_record: vec![2],
        },
        vec![table_key(1, 1)],
    );
    let expected = [compute_op_digest(&insert), compute_op_digest(&update)];
    let ops = vec![insert, update];
    let nf = foata_normal_form(&ops);
    assert_eq!(nf.len(), 2);
    assert_eq!(nf[0], expected[0]);
    assert_eq!(nf[1], expected[1]);
}
#[test]
fn test_empty_history_returns_error() {
    // An empty slice of full images cannot form a history.
    let page = PageNumber::new(1).unwrap();
    let outcome = compress_page_history(page, &[]);
    assert!(matches!(outcome, Err(HistoryCompressionError::EmptyHistory)));
}
#[test]
fn test_is_mergeable_intent_structural_rejected() {
    // Any op carrying structural effects (here: a page split) is excluded
    // from safe merging, regardless of its logical kind.
    let footprint = IntentFootprint {
        reads: Vec::new(),
        writes: Vec::new(),
        structural: StructuralEffects::PAGE_SPLIT,
    };
    let op = IntentOp {
        schema_epoch: 1,
        footprint,
        op: IntentOpKind::Insert {
            table: TableId::new(1),
            key: RowId::new(1),
            record: vec![],
        },
    };
    assert!(!is_mergeable_intent(&op));
}
}