iridium-db 0.4.0

A high-performance vector-graph hybrid storage and indexing engine
use super::*;

/// Magic prefix written at the start of every typed edge delta payload by
/// `encode_typed_edge_delta_payload`; `decode_typed_edge_delta_payload` returns
/// `None` for any payload that does not begin with these bytes.
const TYPED_EDGE_MAGIC: &[u8] = b"IRTE1";

/// Whether a typed edge delta adds or removes an edge.
///
/// In the typed edge delta payload this is stored as the single trailing byte:
/// `0` for `Add`, `1` for `Delete`. Any other byte makes the payload undecodable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TypedEdgeDeltaOp {
    /// Encoded as byte `0`.
    Add,
    /// Encoded as byte `1`.
    Delete,
}

/// Decoded form of a typed edge delta payload, as returned by
/// `decode_typed_edge_delta_payload`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TypedEdgeDeltaPayload {
    /// Edge target id; stored as a little-endian `u64` in the payload.
    pub target_id: u64,
    /// Relationship type as UTF-8. The encoder trims surrounding whitespace before writing it.
    pub rel_type: String,
    /// Whether the edge is being added or deleted.
    pub op: TypedEdgeDeltaOp,
}

/// A decoded WAL record. The variant is selected by the record's leading tag
/// byte (see `decode_wal_record`).
pub(super) enum WalRecord<'a> {
    /// Tags `0`..=`3`: a delta of kind `FullNode`, `EdgeDelta`, `VectorDelta` or
    /// `Tombstone` respectively. The slice borrows the record bytes that follow the
    /// tag byte (no copy). NOTE(review): presumably handed to `decode_delta`, which
    /// expects a node id and version header — confirm at the replay call site.
    Delta(sstable::EntryKind, &'a [u8]),
    /// Tag `4`: a bitmap index posting (index name, value key, node id).
    BitmapPosting {
        index_name: String,
        value_key: String,
        node_id: u64,
    },
    /// Tag `5`: carries a single node id. By its name this marks a node whose
    /// embedding is still pending — confirm against the replay code.
    EmbeddingPending {
        node_id: u64,
    },
}

/// Builds a tag-`4` (bitmap posting) WAL record.
///
/// Layout: `4u8 | index_len: u16 LE | index_name | value_len: u16 LE | value_key | node_id: u64 LE`.
/// Both strings are trimmed before encoding.
///
/// # Errors
/// `StorageError::InvalidInput` if either trimmed string is longer than `u16::MAX` bytes,
/// because its length prefix would not fit.
fn encode_bitmap_wal_record(index_name: &str, value_key: &str, node_id: u64) -> Result<Vec<u8>> {
    let name = index_name.trim().as_bytes();
    let key = value_key.trim().as_bytes();

    let name_len = match u16::try_from(name.len()) {
        Ok(len) => len,
        Err(_) => {
            return Err(StorageError::InvalidInput(
                "bitmap index name too long for WAL".to_string(),
            ))
        }
    };
    let key_len = match u16::try_from(key.len()) {
        Ok(len) => len,
        Err(_) => {
            return Err(StorageError::InvalidInput(
                "bitmap value key too long for WAL".to_string(),
            ))
        }
    };

    let name_len_bytes = name_len.to_le_bytes();
    let key_len_bytes = key_len.to_le_bytes();
    let node_bytes = node_id.to_le_bytes();
    let sections: [&[u8]; 5] = [&name_len_bytes, name, &key_len_bytes, key, &node_bytes];

    // One tag byte followed by every section, in order.
    let total_len = 1 + sections.iter().map(|section| section.len()).sum::<usize>();
    let mut record = Vec::with_capacity(total_len);
    record.push(4_u8);
    for &section in sections.iter() {
        record.extend_from_slice(section);
    }
    Ok(record)
}

/// Decodes one raw WAL record by dispatching on its leading tag byte.
///
/// Tags `0`..=`3` map to `WalRecord::Delta` with `FullNode`, `EdgeDelta`, `VectorDelta`
/// and `Tombstone` respectively; the returned slice borrows the bytes after the tag.
/// Tag `4` is a bitmap posting and tag `5` an embedding-pending marker.
///
/// # Errors
/// `StorageError::InvalidInput` for an empty record or an unknown tag, plus any error
/// from the tag-specific payload decoder.
pub(super) fn decode_wal_record(record: &[u8]) -> Result<WalRecord<'_>> {
    let (&tag, body) = match record.split_first() {
        Some(parts) => parts,
        None => return Err(StorageError::InvalidInput("empty WAL record".to_string())),
    };

    let delta_kind = match tag {
        0 => sstable::EntryKind::FullNode,
        1 => sstable::EntryKind::EdgeDelta,
        2 => sstable::EntryKind::VectorDelta,
        3 => sstable::EntryKind::Tombstone,
        4 => return decode_bitmap_wal_record(body),
        5 => return decode_embedding_pending_wal_record(body),
        unknown => {
            return Err(StorageError::InvalidInput(format!(
                "invalid WAL record kind {}",
                unknown
            )))
        }
    };
    Ok(WalRecord::Delta(delta_kind, body))
}

/// Parses the payload of a tag-`4` (bitmap posting) WAL record, with the tag byte
/// already stripped.
///
/// Expected layout: `index_len: u16 LE | index_name | value_len: u16 LE | value_key | node_id: u64 LE`.
/// Any bytes after the node id are ignored.
///
/// # Errors
/// `StorageError::InvalidInput` if the payload is too short at any stage or if either
/// string is not valid UTF-8. The index name is validated before the value key.
fn decode_bitmap_wal_record(payload: &[u8]) -> Result<WalRecord<'_>> {
    let invalid = |msg: &str| StorageError::InvalidInput(msg.to_string());
    // Callers only use this after proving `at + 1` is in bounds.
    let read_u16 = |at: usize| usize::from(u16::from_le_bytes([payload[at], payload[at + 1]]));

    if payload.len() < 2 {
        return Err(invalid("bitmap WAL payload missing index length"));
    }
    let index_len = read_u16(0);
    let index_range = 2..2 + index_len;

    // Must hold the index name, the value length prefix and the node id.
    if payload.len() < index_range.end + 2 + 8 {
        return Err(invalid("bitmap WAL payload truncated"));
    }
    let value_len = read_u16(index_range.end);
    let value_start = index_range.end + 2;
    let value_range = value_start..value_start + value_len;

    let node_start = value_range.end;
    if payload.len() < node_start + 8 {
        return Err(invalid("bitmap WAL payload missing node id"));
    }
    let node_id = u64::from_le_bytes(payload[node_start..node_start + 8].try_into().unwrap());

    let index_name = std::str::from_utf8(&payload[index_range])
        .map_err(|_| invalid("bitmap WAL index name is not utf8"))?;
    let value_key = std::str::from_utf8(&payload[value_range])
        .map_err(|_| invalid("bitmap WAL value key is not utf8"))?;

    Ok(WalRecord::BitmapPosting {
        index_name: index_name.to_owned(),
        value_key: value_key.to_owned(),
        node_id,
    })
}

/// Adds `node_id` to the posting for `value_key` in the bitmap index `index_name`.
///
/// `index_name` and `value_key` are trimmed once, up front, and the trimmed values
/// are used for the manifest lookup, the WAL record and the in-memory posting.
/// `encode_bitmap_wal_record` always persists trimmed strings. Applying untrimmed
/// strings live would make the state after WAL replay differ from the state
/// before it for any key with surrounding whitespace.
///
/// When `write_wal` is true, the record is appended to the WAL before the posting
/// is applied, and `handle.metrics` WAL/logical byte counters are updated.
///
/// # Errors
/// - `StorageError::InvalidInput` if the (trimmed) index name is not registered in
///   the manifest, or if either string is too long to encode in the WAL.
/// - Any error from the WAL append or from `bitmap_store.add_posting`.
pub(super) fn apply_bitmap_posting(
    handle: &mut StorageHandle,
    index_name: &str,
    value_key: &str,
    node_id: u64,
    write_wal: bool,
) -> Result<()> {
    // Normalize exactly as the WAL encoder does so live apply and replay agree.
    let index_name = index_name.trim();
    let value_key = value_key.trim();

    if !handle.manifest.has_bitmap_index(index_name) {
        return Err(StorageError::InvalidInput(format!(
            "unknown bitmap index '{}'",
            index_name
        )));
    }
    if write_wal {
        let record = encode_bitmap_wal_record(index_name, value_key, node_id)?;
        let wal_bytes = handle.wal.append_record(&record)?;
        handle.metrics.wal_bytes_written += wal_bytes;
        // Logical size = the persisted strings plus the 8-byte node id.
        handle.metrics.logical_bytes_written += (index_name.len() + value_key.len() + 8) as u64;
    }
    handle
        .bitmap_store
        .add_posting(index_name, value_key, node_id)?;
    Ok(())
}

/// Builds a tag-`5` (embedding pending) WAL record: the tag byte followed by
/// `node_id` as a little-endian `u64`, 9 bytes in total.
pub(super) fn encode_embedding_pending_wal_record(node_id: u64) -> Vec<u8> {
    const TAG: u8 = 5;
    std::iter::once(TAG)
        .chain(node_id.to_le_bytes().iter().copied())
        .collect()
}

/// Parses the payload of a tag-`5` WAL record (tag byte already stripped): a
/// little-endian `u64` node id. Bytes after the first eight are ignored.
///
/// # Errors
/// `StorageError::InvalidInput` when fewer than 8 bytes are present.
fn decode_embedding_pending_wal_record(payload: &[u8]) -> Result<WalRecord<'_>> {
    match payload.get(..8) {
        Some(id_bytes) => Ok(WalRecord::EmbeddingPending {
            node_id: u64::from_le_bytes(id_bytes.try_into().unwrap()),
        }),
        None => Err(StorageError::InvalidInput(
            "embedding pending WAL payload truncated".to_string(),
        )),
    }
}

/// Turns a delta payload into an sstable entry of the given `kind`.
///
/// The payload starts with a 16-byte header, `node_id: u64 LE | version: u64 LE`.
/// Everything after the header is copied verbatim into `Entry::value`, and the
/// node id becomes the entry key.
///
/// # Errors
/// `StorageError::InvalidInput` if the payload is shorter than the 16-byte header.
pub(super) fn decode_delta(delta: &[u8], kind: sstable::EntryKind) -> Result<sstable::Entry> {
    const HEADER_LEN: usize = 16;
    if delta.len() < HEADER_LEN {
        return Err(StorageError::InvalidInput(
            "delta must include node_id and version".to_string(),
        ));
    }
    let (header, value) = delta.split_at(HEADER_LEN);
    let (node_bytes, version_bytes) = header.split_at(8);
    Ok(sstable::Entry {
        key: u64::from_le_bytes(node_bytes.try_into().unwrap()),
        version: u64::from_le_bytes(version_bytes.try_into().unwrap()),
        kind,
        value: value.to_vec(),
    })
}

pub fn encode_typed_edge_delta_payload(
    target_id: u64,
    rel_type: &str,
    op: TypedEdgeDeltaOp,
) -> Vec<u8> {
    let rel_type = rel_type.trim();
    let rel_type_bytes = rel_type.as_bytes();
    let mut payload = Vec::with_capacity(TYPED_EDGE_MAGIC.len() + 2 + rel_type_bytes.len() + 8 + 1);
    payload.extend_from_slice(TYPED_EDGE_MAGIC);
    payload.extend_from_slice(&(rel_type_bytes.len() as u16).to_le_bytes());
    payload.extend_from_slice(rel_type_bytes);
    payload.extend_from_slice(&target_id.to_le_bytes());
    payload.push(match op {
        TypedEdgeDeltaOp::Add => 0,
        TypedEdgeDeltaOp::Delete => 1,
    });
    payload
}

/// Parses a payload produced by `encode_typed_edge_delta_payload`.
///
/// Expected layout: `TYPED_EDGE_MAGIC | rel_type_len: u16 LE | rel_type | target_id: u64 LE | op: u8`.
/// Bytes after the op byte are ignored.
///
/// Returns `None` in any of these cases:
/// - the magic is missing;
/// - the payload is shorter than its declared layout;
/// - the relationship type is not UTF-8;
/// - the op byte is neither `0` nor `1`.
pub fn decode_typed_edge_delta_payload(payload: &[u8]) -> Option<TypedEdgeDeltaPayload> {
    let body = payload.strip_prefix(TYPED_EDGE_MAGIC)?;
    // Smallest valid body: length prefix + empty rel type + target id + op byte.
    if body.len() < 2 + 8 + 1 {
        return None;
    }

    let (len_prefix, rest) = body.split_at(2);
    let rel_type_len = usize::from(u16::from_le_bytes([len_prefix[0], len_prefix[1]]));
    if rest.len() < rel_type_len + 8 + 1 {
        return None;
    }

    let (rel_type_bytes, rest) = rest.split_at(rel_type_len);
    let rel_type = std::str::from_utf8(rel_type_bytes).ok()?.to_owned();

    let (target_bytes, rest) = rest.split_at(8);
    let target_id = u64::from_le_bytes(target_bytes.try_into().ok()?);

    let op = match *rest.first()? {
        0 => TypedEdgeDeltaOp::Add,
        1 => TypedEdgeDeltaOp::Delete,
        _ => return None,
    };
    Some(TypedEdgeDeltaPayload {
        target_id,
        rel_type,
        op,
    })
}

/// Reinterprets `bytes` as consecutive little-endian `u64` words.
///
/// A trailing partial word (fewer than 8 leftover bytes) is ignored.
pub(super) fn bytes_to_u64_list(bytes: &[u8]) -> Vec<u64> {
    let mut words = Vec::with_capacity(bytes.len() / 8);
    for word_bytes in bytes.chunks_exact(8) {
        let mut buf = [0_u8; 8];
        buf.copy_from_slice(word_bytes);
        words.push(u64::from_le_bytes(buf));
    }
    words
}