xs/store/
mod.rs

1mod ttl;
2pub use ttl::*;
3
4#[cfg(test)]
5mod tests;
6
7use std::ops::Bound;
8use std::path::PathBuf;
9use std::time::Duration;
10
11use tokio::sync::broadcast;
12use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
13
14use std::collections::HashSet;
15use std::sync::{Arc, RwLock};
16
17use scru128::Scru128Id;
18
19use serde::{Deserialize, Deserializer, Serialize};
20
21use fjall::{Config, Keyspace, PartitionCreateOptions, PartitionHandle};
22
23// Context with all bits set to zero for system operations
24pub const ZERO_CONTEXT: Scru128Id = Scru128Id::from_bytes([0; 16]);
25
26#[derive(PartialEq, Eq, Serialize, Deserialize, Clone, Default, bon::Builder)]
27pub struct Frame {
28    #[builder(start_fn, into)]
29    pub topic: String,
30    #[builder(start_fn)]
31    pub context_id: Scru128Id,
32    #[builder(default)]
33    pub id: Scru128Id,
34    pub hash: Option<ssri::Integrity>,
35    pub meta: Option<serde_json::Value>,
36    pub ttl: Option<TTL>,
37}
38
39use std::fmt;
40
41impl fmt::Debug for Frame {
42    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
43        f.debug_struct("Frame")
44            .field("id", &format!("{id}", id = self.id))
45            .field(
46                "context_id",
47                &format!("{context_id}", context_id = self.context_id),
48            )
49            .field("topic", &self.topic)
50            .field("hash", &self.hash.as_ref().map(|x| format!("{x}")))
51            .field("meta", &self.meta)
52            .field("ttl", &self.ttl)
53            .finish()
54    }
55}
56
57impl<'de> Deserialize<'de> for FollowOption {
58    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
59    where
60        D: Deserializer<'de>,
61    {
62        let s: String = Deserialize::deserialize(deserializer)?;
63        if s.is_empty() || s == "yes" {
64            Ok(FollowOption::On)
65        } else if let Ok(duration) = s.parse::<u64>() {
66            Ok(FollowOption::WithHeartbeat(Duration::from_millis(duration)))
67        } else {
68            match s.as_str() {
69                "true" => Ok(FollowOption::On),
70                "false" | "no" => Ok(FollowOption::Off),
71                _ => Err(serde::de::Error::custom("Invalid value for follow option")),
72            }
73        }
74    }
75}
76
77fn deserialize_bool<'de, D>(deserializer: D) -> Result<bool, D::Error>
78where
79    D: Deserializer<'de>,
80{
81    let s: String = Deserialize::deserialize(deserializer)?;
82    match s.as_str() {
83        "false" | "no" | "0" => Ok(false),
84        _ => Ok(true),
85    }
86}
87
88#[derive(PartialEq, Deserialize, Clone, Debug, Default, bon::Builder)]
89pub struct ReadOptions {
90    #[serde(default)]
91    #[builder(default)]
92    pub follow: FollowOption,
93    #[serde(default, deserialize_with = "deserialize_bool")]
94    #[builder(default)]
95    pub tail: bool,
96    #[serde(rename = "last-id")]
97    pub last_id: Option<Scru128Id>,
98    pub limit: Option<usize>,
99    #[serde(rename = "context-id")]
100    pub context_id: Option<Scru128Id>,
101    pub topic: Option<String>,
102}
103
104impl ReadOptions {
105    pub fn from_query(query: Option<&str>) -> Result<Self, crate::error::Error> {
106        match query {
107            Some(q) => Ok(serde_urlencoded::from_str(q)?),
108            None => Ok(Self::default()),
109        }
110    }
111
112    pub fn to_query_string(&self) -> String {
113        let mut params = Vec::new();
114
115        // Add follow parameter with heartbeat if specified
116        match self.follow {
117            FollowOption::Off => {}
118            FollowOption::On => params.push(("follow", "true".to_string())),
119            FollowOption::WithHeartbeat(duration) => {
120                params.push(("follow", duration.as_millis().to_string()));
121            }
122        }
123
124        if let Some(context_id) = self.context_id {
125            params.push(("context-id", context_id.to_string()));
126        }
127
128        // Add tail if true
129        if self.tail {
130            params.push(("tail", "true".to_string()));
131        }
132
133        // Add last-id if present
134        if let Some(last_id) = self.last_id {
135            params.push(("last-id", last_id.to_string()));
136        }
137
138        // Add limit if present
139        if let Some(limit) = self.limit {
140            params.push(("limit", limit.to_string()));
141        }
142
143        if let Some(topic) = &self.topic {
144            params.push(("topic", topic.clone()));
145        }
146
147        // Return empty string if no params
148        if params.is_empty() {
149            String::new()
150        } else {
151            url::form_urlencoded::Serializer::new(String::new())
152                .extend_pairs(params)
153                .finish()
154        }
155    }
156}
157
158#[derive(Default, PartialEq, Clone, Debug)]
159pub enum FollowOption {
160    #[default]
161    Off,
162    On,
163    WithHeartbeat(Duration),
164}
165
166#[derive(Debug)]
167enum GCTask {
168    Remove(Scru128Id),
169    CheckHeadTTL {
170        context_id: Scru128Id,
171        topic: String,
172        keep: u32,
173    },
174    Drain(tokio::sync::oneshot::Sender<()>),
175}
176
177#[derive(Clone)]
178pub struct Store {
179    pub path: PathBuf,
180    keyspace: Keyspace,
181    frame_partition: PartitionHandle,
182    idx_topic: PartitionHandle,
183    idx_context: PartitionHandle,
184    contexts: Arc<RwLock<HashSet<Scru128Id>>>,
185    broadcast_tx: broadcast::Sender<Frame>,
186    gc_tx: UnboundedSender<GCTask>,
187}
188
189impl Store {
190    pub fn new(path: PathBuf) -> Store {
191        let config = Config::new(path.join("fjall"));
192        let keyspace = config
193            .flush_workers(1)
194            .compaction_workers(1)
195            .open()
196            .unwrap();
197
198        let frame_partition = keyspace
199            .open_partition("stream", PartitionCreateOptions::default())
200            .unwrap();
201
202        let idx_topic = keyspace
203            .open_partition("idx_topic", PartitionCreateOptions::default())
204            .unwrap();
205
206        let idx_context = keyspace
207            .open_partition("idx_context", PartitionCreateOptions::default())
208            .unwrap();
209
210        let (broadcast_tx, _) = broadcast::channel(1024);
211        let (gc_tx, gc_rx) = mpsc::unbounded_channel();
212
213        let mut contexts = HashSet::new();
214        contexts.insert(ZERO_CONTEXT); // System context is always valid
215
216        let store = Store {
217            path: path.clone(),
218            keyspace: keyspace.clone(),
219            frame_partition: frame_partition.clone(),
220            idx_topic: idx_topic.clone(),
221            idx_context: idx_context.clone(),
222            contexts: Arc::new(RwLock::new(contexts)),
223            broadcast_tx,
224            gc_tx,
225        };
226
227        // Load context registrations
228        for frame in store.read_sync(None, None, Some(ZERO_CONTEXT)) {
229            if frame.topic == "xs.context" {
230                store.contexts.write().unwrap().insert(frame.id);
231            }
232        }
233
234        // Spawn gc worker thread
235        spawn_gc_worker(gc_rx, store.clone());
236
237        store
238    }
239
240    pub async fn wait_for_gc(&self) {
241        let (tx, rx) = tokio::sync::oneshot::channel();
242        let _ = self.gc_tx.send(GCTask::Drain(tx));
243        let _ = rx.await;
244    }
245
246    #[tracing::instrument(skip(self))]
247    pub async fn read(&self, options: ReadOptions) -> tokio::sync::mpsc::Receiver<Frame> {
248        let (tx, rx) = tokio::sync::mpsc::channel(100);
249
250        let should_follow = matches!(
251            options.follow,
252            FollowOption::On | FollowOption::WithHeartbeat(_)
253        );
254
255        // Only take broadcast subscription if following. We initate the subscription here to
256        // ensure we don't miss any messages between historical processing and starting the
257        // broadcast subscription.
258        let broadcast_rx = if should_follow {
259            Some(self.broadcast_tx.subscribe())
260        } else {
261            None
262        };
263
264        // Only create done channel if we're doing historical processing
265        let done_rx = if !options.tail {
266            let (done_tx, done_rx) = tokio::sync::oneshot::channel();
267            let tx_clone = tx.clone();
268            let store = self.clone();
269            let options = options.clone();
270            let should_follow_clone = should_follow;
271            let gc_tx = self.gc_tx.clone();
272
273            // Spawn OS thread to handle historical events
274            std::thread::spawn(move || {
275                let mut last_id = None;
276                let mut count = 0;
277
278                let iter: Box<dyn Iterator<Item = Frame>> = if let Some(ref topic) = options.topic {
279                    store.iter_frames_by_topic(options.context_id, topic, options.last_id.as_ref())
280                } else {
281                    store.iter_frames(options.context_id, options.last_id.as_ref())
282                };
283
284                for frame in iter {
285                    if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
286                        if is_expired(&frame.id, ttl) {
287                            let _ = gc_tx.send(GCTask::Remove(frame.id));
288                            continue;
289                        }
290                    }
291
292                    last_id = Some(frame.id);
293
294                    if let Some(limit) = options.limit {
295                        if count >= limit {
296                            return; // Exit early if limit reached
297                        }
298                    }
299
300                    if tx_clone.blocking_send(frame).is_err() {
301                        return;
302                    }
303                    count += 1;
304                }
305
306                // Send threshold message if following and no limit
307                if should_follow_clone && options.limit.is_none() {
308                    let threshold =
309                        Frame::builder("xs.threshold", options.context_id.unwrap_or(ZERO_CONTEXT))
310                            .id(scru128::new())
311                            .ttl(TTL::Ephemeral)
312                            .build();
313                    if tx_clone.blocking_send(threshold).is_err() {
314                        return;
315                    }
316                }
317
318                // Signal completion with the last seen ID and count
319                let _ = done_tx.send((last_id, count));
320            });
321
322            Some(done_rx)
323        } else {
324            None
325        };
326
327        // Handle broadcast subscription and heartbeat
328        if let Some(broadcast_rx) = broadcast_rx {
329            {
330                let tx = tx.clone();
331                let limit = options.limit;
332
333                tokio::spawn(async move {
334                    // If we have a done_rx, wait for historical processing
335                    let (last_id, mut count) = match done_rx {
336                        Some(done_rx) => match done_rx.await {
337                            Ok((id, count)) => (id, count),
338                            Err(_) => return, // Historical processing failed/cancelled
339                        },
340                        None => (None, 0),
341                    };
342
343                    let mut broadcast_rx = broadcast_rx;
344                    while let Ok(frame) = broadcast_rx.recv().await {
345                        // Skip frames that do not match the context_id
346                        if let Some(context_id) = options.context_id {
347                            if frame.context_id != context_id {
348                                continue;
349                            }
350                        }
351
352                        if let Some(ref topic) = options.topic {
353                            if frame.topic != *topic {
354                                continue;
355                            }
356                        }
357
358                        // Skip if we've already seen this frame during historical scan
359                        if let Some(last_scanned_id) = last_id {
360                            if frame.id <= last_scanned_id {
361                                continue;
362                            }
363                        }
364
365                        if tx.send(frame).await.is_err() {
366                            break;
367                        }
368
369                        if let Some(limit) = limit {
370                            count += 1;
371                            if count >= limit {
372                                break;
373                            }
374                        }
375                    }
376                });
377            }
378
379            // Handle heartbeat if requested
380            if let FollowOption::WithHeartbeat(duration) = options.follow {
381                let heartbeat_tx = tx;
382                tokio::spawn(async move {
383                    loop {
384                        tokio::time::sleep(duration).await;
385                        let frame =
386                            Frame::builder("xs.pulse", options.context_id.unwrap_or(ZERO_CONTEXT))
387                                .id(scru128::new())
388                                .ttl(TTL::Ephemeral)
389                                .build();
390                        if heartbeat_tx.send(frame).await.is_err() {
391                            break;
392                        }
393                    }
394                });
395            }
396        }
397
398        rx
399    }
400
401    #[tracing::instrument(skip(self))]
402    pub fn read_sync(
403        &self,
404        last_id: Option<&Scru128Id>,
405        limit: Option<usize>,
406        context_id: Option<Scru128Id>,
407    ) -> impl Iterator<Item = Frame> + '_ {
408        self.iter_frames(context_id, last_id)
409            .filter(move |frame| {
410                if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
411                    if is_expired(&frame.id, ttl) {
412                        let _ = self.gc_tx.send(GCTask::Remove(frame.id));
413                        return false;
414                    }
415                }
416                true
417            })
418            .take(limit.unwrap_or(usize::MAX))
419    }
420
421    pub fn get(&self, id: &Scru128Id) -> Option<Frame> {
422        self.frame_partition
423            .get(id.to_bytes())
424            .unwrap()
425            .map(|value| deserialize_frame((id.as_bytes(), value)))
426    }
427
428    #[tracing::instrument(skip(self))]
429    pub fn head(&self, topic: &str, context_id: Scru128Id) -> Option<Frame> {
430        self.idx_topic
431            .prefix(idx_topic_key_prefix(context_id, topic))
432            .rev()
433            .find_map(|kv| self.get(&idx_topic_frame_id_from_key(&kv.unwrap().0)))
434    }
435
436    #[tracing::instrument(skip(self), fields(id = %id.to_string()))]
437    pub fn remove(&self, id: &Scru128Id) -> Result<(), crate::error::Error> {
438        let Some(frame) = self.get(id) else {
439            // Already deleted
440            return Ok(());
441        };
442
443        // Get the index topic key
444        let topic_key = idx_topic_key_from_frame(&frame)?;
445
446        let mut batch = self.keyspace.batch();
447        batch.remove(&self.frame_partition, id.as_bytes());
448        batch.remove(&self.idx_topic, topic_key);
449        batch.remove(&self.idx_context, idx_context_key_from_frame(&frame));
450
451        // If this is a context frame, remove it from the contexts set
452        if frame.topic == "xs.context" {
453            self.contexts.write().unwrap().remove(&frame.id);
454        }
455
456        batch.commit()?;
457        self.keyspace.persist(fjall::PersistMode::SyncAll)?;
458        Ok(())
459    }
460
461    pub async fn cas_reader(&self, hash: ssri::Integrity) -> cacache::Result<cacache::Reader> {
462        cacache::Reader::open_hash(&self.path.join("cacache"), hash).await
463    }
464
465    pub fn cas_reader_sync(&self, hash: ssri::Integrity) -> cacache::Result<cacache::SyncReader> {
466        cacache::SyncReader::open_hash(self.path.join("cacache"), hash)
467    }
468
469    pub async fn cas_writer(&self) -> cacache::Result<cacache::Writer> {
470        cacache::WriteOpts::new()
471            .open_hash(&self.path.join("cacache"))
472            .await
473    }
474
475    pub fn cas_writer_sync(&self) -> cacache::Result<cacache::SyncWriter> {
476        cacache::WriteOpts::new().open_hash_sync(self.path.join("cacache"))
477    }
478
479    pub async fn cas_insert(&self, content: impl AsRef<[u8]>) -> cacache::Result<ssri::Integrity> {
480        cacache::write_hash(&self.path.join("cacache"), content).await
481    }
482
483    pub fn cas_insert_sync(&self, content: impl AsRef<[u8]>) -> cacache::Result<ssri::Integrity> {
484        cacache::write_hash_sync(self.path.join("cacache"), content)
485    }
486
487    pub async fn cas_insert_bytes(&self, bytes: &[u8]) -> cacache::Result<ssri::Integrity> {
488        self.cas_insert(bytes).await
489    }
490
491    pub fn cas_insert_bytes_sync(&self, bytes: &[u8]) -> cacache::Result<ssri::Integrity> {
492        self.cas_insert_sync(bytes)
493    }
494
495    pub async fn cas_read(&self, hash: &ssri::Integrity) -> cacache::Result<Vec<u8>> {
496        cacache::read_hash(&self.path.join("cacache"), hash).await
497    }
498
499    pub fn cas_read_sync(&self, hash: &ssri::Integrity) -> cacache::Result<Vec<u8>> {
500        cacache::read_hash_sync(self.path.join("cacache"), hash)
501    }
502
503    #[tracing::instrument(skip(self))]
504    pub fn insert_frame(&self, frame: &Frame) -> Result<(), crate::error::Error> {
505        let encoded: Vec<u8> = serde_json::to_vec(&frame).unwrap();
506
507        // Get the index topic key
508        let topic_key = idx_topic_key_from_frame(frame)?;
509
510        let mut batch = self.keyspace.batch();
511        batch.insert(&self.frame_partition, frame.id.as_bytes(), encoded);
512        batch.insert(&self.idx_topic, topic_key, b"");
513        batch.insert(&self.idx_context, idx_context_key_from_frame(frame), b"");
514        batch.commit()?;
515        self.keyspace.persist(fjall::PersistMode::SyncAll)?;
516        Ok(())
517    }
518
519    pub fn append(&self, mut frame: Frame) -> Result<Frame, crate::error::Error> {
520        frame.id = scru128::new();
521
522        // Special handling for xs.context registration
523        if frame.topic == "xs.context" {
524            if frame.context_id != ZERO_CONTEXT {
525                return Err("xs.context frames must be in zero context".into());
526            }
527            frame.ttl = Some(TTL::Forever);
528            self.contexts.write().unwrap().insert(frame.id);
529        } else {
530            // Validate context exists
531            let contexts = self.contexts.read().unwrap();
532            if !contexts.contains(&frame.context_id) {
533                return Err(format!(
534                    "Invalid context: {context_id}",
535                    context_id = frame.context_id
536                )
537                .into());
538            }
539        }
540
541        // Check for null byte in topic (in case we're not storing the frame)
542        idx_topic_key_from_frame(&frame)?;
543
544        // only store the frame if it's not ephemeral
545        if frame.ttl != Some(TTL::Ephemeral) {
546            self.insert_frame(&frame)?;
547
548            // If this is a Head TTL, schedule a gc task
549            if let Some(TTL::Head(n)) = frame.ttl {
550                let _ = self.gc_tx.send(GCTask::CheckHeadTTL {
551                    context_id: frame.context_id,
552                    topic: frame.topic.clone(),
553                    keep: n,
554                });
555            }
556        }
557
558        let _ = self.broadcast_tx.send(frame.clone());
559        Ok(frame)
560    }
561
562    fn iter_frames(
563        &self,
564        context_id: Option<Scru128Id>,
565        last_id: Option<&Scru128Id>,
566    ) -> Box<dyn Iterator<Item = Frame> + '_> {
567        match context_id {
568            Some(ctx_id) => {
569                let start_key = if let Some(last_id) = last_id {
570                    // explicitly combine context_id + last_id
571                    let mut v = Vec::with_capacity(32);
572                    v.extend(ctx_id.as_bytes());
573                    v.extend(last_id.as_bytes());
574                    Bound::Excluded(v)
575                } else {
576                    Bound::Included(ctx_id.as_bytes().to_vec())
577                };
578
579                let end_key = Bound::Excluded(idx_context_key_range_end(ctx_id));
580
581                Box::new(
582                    self.idx_context
583                        .range((start_key, end_key))
584                        .filter_map(move |r| {
585                            let (key, _) = r.ok()?;
586                            let frame_id_bytes = &key[16..];
587                            let frame_id = Scru128Id::from_bytes(frame_id_bytes.try_into().ok()?);
588                            self.get(&frame_id)
589                        }),
590                )
591            }
592            None => {
593                let range = match last_id {
594                    Some(id) => (Bound::Excluded(id.as_bytes().to_vec()), Bound::Unbounded),
595                    None => (Bound::Unbounded, Bound::Unbounded),
596                };
597
598                Box::new(
599                    self.frame_partition
600                        .range(range)
601                        .map(|r| deserialize_frame(r.unwrap())),
602                )
603            }
604        }
605    }
606
607    fn iter_frames_by_topic<'a>(
608        &'a self,
609        context_id: Option<Scru128Id>,
610        topic: &'a str,
611        last_id: Option<&'a Scru128Id>,
612    ) -> Box<dyn Iterator<Item = Frame> + 'a> {
613        if let Some(ctx_id) = context_id {
614            let prefix = idx_topic_key_prefix(ctx_id, topic);
615            Box::new(self.idx_topic.prefix(prefix).filter_map(move |r| {
616                let (key, _) = r.ok()?;
617                let frame_id = idx_topic_frame_id_from_key(&key);
618                if let Some(last) = last_id {
619                    if frame_id <= *last {
620                        return None;
621                    }
622                }
623                self.get(&frame_id)
624            }))
625        } else {
626            let range = match last_id {
627                Some(id) => (Bound::Excluded(id.as_bytes().to_vec()), Bound::Unbounded),
628                None => (Bound::Unbounded, Bound::Unbounded),
629            };
630
631            Box::new(self.frame_partition.range(range).filter_map(move |r| {
632                let frame = deserialize_frame(r.unwrap());
633                if frame.topic == topic {
634                    Some(frame)
635                } else {
636                    None
637                }
638            }))
639        }
640    }
641}
642
643fn spawn_gc_worker(mut gc_rx: UnboundedReceiver<GCTask>, store: Store) {
644    std::thread::spawn(move || {
645        while let Some(task) = gc_rx.blocking_recv() {
646            match task {
647                GCTask::Remove(id) => {
648                    let _ = store.remove(&id);
649                }
650
651                GCTask::CheckHeadTTL {
652                    context_id,
653                    topic,
654                    keep,
655                } => {
656                    let prefix = idx_topic_key_prefix(context_id, &topic);
657                    let frames_to_remove: Vec<_> = store
658                        .idx_topic
659                        .prefix(&prefix)
660                        .rev() // Scan from newest to oldest
661                        .skip(keep as usize)
662                        .map(|r| {
663                            Scru128Id::from_bytes(idx_topic_frame_id_from_key(&r.unwrap().0).into())
664                        })
665                        .collect();
666
667                    for frame_id in frames_to_remove {
668                        let _ = store.remove(&frame_id);
669                    }
670                }
671
672                GCTask::Drain(tx) => {
673                    let _ = tx.send(());
674                }
675            }
676        }
677    });
678}
679
680fn is_expired(id: &Scru128Id, ttl: &Duration) -> bool {
681    let created_ms = id.timestamp();
682    let expires_ms = created_ms.saturating_add(ttl.as_millis() as u64);
683    let now_ms = std::time::SystemTime::now()
684        .duration_since(std::time::UNIX_EPOCH)
685        .unwrap()
686        .as_millis() as u64;
687
688    now_ms >= expires_ms
689}
690
691const NULL_DELIMITER: u8 = 0;
692
693fn idx_topic_key_prefix(context_id: Scru128Id, topic: &str) -> Vec<u8> {
694    let mut v = Vec::with_capacity(16 + topic.len() + 1); // context_id (16) + topic bytes + delimiter
695    v.extend(context_id.as_bytes()); // binary context_id (16 bytes)
696    v.extend(topic.as_bytes()); // topic string as UTF-8 bytes
697    v.push(NULL_DELIMITER); // Delimiter for variable-sized keys
698    v
699}
700
701pub(crate) fn idx_topic_key_from_frame(frame: &Frame) -> Result<Vec<u8>, crate::error::Error> {
702    // Check if the topic contains a null byte when encoded as UTF-8
703    if frame.topic.as_bytes().contains(&NULL_DELIMITER) {
704        return Err(
705            "Topic cannot contain null byte (0x00) as it's used as a delimiter"
706                .to_string()
707                .into(),
708        );
709    }
710    let mut v = idx_topic_key_prefix(frame.context_id, &frame.topic);
711    v.extend(frame.id.as_bytes());
712    Ok(v)
713}
714
715fn idx_topic_frame_id_from_key(key: &[u8]) -> Scru128Id {
716    let frame_id_bytes = &key[key.len() - 16..];
717    Scru128Id::from_bytes(frame_id_bytes.try_into().unwrap())
718}
719
720// Creates a key for the context index: <context_id><frame_id>
721fn idx_context_key_from_frame(frame: &Frame) -> Vec<u8> {
722    let mut v = Vec::with_capacity(frame.context_id.as_bytes().len() + frame.id.as_bytes().len());
723    v.extend(frame.context_id.as_bytes());
724    v.extend(frame.id.as_bytes());
725    v
726}
727
728// Returns the key prefix for the next context after the given one
729fn idx_context_key_range_end(context_id: Scru128Id) -> Vec<u8> {
730    let mut i = context_id.to_u128();
731
732    // NOTE: Reaching u128::MAX is probably not gonna happen...
733    i = i.saturating_add(1);
734
735    Scru128Id::from(i).as_bytes().to_vec()
736}
737
738fn deserialize_frame<B1: AsRef<[u8]> + std::fmt::Debug, B2: AsRef<[u8]>>(
739    record: (B1, B2),
740) -> Frame {
741    serde_json::from_slice(record.1.as_ref()).unwrap_or_else(|e| {
742        // Try to convert the key to a Scru128Id and print in a format that can be copied for deletion
743        let key_bytes = record.0.as_ref();
744        if key_bytes.len() == 16 {
745            if let Ok(bytes) = key_bytes.try_into() {
746                let id = Scru128Id::from_bytes(bytes);
747                eprintln!("CORRUPTED_RECORD_ID: {id}");
748            }
749        }
750        let key = std::str::from_utf8(record.0.as_ref()).unwrap();
751        let value = std::str::from_utf8(record.1.as_ref()).unwrap();
752        panic!("Failed to deserialize frame: {e} {key} {value}")
753    })
754}