1use crate::hnsw::{DistanceMetric, VectorIndex};
37use ipfrs_core::{Cid, Result};
38use std::collections::HashMap;
39use std::sync::atomic::{AtomicUsize, Ordering};
40use std::sync::Arc;
41
/// Settings that control how an index migration is carried out.
#[derive(Debug, Clone)]
pub struct MigrationConfig {
    /// Number of entries copied per batch by the batched migration paths.
    pub batch_size: usize,
    /// When `true`, the migrated target is checked against the source afterwards.
    pub verify_after_migration: bool,
    /// Concurrency limit.
    /// NOTE(review): not read by any code visible in this file — confirm where it is honoured.
    pub max_concurrent: usize,
    /// Whether the source index should be kept after migration.
    /// NOTE(review): not read by any code visible in this file — confirm where it is honoured.
    pub preserve_source: bool,
}

impl Default for MigrationConfig {
    /// Returns the stock configuration: batches of 1000, verification enabled,
    /// a concurrency limit of 4, and the source preserved.
    fn default() -> Self {
        let batch_size = 1000;
        let max_concurrent = 4;

        Self {
            batch_size,
            max_concurrent,
            verify_after_migration: true,
            preserve_source: true,
        }
    }
}
65
/// Point-in-time snapshot of how far a migration has advanced.
#[derive(Debug, Clone)]
pub struct MigrationProgress {
    /// Number of entries the migration is expected to process.
    pub total_entries: usize,
    /// Number of entries copied so far.
    pub migrated_entries: usize,
    /// Number of entries verified so far.
    pub verified_entries: usize,
    /// Number of entries that failed to migrate.
    pub failed_entries: usize,
    /// Estimated time left, in seconds.
    pub estimated_seconds_remaining: f64,
}

impl MigrationProgress {
    /// Returns `migrated_entries / total_entries` as a percentage.
    ///
    /// An empty migration (`total_entries == 0`) is reported as 100% so callers
    /// never divide by zero. The result is not clamped, so it exceeds 100 when
    /// `migrated_entries` is larger than `total_entries`.
    pub fn completion_percent(&self) -> f64 {
        match self.total_entries {
            0 => 100.0,
            total => {
                let fraction = self.migrated_entries as f64 / total as f64;
                fraction * 100.0
            }
        }
    }

    /// Returns `true` once at least `total_entries` entries have been migrated.
    pub fn is_complete(&self) -> bool {
        self.total_entries <= self.migrated_entries
    }
}
95
/// Summary figures describing a finished migration.
///
/// NOTE(review): nothing visible in this file constructs or fills this struct, so the
/// units below are taken from the field names only — confirm against the producer.
#[derive(Clone, Debug)]
pub struct MigrationStats {
    /// Wall-clock duration of the migration, in seconds.
    pub total_duration_seconds: f64,
    /// Migration rate; presumably entries per second — TODO confirm.
    pub throughput: f64,
    /// Share of entries migrated successfully; scale (0–1 vs 0–100) not established here.
    pub success_rate: f64,
    /// Count of entries that were migrated.
    pub total_migrated: usize,
    /// Count of entries that failed to migrate.
    pub total_failed: usize,
}
110
111pub struct IndexMigration {
113 config: MigrationConfig,
115 progress: Arc<AtomicUsize>,
117}
118
119impl IndexMigration {
120 pub fn new(config: MigrationConfig) -> Self {
122 Self {
123 config,
124 progress: Arc::new(AtomicUsize::new(0)),
125 }
126 }
127
128 pub fn migrate_hnsw_to_hnsw(
130 &self,
131 source: &VectorIndex,
132 target_m: usize,
133 target_ef_construction: usize,
134 ) -> Result<VectorIndex> {
135 let dimension = 768; let metric = DistanceMetric::Cosine;
138
139 let mut target = VectorIndex::new(dimension, metric, target_m, target_ef_construction)?;
140
141 let entries = source.get_all_embeddings();
143 let _total = entries.len();
144
145 for (i, chunk) in entries.chunks(self.config.batch_size).enumerate() {
147 for (cid, embedding) in chunk {
148 target.insert(cid, embedding)?;
149 }
150
151 self.progress
152 .store((i + 1) * self.config.batch_size, Ordering::Relaxed);
153 }
154
155 if self.config.verify_after_migration {
157 self.verify_migration(source, &target)?;
158 }
159
160 Ok(target)
161 }
162
163 fn verify_migration(&self, source: &VectorIndex, target: &VectorIndex) -> Result<()> {
165 let source_entries = source.get_all_embeddings();
166
167 for (cid, _embedding) in &source_entries {
168 if !target.contains(cid) {
169 return Err(ipfrs_core::Error::Internal(format!(
170 "Migration verification failed: CID {:?} missing in target",
171 cid
172 )));
173 }
174 }
175
176 Ok(())
177 }
178
179 pub fn get_progress(&self, total_entries: usize) -> MigrationProgress {
181 let migrated = self.progress.load(Ordering::Relaxed);
182
183 MigrationProgress {
184 total_entries,
185 migrated_entries: migrated,
186 verified_entries: 0,
187 failed_entries: 0,
188 estimated_seconds_remaining: 0.0,
189 }
190 }
191
192 pub fn migrate_with_transform<F>(
194 &self,
195 source: &VectorIndex,
196 dimension: usize,
197 metric: DistanceMetric,
198 m: usize,
199 ef_construction: usize,
200 transform: F,
201 ) -> Result<VectorIndex>
202 where
203 F: Fn(&[f32]) -> Vec<f32>,
204 {
205 let mut target = VectorIndex::new(dimension, metric, m, ef_construction)?;
206
207 let entries = source.get_all_embeddings();
208
209 for (cid, embedding) in entries {
210 let transformed = transform(&embedding);
211 target.insert(&cid, &transformed)?;
212 }
213
214 Ok(target)
215 }
216
217 pub fn export_entries(&self, index: &VectorIndex) -> Vec<(Cid, Vec<f32>)> {
219 index.get_all_embeddings()
220 }
221
222 pub fn import_entries(
224 &self,
225 entries: &[(Cid, Vec<f32>)],
226 dimension: usize,
227 metric: DistanceMetric,
228 m: usize,
229 ef_construction: usize,
230 ) -> Result<VectorIndex> {
231 let mut index = VectorIndex::new(dimension, metric, m, ef_construction)?;
232
233 for (cid, embedding) in entries {
234 index.insert(cid, embedding)?;
235 }
236
237 Ok(index)
238 }
239}
240
241pub struct ConfigMigration;
243
244impl ConfigMigration {
245 pub fn upgrade_quality(source: &VectorIndex) -> Result<VectorIndex> {
247 let migration = IndexMigration::new(MigrationConfig::default());
248
249 migration.migrate_hnsw_to_hnsw(source, 32, 400)
251 }
252
253 pub fn optimize_speed(source: &VectorIndex) -> Result<VectorIndex> {
255 let migration = IndexMigration::new(MigrationConfig::default());
256
257 migration.migrate_hnsw_to_hnsw(source, 8, 100)
259 }
260
261 pub fn balance(source: &VectorIndex) -> Result<VectorIndex> {
263 let migration = IndexMigration::new(MigrationConfig::default());
264
265 migration.migrate_hnsw_to_hnsw(source, 16, 200)
267 }
268}
269
270pub struct DimensionMigration;
272
273impl DimensionMigration {
274 pub fn reduce_dimension(source: &VectorIndex, target_dim: usize) -> Result<VectorIndex> {
277 let migration = IndexMigration::new(MigrationConfig::default());
278
279 let transform = |embedding: &[f32]| -> Vec<f32> {
281 embedding[..target_dim.min(embedding.len())].to_vec()
282 };
283
284 migration.migrate_with_transform(
285 source,
286 target_dim,
287 DistanceMetric::Cosine,
288 16,
289 200,
290 transform,
291 )
292 }
293}
294
295pub struct MetricMigration;
297
298impl MetricMigration {
299 pub fn change_metric(source: &VectorIndex, new_metric: DistanceMetric) -> Result<VectorIndex> {
301 let entries = source.get_all_embeddings();
302 let dimension = 768; let mut target = VectorIndex::new(dimension, new_metric, 16, 200)?;
305
306 for (cid, embedding) in entries {
307 target.insert(&cid, &embedding)?;
308 }
309
310 Ok(target)
311 }
312
313 pub fn normalize_for_cosine(source: &VectorIndex) -> Result<VectorIndex> {
315 let migration = IndexMigration::new(MigrationConfig::default());
316
317 let transform = |embedding: &[f32]| -> Vec<f32> {
318 let norm: f32 = embedding.iter().map(|x| x * x).sum::<f32>().sqrt();
319 if norm > 1e-6 {
320 embedding.iter().map(|x| x / norm).collect()
321 } else {
322 embedding.to_vec()
323 }
324 };
325
326 migration.migrate_with_transform(source, 768, DistanceMetric::Cosine, 16, 200, transform)
327 }
328}
329
330pub struct BatchMigration {
332 batch_size: usize,
334 stats: HashMap<String, usize>,
336}
337
338impl BatchMigration {
339 pub fn new(batch_size: usize) -> Self {
341 Self {
342 batch_size,
343 stats: HashMap::new(),
344 }
345 }
346
347 pub fn migrate_with_callback<F>(
349 &mut self,
350 source: &VectorIndex,
351 target: &mut VectorIndex,
352 mut callback: F,
353 ) -> Result<()>
354 where
355 F: FnMut(usize, usize),
356 {
357 let entries = source.get_all_embeddings();
358 let total = entries.len();
359
360 for (i, chunk) in entries.chunks(self.batch_size).enumerate() {
361 for (cid, embedding) in chunk {
362 target.insert(cid, embedding)?;
363 }
364
365 let migrated = (i + 1) * self.batch_size.min(total);
366 callback(migrated, total);
367 }
368
369 Ok(())
370 }
371
372 pub fn get_stats(&self) -> &HashMap<String, usize> {
374 &self.stats
375 }
376}
377
378impl Default for BatchMigration {
379 fn default() -> Self {
380 Self::new(1000)
381 }
382}
383
#[cfg(test)]
mod tests {
    use super::*;
    use multihash_codetable::{Code, MultihashDigest};

    /// Builds a 768-dimensional cosine index holding ten deterministic entries.
    fn sample_index() -> VectorIndex {
        let mut index = VectorIndex::new(768, DistanceMetric::Cosine, 16, 200).unwrap();

        for i in 0..10 {
            let label = format!("test_vector_{}", i);
            let cid = Cid::new_v1(0x55, Code::Sha2_256.digest(label.as_bytes()));
            let values = vec![i as f32 * 0.1; 768];
            index.insert(&cid, &values).unwrap();
        }

        index
    }

    /// Migration helper using the default configuration.
    fn default_migration() -> IndexMigration {
        IndexMigration::new(MigrationConfig::default())
    }

    /// Progress snapshot with no failures recorded.
    fn snapshot(total: usize, migrated: usize, verified: usize, eta: f64) -> MigrationProgress {
        MigrationProgress {
            total_entries: total,
            migrated_entries: migrated,
            verified_entries: verified,
            failed_entries: 0,
            estimated_seconds_remaining: eta,
        }
    }

    #[test]
    fn test_migration_config_default() {
        let defaults = MigrationConfig::default();

        assert_eq!(defaults.batch_size, 1000);
        assert!(defaults.verify_after_migration);
        assert_eq!(defaults.max_concurrent, 4);
    }

    #[test]
    fn test_migration_progress() {
        let halfway = snapshot(100, 50, 0, 10.0);

        assert_eq!(halfway.completion_percent(), 50.0);
        assert!(!halfway.is_complete());
    }

    #[test]
    fn test_migration_progress_complete() {
        let finished = snapshot(100, 100, 100, 0.0);

        assert_eq!(finished.completion_percent(), 100.0);
        assert!(finished.is_complete());
    }

    #[test]
    fn test_index_migration_creation() {
        let initial = default_migration().get_progress(100);

        assert_eq!(initial.migrated_entries, 0);
    }

    #[test]
    fn test_export_entries() {
        let index = sample_index();

        let exported = default_migration().export_entries(&index);

        assert_eq!(exported.len(), 10);
    }

    #[test]
    fn test_import_entries() {
        let source = sample_index();
        let migration = default_migration();

        let exported = migration.export_entries(&source);
        let imported = migration
            .import_entries(&exported, 768, DistanceMetric::Cosine, 16, 200)
            .unwrap();

        assert_eq!(imported.len(), source.len());
    }

    #[test]
    fn test_migrate_with_transform() {
        let source = sample_index();

        let double = |embedding: &[f32]| -> Vec<f32> { embedding.iter().map(|x| x * 2.0).collect() };

        let target = default_migration()
            .migrate_with_transform(&source, 768, DistanceMetric::Cosine, 16, 200, double)
            .unwrap();

        assert_eq!(target.len(), source.len());
    }

    #[test]
    fn test_config_migration_upgrade() {
        let source = sample_index();

        let upgraded = ConfigMigration::upgrade_quality(&source).unwrap();

        assert_eq!(upgraded.len(), source.len());
    }

    #[test]
    fn test_config_migration_speed() {
        let source = sample_index();

        let optimized = ConfigMigration::optimize_speed(&source).unwrap();

        assert_eq!(optimized.len(), source.len());
    }

    #[test]
    fn test_config_migration_balance() {
        let source = sample_index();

        let balanced = ConfigMigration::balance(&source).unwrap();

        assert_eq!(balanced.len(), source.len());
    }

    #[test]
    fn test_dimension_reduction() {
        let source = sample_index();

        let reduced = DimensionMigration::reduce_dimension(&source, 384).unwrap();

        assert_eq!(reduced.len(), source.len());
    }

    #[test]
    fn test_metric_change() {
        let source = sample_index();

        let changed = MetricMigration::change_metric(&source, DistanceMetric::L2).unwrap();

        assert_eq!(changed.len(), source.len());
    }

    #[test]
    fn test_normalize_for_cosine() {
        let source = sample_index();

        let normalized = MetricMigration::normalize_for_cosine(&source).unwrap();

        assert_eq!(normalized.len(), source.len());
    }

    #[test]
    fn test_batch_migration() {
        let source = sample_index();
        let mut target = VectorIndex::new(768, DistanceMetric::Cosine, 16, 200).unwrap();

        let mut migrator = BatchMigration::new(5);
        let mut callback_count = 0;

        let outcome = migrator.migrate_with_callback(&source, &mut target, |migrated, total| {
            callback_count += 1;
            assert!(migrated <= total);
        });
        outcome.unwrap();

        assert_eq!(target.len(), source.len());
        assert!(callback_count > 0);
    }
}