we_trust_sqlite/reader/
mod.rs1use std::sync::Arc;
2use tokio::{
3 fs::File,
4 io::{AsyncReadExt, AsyncSeekExt, SeekFrom},
5 sync::RwLock,
6};
7use yykv_types::{DsError, DsValue};
8
9type Result<T> = std::result::Result<T, DsError>;
10
11pub mod btree;
12pub mod cache;
13pub mod schema;
14pub mod wal;
15
16pub use btree::{BTreeCell, BTreePage, BTreePageHeader};
17pub use cache::PageCache;
18pub use schema::TableSchema;
19pub use wal::{WalFrameHeader, WalReader};
20
21#[derive(Debug, Clone)]
22pub struct Row {
23 pub row_id: i64,
24 pub values: Vec<DsValue>,
25}
26
/// The 16-byte signature expected at the very start of a database file:
/// the ASCII text `SQLite format 3` followed by a single NUL byte.
pub const SQLITE_MAGIC: &[u8; 16] = b"SQLite format 3\0";
29
/// The subset of the database file header that this reader parses.
///
/// The type is small and `Copy`, so it is handed around by value. It also
/// implements `PartialEq`/`Eq` so two parsed headers can be compared directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SqliteHeader {
    /// Page size in bytes, decoded from the big-endian `u16` at file offset 16;
    /// a stored value of `1` is decoded as 65536.
    pub page_size: u32,
    /// Byte at file offset 18.
    pub write_version: u8,
    /// Byte at file offset 19.
    pub read_version: u8,
    /// Byte at file offset 20.
    pub reserved_space: u8,
}
38
39pub struct NativeReader {
41 pub(crate) path: String,
42 header: RwLock<Option<SqliteHeader>>,
43 cache: Arc<PageCache>,
44 wal_reader: RwLock<Option<Arc<WalReader>>>,
45}
46
47impl NativeReader {
48 pub fn new(path: impl Into<String>) -> Self {
49 Self {
50 path: path.into(),
51 header: RwLock::new(None),
52 cache: Arc::new(PageCache::new(100)),
53 wal_reader: RwLock::new(None),
54 }
55 }
56
57 async fn get_wal_reader(&self) -> Result<Arc<WalReader>> {
59 {
60 let wal = self.wal_reader.read().await;
61 if let Some(w) = &*wal {
62 return Ok(w.clone());
63 }
64 }
65
66 let header = self.get_header().await?;
67 let wal = Arc::new(WalReader::new(&self.path, header.page_size));
68 let mut wal_slot = self.wal_reader.write().await;
69 *wal_slot = Some(wal.clone());
70 Ok(wal)
71 }
72
73 pub async fn get_schemas(&self) -> Result<Vec<TableSchema>> {
75 let root_page = self.get_btree_page(1).await?;
76 let rows = root_page.get_rows(self).await?;
77
78 let mut schemas = Vec::new();
79 for row in rows {
80 if row.values.len() >= 5 {
81 let table_type = match &row.values[0] {
82 DsValue::Text(s) => s.clone(),
83 _ => continue,
84 };
85 let name = match &row.values[1] {
86 DsValue::Text(s) => s.clone(),
87 _ => continue,
88 };
89 let tbl_name = match &row.values[2] {
90 DsValue::Text(s) => s.clone(),
91 _ => continue,
92 };
93 let rootpage = match &row.values[3] {
94 DsValue::Int(i) => *i as u32,
95 _ => continue,
96 };
97 let sql = match &row.values[4] {
98 DsValue::Text(s) => s.clone(),
99 _ => continue,
100 };
101
102 schemas.push(TableSchema {
103 table_type,
104 name,
105 tbl_name,
106 rootpage,
107 sql,
108 });
109 }
110 }
111 Ok(schemas)
112 }
113
114 pub async fn scan_table(&self, root_page_id: u32) -> Result<Vec<Row>> {
116 let root_page = self.get_btree_page(root_page_id).await?;
117 root_page.get_rows(self).await
118 }
119
120 pub async fn scan_table_by_name(&self, table_name: &str) -> Result<Vec<Row>> {
122 let schemas = self.get_schemas().await?;
123 let schema = schemas
124 .iter()
125 .find(|s| s.name == table_name)
126 .ok_or_else(|| DsError::query(format!("Table not found: {}", table_name)))?;
127 self.scan_table(schema.rootpage).await
128 }
129
130 pub async fn get_btree_page(&self, page_id: u32) -> Result<BTreePage> {
132 if let Some(page) = self.cache.get(page_id).await {
133 return Ok(page);
134 }
135
136 let wal_reader = self.get_wal_reader().await?;
137 let data = if let Some(wal_data) = wal_reader.read_page(page_id).await? {
138 wal_data
139 } else {
140 let header = self.get_header().await?;
141 let mut file = File::open(&self.path)
142 .await
143 .map_err(|e| DsError::io_raw(e, Some(self.path.clone().into())))?;
144 let offset = (page_id as u64 - 1) * header.page_size as u64;
145 let mut buf = vec![0u8; header.page_size as usize];
146 file.seek(SeekFrom::Start(offset))
147 .await
148 .map_err(|e| DsError::io_raw(e, Some(self.path.clone().into())))?;
149 file.read_exact(&mut buf)
150 .await
151 .map_err(|e| DsError::io_raw(e, Some(self.path.clone().into())))?;
152 buf
153 };
154
155 let page =
156 BTreePage::decode(page_id, &data).map_err(|e| DsError::serialization(e.to_string()))?;
157 self.cache.insert(page_id, page.clone()).await;
158 Ok(page)
159 }
160
161 pub async fn verify_header(&self) -> Result<()> {
163 self.get_header().await?;
164 Ok(())
165 }
166
167 pub async fn clear_cache(&self) {
169 self.cache.clear().await;
170 }
171
172 pub async fn get_header(&self) -> Result<SqliteHeader> {
174 {
175 let header = self.header.read().await;
176 if let Some(h) = *header {
177 return Ok(h);
178 }
179 }
180
181 let mut file = File::open(&self.path)
182 .await
183 .map_err(|e| DsError::io_raw(e, Some(self.path.clone().into())))?;
184 let mut magic = [0u8; 16];
185 file.read_exact(&mut magic)
186 .await
187 .map_err(|e| DsError::io_raw(e, Some(self.path.clone().into())))?;
188
189 if &magic != SQLITE_MAGIC {
190 return Err(DsError::io("Invalid SQLite magic header"));
191 }
192
193 let mut page_size_buf = [0u8; 2];
194 file.seek(SeekFrom::Start(16))
195 .await
196 .map_err(|e| DsError::io_raw(e, Some(self.path.clone().into())))?;
197 file.read_exact(&mut page_size_buf)
198 .await
199 .map_err(|e| DsError::io_raw(e, Some(self.path.clone().into())))?;
200 let page_size = u16::from_be_bytes(page_size_buf);
201 let page_size = if page_size == 1 {
202 65536
203 } else {
204 page_size as u32
205 };
206
207 let mut version_buf = [0u8; 2];
208 file.seek(SeekFrom::Start(18))
209 .await
210 .map_err(|e| DsError::io_raw(e, Some(self.path.clone().into())))?;
211 file.read_exact(&mut version_buf)
212 .await
213 .map_err(|e| DsError::io_raw(e, Some(self.path.clone().into())))?;
214
215 let mut reserved_buf = [0u8; 1];
216 file.seek(SeekFrom::Start(20))
217 .await
218 .map_err(|e| DsError::io_raw(e, Some(self.path.clone().into())))?;
219 file.read_exact(&mut reserved_buf)
220 .await
221 .map_err(|e| DsError::io_raw(e, Some(self.path.clone().into())))?;
222
223 let h = SqliteHeader {
224 page_size,
225 write_version: version_buf[0],
226 read_version: version_buf[1],
227 reserved_space: reserved_buf[0],
228 };
229
230 let mut header_slot = self.header.write().await;
231 *header_slot = Some(h);
232 Ok(h)
233 }
234}