1use regex::Regex;
2use serde_json::Value;
3
/// Structured summary of one pipeline design document, produced by
/// `parse_pipeline_template` and optionally completed by `fill_with_llm`.
///
/// Every field is free text; an empty string means the value was not found
/// (this is what `needs_llm_fallback` checks for).
#[derive(Debug, Clone, Default)]
pub struct PipelineRecord {
    /// Record id, built by the parser as `pipeline::<doc_id>`.
    pub id: String,
    /// Pipeline name (the document title handed to the parser).
    pub name: String,
    /// Id of the source document.
    pub doc_id: String,
    /// Comma-separated source tables, or a prose summary when no table name was found.
    pub source_systems: String,
    /// Comma-separated destination tables, or a prose summary when no table name was found.
    pub dest_tables: String,
    /// Scheduler detected by the parser (`BPP`, `Airflow`, `Cron`, `QuickETL` or `unknown`);
    /// an LLM fill may store other text.
    pub scheduler_type: String,
    /// Prose summary of the scheduler section.
    pub scheduler_config: String,
    /// Prose summary of the compliance / security section.
    pub compliance: String,
    /// GitHub URL or repository name.
    pub github_repo: String,
    /// `Role: names` pairs joined with `; `, or a prose summary of the DACI section.
    pub daci: String,
    /// Prose summary of the idempotency / error-handling section (text, not a boolean flag).
    pub idempotent: String,
    /// Prose summary of the business-logic section.
    pub business_logic_summary: String,
    /// Prose summary of the data-quality section.
    pub data_quality: String,
    /// Comma-separated upstream dependencies.
    pub dependencies_upstream: String,
    /// Comma-separated downstream dependencies.
    pub dependencies_downstream: String,
}
22
/// Kinds of design-document section recognised from heading text by `SectionMatcher`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Section {
    /// "Overview" / "Architecture" headings; recognised but not stored in any record field.
    Overview,
    /// Source-system headings; feeds `PipelineRecord::source_systems`.
    Source,
    /// Destination / target-table headings; feeds `PipelineRecord::dest_tables`.
    Destination,
    /// Compliance, data-classification and security headings; feeds `PipelineRecord::compliance`.
    Compliance,
    /// Scheduler headings; feeds `scheduler_type` and `scheduler_config`.
    Scheduler,
    /// DACI (roles) headings; feeds `PipelineRecord::daci`.
    Daci,
    /// Data-quality headings; feeds `PipelineRecord::data_quality`.
    DataQuality,
    /// Business-logic / design-description headings; feeds `business_logic_summary`.
    BusinessLogic,
    /// Idempotency / error-handling headings; feeds `PipelineRecord::idempotent`.
    Idempotency,
    /// GitHub repo headings; feeds `PipelineRecord::github_repo`.
    GithubRepo,
    /// Dependency headings; feeds the upstream/downstream dependency fields.
    Dependencies,
}
37
38struct SectionMatcher {
39 keywords: Vec<(&'static str, Section)>,
40 aliases: Vec<(&'static str, Section)>,
41}
42
43impl SectionMatcher {
44 fn new() -> Self {
45 Self {
46 keywords: vec![
47 ("overview", Section::Overview),
48 ("source system", Section::Source),
49 ("source systems", Section::Source),
50 ("destination", Section::Destination),
51 ("dest table", Section::Destination),
52 ("destination table", Section::Destination),
53 ("compliance", Section::Compliance),
54 ("data classification", Section::Compliance),
55 ("scheduler", Section::Scheduler),
56 ("daci", Section::Daci),
57 ("data quality", Section::DataQuality),
58 ("business logic", Section::BusinessLogic),
59 ("idempotency", Section::Idempotency),
60 ("idempotent", Section::Idempotency),
61 ("github repo", Section::GithubRepo),
62 ("github", Section::GithubRepo),
63 ("dependenc", Section::Dependencies),
64 ],
65 aliases: vec![
66 ("bpp job", Section::Scheduler),
67 ("design description", Section::BusinessLogic),
68 ("error handling", Section::Idempotency),
69 ("security", Section::Compliance),
70 ("s3 partitioning", Section::Destination),
71 ("target layer", Section::Destination),
72 ("architecture", Section::Overview),
73 ],
74 }
75 }
76
77 fn classify(&self, heading: &str) -> Option<Section> {
78 let lower = heading.to_lowercase();
79
80 for &(alias, section) in &self.aliases {
81 if lower.contains(alias) {
82 return Some(section);
83 }
84 }
85
86 for &(keyword, section) in &self.keywords {
87 if lower.contains(keyword) {
88 return Some(section);
89 }
90 }
91
92 None
93 }
94}
95
/// One recognised document section: its kind, original heading and trimmed body text.
#[derive(Debug)]
struct SectionContent {
    /// Kind returned by `SectionMatcher::classify` for the heading.
    section: Section,
    /// Original heading text; kept for `Debug` output only (the parser never reads it).
    _heading: String,
    /// Body text from below the heading up to the next heading of the same or shallower
    /// level (fewer or equal `#`), trimmed.
    text: String,
}
102
103pub fn parse_pipeline_template(content: &str, title: &str, doc_id: &str) -> Option<PipelineRecord> {
104 let heading_re = Regex::new(r"(?m)^(#{1,6})\s+(.+)$").unwrap();
105 let matcher = SectionMatcher::new();
106
107 let mut sections: Vec<SectionContent> = Vec::new();
108
109 let heading_positions: Vec<(usize, usize, String)> = heading_re
110 .captures_iter(content)
111 .map(|cap| {
112 let m = cap.get(0).unwrap();
113 let level = cap[1].len();
114 let text = cap[2].trim().to_string();
115 (m.start(), level, text)
116 })
117 .collect();
118
119 for i in 0..heading_positions.len() {
120 let (start, level, ref heading) = heading_positions[i];
121
122 if let Some(section) = matcher.classify(heading) {
123 let end = heading_positions[i + 1..]
124 .iter()
125 .find(|(_, l, _)| *l <= level)
126 .map(|(s, _, _)| *s)
127 .unwrap_or(content.len());
128
129 let body_start = content[start..]
130 .find('\n')
131 .map(|p| start + p + 1)
132 .unwrap_or(end);
133 let body = content[body_start..end].trim().to_string();
134 sections.push(SectionContent {
135 section,
136 _heading: heading.clone(),
137 text: body,
138 });
139 }
140 }
141
142 if sections.is_empty() {
143 return None;
144 }
145
146 let mut record = PipelineRecord {
147 id: format!("pipeline::{}", doc_id),
148 name: title.to_string(),
149 doc_id: doc_id.to_string(),
150 ..Default::default()
151 };
152
153 for sc in §ions {
154 match sc.section {
155 Section::Source => {
156 record.source_systems = extract_sources(&sc.text);
157 }
158 Section::Destination => {
159 record.dest_tables = extract_tables(&sc.text);
160 }
161 Section::Scheduler => {
162 let (stype, config) = extract_scheduler(&sc.text);
163 record.scheduler_type = stype;
164 record.scheduler_config = config;
165 }
166 Section::Compliance => {
167 record.compliance = summarize_section(&sc.text, 500);
168 }
169 Section::GithubRepo => {
170 record.github_repo = extract_github_repo(&sc.text);
171 }
172 Section::Daci => {
173 record.daci = extract_daci(&sc.text);
174 }
175 Section::Idempotency => {
176 record.idempotent = summarize_section(&sc.text, 300);
177 }
178 Section::BusinessLogic => {
179 record.business_logic_summary = summarize_section(&sc.text, 500);
180 }
181 Section::DataQuality => {
182 record.data_quality = summarize_section(&sc.text, 300);
183 }
184 Section::Dependencies => {
185 let (up, down) = extract_dependencies(&sc.text);
186 if !up.is_empty() {
187 record.dependencies_upstream = up;
188 }
189 if !down.is_empty() {
190 record.dependencies_downstream = down;
191 }
192 }
193 Section::Overview => {}
194 }
195 }
196
197 Some(record)
198}
199
200fn extract_sources(text: &str) -> String {
201 let mut sources = Vec::new();
202
203 let table_re =
204 Regex::new(r"(?i)(?:table|schema|database|source)[:\s]*[`]?(\w+\.\w+(?:\.\w+)?)[`]?")
205 .unwrap();
206 for cap in table_re.captures_iter(text) {
207 sources.push(cap[1].to_string());
208 }
209
210 let qualified_re = Regex::new(r"\b(\w+_(?:src|dm|rpt|raw|stg)\.\w+)\b").unwrap();
211 for cap in qualified_re.captures_iter(text) {
212 let t = cap[1].to_string();
213 if !sources.contains(&t) {
214 sources.push(t);
215 }
216 }
217
218 if sources.is_empty() {
219 summarize_section(text, 200)
220 } else {
221 sources.join(", ")
222 }
223}
224
225fn extract_tables(text: &str) -> String {
226 let mut tables = Vec::new();
227
228 let heading_table_re =
229 Regex::new(r"(?i)(?:^|\n)#{1,6}\s+(?:Table[:\s]*)?[`]?(\w+\.\w+(?:\.\w+)?)[`]?").unwrap();
230 for cap in heading_table_re.captures_iter(text) {
231 let t = cap[1].to_string();
232 if !tables.contains(&t) {
233 tables.push(t);
234 }
235 }
236
237 let qualified_re = Regex::new(r"\b(\w+_(?:dm|rpt|src|raw|stg|mart)\.\w+)\b").unwrap();
238 for cap in qualified_re.captures_iter(text) {
239 let t = cap[1].to_string();
240 if !tables.contains(&t) {
241 tables.push(t);
242 }
243 }
244
245 let backtick_re = Regex::new(r"`(\w+\.\w+(?:\.\w+)?)`").unwrap();
246 for cap in backtick_re.captures_iter(text) {
247 let t = cap[1].to_string();
248 if !tables.contains(&t) {
249 tables.push(t);
250 }
251 }
252
253 if tables.is_empty() {
254 summarize_section(text, 200)
255 } else {
256 tables.join(", ")
257 }
258}
259
260fn extract_scheduler(text: &str) -> (String, String) {
261 let lower = text.to_lowercase();
262 let stype = if lower.contains("bpp") || lower.contains("batch processing") {
263 "BPP".to_string()
264 } else if lower.contains("airflow") {
265 "Airflow".to_string()
266 } else if lower.contains("cron") {
267 "Cron".to_string()
268 } else if lower.contains("quicketl") || lower.contains("quick_etl") {
269 "QuickETL".to_string()
270 } else {
271 "unknown".to_string()
272 };
273
274 let config = summarize_section(text, 300);
275 (stype, config)
276}
277
278fn extract_github_repo(text: &str) -> String {
279 let url_re = Regex::new(r"https?://(?:github\.com|github\.intuit\.com)/[^\s\)]+").unwrap();
280 if let Some(m) = url_re.find(text) {
281 return m.as_str().to_string();
282 }
283 let repo_re = Regex::new(r"(?i)(?:repo|repository)[:\s]*[`]?([a-zA-Z0-9_/-]+)[`]?").unwrap();
284 if let Some(cap) = repo_re.captures(text) {
285 return cap[1].to_string();
286 }
287 text.lines()
288 .map(|l| l.trim())
289 .find(|l| !l.is_empty())
290 .unwrap_or("")
291 .to_string()
292}
293
294fn extract_daci(text: &str) -> String {
295 let mut parts = Vec::new();
296 let role_re = Regex::new(
297 r"(?im)^\s*\**\s*(Driver|Approver|Contributor|Informed|Accountable)[:\s*]*(.+)$",
298 )
299 .unwrap();
300 for cap in role_re.captures_iter(text) {
301 parts.push(format!("{}: {}", &cap[1], cap[2].trim()));
302 }
303 if parts.is_empty() {
304 summarize_section(text, 200)
305 } else {
306 parts.join("; ")
307 }
308}
309
/// Splits a dependencies section into `(upstream, downstream)` comma-separated lists.
///
/// A heading (`#…`) or bold (`**…`) line mentioning "upstream" or "downstream" selects the
/// list that later items go into. Items seen before any such line are ignored.
///
/// An item is either:
/// * the first cell of a markdown table row, or
/// * any other line with leading `-`, `*`, `•` and spaces removed.
///
/// Table separator rows (`|---|`, `|:---|`) and bare `---` rules are skipped. A table row is
/// treated as a header (and skipped) when either holds:
/// * it is directly followed by a separator row, or
/// * one of its cells is exactly "Dependency", "Dependencies", "Owner" or "SLA"
///   (case-insensitive).
///
/// Cells that merely *contain* those words (e.g. `translation_svc`) are kept.
fn extract_dependencies(text: &str) -> (String, String) {
    #[derive(Clone, Copy)]
    enum Side {
        Up,
        Down,
    }

    /// True for table separator rows (including `:` alignment markers) and bare rules.
    fn is_rule(line: &str) -> bool {
        line.starts_with("|--")
            || line.starts_with("| --")
            || line.chars().all(|c| matches!(c, '-' | '|' | ':' | ' '))
    }

    const HEADER_CELLS: [&str; 4] = ["dependency", "dependencies", "owner", "sla"];

    let lines: Vec<&str> = text.lines().map(str::trim).collect();
    let mut upstream = Vec::new();
    let mut downstream = Vec::new();
    let mut side: Option<Side> = None;

    for (idx, &line) in lines.iter().enumerate() {
        let lower = line.to_lowercase();
        let is_label = line.starts_with('#') || line.starts_with("**");
        if is_label && lower.contains("upstream") {
            side = Some(Side::Up);
            continue;
        }
        if is_label && lower.contains("downstream") {
            side = Some(Side::Down);
            continue;
        }
        if line.is_empty() || line.starts_with('#') || is_rule(line) {
            continue;
        }

        let item = if line.starts_with('|') {
            let cells: Vec<&str> = line
                .split('|')
                .map(str::trim)
                .filter(|c| !c.is_empty())
                .collect();
            let followed_by_separator = matches!(
                lines.get(idx + 1),
                Some(next) if next.starts_with('|') && is_rule(next)
            );
            let has_header_cell = cells
                .iter()
                .any(|cell| HEADER_CELLS.iter().any(|h| cell.eq_ignore_ascii_case(h)));
            if followed_by_separator || has_header_cell {
                continue;
            }
            match cells.first() {
                Some(first) => *first,
                None => continue,
            }
        } else {
            line.trim_start_matches(['-', '*', '•', ' '])
        };

        if item.is_empty() {
            continue;
        }
        match side {
            Some(Side::Up) => upstream.push(item.to_string()),
            Some(Side::Down) => downstream.push(item.to_string()),
            None => {}
        }
    }

    (upstream.join(", "), downstream.join(", "))
}
371
/// Collapses `text` into one line and caps it at `max_chars` characters.
///
/// Lines are trimmed, blank lines dropped, and the rest joined with single spaces. If the
/// result has more than `max_chars` characters, its first `max_chars` characters followed by
/// `"..."` are returned.
///
/// The limit counts Unicode scalar values, not bytes. The old byte count cut text containing
/// e.g. "—" or "é" short of the requested length.
fn summarize_section(text: &str, max_chars: usize) -> String {
    let cleaned = text
        .lines()
        .map(str::trim)
        .filter(|l| !l.is_empty())
        .collect::<Vec<_>>()
        .join(" ");

    // `nth(max_chars)` is the first character past the limit; its byte offset is always a
    // valid char boundary, so the slice cannot panic.
    match cleaned.char_indices().nth(max_chars) {
        Some((cut, _)) => format!("{}...", &cleaned[..cut]),
        None => cleaned,
    }
}
386
387fn needs_llm_fallback(record: &PipelineRecord) -> Vec<&'static str> {
388 let mut missing = Vec::new();
389 if record.source_systems.is_empty() {
390 missing.push("source_systems");
391 }
392 if record.dest_tables.is_empty() {
393 missing.push("dest_tables");
394 }
395 if record.scheduler_type.is_empty() || record.scheduler_type == "unknown" {
396 missing.push("scheduler_type");
397 }
398 if record.github_repo.is_empty() {
399 missing.push("github_repo");
400 }
401 if record.daci.is_empty() {
402 missing.push("daci");
403 }
404 if record.business_logic_summary.is_empty() {
405 missing.push("business_logic_summary");
406 }
407 missing
408}
409
/// Builds the prompt that asks the LLM to extract `missing_fields` from a document.
///
/// Each known field name contributes one description line; unknown names are ignored. At
/// most 6000 bytes of `content` are embedded. The cut is moved back to the nearest UTF-8
/// character boundary, because slicing at byte 6000 used to panic when that byte fell inside
/// a multi-byte character.
fn build_extraction_prompt(content: &str, title: &str, missing_fields: &[&str]) -> String {
    const MAX_CONTENT_BYTES: usize = 6000;

    let fields_desc: Vec<&str> = missing_fields
        .iter()
        .filter_map(|field| match *field {
            "source_systems" => Some("source_systems: comma-separated list of source tables/systems (e.g. tax_dm.fact_tax_w2_metric, commerce_profile)"),
            "dest_tables" => Some("dest_tables: comma-separated list of destination/target tables (e.g. tax_rpt.rpt_marketing_attributes)"),
            "scheduler_type" => Some("scheduler_type: one of BPP, Airflow, Cron, QuickETL, or unknown"),
            "github_repo" => Some("github_repo: GitHub repository URL or name"),
            "daci" => Some("daci: Driver, Approver, Contributors, Informed roles and names"),
            "business_logic_summary" => Some("business_logic_summary: 1-2 sentence summary of the pipeline's business logic"),
            _ => None,
        })
        .collect();

    // Index 0 is always a char boundary, so this loop terminates.
    let mut cut = content.len().min(MAX_CONTENT_BYTES);
    while !content.is_char_boundary(cut) {
        cut -= 1;
    }

    format!(
        "Extract the following fields from this pipeline design document. Return ONLY a JSON object with the requested fields. No explanation.\n\nFields needed:\n{}\n\nDocument title: {}\n\nDocument content (truncated):\n{}",
        fields_desc.join("\n"),
        title,
        &content[..cut]
    )
}
428
429fn call_claude_extract(prompt: &str) -> Option<Value> {
430 let api_key = std::env::var("ANTHROPIC_API_KEY").ok()?;
431 let model = std::env::var("INFIGRAPH_LLM_MODEL")
432 .unwrap_or_else(|_| "claude-sonnet-4-20250514".to_string());
433 let base_url = std::env::var("INFIGRAPH_LLM_BASE_URL")
434 .unwrap_or_else(|_| "https://api.anthropic.com".to_string());
435
436 let body = serde_json::json!({
437 "model": model,
438 "max_tokens": 1024,
439 "messages": [{"role": "user", "content": prompt}],
440 });
441
442 let resp = ureq::post(&format!("{}/v1/messages", base_url))
443 .set("x-api-key", &api_key)
444 .set("anthropic-version", "2023-06-01")
445 .set("content-type", "application/json")
446 .send_string(&body.to_string())
447 .ok()?;
448
449 let resp_body: Value = resp.into_json().ok()?;
450 let text = resp_body["content"]
451 .as_array()
452 .and_then(|arr| arr.first())
453 .and_then(|block| block["text"].as_str())?;
454
455 let json_str = if let Some(start) = text.find('{') {
456 if let Some(end) = text.rfind('}') {
457 &text[start..=end]
458 } else {
459 text
460 }
461 } else {
462 text
463 };
464
465 serde_json::from_str(json_str).ok()
466}
467
468pub fn fill_with_llm(record: &mut PipelineRecord, content: &str, title: &str) -> usize {
469 if std::env::var("INFIGRAPH_LLM_EXTRACT").is_err() {
470 return 0;
471 }
472
473 let missing = needs_llm_fallback(record);
474 if missing.is_empty() {
475 return 0;
476 }
477
478 let prompt = build_extraction_prompt(content, title, &missing);
479 let Some(json) = call_claude_extract(&prompt) else {
480 eprintln!(
481 "LLM extraction failed for pipeline '{}' (missing: {})",
482 title,
483 missing.join(", ")
484 );
485 return 0;
486 };
487
488 let mut filled = 0;
489 for field in &missing {
490 if let Some(val) = json.get(field).and_then(|v| v.as_str()) {
491 if val.is_empty() {
492 continue;
493 }
494 match *field {
495 "source_systems" => {
496 record.source_systems = val.to_string();
497 filled += 1;
498 }
499 "dest_tables" => {
500 record.dest_tables = val.to_string();
501 filled += 1;
502 }
503 "scheduler_type" => {
504 record.scheduler_type = val.to_string();
505 filled += 1;
506 }
507 "github_repo" => {
508 record.github_repo = val.to_string();
509 filled += 1;
510 }
511 "daci" => {
512 record.daci = val.to_string();
513 filled += 1;
514 }
515 "business_logic_summary" => {
516 record.business_logic_summary = val.to_string();
517 filled += 1;
518 }
519 _ => {}
520 }
521 }
522 }
523 filled
524}
525
#[cfg(test)]
mod tests {
    use super::*;

    /// Serialises tests that mutate `INFIGRAPH_LLM_EXTRACT`. Environment variables are
    /// process-global and the test harness runs tests on parallel threads. Without this lock,
    /// one test can flip the gate while the other is mid-assertion; with an API key present,
    /// that means a real network call and a flaky failure.
    static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Acquires `ENV_LOCK`, ignoring poisoning left behind by a previously failed test.
    fn lock_env() -> std::sync::MutexGuard<'static, ()> {
        ENV_LOCK
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// A document whose sections exist but mostly lack extractable structured values.
    fn minimal_pipeline_content() -> &'static str {
        r#"# Pipeline: Mystery ETL

## Overview
This pipeline does something.

## Source System Details
Data comes from various internal systems via shared drives.

## Destination Tables
Output goes to the data lake.

## Scheduler
Runs nightly.

## DACI
Team owns it.

## Business Logic
Complex transformation logic.

## Dependencies
### Upstream
- system_a
### Downstream
- system_b
"#
    }

    /// A document in which every field the parser knows about is present.
    fn full_pipeline_content() -> &'static str {
        r#"# Pipeline: W2 Metrics

## Overview
W2 metrics pipeline.

## Source System Details
Source table: `tax_src.raw_w2_data`
Schema: tax_dm.fact_w2_metric

## Destination Tables
### tax_rpt.rpt_w2_summary
Destination for W2 summary data.

## Compliance
PII — SSN masked. Data classification: Restricted.

## Scheduler
BPP job runs daily at 2am UTC. Job name: `w2_metrics_daily`.

## DACI
**Driver:** Alice
**Approver:** Bob
**Contributor:** Charlie, Dave
**Informed:** Eve

## Business Logic
Aggregates W2 forms by employer EIN, computes YTD totals, applies withholding rules per IRS pub 15.

## Github Repo
https://github.intuit.com/tax-data/w2-metrics-pipeline

## Dependencies
### Upstream
| Dependency | Owner | SLA |
|---|---|---|
| tax_src.raw_w2_data | Tax Ingestion | 1am UTC |
| ref_data.employer_dim | MDM | hourly |
### Downstream
| Dependency | Owner | SLA |
|---|---|---|
| tax_rpt.executive_dashboard | BI Team | 6am UTC |
"#
    }

    #[test]
    fn test_parse_full_pipeline_all_fields_extracted() {
        let content = full_pipeline_content();
        let record = parse_pipeline_template(content, "W2 Metrics", "doc::w2").unwrap();

        assert!(record.source_systems.contains("tax_src.raw_w2_data"));
        assert!(record.source_systems.contains("tax_dm.fact_w2_metric"));
        assert!(record.dest_tables.contains("tax_rpt.rpt_w2_summary"));
        assert_eq!(record.scheduler_type, "BPP");
        assert!(record.scheduler_config.contains("daily"));
        assert!(record.github_repo.contains("github.intuit.com"));
        assert!(record.daci.contains("Alice"));
        assert!(record.daci.contains("Bob"));
        assert!(!record.business_logic_summary.is_empty());
        assert!(!record.compliance.is_empty());

        let missing = needs_llm_fallback(&record);
        assert!(
            missing.is_empty(),
            "Full pipeline should have no missing fields, got: {:?}",
            missing
        );
    }

    #[test]
    fn test_parse_minimal_pipeline_identifies_missing_fields() {
        let content = minimal_pipeline_content();
        let record = parse_pipeline_template(content, "Mystery ETL", "doc::mystery").unwrap();

        assert_eq!(record.name, "Mystery ETL");
        assert_eq!(record.doc_id, "doc::mystery");

        assert!(
            !record.source_systems.is_empty(),
            "source_systems gets prose summary fallback"
        );
        assert!(
            !record.dest_tables.is_empty(),
            "dest_tables gets prose summary fallback"
        );
        assert!(!record.daci.is_empty(), "daci gets prose summary fallback");

        let missing = needs_llm_fallback(&record);
        assert!(
            missing.contains(&"scheduler_type"),
            "scheduler_type should be 'unknown' → needs LLM"
        );
        assert!(
            missing.contains(&"github_repo"),
            "github_repo should be empty — no section matched"
        );
    }

    #[test]
    fn test_truly_empty_fields_trigger_llm_fallback() {
        let record = PipelineRecord {
            id: "pipeline::test".to_string(),
            name: "Test".to_string(),
            doc_id: "doc::test".to_string(),
            source_systems: String::new(),
            dest_tables: String::new(),
            scheduler_type: "unknown".to_string(),
            github_repo: String::new(),
            daci: String::new(),
            business_logic_summary: String::new(),
            ..Default::default()
        };

        let missing = needs_llm_fallback(&record);
        assert_eq!(
            missing.len(),
            6,
            "All 6 fields should be flagged: {:?}",
            missing
        );
    }

    #[test]
    fn test_fill_with_llm_gated_by_env_var() {
        let _env = lock_env();
        std::env::remove_var("INFIGRAPH_LLM_EXTRACT");

        let content = minimal_pipeline_content();
        let mut record = parse_pipeline_template(content, "Mystery ETL", "doc::mystery").unwrap();

        let missing_before = needs_llm_fallback(&record);
        assert!(!missing_before.is_empty(), "Should have missing fields");

        let filled = fill_with_llm(&mut record, content, "Mystery ETL");
        assert_eq!(
            filled, 0,
            "Should return 0 when INFIGRAPH_LLM_EXTRACT not set"
        );

        let missing_after = needs_llm_fallback(&record);
        assert_eq!(
            missing_before, missing_after,
            "Fields should be unchanged when env var not set"
        );
    }

    #[test]
    fn test_fill_with_llm_no_op_when_all_fields_present() {
        let _env = lock_env();
        std::env::set_var("INFIGRAPH_LLM_EXTRACT", "1");

        let content = full_pipeline_content();
        let mut record = parse_pipeline_template(content, "W2 Metrics", "doc::w2").unwrap();

        let filled = fill_with_llm(&mut record, content, "W2 Metrics");
        assert_eq!(filled, 0, "Should return 0 when no fields are missing");

        std::env::remove_var("INFIGRAPH_LLM_EXTRACT");
    }

    #[test]
    fn test_dependency_table_extraction() {
        let dep_text = r#"### Upstream
| Dependency | Owner | SLA |
|---|---|---|
| tax_src.raw_w2_data | Tax Ingestion | 1am UTC |
| ref_data.employer_dim | MDM | hourly |
### Downstream
| Dependency | Owner | SLA |
|---|---|---|
| tax_rpt.executive_dashboard | BI Team | 6am UTC |
"#;
        let (up, down) = extract_dependencies(dep_text);
        assert!(
            up.contains("tax_src.raw_w2_data"),
            "upstream should contain tax_src.raw_w2_data, got: {}",
            up
        );
        assert!(
            up.contains("ref_data.employer_dim"),
            "upstream should contain ref_data.employer_dim, got: {}",
            up
        );
        assert!(
            down.contains("tax_rpt.executive_dashboard"),
            "downstream should contain tax_rpt.executive_dashboard, got: {}",
            down
        );
    }

    #[test]
    fn test_dependency_bullet_extraction() {
        let dep_text = r#"### Upstream
- system_alpha
- system_beta
### Downstream
- consumer_one
"#;
        let (up, down) = extract_dependencies(dep_text);
        assert!(up.contains("system_alpha"));
        assert!(up.contains("system_beta"));
        assert!(down.contains("consumer_one"));
    }

    #[test]
    fn test_build_extraction_prompt_includes_only_missing() {
        let missing = vec!["source_systems", "github_repo"];
        let prompt = build_extraction_prompt("doc content here", "Test Pipeline", &missing);
        assert!(prompt.contains("source_systems"));
        assert!(prompt.contains("github_repo"));
        assert!(
            !prompt.contains("scheduler_type"),
            "Should not include non-missing fields"
        );
        assert!(
            !prompt.contains("daci"),
            "Should not include non-missing fields"
        );
    }

    #[test]
    fn test_no_sections_returns_none() {
        let content = "Just some plain text with no headings at all.";
        let result = parse_pipeline_template(content, "Empty", "doc::empty");
        assert!(
            result.is_none(),
            "Should return None when no sections found"
        );
    }

    #[test]
    fn test_extract_sources_basic() {
        let text = "Source table: `analytics_src.user_events`\nSchema: billing_dm.invoice_lines";
        let result = extract_sources(text);
        assert!(
            result.contains("analytics_src.user_events"),
            "got: {}",
            result
        );
        assert!(
            result.contains("billing_dm.invoice_lines"),
            "got: {}",
            result
        );
    }

    #[test]
    fn test_extract_tables_basic() {
        let text =
            "### reporting_rpt.daily_summary\nSummary table.\n\nAlso uses `warehouse.dim_date`";
        let result = extract_tables(text);
        assert!(
            result.contains("reporting_rpt.daily_summary"),
            "got: {}",
            result
        );
        assert!(result.contains("warehouse.dim_date"), "got: {}", result);
    }

    #[test]
    fn test_extract_scheduler_airflow() {
        let text = "Scheduled via Airflow DAG `etl_daily_dag`, runs at 3am UTC.";
        let (stype, config) = extract_scheduler(text);
        assert_eq!(stype, "Airflow");
        assert!(config.contains("3am UTC"), "got: {}", config);
    }

    #[test]
    fn test_extract_scheduler_none() {
        let text = "Runs on an internal platform with no specific tooling mentioned.";
        let (stype, _config) = extract_scheduler(text);
        assert_eq!(stype, "unknown");
    }

    #[test]
    fn test_extract_daci_basic() {
        let text = "**Driver:** Jane\n**Approver:** John\n**Contributor:** Sarah, Mike\n**Informed:** Ops Team";
        let result = extract_daci(text);
        assert!(result.contains("Driver: Jane"), "got: {}", result);
        assert!(result.contains("Approver: John"), "got: {}", result);
        assert!(
            result.contains("Contributor: Sarah, Mike"),
            "got: {}",
            result
        );
        assert!(result.contains("Informed: Ops Team"), "got: {}", result);
    }

    #[test]
    fn test_extract_github_repo() {
        let text = "Code lives at https://github.com/org/data-pipeline and is reviewed weekly.";
        let result = extract_github_repo(text);
        assert_eq!(result, "https://github.com/org/data-pipeline");
    }

    #[test]
    fn test_needs_llm_fallback_complete() {
        let record = PipelineRecord {
            id: "pipeline::full".to_string(),
            name: "Full".to_string(),
            doc_id: "doc::full".to_string(),
            source_systems: "src.table_a".to_string(),
            dest_tables: "dst.table_b".to_string(),
            scheduler_type: "Airflow".to_string(),
            scheduler_config: "daily".to_string(),
            github_repo: "https://github.com/org/repo".to_string(),
            daci: "Driver: X".to_string(),
            business_logic_summary: "Aggregates data.".to_string(),
            ..Default::default()
        };
        let missing = needs_llm_fallback(&record);
        assert!(
            missing.is_empty(),
            "expected no missing, got: {:?}",
            missing
        );
    }

    #[test]
    fn test_needs_llm_fallback_missing_fields() {
        let record = PipelineRecord {
            id: "pipeline::empty".to_string(),
            name: "Empty".to_string(),
            doc_id: "doc::empty".to_string(),
            ..Default::default()
        };
        let missing = needs_llm_fallback(&record);
        assert!(missing.contains(&"source_systems"));
        assert!(missing.contains(&"dest_tables"));
        assert!(missing.contains(&"github_repo"));
        assert!(missing.contains(&"daci"));
        assert!(missing.contains(&"business_logic_summary"));
    }

    #[test]
    fn test_parse_pipeline_template_empty_html() {
        let content = "# Empty Pipeline\n\n## Overview\nNothing here.\n";
        let result = parse_pipeline_template(content, "Empty", "doc::e");
        match result {
            Some(r) => {
                assert_eq!(r.name, "Empty");
                assert!(r.source_systems.is_empty());
                assert!(r.dest_tables.is_empty());
                assert!(r.github_repo.is_empty());
            }
            None => panic!("Expected Some with Overview section parsed"),
        }
    }

    // NOTE(review): this exercises `crate::sync::CrawlOptions`, not the parser in this
    // module; it probably belongs with the `sync` module's tests.
    #[test]
    fn test_crawl_options_custom_values() {
        use crate::sync::CrawlOptions;

        let opts = CrawlOptions {
            follow_links: true,
            follow_depth: 3,
            max_pages: 50,
            same_space_only: false,
        };
        assert_eq!(opts.follow_depth, 3);
        assert_eq!(opts.max_pages, 50);
        assert!(!opts.same_space_only);
        assert!(opts.follow_links);

        let default = CrawlOptions::default_follow();
        assert_eq!(default.follow_depth, 1);
        assert_eq!(default.max_pages, 100);

        let no = CrawlOptions::no_follow();
        assert!(!no.follow_links);
        assert_eq!(no.follow_depth, 0);
    }
}