1use chrono::{DateTime, Utc};
44use serde::{Deserialize, Serialize};
45use std::collections::{HashMap, VecDeque};
46use std::sync::Arc;
47use std::sync::atomic::{AtomicU64, Ordering};
48use std::time::{Duration, Instant};
49use tokio::sync::RwLock;
50use uuid::Uuid;
51
52#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
54pub struct SessionId(Uuid);
55
56impl SessionId {
57 pub fn new() -> Self {
59 Self(Uuid::new_v4())
60 }
61}
62
63impl Default for SessionId {
64 fn default() -> Self {
65 Self::new()
66 }
67}
68
69#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
71pub struct QueryId(Uuid);
72
73impl QueryId {
74 pub fn new() -> Self {
76 Self(Uuid::new_v4())
77 }
78}
79
80impl Default for QueryId {
81 fn default() -> Self {
82 Self::new()
83 }
84}
85
86#[derive(Debug, Clone, Serialize, Deserialize)]
88pub enum SignalType {
89 Query {
91 text: String,
93 embedding: Option<Vec<f32>>,
95 result_ids: Vec<Uuid>,
97 },
98
99 View {
101 entry_id: Uuid,
103 position: usize,
105 },
106
107 Select {
109 entry_id: Uuid,
111 position: usize,
113 },
114
115 Dismiss {
117 entry_id: Uuid,
119 position: usize,
121 },
122
123 Dwell {
125 entry_id: Uuid,
127 duration_ms: u64,
129 },
130
131 FollowUp {
133 previous_query: QueryId,
135 viewed_entries: Vec<Uuid>,
137 },
138
139 CoAccess {
141 entry_ids: Vec<Uuid>,
143 },
144
145 SessionEnd {
147 duration_secs: u64,
149 query_count: usize,
151 },
152}
153
154#[derive(Debug, Clone, Serialize, Deserialize)]
156pub struct FeedbackSignal {
157 pub id: Uuid,
159 pub session_id: SessionId,
161 pub query_id: Option<QueryId>,
163 pub signal: SignalType,
165 pub timestamp: DateTime<Utc>,
167}
168
169impl FeedbackSignal {
170 pub fn new(session_id: SessionId, query_id: Option<QueryId>, signal: SignalType) -> Self {
172 Self {
173 id: Uuid::new_v4(),
174 session_id,
175 query_id,
176 signal,
177 timestamp: Utc::now(),
178 }
179 }
180}
181
182struct ActiveSession {
184 id: SessionId,
186 started_at: Instant,
188 queries: Vec<QueryId>,
190 viewed_entries: Vec<Uuid>,
192 selected_entries: Vec<Uuid>,
194 last_activity: Instant,
196 current_query: Option<QueryContext>,
198}
199
200#[allow(dead_code)]
202struct QueryContext {
203 id: QueryId,
205 text: String,
207 result_ids: Vec<Uuid>,
209 timestamp: Instant,
211 viewed: Vec<Uuid>,
213}
214
215impl ActiveSession {
216 fn new() -> Self {
217 let now = Instant::now();
218 Self {
219 id: SessionId::new(),
220 started_at: now,
221 queries: Vec::new(),
222 viewed_entries: Vec::new(),
223 selected_entries: Vec::new(),
224 last_activity: now,
225 current_query: None,
226 }
227 }
228
229 fn touch(&mut self) {
230 self.last_activity = Instant::now();
231 }
232
233 fn is_expired(&self, timeout: Duration) -> bool {
234 self.last_activity.elapsed() > timeout
235 }
236}
237
/// Tunable settings for `FeedbackCollector`.
#[derive(Debug, Clone)]
pub struct FeedbackConfig {
    /// Idle time after which a session counts as expired.
    pub session_timeout: Duration,
    /// Number of buffered signals above which the oldest ones are evicted.
    pub max_buffer_size: usize,
    /// Dwell durations below this many milliseconds are not recorded.
    pub min_dwell_ms: u64,
    /// Whether a co-access signal is emitted when a session ends.
    pub compute_co_access: bool,
    /// Minimum number of recorded views a session needs before a co-access
    /// signal is emitted.
    pub co_access_min_views: usize,
}

impl Default for FeedbackConfig {
    /// Defaults: 30-minute session timeout, 10 000 buffered signals,
    /// 500 ms minimum dwell, co-access enabled with at least two views.
    fn default() -> Self {
        let thirty_minutes = Duration::from_secs(30 * 60);

        FeedbackConfig {
            session_timeout: thirty_minutes,
            max_buffer_size: 10_000,
            min_dwell_ms: 500,
            compute_co_access: true,
            co_access_min_views: 2,
        }
    }
}
264
265pub struct FeedbackCollector {
269 config: FeedbackConfig,
270 sessions: Arc<RwLock<HashMap<SessionId, ActiveSession>>>,
272 signals: Arc<RwLock<VecDeque<FeedbackSignal>>>,
274 total_signals: AtomicU64,
276 total_sessions: AtomicU64,
278}
279
280impl FeedbackCollector {
281 pub fn new() -> Self {
283 Self::with_config(FeedbackConfig::default())
284 }
285
286 pub fn with_config(config: FeedbackConfig) -> Self {
288 Self {
289 config,
290 sessions: Arc::new(RwLock::new(HashMap::new())),
291 signals: Arc::new(RwLock::new(VecDeque::new())),
292 total_signals: AtomicU64::new(0),
293 total_sessions: AtomicU64::new(0),
294 }
295 }
296
297 pub async fn start_session(&self) -> SessionId {
299 let session = ActiveSession::new();
300 let id = session.id;
301
302 let mut sessions = self.sessions.write().await;
303 sessions.insert(id, session);
304 self.total_sessions.fetch_add(1, Ordering::Relaxed);
305
306 id
307 }
308
309 pub async fn get_or_create_session(&self, session_id: Option<SessionId>) -> SessionId {
311 if let Some(id) = session_id {
312 let sessions = self.sessions.read().await;
313 if sessions.contains_key(&id) {
314 return id;
315 }
316 }
317 self.start_session().await
318 }
319
320 pub async fn record_query(
322 &self,
323 session_id: SessionId,
324 query_text: &str,
325 result_ids: Vec<Uuid>,
326 query_embedding: Option<Vec<f32>>,
327 ) -> QueryId {
328 let query_id = QueryId::new();
329
330 {
332 let mut sessions = self.sessions.write().await;
333 if let Some(session) = sessions.get_mut(&session_id) {
334 if let Some(prev_query) = &session.current_query {
336 if !prev_query.viewed.is_empty() {
337 self.emit_signal(FeedbackSignal::new(
338 session_id,
339 Some(query_id),
340 SignalType::FollowUp {
341 previous_query: prev_query.id,
342 viewed_entries: prev_query.viewed.clone(),
343 },
344 ))
345 .await;
346 }
347 }
348
349 session.current_query = Some(QueryContext {
351 id: query_id,
352 text: query_text.to_string(),
353 result_ids: result_ids.clone(),
354 timestamp: Instant::now(),
355 viewed: Vec::new(),
356 });
357 session.queries.push(query_id);
358 session.touch();
359 }
360 }
361
362 self.emit_signal(FeedbackSignal::new(
364 session_id,
365 Some(query_id),
366 SignalType::Query {
367 text: query_text.to_string(),
368 embedding: query_embedding,
369 result_ids,
370 },
371 ))
372 .await;
373
374 query_id
375 }
376
377 pub async fn record_view(&self, session_id: SessionId, entry_id: Uuid, position: usize) {
379 let query_id = {
380 let mut sessions = self.sessions.write().await;
381 if let Some(session) = sessions.get_mut(&session_id) {
382 session.viewed_entries.push(entry_id);
383 session.touch();
384
385 if let Some(query) = &mut session.current_query {
386 query.viewed.push(entry_id);
387 Some(query.id)
388 } else {
389 None
390 }
391 } else {
392 None
393 }
394 };
395
396 self.emit_signal(FeedbackSignal::new(
397 session_id,
398 query_id,
399 SignalType::View { entry_id, position },
400 ))
401 .await;
402 }
403
404 pub async fn record_select(&self, session_id: SessionId, entry_id: Uuid, position: usize) {
406 let query_id = {
407 let mut sessions = self.sessions.write().await;
408 if let Some(session) = sessions.get_mut(&session_id) {
409 session.selected_entries.push(entry_id);
410 session.touch();
411 session.current_query.as_ref().map(|q| q.id)
412 } else {
413 None
414 }
415 };
416
417 self.emit_signal(FeedbackSignal::new(
418 session_id,
419 query_id,
420 SignalType::Select { entry_id, position },
421 ))
422 .await;
423 }
424
425 pub async fn record_dismiss(&self, session_id: SessionId, entry_id: Uuid, position: usize) {
427 let query_id = {
428 let sessions = self.sessions.read().await;
429 sessions
430 .get(&session_id)
431 .and_then(|s| s.current_query.as_ref().map(|q| q.id))
432 };
433
434 self.emit_signal(FeedbackSignal::new(
435 session_id,
436 query_id,
437 SignalType::Dismiss { entry_id, position },
438 ))
439 .await;
440 }
441
442 pub async fn record_dwell(&self, session_id: SessionId, entry_id: Uuid, duration_ms: u64) {
444 if duration_ms < self.config.min_dwell_ms {
446 return;
447 }
448
449 let query_id = {
450 let sessions = self.sessions.read().await;
451 sessions
452 .get(&session_id)
453 .and_then(|s| s.current_query.as_ref().map(|q| q.id))
454 };
455
456 self.emit_signal(FeedbackSignal::new(
457 session_id,
458 query_id,
459 SignalType::Dwell {
460 entry_id,
461 duration_ms,
462 },
463 ))
464 .await;
465 }
466
467 pub async fn end_session(&self, session_id: SessionId) {
469 let session_data = {
470 let mut sessions = self.sessions.write().await;
471 sessions.remove(&session_id)
472 };
473
474 if let Some(session) = session_data {
475 self.emit_signal(FeedbackSignal::new(
477 session_id,
478 None,
479 SignalType::SessionEnd {
480 duration_secs: session.started_at.elapsed().as_secs(),
481 query_count: session.queries.len(),
482 },
483 ))
484 .await;
485
486 if self.config.compute_co_access
488 && session.viewed_entries.len() >= self.config.co_access_min_views
489 {
490 self.emit_signal(FeedbackSignal::new(
491 session_id,
492 None,
493 SignalType::CoAccess {
494 entry_ids: session.viewed_entries,
495 },
496 ))
497 .await;
498 }
499 }
500 }
501
502 pub async fn cleanup_expired_sessions(&self) {
504 let expired: Vec<SessionId> = {
505 let sessions = self.sessions.read().await;
506 sessions
507 .iter()
508 .filter(|(_, s)| s.is_expired(self.config.session_timeout))
509 .map(|(id, _)| *id)
510 .collect()
511 };
512
513 for session_id in expired {
514 self.end_session(session_id).await;
515 }
516 }
517
518 pub async fn drain_signals(&self) -> Vec<FeedbackSignal> {
520 let mut signals = self.signals.write().await;
521 signals.drain(..).collect()
522 }
523
524 pub async fn pending_signals(&self) -> usize {
526 self.signals.read().await.len()
527 }
528
529 pub fn total_signals(&self) -> u64 {
531 self.total_signals.load(Ordering::Relaxed)
532 }
533
534 pub fn total_sessions(&self) -> u64 {
536 self.total_sessions.load(Ordering::Relaxed)
537 }
538
539 pub async fn active_sessions(&self) -> usize {
541 self.sessions.read().await.len()
542 }
543
544 async fn emit_signal(&self, signal: FeedbackSignal) {
546 let mut signals = self.signals.write().await;
547 signals.push_back(signal);
548 self.total_signals.fetch_add(1, Ordering::Relaxed);
549
550 if signals.len() > self.config.max_buffer_size {
552 let to_remove = self.config.max_buffer_size / 10;
554 for _ in 0..to_remove {
555 signals.pop_front();
556 }
557 }
558 }
559}
560
561impl Default for FeedbackCollector {
562 fn default() -> Self {
563 Self::new()
564 }
565}
566
567#[derive(Debug, Clone)]
569pub struct ProcessedFeedback {
570 pub entry_id: Uuid,
572 pub relevance_delta: f32,
574 pub query_embedding: Option<Vec<f32>>,
576 pub confidence: f32,
578}
579
/// Weights used to turn raw feedback signals into relevance adjustments.
#[derive(Debug, Clone)]
pub struct FeedbackProcessor {
    /// Base relevance delta of a `View` signal.
    pub view_weight: f32,
    /// Base relevance delta of a `Select` signal; also scaled by the dwell
    /// factor for `Dwell` signals.
    pub select_weight: f32,
    /// Base relevance delta of a `Dismiss` signal (negative by default).
    pub dismiss_weight: f32,
    /// Multiplicative factor applied once per position: the weight of a
    /// signal at position `p` is scaled by `position_decay^p`.
    pub position_decay: f32,
    /// Dwell time, in milliseconds, at or above which the dwell factor
    /// reaches its maximum of 1.0.
    pub dwell_normalization_ms: f32,
}

impl Default for FeedbackProcessor {
    /// Defaults: view 0.1, select 0.5, dismiss -0.3, position decay 0.9,
    /// dwell normalised over 30 seconds.
    fn default() -> Self {
        Self {
            view_weight: 0.1,
            select_weight: 0.5,
            dismiss_weight: -0.3,
            position_decay: 0.9,
            dwell_normalization_ms: 30_000.0,
        }
    }
}
605
606impl FeedbackProcessor {
607 pub fn process(&self, signals: Vec<FeedbackSignal>) -> Vec<ProcessedFeedback> {
609 let mut feedback = Vec::new();
610 let mut query_embeddings: HashMap<QueryId, Option<Vec<f32>>> = HashMap::new();
611
612 for signal in &signals {
614 if let SignalType::Query {
615 embedding,
616 result_ids: _,
617 text: _,
618 } = &signal.signal
619 {
620 if let Some(query_id) = signal.query_id {
621 query_embeddings.insert(query_id, embedding.clone());
622 }
623 }
624 }
625
626 for signal in signals {
628 let query_embedding = signal
629 .query_id
630 .and_then(|qid| query_embeddings.get(&qid).cloned())
631 .flatten();
632
633 match signal.signal {
634 SignalType::View { entry_id, position } => {
635 let position_factor = self.position_decay.powi(position as i32);
636 feedback.push(ProcessedFeedback {
637 entry_id,
638 relevance_delta: self.view_weight * position_factor,
639 query_embedding,
640 confidence: 0.3 * position_factor,
641 });
642 }
643
644 SignalType::Select { entry_id, position } => {
645 let position_factor = self.position_decay.powi(position as i32);
646 feedback.push(ProcessedFeedback {
647 entry_id,
648 relevance_delta: self.select_weight * position_factor,
649 query_embedding,
650 confidence: 0.8,
651 });
652 }
653
654 SignalType::Dismiss { entry_id, position } => {
655 let position_factor = self.position_decay.powi(position as i32);
656 feedback.push(ProcessedFeedback {
657 entry_id,
658 relevance_delta: self.dismiss_weight * position_factor,
659 query_embedding,
660 confidence: 0.6,
661 });
662 }
663
664 SignalType::Dwell {
665 entry_id,
666 duration_ms,
667 } => {
668 let dwell_factor = (duration_ms as f32 / self.dwell_normalization_ms).min(1.0);
669 feedback.push(ProcessedFeedback {
670 entry_id,
671 relevance_delta: self.select_weight * dwell_factor,
672 query_embedding,
673 confidence: 0.5 * dwell_factor,
674 });
675 }
676
677 SignalType::CoAccess { entry_ids } => {
678 for i in 0..entry_ids.len() {
681 for j in (i + 1)..entry_ids.len() {
682 feedback.push(ProcessedFeedback {
683 entry_id: entry_ids[i],
684 relevance_delta: 0.2, query_embedding: None,
686 confidence: 0.4,
687 });
688 feedback.push(ProcessedFeedback {
689 entry_id: entry_ids[j],
690 relevance_delta: 0.2,
691 query_embedding: None,
692 confidence: 0.4,
693 });
694 }
695 }
696 }
697
698 _ => {}
700 }
701 }
702
703 self.aggregate_feedback(feedback)
705 }
706
707 fn aggregate_feedback(&self, feedback: Vec<ProcessedFeedback>) -> Vec<ProcessedFeedback> {
709 let mut by_entry: HashMap<Uuid, Vec<ProcessedFeedback>> = HashMap::new();
710
711 for fb in feedback {
712 by_entry.entry(fb.entry_id).or_default().push(fb);
713 }
714
715 by_entry
716 .into_iter()
717 .map(|(entry_id, signals)| {
718 let total_weight: f32 = signals.iter().map(|s| s.confidence).sum();
719 let weighted_delta: f32 = signals
720 .iter()
721 .map(|s| s.relevance_delta * s.confidence)
722 .sum::<f32>()
723 / total_weight.max(0.001);
724
725 let avg_confidence = total_weight / signals.len() as f32;
726
727 let query_embedding = signals.into_iter().rev().find_map(|s| s.query_embedding);
729
730 ProcessedFeedback {
731 entry_id,
732 relevance_delta: weighted_delta,
733 query_embedding,
734 confidence: avg_confidence,
735 }
736 })
737 .collect()
738 }
739}
740
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns `true` when any signal's payload satisfies `pred`.
    fn any_signal(signals: &[FeedbackSignal], pred: impl Fn(&SignalType) -> bool) -> bool {
        signals.iter().any(|s| pred(&s.signal))
    }

    /// Builds a signal with a fresh session id and a fresh query id.
    fn standalone(signal: SignalType) -> FeedbackSignal {
        FeedbackSignal::new(SessionId::new(), Some(QueryId::new()), signal)
    }

    #[tokio::test]
    async fn test_session_lifecycle() {
        let collector = FeedbackCollector::new();

        let session_id = collector.start_session().await;
        assert_eq!(collector.active_sessions().await, 1);

        collector.end_session(session_id).await;
        assert_eq!(collector.active_sessions().await, 0);
    }

    #[tokio::test]
    async fn test_query_recording() {
        let collector = FeedbackCollector::new();
        let session_id = collector.start_session().await;
        let results = vec![Uuid::new_v4(), Uuid::new_v4()];

        let _query_id = collector
            .record_query(session_id, "test query", results, None)
            .await;

        let drained = collector.drain_signals().await;
        assert_eq!(drained.len(), 1);

        if let SignalType::Query {
            text, result_ids, ..
        } = &drained[0].signal
        {
            assert_eq!(text, "test query");
            assert_eq!(result_ids.len(), 2);
        } else {
            panic!("Expected Query signal");
        }
    }

    #[tokio::test]
    async fn test_view_and_select() {
        let collector = FeedbackCollector::new();
        let session_id = collector.start_session().await;

        let first = Uuid::new_v4();
        let second = Uuid::new_v4();

        collector
            .record_query(session_id, "test", vec![first, second], None)
            .await;
        collector.record_view(session_id, first, 0).await;
        collector.record_select(session_id, first, 0).await;

        // One Query, one View and one Select signal.
        let drained = collector.drain_signals().await;
        assert_eq!(drained.len(), 3);
    }

    #[tokio::test]
    async fn test_follow_up_detection() {
        let collector = FeedbackCollector::new();
        let session_id = collector.start_session().await;
        let viewed = Uuid::new_v4();

        collector
            .record_query(session_id, "first query", vec![viewed], None)
            .await;
        collector.record_view(session_id, viewed, 0).await;

        // A second query after a view must be reported as a follow-up.
        collector
            .record_query(session_id, "follow up query", vec![], None)
            .await;

        let drained = collector.drain_signals().await;
        assert!(any_signal(&drained, |s| matches!(
            s,
            SignalType::FollowUp { .. }
        )));
    }

    #[tokio::test]
    async fn test_co_access_on_session_end() {
        let config = FeedbackConfig {
            co_access_min_views: 2,
            ..FeedbackConfig::default()
        };
        let collector = FeedbackCollector::with_config(config);
        let session_id = collector.start_session().await;

        let first = Uuid::new_v4();
        let second = Uuid::new_v4();

        collector
            .record_query(session_id, "test", vec![first, second], None)
            .await;
        collector.record_view(session_id, first, 0).await;
        collector.record_view(session_id, second, 1).await;

        collector.end_session(session_id).await;

        let drained = collector.drain_signals().await;
        assert!(any_signal(&drained, |s| matches!(
            s,
            SignalType::CoAccess { .. }
        )));
    }

    #[tokio::test]
    async fn test_dwell_filtering() {
        let config = FeedbackConfig {
            min_dwell_ms: 500,
            ..FeedbackConfig::default()
        };
        let collector = FeedbackCollector::with_config(config);

        let session_id = collector.start_session().await;
        let entry = Uuid::new_v4();

        // Below the threshold: nothing is buffered.
        collector.record_dwell(session_id, entry, 100).await;
        assert_eq!(collector.pending_signals().await, 0);

        // At or above the threshold: the dwell is recorded.
        collector.record_dwell(session_id, entry, 1000).await;
        assert_eq!(collector.pending_signals().await, 1);
    }

    #[test]
    fn test_feedback_processing() {
        let processor = FeedbackProcessor::default();
        let entry = Uuid::new_v4();

        let signals = vec![
            standalone(SignalType::View {
                entry_id: entry,
                position: 0,
            }),
            standalone(SignalType::Select {
                entry_id: entry,
                position: 0,
            }),
        ];

        // Both signals target the same entry, so they collapse into one item.
        let processed = processor.process(signals);
        assert_eq!(processed.len(), 1);
        assert!(processed[0].relevance_delta > 0.0);
    }

    #[test]
    fn test_position_decay() {
        let processor = FeedbackProcessor::default();

        let top = Uuid::new_v4();
        let lower = Uuid::new_v4();

        let signals = vec![
            standalone(SignalType::Select {
                entry_id: top,
                position: 0,
            }),
            standalone(SignalType::Select {
                entry_id: lower,
                position: 5,
            }),
        ];

        let processed = processor.process(signals);
        assert_eq!(processed.len(), 2);

        let delta_of = |id: Uuid| {
            processed
                .iter()
                .find(|p| p.entry_id == id)
                .unwrap()
                .relevance_delta
        };

        // A selection further down the list must weigh less.
        assert!(delta_of(top) > delta_of(lower));
    }
}