Skip to main content

converge_knowledge/learning/
feedback.rs

1//! Implicit Feedback Collection
2//!
3//! Automatically captures user interaction signals to improve search quality.
4//! Unlike explicit feedback ("this was helpful"), implicit feedback is derived
5//! from natural usage patterns.
6//!
7//! # Signal Types
8//!
9//! | Signal | What It Means | Learning Impact |
10//! |--------|---------------|-----------------|
11//! | Query | User searched for something | Records query patterns |
12//! | View | User looked at a result | Weak positive signal |
13//! | Select | User chose this result | Strong positive signal |
//! | Dismiss | User explicitly dismissed a result | Negative signal |
//! | Dwell | Time spent on result | Strength of interest |
//! | FollowUp | Query after viewing result | Indicates gap or relation |
//! | CoAccess | Items accessed together | Hidden relationship |
//! | SessionEnd | Session finished | Records session duration and query count |
18//!
19//! # Example
20//!
//! ```ignore
//! use converge_knowledge::learning::FeedbackCollector;
//!
//! let collector = FeedbackCollector::new();
//!
//! // Start a session
//! let session = collector.start_session().await;
//!
//! // Track a search (result IDs in ranked order, optional query embedding)
//! let result_ids: Vec<Uuid> = results.iter().map(|r| r.entry_id).collect();
//! let query_id = collector
//!     .record_query(session, "rust async patterns", result_ids, None)
//!     .await;
//!
//! // User views result #2 (positions are 0-indexed)
//! collector.record_view(session, results[1].entry_id, 1).await;
//!
//! // User selects (clicks, copies, etc.) result #2
//! collector.record_select(session, results[1].entry_id, 1).await;
//!
//! // Later, process accumulated feedback
//! let signals = collector.drain_signals().await;
//! learning_engine.apply_implicit_feedback(signals);
//! ```
42
43use chrono::{DateTime, Utc};
44use serde::{Deserialize, Serialize};
45use std::collections::{HashMap, VecDeque};
46use std::sync::Arc;
47use std::sync::atomic::{AtomicU64, Ordering};
48use std::time::{Duration, Instant};
49use tokio::sync::RwLock;
50use uuid::Uuid;
51
52/// Session identifier for grouping related interactions.
53#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
54pub struct SessionId(Uuid);
55
56impl SessionId {
57    /// Create a new session ID.
58    pub fn new() -> Self {
59        Self(Uuid::new_v4())
60    }
61}
62
63impl Default for SessionId {
64    fn default() -> Self {
65        Self::new()
66    }
67}
68
69/// Query identifier for tracking result interactions.
70#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
71pub struct QueryId(Uuid);
72
73impl QueryId {
74    /// Create a new query ID.
75    pub fn new() -> Self {
76        Self(Uuid::new_v4())
77    }
78}
79
80impl Default for QueryId {
81    fn default() -> Self {
82        Self::new()
83    }
84}
85
/// Type of implicit signal captured.
///
/// Each variant carries the payload specific to that interaction. The
/// surrounding [`FeedbackSignal`] adds the session, the related query (if any)
/// and the capture timestamp.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SignalType {
    /// User issued a query.
    Query {
        /// The query text.
        text: String,
        /// Embedding of the query (if available).
        embedding: Option<Vec<f32>>,
        /// IDs of results returned, in order.
        result_ids: Vec<Uuid>,
    },

    /// User viewed a result (e.g., expanded, hovered).
    View {
        /// The entry that was viewed.
        entry_id: Uuid,
        /// Position in the result list (0-indexed).
        position: usize,
    },

    /// User selected/used a result (e.g., clicked, copied).
    Select {
        /// The entry that was selected.
        entry_id: Uuid,
        /// Position in the result list.
        position: usize,
    },

    /// User explicitly dismissed a result.
    Dismiss {
        /// The entry that was dismissed.
        entry_id: Uuid,
        /// Position in the result list.
        position: usize,
    },

    /// Time spent on a result (dwell time).
    Dwell {
        /// The entry.
        entry_id: Uuid,
        /// Time spent in milliseconds.
        duration_ms: u64,
    },

    /// User made a follow-up query after viewing results.
    ///
    /// The enclosing signal's `query_id` is the *new* query; the query being
    /// followed up on is stored in `previous_query`.
    FollowUp {
        /// Previous query ID.
        previous_query: QueryId,
        /// Entry IDs that were viewed before the follow-up.
        viewed_entries: Vec<Uuid>,
    },

    /// Multiple entries accessed in the same session.
    CoAccess {
        /// Entries accessed together.
        entry_ids: Vec<Uuid>,
    },

    /// Session ended - compute final signals.
    SessionEnd {
        /// Total session duration in seconds.
        duration_secs: u64,
        /// Number of queries in session.
        query_count: usize,
    },
}
153
154/// A captured feedback signal with context.
155#[derive(Debug, Clone, Serialize, Deserialize)]
156pub struct FeedbackSignal {
157    /// Unique signal ID.
158    pub id: Uuid,
159    /// Session this signal belongs to.
160    pub session_id: SessionId,
161    /// Query this signal relates to (if any).
162    pub query_id: Option<QueryId>,
163    /// The signal type and data.
164    pub signal: SignalType,
165    /// When this signal was captured.
166    pub timestamp: DateTime<Utc>,
167}
168
169impl FeedbackSignal {
170    /// Create a new feedback signal.
171    pub fn new(session_id: SessionId, query_id: Option<QueryId>, signal: SignalType) -> Self {
172        Self {
173            id: Uuid::new_v4(),
174            session_id,
175            query_id,
176            signal,
177            timestamp: Utc::now(),
178        }
179    }
180}
181
/// Active session state.
///
/// Held in the collector's session map from `start_session` until the session
/// is removed by `end_session`.
struct ActiveSession {
    /// Session ID.
    id: SessionId,
    /// When session started; used to compute the session duration at the end.
    started_at: Instant,
    /// Queries in this session, in the order they were recorded.
    queries: Vec<QueryId>,
    /// Entries viewed in this session; one element per recorded view, so an
    /// entry viewed several times appears several times.
    viewed_entries: Vec<Uuid>,
    /// Entries selected in this session; one element per recorded selection.
    selected_entries: Vec<Uuid>,
    /// Last activity time; compared against the session timeout.
    last_activity: Instant,
    /// Current query context (`None` until the first query is recorded).
    current_query: Option<QueryContext>,
}
199
/// Context for the current query.
///
/// Several fields are stored but not read yet, hence the `dead_code` allowance.
#[allow(dead_code)]
struct QueryContext {
    /// Query ID.
    id: QueryId,
    /// Query text (reserved for future query-text analytics).
    text: String,
    /// Result IDs returned (reserved for future result-tracking analytics).
    result_ids: Vec<Uuid>,
    /// When query was issued (reserved for future temporal analytics).
    timestamp: Instant,
    /// Entries viewed for this query; reported in the `FollowUp` signal when
    /// the next query arrives.
    viewed: Vec<Uuid>,
}
214
215impl ActiveSession {
216    fn new() -> Self {
217        let now = Instant::now();
218        Self {
219            id: SessionId::new(),
220            started_at: now,
221            queries: Vec::new(),
222            viewed_entries: Vec::new(),
223            selected_entries: Vec::new(),
224            last_activity: now,
225            current_query: None,
226        }
227    }
228
229    fn touch(&mut self) {
230        self.last_activity = Instant::now();
231    }
232
233    fn is_expired(&self, timeout: Duration) -> bool {
234        self.last_activity.elapsed() > timeout
235    }
236}
237
/// Tunable settings for the feedback collector.
#[derive(Debug, Clone)]
pub struct FeedbackConfig {
    /// How long a session may stay idle before it counts as expired.
    pub session_timeout: Duration,
    /// Upper bound on buffered signals; once exceeded, the oldest buffered
    /// signals are discarded.
    pub max_buffer_size: usize,
    /// Dwell times shorter than this (in milliseconds) are not recorded,
    /// which filters out accidental views.
    pub min_dwell_ms: u64,
    /// Whether a co-access signal is computed when a session ends.
    pub compute_co_access: bool,
    /// Minimum number of views a session needs before a co-access signal
    /// is emitted.
    pub co_access_min_views: usize,
}

impl Default for FeedbackConfig {
    /// Defaults: 30-minute timeout, 10 000-signal buffer, 500 ms minimum
    /// dwell, co-access enabled with at least 2 views.
    fn default() -> Self {
        const THIRTY_MINUTES: Duration = Duration::from_secs(30 * 60);
        const HALF_SECOND_MS: u64 = 500;

        FeedbackConfig {
            session_timeout: THIRTY_MINUTES,
            max_buffer_size: 10_000,
            min_dwell_ms: HALF_SECOND_MS,
            compute_co_access: true,
            co_access_min_views: 2,
        }
    }
}
264
/// Collector for implicit feedback signals.
///
/// Thread-safe and designed for concurrent access from multiple queries:
/// session state and the signal buffer each sit behind their own async
/// `RwLock`, and the running totals are atomics.
pub struct FeedbackCollector {
    /// Settings controlling timeouts, buffering and co-access computation.
    config: FeedbackConfig,
    /// Active sessions by ID.
    sessions: Arc<RwLock<HashMap<SessionId, ActiveSession>>>,
    /// Buffered signals waiting to be processed (oldest at the front).
    signals: Arc<RwLock<VecDeque<FeedbackSignal>>>,
    /// Total signals collected (counts every emitted signal, including ones
    /// later evicted from the buffer).
    total_signals: AtomicU64,
    /// Total sessions created.
    total_sessions: AtomicU64,
}
279
280impl FeedbackCollector {
281    /// Create a new feedback collector with default config.
282    pub fn new() -> Self {
283        Self::with_config(FeedbackConfig::default())
284    }
285
286    /// Create with custom configuration.
287    pub fn with_config(config: FeedbackConfig) -> Self {
288        Self {
289            config,
290            sessions: Arc::new(RwLock::new(HashMap::new())),
291            signals: Arc::new(RwLock::new(VecDeque::new())),
292            total_signals: AtomicU64::new(0),
293            total_sessions: AtomicU64::new(0),
294        }
295    }
296
297    /// Start a new session.
298    pub async fn start_session(&self) -> SessionId {
299        let session = ActiveSession::new();
300        let id = session.id;
301
302        let mut sessions = self.sessions.write().await;
303        sessions.insert(id, session);
304        self.total_sessions.fetch_add(1, Ordering::Relaxed);
305
306        id
307    }
308
309    /// Get or create a session (for stateless APIs).
310    pub async fn get_or_create_session(&self, session_id: Option<SessionId>) -> SessionId {
311        if let Some(id) = session_id {
312            let sessions = self.sessions.read().await;
313            if sessions.contains_key(&id) {
314                return id;
315            }
316        }
317        self.start_session().await
318    }
319
320    /// Record a query and its results.
321    pub async fn record_query(
322        &self,
323        session_id: SessionId,
324        query_text: &str,
325        result_ids: Vec<Uuid>,
326        query_embedding: Option<Vec<f32>>,
327    ) -> QueryId {
328        let query_id = QueryId::new();
329
330        // Update session state
331        {
332            let mut sessions = self.sessions.write().await;
333            if let Some(session) = sessions.get_mut(&session_id) {
334                // Check if this is a follow-up query
335                if let Some(prev_query) = &session.current_query {
336                    if !prev_query.viewed.is_empty() {
337                        self.emit_signal(FeedbackSignal::new(
338                            session_id,
339                            Some(query_id),
340                            SignalType::FollowUp {
341                                previous_query: prev_query.id,
342                                viewed_entries: prev_query.viewed.clone(),
343                            },
344                        ))
345                        .await;
346                    }
347                }
348
349                // Set new query context
350                session.current_query = Some(QueryContext {
351                    id: query_id,
352                    text: query_text.to_string(),
353                    result_ids: result_ids.clone(),
354                    timestamp: Instant::now(),
355                    viewed: Vec::new(),
356                });
357                session.queries.push(query_id);
358                session.touch();
359            }
360        }
361
362        // Emit query signal
363        self.emit_signal(FeedbackSignal::new(
364            session_id,
365            Some(query_id),
366            SignalType::Query {
367                text: query_text.to_string(),
368                embedding: query_embedding,
369                result_ids,
370            },
371        ))
372        .await;
373
374        query_id
375    }
376
377    /// Record that a user viewed a result.
378    pub async fn record_view(&self, session_id: SessionId, entry_id: Uuid, position: usize) {
379        let query_id = {
380            let mut sessions = self.sessions.write().await;
381            if let Some(session) = sessions.get_mut(&session_id) {
382                session.viewed_entries.push(entry_id);
383                session.touch();
384
385                if let Some(query) = &mut session.current_query {
386                    query.viewed.push(entry_id);
387                    Some(query.id)
388                } else {
389                    None
390                }
391            } else {
392                None
393            }
394        };
395
396        self.emit_signal(FeedbackSignal::new(
397            session_id,
398            query_id,
399            SignalType::View { entry_id, position },
400        ))
401        .await;
402    }
403
404    /// Record that a user selected/used a result.
405    pub async fn record_select(&self, session_id: SessionId, entry_id: Uuid, position: usize) {
406        let query_id = {
407            let mut sessions = self.sessions.write().await;
408            if let Some(session) = sessions.get_mut(&session_id) {
409                session.selected_entries.push(entry_id);
410                session.touch();
411                session.current_query.as_ref().map(|q| q.id)
412            } else {
413                None
414            }
415        };
416
417        self.emit_signal(FeedbackSignal::new(
418            session_id,
419            query_id,
420            SignalType::Select { entry_id, position },
421        ))
422        .await;
423    }
424
425    /// Record that a user dismissed a result.
426    pub async fn record_dismiss(&self, session_id: SessionId, entry_id: Uuid, position: usize) {
427        let query_id = {
428            let sessions = self.sessions.read().await;
429            sessions
430                .get(&session_id)
431                .and_then(|s| s.current_query.as_ref().map(|q| q.id))
432        };
433
434        self.emit_signal(FeedbackSignal::new(
435            session_id,
436            query_id,
437            SignalType::Dismiss { entry_id, position },
438        ))
439        .await;
440    }
441
442    /// Record dwell time on a result.
443    pub async fn record_dwell(&self, session_id: SessionId, entry_id: Uuid, duration_ms: u64) {
444        // Filter out accidental/brief views
445        if duration_ms < self.config.min_dwell_ms {
446            return;
447        }
448
449        let query_id = {
450            let sessions = self.sessions.read().await;
451            sessions
452                .get(&session_id)
453                .and_then(|s| s.current_query.as_ref().map(|q| q.id))
454        };
455
456        self.emit_signal(FeedbackSignal::new(
457            session_id,
458            query_id,
459            SignalType::Dwell {
460                entry_id,
461                duration_ms,
462            },
463        ))
464        .await;
465    }
466
467    /// End a session explicitly.
468    pub async fn end_session(&self, session_id: SessionId) {
469        let session_data = {
470            let mut sessions = self.sessions.write().await;
471            sessions.remove(&session_id)
472        };
473
474        if let Some(session) = session_data {
475            // Emit session end signal
476            self.emit_signal(FeedbackSignal::new(
477                session_id,
478                None,
479                SignalType::SessionEnd {
480                    duration_secs: session.started_at.elapsed().as_secs(),
481                    query_count: session.queries.len(),
482                },
483            ))
484            .await;
485
486            // Compute co-access if enabled
487            if self.config.compute_co_access
488                && session.viewed_entries.len() >= self.config.co_access_min_views
489            {
490                self.emit_signal(FeedbackSignal::new(
491                    session_id,
492                    None,
493                    SignalType::CoAccess {
494                        entry_ids: session.viewed_entries,
495                    },
496                ))
497                .await;
498            }
499        }
500    }
501
502    /// Clean up expired sessions.
503    pub async fn cleanup_expired_sessions(&self) {
504        let expired: Vec<SessionId> = {
505            let sessions = self.sessions.read().await;
506            sessions
507                .iter()
508                .filter(|(_, s)| s.is_expired(self.config.session_timeout))
509                .map(|(id, _)| *id)
510                .collect()
511        };
512
513        for session_id in expired {
514            self.end_session(session_id).await;
515        }
516    }
517
518    /// Drain all buffered signals for processing.
519    pub async fn drain_signals(&self) -> Vec<FeedbackSignal> {
520        let mut signals = self.signals.write().await;
521        signals.drain(..).collect()
522    }
523
524    /// Get number of buffered signals.
525    pub async fn pending_signals(&self) -> usize {
526        self.signals.read().await.len()
527    }
528
529    /// Get total signals collected.
530    pub fn total_signals(&self) -> u64 {
531        self.total_signals.load(Ordering::Relaxed)
532    }
533
534    /// Get total sessions created.
535    pub fn total_sessions(&self) -> u64 {
536        self.total_sessions.load(Ordering::Relaxed)
537    }
538
539    /// Get number of active sessions.
540    pub async fn active_sessions(&self) -> usize {
541        self.sessions.read().await.len()
542    }
543
544    /// Internal: emit a signal to the buffer.
545    async fn emit_signal(&self, signal: FeedbackSignal) {
546        let mut signals = self.signals.write().await;
547        signals.push_back(signal);
548        self.total_signals.fetch_add(1, Ordering::Relaxed);
549
550        // Auto-cleanup if buffer is too large
551        if signals.len() > self.config.max_buffer_size {
552            // Remove oldest 10%
553            let to_remove = self.config.max_buffer_size / 10;
554            for _ in 0..to_remove {
555                signals.pop_front();
556            }
557        }
558    }
559}
560
561impl Default for FeedbackCollector {
562    fn default() -> Self {
563        Self::new()
564    }
565}
566
/// Processed feedback ready for the learning engine.
///
/// `FeedbackProcessor::process` returns one of these per entry, aggregated
/// over all signals for that entry.
#[derive(Debug, Clone)]
pub struct ProcessedFeedback {
    /// Entry ID.
    pub entry_id: Uuid,
    /// Relevance delta (-1.0 to 1.0); positive values mean the entry looked
    /// more relevant, negative values less.
    pub relevance_delta: f32,
    /// Query embedding this feedback relates to (`None` when no embedding was
    /// available for the related query).
    pub query_embedding: Option<Vec<f32>>,
    /// Confidence in this signal (0.0 to 1.0).
    pub confidence: f32,
}
579
/// Turns raw feedback signals into per-entry learning updates.
///
/// All fields are tuning knobs for how strongly each signal kind counts.
pub struct FeedbackProcessor {
    /// Relevance contribution of a view.
    pub view_weight: f32,
    /// Relevance contribution of a selection.
    pub select_weight: f32,
    /// Relevance contribution of a dismissal (negative by default).
    pub dismiss_weight: f32,
    /// Per-position multiplier; the factor for position `p` is
    /// `position_decay^p`, so earlier positions are worth more.
    pub position_decay: f32,
    /// Dwell duration, in milliseconds, that counts as full weight.
    pub dwell_normalization_ms: f32,
}

impl Default for FeedbackProcessor {
    /// Defaults: view 0.1, select 0.5, dismiss -0.3, decay 0.9 per position,
    /// full dwell weight at 30 seconds.
    fn default() -> Self {
        const FULL_DWELL_MS: f32 = 30_000.0;

        FeedbackProcessor {
            view_weight: 0.1,
            select_weight: 0.5,
            dismiss_weight: -0.3,
            position_decay: 0.9,
            dwell_normalization_ms: FULL_DWELL_MS,
        }
    }
}
605
606impl FeedbackProcessor {
607    /// Process signals into learning updates.
608    pub fn process(&self, signals: Vec<FeedbackSignal>) -> Vec<ProcessedFeedback> {
609        let mut feedback = Vec::new();
610        let mut query_embeddings: HashMap<QueryId, Option<Vec<f32>>> = HashMap::new();
611
612        // First pass: collect query embeddings
613        for signal in &signals {
614            if let SignalType::Query {
615                embedding,
616                result_ids: _,
617                text: _,
618            } = &signal.signal
619            {
620                if let Some(query_id) = signal.query_id {
621                    query_embeddings.insert(query_id, embedding.clone());
622                }
623            }
624        }
625
626        // Second pass: process interaction signals
627        for signal in signals {
628            let query_embedding = signal
629                .query_id
630                .and_then(|qid| query_embeddings.get(&qid).cloned())
631                .flatten();
632
633            match signal.signal {
634                SignalType::View { entry_id, position } => {
635                    let position_factor = self.position_decay.powi(position as i32);
636                    feedback.push(ProcessedFeedback {
637                        entry_id,
638                        relevance_delta: self.view_weight * position_factor,
639                        query_embedding,
640                        confidence: 0.3 * position_factor,
641                    });
642                }
643
644                SignalType::Select { entry_id, position } => {
645                    let position_factor = self.position_decay.powi(position as i32);
646                    feedback.push(ProcessedFeedback {
647                        entry_id,
648                        relevance_delta: self.select_weight * position_factor,
649                        query_embedding,
650                        confidence: 0.8,
651                    });
652                }
653
654                SignalType::Dismiss { entry_id, position } => {
655                    let position_factor = self.position_decay.powi(position as i32);
656                    feedback.push(ProcessedFeedback {
657                        entry_id,
658                        relevance_delta: self.dismiss_weight * position_factor,
659                        query_embedding,
660                        confidence: 0.6,
661                    });
662                }
663
664                SignalType::Dwell {
665                    entry_id,
666                    duration_ms,
667                } => {
668                    let dwell_factor = (duration_ms as f32 / self.dwell_normalization_ms).min(1.0);
669                    feedback.push(ProcessedFeedback {
670                        entry_id,
671                        relevance_delta: self.select_weight * dwell_factor,
672                        query_embedding,
673                        confidence: 0.5 * dwell_factor,
674                    });
675                }
676
677                SignalType::CoAccess { entry_ids } => {
678                    // For co-access, create pairwise positive signals
679                    // (handled separately by the learning engine)
680                    for i in 0..entry_ids.len() {
681                        for j in (i + 1)..entry_ids.len() {
682                            feedback.push(ProcessedFeedback {
683                                entry_id: entry_ids[i],
684                                relevance_delta: 0.2, // Moderate co-access boost
685                                query_embedding: None,
686                                confidence: 0.4,
687                            });
688                            feedback.push(ProcessedFeedback {
689                                entry_id: entry_ids[j],
690                                relevance_delta: 0.2,
691                                query_embedding: None,
692                                confidence: 0.4,
693                            });
694                        }
695                    }
696                }
697
698                // Query, FollowUp, SessionEnd don't directly produce ProcessedFeedback
699                _ => {}
700            }
701        }
702
703        // Aggregate feedback for same entry
704        self.aggregate_feedback(feedback)
705    }
706
707    /// Aggregate multiple feedback signals for the same entry.
708    fn aggregate_feedback(&self, feedback: Vec<ProcessedFeedback>) -> Vec<ProcessedFeedback> {
709        let mut by_entry: HashMap<Uuid, Vec<ProcessedFeedback>> = HashMap::new();
710
711        for fb in feedback {
712            by_entry.entry(fb.entry_id).or_default().push(fb);
713        }
714
715        by_entry
716            .into_iter()
717            .map(|(entry_id, signals)| {
718                let total_weight: f32 = signals.iter().map(|s| s.confidence).sum();
719                let weighted_delta: f32 = signals
720                    .iter()
721                    .map(|s| s.relevance_delta * s.confidence)
722                    .sum::<f32>()
723                    / total_weight.max(0.001);
724
725                let avg_confidence = total_weight / signals.len() as f32;
726
727                // Use the last query embedding (most recent context)
728                let query_embedding = signals.into_iter().rev().find_map(|s| s.query_embedding);
729
730                ProcessedFeedback {
731                    entry_id,
732                    relevance_delta: weighted_delta,
733                    query_embedding,
734                    confidence: avg_confidence,
735                }
736            })
737            .collect()
738    }
739}
740
#[cfg(test)]
mod tests {
    use super::*;

    /// Starting a session registers it; ending it removes it again.
    #[tokio::test]
    async fn test_session_lifecycle() {
        let collector = FeedbackCollector::new();

        let session_id = collector.start_session().await;
        assert_eq!(collector.active_sessions().await, 1);

        collector.end_session(session_id).await;
        assert_eq!(collector.active_sessions().await, 0);
    }

    /// A recorded query yields exactly one `Query` signal with its text and results.
    #[tokio::test]
    async fn test_query_recording() {
        let collector = FeedbackCollector::new();
        let session_id = collector.start_session().await;
        let returned = vec![Uuid::new_v4(), Uuid::new_v4()];

        let _query_id = collector
            .record_query(session_id, "test query", returned, None)
            .await;

        let drained = collector.drain_signals().await;
        assert_eq!(drained.len(), 1);

        if let SignalType::Query {
            text, result_ids, ..
        } = &drained[0].signal
        {
            assert_eq!(text, "test query");
            assert_eq!(result_ids.len(), 2);
        } else {
            panic!("Expected Query signal");
        }
    }

    /// Query, view and select each contribute one buffered signal.
    #[tokio::test]
    async fn test_view_and_select() {
        let collector = FeedbackCollector::new();
        let session_id = collector.start_session().await;
        let first = Uuid::new_v4();
        let second = Uuid::new_v4();

        collector
            .record_query(session_id, "test", vec![first, second], None)
            .await;
        collector.record_view(session_id, first, 0).await;
        collector.record_select(session_id, first, 0).await;

        // query + view + select
        let drained = collector.drain_signals().await;
        assert_eq!(drained.len(), 3);
    }

    /// A second query after a viewed result produces a `FollowUp` signal.
    #[tokio::test]
    async fn test_follow_up_detection() {
        let collector = FeedbackCollector::new();
        let session_id = collector.start_session().await;
        let viewed = Uuid::new_v4();

        // First query, with one result viewed.
        collector
            .record_query(session_id, "first query", vec![viewed], None)
            .await;
        collector.record_view(session_id, viewed, 0).await;

        // Follow-up query.
        collector
            .record_query(session_id, "follow up query", vec![], None)
            .await;

        // Expected buffer: query1, view, follow_up, query2.
        let drained = collector.drain_signals().await;
        let has_follow_up = drained
            .iter()
            .any(|s| matches!(s.signal, SignalType::FollowUp { .. }));
        assert!(has_follow_up);
    }

    /// Ending a session with two viewed entries emits a `CoAccess` signal.
    #[tokio::test]
    async fn test_co_access_on_session_end() {
        let config = FeedbackConfig {
            co_access_min_views: 2,
            ..FeedbackConfig::default()
        };
        let collector = FeedbackCollector::with_config(config);
        let session_id = collector.start_session().await;
        let first = Uuid::new_v4();
        let second = Uuid::new_v4();

        collector
            .record_query(session_id, "test", vec![first, second], None)
            .await;
        collector.record_view(session_id, first, 0).await;
        collector.record_view(session_id, second, 1).await;

        collector.end_session(session_id).await;

        let drained = collector.drain_signals().await;
        let has_co_access = drained
            .iter()
            .any(|s| matches!(s.signal, SignalType::CoAccess { .. }));
        assert!(has_co_access);
    }

    /// Dwell times below the configured minimum are dropped.
    #[tokio::test]
    async fn test_dwell_filtering() {
        let config = FeedbackConfig {
            min_dwell_ms: 500,
            ..FeedbackConfig::default()
        };
        let collector = FeedbackCollector::with_config(config);
        let session_id = collector.start_session().await;
        let entry = Uuid::new_v4();

        // Too short - should be filtered
        collector.record_dwell(session_id, entry, 100).await;
        assert_eq!(collector.pending_signals().await, 0);

        // Long enough - should be recorded
        collector.record_dwell(session_id, entry, 1000).await;
        assert_eq!(collector.pending_signals().await, 1);
    }

    /// View + select on one entry aggregate into a single positive update.
    #[test]
    fn test_feedback_processing() {
        let processor = FeedbackProcessor::default();
        let entry = Uuid::new_v4();

        let view = FeedbackSignal::new(
            SessionId::new(),
            Some(QueryId::new()),
            SignalType::View {
                entry_id: entry,
                position: 0,
            },
        );
        let select = FeedbackSignal::new(
            SessionId::new(),
            Some(QueryId::new()),
            SignalType::Select {
                entry_id: entry,
                position: 0,
            },
        );

        let processed = processor.process(vec![view, select]);

        // Aggregated into one item for the entry.
        assert_eq!(processed.len(), 1);
        // Should be positive (view + select)
        assert!(processed[0].relevance_delta > 0.0);
    }

    /// A selection at rank 0 outweighs one at rank 5.
    #[test]
    fn test_position_decay() {
        let processor = FeedbackProcessor::default();
        let top_entry = Uuid::new_v4();
        let lower_entry = Uuid::new_v4();

        let select_at = |entry_id: Uuid, position: usize| {
            FeedbackSignal::new(
                SessionId::new(),
                Some(QueryId::new()),
                SignalType::Select { entry_id, position },
            )
        };

        let processed = processor.process(vec![select_at(top_entry, 0), select_at(lower_entry, 5)]);
        assert_eq!(processed.len(), 2);

        let delta_of = |target: Uuid| {
            processed
                .iter()
                .find(|p| p.entry_id == target)
                .unwrap()
                .relevance_delta
        };

        // Position 0 should have higher delta than position 5
        assert!(delta_of(top_entry) > delta_of(lower_entry));
    }
}