#![allow(dead_code)]
#![allow(unused_imports)]
mod sled {
use rusqlite::{params, Connection};
use std::sync::{Arc, Mutex};
#[derive(Debug, Clone)]
pub struct Error(pub String);
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "kv error: {}", self.0)
}
}
impl std::error::Error for Error {}
pub type IVec = Vec<u8>;
#[derive(Clone)]
pub struct Db(Arc<Mutex<Connection>>);
impl std::fmt::Debug for Db {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Db").finish()
}
}
impl Db {
fn conn(&self) -> std::sync::MutexGuard<'_, Connection> {
self.0.lock().unwrap()
}
pub fn get(&self, key: &[u8]) -> Result<Option<IVec>, Error> {
let conn = self.conn();
let mut stmt = conn.prepare_cached(
"SELECT val FROM kv WHERE key = ?1"
).map_err(|e| Error(e.to_string()))?;
let mut rows = stmt.query(params![key])
.map_err(|e| Error(e.to_string()))?;
match rows.next().map_err(|e| Error(e.to_string()))? {
Some(row) => {
let v: Vec<u8> = row.get(0).map_err(|e| Error(e.to_string()))?;
Ok(Some(v))
}
None => Ok(None),
}
}
pub fn insert(
&self,
key: impl AsRef<[u8]>,
value: impl Into<Vec<u8>>,
) -> Result<Option<IVec>, Error> {
let k = key.as_ref().to_vec();
let v: Vec<u8> = value.into();
let old = self.get(&k)?;
self.conn()
.execute(
"INSERT OR REPLACE INTO kv (key, val) VALUES (?1, ?2)",
params![k, v],
)
.map_err(|e| Error(e.to_string()))?;
Ok(old)
}
pub fn remove(&self, key: impl AsRef<[u8]>) -> Result<Option<IVec>, Error> {
let k = key.as_ref().to_vec();
let old = self.get(&k)?;
self.conn()
.execute("DELETE FROM kv WHERE key = ?1", params![k])
.map_err(|e| Error(e.to_string()))?;
Ok(old)
}
pub fn flush(&self) -> Result<usize, Error> {
Ok(0) }
pub fn scan_prefix(&self, prefix: &[u8]) -> ScanIter {
let conn = self.conn();
let mut end = prefix.to_vec();
let pairs: Vec<(IVec, IVec)> = {
let mut stmt = conn.prepare(
"SELECT key, val FROM kv WHERE key >= ?1 ORDER BY key"
).unwrap_or_else(|_| conn.prepare("SELECT key, val FROM kv").unwrap());
let p = prefix.to_vec();
stmt.query_map(params![p], |row| {
let k: Vec<u8> = row.get(0)?;
let v: Vec<u8> = row.get(1)?;
Ok((k, v))
})
.unwrap_or_else(|_| {
panic!("scan_prefix query failed")
})
.filter_map(|r| r.ok())
.filter(|(k, _)| k.starts_with(prefix))
.collect()
};
ScanIter { data: pairs, pos: 0 }
}
pub fn apply_batch(&self, batch: Batch) -> Result<(), Error> {
let conn = self.conn();
conn.execute_batch("BEGIN").map_err(|e| Error(e.to_string()))?;
for op in batch.ops {
match op {
BatchOp::Insert(k, v) => {
conn.execute(
"INSERT OR REPLACE INTO kv (key, val) VALUES (?1, ?2)",
params![k, v],
).map_err(|e| Error(e.to_string()))?;
}
BatchOp::Remove(k) => {
conn.execute(
"DELETE FROM kv WHERE key = ?1",
params![k],
).map_err(|e| Error(e.to_string()))?;
}
}
}
conn.execute_batch("COMMIT").map_err(|e| Error(e.to_string()))?;
Ok(())
}
pub fn clear(&self) -> Result<(), Error> {
self.conn()
.execute("DELETE FROM kv", [])
.map_err(|e| Error(e.to_string()))?;
Ok(())
}
}
enum BatchOp { Insert(Vec<u8>, Vec<u8>), Remove(Vec<u8>) }
pub struct Batch { pub ops: Vec<BatchOp> }
impl Default for Batch {
fn default() -> Self { Self { ops: Vec::new() } }
}
impl Batch {
pub fn insert(&mut self, key: impl AsRef<[u8]>, value: impl Into<Vec<u8>>) {
self.ops.push(BatchOp::Insert(key.as_ref().to_vec(), value.into()));
}
pub fn remove(&mut self, key: impl AsRef<[u8]>) {
self.ops.push(BatchOp::Remove(key.as_ref().to_vec()));
}
}
pub struct ScanIter { data: Vec<(IVec, IVec)>, pos: usize }
impl Iterator for ScanIter {
type Item = Result<(IVec, IVec), Error>;
fn next(&mut self) -> Option<Self::Item> {
if self.pos >= self.data.len() { return None; }
let pair = self.data[self.pos].clone();
self.pos += 1;
Some(Ok(pair))
}
}
pub fn open<P: AsRef<std::path::Path>>(_path: P) -> Result<Db, Error> {
let conn = Connection::open_in_memory()
.map_err(|e| Error(e.to_string()))?;
conn.execute_batch(
"PRAGMA journal_mode = WAL;
CREATE TABLE IF NOT EXISTS kv (key BLOB PRIMARY KEY, val BLOB NOT NULL);"
).map_err(|e| Error(e.to_string()))?;
Ok(Db(Arc::new(Mutex::new(conn))))
}
}
/// Small least-recently-used cache (API-compatible subset of the `lru` crate).
///
/// Recency is tracked with a monotonically increasing stamp. `map` stores each value
/// with the stamp of its last use, and `order` maps stamps back to keys, so the smallest
/// stamp is the LRU entry. Lookups, updates and evictions are O(log n), not the O(n)
/// scans of a list-based order.
mod lru {
    use std::collections::{BTreeMap, HashMap};
    use std::hash::Hash;
    use std::num::NonZeroUsize;

    pub struct LruCache<K: Hash + Eq + Clone, V> {
        cap: usize,
        /// key -> (value, stamp of last use)
        map: HashMap<K, (V, u64)>,
        /// stamp -> key; ascending stamps run from least to most recently used.
        order: BTreeMap<u64, K>,
        /// Last stamp handed out.
        tick: u64,
    }

    impl<K: Hash + Eq + Clone, V> LruCache<K, V> {
        /// Creates an empty cache that holds at most `cap` entries.
        pub fn new(cap: NonZeroUsize) -> Self {
            Self {
                cap: cap.get(),
                map: HashMap::new(),
                order: BTreeMap::new(),
                tick: 0,
            }
        }

        /// Marks `k` as most recently used; no-op if absent.
        fn touch(&mut self, k: &K) {
            if let Some(entry) = self.map.get_mut(k) {
                self.tick += 1;
                self.order.remove(&entry.1);
                entry.1 = self.tick;
                self.order.insert(self.tick, k.clone());
            }
        }

        /// Removes and returns the least recently used entry.
        fn pop_oldest(&mut self) -> Option<(K, V)> {
            let stamp = match self.order.keys().next() {
                Some(&s) => s,
                None => return None,
            };
            let key = self.order.remove(&stamp)?;
            let (value, _) = self.map.remove(&key)?;
            Some((key, value))
        }

        /// Evicts LRU entries until the size is within capacity.
        fn evict_if_needed(&mut self) {
            while self.map.len() > self.cap {
                // Guard against looping forever should `map` and `order` ever disagree.
                if self.pop_oldest().is_none() {
                    break;
                }
            }
        }

        /// Inserts or replaces `k`, marking it most recently used; returns the old value.
        ///
        /// May evict the least recently used entry to stay within capacity.
        pub fn put(&mut self, k: K, v: V) -> Option<V> {
            self.tick += 1;
            let stamp = self.tick;
            let previous = self.map.insert(k.clone(), (v, stamp));
            let old_value = match previous {
                Some((old_v, old_stamp)) => {
                    self.order.remove(&old_stamp);
                    Some(old_v)
                }
                None => None,
            };
            self.order.insert(stamp, k);
            self.evict_if_needed();
            old_value
        }

        /// Returns the value for `k`, marking it most recently used.
        pub fn get(&mut self, k: &K) -> Option<&V> {
            self.touch(k);
            self.map.get(k).map(|entry| &entry.0)
        }

        /// Returns the value for `k` without changing its recency.
        pub fn peek(&self, k: &K) -> Option<&V> {
            self.map.get(k).map(|entry| &entry.0)
        }

        /// Removes `k` and returns its value.
        pub fn pop(&mut self, k: &K) -> Option<V> {
            let (value, stamp) = self.map.remove(k)?;
            self.order.remove(&stamp);
            Some(value)
        }

        /// Removes and returns the least recently used entry.
        pub fn pop_lru(&mut self) -> Option<(K, V)> {
            self.pop_oldest()
        }

        pub fn len(&self) -> usize {
            self.map.len()
        }

        pub fn is_empty(&self) -> bool {
            self.map.is_empty()
        }

        /// Whether `k` is present (does not change recency).
        pub fn contains(&self, k: &K) -> bool {
            self.map.contains_key(k)
        }
    }
}
use lru::LruCache;
use fastrand;
use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::io::{Cursor, Read, Write as IoWrite};
use std::num::NonZeroUsize;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering as AtomicOrd};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use anyhow::{anyhow, Context, Result};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use bytes::{BufMut, Bytes, BytesMut};
use log::{debug, error, info, warn};
use parking_lot::{Mutex, RwLock};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::fs;
use tokio::sync::{mpsc, oneshot, Semaphore};
use tokio::time::{sleep, timeout};
use uuid::Uuid;
/// Unified error type for the storage engine.
///
/// The `#[error(...)]` display strings are runtime/user-facing text and are kept as-is
/// (Chinese); each variant's English meaning is given in its doc comment.
#[derive(Debug, Error)]
pub enum AetherError {
/// I/O failure ("IO error"); converted automatically from `std::io::Error` via `?`.
#[error("IO错误: {0}")]
Io(#[from] std::io::Error),
/// Metadata key-value store failure ("metadata store error"); produced from
/// `sled::Error` by the `From` impl that follows this enum.
#[error("元数据存储错误: {0}")]
Meta(String),
/// Encoding/decoding failure, e.g. JSON or vector wire format ("serialization error").
#[error("序列化错误: {0}")]
Serialization(String),
/// Remote object-store failure ("S3 error").
#[error("S3错误: {0}")]
S3(String),
/// An operation exceeded its time budget ("operation timed out"); carries the budget.
#[error("操作超时: {0:?}")]
Timeout(Duration),
/// No memory with the given id exists ("memory not found").
#[error("记忆未找到: {0}")]
NotFound(String),
/// A vector had the wrong length ("dimension mismatch: expected …, got …").
#[error("维度不匹配: 期望 {expected}, 实际 {got}")]
DimensionMismatch { expected: usize, got: usize },
/// Hot tier is over its limit ("capacity exceeded: hot=…, max=…").
#[error("容量超限: 热层={hot}, 最大={max}")]
CapacityExceeded { hot: usize, max: usize },
/// A migration record already exists for the id ("migration in progress").
#[error("迁移进行中: {0}")]
MigrationInProgress(String),
/// zstd compression/decompression failure ("compression error").
#[error("压缩错误: {0}")]
Compression(String),
/// Internal bookkeeping disagrees with itself ("internal state inconsistent").
#[error("内部状态不一致: {0}")]
Inconsistency(String),
}
impl From<sled::Error> for AetherError {
fn from(e: sled::Error) -> Self {
AetherError::Meta(e.to_string())
}
}
/// Tuning knobs for the in-memory hot tier.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct HotConfig {
    /// Length every stored and queried vector must have.
    pub dim: usize,
    /// Target vector count for the whole hot tier; split evenly across shards.
    pub max_items: usize,
    /// NOTE(review): not read in the code visible here; presumably the hot search budget.
    pub search_timeout: Duration,
    /// Number of independent shards (ids are routed by FNV-1a hash).
    pub shard_count: usize,
    /// NOTE(review): not read in the code visible here; presumably a search concurrency cap.
    pub max_concurrent_searches: usize,
    /// NOTE(review): not read in the code visible here; presumably a batching window.
    pub batch_window_ms: u64,
    /// NOTE(review): not read in the code visible here; presumably a batch size cap.
    pub max_batch_size: usize,
}

impl Default for HotConfig {
    fn default() -> Self {
        let search_timeout = Duration::from_millis(100);
        HotConfig {
            dim: 768,
            max_items: 100_000,
            shard_count: 8,
            search_timeout,
            max_concurrent_searches: 128,
            max_batch_size: 64,
            batch_window_ms: 5,
        }
    }
}
/// Settings for the cold tier: zstd-compressed vector files on local disk, optionally
/// mirrored to S3.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ColdConfig {
    /// Root directory for locally stored `.bin.zst` files.
    pub local_dir: String,
    /// Bucket for remote copies; S3 is used only when this is set and a backend is attached.
    pub s3_bucket: Option<String>,
    /// Optional object-key prefix (a trailing `/` is ignored).
    pub s3_prefix: Option<String>,
    /// NOTE(review): not read in the code visible here; presumably the S3 client region.
    pub s3_region: Option<String>,
    /// zstd level used when writing cold vectors.
    pub compress_level: i32,
    /// NOTE(review): not read in the code visible here; presumably the k-means partition count.
    pub partition_count: usize,
    /// NOTE(review): not read in the code visible here; presumably partitions probed per search.
    pub top_partitions: usize,
}

impl Default for ColdConfig {
    fn default() -> Self {
        let region = String::from("us-east-1");
        ColdConfig {
            local_dir: String::from("./aether_cold"),
            compress_level: 3,
            partition_count: 256,
            top_partitions: 8,
            s3_bucket: None,
            s3_prefix: None,
            s3_region: Some(region),
        }
    }
}
/// Parameters for the hot/cold tiering controller.
///
/// NOTE(review): no field is read in the part of the file visible here. The grouping
/// comments below are inferred from field names; confirm them against the consumer.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FluxConfig {
    // -- temperature schedule (presumably) --
    pub initial_temperature: f32,
    pub min_temperature: f32,
    pub max_temperature: f32,
    pub decay_rate: f32,
    pub pressure_scale: f32,
    pub tick_interval: Duration,
    // -- candidate sampling (presumably) --
    pub sample_rate: f32,
    pub max_candidates: usize,
    // -- scoring weights (presumably) --
    pub alpha_recency: f32,
    pub beta_freq: f32,
    pub gamma_importance: f32,
    pub delta_cost: f32,
    pub sigmoid_k: f32,
    pub importance_decay: f32,
    // -- migration / restore throttling (presumably) --
    pub max_concurrent_migrations: usize,
    pub restore_concurrency: usize,
    pub restore_backoff_ms: u64,
    pub restore_max_retries: u32,
}

impl Default for FluxConfig {
    fn default() -> Self {
        let tick_interval = Duration::from_secs(5);
        FluxConfig {
            initial_temperature: 0.5,
            min_temperature: 0.01,
            max_temperature: 10.0,
            decay_rate: 0.995,
            pressure_scale: 2.0,
            tick_interval,
            sample_rate: 0.02,
            max_candidates: 256,
            alpha_recency: 0.6,
            beta_freq: 0.3,
            gamma_importance: 1.0,
            delta_cost: 0.5,
            sigmoid_k: 1.0,
            importance_decay: 0.995,
            max_concurrent_migrations: 10,
            restore_concurrency: 5,
            restore_backoff_ms: 50,
            restore_max_retries: 5,
        }
    }
}
/// Where a memory's vector currently lives.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum StorageLocation {
/// In the in-memory hot index.
Hot,
/// In a local cold file; holds the file path (as written by `ColdStore::put`).
Local(String),
/// In S3; holds the object key (as written by `ColdStore::put`).
S3(String),
}
/// Per-memory metadata persisted as JSON under `meta:{id}`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MemoryMeta {
pub id: String,
pub location: StorageLocation,
// Set by `AkashicRecords::update_access` from its `now_ms` argument
// (presumably epoch milliseconds from `now_ms()` — confirm at call sites).
pub last_access_ms: i64,
pub created_ms: i64,
// Access count; incremented (saturating) by `AkashicRecords::update_access`.
pub freq: u64,
// NOTE(review): importance, cold_cost_mb, version and dimension are not read in the
// visible code; meanings inferred from names only.
pub importance: f32,
pub cold_cost_mb: f32,
pub version: u64,
pub dimension: usize,
}
/// Progress record for a tier migration, persisted under `mig:{id}`.
///
/// While such a record exists, `Metamorphoser::descend` refuses to start another migration
/// for the id; `AkashicRecords::commit_meta_and_clear_migration` removes it.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum MigrationState {
Started { target: StorageLocation },
Uploaded { target: StorageLocation },
Committed,
RolledBack,
}
/// One k-means cell of the coarse (`NebulaIndex`) index: its centroid and member ids.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Partition {
pub id: usize,
pub centroid: Vec<f32>,
pub keys: Vec<String>,
}
/// A single search hit.
///
/// NOTE(review): not constructed in the visible code. `from_hot` presumably marks a
/// hot-tier hit and `latency` the query's elapsed time — confirm.
#[derive(Clone, Debug)]
pub struct SearchResult {
pub id: String,
pub distance: f32,
pub from_hot: bool,
pub latency: Duration,
}
/// Health report shape. NOTE(review): populated outside the visible code; field meanings
/// are inferred from names.
#[derive(Clone, Debug, Serialize)]
pub struct HealthStatus {
pub healthy: bool,
pub hot_items: usize,
pub cold_items: usize,
pub total_items: usize,
pub temperature: f32,
pub restore_queue_depth: usize,
pub pending_migrations: usize,
pub hot_hit_rate: f64,
pub avg_search_ms: f64,
pub p99_search_ms: f64,
}
/// Aggregate statistics report. NOTE(review): `metrics_snapshot` presumably comes from
/// `Metrics::snapshot` — confirm at the construction site.
#[derive(Clone, Debug, Serialize)]
pub struct SystemStats {
pub total_items: usize,
pub hot_items: usize,
pub cold_items: usize,
pub avg_importance: f32,
pub avg_freq: u64,
pub metrics_snapshot: HashMap<String, u64>,
}
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// If the system clock reads earlier than 1970 the result is `0` rather than a panic.
#[inline]
pub fn now_ms() -> i64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::default(),
    };
    since_epoch.as_millis() as i64
}
/// Squared Euclidean distance between `a` and `b`.
///
/// Only the first `min(a.len(), b.len())` components are compared; callers are expected
/// to pass equal-length vectors.
#[inline]
pub fn l2_sq(a: &[f32], b: &[f32]) -> f32 {
    let mut total = 0.0f32;
    for (&x, &y) in a.iter().zip(b) {
        let diff = x - y;
        total += diff * diff;
    }
    total
}
/// Euclidean (L2) distance: the square root of [`l2_sq`].
#[inline]
pub fn l2_distance(a: &[f32], b: &[f32]) -> f32 {
    f32::sqrt(l2_sq(a, b))
}
/// Cosine similarity of `a` and `b`, clamped to `[-1, 1]`.
///
/// Returns `0.0` when either vector's norm is below `1e-9`. The dot product covers only
/// the zipped prefix, while each norm covers its whole vector.
#[inline]
pub fn cosine_sim(a: &[f32], b: &[f32]) -> f32 {
    let norm = |v: &[f32]| -> f32 { v.iter().map(|&x| x * x).sum::<f32>().sqrt() };
    let dot: f32 = a.iter().zip(b).map(|(&x, &y)| x * y).sum();
    let (norm_a, norm_b) = (norm(a), norm(b));
    if norm_a < 1e-9 || norm_b < 1e-9 {
        return 0.0;
    }
    let similarity = dot / (norm_a * norm_b);
    similarity.clamp(-1.0, 1.0)
}
/// 64-bit FNV-1a hash of the UTF-8 bytes of `s` (used for shard and directory routing).
fn fnv1a(s: &str) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;
    s.bytes()
        .fold(OFFSET_BASIS, |hash, byte| (hash ^ u64::from(byte)).wrapping_mul(PRIME))
}
/// Binary (de)serialisation and zstd (de)compression for embedding vectors.
///
/// Wire format: a little-endian `u32` element count followed by that many little-endian
/// `f32` values.
pub struct VectorCodec;

impl VectorCodec {
    /// Encodes `v` in the wire format described on `VectorCodec`.
    ///
    /// # Panics
    /// Panics if `v.len()` exceeds `u32::MAX`. The length prefix would otherwise be
    /// silently truncated, producing a buffer that decodes to the wrong vector.
    pub fn encode(v: &[f32]) -> Vec<u8> {
        assert!(
            v.len() <= u32::MAX as usize,
            "vector too long to encode: {} elements",
            v.len()
        );
        let mut buf = Vec::with_capacity(4 + v.len() * 4);
        buf.extend_from_slice(&(v.len() as u32).to_le_bytes());
        for x in v {
            buf.extend_from_slice(&x.to_le_bytes());
        }
        buf
    }

    /// Decodes a buffer produced by [`VectorCodec::encode`]. Trailing bytes are ignored.
    ///
    /// # Errors
    /// Returns `AetherError::Serialization` if the header is missing or declares more
    /// elements than the buffer holds. The declared length is checked *before*
    /// allocating, so a corrupt header cannot trigger a multi-gigabyte allocation.
    pub fn decode(raw: &[u8]) -> Result<Vec<f32>> {
        let dim = Cursor::new(raw)
            .read_u32::<LittleEndian>()
            .map_err(|e| AetherError::Serialization(e.to_string()))? as usize;
        // The header read succeeded, so `raw` holds at least 4 bytes.
        let payload = &raw[4..];
        let needed = dim
            .checked_mul(4)
            .filter(|&n| n <= payload.len())
            .ok_or_else(|| {
                AetherError::Serialization(format!(
                    "vector header declares {} elements but only {} payload bytes follow",
                    dim,
                    payload.len()
                ))
            })?;
        Ok(payload[..needed]
            .chunks_exact(4)
            .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
            .collect())
    }

    /// zstd-compresses `data` at `level`.
    pub fn compress(data: &[u8], level: i32) -> Result<Vec<u8>> {
        zstd::encode_all(data, level)
            .map_err(|e| AetherError::Compression(e.to_string()).into())
    }

    /// Decompresses a zstd frame produced by [`VectorCodec::compress`].
    pub fn decompress(data: &[u8]) -> Result<Vec<u8>> {
        zstd::decode_all(data)
            .map_err(|e| AetherError::Compression(e.to_string()).into())
    }
}
/// Heap entry ordered so that `BinaryHeap` (a max-heap) pops the *smallest* `dist` first.
///
/// Ordering uses `f32::total_cmp`, which makes it a true total order (NaN included), and
/// `Eq` is derived from that same order. The previous `partial_cmp`/`==` pair disagreed
/// for NaN, breaking the `Ord`/`Eq` consistency `BinaryHeap` relies on.
struct MinHeapItem {
    dist: f32,
    id: String,
}

impl Ord for MinHeapItem {
    fn cmp(&self, other: &Self) -> Ordering {
        // Reversed on purpose: a smaller distance is the "greater" heap item.
        other.dist.total_cmp(&self.dist)
    }
}

impl PartialOrd for MinHeapItem {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for MinHeapItem {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}

impl Eq for MinHeapItem {}
/// Process-wide counters plus a fixed-size ring of recent search latencies (nanoseconds).
///
/// Counters use relaxed atomics. They are independent tallies read for reporting, so no
/// cross-counter consistency is implied.
pub struct Metrics {
    hot_hits: Arc<AtomicU64>,
    hot_misses: Arc<AtomicU64>,
    cold_fallbacks: Arc<AtomicU64>,
    migrations_down: Arc<AtomicU64>,
    migrations_up: Arc<AtomicU64>,
    migrations_failed: Arc<AtomicU64>,
    migrations_rollback: Arc<AtomicU64>,
    restore_enqueued: Arc<AtomicU64>,
    total_searches: Arc<AtomicU64>,
    total_latency_ns: Arc<AtomicU64>,
    feedback_applied: Arc<AtomicU64>,
    /// Most recent latencies; `0` marks a slot that has never been written.
    latency_ring: Arc<Mutex<Vec<u64>>>,
    ring_cursor: Arc<AtomicUsize>,
    ring_capacity: usize,
}

impl Metrics {
    /// Creates zeroed metrics whose ring keeps the last `ring_capacity` latency samples.
    ///
    /// A capacity of `0` is treated as `1`. Otherwise every `record_search_latency` call
    /// would panic on a modulo by zero.
    pub fn new(ring_capacity: usize) -> Self {
        let ring_capacity = ring_capacity.max(1);
        let counter = || Arc::new(AtomicU64::new(0));
        Self {
            hot_hits: counter(),
            hot_misses: counter(),
            cold_fallbacks: counter(),
            migrations_down: counter(),
            migrations_up: counter(),
            migrations_failed: counter(),
            migrations_rollback: counter(),
            restore_enqueued: counter(),
            total_searches: counter(),
            total_latency_ns: counter(),
            feedback_applied: counter(),
            latency_ring: Arc::new(Mutex::new(vec![0u64; ring_capacity])),
            ring_cursor: Arc::new(AtomicUsize::new(0)),
            ring_capacity,
        }
    }

    pub fn record_hot_hit(&self) {
        self.hot_hits.fetch_add(1, AtomicOrd::Relaxed);
    }

    pub fn record_hot_miss(&self) {
        self.hot_misses.fetch_add(1, AtomicOrd::Relaxed);
    }

    pub fn record_cold_fallback(&self) {
        self.cold_fallbacks.fetch_add(1, AtomicOrd::Relaxed);
    }

    pub fn record_migration_down(&self) {
        self.migrations_down.fetch_add(1, AtomicOrd::Relaxed);
    }

    pub fn record_migration_up(&self) {
        self.migrations_up.fetch_add(1, AtomicOrd::Relaxed);
    }

    pub fn record_migration_failed(&self) {
        self.migrations_failed.fetch_add(1, AtomicOrd::Relaxed);
    }

    pub fn record_migration_rollback(&self) {
        self.migrations_rollback.fetch_add(1, AtomicOrd::Relaxed);
    }

    pub fn record_restore_enqueued(&self) {
        self.restore_enqueued.fetch_add(1, AtomicOrd::Relaxed);
    }

    pub fn record_feedback(&self) {
        self.feedback_applied.fetch_add(1, AtomicOrd::Relaxed);
    }

    /// Records one search taking `ns` nanoseconds (totals plus the latency ring).
    pub fn record_search_latency(&self, ns: u64) {
        self.total_searches.fetch_add(1, AtomicOrd::Relaxed);
        self.total_latency_ns.fetch_add(ns, AtomicOrd::Relaxed);
        let idx = self.ring_cursor.fetch_add(1, AtomicOrd::Relaxed) % self.ring_capacity;
        // `0` means "empty slot" to `p99_latency_ms`, so a 0 ns sample is stored as 1 ns
        // to keep it counted.
        self.latency_ring.lock()[idx] = ns.max(1);
    }

    /// Fraction of hot lookups that hit (`0.0` before any lookup).
    pub fn hit_rate(&self) -> f64 {
        let hits = self.hot_hits.load(AtomicOrd::Relaxed);
        let total = hits + self.hot_misses.load(AtomicOrd::Relaxed);
        if total == 0 {
            0.0
        } else {
            hits as f64 / total as f64
        }
    }

    /// Mean search latency in milliseconds over all recorded searches.
    pub fn avg_latency_ms(&self) -> f64 {
        let n = self.total_searches.load(AtomicOrd::Relaxed);
        if n == 0 {
            return 0.0;
        }
        self.total_latency_ns.load(AtomicOrd::Relaxed) as f64 / n as f64 / 1_000_000.0
    }

    /// 99th-percentile latency (ms) over the samples currently in the ring.
    ///
    /// Uses the nearest-rank definition: the `ceil(0.99 * n)`-th smallest sample.
    pub fn p99_latency_ms(&self) -> f64 {
        let mut samples: Vec<u64> = self
            .latency_ring
            .lock()
            .iter()
            .copied()
            .filter(|&v| v > 0)
            .collect();
        if samples.is_empty() {
            return 0.0;
        }
        samples.sort_unstable();
        let n = samples.len();
        // 1-based rank ceil(99n / 100), computed in integers to avoid float rounding;
        // it always lies in 1..=n for n >= 1.
        let rank = (n * 99 + 99) / 100;
        samples[rank - 1] as f64 / 1_000_000.0
    }

    /// Current value of every counter, keyed by name.
    pub fn snapshot(&self) -> HashMap<String, u64> {
        let mut m = HashMap::new();
        m.insert("hot_hits".into(), self.hot_hits.load(AtomicOrd::Relaxed));
        m.insert("hot_misses".into(), self.hot_misses.load(AtomicOrd::Relaxed));
        m.insert("cold_fallbacks".into(), self.cold_fallbacks.load(AtomicOrd::Relaxed));
        m.insert("migrations_down".into(), self.migrations_down.load(AtomicOrd::Relaxed));
        m.insert("migrations_up".into(), self.migrations_up.load(AtomicOrd::Relaxed));
        m.insert("migrations_failed".into(), self.migrations_failed.load(AtomicOrd::Relaxed));
        m.insert("migrations_rollback".into(), self.migrations_rollback.load(AtomicOrd::Relaxed));
        m.insert("restore_enqueued".into(), self.restore_enqueued.load(AtomicOrd::Relaxed));
        m.insert("total_searches".into(), self.total_searches.load(AtomicOrd::Relaxed));
        m.insert("feedback_applied".into(), self.feedback_applied.load(AtomicOrd::Relaxed));
        m
    }

    /// Renders all metrics in the Prometheus text exposition format.
    ///
    /// Counters are emitted in name order so output is deterministic. Every line,
    /// including the last, ends with `\n`, as the format requires.
    pub fn to_prometheus(&self) -> String {
        let mut counters: Vec<(String, u64)> = self.snapshot().into_iter().collect();
        counters.sort_unstable();
        let mut out = String::new();
        for (name, value) in counters {
            out.push_str(&format!("aether_{} {}\n", name, value));
        }
        out.push_str(&format!("aether_hit_rate {:.6}\n", self.hit_rate()));
        out.push_str(&format!("aether_avg_latency_ms {:.3}\n", self.avg_latency_ms()));
        out.push_str(&format!("aether_p99_latency_ms {:.3}\n", self.p99_latency_ms()));
        out
    }
}
pub struct AkashicRecords {
db: sled::Db,
}
impl AkashicRecords {
pub fn open(path: &str) -> Result<Self> {
let db = sled::open(path).map_err(AetherError::from)?;
Ok(Self { db })
}
pub fn put_meta(&self, meta: &MemoryMeta) -> Result<()> {
let key = format!("meta:{}", meta.id);
let val = serde_json::to_vec(meta)
.map_err(|e| AetherError::Serialization(e.to_string()))?;
self.db.insert(key.as_bytes(), val).map_err(AetherError::from)?;
self.db.flush().map_err(AetherError::from)?;
Ok(())
}
pub fn get_meta(&self, id: &str) -> Result<Option<MemoryMeta>> {
let key = format!("meta:{}", id);
match self.db.get(key.as_bytes()).map_err(AetherError::from)? {
Some(v) => Ok(Some(serde_json::from_slice(&v)
.map_err(|e| AetherError::Serialization(e.to_string()))?)),
None => Ok(None),
}
}
pub fn remove_meta(&self, id: &str) -> Result<()> {
let key = format!("meta:{}", id);
self.db.remove(key.as_bytes()).map_err(AetherError::from)?;
Ok(())
}
pub fn commit_meta_and_clear_migration(&self, meta: &MemoryMeta) -> Result<()> {
let meta_key = format!("meta:{}", meta.id);
let mig_key = format!("mig:{}", meta.id);
let meta_val = serde_json::to_vec(meta)
.map_err(|e| AetherError::Serialization(e.to_string()))?;
let mut batch = sled::Batch::default();
batch.insert(meta_key.as_bytes(), meta_val);
batch.remove(mig_key.as_bytes());
self.db.apply_batch(batch).map_err(AetherError::from)?;
self.db.flush().map_err(AetherError::from)?;
Ok(())
}
pub fn update_access(&self, id: &str, now_ms: i64) -> Result<()> {
if let Some(mut meta) = self.get_meta(id)? {
meta.last_access_ms = now_ms;
meta.freq = meta.freq.saturating_add(1);
self.put_meta(&meta)?;
}
Ok(())
}
pub fn scan_all_metas(&self) -> Result<Vec<MemoryMeta>> {
let mut out = Vec::new();
for item in self.db.scan_prefix(b"meta:") {
let (_, v) = item.map_err(AetherError::from)?;
if let Ok(m) = serde_json::from_slice::<MemoryMeta>(&v) {
out.push(m);
}
}
Ok(out)
}
pub fn count_hot(&self) -> usize {
self.db.scan_prefix(b"meta:")
.filter_map(|r| r.ok())
.filter(|(_, v)| {
serde_json::from_slice::<MemoryMeta>(v)
.map(|m| m.location == StorageLocation::Hot)
.unwrap_or(false)
})
.count()
}
pub fn put_raw_vector(&self, id: &str, vec: &[f32]) -> Result<()> {
let key = format!("vec:{}", id);
let encoded = VectorCodec::encode(vec);
let compressed = VectorCodec::compress(&encoded, 3)?;
self.db.insert(key.as_bytes(), compressed).map_err(AetherError::from)?;
Ok(())
}
pub fn get_raw_vector(&self, id: &str) -> Result<Option<Vec<f32>>> {
let key = format!("vec:{}", id);
match self.db.get(key.as_bytes()).map_err(AetherError::from)? {
Some(v) => {
let decompressed = VectorCodec::decompress(&v)?;
Ok(Some(VectorCodec::decode(&decompressed)?))
}
None => Ok(None),
}
}
pub fn remove_raw_vector(&self, id: &str) -> Result<()> {
let key = format!("vec:{}", id);
self.db.remove(key.as_bytes()).map_err(AetherError::from)?;
Ok(())
}
pub fn put_migration_state(&self, id: &str, state: &MigrationState) -> Result<()> {
let key = format!("mig:{}", id);
let val = serde_json::to_vec(state)
.map_err(|e| AetherError::Serialization(e.to_string()))?;
self.db.insert(key.as_bytes(), val).map_err(AetherError::from)?;
self.db.flush().map_err(AetherError::from)?;
Ok(())
}
pub fn get_migration_state(&self, id: &str) -> Result<Option<MigrationState>> {
let key = format!("mig:{}", id);
match self.db.get(key.as_bytes()).map_err(AetherError::from)? {
Some(v) => Ok(Some(serde_json::from_slice(&v)
.map_err(|e| AetherError::Serialization(e.to_string()))?)),
None => Ok(None),
}
}
pub fn remove_migration_state(&self, id: &str) -> Result<()> {
let key = format!("mig:{}", id);
self.db.remove(key.as_bytes()).map_err(AetherError::from)?;
Ok(())
}
pub fn scan_pending_migrations(&self) -> Vec<String> {
self.db.scan_prefix(b"mig:")
.filter_map(|r| r.ok())
.filter_map(|(k, _)| {
String::from_utf8(k.to_vec()).ok()
.and_then(|s| s.strip_prefix("mig:").map(|id| id.to_string()))
})
.collect()
}
pub fn put_importance(&self, id: &str, val: f32) -> Result<()> {
let key = format!("imp:{}", id);
let encoded = val.to_le_bytes();
self.db.insert(key.as_bytes(), &encoded).map_err(AetherError::from)?;
Ok(())
}
pub fn get_importance(&self, id: &str) -> Result<f32> {
let key = format!("imp:{}", id);
match self.db.get(key.as_bytes()).map_err(AetherError::from)? {
Some(v) if v.len() >= 4 => {
let bytes: [u8; 4] = v[..4].try_into().unwrap_or([0u8; 4]);
Ok(f32::from_le_bytes(bytes))
}
_ => Ok(0.5),
}
}
pub fn remove_importance(&self, id: &str) -> Result<()> {
let key = format!("imp:{}", id);
self.db.remove(key.as_bytes()).map_err(AetherError::from)?;
Ok(())
}
pub fn scan_all_importance(&self) -> Vec<(String, f32)> {
self.db.scan_prefix(b"imp:")
.filter_map(|r| r.ok())
.filter_map(|(k, v)| {
let id = String::from_utf8(k.to_vec()).ok()
.and_then(|s| s.strip_prefix("imp:").map(|s| s.to_string()))?;
if v.len() < 4 {
return None;
}
let bytes: [u8; 4] = v[..4].try_into().ok()?;
Some((id, f32::from_le_bytes(bytes)))
})
.collect()
}
pub fn put_partition(&self, p: &Partition) -> Result<()> {
let key = format!("part:{}", p.id);
let val = serde_json::to_vec(p)
.map_err(|e| AetherError::Serialization(e.to_string()))?;
self.db.insert(key.as_bytes(), val).map_err(AetherError::from)?;
Ok(())
}
pub fn load_all_partitions(&self) -> Result<Vec<Partition>> {
let mut parts = Vec::new();
for item in self.db.scan_prefix(b"part:") {
let (_, v) = item.map_err(AetherError::from)?;
if let Ok(p) = serde_json::from_slice::<Partition>(&v) {
parts.push(p);
}
}
Ok(parts)
}
pub fn clear_all_partitions(&self) -> Result<()> {
let keys: Vec<Vec<u8>> = self.db.scan_prefix(b"part:")
.filter_map(|r| r.ok().map(|(k, _)| k.to_vec()))
.collect();
for k in keys {
self.db.remove(&k).map_err(AetherError::from)?;
}
Ok(())
}
}
/// Cold-tier vector storage: zstd-compressed files under `cfg.local_dir`, optionally
/// mirrored to S3 through a pluggable [`S3Backend`].
pub struct ColdStore {
cfg: ColdConfig,
// Remote backend; S3 is only used when this is `Some` *and* `cfg.s3_bucket` is set.
s3: Option<Arc<dyn S3Backend + Send + Sync>>,
}
/// Minimal async object-store interface used by [`ColdStore`].
///
/// NOTE(review): error semantics (e.g. what `get` returns for a missing key) are
/// defined by implementors outside this file — confirm.
#[async_trait::async_trait]
pub trait S3Backend: Send + Sync {
/// Uploads `data` to `bucket`/`key`.
async fn put(&self, bucket: &str, key: &str, data: Bytes) -> Result<()>;
/// Downloads the object at `bucket`/`key`.
async fn get(&self, bucket: &str, key: &str) -> Result<Bytes>;
/// Deletes the object at `bucket`/`key`.
async fn delete(&self, bucket: &str, key: &str) -> Result<()>;
/// Reports whether `bucket`/`key` exists (infallible by signature).
async fn exists(&self, bucket: &str, key: &str) -> bool;
}
impl ColdStore {
pub async fn new(cfg: ColdConfig) -> Result<Self> {
std::fs::create_dir_all(&cfg.local_dir)?;
Ok(Self { cfg, s3: None })
}
pub fn with_s3(mut self, backend: Arc<dyn S3Backend + Send + Sync>) -> Self {
self.s3 = Some(backend);
self
}
fn local_path(&self, id: &str) -> PathBuf {
let h = fnv1a(id);
let d1 = (h >> 56) & 0xFF;
let d2 = (h >> 48) & 0xFF;
PathBuf::from(&self.cfg.local_dir)
.join(format!("{:02x}", d1))
.join(format!("{:02x}", d2))
.join(format!("{}.bin.zst", id))
}
fn s3_key(&self, id: &str) -> String {
match &self.cfg.s3_prefix {
Some(p) => format!("{}/{}.bin.zst", p.trim_end_matches('/'), id),
None => format!("{}.bin.zst", id),
}
}
pub async fn put(&self, id: &str, vec: &[f32]) -> Result<StorageLocation> {
let encoded = VectorCodec::encode(vec);
let compressed = VectorCodec::compress(&encoded, self.cfg.compress_level)?;
let local = self.local_path(id);
if let Some(parent) = local.parent() {
fs::create_dir_all(parent).await?;
}
fs::write(&local, &compressed).await?;
if let (Some(s3), Some(bucket)) = (&self.s3, &self.cfg.s3_bucket) {
let key = self.s3_key(id);
s3.put(bucket, &key, Bytes::from(compressed)).await?;
return Ok(StorageLocation::S3(key));
}
Ok(StorageLocation::Local(local.to_string_lossy().to_string()))
}
pub async fn get(&self, id: &str) -> Result<Vec<f32>> {
let local = self.local_path(id);
if local.exists() {
let compressed = fs::read(&local).await?;
let encoded = VectorCodec::decompress(&compressed)?;
return VectorCodec::decode(&encoded);
}
if let (Some(s3), Some(bucket)) = (&self.s3, &self.cfg.s3_bucket) {
let key = self.s3_key(id);
let data = s3.get(bucket, &key).await?;
if let Some(parent) = local.parent() {
fs::create_dir_all(parent).await?;
}
fs::write(&local, &data).await?;
let encoded = VectorCodec::decompress(&data)?;
return VectorCodec::decode(&encoded);
}
Err(AetherError::NotFound(id.to_string()).into())
}
pub async fn delete(&self, id: &str) -> Result<()> {
let local = self.local_path(id);
if local.exists() {
fs::remove_file(&local).await?;
}
if let (Some(s3), Some(bucket)) = (&self.s3, &self.cfg.s3_bucket) {
let _ = s3.delete(bucket, &self.s3_key(id)).await;
}
Ok(())
}
pub async fn exists(&self, id: &str) -> bool {
self.local_path(id).exists()
}
}
struct HotShard {
dim: usize,
max: usize,
vectors: RwLock<HashMap<String, Vec<f32>>>,
lru: Mutex<LruCache<String, ()>>,
}
impl HotShard {
fn new(dim: usize, max: usize) -> Self {
Self {
dim,
max,
vectors: RwLock::new(HashMap::new()),
lru: Mutex::new(LruCache::new(NonZeroUsize::new(max.max(1)).unwrap())),
}
}
fn add(&self, id: String, vec: Vec<f32>) -> Result<()> {
if vec.len() != self.dim {
return Err(AetherError::DimensionMismatch {
expected: self.dim,
got: vec.len(),
}.into());
}
self.vectors.write().insert(id.clone(), vec);
self.lru.lock().put(id, ());
Ok(())
}
fn remove(&self, id: &str) -> Option<Vec<f32>> {
self.lru.lock().pop(&id.to_string()); self.vectors.write().remove(id)
}
fn get(&self, id: &str) -> Option<Vec<f32>> {
let v = self.vectors.read().get(id).cloned();
if v.is_some() {
self.lru.lock().get(&id.to_string()); }
v
}
fn touch(&self, id: &str) {
self.lru.lock().get(&id.to_string()); }
fn contains(&self, id: &str) -> bool {
self.vectors.read().contains_key(id)
}
fn len(&self) -> usize {
self.vectors.read().len()
}
#[inline(never)]
fn search(&self, query: &[f32], k: usize) -> Result<Vec<(String, f32)>> {
if query.len() != self.dim {
return Err(AetherError::DimensionMismatch {
expected: self.dim,
got: query.len(),
}.into());
}
let vecs = self.vectors.read();
let mut results: Vec<(String, f32)> = vecs.iter()
.map(|(id, v)| (id.clone(), l2_distance(query, v)))
.collect();
results.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(Ordering::Equal));
results.truncate(k);
Ok(results)
}
fn evict_overflows(&self) -> Vec<String> {
let mut evicted = Vec::new();
let mut lru = self.lru.lock();
while lru.len() > self.max {
if let Some((id, _)) = lru.pop_lru() {
self.vectors.write().remove(&id);
evicted.push(id);
} else {
break;
}
}
evicted
}
}
/// Sharded in-memory index of hot vectors with exact L2 search.
pub struct HotIndex {
// Shards, selected by `fnv1a(id) % shards.len()`.
shards: Vec<Arc<HotShard>>,
dim: usize,
// Feeds the background writer task spawned in `HotIndex::new`, which applies each
// batch to the shards and answers on the batch's `resp` channel.
write_tx: mpsc::Sender<WriteBatch>,
}
/// A group of vectors to insert via the background writer, plus its reply channel.
struct WriteBatch {
items: Vec<(String, Vec<f32>)>,
resp: oneshot::Sender<Result<()>>,
// Carried for tracing; the writer task shown here does not read it.
trace_id: String,
}
/// A queued search request.
///
/// NOTE(review): not constructed or consumed in the visible code; the meaning and scale
/// of `priority` are unknown — confirm before relying on this type.
struct SearchBatch {
query: Vec<f32>,
k: usize,
resp: oneshot::Sender<Result<Vec<(String, f32)>>>,
priority: u8,
trace_id: String,
}
impl HotIndex {
pub fn new(cfg: &HotConfig) -> Result<Self> {
let shard_max = (cfg.max_items / cfg.shard_count).max(1);
let shards: Vec<Arc<HotShard>> = (0..cfg.shard_count)
.map(|_| Arc::new(HotShard::new(cfg.dim, shard_max)))
.collect();
let (write_tx, mut write_rx) = mpsc::channel::<WriteBatch>(8192);
let shards_clone = shards.clone();
let dim = cfg.dim;
tokio::spawn(async move {
while let Some(batch) = write_rx.recv().await {
let result: Result<()> = (|| {
for (id, vec) in batch.items {
let idx = (fnv1a(&id) as usize) % shards_clone.len();
shards_clone[idx].add(id, vec)?;
}
Ok(())
})();
let _ = batch.resp.send(result);
}
});
Ok(Self { shards, dim, write_tx })
}
fn shard_for(&self, id: &str) -> &Arc<HotShard> {
let idx = (fnv1a(id) as usize) % self.shards.len();
&self.shards[idx]
}
pub fn add_sync(&self, id: String, vec: Vec<f32>) -> Result<()> {
self.shard_for(&id).add(id, vec)
}
pub async fn write_batch(&self, items: Vec<(String, Vec<f32>)>, trace_id: String) -> Result<()> {
let (tx, rx) = oneshot::channel();
self.write_tx.send(WriteBatch { items, resp: tx, trace_id })
.await
.map_err(|e| anyhow!("write_tx send error: {}", e))?;
rx.await.map_err(|e| anyhow!("write_tx recv error: {}", e))?
}
pub fn remove(&self, id: &str) -> Option<Vec<f32>> {
self.shard_for(id).remove(id)
}
pub fn get(&self, id: &str) -> Option<Vec<f32>> {
self.shard_for(id).get(id)
}
pub fn touch(&self, id: &str) {
self.shard_for(id).touch(id);
}
pub fn contains(&self, id: &str) -> bool {
self.shard_for(id).contains(id)
}
pub fn len(&self) -> usize {
self.shards.iter().map(|s| s.len()).sum()
}
#[inline(never)]
pub fn search(&self, query: &[f32], k: usize) -> Result<Vec<(String, f32)>> {
if query.len() != self.dim {
return Err(AetherError::DimensionMismatch {
expected: self.dim,
got: query.len(),
}.into());
}
let per_shard: Vec<Vec<(String, f32)>> = self.shards
.iter()
.map(|shard| shard.search(query, k).unwrap_or_default())
.collect();
let mut merged: Vec<(String, f32)> = per_shard.into_iter().flatten().collect();
merged.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(Ordering::Equal));
merged.truncate(k);
Ok(merged)
}
pub fn collect_evictions(&self) -> Vec<String> {
self.shards.iter().flat_map(|s| s.evict_overflows()).collect()
}
}
pub struct NebulaIndex {
partitions: RwLock<Vec<Partition>>,
dim: usize,
top_k: usize,
}
impl NebulaIndex {
pub fn new(dim: usize, top_k: usize, existing: Vec<Partition>) -> Self {
Self {
partitions: RwLock::new(existing),
dim,
top_k,
}
}
#[inline(never)]
pub fn build(&self, vectors: &[(String, Vec<f32>)], k: usize) {
if vectors.is_empty() {
return;
}
let k = k.min(vectors.len());
let dim = vectors[0].1.len();
let mut centroids: Vec<Vec<f32>> = Vec::with_capacity(k);
let first_idx = fastrand::usize(..vectors.len().max(1));
centroids.push(vectors[first_idx].1.clone());
while centroids.len() < k {
let dists: Vec<f32> = vectors.iter()
.map(|(_, v)| centroids.iter()
.map(|c| l2_sq(v, c))
.fold(f32::MAX, f32::min))
.collect();
let total: f32 = dists.iter().sum();
if total < 1e-9 {
break;
}
let mut r = fastrand::f32() * total;
for (i, &d) in dists.iter().enumerate() {
r -= d;
if r <= 0.0 {
centroids.push(vectors[i].1.clone());
break;
}
}
}
let max_iter = 30;
let mut assignments = vec![0usize; vectors.len()];
for _iter in 0..max_iter {
let mut changed = false;
assignments.iter_mut().enumerate().for_each(|(i, a)| {
let best = centroids.iter()
.enumerate()
.map(|(ci, c)| (ci, l2_sq(&vectors[i].1, c)))
.min_by(|x, y| x.1.partial_cmp(&y.1).unwrap_or(Ordering::Equal))
.map(|(ci, _)| ci)
.unwrap_or(0);
if *a != best {
*a = best;
}
});
let mut sums = vec![vec![0.0f64; dim]; k];
let mut cnts = vec![0usize; k];
for (i, &a) in assignments.iter().enumerate() {
let p = a.min(k - 1);
cnts[p] += 1;
for (j, &v) in vectors[i].1.iter().enumerate() {
sums[p][j] += v as f64;
}
}
for (ci, c) in centroids.iter_mut().enumerate() {
if cnts[ci] > 0 {
let new_c: Vec<f32> = sums[ci].iter().map(|&s| (s / cnts[ci] as f64) as f32).collect();
if l2_sq(c, &new_c) > 1e-8 {
changed = true;
}
*c = new_c;
}
}
if !changed {
break;
}
}
let mut parts: Vec<Partition> = centroids.into_iter().enumerate()
.map(|(i, c)| Partition {
id: i,
centroid: c,
keys: Vec::new(),
})
.collect();
for (i, &a) in assignments.iter().enumerate() {
let p = a.min(parts.len() - 1);
parts[p].keys.push(vectors[i].0.clone());
}
*self.partitions.write() = parts;
}
/// Ids stored in the `top_k` partitions whose centroids lie closest to `query`.
///
/// Partitions are ranked by the squared L2 distance of their centroid to `query` with a
/// stable sort, so ties (and NaN comparisons) keep partition order. Keys are emitted
/// partition by partition, nearest first. Returns an empty list when no partitions exist.
pub fn coarse_candidates(&self, query: &[f32]) -> Vec<String> {
    let guard = self.partitions.read();
    if guard.is_empty() {
        return Vec::new();
    }
    let mut ranked: Vec<(usize, f32)> = Vec::with_capacity(guard.len());
    for (idx, part) in guard.iter().enumerate() {
        ranked.push((idx, l2_sq(&part.centroid, query)));
    }
    ranked.sort_by(|lhs, rhs| lhs.1.partial_cmp(&rhs.1).unwrap_or(Ordering::Equal));
    let mut out = Vec::new();
    for &(idx, _) in ranked.iter().take(self.top_k) {
        out.extend(guard[idx].keys.iter().cloned());
    }
    out
}
/// Append `id` to the partition whose centroid is nearest to `vec` (squared L2 distance).
///
/// No-op while no partitions exist. Ties and NaN comparisons go to the earliest partition.
/// The key is appended unconditionally; presence in any partition is not checked first.
pub fn add_to_partition(&self, id: &str, vec: &[f32]) {
    let mut guard = self.partitions.write();
    let target = guard
        .iter()
        .map(|part| l2_sq(&part.centroid, vec))
        .enumerate()
        .min_by(|(_, da), (_, db)| da.partial_cmp(db).unwrap_or(Ordering::Equal))
        .map(|(idx, _)| idx);
    if let Some(idx) = target {
        guard[idx].keys.push(id.to_owned());
    }
}
/// Drop every occurrence of `id` from the key lists of all partitions.
pub fn remove_from_partitions(&self, id: &str) {
    self.partitions
        .write()
        .iter_mut()
        .for_each(|part| part.keys.retain(|key| key.as_str() != id));
}
/// Owned copy of the current partitions, taken under the read lock.
pub fn snapshot(&self) -> Vec<Partition> {
    let guard = self.partitions.read();
    guard.to_vec()
}
/// Number of partitions currently held.
pub fn partition_count(&self) -> usize {
    let guard = self.partitions.read();
    guard.len()
}
}
pub struct Metamorphoser {
akashic: Arc<AkashicRecords>,
cold: Arc<ColdStore>,
hot: Arc<HotIndex>,
nebula: Arc<NebulaIndex>,
metrics: Arc<Metrics>,
locks: Arc<Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>>,
}
impl Metamorphoser {
pub fn new(
akashic: Arc<AkashicRecords>,
cold: Arc<ColdStore>,
hot: Arc<HotIndex>,
nebula: Arc<NebulaIndex>,
metrics: Arc<Metrics>,
) -> Self {
Self {
akashic,
cold,
hot,
nebula,
metrics,
locks: Arc::new(Mutex::new(HashMap::new())),
}
}
fn get_lock(&self, id: &str) -> Arc<tokio::sync::Mutex<()>> {
self.locks.lock()
.entry(id.to_string())
.or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
.clone()
}
pub async fn descend(&self, id: &str, trace_id: &str) -> Result<()> {
let lock = self.get_lock(id);
let _guard = lock.lock().await;
if self.akashic.get_migration_state(id)?.is_some() {
return Err(AetherError::MigrationInProgress(id.to_string()).into());
}
let meta = self.akashic.get_meta(id)?
.ok_or_else(|| AetherError::NotFound(id.to_string()))?;
if meta.location != StorageLocation::Hot {
return Ok(());
}
let vec = self.hot.remove(id)
.ok_or_else(|| AetherError::NotFound(format!("hot:{}", id)))?;
self.akashic.put_migration_state(id, &MigrationState::Started {
target: StorageLocation::Local(String::new()),
})?;
let location = match self.cold.put(id, &vec).await {
Ok(loc) => loc,
Err(e) => {
let _ = self.hot.add_sync(id.to_string(), vec);
let _ = self.akashic.remove_migration_state(id);
self.metrics.record_migration_failed();
return Err(e);
}
};
let new_meta = MemoryMeta {
location: location.clone(),
cold_cost_mb: vec.len() as f32 * 4.0 / 1_048_576.0,
version: meta.version + 1,
..meta
};
self.akashic.commit_meta_and_clear_migration(&new_meta)?;
self.nebula.add_to_partition(id, &vec);
self.metrics.record_migration_down();
info!("[{}] descended {} → {:?}", trace_id, id, location);
Ok(())
}
pub async fn ascend(&self, id: &str, trace_id: &str) -> Result<()> {
let lock = self.get_lock(id);
let _guard = lock.lock().await;
if self.akashic.get_migration_state(id)?.is_some() {
return Err(AetherError::MigrationInProgress(id.to_string()).into());
}
let meta = self.akashic.get_meta(id)?
.ok_or_else(|| AetherError::NotFound(id.to_string()))?;
if meta.location == StorageLocation::Hot {
return Ok(());
}
self.akashic.put_migration_state(id, &MigrationState::Started {
target: StorageLocation::Hot,
})?;
let vec = match self.cold.get(id).await {
Ok(v) => v,
Err(e) => {
let _ = self.akashic.remove_migration_state(id);
self.metrics.record_migration_failed();
return Err(e);
}
};
self.hot.write_batch(vec![(id.to_string(), vec.clone())], trace_id.to_string()).await?;
let new_meta = MemoryMeta {
location: StorageLocation::Hot,
cold_cost_mb: 0.0,
freq: meta.freq + 1,
version: meta.version + 1,
..meta
};
self.akashic.commit_meta_and_clear_migration(&new_meta)?;
self.nebula.remove_from_partitions(id);
self.metrics.record_migration_up();
info!("[{}] ascended {} → hot", trace_id, id);
Ok(())
}
pub async fn rollback_pending(&self) -> Result<usize> {
let pending = self.akashic.scan_pending_migrations();
let count = pending.len();
for id in &pending {
warn!("rolling back dangling migration: {}", id);
let _ = self.akashic.remove_migration_state(id);
self.metrics.record_migration_rollback();
}
if count > 0 {
info!("rolled back {} pending migrations", count);
}
Ok(count)
}
}
pub struct FeedbackEngine {
akashic: Arc<AkashicRecords>,
decay_rate: f32,
metrics: Arc<Metrics>,
}
impl FeedbackEngine {
pub fn new(akashic: Arc<AkashicRecords>, decay_rate: f32, metrics: Arc<Metrics>) -> Self {
Self { akashic, decay_rate, metrics }
}
pub async fn apply(&self, id: &str, reward: f32) -> Result<()> {
let mut imp = self.akashic.get_importance(id)?;
imp = (imp * 0.8 + reward * 0.2).clamp(0.0, 1.0);
self.akashic.put_importance(id, imp)?;
if let Some(mut meta) = self.akashic.get_meta(id)? {
meta.importance = imp;
self.akashic.put_meta(&meta)?;
}
self.metrics.record_feedback();
Ok(())
}
pub fn decay_all(&self) -> Result<()> {
let entries = self.akashic.scan_all_importance();
for (id, mut val) in entries {
val *= self.decay_rate;
if val < 0.005 {
let _ = self.akashic.remove_importance(&id);
} else {
let _ = self.akashic.put_importance(&id, val);
}
}
Ok(())
}
}
/// Scores how valuable an item is to keep hot, and picks eviction victims.
///
/// `score = alpha * recency + beta * ln(freq + 1) + gamma * importance - delta * cold_cost_mb`,
/// where `recency` is minus the time since last access, in hours. Higher means more worth
/// keeping.
pub struct EvictionPolicy {
    alpha: f32,
    beta: f32,
    gamma: f32,
    delta: f32,
}
impl EvictionPolicy {
    pub fn new(alpha: f32, beta: f32, gamma: f32, delta: f32) -> Self {
        Self { alpha, beta, gamma, delta }
    }

    /// Retention score of `meta`; larger is more valuable.
    pub fn score(&self, meta: &MemoryMeta) -> f32 {
        // saturating_sub: a last-access stamp ahead of the local clock (skew, restored data)
        // used to underflow here. Such a stamp now counts as "just accessed".
        let age_ms = now_ms().saturating_sub(meta.last_access_ms).max(1);
        let age_sec = age_ms as f32 / 1000.0;
        let recency = -age_sec / 3600.0;
        let freq = (meta.freq as f32 + 1.0).ln();
        self.alpha * recency + self.beta * freq + self.gamma * meta.importance
            - self.delta * meta.cold_cost_mb
    }

    /// Ids to evict so that only the `keep_count` highest-scoring items remain.
    ///
    /// Returns nothing when `metas.len() <= keep_count`.
    pub fn select_evictions(&self, metas: &[MemoryMeta], keep_count: usize) -> Vec<String> {
        if metas.len() <= keep_count {
            return Vec::new();
        }
        let mut scored: Vec<(f32, &str)> = metas
            .iter()
            .map(|m| (self.score(m), m.id.as_str()))
            .collect();
        // Highest score first: the head is kept and the tail is evicted. The previous
        // ascending sort evicted the most valuable items instead.
        scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(Ordering::Equal));
        scored[keep_count..].iter().map(|&(_, id)| id.to_owned()).collect()
    }
}
/// Logistic curve `1 / (1 + e^(-k·x))`; `k` sets the steepness around `x = 0`.
fn sigmoid(x: f32, k: f32) -> f32 {
    let e = (-(k * x)).exp();
    (1.0 + e).recip()
}
pub struct RestoreQueue {
tx: mpsc::Sender<(String, u8, String)>,
depth: Arc<AtomicUsize>,
}
impl RestoreQueue {
pub fn new(
morpher: Arc<Metamorphoser>,
concurrency: usize,
backoff_ms: u64,
max_retries: u32,
metrics: Arc<Metrics>,
) -> Self {
let (tx, mut rx) = mpsc::channel::<(String, u8, String)>(4096);
let depth = Arc::new(AtomicUsize::new(0));
let depth_clone = depth.clone();
let sem = Arc::new(Semaphore::new(concurrency.max(1)));
tokio::spawn(async move {
while let Some((id, _priority, trace_id)) = rx.recv().await {
depth_clone.fetch_sub(1, AtomicOrd::SeqCst);
let permit = sem.clone().acquire_owned().await.unwrap();
let m = morpher.clone();
let met = metrics.clone();
let id2 = id.clone();
let tid = trace_id.clone();
tokio::spawn(async move {
let mut backoff = backoff_ms;
for attempt in 0..max_retries {
match m.ascend(&id2, &tid).await {
Ok(_) => break,
Err(e) => {
warn!("[{}] restore {} failed ({}/{}): {}", tid, id2, attempt+1, max_retries, e);
met.record_migration_failed();
if attempt + 1 < max_retries {
sleep(Duration::from_millis(backoff)).await;
backoff = (backoff * 2).min(10_000);
} else {
error!("[{}] restore {} permanently failed", tid, id2);
}
}
}
}
drop(permit);
});
}
});
Self { tx, depth }
}
pub async fn enqueue(&self, id: String, priority: u8, trace_id: String) -> Result<()> {
self.depth.fetch_add(1, AtomicOrd::SeqCst);
self.tx.send((id, priority, trace_id))
.await
.map_err(|e| anyhow!("restore_queue send: {}", e))
}
pub fn queue_depth(&self) -> usize {
self.depth.load(AtomicOrd::Relaxed)
}
}
pub struct SloManager {
search_tx: mpsc::Sender<SearchBatch>,
sem: Arc<Semaphore>,
cfg: HotConfig,
metrics: Arc<Metrics>,
}
impl SloManager {
pub fn new(hot: Arc<HotIndex>, cfg: HotConfig, metrics: Arc<Metrics>) -> Self {
let sem = Arc::new(Semaphore::new(cfg.max_concurrent_searches));
let (tx, mut rx) = mpsc::channel::<SearchBatch>(8192);
let hot_clone = hot.clone();
let sem_clone = sem.clone();
let met_clone = metrics.clone();
let win_ms = cfg.batch_window_ms;
let max_batch = cfg.max_batch_size;
let search_tout = cfg.search_timeout;
tokio::spawn(async move {
loop {
let first = match rx.recv().await {
Some(r) => r,
None => break,
};
let mut batch = vec![first];
let deadline = tokio::time::Instant::now() + Duration::from_millis(win_ms);
while batch.len() < max_batch {
match tokio::time::timeout_at(deadline, rx.recv()).await {
Ok(Some(r)) => batch.push(r),
_ => break,
}
}
for req in batch {
let permit = sem_clone.clone().acquire_owned().await.unwrap();
let h = hot_clone.clone();
let m = met_clone.clone();
let timeout_dur = search_tout;
tokio::spawn(async move {
let t0 = Instant::now();
let result = tokio::time::timeout(
timeout_dur,
tokio::task::spawn_blocking(move || h.search(&req.query, req.k))
).await;
let ns = t0.elapsed().as_nanos() as u64;
m.record_search_latency(ns);
let resp = match result {
Ok(Ok(Ok(r))) => {
m.record_hot_hit();
Ok(r)
}
Ok(Ok(Err(e))) => Err(anyhow!("{}", e)),
Ok(Err(join_e)) => Err(anyhow!("join: {}", join_e)),
Err(_) => {
Err(AetherError::Timeout(timeout_dur).into())
}
};
let _ = req.resp.send(resp);
drop(permit);
});
}
}
});
Self { search_tx: tx, sem, cfg, metrics }
}
pub async fn search(&self, query: Vec<f32>, k: usize, priority: u8, trace_id: String) -> Result<Vec<(String, f32)>> {
let (tx, rx) = oneshot::channel();
self.search_tx.send(SearchBatch { query, k, resp: tx, priority, trace_id })
.await
.map_err(|e| anyhow!("slo_tx: {}", e))?;
rx.await.map_err(|e| anyhow!("slo_rx: {}", e))?
}
}
pub struct FluxEngine {
cfg: FluxConfig,
akashic: Arc<AkashicRecords>,
morpher: Arc<Metamorphoser>,
feedback: Arc<FeedbackEngine>,
restore_q: Arc<RestoreQueue>,
policy: EvictionPolicy,
metrics: Arc<Metrics>,
hot: Arc<HotIndex>,
temp: Arc<tokio::sync::RwLock<f32>>,
mig_sem: Arc<Semaphore>,
}
impl FluxEngine {
pub fn new(
cfg: FluxConfig,
akashic: Arc<AkashicRecords>,
morpher: Arc<Metamorphoser>,
feedback: Arc<FeedbackEngine>,
restore_q: Arc<RestoreQueue>,
metrics: Arc<Metrics>,
hot: Arc<HotIndex>,
) -> Self {
let policy = EvictionPolicy::new(
cfg.alpha_recency,
cfg.beta_freq,
cfg.gamma_importance,
cfg.delta_cost,
);
let init_temp = cfg.initial_temperature;
let max_mig = cfg.max_concurrent_migrations.max(1);
Self {
cfg,
akashic,
morpher,
feedback,
restore_q,
policy,
metrics,
hot,
temp: Arc::new(tokio::sync::RwLock::new(init_temp)),
mig_sem: Arc::new(Semaphore::new(max_mig)),
}
}
pub fn start(self: Arc<Self>) {
let interval = self.cfg.tick_interval;
tokio::spawn(async move {
loop {
let t0 = Instant::now();
if let Err(e) = self.tick().await {
error!("FluxEngine tick error: {}", e);
}
let elapsed = t0.elapsed();
if elapsed < interval {
sleep(interval - elapsed).await;
}
}
});
}
async fn tick(&self) -> Result<()> {
self.update_temperature().await;
let temp = *self.temp.read().await;
let all_metas = self.akashic.scan_all_metas()?;
let sample_count = ((all_metas.len() as f32 * self.cfg.sample_rate) as usize)
.clamp(1, self.cfg.max_candidates);
let mut indices: Vec<usize> = (0..all_metas.len()).collect();
fastrand::shuffle(&mut indices);
let candidates: Vec<&MemoryMeta> = indices.iter()
.take(sample_count)
.map(|&i| &all_metas[i])
.collect();
for meta in candidates {
let score = self.policy.score(meta);
let prob = sigmoid(score, self.cfg.sigmoid_k);
if fastrand::f32() < prob * temp.min(1.0) {
let permit = match self.mig_sem.clone().try_acquire_owned() {
Ok(p) => p,
Err(_) => continue, };
let morpher = self.morpher.clone();
let restore_q = self.restore_q.clone();
let id = meta.id.clone();
let loc = meta.location.clone();
let trace_id = Uuid::new_v4().to_string();
tokio::spawn(async move {
match loc {
StorageLocation::Hot => {
let _ = morpher.descend(&id, &trace_id).await;
}
_ => {
let _ = restore_q.enqueue(id, 0, trace_id).await;
}
}
drop(permit);
});
}
}
self.feedback.decay_all()?;
let evicted = self.hot.collect_evictions();
for id in evicted {
let morpher = self.morpher.clone();
let trace_id = Uuid::new_v4().to_string();
tokio::spawn(async move {
let _ = morpher.descend(&id, &trace_id).await;
});
}
Ok(())
}
async fn update_temperature(&self) {
let mut t = self.temp.write().await;
*t *= self.cfg.decay_rate;
*t = t.max(self.cfg.min_temperature);
if self.detect_pressure().await {
*t = (*t * self.cfg.pressure_scale).min(self.cfg.max_temperature);
}
}
async fn detect_pressure(&self) -> bool {
let q_depth = self.restore_q.queue_depth();
if q_depth > 100 {
return true;
}
let hot_count = self.hot.len();
if hot_count > 0 && q_depth as f64 / hot_count as f64 > 0.05 {
return true;
}
false
}
pub async fn set_temperature(&self, v: f32) {
*self.temp.write().await = v.clamp(self.cfg.min_temperature, self.cfg.max_temperature);
}
pub async fn get_temperature(&self) -> f32 {
*self.temp.read().await
}
}
pub struct MemoryManager {
akashic: Arc<AkashicRecords>,
hot: Arc<HotIndex>,
cold: Arc<ColdStore>,
nebula: Arc<NebulaIndex>,
morpher: Arc<Metamorphoser>,
slo: Arc<SloManager>,
restore_q: Arc<RestoreQueue>,
flux: Arc<FluxEngine>,
feedback: Arc<FeedbackEngine>,
metrics: Arc<Metrics>,
hot_cfg: HotConfig,
cold_cfg: ColdConfig,
}
impl MemoryManager {
pub async fn new(
hot_cfg: HotConfig,
cold_cfg: ColdConfig,
flux_cfg: FluxConfig,
db_path: &str,
) -> Result<Arc<Self>> {
let metrics = Arc::new(Metrics::new(10_000));
let akashic = Arc::new(AkashicRecords::open(db_path)?);
let cold = Arc::new(ColdStore::new(cold_cfg.clone()).await?);
let existing_parts = akashic.load_all_partitions()?;
let nebula = Arc::new(NebulaIndex::new(
hot_cfg.dim,
cold_cfg.top_partitions,
existing_parts,
));
let hot = Arc::new(HotIndex::new(&hot_cfg)?);
let morpher = Arc::new(Metamorphoser::new(
akashic.clone(),
cold.clone(),
hot.clone(),
nebula.clone(),
metrics.clone(),
));
let rolled = morpher.rollback_pending().await?;
if rolled > 0 {
warn!("rolled back {} dangling migrations on startup", rolled);
}
let feedback = Arc::new(FeedbackEngine::new(
akashic.clone(),
flux_cfg.importance_decay,
metrics.clone(),
));
let restore_q = Arc::new(RestoreQueue::new(
morpher.clone(),
flux_cfg.restore_concurrency,
flux_cfg.restore_backoff_ms,
flux_cfg.restore_max_retries,
metrics.clone(),
));
let slo = Arc::new(SloManager::new(hot.clone(), hot_cfg.clone(), metrics.clone()));
let flux = Arc::new(FluxEngine::new(
flux_cfg,
akashic.clone(),
morpher.clone(),
feedback.clone(),
restore_q.clone(),
metrics.clone(),
hot.clone(),
));
let hot_metas: Vec<MemoryMeta> = akashic.scan_all_metas()?
.into_iter()
.filter(|m| m.location == StorageLocation::Hot)
.collect();
let mut preloaded = 0usize;
for m in hot_metas {
if let Ok(Some(vec)) = akashic.get_raw_vector(&m.id) {
if hot.add_sync(m.id.clone(), vec).is_ok() {
preloaded += 1;
}
}
}
if preloaded > 0 {
info!("preloaded {} hot vectors from metadata store", preloaded);
}
let mgr = Arc::new(Self {
akashic,
hot,
cold,
nebula,
morpher,
slo,
restore_q,
flux: flux.clone(),
feedback,
metrics,
hot_cfg: hot_cfg.clone(),
cold_cfg: cold_cfg.clone(),
});
flux.start();
Ok(mgr)
}
pub async fn insert(&self, id: String, vec: Vec<f32>) -> Result<()> {
if vec.len() != self.hot_cfg.dim {
return Err(AetherError::DimensionMismatch {
expected: self.hot_cfg.dim,
got: vec.len(),
}.into());
}
let trace_id = Uuid::new_v4().to_string();
let now = now_ms();
self.akashic.put_raw_vector(&id, &vec)?;
let meta = MemoryMeta {
id: id.clone(),
location: StorageLocation::Hot,
last_access_ms: now,
created_ms: now,
freq: 0,
importance: 0.5,
cold_cost_mb: 0.0,
version: 1,
dimension: vec.len(),
};
self.akashic.put_meta(&meta)?;
self.akashic.put_importance(&id, 0.5)?;
self.hot.write_batch(vec![(id.clone(), vec)], trace_id.clone()).await?;
debug!("[{}] inserted {}", trace_id, id);
Ok(())
}
pub async fn insert_batch(&self, items: Vec<(String, Vec<f32>)>) -> Result<()> {
let dim = self.hot_cfg.dim;
for (_id, vec) in &items {
if vec.len() != dim {
return Err(AetherError::DimensionMismatch {
expected: dim,
got: vec.len(),
}.into());
}
}
let trace_id = Uuid::new_v4().to_string();
let now = now_ms();
for (id, vec) in &items {
self.akashic.put_raw_vector(id, vec)?;
let meta = MemoryMeta {
id: id.clone(),
location: StorageLocation::Hot,
last_access_ms: now,
created_ms: now,
freq: 0,
importance: 0.5,
cold_cost_mb: 0.0,
version: 1,
dimension: vec.len(),
};
self.akashic.put_meta(&meta)?;
self.akashic.put_importance(id, 0.5)?;
}
self.hot.write_batch(items.clone(), trace_id.clone()).await?;
info!("[{}] batch inserted {} items", trace_id, items.len());
Ok(())
}
pub async fn search(&self, query: Vec<f32>, k: usize) -> Result<Vec<SearchResult>> {
let trace_id = Uuid::new_v4().to_string();
let t0 = Instant::now();
match self.slo.search(query.clone(), k, 0, trace_id.clone()).await {
Ok(results) if !results.is_empty() => {
let now = now_ms();
for (id, _) in &results {
let _ = self.akashic.update_access(id, now);
self.hot.touch(id);
}
let latency = t0.elapsed();
return Ok(results.into_iter().map(|(id, dist)| SearchResult {
id,
distance: dist,
from_hot: true,
latency,
}).collect());
}
_ => {}
}
self.metrics.record_hot_miss();
self.metrics.record_cold_fallback();
let candidates = self.nebula.coarse_candidates(&query);
if candidates.is_empty() {
return Ok(Vec::new());
}
let akashic = self.akashic.clone();
let q2 = query.clone();
let mut cold_results: Vec<(String, f32)> = tokio::task::spawn_blocking(move || {
candidates.iter()
.filter_map(|id| {
akashic.get_raw_vector(id).ok()?.map(|v| (id.clone(), l2_distance(&q2, &v)))
})
.collect::<Vec<_>>()
}).await.unwrap_or_default();
cold_results.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(Ordering::Equal));
cold_results.truncate(k);
for (id, _) in cold_results.iter().take(3) {
let _ = self.restore_q.enqueue(id.clone(), 1, Uuid::new_v4().to_string()).await;
self.metrics.record_restore_enqueued();
}
let latency = t0.elapsed();
Ok(cold_results.into_iter().map(|(id, dist)| SearchResult {
id,
distance: dist,
from_hot: false,
latency,
}).collect())
}
pub async fn get_by_id(&self, id: &str) -> Result<Vec<f32>> {
if let Some(vec) = self.hot.get(id) {
let _ = self.akashic.update_access(id, now_ms());
return Ok(vec);
}
if let Some(meta) = self.akashic.get_meta(id)? {
let vec = match &meta.location {
StorageLocation::Local(_) | StorageLocation::S3(_) => {
self.cold.get(id).await?
}
StorageLocation::Hot => {
return Err(AetherError::Inconsistency(
format!("meta says hot but not in hot index: {}", id)
).into());
}
};
let _ = self.akashic.update_access(id, now_ms());
return Ok(vec);
}
Err(AetherError::NotFound(id.to_string()).into())
}
pub async fn delete(&self, id: &str) -> Result<()> {
let trace_id = Uuid::new_v4().to_string();
if let Some(meta) = self.akashic.get_meta(id)? {
match &meta.location {
StorageLocation::Hot => {
self.hot.remove(id);
}
StorageLocation::Local(_) | StorageLocation::S3(_) => {
self.cold.delete(id).await?;
self.nebula.remove_from_partitions(id);
}
}
}
self.akashic.remove_meta(id)?;
self.akashic.remove_raw_vector(id)?;
self.akashic.remove_importance(id)?;
info!("[{}] deleted {}", trace_id, id);
Ok(())
}
pub async fn apply_feedback(&self, id: &str, reward: f32) -> Result<()> {
self.feedback.apply(id, reward).await
}
pub async fn health(&self) -> HealthStatus {
let all = self.akashic.scan_all_metas().unwrap_or_default();
let hot = all.iter().filter(|m| m.location == StorageLocation::Hot).count();
let cold = all.len() - hot;
HealthStatus {
healthy: true,
hot_items: hot,
cold_items: cold,
total_items: all.len(),
temperature: self.flux.get_temperature().await,
restore_queue_depth: self.restore_q.queue_depth(),
pending_migrations: self.akashic.scan_pending_migrations().len(),
hot_hit_rate: self.metrics.hit_rate(),
avg_search_ms: self.metrics.avg_latency_ms(),
p99_search_ms: self.metrics.p99_latency_ms(),
}
}
pub async fn stats(&self) -> SystemStats {
let metas = self.akashic.scan_all_metas().unwrap_or_default();
let hot_count = metas.iter().filter(|m| m.location == StorageLocation::Hot).count();
let avg_imp = if metas.is_empty() {
0.0
} else {
metas.iter().map(|m| m.importance).sum::<f32>() / metas.len() as f32
};
let avg_freq = if metas.is_empty() {
0
} else {
metas.iter().map(|m| m.freq).sum::<u64>() / metas.len() as u64
};
SystemStats {
total_items: metas.len(),
hot_items: hot_count,
cold_items: metas.len() - hot_count,
avg_importance: avg_imp,
avg_freq,
metrics_snapshot: self.metrics.snapshot(),
}
}
pub async fn rebuild_nebula_index(&self) -> Result<()> {
let cold_metas: Vec<MemoryMeta> = self.akashic.scan_all_metas()?
.into_iter()
.filter(|m| matches!(m.location, StorageLocation::Local(_) | StorageLocation::S3(_)))
.collect();
let akashic = self.akashic.clone();
let dim = self.hot_cfg.dim;
let k = self.cold_cfg.partition_count;
let vectors: Vec<(String, Vec<f32>)> = tokio::task::spawn_blocking(move || {
cold_metas.iter()
.filter_map(|m| akashic.get_raw_vector(&m.id).ok()?.map(|v| (m.id.clone(), v)))
.filter(|(_, v)| v.len() == dim)
.collect()
}).await.unwrap_or_default();
if vectors.is_empty() {
return Ok(());
}
self.nebula.build(&vectors, k);
self.akashic.clear_all_partitions()?;
for part in self.nebula.snapshot() {
self.akashic.put_partition(&part)?;
}
info!("rebuilt nebula index: {} partitions over {} cold vectors", self.nebula.partition_count(), vectors.len());
Ok(())
}
pub async fn manual_descend(&self, id: &str) -> Result<()> {
self.morpher.descend(id, &Uuid::new_v4().to_string()).await
}
pub async fn manual_ascend(&self, id: &str) -> Result<()> {
self.morpher.ascend(id, &Uuid::new_v4().to_string()).await
}
pub async fn set_temperature(&self, v: f32) {
self.flux.set_temperature(v).await;
}
pub async fn get_temperature(&self) -> f32 {
self.flux.get_temperature().await
}
pub fn metrics(&self) -> &Metrics {
&self.metrics
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Unique scratch path under the platform temp directory. A hard-coded `/tmp` does not
    /// exist on every platform.
    fn temp_path(prefix: &str) -> String {
        std::env::temp_dir()
            .join(format!("{}_{}", prefix, fastrand::u32(..)))
            .to_string_lossy()
            .into_owned()
    }

    #[test]
    fn test_vector_codec_roundtrip() {
        let v = vec![1.0f32, -2.5, 0.0, 3.14, -0.001];
        let enc = VectorCodec::encode(&v);
        let dec = VectorCodec::decode(&enc).unwrap();
        assert_eq!(v.len(), dec.len());
        for (a, b) in v.iter().zip(dec.iter()) {
            assert!((a - b).abs() < 1e-6, "codec mismatch: {} vs {}", a, b);
        }
    }

    #[test]
    fn test_vector_codec_compressed() {
        let v = vec![0.1f32; 512];
        let enc = VectorCodec::encode(&v);
        let cmp = VectorCodec::compress(&enc, 3).unwrap();
        let dcm = VectorCodec::decompress(&cmp).unwrap();
        assert_eq!(enc, dcm);
    }

    #[test]
    fn test_l2_distance() {
        let a = vec![1.0f32, 0.0, 0.0];
        let b = vec![0.0f32, 1.0, 0.0];
        let d = l2_distance(&a, &b);
        assert!((d - 2.0f32.sqrt()).abs() < 1e-5);
    }

    #[test]
    fn test_cosine_sim() {
        let a = vec![1.0f32, 0.0, 0.0];
        let b = vec![1.0f32, 0.0, 0.0];
        assert!((cosine_sim(&a, &b) - 1.0).abs() < 1e-5);
        let c = vec![-1.0f32, 0.0, 0.0];
        assert!((cosine_sim(&a, &c) + 1.0).abs() < 1e-5);
    }

    #[test]
    fn test_metrics_p99() {
        let m = Metrics::new(1000);
        // Latencies of 1..=100 ms, recorded in nanoseconds.
        for i in 1..=100u64 {
            m.record_search_latency(i * 1_000_000);
        }
        let p99 = m.p99_latency_ms();
        assert!(p99 >= 95.0 && p99 <= 100.0, "p99={}", p99);
    }

    /// A recent, frequent, important item must outscore a stale one, and the stale one is
    /// the one evicted.
    #[test]
    fn test_eviction_policy_ordering() {
        let policy = EvictionPolicy::new(0.6, 0.3, 1.0, 0.5);
        let now = now_ms();
        let hot_meta = MemoryMeta {
            id: "hot".into(),
            location: StorageLocation::Hot,
            last_access_ms: now,
            created_ms: now - 1000,
            freq: 100,
            importance: 0.9,
            cold_cost_mb: 0.1,
            version: 1,
            dimension: 768,
        };
        let cold_meta = MemoryMeta {
            id: "cold".into(),
            location: StorageLocation::Hot,
            last_access_ms: now - 7_200_000,
            created_ms: now - 10_000_000,
            freq: 1,
            importance: 0.1,
            cold_cost_mb: 0.1,
            version: 1,
            dimension: 768,
        };
        let s_hot = policy.score(&hot_meta);
        let s_cold = policy.score(&cold_meta);
        assert!(s_hot > s_cold, "hot={} cold={}", s_hot, s_cold);
        let evictions = policy.select_evictions(&[hot_meta, cold_meta], 1);
        assert_eq!(evictions, vec!["cold"]);
    }

    #[test]
    fn test_hot_index_basic() {
        let cfg = HotConfig {
            dim: 4,
            max_items: 10,
            shard_count: 2,
            ..Default::default()
        };
        let idx = HotIndex::new(&cfg).unwrap();
        idx.add_sync("a".into(), vec![1.0, 0.0, 0.0, 0.0]).unwrap();
        idx.add_sync("b".into(), vec![0.0, 1.0, 0.0, 0.0]).unwrap();
        idx.add_sync("c".into(), vec![0.0, 0.0, 1.0, 0.0]).unwrap();
        assert!(idx.contains("a"));
        assert_eq!(idx.get("b").unwrap(), vec![0.0, 1.0, 0.0, 0.0]);
        let results = idx.search(&[1.0, 0.0, 0.0, 0.0], 2).unwrap();
        assert_eq!(results.len(), 2);
        assert_eq!(results[0].0, "a");
        idx.remove("b");
        assert!(!idx.contains("b"));
    }

    #[test]
    fn test_nebula_index_build_and_search() {
        let vectors: Vec<(String, Vec<f32>)> = (0..50).map(|i| {
            let x = (i as f32) / 50.0;
            (format!("id{}", i), vec![x, 1.0 - x, x * 0.5, (1.0 - x) * 0.5])
        }).collect();
        let nebula = NebulaIndex::new(4, 3, Vec::new());
        nebula.build(&vectors, 5);
        assert_eq!(nebula.partition_count(), 5);
        let query = vec![0.0f32, 1.0, 0.0, 0.5];
        let candidates = nebula.coarse_candidates(&query);
        assert!(!candidates.is_empty(), "coarse search returned nothing");
    }

    #[test]
    fn test_akashic_records() {
        let dir = temp_path("aether_test");
        let ak = AkashicRecords::open(&dir).unwrap();
        let meta = MemoryMeta {
            id: "test_id".into(),
            location: StorageLocation::Hot,
            last_access_ms: 1000,
            created_ms: 500,
            freq: 3,
            importance: 0.7,
            cold_cost_mb: 0.0,
            version: 1,
            dimension: 128,
        };
        ak.put_meta(&meta).unwrap();
        let got = ak.get_meta("test_id").unwrap().unwrap();
        assert_eq!(got.id, "test_id");
        assert_eq!(got.freq, 3);
        ak.put_importance("test_id", 0.8).unwrap();
        let imp = ak.get_importance("test_id").unwrap();
        assert!((imp - 0.8).abs() < 1e-5, "importance: {}", imp);
        ak.remove_meta("test_id").unwrap();
        assert!(ak.get_meta("test_id").unwrap().is_none());
        let _ = std::fs::remove_dir_all(&dir);
    }

    /// Insert, search, fetch, feedback, health and delete through the public facade. The
    /// long tick interval keeps the FluxEngine from migrating anything mid-test.
    #[tokio::test]
    async fn test_memory_manager_end_to_end() {
        let db_dir = temp_path("aether_mgr");
        let cold_dir = temp_path("aether_cold");
        let hot_cfg = HotConfig {
            dim: 4,
            max_items: 100,
            shard_count: 2,
            ..Default::default()
        };
        let cold_cfg = ColdConfig {
            local_dir: cold_dir.clone(),
            ..Default::default()
        };
        let flux_cfg = FluxConfig {
            tick_interval: Duration::from_secs(3600),
            ..Default::default()
        };
        let mgr = MemoryManager::new(hot_cfg, cold_cfg, flux_cfg, &db_dir).await.unwrap();
        mgr.insert("v1".into(), vec![1.0, 0.0, 0.0, 0.0]).await.unwrap();
        mgr.insert("v2".into(), vec![0.0, 1.0, 0.0, 0.0]).await.unwrap();
        mgr.insert("v3".into(), vec![0.0, 0.0, 1.0, 0.0]).await.unwrap();
        let results = mgr.search(vec![1.0, 0.0, 0.0, 0.0], 2).await.unwrap();
        assert!(!results.is_empty());
        assert_eq!(results[0].id, "v1");
        assert!(results[0].from_hot);
        let vec = mgr.get_by_id("v2").await.unwrap();
        assert_eq!(vec, vec![0.0, 1.0, 0.0, 0.0]);
        mgr.apply_feedback("v1", 1.0).await.unwrap();
        let imp = mgr.akashic.get_importance("v1").unwrap();
        assert!(imp > 0.5, "importance should increase: {}", imp);
        let health = mgr.health().await;
        assert!(health.healthy);
        assert_eq!(health.hot_items, 3);
        mgr.delete("v2").await.unwrap();
        assert!(mgr.get_by_id("v2").await.is_err());
        let h2 = mgr.health().await;
        assert_eq!(h2.total_items, 2);
        let _ = std::fs::remove_dir_all(&db_dir);
        let _ = std::fs::remove_dir_all(&cold_dir);
    }
}
/// Demo entry point.
///
/// Opens a store under the working directory, inserts and searches one probe vector,
/// applies feedback, logs a health report and the metrics, and then waits for Ctrl-C.
#[tokio::main]
async fn main() -> Result<()> {
    // Embedding dimension shared by the hot-tier config, the probe vector and the log line.
    const DIM: usize = 1536;

    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
        .format_timestamp_millis()
        .init();
    info!("AetherMemory 启动中...");
    let hot_cfg = HotConfig {
        dim: DIM,
        max_items: 200_000,
        shard_count: 16,
        search_timeout: Duration::from_millis(80),
        max_concurrent_searches: 256,
        batch_window_ms: 5,
        max_batch_size: 64,
    };
    let cold_cfg = ColdConfig {
        local_dir: "./aether_cold".into(),
        s3_bucket: std::env::var("AETHER_S3_BUCKET").ok(),
        s3_prefix: Some("aether".into()),
        // Region from the environment, defaulting to us-east-1 (the default is built lazily).
        s3_region: Some(std::env::var("AWS_REGION").unwrap_or_else(|_| String::from("us-east-1"))),
        compress_level: 3,
        partition_count: 512,
        top_partitions: 16,
    };
    let flux_cfg = FluxConfig {
        initial_temperature: 0.3,
        min_temperature: 0.005,
        max_temperature: 8.0,
        decay_rate: 0.997,
        pressure_scale: 2.5,
        tick_interval: Duration::from_secs(10),
        sample_rate: 0.01,
        max_candidates: 512,
        alpha_recency: 0.55,
        beta_freq: 0.25,
        gamma_importance: 1.0,
        delta_cost: 0.4,
        sigmoid_k: 1.2,
        importance_decay: 0.998,
        max_concurrent_migrations: 16,
        restore_concurrency: 8,
        restore_backoff_ms: 100,
        restore_max_retries: 5,
    };
    let mgr = MemoryManager::new(hot_cfg, cold_cfg, flux_cfg, "./aether_meta").await?;
    info!("系统就绪,热层维度 = {}", DIM);
    let probe = vec![0.1f32; DIM];
    mgr.insert("probe_0001".into(), probe.clone()).await?;
    info!("插入 probe_0001");
    let results = mgr.search(probe, 5).await?;
    info!("检索返回 {} 条结果,首条ID={:?}, 来自热层={}",
        results.len(),
        results.first().map(|r| &r.id),
        results.first().map(|r| r.from_hot).unwrap_or(false));
    mgr.apply_feedback("probe_0001", 0.9).await?;
    info!("施加正反馈到 probe_0001");
    let h = mgr.health().await;
    info!("健康报告: hot={}, cold={}, temp={:.3}, hit_rate={:.2}%, p99_ms={:.2}",
        h.hot_items, h.cold_items, h.temperature, h.hot_hit_rate * 100.0, h.p99_search_ms);
    info!("指标:\n{}", mgr.metrics().to_prometheus());
    tokio::signal::ctrl_c().await?;
    info!("AetherMemory 关闭。");
    Ok(())
}