pub mod chunkers;
pub mod config;
pub mod embeddings;
pub mod file_loader;
pub mod file_processor;
pub mod models;
pub mod reranker;
pub mod text_loader;
use std::{collections::HashMap, fs, path::PathBuf, rc::Rc, sync::Arc};
use anyhow::Result;
use config::{ImageEmbedConfig, TextEmbedConfig};
use embeddings::{
embed::{EmbedData, EmbedImage, Embedder, TextEmbedder, VisionEmbedder},
embed_audio, get_text_metadata,
};
use file_loader::FileParser;
use file_processor::audio::audio_processor::{self, AudioDecoderModel};
use itertools::Itertools;
use rayon::prelude::*;
use text_cleaner::clean::Clean;
use text_loader::{SplittingStrategy, TextLoader};
use tokio::sync::mpsc;
/// Embeds a batch of query strings with the given embedder.
///
/// Each query string is embedded as-is (no chunking is applied) and the
/// resulting vectors are paired with their source text via
/// [`get_text_metadata`]. No file metadata is attached.
///
/// # Arguments
/// * `query` - The strings to embed.
/// * `embedder` - The model used to produce the embeddings.
/// * `config` - Optional configuration; only `batch_size` is read here.
///   Falls back to [`TextEmbedConfig::default`] when `None`.
///
/// # Errors
/// Returns an error if the embedder fails or if the embeddings cannot be
/// combined with the query text.
pub async fn embed_query(
    query: Vec<String>,
    embedder: &Embedder,
    config: Option<&TextEmbedConfig>,
) -> Result<Vec<EmbedData>> {
    let binding = TextEmbedConfig::default();
    let config = config.unwrap_or(&binding);
    // Propagate model failures to the caller instead of panicking.
    let encodings = embedder.embed(&query, config.batch_size).await?;
    let embeddings = get_text_metadata(&Rc::new(encodings), &query, &None)?;
    Ok(embeddings)
}
/// Embeds a single file, dispatching on the kind of embedder.
///
/// * With a text embedder, the file's text is extracted, cleaned, split into
///   chunks according to `config`, and each chunk is embedded.
/// * With a vision embedder, the file is embedded as a single image.
///
/// # Arguments
/// * `file_name` - Path of the file to embed.
/// * `embedder` - Text or vision embedder.
/// * `config` - Optional text configuration. Unset fields default to chunk
///   size 256, overlap ratio 0.0, sentence splitting and no OCR. Not used for
///   vision embedders.
/// * `adapter` - Optional sink for text embeddings. When provided, the text
///   embeddings are passed to it and `Ok(None)` is returned. Not used for
///   vision embedders, which always return `Ok(Some(..))`.
///
/// # Errors
/// Returns an error if text extraction, image loading, or embedding fails.
pub async fn embed_file<T: AsRef<std::path::Path>, F>(
    file_name: T,
    embedder: &Embedder,
    config: Option<&TextEmbedConfig>,
    adapter: Option<F>,
) -> Result<Option<Vec<EmbedData>>>
where
    F: Fn(Vec<EmbedData>),
{
    let binding = TextEmbedConfig::default();
    let config = config.unwrap_or(&binding);
    let chunk_size = config.chunk_size.unwrap_or(256);
    let overlap_ratio = config.overlap_ratio.unwrap_or(0.0);
    let batch_size = config.batch_size;
    let splitting_strategy = config
        .splitting_strategy
        .unwrap_or(SplittingStrategy::Sentence);
    let semantic_encoder = config.semantic_encoder.clone();
    let use_ocr = config.use_ocr.unwrap_or(false);
    match embedder {
        Embedder::Text(embedder) => {
            emb_text(
                file_name,
                embedder,
                Some(chunk_size),
                Some(overlap_ratio),
                batch_size,
                Some(splitting_strategy),
                semantic_encoder,
                adapter,
                use_ocr,
            )
            .await
        }
        // Surface image errors (unreadable path, model failure) to the caller
        // instead of panicking.
        Embedder::Vision(embedder) => Ok(Some(vec![emb_image(file_name, embedder)?])),
    }
}
/// Processes a web page and embeds its content.
///
/// The page is handled by `file_processor::website_processor::WebsiteProcessor`
/// and the processed result is chunked and embedded with `embedder`.
///
/// # Arguments
/// * `url` - Address of the page, passed to `process_website`.
/// * `embedder` - Model used to embed the page chunks.
/// * `config` - Optional configuration. `chunk_size` (default 256),
///   `overlap_ratio` (default 0.0) and `batch_size` are read.
/// * `adapter` - Optional sink. When given, the embeddings are handed to it
///   and `Ok(None)` is returned instead of `Ok(Some(..))`.
///
/// # Errors
/// Returns an error if processing the page or embedding its chunks fails.
pub async fn embed_webpage<F>(
    url: String,
    embedder: &Embedder,
    config: Option<&TextEmbedConfig>,
    adapter: Option<F>,
) -> Result<Option<Vec<EmbedData>>>
where
    F: Fn(Vec<EmbedData>),
{
    let processor = file_processor::website_processor::WebsiteProcessor::new();
    let page = processor.process_website(url.as_ref())?;

    let defaults = TextEmbedConfig::default();
    let cfg = config.unwrap_or(&defaults);

    let embeddings = page
        .embed_webpage(
            embedder,
            cfg.chunk_size.unwrap_or(256),
            cfg.overlap_ratio.unwrap_or(0.0),
            cfg.batch_size,
        )
        .await?;

    match adapter {
        Some(sink) => {
            sink(embeddings);
            Ok(None)
        }
        None => Ok(Some(embeddings)),
    }
}
/// Processes a local HTML file and embeds its content.
///
/// # Arguments
/// * `file_name` - Path of the HTML file.
/// * `origin` - Optional origin forwarded to `process_html_file`
///   (presumably the page's source URL — TODO confirm).
/// * `embedder` - Model used to embed the extracted chunks.
/// * `config` - Optional configuration. `chunk_size` (default 256),
///   `overlap_ratio` (default 0.0) and `batch_size` are read.
/// * `adapter` - Optional one-shot sink. When given, the embeddings are
///   handed to it and `Ok(None)` is returned instead of `Ok(Some(..))`.
///
/// # Errors
/// Returns an error if the file cannot be processed or embedding fails.
pub async fn embed_html(
    file_name: impl AsRef<std::path::Path>,
    origin: Option<impl Into<String>>,
    embedder: &Embedder,
    config: Option<&TextEmbedConfig>,
    adapter: Option<Box<dyn FnOnce(Vec<EmbedData>)>>,
) -> Result<Option<Vec<EmbedData>>> {
    let processor = file_processor::html_processor::HtmlProcessor::new();
    let document = processor.process_html_file(file_name.as_ref(), origin)?;

    let defaults = TextEmbedConfig::default();
    let cfg = config.unwrap_or(&defaults);

    let embeddings = document
        .embed_webpage(
            embedder,
            cfg.chunk_size.unwrap_or(256),
            cfg.overlap_ratio.unwrap_or(0.0),
            cfg.batch_size,
        )
        .await?;

    match adapter {
        Some(sink) => {
            sink(embeddings);
            Ok(None)
        }
        None => Ok(Some(embeddings)),
    }
}
/// Extracts, cleans, chunks and embeds the text content of a single file.
///
/// Text is extracted with [`TextLoader::extract_text`] (optionally via OCR).
/// Leading/trailing spaces and empty lines are removed, and the result is
/// split into chunks before embedding. If splitting yields `None`, no chunks
/// are embedded. File metadata is attached when it can be read. A failure to
/// read it does not abort embedding.
///
/// # Arguments
/// * `file` - Path of the file to embed.
/// * `embedding_model` - Text model used for the chunks.
/// * `chunk_size` - Chunk size for [`TextLoader::new`] (default 256).
/// * `overlap_ratio` - Chunk overlap ratio (default 0.0).
/// * `batch_size` - Optional batch size forwarded to the model.
/// * `splitting_strategy` - Chunking strategy (default sentence splitting).
/// * `semantic_encoder` - Optional encoder forwarded to the splitter.
/// * `adapter` - Optional sink. When given, the embeddings are handed to it
///   and `Ok(None)` is returned.
/// * `use_ocr` - Whether text extraction may use OCR.
///
/// # Errors
/// Returns an error if text extraction, embedding, or metadata assembly fails.
// Internal helper whose parameters mirror the individual `TextEmbedConfig` fields.
#[allow(clippy::too_many_arguments)]
async fn emb_text<T: AsRef<std::path::Path>, F>(
    file: T,
    embedding_model: &TextEmbedder,
    chunk_size: Option<usize>,
    overlap_ratio: Option<f32>,
    batch_size: Option<usize>,
    splitting_strategy: Option<SplittingStrategy>,
    semantic_encoder: Option<Arc<Embedder>>,
    adapter: Option<F>,
    use_ocr: bool,
) -> Result<Option<Vec<EmbedData>>>
where
    F: Fn(Vec<EmbedData>),
{
    let text = TextLoader::extract_text(&file, use_ocr)?
        .remove_leading_spaces()
        .remove_trailing_spaces()
        .remove_empty_lines();
    let textloader = TextLoader::new(chunk_size.unwrap_or(256), overlap_ratio.unwrap_or(0.0));
    let chunks = textloader
        .split_into_chunks(
            &text,
            splitting_strategy.unwrap_or(SplittingStrategy::Sentence),
            semantic_encoder,
        )
        .unwrap_or_default();
    // Best-effort: missing metadata should not prevent the text from being embedded.
    let metadata = TextLoader::get_metadata(file).ok();

    // Propagate model / metadata failures instead of panicking.
    let encodings = embedding_model.embed(&chunks, batch_size).await?;
    let embeddings = get_text_metadata(&Rc::new(encodings), &chunks, &metadata)?;

    match adapter {
        Some(adapter) => {
            adapter(embeddings);
            Ok(None)
        }
        None => Ok(Some(embeddings)),
    }
}
/// Embeds a single image file with a vision model.
///
/// The canonical (absolute, symlink-resolved) path of the image is stored in
/// the embedding metadata under the `"file_name"` key.
///
/// # Errors
/// Returns an error if the path cannot be canonicalized (for example, the file
/// does not exist) or if the model fails to embed the image.
fn emb_image<T: AsRef<std::path::Path>>(
    image_path: T,
    embedding_model: &VisionEmbedder,
) -> Result<EmbedData> {
    let canonical = fs::canonicalize(&image_path)?;
    let mut metadata = HashMap::new();
    // `to_string_lossy` avoids panicking on non-UTF-8 paths; invalid byte
    // sequences are replaced with U+FFFD.
    metadata.insert(
        "file_name".to_string(),
        canonical.to_string_lossy().into_owned(),
    );
    let embedding = embedding_model.embed_image(&image_path, Some(metadata))?;
    Ok(embedding)
}
/// Decodes an audio file into segments and embeds them.
///
/// Segments are produced by `audio_decoder.process_audio` and embedded with
/// [`embed_audio`].
///
/// # Arguments
/// * `audio_file` - Path of the audio file.
/// * `audio_decoder` - Decoder used to turn the file into segments.
/// * `embedder` - Model used to embed the segments.
/// * `text_embed_config` - Optional configuration. Only `batch_size` is read,
///   falling back to [`TextEmbedConfig::default`]'s value when `None`.
///
/// # Errors
/// Returns an error if decoding or embedding fails.
pub async fn emb_audio<T: AsRef<std::path::Path>>(
    audio_file: T,
    audio_decoder: &mut AudioDecoderModel,
    embedder: &Arc<Embedder>,
    text_embed_config: Option<&TextEmbedConfig>,
) -> Result<Option<Vec<EmbedData>>> {
    // Propagate decode failures instead of panicking.
    let segments: Vec<audio_processor::Segment> = audio_decoder.process_audio(&audio_file)?;
    // Resolve the batch size up front so no config temporary is held across the await.
    let batch_size = match text_embed_config {
        Some(config) => config.batch_size,
        None => TextEmbedConfig::default().batch_size,
    };
    let embeddings = embed_audio(embedder, segments, audio_file, batch_size).await?;
    Ok(Some(embeddings))
}
pub async fn embed_image_directory<T: EmbedImage + Send + Sync + 'static, F>(
directory: PathBuf,
embedding_model: &Arc<T>,
config: Option<&ImageEmbedConfig>,
adapter: Option<F>,
) -> Result<Option<Vec<EmbedData>>>
where
F: Fn(Vec<EmbedData>),
{
let mut file_parser = FileParser::new();
file_parser.get_image_paths(&directory).unwrap();
let buffer_size = config
.unwrap_or(&ImageEmbedConfig::default())
.buffer_size
.unwrap_or(100);
let (tx, mut rx) = mpsc::unbounded_channel();
let (collector_tx, mut collector_rx) = mpsc::unbounded_channel();
let embedder = embedding_model.clone();
let pb = indicatif::ProgressBar::new(file_parser.files.len() as u64);
pb.set_style(
indicatif::ProgressStyle::with_template(
"{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({eta})",
)
.unwrap(),
);
let processing_task = tokio::spawn({
async move {
let mut image_buffer = Vec::with_capacity(buffer_size);
let mut files_processed: std::collections::HashSet<String> =
std::collections::HashSet::new();
while let Some(image) = rx.recv().await {
image_buffer.push(image);
if image_buffer.len() == buffer_size {
match process_images(&image_buffer, embedder.clone()).await {
Ok(embeddings) => {
let files = embeddings
.iter()
.cloned()
.map(|e| e.metadata.unwrap().get("file_name").unwrap().to_string())
.collect::<Vec<_>>();
let unique_files = files.into_iter().unique().collect::<Vec<_>>();
let old_len = files_processed.len() as u64;
files_processed.extend(unique_files);
let new_len = files_processed.len() as u64;
pb.inc(new_len - old_len);
if let Err(e) = collector_tx.send(embeddings) {
eprintln!("Error sending embeddings to collector: {:?}", e);
}
}
Err(e) => eprintln!("Error processing images: {:?}", e),
}
image_buffer.clear();
}
}
if !image_buffer.is_empty() {
match process_images(&image_buffer, embedder).await {
Ok(embeddings) => {
let files = embeddings
.iter()
.cloned()
.map(|e| e.metadata.unwrap().get("file_name").unwrap().to_string())
.collect::<Vec<_>>();
let unique_files = files.into_iter().unique().collect::<Vec<_>>();
let old_len = files_processed.len() as u64;
files_processed.extend(unique_files);
let new_len = files_processed.len() as u64;
pb.inc(new_len - old_len);
if let Err(e) = collector_tx.send(embeddings) {
eprintln!("Error sending embeddings to collector: {:?}", e);
}
}
Err(e) => eprintln!("Error processing images: {:?}", e),
}
}
}
});
file_parser.files.par_iter().for_each(|image| {
if let Err(e) = tx.send(image.clone()) {
eprintln!("Error sending image: {:?}", e);
}
});
drop(tx);
let mut all_embeddings = Vec::new();
while let Some(embeddings) = collector_rx.recv().await {
if let Some(adapter) = &adapter {
adapter(embeddings.to_vec());
} else {
all_embeddings.extend(embeddings.to_vec());
}
}
processing_task.await.unwrap();
if adapter.is_some() {
Ok(None)
} else {
Ok(Some(all_embeddings))
}
}
/// Embeds one batch of image paths with `embedder`.
///
/// The batch is returned inside an [`Arc`] so it can be passed between tasks
/// cheaply.
///
/// # Errors
/// Propagates any error returned by `embed_image_batch`.
async fn process_images<E: EmbedImage>(
    image_buffer: &[String],
    embedder: Arc<E>,
) -> Result<Arc<Vec<EmbedData>>> {
    let batch = embedder.embed_image_batch(image_buffer)?;
    let shared = Arc::new(batch);
    Ok(shared)
}
pub async fn embed_directory_stream<F>(
directory: PathBuf,
embedder: &Arc<Embedder>,
extensions: Option<Vec<String>>,
config: Option<&TextEmbedConfig>,
adapter: Option<F>,
) -> Result<Option<Vec<EmbedData>>>
where
F: Fn(Vec<EmbedData>),
{
println!("Embedding directory: {:?}", directory);
let binding = TextEmbedConfig::default();
let config = config.unwrap_or(&binding);
let chunk_size = config.chunk_size.unwrap_or(binding.chunk_size.unwrap());
let buffer_size = config.buffer_size.unwrap_or(binding.buffer_size.unwrap());
let batch_size = config.batch_size;
let use_ocr = config.use_ocr.unwrap_or(false);
let overlap_ratio = config.overlap_ratio.unwrap_or(0.0);
let mut file_parser = FileParser::new();
file_parser.get_text_files(&directory, extensions)?;
let files = file_parser.files.clone();
let (tx, mut rx) = mpsc::unbounded_channel();
let (collector_tx, mut collector_rx) = mpsc::unbounded_channel();
let embedder = embedder.clone();
let pb = indicatif::ProgressBar::new(files.len() as u64);
pb.set_style(
indicatif::ProgressStyle::with_template(
"{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({eta})",
)
.unwrap(),
);
let processing_task = tokio::spawn({
async move {
let mut chunk_buffer = Vec::with_capacity(buffer_size);
let mut metadata_buffer = Vec::with_capacity(buffer_size);
let mut files_processed: std::collections::HashSet<String> =
std::collections::HashSet::new();
while let Some((chunk, metadata)) = rx.recv().await {
chunk_buffer.push(chunk);
metadata_buffer.push(metadata);
if chunk_buffer.len() == buffer_size {
match process_chunks(&chunk_buffer, &metadata_buffer, &embedder, batch_size)
.await
{
Ok(embeddings) => {
let files = embeddings
.iter()
.cloned()
.map(|e| e.metadata.unwrap().get("file_name").unwrap().to_string())
.collect::<Vec<_>>();
let unique_files = files.into_iter().unique().collect::<Vec<_>>();
let old_len = files_processed.len() as u64;
files_processed.extend(unique_files);
let new_len = files_processed.len() as u64;
pb.inc(new_len - old_len);
if let Err(e) = collector_tx.send(embeddings) {
eprintln!("Error sending embeddings to collector: {:?}", e);
}
}
Err(e) => eprintln!("Error processing chunks: {:?}", e),
}
chunk_buffer.clear();
metadata_buffer.clear();
}
}
if !chunk_buffer.is_empty() {
match process_chunks(&chunk_buffer, &metadata_buffer, &embedder, batch_size).await {
Ok(embeddings) => {
let files = embeddings
.iter()
.cloned()
.map(|e| e.metadata.unwrap().get("file_name").unwrap().to_string())
.collect::<Vec<_>>();
let unique_files = files.into_iter().unique().collect::<Vec<_>>();
let old_len = files_processed.len() as u64;
files_processed.extend(unique_files);
let new_len = files_processed.len() as u64;
pb.inc(new_len - old_len);
if let Err(e) = collector_tx.send(embeddings) {
eprintln!("Error sending embeddings to collector: {:?}", e);
}
}
Err(e) => eprintln!("Error processing chunks: {:?}", e),
}
}
}
});
let textloader = TextLoader::new(chunk_size, overlap_ratio);
file_parser.files.iter().for_each(|file| {
let text = match TextLoader::extract_text(file, use_ocr) {
Ok(text) => text
.remove_leading_spaces()
.remove_trailing_spaces()
.remove_empty_lines(),
Err(_) => {
return;
}
};
let chunks = textloader
.split_into_chunks(&text, SplittingStrategy::Sentence, None)
.unwrap_or_else(|| vec![text.clone()])
.into_iter()
.filter(|chunk| !chunk.trim().is_empty())
.collect::<Vec<_>>();
if chunks.is_empty() {
return;
}
let metadata = TextLoader::get_metadata(file).unwrap();
for chunk in chunks {
if let Err(e) = tx.send((chunk, Some(metadata.clone()))) {
eprintln!("Error sending chunk: {:?}", e);
}
}
});
drop(tx);
let mut all_embeddings = Vec::new();
while let Some(embeddings) = collector_rx.recv().await {
if let Some(adapter) = &adapter {
adapter(embeddings.to_vec());
} else {
all_embeddings.extend(embeddings.to_vec());
}
}
processing_task.await.unwrap();
if adapter.is_some() {
Ok(None)
} else {
Ok(Some(all_embeddings))
}
}
pub async fn process_chunks(
chunks: &Vec<String>,
metadata: &Vec<Option<HashMap<String, String>>>,
embedding_model: &Arc<Embedder>,
batch_size: Option<usize>,
) -> Result<Arc<Vec<EmbedData>>> {
let encodings = embedding_model.embed(chunks, batch_size).await?;
let embeddings = encodings
.into_iter()
.zip(chunks)
.zip(metadata)
.map(|((encoding, chunk), metadata)| {
EmbedData::new(encoding.clone(), Some(chunk.clone()), metadata.clone())
})
.collect::<Vec<_>>();
Ok(Arc::new(embeddings))
}