//! smgrep 0.5.1
//!
//! Semantic code search tool with GPU acceleration - Rust port of osgrep
use std::{
   collections::HashSet,
   path::{Path, PathBuf},
};

use indicatif::ProgressBar;
use rayon::prelude::*;

pub use crate::types::SyncProgress;
use crate::{
   Result, Str,
   chunker::{Chunker, anchor::create_anchor_chunk},
   config,
   embed::Embedder,
   file::FileSystem,
   meta::{FileHash, MetaStore},
   store::Store,
   types::{PreparedChunk, VectorRecord},
};

/// Returns the modification time of `path` as whole seconds since the Unix
/// epoch.
///
/// Yields `0` when the metadata cannot be read, the platform does not report
/// a modification time, or the timestamp lies before the epoch.
fn get_mtime(path: &Path) -> u64 {
   let Ok(modified) = path.metadata().and_then(|meta| meta.modified()) else {
      return 0;
   };

   match modified.duration_since(std::time::UNIX_EPOCH) {
      Ok(elapsed) => elapsed.as_secs(),
      Err(_) => 0,
   }
}

/// Keeps a vector store in step with a tree of files on disk.
///
/// The engine combines four collaborators: a [`FileSystem`] that lists the
/// files to consider, a [`Chunker`] that splits file contents into chunks, an
/// [`Embedder`] that computes embeddings for the chunk text, and a [`Store`]
/// that receives the resulting records.
pub struct SyncEngine<F: FileSystem, E: Embedder, S: Store> {
   /// Lists the files under the sync root (`get_files`).
   file_system: F,
   /// Splits file contents into chunks (`chunk`).
   chunker:     Chunker,
   /// Computes embeddings for chunk text (`compute_hybrid`).
   embedder:    E,
   /// Destination for records; also asked to delete files and build indexes.
   store:       S,
}

/// Counters describing the outcome of a sync run.
#[derive(Debug, Clone)]
pub struct SyncResult {
   /// Files that were read and hashed because their modification time did
   /// not match the stored one.
   pub processed: usize,
   /// Files that were embedded and stored (or, in a dry run, that would be).
   pub indexed:   usize,
   /// Hashed files whose content hash matched the stored hash.
   pub skipped:   usize,
   /// Paths present in the metadata store but absent from the file listing.
   pub deleted:   usize,
}

/// Receiver for progress updates emitted while a sync is running.
///
/// Implementors must be `Send`.
pub trait SyncProgressCallback: Send {
   /// Handles one progress update.
   fn progress(&mut self, progress: SyncProgress);
}

/// Lets any sendable `FnMut(SyncProgress)` closure act as a progress callback.
impl<F> SyncProgressCallback for F
where
   F: FnMut(SyncProgress) + Send,
{
   /// Forwards the update to the wrapped closure.
   fn progress(&mut self, update: SyncProgress) {
      self(update);
   }
}

/// The unit type is a callback that discards every update.
impl SyncProgressCallback for () {
   /// Intentionally does nothing.
   fn progress(&mut self, _: SyncProgress) {}
}

/// Drives an `indicatif` progress bar from sync progress updates.
impl SyncProgressCallback for ProgressBar {
   /// Sets the bar's length and position from the update and, when the
   /// update names a current file, shows the text after its last `/` as the
   /// bar message.
   fn progress(&mut self, progress: SyncProgress) {
      let total = progress.total as u64;
      let done = progress.processed as u64;

      // Length and position are applied inside a single `update` call.
      self.update(|bar_state| {
         bar_state.set_len(total);
         bar_state.set_pos(done);
      });

      let Some(label) = progress.current_file.as_ref() else {
         return;
      };
      let tail = label.rsplit('/').next().unwrap_or(&**label);
      self.set_message(tail.to_string());
   }
}

impl<F, E, S> SyncEngine<F, E, S>
where
   F: FileSystem + Sync,
   E: Embedder + Send + Sync,
   S: Store + Send + Sync,
{
   /// Assembles a sync engine from its four collaborators.
   pub const fn new(file_system: F, chunker: Chunker, embedder: E, store: S) -> Self {
      Self {
         file_system,
         chunker,
         embedder,
         store,
      }
   }

   /// Synchronizes the index identified by `store_id` with the files under
   /// `root`.
   ///
   /// Stages:
   /// 1. Paths known to the metadata store but absent from the current file
   ///    listing are removed from the store and from the metadata.
   /// 2. Every listed file whose modification time differs from the stored
   ///    one is read and hashed; a file is (re)indexed when its hash differs
   ///    from the stored hash. Files that cannot be read are silently left
   ///    out.
   /// 3. Store records of changed, previously indexed files are deleted, the
   ///    files are chunked in parallel (files that fail to chunk are
   ///    dropped), and the chunks are embedded and inserted in batches of
   ///    `config::get().batch_size()` files.
   /// 4. Metadata is saved and, when at least one file was indexed, the
   ///    full-text and vector indexes are created.
   ///
   /// When `dry_run` is `true` nothing is deleted, embedded, or saved; the
   /// returned counters describe what a real run would have done.
   ///
   /// `callback` receives progress updates around each embedding batch and
   /// once more at the end. Files whose modification time matches the stored
   /// value are not counted in `processed` or `skipped`.
   ///
   /// # Errors
   ///
   /// Propagates failures from loading or saving the metadata store, listing
   /// files, deleting or inserting store records, computing embeddings, and
   /// creating indexes.
   pub async fn initial_sync(
      &self,
      store_id: &str,
      root: &Path,
      dry_run: bool,
      callback: &mut dyn SyncProgressCallback,
   ) -> Result<SyncResult> {
      // Metadata is persisted after at least this many files have been
      // embedded since the previous save.
      const SAVE_INTERVAL: usize = 25;

      let mut meta_store = MetaStore::load(store_id)?;
      let batch_size = config::get().batch_size();

      let files = self.file_system.get_files(root)?.collect::<HashSet<_>>();

      let mut processed = 0;
      let mut indexed = 0;
      let mut skipped = 0;

      // Paths recorded in the metadata that no longer appear in the listing.
      let meta_paths = meta_store.all_paths().cloned().collect::<HashSet<_>>();
      let deleted_paths = meta_paths.difference(&files).cloned().collect::<Vec<_>>();

      if !dry_run && !deleted_paths.is_empty() {
         self.store.delete_files(store_id, &deleted_paths).await?;
         for path in &deleted_paths {
            meta_store.remove(path);
         }
      }

      let deleted_count = deleted_paths.len();

      // Read and hash only the files whose modification time changed.
      let hash_results: Vec<_> = files
         .into_iter()
         .filter_map(|file_path| {
            let current_mtime = get_mtime(&file_path);

            if let Some(stored_mtime) = meta_store.get_mtime(&file_path)
               && stored_mtime == current_mtime
            {
               return None;
            }

            // Unreadable files are left out rather than failing the sync.
            let content = std::fs::read(&file_path).ok()?;
            let hash = FileHash::sum(&content);

            let existing_hash = meta_store.get_hash(file_path.as_path());
            let needs_indexing = existing_hash != Some(hash);

            Some((file_path, hash, content, current_mtime, needs_indexing))
         })
         .collect();

      // Changed files that already have a stored hash get their old records
      // deleted before the new ones are inserted.
      let changed_files: Vec<PathBuf> = hash_results
         .iter()
         .filter(|(path, _, _, _, needs_indexing)| {
            *needs_indexing && meta_store.get_hash(path).is_some()
         })
         .map(|(path, ..)| path.clone())
         .collect();

      if !dry_run && !changed_files.is_empty() {
         self.store.delete_files(store_id, &changed_files).await?;
      }

      let files_to_index: Vec<_> = hash_results
         .into_iter()
         .filter_map(|(path, hash, content, mtime, needs_indexing)| {
            processed += 1;
            if !needs_indexing {
               skipped += 1;
               // Content is unchanged but the modification time moved:
               // record the new time so later syncs can skip this file
               // without reading and hashing it again.
               if !dry_run {
                  meta_store.set_meta(path, hash, mtime);
               }
               None
            } else if dry_run {
               indexed += 1;
               None
            } else {
               Some((path, hash, content, mtime))
            }
         })
         .collect();

      // Chunking runs in parallel across files.
      let chunked_files: Vec<_> = files_to_index
         .par_iter()
         .filter_map(|(path, hash, content, mtime)| {
            let content_str = Str::from_utf8_lossy(content);

            // Files that fail to chunk are dropped from this run.
            let chunks = self.chunker.chunk(&content_str, path).ok()?;
            let anchor_chunk = create_anchor_chunk(&content_str, path);

            let mut prepared_chunks = Vec::with_capacity(chunks.len() + 1);

            // The anchor chunk always takes index 0; regular chunks follow.
            prepared_chunks.push(PreparedChunk {
               id:           format!("{}:anchor", path.display()),
               path:         path.clone(),
               hash:         *hash,
               content:      anchor_chunk.content.clone(),
               start_line:   anchor_chunk.start_line as u32,
               end_line:     anchor_chunk.end_line as u32,
               chunk_index:  Some(0),
               is_anchor:    Some(true),
               chunk_type:   anchor_chunk.chunk_type,
               context_prev: None,
               context_next: None,
            });

            for (idx, chunk) in chunks.iter().enumerate() {
               // Neighbouring chunk contents serve as surrounding context.
               let context_prev: Option<Str> = idx
                  .checked_sub(1)
                  .map(|prev| chunks[prev].content.clone());
               let context_next: Option<Str> =
                  chunks.get(idx + 1).map(|next| next.content.clone());

               prepared_chunks.push(PreparedChunk {
                  id: format!("{}:{}", path.display(), idx),
                  path: path.clone(),
                  hash: *hash,
                  content: chunk.content.clone(),
                  start_line: chunk.start_line as u32,
                  end_line: chunk.end_line as u32,
                  chunk_index: Some(idx as u32 + 1),
                  is_anchor: Some(false),
                  chunk_type: chunk.chunk_type,
                  context_prev,
                  context_next,
               });
            }

            Some((path.clone(), *hash, *mtime, prepared_chunks))
         })
         .collect();

      let mut embed_queue: Vec<(PathBuf, FileHash, u64, Vec<PreparedChunk>)> = Vec::new();
      let mut since_save = 0;
      let total_to_embed = chunked_files.len();
      let mut embedded = 0;

      for (path, hash, mtime, prepared_chunks) in chunked_files {
         embed_queue.push((path, hash, mtime, prepared_chunks));

         if embed_queue.len() >= batch_size {
            callback.progress(SyncProgress {
               processed: embedded,
               indexed,
               total: total_to_embed,
               current_file: Some(
                  format!("Embedding batch ({} files)...", embed_queue.len()).into(),
               ),
            });

            let batch = std::mem::take(&mut embed_queue);
            let batch_count = batch.len();
            let batch_indexed = self
               .process_embed_batch(store_id, batch, &mut meta_store)
               .await?;
            indexed += batch_indexed;
            embedded += batch_count;
            since_save += batch_count;

            // Periodic save so progress is persisted during long runs.
            if since_save >= SAVE_INTERVAL {
               meta_store.save()?;
               since_save = 0;
            }

            callback.progress(SyncProgress {
               processed: embedded,
               indexed,
               total: total_to_embed,
               current_file: None,
            });
         }
      }

      // Flush whatever did not fill a complete batch.
      if !dry_run && !embed_queue.is_empty() {
         callback.progress(SyncProgress {
            processed: embedded,
            indexed,
            total: total_to_embed,
            current_file: Some(
               format!("Embedding final batch ({} files)...", embed_queue.len()).into(),
            ),
         });

         let batch = std::mem::take(&mut embed_queue);
         let batch_count = batch.len();
         let batch_indexed = self
            .process_embed_batch(store_id, batch, &mut meta_store)
            .await?;
         indexed += batch_indexed;
         embedded += batch_count;
      }

      if !dry_run {
         callback.progress(SyncProgress {
            processed: embedded,
            indexed,
            total: total_to_embed,
            current_file: Some("Creating indexes...".into()),
         });

         meta_store.save()?;

         // Indexes are only (re)built when this run added records.
         if indexed > 0 {
            self.store.create_fts_index(store_id).await?;
            self.store.create_vector_index(store_id).await?;
         }
      }

      callback.progress(SyncProgress {
         processed: total_to_embed,
         indexed,
         total: total_to_embed,
         current_file: None,
      });

      Ok(SyncResult { processed, indexed, skipped, deleted: deleted_count })
   }

   async fn process_embed_batch(
      &self,
      store_id: &str,
      batch: Vec<(PathBuf, FileHash, u64, Vec<PreparedChunk>)>,
      meta_store: &mut MetaStore,
   ) -> Result<usize> {
      let mut all_chunks = Vec::new();
      let mut file_paths = Vec::new();

      for (path, _hash, _mtime, chunks) in &batch {
         for chunk in chunks {
            all_chunks.push(chunk.clone());
         }
         file_paths.push(path.clone());
      }

      if all_chunks.is_empty() {
         return Ok(0);
      }

      let texts: Vec<Str> = all_chunks.iter().map(|c| c.content.clone()).collect();

      let embeddings = self.embedder.compute_hybrid(&texts).await?;

      let records: Vec<VectorRecord> = all_chunks
         .into_iter()
         .zip(embeddings.into_iter())
         .map(|(chunk, embedding)| VectorRecord {
            id:            chunk.id,
            path:          chunk.path,
            hash:          chunk.hash,
            content:       chunk.content,
            start_line:    chunk.start_line,
            end_line:      chunk.end_line,
            chunk_index:   chunk.chunk_index,
            is_anchor:     chunk.is_anchor,
            chunk_type:    chunk.chunk_type,
            context_prev:  chunk.context_prev,
            context_next:  chunk.context_next,
            vector:        embedding.dense,
            colbert:       embedding.colbert,
            colbert_scale: embedding.colbert_scale,
         })
         .collect();

      self.store.insert_batch(store_id, records).await?;

      for (path, hash, mtime, _) in batch {
         meta_store.set_meta(path, hash, mtime);
      }

      Ok(file_paths.len())
   }
}