use super::keys::{
EdgeRef, edge_version_prefix, is_gdpr_erasure, is_sentinel, is_tombstone,
parse_versioned_edge_key, versioned_edge_key,
};
use super::payload::EdgeValuePayload;
use crate::engine::graph::edge_store::store::{EDGES, Edge, EdgeStore, redb_err};
impl EdgeStore {
    /// Resolves the properties of `edge` as stored at or below the versioned
    /// key built for `system_as_of`.
    ///
    /// The `EDGES` table is scanned in reverse over the inclusive key range
    /// `(tid, prefix) ..= (tid, key(system_as_of))`, so entries are visited
    /// from the highest key downwards.
    /// NOTE(review): this yields "newest version first" only if versioned keys
    /// sort by system time under the shared prefix — confirm in `keys`.
    ///
    /// Outcome per visited entry:
    /// - a tombstone or GDPR-erasure marker ends the scan with `Ok(None)`;
    /// - with `valid_at_ms = Some(vt)`, an entry whose
    ///   `[valid_from_ms, valid_until_ms)` interval does not contain `vt` is
    ///   skipped and the scan moves on to the next lower key;
    /// - otherwise the entry's `properties` are returned.
    ///
    /// `Ok(None)` is also returned when the range holds no matching entry.
    ///
    /// # Errors
    ///
    /// - `Error::BadRequest` when `system_as_of` is negative.
    /// - Any error produced by `versioned_edge_key` while building the bound.
    /// - Storage errors from redb (read transaction, table open, range,
    ///   iteration), wrapped through `redb_err`.
    /// - Any error produced by `EdgeValuePayload::decode`.
    pub fn ceiling_resolve_edge(
        &self,
        edge: EdgeRef<'_>,
        system_as_of: i64,
        valid_at_ms: Option<i64>,
    ) -> crate::Result<Option<Vec<u8>>> {
        if system_as_of < 0 {
            return Err(crate::Error::BadRequest {
                detail: format!("ceiling_resolve_edge: negative system_as_of={system_as_of}"),
            });
        }

        // Both bounds are built before any storage access so that a key
        // construction failure surfaces without opening a transaction.
        let prefix = edge_version_prefix(edge.collection, edge.src, edge.label, edge.dst);
        let upper = versioned_edge_key(
            edge.collection,
            edge.src,
            edge.label,
            edge.dst,
            system_as_of,
        )?;
        let tid = edge.tid.as_u64();

        let read_txn = self
            .db
            .begin_read()
            .map_err(|e| redb_err("begin_read", e))?;
        let table = read_txn
            .open_table(EDGES)
            .map_err(|e| redb_err("open edges", e))?;

        let lower_bound = (tid, prefix.as_str());
        let upper_bound = (tid, upper.as_str());
        let versions = table
            .range(lower_bound..=upper_bound)
            .map_err(|e| redb_err("ceiling range", e))?;

        for row in versions.rev() {
            let (key_guard, value_guard) = row.map_err(|e| redb_err("ceiling iter", e))?;

            // Defensive stop: leave the scan as soon as a key no longer
            // belongs to this tid / edge prefix.
            let (row_tid, row_key) = key_guard.value();
            let belongs_to_edge = row_tid == tid && row_key.starts_with(prefix.as_str());
            if !belongs_to_edge {
                break;
            }

            // A deletion or erasure marker hides everything below it.
            let raw = value_guard.value();
            if is_tombstone(raw) || is_gdpr_erasure(raw) {
                return Ok(None);
            }

            let decoded = EdgeValuePayload::decode(raw)?;
            // Without a valid-time filter every decoded version is visible;
            // with one, the half-open validity interval must contain it.
            let visible = match valid_at_ms {
                None => true,
                Some(vt) => decoded.valid_from_ms <= vt && vt < decoded.valid_until_ms,
            };
            if visible {
                return Ok(Some(decoded.properties));
            }
        }

        Ok(None)
    }
}
/// Builds an [`Edge`] together with its decoded payload from one raw
/// versioned table entry.
///
/// `composite` is the string part of the entry's key and `value` is the raw
/// stored bytes.
///
/// Returns `None` when:
/// - `value` is a sentinel according to `is_sentinel`,
/// - `composite` cannot be parsed by `parse_versioned_edge_key`, or
/// - `value` fails to decode as an [`EdgeValuePayload`] (the decode error is
///   intentionally discarded).
///
/// The fifth component parsed from the key is ignored. The returned `Edge`
/// carries a clone of the payload's `properties`, because the payload itself
/// is also handed back to the caller.
// NOTE(review): `dead_code` is allowed presumably because no caller exists in
// every build configuration yet — confirm, and drop the attribute once used.
#[allow(dead_code)]
pub(crate) fn edge_from_versioned_entry(
    composite: &str,
    value: &[u8],
) -> Option<(Edge, EdgeValuePayload)> {
    if is_sentinel(value) {
        return None;
    }

    let (collection, src_id, label, dst_id, _) = parse_versioned_edge_key(composite)?;
    let payload = EdgeValuePayload::decode(value).ok()?;

    let edge = Edge {
        collection: collection.to_string(),
        src_id: src_id.to_string(),
        label: label.to_string(),
        dst_id: dst_id.to_string(),
        properties: payload.properties.clone(),
    };

    Some((edge, payload))
}