use serde::{Deserialize, Serialize};
use crate::catalog::result_repo::ResultTableRecord;
use crate::error::{JammiError, Result};
use crate::storage::StorageUrl;
use super::manifest::{AnchorKind, DefinitionHash, InputAnchor, ProducingDescriptor};
use super::ResultStore;
/// Cache policy selector with two settings, `Use` and `Bypass`.
///
/// Variants serialize in `snake_case` (`"use"`, `"bypass"`). `Default`
/// yields [`CachePolicy::Bypass`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[derive(Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CachePolicy {
    /// Cache usage enabled.
    Use,
    /// Cache bypassed; this is the `Default` variant.
    #[default]
    Bypass,
}
/// How a cache-aware request was satisfied.
///
/// Serialized internally tagged: the variant name (snake_case) goes in a
/// `cache_outcome` field, e.g. `{"cache_outcome":"computed"}` or
/// `{"cache_outcome":"reused","table":"…"}`.
#[derive(Clone, Debug, Eq, PartialEq)]
#[derive(Deserialize, Serialize)]
#[serde(rename_all = "snake_case", tag = "cache_outcome")]
pub enum CacheOutcome {
    /// The result was produced rather than taken from an existing table.
    Computed,
    /// An existing table was reused; `table` presumably holds its name
    /// (as returned by `ResultStore::probe_cache`) — confirm at call sites.
    Reused { table: String },
}
/// Verdict returned by `ResultStore::staleness` for a materialized table.
///
/// Serialized internally tagged under a `staleness` field with snake_case
/// variant names.
#[derive(Clone, Debug, Eq, PartialEq)]
#[derive(Deserialize, Serialize)]
#[serde(rename_all = "snake_case", tag = "staleness")]
pub enum Staleness {
    /// Definition hash and every input anchor still match what was recorded.
    Fresh,
    /// At least one decidable difference was found; all of them are listed.
    Stale { reasons: Vec<StaleReason> },
    /// Some inputs could not be re-checked. `unpinned` names those sources,
    /// and `decided_reasons` carries any differences that *were* detected.
    Undecidable {
        unpinned: Vec<String>,
        decided_reasons: Vec<StaleReason>,
    },
    /// The table has no recorded definition hash or no recorded input anchors.
    MissingManifest,
}
/// A single, decided cause of staleness.
///
/// Serialized internally tagged under a `reason` field with snake_case
/// variant names.
#[derive(Clone, Debug, Eq, PartialEq)]
#[derive(Deserialize, Serialize)]
#[serde(rename_all = "snake_case", tag = "reason")]
pub enum StaleReason {
    /// The recorded definition hash differs from the current one.
    DefinitionChanged { recorded: String, current: String },
    /// The input `source` now has a different digest than was recorded.
    InputAdvanced {
        source: String,
        recorded: String,
        current: String,
    },
    /// The input `source` is no longer present in the catalog.
    InputVanished { source: String },
}
/// The present-day value of a recorded input anchor, as computed by
/// `ResultStore::current_anchor`.
///
/// Serialized *adjacently* tagged. The snake_case variant name goes in a
/// `current_anchor` field, and the digest (only for `ResultDigest`) goes in
/// a `digest` field:
/// `{"current_anchor":"result_digest","digest":"…"}` or
/// `{"current_anchor":"vanished"}`.
///
/// Internal tagging (`tag` without `content`) cannot represent a newtype
/// variant that wraps a `String`; serde would fail at runtime when
/// serializing `ResultDigest`. Unit variants serialize exactly as they did
/// under internal tagging.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "current_anchor", content = "digest", rename_all = "snake_case")]
pub enum CurrentAnchor {
    /// Current artifact digest of a result-table input.
    ResultDigest(String),
    /// The anchor kind cannot be re-checked against a result digest.
    Undecidable,
    /// The input table no longer exists in the catalog.
    Vanished,
}
/// A lineage edge: table `derived` records an input anchor on `input`.
///
/// `kind` is the anchor kind recorded for that input.
#[derive(Clone, Debug, Eq, PartialEq)]
#[derive(Deserialize, Serialize)]
pub struct DerivesFromEdge {
    /// Source table the derived table was anchored on.
    pub input: String,
    /// Name of the table that depends on `input`.
    pub derived: String,
    /// How the dependency was anchored when `derived` was materialized.
    pub kind: AnchorKind,
}
impl ResultStore {
pub async fn lookup_cached(
&self,
definition: &DefinitionHash,
inputs: &[InputAnchor],
) -> Result<Option<String>> {
if inputs
.iter()
.any(|a| a.kind == AnchorKind::UnpinnedAtInstant)
{
return Ok(None);
}
let candidates = self
.catalog()
.find_ready_result_tables_by_definition(definition.as_str())
.await?;
for candidate in candidates {
let Some(ref anchors_json) = candidate.input_anchors_json else {
continue;
};
let recorded: Vec<InputAnchor> = serde_json::from_str(anchors_json)?;
if anchor_sets_equal(&recorded, inputs) {
return Ok(Some(candidate.table_name));
}
}
Ok(None)
}
pub async fn probe_cache(
&self,
definition: &DefinitionHash,
inputs: &[InputAnchor],
) -> Result<Option<String>> {
Ok(self
.probe_cache_record(definition, inputs)
.await?
.map(|record| record.table_name))
}
pub async fn probe_cache_record(
&self,
definition: &DefinitionHash,
inputs: &[InputAnchor],
) -> Result<Option<ResultTableRecord>> {
let Some(table_name) = self.lookup_cached(definition, inputs).await? else {
return Ok(None);
};
let Some(record) = self.catalog().get_result_table(&table_name).await? else {
return Ok(None);
};
let parquet_url = StorageUrl::parse(&record.parquet_path)?;
let handle = self.open_parquet(&parquet_url)?;
let path = handle.data_path()?;
if handle.exists(&path).await? {
Ok(Some(record))
} else {
Ok(None)
}
}
pub async fn staleness(
&self,
table: &ResultTableRecord,
current_definition: &DefinitionHash,
) -> Result<Staleness> {
let (Some(recorded_hash), Some(anchors_json)) =
(&table.definition_hash, &table.input_anchors_json)
else {
return Ok(Staleness::MissingManifest);
};
let mut decided: Vec<StaleReason> = Vec::new();
let mut unpinned: Vec<String> = Vec::new();
if recorded_hash != current_definition.as_str() {
decided.push(StaleReason::DefinitionChanged {
recorded: recorded_hash.clone(),
current: current_definition.as_str().to_string(),
});
}
let recorded_anchors: Vec<InputAnchor> = serde_json::from_str(anchors_json)?;
for anchor in &recorded_anchors {
match self.current_anchor(anchor).await? {
CurrentAnchor::ResultDigest(current) => {
if current != anchor.anchor.0 {
decided.push(StaleReason::InputAdvanced {
source: anchor.source.clone(),
recorded: anchor.anchor.0.clone(),
current,
});
}
}
CurrentAnchor::Vanished => {
decided.push(StaleReason::InputVanished {
source: anchor.source.clone(),
});
}
CurrentAnchor::Undecidable => {
unpinned.push(anchor.source.clone());
}
}
}
if !unpinned.is_empty() {
return Ok(Staleness::Undecidable {
unpinned,
decided_reasons: decided,
});
}
if decided.is_empty() {
Ok(Staleness::Fresh)
} else {
Ok(Staleness::Stale { reasons: decided })
}
}
pub async fn current_anchor(&self, anchor: &InputAnchor) -> Result<CurrentAnchor> {
match anchor.kind {
AnchorKind::ResultDigest => {
let Some(parent) = self.catalog().get_result_table(&anchor.source).await? else {
return Ok(CurrentAnchor::Vanished);
};
let parquet_url = StorageUrl::parse(&parent.parquet_path)?;
match self.read_materialization_manifest(&parquet_url).await? {
Some(manifest) => Ok(CurrentAnchor::ResultDigest(manifest.artifact.0)),
None => {
let handle = self.open_parquet(&parquet_url)?;
let path = handle.data_path()?;
let bytes = handle.get_bytes(&path).await?;
Ok(CurrentAnchor::ResultDigest(
super::manifest::ArtifactDigest::of_bytes(&bytes).0,
))
}
}
}
AnchorKind::UnpinnedAtInstant
| AnchorKind::MutableVersion
| AnchorKind::SourceVersion => Ok(CurrentAnchor::Undecidable),
}
}
pub async fn producing_descriptor(
&self,
table: &ResultTableRecord,
) -> Result<ProducingDescriptor> {
let parquet_url = StorageUrl::parse(&table.parquet_path)?;
match self.read_materialization_manifest(&parquet_url).await? {
Some(manifest) => Ok(manifest.descriptor),
None => Err(JammiError::NotRecomputable {
table: table.table_name.clone(),
}),
}
}
pub async fn derives_from_closure(&self, source: &str) -> Result<Vec<DerivesFromEdge>> {
struct Frame {
node: String,
edges: std::vec::IntoIter<DerivesFromEdge>,
}
let mut expanded: std::collections::HashSet<String> = std::collections::HashSet::new();
let mut on_path: std::collections::HashSet<String> = std::collections::HashSet::new();
let mut collected: Vec<DerivesFromEdge> = Vec::new();
on_path.insert(source.to_string());
let mut stack: Vec<Frame> = vec![Frame {
node: source.to_string(),
edges: self.derives_from(source).await?.into_iter(),
}];
while let Some(frame) = stack.last_mut() {
match frame.edges.next() {
Some(edge) => {
let child = edge.derived.clone();
if on_path.contains(&child) {
return Err(JammiError::DependencyCycle { table: child });
}
collected.push(edge);
if expanded.contains(&child) {
continue;
}
on_path.insert(child.clone());
let edges = self.derives_from(&child).await?.into_iter();
stack.push(Frame { node: child, edges });
}
None => {
let done = stack.pop().expect("frame present in this arm");
on_path.remove(&done.node);
expanded.insert(done.node);
}
}
}
Ok(collected)
}
pub async fn derives_from(&self, source: &str) -> Result<Vec<DerivesFromEdge>> {
let candidates = self
.catalog()
.find_ready_result_tables_anchored_on(source)
.await?;
let mut edges = Vec::new();
for candidate in candidates {
let Some(ref anchors_json) = candidate.input_anchors_json else {
continue;
};
let anchors: Vec<InputAnchor> = serde_json::from_str(anchors_json)?;
for anchor in anchors {
if anchor.source == source {
edges.push(DerivesFromEdge {
input: source.to_string(),
derived: candidate.table_name.clone(),
kind: anchor.kind,
});
}
}
}
Ok(edges)
}
}
/// Reports whether two anchor lists hold the same elements with the same
/// multiplicities, ignoring order (multiset equality).
///
/// The previous check (equal lengths plus two-way `contains`) accepted
/// `[x, x, y]` vs `[x, y, y]` as equal, which could produce a false cache
/// hit. Here each element of `a` must claim a distinct, not-yet-matched equal
/// element of `b`. Together with the length check, this gives a one-to-one
/// pairing.
///
/// Only `PartialEq` is required, so no ordering or hashing of elements is
/// needed. The cost is quadratic in the list length.
fn anchor_sets_equal<T: PartialEq>(a: &[T], b: &[T]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    // `matched[i]` records whether `b[i]` is already paired with an element of `a`.
    let mut matched = vec![false; b.len()];
    a.iter().all(|x| {
        let slot = b
            .iter()
            .zip(matched.iter())
            .position(|(y, &taken)| !taken && y == x);
        match slot {
            Some(i) => {
                matched[i] = true;
                true
            }
            None => false,
        }
    })
}