use sqlx::SqlitePool;
use crate::context::embedding::active_embedding_profile;
use super::schema::read_meta;
/// Age limit, in milliseconds (10 minutes), within which an `EmbeddingFallback`
/// activity event is still treated as "recent" by the fallback and outage
/// checks in this module.
const RECENT_EMBEDDING_FALLBACK_WINDOW_MS: i64 = 10 * 60 * 1000;
/// Report on how the active embedding profile relates to the profile the
/// index was built with, as produced by `gather_embedding_diagnostics`.
///
/// Field names are serialized in camelCase.
#[derive(Debug, Clone, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct EmbeddingDiagnostics {
    /// Profile string returned by `active_embedding_profile()`.
    pub active_profile: String,
    /// The index's `embedding_profile` meta value; `None` when it is absent
    /// or the meta read failed.
    pub index_profile: Option<String>,
    /// `true` only when `index_profile` is present and equals `active_profile`.
    pub profile_match: bool,
    /// `true` when an index exists but retrieval against it is weakened or
    /// unusable. An index that has not been built is reported as not degraded.
    pub degraded: bool,
    /// Reason code: "index_not_built", "provider_fallback",
    /// "dimension_mismatch" or "profile_mismatch"; `None` on an exact match.
    pub degraded_reason: Option<String>,
    /// Whether the vector (cosine) retrieval lane is considered usable.
    pub vector_lane_available: bool,
}
/// Extracts the embedding dimension from the last `:`-separated segment of a
/// profile string, e.g. `"cloud:text-embedding-3-small:1536"` -> `Some(1536)`.
///
/// A string containing no `:` is parsed as a whole. Returns `None` when the
/// selected segment is not a valid `u32`.
fn profile_dim(profile: &str) -> Option<u32> {
    let last_segment = match profile.rsplit_once(':') {
        Some((_, tail)) => tail,
        None => profile,
    };
    last_segment.parse::<u32>().ok()
}
/// Decides whether the first embedding-related event in `events` is a
/// fallback no older than `RECENT_EMBEDDING_FALLBACK_WINDOW_MS` at `now_ms`.
///
/// Only `EmbeddingFallback` and `RetrievalEmbedding` payloads are considered;
/// everything else is skipped. Reaching a `RetrievalEmbedding` first yields
/// `false`, so a successful retrieval ahead of a fallback masks it. Returns
/// `false` if no embedding-related event exists.
/// NOTE(review): the tests pass newest events first; presumably `tail`
/// returns that order too — confirm.
fn recent_embedding_fallback_from_events(
    events: &[crate::activity_stream::ActivityEvent],
    now_ms: i64,
) -> bool {
    use crate::activity_stream::ActivityPayload;
    for event in events {
        match event.payload {
            ActivityPayload::EmbeddingFallback { .. } => {
                let age_ms = now_ms.saturating_sub(event.ts_ms);
                return age_ms <= RECENT_EMBEDDING_FALLBACK_WINDOW_MS;
            }
            ActivityPayload::RetrievalEmbedding { .. } => return false,
            _ => {}
        }
    }
    false
}
/// Applies the latest-embedding-event fallback check to the 16 most recent
/// activity events, using the current wall-clock time in Unix milliseconds.
///
/// A clock reading before the Unix epoch is treated as `0`; a value too large
/// for `i64` saturates to `i64::MAX`.
fn recent_embedding_fallback() -> bool {
    use crate::activity_stream::tail;
    use std::time::{SystemTime, UNIX_EPOCH};
    let now_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX),
        Err(_) => 0,
    };
    let recent_events = tail(16);
    recent_embedding_fallback_from_events(&recent_events, now_ms)
}
/// Reports whether *any* `EmbeddingFallback` in `events` is no older than
/// `RECENT_EMBEDDING_FALLBACK_WINDOW_MS` at `now_ms`.
///
/// Unlike `recent_embedding_fallback_from_events`, successful retrieval
/// events do not mask an in-window fallback.
fn recent_embedding_fallback_strict_from_events(
    events: &[crate::activity_stream::ActivityEvent],
    now_ms: i64,
) -> bool {
    use crate::activity_stream::ActivityPayload;
    events
        .iter()
        .filter(|event| matches!(event.payload, ActivityPayload::EmbeddingFallback { .. }))
        .any(|fallback| {
            now_ms.saturating_sub(fallback.ts_ms) <= RECENT_EMBEDDING_FALLBACK_WINDOW_MS
        })
}
/// Applies the strict any-fallback check to the whole activity tail
/// (`MAX_EVENTS` entries), using the current wall-clock time in Unix
/// milliseconds.
///
/// A clock reading before the Unix epoch is treated as `0`; a value too large
/// for `i64` saturates to `i64::MAX`.
fn recent_embedding_fallback_strict() -> bool {
    use crate::activity_stream::{MAX_EVENTS, tail};
    use std::time::{SystemTime, UNIX_EPOCH};
    let now_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX),
        Err(_) => 0,
    };
    let all_events = tail(MAX_EVENTS);
    recent_embedding_fallback_strict_from_events(&all_events, now_ms)
}
/// Returns `true` for fallback reason codes that the outage check treats as
/// persistent: a single in-window occurrence trips it immediately, whereas
/// other (transient) reasons must accumulate. Matching is exact and
/// case-sensitive.
fn is_persistent_failure_reason(reason: &str) -> bool {
    const PERSISTENT_REASONS: [&str; 3] = ["cap", "forbidden", "unauthorized"];
    PERSISTENT_REASONS.contains(&reason)
}
/// Minimum number of in-window transient (non-persistent) embedding fallbacks
/// at which `cloud_embed_outage_active_from_events` reports an outage.
const SUSTAINED_TRANSIENT_FALLBACK_THRESHOLD: usize = 5;
/// Decides whether recent fallback activity indicates a cloud-embedding outage.
///
/// Only `EmbeddingFallback` events no older than
/// `RECENT_EMBEDDING_FALLBACK_WINDOW_MS` at `now_ms` are considered. A single
/// persistent reason (see `is_persistent_failure_reason`) trips the check
/// immediately. Otherwise at least `SUSTAINED_TRANSIENT_FALLBACK_THRESHOLD`
/// in-window fallbacks are required.
fn cloud_embed_outage_active_from_events(
    events: &[crate::activity_stream::ActivityEvent],
    now_ms: i64,
) -> bool {
    use crate::activity_stream::ActivityPayload;
    let in_window_reasons = events.iter().filter_map(|event| match &event.payload {
        ActivityPayload::EmbeddingFallback { reason }
            if now_ms.saturating_sub(event.ts_ms) <= RECENT_EMBEDDING_FALLBACK_WINDOW_MS =>
        {
            Some(reason)
        }
        _ => None,
    });
    let mut transient_count = 0usize;
    for reason in in_window_reasons {
        if is_persistent_failure_reason(reason) {
            return true;
        }
        transient_count += 1;
    }
    transient_count >= SUSTAINED_TRANSIENT_FALLBACK_THRESHOLD
}
/// Reports whether the cloud embedding provider currently appears to be in an
/// outage, based on `EmbeddingFallback` activity in the full activity tail.
///
/// The verdict from `cloud_embed_outage_active_from_events` is cached
/// process-wide for `CACHE_TTL_MS` so that frequent callers do not rescan the
/// tail on every call.
pub(crate) fn cloud_embed_outage_active() -> bool {
    use crate::activity_stream::{MAX_EVENTS, tail};
    use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};
    static CACHED_AT_MS: AtomicI64 = AtomicI64::new(0);
    static CACHED_VALUE: AtomicBool = AtomicBool::new(false);
    const CACHE_TTL_MS: i64 = 3_000;
    let now_ms = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |d| i64::try_from(d.as_millis()).unwrap_or(i64::MAX));
    // Acquire pairs with the Release store below: once a reader observes a
    // cache timestamp, it also observes the value stored before that timestamp.
    let cached_at = CACHED_AT_MS.load(Ordering::Acquire);
    let cache_age_ms = now_ms.saturating_sub(cached_at);
    // `0` means the cache has never been filled. A negative age means the wall
    // clock stepped backwards after the entry was written. Treat that as stale;
    // otherwise the old verdict would be served until the clock catches up.
    if cached_at != 0 && (0..CACHE_TTL_MS).contains(&cache_age_ms) {
        return CACHED_VALUE.load(Ordering::Relaxed);
    }
    let active = cloud_embed_outage_active_from_events(&tail(MAX_EVENTS), now_ms);
    CACHED_VALUE.store(active, Ordering::Relaxed);
    // Publish the timestamp last so it covers the value written just above.
    CACHED_AT_MS.store(now_ms, Ordering::Release);
    active
}
/// Compares the active embedding profile with the profile recorded in the
/// index's `embedding_profile` meta entry and classifies the result.
///
/// - no recorded profile (or an unreadable one): `"index_not_built"`, not
///   degraded, vector lane unavailable;
/// - exact match: healthy, vector lane available;
/// - `sha1:` active against a `cloud:`/`byok:` index: `"provider_fallback"`,
///   vector lane unavailable;
/// - both dimensions parse and differ: `"dimension_mismatch"`, vector lane
///   unavailable;
/// - anything else: `"profile_mismatch"`, vector lane still available.
pub async fn gather_embedding_diagnostics(index_pool: &SqlitePool) -> EmbeddingDiagnostics {
    let active_profile = active_embedding_profile().await;
    let stored_profile = read_meta(index_pool, "embedding_profile")
        .await
        .ok()
        .flatten();

    let Some(index_profile) = stored_profile else {
        return EmbeddingDiagnostics {
            active_profile,
            index_profile: None,
            profile_match: false,
            degraded: false,
            degraded_reason: Some("index_not_built".to_owned()),
            vector_lane_available: false,
        };
    };

    if index_profile == active_profile {
        return EmbeddingDiagnostics {
            active_profile,
            index_profile: Some(index_profile),
            profile_match: true,
            degraded: false,
            degraded_reason: None,
            vector_lane_available: true,
        };
    }

    let semantic_index =
        index_profile.starts_with("cloud:") || index_profile.starts_with("byok:");
    // Every remaining case is a degraded mismatch; only the reason and the
    // usability of the vector lane differ.
    let (reason, vector_lane_available) = if active_profile.starts_with("sha1:") && semantic_index
    {
        ("provider_fallback", false)
    } else if matches!(
        (profile_dim(&active_profile), profile_dim(&index_profile)),
        (Some(active_dim), Some(index_dim)) if active_dim != index_dim
    ) {
        ("dimension_mismatch", false)
    } else {
        ("profile_mismatch", true)
    };

    EmbeddingDiagnostics {
        active_profile,
        index_profile: Some(index_profile),
        profile_match: false,
        degraded: true,
        degraded_reason: Some(reason.to_owned()),
        vector_lane_available,
    }
}
pub async fn gather_embedding_diagnostics_with_activity(
index_pool: &SqlitePool,
) -> EmbeddingDiagnostics {
let mut diag = gather_embedding_diagnostics(index_pool).await;
if !diag.degraded && diag.index_profile.is_some() && recent_embedding_fallback() {
diag.degraded = true;
diag.degraded_reason = Some("provider_fallback".to_owned());
diag.vector_lane_available = false;
}
diag
}
pub fn embedding_provider_recently_down() -> bool {
recent_embedding_fallback_strict()
}
/// Chooses the embedding profile that freshness checks should expect.
///
/// Returns the persisted profile only when all of the following hold:
/// - the provider has recently been down;
/// - the active profile is not `sha1:`;
/// - the persisted profile is `sha1:`.
///
/// In every other case the active profile is returned.
fn freshness_expected_profile(
    active_profile: &str,
    persisted_profile: Option<&str>,
    provider_recently_down: bool,
) -> String {
    let remote_active = !active_profile.starts_with("sha1:");
    match persisted_profile {
        Some(persisted)
            if provider_recently_down && remote_active && persisted.starts_with("sha1:") =>
        {
            persisted.to_owned()
        }
        _ => active_profile.to_owned(),
    }
}
/// Returns the embedding profile that freshness checks should expect for
/// this index.
///
/// This is normally `active_profile`. When the active profile is not `sha1:`
/// and the strict recent-fallback check fires, the index's persisted
/// `embedding_profile` meta value is read and passed to
/// `freshness_expected_profile`. A failed meta read counts as "no persisted
/// profile".
pub async fn effective_embedding_profile_for_freshness(
    index_pool: &SqlitePool,
    active_profile: &str,
) -> String {
    let may_relax =
        !active_profile.starts_with("sha1:") && embedding_provider_recently_down();
    if !may_relax {
        return active_profile.to_owned();
    }
    let persisted = read_meta(index_pool, "embedding_profile")
        .await
        .unwrap_or_default();
    freshness_expected_profile(active_profile, persisted.as_deref(), true)
}
// Unit tests. The pure helpers are driven with synthetic `ActivityEvent`s; the
// async diagnostics tests write meta into a temporary index database.
#[cfg(test)]
mod tests {
    use crate::activity_stream::{ActivityEvent, ActivityPayload};
    use super::super::schema::{open_pool_at, write_meta};
    use super::*;
    use tempfile::TempDir;

    /// Opens a fresh index pool backed by a database file inside `tmp`.
    async fn fresh_pool(tmp: &TempDir) -> SqlitePool {
        let path = tmp.path().join("diag-idx.db");
        open_pool_at(&path).await.expect("open_pool_at")
    }

    #[test]
    fn profile_dim_parses_trailing_segment() {
        assert_eq!(profile_dim("cloud:text-embedding-3-small:1536"), Some(1536));
        assert_eq!(profile_dim("byok:api.host.com:my-model:768"), Some(768));
        assert_eq!(profile_dim("sha1:local:128"), Some(128));
        assert_eq!(profile_dim("garbage-no-colon"), None);
        assert_eq!(profile_dim("cloud:model:not-a-number"), None);
    }

    #[test]
    fn recent_embedding_fallback_uses_latest_fresh_embedding_event() {
        let now = 1_000_000;
        let stale_fallback = ActivityEvent {
            ts_ms: now - RECENT_EMBEDDING_FALLBACK_WINDOW_MS - 1,
            payload: ActivityPayload::EmbeddingFallback {
                reason: "network".to_owned(),
            },
        };
        let fresh_fallback = ActivityEvent {
            ts_ms: now - 1_000,
            payload: ActivityPayload::EmbeddingFallback {
                reason: "network".to_owned(),
            },
        };
        let fresh_success = ActivityEvent {
            ts_ms: now,
            payload: ActivityPayload::RetrievalEmbedding {
                hits: 3,
                took_ms: 12,
            },
        };
        assert!(recent_embedding_fallback_from_events(
            std::slice::from_ref(&fresh_fallback),
            now
        ));
        assert!(!recent_embedding_fallback_from_events(
            std::slice::from_ref(&stale_fallback),
            now
        ));
        assert!(!recent_embedding_fallback_from_events(
            &[fresh_success, fresh_fallback],
            now
        ));
    }

    /// Builds an `EmbeddingFallback` event at `ts_ms` with the given reason.
    fn fallback(ts_ms: i64, reason: &str) -> ActivityEvent {
        ActivityEvent {
            ts_ms,
            payload: ActivityPayload::EmbeddingFallback {
                reason: reason.to_owned(),
            },
        }
    }

    #[test]
    fn persistent_failure_reasons_are_exact_matches() {
        assert!(is_persistent_failure_reason("cap"));
        assert!(is_persistent_failure_reason("forbidden"));
        assert!(is_persistent_failure_reason("unauthorized"));
        assert!(!is_persistent_failure_reason("timeout"));
        assert!(!is_persistent_failure_reason("network"));
        assert!(!is_persistent_failure_reason("CAP"));
        assert!(!is_persistent_failure_reason(""));
    }

    #[test]
    fn cloud_outage_persistent_failure_trips_immediately() {
        let now = 1_000_000;
        assert!(cloud_embed_outage_active_from_events(
            &[fallback(now - 1_000, "cap")],
            now
        ));
        assert!(cloud_embed_outage_active_from_events(
            &[fallback(now - 1_000, "unauthorized")],
            now
        ));
        assert!(cloud_embed_outage_active_from_events(
            &[fallback(now - 1_000, "forbidden")],
            now
        ));
    }

    #[test]
    fn cloud_outage_ignores_stale_persistent_failure() {
        let now = 1_000_000;
        assert!(!cloud_embed_outage_active_from_events(
            &[fallback(now - RECENT_EMBEDDING_FALLBACK_WINDOW_MS - 1, "cap")],
            now
        ));
    }

    #[test]
    fn cloud_outage_single_transient_blip_does_not_trip() {
        let now = 1_000_000;
        let few: Vec<_> = (0..SUSTAINED_TRANSIENT_FALLBACK_THRESHOLD - 1)
            .map(|i| fallback(now - 1_000 - i as i64, "timeout"))
            .collect();
        assert!(!cloud_embed_outage_active_from_events(&few, now));
    }

    #[test]
    fn cloud_outage_sustained_transient_run_trips() {
        let now = 1_000_000;
        let many: Vec<_> = (0..SUSTAINED_TRANSIENT_FALLBACK_THRESHOLD)
            .map(|i| fallback(now - 1_000 - i as i64, "timeout"))
            .collect();
        assert!(cloud_embed_outage_active_from_events(&many, now));
    }

    #[test]
    fn cloud_outage_ignores_events_outside_window() {
        let now = 1_000_000;
        let stale: Vec<_> = (0..SUSTAINED_TRANSIENT_FALLBACK_THRESHOLD + 3)
            .map(|i| {
                fallback(
                    now - RECENT_EMBEDDING_FALLBACK_WINDOW_MS - 1 - i as i64,
                    "timeout",
                )
            })
            .collect();
        assert!(!cloud_embed_outage_active_from_events(&stale, now));
    }

    #[test]
    fn cloud_outage_counts_only_in_window_transients() {
        let now: i64 = 1_000_000;
        let fresh = |i: i64| fallback(now - 1_000 - i, "timeout");
        let stale = |i: i64| fallback(now - RECENT_EMBEDDING_FALLBACK_WINDOW_MS - 1 - i, "timeout");
        let below_threshold = i64::try_from(SUSTAINED_TRANSIENT_FALLBACK_THRESHOLD - 1)
            .expect("threshold fits in i64");
        // One short of the threshold in-window, plus stale noise: no outage.
        let mut events: Vec<_> = (0..below_threshold).map(fresh).collect();
        events.extend((0..3).map(stale));
        assert!(!cloud_embed_outage_active_from_events(&events, now));
        // One more in-window transient reaches the threshold.
        events.push(fresh(below_threshold));
        assert!(cloud_embed_outage_active_from_events(&events, now));
    }

    #[test]
    fn strict_fallback_is_not_masked_by_trailing_retrieval() {
        let now = 1_000_000;
        let fresh_fallback = ActivityEvent {
            ts_ms: now - 1_000,
            payload: ActivityPayload::EmbeddingFallback {
                reason: "network".to_owned(),
            },
        };
        let newer_retrieval = ActivityEvent {
            ts_ms: now,
            payload: ActivityPayload::RetrievalEmbedding {
                hits: 3,
                took_ms: 12,
            },
        };
        let events = [newer_retrieval, fresh_fallback];
        assert!(
            !recent_embedding_fallback_from_events(&events, now),
            "latest-event check is masked by the trailing retrieval (documents the bug)"
        );
        assert!(
            recent_embedding_fallback_strict_from_events(&events, now),
            "strict check must still see the fresh fallback"
        );
        let stale_fallback = ActivityEvent {
            ts_ms: now - RECENT_EMBEDDING_FALLBACK_WINDOW_MS - 1,
            payload: ActivityPayload::EmbeddingFallback {
                reason: "network".to_owned(),
            },
        };
        assert!(!recent_embedding_fallback_strict_from_events(
            std::slice::from_ref(&stale_fallback),
            now
        ));
    }

    #[test]
    fn freshness_expected_profile_relaxes_only_for_remote_active_over_sha1_index() {
        let cloud = "cloud:text-embedding-3-small:1536";
        let byok = "byok:host:m:768";
        let sha1 = "sha1:local:128";
        assert_eq!(freshness_expected_profile(cloud, Some(sha1), true), sha1);
        assert_eq!(freshness_expected_profile(byok, Some(sha1), true), sha1);
        assert_eq!(freshness_expected_profile(cloud, Some(sha1), false), cloud);
        assert_eq!(freshness_expected_profile(sha1, Some(sha1), true), sha1);
        assert_eq!(freshness_expected_profile(cloud, Some(cloud), true), cloud);
        assert_eq!(freshness_expected_profile(cloud, None, true), cloud);
    }

    #[tokio::test]
    async fn persisted_sha1_meta_feeds_freshness_relaxation() {
        let tmp = TempDir::new().unwrap();
        let pool = fresh_pool(&tmp).await;
        write_meta(&pool, "embedding_profile", "sha1:local:128")
            .await
            .unwrap();
        let persisted = read_meta(&pool, "embedding_profile")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(
            freshness_expected_profile("cloud:m:1536", Some(&persisted), true),
            "sha1:local:128"
        );
    }

    #[tokio::test]
    async fn equal_profiles_are_not_degraded() {
        let tmp = TempDir::new().unwrap();
        let pool = fresh_pool(&tmp).await;
        let active = active_embedding_profile().await;
        write_meta(&pool, "embedding_profile", &active)
            .await
            .unwrap();
        let d = gather_embedding_diagnostics(&pool).await;
        assert!(d.profile_match, "identical profiles must match");
        assert!(!d.degraded, "matched lane must not be degraded");
        assert_eq!(d.degraded_reason, None);
        assert!(d.vector_lane_available);
        assert_eq!(d.index_profile.as_deref(), Some(active.as_str()));
    }

    // The next three tests depend on whatever `active_embedding_profile()`
    // returns in the test environment; when their precondition does not hold
    // they pass without asserting anything.
    #[tokio::test]
    async fn sha1_active_vs_cloud_index_is_provider_fallback() {
        let tmp = TempDir::new().unwrap();
        let pool = fresh_pool(&tmp).await;
        write_meta(
            &pool,
            "embedding_profile",
            "cloud:text-embedding-3-small:1536",
        )
        .await
        .unwrap();
        let d = gather_embedding_diagnostics(&pool).await;
        if d.active_profile.starts_with("sha1:") {
            assert!(d.degraded, "semantic corpus + SHA1 active is degraded");
            assert_eq!(d.degraded_reason.as_deref(), Some("provider_fallback"));
            assert!(
                !d.vector_lane_available,
                "lexical query vs semantic corpus = dead lane"
            );
            assert!(!d.profile_match);
        }
    }

    #[tokio::test]
    async fn cloud_dim_mismatch_is_dimension_mismatch() {
        let tmp = TempDir::new().unwrap();
        let pool = fresh_pool(&tmp).await;
        write_meta(&pool, "embedding_profile", "cloud:model-x:768")
            .await
            .unwrap();
        let d = gather_embedding_diagnostics(&pool).await;
        if d.active_profile.starts_with("cloud:") && profile_dim(&d.active_profile) == Some(1536) {
            assert!(d.degraded);
            assert_eq!(d.degraded_reason.as_deref(), Some("dimension_mismatch"));
            assert!(!d.vector_lane_available);
            assert!(!d.profile_match);
        }
    }

    #[tokio::test]
    async fn same_dim_different_model_is_profile_mismatch() {
        let tmp = TempDir::new().unwrap();
        let pool = fresh_pool(&tmp).await;
        write_meta(&pool, "embedding_profile", "cloud:model-a:1536")
            .await
            .unwrap();
        let d = gather_embedding_diagnostics(&pool).await;
        if d.active_profile.starts_with("cloud:")
            && profile_dim(&d.active_profile) == Some(1536)
            && d.active_profile != "cloud:model-a:1536"
        {
            assert!(d.degraded, "cross-model corpus is degraded");
            assert_eq!(d.degraded_reason.as_deref(), Some("profile_mismatch"));
            assert!(
                d.vector_lane_available,
                "same dim → cosine still runs, just weaker"
            );
            assert!(!d.profile_match);
        }
    }

    #[tokio::test]
    async fn missing_index_profile_is_index_not_built() {
        let tmp = TempDir::new().unwrap();
        let pool = fresh_pool(&tmp).await;
        let d = gather_embedding_diagnostics(&pool).await;
        assert_eq!(d.index_profile, None);
        assert!(!d.profile_match, "no corpus profile cannot match");
        assert!(!d.degraded, "unbuilt index is not a regression");
        assert_eq!(d.degraded_reason.as_deref(), Some("index_not_built"));
        assert!(!d.vector_lane_available);
    }
}