// relation.rs (donadb_rel) — RelTable: the unified relational read/write API.
//
// RelTable wraps a DonaDb instance with:
//   - A registered Schema (typed field definitions)
//   - An IndexWriter (secondary index maintenance)
//   - put_record()      — write a typed record (primary + indexes in one batch)
//   - put_records()     — write many records at one block height in one batch
//   - get_record()      — fetch a typed record by primary key
//   - get_record_at()   — fetch at a specific block height (point-in-time)
//   - scan_where()      — filter records by field predicates (index-accelerated)
//   - scan_range()      — range scan by primary key bounds
//   - scan_prefix_raw() — prefix scan on raw primary-key bytes
//   - scan_all_raw()    — every live (non-tombstoned) record in the table
//   - follow_ref()      — resolve a foreign key to a record in another RelTable
//   - delete_record()   — tombstone a record (DonaDB never deletes; it writes an empty value)
//   - count_where()     — count matching records without returning them
//
// All writes go through DonaDB WriteBatch — atomic, WAL-backed, fsync-safe.
// RelTable never bypasses DonaDB's durability model.

use std::collections::HashMap;
use std::sync::Arc;

use bytes::Bytes;
use donadb::types::{BlockHeight, DomainId};
use donadb::DonaDb;

use crate::codec::{decode_key, decode_value, encode_key, encode_value};
use crate::error::RelError;
use crate::index::{index_range_for_eq, IndexSpec, IndexWriter};
use crate::predicate::Predicate;
use crate::schema::{FieldValue, Record, Schema};

32/// Configuration for a RelTable.
33pub struct RelTableConfig {
34    /// The schema describing records stored in this table.
35    pub schema: Schema,
36    /// DonaDB domain used for primary records.
37    pub domain_id: DomainId,
38    /// Secondary index specs — one per indexed field.
39    pub indexes: Vec<IndexSpec>,
40}
41
42impl RelTableConfig {
43    pub fn new(schema: Schema, domain_id: DomainId) -> Self {
44        // Auto-derive index specs for all fields marked indexed=true
45        let indexes = schema
46            .indexed_fields()
47            .into_iter()
48            .filter_map(|(_, field)| IndexSpec::new(&schema, &field.name).ok())
49            .collect();
50        Self {
51            schema,
52            domain_id,
53            indexes,
54        }
55    }
56
57    pub fn with_domain(mut self, domain_id: DomainId) -> Self {
58        self.domain_id = domain_id;
59        self
60    }
61}
62
63/// RelTable — the main relational interface for one entity type.
64/// Thin, cheap to clone (Arc internally).
65#[derive(Clone)]
66pub struct RelTable {
67    db: Arc<DonaDb>,
68    schema: Arc<Schema>,
69    domain_id: DomainId,
70    writer: Arc<IndexWriter>,
71}
72
73impl RelTable {
74    pub fn new(db: Arc<DonaDb>, config: RelTableConfig) -> Self {
75        let writer = IndexWriter::new(config.indexes);
76        Self {
77            db,
78            schema: Arc::new(config.schema),
79            domain_id: config.domain_id,
80            writer: Arc::new(writer),
81        }
82    }
83
84    pub fn schema(&self) -> &Schema {
85        &self.schema
86    }
87    pub fn domain_id(&self) -> DomainId {
88        self.domain_id
89    }
90
91    // ── Write API ─────────────────────────────────────────────────────────────
92
93    /// Write a typed record at block_height.
94    /// Validates field types, encodes key + value, maintains all secondary indexes.
95    /// Everything goes into a single WriteBatch — atomic across primary + all indexes.
96    pub fn put_record(
97        &self,
98        record: &Record,
99        block_height: BlockHeight,
100        entropy: &[u8],
101    ) -> Result<(), RelError> {
102        record.validate(&self.schema)?;
103        let primary_key = encode_key(record, &self.schema)?;
104        let primary_value = encode_value(record, &self.schema)?;
105
106        let mut batch = self.db.begin_batch(block_height, entropy);
107
108        // If an old version exists, remove its index entries first
109        if let Some(old_bytes) = self.db.get(self.domain_id, &primary_key)? {
110            let old_record = self.decode_full_record(&primary_key, &old_bytes)?;
111            self.writer
112                .remove_indexes(&mut batch, &old_record, &self.schema, &primary_key)?;
113        }
114
115        // Write primary record
116        batch.put(self.domain_id, primary_key.clone(), primary_value);
117
118        // Write all secondary index entries
119        self.writer
120            .write_indexes(&mut batch, record, &self.schema, &primary_key)?;
121
122        batch.commit()?;
123        Ok(())
124    }
125
126    /// Batch put for multiple records — single WriteBatch for all of them.
127    /// More efficient than calling put_record() N times when writing multiple
128    /// records at the same block height.
129    pub fn put_records(
130        &self,
131        records: &[Record],
132        block_height: BlockHeight,
133        entropy: &[u8],
134    ) -> Result<(), RelError> {
135        let mut batch = self.db.begin_batch(block_height, entropy);
136        for record in records {
137            record.validate(&self.schema)?;
138            let primary_key = encode_key(record, &self.schema)?;
139            let primary_value = encode_value(record, &self.schema)?;
140            if let Some(old_bytes) = self.db.get(self.domain_id, &primary_key)? {
141                let old = self.decode_full_record(&primary_key, &old_bytes)?;
142                self.writer
143                    .remove_indexes(&mut batch, &old, &self.schema, &primary_key)?;
144            }
145            batch.put(self.domain_id, primary_key.clone(), primary_value);
146            self.writer
147                .write_indexes(&mut batch, record, &self.schema, &primary_key)?;
148        }
149        batch.commit()?;
150        Ok(())
151    }
152
153    // ── Read API ──────────────────────────────────────────────────────────────
154
155    /// Get a record by its primary key field values (latest version).
156    pub fn get_record(&self, key_values: &[FieldValue]) -> Result<Option<Record>, RelError> {
157        let primary_key = self.build_key_from_values(key_values)?;
158        match self.db.get(self.domain_id, &primary_key)? {
159            Some(bytes) if !bytes.is_empty() => {
160                Ok(Some(self.decode_full_record(&primary_key, &bytes)?))
161            }
162            _ => Ok(None),
163        }
164    }
165
166    /// Get a record at a specific block height (point-in-time read).
167    pub fn get_record_at(
168        &self,
169        key_values: &[FieldValue],
170        block_height: BlockHeight,
171    ) -> Result<Option<Record>, RelError> {
172        let primary_key = self.build_key_from_values(key_values)?;
173        match self.db.get_at(self.domain_id, &primary_key, block_height)? {
174            Some(bytes) if !bytes.is_empty() => {
175                Ok(Some(self.decode_full_record(&primary_key, &bytes)?))
176            }
177            _ => Ok(None),
178        }
179    }
180
181    // ── Scan API ──────────────────────────────────────────────────────────────
182
183    /// Filter records by predicates. Uses secondary index when available for Eq predicates.
184    /// All predicates must be satisfied (implicit AND).
185    pub fn scan_where(&self, predicates: &[Predicate]) -> Result<Vec<Record>, RelError> {
186        // Attempt index acceleration: find the first Eq predicate with an index
187        let index_hit = predicates.iter().find_map(|p| {
188            self.writer
189                .specs
190                .iter()
191                .find(|s| s.field_name == p.field)
192                .map(|spec| (p, spec))
193        });
194
195        let raw_results: Vec<(Bytes, Bytes)> = if let Some((pred, spec)) = index_hit {
196            // Use the secondary index for this Eq predicate
197            if let crate::predicate::FieldOp::Eq(ref val) = pred.op {
198                let field = &self.schema.fields[spec.field_idx];
199                let (start, end) = index_range_for_eq(val, &field.field_type)?;
200                // Scan the index domain to get primary keys
201                let index_entries = self.db.scan(spec.domain_id, &start, &end)?;
202                // Resolve each primary key to a full record
203                let mut results = Vec::new();
204                for (_idx_key, pk_bytes) in index_entries {
205                    if pk_bytes.is_empty() {
206                        continue;
207                    } // tombstone
208                    if let Some(val_bytes) = self.db.get(self.domain_id, &pk_bytes)? {
209                        if !val_bytes.is_empty() {
210                            results.push((pk_bytes, val_bytes));
211                        }
212                    }
213                }
214                results
215            } else {
216                self.full_scan_raw()?
217            }
218        } else {
219            self.full_scan_raw()?
220        };
221
222        // Decode and filter
223        let mut records = Vec::new();
224        for (pk_bytes, val_bytes) in raw_results {
225            let record = self.decode_full_record(&pk_bytes, &val_bytes)?;
226            if self.record_matches(&record, predicates)? {
227                records.push(record);
228            }
229        }
230        Ok(records)
231    }
232
233    /// Range scan by primary key bounds. Returns records ordered by primary key.
234    pub fn scan_range(
235        &self,
236        start_key: &[FieldValue],
237        end_key: &[FieldValue],
238    ) -> Result<Vec<Record>, RelError> {
239        let start = self.build_key_from_values(start_key)?;
240        let end = self.build_key_from_values(end_key)?;
241        let raw = self.db.scan(self.domain_id, &start, &end)?;
242        let mut records = Vec::new();
243        for (pk, val) in raw {
244            if !val.is_empty() {
245                records.push(self.decode_full_record(&pk, &val)?);
246            }
247        }
248        Ok(records)
249    }
250
251    /// Prefix scan on primary key bytes. Returns all records whose key starts with prefix.
252    pub fn scan_prefix_raw(&self, prefix: &[u8]) -> Result<Vec<Record>, RelError> {
253        let raw = self.db.scan_prefix_domain(self.domain_id, prefix)?;
254        let mut records = Vec::new();
255        for (pk, val) in raw {
256            if !val.is_empty() {
257                records.push(self.decode_full_record(&pk, &val)?);
258            }
259        }
260        Ok(records)
261    }
262
263    /// Count records matching predicates without materialising them.
264    pub fn count_where(&self, predicates: &[Predicate]) -> Result<usize, RelError> {
265        Ok(self.scan_where(predicates)?.len())
266    }
267
268    /// Scan all records in this table (no filter).
269    pub fn scan_all_raw(&self) -> Result<Vec<Record>, RelError> {
270        let pairs = self.full_scan_raw()?;
271        pairs
272            .into_iter()
273            .map(|(pk, val)| self.decode_full_record(&pk, &val))
274            .collect()
275    }
276
277    // ── Foreign key traversal ─────────────────────────────────────────────────
278
279    /// Follow a foreign key reference: read `ref_field` from this record,
280    /// then look it up as a primary key in `target` RelTable.
281    ///
282    /// Example:
283    ///   token_transfers.follow_ref(record, "holder_address", &accounts_table)
284    ///   → returns the Account record for the holder
285    pub fn follow_ref(
286        &self,
287        record: &Record,
288        ref_field: &str,
289        target: &RelTable,
290    ) -> Result<Option<Record>, RelError> {
291        let (idx, _) = self
292            .schema
293            .field(ref_field)
294            .ok_or_else(|| RelError::UnknownField(ref_field.to_string()))?;
295        let fk_value = record
296            .values
297            .get(idx)
298            .ok_or_else(|| RelError::MissingField(ref_field.to_string()))?;
299        if fk_value == &FieldValue::Null {
300            return Ok(None);
301        }
302        target.get_record(&[fk_value.clone()])
303    }
304
305    // ── Delete ────────────────────────────────────────────────────────────────
306
307    /// Tombstone a record: write an empty value to mark it deleted.
308    /// DonaDB never physically removes data — this is the correct deletion model.
309    /// Index entries are also tombstoned so they do not appear in future scans.
310    pub fn delete_record(
311        &self,
312        key_values: &[FieldValue],
313        block_height: BlockHeight,
314        entropy: &[u8],
315    ) -> Result<(), RelError> {
316        let primary_key = self.build_key_from_values(key_values)?;
317        if let Some(old_bytes) = self.db.get(self.domain_id, &primary_key)? {
318            let old_record = self.decode_full_record(&primary_key, &old_bytes)?;
319            let mut batch = self.db.begin_batch(block_height, entropy);
320            self.writer
321                .remove_indexes(&mut batch, &old_record, &self.schema, &primary_key)?;
322            // Tombstone: empty value at this height
323            batch.put(self.domain_id, primary_key, Bytes::new());
324            batch.commit()?;
325        }
326        Ok(())
327    }
328
329    // ── Point-in-time scan ────────────────────────────────────────────────────
330
331    // ── Internal helpers ─────────────────────────────────────────────────────
332
333    fn build_key_from_values(&self, key_values: &[FieldValue]) -> Result<Bytes, RelError> {
334        use crate::codec::encode_field_key;
335        use bytes::BytesMut;
336        let key_fields = self.schema.key_fields();
337        if key_values.len() != key_fields.len() {
338            return Err(RelError::Schema(format!(
339                "schema '{}' has {} key fields, got {}",
340                self.schema.name,
341                key_fields.len(),
342                key_values.len()
343            )));
344        }
345        let mut buf = BytesMut::new();
346        for ((_, field), value) in key_fields.iter().zip(key_values.iter()) {
347            encode_field_key(&mut buf, &field.field_type, value)?;
348        }
349        Ok(buf.freeze())
350    }
351
352    fn full_scan_raw(&self) -> Result<Vec<(Bytes, Bytes)>, RelError> {
353        let all = self.db.scan_all(self.domain_id)?;
354        Ok(all.into_iter().filter(|(_, v)| !v.is_empty()).collect())
355    }
356
357    fn decode_full_record(&self, pk_bytes: &Bytes, val_bytes: &Bytes) -> Result<Record, RelError> {
358        // Decode key fields from pk_bytes
359        let mut values: Vec<FieldValue> = vec![FieldValue::Null; self.schema.fields.len()];
360        for (idx, v) in decode_key(pk_bytes, &self.schema)? {
361            values[idx] = v;
362        }
363        // Decode value fields from val_bytes
364        let decoded_vals = decode_value(val_bytes, &self.schema)?;
365        for (i, v) in decoded_vals.into_iter().enumerate() {
366            if !self.schema.fields[i].is_key {
367                values[i] = v;
368            }
369        }
370        Ok(Record::new(values))
371    }
372
373    fn record_matches(&self, record: &Record, predicates: &[Predicate]) -> Result<bool, RelError> {
374        for pred in predicates {
375            let (idx, _) = self
376                .schema
377                .field(&pred.field)
378                .ok_or_else(|| RelError::UnknownField(pred.field.clone()))?;
379            let value = record.values.get(idx).unwrap_or(&FieldValue::Null);
380            if !pred.evaluate(value) {
381                return Ok(false);
382            }
383        }
384        Ok(true)
385    }
386}
387
// ── Tests ─────────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use tempfile::TempDir;

    use crate::predicate::Predicate;
    use crate::schema::{Field, FieldType, FieldValue, Record, Schema};

    /// Open a fresh DonaDb under `dir` with small cache/buffer sizes.
    fn open_db(dir: &TempDir) -> Arc<DonaDb> {
        use donadb::DonaDbConfig;
        Arc::new(
            DonaDb::open(DonaDbConfig {
                data_dir: dir.path().to_path_buf(),
                shard_count: 16,
                compaction_threads: 2,
                block_cache_bytes: 8 * 1024 * 1024,
                write_buffer_bytes: 16 * 1024 * 1024,
                ..Default::default()
            })
            .unwrap(),
        )
    }

    /// `accounts`: key `address`, plain `balance`, indexed `nonce`.
    fn account_schema() -> Schema {
        Schema::new(
            "accounts",
            vec![
                Field::key("address", FieldType::Address),
                Field::value("balance", FieldType::U128),
                Field::indexed_value("nonce", FieldType::U64),
            ],
        )
    }

    /// A 32-byte address that is all zeros except for its final byte.
    fn addr_ending_in(last: u8) -> [u8; 32] {
        let mut addr = [0u8; 32];
        addr[31] = last;
        addr
    }

    fn make_account(addr: [u8; 32], balance: u128, nonce: u64) -> Record {
        Record::new(vec![
            FieldValue::Bytes(addr.to_vec()),
            FieldValue::U128(balance),
            FieldValue::U64(nonce),
        ])
    }

    #[test]
    fn test_put_and_get() {
        let dir = TempDir::new().unwrap();
        let db = open_db(&dir);
        let tbl = RelTable::new(db, RelTableConfig::new(account_schema(), 1));

        let addr = [0xABu8; 32];
        let rec = make_account(addr, 1_000_000, 0);
        tbl.put_record(&rec, 1, b"entropy1").unwrap();

        let fetched = tbl.get_record(&[FieldValue::Bytes(addr.to_vec())]).unwrap();
        assert!(fetched.is_some());
        let r = fetched.unwrap();
        assert_eq!(r.values[1], FieldValue::U128(1_000_000));
        assert_eq!(r.values[2], FieldValue::U64(0));
    }

    #[test]
    fn test_get_missing_record_is_none() {
        let dir = TempDir::new().unwrap();
        let tbl = RelTable::new(open_db(&dir), RelTableConfig::new(account_schema(), 8));

        let missing = tbl
            .get_record(&[FieldValue::Bytes([0x33u8; 32].to_vec())])
            .unwrap();
        assert!(missing.is_none());
    }

    #[test]
    fn test_wrong_key_arity_is_rejected() {
        let dir = TempDir::new().unwrap();
        let tbl = RelTable::new(open_db(&dir), RelTableConfig::new(account_schema(), 9));

        // accounts has exactly one key field.
        assert!(tbl.get_record(&[]).is_err());
        assert!(tbl
            .get_record(&[FieldValue::Bytes(vec![0u8; 32]), FieldValue::U64(1)])
            .is_err());
    }

    #[test]
    fn test_scan_where_full_scan() {
        let dir = TempDir::new().unwrap();
        let db = open_db(&dir);
        let tbl = RelTable::new(db, RelTableConfig::new(account_schema(), 1));

        for i in 0u8..10 {
            tbl.put_record(
                &make_account(addr_ending_in(i), (i as u128) * 100, i as u64),
                i as u64 + 1,
                b"e",
            )
            .unwrap();
        }
        tbl.db.finalize_block(10).unwrap();

        let results = tbl
            .scan_where(&[Predicate::gte("balance", FieldValue::U128(500))])
            .unwrap();
        assert_eq!(results.len(), 5); // balances 500,600,700,800,900
    }

    #[test]
    fn test_scan_where_index_accelerated() {
        let dir = TempDir::new().unwrap();
        let db = open_db(&dir);
        // nonce is indexed
        let tbl = RelTable::new(db, RelTableConfig::new(account_schema(), 2));

        for i in 0u8..5 {
            tbl.put_record(&make_account(addr_ending_in(i), 1000, i as u64), i as u64 + 1, b"e")
                .unwrap();
        }
        tbl.db.finalize_block(5).unwrap();

        // nonce == 3 should hit the secondary index
        let results = tbl
            .scan_where(&[Predicate::eq("nonce", FieldValue::U64(3))])
            .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].values[2], FieldValue::U64(3));
    }

    #[test]
    fn test_scan_where_range_then_eq_on_indexed_field() {
        let dir = TempDir::new().unwrap();
        let db = open_db(&dir);
        let tbl = RelTable::new(db, RelTableConfig::new(account_schema(), 10));

        for i in 0u8..5 {
            tbl.put_record(&make_account(addr_ending_in(i), 1000, i as u64), i as u64 + 1, b"e")
                .unwrap();
        }
        tbl.db.finalize_block(5).unwrap();

        // A non-Eq predicate on the indexed field comes first; the result must
        // still honour both predicates.
        let preds = [
            Predicate::gte("nonce", FieldValue::U64(1)),
            Predicate::eq("nonce", FieldValue::U64(3)),
        ];
        let results = tbl.scan_where(&preds).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].values[2], FieldValue::U64(3));
        assert_eq!(tbl.count_where(&preds).unwrap(), 1);
    }

    #[test]
    fn test_point_in_time_read() {
        let dir = TempDir::new().unwrap();
        let db = open_db(&dir);
        let tbl = RelTable::new(db, RelTableConfig::new(account_schema(), 3));

        let addr = [0x01u8; 32];
        tbl.put_record(&make_account(addr, 500, 0), 10, b"e")
            .unwrap();
        tbl.put_record(&make_account(addr, 900, 1), 20, b"e")
            .unwrap();

        let at_10 = tbl
            .get_record_at(&[FieldValue::Bytes(addr.to_vec())], 10)
            .unwrap()
            .unwrap();
        assert_eq!(at_10.values[1], FieldValue::U128(500));

        let at_20 = tbl
            .get_record_at(&[FieldValue::Bytes(addr.to_vec())], 20)
            .unwrap()
            .unwrap();
        assert_eq!(at_20.values[1], FieldValue::U128(900));
    }

    #[test]
    fn test_follow_ref() {
        let dir = TempDir::new().unwrap();
        let db = open_db(&dir);

        let acc_schema = account_schema();
        let acc_tbl = RelTable::new(db.clone(), RelTableConfig::new(acc_schema, 4));

        let token_schema = Schema::new(
            "token_balances",
            vec![
                Field::key("cell_id", FieldType::U64),
                Field::value("holder", FieldType::Address),
                Field::value("amount", FieldType::U128),
            ],
        );
        let tok_tbl = RelTable::new(db.clone(), RelTableConfig::new(token_schema, 5));

        let addr = [0x11u8; 32];
        acc_tbl
            .put_record(&make_account(addr, 5000, 2), 1, b"e")
            .unwrap();

        let tok_record = Record::new(vec![
            FieldValue::U64(42),
            FieldValue::Bytes(addr.to_vec()),
            FieldValue::U128(250),
        ]);
        tok_tbl.put_record(&tok_record, 2, b"e").unwrap();
        db.finalize_block(2).unwrap();

        // Fetch the token record then follow the "holder" ref to the account
        let fetched_tok = tok_tbl.get_record(&[FieldValue::U64(42)]).unwrap().unwrap();
        let account = tok_tbl
            .follow_ref(&fetched_tok, "holder", &acc_tbl)
            .unwrap();
        assert!(account.is_some());
        assert_eq!(account.unwrap().values[1], FieldValue::U128(5000));
    }

    #[test]
    fn test_delete_record() {
        let dir = TempDir::new().unwrap();
        let db = open_db(&dir);
        let tbl = RelTable::new(db, RelTableConfig::new(account_schema(), 6));

        let addr = [0x22u8; 32];
        tbl.put_record(&make_account(addr, 1000, 0), 1, b"e")
            .unwrap();
        tbl.delete_record(&[FieldValue::Bytes(addr.to_vec())], 2, b"e")
            .unwrap();
        let result = tbl.get_record(&[FieldValue::Bytes(addr.to_vec())]).unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_batch_put_records() {
        let dir = TempDir::new().unwrap();
        let db = open_db(&dir);
        let tbl = RelTable::new(db, RelTableConfig::new(account_schema(), 7));

        let records: Vec<Record> = (0u8..20)
            .map(|i| make_account(addr_ending_in(i), i as u128 * 1000, i as u64))
            .collect();

        tbl.put_records(&records, 1, b"batch").unwrap();
        tbl.db.finalize_block(1).unwrap();

        let count = tbl
            .count_where(&[Predicate::gte("balance", FieldValue::U128(10_000))])
            .unwrap();
        assert_eq!(count, 10);
    }
}