use crate::memory_core::analytics::{RecallEvent, RecallLog, query_hash};
const RECALL_LOG_FILENAME: &str = "recall.db";
use crate::memory_core::decay::DecayConfig;
use crate::memory_core::dream::extract_keywords;
use crate::memory_core::embed::{Embedder, FastEmbedder};
use crate::memory_core::filter::{FilterConfig, FilterReject, classify};
use crate::memory_core::palace::{Drawer, DrawerType, Palace, PalaceId, RoomType};
use crate::memory_core::store::kg::KnowledgeGraph;
use crate::memory_core::store::l1_cache::L1Cache;
use crate::memory_core::store::palace_store::PalaceStore;
use crate::memory_core::store::vector::{UsearchStore, VectorStore};
use crate::memory_core::timeouts;
use anyhow::{Context, Result};
use parking_lot::RwLock;
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::OnceCell;
use uuid::Uuid;
static SHARED_EMBEDDER: OnceCell<Arc<dyn Embedder + Send + Sync>> = OnceCell::const_new();
pub async fn shared_embedder() -> Result<Arc<dyn Embedder + Send + Sync>> {
let timeout = timeouts::embedder_init_timeout();
SHARED_EMBEDDER
.get_or_try_init(|| async {
let e = tokio::time::timeout(timeout, FastEmbedder::new())
.await
.map_err(|_| {
anyhow::anyhow!(
"FastEmbedder cold init timed out after {:?} (issue #906); \
increase TRUSTY_EMBEDDER_INIT_TIMEOUT_SECS if the model \
needs more time on this host",
timeout
)
})?
.context("init shared FastEmbedder")?;
Ok::<Arc<dyn Embedder + Send + Sync>, anyhow::Error>(Arc::new(e))
})
.await
.cloned()
}
#[cfg(any(test, feature = "embedder-test-support"))]
pub fn seed_shared_embedder_with_mock() {
use crate::embedder::MockEmbedder;
let mock: Arc<dyn Embedder + Send + Sync> = Arc::new(MockEmbedder::new(384));
let _ = SHARED_EMBEDDER.set(mock);
}
pub struct L0Identity {
pub content: String,
}
pub struct L1Essential {
pub drawers: Vec<Drawer>,
}
#[derive(Debug, Clone)]
pub struct RecallResult {
pub drawer: Drawer,
pub score: f32,
pub layer: u8,
}
const L1_CAP: usize = 15;
pub struct PalaceHandle {
pub id: PalaceId,
pub identity: String,
pub l1_drawers: Vec<Drawer>,
pub vector_store: Arc<UsearchStore>,
pub kg: Arc<KnowledgeGraph>,
pub drawers: Arc<RwLock<Vec<Drawer>>>,
pub data_dir: Option<std::path::PathBuf>,
pub decay_config: DecayConfig,
pub recall_log: Option<Arc<RecallLog>>,
pub closets: Arc<RwLock<HashMap<String, Vec<Uuid>>>>,
pub is_compacting: Arc<AtomicBool>,
pub write_mutex: Arc<tokio::sync::Mutex<()>>,
}
#[derive(Debug, Clone)]
pub struct RememberOptions {
pub filter: FilterConfig,
pub force: bool,
pub enforce_min_tokens: bool,
pub classify_as: Option<DrawerType>,
}
impl Default for RememberOptions {
fn default() -> Self {
Self {
filter: FilterConfig::default(),
force: false,
enforce_min_tokens: true,
classify_as: None,
}
}
}
impl RememberOptions {
pub fn note() -> Self {
Self {
filter: FilterConfig::default(),
force: false,
enforce_min_tokens: false,
classify_as: Some(DrawerType::UserFact),
}
}
pub fn forced() -> Self {
Self {
force: true,
..Self::default()
}
}
}
impl PalaceHandle {
pub fn is_compacting(&self) -> bool {
self.is_compacting.load(Ordering::Relaxed)
}
pub fn is_read_only(&self) -> bool {
self.kg.is_read_only() || self.vector_store.is_read_only()
}
#[cfg(test)]
pub fn write_mutex_for_test(&self) -> Arc<tokio::sync::Mutex<()>> {
Arc::clone(&self.write_mutex)
}
pub fn new(
id: PalaceId,
identity: String,
vector_store: UsearchStore,
kg: KnowledgeGraph,
) -> Self {
Self {
id,
identity,
l1_drawers: Vec::new(),
vector_store: Arc::new(vector_store),
kg: Arc::new(kg),
drawers: Arc::new(RwLock::new(Vec::new())),
data_dir: None,
decay_config: DecayConfig::default(),
recall_log: None,
closets: Arc::new(RwLock::new(HashMap::new())),
is_compacting: Arc::new(AtomicBool::new(false)),
write_mutex: Arc::new(tokio::sync::Mutex::new(())),
}
}
pub fn open(palace: &Palace) -> Result<Arc<PalaceHandle>> {
let data_dir = &palace.data_dir;
std::fs::create_dir_all(data_dir)
.with_context(|| format!("create palace data dir {}", data_dir.display()))?;
let identity = PalaceStore::load_identity(data_dir)
.with_context(|| format!("load identity for {}", palace.id))?
.unwrap_or_default();
let l1_drawers = L1Cache::load_l1_cache(data_dir)
.with_context(|| format!("load L1 cache for {}", palace.id))?;
let vector_path = data_dir.join("index.usearch");
let vector_store = UsearchStore::new(vector_path, 384)
.with_context(|| format!("open vector store for {}", palace.id))?;
let kg_path = data_dir.join("kg.db");
let kg =
KnowledgeGraph::open(&kg_path).with_context(|| format!("open KG for {}", palace.id))?;
let persisted_drawers = match kg.load_drawers() {
Ok(d) => d,
Err(e) => {
tracing::warn!(palace = %palace.id, "load_drawers failed, falling back to L1 only: {e:#}");
Vec::new()
}
};
let mut all_drawers = persisted_drawers;
for l1 in &l1_drawers {
if !all_drawers.iter().any(|d| d.id == l1.id) {
all_drawers.push(l1.clone());
}
}
let now = chrono::Utc::now();
let mut pruned = 0usize;
all_drawers.retain(|d| {
let expired = d.expires_at.is_some_and(|t| t < now);
if expired {
if let Err(e) = kg.delete_drawer_sync(d.id) {
tracing::warn!(
palace = %palace.id, id = %d.id,
"purge_expired: delete_drawer failed: {e:#}"
);
}
pruned += 1;
}
!expired
});
if pruned > 0 {
tracing::info!(palace = %palace.id, count = pruned, "purged expired drawers at open");
}
let index_count = vector_store.index_size();
let drawer_count = all_drawers.len();
if index_count > drawer_count + 5 {
tracing::warn!(
palace = %palace.id,
index_vectors = index_count,
drawer_records = drawer_count,
"vector index has orphaned entries — consider re-ingesting"
);
}
let drawers = Arc::new(RwLock::new(all_drawers));
let recall_log = match RecallLog::open(&data_dir.join(RECALL_LOG_FILENAME)) {
Ok(log) => Some(Arc::new(log)),
Err(e) => {
tracing::warn!(palace = %palace.id, "open recall log failed, analytics disabled: {e:#}");
None
}
};
let handle = PalaceHandle {
id: palace.id.clone(),
identity,
l1_drawers,
vector_store: Arc::new(vector_store),
kg: Arc::new(kg),
drawers,
data_dir: Some(data_dir.clone()),
decay_config: DecayConfig::default(),
recall_log,
closets: Arc::new(RwLock::new(HashMap::new())),
is_compacting: Arc::new(AtomicBool::new(false)),
write_mutex: Arc::new(tokio::sync::Mutex::new(())),
};
Ok(Arc::new(handle))
}
pub fn flush(&self) -> Result<()> {
let Some(data_dir) = self.data_dir.as_ref() else {
return Ok(());
};
let drawers = self.drawers.read().clone();
L1Cache::save_l1_cache(&drawers, data_dir)
.with_context(|| format!("save L1 cache for {}", self.id))?;
PalaceStore::save_identity(&self.id, &self.identity, data_dir)
.with_context(|| format!("save identity for {}", self.id))?;
Ok(())
}
pub fn with_recall_log(mut self, log: Arc<RecallLog>) -> Self {
self.recall_log = Some(log);
self
}
pub fn with_decay_config(mut self, config: DecayConfig) -> Self {
self.decay_config = config;
self
}
pub fn add_drawer(&self, drawer: Drawer) {
let mut drawers = self.drawers.write();
drawers.push(drawer);
}
pub fn refresh_l1(&mut self) {
let drawers = self.drawers.read();
let mut sorted: Vec<Drawer> = drawers.clone();
sorted.sort_by(|a, b| {
b.importance
.partial_cmp(&a.importance)
.unwrap_or(std::cmp::Ordering::Equal)
});
self.l1_drawers = sorted.into_iter().take(L1_CAP).collect();
}
pub async fn remember(
&self,
content: String,
room: RoomType,
tags: Vec<String>,
importance: f32,
) -> Result<Uuid> {
self.remember_with_options(content, room, tags, importance, RememberOptions::default())
.await
}
pub async fn remember_with_options(
&self,
content: String,
room: RoomType,
tags: Vec<String>,
importance: f32,
opts: RememberOptions,
) -> Result<Uuid> {
if self.is_read_only() {
return Err(anyhow::anyhow!(
"palace '{}' is read-only: HTTP daemon holds the write lock — \
route writes through the daemon's HTTP API or stop the daemon \
before retrying via stdio",
self.id
));
}
let _write_guard = timeouts::lock_with_timeout(
&self.write_mutex,
timeouts::write_lock_timeout(),
self.id.as_str(),
)
.await?;
if !opts.force {
opts.filter
.apply(&content, opts.enforce_min_tokens)
.map_err(|reject| match reject {
FilterReject::TooShort { .. }
| FilterReject::NoisePattern { .. }
| FilterReject::NonAlphabetic { .. } => anyhow::anyhow!("{reject}"),
})?;
}
let room_id = room_to_uuid(&room);
let mut drawer = Drawer::new(room_id, content.clone());
drawer.tags = tags;
drawer.importance = importance.clamp(0.0, 1.0);
let final_type = match opts.classify_as {
Some(t) => t,
None => classify(&content, DrawerType::Unknown),
};
drawer = drawer.with_type(final_type);
let id = drawer.id;
let embedder = shared_embedder()
.await
.context("acquire shared embedder for remember")?;
let embed_timeout = timeouts::embed_batch_timeout();
let vecs = tokio::time::timeout(embed_timeout, embedder.embed_batch(&[content]))
.await
.map_err(|_| {
anyhow::anyhow!(
"embed_batch timed out after {:?} on remember path (issue #906); \
increase TRUSTY_EMBED_BATCH_TIMEOUT_SECS if batches legitimately \
take longer on this host",
embed_timeout
)
})?
.context("embed drawer content")?;
if let Some(v) = vecs.into_iter().next() {
self.vector_store
.upsert(id, v)
.await
.context("upsert drawer vector")?;
}
self.kg
.upsert_drawer(&drawer)
.await
.context("persist drawer metadata")?;
{
let mut drawers = self.drawers.write();
drawers.push(drawer);
}
if let Some(data_dir) = self.data_dir.as_ref() {
let snap = self.drawers.read().clone();
L1Cache::save_l1_cache(&snap, data_dir).context("save L1 snapshot")?;
}
self.rebuild_closets();
Ok(id)
}
pub fn rebuild_closets(&self) {
let snapshot: Vec<Drawer> = self.drawers.read().clone();
let mut new_index: HashMap<String, Vec<Uuid>> = HashMap::new();
for drawer in snapshot.iter() {
for kw in extract_keywords(&drawer.content) {
new_index.entry(kw).or_default().push(drawer.id);
}
}
let mut closets = self.closets.write();
*closets = new_index;
}
pub async fn forget(&self, id: Uuid) -> Result<()> {
if self.is_read_only() {
return Err(anyhow::anyhow!(
"palace '{}' is read-only: HTTP daemon holds the write lock — \
route forget through the daemon's HTTP API or stop the daemon \
before retrying via stdio",
self.id
));
}
let _write_guard = timeouts::lock_with_timeout(
&self.write_mutex,
timeouts::write_lock_timeout(),
self.id.as_str(),
)
.await?;
if let Err(e) = self.vector_store.remove(id).await {
tracing::warn!(?id, "vector remove failed: {e:#}");
}
if let Err(e) = self.kg.delete_drawer(id).await {
tracing::warn!(?id, "drawer metadata delete failed: {e:#}");
}
if let Err(e) = self.kg.cascade_delete_by_drawer(id).await {
tracing::warn!(?id, "kg cascade_delete_by_drawer failed: {e:#}");
}
{
let mut drawers = self.drawers.write();
drawers.retain(|d| d.id != id);
}
if let Some(data_dir) = self.data_dir.as_ref() {
let snap = self.drawers.read().clone();
L1Cache::save_l1_cache(&snap, data_dir).context("save L1 snapshot after forget")?;
}
Ok(())
}
pub fn list_drawers(
&self,
room: Option<RoomType>,
tag: Option<String>,
limit: usize,
) -> Vec<Drawer> {
let drawers = self.drawers.read();
let target_room_id = room.as_ref().map(room_to_uuid);
let mut filtered: Vec<Drawer> = drawers
.iter()
.filter(|d| match &target_room_id {
Some(rid) => d.room_id == *rid,
None => true,
})
.filter(|d| match &tag {
Some(t) => d.tags.iter().any(|x| x == t),
None => true,
})
.cloned()
.collect();
drop(drawers);
filtered.sort_by(|a, b| {
b.importance
.partial_cmp(&a.importance)
.unwrap_or(std::cmp::Ordering::Equal)
});
filtered.truncate(limit);
filtered
}
pub async fn purge_expired(&self) -> Result<usize> {
if self.is_read_only() {
return Ok(0);
}
let now = chrono::Utc::now();
let expired_ids: Vec<Uuid> = self
.drawers
.read()
.iter()
.filter(|d| d.expires_at.is_some_and(|t| t < now))
.map(|d| d.id)
.collect();
let count = expired_ids.len();
for id in expired_ids {
if let Err(e) = self.forget(id).await {
tracing::warn!(?id, "purge_expired: forget failed: {e:#}");
}
}
if count > 0 {
tracing::info!(palace = %self.id, count, "purged expired drawers");
}
Ok(count)
}
}
pub async fn recall_with_default_embedder(
handle: &PalaceHandle,
query: &str,
top_k: usize,
) -> Result<Vec<RecallResult>> {
let embedder = shared_embedder()
.await
.context("acquire shared embedder for recall")?;
recall(handle, embedder.as_ref(), query, top_k).await
}
pub async fn recall_deep_with_default_embedder(
handle: &PalaceHandle,
query: &str,
top_k: usize,
) -> Result<Vec<RecallResult>> {
let embedder = shared_embedder()
.await
.context("acquire shared embedder for recall_deep")?;
recall_deep(handle, embedder.as_ref(), query, top_k).await
}
#[derive(Debug, Clone)]
pub struct CrossPalaceResult {
pub palace_id: String,
pub result: RecallResult,
}
pub async fn recall_across_palaces(
handles: &[Arc<PalaceHandle>],
embedder: &Arc<dyn Embedder + Send + Sync>,
query: &str,
top_k: usize,
deep: bool,
) -> Result<Vec<CrossPalaceResult>> {
if handles.is_empty() || top_k == 0 {
return Ok(Vec::new());
}
let mut futures = Vec::with_capacity(handles.len());
for handle in handles {
let palace_id = handle.id.as_str().to_string();
let handle = handle.clone();
let embedder = embedder.clone();
let query = query.to_string();
futures.push(async move {
let result = if deep {
recall_deep(&handle, embedder.as_ref(), &query, top_k).await
} else {
recall(&handle, embedder.as_ref(), &query, top_k).await
};
(palace_id, result)
});
}
let outcomes = futures::future::join_all(futures).await;
let mut merged: Vec<CrossPalaceResult> = Vec::new();
let mut by_drawer: HashMap<Uuid, usize> = HashMap::new();
for (palace_id, outcome) in outcomes {
match outcome {
Ok(hits) => {
for r in hits {
let drawer_id = r.drawer.id;
let candidate = CrossPalaceResult {
palace_id: palace_id.clone(),
result: r,
};
match by_drawer.get(&drawer_id).copied() {
Some(idx) if merged[idx].result.score >= candidate.result.score => {
}
Some(idx) => {
merged[idx] = candidate;
}
None => {
by_drawer.insert(drawer_id, merged.len());
merged.push(candidate);
}
}
}
}
Err(e) => {
tracing::warn!(palace = %palace_id, "recall_across_palaces: skipping palace: {e:#}");
}
}
}
merged.sort_by(|a, b| {
b.result
.score
.partial_cmp(&a.result.score)
.unwrap_or(std::cmp::Ordering::Equal)
});
merged.truncate(top_k);
Ok(merged)
}
pub async fn recall_across_palaces_with_default_embedder(
handles: &[Arc<PalaceHandle>],
query: &str,
top_k: usize,
deep: bool,
) -> Result<Vec<CrossPalaceResult>> {
let embedder = shared_embedder()
.await
.context("acquire shared embedder for recall_across_palaces")?;
recall_across_palaces(handles, &embedder, query, top_k, deep).await
}
fn room_to_uuid(room: &RoomType) -> Uuid {
let label = format!("{room:?}");
let mut bytes = [0u8; 16];
for (i, b) in label.bytes().enumerate() {
bytes[i % 16] ^= b.wrapping_add(i as u8);
}
Uuid::from_bytes(bytes)
}
fn uuid_prefix_eq(a: Uuid, b: Uuid) -> bool {
a.as_bytes()[..8] == b.as_bytes()[..8]
}
const L1_NO_SIMILARITY_PENALTY: f32 = 0.15;
pub fn retrieve_l0_l1(handle: &PalaceHandle) -> Vec<RecallResult> {
let mut out: Vec<RecallResult> = Vec::with_capacity(1 + handle.l1_drawers.len());
if !handle.identity.is_empty() {
let identity_drawer = Drawer {
id: Uuid::nil(),
room_id: Uuid::nil(),
content: handle.identity.clone(),
importance: 1.0,
source_file: None,
created_at: chrono::Utc::now(),
tags: Vec::new(),
last_accessed_at: None,
access_count: 0,
drawer_type: crate::memory_core::palace::DrawerType::UserFact,
expires_at: None,
};
out.push(RecallResult {
drawer: identity_drawer,
score: 1.0,
layer: 0,
});
}
for d in &handle.l1_drawers {
out.push(RecallResult {
drawer: d.clone(),
score: d.importance,
layer: 1,
});
}
out
}
pub fn rescore_l1_by_similarity(
results: &mut [RecallResult],
similarity_scores: &HashMap<Uuid, f32>,
) {
for r in results.iter_mut() {
if r.layer == 1 {
let id = r.drawer.id;
r.score = match similarity_scores.get(&id) {
Some(&sim) => sim,
None => r.drawer.importance * L1_NO_SIMILARITY_PENALTY,
};
}
}
}
pub async fn retrieve_l2(
handle: &PalaceHandle,
embedder: &dyn Embedder,
query: &str,
room_filter: Option<RoomType>,
top_k: usize,
) -> Result<Vec<RecallResult>> {
if top_k == 0 {
return Ok(Vec::new());
}
let embeddings = embedder.embed_batch(&[query.to_string()]).await?;
let Some(query_vec) = embeddings.into_iter().next() else {
return Ok(Vec::new());
};
let overfetch = top_k.saturating_mul(3).max(top_k);
let hits = handle.vector_store.search(&query_vec, overfetch).await?;
let drawers = handle.drawers.read();
let closets = handle.closets.read();
let query_tokens: Vec<String> = extract_keywords(query);
let mut results: Vec<RecallResult> = Vec::with_capacity(hits.len());
for hit in hits {
let Some(drawer) = drawers.iter().find(|d| uuid_prefix_eq(d.id, hit.drawer_id)) else {
continue;
};
if room_filter.is_some() {
}
let age_days = DecayConfig::age_days(drawer.created_at);
let boost = drawer.accumulated_boost(&handle.decay_config);
let eff_importance =
handle
.decay_config
.effective_importance(drawer.importance, age_days, boost);
let effective_score = eff_importance * hit.score;
let drawer_id = drawer.id;
let in_closet = query_tokens
.iter()
.any(|tok| closets.get(tok).is_some_and(|ids| ids.contains(&drawer_id)));
let tag_boost = if in_closet { 0.15_f32 } else { 0.0 };
let final_score = (effective_score + tag_boost).min(1.0);
results.push(RecallResult {
drawer: drawer.clone(),
score: final_score,
layer: 2,
});
}
drop(closets);
drop(drawers);
results.sort_by(|a, b| {
b.score
.partial_cmp(&a.score)
.unwrap_or(std::cmp::Ordering::Equal)
});
results.truncate(top_k);
Ok(results)
}
pub async fn retrieve_l3(
handle: &PalaceHandle,
embedder: &dyn Embedder,
query: &str,
top_k: usize,
) -> Result<Vec<RecallResult>> {
if top_k == 0 {
return Ok(Vec::new());
}
let embeddings = embedder.embed_batch(&[query.to_string()]).await?;
let Some(query_vec) = embeddings.into_iter().next() else {
return Ok(Vec::new());
};
let hits = handle.vector_store.search(&query_vec, top_k).await?;
let drawers = handle.drawers.read();
let closets = handle.closets.read();
let query_tokens: Vec<String> = extract_keywords(query);
let mut results: Vec<RecallResult> = Vec::with_capacity(hits.len());
for hit in hits {
let Some(drawer) = drawers.iter().find(|d| uuid_prefix_eq(d.id, hit.drawer_id)) else {
continue;
};
let age_days = DecayConfig::age_days(drawer.created_at);
let boost = drawer.accumulated_boost(&handle.decay_config);
let eff_importance =
handle
.decay_config
.effective_importance(drawer.importance, age_days, boost);
let effective_score = eff_importance * hit.score;
let drawer_id = drawer.id;
let in_closet = query_tokens
.iter()
.any(|tok| closets.get(tok).is_some_and(|ids| ids.contains(&drawer_id)));
let tag_boost = if in_closet { 0.15_f32 } else { 0.0 };
let final_score = (effective_score + tag_boost).min(1.0);
results.push(RecallResult {
drawer: drawer.clone(),
score: final_score,
layer: 3,
});
}
drop(closets);
drop(drawers);
results.sort_by(|a, b| {
b.score
.partial_cmp(&a.score)
.unwrap_or(std::cmp::Ordering::Equal)
});
results.truncate(top_k);
Ok(results)
}
pub fn expand_query(query: &str) -> String {
let q = query.to_lowercase();
let mut extra: Vec<&str> = Vec::new();
if q.contains("fast")
|| q.contains("speed")
|| q.contains("latency")
|| q.contains("performance")
{
extra.push("latency performance speed throughput");
}
if q.contains("vector search")
|| q.contains("semantic search")
|| q.contains("nearest neighbor")
{
extra.push("HNSW ANN approximate nearest neighbor usearch vector index");
}
if q.contains("memory safe") || q.contains("borrow") || q.contains("ownership") {
extra.push("borrow checker lifetime ownership Rust memory safety");
}
if q.contains("concurren") || q.contains("thread") || q.contains("parallel") {
extra.push("concurrent async tokio DashMap RwLock mutex thread-safe");
}
if extra.is_empty() {
query.to_string()
} else {
format!("{} {}", query, extra.join(" "))
}
}
pub async fn recall(
handle: &PalaceHandle,
embedder: &dyn Embedder,
query: &str,
top_k: usize,
) -> Result<Vec<RecallResult>> {
let expanded = expand_query(query);
let mut combined = retrieve_l0_l1(handle);
let l2 = retrieve_l2(handle, embedder, &expanded, None, top_k).await?;
let sim_scores: HashMap<Uuid, f32> = l2.iter().map(|r| (r.drawer.id, r.score)).collect();
rescore_l1_by_similarity(&mut combined, &sim_scores);
dedup_extend(&mut combined, l2);
combined.sort_by(|a, b| {
b.score
.partial_cmp(&a.score)
.unwrap_or(std::cmp::Ordering::Equal)
});
combined.truncate(top_k);
log_recall(handle, query, &combined);
Ok(combined)
}
pub async fn recall_deep(
handle: &PalaceHandle,
embedder: &dyn Embedder,
query: &str,
top_k: usize,
) -> Result<Vec<RecallResult>> {
let expanded = expand_query(query);
let mut combined = retrieve_l0_l1(handle);
let l3 = retrieve_l3(handle, embedder, &expanded, top_k).await?;
let sim_scores: HashMap<Uuid, f32> = l3.iter().map(|r| (r.drawer.id, r.score)).collect();
rescore_l1_by_similarity(&mut combined, &sim_scores);
dedup_extend(&mut combined, l3);
combined.sort_by(|a, b| {
b.score
.partial_cmp(&a.score)
.unwrap_or(std::cmp::Ordering::Equal)
});
combined.truncate(top_k);
log_recall(handle, query, &combined);
Ok(combined)
}
fn log_recall(handle: &PalaceHandle, query: &str, results: &[RecallResult]) {
let Some(log) = handle.recall_log.clone() else {
return;
};
let palace_id = handle.id.as_str().to_string();
let q_hash = query_hash(query);
let logged: Vec<RecallResult> = results.iter().filter(|r| r.layer > 0).cloned().collect();
tokio::spawn(async move {
let now = chrono::Utc::now();
if logged.is_empty() {
let _ = log
.record(RecallEvent {
palace_id,
query_hash: q_hash,
layer: 3,
drawer_id: None,
score: 0.0,
occurred_at: now,
})
.await;
} else {
for r in &logged {
let _ = log
.record(RecallEvent {
palace_id: palace_id.clone(),
query_hash: q_hash,
layer: r.layer,
drawer_id: Some(r.drawer.id),
score: r.score,
occurred_at: now,
})
.await;
}
}
});
}
fn dedup_extend(base: &mut Vec<RecallResult>, extra: Vec<RecallResult>) {
for r in extra {
if !base.iter().any(|b| b.drawer.id == r.drawer.id) {
base.push(r);
}
}
}
pub struct RetrievalLayers;
impl RetrievalLayers {
pub async fn load_l0(_palace_data_dir: &Path) -> Result<L0Identity> {
Ok(L0Identity {
content: String::new(),
})
}
pub async fn load_l1(_palace_id: &PalaceId) -> Result<L1Essential> {
Ok(L1Essential {
drawers: Vec::new(),
})
}
}
#[cfg(test)]
mod tests;
#[cfg(test)]
mod timeout_tests;