use libsql::{Builder, Connection, Database, params};
use std::future::Future;
use std::path::Path;
use tokio::runtime::{Handle, Runtime};
/// Cache of per-file findings stored in a libsql database.
///
/// The public methods are synchronous; internally every libsql call is an
/// async operation that is driven to completion by `block_on_helper`.
pub struct FindingsCache {
    /// Connection on which every query and statement issued by this type runs.
    conn: Connection,
    // Never read after construction, hence the lint allowance. It is stored
    // next to `conn` presumably so the database handle stays alive as long as
    // the connection does (TODO confirm whether libsql requires this).
    #[allow(dead_code)]
    db: Database,
    /// Private tokio runtime, created by `open` only when no tokio runtime was
    /// active on the calling thread at that moment; `None` otherwise.
    runtime: Option<Runtime>,
}
impl FindingsCache {
    /// Drives `fut` to completion on the calling thread and returns its output.
    ///
    /// Delegates to `block_on_helper`, handing it this cache's optional
    /// private runtime so the helper can pick a suitable executor.
    fn block_on<F>(&self, fut: F) -> F::Output
    where
        F: Future + Send,
        F::Output: Send,
    {
        let owned_runtime = &self.runtime;
        block_on_helper(owned_runtime, fut)
    }
}
/// Synchronously runs `fut` to completion, choosing an executor based on the
/// calling context.
///
/// Decision order:
/// 1. A tokio runtime is active on this thread and it is multi-threaded:
///    block inside `block_in_place` on that runtime's handle.
/// 2. A tokio runtime is active but has any other flavor: run the future on a
///    fresh runtime in a scoped worker thread (`spawn_scoped_runtime`).
///    NOTE(review): this is presumably because `block_in_place` is documented
///    to panic on a current-thread runtime; confirm against tokio docs.
/// 3. No runtime is active: use the caller-supplied `runtime` if present,
///    otherwise fall back to the scoped worker thread.
///
/// The `Send` bounds exist because the future and its output may cross into
/// the worker thread.
fn block_on_helper<F>(runtime: &Option<Runtime>, fut: F) -> F::Output
where
    F: Future + Send,
    F::Output: Send,
{
    use tokio::runtime::RuntimeFlavor;

    match Handle::try_current() {
        // Already inside a tokio runtime on this thread.
        Ok(ambient) => match ambient.runtime_flavor() {
            RuntimeFlavor::MultiThread => {
                tokio::task::block_in_place(|| ambient.block_on(fut))
            }
            _ => spawn_scoped_runtime(fut),
        },
        // Plain synchronous caller.
        Err(_) => match runtime {
            Some(owned) => owned.block_on(fut),
            None => spawn_scoped_runtime(fut),
        },
    }
}
/// Runs `fut` to completion on a brand-new current-thread tokio runtime that
/// lives on a dedicated scoped thread, then returns the future's output.
///
/// A scoped thread is used so `fut` may borrow from the caller's stack; the
/// caller blocks until the worker finishes.
///
/// # Panics
///
/// Panics if the runtime cannot be built or if the worker thread panics.
fn spawn_scoped_runtime<F>(fut: F) -> F::Output
where
    F: Future + Send,
    F::Output: Send,
{
    std::thread::scope(|scope| {
        let worker = scope.spawn(move || {
            let local_rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .expect("failed to build tokio runtime worker thread");
            local_rt.block_on(fut)
        });
        worker.join().expect("libsql worker thread panicked")
    })
}
impl FindingsCache {
pub fn open(project_root: &Path) -> Self {
let dir = project_root.join(".normalize");
let _ = std::fs::create_dir_all(&dir);
let db_path = dir.join("findings-cache.sqlite");
let runtime: Option<Runtime> = if Handle::try_current().is_ok() {
None
} else {
Some(
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to build tokio runtime for findings cache"),
)
};
let init = async {
let db = match Builder::new_local(&db_path).build().await {
Ok(db) => db,
Err(_) => Builder::new_local(":memory:")
.build()
.await
.expect("failed to open in-memory libsql database"),
};
let conn = db.connect().expect("failed to connect to libsql database");
let _ = conn
.execute_batch(
"PRAGMA journal_mode=WAL;
PRAGMA synchronous=NORMAL;
CREATE TABLE IF NOT EXISTS findings_cache (
path TEXT NOT NULL,
engine TEXT NOT NULL,
mtime_nanos INTEGER NOT NULL,
config_hash TEXT NOT NULL,
findings_json TEXT NOT NULL,
PRIMARY KEY (path, engine)
);",
)
.await;
(db, conn)
};
let (db, conn) = block_on_helper(&runtime, init);
Self { conn, db, runtime }
}
pub fn get(
&self,
path: &str,
mtime_nanos: u64,
config_hash: &str,
engine: &str,
) -> Option<String> {
let conn = &self.conn;
self.block_on(async {
let mut rows = conn
.query(
"SELECT findings_json FROM findings_cache
WHERE path = ?1 AND engine = ?2 AND mtime_nanos = ?3 AND config_hash = ?4",
params![path, engine, mtime_nanos as i64, config_hash],
)
.await
.ok()?;
let row = rows.next().await.ok()??;
row.get::<String>(0).ok()
})
}
pub fn put(
&self,
path: &str,
mtime_nanos: u64,
config_hash: &str,
engine: &str,
findings_json: &str,
) {
let conn = &self.conn;
let _ = self.block_on(async {
conn.execute(
"INSERT OR REPLACE INTO findings_cache (path, engine, mtime_nanos, config_hash, findings_json)
VALUES (?1, ?2, ?3, ?4, ?5)",
params![path, engine, mtime_nanos as i64, config_hash, findings_json],
)
.await
});
}
pub fn begin(&self) {
let conn = &self.conn;
let _ = self.block_on(async { conn.execute_batch("BEGIN;").await });
}
pub fn commit(&self) {
let conn = &self.conn;
let _ = self.block_on(async { conn.execute_batch("COMMIT;").await });
}
pub fn flush(&self) {}
}
/// Returns the last-modification time of `path` as nanoseconds since the Unix
/// epoch.
///
/// Returns `0` when the metadata cannot be read, when the platform does not
/// report a modification time, or when that time lies before the epoch.
pub fn file_mtime_nanos(path: &Path) -> u64 {
    let Ok(meta) = path.metadata() else {
        return 0;
    };
    let Ok(modified) = meta.modified() else {
        return 0;
    };
    match modified.duration_since(std::time::UNIX_EPOCH) {
        // `as_nanos` yields a u128; the cast keeps only the low 64 bits.
        Ok(since_epoch) => since_epoch.as_nanos() as u64,
        Err(_) => 0,
    }
}
/// A rule that checks one file at a time and can be driven by `run_file_rule`.
///
/// `Send + Sync` is required because `run_file_rule` calls `check_file` from
/// rayon worker threads through a shared reference to the rule.
pub trait FileRule: Send + Sync {
    /// Finding produced for a file.
    ///
    /// `run_file_rule` serializes findings to JSON when caching them and
    /// deserializes them on a cache hit, hence the serde bounds; `Send` lets
    /// results be collected from parallel workers.
    type Finding: serde::Serialize + serde::de::DeserializeOwned + Send;
    /// Name of the engine behind this rule; `run_file_rule` uses it as the
    /// `engine` part of the cache key.
    fn engine_name(&self) -> &str;
    /// Hash of the rule's configuration; `run_file_rule` stores it with each
    /// cache entry and only accepts a hit when it matches. Presumably it must
    /// change whenever configuration that affects findings changes (verify
    /// against implementors).
    fn config_hash(&self) -> String;
    /// Checks the single file at `path` and returns its findings. `root` is
    /// the root that was passed to `run_file_rule`.
    fn check_file(&self, path: &Path, root: &Path) -> Vec<Self::Finding>;
    /// Converts the collected findings into a diagnostics report.
    ///
    /// As called by `run_file_rule`, `findings` has one entry per file that was
    /// considered (cache hits first, then freshly checked files) and
    /// `files_checked` is the number of those files.
    fn to_diagnostics(
        &self,
        findings: Vec<(std::path::PathBuf, Vec<Self::Finding>)>,
        root: &Path,
        files_checked: usize,
    ) -> normalize_output::diagnostics::DiagnosticsReport;
}
/// Runs `rule` over a set of files, reusing cached findings where possible,
/// and returns the rule's diagnostics report.
///
/// # File selection
///
/// When `explicit_files` is `Some`, only those entries that are regular files
/// with language support are used. Otherwise the tree under `root` is walked
/// with `walk_config` and filtered the same way.
///
/// # Caching
///
/// A cached entry is used when the file's modification time and the rule's
/// `config_hash` both match what was stored for `(path, engine)`. Files whose
/// modification time cannot be read (reported as `0`) are never served from
/// or written to the cache. Cache misses are checked in parallel with rayon
/// and then written back inside one transaction.
///
/// The modification time stored for a freshly checked file is the one sampled
/// *before* the check ran. If the file is edited while the check is running,
/// the stored key no longer matches the file and the next run re-checks it,
/// instead of serving findings for the old content under the new timestamp.
///
/// # Result order
///
/// Findings passed to `to_diagnostics` list cache hits first, followed by
/// freshly checked files, each group in file-selection order.
pub fn run_file_rule<R: FileRule>(
    rule: &R,
    root: &Path,
    explicit_files: Option<&[std::path::PathBuf]>,
    walk_config: &normalize_rules_config::WalkConfig,
) -> normalize_output::diagnostics::DiagnosticsReport {
    use rayon::prelude::*;

    let files: Vec<std::path::PathBuf> = if let Some(ef) = explicit_files {
        ef.iter()
            .filter(|p| p.is_file())
            .filter(|p| normalize_languages::support_for_path(p).is_some())
            .cloned()
            .collect()
    } else {
        super::walk::gitignore_walk(root, walk_config)
            .filter(|e| e.path().is_file())
            .filter(|e| normalize_languages::support_for_path(e.path()).is_some())
            .map(|e| e.path().to_path_buf())
            .collect()
    };
    let files_checked = files.len();

    let cache = FindingsCache::open(root);
    let config_hash = rule.config_hash();
    let engine = rule.engine_name();

    // Partition into cache hits and misses. Each miss keeps the mtime observed
    // here so the later write-back is keyed on the pre-check state of the file.
    let mut all_findings: Vec<(std::path::PathBuf, Vec<R::Finding>)> = Vec::new();
    let mut cache_misses: Vec<(std::path::PathBuf, u64)> = Vec::new();
    for file in &files {
        let path_key = file.to_string_lossy();
        let mtime = file_mtime_nanos(file);
        if mtime > 0
            && let Some(json) = cache.get(&path_key, mtime, &config_hash, engine)
            && let Ok(findings) = serde_json::from_str::<Vec<R::Finding>>(&json)
        {
            all_findings.push((file.clone(), findings));
            continue;
        }
        cache_misses.push((file.clone(), mtime));
    }

    // Check the misses in parallel; collection preserves input order.
    let fresh_findings: Vec<(std::path::PathBuf, u64, Vec<R::Finding>)> = cache_misses
        .into_par_iter()
        .map(|(path, mtime)| {
            let findings = rule.check_file(&path, root);
            (path, mtime, findings)
        })
        .collect();

    // Write all new entries back in a single transaction.
    cache.begin();
    for (path, mtime, findings) in &fresh_findings {
        // mtime == 0 means the timestamp was unreadable; such files are not cached.
        if *mtime > 0
            && let Ok(json) = serde_json::to_string(findings)
        {
            cache.put(&path.to_string_lossy(), *mtime, &config_hash, engine, &json);
        }
    }
    cache.commit();

    all_findings.extend(
        fresh_findings
            .into_iter()
            .map(|(path, _, findings)| (path, findings)),
    );
    rule.to_diagnostics(all_findings, root, files_checked)
}