use std::{
ffi::OsStr,
fs::{File, metadata},
io::{self, BufReader, Read},
path::Path,
sync::Arc,
time::{Instant, SystemTime},
};
use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
use colored::Colorize;
use csv::{ReaderBuilder, Terminator};
use num_format::{Locale, ToFormattedString};
#[cfg(feature = "pdf")]
use pdfium_render::prelude::{PdfDocumentMetadataTagType, Pdfium};
use serde_json::{Deserializer, Value, json};
use tabled::{
Table,
settings::{
Color, Modify, Remove, Style, Width,
object::{Columns, Rows},
style::{BorderColor, HorizontalLine},
},
};
use tokio::sync::RwLock;
use walkdir::WalkDir;
use crate::{
commit::Commit,
index::{
Document, FileType, INDEX_FORMAT_VERSION_MAJOR, INDEX_FORMAT_VERSION_MINOR, Index,
IndexArc, IndexDocument, Info, META_FILENAME,
},
utils::{dir_size, encode_bytes_to_base64_string, truncate},
vector::{Embedding, Quantization, embedding_to_bytes_be},
vector_similarity::VectorSimilarity,
};
use lazy_static::lazy_static;
/// Loaded Pdfium document; the `IndexPdf` trait takes one of these.
#[cfg(feature = "pdf")]
type PdfDocument<'a> = pdfium_render::prelude::PdfDocument<'a>;
/// Placeholder used when the `pdf` feature is disabled, so that trait signatures
/// mentioning `PdfDocument` still compile (the unit type stands in for the document).
#[cfg(not(feature = "pdf"))]
type PdfDocument<'a> = ();
#[cfg(feature = "pdf")]
lazy_static! {
    /// Process-wide Pdfium handle, bound lazily on first use.
    ///
    /// Binding first tries the platform-specific Pdfium library in the current
    /// directory (`./`), then the system library. `None` means neither could be
    /// loaded, and PDF indexing is unavailable.
    pub(crate) static ref pdfium_option: Option<Pdfium> =
        Pdfium::bind_to_library(Pdfium::pdfium_platform_library_name_at_path("./"))
            .or_else(|_| Pdfium::bind_to_system_library())
            .ok()
            .map(Pdfium::new);
}
/// Return the next byte from `reader` that is not ASCII whitespace.
///
/// Bytes are consumed one at a time, so the reader is left positioned right after
/// the returned byte.
///
/// # Errors
/// Propagates any error from `read_exact`. This includes `UnexpectedEof` when the
/// input ends before a non-whitespace byte is found.
fn read_skipping_ws(mut reader: impl Read) -> io::Result<u8> {
    let mut one_byte = [0u8; 1];
    loop {
        reader.read_exact(&mut one_byte)?;
        match one_byte[0] {
            ws if ws.is_ascii_whitespace() => continue,
            significant => return Ok(significant),
        }
    }
}
/// Index a PDF file that is read from disk.
#[allow(clippy::too_many_arguments)]
#[allow(async_fn_in_trait)]
pub trait IndexPdfFile {
    /// Load the PDF at `file_path` and index its extracted text as a single document.
    ///
    /// Returns `Err` with a short message when the file could not be indexed. Possible
    /// causes: the `pdf` feature is disabled, the Pdfium library is unavailable, or
    /// the file can't be opened as a PDF.
    async fn index_pdf_file(&self, file_path: &Path) -> Result<(), String>;
}
impl IndexPdfFile for IndexArc {
    /// Index a single PDF file from disk.
    ///
    /// The file's creation time is used as the fallback document date. When the
    /// platform can't report a creation time, the current time is used instead.
    ///
    /// # Errors
    /// Returns `Err` when any of these hold:
    /// - the `pdf` feature is disabled;
    /// - the Pdfium library could not be loaded;
    /// - the file's metadata can't be read;
    /// - Pdfium can't open the file.
    async fn index_pdf_file(&self, file_path: &Path) -> Result<(), String> {
        #[cfg(feature = "pdf")]
        {
            let Some(pdfium) = pdfium_option.as_ref() else {
                println!(
                    "Pdfium library not found: download and copy into the same folder as the seekstorm_server.exe: https://github.com/bblanchon/pdfium-binaries"
                );
                return Err("Pdfium library not found".to_string());
            };

            // One metadata lookup supplies both the size and the date. A vanished or
            // unreadable file now becomes an error rather than a panic.
            let file_metadata = match metadata(file_path) {
                Ok(file_metadata) => file_metadata,
                Err(e) => {
                    println!("can't read PDF {}: {}", file_path.display(), e);
                    return Err("can't read PDF".to_string());
                }
            };
            let file_size = file_metadata.len() as usize;
            // Not every platform/filesystem records a creation time.
            let date: DateTime<Utc> = file_metadata
                .created()
                .unwrap_or_else(|_| SystemTime::now())
                .into();
            let file_date = date.timestamp();

            match pdfium.load_pdf_from_file(file_path, None) {
                Ok(pdf) => {
                    self.index_pdf(
                        file_path,
                        file_size,
                        file_date,
                        FileType::Path(file_path.into()),
                        pdf,
                    )
                    .await;
                    Ok(())
                }
                Err(_) => {
                    println!("can't read PDF {} {}", file_path.display(), file_size);
                    Err("can't read PDF".to_string())
                }
            }
        }
        #[cfg(not(feature = "pdf"))]
        {
            let _ = file_path;
            println!("pdf feature flag not enabled");
            Err("pdf feature flag not enabled".to_string())
        }
    }
}
/// Index a PDF that is supplied as an in-memory byte slice.
#[allow(clippy::too_many_arguments)]
#[allow(async_fn_in_trait)]
pub trait IndexPdfBytes {
    /// Parse `file_bytes` as a PDF and index its extracted text as a single document.
    ///
    /// * `file_path` — source path, written to the document's `url` field.
    /// * `file_date` — Unix timestamp in seconds, used as the document date when the
    ///   PDF carries no usable creation date.
    /// * `file_bytes` — the raw PDF content.
    ///
    /// Returns `Err` with a short message when the bytes could not be indexed.
    async fn index_pdf_bytes(
        &self,
        file_path: &Path,
        file_date: i64,
        file_bytes: &[u8],
    ) -> Result<(), String>;
}
#[cfg(feature = "pdf")]
impl IndexPdfBytes for IndexArc {
    /// Load a PDF from `file_bytes` via Pdfium and hand it to the shared indexing path.
    ///
    /// # Errors
    /// Returns `Err` when the Pdfium library is unavailable or the bytes are not a
    /// readable PDF.
    async fn index_pdf_bytes(
        &self,
        file_path: &Path,
        file_date: i64,
        file_bytes: &[u8],
    ) -> Result<(), String> {
        let Some(pdfium) = pdfium_option.as_ref() else {
            println!(
                "Pdfium library not found: download and copy into the same folder as the seekstorm_server.exe: https://github.com/bblanchon/pdfium-binaries"
            );
            return Err("Pdfium library not found".to_string());
        };

        let byte_count = file_bytes.len();
        match pdfium.load_pdf_from_byte_slice(file_bytes, None) {
            Ok(document) => {
                // Keep a copy of the original bytes so the index can store the source file.
                let source = FileType::Bytes(file_path.into(), file_bytes.into());
                self.index_pdf(file_path, byte_count, file_date, source, document)
                    .await;
                Ok(())
            }
            Err(_) => {
                println!("can't read PDF {} {}", file_path.display(), byte_count);
                Err("can't read PDF".to_string())
            }
        }
    }
}
#[cfg(not(feature = "pdf"))]
impl IndexPdfBytes for IndexArc {
    /// Stub used when the `pdf` feature is disabled: always reports failure.
    ///
    /// Parameters are underscore-prefixed so non-pdf builds don't warn about unused
    /// variables.
    async fn index_pdf_bytes(
        &self,
        _file_path: &Path,
        _file_date: i64,
        _file_bytes: &[u8],
    ) -> Result<(), String> {
        println!("pdf feature flag not enabled");
        Err("pdf feature flag not enabled".to_string())
    }
}
/// Shared back end of [`IndexPdfFile`] and [`IndexPdfBytes`]: turns an already
/// loaded PDF into an indexed document.
#[allow(clippy::too_many_arguments)]
#[allow(async_fn_in_trait)]
trait IndexPdf {
    /// Extract the text and metadata of `pdf` and index them.
    ///
    /// * `file_path` — source path, used for the `url` field and for log output.
    /// * `file_size` — size of the PDF in bytes; the pdf implementation uses it only
    ///   as the initial capacity of the text buffer.
    /// * `file_date` — fallback document date (Unix seconds).
    /// * `file` — reference to the original file, passed through to `index_document`.
    /// * `pdf` — the loaded document (`()` when the `pdf` feature is disabled).
    async fn index_pdf(
        &self,
        file_path: &Path,
        file_size: usize,
        file_date: i64,
        file: FileType,
        pdf: PdfDocument<'_>,
    );
}
/// Convert a PDF `CreationDate` metadata value to a Unix timestamp in seconds.
///
/// Handles the compact PDF form `[D:]YYYYMMDDHHmmSS`. That form may be followed by
/// `Z`, or by an offset such as `+01'00'`, `+01'00` or `+0100`. A few free-form
/// layouts are also accepted. Returns `fallback` when the value can't be parsed.
#[cfg(feature = "pdf")]
fn pdf_date_to_timestamp(raw: &str, fallback: i64) -> i64 {
    let compact = raw.strip_prefix("D:").unwrap_or(raw);
    // The compact form is parsed by byte offsets, which is only safe on ASCII.
    // Metadata comes from untrusted files, so never slice non-ASCII text.
    if compact.is_ascii() {
        // A `Z` (UTC) right after the seconds: drop it and anything that follows.
        let compact = if compact.len() > 14 && compact.as_bytes()[14].eq_ignore_ascii_case(&b'z')
        {
            &compact[..14]
        } else {
            compact
        };
        if matches!(compact.len(), 14 | 19 | 20 | 21) {
            let offset = if compact.len() == 14 {
                Some("+00:00".to_string())
            } else {
                // Offset hours are bytes 14..17 (`+HH`). The minutes follow either an
                // apostrophe (`+HH'mm`) or come directly after (`+HHmm`). Use `get`
                // because `+HH'm` (19 bytes) has no room for two minute digits.
                let minutes = if compact.as_bytes()[17] == b'\'' {
                    compact.get(18..20)
                } else {
                    compact.get(17..19)
                };
                minutes.map(|minutes| format!("{}:{}", &compact[14..17], minutes))
            };
            let Some(offset) = offset else {
                return fallback;
            };
            let rfc3339 = format!(
                "{}-{}-{}T{}:{}:{}{}",
                &compact[0..4],
                &compact[4..6],
                &compact[6..8],
                &compact[8..10],
                &compact[10..12],
                &compact[12..14],
                offset
            );
            return DateTime::parse_from_rfc3339(&rfc3339)
                .map(|date| date.timestamp())
                .unwrap_or(fallback);
        }
    }
    // Free-form layouts, interpreted as UTC.
    const FREE_FORM_LAYOUTS: [&str; 3] = [
        "%a %b %e %H:%M:%S %Y",
        "%Y/%m/%d %H:%M:%S",
        "%m/%e/%Y %H:%M:%S",
    ];
    FREE_FORM_LAYOUTS
        .iter()
        .find_map(|layout| NaiveDateTime::parse_from_str(raw, layout).ok())
        .map(|naive| Utc.from_utc_datetime(&naive).timestamp())
        .unwrap_or(fallback)
}

/// Derive a title from the extracted text when the PDF has no usable `Title` metadata.
///
/// The title is the first of the first ten lines whose trimmed length exceeds one
/// byte, truncated with `truncate(_, 160)`. If no such line exists, the file stem is
/// used with underscores removed.
#[cfg(feature = "pdf")]
fn pdf_title_from_text(text: &str, file_path: &Path) -> String {
    text.lines()
        .take(10)
        .find(|line| line.trim().len() > 1)
        .map(|line| truncate(line, 160).trim().to_owned())
        .unwrap_or_else(|| {
            file_path
                .file_stem()
                .map(|stem| stem.to_string_lossy().replace('_', ""))
                .unwrap_or_default()
        })
}

#[cfg(feature = "pdf")]
impl IndexPdf for IndexArc {
    /// Extract all page text from `pdf` and index it as one document.
    ///
    /// The document has these fields:
    /// - `title` — the metadata title, or one derived from the text;
    /// - `body` — the page texts, each followed by `" \n"`;
    /// - `url` — `file_path`;
    /// - `date` — the PDF creation date, falling back to `file_date` when it is
    ///   missing, unparseable, negative or in the future.
    ///
    /// PDFs without any extractable text are skipped, with a message.
    async fn index_pdf(
        &self,
        file_path: &Path,
        file_size: usize,
        file_date: i64,
        file: FileType,
        pdf: PdfDocument<'_>,
    ) {
        let mut text = String::with_capacity(file_size);
        for page in pdf.pages().iter() {
            // A page whose text layer can't be read contributes nothing, instead of
            // aborting the whole file with a panic.
            if let Ok(page_text) = page.text() {
                text.push_str(&page_text.all());
            }
            text.push_str(" \n");
        }
        // Every page appends a separator, so test for real content, not emptiness.
        if text.trim().is_empty() {
            println!("can't extract text from PDF {}", file_path.display(),);
            return;
        }

        let meta = pdf.metadata();
        let title = meta
            .get(PdfDocumentMetadataTagType::Title)
            .map(|tag| tag.value().to_owned())
            .filter(|title| !title.trim().is_empty())
            .unwrap_or_else(|| pdf_title_from_text(&text, file_path));

        let mut creation_timestamp = meta
            .get(PdfDocumentMetadataTagType::CreationDate)
            .map(|tag| pdf_date_to_timestamp(tag.value(), file_date))
            .unwrap_or(file_date);
        // Implausible metadata dates fall back to the file date.
        if creation_timestamp > Utc::now().timestamp() || creation_timestamp < 0 {
            creation_timestamp = file_date;
        }

        let document = Document::from([
            ("title".to_string(), json!(title)),
            ("body".to_string(), json!(text)),
            ("url".to_string(), json!(&file_path.display().to_string())),
            ("date".to_string(), json!(creation_timestamp)),
        ]);
        self.index_document(document, file).await;

        // `file_date` may come from the caller unchecked, so don't panic if it is
        // outside chrono's representable range.
        let date_label = Utc
            .timestamp_opt(creation_timestamp, 0)
            .single()
            .map(|date_time| date_time.format("%d/%m/%Y %H:%M").to_string())
            .unwrap_or_else(|| creation_timestamp.to_string());
        println!(
            "indexed {} {} {} {}",
            date_label,
            file_path.display(),
            text.len().to_formatted_string(&Locale::en),
            title
        );
    }
}
#[cfg(not(feature = "pdf"))]
impl IndexPdf for IndexArc {
    /// Stub used when the `pdf` feature is disabled: only prints a notice.
    ///
    /// Parameters are underscore-prefixed so non-pdf builds don't warn about unused
    /// variables.
    async fn index_pdf(
        &self,
        _file_path: &Path,
        _file_size: usize,
        _file_date: i64,
        _file: FileType,
        _pdf: PdfDocument<'_>,
    ) {
        println!("pdf feature flag not enabled");
    }
}
/// Walk `data_path` recursively and index every regular file with a `.pdf`
/// extension (case-insensitive).
///
/// `docid` is incremented once for each file indexed successfully. Entries that
/// can't be read are skipped with a message instead of aborting the whole walk.
pub(crate) async fn path_recurse(
    index_arc: &Arc<RwLock<Index>>,
    data_path: &Path,
    docid: &mut usize,
) {
    for entry in WalkDir::new(data_path) {
        let entry = match entry {
            Ok(entry) => entry,
            Err(e) => {
                println!("skipping unreadable path: {}", e);
                continue;
            }
        };
        let path = entry.path();

        // Check the extension first, so non-PDF entries are never stat-ed.
        let has_pdf_extension = path
            .extension()
            .and_then(OsStr::to_str)
            .is_some_and(|extension| extension.eq_ignore_ascii_case("pdf"));
        if !has_pdf_extension {
            continue;
        }

        // `fs::metadata` follows symlinks, so a link pointing at a PDF is indexed too.
        let is_file = match metadata(path) {
            Ok(md) => md.is_file(),
            Err(e) => {
                println!("skipping unreadable path {}: {}", path.display(), e);
                continue;
            }
        };

        if is_file && index_arc.index_pdf_file(path).await.is_ok() {
            *docid += 1;
        }
    }
}
/// Bulk ingestion of PDF files.
#[allow(clippy::too_many_arguments)]
#[allow(async_fn_in_trait)]
pub trait IngestPdf {
    /// Index the PDF at `file_path`. If `file_path` is a directory, index every
    /// `.pdf` file found below it recursively. Then commit and print throughput
    /// statistics.
    async fn ingest_pdf(&mut self, file_path: &Path);
}
#[cfg(feature = "pdf")]
impl IngestPdf for IndexArc {
    /// Index a single PDF file, or every `*.pdf` file below a directory. Then commit
    /// and print the document count and throughput.
    ///
    /// Only prints a message when the Pdfium library is unavailable or `data_path`
    /// does not exist.
    async fn ingest_pdf(&mut self, data_path: &Path) {
        if pdfium_option.is_none() {
            println!(
                "Pdfium library not found: download and copy into the same folder as the seekstorm_server.exe: https://github.com/bblanchon/pdfium-binaries"
            );
            return;
        }
        if !data_path.exists() {
            println!("data file not found: {}", data_path.display());
            return;
        }

        println!("ingesting PDF from: {}", data_path.display());
        let start_time = Instant::now();
        let mut docid = 0usize;
        // Acquire and immediately release a read lock. This is kept from the original
        // flow; it presumably waits out an in-flight writer.
        drop(self.read().await);

        // If metadata can't be read, treat the path as a directory; path_recurse
        // handles unreadable entries gracefully.
        let is_file = metadata(data_path).map(|md| md.is_file()).unwrap_or(false);
        if is_file {
            // Use the raw extension rather than a lossy `display()` round-trip.
            let is_pdf = data_path
                .extension()
                .and_then(OsStr::to_str)
                .is_some_and(|extension| extension.eq_ignore_ascii_case("pdf"));
            if is_pdf && self.index_pdf_file(data_path).await.is_ok() {
                docid += 1;
            }
        } else {
            path_recurse(self, data_path, &mut docid).await;
        }

        self.commit().await;

        // Clamp to 1 ns so the rate computations can never divide by zero.
        let elapsed_time = start_time.elapsed().as_nanos().max(1);
        let docs = docid as u128;
        println!(
            "{}: docs {} docs/sec {} docs/day {} minutes {:.2} seconds {}",
            "Indexing finished".green(),
            docid.to_formatted_string(&Locale::en),
            (docs * 1_000_000_000 / elapsed_time).to_formatted_string(&Locale::en),
            // Scale before dividing, so rates below 1 doc/s don't truncate to 0 docs/day.
            (docs * 1_000_000_000 * 86_400 / elapsed_time).to_formatted_string(&Locale::en),
            elapsed_time as f64 / 1_000_000_000.0 / 60.0,
            elapsed_time / 1_000_000_000
        );
    }
}
#[cfg(not(feature = "pdf"))]
impl IngestPdf for IndexArc {
    /// Stub used when the `pdf` feature is disabled: only prints a notice.
    ///
    /// The parameter is underscore-prefixed so non-pdf builds don't warn about an
    /// unused variable.
    async fn ingest_pdf(&mut self, _data_path: &Path) {
        println!("pdf feature flag not enabled");
    }
}
/// Bulk ingestion of JSON documents from a file.
#[allow(clippy::too_many_arguments)]
#[allow(async_fn_in_trait)]
pub trait IngestJson {
    /// Index every document in the file at `data_path`, commit, and print index info.
    ///
    /// The file may hold either a JSON array of objects, or newline-delimited or
    /// concatenated JSON objects. The format is chosen by whether the first
    /// non-whitespace byte is `[`.
    async fn ingest_json(&mut self, data_path: &Path);
}
impl IngestJson for IndexArc {
    /// Stream documents from a JSON file into the index, commit, and print index info.
    ///
    /// Two layouts are accepted: a top-level JSON array of objects, or a stream of
    /// objects (NDJSON or concatenated JSON).
    ///
    /// Malformed input does not panic. Ingestion stops at the first invalid document,
    /// and everything indexed up to that point is still committed.
    async fn ingest_json(&mut self, data_path: &Path) {
        // Formats the "index time" row: whole seconds, minutes, and doc/s and doc/day rates.
        fn index_time_value(elapsed_nanos: u128, doc_count: u128) -> String {
            // Clamp to 1 ns so the rates can never divide by zero.
            let elapsed_nanos = elapsed_nanos.max(1);
            format!(
                "{} s {:.2} min {} doc/s {} doc/day",
                elapsed_nanos / 1_000_000_000,
                elapsed_nanos as f64 / 1_000_000_000.0 / 60.0,
                (doc_count * 1_000_000_000 / elapsed_nanos).to_formatted_string(&Locale::en),
                // Scale before dividing, so rates below 1 doc/s don't truncate to 0 doc/day.
                (doc_count * 1_000_000_000 * 86_400 / elapsed_nanos)
                    .to_formatted_string(&Locale::en)
            )
        }

        if !data_path.exists() {
            println!("data file not found: {}", data_path.display());
            return;
        }

        println!("ingesting data from: {}", data_path.display());
        let start_time = Instant::now();
        // Acquire and immediately release a read lock. This is kept from the original
        // flow; it presumably waits out an in-flight writer.
        drop(self.read().await);

        let file = match File::open(data_path) {
            Ok(file) => file,
            Err(e) => {
                println!("can't open data file {}: {}", data_path.display(), e);
                return;
            }
        };
        let mut reader = BufReader::new(file);
        let first_byte = match read_skipping_ws(&mut reader) {
            Ok(byte) => byte,
            Err(e) => {
                println!("can't read data file {}: {}", data_path.display(), e);
                return;
            }
        };

        if first_byte != b'[' {
            println!("Newline-delimited JSON (ndjson) or Concatenated JSON detected");
            // Un-read the first byte: it belongs to the first document.
            if let Err(e) = reader.seek_relative(-1) {
                println!("can't read data file {}: {}", data_path.display(), e);
                return;
            }
            for doc_object in Deserializer::from_reader(&mut reader).into_iter::<Document>() {
                match doc_object {
                    Ok(document) => {
                        self.index_document(document, FileType::None).await;
                    }
                    Err(e) => {
                        println!("invalid JSON document, stopping ingestion: {}", e);
                        break;
                    }
                }
            }
        } else {
            println!("JSON detected");
            loop {
                // Peek at the next significant byte. `]` closes the array; this also
                // covers `[]` and a trailing comma, which previously panicked.
                match read_skipping_ws(&mut reader) {
                    Ok(b']') => break,
                    Ok(_) => {
                        if reader.seek_relative(-1).is_err() {
                            break;
                        }
                    }
                    Err(e) => {
                        println!("unexpected end of JSON array: {}", e);
                        break;
                    }
                }
                // A fresh deserializer for each element. An object value ends at its
                // closing `}`, so no byte past the element is consumed.
                match Deserializer::from_reader(&mut reader)
                    .into_iter::<Document>()
                    .next()
                {
                    Some(Ok(document)) => {
                        self.index_document(document, FileType::None).await;
                    }
                    Some(Err(e)) => {
                        println!("invalid JSON document, stopping ingestion: {}", e);
                        break;
                    }
                    None => break,
                }
                match read_skipping_ws(&mut reader) {
                    Ok(b',') => {}
                    Ok(b']') => break,
                    Ok(other) => {
                        println!(
                            "unexpected character '{}' in JSON array, stopping ingestion",
                            other as char
                        );
                        break;
                    }
                    Err(e) => {
                        println!("unexpected end of JSON array: {}", e);
                        break;
                    }
                }
            }
        }

        self.commit().await;

        let elapsed_time = start_time.elapsed().as_nanos();
        let indexed_doc_count = self.read().await.indexed_doc_count().await as u128;
        let time_label: &'static str = "index time";
        let time_value = index_time_value(elapsed_time, indexed_doc_count);
        display_index_info(self, time_label, time_value).await
    }
}
pub async fn display_index_info(
index_arc: &Arc<RwLock<Index>>,
time_label: &'static str,
time_value: String,
) {
let index_ref = index_arc.read().await;
let non_affine_string = if index_ref.quantization == Quantization::ScalarQuantizationI8
&& index_ref.vector_similarity == VectorSimilarity::Euclidean
{
let non_affine = index_ref.shard_vec[0].read().await.max_vector_value == f32::MIN;
if non_affine {
" (non-affine)".to_string()
} else {
" (affine)".to_string()
}
} else {
"".to_string()
};
let index_path = index_ref.index_path_string.clone();
let level_count = index_ref.level_count().await;
let file_date = metadata(Path::new(&index_path).join(META_FILENAME))
.unwrap()
.created()
.unwrap();
let dt_now_utc: DateTime<Utc> = file_date.into();
let s = if let Some(symspell) = index_ref.symspell_option.as_ref() {
symspell
.read()
.await
.get_candidates_size()
.to_formatted_string(&Locale::en)
} else {
"None".to_string()
};
let s2 = if let Some(completions) = index_ref.completion_option.as_ref() {
completions
.read()
.await
.len()
.to_formatted_string(&Locale::en)
} else {
"None".to_string()
};
for shard in index_ref.shard_vec.iter() {
shard.read().await.index_file.sync_all().unwrap();
shard.read().await.vector_file.sync_all().unwrap();
shard.read().await.facets_file.sync_all().unwrap();
shard.read().await.docstore_file.sync_all().unwrap();
}
let index_size = dir_size(Path::new(&index_ref.index_path_string)).unwrap_or(0);
let dimensions_label = if index_ref.vector_dimensions != index_ref.vector_dimensions_original {
" (".to_owned()
+ &index_ref.shard_vec[0]
.read()
.await
.vector_dimensions
.to_formatted_string(&Locale::en)
+ " padded)"
} else {
"".to_string()
};
let info_entries = vec![
Info {
entry: "Index",
value: "".to_string(),
},
Info {
entry: "index id, name",
value: index_ref.meta.id.to_string() + " " + &index_ref.meta.name,
},
Info {
entry: "index format version",
value: index_ref.index_format_version_major.to_string()
+ "."
+ &index_ref.index_format_version_minor.to_string()
+ " "
+ &INDEX_FORMAT_VERSION_MAJOR.to_string()
+ "."
+ &INDEX_FORMAT_VERSION_MINOR.to_string(),
},
Info {
entry: "indexed documents",
value: index_ref
.indexed_doc_count()
.await
.to_formatted_string(&Locale::en),
},
Info {
entry: "deleted documents",
value: index_ref.deleted_doc_count.to_formatted_string(&Locale::en),
},
Info {
entry: "fields",
value: index_ref.schema_map.len().to_string()
+ " ("
+ &index_ref.indexed_field_vec.len().to_string()
+ " indexed, "
+ &index_ref.stored_field_names.len().to_string()
+ " stored)",
},
Info {
entry: "shards",
value: index_ref.shard_count().await.to_string(),
},
Info {
entry: "levels",
value: level_count.to_string(),
},
Info {
entry: "document compression",
value: index_ref.meta.document_compression.to_string(),
},
Info {
entry: "index size",
value: index_size.to_formatted_string(&Locale::en) + " bytes",
},
Info {
entry: "creation date",
value: dt_now_utc.format("%Y-%m-%d %H:%M:%S").to_string(),
},
Info {
entry: time_label,
value: time_value,
},
Info {
entry: "Lexical",
value: index_ref.is_lexical_indexing.to_string(),
},
Info {
entry: "segments",
value: index_ref.segment_number1.to_string(),
},
Info {
entry: "n-grams",
value: format!("{:08b}", index_ref.meta.ngram_indexing),
},
Info {
entry: "facets",
value: index_ref.facets.len().to_string(),
},
Info {
entry: "tokenizer",
value: index_ref.meta.tokenizer.to_string(),
},
Info {
entry: "stemmer",
value: index_ref.meta.stemmer.to_string(),
},
Info {
entry: "stop words",
value: index_ref.shard_vec[0]
.read()
.await
.stop_words
.len()
.to_string(),
},
Info {
entry: "frequent words",
value: index_ref.shard_vec[0]
.read()
.await
.frequent_words
.len()
.to_string(),
},
Info {
entry: "synonyms",
value: index_ref.synonyms_map.len().to_string(),
},
Info {
entry: "similarity",
value: index_ref.meta.lexical_similarity.to_string(),
},
Info {
entry: "Vector",
value: index_ref.is_vector_indexing.to_string(),
},
Info {
entry: "indexed vectors",
value: index_ref
.indexed_vector_count()
.await
.to_formatted_string(&Locale::en)
+ " ("
+ &(index_ref.indexed_vector_count().await / level_count.max(1))
.to_formatted_string(&Locale::en)
+ " per level)",
},
Info {
entry: "indexed clusters",
value: index_ref
.indexed_cluster_count()
.await
.to_formatted_string(&Locale::en)
+ " ("
+ &(index_ref.indexed_cluster_count().await / level_count.max(1))
.to_formatted_string(&Locale::en)
+ " per level)",
},
Info {
entry: "dimensions",
value: index_ref
.vector_dimensions_original
.to_formatted_string(&Locale::en)
+ &dimensions_label,
},
Info {
entry: "precision, quantization",
value: index_ref.vector_precision.to_string()
+ ", "
+ &index_ref.quantization.to_string()
+ &non_affine_string,
},
Info {
entry: "inference",
value: index_ref.meta.inference.to_string(),
},
Info {
entry: "clustering",
value: index_ref.meta.clustering.to_string(),
},
Info {
entry: "similarity",
value: index_ref.vector_similarity.to_string(),
},
Info {
entry: "Query rewriting",
value: "".to_string(),
},
Info {
entry: "spelling correction",
value: if let Some(symspell) = index_ref.symspell_option.as_ref() {
symspell
.read()
.await
.get_dictionary_size()
.to_formatted_string(&Locale::en)
} else {
"None".to_string()
},
},
Info {
entry: "auto completion",
value: s + " " + &s2,
},
];
let mut table = Table::new(info_entries);
table.modify(Columns::first(), Width::increase(26));
table.modify(Columns::last(), Width::truncate(49).suffix("..."));
table.modify(Columns::last(), Width::increase(50));
table.with(Remove::row(Rows::first()));
if index_ref.is_lexical_indexing && index_ref.is_vector_indexing {
table
.with(Style::modern().remove_horizontal().horizontals([
(1, HorizontalLine::inherit(Style::modern())),
(12, HorizontalLine::inherit(Style::modern())),
(13, HorizontalLine::inherit(Style::modern())),
(22, HorizontalLine::inherit(Style::modern())),
(23, HorizontalLine::inherit(Style::modern())),
(30, HorizontalLine::inherit(Style::modern())),
(31, HorizontalLine::inherit(Style::modern())),
]))
.with(BorderColor::filled(Color::FG_BRIGHT_BLACK));
table.modify(
Columns::first(),
BorderColor::filled(Color::FG_BRIGHT_BLACK),
);
table.modify(Columns::last(), BorderColor::filled(Color::FG_BRIGHT_BLACK));
table.with(Modify::new(Rows::one(0)).with(Color::FG_CYAN));
table.with(Modify::new(Rows::one(12)).with(Color::FG_CYAN));
table.with(Modify::new(Rows::one(22)).with(Color::FG_CYAN));
table.with(Modify::new(Rows::one(30)).with(Color::FG_CYAN));
} else if index_ref.is_lexical_indexing {
table.with(Remove::row(Rows::new(23..30)));
table
.with(Style::modern().remove_horizontal().horizontals([
(1, HorizontalLine::inherit(Style::modern())),
(12, HorizontalLine::inherit(Style::modern())),
(13, HorizontalLine::inherit(Style::modern())),
(22, HorizontalLine::inherit(Style::modern())),
(23, HorizontalLine::inherit(Style::modern())),
(24, HorizontalLine::inherit(Style::modern())),
]))
.with(BorderColor::filled(Color::FG_BRIGHT_BLACK));
table.modify(
Columns::first(),
BorderColor::filled(Color::FG_BRIGHT_BLACK),
);
table.modify(Columns::last(), BorderColor::filled(Color::FG_BRIGHT_BLACK));
table.with(Modify::new(Rows::one(0)).with(Color::FG_CYAN));
table.with(Modify::new(Rows::one(12)).with(Color::FG_CYAN));
table.with(Modify::new(Rows::one(22)).with(Color::FG_CYAN));
table.with(Modify::new(Rows::one(23)).with(Color::FG_CYAN));
} else if index_ref.is_vector_indexing {
table.with(Remove::row(Rows::new(13..22)));
table
.with(Style::modern().remove_horizontal().horizontals([
(1, HorizontalLine::inherit(Style::modern())),
(12, HorizontalLine::inherit(Style::modern())),
(13, HorizontalLine::inherit(Style::modern())),
(14, HorizontalLine::inherit(Style::modern())),
(21, HorizontalLine::inherit(Style::modern())),
(22, HorizontalLine::inherit(Style::modern())),
]))
.with(BorderColor::filled(Color::FG_BRIGHT_BLACK));
table.modify(
Columns::first(),
BorderColor::filled(Color::FG_BRIGHT_BLACK),
);
table.modify(Columns::last(), BorderColor::filled(Color::FG_BRIGHT_BLACK));
table.with(Modify::new(Rows::one(0)).with(Color::FG_CYAN));
table.with(Modify::new(Rows::one(12)).with(Color::FG_CYAN));
table.with(Modify::new(Rows::one(13)).with(Color::FG_CYAN));
table.with(Modify::new(Rows::one(21)).with(Color::FG_CYAN));
} else {
table.with(Remove::row(Rows::new(23..30)));
table.with(Remove::row(Rows::new(13..22)));
table
.with(Style::modern().remove_horizontal().horizontals([
(1, HorizontalLine::inherit(Style::modern())),
(12, HorizontalLine::inherit(Style::modern())),
(13, HorizontalLine::inherit(Style::modern())),
(14, HorizontalLine::inherit(Style::modern())),
(15, HorizontalLine::inherit(Style::modern())),
(16, HorizontalLine::inherit(Style::modern())),
]))
.with(BorderColor::filled(Color::FG_BRIGHT_BLACK));
table.modify(
Columns::first(),
BorderColor::filled(Color::FG_BRIGHT_BLACK),
);
table.modify(Columns::last(), BorderColor::filled(Color::FG_BRIGHT_BLACK));
table.with(Modify::new(Rows::one(0)).with(Color::FG_CYAN));
table.with(Modify::new(Rows::one(12)).with(Color::FG_CYAN));
table.with(Modify::new(Rows::one(13)).with(Color::FG_CYAN));
table.with(Modify::new(Rows::one(14)).with(Color::FG_CYAN));
}
println!("{}", table);
}
/// Bulk ingestion of CSV records.
#[allow(async_fn_in_trait)]
pub trait IngestCsv {
    /// Index the records of the CSV file at `data_path`, commit, and print index info.
    ///
    /// Column *i* of each record is stored under the schema field whose `field_id`
    /// is *i*. Columns beyond the schema are ignored.
    ///
    /// * `has_header`, `quoting`, `flexible` and `delimiter` are passed to the
    ///   matching `csv::ReaderBuilder` options (`has_headers`, `quoting`, `flexible`,
    ///   `delimiter`).
    /// * `skip_docs` — number of records to skip before indexing starts (default 0).
    /// * `num_docs` — maximum number of records to index (default: no limit).
    #[allow(clippy::too_many_arguments)]
    async fn ingest_csv(
        &mut self,
        data_path: &Path,
        has_header: bool,
        quoting: bool,
        flexible: bool,
        delimiter: u8,
        skip_docs: Option<usize>,
        num_docs: Option<usize>,
    );
}
impl IngestCsv for IndexArc {
    /// Stream CSV records into the index, commit, and print index info.
    ///
    /// Column *i* is stored under the schema field with `field_id` *i*. The first
    /// `skip_docs` records are skipped, and at most `num_docs` documents are indexed.
    ///
    /// Ingestion stops at end of input, at an empty record, or at the first CSV
    /// parse error (reported). Everything indexed up to that point is committed.
    async fn ingest_csv(
        &mut self,
        data_path: &Path,
        has_header: bool,
        quoting: bool,
        flexible: bool,
        delimiter: u8,
        skip_docs: Option<usize>,
        num_docs: Option<usize>,
    ) {
        // Formats the "index time" row: whole seconds, minutes, and doc/s and doc/day rates.
        fn index_time_value(elapsed_nanos: u128, doc_count: u128) -> String {
            // Clamp to 1 ns so the rates can never divide by zero.
            let elapsed_nanos = elapsed_nanos.max(1);
            format!(
                "{} s {:.2} min {} doc/s {} doc/day",
                elapsed_nanos / 1_000_000_000,
                elapsed_nanos as f64 / 1_000_000_000.0 / 60.0,
                (doc_count * 1_000_000_000 / elapsed_nanos).to_formatted_string(&Locale::en),
                // Scale before dividing, so rates below 1 doc/s don't truncate to 0 doc/day.
                (doc_count * 1_000_000_000 * 86_400 / elapsed_nanos)
                    .to_formatted_string(&Locale::en)
            )
        }

        if !data_path.exists() {
            println!("data file not found: {}", data_path.display());
            return;
        }

        println!("ingesting data from: {}", data_path.display());
        let start_time = Instant::now();

        // Field names ordered by field_id, so column i maps to schema_vec[i].
        let schema_vec: Vec<String> = {
            let index_ref = self.read().await;
            let mut schema_vec = vec!["".to_string(); index_ref.schema_map.len()];
            for (key, value) in index_ref.schema_map.iter() {
                schema_vec[value.field_id] = key.clone();
            }
            schema_vec
        };

        let mut rdr = match ReaderBuilder::new()
            .has_headers(has_header)
            .quoting(quoting)
            .delimiter(delimiter)
            .terminator(Terminator::CRLF)
            .flexible(flexible)
            .from_path(data_path)
        {
            Ok(rdr) => rdr,
            Err(e) => {
                println!("can't open data file {}: {}", data_path.display(), e);
                return;
            }
        };

        let skip = skip_docs.unwrap_or(0);
        let max = num_docs.unwrap_or(usize::MAX);
        let mut skipped = 0usize;
        let mut docid = 0usize;
        let mut record = csv::StringRecord::new();
        while docid < max {
            // `Ok(false)` signals end of input. Treating it as success would keep the
            // loop alive (e.g. spinning through `skip` at EOF).
            match rdr.read_record(&mut record) {
                Ok(true) => {}
                Ok(false) => break,
                Err(e) => {
                    println!("CSV error, stopping ingestion: {}", e);
                    break;
                }
            }
            if skipped < skip {
                skipped += 1;
                continue;
            }
            if record.is_empty() {
                break;
            }
            let mut document = Document::new();
            for (field_name, element) in schema_vec.iter().zip(record.iter()) {
                document.insert(field_name.clone(), json!(element));
            }
            self.index_document(document, FileType::None).await;
            docid += 1;
        }

        self.commit().await;

        let elapsed_time = start_time.elapsed().as_nanos();
        let indexed_doc_count = self.read().await.indexed_doc_count().await as u128;
        let time_label: &'static str = "index time";
        let time_value = index_time_value(elapsed_time, indexed_doc_count);
        display_index_info(self, time_label, time_value).await
    }
}
/// Read an `.ivecs` file: a sequence of records, each a little-endian `i32` count
/// `k` followed by `k` little-endian `i32` values (e.g. ground-truth neighbour ids).
///
/// Reading stops cleanly at end of file. A trailing partial header of fewer than
/// 4 bytes is also treated as end of file.
///
/// # Errors
/// - Fails if the file can't be opened.
/// - Fails on any read error other than end-of-file, including a record whose data
///   is truncated.
/// - Returns `InvalidData` for a negative or oversized count. Such a count previously
///   caused a capacity-overflow panic.
pub fn read_ivecs(path: &str) -> Result<Vec<Vec<i32>>, io::Error> {
    let mut reader = BufReader::new(File::open(path)?);
    let mut ground_truth = Vec::new();
    let mut dim_buf = [0u8; 4];
    loop {
        match reader.read_exact(&mut dim_buf) {
            Ok(()) => {}
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
            Err(e) => return Err(e),
        }
        let byte_len = usize::try_from(i32::from_le_bytes(dim_buf))
            .ok()
            .and_then(|k| k.checked_mul(4))
            .ok_or_else(|| {
                io::Error::new(
                    io::ErrorKind::InvalidData,
                    "invalid vector dimension in .ivecs file",
                )
            })?;
        let mut data_buf = vec![0u8; byte_len];
        reader.read_exact(&mut data_buf)?;
        ground_truth.push(
            data_buf
                .chunks_exact(4)
                .map(|c| i32::from_le_bytes([c[0], c[1], c[2], c[3]]))
                .collect(),
        );
    }
    Ok(ground_truth)
}
/// Read an `.fvecs` file: a sequence of records, each a little-endian `i32`
/// dimension `d` followed by `d` little-endian `f32` components.
///
/// Reading stops cleanly at end of file. A trailing partial header of fewer than
/// 4 bytes is also treated as end of file.
///
/// # Errors
/// - Fails if the file can't be opened.
/// - Fails on any read error other than end-of-file, including a record whose data
///   is truncated.
/// - Returns `InvalidData` for a negative or oversized dimension. Such a dimension
///   previously caused a capacity-overflow panic.
pub fn read_fvecs(path: &str) -> Result<Vec<Vec<f32>>, io::Error> {
    let mut reader = BufReader::new(File::open(path)?);
    let mut results = Vec::new();
    let mut dim_buf = [0u8; 4];
    loop {
        match reader.read_exact(&mut dim_buf) {
            Ok(()) => {}
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
            Err(e) => return Err(e),
        }
        let byte_len = usize::try_from(i32::from_le_bytes(dim_buf))
            .ok()
            .and_then(|dim| dim.checked_mul(4))
            .ok_or_else(|| {
                io::Error::new(
                    io::ErrorKind::InvalidData,
                    "invalid vector dimension in .fvecs file",
                )
            })?;
        let mut float_buf = vec![0u8; byte_len];
        reader.read_exact(&mut float_buf)?;
        results.push(
            float_buf
                .chunks_exact(4)
                .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
                .collect(),
        );
    }
    Ok(results)
}
use bytemuck::checked::cast_slice_mut;
/// Index the vectors of an `.fvecs` file (e.g. the SIFT benchmark set).
///
/// Each record becomes one document with a single `vector` field. That field holds
/// the base64 encoding of the embedding's big-endian bytes. At most `max_count`
/// vectors are indexed (`None` means all). Afterwards the index is committed and
/// index info is printed.
///
/// A truncated record or an invalid (negative) dimension stops ingestion with a
/// message. The vectors read before that point are still committed.
pub async fn ingest_sift(
    index_arc: &Arc<RwLock<Index>>,
    data_path: &Path,
    max_count: Option<usize>,
) {
    // Formats the "index time" row: whole seconds, minutes, and doc/s and doc/day rates.
    fn index_time_value(elapsed_nanos: u128, doc_count: u128) -> String {
        // Clamp to 1 ns so the rates can never divide by zero.
        let elapsed_nanos = elapsed_nanos.max(1);
        format!(
            "{} s {:.2} min {} doc/s {} doc/day",
            elapsed_nanos / 1_000_000_000,
            elapsed_nanos as f64 / 1_000_000_000.0 / 60.0,
            (doc_count * 1_000_000_000 / elapsed_nanos).to_formatted_string(&Locale::en),
            // Scale before dividing, so rates below 1 doc/s don't truncate to 0 doc/day.
            (doc_count * 1_000_000_000 * 86_400 / elapsed_nanos).to_formatted_string(&Locale::en)
        )
    }

    if !data_path.exists() {
        println!("data file not found: {}", data_path.display());
        return;
    }

    println!("ingesting data from: {}", data_path.display());
    let start_time = Instant::now();
    let fvecs_file = match File::open(data_path) {
        Ok(file) => file,
        Err(e) => {
            println!("can't open data file {}: {}", data_path.display(), e);
            return;
        }
    };
    let file_length = match fvecs_file.metadata() {
        Ok(md) => md.len() as usize,
        Err(e) => {
            println!("can't read data file {}: {}", data_path.display(), e);
            return;
        }
    };
    let mut fvecs_reader = BufReader::new(fvecs_file);

    let max_docs = max_count.unwrap_or(usize::MAX);
    let mut byte_number = 0usize;
    let mut docid = 0usize;
    // Check both limits up front. An empty file no longer panics, and `Some(0)` now
    // indexes nothing.
    while docid < max_docs && byte_number < file_length {
        let mut dim_buf = [0u8; 4];
        if let Err(e) = fvecs_reader.read_exact(&mut dim_buf) {
            println!("truncated vector file {}: {}", data_path.display(), e);
            break;
        }
        let Ok(dimension) = usize::try_from(i32::from_le_bytes(dim_buf)) else {
            println!("invalid vector dimension in {}", data_path.display());
            break;
        };
        let mut fvecs = vec![0f32; dimension];
        if let Err(e) = fvecs_reader.read_exact(cast_slice_mut(fvecs.as_mut_slice())) {
            println!("truncated vector file {}: {}", data_path.display(), e);
            break;
        }
        // The file stores little-endian floats, and `cast_slice_mut` copied the raw
        // bytes. Fix them up on big-endian targets; this is a no-op on little-endian.
        for component in fvecs.iter_mut() {
            *component = f32::from_bits(u32::from_le(component.to_bits()));
        }

        let document = Document::from([(
            "vector".to_string(),
            Value::String(encode_bytes_to_base64_string(&embedding_to_bytes_be(
                &Embedding::F32(fvecs),
            ))),
        )]);
        index_arc.index_document(document, FileType::None).await;
        docid += 1;
        byte_number += 4 + dimension * 4;
    }

    index_arc.commit().await;

    let elapsed_time = start_time.elapsed().as_nanos();
    let indexed_doc_count = index_arc.read().await.indexed_doc_count().await as u128;
    let time_label: &'static str = "index time";
    let time_value = index_time_value(elapsed_time, indexed_doc_count);
    display_index_info(index_arc, time_label, time_value).await
}