1use regex::Regex;
2use serde_json::Value;
3
/// Flattened metadata extracted from one pipeline design document.
///
/// Every field is a plain `String`. Fields start out empty (via `Default`) and stay
/// empty when the document has no section that feeds them.
#[derive(Clone, Debug, Default)]
pub struct PipelineRecord {
    /// Record identifier; `parse_pipeline_template` builds it as `pipeline::<doc_id>`.
    pub id: String,
    /// Human-readable pipeline name; `parse_pipeline_template` copies the document title here.
    pub name: String,
    /// Identifier of the source document the record was parsed from.
    pub doc_id: String,
    /// Source tables/systems found in the "source" section (comma-separated, or a prose summary).
    pub source_systems: String,
    /// Destination tables found in the "destination" section (comma-separated, or a prose summary).
    pub dest_tables: String,
    /// Scheduler family as reported by `extract_scheduler` (e.g. `BPP`, `Airflow`, `unknown`).
    pub scheduler_type: String,
    /// Prose summary of the scheduler section.
    pub scheduler_config: String,
    /// Prose summary of the compliance section.
    pub compliance: String,
    /// Repository URL or name taken from the GitHub section.
    pub github_repo: String,
    /// DACI roles extracted from the DACI section (or a prose summary of it).
    pub daci: String,
    /// Prose summary of the idempotency section.
    pub idempotent: String,
    /// Prose summary of the business-logic section.
    pub business_logic_summary: String,
    /// Prose summary of the data-quality section.
    pub data_quality: String,
    /// Comma-separated upstream dependencies.
    pub dependencies_upstream: String,
    /// Comma-separated downstream dependencies.
    pub dependencies_downstream: String,
}
22
/// Logical section of a pipeline design document that a heading can be classified as.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Section {
    /// General overview; recognised, but `parse_pipeline_template` extracts nothing from it.
    Overview,
    /// Source systems / tables.
    Source,
    /// Destination tables.
    Destination,
    /// Compliance, security and data-classification notes.
    Compliance,
    /// Scheduler / job configuration.
    Scheduler,
    /// DACI roles.
    Daci,
    /// Data-quality notes.
    DataQuality,
    /// Business-logic / design description.
    BusinessLogic,
    /// Idempotency and error-handling notes.
    Idempotency,
    /// Source repository location.
    GithubRepo,
    /// Upstream / downstream dependencies.
    Dependencies,
}
37
38struct SectionMatcher {
39 keywords: Vec<(&'static str, Section)>,
40 aliases: Vec<(&'static str, Section)>,
41}
42
43impl SectionMatcher {
44 fn new() -> Self {
45 Self {
46 keywords: vec![
47 ("overview", Section::Overview),
48 ("source system", Section::Source),
49 ("source systems", Section::Source),
50 ("destination", Section::Destination),
51 ("dest table", Section::Destination),
52 ("destination table", Section::Destination),
53 ("compliance", Section::Compliance),
54 ("data classification", Section::Compliance),
55 ("scheduler", Section::Scheduler),
56 ("daci", Section::Daci),
57 ("data quality", Section::DataQuality),
58 ("business logic", Section::BusinessLogic),
59 ("idempotency", Section::Idempotency),
60 ("idempotent", Section::Idempotency),
61 ("github repo", Section::GithubRepo),
62 ("github", Section::GithubRepo),
63 ("dependenc", Section::Dependencies),
64 ],
65 aliases: vec![
66 ("bpp job", Section::Scheduler),
67 ("design description", Section::BusinessLogic),
68 ("error handling", Section::Idempotency),
69 ("security", Section::Compliance),
70 ("s3 partitioning", Section::Destination),
71 ("target layer", Section::Destination),
72 ("architecture", Section::Overview),
73 ],
74 }
75 }
76
77 fn classify(&self, heading: &str) -> Option<Section> {
78 let lower = heading.to_lowercase();
79
80 for &(alias, section) in &self.aliases {
81 if lower.contains(alias) {
82 return Some(section);
83 }
84 }
85
86 for &(keyword, section) in &self.keywords {
87 if lower.contains(keyword) {
88 return Some(section);
89 }
90 }
91
92 None
93 }
94}
95
96#[derive(Debug)]
97struct SectionContent {
98 section: Section,
99 _heading: String,
100 text: String,
101}
102
103pub fn parse_pipeline_template(content: &str, title: &str, doc_id: &str) -> Option<PipelineRecord> {
104 let heading_re = Regex::new(r"(?m)^(#{1,6})\s+(.+)$").unwrap();
105 let matcher = SectionMatcher::new();
106
107 let mut sections: Vec<SectionContent> = Vec::new();
108
109 let heading_positions: Vec<(usize, usize, String)> = heading_re
110 .captures_iter(content)
111 .map(|cap| {
112 let m = cap.get(0).unwrap();
113 let level = cap[1].len();
114 let text = cap[2].trim().to_string();
115 (m.start(), level, text)
116 })
117 .collect();
118
119 for i in 0..heading_positions.len() {
120 let (start, level, ref heading) = heading_positions[i];
121
122 if let Some(section) = matcher.classify(heading) {
123 let end = heading_positions[i + 1..]
124 .iter()
125 .find(|(_, l, _)| *l <= level)
126 .map(|(s, _, _)| *s)
127 .unwrap_or(content.len());
128
129 let body_start = content[start..].find('\n').map(|p| start + p + 1).unwrap_or(end);
130 let body = content[body_start..end].trim().to_string();
131 sections.push(SectionContent {
132 section,
133 _heading: heading.clone(),
134 text: body,
135 });
136 }
137 }
138
139 if sections.is_empty() {
140 return None;
141 }
142
143 let mut record = PipelineRecord {
144 id: format!("pipeline::{}", doc_id),
145 name: title.to_string(),
146 doc_id: doc_id.to_string(),
147 ..Default::default()
148 };
149
150 for sc in §ions {
151 match sc.section {
152 Section::Source => {
153 record.source_systems = extract_sources(&sc.text);
154 }
155 Section::Destination => {
156 record.dest_tables = extract_tables(&sc.text);
157 }
158 Section::Scheduler => {
159 let (stype, config) = extract_scheduler(&sc.text);
160 record.scheduler_type = stype;
161 record.scheduler_config = config;
162 }
163 Section::Compliance => {
164 record.compliance = summarize_section(&sc.text, 500);
165 }
166 Section::GithubRepo => {
167 record.github_repo = extract_github_repo(&sc.text);
168 }
169 Section::Daci => {
170 record.daci = extract_daci(&sc.text);
171 }
172 Section::Idempotency => {
173 record.idempotent = summarize_section(&sc.text, 300);
174 }
175 Section::BusinessLogic => {
176 record.business_logic_summary = summarize_section(&sc.text, 500);
177 }
178 Section::DataQuality => {
179 record.data_quality = summarize_section(&sc.text, 300);
180 }
181 Section::Dependencies => {
182 let (up, down) = extract_dependencies(&sc.text);
183 if !up.is_empty() {
184 record.dependencies_upstream = up;
185 }
186 if !down.is_empty() {
187 record.dependencies_downstream = down;
188 }
189 }
190 Section::Overview => {}
191 }
192 }
193
194 Some(record)
195}
196
197fn extract_sources(text: &str) -> String {
198 let mut sources = Vec::new();
199
200 let table_re = Regex::new(r"(?i)(?:table|schema|database|source)[:\s]*[`]?(\w+\.\w+(?:\.\w+)?)[`]?").unwrap();
201 for cap in table_re.captures_iter(text) {
202 sources.push(cap[1].to_string());
203 }
204
205 let qualified_re = Regex::new(r"\b(\w+_(?:src|dm|rpt|raw|stg)\.\w+)\b").unwrap();
206 for cap in qualified_re.captures_iter(text) {
207 let t = cap[1].to_string();
208 if !sources.contains(&t) {
209 sources.push(t);
210 }
211 }
212
213 if sources.is_empty() {
214 summarize_section(text, 200)
215 } else {
216 sources.join(", ")
217 }
218}
219
220fn extract_tables(text: &str) -> String {
221 let mut tables = Vec::new();
222
223 let heading_table_re = Regex::new(r"(?i)(?:^|\n)#{1,6}\s+(?:Table[:\s]*)?[`]?(\w+\.\w+(?:\.\w+)?)[`]?").unwrap();
224 for cap in heading_table_re.captures_iter(text) {
225 let t = cap[1].to_string();
226 if !tables.contains(&t) {
227 tables.push(t);
228 }
229 }
230
231 let qualified_re = Regex::new(r"\b(\w+_(?:dm|rpt|src|raw|stg|mart)\.\w+)\b").unwrap();
232 for cap in qualified_re.captures_iter(text) {
233 let t = cap[1].to_string();
234 if !tables.contains(&t) {
235 tables.push(t);
236 }
237 }
238
239 let backtick_re = Regex::new(r"`(\w+\.\w+(?:\.\w+)?)`").unwrap();
240 for cap in backtick_re.captures_iter(text) {
241 let t = cap[1].to_string();
242 if !tables.contains(&t) {
243 tables.push(t);
244 }
245 }
246
247 if tables.is_empty() {
248 summarize_section(text, 200)
249 } else {
250 tables.join(", ")
251 }
252}
253
254fn extract_scheduler(text: &str) -> (String, String) {
255 let lower = text.to_lowercase();
256 let stype = if lower.contains("bpp") || lower.contains("batch processing") {
257 "BPP".to_string()
258 } else if lower.contains("airflow") {
259 "Airflow".to_string()
260 } else if lower.contains("cron") {
261 "Cron".to_string()
262 } else if lower.contains("quicketl") || lower.contains("quick_etl") {
263 "QuickETL".to_string()
264 } else {
265 "unknown".to_string()
266 };
267
268 let config = summarize_section(text, 300);
269 (stype, config)
270}
271
272fn extract_github_repo(text: &str) -> String {
273 let url_re = Regex::new(r"https?://(?:github\.com|github\.intuit\.com)/[^\s\)]+").unwrap();
274 if let Some(m) = url_re.find(text) {
275 return m.as_str().to_string();
276 }
277 let repo_re = Regex::new(r"(?i)(?:repo|repository)[:\s]*[`]?([a-zA-Z0-9_/-]+)[`]?").unwrap();
278 if let Some(cap) = repo_re.captures(text) {
279 return cap[1].to_string();
280 }
281 text.lines()
282 .map(|l| l.trim())
283 .find(|l| !l.is_empty())
284 .unwrap_or("")
285 .to_string()
286}
287
288fn extract_daci(text: &str) -> String {
289 let mut parts = Vec::new();
290 let role_re = Regex::new(r"(?im)^\s*\**\s*(Driver|Approver|Contributor|Informed|Accountable)[:\s*]*(.+)$").unwrap();
291 for cap in role_re.captures_iter(text) {
292 parts.push(format!("{}: {}", &cap[1], cap[2].trim()));
293 }
294 if parts.is_empty() {
295 summarize_section(text, 200)
296 } else {
297 parts.join("; ")
298 }
299}
300
/// Splits the body of a dependencies section into upstream and downstream items.
///
/// A heading (`#…`) or bold line (`**…`) containing "upstream" / "downstream" switches the
/// current direction. Under a direction, each bullet line contributes its text and each
/// Markdown table row contributes its first non-empty cell. Items seen before any direction
/// marker are ignored.
///
/// Table handling:
/// * separator rows (`|---|`, including alignment colons such as `|:--|`) are skipped;
/// * a row directly followed by a separator row is the header and is skipped;
/// * for tables without a separator, the first row is skipped when it mentions
///   "dependency", "owner" or "sla". Later rows are always data, so a dependency such as
///   `translation_service` is not dropped for containing "sla".
///
/// Returns `(upstream, downstream)`, each joined with `", "`.
fn extract_dependencies(text: &str) -> (String, String) {
    #[derive(Clone, Copy)]
    enum Direction {
        Upstream,
        Downstream,
    }

    /// True for Markdown table separator rows and other dash-only lines.
    fn is_table_separator(line: &str) -> bool {
        !line.is_empty()
            && (line.starts_with("|--")
                || line.starts_with("| --")
                || line.chars().all(|c| matches!(c, '-' | '|' | ':' | ' ')))
    }

    let lines: Vec<&str> = text.lines().map(str::trim).collect();
    let mut upstream: Vec<String> = Vec::new();
    let mut downstream: Vec<String> = Vec::new();
    let mut direction: Option<Direction> = None;
    let mut prev_was_table_line = false;

    for (idx, &line) in lines.iter().enumerate() {
        let continues_table = prev_was_table_line;
        prev_was_table_line = line.starts_with('|');

        let lower = line.to_lowercase();
        let is_marker = line.starts_with('#') || line.starts_with("**");
        if is_marker && lower.contains("upstream") {
            direction = Some(Direction::Upstream);
            continue;
        }
        if is_marker && lower.contains("downstream") {
            direction = Some(Direction::Downstream);
            continue;
        }
        if line.is_empty() || line.starts_with('#') || is_table_separator(line) {
            continue;
        }

        let item: Option<&str> = if line.starts_with('|') {
            let next_is_separator = lines
                .get(idx + 1)
                .map_or(false, |next| next.contains('-') && is_table_separator(next));
            // Keyword fallback only for the first row of a table that has no separator row.
            let keyword_header = !continues_table
                && ["dependency", "owner", "sla"]
                    .iter()
                    .any(|kw| lower.contains(*kw));
            if next_is_separator || keyword_header {
                continue;
            }
            line.split('|').map(str::trim).find(|cell| !cell.is_empty())
        } else {
            Some(line.trim_start_matches(['-', '*', '•', ' '])).filter(|rest| !rest.is_empty())
        };

        if let Some(item) = item {
            match direction {
                Some(Direction::Upstream) => upstream.push(item.to_string()),
                Some(Direction::Downstream) => downstream.push(item.to_string()),
                None => {}
            }
        }
    }

    (upstream.join(", "), downstream.join(", "))
}
356
/// Collapses `text` to a single line and truncates it to at most `max_chars` characters.
///
/// Lines are trimmed, blank lines are dropped and the rest are joined with single spaces.
/// When the result is longer than `max_chars` characters it is cut there and `...` is
/// appended.
///
/// The limit is counted in characters, not bytes, so non-ASCII text is neither cut early
/// nor split inside a code point.
fn summarize_section(text: &str, max_chars: usize) -> String {
    let cleaned = text
        .lines()
        .map(str::trim)
        .filter(|line| !line.is_empty())
        .collect::<Vec<_>>()
        .join(" ");

    // The byte offset of character number `max_chars` exists only when the text is too long.
    match cleaned.char_indices().nth(max_chars) {
        Some((cut, _)) => format!("{}...", &cleaned[..cut]),
        None => cleaned,
    }
}
371
372fn needs_llm_fallback(record: &PipelineRecord) -> Vec<&'static str> {
373 let mut missing = Vec::new();
374 if record.source_systems.is_empty() { missing.push("source_systems"); }
375 if record.dest_tables.is_empty() { missing.push("dest_tables"); }
376 if record.scheduler_type.is_empty() || record.scheduler_type == "unknown" { missing.push("scheduler_type"); }
377 if record.github_repo.is_empty() { missing.push("github_repo"); }
378 if record.daci.is_empty() { missing.push("daci"); }
379 if record.business_logic_summary.is_empty() { missing.push("business_logic_summary"); }
380 missing
381}
382
/// Builds the LLM prompt asking for the fields listed in `missing_fields`.
///
/// * `content` - full document text; only a leading excerpt of at most 6000 bytes is sent.
/// * `title` - document title, included verbatim.
/// * `missing_fields` - field names as produced by `needs_llm_fallback`; unknown names are
///   ignored.
///
/// The excerpt always ends on a character boundary, so documents containing multi-byte
/// characters cannot cause a slicing panic.
fn build_extraction_prompt(content: &str, title: &str, missing_fields: &[&str]) -> String {
    const MAX_CONTENT_BYTES: usize = 6000;

    let fields_desc: Vec<&str> = missing_fields
        .iter()
        .filter_map(|field| match *field {
            "source_systems" => Some("source_systems: comma-separated list of source tables/systems (e.g. tax_dm.fact_tax_w2_metric, commerce_profile)"),
            "dest_tables" => Some("dest_tables: comma-separated list of destination/target tables (e.g. tax_rpt.rpt_marketing_attributes)"),
            "scheduler_type" => Some("scheduler_type: one of BPP, Airflow, Cron, QuickETL, or unknown"),
            "github_repo" => Some("github_repo: GitHub repository URL or name"),
            "daci" => Some("daci: Driver, Approver, Contributors, Informed roles and names"),
            "business_logic_summary" => Some("business_logic_summary: 1-2 sentence summary of the pipeline's business logic"),
            _ => None,
        })
        .collect();

    // Back off to the nearest character boundary at or below the byte limit; offset 0 is
    // always a boundary, so the loop terminates.
    let mut cut = content.len().min(MAX_CONTENT_BYTES);
    while !content.is_char_boundary(cut) {
        cut -= 1;
    }

    format!(
        "Extract the following fields from this pipeline design document. Return ONLY a JSON object with the requested fields. No explanation.\n\nFields needed:\n{}\n\nDocument title: {}\n\nDocument content (truncated):\n{}",
        fields_desc.join("\n"),
        title,
        &content[..cut]
    )
}
401
402fn call_claude_extract(prompt: &str) -> Option<Value> {
403 let api_key = std::env::var("ANTHROPIC_API_KEY").ok()?;
404 let model = std::env::var("INFIGRAPH_LLM_MODEL")
405 .unwrap_or_else(|_| "claude-sonnet-4-20250514".to_string());
406 let base_url = std::env::var("INFIGRAPH_LLM_BASE_URL")
407 .unwrap_or_else(|_| "https://api.anthropic.com".to_string());
408
409 let body = serde_json::json!({
410 "model": model,
411 "max_tokens": 1024,
412 "messages": [{"role": "user", "content": prompt}],
413 });
414
415 let resp = ureq::post(&format!("{}/v1/messages", base_url))
416 .set("x-api-key", &api_key)
417 .set("anthropic-version", "2023-06-01")
418 .set("content-type", "application/json")
419 .send_string(&body.to_string())
420 .ok()?;
421
422 let resp_body: Value = resp.into_json().ok()?;
423 let text = resp_body["content"]
424 .as_array()
425 .and_then(|arr| arr.first())
426 .and_then(|block| block["text"].as_str())?;
427
428 let json_str = if let Some(start) = text.find('{') {
429 if let Some(end) = text.rfind('}') {
430 &text[start..=end]
431 } else {
432 text
433 }
434 } else {
435 text
436 };
437
438 serde_json::from_str(json_str).ok()
439}
440
441pub fn fill_with_llm(record: &mut PipelineRecord, content: &str, title: &str) -> usize {
442 if std::env::var("INFIGRAPH_LLM_EXTRACT").is_err() {
443 return 0;
444 }
445
446 let missing = needs_llm_fallback(record);
447 if missing.is_empty() {
448 return 0;
449 }
450
451 let prompt = build_extraction_prompt(content, title, &missing);
452 let Some(json) = call_claude_extract(&prompt) else {
453 eprintln!("LLM extraction failed for pipeline '{}' (missing: {})", title, missing.join(", "));
454 return 0;
455 };
456
457 let mut filled = 0;
458 for field in &missing {
459 if let Some(val) = json.get(field).and_then(|v| v.as_str()) {
460 if val.is_empty() { continue; }
461 match *field {
462 "source_systems" => { record.source_systems = val.to_string(); filled += 1; }
463 "dest_tables" => { record.dest_tables = val.to_string(); filled += 1; }
464 "scheduler_type" => { record.scheduler_type = val.to_string(); filled += 1; }
465 "github_repo" => { record.github_repo = val.to_string(); filled += 1; }
466 "daci" => { record.daci = val.to_string(); filled += 1; }
467 "business_logic_summary" => { record.business_logic_summary = val.to_string(); filled += 1; }
468 _ => {}
469 }
470 }
471 }
472 filled
473}
474
#[cfg(test)]
mod tests {
    use super::*;

    /// Serialises the tests that mutate `INFIGRAPH_LLM_EXTRACT`.
    ///
    /// Environment variables are process-wide and the test harness runs tests on several
    /// threads, so without this lock one test could flip the variable while another is
    /// between its `set_var`/`remove_var` call and its assertions.
    static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Takes the environment lock, ignoring poisoning left behind by a failed test.
    fn lock_env() -> std::sync::MutexGuard<'static, ()> {
        ENV_LOCK.lock().unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Document whose sections contain prose only, with no table names, URL or scheduler keyword.
    fn minimal_pipeline_content() -> &'static str {
        r#"# Pipeline: Mystery ETL

## Overview
This pipeline does something.

## Source System Details
Data comes from various internal systems via shared drives.

## Destination Tables
Output goes to the data lake.

## Scheduler
Runs nightly.

## DACI
Team owns it.

## Business Logic
Complex transformation logic.

## Dependencies
### Upstream
- system_a
### Downstream
- system_b
"#
    }

    /// Document in which every field can be extracted by the rule-based parser.
    fn full_pipeline_content() -> &'static str {
        r#"# Pipeline: W2 Metrics

## Overview
W2 metrics pipeline.

## Source System Details
Source table: `tax_src.raw_w2_data`
Schema: tax_dm.fact_w2_metric

## Destination Tables
### tax_rpt.rpt_w2_summary
Destination for W2 summary data.

## Compliance
PII — SSN masked. Data classification: Restricted.

## Scheduler
BPP job runs daily at 2am UTC. Job name: `w2_metrics_daily`.

## DACI
**Driver:** Alice
**Approver:** Bob
**Contributor:** Charlie, Dave
**Informed:** Eve

## Business Logic
Aggregates W2 forms by employer EIN, computes YTD totals, applies withholding rules per IRS pub 15.

## Github Repo
https://github.intuit.com/tax-data/w2-metrics-pipeline

## Dependencies
### Upstream
| Dependency | Owner | SLA |
|---|---|---|
| tax_src.raw_w2_data | Tax Ingestion | 1am UTC |
| ref_data.employer_dim | MDM | hourly |
### Downstream
| Dependency | Owner | SLA |
|---|---|---|
| tax_rpt.executive_dashboard | BI Team | 6am UTC |
"#
    }

    #[test]
    fn test_parse_full_pipeline_all_fields_extracted() {
        let content = full_pipeline_content();
        let record = parse_pipeline_template(content, "W2 Metrics", "doc::w2").unwrap();

        assert!(record.source_systems.contains("tax_src.raw_w2_data"));
        assert!(record.source_systems.contains("tax_dm.fact_w2_metric"));
        assert!(record.dest_tables.contains("tax_rpt.rpt_w2_summary"));
        assert_eq!(record.scheduler_type, "BPP");
        assert!(record.scheduler_config.contains("daily"));
        assert!(record.github_repo.contains("github.intuit.com"));
        assert!(record.daci.contains("Alice"));
        assert!(record.daci.contains("Bob"));
        assert!(!record.business_logic_summary.is_empty());
        assert!(!record.compliance.is_empty());

        let missing = needs_llm_fallback(&record);
        assert!(missing.is_empty(), "Full pipeline should have no missing fields, got: {:?}", missing);
    }

    #[test]
    fn test_parse_minimal_pipeline_identifies_missing_fields() {
        let content = minimal_pipeline_content();
        let record = parse_pipeline_template(content, "Mystery ETL", "doc::mystery").unwrap();

        assert_eq!(record.name, "Mystery ETL");
        assert_eq!(record.doc_id, "doc::mystery");

        assert!(!record.source_systems.is_empty(), "source_systems gets prose summary fallback");
        assert!(!record.dest_tables.is_empty(), "dest_tables gets prose summary fallback");
        assert!(!record.daci.is_empty(), "daci gets prose summary fallback");

        let missing = needs_llm_fallback(&record);
        assert!(missing.contains(&"scheduler_type"), "scheduler_type should be 'unknown' → needs LLM");
        assert!(missing.contains(&"github_repo"), "github_repo should be empty — no section matched");
    }

    #[test]
    fn test_truly_empty_fields_trigger_llm_fallback() {
        let record = PipelineRecord {
            id: "pipeline::test".to_string(),
            name: "Test".to_string(),
            doc_id: "doc::test".to_string(),
            source_systems: String::new(),
            dest_tables: String::new(),
            scheduler_type: "unknown".to_string(),
            github_repo: String::new(),
            daci: String::new(),
            business_logic_summary: String::new(),
            ..Default::default()
        };

        let missing = needs_llm_fallback(&record);
        assert_eq!(missing.len(), 6, "All 6 fields should be flagged: {:?}", missing);
    }

    #[test]
    fn test_fill_with_llm_gated_by_env_var() {
        // Held for the whole test so no other test can set the variable underneath us.
        let _env = lock_env();
        std::env::remove_var("INFIGRAPH_LLM_EXTRACT");

        let content = minimal_pipeline_content();
        let mut record = parse_pipeline_template(content, "Mystery ETL", "doc::mystery").unwrap();

        let missing_before = needs_llm_fallback(&record);
        assert!(!missing_before.is_empty(), "Should have missing fields");

        let filled = fill_with_llm(&mut record, content, "Mystery ETL");
        assert_eq!(filled, 0, "Should return 0 when INFIGRAPH_LLM_EXTRACT not set");

        let missing_after = needs_llm_fallback(&record);
        assert_eq!(missing_before, missing_after, "Fields should be unchanged when env var not set");
    }

    #[test]
    fn test_fill_with_llm_no_op_when_all_fields_present() {
        // Held for the whole test so the variable stays set until we have asserted.
        let _env = lock_env();
        std::env::set_var("INFIGRAPH_LLM_EXTRACT", "1");

        let content = full_pipeline_content();
        let mut record = parse_pipeline_template(content, "W2 Metrics", "doc::w2").unwrap();

        let filled = fill_with_llm(&mut record, content, "W2 Metrics");
        // Clean up before asserting so a failure does not leave the variable set.
        std::env::remove_var("INFIGRAPH_LLM_EXTRACT");

        assert_eq!(filled, 0, "Should return 0 when no fields are missing");
    }

    #[test]
    fn test_dependency_table_extraction() {
        let dep_text = r#"### Upstream
| Dependency | Owner | SLA |
|---|---|---|
| tax_src.raw_w2_data | Tax Ingestion | 1am UTC |
| ref_data.employer_dim | MDM | hourly |
### Downstream
| Dependency | Owner | SLA |
|---|---|---|
| tax_rpt.executive_dashboard | BI Team | 6am UTC |
"#;
        let (up, down) = extract_dependencies(dep_text);
        assert!(up.contains("tax_src.raw_w2_data"), "upstream should contain tax_src.raw_w2_data, got: {}", up);
        assert!(up.contains("ref_data.employer_dim"), "upstream should contain ref_data.employer_dim, got: {}", up);
        assert!(down.contains("tax_rpt.executive_dashboard"), "downstream should contain tax_rpt.executive_dashboard, got: {}", down);
    }

    #[test]
    fn test_dependency_bullet_extraction() {
        let dep_text = r#"### Upstream
- system_alpha
- system_beta
### Downstream
- consumer_one
"#;
        let (up, down) = extract_dependencies(dep_text);
        assert!(up.contains("system_alpha"));
        assert!(up.contains("system_beta"));
        assert!(down.contains("consumer_one"));
    }

    #[test]
    fn test_build_extraction_prompt_includes_only_missing() {
        let missing = vec!["source_systems", "github_repo"];
        let prompt = build_extraction_prompt("doc content here", "Test Pipeline", &missing);
        assert!(prompt.contains("source_systems"));
        assert!(prompt.contains("github_repo"));
        assert!(!prompt.contains("scheduler_type"), "Should not include non-missing fields");
        assert!(!prompt.contains("daci"), "Should not include non-missing fields");
    }

    #[test]
    fn test_no_sections_returns_none() {
        let content = "Just some plain text with no headings at all.";
        let result = parse_pipeline_template(content, "Empty", "doc::empty");
        assert!(result.is_none(), "Should return None when no sections found");
    }
}