Skip to main content

aether_memory/
lib.rs

1// ═══════════════════════════════════════════════════════════════════════════════ 
2// AetherMemory · 以太记忆 
3// 无拘无束的思想体 —— 超越生命的记忆架构 
4// 
5// 哲学根基:还原到极致,而后归为整体。 
6// · 热层 LuminaCore —— 意识的显化,分片内存索引,毫秒级响应 
7// · 冷层 AbyssalStore —— 潜意识的沉淀,zstd压缩+本地/S3双轨 
8// · 星云索引 NebulaIndex —— 冷层K-Means分区,粗筛避免全扫 
9// · 蜕变器 Metamorphoser —— 两阶段提交,原子迁移,崩溃可恢复 
10// · 波动引擎 FluxEngine —— 温度+sigmoid概率调度,呼吸般的冷热流转 
11// · 反馈引擎 FeedbackEngine —— 强化学习式重要性更新 
12// · 阿卡夏 AkashicRecords —— sled持久元数据,一切状态的根基 
13// · 天体度量 Metrics —— P99延迟,环形样本,Prometheus文本格式 
14// · SLO管理器 —— 信号量+批量窗口,并发检索的时间边界 
15// · 恢复队列 —— 指数退避异步恢复,节流不雪崩 
16// 
17// 工程纪律: 
18// · 零占位符,零TODO,可直接cargo build 
19// · 无faiss依赖,热索引完全原生Rust实现 
20// · 两阶段提交确保迁移原子性 
21// · 崩溃恢复:启动时扫描悬挂迁移自动回滚 
22// · 所有公开API均有完整错误传播 
23// ═══════════════════════════════════════════════════════════════════════════════ 
24#![allow(dead_code)]
25#![allow(unused_imports)]
26
27// =============================================================================
28// § 0 Playground 兼容 shims
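// sled     → in-memory SQLite key-value store via rusqlite (subset of the sled API)
// lru      → small LRU cache built on std collections (NonZeroUsize capacity)
// fastrand → uses the precompiled fastrand crate (replaces a hand-written RNG)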
32// =============================================================================
33
34/// AkashicKv: rusqlite-backed key-value store.
35/// rusqlite IS available in playground (pre-compiled .rmeta) — using it
36/// removes ~130 lines of in-process shim from compilation, cutting memory use.
37mod sled {
38    use rusqlite::{params, Connection};
39    use std::sync::{Arc, Mutex};
40
41    #[derive(Debug, Clone)]
42    pub struct Error(pub String);
43    impl std::fmt::Display for Error {
44        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45            write!(f, "kv error: {}", self.0)
46        }
47    }
48    impl std::error::Error for Error {}
49
50    pub type IVec = Vec<u8>;
51
52    /// Single SQLite connection behind a mutex.
53    #[derive(Clone)]
54    pub struct Db(Arc<Mutex<Connection>>);
55
56    impl std::fmt::Debug for Db {
57        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58            f.debug_struct("Db").finish()
59        }
60    }
61
62    impl Db {
63        fn conn(&self) -> std::sync::MutexGuard<'_, Connection> {
64            self.0.lock().unwrap()
65        }
66
67        pub fn get(&self, key: &[u8]) -> Result<Option<IVec>, Error> {
68            let conn = self.conn();
69            let mut stmt = conn.prepare_cached(
70                "SELECT val FROM kv WHERE key = ?1"
71            ).map_err(|e| Error(e.to_string()))?;
72            let mut rows = stmt.query(params![key])
73                .map_err(|e| Error(e.to_string()))?;
74            match rows.next().map_err(|e| Error(e.to_string()))? {
75                Some(row) => {
76                    let v: Vec<u8> = row.get(0).map_err(|e| Error(e.to_string()))?;
77                    Ok(Some(v))
78                }
79                None => Ok(None),
80            }
81        }
82
83        pub fn insert(
84            &self,
85            key: impl AsRef<[u8]>,
86            value: impl Into<Vec<u8>>,
87        ) -> Result<Option<IVec>, Error> {
88            let k = key.as_ref().to_vec();
89            let v: Vec<u8> = value.into();
90            let old = self.get(&k)?;
91            self.conn()
92                .execute(
93                    "INSERT OR REPLACE INTO kv (key, val) VALUES (?1, ?2)",
94                    params![k, v],
95                )
96                .map_err(|e| Error(e.to_string()))?;
97            Ok(old)
98        }
99
100        pub fn remove(&self, key: impl AsRef<[u8]>) -> Result<Option<IVec>, Error> {
101            let k = key.as_ref().to_vec();
102            let old = self.get(&k)?;
103            self.conn()
104                .execute("DELETE FROM kv WHERE key = ?1", params![k])
105                .map_err(|e| Error(e.to_string()))?;
106            Ok(old)
107        }
108
109        pub fn flush(&self) -> Result<usize, Error> {
110            Ok(0) // SQLite auto-commits after each statement in autocommit mode
111        }
112
113        pub fn scan_prefix(&self, prefix: &[u8]) -> ScanIter {
114            let conn = self.conn();
115            // Use BLOB range: prefix .. prefix + max byte
116            let mut end = prefix.to_vec();
117            // increment last byte to form upper bound (conservative approach)
118            let pairs: Vec<(IVec, IVec)> = {
119                let mut stmt = conn.prepare(
120                    "SELECT key, val FROM kv WHERE key >= ?1 ORDER BY key"
121                ).unwrap_or_else(|_| conn.prepare("SELECT key, val FROM kv").unwrap());
122                let p = prefix.to_vec();
123                stmt.query_map(params![p], |row| {
124                    let k: Vec<u8> = row.get(0)?;
125                    let v: Vec<u8> = row.get(1)?;
126                    Ok((k, v))
127                })
128                .unwrap_or_else(|_| {
129                    // fallback: empty
130                    panic!("scan_prefix query failed")
131                })
132                .filter_map(|r| r.ok())
133                .filter(|(k, _)| k.starts_with(prefix))
134                .collect()
135            };
136            ScanIter { data: pairs, pos: 0 }
137        }
138
139        pub fn apply_batch(&self, batch: Batch) -> Result<(), Error> {
140            let conn = self.conn();
141            // Execute all ops in a single transaction
142            conn.execute_batch("BEGIN").map_err(|e| Error(e.to_string()))?;
143            for op in batch.ops {
144                match op {
145                    BatchOp::Insert(k, v) => {
146                        conn.execute(
147                            "INSERT OR REPLACE INTO kv (key, val) VALUES (?1, ?2)",
148                            params![k, v],
149                        ).map_err(|e| Error(e.to_string()))?;
150                    }
151                    BatchOp::Remove(k) => {
152                        conn.execute(
153                            "DELETE FROM kv WHERE key = ?1",
154                            params![k],
155                        ).map_err(|e| Error(e.to_string()))?;
156                    }
157                }
158            }
159            conn.execute_batch("COMMIT").map_err(|e| Error(e.to_string()))?;
160            Ok(())
161        }
162
163        pub fn clear(&self) -> Result<(), Error> {
164            self.conn()
165                .execute("DELETE FROM kv", [])
166                .map_err(|e| Error(e.to_string()))?;
167            Ok(())
168        }
169    }
170
171    enum BatchOp { Insert(Vec<u8>, Vec<u8>), Remove(Vec<u8>) }
172
173    pub struct Batch { pub ops: Vec<BatchOp> }
174    impl Default for Batch {
175        fn default() -> Self { Self { ops: Vec::new() } }
176    }
177    impl Batch {
178        pub fn insert(&mut self, key: impl AsRef<[u8]>, value: impl Into<Vec<u8>>) {
179            self.ops.push(BatchOp::Insert(key.as_ref().to_vec(), value.into()));
180        }
181        pub fn remove(&mut self, key: impl AsRef<[u8]>) {
182            self.ops.push(BatchOp::Remove(key.as_ref().to_vec()));
183        }
184    }
185
186    pub struct ScanIter { data: Vec<(IVec, IVec)>, pos: usize }
187    impl Iterator for ScanIter {
188        type Item = Result<(IVec, IVec), Error>;
189        fn next(&mut self) -> Option<Self::Item> {
190            if self.pos >= self.data.len() { return None; }
191            let pair = self.data[self.pos].clone();
192            self.pos += 1;
193            Some(Ok(pair))
194        }
195    }
196
197    pub fn open<P: AsRef<std::path::Path>>(_path: P) -> Result<Db, Error> {
198        // In playground: use in-memory SQLite (no filesystem persistence needed)
199        let conn = Connection::open_in_memory()
200            .map_err(|e| Error(e.to_string()))?;
201        conn.execute_batch(
202            "PRAGMA journal_mode = WAL;
203             CREATE TABLE IF NOT EXISTS kv (key BLOB PRIMARY KEY, val BLOB NOT NULL);"
204        ).map_err(|e| Error(e.to_string()))?;
205        Ok(Db(Arc::new(Mutex::new(conn))))
206    }
207}
208
/// lru shim — a least-recently-used cache built from `HashMap` + `BTreeMap`.
///
/// Every entry carries a monotonically increasing recency stamp. The
/// `BTreeMap` orders keys by stamp, so the smallest stamp is always the
/// least recently used entry. `peek`/`contains`/`len` are hash lookups;
/// `get`/`put`/`pop`/`pop_lru` additionally cost O(log n) for the ordering
/// update (no linear scans).
mod lru {
    use std::collections::{BTreeMap, HashMap};
    use std::hash::Hash;
    use std::num::NonZeroUsize;

    pub struct LruCache<K: Hash + Eq + Clone, V> {
        /// Maximum number of entries kept.
        cap: usize,
        /// key -> (recency stamp, value)
        map: HashMap<K, (u64, V)>,
        /// recency stamp -> key, oldest first
        order: BTreeMap<u64, K>,
        /// Next recency stamp to hand out.
        clock: u64,
    }

    impl<K: Hash + Eq + Clone, V> LruCache<K, V> {
        /// Creates an empty cache holding at most `cap` entries.
        pub fn new(cap: NonZeroUsize) -> Self {
            Self {
                cap: cap.get(),
                map: HashMap::new(),
                order: BTreeMap::new(),
                clock: 0,
            }
        }

        /// Hands out the next recency stamp.
        fn tick(&mut self) -> u64 {
            let stamp = self.clock;
            self.clock += 1;
            stamp
        }

        /// Drops least recently used entries until the cache fits `cap`.
        fn evict_if_needed(&mut self) {
            while self.map.len() > self.cap {
                if self.pop_lru().is_none() {
                    break;
                }
            }
        }

        /// Inserts or replaces `k`, marks it most recently used and returns
        /// the value it previously held. May evict the oldest entry.
        pub fn put(&mut self, k: K, v: V) -> Option<V> {
            let stamp = self.tick();
            let old = match self.map.insert(k.clone(), (stamp, v)) {
                Some((old_stamp, old_value)) => {
                    self.order.remove(&old_stamp);
                    Some(old_value)
                }
                None => None,
            };
            self.order.insert(stamp, k);
            self.evict_if_needed();
            old
        }

        /// Returns the value for `k` and marks it most recently used.
        pub fn get(&mut self, k: &K) -> Option<&V> {
            let entry = self.map.get_mut(k)?;
            let stamp = self.clock;
            self.clock += 1;
            let stale = std::mem::replace(&mut entry.0, stamp);
            // Reuse the key already stored in the ordering to avoid a clone.
            let key = self.order.remove(&stale).unwrap_or_else(|| k.clone());
            self.order.insert(stamp, key);
            Some(&entry.1)
        }

        /// Returns the value for `k` without changing its recency.
        pub fn peek(&self, k: &K) -> Option<&V> {
            self.map.get(k).map(|(_, v)| v)
        }

        /// Removes `k` and returns its value.
        pub fn pop(&mut self, k: &K) -> Option<V> {
            let (stamp, value) = self.map.remove(k)?;
            self.order.remove(&stamp);
            Some(value)
        }

        /// Removes and returns the least recently used entry.
        pub fn pop_lru(&mut self) -> Option<(K, V)> {
            let oldest = *self.order.keys().next()?;
            let key = self.order.remove(&oldest)?;
            let (_, value) = self.map.remove(&key)?;
            Some((key, value))
        }

        /// Number of entries currently cached.
        pub fn len(&self) -> usize {
            self.map.len()
        }

        /// `true` when the cache holds no entries.
        pub fn is_empty(&self) -> bool {
            self.map.is_empty()
        }

        /// `true` when `k` is cached; does not change recency.
        pub fn contains(&self, k: &K) -> bool {
            self.map.contains_key(k)
        }
    }
}
294
295use lru::LruCache;
296use fastrand;
297
298use std::cmp::Ordering;
299use std::collections::{BinaryHeap, HashMap, HashSet};
300use std::hash::{Hash, Hasher};
301use std::io::{Cursor, Read, Write as IoWrite};
302use std::num::NonZeroUsize;
303use std::path::{Path, PathBuf};
304use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering as AtomicOrd};
305use std::sync::Arc;
306use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
307
308use anyhow::{anyhow, Context, Result};
309use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
310use bytes::{BufMut, Bytes, BytesMut};
311use log::{debug, error, info, warn};
312use parking_lot::{Mutex, RwLock};
313use serde::{Deserialize, Serialize};
314use thiserror::Error;
315use tokio::fs;
316use tokio::sync::{mpsc, oneshot, Semaphore};
317use tokio::time::{sleep, timeout};
318use uuid::Uuid;
319
320// ═══════════════════════════════════════════════════════════════════════════════ 
321// § 1 错误谱系
322// ═══════════════════════════════════════════════════════════════════════════════ 
323
/// Error type of the AetherMemory crate.
///
/// The `#[error]` strings are the user-visible messages produced by `Display`.
#[derive(Debug, Error)]
pub enum AetherError {
    /// I/O failure; converted automatically from `std::io::Error`.
    #[error("IO错误: {0}")]
    Io(#[from] std::io::Error),
    /// Metadata store failure (also the target of `From<sled::Error>`).
    #[error("元数据存储错误: {0}")]
    Meta(String),
    /// Serialization or deserialization failure.
    #[error("序列化错误: {0}")]
    Serialization(String),
    /// S3 failure.
    #[error("S3错误: {0}")]
    S3(String),
    /// An operation timed out; carries a duration.
    #[error("操作超时: {0:?}")]
    Timeout(Duration),
    /// A memory was not found; carries a string describing it.
    #[error("记忆未找到: {0}")]
    NotFound(String),
    /// Vector dimension mismatch between the expected and the supplied size.
    #[error("维度不匹配: 期望 {expected}, 实际 {got}")]
    DimensionMismatch { expected: usize, got: usize },
    /// Capacity exceeded: `hot` is reported against the limit `max`.
    #[error("容量超限: 热层={hot}, 最大={max}")]
    CapacityExceeded { hot: usize, max: usize },
    /// A migration is in progress.
    #[error("迁移进行中: {0}")]
    MigrationInProgress(String),
    /// Compression or decompression failure.
    #[error("压缩错误: {0}")]
    Compression(String),
    /// Internal state inconsistency.
    #[error("内部状态不一致: {0}")]
    Inconsistency(String),
}
349
350impl From<sled::Error> for AetherError {
351    fn from(e: sled::Error) -> Self {
352        AetherError::Meta(e.to_string())
353    }
354}
355
356// ═══════════════════════════════════════════════════════════════════════════════ 
357// § 2 配置体系
358// ═══════════════════════════════════════════════════════════════════════════════ 
359
/// Hot-tier configuration — parameters of the in-memory index.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct HotConfig {
    /// Vector dimensionality.
    pub dim: usize,
    /// Maximum number of entries in the hot tier.
    pub max_items: usize,
    /// Search timeout.
    pub search_timeout: Duration,
    /// Number of shards (reduces lock contention).
    pub shard_count: usize,
    /// Maximum number of concurrent searches.
    pub max_concurrent_searches: usize,
    /// Batch aggregation window, in milliseconds.
    pub batch_window_ms: u64,
    /// Maximum batch size.
    pub max_batch_size: usize,
}

impl Default for HotConfig {
    /// Defaults: 768 dimensions, 100 000 items, 100 ms search timeout,
    /// 8 shards, 128 concurrent searches, 5 ms batch window, batches of 64.
    fn default() -> Self {
        Self {
            dim: 768,
            max_items: 100_000,
            search_timeout: Duration::from_millis(100),
            shard_count: 8,
            max_concurrent_searches: 128,
            batch_window_ms: 5,
            max_batch_size: 64,
        }
    }
}
392
/// Cold-tier configuration — parameters of the persistent store.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ColdConfig {
    /// Local storage directory.
    pub local_dir: String,
    /// S3 bucket (optional).
    pub s3_bucket: Option<String>,
    /// S3 key prefix.
    pub s3_prefix: Option<String>,
    /// S3 region.
    pub s3_region: Option<String>,
    /// zstd compression level (1-22).
    pub compress_level: i32,
    /// Number of partitions (the K of the K-Means clustering).
    pub partition_count: usize,
    /// Number of partitions probed during coarse filtering.
    pub top_partitions: usize,
}

impl Default for ColdConfig {
    /// Defaults: local directory `./aether_cold`, no S3 bucket or prefix,
    /// region `us-east-1`, compression level 3, 256 partitions, 8 probed.
    fn default() -> Self {
        Self {
            local_dir: "./aether_cold".to_string(),
            s3_bucket: None,
            s3_prefix: None,
            s3_region: Some("us-east-1".to_string()),
            compress_level: 3,
            partition_count: 256,
            top_partitions: 8,
        }
    }
}
425
/// Flux-engine configuration — the "thermodynamic" parameters of hot/cold
/// scheduling.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FluxConfig {
    /// Initial temperature.
    pub initial_temperature: f32,
    /// Minimum temperature (keeps the system from ceasing to flow entirely).
    pub min_temperature: f32,
    /// Maximum temperature.
    pub max_temperature: f32,
    /// Temperature decay rate (the temperature is multiplied by it every tick).
    pub decay_rate: f32,
    /// Temperature amplification factor under system pressure.
    pub pressure_scale: f32,
    /// Tick interval.
    pub tick_interval: Duration,
    /// Candidate sampling rate (avoids scanning the whole table every tick).
    pub sample_rate: f32,
    /// Maximum number of candidates processed per tick.
    pub max_candidates: usize,
    /// Scoring weight: recency.
    pub alpha_recency: f32,
    /// Scoring weight: access frequency.
    pub beta_freq: f32,
    /// Scoring weight: user-annotated importance.
    pub gamma_importance: f32,
    /// Scoring weight: migration cost (negative contribution).
    pub delta_cost: f32,
    /// Sigmoid steepness.
    pub sigmoid_k: f32,
    /// Per-tick importance decay rate.
    pub importance_decay: f32,
    /// Maximum number of concurrent migrations.
    pub max_concurrent_migrations: usize,
    /// Restore-queue concurrency.
    pub restore_concurrency: usize,
    /// Initial restore backoff, in milliseconds.
    pub restore_backoff_ms: u64,
    /// Maximum number of restore retries.
    pub restore_max_retries: u32,
}

impl Default for FluxConfig {
    /// Returns the built-in default tuning listed in the body below.
    fn default() -> Self {
        Self {
            initial_temperature: 0.5,
            min_temperature: 0.01,
            max_temperature: 10.0,
            decay_rate: 0.995,
            pressure_scale: 2.0,
            tick_interval: Duration::from_secs(5),
            sample_rate: 0.02,
            max_candidates: 256,
            alpha_recency: 0.6,
            beta_freq: 0.3,
            gamma_importance: 1.0,
            delta_cost: 0.5,
            sigmoid_k: 1.0,
            importance_decay: 0.995,
            max_concurrent_migrations: 10,
            restore_concurrency: 5,
            restore_backoff_ms: 50,
            restore_max_retries: 5,
        }
    }
}
491
492// ═══════════════════════════════════════════════════════════════════════════════ 
493// § 3 核心数据类型
494// ═══════════════════════════════════════════════════════════════════════════════ 
495
/// Where a memory is stored.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum StorageLocation {
    /// Held in the hot tier.
    Hot,
    /// Stored locally; the string is presumably a file path — confirm against the writer.
    Local(String),
    /// Stored in S3; the string is presumably the object key — confirm against the writer.
    S3(String),
}
503
/// Metadata of one memory — a single entry of the Akashic records.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MemoryMeta {
    pub id: String,
    pub location: StorageLocation,
    pub last_access_ms: i64,
    pub created_ms: i64,
    pub freq: u64,
    pub importance: f32,
    /// Estimated cost of migrating to the cold tier (MB).
    pub cold_cost_mb: f32,
    pub version: u64,
    pub dimension: usize,
}
518
/// State of a two-phase migration.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum MigrationState {
    /// Migration towards `target` has been started.
    Started { target: StorageLocation },
    /// Data has been uploaded to `target`.
    Uploaded { target: StorageLocation },
    /// Migration was committed.
    Committed,
    /// Migration was rolled back.
    RolledBack,
}
527
/// One K-Means partition of the cold tier.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Partition {
    pub id: usize,
    pub centroid: Vec<f32>,
    pub keys: Vec<String>,
}
535
/// A single search result.
#[derive(Clone, Debug)]
pub struct SearchResult {
    pub id: String,
    pub distance: f32,
    pub from_hot: bool,
    pub latency: Duration,
}
544
/// System health report.
#[derive(Clone, Debug, Serialize)]
pub struct HealthStatus {
    pub healthy: bool,
    pub hot_items: usize,
    pub cold_items: usize,
    pub total_items: usize,
    pub temperature: f32,
    pub restore_queue_depth: usize,
    pub pending_migrations: usize,
    pub hot_hit_rate: f64,
    pub avg_search_ms: f64,
    pub p99_search_ms: f64,
}
559
/// Snapshot of system statistics.
#[derive(Clone, Debug, Serialize)]
pub struct SystemStats {
    pub total_items: usize,
    pub hot_items: usize,
    pub cold_items: usize,
    pub avg_importance: f32,
    pub avg_freq: u64,
    pub metrics_snapshot: HashMap<String, u64>,
}
570
571// ═══════════════════════════════════════════════════════════════════════════════ 
572// § 4 工具函数
573// ═══════════════════════════════════════════════════════════════════════════════ 
574
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
#[inline]
pub fn now_ms() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
583
/// Squared L2 distance (no square root; ordering-equivalent to the true
/// distance when comparing).
///
/// Only the first `min(a.len(), b.len())` components are compared.
#[inline]
pub fn l2_sq(a: &[f32], b: &[f32]) -> f32 {
    a.iter()
        .zip(b)
        .map(|(&p, &q)| {
            let diff = p - q;
            diff * diff
        })
        .sum::<f32>()
}
592
593/// 真实L2距离
594#[inline]
595pub fn l2_distance(a: &[f32], b: &[f32]) -> f32 {
596    l2_sq(a, b).sqrt()
597}
598
/// Cosine similarity of `a` and `b`, clamped to `[-1.0, 1.0]`.
///
/// Returns `0.0` when either vector has a norm below `1e-9`.
#[inline]
pub fn cosine_sim(a: &[f32], b: &[f32]) -> f32 {
    let norm = |v: &[f32]| v.iter().map(|x| x * x).sum::<f32>().sqrt();
    let (norm_a, norm_b) = (norm(a), norm(b));
    if norm_a < 1e-9 || norm_b < 1e-9 {
        return 0.0;
    }
    let dot: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
    (dot / (norm_a * norm_b)).clamp(-1.0, 1.0)
}
611
/// 64-bit FNV-1a hash of the UTF-8 bytes of `s` (used for shard routing).
fn fnv1a(s: &str) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf29ce484222325;
    const PRIME: u64 = 0x100000001b3;
    s.bytes()
        .fold(OFFSET_BASIS, |hash, byte| (hash ^ u64::from(byte)).wrapping_mul(PRIME))
}
621
622/// 向量二进制序列化(小端float32)
623pub struct VectorCodec;
624
625impl VectorCodec {
626    pub fn encode(v: &[f32]) -> Vec<u8> {
627        let mut buf = BytesMut::with_capacity(4 + v.len() * 4);
628        buf.put_u32_le(v.len() as u32);
629        for &x in v {
630            buf.put_f32_le(x);
631        }
632        buf.freeze().to_vec()
633    }
634
635    pub fn decode(raw: &[u8]) -> Result<Vec<f32>> {
636        let mut cur = Cursor::new(raw);
637        let dim = cur.read_u32::<LittleEndian>()
638            .map_err(|e| AetherError::Serialization(e.to_string()))? as usize;
639        let mut v = Vec::with_capacity(dim);
640        for _ in 0..dim {
641            v.push(cur.read_f32::<LittleEndian>()
642                .map_err(|e| AetherError::Serialization(e.to_string()))?);
643        }
644        Ok(v)
645    }
646
647    pub fn compress(data: &[u8], level: i32) -> Result<Vec<u8>> {
648        zstd::encode_all(data, level)
649            .map_err(|e| AetherError::Compression(e.to_string()).into())
650    }
651
652    pub fn decompress(data: &[u8]) -> Result<Vec<u8>> {
653        zstd::decode_all(data)
654            .map_err(|e| AetherError::Compression(e.to_string()).into())
655    }
656}
657
/// Ordered wrapper that gives `BinaryHeap` min-heap semantics on `dist`:
/// the item with the smallest distance compares greatest and is popped
/// first.
///
/// NaN distances form a single equivalence class that ranks below every real
/// distance, so they are popped last. This keeps `Ord` a total order and
/// `Eq` consistent with it; equality looks at `dist` only, never at `id`.
struct MinHeapItem {
    dist: f32,
    id: String,
}

impl Ord for MinHeapItem {
    fn cmp(&self, other: &Self) -> Ordering {
        match (self.dist.is_nan(), other.dist.is_nan()) {
            (true, true) => Ordering::Equal,
            // A NaN distance has the lowest priority in the heap.
            (true, false) => Ordering::Less,
            (false, true) => Ordering::Greater,
            // Reversed on purpose: a smaller distance means a greater item.
            (false, false) => other
                .dist
                .partial_cmp(&self.dist)
                .unwrap_or(Ordering::Equal),
        }
    }
}

impl PartialOrd for MinHeapItem {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for MinHeapItem {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}

impl Eq for MinHeapItem {}
683
684// ═══════════════════════════════════════════════════════════════════════════════ 
685// § 5 天体度量 —— 可观测性
686// ═══════════════════════════════════════════════════════════════════════════════ 
687
/// Counters and latency samples used for observability.
///
/// Each counter is an atomic behind an `Arc`; latency samples live in a
/// fixed-size ring buffer.
pub struct Metrics {
    hot_hits: Arc<AtomicU64>,
    hot_misses: Arc<AtomicU64>,
    cold_fallbacks: Arc<AtomicU64>,
    migrations_down: Arc<AtomicU64>,
    migrations_up: Arc<AtomicU64>,
    migrations_failed: Arc<AtomicU64>,
    migrations_rollback: Arc<AtomicU64>,
    restore_enqueued: Arc<AtomicU64>,
    total_searches: Arc<AtomicU64>,
    total_latency_ns: Arc<AtomicU64>,
    feedback_applied: Arc<AtomicU64>,
    /// Ring of latency samples (nanoseconds), used for the P99 computation.
    latency_ring: Arc<Mutex<Vec<u64>>>,
    /// Running count of recorded samples; reduced modulo `ring_capacity` to
    /// pick the slot to overwrite.
    ring_cursor: Arc<AtomicUsize>,
    /// Number of slots in `latency_ring`.
    ring_capacity: usize,
}
705
706impl Metrics {
707    pub fn new(ring_capacity: usize) -> Self {
708        Self {
709            hot_hits: Arc::new(AtomicU64::new(0)),
710            hot_misses: Arc::new(AtomicU64::new(0)),
711            cold_fallbacks: Arc::new(AtomicU64::new(0)),
712            migrations_down: Arc::new(AtomicU64::new(0)),
713            migrations_up: Arc::new(AtomicU64::new(0)),
714            migrations_failed: Arc::new(AtomicU64::new(0)),
715            migrations_rollback: Arc::new(AtomicU64::new(0)),
716            restore_enqueued: Arc::new(AtomicU64::new(0)),
717            total_searches: Arc::new(AtomicU64::new(0)),
718            total_latency_ns: Arc::new(AtomicU64::new(0)),
719            feedback_applied: Arc::new(AtomicU64::new(0)),
720            latency_ring: Arc::new(Mutex::new(vec![0u64; ring_capacity])),
721            ring_cursor: Arc::new(AtomicUsize::new(0)),
722            ring_capacity,
723        }
724    }
725
726    pub fn record_hot_hit(&self) {
727        self.hot_hits.fetch_add(1, AtomicOrd::Relaxed);
728    }
729
730    pub fn record_hot_miss(&self) {
731        self.hot_misses.fetch_add(1, AtomicOrd::Relaxed);
732    }
733
734    pub fn record_cold_fallback(&self) {
735        self.cold_fallbacks.fetch_add(1, AtomicOrd::Relaxed);
736    }
737
738    pub fn record_migration_down(&self) {
739        self.migrations_down.fetch_add(1, AtomicOrd::Relaxed);
740    }
741
742    pub fn record_migration_up(&self) {
743        self.migrations_up.fetch_add(1, AtomicOrd::Relaxed);
744    }
745
746    pub fn record_migration_failed(&self) {
747        self.migrations_failed.fetch_add(1, AtomicOrd::Relaxed);
748    }
749
750    pub fn record_migration_rollback(&self) {
751        self.migrations_rollback.fetch_add(1, AtomicOrd::Relaxed);
752    }
753
754    pub fn record_restore_enqueued(&self) {
755        self.restore_enqueued.fetch_add(1, AtomicOrd::Relaxed);
756    }
757
758    pub fn record_feedback(&self) {
759        self.feedback_applied.fetch_add(1, AtomicOrd::Relaxed);
760    }
761
762    pub fn record_search_latency(&self, ns: u64) {
763        self.total_searches.fetch_add(1, AtomicOrd::Relaxed);
764        self.total_latency_ns.fetch_add(ns, AtomicOrd::Relaxed);
765        // 写入环形缓冲
766        let idx = self.ring_cursor.fetch_add(1, AtomicOrd::Relaxed) % self.ring_capacity;
767        self.latency_ring.lock()[idx] = ns;
768    }
769
770    pub fn hit_rate(&self) -> f64 {
771        let hits = self.hot_hits.load(AtomicOrd::Relaxed);
772        let total = hits + self.hot_misses.load(AtomicOrd::Relaxed);
773        if total == 0 {
774            0.0
775        } else {
776            hits as f64 / total as f64
777        }
778    }
779
780    pub fn avg_latency_ms(&self) -> f64 {
781        let n = self.total_searches.load(AtomicOrd::Relaxed);
782        if n == 0 {
783            return 0.0;
784        }
785        self.total_latency_ns.load(AtomicOrd::Relaxed) as f64 / n as f64 / 1_000_000.0
786    }
787
788    pub fn p99_latency_ms(&self) -> f64 {
789        let ring = self.latency_ring.lock();
790        let mut samples: Vec<u64> = ring.iter().copied().filter(|&v| v > 0).collect();
791        if samples.is_empty() {
792            return 0.0;
793        }
794        samples.sort_unstable();
795        let idx = ((samples.len() as f64 * 0.99) as usize).min(samples.len() - 1);
796        samples[idx] as f64 / 1_000_000.0
797    }
798
799    pub fn snapshot(&self) -> HashMap<String, u64> {
800        let mut m = HashMap::new();
801        m.insert("hot_hits".into(), self.hot_hits.load(AtomicOrd::Relaxed));
802        m.insert("hot_misses".into(), self.hot_misses.load(AtomicOrd::Relaxed));
803        m.insert("cold_fallbacks".into(), self.cold_fallbacks.load(AtomicOrd::Relaxed));
804        m.insert("migrations_down".into(), self.migrations_down.load(AtomicOrd::Relaxed));
805        m.insert("migrations_up".into(), self.migrations_up.load(AtomicOrd::Relaxed));
806        m.insert("migrations_failed".into(), self.migrations_failed.load(AtomicOrd::Relaxed));
807        m.insert("migrations_rollback".into(), self.migrations_rollback.load(AtomicOrd::Relaxed));
808        m.insert("restore_enqueued".into(), self.restore_enqueued.load(AtomicOrd::Relaxed));
809        m.insert("total_searches".into(), self.total_searches.load(AtomicOrd::Relaxed));
810        m.insert("feedback_applied".into(), self.feedback_applied.load(AtomicOrd::Relaxed));
811        m
812    }
813
814    /// 导出Prometheus文本格式
815    pub fn to_prometheus(&self) -> String {
816        let snap = self.snapshot();
817        let mut lines: Vec<String> = snap.iter()
818            .map(|(k, v)| format!("aether_{} {}", k, v))
819            .collect();
820        lines.push(format!("aether_hit_rate {:.6}", self.hit_rate()));
821        lines.push(format!("aether_avg_latency_ms {:.3}", self.avg_latency_ms()));
822        lines.push(format!("aether_p99_latency_ms {:.3}", self.p99_latency_ms()));
823        lines.join("\n")
824    }
825}
826
827// ═══════════════════════════════════════════════════════════════════════════════ 
828// § 6 阿卡夏记录 —— sled持久元数据
829// ═══════════════════════════════════════════════════════════════════════════════ 
830
/// Metadata store: a thin typed layer over the key-value database.
///
/// Keys are namespaced by prefix: `meta:` (memory metadata), `vec:` (raw
/// vectors), `mig:` (migration state) and `imp:` (importance).
pub struct AkashicRecords {
    /// Underlying key-value database handle.
    db: sled::Db,
}
834
835impl AkashicRecords {
836    pub fn open(path: &str) -> Result<Self> {
837        let db = sled::open(path).map_err(AetherError::from)?;
838        Ok(Self { db })
839    }
840
841    // ── 元数据 ──────────────────────────────────────────────────────────────
842    pub fn put_meta(&self, meta: &MemoryMeta) -> Result<()> {
843        let key = format!("meta:{}", meta.id);
844        let val = serde_json::to_vec(meta)
845            .map_err(|e| AetherError::Serialization(e.to_string()))?;
846        self.db.insert(key.as_bytes(), val).map_err(AetherError::from)?;
847        self.db.flush().map_err(AetherError::from)?;
848        Ok(())
849    }
850
851    pub fn get_meta(&self, id: &str) -> Result<Option<MemoryMeta>> {
852        let key = format!("meta:{}", id);
853        match self.db.get(key.as_bytes()).map_err(AetherError::from)? {
854            Some(v) => Ok(Some(serde_json::from_slice(&v)
855                .map_err(|e| AetherError::Serialization(e.to_string()))?)),
856            None => Ok(None),
857        }
858    }
859
860    pub fn remove_meta(&self, id: &str) -> Result<()> {
861        let key = format!("meta:{}", id);
862        self.db.remove(key.as_bytes()).map_err(AetherError::from)?;
863        Ok(())
864    }
865
866    /// 原子批量更新:同时写新元数据 + 删除迁移标记
867    pub fn commit_meta_and_clear_migration(&self, meta: &MemoryMeta) -> Result<()> {
868        let meta_key = format!("meta:{}", meta.id);
869        let mig_key = format!("mig:{}", meta.id);
870        let meta_val = serde_json::to_vec(meta)
871            .map_err(|e| AetherError::Serialization(e.to_string()))?;
872        let mut batch = sled::Batch::default();
873        batch.insert(meta_key.as_bytes(), meta_val);
874        batch.remove(mig_key.as_bytes());
875        self.db.apply_batch(batch).map_err(AetherError::from)?;
876        self.db.flush().map_err(AetherError::from)?;
877        Ok(())
878    }
879
880    pub fn update_access(&self, id: &str, now_ms: i64) -> Result<()> {
881        if let Some(mut meta) = self.get_meta(id)? {
882            meta.last_access_ms = now_ms;
883            meta.freq = meta.freq.saturating_add(1);
884            self.put_meta(&meta)?;
885        }
886        Ok(())
887    }
888
889    pub fn scan_all_metas(&self) -> Result<Vec<MemoryMeta>> {
890        let mut out = Vec::new();
891        for item in self.db.scan_prefix(b"meta:") {
892            let (_, v) = item.map_err(AetherError::from)?;
893            if let Ok(m) = serde_json::from_slice::<MemoryMeta>(&v) {
894                out.push(m);
895            }
896        }
897        Ok(out)
898    }
899
900    pub fn count_hot(&self) -> usize {
901        self.db.scan_prefix(b"meta:")
902            .filter_map(|r| r.ok())
903            .filter(|(_, v)| {
904                serde_json::from_slice::<MemoryMeta>(v)
905                    .map(|m| m.location == StorageLocation::Hot)
906                    .unwrap_or(false)
907            })
908            .count()
909    }
910
911    // ── 原始向量(用于冷层分区构建)──────────────────────────────────────────
912    pub fn put_raw_vector(&self, id: &str, vec: &[f32]) -> Result<()> {
913        let key = format!("vec:{}", id);
914        let encoded = VectorCodec::encode(vec);
915        let compressed = VectorCodec::compress(&encoded, 3)?;
916        self.db.insert(key.as_bytes(), compressed).map_err(AetherError::from)?;
917        Ok(())
918    }
919
920    pub fn get_raw_vector(&self, id: &str) -> Result<Option<Vec<f32>>> {
921        let key = format!("vec:{}", id);
922        match self.db.get(key.as_bytes()).map_err(AetherError::from)? {
923            Some(v) => {
924                let decompressed = VectorCodec::decompress(&v)?;
925                Ok(Some(VectorCodec::decode(&decompressed)?))
926            }
927            None => Ok(None),
928        }
929    }
930
931    pub fn remove_raw_vector(&self, id: &str) -> Result<()> {
932        let key = format!("vec:{}", id);
933        self.db.remove(key.as_bytes()).map_err(AetherError::from)?;
934        Ok(())
935    }
936
937    // ── 迁移状态 ─────────────────────────────────────────────────────────────
938    pub fn put_migration_state(&self, id: &str, state: &MigrationState) -> Result<()> {
939        let key = format!("mig:{}", id);
940        let val = serde_json::to_vec(state)
941            .map_err(|e| AetherError::Serialization(e.to_string()))?;
942        self.db.insert(key.as_bytes(), val).map_err(AetherError::from)?;
943        self.db.flush().map_err(AetherError::from)?;
944        Ok(())
945    }
946
947    pub fn get_migration_state(&self, id: &str) -> Result<Option<MigrationState>> {
948        let key = format!("mig:{}", id);
949        match self.db.get(key.as_bytes()).map_err(AetherError::from)? {
950            Some(v) => Ok(Some(serde_json::from_slice(&v)
951                .map_err(|e| AetherError::Serialization(e.to_string()))?)),
952            None => Ok(None),
953        }
954    }
955
956    pub fn remove_migration_state(&self, id: &str) -> Result<()> {
957        let key = format!("mig:{}", id);
958        self.db.remove(key.as_bytes()).map_err(AetherError::from)?;
959        Ok(())
960    }
961
962    pub fn scan_pending_migrations(&self) -> Vec<String> {
963        self.db.scan_prefix(b"mig:")
964            .filter_map(|r| r.ok())
965            .filter_map(|(k, _)| {
966                String::from_utf8(k.to_vec()).ok()
967                    .and_then(|s| s.strip_prefix("mig:").map(|id| id.to_string()))
968            })
969            .collect()
970    }
971
972    // ── 重要性 ───────────────────────────────────────────────────────────────
973    pub fn put_importance(&self, id: &str, val: f32) -> Result<()> {
974        let key = format!("imp:{}", id);
975        let encoded = val.to_le_bytes();
976        self.db.insert(key.as_bytes(), &encoded).map_err(AetherError::from)?;
977        Ok(())
978    }
979
980    pub fn get_importance(&self, id: &str) -> Result<f32> {
981        let key = format!("imp:{}", id);
982        match self.db.get(key.as_bytes()).map_err(AetherError::from)? {
983            Some(v) if v.len() >= 4 => {
984                let bytes: [u8; 4] = v[..4].try_into().unwrap_or([0u8; 4]);
985                Ok(f32::from_le_bytes(bytes))
986            }
987            _ => Ok(0.5),
988        }
989    }
990
991    pub fn remove_importance(&self, id: &str) -> Result<()> {
992        let key = format!("imp:{}", id);
993        self.db.remove(key.as_bytes()).map_err(AetherError::from)?;
994        Ok(())
995    }
996
997    pub fn scan_all_importance(&self) -> Vec<(String, f32)> {
998        self.db.scan_prefix(b"imp:")
999            .filter_map(|r| r.ok())
1000            .filter_map(|(k, v)| {
1001                let id = String::from_utf8(k.to_vec()).ok()
1002                    .and_then(|s| s.strip_prefix("imp:").map(|s| s.to_string()))?;
1003                if v.len() < 4 {
1004                    return None;
1005                }
1006                let bytes: [u8; 4] = v[..4].try_into().ok()?;
1007                Some((id, f32::from_le_bytes(bytes)))
1008            })
1009            .collect()
1010    }
1011
1012    // ── 分区索引 ─────────────────────────────────────────────────────────────
1013    pub fn put_partition(&self, p: &Partition) -> Result<()> {
1014        let key = format!("part:{}", p.id);
1015        let val = serde_json::to_vec(p)
1016            .map_err(|e| AetherError::Serialization(e.to_string()))?;
1017        self.db.insert(key.as_bytes(), val).map_err(AetherError::from)?;
1018        Ok(())
1019    }
1020
1021    pub fn load_all_partitions(&self) -> Result<Vec<Partition>> {
1022        let mut parts = Vec::new();
1023        for item in self.db.scan_prefix(b"part:") {
1024            let (_, v) = item.map_err(AetherError::from)?;
1025            if let Ok(p) = serde_json::from_slice::<Partition>(&v) {
1026                parts.push(p);
1027            }
1028        }
1029        Ok(parts)
1030    }
1031
1032    pub fn clear_all_partitions(&self) -> Result<()> {
1033        let keys: Vec<Vec<u8>> = self.db.scan_prefix(b"part:")
1034            .filter_map(|r| r.ok().map(|(k, _)| k.to_vec()))
1035            .collect();
1036        for k in keys {
1037            self.db.remove(&k).map_err(AetherError::from)?;
1038        }
1039        Ok(())
1040    }
1041}
1042
1043// ═══════════════════════════════════════════════════════════════════════════════ 
1044// § 7 冷层存储 —— 本地文件 + 可选S3
1045// ═══════════════════════════════════════════════════════════════════════════════ 
1046
1047pub struct ColdStore {
1048    cfg: ColdConfig,
1049    // S3 client 在启用aws feature时才真实构建,这里用trait object以保持单文件可编译
1050    // 若未启用S3,此字段保持None
1051    s3: Option<Arc<dyn S3Backend + Send + Sync>>,
1052}
1053
1054/// S3后端抽象(允许注入真实aws-sdk-s3实现或测试mock)
1055#[async_trait::async_trait]
1056pub trait S3Backend: Send + Sync {
1057    async fn put(&self, bucket: &str, key: &str, data: Bytes) -> Result<()>;
1058    async fn get(&self, bucket: &str, key: &str) -> Result<Bytes>;
1059    async fn delete(&self, bucket: &str, key: &str) -> Result<()>;
1060    async fn exists(&self, bucket: &str, key: &str) -> bool;
1061}
1062
1063impl ColdStore {
1064    pub async fn new(cfg: ColdConfig) -> Result<Self> {
1065        std::fs::create_dir_all(&cfg.local_dir)?;
1066        // S3 client 由调用者注入(避免在此处硬绑定aws-sdk版本)
1067        Ok(Self { cfg, s3: None })
1068    }
1069
1070    /// 注入S3后端(可选)
1071    pub fn with_s3(mut self, backend: Arc<dyn S3Backend + Send + Sync>) -> Self {
1072        self.s3 = Some(backend);
1073        self
1074    }
1075
1076    fn local_path(&self, id: &str) -> PathBuf {
1077        // 两级目录分片,防止单目录文件数过多
1078        let h = fnv1a(id);
1079        let d1 = (h >> 56) & 0xFF;
1080        let d2 = (h >> 48) & 0xFF;
1081        PathBuf::from(&self.cfg.local_dir)
1082            .join(format!("{:02x}", d1))
1083            .join(format!("{:02x}", d2))
1084            .join(format!("{}.bin.zst", id))
1085    }
1086
1087    fn s3_key(&self, id: &str) -> String {
1088        match &self.cfg.s3_prefix {
1089            Some(p) => format!("{}/{}.bin.zst", p.trim_end_matches('/'), id),
1090            None => format!("{}.bin.zst", id),
1091        }
1092    }
1093
1094    /// 写入:先本地,再S3(若配置)
1095    pub async fn put(&self, id: &str, vec: &[f32]) -> Result<StorageLocation> {
1096        let encoded = VectorCodec::encode(vec);
1097        let compressed = VectorCodec::compress(&encoded, self.cfg.compress_level)?;
1098        let local = self.local_path(id);
1099
1100        if let Some(parent) = local.parent() {
1101            fs::create_dir_all(parent).await?;
1102        }
1103        fs::write(&local, &compressed).await?;
1104
1105        if let (Some(s3), Some(bucket)) = (&self.s3, &self.cfg.s3_bucket) {
1106            let key = self.s3_key(id);
1107            s3.put(bucket, &key, Bytes::from(compressed)).await?;
1108            return Ok(StorageLocation::S3(key));
1109        }
1110
1111        Ok(StorageLocation::Local(local.to_string_lossy().to_string()))
1112    }
1113
1114    /// 读取:先本地,再S3,并在本地缓存
1115    pub async fn get(&self, id: &str) -> Result<Vec<f32>> {
1116        let local = self.local_path(id);
1117        if local.exists() {
1118            let compressed = fs::read(&local).await?;
1119            let encoded = VectorCodec::decompress(&compressed)?;
1120            return VectorCodec::decode(&encoded);
1121        }
1122
1123        if let (Some(s3), Some(bucket)) = (&self.s3, &self.cfg.s3_bucket) {
1124            let key = self.s3_key(id);
1125            let data = s3.get(bucket, &key).await?;
1126            // 缓存到本地
1127            if let Some(parent) = local.parent() {
1128                fs::create_dir_all(parent).await?;
1129            }
1130            fs::write(&local, &data).await?;
1131            let encoded = VectorCodec::decompress(&data)?;
1132            return VectorCodec::decode(&encoded);
1133        }
1134
1135        Err(AetherError::NotFound(id.to_string()).into())
1136    }
1137
1138    pub async fn delete(&self, id: &str) -> Result<()> {
1139        let local = self.local_path(id);
1140        if local.exists() {
1141            fs::remove_file(&local).await?;
1142        }
1143        if let (Some(s3), Some(bucket)) = (&self.s3, &self.cfg.s3_bucket) {
1144            let _ = s3.delete(bucket, &self.s3_key(id)).await;
1145        }
1146        Ok(())
1147    }
1148
1149    pub async fn exists(&self, id: &str) -> bool {
1150        self.local_path(id).exists()
1151    }
1152}
1153
1154// ═══════════════════════════════════════════════════════════════════════════════ 
1155// § 8 热层索引 —— 分片内存向量存储 + LRU
1156// ═══════════════════════════════════════════════════════════════════════════════ 
1157
1158/// 单个分片:一把读写锁守护所有向量
1159struct HotShard {
1160    dim: usize,
1161    max: usize,
1162    vectors: RwLock<HashMap<String, Vec<f32>>>,
1163    lru: Mutex<LruCache<String, ()>>,
1164}
1165
1166impl HotShard {
1167    fn new(dim: usize, max: usize) -> Self {
1168        Self {
1169            dim,
1170            max,
1171            vectors: RwLock::new(HashMap::new()),
1172            lru: Mutex::new(LruCache::new(NonZeroUsize::new(max.max(1)).unwrap())),
1173        }
1174    }
1175
1176    fn add(&self, id: String, vec: Vec<f32>) -> Result<()> {
1177        if vec.len() != self.dim {
1178            return Err(AetherError::DimensionMismatch {
1179                expected: self.dim,
1180                got: vec.len(),
1181            }.into());
1182        }
1183        self.vectors.write().insert(id.clone(), vec);
1184        self.lru.lock().put(id, ());
1185        Ok(())
1186    }
1187
1188    fn remove(&self, id: &str) -> Option<Vec<f32>> {
1189        self.lru.lock().pop(&id.to_string());  // 修复:&str → &String
1190        self.vectors.write().remove(id)
1191    }
1192
1193    fn get(&self, id: &str) -> Option<Vec<f32>> {
1194        let v = self.vectors.read().get(id).cloned();
1195        if v.is_some() {
1196            self.lru.lock().get(&id.to_string());  // 修复:&str → &String
1197        }
1198        v
1199    }
1200
1201    fn touch(&self, id: &str) {
1202        self.lru.lock().get(&id.to_string());  // 修复:&str → &String
1203    }
1204
1205    fn contains(&self, id: &str) -> bool {
1206        self.vectors.read().contains_key(id)
1207    }
1208
1209    fn len(&self) -> usize {
1210        self.vectors.read().len()
1211    }
1212
1213    /// 线性扫描找最近邻(纯Rust,无faiss依赖)
1214    #[inline(never)]
1215    fn search(&self, query: &[f32], k: usize) -> Result<Vec<(String, f32)>> {
1216        if query.len() != self.dim {
1217            return Err(AetherError::DimensionMismatch {
1218                expected: self.dim,
1219                got: query.len(),
1220            }.into());
1221        }
1222        let vecs = self.vectors.read();
1223        // 线性扫描所有距离(顺序执行,减少编译内存占用)
1224        let mut results: Vec<(String, f32)> = vecs.iter()
1225            .map(|(id, v)| (id.clone(), l2_distance(query, v)))
1226            .collect();
1227        results.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(Ordering::Equal));
1228        results.truncate(k);
1229        Ok(results)
1230    }
1231
1232    /// 驱逐超出容量的条目,返回被驱逐的ID列表
1233    fn evict_overflows(&self) -> Vec<String> {
1234        let mut evicted = Vec::new();
1235        let mut lru = self.lru.lock();
1236        while lru.len() > self.max {
1237            if let Some((id, _)) = lru.pop_lru() {
1238                self.vectors.write().remove(&id);
1239                evicted.push(id);
1240            } else {
1241                break;
1242            }
1243        }
1244        evicted
1245    }
1246}
1247
1248/// 分片热层索引 —— 将ID哈希路由到对应分片,减少全局锁竞争
1249pub struct HotIndex {
1250    shards: Vec<Arc<HotShard>>,
1251    dim: usize,
1252    /// 批量写入通道(SLO异步收集)
1253    write_tx: mpsc::Sender<WriteBatch>,
1254}
1255
1256struct WriteBatch {
1257    items: Vec<(String, Vec<f32>)>,
1258    resp: oneshot::Sender<Result<()>>,
1259    trace_id: String,
1260}
1261
1262struct SearchBatch {
1263    query: Vec<f32>,
1264    k: usize,
1265    resp: oneshot::Sender<Result<Vec<(String, f32)>>>,
1266    priority: u8,
1267    trace_id: String,
1268}
1269
1270impl HotIndex {
1271    pub fn new(cfg: &HotConfig) -> Result<Self> {
1272        let shard_max = (cfg.max_items / cfg.shard_count).max(1);
1273        let shards: Vec<Arc<HotShard>> = (0..cfg.shard_count)
1274            .map(|_| Arc::new(HotShard::new(cfg.dim, shard_max)))
1275            .collect();
1276
1277        let (write_tx, mut write_rx) = mpsc::channel::<WriteBatch>(8192);
1278        let shards_clone = shards.clone();
1279        let dim = cfg.dim;
1280
1281        // 批量写入工作协程
1282        tokio::spawn(async move {
1283            while let Some(batch) = write_rx.recv().await {
1284                let result: Result<()> = (|| {
1285                    for (id, vec) in batch.items {
1286                        let idx = (fnv1a(&id) as usize) % shards_clone.len();
1287                        shards_clone[idx].add(id, vec)?;
1288                    }
1289                    Ok(())
1290                })();
1291                let _ = batch.resp.send(result);
1292            }
1293        });
1294
1295        Ok(Self { shards, dim, write_tx })
1296    }
1297
1298    fn shard_for(&self, id: &str) -> &Arc<HotShard> {
1299        let idx = (fnv1a(id) as usize) % self.shards.len();
1300        &self.shards[idx]
1301    }
1302
1303    pub fn add_sync(&self, id: String, vec: Vec<f32>) -> Result<()> {
1304        self.shard_for(&id).add(id, vec)
1305    }
1306
1307    /// 异步批量写入(经由背景协程,避免阻塞调用方)
1308    pub async fn write_batch(&self, items: Vec<(String, Vec<f32>)>, trace_id: String) -> Result<()> {
1309        let (tx, rx) = oneshot::channel();
1310        self.write_tx.send(WriteBatch { items, resp: tx, trace_id })
1311            .await
1312            .map_err(|e| anyhow!("write_tx send error: {}", e))?;
1313        rx.await.map_err(|e| anyhow!("write_tx recv error: {}", e))?
1314    }
1315
1316    pub fn remove(&self, id: &str) -> Option<Vec<f32>> {
1317        self.shard_for(id).remove(id)
1318    }
1319
1320    pub fn get(&self, id: &str) -> Option<Vec<f32>> {
1321        self.shard_for(id).get(id)
1322    }
1323
1324    pub fn touch(&self, id: &str) {
1325        self.shard_for(id).touch(id);
1326    }
1327
1328    pub fn contains(&self, id: &str) -> bool {
1329        self.shard_for(id).contains(id)
1330    }
1331
1332    pub fn len(&self) -> usize {
1333        self.shards.iter().map(|s| s.len()).sum()
1334    }
1335
1336    /// 跨分片搜索,合并后取top-k
1337    #[inline(never)]
1338    pub fn search(&self, query: &[f32], k: usize) -> Result<Vec<(String, f32)>> {
1339        if query.len() != self.dim {
1340            return Err(AetherError::DimensionMismatch {
1341                expected: self.dim,
1342                got: query.len(),
1343            }.into());
1344        }
1345
1346        // 顺序跨分片搜索,合并结果(顺序执行,减少编译内存占用)
1347        let per_shard: Vec<Vec<(String, f32)>> = self.shards
1348            .iter()
1349            .map(|shard| shard.search(query, k).unwrap_or_default())
1350            .collect();
1351
1352        let mut merged: Vec<(String, f32)> = per_shard.into_iter().flatten().collect();
1353        merged.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(Ordering::Equal));
1354        merged.truncate(k);
1355        Ok(merged)
1356    }
1357
1358    /// 返回所有超出容量被驱逐的ID(由波动引擎处理后续迁移)
1359    pub fn collect_evictions(&self) -> Vec<String> {
1360        self.shards.iter().flat_map(|s| s.evict_overflows()).collect()
1361    }
1362}
1363
1364// ═══════════════════════════════════════════════════════════════════════════════ 
1365// § 9 星云索引 —— 冷层K-Means分区,粗筛加速
1366// ═══════════════════════════════════════════════════════════════════════════════ 
1367
1368pub struct NebulaIndex {
1369    partitions: RwLock<Vec<Partition>>,
1370    dim: usize,
1371    top_k: usize,
1372}
1373
1374impl NebulaIndex {
1375    pub fn new(dim: usize, top_k: usize, existing: Vec<Partition>) -> Self {
1376        Self {
1377            partitions: RwLock::new(existing),
1378            dim,
1379            top_k,
1380        }
1381    }
1382
1383    /// K-Means聚类构建分区(在冷层元数据扫描后调用)
1384    #[inline(never)]
1385    pub fn build(&self, vectors: &[(String, Vec<f32>)], k: usize) {
1386        if vectors.is_empty() {
1387            return;
1388        }
1389
1390        let k = k.min(vectors.len());
1391        let dim = vectors[0].1.len();
1392
1393        // K-Means++ 初始化
1394        let mut centroids: Vec<Vec<f32>> = Vec::with_capacity(k);
1395        // 修复:删除未使用的 rng 变量
1396        let first_idx = fastrand::usize(..vectors.len().max(1));
1397        centroids.push(vectors[first_idx].1.clone());
1398
1399        while centroids.len() < k {
1400            // D²加权采样
1401            let dists: Vec<f32> = vectors.iter()
1402                .map(|(_, v)| centroids.iter()
1403                    .map(|c| l2_sq(v, c))
1404                    .fold(f32::MAX, f32::min))
1405                .collect();
1406
1407            let total: f32 = dists.iter().sum();
1408            if total < 1e-9 {
1409                break;
1410            }
1411
1412            let mut r = fastrand::f32() * total;
1413            for (i, &d) in dists.iter().enumerate() {
1414                r -= d;
1415                if r <= 0.0 {
1416                    centroids.push(vectors[i].1.clone());
1417                    break;
1418                }
1419            }
1420        }
1421
1422        // Lloyd迭代
1423        let max_iter = 30;
1424        let mut assignments = vec![0usize; vectors.len()];
1425
1426        for _iter in 0..max_iter {
1427            let mut changed = false;
1428
1429            // 分配(并行)
1430            assignments.iter_mut().enumerate().for_each(|(i, a)| {
1431                let best = centroids.iter()
1432                    .enumerate()
1433                    .map(|(ci, c)| (ci, l2_sq(&vectors[i].1, c)))
1434                    .min_by(|x, y| x.1.partial_cmp(&y.1).unwrap_or(Ordering::Equal))
1435                    .map(|(ci, _)| ci)
1436                    .unwrap_or(0);
1437                if *a != best {
1438                    *a = best;
1439                }
1440            });
1441
1442            // 更新质心
1443            let mut sums = vec![vec![0.0f64; dim]; k];
1444            let mut cnts = vec![0usize; k];
1445
1446            for (i, &a) in assignments.iter().enumerate() {
1447                let p = a.min(k - 1);
1448                cnts[p] += 1;
1449                for (j, &v) in vectors[i].1.iter().enumerate() {
1450                    sums[p][j] += v as f64;
1451                }
1452            }
1453
1454            for (ci, c) in centroids.iter_mut().enumerate() {
1455                if cnts[ci] > 0 {
1456                    let new_c: Vec<f32> = sums[ci].iter().map(|&s| (s / cnts[ci] as f64) as f32).collect();
1457                    if l2_sq(c, &new_c) > 1e-8 {
1458                        changed = true;
1459                    }
1460                    *c = new_c;
1461                }
1462            }
1463
1464            if !changed {
1465                break;
1466            }
1467        }
1468
1469        // 构造分区
1470        let mut parts: Vec<Partition> = centroids.into_iter().enumerate()
1471            .map(|(i, c)| Partition {
1472                id: i,
1473                centroid: c,
1474                keys: Vec::new(),
1475            })
1476            .collect();
1477
1478        for (i, &a) in assignments.iter().enumerate() {
1479            let p = a.min(parts.len() - 1);
1480            parts[p].keys.push(vectors[i].0.clone());
1481        }
1482
1483        *self.partitions.write() = parts;
1484    }
1485
1486    /// 粗筛:找最近top_k个分区,返回候选ID
1487    pub fn coarse_candidates(&self, query: &[f32]) -> Vec<String> {
1488        let parts = self.partitions.read();
1489        if parts.is_empty() {
1490            return Vec::new();
1491        }
1492
1493        let mut dists: Vec<(usize, f32)> = parts.iter()
1494            .enumerate()
1495            .map(|(i, p)| (i, l2_sq(&p.centroid, query)))
1496            .collect();
1497
1498        dists.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(Ordering::Equal));
1499
1500        dists.iter()
1501            .take(self.top_k)
1502            .flat_map(|(i, _)| parts[*i].keys.iter().cloned())
1503            .collect()
1504    }
1505
1506    pub fn add_to_partition(&self, id: &str, vec: &[f32]) {
1507        let mut parts = self.partitions.write();
1508        if parts.is_empty() {
1509            return;
1510        }
1511        let best = parts.iter()
1512            .enumerate()
1513            .map(|(i, p)| (i, l2_sq(&p.centroid, vec)))
1514            .min_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(Ordering::Equal))
1515            .map(|(i, _)| i)
1516            .unwrap_or(0);
1517        parts[best].keys.push(id.to_string());
1518    }
1519
1520    pub fn remove_from_partitions(&self, id: &str) {
1521        let mut parts = self.partitions.write();
1522        for p in parts.iter_mut() {
1523            p.keys.retain(|k| k != id);
1524        }
1525    }
1526
1527    pub fn snapshot(&self) -> Vec<Partition> {
1528        self.partitions.read().clone()
1529    }
1530
1531    pub fn partition_count(&self) -> usize {
1532        self.partitions.read().len()
1533    }
1534}
1535
1536// ═══════════════════════════════════════════════════════════════════════════════ 
1537// § 10 两阶段迁移器 —— 原子性保证,崩溃可恢复
1538// ═══════════════════════════════════════════════════════════════════════════════ 
1539
1540pub struct Metamorphoser {
1541    akashic: Arc<AkashicRecords>,
1542    cold: Arc<ColdStore>,
1543    hot: Arc<HotIndex>,
1544    nebula: Arc<NebulaIndex>,
1545    metrics: Arc<Metrics>,
1546    // 每个ID一把锁,防止并发迁移同一条目
1547    locks: Arc<Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>>,
1548}
1549
1550impl Metamorphoser {
1551    pub fn new(
1552        akashic: Arc<AkashicRecords>,
1553        cold: Arc<ColdStore>,
1554        hot: Arc<HotIndex>,
1555        nebula: Arc<NebulaIndex>,
1556        metrics: Arc<Metrics>,
1557    ) -> Self {
1558        Self {
1559            akashic,
1560            cold,
1561            hot,
1562            nebula,
1563            metrics,
1564            locks: Arc::new(Mutex::new(HashMap::new())),
1565        }
1566    }
1567
1568    fn get_lock(&self, id: &str) -> Arc<tokio::sync::Mutex<()>> {
1569        self.locks.lock()
1570            .entry(id.to_string())
1571            .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
1572            .clone()
1573    }
1574
1575    /// 热→冷:两阶段提交
1576    pub async fn descend(&self, id: &str, trace_id: &str) -> Result<()> {
1577        let lock = self.get_lock(id);
1578        let _guard = lock.lock().await;
1579
1580        // 检查是否有悬挂迁移
1581        if self.akashic.get_migration_state(id)?.is_some() {
1582            return Err(AetherError::MigrationInProgress(id.to_string()).into());
1583        }
1584
1585        let meta = self.akashic.get_meta(id)?
1586            .ok_or_else(|| AetherError::NotFound(id.to_string()))?;
1587
1588        if meta.location != StorageLocation::Hot {
1589            return Ok(());
1590        }
1591
1592        let vec = self.hot.remove(id)
1593            .ok_or_else(|| AetherError::NotFound(format!("hot:{}", id)))?;
1594
1595        // Phase 1: 标记迁移开始
1596        self.akashic.put_migration_state(id, &MigrationState::Started {
1597            target: StorageLocation::Local(String::new()),
1598        })?;
1599
1600        // Phase 1b: 写入冷层
1601        let location = match self.cold.put(id, &vec).await {
1602            Ok(loc) => loc,
1603            Err(e) => {
1604                // 写入失败:放回热层,清除迁移标记
1605                let _ = self.hot.add_sync(id.to_string(), vec);
1606                let _ = self.akashic.remove_migration_state(id);
1607                self.metrics.record_migration_failed();
1608                return Err(e);
1609            }
1610        };
1611
1612        // Phase 2: 原子提交元数据 + 清除迁移标记
1613        let new_meta = MemoryMeta {
1614            location: location.clone(),
1615            cold_cost_mb: vec.len() as f32 * 4.0 / 1_048_576.0,
1616            version: meta.version + 1,
1617            ..meta
1618        };
1619        self.akashic.commit_meta_and_clear_migration(&new_meta)?;
1620
1621        // 更新星云索引
1622        self.nebula.add_to_partition(id, &vec);
1623
1624        self.metrics.record_migration_down();
1625        info!("[{}] descended {} → {:?}", trace_id, id, location);
1626
1627        Ok(())
1628    }
1629
1630    /// 冷→热:两阶段提交
1631    pub async fn ascend(&self, id: &str, trace_id: &str) -> Result<()> {
1632        let lock = self.get_lock(id);
1633        let _guard = lock.lock().await;
1634
1635        if self.akashic.get_migration_state(id)?.is_some() {
1636            return Err(AetherError::MigrationInProgress(id.to_string()).into());
1637        }
1638
1639        let meta = self.akashic.get_meta(id)?
1640            .ok_or_else(|| AetherError::NotFound(id.to_string()))?;
1641
1642        if meta.location == StorageLocation::Hot {
1643            return Ok(());
1644        }
1645
1646        // Phase 1: 标记迁移开始
1647        self.akashic.put_migration_state(id, &MigrationState::Started {
1648            target: StorageLocation::Hot,
1649        })?;
1650
1651        // Phase 1b: 从冷层读取
1652        let vec = match self.cold.get(id).await {
1653            Ok(v) => v,
1654            Err(e) => {
1655                let _ = self.akashic.remove_migration_state(id);
1656                self.metrics.record_migration_failed();
1657                return Err(e);
1658            }
1659        };
1660
1661        // 写入热层
1662        self.hot.write_batch(vec![(id.to_string(), vec.clone())], trace_id.to_string()).await?;
1663
1664        // Phase 2: 原子提交
1665        let new_meta = MemoryMeta {
1666            location: StorageLocation::Hot,
1667            cold_cost_mb: 0.0,
1668            freq: meta.freq + 1,
1669            version: meta.version + 1,
1670            ..meta
1671        };
1672        self.akashic.commit_meta_and_clear_migration(&new_meta)?;
1673
1674        // 从星云索引移除(已在热层)
1675        self.nebula.remove_from_partitions(id);
1676
1677        self.metrics.record_migration_up();
1678        info!("[{}] ascended {} → hot", trace_id, id);
1679
1680        Ok(())
1681    }
1682
1683    pub async fn rollback_pending(&self) -> Result<usize> {
1684        let pending = self.akashic.scan_pending_migrations();
1685        let count = pending.len();
1686
1687        for id in &pending {
1688            warn!("rolling back dangling migration: {}", id);
1689            // 不知道原始状态时,保守策略是清除迁移标记,让系统自然重新调度
1690            let _ = self.akashic.remove_migration_state(id);
1691            self.metrics.record_migration_rollback();
1692        }
1693
1694        if count > 0 {
1695            info!("rolled back {} pending migrations", count);
1696        }
1697
1698        Ok(count)
1699    }
1700}
1701
1702// ═══════════════════════════════════════════════════════════════════════════════ 
1703// § 11 反馈引擎 —— 强化学习式重要性更新
1704// ═══════════════════════════════════════════════════════════════════════════════ 
1705
1706pub struct FeedbackEngine {
1707    akashic: Arc<AkashicRecords>,
1708    decay_rate: f32,
1709    metrics: Arc<Metrics>,
1710}
1711
1712impl FeedbackEngine {
1713    pub fn new(akashic: Arc<AkashicRecords>, decay_rate: f32, metrics: Arc<Metrics>) -> Self {
1714        Self { akashic, decay_rate, metrics }
1715    }
1716
1717    /// 施加外部奖励/惩罚信号(0.0惩罚,1.0奖励)
1718    pub async fn apply(&self, id: &str, reward: f32) -> Result<()> {
1719        let mut imp = self.akashic.get_importance(id)?;
1720        // EMA更新
1721        imp = (imp * 0.8 + reward * 0.2).clamp(0.0, 1.0);
1722        self.akashic.put_importance(id, imp)?;
1723
1724        if let Some(mut meta) = self.akashic.get_meta(id)? {
1725            meta.importance = imp;
1726            self.akashic.put_meta(&meta)?;
1727        }
1728
1729        self.metrics.record_feedback();
1730        Ok(())
1731    }
1732
1733    /// 全局重要性衰减(每tick调用)
1734    pub fn decay_all(&self) -> Result<()> {
1735        let entries = self.akashic.scan_all_importance();
1736        for (id, mut val) in entries {
1737            val *= self.decay_rate;
1738            if val < 0.005 {
1739                let _ = self.akashic.remove_importance(&id);
1740            } else {
1741                let _ = self.akashic.put_importance(&id, val);
1742            }
1743        }
1744        Ok(())
1745    }
1746}
1747
1748// ═══════════════════════════════════════════════════════════════════════════════ 
1749// § 12 驱逐策略 —— 多因子评分
1750// ═══════════════════════════════════════════════════════════════════════════════ 
1751
/// Weights of the multi-factor eviction score.
pub struct EvictionPolicy {
    /// Weight of the recency term.
    alpha: f32,
    /// Weight of the (logarithmic) access-frequency term.
    beta: f32,
    /// Weight of the importance term.
    gamma: f32,
    /// Weight of the `cold_cost_mb` term, which is subtracted from the score.
    delta: f32,
}
1758
1759impl EvictionPolicy {
1760    pub fn new(alpha: f32, beta: f32, gamma: f32, delta: f32) -> Self {
1761        Self { alpha, beta, gamma, delta }
1762    }
1763
1764    /// 评分越低越容易被驱逐
1765    pub fn score(&self, meta: &MemoryMeta) -> f32 {
1766        let age_sec = ((now_ms() - meta.last_access_ms).max(1) as f32) / 1000.0;
1767        let recency = -age_sec / 3600.0; // 1小时内为0,之后负增
1768        let freq = (meta.freq as f32 + 1.0).ln(); // 对数频率
1769
1770        self.alpha * recency + self.beta * freq + self.gamma * meta.importance - self.delta * meta.cold_cost_mb
1771    }
1772
1773    pub fn select_evictions(&self, metas: &[MemoryMeta], keep_count: usize) -> Vec<String> {
1774        if metas.len() <= keep_count {
1775            return Vec::new();
1776        }
1777
1778        let mut scored: Vec<(f32, String)> = metas.iter()
1779            .map(|m| (self.score(m), m.id.clone()))
1780            .collect();
1781
1782        scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(Ordering::Equal));
1783
1784        scored[keep_count..].iter().map(|(_, id)| id.clone()).collect()
1785    }
1786}
1787
/// Logistic sigmoid with steepness `k`: maps a score `x` to a probability
/// in `[0, 1]`, with `sigmoid(0, k) == 0.5`.
fn sigmoid(x: f32, k: f32) -> f32 {
    let decay = (-k * x).exp();
    (1.0 + decay).recip()
}
1792
1793// ═══════════════════════════════════════════════════════════════════════════════ 
1794// § 13 恢复队列 —— 异步节流,指数退避
1795// ═══════════════════════════════════════════════════════════════════════════════ 
1796
1797pub struct RestoreQueue {
1798    tx: mpsc::Sender<(String, u8, String)>,
1799    depth: Arc<AtomicUsize>,
1800}
1801
1802impl RestoreQueue {
1803    pub fn new(
1804        morpher: Arc<Metamorphoser>,
1805        concurrency: usize,
1806        backoff_ms: u64,
1807        max_retries: u32,
1808        metrics: Arc<Metrics>,
1809    ) -> Self {
1810        let (tx, mut rx) = mpsc::channel::<(String, u8, String)>(4096);
1811        let depth = Arc::new(AtomicUsize::new(0));
1812        let depth_clone = depth.clone();
1813        let sem = Arc::new(Semaphore::new(concurrency.max(1)));
1814
1815        tokio::spawn(async move {
1816            while let Some((id, _priority, trace_id)) = rx.recv().await {
1817                depth_clone.fetch_sub(1, AtomicOrd::SeqCst);
1818                let permit = sem.clone().acquire_owned().await.unwrap();
1819                let m = morpher.clone();
1820                let met = metrics.clone();
1821                let id2 = id.clone();
1822                let tid = trace_id.clone();
1823
1824                tokio::spawn(async move {
1825                    let mut backoff = backoff_ms;
1826                    for attempt in 0..max_retries {
1827                        match m.ascend(&id2, &tid).await {
1828                            Ok(_) => break,
1829                            Err(e) => {
1830                                warn!("[{}] restore {} failed ({}/{}): {}", tid, id2, attempt+1, max_retries, e);
1831                                met.record_migration_failed();
1832                                if attempt + 1 < max_retries {
1833                                    sleep(Duration::from_millis(backoff)).await;
1834                                    backoff = (backoff * 2).min(10_000);
1835                                } else {
1836                                    error!("[{}] restore {} permanently failed", tid, id2);
1837                                }
1838                            }
1839                        }
1840                    }
1841                    drop(permit);
1842                });
1843            }
1844        });
1845
1846        Self { tx, depth }
1847    }
1848
1849    pub async fn enqueue(&self, id: String, priority: u8, trace_id: String) -> Result<()> {
1850        self.depth.fetch_add(1, AtomicOrd::SeqCst);
1851        self.tx.send((id, priority, trace_id))
1852            .await
1853            .map_err(|e| anyhow!("restore_queue send: {}", e))
1854    }
1855
1856    pub fn queue_depth(&self) -> usize {
1857        self.depth.load(AtomicOrd::Relaxed)
1858    }
1859}
1860
1861// ═══════════════════════════════════════════════════════════════════════════════ 
1862// § 14 SLO管理器 —— 批量聚合 + 并发信号量
1863// ═══════════════════════════════════════════════════════════════════════════════ 
1864
/// Front door for hot-index searches: requests are sent to a background
/// dispatcher that groups them into short batches and bounds concurrency.
pub struct SloManager {
    /// Channel into the background dispatcher task.
    search_tx: mpsc::Sender<SearchBatch>,
    /// Semaphore limiting the number of searches executing at once.
    sem: Arc<Semaphore>,
    /// Hot-layer configuration this manager was built with.
    cfg: HotConfig,
    /// Shared metrics sink.
    metrics: Arc<Metrics>,
}
1871
1872impl SloManager {
1873    pub fn new(hot: Arc<HotIndex>, cfg: HotConfig, metrics: Arc<Metrics>) -> Self {
1874        let sem = Arc::new(Semaphore::new(cfg.max_concurrent_searches));
1875        let (tx, mut rx) = mpsc::channel::<SearchBatch>(8192);
1876
1877        let hot_clone = hot.clone();
1878        let sem_clone = sem.clone();
1879        let met_clone = metrics.clone();
1880        let win_ms = cfg.batch_window_ms;
1881        let max_batch = cfg.max_batch_size;
1882        let search_tout = cfg.search_timeout;
1883
1884        tokio::spawn(async move {
1885            loop {
1886                let first = match rx.recv().await {
1887                    Some(r) => r,
1888                    None => break,
1889                };
1890
1891                let mut batch = vec![first];
1892                // 短暂聚合窗口
1893                let deadline = tokio::time::Instant::now() + Duration::from_millis(win_ms);
1894
1895                while batch.len() < max_batch {
1896                    match tokio::time::timeout_at(deadline, rx.recv()).await {
1897                        Ok(Some(r)) => batch.push(r),
1898                        _ => break,
1899                    }
1900                }
1901
1902                for req in batch {
1903                    let permit = sem_clone.clone().acquire_owned().await.unwrap();
1904                    let h = hot_clone.clone();
1905                    let m = met_clone.clone();
1906                    let timeout_dur = search_tout;
1907
1908                    tokio::spawn(async move {
1909                        let t0 = Instant::now();
1910                        let result = tokio::time::timeout(
1911                            timeout_dur,
1912                            tokio::task::spawn_blocking(move || h.search(&req.query, req.k))
1913                        ).await;
1914
1915                        let ns = t0.elapsed().as_nanos() as u64;
1916                        m.record_search_latency(ns);
1917
1918                        let resp = match result {
1919                            Ok(Ok(Ok(r))) => {
1920                                m.record_hot_hit();
1921                                Ok(r)
1922                            }
1923                            Ok(Ok(Err(e))) => Err(anyhow!("{}", e)),
1924                            Ok(Err(join_e)) => Err(anyhow!("join: {}", join_e)),
1925                            Err(_) => {
1926                                Err(AetherError::Timeout(timeout_dur).into())
1927                            }
1928                        };
1929
1930                        let _ = req.resp.send(resp);
1931                        drop(permit);
1932                    });
1933                }
1934            }
1935        });
1936
1937        Self { search_tx: tx, sem, cfg, metrics }
1938    }
1939
1940    pub async fn search(&self, query: Vec<f32>, k: usize, priority: u8, trace_id: String) -> Result<Vec<(String, f32)>> {
1941        let (tx, rx) = oneshot::channel();
1942        self.search_tx.send(SearchBatch { query, k, resp: tx, priority, trace_id })
1943            .await
1944            .map_err(|e| anyhow!("slo_tx: {}", e))?;
1945        rx.await.map_err(|e| anyhow!("slo_rx: {}", e))?
1946    }
1947}
1948
1949// ═══════════════════════════════════════════════════════════════════════════════ 
1950// § 15 波动引擎 —— 温度驱动的概率调度
1951// ═══════════════════════════════════════════════════════════════════════════════ 
1952
/// Periodic scheduler that moves entries between the hot and cold layers,
/// driven by a decaying "temperature" and a sigmoid of each entry's score.
pub struct FluxEngine {
    /// Tuning parameters (tick interval, sampling, decay, limits).
    cfg: FluxConfig,
    /// Metadata store scanned for migration candidates.
    akashic: Arc<AkashicRecords>,
    /// Executes hot → cold migrations (`descend`).
    morpher: Arc<Metamorphoser>,
    /// Importance bookkeeping; `decay_all` is invoked every tick.
    feedback: Arc<FeedbackEngine>,
    /// Queue that receives restore requests for non-hot entries.
    restore_q: Arc<RestoreQueue>,
    /// Scoring policy built from the `cfg` weights.
    policy: EvictionPolicy,
    /// Shared metrics sink.
    metrics: Arc<Metrics>,
    /// Hot index, used for its length and its collected evictions.
    hot: Arc<HotIndex>,
    /// Current system temperature (tokio `RwLock`, supports async read/write).
    temp: Arc<tokio::sync::RwLock<f32>>,
    // Semaphore bounding concurrent migrations.
    mig_sem: Arc<Semaphore>,
}
1967
1968impl FluxEngine {
1969    pub fn new(
1970        cfg: FluxConfig,
1971        akashic: Arc<AkashicRecords>,
1972        morpher: Arc<Metamorphoser>,
1973        feedback: Arc<FeedbackEngine>,
1974        restore_q: Arc<RestoreQueue>,
1975        metrics: Arc<Metrics>,
1976        hot: Arc<HotIndex>,
1977    ) -> Self {
1978        let policy = EvictionPolicy::new(
1979            cfg.alpha_recency,
1980            cfg.beta_freq,
1981            cfg.gamma_importance,
1982            cfg.delta_cost,
1983        );
1984
1985        let init_temp = cfg.initial_temperature;
1986        let max_mig = cfg.max_concurrent_migrations.max(1);
1987
1988        Self {
1989            cfg,
1990            akashic,
1991            morpher,
1992            feedback,
1993            restore_q,
1994            policy,
1995            metrics,
1996            hot,
1997            temp: Arc::new(tokio::sync::RwLock::new(init_temp)),
1998            mig_sem: Arc::new(Semaphore::new(max_mig)),
1999        }
2000    }
2001
2002    pub fn start(self: Arc<Self>) {
2003        let interval = self.cfg.tick_interval;
2004
2005        tokio::spawn(async move {
2006            loop {
2007                let t0 = Instant::now();
2008                if let Err(e) = self.tick().await {
2009                    error!("FluxEngine tick error: {}", e);
2010                }
2011                let elapsed = t0.elapsed();
2012                if elapsed < interval {
2013                    sleep(interval - elapsed).await;
2014                }
2015            }
2016        });
2017    }
2018
2019    async fn tick(&self) -> Result<()> {
2020        // 1. 更新温度
2021        self.update_temperature().await;
2022        let temp = *self.temp.read().await;
2023
2024        // 2. 采样候选(避免全扫)
2025        let all_metas = self.akashic.scan_all_metas()?;
2026        let sample_count = ((all_metas.len() as f32 * self.cfg.sample_rate) as usize)
2027            .clamp(1, self.cfg.max_candidates);
2028
2029        // 随机采样
2030        let mut indices: Vec<usize> = (0..all_metas.len()).collect();
2031        fastrand::shuffle(&mut indices);
2032
2033        let candidates: Vec<&MemoryMeta> = indices.iter()
2034            .take(sample_count)
2035            .map(|&i| &all_metas[i])
2036            .collect();
2037
2038        // 3. 对每个候选计算概率,随机决策
2039        for meta in candidates {
2040            let score = self.policy.score(meta);
2041            let prob = sigmoid(score, self.cfg.sigmoid_k);
2042
2043            if fastrand::f32() < prob * temp.min(1.0) {
2044                let permit = match self.mig_sem.clone().try_acquire_owned() {
2045                    Ok(p) => p,
2046                    Err(_) => continue, // 并发上限已达,跳过
2047                };
2048
2049                let morpher = self.morpher.clone();
2050                let restore_q = self.restore_q.clone();
2051                let id = meta.id.clone();
2052                let loc = meta.location.clone();
2053                let trace_id = Uuid::new_v4().to_string();
2054
2055                tokio::spawn(async move {
2056                    match loc {
2057                        StorageLocation::Hot => {
2058                            let _ = morpher.descend(&id, &trace_id).await;
2059                        }
2060                        _ => {
2061                            let _ = restore_q.enqueue(id, 0, trace_id).await;
2062                        }
2063                    }
2064                    drop(permit);
2065                });
2066            }
2067        }
2068
2069        // 4. 重要性衰减
2070        self.feedback.decay_all()?;
2071
2072        // 5. 处理热层自然驱逐(LRU溢出)
2073        let evicted = self.hot.collect_evictions();
2074        for id in evicted {
2075            let morpher = self.morpher.clone();
2076            let trace_id = Uuid::new_v4().to_string();
2077            tokio::spawn(async move {
2078                let _ = morpher.descend(&id, &trace_id).await;
2079            });
2080        }
2081
2082        Ok(())
2083    }
2084
2085    async fn update_temperature(&self) {
2086        let mut t = self.temp.write().await;
2087        *t *= self.cfg.decay_rate;
2088        *t = t.max(self.cfg.min_temperature);
2089
2090        // 系统压力检测
2091        if self.detect_pressure().await {
2092            *t = (*t * self.cfg.pressure_scale).min(self.cfg.max_temperature);
2093        }
2094    }
2095
2096    async fn detect_pressure(&self) -> bool {
2097        let q_depth = self.restore_q.queue_depth();
2098        if q_depth > 100 {
2099            return true;
2100        }
2101        let hot_count = self.hot.len();
2102        if hot_count > 0 && q_depth as f64 / hot_count as f64 > 0.05 {
2103            return true;
2104        }
2105        false
2106    }
2107
2108    pub async fn set_temperature(&self, v: f32) {
2109        *self.temp.write().await = v.clamp(self.cfg.min_temperature, self.cfg.max_temperature);
2110    }
2111
2112    pub async fn get_temperature(&self) -> f32 {
2113        *self.temp.read().await
2114    }
2115}
2116
2117// ═══════════════════════════════════════════════════════════════════════════════ 
2118// § 16 记忆管理器 —— 统一入口
2119// ═══════════════════════════════════════════════════════════════════════════════ 
2120
/// Unified entry point that owns every subsystem and exposes insert, search,
/// lookup, delete, feedback and operational calls.
pub struct MemoryManager {
    /// Persistent metadata, raw vectors, importance values and partitions.
    akashic: Arc<AkashicRecords>,
    /// In-memory hot index.
    hot: Arc<HotIndex>,
    /// Cold storage, used for entries whose location is `Local` or `S3`.
    cold: Arc<ColdStore>,
    /// Partition index used for coarse candidate selection on cold fallback.
    nebula: Arc<NebulaIndex>,
    /// Performs hot ↔ cold migrations (`descend` / `ascend`).
    morpher: Arc<Metamorphoser>,
    /// Batching / concurrency-limited search front end for the hot index.
    slo: Arc<SloManager>,
    /// Background restore queue.
    restore_q: Arc<RestoreQueue>,
    /// Temperature-driven migration scheduler.
    flux: Arc<FluxEngine>,
    /// Importance feedback engine.
    feedback: Arc<FeedbackEngine>,
    /// Shared metrics sink.
    metrics: Arc<Metrics>,
    /// Hot-layer configuration (notably the vector dimension).
    hot_cfg: HotConfig,
    /// Cold-layer configuration (notably the partition count).
    cold_cfg: ColdConfig,
}
2135
2136impl MemoryManager {
    /// Builds the complete system and starts its background tasks.
    ///
    /// Steps, in order: open the metadata store at `db_path`, create the cold
    /// store, rebuild the partition index from persisted partitions, create the
    /// hot index, roll back dangling migrations, wire up feedback / restore
    /// queue / SLO manager / flux engine, preload hot vectors into memory, and
    /// finally start the flux engine.
    ///
    /// # Errors
    /// Fails if the metadata store, cold store or hot index cannot be created,
    /// if persisted partitions or metadata cannot be read, or if the rollback of
    /// pending migrations fails.
    pub async fn new(
        hot_cfg: HotConfig,
        cold_cfg: ColdConfig,
        flux_cfg: FluxConfig,
        db_path: &str,
    ) -> Result<Arc<Self>> {
        let metrics = Arc::new(Metrics::new(10_000));
        let akashic = Arc::new(AkashicRecords::open(db_path)?);
        let cold = Arc::new(ColdStore::new(cold_cfg.clone()).await?);

        // Restore the cold-layer partitions persisted by a previous run.
        let existing_parts = akashic.load_all_partitions()?;
        let nebula = Arc::new(NebulaIndex::new(
            hot_cfg.dim,
            cold_cfg.top_partitions,
            existing_parts,
        ));

        let hot = Arc::new(HotIndex::new(&hot_cfg)?);

        let morpher = Arc::new(Metamorphoser::new(
            akashic.clone(),
            cold.clone(),
            hot.clone(),
            nebula.clone(),
            metrics.clone(),
        ));

        // Roll back every dangling migration before anything else can migrate.
        let rolled = morpher.rollback_pending().await?;
        if rolled > 0 {
            warn!("rolled back {} dangling migrations on startup", rolled);
        }

        let feedback = Arc::new(FeedbackEngine::new(
            akashic.clone(),
            flux_cfg.importance_decay,
            metrics.clone(),
        ));

        let restore_q = Arc::new(RestoreQueue::new(
            morpher.clone(),
            flux_cfg.restore_concurrency,
            flux_cfg.restore_backoff_ms,
            flux_cfg.restore_max_retries,
            metrics.clone(),
        ));

        let slo = Arc::new(SloManager::new(hot.clone(), hot_cfg.clone(), metrics.clone()));

        let flux = Arc::new(FluxEngine::new(
            flux_cfg,
            akashic.clone(),
            morpher.clone(),
            feedback.clone(),
            restore_q.clone(),
            metrics.clone(),
            hot.clone(),
        ));

        // Warm-up: load vectors whose metadata says `Hot` into the in-memory hot index.
        let hot_metas: Vec<MemoryMeta> = akashic.scan_all_metas()?
            .into_iter()
            .filter(|m| m.location == StorageLocation::Hot)
            .collect();

        // Entries whose raw vector is missing or fails to load are skipped silently.
        let mut preloaded = 0usize;
        for m in hot_metas {
            if let Ok(Some(vec)) = akashic.get_raw_vector(&m.id) {
                if hot.add_sync(m.id.clone(), vec).is_ok() {
                    preloaded += 1;
                }
            }
        }

        if preloaded > 0 {
            info!("preloaded {} hot vectors from metadata store", preloaded);
        }

        let mgr = Arc::new(Self {
            akashic,
            hot,
            cold,
            nebula,
            morpher,
            slo,
            restore_q,
            flux: flux.clone(),
            feedback,
            metrics,
            hot_cfg: hot_cfg.clone(),
            cold_cfg: cold_cfg.clone(),
        });

        // Start the flux engine only after the manager is fully assembled.
        flux.start();

        Ok(mgr)
    }
2237
2238    // ── 写入 ──────────────────────────────────────────────────────────────────
2239    pub async fn insert(&self, id: String, vec: Vec<f32>) -> Result<()> {
2240        if vec.len() != self.hot_cfg.dim {
2241            return Err(AetherError::DimensionMismatch {
2242                expected: self.hot_cfg.dim,
2243                got: vec.len(),
2244            }.into());
2245        }
2246
2247        let trace_id = Uuid::new_v4().to_string();
2248        let now = now_ms();
2249
2250        // 保存原始向量(用于冷层分区构建和崩溃恢复)
2251        self.akashic.put_raw_vector(&id, &vec)?;
2252
2253        let meta = MemoryMeta {
2254            id: id.clone(),
2255            location: StorageLocation::Hot,
2256            last_access_ms: now,
2257            created_ms: now,
2258            freq: 0,
2259            importance: 0.5,
2260            cold_cost_mb: 0.0,
2261            version: 1,
2262            dimension: vec.len(),
2263        };
2264
2265        self.akashic.put_meta(&meta)?;
2266        self.akashic.put_importance(&id, 0.5)?;
2267        self.hot.write_batch(vec![(id.clone(), vec)], trace_id.clone()).await?;
2268
2269        debug!("[{}] inserted {}", trace_id, id);
2270        Ok(())
2271    }
2272
2273    pub async fn insert_batch(&self, items: Vec<(String, Vec<f32>)>) -> Result<()> {
2274        let dim = self.hot_cfg.dim;
2275        // 修复:改为 _id 消除警告
2276        for (_id, vec) in &items {
2277            if vec.len() != dim {
2278                return Err(AetherError::DimensionMismatch {
2279                    expected: dim,
2280                    got: vec.len(),
2281                }.into());
2282            }
2283        }
2284
2285        let trace_id = Uuid::new_v4().to_string();
2286        let now = now_ms();
2287
2288        for (id, vec) in &items {
2289            self.akashic.put_raw_vector(id, vec)?;
2290            let meta = MemoryMeta {
2291                id: id.clone(),
2292                location: StorageLocation::Hot,
2293                last_access_ms: now,
2294                created_ms: now,
2295                freq: 0,
2296                importance: 0.5,
2297                cold_cost_mb: 0.0,
2298                version: 1,
2299                dimension: vec.len(),
2300            };
2301            self.akashic.put_meta(&meta)?;
2302            self.akashic.put_importance(id, 0.5)?;
2303        }
2304
2305        self.hot.write_batch(items.clone(), trace_id.clone()).await?;
2306        info!("[{}] batch inserted {} items", trace_id, items.len());
2307        Ok(())
2308    }
2309
2310    // ── 检索 ──────────────────────────────────────────────────────────────────
2311    /// 搜索:热层优先,未命中时冷层粗筛+异步恢复
2312    pub async fn search(&self, query: Vec<f32>, k: usize) -> Result<Vec<SearchResult>> {
2313        let trace_id = Uuid::new_v4().to_string();
2314        let t0 = Instant::now();
2315
2316        match self.slo.search(query.clone(), k, 0, trace_id.clone()).await {
2317            Ok(results) if !results.is_empty() => {
2318                // 更新访问时间
2319                let now = now_ms();
2320                for (id, _) in &results {
2321                    let _ = self.akashic.update_access(id, now);
2322                    self.hot.touch(id);
2323                }
2324                let latency = t0.elapsed();
2325                return Ok(results.into_iter().map(|(id, dist)| SearchResult {
2326                    id,
2327                    distance: dist,
2328                    from_hot: true,
2329                    latency,
2330                }).collect());
2331            }
2332            _ => {}
2333        }
2334
2335        self.metrics.record_hot_miss();
2336        self.metrics.record_cold_fallback();
2337
2338        // 冷层粗筛
2339        let candidates = self.nebula.coarse_candidates(&query);
2340        if candidates.is_empty() {
2341            return Ok(Vec::new());
2342        }
2343
2344        // 并行计算冷层候选距离
2345        let akashic = self.akashic.clone();
2346        let q2 = query.clone();
2347
2348        let mut cold_results: Vec<(String, f32)> = tokio::task::spawn_blocking(move || {
2349            candidates.iter()
2350                .filter_map(|id| {
2351                    akashic.get_raw_vector(id).ok()?.map(|v| (id.clone(), l2_distance(&q2, &v)))
2352                })
2353                .collect::<Vec<_>>()
2354        }).await.unwrap_or_default();
2355
2356        cold_results.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(Ordering::Equal));
2357        cold_results.truncate(k);
2358
2359        // 异步恢复top结果到热层
2360        for (id, _) in cold_results.iter().take(3) {
2361            let _ = self.restore_q.enqueue(id.clone(), 1, Uuid::new_v4().to_string()).await;
2362            self.metrics.record_restore_enqueued();
2363        }
2364
2365        let latency = t0.elapsed();
2366        Ok(cold_results.into_iter().map(|(id, dist)| SearchResult {
2367            id,
2368            distance: dist,
2369            from_hot: false,
2370            latency,
2371        }).collect())
2372    }
2373
2374    /// 按ID精确获取
2375    pub async fn get_by_id(&self, id: &str) -> Result<Vec<f32>> {
2376        if let Some(vec) = self.hot.get(id) {
2377            let _ = self.akashic.update_access(id, now_ms());
2378            return Ok(vec);
2379        }
2380
2381        if let Some(meta) = self.akashic.get_meta(id)? {
2382            let vec = match &meta.location {
2383                StorageLocation::Local(_) | StorageLocation::S3(_) => {
2384                    self.cold.get(id).await?
2385                }
2386                StorageLocation::Hot => {
2387                    return Err(AetherError::Inconsistency(
2388                        format!("meta says hot but not in hot index: {}", id)
2389                    ).into());
2390                }
2391            };
2392            let _ = self.akashic.update_access(id, now_ms());
2393            return Ok(vec);
2394        }
2395
2396        Err(AetherError::NotFound(id.to_string()).into())
2397    }
2398
2399    // ── 删除 ──────────────────────────────────────────────────────────────────
2400    pub async fn delete(&self, id: &str) -> Result<()> {
2401        let trace_id = Uuid::new_v4().to_string();
2402
2403        if let Some(meta) = self.akashic.get_meta(id)? {
2404            match &meta.location {
2405                StorageLocation::Hot => {
2406                    self.hot.remove(id);
2407                }
2408                StorageLocation::Local(_) | StorageLocation::S3(_) => {
2409                    self.cold.delete(id).await?;
2410                    self.nebula.remove_from_partitions(id);
2411                }
2412            }
2413        }
2414
2415        self.akashic.remove_meta(id)?;
2416        self.akashic.remove_raw_vector(id)?;
2417        self.akashic.remove_importance(id)?;
2418
2419        info!("[{}] deleted {}", trace_id, id);
2420        Ok(())
2421    }
2422
2423    // ── 反馈 ──────────────────────────────────────────────────────────────────
2424    pub async fn apply_feedback(&self, id: &str, reward: f32) -> Result<()> {
2425        self.feedback.apply(id, reward).await
2426    }
2427
2428    // ── 运维 ──────────────────────────────────────────────────────────────────
2429    pub async fn health(&self) -> HealthStatus {
2430        let all = self.akashic.scan_all_metas().unwrap_or_default();
2431        let hot = all.iter().filter(|m| m.location == StorageLocation::Hot).count();
2432        let cold = all.len() - hot;
2433
2434        HealthStatus {
2435            healthy: true,
2436            hot_items: hot,
2437            cold_items: cold,
2438            total_items: all.len(),
2439            temperature: self.flux.get_temperature().await,
2440            restore_queue_depth: self.restore_q.queue_depth(),
2441            pending_migrations: self.akashic.scan_pending_migrations().len(),
2442            hot_hit_rate: self.metrics.hit_rate(),
2443            avg_search_ms: self.metrics.avg_latency_ms(),
2444            p99_search_ms: self.metrics.p99_latency_ms(),
2445        }
2446    }
2447
2448    pub async fn stats(&self) -> SystemStats {
2449        let metas = self.akashic.scan_all_metas().unwrap_or_default();
2450        let hot_count = metas.iter().filter(|m| m.location == StorageLocation::Hot).count();
2451        let avg_imp = if metas.is_empty() {
2452            0.0
2453        } else {
2454            metas.iter().map(|m| m.importance).sum::<f32>() / metas.len() as f32
2455        };
2456        let avg_freq = if metas.is_empty() {
2457            0
2458        } else {
2459            metas.iter().map(|m| m.freq).sum::<u64>() / metas.len() as u64
2460        };
2461
2462        SystemStats {
2463            total_items: metas.len(),
2464            hot_items: hot_count,
2465            cold_items: metas.len() - hot_count,
2466            avg_importance: avg_imp,
2467            avg_freq,
2468            metrics_snapshot: self.metrics.snapshot(),
2469        }
2470    }
2471
2472    /// 手动触发冷层分区重建
2473    pub async fn rebuild_nebula_index(&self) -> Result<()> {
2474        let cold_metas: Vec<MemoryMeta> = self.akashic.scan_all_metas()?
2475            .into_iter()
2476            .filter(|m| matches!(m.location, StorageLocation::Local(_) | StorageLocation::S3(_)))
2477            .collect();
2478
2479        let akashic = self.akashic.clone();
2480        let dim = self.hot_cfg.dim;
2481        let k = self.cold_cfg.partition_count;
2482
2483        let vectors: Vec<(String, Vec<f32>)> = tokio::task::spawn_blocking(move || {
2484            cold_metas.iter()
2485                .filter_map(|m| akashic.get_raw_vector(&m.id).ok()?.map(|v| (m.id.clone(), v)))
2486                .filter(|(_, v)| v.len() == dim)
2487                .collect()
2488        }).await.unwrap_or_default();
2489
2490        if vectors.is_empty() {
2491            return Ok(());
2492        }
2493
2494        self.nebula.build(&vectors, k);
2495
2496        // 持久化分区
2497        self.akashic.clear_all_partitions()?;
2498        for part in self.nebula.snapshot() {
2499            self.akashic.put_partition(&part)?;
2500        }
2501
2502        info!("rebuilt nebula index: {} partitions over {} cold vectors", self.nebula.partition_count(), vectors.len());
2503        Ok(())
2504    }
2505
2506    pub async fn manual_descend(&self, id: &str) -> Result<()> {
2507        self.morpher.descend(id, &Uuid::new_v4().to_string()).await
2508    }
2509
2510    pub async fn manual_ascend(&self, id: &str) -> Result<()> {
2511        self.morpher.ascend(id, &Uuid::new_v4().to_string()).await
2512    }
2513
2514    pub async fn set_temperature(&self, v: f32) {
2515        self.flux.set_temperature(v).await;
2516    }
2517
2518    pub async fn get_temperature(&self) -> f32 {
2519        self.flux.get_temperature().await
2520    }
2521
2522    pub fn metrics(&self) -> &Metrics {
2523        &self.metrics
2524    }
2525}
2526
2527// ═══════════════════════════════════════════════════════════════════════════════ 
2528// § 17 测试
2529// ═══════════════════════════════════════════════════════════════════════════════ 
2530
2531#[cfg(test)]
2532mod tests {
2533    use super::*;
2534
2535    #[test]
2536    fn test_vector_codec_roundtrip() {
2537        let v = vec![1.0f32, -2.5, 0.0, 3.14, -0.001];
2538        let enc = VectorCodec::encode(&v);
2539        let dec = VectorCodec::decode(&enc).unwrap();
2540        assert_eq!(v.len(), dec.len());
2541        for (a, b) in v.iter().zip(dec.iter()) {
2542            assert!((a - b).abs() < 1e-6, "codec mismatch: {} vs {}", a, b);
2543        }
2544    }
2545
2546    #[test]
2547    fn test_vector_codec_compressed() {
2548        let v = vec![0.1f32; 512];
2549        let enc = VectorCodec::encode(&v);
2550        let cmp = VectorCodec::compress(&enc, 3).unwrap();
2551        let dcm = VectorCodec::decompress(&cmp).unwrap();
2552        assert_eq!(enc, dcm);
2553    }
2554
2555    #[test]
2556    fn test_l2_distance() {
2557        let a = vec![1.0f32, 0.0, 0.0];
2558        let b = vec![0.0f32, 1.0, 0.0];
2559        let d = l2_distance(&a, &b);
2560        assert!((d - 2.0f32.sqrt()).abs() < 1e-5);
2561    }
2562
2563    #[test]
2564    fn test_cosine_sim() {
2565        let a = vec![1.0f32, 0.0, 0.0];
2566        let b = vec![1.0f32, 0.0, 0.0];
2567        assert!((cosine_sim(&a, &b) - 1.0).abs() < 1e-5);
2568        let c = vec![-1.0f32, 0.0, 0.0];
2569        assert!((cosine_sim(&a, &c) + 1.0).abs() < 1e-5);
2570    }
2571
2572    #[test]
2573    fn test_metrics_p99() {
2574        let m = Metrics::new(1000);
2575        for i in 1..=100u64 {
2576            m.record_search_latency(i * 1_000_000); // 1ms, 2ms, ..., 100ms
2577        }
2578        let p99 = m.p99_latency_ms();
2579        // p99 should be near 99ms
2580        assert!(p99 >= 95.0 && p99 <= 100.0, "p99={}", p99);
2581    }
2582
2583    #[test]
2584    fn test_eviction_policy_ordering() {
2585        let policy = EvictionPolicy::new(0.6, 0.3, 1.0, 0.5);
2586        let now = now_ms();
2587
2588        // 最近访问、高频、高重要性 → 高分 → 不被驱逐
2589        let hot_meta = MemoryMeta {
2590            id: "hot".into(),
2591            location: StorageLocation::Hot,
2592            last_access_ms: now,
2593            created_ms: now - 1000,
2594            freq: 100,
2595            importance: 0.9,
2596            cold_cost_mb: 0.1,
2597            version: 1,
2598            dimension: 768,
2599        };
2600
2601        // 很久未访问、低频、低重要性 → 低分 → 应被驱逐
2602        let cold_meta = MemoryMeta {
2603            id: "cold".into(),
2604            location: StorageLocation::Hot,
2605            last_access_ms: now - 7_200_000,
2606            created_ms: now - 10_000_000,
2607            freq: 1,
2608            importance: 0.1,
2609            cold_cost_mb: 0.1,
2610            version: 1,
2611            dimension: 768,
2612        };
2613
2614        let s_hot = policy.score(&hot_meta);
2615        let s_cold = policy.score(&cold_meta);
2616        assert!(s_hot > s_cold, "hot={} cold={}", s_hot, s_cold);
2617
2618        let evictions = policy.select_evictions(&[hot_meta, cold_meta], 1);
2619        assert_eq!(evictions, vec!["cold"]);
2620    }
2621
2622    #[test]
2623    fn test_hot_index_basic() {
2624        let cfg = HotConfig {
2625            dim: 4,
2626            max_items: 10,
2627            shard_count: 2,
2628            ..Default::default()
2629        };
2630        let idx = HotIndex::new(&cfg).unwrap();
2631
2632        idx.add_sync("a".into(), vec![1.0, 0.0, 0.0, 0.0]).unwrap();
2633        idx.add_sync("b".into(), vec![0.0, 1.0, 0.0, 0.0]).unwrap();
2634        idx.add_sync("c".into(), vec![0.0, 0.0, 1.0, 0.0]).unwrap();
2635
2636        assert!(idx.contains("a"));
2637        assert_eq!(idx.get("b").unwrap(), vec![0.0, 1.0, 0.0, 0.0]);
2638
2639        let results = idx.search(&[1.0, 0.0, 0.0, 0.0], 2).unwrap();
2640        assert_eq!(results.len(), 2);
2641        assert_eq!(results[0].0, "a");
2642
2643        idx.remove("b");
2644        assert!(!idx.contains("b"));
2645    }
2646
2647    #[test]
2648    fn test_nebula_index_build_and_search() {
2649        let vectors: Vec<(String, Vec<f32>)> = (0..50).map(|i| {
2650            let x = (i as f32) / 50.0;
2651            (format!("id{}", i), vec![x, 1.0 - x, x * 0.5, (1.0 - x) * 0.5])
2652        }).collect();
2653
2654        let nebula = NebulaIndex::new(4, 3, Vec::new());
2655        nebula.build(&vectors, 5);
2656        assert_eq!(nebula.partition_count(), 5);
2657
2658        let query = vec![0.0f32, 1.0, 0.0, 0.5];
2659        let candidates = nebula.coarse_candidates(&query);
2660        assert!(!candidates.is_empty(), "coarse search returned nothing");
2661    }
2662
2663    #[test]
2664    fn test_akashic_records() {
2665        let dir = format!("/tmp/aether_test_{}", fastrand::u32(..));
2666        let ak = AkashicRecords::open(&dir).unwrap();
2667
2668        let meta = MemoryMeta {
2669            id: "test_id".into(),
2670            location: StorageLocation::Hot,
2671            last_access_ms: 1000,
2672            created_ms: 500,
2673            freq: 3,
2674            importance: 0.7,
2675            cold_cost_mb: 0.0,
2676            version: 1,
2677            dimension: 128,
2678        };
2679
2680        ak.put_meta(&meta).unwrap();
2681        let got = ak.get_meta("test_id").unwrap().unwrap();
2682        assert_eq!(got.id, "test_id");
2683        assert_eq!(got.freq, 3);
2684
2685        ak.put_importance("test_id", 0.8).unwrap();
2686        let imp = ak.get_importance("test_id").unwrap();
2687        assert!((imp - 0.8).abs() < 1e-5, "importance: {}", imp);
2688
2689        ak.remove_meta("test_id").unwrap();
2690        assert!(ak.get_meta("test_id").unwrap().is_none());
2691
2692        let _ = std::fs::remove_dir_all(&dir);
2693    }
2694
2695    #[tokio::test]
2696    async fn test_memory_manager_end_to_end() {
2697        let db_dir = format!("/tmp/aether_mgr_{}", fastrand::u32(..));
2698        let cold_dir = format!("/tmp/aether_cold_{}", fastrand::u32(..));
2699
2700        let hot_cfg = HotConfig {
2701            dim: 4,
2702            max_items: 100,
2703            shard_count: 2,
2704            ..Default::default()
2705        };
2706        let cold_cfg = ColdConfig {
2707            local_dir: cold_dir.clone(),
2708            ..Default::default()
2709        };
2710        let flux_cfg = FluxConfig {
2711            tick_interval: Duration::from_secs(3600),
2712            ..Default::default()
2713        };
2714
2715        let mgr = MemoryManager::new(hot_cfg, cold_cfg, flux_cfg, &db_dir).await.unwrap();
2716
2717        mgr.insert("v1".into(), vec![1.0, 0.0, 0.0, 0.0]).await.unwrap();
2718        mgr.insert("v2".into(), vec![0.0, 1.0, 0.0, 0.0]).await.unwrap();
2719        mgr.insert("v3".into(), vec![0.0, 0.0, 1.0, 0.0]).await.unwrap();
2720
2721        let results = mgr.search(vec![1.0, 0.0, 0.0, 0.0], 2).await.unwrap();
2722        assert!(!results.is_empty());
2723        assert_eq!(results[0].id, "v1");
2724        assert!(results[0].from_hot);
2725
2726        let vec = mgr.get_by_id("v2").await.unwrap();
2727        assert_eq!(vec, vec![0.0, 1.0, 0.0, 0.0]);
2728
2729        mgr.apply_feedback("v1", 1.0).await.unwrap();
2730        let imp = mgr.akashic.get_importance("v1").unwrap();
2731        assert!(imp > 0.5, "importance should increase: {}", imp);
2732
2733        let health = mgr.health().await;
2734        assert!(health.healthy);
2735        assert_eq!(health.hot_items, 3);
2736
2737        mgr.delete("v2").await.unwrap();
2738        assert!(mgr.get_by_id("v2").await.is_err());
2739
2740        let h2 = mgr.health().await;
2741        assert_eq!(h2.total_items, 2);
2742
2743        let _ = std::fs::remove_dir_all(&db_dir);
2744        let _ = std::fs::remove_dir_all(&cold_dir);
2745    }
2746}
2747
2748// ═══════════════════════════════════════════════════════════════════════════════ 
2749// § 18 主程序入口示例
2750// ═══════════════════════════════════════════════════════════════════════════════ 
2751
2752#[tokio::main]
2753async fn main() -> Result<()> {
2754    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
2755        .format_timestamp_millis()
2756        .init();
2757
2758    info!("AetherMemory 启动中...");
2759
2760    let hot_cfg = HotConfig {
2761        dim: 1536,
2762        max_items: 200_000,
2763        shard_count: 16,
2764        search_timeout: Duration::from_millis(80),
2765        max_concurrent_searches: 256,
2766        batch_window_ms: 5,
2767        max_batch_size: 64,
2768    };
2769
2770    let cold_cfg = ColdConfig {
2771        local_dir: "./aether_cold".into(),
2772        s3_bucket: std::env::var("AETHER_S3_BUCKET").ok(),
2773        s3_prefix: Some("aether".into()),
2774        s3_region: std::env::var("AWS_REGION").ok().or(Some("us-east-1".into())),
2775        compress_level: 3,
2776        partition_count: 512,
2777        top_partitions: 16,
2778    };
2779
2780    let flux_cfg = FluxConfig {
2781        initial_temperature: 0.3,
2782        min_temperature: 0.005,
2783        max_temperature: 8.0,
2784        decay_rate: 0.997,
2785        pressure_scale: 2.5,
2786        tick_interval: Duration::from_secs(10),
2787        sample_rate: 0.01,
2788        max_candidates: 512,
2789        alpha_recency: 0.55,
2790        beta_freq: 0.25,
2791        gamma_importance: 1.0,
2792        delta_cost: 0.4,
2793        sigmoid_k: 1.2,
2794        importance_decay: 0.998,
2795        max_concurrent_migrations: 16,
2796        restore_concurrency: 8,
2797        restore_backoff_ms: 100,
2798        restore_max_retries: 5,
2799    };
2800
2801    let mgr = MemoryManager::new(hot_cfg, cold_cfg, flux_cfg, "./aether_meta").await?;
2802
2803    info!("系统就绪,热层维度 = 1536");
2804
2805    // 演示插入
2806    let probe = vec![0.1f32; 1536];
2807    mgr.insert("probe_0001".into(), probe.clone()).await?;
2808    info!("插入 probe_0001");
2809
2810    // 演示检索
2811    let results = mgr.search(probe, 5).await?;
2812    info!("检索返回 {} 条结果,首条ID={:?}, 来自热层={}", 
2813          results.len(), 
2814          results.first().map(|r| &r.id),
2815          results.first().map(|r| r.from_hot).unwrap_or(false));
2816
2817    // 演示反馈
2818    mgr.apply_feedback("probe_0001", 0.9).await?;
2819    info!("施加正反馈到 probe_0001");
2820
2821    // 健康报告
2822    let h = mgr.health().await;
2823    info!("健康报告: hot={}, cold={}, temp={:.3}, hit_rate={:.2}%, p99_ms={:.2}",
2824          h.hot_items, h.cold_items, h.temperature, h.hot_hit_rate * 100.0, h.p99_search_ms);
2825
2826    // 输出Prometheus指标
2827    info!("指标:\n{}", mgr.metrics().to_prometheus());
2828
2829    // 等待退出信号
2830    tokio::signal::ctrl_c().await?;
2831    info!("AetherMemory 关闭。");
2832
2833    Ok(())
2834}
2835
2836// ═══════════════════════════════════════════════════════════════════════════════ 
2837// Cargo.toml 依赖(粘贴到 [dependencies] 节)
2838//
2839// tokio = { version = "1.35", features = ["rt-multi-thread","macros","time","sync","signal"] }
2840// anyhow = "1.0"
2841// thiserror = "1.0"
2842// serde = { version = "1.0", features = ["derive"] }
2843// serde_json = "1.0"
2844// bytes = "1.5"
2845// byteorder = "1.5"
2846// parking_lot = "0.12"
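// sled = "0.34"
// rusqlite = { version = "0.31", features = ["bundled"] }  # imported by the § 0 `mod sled` shim
// zstd = "0.13"
// lru = "0.12"
// rayon = "1.9"
// rand = "0.8"
// fastrand = "2"  # called directly in this file (e.g. fastrand::u32 in the tests)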
2852// uuid = { version = "1.6", features = ["v4"] }
2853// log = "0.4"
2854// env_logger = "0.11"
2855// async-trait = "0.1"
2856//
2857// 可选S3后端(实现 S3Backend trait 后注入):
2858// aws-config = { version = "1.8", features = ["behavior-version-latest"] }
2859// aws-sdk-s3 = "1.128"
2860// ═══════════════════════════════════════════════════════════════════════════════