// we_trust_sqlite/reader/mod.rs

1use std::sync::Arc;
2use tokio::{
3    fs::File,
4    io::{AsyncReadExt, AsyncSeekExt, SeekFrom},
5    sync::RwLock,
6};
7use yykv_types::{DsError, DsValue};
8
9type Result<T> = std::result::Result<T, DsError>;
10
11pub mod btree;
12pub mod cache;
13pub mod schema;
14pub mod wal;
15
16pub use btree::{BTreeCell, BTreePage, BTreePageHeader};
17pub use cache::PageCache;
18pub use schema::TableSchema;
19pub use wal::{WalFrameHeader, WalReader};
20
21#[derive(Debug, Clone)]
22pub struct Row {
23    pub row_id: i64,
24    pub values: Vec<DsValue>,
25}
26
/// Magic string expected in the first 16 bytes of an SQLite 3 database file (NUL-terminated).
pub const SQLITE_MAGIC: &[u8; 16] = b"SQLite format 3\x00";
29
/// The subset of the SQLite database-file header that this reader uses.
#[derive(Clone, Copy, Debug)]
pub struct SqliteHeader {
    /// Page size in bytes; the on-disk value 1 is decoded as 65536.
    pub page_size: u32,
    /// File-format write version (header byte 18).
    pub write_version: u8,
    /// File-format read version (header byte 19).
    pub read_version: u8,
    /// Reserved-space byte count (header byte 20).
    pub reserved_space: u8,
}
38
39/// 原生 SQLite 读取器
40pub struct NativeReader {
41    pub(crate) path: String,
42    header: RwLock<Option<SqliteHeader>>,
43    cache: Arc<PageCache>,
44    wal_reader: RwLock<Option<Arc<WalReader>>>,
45}
46
47impl NativeReader {
48    pub fn new(path: impl Into<String>) -> Self {
49        Self {
50            path: path.into(),
51            header: RwLock::new(None),
52            cache: Arc::new(PageCache::new(100)),
53            wal_reader: RwLock::new(None),
54        }
55    }
56
57    /// 获取或初始化 WalReader
58    async fn get_wal_reader(&self) -> Result<Arc<WalReader>> {
59        {
60            let wal = self.wal_reader.read().await;
61            if let Some(w) = &*wal {
62                return Ok(w.clone());
63            }
64        }
65
66        let header = self.get_header().await?;
67        let wal = Arc::new(WalReader::new(&self.path, header.page_size));
68        let mut wal_slot = self.wal_reader.write().await;
69        *wal_slot = Some(wal.clone());
70        Ok(wal)
71    }
72
73    /// 获取数据库中所有表的 Schema
74    pub async fn get_schemas(&self) -> Result<Vec<TableSchema>> {
75        let root_page = self.get_btree_page(1).await?;
76        let rows = root_page.get_rows(self).await?;
77
78        let mut schemas = Vec::new();
79        for row in rows {
80            if row.values.len() >= 5 {
81                let table_type = match &row.values[0] {
82                    DsValue::Text(s) => s.clone(),
83                    _ => continue,
84                };
85                let name = match &row.values[1] {
86                    DsValue::Text(s) => s.clone(),
87                    _ => continue,
88                };
89                let tbl_name = match &row.values[2] {
90                    DsValue::Text(s) => s.clone(),
91                    _ => continue,
92                };
93                let rootpage = match &row.values[3] {
94                    DsValue::Int(i) => *i as u32,
95                    _ => continue,
96                };
97                let sql = match &row.values[4] {
98                    DsValue::Text(s) => s.clone(),
99                    _ => continue,
100                };
101
102                schemas.push(TableSchema {
103                    table_type,
104                    name,
105                    tbl_name,
106                    rootpage,
107                    sql,
108                });
109            }
110        }
111        Ok(schemas)
112    }
113
114    /// 执行全表扫描
115    pub async fn scan_table(&self, root_page_id: u32) -> Result<Vec<Row>> {
116        let root_page = self.get_btree_page(root_page_id).await?;
117        root_page.get_rows(self).await
118    }
119
120    /// 根据表名执行全表扫描
121    pub async fn scan_table_by_name(&self, table_name: &str) -> Result<Vec<Row>> {
122        let schemas = self.get_schemas().await?;
123        let schema = schemas
124            .iter()
125            .find(|s| s.name == table_name)
126            .ok_or_else(|| DsError::query(format!("Table not found: {}", table_name)))?;
127        self.scan_table(schema.rootpage).await
128    }
129
130    /// 获取 B-Tree 页面
131    pub async fn get_btree_page(&self, page_id: u32) -> Result<BTreePage> {
132        if let Some(page) = self.cache.get(page_id).await {
133            return Ok(page);
134        }
135
136        let wal_reader = self.get_wal_reader().await?;
137        let data = if let Some(wal_data) = wal_reader.read_page(page_id).await? {
138            wal_data
139        } else {
140            let header = self.get_header().await?;
141            let mut file = File::open(&self.path)
142                .await
143                .map_err(|e| DsError::io_raw(e, Some(self.path.clone().into())))?;
144            let offset = (page_id as u64 - 1) * header.page_size as u64;
145            let mut buf = vec![0u8; header.page_size as usize];
146            file.seek(SeekFrom::Start(offset))
147                .await
148                .map_err(|e| DsError::io_raw(e, Some(self.path.clone().into())))?;
149            file.read_exact(&mut buf)
150                .await
151                .map_err(|e| DsError::io_raw(e, Some(self.path.clone().into())))?;
152            buf
153        };
154
155        let page =
156            BTreePage::decode(page_id, &data).map_err(|e| DsError::serialization(e.to_string()))?;
157        self.cache.insert(page_id, page.clone()).await;
158        Ok(page)
159    }
160
161    /// 验证文件头
162    pub async fn verify_header(&self) -> Result<()> {
163        self.get_header().await?;
164        Ok(())
165    }
166
167    /// 清除页面缓存
168    pub async fn clear_cache(&self) {
169        self.cache.clear().await;
170    }
171
172    /// 获取 SQLite 文件头信息
173    pub async fn get_header(&self) -> Result<SqliteHeader> {
174        {
175            let header = self.header.read().await;
176            if let Some(h) = *header {
177                return Ok(h);
178            }
179        }
180
181        let mut file = File::open(&self.path)
182            .await
183            .map_err(|e| DsError::io_raw(e, Some(self.path.clone().into())))?;
184        let mut magic = [0u8; 16];
185        file.read_exact(&mut magic)
186            .await
187            .map_err(|e| DsError::io_raw(e, Some(self.path.clone().into())))?;
188
189        if &magic != SQLITE_MAGIC {
190            return Err(DsError::io("Invalid SQLite magic header"));
191        }
192
193        let mut page_size_buf = [0u8; 2];
194        file.seek(SeekFrom::Start(16))
195            .await
196            .map_err(|e| DsError::io_raw(e, Some(self.path.clone().into())))?;
197        file.read_exact(&mut page_size_buf)
198            .await
199            .map_err(|e| DsError::io_raw(e, Some(self.path.clone().into())))?;
200        let page_size = u16::from_be_bytes(page_size_buf);
201        let page_size = if page_size == 1 {
202            65536
203        } else {
204            page_size as u32
205        };
206
207        let mut version_buf = [0u8; 2];
208        file.seek(SeekFrom::Start(18))
209            .await
210            .map_err(|e| DsError::io_raw(e, Some(self.path.clone().into())))?;
211        file.read_exact(&mut version_buf)
212            .await
213            .map_err(|e| DsError::io_raw(e, Some(self.path.clone().into())))?;
214
215        let mut reserved_buf = [0u8; 1];
216        file.seek(SeekFrom::Start(20))
217            .await
218            .map_err(|e| DsError::io_raw(e, Some(self.path.clone().into())))?;
219        file.read_exact(&mut reserved_buf)
220            .await
221            .map_err(|e| DsError::io_raw(e, Some(self.path.clone().into())))?;
222
223        let h = SqliteHeader {
224            page_size,
225            write_version: version_buf[0],
226            read_version: version_buf[1],
227            reserved_space: reserved_buf[0],
228        };
229
230        let mut header_slot = self.header.write().await;
231        *header_slot = Some(h);
232        Ok(h)
233    }
234}