#![allow(dead_code)]
use anyhow::Result;
use rivet_error::RivetError;
use serde::{Deserialize, Serialize};
pub const PERSIST_DATA_KEY: &[u8] = &[1];
pub const CONN_PREFIX: [u8; 1] = [2];
pub const INSPECTOR_TOKEN_KEY: [u8; 1] = [3];
pub const KV_PREFIX: [u8; 1] = [4];
pub const QUEUE_PREFIX: [u8; 1] = [5];
pub const WORKFLOW_PREFIX: [u8; 1] = [6];
pub const TRACES_PREFIX: [u8; 1] = [7];
pub const LAST_PUSHED_ALARM_KEY: &[u8] = &[6];
pub const QUEUE_STORAGE_VERSION: u8 = 1;
pub const WORKFLOW_STORAGE_VERSION: u8 = 1;
pub const TRACES_STORAGE_VERSION: u8 = 1;
const QUEUE_NAMESPACE_METADATA: u8 = 1;
const QUEUE_NAMESPACE_MESSAGES: u8 = 2;
const QUEUE_ID_BYTES: usize = 8;
pub const QUEUE_STORAGE_PREFIX: [u8; 2] = [QUEUE_PREFIX[0], QUEUE_STORAGE_VERSION];
pub const QUEUE_METADATA_KEY: [u8; 3] = [
QUEUE_PREFIX[0],
QUEUE_STORAGE_VERSION,
QUEUE_NAMESPACE_METADATA,
];
pub const QUEUE_MESSAGES_PREFIX: [u8; 3] = [
QUEUE_PREFIX[0],
QUEUE_STORAGE_VERSION,
QUEUE_NAMESPACE_MESSAGES,
];
pub const WORKFLOW_STORAGE_PREFIX: [u8; 2] = [WORKFLOW_PREFIX[0], WORKFLOW_STORAGE_VERSION];
pub const TRACES_STORAGE_PREFIX: [u8; 2] = [TRACES_PREFIX[0], TRACES_STORAGE_VERSION];
#[derive(RivetError, Serialize, Deserialize)]
#[error(
"queue",
"invalid_message_key",
"Queue message key is invalid",
"Queue message key is invalid: {reason}"
)]
struct QueueInvalidMessageKey {
reason: String,
}
pub fn make_prefixed_key(key: &[u8]) -> Vec<u8> {
concat_prefix(&KV_PREFIX, key)
}
pub fn remove_prefix_from_key(prefixed_key: &[u8]) -> &[u8] {
&prefixed_key[KV_PREFIX.len()..]
}
pub fn make_workflow_key(key: &[u8]) -> Vec<u8> {
concat_prefix(&WORKFLOW_STORAGE_PREFIX, key)
}
pub fn make_traces_key(key: &[u8]) -> Vec<u8> {
concat_prefix(&TRACES_STORAGE_PREFIX, key)
}
pub fn make_connection_key(conn_id: &str) -> Vec<u8> {
concat_prefix(&CONN_PREFIX, conn_id.as_bytes())
}
pub fn make_queue_message_key(id: u64) -> Vec<u8> {
let mut key = Vec::with_capacity(QUEUE_MESSAGES_PREFIX.len() + QUEUE_ID_BYTES);
key.extend_from_slice(&QUEUE_MESSAGES_PREFIX);
key.extend_from_slice(&id.to_be_bytes());
key
}
pub fn decode_queue_message_key(key: &[u8]) -> Result<u64> {
if key.len() != QUEUE_MESSAGES_PREFIX.len() + QUEUE_ID_BYTES {
return Err(invalid_queue_key("invalid length"));
}
if !key.starts_with(&QUEUE_MESSAGES_PREFIX) {
return Err(invalid_queue_key("invalid prefix"));
}
let bytes: [u8; QUEUE_ID_BYTES] = key[QUEUE_MESSAGES_PREFIX.len()..]
.try_into()
.map_err(|_| invalid_queue_key("invalid id bytes"))?;
Ok(u64::from_be_bytes(bytes))
}
fn concat_prefix(prefix: &[u8], suffix: &[u8]) -> Vec<u8> {
let mut key = Vec::with_capacity(prefix.len() + suffix.len());
key.extend_from_slice(prefix);
key.extend_from_slice(suffix);
key
}
fn invalid_queue_key(reason: &str) -> anyhow::Error {
QueueInvalidMessageKey {
reason: reason.to_owned(),
}
.build()
}