#![allow(clippy::await_holding_lock)]
use std::path::Path;
use super::CallStats;
use crate::store::helpers::StoreError;
use crate::store::Store;
impl Store {
/// Replaces the stored call sites for a single chunk.
///
/// Existing `calls` rows for `chunk_id` are deleted first, then `calls`
/// is inserted in bound-parameter-safe batches, all within one write
/// transaction.
///
/// # Errors
///
/// Returns [`StoreError`] when the transaction or any statement fails.
pub fn upsert_calls(
    &self,
    chunk_id: &str,
    calls: &[crate::parser::CallSite],
) -> Result<(), StoreError> {
    let _span = tracing::info_span!("upsert_calls", count = calls.len()).entered();
    tracing::trace!(chunk_id, call_count = calls.len(), "upserting chunk calls");
    self.rt.block_on(async {
        let (_guard, mut tx) = self.begin_write().await?;
        // Clear stale rows even when `calls` is empty so the table always
        // mirrors the most recent parse of this chunk.
        sqlx::query("DELETE FROM calls WHERE caller_id = ?1")
            .bind(chunk_id)
            .execute(&mut *tx)
            .await?;
        if !calls.is_empty() {
            use crate::store::helpers::sql::max_rows_per_statement;
            // Three bind parameters per inserted row.
            const INSERT_BATCH: usize = max_rows_per_statement(3);
            for chunk in calls.chunks(INSERT_BATCH) {
                let mut qb: sqlx::QueryBuilder<sqlx::Sqlite> = sqlx::QueryBuilder::new(
                    "INSERT INTO calls (caller_id, callee_name, line_number) ",
                );
                qb.push_values(chunk, |mut row, call| {
                    row.push_bind(chunk_id)
                        .push_bind(&call.callee_name)
                        .push_bind(call.line_number as i64);
                });
                qb.build().execute(&mut *tx).await?;
            }
            tracing::debug!(chunk_id, call_count = calls.len(), "Inserted chunk calls");
        }
        tx.commit().await?;
        Ok(())
    })
}
/// Replaces the stored call sites for many chunks in one write transaction.
///
/// `calls` pairs each chunk id with one of its call sites. Rows for every
/// distinct chunk id appearing in `calls` are deleted first, then all pairs
/// are re-inserted in bound-parameter-safe batches.
///
/// # Errors
///
/// Returns [`StoreError`] when the transaction or any statement fails.
pub fn upsert_calls_batch(
&self,
calls: &[(String, crate::parser::CallSite)],
) -> Result<(), StoreError> {
let _span = tracing::info_span!("upsert_calls_batch", count = calls.len()).entered();
// An empty batch carries no chunk ids, so (unlike `upsert_calls`) there is
// no stale state to clear — return without opening a transaction.
if calls.is_empty() {
return Ok(());
}
tracing::trace!(call_count = calls.len(), "upserting calls batch");
self.rt.block_on(async {
let (_guard, mut tx) = self.begin_write().await?;
// Delete old rows once per distinct chunk id; the input slice may repeat
// the same id across many call sites.
let mut seen_ids = std::collections::HashSet::new();
for (chunk_id, _) in calls {
if seen_ids.insert(chunk_id.as_str()) {
sqlx::query("DELETE FROM calls WHERE caller_id = ?1")
.bind(chunk_id)
.execute(&mut *tx)
.await?;
}
}
use crate::store::helpers::sql::max_rows_per_statement;
// Three bind parameters per inserted row.
const INSERT_BATCH: usize = max_rows_per_statement(3);
for batch in calls.chunks(INSERT_BATCH) {
let mut query_builder: sqlx::QueryBuilder<sqlx::Sqlite> = sqlx::QueryBuilder::new(
"INSERT INTO calls (caller_id, callee_name, line_number) ",
);
query_builder.push_values(batch.iter(), |mut b, (chunk_id, call)| {
b.push_bind(chunk_id)
.push_bind(&call.callee_name)
.push_bind(call.line_number as i64);
});
query_builder.build().execute(&mut *tx).await?;
}
tx.commit().await?;
Ok(())
})
}
/// Filters `ids` down to the subset that already exists in the `chunks`
/// table, querying in batches so the number of bound parameters stays
/// within the SQLite statement limit.
///
/// # Errors
///
/// Returns [`StoreError`] when any lookup query fails.
pub fn existing_chunk_ids(
    &self,
    ids: &std::collections::HashSet<&str>,
) -> Result<std::collections::HashSet<String>, StoreError> {
    let _span = tracing::debug_span!("existing_chunk_ids", candidates = ids.len()).entered();
    if ids.is_empty() {
        return Ok(std::collections::HashSet::new());
    }
    self.rt.block_on(async {
        use crate::store::helpers::sql::max_rows_per_statement;
        let candidates: Vec<&str> = ids.iter().copied().collect();
        let mut present = std::collections::HashSet::new();
        for batch in candidates.chunks(max_rows_per_statement(1)) {
            // Build "?1,?2,…,?n" for this batch's IN clause.
            let placeholders = (1..=batch.len())
                .map(|i| format!("?{i}"))
                .collect::<Vec<_>>()
                .join(",");
            let sql = format!("SELECT id FROM chunks WHERE id IN ({placeholders})");
            let mut query = sqlx::query_scalar::<_, String>(&sql);
            for id in batch {
                query = query.bind(*id);
            }
            present.extend(query.fetch_all(&self.pool).await?);
        }
        Ok(present)
    })
}
/// Returns the distinct callee names recorded for `chunk_id`, ordered by
/// first occurrence (smallest line number) within the caller.
///
/// # Errors
///
/// Returns [`StoreError`] when the query fails.
pub fn get_callees(&self, chunk_id: &str) -> Result<Vec<String>, StoreError> {
    let _span = tracing::debug_span!("get_callees", chunk_id = %chunk_id).entered();
    self.rt.block_on(async {
        // The previous query combined `SELECT DISTINCT callee_name` with
        // `ORDER BY line_number`: SQLite rejects ordering a DISTINCT query
        // by a term outside the result set, and even where such a query is
        // accepted the order is ill-defined (one distinct callee maps to
        // many line numbers). GROUP BY + MIN yields the same distinct set
        // with a deterministic first-occurrence ordering.
        let rows: Vec<(String,)> = sqlx::query_as(
            "SELECT callee_name FROM calls WHERE caller_id = ?1 \
             GROUP BY callee_name ORDER BY MIN(line_number)",
        )
        .bind(chunk_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(|(s,)| s).collect())
    })
}
/// Computes aggregate statistics over the `calls` table: the total number
/// of recorded call sites and the number of distinct callee names.
///
/// # Errors
///
/// Returns [`StoreError`] when the query fails.
pub fn call_stats(&self) -> Result<CallStats, StoreError> {
    let _span = tracing::debug_span!("call_stats").entered();
    self.rt.block_on(async {
        let row: (i64, i64) =
            sqlx::query_as("SELECT COUNT(*), COUNT(DISTINCT callee_name) FROM calls")
                .fetch_one(&self.pool)
                .await?;
        // COUNT(*) is never negative, so the i64 -> u64 casts are lossless.
        Ok(CallStats {
            total_calls: row.0 as u64,
            unique_callees: row.1 as u64,
        })
    })
}
/// Replaces the per-function call index for `file`.
///
/// All existing `function_calls` rows for the normalized file path are
/// deleted, then every (caller, callee) pair from `function_calls` is
/// re-inserted in bound-parameter-safe batches, all within one write
/// transaction.
///
/// # Errors
///
/// Returns [`StoreError`] when the transaction or any statement fails.
pub fn upsert_function_calls(
&self,
file: &Path,
function_calls: &[crate::parser::FunctionCalls],
) -> Result<(), StoreError> {
let _span =
tracing::info_span!("upsert_function_calls", count = function_calls.len()).entered();
// Normalize the path so deletes and inserts key on the same string form.
let file_str = crate::normalize_path(file);
let total_calls: usize = function_calls.iter().map(|fc| fc.calls.len()).sum();
tracing::trace!(
file = %file_str,
functions = function_calls.len(),
total_calls,
"upserting function calls"
);
self.rt.block_on(async {
let (_guard, mut tx) = self.begin_write().await?;
// Clear stale rows even when `function_calls` is empty so the index
// always mirrors the most recent parse of this file.
sqlx::query("DELETE FROM function_calls WHERE file = ?1")
.bind(&file_str)
.execute(&mut *tx)
.await?;
// Flatten to one tuple per call site:
// (caller_name, caller_line, callee_name, call_line).
let all_calls: Vec<_> = function_calls
.iter()
.flat_map(|fc| {
fc.calls.iter().map(move |call| {
(&fc.name, fc.line_start, &call.callee_name, call.line_number)
})
})
.collect();
if !all_calls.is_empty() {
use crate::store::helpers::sql::max_rows_per_statement;
// Five bind parameters per inserted row.
const INSERT_BATCH: usize = max_rows_per_statement(5);
for batch in all_calls.chunks(INSERT_BATCH) {
let mut query_builder: sqlx::QueryBuilder<sqlx::Sqlite> =
sqlx::QueryBuilder::new(
"INSERT INTO function_calls (file, caller_name, caller_line, callee_name, call_line) ",
);
query_builder.push_values(batch.iter(), |mut b, (caller_name, caller_line, callee_name, call_line)| {
b.push_bind(&file_str)
.push_bind(*caller_name)
.push_bind(*caller_line as i64)
.push_bind(*callee_name)
.push_bind(*call_line as i64);
});
query_builder.build().execute(&mut *tx).await?;
}
tracing::info!(
file = %file_str,
functions = function_calls.len(),
calls = all_calls.len(),
"Indexed function calls"
);
}
tx.commit().await?;
Ok(())
})
}
}