use std::{
collections::{BTreeMap, BTreeSet},
fs,
path::{Path, PathBuf},
};
// Baseline number of continuation-policy method references allowed in runtime
// executor sources; added to the soft-budget delta to form the upper bound in
// the soft-budget test.
// NOTE(review): the `0432` suffix presumably names the audit/release the
// baseline was measured at — confirm.
const CONTINUATION_POLICY_METHOD_REFERENCE_BASELINE_0432: usize = 3;
// Extra references tolerated on top of the baseline (currently none).
const CONTINUATION_POLICY_METHOD_REFERENCE_SOFT_BUDGET_DELTA: usize = 0;
// Substrings counted by `continuation_policy_method_reference_count`. Matching
// is purely textual, so each entry includes the opening parenthesis.
const CONTINUATION_POLICY_METHOD_TOKENS: &[&str] = &[
"requires_anchor(",
"requires_strict_advance(",
"is_grouped_safe(",
];
// Function names that must not be defined (`fn name(` / `fn name<`) anywhere in
// runtime sources under `src/db/executor`.
const EXECUTOR_FORBIDDEN_CONTINUATION_DEFINITION_FUNCTIONS: &[&str] = &[
"continuation_advanced",
"resume_bounds_from_refs",
"validate_index_scan_continuation_envelope",
"validate_index_scan_continuation_advancement",
"next_cursor_for_materialized_rows",
"effective_page_offset_for_window",
"effective_keep_count_for_limit",
];
// Textual markers of cursor facade validation calls; the facade test expects
// them to appear in exactly one runtime file under `src/db`.
const CURSOR_SIGNATURE_VALIDATION_FACADE_TOKENS: &[&str] = &[
"crate::db::cursor::validate_cursor_compatibility::<",
"crate::db::cursor::revalidate_cursor::<",
"crate::db::cursor::revalidate_grouped_cursor(",
];
// Textual markers of cursor-internal module paths; the facade test only
// tolerates them in files under `src/db/cursor/`. The short forms also match
// every occurrence of the fully qualified forms.
const CURSOR_SIGNATURE_VALIDATION_INTERNAL_TOKENS: &[&str] = &[
"crate::db::cursor::spine::",
"crate::db::cursor::validation::",
"cursor::spine::",
"cursor::validation::",
];
/// Recursively appends every `.rs` file found beneath `root` to `out`.
///
/// Directories are descended depth-first in the order `read_dir` yields them,
/// so the resulting order is platform dependent; callers that need a stable
/// order must sort `out` themselves.
///
/// # Panics
///
/// Panics when `root` (or any nested directory) cannot be listed, or when an
/// individual directory entry cannot be read.
fn collect_rust_sources(root: &Path, out: &mut Vec<PathBuf>) {
    let listing = match fs::read_dir(root) {
        Ok(listing) => listing,
        Err(err) => panic!("failed to read source directory {}: {err}", root.display()),
    };

    for item in listing {
        let candidate = match item {
            Ok(dir_entry) => dir_entry.path(),
            Err(err) => panic!(
                "failed to read source directory entry under {}: {err}",
                root.display()
            ),
        };

        if candidate.is_dir() {
            collect_rust_sources(&candidate, out);
        } else if candidate.extension().is_some_and(|ext| ext == "rs") {
            out.push(candidate);
        }
    }
}
/// Returns `source` with every item gated by `#[cfg(test)]` removed.
///
/// The scan is line based and purely textual:
///
/// * A line starting with `#[cfg(test)]` arms the stripper. Anything after the
///   attribute on the same line is treated as the start of the gated item.
/// * Blank lines, `//` comments and attribute-only lines (`#[...]`) between the
///   attribute and the item are dropped without disarming, so stacked
///   attributes such as `#[cfg(test)]` + `#[allow(...)]` still strip the item.
/// * An item header spanning several lines is dropped until its parentheses
///   balance (multi-line parameter lists).
/// * Once a `{` is seen, lines are dropped until the braces balance again.
/// * A header without a body (`mod tests;`, `use ...;`) is dropped on its own.
///
/// Braces and parentheses are counted textually, so occurrences inside string
/// literals or comments can skew the depth tracking.
fn strip_cfg_test_items(source: &str) -> String {
    let mut output = String::with_capacity(source.len());
    let mut pending_cfg_test = false;
    let mut header_paren_depth = 0usize;
    let mut skip_depth = 0usize;

    for line in source.lines() {
        let trimmed = line.trim();

        // Inside the body of a gated item: drop lines until braces balance.
        if skip_depth > 0 {
            skip_depth = skip_depth
                .saturating_add(line.matches('{').count())
                .saturating_sub(line.matches('}').count());
            continue;
        }

        // `header` is the part of this line that belongs to the gated item.
        let header = if let Some(rest) = trimmed.strip_prefix("#[cfg(test)]") {
            pending_cfg_test = true;
            header_paren_depth = 0;
            rest.trim()
        } else if pending_cfg_test {
            trimmed
        } else {
            output.push_str(line);
            output.push('\n');
            continue;
        };

        // Nothing of the item itself seen yet: stay armed and keep dropping.
        let attribute_only =
            header_paren_depth == 0 && header.starts_with("#[") && header.ends_with(']');
        if header.is_empty() || header.starts_with("//") || attribute_only {
            continue;
        }

        let opens = header.matches('{').count();
        if opens > 0 {
            // Body starts here; it may also end on this very line.
            skip_depth = opens.saturating_sub(header.matches('}').count());
            pending_cfg_test = false;
            header_paren_depth = 0;
            continue;
        }

        // No body yet: stay armed only while a parameter list is still open.
        header_paren_depth = header_paren_depth
            .saturating_add(header.matches('(').count())
            .saturating_sub(header.matches(')').count());
        if header_paren_depth == 0 {
            pending_cfg_test = false;
        }
    }

    output
}
/// Returns how many times any of `CONTINUATION_POLICY_METHOD_TOKENS` occurs in
/// `source`, summed over all tokens. Matching is plain substring search.
fn continuation_policy_method_reference_count(source: &str) -> usize {
    let mut total = 0usize;
    for token in CONTINUATION_POLICY_METHOD_TOKENS {
        total += source.matches(*token).count();
    }
    total
}
/// Reports whether `source` textually contains a definition of
/// `function_name`, i.e. `fn <name>(` or the generic form `fn <name><`.
///
/// The check is a substring search; it requires exactly one space after `fn`
/// and no whitespace between the name and the `(` / `<` that follows it.
fn contains_function_definition(source: &str, function_name: &str) -> bool {
    ["(", "<"].iter().any(|opener| {
        let needle = format!("fn {function_name}{opener}");
        source.contains(needle.as_str())
    })
}
/// Counts continuation-policy method tokens in each runtime source file under
/// `src/db/executor`.
///
/// Files inside a `tests` directory, files named `tests.rs`, and
/// `#[cfg(test)]` items are excluded. The returned map is keyed by the
/// manifest-relative path (with `/` separators) and only contains files with
/// at least one reference.
///
/// The `tests` filter is applied to the manifest-relative path, so a checkout
/// that itself lives under a directory called `tests` is still scanned.
///
/// # Panics
///
/// Panics when the source tree cannot be walked or a file cannot be read.
fn runtime_continuation_policy_reference_counts() -> BTreeMap<String, usize> {
    let manifest_dir = Path::new(env!("CARGO_MANIFEST_DIR"));
    let source_root = manifest_dir.join("src/db/executor");
    let mut sources = Vec::new();
    collect_rust_sources(source_root.as_path(), &mut sources);
    sources.sort();

    let mut counts = BTreeMap::new();
    for source_path in sources {
        let relative_path = source_path.strip_prefix(manifest_dir).unwrap_or_else(|err| {
            panic!(
                "failed to compute relative source path for {}: {err}",
                source_path.display()
            )
        });

        // Skip test-only sources, judged by their location inside the crate.
        if relative_path
            .components()
            .any(|part| part.as_os_str() == "tests")
            || relative_path
                .file_name()
                .is_some_and(|name| name == "tests.rs")
        {
            continue;
        }

        let source = fs::read_to_string(&source_path)
            .unwrap_or_else(|err| panic!("failed to read {}: {err}", source_path.display()));
        let runtime_source = strip_cfg_test_items(source.as_str());
        let reference_count = continuation_policy_method_reference_count(runtime_source.as_str());
        if reference_count == 0 {
            continue;
        }

        // Normalize Windows separators so keys compare equal across platforms.
        let relative = relative_path.to_string_lossy().replace('\\', "/");
        counts.insert(relative, reference_count);
    }

    counts
}
/// Finds runtime source files under `src/db/executor` that define any function
/// listed in `EXECUTOR_FORBIDDEN_CONTINUATION_DEFINITION_FUNCTIONS`.
///
/// Files inside a `tests` directory, files named `tests.rs`, and
/// `#[cfg(test)]` items are excluded. The returned map is keyed by the
/// manifest-relative path (with `/` separators); each value lists the
/// forbidden function names defined in that file, in table order.
///
/// The `tests` filter is applied to the manifest-relative path, so a checkout
/// that itself lives under a directory called `tests` is still scanned.
///
/// # Panics
///
/// Panics when the source tree cannot be walked or a file cannot be read.
fn runtime_forbidden_continuation_definitions() -> BTreeMap<String, Vec<String>> {
    let manifest_dir = Path::new(env!("CARGO_MANIFEST_DIR"));
    let source_root = manifest_dir.join("src/db/executor");
    let mut sources = Vec::new();
    collect_rust_sources(source_root.as_path(), &mut sources);
    sources.sort();

    let mut violations = BTreeMap::new();
    for source_path in sources {
        let relative_path = source_path.strip_prefix(manifest_dir).unwrap_or_else(|err| {
            panic!(
                "failed to compute relative source path for {}: {err}",
                source_path.display()
            )
        });

        // Skip test-only sources, judged by their location inside the crate.
        if relative_path
            .components()
            .any(|part| part.as_os_str() == "tests")
            || relative_path
                .file_name()
                .is_some_and(|name| name == "tests.rs")
        {
            continue;
        }

        let source = fs::read_to_string(&source_path)
            .unwrap_or_else(|err| panic!("failed to read {}: {err}", source_path.display()));
        let runtime_source = strip_cfg_test_items(source.as_str());

        let matched = EXECUTOR_FORBIDDEN_CONTINUATION_DEFINITION_FUNCTIONS
            .iter()
            .filter(|function_name| {
                contains_function_definition(runtime_source.as_str(), function_name)
            })
            .map(|function_name| (*function_name).to_string())
            .collect::<Vec<_>>();
        if matched.is_empty() {
            continue;
        }

        // Normalize Windows separators so keys compare equal across platforms.
        let relative = relative_path.to_string_lossy().replace('\\', "/");
        violations.insert(relative, matched);
    }

    violations
}
/// Finds runtime source files under `src/db` that contain any token from
/// `CURSOR_SIGNATURE_VALIDATION_FACADE_TOKENS`.
///
/// Files inside a `tests` directory, files named `tests.rs`, and
/// `#[cfg(test)]` items are excluded. The returned map is keyed by the
/// manifest-relative path (with `/` separators); each value lists the tokens
/// found in that file, in table order.
///
/// The `tests` filter is applied to the manifest-relative path, so a checkout
/// that itself lives under a directory called `tests` is still scanned.
///
/// # Panics
///
/// Panics when the source tree cannot be walked or a file cannot be read.
fn runtime_cursor_signature_validation_facade_sites() -> BTreeMap<String, Vec<String>> {
    let manifest_dir = Path::new(env!("CARGO_MANIFEST_DIR"));
    let source_root = manifest_dir.join("src/db");
    let mut sources = Vec::new();
    collect_rust_sources(source_root.as_path(), &mut sources);
    sources.sort();

    let mut sites = BTreeMap::new();
    for source_path in sources {
        let relative_path = source_path.strip_prefix(manifest_dir).unwrap_or_else(|err| {
            panic!(
                "failed to compute relative source path for {}: {err}",
                source_path.display()
            )
        });

        // Skip test-only sources, judged by their location inside the crate.
        if relative_path
            .components()
            .any(|part| part.as_os_str() == "tests")
            || relative_path
                .file_name()
                .is_some_and(|name| name == "tests.rs")
        {
            continue;
        }

        let source = fs::read_to_string(&source_path)
            .unwrap_or_else(|err| panic!("failed to read {}: {err}", source_path.display()));
        let runtime_source = strip_cfg_test_items(source.as_str());

        let matched = CURSOR_SIGNATURE_VALIDATION_FACADE_TOKENS
            .iter()
            .filter(|token| runtime_source.contains(**token))
            .map(|token| (*token).to_string())
            .collect::<Vec<_>>();
        if matched.is_empty() {
            continue;
        }

        // Normalize Windows separators so keys compare equal across platforms.
        let relative = relative_path.to_string_lossy().replace('\\', "/");
        sites.insert(relative, matched);
    }

    sites
}
/// Finds runtime source files under `src/db` that contain any token from
/// `CURSOR_SIGNATURE_VALIDATION_INTERNAL_TOKENS`.
///
/// Files inside a `tests` directory, files named `tests.rs`, and
/// `#[cfg(test)]` items are excluded. The returned map is keyed by the
/// manifest-relative path (with `/` separators); each value lists the tokens
/// found in that file, in table order. Files under `src/db/cursor/` are
/// included; callers decide which locations are acceptable.
///
/// The `tests` filter is applied to the manifest-relative path, so a checkout
/// that itself lives under a directory called `tests` is still scanned.
///
/// # Panics
///
/// Panics when the source tree cannot be walked or a file cannot be read.
fn runtime_cursor_signature_validation_internal_references() -> BTreeMap<String, Vec<String>> {
    let manifest_dir = Path::new(env!("CARGO_MANIFEST_DIR"));
    let source_root = manifest_dir.join("src/db");
    let mut sources = Vec::new();
    collect_rust_sources(source_root.as_path(), &mut sources);
    sources.sort();

    let mut references = BTreeMap::new();
    for source_path in sources {
        let relative_path = source_path.strip_prefix(manifest_dir).unwrap_or_else(|err| {
            panic!(
                "failed to compute relative source path for {}: {err}",
                source_path.display()
            )
        });

        // Skip test-only sources, judged by their location inside the crate.
        if relative_path
            .components()
            .any(|part| part.as_os_str() == "tests")
            || relative_path
                .file_name()
                .is_some_and(|name| name == "tests.rs")
        {
            continue;
        }

        let source = fs::read_to_string(&source_path)
            .unwrap_or_else(|err| panic!("failed to read {}: {err}", source_path.display()));
        let runtime_source = strip_cfg_test_items(source.as_str());

        let matched = CURSOR_SIGNATURE_VALIDATION_INTERNAL_TOKENS
            .iter()
            .filter(|token| runtime_source.contains(**token))
            .map(|token| (*token).to_string())
            .collect::<Vec<_>>();
        if matched.is_empty() {
            continue;
        }

        // Normalize Windows separators so keys compare equal across platforms.
        let relative = relative_path.to_string_lossy().replace('\\', "/");
        references.insert(relative, matched);
    }

    references
}
/// Guard: every runtime executor file that references a continuation-policy
/// method token must live under `src/db/executor/planning/continuation/`.
#[test]
fn runtime_continuation_policy_method_references_stay_within_continuation_boundary() {
    const ALLOWED_PREFIX: &str = "src/db/executor/planning/continuation/";

    let violations: Vec<String> = runtime_continuation_policy_reference_counts()
        .into_keys()
        .filter(|relative| !relative.starts_with(ALLOWED_PREFIX))
        .collect();

    assert!(
        violations.is_empty(),
        "runtime ContinuationPolicy method references must stay continuation-boundary-local; violations: {}",
        violations.join(", "),
    );
}
/// Guard: the total number of continuation-policy method references across
/// runtime executor sources must not exceed baseline + soft-budget delta.
#[test]
fn runtime_continuation_policy_method_reference_count_stays_within_soft_budget() {
    let total_references: usize = runtime_continuation_policy_reference_counts()
        .into_values()
        .sum();
    let max_references = CONTINUATION_POLICY_METHOD_REFERENCE_BASELINE_0432
        + CONTINUATION_POLICY_METHOD_REFERENCE_SOFT_BUDGET_DELTA;

    match max_references {
        // A zero budget is reported as a distinct "zero-reference contract".
        0 => assert_eq!(
            total_references, 0,
            "continuation-policy method fan-out exceeded zero-reference contract",
        ),
        _ => assert!(
            total_references <= max_references,
            "continuation-policy method fan-out exceeded baseline budget: total={total_references}, max={max_references}",
        ),
    }
}
/// Guard: no runtime executor source may define one of the forbidden
/// continuation functions.
#[test]
fn runtime_continuation_semantic_definitions_stay_cursor_owned() {
    let violations = runtime_forbidden_continuation_definitions();
    if !violations.is_empty() {
        panic!(
            "continuation semantic definitions must remain cursor-owned (`cursor/envelope.rs` + `cursor/continuation.rs`); executor duplicates found: {violations:?}",
        );
    }
}
/// Guard with two parts:
///
/// 1. Cursor facade validation tokens may appear in exactly one runtime file,
///    `src/db/query/plan/continuation.rs`.
/// 2. Cursor-internal module paths may only be referenced from files under
///    `src/db/cursor/`.
#[test]
fn runtime_cursor_signature_validation_entrypoints_stay_cursor_facade_owned() {
    let facade_sites = runtime_cursor_signature_validation_facade_sites();
    let actual = facade_sites.keys().cloned().collect::<BTreeSet<_>>();
    let expected = BTreeSet::from(["src/db/query/plan/continuation.rs".to_string()]);
    assert_eq!(
        actual, expected,
        "cursor-signature validation entrypoints must remain centralized in cursor facade calls from query/plan continuation; actual sites: {facade_sites:?}",
    );

    // Keep only the offending files so the failure message lists exactly the
    // non-cursor references instead of every file that mentions the internals.
    let internal_violations = runtime_cursor_signature_validation_internal_references()
        .into_iter()
        .filter(|(relative, _)| !relative.starts_with("src/db/cursor/"))
        .collect::<BTreeMap<_, _>>();
    assert!(
        internal_violations.is_empty(),
        "cursor-signature validation internals must stay cursor-owned; non-cursor references: {internal_violations:?}",
    );
}