use super::*;
const TYPED_EDGE_MAGIC: &[u8] = b"IRTE1";
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TypedEdgeDeltaOp {
Add,
Delete,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TypedEdgeDeltaPayload {
pub target_id: u64,
pub rel_type: String,
pub op: TypedEdgeDeltaOp,
}
pub(super) enum WalRecord<'a> {
Delta(sstable::EntryKind, &'a [u8]),
BitmapPosting {
index_name: String,
value_key: String,
node_id: u64,
},
EmbeddingPending {
node_id: u64,
},
}
fn encode_bitmap_wal_record(index_name: &str, value_key: &str, node_id: u64) -> Result<Vec<u8>> {
let index_name = index_name.trim().as_bytes();
let value_key = value_key.trim().as_bytes();
let index_len = u16::try_from(index_name.len()).map_err(|_| {
StorageError::InvalidInput("bitmap index name too long for WAL".to_string())
})?;
let value_len = u16::try_from(value_key.len())
.map_err(|_| StorageError::InvalidInput("bitmap value key too long for WAL".to_string()))?;
let mut record = Vec::with_capacity(1 + 2 + index_name.len() + 2 + value_key.len() + 8);
record.push(4_u8);
record.extend_from_slice(&index_len.to_le_bytes());
record.extend_from_slice(index_name);
record.extend_from_slice(&value_len.to_le_bytes());
record.extend_from_slice(value_key);
record.extend_from_slice(&node_id.to_le_bytes());
Ok(record)
}
pub(super) fn decode_wal_record(record: &[u8]) -> Result<WalRecord<'_>> {
if record.is_empty() {
return Err(StorageError::InvalidInput("empty WAL record".to_string()));
}
match record[0] {
0 => Ok(WalRecord::Delta(sstable::EntryKind::FullNode, &record[1..])),
1 => Ok(WalRecord::Delta(
sstable::EntryKind::EdgeDelta,
&record[1..],
)),
2 => Ok(WalRecord::Delta(
sstable::EntryKind::VectorDelta,
&record[1..],
)),
3 => Ok(WalRecord::Delta(
sstable::EntryKind::Tombstone,
&record[1..],
)),
4 => decode_bitmap_wal_record(&record[1..]),
5 => decode_embedding_pending_wal_record(&record[1..]),
other => Err(StorageError::InvalidInput(format!(
"invalid WAL record kind {}",
other
))),
}
}
fn decode_bitmap_wal_record(payload: &[u8]) -> Result<WalRecord<'_>> {
if payload.len() < 2 {
return Err(StorageError::InvalidInput(
"bitmap WAL payload missing index length".to_string(),
));
}
let index_len = u16::from_le_bytes(payload[0..2].try_into().unwrap()) as usize;
if payload.len() < 2 + index_len + 2 + 8 {
return Err(StorageError::InvalidInput(
"bitmap WAL payload truncated".to_string(),
));
}
let index_start = 2;
let index_end = index_start + index_len;
let value_len =
u16::from_le_bytes(payload[index_end..index_end + 2].try_into().unwrap()) as usize;
let value_start = index_end + 2;
let value_end = value_start + value_len;
if payload.len() < value_end + 8 {
return Err(StorageError::InvalidInput(
"bitmap WAL payload missing node id".to_string(),
));
}
let node_id = u64::from_le_bytes(payload[value_end..value_end + 8].try_into().unwrap());
let index_name = std::str::from_utf8(&payload[index_start..index_end])
.map_err(|_| StorageError::InvalidInput("bitmap WAL index name is not utf8".to_string()))?;
let value_key = std::str::from_utf8(&payload[value_start..value_end])
.map_err(|_| StorageError::InvalidInput("bitmap WAL value key is not utf8".to_string()))?;
Ok(WalRecord::BitmapPosting {
index_name: index_name.to_string(),
value_key: value_key.to_string(),
node_id,
})
}
pub(super) fn apply_bitmap_posting(
handle: &mut StorageHandle,
index_name: &str,
value_key: &str,
node_id: u64,
write_wal: bool,
) -> Result<()> {
if !handle.manifest.has_bitmap_index(index_name) {
return Err(StorageError::InvalidInput(format!(
"unknown bitmap index '{}'",
index_name
)));
}
if write_wal {
let record = encode_bitmap_wal_record(index_name, value_key, node_id)?;
let wal_bytes = handle.wal.append_record(&record)?;
handle.metrics.wal_bytes_written += wal_bytes;
handle.metrics.logical_bytes_written += (index_name.len() + value_key.len() + 8) as u64;
}
handle
.bitmap_store
.add_posting(index_name, value_key, node_id)?;
Ok(())
}
pub(super) fn encode_embedding_pending_wal_record(node_id: u64) -> Vec<u8> {
let mut record = Vec::with_capacity(9);
record.push(5_u8);
record.extend_from_slice(&node_id.to_le_bytes());
record
}
fn decode_embedding_pending_wal_record(payload: &[u8]) -> Result<WalRecord<'_>> {
if payload.len() < 8 {
return Err(StorageError::InvalidInput(
"embedding pending WAL payload truncated".to_string(),
));
}
let node_id = u64::from_le_bytes(payload[0..8].try_into().unwrap());
Ok(WalRecord::EmbeddingPending { node_id })
}
pub(super) fn decode_delta(delta: &[u8], kind: sstable::EntryKind) -> Result<sstable::Entry> {
if delta.len() < 16 {
return Err(StorageError::InvalidInput(
"delta must include node_id and version".to_string(),
));
}
let node_id = u64::from_le_bytes(delta[0..8].try_into().unwrap());
let version = u64::from_le_bytes(delta[8..16].try_into().unwrap());
Ok(sstable::Entry {
key: node_id,
version,
kind,
value: delta[16..].to_vec(),
})
}
pub fn encode_typed_edge_delta_payload(
target_id: u64,
rel_type: &str,
op: TypedEdgeDeltaOp,
) -> Vec<u8> {
let rel_type = rel_type.trim();
let rel_type_bytes = rel_type.as_bytes();
let mut payload = Vec::with_capacity(TYPED_EDGE_MAGIC.len() + 2 + rel_type_bytes.len() + 8 + 1);
payload.extend_from_slice(TYPED_EDGE_MAGIC);
payload.extend_from_slice(&(rel_type_bytes.len() as u16).to_le_bytes());
payload.extend_from_slice(rel_type_bytes);
payload.extend_from_slice(&target_id.to_le_bytes());
payload.push(match op {
TypedEdgeDeltaOp::Add => 0,
TypedEdgeDeltaOp::Delete => 1,
});
payload
}
pub fn decode_typed_edge_delta_payload(payload: &[u8]) -> Option<TypedEdgeDeltaPayload> {
if payload.len() < TYPED_EDGE_MAGIC.len() + 2 + 8 + 1 {
return None;
}
if &payload[..TYPED_EDGE_MAGIC.len()] != TYPED_EDGE_MAGIC {
return None;
}
let len_start = TYPED_EDGE_MAGIC.len();
let rel_type_len =
u16::from_le_bytes(payload[len_start..len_start + 2].try_into().ok()?) as usize;
let rel_type_start = len_start + 2;
let rel_type_end = rel_type_start + rel_type_len;
if payload.len() < rel_type_end + 8 + 1 {
return None;
}
let rel_type = std::str::from_utf8(&payload[rel_type_start..rel_type_end])
.ok()?
.to_string();
let target_start = rel_type_end;
let target_id = u64::from_le_bytes(payload[target_start..target_start + 8].try_into().ok()?);
let op = match payload[target_start + 8] {
0 => TypedEdgeDeltaOp::Add,
1 => TypedEdgeDeltaOp::Delete,
_ => return None,
};
Some(TypedEdgeDeltaPayload {
target_id,
rel_type,
op,
})
}
pub(super) fn bytes_to_u64_list(bytes: &[u8]) -> Vec<u64> {
bytes
.chunks_exact(8)
.map(|chunk| u64::from_le_bytes(chunk.try_into().unwrap()))
.collect()
}