// ferrokinesis-core 0.3.0
//
// Core protocol logic for the ferrokinesis AWS Kinesis emulator.
use aes::cipher::{BlockDecryptMut, BlockEncryptMut, KeyIvInit, block_padding::Pkcs7};
use alloc::format;
use alloc::string::{String, ToString};
use alloc::vec;
use alloc::vec::Vec;
use base64::{Engine, engine::general_purpose::STANDARD as BASE64};

/// Errors from decoding a shard iterator token.
#[derive(Debug, thiserror::Error)]
pub enum ShardIteratorError {
    #[error("invalid base64")]
    InvalidBase64,
    #[error("invalid length")]
    InvalidLength,
    #[error("base64 encoding mismatch")]
    EncodingMismatch,
    #[error("invalid version header")]
    InvalidVersion,
    #[error("decryption failed")]
    DecryptionFailed,
    #[error("invalid UTF-8")]
    InvalidUtf8,
    #[error("malformed iterator payload")]
    MalformedPayload,
    #[error("invalid timestamp")]
    InvalidTimestamp,
}

// AES-256 in CBC mode, encrypting direction; used by `create_shard_iterator`.
type Aes256CbcEnc = cbc::Encryptor<aes::Aes256>;
// AES-256 in CBC mode, decrypting direction; used by `decode_shard_iterator`.
type Aes256CbcDec = cbc::Decryptor<aes::Aes256>;

// Key and IV are hardcoded to match kinesalite's values exactly. Changing either
// would make all in-flight iterator tokens undecryptable (decrypt would return garbage
// or a padding error), silently breaking any client that calls GetRecords after an
// emulator restart or upgrade.
//
// Because both values are public constants in this source, the encryption only makes
// tokens opaque to clients: anyone can decrypt or forge a token. With a fixed IV, the
// same plaintext always encrypts to the same ciphertext.
/// 256-bit AES key used to encrypt and decrypt shard iterator tokens.
///
/// Exported but hidden from rustdoc; not intended as part of the public API surface.
#[doc(hidden)]
pub const ITERATOR_PWD_KEY: [u8; 32] = [
    0x11, 0x33, 0xa5, 0xa8, 0x33, 0x66, 0x6b, 0x49, 0xab, 0xf2, 0x8c, 0x8b, 0xa3, 0x02, 0x93, 0x0f,
    0x0b, 0x2f, 0xb2, 0x40, 0xdc, 0xcd, 0x43, 0xcf, 0x4d, 0xfb, 0xc0, 0xca, 0x91, 0xf1, 0x77, 0x51,
];

/// 128-bit CBC initialisation vector used with [`ITERATOR_PWD_KEY`] for every token.
///
/// Exported but hidden from rustdoc; not intended as part of the public API surface.
#[doc(hidden)]
pub const ITERATOR_PWD_IV: [u8; 16] = [
    0x7b, 0xf1, 0x39, 0xdb, 0xab, 0xbe, 0xa2, 0xd9, 0x99, 0x5d, 0x6f, 0xca, 0xe1, 0xdf, 0xf7, 0xda,
];

/// Create a shard iterator token (AES-256-CBC encrypted, base64-encoded).
///
/// Plaintext format: `{timestamp_ms:014}/{stream}/{shard_id}/{seq}/{nonce}`, where
/// `nonce` is always 36 ASCII `'0'` characters (kinesalite compatible). The process is:
///
/// 1. PKCS#7-pad the plaintext.
/// 2. Encrypt it with [`ITERATOR_PWD_KEY`]/[`ITERATOR_PWD_IV`].
/// 3. Prefix the ciphertext with the 8-byte version header `[0, 0, 0, 0, 0, 0, 0, 1]`.
/// 4. Encode the result with standard base64.
///
/// Fields are joined with `/` without escaping. A `stream_name`, `shard_id`, or `seq`
/// that contains `/` therefore yields a token that [`decode_shard_iterator`] will not
/// decode back into the original fields.
///
/// The key and IV are fixed public constants, so the encryption only makes the token
/// opaque to clients. It provides neither secrecy nor tamper protection, and identical
/// inputs always produce identical tokens.
///
/// # Panics
///
/// Does not panic in practice. The encryption buffer is always sized to hold the
/// PKCS#7-padded message, so the padding step cannot run out of space.
pub fn create_shard_iterator(
    stream_name: &str,
    shard_id: &str,
    seq: &str,
    timestamp_ms: u64,
) -> String {
    // AES block size in bytes; PKCS#7 always appends between 1 and BLOCK_LEN bytes.
    const BLOCK_LEN: usize = 16;

    // `{:014}` is a minimum width. 14 digits of milliseconds reach roughly 3,170 years
    // past the Unix epoch, and larger values are written in full rather than truncated.
    // The trailing 36 zeros fill the nonce field, which is constant and carries no data.
    let plaintext = format!(
        "{timestamp_ms:014}/{stream_name}/{shard_id}/{seq}/{}",
        "0".repeat(36)
    );
    let msg_len = plaintext.len();

    // Reserve room for the padding. `encrypt_padded_mut` writes the PKCS#7 bytes into
    // the tail itself, then encrypts the whole buffer in place.
    let padded_len = (msg_len / BLOCK_LEN + 1) * BLOCK_LEN;
    let mut buf: Vec<u8> = Vec::with_capacity(padded_len);
    buf.extend_from_slice(plaintext.as_bytes());
    buf.resize(padded_len, 0);

    let ciphertext = Aes256CbcEnc::new(&ITERATOR_PWD_KEY.into(), &ITERATOR_PWD_IV.into())
        .encrypt_padded_mut::<Pkcs7>(&mut buf, msg_len)
        .expect("buffer is sized for the PKCS#7-padded message");

    // 8-byte format-version header: the integer 1 in big-endian byte order, matching
    // kinesalite. `decode_shard_iterator` rejects tokens carrying any other header.
    let mut token: Vec<u8> = vec![0, 0, 0, 0, 0, 0, 0, 1];
    token.extend_from_slice(ciphertext);

    BASE64.encode(&token)
}

/// Decode a shard iterator token, returning `(iterator_time_ms, stream_name, shard_id, seq_no)`.
///
/// This reverses [`create_shard_iterator`]:
///
/// 1. Base64-decode the token.
/// 2. Check its size and that the encoding is canonical.
/// 3. Verify the 8-byte version header.
/// 4. Decrypt the remaining AES-256-CBC payload.
/// 5. Split the plaintext on `/` into exactly five fields. The trailing nonce field is
///    discarded.
///
/// # Errors
///
/// - [`ShardIteratorError::InvalidBase64`]: `iterator` is not valid standard base64.
/// - [`ShardIteratorError::InvalidLength`]: the decoded bytes are outside `152..=280`.
/// - [`ShardIteratorError::EncodingMismatch`]: re-encoding does not reproduce `iterator`.
/// - [`ShardIteratorError::InvalidVersion`]: the header is not `[0, 0, 0, 0, 0, 0, 0, 1]`.
/// - [`ShardIteratorError::DecryptionFailed`]: decryption or PKCS#7 unpadding fails.
/// - [`ShardIteratorError::InvalidUtf8`]: the plaintext is not UTF-8.
/// - [`ShardIteratorError::MalformedPayload`]: the plaintext lacks exactly five fields.
/// - [`ShardIteratorError::InvalidTimestamp`]: the first field is not a `u64`.
pub fn decode_shard_iterator(
    iterator: &str,
) -> Result<(u64, String, String, String), ShardIteratorError> {
    // Decoded size bounds: 8 header bytes plus 144..=272 bytes of ciphertext.
    // - 144 is the PKCS#7-padded size of the shortest valid plaintext.
    // - 272 = ceil((14+1+128+1+20+1+57+1+36)/16)*16, assuming a 128-char stream name,
    //   a 20-char shard ID ("shardId-000000000000") and at most 57 sequence digits.
    const MIN_TOKEN_LEN: usize = 152;
    const MAX_TOKEN_LEN: usize = 280;
    const HEADER_LEN: usize = 8;

    let raw = BASE64
        .decode(iterator)
        .map_err(|_| ShardIteratorError::InvalidBase64)?;

    if !(MIN_TOKEN_LEN..=MAX_TOKEN_LEN).contains(&raw.len()) {
        return Err(ShardIteratorError::InvalidLength);
    }

    // Require `iterator` to be exactly the canonical encoding of the decoded bytes.
    if BASE64.encode(&raw) != iterator {
        return Err(ShardIteratorError::EncodingMismatch);
    }

    // The length check above guarantees the header is present.
    let (header, encrypted) = raw.split_at(HEADER_LEN);
    if header != [0u8, 0, 0, 0, 0, 0, 0, 1] {
        return Err(ShardIteratorError::InvalidVersion);
    }

    let mut scratch = encrypted.to_vec();
    let decrypted = Aes256CbcDec::new(&ITERATOR_PWD_KEY.into(), &ITERATOR_PWD_IV.into())
        .decrypt_padded_mut::<Pkcs7>(&mut scratch)
        .map_err(|_| ShardIteratorError::DecryptionFailed)?;

    let text = core::str::from_utf8(decrypted).map_err(|_| ShardIteratorError::InvalidUtf8)?;

    let fields: Vec<&str> = text.split('/').collect();
    let &[time_field, stream_name, shard_id, seq_no, _nonce] = fields.as_slice() else {
        return Err(ShardIteratorError::MalformedPayload);
    };

    let iterator_time = time_field
        .parse::<u64>()
        .map_err(|_| ShardIteratorError::InvalidTimestamp)?;

    Ok((
        iterator_time,
        stream_name.to_string(),
        shard_id.to_string(),
        seq_no.to_string(),
    ))
}