Skip to main content

wp_knowledge/mem/
query_util.rs

1use crate::error::{KnowledgeResult, Reason};
2use orion_error::UvsFrom;
3use orion_error::compat_traits::ErrorOweBase;
4use rusqlite::Params;
5use std::collections::hash_map::DefaultHasher;
6use std::future::Future;
7use std::hash::{Hash, Hasher};
8use std::num::NonZeroUsize;
9use std::sync::{Arc, RwLock};
10use wp_log::debug_kdb;
11use wp_model_core::model::{self, DataField};
12
13use lazy_static::lazy_static;
14use lru::LruCache;
15
16use crate::loader::ProviderKind;
17use crate::mem::RowData;
18use crate::runtime::{DatasourceId, Generation, MetadataCacheScope, runtime};
19use crate::telemetry::{
20    CacheLayer, CacheOutcome, CacheTelemetryEvent, telemetry, telemetry_enabled,
21};
22
23#[derive(Debug, Clone, PartialEq, Eq, Hash)]
24pub struct MetadataCacheKey {
25    pub datasource_id: DatasourceId,
26    pub generation: Generation,
27    pub query_hash: u64,
28}
29
30lazy_static! {
31    /// Global column metadata cache keyed by datasource/generation/query hash.
32    pub static ref COLNAME_CACHE: RwLock<LruCache<MetadataCacheKey, Arc<Vec<String>>>> =
33        RwLock::new(LruCache::new(
34            NonZeroUsize::new(512).expect("non-zero metadata cache size")
35        ));
36}
37
38pub fn column_metadata_cache_snapshot() -> (usize, usize) {
39    COLNAME_CACHE
40        .read()
41        .map(|cache| (cache.len(), cache.cap().get()))
42        .unwrap_or((0, 0))
43}
44
/// Hashes `value` with the standard library's `DefaultHasher`.
///
/// `DefaultHasher::new()` always starts from the same state, so equal strings hash equally
/// within a build. The algorithm is not guaranteed across Rust releases, which is acceptable
/// for an in-process cache key.
fn stable_hash(value: &str) -> u64 {
    let mut state = DefaultHasher::new();
    Hash::hash(value, &mut state);
    state.finish()
}
50
/// Test helper: the cache key `sql` would receive under the runtime's current metadata scope.
#[cfg(test)]
pub(crate) fn metadata_cache_key_for_current_scope(sql: &str) -> MetadataCacheKey {
    let active_scope = runtime().current_metadata_scope();
    metadata_cache_key_for_scope(&active_scope, sql)
}
56
57pub(crate) fn metadata_cache_key_for_scope(
58    scope: &MetadataCacheScope,
59    sql: &str,
60) -> MetadataCacheKey {
61    MetadataCacheKey {
62        datasource_id: scope.datasource_id.clone(),
63        generation: scope.generation,
64        query_hash: stable_hash(sql),
65    }
66}
67
68pub(crate) fn metadata_cache_get_or_try_init<F>(sql: &str, load: F) -> KnowledgeResult<Vec<String>>
69where
70    F: FnOnce() -> KnowledgeResult<Option<Vec<String>>>,
71{
72    let scope = runtime().current_metadata_scope();
73    let provider_kind = runtime().current_provider_kind();
74    metadata_cache_get_or_try_init_for_scope(&scope, provider_kind, sql, load)
75}
76
77pub(crate) fn metadata_cache_get_or_try_init_for_scope<F>(
78    scope: &MetadataCacheScope,
79    provider_kind: Option<ProviderKind>,
80    sql: &str,
81    load: F,
82) -> KnowledgeResult<Vec<String>>
83where
84    F: FnOnce() -> KnowledgeResult<Option<Vec<String>>>,
85{
86    let cache_key = metadata_cache_key_for_scope(scope, sql);
87    if let Some(names) = COLNAME_CACHE
88        .read()
89        .ok()
90        .and_then(|m| m.peek(&cache_key).cloned())
91    {
92        runtime().record_metadata_cache_hit();
93        if telemetry_enabled() {
94            telemetry().on_cache(&CacheTelemetryEvent {
95                layer: CacheLayer::Metadata,
96                outcome: CacheOutcome::Hit,
97                provider_kind: provider_kind.clone(),
98            });
99        }
100        debug_kdb!(
101            "[kdb] metadata cache hit datasource_id={} generation={}",
102            cache_key.datasource_id.0,
103            cache_key.generation.0
104        );
105        return Ok((*names).clone());
106    }
107
108    runtime().record_metadata_cache_miss();
109    if telemetry_enabled() {
110        telemetry().on_cache(&CacheTelemetryEvent {
111            layer: CacheLayer::Metadata,
112            outcome: CacheOutcome::Miss,
113            provider_kind,
114        });
115    }
116    debug_kdb!(
117        "[kdb] metadata cache miss datasource_id={} generation={}",
118        cache_key.datasource_id.0,
119        cache_key.generation.0
120    );
121
122    let Some(names) = load()? else {
123        return Ok(Vec::new());
124    };
125    if let Ok(mut m) = COLNAME_CACHE.write() {
126        m.put(cache_key, Arc::new(names.clone()));
127    }
128    Ok(names)
129}
130
131pub(crate) async fn metadata_cache_get_or_try_init_async_for_scope<F, Fut>(
132    scope: &MetadataCacheScope,
133    provider_kind: Option<ProviderKind>,
134    sql: &str,
135    load: F,
136) -> KnowledgeResult<Vec<String>>
137where
138    F: FnOnce() -> Fut,
139    Fut: Future<Output = KnowledgeResult<Option<Vec<String>>>>,
140{
141    metadata_cache_get_or_try_init_async_for_scope_typed(scope, provider_kind, sql, load).await
142}
143
144pub(crate) async fn metadata_cache_get_or_try_init_async_for_scope_typed<F, Fut, E>(
145    scope: &MetadataCacheScope,
146    provider_kind: Option<ProviderKind>,
147    sql: &str,
148    load: F,
149) -> Result<Vec<String>, E>
150where
151    F: FnOnce() -> Fut,
152    Fut: Future<Output = Result<Option<Vec<String>>, E>>,
153{
154    let cache_key = metadata_cache_key_for_scope(scope, sql);
155    if let Some(names) = COLNAME_CACHE
156        .read()
157        .ok()
158        .and_then(|m| m.peek(&cache_key).cloned())
159    {
160        runtime().record_metadata_cache_hit();
161        if telemetry_enabled() {
162            telemetry().on_cache(&CacheTelemetryEvent {
163                layer: CacheLayer::Metadata,
164                outcome: CacheOutcome::Hit,
165                provider_kind: provider_kind.clone(),
166            });
167        }
168        return Ok((*names).clone());
169    }
170
171    runtime().record_metadata_cache_miss();
172    if telemetry_enabled() {
173        telemetry().on_cache(&CacheTelemetryEvent {
174            layer: CacheLayer::Metadata,
175            outcome: CacheOutcome::Miss,
176            provider_kind,
177        });
178    }
179
180    let Some(names) = load().await? else {
181        return Ok(Vec::new());
182    };
183    if let Ok(mut m) = COLNAME_CACHE.write() {
184        m.put(cache_key, Arc::new(names.clone()));
185    }
186    Ok(names)
187}
188
189/// 将一行数据映射为 RowData
190fn map_row(row: &rusqlite::Row<'_>, col_names: &[String]) -> KnowledgeResult<RowData> {
191    let mut result = Vec::with_capacity(col_names.len());
192    for (i, col_name) in col_names.iter().enumerate() {
193        let value = row.get_ref(i).owe(Reason::from_rule())?;
194        let field = match value {
195            rusqlite::types::ValueRef::Null => {
196                DataField::new(model::DataType::default(), col_name, model::Value::Null)
197            }
198            rusqlite::types::ValueRef::Integer(v) => DataField::from_digit(col_name, v),
199            rusqlite::types::ValueRef::Real(v) => DataField::from_float(col_name, v),
200            rusqlite::types::ValueRef::Text(v) => DataField::from_chars(
201                col_name,
202                String::from_utf8(v.to_vec()).owe(Reason::from_rule())?,
203            ),
204            rusqlite::types::ValueRef::Blob(v) => {
205                DataField::from_chars(col_name, String::from_utf8_lossy(v).to_string())
206            }
207        };
208        result.push(field);
209    }
210    Ok(result)
211}
212
213/// 从 statement 获取列名(普通版,带 debug 日志)
214fn extract_col_names(stmt: &rusqlite::Statement<'_>) -> Vec<String> {
215    let col_cnt = stmt.column_count();
216    debug_kdb!("[memdb] col_cnt={}", col_cnt);
217    let mut col_names = Vec::with_capacity(col_cnt);
218    for i in 0..col_cnt {
219        let name = stmt.column_name(i).unwrap_or("").to_string();
220        debug_kdb!("[memdb] col[{}] name='{}'", i, name);
221        col_names.push(name);
222    }
223    col_names
224}
225
226/// 从 statement 获取列名(cached 版,使用全局缓存)
227fn extract_col_names_cached(
228    stmt: &rusqlite::Statement<'_>,
229    sql: &str,
230) -> KnowledgeResult<Vec<String>> {
231    metadata_cache_get_or_try_init(sql, || {
232        let col_cnt = stmt.column_count();
233        let mut names = Vec::with_capacity(col_cnt);
234        for i in 0..col_cnt {
235            names.push(stmt.column_name(i).owe(Reason::from_rule())?.to_string());
236        }
237        Ok(Some(names))
238    })
239}
240
241fn extract_col_names_cached_with_scope(
242    stmt: &rusqlite::Statement<'_>,
243    scope: &MetadataCacheScope,
244    provider_kind: Option<ProviderKind>,
245    sql: &str,
246) -> KnowledgeResult<Vec<String>> {
247    metadata_cache_get_or_try_init_for_scope(scope, provider_kind, sql, || {
248        let col_cnt = stmt.column_count();
249        let mut names = Vec::with_capacity(col_cnt);
250        for i in 0..col_cnt {
251            names.push(stmt.column_name(i).owe(Reason::from_rule())?.to_string());
252        }
253        Ok(Some(names))
254    })
255}
256
257pub fn query<P: Params>(
258    conn: &rusqlite::Connection,
259    sql: &str,
260    params: P,
261) -> KnowledgeResult<Vec<RowData>> {
262    let mut stmt = conn.prepare_cached(sql).owe(Reason::from_rule())?;
263    let col_names = extract_col_names(&stmt);
264    let mut rows = stmt.query(params).owe(Reason::from_rule())?;
265    let mut all_result = Vec::new();
266    while let Some(row) = rows.next().owe(Reason::from_rule())? {
267        all_result.push(map_row(row, &col_names)?);
268    }
269    Ok(all_result)
270}
271
272/// Query first row and map columns into RowData with column names preserved.
273pub fn query_first_row<P: Params>(
274    conn: &rusqlite::Connection,
275    sql: &str,
276    params: P,
277) -> KnowledgeResult<RowData> {
278    let mut stmt = conn.prepare_cached(sql).owe(Reason::from_rule())?;
279    let col_names = extract_col_names(&stmt);
280    let mut rows = stmt.query(params).owe(Reason::from_rule())?;
281    if let Some(row) = rows.next().owe(Reason::from_rule())? {
282        map_row(row, &col_names)
283    } else {
284        debug_kdb!("[memdb] no row for sql");
285        Ok(Vec::new())
286    }
287}
288
289pub fn query_cached<P: Params>(
290    conn: &rusqlite::Connection,
291    sql: &str,
292    params: P,
293) -> KnowledgeResult<Vec<RowData>> {
294    let mut stmt = conn.prepare_cached(sql).owe(Reason::from_rule())?;
295    // Column names cache (per SQL)
296    let col_names = extract_col_names_cached(&stmt, sql)?;
297    let mut rows = stmt.query(params).owe(Reason::from_rule())?;
298    let mut all_result = Vec::new();
299    while let Some(row) = rows.next().owe(Reason::from_rule())? {
300        all_result.push(map_row(row, &col_names)?);
301    }
302    Ok(all_result)
303}
304
305pub fn query_cached_with_scope<P: Params>(
306    conn: &rusqlite::Connection,
307    scope: &MetadataCacheScope,
308    provider_kind: Option<ProviderKind>,
309    sql: &str,
310    params: P,
311) -> KnowledgeResult<Vec<RowData>> {
312    let mut stmt = conn.prepare_cached(sql).owe(Reason::from_rule())?;
313    let col_names = extract_col_names_cached_with_scope(&stmt, scope, provider_kind, sql)?;
314    let mut rows = stmt.query(params).owe(Reason::from_rule())?;
315    let mut all_result = Vec::new();
316    while let Some(row) = rows.next().owe(Reason::from_rule())? {
317        all_result.push(map_row(row, &col_names)?);
318    }
319    Ok(all_result)
320}
321
322/// Same as `query_first_row` but with a shared column-names cache to reduce metadata lookups.
323pub fn query_first_row_cached<P: Params>(
324    conn: &rusqlite::Connection,
325    sql: &str,
326    params: P,
327) -> KnowledgeResult<RowData> {
328    let mut stmt = conn.prepare_cached(sql).owe(Reason::from_rule())?;
329    let col_names = extract_col_names_cached(&stmt, sql)?;
330    let mut rows = stmt.query(params).owe(Reason::from_rule())?;
331    if let Some(row) = rows.next().owe(Reason::from_rule())? {
332        map_row(row, &col_names)
333    } else {
334        Ok(Vec::new())
335    }
336}
337
338pub fn query_first_row_cached_with_scope<P: Params>(
339    conn: &rusqlite::Connection,
340    scope: &MetadataCacheScope,
341    provider_kind: Option<ProviderKind>,
342    sql: &str,
343    params: P,
344) -> KnowledgeResult<RowData> {
345    let mut stmt = conn.prepare_cached(sql).owe(Reason::from_rule())?;
346    let col_names = extract_col_names_cached_with_scope(&stmt, scope, provider_kind, sql)?;
347    let mut rows = stmt.query(params).owe(Reason::from_rule())?;
348    if let Some(row) = rows.next().owe(Reason::from_rule())? {
349        map_row(row, &col_names)
350    } else {
351        Ok(Vec::new())
352    }
353}
354
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::Connection;

    /// In-memory database with one `test` table: typed INTEGER/TEXT/REAL/BLOB columns plus an
    /// untyped `empty` column.
    fn setup_test_db() -> Connection {
        let conn = Connection::open_in_memory().unwrap();
        conn.execute(
            "CREATE TABLE test (id INTEGER, name TEXT, score REAL, data BLOB, empty)",
            [],
        )
        .unwrap();
        conn
    }

    #[test]
    fn test_query_returns_all_rows() {
        let conn = setup_test_db();
        let rows = query(&conn, "SELECT * FROM test", []).unwrap();
        assert!(rows.is_empty());
        conn.execute("INSERT INTO test (id, name) VALUES (1, 'alice')", [])
            .unwrap();
        conn.execute("INSERT INTO test (id, name) VALUES (2, 'bob')", [])
            .unwrap();
        conn.execute("INSERT INTO test (id, name) VALUES (3, 'charlie')", [])
            .unwrap();

        let rows = query(&conn, "SELECT id, name FROM test ORDER BY id", []).unwrap();
        assert_eq!(rows.len(), 3);
    }

    #[test]
    fn test_query_first_row_returns_single_row() {
        let conn = setup_test_db();
        let row = query_first_row(&conn, "SELECT * FROM test", []).unwrap();
        assert!(row.is_empty());
        conn.execute("INSERT INTO test (id, name) VALUES (1, 'first')", [])
            .unwrap();
        conn.execute("INSERT INTO test (id, name) VALUES (2, 'second')", [])
            .unwrap();

        let row = query_first_row(&conn, "SELECT id, name FROM test ORDER BY id", []).unwrap();
        assert_eq!(row.len(), 2);
        assert_eq!(row[0].to_string(), "digit(1)");
        assert_eq!(row[1].to_string(), "chars(first)");
    }

    #[test]
    fn test_map_row_handles_all_types() {
        let conn = setup_test_db();
        conn.execute(
            "INSERT INTO test (id, name, score, data, empty) VALUES (42, 'hello', 3.14, X'414243', NULL)",
            [],
        )
        .unwrap();

        let row =
            query_first_row(&conn, "SELECT id, name, score, data, empty FROM test", []).unwrap();
        assert_eq!(row.len(), 5);
        // Column names are preserved in SELECT order.
        assert_eq!(row[0].get_name(), "id");
        assert_eq!(row[1].get_name(), "name");
        assert_eq!(row[2].get_name(), "score");
        assert_eq!(row[3].get_name(), "data");
        assert_eq!(row[4].get_name(), "empty");
        // INTEGER -> digit, TEXT -> chars.
        assert_eq!(row[0].to_string(), "digit(42)");
        assert_eq!(row[1].to_string(), "chars(hello)");
        // BLOBs are decoded lossily into text: X'414243' is b"ABC".
        assert_eq!(row[3].to_string(), "chars(ABC)");
    }

    #[test]
    fn test_extract_col_names_preserves_aliases() {
        let conn = setup_test_db();
        conn.execute("INSERT INTO test (id, name) VALUES (1, 'x')", [])
            .unwrap();

        let row = query_first_row(
            &conn,
            "SELECT id AS user_id, name AS user_name FROM test",
            [],
        )
        .unwrap();
        assert_eq!(row[0].get_name(), "user_id");
        assert_eq!(row[1].get_name(), "user_name");
    }

    #[test]
    fn test_query_cached_uses_cache() {
        let _guard = crate::runtime::runtime_test_guard()
            .lock()
            .expect("runtime test guard");
        let conn = setup_test_db();
        conn.execute("INSERT INTO test (id) VALUES (1)", [])
            .unwrap();

        let sql = "SELECT id FROM test WHERE id = 1";
        // First query populates the metadata cache.
        let _ = query_cached(&conn, sql, []).unwrap();
        // Second query should be served from the cache.
        let rows = query_cached(&conn, sql, []).unwrap();
        assert_eq!(rows.len(), 1);

        // The entry for the current scope must now be present.
        let key = metadata_cache_key_for_current_scope(sql);
        let cache = COLNAME_CACHE.read().unwrap();
        assert!(cache.contains(&key));
    }

    #[test]
    fn test_query_cached_with_scope_populates_scoped_key() {
        let _guard = crate::runtime::runtime_test_guard()
            .lock()
            .expect("runtime test guard");
        let conn = setup_test_db();
        conn.execute("INSERT INTO test (id, name) VALUES (7, 'scoped')", [])
            .unwrap();

        let scope = MetadataCacheScope {
            datasource_id: DatasourceId("test:query_cached_with_scope".to_string()),
            generation: Generation(1),
        };
        let sql = "SELECT id, name FROM test WHERE id = 7";

        let rows = query_cached_with_scope(&conn, &scope, None, sql, []).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0][1].to_string(), "chars(scoped)");

        // Second call goes through the cached column names.
        let first = query_first_row_cached_with_scope(&conn, &scope, None, sql, []).unwrap();
        assert_eq!(first[0].to_string(), "digit(7)");

        let key = metadata_cache_key_for_scope(&scope, sql);
        assert!(COLNAME_CACHE.read().unwrap().contains(&key));
    }

    #[test]
    fn test_query_with_params() {
        let conn = setup_test_db();
        conn.execute("INSERT INTO test (id, name) VALUES (1, 'alice')", [])
            .unwrap();
        conn.execute("INSERT INTO test (id, name) VALUES (2, 'bob')", [])
            .unwrap();

        let rows = query(&conn, "SELECT name FROM test WHERE id = ?1", [2]).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0][0].to_string(), "chars(bob)");
    }

    #[test]
    fn test_metadata_cache_key_for_scope_is_explicit() {
        let sql = "SELECT id FROM test";
        let scope_a = MetadataCacheScope {
            datasource_id: DatasourceId("postgres:aaaa".to_string()),
            generation: Generation(1),
        };
        let scope_b = MetadataCacheScope {
            datasource_id: DatasourceId("postgres:bbbb".to_string()),
            generation: Generation(2),
        };
        let key_a = metadata_cache_key_for_scope(&scope_a, sql);
        let key_b = metadata_cache_key_for_scope(&scope_b, sql);
        assert_ne!(key_a, key_b);
        // Same scope and SQL must always produce the same key.
        assert_eq!(key_a, metadata_cache_key_for_scope(&scope_a, sql));
        // Different SQL under the same scope must not share a query hash.
        let other = metadata_cache_key_for_scope(&scope_a, "SELECT name FROM test");
        assert_ne!(key_a.query_hash, other.query_hash);
    }
}