xs/store/mod.rs
1//! The event stream store.
2//!
3//! A [`Store`] is an append-only log of [`Frame`]s persisted to a directory on
4//! disk. Append events with [`Store::append`], replay and follow them with
5//! [`Store::read`], and stash payload bytes in the content-addressed store with
6//! the `cas_*` methods.
7//!
8//! ## Topics
9//!
10//! Every frame has a dot-delimited `topic` (for example `clip.add`). Topics form
11//! a hierarchy: a reader can ask for an exact topic, a prefix wildcard like
12//! `clip.*`, or `*` for everything. See [`validate_topic`](crate::store::validate_topic)
13//! for the allowed characters and [`ReadOptions::topic`] for querying.
14//!
15//! ## Retention
16//!
17//! Each frame carries a [`TTL`] that controls how long it is kept: forever, for
18//! a fixed duration, only the last N per topic, or ephemeral (broadcast to live
19//! readers but never stored).
20
21mod ttl;
22pub use ttl::*;
23
24#[cfg(test)]
25mod tests;
26
27use std::ops::Bound;
28use std::path::PathBuf;
29use std::time::Duration;
30
31use tokio::sync::broadcast;
32use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
33
34use std::sync::{Arc, Mutex};
35
36use scru128::Scru128Id;
37
38use serde::{Deserialize, Deserializer, Serialize};
39
40use fjall::{
41 config::{BlockSizePolicy, HashRatioPolicy},
42 Database, Error as FjallError, Keyspace, KeyspaceCreateOptions, PersistMode,
43};
44
45/// Error returned when opening a [`Store`].
46#[derive(Debug)]
47pub enum StoreError {
48 /// The store directory is already open in another process.
49 Locked,
50 /// An error from the underlying `fjall` database.
51 Other(FjallError),
52}
53
54impl std::fmt::Display for StoreError {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 match self {
57 StoreError::Locked => write!(f, "Store is locked by another process"),
58 StoreError::Other(e) => write!(f, "{e}"),
59 }
60 }
61}
62
63impl std::error::Error for StoreError {}
64
65/// A single event in the stream.
66///
67/// A frame is metadata; the payload bytes (if any) live in the content-addressed
68/// store and are referenced by [`hash`](Frame::hash). Build one with the
69/// [`bon`](https://docs.rs/bon) builder, where the topic is the required
70/// starting argument:
71///
72/// ```
73/// use xs::{Frame, TTL};
74///
75/// let frame = Frame::builder("clip.add")
76/// .meta(serde_json::json!({ "source": "keyboard" }))
77/// .ttl(TTL::Last(100))
78/// .build();
79///
80/// assert_eq!(frame.topic, "clip.add");
81/// ```
82///
83/// The [`id`](Frame::id) is assigned by [`Store::append`] at write time, so the
84/// value you set on a builder is ignored when appending.
85#[derive(PartialEq, Eq, Serialize, Deserialize, Clone, Default, bon::Builder)]
86pub struct Frame {
87 /// Dot-delimited topic this frame belongs to (for example `clip.add`).
88 ///
89 /// Must satisfy [`validate_topic`]; [`Store::append`] rejects invalid topics.
90 #[builder(start_fn, into)]
91 pub topic: String,
92 /// Time-sortable identifier. Assigned by [`Store::append`]; any value set
93 /// before appending is overwritten.
94 #[builder(default)]
95 pub id: Scru128Id,
96 /// Integrity hash of the payload in the content-addressed store, if this
97 /// frame has one. Produce it with [`Store::cas_insert`].
98 pub hash: Option<ssri::Integrity>,
99 /// Arbitrary JSON metadata carried inline with the frame.
100 pub meta: Option<serde_json::Value>,
101 /// Retention policy for this frame. Defaults to [`TTL::Forever`] when unset.
102 pub ttl: Option<TTL>,
103}
104
105use std::fmt;
106
107impl fmt::Debug for Frame {
108 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
109 f.debug_struct("Frame")
110 .field("id", &format!("{id}", id = self.id))
111 .field("topic", &self.topic)
112 .field("hash", &self.hash.as_ref().map(|x| format!("{x}")))
113 .field("meta", &self.meta)
114 .field("ttl", &self.ttl)
115 .finish()
116 }
117}
118
119impl<'de> Deserialize<'de> for FollowOption {
120 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
121 where
122 D: Deserializer<'de>,
123 {
124 let s: String = Deserialize::deserialize(deserializer)?;
125 if s.is_empty() || s == "yes" {
126 Ok(FollowOption::On)
127 } else if let Ok(duration) = s.parse::<u64>() {
128 Ok(FollowOption::WithHeartbeat(Duration::from_millis(duration)))
129 } else {
130 match s.as_str() {
131 "true" => Ok(FollowOption::On),
132 "false" | "no" => Ok(FollowOption::Off),
133 _ => Err(serde::de::Error::custom("Invalid value for follow option")),
134 }
135 }
136 }
137}
138
139fn deserialize_bool<'de, D>(deserializer: D) -> Result<bool, D::Error>
140where
141 D: Deserializer<'de>,
142{
143 let s: String = Deserialize::deserialize(deserializer)?;
144 match s.as_str() {
145 "false" | "no" | "0" => Ok(false),
146 _ => Ok(true),
147 }
148}
149
150/// Options controlling a [`Store::read`] or [`Store::read_sync`] call.
151///
152/// Defaults replay every stored frame once, oldest first, then stop. Build a
153/// query with the [`bon`](https://docs.rs/bon) builder:
154///
155/// ```
156/// use xs::{ReadOptions, FollowOption};
157///
158/// // Replay history for one topic, then keep streaming new appends.
159/// let opts = ReadOptions::builder()
160/// .topic("clip.*".to_string())
161/// .follow(FollowOption::On)
162/// .build();
163/// ```
164#[derive(PartialEq, Deserialize, Clone, Debug, Default, bon::Builder)]
165pub struct ReadOptions {
166 /// Whether to keep streaming live appends after history is replayed.
167 /// Defaults to [`FollowOption::Off`].
168 #[serde(default)]
169 #[builder(default)]
170 pub follow: FollowOption,
171 /// Skip historical frames and emit only appends made after the read starts.
172 #[serde(default, deserialize_with = "deserialize_bool")]
173 #[builder(default)]
174 pub new: bool,
175 /// Start after this ID (exclusive).
176 #[serde(rename = "after")]
177 pub after: Option<Scru128Id>,
178 /// Start from this ID (inclusive).
179 pub from: Option<Scru128Id>,
180 /// Stop after emitting this many historical frames.
181 pub limit: Option<usize>,
182 /// Return the last N frames (most recent), in chronological order.
183 pub last: Option<usize>,
184 /// Restrict to a topic: an exact name, a `prefix.*` wildcard, or `*` for all.
185 pub topic: Option<String>,
186}
187
188impl ReadOptions {
189 /// Parse options from a URL query string (the form used by the HTTP API),
190 /// for example `follow=true&topic=clip.*&last=10`. `None` yields the
191 /// defaults.
192 pub fn from_query(query: Option<&str>) -> Result<Self, crate::error::Error> {
193 match query {
194 Some(q) => Ok(serde_urlencoded::from_str(q)?),
195 None => Ok(Self::default()),
196 }
197 }
198
199 /// Render these options back into a URL query string, the inverse of
200 /// [`from_query`](ReadOptions::from_query). Returns an empty string when no
201 /// options are set.
202 pub fn to_query_string(&self) -> String {
203 let mut params = Vec::new();
204
205 // Add follow parameter with heartbeat if specified
206 match self.follow {
207 FollowOption::Off => {}
208 FollowOption::On => params.push(("follow", "true".to_string())),
209 FollowOption::WithHeartbeat(duration) => {
210 params.push(("follow", duration.as_millis().to_string()));
211 }
212 }
213
214 // Add new if true
215 if self.new {
216 params.push(("new", "true".to_string()));
217 }
218
219 // Add after if present
220 if let Some(after) = self.after {
221 params.push(("after", after.to_string()));
222 }
223
224 // Add from if present
225 if let Some(from) = self.from {
226 params.push(("from", from.to_string()));
227 }
228
229 // Add limit if present
230 if let Some(limit) = self.limit {
231 params.push(("limit", limit.to_string()));
232 }
233
234 // Add last if present
235 if let Some(last) = self.last {
236 params.push(("last", last.to_string()));
237 }
238
239 if let Some(topic) = &self.topic {
240 params.push(("topic", topic.clone()));
241 }
242
243 // Return empty string if no params
244 if params.is_empty() {
245 String::new()
246 } else {
247 url::form_urlencoded::Serializer::new(String::new())
248 .extend_pairs(params)
249 .finish()
250 }
251 }
252}
253
/// Controls whether a read keeps going once history is exhausted.
///
/// In query strings this is the `follow` parameter. `true`, `yes` or an empty
/// value turn following on, and `false` or `no` turn it off. An unsigned
/// integer is taken as a heartbeat interval in milliseconds.
#[derive(Default, PartialEq, Clone, Debug)]
pub enum FollowOption {
    /// Emit historical frames, then end the stream.
    #[default]
    Off,
    /// Emit historical frames, then keep delivering new appends indefinitely.
    On,
    /// Same as [`On`](FollowOption::On), but also inject an ephemeral
    /// `xs.pulse` frame every interval so idle readers can tell the
    /// connection is still alive.
    WithHeartbeat(Duration),
}
266
267#[derive(Debug)]
268enum GCTask {
269 Remove(Scru128Id),
270 CheckLastTTL { topic: String, keep: u32 },
271 Drain(tokio::sync::oneshot::Sender<()>),
272}
273
274/// An append-only event stream backed by a directory on disk.
275///
276/// Open one with [`new`](Store::new). `Store` is cheaply [`Clone`]able: every
277/// clone shares the same underlying database, broadcast channel, and
278/// content-addressed store, so clone it freely across tasks and threads instead
279/// of wrapping it in an `Arc`.
280///
281/// See the [module docs](crate::store) for the topic and retention model.
282#[derive(Clone)]
283pub struct Store {
284 /// Directory backing this store (the path passed to [`new`](Store::new)).
285 pub path: PathBuf,
286 db: Database,
287 stream: Keyspace,
288 idx_topic: Keyspace,
289 broadcast_tx: broadcast::Sender<Frame>,
290 gc_tx: UnboundedSender<GCTask>,
291 append_lock: Arc<Mutex<()>>,
292}
293
294impl Store {
295 /// Open the store at `path`, creating the directory layout if it does not
296 /// exist. Spawns a background worker that garbage-collects expired frames.
297 ///
298 /// # Errors
299 ///
300 /// Returns [`StoreError::Locked`] if another process already holds the
301 /// store open, or [`StoreError::Other`] for any other database error.
302 ///
303 /// ```no_run
304 /// use xs::Store;
305 ///
306 /// let store = Store::new("./clipboard-store".into())?;
307 /// # Ok::<(), xs::StoreError>(())
308 /// ```
309 pub fn new(path: PathBuf) -> Result<Store, StoreError> {
310 let db = match Database::builder(path.join("fjall"))
311 .cache_size(32 * 1024 * 1024) // 32 MiB
312 .worker_threads(1)
313 .open()
314 {
315 Ok(db) => db,
316 Err(FjallError::Locked) => return Err(StoreError::Locked),
317 Err(e) => return Err(StoreError::Other(e)),
318 };
319
320 // Options for stream keyspace: point reads by frame ID
321 let stream_opts = || {
322 KeyspaceCreateOptions::default()
323 .max_memtable_size(8 * 1024 * 1024) // 8 MiB
324 .data_block_size_policy(BlockSizePolicy::all(16 * 1024)) // 16 KiB
325 .data_block_hash_ratio_policy(HashRatioPolicy::all(8.0))
326 .expect_point_read_hits(true)
327 };
328
329 // Options for idx_topic keyspace: prefix scans only
330 let idx_opts = || {
331 KeyspaceCreateOptions::default()
332 .max_memtable_size(8 * 1024 * 1024) // 8 MiB
333 .data_block_size_policy(BlockSizePolicy::all(16 * 1024)) // 16 KiB
334 .data_block_hash_ratio_policy(HashRatioPolicy::all(0.0)) // no point reads
335 .expect_point_read_hits(true)
336 };
337
338 let stream = db.keyspace("stream", stream_opts).unwrap();
339 let idx_topic = db.keyspace("idx_topic", idx_opts).unwrap();
340
341 let (broadcast_tx, _) = broadcast::channel(1024);
342 let (gc_tx, gc_rx) = mpsc::unbounded_channel();
343
344 let store = Store {
345 path: path.clone(),
346 db,
347 stream,
348 idx_topic,
349 broadcast_tx,
350 gc_tx,
351 append_lock: Arc::new(Mutex::new(())),
352 };
353
354 // Spawn gc worker thread
355 spawn_gc_worker(gc_rx, store.clone());
356
357 Ok(store)
358 }
359
360 /// Wait until the background garbage-collection worker has processed every
361 /// task queued so far. Useful in tests to observe TTL eviction
362 /// deterministically.
363 pub async fn wait_for_gc(&self) {
364 let (tx, rx) = tokio::sync::oneshot::channel();
365 let _ = self.gc_tx.send(GCTask::Drain(tx));
366 let _ = rx.await;
367 }
368
369 /// Read frames into an async channel according to `options`.
370 ///
371 /// By default this replays matching historical frames oldest-first and then
372 /// closes the channel. With [`FollowOption::On`] it instead keeps the
373 /// channel open and streams new appends as they arrive. When following, a
374 /// single ephemeral `xs.threshold` frame is emitted to mark the boundary
375 /// between replayed history and live events.
376 ///
377 /// The returned [`Receiver`](tokio::sync::mpsc::Receiver) is bounded;
378 /// dropping it stops the read. For a blocking, non-async caller use
379 /// [`read_sync`](Store::read_sync).
380 ///
381 /// ```no_run
382 /// use xs::{Store, ReadOptions, FollowOption};
383 ///
384 /// # async fn run(store: Store) {
385 /// let mut rx = store
386 /// .read(ReadOptions::builder().follow(FollowOption::On).build())
387 /// .await;
388 /// while let Some(frame) = rx.recv().await {
389 /// if frame.topic == "xs.threshold" {
390 /// // caught up to live; everything after this is new
391 /// continue;
392 /// }
393 /// println!("{} {}", frame.id, frame.topic);
394 /// }
395 /// # }
396 /// ```
397 #[tracing::instrument(skip(self))]
398 pub async fn read(&self, options: ReadOptions) -> tokio::sync::mpsc::Receiver<Frame> {
399 let (tx, rx) = tokio::sync::mpsc::channel(100);
400
401 let should_follow = matches!(
402 options.follow,
403 FollowOption::On | FollowOption::WithHeartbeat(_)
404 );
405
406 // Only take broadcast subscription if following. We initate the subscription here to
407 // ensure we don't miss any messages between historical processing and starting the
408 // broadcast subscription.
409 let broadcast_rx = if should_follow {
410 Some(self.broadcast_tx.subscribe())
411 } else {
412 None
413 };
414
415 // Only create done channel if we're doing historical processing
416 let done_rx = if !options.new {
417 let (done_tx, done_rx) = tokio::sync::oneshot::channel();
418 let tx_clone = tx.clone();
419 let store = self.clone();
420 let options = options.clone();
421 let should_follow_clone = should_follow;
422 let gc_tx = self.gc_tx.clone();
423
424 // Spawn OS thread to handle historical events
425 std::thread::spawn(move || {
426 let mut last_id = None;
427 let mut count = 0;
428
429 // Handle --last N: get the N most recent frames
430 if let Some(last_n) = options.last {
431 let iter: Box<dyn Iterator<Item = Frame>> = match options.topic.as_deref() {
432 None | Some("*") => store.iter_frames_rev(),
433 Some(topic) if topic.ends_with(".*") => {
434 let prefix = &topic[..topic.len() - 1];
435 store.iter_frames_by_topic_prefix_rev(prefix)
436 }
437 Some(topic) => store.iter_frames_by_topic_rev(topic),
438 };
439
440 // Collect last N frames (in reverse order), skipping expired
441 let mut frames: Vec<Frame> = Vec::with_capacity(last_n);
442 for frame in iter {
443 if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
444 if is_expired(&frame.id, ttl) {
445 let _ = gc_tx.send(GCTask::Remove(frame.id));
446 continue;
447 }
448 }
449 frames.push(frame);
450 if frames.len() >= last_n {
451 break;
452 }
453 }
454
455 // Reverse to chronological order and send
456 for frame in frames.into_iter().rev() {
457 last_id = Some(frame.id);
458 count += 1;
459 if tx_clone.blocking_send(frame).is_err() {
460 return;
461 }
462 }
463 } else {
464 // Normal forward iteration
465 // Determine start bound: from (inclusive) takes precedence over after (exclusive)
466 let start_bound = options
467 .from
468 .as_ref()
469 .map(|id| (id, true))
470 .or_else(|| options.after.as_ref().map(|id| (id, false)));
471
472 let iter: Box<dyn Iterator<Item = Frame>> = match options.topic.as_deref() {
473 None | Some("*") => store.iter_frames(start_bound),
474 Some(topic) if topic.ends_with(".*") => {
475 // Wildcard: "user.*" -> prefix "user."
476 let prefix = &topic[..topic.len() - 1]; // strip "*", keep "."
477 store.iter_frames_by_topic_prefix(prefix, start_bound)
478 }
479 Some(topic) => store.iter_frames_by_topic(topic, start_bound),
480 };
481
482 for frame in iter {
483 if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
484 if is_expired(&frame.id, ttl) {
485 let _ = gc_tx.send(GCTask::Remove(frame.id));
486 continue;
487 }
488 }
489
490 last_id = Some(frame.id);
491
492 if let Some(limit) = options.limit {
493 if count >= limit {
494 return; // Exit early if limit reached
495 }
496 }
497
498 if tx_clone.blocking_send(frame).is_err() {
499 return;
500 }
501 count += 1;
502 }
503 }
504
505 // Send threshold message if following
506 if should_follow_clone {
507 let threshold = Frame::builder("xs.threshold")
508 .id(scru128::new())
509 .ttl(TTL::Ephemeral)
510 .build();
511 if tx_clone.blocking_send(threshold).is_err() {
512 return;
513 }
514 }
515
516 // Signal completion with the last seen ID and count
517 let _ = done_tx.send((last_id, count));
518 });
519
520 Some(done_rx)
521 } else {
522 None
523 };
524
525 // Handle broadcast subscription and heartbeat
526 if let Some(broadcast_rx) = broadcast_rx {
527 {
528 let tx = tx.clone();
529 let limit = options.limit;
530
531 tokio::spawn(async move {
532 // If we have a done_rx, wait for historical processing
533 let (last_id, mut count) = match done_rx {
534 Some(done_rx) => match done_rx.await {
535 Ok((id, count)) => (id, count),
536 Err(_) => return, // Historical processing failed/cancelled
537 },
538 None => (None, 0),
539 };
540
541 let mut broadcast_rx = broadcast_rx;
542 while let Ok(frame) = broadcast_rx.recv().await {
543 // Filter by topic (exact match or wildcard)
544 match options.topic.as_deref() {
545 None | Some("*") => {}
546 Some(topic) if topic.ends_with(".*") => {
547 let prefix = &topic[..topic.len() - 1]; // "user.*" -> "user."
548 if !frame.topic.starts_with(prefix) {
549 continue;
550 }
551 }
552 Some(topic) => {
553 if frame.topic != topic {
554 continue;
555 }
556 }
557 }
558
559 // Skip if we've already seen this frame during historical scan
560 if let Some(last_scanned_id) = last_id {
561 if frame.id <= last_scanned_id {
562 continue;
563 }
564 }
565
566 if tx.send(frame).await.is_err() {
567 break;
568 }
569
570 if let Some(limit) = limit {
571 count += 1;
572 if count >= limit {
573 break;
574 }
575 }
576 }
577 });
578 }
579
580 // Handle heartbeat if requested
581 if let FollowOption::WithHeartbeat(duration) = options.follow {
582 let heartbeat_tx = tx;
583 tokio::spawn(async move {
584 loop {
585 tokio::time::sleep(duration).await;
586 let frame = Frame::builder("xs.pulse")
587 .id(scru128::new())
588 .ttl(TTL::Ephemeral)
589 .build();
590 if heartbeat_tx.send(frame).await.is_err() {
591 break;
592 }
593 }
594 });
595 }
596 }
597
598 rx
599 }
600
601 /// Replay matching historical frames as a blocking iterator.
602 ///
603 /// This honours the `topic`, `from`, `after`, `limit`, and `last` parts of
604 /// [`ReadOptions`] but ignores [`follow`](ReadOptions::follow): it never
605 /// streams live appends. Use [`read`](Store::read) when you need to follow.
606 ///
607 /// ```no_run
608 /// use xs::{Store, ReadOptions};
609 ///
610 /// # fn run(store: Store) {
611 /// let opts = ReadOptions::builder().topic("clip.*".to_string()).last(10).build();
612 /// for frame in store.read_sync(opts) {
613 /// println!("{} {}", frame.id, frame.topic);
614 /// }
615 /// # }
616 /// ```
617 pub fn read_sync(&self, options: ReadOptions) -> impl Iterator<Item = Frame> + '_ {
618 let gc_tx = self.gc_tx.clone();
619
620 // Filter out expired frames
621 let filter_expired = move |frame: Frame, gc_tx: &UnboundedSender<GCTask>| {
622 if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
623 if is_expired(&frame.id, ttl) {
624 let _ = gc_tx.send(GCTask::Remove(frame.id));
625 return None;
626 }
627 }
628 Some(frame)
629 };
630
631 let frames: Vec<Frame> = if let Some(last_n) = options.last {
632 // Handle --last N: get the N most recent frames
633 let iter: Box<dyn Iterator<Item = Frame>> = match options.topic.as_deref() {
634 None | Some("*") => self.iter_frames_rev(),
635 Some(topic) if topic.ends_with(".*") => {
636 let prefix = &topic[..topic.len() - 1];
637 self.iter_frames_by_topic_prefix_rev(prefix)
638 }
639 Some(topic) => self.iter_frames_by_topic_rev(topic),
640 };
641
642 // Collect last N frames (in reverse order), skipping expired
643 let mut frames: Vec<Frame> = Vec::with_capacity(last_n);
644 for frame in iter {
645 if let Some(frame) = filter_expired(frame, &gc_tx) {
646 frames.push(frame);
647 if frames.len() >= last_n {
648 break;
649 }
650 }
651 }
652
653 // Reverse to chronological order
654 frames.reverse();
655 frames
656 } else {
657 // Normal forward iteration
658 let start_bound = options
659 .from
660 .as_ref()
661 .map(|id| (id, true))
662 .or_else(|| options.after.as_ref().map(|id| (id, false)));
663
664 let iter: Box<dyn Iterator<Item = Frame>> = match options.topic.as_deref() {
665 None | Some("*") => self.iter_frames(start_bound),
666 Some(topic) if topic.ends_with(".*") => {
667 let prefix = &topic[..topic.len() - 1];
668 self.iter_frames_by_topic_prefix(prefix, start_bound)
669 }
670 Some(topic) => self.iter_frames_by_topic(topic, start_bound),
671 };
672
673 iter.filter_map(|frame| filter_expired(frame, &gc_tx))
674 .take(options.limit.unwrap_or(usize::MAX))
675 .collect()
676 };
677
678 frames.into_iter()
679 }
680
681 /// Returns the current module state as of a given point in the stream.
682 ///
683 /// Scans all frames up to (and including) `as_of` and returns a mapping of
684 /// module name to CAS hash for the latest frame on each `xs.module.<name>`
685 /// topic.
686 /// Resolve the set of registered Nushell modules as of a given frame ID.
687 ///
688 /// Scans `xs.module.<name>` frames up to and including `as_of` and returns a
689 /// map from module name to the content hash of its latest definition. Used
690 /// by the scripting runtime; rarely needed when embedding the store
691 /// directly.
692 pub fn nu_modules_at(
693 &self,
694 as_of: &Scru128Id,
695 ) -> std::collections::HashMap<String, ssri::Integrity> {
696 let mut modules = std::collections::HashMap::new();
697 let options = ReadOptions::builder().follow(FollowOption::Off).build();
698 for frame in self.read_sync(options) {
699 if frame.id > *as_of {
700 break;
701 }
702 if let Some(hash) = frame.hash {
703 if let Some(name) = frame.topic.strip_prefix("xs.module.") {
704 if !name.is_empty() {
705 modules.insert(name.to_string(), hash);
706 }
707 }
708 }
709 }
710 modules
711 }
712
713 /// Fetch a single frame by ID, or `None` if no such frame exists.
714 pub fn get(&self, id: &Scru128Id) -> Option<Frame> {
715 self.stream
716 .get(id.to_bytes())
717 .unwrap()
718 .map(|value| deserialize_frame((id.as_bytes(), value)))
719 }
720
721 /// Delete a frame and its topic index entries. Removing a frame that does
722 /// not exist is a no-op and returns `Ok(())`.
723 ///
724 /// This removes the stream entry only; any payload bytes in the
725 /// content-addressed store are left in place.
726 #[tracing::instrument(skip(self), fields(id = %id.to_string()))]
727 pub fn remove(&self, id: &Scru128Id) -> Result<(), crate::error::Error> {
728 let Some(frame) = self.get(id) else {
729 // Already deleted
730 return Ok(());
731 };
732
733 // Build topic key directly (no validation - frame already exists)
734 let mut topic_key = idx_topic_key_prefix(&frame.topic);
735 topic_key.extend(frame.id.as_bytes());
736
737 // Get prefix index keys for hierarchical queries
738 let prefix_keys = idx_topic_prefix_keys(&frame.topic, &frame.id);
739
740 let mut batch = self.db.batch();
741 batch.remove(&self.stream, id.as_bytes());
742 batch.remove(&self.idx_topic, topic_key);
743 for prefix_key in &prefix_keys {
744 batch.remove(&self.idx_topic, prefix_key);
745 }
746 batch.commit()?;
747 self.db.persist(PersistMode::SyncAll)?;
748 Ok(())
749 }
750
751 // --- Content-addressed store (CAS) ---
752 //
753 // Frame payloads live here, keyed by an integrity hash. The typical flow is
754 // `cas_insert` to store bytes, stash the returned hash on a `Frame`, then
755 // `cas_read` to retrieve them later. Each method has a `_sync` twin for
756 // blocking callers; the streaming `cas_reader`/`cas_writer` variants avoid
757 // buffering the whole payload in memory.
758
759 /// Open a streaming reader for the payload identified by `hash`.
760 pub async fn cas_reader(&self, hash: ssri::Integrity) -> cacache::Result<cacache::Reader> {
761 cacache::Reader::open_hash(&self.path.join("cacache"), hash).await
762 }
763
764 /// Blocking variant of [`cas_reader`](Store::cas_reader).
765 pub fn cas_reader_sync(&self, hash: ssri::Integrity) -> cacache::Result<cacache::SyncReader> {
766 cacache::SyncReader::open_hash(self.path.join("cacache"), hash)
767 }
768
769 /// Open a streaming writer; finish it to obtain the payload's integrity hash.
770 pub async fn cas_writer(&self) -> cacache::Result<cacache::Writer> {
771 cacache::WriteOpts::new()
772 .open_hash(&self.path.join("cacache"))
773 .await
774 }
775
776 /// Blocking variant of [`cas_writer`](Store::cas_writer).
777 pub fn cas_writer_sync(&self) -> cacache::Result<cacache::SyncWriter> {
778 cacache::WriteOpts::new().open_hash_sync(self.path.join("cacache"))
779 }
780
781 /// Store `content` and return its integrity hash, ready to attach to a
782 /// [`Frame::hash`].
783 pub async fn cas_insert(&self, content: impl AsRef<[u8]>) -> cacache::Result<ssri::Integrity> {
784 cacache::write_hash(&self.path.join("cacache"), content).await
785 }
786
787 /// Blocking variant of [`cas_insert`](Store::cas_insert).
788 pub fn cas_insert_sync(&self, content: impl AsRef<[u8]>) -> cacache::Result<ssri::Integrity> {
789 cacache::write_hash_sync(self.path.join("cacache"), content)
790 }
791
792 /// Convenience wrapper over [`cas_insert`](Store::cas_insert) for a byte slice.
793 pub async fn cas_insert_bytes(&self, bytes: &[u8]) -> cacache::Result<ssri::Integrity> {
794 self.cas_insert(bytes).await
795 }
796
797 /// Blocking variant of [`cas_insert_bytes`](Store::cas_insert_bytes).
798 pub fn cas_insert_bytes_sync(&self, bytes: &[u8]) -> cacache::Result<ssri::Integrity> {
799 self.cas_insert_sync(bytes)
800 }
801
802 /// Read back the full payload for `hash` into a `Vec<u8>`.
803 pub async fn cas_read(&self, hash: &ssri::Integrity) -> cacache::Result<Vec<u8>> {
804 cacache::read_hash(&self.path.join("cacache"), hash).await
805 }
806
807 /// Blocking variant of [`cas_read`](Store::cas_read).
808 pub fn cas_read_sync(&self, hash: &ssri::Integrity) -> cacache::Result<Vec<u8>> {
809 cacache::read_hash_sync(self.path.join("cacache"), hash)
810 }
811
812 /// Persist a frame exactly as given, including its existing
813 /// [`id`](Frame::id), without broadcasting it to live readers or scheduling
814 /// TTL garbage collection.
815 ///
816 /// Most callers want [`append`](Store::append) instead, which assigns a
817 /// fresh ID, handles ephemeral and `Last` retention, and notifies
818 /// subscribers. Use `insert_frame` only when you are reconstructing a stream
819 /// with predetermined IDs (for example when restoring a backup).
820 #[tracing::instrument(skip(self))]
821 pub fn insert_frame(&self, frame: &Frame) -> Result<(), crate::error::Error> {
822 let encoded: Vec<u8> = serde_json::to_vec(&frame).unwrap();
823
824 // Get the index topic key (also validates topic)
825 let topic_key = idx_topic_key_from_frame(frame)?;
826
827 // Get prefix index keys for hierarchical queries
828 let prefix_keys = idx_topic_prefix_keys(&frame.topic, &frame.id);
829
830 let mut batch = self.db.batch();
831 batch.insert(&self.stream, frame.id.as_bytes(), encoded);
832 batch.insert(&self.idx_topic, topic_key, b"");
833 for prefix_key in &prefix_keys {
834 batch.insert(&self.idx_topic, prefix_key, b"");
835 }
836 batch.commit()?;
837 self.db.persist(PersistMode::SyncAll)?;
838 Ok(())
839 }
840
841 /// Append a frame to the stream and return it with its freshly assigned
842 /// [`id`](Frame::id).
843 ///
844 /// This is the primary write path. It:
845 ///
846 /// - assigns a new time-sortable ID (overwriting any ID on the input);
847 /// - validates the topic (see [`validate_topic`]);
848 /// - persists the frame, unless its [`TTL`] is [`TTL::Ephemeral`], in which
849 /// case it is only broadcast to live readers;
850 /// - schedules garbage collection for [`TTL::Last`] retention;
851 /// - broadcasts the frame to everyone currently in a following
852 /// [`read`](Store::read).
853 ///
854 /// Appends are serialized internally, so frames are assigned IDs and
855 /// delivered to subscribers in a consistent order.
856 ///
857 /// # Errors
858 ///
859 /// Returns an error if the topic is invalid or the underlying write fails.
860 ///
861 /// ```no_run
862 /// use xs::{Store, Frame, TTL};
863 ///
864 /// # async fn run(store: Store) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
865 /// let hash = store.cas_insert("hello clipboard").await?;
866 /// let frame = store.append(
867 /// Frame::builder("clip.add").hash(hash).ttl(TTL::Last(100)).build(),
868 /// )?;
869 /// println!("appended {}", frame.id);
870 /// # Ok(())
871 /// # }
872 /// ```
873 pub fn append(&self, mut frame: Frame) -> Result<Frame, crate::error::Error> {
874 // Serialize all appends to ensure ID generation, write, and broadcast
875 // happen atomically. This guarantees subscribers receive frames in
876 // scru128 ID order.
877 let _guard = self.append_lock.lock().unwrap();
878
879 frame.id = scru128::new();
880
881 // Check for null byte in topic (in case we're not storing the frame)
882 idx_topic_key_from_frame(&frame)?;
883
884 // only store the frame if it's not ephemeral
885 if frame.ttl != Some(TTL::Ephemeral) {
886 self.insert_frame(&frame)?;
887
888 // If this is a Last TTL, schedule a gc task
889 if let Some(TTL::Last(n)) = frame.ttl {
890 let _ = self.gc_tx.send(GCTask::CheckLastTTL {
891 topic: frame.topic.clone(),
892 keep: n,
893 });
894 }
895 }
896
897 let _ = self.broadcast_tx.send(frame.clone());
898 Ok(frame)
899 }
900
901 /// Iterate frames starting from a bound.
902 /// `start` is `(id, inclusive)` where inclusive=true means >= and inclusive=false means >.
903 fn iter_frames(
904 &self,
905 start: Option<(&Scru128Id, bool)>,
906 ) -> Box<dyn Iterator<Item = Frame> + '_> {
907 let range = match start {
908 Some((id, true)) => (Bound::Included(id.as_bytes().to_vec()), Bound::Unbounded),
909 Some((id, false)) => (Bound::Excluded(id.as_bytes().to_vec()), Bound::Unbounded),
910 None => (Bound::Unbounded, Bound::Unbounded),
911 };
912
913 Box::new(self.stream.range(range).filter_map(|guard| {
914 let (key, value) = guard.into_inner().ok()?;
915 Some(deserialize_frame((key, value)))
916 }))
917 }
918
919 /// Iterate frames in reverse order (most recent first).
920 fn iter_frames_rev(&self) -> Box<dyn Iterator<Item = Frame> + '_> {
921 Box::new(self.stream.iter().rev().filter_map(|guard| {
922 let (key, value) = guard.into_inner().ok()?;
923 Some(deserialize_frame((key, value)))
924 }))
925 }
926
927 /// Iterate frames by topic in reverse order (most recent first).
928 fn iter_frames_by_topic_rev<'a>(
929 &'a self,
930 topic: &'a str,
931 ) -> Box<dyn Iterator<Item = Frame> + 'a> {
932 let prefix = idx_topic_key_prefix(topic);
933 Box::new(
934 self.idx_topic
935 .prefix(prefix)
936 .rev()
937 .filter_map(move |guard| {
938 let key = guard.key().ok()?;
939 let frame_id = idx_topic_frame_id_from_key(&key);
940 self.get(&frame_id)
941 }),
942 )
943 }
944
945 /// Iterate frames by topic prefix in reverse order (most recent first).
946 fn iter_frames_by_topic_prefix_rev<'a>(
947 &'a self,
948 prefix: &'a str,
949 ) -> Box<dyn Iterator<Item = Frame> + 'a> {
950 let mut index_prefix = Vec::with_capacity(prefix.len() + 1);
951 index_prefix.extend(prefix.as_bytes());
952 index_prefix.push(NULL_DELIMITER);
953
954 Box::new(
955 self.idx_topic
956 .prefix(index_prefix)
957 .rev()
958 .filter_map(move |guard| {
959 let key = guard.key().ok()?;
960 let frame_id = idx_topic_frame_id_from_key(&key);
961 self.get(&frame_id)
962 }),
963 )
964 }
965
966 fn iter_frames_by_topic<'a>(
967 &'a self,
968 topic: &'a str,
969 start: Option<(&'a Scru128Id, bool)>,
970 ) -> Box<dyn Iterator<Item = Frame> + 'a> {
971 let prefix = idx_topic_key_prefix(topic);
972 Box::new(self.idx_topic.prefix(prefix).filter_map(move |guard| {
973 let key = guard.key().ok()?;
974 let frame_id = idx_topic_frame_id_from_key(&key);
975 if let Some((bound_id, inclusive)) = start {
976 if inclusive {
977 if frame_id < *bound_id {
978 return None;
979 }
980 } else if frame_id <= *bound_id {
981 return None;
982 }
983 }
984 self.get(&frame_id)
985 }))
986 }
987
988 /// Iterate frames matching a topic prefix (for wildcard queries like "user.*").
989 /// The prefix should include the trailing dot (e.g., "user." for "user.*").
990 fn iter_frames_by_topic_prefix<'a>(
991 &'a self,
992 prefix: &'a str,
993 start: Option<(&'a Scru128Id, bool)>,
994 ) -> Box<dyn Iterator<Item = Frame> + 'a> {
995 // Build index prefix: "user.\0" for scanning all "user.*" entries
996 let mut index_prefix = Vec::with_capacity(prefix.len() + 1);
997 index_prefix.extend(prefix.as_bytes());
998 index_prefix.push(NULL_DELIMITER);
999
1000 Box::new(
1001 self.idx_topic
1002 .prefix(index_prefix)
1003 .filter_map(move |guard| {
1004 let key = guard.key().ok()?;
1005 let frame_id = idx_topic_frame_id_from_key(&key);
1006 if let Some((bound_id, inclusive)) = start {
1007 if inclusive {
1008 if frame_id < *bound_id {
1009 return None;
1010 }
1011 } else if frame_id <= *bound_id {
1012 return None;
1013 }
1014 }
1015 self.get(&frame_id)
1016 }),
1017 )
1018 }
1019}
1020
1021fn spawn_gc_worker(mut gc_rx: UnboundedReceiver<GCTask>, store: Store) {
1022 std::thread::spawn(move || {
1023 while let Some(task) = gc_rx.blocking_recv() {
1024 match task {
1025 GCTask::Remove(id) => {
1026 let _ = store.remove(&id);
1027 }
1028
1029 GCTask::CheckLastTTL { topic, keep } => {
1030 let prefix = idx_topic_key_prefix(&topic);
1031 let frames_to_remove: Vec<_> = store
1032 .idx_topic
1033 .prefix(&prefix)
1034 .rev() // Scan from newest to oldest
1035 .skip(keep as usize)
1036 .filter_map(|guard| {
1037 let key = guard.key().ok()?;
1038 Some(Scru128Id::from_bytes(
1039 idx_topic_frame_id_from_key(&key).into(),
1040 ))
1041 })
1042 .collect();
1043
1044 for frame_id in frames_to_remove {
1045 let _ = store.remove(&frame_id);
1046 }
1047 }
1048
1049 GCTask::Drain(tx) => {
1050 let _ = tx.send(());
1051 }
1052 }
1053 }
1054 });
1055}
1056
1057fn is_expired(id: &Scru128Id, ttl: &Duration) -> bool {
1058 let created_ms = id.timestamp();
1059 let expires_ms = created_ms.saturating_add(ttl.as_millis() as u64);
1060 let now_ms = std::time::SystemTime::now()
1061 .duration_since(std::time::UNIX_EPOCH)
1062 .unwrap()
1063 .as_millis() as u64;
1064
1065 now_ms >= expires_ms
1066}
1067
/// Byte written between the topic (or topic-prefix) bytes and the frame id in
/// `idx_topic` keys. Topics accepted by `validate_topic` never contain it.
const NULL_DELIMITER: u8 = b'\0';
/// Maximum length, in bytes, of a topic accepted by `validate_topic`.
const MAX_TOPIC_LENGTH: usize = 255;
1070
1071/// Validate a frame topic (per ADR 0001).
1072///
1073/// A topic must be non-empty and at most 255 bytes, start with an ASCII letter
1074/// or `_`, and contain only `a-z A-Z 0-9 _ - .`. It may not end with `.` or
1075/// contain consecutive dots. [`Store::append`] runs this automatically; call it
1076/// directly to validate user input before building a [`Frame`].
1077///
1078/// ```
1079/// use xs::store::validate_topic;
1080///
1081/// assert!(validate_topic("clip.add").is_ok());
1082/// assert!(validate_topic("clip.").is_err());
1083/// ```
1084pub fn validate_topic(topic: &str) -> Result<(), crate::error::Error> {
1085 if topic.is_empty() {
1086 return Err("Topic cannot be empty".to_string().into());
1087 }
1088 if topic.len() > MAX_TOPIC_LENGTH {
1089 return Err(format!("Topic exceeds max length of {MAX_TOPIC_LENGTH} bytes").into());
1090 }
1091 if topic.ends_with('.') {
1092 return Err("Topic cannot end with '.'".to_string().into());
1093 }
1094 if topic.contains("..") {
1095 return Err("Topic cannot contain consecutive dots".to_string().into());
1096 }
1097
1098 let bytes = topic.as_bytes();
1099 let first = bytes[0];
1100 if !first.is_ascii_alphabetic() && first != b'_' {
1101 return Err("Topic must start with a-z, A-Z, or _".to_string().into());
1102 }
1103
1104 for &b in bytes {
1105 if !b.is_ascii_alphanumeric() && b != b'_' && b != b'-' && b != b'.' {
1106 return Err(format!(
1107 "Topic contains invalid character: '{}'. Allowed: a-z A-Z 0-9 _ - .",
1108 b as char
1109 )
1110 .into());
1111 }
1112 }
1113
1114 Ok(())
1115}
1116
1117/// Validates a topic query (for --topic flag).
1118/// Allows wildcards: "*" (match all) or "prefix.*" (match children).
1119pub fn validate_topic_query(topic: &str) -> Result<(), crate::error::Error> {
1120 if topic == "*" {
1121 return Ok(());
1122 }
1123 if let Some(prefix) = topic.strip_suffix(".*") {
1124 // Validate the prefix part (e.g., "user" in "user.*")
1125 // Prefix can be empty edge case: ".*" is not valid
1126 if prefix.is_empty() {
1127 return Err("Wildcard '.*' requires a prefix".to_string().into());
1128 }
1129 validate_topic(prefix)
1130 } else {
1131 validate_topic(topic)
1132 }
1133}
1134
1135/// Generate prefix index keys for hierarchical topic queries.
1136/// For topic "user.id1.messages", returns keys for prefixes "user." and "user.id1."
1137fn idx_topic_prefix_keys(topic: &str, frame_id: &scru128::Scru128Id) -> Vec<Vec<u8>> {
1138 let mut keys = Vec::new();
1139 let mut pos = 0;
1140 while let Some(dot_pos) = topic[pos..].find('.') {
1141 let prefix = &topic[..pos + dot_pos + 1]; // include the dot
1142 let mut key = Vec::with_capacity(prefix.len() + 1 + 16);
1143 key.extend(prefix.as_bytes());
1144 key.push(NULL_DELIMITER);
1145 key.extend(frame_id.as_bytes());
1146 keys.push(key);
1147 pos += dot_pos + 1;
1148 }
1149 keys
1150}
1151
1152fn idx_topic_key_prefix(topic: &str) -> Vec<u8> {
1153 let mut v = Vec::with_capacity(topic.len() + 1); // topic bytes + delimiter
1154 v.extend(topic.as_bytes()); // topic string as UTF-8 bytes
1155 v.push(NULL_DELIMITER); // Delimiter for variable-sized keys
1156 v
1157}
1158
1159pub(crate) fn idx_topic_key_from_frame(frame: &Frame) -> Result<Vec<u8>, crate::error::Error> {
1160 validate_topic(&frame.topic)?;
1161 let mut v = idx_topic_key_prefix(&frame.topic);
1162 v.extend(frame.id.as_bytes());
1163 Ok(v)
1164}
1165
1166fn idx_topic_frame_id_from_key(key: &[u8]) -> Scru128Id {
1167 let frame_id_bytes = &key[key.len() - 16..];
1168 Scru128Id::from_bytes(frame_id_bytes.try_into().unwrap())
1169}
1170
1171fn deserialize_frame<B1: AsRef<[u8]> + std::fmt::Debug, B2: AsRef<[u8]>>(
1172 record: (B1, B2),
1173) -> Frame {
1174 serde_json::from_slice(record.1.as_ref()).unwrap_or_else(|e| {
1175 // Try to convert the key to a Scru128Id and print in a format that can be copied for deletion
1176 let key_bytes = record.0.as_ref();
1177 if key_bytes.len() == 16 {
1178 if let Ok(bytes) = key_bytes.try_into() {
1179 let id = Scru128Id::from_bytes(bytes);
1180 eprintln!("CORRUPTED_RECORD_ID: {id}");
1181 }
1182 }
1183 let key = std::str::from_utf8(record.0.as_ref()).unwrap();
1184 let value = std::str::from_utf8(record.1.as_ref()).unwrap();
1185 panic!("Failed to deserialize frame: {e} {key} {value}")
1186 })
1187}