Skip to main content

infigraph_confluence/
template.rs

1use regex::Regex;
2use serde_json::Value;
3
/// One data pipeline as described by a design document.
///
/// Produced by [`parse_pipeline_template`] and optionally completed by
/// [`fill_with_llm`]. Every attribute is stored as display-ready text; an empty
/// string means nothing was extracted for it.
#[derive(Debug, Clone, Default)]
pub struct PipelineRecord {
    /// Record identifier; `parse_pipeline_template` builds it as `pipeline::<doc_id>`.
    pub id: String,
    /// Pipeline name; `parse_pipeline_template` uses the document title.
    pub name: String,
    /// Identifier of the source document.
    pub doc_id: String,
    /// Comma-separated source tables, or a prose summary of the source section when
    /// no table name was recognised.
    pub source_systems: String,
    /// Comma-separated destination tables, or a prose summary of the destination
    /// section when no table name was recognised.
    pub dest_tables: String,
    /// `BPP`, `Airflow`, `Cron`, `QuickETL` or `unknown` when parsed from the
    /// document; text returned by the LLM fallback is stored as-is.
    pub scheduler_type: String,
    /// Summary of the scheduler section.
    pub scheduler_config: String,
    /// Summary of the compliance / data-classification / security section.
    pub compliance: String,
    /// GitHub URL, repository name, or the first line of the repo section.
    pub github_repo: String,
    /// `Role: names` entries joined with `; `, or a prose summary of the DACI section.
    pub daci: String,
    /// Summary of the idempotency (or error-handling) section.
    pub idempotent: String,
    /// Summary of the business-logic / design-description section.
    pub business_logic_summary: String,
    /// Summary of the data-quality section.
    pub data_quality: String,
    /// Comma-separated upstream dependencies.
    pub dependencies_upstream: String,
    /// Comma-separated downstream dependencies.
    pub dependencies_downstream: String,
}
22
/// Kind of design-document section a heading introduces.
///
/// Headings are mapped onto these variants by `SectionMatcher::classify`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Section {
    /// "Overview" or "Architecture" headings.
    Overview,
    /// "Source system(s)" headings.
    Source,
    /// "Destination", "Dest table", "S3 partitioning" or "Target layer" headings.
    Destination,
    /// "Compliance", "Data classification" or "Security" headings.
    Compliance,
    /// "Scheduler" or "BPP job" headings.
    Scheduler,
    /// "DACI" headings.
    Daci,
    /// "Data quality" headings.
    DataQuality,
    /// "Business logic" or "Design description" headings.
    BusinessLogic,
    /// "Idempotency"/"Idempotent" or "Error handling" headings.
    Idempotency,
    /// "GitHub" / "GitHub repo" headings.
    GithubRepo,
    /// Headings containing "dependenc" (dependency, dependencies).
    Dependencies,
}
37
38struct SectionMatcher {
39    keywords: Vec<(&'static str, Section)>,
40    aliases: Vec<(&'static str, Section)>,
41}
42
43impl SectionMatcher {
44    fn new() -> Self {
45        Self {
46            keywords: vec![
47                ("overview", Section::Overview),
48                ("source system", Section::Source),
49                ("source systems", Section::Source),
50                ("destination", Section::Destination),
51                ("dest table", Section::Destination),
52                ("destination table", Section::Destination),
53                ("compliance", Section::Compliance),
54                ("data classification", Section::Compliance),
55                ("scheduler", Section::Scheduler),
56                ("daci", Section::Daci),
57                ("data quality", Section::DataQuality),
58                ("business logic", Section::BusinessLogic),
59                ("idempotency", Section::Idempotency),
60                ("idempotent", Section::Idempotency),
61                ("github repo", Section::GithubRepo),
62                ("github", Section::GithubRepo),
63                ("dependenc", Section::Dependencies),
64            ],
65            aliases: vec![
66                ("bpp job", Section::Scheduler),
67                ("design description", Section::BusinessLogic),
68                ("error handling", Section::Idempotency),
69                ("security", Section::Compliance),
70                ("s3 partitioning", Section::Destination),
71                ("target layer", Section::Destination),
72                ("architecture", Section::Overview),
73            ],
74        }
75    }
76
77    fn classify(&self, heading: &str) -> Option<Section> {
78        let lower = heading.to_lowercase();
79
80        for &(alias, section) in &self.aliases {
81            if lower.contains(alias) {
82                return Some(section);
83            }
84        }
85
86        for &(keyword, section) in &self.keywords {
87            if lower.contains(keyword) {
88                return Some(section);
89            }
90        }
91
92        None
93    }
94}
95
96#[derive(Debug)]
97struct SectionContent {
98    section: Section,
99    _heading: String,
100    text: String,
101}
102
103pub fn parse_pipeline_template(content: &str, title: &str, doc_id: &str) -> Option<PipelineRecord> {
104    let heading_re = Regex::new(r"(?m)^(#{1,6})\s+(.+)$").unwrap();
105    let matcher = SectionMatcher::new();
106
107    let mut sections: Vec<SectionContent> = Vec::new();
108
109    let heading_positions: Vec<(usize, usize, String)> = heading_re
110        .captures_iter(content)
111        .map(|cap| {
112            let m = cap.get(0).unwrap();
113            let level = cap[1].len();
114            let text = cap[2].trim().to_string();
115            (m.start(), level, text)
116        })
117        .collect();
118
119    for i in 0..heading_positions.len() {
120        let (start, level, ref heading) = heading_positions[i];
121
122        if let Some(section) = matcher.classify(heading) {
123            let end = heading_positions[i + 1..]
124                .iter()
125                .find(|(_, l, _)| *l <= level)
126                .map(|(s, _, _)| *s)
127                .unwrap_or(content.len());
128
129            let body_start = content[start..]
130                .find('\n')
131                .map(|p| start + p + 1)
132                .unwrap_or(end);
133            let body = content[body_start..end].trim().to_string();
134            sections.push(SectionContent {
135                section,
136                _heading: heading.clone(),
137                text: body,
138            });
139        }
140    }
141
142    if sections.is_empty() {
143        return None;
144    }
145
146    let mut record = PipelineRecord {
147        id: format!("pipeline::{}", doc_id),
148        name: title.to_string(),
149        doc_id: doc_id.to_string(),
150        ..Default::default()
151    };
152
153    for sc in &sections {
154        match sc.section {
155            Section::Source => {
156                record.source_systems = extract_sources(&sc.text);
157            }
158            Section::Destination => {
159                record.dest_tables = extract_tables(&sc.text);
160            }
161            Section::Scheduler => {
162                let (stype, config) = extract_scheduler(&sc.text);
163                record.scheduler_type = stype;
164                record.scheduler_config = config;
165            }
166            Section::Compliance => {
167                record.compliance = summarize_section(&sc.text, 500);
168            }
169            Section::GithubRepo => {
170                record.github_repo = extract_github_repo(&sc.text);
171            }
172            Section::Daci => {
173                record.daci = extract_daci(&sc.text);
174            }
175            Section::Idempotency => {
176                record.idempotent = summarize_section(&sc.text, 300);
177            }
178            Section::BusinessLogic => {
179                record.business_logic_summary = summarize_section(&sc.text, 500);
180            }
181            Section::DataQuality => {
182                record.data_quality = summarize_section(&sc.text, 300);
183            }
184            Section::Dependencies => {
185                let (up, down) = extract_dependencies(&sc.text);
186                if !up.is_empty() {
187                    record.dependencies_upstream = up;
188                }
189                if !down.is_empty() {
190                    record.dependencies_downstream = down;
191                }
192            }
193            Section::Overview => {}
194        }
195    }
196
197    Some(record)
198}
199
200fn extract_sources(text: &str) -> String {
201    let mut sources = Vec::new();
202
203    let table_re =
204        Regex::new(r"(?i)(?:table|schema|database|source)[:\s]*[`]?(\w+\.\w+(?:\.\w+)?)[`]?")
205            .unwrap();
206    for cap in table_re.captures_iter(text) {
207        sources.push(cap[1].to_string());
208    }
209
210    let qualified_re = Regex::new(r"\b(\w+_(?:src|dm|rpt|raw|stg)\.\w+)\b").unwrap();
211    for cap in qualified_re.captures_iter(text) {
212        let t = cap[1].to_string();
213        if !sources.contains(&t) {
214            sources.push(t);
215        }
216    }
217
218    if sources.is_empty() {
219        summarize_section(text, 200)
220    } else {
221        sources.join(", ")
222    }
223}
224
225fn extract_tables(text: &str) -> String {
226    let mut tables = Vec::new();
227
228    let heading_table_re =
229        Regex::new(r"(?i)(?:^|\n)#{1,6}\s+(?:Table[:\s]*)?[`]?(\w+\.\w+(?:\.\w+)?)[`]?").unwrap();
230    for cap in heading_table_re.captures_iter(text) {
231        let t = cap[1].to_string();
232        if !tables.contains(&t) {
233            tables.push(t);
234        }
235    }
236
237    let qualified_re = Regex::new(r"\b(\w+_(?:dm|rpt|src|raw|stg|mart)\.\w+)\b").unwrap();
238    for cap in qualified_re.captures_iter(text) {
239        let t = cap[1].to_string();
240        if !tables.contains(&t) {
241            tables.push(t);
242        }
243    }
244
245    let backtick_re = Regex::new(r"`(\w+\.\w+(?:\.\w+)?)`").unwrap();
246    for cap in backtick_re.captures_iter(text) {
247        let t = cap[1].to_string();
248        if !tables.contains(&t) {
249            tables.push(t);
250        }
251    }
252
253    if tables.is_empty() {
254        summarize_section(text, 200)
255    } else {
256        tables.join(", ")
257    }
258}
259
260fn extract_scheduler(text: &str) -> (String, String) {
261    let lower = text.to_lowercase();
262    let stype = if lower.contains("bpp") || lower.contains("batch processing") {
263        "BPP".to_string()
264    } else if lower.contains("airflow") {
265        "Airflow".to_string()
266    } else if lower.contains("cron") {
267        "Cron".to_string()
268    } else if lower.contains("quicketl") || lower.contains("quick_etl") {
269        "QuickETL".to_string()
270    } else {
271        "unknown".to_string()
272    };
273
274    let config = summarize_section(text, 300);
275    (stype, config)
276}
277
278fn extract_github_repo(text: &str) -> String {
279    let url_re = Regex::new(r"https?://(?:github\.com|github\.intuit\.com)/[^\s\)]+").unwrap();
280    if let Some(m) = url_re.find(text) {
281        return m.as_str().to_string();
282    }
283    let repo_re = Regex::new(r"(?i)(?:repo|repository)[:\s]*[`]?([a-zA-Z0-9_/-]+)[`]?").unwrap();
284    if let Some(cap) = repo_re.captures(text) {
285        return cap[1].to_string();
286    }
287    text.lines()
288        .map(|l| l.trim())
289        .find(|l| !l.is_empty())
290        .unwrap_or("")
291        .to_string()
292}
293
294fn extract_daci(text: &str) -> String {
295    let mut parts = Vec::new();
296    let role_re = Regex::new(
297        r"(?im)^\s*\**\s*(Driver|Approver|Contributor|Informed|Accountable)[:\s*]*(.+)$",
298    )
299    .unwrap();
300    for cap in role_re.captures_iter(text) {
301        parts.push(format!("{}: {}", &cap[1], cap[2].trim()));
302    }
303    if parts.is_empty() {
304        summarize_section(text, 200)
305    } else {
306        parts.join("; ")
307    }
308}
309
/// Splits a "Dependencies" section into upstream and downstream lists.
///
/// Lines that start with `#` or `**` and mention "upstream"/"downstream" switch
/// the current list. Items before the first such marker are ignored. Each item is
/// one of:
/// - the first non-empty cell of a Markdown table row, skipping header rows and
///   delimiter rows such as `|---|` or `|:---|---:|`;
/// - a bullet / plain line with leading `-`, `*`, `•` and spaces removed.
///
/// Returns `(upstream, downstream)`, each joined with `", "`. Either may be empty.
fn extract_dependencies(text: &str) -> (String, String) {
    /// List that subsequent items belong to.
    #[derive(Clone, Copy, PartialEq, Eq)]
    enum Direction {
        Unset,
        Upstream,
        Downstream,
    }

    /// True for table delimiter rows (`|---|---|`, `| :--- | ---: |`) and horizontal
    /// rules (`---`). Alignment colons are accepted so they are never read as items.
    fn is_table_rule(line: &str) -> bool {
        line.starts_with("|--")
            || line.starts_with("| --")
            || (line.contains('-')
                && line
                    .chars()
                    .all(|c| matches!(c, '-' | ':' | '|' | ' ' | '\t')))
    }

    /// True when a cell is exactly one of the template's column titles.
    fn is_header_label(cell: &str) -> bool {
        matches!(
            cell.to_lowercase().as_str(),
            "dependency" | "dependencies" | "owner" | "sla"
        )
    }

    let lines: Vec<&str> = text.lines().map(str::trim).collect();
    let mut upstream = Vec::new();
    let mut downstream = Vec::new();
    let mut direction = Direction::Unset;

    for (idx, &line) in lines.iter().enumerate() {
        let lower = line.to_lowercase();
        let is_marker = line.starts_with('#') || line.starts_with("**");
        if is_marker && lower.contains("upstream") {
            direction = Direction::Upstream;
            continue;
        }
        if is_marker && lower.contains("downstream") {
            direction = Direction::Downstream;
            continue;
        }
        if line.is_empty() || line.starts_with('#') || is_table_rule(line) {
            continue;
        }

        let item = if line.starts_with('|') {
            let cells: Vec<&str> = line
                .split('|')
                .map(str::trim)
                .filter(|c| !c.is_empty())
                .collect();
            // A header row is the one directly above a delimiter row. Exact-label
            // matching covers tables without a delimiter. Substring matching would
            // drop real rows such as "translation_dm.x" (contains "sla").
            let next_is_rule = lines.get(idx + 1).is_some_and(|next| is_table_rule(next));
            if next_is_rule || cells.iter().any(|c| is_header_label(c)) {
                continue;
            }
            match cells.first() {
                Some(first) => *first,
                None => continue,
            }
        } else {
            line.trim_start_matches(['-', '*', '•', ' '])
        };

        if item.is_empty() {
            continue;
        }
        match direction {
            Direction::Upstream => upstream.push(item.to_string()),
            Direction::Downstream => downstream.push(item.to_string()),
            Direction::Unset => {}
        }
    }

    (upstream.join(", "), downstream.join(", "))
}
371
/// Flattens `text` into one line and truncates it.
///
/// Blank lines are dropped and the remaining lines are trimmed and joined with
/// single spaces. Despite the parameter name, the limit is measured in **bytes**.
/// When the flattened text is longer than `max_chars` bytes, it is cut at the
/// last UTF-8 character boundary at or before `max_chars` and `...` is appended.
fn summarize_section(text: &str, max_chars: usize) -> String {
    let mut flattened = String::with_capacity(text.len());
    for piece in text.lines().map(str::trim).filter(|l| !l.is_empty()) {
        if !flattened.is_empty() {
            flattened.push(' ');
        }
        flattened.push_str(piece);
    }

    if flattened.len() <= max_chars {
        return flattened;
    }

    // Byte 0 is always a char boundary, so this search always succeeds.
    let cut = (0..=max_chars)
        .rev()
        .find(|&idx| flattened.is_char_boundary(idx))
        .unwrap_or(0);
    flattened.truncate(cut);
    flattened.push_str("...");
    flattened
}
386
387fn needs_llm_fallback(record: &PipelineRecord) -> Vec<&'static str> {
388    let mut missing = Vec::new();
389    if record.source_systems.is_empty() {
390        missing.push("source_systems");
391    }
392    if record.dest_tables.is_empty() {
393        missing.push("dest_tables");
394    }
395    if record.scheduler_type.is_empty() || record.scheduler_type == "unknown" {
396        missing.push("scheduler_type");
397    }
398    if record.github_repo.is_empty() {
399        missing.push("github_repo");
400    }
401    if record.daci.is_empty() {
402        missing.push("daci");
403    }
404    if record.business_logic_summary.is_empty() {
405        missing.push("business_logic_summary");
406    }
407    missing
408}
409
/// Builds the LLM prompt that asks for the given missing fields as a JSON object.
///
/// Only known field names (those produced by `needs_llm_fallback`) get a description
/// line; unknown names are ignored. At most the first 6000 bytes of `content` are
/// embedded. The cut is moved back to the nearest UTF-8 character boundary, because
/// slicing at a raw byte offset panics inside a multi-byte character such as an em
/// dash.
fn build_extraction_prompt(content: &str, title: &str, missing_fields: &[&str]) -> String {
    const MAX_CONTENT_BYTES: usize = 6000;

    let fields_desc: Vec<&str> = missing_fields
        .iter()
        .filter_map(|f| match *f {
            "source_systems" => Some("source_systems: comma-separated list of source tables/systems (e.g. tax_dm.fact_tax_w2_metric, commerce_profile)"),
            "dest_tables" => Some("dest_tables: comma-separated list of destination/target tables (e.g. tax_rpt.rpt_marketing_attributes)"),
            "scheduler_type" => Some("scheduler_type: one of BPP, Airflow, Cron, QuickETL, or unknown"),
            "github_repo" => Some("github_repo: GitHub repository URL or name"),
            "daci" => Some("daci: Driver, Approver, Contributors, Informed roles and names"),
            "business_logic_summary" => Some("business_logic_summary: 1-2 sentence summary of the pipeline's business logic"),
            _ => None,
        })
        .collect();

    // Byte 0 is always a boundary, so this loop terminates.
    let mut cut = content.len().min(MAX_CONTENT_BYTES);
    while !content.is_char_boundary(cut) {
        cut -= 1;
    }

    format!(
        "Extract the following fields from this pipeline design document. Return ONLY a JSON object with the requested fields. No explanation.\n\nFields needed:\n{}\n\nDocument title: {}\n\nDocument content (truncated):\n{}",
        fields_desc.join("\n"),
        title,
        &content[..cut]
    )
}
428
429fn call_claude_extract(prompt: &str) -> Option<Value> {
430    let api_key = std::env::var("ANTHROPIC_API_KEY").ok()?;
431    let model = std::env::var("INFIGRAPH_LLM_MODEL")
432        .unwrap_or_else(|_| "claude-sonnet-4-20250514".to_string());
433    let base_url = std::env::var("INFIGRAPH_LLM_BASE_URL")
434        .unwrap_or_else(|_| "https://api.anthropic.com".to_string());
435
436    let body = serde_json::json!({
437        "model": model,
438        "max_tokens": 1024,
439        "messages": [{"role": "user", "content": prompt}],
440    });
441
442    let resp = ureq::post(&format!("{}/v1/messages", base_url))
443        .set("x-api-key", &api_key)
444        .set("anthropic-version", "2023-06-01")
445        .set("content-type", "application/json")
446        .send_string(&body.to_string())
447        .ok()?;
448
449    let resp_body: Value = resp.into_json().ok()?;
450    let text = resp_body["content"]
451        .as_array()
452        .and_then(|arr| arr.first())
453        .and_then(|block| block["text"].as_str())?;
454
455    let json_str = if let Some(start) = text.find('{') {
456        if let Some(end) = text.rfind('}') {
457            &text[start..=end]
458        } else {
459            text
460        }
461    } else {
462        text
463    };
464
465    serde_json::from_str(json_str).ok()
466}
467
468pub fn fill_with_llm(record: &mut PipelineRecord, content: &str, title: &str) -> usize {
469    if std::env::var("INFIGRAPH_LLM_EXTRACT").is_err() {
470        return 0;
471    }
472
473    let missing = needs_llm_fallback(record);
474    if missing.is_empty() {
475        return 0;
476    }
477
478    let prompt = build_extraction_prompt(content, title, &missing);
479    let Some(json) = call_claude_extract(&prompt) else {
480        eprintln!(
481            "LLM extraction failed for pipeline '{}' (missing: {})",
482            title,
483            missing.join(", ")
484        );
485        return 0;
486    };
487
488    let mut filled = 0;
489    for field in &missing {
490        if let Some(val) = json.get(field).and_then(|v| v.as_str()) {
491            if val.is_empty() {
492                continue;
493            }
494            match *field {
495                "source_systems" => {
496                    record.source_systems = val.to_string();
497                    filled += 1;
498                }
499                "dest_tables" => {
500                    record.dest_tables = val.to_string();
501                    filled += 1;
502                }
503                "scheduler_type" => {
504                    record.scheduler_type = val.to_string();
505                    filled += 1;
506                }
507                "github_repo" => {
508                    record.github_repo = val.to_string();
509                    filled += 1;
510                }
511                "daci" => {
512                    record.daci = val.to_string();
513                    filled += 1;
514                }
515                "business_logic_summary" => {
516                    record.business_logic_summary = val.to_string();
517                    filled += 1;
518                }
519                _ => {}
520            }
521        }
522    }
523    filled
524}
525
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;

    /// Serializes tests that mutate process-wide environment variables.
    ///
    /// `cargo test` runs tests on parallel threads, and `INFIGRAPH_LLM_EXTRACT` is
    /// shared by the whole process. Without this lock,
    /// `test_fill_with_llm_no_op_when_all_fields_present` can set the variable while
    /// `test_fill_with_llm_gated_by_env_var` expects it unset. That makes the gated
    /// test flaky, and with `ANTHROPIC_API_KEY` set it can make a real API call.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Takes `ENV_LOCK`, recovering from poisoning (the guarded data is `()`), so
    /// one failing env test does not cascade into the others.
    fn lock_env() -> std::sync::MutexGuard<'static, ()> {
        ENV_LOCK.lock().unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    fn minimal_pipeline_content() -> &'static str {
        r#"# Pipeline: Mystery ETL

## Overview
This pipeline does something.

## Source System Details
Data comes from various internal systems via shared drives.

## Destination Tables
Output goes to the data lake.

## Scheduler
Runs nightly.

## DACI
Team owns it.

## Business Logic
Complex transformation logic.

## Dependencies
### Upstream
- system_a
### Downstream
- system_b
"#
    }

    fn full_pipeline_content() -> &'static str {
        r#"# Pipeline: W2 Metrics

## Overview
W2 metrics pipeline.

## Source System Details
Source table: `tax_src.raw_w2_data`
Schema: tax_dm.fact_w2_metric

## Destination Tables
### tax_rpt.rpt_w2_summary
Destination for W2 summary data.

## Compliance
PII — SSN masked. Data classification: Restricted.

## Scheduler
BPP job runs daily at 2am UTC. Job name: `w2_metrics_daily`.

## DACI
**Driver:** Alice
**Approver:** Bob
**Contributor:** Charlie, Dave
**Informed:** Eve

## Business Logic
Aggregates W2 forms by employer EIN, computes YTD totals, applies withholding rules per IRS pub 15.

## Github Repo
https://github.intuit.com/tax-data/w2-metrics-pipeline

## Dependencies
### Upstream
| Dependency | Owner | SLA |
|---|---|---|
| tax_src.raw_w2_data | Tax Ingestion | 1am UTC |
| ref_data.employer_dim | MDM | hourly |
### Downstream
| Dependency | Owner | SLA |
|---|---|---|
| tax_rpt.executive_dashboard | BI Team | 6am UTC |
"#
    }

    #[test]
    fn test_parse_full_pipeline_all_fields_extracted() {
        let content = full_pipeline_content();
        let record = parse_pipeline_template(content, "W2 Metrics", "doc::w2").unwrap();

        assert!(record.source_systems.contains("tax_src.raw_w2_data"));
        assert!(record.source_systems.contains("tax_dm.fact_w2_metric"));
        assert!(record.dest_tables.contains("tax_rpt.rpt_w2_summary"));
        assert_eq!(record.scheduler_type, "BPP");
        assert!(record.scheduler_config.contains("daily"));
        assert!(record.github_repo.contains("github.intuit.com"));
        assert!(record.daci.contains("Alice"));
        assert!(record.daci.contains("Bob"));
        assert!(!record.business_logic_summary.is_empty());
        assert!(!record.compliance.is_empty());

        let missing = needs_llm_fallback(&record);
        assert!(
            missing.is_empty(),
            "Full pipeline should have no missing fields, got: {:?}",
            missing
        );
    }

    #[test]
    fn test_parse_minimal_pipeline_identifies_missing_fields() {
        let content = minimal_pipeline_content();
        let record = parse_pipeline_template(content, "Mystery ETL", "doc::mystery").unwrap();

        assert_eq!(record.name, "Mystery ETL");
        assert_eq!(record.doc_id, "doc::mystery");

        assert!(
            !record.source_systems.is_empty(),
            "source_systems gets prose summary fallback"
        );
        assert!(
            !record.dest_tables.is_empty(),
            "dest_tables gets prose summary fallback"
        );
        assert!(!record.daci.is_empty(), "daci gets prose summary fallback");

        let missing = needs_llm_fallback(&record);
        assert!(
            missing.contains(&"scheduler_type"),
            "scheduler_type should be 'unknown' → needs LLM"
        );
        assert!(
            missing.contains(&"github_repo"),
            "github_repo should be empty — no section matched"
        );
    }

    #[test]
    fn test_truly_empty_fields_trigger_llm_fallback() {
        let record = PipelineRecord {
            id: "pipeline::test".to_string(),
            name: "Test".to_string(),
            doc_id: "doc::test".to_string(),
            source_systems: String::new(),
            dest_tables: String::new(),
            scheduler_type: "unknown".to_string(),
            github_repo: String::new(),
            daci: String::new(),
            business_logic_summary: String::new(),
            ..Default::default()
        };

        let missing = needs_llm_fallback(&record);
        assert_eq!(
            missing.len(),
            6,
            "All 6 fields should be flagged: {:?}",
            missing
        );
    }

    #[test]
    fn test_fill_with_llm_gated_by_env_var() {
        let _env = lock_env();
        std::env::remove_var("INFIGRAPH_LLM_EXTRACT");

        let content = minimal_pipeline_content();
        let mut record = parse_pipeline_template(content, "Mystery ETL", "doc::mystery").unwrap();

        let missing_before = needs_llm_fallback(&record);
        assert!(!missing_before.is_empty(), "Should have missing fields");

        let filled = fill_with_llm(&mut record, content, "Mystery ETL");
        assert_eq!(
            filled, 0,
            "Should return 0 when INFIGRAPH_LLM_EXTRACT not set"
        );

        let missing_after = needs_llm_fallback(&record);
        assert_eq!(
            missing_before, missing_after,
            "Fields should be unchanged when env var not set"
        );
    }

    #[test]
    fn test_fill_with_llm_no_op_when_all_fields_present() {
        let _env = lock_env();
        std::env::set_var("INFIGRAPH_LLM_EXTRACT", "1");

        let content = full_pipeline_content();
        let mut record = parse_pipeline_template(content, "W2 Metrics", "doc::w2").unwrap();

        let filled = fill_with_llm(&mut record, content, "W2 Metrics");
        assert_eq!(filled, 0, "Should return 0 when no fields are missing");

        std::env::remove_var("INFIGRAPH_LLM_EXTRACT");
    }

    #[test]
    fn test_dependency_table_extraction() {
        let dep_text = r#"### Upstream
| Dependency | Owner | SLA |
|---|---|---|
| tax_src.raw_w2_data | Tax Ingestion | 1am UTC |
| ref_data.employer_dim | MDM | hourly |
### Downstream
| Dependency | Owner | SLA |
|---|---|---|
| tax_rpt.executive_dashboard | BI Team | 6am UTC |
"#;
        let (up, down) = extract_dependencies(dep_text);
        assert!(
            up.contains("tax_src.raw_w2_data"),
            "upstream should contain tax_src.raw_w2_data, got: {}",
            up
        );
        assert!(
            up.contains("ref_data.employer_dim"),
            "upstream should contain ref_data.employer_dim, got: {}",
            up
        );
        assert!(
            down.contains("tax_rpt.executive_dashboard"),
            "downstream should contain tax_rpt.executive_dashboard, got: {}",
            down
        );
    }

    #[test]
    fn test_dependency_bullet_extraction() {
        let dep_text = r#"### Upstream
- system_alpha
- system_beta
### Downstream
- consumer_one
"#;
        let (up, down) = extract_dependencies(dep_text);
        assert!(up.contains("system_alpha"));
        assert!(up.contains("system_beta"));
        assert!(down.contains("consumer_one"));
    }

    #[test]
    fn test_build_extraction_prompt_includes_only_missing() {
        let missing = vec!["source_systems", "github_repo"];
        let prompt = build_extraction_prompt("doc content here", "Test Pipeline", &missing);
        assert!(prompt.contains("source_systems"));
        assert!(prompt.contains("github_repo"));
        assert!(
            !prompt.contains("scheduler_type"),
            "Should not include non-missing fields"
        );
        assert!(
            !prompt.contains("daci"),
            "Should not include non-missing fields"
        );
    }

    #[test]
    fn test_no_sections_returns_none() {
        let content = "Just some plain text with no headings at all.";
        let result = parse_pipeline_template(content, "Empty", "doc::empty");
        assert!(
            result.is_none(),
            "Should return None when no sections found"
        );
    }

    #[test]
    fn test_extract_sources_basic() {
        let text = "Source table: `analytics_src.user_events`\nSchema: billing_dm.invoice_lines";
        let result = extract_sources(text);
        assert!(
            result.contains("analytics_src.user_events"),
            "got: {}",
            result
        );
        assert!(
            result.contains("billing_dm.invoice_lines"),
            "got: {}",
            result
        );
    }

    #[test]
    fn test_extract_tables_basic() {
        let text =
            "### reporting_rpt.daily_summary\nSummary table.\n\nAlso uses `warehouse.dim_date`";
        let result = extract_tables(text);
        assert!(
            result.contains("reporting_rpt.daily_summary"),
            "got: {}",
            result
        );
        assert!(result.contains("warehouse.dim_date"), "got: {}", result);
    }

    #[test]
    fn test_extract_scheduler_airflow() {
        let text = "Scheduled via Airflow DAG `etl_daily_dag`, runs at 3am UTC.";
        let (stype, config) = extract_scheduler(text);
        assert_eq!(stype, "Airflow");
        assert!(config.contains("3am UTC"), "got: {}", config);
    }

    #[test]
    fn test_extract_scheduler_none() {
        let text = "Runs on an internal platform with no specific tooling mentioned.";
        let (stype, _config) = extract_scheduler(text);
        assert_eq!(stype, "unknown");
    }

    #[test]
    fn test_extract_daci_basic() {
        let text = "**Driver:** Jane\n**Approver:** John\n**Contributor:** Sarah, Mike\n**Informed:** Ops Team";
        let result = extract_daci(text);
        assert!(result.contains("Driver: Jane"), "got: {}", result);
        assert!(result.contains("Approver: John"), "got: {}", result);
        assert!(
            result.contains("Contributor: Sarah, Mike"),
            "got: {}",
            result
        );
        assert!(result.contains("Informed: Ops Team"), "got: {}", result);
    }

    #[test]
    fn test_extract_github_repo() {
        let text = "Code lives at https://github.com/org/data-pipeline and is reviewed weekly.";
        let result = extract_github_repo(text);
        assert_eq!(result, "https://github.com/org/data-pipeline");
    }

    #[test]
    fn test_needs_llm_fallback_complete() {
        let record = PipelineRecord {
            id: "pipeline::full".to_string(),
            name: "Full".to_string(),
            doc_id: "doc::full".to_string(),
            source_systems: "src.table_a".to_string(),
            dest_tables: "dst.table_b".to_string(),
            scheduler_type: "Airflow".to_string(),
            scheduler_config: "daily".to_string(),
            github_repo: "https://github.com/org/repo".to_string(),
            daci: "Driver: X".to_string(),
            business_logic_summary: "Aggregates data.".to_string(),
            ..Default::default()
        };
        let missing = needs_llm_fallback(&record);
        assert!(
            missing.is_empty(),
            "expected no missing, got: {:?}",
            missing
        );
    }

    #[test]
    fn test_needs_llm_fallback_missing_fields() {
        let record = PipelineRecord {
            id: "pipeline::empty".to_string(),
            name: "Empty".to_string(),
            doc_id: "doc::empty".to_string(),
            ..Default::default()
        };
        let missing = needs_llm_fallback(&record);
        assert!(missing.contains(&"source_systems"));
        assert!(missing.contains(&"dest_tables"));
        assert!(missing.contains(&"github_repo"));
        assert!(missing.contains(&"daci"));
        assert!(missing.contains(&"business_logic_summary"));
    }

    #[test]
    fn test_parse_pipeline_template_empty_html() {
        let content = "# Empty Pipeline\n\n## Overview\nNothing here.\n";
        let result = parse_pipeline_template(content, "Empty", "doc::e");
        match result {
            Some(r) => {
                assert_eq!(r.name, "Empty");
                assert!(r.source_systems.is_empty());
                assert!(r.dest_tables.is_empty());
                assert!(r.github_repo.is_empty());
            }
            None => panic!("Expected Some with Overview section parsed"),
        }
    }

    // NOTE(review): this exercises `crate::sync::CrawlOptions`, not anything in this
    // module. Consider moving it next to the `sync` module's own tests.
    #[test]
    fn test_crawl_options_custom_values() {
        use crate::sync::CrawlOptions;

        let opts = CrawlOptions {
            follow_links: true,
            follow_depth: 3,
            max_pages: 50,
            same_space_only: false,
        };
        assert_eq!(opts.follow_depth, 3);
        assert_eq!(opts.max_pages, 50);
        assert!(!opts.same_space_only);
        assert!(opts.follow_links);

        let default = CrawlOptions::default_follow();
        assert_eq!(default.follow_depth, 1);
        assert_eq!(default.max_pages, 100);

        let no = CrawlOptions::no_follow();
        assert!(!no.follow_links);
        assert_eq!(no.follow_depth, 0);
    }
}