1#![allow(dead_code)]
25#![allow(unused_imports)]
26
27mod sled {
38 use rusqlite::{params, Connection};
39 use std::sync::{Arc, Mutex};
40
41 #[derive(Debug, Clone)]
42 pub struct Error(pub String);
43 impl std::fmt::Display for Error {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 write!(f, "kv error: {}", self.0)
46 }
47 }
48 impl std::error::Error for Error {}
49
    /// Byte-buffer type handed back by the store; here simply an owned `Vec<u8>`.
    pub type IVec = Vec<u8>;
51
52 #[derive(Clone)]
54 pub struct Db(Arc<Mutex<Connection>>);
55
56 impl std::fmt::Debug for Db {
57 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58 f.debug_struct("Db").finish()
59 }
60 }
61
62 impl Db {
63 fn conn(&self) -> std::sync::MutexGuard<'_, Connection> {
64 self.0.lock().unwrap()
65 }
66
67 pub fn get(&self, key: &[u8]) -> Result<Option<IVec>, Error> {
68 let conn = self.conn();
69 let mut stmt = conn.prepare_cached(
70 "SELECT val FROM kv WHERE key = ?1"
71 ).map_err(|e| Error(e.to_string()))?;
72 let mut rows = stmt.query(params![key])
73 .map_err(|e| Error(e.to_string()))?;
74 match rows.next().map_err(|e| Error(e.to_string()))? {
75 Some(row) => {
76 let v: Vec<u8> = row.get(0).map_err(|e| Error(e.to_string()))?;
77 Ok(Some(v))
78 }
79 None => Ok(None),
80 }
81 }
82
83 pub fn insert(
84 &self,
85 key: impl AsRef<[u8]>,
86 value: impl Into<Vec<u8>>,
87 ) -> Result<Option<IVec>, Error> {
88 let k = key.as_ref().to_vec();
89 let v: Vec<u8> = value.into();
90 let old = self.get(&k)?;
91 self.conn()
92 .execute(
93 "INSERT OR REPLACE INTO kv (key, val) VALUES (?1, ?2)",
94 params![k, v],
95 )
96 .map_err(|e| Error(e.to_string()))?;
97 Ok(old)
98 }
99
100 pub fn remove(&self, key: impl AsRef<[u8]>) -> Result<Option<IVec>, Error> {
101 let k = key.as_ref().to_vec();
102 let old = self.get(&k)?;
103 self.conn()
104 .execute("DELETE FROM kv WHERE key = ?1", params![k])
105 .map_err(|e| Error(e.to_string()))?;
106 Ok(old)
107 }
108
109 pub fn flush(&self) -> Result<usize, Error> {
110 Ok(0) }
112
113 pub fn scan_prefix(&self, prefix: &[u8]) -> ScanIter {
114 let conn = self.conn();
115 let mut end = prefix.to_vec();
117 let pairs: Vec<(IVec, IVec)> = {
119 let mut stmt = conn.prepare(
120 "SELECT key, val FROM kv WHERE key >= ?1 ORDER BY key"
121 ).unwrap_or_else(|_| conn.prepare("SELECT key, val FROM kv").unwrap());
122 let p = prefix.to_vec();
123 stmt.query_map(params![p], |row| {
124 let k: Vec<u8> = row.get(0)?;
125 let v: Vec<u8> = row.get(1)?;
126 Ok((k, v))
127 })
128 .unwrap_or_else(|_| {
129 panic!("scan_prefix query failed")
131 })
132 .filter_map(|r| r.ok())
133 .filter(|(k, _)| k.starts_with(prefix))
134 .collect()
135 };
136 ScanIter { data: pairs, pos: 0 }
137 }
138
139 pub fn apply_batch(&self, batch: Batch) -> Result<(), Error> {
140 let conn = self.conn();
141 conn.execute_batch("BEGIN").map_err(|e| Error(e.to_string()))?;
143 for op in batch.ops {
144 match op {
145 BatchOp::Insert(k, v) => {
146 conn.execute(
147 "INSERT OR REPLACE INTO kv (key, val) VALUES (?1, ?2)",
148 params![k, v],
149 ).map_err(|e| Error(e.to_string()))?;
150 }
151 BatchOp::Remove(k) => {
152 conn.execute(
153 "DELETE FROM kv WHERE key = ?1",
154 params![k],
155 ).map_err(|e| Error(e.to_string()))?;
156 }
157 }
158 }
159 conn.execute_batch("COMMIT").map_err(|e| Error(e.to_string()))?;
160 Ok(())
161 }
162
163 pub fn clear(&self) -> Result<(), Error> {
164 self.conn()
165 .execute("DELETE FROM kv", [])
166 .map_err(|e| Error(e.to_string()))?;
167 Ok(())
168 }
169 }
170
171 enum BatchOp { Insert(Vec<u8>, Vec<u8>), Remove(Vec<u8>) }
172
173 pub struct Batch { pub ops: Vec<BatchOp> }
174 impl Default for Batch {
175 fn default() -> Self { Self { ops: Vec::new() } }
176 }
177 impl Batch {
178 pub fn insert(&mut self, key: impl AsRef<[u8]>, value: impl Into<Vec<u8>>) {
179 self.ops.push(BatchOp::Insert(key.as_ref().to_vec(), value.into()));
180 }
181 pub fn remove(&mut self, key: impl AsRef<[u8]>) {
182 self.ops.push(BatchOp::Remove(key.as_ref().to_vec()));
183 }
184 }
185
186 pub struct ScanIter { data: Vec<(IVec, IVec)>, pos: usize }
187 impl Iterator for ScanIter {
188 type Item = Result<(IVec, IVec), Error>;
189 fn next(&mut self) -> Option<Self::Item> {
190 if self.pos >= self.data.len() { return None; }
191 let pair = self.data[self.pos].clone();
192 self.pos += 1;
193 Some(Ok(pair))
194 }
195 }
196
197 pub fn open<P: AsRef<std::path::Path>>(_path: P) -> Result<Db, Error> {
198 let conn = Connection::open_in_memory()
200 .map_err(|e| Error(e.to_string()))?;
201 conn.execute_batch(
202 "PRAGMA journal_mode = WAL;
203 CREATE TABLE IF NOT EXISTS kv (key BLOB PRIMARY KEY, val BLOB NOT NULL);"
204 ).map_err(|e| Error(e.to_string()))?;
205 Ok(Db(Arc::new(Mutex::new(conn))))
206 }
207}
208
mod lru {
    use std::collections::{HashMap, VecDeque};
    use std::hash::Hash;
    use std::num::NonZeroUsize;

    /// Small least-recently-used cache.
    ///
    /// Values live in `map`; `order` lists the keys from least to most recently
    /// used. Locating a key in `order` is a linear scan.
    pub struct LruCache<K: Hash + Eq + Clone, V> {
        cap: usize,
        map: HashMap<K, V>,
        order: VecDeque<K>,
    }

    impl<K: Hash + Eq + Clone, V> LruCache<K, V> {
        /// Creates an empty cache that keeps at most `cap` entries.
        pub fn new(cap: NonZeroUsize) -> Self {
            LruCache {
                cap: cap.get(),
                map: HashMap::new(),
                order: VecDeque::new(),
            }
        }

        /// Takes `k` out of the recency queue; returns whether it was queued.
        fn unlink(&mut self, k: &K) -> bool {
            match self.order.iter().position(|queued| queued == k) {
                Some(at) => {
                    self.order.remove(at);
                    true
                }
                None => false,
            }
        }

        /// Marks `k` as most recently used if it is tracked.
        fn touch(&mut self, k: &K) {
            if self.unlink(k) {
                self.order.push_back(k.clone());
            }
        }

        /// Drops least-recently-used entries until the cache fits its capacity.
        fn evict_if_needed(&mut self) {
            while self.cap < self.map.len() {
                if let Some(victim) = self.order.pop_front() {
                    self.map.remove(&victim);
                }
            }
        }

        /// Inserts or replaces `k`, marks it most recent, and returns the previous value.
        pub fn put(&mut self, k: K, v: V) -> Option<V> {
            self.unlink(&k);
            self.order.push_back(k.clone());
            let previous = self.map.insert(k, v);
            self.evict_if_needed();
            previous
        }

        /// Returns the value for `k` and marks it most recently used.
        pub fn get(&mut self, k: &K) -> Option<&V> {
            if !self.map.contains_key(k) {
                return None;
            }
            self.touch(k);
            self.map.get(k)
        }

        /// Returns the value for `k` without changing recency.
        pub fn peek(&self, k: &K) -> Option<&V> {
            self.map.get(k)
        }

        /// Removes `k` and returns its value.
        pub fn pop(&mut self, k: &K) -> Option<V> {
            self.unlink(k);
            self.map.remove(k)
        }

        /// Removes and returns the least recently used entry.
        pub fn pop_lru(&mut self) -> Option<(K, V)> {
            let oldest = self.order.pop_front()?;
            self.map.remove(&oldest).map(|value| (oldest, value))
        }

        /// Number of cached entries.
        pub fn len(&self) -> usize {
            self.map.len()
        }

        /// Whether the cache holds no entries.
        pub fn is_empty(&self) -> bool {
            self.len() == 0
        }

        /// Whether `k` is cached; does not change recency.
        pub fn contains(&self, k: &K) -> bool {
            self.peek(k).is_some()
        }
    }
}
294
295use lru::LruCache;
296use fastrand;
297
298use std::cmp::Ordering;
299use std::collections::{BinaryHeap, HashMap, HashSet};
300use std::hash::{Hash, Hasher};
301use std::io::{Cursor, Read, Write as IoWrite};
302use std::num::NonZeroUsize;
303use std::path::{Path, PathBuf};
304use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering as AtomicOrd};
305use std::sync::Arc;
306use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
307
308use anyhow::{anyhow, Context, Result};
309use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
310use bytes::{BufMut, Bytes, BytesMut};
311use log::{debug, error, info, warn};
312use parking_lot::{Mutex, RwLock};
313use serde::{Deserialize, Serialize};
314use thiserror::Error;
315use tokio::fs;
316use tokio::sync::{mpsc, oneshot, Semaphore};
317use tokio::time::{sleep, timeout};
318use uuid::Uuid;
319
/// Error type used across this file.
///
/// The `#[error]` display strings are Chinese runtime messages; each variant's
/// doc comment gives the English meaning.
#[derive(Debug, Error)]
pub enum AetherError {
    /// "IO error": wraps a `std::io::Error` (auto-converted via `#[from]`).
    #[error("IO错误: {0}")]
    Io(#[from] std::io::Error),
    /// "Metadata store error"; `sled::Error` is converted into this variant.
    #[error("元数据存储错误: {0}")]
    Meta(String),
    /// "Serialization error".
    #[error("序列化错误: {0}")]
    Serialization(String),
    /// "S3 error".
    #[error("S3错误: {0}")]
    S3(String),
    /// "Operation timed out", carrying a duration.
    #[error("操作超时: {0:?}")]
    Timeout(Duration),
    /// "Memory not found", carrying the id.
    #[error("记忆未找到: {0}")]
    NotFound(String),
    /// "Dimension mismatch: expected {expected}, got {got}".
    #[error("维度不匹配: 期望 {expected}, 实际 {got}")]
    DimensionMismatch { expected: usize, got: usize },
    /// "Capacity exceeded: hot tier = {hot}, max = {max}".
    #[error("容量超限: 热层={hot}, 最大={max}")]
    CapacityExceeded { hot: usize, max: usize },
    /// "Migration in progress".
    #[error("迁移进行中: {0}")]
    MigrationInProgress(String),
    /// "Compression error".
    #[error("压缩错误: {0}")]
    Compression(String),
    /// "Internal state inconsistent".
    #[error("内部状态不一致: {0}")]
    Inconsistency(String),
}
349
350impl From<sled::Error> for AetherError {
351 fn from(e: sled::Error) -> Self {
352 AetherError::Meta(e.to_string())
353 }
354}
355
/// Settings for the in-memory hot tier. Defaults come from the `Default` impl below.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct HotConfig {
    /// Vector length; `HotIndex::new` passes it to every shard (default 768).
    pub dim: usize,
    /// Total item budget; `HotIndex::new` divides it by `shard_count` to size each shard
    /// (default 100_000).
    pub max_items: usize,
    /// Default 100 ms. NOTE(review): not read in the visible part of the file.
    pub search_timeout: Duration,
    /// Number of shards `HotIndex::new` creates (default 8).
    pub shard_count: usize,
    /// Default 128. NOTE(review): presumably caps simultaneous searches; usage not visible, confirm.
    pub max_concurrent_searches: usize,
    /// Default 5. NOTE(review): presumably a batching window in milliseconds; usage not visible, confirm.
    pub batch_window_ms: u64,
    /// Default 64. NOTE(review): presumably the largest batch size; usage not visible, confirm.
    pub max_batch_size: usize,
}

impl Default for HotConfig {
    /// Baseline hot-tier settings.
    fn default() -> Self {
        Self {
            dim: 768,
            max_items: 100_000,
            search_timeout: Duration::from_millis(100),
            shard_count: 8,
            max_concurrent_searches: 128,
            batch_window_ms: 5,
            max_batch_size: 64,
        }
    }
}
392
/// Settings for the cold tier. Defaults come from the `Default` impl below.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ColdConfig {
    /// Root directory for local cold files; `ColdStore::new` creates it
    /// (default `./aether_cold`).
    pub local_dir: String,
    /// Bucket used when an S3 backend is installed; `None` (the default) keeps
    /// `ColdStore` local-only.
    pub s3_bucket: Option<String>,
    /// Optional key prefix; `ColdStore::s3_key` joins it to the object name with `/`.
    pub s3_prefix: Option<String>,
    /// Default `us-east-1`. NOTE(review): not read in the visible part of the file.
    pub s3_region: Option<String>,
    /// Compression level `ColdStore::put` passes to `VectorCodec::compress` (default 3).
    pub compress_level: i32,
    /// Default 256. NOTE(review): usage not visible; presumably the number of partitions, confirm.
    pub partition_count: usize,
    /// Default 8. NOTE(review): usage not visible; presumably partitions probed per query, confirm.
    pub top_partitions: usize,
}

impl Default for ColdConfig {
    /// Baseline cold-tier settings: local directory only, no bucket or prefix.
    fn default() -> Self {
        Self {
            local_dir: "./aether_cold".to_string(),
            s3_bucket: None,
            s3_prefix: None,
            s3_region: Some("us-east-1".to_string()),
            compress_level: 3,
            partition_count: 256,
            top_partitions: 8,
        }
    }
}
425
/// Tuning knobs for tier migration.
///
/// NOTE(review): none of these fields is read in the visible part of the file.
/// The field comments record the default value and, where hedged, a guess from
/// the field name that should be confirmed against the code that uses it.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FluxConfig {
    /// Default 0.5.
    pub initial_temperature: f32,
    /// Default 0.01; presumably the lower bound for the temperature (confirm).
    pub min_temperature: f32,
    /// Default 10.0; presumably the upper bound for the temperature (confirm).
    pub max_temperature: f32,
    /// Default 0.995.
    pub decay_rate: f32,
    /// Default 2.0.
    pub pressure_scale: f32,
    /// Default 5 seconds.
    pub tick_interval: Duration,
    /// Default 0.02.
    pub sample_rate: f32,
    /// Default 256.
    pub max_candidates: usize,
    /// Default 0.6; presumably the weight of recency in a score (confirm).
    pub alpha_recency: f32,
    /// Default 0.3; presumably the weight of access frequency (confirm).
    pub beta_freq: f32,
    /// Default 1.0; presumably the weight of importance (confirm).
    pub gamma_importance: f32,
    /// Default 0.5; presumably the weight of cold-storage cost (confirm).
    pub delta_cost: f32,
    /// Default 1.0.
    pub sigmoid_k: f32,
    /// Default 0.995.
    pub importance_decay: f32,
    /// Default 10.
    pub max_concurrent_migrations: usize,
    /// Default 5.
    pub restore_concurrency: usize,
    /// Default 50; the name indicates milliseconds.
    pub restore_backoff_ms: u64,
    /// Default 5.
    pub restore_max_retries: u32,
}

impl Default for FluxConfig {
    /// Baseline migration settings.
    fn default() -> Self {
        Self {
            initial_temperature: 0.5,
            min_temperature: 0.01,
            max_temperature: 10.0,
            decay_rate: 0.995,
            pressure_scale: 2.0,
            tick_interval: Duration::from_secs(5),
            sample_rate: 0.02,
            max_candidates: 256,
            alpha_recency: 0.6,
            beta_freq: 0.3,
            gamma_importance: 1.0,
            delta_cost: 0.5,
            sigmoid_k: 1.0,
            importance_decay: 0.995,
            max_concurrent_migrations: 10,
            restore_concurrency: 5,
            restore_backoff_ms: 50,
            restore_max_retries: 5,
        }
    }
}
491
/// Where a vector currently lives.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum StorageLocation {
    /// Held in the in-memory hot tier.
    Hot,
    /// Stored in a local file; `ColdStore::put` fills in the file path.
    Local(String),
    /// Stored in S3; `ColdStore::put` fills in the object key.
    S3(String),
}
503
/// Per-item metadata record; `AkashicRecords` stores it as JSON under the key `meta:{id}`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MemoryMeta {
    /// Item identifier; also forms the storage key.
    pub id: String,
    /// Current storage tier of the item.
    pub location: StorageLocation,
    /// Last access timestamp; `AkashicRecords::update_access` overwrites it.
    pub last_access_ms: i64,
    /// NOTE(review): presumably the creation time in epoch milliseconds; never written in the visible code.
    pub created_ms: i64,
    /// Access counter; `AkashicRecords::update_access` increments it with saturation.
    pub freq: u64,
    pub importance: f32,
    /// NOTE(review): the name suggests a cold-storage cost in megabytes; confirm.
    pub cold_cost_mb: f32,
    pub version: u64,
    pub dimension: usize,
}
518
/// Migration progress marker; `AkashicRecords` stores it as JSON under the key `mig:{id}`.
///
/// NOTE(review): the transitions between states are not visible in this part of the file.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum MigrationState {
    /// Migration towards `target` has begun.
    Started { target: StorageLocation },
    /// Payload has been written to `target`.
    Uploaded { target: StorageLocation },
    Committed,
    RolledBack,
}
527
/// A group of item keys with a centroid vector; `AkashicRecords` stores it as
/// JSON under the key `part:{id}`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Partition {
    /// Partition number; forms the storage key.
    pub id: usize,
    pub centroid: Vec<f32>,
    /// Keys assigned to this partition.
    pub keys: Vec<String>,
}
535
/// One search hit.
///
/// NOTE(review): nothing in the visible part of the file constructs this type.
#[derive(Clone, Debug)]
pub struct SearchResult {
    pub id: String,
    pub distance: f32,
    /// Whether the hit came from the hot tier.
    pub from_hot: bool,
    pub latency: Duration,
}
544
/// Serializable health report.
///
/// NOTE(review): the code that fills this in is not visible here; the last three
/// fields have the same names and types as `Metrics::hit_rate`,
/// `Metrics::avg_latency_ms` and `Metrics::p99_latency_ms`.
#[derive(Clone, Debug, Serialize)]
pub struct HealthStatus {
    pub healthy: bool,
    pub hot_items: usize,
    pub cold_items: usize,
    pub total_items: usize,
    pub temperature: f32,
    pub restore_queue_depth: usize,
    pub pending_migrations: usize,
    pub hot_hit_rate: f64,
    pub avg_search_ms: f64,
    pub p99_search_ms: f64,
}
559
/// Serializable aggregate statistics.
///
/// NOTE(review): the code that fills this in is not visible here.
#[derive(Clone, Debug, Serialize)]
pub struct SystemStats {
    pub total_items: usize,
    pub hot_items: usize,
    pub cold_items: usize,
    pub avg_importance: f32,
    pub avg_freq: u64,
    /// Counter name to value; same shape as the map returned by `Metrics::snapshot`.
    pub metrics_snapshot: HashMap<String, u64>,
}
570
/// Milliseconds since the Unix epoch, or 0 when the system clock reads earlier
/// than the epoch.
#[inline]
pub fn now_ms() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_millis() as i64)
}
583
/// Squared Euclidean distance between `a` and `b`.
///
/// Only the first `min(a.len(), b.len())` components are compared.
#[inline]
pub fn l2_sq(a: &[f32], b: &[f32]) -> f32 {
    a.iter()
        .zip(b)
        .map(|(lhs, rhs)| lhs - rhs)
        .map(|diff| diff * diff)
        .sum()
}
592
593#[inline]
595pub fn l2_distance(a: &[f32], b: &[f32]) -> f32 {
596 l2_sq(a, b).sqrt()
597}
598
/// Cosine similarity of `a` and `b`, clamped to `[-1.0, 1.0]`.
///
/// Returns 0.0 when either vector's norm is below `1e-9`. The dot product uses
/// the overlapping components only, while each norm uses its full slice.
#[inline]
pub fn cosine_sim(a: &[f32], b: &[f32]) -> f32 {
    let norm = |v: &[f32]| v.iter().map(|x| x * x).sum::<f32>().sqrt();

    let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
    let na = norm(a);
    let nb = norm(b);

    // Guard clause kept in this exact form so NaN norms still reach the division.
    if na < 1e-9 || nb < 1e-9 {
        return 0.0;
    }
    let ratio = dot / (na * nb);
    ratio.clamp(-1.0, 1.0)
}
611
/// 64-bit FNV-1a hash of the UTF-8 bytes of `s`.
fn fnv1a(s: &str) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf29ce484222325;
    const PRIME: u64 = 0x100000001b3;

    s.bytes().fold(OFFSET_BASIS, |hash, byte| {
        (hash ^ u64::from(byte)).wrapping_mul(PRIME)
    })
}
621
622pub struct VectorCodec;
624
625impl VectorCodec {
626 pub fn encode(v: &[f32]) -> Vec<u8> {
627 let mut buf = BytesMut::with_capacity(4 + v.len() * 4);
628 buf.put_u32_le(v.len() as u32);
629 for &x in v {
630 buf.put_f32_le(x);
631 }
632 buf.freeze().to_vec()
633 }
634
635 pub fn decode(raw: &[u8]) -> Result<Vec<f32>> {
636 let mut cur = Cursor::new(raw);
637 let dim = cur.read_u32::<LittleEndian>()
638 .map_err(|e| AetherError::Serialization(e.to_string()))? as usize;
639 let mut v = Vec::with_capacity(dim);
640 for _ in 0..dim {
641 v.push(cur.read_f32::<LittleEndian>()
642 .map_err(|e| AetherError::Serialization(e.to_string()))?);
643 }
644 Ok(v)
645 }
646
647 pub fn compress(data: &[u8], level: i32) -> Result<Vec<u8>> {
648 zstd::encode_all(data, level)
649 .map_err(|e| AetherError::Compression(e.to_string()).into())
650 }
651
652 pub fn decompress(data: &[u8]) -> Result<Vec<u8>> {
653 zstd::decode_all(data)
654 .map_err(|e| AetherError::Compression(e.to_string()).into())
655 }
656}
657
/// Heap entry ordered so that the smallest `dist` is the greatest element,
/// which turns `std::collections::BinaryHeap` (a max-heap) into a min-heap.
///
/// Ordering and equality look at `dist` only; `id` is payload.
struct MinHeapItem {
    dist: f32,
    id: String,
}

impl Ord for MinHeapItem {
    /// Reversed total order on `dist`.
    ///
    /// `f32::total_cmp` keeps the order total when a distance is NaN; a
    /// positive NaN counts as the largest distance and is popped last.
    fn cmp(&self, other: &Self) -> Ordering {
        other.dist.total_cmp(&self.dist)
    }
}

impl PartialOrd for MinHeapItem {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for MinHeapItem {
    /// Defined through `cmp` so equality always agrees with the ordering.
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}

impl Eq for MinHeapItem {}
683
684pub struct Metrics {
689 hot_hits: Arc<AtomicU64>,
690 hot_misses: Arc<AtomicU64>,
691 cold_fallbacks: Arc<AtomicU64>,
692 migrations_down: Arc<AtomicU64>,
693 migrations_up: Arc<AtomicU64>,
694 migrations_failed: Arc<AtomicU64>,
695 migrations_rollback: Arc<AtomicU64>,
696 restore_enqueued: Arc<AtomicU64>,
697 total_searches: Arc<AtomicU64>,
698 total_latency_ns: Arc<AtomicU64>,
699 feedback_applied: Arc<AtomicU64>,
700 latency_ring: Arc<Mutex<Vec<u64>>>,
702 ring_cursor: Arc<AtomicUsize>,
703 ring_capacity: usize,
704}
705
706impl Metrics {
707 pub fn new(ring_capacity: usize) -> Self {
708 Self {
709 hot_hits: Arc::new(AtomicU64::new(0)),
710 hot_misses: Arc::new(AtomicU64::new(0)),
711 cold_fallbacks: Arc::new(AtomicU64::new(0)),
712 migrations_down: Arc::new(AtomicU64::new(0)),
713 migrations_up: Arc::new(AtomicU64::new(0)),
714 migrations_failed: Arc::new(AtomicU64::new(0)),
715 migrations_rollback: Arc::new(AtomicU64::new(0)),
716 restore_enqueued: Arc::new(AtomicU64::new(0)),
717 total_searches: Arc::new(AtomicU64::new(0)),
718 total_latency_ns: Arc::new(AtomicU64::new(0)),
719 feedback_applied: Arc::new(AtomicU64::new(0)),
720 latency_ring: Arc::new(Mutex::new(vec![0u64; ring_capacity])),
721 ring_cursor: Arc::new(AtomicUsize::new(0)),
722 ring_capacity,
723 }
724 }
725
726 pub fn record_hot_hit(&self) {
727 self.hot_hits.fetch_add(1, AtomicOrd::Relaxed);
728 }
729
730 pub fn record_hot_miss(&self) {
731 self.hot_misses.fetch_add(1, AtomicOrd::Relaxed);
732 }
733
734 pub fn record_cold_fallback(&self) {
735 self.cold_fallbacks.fetch_add(1, AtomicOrd::Relaxed);
736 }
737
738 pub fn record_migration_down(&self) {
739 self.migrations_down.fetch_add(1, AtomicOrd::Relaxed);
740 }
741
742 pub fn record_migration_up(&self) {
743 self.migrations_up.fetch_add(1, AtomicOrd::Relaxed);
744 }
745
746 pub fn record_migration_failed(&self) {
747 self.migrations_failed.fetch_add(1, AtomicOrd::Relaxed);
748 }
749
750 pub fn record_migration_rollback(&self) {
751 self.migrations_rollback.fetch_add(1, AtomicOrd::Relaxed);
752 }
753
754 pub fn record_restore_enqueued(&self) {
755 self.restore_enqueued.fetch_add(1, AtomicOrd::Relaxed);
756 }
757
758 pub fn record_feedback(&self) {
759 self.feedback_applied.fetch_add(1, AtomicOrd::Relaxed);
760 }
761
762 pub fn record_search_latency(&self, ns: u64) {
763 self.total_searches.fetch_add(1, AtomicOrd::Relaxed);
764 self.total_latency_ns.fetch_add(ns, AtomicOrd::Relaxed);
765 let idx = self.ring_cursor.fetch_add(1, AtomicOrd::Relaxed) % self.ring_capacity;
767 self.latency_ring.lock()[idx] = ns;
768 }
769
770 pub fn hit_rate(&self) -> f64 {
771 let hits = self.hot_hits.load(AtomicOrd::Relaxed);
772 let total = hits + self.hot_misses.load(AtomicOrd::Relaxed);
773 if total == 0 {
774 0.0
775 } else {
776 hits as f64 / total as f64
777 }
778 }
779
780 pub fn avg_latency_ms(&self) -> f64 {
781 let n = self.total_searches.load(AtomicOrd::Relaxed);
782 if n == 0 {
783 return 0.0;
784 }
785 self.total_latency_ns.load(AtomicOrd::Relaxed) as f64 / n as f64 / 1_000_000.0
786 }
787
788 pub fn p99_latency_ms(&self) -> f64 {
789 let ring = self.latency_ring.lock();
790 let mut samples: Vec<u64> = ring.iter().copied().filter(|&v| v > 0).collect();
791 if samples.is_empty() {
792 return 0.0;
793 }
794 samples.sort_unstable();
795 let idx = ((samples.len() as f64 * 0.99) as usize).min(samples.len() - 1);
796 samples[idx] as f64 / 1_000_000.0
797 }
798
799 pub fn snapshot(&self) -> HashMap<String, u64> {
800 let mut m = HashMap::new();
801 m.insert("hot_hits".into(), self.hot_hits.load(AtomicOrd::Relaxed));
802 m.insert("hot_misses".into(), self.hot_misses.load(AtomicOrd::Relaxed));
803 m.insert("cold_fallbacks".into(), self.cold_fallbacks.load(AtomicOrd::Relaxed));
804 m.insert("migrations_down".into(), self.migrations_down.load(AtomicOrd::Relaxed));
805 m.insert("migrations_up".into(), self.migrations_up.load(AtomicOrd::Relaxed));
806 m.insert("migrations_failed".into(), self.migrations_failed.load(AtomicOrd::Relaxed));
807 m.insert("migrations_rollback".into(), self.migrations_rollback.load(AtomicOrd::Relaxed));
808 m.insert("restore_enqueued".into(), self.restore_enqueued.load(AtomicOrd::Relaxed));
809 m.insert("total_searches".into(), self.total_searches.load(AtomicOrd::Relaxed));
810 m.insert("feedback_applied".into(), self.feedback_applied.load(AtomicOrd::Relaxed));
811 m
812 }
813
814 pub fn to_prometheus(&self) -> String {
816 let snap = self.snapshot();
817 let mut lines: Vec<String> = snap.iter()
818 .map(|(k, v)| format!("aether_{} {}", k, v))
819 .collect();
820 lines.push(format!("aether_hit_rate {:.6}", self.hit_rate()));
821 lines.push(format!("aether_avg_latency_ms {:.3}", self.avg_latency_ms()));
822 lines.push(format!("aether_p99_latency_ms {:.3}", self.p99_latency_ms()));
823 lines.join("\n")
824 }
825}
826
827pub struct AkashicRecords {
832 db: sled::Db,
833}
834
835impl AkashicRecords {
836 pub fn open(path: &str) -> Result<Self> {
837 let db = sled::open(path).map_err(AetherError::from)?;
838 Ok(Self { db })
839 }
840
841 pub fn put_meta(&self, meta: &MemoryMeta) -> Result<()> {
843 let key = format!("meta:{}", meta.id);
844 let val = serde_json::to_vec(meta)
845 .map_err(|e| AetherError::Serialization(e.to_string()))?;
846 self.db.insert(key.as_bytes(), val).map_err(AetherError::from)?;
847 self.db.flush().map_err(AetherError::from)?;
848 Ok(())
849 }
850
851 pub fn get_meta(&self, id: &str) -> Result<Option<MemoryMeta>> {
852 let key = format!("meta:{}", id);
853 match self.db.get(key.as_bytes()).map_err(AetherError::from)? {
854 Some(v) => Ok(Some(serde_json::from_slice(&v)
855 .map_err(|e| AetherError::Serialization(e.to_string()))?)),
856 None => Ok(None),
857 }
858 }
859
860 pub fn remove_meta(&self, id: &str) -> Result<()> {
861 let key = format!("meta:{}", id);
862 self.db.remove(key.as_bytes()).map_err(AetherError::from)?;
863 Ok(())
864 }
865
866 pub fn commit_meta_and_clear_migration(&self, meta: &MemoryMeta) -> Result<()> {
868 let meta_key = format!("meta:{}", meta.id);
869 let mig_key = format!("mig:{}", meta.id);
870 let meta_val = serde_json::to_vec(meta)
871 .map_err(|e| AetherError::Serialization(e.to_string()))?;
872 let mut batch = sled::Batch::default();
873 batch.insert(meta_key.as_bytes(), meta_val);
874 batch.remove(mig_key.as_bytes());
875 self.db.apply_batch(batch).map_err(AetherError::from)?;
876 self.db.flush().map_err(AetherError::from)?;
877 Ok(())
878 }
879
880 pub fn update_access(&self, id: &str, now_ms: i64) -> Result<()> {
881 if let Some(mut meta) = self.get_meta(id)? {
882 meta.last_access_ms = now_ms;
883 meta.freq = meta.freq.saturating_add(1);
884 self.put_meta(&meta)?;
885 }
886 Ok(())
887 }
888
889 pub fn scan_all_metas(&self) -> Result<Vec<MemoryMeta>> {
890 let mut out = Vec::new();
891 for item in self.db.scan_prefix(b"meta:") {
892 let (_, v) = item.map_err(AetherError::from)?;
893 if let Ok(m) = serde_json::from_slice::<MemoryMeta>(&v) {
894 out.push(m);
895 }
896 }
897 Ok(out)
898 }
899
900 pub fn count_hot(&self) -> usize {
901 self.db.scan_prefix(b"meta:")
902 .filter_map(|r| r.ok())
903 .filter(|(_, v)| {
904 serde_json::from_slice::<MemoryMeta>(v)
905 .map(|m| m.location == StorageLocation::Hot)
906 .unwrap_or(false)
907 })
908 .count()
909 }
910
911 pub fn put_raw_vector(&self, id: &str, vec: &[f32]) -> Result<()> {
913 let key = format!("vec:{}", id);
914 let encoded = VectorCodec::encode(vec);
915 let compressed = VectorCodec::compress(&encoded, 3)?;
916 self.db.insert(key.as_bytes(), compressed).map_err(AetherError::from)?;
917 Ok(())
918 }
919
920 pub fn get_raw_vector(&self, id: &str) -> Result<Option<Vec<f32>>> {
921 let key = format!("vec:{}", id);
922 match self.db.get(key.as_bytes()).map_err(AetherError::from)? {
923 Some(v) => {
924 let decompressed = VectorCodec::decompress(&v)?;
925 Ok(Some(VectorCodec::decode(&decompressed)?))
926 }
927 None => Ok(None),
928 }
929 }
930
931 pub fn remove_raw_vector(&self, id: &str) -> Result<()> {
932 let key = format!("vec:{}", id);
933 self.db.remove(key.as_bytes()).map_err(AetherError::from)?;
934 Ok(())
935 }
936
937 pub fn put_migration_state(&self, id: &str, state: &MigrationState) -> Result<()> {
939 let key = format!("mig:{}", id);
940 let val = serde_json::to_vec(state)
941 .map_err(|e| AetherError::Serialization(e.to_string()))?;
942 self.db.insert(key.as_bytes(), val).map_err(AetherError::from)?;
943 self.db.flush().map_err(AetherError::from)?;
944 Ok(())
945 }
946
947 pub fn get_migration_state(&self, id: &str) -> Result<Option<MigrationState>> {
948 let key = format!("mig:{}", id);
949 match self.db.get(key.as_bytes()).map_err(AetherError::from)? {
950 Some(v) => Ok(Some(serde_json::from_slice(&v)
951 .map_err(|e| AetherError::Serialization(e.to_string()))?)),
952 None => Ok(None),
953 }
954 }
955
956 pub fn remove_migration_state(&self, id: &str) -> Result<()> {
957 let key = format!("mig:{}", id);
958 self.db.remove(key.as_bytes()).map_err(AetherError::from)?;
959 Ok(())
960 }
961
962 pub fn scan_pending_migrations(&self) -> Vec<String> {
963 self.db.scan_prefix(b"mig:")
964 .filter_map(|r| r.ok())
965 .filter_map(|(k, _)| {
966 String::from_utf8(k.to_vec()).ok()
967 .and_then(|s| s.strip_prefix("mig:").map(|id| id.to_string()))
968 })
969 .collect()
970 }
971
972 pub fn put_importance(&self, id: &str, val: f32) -> Result<()> {
974 let key = format!("imp:{}", id);
975 let encoded = val.to_le_bytes();
976 self.db.insert(key.as_bytes(), &encoded).map_err(AetherError::from)?;
977 Ok(())
978 }
979
980 pub fn get_importance(&self, id: &str) -> Result<f32> {
981 let key = format!("imp:{}", id);
982 match self.db.get(key.as_bytes()).map_err(AetherError::from)? {
983 Some(v) if v.len() >= 4 => {
984 let bytes: [u8; 4] = v[..4].try_into().unwrap_or([0u8; 4]);
985 Ok(f32::from_le_bytes(bytes))
986 }
987 _ => Ok(0.5),
988 }
989 }
990
991 pub fn remove_importance(&self, id: &str) -> Result<()> {
992 let key = format!("imp:{}", id);
993 self.db.remove(key.as_bytes()).map_err(AetherError::from)?;
994 Ok(())
995 }
996
997 pub fn scan_all_importance(&self) -> Vec<(String, f32)> {
998 self.db.scan_prefix(b"imp:")
999 .filter_map(|r| r.ok())
1000 .filter_map(|(k, v)| {
1001 let id = String::from_utf8(k.to_vec()).ok()
1002 .and_then(|s| s.strip_prefix("imp:").map(|s| s.to_string()))?;
1003 if v.len() < 4 {
1004 return None;
1005 }
1006 let bytes: [u8; 4] = v[..4].try_into().ok()?;
1007 Some((id, f32::from_le_bytes(bytes)))
1008 })
1009 .collect()
1010 }
1011
1012 pub fn put_partition(&self, p: &Partition) -> Result<()> {
1014 let key = format!("part:{}", p.id);
1015 let val = serde_json::to_vec(p)
1016 .map_err(|e| AetherError::Serialization(e.to_string()))?;
1017 self.db.insert(key.as_bytes(), val).map_err(AetherError::from)?;
1018 Ok(())
1019 }
1020
1021 pub fn load_all_partitions(&self) -> Result<Vec<Partition>> {
1022 let mut parts = Vec::new();
1023 for item in self.db.scan_prefix(b"part:") {
1024 let (_, v) = item.map_err(AetherError::from)?;
1025 if let Ok(p) = serde_json::from_slice::<Partition>(&v) {
1026 parts.push(p);
1027 }
1028 }
1029 Ok(parts)
1030 }
1031
1032 pub fn clear_all_partitions(&self) -> Result<()> {
1033 let keys: Vec<Vec<u8>> = self.db.scan_prefix(b"part:")
1034 .filter_map(|r| r.ok().map(|(k, _)| k.to_vec()))
1035 .collect();
1036 for k in keys {
1037 self.db.remove(&k).map_err(AetherError::from)?;
1038 }
1039 Ok(())
1040 }
1041}
1042
/// Cold tier: compressed vector files under a local directory, optionally
/// mirrored to an S3 backend.
pub struct ColdStore {
    /// Local directory, S3 bucket/prefix and compression level used by the methods.
    cfg: ColdConfig,
    /// Remote backend; `None` until `with_s3` installs one.
    s3: Option<Arc<dyn S3Backend + Send + Sync>>,
}
1053
/// Object-store operations `ColdStore` needs from an S3 client.
///
/// No implementation is visible in this part of the file.
#[async_trait::async_trait]
pub trait S3Backend: Send + Sync {
    /// Uploads `data` to `bucket` under `key`.
    async fn put(&self, bucket: &str, key: &str, data: Bytes) -> Result<()>;
    /// Downloads the object stored at `key` in `bucket`.
    async fn get(&self, bucket: &str, key: &str) -> Result<Bytes>;
    /// Deletes the object stored at `key` in `bucket`.
    async fn delete(&self, bucket: &str, key: &str) -> Result<()>;
    /// Reports whether the object exists; the `bool` return leaves no room for an error.
    async fn exists(&self, bucket: &str, key: &str) -> bool;
}
1062
1063impl ColdStore {
1064 pub async fn new(cfg: ColdConfig) -> Result<Self> {
1065 std::fs::create_dir_all(&cfg.local_dir)?;
1066 Ok(Self { cfg, s3: None })
1068 }
1069
1070 pub fn with_s3(mut self, backend: Arc<dyn S3Backend + Send + Sync>) -> Self {
1072 self.s3 = Some(backend);
1073 self
1074 }
1075
1076 fn local_path(&self, id: &str) -> PathBuf {
1077 let h = fnv1a(id);
1079 let d1 = (h >> 56) & 0xFF;
1080 let d2 = (h >> 48) & 0xFF;
1081 PathBuf::from(&self.cfg.local_dir)
1082 .join(format!("{:02x}", d1))
1083 .join(format!("{:02x}", d2))
1084 .join(format!("{}.bin.zst", id))
1085 }
1086
1087 fn s3_key(&self, id: &str) -> String {
1088 match &self.cfg.s3_prefix {
1089 Some(p) => format!("{}/{}.bin.zst", p.trim_end_matches('/'), id),
1090 None => format!("{}.bin.zst", id),
1091 }
1092 }
1093
1094 pub async fn put(&self, id: &str, vec: &[f32]) -> Result<StorageLocation> {
1096 let encoded = VectorCodec::encode(vec);
1097 let compressed = VectorCodec::compress(&encoded, self.cfg.compress_level)?;
1098 let local = self.local_path(id);
1099
1100 if let Some(parent) = local.parent() {
1101 fs::create_dir_all(parent).await?;
1102 }
1103 fs::write(&local, &compressed).await?;
1104
1105 if let (Some(s3), Some(bucket)) = (&self.s3, &self.cfg.s3_bucket) {
1106 let key = self.s3_key(id);
1107 s3.put(bucket, &key, Bytes::from(compressed)).await?;
1108 return Ok(StorageLocation::S3(key));
1109 }
1110
1111 Ok(StorageLocation::Local(local.to_string_lossy().to_string()))
1112 }
1113
1114 pub async fn get(&self, id: &str) -> Result<Vec<f32>> {
1116 let local = self.local_path(id);
1117 if local.exists() {
1118 let compressed = fs::read(&local).await?;
1119 let encoded = VectorCodec::decompress(&compressed)?;
1120 return VectorCodec::decode(&encoded);
1121 }
1122
1123 if let (Some(s3), Some(bucket)) = (&self.s3, &self.cfg.s3_bucket) {
1124 let key = self.s3_key(id);
1125 let data = s3.get(bucket, &key).await?;
1126 if let Some(parent) = local.parent() {
1128 fs::create_dir_all(parent).await?;
1129 }
1130 fs::write(&local, &data).await?;
1131 let encoded = VectorCodec::decompress(&data)?;
1132 return VectorCodec::decode(&encoded);
1133 }
1134
1135 Err(AetherError::NotFound(id.to_string()).into())
1136 }
1137
1138 pub async fn delete(&self, id: &str) -> Result<()> {
1139 let local = self.local_path(id);
1140 if local.exists() {
1141 fs::remove_file(&local).await?;
1142 }
1143 if let (Some(s3), Some(bucket)) = (&self.s3, &self.cfg.s3_bucket) {
1144 let _ = s3.delete(bucket, &self.s3_key(id)).await;
1145 }
1146 Ok(())
1147 }
1148
1149 pub async fn exists(&self, id: &str) -> bool {
1150 self.local_path(id).exists()
1151 }
1152}
1153
1154struct HotShard {
1160 dim: usize,
1161 max: usize,
1162 vectors: RwLock<HashMap<String, Vec<f32>>>,
1163 lru: Mutex<LruCache<String, ()>>,
1164}
1165
1166impl HotShard {
1167 fn new(dim: usize, max: usize) -> Self {
1168 Self {
1169 dim,
1170 max,
1171 vectors: RwLock::new(HashMap::new()),
1172 lru: Mutex::new(LruCache::new(NonZeroUsize::new(max.max(1)).unwrap())),
1173 }
1174 }
1175
1176 fn add(&self, id: String, vec: Vec<f32>) -> Result<()> {
1177 if vec.len() != self.dim {
1178 return Err(AetherError::DimensionMismatch {
1179 expected: self.dim,
1180 got: vec.len(),
1181 }.into());
1182 }
1183 self.vectors.write().insert(id.clone(), vec);
1184 self.lru.lock().put(id, ());
1185 Ok(())
1186 }
1187
1188 fn remove(&self, id: &str) -> Option<Vec<f32>> {
1189 self.lru.lock().pop(&id.to_string()); self.vectors.write().remove(id)
1191 }
1192
1193 fn get(&self, id: &str) -> Option<Vec<f32>> {
1194 let v = self.vectors.read().get(id).cloned();
1195 if v.is_some() {
1196 self.lru.lock().get(&id.to_string()); }
1198 v
1199 }
1200
1201 fn touch(&self, id: &str) {
1202 self.lru.lock().get(&id.to_string()); }
1204
1205 fn contains(&self, id: &str) -> bool {
1206 self.vectors.read().contains_key(id)
1207 }
1208
1209 fn len(&self) -> usize {
1210 self.vectors.read().len()
1211 }
1212
1213 #[inline(never)]
1215 fn search(&self, query: &[f32], k: usize) -> Result<Vec<(String, f32)>> {
1216 if query.len() != self.dim {
1217 return Err(AetherError::DimensionMismatch {
1218 expected: self.dim,
1219 got: query.len(),
1220 }.into());
1221 }
1222 let vecs = self.vectors.read();
1223 let mut results: Vec<(String, f32)> = vecs.iter()
1225 .map(|(id, v)| (id.clone(), l2_distance(query, v)))
1226 .collect();
1227 results.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(Ordering::Equal));
1228 results.truncate(k);
1229 Ok(results)
1230 }
1231
1232 fn evict_overflows(&self) -> Vec<String> {
1234 let mut evicted = Vec::new();
1235 let mut lru = self.lru.lock();
1236 while lru.len() > self.max {
1237 if let Some((id, _)) = lru.pop_lru() {
1238 self.vectors.write().remove(&id);
1239 evicted.push(id);
1240 } else {
1241 break;
1242 }
1243 }
1244 evicted
1245 }
1246}
1247
/// Sharded in-memory vector index.
pub struct HotIndex {
    /// Shards; an id is routed to `fnv1a(id) % shards.len()`.
    shards: Vec<Arc<HotShard>>,
    /// Vector length copied from `HotConfig::dim`.
    dim: usize,
    /// Sender feeding the background writer task spawned in `new`.
    write_tx: mpsc::Sender<WriteBatch>,
}
1255
/// A batched write request sent to the background writer task.
struct WriteBatch {
    /// `(id, vector)` pairs to add.
    items: Vec<(String, Vec<f32>)>,
    /// One-shot channel on which the writer reports the batch result.
    resp: oneshot::Sender<Result<()>>,
    /// Caller-supplied trace id; the visible writer loop does not read it.
    trace_id: String,
}
1261
/// A search request carrying a reply channel.
///
/// NOTE(review): nothing in the visible part of the file constructs or consumes this type.
struct SearchBatch {
    query: Vec<f32>,
    k: usize,
    /// One-shot channel for the `(id, distance)` results.
    resp: oneshot::Sender<Result<Vec<(String, f32)>>>,
    priority: u8,
    trace_id: String,
}
1269
1270impl HotIndex {
1271 pub fn new(cfg: &HotConfig) -> Result<Self> {
1272 let shard_max = (cfg.max_items / cfg.shard_count).max(1);
1273 let shards: Vec<Arc<HotShard>> = (0..cfg.shard_count)
1274 .map(|_| Arc::new(HotShard::new(cfg.dim, shard_max)))
1275 .collect();
1276
1277 let (write_tx, mut write_rx) = mpsc::channel::<WriteBatch>(8192);
1278 let shards_clone = shards.clone();
1279 let dim = cfg.dim;
1280
1281 tokio::spawn(async move {
1283 while let Some(batch) = write_rx.recv().await {
1284 let result: Result<()> = (|| {
1285 for (id, vec) in batch.items {
1286 let idx = (fnv1a(&id) as usize) % shards_clone.len();
1287 shards_clone[idx].add(id, vec)?;
1288 }
1289 Ok(())
1290 })();
1291 let _ = batch.resp.send(result);
1292 }
1293 });
1294
1295 Ok(Self { shards, dim, write_tx })
1296 }
1297
1298 fn shard_for(&self, id: &str) -> &Arc<HotShard> {
1299 let idx = (fnv1a(id) as usize) % self.shards.len();
1300 &self.shards[idx]
1301 }
1302
1303 pub fn add_sync(&self, id: String, vec: Vec<f32>) -> Result<()> {
1304 self.shard_for(&id).add(id, vec)
1305 }
1306
1307 pub async fn write_batch(&self, items: Vec<(String, Vec<f32>)>, trace_id: String) -> Result<()> {
1309 let (tx, rx) = oneshot::channel();
1310 self.write_tx.send(WriteBatch { items, resp: tx, trace_id })
1311 .await
1312 .map_err(|e| anyhow!("write_tx send error: {}", e))?;
1313 rx.await.map_err(|e| anyhow!("write_tx recv error: {}", e))?
1314 }
1315
1316 pub fn remove(&self, id: &str) -> Option<Vec<f32>> {
1317 self.shard_for(id).remove(id)
1318 }
1319
1320 pub fn get(&self, id: &str) -> Option<Vec<f32>> {
1321 self.shard_for(id).get(id)
1322 }
1323
1324 pub fn touch(&self, id: &str) {
1325 self.shard_for(id).touch(id);
1326 }
1327
1328 pub fn contains(&self, id: &str) -> bool {
1329 self.shard_for(id).contains(id)
1330 }
1331
1332 pub fn len(&self) -> usize {
1333 self.shards.iter().map(|s| s.len()).sum()
1334 }
1335
1336 #[inline(never)]
1338 pub fn search(&self, query: &[f32], k: usize) -> Result<Vec<(String, f32)>> {
1339 if query.len() != self.dim {
1340 return Err(AetherError::DimensionMismatch {
1341 expected: self.dim,
1342 got: query.len(),
1343 }.into());
1344 }
1345
1346 let per_shard: Vec<Vec<(String, f32)>> = self.shards
1348 .iter()
1349 .map(|shard| shard.search(query, k).unwrap_or_default())
1350 .collect();
1351
1352 let mut merged: Vec<(String, f32)> = per_shard.into_iter().flatten().collect();
1353 merged.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(Ordering::Equal));
1354 merged.truncate(k);
1355 Ok(merged)
1356 }
1357
1358 pub fn collect_evictions(&self) -> Vec<String> {
1360 self.shards.iter().flat_map(|s| s.evict_overflows()).collect()
1361 }
1362}
1363
/// Coarse partition index: a list of centroids, each owning a list of keys.
pub struct NebulaIndex {
    /// Current partitions; replaced wholesale by `build`.
    partitions: RwLock<Vec<Partition>>,
    /// Dimension supplied to `new`; not read by the methods visible in this impl.
    dim: usize,
    /// How many nearest partitions `coarse_candidates` expands.
    top_k: usize,
}
1373
1374impl NebulaIndex {
1375 pub fn new(dim: usize, top_k: usize, existing: Vec<Partition>) -> Self {
1376 Self {
1377 partitions: RwLock::new(existing),
1378 dim,
1379 top_k,
1380 }
1381 }
1382
1383 #[inline(never)]
1385 pub fn build(&self, vectors: &[(String, Vec<f32>)], k: usize) {
1386 if vectors.is_empty() {
1387 return;
1388 }
1389
1390 let k = k.min(vectors.len());
1391 let dim = vectors[0].1.len();
1392
1393 let mut centroids: Vec<Vec<f32>> = Vec::with_capacity(k);
1395 let first_idx = fastrand::usize(..vectors.len().max(1));
1397 centroids.push(vectors[first_idx].1.clone());
1398
1399 while centroids.len() < k {
1400 let dists: Vec<f32> = vectors.iter()
1402 .map(|(_, v)| centroids.iter()
1403 .map(|c| l2_sq(v, c))
1404 .fold(f32::MAX, f32::min))
1405 .collect();
1406
1407 let total: f32 = dists.iter().sum();
1408 if total < 1e-9 {
1409 break;
1410 }
1411
1412 let mut r = fastrand::f32() * total;
1413 for (i, &d) in dists.iter().enumerate() {
1414 r -= d;
1415 if r <= 0.0 {
1416 centroids.push(vectors[i].1.clone());
1417 break;
1418 }
1419 }
1420 }
1421
1422 let max_iter = 30;
1424 let mut assignments = vec![0usize; vectors.len()];
1425
1426 for _iter in 0..max_iter {
1427 let mut changed = false;
1428
1429 assignments.iter_mut().enumerate().for_each(|(i, a)| {
1431 let best = centroids.iter()
1432 .enumerate()
1433 .map(|(ci, c)| (ci, l2_sq(&vectors[i].1, c)))
1434 .min_by(|x, y| x.1.partial_cmp(&y.1).unwrap_or(Ordering::Equal))
1435 .map(|(ci, _)| ci)
1436 .unwrap_or(0);
1437 if *a != best {
1438 *a = best;
1439 }
1440 });
1441
1442 let mut sums = vec![vec![0.0f64; dim]; k];
1444 let mut cnts = vec![0usize; k];
1445
1446 for (i, &a) in assignments.iter().enumerate() {
1447 let p = a.min(k - 1);
1448 cnts[p] += 1;
1449 for (j, &v) in vectors[i].1.iter().enumerate() {
1450 sums[p][j] += v as f64;
1451 }
1452 }
1453
1454 for (ci, c) in centroids.iter_mut().enumerate() {
1455 if cnts[ci] > 0 {
1456 let new_c: Vec<f32> = sums[ci].iter().map(|&s| (s / cnts[ci] as f64) as f32).collect();
1457 if l2_sq(c, &new_c) > 1e-8 {
1458 changed = true;
1459 }
1460 *c = new_c;
1461 }
1462 }
1463
1464 if !changed {
1465 break;
1466 }
1467 }
1468
1469 let mut parts: Vec<Partition> = centroids.into_iter().enumerate()
1471 .map(|(i, c)| Partition {
1472 id: i,
1473 centroid: c,
1474 keys: Vec::new(),
1475 })
1476 .collect();
1477
1478 for (i, &a) in assignments.iter().enumerate() {
1479 let p = a.min(parts.len() - 1);
1480 parts[p].keys.push(vectors[i].0.clone());
1481 }
1482
1483 *self.partitions.write() = parts;
1484 }
1485
1486 pub fn coarse_candidates(&self, query: &[f32]) -> Vec<String> {
1488 let parts = self.partitions.read();
1489 if parts.is_empty() {
1490 return Vec::new();
1491 }
1492
1493 let mut dists: Vec<(usize, f32)> = parts.iter()
1494 .enumerate()
1495 .map(|(i, p)| (i, l2_sq(&p.centroid, query)))
1496 .collect();
1497
1498 dists.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(Ordering::Equal));
1499
1500 dists.iter()
1501 .take(self.top_k)
1502 .flat_map(|(i, _)| parts[*i].keys.iter().cloned())
1503 .collect()
1504 }
1505
1506 pub fn add_to_partition(&self, id: &str, vec: &[f32]) {
1507 let mut parts = self.partitions.write();
1508 if parts.is_empty() {
1509 return;
1510 }
1511 let best = parts.iter()
1512 .enumerate()
1513 .map(|(i, p)| (i, l2_sq(&p.centroid, vec)))
1514 .min_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(Ordering::Equal))
1515 .map(|(i, _)| i)
1516 .unwrap_or(0);
1517 parts[best].keys.push(id.to_string());
1518 }
1519
1520 pub fn remove_from_partitions(&self, id: &str) {
1521 let mut parts = self.partitions.write();
1522 for p in parts.iter_mut() {
1523 p.keys.retain(|k| k != id);
1524 }
1525 }
1526
1527 pub fn snapshot(&self) -> Vec<Partition> {
1528 self.partitions.read().clone()
1529 }
1530
1531 pub fn partition_count(&self) -> usize {
1532 self.partitions.read().len()
1533 }
1534}
1535
/// Moves entries between the hot index and the cold store (`descend` / `ascend`).
pub struct Metamorphoser {
    /// Metadata and migration-state store.
    akashic: Arc<AkashicRecords>,
    /// Cold storage backend.
    cold: Arc<ColdStore>,
    /// In-memory hot index.
    hot: Arc<HotIndex>,
    /// Partition index updated when entries descend or ascend.
    nebula: Arc<NebulaIndex>,
    /// Migration counters.
    metrics: Arc<Metrics>,
    /// Per-id async mutexes, created on demand by `get_lock`, serialising migrations of one id.
    locks: Arc<Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>>,
}
1549
1550impl Metamorphoser {
1551 pub fn new(
1552 akashic: Arc<AkashicRecords>,
1553 cold: Arc<ColdStore>,
1554 hot: Arc<HotIndex>,
1555 nebula: Arc<NebulaIndex>,
1556 metrics: Arc<Metrics>,
1557 ) -> Self {
1558 Self {
1559 akashic,
1560 cold,
1561 hot,
1562 nebula,
1563 metrics,
1564 locks: Arc::new(Mutex::new(HashMap::new())),
1565 }
1566 }
1567
1568 fn get_lock(&self, id: &str) -> Arc<tokio::sync::Mutex<()>> {
1569 self.locks.lock()
1570 .entry(id.to_string())
1571 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
1572 .clone()
1573 }
1574
1575 pub async fn descend(&self, id: &str, trace_id: &str) -> Result<()> {
1577 let lock = self.get_lock(id);
1578 let _guard = lock.lock().await;
1579
1580 if self.akashic.get_migration_state(id)?.is_some() {
1582 return Err(AetherError::MigrationInProgress(id.to_string()).into());
1583 }
1584
1585 let meta = self.akashic.get_meta(id)?
1586 .ok_or_else(|| AetherError::NotFound(id.to_string()))?;
1587
1588 if meta.location != StorageLocation::Hot {
1589 return Ok(());
1590 }
1591
1592 let vec = self.hot.remove(id)
1593 .ok_or_else(|| AetherError::NotFound(format!("hot:{}", id)))?;
1594
1595 self.akashic.put_migration_state(id, &MigrationState::Started {
1597 target: StorageLocation::Local(String::new()),
1598 })?;
1599
1600 let location = match self.cold.put(id, &vec).await {
1602 Ok(loc) => loc,
1603 Err(e) => {
1604 let _ = self.hot.add_sync(id.to_string(), vec);
1606 let _ = self.akashic.remove_migration_state(id);
1607 self.metrics.record_migration_failed();
1608 return Err(e);
1609 }
1610 };
1611
1612 let new_meta = MemoryMeta {
1614 location: location.clone(),
1615 cold_cost_mb: vec.len() as f32 * 4.0 / 1_048_576.0,
1616 version: meta.version + 1,
1617 ..meta
1618 };
1619 self.akashic.commit_meta_and_clear_migration(&new_meta)?;
1620
1621 self.nebula.add_to_partition(id, &vec);
1623
1624 self.metrics.record_migration_down();
1625 info!("[{}] descended {} → {:?}", trace_id, id, location);
1626
1627 Ok(())
1628 }
1629
1630 pub async fn ascend(&self, id: &str, trace_id: &str) -> Result<()> {
1632 let lock = self.get_lock(id);
1633 let _guard = lock.lock().await;
1634
1635 if self.akashic.get_migration_state(id)?.is_some() {
1636 return Err(AetherError::MigrationInProgress(id.to_string()).into());
1637 }
1638
1639 let meta = self.akashic.get_meta(id)?
1640 .ok_or_else(|| AetherError::NotFound(id.to_string()))?;
1641
1642 if meta.location == StorageLocation::Hot {
1643 return Ok(());
1644 }
1645
1646 self.akashic.put_migration_state(id, &MigrationState::Started {
1648 target: StorageLocation::Hot,
1649 })?;
1650
1651 let vec = match self.cold.get(id).await {
1653 Ok(v) => v,
1654 Err(e) => {
1655 let _ = self.akashic.remove_migration_state(id);
1656 self.metrics.record_migration_failed();
1657 return Err(e);
1658 }
1659 };
1660
1661 self.hot.write_batch(vec![(id.to_string(), vec.clone())], trace_id.to_string()).await?;
1663
1664 let new_meta = MemoryMeta {
1666 location: StorageLocation::Hot,
1667 cold_cost_mb: 0.0,
1668 freq: meta.freq + 1,
1669 version: meta.version + 1,
1670 ..meta
1671 };
1672 self.akashic.commit_meta_and_clear_migration(&new_meta)?;
1673
1674 self.nebula.remove_from_partitions(id);
1676
1677 self.metrics.record_migration_up();
1678 info!("[{}] ascended {} → hot", trace_id, id);
1679
1680 Ok(())
1681 }
1682
1683 pub async fn rollback_pending(&self) -> Result<usize> {
1684 let pending = self.akashic.scan_pending_migrations();
1685 let count = pending.len();
1686
1687 for id in &pending {
1688 warn!("rolling back dangling migration: {}", id);
1689 let _ = self.akashic.remove_migration_state(id);
1691 self.metrics.record_migration_rollback();
1692 }
1693
1694 if count > 0 {
1695 info!("rolled back {} pending migrations", count);
1696 }
1697
1698 Ok(count)
1699 }
1700}
1701
/// Maintains per-id importance values from feedback and periodic decay.
pub struct FeedbackEngine {
    /// Store holding importance values and metadata.
    akashic: Arc<AkashicRecords>,
    /// Multiplier applied to every importance value by `decay_all`.
    decay_rate: f32,
    /// Feedback counters.
    metrics: Arc<Metrics>,
}
1711
1712impl FeedbackEngine {
1713 pub fn new(akashic: Arc<AkashicRecords>, decay_rate: f32, metrics: Arc<Metrics>) -> Self {
1714 Self { akashic, decay_rate, metrics }
1715 }
1716
1717 pub async fn apply(&self, id: &str, reward: f32) -> Result<()> {
1719 let mut imp = self.akashic.get_importance(id)?;
1720 imp = (imp * 0.8 + reward * 0.2).clamp(0.0, 1.0);
1722 self.akashic.put_importance(id, imp)?;
1723
1724 if let Some(mut meta) = self.akashic.get_meta(id)? {
1725 meta.importance = imp;
1726 self.akashic.put_meta(&meta)?;
1727 }
1728
1729 self.metrics.record_feedback();
1730 Ok(())
1731 }
1732
1733 pub fn decay_all(&self) -> Result<()> {
1735 let entries = self.akashic.scan_all_importance();
1736 for (id, mut val) in entries {
1737 val *= self.decay_rate;
1738 if val < 0.005 {
1739 let _ = self.akashic.remove_importance(&id);
1740 } else {
1741 let _ = self.akashic.put_importance(&id, val);
1742 }
1743 }
1744 Ok(())
1745 }
1746}
1747
/// Linear scoring weights used by `score`.
pub struct EvictionPolicy {
    /// Weight of the recency term (negative age in hours).
    alpha: f32,
    /// Weight of the `ln(freq + 1)` term.
    beta: f32,
    /// Weight of `meta.importance`.
    gamma: f32,
    /// Penalty weight on `meta.cold_cost_mb`.
    delta: f32,
}
1758
1759impl EvictionPolicy {
1760 pub fn new(alpha: f32, beta: f32, gamma: f32, delta: f32) -> Self {
1761 Self { alpha, beta, gamma, delta }
1762 }
1763
1764 pub fn score(&self, meta: &MemoryMeta) -> f32 {
1766 let age_sec = ((now_ms() - meta.last_access_ms).max(1) as f32) / 1000.0;
1767 let recency = -age_sec / 3600.0; let freq = (meta.freq as f32 + 1.0).ln(); self.alpha * recency + self.beta * freq + self.gamma * meta.importance - self.delta * meta.cold_cost_mb
1771 }
1772
1773 pub fn select_evictions(&self, metas: &[MemoryMeta], keep_count: usize) -> Vec<String> {
1774 if metas.len() <= keep_count {
1775 return Vec::new();
1776 }
1777
1778 let mut scored: Vec<(f32, String)> = metas.iter()
1779 .map(|m| (self.score(m), m.id.clone()))
1780 .collect();
1781
1782 scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(Ordering::Equal));
1783
1784 scored[keep_count..].iter().map(|(_, id)| id.clone()).collect()
1785 }
1786}
1787
/// Logistic function with steepness `k`: `1 / (1 + e^(-k * x))`.
fn sigmoid(x: f32, k: f32) -> f32 {
    let decay = (-k * x).exp();
    (1.0 + decay).recip()
}
1792
/// Queue of ids waiting to be restored (ascended) by a background worker.
pub struct RestoreQueue {
    /// Sender of `(id, priority, trace_id)` requests to the worker task.
    tx: mpsc::Sender<(String, u8, String)>,
    /// Count of requests enqueued but not yet picked up by the worker.
    depth: Arc<AtomicUsize>,
}
1801
1802impl RestoreQueue {
1803 pub fn new(
1804 morpher: Arc<Metamorphoser>,
1805 concurrency: usize,
1806 backoff_ms: u64,
1807 max_retries: u32,
1808 metrics: Arc<Metrics>,
1809 ) -> Self {
1810 let (tx, mut rx) = mpsc::channel::<(String, u8, String)>(4096);
1811 let depth = Arc::new(AtomicUsize::new(0));
1812 let depth_clone = depth.clone();
1813 let sem = Arc::new(Semaphore::new(concurrency.max(1)));
1814
1815 tokio::spawn(async move {
1816 while let Some((id, _priority, trace_id)) = rx.recv().await {
1817 depth_clone.fetch_sub(1, AtomicOrd::SeqCst);
1818 let permit = sem.clone().acquire_owned().await.unwrap();
1819 let m = morpher.clone();
1820 let met = metrics.clone();
1821 let id2 = id.clone();
1822 let tid = trace_id.clone();
1823
1824 tokio::spawn(async move {
1825 let mut backoff = backoff_ms;
1826 for attempt in 0..max_retries {
1827 match m.ascend(&id2, &tid).await {
1828 Ok(_) => break,
1829 Err(e) => {
1830 warn!("[{}] restore {} failed ({}/{}): {}", tid, id2, attempt+1, max_retries, e);
1831 met.record_migration_failed();
1832 if attempt + 1 < max_retries {
1833 sleep(Duration::from_millis(backoff)).await;
1834 backoff = (backoff * 2).min(10_000);
1835 } else {
1836 error!("[{}] restore {} permanently failed", tid, id2);
1837 }
1838 }
1839 }
1840 }
1841 drop(permit);
1842 });
1843 }
1844 });
1845
1846 Self { tx, depth }
1847 }
1848
1849 pub async fn enqueue(&self, id: String, priority: u8, trace_id: String) -> Result<()> {
1850 self.depth.fetch_add(1, AtomicOrd::SeqCst);
1851 self.tx.send((id, priority, trace_id))
1852 .await
1853 .map_err(|e| anyhow!("restore_queue send: {}", e))
1854 }
1855
1856 pub fn queue_depth(&self) -> usize {
1857 self.depth.load(AtomicOrd::Relaxed)
1858 }
1859}
1860
/// Front door for hot searches: batches requests and bounds their concurrency.
pub struct SloManager {
    /// Sender of search requests to the dispatcher task spawned in `new`.
    search_tx: mpsc::Sender<SearchBatch>,
    /// Semaphore limiting concurrently running searches.
    sem: Arc<Semaphore>,
    /// Hot-index configuration this manager was built with.
    cfg: HotConfig,
    /// Search latency and hit counters.
    metrics: Arc<Metrics>,
}
1871
1872impl SloManager {
1873 pub fn new(hot: Arc<HotIndex>, cfg: HotConfig, metrics: Arc<Metrics>) -> Self {
1874 let sem = Arc::new(Semaphore::new(cfg.max_concurrent_searches));
1875 let (tx, mut rx) = mpsc::channel::<SearchBatch>(8192);
1876
1877 let hot_clone = hot.clone();
1878 let sem_clone = sem.clone();
1879 let met_clone = metrics.clone();
1880 let win_ms = cfg.batch_window_ms;
1881 let max_batch = cfg.max_batch_size;
1882 let search_tout = cfg.search_timeout;
1883
1884 tokio::spawn(async move {
1885 loop {
1886 let first = match rx.recv().await {
1887 Some(r) => r,
1888 None => break,
1889 };
1890
1891 let mut batch = vec![first];
1892 let deadline = tokio::time::Instant::now() + Duration::from_millis(win_ms);
1894
1895 while batch.len() < max_batch {
1896 match tokio::time::timeout_at(deadline, rx.recv()).await {
1897 Ok(Some(r)) => batch.push(r),
1898 _ => break,
1899 }
1900 }
1901
1902 for req in batch {
1903 let permit = sem_clone.clone().acquire_owned().await.unwrap();
1904 let h = hot_clone.clone();
1905 let m = met_clone.clone();
1906 let timeout_dur = search_tout;
1907
1908 tokio::spawn(async move {
1909 let t0 = Instant::now();
1910 let result = tokio::time::timeout(
1911 timeout_dur,
1912 tokio::task::spawn_blocking(move || h.search(&req.query, req.k))
1913 ).await;
1914
1915 let ns = t0.elapsed().as_nanos() as u64;
1916 m.record_search_latency(ns);
1917
1918 let resp = match result {
1919 Ok(Ok(Ok(r))) => {
1920 m.record_hot_hit();
1921 Ok(r)
1922 }
1923 Ok(Ok(Err(e))) => Err(anyhow!("{}", e)),
1924 Ok(Err(join_e)) => Err(anyhow!("join: {}", join_e)),
1925 Err(_) => {
1926 Err(AetherError::Timeout(timeout_dur).into())
1927 }
1928 };
1929
1930 let _ = req.resp.send(resp);
1931 drop(permit);
1932 });
1933 }
1934 }
1935 });
1936
1937 Self { search_tx: tx, sem, cfg, metrics }
1938 }
1939
1940 pub async fn search(&self, query: Vec<f32>, k: usize, priority: u8, trace_id: String) -> Result<Vec<(String, f32)>> {
1941 let (tx, rx) = oneshot::channel();
1942 self.search_tx.send(SearchBatch { query, k, resp: tx, priority, trace_id })
1943 .await
1944 .map_err(|e| anyhow!("slo_tx: {}", e))?;
1945 rx.await.map_err(|e| anyhow!("slo_rx: {}", e))?
1946 }
1947}
1948
/// Periodic background engine that samples entries and triggers migrations.
pub struct FluxEngine {
    /// Tuning parameters (tick interval, sampling, temperature bounds, ...).
    cfg: FluxConfig,
    /// Metadata store scanned on every tick.
    akashic: Arc<AkashicRecords>,
    /// Performs the actual descend migrations.
    morpher: Arc<Metamorphoser>,
    /// Importance decay, run once per tick.
    feedback: Arc<FeedbackEngine>,
    /// Queue used to request restoration of non-hot entries.
    restore_q: Arc<RestoreQueue>,
    /// Scoring policy built from the `cfg` weights.
    policy: EvictionPolicy,
    /// Shared metrics handle.
    metrics: Arc<Metrics>,
    /// Hot index, consulted for size and overflow evictions.
    hot: Arc<HotIndex>,
    /// Current temperature; scales the migration probability in `tick`.
    temp: Arc<tokio::sync::RwLock<f32>>,
    /// Limits how many sampled migrations may be in flight at once.
    mig_sem: Arc<Semaphore>,
}
1967
1968impl FluxEngine {
1969 pub fn new(
1970 cfg: FluxConfig,
1971 akashic: Arc<AkashicRecords>,
1972 morpher: Arc<Metamorphoser>,
1973 feedback: Arc<FeedbackEngine>,
1974 restore_q: Arc<RestoreQueue>,
1975 metrics: Arc<Metrics>,
1976 hot: Arc<HotIndex>,
1977 ) -> Self {
1978 let policy = EvictionPolicy::new(
1979 cfg.alpha_recency,
1980 cfg.beta_freq,
1981 cfg.gamma_importance,
1982 cfg.delta_cost,
1983 );
1984
1985 let init_temp = cfg.initial_temperature;
1986 let max_mig = cfg.max_concurrent_migrations.max(1);
1987
1988 Self {
1989 cfg,
1990 akashic,
1991 morpher,
1992 feedback,
1993 restore_q,
1994 policy,
1995 metrics,
1996 hot,
1997 temp: Arc::new(tokio::sync::RwLock::new(init_temp)),
1998 mig_sem: Arc::new(Semaphore::new(max_mig)),
1999 }
2000 }
2001
2002 pub fn start(self: Arc<Self>) {
2003 let interval = self.cfg.tick_interval;
2004
2005 tokio::spawn(async move {
2006 loop {
2007 let t0 = Instant::now();
2008 if let Err(e) = self.tick().await {
2009 error!("FluxEngine tick error: {}", e);
2010 }
2011 let elapsed = t0.elapsed();
2012 if elapsed < interval {
2013 sleep(interval - elapsed).await;
2014 }
2015 }
2016 });
2017 }
2018
2019 async fn tick(&self) -> Result<()> {
2020 self.update_temperature().await;
2022 let temp = *self.temp.read().await;
2023
2024 let all_metas = self.akashic.scan_all_metas()?;
2026 let sample_count = ((all_metas.len() as f32 * self.cfg.sample_rate) as usize)
2027 .clamp(1, self.cfg.max_candidates);
2028
2029 let mut indices: Vec<usize> = (0..all_metas.len()).collect();
2031 fastrand::shuffle(&mut indices);
2032
2033 let candidates: Vec<&MemoryMeta> = indices.iter()
2034 .take(sample_count)
2035 .map(|&i| &all_metas[i])
2036 .collect();
2037
2038 for meta in candidates {
2040 let score = self.policy.score(meta);
2041 let prob = sigmoid(score, self.cfg.sigmoid_k);
2042
2043 if fastrand::f32() < prob * temp.min(1.0) {
2044 let permit = match self.mig_sem.clone().try_acquire_owned() {
2045 Ok(p) => p,
2046 Err(_) => continue, };
2048
2049 let morpher = self.morpher.clone();
2050 let restore_q = self.restore_q.clone();
2051 let id = meta.id.clone();
2052 let loc = meta.location.clone();
2053 let trace_id = Uuid::new_v4().to_string();
2054
2055 tokio::spawn(async move {
2056 match loc {
2057 StorageLocation::Hot => {
2058 let _ = morpher.descend(&id, &trace_id).await;
2059 }
2060 _ => {
2061 let _ = restore_q.enqueue(id, 0, trace_id).await;
2062 }
2063 }
2064 drop(permit);
2065 });
2066 }
2067 }
2068
2069 self.feedback.decay_all()?;
2071
2072 let evicted = self.hot.collect_evictions();
2074 for id in evicted {
2075 let morpher = self.morpher.clone();
2076 let trace_id = Uuid::new_v4().to_string();
2077 tokio::spawn(async move {
2078 let _ = morpher.descend(&id, &trace_id).await;
2079 });
2080 }
2081
2082 Ok(())
2083 }
2084
2085 async fn update_temperature(&self) {
2086 let mut t = self.temp.write().await;
2087 *t *= self.cfg.decay_rate;
2088 *t = t.max(self.cfg.min_temperature);
2089
2090 if self.detect_pressure().await {
2092 *t = (*t * self.cfg.pressure_scale).min(self.cfg.max_temperature);
2093 }
2094 }
2095
2096 async fn detect_pressure(&self) -> bool {
2097 let q_depth = self.restore_q.queue_depth();
2098 if q_depth > 100 {
2099 return true;
2100 }
2101 let hot_count = self.hot.len();
2102 if hot_count > 0 && q_depth as f64 / hot_count as f64 > 0.05 {
2103 return true;
2104 }
2105 false
2106 }
2107
2108 pub async fn set_temperature(&self, v: f32) {
2109 *self.temp.write().await = v.clamp(self.cfg.min_temperature, self.cfg.max_temperature);
2110 }
2111
2112 pub async fn get_temperature(&self) -> f32 {
2113 *self.temp.read().await
2114 }
2115}
2116
/// Top-level facade wiring together the stores, indexes and background engines.
pub struct MemoryManager {
    /// Metadata, raw-vector and importance store.
    akashic: Arc<AkashicRecords>,
    /// In-memory hot index.
    hot: Arc<HotIndex>,
    /// Cold storage backend.
    cold: Arc<ColdStore>,
    /// Partition index used for the cold search fallback.
    nebula: Arc<NebulaIndex>,
    /// Hot/cold migrator.
    morpher: Arc<Metamorphoser>,
    /// Batched, concurrency-bounded hot search front end.
    slo: Arc<SloManager>,
    /// Background restore queue.
    restore_q: Arc<RestoreQueue>,
    /// Periodic migration engine.
    flux: Arc<FluxEngine>,
    /// Importance feedback engine.
    feedback: Arc<FeedbackEngine>,
    /// Shared metrics.
    metrics: Arc<Metrics>,
    /// Hot-tier configuration (supplies the expected vector dimension).
    hot_cfg: HotConfig,
    /// Cold-tier configuration (supplies the partition count for rebuilds).
    cold_cfg: ColdConfig,
}
2135
2136impl MemoryManager {
2137 pub async fn new(
2139 hot_cfg: HotConfig,
2140 cold_cfg: ColdConfig,
2141 flux_cfg: FluxConfig,
2142 db_path: &str,
2143 ) -> Result<Arc<Self>> {
2144 let metrics = Arc::new(Metrics::new(10_000));
2145 let akashic = Arc::new(AkashicRecords::open(db_path)?);
2146 let cold = Arc::new(ColdStore::new(cold_cfg.clone()).await?);
2147
2148 let existing_parts = akashic.load_all_partitions()?;
2150 let nebula = Arc::new(NebulaIndex::new(
2151 hot_cfg.dim,
2152 cold_cfg.top_partitions,
2153 existing_parts,
2154 ));
2155
2156 let hot = Arc::new(HotIndex::new(&hot_cfg)?);
2157
2158 let morpher = Arc::new(Metamorphoser::new(
2159 akashic.clone(),
2160 cold.clone(),
2161 hot.clone(),
2162 nebula.clone(),
2163 metrics.clone(),
2164 ));
2165
2166 let rolled = morpher.rollback_pending().await?;
2168 if rolled > 0 {
2169 warn!("rolled back {} dangling migrations on startup", rolled);
2170 }
2171
2172 let feedback = Arc::new(FeedbackEngine::new(
2173 akashic.clone(),
2174 flux_cfg.importance_decay,
2175 metrics.clone(),
2176 ));
2177
2178 let restore_q = Arc::new(RestoreQueue::new(
2179 morpher.clone(),
2180 flux_cfg.restore_concurrency,
2181 flux_cfg.restore_backoff_ms,
2182 flux_cfg.restore_max_retries,
2183 metrics.clone(),
2184 ));
2185
2186 let slo = Arc::new(SloManager::new(hot.clone(), hot_cfg.clone(), metrics.clone()));
2187
2188 let flux = Arc::new(FluxEngine::new(
2189 flux_cfg,
2190 akashic.clone(),
2191 morpher.clone(),
2192 feedback.clone(),
2193 restore_q.clone(),
2194 metrics.clone(),
2195 hot.clone(),
2196 ));
2197
2198 let hot_metas: Vec<MemoryMeta> = akashic.scan_all_metas()?
2200 .into_iter()
2201 .filter(|m| m.location == StorageLocation::Hot)
2202 .collect();
2203
2204 let mut preloaded = 0usize;
2205 for m in hot_metas {
2206 if let Ok(Some(vec)) = akashic.get_raw_vector(&m.id) {
2207 if hot.add_sync(m.id.clone(), vec).is_ok() {
2208 preloaded += 1;
2209 }
2210 }
2211 }
2212
2213 if preloaded > 0 {
2214 info!("preloaded {} hot vectors from metadata store", preloaded);
2215 }
2216
2217 let mgr = Arc::new(Self {
2218 akashic,
2219 hot,
2220 cold,
2221 nebula,
2222 morpher,
2223 slo,
2224 restore_q,
2225 flux: flux.clone(),
2226 feedback,
2227 metrics,
2228 hot_cfg: hot_cfg.clone(),
2229 cold_cfg: cold_cfg.clone(),
2230 });
2231
2232 flux.start();
2234
2235 Ok(mgr)
2236 }
2237
2238 pub async fn insert(&self, id: String, vec: Vec<f32>) -> Result<()> {
2240 if vec.len() != self.hot_cfg.dim {
2241 return Err(AetherError::DimensionMismatch {
2242 expected: self.hot_cfg.dim,
2243 got: vec.len(),
2244 }.into());
2245 }
2246
2247 let trace_id = Uuid::new_v4().to_string();
2248 let now = now_ms();
2249
2250 self.akashic.put_raw_vector(&id, &vec)?;
2252
2253 let meta = MemoryMeta {
2254 id: id.clone(),
2255 location: StorageLocation::Hot,
2256 last_access_ms: now,
2257 created_ms: now,
2258 freq: 0,
2259 importance: 0.5,
2260 cold_cost_mb: 0.0,
2261 version: 1,
2262 dimension: vec.len(),
2263 };
2264
2265 self.akashic.put_meta(&meta)?;
2266 self.akashic.put_importance(&id, 0.5)?;
2267 self.hot.write_batch(vec![(id.clone(), vec)], trace_id.clone()).await?;
2268
2269 debug!("[{}] inserted {}", trace_id, id);
2270 Ok(())
2271 }
2272
2273 pub async fn insert_batch(&self, items: Vec<(String, Vec<f32>)>) -> Result<()> {
2274 let dim = self.hot_cfg.dim;
2275 for (_id, vec) in &items {
2277 if vec.len() != dim {
2278 return Err(AetherError::DimensionMismatch {
2279 expected: dim,
2280 got: vec.len(),
2281 }.into());
2282 }
2283 }
2284
2285 let trace_id = Uuid::new_v4().to_string();
2286 let now = now_ms();
2287
2288 for (id, vec) in &items {
2289 self.akashic.put_raw_vector(id, vec)?;
2290 let meta = MemoryMeta {
2291 id: id.clone(),
2292 location: StorageLocation::Hot,
2293 last_access_ms: now,
2294 created_ms: now,
2295 freq: 0,
2296 importance: 0.5,
2297 cold_cost_mb: 0.0,
2298 version: 1,
2299 dimension: vec.len(),
2300 };
2301 self.akashic.put_meta(&meta)?;
2302 self.akashic.put_importance(id, 0.5)?;
2303 }
2304
2305 self.hot.write_batch(items.clone(), trace_id.clone()).await?;
2306 info!("[{}] batch inserted {} items", trace_id, items.len());
2307 Ok(())
2308 }
2309
2310 pub async fn search(&self, query: Vec<f32>, k: usize) -> Result<Vec<SearchResult>> {
2313 let trace_id = Uuid::new_v4().to_string();
2314 let t0 = Instant::now();
2315
2316 match self.slo.search(query.clone(), k, 0, trace_id.clone()).await {
2317 Ok(results) if !results.is_empty() => {
2318 let now = now_ms();
2320 for (id, _) in &results {
2321 let _ = self.akashic.update_access(id, now);
2322 self.hot.touch(id);
2323 }
2324 let latency = t0.elapsed();
2325 return Ok(results.into_iter().map(|(id, dist)| SearchResult {
2326 id,
2327 distance: dist,
2328 from_hot: true,
2329 latency,
2330 }).collect());
2331 }
2332 _ => {}
2333 }
2334
2335 self.metrics.record_hot_miss();
2336 self.metrics.record_cold_fallback();
2337
2338 let candidates = self.nebula.coarse_candidates(&query);
2340 if candidates.is_empty() {
2341 return Ok(Vec::new());
2342 }
2343
2344 let akashic = self.akashic.clone();
2346 let q2 = query.clone();
2347
2348 let mut cold_results: Vec<(String, f32)> = tokio::task::spawn_blocking(move || {
2349 candidates.iter()
2350 .filter_map(|id| {
2351 akashic.get_raw_vector(id).ok()?.map(|v| (id.clone(), l2_distance(&q2, &v)))
2352 })
2353 .collect::<Vec<_>>()
2354 }).await.unwrap_or_default();
2355
2356 cold_results.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(Ordering::Equal));
2357 cold_results.truncate(k);
2358
2359 for (id, _) in cold_results.iter().take(3) {
2361 let _ = self.restore_q.enqueue(id.clone(), 1, Uuid::new_v4().to_string()).await;
2362 self.metrics.record_restore_enqueued();
2363 }
2364
2365 let latency = t0.elapsed();
2366 Ok(cold_results.into_iter().map(|(id, dist)| SearchResult {
2367 id,
2368 distance: dist,
2369 from_hot: false,
2370 latency,
2371 }).collect())
2372 }
2373
2374 pub async fn get_by_id(&self, id: &str) -> Result<Vec<f32>> {
2376 if let Some(vec) = self.hot.get(id) {
2377 let _ = self.akashic.update_access(id, now_ms());
2378 return Ok(vec);
2379 }
2380
2381 if let Some(meta) = self.akashic.get_meta(id)? {
2382 let vec = match &meta.location {
2383 StorageLocation::Local(_) | StorageLocation::S3(_) => {
2384 self.cold.get(id).await?
2385 }
2386 StorageLocation::Hot => {
2387 return Err(AetherError::Inconsistency(
2388 format!("meta says hot but not in hot index: {}", id)
2389 ).into());
2390 }
2391 };
2392 let _ = self.akashic.update_access(id, now_ms());
2393 return Ok(vec);
2394 }
2395
2396 Err(AetherError::NotFound(id.to_string()).into())
2397 }
2398
2399 pub async fn delete(&self, id: &str) -> Result<()> {
2401 let trace_id = Uuid::new_v4().to_string();
2402
2403 if let Some(meta) = self.akashic.get_meta(id)? {
2404 match &meta.location {
2405 StorageLocation::Hot => {
2406 self.hot.remove(id);
2407 }
2408 StorageLocation::Local(_) | StorageLocation::S3(_) => {
2409 self.cold.delete(id).await?;
2410 self.nebula.remove_from_partitions(id);
2411 }
2412 }
2413 }
2414
2415 self.akashic.remove_meta(id)?;
2416 self.akashic.remove_raw_vector(id)?;
2417 self.akashic.remove_importance(id)?;
2418
2419 info!("[{}] deleted {}", trace_id, id);
2420 Ok(())
2421 }
2422
2423 pub async fn apply_feedback(&self, id: &str, reward: f32) -> Result<()> {
2425 self.feedback.apply(id, reward).await
2426 }
2427
2428 pub async fn health(&self) -> HealthStatus {
2430 let all = self.akashic.scan_all_metas().unwrap_or_default();
2431 let hot = all.iter().filter(|m| m.location == StorageLocation::Hot).count();
2432 let cold = all.len() - hot;
2433
2434 HealthStatus {
2435 healthy: true,
2436 hot_items: hot,
2437 cold_items: cold,
2438 total_items: all.len(),
2439 temperature: self.flux.get_temperature().await,
2440 restore_queue_depth: self.restore_q.queue_depth(),
2441 pending_migrations: self.akashic.scan_pending_migrations().len(),
2442 hot_hit_rate: self.metrics.hit_rate(),
2443 avg_search_ms: self.metrics.avg_latency_ms(),
2444 p99_search_ms: self.metrics.p99_latency_ms(),
2445 }
2446 }
2447
2448 pub async fn stats(&self) -> SystemStats {
2449 let metas = self.akashic.scan_all_metas().unwrap_or_default();
2450 let hot_count = metas.iter().filter(|m| m.location == StorageLocation::Hot).count();
2451 let avg_imp = if metas.is_empty() {
2452 0.0
2453 } else {
2454 metas.iter().map(|m| m.importance).sum::<f32>() / metas.len() as f32
2455 };
2456 let avg_freq = if metas.is_empty() {
2457 0
2458 } else {
2459 metas.iter().map(|m| m.freq).sum::<u64>() / metas.len() as u64
2460 };
2461
2462 SystemStats {
2463 total_items: metas.len(),
2464 hot_items: hot_count,
2465 cold_items: metas.len() - hot_count,
2466 avg_importance: avg_imp,
2467 avg_freq,
2468 metrics_snapshot: self.metrics.snapshot(),
2469 }
2470 }
2471
2472 pub async fn rebuild_nebula_index(&self) -> Result<()> {
2474 let cold_metas: Vec<MemoryMeta> = self.akashic.scan_all_metas()?
2475 .into_iter()
2476 .filter(|m| matches!(m.location, StorageLocation::Local(_) | StorageLocation::S3(_)))
2477 .collect();
2478
2479 let akashic = self.akashic.clone();
2480 let dim = self.hot_cfg.dim;
2481 let k = self.cold_cfg.partition_count;
2482
2483 let vectors: Vec<(String, Vec<f32>)> = tokio::task::spawn_blocking(move || {
2484 cold_metas.iter()
2485 .filter_map(|m| akashic.get_raw_vector(&m.id).ok()?.map(|v| (m.id.clone(), v)))
2486 .filter(|(_, v)| v.len() == dim)
2487 .collect()
2488 }).await.unwrap_or_default();
2489
2490 if vectors.is_empty() {
2491 return Ok(());
2492 }
2493
2494 self.nebula.build(&vectors, k);
2495
2496 self.akashic.clear_all_partitions()?;
2498 for part in self.nebula.snapshot() {
2499 self.akashic.put_partition(&part)?;
2500 }
2501
2502 info!("rebuilt nebula index: {} partitions over {} cold vectors", self.nebula.partition_count(), vectors.len());
2503 Ok(())
2504 }
2505
2506 pub async fn manual_descend(&self, id: &str) -> Result<()> {
2507 self.morpher.descend(id, &Uuid::new_v4().to_string()).await
2508 }
2509
2510 pub async fn manual_ascend(&self, id: &str) -> Result<()> {
2511 self.morpher.ascend(id, &Uuid::new_v4().to_string()).await
2512 }
2513
2514 pub async fn set_temperature(&self, v: f32) {
2515 self.flux.set_temperature(v).await;
2516 }
2517
2518 pub async fn get_temperature(&self) -> f32 {
2519 self.flux.get_temperature().await
2520 }
2521
2522 pub fn metrics(&self) -> &Metrics {
2523 &self.metrics
2524 }
2525}
2526
2527#[cfg(test)]
2532mod tests {
2533 use super::*;
2534
/// Encoding then decoding must reproduce every component to within 1e-6.
#[test]
fn test_vector_codec_roundtrip() {
    let original = vec![1.0f32, -2.5, 0.0, 3.14, -0.001];

    let encoded = VectorCodec::encode(&original);
    let decoded = VectorCodec::decode(&encoded).unwrap();

    assert_eq!(original.len(), decoded.len());
    original.iter().zip(decoded.iter()).for_each(|(a, b)| {
        assert!((a - b).abs() < 1e-6, "codec mismatch: {} vs {}", a, b);
    });
}
2545
/// Compressing and then decompressing an encoded vector must reproduce the
/// encoded bytes exactly.
#[test]
fn test_vector_codec_compressed() {
    let input = vec![0.1f32; 512];
    let raw = VectorCodec::encode(&input);

    // Second argument `3` is presumably the compression level — confirm
    // against `VectorCodec::compress`.
    let packed = VectorCodec::compress(&raw, 3).unwrap();
    let unpacked = VectorCodec::decompress(&packed).unwrap();

    assert_eq!(raw, unpacked);
}
2554
/// The L2 distance between two orthogonal unit vectors is `sqrt(2)`.
#[test]
fn test_l2_distance() {
    let unit_x = vec![1.0f32, 0.0, 0.0];
    let unit_y = vec![0.0f32, 1.0, 0.0];

    let measured = l2_distance(&unit_x, &unit_y);
    let deviation = (measured - 2.0f32.sqrt()).abs();

    assert!(deviation < 1e-5);
}
2562
/// Cosine similarity is `1` for identical directions and `-1` for opposite
/// directions (checked with a `1e-5` tolerance).
#[test]
fn test_cosine_sim() {
    let base = vec![1.0f32, 0.0, 0.0];

    let same_direction = vec![1.0f32, 0.0, 0.0];
    let aligned = cosine_sim(&base, &same_direction);
    assert!((aligned - 1.0).abs() < 1e-5);

    let opposite_direction = vec![-1.0f32, 0.0, 0.0];
    let opposed = cosine_sim(&base, &opposite_direction);
    assert!((opposed + 1.0).abs() < 1e-5);
}
2571
/// After recording 100 evenly spaced search latencies, the reported p99
/// (in milliseconds) must fall within `[95, 100]`.
#[test]
fn test_metrics_p99() {
    let metrics = Metrics::new(1000);

    // Samples are 1..=100 multiplied by 1_000_000; since the assertion below
    // is in milliseconds, the recorded unit is presumably nanoseconds.
    (1..=100u64).for_each(|step| {
        metrics.record_search_latency(step * 1_000_000);
    });

    let p99 = metrics.p99_latency_ms();
    assert!((95.0..=100.0).contains(&p99), "p99={}", p99);
}
2582
/// A recently used, frequent, important entry must score higher than an old,
/// rarely used, unimportant one, and the latter must be the one selected
/// when a single eviction is requested.
#[test]
fn test_eviction_policy_ordering() {
    let policy = EvictionPolicy::new(0.6, 0.3, 1.0, 0.5);
    let now = now_ms();

    // Accessed right now, high frequency and importance.
    let fresh = MemoryMeta {
        id: "hot".into(),
        importance: 0.9,
        freq: 100,
        last_access_ms: now,
        created_ms: now - 1000,
        location: StorageLocation::Hot,
        cold_cost_mb: 0.1,
        dimension: 768,
        version: 1,
    };

    // Last accessed 7_200_000 ms ago, low frequency and importance.
    let stale = MemoryMeta {
        id: "cold".into(),
        importance: 0.1,
        freq: 1,
        last_access_ms: now - 7_200_000,
        created_ms: now - 10_000_000,
        location: StorageLocation::Hot,
        cold_cost_mb: 0.1,
        dimension: 768,
        version: 1,
    };

    let fresh_score = policy.score(&fresh);
    let stale_score = policy.score(&stale);
    assert!(
        fresh_score > stale_score,
        "hot={} cold={}",
        fresh_score,
        stale_score
    );

    let candidates = [fresh, stale];
    let evictions = policy.select_evictions(&candidates, 1);
    assert_eq!(evictions, vec!["cold"]);
}
2621
/// Exercises the basic `HotIndex` operations: add, contains, get, search
/// and remove on a tiny 4-dimensional index.
#[test]
fn test_hot_index_basic() {
    let cfg = HotConfig {
        dim: 4,
        max_items: 10,
        shard_count: 2,
        ..Default::default()
    };
    let idx = HotIndex::new(&cfg).unwrap();

    // Three axis-aligned unit vectors, inserted in order a, b, c.
    let seed: [(&str, [f32; 4]); 3] = [
        ("a", [1.0, 0.0, 0.0, 0.0]),
        ("b", [0.0, 1.0, 0.0, 0.0]),
        ("c", [0.0, 0.0, 1.0, 0.0]),
    ];
    for &(id, v) in &seed {
        idx.add_sync(id.into(), v.to_vec()).unwrap();
    }

    assert!(idx.contains("a"));
    assert_eq!(idx.get("b").unwrap(), vec![0.0, 1.0, 0.0, 0.0]);

    // Querying with a's own vector must rank "a" first among the top 2.
    let hits = idx.search(&[1.0, 0.0, 0.0, 0.0], 2).unwrap();
    assert_eq!(hits.len(), 2);
    assert_eq!(hits[0].0, "a");

    idx.remove("b");
    assert!(!idx.contains("b"));
}
2646
/// Building a `NebulaIndex` over 50 synthetic 4-dimensional vectors with a
/// partition argument of 5 must yield 5 partitions, and a coarse lookup must
/// return at least one candidate.
#[test]
fn test_nebula_index_build_and_search() {
    let mut vectors: Vec<(String, Vec<f32>)> = Vec::with_capacity(50);
    for i in 0..50 {
        let x = (i as f32) / 50.0;
        let y = 1.0 - x;
        vectors.push((format!("id{}", i), vec![x, y, x * 0.5, y * 0.5]));
    }

    let nebula = NebulaIndex::new(4, 3, Vec::new());
    nebula.build(&vectors, 5);
    assert_eq!(nebula.partition_count(), 5);

    let query = vec![0.0f32, 1.0, 0.0, 0.5];
    let candidates = nebula.coarse_candidates(&query);
    assert!(!candidates.is_empty(), "coarse search returned nothing");
}
2662
/// Round-trips a meta record and an importance value through
/// `AkashicRecords`, then checks that removing the meta makes it unreadable.
#[test]
fn test_akashic_records() {
    // Use the platform temp directory rather than a hard-coded `/tmp`, which
    // does not exist on every platform and ignores the configured temp dir.
    // The path is kept as a `String` so `open` receives the same type as before.
    let dir = std::env::temp_dir()
        .join(format!("aether_test_{}", fastrand::u32(..)))
        .to_string_lossy()
        .into_owned();
    let ak = AkashicRecords::open(&dir).unwrap();

    let meta = MemoryMeta {
        id: "test_id".into(),
        location: StorageLocation::Hot,
        last_access_ms: 1000,
        created_ms: 500,
        freq: 3,
        importance: 0.7,
        cold_cost_mb: 0.0,
        version: 1,
        dimension: 128,
    };

    ak.put_meta(&meta).unwrap();
    let got = ak.get_meta("test_id").unwrap().unwrap();
    assert_eq!(got.id, "test_id");
    assert_eq!(got.freq, 3);

    ak.put_importance("test_id", 0.8).unwrap();
    let imp = ak.get_importance("test_id").unwrap();
    assert!((imp - 0.8).abs() < 1e-5, "importance: {}", imp);

    ak.remove_meta("test_id").unwrap();
    assert!(ak.get_meta("test_id").unwrap().is_none());

    // Release the store before deleting its directory so no open handle
    // keeps the files alive. Cleanup stays best-effort.
    drop(ak);
    let _ = std::fs::remove_dir_all(&dir);
}
2694
2695 #[tokio::test]
2696 async fn test_memory_manager_end_to_end() {
2697 let db_dir = format!("/tmp/aether_mgr_{}", fastrand::u32(..));
2698 let cold_dir = format!("/tmp/aether_cold_{}", fastrand::u32(..));
2699
2700 let hot_cfg = HotConfig {
2701 dim: 4,
2702 max_items: 100,
2703 shard_count: 2,
2704 ..Default::default()
2705 };
2706 let cold_cfg = ColdConfig {
2707 local_dir: cold_dir.clone(),
2708 ..Default::default()
2709 };
2710 let flux_cfg = FluxConfig {
2711 tick_interval: Duration::from_secs(3600),
2712 ..Default::default()
2713 };
2714
2715 let mgr = MemoryManager::new(hot_cfg, cold_cfg, flux_cfg, &db_dir).await.unwrap();
2716
2717 mgr.insert("v1".into(), vec![1.0, 0.0, 0.0, 0.0]).await.unwrap();
2718 mgr.insert("v2".into(), vec![0.0, 1.0, 0.0, 0.0]).await.unwrap();
2719 mgr.insert("v3".into(), vec![0.0, 0.0, 1.0, 0.0]).await.unwrap();
2720
2721 let results = mgr.search(vec![1.0, 0.0, 0.0, 0.0], 2).await.unwrap();
2722 assert!(!results.is_empty());
2723 assert_eq!(results[0].id, "v1");
2724 assert!(results[0].from_hot);
2725
2726 let vec = mgr.get_by_id("v2").await.unwrap();
2727 assert_eq!(vec, vec![0.0, 1.0, 0.0, 0.0]);
2728
2729 mgr.apply_feedback("v1", 1.0).await.unwrap();
2730 let imp = mgr.akashic.get_importance("v1").unwrap();
2731 assert!(imp > 0.5, "importance should increase: {}", imp);
2732
2733 let health = mgr.health().await;
2734 assert!(health.healthy);
2735 assert_eq!(health.hot_items, 3);
2736
2737 mgr.delete("v2").await.unwrap();
2738 assert!(mgr.get_by_id("v2").await.is_err());
2739
2740 let h2 = mgr.health().await;
2741 assert_eq!(h2.total_items, 2);
2742
2743 let _ = std::fs::remove_dir_all(&db_dir);
2744 let _ = std::fs::remove_dir_all(&cold_dir);
2745 }
2746}
2747
2748#[tokio::main]
2753async fn main() -> Result<()> {
2754 env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
2755 .format_timestamp_millis()
2756 .init();
2757
2758 info!("AetherMemory 启动中...");
2759
2760 let hot_cfg = HotConfig {
2761 dim: 1536,
2762 max_items: 200_000,
2763 shard_count: 16,
2764 search_timeout: Duration::from_millis(80),
2765 max_concurrent_searches: 256,
2766 batch_window_ms: 5,
2767 max_batch_size: 64,
2768 };
2769
2770 let cold_cfg = ColdConfig {
2771 local_dir: "./aether_cold".into(),
2772 s3_bucket: std::env::var("AETHER_S3_BUCKET").ok(),
2773 s3_prefix: Some("aether".into()),
2774 s3_region: std::env::var("AWS_REGION").ok().or(Some("us-east-1".into())),
2775 compress_level: 3,
2776 partition_count: 512,
2777 top_partitions: 16,
2778 };
2779
2780 let flux_cfg = FluxConfig {
2781 initial_temperature: 0.3,
2782 min_temperature: 0.005,
2783 max_temperature: 8.0,
2784 decay_rate: 0.997,
2785 pressure_scale: 2.5,
2786 tick_interval: Duration::from_secs(10),
2787 sample_rate: 0.01,
2788 max_candidates: 512,
2789 alpha_recency: 0.55,
2790 beta_freq: 0.25,
2791 gamma_importance: 1.0,
2792 delta_cost: 0.4,
2793 sigmoid_k: 1.2,
2794 importance_decay: 0.998,
2795 max_concurrent_migrations: 16,
2796 restore_concurrency: 8,
2797 restore_backoff_ms: 100,
2798 restore_max_retries: 5,
2799 };
2800
2801 let mgr = MemoryManager::new(hot_cfg, cold_cfg, flux_cfg, "./aether_meta").await?;
2802
2803 info!("系统就绪,热层维度 = 1536");
2804
2805 let probe = vec![0.1f32; 1536];
2807 mgr.insert("probe_0001".into(), probe.clone()).await?;
2808 info!("插入 probe_0001");
2809
2810 let results = mgr.search(probe, 5).await?;
2812 info!("检索返回 {} 条结果,首条ID={:?}, 来自热层={}",
2813 results.len(),
2814 results.first().map(|r| &r.id),
2815 results.first().map(|r| r.from_hot).unwrap_or(false));
2816
2817 mgr.apply_feedback("probe_0001", 0.9).await?;
2819 info!("施加正反馈到 probe_0001");
2820
2821 let h = mgr.health().await;
2823 info!("健康报告: hot={}, cold={}, temp={:.3}, hit_rate={:.2}%, p99_ms={:.2}",
2824 h.hot_items, h.cold_items, h.temperature, h.hot_hit_rate * 100.0, h.p99_search_ms);
2825
2826 info!("指标:\n{}", mgr.metrics().to_prometheus());
2828
2829 tokio::signal::ctrl_c().await?;
2831 info!("AetherMemory 关闭。");
2832
2833 Ok(())
2834}
2835
2836