1use chrono::{DateTime, Duration, Utc};
38use serde::{Deserialize, Serialize};
39use std::collections::HashMap;
40use std::sync::Arc;
41use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
42use tokio::sync::{RwLock, mpsc};
43use tokio::time::interval;
44use tracing::{debug, info, warn};
45use uuid::Uuid;
46
47use super::feedback::{FeedbackSignal, ProcessedFeedback, SignalType};
48
/// Kinds of offline analysis jobs the batch scheduler can run.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum JobType {
    /// Mines frequent query terms into `QueryPattern` insights.
    PatternDetector,
    /// Flags topics whose queries rarely draw positive feedback.
    GapIdentifier,
    /// Assigns each entry a `KnowledgeClass`.
    KnowledgeClassifier,
    /// Derives co-access and embedding-similarity relationships.
    RelationshipMiner,
    /// Model consolidation; no `BatchJob` implementation is visible in this file.
    ModelConsolidator,
}
63
64impl JobType {
65 pub fn default_interval(&self) -> Duration {
67 match self {
68 JobType::PatternDetector => Duration::hours(1),
69 JobType::GapIdentifier => Duration::days(1),
70 JobType::KnowledgeClassifier => Duration::days(1),
71 JobType::RelationshipMiner => Duration::weeks(1),
72 JobType::ModelConsolidator => Duration::weeks(1),
73 }
74 }
75
76 pub fn name(&self) -> &'static str {
78 match self {
79 JobType::PatternDetector => "Pattern Detector",
80 JobType::GapIdentifier => "Gap Identifier",
81 JobType::KnowledgeClassifier => "Knowledge Classifier",
82 JobType::RelationshipMiner => "Relationship Miner",
83 JobType::ModelConsolidator => "Model Consolidator",
84 }
85 }
86}
87
/// Lifecycle state of a single job run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum JobStatus {
    /// Scheduled but not yet started.
    Pending,
    /// Currently executing.
    Running {
        /// When execution began.
        started_at: DateTime<Utc>,
        /// Progress indicator; presumably 0-100, but only 0 is ever set in
        /// this file — TODO confirm producers elsewhere.
        progress: u8,
    },
    /// Finished successfully.
    Completed {
        /// When execution finished.
        finished_at: DateTime<Utc>,
        /// Wall-clock duration of the run in whole seconds.
        duration_secs: u64,
        /// Number of insights the job produced.
        items_processed: usize,
    },
    /// Finished with an error.
    Failed {
        /// When the failure was recorded.
        failed_at: DateTime<Utc>,
        /// Error message returned by the job.
        error: String,
    },
}
117
/// Record of one scheduled/executed job run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobRun {
    /// Unique identifier for this run.
    pub id: Uuid,
    /// Which job ran.
    pub job_type: JobType,
    /// Current lifecycle state; updated in place in the scheduler history.
    pub status: JobStatus,
    /// When the run was scheduled (`run_job` sets this to the start time).
    pub scheduled_at: DateTime<Utc>,
}
130
/// A single piece of derived knowledge produced by a batch job.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Insight {
    /// A recurring query theme (produced by `PatternDetectorJob`).
    QueryPattern {
        /// Human-readable summary, e.g. "Queries about 'term'".
        description: String,
        /// Sample queries matching the pattern (capped at 5 by the producer).
        example_queries: Vec<String>,
        /// Fraction of all queries that matched (0.0-1.0).
        frequency: f32,
        /// Entries associated with the pattern.
        related_entries: Vec<Uuid>,
    },

    /// A topic users query but rarely resolve (produced by `GapIdentifierJob`).
    KnowledgeGap {
        /// Grouping keyword for the failing queries.
        topic: String,
        /// Queries that repeatedly failed to draw positive feedback.
        unresolved_queries: Vec<String>,
        /// Suggested remediations.
        suggestions: Vec<String>,
        /// 0.0-1.0, scaled from the number of failing queries.
        severity: f32,
    },

    /// A class assignment for one entry (produced by `KnowledgeClassifierJob`).
    Classification {
        entry_id: Uuid,
        class: KnowledgeClass,
        /// Heuristic confidence in the assignment (0.0-1.0).
        confidence: f32,
        /// Short explanation of which heuristic fired.
        reason: String,
    },

    /// A link between two entries (produced by `RelationshipMinerJob`).
    Relationship {
        source_id: Uuid,
        target_id: Uuid,
        relationship: RelationshipType,
        /// Link strength (0.0-1.0).
        strength: f32,
    },

    /// A topic currently drawing attention; no producer is visible in this file.
    HotTopic {
        topic: String,
        entry_ids: Vec<Uuid>,
        interest_score: f32,
        trend: Trend,
    },
}
194
/// Broad classification buckets assigned by `KnowledgeClassifierJob`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum KnowledgeClass {
    /// Frequently accessed over an extended period.
    Core,
    /// Tagged as derived/computed knowledge; also the fallback class.
    Derived,
    /// Tied to a specific context (category containing "project").
    Contextual,
    /// New entry with very limited access; likely short-lived.
    Ephemeral,
}
207
208#[derive(Debug, Clone, Serialize, Deserialize)]
210pub enum RelationshipType {
211 Extends,
213 Contradicts,
215 Prerequisite,
217 Related,
219 Supersedes,
221 CoAccessed,
223}
224
/// Direction of interest over time for a `HotTopic` insight.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum Trend {
    Rising,
    Stable,
    Falling,
}
235
/// Append-only, async-guarded collection of insights with a per-entry index.
pub struct InsightStore {
    /// All insights in insertion order; `by_entry` stores indices into this.
    insights: RwLock<Vec<Insight>>,
    /// entry id -> indices into `insights` that reference that entry.
    by_entry: RwLock<HashMap<Uuid, Vec<usize>>>,
    /// Timestamp of the most recent `add`.
    last_updated: RwLock<DateTime<Utc>>,
}
245
246impl InsightStore {
247 pub fn new() -> Self {
249 Self {
250 insights: RwLock::new(Vec::new()),
251 by_entry: RwLock::new(HashMap::new()),
252 last_updated: RwLock::new(Utc::now()),
253 }
254 }
255
256 pub async fn add(&self, insight: Insight) {
258 let mut insights = self.insights.write().await;
259 let idx = insights.len();
260
261 let entry_ids = Self::extract_entry_ids(&insight);
263 {
264 let mut by_entry = self.by_entry.write().await;
265 for id in entry_ids {
266 by_entry.entry(id).or_default().push(idx);
267 }
268 }
269
270 insights.push(insight);
271 *self.last_updated.write().await = Utc::now();
272 }
273
274 pub async fn for_entry(&self, entry_id: Uuid) -> Vec<Insight> {
276 let by_entry = self.by_entry.read().await;
277 let insights = self.insights.read().await;
278
279 by_entry
280 .get(&entry_id)
281 .map(|indices| {
282 indices
283 .iter()
284 .filter_map(|&i| insights.get(i).cloned())
285 .collect()
286 })
287 .unwrap_or_default()
288 }
289
290 pub async fn gaps(&self) -> Vec<Insight> {
292 self.insights
293 .read()
294 .await
295 .iter()
296 .filter(|i| matches!(i, Insight::KnowledgeGap { .. }))
297 .cloned()
298 .collect()
299 }
300
301 pub async fn hot_topics(&self) -> Vec<Insight> {
303 self.insights
304 .read()
305 .await
306 .iter()
307 .filter(|i| matches!(i, Insight::HotTopic { .. }))
308 .cloned()
309 .collect()
310 }
311
312 pub async fn classification(&self, entry_id: Uuid) -> Option<KnowledgeClass> {
314 self.for_entry(entry_id).await.into_iter().find_map(|i| {
315 if let Insight::Classification { class, .. } = i {
316 Some(class)
317 } else {
318 None
319 }
320 })
321 }
322
323 pub async fn clear(&self) {
325 self.insights.write().await.clear();
326 self.by_entry.write().await.clear();
327 }
328
329 fn extract_entry_ids(insight: &Insight) -> Vec<Uuid> {
330 match insight {
331 Insight::QueryPattern {
332 related_entries, ..
333 } => related_entries.clone(),
334 Insight::KnowledgeGap { .. } => vec![],
335 Insight::Classification { entry_id, .. } => vec![*entry_id],
336 Insight::Relationship {
337 source_id,
338 target_id,
339 ..
340 } => vec![*source_id, *target_id],
341 Insight::HotTopic { entry_ids, .. } => entry_ids.clone(),
342 }
343 }
344}
345
346impl Default for InsightStore {
347 fn default() -> Self {
348 Self::new()
349 }
350}
351
/// Snapshot of data a batch job analyzes in one pass.
pub struct BatchInput {
    /// Raw feedback signals (queries, co-access events, ...).
    pub signals: Vec<FeedbackSignal>,
    /// Feedback already reduced to per-entry relevance deltas.
    pub processed_feedback: Vec<ProcessedFeedback>,
    /// Metadata per entry id.
    pub entry_metadata: HashMap<Uuid, EntryMetadata>,
    /// Embedding vector per entry id.
    pub entry_embeddings: HashMap<Uuid, Vec<f32>>,
    /// Pre-known (source, target, weight) links; unused by the jobs in this file.
    pub relationships: Vec<(Uuid, Uuid, f32)>,
}
365
/// Per-entry bookkeeping consumed by the classification heuristics.
#[derive(Debug, Clone)]
pub struct EntryMetadata {
    /// The entry this metadata describes.
    pub id: Uuid,
    /// Creation time; drives the age-based heuristics.
    pub created_at: DateTime<Utc>,
    /// Most recent access time; unused by the jobs in this file.
    pub last_accessed: DateTime<Utc>,
    /// Total number of accesses since creation.
    pub access_count: u64,
    /// Relevance score; unused by the jobs in this file.
    pub relevance_score: f32,
    /// Free-form tags; "derived"/"computed" substrings trigger classification.
    pub tags: Vec<String>,
    /// Optional category; a "project" substring marks the entry contextual.
    pub category: Option<String>,
}
384
/// An offline analysis pass executed by the `BatchScheduler`.
#[async_trait::async_trait]
pub trait BatchJob: Send + Sync {
    /// Which `JobType` this implementation corresponds to.
    fn job_type(&self) -> JobType;

    /// Runs the analysis over `input`, producing insights or an error string.
    async fn run(&self, input: &BatchInput) -> Result<Vec<Insight>, String>;

    /// Rough runtime estimate in seconds (default: one minute).
    fn estimated_duration_secs(&self) -> u64 {
        60
    }
}
399
/// Finds frequently recurring query terms.
pub struct PatternDetectorJob {
    /// Minimum fraction of all queries a term must appear in (0.0-1.0).
    pub min_frequency: f32,
    /// Maximum number of patterns to emit per run.
    pub max_patterns: usize,
}
407
408impl Default for PatternDetectorJob {
409 fn default() -> Self {
410 Self {
411 min_frequency: 0.05,
412 max_patterns: 20,
413 }
414 }
415}
416
417#[async_trait::async_trait]
418impl BatchJob for PatternDetectorJob {
419 fn job_type(&self) -> JobType {
420 JobType::PatternDetector
421 }
422
423 async fn run(&self, input: &BatchInput) -> Result<Vec<Insight>, String> {
424 let mut insights = Vec::new();
425
426 let queries: Vec<String> = input
428 .signals
429 .iter()
430 .filter_map(|s| match &s.signal {
431 SignalType::Query { text, .. } => Some(text.clone()),
432 _ => None,
433 })
434 .collect();
435
436 if queries.is_empty() {
437 return Ok(insights);
438 }
439
440 let mut term_counts: HashMap<String, Vec<String>> = HashMap::new();
442 for query in &queries {
443 for word in query.to_lowercase().split_whitespace() {
444 if word.len() > 3 {
445 term_counts
446 .entry(word.to_string())
447 .or_default()
448 .push(query.clone());
449 }
450 }
451 }
452
453 let total_queries = queries.len() as f32;
455 let mut patterns: Vec<_> = term_counts
456 .into_iter()
457 .filter(|(_, qs)| qs.len() as f32 / total_queries >= self.min_frequency)
458 .map(|(term, qs)| (term, qs.len() as f32 / total_queries, qs))
459 .collect();
460
461 patterns.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
462 patterns.truncate(self.max_patterns);
463
464 for (term, freq, example_queries) in patterns {
465 let related_entries: Vec<Uuid> = input
467 .processed_feedback
468 .iter()
469 .filter(|fb| fb.relevance_delta > 0.0)
470 .map(|fb| fb.entry_id)
471 .collect();
472
473 insights.push(Insight::QueryPattern {
474 description: format!("Queries about '{}'", term),
475 example_queries: example_queries.into_iter().take(5).collect(),
476 frequency: freq,
477 related_entries,
478 });
479 }
480
481 Ok(insights)
482 }
483}
484
/// Detects topics where queries repeatedly fail to find useful results.
pub struct GapIdentifierJob {
    /// Minimum times a query must be seen before it can count as a gap.
    pub min_unresolved: usize,
    /// Maximum number of gap insights to emit per run.
    pub max_gaps: usize,
}
492
493impl Default for GapIdentifierJob {
494 fn default() -> Self {
495 Self {
496 min_unresolved: 3,
497 max_gaps: 10,
498 }
499 }
500}
501
502#[async_trait::async_trait]
503impl BatchJob for GapIdentifierJob {
504 fn job_type(&self) -> JobType {
505 JobType::GapIdentifier
506 }
507
508 async fn run(&self, input: &BatchInput) -> Result<Vec<Insight>, String> {
509 let mut insights = Vec::new();
510
511 let mut query_results: HashMap<String, (usize, usize)> = HashMap::new(); for signal in &input.signals {
515 if let SignalType::Query {
516 text, result_ids, ..
517 } = &signal.signal
518 {
519 let entry = query_results.entry(text.clone()).or_insert((0, 0));
520 entry.0 += 1;
521
522 let has_positive = input
524 .processed_feedback
525 .iter()
526 .any(|fb| result_ids.contains(&fb.entry_id) && fb.relevance_delta > 0.0);
527 if has_positive {
528 entry.1 += 1;
529 }
530 }
531 }
532
533 let low_success: Vec<_> = query_results
535 .into_iter()
536 .filter(|(_, (total, positive))| {
537 *total >= self.min_unresolved && (*positive as f32 / *total as f32) < 0.3
538 })
539 .collect();
540
541 if !low_success.is_empty() {
542 let mut by_topic: HashMap<String, Vec<String>> = HashMap::new();
544 for (query, _) in low_success {
545 let topic = query
546 .split_whitespace()
547 .find(|w| w.len() > 3)
548 .unwrap_or("general")
549 .to_string();
550 by_topic.entry(topic).or_default().push(query);
551 }
552
553 for (topic, queries) in by_topic.into_iter().take(self.max_gaps) {
554 let severity = (queries.len() as f32 / 10.0).min(1.0);
555 insights.push(Insight::KnowledgeGap {
556 topic: topic.clone(),
557 unresolved_queries: queries.clone(),
558 suggestions: vec![format!("Add documentation about {}", topic)],
559 severity,
560 });
561 }
562 }
563
564 Ok(insights)
565 }
566}
567
/// Classifies entries via access-rate, age, tag, and category heuristics.
pub struct KnowledgeClassifierJob {
    /// Accesses-per-day at or above which a mature entry counts as Core.
    pub core_access_threshold: f32,
    /// Age in days at or below which a rarely-accessed entry is Ephemeral.
    pub ephemeral_age_days: i64,
}
575
576impl Default for KnowledgeClassifierJob {
577 fn default() -> Self {
578 Self {
579 core_access_threshold: 1.0,
580 ephemeral_age_days: 7,
581 }
582 }
583}
584
585#[async_trait::async_trait]
586impl BatchJob for KnowledgeClassifierJob {
587 fn job_type(&self) -> JobType {
588 JobType::KnowledgeClassifier
589 }
590
591 async fn run(&self, input: &BatchInput) -> Result<Vec<Insight>, String> {
592 let mut insights = Vec::new();
593 let now = Utc::now();
594
595 for (id, meta) in &input.entry_metadata {
596 let age_days = (now - meta.created_at).num_days().max(1);
597 let access_rate = meta.access_count as f32 / age_days as f32;
598
599 let (class, confidence, reason) =
600 if access_rate >= self.core_access_threshold && age_days > 30 {
601 (
602 KnowledgeClass::Core,
603 0.8,
604 "High access rate over extended period".to_string(),
605 )
606 } else if age_days <= self.ephemeral_age_days && meta.access_count <= 2 {
607 (
608 KnowledgeClass::Ephemeral,
609 0.6,
610 "New entry with limited access".to_string(),
611 )
612 } else if meta
613 .tags
614 .iter()
615 .any(|t| t.contains("derived") || t.contains("computed"))
616 {
617 (
618 KnowledgeClass::Derived,
619 0.7,
620 "Tagged as derived knowledge".to_string(),
621 )
622 } else if meta
623 .category
624 .as_ref()
625 .is_some_and(|c| c.contains("project"))
626 {
627 (
628 KnowledgeClass::Contextual,
629 0.7,
630 "Project-specific content".to_string(),
631 )
632 } else {
633 (
634 KnowledgeClass::Derived,
635 0.4,
636 "Default classification".to_string(),
637 )
638 };
639
640 insights.push(Insight::Classification {
641 entry_id: *id,
642 class,
643 confidence,
644 reason,
645 });
646 }
647
648 Ok(insights)
649 }
650}
651
/// Mines entry-to-entry relationships from co-access signals and embeddings.
pub struct RelationshipMinerJob {
    /// Minimum co-access events before a pair becomes a `CoAccessed` link.
    pub min_co_access: usize,
    /// Minimum cosine similarity for a `Related` link.
    pub min_similarity: f32,
}
659
660impl Default for RelationshipMinerJob {
661 fn default() -> Self {
662 Self {
663 min_co_access: 3,
664 min_similarity: 0.7,
665 }
666 }
667}
668
669#[async_trait::async_trait]
670impl BatchJob for RelationshipMinerJob {
671 fn job_type(&self) -> JobType {
672 JobType::RelationshipMiner
673 }
674
675 async fn run(&self, input: &BatchInput) -> Result<Vec<Insight>, String> {
676 let mut insights = Vec::new();
677
678 let mut co_access_counts: HashMap<(Uuid, Uuid), usize> = HashMap::new();
680
681 for signal in &input.signals {
682 if let SignalType::CoAccess { entry_ids } = &signal.signal {
683 for i in 0..entry_ids.len() {
684 for j in (i + 1)..entry_ids.len() {
685 let pair = if entry_ids[i] < entry_ids[j] {
686 (entry_ids[i], entry_ids[j])
687 } else {
688 (entry_ids[j], entry_ids[i])
689 };
690 *co_access_counts.entry(pair).or_insert(0) += 1;
691 }
692 }
693 }
694 }
695
696 for ((id1, id2), count) in co_access_counts {
698 if count >= self.min_co_access {
699 let strength = (count as f32 / 10.0).min(1.0);
700 insights.push(Insight::Relationship {
701 source_id: id1,
702 target_id: id2,
703 relationship: RelationshipType::CoAccessed,
704 strength,
705 });
706 }
707 }
708
709 let entries: Vec<_> = input.entry_embeddings.iter().collect();
711 for i in 0..entries.len() {
712 for j in (i + 1)..entries.len() {
713 let (id1, emb1) = entries[i];
714 let (id2, emb2) = entries[j];
715
716 let similarity = cosine_similarity(emb1, emb2);
717 if similarity >= self.min_similarity {
718 insights.push(Insight::Relationship {
719 source_id: *id1,
720 target_id: *id2,
721 relationship: RelationshipType::Related,
722 strength: similarity,
723 });
724 }
725 }
726 }
727
728 Ok(insights)
729 }
730
731 fn estimated_duration_secs(&self) -> u64 {
732 300 }
734}
735
/// Cosine similarity between two vectors.
///
/// Returns 0.0 for mismatched lengths or when either vector has zero
/// magnitude, so the result is always well-defined.
fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    if a.len() != b.len() {
        return 0.0;
    }

    // Single pass accumulating the dot product and both squared norms.
    let (mut dot, mut sq_a, mut sq_b) = (0.0f32, 0.0f32, 0.0f32);
    for (&x, &y) in a.iter().zip(b.iter()) {
        dot += x * y;
        sq_a += x * x;
        sq_b += y * y;
    }

    let norm_a = sq_a.sqrt();
    let norm_b = sq_b.sqrt();
    if norm_a == 0.0 || norm_b == 0.0 {
        0.0
    } else {
        dot / (norm_a * norm_b)
    }
}
752
/// Owns the registered batch jobs, runs them, and records run history.
pub struct BatchScheduler {
    /// Registered jobs; looked up by `JobType` in `run_job`.
    jobs: Vec<Arc<dyn BatchJob>>,
    /// Every run ever started; status updated in place when a run finishes.
    history: RwLock<Vec<JobRun>>,
    /// Destination for insights produced by successful runs.
    insight_store: Arc<InsightStore>,
    /// Background-loop liveness flag; cleared by `stop`.
    running: AtomicBool,
    /// Count of finished `run_job` calls (success or failure).
    total_runs: AtomicU64,
}
766
impl BatchScheduler {
    /// Creates a scheduler pre-registered with the four default jobs.
    /// Note: `ModelConsolidator` has no job registered here.
    pub fn new(insight_store: Arc<InsightStore>) -> Self {
        Self {
            jobs: vec![
                Arc::new(PatternDetectorJob::default()),
                Arc::new(GapIdentifierJob::default()),
                Arc::new(KnowledgeClassifierJob::default()),
                Arc::new(RelationshipMinerJob::default()),
            ],
            history: RwLock::new(Vec::new()),
            insight_store,
            running: AtomicBool::new(false),
            total_runs: AtomicU64::new(0),
        }
    }

    /// Registers an additional job. Only callable before the scheduler is
    /// shared (`&mut self`), e.g. before `start_background` wraps it in `Arc`.
    pub fn add_job(&mut self, job: Arc<dyn BatchJob>) {
        self.jobs.push(job);
    }

    /// Runs the job registered for `job_type` against `input`, storing any
    /// produced insights and recording the run (and its final status) in
    /// history.
    ///
    /// Returns `Err` only when no job of that type is registered; a job that
    /// itself fails still yields `Ok` with a `Failed` status.
    pub async fn run_job(&self, job_type: JobType, input: &BatchInput) -> Result<JobRun, String> {
        let job = self
            .jobs
            .iter()
            .find(|j| j.job_type() == job_type)
            .ok_or_else(|| format!("Job type {:?} not found", job_type))?;

        let run_id = Uuid::new_v4();
        let started_at = Utc::now();

        info!(job = %job_type.name(), "Starting batch job");

        // Recorded immediately as Running so history shows in-flight jobs.
        let run = JobRun {
            id: run_id,
            job_type,
            status: JobStatus::Running {
                started_at,
                progress: 0,
            },
            scheduled_at: started_at,
        };

        self.history.write().await.push(run.clone());

        let result = job.run(input).await;

        let finished_at = Utc::now();
        let duration_secs = (finished_at - started_at).num_seconds() as u64;

        let final_status = match result {
            Ok(insights) => {
                let count = insights.len();
                // Persist each insight before reporting completion.
                for insight in insights {
                    self.insight_store.add(insight).await;
                }

                info!(job = %job_type.name(), insights = count, duration_secs, "Batch job completed");

                JobStatus::Completed {
                    finished_at,
                    duration_secs,
                    items_processed: count,
                }
            }
            Err(e) => {
                warn!(job = %job_type.name(), error = %e, "Batch job failed");
                JobStatus::Failed {
                    failed_at: finished_at,
                    error: e,
                }
            }
        };

        // Patch the previously-pushed Running entry with the final status.
        {
            let mut history = self.history.write().await;
            if let Some(run) = history.iter_mut().find(|r| r.id == run_id) {
                run.status = final_status.clone();
            }
        }

        self.total_runs.fetch_add(1, Ordering::Relaxed);

        Ok(JobRun {
            id: run_id,
            job_type,
            status: final_status,
            scheduled_at: started_at,
        })
    }

    /// Runs every registered job sequentially; runs that cannot start
    /// (unregistered type) are silently skipped.
    pub async fn run_all(&self, input: &BatchInput) -> Vec<JobRun> {
        let mut runs = Vec::new();
        for job in &self.jobs {
            if let Ok(run) = self.run_job(job.job_type(), input).await {
                runs.push(run);
            }
        }
        runs
    }

    /// Snapshot of the full run history.
    pub async fn history(&self) -> Vec<JobRun> {
        self.history.read().await.clone()
    }

    /// Most recent run of `job_type`, if any (history is append-ordered).
    pub async fn last_run(&self, job_type: JobType) -> Option<JobRun> {
        self.history
            .read()
            .await
            .iter()
            .filter(|r| r.job_type == job_type)
            .next_back()
            .cloned()
    }

    /// Total number of finished runs since construction.
    pub fn total_runs(&self) -> u64 {
        self.total_runs.load(Ordering::Relaxed)
    }

    /// Spawns a background task that runs all jobs whenever a `BatchInput`
    /// arrives on `input_receiver`. The `running` flag is only re-checked
    /// between select arms, so a `stop()` takes effect after the next
    /// message or interval tick (up to ~60s).
    pub fn start_background(
        self: Arc<Self>,
        mut input_receiver: mpsc::Receiver<BatchInput>,
    ) -> tokio::task::JoinHandle<()> {
        self.running.store(true, Ordering::SeqCst);

        tokio::spawn(async move {
            let mut check_interval = interval(std::time::Duration::from_secs(60));

            while self.running.load(Ordering::SeqCst) {
                tokio::select! {
                    Some(input) = input_receiver.recv() => {
                        debug!("Received batch input, running all jobs");
                        self.run_all(&input).await;
                    }
                    _ = check_interval.tick() => {
                        // Periodic wake-up so the loop re-checks `running`.
                    }
                }
            }
        })
    }

    /// Signals the background loop to exit (see `start_background` for latency).
    pub fn stop(&self) {
        self.running.store(false, Ordering::SeqCst);
    }
}
925
#[cfg(test)]
mod tests {
    use super::*;

    // Sanity-check the default scheduling cadences.
    #[test]
    fn test_job_type_intervals() {
        assert_eq!(
            JobType::PatternDetector.default_interval(),
            Duration::hours(1)
        );
        assert_eq!(JobType::GapIdentifier.default_interval(), Duration::days(1));
    }

    // Adding a classification should make it retrievable by entry id.
    #[tokio::test]
    async fn test_insight_store() {
        let store = InsightStore::new();

        let entry_id = Uuid::new_v4();
        store
            .add(Insight::Classification {
                entry_id,
                class: KnowledgeClass::Core,
                confidence: 0.9,
                reason: "Test".to_string(),
            })
            .await;

        let class = store.classification(entry_id).await;
        assert_eq!(class, Some(KnowledgeClass::Core));
    }

    // Three queries sharing terms ("rust", "async") should exceed the 5%
    // default frequency threshold and yield at least one pattern.
    #[tokio::test]
    async fn test_pattern_detector() {
        let job = PatternDetectorJob::default();

        let signals = vec![
            FeedbackSignal::new(
                super::super::feedback::SessionId::new(),
                None,
                SignalType::Query {
                    text: "rust async programming".to_string(),
                    embedding: None,
                    result_ids: vec![],
                },
            ),
            FeedbackSignal::new(
                super::super::feedback::SessionId::new(),
                None,
                SignalType::Query {
                    text: "async rust patterns".to_string(),
                    embedding: None,
                    result_ids: vec![],
                },
            ),
            FeedbackSignal::new(
                super::super::feedback::SessionId::new(),
                None,
                SignalType::Query {
                    text: "rust async await".to_string(),
                    embedding: None,
                    result_ids: vec![],
                },
            ),
        ];

        let input = BatchInput {
            signals,
            processed_feedback: vec![],
            entry_metadata: HashMap::new(),
            entry_embeddings: HashMap::new(),
            relationships: vec![],
        };

        let insights = job.run(&input).await.unwrap();

        assert!(!insights.is_empty());
    }

    // Even an empty input should produce a Completed run record.
    #[tokio::test]
    async fn test_scheduler_run_job() {
        let store = Arc::new(InsightStore::new());
        let scheduler = BatchScheduler::new(store.clone());

        let input = BatchInput {
            signals: vec![],
            processed_feedback: vec![],
            entry_metadata: HashMap::new(),
            entry_embeddings: HashMap::new(),
            relationships: vec![],
        };

        let run = scheduler.run_job(JobType::PatternDetector, &input).await;
        assert!(run.is_ok());

        let run = run.unwrap();
        assert!(matches!(run.status, JobStatus::Completed { .. }));
    }

    // Identical vectors -> 1.0; orthogonal vectors -> 0.0.
    #[test]
    fn test_cosine_similarity() {
        let a = vec![1.0, 0.0, 0.0];
        let b = vec![1.0, 0.0, 0.0];
        assert!((cosine_similarity(&a, &b) - 1.0).abs() < 0.001);

        let c = vec![0.0, 1.0, 0.0];
        assert!((cosine_similarity(&a, &c) - 0.0).abs() < 0.001);
    }
}