use crate::layers::generator::{AbstractGenerator, OverviewGenerator};
use crate::llm::LLMClient;
use crate::{CortexFilesystem, FilesystemOperations, Result};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tracing::{debug, info, warn};
/// Configuration for background L0 (abstract) / L1 (overview) layer generation.
#[derive(Debug, Clone)]
pub struct LayerGenerationConfig {
/// Number of directories processed per batch before the rate-limit pause.
pub batch_size: usize,
/// Pause between batches, in milliseconds (see `ensure_all_layers`).
pub delay_ms: u64,
/// Whether layers should be generated automatically at startup.
/// NOTE(review): not read anywhere in this file — confirm at call sites.
pub auto_generate_on_startup: bool,
/// Size limits for generated `.abstract.md` (L0) content.
pub abstract_config: AbstractConfig,
/// Size limits for generated `.overview.md` (L1) content.
pub overview_config: OverviewConfig,
}
/// Size limits for generated L0 abstracts.
#[derive(Debug, Clone)]
pub struct AbstractConfig {
/// Token budget for the LLM call — not enforced in this file; presumably
/// consumed by `AbstractGenerator` (confirm there).
pub max_tokens: usize,
/// Hard character cap enforced by `enforce_abstract_limit`.
pub max_chars: usize,
/// Desired sentence count — not read in this file; presumably consumed by
/// `AbstractGenerator` (confirm there).
pub target_sentences: usize,
}
/// Size limits for generated L1 overviews.
#[derive(Debug, Clone)]
pub struct OverviewConfig {
/// Token budget for the LLM call — not enforced in this file; presumably
/// consumed by `OverviewGenerator` (confirm there).
pub max_tokens: usize,
/// Hard character cap enforced by `enforce_overview_limit`.
pub max_chars: usize,
}
impl Default for LayerGenerationConfig {
    /// Conservative defaults: batches of 10 with a 2-second pause between
    /// them, and no automatic generation at startup.
    fn default() -> Self {
        let abstract_config = AbstractConfig {
            max_tokens: 400,
            max_chars: 2000,
            target_sentences: 2,
        };
        let overview_config = OverviewConfig {
            max_tokens: 1500,
            max_chars: 6000,
        };
        Self {
            batch_size: 10,
            delay_ms: 2000,
            auto_generate_on_startup: false,
            abstract_config,
            overview_config,
        }
    }
}
/// Outcome counters for a single layer-generation pass.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct GenerationStats {
/// Directories that were missing layers and were attempted.
pub total: usize,
/// Directories whose layer files were generated successfully.
pub generated: usize,
/// Directories whose generation failed (errors are logged, not propagated).
pub failed: usize,
}
/// Generates `.abstract.md` (L0) and `.overview.md` (L1) layer files for
/// directories in the Cortex filesystem, using an LLM for summarization.
pub struct LayerGenerator {
/// Virtual filesystem used for all scans, reads, and writes.
filesystem: Arc<CortexFilesystem>,
/// Produces the L0 abstract text from aggregated directory content.
abstract_gen: AbstractGenerator,
/// Produces the L1 overview text from aggregated directory content.
overview_gen: OverviewGenerator,
/// LLM backend handed to both generators.
llm_client: Arc<dyn LLMClient>,
/// Batch/limit configuration for generation runs.
config: LayerGenerationConfig,
}
impl LayerGenerator {
/// Builds a `LayerGenerator` from its collaborators and configuration.
pub fn new(
    filesystem: Arc<CortexFilesystem>,
    llm_client: Arc<dyn LLMClient>,
    config: LayerGenerationConfig,
) -> Self {
    // The generators are constructed here rather than injected; they carry
    // no caller-supplied state.
    let abstract_gen = AbstractGenerator::new();
    let overview_gen = OverviewGenerator::new();
    Self {
        filesystem,
        llm_client,
        config,
        abstract_gen,
        overview_gen,
    }
}
/// Walks every known scope root (`session`, `user`, `agent`, `resources`)
/// and returns the URIs of all directories found beneath them.
///
/// Scopes that are absent or fail to scan are skipped with a log entry, so
/// this method itself only reports the surviving results.
pub async fn scan_all_directories(&self) -> Result<Vec<String>> {
    const SCOPES: [&str; 4] = ["session", "user", "agent", "resources"];
    let mut directories = Vec::new();
    for scope in SCOPES {
        let scope_uri = format!("cortex://{}", scope);
        // Guard clauses: bail out of this scope early on error/absence.
        let exists = match self.filesystem.exists(&scope_uri).await {
            Ok(flag) => flag,
            Err(e) => {
                warn!("Failed to check scope {} existence: {}", scope, e);
                continue;
            }
        };
        if !exists {
            debug!("Scope {} does not exist, skipping", scope);
            continue;
        }
        debug!("Scanning scope: {}", scope);
        match self.scan_scope(&scope_uri).await {
            Ok(dirs) => {
                debug!("Scope {} found {} directories", scope, dirs.len());
                directories.extend(dirs);
            }
            Err(e) => warn!("Failed to scan scope {}: {}", scope, e),
        }
    }
    Ok(directories)
}
/// Collects all directory URIs under a single scope root.
///
/// Returns an empty list (never an error) when the scope is missing or its
/// existence cannot be checked, so one bad scope cannot abort a full scan.
async fn scan_scope(&self, scope_uri: &str) -> Result<Vec<String>> {
    let mut directories = Vec::new();
    match self.filesystem.exists(scope_uri).await {
        Ok(true) => debug!("Scope directory exists: {}", scope_uri),
        Ok(false) => {
            debug!("Scope directory does not exist: {}", scope_uri);
            return Ok(directories);
        }
        Err(e) => {
            warn!("Failed to check scope existence: {} - {}", scope_uri, e);
            return Ok(directories);
        }
    }
    // Fix: this method previously issued an extra `list` here purely to log
    // the entry count, but `scan_recursive` performs the very same call
    // immediately afterwards — doubling the filesystem I/O for every scope.
    // List failures are still logged (at debug) inside `scan_recursive`.
    self.scan_recursive(scope_uri, &mut directories).await?;
    Ok(directories)
}
/// Depth-first walk that appends every visible (non-dot) directory URI under
/// `uri` to `directories`.
///
/// Returns a boxed future because async fns cannot be directly recursive.
/// Unlistable directories are logged at debug and treated as empty.
fn scan_recursive<'a>(
    &'a self,
    uri: &'a str,
    directories: &'a mut Vec<String>,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> {
    Box::pin(async move {
        let entries = match self.filesystem.list(uri).await {
            Ok(found) => found,
            Err(err) => {
                debug!("Failed to list {}: {}", uri, err);
                return Ok(());
            }
        };
        for entry in &entries {
            // Dot-entries (including layer files such as `.abstract.md`)
            // are never descended into or recorded.
            let hidden = entry.name.starts_with('.');
            if !hidden && entry.is_directory {
                directories.push(entry.uri.clone());
                self.scan_recursive(&entry.uri, directories).await?;
            }
        }
        Ok(())
    })
}
/// True when the directory at `uri` already has both `.abstract.md` (L0)
/// and `.overview.md` (L1) layer files.
pub async fn has_layers(&self, uri: &str) -> Result<bool> {
    // Both existence probes run unconditionally (no short-circuit) so the
    // I/O pattern is identical whichever file is missing.
    let has_abstract = self
        .filesystem
        .exists(&format!("{}/.abstract.md", uri))
        .await?;
    let has_overview = self
        .filesystem
        .exists(&format!("{}/.overview.md", uri))
        .await?;
    Ok(has_abstract && has_overview)
}
/// Filters `dirs` down to the directories still lacking one or both layer
/// files.
///
/// Directories whose layer check errors are skipped (logged at debug) rather
/// than reported as missing, keeping the pass best-effort.
pub async fn filter_missing_layers(&self, dirs: &[String]) -> Result<Vec<String>> {
    let mut missing = Vec::new();
    for dir in dirs {
        match self.has_layers(dir).await {
            Ok(true) => {}
            Ok(false) => missing.push(dir.clone()),
            Err(e) => debug!("Failed to check layers for {}: {}", dir, e),
        }
    }
    Ok(missing)
}
pub async fn ensure_all_layers(&self) -> Result<GenerationStats> {
info!("Scanning directories for missing L0/L1 layers...");
let directories = self.scan_all_directories().await?;
debug!("Found {} directories", directories.len());
for dir in &directories {
debug!("Scanned directory: {}", dir);
}
let missing = self.filter_missing_layers(&directories).await?;
info!("Found {} directories missing L0/L1", missing.len());
if missing.is_empty() {
return Ok(GenerationStats {
total: 0,
generated: 0,
failed: 0,
});
}
let mut stats = GenerationStats {
total: missing.len(),
generated: 0,
failed: 0,
};
let total_batches = (missing.len() + self.config.batch_size - 1) / self.config.batch_size;
for (batch_idx, batch) in missing.chunks(self.config.batch_size).enumerate() {
debug!("Processing batch {}/{}", batch_idx + 1, total_batches);
for dir in batch {
match self.generate_layers_for_directory(dir).await {
Ok(_) => {
stats.generated += 1;
debug!("Generated: {}", dir);
}
Err(e) => {
stats.failed += 1;
warn!("Failed to generate for {}: {}", dir, e);
}
}
}
if batch_idx < total_batches - 1 {
tokio::time::sleep(tokio::time::Duration::from_millis(self.config.delay_ms)).await;
}
}
info!("Layer generation completed: {} generated, {} failed", stats.generated, stats.failed);
Ok(stats)
}
/// Generates missing L0/L1 layers for every directory beneath `timeline_uri`.
///
/// Unlike `ensure_all_layers`, this walks a single subtree and applies no
/// batching or delay between directories.
pub async fn ensure_timeline_layers(&self, timeline_uri: &str) -> Result<GenerationStats> {
    info!("Starting layer generation for timeline: {}", timeline_uri);
    let mut directories = Vec::new();
    self.scan_recursive(timeline_uri, &mut directories).await?;
    info!("Found {} timeline directories", directories.len());
    let missing = self.filter_missing_layers(&directories).await?;
    info!("Found {} directories missing L0/L1", missing.len());
    if missing.is_empty() {
        return Ok(GenerationStats { total: 0, generated: 0, failed: 0 });
    }
    let mut stats = GenerationStats { total: missing.len(), generated: 0, failed: 0 };
    for dir in missing {
        if let Err(e) = self.generate_layers_for_directory(&dir).await {
            stats.failed += 1;
            warn!("Generation failed: {} - {}", dir, e);
        } else {
            stats.generated += 1;
            info!("Generation succeeded: {}", dir);
        }
    }
    info!(
        "Timeline layer generation completed: {} succeeded, {} failed",
        stats.generated, stats.failed
    );
    Ok(stats)
}
/// Generates and writes `.abstract.md` (L0) and `.overview.md` (L1) for
/// `uri`.
///
/// Skips directories whose content is unchanged since the last run and
/// directories with no readable leaf content. Errors from the freshness
/// check, the LLM calls, or the writes are propagated.
async fn generate_layers_for_directory(&self, uri: &str) -> Result<()> {
    debug!("Generating layer files for: {}", uri);
    if !self.should_regenerate(uri).await? {
        debug!("Directory content unchanged, skipping generation: {}", uri);
        return Ok(());
    }
    let source_text = self.aggregate_directory_content_recursive(uri).await?;
    if source_text.is_empty() {
        debug!("Directory has no leaf content, skipping: {}", uri);
        return Ok(());
    }
    // Abstract first, then overview — the two LLM calls run sequentially.
    let raw_abstract = self
        .abstract_gen
        .generate_with_llm(&source_text, &self.llm_client, &[])
        .await?;
    let raw_overview = self
        .overview_gen
        .generate_with_llm(&source_text, &self.llm_client)
        .await?;
    let abstract_text = self.enforce_abstract_limit(raw_abstract)?;
    let overview_text = self.enforce_overview_limit(raw_overview)?;
    // The "**Added**" stamp is the freshness marker that `should_regenerate`
    // later compares against leaf-file timestamps.
    let timestamp = Utc::now().format("%Y-%m-%d %H:%M:%S UTC");
    let stamped_abstract = format!("{}\n\n**Added**: {}", abstract_text, timestamp);
    let stamped_overview = format!("{}\n\n---\n\n**Added**: {}", overview_text, timestamp);
    self.filesystem
        .write(&format!("{}/.abstract.md", uri), &stamped_abstract)
        .await?;
    self.filesystem
        .write(&format!("{}/.overview.md", uri), &stamped_overview)
        .await?;
    debug!("Layer files generated for: {}", uri);
    Ok(())
}
/// Decides whether `uri`'s layer files need (re)generation.
///
/// Regenerates when either layer file is missing or unreadable, when the
/// existing abstract has no parseable "**Added**" timestamp, or when any
/// visible `.md`/`.txt` leaf file carries a newer timestamp than the
/// abstract.
async fn should_regenerate(&self, uri: &str) -> Result<bool> {
    let abstract_path = format!("{}/.abstract.md", uri);
    let overview_path = format!("{}/.overview.md", uri);
    let abstract_exists = self.filesystem.exists(&abstract_path).await?;
    let overview_exists = self.filesystem.exists(&overview_path).await?;
    if !abstract_exists || !overview_exists {
        debug!("Layer files missing, need to generate: {}", uri);
        return Ok(true);
    }
    let abstract_content = match self.filesystem.read(&abstract_path).await {
        Ok(content) => content,
        Err(_) => {
            debug!("Cannot read .abstract.md, need to regenerate: {}", uri);
            return Ok(true);
        }
    };
    // Idiom fix: single pattern match instead of `is_none()` + `unwrap()`.
    let abstract_time = match self.extract_added_timestamp(&abstract_content) {
        Some(time) => time,
        None => {
            debug!(".abstract.md missing timestamp, need to regenerate: {}", uri);
            return Ok(true);
        }
    };
    let entries = self.filesystem.list(uri).await?;
    for entry in entries {
        // Only visible leaf files participate in the freshness comparison.
        if entry.name.starts_with('.') || entry.is_directory {
            continue;
        }
        if entry.name.ends_with(".md") || entry.name.ends_with(".txt") {
            if let Ok(file_content) = self.filesystem.read(&entry.uri).await {
                if let Some(file_time) = self.extract_added_timestamp(&file_content) {
                    if file_time > abstract_time {
                        debug!("File {} has updates, need to regenerate: {}", entry.name, uri);
                        return Ok(true);
                    }
                }
            }
        }
    }
    debug!("Directory content unchanged, no need to regenerate: {}", uri);
    Ok(false)
}
fn extract_added_timestamp(&self, content: &str) -> Option<DateTime<Utc>> {
if let Some(start) = content.find("**Added**: ") {
let timestamp_str = &content[start + 11..];
if let Some(end) = timestamp_str.find('\n') {
let timestamp_str = ×tamp_str[..end].trim();
if let Ok(dt) = DateTime::parse_from_str(timestamp_str, "%Y-%m-%d %H:%M:%S UTC") {
return Some(dt.with_timezone(&Utc));
}
}
}
None
}
/// Concatenates the text of all visible `.md`/`.txt` files under `uri`
/// (depth-first), each prefixed with a `=== name ===` header.
///
/// Output is capped at 10,000 characters; when the cap is hit a truncation
/// marker is appended and the walk stops early. Returns an empty string when
/// the directory cannot be listed. Boxed future because async fns cannot
/// recurse directly.
fn aggregate_directory_content_recursive<'a>(
&'a self,
uri: &'a str,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String>> + Send + 'a>> {
Box::pin(async move {
let entries = match self.filesystem.list(uri).await {
Ok(e) => e,
// Unlistable directories contribute nothing rather than failing the walk.
Err(_) => return Ok(String::new()),
};
let mut content = String::new();
for entry in entries {
// Skip hidden entries, including previously generated layer files.
if entry.name.starts_with('.') {
continue;
}
if entry.is_directory {
let sub = self.aggregate_directory_content_recursive(&entry.uri).await?;
if !sub.is_empty() {
content.push_str(&sub);
}
} else if entry.name.ends_with(".md") || entry.name.ends_with(".txt") {
match self.filesystem.read(&entry.uri).await {
Ok(file_content) => {
content.push_str(&format!("\n\n=== {} ===\n\n", entry.name));
content.push_str(&file_content);
}
// Unreadable leaves are skipped; best-effort aggregation.
Err(e) => debug!("Failed to read {}: {}", entry.uri, e),
}
}
// Enforce the 10k-character cap after every entry; the appended marker
// text translates to "[content truncated...]".
if content.chars().count() > 10000 {
content = content.chars().take(10000).collect();
content.push_str("\n\n[内容已截断...]");
return Ok(content);
}
}
Ok(content)
})
}
/// Caps the abstract at `abstract_config.max_chars` characters, preferring
/// to cut just after the last sentence terminator; otherwise hard-truncates
/// and appends "...".
fn enforce_abstract_limit(&self, text: String) -> Result<String> {
    let mut result = text.trim().to_string();
    let max_chars = self.config.abstract_config.max_chars;
    if result.chars().count() <= max_chars {
        return Ok(result);
    }
    // Byte offset of the `max_chars`-th character — always a valid slice
    // boundary.
    let byte_limit = result
        .char_indices()
        .nth(max_chars)
        .map(|(i, _)| i)
        .unwrap_or(result.len());
    // Last sentence-ending punctuation mark, ASCII or CJK fullwidth.
    // Fix (mojibake): the list had '!' and '?' duplicated where the
    // fullwidth '！' and '？' evidently belonged.
    let sentence_end = result[..byte_limit]
        .char_indices()
        .rev()
        .find(|&(_, c)| matches!(c, '。' | '.' | '?' | '!' | '！' | '？'));
    if let Some((pos, c)) = sentence_end {
        // Fix: the previous `truncate(pos + 1)` split multi-byte terminators
        // such as '。' mid-sequence — `String::truncate` panics when the new
        // length is not a char boundary. Advance by the character's full
        // UTF-8 width instead.
        result.truncate(pos + c.len_utf8());
    } else {
        // No sentence boundary found: hard-cut with room for the ellipsis.
        let truncate_pos = result
            .char_indices()
            .nth(max_chars.saturating_sub(3))
            .map(|(i, _)| i)
            .unwrap_or(result.len());
        result.truncate(truncate_pos);
        result.push_str("...");
    }
    Ok(result)
}
/// Caps the overview at `overview_config.max_chars` characters, preferring
/// to cut at the last paragraph break; otherwise hard-truncates with "...".
fn enforce_overview_limit(&self, text: String) -> Result<String> {
    let mut trimmed = text.trim().to_string();
    let limit = self.config.overview_config.max_chars;
    if trimmed.chars().count() <= limit {
        return Ok(trimmed);
    }
    // Byte index of the `limit`-th character, or end of string.
    let boundary = trimmed
        .char_indices()
        .nth(limit)
        .map_or(trimmed.len(), |(i, _)| i);
    // "\n\n" is ASCII, so its byte position is always a char boundary.
    match trimmed[..boundary].rfind("\n\n") {
        Some(break_pos) => {
            trimmed.truncate(break_pos);
            // Marker text translates to "[content truncated...]".
            trimmed.push_str("\n\n[内容已截断...]");
        }
        None => {
            let cut = trimmed
                .char_indices()
                .nth(limit.saturating_sub(3))
                .map_or(trimmed.len(), |(i, _)| i);
            trimmed.truncate(cut);
            trimmed.push_str("...");
        }
    }
    Ok(trimmed)
}
/// Finds `.abstract.md` files whose body (metadata stripped) exceeds the
/// configured character limit and regenerates the layers for those
/// directories.
///
/// # Errors
/// Propagates failures from the initial scan; per-directory regeneration
/// failures are only counted in the returned stats.
pub async fn regenerate_oversized_abstracts(&self) -> Result<RegenerationStats> {
    info!("Scanning for oversized .abstract files...");
    let directories = self.scan_all_directories().await?;
    let max_chars = self.config.abstract_config.max_chars;
    let mut stats = RegenerationStats {
        total: 0,
        regenerated: 0,
        failed: 0,
    };
    for dir in directories {
        let abstract_path = format!("{}/.abstract.md", dir);
        if let Ok(content) = self.filesystem.read(&abstract_path).await {
            let content_without_metadata = self.strip_metadata(&content);
            // Fix: compare in characters, not bytes. `max_chars` is a
            // character budget and `enforce_abstract_limit` counts chars;
            // `.len()` (bytes) misflagged multi-byte (e.g. CJK) abstracts
            // as oversized, triggering needless LLM regeneration.
            let char_count = content_without_metadata.chars().count();
            if char_count > max_chars {
                stats.total += 1;
                info!(
                    "Found oversized .abstract: {} ({} chars)",
                    dir,
                    char_count
                );
                match self.generate_layers_for_directory(&dir).await {
                    Ok(_) => {
                        stats.regenerated += 1;
                        info!("Regeneration succeeded: {}", dir);
                    }
                    Err(e) => {
                        stats.failed += 1;
                        warn!("Regeneration failed: {} - {}", dir, e);
                    }
                }
            }
        }
    }
    info!(
        "Regeneration completed: total={}, succeeded={}, failed={}",
        stats.total, stats.regenerated, stats.failed
    );
    Ok(stats)
}
/// Removes the trailing "**Added**:" metadata (and, when present, the blank
/// line before it) from generated layer content, then trims whitespace.
fn strip_metadata(&self, content: &str) -> String {
    let mut body = content.to_string();
    // Prefer cutting at the blank line preceding the marker; fall back to
    // the bare marker. The two-step lookup matters: a bare-marker search
    // alone could hit an earlier inline occurrence.
    let cut = body
        .find("\n\n**Added**:")
        .or_else(|| body.find("**Added**:"));
    if let Some(pos) = cut {
        body.truncate(pos);
    }
    body.trim().to_string()
}
}
/// Outcome counters for an oversized-abstract regeneration pass.
///
/// `Default` is derived for consistency with `GenerationStats`, allowing
/// zero-initialized construction.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct RegenerationStats {
/// Oversized abstracts found (regeneration attempted for each).
pub total: usize,
/// Directories regenerated successfully.
pub regenerated: usize,
/// Directories whose regeneration failed.
pub failed: usize,
}