#![allow(dead_code)]
use std::collections::HashMap;
use std::sync::{LazyLock, Mutex};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use anyhow::{Context, Result};
use greentic_deploy_spec::{DeploymentId, RevisionId};
use redis::aio::ConnectionManager;
use ulid::Ulid;
pub(crate) const MAX_PINS: usize = 16_384;
pub(crate) const MAX_PINS_PER_TENANT: usize = 4_096;
const MAX_SCOPE_BYTES: usize = 256;
const REDIS_OP_TIMEOUT: Duration = Duration::from_millis(50);
const REDIS_KEY_PREFIX: &str = "gt:rev_pin";
const REDIS_TRACKING_PREFIX: &str = "gt:rev_pin_set";
#[derive(Clone, Copy, Debug)]
pub struct PinKey<'a> {
pub env_id: &'a str,
pub deployment_id: DeploymentId,
pub tenant: &'a str,
pub hint: &'a str,
}
#[async_trait::async_trait]
pub trait RevisionPinStore: Send + Sync {
async fn try_pin(
&self,
key: PinKey<'_>,
revision_id: RevisionId,
generation: u64,
ttl: Duration,
) -> PinOutcome;
async fn lookup(&self, key: PinKey<'_>, current_generation: u64) -> Option<RevisionId>;
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PinOutcome {
Inserted { revision_id: RevisionId },
Existing { revision_id: RevisionId },
Skipped { revision_id: RevisionId },
}
impl PinOutcome {
pub fn revision_id(&self) -> RevisionId {
match self {
Self::Inserted { revision_id }
| Self::Existing { revision_id }
| Self::Skipped { revision_id } => *revision_id,
}
}
}
#[derive(Clone, Debug)]
struct InMemoryEntry {
revision_id: RevisionId,
generation: u64,
expires_at: SystemTime,
}
#[derive(Debug, Default)]
pub struct InMemoryPinStore {
inner: Mutex<HashMap<(DeploymentId, String, String, String), InMemoryEntry>>,
}
impl InMemoryPinStore {
pub fn new() -> Self {
Self::default()
}
#[cfg(test)]
pub(crate) fn len(&self) -> usize {
self.inner.lock().expect("pin mutex poisoned").len()
}
}
fn owned_key(key: PinKey<'_>) -> (DeploymentId, String, String, String) {
(
key.deployment_id,
key.env_id.to_string(),
key.tenant.to_string(),
key.hint.to_string(),
)
}
#[async_trait::async_trait]
impl RevisionPinStore for InMemoryPinStore {
async fn try_pin(
&self,
key: PinKey<'_>,
revision_id: RevisionId,
generation: u64,
ttl: Duration,
) -> PinOutcome {
let map_key = owned_key(key);
let now = SystemTime::now();
let mut guard = self.inner.lock().expect("pin mutex poisoned");
if let Some(existing) = guard.get(&map_key)
&& existing.expires_at > now
&& existing.generation == generation
{
return PinOutcome::Existing {
revision_id: existing.revision_id,
};
}
guard.remove(&map_key);
if guard.len() >= MAX_PINS {
guard.retain(|_, e| e.expires_at > now);
if guard.len() >= MAX_PINS
&& let Some(victim) = guard
.iter()
.min_by_key(|(_, e)| e.expires_at)
.map(|(k, _)| k.clone())
{
guard.remove(&victim);
}
}
guard.insert(
map_key,
InMemoryEntry {
revision_id,
generation,
expires_at: now + ttl,
},
);
PinOutcome::Inserted { revision_id }
}
async fn lookup(&self, key: PinKey<'_>, current_generation: u64) -> Option<RevisionId> {
let map_key = owned_key(key);
let now = SystemTime::now();
let mut guard = self.inner.lock().expect("pin mutex poisoned");
match guard.get(&map_key) {
Some(entry) if entry.expires_at > now && entry.generation == current_generation => {
Some(entry.revision_id)
}
Some(_) => {
guard.remove(&map_key);
None
}
None => None,
}
}
}
const TRY_PIN_SCRIPT: &str = r#"
local existing = redis.call('GET', KEYS[1])
if existing then
local _, gen = string.match(existing, '^([^|]+)|([^|]+)|')
if gen == ARGV[1] then
return {1, existing}
end
redis.call('DEL', KEYS[1])
redis.call('SREM', KEYS[2], KEYS[1])
end
local card = redis.call('SCARD', KEYS[2])
local cap = tonumber(ARGV[4])
if card >= cap then
return {2}
end
redis.call('SET', KEYS[1], ARGV[2], 'EX', tonumber(ARGV[3]))
redis.call('SADD', KEYS[2], KEYS[1])
redis.call('EXPIRE', KEYS[2], tonumber(ARGV[3]))
return {0, ARGV[2]}
"#;
static TRY_PIN_SCRIPT_HANDLE: LazyLock<redis::Script> =
LazyLock::new(|| redis::Script::new(TRY_PIN_SCRIPT));
pub struct RedisPinStore {
conn: ConnectionManager,
op_timeout: Duration,
cardinality_cap: usize,
}
impl std::fmt::Debug for RedisPinStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RedisPinStore")
.field("op_timeout", &self.op_timeout)
.field("cardinality_cap", &self.cardinality_cap)
.finish_non_exhaustive()
}
}
impl RedisPinStore {
pub async fn from_url(url: impl AsRef<str>) -> Result<Self> {
let client = redis::Client::open(url.as_ref())
.with_context(|| format!("invalid redis url `{}`", url.as_ref()))?;
let manager = ConnectionManager::new(client)
.await
.context("redis ConnectionManager init failed")?;
Ok(Self {
conn: manager,
op_timeout: REDIS_OP_TIMEOUT,
cardinality_cap: MAX_PINS_PER_TENANT,
})
}
pub fn with_op_timeout(mut self, timeout: Duration) -> Self {
self.op_timeout = timeout;
self
}
pub fn with_cardinality_cap(mut self, cap: usize) -> Self {
self.cardinality_cap = cap;
self
}
}
fn encode_value(revision_id: RevisionId, generation: u64, expires_at: SystemTime) -> String {
let secs = expires_at
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
format!("{revision_id}|{generation}|{secs}")
}
fn decode_value(raw: &str) -> Option<(RevisionId, u64, u64)> {
let mut parts = raw.splitn(3, '|');
let rid_str = parts.next()?;
let gen_str = parts.next()?;
let exp_str = parts.next()?;
let rid = Ulid::from_string(rid_str).ok()?;
let generation = gen_str.parse::<u64>().ok()?;
let expires_at = exp_str.parse::<u64>().ok()?;
Some((RevisionId(rid), generation, expires_at))
}
fn redis_key(env_id: &str, deployment_id: DeploymentId, tenant: &str, hint: &str) -> String {
let mut k = scope_prefix(REDIS_KEY_PREFIX, env_id, deployment_id, tenant);
k.push(':');
k.push_str(&urlencoding::encode(hint));
k
}
fn redis_tracking_key(env_id: &str, deployment_id: DeploymentId, tenant: &str) -> String {
scope_prefix(REDIS_TRACKING_PREFIX, env_id, deployment_id, tenant)
}
fn scope_prefix(prefix: &str, env_id: &str, deployment_id: DeploymentId, tenant: &str) -> String {
format!(
"{prefix}:{env_id}:{deployment_id}:{}",
urlencoding::encode(tenant),
)
}
pub(crate) fn now_secs() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0)
}
fn scope_within_bounds(tenant: &str, hint: &str) -> bool {
tenant.len() <= MAX_SCOPE_BYTES && hint.len() <= MAX_SCOPE_BYTES
}
#[async_trait::async_trait]
impl RevisionPinStore for RedisPinStore {
async fn try_pin(
&self,
key: PinKey<'_>,
revision_id: RevisionId,
generation: u64,
ttl: Duration,
) -> PinOutcome {
if !scope_within_bounds(key.tenant, key.hint) {
tracing::warn!(
target: "greentic_start::revision_pin",
tenant_len = key.tenant.len(),
hint_len = key.hint.len(),
"rejecting pin: tenant/hint exceeds MAX_SCOPE_BYTES",
);
return PinOutcome::Skipped { revision_id };
}
let pin_key = redis_key(key.env_id, key.deployment_id, key.tenant, key.hint);
let set_key = redis_tracking_key(key.env_id, key.deployment_id, key.tenant);
let ttl_secs = ttl.as_secs().max(1);
let value = encode_value(revision_id, generation, SystemTime::now() + ttl);
let mut invocation = TRY_PIN_SCRIPT_HANDLE.prepare_invoke();
invocation
.key(&pin_key)
.key(&set_key)
.arg(generation.to_string())
.arg(&value)
.arg(ttl_secs.to_string())
.arg(self.cardinality_cap.to_string());
let mut conn = self.conn.clone();
let fut = invocation.invoke_async::<(i64, Option<String>)>(&mut conn);
match tokio::time::timeout(self.op_timeout, fut).await {
Ok(Ok((0, _))) => PinOutcome::Inserted { revision_id },
Ok(Ok((1, Some(existing)))) => match decode_value(&existing) {
Some((existing_rev, _, _)) => PinOutcome::Existing {
revision_id: existing_rev,
},
None => PinOutcome::Skipped { revision_id },
},
Ok(Ok((2, _))) => {
tracing::warn!(
target: "greentic_start::revision_pin",
cap = self.cardinality_cap,
pin_key = %pin_key,
"rejecting pin: cardinality cap reached for (env, deployment, tenant)",
);
PinOutcome::Skipped { revision_id }
}
Ok(Ok(other)) => {
tracing::warn!(
target: "greentic_start::revision_pin",
response = ?other,
"redis try_pin script returned unexpected shape; soft-falling through",
);
PinOutcome::Skipped { revision_id }
}
Ok(Err(err)) => {
tracing::warn!(
target: "greentic_start::revision_pin",
error = %err,
pin_key = %pin_key,
"redis try_pin script failed; soft-falling through to no-pin path",
);
PinOutcome::Skipped { revision_id }
}
Err(_) => {
tracing::warn!(
target: "greentic_start::revision_pin",
timeout_ms = self.op_timeout.as_millis() as u64,
pin_key = %pin_key,
"redis try_pin timed out; soft-falling through to no-pin path",
);
PinOutcome::Skipped { revision_id }
}
}
}
async fn lookup(&self, key: PinKey<'_>, current_generation: u64) -> Option<RevisionId> {
if !scope_within_bounds(key.tenant, key.hint) {
return None;
}
let pin_key = redis_key(key.env_id, key.deployment_id, key.tenant, key.hint);
let mut conn = self.conn.clone();
let mut get_cmd = redis::cmd("GET");
get_cmd.arg(&pin_key);
let get_fut = get_cmd.query_async::<Option<String>>(&mut conn);
let raw = match tokio::time::timeout(self.op_timeout, get_fut).await {
Ok(Ok(v)) => v?,
Ok(Err(err)) => {
tracing::warn!(
target: "greentic_start::revision_pin",
error = %err,
pin_key = %pin_key,
"redis GET failed; treating as cache miss",
);
return None;
}
Err(_) => {
tracing::warn!(
target: "greentic_start::revision_pin",
timeout_ms = self.op_timeout.as_millis() as u64,
pin_key = %pin_key,
"redis GET timed out; treating as cache miss",
);
return None;
}
};
let (revision_id, generation, expires_at) = decode_value(&raw)?;
if generation != current_generation || expires_at <= now_secs() {
let set_key = redis_tracking_key(key.env_id, key.deployment_id, key.tenant);
let _ = tokio::time::timeout(self.op_timeout, async {
let mut c = self.conn.clone();
let mut del_cmd = redis::cmd("DEL");
del_cmd.arg(&pin_key);
let _: redis::RedisResult<()> = del_cmd.query_async(&mut c).await;
let mut srem_cmd = redis::cmd("SREM");
srem_cmd.arg(&set_key).arg(&pin_key);
let _: redis::RedisResult<()> = srem_cmd.query_async(&mut c).await;
})
.await;
return None;
}
Some(revision_id)
}
}
#[cfg(test)]
mod tests {
use super::*;
use ulid::Ulid;
fn dep() -> DeploymentId {
DeploymentId::new()
}
fn rev() -> RevisionId {
RevisionId::new()
}
#[tokio::test]
async fn in_memory_inserts_then_returns_existing() {
let store = InMemoryPinStore::new();
let dep_id = dep();
let r = rev();
let out1 = store
.try_pin(
PinKey {
env_id: "local",
deployment_id: dep_id,
tenant: "t",
hint: "h",
},
r,
1,
Duration::from_secs(60),
)
.await;
assert_eq!(out1, PinOutcome::Inserted { revision_id: r });
let out2 = store
.try_pin(
PinKey {
env_id: "local",
deployment_id: dep_id,
tenant: "t",
hint: "h",
},
rev(),
1,
Duration::from_secs(60),
)
.await;
assert_eq!(out2, PinOutcome::Existing { revision_id: r });
}
#[tokio::test]
async fn in_memory_lookup_returns_pinned_until_generation_bumps() {
let store = InMemoryPinStore::new();
let dep_id = dep();
let r = rev();
store
.try_pin(
PinKey {
env_id: "local",
deployment_id: dep_id,
tenant: "t",
hint: "h",
},
r,
1,
Duration::from_secs(60),
)
.await;
assert_eq!(
store
.lookup(
PinKey {
env_id: "local",
deployment_id: dep_id,
tenant: "t",
hint: "h"
},
1
)
.await,
Some(r)
);
assert_eq!(
store
.lookup(
PinKey {
env_id: "local",
deployment_id: dep_id,
tenant: "t",
hint: "h"
},
2
)
.await,
None
);
assert_eq!(
store
.lookup(
PinKey {
env_id: "local",
deployment_id: dep_id,
tenant: "t",
hint: "h"
},
1
)
.await,
None
);
}
#[tokio::test]
async fn in_memory_lookup_drops_expired_entry() {
let store = InMemoryPinStore::new();
let dep_id = dep();
let r = rev();
store
.try_pin(
PinKey {
env_id: "local",
deployment_id: dep_id,
tenant: "t",
hint: "h",
},
r,
1,
Duration::from_millis(10),
)
.await;
tokio::time::sleep(Duration::from_millis(30)).await;
assert_eq!(
store
.lookup(
PinKey {
env_id: "local",
deployment_id: dep_id,
tenant: "t",
hint: "h"
},
1
)
.await,
None
);
}
#[tokio::test]
async fn in_memory_replaces_stale_generation_entry_on_try_pin() {
let store = InMemoryPinStore::new();
let dep_id = dep();
let r1 = rev();
store
.try_pin(
PinKey {
env_id: "local",
deployment_id: dep_id,
tenant: "t",
hint: "h",
},
r1,
1,
Duration::from_secs(60),
)
.await;
let r2 = rev();
let out = store
.try_pin(
PinKey {
env_id: "local",
deployment_id: dep_id,
tenant: "t",
hint: "h",
},
r2,
2,
Duration::from_secs(60),
)
.await;
assert_eq!(out, PinOutcome::Inserted { revision_id: r2 });
assert_eq!(
store
.lookup(
PinKey {
env_id: "local",
deployment_id: dep_id,
tenant: "t",
hint: "h"
},
2
)
.await,
Some(r2)
);
}
#[tokio::test]
async fn in_memory_bounded_under_rotating_hints() {
let store = InMemoryPinStore::new();
let dep_id = dep();
for i in 0..(MAX_PINS * 2) {
let hint = format!("sess-{i}");
store
.try_pin(
PinKey {
env_id: "local",
deployment_id: dep_id,
tenant: "t",
hint: &hint,
},
rev(),
1,
Duration::from_secs(60),
)
.await;
}
assert!(store.len() <= MAX_PINS);
}
#[tokio::test]
async fn in_memory_distinct_keys_isolated() {
let store = InMemoryPinStore::new();
let dep_a = dep();
let dep_b = dep();
let r_a = rev();
let r_b = rev();
store
.try_pin(
PinKey {
env_id: "local",
deployment_id: dep_a,
tenant: "t",
hint: "h",
},
r_a,
1,
Duration::from_secs(60),
)
.await;
store
.try_pin(
PinKey {
env_id: "local",
deployment_id: dep_b,
tenant: "t",
hint: "h",
},
r_b,
1,
Duration::from_secs(60),
)
.await;
assert_eq!(
store
.lookup(
PinKey {
env_id: "local",
deployment_id: dep_a,
tenant: "t",
hint: "h"
},
1
)
.await,
Some(r_a)
);
assert_eq!(
store
.lookup(
PinKey {
env_id: "local",
deployment_id: dep_b,
tenant: "t",
hint: "h"
},
1
)
.await,
Some(r_b)
);
}
#[test]
fn encode_decode_roundtrip() {
let r = RevisionId(Ulid::new());
let exp = SystemTime::now() + Duration::from_secs(60);
let encoded = encode_value(r, 7, exp);
let (decoded_r, decoded_g, decoded_exp) = decode_value(&encoded).unwrap();
assert_eq!(decoded_r, r);
assert_eq!(decoded_g, 7);
let exp_secs = exp.duration_since(UNIX_EPOCH).unwrap().as_secs();
assert!(decoded_exp.abs_diff(exp_secs) <= 1);
}
#[test]
fn decode_rejects_malformed() {
assert!(decode_value("").is_none());
assert!(decode_value("not-a-ulid|1|2").is_none());
assert!(decode_value(&format!("{}|notanum|2", Ulid::new())).is_none());
assert!(decode_value(&format!("{}|1|notanum", Ulid::new())).is_none());
assert!(decode_value(&format!("{}|1", Ulid::new())).is_none());
}
#[test]
fn redis_key_matches_plan_format() {
let id = DeploymentId(Ulid::from_string("01F8MECHZX3TBDSZ7XR8KZ9V8K").unwrap());
let key = redis_key("local", id, "tenant-a", "session-x");
assert_eq!(
key,
"gt:rev_pin:local:01F8MECHZX3TBDSZ7XR8KZ9V8K:tenant-a:session-x"
);
}
#[test]
fn redis_key_is_injective_across_tenant_and_hint_with_colons() {
let id = DeploymentId(Ulid::from_string("01F8MECHZX3TBDSZ7XR8KZ9V8K").unwrap());
let k1 = redis_key("local", id, "a", "b:c");
let k2 = redis_key("local", id, "a:b", "c");
assert_ne!(k1, k2);
assert_eq!(k1, "gt:rev_pin:local:01F8MECHZX3TBDSZ7XR8KZ9V8K:a:b%3Ac");
assert_eq!(k2, "gt:rev_pin:local:01F8MECHZX3TBDSZ7XR8KZ9V8K:a%3Ab:c");
}
#[test]
fn redis_tracking_key_scopes_by_env_deployment_tenant() {
let id = DeploymentId(Ulid::from_string("01F8MECHZX3TBDSZ7XR8KZ9V8K").unwrap());
assert_eq!(
redis_tracking_key("local", id, "tenant-a"),
"gt:rev_pin_set:local:01F8MECHZX3TBDSZ7XR8KZ9V8K:tenant-a"
);
}
fn redis_url_or_skip() -> Option<String> {
match std::env::var("GREENTIC_TEST_REDIS_URL") {
Ok(url) if !url.is_empty() => Some(url),
_ => {
eprintln!("skipping: GREENTIC_TEST_REDIS_URL not set");
None
}
}
}
fn unique_hint(label: &str) -> String {
format!("test-{label}-{}", Ulid::new())
}
#[tokio::test]
async fn redis_inserts_then_returns_existing() {
let Some(url) = redis_url_or_skip() else {
return;
};
let store = RedisPinStore::from_url(&url).await.expect("redis open");
let dep_id = dep();
let hint = unique_hint("insert");
let r = rev();
let out1 = store
.try_pin(
PinKey {
env_id: "local",
deployment_id: dep_id,
tenant: "t",
hint: &hint,
},
r,
1,
Duration::from_secs(60),
)
.await;
assert_eq!(out1, PinOutcome::Inserted { revision_id: r });
let out2 = store
.try_pin(
PinKey {
env_id: "local",
deployment_id: dep_id,
tenant: "t",
hint: &hint,
},
rev(),
1,
Duration::from_secs(60),
)
.await;
assert_eq!(out2, PinOutcome::Existing { revision_id: r });
}
#[tokio::test]
async fn redis_lookup_drops_stale_generation() {
let Some(url) = redis_url_or_skip() else {
return;
};
let store = RedisPinStore::from_url(&url).await.expect("redis open");
let dep_id = dep();
let hint = unique_hint("staleness");
let r = rev();
store
.try_pin(
PinKey {
env_id: "local",
deployment_id: dep_id,
tenant: "t",
hint: &hint,
},
r,
1,
Duration::from_secs(60),
)
.await;
assert_eq!(
store
.lookup(
PinKey {
env_id: "local",
deployment_id: dep_id,
tenant: "t",
hint: &hint
},
1
)
.await,
Some(r)
);
assert_eq!(
store
.lookup(
PinKey {
env_id: "local",
deployment_id: dep_id,
tenant: "t",
hint: &hint
},
2
)
.await,
None
);
assert_eq!(
store
.lookup(
PinKey {
env_id: "local",
deployment_id: dep_id,
tenant: "t",
hint: &hint
},
1
)
.await,
None
);
}
#[tokio::test]
async fn redis_ttl_expires() {
let Some(url) = redis_url_or_skip() else {
return;
};
let store = RedisPinStore::from_url(&url).await.expect("redis open");
let dep_id = dep();
let hint = unique_hint("ttl");
let r = rev();
store
.try_pin(
PinKey {
env_id: "local",
deployment_id: dep_id,
tenant: "t",
hint: &hint,
},
r,
1,
Duration::from_secs(1),
)
.await;
tokio::time::sleep(Duration::from_millis(1500)).await;
assert_eq!(
store
.lookup(
PinKey {
env_id: "local",
deployment_id: dep_id,
tenant: "t",
hint: &hint
},
1
)
.await,
None
);
}
#[tokio::test]
async fn redis_replaces_stale_generation_entry_on_try_pin() {
let Some(url) = redis_url_or_skip() else {
return;
};
let store = RedisPinStore::from_url(&url).await.expect("redis open");
let dep_id = dep();
let hint = unique_hint("stale-replace");
let r1 = rev();
store
.try_pin(
PinKey {
env_id: "local",
deployment_id: dep_id,
tenant: "t",
hint: &hint,
},
r1,
1,
Duration::from_secs(60),
)
.await;
assert_eq!(
store
.lookup(
PinKey {
env_id: "local",
deployment_id: dep_id,
tenant: "t",
hint: &hint
},
1
)
.await,
Some(r1)
);
let r2 = rev();
let out = store
.try_pin(
PinKey {
env_id: "local",
deployment_id: dep_id,
tenant: "t",
hint: &hint,
},
r2,
2,
Duration::from_secs(60),
)
.await;
assert_eq!(out, PinOutcome::Inserted { revision_id: r2 });
assert_eq!(
store
.lookup(
PinKey {
env_id: "local",
deployment_id: dep_id,
tenant: "t",
hint: &hint
},
2
)
.await,
Some(r2)
);
}
#[tokio::test]
async fn redis_concurrent_repinners_converge_on_one_winner() {
let Some(url) = redis_url_or_skip() else {
return;
};
let store = std::sync::Arc::new(RedisPinStore::from_url(&url).await.expect("redis open"));
let dep_id = dep();
let hint = unique_hint("concurrent");
let r_stale = rev();
store
.try_pin(
PinKey {
env_id: "local",
deployment_id: dep_id,
tenant: "t",
hint: &hint,
},
r_stale,
1,
Duration::from_secs(60),
)
.await;
let mut handles = Vec::new();
for _ in 0..8 {
let store = std::sync::Arc::clone(&store);
let hint = hint.clone();
handles.push(tokio::spawn(async move {
let r = rev();
let out = store
.try_pin(
PinKey {
env_id: "local",
deployment_id: dep_id,
tenant: "t",
hint: &hint,
},
r,
2,
Duration::from_secs(60),
)
.await;
(r, out)
}));
}
let mut inserted_count = 0;
let mut existing_revs = std::collections::HashSet::new();
for h in handles {
let (own_r, out) = h.await.unwrap();
match out {
PinOutcome::Inserted { revision_id } => {
assert_eq!(revision_id, own_r);
inserted_count += 1;
}
PinOutcome::Existing { revision_id } => {
existing_revs.insert(revision_id);
}
PinOutcome::Skipped { .. } => {
panic!("no soft-fail expected under non-overloaded Redis");
}
}
}
assert_eq!(inserted_count, 1, "exactly one writer should insert");
assert!(
existing_revs.len() <= 1,
"racing writers must observe a single winning revision, saw {existing_revs:?}",
);
let after = store
.lookup(
PinKey {
env_id: "local",
deployment_id: dep_id,
tenant: "t",
hint: &hint,
},
2,
)
.await;
assert!(after.is_some(), "winner must survive the race");
}
#[tokio::test]
async fn redis_cardinality_cap_bounds_rotating_hints() {
let Some(url) = redis_url_or_skip() else {
return;
};
let store = RedisPinStore::from_url(&url)
.await
.expect("redis open")
.with_cardinality_cap(4);
let dep_id = dep();
let tenant = format!("tenant-card-{}", Ulid::new());
for i in 0..4 {
let hint = format!("h-{i}");
store
.try_pin(
PinKey {
env_id: "local",
deployment_id: dep_id,
tenant: &tenant,
hint: &hint,
},
rev(),
1,
Duration::from_secs(60),
)
.await;
}
let r5 = rev();
let out = store
.try_pin(
PinKey {
env_id: "local",
deployment_id: dep_id,
tenant: &tenant,
hint: "h-5",
},
r5,
1,
Duration::from_secs(60),
)
.await;
assert_eq!(out, PinOutcome::Skipped { revision_id: r5 });
assert_eq!(
store
.lookup(
PinKey {
env_id: "local",
deployment_id: dep_id,
tenant: &tenant,
hint: "h-5"
},
1
)
.await,
None,
"5th distinct hint must not be persisted past the cap",
);
}
#[tokio::test]
async fn redis_tenant_and_hint_scopes_do_not_collide() {
let Some(url) = redis_url_or_skip() else {
return;
};
let store = RedisPinStore::from_url(&url).await.expect("redis open");
let dep_id = dep();
let suffix = Ulid::new().to_string();
let tenant_a = format!("a-{suffix}");
let tenant_ab = format!("a-{suffix}:b");
let hint_bc = "b:c";
let hint_c = "c";
let r1 = rev();
let r2 = rev();
store
.try_pin(
PinKey {
env_id: "local",
deployment_id: dep_id,
tenant: &tenant_a,
hint: hint_bc,
},
r1,
1,
Duration::from_secs(60),
)
.await;
store
.try_pin(
PinKey {
env_id: "local",
deployment_id: dep_id,
tenant: &tenant_ab,
hint: hint_c,
},
r2,
1,
Duration::from_secs(60),
)
.await;
assert_eq!(
store
.lookup(
PinKey {
env_id: "local",
deployment_id: dep_id,
tenant: &tenant_a,
hint: hint_bc
},
1
)
.await,
Some(r1),
);
assert_eq!(
store
.lookup(
PinKey {
env_id: "local",
deployment_id: dep_id,
tenant: &tenant_ab,
hint: hint_c
},
1
)
.await,
Some(r2),
);
}
}