use super::parallel_runtime::ParallelTask;
use super::*;
use crate::catalog::SYSTEM_PROJECT_ID;
use crate::commit::assertions::{evaluate_assertions, validate_assertions};
use crate::commit::tx::{AssertionActual, ReadAssertion};
use crate::commit::validation::KvIntegerAmount;
use crate::lib_helpers::{
ddl_would_apply, lifecycle_template_for_ddl, lifecycle_templates_for_mutation,
};
use crate::wal::segment::PendingFrame;
use primitive_types::U256;
use serde::Serialize;
use std::borrow::Cow;
use std::ops::Bound;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc as std_mpsc;
// NOTE(review): consumers of these constants are outside this chunk —
// comments below are inferred from the names; confirm against usage sites.
// Minimum interval (microseconds) between refreshes of the memory estimate.
const MEMORY_ESTIMATE_INTERVAL_MICROS: u64 = 250_000;
// Mutation count at which a single request's apply work is presumably fanned
// out to the parallel runtime instead of being applied inline.
const SINGLE_REQUEST_PARALLEL_APPLY_MUTATION_THRESHOLD: usize = 1024;
/// Set of storage entities touched by a parallel apply task, recorded so the
/// task's results can later be merged back into coordinator state.
///
/// NOTE(review): field semantics inferred from names/types — confirm against
/// the parallel-runtime merge code, which is outside this chunk.
#[derive(Clone, Default)]
pub(super) struct ParallelMergeTargets {
    // Namespace the targets belong to, when a single one applies.
    namespace_id: Option<NamespaceId>,
    // Raw KV keys written.
    kv_keys: HashSet<Vec<u8>>,
    // Tables touched as a whole.
    tables: HashSet<String>,
    // Individual encoded row keys touched, grouped by table name.
    table_rows: HashMap<String, HashSet<EncodedKey>>,
    // Accumulator names touched.
    accumulators: HashSet<String>,
}
/// A commit whose merge into coordinator state is deferred until the
/// parallel apply phase finishes.
pub(super) struct DeferredParallelCommit {
    // Index of this commit in the epoch's sequenced commit list.
    sequenced_index: usize,
    // Entities the parallel task wrote; used to merge its results back.
    merge_targets: ParallelMergeTargets,
}
/// Applies accumulator mutations (`Accumulate`, `ExposeAccumulator`,
/// `ExposeAccumulatorBatch`) directly against the keyspace.
///
/// Returns `Some(result)` for those variants and `None` for every other
/// mutation kind so the caller can fall through to the general apply path.
fn apply_accumulator_hot_mutation(
    keyspace: &mut Keyspace,
    mutation: &Mutation,
    commit_seq: u64,
) -> Option<Result<(), AedbError>> {
    match mutation {
        Mutation::Accumulate {
            project_id,
            scope_id,
            accumulator_name,
            delta,
            dedupe_key,
            order_key,
            release_exposure_id,
        } => {
            let appended = keyspace.append_accumulator_delta(
                project_id,
                scope_id,
                accumulator_name,
                *delta,
                dedupe_key,
                *order_key,
                commit_seq,
            );
            Some(appended.and_then(|append_result| {
                // Only a delta that actually applied (not deduplicated) may
                // also release a previously recorded exposure in one step.
                if let Some(exposure_id) = release_exposure_id
                    && matches!(
                        append_result,
                        crate::storage::keyspace::AccumulatorAppendResult::Applied
                    )
                {
                    keyspace.release_accumulator_exposure(
                        project_id,
                        scope_id,
                        accumulator_name,
                        exposure_id,
                    )?;
                }
                Ok(())
            }))
        }
        Mutation::ExposeAccumulator {
            project_id,
            scope_id,
            accumulator_name,
            amount,
            exposure_id,
        } => Some(keyspace.expose_accumulator(
            project_id,
            scope_id,
            accumulator_name,
            *amount,
            exposure_id,
            commit_seq,
        )),
        Mutation::ExposeAccumulatorBatch {
            project_id,
            scope_id,
            accumulator_name,
            exposures,
        } => Some(keyspace.expose_accumulator_batch(
            project_id,
            scope_id,
            accumulator_name,
            exposures,
            commit_seq,
        )),
        _ => None,
    }
}
/// Applies a mutation that touches only keyspace state (KV entries, sharded
/// counters, accumulators), bypassing table and catalog machinery.
///
/// Returns `None` when the mutation is not keyspace-only; the caller must
/// then route it through the general apply path. Accumulator variants are
/// delegated to `apply_accumulator_hot_mutation` first.
fn apply_keyspace_only_mutation(
    keyspace: &mut Keyspace,
    mutation: &Mutation,
    commit_seq: u64,
) -> Option<Result<(), AedbError>> {
    if let Some(result) = apply_accumulator_hot_mutation(keyspace, mutation, commit_seq) {
        return Some(result);
    }
    match mutation {
        Mutation::KvSet { project_id, scope_id, key, value } => {
            Some(keyspace.kv_set(project_id, scope_id, key.clone(), value.clone(), commit_seq))
        }
        // Deleting a missing key is deliberately not an error: the delete
        // result is discarded and the mutation reports success.
        Mutation::KvDel { project_id, scope_id, key } => Some({
            let _ = keyspace.kv_del(project_id, scope_id, key, commit_seq);
            Ok(())
        }),
        Mutation::KvIncU256 { project_id, scope_id, key, amount_be } => Some(
            keyspace
                .kv_inc_u256(
                    project_id,
                    scope_id,
                    key.clone(),
                    U256::from_big_endian(amount_be),
                    commit_seq,
                )
                .map(|_| ()),
        ),
        Mutation::KvDecU256 { project_id, scope_id, key, amount_be } => Some(
            keyspace
                .kv_dec_u256(
                    project_id,
                    scope_id,
                    key.clone(),
                    U256::from_big_endian(amount_be),
                    commit_seq,
                )
                .map(|_| ()),
        ),
        Mutation::KvAddU256Ex { project_id, scope_id, key, amount_be, on_missing, on_overflow } => {
            Some(keyspace.kv_add_u256_ex(
                project_id,
                scope_id,
                key.clone(),
                U256::from_big_endian(amount_be),
                on_missing,
                on_overflow,
                commit_seq,
            ))
        }
        Mutation::KvSubU256Ex { project_id, scope_id, key, amount_be, on_missing, on_underflow } => {
            Some(keyspace.kv_sub_u256_ex(
                project_id,
                scope_id,
                key.clone(),
                U256::from_big_endian(amount_be),
                on_missing,
                on_underflow,
                commit_seq,
            ))
        }
        Mutation::KvMaxU256 { project_id, scope_id, key, candidate_be, on_missing } => {
            Some(keyspace.kv_max_u256(
                project_id,
                scope_id,
                key.clone(),
                U256::from_big_endian(candidate_be),
                on_missing,
                commit_seq,
            ))
        }
        Mutation::KvMinU256 { project_id, scope_id, key, candidate_be, on_missing } => {
            Some(keyspace.kv_min_u256(
                project_id,
                scope_id,
                key.clone(),
                U256::from_big_endian(candidate_be),
                on_missing,
                commit_seq,
            ))
        }
        Mutation::KvMutateU256 { project_id, scope_id, key, op, operand_be, expected_seq } => {
            Some((|| {
                // Optimistic concurrency: when an expected version is given,
                // compare against the key's current version (0 when absent)
                // and fail as an assertion before mutating.
                if let Some(expected_seq) = expected_seq {
                    let actual = keyspace
                        .kv_get(project_id, scope_id, key)
                        .map(|entry| entry.version)
                        .unwrap_or(0);
                    if actual != *expected_seq {
                        return Err(AedbError::AssertionFailed {
                            index: 0,
                            assertion: Box::new(ReadAssertion::KeyVersion {
                                project_id: project_id.clone(),
                                scope_id: scope_id.clone(),
                                key: key.clone(),
                                expected_seq: *expected_seq,
                            }),
                            actual: Box::new(AssertionActual::Version(actual)),
                        });
                    }
                }
                keyspace.kv_mutate_u256(
                    project_id,
                    scope_id,
                    key.clone(),
                    *op,
                    U256::from_big_endian(operand_be),
                    commit_seq,
                )
            })())
        }
        Mutation::KvAddU64Ex { project_id, scope_id, key, amount_be, on_missing, on_overflow } => {
            Some(keyspace.kv_add_u64_ex(
                project_id,
                scope_id,
                key.clone(),
                u64::from_be_bytes(*amount_be),
                on_missing,
                on_overflow,
                commit_seq,
            ))
        }
        Mutation::KvSubU64Ex { project_id, scope_id, key, amount_be, on_missing, on_underflow } => {
            Some(keyspace.kv_sub_u64_ex(
                project_id,
                scope_id,
                key.clone(),
                u64::from_be_bytes(*amount_be),
                on_missing,
                on_underflow,
                commit_seq,
            ))
        }
        Mutation::KvSubIntEx { project_id, scope_id, key, amount, on_missing, on_underflow } => {
            Some(keyspace.kv_sub_int_ex(
                project_id,
                scope_id,
                key.clone(),
                *amount,
                *on_missing,
                *on_underflow,
                commit_seq,
            ))
        }
        Mutation::CounterAdd { project_id, scope_id, key, amount_be, shard_count, shard_hint } => {
            Some(keyspace.counter_add_sharded(
                project_id,
                scope_id,
                key.clone(),
                *amount_be,
                *shard_count,
                *shard_hint,
                commit_seq,
            ))
        }
        Mutation::KvMaxU64 { project_id, scope_id, key, candidate_be, on_missing } => {
            Some(keyspace.kv_max_u64(
                project_id,
                scope_id,
                key.clone(),
                u64::from_be_bytes(*candidate_be),
                on_missing,
                commit_seq,
            ))
        }
        Mutation::KvMinU64 { project_id, scope_id, key, candidate_be, on_missing } => {
            Some(keyspace.kv_min_u64(
                project_id,
                scope_id,
                key.clone(),
                u64::from_be_bytes(*candidate_be),
                on_missing,
                commit_seq,
            ))
        }
        Mutation::KvMutateU64 { project_id, scope_id, key, op, operand_be, expected_seq } => {
            Some((|| {
                // Same optimistic-concurrency check as the U256 variant.
                if let Some(expected_seq) = expected_seq {
                    let actual = keyspace
                        .kv_get(project_id, scope_id, key)
                        .map(|entry| entry.version)
                        .unwrap_or(0);
                    if actual != *expected_seq {
                        return Err(AedbError::AssertionFailed {
                            index: 0,
                            assertion: Box::new(ReadAssertion::KeyVersion {
                                project_id: project_id.clone(),
                                scope_id: scope_id.clone(),
                                key: key.clone(),
                                expected_seq: *expected_seq,
                            }),
                            actual: Box::new(AssertionActual::Version(actual)),
                        });
                    }
                }
                keyspace.kv_mutate_u64(
                    project_id,
                    scope_id,
                    key.clone(),
                    *op,
                    u64::from_be_bytes(*operand_be),
                    commit_seq,
                )
            })())
        }
        Mutation::ReleaseAccumulatorExposure { project_id, scope_id, accumulator_name, exposure_id } => {
            Some(keyspace.release_accumulator_exposure(
                project_id,
                scope_id,
                accumulator_name,
                exposure_id,
            ))
        }
        _ => None,
    }
}
/// Summary of what a commit's mutation list contains, used to choose an
/// apply strategy for the commit.
#[derive(Clone, Copy)]
struct CommitMutationClass {
    // True when every mutation can go through the keyspace-only apply path.
    is_keyspace_only: bool,
    // True when at least one mutation is a DDL change.
    has_ddl: bool,
    // True when at least one mutation emits an event.
    has_emit_event: bool,
}
/// Classifies a commit's mutation list so the caller can pick an apply
/// strategy: the pure keyspace path, or the general path when DDL, events,
/// or any other non-keyspace mutation is present.
fn classify_commit_mutations(mutations: &[Mutation]) -> CommitMutationClass {
    let mut is_keyspace_only = true;
    let mut has_ddl = false;
    let mut has_emit_event = false;
    for mutation in mutations {
        // Keyspace-only variants leave the classification untouched.
        if matches!(
            mutation,
            Mutation::KvSet { .. }
                | Mutation::KvDel { .. }
                | Mutation::KvIncU256 { .. }
                | Mutation::KvDecU256 { .. }
                | Mutation::KvAddU256Ex { .. }
                | Mutation::KvSubU256Ex { .. }
                | Mutation::KvMaxU256 { .. }
                | Mutation::KvMinU256 { .. }
                | Mutation::KvMutateU256 { .. }
                | Mutation::KvAddU64Ex { .. }
                | Mutation::KvSubU64Ex { .. }
                | Mutation::KvSubIntEx { .. }
                | Mutation::CounterAdd { .. }
                | Mutation::KvMaxU64 { .. }
                | Mutation::KvMinU64 { .. }
                | Mutation::KvMutateU64 { .. }
                | Mutation::Accumulate { .. }
                | Mutation::ExposeAccumulator { .. }
                | Mutation::ExposeAccumulatorBatch { .. }
                | Mutation::ReleaseAccumulatorExposure { .. }
        ) {
            continue;
        }
        // Anything else disqualifies the keyspace-only path; DDL and event
        // emission are additionally flagged for the caller.
        is_keyspace_only = false;
        has_ddl |= matches!(mutation, Mutation::Ddl(_));
        has_emit_event |= matches!(mutation, Mutation::EmitEvent { .. });
    }
    CommitMutationClass {
        is_keyspace_only,
        has_ddl,
        has_emit_event,
    }
}
/// Returns the effective write-partition set after lifecycle processing.
///
/// When a lifecycle mutation was added, the partitions must be re-derived
/// (with FK expansion) from the updated mutation list; otherwise the
/// request's precomputed set is borrowed as-is.
pub(super) fn write_partitions_after_lifecycle<'a>(
    catalog: &Catalog,
    request: &'a CommitRequest,
    mutations: &[Mutation],
    lifecycle_mutation_added: bool,
) -> Cow<'a, HashSet<String>> {
    match lifecycle_mutation_added {
        true => Cow::Owned(derive_write_partitions_with_fk_expansion(catalog, mutations)),
        false => Cow::Borrowed(&request.write_partitions),
    }
}
/// Validates a transaction envelope before staging and returns the derived
/// `(write_partitions, read_partitions)` sets.
///
/// KV-only envelopes short-circuit through `pre_stage_validate_kv_only`.
/// Otherwise assertions, caller permissions, and every mutation are
/// validated; DDL mutations are applied to a lazily cloned catalog so later
/// mutations validate against the post-DDL schema.
pub(super) fn pre_stage_validate(
    validation_catalog: &Arc<RwLock<Catalog>>,
    envelope: &TransactionEnvelope,
    config: &AedbConfig,
    kv_sizes_prechecked: bool,
) -> Result<(HashSet<String>, HashSet<String>), AedbError> {
    let catalog = validation_catalog.read();
    if let Some(result) =
        pre_stage_validate_kv_only(&catalog, envelope, config, kv_sizes_prechecked)
    {
        return result;
    }
    if !envelope.assertions.is_empty() {
        validate_assertions(&catalog, &envelope.assertions)?;
    }
    let caller = envelope.caller.as_ref();
    // Clone-on-first-DDL: the common non-DDL case never pays for a catalog
    // clone; the clone happens only when a DDL mutation is encountered.
    let mut staged_catalog: Option<Catalog> = None;
    for mutation in &envelope.write_intent.mutations {
        let active_catalog = staged_catalog.as_ref().unwrap_or(&catalog);
        if caller.is_some() {
            validate_permissions(active_catalog, caller, mutation)?;
        }
        crate::commit::validation::validate_mutation_with_config(active_catalog, mutation, config)?;
        if let Mutation::Ddl(ddl) = mutation {
            if staged_catalog.is_none() {
                staged_catalog = Some(catalog.clone());
            }
            if let Some(next) = staged_catalog.as_mut() {
                next.apply_ddl(ddl.clone())?;
            }
        }
    }
    // Partition derivation uses the post-DDL catalog when any DDL ran.
    let partition_catalog = staged_catalog.as_ref().unwrap_or(&catalog);
    let write_partitions =
        derive_write_partitions_for_envelope(partition_catalog, &envelope.write_intent.mutations);
    let read_partitions = derive_read_partitions(envelope);
    Ok((write_partitions, read_partitions))
}
/// Result of KV fast-path validation: `(write_partitions, read_partitions)`.
type KvPartitionValidation = Result<(HashSet<String>, HashSet<String>), AedbError>;
/// Fast-path validation for envelopes containing only KV-style mutations and
/// no caller, assertions, or read set.
///
/// Returns `None` when the envelope does not qualify (any non-KV mutation
/// bails out via `kv_validation_parts`), handing control back to the general
/// path. On success returns the per-key write partition tokens; the read
/// partition set is always empty here.
fn pre_stage_validate_kv_only(
    catalog: &Catalog,
    envelope: &TransactionEnvelope,
    config: &AedbConfig,
    kv_sizes_prechecked: bool,
) -> Option<KvPartitionValidation> {
    if envelope.caller.is_some()
        || !envelope.assertions.is_empty()
        || !envelope.read_set.points.is_empty()
        || !envelope.read_set.ranges.is_empty()
    {
        return None;
    }
    // Single-mutation fast path: validate one scope and one key directly.
    if let [mutation] = envelope.write_intent.mutations.as_slice() {
        let parts = kv_validation_parts(mutation)?;
        // When sizes were already checked upstream, only scope existence and
        // counter shard counts still need verification.
        let validate_result = if kv_sizes_prechecked {
            validate_kv_fast_path_scope(catalog, parts.project_id, parts.scope_id)
                .and_then(|()| validate_kv_fast_path_counter_shards(&parts))
        } else {
            validate_kv_fast_path_parts(catalog, config, &parts)
        };
        if let Err(err) = validate_result {
            return Some(Err(err));
        }
        let namespace = namespace_key(parts.project_id, parts.scope_id);
        let mut write_partitions = HashSet::with_capacity(1);
        write_partitions.insert(kv_key_partition_token(
            &namespace,
            parts.partition_key.as_ref(),
        ));
        return Some(Ok((write_partitions, HashSet::new())));
    }
    // Multi-mutation path: validate each distinct (project, scope) once,
    // memoizing the most recently used scope index since batches tend to
    // stay within a single scope.
    let mut seen_scopes: Vec<KvFastPathScope<'_>> = Vec::new();
    let mut last_scope_index: Option<usize> = None;
    let mut write_partitions = HashSet::with_capacity(envelope.write_intent.mutations.len());
    for mutation in &envelope.write_intent.mutations {
        let parts = kv_validation_parts(mutation)?;
        let scope_index = if let Some(index) = last_scope_index
            && seen_scopes[index].project_id == parts.project_id
            && seen_scopes[index].scope_id == parts.scope_id
        {
            index
        } else if let Some(index) = seen_scopes
            .iter()
            .position(|seen| seen.project_id == parts.project_id && seen.scope_id == parts.scope_id)
        {
            index
        } else {
            if let Err(err) = validate_kv_fast_path_scope(catalog, parts.project_id, parts.scope_id)
            {
                return Some(Err(err));
            }
            seen_scopes.push(KvFastPathScope {
                project_id: parts.project_id,
                scope_id: parts.scope_id,
                namespace: namespace_key(parts.project_id, parts.scope_id),
            });
            seen_scopes.len() - 1
        };
        last_scope_index = Some(scope_index);
        if let Err(err) = validate_kv_fast_path_sizes(config, &parts) {
            return Some(Err(err));
        }
        write_partitions.insert(kv_key_partition_token(
            &seen_scopes[scope_index].namespace,
            parts.partition_key.as_ref(),
        ));
    }
    Some(Ok((write_partitions, HashSet::new())))
}
/// Full fast-path validation for one KV mutation: scope existence first,
/// then key/value size and counter-shard limits.
fn validate_kv_fast_path_parts(
    catalog: &Catalog,
    config: &AedbConfig,
    parts: &KvFastPathParts<'_>,
) -> Result<(), AedbError> {
    validate_kv_fast_path_scope(catalog, parts.project_id, parts.scope_id)
        .and_then(|()| validate_kv_fast_path_sizes(config, parts))
}
/// Verifies that both the project and the scope exist in the catalog,
/// reporting which one is missing.
fn validate_kv_fast_path_scope(
    catalog: &Catalog,
    project_id: &str,
    scope_id: &str,
) -> Result<(), AedbError> {
    let project_known = catalog.projects.contains_key(project_id);
    if !project_known {
        return Err(AedbError::Validation(format!(
            "project does not exist: {project_id}"
        )));
    }
    match catalog_scope_exists(catalog, project_id, scope_id) {
        true => Ok(()),
        false => Err(AedbError::Validation(format!(
            "scope does not exist: {project_id}.{scope_id}"
        ))),
    }
}
/// Enforces configured key/value size limits and counter-shard bounds for a
/// fast-path KV mutation.
fn validate_kv_fast_path_sizes(
    config: &AedbConfig,
    parts: &KvFastPathParts<'_>,
) -> Result<(), AedbError> {
    if parts.key.len() > config.max_kv_key_bytes {
        return Err(AedbError::Validation("kv key too large".into()));
    }
    // Value length is only known for some mutation kinds.
    match parts.value_len {
        Some(len) if len > config.max_kv_value_bytes => {
            return Err(AedbError::Validation("kv value too large".into()));
        }
        _ => {}
    }
    match parts.counter_shards {
        Some(shard_count) => validate_counter_shard_count(shard_count),
        None => Ok(()),
    }
}
fn validate_kv_fast_path_counter_shards(parts: &KvFastPathParts<'_>) -> Result<(), AedbError> {
if let Some(shard_count) = parts.counter_shards {
validate_counter_shard_count(shard_count)?;
}
Ok(())
}
/// Checks that a counter shard count is in `1..=MAX_COUNTER_SHARDS`.
fn validate_counter_shard_count(shard_count: u16) -> Result<(), AedbError> {
    use crate::commit::validation::MAX_COUNTER_SHARDS;
    match shard_count {
        0 => Err(AedbError::Validation(
            "counter shard_count must be > 0".into(),
        )),
        n if n > MAX_COUNTER_SHARDS => Err(AedbError::Validation(format!(
            "counter shard_count exceeds maximum {MAX_COUNTER_SHARDS}"
        ))),
        _ => Ok(()),
    }
}
/// Whether the catalog registers the given (project, scope) pair.
fn catalog_scope_exists(catalog: &Catalog, project_id: &str, scope_id: &str) -> bool {
    // The scope map is keyed by owned strings, so a lookup key is built.
    let key = (project_id.to_string(), scope_id.to_string());
    catalog.scopes.contains_key(&key)
}
/// Pieces of a KV mutation needed for fast-path validation and partition
/// derivation.
struct KvFastPathParts<'a> {
    project_id: &'a str,
    scope_id: &'a str,
    // The logical KV key being mutated.
    key: &'a [u8],
    // Key used for partitioning; differs from `key` only for sharded
    // counters, which partition on the per-shard storage key.
    partition_key: Cow<'a, [u8]>,
    // Serialized value size when known (fixed 8/32 bytes for integer ops).
    value_len: Option<usize>,
    // Shard count for counter mutations; `None` for everything else.
    counter_shards: Option<u16>,
}
/// A (project, scope) pair already validated on the KV fast path, with its
/// namespace key precomputed so repeated mutations reuse it.
struct KvFastPathScope<'a> {
    project_id: &'a str,
    scope_id: &'a str,
    namespace: String,
}
/// Extracts the fast-path validation parts from a KV-style mutation.
///
/// Returns `None` for any mutation not eligible for the KV fast path
/// (table writes, DDL, order books, accumulators, events, ...).
fn kv_validation_parts(mutation: &Mutation) -> Option<KvFastPathParts<'_>> {
    match mutation {
        Mutation::KvSet { project_id, scope_id, key, value } => Some(KvFastPathParts {
            project_id,
            scope_id,
            key,
            partition_key: Cow::Borrowed(key),
            value_len: Some(value.len()),
            counter_shards: None,
        }),
        Mutation::KvDel { project_id, scope_id, key } => Some(KvFastPathParts {
            project_id,
            scope_id,
            key,
            partition_key: Cow::Borrowed(key),
            value_len: None,
            counter_shards: None,
        }),
        // All U256-valued integer ops store a fixed 32-byte value.
        Mutation::KvIncU256 { project_id, scope_id, key, .. }
        | Mutation::KvDecU256 { project_id, scope_id, key, .. }
        | Mutation::KvAddU256Ex { project_id, scope_id, key, .. }
        | Mutation::KvSubU256Ex { project_id, scope_id, key, .. }
        | Mutation::KvMaxU256 { project_id, scope_id, key, .. }
        | Mutation::KvMinU256 { project_id, scope_id, key, .. }
        | Mutation::KvMutateU256 { project_id, scope_id, key, .. } => Some(KvFastPathParts {
            project_id,
            scope_id,
            key,
            partition_key: Cow::Borrowed(key),
            value_len: Some(32),
            counter_shards: None,
        }),
        // All u64-valued integer ops store a fixed 8-byte value.
        Mutation::KvAddU64Ex { project_id, scope_id, key, .. }
        | Mutation::KvSubU64Ex { project_id, scope_id, key, .. }
        | Mutation::KvMaxU64 { project_id, scope_id, key, .. }
        | Mutation::KvMinU64 { project_id, scope_id, key, .. }
        | Mutation::KvMutateU64 { project_id, scope_id, key, .. } => Some(KvFastPathParts {
            project_id,
            scope_id,
            key,
            partition_key: Cow::Borrowed(key),
            value_len: Some(8),
            counter_shards: None,
        }),
        // Value width depends on which integer flavor the amount carries.
        Mutation::KvSubIntEx { project_id, scope_id, key, amount, .. } => Some(KvFastPathParts {
            project_id,
            scope_id,
            key,
            partition_key: Cow::Borrowed(key),
            value_len: Some(match amount {
                KvIntegerAmount::U64(_) => 8,
                KvIntegerAmount::U256(_) => 32,
            }),
            counter_shards: None,
        }),
        // Sharded counters partition on the concrete per-shard storage key
        // rather than on the logical counter key.
        Mutation::CounterAdd { project_id, scope_id, key, shard_count, shard_hint, .. } => {
            let shard = crate::commit::validation::counter_shard_index(*shard_hint, *shard_count);
            Some(KvFastPathParts {
                project_id,
                scope_id,
                key,
                partition_key: Cow::Owned(crate::commit::validation::counter_shard_storage_key(
                    key, shard,
                )),
                value_len: Some(8),
                counter_shards: Some(*shard_count),
            })
        }
        _ => None,
    }
}
/// Derives the write-partition set for an envelope's mutations.
///
/// Tries the cheapest strategy first: a single mutation with a direct token,
/// then per-mutation direct tokens, and finally the full FK-expansion path.
pub(super) fn derive_write_partitions_for_envelope(
    catalog: &Catalog,
    mutations: &[Mutation],
) -> HashSet<String> {
    // Single-mutation fast path.
    if let [only] = mutations {
        if let Some(token) = single_write_partition_token(only) {
            return HashSet::from([token]);
        }
    }
    // If every mutation maps to a direct token, no catalog work is needed;
    // otherwise fall back to FK expansion against the catalog.
    direct_write_partition_tokens(mutations)
        .unwrap_or_else(|| derive_write_partitions_with_fk_expansion(catalog, mutations))
}
pub(super) fn direct_write_partition_tokens(mutations: &[Mutation]) -> Option<HashSet<String>> {
let mut out = HashSet::with_capacity(mutations.len());
for mutation in mutations {
out.insert(single_write_partition_token(mutation)?);
}
Some(out)
}
/// Derives the single write-partition token for a mutation, when its write
/// set can be reduced to one token.
///
/// Returns `None` for table writes, DDL, and anything else whose partition
/// set must be derived via FK expansion. Token shapes: KV keys go through
/// `kv_key_partition_token`, accumulators use `acc:{ns}:{name}`, events use
/// `evt:{ns}:{topic}`, order books use the order-book token helpers.
pub(super) fn single_write_partition_token(mutation: &Mutation) -> Option<String> {
    match mutation {
        // Every plain KV mutation partitions on its key.
        Mutation::KvSet { project_id, scope_id, key, .. }
        | Mutation::KvDel { project_id, scope_id, key, .. }
        | Mutation::KvIncU256 { project_id, scope_id, key, .. }
        | Mutation::KvDecU256 { project_id, scope_id, key, .. }
        | Mutation::KvAddU256Ex { project_id, scope_id, key, .. }
        | Mutation::KvSubU256Ex { project_id, scope_id, key, .. }
        | Mutation::KvMaxU256 { project_id, scope_id, key, .. }
        | Mutation::KvMinU256 { project_id, scope_id, key, .. }
        | Mutation::KvMutateU256 { project_id, scope_id, key, .. }
        | Mutation::KvAddU64Ex { project_id, scope_id, key, .. }
        | Mutation::KvSubU64Ex { project_id, scope_id, key, .. }
        | Mutation::KvSubIntEx { project_id, scope_id, key, .. }
        | Mutation::KvMaxU64 { project_id, scope_id, key, .. }
        | Mutation::KvMinU64 { project_id, scope_id, key, .. }
        | Mutation::KvMutateU64 { project_id, scope_id, key, .. } => {
            let ns = namespace_key(project_id, scope_id);
            Some(kv_key_partition_token(&ns, key))
        }
        // Sharded counters partition on the concrete shard storage key.
        Mutation::CounterAdd { project_id, scope_id, key, shard_count, shard_hint, .. } => {
            let ns = namespace_key(project_id, scope_id);
            let shard = crate::commit::validation::counter_shard_index(*shard_hint, *shard_count);
            let shard_key = crate::commit::validation::counter_shard_storage_key(key, shard);
            Some(kv_key_partition_token(&ns, &shard_key))
        }
        Mutation::Accumulate { project_id, scope_id, accumulator_name, .. }
        | Mutation::ExposeAccumulator { project_id, scope_id, accumulator_name, .. }
        | Mutation::ExposeAccumulatorBatch { project_id, scope_id, accumulator_name, .. }
        | Mutation::ReleaseAccumulatorExposure { project_id, scope_id, accumulator_name, .. } => {
            let ns = namespace_key(project_id, scope_id);
            Some(format!("acc:{ns}:{accumulator_name}"))
        }
        Mutation::EmitEvent { project_id, scope_id, topic, .. } => {
            let ns = namespace_key(project_id, scope_id);
            Some(format!("evt:{ns}:{topic}"))
        }
        // Order-book mutations partition per instrument...
        Mutation::OrderBookNew { project_id, scope_id, request } => {
            let ns = namespace_key(project_id, scope_id);
            Some(order_book_partition_token(&ns, &request.instrument))
        }
        Mutation::OrderBookCancel { project_id, scope_id, instrument, .. }
        | Mutation::OrderBookCancelReplace { project_id, scope_id, instrument, .. }
        | Mutation::OrderBookMassCancel { project_id, scope_id, instrument, .. }
        | Mutation::OrderBookReduce { project_id, scope_id, instrument, .. }
        | Mutation::OrderBookMatch { project_id, scope_id, instrument, .. } => {
            let ns = namespace_key(project_id, scope_id);
            Some(order_book_partition_token(&ns, instrument))
        }
        // ...while metadata changes partition on a meta token.
        Mutation::OrderBookDefineTable { project_id, scope_id, table_id, .. }
        | Mutation::OrderBookDropTable { project_id, scope_id, table_id } => {
            let ns = namespace_key(project_id, scope_id);
            Some(order_book_meta_partition_token(&ns, "table", table_id))
        }
        Mutation::OrderBookSetInstrumentConfig { project_id, scope_id, instrument, .. }
        | Mutation::OrderBookSetInstrumentHalted { project_id, scope_id, instrument, .. } => {
            let ns = namespace_key(project_id, scope_id);
            Some(order_book_meta_partition_token(&ns, "cfg", instrument))
        }
        _ => None,
    }
}
/// Routes an envelope to a shard by hashing its scope shard key.
/// With zero or one shard there is nothing to route; shard 0 is returned.
pub(super) fn shard_for_envelope(envelope: &TransactionEnvelope, shard_count: usize) -> usize {
    match shard_count {
        0 | 1 => 0,
        count => {
            let mut hasher = std::collections::hash_map::DefaultHasher::new();
            hash_scope_shard_key(&envelope.write_intent.mutations, &mut hasher);
            hasher.finish() as usize % count
        }
    }
}
/// Hashes the shard-routing key of a mutation batch into `state`.
///
/// Only the first mutation is considered: a transaction is routed by the
/// entity its leading mutation touches (table, KV namespace, accumulator,
/// event topic, order-book instrument, ...). Each entity kind is prefixed
/// with a short tag so different kinds never collide on the same fields.
fn hash_scope_shard_key<H: Hasher>(mutations: &[Mutation], state: &mut H) {
    let Some(mutation) = mutations.first() else {
        "empty".hash(state);
        return;
    };
    match mutation {
        // Table mutations shard by (project, scope, table).
        Mutation::Insert { project_id, scope_id, table_name, .. }
        | Mutation::InsertBatch { project_id, scope_id, table_name, .. }
        | Mutation::Upsert { project_id, scope_id, table_name, .. }
        | Mutation::UpsertBatch { project_id, scope_id, table_name, .. }
        | Mutation::UpsertOnConflict { project_id, scope_id, table_name, .. }
        | Mutation::UpsertBatchOnConflict { project_id, scope_id, table_name, .. }
        | Mutation::Delete { project_id, scope_id, table_name, .. }
        | Mutation::DeleteWhere { project_id, scope_id, table_name, .. }
        | Mutation::UpdateWhere { project_id, scope_id, table_name, .. }
        | Mutation::UpdateWhereExpr { project_id, scope_id, table_name, .. }
        | Mutation::TableIncU256 { project_id, scope_id, table_name, .. }
        | Mutation::TableDecU256 { project_id, scope_id, table_name, .. } => {
            't'.hash(state);
            project_id.hash(state);
            scope_id.hash(state);
            table_name.hash(state);
        }
        // KV mutations shard by (project, scope) namespace only.
        Mutation::KvSet { project_id, scope_id, .. }
        | Mutation::KvDel { project_id, scope_id, .. }
        | Mutation::KvIncU256 { project_id, scope_id, .. }
        | Mutation::KvDecU256 { project_id, scope_id, .. }
        | Mutation::KvAddU256Ex { project_id, scope_id, .. }
        | Mutation::KvSubU256Ex { project_id, scope_id, .. }
        | Mutation::KvMaxU256 { project_id, scope_id, .. }
        | Mutation::KvMinU256 { project_id, scope_id, .. }
        | Mutation::KvMutateU256 { project_id, scope_id, .. }
        | Mutation::KvAddU64Ex { project_id, scope_id, .. }
        | Mutation::KvSubU64Ex { project_id, scope_id, .. }
        | Mutation::KvSubIntEx { project_id, scope_id, .. }
        | Mutation::CounterAdd { project_id, scope_id, .. }
        | Mutation::KvMaxU64 { project_id, scope_id, .. }
        | Mutation::KvMinU64 { project_id, scope_id, .. }
        | Mutation::KvMutateU64 { project_id, scope_id, .. } => {
            'k'.hash(state);
            project_id.hash(state);
            scope_id.hash(state);
        }
        Mutation::Accumulate { project_id, scope_id, accumulator_name, .. }
        | Mutation::ExposeAccumulator { project_id, scope_id, accumulator_name, .. }
        | Mutation::ExposeAccumulatorBatch { project_id, scope_id, accumulator_name, .. }
        | Mutation::ReleaseAccumulatorExposure { project_id, scope_id, accumulator_name, .. } => {
            "acc".hash(state);
            project_id.hash(state);
            scope_id.hash(state);
            accumulator_name.hash(state);
        }
        Mutation::EmitEvent { project_id, scope_id, topic, .. } => {
            "evt".hash(state);
            project_id.hash(state);
            scope_id.hash(state);
            topic.hash(state);
        }
        // Order-book trading mutations shard by instrument.
        Mutation::OrderBookNew { project_id, scope_id, request } => {
            "ob".hash(state);
            project_id.hash(state);
            scope_id.hash(state);
            request.instrument.hash(state);
        }
        Mutation::OrderBookCancel { project_id, scope_id, instrument, .. }
        | Mutation::OrderBookCancelReplace { project_id, scope_id, instrument, .. }
        | Mutation::OrderBookMassCancel { project_id, scope_id, instrument, .. }
        | Mutation::OrderBookReduce { project_id, scope_id, instrument, .. }
        | Mutation::OrderBookMatch { project_id, scope_id, instrument, .. } => {
            "ob".hash(state);
            project_id.hash(state);
            scope_id.hash(state);
            instrument.hash(state);
        }
        Mutation::OrderBookDefineTable { project_id, scope_id, table_id, .. }
        | Mutation::OrderBookDropTable { project_id, scope_id, table_id } => {
            "obdef".hash(state);
            project_id.hash(state);
            scope_id.hash(state);
            table_id.hash(state);
        }
        Mutation::OrderBookSetInstrumentConfig { project_id, scope_id, instrument, .. }
        | Mutation::OrderBookSetInstrumentHalted { project_id, scope_id, instrument, .. } => {
            "obcfg".hash(state);
            project_id.hash(state);
            scope_id.hash(state);
            instrument.hash(state);
        }
        // DDL has no single target entity: hash the variant discriminant
        // plus its Debug rendering so distinct statements still spread out
        // deterministically across shards.
        Mutation::Ddl(ddl) => {
            "ddl".hash(state);
            std::mem::discriminant(ddl).hash(state);
            format!("{ddl:?}").hash(state);
        }
    }
}
/// Selects the batch of commit requests that will execute in the next epoch.
///
/// Pulls from `pending` (refilling it from `rx` while waiting) and admits a
/// request only if it does not conflict with partitions already written or
/// read this epoch. Selection ends at `max_commits`; at `deadline` once at
/// least `min_commits` are gathered; when ingress is closed and the queue is
/// drained; or immediately after admitting a cross-partition or economic
/// commit, which closes out the epoch.
pub(super) async fn build_epoch_requests(
    pending: &mut VecDeque<CommitRequest>,
    min_commits: usize,
    max_commits: usize,
    deadline: Instant,
    rx: &mut tokio_mpsc::Receiver<CommitRequest>,
    ingress_closed: bool,
) -> Vec<CommitRequest> {
    let mut selected = Vec::new();
    let mut epoch_writes = HashSet::new();
    let mut epoch_reads = HashSet::new();
    // `epoch_writes` is maintained lazily: while every admitted request is a
    // plain single-partition write with no reads, the set is left stale and
    // the flag is cleared; it is rebuilt from `selected` on first need.
    let mut epoch_writes_populated = true;
    let mut has_economic = false;
    let mut has_cross_partition = false;
    while selected.len() < max_commits {
        if pending.is_empty() {
            if !selected.is_empty() && selected.len() >= min_commits {
                break;
            }
            if ingress_closed {
                break;
            }
            let now = Instant::now();
            if now >= deadline {
                break;
            }
            // Block (bounded by the deadline) for one request, then drain
            // whatever else has arrived without waiting further.
            match tokio::time::timeout(deadline - now, rx.recv()).await {
                Ok(Some(req)) => pending.push_back(req),
                _ => break,
            }
            while let Ok(req) = rx.try_recv() {
                pending.push_back(req);
            }
            continue;
        }
        let req = if can_select_front_without_conflict_scan(
            pending,
            &epoch_reads,
            has_cross_partition,
            has_economic,
        ) {
            pending
                .pop_front()
                .expect("front request must exist for fast epoch selection")
        } else {
            if !epoch_writes_populated {
                populate_epoch_writes_from_selected(&mut epoch_writes, &selected);
                epoch_writes_populated = true;
            }
            let Some(candidate_idx) = find_compatible_candidate_index(
                pending,
                &epoch_writes,
                &epoch_reads,
                has_cross_partition,
                has_economic,
            ) else {
                break;
            };
            // Choosing a non-front candidate defers the front request; cap
            // how many epochs it may be passed over to prevent starvation.
            if candidate_idx > 0
                && let Some(front) = pending.front_mut()
            {
                front.defer_count = front.defer_count.saturating_add(1);
                if front.defer_count >= MAX_EPOCH_DEFER {
                    break;
                }
            }
            pending
                .remove(candidate_idx)
                .expect("compatible pending entry must exist")
        };
        let candidate_cross_partition = is_cross_partition_write_set(&req.write_partitions);
        let candidate_economic = matches!(req.envelope.write_class, WriteClass::Economic);
        has_cross_partition |= candidate_cross_partition;
        has_economic |= candidate_economic;
        if req.read_partitions.is_empty()
            && !has_cross_partition
            && !has_economic
            && epoch_reads.is_empty()
        {
            // Stay on the lazy path: defer extending `epoch_writes`.
            epoch_writes_populated = false;
        } else {
            if !epoch_writes_populated {
                populate_epoch_writes_from_selected(&mut epoch_writes, &selected);
                epoch_writes_populated = true;
            }
            epoch_writes.extend(req.write_partitions.iter().cloned());
        }
        epoch_reads.extend(req.read_partitions.iter().cloned());
        selected.push(req);
        // Cross-partition / economic commits terminate the epoch.
        if has_cross_partition || has_economic {
            break;
        }
        while let Ok(req) = rx.try_recv() {
            pending.push_back(req);
        }
        if selected.len() >= min_commits && pending.is_empty() {
            break;
        }
        if selected.len() >= min_commits && Instant::now() >= deadline {
            break;
        }
    }
    selected
}
/// Rebuilds the epoch's write-partition set from the already-selected
/// requests (used when lazy tracking must catch up).
fn populate_epoch_writes_from_selected(
    epoch_writes: &mut HashSet<String>,
    selected: &[CommitRequest],
) {
    epoch_writes.extend(
        selected
            .iter()
            .flat_map(|request| request.write_partitions.iter().cloned()),
    );
}
/// Fast-path test: the front pending request can be admitted without any
/// conflict scan when the epoch so far is "plain" (no cross-partition or
/// economic commits, no reads) and the candidate is a standard,
/// single-partition, read-free write.
fn can_select_front_without_conflict_scan(
    pending: &VecDeque<CommitRequest>,
    epoch_reads: &HashSet<String>,
    has_cross_partition: bool,
    has_economic: bool,
) -> bool {
    let epoch_is_plain = !has_cross_partition && !has_economic && epoch_reads.is_empty();
    if !epoch_is_plain {
        return false;
    }
    match pending.front() {
        Some(candidate) => {
            candidate.read_partitions.is_empty()
                && matches!(candidate.envelope.write_class, WriteClass::Standard)
                && !is_cross_partition_write_set(&candidate.write_partitions)
        }
        None => false,
    }
}
/// Finds the first pending request that can join the epoch: no read/write
/// conflicts against the epoch's partition sets and no structural blocker
/// (cross-partition or economic commits on either side). An epoch with no
/// writes yet accepts anything.
pub(super) fn find_compatible_candidate_index(
    pending: &VecDeque<CommitRequest>,
    epoch_writes: &HashSet<String>,
    epoch_reads: &HashSet<String>,
    has_cross_partition: bool,
    has_economic: bool,
) -> Option<usize> {
    pending.iter().position(|candidate| {
        if epoch_writes.is_empty() {
            return true;
        }
        let candidate_cross = is_cross_partition_write_set(&candidate.write_partitions);
        let candidate_economic = matches!(candidate.envelope.write_class, WriteClass::Economic);
        let structural_conflict =
            has_cross_partition || candidate_cross || has_economic || candidate_economic;
        // A candidate is compatible when it reads nothing the epoch wrote
        // and writes nothing the epoch read (anti-dependency).
        !structural_conflict
            && !partition_set_conflicts(&candidate.read_partitions, epoch_writes)
            && !partition_set_conflicts(&candidate.write_partitions, epoch_reads)
    })
}
/// Whether any token in one partition set conflicts with any token in the
/// other. The smaller set drives the outer loop to minimize comparisons.
fn partition_set_conflicts(left: &HashSet<String>, right: &HashSet<String>) -> bool {
    if left.is_empty() || right.is_empty() {
        return false;
    }
    let (small, large) = match left.len() <= right.len() {
        true => (left, right),
        false => (right, left),
    };
    for a in small {
        for b in large {
            if partition_token_conflicts(a, b) {
                return true;
            }
        }
    }
    false
}
fn partition_token_conflicts(left: &str, right: &str) -> bool {
if left == right {
return true;
}
match (partition_token_kind(left), partition_token_kind(right)) {
(PartitionTokenKind::Table, PartitionTokenKind::TableRow) => {
table_token_conflicts(left, right)
}
(PartitionTokenKind::TableRow, PartitionTokenKind::Table) => {
table_token_conflicts(right, left)
}
(PartitionTokenKind::KvNamespace, PartitionTokenKind::KvKey) => {
kv_namespace_token_conflicts(left, right)
}
(PartitionTokenKind::KvKey, PartitionTokenKind::KvNamespace) => {
kv_namespace_token_conflicts(right, left)
}
_ => false,
}
}
/// Coarse classification of a partition token by its prefix.
#[derive(Clone, Copy, PartialEq, Eq)]
enum PartitionTokenKind {
    Table,
    TableRow,
    KvKey,
    KvNamespace,
    Other,
}
/// Classifies a partition token by prefix. The prefixes are mutually
/// exclusive (each includes its trailing colon), so lookup order is
/// irrelevant; anything unrecognized is `Other`.
fn partition_token_kind(token: &str) -> PartitionTokenKind {
    const PREFIXES: [(&str, PartitionTokenKind); 4] = [
        ("tr:", PartitionTokenKind::TableRow),
        ("t:", PartitionTokenKind::Table),
        ("kns:", PartitionTokenKind::KvNamespace),
        ("k:", PartitionTokenKind::KvKey),
    ];
    PREFIXES
        .iter()
        .find(|(prefix, _)| token.starts_with(prefix))
        .map(|&(_, kind)| kind)
        .unwrap_or(PartitionTokenKind::Other)
}
/// Whether a whole-table token (`t:{ns}:{table}`) covers a row token
/// (`tr:{ns}:{table}:{pk}`): both must name the same namespace and table.
/// Malformed tokens never conflict.
fn table_token_conflicts(table_or_range: &str, row: &str) -> bool {
    // t:{ns}:{table} -> (ns, table); ns itself may contain colons, so split
    // from the right.
    let table_parts = table_or_range
        .strip_prefix("t:")
        .and_then(|rest| rest.rsplit_once(':'));
    // tr:{ns}:{table}:{pk} -> drop the pk, then split off the table name.
    let row_parts = row
        .strip_prefix("tr:")
        .and_then(|rest| rest.rsplit_once(':'))
        .and_then(|(prefix, _pk)| prefix.rsplit_once(':'));
    match (table_parts, row_parts) {
        (Some((table_ns, table_name)), Some((row_ns, row_table))) => {
            table_ns == row_ns && table_name == row_table
        }
        _ => false,
    }
}
/// Whether a KV-namespace token (`kns:{ns}`) covers a KV-key token
/// (`k:{ns}:{key}`): true when both name the same namespace. Malformed
/// tokens never conflict.
fn kv_namespace_token_conflicts(namespace_token: &str, key_token: &str) -> bool {
    // k:{ns}:{key} -> split the key off from the right, since the namespace
    // may itself contain colons.
    let key_ns = key_token
        .strip_prefix("k:")
        .and_then(|rest| rest.rsplit_once(':'))
        .map(|(ns, _key)| ns);
    match (namespace_token.strip_prefix("kns:"), key_ns) {
        (Some(namespace), Some(key_ns)) => namespace == key_ns,
        _ => false,
    }
}
/// Processes one commit epoch: validates each request, assigns commit
/// sequence numbers, applies mutations to working clones of the keyspace /
/// catalog / indexes, appends WAL frames, and only then publishes the new
/// state back into `state`.
///
/// Failure handling is per-request before sequencing and epoch-wide after:
/// once WAL frames are built, any durability or memory error aborts the whole
/// epoch, surfaces an error outcome for every sequenced request, and leaves
/// `state` untouched (the working clones are simply dropped).
pub(super) fn process_commit_epoch(
    state: &mut ExecutorState,
    requests: Vec<CommitRequest>,
) -> EpochProcessResult {
    let process_started = Instant::now();
    if requests.is_empty() {
        return EpochProcessResult::default();
    }
    // Specialized path for epochs made up solely of small inline KvSet
    // commits with no read sets, assertions, callers, or idempotency keys.
    if inline_kv_set_epoch_fast_path_eligible(state, &requests) {
        return process_inline_kv_set_epoch_fast_path(state, requests, process_started);
    }
    let epoch_request_count = requests.len();
    let mut outcomes = Vec::with_capacity(requests.len());
    // Per-epoch metric accumulators, returned in EpochProcessResult.
    let mut coordinator_apply_attempts = 0u64;
    let mut coordinator_apply_micros = 0u64;
    let mut parallel_apply_micros = 0u64;
    let mut read_set_conflicts = 0u64;
    let mut wal_append_ops = 0u64;
    let mut wal_append_bytes = 0u64;
    let mut wal_append_micros = 0u64;
    let mut wal_sync_ops = 0u64;
    let mut wal_sync_micros = 0u64;
    let mut sync_executed = false;
    // Work on clones so a failed epoch never leaks partial state into `state`.
    // The idempotency map clones lazily (only when a keyed commit succeeds).
    let mut working_keyspace = state.keyspace.clone();
    let mut working_catalog = state.catalog.clone();
    let mut working_idempotency: Option<HashMap<IdempotencyKey, IdempotencyRecord>> = None;
    let mut working_global_unique_index = state.global_unique_index.clone();
    let mut sequenced = Vec::with_capacity(epoch_request_count);
    let mut internal_sequenced = Vec::new();
    let mut deferred_parallel_commits = Vec::with_capacity(epoch_request_count);
    let mut deferred_parallel_write_tokens = HashSet::new();
    let mut next_seq = state.current_seq;
    let mut catalog_changed = false;
    for mut request in requests {
        // Reject empty envelopes up front.
        if request.envelope.write_intent.mutations.is_empty() {
            outcomes.push(EpochOutcome {
                request,
                result: Err(AedbError::Validation(
                    "transaction envelope has no mutations".into(),
                )),
                post_apply_delta: None,
            });
            continue;
        }
        // Idempotency keys are scoped per caller before lookup.
        let effective_idempotency_key = request
            .envelope
            .idempotency_key
            .as_ref()
            .map(|key| scoped_idempotency_key(request.envelope.caller.as_ref(), key));
        let mut request_fingerprint = None;
        if let Some(key) = effective_idempotency_key.clone() {
            let fingerprint = match request.envelope.request_fingerprint() {
                Ok(fingerprint) => fingerprint,
                Err(err) => {
                    outcomes.push(EpochOutcome {
                        request,
                        result: Err(err),
                        post_apply_delta: None,
                    });
                    continue;
                }
            };
            request_fingerprint = Some(fingerprint);
            // Check the epoch-local overlay first, then the committed map.
            let record = working_idempotency
                .as_ref()
                .and_then(|map| map.get(&key))
                .or_else(|| state.idempotency.get(&key));
            if let Some(record) = record {
                // Same key with a different payload is a misuse, not a replay.
                if record.request_fingerprint != Some(fingerprint) {
                    outcomes.push(EpochOutcome {
                        request,
                        result: Err(AedbError::Validation(
                            "idempotency key reuse with different request".into(),
                        )),
                        post_apply_delta: None,
                    });
                    continue;
                }
                // Exact replay: surface the original commit result.
                outcomes.push(EpochOutcome {
                    request,
                    result: Ok(CommitResult {
                        commit_seq: record.commit_seq,
                        durable_head_seq: state.durable_head_seq.max(record.commit_seq),
                        idempotency: IdempotencyOutcome::Duplicate,
                        canonical_commit_seq: record.commit_seq,
                    }),
                    post_apply_delta: None,
                });
                continue;
            }
        }
        // Revalidate the read set against the epoch-local keyspace so writes
        // committed earlier in this same epoch participate in conflict checks.
        if request.has_read_set
            && let Err(err) = revalidate_read_set_for_keyspace(&working_keyspace, &request.envelope)
        {
            if is_read_set_conflict_error(&err) {
                read_set_conflicts = read_set_conflicts.saturating_add(1);
            }
            outcomes.push(EpochOutcome {
                request,
                result: Err(err),
                post_apply_delta: None,
            });
            continue;
        }
        // Assertions already verified during prevalidation are not re-run.
        let skip_assertions = request.prevalidated && request.assertions_engine_verified;
        if request.has_assertions
            && !skip_assertions
            && let Err(err) = evaluate_assertions(
                &working_catalog,
                &working_keyspace,
                &request.envelope.assertions,
                state.config.max_scan_rows,
            )
        {
            // Assertion failures may produce an internal audit commit that is
            // sequenced (and WAL-appended) alongside the user commits.
            if let Some(internal) = build_assertion_audit_commit(
                &request.envelope,
                &err,
                &mut working_catalog,
                &mut working_keyspace,
                &mut next_seq,
            ) {
                internal_sequenced.push(internal);
            }
            outcomes.push(EpochOutcome {
                request,
                result: Err(err),
                post_apply_delta: None,
            });
            continue;
        }
        let mut mutations = std::mem::take(&mut request.envelope.write_intent.mutations);
        let caller = request.envelope.caller.as_ref();
        if caller.is_some() {
            augment_mutations_with_caller(&mut mutations, caller);
        }
        let mutation_class = classify_commit_mutations(&mutations);
        // Permission checks only run for non-prevalidated requests that carry
        // a caller identity.
        if !request.prevalidated && caller.is_some() {
            let mut permission_error = None;
            for mutation in &mutations {
                if let Err(err) = validate_permissions(&working_catalog, caller, mutation) {
                    permission_error = Some(err);
                    break;
                }
            }
            if let Some(err) = permission_error {
                outcomes.push(EpochOutcome {
                    request,
                    result: Err(err),
                    post_apply_delta: None,
                });
                continue;
            }
        }
        // Plan lifecycle outbox events for DDL / explicit event emission.
        let lifecycle_events = if mutation_class.has_ddl || mutation_class.has_emit_event {
            match plan_lifecycle_outbox_events_for_classified_mutations(
                &working_catalog,
                &mutations,
                mutation_class.has_ddl,
            ) {
                Ok(events) => events,
                Err(err) => {
                    outcomes.push(EpochOutcome {
                        request,
                        result: Err(err),
                        post_apply_delta: None,
                    });
                    continue;
                }
            }
        } else {
            Vec::new()
        };
        let snapshot_seq_before_commit = next_seq;
        // Assign this commit's sequence number. Every failure path after this
        // point must roll it back via `saturating_sub(1)` so later commits in
        // the epoch stay contiguous.
        next_seq = next_seq.saturating_add(1);
        let commit_seq = next_seq;
        if !lifecycle_events.is_empty() {
            match build_lifecycle_outbox_mutation(&lifecycle_events, commit_seq) {
                Ok(mutation) => mutations.push(mutation),
                Err(err) => {
                    outcomes.push(EpochOutcome {
                        request,
                        result: Err(err),
                        post_apply_delta: None,
                    });
                    next_seq = next_seq.saturating_sub(1);
                    continue;
                }
            }
        }
        let lifecycle_mutation_added = !lifecycle_events.is_empty();
        let final_write_partitions = write_partitions_after_lifecycle(
            &working_catalog,
            &request,
            &mutations,
            lifecycle_mutation_added,
        );
        // The lifecycle outbox row is a table write, so its presence disables
        // the keyspace-only classification.
        let is_keyspace_only = mutation_class.is_keyspace_only && !lifecycle_mutation_added;
        let requires_coordinator = if is_keyspace_only {
            false
        } else {
            request_requires_coordinator(&working_catalog, &request, &mutations)
        };
        let is_cross_partition = if is_keyspace_only {
            keyspace_mutations_cross_partition(&mutations)
        } else {
            is_cross_partition_write_set(final_write_partitions.as_ref())
        };
        // Deferred parallel apply: single-partition Standard writes in a
        // multi-request (or very large single-request) epoch may be applied
        // after sequencing, in parallel, instead of inline here. Small
        // same-namespace KvSet batches are excluded (the inline path wins).
        let can_consider_deferred_parallel_apply = !lifecycle_mutation_added
            && (epoch_request_count > 1
                || mutations.len() >= SINGLE_REQUEST_PARALLEL_APPLY_MUTATION_THRESHOLD)
            && state.config.parallel_apply_enabled
            && matches!(request.envelope.write_class, WriteClass::Standard)
            && !is_cross_partition
            && !requires_coordinator
            && !(kv_set_mutations_same_namespace(&mutations)
                && mutations.len() < SINGLE_REQUEST_PARALLEL_APPLY_MUTATION_THRESHOLD);
        // Only defer when this commit's write partitions do not collide with
        // partitions already claimed by earlier deferred commits in the epoch.
        let deferred_merge_targets = if can_consider_deferred_parallel_apply
            && request.read_partitions.is_empty()
            && !partition_set_conflicts(
                final_write_partitions.as_ref(),
                &deferred_parallel_write_tokens,
            ) {
            collect_parallel_merge_targets_if_safe(&working_catalog, &mutations)
        } else {
            None
        };
        let can_defer_parallel_apply = deferred_merge_targets.is_some();
        if is_cross_partition || requires_coordinator {
            // Coordinator path: apply against trial clones under partition
            // locking; the clones replace the working state only on success.
            coordinator_apply_attempts = coordinator_apply_attempts.saturating_add(1);
            let partition_order = canonical_partition_order(final_write_partitions.as_ref());
            let coordinator_started = Instant::now();
            let apply_result = if is_keyspace_only {
                let mut trial_keyspace = working_keyspace.clone();
                let result = apply_via_keyspace_only_coordinator(
                    &mut trial_keyspace,
                    &mutations,
                    commit_seq,
                    &partition_order,
                    CoordinatorApplyOptions {
                        coordinator_locking_enabled: state.config.coordinator_locking_enabled,
                        lock_manager: &state.coordinator_locks,
                        global_unique_index_enabled: state.config.global_unique_index_enabled,
                        partition_lock_timeout_ms: state.config.partition_lock_timeout_ms,
                        max_scan_rows: state.config.max_scan_rows,
                    },
                );
                result.map(|()| {
                    working_keyspace = trial_keyspace;
                })
            } else {
                let mut trial_keyspace = working_keyspace.clone();
                let mut trial_catalog = working_catalog.clone();
                let mut trial_global_unique_index = working_global_unique_index.clone();
                let result = apply_via_coordinator(
                    &mut trial_catalog,
                    &mut trial_keyspace,
                    &mut trial_global_unique_index,
                    &mutations,
                    commit_seq,
                    &partition_order,
                    CoordinatorApplyOptions {
                        coordinator_locking_enabled: state.config.coordinator_locking_enabled,
                        lock_manager: &state.coordinator_locks,
                        global_unique_index_enabled: state.config.global_unique_index_enabled,
                        partition_lock_timeout_ms: state.config.partition_lock_timeout_ms,
                        max_scan_rows: state.config.max_scan_rows,
                    },
                    request.envelope.caller.as_ref(),
                );
                result.map(|()| {
                    working_keyspace = trial_keyspace;
                    working_catalog = trial_catalog;
                    working_global_unique_index = trial_global_unique_index;
                })
            };
            coordinator_apply_micros = coordinator_apply_micros
                .saturating_add(coordinator_started.elapsed().as_micros() as u64);
            if let Err(err) = apply_result {
                outcomes.push(EpochOutcome {
                    request,
                    result: Err(err),
                    post_apply_delta: None,
                });
                next_seq = next_seq.saturating_sub(1);
                continue;
            }
        } else if !can_defer_parallel_apply {
            // Inline apply path: neither coordinator nor deferred parallel.
            if is_keyspace_only {
                // Try the cheapest batch KV applications first; fall back to
                // a per-mutation apply against a trial keyspace clone.
                if apply_inline_kv_set_same_namespace(&mut working_keyspace, &mutations, commit_seq)
                {
                } else if let Some(apply_result) =
                    apply_kv_set_batch_same_namespace(&mut working_keyspace, &mutations, commit_seq)
                {
                    if let Err(err) = apply_result {
                        outcomes.push(EpochOutcome {
                            request,
                            result: Err(err),
                            post_apply_delta: None,
                        });
                        next_seq = next_seq.saturating_sub(1);
                        continue;
                    }
                } else {
                    let mut trial_keyspace = working_keyspace.clone();
                    let mut apply_error = None;
                    for mutation in &mutations {
                        // Outer Option is None only for non-keyspace mutations,
                        // which classification has already ruled out.
                        let result =
                            apply_keyspace_only_mutation(&mut trial_keyspace, mutation, commit_seq)
                                .expect("checked keyspace-only mutations");
                        if let Err(err) = result {
                            apply_error = Some(err);
                            break;
                        }
                    }
                    if let Some(err) = apply_error {
                        outcomes.push(EpochOutcome {
                            request,
                            result: Err(err),
                            post_apply_delta: None,
                        });
                        next_seq = next_seq.saturating_sub(1);
                        continue;
                    }
                    working_keyspace = trial_keyspace;
                }
            } else {
                let mut trial_keyspace = working_keyspace.clone();
                let mut trial_catalog = working_catalog.clone();
                let mut apply_error = None;
                for mutation in &mutations {
                    // Prevalidated requests may take a trusted fast apply;
                    // `None` means the mutation was not eligible and falls
                    // through to the full apply.
                    let applied = if request.prevalidated {
                        apply_mutation_trusted_if_eligible(
                            &mut trial_catalog,
                            &mut trial_keyspace,
                            mutation.clone(),
                            commit_seq,
                            request.envelope.base_seq,
                            snapshot_seq_before_commit,
                        )
                    } else {
                        None
                    };
                    match applied {
                        Some(Ok(())) => {}
                        Some(Err(err)) => {
                            apply_error = Some(err);
                            break;
                        }
                        None => {
                            if let Err(err) = apply_mutation(
                                &mut trial_catalog,
                                &mut trial_keyspace,
                                mutation.clone(),
                                commit_seq,
                                Some(state.config.max_scan_rows),
                                request.envelope.caller.as_ref(),
                            ) {
                                apply_error = Some(err);
                                break;
                            }
                        }
                    }
                }
                if let Some(err) = apply_error {
                    outcomes.push(EpochOutcome {
                        request,
                        result: Err(err),
                        post_apply_delta: None,
                    });
                    next_seq = next_seq.saturating_sub(1);
                    continue;
                }
                working_keyspace = trial_keyspace;
                working_catalog = trial_catalog;
            }
        }
        // 0x04 marks keyspace-only payloads in the WAL frame header.
        let payload_type = if is_keyspace_only {
            0x04
        } else {
            payload_type_for_mutations(&mutations)
        };
        let payload = match encode_wal_payload_from_parts(
            &mutations,
            &request.envelope.assertions,
            effective_idempotency_key.as_ref(),
            request_fingerprint.as_ref(),
            Some(request.encoded_len),
        ) {
            Ok(payload) => payload,
            Err(err) => {
                outcomes.push(EpochOutcome {
                    request,
                    result: Err(err),
                    post_apply_delta: None,
                });
                next_seq = next_seq.saturating_sub(1);
                continue;
            }
        };
        let commit_ts_micros = now_micros();
        if let Some(key) = effective_idempotency_key {
            // Copy-on-write: materialize the epoch-local idempotency overlay
            // only when the first keyed commit actually succeeds.
            working_idempotency
                .get_or_insert_with(|| state.idempotency.clone())
                .insert(
                    key,
                    IdempotencyRecord {
                        commit_seq,
                        recorded_at_micros: commit_ts_micros,
                        request_fingerprint,
                    },
                );
        }
        if !catalog_changed
            && !is_keyspace_only
            && mutations.iter().any(|m| matches!(m, Mutation::Ddl(_)))
        {
            catalog_changed = true;
        }
        let delta = Arc::new(CommitDelta {
            seq: commit_seq,
            mutations,
        });
        let deferred_commit_index = sequenced.len();
        if can_defer_parallel_apply {
            // Claim this commit's write partitions so later commits in the
            // epoch cannot defer onto conflicting partitions.
            deferred_parallel_write_tokens.extend(final_write_partitions.iter().cloned());
        }
        sequenced.push(SequencedCommit {
            request,
            seq: commit_seq,
            commit_ts_micros,
            payload_type,
            payload,
            delta,
        });
        if can_defer_parallel_apply {
            deferred_parallel_commits.push(DeferredParallelCommit {
                sequenced_index: deferred_commit_index,
                merge_targets: deferred_merge_targets
                    .expect("checked deferred parallel merge targets"),
            });
        }
    }
    if sequenced.is_empty() && internal_sequenced.is_empty() {
        // Nothing committed: return the (all-error) outcomes with metrics.
        return EpochProcessResult {
            outcomes,
            coordinator_apply_attempts,
            coordinator_apply_micros,
            parallel_apply_micros,
            pre_wal_micros: process_started.elapsed().as_micros() as u64,
            finalize_micros: 0,
            read_set_conflicts,
            wal_append_ops,
            wal_append_bytes,
            wal_append_micros,
            wal_sync_ops,
            wal_sync_micros,
            sync_executed,
            catalog_changed,
        };
    }
    // Apply all deferred single-partition commits in parallel before the WAL
    // stage; any failure aborts the entire epoch.
    if !deferred_parallel_commits.is_empty() {
        let parallel_apply_started = Instant::now();
        let parallel_apply_result = apply_deferred_parallel_single_partition_commits(
            &working_catalog,
            &mut working_keyspace,
            &state.parallel_runtime,
            &sequenced,
            &deferred_parallel_commits,
            state.config.epoch_apply_timeout_ms,
            state.config.max_scan_rows,
        );
        parallel_apply_micros = parallel_apply_micros
            .saturating_add(parallel_apply_started.elapsed().as_micros() as u64);
        if let Err(err) = parallel_apply_result {
            let wrapped = format!("epoch aborted during parallel apply: {err}");
            for failed in sequenced {
                // Preserve structured error variants callers match on; wrap
                // everything else as a generic validation error.
                let surfaced = match &err {
                    AedbError::EpochApplyTimeout => AedbError::EpochApplyTimeout,
                    AedbError::ParallelApplyCancelled => AedbError::ParallelApplyCancelled,
                    AedbError::ParallelApplyWorkerPanicked => {
                        AedbError::ParallelApplyWorkerPanicked
                    }
                    AedbError::AssertionFailed {
                        index,
                        assertion,
                        actual,
                    } => AedbError::AssertionFailed {
                        index: *index,
                        assertion: assertion.clone(),
                        actual: actual.clone(),
                    },
                    _ => AedbError::Validation(wrapped.clone()),
                };
                outcomes.push(EpochOutcome {
                    request: failed.request,
                    result: Err(surfaced),
                    post_apply_delta: None,
                });
            }
            return EpochProcessResult {
                outcomes,
                coordinator_apply_attempts,
                coordinator_apply_micros,
                parallel_apply_micros,
                pre_wal_micros: process_started.elapsed().as_micros() as u64,
                finalize_micros: 0,
                read_set_conflicts,
                wal_append_ops,
                wal_append_bytes,
                wal_append_micros,
                wal_sync_ops,
                wal_sync_micros,
                sync_executed,
                catalog_changed,
            };
        }
    }
    let mut wal_payload_size_bytes = 0usize;
    // Economic writes and Full durability mode force an fsync this epoch.
    let requires_sync = sequenced
        .iter()
        .any(|c| matches!(c.request.envelope.write_class, WriteClass::Economic))
        || matches!(state.config.durability_mode, DurabilityMode::Full);
    // Merge user and internal commits (both already seq-sorted) into one
    // seq-ordered WAL frame list.
    let mut user_idx = 0usize;
    let mut internal_idx = 0usize;
    let mut wal_frames =
        Vec::with_capacity(sequenced.len().saturating_add(internal_sequenced.len()));
    while user_idx < sequenced.len() || internal_idx < internal_sequenced.len() {
        let next_is_user = match (
            sequenced.get(user_idx).map(|c| c.seq),
            internal_sequenced.get(internal_idx).map(|c| c.seq),
        ) {
            (Some(user_seq), Some(internal_seq)) => user_seq <= internal_seq,
            (Some(_), None) => true,
            (None, Some(_)) => false,
            (None, None) => false,
        };
        let (seq, ts, payload_type, payload) = if next_is_user {
            let c = &sequenced[user_idx];
            user_idx += 1;
            (c.seq, c.commit_ts_micros, c.payload_type, &c.payload)
        } else {
            let c = &internal_sequenced[internal_idx];
            internal_idx += 1;
            (c.seq, c.commit_ts_micros, c.payload_type, &c.payload)
        };
        let payload_size_bytes = payload.len();
        wal_payload_size_bytes = wal_payload_size_bytes.saturating_add(payload_size_bytes);
        wal_frames.push(PendingFrame {
            seq,
            timestamp_micros: ts,
            payload_type,
            payload,
        });
    }
    let pre_wal_micros = process_started.elapsed().as_micros() as u64;
    // Memory-pressure handling before the WAL append, in escalating stages:
    // spill KV values, then flush KV to segments, then abort the epoch.
    let mut pre_wal_memory_estimate = working_keyspace.estimate_memory_bytes();
    if pre_wal_memory_estimate > state.config.max_memory_estimate_bytes {
        pre_wal_memory_estimate = match working_keyspace
            .spill_kv_values_to_memory_target(state.config.max_memory_estimate_bytes)
        {
            Ok(memory_estimate) => memory_estimate,
            Err(err) => {
                overwrite_assertion_failures_with_wal_error(
                    &mut outcomes,
                    &err,
                    "epoch aborted during persistent value spill",
                );
                for failed in sequenced {
                    outcomes.push(EpochOutcome {
                        request: failed.request,
                        result: Err(AedbError::Validation(format!(
                            "epoch aborted during persistent value spill: {err}"
                        ))),
                        post_apply_delta: None,
                    });
                }
                return EpochProcessResult {
                    outcomes,
                    coordinator_apply_attempts,
                    coordinator_apply_micros,
                    parallel_apply_micros,
                    pre_wal_micros,
                    finalize_micros: 0,
                    read_set_conflicts,
                    wal_append_ops,
                    wal_append_bytes,
                    wal_append_micros,
                    wal_sync_ops,
                    wal_sync_micros,
                    sync_executed,
                    catalog_changed,
                };
            }
        };
    }
    if pre_wal_memory_estimate > state.config.max_memory_estimate_bytes {
        pre_wal_memory_estimate = match working_keyspace
            .flush_kv_to_segments_to_memory_target(state.config.max_memory_estimate_bytes)
        {
            Ok(memory_estimate) => memory_estimate,
            Err(err) => {
                overwrite_assertion_failures_with_wal_error(
                    &mut outcomes,
                    &err,
                    "epoch aborted during KV segment flush",
                );
                for failed in sequenced {
                    outcomes.push(EpochOutcome {
                        request: failed.request,
                        result: Err(AedbError::Validation(format!(
                            "epoch aborted during KV segment flush: {err}"
                        ))),
                        post_apply_delta: None,
                    });
                }
                return EpochProcessResult {
                    outcomes,
                    coordinator_apply_attempts,
                    coordinator_apply_micros,
                    parallel_apply_micros,
                    pre_wal_micros,
                    finalize_micros: 0,
                    read_set_conflicts,
                    wal_append_ops,
                    wal_append_bytes,
                    wal_append_micros,
                    wal_sync_ops,
                    wal_sync_micros,
                    sync_executed,
                    catalog_changed,
                };
            }
        };
    }
    if pre_wal_memory_estimate > state.config.max_memory_estimate_bytes {
        // Still over budget after both relief stages: abort the epoch.
        let err_message = format!(
            "memory estimate exceeded before WAL commit: memory_estimate_bytes={}, max_memory_estimate_bytes={}",
            pre_wal_memory_estimate, state.config.max_memory_estimate_bytes
        );
        let err = AedbError::Validation(err_message.clone());
        overwrite_assertion_failures_with_wal_error(
            &mut outcomes,
            &err,
            "epoch aborted before WAL commit",
        );
        for failed in sequenced {
            outcomes.push(EpochOutcome {
                request: failed.request,
                result: Err(AedbError::Validation(err_message.clone())),
                post_apply_delta: None,
            });
        }
        return EpochProcessResult {
            outcomes,
            coordinator_apply_attempts,
            coordinator_apply_micros,
            parallel_apply_micros,
            pre_wal_micros,
            finalize_micros: 0,
            read_set_conflicts,
            wal_append_ops,
            wal_append_bytes,
            wal_append_micros,
            wal_sync_ops,
            wal_sync_micros,
            sync_executed,
            catalog_changed,
        };
    }
    // Append all frames in one WAL operation (no fsync yet).
    let append_started = Instant::now();
    if let Err(err) = state.wal.append_frames_with_sync(&wal_frames, false) {
        let err = AedbError::Io(std::io::Error::other(err.to_string()));
        overwrite_assertion_failures_with_wal_error(
            &mut outcomes,
            &err,
            "epoch aborted before WAL commit",
        );
        for failed in sequenced {
            outcomes.push(EpochOutcome {
                request: failed.request,
                result: Err(AedbError::Validation(format!(
                    "epoch aborted before WAL commit: {err}"
                ))),
                post_apply_delta: None,
            });
        }
        return EpochProcessResult {
            outcomes,
            coordinator_apply_attempts,
            coordinator_apply_micros,
            parallel_apply_micros,
            pre_wal_micros,
            finalize_micros: 0,
            read_set_conflicts,
            wal_append_ops,
            wal_append_bytes,
            wal_append_micros,
            wal_sync_ops,
            wal_sync_micros,
            sync_executed,
            catalog_changed,
        };
    }
    if !wal_frames.is_empty() {
        wal_append_ops = wal_append_ops.saturating_add(1);
        wal_append_bytes = wal_append_bytes.saturating_add(wal_payload_size_bytes as u64);
        wal_append_micros =
            wal_append_micros.saturating_add(append_started.elapsed().as_micros() as u64);
    }
    if requires_sync {
        // Sync the persistent value store before the WAL so durable values
        // are never referenced by a synced WAL frame they lag behind.
        let sync_started = Instant::now();
        if let Err(err) = working_keyspace.sync_persistent_value_store() {
            overwrite_assertion_failures_with_wal_error(
                &mut outcomes,
                &err,
                "epoch aborted during persistent value sync",
            );
            for failed in sequenced {
                outcomes.push(EpochOutcome {
                    request: failed.request,
                    result: Err(AedbError::Validation(format!(
                        "epoch aborted during persistent value sync: {err}"
                    ))),
                    post_apply_delta: None,
                });
            }
            return EpochProcessResult {
                outcomes,
                coordinator_apply_attempts,
                coordinator_apply_micros,
                parallel_apply_micros,
                pre_wal_micros,
                finalize_micros: 0,
                read_set_conflicts,
                wal_append_ops,
                wal_append_bytes,
                wal_append_micros,
                wal_sync_ops,
                wal_sync_micros,
                sync_executed,
                catalog_changed,
            };
        }
        if let Err(err) = state.wal.sync_active() {
            let err = AedbError::Io(std::io::Error::other(err.to_string()));
            overwrite_assertion_failures_with_wal_error(
                &mut outcomes,
                &err,
                "epoch aborted during WAL sync",
            );
            for failed in sequenced {
                outcomes.push(EpochOutcome {
                    request: failed.request,
                    result: Err(AedbError::Validation(format!(
                        "epoch aborted during WAL sync: {err}"
                    ))),
                    post_apply_delta: None,
                });
            }
            return EpochProcessResult {
                outcomes,
                coordinator_apply_attempts,
                coordinator_apply_micros,
                parallel_apply_micros,
                pre_wal_micros,
                finalize_micros: 0,
                read_set_conflicts,
                wal_append_ops,
                wal_append_bytes,
                wal_append_micros,
                wal_sync_ops,
                wal_sync_micros,
                sync_executed,
                catalog_changed,
            };
        }
        wal_sync_ops = wal_sync_ops.saturating_add(1);
        wal_sync_micros = wal_sync_micros.saturating_add(sync_started.elapsed().as_micros() as u64);
        sync_executed = true;
    }
    // Point of no return: WAL persisted, publish the working state.
    let last_user_seq = sequenced.last().map(|c| c.seq).unwrap_or(state.current_seq);
    let last_internal_seq = internal_sequenced
        .last()
        .map(|c| c.seq)
        .unwrap_or(state.current_seq);
    let last_seq = last_user_seq.max(last_internal_seq);
    debug_assert!(
        last_seq >= state.current_seq,
        "commit seq must be monotonic across epochs"
    );
    state.keyspace = working_keyspace;
    state.catalog = working_catalog;
    state.global_unique_index = working_global_unique_index;
    if let Some(updated) = working_idempotency {
        state.idempotency = updated;
    }
    state.current_seq = last_seq;
    state.visible_head_seq = last_seq;
    // Advance the durable head according to the durability mode.
    match state.config.durability_mode {
        DurabilityMode::Full => {
            state.durable_head_seq = last_seq;
            state.pending_batch_bytes = 0;
            state.pending_batch_max_seq = state.durable_head_seq;
        }
        DurabilityMode::Batch => {
            if requires_sync {
                state.durable_head_seq = last_seq;
                state.pending_batch_bytes = 0;
                state.pending_batch_max_seq = state.durable_head_seq;
            } else {
                // Accumulate unsynced bytes; sync opportunistically once the
                // batch threshold is reached (best-effort: errors leave the
                // durable head unchanged).
                state.pending_batch_bytes = state
                    .pending_batch_bytes
                    .saturating_add(wal_payload_size_bytes);
                state.pending_batch_max_seq = last_seq;
                if state.pending_batch_bytes >= state.config.batch_max_bytes {
                    let sync_started = Instant::now();
                    if state.keyspace.sync_persistent_value_store().is_ok()
                        && state.wal.sync_active().is_ok()
                    {
                        wal_sync_ops = wal_sync_ops.saturating_add(1);
                        wal_sync_micros = wal_sync_micros
                            .saturating_add(sync_started.elapsed().as_micros() as u64);
                        sync_executed = true;
                        state.durable_head_seq = state.pending_batch_max_seq;
                        state.pending_batch_bytes = 0;
                        state.pending_batch_max_seq = state.durable_head_seq;
                    }
                }
            }
        }
        DurabilityMode::OsBuffered => {}
    }
    debug_assert!(
        state.durable_head_seq <= state.visible_head_seq,
        "durable head cannot exceed visible head"
    );
    prune_idempotency(state);
    // Publish per-commit deltas (user then internal) to the version store;
    // it may request a full snapshot if the delta chain grew too long.
    let forced_full_snapshot = state.version_store.publish_deltas(
        sequenced
            .iter()
            .map(|commit| (commit.seq, Arc::clone(&commit.delta)))
            .chain(
                internal_sequenced
                    .iter()
                    .map(|commit| (commit.seq, Arc::clone(&commit.delta))),
            ),
    );
    let snapshot_due = now_micros().saturating_sub(state.last_full_snapshot_micros)
        >= state.config.max_snapshot_age_ms.saturating_mul(1000);
    if snapshot_due || forced_full_snapshot {
        state.version_store.publish_full(
            state.visible_head_seq,
            state.keyspace.snapshot(),
            state.catalog.snapshot(),
        );
        state.last_full_snapshot_micros = now_micros();
    }
    // Periodic (throttled) memory estimate check, warn-only.
    let now_micros = now_micros();
    if now_micros.saturating_sub(state.last_memory_estimate_micros)
        >= MEMORY_ESTIMATE_INTERVAL_MICROS
    {
        state.last_memory_estimate_micros = now_micros;
        let mem_estimate = state.keyspace.estimate_memory_bytes();
        if mem_estimate > state.config.max_memory_estimate_bytes {
            warn!(
                mem_estimate,
                max_memory_estimate_bytes = state.config.max_memory_estimate_bytes,
                "aedb memory estimate exceeded threshold"
            );
        }
    }
    // WAL rotation is best-effort; a failure here does not fail the epoch.
    if state.wal.should_rotate().is_some() {
        let _ = state
            .wal
            .rotate()
            .map_err(|e| AedbError::Io(std::io::Error::other(e.to_string())));
    }
    let durable_head = state.durable_head_seq;
    let finalize_started = Instant::now();
    for commit in sequenced {
        outcomes.push(EpochOutcome {
            request: commit.request,
            result: Ok(CommitResult {
                commit_seq: commit.seq,
                durable_head_seq: durable_head,
                idempotency: IdempotencyOutcome::Applied,
                canonical_commit_seq: commit.seq,
            }),
            post_apply_delta: Some(commit.delta),
        });
    }
    EpochProcessResult {
        outcomes,
        coordinator_apply_attempts,
        coordinator_apply_micros,
        parallel_apply_micros,
        pre_wal_micros,
        finalize_micros: finalize_started.elapsed().as_micros() as u64,
        read_set_conflicts,
        wal_append_ops,
        wal_append_bytes,
        wal_append_micros,
        wal_sync_ops,
        wal_sync_micros,
        sync_executed,
        catalog_changed,
    }
}
/// Decides whether an epoch qualifies for the inline KvSet fast path.
///
/// Eligible epochs contain only requests whose mutations are all `KvSet`s of
/// inline-sized values, with no read sets, assertions, caller identity,
/// idempotency keys, non-Standard write classes, or cross-partition writes —
/// and whose projected memory footprint stays within the configured cap.
fn inline_kv_set_epoch_fast_path_eligible(
    state: &ExecutorState,
    requests: &[CommitRequest],
) -> bool {
    if requests.is_empty() {
        return false;
    }
    let inline_limit = state.keyspace.persistent_value_inline_threshold_bytes;
    let memory_cap = state.config.max_memory_estimate_bytes;
    // Running upper bound on memory after applying every KvSet in the epoch.
    let mut projected_memory = state.keyspace.estimate_memory_bytes();
    for request in requests {
        let mutations = &request.envelope.write_intent.mutations;
        let request_shape_ok = !request.has_read_set
            && !request.has_assertions
            && request.envelope.caller.is_none()
            && request.envelope.idempotency_key.is_none()
            && matches!(request.envelope.write_class, WriteClass::Standard)
            && !mutations.is_empty()
            && mutations.len() < SINGLE_REQUEST_PARALLEL_APPLY_MUTATION_THRESHOLD
            && !request.write_partitions.is_empty()
            && !keyspace_mutations_cross_partition(mutations);
        if !request_shape_ok {
            return false;
        }
        for mutation in mutations {
            let Mutation::KvSet { key, value, .. } = mutation else {
                return false;
            };
            // Test-hook keys and values too large to inline disqualify.
            if parallel_worker_test_hook_key(key) || value.len() > inline_limit {
                return false;
            }
            projected_memory = projected_memory.saturating_add(
                crate::storage::keyspace::kv_inline_entry_cost(key.len(), value.len()),
            );
            if projected_memory > memory_cap {
                return false;
            }
        }
    }
    true
}
/// Fast-path epoch processing for batches that passed
/// `inline_kv_set_epoch_fast_path_eligible`: every request consists solely of
/// small inline `KvSet` mutations with no assertions, read sets, callers, or
/// idempotency keys.
///
/// Sequences and WAL-appends all commits first, then applies the KV writes
/// directly to `state.keyspace` — no working clone is needed because every
/// failure point precedes the first state mutation.
fn process_inline_kv_set_epoch_fast_path(
    state: &mut ExecutorState,
    requests: Vec<CommitRequest>,
    process_started: Instant,
) -> EpochProcessResult {
    let mut outcomes = Vec::with_capacity(requests.len());
    let mut sequenced = Vec::with_capacity(requests.len());
    let mut next_seq = state.current_seq;
    for mut request in requests {
        let mutations = std::mem::take(&mut request.envelope.write_intent.mutations);
        next_seq = next_seq.saturating_add(1);
        let commit_seq = next_seq;
        // Fast path carries no assertions, idempotency key, or fingerprint.
        let payload = match encode_wal_payload_from_parts(
            &mutations,
            &[],
            None,
            None,
            Some(request.encoded_len),
        ) {
            Ok(payload) => payload,
            Err(err) => {
                // Roll the sequence back so remaining commits stay contiguous.
                next_seq = next_seq.saturating_sub(1);
                outcomes.push(EpochOutcome {
                    request,
                    result: Err(err),
                    post_apply_delta: None,
                });
                continue;
            }
        };
        let delta = Arc::new(CommitDelta {
            seq: commit_seq,
            mutations,
        });
        sequenced.push(SequencedCommit {
            request,
            seq: commit_seq,
            commit_ts_micros: now_micros(),
            // 0x04 marks keyspace-only payloads in the WAL frame header.
            payload_type: 0x04,
            payload,
            delta,
        });
    }
    if sequenced.is_empty() {
        return EpochProcessResult {
            outcomes,
            pre_wal_micros: process_started.elapsed().as_micros() as u64,
            ..EpochProcessResult::default()
        };
    }
    let mut wal_payload_size_bytes = 0usize;
    let wal_frames = sequenced
        .iter()
        .map(|commit| {
            wal_payload_size_bytes = wal_payload_size_bytes.saturating_add(commit.payload.len());
            PendingFrame {
                seq: commit.seq,
                timestamp_micros: commit.commit_ts_micros,
                payload_type: commit.payload_type,
                payload: &commit.payload,
            }
        })
        .collect::<Vec<_>>();
    let pre_wal_micros = process_started.elapsed().as_micros() as u64;
    let append_started = Instant::now();
    let mut wal_append_ops = 0u64;
    let mut wal_append_bytes = 0u64;
    let mut wal_append_micros = 0u64;
    let mut wal_sync_ops = 0u64;
    let mut wal_sync_micros = 0u64;
    let mut sync_executed = false;
    // Append all frames in one WAL operation (no fsync yet); failure aborts
    // the whole epoch before any state mutation.
    if let Err(err) = state.wal.append_frames_with_sync(&wal_frames, false) {
        let err = AedbError::Io(std::io::Error::other(err.to_string()));
        for failed in sequenced {
            outcomes.push(EpochOutcome {
                request: failed.request,
                result: Err(AedbError::Validation(format!(
                    "epoch aborted before WAL commit: {err}"
                ))),
                post_apply_delta: None,
            });
        }
        return EpochProcessResult {
            outcomes,
            pre_wal_micros,
            wal_append_ops,
            wal_append_bytes,
            wal_append_micros,
            wal_sync_ops,
            wal_sync_micros,
            ..EpochProcessResult::default()
        };
    }
    wal_append_ops = 1;
    wal_append_bytes = wal_payload_size_bytes as u64;
    wal_append_micros = append_started.elapsed().as_micros() as u64;
    // Full durability mode requires an fsync before state is published.
    if matches!(state.config.durability_mode, DurabilityMode::Full) {
        let sync_started = Instant::now();
        if let Err(err) = state.wal.sync_active() {
            let err = AedbError::Io(std::io::Error::other(err.to_string()));
            for failed in sequenced {
                outcomes.push(EpochOutcome {
                    request: failed.request,
                    result: Err(AedbError::Validation(format!(
                        "epoch aborted during WAL sync: {err}"
                    ))),
                    post_apply_delta: None,
                });
            }
            return EpochProcessResult {
                outcomes,
                pre_wal_micros,
                wal_append_ops,
                wal_append_bytes,
                wal_append_micros,
                wal_sync_ops,
                wal_sync_micros,
                ..EpochProcessResult::default()
            };
        }
        wal_sync_ops = 1;
        wal_sync_micros = sync_started.elapsed().as_micros() as u64;
        sync_executed = true;
    }
    // Apply the KV writes directly to state. When every KvSet shares one
    // (project, scope) namespace, use the batched same-namespace setter;
    // otherwise fall back to per-key inline sets.
    if let Some((project_id, scope_id)) = inline_kv_set_epoch_namespace(&sequenced) {
        state.keyspace.kv_set_many_inline_same_namespace_with_seq(
            project_id,
            scope_id,
            sequenced.iter().flat_map(|commit| {
                commit.delta.mutations.iter().map(move |mutation| {
                    let Mutation::KvSet { key, value, .. } = mutation else {
                        unreachable!("inline KV fast path only sequences KvSet commits");
                    };
                    (key, value, commit.seq)
                })
            }),
        );
    } else {
        for commit in &sequenced {
            for mutation in &commit.delta.mutations {
                let Mutation::KvSet {
                    project_id,
                    scope_id,
                    key,
                    value,
                } = mutation
                else {
                    unreachable!("inline KV fast path only sequences KvSet commits");
                };
                state.keyspace.kv_set_inline(
                    project_id,
                    scope_id,
                    key.clone(),
                    value.clone(),
                    commit.seq,
                );
            }
        }
    }
    let last_seq = sequenced
        .last()
        .map(|commit| commit.seq)
        .unwrap_or(state.current_seq);
    state.current_seq = last_seq;
    state.visible_head_seq = last_seq;
    // Advance the durable head according to the durability mode.
    match state.config.durability_mode {
        DurabilityMode::Full => {
            state.durable_head_seq = last_seq;
            state.pending_batch_bytes = 0;
            state.pending_batch_max_seq = state.durable_head_seq;
        }
        DurabilityMode::Batch => {
            // Accumulate unsynced bytes; sync opportunistically once the
            // batch threshold is reached (best-effort: errors leave the
            // durable head unchanged).
            state.pending_batch_bytes = state
                .pending_batch_bytes
                .saturating_add(wal_payload_size_bytes);
            state.pending_batch_max_seq = last_seq;
            if state.pending_batch_bytes >= state.config.batch_max_bytes {
                let sync_started = Instant::now();
                if state.keyspace.sync_persistent_value_store().is_ok()
                    && state.wal.sync_active().is_ok()
                {
                    wal_sync_ops = wal_sync_ops.saturating_add(1);
                    wal_sync_micros =
                        wal_sync_micros.saturating_add(sync_started.elapsed().as_micros() as u64);
                    sync_executed = true;
                    state.durable_head_seq = state.pending_batch_max_seq;
                    state.pending_batch_bytes = 0;
                    state.pending_batch_max_seq = state.durable_head_seq;
                }
            }
        }
        DurabilityMode::OsBuffered => {}
    }
    debug_assert!(
        state.durable_head_seq <= state.visible_head_seq,
        "durable head cannot exceed visible head"
    );
    prune_idempotency(state);
    // Publish per-commit deltas; the version store may request a full
    // snapshot if the delta chain grew too long.
    let forced_full_snapshot = state.version_store.publish_deltas(
        sequenced
            .iter()
            .map(|commit| (commit.seq, Arc::clone(&commit.delta))),
    );
    let snapshot_due = now_micros().saturating_sub(state.last_full_snapshot_micros)
        >= state.config.max_snapshot_age_ms.saturating_mul(1000);
    if snapshot_due || forced_full_snapshot {
        state.version_store.publish_full(
            state.visible_head_seq,
            state.keyspace.snapshot(),
            state.catalog.snapshot(),
        );
        state.last_full_snapshot_micros = now_micros();
    }
    // Periodic (throttled) memory estimate check, warn-only.
    let now_micros = now_micros();
    if now_micros.saturating_sub(state.last_memory_estimate_micros)
        >= MEMORY_ESTIMATE_INTERVAL_MICROS
    {
        state.last_memory_estimate_micros = now_micros;
        let mem_estimate = state.keyspace.estimate_memory_bytes();
        if mem_estimate > state.config.max_memory_estimate_bytes {
            warn!(
                mem_estimate,
                max_memory_estimate_bytes = state.config.max_memory_estimate_bytes,
                "aedb memory estimate exceeded threshold"
            );
        }
    }
    // WAL rotation is best-effort; a failure here does not fail the epoch.
    if state.wal.should_rotate().is_some() {
        let _ = state
            .wal
            .rotate()
            .map_err(|e| AedbError::Io(std::io::Error::other(e.to_string())));
    }
    let durable_head = state.durable_head_seq;
    let finalize_started = Instant::now();
    for commit in sequenced {
        outcomes.push(EpochOutcome {
            request: commit.request,
            result: Ok(CommitResult {
                commit_seq: commit.seq,
                durable_head_seq: durable_head,
                idempotency: IdempotencyOutcome::Applied,
                canonical_commit_seq: commit.seq,
            }),
            post_apply_delta: Some(commit.delta),
        });
    }
    EpochProcessResult {
        outcomes,
        pre_wal_micros,
        finalize_micros: finalize_started.elapsed().as_micros() as u64,
        wal_append_ops,
        wal_append_bytes,
        wal_append_micros,
        wal_sync_ops,
        wal_sync_micros,
        sync_executed,
        ..EpochProcessResult::default()
    }
}
/// Returns the `(project_id, scope_id)` shared by every KvSet mutation in the
/// sequenced commits, or `None` when the epoch is empty, the first commit has
/// no mutations, any mutation is not a `KvSet`, or the namespaces differ.
fn inline_kv_set_epoch_namespace(sequenced: &[SequencedCommit]) -> Option<(&str, &str)> {
    // Seed the expected namespace from the very first mutation.
    let (expected_project, expected_scope) = match sequenced.first()?.delta.mutations.first()? {
        Mutation::KvSet {
            project_id,
            scope_id,
            ..
        } => (project_id, scope_id),
        _ => return None,
    };
    let uniform = sequenced.iter().all(|commit| {
        commit.delta.mutations.iter().all(|mutation| match mutation {
            Mutation::KvSet {
                project_id,
                scope_id,
                ..
            } => project_id == expected_project && scope_id == expected_scope,
            _ => false,
        })
    });
    if uniform {
        Some((expected_project, expected_scope))
    } else {
        None
    }
}
/// Replaces every `AssertionFailed` outcome with a validation error carrying
/// `context` and the durability-stage `wal_error`, so requests rejected by
/// assertions earlier in an aborted epoch report the epoch-level abort reason.
fn overwrite_assertion_failures_with_wal_error(
    outcomes: &mut [EpochOutcome],
    wal_error: &AedbError,
    context: &str,
) {
    outcomes
        .iter_mut()
        .filter(|outcome| matches!(outcome.result, Err(AedbError::AssertionFailed { .. })))
        .for_each(|outcome| {
            outcome.result = Err(AedbError::Validation(format!("{context}: {wal_error}")));
        });
}
/// Reports whether `err` represents a read-set conflict (point, range, or
/// range-structural), surfaced as either a `Conflict` or a `Validation` error.
///
/// Used by `process_commit_epoch` to count read-set conflicts for metrics.
fn is_read_set_conflict_error(err: &AedbError) -> bool {
    match err {
        // Both variants carry identical message conventions; the or-pattern
        // keeps the prefix list in a single place so the two arms cannot
        // drift apart.
        AedbError::Conflict(msg) | AedbError::Validation(msg) => {
            msg.starts_with("read set conflict")
                || msg.starts_with("range read set conflict")
                || msg.starts_with("range structural conflict")
        }
        _ => false,
    }
}
// Table and scope written under SYSTEM_PROJECT_ID for assertion-failure audit
// rows (see `build_assertion_audit_commit`).
const ASSERTION_AUDIT_TABLE: &str = "assertion_audit";
const ASSERTION_AUDIT_SCOPE_ID: &str = "app";
// Table and scope written under SYSTEM_PROJECT_ID for the lifecycle-event
// outbox row appended to commits with DDL / emitted events (see
// `build_lifecycle_outbox_mutation`).
const LIFECYCLE_OUTBOX_TABLE: &str = "lifecycle_outbox";
const LIFECYCLE_OUTBOX_SCOPE_ID: &str = "app";
/// Plans the lifecycle event templates a commit's mutations would emit,
/// without mutating the real catalog.
///
/// DDL mutations are replayed against a private clone of `catalog`
/// (materialized only when `has_ddl` is true) so later DDL ops in the same
/// commit observe earlier ones, and so a no-op DDL (one `ddl_would_apply`
/// rejects) produces no event. Non-DDL mutations contribute their templates
/// directly via `lifecycle_templates_for_mutation`.
///
/// # Errors
/// Propagates any error from applying a DDL op to the cloned catalog.
fn plan_lifecycle_outbox_events_for_classified_mutations(
    catalog: &Catalog,
    mutations: &[Mutation],
    has_ddl: bool,
) -> Result<Vec<crate::lib_helpers::LifecycleEventTemplate>, AedbError> {
    // Clone lazily: the clone exists only when DDL must be simulated.
    let mut planned_catalog = has_ddl.then(|| catalog.clone());
    let mut events = Vec::new();
    for mutation in mutations {
        match mutation {
            Mutation::Ddl(op) => {
                // With `has_ddl == false` there is no planned catalog and the
                // DDL op is skipped (classification should make this
                // combination unreachable).
                let Some(working_catalog) = planned_catalog.as_mut() else {
                    continue;
                };
                // Capture applicability BEFORE applying: an op that changes
                // nothing must not emit a lifecycle event.
                let applied = ddl_would_apply(working_catalog, op);
                working_catalog.apply_ddl(op.clone())?;
                if applied && let Some(event) = lifecycle_template_for_ddl(op) {
                    events.push(event);
                }
            }
            _ => {
                events.extend(lifecycle_templates_for_mutation(mutation));
            }
        }
    }
    Ok(events)
}
/// Materializes lifecycle event templates at `lifecycle_commit_seq` and wraps
/// them in an upsert into the system lifecycle-outbox table.
///
/// Errors when `templates` is empty or the events fail to serialize to JSON.
fn build_lifecycle_outbox_mutation(
    templates: &[crate::lib_helpers::LifecycleEventTemplate],
    lifecycle_commit_seq: u64,
) -> Result<Mutation, AedbError> {
    if templates.is_empty() {
        return Err(AedbError::Validation(
            "lifecycle outbox mutation requires at least one event".into(),
        ));
    }
    // Stamp every template with the lifecycle commit sequence.
    let events = templates
        .iter()
        .map(|template| template.clone().with_seq(lifecycle_commit_seq))
        .collect::<Vec<crate::LifecycleEvent>>();
    let events_json =
        serde_json::to_string(&events).map_err(|e| AedbError::Encode(e.to_string()))?;
    let ts_micros = now_micros();
    let seq_value = Value::Integer(lifecycle_commit_seq as i64);
    Ok(Mutation::Upsert {
        project_id: SYSTEM_PROJECT_ID.to_string(),
        scope_id: LIFECYCLE_OUTBOX_SCOPE_ID.to_string(),
        table_name: LIFECYCLE_OUTBOX_TABLE.to_string(),
        primary_key: vec![seq_value.clone()],
        row: Row::from_values(vec![
            seq_value,
            Value::Timestamp(ts_micros as i64),
            Value::Integer(events.len() as i64),
            Value::Json(events_json.into()),
        ]),
    })
}
/// Builds a system commit that records a failed read-set assertion in the
/// assertion-audit table.
///
/// Returns `None` when `err` is not `AssertionFailed`, or when applying /
/// encoding the audit row fails — in which case the reserved sequence number
/// is released again (`next_seq` is decremented back). On success the audit
/// mutation has already been applied to `keyspace` and the returned commit is
/// ready for WAL append.
fn build_assertion_audit_commit(
    envelope: &TransactionEnvelope,
    err: &AedbError,
    catalog: &mut Catalog,
    keyspace: &mut Keyspace,
    next_seq: &mut u64,
) -> Option<InternalSequencedCommit> {
    let AedbError::AssertionFailed {
        index,
        assertion,
        actual,
    } = err
    else {
        return None;
    };
    // Serialize assertion details; fall back to Debug formatting so the audit
    // row is still written even when JSON encoding fails.
    let assertion_json =
        serde_json::to_string(assertion).unwrap_or_else(|_| format!("{assertion:?}"));
    let actual_json = serde_json::to_string(actual).unwrap_or_else(|_| format!("{actual:?}"));
    let caller_id = envelope
        .caller
        .as_ref()
        .map(|c| Value::Text(c.caller_id.clone().into()))
        .unwrap_or(Value::Null);
    let ts_micros = now_micros();
    // Reserve the next commit sequence for the audit record; released below on
    // any failure.
    *next_seq = next_seq.saturating_add(1);
    let commit_seq = *next_seq;
    let mutations = vec![Mutation::Upsert {
        project_id: SYSTEM_PROJECT_ID.to_string(),
        scope_id: ASSERTION_AUDIT_SCOPE_ID.to_string(),
        table_name: ASSERTION_AUDIT_TABLE.to_string(),
        primary_key: vec![Value::Integer(commit_seq as i64)],
        row: Row::from_values(vec![
            Value::Integer(commit_seq as i64),
            Value::Timestamp(ts_micros as i64),
            caller_id,
            Value::Integer(*index as i64),
            Value::Json(assertion_json.into()),
            Value::Json(actual_json.into()),
            Value::Text("assertion_failed".into()),
        ]),
    }];
    for mutation in &mutations {
        if let Err(apply_err) = apply_mutation(
            catalog,
            keyspace,
            mutation.to_owned(),
            commit_seq,
            None,
            None,
        ) {
            warn!(error = ?apply_err, "failed to apply assertion audit mutation");
            // Release the reserved sequence number on failure.
            *next_seq = next_seq.saturating_sub(1);
            return None;
        }
    }
    let payload = match encode_wal_payload_from_parts(&mutations, &[], None, None, None) {
        Ok(payload) => payload,
        Err(encode_err) => {
            warn!(error = ?encode_err, "failed to encode assertion audit payload");
            // Release the reserved sequence number on failure.
            *next_seq = next_seq.saturating_sub(1);
            return None;
        }
    };
    Some(InternalSequencedCommit {
        seq: commit_seq,
        commit_ts_micros: ts_micros,
        payload_type: payload_type_for_mutations(&mutations),
        payload,
        delta: Arc::new(CommitDelta {
            seq: commit_seq,
            mutations,
        }),
    })
}
/// Test-only predicate: true when a commit request qualifies for the parallel
/// single-partition apply fast path — no read partitions, a non-cross-partition
/// write set, every mutation parallel-safe and targeting one common namespace,
/// and no coordinator requirement.
#[cfg(test)]
pub(super) fn is_parallel_single_partition_apply_candidate(
    request: &CommitRequest,
    mutations: &[Mutation],
    catalog: &Catalog,
) -> bool {
    if !request.read_partitions.is_empty() {
        return false;
    }
    if is_cross_partition_write_set(&request.write_partitions) {
        return false;
    }
    let mut ns: Option<NamespaceId> = None;
    for mutation in mutations {
        let Some(current_ns) = namespace_id_for_parallel_mutation(mutation) else {
            return false;
        };
        if let Some(existing) = &ns {
            // FIX: the right-hand operand had been mangled into the mojibake
            // token `¤t_ns` (a corrupted `&current_ns`), which cannot compile.
            if existing != &current_ns {
                return false;
            }
        } else {
            ns = Some(current_ns);
        }
        if !is_parallel_mutation_safe(catalog, mutation) {
            return false;
        }
    }
    if request_requires_coordinator(catalog, request, mutations) {
        return false;
    }
    true
}
/// True when the request must run on the coordinator path: it writes the
/// global partition, or any of its mutations individually requires the
/// coordinator.
pub(super) fn request_requires_coordinator(
    catalog: &Catalog,
    request: &CommitRequest,
    mutations: &[Mutation],
) -> bool {
    if request.write_partitions.contains(GLOBAL_PARTITION_TOKEN) {
        return true;
    }
    mutations
        .iter()
        .any(|mutation| mutation_requires_coordinator(catalog, mutation))
}
/// Submits each deferred commit to the parallel apply runtime (one task per
/// commit, each operating on a clone of its target namespace), then waits for
/// all results and merges them back into `keyspace`.
///
/// Enforces `epoch_apply_timeout_ms` as an overall budget: on timeout or a
/// worker panic (disconnected channel), all outstanding workers are signalled
/// to cancel and an error is returned. Results are merged in submission order.
pub(super) fn apply_deferred_parallel_single_partition_commits(
    catalog: &Catalog,
    keyspace: &mut Keyspace,
    runtime: &Arc<ParallelApplyRuntime>,
    sequenced: &[SequencedCommit],
    deferred_commits: &[DeferredParallelCommit],
    epoch_apply_timeout_ms: u64,
    max_scan_rows: usize,
) -> Result<(), AedbError> {
    let started = Instant::now();
    let mut receivers = Vec::with_capacity(deferred_commits.len());
    let mut cancellations = Vec::with_capacity(deferred_commits.len());
    let backend = keyspace.primary_index_backend;
    // Workers share one snapshot of the catalog.
    let shared_catalog = Arc::new(catalog.clone());
    for deferred in deferred_commits {
        let commit = sequenced
            .get(deferred.sequenced_index)
            .expect("deferred index must reference sequenced commit");
        let seq = commit.seq;
        let mutations = commit.delta.mutations.clone();
        let ns_id = deferred
            .merge_targets
            .namespace_id
            .clone()
            .ok_or_else(|| AedbError::Validation("parallel apply namespace missing".into()))?;
        // The worker mutates a clone of the namespace; a missing namespace
        // starts from an empty one.
        let base_namespace =
            keyspace
                .namespaces
                .get(&ns_id)
                .cloned()
                .unwrap_or_else(|| Namespace {
                    id: ns_id.clone(),
                    tables: Default::default(),
                    kv: KvData::default(),
                    accumulators: Default::default(),
                });
        let cancel = Arc::new(AtomicBool::new(false));
        let (tx, rx) = std_mpsc::channel::<Result<(NamespaceId, Namespace), AedbError>>();
        receivers.push(rx);
        runtime.submit(ParallelTask {
            namespace_id: ns_id,
            base_namespace,
            mutations,
            commit_seq: seq,
            backend,
            value_store: keyspace.value_store.clone(),
            kv_segment_store: keyspace.kv_segment_store.clone(),
            persistent_value_inline_threshold_bytes: keyspace
                .persistent_value_inline_threshold_bytes,
            catalog: Arc::clone(&shared_catalog),
            max_scan_rows,
            caller: commit.request.envelope.caller.clone(),
            cancel: Arc::clone(&cancel),
            response_tx: tx,
        })?;
        cancellations.push(cancel);
    }
    // Collect results in submission order so merges are deterministic.
    for (deferred, rx) in deferred_commits.iter().zip(receivers.into_iter()) {
        let elapsed = started.elapsed();
        if elapsed > Duration::from_millis(epoch_apply_timeout_ms) {
            for c in &cancellations {
                c.store(true, Ordering::Relaxed);
            }
            return Err(AedbError::EpochApplyTimeout);
        }
        // Wait only for the remaining share of the overall budget.
        let remaining = Duration::from_millis(epoch_apply_timeout_ms).saturating_sub(elapsed);
        let result = match rx.recv_timeout(remaining) {
            Ok(result) => result,
            Err(std_mpsc::RecvTimeoutError::Timeout) => {
                for c in &cancellations {
                    c.store(true, Ordering::Relaxed);
                }
                return Err(AedbError::EpochApplyTimeout);
            }
            // A dropped sender without a result indicates the worker panicked.
            Err(std_mpsc::RecvTimeoutError::Disconnected) => {
                for c in &cancellations {
                    c.store(true, Ordering::Relaxed);
                }
                return Err(AedbError::ParallelApplyWorkerPanicked);
            }
        };
        let (ns_id, namespace) = result?;
        merge_parallel_namespace_result(keyspace, &ns_id, &namespace, &deferred.merge_targets);
    }
    // Final budget check after all merges.
    if started.elapsed() > Duration::from_millis(epoch_apply_timeout_ms) {
        for c in &cancellations {
            c.store(true, Ordering::Relaxed);
        }
        return Err(AedbError::EpochApplyTimeout);
    }
    Ok(())
}
/// Walks `mutations` and, when every one is safe for parallel apply within a
/// single namespace, returns the set of KV keys / rows / tables / accumulators
/// that the parallel worker's result must later be merged back over.
///
/// Returns `None` as soon as a mutation is parallel-unsafe, targets a second
/// namespace, or is of a kind the parallel path does not support.
fn collect_parallel_merge_targets_if_safe(
    catalog: &Catalog,
    mutations: &[Mutation],
) -> Option<ParallelMergeTargets> {
    let mut targets = ParallelMergeTargets::default();
    for mutation in mutations {
        if !is_parallel_mutation_safe(catalog, mutation) {
            return None;
        }
        let current_ns = namespace_id_for_parallel_mutation(mutation)?;
        match &targets.namespace_id {
            // FIX: the comparison operand had been mangled into the mojibake
            // token `¤t_ns` (a corrupted `&current_ns`), which cannot compile.
            Some(existing) if existing != &current_ns => return None,
            None => targets.namespace_id = Some(current_ns),
            _ => {}
        }
        match mutation {
            // Point KV writes merge by exact key.
            Mutation::KvSet { key, .. }
            | Mutation::KvDel { key, .. }
            | Mutation::KvIncU256 { key, .. }
            | Mutation::KvDecU256 { key, .. }
            | Mutation::KvAddU256Ex { key, .. }
            | Mutation::KvSubU256Ex { key, .. }
            | Mutation::KvMaxU256 { key, .. }
            | Mutation::KvMinU256 { key, .. }
            | Mutation::KvMutateU256 { key, .. }
            | Mutation::KvAddU64Ex { key, .. }
            | Mutation::KvSubU64Ex { key, .. }
            | Mutation::KvSubIntEx { key, .. }
            | Mutation::KvMaxU64 { key, .. }
            | Mutation::KvMinU64 { key, .. }
            | Mutation::KvMutateU64 { key, .. } => {
                targets.kv_keys.insert(key.clone());
            }
            // Counter adds merge by the resolved shard's storage key.
            Mutation::CounterAdd {
                key,
                shard_count,
                shard_hint,
                ..
            } => {
                let shard =
                    crate::commit::validation::counter_shard_index(*shard_hint, *shard_count);
                targets
                    .kv_keys
                    .insert(crate::commit::validation::counter_shard_storage_key(
                        key, shard,
                    ));
            }
            Mutation::Accumulate {
                accumulator_name, ..
            }
            | Mutation::ExposeAccumulator {
                accumulator_name, ..
            }
            | Mutation::ExposeAccumulatorBatch {
                accumulator_name, ..
            } => {
                targets.accumulators.insert(accumulator_name.clone());
            }
            // Point row writes merge per-row when the table supports it,
            // otherwise the whole table is replaced on merge.
            Mutation::Insert {
                table_name,
                primary_key,
                ..
            }
            | Mutation::Upsert {
                table_name,
                primary_key,
                ..
            }
            | Mutation::Delete {
                table_name,
                primary_key,
                ..
            }
            | Mutation::TableIncU256 {
                table_name,
                primary_key,
                ..
            }
            | Mutation::TableDecU256 {
                table_name,
                primary_key,
                ..
            } => {
                let ns = match &targets.namespace_id {
                    Some(NamespaceId::Project(ns)) => ns.clone(),
                    _ => return None,
                };
                if table_supports_row_parallelism(catalog, &ns, table_name) {
                    targets
                        .table_rows
                        .entry(table_name.clone())
                        .or_default()
                        .insert(EncodedKey::from_values(primary_key));
                } else {
                    targets.tables.insert(table_name.clone());
                }
            }
            Mutation::InsertBatch {
                table_name, rows, ..
            }
            | Mutation::UpsertBatch {
                table_name, rows, ..
            } => {
                let ns = match &targets.namespace_id {
                    Some(NamespaceId::Project(ns)) => ns.clone(),
                    _ => return None,
                };
                if table_supports_row_parallelism(catalog, &ns, table_name) {
                    let primary_keys =
                        extract_row_primary_keys_for_partitions(catalog, &ns, table_name, rows)?;
                    let row_set = targets.table_rows.entry(table_name.clone()).or_default();
                    for primary_key in primary_keys {
                        row_set.insert(EncodedKey::from_values(&primary_key));
                    }
                } else {
                    targets.tables.insert(table_name.clone());
                }
            }
            // Predicate mutations touch an unknown row set: merge whole table.
            Mutation::DeleteWhere { table_name, .. }
            | Mutation::UpdateWhere { table_name, .. }
            | Mutation::UpdateWhereExpr { table_name, .. } => {
                targets.tables.insert(table_name.clone());
            }
            _ => return None,
        }
    }
    Some(targets)
}
/// Merges a parallel worker's namespace clone back into the live keyspace,
/// touching only the targets recorded for the commit, and keeps
/// `keyspace.mem_bytes` in sync by accounting removed (pre-merge) and added
/// (post-merge) memory costs.
///
/// Merge granularity follows the targets: whole tables, individual rows (plus
/// their version caches), individual KV keys (across the small-entry / entry /
/// segment-tombstone representations), and whole accumulators. Target sets are
/// sorted before iteration so the merge order is deterministic.
fn merge_parallel_namespace_result(
    keyspace: &mut Keyspace,
    ns_id: &NamespaceId,
    namespace: &Namespace,
    targets: &ParallelMergeTargets,
) {
    use crate::storage::keyspace::{
        accumulator_data_mem_cost, compact_kv_key, kv_entry_cost, kv_tombstone_cost, row_mem_cost,
        small_kv_entry_cost, table_data_mem_cost,
    };
    let mut added: usize = 0;
    let mut removed: usize = 0;
    let dest = keyspace.namespace_mut(ns_id.clone());
    // Whole-table merges: replace (or drop) each targeted table.
    let mut table_names: Vec<_> = targets.tables.iter().cloned().collect();
    table_names.sort();
    for table_name in table_names {
        let prev_cost = dest
            .tables
            .get(&table_name)
            .map(table_data_mem_cost)
            .unwrap_or(0);
        match namespace.tables.get(&table_name) {
            Some(table) => {
                added = added.saturating_add(table_data_mem_cost(table));
                dest.tables.insert(table_name, table.clone());
            }
            None => {
                dest.tables.remove(&table_name);
            }
        }
        removed = removed.saturating_add(prev_cost);
    }
    // Per-row merges: copy each targeted row (and its caches / versions) from
    // the worker result, or delete it when the worker removed it.
    let mut row_tables: Vec<_> = targets.table_rows.keys().cloned().collect();
    row_tables.sort();
    for table_name in row_tables {
        let Some(row_keys) = targets.table_rows.get(&table_name) else {
            continue;
        };
        let dest_table = dest.tables.entry(table_name.clone()).or_default();
        let source_table = namespace.tables.get(&table_name);
        for row_key in row_keys {
            let prev_row_cost = dest_table.rows.get(row_key).map(row_mem_cost).unwrap_or(0);
            match source_table.and_then(|table| table.rows.get(row_key)) {
                Some(row) => {
                    added = added.saturating_add(row_mem_cost(row));
                    dest_table.rows.insert(row_key.clone(), row.clone());
                    dest_table.row_cache.insert(row_key.clone(), row.clone());
                    dest_table.pk_hash.insert(row_key.clone(), ());
                }
                None => {
                    dest_table.rows.remove(row_key);
                    dest_table.row_cache.remove(row_key);
                    dest_table.pk_hash.remove(row_key);
                }
            }
            removed = removed.saturating_add(prev_row_cost);
            // Row versions (and their cache) follow the row itself.
            match source_table.and_then(|table| table.row_versions.get(row_key)) {
                Some(version) => {
                    dest_table.row_versions.insert(row_key.clone(), *version);
                    dest_table
                        .row_versions_cache
                        .insert(row_key.clone(), *version);
                }
                None => {
                    dest_table.row_versions.remove(row_key);
                    dest_table.row_versions_cache.remove(row_key);
                }
            }
        }
        // Structural version only moves forward.
        if let Some(source_table) = source_table {
            dest_table.structural_version = dest_table
                .structural_version
                .max(source_table.structural_version);
        }
    }
    // Per-key KV merges. A key can live in exactly one of three
    // representations (small entry, full entry, segment tombstone); the merge
    // installs the worker's representation and clears the other two.
    let mut kv_keys: Vec<_> = targets.kv_keys.iter().cloned().collect();
    kv_keys.sort();
    for key in kv_keys {
        let prev_kv_cost = dest
            .kv
            .small_entries
            .get(&compact_kv_key(&key))
            .map(|e| small_kv_entry_cost(key.len(), e.resident_value_len()))
            .or_else(|| {
                dest.kv
                    .entries
                    .get(&key)
                    .map(|e| kv_entry_cost(key.len(), e.resident_memory_value_len()))
            })
            .or_else(|| {
                dest.kv
                    .segment_tombstones
                    .get(&key)
                    .map(|_| kv_tombstone_cost(key.len()))
            })
            .unwrap_or(0);
        match namespace.kv.small_entries.get(&compact_kv_key(&key)) {
            Some(entry) => {
                added = added
                    .saturating_add(small_kv_entry_cost(key.len(), entry.resident_value_len()));
                dest.kv.entries.remove(&key);
                dest.kv.segment_tombstones.remove(&key);
                dest.kv
                    .small_entries
                    .insert(compact_kv_key(&key), entry.clone());
            }
            None => match namespace.kv.entries.get(&key) {
                Some(entry) => {
                    added = added.saturating_add(kv_entry_cost(
                        key.len(),
                        entry.resident_memory_value_len(),
                    ));
                    dest.kv.small_entries.remove(&compact_kv_key(&key));
                    dest.kv.segment_tombstones.remove(&key);
                    dest.kv.entries.insert(key, entry.clone());
                }
                None => {
                    if let Some(version) = namespace.kv.segment_tombstones.get(&key) {
                        added = added.saturating_add(kv_tombstone_cost(key.len()));
                        dest.kv.entries.remove(&key);
                        dest.kv.small_entries.remove(&compact_kv_key(&key));
                        dest.kv.segment_tombstones.insert(key, *version);
                    } else {
                        // Worker deleted the key entirely.
                        dest.kv.entries.remove(&key);
                        dest.kv.small_entries.remove(&compact_kv_key(&key));
                        dest.kv.segment_tombstones.remove(&key);
                    }
                }
            },
        }
        removed = removed.saturating_add(prev_kv_cost);
    }
    if !targets.kv_keys.is_empty() {
        dest.kv.structural_version = dest
            .kv
            .structural_version
            .max(namespace.kv.structural_version);
    }
    // Whole-accumulator merges.
    let mut accumulator_names: Vec<_> = targets.accumulators.iter().cloned().collect();
    accumulator_names.sort();
    for accumulator_name in accumulator_names {
        let prev_acc_cost = dest
            .accumulators
            .get(&accumulator_name)
            .map(accumulator_data_mem_cost)
            .unwrap_or(0);
        match namespace.accumulators.get(&accumulator_name) {
            Some(accumulator) => {
                added = added.saturating_add(accumulator_data_mem_cost(accumulator));
                dest.accumulators
                    .insert(accumulator_name, accumulator.clone());
            }
            None => {
                dest.accumulators.remove(&accumulator_name);
            }
        }
        removed = removed.saturating_add(prev_acc_cost);
    }
    // Apply the net memory delta with saturating arithmetic.
    keyspace.mem_bytes = keyspace
        .mem_bytes
        .saturating_add(added)
        .saturating_sub(removed);
}
/// Returns the `(project, scope)` namespace a mutation targets, for the
/// mutation kinds the parallel apply path understands; `None` for any other
/// kind (DDL, predicate batches with on-conflict handling, etc.), which forces
/// those mutations off the parallel path.
pub(super) fn namespace_id_for_parallel_mutation(mutation: &Mutation) -> Option<NamespaceId> {
    match mutation {
        // KV, counter, accumulator, table-row and order-book mutations all
        // carry an explicit (project_id, scope_id) pair.
        Mutation::KvSet {
            project_id,
            scope_id,
            ..
        }
        | Mutation::KvDel {
            project_id,
            scope_id,
            ..
        }
        | Mutation::KvIncU256 {
            project_id,
            scope_id,
            ..
        }
        | Mutation::KvDecU256 {
            project_id,
            scope_id,
            ..
        }
        | Mutation::KvAddU256Ex {
            project_id,
            scope_id,
            ..
        }
        | Mutation::KvSubU256Ex {
            project_id,
            scope_id,
            ..
        }
        | Mutation::KvMaxU256 {
            project_id,
            scope_id,
            ..
        }
        | Mutation::KvMinU256 {
            project_id,
            scope_id,
            ..
        }
        | Mutation::KvMutateU256 {
            project_id,
            scope_id,
            ..
        }
        | Mutation::KvAddU64Ex {
            project_id,
            scope_id,
            ..
        }
        | Mutation::KvSubU64Ex {
            project_id,
            scope_id,
            ..
        }
        | Mutation::KvSubIntEx {
            project_id,
            scope_id,
            ..
        }
        | Mutation::CounterAdd {
            project_id,
            scope_id,
            ..
        }
        | Mutation::KvMaxU64 {
            project_id,
            scope_id,
            ..
        }
        | Mutation::KvMinU64 {
            project_id,
            scope_id,
            ..
        }
        | Mutation::KvMutateU64 {
            project_id,
            scope_id,
            ..
        }
        | Mutation::Accumulate {
            project_id,
            scope_id,
            ..
        }
        | Mutation::ExposeAccumulator {
            project_id,
            scope_id,
            ..
        }
        | Mutation::ExposeAccumulatorBatch {
            project_id,
            scope_id,
            ..
        }
        | Mutation::Insert {
            project_id,
            scope_id,
            ..
        }
        | Mutation::InsertBatch {
            project_id,
            scope_id,
            ..
        }
        | Mutation::Upsert {
            project_id,
            scope_id,
            ..
        }
        | Mutation::UpsertBatch {
            project_id,
            scope_id,
            ..
        }
        | Mutation::Delete {
            project_id,
            scope_id,
            ..
        }
        | Mutation::TableIncU256 {
            project_id,
            scope_id,
            ..
        }
        | Mutation::TableDecU256 {
            project_id,
            scope_id,
            ..
        }
        | Mutation::OrderBookNew {
            project_id,
            scope_id,
            ..
        }
        | Mutation::OrderBookCancel {
            project_id,
            scope_id,
            ..
        }
        | Mutation::OrderBookCancelReplace {
            project_id,
            scope_id,
            ..
        }
        | Mutation::OrderBookMassCancel {
            project_id,
            scope_id,
            ..
        }
        | Mutation::OrderBookReduce {
            project_id,
            scope_id,
            ..
        }
        | Mutation::OrderBookMatch {
            project_id,
            scope_id,
            ..
        }
        | Mutation::OrderBookDefineTable {
            project_id,
            scope_id,
            ..
        }
        | Mutation::OrderBookDropTable {
            project_id,
            scope_id,
            ..
        } => Some(NamespaceId::project_scope(project_id, scope_id)),
        _ => None,
    }
}
/// Whether a mutation may be applied by a parallel worker without the
/// coordinator: KV / counter / accumulator mutations are always safe, table
/// mutations defer to the per-table check, everything else is unsafe.
pub(super) fn is_parallel_mutation_safe(catalog: &Catalog, mutation: &Mutation) -> bool {
    match mutation {
        // Namespace-local KV and accumulator state: unconditionally safe.
        Mutation::KvSet { .. }
        | Mutation::KvDel { .. }
        | Mutation::KvIncU256 { .. }
        | Mutation::KvDecU256 { .. }
        | Mutation::KvAddU256Ex { .. }
        | Mutation::KvSubU256Ex { .. }
        | Mutation::KvMaxU256 { .. }
        | Mutation::KvMinU256 { .. }
        | Mutation::KvMutateU256 { .. }
        | Mutation::KvAddU64Ex { .. }
        | Mutation::KvSubU64Ex { .. }
        | Mutation::KvSubIntEx { .. }
        | Mutation::CounterAdd { .. }
        | Mutation::KvMaxU64 { .. }
        | Mutation::KvMinU64 { .. }
        | Mutation::KvMutateU64 { .. }
        | Mutation::Accumulate { .. }
        | Mutation::ExposeAccumulator { .. }
        | Mutation::ExposeAccumulatorBatch { .. } => true,
        // Table mutations: safety depends on the table's configuration.
        Mutation::Insert {
            project_id,
            scope_id,
            table_name,
            ..
        }
        | Mutation::InsertBatch {
            project_id,
            scope_id,
            table_name,
            ..
        }
        | Mutation::Upsert {
            project_id,
            scope_id,
            table_name,
            ..
        }
        | Mutation::UpsertBatch {
            project_id,
            scope_id,
            table_name,
            ..
        }
        | Mutation::Delete {
            project_id,
            scope_id,
            table_name,
            ..
        }
        | Mutation::DeleteWhere {
            project_id,
            scope_id,
            table_name,
            ..
        }
        | Mutation::UpdateWhere {
            project_id,
            scope_id,
            table_name,
            ..
        }
        | Mutation::UpdateWhereExpr {
            project_id,
            scope_id,
            table_name,
            ..
        }
        | Mutation::TableIncU256 {
            project_id,
            scope_id,
            table_name,
            ..
        }
        | Mutation::TableDecU256 {
            project_id,
            scope_id,
            table_name,
            ..
        } => is_parallel_table_safe(catalog, project_id, scope_id, table_name),
        _ => false,
    }
}
/// Resolves `(project, scope)` to a namespace key and delegates the
/// parallel-safety decision to the namespace-based table check.
pub(super) fn is_parallel_table_safe(
    catalog: &Catalog,
    project_id: &str,
    scope_id: &str,
    table_name: &str,
) -> bool {
    is_parallel_table_safe_by_namespace(catalog, &namespace_key(project_id, scope_id), table_name)
}
/// Whether a mutation must be applied on the coordinator path: DDL and the
/// on-conflict upsert variants always do; point table writes do when their
/// table participates in a global unique index; everything else does not.
pub(super) fn mutation_requires_coordinator(catalog: &Catalog, mutation: &Mutation) -> bool {
    match mutation {
        Mutation::Ddl(_) => true,
        Mutation::UpsertOnConflict { .. } | Mutation::UpsertBatchOnConflict { .. } => true,
        Mutation::Insert {
            project_id,
            scope_id,
            table_name,
            ..
        }
        | Mutation::InsertBatch {
            project_id,
            scope_id,
            table_name,
            ..
        }
        | Mutation::Upsert {
            project_id,
            scope_id,
            table_name,
            ..
        }
        | Mutation::UpsertBatch {
            project_id,
            scope_id,
            table_name,
            ..
        }
        | Mutation::Delete {
            project_id,
            scope_id,
            table_name,
            ..
        }
        | Mutation::TableIncU256 {
            project_id,
            scope_id,
            table_name,
            ..
        }
        | Mutation::TableDecU256 {
            project_id,
            scope_id,
            table_name,
            ..
        } => table_has_global_unique_index(catalog, project_id, scope_id, table_name),
        _ => false,
    }
}
/// True when `table_name` (in the `_global` project only) participates in a
/// `UniqueHash` index in this scope's namespace or any `_global::` namespace.
pub(super) fn table_has_global_unique_index(
    catalog: &Catalog,
    project_id: &str,
    scope_id: &str,
    table_name: &str,
) -> bool {
    // Only the shared "_global" project can carry cross-scope unique indexes.
    if project_id != "_global" {
        return false;
    }
    let ns = namespace_key(project_id, scope_id);
    for ((idx_ns, idx_table, _), def) in catalog.indexes.iter() {
        let namespace_matches = idx_ns == &ns || idx_ns.starts_with("_global::");
        let unique_hash = matches!(
            def.index_type,
            crate::catalog::schema::IndexType::UniqueHash
        );
        if namespace_matches && idx_table == table_name && unique_hash {
            return true;
        }
    }
    false
}
/// Validates that a mutation would not violate a global unique index before it
/// is applied.
///
/// Point writes are checked row-by-row via `enforce_global_unique_for_row`;
/// the `TableIncU256`/`TableDecU256` arms reconstruct the post-mutation row to
/// check the resulting values; predicate mutations are rejected outright on
/// tables that carry global unique indexes (their row set is not known here).
/// Mutations that cannot affect global uniqueness pass through with `Ok(())`.
pub(super) fn enforce_global_unique_scope_invariants(
    catalog: &Catalog,
    keyspace: &Keyspace,
    mutation: &Mutation,
) -> Result<(), AedbError> {
    match mutation {
        Mutation::Insert {
            project_id,
            scope_id,
            table_name,
            primary_key,
            row,
        } => enforce_global_unique_for_row(
            catalog,
            keyspace,
            project_id,
            scope_id,
            table_name,
            primary_key,
            row,
        ),
        Mutation::Upsert {
            project_id,
            scope_id,
            table_name,
            primary_key,
            row,
        } => enforce_global_unique_for_row(
            catalog,
            keyspace,
            project_id,
            scope_id,
            table_name,
            primary_key,
            row,
        ),
        // On-conflict upserts carry no explicit PK: derive it from the row.
        Mutation::UpsertOnConflict {
            project_id,
            scope_id,
            table_name,
            row,
            ..
        } => {
            let schema = table_schema_for(catalog, project_id, scope_id, table_name)?;
            let pk = extract_pk_from_row(&schema, row)?;
            enforce_global_unique_for_row(
                catalog, keyspace, project_id, scope_id, table_name, &pk, row,
            )
        }
        Mutation::UpsertBatch {
            project_id,
            scope_id,
            table_name,
            rows,
        } => {
            let schema = table_schema_for(catalog, project_id, scope_id, table_name)?;
            for row in rows {
                let pk = extract_pk_from_row(&schema, row)?;
                enforce_global_unique_for_row(
                    catalog, keyspace, project_id, scope_id, table_name, &pk, row,
                )?;
            }
            Ok(())
        }
        Mutation::InsertBatch {
            project_id,
            scope_id,
            table_name,
            rows,
        } => {
            let schema = table_schema_for(catalog, project_id, scope_id, table_name)?;
            for row in rows {
                let pk = extract_pk_from_row(&schema, row)?;
                enforce_global_unique_for_row(
                    catalog, keyspace, project_id, scope_id, table_name, &pk, row,
                )?;
            }
            Ok(())
        }
        Mutation::UpsertBatchOnConflict {
            project_id,
            scope_id,
            table_name,
            rows,
            ..
        } => {
            let schema = table_schema_for(catalog, project_id, scope_id, table_name)?;
            for row in rows {
                let pk = extract_pk_from_row(&schema, row)?;
                enforce_global_unique_for_row(
                    catalog, keyspace, project_id, scope_id, table_name, &pk, row,
                )?;
            }
            Ok(())
        }
        // Arithmetic column updates: compute the resulting row value and check
        // uniqueness against that, not the current row.
        Mutation::TableIncU256 {
            project_id,
            scope_id,
            table_name,
            primary_key,
            column,
            amount_be,
        }
        | Mutation::TableDecU256 {
            project_id,
            scope_id,
            table_name,
            primary_key,
            column,
            amount_be,
        } => {
            let schema = table_schema_for(catalog, project_id, scope_id, table_name)?;
            let row_key = EncodedKey::from_values(primary_key);
            let existing = keyspace
                .table_by_namespace_key(&namespace_key(project_id, scope_id), table_name)
                .and_then(|t| t.rows.get(&row_key))
                .ok_or_else(|| AedbError::Validation("row not found".into()))?;
            let Some(col_idx) = schema.columns.iter().position(|c| c.name == *column) else {
                return Err(AedbError::Validation(format!("column not found: {column}")));
            };
            let current = match existing.values.get(col_idx) {
                Some(Value::U256(bytes)) => U256::from_big_endian(bytes.as_slice()),
                _ => {
                    return Err(AedbError::Validation(format!(
                        "column {column} must be U256"
                    )));
                }
            };
            let amount = U256::from_big_endian(amount_be);
            // Increment saturates; decrement rejects underflow.
            let next = if matches!(mutation, Mutation::TableIncU256 { .. }) {
                current.saturating_add(amount)
            } else if current < amount {
                return Err(AedbError::Underflow);
            } else {
                current - amount
            };
            let mut next_be = [0u8; 32];
            next.to_big_endian(&mut next_be);
            let mut next_row = existing.clone();
            next_row.values[col_idx] = Value::U256(next_be);
            enforce_global_unique_for_row(
                catalog,
                keyspace,
                project_id,
                scope_id,
                table_name,
                primary_key,
                &next_row,
            )
        }
        // Predicate mutations cannot be validated row-by-row here.
        Mutation::DeleteWhere {
            project_id,
            scope_id,
            table_name,
            ..
        }
        | Mutation::UpdateWhere {
            project_id,
            scope_id,
            table_name,
            ..
        }
        | Mutation::UpdateWhereExpr {
            project_id,
            scope_id,
            table_name,
            ..
        } => {
            if table_has_global_unique_index(catalog, project_id, scope_id, table_name) {
                return Err(AedbError::Validation(
                    "predicate mutations are not supported on tables with global unique indexes"
                        .into(),
                ));
            }
            Ok(())
        }
        _ => Ok(()),
    }
}
/// Checks a single incoming row against every global `UniqueHash` index on its
/// table, scanning all `_global::` namespaces for an existing row with the
/// same index key. A row may always replace itself (same namespace and same
/// primary key). No-op for projects other than `_global` or tables without
/// global unique definitions.
pub(super) fn enforce_global_unique_for_row(
    catalog: &Catalog,
    keyspace: &Keyspace,
    project_id: &str,
    scope_id: &str,
    table_name: &str,
    incoming_pk: &[Value],
    incoming_row: &Row,
) -> Result<(), AedbError> {
    if project_id != "_global" {
        return Ok(());
    }
    let defs = global_unique_defs_for_project_table(catalog, table_name);
    if defs.is_empty() {
        return Ok(());
    }
    let current_ns = namespace_key(project_id, scope_id);
    let incoming_pk_encoded = EncodedKey::from_values(incoming_pk);
    let current_schema = table_schema_for(catalog, project_id, scope_id, table_name)?;
    for def in defs {
        // FIX: the second argument had been mangled into the mojibake token
        // `¤t_schema` (a corrupted `&current_schema`), which cannot compile.
        let incoming_index_key =
            extract_index_key_encoded(incoming_row, &current_schema, &def.columns)?;
        for (ns_id, ns_data) in keyspace.namespaces.iter() {
            let NamespaceId::Project(ns_key) = ns_id else {
                continue;
            };
            if !ns_key.starts_with("_global::") {
                continue;
            }
            let Some(table) = ns_data.tables.get(table_name) else {
                continue;
            };
            let Some(schema) = catalog
                .tables
                .get(&(ns_key.clone(), table_name.to_string()))
            else {
                continue;
            };
            // Skip namespaces whose table schema lacks one of the indexed
            // columns: the index cannot apply there.
            if def
                .columns
                .iter()
                .any(|col| !schema.columns.iter().any(|c| c.name == *col))
            {
                continue;
            }
            for (pk, row) in &table.rows {
                // Replacing the same row in the same namespace is allowed.
                if ns_key.as_str() == current_ns && pk == &incoming_pk_encoded {
                    continue;
                }
                let existing_key = extract_index_key_encoded(row, schema, &def.columns)?;
                if existing_key == incoming_index_key {
                    return Err(AedbError::Validation(format!(
                        "global unique constraint violation on {} ({})",
                        table_name, def.index_name
                    )));
                }
            }
        }
    }
    Ok(())
}
/// Looks up the schema for `(project, scope, table)`; clones it out so callers
/// own the result. Errors when the table is not in the catalog.
pub(super) fn table_schema_for(
    catalog: &Catalog,
    project_id: &str,
    scope_id: &str,
    table_name: &str,
) -> Result<TableSchema, AedbError> {
    let key = (namespace_key(project_id, scope_id), table_name.to_string());
    match catalog.tables.get(&key) {
        Some(schema) => Ok(schema.clone()),
        None => Err(AedbError::Validation("table missing".into())),
    }
}
/// Extracts the primary-key values from `row` in the order declared by
/// `schema.primary_key`.
///
/// Errors when a PK column is missing from the schema, or (previously a panic
/// via unchecked indexing) when the row has fewer values than the schema —
/// malformed rows now surface as a `Validation` error instead of a panic.
pub(super) fn extract_pk_from_row(
    schema: &TableSchema,
    row: &Row,
) -> Result<Vec<Value>, AedbError> {
    let mut pk = Vec::with_capacity(schema.primary_key.len());
    for col in &schema.primary_key {
        let column_index = schema
            .columns
            .iter()
            .position(|c| c.name == *col)
            .ok_or_else(|| AedbError::Validation(format!("primary key column missing: {col}")))?;
        let value = row.values.get(column_index).ok_or_else(|| {
            AedbError::Validation(format!("row too short for primary key column: {col}"))
        })?;
        pk.push(value.clone());
    }
    Ok(pk)
}
/// A global `UniqueHash` index definition discovered in the catalog: the index
/// name plus the column list whose combined value must be unique across all
/// `_global::` namespaces.
#[derive(Clone)]
pub(super) struct GlobalUniqueDef {
    index_name: String,
    columns: Vec<String>,
}
/// Collects the `UniqueHash` index definitions on `table_name` across all
/// `_global::` namespaces, deduplicated by `(index name, column list)`.
pub(super) fn global_unique_defs_for_project_table(
    catalog: &Catalog,
    table_name: &str,
) -> Vec<GlobalUniqueDef> {
    let mut out = Vec::new();
    let mut seen = HashSet::new();
    for ((ns, t, idx_name), def) in &catalog.indexes {
        let relevant = ns.starts_with("_global::")
            && t == table_name
            && matches!(
                def.index_type,
                crate::catalog::schema::IndexType::UniqueHash
            );
        if !relevant {
            continue;
        }
        // Skip indexes already collected under the same name and columns.
        let dedupe_key = format!("{idx_name}:{}", def.columns.join(","));
        if !seen.insert(dedupe_key) {
            continue;
        }
        out.push(GlobalUniqueDef {
            index_name: idx_name.clone(),
            columns: def.columns.clone(),
        });
    }
    out
}
/// Produces a deterministic (lexicographically sorted) ordering of the
/// partition names so lock acquisition order is stable across callers.
pub(super) fn canonical_partition_order(write_partitions: &HashSet<String>) -> Vec<String> {
    write_partitions
        .iter()
        .cloned()
        .collect::<std::collections::BTreeSet<String>>()
        .into_iter()
        .collect()
}
/// Options controlling the coordinator apply path: partition locking, global
/// unique-index enforcement strategy, and timeout/scan budgets.
pub(super) struct CoordinatorApplyOptions<'a> {
    // When false, no partition locks are taken before applying.
    pub coordinator_locking_enabled: bool,
    pub lock_manager: &'a Arc<CoordinatorLockManager>,
    // When true, uniqueness is enforced through the maintained
    // GlobalUniqueIndexState; otherwise via the keyspace-scan fallback.
    pub global_unique_index_enabled: bool,
    pub partition_lock_timeout_ms: u64,
    pub max_scan_rows: usize,
}
/// Applies `mutations` on the coordinator path: optionally acquires all
/// partition locks (in the caller-supplied canonical order), enforces global
/// unique constraints before each mutation, and aborts with
/// `PartitionLockTimeout` once the configured budget elapses.
///
/// A same-namespace KvSet batch short-circuits through the batched keyspace
/// path. After any DDL mutation, the global unique index state is rebuilt from
/// a fresh snapshot so later mutations see the new schema.
#[allow(clippy::too_many_arguments)]
pub(super) fn apply_via_coordinator(
    catalog: &mut Catalog,
    keyspace: &mut Keyspace,
    global_unique_index: &mut GlobalUniqueIndexState,
    mutations: &[Mutation],
    commit_seq: u64,
    ordered_partitions: &[String],
    options: CoordinatorApplyOptions<'_>,
    caller: Option<&CallerContext>,
) -> Result<(), AedbError> {
    let started = Instant::now();
    // Hold the locks for the whole apply; dropped on return.
    let _lock_guard = if options.coordinator_locking_enabled {
        Some(options.lock_manager.acquire_all(
            ordered_partitions,
            Duration::from_millis(options.partition_lock_timeout_ms),
        )?)
    } else {
        None
    };
    // Fast path: an all-KvSet single-namespace batch bypasses per-mutation
    // validation and applies in one keyspace call.
    if let Some(result) = apply_kv_set_batch_same_namespace(keyspace, mutations, commit_seq) {
        return result;
    }
    for mutation in mutations {
        if started.elapsed() > Duration::from_millis(options.partition_lock_timeout_ms) {
            return Err(AedbError::PartitionLockTimeout);
        }
        coordinator_test_delay();
        // Re-check after the (test-only) delay so tests can force a timeout.
        if started.elapsed() > Duration::from_millis(options.partition_lock_timeout_ms) {
            return Err(AedbError::PartitionLockTimeout);
        }
        if options.global_unique_index_enabled {
            global_unique_index.enforce_and_apply(catalog, keyspace, mutation)?;
        } else {
            enforce_global_unique_scope_invariants(catalog, keyspace, mutation)?;
        }
        apply_mutation(
            catalog,
            keyspace,
            mutation.clone(),
            commit_seq,
            Some(options.max_scan_rows),
            caller,
        )?;
        // DDL can change schemas/indexes: rebuild the unique index state.
        if matches!(mutation, Mutation::Ddl(_)) {
            *global_unique_index = GlobalUniqueIndexState::from_snapshot(catalog, keyspace)?;
        }
    }
    Ok(())
}
/// Coordinator path for mutations that touch only keyspace state (no catalog
/// or global-unique-index involvement): optionally takes all partition locks,
/// then applies each mutation via the keyspace-only dispatcher, enforcing the
/// partition lock timeout as an overall budget.
///
/// Errors with a `Validation` error if a mutation is not supported by the
/// keyspace-only dispatcher (the dispatcher returns `None`).
pub(super) fn apply_via_keyspace_only_coordinator(
    keyspace: &mut Keyspace,
    mutations: &[Mutation],
    commit_seq: u64,
    ordered_partitions: &[String],
    options: CoordinatorApplyOptions<'_>,
) -> Result<(), AedbError> {
    let started = Instant::now();
    let _lock_guard = if options.coordinator_locking_enabled {
        Some(options.lock_manager.acquire_all(
            ordered_partitions,
            Duration::from_millis(options.partition_lock_timeout_ms),
        )?)
    } else {
        None
    };
    for mutation in mutations {
        if started.elapsed() > Duration::from_millis(options.partition_lock_timeout_ms) {
            return Err(AedbError::PartitionLockTimeout);
        }
        coordinator_test_delay();
        // Re-check after the (test-only) delay so tests can force a timeout.
        if started.elapsed() > Duration::from_millis(options.partition_lock_timeout_ms) {
            return Err(AedbError::PartitionLockTimeout);
        }
        let Some(result) = apply_keyspace_only_mutation(keyspace, mutation, commit_seq) else {
            return Err(AedbError::Validation(
                "keyspace-only coordinator path received unsupported mutation".into(),
            ));
        };
        result?;
    }
    Ok(())
}
/// Test-only hook: sleeps `COORDINATOR_TEST_DELAY_MS` milliseconds so tests
/// can widen the coordinator's critical section; compiles to a no-op outside
/// test builds.
#[inline]
pub(super) fn coordinator_test_delay() {
    #[cfg(test)]
    {
        let delay = COORDINATOR_TEST_DELAY_MS.load(Ordering::Relaxed);
        if delay > 0 {
            std::thread::sleep(Duration::from_millis(delay));
        }
    }
}
/// Test-only hook: injects a panic or an artificial 25ms delay into the
/// parallel apply worker when a KvSet/KvDel mutation targets one of the magic
/// hook keys.
#[cfg(test)]
#[inline]
pub(super) fn parallel_worker_test_hook_for_mutation(mutation: &Mutation) {
    const PANIC_KEY: &[u8] = b"__panic_parallel_worker__";
    const SLOW_KEY: &[u8] = b"__slow_parallel_worker__";
    match mutation {
        Mutation::KvSet { key, .. } | Mutation::KvDel { key, .. } => {
            if key.as_slice() == PANIC_KEY {
                panic!("parallel apply worker injected panic");
            }
            if key.as_slice() == SLOW_KEY {
                std::thread::sleep(Duration::from_millis(25));
            }
        }
        _ => {}
    }
}
/// Release builds: the parallel-worker test hook is a no-op.
#[cfg(not(test))]
#[inline]
pub(super) fn parallel_worker_test_hook_for_mutation(_mutation: &Mutation) {}
/// Borrowed view of a WAL commit payload, serialized to MessagePack by
/// `encode_wal_payload_from_parts` without cloning the mutation list.
#[derive(Serialize)]
struct WalCommitPayloadRef<'a> {
    mutations: &'a [Mutation],
    assertions: &'a [ReadAssertion],
    idempotency_key: Option<&'a IdempotencyKey>,
    request_fingerprint: Option<&'a [u8; 32]>,
}
/// Serializes a WAL commit payload (mutations, assertions, optional
/// idempotency key and request fingerprint) to MessagePack, preallocating the
/// output buffer from the caller's capacity hint when available.
pub(super) fn encode_wal_payload_from_parts(
    mutations: &[Mutation],
    assertions: &[ReadAssertion],
    idempotency_key: Option<&IdempotencyKey>,
    request_fingerprint: Option<&[u8; 32]>,
    capacity_hint: Option<usize>,
) -> Result<Vec<u8>, AedbError> {
    let mut buf = match capacity_hint {
        Some(hint) => Vec::with_capacity(hint),
        None => Vec::new(),
    };
    let payload = WalCommitPayloadRef {
        mutations,
        assertions,
        idempotency_key,
        request_fingerprint,
    };
    rmp_serde::encode::write(&mut buf, &payload)
        .map_err(|err| AedbError::Encode(err.to_string()))?;
    Ok(buf)
}
/// Fast path: when every mutation is a `KvSet` into one namespace, applies the
/// whole batch through the batched keyspace entry point. Returns `None` when
/// the batch does not qualify (empty, mixed kinds, or mixed namespaces).
fn apply_kv_set_batch_same_namespace(
    keyspace: &mut Keyspace,
    mutations: &[Mutation],
    commit_seq: u64,
) -> Option<Result<(), AedbError>> {
    // The first mutation fixes the candidate namespace.
    let Some(Mutation::KvSet {
        project_id,
        scope_id,
        ..
    }) = mutations.first()
    else {
        return None;
    };
    let uniform = mutations.iter().all(|mutation| {
        matches!(
            mutation,
            Mutation::KvSet {
                project_id: p,
                scope_id: s,
                ..
            } if p == project_id && s == scope_id
        )
    });
    if !uniform {
        return None;
    }
    let pairs = mutations.iter().filter_map(|mutation| match mutation {
        Mutation::KvSet { key, value, .. } => Some((key, value)),
        _ => None,
    });
    Some(keyspace.kv_set_many_same_namespace(project_id, scope_id, pairs, commit_seq))
}
/// True when all mutations are `KvSet`s into one namespace and none of them
/// target a parallel-worker test hook key.
fn kv_set_mutations_same_namespace(mutations: &[Mutation]) -> bool {
    // Single-element fast path.
    if let [Mutation::KvSet { key, .. }] = mutations {
        return !parallel_worker_test_hook_key(key);
    }
    let Some(Mutation::KvSet {
        project_id,
        scope_id,
        ..
    }) = mutations.first()
    else {
        return false;
    };
    mutations.iter().all(|mutation| {
        matches!(
            mutation,
            Mutation::KvSet {
                project_id: p,
                scope_id: s,
                key,
                ..
            } if p == project_id && s == scope_id && !parallel_worker_test_hook_key(key)
        )
    })
}
/// Test builds: true when `key` is one of the magic keys that trigger the
/// parallel-worker panic/slow hooks.
#[cfg(test)]
fn parallel_worker_test_hook_key(key: &[u8]) -> bool {
    key == b"__panic_parallel_worker__" || key == b"__slow_parallel_worker__"
}
/// Release builds: no key triggers the parallel-worker test hooks.
#[cfg(not(test))]
fn parallel_worker_test_hook_key(_key: &[u8]) -> bool {
    false
}
/// Inline fast path: applies a batch of `KvSet`s directly into in-memory
/// storage when every mutation targets the same namespace and every value fits
/// under the persistent inline threshold.
///
/// Returns `true` when the batch was applied here; `false` when any condition
/// fails and the caller must fall back to the regular apply path.
fn apply_inline_kv_set_same_namespace(
    keyspace: &mut Keyspace,
    mutations: &[Mutation],
    commit_seq: u64,
) -> bool {
    // Single-element fast path.
    if let [
        Mutation::KvSet {
            project_id,
            scope_id,
            key,
            value,
        },
    ] = mutations
    {
        if value.len() <= keyspace.persistent_value_inline_threshold_bytes {
            keyspace.kv_set_inline(project_id, scope_id, key.clone(), value.clone(), commit_seq);
            return true;
        }
        return false;
    }
    let [first, ..] = mutations else {
        return false;
    };
    let Mutation::KvSet {
        project_id,
        scope_id,
        value,
        ..
    } = first
    else {
        return false;
    };
    let inline_threshold_bytes = keyspace.persistent_value_inline_threshold_bytes;
    // Cheap early rejection on the first value before scanning the batch.
    if value.len() > inline_threshold_bytes {
        return false;
    }
    // All mutations must be KvSets into the same namespace with inline-sized
    // values.
    if !mutations.iter().all(|mutation| {
        matches!(
            mutation,
            Mutation::KvSet {
                project_id: candidate_project,
                scope_id: candidate_scope,
                value,
                ..
            } if candidate_project == project_id
                && candidate_scope == scope_id
                && value.len() <= inline_threshold_bytes
        )
    }) {
        return false;
    }
    keyspace.kv_set_many_inline_same_namespace(
        project_id,
        scope_id,
        mutations.iter().filter_map(|mutation| match mutation {
            Mutation::KvSet { key, value, .. } => Some((key, value)),
            _ => None,
        }),
        commit_seq,
    );
    true
}
/// Classifies a mutation batch into a WAL payload type tag:
/// `0x02` when every mutation is DDL, `0x04` when every mutation is a
/// keyspace-level write (KV / counter / accumulator / event / order-book),
/// `0x01` for any mix.
///
/// NOTE(review): an empty batch is vacuously "all DDL" and returns `0x02` —
/// presumably callers never pass an empty slice; confirm against call sites.
pub(super) fn payload_type_for_mutations(mutations: &[Mutation]) -> u8 {
    let mut all_ddl = true;
    let mut all_kv = true;
    for mutation in mutations {
        if !matches!(mutation, Mutation::Ddl(_)) {
            all_ddl = false;
        }
        // Everything in this list is handled by the keyspace-apply path.
        if !matches!(
            mutation,
            Mutation::KvSet { .. }
                | Mutation::KvDel { .. }
                | Mutation::KvIncU256 { .. }
                | Mutation::KvDecU256 { .. }
                | Mutation::KvAddU256Ex { .. }
                | Mutation::KvSubU256Ex { .. }
                | Mutation::KvMaxU256 { .. }
                | Mutation::KvMinU256 { .. }
                | Mutation::KvMutateU256 { .. }
                | Mutation::KvAddU64Ex { .. }
                | Mutation::KvSubU64Ex { .. }
                | Mutation::KvSubIntEx { .. }
                | Mutation::CounterAdd { .. }
                | Mutation::KvMaxU64 { .. }
                | Mutation::KvMinU64 { .. }
                | Mutation::KvMutateU64 { .. }
                | Mutation::Accumulate { .. }
                | Mutation::ExposeAccumulator { .. }
                | Mutation::ExposeAccumulatorBatch { .. }
                | Mutation::ReleaseAccumulatorExposure { .. }
                | Mutation::EmitEvent { .. }
                | Mutation::OrderBookNew { .. }
                | Mutation::OrderBookCancel { .. }
                | Mutation::OrderBookCancelReplace { .. }
                | Mutation::OrderBookMassCancel { .. }
                | Mutation::OrderBookReduce { .. }
                | Mutation::OrderBookMatch { .. }
                | Mutation::OrderBookDefineTable { .. }
                | Mutation::OrderBookDropTable { .. }
        ) {
            all_kv = false;
        }
        // Mixed already proven; no later mutation can change the outcome.
        if !all_ddl && !all_kv {
            return 0x01;
        }
    }
    if all_ddl {
        return 0x02;
    }
    if all_kv { 0x04 } else { 0x01 }
}
/// Computes the set of write-partition tokens a mutation batch touches,
/// using `catalog` both to refine table writes down to per-row tokens where
/// row-level parallelism is safe and to widen the set to tables whose
/// foreign keys reference any touched table.
///
/// Token shapes: `t:`/`tr:` for tables and rows, `k:` for KV keys, `acc:`
/// for accumulators, `evt:` for event topics, `ob:`/`obm:` for order books
/// and their metadata, and the global token for DDL.
pub(super) fn derive_write_partitions_with_fk_expansion(
    catalog: &Catalog,
    mutations: &[Mutation],
) -> HashSet<String> {
    let mut out = HashSet::new();
    // Tables written by this batch; used for FK widening below.
    let mut touched_tables = HashSet::new();
    for mutation in mutations {
        match mutation {
            // Point row writes: row-level token when the table allows row
            // parallelism, otherwise the whole-table token.
            Mutation::Insert {
                project_id,
                scope_id,
                table_name,
                primary_key,
                ..
            }
            | Mutation::Upsert {
                project_id,
                scope_id,
                table_name,
                primary_key,
                ..
            }
            | Mutation::Delete {
                project_id,
                scope_id,
                table_name,
                primary_key,
            }
            | Mutation::TableIncU256 {
                project_id,
                scope_id,
                table_name,
                primary_key,
                ..
            }
            | Mutation::TableDecU256 {
                project_id,
                scope_id,
                table_name,
                primary_key,
                ..
            } => {
                let ns = namespace_key(project_id, scope_id);
                if table_supports_row_parallelism(catalog, &ns, table_name) {
                    out.insert(table_row_partition_token(&ns, table_name, primary_key));
                } else {
                    out.insert(table_partition_token(&ns, table_name));
                }
                touched_tables.insert((ns, table_name.clone()));
            }
            // Batched row writes: per-row tokens only when every row's
            // primary key can be extracted from the schema.
            Mutation::InsertBatch {
                project_id,
                scope_id,
                table_name,
                rows,
            }
            | Mutation::UpsertBatch {
                project_id,
                scope_id,
                table_name,
                rows,
            } => {
                let ns = namespace_key(project_id, scope_id);
                if table_supports_row_parallelism(catalog, &ns, table_name) {
                    if let Some(primary_keys) =
                        extract_row_primary_keys_for_partitions(catalog, &ns, table_name, rows)
                    {
                        for primary_key in primary_keys {
                            out.insert(table_row_partition_token(&ns, table_name, &primary_key));
                        }
                    } else {
                        out.insert(table_partition_token(&ns, table_name));
                    }
                } else {
                    out.insert(table_partition_token(&ns, table_name));
                }
                touched_tables.insert((ns, table_name.clone()));
            }
            // Predicate-based or conflict-resolving writes may touch any
            // row, so they always take the whole-table token.
            Mutation::UpsertOnConflict {
                project_id,
                scope_id,
                table_name,
                ..
            }
            | Mutation::UpsertBatchOnConflict {
                project_id,
                scope_id,
                table_name,
                ..
            }
            | Mutation::DeleteWhere {
                project_id,
                scope_id,
                table_name,
                ..
            }
            | Mutation::UpdateWhere {
                project_id,
                scope_id,
                table_name,
                ..
            }
            | Mutation::UpdateWhereExpr {
                project_id,
                scope_id,
                table_name,
                ..
            } => {
                let ns = namespace_key(project_id, scope_id);
                out.insert(table_partition_token(&ns, table_name));
                touched_tables.insert((ns, table_name.clone()));
            }
            // Plain KV writes partition per key.
            Mutation::KvSet {
                project_id,
                scope_id,
                key,
                ..
            }
            | Mutation::KvDel {
                project_id,
                scope_id,
                key,
                ..
            }
            | Mutation::KvIncU256 {
                project_id,
                scope_id,
                key,
                ..
            }
            | Mutation::KvDecU256 {
                project_id,
                scope_id,
                key,
                ..
            }
            | Mutation::KvAddU256Ex {
                project_id,
                scope_id,
                key,
                ..
            }
            | Mutation::KvSubU256Ex {
                project_id,
                scope_id,
                key,
                ..
            }
            | Mutation::KvMaxU256 {
                project_id,
                scope_id,
                key,
                ..
            }
            | Mutation::KvMinU256 {
                project_id,
                scope_id,
                key,
                ..
            }
            | Mutation::KvMutateU256 {
                project_id,
                scope_id,
                key,
                ..
            }
            | Mutation::KvAddU64Ex {
                project_id,
                scope_id,
                key,
                ..
            }
            | Mutation::KvSubU64Ex {
                project_id,
                scope_id,
                key,
                ..
            }
            | Mutation::KvSubIntEx {
                project_id,
                scope_id,
                key,
                ..
            }
            | Mutation::KvMaxU64 {
                project_id,
                scope_id,
                key,
                ..
            }
            | Mutation::KvMinU64 {
                project_id,
                scope_id,
                key,
                ..
            }
            | Mutation::KvMutateU64 {
                project_id,
                scope_id,
                key,
                ..
            } => {
                let ns = namespace_key(project_id, scope_id);
                out.insert(kv_key_partition_token(&ns, key));
            }
            // Sharded counters partition on the resolved shard storage key,
            // not the logical counter key.
            Mutation::CounterAdd {
                project_id,
                scope_id,
                key,
                shard_count,
                shard_hint,
                ..
            } => {
                let ns = namespace_key(project_id, scope_id);
                let shard =
                    crate::commit::validation::counter_shard_index(*shard_hint, *shard_count);
                let shard_key = crate::commit::validation::counter_shard_storage_key(key, shard);
                out.insert(kv_key_partition_token(&ns, &shard_key));
            }
            // Accumulator operations serialize per accumulator name.
            Mutation::Accumulate {
                project_id,
                scope_id,
                accumulator_name,
                ..
            }
            | Mutation::ExposeAccumulator {
                project_id,
                scope_id,
                accumulator_name,
                ..
            }
            | Mutation::ExposeAccumulatorBatch {
                project_id,
                scope_id,
                accumulator_name,
                ..
            }
            | Mutation::ReleaseAccumulatorExposure {
                project_id,
                scope_id,
                accumulator_name,
                ..
            } => {
                let ns = namespace_key(project_id, scope_id);
                out.insert(format!("acc:{ns}:{accumulator_name}"));
            }
            // Events serialize per topic.
            Mutation::EmitEvent {
                project_id,
                scope_id,
                topic,
                ..
            } => {
                let ns = namespace_key(project_id, scope_id);
                out.insert(format!("evt:{ns}:{topic}"));
            }
            // Order-book operations serialize per instrument.
            Mutation::OrderBookNew {
                project_id,
                scope_id,
                request,
            } => {
                let ns = namespace_key(project_id, scope_id);
                out.insert(order_book_partition_token(&ns, &request.instrument));
            }
            Mutation::OrderBookCancel {
                project_id,
                scope_id,
                instrument,
                ..
            }
            | Mutation::OrderBookCancelReplace {
                project_id,
                scope_id,
                instrument,
                ..
            }
            | Mutation::OrderBookMassCancel {
                project_id,
                scope_id,
                instrument,
                ..
            }
            | Mutation::OrderBookReduce {
                project_id,
                scope_id,
                instrument,
                ..
            }
            | Mutation::OrderBookMatch {
                project_id,
                scope_id,
                instrument,
                ..
            } => {
                let ns = namespace_key(project_id, scope_id);
                out.insert(order_book_partition_token(&ns, instrument));
            }
            // Order-book table/config metadata uses dedicated meta tokens.
            Mutation::OrderBookDefineTable {
                project_id,
                scope_id,
                table_id,
                ..
            }
            | Mutation::OrderBookDropTable {
                project_id,
                scope_id,
                table_id,
            } => {
                let ns = namespace_key(project_id, scope_id);
                out.insert(order_book_meta_partition_token(&ns, "table", table_id));
            }
            Mutation::OrderBookSetInstrumentConfig {
                project_id,
                scope_id,
                instrument,
                ..
            }
            | Mutation::OrderBookSetInstrumentHalted {
                project_id,
                scope_id,
                instrument,
                ..
            } => {
                let ns = namespace_key(project_id, scope_id);
                out.insert(order_book_meta_partition_token(&ns, "cfg", instrument));
            }
            // DDL conflicts with everything.
            Mutation::Ddl(_) => {
                out.insert(GLOBAL_PARTITION_TOKEN.to_string());
            }
        }
    }
    // FK widening: any table whose foreign key references a touched table
    // must also be locked at table granularity.
    for (target_ns, target_table) in touched_tables {
        for ((dep_ns, dep_table), dep_schema) in &catalog.tables {
            for fk in &dep_schema.foreign_keys {
                if namespace_key(&fk.references_project_id, &fk.references_scope_id) == target_ns
                    && fk.references_table == target_table
                {
                    out.insert(table_partition_token(dep_ns, dep_table));
                    break;
                }
            }
        }
    }
    out
}
/// Derives write partitions without catalog knowledge: with a default
/// (empty) catalog no table supports row-level parallelism and no
/// foreign-key expansion applies, so table writes collapse to whole-table
/// partition tokens.
pub(super) fn derive_write_partitions(mutations: &[Mutation]) -> HashSet<String> {
    derive_write_partitions_with_fk_expansion(&Catalog::default(), mutations)
}
/// True for mutations that touch table rows and can therefore affect
/// foreign-key relationships; these require the FK-aware widening performed
/// by `derive_write_partitions_with_fk_expansion`.
pub(super) fn mutation_requires_fk_expansion(mutation: &Mutation) -> bool {
    matches!(
        mutation,
        Mutation::Insert { .. }
            | Mutation::InsertBatch { .. }
            | Mutation::Upsert { .. }
            | Mutation::UpsertBatch { .. }
            | Mutation::UpsertOnConflict { .. }
            | Mutation::UpsertBatchOnConflict { .. }
            | Mutation::Delete { .. }
            | Mutation::DeleteWhere { .. }
            | Mutation::UpdateWhere { .. }
            | Mutation::UpdateWhereExpr { .. }
            | Mutation::TableIncU256 { .. }
            | Mutation::TableDecU256 { .. }
    )
}
/// A table supports row-level write parallelism only when it is free of
/// foreign keys in both directions (see `is_parallel_table_safe_by_namespace`)
/// and has no secondary indexes, since index maintenance would couple
/// otherwise-independent row writes.
fn table_supports_row_parallelism(catalog: &Catalog, namespace: &str, table_name: &str) -> bool {
    is_parallel_table_safe_by_namespace(catalog, namespace, table_name)
        && !catalog
            .indexes
            .keys()
            .any(|(idx_ns, idx_table, _)| idx_ns == namespace && idx_table == table_name)
}
/// A table is safe for parallel row writes only when no foreign keys touch
/// it at all: the table itself declares none, and no table in the catalog
/// references it.
fn is_parallel_table_safe_by_namespace(
    catalog: &Catalog,
    namespace: &str,
    table_name: &str,
) -> bool {
    let lookup_key = (namespace.to_string(), table_name.to_string());
    let Some(schema) = catalog.tables.get(&lookup_key) else {
        // Unknown tables are treated conservatively as unsafe.
        return false;
    };
    if !schema.foreign_keys.is_empty() {
        return false;
    }
    // Reject the table if any catalog table's FK points at it.
    let referenced_elsewhere = catalog.tables.values().any(|dep_schema| {
        dep_schema.foreign_keys.iter().any(|fk| {
            namespace_key(&fk.references_project_id, &fk.references_scope_id) == namespace
                && fk.references_table == table_name
        })
    });
    !referenced_elsewhere
}
/// Extracts the primary key of every row in the batch for partition-token
/// derivation. Returns `None` when the table schema is unknown or any row is
/// missing a primary-key value, signalling the caller to fall back to a
/// whole-table partition token.
fn extract_row_primary_keys_for_partitions(
    catalog: &Catalog,
    namespace: &str,
    table_name: &str,
    rows: &[Row],
) -> Option<Vec<Vec<Value>>> {
    let schema = catalog
        .tables
        .get(&(namespace.to_string(), table_name.to_string()))?;
    // Collecting into Option<Vec<_>> short-circuits on the first bad row.
    rows.iter()
        .map(|row| extract_primary_key_for_partition(schema, row))
        .collect()
}
/// Pulls the primary-key values out of `row` in schema order; `None` if a
/// primary-key column is absent from the schema or the row has no value at
/// that column's position.
fn extract_primary_key_for_partition(schema: &TableSchema, row: &Row) -> Option<Vec<Value>> {
    schema
        .primary_key
        .iter()
        .map(|pk_name| {
            let column_index = schema.columns.iter().position(|c| c.name == *pk_name)?;
            row.values.get(column_index).cloned()
        })
        .collect()
}
/// Computes the partition tokens a transaction's read set touches: per-row /
/// per-key tokens for point reads, whole-table / whole-KV-namespace tokens
/// for range reads, plus tokens derived from the envelope's assertions.
pub(super) fn derive_read_partitions(envelope: &TransactionEnvelope) -> HashSet<String> {
    let mut out = HashSet::new();
    // Point reads map to the finest-grained token available.
    for entry in &envelope.read_set.points {
        match &entry.key {
            ReadKey::TableRow {
                project_id,
                scope_id,
                table_name,
                primary_key,
            } => {
                let ns = namespace_key(project_id, scope_id);
                out.insert(table_row_partition_token(&ns, table_name, primary_key));
            }
            ReadKey::KvKey {
                project_id,
                scope_id,
                key,
                ..
            } => {
                let ns = namespace_key(project_id, scope_id);
                out.insert(kv_key_partition_token(&ns, key));
            }
        }
    }
    // Range reads conflict with any write in the covered container, so they
    // take the coarse container-level token.
    for range in &envelope.read_set.ranges {
        match &range.range {
            ReadRange::TableRange {
                project_id,
                scope_id,
                table_name,
                ..
            } => {
                let ns = namespace_key(project_id, scope_id);
                out.insert(table_partition_token(&ns, table_name));
            }
            ReadRange::KvRange {
                project_id,
                scope_id,
                ..
            } => {
                let ns = namespace_key(project_id, scope_id);
                out.insert(kv_namespace_partition_token(&ns));
            }
        }
    }
    derive_tokens_for_assertions(&mut out, &envelope.assertions);
    out
}
/// Folds the partition tokens of every assertion in the slice into `out`.
fn derive_tokens_for_assertions(out: &mut HashSet<String>, assertions: &[ReadAssertion]) {
    assertions
        .iter()
        .for_each(|assertion| derive_tokens_for_assertion(out, assertion));
}
/// Adds the partition token(s) a single read assertion depends on to `out`,
/// recursing through the `All` / `Any` / `Not` combinators.
fn derive_tokens_for_assertion(out: &mut HashSet<String>, assertion: &ReadAssertion) {
    match assertion {
        // Accumulator assertions serialize against the accumulator token.
        ReadAssertion::AccumulatorAvailableAtLeast {
            project_id,
            scope_id,
            accumulator_name,
            ..
        }
        | ReadAssertion::AccumulatorExposureWithinMargin {
            project_id,
            scope_id,
            accumulator_name,
            ..
        } => {
            let ns = namespace_key(project_id, scope_id);
            out.insert(format!("acc:{ns}:{accumulator_name}"));
        }
        // Key-level assertions map to the single KV key token.
        ReadAssertion::KeyEquals {
            project_id,
            scope_id,
            key,
            ..
        }
        | ReadAssertion::KeyCompare {
            project_id,
            scope_id,
            key,
            ..
        }
        | ReadAssertion::KeyExists {
            project_id,
            scope_id,
            key,
            ..
        }
        | ReadAssertion::KeyVersion {
            project_id,
            scope_id,
            key,
            ..
        } => {
            let ns = namespace_key(project_id, scope_id);
            out.insert(kv_key_partition_token(&ns, key));
        }
        // Row-level assertions map to the per-row token.
        ReadAssertion::RowVersion {
            project_id,
            scope_id,
            table_name,
            primary_key,
            ..
        }
        | ReadAssertion::RowExists {
            project_id,
            scope_id,
            table_name,
            primary_key,
            ..
        } => {
            let ns = namespace_key(project_id, scope_id);
            out.insert(table_row_partition_token(&ns, table_name, primary_key));
        }
        ReadAssertion::RowColumnCompare {
            project_id,
            scope_id,
            table_name,
            primary_key,
            ..
        } => {
            let ns = namespace_key(project_id, scope_id);
            out.insert(table_row_partition_token(&ns, table_name, primary_key));
        }
        // Aggregate assertions depend on the whole table.
        ReadAssertion::CountCompare {
            project_id,
            scope_id,
            table_name,
            ..
        }
        | ReadAssertion::SumCompare {
            project_id,
            scope_id,
            table_name,
            ..
        } => {
            let ns = namespace_key(project_id, scope_id);
            out.insert(table_partition_token(&ns, table_name));
        }
        // Combinators: union of all child tokens.
        ReadAssertion::All(inner) | ReadAssertion::Any(inner) => {
            derive_tokens_for_assertions(out, inner);
        }
        ReadAssertion::Not(inner) => derive_tokens_for_assertion(out, inner),
    }
}
/// Partition token covering every row of one table: `t:<ns>:<table>`.
fn table_partition_token(namespace: &str, table_name: &str) -> String {
    let mut token = String::with_capacity(namespace.len() + table_name.len() + 4);
    token.push_str("t:");
    token.push_str(namespace);
    token.push(':');
    token.push_str(table_name);
    token
}
/// Partition token for a single table row: `tr:<ns>:<table>:<hex(pk)>`,
/// where the primary key is hex-encoded from its canonical encoded form.
fn table_row_partition_token(namespace: &str, table_name: &str, primary_key: &[Value]) -> String {
    let encoded = EncodedKey::from_values(primary_key);
    let encoded_bytes = encoded.as_slice();
    // Capacity covers the prefix, separators, and two hex chars per byte.
    let mut token =
        String::with_capacity(namespace.len() + table_name.len() + encoded_bytes.len() * 2 + 6);
    token.push_str("tr:");
    token.push_str(namespace);
    token.push(':');
    token.push_str(table_name);
    token.push(':');
    append_hex_lower(&mut token, encoded_bytes);
    token
}
/// Partition token for a single KV key: `k:<ns>:<hex(key)>`.
fn kv_key_partition_token(namespace: &str, key: &[u8]) -> String {
    let capacity = namespace.len() + key.len().saturating_mul(2) + 3;
    let mut token = String::with_capacity(capacity);
    token.push_str("k:");
    token.push_str(namespace);
    token.push(':');
    append_hex_lower(&mut token, key);
    token
}
/// Appends the lowercase hex encoding of `bytes` to `out`.
///
/// Fix: the previous implementation pushed raw bytes through
/// `unsafe { out.as_mut_vec() }` with no `// SAFETY:` comment. The unsafe
/// block is unnecessary: every emitted character comes from the fixed ASCII
/// hex alphabet, so safe `String::push` produces identical bytes with the
/// UTF-8 invariant upheld by construction.
fn append_hex_lower(out: &mut String, bytes: &[u8]) {
    const HEX: &[u8; 16] = b"0123456789abcdef";
    // Reserve up front: exactly two output chars per input byte.
    out.reserve(bytes.len().saturating_mul(2));
    for b in bytes {
        // HEX entries are ASCII, so `as char` is exact and each push is one byte.
        out.push(HEX[(b >> 4) as usize] as char);
        out.push(HEX[(b & 0x0f) as usize] as char);
    }
}
/// Partition token covering an entire KV namespace: `kns:<ns>`.
fn kv_namespace_partition_token(namespace: &str) -> String {
    let mut token = String::with_capacity(namespace.len() + 4);
    token.push_str("kns:");
    token.push_str(namespace);
    token
}
/// Partition token for one order-book instrument: `ob:<ns>:<instrument>`.
fn order_book_partition_token(namespace: &str, instrument: &str) -> String {
    let mut token = String::with_capacity(namespace.len() + instrument.len() + 4);
    token.push_str("ob:");
    token.push_str(namespace);
    token.push(':');
    token.push_str(instrument);
    token
}
/// Partition token for order-book metadata (`kind` is e.g. "table" or
/// "cfg"): `obm:<ns>:<kind>:<id>`.
fn order_book_meta_partition_token(namespace: &str, kind: &str, id: &str) -> String {
    let mut token = String::with_capacity(namespace.len() + kind.len() + id.len() + 6);
    token.push_str("obm:");
    token.push_str(namespace);
    token.push(':');
    token.push_str(kind);
    token.push(':');
    token.push_str(id);
    token
}
/// True when a write set spans partitions that cannot commit under a
/// single-namespace fast path: either it contains the global (DDL) token,
/// or its tokens resolve to more than one namespace.
fn is_cross_partition_write_set(write_set: &HashSet<String>) -> bool {
    if write_set.len() <= 1 {
        return false;
    }
    if write_set.contains(GLOBAL_PARTITION_TOKEN) {
        return true;
    }
    // Tokens without a namespace component (e.g. acc:/evt:) do not
    // participate in the cross-namespace check.
    let mut namespaces = write_set.iter().filter_map(|token| token_namespace(token));
    match namespaces.next() {
        Some(first) => namespaces.any(|ns| ns != first),
        None => false,
    }
}
/// True when a keyspace-only batch touches more than one `(project, scope)`
/// namespace, or contains a mutation whose namespace cannot be determined
/// (treated conservatively as cross-partition).
pub(super) fn keyspace_mutations_cross_partition(mutations: &[Mutation]) -> bool {
    if mutations.len() <= 1 {
        return false;
    }
    let mut seen: Option<(&str, &str)> = None;
    for mutation in mutations {
        match keyspace_mutation_project_scope(mutation) {
            // Unknown namespace: assume cross-partition.
            None => return true,
            Some(scope) => match seen {
                None => seen = Some(scope),
                Some(first) if first != scope => return true,
                Some(_) => {}
            },
        }
    }
    false
}
/// Returns the `(project_id, scope_id)` a plain keyspace-level mutation
/// (KV / counter / accumulator) targets, or `None` for every other mutation
/// kind — callers treat `None` conservatively.
fn keyspace_mutation_project_scope(mutation: &Mutation) -> Option<(&str, &str)> {
    match mutation {
        Mutation::KvSet {
            project_id,
            scope_id,
            ..
        }
        | Mutation::KvDel {
            project_id,
            scope_id,
            ..
        }
        | Mutation::KvIncU256 {
            project_id,
            scope_id,
            ..
        }
        | Mutation::KvDecU256 {
            project_id,
            scope_id,
            ..
        }
        | Mutation::KvAddU256Ex {
            project_id,
            scope_id,
            ..
        }
        | Mutation::KvSubU256Ex {
            project_id,
            scope_id,
            ..
        }
        | Mutation::KvMaxU256 {
            project_id,
            scope_id,
            ..
        }
        | Mutation::KvMinU256 {
            project_id,
            scope_id,
            ..
        }
        | Mutation::KvMutateU256 {
            project_id,
            scope_id,
            ..
        }
        | Mutation::KvAddU64Ex {
            project_id,
            scope_id,
            ..
        }
        | Mutation::KvSubU64Ex {
            project_id,
            scope_id,
            ..
        }
        | Mutation::KvSubIntEx {
            project_id,
            scope_id,
            ..
        }
        | Mutation::CounterAdd {
            project_id,
            scope_id,
            ..
        }
        | Mutation::KvMaxU64 {
            project_id,
            scope_id,
            ..
        }
        | Mutation::KvMinU64 {
            project_id,
            scope_id,
            ..
        }
        | Mutation::KvMutateU64 {
            project_id,
            scope_id,
            ..
        }
        | Mutation::Accumulate {
            project_id,
            scope_id,
            ..
        }
        | Mutation::ExposeAccumulator {
            project_id,
            scope_id,
            ..
        }
        | Mutation::ExposeAccumulatorBatch {
            project_id,
            scope_id,
            ..
        }
        | Mutation::ReleaseAccumulatorExposure {
            project_id,
            scope_id,
            ..
        } => Some((project_id, scope_id)),
        _ => None,
    }
}
/// Extracts the namespace component from a partition token, or `None` for
/// token kinds that do not encode one (e.g. `acc:`/`evt:` tokens and the
/// global token).
///
/// Namespaces may themselves contain `:` — the `tr:` arm already peels its
/// two trailing segments from the right with `rsplit_once` for exactly this
/// reason. Fix: the `obm:` arm previously used a left `split_once`, which
/// truncated a `:`-containing namespace to its first segment; it now strips
/// the trailing `<id>` and `<kind>` segments from the right, consistent with
/// the `tr:` arm. Behavior is unchanged for namespaces without `:`.
fn token_namespace(token: &str) -> Option<&str> {
    // kns:<ns> — everything after the prefix is the namespace.
    if let Some(rest) = token.strip_prefix("kns:") {
        return Some(rest);
    }
    // tr:<ns>:<table>:<hex-pk> — peel pk and table from the right.
    if let Some(rest) = token.strip_prefix("tr:")
        && let Some((prefix, _pk)) = rest.rsplit_once(':')
        && let Some((ns, _table)) = prefix.rsplit_once(':')
    {
        return Some(ns);
    }
    // t:<ns>:<table>
    if let Some(rest) = token.strip_prefix("t:")
        && let Some((ns, _table)) = rest.rsplit_once(':')
    {
        return Some(ns);
    }
    // k:<ns>:<hex-key>
    if let Some(rest) = token.strip_prefix("k:")
        && let Some((ns, _key)) = rest.rsplit_once(':')
    {
        return Some(ns);
    }
    // ob:<ns>:<instrument>
    if let Some(rest) = token.strip_prefix("ob:")
        && let Some((ns, _instrument)) = rest.rsplit_once(':')
    {
        return Some(ns);
    }
    // obm:<ns>:<kind>:<id> — peel id and kind from the right.
    if let Some(rest) = token.strip_prefix("obm:")
        && let Some((prefix, _id)) = rest.rsplit_once(':')
        && let Some((ns, _kind)) = prefix.rsplit_once(':')
    {
        return Some(ns);
    }
    None
}
/// Re-checks a transaction's point and range read set against the current
/// keyspace. Any observed version newer than the transaction's base snapshot
/// or newer than what was recorded at read time means another commit
/// intervened, and the transaction is rejected with a `Conflict`.
pub(super) fn revalidate_read_set_for_keyspace(
    keyspace: &Keyspace,
    envelope: &TransactionEnvelope,
) -> Result<(), AedbError> {
    // Point reads: compare the current version of each read row/key.
    for entry in &envelope.read_set.points {
        let current_version = match &entry.key {
            ReadKey::TableRow {
                project_id,
                scope_id,
                table_name,
                primary_key,
            } => keyspace.get_row_version(project_id, scope_id, table_name, primary_key),
            ReadKey::KvKey {
                project_id,
                scope_id,
                key,
            } => keyspace.kv_version(project_id, scope_id, key),
        };
        if current_version > envelope.base_seq || current_version > entry.version_at_read {
            return Err(AedbError::Conflict(format!(
                "read set conflict at seq {current_version}"
            )));
        }
    }
    // Range reads: check both the max item version inside the range and the
    // container's structural version (inserts/deletes change membership
    // without necessarily bumping an observed row's version).
    for range in &envelope.read_set.ranges {
        let (current_max_version, current_structural_version) = match &range.range {
            ReadRange::TableRange {
                project_id,
                scope_id,
                table_name,
                start,
                end,
            } => (
                keyspace.max_row_version_in_encoded_range(
                    project_id,
                    scope_id,
                    table_name,
                    value_bound_to_encoded(start),
                    value_bound_to_encoded(end),
                ),
                keyspace.table_structural_version(project_id, scope_id, table_name),
            ),
            ReadRange::KvRange {
                project_id,
                scope_id,
                start,
                end,
            } => (
                keyspace.max_kv_version_in_range(
                    project_id,
                    scope_id,
                    bytes_bound_to_vec(start),
                    bytes_bound_to_vec(end),
                ),
                keyspace.kv_structural_version(project_id, scope_id),
            ),
        };
        if current_max_version > envelope.base_seq
            || current_max_version > range.max_version_at_read
        {
            return Err(AedbError::Conflict(format!(
                "range read set conflict at seq {current_max_version}"
            )));
        }
        if current_structural_version > envelope.base_seq
            || current_structural_version > range.structural_version_at_read
        {
            return Err(AedbError::Conflict(format!(
                "range structural conflict at seq {current_structural_version}"
            )));
        }
    }
    Ok(())
}
/// Converts a read-set bound over decoded primary-key values into a `Bound`
/// over the encoded-key form used by the keyspace's ordered row storage.
pub(super) fn value_bound_to_encoded(
    bound: &ReadBound<Vec<crate::catalog::types::Value>>,
) -> Bound<EncodedKey> {
    match bound {
        ReadBound::Included(values) => Bound::Included(EncodedKey::from_values(values)),
        ReadBound::Excluded(values) => Bound::Excluded(EncodedKey::from_values(values)),
        ReadBound::Unbounded => Bound::Unbounded,
    }
}
/// Clones a read-set byte bound into the owned `Bound` form expected by
/// keyspace range scans.
pub(super) fn bytes_bound_to_vec(bound: &ReadBound<Vec<u8>>) -> Bound<Vec<u8>> {
    match bound {
        ReadBound::Included(bytes) => Bound::Included(bytes.to_vec()),
        ReadBound::Excluded(bytes) => Bound::Excluded(bytes.to_vec()),
        ReadBound::Unbounded => Bound::Unbounded,
    }
}
/// Stamps the caller's identity onto permission-related DDL mutations so
/// their effects are attributable; all other mutations pass through
/// untouched. A `None` caller leaves the batch unchanged.
fn augment_mutations_with_caller(mutations: &mut [Mutation], caller: Option<&CallerContext>) {
    let Some(caller) = caller else {
        return;
    };
    for mutation in mutations.iter_mut() {
        let Mutation::Ddl(ddl) = mutation else {
            continue;
        };
        match ddl {
            // Only the access-control DDL operations carry an actor id.
            crate::catalog::DdlOperation::GrantPermission { actor_id, .. }
            | crate::catalog::DdlOperation::RevokePermission { actor_id, .. }
            | crate::catalog::DdlOperation::SetReadPolicy { actor_id, .. }
            | crate::catalog::DdlOperation::ClearReadPolicy { actor_id, .. }
            | crate::catalog::DdlOperation::TransferOwnership { actor_id, .. } => {
                *actor_id = Some(caller.caller_id.clone());
            }
            _ => {}
        }
    }
}
/// Drops idempotency records that fall outside both retention windows
/// (commit-count and wall-clock). A window of 0 disables that dimension;
/// when both are 0, idempotency retention is off and the cache is cleared.
pub(super) fn prune_idempotency(state: &mut ExecutorState) {
    let window_commits = state.config.idempotency_window_commits;
    let window_micros = state
        .config
        .idempotency_window_seconds
        .saturating_mul(1_000_000);
    if window_commits == 0 && window_micros == 0 {
        state.idempotency.clear();
        return;
    }
    let min_seq = state.current_seq.saturating_sub(window_commits);
    let min_recorded_at = now_micros().saturating_sub(window_micros);
    state.idempotency.retain(|_, rec| {
        // A disabled (0) window never evicts on its own; a record survives
        // only while inside every enabled window.
        let within_commit_window = window_commits == 0 || rec.commit_seq > min_seq;
        let within_time_window = window_micros == 0 || rec.recorded_at_micros >= min_recorded_at;
        within_commit_window && within_time_window
    });
}
/// Wall-clock time as microseconds since the Unix epoch; a clock set before
/// the epoch collapses to 0 rather than panicking.
pub(super) fn now_micros() -> u64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH);
    since_epoch.map_or(0, |elapsed| elapsed.as_micros() as u64)
}
/// Derives the idempotency-cache key actually stored, namespaced by caller:
/// hashes a fixed domain-separation tag, a caller-presence discriminant
/// byte, the caller id (when present), and the client-supplied key, then
/// truncates the BLAKE3 digest to 128 bits. Distinct callers reusing the
/// same client key therefore map to distinct records.
fn scoped_idempotency_key(caller: Option<&CallerContext>, key: &IdempotencyKey) -> IdempotencyKey {
    let mut hasher = blake3::Hasher::new();
    // Versioned domain tag keeps these hashes distinct from other uses of
    // the same hash function elsewhere in the system.
    hasher.update(b"aedb:idempotency:v1");
    match caller {
        Some(caller) => {
            // Discriminant byte prevents ambiguity with the caller-less form.
            hasher.update(&[1]);
            hasher.update(caller.caller_id.as_bytes());
        }
        None => {
            hasher.update(&[0]);
        }
    }
    hasher.update(&key.0);
    let hash = hasher.finalize();
    // Truncate the 256-bit digest to the 128-bit key width.
    let mut out = [0u8; 16];
    out.copy_from_slice(&hash.as_bytes()[..16]);
    IdempotencyKey(out)
}
/// Brings every async index projection up to `state.visible_head_seq`, then
/// refreshes KV projection tables and accumulators. Returns `Ok(true)` when
/// any materialized state changed.
///
/// `pending_delta` is a commit delta that is not yet queryable from the
/// version store; it is replayed on top of the version-store deltas when its
/// sequence falls inside the catch-up window.
///
/// Fix over the previous revision: when a version-store delta flagged
/// `requires_rebuild`, the old code still applied the pending delta and then
/// executed a bare `break`, which (a) aborted the entire async-index loop,
/// skipping every remaining index, and (b) skipped `insert_async_projection`
/// for the current index even though its projection had already been removed
/// via `take_async_projection` — permanently dropping it. The pending delta
/// is now applied only when no rebuild is required, and the rebuild path
/// always runs before the projection is re-inserted.
pub(super) fn refresh_async_indexes(
    state: &mut ExecutorState,
    pending_delta: Option<&CommitDelta>,
) -> Result<bool, AedbError> {
    let target_seq = state.visible_head_seq;
    let mut changed = false;
    for ((ns, table_name, index_name), def) in &state.catalog.async_indexes {
        let key = (
            NamespaceId::Project(ns.clone()),
            table_name.clone(),
            index_name.clone(),
        );
        let current = state
            .keyspace
            .async_indexes
            .get(&key)
            .map(|v| v.materialized_seq)
            .unwrap_or(0);
        if current >= target_seq {
            continue;
        }
        let schema = state
            .catalog
            .tables
            .get(&(ns.clone(), table_name.clone()))
            .ok_or_else(|| AedbError::Validation("table missing".into()))?;
        // Take ownership of the existing projection; every path below must
        // re-insert it (or a rebuilt replacement) before moving on.
        let existing_projection = state.keyspace.take_async_projection(&key.0, &key.1, &key.2);
        let (mut projection_rows, mut materialized_seq) = existing_projection
            .map(|projection| (projection.rows, projection.materialized_seq))
            .unwrap_or_default();
        if current == 0
            && projection_rows.is_empty()
            && let Some(table) = state.keyspace.table_by_namespace_key(ns, table_name)
        {
            // First materialization: project the full table snapshot.
            for (pk, row) in &table.rows {
                let projected = project_row(row, schema, &def.projected_columns)?;
                projection_rows.insert(pk.clone(), projected);
            }
            materialized_seq = target_seq;
        }
        if materialized_seq < target_seq {
            let Some(deltas) = state
                .version_store
                .deltas_since(materialized_seq, target_seq)
            else {
                // Delta history no longer covers the gap: rebuild from the
                // current table contents instead of replaying.
                warn!(
                    from_seq = materialized_seq,
                    to_seq = target_seq,
                    "async projection delta history evicted; rebuilding projection"
                );
                projection_rows.clear();
                if let Some(table) = state.keyspace.table_by_namespace_key(ns, table_name) {
                    for (pk, row) in &table.rows {
                        let projected = project_row(row, schema, &def.projected_columns)?;
                        projection_rows.insert(pk.clone(), projected);
                    }
                }
                materialized_seq = target_seq;
                state.keyspace.insert_async_projection(
                    key.0.clone(),
                    key.1.clone(),
                    key.2.clone(),
                    crate::storage::keyspace::AsyncProjectionData {
                        rows: projection_rows,
                        materialized_seq,
                    },
                );
                changed = true;
                continue;
            };
            let mut requires_rebuild = false;
            for delta in deltas {
                for mutation in &delta.mutations {
                    // Set-oriented mutations against this table cannot be
                    // mapped to individual projection rows; force a rebuild.
                    if matches!(
                        mutation,
                        Mutation::TableIncU256 {
                            project_id,
                            scope_id,
                            table_name: mutation_table,
                            ..
                        } | Mutation::TableDecU256 {
                            project_id,
                            scope_id,
                            table_name: mutation_table,
                            ..
                        } | Mutation::DeleteWhere {
                            project_id,
                            scope_id,
                            table_name: mutation_table,
                            ..
                        } | Mutation::UpdateWhere {
                            project_id,
                            scope_id,
                            table_name: mutation_table,
                            ..
                        } | Mutation::UpdateWhereExpr {
                            project_id,
                            scope_id,
                            table_name: mutation_table,
                            ..
                        } if namespace_key(project_id, scope_id) == *ns
                            && mutation_table == table_name
                    ) {
                        requires_rebuild = true;
                        break;
                    }
                    apply_projection_delta(
                        mutation,
                        ns,
                        table_name,
                        schema,
                        &def.projected_columns,
                        &mut projection_rows,
                    )?;
                }
                materialized_seq = delta.seq;
            }
            // Replay the pending (not-yet-in-version-store) delta only when
            // incremental replay is still valid; the rebuild below reads the
            // keyspace directly, which already reflects the pending commit.
            if !requires_rebuild
                && let Some(delta) = pending_delta
                && delta.seq > materialized_seq
                && delta.seq <= target_seq
            {
                for mutation in &delta.mutations {
                    apply_projection_delta(
                        mutation,
                        ns,
                        table_name,
                        schema,
                        &def.projected_columns,
                        &mut projection_rows,
                    )?;
                }
                materialized_seq = delta.seq;
            }
            if requires_rebuild {
                projection_rows.clear();
                if let Some(table) = state.keyspace.table_by_namespace_key(ns, table_name) {
                    for (pk, row) in &table.rows {
                        let projected = project_row(row, schema, &def.projected_columns)?;
                        projection_rows.insert(pk.clone(), projected);
                    }
                }
                materialized_seq = target_seq;
            }
        }
        state.keyspace.insert_async_projection(
            key.0,
            key.1,
            key.2,
            crate::storage::keyspace::AsyncProjectionData {
                rows: projection_rows,
                materialized_seq,
            },
        );
        changed = true;
    }
    if refresh_kv_projection_tables(state, pending_delta)? {
        changed = true;
    }
    if refresh_accumulators(state)? {
        changed = true;
    }
    Ok(changed)
}
/// Applies each accumulator's pending deltas up to the visible head,
/// maintaining the cached pending-delta sum and exposure-limit caches,
/// compacting delta / dedupe / open-exposure state per the accumulator
/// definition, and recording any projector error. Returns `Ok(true)` when
/// any accumulator state changed.
fn refresh_accumulators(state: &mut ExecutorState) -> Result<bool, AedbError> {
    let mut changed = false;
    let accumulator_defs = &state.catalog.accumulators;
    for ((ns, accumulator_name), def) in accumulator_defs {
        let Some(acc) = state
            .keyspace
            .namespace_mut(NamespaceId::Project(ns.clone()))
            .accumulators
            .get_mut(accumulator_name)
        else {
            continue;
        };
        // Projector errors are recomputed from scratch on every refresh.
        let previous_error = acc.projector_error.clone();
        acc.projector_error = None;
        let mut next_value = acc.value;
        let mut next_last_applied = acc.last_applied_order_key;
        let mut next_materialized_seq = acc.materialized_seq;
        let mut pending_sum_overflowed = false;
        // The pending-delta sum is either taken from the cache or recomputed
        // over all deltas past the last applied order key.
        let mut next_pending_delta_sum = if acc.pending_delta_sum_cache_valid {
            acc.pending_delta_sum
        } else {
            let mut recomputed = 0i128;
            for (_, record) in acc.deltas.range((
                Bound::Excluded(acc.last_applied_order_key),
                Bound::Unbounded,
            )) {
                let Some(next) = recomputed.checked_add(record.delta as i128) else {
                    acc.projector_error = Some("accumulator pending delta overflow".to_string());
                    pending_sum_overflowed = true;
                    break;
                };
                recomputed = next;
            }
            recomputed
        };
        if !pending_sum_overflowed {
            // Apply pending deltas in order-key order; any overflow stops the
            // projection and records an error without partially committing.
            for (order_key, record) in acc.deltas.range((
                Bound::Excluded(acc.last_applied_order_key),
                Bound::Unbounded,
            )) {
                let Some(sum) = next_value.checked_add(record.delta) else {
                    acc.projector_error =
                        Some("accumulator overflow during projection".to_string());
                    break;
                };
                let Some(next_pending) = next_pending_delta_sum.checked_sub(record.delta as i128)
                else {
                    acc.projector_error = Some("accumulator pending delta overflow".to_string());
                    break;
                };
                next_value = sum;
                next_last_applied = *order_key;
                next_materialized_seq = next_materialized_seq.max(record.commit_seq);
                next_pending_delta_sum = next_pending;
                acc.applied_since_snapshot = acc.applied_since_snapshot.saturating_add(1);
            }
        }
        if acc.projector_error.is_none() {
            acc.pending_delta_sum = next_pending_delta_sum;
            acc.pending_delta_sum_cache_valid = true;
        }
        if next_last_applied != acc.last_applied_order_key {
            acc.value = next_value;
            // Exposure limit = value scaled down by the configured margin
            // (basis points), clamped into i64 range.
            acc.exposure_limit_cached =
                ((acc.value as i128 * (10_000i128 - acc.exposure_margin_bps as i128)) / 10_000)
                    .clamp(i64::MIN as i128, i64::MAX as i128) as i64;
            acc.exposure_limit_cache_valid = true;
            acc.last_applied_order_key = next_last_applied;
            acc.materialized_seq = next_materialized_seq;
            changed = true;
        }
        // Snapshot compaction: drop already-applied deltas once enough have
        // accumulated since the last snapshot.
        if acc.applied_since_snapshot >= def.snapshot_every {
            acc.deltas = acc
                .deltas
                .iter()
                .filter(|(order_key, _)| **order_key > acc.last_applied_order_key)
                .map(|(order_key, record)| (*order_key, record.clone()))
                .collect();
            acc.applied_since_snapshot = 0;
            changed = true;
        }
        // Dedupe-record pruning within the configured commit window.
        if let Some(window_commits) = def.dedupe_retain_commits
            && !acc.dedupe.is_empty()
        {
            let min_commit = state.visible_head_seq.saturating_sub(window_commits);
            let has_stale_dedupe = acc
                .dedupe
                .iter()
                .any(|(_, rec)| rec.commit_seq < min_commit);
            if has_stale_dedupe {
                let before = acc.dedupe.len();
                acc.dedupe = acc
                    .dedupe
                    .iter()
                    .filter(|(_, rec)| rec.commit_seq >= min_commit)
                    .map(|(key, rec)| (key.clone(), rec.clone()))
                    .collect();
                if acc.dedupe.len() != before {
                    changed = true;
                }
            }
        }
        // Exposure TTL: drop exposures opened before the TTL window and
        // subtract their amounts from the running total.
        if let Some(ttl_commits) = def.exposure_ttl_commits
            && !acc.open_exposures.is_empty()
        {
            let min_open_seq = state.visible_head_seq.saturating_sub(ttl_commits);
            let mut removed_total = 0i64;
            let mut expired_found = false;
            for (_, rec) in &acc.open_exposures {
                if rec.opened_at_seq < min_open_seq {
                    removed_total = removed_total.saturating_add(rec.amount);
                    expired_found = true;
                }
            }
            if expired_found {
                let before = acc.open_exposures.len();
                let next_open_exposures = acc
                    .open_exposures
                    .iter()
                    .filter_map(|(id, rec)| {
                        if rec.opened_at_seq < min_open_seq {
                            None
                        } else {
                            Some((id.clone(), rec.clone()))
                        }
                    })
                    .collect();
                // An inconsistent total (would go negative) forces a full
                // rebuild below instead of an incremental subtraction.
                if acc.total_exposure < removed_total {
                    acc.exposure_rebuild_required = true;
                } else {
                    acc.total_exposure -= removed_total;
                }
                acc.open_exposures = next_open_exposures;
                if acc.open_exposures.len() != before {
                    changed = true;
                }
            }
        }
        // Rebuild total exposure from the surviving open exposures when
        // incremental accounting has been flagged as unreliable.
        if acc.exposure_rebuild_required {
            let mut rebuilt_exposure = 0i64;
            for (_, rec) in &acc.open_exposures {
                let Some(next) = rebuilt_exposure.checked_add(rec.amount) else {
                    acc.projector_error =
                        Some("accumulator exposure overflow during rebuild".into());
                    break;
                };
                rebuilt_exposure = next;
            }
            if acc.projector_error.is_none() {
                if rebuilt_exposure != acc.total_exposure {
                    acc.total_exposure = rebuilt_exposure;
                    changed = true;
                }
                acc.exposure_rebuild_required = false;
            }
        }
        // An error transition (set, cleared, or message change) counts as a
        // state change.
        if acc.projector_error != previous_error {
            changed = true;
        }
    }
    Ok(changed)
}
/// Brings each KV-projection table up to `visible_head_seq`, either by
/// replaying version-store deltas (plus the not-yet-stored `pending_delta`)
/// or by a full rebuild when no baseline exists or delta history was
/// evicted. Returns `Ok(true)` when any projection table changed.
fn refresh_kv_projection_tables(
    state: &mut ExecutorState,
    pending_delta: Option<&CommitDelta>,
) -> Result<bool, AedbError> {
    let target_seq = state.visible_head_seq;
    let mut changed = false;
    // Snapshot the projection list up front so the catalog borrow does not
    // overlap the mutable state access below.
    let projections: Vec<(String, String, String)> = state
        .catalog
        .kv_projections
        .iter()
        .map(|((project_id, scope_id), def)| {
            (project_id.clone(), scope_id.clone(), def.table_name.clone())
        })
        .collect();
    for (project_id, scope_id, table_name) in projections {
        let ns = namespace_key(&project_id, &scope_id);
        let current = state
            .keyspace
            .table_structural_version(&project_id, &scope_id, &table_name);
        if current >= target_seq {
            continue;
        }
        // No baseline yet: materialize from a full namespace scan.
        if current == 0 {
            rebuild_kv_projection_rows(state, &project_id, &scope_id, &table_name, target_seq)?;
            changed = true;
            continue;
        }
        // Delta history evicted: fall back to a full rebuild.
        let Some(deltas) = state.version_store.deltas_since(current, target_seq) else {
            rebuild_kv_projection_rows(state, &project_id, &scope_id, &table_name, target_seq)?;
            changed = true;
            continue;
        };
        for delta in deltas {
            for mutation in &delta.mutations {
                apply_kv_projection_delta(
                    state,
                    mutation,
                    &ns,
                    &project_id,
                    &scope_id,
                    &table_name,
                )?;
            }
            set_kv_projection_structural_version(
                state,
                &project_id,
                &scope_id,
                &table_name,
                delta.seq,
            );
        }
        // Replay the pending delta when it extends past the version store.
        if let Some(delta) = pending_delta
            && delta.seq > current
            && delta.seq <= target_seq
        {
            for mutation in &delta.mutations {
                apply_kv_projection_delta(
                    state,
                    mutation,
                    &ns,
                    &project_id,
                    &scope_id,
                    &table_name,
                )?;
            }
            set_kv_projection_structural_version(
                state,
                &project_id,
                &scope_id,
                &table_name,
                delta.seq,
            );
        }
        changed = true;
    }
    Ok(changed)
}
/// Rebuilds a KV-projection table from a full scan of its source KV
/// namespace: clears all existing row state, stamps the table's structural
/// version with `materialized_seq`, and re-inserts one projection row per
/// surviving KV entry.
fn rebuild_kv_projection_rows(
    state: &mut ExecutorState,
    project_id: &str,
    scope_id: &str,
    table_name: &str,
    materialized_seq: u64,
) -> Result<(), AedbError> {
    // Full scan: empty prefix with no limit captures the whole namespace.
    let entries = state
        .keyspace
        .try_kv_scan_prefix(project_id, scope_id, &[], usize::MAX)?;
    let ns = namespace_key(project_id, scope_id);
    if let Some(table) = state.keyspace.table_by_namespace_key_mut(&ns, table_name) {
        // Reset every row structure and cache before repopulating.
        table.rows.clear();
        table.row_versions.clear();
        table.pk_hash.clear();
        table.row_cache.clear();
        table.row_versions_cache.clear();
        table.structural_version = materialized_seq;
    } else {
        // Table did not exist yet: create it and stamp the version.
        state
            .keyspace
            .table_mut(project_id, scope_id, table_name)
            .structural_version = materialized_seq;
    }
    for (key, entry) in entries {
        upsert_kv_projection_row(
            state,
            project_id,
            scope_id,
            table_name,
            KvProjectionRow {
                key,
                value: entry.value,
                commit_seq: entry.version,
                updated_at: materialized_seq,
            },
        )?;
    }
    Ok(())
}
/// Incrementally applies one committed mutation to the KV projection table
/// `table_name` that mirrors the `(project_id, scope_id)` namespace.
///
/// `namespace` must equal `namespace_key(project_id, scope_id)`; mutations
/// whose own project/scope map to a different namespace are ignored.
///
/// # Errors
/// Propagates failures from `upsert_kv_projection_row` and from exposure
/// bookkeeping inside the keyspace.
fn apply_kv_projection_delta(
    state: &mut ExecutorState,
    mutation: &Mutation,
    namespace: &str,
    project_id: &str,
    scope_id: &str,
    table_name: &str,
) -> Result<(), AedbError> {
    match mutation {
        // A plain set carries its value inline; mirror it directly.
        Mutation::KvSet {
            project_id: p,
            scope_id: s,
            key,
            value,
        } if namespace_key(p, s) == namespace => {
            // Hoisted so the read of the keyspace completes before `state`
            // is reborrowed mutably for the upsert call.
            let commit_seq = state.keyspace.kv_version(project_id, scope_id, key);
            upsert_kv_projection_row(
                state,
                project_id,
                scope_id,
                table_name,
                KvProjectionRow {
                    key: key.clone(),
                    value: value.clone(),
                    commit_seq,
                    updated_at: now_micros(),
                },
            )?;
        }
        // Numeric read-modify-write mutations do not carry the resulting
        // value, so re-read the entry and refresh the projection row from the
        // stored state. A missing entry means there is nothing to project.
        Mutation::KvIncU256 { project_id: p, scope_id: s, key, .. }
        | Mutation::KvDecU256 { project_id: p, scope_id: s, key, .. }
        | Mutation::KvAddU256Ex { project_id: p, scope_id: s, key, .. }
        | Mutation::KvSubU256Ex { project_id: p, scope_id: s, key, .. }
        | Mutation::KvMaxU256 { project_id: p, scope_id: s, key, .. }
        | Mutation::KvMinU256 { project_id: p, scope_id: s, key, .. }
        | Mutation::KvMutateU256 { project_id: p, scope_id: s, key, .. }
        | Mutation::KvAddU64Ex { project_id: p, scope_id: s, key, .. }
        | Mutation::KvSubU64Ex { project_id: p, scope_id: s, key, .. }
        | Mutation::KvMaxU64 { project_id: p, scope_id: s, key, .. }
        | Mutation::KvMinU64 { project_id: p, scope_id: s, key, .. }
        | Mutation::KvMutateU64 { project_id: p, scope_id: s, key, .. }
            if namespace_key(p, s) == namespace =>
        {
            if let Some(entry) = state.keyspace.kv_get(project_id, scope_id, key) {
                upsert_kv_projection_row(
                    state,
                    project_id,
                    scope_id,
                    table_name,
                    KvProjectionRow {
                        key: key.clone(),
                        value: entry.value,
                        commit_seq: entry.version,
                        updated_at: now_micros(),
                    },
                )?;
            }
        }
        // Best-effort delete: the projection row may already be absent.
        Mutation::KvDel {
            project_id: p,
            scope_id: s,
            key,
        } if namespace_key(p, s) == namespace => {
            let _ = state.keyspace.delete_row(
                project_id,
                scope_id,
                table_name,
                &[crate::catalog::types::Value::Blob(key.clone())],
                now_micros(),
            );
        }
        // Dropping the scope removes the projection table with it.
        // FIX: previously this arm matched only `if_exists: true`, so an
        // unconditional DropScope left a stale projection table behind.
        // `apply_projection_delta` clears on any DropScope; match that here.
        Mutation::Ddl(crate::catalog::DdlOperation::DropScope {
            project_id: p,
            scope_id: s,
            ..
        }) if namespace_key(p, s) == namespace => {
            state.keyspace.drop_table(project_id, scope_id, table_name);
        }
        // Dropping the whole project removes every scope under it, including
        // ours — match on the `<project>::` namespace prefix.
        Mutation::Ddl(crate::catalog::DdlOperation::DropProject { project_id: p, .. })
            if namespace.starts_with(&(p.to_string() + "::")) =>
        {
            state.keyspace.drop_table(project_id, scope_id, table_name);
        }
        // Disabling the projection tears the table down explicitly.
        Mutation::Ddl(crate::catalog::DdlOperation::DisableKvProjection {
            project_id: p,
            scope_id: s,
        }) if namespace_key(p, s) == namespace => {
            state.keyspace.drop_table(project_id, scope_id, table_name);
        }
        // All other mutations do not affect this projection.
        _ => {}
    }
    Ok(())
}
/// One materialized row bound for a KV projection table: the raw KV pair plus
/// the versioning metadata stored alongside it (consumed by
/// `upsert_kv_projection_row`).
struct KvProjectionRow {
    /// Raw KV key; also serves as the projection table's primary key (Blob).
    key: Vec<u8>,
    /// Raw KV value, stored as a Blob column.
    value: Vec<u8>,
    /// Commit sequence of the KV entry; stored as an Integer column and used
    /// as the row version on upsert.
    commit_seq: u64,
    /// Value for the Timestamp column. Call sites pass either `now_micros()`
    /// (incremental delta path) or the materialized sequence (rebuild path),
    /// so the units differ by caller — see those functions.
    updated_at: u64,
}
fn upsert_kv_projection_row(
state: &mut ExecutorState,
project_id: &str,
scope_id: &str,
table_name: &str,
row: KvProjectionRow,
) -> Result<(), AedbError> {
let commit_seq_i64 = i64::try_from(row.commit_seq)
.map_err(|_| AedbError::Validation("commit_seq overflow".into()))?;
let updated_at = i64::try_from(row.updated_at)
.map_err(|_| AedbError::Validation("updated_at overflow".into()))?;
state.keyspace.upsert_row(
project_id,
scope_id,
table_name,
vec![crate::catalog::types::Value::Blob(row.key.clone())],
crate::catalog::types::Row {
values: vec![
crate::catalog::types::Value::Text(project_id.into()),
crate::catalog::types::Value::Text(scope_id.into()),
crate::catalog::types::Value::Blob(row.key),
crate::catalog::types::Value::Blob(row.value),
crate::catalog::types::Value::Integer(commit_seq_i64),
crate::catalog::types::Value::Timestamp(updated_at),
],
},
row.commit_seq,
);
Ok(())
}
/// Records `seq` as the structural version of the projection table, if the
/// table currently exists; a missing table is silently left alone.
fn set_kv_projection_structural_version(
    state: &mut ExecutorState,
    project_id: &str,
    scope_id: &str,
    table_name: &str,
    seq: u64,
) {
    let namespace = namespace_key(project_id, scope_id);
    let Some(table) = state
        .keyspace
        .table_by_namespace_key_mut(&namespace, table_name)
    else {
        return;
    };
    table.structural_version = seq;
}
/// Projects `row` down to `projected_columns`, resolving each requested
/// column name against `schema` and copying its value in order.
///
/// # Errors
/// Returns a validation error when a projected column is not present in the
/// schema, or when the row carries fewer values than the schema declares.
pub(super) fn project_row(
    row: &crate::catalog::types::Row,
    schema: &TableSchema,
    projected_columns: &[String],
) -> Result<crate::catalog::types::Row, AedbError> {
    let mut values = Vec::with_capacity(projected_columns.len());
    for col in projected_columns {
        let column_index = schema
            .columns
            .iter()
            .position(|c| c.name == *col)
            .ok_or_else(|| AedbError::Validation(format!("projection column missing: {col}")))?;
        // `.get` instead of indexing: a row shorter than its schema (e.g.
        // after a partial migration) yields an error instead of a panic.
        let value = row.values.get(column_index).cloned().ok_or_else(|| {
            AedbError::Validation(format!("projection row missing value for column: {col}"))
        })?;
        values.push(value);
    }
    Ok(crate::catalog::types::Row { values })
}
/// Applies one committed mutation to the in-memory projection of
/// `table_name` within namespace `ns`, keeping `projection_rows` in step
/// with the base table. Mutations targeting other namespaces or tables are
/// ignored.
///
/// # Errors
/// Propagates failures from `project_row` and `extract_pk_from_row`.
pub(super) fn apply_projection_delta(
    mutation: &Mutation,
    ns: &str,
    table_name: &str,
    schema: &TableSchema,
    projected_columns: &[String],
    projection_rows: &mut im::OrdMap<EncodedKey, crate::catalog::types::Row>,
) -> Result<(), AedbError> {
    match mutation {
        // Single-row writes: project the new row and store it under its PK.
        Mutation::Insert {
            project_id,
            scope_id,
            table_name: mutation_table,
            primary_key,
            row,
        }
        | Mutation::Upsert {
            project_id,
            scope_id,
            table_name: mutation_table,
            primary_key,
            row,
        } if namespace_key(project_id, scope_id) == ns && mutation_table == table_name => {
            projection_rows.insert(
                EncodedKey::from_values(primary_key),
                project_row(row, schema, projected_columns)?,
            );
        }
        // Batch insert: derive each row's PK from the schema, then project.
        Mutation::InsertBatch {
            project_id,
            scope_id,
            table_name: mutation_table,
            rows,
        } if namespace_key(project_id, scope_id) == ns && mutation_table == table_name => {
            for row in rows {
                let pk = extract_pk_from_row(schema, row)?;
                projection_rows.insert(
                    EncodedKey::from_values(&pk),
                    project_row(row, schema, projected_columns)?,
                );
            }
        }
        Mutation::Delete {
            project_id,
            scope_id,
            table_name: mutation_table,
            primary_key,
        } if namespace_key(project_id, scope_id) == ns && mutation_table == table_name => {
            projection_rows.remove(&EncodedKey::from_values(primary_key));
        }
        // DDL that removes the backing table — or its whole scope/project —
        // empties the projection entirely.
        Mutation::Ddl(crate::catalog::DdlOperation::DropTable {
            project_id,
            scope_id,
            table_name: dropped_table,
            ..
        }) if namespace_key(project_id, scope_id) == ns && dropped_table == table_name => {
            projection_rows.clear();
        }
        Mutation::Ddl(crate::catalog::DdlOperation::DropScope {
            project_id,
            scope_id,
            ..
        }) if namespace_key(project_id, scope_id) == ns => {
            projection_rows.clear();
        }
        Mutation::Ddl(crate::catalog::DdlOperation::DropProject { project_id, .. })
            if ns.starts_with(&format!("{project_id}::")) =>
        {
            projection_rows.clear();
        }
        _ => {}
    }
    Ok(())
}