use chrono::{DateTime, Utc};
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::sync::Arc;
use crate::{
application::services::projection::Projection, domain::entities::Event, error::Result,
prime::types::event_types,
};
pub const MEMORY_ACCESSED: &str = "prime.memory.accessed";
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RelevanceScore {
pub base_score: f64,
pub last_accessed: DateTime<Utc>,
pub access_count: u64,
pub decay_rate: f64,
}
impl RelevanceScore {
fn new(timestamp: DateTime<Utc>, decay_rate: f64) -> Self {
Self {
base_score: 1.0,
last_accessed: timestamp,
access_count: 1,
decay_rate,
}
}
pub fn current_score(&self, now: DateTime<Utc>) -> f64 {
let hours = (now - self.last_accessed).num_milliseconds().max(0) as f64 / 3_600_000.0;
self.base_score * (-self.decay_rate * hours).exp()
}
fn touch(&mut self, timestamp: DateTime<Utc>) {
self.base_score += 1.0;
self.access_count += 1;
self.last_accessed = timestamp;
}
}
pub struct RelevanceDecayProjection {
name: String,
default_decay_rate: f64,
scores: Arc<DashMap<String, RelevanceScore>>,
}
impl RelevanceDecayProjection {
pub fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
default_decay_rate: 0.01,
scores: Arc::new(DashMap::new()),
}
}
pub fn with_decay_rate(mut self, rate: f64) -> Self {
self.default_decay_rate = rate;
self
}
pub fn relevance(&self, entity_id: &str) -> f64 {
self.relevance_at(entity_id, Utc::now())
}
pub fn relevance_at(&self, entity_id: &str, now: DateTime<Utc>) -> f64 {
self.scores
.get(entity_id)
.map_or(0.0, |s| s.current_score(now))
}
pub fn raw_score(&self, entity_id: &str) -> Option<RelevanceScore> {
self.scores.get(entity_id).map(|s| s.value().clone())
}
}
impl Projection for RelevanceDecayProjection {
fn name(&self) -> &str {
&self.name
}
fn process(&self, event: &Event) -> Result<()> {
let event_type = event.event_type_str();
if !event_type.starts_with("prime.") {
return Ok(());
}
let entity_id = event.entity_id_str().to_string();
let timestamp = event.timestamp;
match event_type {
event_types::NODE_CREATED | event_types::EDGE_CREATED => {
self.scores.insert(
entity_id,
RelevanceScore::new(timestamp, self.default_decay_rate),
);
}
event_types::NODE_UPDATED | MEMORY_ACCESSED => {
if let Some(mut entry) = self.scores.get_mut(&entity_id) {
entry.touch(timestamp);
}
}
event_types::NODE_DELETED | event_types::EDGE_DELETED => {
}
_ => {}
}
Ok(())
}
fn get_state(&self, entity_id: &str) -> Option<Value> {
self.scores
.get(entity_id)
.and_then(|s| serde_json::to_value(s.value()).ok())
}
fn clear(&self) {
self.scores.clear();
}
fn snapshot(&self) -> Option<Value> {
let entries: Vec<(String, RelevanceScore)> = self
.scores
.iter()
.map(|e| (e.key().clone(), e.value().clone()))
.collect();
serde_json::to_value(entries).ok()
}
fn restore(&self, snapshot: &Value) -> Result<()> {
let entries: Vec<(String, RelevanceScore)> = serde_json::from_value(snapshot.clone())
.map_err(|e| crate::error::AllSourceError::StorageError(e.to_string()))?;
self.scores.clear();
for (k, v) in entries {
self.scores.insert(k, v);
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Duration;
use uuid::Uuid;
fn make_event(entity_id: &str, event_type: &str, payload: Value) -> Event {
Event::reconstruct_from_strings(
Uuid::new_v4(),
event_type.to_string(),
entity_id.to_string(),
"default".to_string(),
payload,
Utc::now(),
None,
1,
)
}
fn make_event_at(
entity_id: &str,
event_type: &str,
payload: Value,
timestamp: DateTime<Utc>,
) -> Event {
Event::reconstruct_from_strings(
Uuid::new_v4(),
event_type.to_string(),
entity_id.to_string(),
"default".to_string(),
payload,
timestamp,
None,
1,
)
}
#[test]
fn test_node_created_sets_initial_score() {
let proj = RelevanceDecayProjection::new("relevance");
proj.process(&make_event(
"node:person:alice",
event_types::NODE_CREATED,
serde_json::json!({"node_type": "person"}),
))
.unwrap();
let score = proj.relevance("node:person:alice");
assert!(score > 0.9); }
#[test]
fn test_score_decays_over_time() {
let proj = RelevanceDecayProjection::new("relevance").with_decay_rate(0.1);
let t0 = Utc::now() - Duration::hours(10);
proj.process(&make_event_at(
"node:person:alice",
event_types::NODE_CREATED,
serde_json::json!({"node_type": "person"}),
t0,
))
.unwrap();
let score = proj.relevance("node:person:alice");
assert!(score < 0.5, "score should have decayed, got {score}");
assert!(score > 0.3, "score too low, got {score}");
}
#[test]
fn test_access_boosts_score() {
let proj = RelevanceDecayProjection::new("relevance").with_decay_rate(0.1);
let t0 = Utc::now() - Duration::hours(10);
proj.process(&make_event_at(
"node:person:alice",
event_types::NODE_CREATED,
serde_json::json!({"node_type": "person"}),
t0,
))
.unwrap();
let score_before = proj.relevance("node:person:alice");
proj.process(&make_event(
"node:person:alice",
MEMORY_ACCESSED,
serde_json::json!({}),
))
.unwrap();
let score_after = proj.relevance("node:person:alice");
assert!(
score_after > score_before,
"access should boost score: before={score_before}, after={score_after}"
);
}
#[test]
fn test_update_boosts_score() {
let proj = RelevanceDecayProjection::new("relevance");
proj.process(&make_event(
"node:person:alice",
event_types::NODE_CREATED,
serde_json::json!({"node_type": "person"}),
))
.unwrap();
let raw_before = proj.raw_score("node:person:alice").unwrap();
assert_eq!(raw_before.access_count, 1);
proj.process(&make_event(
"node:person:alice",
event_types::NODE_UPDATED,
serde_json::json!({"properties": {"role": "manager"}}),
))
.unwrap();
let raw_after = proj.raw_score("node:person:alice").unwrap();
assert_eq!(raw_after.access_count, 2);
assert!(raw_after.base_score > raw_before.base_score);
}
#[test]
fn test_nonexistent_entity_returns_zero() {
let proj = RelevanceDecayProjection::new("relevance");
assert_eq!(proj.relevance("ghost"), 0.0);
}
#[test]
fn test_ignores_non_prime_events() {
let proj = RelevanceDecayProjection::new("relevance");
proj.process(&make_event(
"user-123",
"user.created",
serde_json::json!({"name": "Alice"}),
))
.unwrap();
assert_eq!(proj.relevance("user-123"), 0.0);
}
#[test]
fn test_snapshot_and_restore() {
let proj = RelevanceDecayProjection::new("relevance");
proj.process(&make_event(
"node:person:alice",
event_types::NODE_CREATED,
serde_json::json!({"node_type": "person"}),
))
.unwrap();
let snap = proj.snapshot().expect("snapshot should be Some");
proj.clear();
assert_eq!(proj.relevance("node:person:alice"), 0.0);
proj.restore(&snap).unwrap();
assert!(proj.relevance("node:person:alice") > 0.0);
}
#[test]
fn test_custom_decay_rate() {
let fast_decay = RelevanceDecayProjection::new("fast").with_decay_rate(1.0);
let slow_decay = RelevanceDecayProjection::new("slow").with_decay_rate(0.001);
let t0 = Utc::now() - Duration::hours(5);
fast_decay
.process(&make_event_at(
"node:a",
event_types::NODE_CREATED,
serde_json::json!({}),
t0,
))
.unwrap();
slow_decay
.process(&make_event_at(
"node:a",
event_types::NODE_CREATED,
serde_json::json!({}),
t0,
))
.unwrap();
let fast_score = fast_decay.relevance("node:a");
let slow_score = slow_decay.relevance("node:a");
assert!(
fast_score < slow_score,
"fast decay should produce lower score: fast={fast_score}, slow={slow_score}"
);
}
#[test]
fn test_edge_created_gets_score() {
let proj = RelevanceDecayProjection::new("relevance");
proj.process(&make_event(
"edge:e-1",
event_types::EDGE_CREATED,
serde_json::json!({"relation": "knows"}),
))
.unwrap();
assert!(proj.relevance("edge:e-1") > 0.0);
}
}