use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
use parking_lot;
use arrow_array::{Array, RecordBatch};
use hirn_core::MemoryId;
use hirn_core::embed::Embedder;
use hirn_core::prospective::ProspectiveImplication;
use hirn_core::svo_event::SvoEvent;
use hirn_core::types::Namespace;
use hirn_storage::PhysicalStore;
use hirn_storage::store::VectorSearchOptions;
use tracing;
/// Outcome of an RPE computation, as produced by `compute_rpe`.
#[derive(Debug, Clone)]
pub struct RpeResult {
/// RPE score. `compute_rpe` clamps it to `[0.0, 2.0]` and reports `0.0` when the circuit breaker is open.
pub score: f32,
/// Highest similarity seen across the searched datasets (`1.0` when the search was skipped).
pub max_similarity: f32,
/// `true` when `score` is below the caller-supplied threshold, or when the search was skipped.
pub is_fast_path: bool,
}
/// Running statistics consulted by `compute_rpe` (through `z_score` and `update`); alias for `hirn_core::WelfordStats`.
pub type RunningRpeStats = hirn_core::WelfordStats;
/// Composite key made of a realm, a namespace, a model id and a layer.
///
/// The key is hashable, comparable and (de)serializable via serde.
/// NOTE(review): presumably used to keep RPE state separate per partition — the consumers are not visible in this file.
#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct RpePartitionKey {
// Realm name; exposed through `realm()`.
realm: String,
// Namespace component; exposed through `namespace()`.
namespace: Namespace,
// Model identifier; exposed through `model_id()`.
model_id: String,
// Layer component; only readable through the test-only `layer()` accessor.
layer: hirn_core::types::Layer,
}
impl RpePartitionKey {
    /// Builds a key from its four components.
    ///
    /// `realm` and `model_id` accept anything convertible into an owned `String`.
    pub fn new(
        realm: impl Into<String>,
        namespace: Namespace,
        model_id: impl Into<String>,
        layer: hirn_core::types::Layer,
    ) -> Self {
        let realm: String = realm.into();
        let model_id: String = model_id.into();
        Self {
            realm,
            namespace,
            model_id,
            layer,
        }
    }

    /// Returns the realm component.
    pub fn realm(&self) -> &str {
        self.realm.as_str()
    }

    /// Returns a copy of the namespace component.
    pub fn namespace(&self) -> Namespace {
        let namespace = self.namespace;
        namespace
    }

    /// Returns the model id component.
    pub fn model_id(&self) -> &str {
        self.model_id.as_str()
    }

    /// Returns the layer component (compiled for tests only).
    #[cfg(test)]
    pub fn layer(&self) -> hirn_core::types::Layer {
        let layer = self.layer;
        layer
    }
}
/// Half-width of the band around the threshold within which a score counts as "near".
pub const RPE_THRESHOLD_NEAR_EPSILON: f32 = 0.05;

/// Classifies how close `score` is to `threshold`.
///
/// Returns `"near"` when the absolute difference is at most
/// [`RPE_THRESHOLD_NEAR_EPSILON`], and `"far"` otherwise. A NaN difference
/// compares as not-near and therefore yields `"far"`.
pub fn rpe_threshold_band(score: f32, threshold: f32) -> &'static str {
    let gap = (score - threshold).abs();
    let within_band = gap <= RPE_THRESHOLD_NEAR_EPSILON;
    if within_band {
        return "near";
    }
    "far"
}
/// Number of consecutive failures after which the circuit opens.
pub const RPE_CIRCUIT_FAILURE_THRESHOLD: u32 = 5;
/// Default length, in seconds, of the window during which the circuit stays open.
pub const RPE_CIRCUIT_OPEN_SECS: u64 = 30;

/// Lock-free circuit breaker guarding the RPE vector search.
///
/// The breaker counts consecutive failures and, once
/// [`RPE_CIRCUIT_FAILURE_THRESHOLD`] is reached, stores a wall-clock deadline
/// (seconds since the Unix epoch) until which [`RpeCircuitBreaker::is_open`]
/// reports `true`. A deadline of `0` means "closed".
#[derive(Debug, Default)]
pub struct RpeCircuitBreaker {
    consecutive_failures: AtomicU32,
    open_until_unix_secs: AtomicU64,
}

impl RpeCircuitBreaker {
    /// Creates a closed breaker with no recorded failures.
    pub fn new() -> Self {
        Self::default()
    }

    /// Current wall-clock time in whole seconds since the Unix epoch
    /// (`0` if the system clock reports a time before the epoch).
    fn unix_now_secs() -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |elapsed| elapsed.as_secs())
    }

    /// Returns `true` while the stored deadline lies in the future.
    pub fn is_open(&self) -> bool {
        match self.open_until_unix_secs.load(Ordering::Relaxed) {
            // No deadline recorded: closed, and the clock is not consulted.
            0 => false,
            deadline => Self::unix_now_secs() < deadline,
        }
    }

    /// Resets the failure counter and clears any open deadline.
    pub fn record_success(&self) {
        self.consecutive_failures.store(0, Ordering::Relaxed);
        self.open_until_unix_secs.store(0, Ordering::Relaxed);
    }

    /// Records one failure; once the threshold is reached the circuit is
    /// opened for `open_secs` seconds from now.
    ///
    /// The deadline only ever moves forward: a concurrent caller that computed
    /// a later deadline is never overwritten by an earlier one.
    pub fn record_failure(&self, open_secs: u64) {
        let previous = self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
        let failures = previous.saturating_add(1);
        if failures < RPE_CIRCUIT_FAILURE_THRESHOLD {
            return;
        }
        let deadline = Self::unix_now_secs().saturating_add(open_secs);
        // Keep the later of the stored and the freshly computed deadline.
        let _ = self
            .open_until_unix_secs
            .fetch_max(deadline, Ordering::Relaxed);
    }
}
/// Computes the RPE score of `embedding` against the stored memories.
///
/// Each of the `episodic`, `semantic` and `procedural` datasets that reports
/// `exists == Ok(true)` is queried through `vector_search` on the `embedding`
/// column with `limit = search_limit`. Every non-null `_distance` value is
/// mapped to a similarity `1 / (1 + distance)` and the maximum is kept. The
/// novelty `1 - max_similarity` is multiplied by `1 + z`, where `z` is its
/// z-score against `stats` (taken *before* `stats` is updated with the new
/// sample), and the product is clamped to `[0.0, 2.0]`.
///
/// # Parameters
/// - `storage`: store to search.
/// - `embedding`: query vector.
/// - `threshold`: scores strictly below it are flagged as fast path.
/// - `search_limit`: result limit passed to each vector search.
/// - `stats`: running statistics; read for the z-score, then updated.
/// - `circuit_breaker`: skips the search while open; updated with the outcome.
///
/// # Returns
/// The score, the maximum similarity and the fast-path flag. While the circuit
/// is open the search is skipped and `{ score: 0.0, max_similarity: 1.0,
/// is_fast_path: true }` is returned without touching `stats`.
///
/// # Notes
/// - A dataset whose `exists` call fails or returns `false` is skipped silently.
/// - If any vector search fails, one failure is recorded on the breaker;
///   otherwise a success is recorded.
/// - NaN distances are ignored and negative distances are treated as `0.0`, so
///   the similarity always stays within `[0.0, 1.0]`. Without this guard a
///   distance of `-1.0` would produce an infinite similarity and permanently
///   corrupt `stats`.
pub async fn compute_rpe(
    storage: &dyn PhysicalStore,
    embedding: &[f32],
    threshold: f32,
    search_limit: usize,
    stats: &mut RunningRpeStats,
    circuit_breaker: &RpeCircuitBreaker,
) -> RpeResult {
    if circuit_breaker.is_open() {
        tracing::warn!("RPE circuit open — skipping vector search, using fast-path default");
        return RpeResult {
            score: 0.0,
            max_similarity: 1.0,
            is_fast_path: true,
        };
    }

    let datasets = ["episodic", "semantic", "procedural"];
    let mut max_sim: f32 = 0.0;
    let mut any_search_error = false;

    for dataset in &datasets {
        if !matches!(storage.exists(dataset).await, Ok(true)) {
            continue;
        }
        let opts = VectorSearchOptions {
            query: embedding.to_vec(),
            column: "embedding".into(),
            limit: search_limit,
            ..Default::default()
        };
        match storage.vector_search(dataset, opts).await {
            Ok(batches) => {
                for batch in &batches {
                    let Some(dists) = batch.column_by_name("_distance").and_then(|col| {
                        col.as_any().downcast_ref::<arrow_array::Float32Array>()
                    }) else {
                        continue;
                    };
                    for j in 0..dists.len() {
                        if dists.is_null(j) {
                            continue;
                        }
                        let dist = dists.value(j);
                        if dist.is_nan() {
                            continue;
                        }
                        // Clamp at zero so the similarity can never exceed 1.0
                        // (or become infinite at dist == -1.0).
                        let sim = 1.0 / (1.0 + dist.max(0.0));
                        max_sim = max_sim.max(sim);
                    }
                }
            }
            Err(e) => {
                tracing::debug!(dataset, error = %e, "RPE vector search failed");
                any_search_error = true;
            }
        }
    }

    if any_search_error {
        circuit_breaker.record_failure(RPE_CIRCUIT_OPEN_SECS);
    } else {
        circuit_breaker.record_success();
    }

    let distance = 1.0 - max_sim;
    // The z-score is taken before the sample is folded into the statistics.
    let z_score = stats.z_score(f64::from(distance)) as f32;
    stats.update(f64::from(distance));
    let rpe = (distance * (1.0 + z_score)).clamp(0.0, 2.0);

    RpeResult {
        score: rpe,
        max_similarity: max_sim,
        is_fast_path: rpe < threshold,
    }
}
/// Result of `batch_vector_search_max_sim`.
pub struct BatchSearchResult {
/// Maximum similarity per input embedding, in input order; entries start at `0.0`.
pub max_sims: Vec<f32>,
/// `true` if at least one `vector_search_many` call returned an error.
pub had_storage_error: bool,
}
/// Finds, for every embedding in `embeddings`, the maximum similarity to any
/// stored memory.
///
/// Each of the `episodic`, `semantic` and `procedural` datasets that reports
/// `exists == Ok(true)` is queried once through `vector_search_many`, with one
/// query per embedding on the `embedding` column and `limit = search_limit`.
/// Every non-null `_distance` value is mapped to `1 / (1 + distance)` and the
/// per-embedding maximum is kept.
///
/// # Returns
/// `None` when `embeddings` is empty. Otherwise a [`BatchSearchResult`] whose
/// `max_sims` has one entry per embedding (`0.0` if nothing was found) and
/// whose `had_storage_error` is set if any batched search failed.
///
/// # Notes
/// - A dataset whose `exists` call fails or returns `false` is skipped silently.
/// - Result sets are paired with the embeddings positionally. If the store
///   returns more result sets than queries, the surplus is ignored rather than
///   causing an out-of-bounds panic; if it returns fewer, the remaining
///   entries keep their current value.
/// - NaN distances are ignored and negative distances are treated as `0.0`, so
///   every similarity stays within `[0.0, 1.0]`.
pub async fn batch_vector_search_max_sim(
    storage: &dyn PhysicalStore,
    embeddings: &[Vec<f32>],
    search_limit: usize,
) -> Option<BatchSearchResult> {
    if embeddings.is_empty() {
        return None;
    }

    let mut max_sims = vec![0.0_f32; embeddings.len()];
    let mut had_storage_error = false;
    let datasets = ["episodic", "semantic", "procedural"];

    for dataset in &datasets {
        let Ok(true) = storage.exists(dataset).await else {
            continue;
        };
        let queries: Vec<VectorSearchOptions> = embeddings
            .iter()
            .map(|emb| VectorSearchOptions {
                query: emb.clone(),
                column: "embedding".into(),
                limit: search_limit,
                ..Default::default()
            })
            .collect();
        match storage.vector_search_many(dataset, queries).await {
            Ok(results) => {
                // `zip` stops at the shorter side, so a mismatched result
                // count can never index past the end of `max_sims`.
                for (slot, batches) in max_sims.iter_mut().zip(results) {
                    for batch in &batches {
                        let Some(dists) = batch.column_by_name("_distance").and_then(|col| {
                            col.as_any().downcast_ref::<arrow_array::Float32Array>()
                        }) else {
                            continue;
                        };
                        for j in 0..dists.len() {
                            if dists.is_null(j) {
                                continue;
                            }
                            let dist = dists.value(j);
                            if dist.is_nan() {
                                continue;
                            }
                            // Clamp at zero so the similarity never exceeds 1.0.
                            let sim = 1.0 / (1.0 + dist.max(0.0));
                            *slot = slot.max(sim);
                        }
                    }
                }
            }
            Err(e) => {
                tracing::debug!(dataset, error = %e, "RPE batch vector search failed");
                had_storage_error = true;
            }
        }
    }

    Some(BatchSearchResult {
        max_sims,
        had_storage_error,
    })
}
/// Builds a record batch of prospective implications for `content`.
///
/// The first `num_questions` entries of `templates` are instantiated by
/// replacing `{content}` with `content` truncated at a word boundary (limit
/// `80`), embedded in one call bounded by `timeout_secs`, and paired with
/// `source_id` and `namespace`.
///
/// Returns `None` when `content` has fewer than three whitespace-separated
/// words, when no question could be generated, when embedding fails, times
/// out or yields nothing, or when the batch cannot be built. If the embedder
/// returns a different number of vectors than questions, a warning is logged
/// and only the common prefix is used.
pub async fn prepare_prospective_implications_batch(
    embedder: &dyn Embedder,
    source_id: MemoryId,
    content: &str,
    num_questions: usize,
    timeout_secs: u64,
    templates: &[String],
    namespace: &str,
) -> Option<RecordBatch> {
    // A third word must exist, otherwise the content is too short to bother.
    let has_three_words = content.split_whitespace().nth(2).is_some();
    if !has_three_words {
        return None;
    }

    let snippet = hirn_core::text_util::truncate_at_word_boundary(content, 80);
    let questions: Vec<String> = templates
        .iter()
        .take(num_questions)
        .map(|template| template.replace("{content}", &snippet))
        .collect();
    if questions.is_empty() {
        return None;
    }

    let prompts: Vec<&str> = questions.iter().map(String::as_str).collect();
    let deadline = std::time::Duration::from_secs(timeout_secs);
    let embeddings = match tokio::time::timeout(deadline, embedder.embed(&prompts)).await {
        Ok(Ok(embs)) => embs,
        Ok(Err(e)) => {
            tracing::warn!(error = %e, "Prospective embedding failed");
            return None;
        }
        Err(_) => {
            tracing::warn!(timeout_secs, "Prospective embedding timed out");
            return None;
        }
    };

    let expected = questions.len();
    let actual = embeddings.len();
    if actual == 0 {
        return None;
    }
    if actual != expected {
        tracing::warn!(expected, actual, "Prospective embedding count mismatch");
    }

    // Zipping keeps only as many pairs as the shorter of the two sequences.
    let (records, embedding_values): (Vec<ProspectiveImplication>, Vec<Option<Vec<f32>>>) =
        questions
            .into_iter()
            .zip(embeddings)
            .map(|(question, embedding)| {
                (
                    ProspectiveImplication::new(source_id, question),
                    Some(embedding.vector),
                )
            })
            .unzip();

    let embedding_dims = embedding_values
        .iter()
        .flatten()
        .next()
        .map_or(0, Vec::len);
    let namespaces: Vec<&str> = vec![namespace; records.len()];

    hirn_storage::datasets::prospective_implications::to_batch_with_namespaces(
        &records,
        &embedding_values,
        &namespaces,
        embedding_dims,
    )
    .map_err(|e| {
        tracing::warn!(error = %e, "Failed to build prospective batch");
    })
    .ok()
}
/// Prepares prospective implications for `content` and appends them to the
/// prospective-implications dataset.
///
/// Returns the number of rows written, or `0` when nothing was prepared or the
/// append failed (the failure is logged as a warning).
pub async fn store_prospective_implications(
    storage: &dyn PhysicalStore,
    embedder: &dyn Embedder,
    source_id: MemoryId,
    content: &str,
    num_questions: usize,
    timeout_secs: u64,
    templates: &[String],
    namespace: &str,
) -> usize {
    let prepared = prepare_prospective_implications_batch(
        embedder,
        source_id,
        content,
        num_questions,
        timeout_secs,
        templates,
        namespace,
    )
    .await;
    let Some(batch) = prepared else {
        return 0;
    };

    // Capture the row count before the batch is moved into the store.
    let rows = batch.num_rows();
    let outcome = storage
        .append(
            hirn_storage::datasets::prospective_implications::DATASET_NAME,
            batch,
        )
        .await;
    if let Err(e) = outcome {
        tracing::warn!(error = %e, "Failed to write prospective implications");
        return 0;
    }
    rows
}
/// Extracts SVO events from `content` and packs them into a record batch.
///
/// Events come from `extract_svo_regex` with the given `confidence_threshold`;
/// each is attributed to `source_id` and tagged with `namespace`. No
/// embeddings are attached (every embedding slot is `None`), while
/// `embedding_dims` is forwarded to the batch builder.
///
/// Returns `None` when no event was extracted or the batch cannot be built
/// (the latter is logged as a warning).
pub fn prepare_svo_events_batch(
    source_id: MemoryId,
    content: &str,
    confidence_threshold: f32,
    namespace: &str,
    embedding_dims: usize,
) -> Option<RecordBatch> {
    let extracted = hirn_exec::operators::extract_svo_regex(content, confidence_threshold);
    if extracted.is_empty() {
        return None;
    }

    let mut records: Vec<SvoEvent> = Vec::with_capacity(extracted.len());
    for event in extracted {
        records.push(SvoEvent::from_extraction(
            event.subject,
            event.verb,
            event.object,
            event.time_start,
            event.time_end,
            event.confidence,
            vec![source_id],
        ));
    }

    let count = records.len();
    let embeddings = vec![None; count];
    let namespaces: Vec<&str> = vec![namespace; count];

    let built = hirn_storage::datasets::svo_events::to_batch_with_namespaces(
        &records,
        &embeddings,
        &namespaces,
        embedding_dims,
    );
    match built {
        Err(e) => {
            tracing::warn!(error = %e, "Failed to build SVO batch");
            None
        }
        Ok(batch) => Some(batch),
    }
}
/// Extracts SVO events from `content` and appends them to the SVO-events
/// dataset.
///
/// Returns the number of rows written, or `0` when no batch was produced or
/// the append failed (the failure is logged as a warning).
pub async fn extract_and_store_svo_events(
    storage: &dyn PhysicalStore,
    source_id: MemoryId,
    content: &str,
    confidence_threshold: f32,
    namespace: &str,
    embedding_dims: usize,
) -> usize {
    let prepared = prepare_svo_events_batch(
        source_id,
        content,
        confidence_threshold,
        namespace,
        embedding_dims,
    );
    let Some(batch) = prepared else {
        return 0;
    };

    // Capture the row count before the batch is moved into the store.
    let rows = batch.num_rows();
    let outcome = storage
        .append(hirn_storage::datasets::svo_events::DATASET_NAME, batch)
        .await;
    if let Err(e) = outcome {
        tracing::warn!(error = %e, "Failed to write SVO events");
        return 0;
    }
    rows
}
/// Record of a consolidation trigger that has not yet received feedback.
#[derive(Debug, Clone)]
struct PendingConsolidation {
// Per-namespace backlog scores copied at trigger time; on successful feedback with
// progress, at most these amounts are subtracted from the live backlog.
snapshot_scores: std::collections::HashMap<hirn_core::types::Namespace, f32>,
}
/// Why an interference-driven consolidation was triggered.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InterferenceTriggerCause {
    /// The accumulated backlog score reached the configured threshold.
    ThresholdExceeded,
}

impl InterferenceTriggerCause {
    /// Stable snake_case label for this cause.
    pub const fn as_str(self) -> &'static str {
        match self {
            InterferenceTriggerCause::ThresholdExceeded => "threshold_exceeded",
        }
    }
}
/// Why a consolidation trigger was suppressed even though the threshold was reached.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InterferenceSuppressionReason {
    /// The previous trigger happened less than the cooldown period ago.
    CooldownActive,
    /// A previous trigger has not received consolidation feedback yet.
    AwaitingFeedback,
}

impl InterferenceSuppressionReason {
    /// Stable snake_case label for this reason.
    pub const fn as_str(self) -> &'static str {
        match self {
            InterferenceSuppressionReason::AwaitingFeedback => "awaiting_feedback",
            InterferenceSuppressionReason::CooldownActive => "cooldown_active",
        }
    }
}
/// Decision returned by `ShardedInterferenceTracker::accumulate`.
#[derive(Debug)]
pub enum InterferenceAction {
/// The total backlog score is still below the threshold; nothing to do.
None,
/// Consolidation should be started now.
TriggerConsolidation {
/// Namespaces that had backlog at trigger time, sorted by their string form.
namespaces: Vec<hirn_core::types::Namespace>,
/// Total backlog score across all namespaces at trigger time.
backlog_score: f32,
/// Why the trigger fired.
cause: InterferenceTriggerCause,
},
/// The threshold was reached but no new trigger is issued.
Suppressed {
/// Why the trigger was withheld.
reason: InterferenceSuppressionReason,
/// Total backlog score across all namespaces at the time of the decision.
backlog_score: f32,
},
}
/// Feedback passed to `ShardedInterferenceTracker::record_consolidation_feedback`
/// after a triggered consolidation has run.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConsolidationFeedback {
/// Consolidation completed; `progress_made` decides whether the backlog is reduced.
Succeeded { progress_made: bool },
/// Consolidation failed; the backlog is left untouched.
Failed,
}
/// How a piece of consolidation feedback was classified by the tracker.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConsolidationFeedbackOutcome {
    /// Consolidation succeeded and made progress.
    ProgressRecorded,
    /// Consolidation succeeded without making progress.
    NoProgress,
    /// Consolidation failed.
    Failed,
    /// Feedback arrived while no trigger was pending.
    NoPendingTrigger,
}

impl ConsolidationFeedbackOutcome {
    /// Stable snake_case label for this outcome.
    pub const fn as_str(self) -> &'static str {
        match self {
            ConsolidationFeedbackOutcome::NoPendingTrigger => "no_pending_trigger",
            ConsolidationFeedbackOutcome::Failed => "failed",
            ConsolidationFeedbackOutcome::NoProgress => "no_progress",
            ConsolidationFeedbackOutcome::ProgressRecorded => "progress",
        }
    }
}
/// Result of `ShardedInterferenceTracker::record_consolidation_feedback`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct ConsolidationFeedbackResult {
/// How the feedback was classified.
pub outcome: ConsolidationFeedbackOutcome,
/// Backlog score removed by this feedback (`0.0` unless progress was recorded).
pub reduced_score: f32,
/// Total backlog score left after the feedback was applied.
pub remaining_score: f32,
}
/// Point-in-time view of the tracker, returned by `ShardedInterferenceTracker::snapshot`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct InterferenceStateSnapshot {
/// Sum of the backlog scores of all namespaces.
pub backlog_score: f32,
/// Number of namespaces that currently have a backlog entry.
pub namespace_count: usize,
/// `true` while a triggered consolidation has not received feedback.
pub awaiting_feedback: bool,
}
/// Collects `namespaces` into a vector ordered by each namespace's string form.
///
/// The sort is stable, so entries with equal string forms keep their input order.
fn sorted_namespaces(
    namespaces: impl Iterator<Item = hirn_core::types::Namespace>,
) -> Vec<hirn_core::types::Namespace> {
    let mut ordered = namespaces.collect::<Vec<_>>();
    ordered.sort_by(|a, b| {
        let (lhs, rhs) = (a.as_str(), b.as_str());
        lhs.cmp(rhs)
    });
    ordered
}
/// Maps a similarity to an interference score in `[0.0, 1.0]`.
///
/// Similarities up to `0.5` score `0.0`; above that the score rises linearly
/// and reaches `1.0` at a similarity of `1.0`. A NaN input yields NaN.
pub fn interference_score_from_similarity(max_similarity: f32) -> f32 {
    let excess = max_similarity - 0.5;
    let scaled = excess * 2.0;
    scaled.clamp(0.0, 1.0)
}
/// Trigger bookkeeping kept behind the tracker's mutex.
#[derive(Debug, Default)]
struct TriggerState {
// When consolidation was last triggered; compared against the cooldown in `accumulate`.
last_trigger: Option<std::time::Instant>,
// Set when a trigger is issued and taken again when feedback arrives.
pending_consolidation: Option<PendingConsolidation>,
}
/// Tracks accumulated interference scores per namespace and decides when
/// consolidation should be triggered.
pub struct ShardedInterferenceTracker {
// Backlog score per namespace, stored in a concurrent map.
backlog: dashmap::DashMap<hirn_core::types::Namespace, f32>,
// Last-trigger time and pending-feedback state, guarded by a mutex.
trigger: parking_lot::Mutex<TriggerState>,
}
impl Default for ShardedInterferenceTracker {
    /// Creates a tracker with an empty backlog, no previous trigger and no
    /// pending consolidation.
    fn default() -> Self {
        let trigger = parking_lot::Mutex::new(TriggerState::default());
        let backlog = dashmap::DashMap::new();
        Self { backlog, trigger }
    }
}
impl std::fmt::Debug for ShardedInterferenceTracker {
    /// Prints a summary (namespace count and total backlog score) instead of
    /// the full internal state; the output is marked as non-exhaustive.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let namespace_count = self.backlog.len();
        let total_backlog_score = self.total_backlog_score();
        let mut summary = f.debug_struct("ShardedInterferenceTracker");
        summary.field("namespace_count", &namespace_count);
        summary.field("total_backlog_score", &total_backlog_score);
        summary.finish_non_exhaustive()
    }
}
impl ShardedInterferenceTracker {
/// Adds `score` to the backlog of `namespace` and decides whether consolidation should run.
///
/// Negative (and NaN) scores contribute `0.0`. The decision is based on the sum of the
/// backlog over all namespaces:
/// - below `threshold`: `InterferenceAction::None`;
/// - a previous trigger still awaits feedback: `Suppressed { AwaitingFeedback }`;
/// - the last trigger is less than `cooldown_secs` old: `Suppressed { CooldownActive }`;
/// - otherwise: the current per-namespace scores are snapshotted, the trigger time is
///   recorded and `TriggerConsolidation` is returned.
pub fn accumulate(
&self,
score: f32,
namespace: hirn_core::types::Namespace,
threshold: f32,
cooldown_secs: u64,
) -> InterferenceAction {
*self.backlog.entry(namespace).or_default() += score.max(0.0);
let backlog_score = self.total_backlog_score();
if backlog_score < threshold {
return InterferenceAction::None;
}
// The trigger state is only consulted once the threshold has been reached.
let mut trigger = self.trigger.lock();
// The pending check comes first, so `AwaitingFeedback` takes precedence over the cooldown.
if trigger.pending_consolidation.is_some() {
return InterferenceAction::Suppressed {
reason: InterferenceSuppressionReason::AwaitingFeedback,
backlog_score,
};
}
if let Some(last) = trigger.last_trigger {
if last.elapsed().as_secs() < cooldown_secs {
return InterferenceAction::Suppressed {
reason: InterferenceSuppressionReason::CooldownActive,
backlog_score,
};
}
}
// Copy the current per-namespace scores so that later feedback subtracts at most
// what existed at trigger time.
let snapshot_scores: std::collections::HashMap<_, _> =
self.backlog.iter().map(|e| (*e.key(), *e.value())).collect();
let namespaces = sorted_namespaces(snapshot_scores.keys().copied());
trigger.pending_consolidation = Some(PendingConsolidation { snapshot_scores });
trigger.last_trigger = Some(std::time::Instant::now());
tracing::info!(
namespace_count = namespaces.len(),
backlog_score,
"Interference-driven consolidation triggered"
);
InterferenceAction::TriggerConsolidation {
namespaces,
backlog_score,
cause: InterferenceTriggerCause::ThresholdExceeded,
}
}
/// Applies the outcome of a triggered consolidation.
///
/// The pending trigger, if any, is cleared regardless of the outcome; `last_trigger` is
/// not reset, so the cooldown in `accumulate` keeps applying. Only
/// `Succeeded { progress_made: true }` reduces the backlog, by the amounts captured in
/// the trigger-time snapshot. Without a pending trigger the call changes nothing and
/// reports `NoPendingTrigger`.
pub fn record_consolidation_feedback(
&self,
feedback: ConsolidationFeedback,
) -> ConsolidationFeedbackResult {
let mut trigger = self.trigger.lock();
let Some(pending) = trigger.pending_consolidation.take() else {
return ConsolidationFeedbackResult {
outcome: ConsolidationFeedbackOutcome::NoPendingTrigger,
reduced_score: 0.0,
remaining_score: self.total_backlog_score(),
};
};
match feedback {
ConsolidationFeedback::Succeeded { progress_made: true } => {
let reduced_score = self.subtract_snapshot(&pending.snapshot_scores);
ConsolidationFeedbackResult {
outcome: ConsolidationFeedbackOutcome::ProgressRecorded,
reduced_score,
remaining_score: self.total_backlog_score(),
}
}
ConsolidationFeedback::Succeeded {
progress_made: false,
} => ConsolidationFeedbackResult {
outcome: ConsolidationFeedbackOutcome::NoProgress,
reduced_score: 0.0,
remaining_score: self.total_backlog_score(),
},
ConsolidationFeedback::Failed => ConsolidationFeedbackResult {
outcome: ConsolidationFeedbackOutcome::Failed,
reduced_score: 0.0,
remaining_score: self.total_backlog_score(),
},
}
}
/// Returns the sum of the backlog scores of all namespaces.
pub fn total_backlog_score(&self) -> f32 {
self.backlog.iter().map(|e| *e.value()).sum()
}
/// Returns the current total score, namespace count and pending-feedback flag.
pub fn snapshot(&self) -> InterferenceStateSnapshot {
let trigger = self.trigger.lock();
InterferenceStateSnapshot {
backlog_score: self.total_backlog_score(),
namespace_count: self.backlog.len(),
awaiting_feedback: trigger.pending_consolidation.is_some(),
}
}
// Subtracts the trigger-time snapshot from the live backlog and returns the total amount
// removed. Score accumulated after the trigger is preserved because each namespace loses
// at most `min(current, snapshot)`.
fn subtract_snapshot(
&self,
snapshot_scores: &std::collections::HashMap<hirn_core::types::Namespace, f32>,
) -> f32 {
let mut reduced_score = 0.0;
for (&namespace, &snapshot_score) in snapshot_scores {
if let Some(mut entry) = self.backlog.get_mut(&namespace) {
let applied = (*entry).min(snapshot_score);
*entry -= applied;
reduced_score += applied;
}
}
// Drop namespaces whose remaining score is effectively zero.
self.backlog.retain(|_, v| *v > 1e-6);
reduced_score
}
}
// Unit tests for the interference score mapping, the interference tracker and the
// running RPE statistics.
#[cfg(test)]
mod tests {
use super::*;
// Similarities at or below 0.5 map to a score of zero.
#[test]
fn interference_score_low_similarity() {
assert_eq!(interference_score_from_similarity(0.3), 0.0);
assert_eq!(interference_score_from_similarity(0.5), 0.0);
}
// Above 0.5 the score grows linearly and reaches 1.0 at a similarity of 1.0.
#[test]
fn interference_score_high_similarity() {
let score = interference_score_from_similarity(0.8);
assert!((score - 0.6).abs() < 1e-6);
assert_eq!(interference_score_from_similarity(1.0), 1.0);
}
// A backlog below the threshold produces no action.
#[test]
fn tracker_below_threshold_returns_none() {
let ns = hirn_core::types::Namespace::default();
let tracker = ShardedInterferenceTracker::default();
let action = tracker.accumulate(0.1, ns, 0.3, 300);
assert!(matches!(action, InterferenceAction::None));
}
// Two accumulations that together reach the threshold trigger consolidation.
#[test]
fn tracker_triggers_above_threshold() {
let ns = hirn_core::types::Namespace::default();
let tracker = ShardedInterferenceTracker::default();
let _ = tracker.accumulate(0.2, ns, 0.3, 300);
let action = tracker.accumulate(0.2, ns, 0.3, 300);
assert!(matches!(
action,
InterferenceAction::TriggerConsolidation {
backlog_score,
cause: InterferenceTriggerCause::ThresholdExceeded,
..
} if backlog_score >= 0.4
));
}
// While a trigger awaits feedback, further accumulation is suppressed but still counted.
#[test]
fn tracker_suppresses_while_feedback_is_pending() {
let ns = hirn_core::types::Namespace::default();
let tracker = ShardedInterferenceTracker::default();
let _ = tracker.accumulate(0.4, ns, 0.3, 300);
let action = tracker.accumulate(0.4, ns, 0.3, 300);
assert!(matches!(
action,
InterferenceAction::Suppressed {
reason: InterferenceSuppressionReason::AwaitingFeedback,
backlog_score,
} if backlog_score >= 0.8
));
}
// Successful feedback removes only the trigger-time snapshot (0.4); the 0.2 and 0.3
// accumulated afterwards remain.
#[test]
fn tracker_success_feedback_preserves_post_trigger_backlog() {
let ns_a = hirn_core::types::Namespace::default();
let ns_b = hirn_core::types::Namespace::shared();
let tracker = ShardedInterferenceTracker::default();
let _ = tracker.accumulate(0.4, ns_a, 0.3, 300);
let _ = tracker.accumulate(0.2, ns_a, 0.3, 300);
let _ = tracker.accumulate(0.3, ns_b, 0.3, 300);
let feedback = tracker.record_consolidation_feedback(ConsolidationFeedback::Succeeded {
progress_made: true,
});
assert!(
matches!(
feedback,
ConsolidationFeedbackResult {
outcome: ConsolidationFeedbackOutcome::ProgressRecorded,
..
}
),
"Expected progress to be recorded, got {feedback:?}",
);
assert!((feedback.reduced_score - 0.4).abs() < 1e-6);
assert!((feedback.remaining_score - 0.5).abs() < 1e-6);
let snapshot = tracker.snapshot();
assert!((snapshot.backlog_score - 0.5).abs() < 1e-6);
assert_eq!(snapshot.namespace_count, 2);
assert!(!snapshot.awaiting_feedback);
}
// Success without progress leaves the backlog unchanged.
#[test]
fn tracker_no_progress_feedback_keeps_backlog() {
let ns = hirn_core::types::Namespace::default();
let tracker = ShardedInterferenceTracker::default();
let _ = tracker.accumulate(0.4, ns, 0.3, 300);
let feedback = tracker.record_consolidation_feedback(ConsolidationFeedback::Succeeded {
progress_made: false,
});
assert_eq!(feedback.outcome, ConsolidationFeedbackOutcome::NoProgress);
assert_eq!(feedback.reduced_score, 0.0);
assert!((feedback.remaining_score - 0.4).abs() < 1e-6);
}
// Failed consolidation leaves the backlog unchanged.
#[test]
fn tracker_failure_feedback_keeps_backlog() {
let ns = hirn_core::types::Namespace::default();
let tracker = ShardedInterferenceTracker::default();
let _ = tracker.accumulate(0.4, ns, 0.3, 300);
let feedback = tracker.record_consolidation_feedback(ConsolidationFeedback::Failed);
assert_eq!(feedback.outcome, ConsolidationFeedbackOutcome::Failed);
assert_eq!(feedback.reduced_score, 0.0);
assert!((feedback.remaining_score - 0.4).abs() < 1e-6);
}
// After feedback clears the pending trigger, the cooldown still suppresses a new trigger.
#[test]
fn tracker_cooldown_is_checked_after_feedback() {
let ns = hirn_core::types::Namespace::default();
let tracker = ShardedInterferenceTracker::default();
let _ = tracker.accumulate(0.4, ns, 0.3, 300);
let _ = tracker.record_consolidation_feedback(ConsolidationFeedback::Succeeded {
progress_made: true,
});
let action = tracker.accumulate(0.4, ns, 0.3, 300);
assert!(matches!(
action,
InterferenceAction::Suppressed {
reason: InterferenceSuppressionReason::CooldownActive,
backlog_score,
} if backlog_score >= 0.4
));
}
// The trigger lists every namespace that has backlog, not only the one that crossed the threshold.
#[test]
fn tracker_scopes_namespaces() {
let ns_a = hirn_core::types::Namespace::default();
let ns_b = hirn_core::types::Namespace::shared();
let tracker = ShardedInterferenceTracker::default();
let _ = tracker.accumulate(0.1, ns_a, 0.3, 300);
let action = tracker.accumulate(0.3, ns_b, 0.3, 300);
assert!(matches!(
action,
InterferenceAction::TriggerConsolidation { ref namespaces, .. }
if namespaces.len() == 2
&& namespaces.contains(&ns_a)
&& namespaces.contains(&ns_b)
));
}
// With no samples the z-score is zero.
#[test]
fn rpe_stats_empty_returns_zero_zscore() {
let stats = RunningRpeStats::default();
assert_eq!(stats.z_score(0.5), 0.0);
}
// With a single sample the z-score is zero.
#[test]
fn rpe_stats_single_sample_returns_zero_zscore() {
let mut stats = RunningRpeStats::default();
stats.update(0.5);
assert_eq!(stats.z_score(0.5), 0.0);
}
// A distance far above the observed history yields a positive z-score.
#[test]
fn rpe_stats_novel_content_gets_positive_zscore() {
let mut stats = RunningRpeStats::default();
for d in &[0.1, 0.15, 0.12, 0.08, 0.11, 0.09, 0.14, 0.13, 0.10, 0.07] {
stats.update(*d);
}
let z = stats.z_score(0.9);
assert!(
z > 0.0,
"Novel content should get positive z-score, got {z}",
);
}
// A distance far below the observed history yields a negative z-score.
#[test]
fn rpe_stats_familiar_content_gets_negative_zscore() {
let mut stats = RunningRpeStats::default();
for d in &[0.7, 0.8, 0.75, 0.85, 0.9, 0.72, 0.88, 0.79, 0.82, 0.77] {
stats.update(*d);
}
let z = stats.z_score(0.05);
assert!(
z < 0.0,
"Familiar content should get negative z-score, got {z}",
);
}
// Identical samples (zero variance) yield a z-score of zero.
#[test]
fn rpe_stats_identical_samples_return_zero_zscore() {
let mut stats = RunningRpeStats::default();
for _ in 0..5 {
stats.update(0.5);
}
assert_eq!(stats.z_score(0.5), 0.0);
}
}
/// FIFO queue of memory ids awaiting embedding, with a capacity limit applied on `enqueue`.
#[derive(Debug)]
pub struct PendingEmbedQueue {
// Pending entries, oldest at the front.
pending: std::collections::VecDeque<PendingEmbed>,
// Size at which `enqueue` starts dropping the front entry; `Default` sets it to 10_000.
max_capacity: usize,
}
/// One entry of `PendingEmbedQueue`.
#[derive(Debug, Clone)]
pub struct PendingEmbed {
/// Id of the memory awaiting embedding.
pub id: MemoryId,
/// Starts at `0` on enqueue; incremented each time the entry passes through `requeue_failed`.
pub attempts: u32,
/// Time at which the entry was first enqueued.
#[allow(dead_code)] pub enqueued_at: std::time::Instant,
}
impl Default for PendingEmbedQueue {
    /// Creates an empty queue with a capacity limit of 10,000 entries.
    fn default() -> Self {
        const DEFAULT_MAX_CAPACITY: usize = 10_000;
        let pending = std::collections::VecDeque::new();
        Self {
            pending,
            max_capacity: DEFAULT_MAX_CAPACITY,
        }
    }
}
impl PendingEmbedQueue {
    /// Appends `id` with zero attempts, stamped with the current time.
    ///
    /// When the queue already holds `max_capacity` entries, the entry at the
    /// front is dropped first and a warning is logged.
    pub fn enqueue(&mut self, id: MemoryId) {
        let at_capacity = self.pending.len() >= self.max_capacity;
        if at_capacity {
            self.pending.pop_front();
            tracing::warn!(
                max_capacity = self.max_capacity,
                "PendingEmbedQueue capacity reached, dropping oldest entry"
            );
        }
        let entry = PendingEmbed {
            id,
            attempts: 0,
            enqueued_at: std::time::Instant::now(),
        };
        self.pending.push_back(entry);
    }

    /// Removes and returns every pending entry, front to back.
    pub fn drain_all(&mut self) -> Vec<PendingEmbed> {
        let drained: Vec<PendingEmbed> = self.pending.drain(..).collect();
        drained
    }

    /// Puts failed entries back at the end of the queue.
    ///
    /// Each entry's attempt counter is incremented first; entries that have
    /// reached `max_attempts` are dropped with a warning instead.
    ///
    /// NOTE(review): unlike `enqueue`, this path does not apply
    /// `max_capacity`, so the queue can temporarily exceed it — confirm that
    /// this is intended.
    pub fn requeue_failed(&mut self, items: Vec<PendingEmbed>, max_attempts: u32) {
        for mut retry in items {
            retry.attempts += 1;
            if retry.attempts >= max_attempts {
                tracing::warn!(
                    id = %retry.id,
                    attempts = retry.attempts,
                    "Dropping pending embed after max retry attempts"
                );
                continue;
            }
            self.pending.push_back(retry);
        }
    }

    /// Number of entries currently pending.
    pub fn len(&self) -> usize {
        self.pending.len()
    }

    /// Returns `true` when nothing is pending.
    #[allow(dead_code)]
    pub fn is_empty(&self) -> bool {
        self.pending.is_empty()
    }
}