Skip to main content

xs/store/
mod.rs

1//! The event stream store.
2//!
3//! A [`Store`] is an append-only log of [`Frame`]s persisted to a directory on
4//! disk. Append events with [`Store::append`], replay and follow them with
5//! [`Store::read`], and stash payload bytes in the content-addressed store with
6//! the `cas_*` methods.
7//!
8//! ## Topics
9//!
10//! Every frame has a dot-delimited `topic` (for example `clip.add`). Topics form
11//! a hierarchy: a reader can ask for an exact topic, a prefix wildcard like
12//! `clip.*`, `*` for everything, or a comma-separated list of such patterns
13//! (`game.move.*,game.create`). See [`validate_topic`](crate::store::validate_topic)
14//! for the allowed characters and [`ReadOptions::topic`] for querying.
15//!
16//! ## Retention
17//!
18//! Each frame carries a [`TTL`] that controls how long it is kept: forever, for
19//! a fixed duration, only the last N per topic, or ephemeral (broadcast to live
20//! readers but never stored).
21
22mod ttl;
23pub use ttl::*;
24
25#[cfg(test)]
26mod tests;
27
28use std::ops::Bound;
29use std::path::PathBuf;
30use std::time::Duration;
31
32use tokio::sync::broadcast;
33use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
34
35use std::sync::{Arc, Mutex};
36
37use nu_protocol::engine::EngineState;
38use scru128::Scru128Id;
39
40use serde::{Deserialize, Deserializer, Serialize};
41
42use fjall::{
43    config::{BlockSizePolicy, HashRatioPolicy},
44    Database, Error as FjallError, Keyspace, KeyspaceCreateOptions, PersistMode,
45};
46
47/// Error returned when opening a [`Store`].
48#[derive(Debug)]
49pub enum StoreError {
50    /// The store directory is already open in another process.
51    Locked,
52    /// An error from the underlying `fjall` database.
53    Other(FjallError),
54}
55
56impl std::fmt::Display for StoreError {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        match self {
59            StoreError::Locked => write!(f, "Store is locked by another process"),
60            StoreError::Other(e) => write!(f, "{e}"),
61        }
62    }
63}
64
65impl std::error::Error for StoreError {}
66
67/// A single event in the stream.
68///
69/// A frame is metadata; the payload bytes (if any) live in the content-addressed
70/// store and are referenced by [`hash`](Frame::hash). Build one with the
71/// [`bon`](https://docs.rs/bon) builder, where the topic is the required
72/// starting argument:
73///
74/// ```
75/// use xs::{Frame, TTL};
76///
77/// let frame = Frame::builder("clip.add")
78///     .meta(serde_json::json!({ "source": "keyboard" }))
79///     .ttl(TTL::Last(100))
80///     .build();
81///
82/// assert_eq!(frame.topic, "clip.add");
83/// ```
84///
85/// The [`id`](Frame::id) is assigned by [`Store::append`] at write time, so the
86/// value you set on a builder is ignored when appending.
87#[derive(PartialEq, Eq, Serialize, Deserialize, Clone, Default, bon::Builder)]
88pub struct Frame {
89    /// Dot-delimited topic this frame belongs to (for example `clip.add`).
90    ///
91    /// Must satisfy [`validate_topic`]; [`Store::append`] rejects invalid topics.
92    #[builder(start_fn, into)]
93    pub topic: String,
94    /// Time-sortable identifier. Assigned by [`Store::append`]; any value set
95    /// before appending is overwritten.
96    #[builder(default)]
97    pub id: Scru128Id,
98    /// Integrity hash of the payload in the content-addressed store, if this
99    /// frame has one. Produce it with [`Store::cas_insert`].
100    pub hash: Option<ssri::Integrity>,
101    /// Arbitrary JSON metadata carried inline with the frame.
102    pub meta: Option<serde_json::Value>,
103    /// Retention policy for this frame. Defaults to [`TTL::Forever`] when unset.
104    pub ttl: Option<TTL>,
105}
106
107use std::fmt;
108
109impl fmt::Debug for Frame {
110    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
111        f.debug_struct("Frame")
112            .field("id", &format!("{id}", id = self.id))
113            .field("topic", &self.topic)
114            .field("hash", &self.hash.as_ref().map(|x| format!("{x}")))
115            .field("meta", &self.meta)
116            .field("ttl", &self.ttl)
117            .finish()
118    }
119}
120
121impl<'de> Deserialize<'de> for FollowOption {
122    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
123    where
124        D: Deserializer<'de>,
125    {
126        let s: String = Deserialize::deserialize(deserializer)?;
127        if s.is_empty() || s == "yes" {
128            Ok(FollowOption::On)
129        } else if let Ok(duration) = s.parse::<u64>() {
130            Ok(FollowOption::WithHeartbeat(Duration::from_millis(duration)))
131        } else {
132            match s.as_str() {
133                "true" => Ok(FollowOption::On),
134                "false" | "no" => Ok(FollowOption::Off),
135                _ => Err(serde::de::Error::custom("Invalid value for follow option")),
136            }
137        }
138    }
139}
140
141fn deserialize_bool<'de, D>(deserializer: D) -> Result<bool, D::Error>
142where
143    D: Deserializer<'de>,
144{
145    let s: String = Deserialize::deserialize(deserializer)?;
146    match s.as_str() {
147        "false" | "no" | "0" => Ok(false),
148        _ => Ok(true),
149    }
150}
151
152/// Options controlling a [`Store::read`] or [`Store::read_sync`] call.
153///
154/// Defaults replay every stored frame once, oldest first, then stop. Build a
155/// query with the [`bon`](https://docs.rs/bon) builder:
156///
157/// ```
158/// use xs::{ReadOptions, FollowOption};
159///
160/// // Replay history for one topic, then keep streaming new appends.
161/// let opts = ReadOptions::builder()
162///     .topic("clip.*".to_string())
163///     .follow(FollowOption::On)
164///     .build();
165/// ```
166#[derive(PartialEq, Deserialize, Clone, Debug, Default, bon::Builder)]
167pub struct ReadOptions {
168    /// Whether to keep streaming live appends after history is replayed.
169    /// Defaults to [`FollowOption::Off`].
170    #[serde(default)]
171    #[builder(default)]
172    pub follow: FollowOption,
173    /// Skip historical frames and emit only appends made after the read starts.
174    #[serde(default, deserialize_with = "deserialize_bool")]
175    #[builder(default)]
176    pub new: bool,
177    /// Start after this ID (exclusive).
178    #[serde(rename = "after")]
179    pub after: Option<Scru128Id>,
180    /// Start from this ID (inclusive).
181    pub from: Option<Scru128Id>,
182    /// Stop after emitting this many historical frames.
183    pub limit: Option<usize>,
184    /// Return the last N frames (most recent), in chronological order.
185    pub last: Option<usize>,
186    /// Restrict to one or more topic patterns, comma-separated. Each pattern
187    /// is an exact name, a `prefix.*` wildcard, or `*` for all, e.g.
188    /// `clip.add` or `game.move.*,game.create`. Commas are a safe separator
189    /// because [`validate_topic`] forbids them in topic names.
190    pub topic: Option<String>,
191}
192
/// One element of a topic filter: an exact topic name or a `prefix.*` wildcard.
#[derive(Clone, Debug, PartialEq)]
pub enum Pattern {
    /// Matches only this exact topic.
    Exact(String),
    /// Matches every topic that starts with this string. The trailing dot is
    /// kept, so `a.*` is stored as `Prefix("a.")`: it matches `a.b` but not `a`.
    Prefix(String),
}

impl Pattern {
    fn matches(&self, topic: &str) -> bool {
        match self {
            Self::Prefix(prefix) => topic.starts_with(prefix.as_str()),
            Self::Exact(name) => name.as_str() == topic,
        }
    }
}

/// A parsed topic filter: the value of [`ReadOptions::topic`] split on commas
/// into [`Pattern`]s.
///
/// Control frames produced by a following read -- `xs.threshold` and the
/// heartbeat `xs.pulse` -- are written straight to the read channel and never
/// pass through this filter.
#[derive(Clone, Debug, PartialEq)]
pub enum TopicFilter {
    /// Accept every topic (no filter given, or one element was `*`).
    All,
    /// Accept topics that satisfy at least one of these patterns.
    Patterns(Vec<Pattern>),
}

impl TopicFilter {
    /// Parse a comma-separated list of patterns. Elements are trimmed and
    /// empty ones skipped. A `*` element, or a spec with no non-empty
    /// elements, gives [`TopicFilter::All`].
    pub fn parse(spec: &str) -> TopicFilter {
        let mut patterns = Vec::new();
        for part in spec.split(',').map(str::trim).filter(|part| !part.is_empty()) {
            if part == "*" {
                return TopicFilter::All;
            }
            let pattern = match part.strip_suffix(".*") {
                Some(prefix) => Pattern::Prefix(format!("{prefix}.")),
                None => Pattern::Exact(part.to_owned()),
            };
            patterns.push(pattern);
        }

        if patterns.is_empty() {
            return TopicFilter::All;
        }
        TopicFilter::Patterns(patterns)
    }

    /// Like [`parse`](TopicFilter::parse), with `None` meaning "no filter".
    pub fn from_option(spec: Option<&str>) -> TopicFilter {
        spec.map_or(TopicFilter::All, TopicFilter::parse)
    }

    /// Report whether `topic` passes this filter.
    pub fn matches(&self, topic: &str) -> bool {
        match self {
            TopicFilter::Patterns(patterns) => patterns.iter().any(|p| p.matches(topic)),
            TopicFilter::All => true,
        }
    }
}
268
269/// K-way merge of per-pattern frame iterators, ordered by frame id, with
270/// dedupe when overlapping patterns yield the same frame from more than one
271/// iterator. Each input iterator must itself be sorted by id (ascending when
272/// `descending` is false, descending otherwise).
273struct MergeById<'a> {
274    iters: Vec<Box<dyn Iterator<Item = Frame> + 'a>>,
275    heap: std::collections::BinaryHeap<MergeEntry>,
276    descending: bool,
277    last_emitted: Option<Scru128Id>,
278}
279
280struct MergeEntry {
281    key: u128,
282    idx: usize,
283    frame: Frame,
284}
285
286impl PartialEq for MergeEntry {
287    fn eq(&self, other: &Self) -> bool {
288        self.key == other.key && self.idx == other.idx
289    }
290}
291impl Eq for MergeEntry {}
292impl PartialOrd for MergeEntry {
293    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
294        Some(self.cmp(other))
295    }
296}
297impl Ord for MergeEntry {
298    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
299        self.key.cmp(&other.key).then(self.idx.cmp(&other.idx))
300    }
301}
302
303/// BinaryHeap is a max-heap; for ascending merges flip the bits so the
304/// smallest id pops first.
305fn merge_key(id: &Scru128Id, descending: bool) -> u128 {
306    let raw = u128::from_be_bytes(id.to_bytes());
307    if descending {
308        raw
309    } else {
310        !raw
311    }
312}
313
314impl<'a> MergeById<'a> {
315    fn new(mut iters: Vec<Box<dyn Iterator<Item = Frame> + 'a>>, descending: bool) -> Self {
316        let mut heap = std::collections::BinaryHeap::with_capacity(iters.len());
317        for (idx, iter) in iters.iter_mut().enumerate() {
318            if let Some(frame) = iter.next() {
319                heap.push(MergeEntry {
320                    key: merge_key(&frame.id, descending),
321                    idx,
322                    frame,
323                });
324            }
325        }
326        MergeById {
327            iters,
328            heap,
329            descending,
330            last_emitted: None,
331        }
332    }
333}
334
335impl Iterator for MergeById<'_> {
336    type Item = Frame;
337
338    fn next(&mut self) -> Option<Frame> {
339        loop {
340            let entry = self.heap.pop()?;
341            if let Some(frame) = self.iters[entry.idx].next() {
342                self.heap.push(MergeEntry {
343                    key: merge_key(&frame.id, self.descending),
344                    idx: entry.idx,
345                    frame,
346                });
347            }
348            // Overlapping patterns can yield the same frame from more than
349            // one iterator; emit it once.
350            if self.last_emitted == Some(entry.frame.id) {
351                continue;
352            }
353            self.last_emitted = Some(entry.frame.id);
354            return Some(entry.frame);
355        }
356    }
357}
358
359impl ReadOptions {
360    /// Parse options from a URL query string (the form used by the HTTP API),
361    /// for example `follow=true&topic=clip.*&last=10`. `None` yields the
362    /// defaults.
363    pub fn from_query(query: Option<&str>) -> Result<Self, crate::error::Error> {
364        match query {
365            Some(q) => Ok(serde_urlencoded::from_str(q)?),
366            None => Ok(Self::default()),
367        }
368    }
369
370    /// Render these options back into a URL query string, the inverse of
371    /// [`from_query`](ReadOptions::from_query). Returns an empty string when no
372    /// options are set.
373    pub fn to_query_string(&self) -> String {
374        let mut params = Vec::new();
375
376        // Add follow parameter with heartbeat if specified
377        match self.follow {
378            FollowOption::Off => {}
379            FollowOption::On => params.push(("follow", "true".to_string())),
380            FollowOption::WithHeartbeat(duration) => {
381                params.push(("follow", duration.as_millis().to_string()));
382            }
383        }
384
385        // Add new if true
386        if self.new {
387            params.push(("new", "true".to_string()));
388        }
389
390        // Add after if present
391        if let Some(after) = self.after {
392            params.push(("after", after.to_string()));
393        }
394
395        // Add from if present
396        if let Some(from) = self.from {
397            params.push(("from", from.to_string()));
398        }
399
400        // Add limit if present
401        if let Some(limit) = self.limit {
402            params.push(("limit", limit.to_string()));
403        }
404
405        // Add last if present
406        if let Some(last) = self.last {
407            params.push(("last", last.to_string()));
408        }
409
410        if let Some(topic) = &self.topic {
411            params.push(("topic", topic.clone()));
412        }
413
414        // Return empty string if no params
415        if params.is_empty() {
416            String::new()
417        } else {
418            url::form_urlencoded::Serializer::new(String::new())
419                .extend_pairs(params)
420                .finish()
421        }
422    }
423}
424
/// How a read behaves once it has caught up with stored history.
#[derive(Clone, Debug, Default, PartialEq)]
pub enum FollowOption {
    /// Do not follow: the read ends when history runs out. This is the default.
    #[default]
    Off,
    /// Keep the read open after history and forward every new append.
    On,
    /// Follow like [`FollowOption::On`], and also send an `xs.pulse` frame
    /// each time this interval elapses so an idle reader can tell the
    /// connection is still alive.
    WithHeartbeat(Duration),
}
437
438#[derive(Debug)]
439enum GCTask {
440    Remove(Scru128Id),
441    CheckLastTTL { topic: String, keep: u32 },
442    Drain(tokio::sync::oneshot::Sender<()>),
443}
444
445/// An append-only event stream backed by a directory on disk.
446///
447/// Open one with [`new`](Store::new). `Store` is cheaply [`Clone`]able: every
448/// clone shares the same underlying database, broadcast channel, and
449/// content-addressed store, so clone it freely across tasks and threads instead
450/// of wrapping it in an `Arc`.
451///
452/// See the [module docs](crate::store) for the topic and retention model.
453#[derive(Clone)]
454pub struct Store {
455    /// Directory backing this store (the path passed to [`new`](Store::new)).
456    pub path: PathBuf,
457    db: Database,
458    stream: Keyspace,
459    idx_topic: Keyspace,
460    broadcast_tx: broadcast::Sender<Frame>,
461    gc_tx: UnboundedSender<GCTask>,
462    append_lock: Arc<Mutex<()>>,
463    /// Optional base engine the processors clone, set via [`with_base_engine`].
464    /// `None` falls back to `Engine::new()`. See
465    /// [`prepared_base`](crate::nu::prepared_base) and ADR 0007.
466    base_engine: Option<Arc<EngineState>>,
467}
468
469impl Store {
470    /// Open the store at `path`, creating the directory layout if it does not
471    /// exist. Spawns a background worker that garbage-collects expired frames.
472    ///
473    /// # Errors
474    ///
475    /// Returns [`StoreError::Locked`] if another process already holds the
476    /// store open, or [`StoreError::Other`] for any other database error.
477    ///
478    /// ```no_run
479    /// use xs::Store;
480    ///
481    /// let store = Store::new("./clipboard-store".into())?;
482    /// # Ok::<(), xs::StoreError>(())
483    /// ```
484    pub fn new(path: PathBuf) -> Result<Store, StoreError> {
485        let db = match Database::builder(path.join("fjall"))
486            .cache_size(32 * 1024 * 1024) // 32 MiB
487            .worker_threads(1)
488            .open()
489        {
490            Ok(db) => db,
491            Err(FjallError::Locked) => return Err(StoreError::Locked),
492            Err(e) => return Err(StoreError::Other(e)),
493        };
494
495        // Options for stream keyspace: point reads by frame ID
496        let stream_opts = || {
497            KeyspaceCreateOptions::default()
498                .max_memtable_size(8 * 1024 * 1024) // 8 MiB
499                .data_block_size_policy(BlockSizePolicy::all(16 * 1024)) // 16 KiB
500                .data_block_hash_ratio_policy(HashRatioPolicy::all(8.0))
501                .expect_point_read_hits(true)
502        };
503
504        // Options for idx_topic keyspace: prefix scans only
505        let idx_opts = || {
506            KeyspaceCreateOptions::default()
507                .max_memtable_size(8 * 1024 * 1024) // 8 MiB
508                .data_block_size_policy(BlockSizePolicy::all(16 * 1024)) // 16 KiB
509                .data_block_hash_ratio_policy(HashRatioPolicy::all(0.0)) // no point reads
510                .expect_point_read_hits(true)
511        };
512
513        let stream = db.keyspace("stream", stream_opts).unwrap();
514        let idx_topic = db.keyspace("idx_topic", idx_opts).unwrap();
515
516        let (broadcast_tx, _) = broadcast::channel(1024);
517        let (gc_tx, gc_rx) = mpsc::unbounded_channel();
518
519        let store = Store {
520            path: path.clone(),
521            db,
522            stream,
523            idx_topic,
524            broadcast_tx,
525            gc_tx,
526            append_lock: Arc::new(Mutex::new(())),
527            base_engine: None,
528        };
529
530        // Spawn gc worker thread
531        spawn_gc_worker(gc_rx, store.clone());
532
533        Ok(store)
534    }
535
536    /// Set a base engine the processor engines clone.
537    ///
538    /// A program that embeds xs prepares an [`EngineState`] with its own
539    /// commands, env, and consts;
540    /// [`prepared_base`](crate::nu::prepared_base) clones it per spawn and adds
541    /// the store commands, so the actor, service, and action processors get
542    /// those commands. Shared across `Store` clones, so set it once, before the
543    /// processors spawn. No base set: `Engine::new()`.
544    pub fn with_base_engine(mut self, base: EngineState) -> Self {
545        self.base_engine = Some(Arc::new(base));
546        self
547    }
548
549    /// The base engine an embedder attached via [`with_base_engine`], if any.
550    pub fn base_engine(&self) -> Option<&EngineState> {
551        self.base_engine.as_deref()
552    }
553
554    /// Wait until the background garbage-collection worker has processed every
555    /// task queued so far. Useful in tests to observe TTL eviction
556    /// deterministically.
557    pub async fn wait_for_gc(&self) {
558        let (tx, rx) = tokio::sync::oneshot::channel();
559        let _ = self.gc_tx.send(GCTask::Drain(tx));
560        let _ = rx.await;
561    }
562
563    /// Read frames into an async channel according to `options`.
564    ///
565    /// By default this replays matching historical frames oldest-first and then
566    /// closes the channel. With [`FollowOption::On`] it instead keeps the
567    /// channel open and streams new appends as they arrive. When following, a
568    /// single ephemeral `xs.threshold` frame is emitted to mark the boundary
569    /// between replayed history and live events.
570    ///
571    /// The returned [`Receiver`](tokio::sync::mpsc::Receiver) is bounded;
572    /// dropping it stops the read. For a blocking, non-async caller use
573    /// [`read_sync`](Store::read_sync).
574    ///
575    /// ```no_run
576    /// use xs::{Store, ReadOptions, FollowOption};
577    ///
578    /// # async fn run(store: Store) {
579    /// let mut rx = store
580    ///     .read(ReadOptions::builder().follow(FollowOption::On).build())
581    ///     .await;
582    /// while let Some(frame) = rx.recv().await {
583    ///     if frame.topic == "xs.threshold" {
584    ///         // caught up to live; everything after this is new
585    ///         continue;
586    ///     }
587    ///     println!("{} {}", frame.id, frame.topic);
588    /// }
589    /// # }
590    /// ```
591    #[tracing::instrument(skip(self))]
592    pub async fn read(&self, options: ReadOptions) -> tokio::sync::mpsc::Receiver<Frame> {
593        let (tx, rx) = tokio::sync::mpsc::channel(100);
594
595        let should_follow = matches!(
596            options.follow,
597            FollowOption::On | FollowOption::WithHeartbeat(_)
598        );
599
600        // Only take broadcast subscription if following. We initate the subscription here to
601        // ensure we don't miss any messages between historical processing and starting the
602        // broadcast subscription.
603        let broadcast_rx = if should_follow {
604            Some(self.broadcast_tx.subscribe())
605        } else {
606            None
607        };
608
609        // Only create done channel if we're doing historical processing
610        let done_rx = if !options.new {
611            let (done_tx, done_rx) = tokio::sync::oneshot::channel();
612            let tx_clone = tx.clone();
613            let store = self.clone();
614            let options = options.clone();
615            let should_follow_clone = should_follow;
616            let gc_tx = self.gc_tx.clone();
617
618            // Spawn OS thread to handle historical events
619            std::thread::spawn(move || {
620                let mut last_id = None;
621                let mut count = 0;
622
623                // Handle --last N: get the N most recent frames
624                let filter = TopicFilter::from_option(options.topic.as_deref());
625
626                if let Some(last_n) = options.last {
627                    let iter = store.iter_for_filter_rev(&filter);
628
629                    // Collect last N frames (in reverse order), skipping expired
630                    let mut frames: Vec<Frame> = Vec::with_capacity(last_n);
631                    for frame in iter {
632                        if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
633                            if is_expired(&frame.id, ttl) {
634                                let _ = gc_tx.send(GCTask::Remove(frame.id));
635                                continue;
636                            }
637                        }
638                        frames.push(frame);
639                        if frames.len() >= last_n {
640                            break;
641                        }
642                    }
643
644                    // Reverse to chronological order and send
645                    for frame in frames.into_iter().rev() {
646                        last_id = Some(frame.id);
647                        count += 1;
648                        if tx_clone.blocking_send(frame).is_err() {
649                            return;
650                        }
651                    }
652                } else {
653                    // Normal forward iteration
654                    // Determine start bound: from (inclusive) takes precedence over after (exclusive)
655                    let start_bound = options
656                        .from
657                        .as_ref()
658                        .map(|id| (id, true))
659                        .or_else(|| options.after.as_ref().map(|id| (id, false)));
660
661                    let iter = store.iter_for_filter(&filter, start_bound);
662
663                    for frame in iter {
664                        if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
665                            if is_expired(&frame.id, ttl) {
666                                let _ = gc_tx.send(GCTask::Remove(frame.id));
667                                continue;
668                            }
669                        }
670
671                        last_id = Some(frame.id);
672
673                        if let Some(limit) = options.limit {
674                            if count >= limit {
675                                return; // Exit early if limit reached
676                            }
677                        }
678
679                        if tx_clone.blocking_send(frame).is_err() {
680                            return;
681                        }
682                        count += 1;
683                    }
684                }
685
686                // Send threshold message if following
687                if should_follow_clone {
688                    let threshold = Frame::builder("xs.threshold")
689                        .id(scru128::new())
690                        .ttl(TTL::Ephemeral)
691                        .build();
692                    if tx_clone.blocking_send(threshold).is_err() {
693                        return;
694                    }
695                }
696
697                // Signal completion with the last seen ID and count
698                let _ = done_tx.send((last_id, count));
699            });
700
701            Some(done_rx)
702        } else {
703            None
704        };
705
706        // Handle broadcast subscription and heartbeat
707        if let Some(broadcast_rx) = broadcast_rx {
708            {
709                let tx = tx.clone();
710                let limit = options.limit;
711
712                tokio::spawn(async move {
713                    // If we have a done_rx, wait for historical processing
714                    let (last_id, mut count) = match done_rx {
715                        Some(done_rx) => match done_rx.await {
716                            Ok((id, count)) => (id, count),
717                            Err(_) => return, // Historical processing failed/cancelled
718                        },
719                        None => (None, 0),
720                    };
721
722                    let filter = TopicFilter::from_option(options.topic.as_deref());
723
724                    let mut broadcast_rx = broadcast_rx;
725                    while let Ok(frame) = broadcast_rx.recv().await {
726                        // Filter by topic (any-match against the parsed patterns)
727                        if !filter.matches(&frame.topic) {
728                            continue;
729                        }
730
731                        // Skip if we've already seen this frame during historical scan
732                        if let Some(last_scanned_id) = last_id {
733                            if frame.id <= last_scanned_id {
734                                continue;
735                            }
736                        }
737
738                        if tx.send(frame).await.is_err() {
739                            break;
740                        }
741
742                        if let Some(limit) = limit {
743                            count += 1;
744                            if count >= limit {
745                                break;
746                            }
747                        }
748                    }
749                });
750            }
751
752            // Handle heartbeat if requested
753            if let FollowOption::WithHeartbeat(duration) = options.follow {
754                let heartbeat_tx = tx;
755                tokio::spawn(async move {
756                    loop {
757                        tokio::time::sleep(duration).await;
758                        let frame = Frame::builder("xs.pulse")
759                            .id(scru128::new())
760                            .ttl(TTL::Ephemeral)
761                            .build();
762                        if heartbeat_tx.send(frame).await.is_err() {
763                            break;
764                        }
765                    }
766                });
767            }
768        }
769
770        rx
771    }
772
773    /// Replay matching historical frames as a blocking iterator.
774    ///
775    /// This honours the `topic`, `from`, `after`, `limit`, and `last` parts of
776    /// [`ReadOptions`] but ignores [`follow`](ReadOptions::follow): it never
777    /// streams live appends. Use [`read`](Store::read) when you need to follow.
778    ///
779    /// ```no_run
780    /// use xs::{Store, ReadOptions};
781    ///
782    /// # fn run(store: Store) {
783    /// let opts = ReadOptions::builder().topic("clip.*".to_string()).last(10).build();
784    /// for frame in store.read_sync(opts) {
785    ///     println!("{} {}", frame.id, frame.topic);
786    /// }
787    /// # }
788    /// ```
789    pub fn read_sync(&self, options: ReadOptions) -> impl Iterator<Item = Frame> + '_ {
790        let gc_tx = self.gc_tx.clone();
791
792        // Filter out expired frames
793        let filter_expired = move |frame: Frame, gc_tx: &UnboundedSender<GCTask>| {
794            if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
795                if is_expired(&frame.id, ttl) {
796                    let _ = gc_tx.send(GCTask::Remove(frame.id));
797                    return None;
798                }
799            }
800            Some(frame)
801        };
802
803        let filter = TopicFilter::from_option(options.topic.as_deref());
804
805        let frames: Vec<Frame> = if let Some(last_n) = options.last {
806            // Handle --last N: get the N most recent frames
807            let iter = self.iter_for_filter_rev(&filter);
808
809            // Collect last N frames (in reverse order), skipping expired
810            let mut frames: Vec<Frame> = Vec::with_capacity(last_n);
811            for frame in iter {
812                if let Some(frame) = filter_expired(frame, &gc_tx) {
813                    frames.push(frame);
814                    if frames.len() >= last_n {
815                        break;
816                    }
817                }
818            }
819
820            // Reverse to chronological order
821            frames.reverse();
822            frames
823        } else {
824            // Normal forward iteration
825            let start_bound = options
826                .from
827                .as_ref()
828                .map(|id| (id, true))
829                .or_else(|| options.after.as_ref().map(|id| (id, false)));
830
831            let iter = self.iter_for_filter(&filter, start_bound);
832
833            iter.filter_map(|frame| filter_expired(frame, &gc_tx))
834                .take(options.limit.unwrap_or(usize::MAX))
835                .collect()
836        };
837
838        frames.into_iter()
839    }
840
841    /// Returns the current module state as of a given point in the stream.
842    ///
843    /// Scans all frames up to (and including) `as_of` and returns a mapping of
844    /// module name to CAS hash for the latest frame on each `xs.module.<name>`
845    /// topic.
846    /// Resolve the set of registered Nushell modules as of a given frame ID.
847    ///
848    /// Scans `xs.module.<name>` frames up to and including `as_of` and returns a
849    /// map from module name to the content hash of its latest definition. Used
850    /// by the scripting runtime; rarely needed when embedding the store
851    /// directly.
852    pub fn nu_modules_at(
853        &self,
854        as_of: &Scru128Id,
855    ) -> std::collections::HashMap<String, ssri::Integrity> {
856        let mut modules = std::collections::HashMap::new();
857        let options = ReadOptions::builder().follow(FollowOption::Off).build();
858        for frame in self.read_sync(options) {
859            if frame.id > *as_of {
860                break;
861            }
862            if let Some(hash) = frame.hash {
863                if let Some(name) = frame.topic.strip_prefix("xs.module.") {
864                    if !name.is_empty() {
865                        modules.insert(name.to_string(), hash);
866                    }
867                }
868            }
869        }
870        modules
871    }
872
873    /// Fetch a single frame by ID, or `None` if no such frame exists.
874    pub fn get(&self, id: &Scru128Id) -> Option<Frame> {
875        self.stream
876            .get(id.to_bytes())
877            .unwrap()
878            .map(|value| deserialize_frame((id.as_bytes(), value)))
879    }
880
881    /// Delete a frame and its topic index entries. Removing a frame that does
882    /// not exist is a no-op and returns `Ok(())`.
883    ///
884    /// This removes the stream entry only; any payload bytes in the
885    /// content-addressed store are left in place.
886    #[tracing::instrument(skip(self), fields(id = %id.to_string()))]
887    pub fn remove(&self, id: &Scru128Id) -> Result<(), crate::error::Error> {
888        let Some(frame) = self.get(id) else {
889            // Already deleted
890            return Ok(());
891        };
892
893        // Build topic key directly (no validation - frame already exists)
894        let mut topic_key = idx_topic_key_prefix(&frame.topic);
895        topic_key.extend(frame.id.as_bytes());
896
897        // Get prefix index keys for hierarchical queries
898        let prefix_keys = idx_topic_prefix_keys(&frame.topic, &frame.id);
899
900        let mut batch = self.db.batch();
901        batch.remove(&self.stream, id.as_bytes());
902        batch.remove(&self.idx_topic, topic_key);
903        for prefix_key in &prefix_keys {
904            batch.remove(&self.idx_topic, prefix_key);
905        }
906        batch.commit()?;
907        self.db.persist(PersistMode::SyncAll)?;
908        Ok(())
909    }
910
911    // --- Content-addressed store (CAS) ---
912    //
913    // Frame payloads live here, keyed by an integrity hash. The typical flow is
914    // `cas_insert` to store bytes, stash the returned hash on a `Frame`, then
915    // `cas_read` to retrieve them later. Each method has a `_sync` twin for
916    // blocking callers; the streaming `cas_reader`/`cas_writer` variants avoid
917    // buffering the whole payload in memory.
918
919    /// Open a streaming reader for the payload identified by `hash`.
920    pub async fn cas_reader(&self, hash: ssri::Integrity) -> cacache::Result<cacache::Reader> {
921        cacache::Reader::open_hash(&self.path.join("cacache"), hash).await
922    }
923
924    /// Blocking variant of [`cas_reader`](Store::cas_reader).
925    pub fn cas_reader_sync(&self, hash: ssri::Integrity) -> cacache::Result<cacache::SyncReader> {
926        cacache::SyncReader::open_hash(self.path.join("cacache"), hash)
927    }
928
929    /// Open a streaming writer; finish it to obtain the payload's integrity hash.
930    pub async fn cas_writer(&self) -> cacache::Result<cacache::Writer> {
931        cacache::WriteOpts::new()
932            .open_hash(&self.path.join("cacache"))
933            .await
934    }
935
936    /// Blocking variant of [`cas_writer`](Store::cas_writer).
937    pub fn cas_writer_sync(&self) -> cacache::Result<cacache::SyncWriter> {
938        cacache::WriteOpts::new().open_hash_sync(self.path.join("cacache"))
939    }
940
941    /// Store `content` and return its integrity hash, ready to attach to a
942    /// [`Frame::hash`].
943    pub async fn cas_insert(&self, content: impl AsRef<[u8]>) -> cacache::Result<ssri::Integrity> {
944        cacache::write_hash(&self.path.join("cacache"), content).await
945    }
946
947    /// Blocking variant of [`cas_insert`](Store::cas_insert).
948    pub fn cas_insert_sync(&self, content: impl AsRef<[u8]>) -> cacache::Result<ssri::Integrity> {
949        cacache::write_hash_sync(self.path.join("cacache"), content)
950    }
951
952    /// Convenience wrapper over [`cas_insert`](Store::cas_insert) for a byte slice.
953    pub async fn cas_insert_bytes(&self, bytes: &[u8]) -> cacache::Result<ssri::Integrity> {
954        self.cas_insert(bytes).await
955    }
956
957    /// Blocking variant of [`cas_insert_bytes`](Store::cas_insert_bytes).
958    pub fn cas_insert_bytes_sync(&self, bytes: &[u8]) -> cacache::Result<ssri::Integrity> {
959        self.cas_insert_sync(bytes)
960    }
961
962    /// Read back the full payload for `hash` into a `Vec<u8>`.
963    pub async fn cas_read(&self, hash: &ssri::Integrity) -> cacache::Result<Vec<u8>> {
964        cacache::read_hash(&self.path.join("cacache"), hash).await
965    }
966
967    /// Blocking variant of [`cas_read`](Store::cas_read).
968    pub fn cas_read_sync(&self, hash: &ssri::Integrity) -> cacache::Result<Vec<u8>> {
969        cacache::read_hash_sync(self.path.join("cacache"), hash)
970    }
971
972    /// Persist a frame exactly as given, including its existing
973    /// [`id`](Frame::id), without broadcasting it to live readers or scheduling
974    /// TTL garbage collection.
975    ///
976    /// Most callers want [`append`](Store::append) instead, which assigns a
977    /// fresh ID, handles ephemeral and `Last` retention, and notifies
978    /// subscribers. Use `insert_frame` only when you are reconstructing a stream
979    /// with predetermined IDs (for example when restoring a backup).
980    #[tracing::instrument(skip(self))]
981    pub fn insert_frame(&self, frame: &Frame) -> Result<(), crate::error::Error> {
982        let encoded: Vec<u8> = serde_json::to_vec(&frame).unwrap();
983
984        // Get the index topic key (also validates topic)
985        let topic_key = idx_topic_key_from_frame(frame)?;
986
987        // Get prefix index keys for hierarchical queries
988        let prefix_keys = idx_topic_prefix_keys(&frame.topic, &frame.id);
989
990        let mut batch = self.db.batch();
991        batch.insert(&self.stream, frame.id.as_bytes(), encoded);
992        batch.insert(&self.idx_topic, topic_key, b"");
993        for prefix_key in &prefix_keys {
994            batch.insert(&self.idx_topic, prefix_key, b"");
995        }
996        batch.commit()?;
997        self.db.persist(PersistMode::SyncAll)?;
998        Ok(())
999    }
1000
1001    /// Append a frame to the stream and return it with its freshly assigned
1002    /// [`id`](Frame::id).
1003    ///
1004    /// This is the primary write path. It:
1005    ///
1006    /// - assigns a new time-sortable ID (overwriting any ID on the input);
1007    /// - validates the topic (see [`validate_topic`]);
1008    /// - persists the frame, unless its [`TTL`] is [`TTL::Ephemeral`], in which
1009    ///   case it is only broadcast to live readers;
1010    /// - schedules garbage collection for [`TTL::Last`] retention;
1011    /// - broadcasts the frame to everyone currently in a following
1012    ///   [`read`](Store::read).
1013    ///
1014    /// Appends are serialized internally, so frames are assigned IDs and
1015    /// delivered to subscribers in a consistent order.
1016    ///
1017    /// # Errors
1018    ///
1019    /// Returns an error if the topic is invalid or the underlying write fails.
1020    ///
1021    /// ```no_run
1022    /// use xs::{Store, Frame, TTL};
1023    ///
1024    /// # async fn run(store: Store) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1025    /// let hash = store.cas_insert("hello clipboard").await?;
1026    /// let frame = store.append(
1027    ///     Frame::builder("clip.add").hash(hash).ttl(TTL::Last(100)).build(),
1028    /// )?;
1029    /// println!("appended {}", frame.id);
1030    /// # Ok(())
1031    /// # }
1032    /// ```
1033    pub fn append(&self, mut frame: Frame) -> Result<Frame, crate::error::Error> {
1034        // Serialize all appends to ensure ID generation, write, and broadcast
1035        // happen atomically. This guarantees subscribers receive frames in
1036        // scru128 ID order.
1037        let _guard = self.append_lock.lock().unwrap();
1038
1039        frame.id = scru128::new();
1040
1041        // Check for null byte in topic (in case we're not storing the frame)
1042        idx_topic_key_from_frame(&frame)?;
1043
1044        // only store the frame if it's not ephemeral
1045        if frame.ttl != Some(TTL::Ephemeral) {
1046            self.insert_frame(&frame)?;
1047
1048            // If this is a Last TTL, schedule a gc task
1049            if let Some(TTL::Last(n)) = frame.ttl {
1050                let _ = self.gc_tx.send(GCTask::CheckLastTTL {
1051                    topic: frame.topic.clone(),
1052                    keep: n,
1053                });
1054            }
1055        }
1056
1057        let _ = self.broadcast_tx.send(frame.clone());
1058        Ok(frame)
1059    }
1060
1061    /// Iterate frames starting from a bound.
1062    /// `start` is `(id, inclusive)` where inclusive=true means >= and inclusive=false means >.
1063    fn iter_frames(
1064        &self,
1065        start: Option<(&Scru128Id, bool)>,
1066    ) -> Box<dyn Iterator<Item = Frame> + '_> {
1067        let range = match start {
1068            Some((id, true)) => (Bound::Included(id.as_bytes().to_vec()), Bound::Unbounded),
1069            Some((id, false)) => (Bound::Excluded(id.as_bytes().to_vec()), Bound::Unbounded),
1070            None => (Bound::Unbounded, Bound::Unbounded),
1071        };
1072
1073        Box::new(self.stream.range(range).filter_map(|guard| {
1074            let (key, value) = guard.into_inner().ok()?;
1075            Some(deserialize_frame((key, value)))
1076        }))
1077    }
1078
1079    /// Iterate frames in reverse order (most recent first).
1080    fn iter_frames_rev(&self) -> Box<dyn Iterator<Item = Frame> + '_> {
1081        Box::new(self.stream.iter().rev().filter_map(|guard| {
1082            let (key, value) = guard.into_inner().ok()?;
1083            Some(deserialize_frame((key, value)))
1084        }))
1085    }
1086
1087    /// Iterate frames by topic in reverse order (most recent first).
1088    fn iter_frames_by_topic_rev<'a>(
1089        &'a self,
1090        topic: &'a str,
1091    ) -> Box<dyn Iterator<Item = Frame> + 'a> {
1092        let prefix = idx_topic_key_prefix(topic);
1093        Box::new(
1094            self.idx_topic
1095                .prefix(prefix)
1096                .rev()
1097                .filter_map(move |guard| {
1098                    let key = guard.key().ok()?;
1099                    let frame_id = idx_topic_frame_id_from_key(&key);
1100                    self.get(&frame_id)
1101                }),
1102        )
1103    }
1104
1105    /// Iterate frames by topic prefix in reverse order (most recent first).
1106    fn iter_frames_by_topic_prefix_rev<'a>(
1107        &'a self,
1108        prefix: &'a str,
1109    ) -> Box<dyn Iterator<Item = Frame> + 'a> {
1110        let mut index_prefix = Vec::with_capacity(prefix.len() + 1);
1111        index_prefix.extend(prefix.as_bytes());
1112        index_prefix.push(NULL_DELIMITER);
1113
1114        Box::new(
1115            self.idx_topic
1116                .prefix(index_prefix)
1117                .rev()
1118                .filter_map(move |guard| {
1119                    let key = guard.key().ok()?;
1120                    let frame_id = idx_topic_frame_id_from_key(&key);
1121                    self.get(&frame_id)
1122                }),
1123        )
1124    }
1125
1126    fn iter_frames_by_topic<'a>(
1127        &'a self,
1128        topic: &'a str,
1129        start: Option<(&'a Scru128Id, bool)>,
1130    ) -> Box<dyn Iterator<Item = Frame> + 'a> {
1131        let prefix = idx_topic_key_prefix(topic);
1132        Box::new(self.idx_topic.prefix(prefix).filter_map(move |guard| {
1133            let key = guard.key().ok()?;
1134            let frame_id = idx_topic_frame_id_from_key(&key);
1135            if let Some((bound_id, inclusive)) = start {
1136                if inclusive {
1137                    if frame_id < *bound_id {
1138                        return None;
1139                    }
1140                } else if frame_id <= *bound_id {
1141                    return None;
1142                }
1143            }
1144            self.get(&frame_id)
1145        }))
1146    }
1147
1148    /// Iterate frames matching a topic prefix (for wildcard queries like "user.*").
1149    /// The prefix should include the trailing dot (e.g., "user." for "user.*").
1150    fn iter_frames_by_topic_prefix<'a>(
1151        &'a self,
1152        prefix: &'a str,
1153        start: Option<(&'a Scru128Id, bool)>,
1154    ) -> Box<dyn Iterator<Item = Frame> + 'a> {
1155        // Build index prefix: "user.\0" for scanning all "user.*" entries
1156        let mut index_prefix = Vec::with_capacity(prefix.len() + 1);
1157        index_prefix.extend(prefix.as_bytes());
1158        index_prefix.push(NULL_DELIMITER);
1159
1160        Box::new(
1161            self.idx_topic
1162                .prefix(index_prefix)
1163                .filter_map(move |guard| {
1164                    let key = guard.key().ok()?;
1165                    let frame_id = idx_topic_frame_id_from_key(&key);
1166                    if let Some((bound_id, inclusive)) = start {
1167                        if inclusive {
1168                            if frame_id < *bound_id {
1169                                return None;
1170                            }
1171                        } else if frame_id <= *bound_id {
1172                            return None;
1173                        }
1174                    }
1175                    self.get(&frame_id)
1176                }),
1177        )
1178    }
1179
1180    /// Forward iterator for a single pattern, using the topic index.
1181    fn iter_for_pattern<'a>(
1182        &'a self,
1183        pattern: &'a Pattern,
1184        start: Option<(&'a Scru128Id, bool)>,
1185    ) -> Box<dyn Iterator<Item = Frame> + 'a> {
1186        match pattern {
1187            Pattern::Exact(topic) => self.iter_frames_by_topic(topic, start),
1188            Pattern::Prefix(prefix) => self.iter_frames_by_topic_prefix(prefix, start),
1189        }
1190    }
1191
1192    /// Reverse (most recent first) iterator for a single pattern.
1193    fn iter_for_pattern_rev<'a>(
1194        &'a self,
1195        pattern: &'a Pattern,
1196    ) -> Box<dyn Iterator<Item = Frame> + 'a> {
1197        match pattern {
1198            Pattern::Exact(topic) => self.iter_frames_by_topic_rev(topic),
1199            Pattern::Prefix(prefix) => self.iter_frames_by_topic_prefix_rev(prefix),
1200        }
1201    }
1202
1203    /// Forward iterator for a parsed topic filter, ascending by frame id.
1204    /// A single pattern uses the indexed path directly; multiple patterns
1205    /// are k-way merged with dedupe for overlapping patterns.
1206    fn iter_for_filter<'a>(
1207        &'a self,
1208        filter: &'a TopicFilter,
1209        start: Option<(&'a Scru128Id, bool)>,
1210    ) -> Box<dyn Iterator<Item = Frame> + 'a> {
1211        match filter {
1212            TopicFilter::All => self.iter_frames(start),
1213            TopicFilter::Patterns(patterns) if patterns.len() == 1 => {
1214                self.iter_for_pattern(&patterns[0], start)
1215            }
1216            TopicFilter::Patterns(patterns) => {
1217                let iters = patterns
1218                    .iter()
1219                    .map(|p| self.iter_for_pattern(p, start))
1220                    .collect();
1221                Box::new(MergeById::new(iters, false))
1222            }
1223        }
1224    }
1225
1226    /// Reverse variant of [`iter_for_filter`](Store::iter_for_filter),
1227    /// descending by frame id (most recent first).
1228    fn iter_for_filter_rev<'a>(
1229        &'a self,
1230        filter: &'a TopicFilter,
1231    ) -> Box<dyn Iterator<Item = Frame> + 'a> {
1232        match filter {
1233            TopicFilter::All => self.iter_frames_rev(),
1234            TopicFilter::Patterns(patterns) if patterns.len() == 1 => {
1235                self.iter_for_pattern_rev(&patterns[0])
1236            }
1237            TopicFilter::Patterns(patterns) => {
1238                let iters = patterns
1239                    .iter()
1240                    .map(|p| self.iter_for_pattern_rev(p))
1241                    .collect();
1242                Box::new(MergeById::new(iters, true))
1243            }
1244        }
1245    }
1246}
1247
1248fn spawn_gc_worker(mut gc_rx: UnboundedReceiver<GCTask>, store: Store) {
1249    std::thread::spawn(move || {
1250        while let Some(task) = gc_rx.blocking_recv() {
1251            match task {
1252                GCTask::Remove(id) => {
1253                    let _ = store.remove(&id);
1254                }
1255
1256                GCTask::CheckLastTTL { topic, keep } => {
1257                    let prefix = idx_topic_key_prefix(&topic);
1258                    let frames_to_remove: Vec<_> = store
1259                        .idx_topic
1260                        .prefix(&prefix)
1261                        .rev() // Scan from newest to oldest
1262                        .skip(keep as usize)
1263                        .filter_map(|guard| {
1264                            let key = guard.key().ok()?;
1265                            Some(Scru128Id::from_bytes(
1266                                idx_topic_frame_id_from_key(&key).into(),
1267                            ))
1268                        })
1269                        .collect();
1270
1271                    for frame_id in frames_to_remove {
1272                        let _ = store.remove(&frame_id);
1273                    }
1274                }
1275
1276                GCTask::Drain(tx) => {
1277                    let _ = tx.send(());
1278                }
1279            }
1280        }
1281    });
1282}
1283
1284fn is_expired(id: &Scru128Id, ttl: &Duration) -> bool {
1285    let created_ms = id.timestamp();
1286    let expires_ms = created_ms.saturating_add(ttl.as_millis() as u64);
1287    let now_ms = std::time::SystemTime::now()
1288        .duration_since(std::time::UNIX_EPOCH)
1289        .unwrap()
1290        .as_millis() as u64;
1291
1292    now_ms >= expires_ms
1293}
1294
/// Separator byte placed between the topic (or topic-prefix) bytes and the
/// 16 frame-ID bytes in topic index keys.
const NULL_DELIMITER: u8 = 0;
/// Maximum topic length in bytes, enforced by [`validate_topic`].
const MAX_TOPIC_LENGTH: usize = 255;
1297
1298/// Validate a frame topic (per ADR 0001).
1299///
1300/// A topic must be non-empty and at most 255 bytes, start with an ASCII letter
1301/// or `_`, and contain only `a-z A-Z 0-9 _ - .`. It may not end with `.` or
1302/// contain consecutive dots. [`Store::append`] runs this automatically; call it
1303/// directly to validate user input before building a [`Frame`].
1304///
1305/// ```
1306/// use xs::store::validate_topic;
1307///
1308/// assert!(validate_topic("clip.add").is_ok());
1309/// assert!(validate_topic("clip.").is_err());
1310/// ```
1311pub fn validate_topic(topic: &str) -> Result<(), crate::error::Error> {
1312    if topic.is_empty() {
1313        return Err("Topic cannot be empty".to_string().into());
1314    }
1315    if topic.len() > MAX_TOPIC_LENGTH {
1316        return Err(format!("Topic exceeds max length of {MAX_TOPIC_LENGTH} bytes").into());
1317    }
1318    if topic.ends_with('.') {
1319        return Err("Topic cannot end with '.'".to_string().into());
1320    }
1321    if topic.contains("..") {
1322        return Err("Topic cannot contain consecutive dots".to_string().into());
1323    }
1324
1325    let bytes = topic.as_bytes();
1326    let first = bytes[0];
1327    if !first.is_ascii_alphabetic() && first != b'_' {
1328        return Err("Topic must start with a-z, A-Z, or _".to_string().into());
1329    }
1330
1331    for &b in bytes {
1332        if !b.is_ascii_alphanumeric() && b != b'_' && b != b'-' && b != b'.' {
1333            return Err(format!(
1334                "Topic contains invalid character: '{}'. Allowed: a-z A-Z 0-9 _ - .",
1335                b as char
1336            )
1337            .into());
1338        }
1339    }
1340
1341    Ok(())
1342}
1343
1344/// Validates a topic query (for --topic flag).
1345/// Accepts a comma-separated list of patterns; each element is an exact
1346/// topic, a `prefix.*` wildcard, or `*` (match all).
1347pub fn validate_topic_query(spec: &str) -> Result<(), crate::error::Error> {
1348    let mut seen = false;
1349    for part in spec.split(',') {
1350        let part = part.trim();
1351        if part.is_empty() {
1352            continue;
1353        }
1354        seen = true;
1355        validate_topic_pattern(part)?;
1356    }
1357    if !seen {
1358        return Err("Topic query cannot be empty".to_string().into());
1359    }
1360    Ok(())
1361}
1362
1363/// Validate a single topic pattern: "*", "prefix.*", or an exact topic.
1364fn validate_topic_pattern(topic: &str) -> Result<(), crate::error::Error> {
1365    if topic == "*" {
1366        return Ok(());
1367    }
1368    if let Some(prefix) = topic.strip_suffix(".*") {
1369        // Validate the prefix part (e.g., "user" in "user.*")
1370        // Prefix can be empty edge case: ".*" is not valid
1371        if prefix.is_empty() {
1372            return Err("Wildcard '.*' requires a prefix".to_string().into());
1373        }
1374        validate_topic(prefix)
1375    } else {
1376        validate_topic(topic)
1377    }
1378}
1379
1380/// Generate prefix index keys for hierarchical topic queries.
1381/// For topic "user.id1.messages", returns keys for prefixes "user." and "user.id1."
1382fn idx_topic_prefix_keys(topic: &str, frame_id: &scru128::Scru128Id) -> Vec<Vec<u8>> {
1383    let mut keys = Vec::new();
1384    let mut pos = 0;
1385    while let Some(dot_pos) = topic[pos..].find('.') {
1386        let prefix = &topic[..pos + dot_pos + 1]; // include the dot
1387        let mut key = Vec::with_capacity(prefix.len() + 1 + 16);
1388        key.extend(prefix.as_bytes());
1389        key.push(NULL_DELIMITER);
1390        key.extend(frame_id.as_bytes());
1391        keys.push(key);
1392        pos += dot_pos + 1;
1393    }
1394    keys
1395}
1396
1397fn idx_topic_key_prefix(topic: &str) -> Vec<u8> {
1398    let mut v = Vec::with_capacity(topic.len() + 1); // topic bytes + delimiter
1399    v.extend(topic.as_bytes()); // topic string as UTF-8 bytes
1400    v.push(NULL_DELIMITER); // Delimiter for variable-sized keys
1401    v
1402}
1403
1404pub(crate) fn idx_topic_key_from_frame(frame: &Frame) -> Result<Vec<u8>, crate::error::Error> {
1405    validate_topic(&frame.topic)?;
1406    let mut v = idx_topic_key_prefix(&frame.topic);
1407    v.extend(frame.id.as_bytes());
1408    Ok(v)
1409}
1410
1411fn idx_topic_frame_id_from_key(key: &[u8]) -> Scru128Id {
1412    let frame_id_bytes = &key[key.len() - 16..];
1413    Scru128Id::from_bytes(frame_id_bytes.try_into().unwrap())
1414}
1415
1416fn deserialize_frame<B1: AsRef<[u8]> + std::fmt::Debug, B2: AsRef<[u8]>>(
1417    record: (B1, B2),
1418) -> Frame {
1419    serde_json::from_slice(record.1.as_ref()).unwrap_or_else(|e| {
1420        // Try to convert the key to a Scru128Id and print in a format that can be copied for deletion
1421        let key_bytes = record.0.as_ref();
1422        if key_bytes.len() == 16 {
1423            if let Ok(bytes) = key_bytes.try_into() {
1424                let id = Scru128Id::from_bytes(bytes);
1425                eprintln!("CORRUPTED_RECORD_ID: {id}");
1426            }
1427        }
1428        let key = std::str::from_utf8(record.0.as_ref()).unwrap();
1429        let value = std::str::from_utf8(record.1.as_ref()).unwrap();
1430        panic!("Failed to deserialize frame: {e} {key} {value}")
1431    })
1432}