use crate::{
db::{
cursor::{
ContinuationSignature, CursorBoundary, CursorBoundarySlot, IndexRangeCursorAnchor,
token::TokenWireError,
token::bytes::{ByteCursor, checked_len_u32, write_len_prefixed_bytes, write_u32},
token::value::{read_value, read_value_vec, write_value, write_value_slice},
},
direction::Direction,
},
value::Value,
};
/// Upper bound, in bytes, on one encoded cursor token. Enforced on both the
/// encode path (`finish_token_encode`) and the decode path (`start_token_decode`).
pub(in crate::db::cursor) const MAX_CURSOR_TOKEN_BYTES: usize = 8 * 1024;
/// Header variant tag identifying a scalar token.
const TOKEN_VARIANT_SCALAR: u8 = 1;
/// Header variant tag identifying a grouped token.
const TOKEN_VARIANT_GROUPED: u8 = 2;
/// Wire version written as the first header byte; decoding rejects any other value.
const TOKEN_WIRE_VERSION: u8 = 1;
/// Boundary slot tag: the slot carries no value.
const SLOT_MISSING: u8 = 0;
/// Boundary slot tag: an encoded value follows the tag.
const SLOT_PRESENT: u8 = 1;
/// Direction tag for `Direction::Asc`.
const DIRECTION_ASC: u8 = 0;
/// Direction tag for `Direction::Desc`.
const DIRECTION_DESC: u8 = 1;
/// Decoded contents of a scalar cursor token, as returned by `decode_scalar_token`.
///
/// The fields mirror the arguments accepted by `encode_scalar_token`.
pub(in crate::db::cursor::token) struct ScalarTokenParts {
/// Continuation signature rebuilt from the fixed-size byte array after the header.
pub(in crate::db::cursor::token) signature: ContinuationSignature,
/// Boundary slots, each either missing or carrying a value.
pub(in crate::db::cursor::token) boundary: CursorBoundary,
/// Direction decoded from the one-byte direction tag.
pub(in crate::db::cursor::token) direction: Direction,
/// Offset value carried verbatim in the token.
pub(in crate::db::cursor::token) initial_offset: u32,
/// Optional anchor rebuilt from length-prefixed raw key bytes; `None` when
/// the presence tag is 0.
pub(in crate::db::cursor::token) index_range_anchor: Option<IndexRangeCursorAnchor>,
}
/// Decoded contents of a grouped cursor token, as returned by `decode_grouped_token`.
///
/// The fields mirror the arguments accepted by `encode_grouped_token`.
pub(in crate::db::cursor::token) struct GroupedTokenParts {
/// Continuation signature rebuilt from the fixed-size byte array after the header.
pub(in crate::db::cursor::token) signature: ContinuationSignature,
/// Group key values read back from the token's value sequence.
pub(in crate::db::cursor::token) last_group_key: Vec<Value>,
/// Direction decoded from the one-byte direction tag.
pub(in crate::db::cursor::token) direction: Direction,
/// Offset value carried verbatim in the token.
pub(in crate::db::cursor::token) initial_offset: u32,
}
/// Serializes a scalar cursor token into its wire representation.
///
/// Sections are emitted in this order: header (wire version + scalar variant
/// tag), signature bytes, direction tag, initial offset, boundary slots, and
/// finally the optional index-range anchor.
///
/// # Errors
///
/// Returns a `TokenWireError` if the boundary or anchor cannot be written, or
/// if the finished token is longer than `MAX_CURSOR_TOKEN_BYTES`.
pub(in crate::db::cursor::token) fn encode_scalar_token(
    signature: ContinuationSignature,
    boundary: &CursorBoundary,
    direction: Direction,
    initial_offset: u32,
    index_range_anchor: Option<&IndexRangeCursorAnchor>,
) -> Result<Vec<u8>, TokenWireError> {
    let mut wire: Vec<u8> = Vec::new();

    // Prefix shared with grouped tokens: header, signature, direction, offset.
    write_token_header(&mut wire, TOKEN_VARIANT_SCALAR);
    let signature_bytes = signature.into_bytes();
    wire.extend_from_slice(&signature_bytes);
    write_direction(&mut wire, direction);
    write_u32(&mut wire, initial_offset);

    // Scalar-specific payload.
    write_cursor_boundary(&mut wire, boundary)?;
    write_optional_anchor(&mut wire, index_range_anchor)?;

    // The size limit is enforced once the full token has been assembled.
    finish_token_encode(wire)
}
/// Serializes a grouped cursor token into its wire representation.
///
/// Sections are emitted in this order: header (wire version + grouped variant
/// tag), signature bytes, direction tag, initial offset, and the values of the
/// last group key.
///
/// # Errors
///
/// Returns a `TokenWireError` if the group key values cannot be written, or
/// if the finished token is longer than `MAX_CURSOR_TOKEN_BYTES`.
pub(in crate::db::cursor::token) fn encode_grouped_token(
    signature: ContinuationSignature,
    last_group_key: &[Value],
    direction: Direction,
    initial_offset: u32,
) -> Result<Vec<u8>, TokenWireError> {
    let mut wire: Vec<u8> = Vec::new();

    // Prefix shared with scalar tokens: header, signature, direction, offset.
    write_token_header(&mut wire, TOKEN_VARIANT_GROUPED);
    let signature_bytes = signature.into_bytes();
    wire.extend_from_slice(&signature_bytes);
    write_direction(&mut wire, direction);
    write_u32(&mut wire, initial_offset);

    // Grouped-specific payload.
    write_value_slice(&mut wire, last_group_key)?;

    // The size limit is enforced once the full token has been assembled.
    finish_token_encode(wire)
}
/// Parses a scalar cursor token from its wire bytes.
///
/// The input is size-checked, the header must carry the supported wire
/// version and the scalar variant tag, and every byte must be consumed.
///
/// # Errors
///
/// Returns a `TokenWireError` if the input is too long, the header does not
/// match, any section fails to decode, or the cursor's final `finish` check
/// fails.
pub(in crate::db::cursor::token) fn decode_scalar_token(
    bytes: &[u8],
) -> Result<ScalarTokenParts, TokenWireError> {
    let mut reader = start_token_decode(bytes)?;
    expect_token_variant(&mut reader, TOKEN_VARIANT_SCALAR)?;

    // Field initializers run in the order written here, which is the wire
    // order: signature, direction, offset, boundary, anchor.
    let parts = ScalarTokenParts {
        signature: ContinuationSignature::from_bytes(reader.read_array()?),
        direction: read_direction(&mut reader)?,
        initial_offset: reader.read_u32()?,
        boundary: read_cursor_boundary(&mut reader)?,
        index_range_anchor: read_optional_anchor(&mut reader)?,
    };

    reader.finish()?;
    Ok(parts)
}
/// Parses a grouped cursor token from its wire bytes.
///
/// The input is size-checked, the header must carry the supported wire
/// version and the grouped variant tag, and every byte must be consumed.
///
/// # Errors
///
/// Returns a `TokenWireError` if the input is too long, the header does not
/// match, any section fails to decode, or the cursor's final `finish` check
/// fails.
pub(in crate::db::cursor::token) fn decode_grouped_token(
    bytes: &[u8],
) -> Result<GroupedTokenParts, TokenWireError> {
    let mut reader = start_token_decode(bytes)?;
    expect_token_variant(&mut reader, TOKEN_VARIANT_GROUPED)?;

    // Field initializers run in the order written here, which is the wire
    // order: signature, direction, offset, group key.
    let parts = GroupedTokenParts {
        signature: ContinuationSignature::from_bytes(reader.read_array()?),
        direction: read_direction(&mut reader)?,
        initial_offset: reader.read_u32()?,
        last_group_key: read_value_vec(&mut reader)?,
    };

    reader.finish()?;
    Ok(parts)
}
/// Opens a `ByteCursor` over `bytes` after checking the token size limit.
///
/// # Errors
///
/// Returns a decode error when `bytes` is longer than `MAX_CURSOR_TOKEN_BYTES`.
fn start_token_decode(bytes: &[u8]) -> Result<ByteCursor<'_>, TokenWireError> {
    let token_len = bytes.len();
    if token_len <= MAX_CURSOR_TOKEN_BYTES {
        return Ok(ByteCursor::new(bytes));
    }

    Err(TokenWireError::decode(format!(
        "cursor token exceeds max length: {} bytes (max {MAX_CURSOR_TOKEN_BYTES})",
        token_len
    )))
}
/// Hands back the encoded token unchanged, provided it fits the size limit.
///
/// # Errors
///
/// Returns an encode error when `bytes` is longer than `MAX_CURSOR_TOKEN_BYTES`.
fn finish_token_encode(bytes: Vec<u8>) -> Result<Vec<u8>, TokenWireError> {
    match bytes.len() {
        token_len if token_len > MAX_CURSOR_TOKEN_BYTES => {
            Err(TokenWireError::encode(format!(
                "cursor token exceeds max length: {} bytes (max {MAX_CURSOR_TOKEN_BYTES})",
                token_len
            )))
        }
        _ => Ok(bytes),
    }
}
/// Appends the two-byte token header to `out`: the wire version, then `variant`.
fn write_token_header(out: &mut Vec<u8>, variant: u8) {
    let header = [TOKEN_WIRE_VERSION, variant];
    out.extend_from_slice(&header);
}
/// Reads the two-byte token header and validates it.
///
/// The wire version byte is checked first; the variant byte is only read
/// once the version has been accepted.
///
/// # Errors
///
/// Returns a decode error if either byte cannot be read, the version is not
/// `TOKEN_WIRE_VERSION`, or the variant differs from `expected_variant`.
fn expect_token_variant(
    cursor: &mut ByteCursor<'_>,
    expected_variant: u8,
) -> Result<(), TokenWireError> {
    match cursor.read_u8()? {
        TOKEN_WIRE_VERSION => {}
        version => {
            return Err(TokenWireError::decode(format!(
                "unsupported cursor token wire version {version}"
            )));
        }
    }

    let actual_variant = cursor.read_u8()?;
    if actual_variant == expected_variant {
        Ok(())
    } else {
        Err(TokenWireError::decode(format!(
            "cursor token variant mismatch: expected {expected_variant}, found {actual_variant}"
        )))
    }
}
/// Appends the one-byte wire tag for `direction` to `out`.
fn write_direction(out: &mut Vec<u8>, direction: Direction) {
    let tag = match direction {
        Direction::Desc => DIRECTION_DESC,
        Direction::Asc => DIRECTION_ASC,
    };
    out.push(tag);
}
fn read_direction(cursor: &mut ByteCursor<'_>) -> Result<Direction, TokenWireError> {
match cursor.read_u8()? {
DIRECTION_ASC => Ok(Direction::Asc),
DIRECTION_DESC => Ok(Direction::Desc),
other => Err(TokenWireError::decode(format!(
"unsupported cursor direction tag {other}"
))),
}
}
/// Appends the optional index-range anchor to `out`.
///
/// A presence byte comes first: `0` when there is no anchor, `1` when the
/// anchor's raw key follows as length-prefixed bytes.
///
/// # Errors
///
/// Propagates any error from writing the length-prefixed key bytes.
fn write_optional_anchor(
    out: &mut Vec<u8>,
    anchor: Option<&IndexRangeCursorAnchor>,
) -> Result<(), TokenWireError> {
    if let Some(present) = anchor {
        out.push(1);
        write_len_prefixed_bytes(out, present.last_raw_key())?;
    } else {
        out.push(0);
    }

    Ok(())
}
/// Reads the optional index-range anchor.
///
/// A presence byte of `0` yields `None`; `1` is followed by length-prefixed
/// raw key bytes, which are copied into a new anchor.
///
/// # Errors
///
/// Returns a decode error if the presence byte or key bytes cannot be read,
/// or if the presence byte is neither `0` nor `1`.
fn read_optional_anchor(
    cursor: &mut ByteCursor<'_>,
) -> Result<Option<IndexRangeCursorAnchor>, TokenWireError> {
    let presence = cursor.read_u8()?;
    if presence == 0 {
        return Ok(None);
    }
    if presence != 1 {
        return Err(TokenWireError::decode(format!(
            "unsupported cursor anchor presence tag {other}",
            other = presence
        )));
    }

    let raw_key = cursor.read_len_prefixed_bytes()?.to_vec();
    Ok(Some(IndexRangeCursorAnchor::new(raw_key)))
}
/// Appends a cursor boundary to `out`.
///
/// The slot count is written first as a `u32`. Each slot then contributes a
/// tag byte (`SLOT_MISSING` or `SLOT_PRESENT`), and present slots are
/// followed by their encoded value.
///
/// # Errors
///
/// Returns an error if the slot count fails the `checked_len_u32` conversion
/// or if a slot value cannot be written.
fn write_cursor_boundary(
    out: &mut Vec<u8>,
    boundary: &CursorBoundary,
) -> Result<(), TokenWireError> {
    let slot_count = checked_len_u32(boundary.slots.len())?;
    write_u32(out, slot_count);

    for slot in boundary.slots.iter() {
        match slot {
            CursorBoundarySlot::Present(value) => {
                out.push(SLOT_PRESENT);
                write_value(out, value)?;
            }
            CursorBoundarySlot::Missing => {
                out.push(SLOT_MISSING);
            }
        }
    }

    Ok(())
}
fn read_cursor_boundary(cursor: &mut ByteCursor<'_>) -> Result<CursorBoundary, TokenWireError> {
let slot_count = usize::try_from(cursor.read_u32()?)
.map_err(|_| TokenWireError::decode("cursor boundary slot count does not fit usize"))?;
let mut slots = Vec::with_capacity(slot_count);
for _ in 0..slot_count {
match cursor.read_u8()? {
SLOT_MISSING => slots.push(CursorBoundarySlot::Missing),
SLOT_PRESENT => slots.push(CursorBoundarySlot::Present(read_value(cursor)?)),
other => {
return Err(TokenWireError::decode(format!(
"unsupported cursor boundary slot tag {other}"
)));
}
}
}
Ok(CursorBoundary { slots })
}