1use std::sync::Arc;
20
21use bytes::Bytes;
22
23use crate::codec::{decode_key, decode_value, encode_key, encode_value};
24use crate::error::RelError;
25use crate::index::{index_range_for_eq, IndexSpec, IndexWriter};
26use crate::predicate::Predicate;
27use crate::schema::{FieldValue, Record, Schema};
28
29use donadb::types::{BlockHeight, DomainId};
30use donadb::DonaDb;
31
32pub struct RelTableConfig {
34 pub schema: Schema,
36 pub domain_id: DomainId,
38 pub indexes: Vec<IndexSpec>,
40}
41
42impl RelTableConfig {
43 pub fn new(schema: Schema, domain_id: DomainId) -> Self {
44 let indexes = schema
46 .indexed_fields()
47 .into_iter()
48 .filter_map(|(_, field)| IndexSpec::new(&schema, &field.name).ok())
49 .collect();
50 Self {
51 schema,
52 domain_id,
53 indexes,
54 }
55 }
56
57 pub fn with_domain(mut self, domain_id: DomainId) -> Self {
58 self.domain_id = domain_id;
59 self
60 }
61}
62
63#[derive(Clone)]
66pub struct RelTable {
67 db: Arc<DonaDb>,
68 schema: Arc<Schema>,
69 domain_id: DomainId,
70 writer: Arc<IndexWriter>,
71}
72
73impl RelTable {
74 pub fn new(db: Arc<DonaDb>, config: RelTableConfig) -> Self {
75 let writer = IndexWriter::new(config.indexes);
76 Self {
77 db,
78 schema: Arc::new(config.schema),
79 domain_id: config.domain_id,
80 writer: Arc::new(writer),
81 }
82 }
83
84 pub fn schema(&self) -> &Schema {
85 &self.schema
86 }
87 pub fn domain_id(&self) -> DomainId {
88 self.domain_id
89 }
90
91 pub fn put_record(
97 &self,
98 record: &Record,
99 block_height: BlockHeight,
100 entropy: &[u8],
101 ) -> Result<(), RelError> {
102 record.validate(&self.schema)?;
103 let primary_key = encode_key(record, &self.schema)?;
104 let primary_value = encode_value(record, &self.schema)?;
105
106 let mut batch = self.db.begin_batch(block_height, entropy);
107
108 if let Some(old_bytes) = self.db.get(self.domain_id, &primary_key)? {
110 let old_record = self.decode_full_record(&primary_key, &old_bytes)?;
111 self.writer
112 .remove_indexes(&mut batch, &old_record, &self.schema, &primary_key)?;
113 }
114
115 batch.put(self.domain_id, primary_key.clone(), primary_value);
117
118 self.writer
120 .write_indexes(&mut batch, record, &self.schema, &primary_key)?;
121
122 batch.commit()?;
123 Ok(())
124 }
125
126 pub fn put_records(
130 &self,
131 records: &[Record],
132 block_height: BlockHeight,
133 entropy: &[u8],
134 ) -> Result<(), RelError> {
135 let mut batch = self.db.begin_batch(block_height, entropy);
136 for record in records {
137 record.validate(&self.schema)?;
138 let primary_key = encode_key(record, &self.schema)?;
139 let primary_value = encode_value(record, &self.schema)?;
140 if let Some(old_bytes) = self.db.get(self.domain_id, &primary_key)? {
141 let old = self.decode_full_record(&primary_key, &old_bytes)?;
142 self.writer
143 .remove_indexes(&mut batch, &old, &self.schema, &primary_key)?;
144 }
145 batch.put(self.domain_id, primary_key.clone(), primary_value);
146 self.writer
147 .write_indexes(&mut batch, record, &self.schema, &primary_key)?;
148 }
149 batch.commit()?;
150 Ok(())
151 }
152
153 pub fn get_record(&self, key_values: &[FieldValue]) -> Result<Option<Record>, RelError> {
157 let primary_key = self.build_key_from_values(key_values)?;
158 match self.db.get(self.domain_id, &primary_key)? {
159 Some(bytes) if !bytes.is_empty() => {
160 Ok(Some(self.decode_full_record(&primary_key, &bytes)?))
161 }
162 _ => Ok(None),
163 }
164 }
165
166 pub fn get_record_at(
168 &self,
169 key_values: &[FieldValue],
170 block_height: BlockHeight,
171 ) -> Result<Option<Record>, RelError> {
172 let primary_key = self.build_key_from_values(key_values)?;
173 match self.db.get_at(self.domain_id, &primary_key, block_height)? {
174 Some(bytes) if !bytes.is_empty() => {
175 Ok(Some(self.decode_full_record(&primary_key, &bytes)?))
176 }
177 _ => Ok(None),
178 }
179 }
180
181 pub fn scan_where(&self, predicates: &[Predicate]) -> Result<Vec<Record>, RelError> {
186 let index_hit = predicates.iter().find_map(|p| {
188 self.writer
189 .specs
190 .iter()
191 .find(|s| s.field_name == p.field)
192 .map(|spec| (p, spec))
193 });
194
195 let raw_results: Vec<(Bytes, Bytes)> = if let Some((pred, spec)) = index_hit {
196 if let crate::predicate::FieldOp::Eq(ref val) = pred.op {
198 let field = &self.schema.fields[spec.field_idx];
199 let (start, end) = index_range_for_eq(val, &field.field_type)?;
200 let index_entries = self.db.scan(spec.domain_id, &start, &end)?;
202 let mut results = Vec::new();
204 for (_idx_key, pk_bytes) in index_entries {
205 if pk_bytes.is_empty() {
206 continue;
207 } if let Some(val_bytes) = self.db.get(self.domain_id, &pk_bytes)? {
209 if !val_bytes.is_empty() {
210 results.push((pk_bytes, val_bytes));
211 }
212 }
213 }
214 results
215 } else {
216 self.full_scan_raw()?
217 }
218 } else {
219 self.full_scan_raw()?
220 };
221
222 let mut records = Vec::new();
224 for (pk_bytes, val_bytes) in raw_results {
225 let record = self.decode_full_record(&pk_bytes, &val_bytes)?;
226 if self.record_matches(&record, predicates)? {
227 records.push(record);
228 }
229 }
230 Ok(records)
231 }
232
233 pub fn scan_range(
235 &self,
236 start_key: &[FieldValue],
237 end_key: &[FieldValue],
238 ) -> Result<Vec<Record>, RelError> {
239 let start = self.build_key_from_values(start_key)?;
240 let end = self.build_key_from_values(end_key)?;
241 let raw = self.db.scan(self.domain_id, &start, &end)?;
242 let mut records = Vec::new();
243 for (pk, val) in raw {
244 if !val.is_empty() {
245 records.push(self.decode_full_record(&pk, &val)?);
246 }
247 }
248 Ok(records)
249 }
250
251 pub fn scan_prefix_raw(&self, prefix: &[u8]) -> Result<Vec<Record>, RelError> {
253 let raw = self.db.scan_prefix_domain(self.domain_id, prefix)?;
254 let mut records = Vec::new();
255 for (pk, val) in raw {
256 if !val.is_empty() {
257 records.push(self.decode_full_record(&pk, &val)?);
258 }
259 }
260 Ok(records)
261 }
262
263 pub fn count_where(&self, predicates: &[Predicate]) -> Result<usize, RelError> {
265 Ok(self.scan_where(predicates)?.len())
266 }
267
268 pub fn scan_all_raw(&self) -> Result<Vec<Record>, RelError> {
270 let pairs = self.full_scan_raw()?;
271 pairs
272 .into_iter()
273 .map(|(pk, val)| self.decode_full_record(&pk, &val))
274 .collect()
275 }
276
277 pub fn follow_ref(
286 &self,
287 record: &Record,
288 ref_field: &str,
289 target: &RelTable,
290 ) -> Result<Option<Record>, RelError> {
291 let (idx, _) = self
292 .schema
293 .field(ref_field)
294 .ok_or_else(|| RelError::UnknownField(ref_field.to_string()))?;
295 let fk_value = record
296 .values
297 .get(idx)
298 .ok_or_else(|| RelError::MissingField(ref_field.to_string()))?;
299 if fk_value == &FieldValue::Null {
300 return Ok(None);
301 }
302 target.get_record(&[fk_value.clone()])
303 }
304
305 pub fn delete_record(
311 &self,
312 key_values: &[FieldValue],
313 block_height: BlockHeight,
314 entropy: &[u8],
315 ) -> Result<(), RelError> {
316 let primary_key = self.build_key_from_values(key_values)?;
317 if let Some(old_bytes) = self.db.get(self.domain_id, &primary_key)? {
318 let old_record = self.decode_full_record(&primary_key, &old_bytes)?;
319 let mut batch = self.db.begin_batch(block_height, entropy);
320 self.writer
321 .remove_indexes(&mut batch, &old_record, &self.schema, &primary_key)?;
322 batch.put(self.domain_id, primary_key, Bytes::new());
324 batch.commit()?;
325 }
326 Ok(())
327 }
328
329 fn build_key_from_values(&self, key_values: &[FieldValue]) -> Result<Bytes, RelError> {
334 use crate::codec::encode_field_key;
335 use bytes::BytesMut;
336 let key_fields = self.schema.key_fields();
337 if key_values.len() != key_fields.len() {
338 return Err(RelError::Schema(format!(
339 "schema '{}' has {} key fields, got {}",
340 self.schema.name,
341 key_fields.len(),
342 key_values.len()
343 )));
344 }
345 let mut buf = BytesMut::new();
346 for ((_, field), value) in key_fields.iter().zip(key_values.iter()) {
347 encode_field_key(&mut buf, &field.field_type, value)?;
348 }
349 Ok(buf.freeze())
350 }
351
352 fn full_scan_raw(&self) -> Result<Vec<(Bytes, Bytes)>, RelError> {
353 let all = self.db.scan_all(self.domain_id)?;
354 Ok(all.into_iter().filter(|(_, v)| !v.is_empty()).collect())
355 }
356
357 fn decode_full_record(&self, pk_bytes: &Bytes, val_bytes: &Bytes) -> Result<Record, RelError> {
358 let mut values: Vec<FieldValue> = vec![FieldValue::Null; self.schema.fields.len()];
360 for (idx, v) in decode_key(pk_bytes, &self.schema)? {
361 values[idx] = v;
362 }
363 let decoded_vals = decode_value(val_bytes, &self.schema)?;
365 for (i, v) in decoded_vals.into_iter().enumerate() {
366 if !self.schema.fields[i].is_key {
367 values[i] = v;
368 }
369 }
370 Ok(Record::new(values))
371 }
372
373 fn record_matches(&self, record: &Record, predicates: &[Predicate]) -> Result<bool, RelError> {
374 for pred in predicates {
375 let (idx, _) = self
376 .schema
377 .field(&pred.field)
378 .ok_or_else(|| RelError::UnknownField(pred.field.clone()))?;
379 let value = record.values.get(idx).unwrap_or(&FieldValue::Null);
380 if !pred.evaluate(value) {
381 return Ok(false);
382 }
383 }
384 Ok(true)
385 }
386}
387
388#[cfg(test)]
391mod tests {
392 use super::*;
393 use bytes::Bytes;
394 use std::sync::Arc;
395 use tempfile::TempDir;
396
397 use crate::predicate::Predicate;
398 use crate::schema::{Field, FieldType, FieldValue, Record, Schema};
399
400 fn open_db(dir: &TempDir) -> Arc<DonaDb> {
401 use donadb::DonaDbConfig;
402 Arc::new(
403 DonaDb::open(DonaDbConfig {
404 data_dir: dir.path().to_path_buf(),
405 shard_count: 16,
406 compaction_threads: 2,
407 block_cache_bytes: 8 * 1024 * 1024,
408 write_buffer_bytes: 16 * 1024 * 1024,
409 ..Default::default()
410 })
411 .unwrap(),
412 )
413 }
414
415 fn account_schema() -> Schema {
416 Schema::new(
417 "accounts",
418 vec![
419 Field::key("address", FieldType::Address),
420 Field::value("balance", FieldType::U128),
421 Field::indexed_value("nonce", FieldType::U64),
422 ],
423 )
424 }
425
426 fn make_account(addr: [u8; 32], balance: u128, nonce: u64) -> Record {
427 Record::new(vec![
428 FieldValue::Bytes(addr.to_vec()),
429 FieldValue::U128(balance),
430 FieldValue::U64(nonce),
431 ])
432 }
433
434 #[test]
435 fn test_put_and_get() {
436 let dir = TempDir::new().unwrap();
437 let db = open_db(&dir);
438 let tbl = RelTable::new(db, RelTableConfig::new(account_schema(), 1));
439
440 let addr = [0xABu8; 32];
441 let rec = make_account(addr, 1_000_000, 0);
442 tbl.put_record(&rec, 1, b"entropy1").unwrap();
443
444 let fetched = tbl.get_record(&[FieldValue::Bytes(addr.to_vec())]).unwrap();
445 assert!(fetched.is_some());
446 let r = fetched.unwrap();
447 assert_eq!(r.values[1], FieldValue::U128(1_000_000));
448 assert_eq!(r.values[2], FieldValue::U64(0));
449 }
450
451 #[test]
452 fn test_scan_where_full_scan() {
453 let dir = TempDir::new().unwrap();
454 let db = open_db(&dir);
455 let tbl = RelTable::new(db, RelTableConfig::new(account_schema(), 1));
456
457 for i in 0u8..10 {
458 let mut addr = [0u8; 32];
459 addr[31] = i;
460 tbl.put_record(
461 &make_account(addr, (i as u128) * 100, i as u64),
462 i as u64 + 1,
463 b"e",
464 )
465 .unwrap();
466 }
467 tbl.db.finalize_block(10).unwrap();
468
469 let results = tbl
470 .scan_where(&[Predicate::gte("balance", FieldValue::U128(500))])
471 .unwrap();
472 assert_eq!(results.len(), 5); }
474
475 #[test]
476 fn test_scan_where_index_accelerated() {
477 let dir = TempDir::new().unwrap();
478 let db = open_db(&dir);
479 let tbl = RelTable::new(db, RelTableConfig::new(account_schema(), 2));
481
482 for i in 0u8..5 {
483 let mut addr = [0u8; 32];
484 addr[31] = i;
485 tbl.put_record(&make_account(addr, 1000, i as u64), i as u64 + 1, b"e")
486 .unwrap();
487 }
488 tbl.db.finalize_block(5).unwrap();
489
490 let results = tbl
492 .scan_where(&[Predicate::eq("nonce", FieldValue::U64(3))])
493 .unwrap();
494 assert_eq!(results.len(), 1);
495 assert_eq!(results[0].values[2], FieldValue::U64(3));
496 }
497
498 #[test]
499 fn test_point_in_time_read() {
500 let dir = TempDir::new().unwrap();
501 let db = open_db(&dir);
502 let tbl = RelTable::new(db, RelTableConfig::new(account_schema(), 3));
503
504 let addr = [0x01u8; 32];
505 tbl.put_record(&make_account(addr, 500, 0), 10, b"e")
506 .unwrap();
507 tbl.put_record(&make_account(addr, 900, 1), 20, b"e")
508 .unwrap();
509
510 let at_10 = tbl
511 .get_record_at(&[FieldValue::Bytes(addr.to_vec())], 10)
512 .unwrap()
513 .unwrap();
514 assert_eq!(at_10.values[1], FieldValue::U128(500));
515
516 let at_20 = tbl
517 .get_record_at(&[FieldValue::Bytes(addr.to_vec())], 20)
518 .unwrap()
519 .unwrap();
520 assert_eq!(at_20.values[1], FieldValue::U128(900));
521 }
522
523 #[test]
524 fn test_follow_ref() {
525 let dir = TempDir::new().unwrap();
526 let db = open_db(&dir);
527
528 let acc_schema = account_schema();
529 let acc_tbl = RelTable::new(db.clone(), RelTableConfig::new(acc_schema, 4));
530
531 let token_schema = Schema::new(
532 "token_balances",
533 vec![
534 Field::key("cell_id", FieldType::U64),
535 Field::value("holder", FieldType::Address),
536 Field::value("amount", FieldType::U128),
537 ],
538 );
539 let tok_tbl = RelTable::new(db.clone(), RelTableConfig::new(token_schema, 5));
540
541 let addr = [0x11u8; 32];
542 acc_tbl
543 .put_record(&make_account(addr, 5000, 2), 1, b"e")
544 .unwrap();
545
546 let tok_record = Record::new(vec![
547 FieldValue::U64(42),
548 FieldValue::Bytes(addr.to_vec()),
549 FieldValue::U128(250),
550 ]);
551 tok_tbl.put_record(&tok_record, 2, b"e").unwrap();
552 db.finalize_block(2).unwrap();
553
554 let fetched_tok = tok_tbl.get_record(&[FieldValue::U64(42)]).unwrap().unwrap();
556 let account = tok_tbl
557 .follow_ref(&fetched_tok, "holder", &acc_tbl)
558 .unwrap();
559 assert!(account.is_some());
560 assert_eq!(account.unwrap().values[1], FieldValue::U128(5000));
561 }
562
563 #[test]
564 fn test_delete_record() {
565 let dir = TempDir::new().unwrap();
566 let db = open_db(&dir);
567 let tbl = RelTable::new(db, RelTableConfig::new(account_schema(), 6));
568
569 let addr = [0x22u8; 32];
570 tbl.put_record(&make_account(addr, 1000, 0), 1, b"e")
571 .unwrap();
572 tbl.delete_record(&[FieldValue::Bytes(addr.to_vec())], 2, b"e")
573 .unwrap();
574 let result = tbl.get_record(&[FieldValue::Bytes(addr.to_vec())]).unwrap();
575 assert!(result.is_none());
576 }
577
578 #[test]
579 fn test_batch_put_records() {
580 let dir = TempDir::new().unwrap();
581 let db = open_db(&dir);
582 let tbl = RelTable::new(db, RelTableConfig::new(account_schema(), 7));
583
584 let records: Vec<Record> = (0u8..20)
585 .map(|i| {
586 let mut addr = [0u8; 32];
587 addr[31] = i;
588 make_account(addr, i as u128 * 1000, i as u64)
589 })
590 .collect();
591
592 tbl.put_records(&records, 1, b"batch").unwrap();
593 tbl.db.finalize_block(1).unwrap();
594
595 let count = tbl
596 .count_where(&[Predicate::gte("balance", FieldValue::U128(10_000))])
597 .unwrap();
598 assert_eq!(count, 10);
599 }
600}