1pub mod cache;
2pub(crate) mod hnsw;
3mod lru;
4mod mapper;
5pub(crate) mod tree;
6
7use crate::ctx::Context;
8use crate::dbs::Options;
9use crate::err::Error;
10use crate::idx::trees::store::cache::TreeCache;
11use crate::idx::trees::store::hnsw::{HnswIndexes, SharedHnswIndex};
12use crate::idx::trees::store::mapper::Mappers;
13use crate::idx::trees::store::tree::{TreeRead, TreeWrite};
14use crate::idx::IndexKeyBase;
15use crate::kvs::index::IndexBuilder;
16use crate::kvs::{Key, Transaction, TransactionType, Val};
17use crate::sql::index::HnswParams;
18use crate::sql::statements::DefineIndexStatement;
19use crate::sql::Index;
20use std::fmt::{Debug, Display, Formatter};
21use std::sync::Arc;
22
/// Identifier of a tree node. Together with a [`TreeNodeProvider`] it is turned
/// into the node's storage key (see `TreeNodeProvider::get_key`).
pub type NodeId = u64;
/// Generation counter type for a store.
// NOTE(review): not referenced in this part of the file; presumably consumed by
// the cache submodule to version cached trees — confirm against `cache`.
pub type StoreGeneration = u64;
25
/// Handle on the node storage of a tree, in one of two modes.
///
/// The mode is selected by `TreeStore::new` from the transaction type. The
/// mutating operations (`get_node_mut`, `set_node`, `new_node`, `remove_node`)
/// only succeed on [`TreeStore::Write`]; the shared lookups (`get_node`,
/// `get_node_txn`) only succeed on [`TreeStore::Read`]. Calling an operation on
/// the other mode yields an error, except `finish`, which returns `Ok(None)`.
#[non_exhaustive]
#[allow(clippy::large_enum_variant)]
pub enum TreeStore<N>
where
    N: TreeNode + Debug + Clone,
{
    /// Store used with `TransactionType::Write`; wraps a [`TreeWrite`].
    Write(TreeWrite<N>),
    /// Store used with `TransactionType::Read`; wraps a [`TreeRead`].
    Read(TreeRead<N>),
}
37
38impl<N> TreeStore<N>
39where
40 N: TreeNode + Debug + Display + Clone,
41{
42 pub async fn new(np: TreeNodeProvider, cache: Arc<TreeCache<N>>, tt: TransactionType) -> Self {
43 match tt {
44 TransactionType::Read => Self::Read(TreeRead::new(cache)),
45 TransactionType::Write => Self::Write(TreeWrite::new(np, cache)),
46 }
47 }
48
49 pub(in crate::idx) async fn get_node_mut(
50 &mut self,
51 tx: &Transaction,
52 node_id: NodeId,
53 ) -> Result<StoredNode<N>, Error> {
54 match self {
55 Self::Write(w) => w.get_node_mut(tx, node_id).await,
56 _ => Err(fail!("TreeStore::get_node_mut")),
57 }
58 }
59
60 pub(in crate::idx) async fn get_node(
61 &self,
62 tx: &Transaction,
63 node_id: NodeId,
64 ) -> Result<Arc<StoredNode<N>>, Error> {
65 match self {
66 Self::Read(r) => r.get_node(tx, node_id).await,
67 _ => Err(fail!("TreeStore::get_node")),
68 }
69 }
70
71 pub(in crate::idx) async fn get_node_txn(
72 &self,
73 ctx: &Context,
74 node_id: NodeId,
75 ) -> Result<Arc<StoredNode<N>>, Error> {
76 match self {
77 Self::Read(r) => {
78 let tx = ctx.tx();
79 r.get_node(&tx, node_id).await
80 }
81 _ => Err(fail!("TreeStore::get_node_txn")),
82 }
83 }
84
85 pub(in crate::idx) async fn set_node(
86 &mut self,
87 node: StoredNode<N>,
88 updated: bool,
89 ) -> Result<(), Error> {
90 match self {
91 Self::Write(w) => w.set_node(node, updated),
92 _ => Err(fail!("TreeStore::set_node")),
93 }
94 }
95
96 pub(in crate::idx) fn new_node(&mut self, id: NodeId, node: N) -> Result<StoredNode<N>, Error> {
97 match self {
98 Self::Write(w) => Ok(w.new_node(id, node)?),
99 _ => Err(fail!("TreeStore::new_node")),
100 }
101 }
102
103 pub(in crate::idx) async fn remove_node(
104 &mut self,
105 node_id: NodeId,
106 node_key: Key,
107 ) -> Result<(), Error> {
108 match self {
109 Self::Write(w) => w.remove_node(node_id, node_key),
110 _ => Err(fail!("TreeStore::remove_node")),
111 }
112 }
113
114 pub async fn finish(&mut self, tx: &Transaction) -> Result<Option<TreeCache<N>>, Error> {
115 match self {
116 Self::Write(w) => w.finish(tx).await,
117 _ => Ok(None),
118 }
119 }
120}
121
/// Selects which family of storage keys the nodes of a tree are stored under.
///
/// Every variant except [`TreeNodeProvider::Debug`] carries the
/// [`IndexKeyBase`] used to build the keys; `get_key` maps each variant to a
/// different key constructor on that base.
#[derive(Clone)]
#[non_exhaustive]
pub enum TreeNodeProvider {
    /// Keys built with `IndexKeyBase::new_bd_key`.
    DocIds(IndexKeyBase),
    /// Keys built with `IndexKeyBase::new_bl_key`.
    DocLengths(IndexKeyBase),
    /// Keys built with `IndexKeyBase::new_bp_key`.
    Postings(IndexKeyBase),
    /// Keys built with `IndexKeyBase::new_bt_key`.
    Terms(IndexKeyBase),
    /// Keys built with `IndexKeyBase::new_vm_key`.
    Vector(IndexKeyBase),
    /// No key base: the key is just the big-endian bytes of the node id.
    Debug,
}
132
133impl TreeNodeProvider {
134 pub fn get_key(&self, node_id: NodeId) -> Result<Key, Error> {
135 match self {
136 TreeNodeProvider::DocIds(ikb) => ikb.new_bd_key(Some(node_id)),
137 TreeNodeProvider::DocLengths(ikb) => ikb.new_bl_key(Some(node_id)),
138 TreeNodeProvider::Postings(ikb) => ikb.new_bp_key(Some(node_id)),
139 TreeNodeProvider::Terms(ikb) => ikb.new_bt_key(Some(node_id)),
140 TreeNodeProvider::Vector(ikb) => ikb.new_vm_key(Some(node_id)),
141 TreeNodeProvider::Debug => Ok(node_id.to_be_bytes().to_vec()),
142 }
143 }
144
145 async fn load<N>(&self, tx: &Transaction, id: NodeId) -> Result<StoredNode<N>, Error>
146 where
147 N: TreeNode + Clone,
148 {
149 let key = self.get_key(id)?;
150 if let Some(val) = tx.get(key.clone(), None).await? {
151 let size = val.len() as u32;
152 let node = N::try_from_val(val)?;
153 Ok(StoredNode::new(node, id, key, size))
154 } else {
155 Err(Error::CorruptedIndex("TreeStore::load"))
156 }
157 }
158
159 async fn save<N>(&self, tx: &Transaction, node: &mut StoredNode<N>) -> Result<(), Error>
160 where
161 N: TreeNode + Clone + Display,
162 {
163 let val = node.n.try_into_val()?;
164 node.size = val.len() as u32;
165 tx.set(node.key.clone(), val, None).await?;
166 Ok(())
167 }
168}
169
/// A tree node bundled with the metadata needed to store it.
///
/// Its `Display` output is `node_id: <id> - <node>`.
#[non_exhaustive]
#[derive(Debug)]
pub struct StoredNode<N>
where
    N: Clone + Display,
{
    /// The node payload itself.
    pub(super) n: N,
    /// Identifier of the node.
    pub(super) id: NodeId,
    /// Storage key the node is read from / written to.
    pub(super) key: Key,
    /// Length of the node's encoded value (`val.len()`), as recorded by
    /// `TreeNodeProvider::load` and refreshed by `TreeNodeProvider::save`.
    pub(super) size: u32,
}
181
182impl<N> StoredNode<N>
183where
184 N: Clone + Display,
185{
186 pub(super) fn new(n: N, id: NodeId, key: Key, size: u32) -> Self {
187 Self {
188 n,
189 id,
190 key,
191 size,
192 }
193 }
194}
195
196impl<N> Display for StoredNode<N>
197where
198 N: Clone + Display,
199{
200 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
201 write!(f, "node_id: {} - {}", self.id, self.n)
202 }
203}
204
/// Contract for node types that can be kept in a tree store: they must be
/// debuggable, cloneable, displayable, and convertible to and from a stored
/// value ([`Val`]).
pub trait TreeNode: Debug + Clone + Display {
    /// Hook with a default no-op body that implementors may override.
    // NOTE(review): not invoked in this part of the file; presumably called by
    // the write store before a node is persisted — confirm against `tree`.
    fn prepare_save(&mut self) {}
    /// Decodes a node from a stored value, consuming it.
    ///
    /// Used by `TreeNodeProvider::load`.
    fn try_from_val(val: Val) -> Result<Self, Error>
    where
        Self: Sized;
    /// Encodes the node into a value suitable for storage.
    ///
    /// Used by `TreeNodeProvider::save`.
    fn try_into_val(&self) -> Result<Val, Error>;
}
212
/// Shared registry of index-related state (HNSW indexes and mappers).
///
/// The state lives behind an [`Arc`], so cloning an `IndexStores` yields
/// another handle to the same underlying data.
#[derive(Clone)]
#[non_exhaustive]
pub struct IndexStores(Arc<Inner>);
216
/// State shared by every clone of [`IndexStores`].
struct Inner {
    /// HNSW indexes, looked up and removed by `IndexKeyBase`.
    hnsw_indexes: HnswIndexes,
    /// Mappers, exposed read-only through `IndexStores::mappers`.
    mappers: Mappers,
}
221
222impl Default for IndexStores {
223 fn default() -> Self {
224 Self(Arc::new(Inner {
225 hnsw_indexes: HnswIndexes::default(),
226 mappers: Mappers::default(),
227 }))
228 }
229}
230
231impl IndexStores {
232 pub(crate) async fn get_index_hnsw(
233 &self,
234 ctx: &Context,
235 opt: &Options,
236 ix: &DefineIndexStatement,
237 p: &HnswParams,
238 ) -> Result<SharedHnswIndex, Error> {
239 let (ns, db) = opt.ns_db()?;
240 let ikb = IndexKeyBase::new(ns, db, ix)?;
241 self.0.hnsw_indexes.get(ctx, &ix.what, &ikb, p).await
242 }
243
244 pub(crate) async fn index_removed(
245 &self,
246 ib: Option<&IndexBuilder>,
247 tx: &Transaction,
248 ns: &str,
249 db: &str,
250 tb: &str,
251 ix: &str,
252 ) -> Result<(), Error> {
253 if let Some(ib) = ib {
254 ib.remove_index(ns, db, tb, ix).await?;
255 }
256 self.remove_index(ns, db, tx.get_tb_index(ns, db, tb, ix).await?.as_ref()).await
257 }
258
259 pub(crate) async fn namespace_removed(
260 &self,
261 ib: Option<&IndexBuilder>,
262 tx: &Transaction,
263 ns: &str,
264 ) -> Result<(), Error> {
265 for db in tx.all_db(ns).await?.iter() {
266 self.database_removed(ib, tx, ns, &db.name).await?;
267 }
268 Ok(())
269 }
270
271 pub(crate) async fn database_removed(
272 &self,
273 ib: Option<&IndexBuilder>,
274 tx: &Transaction,
275 ns: &str,
276 db: &str,
277 ) -> Result<(), Error> {
278 for tb in tx.all_tb(ns, db, None).await?.iter() {
279 self.table_removed(ib, tx, ns, db, &tb.name).await?;
280 }
281 Ok(())
282 }
283
284 pub(crate) async fn table_removed(
285 &self,
286 ib: Option<&IndexBuilder>,
287 tx: &Transaction,
288 ns: &str,
289 db: &str,
290 tb: &str,
291 ) -> Result<(), Error> {
292 for ix in tx.all_tb_indexes(ns, db, tb).await?.iter() {
293 if let Some(ib) = ib {
294 ib.remove_index(ns, db, tb, &ix.name).await?;
295 }
296 self.remove_index(ns, db, ix).await?;
297 }
298 Ok(())
299 }
300
301 async fn remove_index(
302 &self,
303 ns: &str,
304 db: &str,
305 ix: &DefineIndexStatement,
306 ) -> Result<(), Error> {
307 if matches!(ix.index, Index::Hnsw(_)) {
308 let ikb = IndexKeyBase::new(ns, db, ix)?;
309 self.remove_hnsw_index(ikb).await?;
310 }
311 Ok(())
312 }
313
314 async fn remove_hnsw_index(&self, ikb: IndexKeyBase) -> Result<(), Error> {
315 self.0.hnsw_indexes.remove(&ikb).await?;
316 Ok(())
317 }
318
319 pub(crate) fn mappers(&self) -> &Mappers {
320 &self.0.mappers
321 }
322}