use chrono::{DateTime, Duration, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use tokio::sync::{RwLock, mpsc};
use tokio::time::interval;
use tracing::{debug, info, warn};
use uuid::Uuid;
use super::feedback::{FeedbackSignal, ProcessedFeedback, SignalType};
/// Identifies a kind of batch analysis job.
///
/// The display name and default scheduling interval of each variant are
/// provided by `JobType::name` and `JobType::default_interval`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum JobType {
/// Query-pattern detection; default interval is one hour.
PatternDetector,
/// Knowledge-gap identification; default interval is one day.
GapIdentifier,
/// Entry classification; default interval is one day.
KnowledgeClassifier,
/// Relationship mining between entries; default interval is one week.
RelationshipMiner,
/// Default interval is one week.
/// NOTE(review): no job in this file reports this type — confirm where it is implemented.
ModelConsolidator,
}
impl JobType {
    /// Returns the default amount of time between two runs of this job type.
    pub fn default_interval(&self) -> Duration {
        match *self {
            // Hourly.
            JobType::PatternDetector => Duration::hours(1),
            // Daily.
            JobType::GapIdentifier => Duration::days(1),
            JobType::KnowledgeClassifier => Duration::days(1),
            // Weekly.
            JobType::RelationshipMiner => Duration::weeks(1),
            JobType::ModelConsolidator => Duration::weeks(1),
        }
    }

    /// Returns the human-readable name of this job type (used in log output).
    pub fn name(&self) -> &'static str {
        let label: &'static str = match *self {
            JobType::PatternDetector => "Pattern Detector",
            JobType::GapIdentifier => "Gap Identifier",
            JobType::KnowledgeClassifier => "Knowledge Classifier",
            JobType::RelationshipMiner => "Relationship Miner",
            JobType::ModelConsolidator => "Model Consolidator",
        };
        label
    }
}
/// Lifecycle state of a single job run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum JobStatus {
/// The run has not started yet.
Pending,
/// The run is currently executing.
Running {
/// When execution started.
started_at: DateTime<Utc>,
/// Progress indicator. The scheduler in this file only ever sets it to 0;
/// presumably a 0-100 percentage — TODO confirm.
progress: u8,
},
/// The run finished successfully.
Completed {
/// When execution finished.
finished_at: DateTime<Utc>,
/// Whole seconds elapsed between start and finish.
duration_secs: u64,
/// The scheduler stores the number of insights the job produced here.
items_processed: usize,
},
/// The job returned an error.
Failed {
/// When the failure was recorded.
failed_at: DateTime<Utc>,
/// Error message returned by the job.
error: String,
},
}
/// Record of one execution of a batch job, as kept in the scheduler history.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobRun {
/// Unique identifier of this run.
pub id: Uuid,
/// Which kind of job was executed.
pub job_type: JobType,
/// Current or final state of the run.
pub status: JobStatus,
/// The scheduler in this file sets this to the time the run was started.
pub scheduled_at: DateTime<Utc>,
}
/// A piece of derived knowledge produced by a batch job and kept in the
/// `InsightStore`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Insight {
/// A recurring theme in user queries (produced by `PatternDetectorJob`).
QueryPattern {
/// Human-readable description of the pattern.
description: String,
/// Sample queries exhibiting the pattern.
example_queries: Vec<String>,
/// Relative frequency of the pattern; `PatternDetectorJob` computes it as
/// the size of the term's query list divided by the total query count.
frequency: f32,
/// Entries associated with the pattern; the store indexes the insight
/// under each of these ids.
related_entries: Vec<Uuid>,
},
/// A topic users ask about without getting useful results (produced by
/// `GapIdentifierJob`). Not indexed under any entry id.
KnowledgeGap {
/// Topic label of the gap.
topic: String,
/// Queries that rarely led to positively rated results.
unresolved_queries: Vec<String>,
/// Suggested remediation steps.
suggestions: Vec<String>,
/// `GapIdentifierJob` sets this to the number of unresolved queries
/// divided by 10, capped at 1.0.
severity: f32,
},
/// The knowledge class assigned to one entry (produced by
/// `KnowledgeClassifierJob`).
Classification {
/// The classified entry.
entry_id: Uuid,
/// The assigned class.
class: KnowledgeClass,
/// Confidence attached to the classification by the producing job.
confidence: f32,
/// Short explanation of why the class was chosen.
reason: String,
},
/// A link between two entries (produced by `RelationshipMinerJob`).
Relationship {
/// First entry of the pair.
source_id: Uuid,
/// Second entry of the pair.
target_id: Uuid,
/// Kind of relationship.
relationship: RelationshipType,
/// Strength of the link; `RelationshipMinerJob` uses either a capped
/// co-access count or the cosine similarity of the embeddings.
strength: f32,
},
/// A topic attracting attention.
/// NOTE(review): no job in this file emits this variant — confirm the producer.
HotTopic {
/// Topic label.
topic: String,
/// Entries belonging to the topic; the store indexes the insight under each.
entry_ids: Vec<Uuid>,
/// Interest score; scale not visible in this file.
interest_score: f32,
/// Direction in which interest is moving.
trend: Trend,
},
}
/// Coarse category assigned to a knowledge entry by `KnowledgeClassifierJob`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum KnowledgeClass {
/// Assigned to entries older than 30 days whose access rate meets the
/// configured threshold.
Core,
/// Assigned to entries tagged "derived"/"computed"; also the fallback class.
Derived,
/// Assigned to entries whose category contains "project".
Contextual,
/// Assigned to recent entries with at most two accesses.
Ephemeral,
}
/// Kind of link between two knowledge entries.
///
/// Only `CoAccessed` and `Related` are produced by the jobs in this file; the
/// meaning of the other variants is inferred from their names — TODO confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RelationshipType {
/// Presumably: the source builds upon the target.
Extends,
/// Presumably: the two entries disagree.
Contradicts,
/// Presumably: one entry should be understood before the other.
Prerequisite,
/// Emitted by `RelationshipMinerJob` for entries with similar embeddings.
Related,
/// Presumably: the source replaces the target.
Supersedes,
/// Emitted by `RelationshipMinerJob` for entries accessed together.
CoAccessed,
}
/// Direction of change attached to an `Insight::HotTopic`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum Trend {
/// Interest is increasing.
Rising,
/// Interest is roughly constant.
Stable,
/// Interest is decreasing.
Falling,
}
/// In-memory, async-shareable collection of insights with a per-entry index.
pub struct InsightStore {
// All insights, in insertion order.
insights: RwLock<Vec<Insight>>,
// Maps an entry id to the positions (in `insights`) of insights that mention it.
by_entry: RwLock<HashMap<Uuid, Vec<usize>>>,
// Written on construction and on every `add`; never read in this file.
last_updated: RwLock<DateTime<Utc>>,
}
impl InsightStore {
    /// Creates an empty store.
    pub fn new() -> Self {
        Self {
            insights: RwLock::new(Vec::new()),
            by_entry: RwLock::new(HashMap::new()),
            last_updated: RwLock::new(Utc::now()),
        }
    }

    /// Appends `insight` and indexes it under every entry id it mentions.
    ///
    /// Locking discipline for the whole type: `insights` is always acquired
    /// before `by_entry`, and `last_updated` last. Every method that needs
    /// more than one lock follows this order so that concurrent callers
    /// cannot deadlock on each other.
    pub async fn add(&self, insight: Insight) {
        let entry_ids = Self::extract_entry_ids(&insight);

        let mut insights = self.insights.write().await;
        let mut by_entry = self.by_entry.write().await;

        let idx = insights.len();
        for id in entry_ids {
            let indices = by_entry.entry(id).or_default();
            // Indices are pushed in increasing order, so if this insight was
            // already indexed under `id` (the insight lists the id twice) it
            // is necessarily the last element. Skip it to avoid returning the
            // same insight twice from `for_entry`.
            if indices.last() != Some(&idx) {
                indices.push(idx);
            }
        }
        insights.push(insight);

        *self.last_updated.write().await = Utc::now();
    }

    /// Returns clones of all insights that mention `entry_id`, oldest first.
    ///
    /// Returns an empty vector when the entry is unknown.
    pub async fn for_entry(&self, entry_id: Uuid) -> Vec<Insight> {
        // Same lock order as `add`/`clear` (insights, then by_entry).
        let insights = self.insights.read().await;
        let by_entry = self.by_entry.read().await;
        by_entry
            .get(&entry_id)
            .map(|indices| {
                indices
                    .iter()
                    .filter_map(|&i| insights.get(i).cloned())
                    .collect()
            })
            .unwrap_or_default()
    }

    /// Returns clones of all `Insight::KnowledgeGap` insights.
    pub async fn gaps(&self) -> Vec<Insight> {
        self.insights
            .read()
            .await
            .iter()
            .filter(|i| matches!(i, Insight::KnowledgeGap { .. }))
            .cloned()
            .collect()
    }

    /// Returns clones of all `Insight::HotTopic` insights.
    pub async fn hot_topics(&self) -> Vec<Insight> {
        self.insights
            .read()
            .await
            .iter()
            .filter(|i| matches!(i, Insight::HotTopic { .. }))
            .cloned()
            .collect()
    }

    /// Returns the most recently stored classification of `entry_id`, if any.
    ///
    /// Classifier runs append new insights without removing older ones, so
    /// the newest classification is the last one in insertion order.
    pub async fn classification(&self, entry_id: Uuid) -> Option<KnowledgeClass> {
        self.for_entry(entry_id)
            .await
            .into_iter()
            .rev()
            .find_map(|insight| {
                if let Insight::Classification { class, .. } = insight {
                    Some(class)
                } else {
                    None
                }
            })
    }

    /// Removes all insights and the per-entry index.
    ///
    /// Both locks are held for the whole operation so that a concurrent `add`
    /// can never observe an emptied insight list together with stale index
    /// entries.
    pub async fn clear(&self) {
        let mut insights = self.insights.write().await;
        let mut by_entry = self.by_entry.write().await;
        insights.clear();
        by_entry.clear();
        *self.last_updated.write().await = Utc::now();
    }

    /// Lists the entry ids an insight refers to (empty for knowledge gaps).
    fn extract_entry_ids(insight: &Insight) -> Vec<Uuid> {
        match insight {
            Insight::QueryPattern {
                related_entries, ..
            } => related_entries.clone(),
            Insight::KnowledgeGap { .. } => vec![],
            Insight::Classification { entry_id, .. } => vec![*entry_id],
            Insight::Relationship {
                source_id,
                target_id,
                ..
            } => vec![*source_id, *target_id],
            Insight::HotTopic { entry_ids, .. } => entry_ids.clone(),
        }
    }
}
impl Default for InsightStore {
fn default() -> Self {
Self::new()
}
}
/// Snapshot of the data handed to every batch job for one run.
pub struct BatchInput {
/// Raw feedback signals (queries, co-access events, ...).
pub signals: Vec<FeedbackSignal>,
/// Feedback already processed into per-entry relevance deltas.
pub processed_feedback: Vec<ProcessedFeedback>,
/// Metadata of the known entries, keyed by entry id.
pub entry_metadata: HashMap<Uuid, EntryMetadata>,
/// Embedding vector of each entry, keyed by entry id.
pub entry_embeddings: HashMap<Uuid, Vec<f32>>,
/// Triples of two entry ids and a float.
/// NOTE(review): not read by any job in this file; presumably existing
/// relationships with a weight — confirm.
pub relationships: Vec<(Uuid, Uuid, f32)>,
}
/// Descriptive and usage data about one knowledge entry.
#[derive(Debug, Clone)]
pub struct EntryMetadata {
/// Entry identifier.
pub id: Uuid,
/// Creation time; `KnowledgeClassifierJob` derives the entry age from it.
pub created_at: DateTime<Utc>,
/// Time of the most recent access.
pub last_accessed: DateTime<Utc>,
/// Total number of accesses.
pub access_count: u64,
/// Relevance score; scale not visible in this file.
pub relevance_score: f32,
/// Free-form tags; the classifier looks for "derived" / "computed".
pub tags: Vec<String>,
/// Optional category; the classifier looks for "project".
pub category: Option<String>,
}
/// A unit of batch analysis that turns a `BatchInput` into insights.
#[async_trait::async_trait]
pub trait BatchJob: Send + Sync {
/// The job type this implementation reports; the scheduler uses it to look jobs up.
fn job_type(&self) -> JobType;
/// Analyses `input` and returns the insights found, or an error message.
async fn run(&self, input: &BatchInput) -> Result<Vec<Insight>, String>;
/// Rough expected run time in seconds; defaults to 60.
fn estimated_duration_secs(&self) -> u64 {
60
}
}
/// Detects frequently recurring terms in user queries.
pub struct PatternDetectorJob {
    /// Minimum relative frequency a term needs to be reported.
    pub min_frequency: f32,
    /// Maximum number of patterns reported per run.
    pub max_patterns: usize,
}

/// Defaults: terms must reach a frequency of 0.05; at most 20 patterns.
impl Default for PatternDetectorJob {
    fn default() -> Self {
        let min_frequency = 0.05;
        let max_patterns = 20;
        PatternDetectorJob {
            min_frequency,
            max_patterns,
        }
    }
}
#[async_trait::async_trait]
impl BatchJob for PatternDetectorJob {
    fn job_type(&self) -> JobType {
        JobType::PatternDetector
    }

    /// Finds terms that occur in a large share of the query signals.
    ///
    /// Every whitespace-separated, lower-cased word longer than three bytes
    /// is a candidate term. A term's frequency is the fraction of queries
    /// containing it; a term repeated inside one query counts once, so the
    /// frequency never exceeds 1.0. Terms below `min_frequency` are dropped
    /// and the `max_patterns` most frequent ones are returned (ties are
    /// ordered alphabetically so the output is deterministic).
    ///
    /// Each pattern carries up to five example queries. `related_entries` is
    /// the same for every pattern: all entries with a positive relevance
    /// delta in `processed_feedback`.
    ///
    /// Never returns `Err`; returns an empty list when there are no queries.
    async fn run(&self, input: &BatchInput) -> Result<Vec<Insight>, String> {
        let queries: Vec<&str> = input
            .signals
            .iter()
            .filter_map(|s| match &s.signal {
                SignalType::Query { text, .. } => Some(text.as_str()),
                _ => None,
            })
            .collect();
        if queries.is_empty() {
            return Ok(Vec::new());
        }

        let mut queries_by_term: HashMap<String, Vec<&str>> = HashMap::new();
        for &query in &queries {
            let lowered = query.to_lowercase();
            // Count each term once per query, however often it is repeated.
            let mut seen = std::collections::HashSet::new();
            for word in lowered.split_whitespace() {
                if word.len() > 3 && seen.insert(word) {
                    queries_by_term
                        .entry(word.to_string())
                        .or_default()
                        .push(query);
                }
            }
        }

        let total_queries = queries.len() as f32;
        let mut patterns: Vec<(String, f32, Vec<&str>)> = queries_by_term
            .into_iter()
            .map(|(term, matching)| {
                let frequency = matching.len() as f32 / total_queries;
                (term, frequency, matching)
            })
            .filter(|(_, frequency, _)| *frequency >= self.min_frequency)
            .collect();
        // Most frequent first; the alphabetical tie-break keeps the result
        // independent of hash-map iteration order.
        patterns.sort_by(|a, b| b.1.total_cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
        patterns.truncate(self.max_patterns);

        // Independent of the individual pattern, so computed once.
        let related_entries: Vec<Uuid> = input
            .processed_feedback
            .iter()
            .filter(|fb| fb.relevance_delta > 0.0)
            .map(|fb| fb.entry_id)
            .collect();

        let insights = patterns
            .into_iter()
            .map(|(term, frequency, matching)| Insight::QueryPattern {
                description: format!("Queries about '{}'", term),
                example_queries: matching.into_iter().take(5).map(str::to_owned).collect(),
                frequency,
                related_entries: related_entries.clone(),
            })
            .collect();
        Ok(insights)
    }
}
/// Identifies topics whose queries rarely lead to positively rated results.
pub struct GapIdentifierJob {
    /// Minimum number of occurrences a query needs before it is considered.
    pub min_unresolved: usize,
    /// Maximum number of gaps reported per run.
    pub max_gaps: usize,
}

/// Defaults: a query must occur at least 3 times; at most 10 gaps.
impl Default for GapIdentifierJob {
    fn default() -> Self {
        let min_unresolved = 3;
        let max_gaps = 10;
        GapIdentifierJob {
            min_unresolved,
            max_gaps,
        }
    }
}
#[async_trait::async_trait]
impl BatchJob for GapIdentifierJob {
fn job_type(&self) -> JobType {
JobType::GapIdentifier
}
async fn run(&self, input: &BatchInput) -> Result<Vec<Insight>, String> {
let mut insights = Vec::new();
let mut query_results: HashMap<String, (usize, usize)> = HashMap::new();
for signal in &input.signals {
if let SignalType::Query {
text, result_ids, ..
} = &signal.signal
{
let entry = query_results.entry(text.clone()).or_insert((0, 0));
entry.0 += 1;
let has_positive = input
.processed_feedback
.iter()
.any(|fb| result_ids.contains(&fb.entry_id) && fb.relevance_delta > 0.0);
if has_positive {
entry.1 += 1;
}
}
}
let low_success: Vec<_> = query_results
.into_iter()
.filter(|(_, (total, positive))| {
*total >= self.min_unresolved && (*positive as f32 / *total as f32) < 0.3
})
.collect();
if !low_success.is_empty() {
let mut by_topic: HashMap<String, Vec<String>> = HashMap::new();
for (query, _) in low_success {
let topic = query
.split_whitespace()
.find(|w| w.len() > 3)
.unwrap_or("general")
.to_string();
by_topic.entry(topic).or_default().push(query);
}
for (topic, queries) in by_topic.into_iter().take(self.max_gaps) {
let severity = (queries.len() as f32 / 10.0).min(1.0);
insights.push(Insight::KnowledgeGap {
topic: topic.clone(),
unresolved_queries: queries.clone(),
suggestions: vec![format!("Add documentation about {}", topic)],
severity,
});
}
}
Ok(insights)
}
}
/// Assigns a `KnowledgeClass` to every entry based on age, usage and tags.
pub struct KnowledgeClassifierJob {
    /// Minimum accesses per day for an entry to qualify as core.
    pub core_access_threshold: f32,
    /// Maximum age in days for an entry to qualify as ephemeral.
    pub ephemeral_age_days: i64,
}

/// Defaults: one access per day for core; seven days for ephemeral.
impl Default for KnowledgeClassifierJob {
    fn default() -> Self {
        let core_access_threshold = 1.0;
        let ephemeral_age_days = 7;
        KnowledgeClassifierJob {
            core_access_threshold,
            ephemeral_age_days,
        }
    }
}
#[async_trait::async_trait]
impl BatchJob for KnowledgeClassifierJob {
    fn job_type(&self) -> JobType {
        JobType::KnowledgeClassifier
    }

    /// Emits one `Insight::Classification` per entry in `entry_metadata`.
    ///
    /// Rules are tried in order; the first match wins:
    /// 1. access rate (accesses per day of age) at or above the threshold
    ///    and older than 30 days: `Core`, confidence 0.8;
    /// 2. no older than `ephemeral_age_days` with at most two accesses:
    ///    `Ephemeral`, confidence 0.6;
    /// 3. a tag containing "derived" or "computed": `Derived`, confidence 0.7;
    /// 4. a category containing "project": `Contextual`, confidence 0.7;
    /// 5. otherwise `Derived`, confidence 0.4.
    ///
    /// Age is measured in whole days and treated as at least one day.
    /// Never returns `Err`.
    async fn run(&self, input: &BatchInput) -> Result<Vec<Insight>, String> {
        let now = Utc::now();

        let classified: Vec<Insight> = input
            .entry_metadata
            .iter()
            .map(|(entry_id, meta)| {
                // Clamp to one day so the rate below never divides by zero.
                let age_days = (now - meta.created_at).num_days().max(1);
                let access_rate = meta.access_count as f32 / age_days as f32;

                let long_lived_and_busy =
                    access_rate >= self.core_access_threshold && age_days > 30;
                let young_and_quiet =
                    age_days <= self.ephemeral_age_days && meta.access_count <= 2;

                let (class, confidence, reason) = if long_lived_and_busy {
                    (
                        KnowledgeClass::Core,
                        0.8,
                        "High access rate over extended period",
                    )
                } else if young_and_quiet {
                    (
                        KnowledgeClass::Ephemeral,
                        0.6,
                        "New entry with limited access",
                    )
                } else if meta
                    .tags
                    .iter()
                    .any(|tag| tag.contains("derived") || tag.contains("computed"))
                {
                    (KnowledgeClass::Derived, 0.7, "Tagged as derived knowledge")
                } else if meta
                    .category
                    .as_deref()
                    .is_some_and(|category| category.contains("project"))
                {
                    (KnowledgeClass::Contextual, 0.7, "Project-specific content")
                } else {
                    (KnowledgeClass::Derived, 0.4, "Default classification")
                };

                Insight::Classification {
                    entry_id: *entry_id,
                    class,
                    confidence,
                    reason: reason.to_string(),
                }
            })
            .collect();

        Ok(classified)
    }
}
/// Mines relationships between entries from co-access events and embeddings.
pub struct RelationshipMinerJob {
    /// Minimum number of co-access events for a pair to be reported.
    pub min_co_access: usize,
    /// Minimum cosine similarity for a pair to be reported as related.
    pub min_similarity: f32,
}

/// Defaults: three co-accesses; similarity of at least 0.7.
impl Default for RelationshipMinerJob {
    fn default() -> Self {
        let min_co_access = 3;
        let min_similarity = 0.7;
        RelationshipMinerJob {
            min_co_access,
            min_similarity,
        }
    }
}
#[async_trait::async_trait]
impl BatchJob for RelationshipMinerJob {
    fn job_type(&self) -> JobType {
        JobType::RelationshipMiner
    }

    /// Derives `Insight::Relationship` insights from two sources.
    ///
    /// * **Co-access:** every unordered pair of distinct entries appearing
    ///   together in a `CoAccess` signal is counted once per signal. Pairs
    ///   seen at least `min_co_access` times are reported as `CoAccessed`
    ///   with strength `count / 10`, capped at 1.0. Ids repeated within one
    ///   signal are ignored, so no entry is paired with itself and no pair is
    ///   counted twice for the same event.
    /// * **Similarity:** every pair of entries whose embeddings have a cosine
    ///   similarity of at least `min_similarity` is reported as `Related`
    ///   with the similarity as strength. This compares all pairs and is
    ///   therefore quadratic in the number of embeddings.
    ///
    /// In both cases the smaller id is the source and the larger the target.
    /// Never returns `Err`.
    async fn run(&self, input: &BatchInput) -> Result<Vec<Insight>, String> {
        let mut insights = Vec::new();

        let mut co_access_counts: HashMap<(Uuid, Uuid), usize> = HashMap::new();
        for signal in &input.signals {
            if let SignalType::CoAccess { entry_ids } = &signal.signal {
                // Sorting and de-duplicating yields each distinct pair once,
                // already normalised as (smaller, larger).
                let mut ids: Vec<Uuid> = entry_ids.iter().copied().collect();
                ids.sort_unstable();
                ids.dedup();
                for (i, &first) in ids.iter().enumerate() {
                    for &second in &ids[i + 1..] {
                        *co_access_counts.entry((first, second)).or_insert(0) += 1;
                    }
                }
            }
        }
        for ((source_id, target_id), count) in co_access_counts {
            if count >= self.min_co_access {
                insights.push(Insight::Relationship {
                    source_id,
                    target_id,
                    relationship: RelationshipType::CoAccessed,
                    strength: (count as f32 / 10.0).min(1.0),
                });
            }
        }

        let entries: Vec<(&Uuid, &Vec<f32>)> = input.entry_embeddings.iter().collect();
        for (i, &(id1, emb1)) in entries.iter().enumerate() {
            for &(id2, emb2) in &entries[i + 1..] {
                let similarity = cosine_similarity(emb1, emb2);
                if similarity >= self.min_similarity {
                    // Same normalisation as for co-access pairs.
                    let (source_id, target_id) = if id1 <= id2 {
                        (*id1, *id2)
                    } else {
                        (*id2, *id1)
                    };
                    insights.push(Insight::Relationship {
                        source_id,
                        target_id,
                        relationship: RelationshipType::Related,
                        strength: similarity,
                    });
                }
            }
        }

        Ok(insights)
    }

    /// The pairwise comparison makes this job slower than the default estimate.
    fn estimated_duration_secs(&self) -> u64 {
        300
    }
}
/// Computes the cosine similarity of two vectors.
///
/// Returns 0.0 when the vectors differ in length or when either of them has
/// zero magnitude (which includes empty vectors).
fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    /// Euclidean length of `v`.
    fn magnitude(v: &[f32]) -> f32 {
        v.iter().map(|c| c * c).sum::<f32>().sqrt()
    }

    if a.len() != b.len() {
        return 0.0;
    }

    let (mag_a, mag_b) = (magnitude(a), magnitude(b));
    if mag_a == 0.0 || mag_b == 0.0 {
        return 0.0;
    }

    let inner: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
    inner / (mag_a * mag_b)
}
/// Runs batch jobs, records their history and stores the resulting insights.
pub struct BatchScheduler {
// Registered jobs; looked up by `job_type()`.
jobs: Vec<Arc<dyn BatchJob>>,
// Every run ever started, in start order; never pruned in this file.
history: RwLock<Vec<JobRun>>,
// Destination for the insights produced by successful runs.
insight_store: Arc<InsightStore>,
// Set by `start_background`, cleared by `stop`; polled by the background loop.
running: AtomicBool,
// Number of finished runs (successful or failed).
total_runs: AtomicU64,
}
impl BatchScheduler {
    /// Creates a scheduler with the four built-in jobs (pattern detector,
    /// gap identifier, knowledge classifier, relationship miner), each with
    /// its default configuration.
    pub fn new(insight_store: Arc<InsightStore>) -> Self {
        Self {
            jobs: vec![
                Arc::new(PatternDetectorJob::default()),
                Arc::new(GapIdentifierJob::default()),
                Arc::new(KnowledgeClassifierJob::default()),
                Arc::new(RelationshipMinerJob::default()),
            ],
            history: RwLock::new(Vec::new()),
            insight_store,
            running: AtomicBool::new(false),
            total_runs: AtomicU64::new(0),
        }
    }

    /// Registers an additional job.
    pub fn add_job(&mut self, job: Arc<dyn BatchJob>) {
        self.jobs.push(job);
    }

    /// Runs the first registered job that reports `job_type`.
    ///
    /// # Errors
    ///
    /// Returns `Err` only when no registered job has the requested type. A
    /// failure of the job itself is reported as `Ok` with a
    /// `JobStatus::Failed` status.
    pub async fn run_job(&self, job_type: JobType, input: &BatchInput) -> Result<JobRun, String> {
        let job = self
            .jobs
            .iter()
            .find(|j| j.job_type() == job_type)
            .ok_or_else(|| format!("Job type {:?} not found", job_type))?;
        Ok(self.execute(job.as_ref(), input).await)
    }

    /// Executes one job: records the run in the history, stores its insights
    /// on success and returns the final run record.
    async fn execute(&self, job: &dyn BatchJob, input: &BatchInput) -> JobRun {
        let job_type = job.job_type();
        let run_id = Uuid::new_v4();
        let started_at = Utc::now();
        info!(job = %job_type.name(), "Starting batch job");

        self.history.write().await.push(JobRun {
            id: run_id,
            job_type,
            status: JobStatus::Running {
                started_at,
                progress: 0,
            },
            scheduled_at: started_at,
        });

        let result = job.run(input).await;
        let finished_at = Utc::now();
        // Wall-clock time may step backwards; a negative difference must not
        // wrap around to a huge unsigned value.
        let duration_secs = u64::try_from((finished_at - started_at).num_seconds()).unwrap_or(0);

        let final_status = match result {
            Ok(insights) => {
                let count = insights.len();
                for insight in insights {
                    self.insight_store.add(insight).await;
                }
                info!(job = %job_type.name(), insights = count, duration_secs, "Batch job completed");
                JobStatus::Completed {
                    finished_at,
                    duration_secs,
                    items_processed: count,
                }
            }
            Err(e) => {
                warn!(job = %job_type.name(), error = %e, "Batch job failed");
                JobStatus::Failed {
                    failed_at: finished_at,
                    error: e,
                }
            }
        };

        {
            let mut history = self.history.write().await;
            if let Some(run) = history.iter_mut().find(|r| r.id == run_id) {
                run.status = final_status.clone();
            }
        }
        self.total_runs.fetch_add(1, Ordering::Relaxed);

        JobRun {
            id: run_id,
            job_type,
            status: final_status,
            scheduled_at: started_at,
        }
    }

    /// Runs every registered job once, in registration order, and returns
    /// their run records.
    ///
    /// Each registered job is executed directly rather than being looked up
    /// again by type, so two jobs registered with the same `JobType` both run.
    pub async fn run_all(&self, input: &BatchInput) -> Vec<JobRun> {
        let mut runs = Vec::with_capacity(self.jobs.len());
        for job in &self.jobs {
            runs.push(self.execute(job.as_ref(), input).await);
        }
        runs
    }

    /// Returns a copy of the complete run history, oldest first.
    pub async fn history(&self) -> Vec<JobRun> {
        self.history.read().await.clone()
    }

    /// Returns the most recently started run of `job_type`, if any.
    pub async fn last_run(&self, job_type: JobType) -> Option<JobRun> {
        self.history
            .read()
            .await
            .iter()
            .rfind(|r| r.job_type == job_type)
            .cloned()
    }

    /// Returns the number of runs that have finished (successfully or not).
    pub fn total_runs(&self) -> u64 {
        self.total_runs.load(Ordering::Relaxed)
    }

    /// Spawns a task that runs all jobs whenever a `BatchInput` arrives on
    /// `input_receiver`.
    ///
    /// The task ends when `stop` has been called (noticed after the current
    /// input has been processed, or at the latest on the next 60-second tick)
    /// or when every sender of the channel has been dropped, since no further
    /// input can arrive in that case.
    pub fn start_background(
        self: Arc<Self>,
        mut input_receiver: mpsc::Receiver<BatchInput>,
    ) -> tokio::task::JoinHandle<()> {
        self.running.store(true, Ordering::SeqCst);
        tokio::spawn(async move {
            let mut check_interval = interval(std::time::Duration::from_secs(60));
            while self.running.load(Ordering::SeqCst) {
                tokio::select! {
                    maybe_input = input_receiver.recv() => {
                        match maybe_input {
                            Some(input) => {
                                debug!("Received batch input, running all jobs");
                                self.run_all(&input).await;
                            }
                            None => {
                                debug!("Batch input channel closed, stopping background loop");
                                break;
                            }
                        }
                    }
                    // Wake up periodically only to re-check the `running` flag.
                    _ = check_interval.tick() => {}
                }
            }
        })
    }

    /// Asks the background task to stop; it exits the next time it checks the flag.
    pub fn stop(&self) {
        self.running.store(false, Ordering::SeqCst);
    }
}
// Unit tests for the job types, the insight store, the pattern detector,
// the scheduler and the cosine-similarity helper.
#[cfg(test)]
mod tests {
use super::*;
// The default intervals of two job types match the documented values.
#[test]
fn test_job_type_intervals() {
assert_eq!(
JobType::PatternDetector.default_interval(),
Duration::hours(1)
);
assert_eq!(JobType::GapIdentifier.default_interval(), Duration::days(1));
}
// A stored classification can be looked up again by its entry id.
#[tokio::test]
async fn test_insight_store() {
let store = InsightStore::new();
let entry_id = Uuid::new_v4();
store
.add(Insight::Classification {
entry_id,
class: KnowledgeClass::Core,
confidence: 0.9,
reason: "Test".to_string(),
})
.await;
let class = store.classification(entry_id).await;
assert_eq!(class, Some(KnowledgeClass::Core));
}
// Three queries sharing the terms "rust" and "async" yield at least one pattern.
#[tokio::test]
async fn test_pattern_detector() {
let job = PatternDetectorJob::default();
let signals = vec![
FeedbackSignal::new(
super::super::feedback::SessionId::new(),
None,
SignalType::Query {
text: "rust async programming".to_string(),
embedding: None,
result_ids: vec![],
},
),
FeedbackSignal::new(
super::super::feedback::SessionId::new(),
None,
SignalType::Query {
text: "async rust patterns".to_string(),
embedding: None,
result_ids: vec![],
},
),
FeedbackSignal::new(
super::super::feedback::SessionId::new(),
None,
SignalType::Query {
text: "rust async await".to_string(),
embedding: None,
result_ids: vec![],
},
),
];
let input = BatchInput {
signals,
processed_feedback: vec![],
entry_metadata: HashMap::new(),
entry_embeddings: HashMap::new(),
relationships: vec![],
};
let insights = job.run(&input).await.unwrap();
assert!(!insights.is_empty());
}
// Running a built-in job on empty input completes successfully.
#[tokio::test]
async fn test_scheduler_run_job() {
let store = Arc::new(InsightStore::new());
let scheduler = BatchScheduler::new(store.clone());
let input = BatchInput {
signals: vec![],
processed_feedback: vec![],
entry_metadata: HashMap::new(),
entry_embeddings: HashMap::new(),
relationships: vec![],
};
let run = scheduler.run_job(JobType::PatternDetector, &input).await;
assert!(run.is_ok());
let run = run.unwrap();
assert!(matches!(run.status, JobStatus::Completed { .. }));
}
// Identical vectors have similarity 1; orthogonal vectors have similarity 0.
#[test]
fn test_cosine_similarity() {
let a = vec![1.0, 0.0, 0.0];
let b = vec![1.0, 0.0, 0.0];
assert!((cosine_similarity(&a, &b) - 1.0).abs() < 0.001);
let c = vec![0.0, 1.0, 0.0];
assert!((cosine_similarity(&a, &c) - 0.0).abs() < 0.001);
}
}