// Source: xs/store/mod.rs

1mod ttl;
2pub use ttl::*;
3
4#[cfg(test)]
5mod tests;
6
7use std::ops::Bound;
8use std::path::PathBuf;
9use std::time::Duration;
10
11use tokio::sync::broadcast;
12use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
13
14use std::sync::{Arc, Mutex};
15
16use scru128::Scru128Id;
17
18use serde::{Deserialize, Deserializer, Serialize};
19
20use fjall::{
21    config::{BlockSizePolicy, HashRatioPolicy},
22    Database, Keyspace, KeyspaceCreateOptions, PersistMode,
23};
24
/// A single event record.
///
/// `Store::append` assigns `id` and persists the frame (unless its TTL is
/// ephemeral) before broadcasting it to followers. The builder generated by
/// `bon` takes the topic as its start argument: `Frame::builder("topic")`.
25#[derive(PartialEq, Eq, Serialize, Deserialize, Clone, Default, bon::Builder)]
26pub struct Frame {
    /// Topic name; validated when the frame is indexed.
27    #[builder(start_fn, into)]
28    pub topic: String,
    /// Frame identifier. Overwritten with a fresh SCRU128 ID by `Store::append`.
29    #[builder(default)]
30    pub id: Scru128Id,
    /// Optional integrity hash — presumably of content stored through the
    /// `cas_*` helpers; confirm against callers.
31    pub hash: Option<ssri::Integrity>,
    /// Optional free-form JSON metadata.
32    pub meta: Option<serde_json::Value>,
    /// Optional retention policy (`TTL` is defined in the `ttl` submodule).
33    pub ttl: Option<TTL>,
34}
35
36use std::fmt;
37
38impl fmt::Debug for Frame {
39    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
40        f.debug_struct("Frame")
41            .field("id", &format!("{id}", id = self.id))
42            .field("topic", &self.topic)
43            .field("hash", &self.hash.as_ref().map(|x| format!("{x}")))
44            .field("meta", &self.meta)
45            .field("ttl", &self.ttl)
46            .finish()
47    }
48}
49
50impl<'de> Deserialize<'de> for FollowOption {
51    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
52    where
53        D: Deserializer<'de>,
54    {
55        let s: String = Deserialize::deserialize(deserializer)?;
56        if s.is_empty() || s == "yes" {
57            Ok(FollowOption::On)
58        } else if let Ok(duration) = s.parse::<u64>() {
59            Ok(FollowOption::WithHeartbeat(Duration::from_millis(duration)))
60        } else {
61            match s.as_str() {
62                "true" => Ok(FollowOption::On),
63                "false" | "no" => Ok(FollowOption::Off),
64                _ => Err(serde::de::Error::custom("Invalid value for follow option")),
65            }
66        }
67    }
68}
69
70fn deserialize_bool<'de, D>(deserializer: D) -> Result<bool, D::Error>
71where
72    D: Deserializer<'de>,
73{
74    let s: String = Deserialize::deserialize(deserializer)?;
75    match s.as_str() {
76        "false" | "no" | "0" => Ok(false),
77        _ => Ok(true),
78    }
79}
80
/// Options controlling which frames `Store::read` and
/// `Store::read_sync_with_options` return.
///
/// Deserializable from a URL query string (see `ReadOptions::from_query`).
81#[derive(PartialEq, Deserialize, Clone, Debug, Default, bon::Builder)]
82pub struct ReadOptions {
    /// Whether to keep streaming newly appended frames after the historical
    /// scan, optionally with periodic heartbeat frames.
83    #[serde(default)]
84    #[builder(default)]
85    pub follow: FollowOption,
    /// When true, `Store::read` skips the historical scan entirely.
86    #[serde(default, deserialize_with = "deserialize_bool")]
87    #[builder(default)]
88    pub new: bool,
    // NOTE(review): the rename below maps the field to its own name and looks
    // redundant; left untouched here.
89    /// Start after this ID (exclusive)
90    #[serde(rename = "after")]
91    pub after: Option<Scru128Id>,
92    /// Start from this ID (inclusive)
93    pub from: Option<Scru128Id>,
    /// Maximum number of frames to return.
94    pub limit: Option<usize>,
95    /// Return the last N frames (most recent)
96    pub last: Option<usize>,
    /// Topic filter: an exact topic, `*` for all topics, or a `prefix.*` wildcard.
97    pub topic: Option<String>,
98}
99
100impl ReadOptions {
101    pub fn from_query(query: Option<&str>) -> Result<Self, crate::error::Error> {
102        match query {
103            Some(q) => Ok(serde_urlencoded::from_str(q)?),
104            None => Ok(Self::default()),
105        }
106    }
107
108    pub fn to_query_string(&self) -> String {
109        let mut params = Vec::new();
110
111        // Add follow parameter with heartbeat if specified
112        match self.follow {
113            FollowOption::Off => {}
114            FollowOption::On => params.push(("follow", "true".to_string())),
115            FollowOption::WithHeartbeat(duration) => {
116                params.push(("follow", duration.as_millis().to_string()));
117            }
118        }
119
120        // Add new if true
121        if self.new {
122            params.push(("new", "true".to_string()));
123        }
124
125        // Add after if present
126        if let Some(after) = self.after {
127            params.push(("after", after.to_string()));
128        }
129
130        // Add from if present
131        if let Some(from) = self.from {
132            params.push(("from", from.to_string()));
133        }
134
135        // Add limit if present
136        if let Some(limit) = self.limit {
137            params.push(("limit", limit.to_string()));
138        }
139
140        // Add last if present
141        if let Some(last) = self.last {
142            params.push(("last", last.to_string()));
143        }
144
145        if let Some(topic) = &self.topic {
146            params.push(("topic", topic.clone()));
147        }
148
149        // Return empty string if no params
150        if params.is_empty() {
151            String::new()
152        } else {
153            url::form_urlencoded::Serializer::new(String::new())
154                .extend_pairs(params)
155                .finish()
156        }
157    }
158}
159
/// Whether a read keeps streaming live frames after the historical scan.
160#[derive(Default, PartialEq, Clone, Debug)]
161pub enum FollowOption {
    /// Do not follow; the read ends after the historical scan.
162    #[default]
163    Off,
    /// Follow newly appended frames.
164    On,
    /// Follow, and additionally emit an `xs.pulse` frame every `Duration`.
165    WithHeartbeat(Duration),
166}
167
/// Work items handled by the garbage-collection worker thread
/// (see `spawn_gc_worker`).
168#[derive(Debug)]
169enum GCTask {
    /// Remove the frame with this ID.
170    Remove(Scru128Id),
    /// Trim `topic` down to its `keep` newest index entries.
171    CheckLastTTL { topic: String, keep: u32 },
    /// Acknowledge on the sender once this task is reached in the queue.
172    Drain(tokio::sync::oneshot::Sender<()>),
173}
174
/// Handle to the event store. Derives `Clone`; the GC worker and reader
/// threads each receive their own clone.
175#[derive(Clone)]
176pub struct Store {
    /// Root directory; the database lives under `fjall/` and the
    /// content-addressed cache under `cacache/`.
177    pub path: PathBuf,
    // Database handle used for batches and persistence.
178    db: Database,
    // Frames keyed by frame ID bytes.
179    stream: Keyspace,
    // Topic index; keys are built by the `idx_topic_*` helpers.
180    idx_topic: Keyspace,
    // Fan-out of appended frames to followers.
181    broadcast_tx: broadcast::Sender<Frame>,
    // Queue of tasks for the GC worker thread.
182    gc_tx: UnboundedSender<GCTask>,
    // Serializes `append` so ID generation, write and broadcast stay in order.
183    append_lock: Arc<Mutex<()>>,
184}
185
186impl Store {
187    pub fn new(path: PathBuf) -> Store {
188        let db = Database::builder(path.join("fjall"))
189            .cache_size(32 * 1024 * 1024) // 32 MiB
190            .worker_threads(1)
191            .open()
192            .unwrap();
193
194        // Options for stream keyspace: point reads by frame ID
195        let stream_opts = || {
196            KeyspaceCreateOptions::default()
197                .max_memtable_size(8 * 1024 * 1024) // 8 MiB
198                .data_block_size_policy(BlockSizePolicy::all(16 * 1024)) // 16 KiB
199                .data_block_hash_ratio_policy(HashRatioPolicy::all(8.0))
200                .expect_point_read_hits(true)
201        };
202
203        // Options for idx_topic keyspace: prefix scans only
204        let idx_opts = || {
205            KeyspaceCreateOptions::default()
206                .max_memtable_size(8 * 1024 * 1024) // 8 MiB
207                .data_block_size_policy(BlockSizePolicy::all(16 * 1024)) // 16 KiB
208                .data_block_hash_ratio_policy(HashRatioPolicy::all(0.0)) // no point reads
209                .expect_point_read_hits(true)
210        };
211
212        let stream = db.keyspace("stream", stream_opts).unwrap();
213        let idx_topic = db.keyspace("idx_topic", idx_opts).unwrap();
214
215        let (broadcast_tx, _) = broadcast::channel(1024);
216        let (gc_tx, gc_rx) = mpsc::unbounded_channel();
217
218        let store = Store {
219            path: path.clone(),
220            db,
221            stream,
222            idx_topic,
223            broadcast_tx,
224            gc_tx,
225            append_lock: Arc::new(Mutex::new(())),
226        };
227
228        // Spawn gc worker thread
229        spawn_gc_worker(gc_rx, store.clone());
230
231        store
232    }
233
234    pub async fn wait_for_gc(&self) {
235        let (tx, rx) = tokio::sync::oneshot::channel();
236        let _ = self.gc_tx.send(GCTask::Drain(tx));
237        let _ = rx.await;
238    }
239
240    #[tracing::instrument(skip(self))]
241    pub async fn read(&self, options: ReadOptions) -> tokio::sync::mpsc::Receiver<Frame> {
242        let (tx, rx) = tokio::sync::mpsc::channel(100);
243
244        let should_follow = matches!(
245            options.follow,
246            FollowOption::On | FollowOption::WithHeartbeat(_)
247        );
248
249        // Only take broadcast subscription if following. We initate the subscription here to
250        // ensure we don't miss any messages between historical processing and starting the
251        // broadcast subscription.
252        let broadcast_rx = if should_follow {
253            Some(self.broadcast_tx.subscribe())
254        } else {
255            None
256        };
257
258        // Only create done channel if we're doing historical processing
259        let done_rx = if !options.new {
260            let (done_tx, done_rx) = tokio::sync::oneshot::channel();
261            let tx_clone = tx.clone();
262            let store = self.clone();
263            let options = options.clone();
264            let should_follow_clone = should_follow;
265            let gc_tx = self.gc_tx.clone();
266
267            // Spawn OS thread to handle historical events
268            std::thread::spawn(move || {
269                let mut last_id = None;
270                let mut count = 0;
271
272                // Handle --last N: get the N most recent frames
273                if let Some(last_n) = options.last {
274                    let iter: Box<dyn Iterator<Item = Frame>> = match options.topic.as_deref() {
275                        None | Some("*") => store.iter_frames_rev(),
276                        Some(topic) if topic.ends_with(".*") => {
277                            let prefix = &topic[..topic.len() - 1];
278                            store.iter_frames_by_topic_prefix_rev(prefix)
279                        }
280                        Some(topic) => store.iter_frames_by_topic_rev(topic),
281                    };
282
283                    // Collect last N frames (in reverse order), skipping expired
284                    let mut frames: Vec<Frame> = Vec::with_capacity(last_n);
285                    for frame in iter {
286                        if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
287                            if is_expired(&frame.id, ttl) {
288                                let _ = gc_tx.send(GCTask::Remove(frame.id));
289                                continue;
290                            }
291                        }
292                        frames.push(frame);
293                        if frames.len() >= last_n {
294                            break;
295                        }
296                    }
297
298                    // Reverse to chronological order and send
299                    for frame in frames.into_iter().rev() {
300                        last_id = Some(frame.id);
301                        count += 1;
302                        if tx_clone.blocking_send(frame).is_err() {
303                            return;
304                        }
305                    }
306                } else {
307                    // Normal forward iteration
308                    // Determine start bound: from (inclusive) takes precedence over after (exclusive)
309                    let start_bound = options
310                        .from
311                        .as_ref()
312                        .map(|id| (id, true))
313                        .or_else(|| options.after.as_ref().map(|id| (id, false)));
314
315                    let iter: Box<dyn Iterator<Item = Frame>> = match options.topic.as_deref() {
316                        None | Some("*") => store.iter_frames(start_bound),
317                        Some(topic) if topic.ends_with(".*") => {
318                            // Wildcard: "user.*" -> prefix "user."
319                            let prefix = &topic[..topic.len() - 1]; // strip "*", keep "."
320                            store.iter_frames_by_topic_prefix(prefix, start_bound)
321                        }
322                        Some(topic) => store.iter_frames_by_topic(topic, start_bound),
323                    };
324
325                    for frame in iter {
326                        if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
327                            if is_expired(&frame.id, ttl) {
328                                let _ = gc_tx.send(GCTask::Remove(frame.id));
329                                continue;
330                            }
331                        }
332
333                        last_id = Some(frame.id);
334
335                        if let Some(limit) = options.limit {
336                            if count >= limit {
337                                return; // Exit early if limit reached
338                            }
339                        }
340
341                        if tx_clone.blocking_send(frame).is_err() {
342                            return;
343                        }
344                        count += 1;
345                    }
346                }
347
348                // Send threshold message if following
349                if should_follow_clone {
350                    let threshold = Frame::builder("xs.threshold")
351                        .id(scru128::new())
352                        .ttl(TTL::Ephemeral)
353                        .build();
354                    if tx_clone.blocking_send(threshold).is_err() {
355                        return;
356                    }
357                }
358
359                // Signal completion with the last seen ID and count
360                let _ = done_tx.send((last_id, count));
361            });
362
363            Some(done_rx)
364        } else {
365            None
366        };
367
368        // Handle broadcast subscription and heartbeat
369        if let Some(broadcast_rx) = broadcast_rx {
370            {
371                let tx = tx.clone();
372                let limit = options.limit;
373
374                tokio::spawn(async move {
375                    // If we have a done_rx, wait for historical processing
376                    let (last_id, mut count) = match done_rx {
377                        Some(done_rx) => match done_rx.await {
378                            Ok((id, count)) => (id, count),
379                            Err(_) => return, // Historical processing failed/cancelled
380                        },
381                        None => (None, 0),
382                    };
383
384                    let mut broadcast_rx = broadcast_rx;
385                    while let Ok(frame) = broadcast_rx.recv().await {
386                        // Filter by topic (exact match or wildcard)
387                        match options.topic.as_deref() {
388                            None | Some("*") => {}
389                            Some(topic) if topic.ends_with(".*") => {
390                                let prefix = &topic[..topic.len() - 1]; // "user.*" -> "user."
391                                if !frame.topic.starts_with(prefix) {
392                                    continue;
393                                }
394                            }
395                            Some(topic) => {
396                                if frame.topic != topic {
397                                    continue;
398                                }
399                            }
400                        }
401
402                        // Skip if we've already seen this frame during historical scan
403                        if let Some(last_scanned_id) = last_id {
404                            if frame.id <= last_scanned_id {
405                                continue;
406                            }
407                        }
408
409                        if tx.send(frame).await.is_err() {
410                            break;
411                        }
412
413                        if let Some(limit) = limit {
414                            count += 1;
415                            if count >= limit {
416                                break;
417                            }
418                        }
419                    }
420                });
421            }
422
423            // Handle heartbeat if requested
424            if let FollowOption::WithHeartbeat(duration) = options.follow {
425                let heartbeat_tx = tx;
426                tokio::spawn(async move {
427                    loop {
428                        tokio::time::sleep(duration).await;
429                        let frame = Frame::builder("xs.pulse")
430                            .id(scru128::new())
431                            .ttl(TTL::Ephemeral)
432                            .build();
433                        if heartbeat_tx.send(frame).await.is_err() {
434                            break;
435                        }
436                    }
437                });
438            }
439        }
440
441        rx
442    }
443
444    #[tracing::instrument(skip(self))]
445    pub fn read_sync(
446        &self,
447        after: Option<&Scru128Id>,
448        limit: Option<usize>,
449    ) -> impl Iterator<Item = Frame> + '_ {
450        self.iter_frames(after.map(|id| (id, false)))
451            .filter(move |frame| {
452                if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
453                    if is_expired(&frame.id, ttl) {
454                        let _ = self.gc_tx.send(GCTask::Remove(frame.id));
455                        return false;
456                    }
457                }
458                true
459            })
460            .take(limit.unwrap_or(usize::MAX))
461    }
462
463    /// Read frames synchronously with full ReadOptions support.
464    pub fn read_sync_with_options(&self, options: ReadOptions) -> impl Iterator<Item = Frame> + '_ {
465        let gc_tx = self.gc_tx.clone();
466
467        // Filter out expired frames
468        let filter_expired = move |frame: Frame, gc_tx: &UnboundedSender<GCTask>| {
469            if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
470                if is_expired(&frame.id, ttl) {
471                    let _ = gc_tx.send(GCTask::Remove(frame.id));
472                    return None;
473                }
474            }
475            Some(frame)
476        };
477
478        let frames: Vec<Frame> = if let Some(last_n) = options.last {
479            // Handle --last N: get the N most recent frames
480            let iter: Box<dyn Iterator<Item = Frame>> = match options.topic.as_deref() {
481                None | Some("*") => self.iter_frames_rev(),
482                Some(topic) if topic.ends_with(".*") => {
483                    let prefix = &topic[..topic.len() - 1];
484                    self.iter_frames_by_topic_prefix_rev(prefix)
485                }
486                Some(topic) => self.iter_frames_by_topic_rev(topic),
487            };
488
489            // Collect last N frames (in reverse order), skipping expired
490            let mut frames: Vec<Frame> = Vec::with_capacity(last_n);
491            for frame in iter {
492                if let Some(frame) = filter_expired(frame, &gc_tx) {
493                    frames.push(frame);
494                    if frames.len() >= last_n {
495                        break;
496                    }
497                }
498            }
499
500            // Reverse to chronological order
501            frames.reverse();
502            frames
503        } else {
504            // Normal forward iteration
505            let start_bound = options
506                .from
507                .as_ref()
508                .map(|id| (id, true))
509                .or_else(|| options.after.as_ref().map(|id| (id, false)));
510
511            let iter: Box<dyn Iterator<Item = Frame>> = match options.topic.as_deref() {
512                None | Some("*") => self.iter_frames(start_bound),
513                Some(topic) if topic.ends_with(".*") => {
514                    let prefix = &topic[..topic.len() - 1];
515                    self.iter_frames_by_topic_prefix(prefix, start_bound)
516                }
517                Some(topic) => self.iter_frames_by_topic(topic, start_bound),
518            };
519
520            iter.filter_map(|frame| filter_expired(frame, &gc_tx))
521                .take(options.limit.unwrap_or(usize::MAX))
522                .collect()
523        };
524
525        frames.into_iter()
526    }
527
528    pub fn get(&self, id: &Scru128Id) -> Option<Frame> {
529        self.stream
530            .get(id.to_bytes())
531            .unwrap()
532            .map(|value| deserialize_frame((id.as_bytes(), value)))
533    }
534
535    #[tracing::instrument(skip(self))]
536    pub fn last(&self, topic: &str) -> Option<Frame> {
537        if topic == "*" {
538            // All topics: scan all frames and find the one with the highest ID
539            self.idx_topic.iter().rev().find_map(|guard| {
540                let key = guard.key().ok()?;
541                self.get(&idx_topic_frame_id_from_key(&key))
542            })
543        } else if let Some(prefix) = topic.strip_suffix(".*") {
544            // Wildcard: find last frame matching prefix
545            let prefix_with_dot = format!("{}.", prefix);
546            self.iter_frames_by_topic_prefix(&prefix_with_dot, None)
547                .max_by_key(|f| f.id)
548        } else {
549            // Exact topic match
550            self.idx_topic
551                .prefix(idx_topic_key_prefix(topic))
552                .rev()
553                .find_map(|guard| {
554                    let key = guard.key().ok()?;
555                    self.get(&idx_topic_frame_id_from_key(&key))
556                })
557        }
558    }
559
560    #[tracing::instrument(skip(self), fields(id = %id.to_string()))]
561    pub fn remove(&self, id: &Scru128Id) -> Result<(), crate::error::Error> {
562        let Some(frame) = self.get(id) else {
563            // Already deleted
564            return Ok(());
565        };
566
567        // Build topic key directly (no validation - frame already exists)
568        let mut topic_key = idx_topic_key_prefix(&frame.topic);
569        topic_key.extend(frame.id.as_bytes());
570
571        // Get prefix index keys for hierarchical queries
572        let prefix_keys = idx_topic_prefix_keys(&frame.topic, &frame.id);
573
574        let mut batch = self.db.batch();
575        batch.remove(&self.stream, id.as_bytes());
576        batch.remove(&self.idx_topic, topic_key);
577        for prefix_key in &prefix_keys {
578            batch.remove(&self.idx_topic, prefix_key);
579        }
580        batch.commit()?;
581        self.db.persist(PersistMode::SyncAll)?;
582        Ok(())
583    }
584
585    pub async fn cas_reader(&self, hash: ssri::Integrity) -> cacache::Result<cacache::Reader> {
586        cacache::Reader::open_hash(&self.path.join("cacache"), hash).await
587    }
588
589    pub fn cas_reader_sync(&self, hash: ssri::Integrity) -> cacache::Result<cacache::SyncReader> {
590        cacache::SyncReader::open_hash(self.path.join("cacache"), hash)
591    }
592
593    pub async fn cas_writer(&self) -> cacache::Result<cacache::Writer> {
594        cacache::WriteOpts::new()
595            .open_hash(&self.path.join("cacache"))
596            .await
597    }
598
599    pub fn cas_writer_sync(&self) -> cacache::Result<cacache::SyncWriter> {
600        cacache::WriteOpts::new().open_hash_sync(self.path.join("cacache"))
601    }
602
603    pub async fn cas_insert(&self, content: impl AsRef<[u8]>) -> cacache::Result<ssri::Integrity> {
604        cacache::write_hash(&self.path.join("cacache"), content).await
605    }
606
607    pub fn cas_insert_sync(&self, content: impl AsRef<[u8]>) -> cacache::Result<ssri::Integrity> {
608        cacache::write_hash_sync(self.path.join("cacache"), content)
609    }
610
611    pub async fn cas_insert_bytes(&self, bytes: &[u8]) -> cacache::Result<ssri::Integrity> {
612        self.cas_insert(bytes).await
613    }
614
615    pub fn cas_insert_bytes_sync(&self, bytes: &[u8]) -> cacache::Result<ssri::Integrity> {
616        self.cas_insert_sync(bytes)
617    }
618
619    pub async fn cas_read(&self, hash: &ssri::Integrity) -> cacache::Result<Vec<u8>> {
620        cacache::read_hash(&self.path.join("cacache"), hash).await
621    }
622
623    pub fn cas_read_sync(&self, hash: &ssri::Integrity) -> cacache::Result<Vec<u8>> {
624        cacache::read_hash_sync(self.path.join("cacache"), hash)
625    }
626
627    #[tracing::instrument(skip(self))]
628    pub fn insert_frame(&self, frame: &Frame) -> Result<(), crate::error::Error> {
629        let encoded: Vec<u8> = serde_json::to_vec(&frame).unwrap();
630
631        // Get the index topic key (also validates topic)
632        let topic_key = idx_topic_key_from_frame(frame)?;
633
634        // Get prefix index keys for hierarchical queries
635        let prefix_keys = idx_topic_prefix_keys(&frame.topic, &frame.id);
636
637        let mut batch = self.db.batch();
638        batch.insert(&self.stream, frame.id.as_bytes(), encoded);
639        batch.insert(&self.idx_topic, topic_key, b"");
640        for prefix_key in &prefix_keys {
641            batch.insert(&self.idx_topic, prefix_key, b"");
642        }
643        batch.commit()?;
644        self.db.persist(PersistMode::SyncAll)?;
645        Ok(())
646    }
647
648    pub fn append(&self, mut frame: Frame) -> Result<Frame, crate::error::Error> {
649        // Serialize all appends to ensure ID generation, write, and broadcast
650        // happen atomically. This guarantees subscribers receive frames in
651        // scru128 ID order.
652        let _guard = self.append_lock.lock().unwrap();
653
654        frame.id = scru128::new();
655
656        // Check for null byte in topic (in case we're not storing the frame)
657        idx_topic_key_from_frame(&frame)?;
658
659        // only store the frame if it's not ephemeral
660        if frame.ttl != Some(TTL::Ephemeral) {
661            self.insert_frame(&frame)?;
662
663            // If this is a Last TTL, schedule a gc task
664            if let Some(TTL::Last(n)) = frame.ttl {
665                let _ = self.gc_tx.send(GCTask::CheckLastTTL {
666                    topic: frame.topic.clone(),
667                    keep: n,
668                });
669            }
670        }
671
672        let _ = self.broadcast_tx.send(frame.clone());
673        Ok(frame)
674    }
675
676    /// Iterate frames starting from a bound.
677    /// `start` is `(id, inclusive)` where inclusive=true means >= and inclusive=false means >.
678    fn iter_frames(
679        &self,
680        start: Option<(&Scru128Id, bool)>,
681    ) -> Box<dyn Iterator<Item = Frame> + '_> {
682        let range = match start {
683            Some((id, true)) => (Bound::Included(id.as_bytes().to_vec()), Bound::Unbounded),
684            Some((id, false)) => (Bound::Excluded(id.as_bytes().to_vec()), Bound::Unbounded),
685            None => (Bound::Unbounded, Bound::Unbounded),
686        };
687
688        Box::new(self.stream.range(range).filter_map(|guard| {
689            let (key, value) = guard.into_inner().ok()?;
690            Some(deserialize_frame((key, value)))
691        }))
692    }
693
694    /// Iterate frames in reverse order (most recent first).
695    fn iter_frames_rev(&self) -> Box<dyn Iterator<Item = Frame> + '_> {
696        Box::new(self.stream.iter().rev().filter_map(|guard| {
697            let (key, value) = guard.into_inner().ok()?;
698            Some(deserialize_frame((key, value)))
699        }))
700    }
701
702    /// Iterate frames by topic in reverse order (most recent first).
703    fn iter_frames_by_topic_rev<'a>(
704        &'a self,
705        topic: &'a str,
706    ) -> Box<dyn Iterator<Item = Frame> + 'a> {
707        let prefix = idx_topic_key_prefix(topic);
708        Box::new(
709            self.idx_topic
710                .prefix(prefix)
711                .rev()
712                .filter_map(move |guard| {
713                    let key = guard.key().ok()?;
714                    let frame_id = idx_topic_frame_id_from_key(&key);
715                    self.get(&frame_id)
716                }),
717        )
718    }
719
720    /// Iterate frames by topic prefix in reverse order (most recent first).
721    fn iter_frames_by_topic_prefix_rev<'a>(
722        &'a self,
723        prefix: &'a str,
724    ) -> Box<dyn Iterator<Item = Frame> + 'a> {
725        let mut index_prefix = Vec::with_capacity(prefix.len() + 1);
726        index_prefix.extend(prefix.as_bytes());
727        index_prefix.push(NULL_DELIMITER);
728
729        Box::new(
730            self.idx_topic
731                .prefix(index_prefix)
732                .rev()
733                .filter_map(move |guard| {
734                    let key = guard.key().ok()?;
735                    let frame_id = idx_topic_frame_id_from_key(&key);
736                    self.get(&frame_id)
737                }),
738        )
739    }
740
741    fn iter_frames_by_topic<'a>(
742        &'a self,
743        topic: &'a str,
744        start: Option<(&'a Scru128Id, bool)>,
745    ) -> Box<dyn Iterator<Item = Frame> + 'a> {
746        let prefix = idx_topic_key_prefix(topic);
747        Box::new(self.idx_topic.prefix(prefix).filter_map(move |guard| {
748            let key = guard.key().ok()?;
749            let frame_id = idx_topic_frame_id_from_key(&key);
750            if let Some((bound_id, inclusive)) = start {
751                if inclusive {
752                    if frame_id < *bound_id {
753                        return None;
754                    }
755                } else if frame_id <= *bound_id {
756                    return None;
757                }
758            }
759            self.get(&frame_id)
760        }))
761    }
762
763    /// Iterate frames matching a topic prefix (for wildcard queries like "user.*").
764    /// The prefix should include the trailing dot (e.g., "user." for "user.*").
765    fn iter_frames_by_topic_prefix<'a>(
766        &'a self,
767        prefix: &'a str,
768        start: Option<(&'a Scru128Id, bool)>,
769    ) -> Box<dyn Iterator<Item = Frame> + 'a> {
770        // Build index prefix: "user.\0" for scanning all "user.*" entries
771        let mut index_prefix = Vec::with_capacity(prefix.len() + 1);
772        index_prefix.extend(prefix.as_bytes());
773        index_prefix.push(NULL_DELIMITER);
774
775        Box::new(
776            self.idx_topic
777                .prefix(index_prefix)
778                .filter_map(move |guard| {
779                    let key = guard.key().ok()?;
780                    let frame_id = idx_topic_frame_id_from_key(&key);
781                    if let Some((bound_id, inclusive)) = start {
782                        if inclusive {
783                            if frame_id < *bound_id {
784                                return None;
785                            }
786                        } else if frame_id <= *bound_id {
787                            return None;
788                        }
789                    }
790                    self.get(&frame_id)
791                }),
792        )
793    }
794}
795
796fn spawn_gc_worker(mut gc_rx: UnboundedReceiver<GCTask>, store: Store) {
797    std::thread::spawn(move || {
798        while let Some(task) = gc_rx.blocking_recv() {
799            match task {
800                GCTask::Remove(id) => {
801                    let _ = store.remove(&id);
802                }
803
804                GCTask::CheckLastTTL { topic, keep } => {
805                    let prefix = idx_topic_key_prefix(&topic);
806                    let frames_to_remove: Vec<_> = store
807                        .idx_topic
808                        .prefix(&prefix)
809                        .rev() // Scan from newest to oldest
810                        .skip(keep as usize)
811                        .filter_map(|guard| {
812                            let key = guard.key().ok()?;
813                            Some(Scru128Id::from_bytes(
814                                idx_topic_frame_id_from_key(&key).into(),
815                            ))
816                        })
817                        .collect();
818
819                    for frame_id in frames_to_remove {
820                        let _ = store.remove(&frame_id);
821                    }
822                }
823
824                GCTask::Drain(tx) => {
825                    let _ = tx.send(());
826                }
827            }
828        }
829    });
830}
831
832fn is_expired(id: &Scru128Id, ttl: &Duration) -> bool {
833    let created_ms = id.timestamp();
834    let expires_ms = created_ms.saturating_add(ttl.as_millis() as u64);
835    let now_ms = std::time::SystemTime::now()
836        .duration_since(std::time::UNIX_EPOCH)
837        .unwrap()
838        .as_millis() as u64;
839
840    now_ms >= expires_ms
841}
842
843const NULL_DELIMITER: u8 = 0;
844const MAX_TOPIC_LENGTH: usize = 255;
845
846/// Validates a topic name according to ADR 0001.
847/// - Allowed characters: a-z A-Z 0-9 _ - .
848/// - Must start with: a-z A-Z 0-9 _
849/// - Cannot be empty, cannot end with '.', max 255 bytes
850pub fn validate_topic(topic: &str) -> Result<(), crate::error::Error> {
851    if topic.is_empty() {
852        return Err("Topic cannot be empty".to_string().into());
853    }
854    if topic.len() > MAX_TOPIC_LENGTH {
855        return Err(format!("Topic exceeds max length of {MAX_TOPIC_LENGTH} bytes").into());
856    }
857    if topic.ends_with('.') {
858        return Err("Topic cannot end with '.'".to_string().into());
859    }
860    if topic.contains("..") {
861        return Err("Topic cannot contain consecutive dots".to_string().into());
862    }
863
864    let bytes = topic.as_bytes();
865    let first = bytes[0];
866    if !first.is_ascii_alphanumeric() && first != b'_' {
867        return Err("Topic must start with a-z, A-Z, 0-9, or _"
868            .to_string()
869            .into());
870    }
871
872    for &b in bytes {
873        if !b.is_ascii_alphanumeric() && b != b'_' && b != b'-' && b != b'.' {
874            return Err(format!(
875                "Topic contains invalid character: '{}'. Allowed: a-z A-Z 0-9 _ - .",
876                b as char
877            )
878            .into());
879        }
880    }
881
882    Ok(())
883}
884
885/// Validates a topic query (for --topic flag).
886/// Allows wildcards: "*" (match all) or "prefix.*" (match children).
887pub fn validate_topic_query(topic: &str) -> Result<(), crate::error::Error> {
888    if topic == "*" {
889        return Ok(());
890    }
891    if let Some(prefix) = topic.strip_suffix(".*") {
892        // Validate the prefix part (e.g., "user" in "user.*")
893        // Prefix can be empty edge case: ".*" is not valid
894        if prefix.is_empty() {
895            return Err("Wildcard '.*' requires a prefix".to_string().into());
896        }
897        validate_topic(prefix)
898    } else {
899        validate_topic(topic)
900    }
901}
902
903/// Generate prefix index keys for hierarchical topic queries.
904/// For topic "user.id1.messages", returns keys for prefixes "user." and "user.id1."
905fn idx_topic_prefix_keys(topic: &str, frame_id: &scru128::Scru128Id) -> Vec<Vec<u8>> {
906    let mut keys = Vec::new();
907    let mut pos = 0;
908    while let Some(dot_pos) = topic[pos..].find('.') {
909        let prefix = &topic[..pos + dot_pos + 1]; // include the dot
910        let mut key = Vec::with_capacity(prefix.len() + 1 + 16);
911        key.extend(prefix.as_bytes());
912        key.push(NULL_DELIMITER);
913        key.extend(frame_id.as_bytes());
914        keys.push(key);
915        pos += dot_pos + 1;
916    }
917    keys
918}
919
920fn idx_topic_key_prefix(topic: &str) -> Vec<u8> {
921    let mut v = Vec::with_capacity(topic.len() + 1); // topic bytes + delimiter
922    v.extend(topic.as_bytes()); // topic string as UTF-8 bytes
923    v.push(NULL_DELIMITER); // Delimiter for variable-sized keys
924    v
925}
926
927pub(crate) fn idx_topic_key_from_frame(frame: &Frame) -> Result<Vec<u8>, crate::error::Error> {
928    validate_topic(&frame.topic)?;
929    let mut v = idx_topic_key_prefix(&frame.topic);
930    v.extend(frame.id.as_bytes());
931    Ok(v)
932}
933
934fn idx_topic_frame_id_from_key(key: &[u8]) -> Scru128Id {
935    let frame_id_bytes = &key[key.len() - 16..];
936    Scru128Id::from_bytes(frame_id_bytes.try_into().unwrap())
937}
938
939fn deserialize_frame<B1: AsRef<[u8]> + std::fmt::Debug, B2: AsRef<[u8]>>(
940    record: (B1, B2),
941) -> Frame {
942    serde_json::from_slice(record.1.as_ref()).unwrap_or_else(|e| {
943        // Try to convert the key to a Scru128Id and print in a format that can be copied for deletion
944        let key_bytes = record.0.as_ref();
945        if key_bytes.len() == 16 {
946            if let Ok(bytes) = key_bytes.try_into() {
947                let id = Scru128Id::from_bytes(bytes);
948                eprintln!("CORRUPTED_RECORD_ID: {id}");
949            }
950        }
951        let key = std::str::from_utf8(record.0.as_ref()).unwrap();
952        let value = std::str::from_utf8(record.1.as_ref()).unwrap();
953        panic!("Failed to deserialize frame: {e} {key} {value}")
954    })
955}