#![allow(dead_code)]
use std::collections::HashMap;
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
/// Scoring strategy used to rank catalog items for a user during a batch run.
///
/// The `Display` implementation renders each variant as a lowercase
/// `snake_case` label (for example `content_based`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum BatchStrategy {
/// Score each item by its `popularity` value alone.
Popularity,
/// Score by cosine similarity between the user's `interest_vector` and the
/// item's `features`; falls back to half the popularity when either is empty.
ContentBased,
/// Score by cosine similarity between the user's `cf_embedding` and the
/// item's `features`; falls back to half the popularity when either is empty.
Collaborative,
/// Weighted blend of the content-based and collaborative scores, controlled
/// by `BatchJobConfig::hybrid_content_weight`.
Hybrid,
}
/// Renders a strategy as its lowercase `snake_case` label.
impl std::fmt::Display for BatchStrategy {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the label first, then emit it with a single write.
        let label = match self {
            Self::Popularity => "popularity",
            Self::ContentBased => "content_based",
            Self::Collaborative => "collaborative",
            Self::Hybrid => "hybrid",
        };
        f.write_str(label)
    }
}
/// A catalog entry that can be scored and recommended to users.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchItem {
/// Identifier of the item; compared against `UserContext::seen_items` and
/// copied into the resulting recommendations.
pub item_id: String,
/// Popularity score. `BatchItem::new` clamps it into `[0.0, 1.0]`.
pub popularity: f64,
/// Category labels attached to the item. The scoring functions visible in
/// this file do not read it.
pub categories: Vec<String>,
/// Feature vector compared against user vectors with cosine similarity.
/// May be empty, in which case scoring falls back to popularity.
pub features: Vec<f64>,
}
impl BatchItem {
    /// Builds an item with the given id and popularity, and with no
    /// categories or features.
    ///
    /// `popularity` is clamped into the `[0.0, 1.0]` range.
    #[must_use]
    pub fn new(item_id: impl Into<String>, popularity: f64) -> Self {
        let bounded_popularity = popularity.clamp(0.0, 1.0);
        Self {
            item_id: item_id.into(),
            popularity: bounded_popularity,
            categories: vec![],
            features: vec![],
        }
    }

    /// Returns the item with its category list replaced by `categories`.
    #[must_use]
    pub fn with_categories(self, categories: Vec<String>) -> Self {
        Self { categories, ..self }
    }

    /// Returns the item with its feature vector replaced by `features`.
    #[must_use]
    pub fn with_features(self, features: Vec<f64>) -> Self {
        Self { features, ..self }
    }
}
/// Per-user inputs to a batch scoring run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserContext {
/// Identifier of the user; copied into the resulting `UserRecommendations`.
pub user_id: String,
/// Vector compared against item features by the content-based strategy.
/// May be empty, in which case scoring falls back to popularity.
pub interest_vector: Vec<f64>,
/// Vector compared against item features by the collaborative strategy.
/// May be empty, in which case scoring falls back to popularity.
pub cf_embedding: Vec<f64>,
/// Item ids that are excluded from this user's recommendations.
pub seen_items: Vec<String>,
}
impl UserContext {
    /// Builds a context for `user_id` with empty vectors and no seen items.
    #[must_use]
    pub fn new(user_id: impl Into<String>) -> Self {
        Self {
            user_id: user_id.into(),
            interest_vector: vec![],
            cf_embedding: vec![],
            seen_items: vec![],
        }
    }

    /// Returns the context with its interest vector replaced by `v`.
    #[must_use]
    pub fn with_interest_vector(self, v: Vec<f64>) -> Self {
        Self {
            interest_vector: v,
            ..self
        }
    }

    /// Returns the context with its collaborative-filtering embedding
    /// replaced by `v`.
    #[must_use]
    pub fn with_cf_embedding(self, v: Vec<f64>) -> Self {
        Self {
            cf_embedding: v,
            ..self
        }
    }

    /// Returns the context with its seen-item list replaced by `items`.
    #[must_use]
    pub fn with_seen_items(self, items: Vec<String>) -> Self {
        Self {
            seen_items: items,
            ..self
        }
    }
}
/// A single ranked recommendation produced for one user.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchRecommendation {
/// Identifier of the recommended catalog item.
pub item_id: String,
/// Score assigned to the item by the configured strategy.
pub score: f64,
/// 1-based position of the item within the user's recommendation list.
pub rank: usize,
/// Strategy that produced `score`.
pub strategy: BatchStrategy,
}
/// The ranked recommendations computed for one user, with expiry metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserRecommendations {
/// Identifier of the user these recommendations belong to.
pub user_id: String,
/// Recommendations in rank order (best first).
pub recommendations: Vec<BatchRecommendation>,
/// The `now_ms` value supplied to the batch run that produced this result.
pub computed_at_ms: i64,
/// Time-to-live relative to `computed_at_ms`; `0` means the result never
/// expires (see `is_expired`).
pub ttl_ms: i64,
}
impl UserRecommendations {
    /// Reports whether these recommendations have outlived their TTL.
    ///
    /// Returns `false` whenever `ttl_ms` is `0` (such results never expire).
    /// Otherwise returns `true` once `now_ms` is strictly greater than
    /// `computed_at_ms + ttl_ms`.
    #[must_use]
    pub fn is_expired(&self, now_ms: i64) -> bool {
        if self.ttl_ms == 0 {
            return false;
        }
        // Saturate instead of overflowing: a plain `+` panics in debug builds
        // and wraps in release builds when the deadline exceeds the `i64`
        // range, which would make a far-future deadline look long past.
        let deadline_ms = self.computed_at_ms.saturating_add(self.ttl_ms);
        now_ms > deadline_ms
    }
}
/// Tunable parameters for a batch recommendation run.
#[derive(Debug, Clone)]
pub struct BatchJobConfig {
/// Maximum number of recommendations kept per user.
pub top_k: usize,
/// Strategy used to score every (user, item) pair.
pub strategy: BatchStrategy,
/// Weight of the content-based score in the `Hybrid` strategy; clamped into
/// `[0.0, 1.0]` when used, with the remainder going to the collaborative score.
pub hybrid_content_weight: f64,
/// Value copied into `UserRecommendations::ttl_ms` for every result.
pub result_ttl_ms: i64,
/// Number of users per parallel work shard; values below 1 are treated as 1.
pub shard_size: usize,
}
/// Defaults: hybrid strategy with an even content/collaborative split, the
/// top 20 items per user, a one-hour result TTL, and shards of 64 users.
impl Default for BatchJobConfig {
    fn default() -> Self {
        Self {
            strategy: BatchStrategy::Hybrid,
            hybrid_content_weight: 0.5,
            top_k: 20,
            shard_size: 64,
            result_ttl_ms: 3_600_000,
        }
    }
}
/// Computes the cosine similarity of `a` and `b`, clamped into `[-1.0, 1.0]`.
///
/// Returns `0.0` (a neutral score) instead of a similarity when:
/// - the slices differ in length or are empty,
/// - either vector's Euclidean norm is below `f64::EPSILON`, or
/// - the computation yields NaN, which happens when a component is NaN or
///   infinite, or when the norms or their product overflow.
fn cosine_sim(a: &[f64], b: &[f64]) -> f64 {
    if a.len() != b.len() || a.is_empty() {
        return 0.0;
    }
    let dot: f64 = a.iter().zip(b).map(|(x, y)| x * y).sum();
    let na: f64 = a.iter().map(|x| x * x).sum::<f64>().sqrt();
    let nb: f64 = b.iter().map(|x| x * x).sum::<f64>().sqrt();
    if na < f64::EPSILON || nb < f64::EPSILON {
        return 0.0;
    }
    let sim = dot / (na * nb);
    // `clamp` passes NaN through unchanged, and a NaN score would poison any
    // ranking built on top of this value, so map it to the neutral score.
    if sim.is_nan() {
        return 0.0;
    }
    sim.clamp(-1.0, 1.0)
}
/// Scores a single (item, user) pair according to `config.strategy`.
///
/// - `Popularity`: the item's popularity.
/// - `ContentBased` / `Collaborative`: cosine similarity between the item's
///   features and the user's interest vector / CF embedding, or half the
///   item's popularity when either vector is empty.
/// - `Hybrid`: `w * content + (1 - w) * collaborative`, where `w` is
///   `config.hybrid_content_weight` clamped into `[0.0, 1.0]`.
fn score_pair(item: &BatchItem, user: &UserContext, config: &BatchJobConfig) -> f64 {
    // Similarity of the item to one user profile vector, with the popularity
    // fallback used when there is nothing to compare.
    let affinity = |profile: &[f64]| -> f64 {
        let comparable = !profile.is_empty() && !item.features.is_empty();
        if comparable {
            cosine_sim(profile, &item.features)
        } else {
            item.popularity * 0.5
        }
    };

    match config.strategy {
        BatchStrategy::Popularity => item.popularity,
        BatchStrategy::ContentBased => affinity(user.interest_vector.as_slice()),
        BatchStrategy::Collaborative => affinity(user.cf_embedding.as_slice()),
        BatchStrategy::Hybrid => {
            let content_score = affinity(user.interest_vector.as_slice());
            let collaborative_score = affinity(user.cf_embedding.as_slice());
            let content_weight = config.hybrid_content_weight.clamp(0.0, 1.0);
            content_weight * content_score + (1.0 - content_weight) * collaborative_score
        }
    }
}
/// Computes per-user recommendations from a catalog according to a
/// `BatchJobConfig`.
#[derive(Debug)]
pub struct BatchProcessor {
// Configuration applied to every user scored by this processor.
config: BatchJobConfig,
}
impl BatchProcessor {
    /// Creates a processor that uses `BatchJobConfig::default()`.
    #[must_use]
    pub fn new() -> Self {
        Self {
            config: BatchJobConfig::default(),
        }
    }

    /// Creates a processor that uses the supplied configuration.
    #[must_use]
    pub fn with_config(config: BatchJobConfig) -> Self {
        Self { config }
    }

    /// Computes recommendations for every user in `users` against `catalog`.
    ///
    /// Users are split into shards of `shard_size` (treated as at least 1)
    /// which are processed in parallel with rayon. The result holds exactly
    /// one entry per input user, each stamped with `now_ms` and the configured
    /// TTL.
    #[must_use]
    pub fn run(
        &self,
        users: &[UserContext],
        catalog: &[BatchItem],
        now_ms: i64,
    ) -> Vec<UserRecommendations> {
        // `chunks` panics on a zero chunk size, hence the `max(1)`.
        let shards: Vec<&[UserContext]> = users.chunks(self.config.shard_size.max(1)).collect();
        let results: Vec<Vec<UserRecommendations>> = shards
            .par_iter()
            .map(|shard| {
                shard
                    .iter()
                    .map(|user| self.score_user(user, catalog, now_ms))
                    .collect()
            })
            .collect();
        results.into_iter().flatten().collect()
    }

    /// Scores every unseen catalog item for `user` and keeps the `top_k` best.
    ///
    /// Items are ordered by descending score; ties keep catalog order because
    /// the sort is stable. Items whose score is NaN are ranked after every
    /// item with a real score. Ranks in the result are 1-based.
    fn score_user(
        &self,
        user: &UserContext,
        catalog: &[BatchItem],
        now_ms: i64,
    ) -> UserRecommendations {
        use std::collections::HashSet;
        // Hash the seen ids once so the per-item membership test is O(1).
        let seen: HashSet<&str> = user.seen_items.iter().map(String::as_str).collect();
        let mut scored: Vec<(f64, &str)> = catalog
            .iter()
            .filter(|item| !seen.contains(item.item_id.as_str()))
            .map(|item| (score_pair(item, user, &self.config), item.item_id.as_str()))
            .collect();
        // `partial_cmp` yields `None` when a NaN is involved. Treating that as
        // `Equal` breaks the total order `sort_by` requires (it may panic or
        // produce an unspecified order), so NaN scores are ordered explicitly:
        // all NaNs compare equal to each other and sort after real scores.
        scored.sort_by(|a, b| {
            b.0.partial_cmp(&a.0)
                .unwrap_or_else(|| a.0.is_nan().cmp(&b.0.is_nan()))
        });
        scored.truncate(self.config.top_k);
        let recommendations: Vec<BatchRecommendation> = scored
            .into_iter()
            .enumerate()
            .map(|(i, (score, item_id))| BatchRecommendation {
                item_id: item_id.to_string(),
                score,
                rank: i + 1,
                strategy: self.config.strategy,
            })
            .collect();
        UserRecommendations {
            user_id: user.user_id.clone(),
            recommendations,
            computed_at_ms: now_ms,
            ttl_ms: self.config.result_ttl_ms,
        }
    }
}
impl Default for BatchProcessor {
fn default() -> Self {
Self::new()
}
}
/// In-memory store holding one `UserRecommendations` entry per user id.
#[derive(Debug, Default)]
pub struct BatchResultStore {
// Stored results keyed by `UserRecommendations::user_id`.
results: HashMap<String, UserRecommendations>,
}
impl BatchResultStore {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn put(&mut self, recs: UserRecommendations) {
self.results.insert(recs.user_id.clone(), recs);
}
pub fn put_batch(&mut self, batch: Vec<UserRecommendations>) {
for recs in batch {
self.put(recs);
}
}
#[must_use]
pub fn get(&self, user_id: &str, now_ms: i64) -> Option<&UserRecommendations> {
let recs = self.results.get(user_id)?;
if recs.is_expired(now_ms) {
return None;
}
Some(recs)
}
pub fn evict_expired(&mut self, now_ms: i64) {
self.results.retain(|_, v| !v.is_expired(now_ms));
}
#[must_use]
pub fn len(&self) -> usize {
self.results.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.results.is_empty()
}
pub fn clear(&mut self) {
self.results.clear();
}
}
/// A self-contained batch run: the users to score, the catalog to score them
/// against, and the configuration to use.
#[derive(Debug)]
pub struct BatchJob {
/// Users to compute recommendations for.
pub users: Vec<UserContext>,
/// Candidate items considered for every user.
pub catalog: Vec<BatchItem>,
/// Configuration handed to the `BatchProcessor` when the job runs.
pub config: BatchJobConfig,
}
impl BatchJob {
    /// Creates a job over `users` and `catalog` with the default configuration.
    #[must_use]
    pub fn new(users: Vec<UserContext>, catalog: Vec<BatchItem>) -> Self {
        let config = BatchJobConfig::default();
        Self {
            users,
            catalog,
            config,
        }
    }

    /// Returns the job with its configuration replaced by `config`.
    #[must_use]
    pub fn with_config(self, config: BatchJobConfig) -> Self {
        Self { config, ..self }
    }

    /// Runs the job with a processor built from a clone of the job's
    /// configuration and returns the recommendations for every user.
    #[must_use]
    pub fn run(&self, now_ms: i64) -> Vec<UserRecommendations> {
        BatchProcessor::with_config(self.config.clone()).run(&self.users, &self.catalog, now_ms)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Four items with distinct popularities; by popularity the order is
    /// `item_a` (0.9), `item_b` (0.7), `item_d` (0.6), `item_c` (0.5).
    fn make_catalog() -> Vec<BatchItem> {
        vec![
            BatchItem::new("item_a", 0.9)
                .with_categories(vec!["action".to_string()])
                .with_features(vec![1.0, 0.0, 0.0]),
            BatchItem::new("item_b", 0.7)
                .with_categories(vec!["drama".to_string()])
                .with_features(vec![0.0, 1.0, 0.0]),
            BatchItem::new("item_c", 0.5)
                .with_categories(vec!["comedy".to_string()])
                .with_features(vec![0.0, 0.0, 1.0]),
            BatchItem::new("item_d", 0.6)
                .with_categories(vec!["action".to_string(), "drama".to_string()])
                .with_features(vec![0.7, 0.7, 0.0]),
        ]
    }

    /// Two users whose vectors each align with a single feature axis.
    fn make_users() -> Vec<UserContext> {
        vec![
            UserContext::new("alice")
                .with_interest_vector(vec![1.0, 0.0, 0.0])
                .with_cf_embedding(vec![1.0, 0.0, 0.0]),
            UserContext::new("bob")
                .with_interest_vector(vec![0.0, 1.0, 0.0])
                .with_cf_embedding(vec![0.0, 1.0, 0.0]),
        ]
    }

    #[test]
    fn test_batch_item_creation() {
        let item = BatchItem::new("x", 0.75);
        assert_eq!(item.item_id, "x");
        assert!((item.popularity - 0.75).abs() < f64::EPSILON);
        assert!(item.categories.is_empty());
        assert!(item.features.is_empty());
    }

    #[test]
    fn test_batch_item_popularity_clamp() {
        let item = BatchItem::new("x", 1.5);
        assert!((item.popularity - 1.0).abs() < f64::EPSILON);
        let item2 = BatchItem::new("y", -0.5);
        assert!((item2.popularity - 0.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_user_context_seen_items_excluded() {
        let catalog = make_catalog();
        let user = UserContext::new("u1")
            .with_seen_items(vec!["item_a".to_string(), "item_b".to_string()]);
        let config = BatchJobConfig {
            strategy: BatchStrategy::Popularity,
            top_k: 10,
            ..Default::default()
        };
        let processor = BatchProcessor::with_config(config);
        let results = processor.run(&[user], &catalog, 0);
        assert_eq!(results.len(), 1);
        let recs = &results[0].recommendations;
        // Two of the four catalog items are seen, so exactly two remain.
        assert_eq!(recs.len(), 2);
        assert!(recs.iter().all(|r| r.item_id != "item_a"));
        assert!(recs.iter().all(|r| r.item_id != "item_b"));
    }

    #[test]
    fn test_popularity_strategy_sorts_by_popularity() {
        let catalog = make_catalog();
        let users = vec![UserContext::new("u1")];
        let config = BatchJobConfig {
            strategy: BatchStrategy::Popularity,
            top_k: 4,
            ..Default::default()
        };
        let processor = BatchProcessor::with_config(config);
        let results = processor.run(&users, &catalog, 0);
        let ranked_ids: Vec<&str> = results[0]
            .recommendations
            .iter()
            .map(|r| r.item_id.as_str())
            .collect();
        assert_eq!(ranked_ids, ["item_a", "item_b", "item_d", "item_c"]);
    }

    #[test]
    fn test_content_based_prefers_similar_items() {
        let catalog = make_catalog();
        let users = vec![UserContext::new("alice").with_interest_vector(vec![1.0, 0.0, 0.0])];
        let config = BatchJobConfig {
            strategy: BatchStrategy::ContentBased,
            top_k: 4,
            ..Default::default()
        };
        let processor = BatchProcessor::with_config(config);
        let results = processor.run(&users, &catalog, 0);
        let recs = &results[0].recommendations;
        assert_eq!(recs[0].item_id, "item_a");
        // `item_d` is the only other item with a non-zero first feature.
        assert_eq!(recs[1].item_id, "item_d");
    }

    #[test]
    fn test_hybrid_strategy_returns_results() {
        let catalog = make_catalog();
        let users = make_users();
        let config = BatchJobConfig {
            strategy: BatchStrategy::Hybrid,
            top_k: 3,
            hybrid_content_weight: 0.5,
            ..Default::default()
        };
        let processor = BatchProcessor::with_config(config);
        let results = processor.run(&users, &catalog, 1000);
        assert_eq!(results.len(), 2);
        for result in &results {
            // Four unseen items and `top_k == 3` leave exactly three.
            assert_eq!(result.recommendations.len(), 3);
            for (i, rec) in result.recommendations.iter().enumerate() {
                assert_eq!(rec.rank, i + 1);
                assert_eq!(rec.strategy, BatchStrategy::Hybrid);
            }
        }
    }

    #[test]
    fn test_top_k_limit_respected() {
        let catalog = make_catalog();
        let users = vec![UserContext::new("u1")];
        let config = BatchJobConfig {
            strategy: BatchStrategy::Popularity,
            top_k: 2,
            ..Default::default()
        };
        let processor = BatchProcessor::with_config(config);
        let results = processor.run(&users, &catalog, 0);
        assert_eq!(results[0].recommendations.len(), 2);
    }

    #[test]
    fn test_parallel_batch_multiple_users() {
        let catalog = make_catalog();
        let users: Vec<UserContext> = (0..20).map(|i| UserContext::new(format!("u{i}"))).collect();
        let config = BatchJobConfig {
            strategy: BatchStrategy::Popularity,
            top_k: 4,
            shard_size: 5,
            ..Default::default()
        };
        let processor = BatchProcessor::with_config(config);
        let results = processor.run(&users, &catalog, 0);
        assert_eq!(results.len(), 20);
        // Sharding must not reorder users.
        for (i, result) in results.iter().enumerate() {
            assert_eq!(result.user_id, format!("u{i}"));
        }
    }

    #[test]
    fn test_store_put_and_get() {
        let mut store = BatchResultStore::new();
        let recs = UserRecommendations {
            user_id: "u1".to_string(),
            recommendations: vec![],
            computed_at_ms: 1000,
            ttl_ms: 3_600_000,
        };
        store.put(recs);
        assert_eq!(store.len(), 1);
        let retrieved = store.get("u1", 2000);
        assert!(retrieved.is_some());
    }

    #[test]
    fn test_store_ttl_expiry() {
        let mut store = BatchResultStore::new();
        let recs = UserRecommendations {
            user_id: "u1".to_string(),
            recommendations: vec![],
            computed_at_ms: 0,
            ttl_ms: 1000,
        };
        store.put(recs);
        assert!(store.get("u1", 500).is_some());
        assert!(store.get("u1", 2000).is_none());
    }

    #[test]
    fn test_store_evict_expired() {
        let mut store = BatchResultStore::new();
        store.put(UserRecommendations {
            user_id: "u1".to_string(),
            recommendations: vec![],
            computed_at_ms: 0,
            ttl_ms: 1000,
        });
        // A TTL of zero marks the entry as never expiring.
        store.put(UserRecommendations {
            user_id: "u2".to_string(),
            recommendations: vec![],
            computed_at_ms: 0,
            ttl_ms: 0,
        });
        store.evict_expired(2000);
        assert_eq!(store.len(), 1);
        assert!(store.get("u2", 2000).is_some());
    }

    #[test]
    fn test_store_put_batch() {
        let mut store = BatchResultStore::new();
        let batch: Vec<UserRecommendations> = (0..5)
            .map(|i| UserRecommendations {
                user_id: format!("u{i}"),
                recommendations: vec![],
                computed_at_ms: 0,
                ttl_ms: 0,
            })
            .collect();
        store.put_batch(batch);
        assert_eq!(store.len(), 5);
    }

    #[test]
    fn test_batch_job_run() {
        let catalog = make_catalog();
        let users = make_users();
        let job = BatchJob::new(users, catalog).with_config(BatchJobConfig {
            strategy: BatchStrategy::ContentBased,
            top_k: 3,
            ..Default::default()
        });
        let results = job.run(1_000_000);
        assert_eq!(results.len(), 2);
        assert_eq!(results[0].computed_at_ms, 1_000_000);
    }

    #[test]
    fn test_user_recommendations_never_expires_when_ttl_zero() {
        let recs = UserRecommendations {
            user_id: "u1".to_string(),
            recommendations: vec![],
            computed_at_ms: 0,
            ttl_ms: 0,
        };
        assert!(!recs.is_expired(i64::MAX));
    }

    #[test]
    fn test_batch_strategy_display() {
        assert_eq!(BatchStrategy::Popularity.to_string(), "popularity");
        assert_eq!(BatchStrategy::Hybrid.to_string(), "hybrid");
        assert_eq!(BatchStrategy::ContentBased.to_string(), "content_based");
        assert_eq!(BatchStrategy::Collaborative.to_string(), "collaborative");
    }

    #[test]
    fn test_empty_catalog_returns_empty_recs() {
        let users = vec![UserContext::new("u1")];
        let processor = BatchProcessor::new();
        let results = processor.run(&users, &[], 0);
        assert_eq!(results.len(), 1);
        assert!(results[0].recommendations.is_empty());
    }

    #[test]
    fn test_empty_users_returns_empty() {
        let catalog = make_catalog();
        let processor = BatchProcessor::new();
        let results = processor.run(&[], &catalog, 0);
        assert!(results.is_empty());
    }
}