Skip to main content

codemem_engine/enrichment/
temporal.rs

1//! Temporal graph layer: commit and PR nodes with symbol-level ModifiedBy edges.
2//!
3//! Replays git history (default 90 days) to build a layered graph where each
4//! commit is a node connected to the symbols/files it modified. PRs are detected
5//! from merge/squash commit patterns and connected to their commits via PartOf edges.
6
7use crate::review::parse_diff;
8use crate::CodememEngine;
9use codemem_core::{CodememError, Edge, GraphNode, NodeKind, RelationshipType};
10use serde_json::json;
11use std::collections::{HashMap, HashSet};
12
/// Git's empty tree SHA — used as the diff parent for root commits, so a
/// root commit's diff reports every file it introduced as added.
const EMPTY_TREE_SHA: &str = "4b825dc642cb6eb9a060e54bf899d69f82d7a419";
15
/// Result of temporal graph ingestion.
#[derive(Debug, Default)]
pub struct TemporalIngestResult {
    /// Non-bot commits turned into Commit nodes.
    pub commits_processed: usize,
    /// Bot/repetitive commits compacted away (each group's size minus its
    /// single representative commit).
    pub commits_skipped: usize,
    /// PullRequest nodes detected from merge/squash commit subjects.
    pub pr_nodes_created: usize,
    /// ModifiedBy edges recorded during this run.
    pub modified_by_edges: usize,
    /// Commit→PR PartOf edges created.
    pub part_of_edges: usize,
    /// Nodes whose `valid_to` was set because their file was deleted.
    pub symbols_expired: usize,
}
26
/// Parsed commit from git log.
pub(crate) struct ParsedCommit {
    /// Full commit SHA (`%H`).
    hash: String,
    /// First 7 characters of the SHA (fewer if the hash is shorter).
    short_hash: String,
    /// Parent SHAs (`%P`): empty for a root commit, two or more for merges.
    parents: Vec<String>,
    /// Author name (`%an`).
    author: String,
    /// Author date (`%aI`), normalized to UTC.
    date: chrono::DateTime<chrono::Utc>,
    /// First line of the commit message (`%s`).
    subject: String,
    /// Paths touched by the commit (from `--name-only`).
    files: Vec<String>,
}
37
/// Detected PR from commit patterns: either a squash-merge subject ending in
/// `(#N)` or a GitHub `Merge pull request #N` merge commit.
struct DetectedPR {
    /// PR number (from commit subject).
    number: String,
    /// Commit hashes belonging to this PR.
    commits: Vec<String>,
    /// Whether this was a squash merge (single commit).
    squash: bool,
    /// Timestamp from the merge/squash commit.
    merged_at: chrono::DateTime<chrono::Utc>,
    /// Subject of the merge commit (used as PR title).
    title: String,
    /// Author of the merge commit.
    author: String,
}
53
/// Check if a commit looks like a bot/CI commit that should be compacted.
///
/// Two independent heuristics: the author name looks bot-like, or every file
/// the commit touches is a lockfile / generated artifact.
fn is_bot_commit(author: &str, files: &[String]) -> bool {
    // Author-name heuristics (GitHub `[bot]` suffix, renovate, actions, …).
    if author.contains("[bot]")
        || author.ends_with("-bot")
        || author.ends_with("bot)")
        || matches!(author, "renovate" | "github-actions")
    {
        return true;
    }

    // A non-empty commit whose every file is a lock/generated file is noise.
    let is_generated = |f: &String| {
        f.ends_with(".lock")
            || f.ends_with("lock.json")
            || f.ends_with("lock.yaml")
            || matches!(
                f.as_str(),
                "CHANGELOG.md"
                    | "Cargo.lock"
                    | "bun.lock"
                    | "yarn.lock"
                    | "package-lock.json"
                    | "pnpm-lock.yaml"
                    | "Gemfile.lock"
                    | "poetry.lock"
                    | "go.sum"
            )
    };
    !files.is_empty() && files.iter().all(is_generated)
}
88
/// Extract PR number from a commit subject.
/// Matches: `feat: add foo (#123)`, `Merge pull request #123 from ...`
///
/// Returns `None` when no well-formed PR reference is present, including the
/// degenerate `(#)` form that contains no digits.
fn extract_pr_number(subject: &str) -> Option<String> {
    // Squash merge: "subject (#123)" — rfind takes the LAST "(#" so a PR
    // reference near the end wins over one quoted earlier in the subject.
    if let Some(start) = subject.rfind("(#") {
        if let Some(end) = subject[start..].find(')') {
            let num = &subject[start + 2..start + end];
            // Bug fix: an empty capture ("(#)") previously passed the
            // all-digits check vacuously and yielded Some("").
            if !num.is_empty() && num.chars().all(|c| c.is_ascii_digit()) {
                return Some(num.to_string());
            }
        }
    }
    // GitHub merge commit: "Merge pull request #123 from ..."
    if let Some(rest) = subject.strip_prefix("Merge pull request #") {
        let num: String = rest.chars().take_while(|c| c.is_ascii_digit()).collect();
        if !num.is_empty() {
            return Some(num);
        }
    }
    None
}
110
111impl CodememEngine {
112    /// Ingest git history into the temporal graph layer.
113    ///
114    /// Creates Commit nodes, PullRequest nodes, and ModifiedBy edges.
115    /// Detects squash/merge PRs and compacts bot commits.
116    pub fn ingest_git_temporal(
117        &self,
118        path: &str,
119        days: u64,
120        namespace: Option<&str>,
121    ) -> Result<TemporalIngestResult, CodememError> {
122        let mut result = TemporalIngestResult::default();
123        let ns = namespace.unwrap_or("");
124
125        // ── Step 1: Parse git log with parent hashes and subject ────────
126        let commits = self.parse_git_log(path, days)?;
127        if commits.is_empty() {
128            return Ok(result);
129        }
130
131        // ── Step 2: Check for incremental ingestion ────────────────────
132        let last_ingested = self.get_last_ingested_commit(ns);
133        let commits: Vec<ParsedCommit> = if let Some(ref last_hash) = last_ingested {
134            // Skip commits we've already processed
135            let skip_idx = commits.iter().position(|c| c.hash == *last_hash);
136            match skip_idx {
137                Some(idx) => commits.into_iter().take(idx).collect(),
138                None => commits, // Last commit not found, process all
139            }
140        } else {
141            commits
142        };
143
144        if commits.is_empty() {
145            return Ok(result);
146        }
147
148        // ── Step 3: Compact bot/repetitive commits ──────────────────────
149        let (real_commits, bot_groups) = compact_bot_commits(commits);
150        result.commits_skipped = bot_groups.values().map(|g| g.len().saturating_sub(1)).sum();
151
152        // ── Step 4: Create commit nodes and ModifiedBy edges ────────────
153        let now = chrono::Utc::now();
154        let mut commit_nodes = Vec::new();
155        let mut edges = Vec::new();
156
157        for commit in &real_commits {
158            let commit_id = format!("commit:{}", commit.hash);
159
160            let node = GraphNode {
161                id: commit_id.clone(),
162                kind: NodeKind::Commit,
163                label: format!("{} {}", commit.short_hash, commit.subject),
164                payload: {
165                    let mut p = HashMap::new();
166                    p.insert("hash".into(), json!(commit.hash));
167                    p.insert("short_hash".into(), json!(commit.short_hash));
168                    p.insert("author".into(), json!(commit.author));
169                    p.insert("date".into(), json!(commit.date.to_rfc3339()));
170                    p.insert("subject".into(), json!(commit.subject));
171                    p.insert("parents".into(), json!(commit.parents));
172                    p.insert("files_changed".into(), json!(commit.files.len()));
173                    p
174                },
175                centrality: 0.0,
176                memory_id: None,
177                namespace: Some(ns.to_string()),
178                valid_from: Some(commit.date),
179                valid_to: None,
180            };
181            commit_nodes.push(node);
182
183            // File-level ModifiedBy edges
184            for file in &commit.files {
185                let file_id = format!("file:{file}");
186                edges.push(Edge {
187                    id: format!("modby:{file_id}:{}", commit.hash),
188                    src: file_id,
189                    dst: commit_id.clone(),
190                    relationship: RelationshipType::ModifiedBy,
191                    weight: 0.4,
192                    properties: {
193                        let mut p = HashMap::new();
194                        p.insert("commit_date".into(), json!(commit.date.to_rfc3339()));
195                        p
196                    },
197                    created_at: now,
198                    valid_from: Some(commit.date),
199                    valid_to: None,
200                });
201                result.modified_by_edges += 1;
202            }
203
204            result.commits_processed += 1;
205        }
206
207        // ── Step 5: Symbol-level ModifiedBy edges (via diff) ────────────
208        // Only for recent commits (last 30 days) to limit cost
209        let symbol_cutoff = now - chrono::Duration::days(30);
210        for commit in &real_commits {
211            if commit.date < symbol_cutoff {
212                continue;
213            }
214            let symbol_edges = self.commit_symbol_edges(path, commit, ns);
215            edges.extend(symbol_edges);
216        }
217
218        // ── Step 6: Create compacted bot commit nodes ───────────────────
219        for (key, group) in &bot_groups {
220            if group.is_empty() {
221                continue;
222            }
223            let representative = &group[0];
224            let commit_id = format!("commit:{}", representative.hash);
225            let node = GraphNode {
226                id: commit_id,
227                kind: NodeKind::Commit,
228                label: format!("{} [{}x] {}", representative.short_hash, group.len(), key),
229                payload: {
230                    let mut p = HashMap::new();
231                    p.insert("hash".into(), json!(representative.hash));
232                    p.insert("author".into(), json!(representative.author));
233                    p.insert("date".into(), json!(representative.date.to_rfc3339()));
234                    p.insert("compacted_count".into(), json!(group.len()));
235                    p.insert("bot".into(), json!(true));
236                    p
237                },
238                centrality: 0.0,
239                memory_id: None,
240                namespace: Some(ns.to_string()),
241                valid_from: Some(representative.date),
242                valid_to: None,
243            };
244            commit_nodes.push(node);
245        }
246
247        // ── Step 7: Detect PRs and create PR nodes + PartOf edges ───────
248        let prs = detect_prs(&real_commits);
249        for pr in &prs {
250            let pr_id = format!("pr:{}", pr.number);
251            let node = GraphNode {
252                id: pr_id.clone(),
253                kind: NodeKind::PullRequest,
254                label: format!("#{} {}", pr.number, pr.title),
255                payload: {
256                    let mut p = HashMap::new();
257                    p.insert("number".into(), json!(pr.number));
258                    p.insert("title".into(), json!(pr.title));
259                    p.insert("author".into(), json!(pr.author));
260                    p.insert("squash".into(), json!(pr.squash));
261                    p.insert("commit_count".into(), json!(pr.commits.len()));
262                    p
263                },
264                centrality: 0.0,
265                memory_id: None,
266                namespace: Some(ns.to_string()),
267                valid_from: Some(pr.merged_at),
268                valid_to: None,
269            };
270            commit_nodes.push(node);
271            result.pr_nodes_created += 1;
272
273            for commit_hash in &pr.commits {
274                let commit_id = format!("commit:{commit_hash}");
275                edges.push(Edge {
276                    id: format!("partof:{commit_id}:{pr_id}"),
277                    src: commit_id,
278                    dst: pr_id.clone(),
279                    relationship: RelationshipType::PartOf,
280                    weight: 0.4,
281                    properties: HashMap::new(),
282                    created_at: now,
283                    valid_from: Some(pr.merged_at),
284                    valid_to: None,
285                });
286                result.part_of_edges += 1;
287            }
288        }
289
290        // ── Step 8: Detect deleted symbols ──────────────────────────────
291        result.symbols_expired = self.expire_deleted_symbols(path, &real_commits, ns)?;
292
293        // ── Step 9: Persist to storage and in-memory graph ──────────────
294        // Collect edge endpoints that don't exist as commit/PR nodes we're
295        // about to insert.  These need placeholder rows in graph_nodes
296        // BEFORE we insert edges, otherwise the FK constraint fails.
297        let commit_node_ids: HashSet<&str> = commit_nodes.iter().map(|n| n.id.as_str()).collect();
298        let mut placeholder_ids = HashSet::new();
299        let mut placeholders = Vec::new();
300        for edge in &edges {
301            for endpoint_id in [&edge.src, &edge.dst] {
302                if commit_node_ids.contains(endpoint_id.as_str()) {
303                    continue;
304                }
305                if !placeholder_ids.insert(endpoint_id.clone()) {
306                    continue; // already queued
307                }
308                // Only create if missing from storage
309                if matches!(self.storage.get_graph_node(endpoint_id), Ok(Some(_))) {
310                    continue;
311                }
312                let kind = if endpoint_id.starts_with("file:") {
313                    NodeKind::File
314                } else if endpoint_id.starts_with("sym:") {
315                    NodeKind::Function
316                } else if endpoint_id.starts_with("commit:") {
317                    NodeKind::Commit
318                } else if endpoint_id.starts_with("pr:") {
319                    NodeKind::PullRequest
320                } else {
321                    NodeKind::External
322                };
323                let label = endpoint_id
324                    .find(':')
325                    .map(|i| &endpoint_id[i + 1..])
326                    .unwrap_or(endpoint_id)
327                    .to_string();
328                placeholders.push(GraphNode {
329                    id: endpoint_id.clone(),
330                    kind,
331                    label,
332                    payload: HashMap::new(),
333                    centrality: 0.0,
334                    memory_id: None,
335                    namespace: None,
336                    valid_from: None,
337                    valid_to: None,
338                });
339            }
340        }
341
342        if !placeholders.is_empty() {
343            self.storage.insert_graph_nodes_batch(&placeholders)?;
344        }
345        self.storage.insert_graph_nodes_batch(&commit_nodes)?;
346        self.storage.insert_graph_edges_batch(&edges)?;
347
348        // Single lock scope for both nodes and edges to ensure atomic
349        // visibility to concurrent readers.
350        {
351            let mut graph = self.lock_graph()?;
352            for node in placeholders {
353                let _ = graph.add_node(node);
354            }
355            for node in commit_nodes {
356                let _ = graph.add_node(node);
357            }
358            self.add_edges_with_placeholders(&mut **graph, &edges)?;
359        }
360
361        // Record last ingested commit for incremental runs
362        if let Some(latest) = real_commits.first() {
363            self.record_last_ingested_commit(ns, &latest.hash);
364        }
365
366        Ok(result)
367    }
368
369    /// Ensure all edge endpoints exist in the in-memory graph, creating placeholder
370    /// nodes as needed, then add the edges. Logs warnings for any remaining failures.
371    ///
372    /// Placeholder nodes are also persisted to storage so they survive restarts.
373    /// Callers must hold the graph lock; this avoids a double-lock window where
374    /// concurrent readers could see nodes without their edges.
375    pub(crate) fn add_edges_with_placeholders(
376        &self,
377        graph: &mut dyn codemem_core::GraphBackend,
378        edges: &[Edge],
379    ) -> Result<(), CodememError> {
380        let mut warn_count = 0u32;
381        let mut total_failures = 0u32;
382
383        for edge in edges {
384            // Ensure src node exists
385            for endpoint_id in [&edge.src, &edge.dst] {
386                if graph.get_node(endpoint_id)?.is_none() {
387                    let kind = if endpoint_id.starts_with("file:") {
388                        NodeKind::File
389                    } else if endpoint_id.starts_with("sym:") {
390                        NodeKind::Function
391                    } else if endpoint_id.starts_with("commit:") {
392                        NodeKind::Commit
393                    } else if endpoint_id.starts_with("pr:") {
394                        NodeKind::PullRequest
395                    } else {
396                        NodeKind::External
397                    };
398
399                    let label = endpoint_id
400                        .find(':')
401                        .map(|i| &endpoint_id[i + 1..])
402                        .unwrap_or(endpoint_id)
403                        .to_string();
404
405                    let placeholder = GraphNode {
406                        id: endpoint_id.clone(),
407                        kind,
408                        label,
409                        payload: HashMap::new(),
410                        centrality: 0.0,
411                        memory_id: None,
412                        namespace: None,
413                        valid_from: None,
414                        valid_to: None,
415                    };
416                    // Persist to storage so placeholder survives restarts
417                    let _ = self.storage.insert_graph_node(&placeholder);
418                    let _ = graph.add_node(placeholder);
419                }
420            }
421
422            if let Err(e) = graph.add_edge(edge.clone()) {
423                total_failures += 1;
424                if warn_count < 5 {
425                    tracing::warn!(
426                        "Failed to add edge {} ({} -> {}): {e}",
427                        edge.id,
428                        edge.src,
429                        edge.dst
430                    );
431                    warn_count += 1;
432                }
433            }
434        }
435
436        if total_failures > 0 && total_failures > warn_count {
437            tracing::warn!(
438                "... and {} more edge insertion failures (total: {})",
439                total_failures - warn_count,
440                total_failures
441            );
442        }
443
444        Ok(())
445    }
446
447    /// Parse git log output into structured commits.
448    fn parse_git_log(&self, path: &str, days: u64) -> Result<Vec<ParsedCommit>, CodememError> {
449        let output = std::process::Command::new("git")
450            .args([
451                "-C",
452                path,
453                "log",
454                "--format=COMMIT:%H|%P|%an|%aI|%s",
455                "--name-only",
456                "--diff-filter=AMDRT",
457                &format!("--since={days} days ago"),
458            ])
459            .output()
460            .map_err(|e| CodememError::Internal(format!("Failed to run git: {e}")))?;
461
462        if !output.status.success() {
463            let stderr = String::from_utf8_lossy(&output.stderr);
464            return Err(CodememError::Internal(format!("git log failed: {stderr}")));
465        }
466
467        let stdout = String::from_utf8_lossy(&output.stdout);
468        let mut commits = Vec::new();
469
470        for block in stdout.split("COMMIT:").skip(1) {
471            let mut lines = block.lines();
472            if let Some(header) = lines.next() {
473                let parts: Vec<&str> = header.splitn(5, '|').collect();
474                if parts.len() >= 5 {
475                    let hash = parts[0].to_string();
476                    let short_hash = hash[..hash.len().min(7)].to_string();
477                    let parents: Vec<String> =
478                        parts[1].split_whitespace().map(|s| s.to_string()).collect();
479                    let author = parts[2].to_string();
480                    let date = match chrono::DateTime::parse_from_rfc3339(parts[3]) {
481                        Ok(dt) => dt.with_timezone(&chrono::Utc),
482                        Err(e) => {
483                            tracing::warn!(
484                                "Skipping commit {}: unparseable date {:?}: {e}",
485                                &parts[0][..parts[0].len().min(7)],
486                                parts[3]
487                            );
488                            continue;
489                        }
490                    };
491                    let subject = parts[4].to_string();
492                    let files: Vec<String> = lines
493                        .filter(|l| !l.trim().is_empty())
494                        .map(|l| l.trim().to_string())
495                        .collect();
496
497                    commits.push(ParsedCommit {
498                        hash,
499                        short_hash,
500                        parents,
501                        author,
502                        date,
503                        subject,
504                        files,
505                    });
506                }
507            }
508        }
509
510        Ok(commits)
511    }
512
    /// Get symbol-level ModifiedBy edges for a single commit by running git diff.
    ///
    /// Diffs the commit against its first parent (or the empty tree for a
    /// root commit), maps each hunk's changed lines onto symbol nodes whose
    /// recorded line range overlaps, and emits one ModifiedBy edge per
    /// touched symbol. Best-effort: git or lock failures return whatever
    /// edges were collected so far (normally an empty Vec).
    fn commit_symbol_edges(&self, path: &str, commit: &ParsedCommit, namespace: &str) -> Vec<Edge> {
        let mut edges = Vec::new();
        // Root commits have no parent; diff against git's empty tree instead.
        let parent = commit
            .parents
            .first()
            .map(|s| s.as_str())
            .unwrap_or(EMPTY_TREE_SHA);

        let diff_output = std::process::Command::new("git")
            .args(["-C", path, "diff", parent, &commit.hash, "--unified=0"])
            .output();

        let diff_text = match diff_output {
            Ok(o) if o.status.success() => String::from_utf8_lossy(&o.stdout).to_string(),
            _ => return edges,
        };

        let hunks = parse_diff(&diff_text);
        if hunks.is_empty() {
            return edges;
        }

        // Build file→symbols map from graph
        let graph = match self.lock_graph() {
            Ok(g) => g,
            Err(e) => {
                tracing::warn!("Failed to lock graph for symbol-level diff: {e}");
                return edges;
            }
        };
        let all_nodes = graph.get_all_nodes();

        // Index: file path -> [(symbol id, line_start, line_end)], restricted
        // to symbol-like node kinds in the requested namespace (an empty
        // namespace matches everything).
        let mut file_symbols: HashMap<&str, Vec<(&str, u32, u32)>> = HashMap::new();
        for node in &all_nodes {
            if matches!(
                node.kind,
                NodeKind::Function
                    | NodeKind::Method
                    | NodeKind::Class
                    | NodeKind::Trait
                    | NodeKind::Interface
                    | NodeKind::Enum
            ) {
                if let (Some(fp), Some(ls), Some(le)) = (
                    node.payload.get("file_path").and_then(|v| v.as_str()),
                    node.payload
                        .get("line_start")
                        .and_then(|v| v.as_u64())
                        .map(|v| v as u32),
                    node.payload
                        .get("line_end")
                        .and_then(|v| v.as_u64())
                        .map(|v| v as u32),
                ) {
                    if node.namespace.as_deref() == Some(namespace) || namespace.is_empty() {
                        file_symbols.entry(fp).or_default().push((&node.id, ls, le));
                    }
                }
            }
        }
        // Release the graph lock before the edge-construction loop below.
        drop(graph);

        let commit_id = format!("commit:{}", commit.hash);
        let now = chrono::Utc::now();
        // Dedupe: a symbol touched by several hunks gets exactly one edge.
        let mut seen = HashSet::new();

        for hunk in &hunks {
            if let Some(symbols) = file_symbols.get(hunk.file_path.as_str()) {
                // NOTE(review): added (new-file) and removed (old-file) line
                // numbers are merged into one set, which assumes the symbol
                // line ranges are comparable to both numberings — confirm.
                let changed_lines: HashSet<u32> = hunk
                    .added_lines
                    .iter()
                    .chain(hunk.removed_lines.iter())
                    .copied()
                    .collect();

                for &(sym_id, line_start, line_end) in symbols {
                    if changed_lines
                        .iter()
                        .any(|&l| l >= line_start && l <= line_end)
                        && seen.insert(sym_id)
                    {
                        edges.push(Edge {
                            id: format!("modby:{}:{}", sym_id, commit.hash),
                            src: sym_id.to_string(),
                            dst: commit_id.clone(),
                            relationship: RelationshipType::ModifiedBy,
                            weight: 0.4,
                            properties: {
                                let mut p = HashMap::new();
                                p.insert("commit_date".into(), json!(commit.date.to_rfc3339()));
                                p.insert("symbol_level".into(), json!(true));
                                p
                            },
                            created_at: now,
                            valid_from: Some(commit.date),
                            valid_to: None,
                        });
                    }
                }
            }
        }

        edges
    }
618
    /// Set valid_to on symbols/files that were deleted in the given commits.
    ///
    /// Uses `git log --diff-filter=D` to find deleted files, then collects
    /// expired nodes before writing — avoids holding the graph lock during
    /// storage writes (deadlock risk).
    ///
    /// Returns the number of nodes expired. A failing git invocation is
    /// treated as "nothing to expire" (`Ok(0)`) rather than an error.
    pub(crate) fn expire_deleted_symbols(
        &self,
        path: &str,
        commits: &[ParsedCommit],
        namespace: &str,
    ) -> Result<usize, CodememError> {
        // Find deleted files from the already-parsed commits' time range.
        // `git log` is newest-first, so the last entry is the oldest commit;
        // its date bounds the --since query below.
        let since = commits
            .last()
            .map(|c| c.date.to_rfc3339())
            .unwrap_or_else(|| "90 days ago".to_string());

        let output = std::process::Command::new("git")
            .args([
                "-C",
                path,
                "log",
                "--format=COMMIT:%H|%aI",
                "--diff-filter=D",
                "--name-only",
                &format!("--since={since}"),
            ])
            .output()
            .map_err(|e| CodememError::Internal(format!("Failed to run git: {e}")))?;

        if !output.status.success() {
            // Best-effort: a failing git invocation expires nothing.
            return Ok(0);
        }

        let stdout = String::from_utf8_lossy(&output.stdout);

        // Parse deletion events: (date, set of deleted file paths)
        let mut deletions: Vec<(chrono::DateTime<chrono::Utc>, HashSet<String>)> = Vec::new();
        for block in stdout.split("COMMIT:").skip(1) {
            let mut lines = block.lines();
            let date = lines
                .next()
                .and_then(|h| {
                    let parts: Vec<&str> = h.splitn(2, '|').collect();
                    parts.get(1).and_then(|d| {
                        chrono::DateTime::parse_from_rfc3339(d)
                            .ok()
                            .map(|dt| dt.with_timezone(&chrono::Utc))
                    })
                })
                // An unparseable header date falls back to "now".
                .unwrap_or_else(chrono::Utc::now);

            let files: HashSet<String> = lines
                .filter(|l| !l.trim().is_empty())
                .map(|l| l.trim().to_string())
                .collect();

            if !files.is_empty() {
                deletions.push((date, files));
            }
        }

        if deletions.is_empty() {
            return Ok(0);
        }

        // Filter out files that currently exist in the working tree
        // (they were deleted then re-created, so should not be expired)
        for (_date, deleted_files) in &mut deletions {
            deleted_files.retain(|f| {
                let full_path = std::path::Path::new(path).join(f);
                !full_path.exists()
            });
        }
        deletions.retain(|(_, files)| !files.is_empty());

        if deletions.is_empty() {
            return Ok(0);
        }

        // Phase 1: collect expired nodes under graph lock (read-only)
        let expired_nodes: Vec<GraphNode> = {
            let graph = self.lock_graph()?;
            let all_nodes = graph.get_all_nodes();
            let mut to_expire = Vec::new();

            for (date, deleted_files) in &deletions {
                for node in &all_nodes {
                    // Already expired — keep the original valid_to.
                    if node.valid_to.is_some() {
                        continue;
                    }
                    // Namespace filter: an empty namespace matches all nodes.
                    if !namespace.is_empty() && node.namespace.as_deref() != Some(namespace) {
                        continue;
                    }

                    let should_expire = match node.kind {
                        NodeKind::File => {
                            let fp = node.id.strip_prefix("file:").unwrap_or(&node.id);
                            deleted_files.contains(fp)
                        }
                        // Non-file nodes carry their owning file in `file_path`.
                        _ => node
                            .payload
                            .get("file_path")
                            .and_then(|v| v.as_str())
                            .is_some_and(|fp| deleted_files.contains(fp)),
                    };

                    if should_expire {
                        let mut expired_node = node.clone();
                        expired_node.valid_to = Some(*date);
                        to_expire.push(expired_node);
                    }
                }
            }
            to_expire
        };
        // Graph lock dropped here

        // Phase 2: write to storage and in-memory graph separately
        // NOTE(review): re-inserting the node with valid_to set presumably
        // upserts in both the storage batch API and `add_node` — confirm.
        let count = expired_nodes.len();
        if !expired_nodes.is_empty() {
            self.storage.insert_graph_nodes_batch(&expired_nodes)?;
            let mut graph = self.lock_graph()?;
            for node in expired_nodes {
                let _ = graph.add_node(node);
            }
        }

        Ok(count)
    }
749
750    /// Get the last ingested commit hash for incremental processing.
751    fn get_last_ingested_commit(&self, namespace: &str) -> Option<String> {
752        let sentinel_id = format!("commit:_HEAD:{namespace}");
753        if let Ok(Some(node)) = self.storage.get_graph_node(&sentinel_id) {
754            node.payload
755                .get("hash")
756                .and_then(|v| v.as_str())
757                .map(|s| s.to_string())
758        } else {
759            None
760        }
761    }
762
763    /// Record the last ingested commit hash for incremental processing.
764    fn record_last_ingested_commit(&self, namespace: &str, hash: &str) {
765        let sentinel_id = format!("commit:_HEAD:{namespace}");
766        let node = GraphNode {
767            id: sentinel_id,
768            kind: NodeKind::Commit,
769            label: format!("_HEAD:{namespace}"),
770            payload: {
771                let mut p = HashMap::new();
772                p.insert("hash".into(), json!(hash));
773                p.insert("sentinel".into(), json!(true));
774                p
775            },
776            centrality: 0.0,
777            memory_id: None,
778            namespace: Some(namespace.to_string()),
779            valid_from: None,
780            valid_to: None,
781        };
782        let _ = self.storage.insert_graph_node(&node);
783    }
784}
785
786/// Separate real commits from bot/repetitive commits.
787/// Bot commits are grouped by (author, file pattern) key.
788fn compact_bot_commits(
789    commits: Vec<ParsedCommit>,
790) -> (Vec<ParsedCommit>, HashMap<String, Vec<ParsedCommit>>) {
791    let mut real = Vec::new();
792    let mut bot_groups: HashMap<String, Vec<ParsedCommit>> = HashMap::new();
793
794    for commit in commits {
795        if is_bot_commit(&commit.author, &commit.files) {
796            let key = format!(
797                "{}:{}",
798                commit.author,
799                commit
800                    .files
801                    .first()
802                    .map(|f| f.as_str())
803                    .unwrap_or("unknown")
804            );
805            bot_groups.entry(key).or_default().push(commit);
806        } else {
807            real.push(commit);
808        }
809    }
810
811    (real, bot_groups)
812}
813
814/// Detect PRs from commit patterns.
815fn detect_prs(commits: &[ParsedCommit]) -> Vec<DetectedPR> {
816    let mut prs = Vec::new();
817    let mut seen_prs: HashSet<String> = HashSet::new();
818
819    for commit in commits {
820        if let Some(pr_number) = extract_pr_number(&commit.subject) {
821            if seen_prs.contains(&pr_number) {
822                continue;
823            }
824            seen_prs.insert(pr_number.clone());
825
826            let is_merge = commit.parents.len() > 1;
827            let is_squash = commit.parents.len() == 1;
828
829            // For squash merges: single commit = single PR
830            // For merge commits: collect commits between this merge and the previous one
831            let commit_hashes = if is_squash {
832                vec![commit.hash.clone()]
833            } else if is_merge && commit.parents.len() == 2 {
834                // The second parent is the branch head; commits between
835                // first parent and this merge are the PR's commits.
836                // For simplicity, just reference the merge commit itself.
837                vec![commit.hash.clone()]
838            } else {
839                vec![commit.hash.clone()]
840            };
841
842            prs.push(DetectedPR {
843                number: pr_number,
844                commits: commit_hashes,
845                squash: is_squash,
846                merged_at: commit.date,
847                title: commit.subject.clone(),
848                author: commit.author.clone(),
849            });
850        }
851    }
852
853    prs
854}
855
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a minimal `ParsedCommit` fixture touching a single file.
    fn commit(
        hash: &str,
        parents: &[&str],
        author: &str,
        subject: &str,
        file: &str,
    ) -> ParsedCommit {
        ParsedCommit {
            hash: hash.into(),
            short_hash: hash.into(),
            parents: parents.iter().map(|p| p.to_string()).collect(),
            author: author.into(),
            date: chrono::Utc::now(),
            subject: subject.into(),
            files: vec![file.into()],
        }
    }

    #[test]
    fn extracts_pr_number_from_squash_subjects() {
        assert_eq!(
            extract_pr_number("feat: add foo (#123)"),
            Some("123".to_string())
        );
        assert_eq!(
            extract_pr_number("fix: something (#42)"),
            Some("42".to_string())
        );
    }

    #[test]
    fn extracts_pr_number_from_merge_subjects() {
        assert_eq!(
            extract_pr_number("Merge pull request #456 from org/branch"),
            Some("456".to_string())
        );
    }

    #[test]
    fn ignores_subjects_without_pr_reference() {
        assert_eq!(extract_pr_number("chore: update deps"), None);
        assert_eq!(extract_pr_number("fix bug in #parser"), None);
    }

    #[test]
    fn flags_bot_authors_and_lockfile_only_commits() {
        assert!(is_bot_commit("dependabot[bot]", &[]));
        assert!(is_bot_commit("renovate", &[]));
        assert!(is_bot_commit("some-user", &["Cargo.lock".to_string()]));
        assert!(is_bot_commit(
            "some-user",
            &["package-lock.json".to_string()]
        ));
        assert!(!is_bot_commit("some-user", &["src/main.rs".to_string()]));
    }

    #[test]
    fn compaction_splits_real_and_bot_commits() {
        let input = vec![
            commit("aaa", &[], "dev", "feat: real work", "src/main.rs"),
            commit("bbb", &[], "dependabot[bot]", "chore: bump deps", "Cargo.lock"),
        ];
        let (real, bots) = compact_bot_commits(input);
        assert_eq!(real.len(), 1);
        assert_eq!(real[0].hash, "aaa");
        assert_eq!(bots.len(), 1);
    }

    #[test]
    fn squash_commit_yields_single_pr() {
        let commits = vec![
            commit("abc123", &["def456"], "dev", "feat: add feature (#10)", "src/lib.rs"),
            commit("xyz789", &["abc123"], "dev", "fix: plain commit", "src/main.rs"),
        ];
        let prs = detect_prs(&commits);
        assert_eq!(prs.len(), 1);
        assert_eq!(prs[0].number, "10");
        assert!(prs[0].squash);
        assert_eq!(prs[0].commits, vec!["abc123"]);
    }
}