1use std::collections::HashMap;
35
/// Key identifying a stored block: the pair `(tensor_id, block_index)`.
///
/// Used as the key of the metadata index, the per-tier payload maps and the
/// per-tier key buckets of [`TieredStore`].
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct BlockKey {
    /// 128-bit tensor identifier component of the key.
    pub tensor_id: u128,
    /// Block index component of the key (presumably the block's position
    /// within the tensor; confirm against callers).
    pub block_index: u32,
}
49
/// Storage tier of a block, with explicit `u8` discriminants 0 through 3.
///
/// Within this file `Tier0` marks an evicted block (metadata is kept, the
/// payload is dropped), while `Tier1`, `Tier2` and `Tier3` hold quantized
/// payloads. The maintenance pass counts a move to a lower discriminant as an
/// upgrade and any other move as a downgrade.
#[derive(Clone, Copy, PartialEq, Eq, Debug, Hash)]
#[repr(u8)]
pub enum Tier {
    /// Evicted: no payload is stored for the block.
    Tier0 = 0,
    /// Stored by `put` with 8 bits per element.
    Tier1 = 1,
    /// Stored by `put` with 7 bits per element.
    Tier2 = 2,
    /// Stored by `put` with 3 bits per element.
    Tier3 = 3,
}
66
/// Element type tag recorded in [`BlockMeta::dtype`].
///
/// `TieredStore::put` always records `F32`; the other variants are only
/// declared in the visible part of this file.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
#[repr(u8)]
pub enum DType {
    /// 32-bit float input (the only value written by `put`).
    F32 = 0,
    /// NOTE(review): presumably IEEE half precision; not used in the visible code.
    F16 = 1,
    /// NOTE(review): presumably bfloat16; not used in the visible code.
    BF16 = 2,
}
75
/// Policy tag stored in [`BlockMeta::reconstruct`].
///
/// The visible code only stores the value (`put` writes `None`, `evict`
/// writes whatever the caller passes) and never interprets it; presumably a
/// higher layer uses it to decide how evicted data is rebuilt (confirm).
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
#[repr(u8)]
pub enum ReconstructPolicy {
    /// Default written by `put` and by maintenance-driven evictions.
    None = 0,
    /// NOTE(review): meaning not established by the visible code.
    Delta = 1,
    /// NOTE(review): meaning not established by the visible code.
    Factor = 2,
}
87
/// Per-block metadata kept in the store's index, including for evicted blocks.
#[derive(Clone, Debug)]
pub struct BlockMeta {
    /// Key of the block this record describes.
    pub key: BlockKey,
    /// Element type tag; `put` always records `DType::F32`.
    pub dtype: DType,
    /// Tier the block currently lives in (`Tier0` once evicted).
    pub tier: Tier,
    /// Bits per quantized element of the stored payload; set to 0 on eviction.
    pub bits: u8,
    /// Scale returned by quantization; decoded values are multiplied by it.
    pub scale: f32,
    /// Written as 0 by `put`; not read anywhere in the visible code.
    pub zero_point: i16,
    /// The `now` value passed to the `put` that created this record.
    pub created_at: u64,
    /// The `now` value of the most recent `put` or touch.
    pub last_access_at: u64,
    /// Starts at 1 on `put`; incremented (saturating) by each touch.
    pub access_count: u32,
    /// Exponential moving average of the access rate (`1 / ticks since last access`).
    pub ema_rate: f32,
    /// Access bitmap: shifted left by the elapsed ticks and the low bit set on
    /// each touch; reset to 1 after a gap of 64 ticks or more.
    pub window: u64,
    /// CRC-32 over the packed payload followed by the little-endian scale bytes.
    pub checksum: u32,
    /// Policy tag recorded at eviction time (`None` on `put`).
    pub reconstruct: ReconstructPolicy,
    /// Count of maintenance ticks since the block entered its current tier:
    /// reset to 0 by `put`, eviction and migration, incremented by `tick`.
    pub tier_age: u32,
    /// Written as `None` by `put`; not read anywhere in the visible code.
    pub lineage_parent: Option<u128>,
    /// Size of the packed payload in bytes; set to 0 on eviction.
    pub block_bytes: u32,
}
121
/// Errors returned by the store and its I/O traits.
///
/// Variants documented with a NOTE are not produced by the code visible in
/// this part of the file; their meaning is inferred from the name only.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum StoreError {
    /// The block's metadata exists but its payload has been evicted (`Tier0`).
    TensorEvicted,
    /// No metadata or no payload exists for the requested key/tier.
    BlockNotFound,
    /// The stored payload no longer matches the checksum recorded in its metadata.
    ChecksumMismatch,
    /// NOTE(review): presumably an underlying I/O failure.
    IOError,
    /// NOTE(review): presumably a maintenance budget was exceeded.
    BudgetExhausted,
    /// The request named an invalid target, e.g. storing into `Tier0`.
    InvalidBlock,
    /// NOTE(review): presumably a delta-reconstruction chain limit.
    DeltaChainTooLong,
    /// NOTE(review): presumably a failed rebuild of evicted data.
    ReconstructionFailed,
    /// The supplied data or parameters were rejected as invalid.
    InvalidData,
    /// NOTE(review): presumably a full delta/lineage chain.
    ChainFull,
}
146
/// Source of the current time, expressed in ticks.
///
/// The store methods in this file take `now` as an explicit argument and do
/// not call this trait themselves.
pub trait Clock {
    /// Returns the current tick count.
    fn now_ticks(&self) -> u64;
}
156
/// Raw byte-level access to block payloads, addressed by tier and key.
pub trait BlockIO {
    /// Copies the payload of `key` in `tier` into `dst` and returns the number
    /// of bytes written.
    fn read_block(&self, tier: Tier, key: BlockKey, dst: &mut [u8]) -> Result<usize, StoreError>;

    /// Stores `src` as the payload of `key` in `tier`.
    fn write_block(&mut self, tier: Tier, key: BlockKey, src: &[u8]) -> Result<(), StoreError>;

    /// Removes the payload of `key` from `tier`.
    fn delete_block(&mut self, tier: Tier, key: BlockKey) -> Result<(), StoreError>;
}
169
/// Storage interface for [`BlockMeta`] records, addressed by [`BlockKey`].
pub trait MetaLog {
    /// Records `rec`.
    fn append(&mut self, rec: &BlockMeta) -> Result<(), StoreError>;

    /// Returns the record stored for `key`, if any.
    fn get(&self, key: BlockKey) -> Option<&BlockMeta>;

    /// Iterates over all stored records.
    fn iter(&self) -> Box<dyn Iterator<Item = &BlockMeta> + '_>;
}
181
/// Computes a bitwise, reflected CRC-32 of `data`.
///
/// The register starts at `0xFFFF_FFFF`, each byte is XORed into the low bits
/// and shifted out one bit at a time with polynomial `0xEDB8_8320`, and the
/// final register is inverted. An empty slice yields `0`.
pub fn crc32(data: &[u8]) -> u32 {
    const POLY: u32 = 0xEDB8_8320;

    let register = data.iter().fold(0xFFFF_FFFFu32, |state, &byte| {
        // Process the eight bits of this byte, least significant first.
        (0..8).fold(state ^ u32::from(byte), |reg, _| {
            let shifted = reg >> 1;
            if reg & 1 == 1 {
                shifted ^ POLY
            } else {
                shifted
            }
        })
    });

    !register
}
205
206fn bits_for_tier(tier: Tier) -> u8 {
212 match tier {
213 Tier::Tier0 => 0,
214 Tier::Tier1 => 8,
215 Tier::Tier2 => 7,
216 Tier::Tier3 => 3,
217 }
218}
219
/// Largest magnitude representable by a symmetric signed code of `bits` bits,
/// i.e. `2^(bits - 1) - 1`.
///
/// Returns 0 for widths outside `1..=8`; note that a width of 1 also yields 0.
#[inline]
fn qmax(bits: u8) -> i32 {
    match bits {
        1..=8 => (1i32 << (bits - 1)) - 1,
        _ => 0,
    }
}
230
/// Payload of one stored block.
struct BlockData {
    /// Number of elements encoded in `packed`. Raw writes through
    /// `BlockIO::write_block` store 0 here.
    element_count: u32,
    /// Bit-packed quantized codes (or the raw bytes given to `write_block`).
    packed: Vec<u8>,
}
238
239fn quantize_block(data: &[f32], bits: u8) -> (Vec<u8>, f32) {
243 let qm = qmax(bits);
244 if qm == 0 || data.is_empty() {
245 return (Vec::new(), 0.0);
246 }
247 let qm_f = qm as f32;
248
249 let max_abs = data
251 .iter()
252 .filter(|v| v.is_finite())
253 .fold(0.0f32, |acc, v| acc.max(v.abs()));
254
255 let scale = if max_abs == 0.0 { 0.0 } else { max_abs / qm_f };
256 let inv_scale = if scale == 0.0 { 0.0 } else { 1.0 / scale };
257
258 let bits_u32 = bits as u32;
259 let needed = (data.len() * bits as usize).div_ceil(8);
260 let mut packed = Vec::with_capacity(needed);
261
262 let mut acc: u64 = 0;
263 let mut acc_bits: u32 = 0;
264
265 for &v in data {
266 let q = if v.is_finite() {
267 (v * inv_scale).round() as i32
268 } else {
269 0
270 }
271 .clamp(-qm, qm);
272
273 let u = (q + qm) as u32;
274 acc |= (u as u64) << acc_bits;
275 acc_bits += bits_u32;
276
277 while acc_bits >= 8 {
278 packed.push((acc & 0xFF) as u8);
279 acc >>= 8;
280 acc_bits -= 8;
281 }
282 }
283
284 if acc_bits > 0 {
285 packed.push((acc & 0xFF) as u8);
286 }
287
288 (packed, scale)
289}
290
291fn dequantize_block(packed: &[u8], scale: f32, bits: u8, count: usize, out: &mut [f32]) -> usize {
295 let qm = qmax(bits);
296 if qm == 0 || packed.is_empty() {
297 return 0;
298 }
299
300 let bits_u32 = bits as u32;
301 let mask = (1u64 << bits_u32) - 1;
302 let limit = count.min(out.len());
303
304 let mut acc: u64 = 0;
305 let mut acc_bits: u32 = 0;
306 let mut byte_idx: usize = 0;
307 let mut written: usize = 0;
308
309 while written < limit {
310 while acc_bits < bits_u32 && byte_idx < packed.len() {
311 acc |= (packed[byte_idx] as u64) << acc_bits;
312 acc_bits += 8;
313 byte_idx += 1;
314 }
315 if acc_bits < bits_u32 {
316 break;
317 }
318
319 let u = (acc & mask) as i32;
320 acc >>= bits_u32;
321 acc_bits -= bits_u32;
322
323 out[written] = (u - qm) as f32 * scale;
324 written += 1;
325 }
326
327 written
328}
329
330fn block_checksum(packed: &[u8], scale: f32) -> u32 {
332 let scale_bytes = scale.to_le_bytes();
333 let total = packed.len() + scale_bytes.len();
334 let mut buf = Vec::with_capacity(total);
335 buf.extend_from_slice(packed);
336 buf.extend_from_slice(&scale_bytes);
337 crc32(&buf)
338}
339
/// Summary of one maintenance pass performed by `TieredStore::tick`.
#[derive(Debug, Default)]
pub struct TickResult {
    /// Migrations to a tier with a lower discriminant.
    pub upgrades: u32,
    /// All other successful migrations between payload-bearing tiers.
    pub downgrades: u32,
    /// Blocks moved to `Tier0`.
    pub evictions: u32,
    /// Payload bytes released by evictions and downgrades.
    pub bytes_freed: usize,
    /// Number of successful operations charged against the op budget.
    pub ops_used: u32,
    /// Number of candidates returned by the tiering policy.
    pub candidates_found: u32,
}
360
361fn to_tiering_tier(tier: Tier) -> crate::tiering::Tier {
367 match tier {
368 Tier::Tier0 => crate::tiering::Tier::Tier0,
369 Tier::Tier1 => crate::tiering::Tier::Tier1,
370 Tier::Tier2 => crate::tiering::Tier::Tier2,
371 Tier::Tier3 => crate::tiering::Tier::Tier3,
372 }
373}
374
375fn from_tiering_tier(tier: crate::tiering::Tier) -> Tier {
377 match tier {
378 crate::tiering::Tier::Tier0 => Tier::Tier0,
379 crate::tiering::Tier::Tier1 => Tier::Tier1,
380 crate::tiering::Tier::Tier2 => Tier::Tier2,
381 crate::tiering::Tier::Tier3 => Tier::Tier3,
382 }
383}
384
385fn to_tiering_meta(meta: &BlockMeta, now: u64) -> crate::tiering::BlockMeta {
387 crate::tiering::BlockMeta {
388 ema_rate: meta.ema_rate,
389 access_window: meta.window,
390 last_access: meta.last_access_at,
391 access_count: meta.access_count as u64,
392 current_tier: to_tiering_tier(meta.tier),
393 tier_since: now.saturating_sub(meta.tier_age as u64),
394 }
395}
396
/// In-memory block store that keeps quantized payloads in three tiers and
/// metadata for every known block, including evicted ones.
pub struct TieredStore {
    /// Value supplied to `new` and returned by `block_bytes()`; not otherwise
    /// used in the visible code.
    block_bytes: usize,

    /// Metadata for every known block, keyed by block key.
    index: HashMap<BlockKey, BlockMeta>,

    /// Payloads of blocks resident in `Tier1`.
    tier1_data: HashMap<BlockKey, BlockData>,
    /// Payloads of blocks resident in `Tier2`.
    tier2_data: HashMap<BlockKey, BlockData>,
    /// Payloads of blocks resident in `Tier3`.
    tier3_data: HashMap<BlockKey, BlockData>,

    /// Keys of blocks in `Tier1`; order is not preserved across removals.
    tier1_keys: Vec<BlockKey>,
    /// Keys of blocks in `Tier2`; order is not preserved across removals.
    tier2_keys: Vec<BlockKey>,
    /// Keys of blocks in `Tier3`; order is not preserved across removals.
    tier3_keys: Vec<BlockKey>,

    /// Log of access, eviction, tier-change and maintenance events.
    witness_log: crate::metrics::WitnessLog,

    /// Optional coherence checker used by `coherence_check`.
    coherence: Option<crate::coherence::CoherenceCheck>,
    /// Tracker notified of every `put`.
    epoch_tracker: crate::coherence::EpochTracker,
    /// Series that receives a metrics snapshot at the end of each maintenance tick.
    metrics_series: crate::metrics::MetricsSeries,
}
436
/// Weight given to the newest sample when `TieredStore::touch` folds the
/// instantaneous access rate into `BlockMeta::ema_rate`.
const EMA_ALPHA: f32 = 0.1;
439
440impl TieredStore {
441 pub fn new(block_bytes: usize) -> Self {
443 Self {
444 block_bytes,
445 index: HashMap::new(),
446 tier1_data: HashMap::new(),
447 tier2_data: HashMap::new(),
448 tier3_data: HashMap::new(),
449 tier1_keys: Vec::new(),
450 tier2_keys: Vec::new(),
451 tier3_keys: Vec::new(),
452 witness_log: crate::metrics::WitnessLog::new(10_000),
453 coherence: None,
454 epoch_tracker: crate::coherence::EpochTracker::new(),
455 metrics_series: crate::metrics::MetricsSeries::new(256),
456 }
457 }
458
    /// Returns the `block_bytes` value this store was created with.
    #[inline]
    pub fn block_bytes(&self) -> usize {
        self.block_bytes
    }
464
    /// Returns a shared reference to the store's witness (event) log.
    pub fn witness_log(&self) -> &crate::metrics::WitnessLog {
        &self.witness_log
    }
469
    /// Returns a mutable reference to the store's witness (event) log.
    pub fn witness_log_mut(&mut self) -> &mut crate::metrics::WitnessLog {
        &mut self.witness_log
    }
474
    /// Installs `check` as the coherence checker, replacing any previous one.
    pub fn enable_coherence(&mut self, check: crate::coherence::CoherenceCheck) {
        self.coherence = Some(check);
    }
483
    /// Removes the coherence checker; `coherence_check` then returns `None`.
    pub fn disable_coherence(&mut self) {
        self.coherence = None;
    }
488
    /// Returns a shared reference to the epoch tracker.
    pub fn epoch_tracker(&self) -> &crate::coherence::EpochTracker {
        &self.epoch_tracker
    }
493
    /// Returns a mutable reference to the epoch tracker.
    pub fn epoch_tracker_mut(&mut self) -> &mut crate::coherence::EpochTracker {
        &mut self.epoch_tracker
    }
498
    /// Returns a shared reference to the recorded metrics series.
    pub fn metrics_series(&self) -> &crate::metrics::MetricsSeries {
        &self.metrics_series
    }
503
    /// Returns a mutable reference to the recorded metrics series.
    pub fn metrics_series_mut(&mut self) -> &mut crate::metrics::MetricsSeries {
        &mut self.metrics_series
    }
508
509 pub fn coherence_check(
515 &mut self,
516 key: BlockKey,
517 original_data: &[f32],
518 now: u64,
519 ) -> Option<Result<crate::coherence::CoherenceResult, StoreError>> {
520 let check = self.coherence.clone()?;
521 Some(check.check_coherence(self, key, original_data, now))
522 }
523
524 pub fn metrics(&self) -> crate::metrics::StoreMetrics {
526 let mut m = crate::metrics::StoreMetrics::new();
527 m.total_blocks = self.index.len() as u64;
528 m.tier0_blocks = self
529 .index
530 .values()
531 .filter(|b| b.tier == Tier::Tier0)
532 .count() as u64;
533 m.tier1_blocks = self.tier1_keys.len() as u64;
534 m.tier2_blocks = self.tier2_keys.len() as u64;
535 m.tier3_blocks = self.tier3_keys.len() as u64;
536 m.tier1_bytes = self
537 .tier1_data
538 .values()
539 .map(|d| d.packed.len() as u64)
540 .sum();
541 m.tier2_bytes = self
542 .tier2_data
543 .values()
544 .map(|d| d.packed.len() as u64)
545 .sum();
546 m.tier3_bytes = self
547 .tier3_data
548 .values()
549 .map(|d| d.packed.len() as u64)
550 .sum();
551 m.total_evictions = self.witness_log.count_evictions() as u64;
552 m.tier_flips_last_minute = self.witness_log.tier_flip_rate(60, self.index.len() as u64);
553 m
554 }
555
556 pub fn put(
564 &mut self,
565 key: BlockKey,
566 data: &[f32],
567 tier: Tier,
568 now: u64,
569 ) -> Result<(), StoreError> {
570 if tier == Tier::Tier0 {
571 return Err(StoreError::InvalidBlock);
572 }
573
574 let bits = bits_for_tier(tier);
575 let (packed, scale) = quantize_block(data, bits);
576 let checksum = block_checksum(&packed, scale);
577
578 if let Some(old_meta) = self.index.get(&key) {
580 let old_tier = old_meta.tier;
581 self.remove_data(old_tier, key);
582 self.remove_from_bucket(old_tier, key);
583 }
584
585 let byte_count = packed.len() as u32;
586 let block = BlockData {
587 element_count: data.len() as u32,
588 packed,
589 };
590
591 match tier {
592 Tier::Tier1 => {
593 self.tier1_data.insert(key, block);
594 }
595 Tier::Tier2 => {
596 self.tier2_data.insert(key, block);
597 }
598 Tier::Tier3 => {
599 self.tier3_data.insert(key, block);
600 }
601 Tier::Tier0 => unreachable!(),
602 }
603 self.add_to_bucket(tier, key);
604
605 let meta = BlockMeta {
606 key,
607 dtype: DType::F32,
608 tier,
609 bits,
610 scale,
611 zero_point: 0,
612 created_at: now,
613 last_access_at: now,
614 access_count: 1,
615 ema_rate: 0.0,
616 window: 1,
617 checksum,
618 reconstruct: ReconstructPolicy::None,
619 tier_age: 0,
620 lineage_parent: None,
621 block_bytes: byte_count,
622 };
623 self.index.insert(key, meta);
624
625 self.witness_log.record(
627 now,
628 crate::metrics::WitnessEvent::Access {
629 key,
630 score: 0.0,
631 tier,
632 },
633 );
634
635 self.epoch_tracker.record_write(key);
637
638 Ok(())
639 }
640
641 pub fn get(&mut self, key: BlockKey, out: &mut [f32], now: u64) -> Result<usize, StoreError> {
655 let meta = self.index.get(&key).ok_or(StoreError::BlockNotFound)?;
656
657 if meta.tier == Tier::Tier0 {
658 return Err(StoreError::TensorEvicted);
659 }
660
661 let tier = meta.tier;
662 let scale = meta.scale;
663 let bits = meta.bits;
664 let checksum = meta.checksum;
665
666 let block = self
667 .data_map(tier)
668 .and_then(|m| m.get(&key))
669 .ok_or(StoreError::BlockNotFound)?;
670
671 let actual_crc = block_checksum(&block.packed, scale);
673 if actual_crc != checksum {
674 return Err(StoreError::ChecksumMismatch);
675 }
676
677 let n = dequantize_block(
678 &block.packed,
679 scale,
680 bits,
681 block.element_count as usize,
682 out,
683 );
684
685 self.touch(key, now);
687
688 self.witness_log.record(
690 now,
691 crate::metrics::WitnessEvent::Access {
692 key,
693 score: 0.0, tier,
695 },
696 );
697
698 Ok(n)
699 }
700
701 pub fn touch(&mut self, key: BlockKey, now: u64) {
707 if let Some(meta) = self.index.get_mut(&key) {
708 let delta = now.saturating_sub(meta.last_access_at);
709
710 if delta >= 64 {
712 meta.window = 1;
713 } else if delta > 0 {
714 meta.window = (meta.window << delta) | 1;
715 }
716 if delta > 0 {
720 let instant_rate = 1.0 / delta as f32;
721 meta.ema_rate = EMA_ALPHA * instant_rate + (1.0 - EMA_ALPHA) * meta.ema_rate;
722 }
723
724 meta.last_access_at = now;
725 meta.access_count = meta.access_count.saturating_add(1);
726 }
727 }
728
    /// Returns the metadata record for `key`, if the key is known (evicted
    /// blocks keep their record).
    pub fn meta(&self, key: BlockKey) -> Option<&BlockMeta> {
        self.index.get(&key)
    }
733
    /// Returns the number of blocks with a metadata record, evicted ones included.
    pub fn block_count(&self) -> usize {
        self.index.len()
    }
738
739 pub fn tier_count(&self, tier: Tier) -> usize {
741 match tier {
742 Tier::Tier0 => self
743 .index
744 .values()
745 .filter(|m| m.tier == Tier::Tier0)
746 .count(),
747 Tier::Tier1 => self.tier1_keys.len(),
748 Tier::Tier2 => self.tier2_keys.len(),
749 Tier::Tier3 => self.tier3_keys.len(),
750 }
751 }
752
753 pub fn total_bytes(&self) -> usize {
755 let sum = |map: &HashMap<BlockKey, BlockData>| -> usize {
756 map.values().map(|b| b.packed.len()).sum()
757 };
758 sum(&self.tier1_data) + sum(&self.tier2_data) + sum(&self.tier3_data)
759 }
760
    /// Returns the keys currently bucketed in `tier`.
    ///
    /// `Tier0` has no bucket, so an empty slice is returned for it even when
    /// evicted blocks exist. The order of keys is unspecified.
    pub fn blocks_in_tier(&self, tier: Tier) -> &[BlockKey] {
        match tier {
            Tier::Tier0 => &[],
            Tier::Tier1 => &self.tier1_keys,
            Tier::Tier2 => &self.tier2_keys,
            Tier::Tier3 => &self.tier3_keys,
        }
    }
772
773 pub fn evict(&mut self, key: BlockKey, policy: ReconstructPolicy) -> Result<(), StoreError> {
781 let meta = self.index.get_mut(&key).ok_or(StoreError::BlockNotFound)?;
782 let old_tier = meta.tier;
783
784 if old_tier == Tier::Tier0 {
785 meta.reconstruct = policy;
787 return Ok(());
788 }
789
790 let bytes_freed = meta.block_bytes as usize;
791 let evict_ts = meta.last_access_at;
792
793 meta.tier = Tier::Tier0;
796 meta.reconstruct = policy;
797 meta.tier_age = 0;
798 meta.block_bytes = 0;
799 meta.bits = 0;
800
801 self.remove_data(old_tier, key);
803 self.remove_from_bucket(old_tier, key);
804
805 self.witness_log.record(
807 evict_ts,
808 crate::metrics::WitnessEvent::Eviction {
809 key,
810 score: 0.0,
811 bytes_freed,
812 },
813 );
814
815 Ok(())
816 }
817
    /// Returns the payload map for `tier`, or `None` for `Tier0`, which stores
    /// no payloads.
    fn data_map(&self, tier: Tier) -> Option<&HashMap<BlockKey, BlockData>> {
        match tier {
            Tier::Tier0 => None,
            Tier::Tier1 => Some(&self.tier1_data),
            Tier::Tier2 => Some(&self.tier2_data),
            Tier::Tier3 => Some(&self.tier3_data),
        }
    }
829
830 fn remove_data(&mut self, tier: Tier, key: BlockKey) {
832 match tier {
833 Tier::Tier1 => {
834 self.tier1_data.remove(&key);
835 }
836 Tier::Tier2 => {
837 self.tier2_data.remove(&key);
838 }
839 Tier::Tier3 => {
840 self.tier3_data.remove(&key);
841 }
842 Tier::Tier0 => {}
843 }
844 }
845
846 fn remove_from_bucket(&mut self, tier: Tier, key: BlockKey) {
848 let bucket = match tier {
849 Tier::Tier1 => &mut self.tier1_keys,
850 Tier::Tier2 => &mut self.tier2_keys,
851 Tier::Tier3 => &mut self.tier3_keys,
852 Tier::Tier0 => return,
853 };
854 if let Some(pos) = bucket.iter().position(|k| *k == key) {
855 bucket.swap_remove(pos);
856 }
857 }
858
859 fn add_to_bucket(&mut self, tier: Tier, key: BlockKey) {
861 match tier {
862 Tier::Tier1 => self.tier1_keys.push(key),
863 Tier::Tier2 => self.tier2_keys.push(key),
864 Tier::Tier3 => self.tier3_keys.push(key),
865 Tier::Tier0 => {}
866 }
867 }
868
    /// Runs one maintenance pass: asks the tiering policy for migration
    /// candidates, applies as many as the budgets allow, ages and decays the
    /// remaining blocks, and records the outcome.
    ///
    /// * `config` - tiering policy configuration, passed through to
    ///   `crate::tiering`.
    /// * `now` - current tick, used for scoring and event timestamps.
    /// * `budget_bytes` - byte budget; each successful operation charges the
    ///   block's pre-operation payload size, and a candidate whose payload is
    ///   larger than the remaining budget is skipped.
    /// * `budget_ops` - maximum number of successful operations.
    ///
    /// Returns a [`TickResult`] summarising the pass. When the store has no
    /// blocks at all the method returns immediately, without writing a
    /// maintenance event or a metrics snapshot.
    pub fn tick(
        &mut self,
        config: &crate::tiering::TierConfig,
        now: u64,
        budget_bytes: usize,
        budget_ops: u32,
    ) -> TickResult {
        let mut result = TickResult::default();

        // Snapshot the keys so that policy-side keys can be plain indices
        // into this vector.
        let store_keys: Vec<BlockKey> = self.index.keys().copied().collect();
        if store_keys.is_empty() {
            return result;
        }

        // Build the policy's view of every block; the policy key is the
        // block's position in `store_keys`.
        let tiering_blocks: Vec<(crate::tiering::BlockKey, crate::tiering::BlockMeta)> = store_keys
            .iter()
            .enumerate()
            .map(|(idx, key)| {
                let meta = &self.index[key];
                (
                    crate::tiering::BlockKey(idx as u64),
                    to_tiering_meta(meta, now),
                )
            })
            .collect();

        let blocks_ref: Vec<(crate::tiering::BlockKey, &crate::tiering::BlockMeta)> =
            tiering_blocks.iter().map(|(k, m)| (*k, m)).collect();

        // Selection criteria and ordering are defined by `crate::tiering`.
        let candidates = crate::tiering::select_candidates(config, now, &blocks_ref);
        result.candidates_found = candidates.len() as u32;

        let mut remaining_bytes = budget_bytes;
        let mut remaining_ops = budget_ops;
        // Keys changed in this pass; they are excluded from the decay step below.
        let mut migrated = std::collections::HashSet::new();

        for candidate in &candidates {
            if remaining_ops == 0 {
                break;
            }

            // Translate the policy key back to the store key.
            let store_key = store_keys[candidate.key.0 as usize];
            let target_tier = from_tiering_tier(candidate.target_tier);
            let current_tier = from_tiering_tier(candidate.current_tier);

            let old_bytes = self
                .index
                .get(&store_key)
                .map(|m| m.block_bytes as usize)
                .unwrap_or(0);

            // The current payload size stands in for the cost of the operation.
            if old_bytes > remaining_bytes {
                continue;
            }

            if target_tier == Tier::Tier0 {
                // Eviction; `evict` writes its own witness event.
                if self.evict(store_key, ReconstructPolicy::None).is_ok() {
                    result.evictions += 1;
                    result.bytes_freed += old_bytes;
                    remaining_ops -= 1;
                    result.ops_used += 1;
                    remaining_bytes = remaining_bytes.saturating_sub(old_bytes);
                    migrated.insert(store_key);
                }
            } else {
                // The policy picks the bit width given the bytes currently held in Tier2.
                let warm_bytes: usize = self.tier2_data.values().map(|b| b.packed.len()).sum();
                let target_bits =
                    crate::tiering::bits_for_tier(config, to_tiering_tier(target_tier), warm_bytes);

                let old_tier_u8 = current_tier as u8;
                let new_tier_u8 = target_tier as u8;

                if self
                    .migrate_block(store_key, target_tier, target_bits)
                    .is_ok()
                {
                    let new_bytes = self
                        .index
                        .get(&store_key)
                        .map(|m| m.block_bytes as usize)
                        .unwrap_or(0);

                    // A lower tier number counts as an upgrade; everything
                    // else (including an equal tier) counts as a downgrade.
                    if new_tier_u8 < old_tier_u8 {
                        result.upgrades += 1;
                    } else {
                        result.downgrades += 1;
                        result.bytes_freed += old_bytes.saturating_sub(new_bytes);
                    }

                    let reason = if new_tier_u8 < old_tier_u8 {
                        crate::metrics::TierChangeReason::ScoreUpgrade
                    } else {
                        crate::metrics::TierChangeReason::ScoreDowngrade
                    };
                    self.witness_log.record(
                        now,
                        crate::metrics::WitnessEvent::TierChange {
                            key: store_key,
                            from_tier: current_tier,
                            to_tier: target_tier,
                            score: candidate.score,
                            reason,
                        },
                    );

                    remaining_ops -= 1;
                    result.ops_used += 1;
                    remaining_bytes = remaining_bytes.saturating_sub(old_bytes);
                    migrated.insert(store_key);
                }
            }
        }

        // Blocks left untouched by this pass age by one tick and have the
        // policy's decay applied to their access statistics.
        for key in &store_keys {
            if migrated.contains(key) {
                continue;
            }
            if let Some(meta) = self.index.get_mut(key) {
                meta.tier_age = meta.tier_age.saturating_add(1);
                let mut tm = crate::tiering::BlockMeta {
                    ema_rate: meta.ema_rate,
                    access_window: meta.window,
                    last_access: meta.last_access_at,
                    access_count: meta.access_count as u64,
                    current_tier: to_tiering_tier(meta.tier),
                    tier_since: now.saturating_sub(meta.tier_age as u64),
                };
                crate::tiering::tick_decay(config, &mut tm);
                // Only the decayed rate and window are written back.
                meta.ema_rate = tm.ema_rate;
                meta.window = tm.access_window;
            }
        }

        // Summarise the pass in the witness log.
        self.witness_log.record(
            now,
            crate::metrics::WitnessEvent::Maintenance {
                upgrades: result.upgrades,
                downgrades: result.downgrades,
                evictions: result.evictions,
                bytes_freed: result.bytes_freed,
                budget_remaining_bytes: remaining_bytes.min(u32::MAX as usize) as u32,
                budget_remaining_ops: remaining_ops,
            },
        );

        // Append a metrics snapshot taken after all changes.
        let snapshot_metrics = self.metrics();
        self.metrics_series.record(now, snapshot_metrics);

        result
    }
1041
1042 fn migrate_block(
1048 &mut self,
1049 key: BlockKey,
1050 target_tier: Tier,
1051 target_bits: u8,
1052 ) -> Result<(), StoreError> {
1053 let meta = self.index.get(&key).ok_or(StoreError::BlockNotFound)?;
1055 let old_tier = meta.tier;
1056 let old_bits = meta.bits;
1057 let old_scale = meta.scale;
1058
1059 if old_tier == Tier::Tier0 {
1060 return Err(StoreError::TensorEvicted);
1061 }
1062 if target_tier == Tier::Tier0 {
1063 return Err(StoreError::InvalidBlock);
1064 }
1065
1066 let (element_count, f32_data) = {
1070 let block = self
1071 .data_map(old_tier)
1072 .and_then(|m| m.get(&key))
1073 .ok_or(StoreError::BlockNotFound)?;
1074 let ec = block.element_count;
1075 let mut data = vec![0.0f32; ec as usize];
1076 dequantize_block(&block.packed, old_scale, old_bits, ec as usize, &mut data);
1077 (ec, data)
1078 };
1079
1080 let (packed, scale) = quantize_block(&f32_data, target_bits);
1082 let checksum = block_checksum(&packed, scale);
1083 let byte_count = packed.len() as u32;
1084 let new_block = BlockData {
1085 element_count,
1086 packed,
1087 };
1088
1089 self.remove_data(old_tier, key);
1091 self.remove_from_bucket(old_tier, key);
1092
1093 match target_tier {
1095 Tier::Tier1 => {
1096 self.tier1_data.insert(key, new_block);
1097 }
1098 Tier::Tier2 => {
1099 self.tier2_data.insert(key, new_block);
1100 }
1101 Tier::Tier3 => {
1102 self.tier3_data.insert(key, new_block);
1103 }
1104 Tier::Tier0 => unreachable!(),
1105 }
1106 self.add_to_bucket(target_tier, key);
1107
1108 let meta = self.index.get_mut(&key).unwrap();
1110 meta.tier = target_tier;
1111 meta.bits = target_bits;
1112 meta.scale = scale;
1113 meta.checksum = checksum;
1114 meta.tier_age = 0;
1115 meta.block_bytes = byte_count;
1116
1117 Ok(())
1118 }
1119
1120 pub fn score_block(
1125 &self,
1126 key: BlockKey,
1127 config: &crate::tiering::TierConfig,
1128 now: u64,
1129 ) -> Option<f32> {
1130 let meta = self.index.get(&key)?;
1131 let tm = to_tiering_meta(meta, now);
1132 Some(crate::tiering::compute_score(config, now, &tm))
1133 }
1134
1135 pub fn touch_block(&mut self, key: BlockKey, config: &crate::tiering::TierConfig, now: u64) {
1141 if let Some(meta) = self.index.get_mut(&key) {
1142 let mut tm = crate::tiering::BlockMeta {
1143 ema_rate: meta.ema_rate,
1144 access_window: meta.window,
1145 last_access: meta.last_access_at,
1146 access_count: meta.access_count as u64,
1147 current_tier: to_tiering_tier(meta.tier),
1148 tier_since: now.saturating_sub(meta.tier_age as u64),
1149 };
1150 crate::tiering::touch(config, now, &mut tm);
1151 meta.ema_rate = tm.ema_rate;
1152 meta.window = tm.access_window;
1153 meta.last_access_at = tm.last_access;
1154 meta.access_count = tm.access_count.min(u32::MAX as u64) as u32;
1155 }
1156 }
1157}
1158
1159impl BlockIO for TieredStore {
1164 fn read_block(&self, tier: Tier, key: BlockKey, dst: &mut [u8]) -> Result<usize, StoreError> {
1165 let map = self.data_map(tier).ok_or(StoreError::BlockNotFound)?;
1166 let block = map.get(&key).ok_or(StoreError::BlockNotFound)?;
1167 let n = block.packed.len().min(dst.len());
1168 dst[..n].copy_from_slice(&block.packed[..n]);
1169 Ok(n)
1170 }
1171
1172 fn write_block(&mut self, tier: Tier, key: BlockKey, src: &[u8]) -> Result<(), StoreError> {
1173 if tier == Tier::Tier0 {
1174 return Err(StoreError::InvalidBlock);
1175 }
1176 let block = BlockData {
1177 element_count: 0, packed: src.to_vec(),
1179 };
1180 match tier {
1181 Tier::Tier1 => {
1182 self.tier1_data.insert(key, block);
1183 }
1184 Tier::Tier2 => {
1185 self.tier2_data.insert(key, block);
1186 }
1187 Tier::Tier3 => {
1188 self.tier3_data.insert(key, block);
1189 }
1190 Tier::Tier0 => unreachable!(),
1191 }
1192 Ok(())
1193 }
1194
1195 fn delete_block(&mut self, tier: Tier, key: BlockKey) -> Result<(), StoreError> {
1196 let removed = match tier {
1197 Tier::Tier1 => self.tier1_data.remove(&key).is_some(),
1198 Tier::Tier2 => self.tier2_data.remove(&key).is_some(),
1199 Tier::Tier3 => self.tier3_data.remove(&key).is_some(),
1200 Tier::Tier0 => false,
1201 };
1202 if removed {
1203 Ok(())
1204 } else {
1205 Err(StoreError::BlockNotFound)
1206 }
1207 }
1208}
1209
/// Metadata access backed directly by the store's index.
impl MetaLog for TieredStore {
    /// Inserts a clone of `rec` into the index under `rec.key`, replacing any
    /// existing record. Payload maps and tier buckets are not touched, and
    /// this implementation always returns `Ok`.
    fn append(&mut self, rec: &BlockMeta) -> Result<(), StoreError> {
        self.index.insert(rec.key, rec.clone());
        Ok(())
    }

    /// Returns the indexed record for `key`, if any.
    fn get(&self, key: BlockKey) -> Option<&BlockMeta> {
        self.index.get(&key)
    }

    /// Iterates over every indexed record, in the hash map's unspecified order.
    fn iter(&self) -> Box<dyn Iterator<Item = &BlockMeta> + '_> {
        Box::new(self.index.values())
    }
}
1224
1225#[cfg(test)]
1230mod tests {
1231 use super::*;
1232 use std::collections::hash_map::DefaultHasher;
1233 use std::hash::{Hash, Hasher};
1234
1235 fn make_key(tid: u128, idx: u32) -> BlockKey {
1236 BlockKey {
1237 tensor_id: tid,
1238 block_index: idx,
1239 }
1240 }
1241
1242 #[test]
1245 fn test_crc32_known_vector() {
1246 let data = b"123456789";
1248 assert_eq!(crc32(data), 0xCBF4_3926);
1249 }
1250
1251 #[test]
1252 fn test_crc32_empty() {
1253 assert_eq!(crc32(&[]), 0x0000_0000);
1254 }
1255
1256 #[test]
1257 fn test_crc32_single_byte() {
1258 assert_eq!(crc32(&[0x00]), 0xD202_EF8D);
1260 }
1261
1262 #[test]
1265 fn test_block_key_equality() {
1266 let a = make_key(1, 0);
1267 let b = make_key(1, 0);
1268 let c = make_key(1, 1);
1269 assert_eq!(a, b);
1270 assert_ne!(a, c);
1271 }
1272
1273 #[test]
1274 fn test_block_key_hash_differs() {
1275 fn hash_of(k: &BlockKey) -> u64 {
1276 let mut h = DefaultHasher::new();
1277 k.hash(&mut h);
1278 h.finish()
1279 }
1280 let a = make_key(1, 0);
1281 let b = make_key(2, 0);
1282 let c = make_key(1, 1);
1283 assert_ne!(hash_of(&a), hash_of(&b));
1285 assert_ne!(hash_of(&a), hash_of(&c));
1286 }
1287
1288 #[test]
1289 fn test_block_key_hash_stable() {
1290 fn hash_of(k: &BlockKey) -> u64 {
1291 let mut h = DefaultHasher::new();
1292 k.hash(&mut h);
1293 h.finish()
1294 }
1295 let a = make_key(42, 7);
1296 let b = make_key(42, 7);
1297 assert_eq!(hash_of(&a), hash_of(&b));
1298 }
1299
1300 #[test]
1303 fn test_qmax_values() {
1304 assert_eq!(qmax(8), 127);
1305 assert_eq!(qmax(7), 63);
1306 assert_eq!(qmax(5), 15);
1307 assert_eq!(qmax(3), 3);
1308 assert_eq!(qmax(1), 0);
1309 assert_eq!(qmax(0), 0);
1310 assert_eq!(qmax(9), 0);
1311 }
1312
1313 #[test]
1316 fn test_quantize_roundtrip_8bit() {
1317 let data: Vec<f32> = (0..128).map(|i| (i as f32 - 64.0) * 0.1).collect();
1318 let (packed, scale) = quantize_block(&data, 8);
1319 let mut out = vec![0.0f32; 128];
1320 let n = dequantize_block(&packed, scale, 8, 128, &mut out);
1321 assert_eq!(n, 128);
1322 for (i, (&orig, &dec)) in data.iter().zip(out.iter()).enumerate() {
1323 let err = (orig - dec).abs();
1324 let tol = if orig.abs() > 0.01 {
1325 orig.abs() * 0.02
1326 } else {
1327 0.1
1328 };
1329 assert!(err < tol, "i={i} orig={orig} dec={dec} err={err}");
1330 }
1331 }
1332
1333 #[test]
1334 fn test_quantize_roundtrip_3bit() {
1335 let data: Vec<f32> = (0..64).map(|i| (i as f32 - 32.0) * 0.5).collect();
1336 let (packed, scale) = quantize_block(&data, 3);
1337 let mut out = vec![0.0f32; 64];
1338 let n = dequantize_block(&packed, scale, 3, 64, &mut out);
1339 assert_eq!(n, 64);
1340 let max_val = data.iter().map(|v| v.abs()).fold(0.0f32, f32::max);
1341 for (&orig, &dec) in data.iter().zip(out.iter()) {
1342 let err = (orig - dec).abs();
1343 assert!(err < max_val * 0.35, "orig={orig} dec={dec} err={err}");
1344 }
1345 }
1346
1347 #[test]
1348 fn test_quantize_zeros() {
1349 let data = vec![0.0f32; 64];
1350 let (packed, scale) = quantize_block(&data, 8);
1351 assert_eq!(scale, 0.0);
1352 let mut out = vec![1.0f32; 64];
1353 let n = dequantize_block(&packed, scale, 8, 64, &mut out);
1354 assert_eq!(n, 64);
1355 for &v in &out {
1356 assert_eq!(v, 0.0);
1357 }
1358 }
1359
1360 #[test]
1363 fn test_store_put_get_roundtrip() {
1364 let mut store = TieredStore::new(4096);
1365 let key = make_key(1, 0);
1366 let data: Vec<f32> = (0..64).map(|i| i as f32 * 0.25).collect();
1367
1368 store.put(key, &data, Tier::Tier1, 0).unwrap();
1369
1370 let mut out = vec![0.0f32; 64];
1371 let n = TieredStore::get(&mut store, key, &mut out, 1).unwrap();
1372 assert_eq!(n, 64);
1373
1374 for (i, (&orig, &dec)) in data.iter().zip(out.iter()).enumerate() {
1375 let err = (orig - dec).abs();
1376 let tol = if orig.abs() > 0.01 {
1377 orig.abs() * 0.02
1378 } else {
1379 0.15
1380 };
1381 assert!(err < tol, "i={i} orig={orig} dec={dec} err={err}");
1382 }
1383 }
1384
1385 #[test]
1386 fn test_store_put_tier3_roundtrip() {
1387 let mut store = TieredStore::new(4096);
1388 let key = make_key(10, 5);
1389 let data: Vec<f32> = (0..32).map(|i| (i as f32 - 16.0) * 0.5).collect();
1390
1391 store.put(key, &data, Tier::Tier3, 100).unwrap();
1392
1393 let meta = store.meta(key).unwrap();
1394 assert_eq!(meta.tier, Tier::Tier3);
1395 assert_eq!(meta.bits, 3);
1396 assert_eq!(meta.created_at, 100);
1397
1398 let mut out = vec![0.0f32; 32];
1399 let n = TieredStore::get(&mut store, key, &mut out, 101).unwrap();
1400 assert_eq!(n, 32);
1401
1402 let max_val = data.iter().map(|v| v.abs()).fold(0.0f32, f32::max);
1403 for (&orig, &dec) in data.iter().zip(out.iter()) {
1404 let err = (orig - dec).abs();
1405 assert!(err < max_val * 0.35, "orig={orig} dec={dec} err={err}");
1406 }
1407 }
1408
1409 #[test]
1410 fn test_store_get_not_found() {
1411 let mut store = TieredStore::new(4096);
1412 let key = make_key(99, 0);
1413 let mut out = vec![0.0f32; 8];
1414 assert_eq!(
1415 TieredStore::get(&mut store, key, &mut out, 0),
1416 Err(StoreError::BlockNotFound)
1417 );
1418 }
1419
1420 #[test]
1421 fn test_store_put_tier0_rejected() {
1422 let mut store = TieredStore::new(4096);
1423 let key = make_key(1, 0);
1424 let data = vec![1.0f32; 8];
1425 assert_eq!(
1426 store.put(key, &data, Tier::Tier0, 0),
1427 Err(StoreError::InvalidBlock)
1428 );
1429 }
1430
1431 #[test]
1434 fn test_eviction() {
1435 let mut store = TieredStore::new(4096);
1436 let key = make_key(1, 0);
1437 let data = vec![1.0f32; 64];
1438
1439 store.put(key, &data, Tier::Tier1, 0).unwrap();
1440 assert_eq!(store.tier_count(Tier::Tier1), 1);
1441 assert!(store.total_bytes() > 0);
1442
1443 store.evict(key, ReconstructPolicy::Delta).unwrap();
1444
1445 let meta = store.meta(key).unwrap();
1446 assert_eq!(meta.tier, Tier::Tier0);
1447 assert_eq!(meta.reconstruct, ReconstructPolicy::Delta);
1448 assert_eq!(meta.block_bytes, 0);
1449 assert_eq!(meta.bits, 0);
1450 assert_eq!(meta.tier_age, 0);
1451
1452 let mut out = vec![0.0f32; 64];
1454 assert_eq!(
1455 TieredStore::get(&mut store, key, &mut out, 1),
1456 Err(StoreError::TensorEvicted)
1457 );
1458
1459 assert_eq!(store.tier_count(Tier::Tier1), 0);
1461 assert_eq!(store.tier_count(Tier::Tier0), 1);
1462
1463 assert_eq!(store.block_count(), 1);
1465 }
1466
1467 #[test]
1468 fn test_eviction_not_found() {
1469 let mut store = TieredStore::new(4096);
1470 let key = make_key(1, 0);
1471 assert_eq!(
1472 store.evict(key, ReconstructPolicy::None),
1473 Err(StoreError::BlockNotFound),
1474 );
1475 }
1476
1477 #[test]
1478 fn test_eviction_idempotent() {
1479 let mut store = TieredStore::new(4096);
1480 let key = make_key(1, 0);
1481 store.put(key, &[1.0; 16], Tier::Tier2, 0).unwrap();
1482
1483 store.evict(key, ReconstructPolicy::None).unwrap();
1484 store.evict(key, ReconstructPolicy::Factor).unwrap();
1486
1487 let meta = store.meta(key).unwrap();
1488 assert_eq!(meta.reconstruct, ReconstructPolicy::Factor);
1489 }
1490
1491 #[test]
1494 fn test_tier_counts() {
1495 let mut store = TieredStore::new(4096);
1496 let data = vec![1.0f32; 16];
1497
1498 store.put(make_key(1, 0), &data, Tier::Tier1, 0).unwrap();
1499 store.put(make_key(2, 0), &data, Tier::Tier1, 0).unwrap();
1500 store.put(make_key(3, 0), &data, Tier::Tier2, 0).unwrap();
1501 store.put(make_key(4, 0), &data, Tier::Tier3, 0).unwrap();
1502 store.put(make_key(5, 0), &data, Tier::Tier3, 0).unwrap();
1503 store.put(make_key(6, 0), &data, Tier::Tier3, 0).unwrap();
1504
1505 assert_eq!(store.block_count(), 6);
1506 assert_eq!(store.tier_count(Tier::Tier0), 0);
1507 assert_eq!(store.tier_count(Tier::Tier1), 2);
1508 assert_eq!(store.tier_count(Tier::Tier2), 1);
1509 assert_eq!(store.tier_count(Tier::Tier3), 3);
1510
1511 assert_eq!(store.blocks_in_tier(Tier::Tier1).len(), 2);
1512 assert_eq!(store.blocks_in_tier(Tier::Tier0).len(), 0);
1513 }
1514
/// `total_bytes` starts at zero and grows strictly with every stored block.
#[test]
fn test_total_bytes() {
    let mut store = TieredStore::new(4096);
    assert_eq!(store.total_bytes(), 0);

    let payload = vec![1.0f32; 64];

    store.put(make_key(1, 0), &payload, Tier::Tier1, 0).unwrap();
    let after_first = store.total_bytes();
    assert!(after_first > 0);

    store.put(make_key(2, 0), &payload, Tier::Tier2, 0).unwrap();
    let after_second = store.total_bytes();
    assert!(after_second > after_first);
}
1530
/// Evicting the only stored block must bring `total_bytes` back to zero.
#[test]
fn test_total_bytes_decreases_on_evict() {
    let mut store = TieredStore::new(4096);
    let data = vec![1.0f32; 64];
    let key = make_key(1, 0);

    store.put(key, &data, Tier::Tier1, 0).unwrap();
    let before = store.total_bytes();
    // Guard against a vacuous pass: there has to be something to free.
    assert!(before > 0, "put should account for the stored bytes");

    store.evict(key, ReconstructPolicy::None).unwrap();
    // State the expectation directly instead of the tautology `before - before`.
    assert_eq!(
        store.total_bytes(),
        0,
        "eviction should free every stored byte"
    );
}
1543
/// `touch` bumps the access counter, records the tick, and updates the
/// access-window bitmask and EMA rate.
#[test]
fn test_touch_updates_stats() {
    let mut store = TieredStore::new(4096);
    let key = make_key(1, 0);
    store.put(key, &[1.0; 16], Tier::Tier1, 0).unwrap();

    // State right after `put` at tick 0.
    {
        let fresh = store.meta(key).unwrap();
        assert_eq!(fresh.access_count, 1);
        assert_eq!(fresh.last_access_at, 0);
        assert_eq!(fresh.window, 1);
    }

    // Expected window once the block is touched five ticks later: the
    // put-time bit sits at position 5 and bit 0 marks the new access.
    let shifted_window = (1u64 << 5) | 1;

    store.touch(key, 5);
    {
        let touched = store.meta(key).unwrap();
        assert_eq!(touched.access_count, 2);
        assert_eq!(touched.last_access_at, 5);
        assert_eq!(touched.window, shifted_window);
        assert!(touched.ema_rate > 0.0);
    }

    // A second touch in the same tick is still counted but leaves the
    // window unchanged.
    store.touch(key, 5);
    {
        let repeated = store.meta(key).unwrap();
        assert_eq!(repeated.access_count, 3);
        assert_eq!(repeated.window, shifted_window);
    }
}
1574
/// A touch 100 ticks after the previous access leaves only bit 0 set in the
/// access window and records the new tick.
#[test]
fn test_touch_window_overflow() {
    let mut store = TieredStore::new(4096);
    let key = make_key(1, 0);
    store.put(key, &[1.0; 16], Tier::Tier1, 0).unwrap();

    let late_tick = 100;
    store.touch(key, late_tick);

    let after = store.meta(key).unwrap();
    assert_eq!(after.window, 1);
    assert_eq!(after.last_access_at, late_tick);
}
1587
/// Touching a key that is not in the store must neither panic nor create
/// an entry for it.
#[test]
fn test_touch_nonexistent_noop() {
    let mut store = TieredStore::new(4096);
    let missing = make_key(42, 0);

    store.touch(missing, 10);

    // Without these checks the test would only prove "does not panic".
    assert!(store.meta(missing).is_none());
    assert_eq!(store.block_count(), 0);
}
1594
/// Writing the same key twice replaces the earlier block: counts stay at
/// one, and tier and creation tick come from the second write.
#[test]
fn test_put_overwrite() {
    let mut store = TieredStore::new(4096);
    let key = make_key(1, 0);

    let original = [1.0; 16];
    store.put(key, &original, Tier::Tier1, 0).unwrap();
    assert_eq!(store.tier_count(Tier::Tier1), 1);

    // Same key, new contents, different tier, later tick.
    let replacement = [2.0; 16];
    store.put(key, &replacement, Tier::Tier3, 10).unwrap();

    assert_eq!(store.block_count(), 1);
    assert_eq!(store.tier_count(Tier::Tier1), 0);
    assert_eq!(store.tier_count(Tier::Tier3), 1);

    let current = store.meta(key).unwrap();
    assert_eq!(current.tier, Tier::Tier3);
    assert_eq!(current.created_at, 10);
}
1615
/// The checksum recorded by `put` for a Tier1 block equals the one obtained
/// by quantizing the same data at 8 bits and calling `block_checksum`.
#[test]
fn test_checksum_stored_correctly() {
    let mut store = TieredStore::new(4096);
    let key = make_key(1, 0);
    let ramp: Vec<f32> = (0..32).map(|v| v as f32).collect();

    store.put(key, &ramp, Tier::Tier1, 0).unwrap();

    let stored = store.meta(key).unwrap().checksum;
    assert_ne!(stored, 0);

    // Recompute the checksum independently of the store.
    let (packed, scale) = quantize_block(&ramp, 8);
    let recomputed = block_checksum(&packed, scale);
    assert_eq!(stored, recomputed);
}
1634
/// Raw bytes written with `write_block` are returned by `read_block`, which
/// reports how many bytes it copied into the destination buffer.
#[test]
fn test_block_io_write_read() {
    let mut store = TieredStore::new(4096);
    let key = make_key(1, 0);
    let payload = vec![0xAA, 0xBB, 0xCC, 0xDD];

    store.write_block(Tier::Tier1, key, &payload).unwrap();

    // The destination is deliberately larger than the payload.
    let mut buf = vec![0u8; 8];
    let copied = store.read_block(Tier::Tier1, key, &mut buf).unwrap();

    assert_eq!(copied, 4);
    assert_eq!(&buf[..4], &payload);
}
1650
/// After `delete_block`, reading the same tier and key fails with
/// `BlockNotFound`.
#[test]
fn test_block_io_delete() {
    let mut store = TieredStore::new(4096);
    let key = make_key(1, 0);

    store.write_block(Tier::Tier2, key, &[1, 2, 3]).unwrap();
    store.delete_block(Tier::Tier2, key).unwrap();

    let mut scratch = vec![0u8; 4];
    let lookup = store.read_block(Tier::Tier2, key, &mut scratch);
    assert_eq!(lookup, Err(StoreError::BlockNotFound));
}
1665
/// Writing raw bytes to `Tier0` is refused with `InvalidBlock`.
#[test]
fn test_block_io_write_tier0_rejected() {
    let mut store = TieredStore::new(4096);
    let key = make_key(1, 0);

    let attempt = store.write_block(Tier::Tier0, key, &[1]);
    assert_eq!(attempt, Err(StoreError::InvalidBlock));
}
1675
/// A record appended through the `MetaLog` interface can be fetched back by
/// its key.
#[test]
fn test_meta_log_append_get() {
    let mut store = TieredStore::new(4096);
    let key = make_key(1, 0);
    // One tick value serves as both creation and last-access time.
    let stamp = 42;

    let record = BlockMeta {
        key,
        dtype: DType::F32,
        tier: Tier::Tier1,
        bits: 8,
        scale: 0.5,
        zero_point: 0,
        created_at: stamp,
        last_access_at: stamp,
        access_count: 1,
        ema_rate: 0.0,
        window: 1,
        checksum: 0,
        reconstruct: ReconstructPolicy::None,
        tier_age: 0,
        lineage_parent: None,
        block_bytes: 64,
    };

    MetaLog::append(&mut store, &record).unwrap();

    let found = MetaLog::get(&store, key).unwrap();
    assert_eq!(found.key, key);
    assert_eq!(found.created_at, stamp);
}
1706
/// `MetaLog::iter` yields one entry for every block that was stored.
#[test]
fn test_meta_log_iter() {
    let mut store = TieredStore::new(4096);
    let data = vec![1.0f32; 8];

    // Tensor ids 1, 2, 3 go to Tier1, Tier2, Tier3 respectively.
    let tiers = [Tier::Tier1, Tier::Tier2, Tier::Tier3];
    for (tensor_id, tier) in (1u128..).zip(tiers) {
        store.put(make_key(tensor_id, 0), &data, tier, 0).unwrap();
    }

    let yielded = MetaLog::iter(&store).count();
    assert_eq!(yielded, 3);
}
1719
/// Pins the bit width `bits_for_tier` returns for every tier.
#[test]
fn test_bits_for_tier() {
    let expected = [
        (Tier::Tier0, 0),
        (Tier::Tier1, 8),
        (Tier::Tier2, 7),
        (Tier::Tier3, 3),
    ];
    for (tier, bits) in expected {
        assert_eq!(bits_for_tier(tier), bits);
    }
}
1729
/// Pins the `u8` discriminant of every `Tier` variant.
#[test]
fn test_tier_repr() {
    let expected = [
        (Tier::Tier0, 0u8),
        (Tier::Tier1, 1),
        (Tier::Tier2, 2),
        (Tier::Tier3, 3),
    ];
    for (tier, code) in expected {
        assert_eq!(tier as u8, code);
    }
}
1739
/// Pins the `u8` discriminant of every `DType` variant.
#[test]
fn test_dtype_repr() {
    let expected = [(DType::F32, 0u8), (DType::F16, 1), (DType::BF16, 2)];
    for (dtype, code) in expected {
        assert_eq!(dtype as u8, code);
    }
}
1746
/// Pins the `u8` discriminant of every `ReconstructPolicy` variant.
#[test]
fn test_reconstruct_policy_repr() {
    let expected = [
        (ReconstructPolicy::None, 0u8),
        (ReconstructPolicy::Delta, 1),
        (ReconstructPolicy::Factor, 2),
    ];
    for (policy, code) in expected {
        assert_eq!(policy as u8, code);
    }
}
1753
/// End-to-end scenario: store ten blocks of one tensor across three tiers,
/// touch two of them, evict one, then read one back.
#[test]
fn test_multi_block_workflow() {
    // Tier assignment cycles with the block index modulo 3.
    const ROTATION: [Tier; 3] = [Tier::Tier1, Tier::Tier2, Tier::Tier3];

    let mut store = TieredStore::new(4096);

    for block in 0..10u32 {
        let key = make_key(1, block);
        // 32 consecutive values per block, scaled by 0.1.
        let first = block * 32;
        let samples: Vec<f32> = (first..first + 32).map(|v| v as f32 * 0.1).collect();
        let tier = ROTATION[(block % 3) as usize];
        store.put(key, &samples, tier, u64::from(block)).unwrap();
    }

    assert_eq!(store.block_count(), 10);
    assert_eq!(store.tier_count(Tier::Tier1), 4); // blocks 0, 3, 6, 9
    assert_eq!(store.tier_count(Tier::Tier2), 3); // blocks 1, 4, 7
    assert_eq!(store.tier_count(Tier::Tier3), 3); // blocks 2, 5, 8

    store.touch(make_key(1, 0), 20);
    store.touch(make_key(1, 5), 25);

    // Block 8 was stored in Tier3; after eviction it is counted under Tier0
    // and still contributes to `block_count`.
    let victim = make_key(1, 8);
    store.evict(victim, ReconstructPolicy::Delta).unwrap();
    assert_eq!(store.tier_count(Tier::Tier3), 2);
    assert_eq!(store.tier_count(Tier::Tier0), 1);
    assert_eq!(store.block_count(), 10);

    let mut readback = vec![0.0f32; 32];
    let written = TieredStore::get(&mut store, make_key(1, 0), &mut readback, 30).unwrap();
    assert_eq!(written, 32);
}
1794
/// Ticking an empty store reports zero for every counter in the result.
#[test]
fn test_tick_empty_store() {
    use crate::tiering::TierConfig;

    let mut store = TieredStore::new(4096);
    let config = TierConfig::default();

    let report = store.tick(&config, 100, 1_000_000, 100);

    // Migration counters.
    assert_eq!(report.upgrades, 0);
    assert_eq!(report.downgrades, 0);
    assert_eq!(report.evictions, 0);
    // Budget and bookkeeping counters.
    assert_eq!(report.bytes_freed, 0);
    assert_eq!(report.ops_used, 0);
    assert_eq!(report.candidates_found, 0);
}
1809
/// A Tier3 block whose access statistics are overwritten to look heavily
/// used is upgraded to Tier1 by a single `tick`, and stays readable.
#[test]
fn test_tick_migrates_cold_to_hot() {
    use crate::tiering::TierConfig;

    let mut store = TieredStore::new(4096);
    let key = make_key(1, 0);
    let samples: Vec<f32> = (0..64).map(|v| v as f32 * 0.1).collect();

    store.put(key, &samples, Tier::Tier3, 0).unwrap();
    assert_eq!(store.tier_count(Tier::Tier3), 1);

    // Patch the index entry directly instead of replaying real accesses.
    // NOTE(review): `tier_age = 10` presumably clears a minimum-residency
    // rule in the tiering policy; confirm against `TierConfig`.
    if let Some(entry) = store.index.get_mut(&key) {
        entry.ema_rate = 1.0;
        entry.window = u64::MAX;
        entry.last_access_at = 100;
        entry.access_count = 100;
        entry.tier_age = 10;
    }

    let config = TierConfig::default();
    let report = store.tick(&config, 100, 1_000_000, 100);

    assert!(
        report.upgrades > 0,
        "expected at least one upgrade, got {}",
        report.upgrades
    );
    assert_eq!(report.downgrades, 0);
    assert!(report.candidates_found > 0);

    let upgraded = store.meta(key).unwrap();
    assert_eq!(
        upgraded.tier,
        Tier::Tier1,
        "block should be in Tier1 after upgrade"
    );
    assert_eq!(upgraded.bits, 8, "Tier1 should use 8-bit quantization");
    assert_eq!(upgraded.tier_age, 0, "tier_age should reset after migration");

    // The migrated block must still be readable at full length.
    let mut readback = vec![0.0f32; 64];
    let written = TieredStore::get(&mut store, key, &mut readback, 101).unwrap();
    assert_eq!(written, 64);
}
1855
/// With five upgrade candidates and an operation budget of two, `tick`
/// performs exactly two upgrades while still reporting all candidates.
#[test]
fn test_tick_respects_budget_ops() {
    use crate::tiering::TierConfig;

    let mut store = TieredStore::new(4096);
    let samples: Vec<f32> = (0..64).map(|v| v as f32 * 0.1).collect();

    // Five Tier3 blocks (tensor ids 1..=5), each patched to look heavily used.
    for tensor_id in 1..=5u128 {
        let key = make_key(tensor_id, 0);
        store.put(key, &samples, Tier::Tier3, 0).unwrap();
        if let Some(entry) = store.index.get_mut(&key) {
            entry.ema_rate = 1.0;
            entry.window = u64::MAX;
            entry.last_access_at = 100;
            entry.access_count = 100;
            entry.tier_age = 10;
        }
    }

    let config = TierConfig::default();
    // The last argument is the operation budget.
    let report = store.tick(&config, 100, 1_000_000, 2);

    assert_eq!(report.ops_used, 2, "should use exactly 2 ops");
    assert_eq!(report.upgrades, 2, "should upgrade only 2 blocks");
    assert!(report.candidates_found >= 5, "should find all 5 candidates");
}
1882
/// The first `touch_block` on a fresh block moves `ema_rate` from zero to
/// `config.alpha`, records the tick, and updates the access window.
#[test]
fn test_touch_block_updates_ema_and_window() {
    use crate::tiering::TierConfig;

    let mut store = TieredStore::new(4096);
    let key = make_key(1, 0);
    store.put(key, &[1.0; 16], Tier::Tier1, 0).unwrap();

    let config = TierConfig::default();

    // A freshly stored block starts with a zero rate.
    assert_eq!(store.meta(key).unwrap().ema_rate, 0.0);

    store.touch_block(key, &config, 5);
    let touched = store.meta(key).unwrap();

    // Float comparison within a small tolerance.
    let drift = (touched.ema_rate - config.alpha).abs();
    assert!(
        drift < 1e-6,
        "ema_rate={}, expected={}",
        touched.ema_rate,
        config.alpha,
    );
    assert_eq!(touched.last_access_at, 5);
    assert_ne!(touched.window & 1, 0, "bit 0 should be set");
    // Put-time bit now at position 5, new access at bit 0.
    assert_eq!(touched.window, (1u64 << 5) | 1);
}
1914
/// Scoring a key that is not in the store yields `None`.
#[test]
fn test_score_block_none_for_missing() {
    use crate::tiering::TierConfig;

    let store = TieredStore::new(4096);
    let config = TierConfig::default();
    let absent = make_key(99, 0);

    let score = store.score_block(absent, &config, 100);
    assert_eq!(score, None);
}
1922
/// `put` registers the key with the epoch tracker, and overwriting the same
/// key yields a strictly larger epoch.
#[test]
fn test_epoch_tracker_wired_into_put() {
    let mut store = TieredStore::new(4096);
    let key = BlockKey {
        tensor_id: 1,
        block_index: 0,
    };
    let payload = vec![1.0f32; 64];

    // Unknown to the tracker before the first write.
    let untracked = store.epoch_tracker().check_epoch(key);
    assert_eq!(untracked, None);

    store.put(key, &payload, Tier::Tier1, 0).unwrap();
    assert!(store.epoch_tracker().check_epoch(key).is_some());
    let first_epoch = store.epoch_tracker().check_epoch(key).unwrap();

    store.put(key, &payload, Tier::Tier1, 1).unwrap();
    let second_epoch = store.epoch_tracker().check_epoch(key).unwrap();

    assert!(
        second_epoch > first_epoch,
        "epoch should increment on overwrite"
    );
}
1946
/// A store on which `enable_coherence` was never called returns `None` from
/// `coherence_check`.
#[test]
fn test_coherence_disabled_by_default() {
    let mut store = TieredStore::new(4096);
    let key = BlockKey {
        tensor_id: 1,
        block_index: 0,
    };
    let payload = vec![1.0f32; 64];
    store.put(key, &payload, Tier::Tier1, 0).unwrap();

    let verdict = store.coherence_check(key, &payload, 1);
    assert!(verdict.is_none());
}
1959
/// With the default coherence check enabled, a Tier1 block checked against
/// the data it was written from passes.
#[test]
fn test_coherence_enabled_passes() {
    use crate::coherence::CoherenceCheck;

    let mut store = TieredStore::new(4096);
    store.enable_coherence(CoherenceCheck::default());

    let key = BlockKey {
        tensor_id: 1,
        block_index: 0,
    };
    // 64 values: 0.25, 0.50, ..., 16.0.
    let ramp: Vec<f32> = (0..64).map(|step| (step as f32 + 1.0) * 0.25).collect();
    store.put(key, &ramp, Tier::Tier1, 0).unwrap();

    // The outer layer is `None` when coherence is disabled; the inner layer
    // is presumably a `Result` for the check itself (TODO confirm).
    let outcome = store.coherence_check(key, &ramp, 1).unwrap();
    let verdict = outcome.unwrap();

    assert!(
        verdict.passed,
        "Tier1 coherence should pass; err={}",
        verdict.max_error
    );
}
1979
/// Every call to `tick` appends one snapshot to the store's metrics series,
/// and the latest snapshot carries the tick timestamp and the block count.
#[test]
fn test_metrics_series_wired_into_tick() {
    use crate::tiering::TierConfig;

    let mut store = TieredStore::new(4096);
    let config = TierConfig::default();

    // One stack buffer shared by all blocks, rather than a fresh heap
    // allocation on every loop iteration.
    let data = [1.0f32; 64];
    for i in 0..5u128 {
        let key = BlockKey {
            tensor_id: i,
            block_index: 0,
        };
        store.put(key, &data, Tier::Tier1, 0).unwrap();
    }

    // Nothing is recorded until the first tick.
    assert!(store.metrics_series().is_empty());

    store.tick(&config, 100, 1_000_000, 100);
    assert_eq!(store.metrics_series().len(), 1);

    store.tick(&config, 200, 1_000_000, 100);
    assert_eq!(store.metrics_series().len(), 2);

    // The newest snapshot belongs to the second tick.
    let (ts, m) = store.metrics_series().latest().unwrap();
    assert_eq!(*ts, 200);
    assert_eq!(m.total_blocks, 5);
}
2015
/// Timing comparison between per-block `compute_score` calls and a single
/// `compute_scores_batch` call over 10,000 synthetic block records.
///
/// Asserts nothing; both timings are printed to stderr.
#[test]
fn bench_batch_scoring_10k() {
    use crate::tiering::{
        compute_score, compute_scores_batch, BlockMeta as TBlockMeta, Tier as TTier, TierConfig,
    };
    use std::time::Instant;

    let cfg = TierConfig::default();
    let iters = 1000;

    // Synthetic population with a rising rate and a 100-tick spread of
    // last-access times.
    let mut metas: Vec<TBlockMeta> = Vec::with_capacity(10_000);
    for i in 0..10_000 {
        let ordinal = i as u64;
        metas.push(TBlockMeta {
            ema_rate: (i as f32) * 0.0001,
            access_window: 0x5555_5555_5555_5555,
            last_access: 50 + ordinal % 100,
            access_count: ordinal,
            current_tier: TTier::Tier1,
            tier_since: 0,
        });
    }

    // Pass 1: score each block individually.
    let timer = Instant::now();
    for _ in 0..iters {
        metas.iter().for_each(|meta| {
            std::hint::black_box(compute_score(&cfg, 100, meta));
        });
    }
    let individual = timer.elapsed();

    // Pass 2: score the whole slice in one call.
    let timer = Instant::now();
    for _ in 0..iters {
        std::hint::black_box(compute_scores_batch(&cfg, 100, &metas));
    }
    let batch = timer.elapsed();

    let ns_per_block =
        |spent: std::time::Duration| spent.as_nanos() as f64 / (iters * 10_000) as f64;

    eprintln!(
        "Individual scoring 10k x {iters}: {:?} ({:.0} ns/block)",
        individual,
        ns_per_block(individual)
    );
    eprintln!(
        "Batch scoring 10k x {iters}: {:?} ({:.0} ns/block)",
        batch,
        ns_per_block(batch)
    );
}
2071
/// Times 10,000 dequantizations of a 4096-element block quantized at 5 bits
/// and prints the output throughput to stderr. Asserts nothing.
#[test]
fn bench_dequant_5bit_4096() {
    use std::time::Instant;

    let iters = 10_000;

    // Symmetric ramp from -20.48 up to just under +20.48.
    let samples: Vec<f32> = (0..4096).map(|v| (v as f32 - 2048.0) * 0.01).collect();
    let (packed, scale) = quantize_block(&samples, 5);
    let mut out = vec![0.0f32; 4096];

    let timer = Instant::now();
    (0..iters).for_each(|_| {
        std::hint::black_box(dequantize_block(&packed, scale, 5, 4096, &mut out));
    });
    let spent = timer.elapsed();

    // 4096 f32 outputs (4 bytes each) per iteration.
    let total_bytes = 4096u64 * 4 * iters as u64;
    let throughput = total_bytes as f64 / spent.as_secs_f64() / 1e9;
    eprintln!(
        "Dequant 5-bit 4096 x {iters}: {:?} ({:.2} GB/s output throughput)",
        spent, throughput
    );
}
2094
/// Times 10,000 dequantizations of a 4096-element block quantized at 7 bits
/// and prints the output throughput to stderr. Asserts nothing.
#[test]
fn bench_dequant_7bit_4096() {
    use std::time::Instant;

    let iters = 10_000;

    // Symmetric ramp from -20.48 up to just under +20.48.
    let samples: Vec<f32> = (0..4096).map(|v| (v as f32 - 2048.0) * 0.01).collect();
    let (packed, scale) = quantize_block(&samples, 7);
    let mut out = vec![0.0f32; 4096];

    let timer = Instant::now();
    (0..iters).for_each(|_| {
        std::hint::black_box(dequantize_block(&packed, scale, 7, 4096, &mut out));
    });
    let spent = timer.elapsed();

    // 4096 f32 outputs (4 bytes each) per iteration.
    let total_bytes = 4096u64 * 4 * iters as u64;
    let throughput = total_bytes as f64 / spent.as_secs_f64() / 1e9;
    eprintln!(
        "Dequant 7-bit 4096 x {iters}: {:?} ({:.2} GB/s output throughput)",
        spent, throughput
    );
}
2117
/// Times 10,000 5-bit quantizations of a 4096-element block and prints the
/// input throughput to stderr. Asserts nothing.
#[test]
fn bench_quant_5bit_4096() {
    use std::time::Instant;

    let iters = 10_000;

    // Symmetric ramp from -20.48 up to just under +20.48.
    let samples: Vec<f32> = (0..4096).map(|v| (v as f32 - 2048.0) * 0.01).collect();

    let timer = Instant::now();
    (0..iters).for_each(|_| {
        std::hint::black_box(quantize_block(&samples, 5));
    });
    let spent = timer.elapsed();

    // 4096 f32 inputs (4 bytes each) per iteration.
    let total_bytes = 4096u64 * 4 * iters as u64;
    let throughput = total_bytes as f64 / spent.as_secs_f64() / 1e9;
    eprintln!(
        "Quant 5-bit 4096 x {iters}: {:?} ({:.2} GB/s input throughput)",
        spent, throughput
    );
}
2138
/// Times 100 calls to `FactorSet::from_data_adaptive` on a 64x64 matrix and
/// prints the mean time per call to stderr. Asserts nothing.
#[test]
fn bench_svd_adaptive_64x64() {
    use crate::delta::FactorSet;
    use std::time::Instant;

    let iters = 100;
    let rows = 64;
    let cols = 64;

    // Deterministic input: a sum of two sinusoids over the element index.
    let matrix: Vec<f32> = (0..rows * cols)
        .map(|idx| {
            let x = idx as f32;
            (x * 0.37).sin() + (x * 0.73).cos()
        })
        .collect();

    let timer = Instant::now();
    (0..iters).for_each(|_| {
        std::hint::black_box(FactorSet::from_data_adaptive(&matrix, rows, cols, 16, 0.05));
    });
    let spent = timer.elapsed();

    let ms_per_iter = spent.as_secs_f64() * 1000.0 / iters as f64;
    eprintln!(
        "SVD adaptive 64x64 (max_rank=16, target=0.05) x {iters}: {:?} ({:.2} ms/iter)",
        spent, ms_per_iter
    );
}
2162
/// Times 10,000 calls to `StoreMetrics::format_report` on a fixed snapshot
/// and prints the mean time per call to stderr. Asserts nothing.
#[test]
fn bench_format_report() {
    use crate::metrics::StoreMetrics;
    use std::time::Instant;

    let iters = 10_000;

    // Fixed snapshot with every field populated.
    let snapshot = StoreMetrics {
        // Block counts.
        total_blocks: 10_000,
        tier0_blocks: 500,
        tier1_blocks: 4000,
        tier2_blocks: 3500,
        tier3_blocks: 2000,
        // Bytes per tier.
        tier1_bytes: 4_000_000,
        tier2_bytes: 2_500_000,
        tier3_bytes: 750_000,
        // Lifetime counters.
        total_reads: 1_000_000,
        total_writes: 500_000,
        total_evictions: 5000,
        total_upgrades: 12_000,
        total_downgrades: 8000,
        total_reconstructions: 200,
        total_checksum_failures: 0,
        total_compactions: 150,
        // Rates and averages.
        tier_flips_last_minute: 0.023,
        avg_score_tier1: 0.85,
        avg_score_tier2: 0.45,
        avg_score_tier3: 0.12,
    };

    let timer = Instant::now();
    (0..iters).for_each(|_| {
        std::hint::black_box(snapshot.format_report());
    });
    let spent = timer.elapsed();

    let ns_per_call = spent.as_nanos() as f64 / iters as f64;
    eprintln!(
        "format_report x {iters}: {:?} ({:.0} ns/call)",
        spent, ns_per_call
    );
}
2204
/// Times 10,000 calls to `StoreMetrics::format_json` on a fixed snapshot
/// and prints the mean time per call to stderr. Asserts nothing.
#[test]
fn bench_format_json() {
    use crate::metrics::StoreMetrics;
    use std::time::Instant;

    let iters = 10_000;

    // Fixed snapshot with every field populated.
    let snapshot = StoreMetrics {
        // Block counts.
        total_blocks: 10_000,
        tier0_blocks: 500,
        tier1_blocks: 4000,
        tier2_blocks: 3500,
        tier3_blocks: 2000,
        // Bytes per tier.
        tier1_bytes: 4_000_000,
        tier2_bytes: 2_500_000,
        tier3_bytes: 750_000,
        // Lifetime counters.
        total_reads: 1_000_000,
        total_writes: 500_000,
        total_evictions: 5000,
        total_upgrades: 12_000,
        total_downgrades: 8000,
        total_reconstructions: 200,
        total_checksum_failures: 0,
        total_compactions: 150,
        // Rates and averages.
        tier_flips_last_minute: 0.023,
        avg_score_tier1: 0.85,
        avg_score_tier2: 0.45,
        avg_score_tier3: 0.12,
    };

    let timer = Instant::now();
    (0..iters).for_each(|_| {
        std::hint::black_box(snapshot.format_json());
    });
    let spent = timer.elapsed();

    let ns_per_call = spent.as_nanos() as f64 / iters as f64;
    eprintln!(
        "format_json x {iters}: {:?} ({:.0} ns/call)",
        spent, ns_per_call
    );
}
2246
/// Times 10,000 calls to `MetricsSeries::trend` on a series holding 100
/// snapshots and prints the mean time per call to stderr. Asserts nothing.
#[test]
fn bench_metrics_series_trend_100() {
    use crate::metrics::{MetricsSeries, StoreMetrics};
    use std::time::Instant;

    let iters = 10_000;

    // 100 snapshots at timestamps 0..100; fields not listed take their
    // `Default` value.
    let mut series = MetricsSeries::new(256);
    for tick in 0..100u64 {
        let snapshot = StoreMetrics {
            total_blocks: 1000 + tick,
            tier1_blocks: 400 + tick % 50,
            tier2_blocks: 350,
            tier3_blocks: 250,
            tier1_bytes: 400_000 + tick * 100,
            tier2_bytes: 250_000,
            tier3_bytes: 75_000,
            total_evictions: tick * 3,
            ..Default::default()
        };
        series.record(tick, snapshot);
    }

    let timer = Instant::now();
    (0..iters).for_each(|_| {
        std::hint::black_box(series.trend());
    });
    let spent = timer.elapsed();

    let ns_per_call = spent.as_nanos() as f64 / iters as f64;
    eprintln!(
        "MetricsSeries trend (100 snapshots) x {iters}: {:?} ({:.0} ns/call)",
        spent, ns_per_call
    );
}
2283}