use crate::filesystem::{CortexFilesystem, FilesystemOperations};
use crate::layers::generator::{AbstractGenerator, OverviewGenerator};
use crate::llm::LLMClient;
use crate::llm_result_cache::{CacheConfig, LlmResultCache};
use crate::memory_events::{ChangeType, MemoryEvent};
use crate::memory_index::MemoryScope;
use crate::{ContextLayer, Result};
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use tracing::{debug, info, warn};
/// Counters describing how much work the cascade updater performed.
#[derive(Debug, Clone, Default)]
pub struct UpdateStats {
    pub updated_count: usize,
    pub skipped_count: usize,
    pub llm_call_count: usize,
    pub cache_hits: usize,
    pub cache_misses: usize,
}

impl UpdateStats {
    /// Total number of update attempts (performed + skipped).
    pub fn total_operations(&self) -> usize {
        self.skipped_count + self.updated_count
    }

    /// Fraction of attempts that were skipped; 0.0 when nothing ran yet.
    pub fn skip_rate(&self) -> f64 {
        match self.total_operations() {
            0 => 0.0,
            total => self.skipped_count as f64 / total as f64,
        }
    }

    /// Fraction of cache lookups that hit; 0.0 when no lookups happened.
    pub fn cache_hit_rate(&self) -> f64 {
        let lookups = self.cache_hits + self.cache_misses;
        if lookups > 0 {
            self.cache_hits as f64 / lookups as f64
        } else {
            0.0
        }
    }
}
/// Propagates file-level memory changes upward through the L0
/// (`.abstract.md`) and L1 (`.overview.md`) summary layers of the
/// cortex virtual filesystem.
pub struct CascadeLayerUpdater {
    // Virtual filesystem the layer files are read from / written to.
    filesystem: Arc<CortexFilesystem>,
    // LLM backend used to generate abstract/overview text.
    llm_client: Arc<dyn LLMClient>,
    // Generator for the L0 ".abstract.md" layer.
    l0_generator: AbstractGenerator,
    // Generator for the L1 ".overview.md" layer.
    l1_generator: OverviewGenerator,
    // Channel used to broadcast `MemoryEvent::LayersUpdated` notifications.
    event_tx: mpsc::UnboundedSender<MemoryEvent>,
    // Shared counters; see `UpdateStats`.
    stats: Arc<RwLock<UpdateStats>>,
    // Optional content-hash-keyed cache of previous LLM results.
    llm_cache: Option<Arc<LlmResultCache>>,
}
impl CascadeLayerUpdater {
    /// Creates an updater without an LLM result cache.
    ///
    /// Equivalent to `new_with_cache(..., None)`.
    pub fn new(
        filesystem: Arc<CortexFilesystem>,
        llm_client: Arc<dyn LLMClient>,
        event_tx: mpsc::UnboundedSender<MemoryEvent>,
    ) -> Self {
        Self::new_with_cache(filesystem, llm_client, event_tx, None)
    }
/// Creates an updater, optionally wiring in an LLM result cache built
/// from `cache_config`. When `cache_config` is `None`, every layer
/// regeneration pays for fresh LLM calls.
pub fn new_with_cache(
    filesystem: Arc<CortexFilesystem>,
    llm_client: Arc<dyn LLMClient>,
    event_tx: mpsc::UnboundedSender<MemoryEvent>,
    cache_config: Option<CacheConfig>,
) -> Self {
    Self {
        llm_cache: cache_config.map(|cfg| Arc::new(LlmResultCache::new(cfg))),
        stats: Arc::new(RwLock::new(UpdateStats::default())),
        l0_generator: AbstractGenerator::new(),
        l1_generator: OverviewGenerator::new(),
        filesystem,
        llm_client,
        event_tx,
    }
}
/// Returns a snapshot of the current update statistics.
pub async fn get_stats(&self) -> UpdateStats {
    let guard = self.stats.read().await;
    guard.clone()
}
/// Resets all statistics counters back to zero.
pub async fn reset_stats(&self) {
    *self.stats.write().await = UpdateStats::default();
}
/// Hashes `content` with the std `DefaultHasher` and returns the digest
/// as a fixed-width, zero-padded 16-character lowercase hex string.
///
/// The zero padding matters: plain `{:x}` produces a variable-length
/// string, and callers slice the first 8 characters of this hash for
/// logging, which would panic on a digest shorter than 8 hex digits.
/// (Hashes stored previously without padding will mismatch once and
/// trigger a single harmless regeneration.)
fn calculate_content_hash(&self, content: &str) -> String {
    let mut hasher = DefaultHasher::new();
    content.hash(&mut hasher);
    format!("{:016x}", hasher.finish())
}
/// Removes machine-written metadata lines (`<!-- source-hash: ... -->`
/// markers and `**Added**:` timestamps) from previously generated layer
/// content so they do not feed back into the next aggregation.
fn strip_metadata_lines(content: &str) -> String {
    let kept: Vec<&str> = content
        .lines()
        .filter(|line| !line.starts_with("**Added**:"))
        .filter(|line| !line.starts_with("<!-- source-hash:"))
        .collect();
    kept.join("\n")
}
/// Decides whether a layer file needs regeneration by comparing the
/// `<!-- source-hash: ... -->` marker embedded in the existing file
/// against `new_content_hash`.
///
/// Returns `Ok(true)` (regenerate) when the file is missing/unreadable,
/// carries no parseable hash marker, or the stored hash differs.
async fn should_update_layer(&self, layer_uri: &str, new_content_hash: &str) -> Result<bool> {
    let existing = match self.filesystem.read(layer_uri).await {
        Ok(content) => content,
        // Missing or unreadable layer file: always (re)generate.
        Err(_) => return Ok(true),
    };
    // First line whose prefix AND suffix both match wins.
    let stored_hash = existing
        .lines()
        .find_map(|line| line.strip_prefix("<!-- source-hash: ")?.strip_suffix(" -->"));
    Ok(stored_hash.map_or(true, |stored| stored != new_content_hash))
}
/// Entry point invoked when a memory file changes: refreshes the L0/L1
/// layers of the file's parent directory, then walks upward refreshing
/// every ancestor directory (including the scope root).
///
/// `change_type` is currently only used for logging; create/modify/delete
/// all trigger the same cascade.
pub async fn on_memory_changed(
    &self,
    scope: MemoryScope,
    owner_id: String,
    file_uri: String,
    change_type: ChangeType,
) -> Result<()> {
    debug!(
        "CascadeLayerUpdater: handling {:?} for {} in {:?}/{}",
        change_type, file_uri, scope, owner_id
    );
    let parent_dir = self.get_parent_directory(&file_uri);
    // Refresh the immediate parent first, then bubble up to the root.
    self.update_directory_layers(&parent_dir, &scope, &owner_id).await?;
    self.update_ancestor_layers(&scope, &owner_id, &parent_dir).await?;
    Ok(())
}
/// Regenerates the L0 (`.abstract.md`) and L1 (`.overview.md`) layers of
/// a single directory from its aggregated file content.
///
/// Skips the LLM round-trip entirely when the aggregated content hash
/// matches the hash stored in the existing abstract, and consults the
/// optional LLM result cache (keyed by content hash) before generating.
/// Emits a `MemoryEvent::LayersUpdated` event on success.
pub async fn update_directory_layers(&self, dir_uri: &str, scope: &MemoryScope, owner_id: &str) -> Result<()> {
    let content = self.aggregate_directory_content(dir_uri).await?;
    if content.is_empty() {
        debug!("Directory {} has no content, skipping layer update", dir_uri);
        return Ok(());
    }
    let new_content_hash = self.calculate_content_hash(&content);
    // Short prefix for log lines; `get` avoids a panic if the hash string
    // is ever shorter than 8 characters (previously `[..8]` could panic).
    let short_hash = new_content_hash.get(..8).unwrap_or(new_content_hash.as_str());
    let abstract_uri = format!("{}/.abstract.md", dir_uri);
    let should_update = self.should_update_layer(&abstract_uri, &new_content_hash).await?;
    if !should_update {
        debug!("⏭️ Skipped L0/L1 update for {} (content unchanged, hash: {})", dir_uri, short_hash);
        let mut stats = self.stats.write().await;
        stats.skipped_count += 1;
        return Ok(());
    }
    // Fixed log: the old message had a "{} -> {}" placeholder fed with the
    // literal string "new", which printed a meaningless `hash: new -> ...`.
    info!("🔄 Updating L0/L1 for {} (new hash: {})", dir_uri, short_hash);
    let (abstract_text, overview) = if let Some(ref cache) = self.llm_cache {
        // Cache keys derive from the content hash, so identical content
        // never pays for a second LLM call.
        let cache_key_l0 = format!("{}:L0", new_content_hash);
        let cache_key_l1 = format!("{}:L1", new_content_hash);
        let cached_l0 = cache.get(&cache_key_l0).await;
        let cached_l1 = cache.get(&cache_key_l1).await;
        match (cached_l0, cached_l1) {
            (Some(l0), Some(l1)) => {
                debug!("💚 Cache HIT for both L0 and L1");
                let mut stats = self.stats.write().await;
                stats.cache_hits += 2;
                (l0, l1)
            }
            _ => {
                // A partial hit still regenerates both layers.
                debug!("💔 Cache MISS, generating with LLM");
                let l0 = self.l0_generator
                    .generate_with_llm(&content, &self.llm_client, &[])
                    .await?;
                let l1 = self.l1_generator
                    .generate_with_llm(&content, &self.llm_client)
                    .await?;
                cache.put(cache_key_l0, l0.clone()).await;
                cache.put(cache_key_l1, l1.clone()).await;
                let mut stats = self.stats.write().await;
                stats.cache_misses += 2;
                stats.llm_call_count += 2;
                (l0, l1)
            }
        }
    } else {
        // No cache configured: always generate.
        let l0 = self.l0_generator
            .generate_with_llm(&content, &self.llm_client, &[])
            .await?;
        let l1 = self.l1_generator
            .generate_with_llm(&content, &self.llm_client)
            .await?;
        let mut stats = self.stats.write().await;
        stats.llm_call_count += 2;
        (l0, l1)
    };
    {
        // Scope the write lock so it is released before filesystem I/O.
        let mut stats = self.stats.write().await;
        stats.updated_count += 1;
    }
    let timestamp = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC");
    // Embed the source hash so the next run can detect "content unchanged".
    let abstract_with_ts = format!("{}\n\n**Added**: {}\n<!-- source-hash: {} -->", abstract_text, timestamp, new_content_hash);
    let overview_with_ts = format!("{}\n\n---\n\n**Added**: {}\n<!-- source-hash: {} -->", overview, timestamp, new_content_hash);
    let overview_uri = format!("{}/.overview.md", dir_uri);
    self.filesystem.write(&abstract_uri, &abstract_with_ts).await?;
    self.filesystem.write(&overview_uri, &overview_with_ts).await?;
    info!("✅ Updated L0/L1 layers for {}", dir_uri);
    // Best-effort notification; receivers may already be gone.
    let _ = self.event_tx.send(MemoryEvent::LayersUpdated {
        scope: scope.clone(),
        owner_id: owner_id.to_string(),
        directory_uri: dir_uri.to_string(),
        layers: vec![ContextLayer::L0Abstract, ContextLayer::L1Overview],
    });
    Ok(())
}
/// Walks from `start_dir` up toward the scope root, refreshing the L0/L1
/// layers of every ancestor directory along the way. The root itself is
/// handled by `update_root_layers`, which aggregates child abstracts
/// instead of raw files.
async fn update_ancestor_layers(
    &self,
    scope: &MemoryScope,
    owner_id: &str,
    start_dir: &str,
) -> Result<()> {
    let root_uri = self.get_scope_root(scope, owner_id);
    let mut current = start_dir.to_string();
    loop {
        // Fix: the argument had been corrupted to `¤t` (an HTML
        // `&curren;` mojibake of `&current`), which does not compile.
        let parent = match self.get_parent_directory_opt(&current) {
            Some(p) => p,
            None => break,
        };
        // Stop when no upward progress is possible or we left the scope.
        if parent == current || parent.len() < root_uri.len() {
            break;
        }
        if parent == root_uri {
            self.update_root_layers(scope, owner_id).await?;
            break;
        }
        self.update_directory_layers(&parent, scope, owner_id).await?;
        current = parent;
    }
    Ok(())
}
/// Regenerates the scope root's L0/L1 layers.
///
/// Unlike `update_directory_layers`, the root is summarized from its
/// children's `.abstract.md` files (a summary of summaries) rather than
/// from raw files. Otherwise the flow mirrors the per-directory update:
/// hash-based skip, optional LLM result cache (keys carry a `:root`
/// suffix to avoid colliding with directory-level cache entries), stats
/// accounting, and a `LayersUpdated` event.
async fn update_root_layers(
    &self,
    scope: &MemoryScope,
    owner_id: &str,
) -> Result<()> {
    let root_uri = self.get_scope_root(scope, owner_id);
    let aggregated = self.aggregate_child_abstracts(&root_uri).await?;
    if aggregated.is_empty() {
        debug!("Root {} has no content, skipping layer update", root_uri);
        return Ok(());
    }
    let new_content_hash = self.calculate_content_hash(&aggregated);
    let abstract_uri = format!("{}/.abstract.md", root_uri);
    // Skip the LLM round-trip when the stored source hash still matches.
    let should_update = self.should_update_layer(&abstract_uri, &new_content_hash).await?;
    if !should_update {
        debug!("⏭️ Skipped root L0/L1 update for {:?}/{} (content unchanged)", scope, owner_id);
        let mut stats = self.stats.write().await;
        stats.skipped_count += 1;
        return Ok(());
    }
    info!("🔄 Updating root L0/L1 for {:?}/{}", scope, owner_id);
    let (abstract_text, overview) = if let Some(ref cache) = self.llm_cache {
        // Cache keys derive from the content hash, so identical
        // aggregated content never pays for a second LLM call.
        let cache_key_l0 = format!("{}:L0:root", new_content_hash);
        let cache_key_l1 = format!("{}:L1:root", new_content_hash);
        let cached_l0 = cache.get(&cache_key_l0).await;
        let cached_l1 = cache.get(&cache_key_l1).await;
        match (cached_l0, cached_l1) {
            (Some(l0), Some(l1)) => {
                debug!("💚 Cache HIT for root L0 and L1");
                let mut stats = self.stats.write().await;
                stats.cache_hits += 2;
                (l0, l1)
            }
            _ => {
                // A partial hit still regenerates both layers.
                debug!("💔 Cache MISS for root, generating with LLM");
                let l0 = self.l0_generator
                    .generate_with_llm(&aggregated, &self.llm_client, &[])
                    .await?;
                let l1 = self.l1_generator
                    .generate_with_llm(&aggregated, &self.llm_client)
                    .await?;
                cache.put(cache_key_l0, l0.clone()).await;
                cache.put(cache_key_l1, l1.clone()).await;
                let mut stats = self.stats.write().await;
                stats.cache_misses += 2;
                stats.llm_call_count += 2;
                (l0, l1)
            }
        }
    } else {
        // No cache configured: always generate.
        let l0 = self.l0_generator
            .generate_with_llm(&aggregated, &self.llm_client, &[])
            .await?;
        let l1 = self.l1_generator
            .generate_with_llm(&aggregated, &self.llm_client)
            .await?;
        let mut stats = self.stats.write().await;
        stats.llm_call_count += 2;
        (l0, l1)
    };
    {
        // Scope the write lock so it is released before filesystem I/O.
        let mut stats = self.stats.write().await;
        stats.updated_count += 1;
    }
    let timestamp = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC");
    // Embed the source hash so the next run can detect "unchanged".
    let abstract_with_ts = format!("{}\n\n**Added**: {}\n<!-- source-hash: {} -->", abstract_text, timestamp, new_content_hash);
    let overview_with_ts = format!("{}\n\n---\n\n**Added**: {}\n<!-- source-hash: {} -->", overview, timestamp, new_content_hash);
    let overview_uri = format!("{}/.overview.md", root_uri);
    self.filesystem.write(&abstract_uri, &abstract_with_ts).await?;
    self.filesystem.write(&overview_uri, &overview_with_ts).await?;
    info!("✅ Updated root L0/L1 layers for {:?}/{}", scope, owner_id);
    // Best-effort notification; receivers may already be gone.
    let _ = self.event_tx.send(MemoryEvent::LayersUpdated {
        scope: scope.clone(),
        owner_id: owner_id.to_string(),
        directory_uri: root_uri,
        layers: vec![ContextLayer::L0Abstract, ContextLayer::L1Overview],
    });
    Ok(())
}
/// Concatenates the readable `.md`/`.txt` files directly inside `dir_uri`
/// (dotfiles and subdirectories are ignored) into one annotated string,
/// truncated to at most 10_000 characters.
async fn aggregate_directory_content(&self, dir_uri: &str) -> Result<String> {
    const MAX_CHARS: usize = 10000;
    let mut content = String::new();
    let mut file_count = 0usize;
    for entry in self.filesystem.list(dir_uri).await? {
        let is_text = entry.name.ends_with(".md") || entry.name.ends_with(".txt");
        if entry.is_directory || entry.name.starts_with('.') || !is_text {
            continue;
        }
        match self.filesystem.read(&entry.uri).await {
            Ok(file_content) => {
                content.push_str(&format!("\n\n=== {} ===\n\n", entry.name));
                // Drop previously injected hash/timestamp metadata so it
                // doesn't feed back into the next summary.
                content.push_str(&Self::strip_metadata_lines(&file_content));
                file_count += 1;
            }
            // Unreadable files are skipped best-effort.
            Err(e) => debug!("Failed to read {}: {}", entry.uri, e),
        }
    }
    if file_count > 0 {
        debug!("Aggregated {} files from {}", file_count, dir_uri);
    }
    // Truncate by character count (not bytes) to stay UTF-8 safe.
    if content.chars().count() > MAX_CHARS {
        content = content.chars().take(MAX_CHARS).collect();
        content.push_str("\n\n[内容已截断...]");
    }
    Ok(content)
}
/// Collects the `.abstract.md` of every non-hidden child directory of
/// `dir_uri` into one markdown document (one `##` section per child).
async fn aggregate_child_abstracts(&self, dir_uri: &str) -> Result<String> {
    let mut content = String::new();
    let mut dir_count = 0usize;
    for entry in self.filesystem.list(dir_uri).await? {
        let is_visible_dir = entry.is_directory && !entry.name.starts_with('.');
        if !is_visible_dir {
            continue;
        }
        let abstract_uri = format!("{}/.abstract.md", entry.uri);
        // Children that were never summarized (no abstract) are skipped.
        if let Ok(abstract_content) = self.filesystem.read(&abstract_uri).await {
            content.push_str(&format!("\n\n## {}\n\n", entry.name));
            content.push_str(&Self::strip_metadata_lines(&abstract_content));
            dir_count += 1;
        }
    }
    if dir_count > 0 {
        debug!("Aggregated abstracts from {} child directories of {}", dir_count, dir_uri);
    }
    Ok(content)
}
/// Returns everything before the final `/` of `uri`; when there is no
/// `/`, the uri is returned unchanged.
fn get_parent_directory(&self, uri: &str) -> String {
    match uri.rsplit_once('/') {
        Some((parent, _)) => parent.to_string(),
        None => uri.to_string(),
    }
}
/// Like `get_parent_directory`, but yields `None` when there is no `/`
/// or the parent would be empty. Note that for `cortex://x` style URIs
/// this can produce the degenerate parent `cortex:/` (covered by tests).
fn get_parent_directory_opt(&self, uri: &str) -> Option<String> {
    let (parent, _) = uri.rsplit_once('/')?;
    if parent.is_empty() {
        None
    } else {
        Some(parent.to_string())
    }
}
/// Builds the `cortex://` root URI for a scope/owner pair. The
/// `Resources` scope is global and ignores `owner_id`.
fn get_scope_root(&self, scope: &MemoryScope, owner_id: &str) -> String {
    let segment = match scope {
        MemoryScope::User => "user",
        MemoryScope::Agent => "agent",
        MemoryScope::Session => "session",
        MemoryScope::Resources => return "cortex://resources".to_string(),
    };
    format!("cortex://{}/{}", segment, owner_id)
}
/// Regenerates L0/L1 layers for a session's `timeline` directory, then
/// refreshes the month- and day-level abstracts beneath it.
///
/// NOTE(review): unlike `update_directory_layers`, this path neither
/// consults `llm_cache` nor updates `stats` — presumably intentional,
/// but worth confirming for consistency.
pub async fn update_timeline_layers(&self, session_id: &str) -> Result<()> {
    let timeline_uri = format!("cortex://session/{}/timeline", session_id);
    if !self.filesystem.exists(&timeline_uri).await? {
        debug!("Timeline {} does not exist, skipping", timeline_uri);
        return Ok(());
    }
    let content = self.aggregate_timeline_content(&timeline_uri).await?;
    if content.is_empty() {
        debug!("Timeline {} is empty, skipping layer update", timeline_uri);
        return Ok(());
    }
    let abstract_uri = format!("{}/.abstract.md", timeline_uri);
    let content_hash = self.calculate_content_hash(&content);
    if !self.should_update_layer(&abstract_uri, &content_hash).await? {
        debug!("⏭️ Skipped timeline L0/L1 for session {} (content unchanged)", session_id);
        // Date-level layers may still be stale even when the top-level
        // aggregate hash matches, so they are always refreshed.
        self.update_timeline_date_layers(&timeline_uri).await?;
        return Ok(());
    }
    let abstract_text = self.l0_generator
        .generate_with_llm(&content, &self.llm_client, &[])
        .await?;
    let overview = self.l1_generator
        .generate_with_llm(&content, &self.llm_client)
        .await?;
    let timestamp = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC");
    // Embed the source hash so the next run can detect "unchanged".
    let abstract_with_ts = format!("{}\n\n**Added**: {}\n<!-- source-hash: {} -->", abstract_text, timestamp, content_hash);
    let overview_with_ts = format!("{}\n\n---\n\n**Added**: {}\n<!-- source-hash: {} -->", overview, timestamp, content_hash);
    let overview_uri = format!("{}/.overview.md", timeline_uri);
    self.filesystem.write(&abstract_uri, &abstract_with_ts).await?;
    self.filesystem.write(&overview_uri, &overview_with_ts).await?;
    info!("Updated timeline L0/L1 layers for session {}", session_id);
    // Best-effort notification; receivers may already be gone.
    let _ = self.event_tx.send(MemoryEvent::LayersUpdated {
        scope: MemoryScope::Session,
        owner_id: session_id.to_string(),
        directory_uri: timeline_uri.clone(),
        layers: vec![ContextLayer::L0Abstract, ContextLayer::L1Overview],
    });
    self.update_timeline_date_layers(&timeline_uri).await?;
    Ok(())
}
/// Recursively gathers every timeline message under `timeline_uri`,
/// prefixes the result with a message-count header, and truncates the
/// whole document to at most 15_000 characters.
async fn aggregate_timeline_content(&self, timeline_uri: &str) -> Result<String> {
    const MAX_CHARS: usize = 15000;
    let mut content = String::new();
    let mut message_count = 0usize;
    self.collect_timeline_messages_recursive(timeline_uri, &mut content, &mut message_count)
        .await?;
    if message_count > 0 {
        // Prepend the header so the LLM sees the total up front.
        content.insert_str(0, &format!("# Timeline Messages: {}\n\n", message_count));
        debug!("Aggregated {} messages from {}", message_count, timeline_uri);
    }
    // Character-based truncation keeps the cut UTF-8 safe.
    if content.chars().count() > MAX_CHARS {
        content = content.chars().take(MAX_CHARS).collect();
        content.push_str("\n\n[内容已截断...]");
    }
    Ok(content)
}
/// Depth-first walk of a timeline tree that appends every `.md` message
/// file to `content` (with a `## Message:` header) and bumps
/// `message_count` per file. Dotfiles (generated layers) are skipped;
/// unreadable files are logged and ignored.
///
/// Returns a boxed future because async fns cannot (directly) recurse.
fn collect_timeline_messages_recursive<'a>(
    &'a self,
    uri: &'a str,
    content: &'a mut String,
    message_count: &'a mut usize,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> {
    Box::pin(async move {
        let entries = self.filesystem.list(uri).await?;
        for entry in entries {
            if entry.name.starts_with('.') {
                continue;
            }
            if entry.is_directory {
                self.collect_timeline_messages_recursive(&entry.uri, content, message_count)
                    .await?;
            } else if entry.name.ends_with(".md") {
                match self.filesystem.read(&entry.uri).await {
                    Ok(file_content) => {
                        content.push_str(&format!("\n\n---\n\n## Message: {}\n\n", entry.name));
                        content.push_str(&file_content);
                        *message_count += 1;
                    }
                    Err(e) => {
                        // Best-effort: skip unreadable messages.
                        debug!("Failed to read {}: {}", entry.uri, e);
                    }
                }
            }
        }
        Ok(())
    })
}
/// Refreshes month-level abstracts directly under the timeline, then
/// recurses into each month for day-level abstracts.
///
/// A child is treated as a month directory when its name is exactly 7
/// characters long and contains a '-' — assumes `YYYY-MM` naming; TODO
/// confirm against the timeline writer.
async fn update_timeline_date_layers(&self, timeline_uri: &str) -> Result<()> {
    let entries = self.filesystem.list(timeline_uri).await?;
    for entry in entries {
        if entry.is_directory && !entry.name.starts_with('.') {
            if entry.name.len() == 7 && entry.name.contains('-') {
                let month_content = self.aggregate_directory_content_recursive(&entry.uri).await?;
                if !month_content.is_empty() {
                    let abstract_uri = format!("{}/.abstract.md", entry.uri);
                    let content_hash = self.calculate_content_hash(&month_content);
                    // Only the L0 abstract is maintained at month level;
                    // the hash marker avoids redundant LLM calls.
                    if self.should_update_layer(&abstract_uri, &content_hash).await? {
                        let abstract_text = self.l0_generator
                            .generate_with_llm(&month_content, &self.llm_client, &[])
                            .await?;
                        let timestamp = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC");
                        let abstract_with_ts = format!("{}\n\n**Added**: {}\n<!-- source-hash: {} -->", abstract_text, timestamp, content_hash);
                        self.filesystem.write(&abstract_uri, &abstract_with_ts).await?;
                        debug!("Updated month-level L0 for {}", entry.uri);
                    } else {
                        debug!("Skipped month-level L0 for {} (content unchanged)", entry.uri);
                    }
                }
                self.update_timeline_day_layers(&entry.uri).await?;
            }
        }
    }
    Ok(())
}
/// Refreshes the L0 abstract of every non-hidden day directory inside a
/// month directory. Days aggregate only their direct files
/// (non-recursive), unlike the month-level aggregation.
async fn update_timeline_day_layers(&self, month_uri: &str) -> Result<()> {
    let entries = self.filesystem.list(month_uri).await?;
    for entry in entries {
        if entry.is_directory && !entry.name.starts_with('.') {
            let day_content = self.aggregate_directory_content(&entry.uri).await?;
            if !day_content.is_empty() {
                let abstract_uri = format!("{}/.abstract.md", entry.uri);
                let content_hash = self.calculate_content_hash(&day_content);
                // The hash marker comparison avoids redundant LLM calls.
                if self.should_update_layer(&abstract_uri, &content_hash).await? {
                    let abstract_text = self.l0_generator
                        .generate_with_llm(&day_content, &self.llm_client, &[])
                        .await?;
                    let timestamp = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC");
                    let abstract_with_ts = format!("{}\n\n**Added**: {}\n<!-- source-hash: {} -->", abstract_text, timestamp, content_hash);
                    self.filesystem.write(&abstract_uri, &abstract_with_ts).await?;
                    debug!("Updated day-level L0 for {}", entry.uri);
                } else {
                    debug!("Skipped day-level L0 for {} (content unchanged)", entry.uri);
                }
            }
        }
    }
    Ok(())
}
/// Buffers the recursive output of `collect_content_recursive` for
/// `dir_uri` into a single owned string.
async fn aggregate_directory_content_recursive(&self, dir_uri: &str) -> Result<String> {
    let mut buffer = String::new();
    self.collect_content_recursive(dir_uri, &mut buffer).await?;
    Ok(buffer)
}
/// Depth-first walk appending every non-hidden `.md` file under `uri`
/// to `content` with an `=== name ===` header.
///
/// NOTE(review): unlike `aggregate_directory_content`, this ignores
/// `.txt` files and does not strip metadata lines — confirm whether the
/// asymmetry is intentional.
fn collect_content_recursive<'a>(
    &'a self,
    uri: &'a str,
    content: &'a mut String,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> {
    // Boxed future because async fns cannot (directly) recurse.
    Box::pin(async move {
        let entries = self.filesystem.list(uri).await?;
        for entry in entries {
            if entry.name.starts_with('.') {
                continue;
            }
            if entry.is_directory {
                self.collect_content_recursive(&entry.uri, content).await?;
            } else if entry.name.ends_with(".md") {
                // Unreadable files are silently skipped (best-effort).
                if let Ok(file_content) = self.filesystem.read(&entry.uri).await {
                    content.push_str(&format!("\n\n=== {} ===\n\n", entry.name));
                    content.push_str(&file_content);
                }
            }
        }
        Ok(())
    })
}
/// Full rebuild: regenerates L0/L1 layers for every directory in the
/// scope (children before parents), then the scope root itself. No-op
/// when the scope root does not exist yet.
pub async fn update_all_layers(&self, scope: &MemoryScope, owner_id: &str) -> Result<()> {
    let root_uri = self.get_scope_root(scope, owner_id);
    debug!("Checking root directory: {}", root_uri);
    if !self.filesystem.exists(&root_uri).await? {
        debug!("Root directory {} does not exist, skipping", root_uri);
        return Ok(());
    }
    debug!("Starting recursive layer update...");
    self.update_all_layers_recursive(&root_uri, scope, owner_id).await?;
    self.update_root_layers(scope, owner_id).await?;
    info!("Layer update completed for {:?}", scope);
    Ok(())
}
fn update_all_layers_recursive<'a>(
&'a self,
dir_uri: &'a str,
scope: &'a MemoryScope,
owner_id: &'a str,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> {
Box::pin(async move {
let entries = self.filesystem.list(dir_uri).await?;
debug!("Directory {} has {} entries", dir_uri, entries.len());
for entry in &entries {
if entry.is_directory && !entry.name.starts_with('.') {
self.update_all_layers_recursive(&entry.uri, scope, owner_id).await?;
}
}
let has_content = entries.iter().any(|e| {
!e.is_directory && !e.name.starts_with('.') && e.name.ends_with(".md")
});
if has_content {
match self.update_directory_layers(dir_uri, scope, owner_id).await {
Ok(_) => debug!("Layer files generated for {}", dir_uri),
Err(e) => warn!("Layer generation failed for {}: {}", dir_uri, e),
}
}
Ok(())
})
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::filesystem::CortexFilesystem;
    use crate::llm::MockLLMClient;
    use std::sync::Arc;
    use tokio::sync::mpsc;

    // Parent extraction drops the final path segment of a cortex URI.
    #[test]
    fn test_get_parent_directory() {
        let (tx, _rx) = mpsc::unbounded_channel();
        let filesystem = Arc::new(CortexFilesystem::new("/tmp/test"));
        let llm_client = Arc::new(MockLLMClient::new());
        let updater = CascadeLayerUpdater::new(filesystem, llm_client, tx);
        assert_eq!(updater.get_parent_directory("cortex://user/test/path/file.md"), "cortex://user/test/path");
        assert_eq!(updater.get_parent_directory("cortex://user/test/file.md"), "cortex://user/test");
        assert_eq!(updater.get_parent_directory("cortex://user/file.md"), "cortex://user");
    }

    // Scope roots follow the cortex://<scope>/<owner> convention; the
    // Resources scope is global and ignores the owner id.
    #[test]
    fn test_get_scope_root() {
        let (tx, _rx) = mpsc::unbounded_channel();
        let filesystem = Arc::new(CortexFilesystem::new("/tmp/test"));
        let llm_client = Arc::new(MockLLMClient::new());
        let updater = CascadeLayerUpdater::new(filesystem, llm_client, tx);
        assert_eq!(updater.get_scope_root(&MemoryScope::User, "user_001"), "cortex://user/user_001");
        assert_eq!(updater.get_scope_root(&MemoryScope::Agent, "agent_001"), "cortex://agent/agent_001");
        assert_eq!(updater.get_scope_root(&MemoryScope::Session, "session_001"), "cortex://session/session_001");
        assert_eq!(updater.get_scope_root(&MemoryScope::Resources, ""), "cortex://resources");
    }

    // The Option-returning variant filters out empty parents; note the
    // documented degenerate "cortex:/" result when only the scheme's
    // slashes remain.
    #[test]
    fn test_get_parent_directory_opt() {
        let (tx, _rx) = mpsc::unbounded_channel();
        let filesystem = Arc::new(CortexFilesystem::new("/tmp/test"));
        let llm_client = Arc::new(MockLLMClient::new());
        let updater = CascadeLayerUpdater::new(filesystem, llm_client, tx);
        assert_eq!(updater.get_parent_directory_opt("cortex://user/test/file.md"), Some("cortex://user/test".to_string()));
        assert_eq!(updater.get_parent_directory_opt("cortex://user/file.md"), Some("cortex://user".to_string()));
        assert_eq!(updater.get_parent_directory_opt("cortex://file.md"), Some("cortex:/".to_string()));
    }
}