use apithing::ApiOperation;
use shardex::{
api::{
operations::{
BatchAddPostings, CreateIndex, Flush, GetPerformanceStats, GetStats, IncrementalAdd, RemoveDocuments,
Search,
},
parameters::{
BatchAddPostingsParams, CreateIndexParams, FlushParams, GetPerformanceStatsParams, GetStatsParams,
IncrementalAddParams, RemoveDocumentsParams, SearchParams,
},
ShardexContext,
},
DocumentId, Posting,
};
use std::error::Error;
use std::thread::sleep;
use std::time::{Duration, Instant};
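
// Demonstrates a conservative batch workflow with Shardex's ApiThing-style API:
// batched posting adds, incremental updates with monitoring, document removal,
// search timing, and final statistics. Every knob can be overridden through
// environment variables, e.g. (example name assumed):
//   BATCH_SIZE=50 NUM_BATCHES=5 cargo run --example batch_operations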
fn main() -> Result<(), Box<dyn Error>> {
println!("Shardex Batch Operations Example (Conservative)");
println!("===============================================");
let temp_dir = std::env::temp_dir().join("shardex_batch_conservative");
if temp_dir.exists() {
std::fs::remove_dir_all(&temp_dir)?;
}
std::fs::create_dir_all(&temp_dir)?;
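
    // Batch shape, read from the environment with conservative defaults.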
    let batch_size: usize = std::env::var("BATCH_SIZE")
        .ok()
        .and_then(|s| s.parse().ok())
        .unwrap_or(25);
    let num_batches: usize = std::env::var("NUM_BATCHES")
        .ok()
        .and_then(|s| s.parse().ok())
        .unwrap_or(3);
let vector_size: usize = std::env::var("VECTOR_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(256);
let total_docs: usize = batch_size * num_batches;
let mut context = ShardexContext::new();
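
    // Index tuning knobs, likewise overridable through the environment.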
    let shard_size: usize = std::env::var("SHARD_SIZE")
        .ok()
        .and_then(|s| s.parse().ok())
        .unwrap_or(10000);
    let batch_write_interval_ms: u64 = std::env::var("BATCH_WRITE_INTERVAL_MS")
        .ok()
        .and_then(|s| s.parse().ok())
        .unwrap_or(100);
    let slop_factor: u32 = std::env::var("SLOP_FACTOR")
        .ok()
        .and_then(|s| s.parse().ok())
        .unwrap_or(3);
    let bloom_filter_size: usize = std::env::var("BLOOM_FILTER_SIZE")
        .ok()
        .and_then(|s| s.parse().ok())
        .unwrap_or(1024);
    let wal_segment_size: usize = std::env::var("WAL_SEGMENT_SIZE")
        .ok()
        .and_then(|s| s.parse().ok())
        .unwrap_or(2 * 1024 * 1024);
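
    // Assemble the index configuration; default_slop_factor is the fallback used
    // later when a search passes slop_factor(None).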
let create_params = CreateIndexParams::builder()
.directory_path(temp_dir.clone())
.vector_size(vector_size)
.shard_size(shard_size)
.batch_write_interval_ms(batch_write_interval_ms)
.default_slop_factor(slop_factor)
.bloom_filter_size(bloom_filter_size)
.wal_segment_size(wal_segment_size)
.build()?;
println!("Index configuration:");
println!(" SHARD_SIZE={} (set via env var or default)", shard_size);
println!(
" BATCH_WRITE_INTERVAL_MS={} (set via env var or default)",
batch_write_interval_ms
);
println!(" SLOP_FACTOR={} (set via env var or default)", slop_factor);
println!(" BLOOM_FILTER_SIZE={} (set via env var or default)", bloom_filter_size);
println!(" WAL_SEGMENT_SIZE={} (set via env var or default)", wal_segment_size);
CreateIndex::execute(&mut context, &create_params)?;
println!("\n1. Conservative Batch Indexing");
println!("==============================");
println!("Batch configuration:");
println!(" BATCH_SIZE={} (set via env var or default)", batch_size);
println!(" NUM_BATCHES={} (set via env var or default)", num_batches);
println!(" VECTOR_SIZE={} (set via env var or default)", vector_size);
println!(
"Indexing {} documents in {} batches of {}",
total_docs, num_batches, batch_size
);
let mut total_indexing_time = Duration::new(0, 0);
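
    // Index documents batch by batch. with_flush_and_tracking flushes each batch
    // immediately and records timing, so per-batch throughput can be reported.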
for batch_num in 0..num_batches {
println!("\nProcessing batch {} of {}", batch_num + 1, num_batches);
let batch_start = batch_num * batch_size;
let postings = generate_document_batch(batch_start, batch_size, vector_size);
let batch_params = match BatchAddPostingsParams::with_flush_and_tracking(postings) {
Ok(params) => params,
Err(e) => {
eprintln!("Error creating batch parameters: {}", e);
eprintln!("This could happen with invalid postings or empty batches");
return Err(e.into());
}
};
let batch_stats = match BatchAddPostings::execute(&mut context, &batch_params) {
Ok(stats) => stats,
Err(e) => {
eprintln!("Batch operation failed: {}", e);
eprintln!("Recovery strategy: Continue with next batch or retry with smaller batch size");
return Err(e.into());
}
};
total_indexing_time += batch_stats.processing_time;
println!(" Batch completed in {:?}", batch_stats.processing_time);
println!(" Operations flushed: {}", batch_stats.operations_flushed);
println!(" Throughput: {:.0} docs/sec", batch_stats.throughput_docs_per_sec);
let stats_params = GetStatsParams::new();
let stats = GetStats::execute(&mut context, &stats_params)?;
println!(
" Total indexed: {} documents in {} shards",
stats.total_postings, stats.total_shards
);
}
println!("\nBatch indexing summary:");
println!(" Total time: {:?}", total_indexing_time);
println!(
" Overall throughput: {:.0} docs/sec",
total_docs as f64 / total_indexing_time.as_secs_f64()
);
println!("\n2. Incremental Operations with Monitoring");
println!("=========================================");
let initial_stats_params = GetStatsParams::new();
let initial_stats = GetStats::execute(&mut context, &initial_stats_params)?;
println!("Initial stats: {} postings", initial_stats.total_postings);
for i in 0..3 {
println!("Adding increment {} of 10 documents...", i + 1);
let increment = generate_document_batch(total_docs + i * 10, 10, vector_size);
let incremental_params = match IncrementalAddParams::with_batch_id(increment, format!("increment_{}", i + 1)) {
Ok(params) => params,
Err(e) => {
eprintln!("Error creating incremental parameters: {}", e);
eprintln!("Recovery strategy: Skip this increment and continue");
continue;
}
};
let incremental_stats = match IncrementalAdd::execute(&mut context, &incremental_params) {
Ok(stats) => stats,
Err(e) => {
eprintln!("Incremental add failed for increment {}: {}", i + 1, e);
eprintln!("Recovery strategy: Continue with next increment");
continue;
}
};
println!(
" Added {} postings in {:?}",
incremental_stats.postings_added, incremental_stats.processing_time
);
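        // Flush explicitly on every other increment; the others are presumably
        // picked up by the periodic batch writer (BATCH_WRITE_INTERVAL_MS above).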
if i % 2 == 0 {
let flush_params = FlushParams::new();
Flush::execute(&mut context, &flush_params)?;
}
let current_stats_params = GetStatsParams::new();
        let current_stats = GetStats::execute(&mut context, &current_stats_params)?;
println!(
" Current stats: {} postings, {:.1}MB memory",
current_stats.total_postings,
current_stats.memory_usage as f64 / 1024.0 / 1024.0
);
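        // Brief pause between increments to give background writes time to land.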
sleep(Duration::from_millis(200));
}
let final_flush_params = FlushParams::new();
Flush::execute(&mut context, &final_flush_params)?;
let final_stats_params = GetStatsParams::new();
let final_stats = GetStats::execute(&mut context, &final_stats_params)?;
println!("\n3. Conservative Document Removal");
println!("================================");
println!("Before removal - Active postings: {}", final_stats.active_postings);
let mut docs_to_remove = Vec::new();
for i in (5..=final_stats.total_postings).step_by(5) {
docs_to_remove.push(i as u128);
}
println!("Removing {} documents...", docs_to_remove.len());
if docs_to_remove.is_empty() {
println!("No documents to remove (stats show 0 total postings)");
println!(
"This is expected behavior with conservative batching - documents may not be immediately visible in stats"
);
} else {
let removal_params = match RemoveDocumentsParams::new(docs_to_remove) {
Ok(params) => params,
Err(e) => {
eprintln!("Error creating removal parameters: {}", e);
eprintln!("This could happen if the document list is empty or contains invalid IDs");
return Err(e.into());
}
};
        match RemoveDocuments::execute(&mut context, &removal_params) {
            Ok(removal_stats) => {
                println!("Document removal completed successfully");
                let flush_params = FlushParams::new();
                Flush::execute(&mut context, &flush_params)?;
                let after_removal_stats_params = GetStatsParams::new();
                let after_removal_stats = GetStats::execute(&mut context, &after_removal_stats_params)?;
                println!("Removal completed in {:?}", removal_stats.processing_time);
                println!(
                    "Removed {} documents, {} not found",
                    removal_stats.documents_removed, removal_stats.documents_not_found
                );
                println!(
                    "After removal - Active postings: {}",
                    after_removal_stats.active_postings
                );
            }
            Err(e) => {
                eprintln!("Document removal failed: {}", e);
                eprintln!("Recovery strategy: Check document IDs exist or continue without removal");
                println!("Continuing example despite removal failure...");
            }
        }
}
println!("\n4. Search Performance Testing");
println!("=============================");
let query_vector = generate_test_vector(vector_size);
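    // Time searches across several k values; slop_factor(None) falls back to the
    // index's default_slop_factor configured at creation.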
for k in [1, 3, 5, 10] {
let search_params = SearchParams::builder()
.query_vector(query_vector.clone())
.k(k)
.slop_factor(None)
.build()?;
let search_start = Instant::now();
let results = match Search::execute(&mut context, &search_params) {
Ok(results) => results,
Err(e) => {
eprintln!("Search failed for k={}: {}", k, e);
eprintln!("Recovery strategy: Try with different parameters or skip this search");
continue;
}
};
let search_time = search_start.elapsed();
println!(" k={}: {:?} ({} results)", k, search_time, results.len());
}
println!("\n5. Final Statistics");
println!("==================");
let detailed_stats_params = GetStatsParams::new();
let detailed_stats = GetStats::execute(&mut context, &detailed_stats_params)?;
let perf_stats_params = GetPerformanceStatsParams::detailed();
let perf_stats = GetPerformanceStats::execute(&mut context, &perf_stats_params)?;
println!("Final comprehensive statistics:");
println!(" Total shards: {}", detailed_stats.total_shards);
println!(" Total postings: {}", detailed_stats.total_postings);
println!(" Active postings: {}", detailed_stats.active_postings);
println!(
" Memory usage: {:.2} MB",
detailed_stats.memory_usage as f64 / 1024.0 / 1024.0
);
println!(
" Disk usage: {:.2} MB",
detailed_stats.disk_usage as f64 / 1024.0 / 1024.0
);
println!(
" Average shard utilization: {:.1}%",
detailed_stats.average_shard_utilization * 100.0
);
println!("\nPerformance statistics:");
println!(" Total operations: {}", perf_stats.total_operations);
println!(" Average latency: {:?}", perf_stats.average_latency);
println!(" Throughput: {:.2} ops/sec", perf_stats.throughput);
std::fs::remove_dir_all(&temp_dir)?;
println!("\nConservative batch operations example completed!");
Ok(())
}
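/// Builds `count` postings starting after `start_id`, each covering a short
/// synthetic extent and carrying a deterministic unit-length vector.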
fn generate_document_batch(start_id: usize, count: usize, vector_size: usize) -> Vec<Posting> {
(0..count)
.map(|i| {
let doc_id = start_id + i + 1;
let document_id = DocumentId::from_raw(doc_id as u128);
let vector = generate_deterministic_vector(doc_id, vector_size);
Posting {
document_id,
start: 0,
                length: 50 + (doc_id % 50) as u32,
                vector,
}
})
.collect()
}
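/// Derives a reproducible pseudo-random vector from `seed` via repeated hashing,
/// then normalizes it to unit length so similarity scores are scale-free.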
fn generate_deterministic_vector(seed: usize, size: usize) -> Vec<f32> {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut vector = Vec::with_capacity(size);
let mut hasher = DefaultHasher::new();
seed.hash(&mut hasher);
for i in 0..size {
(seed + i).hash(&mut hasher);
        let value = ((hasher.finish() % 10000) as f32 - 5000.0) / 5000.0;
        vector.push(value);
}
let magnitude: f32 = vector.iter().map(|x| x * x).sum::<f32>().sqrt();
if magnitude > 0.0 {
for value in &mut vector {
*value /= magnitude;
}
}
vector
}
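/// Fixed-seed query vector so search timings are comparable across runs.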
fn generate_test_vector(size: usize) -> Vec<f32> {
    generate_deterministic_vector(99999, size)
}