Skip to main content

infigraph_confluence/
template.rs

1use regex::Regex;
2use serde_json::Value;
3
/// Flattened summary of one pipeline design document.
///
/// Every field is free text; an empty string means nothing was extracted for
/// that field (`Default` yields all-empty strings).
#[derive(Debug, Clone, Default)]
pub struct PipelineRecord {
    /// Record identifier; `parse_pipeline_template` sets it to `pipeline::<doc_id>`.
    pub id: String,
    /// Document title as passed to `parse_pipeline_template`.
    pub name: String,
    /// Identifier of the source document as passed to `parse_pipeline_template`.
    pub doc_id: String,
    /// Comma-separated source table names, or a prose summary of the Source
    /// section when no table-like name was found.
    pub source_systems: String,
    /// Comma-separated destination table names, or a prose summary of the
    /// Destination section when no table-like name was found.
    pub dest_tables: String,
    /// Scheduler label produced by `extract_scheduler`: `BPP`, `Airflow`,
    /// `Cron`, `QuickETL` or `unknown`.
    pub scheduler_type: String,
    /// Truncated summary of the Scheduler section text.
    pub scheduler_config: String,
    /// Truncated summary of the Compliance section text.
    pub compliance: String,
    /// Repository URL, repository name, or the first non-empty line of the
    /// GitHub section.
    pub github_repo: String,
    /// `Role: names` pairs joined with `"; "`, or a prose summary of the DACI section.
    pub daci: String,
    /// Truncated summary of the Idempotency section (free text, not a flag).
    pub idempotent: String,
    /// Truncated summary of the Business Logic section text.
    pub business_logic_summary: String,
    /// Truncated summary of the Data Quality section text.
    pub data_quality: String,
    /// Comma-separated upstream dependencies from the Dependencies section.
    pub dependencies_upstream: String,
    /// Comma-separated downstream dependencies from the Dependencies section.
    pub dependencies_downstream: String,
}
22
/// Logical section of a pipeline design document that a heading can be
/// classified as (see `SectionMatcher::classify`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Section {
    /// Recognised, but `parse_pipeline_template` extracts no field from it.
    Overview,
    /// Feeds `PipelineRecord::source_systems`.
    Source,
    /// Feeds `PipelineRecord::dest_tables`.
    Destination,
    /// Feeds `PipelineRecord::compliance`.
    Compliance,
    /// Feeds `PipelineRecord::scheduler_type` and `scheduler_config`.
    Scheduler,
    /// Feeds `PipelineRecord::daci`.
    Daci,
    /// Feeds `PipelineRecord::data_quality`.
    DataQuality,
    /// Feeds `PipelineRecord::business_logic_summary`.
    BusinessLogic,
    /// Feeds `PipelineRecord::idempotent`.
    Idempotency,
    /// Feeds `PipelineRecord::github_repo`.
    GithubRepo,
    /// Feeds `PipelineRecord::dependencies_upstream` / `dependencies_downstream`.
    Dependencies,
}
37
/// Lookup tables that map heading text to a `Section`.
///
/// Both tables hold lowercase substrings; `classify` tests them against the
/// lowercased heading with `contains`.
struct SectionMatcher {
    /// Canonical section keywords, consulted after `aliases`.
    keywords: Vec<(&'static str, Section)>,
    /// Alternative heading phrases, consulted first.
    aliases: Vec<(&'static str, Section)>,
}
42
43impl SectionMatcher {
44    fn new() -> Self {
45        Self {
46            keywords: vec![
47                ("overview", Section::Overview),
48                ("source system", Section::Source),
49                ("source systems", Section::Source),
50                ("destination", Section::Destination),
51                ("dest table", Section::Destination),
52                ("destination table", Section::Destination),
53                ("compliance", Section::Compliance),
54                ("data classification", Section::Compliance),
55                ("scheduler", Section::Scheduler),
56                ("daci", Section::Daci),
57                ("data quality", Section::DataQuality),
58                ("business logic", Section::BusinessLogic),
59                ("idempotency", Section::Idempotency),
60                ("idempotent", Section::Idempotency),
61                ("github repo", Section::GithubRepo),
62                ("github", Section::GithubRepo),
63                ("dependenc", Section::Dependencies),
64            ],
65            aliases: vec![
66                ("bpp job", Section::Scheduler),
67                ("design description", Section::BusinessLogic),
68                ("error handling", Section::Idempotency),
69                ("security", Section::Compliance),
70                ("s3 partitioning", Section::Destination),
71                ("target layer", Section::Destination),
72                ("architecture", Section::Overview),
73            ],
74        }
75    }
76
77    fn classify(&self, heading: &str) -> Option<Section> {
78        let lower = heading.to_lowercase();
79
80        for &(alias, section) in &self.aliases {
81            if lower.contains(alias) {
82                return Some(section);
83            }
84        }
85
86        for &(keyword, section) in &self.keywords {
87            if lower.contains(keyword) {
88                return Some(section);
89            }
90        }
91
92        None
93    }
94}
95
/// One classified section cut out of a document by `parse_pipeline_template`.
#[derive(Debug)]
struct SectionContent {
    /// Section the heading was classified as.
    section: Section,
    /// Trimmed heading text; stored but not read by the parser (hence the underscore).
    _heading: String,
    /// Trimmed body: everything after the heading line up to the next heading
    /// of the same or a higher level (nested sub-headings are included).
    text: String,
}
102
103pub fn parse_pipeline_template(content: &str, title: &str, doc_id: &str) -> Option<PipelineRecord> {
104    let heading_re = Regex::new(r"(?m)^(#{1,6})\s+(.+)$").unwrap();
105    let matcher = SectionMatcher::new();
106
107    let mut sections: Vec<SectionContent> = Vec::new();
108
109    let heading_positions: Vec<(usize, usize, String)> = heading_re
110        .captures_iter(content)
111        .map(|cap| {
112            let m = cap.get(0).unwrap();
113            let level = cap[1].len();
114            let text = cap[2].trim().to_string();
115            (m.start(), level, text)
116        })
117        .collect();
118
119    for i in 0..heading_positions.len() {
120        let (start, level, ref heading) = heading_positions[i];
121
122        if let Some(section) = matcher.classify(heading) {
123            let end = heading_positions[i + 1..]
124                .iter()
125                .find(|(_, l, _)| *l <= level)
126                .map(|(s, _, _)| *s)
127                .unwrap_or(content.len());
128
129            let body_start = content[start..].find('\n').map(|p| start + p + 1).unwrap_or(end);
130            let body = content[body_start..end].trim().to_string();
131            sections.push(SectionContent {
132                section,
133                _heading: heading.clone(),
134                text: body,
135            });
136        }
137    }
138
139    if sections.is_empty() {
140        return None;
141    }
142
143    let mut record = PipelineRecord {
144        id: format!("pipeline::{}", doc_id),
145        name: title.to_string(),
146        doc_id: doc_id.to_string(),
147        ..Default::default()
148    };
149
150    for sc in &sections {
151        match sc.section {
152            Section::Source => {
153                record.source_systems = extract_sources(&sc.text);
154            }
155            Section::Destination => {
156                record.dest_tables = extract_tables(&sc.text);
157            }
158            Section::Scheduler => {
159                let (stype, config) = extract_scheduler(&sc.text);
160                record.scheduler_type = stype;
161                record.scheduler_config = config;
162            }
163            Section::Compliance => {
164                record.compliance = summarize_section(&sc.text, 500);
165            }
166            Section::GithubRepo => {
167                record.github_repo = extract_github_repo(&sc.text);
168            }
169            Section::Daci => {
170                record.daci = extract_daci(&sc.text);
171            }
172            Section::Idempotency => {
173                record.idempotent = summarize_section(&sc.text, 300);
174            }
175            Section::BusinessLogic => {
176                record.business_logic_summary = summarize_section(&sc.text, 500);
177            }
178            Section::DataQuality => {
179                record.data_quality = summarize_section(&sc.text, 300);
180            }
181            Section::Dependencies => {
182                let (up, down) = extract_dependencies(&sc.text);
183                if !up.is_empty() {
184                    record.dependencies_upstream = up;
185                }
186                if !down.is_empty() {
187                    record.dependencies_downstream = down;
188                }
189            }
190            Section::Overview => {}
191        }
192    }
193
194    Some(record)
195}
196
197fn extract_sources(text: &str) -> String {
198    let mut sources = Vec::new();
199
200    let table_re = Regex::new(r"(?i)(?:table|schema|database|source)[:\s]*[`]?(\w+\.\w+(?:\.\w+)?)[`]?").unwrap();
201    for cap in table_re.captures_iter(text) {
202        sources.push(cap[1].to_string());
203    }
204
205    let qualified_re = Regex::new(r"\b(\w+_(?:src|dm|rpt|raw|stg)\.\w+)\b").unwrap();
206    for cap in qualified_re.captures_iter(text) {
207        let t = cap[1].to_string();
208        if !sources.contains(&t) {
209            sources.push(t);
210        }
211    }
212
213    if sources.is_empty() {
214        summarize_section(text, 200)
215    } else {
216        sources.join(", ")
217    }
218}
219
220fn extract_tables(text: &str) -> String {
221    let mut tables = Vec::new();
222
223    let heading_table_re = Regex::new(r"(?i)(?:^|\n)#{1,6}\s+(?:Table[:\s]*)?[`]?(\w+\.\w+(?:\.\w+)?)[`]?").unwrap();
224    for cap in heading_table_re.captures_iter(text) {
225        let t = cap[1].to_string();
226        if !tables.contains(&t) {
227            tables.push(t);
228        }
229    }
230
231    let qualified_re = Regex::new(r"\b(\w+_(?:dm|rpt|src|raw|stg|mart)\.\w+)\b").unwrap();
232    for cap in qualified_re.captures_iter(text) {
233        let t = cap[1].to_string();
234        if !tables.contains(&t) {
235            tables.push(t);
236        }
237    }
238
239    let backtick_re = Regex::new(r"`(\w+\.\w+(?:\.\w+)?)`").unwrap();
240    for cap in backtick_re.captures_iter(text) {
241        let t = cap[1].to_string();
242        if !tables.contains(&t) {
243            tables.push(t);
244        }
245    }
246
247    if tables.is_empty() {
248        summarize_section(text, 200)
249    } else {
250        tables.join(", ")
251    }
252}
253
254fn extract_scheduler(text: &str) -> (String, String) {
255    let lower = text.to_lowercase();
256    let stype = if lower.contains("bpp") || lower.contains("batch processing") {
257        "BPP".to_string()
258    } else if lower.contains("airflow") {
259        "Airflow".to_string()
260    } else if lower.contains("cron") {
261        "Cron".to_string()
262    } else if lower.contains("quicketl") || lower.contains("quick_etl") {
263        "QuickETL".to_string()
264    } else {
265        "unknown".to_string()
266    };
267
268    let config = summarize_section(text, 300);
269    (stype, config)
270}
271
272fn extract_github_repo(text: &str) -> String {
273    let url_re = Regex::new(r"https?://(?:github\.com|github\.intuit\.com)/[^\s\)]+").unwrap();
274    if let Some(m) = url_re.find(text) {
275        return m.as_str().to_string();
276    }
277    let repo_re = Regex::new(r"(?i)(?:repo|repository)[:\s]*[`]?([a-zA-Z0-9_/-]+)[`]?").unwrap();
278    if let Some(cap) = repo_re.captures(text) {
279        return cap[1].to_string();
280    }
281    text.lines()
282        .map(|l| l.trim())
283        .find(|l| !l.is_empty())
284        .unwrap_or("")
285        .to_string()
286}
287
288fn extract_daci(text: &str) -> String {
289    let mut parts = Vec::new();
290    let role_re = Regex::new(r"(?im)^\s*\**\s*(Driver|Approver|Contributor|Informed|Accountable)[:\s*]*(.+)$").unwrap();
291    for cap in role_re.captures_iter(text) {
292        parts.push(format!("{}: {}", &cap[1], cap[2].trim()));
293    }
294    if parts.is_empty() {
295        summarize_section(text, 200)
296    } else {
297        parts.join("; ")
298    }
299}
300
/// Splits a Dependencies section body into `(upstream, downstream)` lists,
/// each returned as a comma-separated string.
///
/// A line that starts with `#` or `**` and mentions "upstream" / "downstream"
/// switches the list that following items are added to ("upstream" is tested
/// first). Items are either
/// * the first non-empty cell of a Markdown table row, or
/// * a plain or bulleted line with leading `-`, `*`, `•` and spaces removed.
///
/// Skipped lines: blank lines, other headings, table delimiter rows /
/// horizontal rules, and table header rows. Items that appear before any
/// upstream/downstream marker are dropped.
fn extract_dependencies(text: &str) -> (String, String) {
    /// List that subsequent items are appended to.
    #[derive(Clone, Copy)]
    enum Direction {
        Upstream,
        Downstream,
    }

    /// Table delimiter row (`|---|:---:|`) or horizontal rule (`---`).
    /// Alignment colons are accepted so they are not mistaken for data.
    fn is_rule_line(line: &str) -> bool {
        line.starts_with("|--")
            || line.starts_with("| --")
            || (line.contains('-')
                && line.chars().all(|c| matches!(c, '-' | '|' | ':' | ' ')))
    }

    /// Whether a whole cell is one of the known column titles. Comparing the
    /// entire cell (not a substring of the row) keeps data such as
    /// `tax_src.translation_table`, which merely contains "sla".
    fn is_column_title(cell: &str) -> bool {
        let bare = cell
            .trim_matches(|c: char| !c.is_alphanumeric())
            .to_lowercase();
        matches!(bare.as_str(), "dependency" | "dependencies" | "owner" | "sla")
    }

    let lines: Vec<&str> = text.lines().map(str::trim).collect();
    let mut upstream: Vec<String> = Vec::new();
    let mut downstream: Vec<String> = Vec::new();
    let mut direction: Option<Direction> = None;

    for (idx, &line) in lines.iter().enumerate() {
        let lower = line.to_lowercase();
        let is_marker = line.starts_with('#') || line.starts_with("**");
        if is_marker && lower.contains("upstream") {
            direction = Some(Direction::Upstream);
            continue;
        }
        if is_marker && lower.contains("downstream") {
            direction = Some(Direction::Downstream);
            continue;
        }
        if line.is_empty() || line.starts_with('#') || is_rule_line(line) {
            continue;
        }

        let item = if line.starts_with('|') {
            let cells: Vec<&str> = line
                .split('|')
                .map(str::trim)
                .filter(|cell| !cell.is_empty())
                .collect();
            // A header row is the row directly above the delimiter row; the
            // column-title check covers tables exported without one.
            let above_delimiter = lines
                .get(idx + 1)
                .map_or(false, |next| next.contains('|') && is_rule_line(next));
            if above_delimiter || cells.iter().any(|cell| is_column_title(cell)) {
                continue;
            }
            match cells.first() {
                Some(first) => *first,
                None => continue,
            }
        } else {
            line.trim_start_matches(['-', '*', '•', ' '])
        };

        if item.is_empty() {
            continue;
        }
        match direction {
            Some(Direction::Upstream) => upstream.push(item.to_string()),
            Some(Direction::Downstream) => downstream.push(item.to_string()),
            None => {}
        }
    }

    (upstream.join(", "), downstream.join(", "))
}
356
/// Collapses `text` into a single line and truncates it to `max_chars`
/// characters.
///
/// Every line is trimmed, blank lines are dropped, and the remaining lines
/// are joined with single spaces. If the result is longer than `max_chars`
/// Unicode scalar values, it is cut after `max_chars` characters and `...`
/// is appended (the ellipsis is not counted against the limit).
///
/// The limit is counted in characters, as the parameter name promises, so
/// non-ASCII text is no longer cut short by a byte-length limit and the cut
/// always lands on a character boundary.
fn summarize_section(text: &str, max_chars: usize) -> String {
    let cleaned = text
        .lines()
        .map(str::trim)
        .filter(|line| !line.is_empty())
        .collect::<Vec<_>>()
        .join(" ");

    // Byte offset of the first character beyond the limit, if there is one.
    match cleaned.char_indices().nth(max_chars) {
        None => cleaned,
        Some((cut, _)) => format!("{}...", &cleaned[..cut]),
    }
}
371
372fn needs_llm_fallback(record: &PipelineRecord) -> Vec<&'static str> {
373    let mut missing = Vec::new();
374    if record.source_systems.is_empty() { missing.push("source_systems"); }
375    if record.dest_tables.is_empty() { missing.push("dest_tables"); }
376    if record.scheduler_type.is_empty() || record.scheduler_type == "unknown" { missing.push("scheduler_type"); }
377    if record.github_repo.is_empty() { missing.push("github_repo"); }
378    if record.daci.is_empty() { missing.push("daci"); }
379    if record.business_logic_summary.is_empty() { missing.push("business_logic_summary"); }
380    missing
381}
382
/// Builds the LLM prompt that asks for the fields listed in `missing_fields`.
///
/// * `content` - full document text; at most the first 6000 bytes are
///   embedded, shortened further if needed so the cut never splits a
///   multi-byte character.
/// * `title` - document title, embedded verbatim.
/// * `missing_fields` - field names as returned by `needs_llm_fallback`;
///   unrecognised names are ignored.
///
/// Returns the prompt text: instructions, one description line per requested
/// field, the title and the (possibly truncated) document content.
fn build_extraction_prompt(content: &str, title: &str, missing_fields: &[&str]) -> String {
    /// Upper bound, in bytes, on the document excerpt embedded in the prompt.
    const MAX_CONTENT_BYTES: usize = 6000;

    let fields_desc: Vec<&str> = missing_fields
        .iter()
        .filter_map(|field| match *field {
            "source_systems" => Some("source_systems: comma-separated list of source tables/systems (e.g. tax_dm.fact_tax_w2_metric, commerce_profile)"),
            "dest_tables" => Some("dest_tables: comma-separated list of destination/target tables (e.g. tax_rpt.rpt_marketing_attributes)"),
            "scheduler_type" => Some("scheduler_type: one of BPP, Airflow, Cron, QuickETL, or unknown"),
            "github_repo" => Some("github_repo: GitHub repository URL or name"),
            "daci" => Some("daci: Driver, Approver, Contributors, Informed roles and names"),
            "business_logic_summary" => Some("business_logic_summary: 1-2 sentence summary of the pipeline's business logic"),
            _ => None,
        })
        .collect();

    // Slicing a `str` in the middle of a UTF-8 sequence panics, so back up to
    // the nearest character boundary (offset 0 always is one).
    let mut cut = content.len().min(MAX_CONTENT_BYTES);
    while !content.is_char_boundary(cut) {
        cut -= 1;
    }

    format!(
        "Extract the following fields from this pipeline design document. Return ONLY a JSON object with the requested fields. No explanation.\n\nFields needed:\n{}\n\nDocument title: {}\n\nDocument content (truncated):\n{}",
        fields_desc.join("\n"),
        title,
        &content[..cut]
    )
}
401
402fn call_claude_extract(prompt: &str) -> Option<Value> {
403    let api_key = std::env::var("ANTHROPIC_API_KEY").ok()?;
404    let model = std::env::var("INFIGRAPH_LLM_MODEL")
405        .unwrap_or_else(|_| "claude-sonnet-4-20250514".to_string());
406    let base_url = std::env::var("INFIGRAPH_LLM_BASE_URL")
407        .unwrap_or_else(|_| "https://api.anthropic.com".to_string());
408
409    let body = serde_json::json!({
410        "model": model,
411        "max_tokens": 1024,
412        "messages": [{"role": "user", "content": prompt}],
413    });
414
415    let resp = ureq::post(&format!("{}/v1/messages", base_url))
416        .set("x-api-key", &api_key)
417        .set("anthropic-version", "2023-06-01")
418        .set("content-type", "application/json")
419        .send_string(&body.to_string())
420        .ok()?;
421
422    let resp_body: Value = resp.into_json().ok()?;
423    let text = resp_body["content"]
424        .as_array()
425        .and_then(|arr| arr.first())
426        .and_then(|block| block["text"].as_str())?;
427
428    let json_str = if let Some(start) = text.find('{') {
429        if let Some(end) = text.rfind('}') {
430            &text[start..=end]
431        } else {
432            text
433        }
434    } else {
435        text
436    };
437
438    serde_json::from_str(json_str).ok()
439}
440
441pub fn fill_with_llm(record: &mut PipelineRecord, content: &str, title: &str) -> usize {
442    if std::env::var("INFIGRAPH_LLM_EXTRACT").is_err() {
443        return 0;
444    }
445
446    let missing = needs_llm_fallback(record);
447    if missing.is_empty() {
448        return 0;
449    }
450
451    let prompt = build_extraction_prompt(content, title, &missing);
452    let Some(json) = call_claude_extract(&prompt) else {
453        eprintln!("LLM extraction failed for pipeline '{}' (missing: {})", title, missing.join(", "));
454        return 0;
455    };
456
457    let mut filled = 0;
458    for field in &missing {
459        if let Some(val) = json.get(field).and_then(|v| v.as_str()) {
460            if val.is_empty() { continue; }
461            match *field {
462                "source_systems" => { record.source_systems = val.to_string(); filled += 1; }
463                "dest_tables" => { record.dest_tables = val.to_string(); filled += 1; }
464                "scheduler_type" => { record.scheduler_type = val.to_string(); filled += 1; }
465                "github_repo" => { record.github_repo = val.to_string(); filled += 1; }
466                "daci" => { record.daci = val.to_string(); filled += 1; }
467                "business_logic_summary" => { record.business_logic_summary = val.to_string(); filled += 1; }
468                _ => {}
469            }
470        }
471    }
472    filled
473}
474
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;

    /// Serialises the tests that mutate `INFIGRAPH_LLM_EXTRACT`. The test
    /// harness runs tests on several threads and the process environment is
    /// shared, so without this one test could observe the other's setting.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Document with all sections present but only prose in them, so the
    /// structured extractors have to fall back to summaries.
    fn minimal_pipeline_content() -> &'static str {
        r#"# Pipeline: Mystery ETL

## Overview
This pipeline does something.

## Source System Details
Data comes from various internal systems via shared drives.

## Destination Tables
Output goes to the data lake.

## Scheduler
Runs nightly.

## DACI
Team owns it.

## Business Logic
Complex transformation logic.

## Dependencies
### Upstream
- system_a
### Downstream
- system_b
"#
    }

    /// Document in which every section carries structured, extractable data.
    fn full_pipeline_content() -> &'static str {
        r#"# Pipeline: W2 Metrics

## Overview
W2 metrics pipeline.

## Source System Details
Source table: `tax_src.raw_w2_data`
Schema: tax_dm.fact_w2_metric

## Destination Tables
### tax_rpt.rpt_w2_summary
Destination for W2 summary data.

## Compliance
PII — SSN masked. Data classification: Restricted.

## Scheduler
BPP job runs daily at 2am UTC. Job name: `w2_metrics_daily`.

## DACI
**Driver:** Alice
**Approver:** Bob
**Contributor:** Charlie, Dave
**Informed:** Eve

## Business Logic
Aggregates W2 forms by employer EIN, computes YTD totals, applies withholding rules per IRS pub 15.

## Github Repo
https://github.intuit.com/tax-data/w2-metrics-pipeline

## Dependencies
### Upstream
| Dependency | Owner | SLA |
|---|---|---|
| tax_src.raw_w2_data | Tax Ingestion | 1am UTC |
| ref_data.employer_dim | MDM | hourly |
### Downstream
| Dependency | Owner | SLA |
|---|---|---|
| tax_rpt.executive_dashboard | BI Team | 6am UTC |
"#
    }

    #[test]
    fn test_parse_full_pipeline_all_fields_extracted() {
        let content = full_pipeline_content();
        let record = parse_pipeline_template(content, "W2 Metrics", "doc::w2").unwrap();

        assert!(record.source_systems.contains("tax_src.raw_w2_data"));
        assert!(record.source_systems.contains("tax_dm.fact_w2_metric"));
        assert!(record.dest_tables.contains("tax_rpt.rpt_w2_summary"));
        assert_eq!(record.scheduler_type, "BPP");
        assert!(record.scheduler_config.contains("daily"));
        assert!(record.github_repo.contains("github.intuit.com"));
        assert!(record.daci.contains("Alice"));
        assert!(record.daci.contains("Bob"));
        assert!(!record.business_logic_summary.is_empty());
        assert!(!record.compliance.is_empty());

        let missing = needs_llm_fallback(&record);
        assert!(missing.is_empty(), "Full pipeline should have no missing fields, got: {:?}", missing);
    }

    #[test]
    fn test_parse_minimal_pipeline_identifies_missing_fields() {
        let content = minimal_pipeline_content();
        let record = parse_pipeline_template(content, "Mystery ETL", "doc::mystery").unwrap();

        assert_eq!(record.name, "Mystery ETL");
        assert_eq!(record.doc_id, "doc::mystery");

        assert!(!record.source_systems.is_empty(), "source_systems gets prose summary fallback");
        assert!(!record.dest_tables.is_empty(), "dest_tables gets prose summary fallback");
        assert!(!record.daci.is_empty(), "daci gets prose summary fallback");

        let missing = needs_llm_fallback(&record);
        assert!(missing.contains(&"scheduler_type"), "scheduler_type should be 'unknown' → needs LLM");
        assert!(missing.contains(&"github_repo"), "github_repo should be empty — no section matched");
    }

    #[test]
    fn test_truly_empty_fields_trigger_llm_fallback() {
        let record = PipelineRecord {
            id: "pipeline::test".to_string(),
            name: "Test".to_string(),
            doc_id: "doc::test".to_string(),
            source_systems: String::new(),
            dest_tables: String::new(),
            scheduler_type: "unknown".to_string(),
            github_repo: String::new(),
            daci: String::new(),
            business_logic_summary: String::new(),
            ..Default::default()
        };

        let missing = needs_llm_fallback(&record);
        assert_eq!(missing.len(), 6, "All 6 fields should be flagged: {:?}", missing);
    }

    #[test]
    fn test_fill_with_llm_gated_by_env_var() {
        // Hold the lock for the whole test; a poisoned lock only means another
        // env test failed, which must not cascade into this one.
        let _env = ENV_LOCK.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
        std::env::remove_var("INFIGRAPH_LLM_EXTRACT");

        let content = minimal_pipeline_content();
        let mut record = parse_pipeline_template(content, "Mystery ETL", "doc::mystery").unwrap();

        let missing_before = needs_llm_fallback(&record);
        assert!(!missing_before.is_empty(), "Should have missing fields");

        let filled = fill_with_llm(&mut record, content, "Mystery ETL");
        assert_eq!(filled, 0, "Should return 0 when INFIGRAPH_LLM_EXTRACT not set");

        let missing_after = needs_llm_fallback(&record);
        assert_eq!(missing_before, missing_after, "Fields should be unchanged when env var not set");
    }

    #[test]
    fn test_fill_with_llm_no_op_when_all_fields_present() {
        let _env = ENV_LOCK.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
        std::env::set_var("INFIGRAPH_LLM_EXTRACT", "1");

        let content = full_pipeline_content();
        let mut record = parse_pipeline_template(content, "W2 Metrics", "doc::w2").unwrap();

        let filled = fill_with_llm(&mut record, content, "W2 Metrics");
        assert_eq!(filled, 0, "Should return 0 when no fields are missing");

        std::env::remove_var("INFIGRAPH_LLM_EXTRACT");
    }

    #[test]
    fn test_dependency_table_extraction() {
        let dep_text = r#"### Upstream
| Dependency | Owner | SLA |
|---|---|---|
| tax_src.raw_w2_data | Tax Ingestion | 1am UTC |
| ref_data.employer_dim | MDM | hourly |
### Downstream
| Dependency | Owner | SLA |
|---|---|---|
| tax_rpt.executive_dashboard | BI Team | 6am UTC |
"#;
        let (up, down) = extract_dependencies(dep_text);
        assert!(up.contains("tax_src.raw_w2_data"), "upstream should contain tax_src.raw_w2_data, got: {}", up);
        assert!(up.contains("ref_data.employer_dim"), "upstream should contain ref_data.employer_dim, got: {}", up);
        assert!(down.contains("tax_rpt.executive_dashboard"), "downstream should contain tax_rpt.executive_dashboard, got: {}", down);
    }

    #[test]
    fn test_dependency_bullet_extraction() {
        let dep_text = r#"### Upstream
- system_alpha
- system_beta
### Downstream
- consumer_one
"#;
        let (up, down) = extract_dependencies(dep_text);
        assert!(up.contains("system_alpha"));
        assert!(up.contains("system_beta"));
        assert!(down.contains("consumer_one"));
    }

    #[test]
    fn test_build_extraction_prompt_includes_only_missing() {
        let missing = vec!["source_systems", "github_repo"];
        let prompt = build_extraction_prompt("doc content here", "Test Pipeline", &missing);
        assert!(prompt.contains("source_systems"));
        assert!(prompt.contains("github_repo"));
        assert!(!prompt.contains("scheduler_type"), "Should not include non-missing fields");
        assert!(!prompt.contains("daci"), "Should not include non-missing fields");
    }

    #[test]
    fn test_no_sections_returns_none() {
        let content = "Just some plain text with no headings at all.";
        let result = parse_pipeline_template(content, "Empty", "doc::empty");
        assert!(result.is_none(), "Should return None when no sections found");
    }
}