use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use crate::model_task::ModelTask;
/// Format version stamped into every [`MaterializationManifest`] built by
/// `MaterializationManifest::compute`. `MaterializationManifest::from_json_bytes`
/// rejects any manifest whose `manifest_version` differs from this value.
pub const MANIFEST_VERSION: u32 = 3;
/// Identifies *what* a materialization computes.
///
/// Within this module it is produced by `definition_hash` as the hex-encoded
/// SHA-256 of the canonicalised producing descriptor plus its execution
/// environment. Serialises as a bare JSON string (`#[serde(transparent)]`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(transparent)]
pub struct DefinitionHash(pub String);

impl DefinitionHash {
    /// Borrows the hash text as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl std::fmt::Display for DefinitionHash {
    /// Writes the raw hash text; formatter width/fill flags are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}
/// Content address of a materialized artifact.
///
/// When built with [`ArtifactDigest::of_bytes`] it is the hex-encoded SHA-256
/// of the artifact's bytes. Serialises as a bare JSON string
/// (`#[serde(transparent)]`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(transparent)]
pub struct ArtifactDigest(pub String);

impl ArtifactDigest {
    /// Hashes `bytes` with SHA-256 and wraps the hex encoding of the digest.
    pub fn of_bytes(bytes: &[u8]) -> Self {
        let digest = Sha256::digest(bytes);
        ArtifactDigest(hex::encode(digest))
    }

    /// Borrows the hex digest as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl std::fmt::Display for ArtifactDigest {
    /// Writes the raw hex digest; formatter width/fill flags are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}
/// Hardware a materialization executes on.
///
/// Part of [`MaterializationEnv`] and therefore of the definition hash: CPU
/// and CUDA runs must hash differently because their float outputs can differ.
///
/// Serialised externally tagged in snake_case: `"cpu"`,
/// `{"cuda": {"ordinal": N}}`, `{"metal": {"ordinal": N}}`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ComputeDevice {
    Cpu,
    Cuda {
        // Presumably the device index (tests use 0) — confirm with the device selector.
        ordinal: u32,
    },
    Metal {
        // Presumably the device index — confirm with the device selector.
        ordinal: u32,
    },
}
/// Execution environment hashed together with the producing descriptor.
///
/// The whole struct is serialised into the definition hash, so changing the
/// engine version, the device or the model list changes the hash.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct MaterializationEnv {
    /// Engine build version; [`MaterializationEnv::new`] fills it with this
    /// crate's `CARGO_PKG_VERSION`, captured at compile time.
    pub engine_version: String,
    /// Device the computation runs on.
    pub device: ComputeDevice,
    /// Models taking part in the computation (may be empty, as the tests do
    /// for model-free producers).
    pub models: Vec<ModelIdentity>,
}

impl MaterializationEnv {
    /// Builds an environment stamped with the engine version of this build.
    pub fn new(device: ComputeDevice, models: Vec<ModelIdentity>) -> Self {
        let engine_version = String::from(env!("CARGO_PKG_VERSION"));
        MaterializationEnv {
            engine_version,
            device,
            models,
        }
    }
}
/// One model taking part in a materialization.
///
/// Listed in [`MaterializationEnv::models`], so a different model id or
/// backend changes the definition hash.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ModelIdentity {
    pub model_id: String,
    // Inference backend name (tests use "candle").
    pub backend: String,
}
/// The computation that produced an artifact.
///
/// Its canonical JSON (object keys sorted, see `canonical_bytes`) is one of the
/// two inputs to the definition hash. Any parameter that can change the output
/// must therefore be a field here; otherwise two different computations would
/// share a hash.
///
/// Serialised internally tagged: the snake_case variant name is stored under
/// the `"producer"` key next to the variant's fields.
///
/// Floating-point parameters are carried as raw IEEE-754 bit patterns
/// (`*_bits` fields) rather than as floats. This module's tests build them with
/// `f32::to_bits` / `f64::to_bits`. Presumably this lets the enum derive `Eq`
/// and hash bit-exactly.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "producer", rename_all = "snake_case")]
pub enum ProducingDescriptor {
    /// `"producer": "inference"`.
    Inference {
        model_id: String,
        task: ModelTask,
        source_id: String,
        content_columns: Vec<String>,
        key_column: String,
    },
    /// `"producer": "embedding"`.
    Embedding {
        model_id: String,
        task: ModelTask,
        source_id: String,
        columns: Vec<String>,
        key_column: String,
        dimensions: usize,
    },
    /// `"producer": "neighbor_graph"`.
    NeighborGraph {
        source_table: String,
        k: usize,
        /// `f32` threshold stored as `f32::to_bits`; `None` when unset.
        min_similarity_bits: Option<u32>,
        mutual: bool,
        self_exclude: bool,
        exact: bool,
        exact_max_rows: usize,
    },
    /// `"producer": "graph_propagation"`.
    GraphPropagation {
        source_table: String,
        edge_source: EdgeSourceBinding,
        kernel_id: String,
        direction: PropagationDirection,
        hops: usize,
        /// `f64` alpha stored as `f64::to_bits`.
        alpha_bits: u64,
        weighting: PropagationWeighting,
        output: PropagationOutput,
        dimensions: usize,
    },
    /// `"producer": "context_set"`.
    ContextSet {
        encoder_id: String,
        source_id: String,
        embedding_table: Option<String>,
        candidate_source: ContextCandidateSource,
        value_columns: Vec<String>,
        aggregator: ContextAggregator,
        exclude_self: bool,
        /// Free-form split selector; tests pass a predicate such as
        /// `split = 'train'` — interpretation lives outside this file.
        split: Option<String>,
        dimensions: usize,
    },
    /// `"producer": "asof_join"`.
    AsofJoin {
        spine: String,
        facts: String,
        spine_by: Vec<String>,
        facts_by: Vec<String>,
        spine_time: String,
        facts_time: String,
        direction: AsofDirection,
        boundary: AsofBoundary,
        tolerance: Option<AsofTolerance>,
        tie_break_column: Option<String>,
        project: Vec<String>,
    },
}
/// Match direction of a [`ProducingDescriptor::AsofJoin`]; serialised as
/// `"backward"`, `"forward"` or `"nearest"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AsofDirection {
    Backward,
    Forward,
    Nearest,
}
/// Whether a [`ProducingDescriptor::AsofJoin`] boundary is inclusive or
/// exclusive; serialised as `"inclusive"` / `"exclusive"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AsofBoundary {
    Inclusive,
    Exclusive,
}
/// Optional match tolerance of a [`ProducingDescriptor::AsofJoin`].
///
/// Serialised externally tagged: `{"duration": n}` or `{"steps": n}`, so the
/// two kinds hash differently even for the same magnitude.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AsofTolerance {
    // NOTE(review): the time unit of `Duration` is not visible here — confirm with the join implementation.
    Duration(i64),
    Steps(i64),
}
/// Edge direction followed by graph propagation and by context edge gathering;
/// serialised as `"out"`, `"in"` or `"undirected"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PropagationDirection {
    Out,
    In,
    Undirected,
}
/// Edge weighting scheme of a [`ProducingDescriptor::GraphPropagation`];
/// serialised as `"uniform"`, `"degree_normalized"` or `"edge_similarity"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PropagationWeighting {
    Uniform,
    DegreeNormalized,
    EdgeSimilarity,
}
/// Output mode of a [`ProducingDescriptor::GraphPropagation`]; serialised as
/// `"final"` or `"jumping_knowledge"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PropagationOutput {
    Final,
    JumpingKnowledge,
}
/// How a [`ProducingDescriptor::ContextSet`] reduces candidate values;
/// serialised as `"mean"`, `"sum"` or `"max"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ContextAggregator {
    Mean,
    Sum,
    Max,
}
/// Merge strategy for [`ContextCandidateSource::Hybrid`]; currently only
/// `"union"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ContextHybridMerge {
    Union,
}
/// Where a [`ProducingDescriptor::ContextSet`] draws its candidates from.
///
/// Serialised internally tagged under `"candidate"` (`"ann"`, `"edges"`,
/// `"hybrid"`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "candidate", rename_all = "snake_case")]
pub enum ContextCandidateSource {
    Ann {
        k: usize,
    },
    Edges {
        gather: ContextEdgeGather,
    },
    // Presumably combines `ann_k` ANN candidates with an edge gather via `merge`.
    Hybrid {
        ann_k: usize,
        gather: ContextEdgeGather,
        merge: ContextHybridMerge,
    },
}
/// Edge-walk parameters used by the `Edges` and `Hybrid` context candidate
/// sources. Every field is part of the definition hash.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ContextEdgeGather {
    pub edge_source: EdgeSourceBinding,
    pub hops: usize,
    pub fanout: Option<usize>,
    pub direction: PropagationDirection,
    pub edge_types: Option<Vec<String>>,
    /// `f64` minimum weight stored as `f64::to_bits`; `None` when unset.
    pub min_weight_bits: Option<u64>,
    // Free-form point-in-time selector (tests use "2026-01-01") — format not enforced here.
    pub as_of: Option<String>,
}
/// Where a graph's edges come from.
///
/// Serialised internally tagged under `"edge_source"`. Because the enclosing
/// fields are also named `edge_source`, the JSON nests as
/// `{"edge_source": {"edge_source": "neighbor_graph", "table_name": ...}}`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "edge_source", rename_all = "snake_case")]
pub enum EdgeSourceBinding {
    /// Edges stored in a previously built neighbor-graph table.
    NeighborGraph {
        table_name: String,
    },
    /// Edges read from a registered source. Column names are hashed
    /// individually, so swapping `src_column`/`dst_column` is a different graph.
    Registered {
        source_id: String,
        src_column: String,
        dst_column: String,
        type_column: Option<String>,
        weight_column: Option<String>,
        as_of_column: Option<String>,
    },
}
impl ProducingDescriptor {
fn canonical_bytes(&self) -> Result<Vec<u8>, ManifestError> {
let value = serde_json::to_value(self)
.map_err(|e| ManifestError::UncanonicalDescriptor(e.to_string()))?;
let canonical = canonicalize_json(&value);
serde_json::to_vec(&canonical)
.map_err(|e| ManifestError::UncanonicalDescriptor(e.to_string()))
}
}
/// Borrowed bundle describing one materialization.
///
/// Holds the producing descriptor, its execution environment and the anchored
/// inputs — the same three pieces [`MaterializationManifest::compute`] takes.
#[derive(Debug)]
pub struct Materialization<'a> {
    pub descriptor: &'a ProducingDescriptor,
    pub env: &'a MaterializationEnv,
    pub inputs: Vec<InputAnchor>,
}

impl<'a> Materialization<'a> {
    /// Bundles the parts without copying the borrowed descriptor or env.
    pub fn new(
        descriptor: &'a ProducingDescriptor,
        env: &'a MaterializationEnv,
        inputs: Vec<InputAnchor>,
    ) -> Self {
        Materialization { descriptor, env, inputs }
    }
}
/// One input read by a materialization, together with how its state was pinned.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct InputAnchor {
    /// Name of the input as the caller identifies it.
    pub source: String,
    /// String-encoded pin; how to read it is given by `kind`.
    pub anchor: AnchorValue,
    pub kind: AnchorKind,
}

impl InputAnchor {
    /// Shared constructor behind the public, kind-specific ones.
    fn anchored(source: String, anchor: String, kind: AnchorKind) -> Self {
        InputAnchor {
            source,
            anchor: AnchorValue(anchor),
            kind,
        }
    }

    /// Pins `source` to the content digest of an upstream artifact.
    pub fn result_digest(source: impl Into<String>, digest: &ArtifactDigest) -> Self {
        Self::anchored(
            source.into(),
            digest.as_str().to_owned(),
            AnchorKind::ResultDigest,
        )
    }

    /// Pins `source` to a numeric version, stored as its decimal string.
    pub fn mutable_version(source: impl Into<String>, version: u64) -> Self {
        Self::anchored(source.into(), version.to_string(), AnchorKind::MutableVersion)
    }

    /// Pins `source` to a caller-supplied version string.
    pub fn source_version(source: impl Into<String>, version: impl Into<String>) -> Self {
        Self::anchored(source.into(), version.into(), AnchorKind::SourceVersion)
    }

    /// Records that `source` was read without a pin at `instant`. Such inputs
    /// are reported by [`MaterializationManifest::unpinned_inputs`].
    pub fn unpinned_at_instant(source: impl Into<String>, instant: impl Into<String>) -> Self {
        Self::anchored(source.into(), instant.into(), AnchorKind::UnpinnedAtInstant)
    }
}
/// How an [`InputAnchor`]'s `anchor` value should be read. Each variant has a
/// matching `InputAnchor` constructor. Serialised in snake_case.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AnchorKind {
    // Anchor is an upstream `ArtifactDigest` (hex).
    ResultDigest,
    // Anchor is a `u64` version rendered in decimal.
    MutableVersion,
    // Anchor is a caller-supplied version string.
    SourceVersion,
    // Input was not pinned; anchor records the read instant. Listed by `unpinned_inputs`.
    UnpinnedAtInstant,
}
/// String form of an input pin; its meaning depends on the accompanying
/// [`AnchorKind`]. Serialises as a bare JSON string.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(transparent)]
pub struct AnchorValue(pub String);
/// Sidecar record describing how an artifact was produced.
///
/// It holds the artifact's content digest, the definition hash of its
/// producing descriptor + env, the descriptor itself, the anchored inputs, and
/// provenance fields.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct MaterializationManifest {
    /// Content digest of the produced artifact.
    pub artifact: ArtifactDigest,
    /// Hash of (descriptor, env) as computed by `definition_hash`.
    pub definition_hash: DefinitionHash,
    /// The producing descriptor, stored in full alongside its hash.
    pub descriptor: ProducingDescriptor,
    /// Inputs the artifact was computed from, with their pins.
    pub input_anchors: Vec<InputAnchor>,
    // Caller-supplied producer identifier (tests use a run id such as "run-123").
    pub produced_by: String,
    // Caller-supplied production time; not validated here (tests use an RFC 3339 UTC string).
    pub produced_at: String,
    /// Copied from `MaterializationEnv::engine_version` by `compute`.
    pub engine_version: String,
    /// Format version; `compute` writes [`MANIFEST_VERSION`].
    pub manifest_version: u32,
}
impl MaterializationManifest {
pub fn compute(
descriptor: &ProducingDescriptor,
env: &MaterializationEnv,
inputs: Vec<InputAnchor>,
artifact: ArtifactDigest,
produced_by: String,
produced_at: String,
) -> Result<Self, ManifestError> {
let definition_hash = Self::definition_of(descriptor, env)?;
Ok(Self {
artifact,
definition_hash,
descriptor: descriptor.clone(),
input_anchors: inputs,
produced_by,
produced_at,
engine_version: env.engine_version.clone(),
manifest_version: MANIFEST_VERSION,
})
}
pub fn definition_of(
descriptor: &ProducingDescriptor,
env: &MaterializationEnv,
) -> Result<DefinitionHash, ManifestError> {
definition_hash(descriptor, env)
}
pub fn unpinned_inputs(&self) -> Vec<String> {
self.input_anchors
.iter()
.filter(|a| a.kind == AnchorKind::UnpinnedAtInstant)
.map(|a| a.source.clone())
.collect()
}
pub fn to_json_bytes(&self) -> Result<Vec<u8>, ManifestError> {
Ok(serde_json::to_vec_pretty(self)?)
}
pub fn from_json_bytes(bytes: &[u8]) -> Result<Self, ManifestError> {
let manifest: Self = serde_json::from_slice(bytes)?;
if manifest.manifest_version != MANIFEST_VERSION {
return Err(ManifestError::UnsupportedManifestVersion {
found: manifest.manifest_version,
supported: MANIFEST_VERSION,
});
}
Ok(manifest)
}
}
fn definition_hash(
descriptor: &ProducingDescriptor,
env: &MaterializationEnv,
) -> Result<DefinitionHash, ManifestError> {
let descriptor_bytes = descriptor.canonical_bytes()?;
let env_value = serde_json::to_value(env)?;
let env_bytes = serde_json::to_vec(&canonicalize_json(&env_value))?;
let mut hasher = Sha256::new();
hasher.update(b"jammi.materialization.definition.v1");
hasher.update(b"\0descriptor\0");
hasher.update((descriptor_bytes.len() as u64).to_le_bytes());
hasher.update(&descriptor_bytes);
hasher.update(b"\0env\0");
hasher.update((env_bytes.len() as u64).to_le_bytes());
hasher.update(&env_bytes);
Ok(DefinitionHash(hex::encode(hasher.finalize())))
}
fn canonicalize_json(value: &serde_json::Value) -> serde_json::Value {
match value {
serde_json::Value::Object(map) => {
let sorted: std::collections::BTreeMap<String, serde_json::Value> = map
.iter()
.map(|(k, v)| (k.clone(), canonicalize_json(v)))
.collect();
serde_json::Value::Object(sorted.into_iter().collect())
}
serde_json::Value::Array(items) => {
serde_json::Value::Array(items.iter().map(canonicalize_json).collect())
}
other => other.clone(),
}
}
/// Outcome of checking an artifact against its manifest.
///
/// The comparison that produces it is not in this file; the variant notes
/// below are inferred from their shapes — confirm at the call site.
/// Serialised internally tagged under `"verdict"` (snake_case).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "verdict", rename_all = "snake_case")]
pub enum MatchVerdict {
    // Presumably: the expected and stored values agree.
    Match,
    // Presumably: they disagree; `expected` and `found` carry the two values.
    Mismatch {
        expected: String,
        found: String,
    },
    // Presumably: a match, but these sources were read unpinned (cf. `unpinned_inputs`).
    MatchWithUnpinnedInputs {
        unpinned: Vec<String>,
    },
    // No manifest was available to compare against.
    MissingManifest,
}
/// Errors raised while hashing, (de)serialising or locating manifests.
#[derive(Debug, thiserror::Error)]
pub enum ManifestError {
    /// Serialising a [`ProducingDescriptor`] for hashing failed; carries the
    /// serde message.
    #[error("producing descriptor is not canonicalisable: {0}")]
    UncanonicalDescriptor(String),
    /// No manifest sidecar exists for the named table.
    #[error("manifest sidecar missing for table `{0}`")]
    MissingManifest(String),
    /// The named table should have a manifest but has none. The message
    /// suggests a torn write or a write that bypassed the funnel.
    #[error(
        "table `{0}` is post-contract but has no manifest sidecar (torn write or bypassed funnel)"
    )]
    PostContractManifestMissing(String),
    /// The manifest's `manifest_version` is not [`MANIFEST_VERSION`].
    #[error(
        "manifest format version {found} is incompatible with supported version {supported}; \
        re-emit the artifact"
    )]
    UnsupportedManifestVersion {
        found: u32,
        supported: u32,
    },
    /// JSON (de)serialisation failure from `serde_json`.
    #[error("manifest serialisation error: {0}")]
    Serde(#[from] serde_json::Error),
    /// Storage-layer failure, forwarded unchanged.
    #[error(transparent)]
    Storage(#[from] crate::storage::StorageError),
}
#[cfg(test)]
mod tests {
    use super::*;

    // Baseline embedding descriptor shared by most tests.
    fn embedding_descriptor() -> ProducingDescriptor {
        ProducingDescriptor::Embedding {
            model_id: "sentence-transformers/all-MiniLM-L6-v2".into(),
            task: ModelTask::TextEmbedding,
            source_id: "docs".into(),
            columns: vec!["title".into(), "body".into()],
            key_column: "_row_id".into(),
            dimensions: 384,
        }
    }

    // CPU environment carrying the single model `embedding_descriptor` refers to.
    fn cpu_env() -> MaterializationEnv {
        MaterializationEnv::new(
            ComputeDevice::Cpu,
            vec![ModelIdentity {
                model_id: "sentence-transformers/all-MiniLM-L6-v2".into(),
                backend: "candle".into(),
            }],
        )
    }

    #[test]
    fn definition_hash_is_deterministic() {
        let d = embedding_descriptor();
        let env = cpu_env();
        assert_eq!(
            definition_hash(&d, &env).unwrap(),
            definition_hash(&d, &env).unwrap()
        );
    }

    #[test]
    fn changing_a_parameter_changes_the_hash() {
        let env = cpu_env();
        let base = definition_hash(&embedding_descriptor(), &env).unwrap();
        let mut other = embedding_descriptor();
        if let ProducingDescriptor::Embedding { columns, .. } = &mut other {
            columns.push("extra".into());
        }
        assert_ne!(base, definition_hash(&other, &env).unwrap());
    }

    #[test]
    fn different_device_changes_the_hash() {
        let d = embedding_descriptor();
        let cpu = definition_hash(&d, &cpu_env()).unwrap();
        let cuda = definition_hash(
            &d,
            &MaterializationEnv::new(
                ComputeDevice::Cuda { ordinal: 0 },
                vec![ModelIdentity {
                    model_id: "sentence-transformers/all-MiniLM-L6-v2".into(),
                    backend: "candle".into(),
                }],
            ),
        )
        .unwrap();
        assert_ne!(
            cpu, cuda,
            "CPU and CUDA must hash differently — float outputs differ"
        );
    }

    #[test]
    fn different_engine_version_changes_the_hash() {
        let d = embedding_descriptor();
        let base = definition_hash(&d, &cpu_env()).unwrap();
        let mut bumped = cpu_env();
        bumped.engine_version = "0.0.0-other".into();
        assert_ne!(base, definition_hash(&d, &bumped).unwrap());
    }

    #[test]
    fn different_model_version_changes_the_hash() {
        let d = embedding_descriptor();
        let base = definition_hash(&d, &cpu_env()).unwrap();
        let other_model = MaterializationEnv::new(
            ComputeDevice::Cpu,
            vec![ModelIdentity {
                model_id: "sentence-transformers/all-MiniLM-L12-v2".into(),
                backend: "candle".into(),
            }],
        );
        assert_ne!(base, definition_hash(&d, &other_model).unwrap());
    }

    #[test]
    fn manifest_round_trips_through_json() {
        let manifest = MaterializationManifest::compute(
            &embedding_descriptor(),
            &cpu_env(),
            vec![InputAnchor::mutable_version("ref_ranges", 7)],
            ArtifactDigest::of_bytes(b"parquet-bytes"),
            "run-123".into(),
            "2026-06-17T00:00:00Z".into(),
        )
        .unwrap();
        let bytes = manifest.to_json_bytes().unwrap();
        let back = MaterializationManifest::from_json_bytes(&bytes).unwrap();
        assert_eq!(manifest, back);
    }

    // Serialised current-shape manifest with `manifest_version` overwritten.
    fn manifest_at_version(version: u32) -> Vec<u8> {
        let mut manifest = MaterializationManifest::compute(
            &embedding_descriptor(),
            &cpu_env(),
            vec![],
            ArtifactDigest::of_bytes(b"x"),
            "run".into(),
            "2026-06-17T00:00:00Z".into(),
        )
        .unwrap();
        manifest.manifest_version = version;
        serde_json::to_vec(&manifest).unwrap()
    }

    #[test]
    fn newer_manifest_version_is_rejected() {
        assert!(matches!(
            MaterializationManifest::from_json_bytes(&manifest_at_version(MANIFEST_VERSION + 1)),
            Err(ManifestError::UnsupportedManifestVersion { found, supported })
                if found == MANIFEST_VERSION + 1 && supported == MANIFEST_VERSION
        ));
    }

    // Manifest in the pre-`descriptor` shape, stamped with `version`.
    fn old_shape_manifest_without_descriptor(version: u32) -> Vec<u8> {
        let manifest = MaterializationManifest::compute(
            &embedding_descriptor(),
            &cpu_env(),
            vec![],
            ArtifactDigest::of_bytes(b"x"),
            "run".into(),
            "2026-06-17T00:00:00Z".into(),
        )
        .unwrap();
        let mut value = serde_json::to_value(&manifest).unwrap();
        let object = value.as_object_mut().unwrap();
        object.remove("descriptor");
        object.insert("manifest_version".into(), serde_json::json!(version));
        serde_json::to_vec(&value).unwrap()
    }

    #[test]
    fn older_manifest_version_is_rejected_cleanly() {
        let err =
            MaterializationManifest::from_json_bytes(&old_shape_manifest_without_descriptor(1))
                .expect_err(
                    "an old descriptor-less manifest must be rejected, not silently accepted",
                );
        assert!(
            matches!(
                err,
                ManifestError::Serde(_) | ManifestError::UnsupportedManifestVersion { .. }
            ),
            "expected a clean typed rejection (serde or version guard), got {err:?}"
        );
    }

    // Environment for model-free producers (graphs, joins, context sets).
    fn no_model_env() -> MaterializationEnv {
        MaterializationEnv::new(ComputeDevice::Cpu, Vec::new())
    }

    type LabelledMutation<T> = (&'static str, fn(&mut T));

    // Asserts that applying each labelled mutation to a copy of `base` yields a
    // descriptor whose definition hash differs from `base`'s.
    fn assert_each_change_moves_hash<T: Clone>(
        base: &T,
        env: &MaterializationEnv,
        to_descriptor: impl Fn(&T) -> ProducingDescriptor,
        mutations: &[LabelledMutation<T>],
    ) {
        let base_hash = definition_hash(&to_descriptor(base), env).unwrap();
        for (label, mutate) in mutations {
            let mut changed = base.clone();
            mutate(&mut changed);
            assert_ne!(
                base_hash,
                definition_hash(&to_descriptor(&changed), env).unwrap(),
                "changing `{label}` must change the definition hash (lossy descriptor otherwise)"
            );
        }
    }

    fn neighbor_graph_descriptor(p: &BuildNeighborGraphFields) -> ProducingDescriptor {
        ProducingDescriptor::NeighborGraph {
            source_table: "emb".into(),
            k: p.k,
            min_similarity_bits: p.min_similarity_bits,
            mutual: p.mutual,
            self_exclude: p.self_exclude,
            exact: p.exact,
            exact_max_rows: p.exact_max_rows,
        }
    }

    #[derive(Clone)]
    struct BuildNeighborGraphFields {
        k: usize,
        min_similarity_bits: Option<u32>,
        mutual: bool,
        self_exclude: bool,
        exact: bool,
        exact_max_rows: usize,
    }

    #[test]
    fn neighbor_graph_each_param_moves_the_hash() {
        let base = BuildNeighborGraphFields {
            k: 10,
            min_similarity_bits: None,
            mutual: false,
            self_exclude: true,
            exact: false,
            exact_max_rows: 50_000,
        };
        assert_each_change_moves_hash(
            &base,
            &no_model_env(),
            neighbor_graph_descriptor,
            &[
                ("k", |p| p.k = 25),
                ("min_similarity", |p| {
                    p.min_similarity_bits = Some(0.7_f32.to_bits())
                }),
                ("mutual", |p| p.mutual = true),
                ("self_exclude", |p| p.self_exclude = false),
                ("exact", |p| p.exact = true),
                ("exact_max_rows", |p| p.exact_max_rows = 1_000),
            ],
        );
    }

    #[derive(Clone)]
    struct GraphPropagationFields {
        edge_source: EdgeSourceBinding,
        direction: PropagationDirection,
        hops: usize,
        alpha_bits: u64,
        weighting: PropagationWeighting,
        output: PropagationOutput,
        dimensions: usize,
    }

    fn graph_propagation_descriptor(p: &GraphPropagationFields) -> ProducingDescriptor {
        ProducingDescriptor::GraphPropagation {
            source_table: "emb".into(),
            edge_source: p.edge_source.clone(),
            kernel_id: "graph_propagate".into(),
            direction: p.direction,
            hops: p.hops,
            alpha_bits: p.alpha_bits,
            weighting: p.weighting,
            output: p.output,
            dimensions: p.dimensions,
        }
    }

    #[test]
    fn graph_propagation_each_param_moves_the_hash() {
        let base = GraphPropagationFields {
            edge_source: EdgeSourceBinding::NeighborGraph {
                table_name: "graph".into(),
            },
            direction: PropagationDirection::Out,
            hops: 2,
            alpha_bits: 0.1_f64.to_bits(),
            weighting: PropagationWeighting::DegreeNormalized,
            output: PropagationOutput::Final,
            dimensions: 384,
        };
        assert_each_change_moves_hash(
            &base,
            &no_model_env(),
            graph_propagation_descriptor,
            &[
                ("edge_source", |p| {
                    p.edge_source = EdgeSourceBinding::NeighborGraph {
                        table_name: "other_graph".into(),
                    }
                }),
                ("direction", |p| {
                    p.direction = PropagationDirection::Undirected
                }),
                ("hops", |p| p.hops = 3),
                ("alpha", |p| p.alpha_bits = 0.25_f64.to_bits()),
                ("weighting", |p| {
                    p.weighting = PropagationWeighting::EdgeSimilarity
                }),
                ("output", |p| p.output = PropagationOutput::JumpingKnowledge),
                ("dimensions", |p| p.dimensions = 768),
            ],
        );
    }

    #[test]
    fn graph_propagation_registered_edge_columns_move_the_hash() {
        let base = GraphPropagationFields {
            edge_source: EdgeSourceBinding::Registered {
                source_id: "edges".into(),
                src_column: "from".into(),
                dst_column: "to".into(),
                type_column: None,
                weight_column: None,
                as_of_column: None,
            },
            direction: PropagationDirection::Out,
            hops: 2,
            alpha_bits: 0.1_f64.to_bits(),
            weighting: PropagationWeighting::DegreeNormalized,
            output: PropagationOutput::Final,
            dimensions: 384,
        };
        let swapped = GraphPropagationFields {
            edge_source: EdgeSourceBinding::Registered {
                source_id: "edges".into(),
                src_column: "to".into(),
                dst_column: "from".into(),
                type_column: None,
                weight_column: None,
                as_of_column: None,
            },
            ..base.clone()
        };
        assert_ne!(
            definition_hash(&graph_propagation_descriptor(&base), &no_model_env()).unwrap(),
            definition_hash(&graph_propagation_descriptor(&swapped), &no_model_env()).unwrap(),
            "swapping src/dst on the registered edge source must move the hash — \
             two propagations over the same source with swapped columns are different graphs"
        );
        assert_each_change_moves_hash(
            &base,
            &no_model_env(),
            graph_propagation_descriptor,
            &[
                ("src_column", |p| {
                    if let EdgeSourceBinding::Registered { src_column, .. } = &mut p.edge_source {
                        *src_column = "s".into();
                    }
                }),
                ("dst_column", |p| {
                    if let EdgeSourceBinding::Registered { dst_column, .. } = &mut p.edge_source {
                        *dst_column = "d".into();
                    }
                }),
                ("type_column", |p| {
                    if let EdgeSourceBinding::Registered { type_column, .. } = &mut p.edge_source {
                        *type_column = Some("etype".into());
                    }
                }),
                ("weight_column", |p| {
                    if let EdgeSourceBinding::Registered { weight_column, .. } = &mut p.edge_source
                    {
                        *weight_column = Some("w".into());
                    }
                }),
                ("as_of_column", |p| {
                    if let EdgeSourceBinding::Registered { as_of_column, .. } = &mut p.edge_source {
                        *as_of_column = Some("valid_at".into());
                    }
                }),
            ],
        );
    }

    #[derive(Clone)]
    struct ContextSetFields {
        embedding_table: Option<String>,
        candidate_source: ContextCandidateSource,
        value_columns: Vec<String>,
        aggregator: ContextAggregator,
        exclude_self: bool,
        split: Option<String>,
        dimensions: usize,
    }

    fn context_set_descriptor(p: &ContextSetFields) -> ProducingDescriptor {
        ProducingDescriptor::ContextSet {
            encoder_id: "jammi:context-set".into(),
            source_id: "patents".into(),
            embedding_table: p.embedding_table.clone(),
            candidate_source: p.candidate_source.clone(),
            value_columns: p.value_columns.clone(),
            aggregator: p.aggregator,
            exclude_self: p.exclude_self,
            split: p.split.clone(),
            dimensions: p.dimensions,
        }
    }

    #[test]
    fn context_set_each_param_moves_the_hash() {
        let base = ContextSetFields {
            embedding_table: None,
            candidate_source: ContextCandidateSource::Ann { k: 5 },
            value_columns: vec!["label".into()],
            aggregator: ContextAggregator::Mean,
            exclude_self: true,
            split: None,
            dimensions: 32,
        };
        assert_each_change_moves_hash(
            &base,
            &no_model_env(),
            context_set_descriptor,
            &[
                ("embedding_table", |p| {
                    p.embedding_table = Some("pinned_source_table".into())
                }),
                ("candidate_source.k", |p| {
                    p.candidate_source = ContextCandidateSource::Ann { k: 9 }
                }),
                ("candidate_source.kind", |p| {
                    p.candidate_source = ContextCandidateSource::Edges {
                        gather: ContextEdgeGather {
                            edge_source: EdgeSourceBinding::NeighborGraph {
                                table_name: "g".into(),
                            },
                            hops: 1,
                            fanout: None,
                            direction: PropagationDirection::Out,
                            edge_types: None,
                            min_weight_bits: None,
                            as_of: None,
                        },
                    }
                }),
                ("value_columns", |p| p.value_columns.push("extra".into())),
                ("aggregator", |p| p.aggregator = ContextAggregator::Max),
                ("exclude_self", |p| p.exclude_self = false),
                ("split", |p| p.split = Some("split = 'train'".into())),
                ("dimensions", |p| p.dimensions = 64),
            ],
        );
    }

    #[test]
    fn context_edge_gather_each_knob_moves_the_hash() {
        let base = ContextSetFields {
            embedding_table: None,
            candidate_source: ContextCandidateSource::Edges {
                gather: ContextEdgeGather {
                    edge_source: EdgeSourceBinding::Registered {
                        source_id: "edges".into(),
                        src_column: "from".into(),
                        dst_column: "to".into(),
                        type_column: None,
                        weight_column: None,
                        as_of_column: None,
                    },
                    hops: 1,
                    fanout: None,
                    direction: PropagationDirection::Out,
                    edge_types: None,
                    min_weight_bits: None,
                    as_of: None,
                },
            },
            value_columns: Vec::new(),
            aggregator: ContextAggregator::Mean,
            exclude_self: true,
            split: None,
            dimensions: 32,
        };
        let with_gather = |mutate: fn(&mut ContextEdgeGather)| {
            let mut f = base.clone();
            if let ContextCandidateSource::Edges { gather } = &mut f.candidate_source {
                mutate(gather);
            }
            f
        };
        let base_hash = definition_hash(&context_set_descriptor(&base), &no_model_env()).unwrap();
        let cases: &[LabelledMutation<ContextEdgeGather>] = &[
            ("hops", |g| g.hops = 3),
            ("fanout", |g| g.fanout = Some(8)),
            ("direction", |g| g.direction = PropagationDirection::In),
            ("edge_types", |g| g.edge_types = Some(vec!["cites".into()])),
            ("min_weight", |g| {
                g.min_weight_bits = Some(0.5_f64.to_bits())
            }),
            ("as_of", |g| g.as_of = Some("2026-01-01".into())),
            ("edge_source", |g| {
                g.edge_source = EdgeSourceBinding::NeighborGraph {
                    table_name: "g".into(),
                }
            }),
        ];
        for (label, mutate) in cases {
            let f = with_gather(*mutate);
            assert_ne!(
                base_hash,
                definition_hash(&context_set_descriptor(&f), &no_model_env()).unwrap(),
                "changing gather `{label}` must change the definition hash"
            );
        }
    }

    fn asof_descriptor(direction: AsofDirection) -> ProducingDescriptor {
        ProducingDescriptor::AsofJoin {
            spine: "spine".into(),
            facts: "facts".into(),
            spine_by: vec!["acct".into()],
            facts_by: vec!["acct".into()],
            spine_time: "ts".into(),
            facts_time: "ts".into(),
            direction,
            boundary: AsofBoundary::Inclusive,
            tolerance: None,
            tie_break_column: None,
            project: vec!["px".into()],
        }
    }

    #[test]
    fn asof_join_each_knob_moves_the_hash() {
        let env = no_model_env();
        let base_d = asof_descriptor(AsofDirection::Backward);
        let base = definition_hash(&base_d, &env).unwrap();
        assert_ne!(
            base,
            definition_hash(&asof_descriptor(AsofDirection::Forward), &env).unwrap()
        );
        let variants: Vec<(&str, ProducingDescriptor)> = vec![
            ("boundary", {
                let mut d = base_d.clone();
                if let ProducingDescriptor::AsofJoin { boundary, .. } = &mut d {
                    *boundary = AsofBoundary::Exclusive;
                }
                d
            }),
            ("tolerance", {
                let mut d = base_d.clone();
                if let ProducingDescriptor::AsofJoin { tolerance, .. } = &mut d {
                    *tolerance = Some(AsofTolerance::Steps(3));
                }
                d
            }),
            ("tie_break_column", {
                let mut d = base_d.clone();
                if let ProducingDescriptor::AsofJoin {
                    tie_break_column, ..
                } = &mut d
                {
                    *tie_break_column = Some("seq".into());
                }
                d
            }),
            ("project", {
                let mut d = base_d.clone();
                if let ProducingDescriptor::AsofJoin { project, .. } = &mut d {
                    project.push("py".into());
                }
                d
            }),
            ("spine_by", {
                let mut d = base_d.clone();
                if let ProducingDescriptor::AsofJoin { spine_by, .. } = &mut d {
                    spine_by.push("region".into());
                }
                d
            }),
        ];
        for (label, d) in variants {
            assert_ne!(
                base,
                definition_hash(&d, &env).unwrap(),
                "changing asof `{label}` must change the definition hash"
            );
        }
    }

    // Covers the as-of knobs the test above leaves untouched: both relation
    // names, the facts-side keys, both time columns, and the `Nearest` direction.
    #[test]
    fn asof_join_remaining_knobs_move_the_hash() {
        assert_each_change_moves_hash(
            &asof_descriptor(AsofDirection::Backward),
            &no_model_env(),
            ProducingDescriptor::clone,
            &[
                ("spine", |d| {
                    if let ProducingDescriptor::AsofJoin { spine, .. } = d {
                        *spine = "other_spine".into();
                    }
                }),
                ("facts", |d| {
                    if let ProducingDescriptor::AsofJoin { facts, .. } = d {
                        *facts = "other_facts".into();
                    }
                }),
                ("facts_by", |d| {
                    if let ProducingDescriptor::AsofJoin { facts_by, .. } = d {
                        facts_by.push("region".into());
                    }
                }),
                ("spine_time", |d| {
                    if let ProducingDescriptor::AsofJoin { spine_time, .. } = d {
                        *spine_time = "event_ts".into();
                    }
                }),
                ("facts_time", |d| {
                    if let ProducingDescriptor::AsofJoin { facts_time, .. } = d {
                        *facts_time = "event_ts".into();
                    }
                }),
                ("direction.nearest", |d| {
                    if let ProducingDescriptor::AsofJoin { direction, .. } = d {
                        *direction = AsofDirection::Nearest;
                    }
                }),
            ],
        );
    }

    #[test]
    fn asof_tolerance_kind_moves_the_hash() {
        let env = no_model_env();
        let hash_with_tolerance = |t: AsofTolerance| {
            let mut d = asof_descriptor(AsofDirection::Backward);
            if let ProducingDescriptor::AsofJoin { tolerance, .. } = &mut d {
                *tolerance = Some(t);
            }
            definition_hash(&d, &env).unwrap()
        };
        assert_ne!(
            hash_with_tolerance(AsofTolerance::Duration(3)),
            hash_with_tolerance(AsofTolerance::Steps(3)),
            "a duration tolerance and a step tolerance of the same magnitude are different joins"
        );
    }

    // The original embedding test only varies `columns`; cover the rest.
    #[test]
    fn embedding_each_param_moves_the_hash() {
        assert_each_change_moves_hash(
            &embedding_descriptor(),
            &cpu_env(),
            ProducingDescriptor::clone,
            &[
                ("model_id", |d| {
                    if let ProducingDescriptor::Embedding { model_id, .. } = d {
                        *model_id = "sentence-transformers/other".into();
                    }
                }),
                ("source_id", |d| {
                    if let ProducingDescriptor::Embedding { source_id, .. } = d {
                        *source_id = "other_docs".into();
                    }
                }),
                ("key_column", |d| {
                    if let ProducingDescriptor::Embedding { key_column, .. } = d {
                        *key_column = "doc_id".into();
                    }
                }),
                ("dimensions", |d| {
                    if let ProducingDescriptor::Embedding { dimensions, .. } = d {
                        *dimensions = 768;
                    }
                }),
            ],
        );
    }

    #[test]
    fn unpinned_inputs_are_reported() {
        let manifest = MaterializationManifest::compute(
            &embedding_descriptor(),
            &cpu_env(),
            vec![
                InputAnchor::mutable_version("pinned", 3),
                InputAnchor::unpinned_at_instant("federated", "2026-06-17T00:00:00Z"),
            ],
            ArtifactDigest::of_bytes(b"x"),
            "run".into(),
            "2026-06-17T00:00:00Z".into(),
        )
        .unwrap();
        assert_eq!(manifest.unpinned_inputs(), vec!["federated".to_string()]);
    }

    #[test]
    fn artifact_digest_is_content_addressed() {
        assert_eq!(
            ArtifactDigest::of_bytes(b"same"),
            ArtifactDigest::of_bytes(b"same")
        );
        assert_ne!(
            ArtifactDigest::of_bytes(b"a"),
            ArtifactDigest::of_bytes(b"b")
        );
    }
}