use std::collections::BTreeMap;
use std::fs::File;
use std::io::{BufReader, Read, Write};
use std::path::Path;
use std::time::{SystemTime, UNIX_EPOCH};
use log::{info, warn};
use rustc_hash::FxHashMap;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use infisearch_common::dictionary::Dictionary;
use infisearch_common::{bitmap, METADATA_FILE};
use infisearch_common::metadata::MetadataReader;
use crate::indexer::output_config::InfiOutputConfig;
use crate::utils::fs_utils;
use crate::{INFISEARCH_VER, i_debug, OLD_SOURCE_CONFIG, OUTPUT_CONFIG_FILE};
lazy_static! {
static ref CURRENT_MILLIS: u128 = SystemTime::now().duration_since(UNIX_EPOCH)
.expect("Failed to obtain current system time. Consider using the --incremental-content-hash option.")
.as_millis();
}
static INCREMENTAL_INFO_FILE_NAME: &str = "_incremental_info.json";
fn get_default_dictionary() -> Dictionary {
Dictionary { term_infos: BTreeMap::default() }
}
#[derive(Serialize, Deserialize)]
struct DocIdsAndFileHash(
Vec<u32>, u32, #[serde(skip)] bool, Vec<String>, );
static ABSENT_HASH: u32 = 126487710;
#[derive(Serialize, Deserialize)]
pub struct IncrementalIndexInfo {
pub use_content_hash: bool,
mappings: FxHashMap<String, DocIdsAndFileHash>,
#[serde(skip)]
inv_mappings: FxHashMap<u32, String>,
#[serde(skip)]
inv_mappings_secondary: Vec<FxHashMap<u32, Vec<String>>>,
pub last_pl_number: u32,
pub num_deleted_docs: u32,
pub pl_names_to_cache: Vec<u32>,
#[serde(skip)]
pub invalidation_vector: Vec<u8>,
#[serde(skip, default = "get_default_dictionary")]
pub dictionary: Dictionary,
}
impl IncrementalIndexInfo {
pub fn empty(use_content_hash: bool) -> (
Option<InfiOutputConfig>,
Option<MetadataReader>,
IncrementalIndexInfo,
) {
(
None,
None,
IncrementalIndexInfo {
use_content_hash,
mappings: FxHashMap::default(),
inv_mappings: FxHashMap::default(),
inv_mappings_secondary: Vec::new(),
last_pl_number: 0,
num_deleted_docs: 0,
pl_names_to_cache: Vec::new(),
invalidation_vector: Vec::new(),
dictionary: get_default_dictionary(),
},
)
}
pub fn new_from_output_folder(
output_folder_path_inner: &Path,
output_folder_path: &Path,
json_config: &Value,
is_incremental: bool,
use_content_hash: bool,
) -> (Option<InfiOutputConfig>, Option<MetadataReader>, IncrementalIndexInfo) {
if !is_incremental {
return IncrementalIndexInfo::empty(use_content_hash);
}
if let Ok(meta) = std::fs::metadata(output_folder_path.join(INCREMENTAL_INFO_FILE_NAME)) {
if !meta.is_file() {
info!("Old incremental index info missing. Running a full reindex.");
return IncrementalIndexInfo::empty(use_content_hash);
}
} else {
info!("Old incremental index info missing. Running a full reindex.");
return IncrementalIndexInfo::empty(use_content_hash);
}
let old_output_config = output_folder_path.join(OUTPUT_CONFIG_FILE);
let old_output_config = if old_output_config.exists() {
let old_output_conf_str = std::fs::read_to_string(&old_output_config).unwrap();
let deserialized: Result<InfiOutputConfig, _> = serde_json::from_str(&old_output_conf_str);
if let Ok(old_output_conf) = deserialized {
if old_output_conf.ver != INFISEARCH_VER {
info!("InfiSearch version changed. Running a full reindex.");
return IncrementalIndexInfo::empty(use_content_hash);
}
old_output_conf
} else {
info!("Old output config invalid. Running a full reindex.");
return IncrementalIndexInfo::empty(use_content_hash);
}
} else {
warn!("Old output config missing. Running a full reindex.");
return IncrementalIndexInfo::empty(use_content_hash);
};
if let Ok(mut file) = File::open(output_folder_path.join(OLD_SOURCE_CONFIG)) {
let mut old_config = "".to_owned();
file.read_to_string(&mut old_config).expect("Unable to read old config file");
let old_json_config: Value = serde_json::from_str(&old_config)
.expect(&(OLD_SOURCE_CONFIG.to_owned() + " does not match schema!"));
if *json_config != old_json_config {
info!("Configuration file changed. Running a full reindex.");
return IncrementalIndexInfo::empty(use_content_hash);
}
} else {
warn!("Old configuration file missing. Running a full reindex.");
return IncrementalIndexInfo::empty(use_content_hash);
}
let info_file = File::open(output_folder_path.join(INCREMENTAL_INFO_FILE_NAME))
.expect("Failed to obtain incremental index info file handle.");
let mut info: IncrementalIndexInfo = serde_json::from_reader(BufReader::new(info_file))
.expect("incremental index info deserialization failed!");
if info.use_content_hash != use_content_hash {
info!("Content hash option changed. Running a full reindex.");
return IncrementalIndexInfo::empty(use_content_hash);
}
let old_output_folder_inner = output_folder_path.join(&old_output_config.index_ver);
if let Err(e) = std::fs::rename(&old_output_folder_inner, &output_folder_path_inner) {
info!("Failed to rename old output folder, trying copy-paste & removal.");
if let Ok(_) = dircpy::copy_dir(&old_output_folder_inner, &output_folder_path_inner) {
fs_utils::clean_dir(&old_output_folder_inner);
} else {
warn!(
"Failed to rename old output folder, performing a full index.\n Old: {}\n New: {}\n Cause: {}",
old_output_folder_inner.to_string_lossy(),
output_folder_path_inner.to_string_lossy(),
e,
);
return IncrementalIndexInfo::empty(use_content_hash);
}
}
let metadata_rdr = if let Ok(mut file) = File::open(output_folder_path_inner.join(METADATA_FILE)) {
let mut buf = Vec::new();
file.read_to_end(&mut buf).unwrap();
MetadataReader::new(buf)
} else {
warn!("metadata file missing. Running a full reindex.");
return IncrementalIndexInfo::empty(use_content_hash);
};
metadata_rdr.get_invalidation_vec(&mut info.invalidation_vector);
(Some(old_output_config), Some(metadata_rdr), info)
}
pub fn setup_dictionary(&mut self, metadata_rdr: &MetadataReader) {
self.dictionary = metadata_rdr.setup_dictionary();
}
pub fn add_doc_to_file(&mut self, external_id: &str, doc_id: u32) {
self.mappings
.get_mut(external_id)
.expect("Get path for index file should always have an entry when adding doc id")
.0
.push(doc_id);
self.inv_mappings.insert(doc_id, external_id.to_owned());
}
pub fn set_file(&mut self, external_id: &str, path: &Path, input_folder_path: &Path) -> bool {
if let Some(old_hash) = self.mappings.get_mut(external_id) {
let new_hash = Self::get_file_hash(
self.use_content_hash,
path,
input_folder_path,
&old_hash.3,
);
old_hash.2 = true;
if old_hash.1 != new_hash {
i_debug!("{} was updated", external_id);
self.num_deleted_docs += old_hash.0.len() as u32;
for doc_id in old_hash.0.drain(..) {
bitmap::set(&mut self.invalidation_vector, doc_id as usize);
}
old_hash.3.clear();
return false;
}
true
} else {
self.mappings.insert(external_id.to_owned(), DocIdsAndFileHash(Vec::new(), 0, true, Vec::new()));
false
}
}
pub fn extend_secondary_inv_mappings(&mut self, mappings: Vec<FxHashMap<u32, Vec<String>>>) {
self.inv_mappings_secondary.extend(mappings);
}
fn get_timestamp(path: &Path) -> u128 {
if !path.is_file() {
return ABSENT_HASH as u128;
}
if let Ok(metadata) = std::fs::metadata(path) {
if let Ok(modified) = metadata.modified() {
modified.duration_since(UNIX_EPOCH)
.expect("Failed to calculate timestamp. Consider using the --incremental-content-hash option")
.as_millis()
} else {
i_debug!("Obtaining modified timestamp failed for {}", path.to_string_lossy());
*CURRENT_MILLIS
}
} else {
i_debug!("Obtaining metadata failed for {}", path.to_string_lossy());
*CURRENT_MILLIS
}
}
fn get_file_hash(
use_content_hash: bool,
path: &Path,
input_folder_path: &Path,
secondary_paths: &Vec<String>,
) -> u32 {
if use_content_hash {
static ERR: &str = "Failed to read file for calculating content hash!";
let mut buf = if path.is_file() {
std::fs::read(path).expect(ERR)
} else {
ABSENT_HASH.to_le_bytes().to_vec()
};
for secondary_path in secondary_paths {
let secondary_path = input_folder_path.join(secondary_path);
if secondary_path.is_file() {
File::open(secondary_path)
.expect(ERR)
.read_to_end(&mut buf)
.expect(ERR);
} else {
buf.extend_from_slice(&ABSENT_HASH.to_le_bytes());
}
}
crc32fast::hash(&buf)
} else {
let mut timestamps = Vec::with_capacity(1 + secondary_paths.len());
timestamps.push(Self::get_timestamp(path));
for secondary_path in secondary_paths {
timestamps.push(Self::get_timestamp(&input_folder_path.join(secondary_path)));
}
crc32fast::hash(unsafe {
std::slice::from_raw_parts(timestamps.as_ptr() as *const u8, timestamps.len() * 16)
})
}
}
pub fn delete_unencountered_external_ids(&mut self) {
self.mappings = std::mem::take(&mut self.mappings)
.into_iter()
.filter(|(_path, docids_and_filehash)| {
if !docids_and_filehash.2 {
i_debug!("{} was deleted", _path);
for &doc_id in docids_and_filehash.0.iter() {
bitmap::set(&mut self.invalidation_vector, doc_id as usize);
self.num_deleted_docs += 1;
}
}
docids_and_filehash.2
})
.collect();
}
pub fn write_invalidation_vec(&mut self, doc_id_counter: u32) -> Vec<u8> {
let num_bytes = (doc_id_counter as f64 / 8.0).ceil() as usize;
self.invalidation_vector.extend(vec![0; num_bytes - self.invalidation_vector.len()]);
self.invalidation_vector.clone()
}
fn update_file_hashes(&mut self, input_folder_path: &Path) {
for map in std::mem::take(&mut self.inv_mappings_secondary) {
for (doc_id, secondary_ids) in map {
let main_id = self.inv_mappings.get(&doc_id)
.expect("Inverse mapping should contain doc_id");
let doc_id_and_filehash = self.mappings.get_mut(main_id)
.expect("Mappings should contain main_id");
doc_id_and_filehash.3.extend(secondary_ids);
}
}
for (main_id, doc_id_and_filehash) in self.mappings.iter_mut() {
doc_id_and_filehash.1 = Self::get_file_hash(
self.use_content_hash,
&input_folder_path.join(main_id),
input_folder_path,
&doc_id_and_filehash.3,
);
}
}
pub fn write_info(&mut self, input_folder_path: &Path, output_folder_path: &Path) {
self.update_file_hashes(input_folder_path);
let serialized = serde_json::to_string(self).unwrap();
File::create(output_folder_path.join(INCREMENTAL_INFO_FILE_NAME))
.unwrap()
.write_all(serialized.as_bytes())
.unwrap();
}
}