Skip to main content

wp_knowledge/mem/
thread_clone.rs

1use std::cell::RefCell;
2use std::time::Duration;
3
4use crate::DBQuery;
5use crate::mem::RowData;
6use orion_error::{ErrorOwe, ErrorWith};
7use rusqlite::ToSql;
8use rusqlite::backup::Backup;
9use rusqlite::{Connection, Params};
10use wp_error::KnowledgeResult;
11use wp_log::debug_kdb;
12use wp_model_core::model::DataField;
13
14use super::SqlNamedParam;
15use crate::loader::ProviderKind;
16use crate::runtime::{DatasourceId, Generation, MetadataCacheScope};
17
thread_local! {
    // Per-thread slot for this thread's in-memory SQLite snapshot. It starts as `None` and is
    // filled (or replaced) by `ThreadClonedMDB::with_tls_conn` when no snapshot exists yet or
    // the stored path/generation differs from the caller's.
    // clippy: use const init for thread_local value
    static TLS_DB: RefCell<Option<ThreadLocalState>> = const { RefCell::new(None) };
}
22
/// Snapshot cached in the thread-local slot, together with the key it was built for.
struct ThreadLocalState {
    /// Authority database path the snapshot was copied from.
    authority_path: String,
    /// Generation number the snapshot was built for; a mismatch triggers a rebuild.
    generation: u64,
    /// In-memory connection holding the copied database.
    conn: Connection,
}
28
/// Thread-cloned read-only in-memory DB built from an authority file DB via SQLite backup API.
/// Each thread lazily creates its own in-memory Connection (no cross-thread sharing).
#[derive(Clone)]
pub struct ThreadClonedMDB {
    /// Path of the authority database; opened read-only with URI handling enabled
    /// whenever a thread needs to (re)build its snapshot.
    authority_path: String,
    /// Datasource id and generation passed to the scoped/cached query helpers.
    /// The generation is also compared against the thread-local snapshot to decide
    /// whether it must be rebuilt.
    metadata_scope: MetadataCacheScope,
}
36
37impl ThreadClonedMDB {
38    pub fn from_authority(path: &str) -> Self {
39        Self {
40            authority_path: path.to_string(),
41            metadata_scope: MetadataCacheScope {
42                datasource_id: DatasourceId("sqlite:standalone".to_string()),
43                generation: Generation(0),
44            },
45        }
46    }
47
48    pub fn from_authority_with_generation(path: &str, generation: u64) -> Self {
49        Self::from_authority_with_scope(
50            path,
51            DatasourceId("sqlite:standalone".to_string()),
52            generation,
53        )
54    }
55
56    pub fn from_authority_with_scope(
57        path: &str,
58        datasource_id: DatasourceId,
59        generation: u64,
60    ) -> Self {
61        Self {
62            authority_path: path.to_string(),
63            metadata_scope: MetadataCacheScope {
64                datasource_id,
65                generation: Generation(generation),
66            },
67        }
68    }
69
70    pub fn with_tls_conn<T, F: FnOnce(&Connection) -> KnowledgeResult<T>>(
71        &self,
72        f: F,
73    ) -> KnowledgeResult<T> {
74        let path = self.authority_path.clone();
75        let generation = self.metadata_scope.generation.0;
76        TLS_DB.with(|cell| {
77            // make sure a thread-local in-memory db exists
78            let should_rebuild = cell
79                .borrow()
80                .as_ref()
81                .map(|state| state.authority_path != path || state.generation != generation)
82                .unwrap_or(true);
83            if should_rebuild {
84                debug_kdb!(
85                    "[kdb] rebuild thread-local sqlite snapshot generation={} path={}",
86                    generation,
87                    path
88                );
89                // source: authority file; dest: in-memory
90                let src = Connection::open_with_flags(
91                    &path,
92                    rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
93                        | rusqlite::OpenFlags::SQLITE_OPEN_URI,
94                )
95                .owe_res()
96                .want("connect db")?;
97                let mut dst = Connection::open_in_memory().owe_res().want("oepn conn")?;
98                {
99                    let bk = Backup::new(&src, &mut dst).owe_conf().want("backup")?;
100                    // Copy all pages with small sleep to yield
101                    bk.run_to_completion(50, Duration::from_millis(0), None)
102                        .owe_res()
103                        .want("backup run")?;
104                }
105                // 为查询连接注册内置 UDF(只读场景也可用在 SQL/OML 查询中)
106                let _ = crate::sqlite_ext::register_builtin(&dst);
107                *cell.borrow_mut() = Some(ThreadLocalState {
108                    authority_path: path.clone(),
109                    generation,
110                    conn: dst,
111                });
112            }
113            // safe to unwrap since ensured above
114            let conn = cell.borrow();
115            f(&conn.as_ref().unwrap().conn)
116        })
117    }
118
119    pub fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
120        self.query_fields_with_scope(sql, params)
121    }
122
123    pub fn query_fields_with_scope(
124        &self,
125        sql: &str,
126        params: &[DataField],
127    ) -> KnowledgeResult<Vec<RowData>> {
128        self.with_tls_conn(|conn| {
129            let named_params = params
130                .iter()
131                .cloned()
132                .map(SqlNamedParam)
133                .collect::<Vec<_>>();
134            let refs: Vec<(&str, &dyn ToSql)> = named_params
135                .iter()
136                .map(|param| (param.0.get_name(), param as &dyn ToSql))
137                .collect();
138            super::query_util::query_cached_with_scope(
139                conn,
140                &self.metadata_scope,
141                Some(ProviderKind::SqliteAuthority),
142                sql,
143                refs.as_slice(),
144            )
145        })
146    }
147
148    pub fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
149        self.query_named_fields_with_scope(sql, params)
150    }
151
152    pub fn query_named_fields_with_scope(
153        &self,
154        sql: &str,
155        params: &[DataField],
156    ) -> KnowledgeResult<RowData> {
157        self.query_fields_with_scope(sql, params)
158            .map(|rows| rows.into_iter().next().unwrap_or_default())
159    }
160
161    pub fn query_with_scope(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
162        self.with_tls_conn(|conn| {
163            super::query_util::query_cached_with_scope(
164                conn,
165                &self.metadata_scope,
166                Some(ProviderKind::SqliteAuthority),
167                sql,
168                [],
169            )
170        })
171    }
172
173    pub fn query_row_with_scope(&self, sql: &str) -> KnowledgeResult<RowData> {
174        self.with_tls_conn(|conn| {
175            super::query_util::query_first_row_cached_with_scope(
176                conn,
177                &self.metadata_scope,
178                Some(ProviderKind::SqliteAuthority),
179                sql,
180                [],
181            )
182        })
183    }
184}
185
186impl DBQuery for ThreadClonedMDB {
187    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
188        self.with_tls_conn(|conn| super::query_util::query(conn, sql, []))
189    }
190    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
191        self.with_tls_conn(|conn| super::query_util::query_first_row(conn, sql, []))
192    }
193
194    fn query_row_params<P: Params>(&self, sql: &str, params: P) -> KnowledgeResult<RowData> {
195        self.with_tls_conn(|conn| super::query_util::query_first_row(conn, sql, params))
196    }
197
198    fn query_row_tdos<P: Params>(
199        &self,
200        _sql: &str,
201        _params: &[DataField; 2],
202    ) -> KnowledgeResult<RowData> {
203        // not used in current benchmarks
204        Ok(vec![])
205    }
206}