use crate::config::Config;
use crate::error::{MemvidError, Result};
use crate::ml::embedding::{EmbeddingConfig, EmbeddingModel};
use crate::ml::index::IndexManager;
use crate::qr::decoder::QrDecoder;
use crate::storage::database::Database;
use crate::text::ChunkMetadata;
use crate::video::decoder::{VideoDecoder, VideoInfo};
use lru::LruCache;
use std::path::Path;
/// A single hit produced by `MemvidRetriever::search_with_metadata`.
#[derive(Debug, Clone)]
pub struct SearchResult {
/// Relevance score: `1.0 - distance` when the result comes from the index
/// manager, cosine similarity when it comes from stored embeddings.
pub score: f32,
/// Text of the matched chunk.
pub text: String,
/// Metadata describing the matched chunk; both search paths in this file
/// populate it with `Some(..)`.
pub metadata: Option<ChunkMetadata>,
}
/// Read-side handle over an encoded video and its chunk database.
///
/// Offers semantic search over chunks, chunk lookup by id or frame, and
/// QR decoding of individual video frames with an in-memory frame cache.
pub struct MemvidRetriever {
// Configuration exposed through `config()`; `new` fills it with `Config::default()`.
config: Config,
// Lossy string form of the video path passed to `new`.
video_path: String,
// Lossy string form of the database path passed to `new`.
database_path: String,
// Chunk database opened from `database_path`.
database: Database,
// Extracts frames (and video info) from the file at `video_path`.
video_decoder: VideoDecoder,
// Decodes the QR payload of an extracted frame image.
qr_decoder: QrDecoder,
// `frame_cache`: frame number -> decoded QR text, created with capacity 1000 in `new`.
// `embedding_model`: encodes search queries into embedding vectors.
frame_cache: LruCache<u32, String>, embedding_model: EmbeddingModel,
// Optional vector index; `new` leaves it as `None`, in which case the
// search methods score against embeddings stored in the database.
index_manager: Option<IndexManager>,
}
impl MemvidRetriever {
pub async fn new<P1: AsRef<Path>, P2: AsRef<Path>>(
video_file: P1,
database_file: P2,
) -> Result<Self> {
let video_path = video_file.as_ref().to_string_lossy().to_string();
let database_path = database_file.as_ref().to_string_lossy().to_string();
if !video_file.as_ref().exists() {
return Err(MemvidError::Io(std::io::Error::new(
std::io::ErrorKind::NotFound,
format!("Video file not found: {}", video_path),
)));
}
if !database_file.as_ref().exists() {
return Err(MemvidError::Io(std::io::Error::new(
std::io::ErrorKind::NotFound,
format!("Database file not found: {}", database_path),
)));
}
let database = Database::new(&database_path)?;
let video_decoder = VideoDecoder::new()?;
let qr_decoder = QrDecoder::new();
let embedding_config = EmbeddingConfig::default();
let embedding_model = EmbeddingModel::new(embedding_config).await?;
log::info!(
"MemvidRetriever initialized for {} with database {}",
video_path,
database_path
);
Ok(Self {
config: Config::default(),
video_path,
database_path,
database,
video_decoder,
qr_decoder,
frame_cache: LruCache::new(std::num::NonZeroUsize::new(1000).unwrap()), embedding_model,
index_manager: None,
})
}
pub async fn search(&mut self, query: &str, top_k: usize) -> Result<Vec<(f32, String)>> {
log::info!("Searching for: '{}' (top {})", query, top_k);
let query_embedding = self.embedding_model.encode(query)?;
if let Some(ref index_manager) = self.index_manager {
log::info!(
"🧠 Using TRUE SEMANTIC SEARCH with IndexManager for query: '{}'",
query
);
let search_results = index_manager.search(&query_embedding, top_k)?;
let mut results = Vec::new();
for result in search_results {
if let Some(chunk) = index_manager.get_chunk_by_id(result.id) {
let score = 1.0 - result.distance; results.push((score, chunk.text.clone()));
}
}
log::info!(
"Found {} TRUE SEMANTIC results for query '{}'",
results.len(),
query
);
return Ok(results);
}
let all_chunks = self.database.search_chunks("", top_k * 10)?; let chunks_with_embeddings: Vec<_> = all_chunks
.iter()
.filter(|chunk| chunk.embedding.is_some())
.collect();
if chunks_with_embeddings.is_empty() {
log::warn!(
"❌ NO SEMANTIC EMBEDDINGS FOUND: The database contains no stored embeddings for semantic search"
);
log::warn!(
"💡 SOLUTION: Re-encode the video with embedding generation enabled, or use a system with IndexManager"
);
log::warn!(
"🚫 REFUSING to fall back to keyword search as it may provide misleading results"
);
return Err(crate::error::MemvidError::MachineLearning(
"No semantic embeddings available in database. Refusing keyword fallback to avoid misleading results. Please re-encode video with embeddings enabled.".to_string()
));
}
log::info!(
"🧠 Using TRUE SEMANTIC SEARCH with stored embeddings for query: '{}'",
query
);
let chunks_with_embeddings = self.database.search_chunks("", top_k * 50)?; let valid_chunks: Vec<_> = chunks_with_embeddings
.into_iter()
.filter(|chunk| chunk.embedding.is_some())
.collect();
if valid_chunks.is_empty() {
return Ok(vec![]);
}
let mut results = Vec::new();
let frame_numbers_for_prefetch: Vec<u32> = valid_chunks
.iter()
.filter_map(|chunk| chunk.frame)
.take(5)
.collect();
for chunk in valid_chunks {
if let Some(ref chunk_embedding) = chunk.embedding {
let similarity = self.compute_cosine_similarity(&query_embedding, chunk_embedding);
results.push((similarity, chunk.text));
}
}
results.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap());
results.truncate(top_k);
log::info!(
"📊 TRUE SEMANTIC RESULTS: Found {} embedding-based results for query '{}'",
results.len(),
query
);
log::info!("Found {} results for query '{}'", results.len(), query);
if !results.is_empty() && frame_numbers_for_prefetch.len() > 1 {
let limited_prefetch: Vec<u32> = frame_numbers_for_prefetch
.into_iter()
.take(3) .collect();
if !limited_prefetch.is_empty() {
log::debug!(
"🧠 LLM-optimized prefetching {} most relevant frames",
limited_prefetch.len()
);
self.prefetch_frames(limited_prefetch).await?;
}
}
Ok(results)
}
pub async fn search_with_metadata(
&mut self,
query: &str,
top_k: usize,
) -> Result<Vec<SearchResult>> {
log::info!("Searching with metadata for: '{}' (top {})", query, top_k);
let query_embedding = self.embedding_model.encode(query)?;
if let Some(ref index_manager) = self.index_manager {
log::info!(
"🧠 Using TRUE SEMANTIC SEARCH with IndexManager for metadata query: '{}'",
query
);
let search_results = index_manager.search(&query_embedding, top_k)?;
let mut results = Vec::new();
for result in search_results {
if let Some(chunk) = index_manager.get_chunk_by_id(result.id) {
let score = 1.0 - result.distance; results.push(SearchResult {
score,
text: chunk.text.clone(),
metadata: Some(crate::text::ChunkMetadata {
id: chunk.id,
text: chunk.text.clone(),
source: Some("".to_string()), page: None,
offset: 0,
length: chunk.length,
frame: Some(chunk.frame_number as u32),
embedding: None,
}),
});
}
}
log::info!(
"Found {} TRUE SEMANTIC results with metadata for query '{}'",
results.len(),
query
);
return Ok(results);
}
let all_chunks = self.database.search_chunks("", top_k * 10)?; let chunks_with_embeddings: Vec<_> = all_chunks
.iter()
.filter(|chunk| chunk.embedding.is_some())
.collect();
if chunks_with_embeddings.is_empty() {
log::warn!(
"❌ NO SEMANTIC EMBEDDINGS FOUND: The database contains no stored embeddings for semantic search"
);
log::warn!(
"💡 SOLUTION: Re-encode the video with embedding generation enabled, or use a system with IndexManager"
);
log::warn!(
"🚫 REFUSING to fall back to keyword search as it may provide misleading results"
);
return Err(crate::error::MemvidError::MachineLearning(
"No semantic embeddings available in database. Refusing keyword fallback to avoid misleading results. Please re-encode video with embeddings enabled.".to_string()
));
}
log::info!(
"🧠 Using TRUE SEMANTIC SEARCH with stored embeddings for metadata query: '{}'",
query
);
let chunks_with_embeddings = self.database.search_chunks("", top_k * 50)?; let valid_chunks: Vec<_> = chunks_with_embeddings
.into_iter()
.filter(|chunk| chunk.embedding.is_some())
.collect();
if valid_chunks.is_empty() {
return Ok(vec![]);
}
let mut results = Vec::new();
for chunk in valid_chunks {
let score = if let Some(ref chunk_embedding) = chunk.embedding {
self.compute_cosine_similarity(&query_embedding, chunk_embedding)
} else {
continue; };
results.push(SearchResult {
score,
text: chunk.text.clone(),
metadata: Some(chunk),
});
}
results.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap());
results.truncate(top_k);
log::info!(
"📊 TRUE SEMANTIC METADATA RESULTS: Found {} embedding-based results for query '{}'",
results.len(),
query
);
Ok(results)
}
/// Looks up a chunk by id and returns its text.
///
/// Yields `Ok(None)` when the database has no chunk with that id; database
/// errors are propagated.
pub async fn get_chunk_by_id(&self, chunk_id: usize) -> Result<Option<String>> {
    log::info!("Retrieving chunk by ID: {}", chunk_id);
    match self.database.get_chunk_by_id(chunk_id)? {
        Some(record) => Ok(Some(record.text)),
        None => Ok(None),
    }
}
/// Collects the texts of the chunks whose ids surround `chunk_id`.
///
/// The window spans `window_size / 2` ids on each side of `chunk_id` (the
/// lower bound saturates at 0). Ids with no chunk in the database are
/// skipped, and an unknown `chunk_id` yields an empty vector. Texts are
/// returned in ascending id order.
///
/// # Errors
///
/// Propagates any database lookup error.
pub async fn get_context_window(
    &self,
    chunk_id: usize,
    window_size: usize,
) -> Result<Vec<String>> {
    log::info!(
        "Getting context window for chunk {} with size {}",
        chunk_id,
        window_size
    );

    // Without the anchor chunk there is no context to report.
    if self.database.get_chunk_by_id(chunk_id)?.is_none() {
        return Ok(vec![]);
    }

    let radius = window_size / 2;
    let first_id = chunk_id.saturating_sub(radius);
    let last_id = chunk_id + radius;

    let mut context = Vec::new();
    for neighbour_id in first_id..=last_id {
        let neighbour = self.database.get_chunk_by_id(neighbour_id)?;
        // Extending with an `Option` appends the text only when the chunk exists.
        context.extend(neighbour.map(|chunk| chunk.text));
    }
    Ok(context)
}
/// Returns the chunks the database associates with `frame_number`.
///
/// Logs the request at info level and then delegates to the database's
/// `get_chunks_by_frame`; its result, including any error, is returned as is.
pub async fn get_chunks_by_frame(&self, frame_number: u32) -> Result<Vec<ChunkMetadata>> {
log::info!("Retrieving chunks for frame {}", frame_number);
self.database.get_chunks_by_frame(frame_number)
}
/// Returns the QR text stored in `frame_number`, consulting the frame cache first.
///
/// On a cache miss the frame is extracted from the video, QR-decoded, and
/// the decoded text is stored in the cache before being returned.
///
/// # Errors
///
/// Propagates frame-extraction and QR-decoding errors; nothing is cached in
/// that case.
pub async fn decode_frame(&mut self, frame_number: u32) -> Result<String> {
    let cached = self.frame_cache.get(&frame_number).cloned();
    if let Some(text) = cached {
        log::debug!("Frame {} content retrieved from cache", frame_number);
        return Ok(text);
    }

    log::info!("Decoding QR content from frame {}", frame_number);
    let image = self
        .video_decoder
        .extract_frame(&self.video_path, frame_number)
        .await?;
    let text = self.qr_decoder.decode_image(&image)?.text;

    // The cache keeps its own copy; the caller receives the original.
    self.frame_cache.put(frame_number, text.clone());
    Ok(text)
}
/// Returns the video decoder's `VideoInfo` for the file at `video_path`.
///
/// Pure delegation: the decoder's result, including any error, is returned as is.
pub async fn get_video_info(&self) -> Result<VideoInfo> {
self.video_decoder.get_video_info(&self.video_path).await
}
/// Decodes the not-yet-cached frames among `frame_numbers` and stores their
/// QR text in the frame cache.
///
/// Frames are processed in batches of at most three. Within a batch every
/// frame is decoded in its own spawned task (each with its own
/// `VideoDecoder` and `QrDecoder`), and the batch is awaited before the next
/// one starts. Failures are logged and counted but never abort the prefetch,
/// so this method always returns `Ok(())`.
pub async fn prefetch_frames_parallel(&mut self, frame_numbers: Vec<u32>) -> Result<()> {
    log::info!(
        "LLM-optimized parallel prefetching {} frames",
        frame_numbers.len()
    );

    // Only frames missing from the cache need any work.
    let mut pending: Vec<u32> = Vec::new();
    for frame_num in frame_numbers {
        if !self.frame_cache.contains(&frame_num) {
            pending.push(frame_num);
        }
    }
    if pending.is_empty() {
        log::debug!("All frames already cached, skipping prefetch");
        return Ok(());
    }
    log::info!("Need to fetch {} new frames", pending.len());

    // `pending` is non-empty here, so the batch size is between 1 and 3.
    let batch_size = pending.len().min(3);
    let mut successful_count = 0;
    let mut failed_count = 0;

    for batch in pending.chunks(batch_size) {
        // Spawn the whole batch first so its frames decode concurrently.
        let handles: Vec<_> = batch
            .iter()
            .map(|&frame_number| {
                let video_path = self.video_path.clone();
                tokio::spawn(async move {
                    let decoder = VideoDecoder::new()?;
                    let image = decoder.extract_frame(&video_path, frame_number).await?;
                    let qr = QrDecoder::new();
                    let decoded = qr.decode_image(&image)?;
                    Ok::<(u32, String), MemvidError>((frame_number, decoded.text))
                })
            })
            .collect();

        for handle in handles {
            let outcome = handle.await;
            match outcome {
                Err(e) => {
                    log::warn!("Task failed: {}", e);
                    failed_count += 1;
                }
                Ok(Err(e)) => {
                    log::warn!("Failed to decode frame: {}", e);
                    failed_count += 1;
                }
                Ok(Ok((frame_number, content))) => {
                    self.frame_cache.put(frame_number, content);
                    successful_count += 1;
                }
            }
        }
    }

    log::info!(
        "LLM-optimized prefetch completed: {} successful, {} failed",
        successful_count,
        failed_count
    );
    Ok(())
}
/// Prefetches `frame_numbers` into the frame cache.
///
/// Currently a direct alias for `prefetch_frames_parallel`.
pub async fn prefetch_frames(&mut self, frame_numbers: Vec<u32>) -> Result<()> {
self.prefetch_frames_parallel(frame_numbers).await
}
/// Decodes the not-yet-cached frames among `frame_numbers` one after another
/// and stores their QR text in the frame cache.
///
/// A frame that fails to decode is logged at warn level and skipped, so this
/// method always returns `Ok(())`.
pub async fn prefetch_frames_serial(&mut self, frame_numbers: Vec<u32>) -> Result<()> {
    log::info!("Serial prefetching {} frames", frame_numbers.len());
    for frame_number in frame_numbers {
        if self.frame_cache.contains(&frame_number) {
            continue;
        }
        let decoded = self.decode_frame_internal(frame_number).await;
        match decoded {
            Err(e) => {
                log::warn!("Failed to prefetch frame {}: {}", frame_number, e);
            }
            Ok(content) => {
                self.frame_cache.put(frame_number, content);
            }
        }
    }
    Ok(())
}
/// Extracts `frame_number` from the video and returns its decoded QR text.
///
/// Neither reads nor writes the frame cache; callers handle caching.
async fn decode_frame_internal(&self, frame_number: u32) -> Result<String> {
    let image = self
        .video_decoder
        .extract_frame(&self.video_path, frame_number)
        .await?;
    let text = self.qr_decoder.decode_image(&image)?.text;
    Ok(text)
}
/// Empties the frame cache and logs how many entries were dropped.
pub fn clear_cache(&mut self) {
    // Capture the size before clearing so the log reports what was removed.
    let removed = self.frame_cache.len();
    self.frame_cache.clear();
    log::info!("Frame cache cleared ({} entries removed)", removed);
}
/// Builds a [`RetrievalStats`] snapshot from the database statistics and the
/// current frame-cache size.
///
/// `cache_hits`, `cache_misses` and `average_search_time` are not tracked
/// and are always reported as zero.
///
/// # Errors
///
/// Propagates any error from the database's `get_stats`.
pub fn get_stats(&self) -> Result<RetrievalStats> {
    let db_stats = self.database.get_stats()?;
    let cached_frames = self.frame_cache.len();
    let snapshot = RetrievalStats {
        total_chunks: db_stats.chunk_count,
        total_frames: db_stats.frame_count,
        cache_hits: 0,
        cache_misses: 0,
        cached_frames,
        database_size_bytes: db_stats.file_size_bytes,
        average_search_time: 0.0,
    };
    Ok(snapshot)
}
/// Path of the video file, as the lossy string captured in `new`.
pub fn video_path(&self) -> &str {
&self.video_path
}
/// Path of the database file, as the lossy string captured in `new`.
pub fn database_path(&self) -> &str {
&self.database_path
}
/// Configuration held by this retriever (`Config::default()` when built via `new`).
pub fn config(&self) -> &Config {
&self.config
}
/// Cosine similarity of two embedding vectors.
///
/// Returns `0.0` ("no similarity") instead of a meaningless number when:
///
/// * the vectors differ in length — pairing elements would silently
///   truncate the dot product while the norms still covered the full
///   vectors;
/// * either vector has zero magnitude;
/// * the quotient is not finite (NaN or infinite inputs, or overflow), so
///   callers that rank by score never see NaN.
fn compute_cosine_similarity(&self, a: &[f32], b: &[f32]) -> f32 {
    if a.len() != b.len() {
        return 0.0;
    }

    // One pass accumulates the dot product and both squared norms.
    let (dot_product, norm_a_sq, norm_b_sq) = a.iter().zip(b.iter()).fold(
        (0.0f32, 0.0f32, 0.0f32),
        |(dot, sq_a, sq_b), (x, y)| (dot + x * y, sq_a + x * x, sq_b + y * y),
    );
    let norm_a = norm_a_sq.sqrt();
    let norm_b = norm_b_sq.sqrt();
    if norm_a == 0.0 || norm_b == 0.0 {
        return 0.0;
    }

    let similarity = dot_product / (norm_a * norm_b);
    if similarity.is_finite() {
        similarity
    } else {
        0.0
    }
}
}
/// Snapshot of retriever statistics, as produced by `MemvidRetriever::get_stats`.
#[derive(Debug, Clone)]
pub struct RetrievalStats {
/// Chunk count reported by the database.
pub total_chunks: usize,
/// Frame count reported by the database.
pub total_frames: usize,
/// Cache hits; `get_stats` currently always reports 0.
pub cache_hits: usize,
/// Cache misses; `get_stats` currently always reports 0.
pub cache_misses: usize,
/// Number of entries currently held in the frame cache.
pub cached_frames: usize,
/// Database file size in bytes, as reported by the database.
pub database_size_bytes: usize,
/// Average search time; `get_stats` currently always reports 0.0.
pub average_search_time: f64,
}
// Integration-style tests: each one encodes a small memory with `MemvidEncoder`
// and then exercises `MemvidRetriever` against the resulting files.
#[cfg(test)]
mod tests {
use super::*;
use crate::api::MemvidEncoder;
use tempfile;
// Encodes five fixed chunks into `test.mp4` / `test_index.db` inside a fresh
// temporary directory and returns (video path, index path, chunks, temp dir).
// The `TempDir` is handed back to the caller, presumably so the directory
// outlives the test body.
async fn setup_test_memory() -> (String, String, Vec<String>, tempfile::TempDir) {
let chunks = vec![
"Quantum computing uses qubits for parallel processing".to_string(),
"Machine learning models require large datasets".to_string(),
"Neural networks mimic brain structure".to_string(),
"Cloud computing provides scalable resources".to_string(),
"Blockchain ensures data immutability".to_string(),
];
let mut encoder = MemvidEncoder::new(None).await.unwrap();
encoder.add_chunks(chunks.clone()).unwrap();
let temp_dir = tempfile::tempdir().unwrap();
let video_file = temp_dir
.path()
.join("test.mp4")
.to_string_lossy()
.to_string();
let index_file = temp_dir
.path()
.join("test_index.db")
.to_string_lossy()
.to_string();
encoder.build_video(&video_file, &index_file).await.unwrap();
(video_file, index_file, chunks, temp_dir)
}
// A freshly opened retriever reports the path it was given and one frame per chunk.
#[tokio::test]
async fn test_retriever_initialization() {
let (video_file, index_file, chunks, _temp_dir) = setup_test_memory().await;
let retriever = MemvidRetriever::new(&video_file, &index_file)
.await
.unwrap();
assert_eq!(retriever.video_path(), video_file);
let stats = retriever.get_stats().unwrap();
assert_eq!(stats.total_frames, chunks.len());
}
// Each query must return between 1 and `top_k` results.
#[tokio::test]
async fn test_search() {
let (video_file, index_file, _chunks, _temp_dir) = setup_test_memory().await;
let mut retriever = MemvidRetriever::new(&video_file, &index_file)
.await
.unwrap();
let results = retriever.search("quantum computing", 3).await.unwrap();
assert!(results.len() <= 3);
assert!(!results.is_empty());
let results = retriever.search("machine learning", 3).await.unwrap();
assert!(results.len() <= 3);
assert!(!results.is_empty());
let results = retriever.search("blockchain", 3).await.unwrap();
assert!(results.len() <= 3);
assert!(!results.is_empty()); }
// Metadata search respects `top_k`; the first result, if any, must have a
// positive score, non-empty text and attached metadata.
#[tokio::test]
async fn test_search_with_metadata() {
let (video_file, index_file, _chunks, _temp_dir) = setup_test_memory().await;
let mut retriever = MemvidRetriever::new(&video_file, &index_file)
.await
.unwrap();
let results = retriever
.search_with_metadata("blockchain", 2)
.await
.unwrap();
assert!(results.len() <= 2);
// An empty result set is tolerated here; only a present first hit is inspected.
if !results.is_empty() {
let result = &results[0];
assert!(result.score > 0.0);
assert!(!result.text.is_empty());
assert!(result.metadata.is_some());
}
}
// Chunk id 0 is expected to be the first chunk added ("Quantum ..."); an
// unknown id yields `None`.
#[tokio::test]
async fn test_get_chunk_by_id() {
let (video_file, index_file, _chunks, _temp_dir) = setup_test_memory().await;
let retriever = MemvidRetriever::new(&video_file, &index_file)
.await
.unwrap();
let chunk = retriever.get_chunk_by_id(0).await.unwrap();
assert!(chunk.is_some());
assert!(chunk.unwrap().to_lowercase().contains("quantum"));
let chunk = retriever.get_chunk_by_id(999).await.unwrap();
assert!(chunk.is_none());
}
// The frame cache starts empty and is empty again after `clear_cache`.
#[tokio::test]
async fn test_cache_operations() {
let (video_file, index_file, _chunks, _temp_dir) = setup_test_memory().await;
let mut retriever = MemvidRetriever::new(&video_file, &index_file)
.await
.unwrap();
let initial_stats = retriever.get_stats().unwrap();
assert_eq!(initial_stats.cached_frames, 0);
// The decode result is deliberately ignored: the test passes whether or
// not decoding succeeds, it only checks the cache is empty after clearing.
let _ = retriever.decode_frame(0).await;
retriever.clear_cache();
let stats_after_clear = retriever.get_stats().unwrap();
assert_eq!(stats_after_clear.cached_frames, 0);
}
// Stats of a fresh retriever: one frame per chunk, non-empty database file,
// nothing cached yet.
#[tokio::test]
async fn test_retriever_stats() {
let (video_file, index_file, chunks, _temp_dir) = setup_test_memory().await;
let retriever = MemvidRetriever::new(&video_file, &index_file)
.await
.unwrap();
let stats = retriever.get_stats().unwrap();
assert_eq!(stats.total_frames, chunks.len());
assert!(stats.database_size_bytes > 0);
assert_eq!(stats.cached_frames, 0); }
// The encoded video must report positive dimensions, frame rate and frame count.
#[tokio::test]
async fn test_video_info() {
let (video_file, index_file, _chunks, _temp_dir) = setup_test_memory().await;
let retriever = MemvidRetriever::new(&video_file, &index_file)
.await
.unwrap();
let video_info = retriever.get_video_info().await.unwrap();
assert!(video_info.width > 0);
assert!(video_info.height > 0);
assert!(video_info.fps > 0.0);
assert!(video_info.frame_count > 0);
}
// A window of size 3 around chunk 1 covers ids 0..=2 (half-window of 1), so
// at most three texts can come back; the `<= 4` bound below is looser than that.
#[tokio::test]
async fn test_context_window() {
let (video_file, index_file, _chunks, _temp_dir) = setup_test_memory().await;
let retriever = MemvidRetriever::new(&video_file, &index_file)
.await
.unwrap();
let context = retriever.get_context_window(1, 3).await.unwrap();
assert!(!context.is_empty());
assert!(context.len() <= 4); }
// Every chunk returned for frame 0 must have text and be tagged with frame 0.
#[tokio::test]
async fn test_chunks_by_frame() {
let (video_file, index_file, _chunks, _temp_dir) = setup_test_memory().await;
let retriever = MemvidRetriever::new(&video_file, &index_file)
.await
.unwrap();
let chunks = retriever.get_chunks_by_frame(0).await.unwrap();
assert!(!chunks.is_empty());
for chunk in chunks {
assert!(!chunk.text.is_empty());
assert_eq!(chunk.frame, Some(0));
}
}
}