pub mod backend;
pub mod claude_cli;
pub mod merge;
pub mod prompts;
use crate::config::Config;
use crate::db;
use crate::errors::CoreError;
use crate::ingest::{context, session};
use crate::models::{
AnalysisResponse, AnalyzeResult, AnalyzeV2Result, BatchDetail, EdgeType,
GraphAnalysisResponse, GraphOperation, NodeScope, NodeStatus, NodeType, Pattern,
PatternStatus, PatternType, SuggestedTarget,
};
use crate::scrub;
use chrono::{Duration, Utc};
use rusqlite::Connection;
use std::path::Path;
use backend::AnalysisBackend;
use claude_cli::ClaudeCliBackend;
/// Maximum number of parsed sessions sent to the AI backend per analysis call.
pub const BATCH_SIZE: usize = 20;
/// JSON Schema for the AI's pattern-analysis response: a top-level
/// `reasoning` string plus a `patterns` array whose items are either a
/// `"new"` discovery (type/description/confidence/etc.) or an `"update"`
/// to an existing pattern (`existing_id` + new sessions/confidence).
/// Only `action` is required per item so the model can omit fields that
/// don't apply to its chosen variant.
pub const ANALYSIS_RESPONSE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "reasoning": {"type": "string"},
    "patterns": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "action": {"type": "string", "enum": ["new", "update"]},
          "pattern_type": {"type": "string", "enum": ["repetitive_instruction", "recurring_mistake", "workflow_pattern", "stale_context", "redundant_context"]},
          "description": {"type": "string"},
          "confidence": {"type": "number"},
          "source_sessions": {"type": "array", "items": {"type": "string"}},
          "related_files": {"type": "array", "items": {"type": "string"}},
          "suggested_content": {"type": "string"},
          "suggested_target": {"type": "string", "enum": ["skill", "claude_md", "global_agent", "db_only"]},
          "existing_id": {"type": "string"},
          "new_sessions": {"type": "array", "items": {"type": "string"}},
          "new_confidence": {"type": "number"}
        },
        "required": ["action"],
        "additionalProperties": false
      }
    }
  },
  "required": ["reasoning", "patterns"],
  "additionalProperties": false
}"#;
/// JSON Schema for the graph-analysis ("v2") response: a `reasoning`
/// summary plus an `operations` array. Each operation's `action` selects
/// one of create_node / update_node / create_edge / merge_nodes, and the
/// remaining properties are a union of the fields those variants use —
/// only `action` is required; `parse_graph_response` validates the rest.
pub const GRAPH_ANALYSIS_RESPONSE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "reasoning": { "type": "string", "description": "1-2 sentence summary of what you observed" },
    "operations": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "action": { "type": "string", "enum": ["create_node", "update_node", "create_edge", "merge_nodes"] },
          "node_type": { "type": "string", "enum": ["preference", "pattern", "rule", "skill", "memory", "directive"] },
          "scope": { "type": "string", "enum": ["global", "project"] },
          "project_id": { "type": "string" },
          "content": { "type": "string" },
          "confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
          "node_id": { "type": "string" },
          "new_confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
          "new_content": { "type": "string" },
          "source_id": { "type": "string" },
          "target_id": { "type": "string" },
          "edge_type": { "type": "string", "enum": ["supports", "contradicts", "supersedes", "derived_from", "applies_to"] },
          "keep_id": { "type": "string" },
          "remove_id": { "type": "string" }
        },
        "required": ["action"],
        "additionalProperties": false
      }
    }
  },
  "required": ["reasoning", "operations"],
  "additionalProperties": false
}"#;
/// Builds the analysis schema used in full-management mode: the base
/// [`ANALYSIS_RESPONSE_SCHEMA`] extended with an optional top-level
/// `claude_md_edits` array (each edit needs at least `edit_type` and
/// `reasoning`). `claude_md_edits` is deliberately NOT added to the
/// top-level `required` list, so non-edit responses remain valid.
pub fn full_management_analysis_schema() -> String {
    // Schema fragment describing a single CLAUDE.md edit suggestion.
    let edits = serde_json::json!({
        "type": "array",
        "items": {
            "type": "object",
            "properties": {
                "edit_type": {"type": "string", "enum": ["add", "remove", "reword", "move"]},
                "original_text": {"type": "string"},
                "suggested_content": {"type": "string"},
                "target_section": {"type": "string"},
                "reasoning": {"type": "string"}
            },
            "required": ["edit_type", "reasoning"],
            "additionalProperties": false
        }
    });
    // Parse the base schema and graft the edits fragment onto it.
    let mut base: serde_json::Value = serde_json::from_str(ANALYSIS_RESPONSE_SCHEMA)
        .expect("ANALYSIS_RESPONSE_SCHEMA must be valid JSON");
    base["properties"]["claude_md_edits"] = edits;
    serde_json::to_string_pretty(&base).expect("schema serialization cannot fail")
}
/// Runs flat pattern analysis over recently ingested sessions via the
/// Claude CLI, persisting new and merged patterns to the database.
///
/// Flow: preflight the CLI (installed + authenticated), select sessions
/// ingested within `window_days` (optionally scoped to `project`),
/// re-parse and optionally scrub each transcript, drop single-message
/// sessions, then prompt the AI in batches of [`BATCH_SIZE`]. Every
/// candidate session is marked analyzed afterwards regardless of outcome.
///
/// `on_batch_start(batch_idx, total_batches, batch_len, prompt_chars)` is
/// invoked before each AI call so callers can render progress.
///
/// # Errors
/// Returns `CoreError::Analysis` when the CLI is missing/unauthenticated
/// or a batch response cannot be parsed; DB and backend errors propagate.
pub fn analyze<F>(
    conn: &Connection,
    config: &Config,
    project: Option<&str>,
    window_days: u32,
    on_batch_start: F,
) -> Result<AnalyzeResult, CoreError>
where
    F: Fn(usize, usize, usize, usize),
{
    // Preflight: fail fast if the `claude` binary is absent or not logged in.
    if !ClaudeCliBackend::is_available() {
        return Err(CoreError::Analysis(
            "claude CLI not found on PATH. Install Claude Code CLI to use analysis.".to_string(),
        ));
    }
    ClaudeCliBackend::check_auth()?;
    let since = Utc::now() - Duration::days(window_days as i64);
    let rolling = config.analysis.rolling_window;
    let sessions_to_analyze = db::get_sessions_for_analysis(conn, project, &since, rolling)?;
    if sessions_to_analyze.is_empty() {
        // Nothing ingested in the window: report an all-zero result.
        return Ok(AnalyzeResult {
            sessions_analyzed: 0,
            new_patterns: 0,
            updated_patterns: 0,
            total_patterns: 0,
            input_tokens: 0,
            output_tokens: 0,
            batch_details: Vec::new(),
        });
    }
    // Re-parse each transcript from disk; missing or unreadable files are
    // skipped with a warning rather than aborting the whole run.
    let mut parsed_sessions = Vec::new();
    for ingested in &sessions_to_analyze {
        let path = Path::new(&ingested.session_path);
        if !path.exists() {
            eprintln!(
                "warning: session file not found: {}",
                ingested.session_path
            );
            continue;
        }
        match session::parse_session_file(path, &ingested.session_id, &ingested.project) {
            Ok(mut s) => {
                // Redact secrets before anything leaves the machine.
                if config.privacy.scrub_secrets {
                    scrub::scrub_session(&mut s);
                }
                parsed_sessions.push(s);
            }
            Err(e) => {
                eprintln!(
                    "warning: failed to re-parse session {}: {e}",
                    ingested.session_id
                );
            }
        }
    }
    // Sessions with fewer than two user messages carry no pattern signal.
    let before_filter = parsed_sessions.len();
    parsed_sessions.retain(|s| s.user_messages.len() >= 2);
    let filtered_out = before_filter - parsed_sessions.len();
    if filtered_out > 0 {
        eprintln!(
            " Skipped {} single-message session{} (no pattern signal)",
            filtered_out,
            if filtered_out == 1 { "" } else { "s" }
        );
    }
    let analyzed_count = parsed_sessions.len();
    if parsed_sessions.is_empty() {
        // Everything was filtered out; still mark the candidates analyzed so
        // they are not re-selected on the next run.
        for ingested in &sessions_to_analyze {
            db::record_analyzed_session(conn, &ingested.session_id, &ingested.project)?;
        }
        return Ok(AnalyzeResult {
            sessions_analyzed: 0,
            new_patterns: 0,
            updated_patterns: 0,
            total_patterns: 0,
            input_tokens: 0,
            output_tokens: 0,
            batch_details: Vec::new(),
        });
    }
    // Optional project-context snapshot included in every prompt; snapshot
    // failures and empty summaries silently degrade to no context.
    let context_summary = match project {
        Some(project_path) => context::snapshot_context(config, project_path)
            .ok()
            .map(|s| prompts::build_context_summary(&s))
            .filter(|s| !s.is_empty()),
        None => None,
    };
    let backend = ClaudeCliBackend::new(&config.ai);
    let mut total_input_tokens: u64 = 0;
    let mut total_output_tokens: u64 = 0;
    let mut new_count = 0;
    let mut update_count = 0;
    let mut batch_details: Vec<BatchDetail> = Vec::new();
    // Ceiling division: number of chunks of at most BATCH_SIZE sessions.
    let total_batches = (parsed_sessions.len() + BATCH_SIZE - 1) / BATCH_SIZE;
    for (batch_idx, batch) in parsed_sessions.chunks(BATCH_SIZE).enumerate() {
        // Re-read existing patterns each batch so inserts/merges from earlier
        // batches are visible to later prompts.
        let existing = db::get_patterns(conn, &["discovered", "active"], project)?;
        let full_mgmt = config.claude_md.full_management;
        let prompt = prompts::build_analysis_prompt(batch, &existing, context_summary.as_deref(), full_mgmt);
        let prompt_chars = prompt.len();
        on_batch_start(batch_idx, total_batches, batch.len(), prompt_chars);
        // Full-management mode extends the base schema with claude_md_edits;
        // `schema_string` keeps the owned String alive for the borrow below.
        let schema_string;
        let schema: &str = if full_mgmt {
            schema_string = full_management_analysis_schema();
            &schema_string
        } else {
            ANALYSIS_RESPONSE_SCHEMA
        };
        let response = backend.execute(&prompt, Some(schema))?;
        total_input_tokens += response.input_tokens;
        total_output_tokens += response.output_tokens;
        // Enrich parse failures with sizing context for debugging.
        let analysis_resp = parse_analysis_response(&response.text).map_err(|e| {
            CoreError::Analysis(format!(
                "{e}\n(prompt_chars={}, output_tokens={}, result_chars={})",
                prompt_chars,
                response.output_tokens,
                response.text.len()
            ))
        })?;
        let reasoning = analysis_resp.reasoning;
        let claude_md_edits = analysis_resp.claude_md_edits;
        // Split the AI output into brand-new patterns vs merges into existing.
        let (new_patterns, merge_updates) =
            merge::process_updates(analysis_resp.patterns, &existing, project);
        let mut batch_new = new_patterns.len();
        let batch_updated = merge_updates.len();
        for pattern in &new_patterns {
            db::insert_pattern(conn, pattern)?;
            new_count += 1;
        }
        for update in &merge_updates {
            db::update_pattern_merge(
                conn,
                &update.pattern_id,
                &update.new_sessions,
                update.new_confidence,
                Utc::now(),
                update.additional_times_seen,
            )?;
            update_count += 1;
        }
        // Persist each suggested CLAUDE.md edit as a synthetic
        // redundant_context pattern whose suggested_content carries the
        // machine-readable edit as JSON.
        for edit in &claude_md_edits {
            let edit_json = serde_json::json!({
                "edit_type": edit.edit_type.to_string(),
                "original": edit.original_text,
                "replacement": edit.suggested_content,
                "target_section": edit.target_section,
                "reasoning": edit.reasoning,
            });
            let description = format!(
                "[edit:{}] {}",
                edit.edit_type,
                edit.original_text
            );
            let now = Utc::now();
            let pattern = Pattern {
                id: uuid::Uuid::new_v4().to_string(),
                pattern_type: PatternType::RedundantContext,
                description,
                confidence: 0.75, // fixed default confidence for edit-derived patterns
                times_seen: 1,
                first_seen: now,
                last_seen: now,
                last_projected: None,
                status: PatternStatus::Discovered,
                source_sessions: batch.iter().map(|s| s.session_id.clone()).collect(),
                related_files: Vec::new(),
                suggested_content: edit_json.to_string(),
                suggested_target: SuggestedTarget::ClaudeMd,
                project: project.map(String::from),
                generation_failed: false,
            };
            db::insert_pattern(conn, &pattern)?;
            new_count += 1;
            batch_new += 1;
        }
        // Keep a short response preview for diagnostics in the batch report.
        let preview = truncate_for_error(&response.text, 500).to_string();
        batch_details.push(BatchDetail {
            batch_index: batch_idx,
            session_count: batch.len(),
            session_ids: batch.iter().map(|s| s.session_id.clone()).collect(),
            prompt_chars,
            input_tokens: response.input_tokens,
            output_tokens: response.output_tokens,
            new_patterns: batch_new,
            updated_patterns: batch_updated,
            reasoning,
            ai_response_preview: preview,
        });
    }
    // Mark every candidate session (parsed or not) as analyzed.
    for ingested in &sessions_to_analyze {
        db::record_analyzed_session(conn, &ingested.session_id, &ingested.project)?;
    }
    let discovered = db::pattern_count_by_status(conn, "discovered")?;
    let active = db::pattern_count_by_status(conn, "active")?;
    Ok(AnalyzeResult {
        sessions_analyzed: analyzed_count,
        new_patterns: new_count,
        updated_patterns: update_count,
        total_patterns: (discovered + active) as usize,
        input_tokens: total_input_tokens,
        output_tokens: total_output_tokens,
        batch_details,
    })
}
/// Graph-based ("v2") analysis: mines knowledge-graph operations from
/// recent sessions instead of flat patterns.
///
/// Shares the same preflight, session loading, scrubbing, and
/// single-message filtering as [`analyze`], but sends compact session
/// summaries plus the currently active graph nodes, then applies the
/// returned create/update/edge/merge operations to the database.
///
/// `on_batch_start(batch_idx, total_batches, batch_len, prompt_chars)` is
/// invoked before each AI call.
///
/// # Errors
/// Returns `CoreError::Analysis` for CLI availability/auth/parse
/// failures; DB and backend errors propagate unchanged.
pub fn analyze_v2<F>(
    conn: &Connection,
    config: &Config,
    project: Option<&str>,
    window_days: u32,
    on_batch_start: F,
) -> Result<AnalyzeV2Result, CoreError>
where
    F: Fn(usize, usize, usize, usize),
{
    // Preflight: fail fast if the `claude` binary is absent or not logged in.
    if !ClaudeCliBackend::is_available() {
        return Err(CoreError::Analysis(
            "claude CLI not found on PATH. Install Claude Code CLI to use analysis.".to_string(),
        ));
    }
    ClaudeCliBackend::check_auth()?;
    let since = Utc::now() - Duration::days(window_days as i64);
    let rolling = config.analysis.rolling_window;
    let sessions_to_analyze = db::get_sessions_for_analysis(conn, project, &since, rolling)?;
    if sessions_to_analyze.is_empty() {
        // Nothing ingested in the window: report an all-zero result.
        return Ok(AnalyzeV2Result {
            sessions_analyzed: 0,
            nodes_created: 0,
            nodes_updated: 0,
            edges_created: 0,
            nodes_merged: 0,
            input_tokens: 0,
            output_tokens: 0,
            batch_count: 0,
        });
    }
    // Re-parse each transcript from disk; missing or unreadable files are
    // skipped with a warning rather than aborting the whole run.
    let mut parsed_sessions = Vec::new();
    for ingested in &sessions_to_analyze {
        let path = Path::new(&ingested.session_path);
        if !path.exists() {
            eprintln!(
                "warning: session file not found: {}",
                ingested.session_path
            );
            continue;
        }
        match session::parse_session_file(path, &ingested.session_id, &ingested.project) {
            Ok(mut s) => {
                // Redact secrets before anything leaves the machine.
                if config.privacy.scrub_secrets {
                    scrub::scrub_session(&mut s);
                }
                parsed_sessions.push(s);
            }
            Err(e) => {
                eprintln!(
                    "warning: failed to re-parse session {}: {e}",
                    ingested.session_id
                );
            }
        }
    }
    // Sessions with fewer than two user messages carry no pattern signal.
    let before_filter = parsed_sessions.len();
    parsed_sessions.retain(|s| s.user_messages.len() >= 2);
    let filtered_out = before_filter - parsed_sessions.len();
    if filtered_out > 0 {
        eprintln!(
            " Skipped {} single-message session{} (no pattern signal)",
            filtered_out,
            if filtered_out == 1 { "" } else { "s" }
        );
    }
    let analyzed_count = parsed_sessions.len();
    if parsed_sessions.is_empty() {
        // Everything was filtered out; still mark the candidates analyzed so
        // they are not re-selected on the next run.
        for ingested in &sessions_to_analyze {
            db::record_analyzed_session(conn, &ingested.session_id, &ingested.project)?;
        }
        return Ok(AnalyzeV2Result {
            sessions_analyzed: 0,
            nodes_created: 0,
            nodes_updated: 0,
            edges_created: 0,
            nodes_merged: 0,
            input_tokens: 0,
            output_tokens: 0,
            batch_count: 0,
        });
    }
    // Compact representation keeps prompt size manageable.
    let compact_sessions: Vec<_> = parsed_sessions
        .iter()
        .map(prompts::to_compact_session)
        .collect();
    // A node-load failure degrades to an empty graph rather than aborting.
    let existing_nodes = db::get_nodes_by_status(conn, &NodeStatus::Active).unwrap_or_default();
    let project_slug = project.map(db::generate_project_slug);
    let backend = ClaudeCliBackend::new(&config.ai);
    let mut total_input_tokens: u64 = 0;
    let mut total_output_tokens: u64 = 0;
    let mut total_nodes_created: usize = 0;
    let mut total_nodes_updated: usize = 0;
    let mut total_edges_created: usize = 0;
    let mut total_nodes_merged: usize = 0;
    // Ceiling division: number of chunks of at most BATCH_SIZE sessions.
    let total_batches = (compact_sessions.len() + BATCH_SIZE - 1) / BATCH_SIZE;
    let mut batch_count: usize = 0;
    for (batch_idx, batch) in compact_sessions.chunks(BATCH_SIZE).enumerate() {
        let prompt = prompts::build_graph_analysis_prompt(
            batch,
            &existing_nodes,
            project_slug.as_deref(),
        );
        let prompt_chars = prompt.len();
        on_batch_start(batch_idx, total_batches, batch.len(), prompt_chars);
        let response = backend.execute(&prompt, Some(GRAPH_ANALYSIS_RESPONSE_SCHEMA))?;
        total_input_tokens += response.input_tokens;
        total_output_tokens += response.output_tokens;
        // Enrich parse failures with sizing context for debugging.
        let ops = parse_graph_response(&response.text, project_slug.as_deref()).map_err(|e| {
            CoreError::Analysis(format!(
                "{e}\n(prompt_chars={}, output_tokens={}, result_chars={})",
                prompt_chars,
                response.output_tokens,
                response.text.len()
            ))
        })?;
        // Apply the whole batch of graph mutations and accumulate counters.
        let graph_result = db::apply_graph_operations(conn, &ops)?;
        total_nodes_created += graph_result.nodes_created;
        total_nodes_updated += graph_result.nodes_updated;
        total_edges_created += graph_result.edges_created;
        total_nodes_merged += graph_result.nodes_merged;
        batch_count += 1;
    }
    // Mark every candidate session (parsed or not) as analyzed.
    for ingested in &sessions_to_analyze {
        db::record_analyzed_session(conn, &ingested.session_id, &ingested.project)?;
    }
    Ok(AnalyzeV2Result {
        sessions_analyzed: analyzed_count,
        nodes_created: total_nodes_created,
        nodes_updated: total_nodes_updated,
        edges_created: total_edges_created,
        nodes_merged: total_nodes_merged,
        input_tokens: total_input_tokens,
        output_tokens: total_output_tokens,
        batch_count,
    })
}
/// Deserializes a raw AI response into an [`AnalysisResponse`].
///
/// The text is trimmed before parsing; on failure the error embeds a
/// truncated copy of the offending text for diagnostics.
fn parse_analysis_response(text: &str) -> Result<AnalysisResponse, CoreError> {
    serde_json::from_str(text.trim()).map_err(|e| {
        CoreError::Analysis(format!(
            "failed to parse AI response as JSON: {e}\nresponse text: {}",
            truncate_for_error(text, 1500)
        ))
    })
}
/// Converts a raw graph-analysis JSON response into typed [`GraphOperation`]s.
///
/// Unknown `action` values are ignored, and operations missing their
/// required ids are silently dropped. A project-scoped `create_node`
/// without an explicit `project_id` falls back to `default_project`.
///
/// # Errors
/// Returns `CoreError::Parse` when the JSON does not deserialize into a
/// `GraphAnalysisResponse`.
pub fn parse_graph_response(json: &str, default_project: Option<&str>) -> Result<Vec<GraphOperation>, CoreError> {
    let response: GraphAnalysisResponse = serde_json::from_str(json)
        .map_err(|e| CoreError::Parse(format!("failed to parse graph analysis response: {e}")))?;
    let mut ops = Vec::new();
    for op in &response.operations {
        // Each arm yields Some(operation) or None when required ids are absent.
        let parsed = match op.action.as_str() {
            "create_node" => {
                // Defaults: node_type -> Pattern, scope -> Project.
                let node_type = op
                    .node_type
                    .as_deref()
                    .map(NodeType::from_str)
                    .unwrap_or(NodeType::Pattern);
                let scope = op
                    .scope
                    .as_deref()
                    .map(NodeScope::from_str)
                    .unwrap_or(NodeScope::Project);
                // Global nodes never carry a project id; project-scoped ones
                // fall back to the caller-supplied default.
                let project_id = if scope == NodeScope::Global {
                    None
                } else {
                    op.project_id
                        .clone()
                        .or_else(|| default_project.map(String::from))
                };
                Some(GraphOperation::CreateNode {
                    node_type,
                    scope,
                    project_id,
                    content: op.content.clone().unwrap_or_default(),
                    confidence: op.confidence.unwrap_or(0.5),
                })
            }
            "update_node" => op.node_id.as_ref().map(|id| GraphOperation::UpdateNode {
                id: id.clone(),
                confidence: op.new_confidence,
                content: op.new_content.clone(),
            }),
            "create_edge" => match (&op.source_id, &op.target_id) {
                (Some(source), Some(target)) => Some(GraphOperation::CreateEdge {
                    source_id: source.clone(),
                    target_id: target.clone(),
                    // Unrecognized edge types default to Supports.
                    edge_type: op
                        .edge_type
                        .as_deref()
                        .and_then(EdgeType::from_str)
                        .unwrap_or(EdgeType::Supports),
                }),
                _ => None,
            },
            "merge_nodes" => match (&op.keep_id, &op.remove_id) {
                (Some(keep), Some(remove)) => Some(GraphOperation::MergeNodes {
                    keep_id: keep.clone(),
                    remove_id: remove.clone(),
                }),
                _ => None,
            },
            // Unknown actions are ignored rather than treated as errors.
            _ => None,
        };
        if let Some(operation) = parsed {
            ops.push(operation);
        }
    }
    Ok(ops)
}
/// Returns a prefix of `s` at most `max` bytes long, never splitting a
/// UTF-8 character: the cut point backs off to the nearest char boundary
/// at or below `max`.
fn truncate_for_error(s: &str, max: usize) -> &str {
    if s.len() <= max {
        return s;
    }
    // Index 0 is always a boundary, so the search cannot fail.
    let cut = (0..=max)
        .rev()
        .find(|&i| s.is_char_boundary(i))
        .unwrap_or(0);
    &s[..cut]
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::PatternUpdate;

    // Happy path: one "new" and one "update" pattern deserialize into the
    // corresponding PatternUpdate variants.
    #[test]
    fn test_parse_analysis_response_json() {
        let json = r#"{
            "reasoning": "Found recurring instruction across sessions.",
            "patterns": [
                {
                    "action": "new",
                    "pattern_type": "repetitive_instruction",
                    "description": "User always asks to use uv",
                    "confidence": 0.85,
                    "source_sessions": ["sess-1"],
                    "related_files": [],
                    "suggested_content": "Always use uv",
                    "suggested_target": "claude_md"
                },
                {
                    "action": "update",
                    "existing_id": "pat-123",
                    "new_sessions": ["sess-2"],
                    "new_confidence": 0.92
                }
            ]
        }"#;
        let resp = parse_analysis_response(json).unwrap();
        assert_eq!(resp.reasoning, "Found recurring instruction across sessions.");
        assert_eq!(resp.patterns.len(), 2);
        assert!(matches!(&resp.patterns[0], PatternUpdate::New(_)));
        assert!(matches!(&resp.patterns[1], PatternUpdate::Update(_)));
    }

    // A JSON null in suggested_content must deserialize to an empty string,
    // not fail or stay None.
    #[test]
    fn test_parse_analysis_response_null_fields() {
        let json = r#"{
            "reasoning": "Observed a single pattern.",
            "patterns": [
                {
                    "action": "new",
                    "pattern_type": "repetitive_instruction",
                    "description": "Some pattern",
                    "confidence": 0.8,
                    "source_sessions": [],
                    "related_files": [],
                    "suggested_content": null,
                    "suggested_target": "claude_md"
                }
            ]
        }"#;
        let resp = parse_analysis_response(json).unwrap();
        assert_eq!(resp.patterns.len(), 1);
        if let PatternUpdate::New(ref p) = resp.patterns[0] {
            assert_eq!(p.suggested_content, "");
        } else {
            panic!("expected New pattern");
        }
    }

    // An empty patterns array is a valid "nothing found" response.
    #[test]
    fn test_parse_analysis_response_empty() {
        let json = r#"{"reasoning": "No recurring patterns found.", "patterns": []}"#;
        let resp = parse_analysis_response(json).unwrap();
        assert_eq!(resp.reasoning, "No recurring patterns found.");
        assert!(resp.patterns.is_empty());
    }

    // A missing reasoning field defaults to the empty string.
    #[test]
    fn test_parse_analysis_response_missing_reasoning_defaults_empty() {
        let json = r#"{"patterns": []}"#;
        let resp = parse_analysis_response(json).unwrap();
        assert_eq!(resp.reasoning, "");
        assert!(resp.patterns.is_empty());
    }

    // Plain prose (no JSON at all) must be rejected with an error.
    #[test]
    fn test_parse_analysis_response_pure_prose_fails() {
        let text = "I analyzed the sessions but found no recurring patterns worth reporting.";
        let result = parse_analysis_response(text);
        assert!(result.is_err());
    }

    // The embedded schema constant must parse and have the expected shape.
    #[test]
    fn test_analysis_response_schema_is_valid_json() {
        let value: serde_json::Value = serde_json::from_str(ANALYSIS_RESPONSE_SCHEMA)
            .expect("ANALYSIS_RESPONSE_SCHEMA must be valid JSON");
        assert_eq!(value["type"], "object");
        assert!(value["properties"]["patterns"].is_object());
    }

    // The dynamically built full-management schema must also be valid JSON.
    #[test]
    fn test_full_management_analysis_schema_is_valid_json() {
        let schema_str = full_management_analysis_schema();
        let value: serde_json::Value =
            serde_json::from_str(&schema_str).expect("full_management schema must be valid JSON");
        assert_eq!(value["type"], "object");
        assert!(value["properties"]["patterns"].is_object());
    }

    // The extension point: claude_md_edits must exist with the full edit-item
    // schema (required fields, edit_type enum, closed object).
    #[test]
    fn test_full_management_analysis_schema_contains_claude_md_edits() {
        let schema_str = full_management_analysis_schema();
        let value: serde_json::Value = serde_json::from_str(&schema_str).unwrap();
        let edits = &value["properties"]["claude_md_edits"];
        assert!(edits.is_object(), "claude_md_edits should be in properties");
        assert_eq!(edits["type"], "array");
        let items = &edits["items"];
        assert_eq!(items["type"], "object");
        let required: Vec<String> = items["required"]
            .as_array()
            .unwrap()
            .iter()
            .map(|v| v.as_str().unwrap().to_string())
            .collect();
        assert!(required.contains(&"edit_type".to_string()));
        assert!(required.contains(&"reasoning".to_string()));
        let edit_type_enum = items["properties"]["edit_type"]["enum"]
            .as_array()
            .unwrap();
        let enum_values: Vec<&str> = edit_type_enum.iter().map(|v| v.as_str().unwrap()).collect();
        assert!(enum_values.contains(&"add"));
        assert!(enum_values.contains(&"remove"));
        assert!(enum_values.contains(&"reword"));
        assert!(enum_values.contains(&"move"));
        assert_eq!(items["additionalProperties"], false);
    }

    // claude_md_edits is optional: it must NOT appear in top-level required.
    #[test]
    fn test_full_management_schema_claude_md_edits_not_required() {
        let schema_str = full_management_analysis_schema();
        let value: serde_json::Value = serde_json::from_str(&schema_str).unwrap();
        let required: Vec<String> = value["required"]
            .as_array()
            .unwrap()
            .iter()
            .map(|v| v.as_str().unwrap().to_string())
            .collect();
        assert!(
            !required.contains(&"claude_md_edits".to_string()),
            "claude_md_edits should NOT be in top-level required"
        );
        assert!(required.contains(&"reasoning".to_string()));
        assert!(required.contains(&"patterns".to_string()));
    }

    // Extending the schema must not alter the base patterns/reasoning parts.
    #[test]
    fn test_full_management_schema_preserves_base_patterns() {
        let base: serde_json::Value = serde_json::from_str(ANALYSIS_RESPONSE_SCHEMA).unwrap();
        let full: serde_json::Value =
            serde_json::from_str(&full_management_analysis_schema()).unwrap();
        assert_eq!(
            base["properties"]["patterns"],
            full["properties"]["patterns"],
            "patterns schema should be identical between base and full_management"
        );
        assert_eq!(
            base["properties"]["reasoning"],
            full["properties"]["reasoning"],
            "reasoning schema should be identical"
        );
    }

    // The graph schema constant must at least be parseable JSON.
    #[test]
    fn test_graph_analysis_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(GRAPH_ANALYSIS_RESPONSE_SCHEMA)
            .expect("schema must be valid JSON");
    }

    // create_node and update_node operations round-trip through
    // parse_graph_response into typed GraphOperation variants.
    #[test]
    fn test_parse_graph_response() {
        let json = r#"{
            "reasoning": "Found testing pattern",
            "operations": [
                {
                    "action": "create_node",
                    "node_type": "rule",
                    "scope": "project",
                    "content": "Always run tests",
                    "confidence": 0.85
                },
                {
                    "action": "update_node",
                    "node_id": "existing-1",
                    "new_confidence": 0.9
                }
            ]
        }"#;
        let ops = parse_graph_response(json, Some("my-app")).unwrap();
        assert_eq!(ops.len(), 2);
        match &ops[0] {
            GraphOperation::CreateNode { content, scope, .. } => {
                assert_eq!(content, "Always run tests");
                assert_eq!(*scope, NodeScope::Project);
            }
            _ => panic!("Expected CreateNode"),
        }
        match &ops[1] {
            GraphOperation::UpdateNode { id, confidence, .. } => {
                assert_eq!(id, "existing-1");
                assert_eq!(*confidence, Some(0.9));
            }
            _ => panic!("Expected UpdateNode"),
        }
    }
}