Skip to main content

xs/store/
mod.rs

1//! The event stream store.
2//!
3//! A [`Store`] is an append-only log of [`Frame`]s persisted to a directory on
4//! disk. Append events with [`Store::append`], replay and follow them with
5//! [`Store::read`], and stash payload bytes in the content-addressed store with
6//! the `cas_*` methods.
7//!
8//! ## Topics
9//!
10//! Every frame has a dot-delimited `topic` (for example `clip.add`). Topics form
11//! a hierarchy: a reader can ask for an exact topic, a prefix wildcard like
12//! `clip.*`, or `*` for everything. See [`validate_topic`](crate::store::validate_topic)
13//! for the allowed characters and [`ReadOptions::topic`] for querying.
14//!
15//! ## Retention
16//!
17//! Each frame carries a [`TTL`] that controls how long it is kept: forever, for
18//! a fixed duration, only the last N per topic, or ephemeral (broadcast to live
19//! readers but never stored).
20
21mod ttl;
22pub use ttl::*;
23
24#[cfg(test)]
25mod tests;
26
27use std::ops::Bound;
28use std::path::PathBuf;
29use std::time::Duration;
30
31use tokio::sync::broadcast;
32use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
33
34use std::sync::{Arc, Mutex};
35
36use scru128::Scru128Id;
37
38use serde::{Deserialize, Deserializer, Serialize};
39
40use fjall::{
41    config::{BlockSizePolicy, HashRatioPolicy},
42    Database, Error as FjallError, Keyspace, KeyspaceCreateOptions, PersistMode,
43};
44
/// Error returned when opening a [`Store`].
///
/// Produced by [`Store::new`]. The `Display` output is a fixed message for
/// [`Locked`](StoreError::Locked) and the wrapped error's own message for
/// [`Other`](StoreError::Other).
#[derive(Debug)]
pub enum StoreError {
    /// The store directory is already open in another process.
    Locked,
    /// An error from the underlying `fjall` database.
    Other(FjallError),
}
53
54impl std::fmt::Display for StoreError {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        match self {
57            StoreError::Locked => write!(f, "Store is locked by another process"),
58            StoreError::Other(e) => write!(f, "{e}"),
59        }
60    }
61}
62
// Relies on the trait's default methods only: the wrapped `fjall` error is
// surfaced through the `Display` impl rather than through `source()`.
impl std::error::Error for StoreError {}
64
65/// A single event in the stream.
66///
67/// A frame is metadata; the payload bytes (if any) live in the content-addressed
68/// store and are referenced by [`hash`](Frame::hash). Build one with the
69/// [`bon`](https://docs.rs/bon) builder, where the topic is the required
70/// starting argument:
71///
72/// ```
73/// use xs::{Frame, TTL};
74///
75/// let frame = Frame::builder("clip.add")
76///     .meta(serde_json::json!({ "source": "keyboard" }))
77///     .ttl(TTL::Last(100))
78///     .build();
79///
80/// assert_eq!(frame.topic, "clip.add");
81/// ```
82///
83/// The [`id`](Frame::id) is assigned by [`Store::append`] at write time, so the
84/// value you set on a builder is ignored when appending.
// `Debug` is implemented by hand further down so that `id` and `hash` are
// printed in their string form instead of their derived representation.
#[derive(PartialEq, Eq, Serialize, Deserialize, Clone, Default, bon::Builder)]
pub struct Frame {
    /// Dot-delimited topic this frame belongs to (for example `clip.add`).
    ///
    /// Must satisfy [`validate_topic`]; [`Store::append`] rejects invalid topics.
    #[builder(start_fn, into)]
    pub topic: String,
    /// Time-sortable identifier. Assigned by [`Store::append`]; any value set
    /// before appending is overwritten.
    #[builder(default)]
    pub id: Scru128Id,
    /// Integrity hash of the payload in the content-addressed store, if this
    /// frame has one. Produce it with [`Store::cas_insert`].
    pub hash: Option<ssri::Integrity>,
    /// Arbitrary JSON metadata carried inline with the frame.
    pub meta: Option<serde_json::Value>,
    /// Retention policy for this frame. Defaults to [`TTL::Forever`] when unset.
    pub ttl: Option<TTL>,
}
104
105use std::fmt;
106
107impl fmt::Debug for Frame {
108    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
109        f.debug_struct("Frame")
110            .field("id", &format!("{id}", id = self.id))
111            .field("topic", &self.topic)
112            .field("hash", &self.hash.as_ref().map(|x| format!("{x}")))
113            .field("meta", &self.meta)
114            .field("ttl", &self.ttl)
115            .finish()
116    }
117}
118
119impl<'de> Deserialize<'de> for FollowOption {
120    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
121    where
122        D: Deserializer<'de>,
123    {
124        let s: String = Deserialize::deserialize(deserializer)?;
125        if s.is_empty() || s == "yes" {
126            Ok(FollowOption::On)
127        } else if let Ok(duration) = s.parse::<u64>() {
128            Ok(FollowOption::WithHeartbeat(Duration::from_millis(duration)))
129        } else {
130            match s.as_str() {
131                "true" => Ok(FollowOption::On),
132                "false" | "no" => Ok(FollowOption::Off),
133                _ => Err(serde::de::Error::custom("Invalid value for follow option")),
134            }
135        }
136    }
137}
138
139fn deserialize_bool<'de, D>(deserializer: D) -> Result<bool, D::Error>
140where
141    D: Deserializer<'de>,
142{
143    let s: String = Deserialize::deserialize(deserializer)?;
144    match s.as_str() {
145        "false" | "no" | "0" => Ok(false),
146        _ => Ok(true),
147    }
148}
149
150/// Options controlling a [`Store::read`] or [`Store::read_sync`] call.
151///
152/// Defaults replay every stored frame once, oldest first, then stop. Build a
153/// query with the [`bon`](https://docs.rs/bon) builder:
154///
155/// ```
156/// use xs::{ReadOptions, FollowOption};
157///
158/// // Replay history for one topic, then keep streaming new appends.
159/// let opts = ReadOptions::builder()
160///     .topic("clip.*".to_string())
161///     .follow(FollowOption::On)
162///     .build();
163/// ```
164#[derive(PartialEq, Deserialize, Clone, Debug, Default, bon::Builder)]
165pub struct ReadOptions {
166    /// Whether to keep streaming live appends after history is replayed.
167    /// Defaults to [`FollowOption::Off`].
168    #[serde(default)]
169    #[builder(default)]
170    pub follow: FollowOption,
171    /// Skip historical frames and emit only appends made after the read starts.
172    #[serde(default, deserialize_with = "deserialize_bool")]
173    #[builder(default)]
174    pub new: bool,
175    /// Start after this ID (exclusive).
176    #[serde(rename = "after")]
177    pub after: Option<Scru128Id>,
178    /// Start from this ID (inclusive).
179    pub from: Option<Scru128Id>,
180    /// Stop after emitting this many historical frames.
181    pub limit: Option<usize>,
182    /// Return the last N frames (most recent), in chronological order.
183    pub last: Option<usize>,
184    /// Restrict to a topic: an exact name, a `prefix.*` wildcard, or `*` for all.
185    pub topic: Option<String>,
186}
187
188impl ReadOptions {
189    /// Parse options from a URL query string (the form used by the HTTP API),
190    /// for example `follow=true&topic=clip.*&last=10`. `None` yields the
191    /// defaults.
192    pub fn from_query(query: Option<&str>) -> Result<Self, crate::error::Error> {
193        match query {
194            Some(q) => Ok(serde_urlencoded::from_str(q)?),
195            None => Ok(Self::default()),
196        }
197    }
198
199    /// Render these options back into a URL query string, the inverse of
200    /// [`from_query`](ReadOptions::from_query). Returns an empty string when no
201    /// options are set.
202    pub fn to_query_string(&self) -> String {
203        let mut params = Vec::new();
204
205        // Add follow parameter with heartbeat if specified
206        match self.follow {
207            FollowOption::Off => {}
208            FollowOption::On => params.push(("follow", "true".to_string())),
209            FollowOption::WithHeartbeat(duration) => {
210                params.push(("follow", duration.as_millis().to_string()));
211            }
212        }
213
214        // Add new if true
215        if self.new {
216            params.push(("new", "true".to_string()));
217        }
218
219        // Add after if present
220        if let Some(after) = self.after {
221            params.push(("after", after.to_string()));
222        }
223
224        // Add from if present
225        if let Some(from) = self.from {
226            params.push(("from", from.to_string()));
227        }
228
229        // Add limit if present
230        if let Some(limit) = self.limit {
231            params.push(("limit", limit.to_string()));
232        }
233
234        // Add last if present
235        if let Some(last) = self.last {
236            params.push(("last", last.to_string()));
237        }
238
239        if let Some(topic) = &self.topic {
240            params.push(("topic", topic.clone()));
241        }
242
243        // Return empty string if no params
244        if params.is_empty() {
245            String::new()
246        } else {
247            url::form_urlencoded::Serializer::new(String::new())
248                .extend_pairs(params)
249                .finish()
250        }
251    }
252}
253
/// Whether a read keeps streaming after history is replayed.
///
/// In query-string form (see the `Deserialize` impl in this module) the empty
/// string, `yes` and `true` select [`On`](FollowOption::On), `false` and `no`
/// select [`Off`](FollowOption::Off), and an unsigned integer selects
/// [`WithHeartbeat`](FollowOption::WithHeartbeat) with that many milliseconds.
#[derive(Default, PartialEq, Clone, Debug)]
pub enum FollowOption {
    /// Stop once historical frames are exhausted.
    #[default]
    Off,
    /// Replay history, then stream live appends indefinitely.
    On,
    /// Like [`On`](FollowOption::On), but also emit a periodic heartbeat frame
    /// at the given interval so idle readers can detect a live connection.
    /// [`Store::read`] sends these as ephemeral frames on the `xs.pulse` topic.
    WithHeartbeat(Duration),
}
266
/// Work items sent to the background garbage-collection worker over the
/// store's unbounded `gc_tx` channel.
#[derive(Debug)]
enum GCTask {
    /// Delete the frame with this ID. Readers queue this when they come across
    /// a frame whose [`TTL::Time`] has expired.
    Remove(Scru128Id),
    /// NOTE(review): presumably asks the worker to trim `topic` down to its
    /// newest `keep` frames for `TTL::Last`; the sender is not visible in this
    /// part of the file — confirm.
    CheckLastTTL { topic: String, keep: u32 },
    /// Synchronization marker sent by [`Store::wait_for_gc`], which then awaits
    /// the receiving half of the enclosed oneshot channel. Presumably the
    /// worker completes the sender once it reaches this task — confirm against
    /// the worker implementation.
    Drain(tokio::sync::oneshot::Sender<()>),
}
273
274/// An append-only event stream backed by a directory on disk.
275///
276/// Open one with [`new`](Store::new). `Store` is cheaply [`Clone`]able: every
277/// clone shares the same underlying database, broadcast channel, and
278/// content-addressed store, so clone it freely across tasks and threads instead
279/// of wrapping it in an `Arc`.
280///
281/// See the [module docs](crate::store) for the topic and retention model.
#[derive(Clone)]
pub struct Store {
    /// Directory backing this store (the path passed to [`new`](Store::new)).
    pub path: PathBuf,
    // fjall database opened at `<path>/fjall`; owns both keyspaces below.
    db: Database,
    // "stream" keyspace: serialized frames, looked up by frame ID bytes.
    stream: Keyspace,
    // "idx_topic" keyspace: topic index entries used for topic/prefix reads.
    idx_topic: Keyspace,
    // Fan-out channel (capacity 1024) that following readers subscribe to.
    broadcast_tx: broadcast::Sender<Frame>,
    // Queue feeding the background garbage-collection worker.
    gc_tx: UnboundedSender<GCTask>,
    // NOTE(review): presumably serializes appends across clones; its use is
    // not visible in this part of the file — confirm.
    append_lock: Arc<Mutex<()>>,
}
293
294impl Store {
295    /// Open the store at `path`, creating the directory layout if it does not
296    /// exist. Spawns a background worker that garbage-collects expired frames.
297    ///
298    /// # Errors
299    ///
300    /// Returns [`StoreError::Locked`] if another process already holds the
301    /// store open, or [`StoreError::Other`] for any other database error.
302    ///
303    /// ```no_run
304    /// use xs::Store;
305    ///
306    /// let store = Store::new("./clipboard-store".into())?;
307    /// # Ok::<(), xs::StoreError>(())
308    /// ```
309    pub fn new(path: PathBuf) -> Result<Store, StoreError> {
310        let db = match Database::builder(path.join("fjall"))
311            .cache_size(32 * 1024 * 1024) // 32 MiB
312            .worker_threads(1)
313            .open()
314        {
315            Ok(db) => db,
316            Err(FjallError::Locked) => return Err(StoreError::Locked),
317            Err(e) => return Err(StoreError::Other(e)),
318        };
319
320        // Options for stream keyspace: point reads by frame ID
321        let stream_opts = || {
322            KeyspaceCreateOptions::default()
323                .max_memtable_size(8 * 1024 * 1024) // 8 MiB
324                .data_block_size_policy(BlockSizePolicy::all(16 * 1024)) // 16 KiB
325                .data_block_hash_ratio_policy(HashRatioPolicy::all(8.0))
326                .expect_point_read_hits(true)
327        };
328
329        // Options for idx_topic keyspace: prefix scans only
330        let idx_opts = || {
331            KeyspaceCreateOptions::default()
332                .max_memtable_size(8 * 1024 * 1024) // 8 MiB
333                .data_block_size_policy(BlockSizePolicy::all(16 * 1024)) // 16 KiB
334                .data_block_hash_ratio_policy(HashRatioPolicy::all(0.0)) // no point reads
335                .expect_point_read_hits(true)
336        };
337
338        let stream = db.keyspace("stream", stream_opts).unwrap();
339        let idx_topic = db.keyspace("idx_topic", idx_opts).unwrap();
340
341        let (broadcast_tx, _) = broadcast::channel(1024);
342        let (gc_tx, gc_rx) = mpsc::unbounded_channel();
343
344        let store = Store {
345            path: path.clone(),
346            db,
347            stream,
348            idx_topic,
349            broadcast_tx,
350            gc_tx,
351            append_lock: Arc::new(Mutex::new(())),
352        };
353
354        // Spawn gc worker thread
355        spawn_gc_worker(gc_rx, store.clone());
356
357        Ok(store)
358    }
359
360    /// Wait until the background garbage-collection worker has processed every
361    /// task queued so far. Useful in tests to observe TTL eviction
362    /// deterministically.
363    pub async fn wait_for_gc(&self) {
364        let (tx, rx) = tokio::sync::oneshot::channel();
365        let _ = self.gc_tx.send(GCTask::Drain(tx));
366        let _ = rx.await;
367    }
368
369    /// Read frames into an async channel according to `options`.
370    ///
371    /// By default this replays matching historical frames oldest-first and then
372    /// closes the channel. With [`FollowOption::On`] it instead keeps the
373    /// channel open and streams new appends as they arrive. When following, a
374    /// single ephemeral `xs.threshold` frame is emitted to mark the boundary
375    /// between replayed history and live events.
376    ///
377    /// The returned [`Receiver`](tokio::sync::mpsc::Receiver) is bounded;
378    /// dropping it stops the read. For a blocking, non-async caller use
379    /// [`read_sync`](Store::read_sync).
380    ///
381    /// ```no_run
382    /// use xs::{Store, ReadOptions, FollowOption};
383    ///
384    /// # async fn run(store: Store) {
385    /// let mut rx = store
386    ///     .read(ReadOptions::builder().follow(FollowOption::On).build())
387    ///     .await;
388    /// while let Some(frame) = rx.recv().await {
389    ///     if frame.topic == "xs.threshold" {
390    ///         // caught up to live; everything after this is new
391    ///         continue;
392    ///     }
393    ///     println!("{} {}", frame.id, frame.topic);
394    /// }
395    /// # }
396    /// ```
397    #[tracing::instrument(skip(self))]
398    pub async fn read(&self, options: ReadOptions) -> tokio::sync::mpsc::Receiver<Frame> {
399        let (tx, rx) = tokio::sync::mpsc::channel(100);
400
401        let should_follow = matches!(
402            options.follow,
403            FollowOption::On | FollowOption::WithHeartbeat(_)
404        );
405
406        // Only take broadcast subscription if following. We initate the subscription here to
407        // ensure we don't miss any messages between historical processing and starting the
408        // broadcast subscription.
409        let broadcast_rx = if should_follow {
410            Some(self.broadcast_tx.subscribe())
411        } else {
412            None
413        };
414
415        // Only create done channel if we're doing historical processing
416        let done_rx = if !options.new {
417            let (done_tx, done_rx) = tokio::sync::oneshot::channel();
418            let tx_clone = tx.clone();
419            let store = self.clone();
420            let options = options.clone();
421            let should_follow_clone = should_follow;
422            let gc_tx = self.gc_tx.clone();
423
424            // Spawn OS thread to handle historical events
425            std::thread::spawn(move || {
426                let mut last_id = None;
427                let mut count = 0;
428
429                // Handle --last N: get the N most recent frames
430                if let Some(last_n) = options.last {
431                    let iter: Box<dyn Iterator<Item = Frame>> = match options.topic.as_deref() {
432                        None | Some("*") => store.iter_frames_rev(),
433                        Some(topic) if topic.ends_with(".*") => {
434                            let prefix = &topic[..topic.len() - 1];
435                            store.iter_frames_by_topic_prefix_rev(prefix)
436                        }
437                        Some(topic) => store.iter_frames_by_topic_rev(topic),
438                    };
439
440                    // Collect last N frames (in reverse order), skipping expired
441                    let mut frames: Vec<Frame> = Vec::with_capacity(last_n);
442                    for frame in iter {
443                        if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
444                            if is_expired(&frame.id, ttl) {
445                                let _ = gc_tx.send(GCTask::Remove(frame.id));
446                                continue;
447                            }
448                        }
449                        frames.push(frame);
450                        if frames.len() >= last_n {
451                            break;
452                        }
453                    }
454
455                    // Reverse to chronological order and send
456                    for frame in frames.into_iter().rev() {
457                        last_id = Some(frame.id);
458                        count += 1;
459                        if tx_clone.blocking_send(frame).is_err() {
460                            return;
461                        }
462                    }
463                } else {
464                    // Normal forward iteration
465                    // Determine start bound: from (inclusive) takes precedence over after (exclusive)
466                    let start_bound = options
467                        .from
468                        .as_ref()
469                        .map(|id| (id, true))
470                        .or_else(|| options.after.as_ref().map(|id| (id, false)));
471
472                    let iter: Box<dyn Iterator<Item = Frame>> = match options.topic.as_deref() {
473                        None | Some("*") => store.iter_frames(start_bound),
474                        Some(topic) if topic.ends_with(".*") => {
475                            // Wildcard: "user.*" -> prefix "user."
476                            let prefix = &topic[..topic.len() - 1]; // strip "*", keep "."
477                            store.iter_frames_by_topic_prefix(prefix, start_bound)
478                        }
479                        Some(topic) => store.iter_frames_by_topic(topic, start_bound),
480                    };
481
482                    for frame in iter {
483                        if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
484                            if is_expired(&frame.id, ttl) {
485                                let _ = gc_tx.send(GCTask::Remove(frame.id));
486                                continue;
487                            }
488                        }
489
490                        last_id = Some(frame.id);
491
492                        if let Some(limit) = options.limit {
493                            if count >= limit {
494                                return; // Exit early if limit reached
495                            }
496                        }
497
498                        if tx_clone.blocking_send(frame).is_err() {
499                            return;
500                        }
501                        count += 1;
502                    }
503                }
504
505                // Send threshold message if following
506                if should_follow_clone {
507                    let threshold = Frame::builder("xs.threshold")
508                        .id(scru128::new())
509                        .ttl(TTL::Ephemeral)
510                        .build();
511                    if tx_clone.blocking_send(threshold).is_err() {
512                        return;
513                    }
514                }
515
516                // Signal completion with the last seen ID and count
517                let _ = done_tx.send((last_id, count));
518            });
519
520            Some(done_rx)
521        } else {
522            None
523        };
524
525        // Handle broadcast subscription and heartbeat
526        if let Some(broadcast_rx) = broadcast_rx {
527            {
528                let tx = tx.clone();
529                let limit = options.limit;
530
531                tokio::spawn(async move {
532                    // If we have a done_rx, wait for historical processing
533                    let (last_id, mut count) = match done_rx {
534                        Some(done_rx) => match done_rx.await {
535                            Ok((id, count)) => (id, count),
536                            Err(_) => return, // Historical processing failed/cancelled
537                        },
538                        None => (None, 0),
539                    };
540
541                    let mut broadcast_rx = broadcast_rx;
542                    while let Ok(frame) = broadcast_rx.recv().await {
543                        // Filter by topic (exact match or wildcard)
544                        match options.topic.as_deref() {
545                            None | Some("*") => {}
546                            Some(topic) if topic.ends_with(".*") => {
547                                let prefix = &topic[..topic.len() - 1]; // "user.*" -> "user."
548                                if !frame.topic.starts_with(prefix) {
549                                    continue;
550                                }
551                            }
552                            Some(topic) => {
553                                if frame.topic != topic {
554                                    continue;
555                                }
556                            }
557                        }
558
559                        // Skip if we've already seen this frame during historical scan
560                        if let Some(last_scanned_id) = last_id {
561                            if frame.id <= last_scanned_id {
562                                continue;
563                            }
564                        }
565
566                        if tx.send(frame).await.is_err() {
567                            break;
568                        }
569
570                        if let Some(limit) = limit {
571                            count += 1;
572                            if count >= limit {
573                                break;
574                            }
575                        }
576                    }
577                });
578            }
579
580            // Handle heartbeat if requested
581            if let FollowOption::WithHeartbeat(duration) = options.follow {
582                let heartbeat_tx = tx;
583                tokio::spawn(async move {
584                    loop {
585                        tokio::time::sleep(duration).await;
586                        let frame = Frame::builder("xs.pulse")
587                            .id(scru128::new())
588                            .ttl(TTL::Ephemeral)
589                            .build();
590                        if heartbeat_tx.send(frame).await.is_err() {
591                            break;
592                        }
593                    }
594                });
595            }
596        }
597
598        rx
599    }
600
601    /// Replay matching historical frames as a blocking iterator.
602    ///
603    /// This honours the `topic`, `from`, `after`, `limit`, and `last` parts of
604    /// [`ReadOptions`] but ignores [`follow`](ReadOptions::follow): it never
605    /// streams live appends. Use [`read`](Store::read) when you need to follow.
606    ///
607    /// ```no_run
608    /// use xs::{Store, ReadOptions};
609    ///
610    /// # fn run(store: Store) {
611    /// let opts = ReadOptions::builder().topic("clip.*".to_string()).last(10).build();
612    /// for frame in store.read_sync(opts) {
613    ///     println!("{} {}", frame.id, frame.topic);
614    /// }
615    /// # }
616    /// ```
617    pub fn read_sync(&self, options: ReadOptions) -> impl Iterator<Item = Frame> + '_ {
618        let gc_tx = self.gc_tx.clone();
619
620        // Filter out expired frames
621        let filter_expired = move |frame: Frame, gc_tx: &UnboundedSender<GCTask>| {
622            if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
623                if is_expired(&frame.id, ttl) {
624                    let _ = gc_tx.send(GCTask::Remove(frame.id));
625                    return None;
626                }
627            }
628            Some(frame)
629        };
630
631        let frames: Vec<Frame> = if let Some(last_n) = options.last {
632            // Handle --last N: get the N most recent frames
633            let iter: Box<dyn Iterator<Item = Frame>> = match options.topic.as_deref() {
634                None | Some("*") => self.iter_frames_rev(),
635                Some(topic) if topic.ends_with(".*") => {
636                    let prefix = &topic[..topic.len() - 1];
637                    self.iter_frames_by_topic_prefix_rev(prefix)
638                }
639                Some(topic) => self.iter_frames_by_topic_rev(topic),
640            };
641
642            // Collect last N frames (in reverse order), skipping expired
643            let mut frames: Vec<Frame> = Vec::with_capacity(last_n);
644            for frame in iter {
645                if let Some(frame) = filter_expired(frame, &gc_tx) {
646                    frames.push(frame);
647                    if frames.len() >= last_n {
648                        break;
649                    }
650                }
651            }
652
653            // Reverse to chronological order
654            frames.reverse();
655            frames
656        } else {
657            // Normal forward iteration
658            let start_bound = options
659                .from
660                .as_ref()
661                .map(|id| (id, true))
662                .or_else(|| options.after.as_ref().map(|id| (id, false)));
663
664            let iter: Box<dyn Iterator<Item = Frame>> = match options.topic.as_deref() {
665                None | Some("*") => self.iter_frames(start_bound),
666                Some(topic) if topic.ends_with(".*") => {
667                    let prefix = &topic[..topic.len() - 1];
668                    self.iter_frames_by_topic_prefix(prefix, start_bound)
669                }
670                Some(topic) => self.iter_frames_by_topic(topic, start_bound),
671            };
672
673            iter.filter_map(|frame| filter_expired(frame, &gc_tx))
674                .take(options.limit.unwrap_or(usize::MAX))
675                .collect()
676        };
677
678        frames.into_iter()
679    }
680
681    /// Returns the current module state as of a given point in the stream.
682    ///
683    /// Scans all frames up to (and including) `as_of` and returns a mapping of
684    /// module name to CAS hash for the latest frame on each `xs.module.<name>`
685    /// topic.
686    /// Resolve the set of registered Nushell modules as of a given frame ID.
687    ///
688    /// Scans `xs.module.<name>` frames up to and including `as_of` and returns a
689    /// map from module name to the content hash of its latest definition. Used
690    /// by the scripting runtime; rarely needed when embedding the store
691    /// directly.
692    pub fn nu_modules_at(
693        &self,
694        as_of: &Scru128Id,
695    ) -> std::collections::HashMap<String, ssri::Integrity> {
696        let mut modules = std::collections::HashMap::new();
697        let options = ReadOptions::builder().follow(FollowOption::Off).build();
698        for frame in self.read_sync(options) {
699            if frame.id > *as_of {
700                break;
701            }
702            if let Some(hash) = frame.hash {
703                if let Some(name) = frame.topic.strip_prefix("xs.module.") {
704                    if !name.is_empty() {
705                        modules.insert(name.to_string(), hash);
706                    }
707                }
708            }
709        }
710        modules
711    }
712
713    /// Fetch a single frame by ID, or `None` if no such frame exists.
714    pub fn get(&self, id: &Scru128Id) -> Option<Frame> {
715        self.stream
716            .get(id.to_bytes())
717            .unwrap()
718            .map(|value| deserialize_frame((id.as_bytes(), value)))
719    }
720
721    /// Delete a frame and its topic index entries. Removing a frame that does
722    /// not exist is a no-op and returns `Ok(())`.
723    ///
724    /// This removes the stream entry only; any payload bytes in the
725    /// content-addressed store are left in place.
726    #[tracing::instrument(skip(self), fields(id = %id.to_string()))]
727    pub fn remove(&self, id: &Scru128Id) -> Result<(), crate::error::Error> {
728        let Some(frame) = self.get(id) else {
729            // Already deleted
730            return Ok(());
731        };
732
733        // Build topic key directly (no validation - frame already exists)
734        let mut topic_key = idx_topic_key_prefix(&frame.topic);
735        topic_key.extend(frame.id.as_bytes());
736
737        // Get prefix index keys for hierarchical queries
738        let prefix_keys = idx_topic_prefix_keys(&frame.topic, &frame.id);
739
740        let mut batch = self.db.batch();
741        batch.remove(&self.stream, id.as_bytes());
742        batch.remove(&self.idx_topic, topic_key);
743        for prefix_key in &prefix_keys {
744            batch.remove(&self.idx_topic, prefix_key);
745        }
746        batch.commit()?;
747        self.db.persist(PersistMode::SyncAll)?;
748        Ok(())
749    }
750
751    // --- Content-addressed store (CAS) ---
752    //
753    // Frame payloads live here, keyed by an integrity hash. The typical flow is
754    // `cas_insert` to store bytes, stash the returned hash on a `Frame`, then
755    // `cas_read` to retrieve them later. Each method has a `_sync` twin for
756    // blocking callers; the streaming `cas_reader`/`cas_writer` variants avoid
757    // buffering the whole payload in memory.
758
759    /// Open a streaming reader for the payload identified by `hash`.
760    pub async fn cas_reader(&self, hash: ssri::Integrity) -> cacache::Result<cacache::Reader> {
761        cacache::Reader::open_hash(&self.path.join("cacache"), hash).await
762    }
763
764    /// Blocking variant of [`cas_reader`](Store::cas_reader).
765    pub fn cas_reader_sync(&self, hash: ssri::Integrity) -> cacache::Result<cacache::SyncReader> {
766        cacache::SyncReader::open_hash(self.path.join("cacache"), hash)
767    }
768
769    /// Open a streaming writer; finish it to obtain the payload's integrity hash.
770    pub async fn cas_writer(&self) -> cacache::Result<cacache::Writer> {
771        cacache::WriteOpts::new()
772            .open_hash(&self.path.join("cacache"))
773            .await
774    }
775
776    /// Blocking variant of [`cas_writer`](Store::cas_writer).
777    pub fn cas_writer_sync(&self) -> cacache::Result<cacache::SyncWriter> {
778        cacache::WriteOpts::new().open_hash_sync(self.path.join("cacache"))
779    }
780
781    /// Store `content` and return its integrity hash, ready to attach to a
782    /// [`Frame::hash`].
783    pub async fn cas_insert(&self, content: impl AsRef<[u8]>) -> cacache::Result<ssri::Integrity> {
784        cacache::write_hash(&self.path.join("cacache"), content).await
785    }
786
787    /// Blocking variant of [`cas_insert`](Store::cas_insert).
788    pub fn cas_insert_sync(&self, content: impl AsRef<[u8]>) -> cacache::Result<ssri::Integrity> {
789        cacache::write_hash_sync(self.path.join("cacache"), content)
790    }
791
792    /// Convenience wrapper over [`cas_insert`](Store::cas_insert) for a byte slice.
793    pub async fn cas_insert_bytes(&self, bytes: &[u8]) -> cacache::Result<ssri::Integrity> {
794        self.cas_insert(bytes).await
795    }
796
797    /// Blocking variant of [`cas_insert_bytes`](Store::cas_insert_bytes).
798    pub fn cas_insert_bytes_sync(&self, bytes: &[u8]) -> cacache::Result<ssri::Integrity> {
799        self.cas_insert_sync(bytes)
800    }
801
802    /// Read back the full payload for `hash` into a `Vec<u8>`.
803    pub async fn cas_read(&self, hash: &ssri::Integrity) -> cacache::Result<Vec<u8>> {
804        cacache::read_hash(&self.path.join("cacache"), hash).await
805    }
806
807    /// Blocking variant of [`cas_read`](Store::cas_read).
808    pub fn cas_read_sync(&self, hash: &ssri::Integrity) -> cacache::Result<Vec<u8>> {
809        cacache::read_hash_sync(self.path.join("cacache"), hash)
810    }
811
812    /// Persist a frame exactly as given, including its existing
813    /// [`id`](Frame::id), without broadcasting it to live readers or scheduling
814    /// TTL garbage collection.
815    ///
816    /// Most callers want [`append`](Store::append) instead, which assigns a
817    /// fresh ID, handles ephemeral and `Last` retention, and notifies
818    /// subscribers. Use `insert_frame` only when you are reconstructing a stream
819    /// with predetermined IDs (for example when restoring a backup).
820    #[tracing::instrument(skip(self))]
821    pub fn insert_frame(&self, frame: &Frame) -> Result<(), crate::error::Error> {
822        let encoded: Vec<u8> = serde_json::to_vec(&frame).unwrap();
823
824        // Get the index topic key (also validates topic)
825        let topic_key = idx_topic_key_from_frame(frame)?;
826
827        // Get prefix index keys for hierarchical queries
828        let prefix_keys = idx_topic_prefix_keys(&frame.topic, &frame.id);
829
830        let mut batch = self.db.batch();
831        batch.insert(&self.stream, frame.id.as_bytes(), encoded);
832        batch.insert(&self.idx_topic, topic_key, b"");
833        for prefix_key in &prefix_keys {
834            batch.insert(&self.idx_topic, prefix_key, b"");
835        }
836        batch.commit()?;
837        self.db.persist(PersistMode::SyncAll)?;
838        Ok(())
839    }
840
841    /// Append a frame to the stream and return it with its freshly assigned
842    /// [`id`](Frame::id).
843    ///
844    /// This is the primary write path. It:
845    ///
846    /// - assigns a new time-sortable ID (overwriting any ID on the input);
847    /// - validates the topic (see [`validate_topic`]);
848    /// - persists the frame, unless its [`TTL`] is [`TTL::Ephemeral`], in which
849    ///   case it is only broadcast to live readers;
850    /// - schedules garbage collection for [`TTL::Last`] retention;
851    /// - broadcasts the frame to everyone currently in a following
852    ///   [`read`](Store::read).
853    ///
854    /// Appends are serialized internally, so frames are assigned IDs and
855    /// delivered to subscribers in a consistent order.
856    ///
857    /// # Errors
858    ///
859    /// Returns an error if the topic is invalid or the underlying write fails.
860    ///
861    /// ```no_run
862    /// use xs::{Store, Frame, TTL};
863    ///
864    /// # async fn run(store: Store) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
865    /// let hash = store.cas_insert("hello clipboard").await?;
866    /// let frame = store.append(
867    ///     Frame::builder("clip.add").hash(hash).ttl(TTL::Last(100)).build(),
868    /// )?;
869    /// println!("appended {}", frame.id);
870    /// # Ok(())
871    /// # }
872    /// ```
873    pub fn append(&self, mut frame: Frame) -> Result<Frame, crate::error::Error> {
874        // Serialize all appends to ensure ID generation, write, and broadcast
875        // happen atomically. This guarantees subscribers receive frames in
876        // scru128 ID order.
877        let _guard = self.append_lock.lock().unwrap();
878
879        frame.id = scru128::new();
880
881        // Check for null byte in topic (in case we're not storing the frame)
882        idx_topic_key_from_frame(&frame)?;
883
884        // only store the frame if it's not ephemeral
885        if frame.ttl != Some(TTL::Ephemeral) {
886            self.insert_frame(&frame)?;
887
888            // If this is a Last TTL, schedule a gc task
889            if let Some(TTL::Last(n)) = frame.ttl {
890                let _ = self.gc_tx.send(GCTask::CheckLastTTL {
891                    topic: frame.topic.clone(),
892                    keep: n,
893                });
894            }
895        }
896
897        let _ = self.broadcast_tx.send(frame.clone());
898        Ok(frame)
899    }
900
901    /// Iterate frames starting from a bound.
902    /// `start` is `(id, inclusive)` where inclusive=true means >= and inclusive=false means >.
903    fn iter_frames(
904        &self,
905        start: Option<(&Scru128Id, bool)>,
906    ) -> Box<dyn Iterator<Item = Frame> + '_> {
907        let range = match start {
908            Some((id, true)) => (Bound::Included(id.as_bytes().to_vec()), Bound::Unbounded),
909            Some((id, false)) => (Bound::Excluded(id.as_bytes().to_vec()), Bound::Unbounded),
910            None => (Bound::Unbounded, Bound::Unbounded),
911        };
912
913        Box::new(self.stream.range(range).filter_map(|guard| {
914            let (key, value) = guard.into_inner().ok()?;
915            Some(deserialize_frame((key, value)))
916        }))
917    }
918
919    /// Iterate frames in reverse order (most recent first).
920    fn iter_frames_rev(&self) -> Box<dyn Iterator<Item = Frame> + '_> {
921        Box::new(self.stream.iter().rev().filter_map(|guard| {
922            let (key, value) = guard.into_inner().ok()?;
923            Some(deserialize_frame((key, value)))
924        }))
925    }
926
927    /// Iterate frames by topic in reverse order (most recent first).
928    fn iter_frames_by_topic_rev<'a>(
929        &'a self,
930        topic: &'a str,
931    ) -> Box<dyn Iterator<Item = Frame> + 'a> {
932        let prefix = idx_topic_key_prefix(topic);
933        Box::new(
934            self.idx_topic
935                .prefix(prefix)
936                .rev()
937                .filter_map(move |guard| {
938                    let key = guard.key().ok()?;
939                    let frame_id = idx_topic_frame_id_from_key(&key);
940                    self.get(&frame_id)
941                }),
942        )
943    }
944
945    /// Iterate frames by topic prefix in reverse order (most recent first).
946    fn iter_frames_by_topic_prefix_rev<'a>(
947        &'a self,
948        prefix: &'a str,
949    ) -> Box<dyn Iterator<Item = Frame> + 'a> {
950        let mut index_prefix = Vec::with_capacity(prefix.len() + 1);
951        index_prefix.extend(prefix.as_bytes());
952        index_prefix.push(NULL_DELIMITER);
953
954        Box::new(
955            self.idx_topic
956                .prefix(index_prefix)
957                .rev()
958                .filter_map(move |guard| {
959                    let key = guard.key().ok()?;
960                    let frame_id = idx_topic_frame_id_from_key(&key);
961                    self.get(&frame_id)
962                }),
963        )
964    }
965
966    fn iter_frames_by_topic<'a>(
967        &'a self,
968        topic: &'a str,
969        start: Option<(&'a Scru128Id, bool)>,
970    ) -> Box<dyn Iterator<Item = Frame> + 'a> {
971        let prefix = idx_topic_key_prefix(topic);
972        Box::new(self.idx_topic.prefix(prefix).filter_map(move |guard| {
973            let key = guard.key().ok()?;
974            let frame_id = idx_topic_frame_id_from_key(&key);
975            if let Some((bound_id, inclusive)) = start {
976                if inclusive {
977                    if frame_id < *bound_id {
978                        return None;
979                    }
980                } else if frame_id <= *bound_id {
981                    return None;
982                }
983            }
984            self.get(&frame_id)
985        }))
986    }
987
988    /// Iterate frames matching a topic prefix (for wildcard queries like "user.*").
989    /// The prefix should include the trailing dot (e.g., "user." for "user.*").
990    fn iter_frames_by_topic_prefix<'a>(
991        &'a self,
992        prefix: &'a str,
993        start: Option<(&'a Scru128Id, bool)>,
994    ) -> Box<dyn Iterator<Item = Frame> + 'a> {
995        // Build index prefix: "user.\0" for scanning all "user.*" entries
996        let mut index_prefix = Vec::with_capacity(prefix.len() + 1);
997        index_prefix.extend(prefix.as_bytes());
998        index_prefix.push(NULL_DELIMITER);
999
1000        Box::new(
1001            self.idx_topic
1002                .prefix(index_prefix)
1003                .filter_map(move |guard| {
1004                    let key = guard.key().ok()?;
1005                    let frame_id = idx_topic_frame_id_from_key(&key);
1006                    if let Some((bound_id, inclusive)) = start {
1007                        if inclusive {
1008                            if frame_id < *bound_id {
1009                                return None;
1010                            }
1011                        } else if frame_id <= *bound_id {
1012                            return None;
1013                        }
1014                    }
1015                    self.get(&frame_id)
1016                }),
1017        )
1018    }
1019}
1020
1021fn spawn_gc_worker(mut gc_rx: UnboundedReceiver<GCTask>, store: Store) {
1022    std::thread::spawn(move || {
1023        while let Some(task) = gc_rx.blocking_recv() {
1024            match task {
1025                GCTask::Remove(id) => {
1026                    let _ = store.remove(&id);
1027                }
1028
1029                GCTask::CheckLastTTL { topic, keep } => {
1030                    let prefix = idx_topic_key_prefix(&topic);
1031                    let frames_to_remove: Vec<_> = store
1032                        .idx_topic
1033                        .prefix(&prefix)
1034                        .rev() // Scan from newest to oldest
1035                        .skip(keep as usize)
1036                        .filter_map(|guard| {
1037                            let key = guard.key().ok()?;
1038                            Some(Scru128Id::from_bytes(
1039                                idx_topic_frame_id_from_key(&key).into(),
1040                            ))
1041                        })
1042                        .collect();
1043
1044                    for frame_id in frames_to_remove {
1045                        let _ = store.remove(&frame_id);
1046                    }
1047                }
1048
1049                GCTask::Drain(tx) => {
1050                    let _ = tx.send(());
1051                }
1052            }
1053        }
1054    });
1055}
1056
1057fn is_expired(id: &Scru128Id, ttl: &Duration) -> bool {
1058    let created_ms = id.timestamp();
1059    let expires_ms = created_ms.saturating_add(ttl.as_millis() as u64);
1060    let now_ms = std::time::SystemTime::now()
1061        .duration_since(std::time::UNIX_EPOCH)
1062        .unwrap()
1063        .as_millis() as u64;
1064
1065    now_ms >= expires_ms
1066}
1067
1068const NULL_DELIMITER: u8 = 0;
1069const MAX_TOPIC_LENGTH: usize = 255;
1070
1071/// Validate a frame topic (per ADR 0001).
1072///
1073/// A topic must be non-empty and at most 255 bytes, start with an ASCII letter
1074/// or `_`, and contain only `a-z A-Z 0-9 _ - .`. It may not end with `.` or
1075/// contain consecutive dots. [`Store::append`] runs this automatically; call it
1076/// directly to validate user input before building a [`Frame`].
1077///
1078/// ```
1079/// use xs::store::validate_topic;
1080///
1081/// assert!(validate_topic("clip.add").is_ok());
1082/// assert!(validate_topic("clip.").is_err());
1083/// ```
1084pub fn validate_topic(topic: &str) -> Result<(), crate::error::Error> {
1085    if topic.is_empty() {
1086        return Err("Topic cannot be empty".to_string().into());
1087    }
1088    if topic.len() > MAX_TOPIC_LENGTH {
1089        return Err(format!("Topic exceeds max length of {MAX_TOPIC_LENGTH} bytes").into());
1090    }
1091    if topic.ends_with('.') {
1092        return Err("Topic cannot end with '.'".to_string().into());
1093    }
1094    if topic.contains("..") {
1095        return Err("Topic cannot contain consecutive dots".to_string().into());
1096    }
1097
1098    let bytes = topic.as_bytes();
1099    let first = bytes[0];
1100    if !first.is_ascii_alphabetic() && first != b'_' {
1101        return Err("Topic must start with a-z, A-Z, or _".to_string().into());
1102    }
1103
1104    for &b in bytes {
1105        if !b.is_ascii_alphanumeric() && b != b'_' && b != b'-' && b != b'.' {
1106            return Err(format!(
1107                "Topic contains invalid character: '{}'. Allowed: a-z A-Z 0-9 _ - .",
1108                b as char
1109            )
1110            .into());
1111        }
1112    }
1113
1114    Ok(())
1115}
1116
1117/// Validates a topic query (for --topic flag).
1118/// Allows wildcards: "*" (match all) or "prefix.*" (match children).
1119pub fn validate_topic_query(topic: &str) -> Result<(), crate::error::Error> {
1120    if topic == "*" {
1121        return Ok(());
1122    }
1123    if let Some(prefix) = topic.strip_suffix(".*") {
1124        // Validate the prefix part (e.g., "user" in "user.*")
1125        // Prefix can be empty edge case: ".*" is not valid
1126        if prefix.is_empty() {
1127            return Err("Wildcard '.*' requires a prefix".to_string().into());
1128        }
1129        validate_topic(prefix)
1130    } else {
1131        validate_topic(topic)
1132    }
1133}
1134
1135/// Generate prefix index keys for hierarchical topic queries.
1136/// For topic "user.id1.messages", returns keys for prefixes "user." and "user.id1."
1137fn idx_topic_prefix_keys(topic: &str, frame_id: &scru128::Scru128Id) -> Vec<Vec<u8>> {
1138    let mut keys = Vec::new();
1139    let mut pos = 0;
1140    while let Some(dot_pos) = topic[pos..].find('.') {
1141        let prefix = &topic[..pos + dot_pos + 1]; // include the dot
1142        let mut key = Vec::with_capacity(prefix.len() + 1 + 16);
1143        key.extend(prefix.as_bytes());
1144        key.push(NULL_DELIMITER);
1145        key.extend(frame_id.as_bytes());
1146        keys.push(key);
1147        pos += dot_pos + 1;
1148    }
1149    keys
1150}
1151
1152fn idx_topic_key_prefix(topic: &str) -> Vec<u8> {
1153    let mut v = Vec::with_capacity(topic.len() + 1); // topic bytes + delimiter
1154    v.extend(topic.as_bytes()); // topic string as UTF-8 bytes
1155    v.push(NULL_DELIMITER); // Delimiter for variable-sized keys
1156    v
1157}
1158
1159pub(crate) fn idx_topic_key_from_frame(frame: &Frame) -> Result<Vec<u8>, crate::error::Error> {
1160    validate_topic(&frame.topic)?;
1161    let mut v = idx_topic_key_prefix(&frame.topic);
1162    v.extend(frame.id.as_bytes());
1163    Ok(v)
1164}
1165
1166fn idx_topic_frame_id_from_key(key: &[u8]) -> Scru128Id {
1167    let frame_id_bytes = &key[key.len() - 16..];
1168    Scru128Id::from_bytes(frame_id_bytes.try_into().unwrap())
1169}
1170
1171fn deserialize_frame<B1: AsRef<[u8]> + std::fmt::Debug, B2: AsRef<[u8]>>(
1172    record: (B1, B2),
1173) -> Frame {
1174    serde_json::from_slice(record.1.as_ref()).unwrap_or_else(|e| {
1175        // Try to convert the key to a Scru128Id and print in a format that can be copied for deletion
1176        let key_bytes = record.0.as_ref();
1177        if key_bytes.len() == 16 {
1178            if let Ok(bytes) = key_bytes.try_into() {
1179                let id = Scru128Id::from_bytes(bytes);
1180                eprintln!("CORRUPTED_RECORD_ID: {id}");
1181            }
1182        }
1183        let key = std::str::from_utf8(record.0.as_ref()).unwrap();
1184        let value = std::str::from_utf8(record.1.as_ref()).unwrap();
1185        panic!("Failed to deserialize frame: {e} {key} {value}")
1186    })
1187}