Skip to main content

converge_knowledge/learning/
batch.rs

//! Batch Learning Jobs
//!
//! Background jobs that analyze patterns, identify gaps, classify knowledge,
//! and enrich the knowledge base over time.
//!
//! # Job Types
//!
//! | Job | Frequency | Purpose |
//! |-----|-----------|---------|
//! | PatternDetector | Hourly | Cluster queries, find hot topics |
//! | GapIdentifier | Daily | Find missing knowledge areas |
//! | KnowledgeClassifier | Daily | Core vs Derived classification |
//! | RelationshipMiner | Weekly | Discover hidden connections |
//! | ModelConsolidator | Weekly | Update EWC weights, prune old data |
//!
//! # Architecture
//!
//! ```text
//! ┌─────────────────────────────────────────────────────┐
//! │                  BatchScheduler                     │
//! │  ┌─────────┐ ┌──────────┐ ┌─────────┐ ┌─────────┐   │
//! │  │ Pattern │ │   Gap    │ │Classify │ │  Miner  │   │
//! │  │Detector │ │Identifier│ │   Job   │ │   Job   │   │
//! │  └────┬────┘ └────┬─────┘ └────┬────┘ └────┬────┘   │
//! │       │           │            │           │        │
//! │       └───────────┴────────────┴───────────┘        │
//! │                       │                             │
//! │                       ▼                             │
//! │              ┌─────────────────┐                    │
//! │              │  InsightStore   │                    │
//! │              │  (publishes to  │                    │
//! │              │  knowledge base)│                    │
//! │              └─────────────────┘                    │
//! └─────────────────────────────────────────────────────┘
//! ```
37use chrono::{DateTime, Duration, Utc};
38use serde::{Deserialize, Serialize};
39use std::collections::HashMap;
40use std::sync::Arc;
41use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
42use tokio::sync::{RwLock, mpsc};
43use tokio::time::interval;
44use tracing::{debug, info, warn};
45use uuid::Uuid;
46
47use super::feedback::{FeedbackSignal, ProcessedFeedback, SignalType};
48
/// Type of batch job.
///
/// Each variant corresponds to one [`BatchJob`] implementation; see
/// [`JobType::default_interval`] for how often each is expected to run.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum JobType {
    /// Detect query patterns and hot topics.
    PatternDetector,
    /// Identify gaps in knowledge coverage.
    GapIdentifier,
    /// Classify knowledge as core/derived/contextual.
    KnowledgeClassifier,
    /// Mine relationships between entries.
    RelationshipMiner,
    /// Consolidate learning models (EWC, pruning).
    ModelConsolidator,
}
63
64impl JobType {
65    /// Default interval for this job type.
66    pub fn default_interval(&self) -> Duration {
67        match self {
68            JobType::PatternDetector => Duration::hours(1),
69            JobType::GapIdentifier => Duration::days(1),
70            JobType::KnowledgeClassifier => Duration::days(1),
71            JobType::RelationshipMiner => Duration::weeks(1),
72            JobType::ModelConsolidator => Duration::weeks(1),
73        }
74    }
75
76    /// Human-readable name.
77    pub fn name(&self) -> &'static str {
78        match self {
79            JobType::PatternDetector => "Pattern Detector",
80            JobType::GapIdentifier => "Gap Identifier",
81            JobType::KnowledgeClassifier => "Knowledge Classifier",
82            JobType::RelationshipMiner => "Relationship Miner",
83            JobType::ModelConsolidator => "Model Consolidator",
84        }
85    }
86}
87
/// Status of a job run.
///
/// Lifecycle: `Pending` -> `Running` -> `Completed` or `Failed`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum JobStatus {
    /// Job is scheduled but not started.
    Pending,
    /// Job is currently running.
    Running {
        /// When it started.
        started_at: DateTime<Utc>,
        /// Progress percentage (0-100).
        progress: u8,
    },
    /// Job completed successfully.
    Completed {
        /// When it finished.
        finished_at: DateTime<Utc>,
        /// Wall-clock duration in seconds.
        duration_secs: u64,
        /// Number of insights produced by the run.
        items_processed: usize,
    },
    /// Job failed.
    Failed {
        /// When it failed.
        failed_at: DateTime<Utc>,
        /// Error message.
        error: String,
    },
}
117
/// Record of a single job execution, kept in the scheduler's history.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobRun {
    /// Unique run ID.
    pub id: Uuid,
    /// Job type.
    pub job_type: JobType,
    /// Current status; updated in place as the run progresses.
    pub status: JobStatus,
    /// When this run was scheduled.
    pub scheduled_at: DateTime<Utc>,
}
130
/// Insight produced by a batch job.
///
/// Insights are published to the [`InsightStore`] after each run and
/// indexed there by the entry IDs they reference.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Insight {
    /// A pattern detected in queries.
    QueryPattern {
        /// Pattern description.
        description: String,
        /// Representative query texts (capped by the producer).
        example_queries: Vec<String>,
        /// Fraction of recent queries matching this pattern (0.0 to 1.0).
        frequency: f32,
        /// Associated entry IDs.
        related_entries: Vec<Uuid>,
    },

    /// A gap in knowledge coverage.
    KnowledgeGap {
        /// Topic or area with gap.
        topic: String,
        /// Queries that couldn't be answered well.
        unresolved_queries: Vec<String>,
        /// Suggested content to add.
        suggestions: Vec<String>,
        /// Severity (0.0 to 1.0).
        severity: f32,
    },

    /// Classification of an entry.
    Classification {
        /// Entry ID.
        entry_id: Uuid,
        /// Knowledge class.
        class: KnowledgeClass,
        /// Confidence (0.0 to 1.0).
        confidence: f32,
        /// Reasoning.
        reason: String,
    },

    /// A discovered relationship.
    Relationship {
        /// Source entry.
        source_id: Uuid,
        /// Target entry.
        target_id: Uuid,
        /// Relationship type.
        relationship: RelationshipType,
        /// Strength (0.0 to 1.0).
        strength: f32,
    },

    /// Hot topic detection.
    HotTopic {
        /// Topic name.
        topic: String,
        /// Related entry IDs.
        entry_ids: Vec<Uuid>,
        /// Current interest score.
        interest_score: f32,
        /// Trend direction.
        trend: Trend,
    },
}
194
/// Knowledge classification.
///
/// Assigned by the [`KnowledgeClassifierJob`] heuristics.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum KnowledgeClass {
    /// Fundamental knowledge that rarely changes.
    Core,
    /// Knowledge derived from core knowledge.
    Derived,
    /// Context-specific knowledge.
    Contextual,
    /// Temporary or soon-to-expire knowledge.
    Ephemeral,
}
207
/// Relationship type between entries.
///
/// The built-in [`RelationshipMinerJob`] only emits `Related` and
/// `CoAccessed`; the remaining variants are available to custom jobs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RelationshipType {
    /// One entry extends another.
    Extends,
    /// One entry contradicts another.
    Contradicts,
    /// One entry is a prerequisite for another.
    Prerequisite,
    /// Entries are related but distinct.
    Related,
    /// One entry supersedes another.
    Supersedes,
    /// Entries are frequently accessed together.
    CoAccessed,
}
224
/// Trend direction for a hot topic (see [`Insight::HotTopic`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum Trend {
    /// Interest is increasing.
    Rising,
    /// Interest is stable.
    Stable,
    /// Interest is decreasing.
    Falling,
}
235
/// Store for insights produced by batch jobs.
///
/// Append-only in normal operation; [`InsightStore::clear`] wipes everything.
pub struct InsightStore {
    /// All insights, in insertion order.
    insights: RwLock<Vec<Insight>>,
    /// Indices into `insights`, keyed by the entry IDs each insight mentions.
    by_entry: RwLock<HashMap<Uuid, Vec<usize>>>,
    /// When last updated.
    last_updated: RwLock<DateTime<Utc>>,
}
245
246impl InsightStore {
247    /// Create a new insight store.
248    pub fn new() -> Self {
249        Self {
250            insights: RwLock::new(Vec::new()),
251            by_entry: RwLock::new(HashMap::new()),
252            last_updated: RwLock::new(Utc::now()),
253        }
254    }
255
256    /// Add an insight.
257    pub async fn add(&self, insight: Insight) {
258        let mut insights = self.insights.write().await;
259        let idx = insights.len();
260
261        // Index by entry ID
262        let entry_ids = Self::extract_entry_ids(&insight);
263        {
264            let mut by_entry = self.by_entry.write().await;
265            for id in entry_ids {
266                by_entry.entry(id).or_default().push(idx);
267            }
268        }
269
270        insights.push(insight);
271        *self.last_updated.write().await = Utc::now();
272    }
273
274    /// Get insights for an entry.
275    pub async fn for_entry(&self, entry_id: Uuid) -> Vec<Insight> {
276        let by_entry = self.by_entry.read().await;
277        let insights = self.insights.read().await;
278
279        by_entry
280            .get(&entry_id)
281            .map(|indices| {
282                indices
283                    .iter()
284                    .filter_map(|&i| insights.get(i).cloned())
285                    .collect()
286            })
287            .unwrap_or_default()
288    }
289
290    /// Get all gaps.
291    pub async fn gaps(&self) -> Vec<Insight> {
292        self.insights
293            .read()
294            .await
295            .iter()
296            .filter(|i| matches!(i, Insight::KnowledgeGap { .. }))
297            .cloned()
298            .collect()
299    }
300
301    /// Get all hot topics.
302    pub async fn hot_topics(&self) -> Vec<Insight> {
303        self.insights
304            .read()
305            .await
306            .iter()
307            .filter(|i| matches!(i, Insight::HotTopic { .. }))
308            .cloned()
309            .collect()
310    }
311
312    /// Get classification for an entry.
313    pub async fn classification(&self, entry_id: Uuid) -> Option<KnowledgeClass> {
314        self.for_entry(entry_id).await.into_iter().find_map(|i| {
315            if let Insight::Classification { class, .. } = i {
316                Some(class)
317            } else {
318                None
319            }
320        })
321    }
322
323    /// Clear old insights.
324    pub async fn clear(&self) {
325        self.insights.write().await.clear();
326        self.by_entry.write().await.clear();
327    }
328
329    fn extract_entry_ids(insight: &Insight) -> Vec<Uuid> {
330        match insight {
331            Insight::QueryPattern {
332                related_entries, ..
333            } => related_entries.clone(),
334            Insight::KnowledgeGap { .. } => vec![],
335            Insight::Classification { entry_id, .. } => vec![*entry_id],
336            Insight::Relationship {
337                source_id,
338                target_id,
339                ..
340            } => vec![*source_id, *target_id],
341            Insight::HotTopic { entry_ids, .. } => entry_ids.clone(),
342        }
343    }
344}
345
346impl Default for InsightStore {
347    fn default() -> Self {
348        Self::new()
349    }
350}
351
/// Input data for batch jobs.
///
/// A snapshot assembled by the caller and shared (by reference) across all
/// jobs in a scheduling pass.
pub struct BatchInput {
    /// Recent feedback signals.
    pub signals: Vec<FeedbackSignal>,
    /// Processed feedback.
    pub processed_feedback: Vec<ProcessedFeedback>,
    /// Entry metadata (id -> metadata).
    pub entry_metadata: HashMap<Uuid, EntryMetadata>,
    /// Entry embeddings (id -> embedding vector).
    pub entry_embeddings: HashMap<Uuid, Vec<f32>>,
    /// Current relationships — presumably (source, target, strength).
    /// NOTE(review): unused by the built-in jobs; confirm tuple meaning
    /// against the producer.
    pub relationships: Vec<(Uuid, Uuid, f32)>,
}
365
/// Metadata about an entry for batch processing.
#[derive(Debug, Clone)]
pub struct EntryMetadata {
    /// Entry ID.
    pub id: Uuid,
    /// When created.
    pub created_at: DateTime<Utc>,
    /// Last accessed (currently unused by the built-in jobs).
    pub last_accessed: DateTime<Utc>,
    /// Total access count since creation.
    pub access_count: u64,
    /// Current relevance score (currently unused by the built-in jobs).
    pub relevance_score: f32,
    /// Tags; the classifier checks for "derived"/"computed" substrings.
    pub tags: Vec<String>,
    /// Category; the classifier checks for a "project" substring.
    pub category: Option<String>,
}
384
/// Trait for batch job implementations.
///
/// Implementors are registered with the [`BatchScheduler`], which invokes
/// [`BatchJob::run`] with a snapshot of recent activity ([`BatchInput`]).
#[async_trait::async_trait]
pub trait BatchJob: Send + Sync {
    /// Job type (used by the scheduler to look this job up).
    fn job_type(&self) -> JobType;

    /// Run the job with given input, returning the insights it produced.
    ///
    /// # Errors
    ///
    /// Returns a human-readable message on failure; the scheduler records
    /// it in the job history.
    async fn run(&self, input: &BatchInput) -> Result<Vec<Insight>, String>;

    /// Estimated duration in seconds (default: one minute).
    fn estimated_duration_secs(&self) -> u64 {
        60
    }
}
399
/// Pattern detection job.
///
/// Buckets query texts by shared significant terms and reports frequent
/// terms as [`Insight::QueryPattern`]s.
pub struct PatternDetectorJob {
    /// Minimum fraction of all queries (0.0-1.0) a term must appear in to
    /// count as a pattern.
    pub min_frequency: f32,
    /// Maximum number of patterns to report per run.
    pub max_patterns: usize,
}
407
408impl Default for PatternDetectorJob {
409    fn default() -> Self {
410        Self {
411            min_frequency: 0.05,
412            max_patterns: 20,
413        }
414    }
415}
416
417#[async_trait::async_trait]
418impl BatchJob for PatternDetectorJob {
419    fn job_type(&self) -> JobType {
420        JobType::PatternDetector
421    }
422
423    async fn run(&self, input: &BatchInput) -> Result<Vec<Insight>, String> {
424        let mut insights = Vec::new();
425
426        // Extract query texts and cluster them
427        let queries: Vec<String> = input
428            .signals
429            .iter()
430            .filter_map(|s| match &s.signal {
431                SignalType::Query { text, .. } => Some(text.clone()),
432                _ => None,
433            })
434            .collect();
435
436        if queries.is_empty() {
437            return Ok(insights);
438        }
439
440        // Simple clustering by common terms
441        let mut term_counts: HashMap<String, Vec<String>> = HashMap::new();
442        for query in &queries {
443            for word in query.to_lowercase().split_whitespace() {
444                if word.len() > 3 {
445                    term_counts
446                        .entry(word.to_string())
447                        .or_default()
448                        .push(query.clone());
449                }
450            }
451        }
452
453        // Find patterns (terms that appear frequently)
454        let total_queries = queries.len() as f32;
455        let mut patterns: Vec<_> = term_counts
456            .into_iter()
457            .filter(|(_, qs)| qs.len() as f32 / total_queries >= self.min_frequency)
458            .map(|(term, qs)| (term, qs.len() as f32 / total_queries, qs))
459            .collect();
460
461        patterns.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
462        patterns.truncate(self.max_patterns);
463
464        for (term, freq, example_queries) in patterns {
465            // Find related entries (those that were viewed/selected for these queries)
466            let related_entries: Vec<Uuid> = input
467                .processed_feedback
468                .iter()
469                .filter(|fb| fb.relevance_delta > 0.0)
470                .map(|fb| fb.entry_id)
471                .collect();
472
473            insights.push(Insight::QueryPattern {
474                description: format!("Queries about '{}'", term),
475                example_queries: example_queries.into_iter().take(5).collect(),
476                frequency: freq,
477                related_entries,
478            });
479        }
480
481        Ok(insights)
482    }
483}
484
/// Gap identification job.
///
/// Reports topics whose queries rarely receive positive feedback as
/// [`Insight::KnowledgeGap`]s.
pub struct GapIdentifierJob {
    /// Minimum occurrences of a query before it can count toward a gap.
    pub min_unresolved: usize,
    /// Maximum gaps to report per run.
    pub max_gaps: usize,
}
492
493impl Default for GapIdentifierJob {
494    fn default() -> Self {
495        Self {
496            min_unresolved: 3,
497            max_gaps: 10,
498        }
499    }
500}
501
502#[async_trait::async_trait]
503impl BatchJob for GapIdentifierJob {
504    fn job_type(&self) -> JobType {
505        JobType::GapIdentifier
506    }
507
508    async fn run(&self, input: &BatchInput) -> Result<Vec<Insight>, String> {
509        let mut insights = Vec::new();
510
511        // Find queries with low or no positive feedback
512        let mut query_results: HashMap<String, (usize, usize)> = HashMap::new(); // (total, positive)
513
514        for signal in &input.signals {
515            if let SignalType::Query {
516                text, result_ids, ..
517            } = &signal.signal
518            {
519                let entry = query_results.entry(text.clone()).or_insert((0, 0));
520                entry.0 += 1;
521
522                // Check if any result got positive feedback
523                let has_positive = input
524                    .processed_feedback
525                    .iter()
526                    .any(|fb| result_ids.contains(&fb.entry_id) && fb.relevance_delta > 0.0);
527                if has_positive {
528                    entry.1 += 1;
529                }
530            }
531        }
532
533        // Group low-success queries by topic
534        let low_success: Vec<_> = query_results
535            .into_iter()
536            .filter(|(_, (total, positive))| {
537                *total >= self.min_unresolved && (*positive as f32 / *total as f32) < 0.3
538            })
539            .collect();
540
541        if !low_success.is_empty() {
542            // Simple topic extraction (first significant word)
543            let mut by_topic: HashMap<String, Vec<String>> = HashMap::new();
544            for (query, _) in low_success {
545                let topic = query
546                    .split_whitespace()
547                    .find(|w| w.len() > 3)
548                    .unwrap_or("general")
549                    .to_string();
550                by_topic.entry(topic).or_default().push(query);
551            }
552
553            for (topic, queries) in by_topic.into_iter().take(self.max_gaps) {
554                let severity = (queries.len() as f32 / 10.0).min(1.0);
555                insights.push(Insight::KnowledgeGap {
556                    topic: topic.clone(),
557                    unresolved_queries: queries.clone(),
558                    suggestions: vec![format!("Add documentation about {}", topic)],
559                    severity,
560                });
561            }
562        }
563
564        Ok(insights)
565    }
566}
567
/// Knowledge classification job.
///
/// Assigns a [`KnowledgeClass`] to every entry in the input based on age,
/// access rate, tags, and category.
pub struct KnowledgeClassifierJob {
    /// Accesses-per-day threshold for `Core` classification.
    pub core_access_threshold: f32,
    /// Maximum age in days for `Ephemeral` classification.
    pub ephemeral_age_days: i64,
}
575
576impl Default for KnowledgeClassifierJob {
577    fn default() -> Self {
578        Self {
579            core_access_threshold: 1.0,
580            ephemeral_age_days: 7,
581        }
582    }
583}
584
585#[async_trait::async_trait]
586impl BatchJob for KnowledgeClassifierJob {
587    fn job_type(&self) -> JobType {
588        JobType::KnowledgeClassifier
589    }
590
591    async fn run(&self, input: &BatchInput) -> Result<Vec<Insight>, String> {
592        let mut insights = Vec::new();
593        let now = Utc::now();
594
595        for (id, meta) in &input.entry_metadata {
596            let age_days = (now - meta.created_at).num_days().max(1);
597            let access_rate = meta.access_count as f32 / age_days as f32;
598
599            let (class, confidence, reason) =
600                if access_rate >= self.core_access_threshold && age_days > 30 {
601                    (
602                        KnowledgeClass::Core,
603                        0.8,
604                        "High access rate over extended period".to_string(),
605                    )
606                } else if age_days <= self.ephemeral_age_days && meta.access_count <= 2 {
607                    (
608                        KnowledgeClass::Ephemeral,
609                        0.6,
610                        "New entry with limited access".to_string(),
611                    )
612                } else if meta
613                    .tags
614                    .iter()
615                    .any(|t| t.contains("derived") || t.contains("computed"))
616                {
617                    (
618                        KnowledgeClass::Derived,
619                        0.7,
620                        "Tagged as derived knowledge".to_string(),
621                    )
622                } else if meta
623                    .category
624                    .as_ref()
625                    .is_some_and(|c| c.contains("project"))
626                {
627                    (
628                        KnowledgeClass::Contextual,
629                        0.7,
630                        "Project-specific content".to_string(),
631                    )
632                } else {
633                    (
634                        KnowledgeClass::Derived,
635                        0.4,
636                        "Default classification".to_string(),
637                    )
638                };
639
640            insights.push(Insight::Classification {
641                entry_id: *id,
642                class,
643                confidence,
644                reason,
645            });
646        }
647
648        Ok(insights)
649    }
650}
651
/// Relationship mining job.
///
/// Derives [`Insight::Relationship`]s from co-access signals and from
/// pairwise embedding similarity.
pub struct RelationshipMinerJob {
    /// Minimum co-access count to create a `CoAccessed` relationship.
    pub min_co_access: usize,
    /// Minimum cosine similarity to create a `Related` relationship.
    pub min_similarity: f32,
}
659
660impl Default for RelationshipMinerJob {
661    fn default() -> Self {
662        Self {
663            min_co_access: 3,
664            min_similarity: 0.7,
665        }
666    }
667}
668
#[async_trait::async_trait]
impl BatchJob for RelationshipMinerJob {
    fn job_type(&self) -> JobType {
        JobType::RelationshipMiner
    }

    /// Mine relationships from co-access signals and embedding similarity.
    ///
    /// Produces `CoAccessed` edges for entry pairs seen together at least
    /// `min_co_access` times, and `Related` edges for pairs whose cosine
    /// similarity is at least `min_similarity`.
    ///
    /// # Errors
    ///
    /// Currently infallible; the `Result` is part of the [`BatchJob`]
    /// contract.
    async fn run(&self, input: &BatchInput) -> Result<Vec<Insight>, String> {
        let mut insights = Vec::new();

        // Count co-accesses per unordered pair (stored with the smaller
        // UUID first so (a, b) and (b, a) share one counter).
        let mut co_access_counts: HashMap<(Uuid, Uuid), usize> = HashMap::new();

        for signal in &input.signals {
            if let SignalType::CoAccess { entry_ids } = &signal.signal {
                for i in 0..entry_ids.len() {
                    for j in (i + 1)..entry_ids.len() {
                        let pair = if entry_ids[i] < entry_ids[j] {
                            (entry_ids[i], entry_ids[j])
                        } else {
                            (entry_ids[j], entry_ids[i])
                        };
                        // NOTE(review): a duplicated ID inside `entry_ids`
                        // yields a self-pair (x, x) — confirm upstream
                        // deduplicates, or self-relationships can be emitted.
                        *co_access_counts.entry(pair).or_insert(0) += 1;
                    }
                }
            }
        }

        // Promote sufficiently frequent pairs to CoAccessed relationships;
        // strength saturates at 10 co-accesses.
        for ((id1, id2), count) in co_access_counts {
            if count >= self.min_co_access {
                let strength = (count as f32 / 10.0).min(1.0);
                insights.push(Insight::Relationship {
                    source_id: id1,
                    target_id: id2,
                    relationship: RelationshipType::CoAccessed,
                    strength,
                });
            }
        }

        // Pairwise embedding similarity — O(n^2) in the number of entries,
        // hence the larger estimated duration below.
        let entries: Vec<_> = input.entry_embeddings.iter().collect();
        for i in 0..entries.len() {
            for j in (i + 1)..entries.len() {
                let (id1, emb1) = entries[i];
                let (id2, emb2) = entries[j];

                let similarity = cosine_similarity(emb1, emb2);
                if similarity >= self.min_similarity {
                    insights.push(Insight::Relationship {
                        source_id: *id1,
                        target_id: *id2,
                        relationship: RelationshipType::Related,
                        strength: similarity,
                    });
                }
            }
        }

        Ok(insights)
    }

    /// Estimated duration in seconds; raised because of the quadratic
    /// embedding scan above.
    fn estimated_duration_secs(&self) -> u64 {
        300 // 5 minutes for relationship mining
    }
}
735
/// Compute cosine similarity between two vectors.
///
/// Returns 0.0 when the vectors differ in length or either has zero norm
/// (no direction to compare).
fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    if a.len() != b.len() {
        return 0.0;
    }

    // Single pass: accumulate the dot product and both squared norms.
    let mut dot = 0.0f32;
    let mut sq_a = 0.0f32;
    let mut sq_b = 0.0f32;
    for (&x, &y) in a.iter().zip(b.iter()) {
        dot += x * y;
        sq_a += x * x;
        sq_b += y * y;
    }

    let norm_a = sq_a.sqrt();
    let norm_b = sq_b.sqrt();
    if norm_a == 0.0 || norm_b == 0.0 {
        0.0
    } else {
        dot / (norm_a * norm_b)
    }
}
752
/// Scheduler for batch jobs.
///
/// Owns the registered jobs, a history of runs, and the shared
/// [`InsightStore`] that job results are published to.
pub struct BatchScheduler {
    /// Registered jobs, looked up by [`JobType`].
    jobs: Vec<Arc<dyn BatchJob>>,
    /// History of all runs (grows unboundedly; never pruned here).
    history: RwLock<Vec<JobRun>>,
    /// Destination for insights produced by jobs.
    insight_store: Arc<InsightStore>,
    /// Whether the background loop should keep running.
    running: AtomicBool,
    /// Total jobs executed.
    total_runs: AtomicU64,
}
766
impl BatchScheduler {
    /// Create a new scheduler with default jobs.
    ///
    /// NOTE(review): only four of the five [`JobType`]s are registered —
    /// there is no `ModelConsolidator` implementation in this module, so
    /// `run_job(JobType::ModelConsolidator, ..)` returns an error. Confirm
    /// this is intentional.
    pub fn new(insight_store: Arc<InsightStore>) -> Self {
        Self {
            jobs: vec![
                Arc::new(PatternDetectorJob::default()),
                Arc::new(GapIdentifierJob::default()),
                Arc::new(KnowledgeClassifierJob::default()),
                Arc::new(RelationshipMinerJob::default()),
            ],
            history: RwLock::new(Vec::new()),
            insight_store,
            running: AtomicBool::new(false),
            total_runs: AtomicU64::new(0),
        }
    }

    /// Add a custom job.
    ///
    /// Jobs are looked up by [`JobType`]; if two registered jobs share a
    /// type, `run_job` only ever finds the first one.
    pub fn add_job(&mut self, job: Arc<dyn BatchJob>) {
        self.jobs.push(job);
    }

    /// Run a specific job type immediately.
    ///
    /// Records a `Running` entry in the history, executes the job,
    /// publishes its insights to the [`InsightStore`], then rewrites the
    /// history entry with the final `Completed`/`Failed` status.
    ///
    /// # Errors
    ///
    /// Returns `Err` only when no registered job matches `job_type`. A job
    /// whose `run` fails still returns `Ok` here — the failure is captured
    /// in the returned [`JobRun`]'s status.
    pub async fn run_job(&self, job_type: JobType, input: &BatchInput) -> Result<JobRun, String> {
        let job = self
            .jobs
            .iter()
            .find(|j| j.job_type() == job_type)
            .ok_or_else(|| format!("Job type {:?} not found", job_type))?;

        let run_id = Uuid::new_v4();
        let started_at = Utc::now();

        info!(job = %job_type.name(), "Starting batch job");

        let run = JobRun {
            id: run_id,
            job_type,
            status: JobStatus::Running {
                started_at,
                progress: 0,
            },
            scheduled_at: started_at,
        };

        // Record the run as Running before executing.
        self.history.write().await.push(run.clone());

        // Execute (no lock is held while the job runs).
        let result = job.run(input).await;

        let finished_at = Utc::now();
        let duration_secs = (finished_at - started_at).num_seconds() as u64;

        let final_status = match result {
            Ok(insights) => {
                let count = insights.len();
                // Publish insights to the shared store.
                for insight in insights {
                    self.insight_store.add(insight).await;
                }

                info!(job = %job_type.name(), insights = count, duration_secs, "Batch job completed");

                JobStatus::Completed {
                    finished_at,
                    duration_secs,
                    items_processed: count,
                }
            }
            Err(e) => {
                warn!(job = %job_type.name(), error = %e, "Batch job failed");
                JobStatus::Failed {
                    failed_at: finished_at,
                    error: e,
                }
            }
        };

        // Replace the Running status recorded at the start of this run.
        {
            let mut history = self.history.write().await;
            if let Some(run) = history.iter_mut().find(|r| r.id == run_id) {
                run.status = final_status.clone();
            }
        }

        self.total_runs.fetch_add(1, Ordering::Relaxed);

        Ok(JobRun {
            id: run_id,
            job_type,
            status: final_status,
            scheduled_at: started_at,
        })
    }

    /// Run all registered jobs sequentially with the same input.
    ///
    /// Per-job failures are reflected in each run's status; `run_job`-level
    /// errors (unknown job type) are silently skipped.
    pub async fn run_all(&self, input: &BatchInput) -> Vec<JobRun> {
        let mut runs = Vec::new();
        for job in &self.jobs {
            if let Ok(run) = self.run_job(job.job_type(), input).await {
                runs.push(run);
            }
        }
        runs
    }

    /// Get a copy of the full job history.
    pub async fn history(&self) -> Vec<JobRun> {
        self.history.read().await.clone()
    }

    /// Get the most recent run for a job type, if any.
    pub async fn last_run(&self, job_type: JobType) -> Option<JobRun> {
        self.history
            .read()
            .await
            .iter()
            .filter(|r| r.job_type == job_type)
            .next_back()
            .cloned()
    }

    /// Total runs executed.
    pub fn total_runs(&self) -> u64 {
        self.total_runs.load(Ordering::Relaxed)
    }

    /// Start background scheduling (non-blocking).
    ///
    /// The spawned task runs all jobs whenever a [`BatchInput`] arrives on
    /// `input_receiver`, and otherwise wakes on a 60-second tick.
    ///
    /// NOTE(review): after `stop()`, the loop only observes the flag on its
    /// next wakeup, so shutdown can lag by up to the 60s tick. If the sender
    /// side is dropped, the `recv()` branch is disabled and the loop idles
    /// on ticks until `stop()` is called — confirm that is acceptable.
    pub fn start_background(
        self: Arc<Self>,
        mut input_receiver: mpsc::Receiver<BatchInput>,
    ) -> tokio::task::JoinHandle<()> {
        self.running.store(true, Ordering::SeqCst);

        tokio::spawn(async move {
            let mut check_interval = interval(std::time::Duration::from_secs(60));

            while self.running.load(Ordering::SeqCst) {
                tokio::select! {
                    Some(input) = input_receiver.recv() => {
                        debug!("Received batch input, running all jobs");
                        self.run_all(&input).await;
                    }
                    _ = check_interval.tick() => {
                        // Periodic maintenance could go here
                    }
                }
            }
        })
    }

    /// Stop background scheduling.
    ///
    /// Only flips the flag; see `start_background` for shutdown latency.
    pub fn stop(&self) {
        self.running.store(false, Ordering::SeqCst);
    }
}
925
#[cfg(test)]
mod tests {
    //! Unit tests for batch jobs, the insight store, and the scheduler.
    use super::*;

    /// The per-type intervals should match the frequency table in the
    /// module-level docs.
    #[test]
    fn test_job_type_intervals() {
        assert_eq!(
            JobType::PatternDetector.default_interval(),
            Duration::hours(1)
        );
        assert_eq!(JobType::GapIdentifier.default_interval(), Duration::days(1));
    }

    /// Adding a classification insight makes it retrievable per entry.
    #[tokio::test]
    async fn test_insight_store() {
        let store = InsightStore::new();

        let entry_id = Uuid::new_v4();
        store
            .add(Insight::Classification {
                entry_id,
                class: KnowledgeClass::Core,
                confidence: 0.9,
                reason: "Test".to_string(),
            })
            .await;

        let class = store.classification(entry_id).await;
        assert_eq!(class, Some(KnowledgeClass::Core));
    }

    /// Three queries sharing the terms "rust"/"async" exceed the default
    /// 5% frequency threshold and should yield at least one pattern.
    #[tokio::test]
    async fn test_pattern_detector() {
        let job = PatternDetectorJob::default();

        let signals = vec![
            FeedbackSignal::new(
                super::super::feedback::SessionId::new(),
                None,
                SignalType::Query {
                    text: "rust async programming".to_string(),
                    embedding: None,
                    result_ids: vec![],
                },
            ),
            FeedbackSignal::new(
                super::super::feedback::SessionId::new(),
                None,
                SignalType::Query {
                    text: "async rust patterns".to_string(),
                    embedding: None,
                    result_ids: vec![],
                },
            ),
            FeedbackSignal::new(
                super::super::feedback::SessionId::new(),
                None,
                SignalType::Query {
                    text: "rust async await".to_string(),
                    embedding: None,
                    result_ids: vec![],
                },
            ),
        ];

        let input = BatchInput {
            signals,
            processed_feedback: vec![],
            entry_metadata: HashMap::new(),
            entry_embeddings: HashMap::new(),
            relationships: vec![],
        };

        let insights = job.run(&input).await.unwrap();

        // Should detect "async" and "rust" as patterns
        assert!(!insights.is_empty());
    }

    /// Running a registered job on empty input still completes successfully.
    #[tokio::test]
    async fn test_scheduler_run_job() {
        let store = Arc::new(InsightStore::new());
        let scheduler = BatchScheduler::new(store.clone());

        let input = BatchInput {
            signals: vec![],
            processed_feedback: vec![],
            entry_metadata: HashMap::new(),
            entry_embeddings: HashMap::new(),
            relationships: vec![],
        };

        let run = scheduler.run_job(JobType::PatternDetector, &input).await;
        assert!(run.is_ok());

        let run = run.unwrap();
        assert!(matches!(run.status, JobStatus::Completed { .. }));
    }

    /// Identical vectors -> 1.0; orthogonal vectors -> 0.0.
    #[test]
    fn test_cosine_similarity() {
        let a = vec![1.0, 0.0, 0.0];
        let b = vec![1.0, 0.0, 0.0];
        assert!((cosine_similarity(&a, &b) - 1.0).abs() < 0.001);

        let c = vec![0.0, 1.0, 0.0];
        assert!((cosine_similarity(&a, &c) - 0.0).abs() < 0.001);
    }
}