1use crate::review::parse_diff;
8use crate::CodememEngine;
9use codemem_core::{CodememError, Edge, GraphNode, NodeKind, RelationshipType};
10use serde_json::json;
11use std::collections::{HashMap, HashSet};
12
/// Object id of git's empty tree. Used as the diff base for commits without a
/// parent (root commits), since they have no first parent to diff against.
const EMPTY_TREE_SHA: &str = "4b825dc642cb6eb9a060e54bf899d69f82d7a419";
15
/// Counters describing what a single `ingest_git_temporal` run produced.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct TemporalIngestResult {
    /// Non-bot commits turned into commit nodes.
    pub commits_processed: usize,
    /// Bot commits folded into a group's representative node (group size - 1 per group).
    pub commits_skipped: usize,
    /// Pull-request nodes created from PR numbers found in commit subjects.
    pub pr_nodes_created: usize,
    /// File-level `ModifiedBy` edges (symbol-level edges are not counted here).
    pub modified_by_edges: usize,
    /// Commit -> PR `PartOf` edges.
    pub part_of_edges: usize,
    /// Graph nodes (files and symbols) whose `valid_to` was set because their file was deleted.
    pub symbols_expired: usize,
}
26
27struct ParsedCommit {
29 hash: String,
30 short_hash: String,
31 parents: Vec<String>,
32 author: String,
33 date: chrono::DateTime<chrono::Utc>,
34 subject: String,
35 files: Vec<String>,
36}
37
38struct DetectedPR {
40 number: String,
42 commits: Vec<String>,
44 squash: bool,
46 merged_at: chrono::DateTime<chrono::Utc>,
48 title: String,
50 author: String,
52}
53
/// Returns `true` if a commit looks automated and should be compacted rather
/// than ingested as an individual commit node.
///
/// A commit counts as a bot commit when either:
/// - the author name matches a bot pattern (`[bot]` anywhere, a `-bot` or
///   `bot)` suffix, or exactly `renovate` / `github-actions`); or
/// - it lists at least one file and every file is a lockfile, `go.sum`, or
///   `CHANGELOG.md`.
///
/// Files are classified by their final path component, so lockfiles in
/// subdirectories (e.g. `web/package-lock.json`, `svc/go.sum`) are
/// recognized, while ordinary files whose names merely end in `lock.json` /
/// `lock.yaml` (e.g. `src/clock.json`) are not.
fn is_bot_commit(author: &str, files: &[String]) -> bool {
    let bot_author = author.contains("[bot]")
        || author.ends_with("-bot")
        || author.ends_with("bot)")
        || author == "renovate"
        || author == "github-actions";
    if bot_author {
        return true;
    }

    let is_housekeeping_file = |path: &str| {
        // git reports paths with `/` separators regardless of platform.
        let name = path.rsplit('/').next().unwrap_or(path);
        name.ends_with(".lock") // Cargo.lock, yarn.lock, bun.lock, Gemfile.lock, poetry.lock, ...
            || name.ends_with("-lock.json") // package-lock.json
            || name.ends_with("-lock.yaml") // pnpm-lock.yaml
            || name == "go.sum"
            || name == "CHANGELOG.md"
    };

    !files.is_empty() && files.iter().all(|f| is_housekeeping_file(f))
}
88
/// Extracts a pull-request number from a commit subject.
///
/// Recognizes two conventions:
/// - squash merges ending with `(#123)`; the last `(#` group in the subject is
///   the one examined;
/// - merge commits starting with `Merge pull request #123`.
///
/// Returns `None` if neither pattern yields a non-empty run of ASCII digits
/// (so `"(#)"` or `"(#abc)"` are not treated as PR references).
fn extract_pr_number(subject: &str) -> Option<String> {
    let is_pr_number = |s: &str| !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit());

    if let Some(start) = subject.rfind("(#") {
        // "(#" is two ASCII bytes, so `start + 2` is a char boundary.
        let after = &subject[start + 2..];
        if let Some(end) = after.find(')') {
            let num = &after[..end];
            if is_pr_number(num) {
                return Some(num.to_string());
            }
        }
    }

    subject
        .strip_prefix("Merge pull request #")
        .and_then(|rest| {
            let end = rest
                .find(|c: char| !c.is_ascii_digit())
                .unwrap_or(rest.len());
            let num = &rest[..end];
            (!num.is_empty()).then(|| num.to_string())
        })
}
110
111impl CodememEngine {
112 pub fn ingest_git_temporal(
117 &self,
118 path: &str,
119 days: u64,
120 namespace: Option<&str>,
121 ) -> Result<TemporalIngestResult, CodememError> {
122 let mut result = TemporalIngestResult::default();
123 let ns = namespace.unwrap_or("");
124
125 let commits = self.parse_git_log(path, days)?;
127 if commits.is_empty() {
128 return Ok(result);
129 }
130
131 let last_ingested = self.get_last_ingested_commit(ns);
133 let commits: Vec<ParsedCommit> = if let Some(ref last_hash) = last_ingested {
134 let skip_idx = commits.iter().position(|c| c.hash == *last_hash);
136 match skip_idx {
137 Some(idx) => commits.into_iter().take(idx).collect(),
138 None => commits, }
140 } else {
141 commits
142 };
143
144 if commits.is_empty() {
145 return Ok(result);
146 }
147
148 let (real_commits, bot_groups) = compact_bot_commits(commits);
150 result.commits_skipped = bot_groups.values().map(|g| g.len().saturating_sub(1)).sum();
151
152 let now = chrono::Utc::now();
154 let mut commit_nodes = Vec::new();
155 let mut edges = Vec::new();
156
157 for commit in &real_commits {
158 let commit_id = format!("commit:{}", commit.hash);
159
160 let node = GraphNode {
161 id: commit_id.clone(),
162 kind: NodeKind::Commit,
163 label: format!("{} {}", commit.short_hash, commit.subject),
164 payload: {
165 let mut p = HashMap::new();
166 p.insert("hash".into(), json!(commit.hash));
167 p.insert("short_hash".into(), json!(commit.short_hash));
168 p.insert("author".into(), json!(commit.author));
169 p.insert("date".into(), json!(commit.date.to_rfc3339()));
170 p.insert("subject".into(), json!(commit.subject));
171 p.insert("parents".into(), json!(commit.parents));
172 p.insert("files_changed".into(), json!(commit.files.len()));
173 p
174 },
175 centrality: 0.0,
176 memory_id: None,
177 namespace: Some(ns.to_string()),
178 valid_from: Some(commit.date),
179 valid_to: None,
180 };
181 commit_nodes.push(node);
182
183 for file in &commit.files {
185 let file_id = format!("file:{file}");
186 edges.push(Edge {
187 id: format!("modby:{file_id}:{}", commit.hash),
188 src: file_id,
189 dst: commit_id.clone(),
190 relationship: RelationshipType::ModifiedBy,
191 weight: 0.4,
192 properties: {
193 let mut p = HashMap::new();
194 p.insert("commit_date".into(), json!(commit.date.to_rfc3339()));
195 p
196 },
197 created_at: now,
198 valid_from: Some(commit.date),
199 valid_to: None,
200 });
201 result.modified_by_edges += 1;
202 }
203
204 result.commits_processed += 1;
205 }
206
207 let symbol_cutoff = now - chrono::Duration::days(30);
210 for commit in &real_commits {
211 if commit.date < symbol_cutoff {
212 continue;
213 }
214 let symbol_edges = self.commit_symbol_edges(path, commit, ns);
215 edges.extend(symbol_edges);
216 }
217
218 for (key, group) in &bot_groups {
220 if group.is_empty() {
221 continue;
222 }
223 let representative = &group[0];
224 let commit_id = format!("commit:{}", representative.hash);
225 let node = GraphNode {
226 id: commit_id,
227 kind: NodeKind::Commit,
228 label: format!("{} [{}x] {}", representative.short_hash, group.len(), key),
229 payload: {
230 let mut p = HashMap::new();
231 p.insert("hash".into(), json!(representative.hash));
232 p.insert("author".into(), json!(representative.author));
233 p.insert("date".into(), json!(representative.date.to_rfc3339()));
234 p.insert("compacted_count".into(), json!(group.len()));
235 p.insert("bot".into(), json!(true));
236 p
237 },
238 centrality: 0.0,
239 memory_id: None,
240 namespace: Some(ns.to_string()),
241 valid_from: Some(representative.date),
242 valid_to: None,
243 };
244 commit_nodes.push(node);
245 }
246
247 let prs = detect_prs(&real_commits);
249 for pr in &prs {
250 let pr_id = format!("pr:{}", pr.number);
251 let node = GraphNode {
252 id: pr_id.clone(),
253 kind: NodeKind::PullRequest,
254 label: format!("#{} {}", pr.number, pr.title),
255 payload: {
256 let mut p = HashMap::new();
257 p.insert("number".into(), json!(pr.number));
258 p.insert("title".into(), json!(pr.title));
259 p.insert("author".into(), json!(pr.author));
260 p.insert("squash".into(), json!(pr.squash));
261 p.insert("commit_count".into(), json!(pr.commits.len()));
262 p
263 },
264 centrality: 0.0,
265 memory_id: None,
266 namespace: Some(ns.to_string()),
267 valid_from: Some(pr.merged_at),
268 valid_to: None,
269 };
270 commit_nodes.push(node);
271 result.pr_nodes_created += 1;
272
273 for commit_hash in &pr.commits {
274 let commit_id = format!("commit:{commit_hash}");
275 edges.push(Edge {
276 id: format!("partof:{commit_id}:{pr_id}"),
277 src: commit_id,
278 dst: pr_id.clone(),
279 relationship: RelationshipType::PartOf,
280 weight: 0.4,
281 properties: HashMap::new(),
282 created_at: now,
283 valid_from: Some(pr.merged_at),
284 valid_to: None,
285 });
286 result.part_of_edges += 1;
287 }
288 }
289
290 result.symbols_expired = self.expire_deleted_symbols(path, &real_commits, ns)?;
292
293 self.storage.insert_graph_nodes_batch(&commit_nodes)?;
295 self.storage.insert_graph_edges_batch(&edges)?;
296
297 {
298 let mut graph = self.lock_graph()?;
299 for node in commit_nodes {
300 let _ = graph.add_node(node);
301 }
302 for edge in edges {
303 let _ = graph.add_edge(edge);
304 }
305 }
306
307 if let Some(latest) = real_commits.first() {
309 self.record_last_ingested_commit(ns, &latest.hash);
310 }
311
312 Ok(result)
313 }
314
315 fn parse_git_log(&self, path: &str, days: u64) -> Result<Vec<ParsedCommit>, CodememError> {
317 let output = std::process::Command::new("git")
318 .args([
319 "-C",
320 path,
321 "log",
322 "--format=COMMIT:%H|%P|%an|%aI|%s",
323 "--name-only",
324 "--diff-filter=AMDRT",
325 &format!("--since={days} days ago"),
326 ])
327 .output()
328 .map_err(|e| CodememError::Internal(format!("Failed to run git: {e}")))?;
329
330 if !output.status.success() {
331 let stderr = String::from_utf8_lossy(&output.stderr);
332 return Err(CodememError::Internal(format!("git log failed: {stderr}")));
333 }
334
335 let stdout = String::from_utf8_lossy(&output.stdout);
336 let mut commits = Vec::new();
337
338 for block in stdout.split("COMMIT:").skip(1) {
339 let mut lines = block.lines();
340 if let Some(header) = lines.next() {
341 let parts: Vec<&str> = header.splitn(5, '|').collect();
342 if parts.len() >= 5 {
343 let hash = parts[0].to_string();
344 let short_hash = hash[..hash.len().min(7)].to_string();
345 let parents: Vec<String> =
346 parts[1].split_whitespace().map(|s| s.to_string()).collect();
347 let author = parts[2].to_string();
348 let date = chrono::DateTime::parse_from_rfc3339(parts[3])
349 .map(|dt| dt.with_timezone(&chrono::Utc))
350 .unwrap_or_else(|_| chrono::Utc::now());
351 let subject = parts[4].to_string();
352 let files: Vec<String> = lines
353 .filter(|l| !l.trim().is_empty())
354 .map(|l| l.trim().to_string())
355 .collect();
356
357 commits.push(ParsedCommit {
358 hash,
359 short_hash,
360 parents,
361 author,
362 date,
363 subject,
364 files,
365 });
366 }
367 }
368 }
369
370 Ok(commits)
371 }
372
373 fn commit_symbol_edges(&self, path: &str, commit: &ParsedCommit, namespace: &str) -> Vec<Edge> {
375 let mut edges = Vec::new();
376 let parent = commit
377 .parents
378 .first()
379 .map(|s| s.as_str())
380 .unwrap_or(EMPTY_TREE_SHA);
381
382 let diff_output = std::process::Command::new("git")
383 .args(["-C", path, "diff", parent, &commit.hash, "--unified=0"])
384 .output();
385
386 let diff_text = match diff_output {
387 Ok(o) if o.status.success() => String::from_utf8_lossy(&o.stdout).to_string(),
388 _ => return edges,
389 };
390
391 let hunks = parse_diff(&diff_text);
392 if hunks.is_empty() {
393 return edges;
394 }
395
396 let graph = match self.lock_graph() {
398 Ok(g) => g,
399 Err(e) => {
400 tracing::warn!("Failed to lock graph for symbol-level diff: {e}");
401 return edges;
402 }
403 };
404 let all_nodes = graph.get_all_nodes();
405
406 let mut file_symbols: HashMap<&str, Vec<(&str, u32, u32)>> = HashMap::new();
407 for node in &all_nodes {
408 if matches!(
409 node.kind,
410 NodeKind::Function
411 | NodeKind::Method
412 | NodeKind::Class
413 | NodeKind::Trait
414 | NodeKind::Interface
415 | NodeKind::Enum
416 ) {
417 if let (Some(fp), Some(ls), Some(le)) = (
418 node.payload.get("file_path").and_then(|v| v.as_str()),
419 node.payload
420 .get("line_start")
421 .and_then(|v| v.as_u64())
422 .map(|v| v as u32),
423 node.payload
424 .get("line_end")
425 .and_then(|v| v.as_u64())
426 .map(|v| v as u32),
427 ) {
428 if node.namespace.as_deref() == Some(namespace) || namespace.is_empty() {
429 file_symbols.entry(fp).or_default().push((&node.id, ls, le));
430 }
431 }
432 }
433 }
434 drop(graph);
435
436 let commit_id = format!("commit:{}", commit.hash);
437 let now = chrono::Utc::now();
438 let mut seen = HashSet::new();
439
440 for hunk in &hunks {
441 if let Some(symbols) = file_symbols.get(hunk.file_path.as_str()) {
442 let changed_lines: HashSet<u32> = hunk
443 .added_lines
444 .iter()
445 .chain(hunk.removed_lines.iter())
446 .copied()
447 .collect();
448
449 for &(sym_id, line_start, line_end) in symbols {
450 if changed_lines
451 .iter()
452 .any(|&l| l >= line_start && l <= line_end)
453 && seen.insert(sym_id)
454 {
455 edges.push(Edge {
456 id: format!("modby:{}:{}", sym_id, commit.hash),
457 src: sym_id.to_string(),
458 dst: commit_id.clone(),
459 relationship: RelationshipType::ModifiedBy,
460 weight: 0.4,
461 properties: {
462 let mut p = HashMap::new();
463 p.insert("commit_date".into(), json!(commit.date.to_rfc3339()));
464 p.insert("symbol_level".into(), json!(true));
465 p
466 },
467 created_at: now,
468 valid_from: Some(commit.date),
469 valid_to: None,
470 });
471 }
472 }
473 }
474 }
475
476 edges
477 }
478
479 fn expire_deleted_symbols(
485 &self,
486 path: &str,
487 commits: &[ParsedCommit],
488 namespace: &str,
489 ) -> Result<usize, CodememError> {
490 let since = commits
492 .last()
493 .map(|c| c.date.to_rfc3339())
494 .unwrap_or_else(|| "90 days ago".to_string());
495
496 let output = std::process::Command::new("git")
497 .args([
498 "-C",
499 path,
500 "log",
501 "--format=COMMIT:%H|%aI",
502 "--diff-filter=D",
503 "--name-only",
504 &format!("--since={since}"),
505 ])
506 .output()
507 .map_err(|e| CodememError::Internal(format!("Failed to run git: {e}")))?;
508
509 if !output.status.success() {
510 return Ok(0);
511 }
512
513 let stdout = String::from_utf8_lossy(&output.stdout);
514
515 let mut deletions: Vec<(chrono::DateTime<chrono::Utc>, HashSet<String>)> = Vec::new();
517 for block in stdout.split("COMMIT:").skip(1) {
518 let mut lines = block.lines();
519 let date = lines
520 .next()
521 .and_then(|h| {
522 let parts: Vec<&str> = h.splitn(2, '|').collect();
523 parts.get(1).and_then(|d| {
524 chrono::DateTime::parse_from_rfc3339(d)
525 .ok()
526 .map(|dt| dt.with_timezone(&chrono::Utc))
527 })
528 })
529 .unwrap_or_else(chrono::Utc::now);
530
531 let files: HashSet<String> = lines
532 .filter(|l| !l.trim().is_empty())
533 .map(|l| l.trim().to_string())
534 .collect();
535
536 if !files.is_empty() {
537 deletions.push((date, files));
538 }
539 }
540
541 if deletions.is_empty() {
542 return Ok(0);
543 }
544
545 let expired_nodes: Vec<GraphNode> = {
547 let graph = self.lock_graph()?;
548 let all_nodes = graph.get_all_nodes();
549 let mut to_expire = Vec::new();
550
551 for (date, deleted_files) in &deletions {
552 for node in &all_nodes {
553 if node.valid_to.is_some() {
554 continue;
555 }
556 if !namespace.is_empty() && node.namespace.as_deref() != Some(namespace) {
557 continue;
558 }
559
560 let should_expire = match node.kind {
561 NodeKind::File => {
562 let fp = node.id.strip_prefix("file:").unwrap_or(&node.id);
563 deleted_files.contains(fp)
564 }
565 _ => node
566 .payload
567 .get("file_path")
568 .and_then(|v| v.as_str())
569 .is_some_and(|fp| deleted_files.contains(fp)),
570 };
571
572 if should_expire {
573 let mut expired_node = node.clone();
574 expired_node.valid_to = Some(*date);
575 to_expire.push(expired_node);
576 }
577 }
578 }
579 to_expire
580 };
581 let count = expired_nodes.len();
585 if !expired_nodes.is_empty() {
586 self.storage.insert_graph_nodes_batch(&expired_nodes)?;
587 let mut graph = self.lock_graph()?;
588 for node in expired_nodes {
589 let _ = graph.add_node(node);
590 }
591 }
592
593 Ok(count)
594 }
595
596 fn get_last_ingested_commit(&self, namespace: &str) -> Option<String> {
598 let sentinel_id = format!("commit:_HEAD:{namespace}");
599 if let Ok(Some(node)) = self.storage.get_graph_node(&sentinel_id) {
600 node.payload
601 .get("hash")
602 .and_then(|v| v.as_str())
603 .map(|s| s.to_string())
604 } else {
605 None
606 }
607 }
608
609 fn record_last_ingested_commit(&self, namespace: &str, hash: &str) {
611 let sentinel_id = format!("commit:_HEAD:{namespace}");
612 let node = GraphNode {
613 id: sentinel_id,
614 kind: NodeKind::Commit,
615 label: format!("_HEAD:{namespace}"),
616 payload: {
617 let mut p = HashMap::new();
618 p.insert("hash".into(), json!(hash));
619 p.insert("sentinel".into(), json!(true));
620 p
621 },
622 centrality: 0.0,
623 memory_id: None,
624 namespace: Some(namespace.to_string()),
625 valid_from: None,
626 valid_to: None,
627 };
628 let _ = self.storage.insert_graph_node(&node);
629 }
630}
631
632fn compact_bot_commits(
635 commits: Vec<ParsedCommit>,
636) -> (Vec<ParsedCommit>, HashMap<String, Vec<ParsedCommit>>) {
637 let mut real = Vec::new();
638 let mut bot_groups: HashMap<String, Vec<ParsedCommit>> = HashMap::new();
639
640 for commit in commits {
641 if is_bot_commit(&commit.author, &commit.files) {
642 let key = format!(
643 "{}:{}",
644 commit.author,
645 commit
646 .files
647 .first()
648 .map(|f| f.as_str())
649 .unwrap_or("unknown")
650 );
651 bot_groups.entry(key).or_default().push(commit);
652 } else {
653 real.push(commit);
654 }
655 }
656
657 (real, bot_groups)
658}
659
660fn detect_prs(commits: &[ParsedCommit]) -> Vec<DetectedPR> {
662 let mut prs = Vec::new();
663 let mut seen_prs: HashSet<String> = HashSet::new();
664
665 for commit in commits {
666 if let Some(pr_number) = extract_pr_number(&commit.subject) {
667 if seen_prs.contains(&pr_number) {
668 continue;
669 }
670 seen_prs.insert(pr_number.clone());
671
672 let is_merge = commit.parents.len() > 1;
673 let is_squash = commit.parents.len() == 1;
674
675 let commit_hashes = if is_squash {
678 vec![commit.hash.clone()]
679 } else if is_merge && commit.parents.len() == 2 {
680 vec![commit.hash.clone()]
684 } else {
685 vec![commit.hash.clone()]
686 };
687
688 prs.push(DetectedPR {
689 number: pr_number,
690 commits: commit_hashes,
691 squash: is_squash,
692 merged_at: commit.date,
693 title: commit.subject.clone(),
694 author: commit.author.clone(),
695 });
696 }
697 }
698
699 prs
700}
701
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `ParsedCommit` fixture; `short_hash` mirrors `hash` and the
    /// date is the current time.
    fn commit(
        hash: &str,
        parents: &[&str],
        author: &str,
        subject: &str,
        files: &[&str],
    ) -> ParsedCommit {
        ParsedCommit {
            hash: hash.to_string(),
            short_hash: hash.to_string(),
            parents: parents.iter().map(|p| p.to_string()).collect(),
            author: author.to_string(),
            date: chrono::Utc::now(),
            subject: subject.to_string(),
            files: files.iter().map(|f| f.to_string()).collect(),
        }
    }

    #[test]
    fn extract_pr_number_squash() {
        let cases = [("feat: add foo (#123)", "123"), ("fix: something (#42)", "42")];
        for (subject, expected) in cases {
            assert_eq!(extract_pr_number(subject), Some(expected.to_string()));
        }
    }

    #[test]
    fn extract_pr_number_merge() {
        let found = extract_pr_number("Merge pull request #456 from org/branch");
        assert_eq!(found, Some("456".to_string()));
    }

    #[test]
    fn extract_pr_number_none() {
        for subject in ["chore: update deps", "fix bug in #parser"] {
            assert_eq!(extract_pr_number(subject), None, "subject: {subject}");
        }
    }

    #[test]
    fn bot_detection() {
        for author in ["dependabot[bot]", "renovate"] {
            assert!(is_bot_commit(author, &[]), "{author} should be a bot");
        }
        for lockfile in ["Cargo.lock", "package-lock.json"] {
            assert!(
                is_bot_commit("some-user", &[lockfile.to_string()]),
                "{lockfile}-only commit should be a bot commit"
            );
        }
        assert!(!is_bot_commit("some-user", &["src/main.rs".to_string()]));
    }

    #[test]
    fn compact_separates_bots() {
        let (real, bots) = compact_bot_commits(vec![
            commit("aaa", &[], "dev", "feat: real work", &["src/main.rs"]),
            commit("bbb", &[], "dependabot[bot]", "chore: bump deps", &["Cargo.lock"]),
        ]);
        assert_eq!(real.len(), 1);
        assert_eq!(real[0].hash, "aaa");
        assert_eq!(bots.len(), 1);
    }

    #[test]
    fn detect_prs_from_squash() {
        let history = [
            commit("abc123", &["def456"], "dev", "feat: add feature (#10)", &["src/lib.rs"]),
            commit("xyz789", &["abc123"], "dev", "fix: plain commit", &["src/main.rs"]),
        ];
        let prs = detect_prs(&history);
        assert_eq!(prs.len(), 1);
        let pr = &prs[0];
        assert_eq!(pr.number, "10");
        assert!(pr.squash);
        assert_eq!(pr.commits, vec!["abc123"]);
    }
}