1use crate::error::{KnowledgeResult, Reason};
2use orion_error::UvsFrom;
3use orion_error::compat_traits::ErrorOweBase;
4use rusqlite::Params;
5use std::collections::hash_map::DefaultHasher;
6use std::future::Future;
7use std::hash::{Hash, Hasher};
8use std::num::NonZeroUsize;
9use std::sync::{Arc, RwLock};
10use wp_log::debug_kdb;
11use wp_model_core::model::{self, DataField};
12
13use lazy_static::lazy_static;
14use lru::LruCache;
15
16use crate::loader::ProviderKind;
17use crate::mem::RowData;
18use crate::runtime::{DatasourceId, Generation, MetadataCacheScope, runtime};
19use crate::telemetry::{
20 CacheLayer, CacheOutcome, CacheTelemetryEvent, telemetry, telemetry_enabled,
21};
22
23#[derive(Debug, Clone, PartialEq, Eq, Hash)]
24pub struct MetadataCacheKey {
25 pub datasource_id: DatasourceId,
26 pub generation: Generation,
27 pub query_hash: u64,
28}
29
30lazy_static! {
31 pub static ref COLNAME_CACHE: RwLock<LruCache<MetadataCacheKey, Arc<Vec<String>>>> =
33 RwLock::new(LruCache::new(
34 NonZeroUsize::new(512).expect("non-zero metadata cache size")
35 ));
36}
37
38pub fn column_metadata_cache_snapshot() -> (usize, usize) {
39 COLNAME_CACHE
40 .read()
41 .map(|cache| (cache.len(), cache.cap().get()))
42 .unwrap_or((0, 0))
43}
44
/// Hashes `value` with the standard library's `DefaultHasher`.
///
/// The result is deterministic for a given build, which is all the in-memory
/// cache key needs. The standard library does not promise the same algorithm
/// across Rust releases, so the value must not be persisted.
fn stable_hash(value: &str) -> u64 {
    let mut state = DefaultHasher::default();
    Hash::hash(value, &mut state);
    state.finish()
}
50
/// Test helper: builds the cache key for `sql` under the runtime's current
/// metadata scope.
#[cfg(test)]
pub(crate) fn metadata_cache_key_for_current_scope(sql: &str) -> MetadataCacheKey {
    let current_scope = runtime().current_metadata_scope();
    let key = metadata_cache_key_for_scope(&current_scope, sql);
    key
}
56
57pub(crate) fn metadata_cache_key_for_scope(
58 scope: &MetadataCacheScope,
59 sql: &str,
60) -> MetadataCacheKey {
61 MetadataCacheKey {
62 datasource_id: scope.datasource_id.clone(),
63 generation: scope.generation,
64 query_hash: stable_hash(sql),
65 }
66}
67
68pub(crate) fn metadata_cache_get_or_try_init<F>(sql: &str, load: F) -> KnowledgeResult<Vec<String>>
69where
70 F: FnOnce() -> KnowledgeResult<Option<Vec<String>>>,
71{
72 let scope = runtime().current_metadata_scope();
73 let provider_kind = runtime().current_provider_kind();
74 metadata_cache_get_or_try_init_for_scope(&scope, provider_kind, sql, load)
75}
76
77pub(crate) fn metadata_cache_get_or_try_init_for_scope<F>(
78 scope: &MetadataCacheScope,
79 provider_kind: Option<ProviderKind>,
80 sql: &str,
81 load: F,
82) -> KnowledgeResult<Vec<String>>
83where
84 F: FnOnce() -> KnowledgeResult<Option<Vec<String>>>,
85{
86 let cache_key = metadata_cache_key_for_scope(scope, sql);
87 if let Some(names) = COLNAME_CACHE
88 .read()
89 .ok()
90 .and_then(|m| m.peek(&cache_key).cloned())
91 {
92 runtime().record_metadata_cache_hit();
93 if telemetry_enabled() {
94 telemetry().on_cache(&CacheTelemetryEvent {
95 layer: CacheLayer::Metadata,
96 outcome: CacheOutcome::Hit,
97 provider_kind: provider_kind.clone(),
98 });
99 }
100 debug_kdb!(
101 "[kdb] metadata cache hit datasource_id={} generation={}",
102 cache_key.datasource_id.0,
103 cache_key.generation.0
104 );
105 return Ok((*names).clone());
106 }
107
108 runtime().record_metadata_cache_miss();
109 if telemetry_enabled() {
110 telemetry().on_cache(&CacheTelemetryEvent {
111 layer: CacheLayer::Metadata,
112 outcome: CacheOutcome::Miss,
113 provider_kind,
114 });
115 }
116 debug_kdb!(
117 "[kdb] metadata cache miss datasource_id={} generation={}",
118 cache_key.datasource_id.0,
119 cache_key.generation.0
120 );
121
122 let Some(names) = load()? else {
123 return Ok(Vec::new());
124 };
125 if let Ok(mut m) = COLNAME_CACHE.write() {
126 m.put(cache_key, Arc::new(names.clone()));
127 }
128 Ok(names)
129}
130
131pub(crate) async fn metadata_cache_get_or_try_init_async_for_scope<F, Fut>(
132 scope: &MetadataCacheScope,
133 provider_kind: Option<ProviderKind>,
134 sql: &str,
135 load: F,
136) -> KnowledgeResult<Vec<String>>
137where
138 F: FnOnce() -> Fut,
139 Fut: Future<Output = KnowledgeResult<Option<Vec<String>>>>,
140{
141 metadata_cache_get_or_try_init_async_for_scope_typed(scope, provider_kind, sql, load).await
142}
143
144pub(crate) async fn metadata_cache_get_or_try_init_async_for_scope_typed<F, Fut, E>(
145 scope: &MetadataCacheScope,
146 provider_kind: Option<ProviderKind>,
147 sql: &str,
148 load: F,
149) -> Result<Vec<String>, E>
150where
151 F: FnOnce() -> Fut,
152 Fut: Future<Output = Result<Option<Vec<String>>, E>>,
153{
154 let cache_key = metadata_cache_key_for_scope(scope, sql);
155 if let Some(names) = COLNAME_CACHE
156 .read()
157 .ok()
158 .and_then(|m| m.peek(&cache_key).cloned())
159 {
160 runtime().record_metadata_cache_hit();
161 if telemetry_enabled() {
162 telemetry().on_cache(&CacheTelemetryEvent {
163 layer: CacheLayer::Metadata,
164 outcome: CacheOutcome::Hit,
165 provider_kind: provider_kind.clone(),
166 });
167 }
168 return Ok((*names).clone());
169 }
170
171 runtime().record_metadata_cache_miss();
172 if telemetry_enabled() {
173 telemetry().on_cache(&CacheTelemetryEvent {
174 layer: CacheLayer::Metadata,
175 outcome: CacheOutcome::Miss,
176 provider_kind,
177 });
178 }
179
180 let Some(names) = load().await? else {
181 return Ok(Vec::new());
182 };
183 if let Ok(mut m) = COLNAME_CACHE.write() {
184 m.put(cache_key, Arc::new(names.clone()));
185 }
186 Ok(names)
187}
188
189fn map_row(row: &rusqlite::Row<'_>, col_names: &[String]) -> KnowledgeResult<RowData> {
191 let mut result = Vec::with_capacity(col_names.len());
192 for (i, col_name) in col_names.iter().enumerate() {
193 let value = row.get_ref(i).owe(Reason::from_rule())?;
194 let field = match value {
195 rusqlite::types::ValueRef::Null => {
196 DataField::new(model::DataType::default(), col_name, model::Value::Null)
197 }
198 rusqlite::types::ValueRef::Integer(v) => DataField::from_digit(col_name, v),
199 rusqlite::types::ValueRef::Real(v) => DataField::from_float(col_name, v),
200 rusqlite::types::ValueRef::Text(v) => DataField::from_chars(
201 col_name,
202 String::from_utf8(v.to_vec()).owe(Reason::from_rule())?,
203 ),
204 rusqlite::types::ValueRef::Blob(v) => {
205 DataField::from_chars(col_name, String::from_utf8_lossy(v).to_string())
206 }
207 };
208 result.push(field);
209 }
210 Ok(result)
211}
212
213fn extract_col_names(stmt: &rusqlite::Statement<'_>) -> Vec<String> {
215 let col_cnt = stmt.column_count();
216 debug_kdb!("[memdb] col_cnt={}", col_cnt);
217 let mut col_names = Vec::with_capacity(col_cnt);
218 for i in 0..col_cnt {
219 let name = stmt.column_name(i).unwrap_or("").to_string();
220 debug_kdb!("[memdb] col[{}] name='{}'", i, name);
221 col_names.push(name);
222 }
223 col_names
224}
225
226fn extract_col_names_cached(
228 stmt: &rusqlite::Statement<'_>,
229 sql: &str,
230) -> KnowledgeResult<Vec<String>> {
231 metadata_cache_get_or_try_init(sql, || {
232 let col_cnt = stmt.column_count();
233 let mut names = Vec::with_capacity(col_cnt);
234 for i in 0..col_cnt {
235 names.push(stmt.column_name(i).owe(Reason::from_rule())?.to_string());
236 }
237 Ok(Some(names))
238 })
239}
240
241fn extract_col_names_cached_with_scope(
242 stmt: &rusqlite::Statement<'_>,
243 scope: &MetadataCacheScope,
244 provider_kind: Option<ProviderKind>,
245 sql: &str,
246) -> KnowledgeResult<Vec<String>> {
247 metadata_cache_get_or_try_init_for_scope(scope, provider_kind, sql, || {
248 let col_cnt = stmt.column_count();
249 let mut names = Vec::with_capacity(col_cnt);
250 for i in 0..col_cnt {
251 names.push(stmt.column_name(i).owe(Reason::from_rule())?.to_string());
252 }
253 Ok(Some(names))
254 })
255}
256
257pub fn query<P: Params>(
258 conn: &rusqlite::Connection,
259 sql: &str,
260 params: P,
261) -> KnowledgeResult<Vec<RowData>> {
262 let mut stmt = conn.prepare_cached(sql).owe(Reason::from_rule())?;
263 let col_names = extract_col_names(&stmt);
264 let mut rows = stmt.query(params).owe(Reason::from_rule())?;
265 let mut all_result = Vec::new();
266 while let Some(row) = rows.next().owe(Reason::from_rule())? {
267 all_result.push(map_row(row, &col_names)?);
268 }
269 Ok(all_result)
270}
271
272pub fn query_first_row<P: Params>(
274 conn: &rusqlite::Connection,
275 sql: &str,
276 params: P,
277) -> KnowledgeResult<RowData> {
278 let mut stmt = conn.prepare_cached(sql).owe(Reason::from_rule())?;
279 let col_names = extract_col_names(&stmt);
280 let mut rows = stmt.query(params).owe(Reason::from_rule())?;
281 if let Some(row) = rows.next().owe(Reason::from_rule())? {
282 map_row(row, &col_names)
283 } else {
284 debug_kdb!("[memdb] no row for sql");
285 Ok(Vec::new())
286 }
287}
288
289pub fn query_cached<P: Params>(
290 conn: &rusqlite::Connection,
291 sql: &str,
292 params: P,
293) -> KnowledgeResult<Vec<RowData>> {
294 let mut stmt = conn.prepare_cached(sql).owe(Reason::from_rule())?;
295 let col_names = extract_col_names_cached(&stmt, sql)?;
297 let mut rows = stmt.query(params).owe(Reason::from_rule())?;
298 let mut all_result = Vec::new();
299 while let Some(row) = rows.next().owe(Reason::from_rule())? {
300 all_result.push(map_row(row, &col_names)?);
301 }
302 Ok(all_result)
303}
304
305pub fn query_cached_with_scope<P: Params>(
306 conn: &rusqlite::Connection,
307 scope: &MetadataCacheScope,
308 provider_kind: Option<ProviderKind>,
309 sql: &str,
310 params: P,
311) -> KnowledgeResult<Vec<RowData>> {
312 let mut stmt = conn.prepare_cached(sql).owe(Reason::from_rule())?;
313 let col_names = extract_col_names_cached_with_scope(&stmt, scope, provider_kind, sql)?;
314 let mut rows = stmt.query(params).owe(Reason::from_rule())?;
315 let mut all_result = Vec::new();
316 while let Some(row) = rows.next().owe(Reason::from_rule())? {
317 all_result.push(map_row(row, &col_names)?);
318 }
319 Ok(all_result)
320}
321
322pub fn query_first_row_cached<P: Params>(
324 conn: &rusqlite::Connection,
325 sql: &str,
326 params: P,
327) -> KnowledgeResult<RowData> {
328 let mut stmt = conn.prepare_cached(sql).owe(Reason::from_rule())?;
329 let col_names = extract_col_names_cached(&stmt, sql)?;
330 let mut rows = stmt.query(params).owe(Reason::from_rule())?;
331 if let Some(row) = rows.next().owe(Reason::from_rule())? {
332 map_row(row, &col_names)
333 } else {
334 Ok(Vec::new())
335 }
336}
337
338pub fn query_first_row_cached_with_scope<P: Params>(
339 conn: &rusqlite::Connection,
340 scope: &MetadataCacheScope,
341 provider_kind: Option<ProviderKind>,
342 sql: &str,
343 params: P,
344) -> KnowledgeResult<RowData> {
345 let mut stmt = conn.prepare_cached(sql).owe(Reason::from_rule())?;
346 let col_names = extract_col_names_cached_with_scope(&stmt, scope, provider_kind, sql)?;
347 let mut rows = stmt.query(params).owe(Reason::from_rule())?;
348 if let Some(row) = rows.next().owe(Reason::from_rule())? {
349 map_row(row, &col_names)
350 } else {
351 Ok(Vec::new())
352 }
353}
354
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::Connection;

    /// Executes a parameterless statement, panicking if it fails.
    fn exec(db: &Connection, sql: &str) {
        db.execute(sql, []).unwrap();
    }

    /// Opens a fresh in-memory database containing the five-column `test` table.
    fn setup_test_db() -> Connection {
        let db = Connection::open_in_memory().unwrap();
        exec(
            &db,
            "CREATE TABLE test (id INTEGER, name TEXT, score REAL, data BLOB, empty)",
        );
        db
    }

    /// `query` returns nothing for an empty table and every row once populated.
    #[test]
    fn test_query_returns_all_rows() {
        let db = setup_test_db();
        let before = query(&db, "SELECT * FROM test", []).unwrap();
        assert!(before.is_empty());

        for insert in [
            "INSERT INTO test (id, name) VALUES (1, 'alice')",
            "INSERT INTO test (id, name) VALUES (2, 'bob')",
            "INSERT INTO test (id, name) VALUES (3, 'charlie')",
        ] {
            exec(&db, insert);
        }

        let after = query(&db, "SELECT id, name FROM test ORDER BY id", []).unwrap();
        assert_eq!(after.len(), 3);
    }

    /// `query_first_row` yields an empty row for no data, else only the first row.
    #[test]
    fn test_query_first_row_returns_single_row() {
        let db = setup_test_db();
        let empty = query_first_row(&db, "SELECT * FROM test", []).unwrap();
        assert!(empty.is_empty());

        exec(&db, "INSERT INTO test (id, name) VALUES (1, 'first')");
        exec(&db, "INSERT INTO test (id, name) VALUES (2, 'second')");

        let first = query_first_row(&db, "SELECT id, name FROM test ORDER BY id", []).unwrap();
        assert_eq!(first.len(), 2);
        assert_eq!(first[0].to_string(), "digit(1)");
        assert_eq!(first[1].to_string(), "chars(first)");
    }

    /// A row mixing INTEGER, TEXT, REAL, BLOB and NULL maps to five fields.
    #[test]
    fn test_map_row_handles_all_types() {
        let db = setup_test_db();
        exec(
            &db,
            "INSERT INTO test (id, name, score, data, empty) VALUES (42, 'hello', 3.14, X'414243', NULL)",
        );

        let fields =
            query_first_row(&db, "SELECT id, name, score, data, empty FROM test", []).unwrap();
        assert_eq!(fields.len(), 5);
    }

    /// Column aliases in the SQL become the field names.
    #[test]
    fn test_extract_col_names_preserves_aliases() {
        let db = setup_test_db();
        exec(&db, "INSERT INTO test (id, name) VALUES (1, 'x')");

        let aliased = query_first_row(
            &db,
            "SELECT id AS user_id, name AS user_name FROM test",
            [],
        )
        .unwrap();
        assert_eq!(aliased[0].get_name(), "user_id");
        assert_eq!(aliased[1].get_name(), "user_name");
    }

    /// After `query_cached` runs, the metadata cache holds the key for that SQL.
    #[test]
    fn test_query_cached_uses_cache() {
        let _guard = crate::runtime::runtime_test_guard()
            .lock()
            .expect("runtime test guard");
        let db = setup_test_db();
        exec(&db, "INSERT INTO test (id) VALUES (1)");

        let sql = "SELECT id FROM test WHERE id = 1";
        let _ = query_cached(&db, sql, []).unwrap();
        let second_run = query_cached(&db, sql, []).unwrap();
        assert_eq!(second_run.len(), 1);

        let expected_key = metadata_cache_key_for_current_scope(sql);
        let cache = COLNAME_CACHE.read().unwrap();
        assert!(cache.contains(&expected_key));
    }

    /// Positional parameters are bound into the query.
    #[test]
    fn test_query_with_params() {
        let db = setup_test_db();
        exec(&db, "INSERT INTO test (id, name) VALUES (1, 'alice')");
        exec(&db, "INSERT INTO test (id, name) VALUES (2, 'bob')");

        let matched = query(&db, "SELECT name FROM test WHERE id = ?1", [2]).unwrap();
        assert_eq!(matched.len(), 1);
        assert_eq!(matched[0][0].to_string(), "chars(bob)");
    }

    /// The same SQL under different scopes produces different cache keys.
    #[test]
    fn test_metadata_cache_key_for_scope_is_explicit() {
        let sql = "SELECT id FROM test";
        let first_scope = MetadataCacheScope {
            datasource_id: DatasourceId("postgres:aaaa".to_string()),
            generation: Generation(1),
        };
        let second_scope = MetadataCacheScope {
            datasource_id: DatasourceId("postgres:bbbb".to_string()),
            generation: Generation(2),
        };
        let first_key = metadata_cache_key_for_scope(&first_scope, sql);
        let second_key = metadata_cache_key_for_scope(&second_scope, sql);
        assert_ne!(first_key, second_key);
    }
}