Skip to main content

wp_knowledge/mem/
thread_clone.rs

1use std::cell::RefCell;
2use std::time::Duration;
3
4use crate::DBQuery;
5use crate::error::{KnowReason, KnowledgeResult};
6use crate::mem::RowData;
7use orion_error::conversion::{ErrorWith, SourceRawErr};
8use rusqlite::ToSql;
9use rusqlite::backup::Backup;
10use rusqlite::{Connection, Params};
11use wp_log::debug_kdb;
12use wp_model_core::model::DataField;
13
14use super::SqlNamedParam;
15use crate::loader::ProviderKind;
16use crate::runtime::{DatasourceId, Generation, MetadataCacheScope};
17
18thread_local! {
19    // clippy: use const init for thread_local value
20    static TLS_DB: RefCell<Option<ThreadLocalState>> = const { RefCell::new(None) };
21}
22
23struct ThreadLocalState {
24    authority_path: String,
25    generation: u64,
26    conn: Connection,
27}
28
29/// Thread-cloned read-only in-memory DB built from an authority file DB via SQLite backup API.
30/// Each thread lazily creates its own in-memory Connection (no cross-thread sharing).
31#[derive(Clone)]
32pub struct ThreadClonedMDB {
33    authority_path: String,
34    metadata_scope: MetadataCacheScope,
35}
36
37impl ThreadClonedMDB {
38    pub fn from_authority(path: &str) -> Self {
39        Self {
40            authority_path: path.to_string(),
41            metadata_scope: MetadataCacheScope {
42                datasource_id: DatasourceId("sqlite:standalone".to_string()),
43                generation: Generation(0),
44            },
45        }
46    }
47
48    pub fn from_authority_with_generation(path: &str, generation: u64) -> Self {
49        Self::from_authority_with_scope(
50            path,
51            DatasourceId("sqlite:standalone".to_string()),
52            generation,
53        )
54    }
55
56    pub fn from_authority_with_scope(
57        path: &str,
58        datasource_id: DatasourceId,
59        generation: u64,
60    ) -> Self {
61        Self {
62            authority_path: path.to_string(),
63            metadata_scope: MetadataCacheScope {
64                datasource_id,
65                generation: Generation(generation),
66            },
67        }
68    }
69
70    pub fn with_tls_conn<T, F: FnOnce(&Connection) -> KnowledgeResult<T>>(
71        &self,
72        f: F,
73    ) -> KnowledgeResult<T> {
74        let path = self.authority_path.clone();
75        let generation = self.metadata_scope.generation.0;
76        TLS_DB.with(|cell| {
77            // make sure a thread-local in-memory db exists
78            let should_rebuild = cell
79                .borrow()
80                .as_ref()
81                .map(|state| state.authority_path != path || state.generation != generation)
82                .unwrap_or(true);
83            if should_rebuild {
84                debug_kdb!(
85                    "[kdb] rebuild thread-local sqlite snapshot generation={} path={}",
86                    generation,
87                    path
88                );
89                // source: authority file; dest: in-memory
90                let src = Connection::open_with_flags(
91                    &path,
92                    rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
93                        | rusqlite::OpenFlags::SQLITE_OPEN_URI,
94                )
95                .source_raw_err(KnowReason::from_res(), "source error")
96                .doing("connect db")?;
97                let mut dst = Connection::open_in_memory()
98                    .source_raw_err(KnowReason::from_res(), "source error")
99                    .doing("oepn conn")?;
100                {
101                    let bk = Backup::new(&src, &mut dst)
102                        .source_raw_err(KnowReason::from_conf(), "source error")
103                        .doing("backup")?;
104                    // Copy all pages with small sleep to yield
105                    bk.run_to_completion(50, Duration::from_millis(0), None)
106                        .source_raw_err(KnowReason::from_res(), "source error")
107                        .doing("backup run")?;
108                }
109                // 为查询连接注册内置 UDF(只读场景也可用在 SQL/OML 查询中)
110                let _ = crate::sqlite_ext::register_builtin(&dst);
111                *cell.borrow_mut() = Some(ThreadLocalState {
112                    authority_path: path.clone(),
113                    generation,
114                    conn: dst,
115                });
116            }
117            // safe to unwrap since ensured above
118            let conn = cell.borrow();
119            f(&conn.as_ref().unwrap().conn)
120        })
121    }
122
123    pub fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
124        self.query_fields_with_scope(sql, params)
125    }
126
127    pub fn query_fields_with_scope(
128        &self,
129        sql: &str,
130        params: &[DataField],
131    ) -> KnowledgeResult<Vec<RowData>> {
132        self.with_tls_conn(|conn| {
133            let named_params = params
134                .iter()
135                .cloned()
136                .map(SqlNamedParam)
137                .collect::<Vec<_>>();
138            let refs: Vec<(&str, &dyn ToSql)> = named_params
139                .iter()
140                .map(|param| (param.0.get_name(), param as &dyn ToSql))
141                .collect();
142            super::query_util::query_cached_with_scope(
143                conn,
144                &self.metadata_scope,
145                Some(ProviderKind::SqliteAuthority),
146                sql,
147                refs.as_slice(),
148            )
149        })
150    }
151
152    pub fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
153        self.query_named_fields_with_scope(sql, params)
154    }
155
156    pub fn query_named_fields_with_scope(
157        &self,
158        sql: &str,
159        params: &[DataField],
160    ) -> KnowledgeResult<RowData> {
161        self.query_fields_with_scope(sql, params)
162            .map(|rows| rows.into_iter().next().unwrap_or_default())
163    }
164
165    pub fn query_with_scope(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
166        self.with_tls_conn(|conn| {
167            super::query_util::query_cached_with_scope(
168                conn,
169                &self.metadata_scope,
170                Some(ProviderKind::SqliteAuthority),
171                sql,
172                [],
173            )
174        })
175    }
176
177    pub fn query_row_with_scope(&self, sql: &str) -> KnowledgeResult<RowData> {
178        self.with_tls_conn(|conn| {
179            super::query_util::query_first_row_cached_with_scope(
180                conn,
181                &self.metadata_scope,
182                Some(ProviderKind::SqliteAuthority),
183                sql,
184                [],
185            )
186        })
187    }
188}
189
190impl DBQuery for ThreadClonedMDB {
191    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
192        self.with_tls_conn(|conn| super::query_util::query(conn, sql, []))
193    }
194    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
195        self.with_tls_conn(|conn| super::query_util::query_first_row(conn, sql, []))
196    }
197
198    fn query_row_params<P: Params>(&self, sql: &str, params: P) -> KnowledgeResult<RowData> {
199        self.with_tls_conn(|conn| super::query_util::query_first_row(conn, sql, params))
200    }
201
202    fn query_row_tdos<P: Params>(
203        &self,
204        _sql: &str,
205        _params: &[DataField; 2],
206    ) -> KnowledgeResult<RowData> {
207        // not used in current benchmarks
208        Ok(vec![])
209    }
210}