1use std::collections::HashMap;
35
36#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
45pub struct BlockKey {
46 pub tensor_id: u128,
47 pub block_index: u32,
48}
49
50#[derive(Clone, Copy, PartialEq, Eq, Debug, Hash)]
55#[repr(u8)]
56pub enum Tier {
57 Tier0 = 0,
59 Tier1 = 1,
61 Tier2 = 2,
63 Tier3 = 3,
65}
66
67#[derive(Clone, Copy, PartialEq, Eq, Debug)]
69#[repr(u8)]
70pub enum DType {
71 F32 = 0,
72 F16 = 1,
73 BF16 = 2,
74}
75
76#[derive(Clone, Copy, PartialEq, Eq, Debug)]
78#[repr(u8)]
79pub enum ReconstructPolicy {
80 None = 0,
82 Delta = 1,
84 Factor = 2,
86}
87
88#[derive(Clone, Debug)]
90pub struct BlockMeta {
91 pub key: BlockKey,
92 pub dtype: DType,
93 pub tier: Tier,
94 pub bits: u8,
96 pub scale: f32,
98 pub zero_point: i16,
100 pub created_at: u64,
102 pub last_access_at: u64,
104 pub access_count: u32,
106 pub ema_rate: f32,
108 pub window: u64,
110 pub checksum: u32,
112 pub reconstruct: ReconstructPolicy,
114 pub tier_age: u32,
116 pub lineage_parent: Option<u128>,
118 pub block_bytes: u32,
120}
121
122#[derive(Clone, Debug, PartialEq, Eq)]
124pub enum StoreError {
125 TensorEvicted,
127 BlockNotFound,
129 ChecksumMismatch,
131 IOError,
133 BudgetExhausted,
135 InvalidBlock,
137 DeltaChainTooLong,
139 ReconstructionFailed,
141 InvalidData,
143 ChainFull,
145}
146
147pub trait Clock {
153 fn now_ticks(&self) -> u64;
155}
156
157pub trait BlockIO {
159 fn read_block(&self, tier: Tier, key: BlockKey, dst: &mut [u8]) -> Result<usize, StoreError>;
162
163 fn write_block(&mut self, tier: Tier, key: BlockKey, src: &[u8]) -> Result<(), StoreError>;
165
166 fn delete_block(&mut self, tier: Tier, key: BlockKey) -> Result<(), StoreError>;
168}
169
170pub trait MetaLog {
172 fn append(&mut self, rec: &BlockMeta) -> Result<(), StoreError>;
174
175 fn get(&self, key: BlockKey) -> Option<&BlockMeta>;
177
178 fn iter(&self) -> Box<dyn Iterator<Item = &BlockMeta> + '_>;
180}
181
182pub fn crc32(data: &[u8]) -> u32 {
192 let mut crc: u32 = 0xFFFF_FFFF;
193 for &byte in data {
194 crc ^= byte as u32;
195 for _ in 0..8 {
196 if crc & 1 != 0 {
197 crc = (crc >> 1) ^ 0xEDB8_8320;
198 } else {
199 crc >>= 1;
200 }
201 }
202 }
203 !crc
204}
205
206fn bits_for_tier(tier: Tier) -> u8 {
212 match tier {
213 Tier::Tier0 => 0,
214 Tier::Tier1 => 8,
215 Tier::Tier2 => 7,
216 Tier::Tier3 => 3,
217 }
218}
219
220#[inline]
224fn qmax(bits: u8) -> i32 {
225 if bits == 0 || bits > 8 {
226 return 0;
227 }
228 (1i32 << (bits - 1)) - 1
229}
230
231struct BlockData {
233 element_count: u32,
235 packed: Vec<u8>,
237}
238
239fn quantize_block(data: &[f32], bits: u8) -> (Vec<u8>, f32) {
243 let qm = qmax(bits);
244 if qm == 0 || data.is_empty() {
245 return (Vec::new(), 0.0);
246 }
247 let qm_f = qm as f32;
248
249 let max_abs = data
251 .iter()
252 .filter(|v| v.is_finite())
253 .fold(0.0f32, |acc, v| acc.max(v.abs()));
254
255 let scale = if max_abs == 0.0 { 0.0 } else { max_abs / qm_f };
256 let inv_scale = if scale == 0.0 { 0.0 } else { 1.0 / scale };
257
258 let bits_u32 = bits as u32;
259 let needed = (data.len() * bits as usize).div_ceil(8);
260 let mut packed = Vec::with_capacity(needed);
261
262 let mut acc: u64 = 0;
263 let mut acc_bits: u32 = 0;
264
265 for &v in data {
266 let q = if v.is_finite() {
267 (v * inv_scale).round() as i32
268 } else {
269 0
270 }
271 .clamp(-qm, qm);
272
273 let u = (q + qm) as u32;
274 acc |= (u as u64) << acc_bits;
275 acc_bits += bits_u32;
276
277 while acc_bits >= 8 {
278 packed.push((acc & 0xFF) as u8);
279 acc >>= 8;
280 acc_bits -= 8;
281 }
282 }
283
284 if acc_bits > 0 {
285 packed.push((acc & 0xFF) as u8);
286 }
287
288 (packed, scale)
289}
290
291fn dequantize_block(packed: &[u8], scale: f32, bits: u8, count: usize, out: &mut [f32]) -> usize {
295 let qm = qmax(bits);
296 if qm == 0 || packed.is_empty() {
297 return 0;
298 }
299
300 let bits_u32 = bits as u32;
301 let mask = (1u64 << bits_u32) - 1;
302 let limit = count.min(out.len());
303
304 let mut acc: u64 = 0;
305 let mut acc_bits: u32 = 0;
306 let mut byte_idx: usize = 0;
307 let mut written: usize = 0;
308
309 while written < limit {
310 while acc_bits < bits_u32 && byte_idx < packed.len() {
311 acc |= (packed[byte_idx] as u64) << acc_bits;
312 acc_bits += 8;
313 byte_idx += 1;
314 }
315 if acc_bits < bits_u32 {
316 break;
317 }
318
319 let u = (acc & mask) as i32;
320 acc >>= bits_u32;
321 acc_bits -= bits_u32;
322
323 out[written] = (u - qm) as f32 * scale;
324 written += 1;
325 }
326
327 written
328}
329
330fn block_checksum(packed: &[u8], scale: f32) -> u32 {
332 let scale_bytes = scale.to_le_bytes();
333 let total = packed.len() + scale_bytes.len();
334 let mut buf = Vec::with_capacity(total);
335 buf.extend_from_slice(packed);
336 buf.extend_from_slice(&scale_bytes);
337 crc32(&buf)
338}
339
340#[derive(Debug, Default)]
346pub struct TickResult {
347 pub upgrades: u32,
349 pub downgrades: u32,
351 pub evictions: u32,
353 pub bytes_freed: usize,
355 pub ops_used: u32,
357 pub candidates_found: u32,
359}
360
361fn to_tiering_tier(tier: Tier) -> crate::tiering::Tier {
367 match tier {
368 Tier::Tier0 => crate::tiering::Tier::Tier0,
369 Tier::Tier1 => crate::tiering::Tier::Tier1,
370 Tier::Tier2 => crate::tiering::Tier::Tier2,
371 Tier::Tier3 => crate::tiering::Tier::Tier3,
372 }
373}
374
375fn from_tiering_tier(tier: crate::tiering::Tier) -> Tier {
377 match tier {
378 crate::tiering::Tier::Tier0 => Tier::Tier0,
379 crate::tiering::Tier::Tier1 => Tier::Tier1,
380 crate::tiering::Tier::Tier2 => Tier::Tier2,
381 crate::tiering::Tier::Tier3 => Tier::Tier3,
382 }
383}
384
385fn to_tiering_meta(meta: &BlockMeta, now: u64) -> crate::tiering::BlockMeta {
387 crate::tiering::BlockMeta {
388 ema_rate: meta.ema_rate,
389 access_window: meta.window,
390 last_access: meta.last_access_at,
391 access_count: meta.access_count as u64,
392 current_tier: to_tiering_tier(meta.tier),
393 tier_since: now.saturating_sub(meta.tier_age as u64),
394 }
395}
396
397pub struct TieredStore {
407 block_bytes: usize,
410
411 index: HashMap<BlockKey, BlockMeta>,
413
414 tier1_data: HashMap<BlockKey, BlockData>,
416 tier2_data: HashMap<BlockKey, BlockData>,
418 tier3_data: HashMap<BlockKey, BlockData>,
420
421 tier1_keys: Vec<BlockKey>,
423 tier2_keys: Vec<BlockKey>,
424 tier3_keys: Vec<BlockKey>,
425
426 witness_log: crate::metrics::WitnessLog,
428
429 coherence: Option<crate::coherence::CoherenceCheck>,
431 epoch_tracker: crate::coherence::EpochTracker,
433 metrics_series: crate::metrics::MetricsSeries,
435}
436
437const EMA_ALPHA: f32 = 0.1;
439
440impl TieredStore {
441 pub fn new(block_bytes: usize) -> Self {
443 Self {
444 block_bytes,
445 index: HashMap::new(),
446 tier1_data: HashMap::new(),
447 tier2_data: HashMap::new(),
448 tier3_data: HashMap::new(),
449 tier1_keys: Vec::new(),
450 tier2_keys: Vec::new(),
451 tier3_keys: Vec::new(),
452 witness_log: crate::metrics::WitnessLog::new(10_000),
453 coherence: None,
454 epoch_tracker: crate::coherence::EpochTracker::new(),
455 metrics_series: crate::metrics::MetricsSeries::new(256),
456 }
457 }
458
459 #[inline]
461 pub fn block_bytes(&self) -> usize {
462 self.block_bytes
463 }
464
465 pub fn witness_log(&self) -> &crate::metrics::WitnessLog {
467 &self.witness_log
468 }
469
470 pub fn witness_log_mut(&mut self) -> &mut crate::metrics::WitnessLog {
472 &mut self.witness_log
473 }
474
475 pub fn enable_coherence(&mut self, check: crate::coherence::CoherenceCheck) {
481 self.coherence = Some(check);
482 }
483
484 pub fn disable_coherence(&mut self) {
486 self.coherence = None;
487 }
488
489 pub fn epoch_tracker(&self) -> &crate::coherence::EpochTracker {
491 &self.epoch_tracker
492 }
493
494 pub fn epoch_tracker_mut(&mut self) -> &mut crate::coherence::EpochTracker {
496 &mut self.epoch_tracker
497 }
498
499 pub fn metrics_series(&self) -> &crate::metrics::MetricsSeries {
501 &self.metrics_series
502 }
503
504 pub fn metrics_series_mut(&mut self) -> &mut crate::metrics::MetricsSeries {
506 &mut self.metrics_series
507 }
508
509 pub fn coherence_check(
515 &mut self,
516 key: BlockKey,
517 original_data: &[f32],
518 now: u64,
519 ) -> Option<Result<crate::coherence::CoherenceResult, StoreError>> {
520 let check = self.coherence.clone()?;
521 Some(check.check_coherence(self, key, original_data, now))
522 }
523
524 pub fn metrics(&self) -> crate::metrics::StoreMetrics {
526 let mut m = crate::metrics::StoreMetrics::new();
527 m.total_blocks = self.index.len() as u64;
528 m.tier0_blocks = self.index.values().filter(|b| b.tier == Tier::Tier0).count() as u64;
529 m.tier1_blocks = self.tier1_keys.len() as u64;
530 m.tier2_blocks = self.tier2_keys.len() as u64;
531 m.tier3_blocks = self.tier3_keys.len() as u64;
532 m.tier1_bytes = self.tier1_data.values().map(|d| d.packed.len() as u64).sum();
533 m.tier2_bytes = self.tier2_data.values().map(|d| d.packed.len() as u64).sum();
534 m.tier3_bytes = self.tier3_data.values().map(|d| d.packed.len() as u64).sum();
535 m.total_evictions = self.witness_log.count_evictions() as u64;
536 m.tier_flips_last_minute = self.witness_log.tier_flip_rate(60, self.index.len() as u64);
537 m
538 }
539
540 pub fn put(
548 &mut self,
549 key: BlockKey,
550 data: &[f32],
551 tier: Tier,
552 now: u64,
553 ) -> Result<(), StoreError> {
554 if tier == Tier::Tier0 {
555 return Err(StoreError::InvalidBlock);
556 }
557
558 let bits = bits_for_tier(tier);
559 let (packed, scale) = quantize_block(data, bits);
560 let checksum = block_checksum(&packed, scale);
561
562 if let Some(old_meta) = self.index.get(&key) {
564 let old_tier = old_meta.tier;
565 self.remove_data(old_tier, key);
566 self.remove_from_bucket(old_tier, key);
567 }
568
569 let byte_count = packed.len() as u32;
570 let block = BlockData {
571 element_count: data.len() as u32,
572 packed,
573 };
574
575 match tier {
576 Tier::Tier1 => { self.tier1_data.insert(key, block); }
577 Tier::Tier2 => { self.tier2_data.insert(key, block); }
578 Tier::Tier3 => { self.tier3_data.insert(key, block); }
579 Tier::Tier0 => unreachable!(),
580 }
581 self.add_to_bucket(tier, key);
582
583 let meta = BlockMeta {
584 key,
585 dtype: DType::F32,
586 tier,
587 bits,
588 scale,
589 zero_point: 0,
590 created_at: now,
591 last_access_at: now,
592 access_count: 1,
593 ema_rate: 0.0,
594 window: 1,
595 checksum,
596 reconstruct: ReconstructPolicy::None,
597 tier_age: 0,
598 lineage_parent: None,
599 block_bytes: byte_count,
600 };
601 self.index.insert(key, meta);
602
603 self.witness_log.record(now, crate::metrics::WitnessEvent::Access {
605 key,
606 score: 0.0,
607 tier,
608 });
609
610 self.epoch_tracker.record_write(key);
612
613 Ok(())
614 }
615
616 pub fn get(&mut self, key: BlockKey, out: &mut [f32], now: u64) -> Result<usize, StoreError> {
630 let meta = self.index.get(&key).ok_or(StoreError::BlockNotFound)?;
631
632 if meta.tier == Tier::Tier0 {
633 return Err(StoreError::TensorEvicted);
634 }
635
636 let tier = meta.tier;
637 let scale = meta.scale;
638 let bits = meta.bits;
639 let checksum = meta.checksum;
640
641 let block = self
642 .data_map(tier)
643 .and_then(|m| m.get(&key))
644 .ok_or(StoreError::BlockNotFound)?;
645
646 let actual_crc = block_checksum(&block.packed, scale);
648 if actual_crc != checksum {
649 return Err(StoreError::ChecksumMismatch);
650 }
651
652 let n = dequantize_block(
653 &block.packed,
654 scale,
655 bits,
656 block.element_count as usize,
657 out,
658 );
659
660 self.touch(key, now);
662
663 self.witness_log.record(now, crate::metrics::WitnessEvent::Access {
665 key,
666 score: 0.0, tier,
668 });
669
670 Ok(n)
671 }
672
673 pub fn touch(&mut self, key: BlockKey, now: u64) {
679 if let Some(meta) = self.index.get_mut(&key) {
680 let delta = now.saturating_sub(meta.last_access_at);
681
682 if delta >= 64 {
684 meta.window = 1;
685 } else if delta > 0 {
686 meta.window = (meta.window << delta) | 1;
687 }
688 if delta > 0 {
692 let instant_rate = 1.0 / delta as f32;
693 meta.ema_rate = EMA_ALPHA * instant_rate + (1.0 - EMA_ALPHA) * meta.ema_rate;
694 }
695
696 meta.last_access_at = now;
697 meta.access_count = meta.access_count.saturating_add(1);
698 }
699 }
700
701 pub fn meta(&self, key: BlockKey) -> Option<&BlockMeta> {
703 self.index.get(&key)
704 }
705
706 pub fn block_count(&self) -> usize {
708 self.index.len()
709 }
710
711 pub fn tier_count(&self, tier: Tier) -> usize {
713 match tier {
714 Tier::Tier0 => self
715 .index
716 .values()
717 .filter(|m| m.tier == Tier::Tier0)
718 .count(),
719 Tier::Tier1 => self.tier1_keys.len(),
720 Tier::Tier2 => self.tier2_keys.len(),
721 Tier::Tier3 => self.tier3_keys.len(),
722 }
723 }
724
725 pub fn total_bytes(&self) -> usize {
727 let sum = |map: &HashMap<BlockKey, BlockData>| -> usize {
728 map.values().map(|b| b.packed.len()).sum()
729 };
730 sum(&self.tier1_data) + sum(&self.tier2_data) + sum(&self.tier3_data)
731 }
732
733 pub fn blocks_in_tier(&self, tier: Tier) -> &[BlockKey] {
737 match tier {
738 Tier::Tier0 => &[],
739 Tier::Tier1 => &self.tier1_keys,
740 Tier::Tier2 => &self.tier2_keys,
741 Tier::Tier3 => &self.tier3_keys,
742 }
743 }
744
745 pub fn evict(
753 &mut self,
754 key: BlockKey,
755 policy: ReconstructPolicy,
756 ) -> Result<(), StoreError> {
757 let meta = self.index.get_mut(&key).ok_or(StoreError::BlockNotFound)?;
758 let old_tier = meta.tier;
759
760 if old_tier == Tier::Tier0 {
761 meta.reconstruct = policy;
763 return Ok(());
764 }
765
766 let bytes_freed = meta.block_bytes as usize;
767 let evict_ts = meta.last_access_at;
768
769 meta.tier = Tier::Tier0;
772 meta.reconstruct = policy;
773 meta.tier_age = 0;
774 meta.block_bytes = 0;
775 meta.bits = 0;
776
777 self.remove_data(old_tier, key);
779 self.remove_from_bucket(old_tier, key);
780
781 self.witness_log.record(evict_ts, crate::metrics::WitnessEvent::Eviction {
783 key,
784 score: 0.0,
785 bytes_freed,
786 });
787
788 Ok(())
789 }
790
791 fn data_map(&self, tier: Tier) -> Option<&HashMap<BlockKey, BlockData>> {
795 match tier {
796 Tier::Tier0 => None,
797 Tier::Tier1 => Some(&self.tier1_data),
798 Tier::Tier2 => Some(&self.tier2_data),
799 Tier::Tier3 => Some(&self.tier3_data),
800 }
801 }
802
803 fn remove_data(&mut self, tier: Tier, key: BlockKey) {
805 match tier {
806 Tier::Tier1 => { self.tier1_data.remove(&key); }
807 Tier::Tier2 => { self.tier2_data.remove(&key); }
808 Tier::Tier3 => { self.tier3_data.remove(&key); }
809 Tier::Tier0 => {}
810 }
811 }
812
813 fn remove_from_bucket(&mut self, tier: Tier, key: BlockKey) {
815 let bucket = match tier {
816 Tier::Tier1 => &mut self.tier1_keys,
817 Tier::Tier2 => &mut self.tier2_keys,
818 Tier::Tier3 => &mut self.tier3_keys,
819 Tier::Tier0 => return,
820 };
821 if let Some(pos) = bucket.iter().position(|k| *k == key) {
822 bucket.swap_remove(pos);
823 }
824 }
825
826 fn add_to_bucket(&mut self, tier: Tier, key: BlockKey) {
828 match tier {
829 Tier::Tier1 => self.tier1_keys.push(key),
830 Tier::Tier2 => self.tier2_keys.push(key),
831 Tier::Tier3 => self.tier3_keys.push(key),
832 Tier::Tier0 => {}
833 }
834 }
835
836 pub fn tick(
844 &mut self,
845 config: &crate::tiering::TierConfig,
846 now: u64,
847 budget_bytes: usize,
848 budget_ops: u32,
849 ) -> TickResult {
850 let mut result = TickResult::default();
851
852 let store_keys: Vec<BlockKey> = self.index.keys().copied().collect();
855 if store_keys.is_empty() {
856 return result;
857 }
858
859 let tiering_blocks: Vec<(crate::tiering::BlockKey, crate::tiering::BlockMeta)> =
860 store_keys
861 .iter()
862 .enumerate()
863 .map(|(idx, key)| {
864 let meta = &self.index[key];
865 (
866 crate::tiering::BlockKey(idx as u64),
867 to_tiering_meta(meta, now),
868 )
869 })
870 .collect();
871
872 let blocks_ref: Vec<(crate::tiering::BlockKey, &crate::tiering::BlockMeta)> =
873 tiering_blocks.iter().map(|(k, m)| (*k, m)).collect();
874
875 let candidates = crate::tiering::select_candidates(config, now, &blocks_ref);
878 result.candidates_found = candidates.len() as u32;
879
880 let mut remaining_bytes = budget_bytes;
882 let mut remaining_ops = budget_ops;
883 let mut migrated = std::collections::HashSet::new();
884
885 for candidate in &candidates {
886 if remaining_ops == 0 {
887 break;
888 }
889
890 let store_key = store_keys[candidate.key.0 as usize];
891 let target_tier = from_tiering_tier(candidate.target_tier);
892 let current_tier = from_tiering_tier(candidate.current_tier);
893
894 let old_bytes = self
895 .index
896 .get(&store_key)
897 .map(|m| m.block_bytes as usize)
898 .unwrap_or(0);
899
900 if old_bytes > remaining_bytes {
902 continue;
903 }
904
905 if target_tier == Tier::Tier0 {
906 if self.evict(store_key, ReconstructPolicy::None).is_ok() {
908 result.evictions += 1;
909 result.bytes_freed += old_bytes;
910 remaining_ops -= 1;
911 result.ops_used += 1;
912 remaining_bytes = remaining_bytes.saturating_sub(old_bytes);
913 migrated.insert(store_key);
914 }
915 } else {
916 let warm_bytes: usize =
918 self.tier2_data.values().map(|b| b.packed.len()).sum();
919 let target_bits = crate::tiering::bits_for_tier(
920 config,
921 to_tiering_tier(target_tier),
922 warm_bytes,
923 );
924
925 let old_tier_u8 = current_tier as u8;
926 let new_tier_u8 = target_tier as u8;
927
928 if self.migrate_block(store_key, target_tier, target_bits).is_ok() {
929 let new_bytes = self
930 .index
931 .get(&store_key)
932 .map(|m| m.block_bytes as usize)
933 .unwrap_or(0);
934
935 if new_tier_u8 < old_tier_u8 {
936 result.upgrades += 1;
938 } else {
939 result.downgrades += 1;
941 result.bytes_freed += old_bytes.saturating_sub(new_bytes);
942 }
943
944 let reason = if new_tier_u8 < old_tier_u8 {
946 crate::metrics::TierChangeReason::ScoreUpgrade
947 } else {
948 crate::metrics::TierChangeReason::ScoreDowngrade
949 };
950 self.witness_log.record(
951 now,
952 crate::metrics::WitnessEvent::TierChange {
953 key: store_key,
954 from_tier: current_tier,
955 to_tier: target_tier,
956 score: candidate.score,
957 reason,
958 },
959 );
960
961 remaining_ops -= 1;
962 result.ops_used += 1;
963 remaining_bytes = remaining_bytes.saturating_sub(old_bytes);
964 migrated.insert(store_key);
965 }
966 }
967 }
968
969 for key in &store_keys {
971 if migrated.contains(key) {
972 continue;
973 }
974 if let Some(meta) = self.index.get_mut(key) {
975 meta.tier_age = meta.tier_age.saturating_add(1);
976 let mut tm = crate::tiering::BlockMeta {
978 ema_rate: meta.ema_rate,
979 access_window: meta.window,
980 last_access: meta.last_access_at,
981 access_count: meta.access_count as u64,
982 current_tier: to_tiering_tier(meta.tier),
983 tier_since: now.saturating_sub(meta.tier_age as u64),
984 };
985 crate::tiering::tick_decay(config, &mut tm);
986 meta.ema_rate = tm.ema_rate;
987 meta.window = tm.access_window;
988 }
989 }
990
991 self.witness_log.record(
993 now,
994 crate::metrics::WitnessEvent::Maintenance {
995 upgrades: result.upgrades,
996 downgrades: result.downgrades,
997 evictions: result.evictions,
998 bytes_freed: result.bytes_freed,
999 budget_remaining_bytes: remaining_bytes.min(u32::MAX as usize) as u32,
1000 budget_remaining_ops: remaining_ops,
1001 },
1002 );
1003
1004 let snapshot_metrics = self.metrics();
1006 self.metrics_series.record(now, snapshot_metrics);
1007
1008 result
1009 }
1010
1011 fn migrate_block(
1017 &mut self,
1018 key: BlockKey,
1019 target_tier: Tier,
1020 target_bits: u8,
1021 ) -> Result<(), StoreError> {
1022 let meta = self.index.get(&key).ok_or(StoreError::BlockNotFound)?;
1024 let old_tier = meta.tier;
1025 let old_bits = meta.bits;
1026 let old_scale = meta.scale;
1027
1028 if old_tier == Tier::Tier0 {
1029 return Err(StoreError::TensorEvicted);
1030 }
1031 if target_tier == Tier::Tier0 {
1032 return Err(StoreError::InvalidBlock);
1033 }
1034
1035 let (element_count, f32_data) = {
1039 let block = self
1040 .data_map(old_tier)
1041 .and_then(|m| m.get(&key))
1042 .ok_or(StoreError::BlockNotFound)?;
1043 let ec = block.element_count;
1044 let mut data = vec![0.0f32; ec as usize];
1045 dequantize_block(&block.packed, old_scale, old_bits, ec as usize, &mut data);
1046 (ec, data)
1047 };
1048
1049 let (packed, scale) = quantize_block(&f32_data, target_bits);
1051 let checksum = block_checksum(&packed, scale);
1052 let byte_count = packed.len() as u32;
1053 let new_block = BlockData {
1054 element_count,
1055 packed,
1056 };
1057
1058 self.remove_data(old_tier, key);
1060 self.remove_from_bucket(old_tier, key);
1061
1062 match target_tier {
1064 Tier::Tier1 => { self.tier1_data.insert(key, new_block); }
1065 Tier::Tier2 => { self.tier2_data.insert(key, new_block); }
1066 Tier::Tier3 => { self.tier3_data.insert(key, new_block); }
1067 Tier::Tier0 => unreachable!(),
1068 }
1069 self.add_to_bucket(target_tier, key);
1070
1071 let meta = self.index.get_mut(&key).unwrap();
1073 meta.tier = target_tier;
1074 meta.bits = target_bits;
1075 meta.scale = scale;
1076 meta.checksum = checksum;
1077 meta.tier_age = 0;
1078 meta.block_bytes = byte_count;
1079
1080 Ok(())
1081 }
1082
1083 pub fn score_block(
1088 &self,
1089 key: BlockKey,
1090 config: &crate::tiering::TierConfig,
1091 now: u64,
1092 ) -> Option<f32> {
1093 let meta = self.index.get(&key)?;
1094 let tm = to_tiering_meta(meta, now);
1095 Some(crate::tiering::compute_score(config, now, &tm))
1096 }
1097
1098 pub fn touch_block(
1104 &mut self,
1105 key: BlockKey,
1106 config: &crate::tiering::TierConfig,
1107 now: u64,
1108 ) {
1109 if let Some(meta) = self.index.get_mut(&key) {
1110 let mut tm = crate::tiering::BlockMeta {
1111 ema_rate: meta.ema_rate,
1112 access_window: meta.window,
1113 last_access: meta.last_access_at,
1114 access_count: meta.access_count as u64,
1115 current_tier: to_tiering_tier(meta.tier),
1116 tier_since: now.saturating_sub(meta.tier_age as u64),
1117 };
1118 crate::tiering::touch(config, now, &mut tm);
1119 meta.ema_rate = tm.ema_rate;
1120 meta.window = tm.access_window;
1121 meta.last_access_at = tm.last_access;
1122 meta.access_count = tm.access_count.min(u32::MAX as u64) as u32;
1123 }
1124 }
1125}
1126
1127impl BlockIO for TieredStore {
1132 fn read_block(&self, tier: Tier, key: BlockKey, dst: &mut [u8]) -> Result<usize, StoreError> {
1133 let map = self.data_map(tier).ok_or(StoreError::BlockNotFound)?;
1134 let block = map.get(&key).ok_or(StoreError::BlockNotFound)?;
1135 let n = block.packed.len().min(dst.len());
1136 dst[..n].copy_from_slice(&block.packed[..n]);
1137 Ok(n)
1138 }
1139
1140 fn write_block(&mut self, tier: Tier, key: BlockKey, src: &[u8]) -> Result<(), StoreError> {
1141 if tier == Tier::Tier0 {
1142 return Err(StoreError::InvalidBlock);
1143 }
1144 let block = BlockData {
1145 element_count: 0, packed: src.to_vec(),
1147 };
1148 match tier {
1149 Tier::Tier1 => { self.tier1_data.insert(key, block); }
1150 Tier::Tier2 => { self.tier2_data.insert(key, block); }
1151 Tier::Tier3 => { self.tier3_data.insert(key, block); }
1152 Tier::Tier0 => unreachable!(),
1153 }
1154 Ok(())
1155 }
1156
1157 fn delete_block(&mut self, tier: Tier, key: BlockKey) -> Result<(), StoreError> {
1158 let removed = match tier {
1159 Tier::Tier1 => self.tier1_data.remove(&key).is_some(),
1160 Tier::Tier2 => self.tier2_data.remove(&key).is_some(),
1161 Tier::Tier3 => self.tier3_data.remove(&key).is_some(),
1162 Tier::Tier0 => false,
1163 };
1164 if removed {
1165 Ok(())
1166 } else {
1167 Err(StoreError::BlockNotFound)
1168 }
1169 }
1170}
1171
1172impl MetaLog for TieredStore {
1173 fn append(&mut self, rec: &BlockMeta) -> Result<(), StoreError> {
1174 self.index.insert(rec.key, rec.clone());
1175 Ok(())
1176 }
1177
1178 fn get(&self, key: BlockKey) -> Option<&BlockMeta> {
1179 self.index.get(&key)
1180 }
1181
1182 fn iter(&self) -> Box<dyn Iterator<Item = &BlockMeta> + '_> {
1183 Box::new(self.index.values())
1184 }
1185}
1186
1187#[cfg(test)]
1192mod tests {
1193 use super::*;
1194 use std::collections::hash_map::DefaultHasher;
1195 use std::hash::{Hash, Hasher};
1196
1197 fn make_key(tid: u128, idx: u32) -> BlockKey {
1198 BlockKey {
1199 tensor_id: tid,
1200 block_index: idx,
1201 }
1202 }
1203
1204 #[test]
1207 fn test_crc32_known_vector() {
1208 let data = b"123456789";
1210 assert_eq!(crc32(data), 0xCBF4_3926);
1211 }
1212
1213 #[test]
1214 fn test_crc32_empty() {
1215 assert_eq!(crc32(&[]), 0x0000_0000);
1216 }
1217
1218 #[test]
1219 fn test_crc32_single_byte() {
1220 assert_eq!(crc32(&[0x00]), 0xD202_EF8D);
1222 }
1223
1224 #[test]
1227 fn test_block_key_equality() {
1228 let a = make_key(1, 0);
1229 let b = make_key(1, 0);
1230 let c = make_key(1, 1);
1231 assert_eq!(a, b);
1232 assert_ne!(a, c);
1233 }
1234
1235 #[test]
1236 fn test_block_key_hash_differs() {
1237 fn hash_of(k: &BlockKey) -> u64 {
1238 let mut h = DefaultHasher::new();
1239 k.hash(&mut h);
1240 h.finish()
1241 }
1242 let a = make_key(1, 0);
1243 let b = make_key(2, 0);
1244 let c = make_key(1, 1);
1245 assert_ne!(hash_of(&a), hash_of(&b));
1247 assert_ne!(hash_of(&a), hash_of(&c));
1248 }
1249
1250 #[test]
1251 fn test_block_key_hash_stable() {
1252 fn hash_of(k: &BlockKey) -> u64 {
1253 let mut h = DefaultHasher::new();
1254 k.hash(&mut h);
1255 h.finish()
1256 }
1257 let a = make_key(42, 7);
1258 let b = make_key(42, 7);
1259 assert_eq!(hash_of(&a), hash_of(&b));
1260 }
1261
1262 #[test]
1265 fn test_qmax_values() {
1266 assert_eq!(qmax(8), 127);
1267 assert_eq!(qmax(7), 63);
1268 assert_eq!(qmax(5), 15);
1269 assert_eq!(qmax(3), 3);
1270 assert_eq!(qmax(1), 0);
1271 assert_eq!(qmax(0), 0);
1272 assert_eq!(qmax(9), 0);
1273 }
1274
1275 #[test]
1278 fn test_quantize_roundtrip_8bit() {
1279 let data: Vec<f32> = (0..128).map(|i| (i as f32 - 64.0) * 0.1).collect();
1280 let (packed, scale) = quantize_block(&data, 8);
1281 let mut out = vec![0.0f32; 128];
1282 let n = dequantize_block(&packed, scale, 8, 128, &mut out);
1283 assert_eq!(n, 128);
1284 for (i, (&orig, &dec)) in data.iter().zip(out.iter()).enumerate() {
1285 let err = (orig - dec).abs();
1286 let tol = if orig.abs() > 0.01 { orig.abs() * 0.02 } else { 0.1 };
1287 assert!(err < tol, "i={i} orig={orig} dec={dec} err={err}");
1288 }
1289 }
1290
1291 #[test]
1292 fn test_quantize_roundtrip_3bit() {
1293 let data: Vec<f32> = (0..64).map(|i| (i as f32 - 32.0) * 0.5).collect();
1294 let (packed, scale) = quantize_block(&data, 3);
1295 let mut out = vec![0.0f32; 64];
1296 let n = dequantize_block(&packed, scale, 3, 64, &mut out);
1297 assert_eq!(n, 64);
1298 let max_val = data.iter().map(|v| v.abs()).fold(0.0f32, f32::max);
1299 for (&orig, &dec) in data.iter().zip(out.iter()) {
1300 let err = (orig - dec).abs();
1301 assert!(err < max_val * 0.35, "orig={orig} dec={dec} err={err}");
1302 }
1303 }
1304
1305 #[test]
1306 fn test_quantize_zeros() {
1307 let data = vec![0.0f32; 64];
1308 let (packed, scale) = quantize_block(&data, 8);
1309 assert_eq!(scale, 0.0);
1310 let mut out = vec![1.0f32; 64];
1311 let n = dequantize_block(&packed, scale, 8, 64, &mut out);
1312 assert_eq!(n, 64);
1313 for &v in &out {
1314 assert_eq!(v, 0.0);
1315 }
1316 }
1317
1318 #[test]
1321 fn test_store_put_get_roundtrip() {
1322 let mut store = TieredStore::new(4096);
1323 let key = make_key(1, 0);
1324 let data: Vec<f32> = (0..64).map(|i| i as f32 * 0.25).collect();
1325
1326 store.put(key, &data, Tier::Tier1, 0).unwrap();
1327
1328 let mut out = vec![0.0f32; 64];
1329 let n = TieredStore::get(&mut store, key, &mut out, 1).unwrap();
1330 assert_eq!(n, 64);
1331
1332 for (i, (&orig, &dec)) in data.iter().zip(out.iter()).enumerate() {
1333 let err = (orig - dec).abs();
1334 let tol = if orig.abs() > 0.01 { orig.abs() * 0.02 } else { 0.15 };
1335 assert!(err < tol, "i={i} orig={orig} dec={dec} err={err}");
1336 }
1337 }
1338
1339 #[test]
1340 fn test_store_put_tier3_roundtrip() {
1341 let mut store = TieredStore::new(4096);
1342 let key = make_key(10, 5);
1343 let data: Vec<f32> = (0..32).map(|i| (i as f32 - 16.0) * 0.5).collect();
1344
1345 store.put(key, &data, Tier::Tier3, 100).unwrap();
1346
1347 let meta = store.meta(key).unwrap();
1348 assert_eq!(meta.tier, Tier::Tier3);
1349 assert_eq!(meta.bits, 3);
1350 assert_eq!(meta.created_at, 100);
1351
1352 let mut out = vec![0.0f32; 32];
1353 let n = TieredStore::get(&mut store, key, &mut out, 101).unwrap();
1354 assert_eq!(n, 32);
1355
1356 let max_val = data.iter().map(|v| v.abs()).fold(0.0f32, f32::max);
1357 for (&orig, &dec) in data.iter().zip(out.iter()) {
1358 let err = (orig - dec).abs();
1359 assert!(err < max_val * 0.35, "orig={orig} dec={dec} err={err}");
1360 }
1361 }
1362
1363 #[test]
1364 fn test_store_get_not_found() {
1365 let mut store = TieredStore::new(4096);
1366 let key = make_key(99, 0);
1367 let mut out = vec![0.0f32; 8];
1368 assert_eq!(TieredStore::get(&mut store, key, &mut out, 0), Err(StoreError::BlockNotFound));
1369 }
1370
1371 #[test]
1372 fn test_store_put_tier0_rejected() {
1373 let mut store = TieredStore::new(4096);
1374 let key = make_key(1, 0);
1375 let data = vec![1.0f32; 8];
1376 assert_eq!(
1377 store.put(key, &data, Tier::Tier0, 0),
1378 Err(StoreError::InvalidBlock)
1379 );
1380 }
1381
1382 #[test]
1385 fn test_eviction() {
1386 let mut store = TieredStore::new(4096);
1387 let key = make_key(1, 0);
1388 let data = vec![1.0f32; 64];
1389
1390 store.put(key, &data, Tier::Tier1, 0).unwrap();
1391 assert_eq!(store.tier_count(Tier::Tier1), 1);
1392 assert!(store.total_bytes() > 0);
1393
1394 store.evict(key, ReconstructPolicy::Delta).unwrap();
1395
1396 let meta = store.meta(key).unwrap();
1397 assert_eq!(meta.tier, Tier::Tier0);
1398 assert_eq!(meta.reconstruct, ReconstructPolicy::Delta);
1399 assert_eq!(meta.block_bytes, 0);
1400 assert_eq!(meta.bits, 0);
1401 assert_eq!(meta.tier_age, 0);
1402
1403 let mut out = vec![0.0f32; 64];
1405 assert_eq!(TieredStore::get(&mut store, key, &mut out, 1), Err(StoreError::TensorEvicted));
1406
1407 assert_eq!(store.tier_count(Tier::Tier1), 0);
1409 assert_eq!(store.tier_count(Tier::Tier0), 1);
1410
1411 assert_eq!(store.block_count(), 1);
1413 }
1414
1415 #[test]
1416 fn test_eviction_not_found() {
1417 let mut store = TieredStore::new(4096);
1418 let key = make_key(1, 0);
1419 assert_eq!(
1420 store.evict(key, ReconstructPolicy::None),
1421 Err(StoreError::BlockNotFound),
1422 );
1423 }
1424
1425 #[test]
1426 fn test_eviction_idempotent() {
1427 let mut store = TieredStore::new(4096);
1428 let key = make_key(1, 0);
1429 store.put(key, &[1.0; 16], Tier::Tier2, 0).unwrap();
1430
1431 store.evict(key, ReconstructPolicy::None).unwrap();
1432 store.evict(key, ReconstructPolicy::Factor).unwrap();
1434
1435 let meta = store.meta(key).unwrap();
1436 assert_eq!(meta.reconstruct, ReconstructPolicy::Factor);
1437 }
1438
1439 #[test]
1442 fn test_tier_counts() {
1443 let mut store = TieredStore::new(4096);
1444 let data = vec![1.0f32; 16];
1445
1446 store.put(make_key(1, 0), &data, Tier::Tier1, 0).unwrap();
1447 store.put(make_key(2, 0), &data, Tier::Tier1, 0).unwrap();
1448 store.put(make_key(3, 0), &data, Tier::Tier2, 0).unwrap();
1449 store.put(make_key(4, 0), &data, Tier::Tier3, 0).unwrap();
1450 store.put(make_key(5, 0), &data, Tier::Tier3, 0).unwrap();
1451 store.put(make_key(6, 0), &data, Tier::Tier3, 0).unwrap();
1452
1453 assert_eq!(store.block_count(), 6);
1454 assert_eq!(store.tier_count(Tier::Tier0), 0);
1455 assert_eq!(store.tier_count(Tier::Tier1), 2);
1456 assert_eq!(store.tier_count(Tier::Tier2), 1);
1457 assert_eq!(store.tier_count(Tier::Tier3), 3);
1458
1459 assert_eq!(store.blocks_in_tier(Tier::Tier1).len(), 2);
1460 assert_eq!(store.blocks_in_tier(Tier::Tier0).len(), 0);
1461 }
1462
1463 #[test]
1466 fn test_total_bytes() {
1467 let mut store = TieredStore::new(4096);
1468 assert_eq!(store.total_bytes(), 0);
1469
1470 let data = vec![1.0f32; 64];
1471 store.put(make_key(1, 0), &data, Tier::Tier1, 0).unwrap();
1472 let bytes_after_one = store.total_bytes();
1473 assert!(bytes_after_one > 0);
1474
1475 store.put(make_key(2, 0), &data, Tier::Tier2, 0).unwrap();
1476 assert!(store.total_bytes() > bytes_after_one);
1477 }
1478
1479 #[test]
1480 fn test_total_bytes_decreases_on_evict() {
1481 let mut store = TieredStore::new(4096);
1482 let data = vec![1.0f32; 64];
1483 let key = make_key(1, 0);
1484
1485 store.put(key, &data, Tier::Tier1, 0).unwrap();
1486 let before = store.total_bytes();
1487
1488 store.evict(key, ReconstructPolicy::None).unwrap();
1489 assert_eq!(store.total_bytes(), before - before); }
1491
1492 #[test]
1495 fn test_touch_updates_stats() {
1496 let mut store = TieredStore::new(4096);
1497 let key = make_key(1, 0);
1498 store.put(key, &[1.0; 16], Tier::Tier1, 0).unwrap();
1499
1500 let meta = store.meta(key).unwrap();
1502 assert_eq!(meta.access_count, 1);
1503 assert_eq!(meta.last_access_at, 0);
1504 assert_eq!(meta.window, 1);
1505
1506 store.touch(key, 5);
1508 let meta = store.meta(key).unwrap();
1509 assert_eq!(meta.access_count, 2);
1510 assert_eq!(meta.last_access_at, 5);
1511 assert_eq!(meta.window, (1u64 << 5) | 1);
1513 assert!(meta.ema_rate > 0.0);
1514
1515 store.touch(key, 5);
1517 let meta = store.meta(key).unwrap();
1518 assert_eq!(meta.access_count, 3);
1519 assert_eq!(meta.window, (1u64 << 5) | 1);
1521 }
1522
1523 #[test]
1524 fn test_touch_window_overflow() {
1525 let mut store = TieredStore::new(4096);
1526 let key = make_key(1, 0);
1527 store.put(key, &[1.0; 16], Tier::Tier1, 0).unwrap();
1528
1529 store.touch(key, 100);
1531 let meta = store.meta(key).unwrap();
1532 assert_eq!(meta.window, 1);
1533 assert_eq!(meta.last_access_at, 100);
1534 }
1535
1536 #[test]
1537 fn test_touch_nonexistent_noop() {
1538 let mut store = TieredStore::new(4096);
1539 store.touch(make_key(42, 0), 10);
1541 }
1542
1543 #[test]
1546 fn test_put_overwrite() {
1547 let mut store = TieredStore::new(4096);
1548 let key = make_key(1, 0);
1549
1550 store.put(key, &[1.0; 16], Tier::Tier1, 0).unwrap();
1551 assert_eq!(store.tier_count(Tier::Tier1), 1);
1552
1553 store.put(key, &[2.0; 16], Tier::Tier3, 10).unwrap();
1555 assert_eq!(store.block_count(), 1);
1556 assert_eq!(store.tier_count(Tier::Tier1), 0);
1557 assert_eq!(store.tier_count(Tier::Tier3), 1);
1558
1559 let meta = store.meta(key).unwrap();
1560 assert_eq!(meta.tier, Tier::Tier3);
1561 assert_eq!(meta.created_at, 10);
1562 }
1563
1564 #[test]
1567 fn test_checksum_stored_correctly() {
1568 let mut store = TieredStore::new(4096);
1569 let key = make_key(1, 0);
1570 let data: Vec<f32> = (0..32).map(|i| i as f32).collect();
1571
1572 store.put(key, &data, Tier::Tier1, 0).unwrap();
1573
1574 let meta = store.meta(key).unwrap();
1575 assert_ne!(meta.checksum, 0);
1576
1577 let (packed, scale) = quantize_block(&data, 8);
1579 let expected = block_checksum(&packed, scale);
1580 assert_eq!(meta.checksum, expected);
1581 }
1582
1583 #[test]
1586 fn test_block_io_write_read() {
1587 let mut store = TieredStore::new(4096);
1588 let key = make_key(1, 0);
1589 let raw = vec![0xAA, 0xBB, 0xCC, 0xDD];
1590
1591 store.write_block(Tier::Tier1, key, &raw).unwrap();
1592
1593 let mut dst = vec![0u8; 8];
1594 let n = store.read_block(Tier::Tier1, key, &mut dst).unwrap();
1595 assert_eq!(n, 4);
1596 assert_eq!(&dst[..4], &raw);
1597 }
1598
1599 #[test]
1600 fn test_block_io_delete() {
1601 let mut store = TieredStore::new(4096);
1602 let key = make_key(1, 0);
1603 store.write_block(Tier::Tier2, key, &[1, 2, 3]).unwrap();
1604
1605 store.delete_block(Tier::Tier2, key).unwrap();
1606
1607 let mut dst = vec![0u8; 4];
1608 assert_eq!(
1609 store.read_block(Tier::Tier2, key, &mut dst),
1610 Err(StoreError::BlockNotFound),
1611 );
1612 }
1613
1614 #[test]
1615 fn test_block_io_write_tier0_rejected() {
1616 let mut store = TieredStore::new(4096);
1617 let key = make_key(1, 0);
1618 assert_eq!(
1619 store.write_block(Tier::Tier0, key, &[1]),
1620 Err(StoreError::InvalidBlock),
1621 );
1622 }
1623
1624 #[test]
1627 fn test_meta_log_append_get() {
1628 let mut store = TieredStore::new(4096);
1629 let key = make_key(1, 0);
1630 let meta = BlockMeta {
1631 key,
1632 dtype: DType::F32,
1633 tier: Tier::Tier1,
1634 bits: 8,
1635 scale: 0.5,
1636 zero_point: 0,
1637 created_at: 42,
1638 last_access_at: 42,
1639 access_count: 1,
1640 ema_rate: 0.0,
1641 window: 1,
1642 checksum: 0,
1643 reconstruct: ReconstructPolicy::None,
1644 tier_age: 0,
1645 lineage_parent: None,
1646 block_bytes: 64,
1647 };
1648
1649 MetaLog::append(&mut store, &meta).unwrap();
1650 let retrieved = MetaLog::get(&store, key).unwrap();
1651 assert_eq!(retrieved.key, key);
1652 assert_eq!(retrieved.created_at, 42);
1653 }
1654
1655 #[test]
1656 fn test_meta_log_iter() {
1657 let mut store = TieredStore::new(4096);
1658 let data = vec![1.0f32; 8];
1659
1660 store.put(make_key(1, 0), &data, Tier::Tier1, 0).unwrap();
1661 store.put(make_key(2, 0), &data, Tier::Tier2, 0).unwrap();
1662 store.put(make_key(3, 0), &data, Tier::Tier3, 0).unwrap();
1663
1664 let entries: Vec<_> = MetaLog::iter(&store).collect();
1665 assert_eq!(entries.len(), 3);
1666 }
1667
1668 #[test]
1671 fn test_bits_for_tier() {
1672 assert_eq!(bits_for_tier(Tier::Tier0), 0);
1673 assert_eq!(bits_for_tier(Tier::Tier1), 8);
1674 assert_eq!(bits_for_tier(Tier::Tier2), 7);
1675 assert_eq!(bits_for_tier(Tier::Tier3), 3);
1676 }
1677
1678 #[test]
1681 fn test_tier_repr() {
1682 assert_eq!(Tier::Tier0 as u8, 0);
1683 assert_eq!(Tier::Tier1 as u8, 1);
1684 assert_eq!(Tier::Tier2 as u8, 2);
1685 assert_eq!(Tier::Tier3 as u8, 3);
1686 }
1687
1688 #[test]
1689 fn test_dtype_repr() {
1690 assert_eq!(DType::F32 as u8, 0);
1691 assert_eq!(DType::F16 as u8, 1);
1692 assert_eq!(DType::BF16 as u8, 2);
1693 }
1694
1695 #[test]
1696 fn test_reconstruct_policy_repr() {
1697 assert_eq!(ReconstructPolicy::None as u8, 0);
1698 assert_eq!(ReconstructPolicy::Delta as u8, 1);
1699 assert_eq!(ReconstructPolicy::Factor as u8, 2);
1700 }
1701
1702 #[test]
1705 fn test_multi_block_workflow() {
1706 let mut store = TieredStore::new(4096);
1707
1708 for i in 0..10u32 {
1710 let key = make_key(1, i);
1711 let data: Vec<f32> = (0..32).map(|j| (i * 32 + j) as f32 * 0.1).collect();
1712 let tier = match i % 3 {
1713 0 => Tier::Tier1,
1714 1 => Tier::Tier2,
1715 _ => Tier::Tier3,
1716 };
1717 store.put(key, &data, tier, i as u64).unwrap();
1718 }
1719
1720 assert_eq!(store.block_count(), 10);
1721 assert_eq!(store.tier_count(Tier::Tier1), 4); assert_eq!(store.tier_count(Tier::Tier2), 3); assert_eq!(store.tier_count(Tier::Tier3), 3); store.touch(make_key(1, 0), 20);
1727 store.touch(make_key(1, 5), 25);
1728
1729 store.evict(make_key(1, 8), ReconstructPolicy::Delta).unwrap();
1731 assert_eq!(store.tier_count(Tier::Tier3), 2);
1732 assert_eq!(store.tier_count(Tier::Tier0), 1);
1733 assert_eq!(store.block_count(), 10); let mut out = vec![0.0f32; 32];
1737 let n = TieredStore::get(&mut store, make_key(1, 0), &mut out, 30).unwrap();
1738 assert_eq!(n, 32);
1739 }
1740
1741 #[test]
1744 fn test_tick_empty_store() {
1745 let mut store = TieredStore::new(4096);
1746 let config = crate::tiering::TierConfig::default();
1747 let result = store.tick(&config, 100, 1_000_000, 100);
1748 assert_eq!(result.upgrades, 0);
1749 assert_eq!(result.downgrades, 0);
1750 assert_eq!(result.evictions, 0);
1751 assert_eq!(result.bytes_freed, 0);
1752 assert_eq!(result.ops_used, 0);
1753 assert_eq!(result.candidates_found, 0);
1754 }
1755
1756 #[test]
1757 fn test_tick_migrates_cold_to_hot() {
1758 let mut store = TieredStore::new(4096);
1759 let key = make_key(1, 0);
1760 let data: Vec<f32> = (0..64).map(|i| i as f32 * 0.1).collect();
1761
1762 store.put(key, &data, Tier::Tier3, 0).unwrap();
1764 assert_eq!(store.tier_count(Tier::Tier3), 1);
1765
1766 if let Some(meta) = store.index.get_mut(&key) {
1769 meta.ema_rate = 1.0;
1770 meta.window = u64::MAX; meta.last_access_at = 100;
1772 meta.access_count = 100;
1773 meta.tier_age = 10; }
1775
1776 let config = crate::tiering::TierConfig::default();
1777 let result = store.tick(&config, 100, 1_000_000, 100);
1778
1779 assert!(result.upgrades > 0, "expected at least one upgrade, got {}", result.upgrades);
1780 assert_eq!(result.downgrades, 0);
1781 assert!(result.candidates_found > 0);
1782
1783 let meta = store.meta(key).unwrap();
1784 assert_eq!(meta.tier, Tier::Tier1, "block should be in Tier1 after upgrade");
1785 assert_eq!(meta.bits, 8, "Tier1 should use 8-bit quantization");
1786 assert_eq!(meta.tier_age, 0, "tier_age should reset after migration");
1787
1788 let mut out = vec![0.0f32; 64];
1790 let n = TieredStore::get(&mut store, key, &mut out, 101).unwrap();
1791 assert_eq!(n, 64);
1792 }
1793
1794 #[test]
1795 fn test_tick_respects_budget_ops() {
1796 let mut store = TieredStore::new(4096);
1797 let data: Vec<f32> = (0..64).map(|i| i as f32 * 0.1).collect();
1798
1799 for i in 0..5u32 {
1801 let key = make_key(i as u128 + 1, 0);
1802 store.put(key, &data, Tier::Tier3, 0).unwrap();
1803 if let Some(meta) = store.index.get_mut(&key) {
1804 meta.ema_rate = 1.0;
1805 meta.window = u64::MAX;
1806 meta.last_access_at = 100;
1807 meta.access_count = 100;
1808 meta.tier_age = 10;
1809 }
1810 }
1811
1812 let config = crate::tiering::TierConfig::default();
1813 let result = store.tick(&config, 100, 1_000_000, 2);
1815
1816 assert_eq!(result.ops_used, 2, "should use exactly 2 ops");
1817 assert_eq!(result.upgrades, 2, "should upgrade only 2 blocks");
1818 assert!(result.candidates_found >= 5, "should find all 5 candidates");
1819 }
1820
1821 #[test]
1822 fn test_touch_block_updates_ema_and_window() {
1823 let mut store = TieredStore::new(4096);
1824 let key = make_key(1, 0);
1825 store.put(key, &[1.0; 16], Tier::Tier1, 0).unwrap();
1826
1827 let config = crate::tiering::TierConfig::default();
1828
1829 let meta = store.meta(key).unwrap();
1831 assert_eq!(meta.ema_rate, 0.0);
1832
1833 store.touch_block(key, &config, 5);
1835 let meta = store.meta(key).unwrap();
1836
1837 assert!(
1840 (meta.ema_rate - config.alpha).abs() < 1e-6,
1841 "ema_rate={}, expected={}",
1842 meta.ema_rate,
1843 config.alpha,
1844 );
1845 assert_eq!(meta.last_access_at, 5);
1846 assert_ne!(meta.window & 1, 0, "bit 0 should be set");
1848 assert_eq!(meta.window, (1u64 << 5) | 1);
1851 }
1852
1853 #[test]
1854 fn test_score_block_none_for_missing() {
1855 let store = TieredStore::new(4096);
1856 let config = crate::tiering::TierConfig::default();
1857 let result = store.score_block(make_key(99, 0), &config, 100);
1858 assert_eq!(result, None);
1859 }
1860
1861 #[test]
1866 fn test_epoch_tracker_wired_into_put() {
1867 let mut store = TieredStore::new(4096);
1868 let key = BlockKey { tensor_id: 1, block_index: 0 };
1869 let data = vec![1.0f32; 64];
1870
1871 assert_eq!(store.epoch_tracker().check_epoch(key), None);
1872
1873 store.put(key, &data, Tier::Tier1, 0).unwrap();
1874 assert!(store.epoch_tracker().check_epoch(key).is_some());
1875
1876 let epoch1 = store.epoch_tracker().check_epoch(key).unwrap();
1877 store.put(key, &data, Tier::Tier1, 1).unwrap();
1878 let epoch2 = store.epoch_tracker().check_epoch(key).unwrap();
1879 assert!(epoch2 > epoch1, "epoch should increment on overwrite");
1880 }
1881
1882 #[test]
1883 fn test_coherence_disabled_by_default() {
1884 let mut store = TieredStore::new(4096);
1885 let key = BlockKey { tensor_id: 1, block_index: 0 };
1886 let data = vec![1.0f32; 64];
1887 store.put(key, &data, Tier::Tier1, 0).unwrap();
1888
1889 assert!(store.coherence_check(key, &data, 1).is_none());
1890 }
1891
1892 #[test]
1893 fn test_coherence_enabled_passes() {
1894 let mut store = TieredStore::new(4096);
1895 store.enable_coherence(crate::coherence::CoherenceCheck::default());
1896
1897 let key = BlockKey { tensor_id: 1, block_index: 0 };
1898 let data: Vec<f32> = (0..64).map(|i| (i as f32 + 1.0) * 0.25).collect();
1899 store.put(key, &data, Tier::Tier1, 0).unwrap();
1900
1901 let result = store.coherence_check(key, &data, 1).unwrap().unwrap();
1902 assert!(result.passed, "Tier1 coherence should pass; err={}", result.max_error);
1903 }
1904
1905 #[test]
1910 fn test_metrics_series_wired_into_tick() {
1911 use crate::tiering::TierConfig;
1912
1913 let mut store = TieredStore::new(4096);
1914 let config = TierConfig::default();
1915
1916 for i in 0..5u128 {
1918 let key = BlockKey { tensor_id: i, block_index: 0 };
1919 store.put(key, &vec![1.0f32; 64], Tier::Tier1, 0).unwrap();
1920 }
1921
1922 assert!(store.metrics_series().is_empty());
1923
1924 store.tick(&config, 100, 1_000_000, 100);
1926 assert_eq!(store.metrics_series().len(), 1);
1927
1928 store.tick(&config, 200, 1_000_000, 100);
1930 assert_eq!(store.metrics_series().len(), 2);
1931
1932 let (ts, m) = store.metrics_series().latest().unwrap();
1934 assert_eq!(*ts, 200);
1935 assert_eq!(m.total_blocks, 5);
1936 }
1937
1938 #[test]
1946 fn bench_batch_scoring_10k() {
1947 use std::time::Instant;
1948 use crate::tiering::{
1949 TierConfig, BlockMeta as TBlockMeta, Tier as TTier,
1950 compute_scores_batch, compute_score,
1951 };
1952
1953 let cfg = TierConfig::default();
1954 let metas: Vec<TBlockMeta> = (0..10_000).map(|i| {
1955 TBlockMeta {
1956 ema_rate: (i as f32) * 0.0001,
1957 access_window: 0x5555_5555_5555_5555,
1958 last_access: 50 + (i as u64 % 100),
1959 access_count: i as u64,
1960 current_tier: TTier::Tier1,
1961 tier_since: 0,
1962 }
1963 }).collect();
1964
1965 let iters = 1000;
1966
1967 let start = Instant::now();
1969 for _ in 0..iters {
1970 for m in &metas {
1971 std::hint::black_box(compute_score(&cfg, 100, m));
1972 }
1973 }
1974 let individual = start.elapsed();
1975
1976 let start = Instant::now();
1978 for _ in 0..iters {
1979 std::hint::black_box(compute_scores_batch(&cfg, 100, &metas));
1980 }
1981 let batch = start.elapsed();
1982
1983 eprintln!("Individual scoring 10k x {iters}: {:?} ({:.0} ns/block)",
1984 individual, individual.as_nanos() as f64 / (iters * 10_000) as f64);
1985 eprintln!("Batch scoring 10k x {iters}: {:?} ({:.0} ns/block)",
1986 batch, batch.as_nanos() as f64 / (iters * 10_000) as f64);
1987 }
1988
1989 #[test]
1990 fn bench_dequant_5bit_4096() {
1991 use std::time::Instant;
1992
1993 let data: Vec<f32> = (0..4096).map(|i| (i as f32 - 2048.0) * 0.01).collect();
1994 let (packed, scale) = quantize_block(&data, 5);
1995 let mut out = vec![0.0f32; 4096];
1996
1997 let iters = 10_000;
1998 let start = Instant::now();
1999 for _ in 0..iters {
2000 std::hint::black_box(dequantize_block(&packed, scale, 5, 4096, &mut out));
2001 }
2002 let elapsed = start.elapsed();
2003
2004 let total_bytes = 4096u64 * 4 * iters as u64;
2005 let gbs = total_bytes as f64 / elapsed.as_secs_f64() / 1e9;
2006 eprintln!("Dequant 5-bit 4096 x {iters}: {:?} ({:.2} GB/s output throughput)",
2007 elapsed, gbs);
2008 }
2009
2010 #[test]
2011 fn bench_dequant_7bit_4096() {
2012 use std::time::Instant;
2013
2014 let data: Vec<f32> = (0..4096).map(|i| (i as f32 - 2048.0) * 0.01).collect();
2015 let (packed, scale) = quantize_block(&data, 7);
2016 let mut out = vec![0.0f32; 4096];
2017
2018 let iters = 10_000;
2019 let start = Instant::now();
2020 for _ in 0..iters {
2021 std::hint::black_box(dequantize_block(&packed, scale, 7, 4096, &mut out));
2022 }
2023 let elapsed = start.elapsed();
2024
2025 let total_bytes = 4096u64 * 4 * iters as u64;
2026 let gbs = total_bytes as f64 / elapsed.as_secs_f64() / 1e9;
2027 eprintln!("Dequant 7-bit 4096 x {iters}: {:?} ({:.2} GB/s output throughput)",
2028 elapsed, gbs);
2029 }
2030
2031 #[test]
2032 fn bench_quant_5bit_4096() {
2033 use std::time::Instant;
2034
2035 let data: Vec<f32> = (0..4096).map(|i| (i as f32 - 2048.0) * 0.01).collect();
2036
2037 let iters = 10_000;
2038 let start = Instant::now();
2039 for _ in 0..iters {
2040 std::hint::black_box(quantize_block(&data, 5));
2041 }
2042 let elapsed = start.elapsed();
2043
2044 let total_bytes = 4096u64 * 4 * iters as u64;
2045 let gbs = total_bytes as f64 / elapsed.as_secs_f64() / 1e9;
2046 eprintln!("Quant 5-bit 4096 x {iters}: {:?} ({:.2} GB/s input throughput)",
2047 elapsed, gbs);
2048 }
2049
2050 #[test]
2051 fn bench_svd_adaptive_64x64() {
2052 use std::time::Instant;
2053 use crate::delta::FactorSet;
2054
2055 let (rows, cols) = (64, 64);
2056 let data: Vec<f32> = (0..rows * cols)
2057 .map(|i| (i as f32 * 0.37).sin() + (i as f32 * 0.73).cos())
2058 .collect();
2059
2060 let iters = 100;
2061 let start = Instant::now();
2062 for _ in 0..iters {
2063 std::hint::black_box(
2064 FactorSet::from_data_adaptive(&data, rows, cols, 16, 0.05)
2065 );
2066 }
2067 let elapsed = start.elapsed();
2068
2069 eprintln!("SVD adaptive 64x64 (max_rank=16, target=0.05) x {iters}: {:?} ({:.2} ms/iter)",
2070 elapsed, elapsed.as_secs_f64() * 1000.0 / iters as f64);
2071 }
2072
2073 #[test]
2074 fn bench_format_report() {
2075 use std::time::Instant;
2076 use crate::metrics::StoreMetrics;
2077
2078 let m = StoreMetrics {
2079 total_blocks: 10_000,
2080 tier0_blocks: 500,
2081 tier1_blocks: 4000,
2082 tier2_blocks: 3500,
2083 tier3_blocks: 2000,
2084 tier1_bytes: 4_000_000,
2085 tier2_bytes: 2_500_000,
2086 tier3_bytes: 750_000,
2087 total_reads: 1_000_000,
2088 total_writes: 500_000,
2089 total_evictions: 5000,
2090 total_upgrades: 12_000,
2091 total_downgrades: 8000,
2092 total_reconstructions: 200,
2093 total_checksum_failures: 0,
2094 total_compactions: 150,
2095 tier_flips_last_minute: 0.023,
2096 avg_score_tier1: 0.85,
2097 avg_score_tier2: 0.45,
2098 avg_score_tier3: 0.12,
2099 };
2100
2101 let iters = 10_000;
2102 let start = Instant::now();
2103 for _ in 0..iters {
2104 std::hint::black_box(m.format_report());
2105 }
2106 let elapsed = start.elapsed();
2107
2108 eprintln!("format_report x {iters}: {:?} ({:.0} ns/call)",
2109 elapsed, elapsed.as_nanos() as f64 / iters as f64);
2110 }
2111
2112 #[test]
2113 fn bench_format_json() {
2114 use std::time::Instant;
2115 use crate::metrics::StoreMetrics;
2116
2117 let m = StoreMetrics {
2118 total_blocks: 10_000,
2119 tier0_blocks: 500,
2120 tier1_blocks: 4000,
2121 tier2_blocks: 3500,
2122 tier3_blocks: 2000,
2123 tier1_bytes: 4_000_000,
2124 tier2_bytes: 2_500_000,
2125 tier3_bytes: 750_000,
2126 total_reads: 1_000_000,
2127 total_writes: 500_000,
2128 total_evictions: 5000,
2129 total_upgrades: 12_000,
2130 total_downgrades: 8000,
2131 total_reconstructions: 200,
2132 total_checksum_failures: 0,
2133 total_compactions: 150,
2134 tier_flips_last_minute: 0.023,
2135 avg_score_tier1: 0.85,
2136 avg_score_tier2: 0.45,
2137 avg_score_tier3: 0.12,
2138 };
2139
2140 let iters = 10_000;
2141 let start = Instant::now();
2142 for _ in 0..iters {
2143 std::hint::black_box(m.format_json());
2144 }
2145 let elapsed = start.elapsed();
2146
2147 eprintln!("format_json x {iters}: {:?} ({:.0} ns/call)",
2148 elapsed, elapsed.as_nanos() as f64 / iters as f64);
2149 }
2150
2151 #[test]
2152 fn bench_metrics_series_trend_100() {
2153 use std::time::Instant;
2154 use crate::metrics::{StoreMetrics, MetricsSeries};
2155
2156 let mut series = MetricsSeries::new(256);
2157 for i in 0..100u64 {
2158 series.record(i, StoreMetrics {
2159 total_blocks: 1000 + i,
2160 tier1_blocks: 400 + i % 50,
2161 tier2_blocks: 350,
2162 tier3_blocks: 250,
2163 tier1_bytes: 400_000 + i * 100,
2164 tier2_bytes: 250_000,
2165 tier3_bytes: 75_000,
2166 total_evictions: i * 3,
2167 ..Default::default()
2168 });
2169 }
2170
2171 let iters = 10_000;
2172 let start = Instant::now();
2173 for _ in 0..iters {
2174 std::hint::black_box(series.trend());
2175 }
2176 let elapsed = start.elapsed();
2177
2178 eprintln!("MetricsSeries trend (100 snapshots) x {iters}: {:?} ({:.0} ns/call)",
2179 elapsed, elapsed.as_nanos() as f64 / iters as f64);
2180 }
2181}