donadb-rel 0.1.2

DonaDB Relational — typed schemas, secondary indexes, and relational scans on top of DonaDB. Built for the TruthLinked blockchain.
Documentation
// index.rs — Secondary index specification and maintenance.
//
// IndexSpec declares which field in a schema has a secondary index.
// IndexWriter intercepts writes and maintains the index domain automatically.
//
// Index domain key format:
//   [schema_id: u8][field_id: u8][encoded_field_value][primary_key]
//
//   schema_id + field_id: namespace the index to avoid cross-schema collisions
//   encoded_field_value:  the indexed field value (big-endian for range scan correctness)
//   primary_key:          appended so the index entry uniquely identifies one record
//                         and allows the primary key to be recovered from the index
//
// Index domain: family "default" on a dedicated DonaDB DomainId derived as:
//   domain_id = blake3(schema_name ++ field_name) truncated to u32
//   This gives each (schema, field) pair its own independent conflict domain —
//   index writes for accounts.balance never contend with index writes for tokens.holder.
//
// Index maintenance is always within the same WriteBatch as the primary write.
// The Shadow Commit protocol in DonaDB guarantees that primary + all index entries
// are atomic — a crash never leaves a stale or partial index.

use bytes::{BufMut, Bytes, BytesMut};

use crate::codec::encode_field_key;
use crate::error::RelError;
use crate::schema::{FieldValue, Schema};

/// Specification for one secondary index: which schema, which field.
#[derive(Debug, Clone)]
pub struct IndexSpec {
    pub schema_name: String,
    pub field_name: String,
    /// Pre-computed DonaDB domain for this index.
    pub domain_id: u32,
    /// Schema field index (position in schema.fields).
    pub field_idx: usize,
}

impl IndexSpec {
    pub fn new(schema: &Schema, field_name: &str) -> Result<Self, RelError> {
        let (field_idx, _field) = schema
            .field(field_name)
            .ok_or_else(|| RelError::UnknownField(field_name.to_string()))?;
        let domain_id = derive_index_domain(&schema.name, field_name);
        Ok(Self {
            schema_name: schema.name.clone(),
            field_name: field_name.to_string(),
            domain_id,
            field_idx,
        })
    }
}

/// Derive a stable DonaDB DomainId for a (schema, field) index.
/// Uses blake3 so the domain is deterministic across process restarts.
pub fn derive_index_domain(schema_name: &str, field_name: &str) -> u32 {
    let mut h = blake3::Hasher::new();
    h.update(b"donadb-rel:idx:");
    h.update(schema_name.as_bytes());
    h.update(b":");
    h.update(field_name.as_bytes());
    let hash = h.finalize();
    u32::from_le_bytes(hash.as_bytes()[..4].try_into().unwrap())
}

/// Build the DonaDB key for an index entry.
/// Format: [field_value_encoded][primary_key]
/// The field value is encoded big-endian (for range scan ordering).
/// The primary key is appended to make the entry unique per record.
pub fn build_index_key(
    field_value: &FieldValue,
    field_type: &crate::schema::FieldType,
    primary_key: &Bytes,
) -> Result<Bytes, RelError> {
    let mut buf = BytesMut::new();
    encode_field_key(&mut buf, field_type, field_value)?;
    buf.put_slice(primary_key);
    Ok(buf.freeze())
}

/// Extract the primary key from an index key.
/// The field value occupies the first `field_encoded_len` bytes.
/// The remaining bytes are the primary key.
pub fn extract_primary_key_from_index(index_key: &[u8], field_encoded_len: usize) -> Bytes {
    Bytes::copy_from_slice(&index_key[field_encoded_len..])
}

/// Build the index scan start/end keys for a range predicate on a field.
/// Returns (start_key, end_key) for a DonaDB range scan on the index domain.
pub fn index_range_for_eq(
    field_value: &FieldValue,
    field_type: &crate::schema::FieldType,
) -> Result<(Bytes, Bytes), RelError> {
    let mut start_buf = BytesMut::new();
    encode_field_key(&mut start_buf, field_type, field_value)?;
    let start = start_buf.freeze();

    // End = start bytes with the last byte incremented (exclusive upper bound)
    let mut end_bytes = start.to_vec();
    // Append 0xFF bytes so the range covers all primary keys with this field value
    end_bytes.extend_from_slice(&[0xFF; 4]);
    Ok((start, Bytes::from(end_bytes)))
}

// ── IndexWriter ───────────────────────────────────────────────────────────────

/// Maintains secondary indexes automatically during RelTable writes.
/// All index updates go into the same WriteBatch as the primary write.
pub struct IndexWriter {
    pub specs: Vec<IndexSpec>,
}

impl IndexWriter {
    pub fn new(specs: Vec<IndexSpec>) -> Self {
        Self { specs }
    }

    /// Add index entries for a record to an existing WriteBatch.
    /// Must be called for every put_record() operation.
    pub fn write_indexes(
        &self,
        batch: &mut donadb::WriteBatch,
        record: &crate::schema::Record,
        schema: &Schema,
        primary_key: &Bytes,
    ) -> Result<(), RelError> {
        for spec in &self.specs {
            let field = &schema.fields[spec.field_idx];
            let value = record
                .values
                .get(spec.field_idx)
                .ok_or_else(|| RelError::MissingField(field.name.clone()))?;
            if value == &FieldValue::Null {
                continue;
            }
            let index_key = build_index_key(value, &field.field_type, primary_key)?;
            // Index value = primary key bytes (so we can resolve back to the record)
            batch.put(spec.domain_id, index_key, primary_key.clone());
        }
        Ok(())
    }

    /// Remove index entries for a record being deleted or overwritten.
    /// Call before writing the new version so the old index entries are cleaned up.
    pub fn remove_indexes(
        &self,
        batch: &mut donadb::WriteBatch,
        old_record: &crate::schema::Record,
        schema: &Schema,
        primary_key: &Bytes,
    ) -> Result<(), RelError> {
        for spec in &self.specs {
            let field = &schema.fields[spec.field_idx];
            let value = old_record
                .values
                .get(spec.field_idx)
                .ok_or_else(|| RelError::MissingField(field.name.clone()))?;
            if value == &FieldValue::Null {
                continue;
            }
            let index_key = build_index_key(value, &field.field_type, primary_key)?;
            // Tombstone: write empty value to the index key at this height
            batch.put(spec.domain_id, index_key, Bytes::new());
        }
        Ok(())
    }
}