xs/store/
mod.rs

1mod ttl;
2pub use ttl::*;
3
4#[cfg(test)]
5mod tests;
6
7use std::ops::Bound;
8use std::path::PathBuf;
9use std::time::Duration;
10
11use tokio::sync::broadcast;
12use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
13
14use std::collections::HashSet;
15use std::sync::{Arc, Mutex, RwLock};
16
17use scru128::Scru128Id;
18
19use serde::{Deserialize, Deserializer, Serialize};
20
21use fjall::{Config, Keyspace, PartitionCreateOptions, PartitionHandle};
22
/// Context with all bits set to zero, used for system operations.
///
/// `Store::new` always registers it as a valid context, and `xs.context`
/// registration frames must be appended under it.
pub const ZERO_CONTEXT: Scru128Id = Scru128Id::from_bytes([0; 16]);
25
/// A single entry of the event stream.
///
/// The builder takes `topic` and `context_id` as positional arguments, e.g.
/// `Frame::builder("xs.pulse", ZERO_CONTEXT)`.
#[derive(PartialEq, Eq, Serialize, Deserialize, Clone, Default, bon::Builder)]
pub struct Frame {
    /// Topic the frame is published under.
    #[builder(start_fn, into)]
    pub topic: String,
    /// Context the frame belongs to.
    #[builder(start_fn)]
    pub context_id: Scru128Id,
    /// Frame identifier; `Store::append` overwrites it with a freshly generated id.
    #[builder(default)]
    pub id: Scru128Id,
    /// Optional content hash.
    // NOTE(review): presumably refers to a payload in the cacache store under
    // `<path>/cacache` — confirm against callers.
    pub hash: Option<ssri::Integrity>,
    /// Optional JSON metadata attached to the frame.
    pub meta: Option<serde_json::Value>,
    /// Optional retention policy; `TTL::Ephemeral` frames are broadcast but never stored.
    pub ttl: Option<TTL>,
}
38
39use std::fmt;
40
41impl fmt::Debug for Frame {
42    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
43        f.debug_struct("Frame")
44            .field("id", &format!("{id}", id = self.id))
45            .field(
46                "context_id",
47                &format!("{context_id}", context_id = self.context_id),
48            )
49            .field("topic", &self.topic)
50            .field("hash", &self.hash.as_ref().map(|x| format!("{x}")))
51            .field("meta", &self.meta)
52            .field("ttl", &self.ttl)
53            .finish()
54    }
55}
56
57impl<'de> Deserialize<'de> for FollowOption {
58    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
59    where
60        D: Deserializer<'de>,
61    {
62        let s: String = Deserialize::deserialize(deserializer)?;
63        if s.is_empty() || s == "yes" {
64            Ok(FollowOption::On)
65        } else if let Ok(duration) = s.parse::<u64>() {
66            Ok(FollowOption::WithHeartbeat(Duration::from_millis(duration)))
67        } else {
68            match s.as_str() {
69                "true" => Ok(FollowOption::On),
70                "false" | "no" => Ok(FollowOption::Off),
71                _ => Err(serde::de::Error::custom("Invalid value for follow option")),
72            }
73        }
74    }
75}
76
77fn deserialize_bool<'de, D>(deserializer: D) -> Result<bool, D::Error>
78where
79    D: Deserializer<'de>,
80{
81    let s: String = Deserialize::deserialize(deserializer)?;
82    match s.as_str() {
83        "false" | "no" | "0" => Ok(false),
84        _ => Ok(true),
85    }
86}
87
/// Options controlling which frames `Store::read` delivers and how.
#[derive(PartialEq, Deserialize, Clone, Debug, Default, bon::Builder)]
pub struct ReadOptions {
    /// Whether to keep streaming newly appended frames, optionally with a heartbeat.
    #[serde(default)]
    #[builder(default)]
    pub follow: FollowOption,
    /// When `true`, the historical replay is skipped.
    #[serde(default, deserialize_with = "deserialize_bool")]
    #[builder(default)]
    pub tail: bool,
    /// Only frames after this id are replayed (exclusive lower bound).
    #[serde(rename = "last-id")]
    pub last_id: Option<Scru128Id>,
    /// Maximum number of frames to deliver.
    pub limit: Option<usize>,
    /// Restrict results to this context; `None` reads across all contexts.
    #[serde(rename = "context-id")]
    pub context_id: Option<Scru128Id>,
    /// Restrict results to frames whose topic equals this value exactly.
    pub topic: Option<String>,
}
103
104impl ReadOptions {
105    pub fn from_query(query: Option<&str>) -> Result<Self, crate::error::Error> {
106        match query {
107            Some(q) => Ok(serde_urlencoded::from_str(q)?),
108            None => Ok(Self::default()),
109        }
110    }
111
112    pub fn to_query_string(&self) -> String {
113        let mut params = Vec::new();
114
115        // Add follow parameter with heartbeat if specified
116        match self.follow {
117            FollowOption::Off => {}
118            FollowOption::On => params.push(("follow", "true".to_string())),
119            FollowOption::WithHeartbeat(duration) => {
120                params.push(("follow", duration.as_millis().to_string()));
121            }
122        }
123
124        if let Some(context_id) = self.context_id {
125            params.push(("context-id", context_id.to_string()));
126        }
127
128        // Add tail if true
129        if self.tail {
130            params.push(("tail", "true".to_string()));
131        }
132
133        // Add last-id if present
134        if let Some(last_id) = self.last_id {
135            params.push(("last-id", last_id.to_string()));
136        }
137
138        // Add limit if present
139        if let Some(limit) = self.limit {
140            params.push(("limit", limit.to_string()));
141        }
142
143        if let Some(topic) = &self.topic {
144            params.push(("topic", topic.clone()));
145        }
146
147        // Return empty string if no params
148        if params.is_empty() {
149            String::new()
150        } else {
151            url::form_urlencoded::Serializer::new(String::new())
152                .extend_pairs(params)
153                .finish()
154        }
155    }
156}
157
/// Whether a read keeps streaming frames appended after the historical replay.
#[derive(Default, PartialEq, Clone, Debug)]
pub enum FollowOption {
    /// Do not follow; this is the default.
    #[default]
    Off,
    /// Follow live frames.
    On,
    /// Follow live frames and additionally emit an `xs.pulse` frame at this interval.
    WithHeartbeat(Duration),
}
165
/// Work items processed sequentially by the background GC worker thread.
#[derive(Debug)]
enum GCTask {
    /// Remove the frame with this id.
    Remove(Scru128Id),
    /// Trim a topic within a context down to its newest `keep` frames.
    CheckHeadTTL {
        context_id: Scru128Id,
        topic: String,
        keep: u32,
    },
    /// Acknowledge on the given channel once this task is reached in the queue.
    Drain(tokio::sync::oneshot::Sender<()>),
}
176
/// Handle to the frame store; clones share the same underlying state.
#[derive(Clone)]
pub struct Store {
    /// Root directory; the fjall keyspace lives in `fjall/` and the cacache store in `cacache/`.
    pub path: PathBuf,
    keyspace: Keyspace,
    /// Primary partition (`"stream"`): frame id -> JSON-encoded frame.
    frame_partition: PartitionHandle,
    /// Index keyed by `<context_id><topic>\0<frame_id>`.
    idx_topic: PartitionHandle,
    /// Index keyed by `<context_id><frame_id>`.
    idx_context: PartitionHandle,
    /// Ids of registered contexts (always includes `ZERO_CONTEXT`).
    contexts: Arc<RwLock<HashSet<Scru128Id>>>,
    /// Fan-out of appended frames to following readers.
    broadcast_tx: broadcast::Sender<Frame>,
    /// Queue feeding the background GC worker.
    gc_tx: UnboundedSender<GCTask>,
    /// Serializes `append` so id generation, write and broadcast happen together.
    append_lock: Arc<Mutex<()>>,
}
189
190impl Store {
191    pub fn new(path: PathBuf) -> Store {
192        let config = Config::new(path.join("fjall"));
193        let keyspace = config
194            .flush_workers(1)
195            .compaction_workers(1)
196            .open()
197            .unwrap();
198
199        let frame_partition = keyspace
200            .open_partition("stream", PartitionCreateOptions::default())
201            .unwrap();
202
203        let idx_topic = keyspace
204            .open_partition("idx_topic", PartitionCreateOptions::default())
205            .unwrap();
206
207        let idx_context = keyspace
208            .open_partition("idx_context", PartitionCreateOptions::default())
209            .unwrap();
210
211        let (broadcast_tx, _) = broadcast::channel(1024);
212        let (gc_tx, gc_rx) = mpsc::unbounded_channel();
213
214        let mut contexts = HashSet::new();
215        contexts.insert(ZERO_CONTEXT); // System context is always valid
216
217        let store = Store {
218            path: path.clone(),
219            keyspace: keyspace.clone(),
220            frame_partition: frame_partition.clone(),
221            idx_topic: idx_topic.clone(),
222            idx_context: idx_context.clone(),
223            contexts: Arc::new(RwLock::new(contexts)),
224            broadcast_tx,
225            gc_tx,
226            append_lock: Arc::new(Mutex::new(())),
227        };
228
229        // Load context registrations
230        for frame in store.read_sync(None, None, Some(ZERO_CONTEXT)) {
231            if frame.topic == "xs.context" {
232                store.contexts.write().unwrap().insert(frame.id);
233            }
234        }
235
236        // Spawn gc worker thread
237        spawn_gc_worker(gc_rx, store.clone());
238
239        store
240    }
241
242    pub async fn wait_for_gc(&self) {
243        let (tx, rx) = tokio::sync::oneshot::channel();
244        let _ = self.gc_tx.send(GCTask::Drain(tx));
245        let _ = rx.await;
246    }
247
248    #[tracing::instrument(skip(self))]
249    pub async fn read(&self, options: ReadOptions) -> tokio::sync::mpsc::Receiver<Frame> {
250        let (tx, rx) = tokio::sync::mpsc::channel(100);
251
252        let should_follow = matches!(
253            options.follow,
254            FollowOption::On | FollowOption::WithHeartbeat(_)
255        );
256
257        // Only take broadcast subscription if following. We initate the subscription here to
258        // ensure we don't miss any messages between historical processing and starting the
259        // broadcast subscription.
260        let broadcast_rx = if should_follow {
261            Some(self.broadcast_tx.subscribe())
262        } else {
263            None
264        };
265
266        // Only create done channel if we're doing historical processing
267        let done_rx = if !options.tail {
268            let (done_tx, done_rx) = tokio::sync::oneshot::channel();
269            let tx_clone = tx.clone();
270            let store = self.clone();
271            let options = options.clone();
272            let should_follow_clone = should_follow;
273            let gc_tx = self.gc_tx.clone();
274
275            // Spawn OS thread to handle historical events
276            std::thread::spawn(move || {
277                let mut last_id = None;
278                let mut count = 0;
279
280                let iter: Box<dyn Iterator<Item = Frame>> = if let Some(ref topic) = options.topic {
281                    store.iter_frames_by_topic(options.context_id, topic, options.last_id.as_ref())
282                } else {
283                    store.iter_frames(options.context_id, options.last_id.as_ref())
284                };
285
286                for frame in iter {
287                    if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
288                        if is_expired(&frame.id, ttl) {
289                            let _ = gc_tx.send(GCTask::Remove(frame.id));
290                            continue;
291                        }
292                    }
293
294                    last_id = Some(frame.id);
295
296                    if let Some(limit) = options.limit {
297                        if count >= limit {
298                            return; // Exit early if limit reached
299                        }
300                    }
301
302                    if tx_clone.blocking_send(frame).is_err() {
303                        return;
304                    }
305                    count += 1;
306                }
307
308                // Send threshold message if following and no limit
309                if should_follow_clone && options.limit.is_none() {
310                    let threshold =
311                        Frame::builder("xs.threshold", options.context_id.unwrap_or(ZERO_CONTEXT))
312                            .id(scru128::new())
313                            .ttl(TTL::Ephemeral)
314                            .build();
315                    if tx_clone.blocking_send(threshold).is_err() {
316                        return;
317                    }
318                }
319
320                // Signal completion with the last seen ID and count
321                let _ = done_tx.send((last_id, count));
322            });
323
324            Some(done_rx)
325        } else {
326            None
327        };
328
329        // Handle broadcast subscription and heartbeat
330        if let Some(broadcast_rx) = broadcast_rx {
331            {
332                let tx = tx.clone();
333                let limit = options.limit;
334
335                tokio::spawn(async move {
336                    // If we have a done_rx, wait for historical processing
337                    let (last_id, mut count) = match done_rx {
338                        Some(done_rx) => match done_rx.await {
339                            Ok((id, count)) => (id, count),
340                            Err(_) => return, // Historical processing failed/cancelled
341                        },
342                        None => (None, 0),
343                    };
344
345                    let mut broadcast_rx = broadcast_rx;
346                    while let Ok(frame) = broadcast_rx.recv().await {
347                        // Skip frames that do not match the context_id
348                        if let Some(context_id) = options.context_id {
349                            if frame.context_id != context_id {
350                                continue;
351                            }
352                        }
353
354                        if let Some(ref topic) = options.topic {
355                            if frame.topic != *topic {
356                                continue;
357                            }
358                        }
359
360                        // Skip if we've already seen this frame during historical scan
361                        if let Some(last_scanned_id) = last_id {
362                            if frame.id <= last_scanned_id {
363                                continue;
364                            }
365                        }
366
367                        if tx.send(frame).await.is_err() {
368                            break;
369                        }
370
371                        if let Some(limit) = limit {
372                            count += 1;
373                            if count >= limit {
374                                break;
375                            }
376                        }
377                    }
378                });
379            }
380
381            // Handle heartbeat if requested
382            if let FollowOption::WithHeartbeat(duration) = options.follow {
383                let heartbeat_tx = tx;
384                tokio::spawn(async move {
385                    loop {
386                        tokio::time::sleep(duration).await;
387                        let frame =
388                            Frame::builder("xs.pulse", options.context_id.unwrap_or(ZERO_CONTEXT))
389                                .id(scru128::new())
390                                .ttl(TTL::Ephemeral)
391                                .build();
392                        if heartbeat_tx.send(frame).await.is_err() {
393                            break;
394                        }
395                    }
396                });
397            }
398        }
399
400        rx
401    }
402
403    #[tracing::instrument(skip(self))]
404    pub fn read_sync(
405        &self,
406        last_id: Option<&Scru128Id>,
407        limit: Option<usize>,
408        context_id: Option<Scru128Id>,
409    ) -> impl Iterator<Item = Frame> + '_ {
410        self.iter_frames(context_id, last_id)
411            .filter(move |frame| {
412                if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
413                    if is_expired(&frame.id, ttl) {
414                        let _ = self.gc_tx.send(GCTask::Remove(frame.id));
415                        return false;
416                    }
417                }
418                true
419            })
420            .take(limit.unwrap_or(usize::MAX))
421    }
422
423    pub fn get(&self, id: &Scru128Id) -> Option<Frame> {
424        self.frame_partition
425            .get(id.to_bytes())
426            .unwrap()
427            .map(|value| deserialize_frame((id.as_bytes(), value)))
428    }
429
430    #[tracing::instrument(skip(self))]
431    pub fn head(&self, topic: &str, context_id: Scru128Id) -> Option<Frame> {
432        self.idx_topic
433            .prefix(idx_topic_key_prefix(context_id, topic))
434            .rev()
435            .find_map(|kv| self.get(&idx_topic_frame_id_from_key(&kv.unwrap().0)))
436    }
437
438    #[tracing::instrument(skip(self), fields(id = %id.to_string()))]
439    pub fn remove(&self, id: &Scru128Id) -> Result<(), crate::error::Error> {
440        let Some(frame) = self.get(id) else {
441            // Already deleted
442            return Ok(());
443        };
444
445        // Get the index topic key
446        let topic_key = idx_topic_key_from_frame(&frame)?;
447
448        let mut batch = self.keyspace.batch();
449        batch.remove(&self.frame_partition, id.as_bytes());
450        batch.remove(&self.idx_topic, topic_key);
451        batch.remove(&self.idx_context, idx_context_key_from_frame(&frame));
452
453        // If this is a context frame, remove it from the contexts set
454        if frame.topic == "xs.context" {
455            self.contexts.write().unwrap().remove(&frame.id);
456        }
457
458        batch.commit()?;
459        self.keyspace.persist(fjall::PersistMode::SyncAll)?;
460        Ok(())
461    }
462
463    pub async fn cas_reader(&self, hash: ssri::Integrity) -> cacache::Result<cacache::Reader> {
464        cacache::Reader::open_hash(&self.path.join("cacache"), hash).await
465    }
466
467    pub fn cas_reader_sync(&self, hash: ssri::Integrity) -> cacache::Result<cacache::SyncReader> {
468        cacache::SyncReader::open_hash(self.path.join("cacache"), hash)
469    }
470
471    pub async fn cas_writer(&self) -> cacache::Result<cacache::Writer> {
472        cacache::WriteOpts::new()
473            .open_hash(&self.path.join("cacache"))
474            .await
475    }
476
477    pub fn cas_writer_sync(&self) -> cacache::Result<cacache::SyncWriter> {
478        cacache::WriteOpts::new().open_hash_sync(self.path.join("cacache"))
479    }
480
481    pub async fn cas_insert(&self, content: impl AsRef<[u8]>) -> cacache::Result<ssri::Integrity> {
482        cacache::write_hash(&self.path.join("cacache"), content).await
483    }
484
485    pub fn cas_insert_sync(&self, content: impl AsRef<[u8]>) -> cacache::Result<ssri::Integrity> {
486        cacache::write_hash_sync(self.path.join("cacache"), content)
487    }
488
489    pub async fn cas_insert_bytes(&self, bytes: &[u8]) -> cacache::Result<ssri::Integrity> {
490        self.cas_insert(bytes).await
491    }
492
493    pub fn cas_insert_bytes_sync(&self, bytes: &[u8]) -> cacache::Result<ssri::Integrity> {
494        self.cas_insert_sync(bytes)
495    }
496
497    pub async fn cas_read(&self, hash: &ssri::Integrity) -> cacache::Result<Vec<u8>> {
498        cacache::read_hash(&self.path.join("cacache"), hash).await
499    }
500
501    pub fn cas_read_sync(&self, hash: &ssri::Integrity) -> cacache::Result<Vec<u8>> {
502        cacache::read_hash_sync(self.path.join("cacache"), hash)
503    }
504
505    #[tracing::instrument(skip(self))]
506    pub fn insert_frame(&self, frame: &Frame) -> Result<(), crate::error::Error> {
507        let encoded: Vec<u8> = serde_json::to_vec(&frame).unwrap();
508
509        // Get the index topic key
510        let topic_key = idx_topic_key_from_frame(frame)?;
511
512        let mut batch = self.keyspace.batch();
513        batch.insert(&self.frame_partition, frame.id.as_bytes(), encoded);
514        batch.insert(&self.idx_topic, topic_key, b"");
515        batch.insert(&self.idx_context, idx_context_key_from_frame(frame), b"");
516        batch.commit()?;
517        self.keyspace.persist(fjall::PersistMode::SyncAll)?;
518        Ok(())
519    }
520
521    pub fn append(&self, mut frame: Frame) -> Result<Frame, crate::error::Error> {
522        // Serialize all appends to ensure ID generation, write, and broadcast
523        // happen atomically. This guarantees subscribers receive frames in
524        // scru128 ID order.
525        let _guard = self.append_lock.lock().unwrap();
526
527        frame.id = scru128::new();
528
529        // Special handling for xs.context registration
530        if frame.topic == "xs.context" {
531            if frame.context_id != ZERO_CONTEXT {
532                return Err("xs.context frames must be in zero context".into());
533            }
534            frame.ttl = Some(TTL::Forever);
535            self.contexts.write().unwrap().insert(frame.id);
536        } else {
537            // Validate context exists
538            let contexts = self.contexts.read().unwrap();
539            if !contexts.contains(&frame.context_id) {
540                return Err(format!(
541                    "Invalid context: {context_id}",
542                    context_id = frame.context_id
543                )
544                .into());
545            }
546        }
547
548        // Check for null byte in topic (in case we're not storing the frame)
549        idx_topic_key_from_frame(&frame)?;
550
551        // only store the frame if it's not ephemeral
552        if frame.ttl != Some(TTL::Ephemeral) {
553            self.insert_frame(&frame)?;
554
555            // If this is a Head TTL, schedule a gc task
556            if let Some(TTL::Head(n)) = frame.ttl {
557                let _ = self.gc_tx.send(GCTask::CheckHeadTTL {
558                    context_id: frame.context_id,
559                    topic: frame.topic.clone(),
560                    keep: n,
561                });
562            }
563        }
564
565        let _ = self.broadcast_tx.send(frame.clone());
566        Ok(frame)
567    }
568
569    fn iter_frames(
570        &self,
571        context_id: Option<Scru128Id>,
572        last_id: Option<&Scru128Id>,
573    ) -> Box<dyn Iterator<Item = Frame> + '_> {
574        match context_id {
575            Some(ctx_id) => {
576                let start_key = if let Some(last_id) = last_id {
577                    // explicitly combine context_id + last_id
578                    let mut v = Vec::with_capacity(32);
579                    v.extend(ctx_id.as_bytes());
580                    v.extend(last_id.as_bytes());
581                    Bound::Excluded(v)
582                } else {
583                    Bound::Included(ctx_id.as_bytes().to_vec())
584                };
585
586                let end_key = Bound::Excluded(idx_context_key_range_end(ctx_id));
587
588                Box::new(
589                    self.idx_context
590                        .range((start_key, end_key))
591                        .filter_map(move |r| {
592                            let (key, _) = r.ok()?;
593                            let frame_id_bytes = &key[16..];
594                            let frame_id = Scru128Id::from_bytes(frame_id_bytes.try_into().ok()?);
595                            self.get(&frame_id)
596                        }),
597                )
598            }
599            None => {
600                let range = match last_id {
601                    Some(id) => (Bound::Excluded(id.as_bytes().to_vec()), Bound::Unbounded),
602                    None => (Bound::Unbounded, Bound::Unbounded),
603                };
604
605                Box::new(
606                    self.frame_partition
607                        .range(range)
608                        .map(|r| deserialize_frame(r.unwrap())),
609                )
610            }
611        }
612    }
613
614    fn iter_frames_by_topic<'a>(
615        &'a self,
616        context_id: Option<Scru128Id>,
617        topic: &'a str,
618        last_id: Option<&'a Scru128Id>,
619    ) -> Box<dyn Iterator<Item = Frame> + 'a> {
620        if let Some(ctx_id) = context_id {
621            let prefix = idx_topic_key_prefix(ctx_id, topic);
622            Box::new(self.idx_topic.prefix(prefix).filter_map(move |r| {
623                let (key, _) = r.ok()?;
624                let frame_id = idx_topic_frame_id_from_key(&key);
625                if let Some(last) = last_id {
626                    if frame_id <= *last {
627                        return None;
628                    }
629                }
630                self.get(&frame_id)
631            }))
632        } else {
633            let range = match last_id {
634                Some(id) => (Bound::Excluded(id.as_bytes().to_vec()), Bound::Unbounded),
635                None => (Bound::Unbounded, Bound::Unbounded),
636            };
637
638            Box::new(self.frame_partition.range(range).filter_map(move |r| {
639                let frame = deserialize_frame(r.unwrap());
640                if frame.topic == topic {
641                    Some(frame)
642                } else {
643                    None
644                }
645            }))
646        }
647    }
648}
649
650fn spawn_gc_worker(mut gc_rx: UnboundedReceiver<GCTask>, store: Store) {
651    std::thread::spawn(move || {
652        while let Some(task) = gc_rx.blocking_recv() {
653            match task {
654                GCTask::Remove(id) => {
655                    let _ = store.remove(&id);
656                }
657
658                GCTask::CheckHeadTTL {
659                    context_id,
660                    topic,
661                    keep,
662                } => {
663                    let prefix = idx_topic_key_prefix(context_id, &topic);
664                    let frames_to_remove: Vec<_> = store
665                        .idx_topic
666                        .prefix(&prefix)
667                        .rev() // Scan from newest to oldest
668                        .skip(keep as usize)
669                        .map(|r| {
670                            Scru128Id::from_bytes(idx_topic_frame_id_from_key(&r.unwrap().0).into())
671                        })
672                        .collect();
673
674                    for frame_id in frames_to_remove {
675                        let _ = store.remove(&frame_id);
676                    }
677                }
678
679                GCTask::Drain(tx) => {
680                    let _ = tx.send(());
681                }
682            }
683        }
684    });
685}
686
687fn is_expired(id: &Scru128Id, ttl: &Duration) -> bool {
688    let created_ms = id.timestamp();
689    let expires_ms = created_ms.saturating_add(ttl.as_millis() as u64);
690    let now_ms = std::time::SystemTime::now()
691        .duration_since(std::time::UNIX_EPOCH)
692        .unwrap()
693        .as_millis() as u64;
694
695    now_ms >= expires_ms
696}
697
/// Byte separating the variable-length topic from the frame id in topic index
/// keys; topics containing it are rejected.
const NULL_DELIMITER: u8 = 0;
699
700fn idx_topic_key_prefix(context_id: Scru128Id, topic: &str) -> Vec<u8> {
701    let mut v = Vec::with_capacity(16 + topic.len() + 1); // context_id (16) + topic bytes + delimiter
702    v.extend(context_id.as_bytes()); // binary context_id (16 bytes)
703    v.extend(topic.as_bytes()); // topic string as UTF-8 bytes
704    v.push(NULL_DELIMITER); // Delimiter for variable-sized keys
705    v
706}
707
708pub(crate) fn idx_topic_key_from_frame(frame: &Frame) -> Result<Vec<u8>, crate::error::Error> {
709    // Check if the topic contains a null byte when encoded as UTF-8
710    if frame.topic.as_bytes().contains(&NULL_DELIMITER) {
711        return Err(
712            "Topic cannot contain null byte (0x00) as it's used as a delimiter"
713                .to_string()
714                .into(),
715        );
716    }
717    let mut v = idx_topic_key_prefix(frame.context_id, &frame.topic);
718    v.extend(frame.id.as_bytes());
719    Ok(v)
720}
721
722fn idx_topic_frame_id_from_key(key: &[u8]) -> Scru128Id {
723    let frame_id_bytes = &key[key.len() - 16..];
724    Scru128Id::from_bytes(frame_id_bytes.try_into().unwrap())
725}
726
727// Creates a key for the context index: <context_id><frame_id>
728fn idx_context_key_from_frame(frame: &Frame) -> Vec<u8> {
729    let mut v = Vec::with_capacity(frame.context_id.as_bytes().len() + frame.id.as_bytes().len());
730    v.extend(frame.context_id.as_bytes());
731    v.extend(frame.id.as_bytes());
732    v
733}
734
735// Returns the key prefix for the next context after the given one
736fn idx_context_key_range_end(context_id: Scru128Id) -> Vec<u8> {
737    let mut i = context_id.to_u128();
738
739    // NOTE: Reaching u128::MAX is probably not gonna happen...
740    i = i.saturating_add(1);
741
742    Scru128Id::from(i).as_bytes().to_vec()
743}
744
745fn deserialize_frame<B1: AsRef<[u8]> + std::fmt::Debug, B2: AsRef<[u8]>>(
746    record: (B1, B2),
747) -> Frame {
748    serde_json::from_slice(record.1.as_ref()).unwrap_or_else(|e| {
749        // Try to convert the key to a Scru128Id and print in a format that can be copied for deletion
750        let key_bytes = record.0.as_ref();
751        if key_bytes.len() == 16 {
752            if let Ok(bytes) = key_bytes.try_into() {
753                let id = Scru128Id::from_bytes(bytes);
754                eprintln!("CORRUPTED_RECORD_ID: {id}");
755            }
756        }
757        let key = std::str::from_utf8(record.0.as_ref()).unwrap();
758        let value = std::str::from_utf8(record.1.as_ref()).unwrap();
759        panic!("Failed to deserialize frame: {e} {key} {value}")
760    })
761}