concept_analyzer/
unified_pipeline.rs

1//! Unified Pipeline for Complete Concept Analysis
2//!
3//! This module provides a single-call pipeline that analyzes a repository or corpus,
4//! extracts concepts, identifies relationships, detects gaps, and publishes
5//! first-principles instructions to S3.
6
7use anyhow::{Context, Result};
8use aws_config;
9use aws_sdk_s3::{types::ObjectCannedAcl, Client as S3Client};
10use log::{debug, info};
11use pipelines::Pipeline;
12use serde::{Deserialize, Serialize};
13use std::path::Path;
14use std::sync::Arc;
15use std::time::Instant;
16
17use crate::{
18    analyze_relationships::{Relationship, RelationshipAnalyzer},
19    concept_registry::{ConceptRegistry, Gap},
20    identify_abstractions::{Abstraction, AbstractionIdentifier},
21    llm_client::LLMClient,
22    BatchFileCollector, FileCollection,
23};
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct FirstPrinciplesOutput {
27    pub project_name: String,
28    pub core_purpose: String,
29    pub essential_concepts: Vec<EssentialConcept>,
30    pub concept_relationships: Vec<ConceptRelation>,
31    pub implementation_order: Vec<String>,
32    pub critical_gaps: Vec<CriticalGap>,
33    pub rebuild_instructions: Vec<RebuildStep>,
34}
35
36#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct EssentialConcept {
38    pub name: String,
39    pub purpose: String,
40    pub core_responsibilities: Vec<String>,
41    pub interfaces: Vec<String>,
42}
43
44#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct ConceptRelation {
46    pub from: String,
47    pub to: String,
48    pub relationship_type: String,
49    pub reason: String,
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct CriticalGap {
54    pub missing_functionality: String,
55    pub impact: String,
56    pub suggested_solution: String,
57}
58
59#[derive(Debug, Clone, Serialize, Deserialize)]
60pub struct RebuildStep {
61    pub step_number: usize,
62    pub concept: String,
63    pub implementation_details: String,
64    pub dependencies: Vec<String>,
65    pub validation_criteria: Vec<String>,
66}
67
68pub struct UnifiedAnalysisPipeline {
69    llm_client: Arc<LLMClient>,
70    s3_client: Arc<S3Client>,
71    workers: usize,
72}
73
74impl UnifiedAnalysisPipeline {
75    pub async fn new(llm_api_key: String, workers: usize) -> Result<Self> {
76        info!("šŸš€ Initializing Unified Analysis Pipeline");
77        info!("  └─ Workers: {}", workers);
78
79        let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
80        let s3_client = S3Client::new(&config);
81
82        Ok(Self {
83            llm_client: Arc::new(LLMClient::new(llm_api_key)),
84            s3_client: Arc::new(s3_client),
85            workers,
86        })
87    }
88
89    pub async fn analyze_and_publish(
90        &self,
91        repo_path: &Path,
92        s3_bucket: &str,
93        s3_key: &str,
94    ) -> Result<String> {
95        let total_start = Instant::now();
96
97        info!("\nšŸ” STARTING REPOSITORY ANALYSIS");
98        info!("  šŸ“ Repository: {}", repo_path.display());
99        info!("  🪣 S3 Target: s3://{}/{}", s3_bucket, s3_key);
100        info!("  āš™ļø  Workers: {}", self.workers);
101        println!("\n{}", "=".repeat(60));
102
103        // Stage 1: Collect files
104        let stage_start = Instant::now();
105        info!("\nšŸ“‹ STAGE 1/6: File Collection");
106        info!("  └─ Scanning repository for source files...");
107        let files = self.collect_files(repo_path).await?;
108        let file_count: usize = files.iter().map(|fc| fc.files.len()).sum();
109        info!(
110            "  āœ“ Collected {} files in {} batches ({}ms)",
111            file_count,
112            files.len(),
113            stage_start.elapsed().as_millis()
114        );
115
116        // Stage 2-4: Process through pipeline
117        let stage_start = Instant::now();
118        info!("\nšŸ”¬ STAGE 2-4: Parallel Analysis Pipeline");
119        info!("  ā”œā”€ Stage 2: Extracting abstractions from code");
120        info!("  ā”œā”€ Stage 3: Analyzing relationships between concepts");
121        info!(
122            "  └─ Stage 4: Processing with {} parallel workers",
123            self.workers
124        );
125
126        let project_name = repo_path
127            .file_name()
128            .and_then(|n| n.to_str())
129            .unwrap_or("unknown")
130            .to_string();
131
132        let llm = self.llm_client.clone();
133        let workers = self.workers;
134
135        // Create processing pipeline
136        let results: Vec<ProcessedUnit> = Pipeline::from(files)
137            .pmap(workers, move |file_collection| {
138                let llm = llm.clone();
139                let batch_size = file_collection.files.len();
140                debug!("  Processing batch with {} files", batch_size);
141
142                tokio::runtime::Handle::current().block_on(async move {
143                    let result = process_file_collection(file_collection, llm)
144                        .await
145                        .expect("Failed to process file collection");
146                    debug!("  āœ“ Batch processed");
147                    result
148                })
149            })
150            .into_iter()
151            .collect();
152
153        let total_abstractions: usize = results.iter().map(|r| r.abstractions.len()).sum();
154        let total_relationships: usize = results.iter().map(|r| r.relationships.len()).sum();
155        info!(
156            "  āœ“ Found {} abstractions and {} relationships ({}ms)",
157            total_abstractions,
158            total_relationships,
159            stage_start.elapsed().as_millis()
160        );
161
162        // Stage 5: Synthesize results
163        let stage_start = Instant::now();
164        info!("\n🧬 STAGE 5/6: Synthesizing First Principles");
165        info!("  ā”œā”€ Extracting essential concepts");
166        info!("  ā”œā”€ Simplifying relationships");
167        info!("  ā”œā”€ Determining build order");
168        info!("  └─ Generating rebuild instructions");
169        let output = self.synthesize_results(project_name, results).await?;
170        info!(
171            "  āœ“ Synthesis complete ({}ms)",
172            stage_start.elapsed().as_millis()
173        );
174        info!(
175            "    ā”œā”€ Essential concepts: {}",
176            output.essential_concepts.len()
177        );
178        info!(
179            "    ā”œā”€ Relationships: {}",
180            output.concept_relationships.len()
181        );
182        info!("    ā”œā”€ Critical gaps: {}", output.critical_gaps.len());
183        info!(
184            "    └─ Rebuild steps: {}",
185            output.rebuild_instructions.len()
186        );
187
188        // Stage 6: Publish to S3
189        let stage_start = Instant::now();
190        info!("\nā˜ļø  STAGE 6/6: Publishing to S3");
191        info!("  └─ Uploading to s3://{}/{}", s3_bucket, s3_key);
192        let s3_url = self.publish_to_s3(s3_bucket, s3_key, &output).await?;
193        info!(
194            "  āœ“ Published successfully ({}ms)",
195            stage_start.elapsed().as_millis()
196        );
197
198        let total_duration = total_start.elapsed();
199        println!("\n{}", "=".repeat(60));
200        info!("\nāœ… ANALYSIS COMPLETE!");
201        info!("  ā±ļø  Total time: {:.2}s", total_duration.as_secs_f64());
202        info!(
203            "  šŸ“Š Output size: {:.2} KB",
204            serde_json::to_string(&output)?.len() as f64 / 1024.0
205        );
206        info!("  šŸ”— Results: {}", s3_url);
207        println!("\n{}", "=".repeat(60));
208
209        Ok(s3_url)
210    }
211
212    async fn collect_files(&self, repo_path: &Path) -> Result<Vec<FileCollection>> {
213        let collector = BatchFileCollector::new()
214            .with_max_files_per_batch(50)
215            .with_exclude_patterns(vec![
216                "**/.git/**".to_string(),
217                "**/node_modules/**".to_string(),
218                "**/target/**".to_string(),
219                "**/*.lock".to_string(),
220            ]);
221
222        collector
223            .collect_repository(repo_path)
224            .await
225            .context("Failed to collect repository files")
226    }
227
228    async fn synthesize_results(
229        &self,
230        project_name: String,
231        results: Vec<ProcessedUnit>,
232    ) -> Result<FirstPrinciplesOutput> {
233        // Aggregate all abstractions and relationships
234        let mut all_abstractions = Vec::new();
235        let mut all_relationships = Vec::new();
236        let mut registry = ConceptRegistry::new();
237
238        for unit in results {
239            all_abstractions.extend(unit.abstractions);
240            all_relationships.extend(unit.relationships);
241        }
242
243        // Build concept registry and detect gaps
244        registry.add_project(&project_name, all_abstractions.clone());
245        let gaps = registry.detect_gaps();
246
247        // Convert to first principles format
248        let essential_concepts = self.extract_essential_concepts(&all_abstractions).await?;
249        let concept_relations = self.simplify_relationships(&all_relationships).await?;
250        let implementation_order = self
251            .determine_build_order(&essential_concepts, &concept_relations)
252            .await?;
253        let critical_gaps = self.identify_critical_gaps(&gaps).await?;
254        let rebuild_instructions = self
255            .generate_rebuild_instructions(
256                &essential_concepts,
257                &concept_relations,
258                &implementation_order,
259            )
260            .await?;
261
262        // Generate core purpose
263        let core_purpose = self
264            .generate_core_purpose(&project_name, &essential_concepts)
265            .await?;
266
267        Ok(FirstPrinciplesOutput {
268            project_name,
269            core_purpose,
270            essential_concepts,
271            concept_relationships: concept_relations,
272            implementation_order,
273            critical_gaps,
274            rebuild_instructions,
275        })
276    }
277
278    async fn extract_essential_concepts(
279        &self,
280        abstractions: &[Abstraction],
281    ) -> Result<Vec<EssentialConcept>> {
282        debug!(
283            "  Extracting essential concepts from {} abstractions",
284            abstractions.len()
285        );
286        let prompt = format!(
287            "Given these code abstractions, extract only the ESSENTIAL concepts needed to rebuild from first principles. \
288             Remove all implementation details, focusing only on core purposes and interfaces:\n\n{}\n\n\
289             For each essential concept provide:\n\
290             1. Name (simple, clear)\n\
291             2. Core purpose (one sentence)\n\
292             3. Key responsibilities (2-4 items)\n\
293             4. Required interfaces (API boundaries)\n\n\
294             Output as JSON array of objects with fields: name, purpose, core_responsibilities, interfaces",
295            serde_json::to_string_pretty(abstractions)?
296        );
297
298        let response = self.llm_client.complete(&prompt).await?;
299        let concepts: Vec<EssentialConcept> =
300            serde_json::from_str(&response).context("Failed to parse essential concepts")?;
301
302        Ok(concepts)
303    }
304
305    async fn simplify_relationships(
306        &self,
307        relationships: &[Relationship],
308    ) -> Result<Vec<ConceptRelation>> {
309        let prompt = format!(
310            "Simplify these code relationships to their essential nature. \
311             Focus on architectural dependencies, not implementation details:\n\n{}\n\n\
312             Output as JSON array with fields: from, to, relationship_type, reason",
313            serde_json::to_string_pretty(relationships)?
314        );
315
316        let response = self.llm_client.complete(&prompt).await?;
317        let relations: Vec<ConceptRelation> =
318            serde_json::from_str(&response).context("Failed to parse concept relations")?;
319
320        Ok(relations)
321    }
322
323    async fn determine_build_order(
324        &self,
325        concepts: &[EssentialConcept],
326        relations: &[ConceptRelation],
327    ) -> Result<Vec<String>> {
328        let prompt = format!(
329            "Given these concepts and their relationships, determine the optimal build order \
330             from first principles (what must be built first):\n\n\
331             Concepts: {}\n\nRelationships: {}\n\n\
332             Output as JSON array of concept names in build order",
333            serde_json::to_string_pretty(concepts)?,
334            serde_json::to_string_pretty(relations)?
335        );
336
337        let response = self.llm_client.complete(&prompt).await?;
338        let order: Vec<String> =
339            serde_json::from_str(&response).context("Failed to parse build order")?;
340
341        Ok(order)
342    }
343
344    async fn identify_critical_gaps(&self, gaps: &[Gap]) -> Result<Vec<CriticalGap>> {
345        let prompt = format!(
346            "From these detected gaps, identify only CRITICAL missing functionality:\n\n{}\n\n\
347             Output as JSON array with fields: missing_functionality, impact, suggested_solution",
348            serde_json::to_string_pretty(gaps)?
349        );
350
351        let response = self.llm_client.complete(&prompt).await?;
352        let critical: Vec<CriticalGap> =
353            serde_json::from_str(&response).context("Failed to parse critical gaps")?;
354
355        Ok(critical)
356    }
357
358    async fn generate_rebuild_instructions(
359        &self,
360        concepts: &[EssentialConcept],
361        relations: &[ConceptRelation],
362        order: &[String],
363    ) -> Result<Vec<RebuildStep>> {
364        let prompt = format!(
365            "Generate step-by-step rebuild instructions from first principles.\n\n\
366             Concepts: {}\n\nRelationships: {}\n\nBuild Order: {:?}\n\n\
367             For each step provide:\n\
368             1. What to implement (concept)\n\
369             2. How to implement it (essential details only)\n\
370             3. What it depends on\n\
371             4. How to validate it works\n\n\
372             Output as JSON array with fields: step_number, concept, implementation_details, dependencies, validation_criteria",
373            serde_json::to_string_pretty(concepts)?,
374            serde_json::to_string_pretty(relations)?,
375            order
376        );
377
378        let response = self.llm_client.complete(&prompt).await?;
379        let steps: Vec<RebuildStep> =
380            serde_json::from_str(&response).context("Failed to parse rebuild steps")?;
381
382        Ok(steps)
383    }
384
385    async fn generate_core_purpose(
386        &self,
387        project_name: &str,
388        concepts: &[EssentialConcept],
389    ) -> Result<String> {
390        debug!("  Generating core purpose for {}", project_name);
391        let prompt = format!(
392            "In one clear sentence, what is the core purpose of {} based on these essential concepts:\n\n{}\n\n\
393             Focus on the fundamental problem it solves, not implementation details.",
394            project_name,
395            serde_json::to_string_pretty(concepts)?
396        );
397
398        self.llm_client.complete(&prompt).await
399    }
400
401    async fn publish_to_s3(
402        &self,
403        bucket: &str,
404        key: &str,
405        output: &FirstPrinciplesOutput,
406    ) -> Result<String> {
407        let json_content = serde_json::to_string_pretty(output)?;
408
409        self.s3_client
410            .put_object()
411            .bucket(bucket)
412            .key(key)
413            .body(json_content.into_bytes().into())
414            .content_type("application/json")
415            .acl(ObjectCannedAcl::Private)
416            .send()
417            .await
418            .context("Failed to upload to S3")?;
419
420        Ok(format!("s3://{}/{}", bucket, key))
421    }
422}
423
424#[derive(Debug)]
425struct ProcessedUnit {
426    abstractions: Vec<Abstraction>,
427    relationships: Vec<Relationship>,
428}
429
430async fn process_file_collection(
431    collection: FileCollection,
432    llm_client: Arc<LLMClient>,
433) -> Result<ProcessedUnit> {
434    // Extract abstractions
435    let abstraction_identifier = AbstractionIdentifier::new(llm_client.clone());
436    let abstractions = abstraction_identifier
437        .identify_abstractions(&collection)
438        .await?;
439
440    // Analyze relationships
441    let relationship_analyzer = RelationshipAnalyzer::new(llm_client);
442    let relationships = relationship_analyzer
443        .analyze_relationships(&abstractions)
444        .await?;
445
446    Ok(ProcessedUnit {
447        abstractions,
448        relationships,
449    })
450}
451
452// Convenience function for single-call usage
453pub async fn analyze_repository(
454    repo_path: &Path,
455    s3_bucket: &str,
456    s3_key: &str,
457    llm_api_key: String,
458    workers: Option<usize>,
459) -> Result<String> {
460    // Initialize logging if not already done
461    let _ = env_logger::try_init();
462
463    let pipeline =
464        UnifiedAnalysisPipeline::new(llm_api_key, workers.unwrap_or(num_cpus::get())).await?;
465
466    pipeline
467        .analyze_and_publish(repo_path, s3_bucket, s3_key)
468        .await
469}