use std::fmt;
/// URI scheme prefix that every logical locator (and legacy execution ref) must start with.
pub const SCHEME: &str = "noetl://";
/// Shard count constant; not referenced by the code visible here — presumably the
/// value callers are expected to pass as `shard_count` (TODO confirm).
pub const DEFAULT_SHARD_COUNT: u32 = 256;
/// Tenant substituted by `ResultCoordinates::new` when no tenant is supplied.
pub const DEFAULT_TENANT: &str = "default";
/// Project substituted by `ResultCoordinates::new` when no project is supplied.
pub const DEFAULT_PROJECT: &str = "default";
/// `kind` segment used for locators built from `ResultCoordinates`.
pub const KIND_RESULTS: &str = "results";
/// Failure modes when parsing a `noetl://` locator or legacy execution ref.
///
/// Every variant carries the complete input string that was rejected, so the
/// error message can echo it back verbatim.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LocatorError {
    /// The input does not begin with [`SCHEME`].
    MissingScheme(String),
    /// The input has too few `/`-separated segments.
    ///
    /// `parse_legacy_execution_ref` also reports this when the
    /// `execution/<id>/result/...` shape does not match.
    TooFewSegments(String),
    /// A required segment is empty.
    ///
    /// `parse_legacy_execution_ref` also reports this when a numeric id
    /// cannot be parsed.
    EmptySegment(String),
}

impl fmt::Display for LocatorError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::MissingScheme(input) => {
                write!(f, "locator must start with '{SCHEME}', got: {input:?}")
            }
            Self::TooFewSegments(input) => write!(
                f,
                "locator must have at least tenant/project/kind/logical_path segments: {input:?}"
            ),
            Self::EmptySegment(input) => {
                write!(f, "locator has an empty required segment: {input:?}")
            }
        }
    }
}

impl std::error::Error for LocatorError {}
/// A logical locator of the form
/// `noetl://<tenant>/<project>/<kind>/<logical_path>[@<version>]`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ResourceLocator {
/// First segment after the scheme.
pub tenant: String,
/// Second segment after the scheme.
pub project: String,
/// Third segment after the scheme (e.g. `results` for result coordinates).
pub kind: String,
/// Everything after the kind segment; may itself contain `/`.
pub logical_path: String,
/// Optional version, rendered after a trailing `@`.
pub version: Option<String>,
}
impl ResourceLocator {
    /// Assembles a locator from its parts.
    ///
    /// No validation is performed: the segments are stored exactly as given,
    /// so a value built here is not guaranteed to survive a
    /// [`to_uri`](Self::to_uri) / [`parse`](Self::parse) round trip (for
    /// example an empty segment, or a `/` inside `tenant`, `project` or `kind`).
    pub fn new(
        tenant: impl Into<String>,
        project: impl Into<String>,
        kind: impl Into<String>,
        logical_path: impl Into<String>,
        version: Option<String>,
    ) -> Self {
        let tenant = tenant.into();
        let project = project.into();
        let kind = kind.into();
        let logical_path = logical_path.into();
        Self {
            tenant,
            project,
            kind,
            logical_path,
            version,
        }
    }

    /// Renders `noetl://tenant/project/kind/logical_path`, followed by
    /// `@version` when a version is present.
    pub fn to_uri(&self) -> String {
        let path = [
            self.tenant.as_str(),
            self.project.as_str(),
            self.kind.as_str(),
            self.logical_path.as_str(),
        ]
        .join("/");
        match self.version.as_deref() {
            Some(version) => format!("{SCHEME}{path}@{version}"),
            None => format!("{SCHEME}{path}"),
        }
    }

    /// Parses `noetl://tenant/project/kind/logical_path[@version]`.
    ///
    /// Text after the last `@` is treated as the version only when it contains
    /// no `/`; otherwise the `@` stays part of the path. As a result, a logical
    /// path whose final segment contains `@` is read back as a version and does
    /// not round-trip through [`to_uri`](Self::to_uri).
    ///
    /// # Errors
    ///
    /// * [`LocatorError::MissingScheme`] if `uri` does not start with [`SCHEME`].
    /// * [`LocatorError::TooFewSegments`] if fewer than four `/`-separated
    ///   segments precede the version.
    /// * [`LocatorError::EmptySegment`] if the tenant, project, kind or logical
    ///   path is empty.
    pub fn parse(uri: &str) -> Result<Self, LocatorError> {
        let rest = match uri.strip_prefix(SCHEME) {
            Some(rest) => rest,
            None => return Err(LocatorError::MissingScheme(uri.to_string())),
        };

        let (path, version) = rest
            .rsplit_once('@')
            .filter(|(_, tail)| !tail.contains('/'))
            .map_or((rest, None), |(head, tail)| (head, Some(tail.to_string())));

        // `splitn(4, ..)` keeps everything after the kind segment intact, which
        // is exactly the (possibly multi-segment) logical path.
        let mut segments = path.splitn(4, '/');
        let (tenant, project, kind, logical_path) =
            match (segments.next(), segments.next(), segments.next(), segments.next()) {
                (Some(t), Some(p), Some(k), Some(l)) => (t, p, k, l),
                _ => return Err(LocatorError::TooFewSegments(uri.to_string())),
            };

        if [tenant, project, kind, logical_path]
            .iter()
            .any(|segment| segment.is_empty())
        {
            return Err(LocatorError::EmptySegment(uri.to_string()));
        }
        Ok(Self::new(tenant, project, kind, logical_path, version))
    }

    /// Shard index for this locator's tenant and project, optionally refined by
    /// `affinity`.
    ///
    /// Delegates to the module-level `shard_key` function; `kind`,
    /// `logical_path` and `version` do not influence the result.
    pub fn shard_key(&self, affinity: Option<&str>, shard_count: u32) -> u32 {
        shard_key(
            self.tenant.as_str(),
            self.project.as_str(),
            affinity,
            shard_count,
        )
    }
}
impl fmt::Display for ResourceLocator {
    /// Writes the same text that `ResourceLocator::to_uri` returns.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.to_uri())
    }
}
/// Identifies a single stored result.
///
/// The fields after `project` become the logical path
/// `<execution_id>/<step>/<frame>/<row>/<attempt>` under the `results` kind.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ResultCoordinates {
/// Tenant segment of the locator.
pub tenant: String,
/// Project segment of the locator.
pub project: String,
/// Execution id; also the shard affinity used by `shard_key`.
pub execution_id: i64,
/// Step name, inserted verbatim as a path segment.
pub step: String,
/// Frame index within the step's output.
pub frame: u64,
/// Row index within the frame.
pub row: u64,
/// Attempt number; the last component of the logical path.
pub attempt: u32,
}
impl ResultCoordinates {
    /// Creates result coordinates.
    ///
    /// `tenant` and `project` fall back to [`DEFAULT_TENANT`] and
    /// [`DEFAULT_PROJECT`] respectively when `None`.
    // The constructor takes every coordinate positionally; the lint is silenced
    // rather than introducing a builder type.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        tenant: Option<&str>,
        project: Option<&str>,
        execution_id: i64,
        step: impl Into<String>,
        frame: u64,
        row: u64,
        attempt: u32,
    ) -> Self {
        let tenant = tenant.unwrap_or(DEFAULT_TENANT).to_owned();
        let project = project.unwrap_or(DEFAULT_PROJECT).to_owned();
        let step = step.into();
        Self {
            tenant,
            project,
            execution_id,
            step,
            frame,
            row,
            attempt,
        }
    }

    /// Converts to an unversioned [`ResourceLocator`] of kind [`KIND_RESULTS`]
    /// whose logical path is `<execution_id>/<step>/<frame>/<row>/<attempt>`.
    pub fn to_locator(&self) -> ResourceLocator {
        let logical_path = format!(
            "{}/{}/{}/{}/{}",
            self.execution_id, self.step, self.frame, self.row, self.attempt
        );
        ResourceLocator::new(
            self.tenant.as_str(),
            self.project.as_str(),
            KIND_RESULTS,
            logical_path,
            None,
        )
    }

    /// The `noetl://` URI for these coordinates.
    pub fn logical_uri(&self) -> String {
        ResourceLocator::to_uri(&self.to_locator())
    }

    /// Shard index for these coordinates, using the decimal `execution_id` as
    /// the affinity so all results of one execution land on the same shard.
    pub fn shard_key(&self, shard_count: u32) -> u32 {
        let affinity = self.execution_id.to_string();
        shard_key(
            self.tenant.as_str(),
            self.project.as_str(),
            Some(affinity.as_str()),
            shard_count,
        )
    }

    /// Builds the physical storage key.
    ///
    /// The key is made of three groups joined with `/`:
    ///
    /// 1. placement partitions `noetl/env=../region=../cell=../shard=..`,
    /// 2. scope partitions `tenant=../project=../date=../execution=..`,
    /// 3. the leaf `results/<step>/<frame>/<row>/<attempt>.<ext>`.
    ///
    /// `date` and `ext` are inserted verbatim, so `ext` should not include a
    /// leading dot.
    pub fn physical_key(&self, placement: &CellPlacement, date: &str, ext: &str) -> String {
        let placement_part = format!(
            "noetl/env={}/region={}/cell={}/shard={}",
            placement.env, placement.region, placement.cell, placement.shard
        );
        let scope_part = format!(
            "tenant={}/project={}/date={}/execution={}",
            self.tenant, self.project, date, self.execution_id
        );
        let leaf_part = format!(
            "results/{}/{}/{}/{}.{}",
            self.step, self.frame, self.row, self.attempt, ext
        );
        [placement_part, scope_part, leaf_part].join("/")
    }
}
/// Physical placement for stored data: environment, region, cell and shard label.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CellPlacement {
    /// Environment label (e.g. `prod`).
    pub env: String,
    /// Region label (e.g. `usw2`).
    pub region: String,
    /// Cell label (e.g. `usw2-a`).
    pub cell: String,
    /// Shard label: `s` followed by the shard id zero-padded to at least four digits.
    pub shard: String,
}

impl CellPlacement {
    /// Builds a placement, rendering `shard_id` as a label such as `s0042`.
    ///
    /// Ids of 10000 or more keep all their digits (e.g. `s12345`).
    pub fn new(
        env: impl Into<String>,
        region: impl Into<String>,
        cell: impl Into<String>,
        shard_id: u32,
    ) -> Self {
        let (env, region, cell) = (env.into(), region.into(), cell.into());
        let shard = format!("s{shard_id:04}");
        Self {
            env,
            region,
            cell,
            shard,
        }
    }
}
/// 64-bit FNV-1a hash of `bytes`.
///
/// Uses the standard FNV-1a 64-bit offset basis and prime. For each byte, the
/// accumulator is XORed with the byte and then multiplied (wrapping) by the prime.
fn fnv1a_64(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
    bytes.iter().fold(OFFSET_BASIS, |acc, &byte| {
        (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    })
}
/// Maps `(tenant, project[, affinity])` to a shard index in `0..shard_count`.
///
/// The components are joined with the ASCII unit separator (`0x1f`). This
/// makes boundary-shifted inputs such as `("a", "bc")` and `("ab", "c")`
/// distinct byte strings. The joined bytes are hashed with 64-bit FNV-1a and
/// reduced modulo `shard_count`, so the mapping is deterministic across runs
/// and processes.
///
/// # Panics
///
/// Panics in debug builds if `shard_count` is zero. Release builds treat a
/// zero count as one, so every key maps to shard `0`.
pub fn shard_key(tenant: &str, project: &str, affinity: Option<&str>, shard_count: u32) -> u32 {
    const SEPARATOR: u8 = 0x1f;
    debug_assert!(shard_count > 0, "shard_count must be non-zero");
    let shard_count = shard_count.max(1);

    // Size the buffer exactly (tenant + sep + project [+ sep + affinity]) so a
    // long affinity never forces a reallocation.
    let affinity_len = affinity.map_or(0, |a| a.len() + 1);
    let mut buf = Vec::with_capacity(tenant.len() + 1 + project.len() + affinity_len);
    buf.extend_from_slice(tenant.as_bytes());
    buf.push(SEPARATOR);
    buf.extend_from_slice(project.as_bytes());
    if let Some(a) = affinity {
        buf.push(SEPARATOR);
        buf.extend_from_slice(a.as_bytes());
    }

    // The remainder is strictly less than `shard_count`, so it always fits in u32.
    (fnv1a_64(&buf) % u64::from(shard_count)) as u32
}
/// Parsed form of a legacy
/// `noetl://execution/<execution_id>/result/<name>/<result_id>` reference,
/// as produced by [`parse_legacy_execution_ref`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LegacyExecutionRef {
/// Numeric id from the segment right after `execution/`.
pub execution_id: i64,
/// Segments between `result/` and the final id, re-joined with `/`
/// (never empty when produced by the parser).
pub name: String,
/// Numeric id from the final path segment.
pub result_id: i64,
}
/// Returns `true` when `uri` is [`SCHEME`] immediately followed by `execution/`.
///
/// This only checks the prefix; it does not validate the rest of the
/// reference. A new-style locator whose tenant is literally `execution` also
/// matches.
pub fn is_legacy_execution_ref(uri: &str) -> bool {
    matches!(uri.strip_prefix(SCHEME), Some(rest) if rest.starts_with("execution/"))
}
/// Parses a legacy `noetl://execution/<execution_id>/result/<name>/<result_id>` ref.
///
/// `name` may span several `/`-separated segments; the final segment is
/// always the result id.
///
/// # Errors
///
/// * [`LocatorError::MissingScheme`] if `uri` does not start with [`SCHEME`].
/// * [`LocatorError::TooFewSegments`] if there are fewer than five segments,
///   or the first/third segments are not `execution`/`result`.
/// * [`LocatorError::EmptySegment`] if either id fails to parse as `i64`, or
///   the name is empty.
pub fn parse_legacy_execution_ref(uri: &str) -> Result<LegacyExecutionRef, LocatorError> {
    let path = match uri.strip_prefix(SCHEME) {
        Some(path) => path,
        None => return Err(LocatorError::MissingScheme(uri.to_string())),
    };
    let shape_error = || LocatorError::TooFewSegments(uri.to_string());
    let value_error = || LocatorError::EmptySegment(uri.to_string());

    // Peel off `execution/<id>/result/` and keep the remainder intact.
    let mut fields = path.splitn(4, '/');
    let (prefix, id_text, marker, remainder) =
        match (fields.next(), fields.next(), fields.next(), fields.next()) {
            (Some(p), Some(i), Some(m), Some(r)) => (p, i, m, r),
            _ => return Err(shape_error()),
        };
    // The remainder must hold at least `<name>/<result_id>`.
    let (name, result_text) = remainder.rsplit_once('/').ok_or_else(shape_error)?;
    if prefix != "execution" || marker != "result" {
        return Err(shape_error());
    }

    let execution_id: i64 = id_text.parse().map_err(|_| value_error())?;
    let result_id: i64 = result_text.parse().map_err(|_| value_error())?;
    if name.is_empty() {
        return Err(value_error());
    }
    Ok(LegacyExecutionRef {
        execution_id,
        name: name.to_string(),
        result_id,
    })
}
#[cfg(test)]
mod tests {
// Unit tests for locator formatting/parsing, result coordinates, physical
// keys, shard hashing and legacy execution refs.
use super::*;
// A versioned locator renders to the expected URI and parses back to an equal value.
#[test]
fn logical_uri_round_trips() {
let loc = ResourceLocator::new("t_acme", "p_gen", "results", "exec_1/align/main", Some("v1".into()));
let uri = loc.to_uri();
assert_eq!(uri, "noetl://t_acme/p_gen/results/exec_1/align/main@v1");
assert_eq!(ResourceLocator::parse(&uri).unwrap(), loc);
}
// Without `@`, the version is None and a multi-segment logical path is kept whole.
#[test]
fn logical_uri_round_trips_without_version() {
let uri = "noetl://t/p/datasets/market/snap";
let loc = ResourceLocator::parse(uri).unwrap();
assert_eq!(loc.version, None);
assert_eq!(loc.logical_path, "market/snap");
assert_eq!(loc.to_uri(), uri);
}
// One case per error variant: wrong scheme, too few segments, empty segment.
#[test]
fn parse_rejects_bad_input() {
assert!(matches!(
ResourceLocator::parse("https://x/y/z/w"),
Err(LocatorError::MissingScheme(_))
));
assert!(matches!(
ResourceLocator::parse("noetl://t/p/results"),
Err(LocatorError::TooFewSegments(_))
));
assert!(matches!(
ResourceLocator::parse("noetl://t//results/x"),
Err(LocatorError::EmptySegment(_))
));
}
// Result coordinates produce a `results` locator that parses back with the same path.
#[test]
fn result_coordinates_build_the_logical_uri() {
let c = ResultCoordinates::new(Some("t_acme"), Some("p_gen"), 325, "load_next_facility", 2, 4, 1);
assert_eq!(
c.logical_uri(),
"noetl://t_acme/p_gen/results/325/load_next_facility/2/4/1"
);
let loc = ResourceLocator::parse(&c.logical_uri()).unwrap();
assert_eq!(loc.kind, "results");
assert_eq!(loc.logical_path, "325/load_next_facility/2/4/1");
}
// Passing None for tenant/project falls back to the DEFAULT_* constants.
#[test]
fn result_coordinates_default_tenant_project() {
let c = ResultCoordinates::new(None, None, 7, "s", 0, 0, 1);
assert_eq!(c.tenant, "default");
assert_eq!(c.project, "default");
assert_eq!(c.logical_uri(), "noetl://default/default/results/7/s/0/0/1");
}
// Pins the exact physical key layout. The `\` line continuations strip the
// newline and following whitespace, so the expected value is one unbroken key.
#[test]
fn physical_key_matches_blueprint_layout() {
let c = ResultCoordinates::new(Some("t_acme"), Some("p_gen"), 325, "align_reads", 3, 7, 2);
let placement = CellPlacement::new("prod", "usw2", "usw2-a", 42);
let key = c.physical_key(&placement, "2026-06-16", "feather");
assert_eq!(
key,
"noetl/env=prod/region=usw2/cell=usw2-a/shard=s0042/\
tenant=t_acme/project=p_gen/date=2026-06-16/execution=325/\
results/align_reads/3/7/2.feather"
);
}
// Changing only frame, row or attempt must change both the logical URI and the physical key.
#[test]
fn frame_row_and_attempt_are_collision_free() {
let base = ResultCoordinates::new(Some("t"), Some("p"), 1, "s", 0, 0, 1);
let other_frame = ResultCoordinates { frame: 5, ..base.clone() };
let other_row = ResultCoordinates { row: 3, ..base.clone() };
let other_attempt = ResultCoordinates { attempt: 2, ..base.clone() };
assert_ne!(base.logical_uri(), other_frame.logical_uri());
assert_ne!(base.logical_uri(), other_row.logical_uri());
assert_ne!(base.logical_uri(), other_attempt.logical_uri());
let pl = CellPlacement::new("prod", "usw2", "usw2-a", 0);
for other in [&other_frame, &other_row, &other_attempt] {
assert_ne!(
base.physical_key(&pl, "d", "feather"),
other.physical_key(&pl, "d", "feather")
);
}
}
// Golden values: these pin the hash mapping. Changing the hash, the separator
// byte or the byte layout in `shard_key` will break them.
#[test]
fn shard_key_is_stable() {
assert_eq!(shard_key("t_acme", "p_gen", Some("325"), 256), 235);
assert_eq!(shard_key("t_acme", "p_gen", None, 256), 244);
assert_eq!(
shard_key("t", "p", Some("e"), 256),
shard_key("t", "p", Some("e"), 256)
);
}
// The 0x1f separator makes ("a", "bc") and ("ab", "c") distinct hash inputs.
#[test]
fn shard_key_separator_prevents_boundary_collision() {
assert_ne!(
shard_key("a", "bc", None, 256),
shard_key("ab", "c", None, 256)
);
}
// 2000 distinct affinities should cover most of the 256 shards.
#[test]
fn shard_key_distributes_across_the_space() {
use std::collections::HashSet;
let mut buckets = HashSet::new();
for i in 0..2000 {
buckets.insert(shard_key("t", "p", Some(&i.to_string()), 256));
}
assert!(
buckets.len() > 200,
"expected wide shard spread, hit only {}",
buckets.len()
);
}
// Every result must lie in 0..shard_count.
#[test]
fn shard_key_respects_count() {
for i in 0..500 {
let k = shard_key("t", "p", Some(&i.to_string()), 16);
assert!(k < 16, "shard {k} out of range for count 16");
}
}
// Legacy refs are detected by prefix and parsed into their id/name/id parts;
// a new-style locator is not detected as legacy.
#[test]
fn legacy_ref_parses_and_is_detected() {
let uri = "noetl://execution/123/result/my_step/456";
assert!(is_legacy_execution_ref(uri));
assert!(!is_legacy_execution_ref("noetl://t/p/results/1/s/0/1"));
let r = parse_legacy_execution_ref(uri).unwrap();
assert_eq!(r.execution_id, 123);
assert_eq!(r.name, "my_step");
assert_eq!(r.result_id, 456);
}
}