Skip to main content

wp_knowledge/mem/
thread_clone.rs

1use std::cell::RefCell;
2use std::time::Duration;
3
4use crate::DBQuery;
5use crate::error::{KnowledgeResult, Reason};
6use crate::mem::RowData;
7use orion_error::ErrorWith;
8use orion_error::UvsFrom;
9use orion_error::compat_traits::ErrorOweBase;
10use rusqlite::ToSql;
11use rusqlite::backup::Backup;
12use rusqlite::{Connection, Params};
13use wp_log::debug_kdb;
14use wp_model_core::model::DataField;
15
16use super::SqlNamedParam;
17use crate::loader::ProviderKind;
18use crate::runtime::{DatasourceId, Generation, MetadataCacheScope};
19
20thread_local! {
21    // clippy: use const init for thread_local value
22    static TLS_DB: RefCell<Option<ThreadLocalState>> = const { RefCell::new(None) };
23}
24
25struct ThreadLocalState {
26    authority_path: String,
27    generation: u64,
28    conn: Connection,
29}
30
31/// Thread-cloned read-only in-memory DB built from an authority file DB via SQLite backup API.
32/// Each thread lazily creates its own in-memory Connection (no cross-thread sharing).
33#[derive(Clone)]
34pub struct ThreadClonedMDB {
35    authority_path: String,
36    metadata_scope: MetadataCacheScope,
37}
38
39impl ThreadClonedMDB {
40    pub fn from_authority(path: &str) -> Self {
41        Self {
42            authority_path: path.to_string(),
43            metadata_scope: MetadataCacheScope {
44                datasource_id: DatasourceId("sqlite:standalone".to_string()),
45                generation: Generation(0),
46            },
47        }
48    }
49
50    pub fn from_authority_with_generation(path: &str, generation: u64) -> Self {
51        Self::from_authority_with_scope(
52            path,
53            DatasourceId("sqlite:standalone".to_string()),
54            generation,
55        )
56    }
57
58    pub fn from_authority_with_scope(
59        path: &str,
60        datasource_id: DatasourceId,
61        generation: u64,
62    ) -> Self {
63        Self {
64            authority_path: path.to_string(),
65            metadata_scope: MetadataCacheScope {
66                datasource_id,
67                generation: Generation(generation),
68            },
69        }
70    }
71
72    pub fn with_tls_conn<T, F: FnOnce(&Connection) -> KnowledgeResult<T>>(
73        &self,
74        f: F,
75    ) -> KnowledgeResult<T> {
76        let path = self.authority_path.clone();
77        let generation = self.metadata_scope.generation.0;
78        TLS_DB.with(|cell| {
79            // make sure a thread-local in-memory db exists
80            let should_rebuild = cell
81                .borrow()
82                .as_ref()
83                .map(|state| state.authority_path != path || state.generation != generation)
84                .unwrap_or(true);
85            if should_rebuild {
86                debug_kdb!(
87                    "[kdb] rebuild thread-local sqlite snapshot generation={} path={}",
88                    generation,
89                    path
90                );
91                // source: authority file; dest: in-memory
92                let src = Connection::open_with_flags(
93                    &path,
94                    rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
95                        | rusqlite::OpenFlags::SQLITE_OPEN_URI,
96                )
97                .owe(Reason::from_res())
98                .doing("connect db")?;
99                let mut dst = Connection::open_in_memory()
100                    .owe(Reason::from_res())
101                    .doing("oepn conn")?;
102                {
103                    let bk = Backup::new(&src, &mut dst)
104                        .owe(Reason::from_conf())
105                        .doing("backup")?;
106                    // Copy all pages with small sleep to yield
107                    bk.run_to_completion(50, Duration::from_millis(0), None)
108                        .owe(Reason::from_res())
109                        .doing("backup run")?;
110                }
111                // 为查询连接注册内置 UDF(只读场景也可用在 SQL/OML 查询中)
112                let _ = crate::sqlite_ext::register_builtin(&dst);
113                *cell.borrow_mut() = Some(ThreadLocalState {
114                    authority_path: path.clone(),
115                    generation,
116                    conn: dst,
117                });
118            }
119            // safe to unwrap since ensured above
120            let conn = cell.borrow();
121            f(&conn.as_ref().unwrap().conn)
122        })
123    }
124
125    pub fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
126        self.query_fields_with_scope(sql, params)
127    }
128
129    pub fn query_fields_with_scope(
130        &self,
131        sql: &str,
132        params: &[DataField],
133    ) -> KnowledgeResult<Vec<RowData>> {
134        self.with_tls_conn(|conn| {
135            let named_params = params
136                .iter()
137                .cloned()
138                .map(SqlNamedParam)
139                .collect::<Vec<_>>();
140            let refs: Vec<(&str, &dyn ToSql)> = named_params
141                .iter()
142                .map(|param| (param.0.get_name(), param as &dyn ToSql))
143                .collect();
144            super::query_util::query_cached_with_scope(
145                conn,
146                &self.metadata_scope,
147                Some(ProviderKind::SqliteAuthority),
148                sql,
149                refs.as_slice(),
150            )
151        })
152    }
153
154    pub fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
155        self.query_named_fields_with_scope(sql, params)
156    }
157
158    pub fn query_named_fields_with_scope(
159        &self,
160        sql: &str,
161        params: &[DataField],
162    ) -> KnowledgeResult<RowData> {
163        self.query_fields_with_scope(sql, params)
164            .map(|rows| rows.into_iter().next().unwrap_or_default())
165    }
166
167    pub fn query_with_scope(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
168        self.with_tls_conn(|conn| {
169            super::query_util::query_cached_with_scope(
170                conn,
171                &self.metadata_scope,
172                Some(ProviderKind::SqliteAuthority),
173                sql,
174                [],
175            )
176        })
177    }
178
179    pub fn query_row_with_scope(&self, sql: &str) -> KnowledgeResult<RowData> {
180        self.with_tls_conn(|conn| {
181            super::query_util::query_first_row_cached_with_scope(
182                conn,
183                &self.metadata_scope,
184                Some(ProviderKind::SqliteAuthority),
185                sql,
186                [],
187            )
188        })
189    }
190}
191
192impl DBQuery for ThreadClonedMDB {
193    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
194        self.with_tls_conn(|conn| super::query_util::query(conn, sql, []))
195    }
196    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
197        self.with_tls_conn(|conn| super::query_util::query_first_row(conn, sql, []))
198    }
199
200    fn query_row_params<P: Params>(&self, sql: &str, params: P) -> KnowledgeResult<RowData> {
201        self.with_tls_conn(|conn| super::query_util::query_first_row(conn, sql, params))
202    }
203
204    fn query_row_tdos<P: Params>(
205        &self,
206        _sql: &str,
207        _params: &[DataField; 2],
208    ) -> KnowledgeResult<RowData> {
209        // not used in current benchmarks
210        Ok(vec![])
211    }
212}