mod ttl;
pub use ttl::*;
#[cfg(test)]
mod tests;
use std::ops::Bound;
use std::path::PathBuf;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use std::sync::{Arc, Mutex};
use scru128::Scru128Id;
use serde::{Deserialize, Deserializer, Serialize};
use fjall::{
config::{BlockSizePolicy, HashRatioPolicy},
Database, Error as FjallError, Keyspace, KeyspaceCreateOptions, PersistMode,
};
/// Errors that can occur while opening a [`Store`].
#[derive(Debug)]
pub enum StoreError {
    /// The underlying fjall database is locked by another process.
    Locked,
    /// Any other underlying fjall error.
    Other(FjallError),
}
impl std::fmt::Display for StoreError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
StoreError::Locked => write!(f, "Store is locked by another process"),
StoreError::Other(e) => write!(f, "{e}"),
}
}
}
impl std::error::Error for StoreError {}
/// A single record in the event stream.
#[derive(PartialEq, Eq, Serialize, Deserialize, Clone, Default, bon::Builder)]
pub struct Frame {
    // Dotted topic name (e.g. "foo.bar"); checked by `validate_topic` on append.
    #[builder(start_fn, into)]
    pub topic: String,
    // Time-ordered unique id; assigned by `Store::append`.
    #[builder(default)]
    pub id: Scru128Id,
    // Integrity hash of an associated CAS blob, if any (see `cas_*` methods).
    pub hash: Option<ssri::Integrity>,
    // Arbitrary caller-supplied JSON metadata.
    pub meta: Option<serde_json::Value>,
    // Retention policy; `None` means keep indefinitely.
    pub ttl: Option<TTL>,
}
use std::fmt;
impl fmt::Debug for Frame {
    /// Render the id and hash as plain display strings so debug output stays readable.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let hash = self.hash.as_ref().map(|h| h.to_string());
        f.debug_struct("Frame")
            .field("id", &self.id.to_string())
            .field("topic", &self.topic)
            .field("hash", &hash)
            .field("meta", &self.meta)
            .field("ttl", &self.ttl)
            .finish()
    }
}
impl<'de> Deserialize<'de> for FollowOption {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let s: String = Deserialize::deserialize(deserializer)?;
if s.is_empty() || s == "yes" {
Ok(FollowOption::On)
} else if let Ok(duration) = s.parse::<u64>() {
Ok(FollowOption::WithHeartbeat(Duration::from_millis(duration)))
} else {
match s.as_str() {
"true" => Ok(FollowOption::On),
"false" | "no" => Ok(FollowOption::Off),
_ => Err(serde::de::Error::custom("Invalid value for follow option")),
}
}
}
}
/// Lenient boolean deserializer for query-string values:
/// only "false", "no" and "0" are false; everything else is true.
fn deserialize_bool<'de, D>(deserializer: D) -> Result<bool, D::Error>
where
    D: Deserializer<'de>,
{
    let raw: String = Deserialize::deserialize(deserializer)?;
    Ok(!matches!(raw.as_str(), "false" | "no" | "0"))
}
/// Options controlling `Store::read` / `Store::read_sync`.
///
/// Deserializable from a URL query string (see `from_query`).
#[derive(PartialEq, Deserialize, Clone, Debug, Default, bon::Builder)]
pub struct ReadOptions {
    // Whether to keep streaming live frames after the historical scan.
    #[serde(default)]
    #[builder(default)]
    pub follow: FollowOption,
    // When true, skip history entirely and deliver only new frames.
    #[serde(default, deserialize_with = "deserialize_bool")]
    #[builder(default)]
    pub new: bool,
    // Resume strictly after this id (exclusive).
    #[serde(rename = "after")]
    pub after: Option<Scru128Id>,
    // Start at this id (inclusive); takes precedence over `after` when both are set.
    pub from: Option<Scru128Id>,
    // Maximum number of frames to deliver.
    pub limit: Option<usize>,
    // Deliver only the most recent N matching frames, oldest first.
    pub last: Option<usize>,
    // Topic filter: exact name, "*" for all, or "prefix.*" for a subtree.
    pub topic: Option<String>,
}
impl ReadOptions {
    /// Parse options from an optional URL query string; `None` yields defaults.
    pub fn from_query(query: Option<&str>) -> Result<Self, crate::error::Error> {
        match query {
            None => Ok(Self::default()),
            Some(q) => Ok(serde_urlencoded::from_str(q)?),
        }
    }

    /// Render the options back into a URL query string ("" when everything is default).
    pub fn to_query_string(&self) -> String {
        let mut pairs: Vec<(&str, String)> = Vec::new();
        match &self.follow {
            FollowOption::Off => {}
            FollowOption::On => pairs.push(("follow", "true".to_string())),
            FollowOption::WithHeartbeat(interval) => {
                pairs.push(("follow", interval.as_millis().to_string()));
            }
        }
        if self.new {
            pairs.push(("new", "true".to_string()));
        }
        // Optional parameters, emitted in a fixed order.
        let optional: [(&str, Option<String>); 5] = [
            ("after", self.after.map(|id| id.to_string())),
            ("from", self.from.map(|id| id.to_string())),
            ("limit", self.limit.map(|n| n.to_string())),
            ("last", self.last.map(|n| n.to_string())),
            ("topic", self.topic.clone()),
        ];
        for (key, value) in optional {
            if let Some(value) = value {
                pairs.push((key, value));
            }
        }
        if pairs.is_empty() {
            return String::new();
        }
        url::form_urlencoded::Serializer::new(String::new())
            .extend_pairs(pairs)
            .finish()
    }
}
/// How a read behaves once the historical scan completes.
#[derive(Default, PartialEq, Clone, Debug)]
pub enum FollowOption {
    /// Stop after history has been delivered (default).
    #[default]
    Off,
    /// Keep streaming live frames.
    On,
    /// Keep streaming, and emit an ephemeral "xs.pulse" frame at this interval.
    WithHeartbeat(Duration),
}
/// Work items processed by the GC worker thread (see `spawn_gc_worker`).
#[derive(Debug)]
enum GCTask {
    /// Delete a single frame by id.
    Remove(Scru128Id),
    /// Enforce `TTL::Last(keep)` for a topic by trimming older frames.
    CheckLastTTL { topic: String, keep: u32 },
    /// Ack-only marker used by `wait_for_gc` to wait until the queue drains.
    Drain(tokio::sync::oneshot::Sender<()>),
}
/// Frame store backed by fjall (stream + topic index) and cacache (content store).
///
/// Cloning is cheap: clones share the same database handles and channels.
#[derive(Clone)]
pub struct Store {
    // Root directory; holds the `fjall` and `cacache` subdirectories.
    pub path: PathBuf,
    db: Database,
    // Primary keyspace: frame id bytes -> JSON-serialized Frame.
    stream: Keyspace,
    // Secondary index: topic (or dotted prefix) + NUL + frame id -> empty value.
    idx_topic: Keyspace,
    // Live feed of appended frames for followers.
    broadcast_tx: broadcast::Sender<Frame>,
    // Queue feeding the GC worker thread.
    gc_tx: UnboundedSender<GCTask>,
    // Serializes `append` so ids are assigned in strictly increasing order.
    append_lock: Arc<Mutex<()>>,
}
impl Store {
/// Open (or create) the store rooted at `path`.
///
/// The fjall database lives under `path/fjall`; CAS blobs live under
/// `path/cacache`. Returns `StoreError::Locked` when another process
/// already holds the database lock, `StoreError::Other` for any other
/// fjall failure (including keyspace creation, which previously panicked).
pub fn new(path: PathBuf) -> Result<Store, StoreError> {
    let db = match Database::builder(path.join("fjall"))
        .cache_size(32 * 1024 * 1024)
        .worker_threads(1)
        .open()
    {
        Ok(db) => db,
        Err(FjallError::Locked) => return Err(StoreError::Locked),
        Err(e) => return Err(StoreError::Other(e)),
    };
    // Stream keyspace is point-read heavy: enable the block hash index.
    let stream_opts = || {
        KeyspaceCreateOptions::default()
            .max_memtable_size(8 * 1024 * 1024)
            .data_block_size_policy(BlockSizePolicy::all(16 * 1024))
            .data_block_hash_ratio_policy(HashRatioPolicy::all(8.0))
            .expect_point_read_hits(true)
    };
    // Index keyspace is range-scanned; no hash index needed.
    let idx_opts = || {
        KeyspaceCreateOptions::default()
            .max_memtable_size(8 * 1024 * 1024)
            .data_block_size_policy(BlockSizePolicy::all(16 * 1024))
            .data_block_hash_ratio_policy(HashRatioPolicy::all(0.0))
            .expect_point_read_hits(true)
    };
    // Propagate keyspace-creation failures instead of panicking (was `.unwrap()`).
    let stream = db.keyspace("stream", stream_opts).map_err(StoreError::Other)?;
    let idx_topic = db
        .keyspace("idx_topic", idx_opts)
        .map_err(StoreError::Other)?;
    let (broadcast_tx, _) = broadcast::channel(1024);
    let (gc_tx, gc_rx) = mpsc::unbounded_channel();
    let store = Store {
        // `path` is no longer needed afterwards, so move it (was `path.clone()`).
        path,
        db,
        stream,
        idx_topic,
        broadcast_tx,
        gc_tx,
        append_lock: Arc::new(Mutex::new(())),
    };
    spawn_gc_worker(gc_rx, store.clone());
    Ok(store)
}
/// Block until every GC task queued before this call has been processed.
///
/// Works by enqueueing a `Drain` marker and awaiting its acknowledgement;
/// returns immediately if the GC worker is gone.
pub async fn wait_for_gc(&self) {
    let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
    if self.gc_tx.send(GCTask::Drain(ack_tx)).is_ok() {
        let _ = ack_rx.await;
    }
}
/// Read frames from the store as an async stream.
///
/// Historical frames (unless `options.new`) are scanned on a dedicated
/// OS thread and fed into the returned channel. When `options.follow`
/// is enabled, live frames from the broadcast bus are forwarded after
/// the scan, de-duplicated against the last historically-delivered id.
#[tracing::instrument(skip(self))]
pub async fn read(&self, options: ReadOptions) -> tokio::sync::mpsc::Receiver<Frame> {
    let (tx, rx) = tokio::sync::mpsc::channel(100);
    let should_follow = matches!(
        options.follow,
        FollowOption::On | FollowOption::WithHeartbeat(_)
    );
    // Subscribe *before* the historical scan starts so no frame appended
    // during the scan is missed; overlap is filtered via `last_id` below.
    let broadcast_rx = if should_follow {
        Some(self.broadcast_tx.subscribe())
    } else {
        None
    };
    let done_rx = if !options.new {
        let (done_tx, done_rx) = tokio::sync::oneshot::channel();
        let tx_clone = tx.clone();
        let store = self.clone();
        let options = options.clone();
        let should_follow_clone = should_follow;
        let gc_tx = self.gc_tx.clone();
        // Blocking DB iteration runs on a plain OS thread, off the async runtime.
        std::thread::spawn(move || {
            let mut last_id = None;
            let mut count = 0;
            if let Some(last_n) = options.last {
                // `last` mode: walk newest-to-oldest, collect up to N live
                // frames, then emit them oldest-first.
                let iter: Box<dyn Iterator<Item = Frame>> = match options.topic.as_deref() {
                    None | Some("*") => store.iter_frames_rev(),
                    Some(topic) if topic.ends_with(".*") => {
                        // Keep the trailing dot: "foo.*" scans prefix "foo.".
                        let prefix = &topic[..topic.len() - 1];
                        store.iter_frames_by_topic_prefix_rev(prefix)
                    }
                    Some(topic) => store.iter_frames_by_topic_rev(topic),
                };
                let mut frames: Vec<Frame> = Vec::with_capacity(last_n);
                for frame in iter {
                    // Expired frames are skipped and queued for GC.
                    if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
                        if is_expired(&frame.id, ttl) {
                            let _ = gc_tx.send(GCTask::Remove(frame.id));
                            continue;
                        }
                    }
                    frames.push(frame);
                    if frames.len() >= last_n {
                        break;
                    }
                }
                for frame in frames.into_iter().rev() {
                    last_id = Some(frame.id);
                    count += 1;
                    if tx_clone.blocking_send(frame).is_err() {
                        // Receiver dropped; abandon the scan.
                        return;
                    }
                }
            } else {
                // Forward scan starting at `from` (inclusive) or `after`
                // (exclusive); `from` wins when both are set.
                let start_bound = options
                    .from
                    .as_ref()
                    .map(|id| (id, true))
                    .or_else(|| options.after.as_ref().map(|id| (id, false)));
                let iter: Box<dyn Iterator<Item = Frame>> = match options.topic.as_deref() {
                    None | Some("*") => store.iter_frames(start_bound),
                    Some(topic) if topic.ends_with(".*") => {
                        let prefix = &topic[..topic.len() - 1];
                        store.iter_frames_by_topic_prefix(prefix, start_bound)
                    }
                    Some(topic) => store.iter_frames_by_topic(topic, start_bound),
                };
                for frame in iter {
                    if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
                        if is_expired(&frame.id, ttl) {
                            let _ = gc_tx.send(GCTask::Remove(frame.id));
                            continue;
                        }
                    }
                    last_id = Some(frame.id);
                    if let Some(limit) = options.limit {
                        if count >= limit {
                            // Limit reached: exit without the threshold marker;
                            // dropping `done_tx` also ends any follow task.
                            return;
                        }
                    }
                    if tx_clone.blocking_send(frame).is_err() {
                        return;
                    }
                    count += 1;
                }
            }
            if should_follow_clone {
                // Ephemeral marker separating history from live frames.
                let threshold = Frame::builder("xs.threshold")
                    .id(scru128::new())
                    .ttl(TTL::Ephemeral)
                    .build();
                if tx_clone.blocking_send(threshold).is_err() {
                    return;
                }
            }
            // Hand off where the scan stopped so the follow task can de-dup.
            let _ = done_tx.send((last_id, count));
        });
        Some(done_rx)
    } else {
        None
    };
    if let Some(broadcast_rx) = broadcast_rx {
        {
            let tx = tx.clone();
            let limit = options.limit;
            tokio::spawn(async move {
                // Wait for the historical scan so live frames are only
                // forwarded once history is fully delivered.
                let (last_id, mut count) = match done_rx {
                    Some(done_rx) => match done_rx.await {
                        Ok((id, count)) => (id, count),
                        Err(_) => return,
                    },
                    None => (None, 0),
                };
                let mut broadcast_rx = broadcast_rx;
                while let Ok(frame) = broadcast_rx.recv().await {
                    // Apply the same topic filter as the historical scan.
                    match options.topic.as_deref() {
                        None | Some("*") => {}
                        Some(topic) if topic.ends_with(".*") => {
                            let prefix = &topic[..topic.len() - 1];
                            if !frame.topic.starts_with(prefix) {
                                continue;
                            }
                        }
                        Some(topic) => {
                            if frame.topic != topic {
                                continue;
                            }
                        }
                    }
                    // Drop frames already delivered by the historical scan.
                    if let Some(last_scanned_id) = last_id {
                        if frame.id <= last_scanned_id {
                            continue;
                        }
                    }
                    if tx.send(frame).await.is_err() {
                        break;
                    }
                    if let Some(limit) = limit {
                        count += 1;
                        if count >= limit {
                            break;
                        }
                    }
                }
            });
        }
        if let FollowOption::WithHeartbeat(duration) = options.follow {
            let heartbeat_tx = tx;
            // Periodic ephemeral pulse so followers can detect liveness.
            tokio::spawn(async move {
                loop {
                    tokio::time::sleep(duration).await;
                    let frame = Frame::builder("xs.pulse")
                        .id(scru128::new())
                        .ttl(TTL::Ephemeral)
                        .build();
                    if heartbeat_tx.send(frame).await.is_err() {
                        break;
                    }
                }
            });
        }
    }
    rx
}
/// Synchronous counterpart of `read`: returns an iterator over the
/// historical frames matching `options` (follow/heartbeat do not apply).
///
/// Expired time-TTL frames are skipped and queued for GC, as in the
/// async path. Results are fully collected before the iterator returns.
pub fn read_sync(&self, options: ReadOptions) -> impl Iterator<Item = Frame> + '_ {
    let gc_tx = self.gc_tx.clone();
    // Drop expired frames, scheduling each for removal.
    let filter_expired = move |frame: Frame, gc_tx: &UnboundedSender<GCTask>| {
        if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
            if is_expired(&frame.id, ttl) {
                let _ = gc_tx.send(GCTask::Remove(frame.id));
                return None;
            }
        }
        Some(frame)
    };
    let frames: Vec<Frame> = if let Some(last_n) = options.last {
        // `last` mode: newest-to-oldest scan, then reverse to chronological order.
        let iter: Box<dyn Iterator<Item = Frame>> = match options.topic.as_deref() {
            None | Some("*") => self.iter_frames_rev(),
            Some(topic) if topic.ends_with(".*") => {
                // Keep the trailing dot: "foo.*" scans prefix "foo.".
                let prefix = &topic[..topic.len() - 1];
                self.iter_frames_by_topic_prefix_rev(prefix)
            }
            Some(topic) => self.iter_frames_by_topic_rev(topic),
        };
        let mut frames: Vec<Frame> = Vec::with_capacity(last_n);
        for frame in iter {
            if let Some(frame) = filter_expired(frame, &gc_tx) {
                frames.push(frame);
                if frames.len() >= last_n {
                    break;
                }
            }
        }
        frames.reverse();
        frames
    } else {
        // Forward scan starting at `from` (inclusive) or `after` (exclusive);
        // `from` wins when both are set.
        let start_bound = options
            .from
            .as_ref()
            .map(|id| (id, true))
            .or_else(|| options.after.as_ref().map(|id| (id, false)));
        let iter: Box<dyn Iterator<Item = Frame>> = match options.topic.as_deref() {
            None | Some("*") => self.iter_frames(start_bound),
            Some(topic) if topic.ends_with(".*") => {
                let prefix = &topic[..topic.len() - 1];
                self.iter_frames_by_topic_prefix(prefix, start_bound)
            }
            Some(topic) => self.iter_frames_by_topic(topic, start_bound),
        };
        iter.filter_map(|frame| filter_expired(frame, &gc_tx))
            .take(options.limit.unwrap_or(usize::MAX))
            .collect()
    };
    frames.into_iter()
}
/// Snapshot of ".nu" module hashes as of frame `as_of` (inclusive).
///
/// Scans history in order and keeps, per topic ending in ".nu", the hash
/// of the most recent frame at or before `as_of` that carries one.
pub fn nu_modules_at(
    &self,
    as_of: &Scru128Id,
) -> std::collections::HashMap<String, ssri::Integrity> {
    let options = ReadOptions::builder().follow(FollowOption::Off).build();
    self.read_sync(options)
        .take_while(|frame| frame.id <= *as_of)
        .filter(|frame| frame.topic.ends_with(".nu"))
        // Later frames overwrite earlier entries for the same topic.
        .filter_map(|frame| frame.hash.map(|hash| (frame.topic, hash)))
        .collect()
}
/// Fetch a single frame by id; `None` when the id is absent.
pub fn get(&self, id: &Scru128Id) -> Option<Frame> {
    let value = self.stream.get(id.to_bytes()).unwrap()?;
    Some(deserialize_frame((id.as_bytes(), value)))
}
/// Delete a frame and all of its index entries atomically; a no-op when
/// the id does not exist.
#[tracing::instrument(skip(self), fields(id = %id.to_string()))]
pub fn remove(&self, id: &Scru128Id) -> Result<(), crate::error::Error> {
    let Some(frame) = self.get(id) else {
        return Ok(());
    };
    // Exact-topic index key: topic + NUL + frame id.
    let mut topic_key = idx_topic_key_prefix(&frame.topic);
    topic_key.extend(frame.id.as_bytes());
    let prefix_keys = idx_topic_prefix_keys(&frame.topic, &frame.id);
    // Stream entry, exact-topic entry, and every dotted-prefix entry go
    // in one batch so the index never dangles.
    let mut batch = self.db.batch();
    batch.remove(&self.stream, id.as_bytes());
    batch.remove(&self.idx_topic, topic_key);
    for prefix_key in &prefix_keys {
        batch.remove(&self.idx_topic, prefix_key);
    }
    batch.commit()?;
    self.db.persist(PersistMode::SyncAll)?;
    Ok(())
}
/// Open an async streaming reader for the CAS blob addressed by `hash`.
pub async fn cas_reader(&self, hash: ssri::Integrity) -> cacache::Result<cacache::Reader> {
    cacache::Reader::open_hash(&self.path.join("cacache"), hash).await
}
/// Blocking variant of `cas_reader`.
pub fn cas_reader_sync(&self, hash: ssri::Integrity) -> cacache::Result<cacache::SyncReader> {
    cacache::SyncReader::open_hash(self.path.join("cacache"), hash)
}
/// Open an async streaming writer into the CAS.
pub async fn cas_writer(&self) -> cacache::Result<cacache::Writer> {
    cacache::WriteOpts::new()
        .open_hash(&self.path.join("cacache"))
        .await
}
/// Blocking variant of `cas_writer`.
pub fn cas_writer_sync(&self) -> cacache::Result<cacache::SyncWriter> {
    cacache::WriteOpts::new().open_hash_sync(self.path.join("cacache"))
}
/// Write `content` into the CAS, returning its integrity hash.
pub async fn cas_insert(&self, content: impl AsRef<[u8]>) -> cacache::Result<ssri::Integrity> {
    cacache::write_hash(&self.path.join("cacache"), content).await
}
/// Blocking variant of `cas_insert`.
pub fn cas_insert_sync(&self, content: impl AsRef<[u8]>) -> cacache::Result<ssri::Integrity> {
    cacache::write_hash_sync(self.path.join("cacache"), content)
}
/// Convenience alias for `cas_insert` on a byte slice.
pub async fn cas_insert_bytes(&self, bytes: &[u8]) -> cacache::Result<ssri::Integrity> {
    self.cas_insert(bytes).await
}
/// Convenience alias for `cas_insert_sync` on a byte slice.
pub fn cas_insert_bytes_sync(&self, bytes: &[u8]) -> cacache::Result<ssri::Integrity> {
    self.cas_insert_sync(bytes)
}
/// Read an entire CAS blob into memory.
pub async fn cas_read(&self, hash: &ssri::Integrity) -> cacache::Result<Vec<u8>> {
    cacache::read_hash(&self.path.join("cacache"), hash).await
}
/// Blocking variant of `cas_read`.
pub fn cas_read_sync(&self, hash: &ssri::Integrity) -> cacache::Result<Vec<u8>> {
    cacache::read_hash_sync(self.path.join("cacache"), hash)
}
/// Write a frame plus all of its index entries atomically and fsync.
///
/// Fails (before any write) when the frame's topic is invalid.
#[tracing::instrument(skip(self))]
pub fn insert_frame(&self, frame: &Frame) -> Result<(), crate::error::Error> {
    let payload: Vec<u8> = serde_json::to_vec(&frame).unwrap();
    // Also validates the topic.
    let topic_key = idx_topic_key_from_frame(frame)?;
    let mut batch = self.db.batch();
    batch.insert(&self.stream, frame.id.as_bytes(), payload);
    // Index values are empty: the key alone (topic + NUL + id) carries the data.
    batch.insert(&self.idx_topic, topic_key, b"");
    for prefix_key in &idx_topic_prefix_keys(&frame.topic, &frame.id) {
        batch.insert(&self.idx_topic, prefix_key, b"");
    }
    batch.commit()?;
    self.db.persist(PersistMode::SyncAll)?;
    Ok(())
}
/// Assign a fresh id to `frame`, persist it (unless ephemeral), and
/// broadcast it to followers. Returns the frame with its new id.
pub fn append(&self, mut frame: Frame) -> Result<Frame, crate::error::Error> {
    // Hold the lock across id assignment + insert so ids are broadcast
    // in strictly increasing order.
    let _guard = self.append_lock.lock().unwrap();
    frame.id = scru128::new();
    // Validate the topic up front, even for ephemeral frames.
    idx_topic_key_from_frame(&frame)?;
    let ephemeral = frame.ttl == Some(TTL::Ephemeral);
    if !ephemeral {
        self.insert_frame(&frame)?;
        // Last-N retention is enforced asynchronously by the GC worker.
        if let Some(TTL::Last(keep)) = frame.ttl {
            let _ = self.gc_tx.send(GCTask::CheckLastTTL {
                topic: frame.topic.clone(),
                keep,
            });
        }
    }
    let _ = self.broadcast_tx.send(frame.clone());
    Ok(frame)
}
/// Forward iteration over the whole stream, optionally bounded below by
/// an id (`true` = inclusive, `false` = exclusive).
fn iter_frames(
    &self,
    start: Option<(&Scru128Id, bool)>,
) -> Box<dyn Iterator<Item = Frame> + '_> {
    let lower = match start {
        None => Bound::Unbounded,
        Some((id, inclusive)) => {
            let key = id.as_bytes().to_vec();
            if inclusive {
                Bound::Included(key)
            } else {
                Bound::Excluded(key)
            }
        }
    };
    Box::new(
        self.stream
            .range((lower, Bound::Unbounded))
            .filter_map(|guard| {
                let (key, value) = guard.into_inner().ok()?;
                Some(deserialize_frame((key, value)))
            }),
    )
}
fn iter_frames_rev(&self) -> Box<dyn Iterator<Item = Frame> + '_> {
Box::new(self.stream.iter().rev().filter_map(|guard| {
let (key, value) = guard.into_inner().ok()?;
Some(deserialize_frame((key, value)))
}))
}
/// Newest-first iteration over frames whose topic equals `topic`.
fn iter_frames_by_topic_rev<'a>(
    &'a self,
    topic: &'a str,
) -> Box<dyn Iterator<Item = Frame> + 'a> {
    let scan = self.idx_topic.prefix(idx_topic_key_prefix(topic)).rev();
    Box::new(scan.filter_map(move |guard| {
        let key = guard.key().ok()?;
        // Index entries can outlive their frame briefly; skip dangling ones.
        self.get(&idx_topic_frame_id_from_key(&key))
    }))
}
fn iter_frames_by_topic_prefix_rev<'a>(
&'a self,
prefix: &'a str,
) -> Box<dyn Iterator<Item = Frame> + 'a> {
let mut index_prefix = Vec::with_capacity(prefix.len() + 1);
index_prefix.extend(prefix.as_bytes());
index_prefix.push(NULL_DELIMITER);
Box::new(
self.idx_topic
.prefix(index_prefix)
.rev()
.filter_map(move |guard| {
let key = guard.key().ok()?;
let frame_id = idx_topic_frame_id_from_key(&key);
self.get(&frame_id)
}),
)
}
/// Forward iteration over frames whose topic equals `topic`, optionally
/// bounded below by an id (`true` = inclusive, `false` = exclusive).
fn iter_frames_by_topic<'a>(
    &'a self,
    topic: &'a str,
    start: Option<(&'a Scru128Id, bool)>,
) -> Box<dyn Iterator<Item = Frame> + 'a> {
    let scan = self.idx_topic.prefix(idx_topic_key_prefix(topic));
    Box::new(scan.filter_map(move |guard| {
        let key = guard.key().ok()?;
        let frame_id = idx_topic_frame_id_from_key(&key);
        let in_range = match start {
            None => true,
            Some((bound, true)) => frame_id >= *bound,
            Some((bound, false)) => frame_id > *bound,
        };
        if in_range {
            self.get(&frame_id)
        } else {
            None
        }
    }))
}
fn iter_frames_by_topic_prefix<'a>(
&'a self,
prefix: &'a str,
start: Option<(&'a Scru128Id, bool)>,
) -> Box<dyn Iterator<Item = Frame> + 'a> {
let mut index_prefix = Vec::with_capacity(prefix.len() + 1);
index_prefix.extend(prefix.as_bytes());
index_prefix.push(NULL_DELIMITER);
Box::new(
self.idx_topic
.prefix(index_prefix)
.filter_map(move |guard| {
let key = guard.key().ok()?;
let frame_id = idx_topic_frame_id_from_key(&key);
if let Some((bound_id, inclusive)) = start {
if inclusive {
if frame_id < *bound_id {
return None;
}
} else if frame_id <= *bound_id {
return None;
}
}
self.get(&frame_id)
}),
)
}
}
fn spawn_gc_worker(mut gc_rx: UnboundedReceiver<GCTask>, store: Store) {
std::thread::spawn(move || {
while let Some(task) = gc_rx.blocking_recv() {
match task {
GCTask::Remove(id) => {
let _ = store.remove(&id);
}
GCTask::CheckLastTTL { topic, keep } => {
let prefix = idx_topic_key_prefix(&topic);
let frames_to_remove: Vec<_> = store
.idx_topic
.prefix(&prefix)
.rev() .skip(keep as usize)
.filter_map(|guard| {
let key = guard.key().ok()?;
Some(Scru128Id::from_bytes(
idx_topic_frame_id_from_key(&key).into(),
))
})
.collect();
for frame_id in frames_to_remove {
let _ = store.remove(&frame_id);
}
}
GCTask::Drain(tx) => {
let _ = tx.send(());
}
}
}
});
}
/// Whether a frame created at `id`'s timestamp has outlived `ttl`.
///
/// The creation time is taken from the scru128 id's embedded millisecond
/// timestamp. Arithmetic saturates: a TTL too large for `u64` milliseconds
/// means "never expires" (the old `as u64` cast silently truncated it,
/// which could mark long-lived frames expired), and a system clock before
/// the Unix epoch is treated as time zero instead of panicking.
fn is_expired(id: &Scru128Id, ttl: &Duration) -> bool {
    let created_ms = id.timestamp();
    let ttl_ms = u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX);
    let expires_ms = created_ms.saturating_add(ttl_ms);
    let now_ms = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|elapsed| u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX))
        .unwrap_or(0);
    now_ms >= expires_ms
}
// Separates the topic/prefix part of an index key from the trailing 16-byte frame id.
const NULL_DELIMITER: u8 = 0;
// Maximum topic length in bytes, enforced by `validate_topic`.
const MAX_TOPIC_LENGTH: usize = 255;
/// Validate a concrete topic name.
///
/// Rules: non-empty, at most `MAX_TOPIC_LENGTH` bytes, starts with an
/// ASCII letter or `_`, contains only `a-z A-Z 0-9 _ - .`, never ends
/// with `.`, and never contains `..`.
pub fn validate_topic(topic: &str) -> Result<(), crate::error::Error> {
    if topic.is_empty() {
        return Err("Topic cannot be empty".to_string().into());
    }
    if topic.len() > MAX_TOPIC_LENGTH {
        return Err(format!("Topic exceeds max length of {MAX_TOPIC_LENGTH} bytes").into());
    }
    if topic.ends_with('.') {
        return Err("Topic cannot end with '.'".to_string().into());
    }
    if topic.contains("..") {
        return Err("Topic cannot contain consecutive dots".to_string().into());
    }
    let bytes = topic.as_bytes();
    match bytes[0] {
        b'a'..=b'z' | b'A'..=b'Z' | b'_' => {}
        _ => return Err("Topic must start with a-z, A-Z, or _".to_string().into()),
    }
    let is_allowed = |b: u8| b.is_ascii_alphanumeric() || matches!(b, b'_' | b'-' | b'.');
    if let Some(&bad) = bytes.iter().find(|&&b| !is_allowed(b)) {
        return Err(format!(
            "Topic contains invalid character: '{}'. Allowed: a-z A-Z 0-9 _ - .",
            bad as char
        )
        .into());
    }
    Ok(())
}
/// Validate a topic *query*: a plain topic, the global wildcard `"*"`,
/// or a prefix wildcard `"prefix.*"` (whose prefix must itself be a
/// valid topic).
pub fn validate_topic_query(topic: &str) -> Result<(), crate::error::Error> {
    match topic {
        "*" => Ok(()),
        _ => match topic.strip_suffix(".*") {
            Some("") => Err("Wildcard '.*' requires a prefix".to_string().into()),
            Some(prefix) => validate_topic(prefix),
            None => validate_topic(topic),
        },
    }
}
/// Build one index key per dotted prefix of `topic` (dot included).
///
/// For "a.b.c" this yields keys for "a." and "a.b.", each laid out as
/// prefix bytes + NUL + the 16-byte frame id.
fn idx_topic_prefix_keys(topic: &str, frame_id: &scru128::Scru128Id) -> Vec<Vec<u8>> {
    topic
        .match_indices('.')
        .map(|(dot, _)| {
            let prefix = &topic[..=dot];
            let mut key = Vec::with_capacity(prefix.len() + 1 + 16);
            key.extend(prefix.as_bytes());
            key.push(NULL_DELIMITER);
            key.extend(frame_id.as_bytes());
            key
        })
        .collect()
}
/// The index-key prefix for a topic: topic bytes followed by the NUL delimiter.
fn idx_topic_key_prefix(topic: &str) -> Vec<u8> {
    let mut key = Vec::with_capacity(topic.len() + 1);
    key.extend_from_slice(topic.as_bytes());
    key.push(NULL_DELIMITER);
    key
}
/// Build the exact-topic index key for a frame (topic + NUL + frame id),
/// validating the topic first.
pub(crate) fn idx_topic_key_from_frame(frame: &Frame) -> Result<Vec<u8>, crate::error::Error> {
    validate_topic(&frame.topic)?;
    let mut key = idx_topic_key_prefix(&frame.topic);
    key.extend_from_slice(frame.id.as_bytes());
    Ok(key)
}
/// Extract the frame id from an `idx_topic` key.
///
/// Index keys always end with the 16 raw bytes of a `Scru128Id` (see
/// `idx_topic_key_from_frame` / `idx_topic_prefix_keys`), so the trailing
/// slice is taken unconditionally; a shorter key would panic here, which
/// would indicate index corruption.
fn idx_topic_frame_id_from_key(key: &[u8]) -> Scru128Id {
    let frame_id_bytes = &key[key.len() - 16..];
    Scru128Id::from_bytes(frame_id_bytes.try_into().unwrap())
}
/// Decode a stored `(key, value)` record into a `Frame`.
///
/// A record that fails to decode indicates on-disk corruption: the frame
/// id is logged (when the key is the expected 16 bytes) and the process
/// panics with a diagnostic. The diagnostic now uses lossy UTF-8
/// conversion — the old `from_utf8(...).unwrap()` would itself panic with
/// an opaque `Utf8Error` on binary data, hiding the real corruption info.
fn deserialize_frame<B1: AsRef<[u8]> + std::fmt::Debug, B2: AsRef<[u8]>>(
    record: (B1, B2),
) -> Frame {
    serde_json::from_slice(record.1.as_ref()).unwrap_or_else(|e| {
        let key_bytes = record.0.as_ref();
        if key_bytes.len() == 16 {
            if let Ok(bytes) = key_bytes.try_into() {
                let id = Scru128Id::from_bytes(bytes);
                eprintln!("CORRUPTED_RECORD_ID: {id}");
            }
        }
        let key = String::from_utf8_lossy(record.0.as_ref());
        let value = String::from_utf8_lossy(record.1.as_ref());
        panic!("Failed to deserialize frame: {e} {key} {value}")
    })
}