1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
// DS-5: WRITE_LOCK guard is held across .await inside block_on().
// This is safe — block_on runs single-threaded, no concurrent tasks can deadlock.
#![allow(clippy::await_holding_lock)]
//! Call graph upsert, delete, batch operations, and basic stats.
use std::path::Path;
use super::CallStats;
use crate::store::helpers::StoreError;
use crate::store::Store;
impl Store {
/// Insert or replace call sites for a chunk.
///
/// Deletes any previously recorded calls for `chunk_id`, then inserts the
/// supplied call sites in batches, all within one write transaction so the
/// replacement is atomic.
///
/// # Errors
/// Returns `StoreError` if the write transaction cannot be acquired or any
/// SQL statement fails.
pub fn upsert_calls(
    &self,
    chunk_id: &str,
    calls: &[crate::parser::CallSite],
) -> Result<(), StoreError> {
    let _span = tracing::info_span!("upsert_calls", count = calls.len()).entered();
    tracing::trace!(chunk_id, call_count = calls.len(), "upserting chunk calls");
    self.rt.block_on(async {
        let (_guard, mut tx) = self.begin_write().await?;
        // Drop stale rows first so the inserts below fully replace this chunk's calls.
        sqlx::query("DELETE FROM calls WHERE caller_id = ?1")
            .bind(chunk_id)
            .execute(&mut *tx)
            .await?;
        if !calls.is_empty() {
            // 300 rows * 3 binds = 900, safely below SQLite's 999 bind limit.
            const INSERT_BATCH: usize = 300;
            for group in calls.chunks(INSERT_BATCH) {
                let mut builder: sqlx::QueryBuilder<sqlx::Sqlite> = sqlx::QueryBuilder::new(
                    "INSERT INTO calls (caller_id, callee_name, line_number) ",
                );
                builder.push_values(group, |mut row, site| {
                    row.push_bind(chunk_id)
                        .push_bind(&site.callee_name)
                        .push_bind(site.line_number as i64);
                });
                builder.build().execute(&mut *tx).await?;
            }
            tracing::debug!(chunk_id, call_count = calls.len(), "Inserted chunk calls");
        }
        tx.commit().await?;
        Ok(())
    })
}
/// Insert call sites for multiple chunks in a single transaction.
/// Takes `(chunk_id, CallSite)` pairs: old rows for every distinct chunk id
/// are deleted first, then all pairs are inserted in batches within one
/// write transaction.
///
/// # Errors
/// Returns `StoreError` if the write transaction cannot be acquired or any
/// SQL statement fails.
pub fn upsert_calls_batch(
    &self,
    calls: &[(String, crate::parser::CallSite)],
) -> Result<(), StoreError> {
    let _span = tracing::info_span!("upsert_calls_batch", count = calls.len()).entered();
    if calls.is_empty() {
        return Ok(());
    }
    tracing::trace!(call_count = calls.len(), "upserting calls batch");
    self.rt.block_on(async {
        let (_guard, mut tx) = self.begin_write().await?;
        // Delete old calls exactly once per distinct chunk id (first-seen order).
        let mut deleted = std::collections::HashSet::new();
        for (chunk_id, _) in calls {
            if !deleted.insert(chunk_id.as_str()) {
                continue;
            }
            sqlx::query("DELETE FROM calls WHERE caller_id = ?1")
                .bind(chunk_id)
                .execute(&mut *tx)
                .await?;
        }
        // 300 rows * 3 binds = 900, safely below SQLite's 999 bind limit.
        const INSERT_BATCH: usize = 300;
        for group in calls.chunks(INSERT_BATCH) {
            let mut builder: sqlx::QueryBuilder<sqlx::Sqlite> = sqlx::QueryBuilder::new(
                "INSERT INTO calls (caller_id, callee_name, line_number) ",
            );
            builder.push_values(group, |mut row, (chunk_id, site)| {
                row.push_bind(chunk_id)
                    .push_bind(&site.callee_name)
                    .push_bind(site.line_number as i64);
            });
            builder.build().execute(&mut *tx).await?;
        }
        tx.commit().await?;
        Ok(())
    })
}
/// Check which chunk IDs from a set actually exist in the database.
/// Used by periodic deferred-flush to filter calls whose FK targets are present.
///
/// # Returns
/// The subset of `ids` that have a matching row in `chunks`, as owned strings.
///
/// # Errors
/// Returns `StoreError` if any lookup query fails.
pub fn existing_chunk_ids(
    &self,
    ids: &std::collections::HashSet<&str>,
) -> Result<std::collections::HashSet<String>, StoreError> {
    let _span = tracing::debug_span!("existing_chunk_ids", candidates = ids.len()).entered();
    if ids.is_empty() {
        return Ok(std::collections::HashSet::new());
    }
    self.rt.block_on(async {
        let candidates: Vec<&str> = ids.iter().copied().collect();
        let mut present = std::collections::HashSet::new();
        // 200 ids per query keeps the bind count well under SQLite's 999 limit.
        for batch in candidates.chunks(200) {
            let placeholders = (1..=batch.len())
                .map(|i| format!("?{i}"))
                .collect::<Vec<_>>()
                .join(",");
            let sql = format!("SELECT id FROM chunks WHERE id IN ({placeholders})");
            // Bind each id positionally in the same order as the placeholders.
            let query = batch
                .iter()
                .fold(sqlx::query_scalar::<_, String>(&sql), |q, id| q.bind(*id));
            present.extend(query.fetch_all(&self.pool).await?);
        }
        Ok(present)
    })
}
/// Get all function names called by a given chunk.
/// Takes a chunk **ID** (unique) rather than a name. Returns only callee
/// **names** (not full chunks) because:
/// - Callees may not exist in the index (external functions)
/// - Callers typically chain: `get_callees` → `get_callers_full` for graph traversal
/// For richer callee data, see [`get_callers_with_context`].
///
/// Callees are deduplicated and ordered by their first call site in the chunk.
///
/// # Errors
/// Returns `StoreError` if the query fails.
pub fn get_callees(&self, chunk_id: &str) -> Result<Vec<String>, StoreError> {
    let _span = tracing::debug_span!("get_callees", chunk_id = %chunk_id).entered();
    self.rt.block_on(async {
        // BUG FIX: the previous query used `SELECT DISTINCT callee_name ...
        // ORDER BY line_number`, but `line_number` is not in the DISTINCT
        // result set — when a callee is called on multiple lines the sort key
        // is unspecified in SQLite (and stricter engines reject the query
        // outright). GROUP BY + MIN(line_number) dedupes while ordering each
        // callee by its first call site, which was the evident intent.
        let rows: Vec<(String,)> = sqlx::query_as(
            "SELECT callee_name FROM calls WHERE caller_id = ?1 \
             GROUP BY callee_name ORDER BY MIN(line_number)",
        )
        .bind(chunk_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(|(s,)| s).collect())
    })
}
/// Retrieves aggregated statistics about function calls from the database.
///
/// Runs one aggregate query over the `calls` table and returns the total row
/// count together with the number of distinct callee names.
///
/// # Returns
/// * `Ok(CallStats)` - `total_calls` (total number of recorded calls) and
///   `unique_callees` (number of distinct functions called).
/// * `Err(StoreError)` - if the database query fails.
///
/// # Errors
/// Returns `StoreError` if the SQL query execution fails or if database
/// connectivity issues occur.
pub fn call_stats(&self) -> Result<CallStats, StoreError> {
    let _span = tracing::debug_span!("call_stats").entered();
    self.rt.block_on(async {
        let counts: (i64, i64) =
            sqlx::query_as("SELECT COUNT(*), COUNT(DISTINCT callee_name) FROM calls")
                .fetch_one(&self.pool)
                .await?;
        Ok(CallStats {
            total_calls: counts.0 as u64,
            unique_callees: counts.1 as u64,
        })
    })
}
// ============ Full Call Graph Methods (v5) ============
/// Insert function calls for a file (full call graph, no size limits).
///
/// Replaces every previously recorded `function_calls` row for `file`, then
/// re-inserts one row per individual call site, all inside a single write
/// transaction so readers never observe a half-updated file.
///
/// # Arguments
/// * `file` - source file whose call graph is being (re)indexed
/// * `function_calls` - per-function call lists produced by the parser
///
/// # Errors
/// Returns `StoreError` if the write transaction cannot be acquired or any
/// SQL statement fails.
pub fn upsert_function_calls(
&self,
file: &Path,
function_calls: &[crate::parser::FunctionCalls],
) -> Result<(), StoreError> {
let _span =
tracing::info_span!("upsert_function_calls", count = function_calls.len()).entered();
let file_str = crate::normalize_path(file);
let total_calls: usize = function_calls.iter().map(|fc| fc.calls.len()).sum();
tracing::trace!(
file = %file_str,
functions = function_calls.len(),
total_calls,
"upserting function calls"
);
self.rt.block_on(async {
let (_guard, mut tx) = self.begin_write().await?;
// Delete-then-insert makes this upsert idempotent for the file.
sqlx::query("DELETE FROM function_calls WHERE file = ?1")
.bind(&file_str)
.execute(&mut *tx)
.await?;
// Flatten all calls and batch insert (instead of N individual inserts)
// Each flattened tuple is (caller_name, caller_line, callee_name, call_line);
// references stay borrowed from `function_calls` to avoid cloning strings.
let all_calls: Vec<_> = function_calls
.iter()
.flat_map(|fc| {
fc.calls.iter().map(move |call| {
(&fc.name, fc.line_start, &call.callee_name, call.line_number)
})
})
.collect();
if !all_calls.is_empty() {
// 190 rows * 5 binds = 950 < SQLite's 999 limit
const INSERT_BATCH: usize = 190;
for batch in all_calls.chunks(INSERT_BATCH) {
let mut query_builder: sqlx::QueryBuilder<sqlx::Sqlite> =
sqlx::QueryBuilder::new(
"INSERT INTO function_calls (file, caller_name, caller_line, callee_name, call_line) ",
);
query_builder.push_values(batch.iter(), |mut b, (caller_name, caller_line, callee_name, call_line)| {
b.push_bind(&file_str)
.push_bind(*caller_name)
.push_bind(*caller_line as i64)
.push_bind(*callee_name)
.push_bind(*call_line as i64);
});
query_builder.build().execute(&mut *tx).await?;
}
tracing::info!(
file = %file_str,
functions = function_calls.len(),
calls = all_calls.len(),
"Indexed function calls"
);
}
tx.commit().await?;
Ok(())
})
}
}