Skip to main content

xs/store/
mod.rs

1mod ttl;
2pub use ttl::*;
3
4#[cfg(test)]
5mod tests;
6
7use std::ops::Bound;
8use std::path::PathBuf;
9use std::time::Duration;
10
11use tokio::sync::broadcast;
12use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
13
14use std::sync::{Arc, Mutex};
15
16use scru128::Scru128Id;
17
18use serde::{Deserialize, Deserializer, Serialize};
19
20use fjall::{
21    config::{BlockSizePolicy, HashRatioPolicy},
22    Database, Keyspace, KeyspaceCreateOptions, PersistMode,
23};
24
25#[derive(PartialEq, Eq, Serialize, Deserialize, Clone, Default, bon::Builder)]
26pub struct Frame {
27    #[builder(start_fn, into)]
28    pub topic: String,
29    #[builder(default)]
30    pub id: Scru128Id,
31    pub hash: Option<ssri::Integrity>,
32    pub meta: Option<serde_json::Value>,
33    pub ttl: Option<TTL>,
34}
35
36use std::fmt;
37
38impl fmt::Debug for Frame {
39    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
40        f.debug_struct("Frame")
41            .field("id", &format!("{id}", id = self.id))
42            .field("topic", &self.topic)
43            .field("hash", &self.hash.as_ref().map(|x| format!("{x}")))
44            .field("meta", &self.meta)
45            .field("ttl", &self.ttl)
46            .finish()
47    }
48}
49
50impl<'de> Deserialize<'de> for FollowOption {
51    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
52    where
53        D: Deserializer<'de>,
54    {
55        let s: String = Deserialize::deserialize(deserializer)?;
56        if s.is_empty() || s == "yes" {
57            Ok(FollowOption::On)
58        } else if let Ok(duration) = s.parse::<u64>() {
59            Ok(FollowOption::WithHeartbeat(Duration::from_millis(duration)))
60        } else {
61            match s.as_str() {
62                "true" => Ok(FollowOption::On),
63                "false" | "no" => Ok(FollowOption::Off),
64                _ => Err(serde::de::Error::custom("Invalid value for follow option")),
65            }
66        }
67    }
68}
69
70fn deserialize_bool<'de, D>(deserializer: D) -> Result<bool, D::Error>
71where
72    D: Deserializer<'de>,
73{
74    let s: String = Deserialize::deserialize(deserializer)?;
75    match s.as_str() {
76        "false" | "no" | "0" => Ok(false),
77        _ => Ok(true),
78    }
79}
80
81#[derive(PartialEq, Deserialize, Clone, Debug, Default, bon::Builder)]
82pub struct ReadOptions {
83    #[serde(default)]
84    #[builder(default)]
85    pub follow: FollowOption,
86    #[serde(default, deserialize_with = "deserialize_bool")]
87    #[builder(default)]
88    pub new: bool,
89    /// Start after this ID (exclusive)
90    #[serde(rename = "after")]
91    pub after: Option<Scru128Id>,
92    /// Start from this ID (inclusive)
93    pub from: Option<Scru128Id>,
94    pub limit: Option<usize>,
95    /// Return the last N frames (most recent)
96    pub last: Option<usize>,
97    pub topic: Option<String>,
98}
99
100impl ReadOptions {
101    pub fn from_query(query: Option<&str>) -> Result<Self, crate::error::Error> {
102        match query {
103            Some(q) => Ok(serde_urlencoded::from_str(q)?),
104            None => Ok(Self::default()),
105        }
106    }
107
108    pub fn to_query_string(&self) -> String {
109        let mut params = Vec::new();
110
111        // Add follow parameter with heartbeat if specified
112        match self.follow {
113            FollowOption::Off => {}
114            FollowOption::On => params.push(("follow", "true".to_string())),
115            FollowOption::WithHeartbeat(duration) => {
116                params.push(("follow", duration.as_millis().to_string()));
117            }
118        }
119
120        // Add new if true
121        if self.new {
122            params.push(("new", "true".to_string()));
123        }
124
125        // Add after if present
126        if let Some(after) = self.after {
127            params.push(("after", after.to_string()));
128        }
129
130        // Add from if present
131        if let Some(from) = self.from {
132            params.push(("from", from.to_string()));
133        }
134
135        // Add limit if present
136        if let Some(limit) = self.limit {
137            params.push(("limit", limit.to_string()));
138        }
139
140        // Add last if present
141        if let Some(last) = self.last {
142            params.push(("last", last.to_string()));
143        }
144
145        if let Some(topic) = &self.topic {
146            params.push(("topic", topic.clone()));
147        }
148
149        // Return empty string if no params
150        if params.is_empty() {
151            String::new()
152        } else {
153            url::form_urlencoded::Serializer::new(String::new())
154                .extend_pairs(params)
155                .finish()
156        }
157    }
158}
159
/// Whether a read keeps streaming frames that are appended after it starts.
#[derive(Clone, Debug, Default, PartialEq)]
pub enum FollowOption {
    /// Do not follow (the default).
    #[default]
    Off,
    /// Follow newly appended frames.
    On,
    /// Follow, and additionally emit a heartbeat frame at the given interval.
    WithHeartbeat(Duration),
}
167
168#[derive(Debug)]
169enum GCTask {
170    Remove(Scru128Id),
171    CheckLastTTL { topic: String, keep: u32 },
172    Drain(tokio::sync::oneshot::Sender<()>),
173}
174
175#[derive(Clone)]
176pub struct Store {
177    pub path: PathBuf,
178    db: Database,
179    stream: Keyspace,
180    idx_topic: Keyspace,
181    broadcast_tx: broadcast::Sender<Frame>,
182    gc_tx: UnboundedSender<GCTask>,
183    append_lock: Arc<Mutex<()>>,
184}
185
186impl Store {
187    pub fn new(path: PathBuf) -> Store {
188        let db = Database::builder(path.join("fjall"))
189            .cache_size(32 * 1024 * 1024) // 32 MiB
190            .worker_threads(1)
191            .open()
192            .unwrap();
193
194        // Options for stream keyspace: point reads by frame ID
195        let stream_opts = || {
196            KeyspaceCreateOptions::default()
197                .max_memtable_size(8 * 1024 * 1024) // 8 MiB
198                .data_block_size_policy(BlockSizePolicy::all(16 * 1024)) // 16 KiB
199                .data_block_hash_ratio_policy(HashRatioPolicy::all(8.0))
200                .expect_point_read_hits(true)
201        };
202
203        // Options for idx_topic keyspace: prefix scans only
204        let idx_opts = || {
205            KeyspaceCreateOptions::default()
206                .max_memtable_size(8 * 1024 * 1024) // 8 MiB
207                .data_block_size_policy(BlockSizePolicy::all(16 * 1024)) // 16 KiB
208                .data_block_hash_ratio_policy(HashRatioPolicy::all(0.0)) // no point reads
209                .expect_point_read_hits(true)
210        };
211
212        let stream = db.keyspace("stream", stream_opts).unwrap();
213        let idx_topic = db.keyspace("idx_topic", idx_opts).unwrap();
214
215        let (broadcast_tx, _) = broadcast::channel(1024);
216        let (gc_tx, gc_rx) = mpsc::unbounded_channel();
217
218        let store = Store {
219            path: path.clone(),
220            db,
221            stream,
222            idx_topic,
223            broadcast_tx,
224            gc_tx,
225            append_lock: Arc::new(Mutex::new(())),
226        };
227
228        // Spawn gc worker thread
229        spawn_gc_worker(gc_rx, store.clone());
230
231        store
232    }
233
234    pub async fn wait_for_gc(&self) {
235        let (tx, rx) = tokio::sync::oneshot::channel();
236        let _ = self.gc_tx.send(GCTask::Drain(tx));
237        let _ = rx.await;
238    }
239
240    #[tracing::instrument(skip(self))]
241    pub async fn read(&self, options: ReadOptions) -> tokio::sync::mpsc::Receiver<Frame> {
242        let (tx, rx) = tokio::sync::mpsc::channel(100);
243
244        let should_follow = matches!(
245            options.follow,
246            FollowOption::On | FollowOption::WithHeartbeat(_)
247        );
248
249        // Only take broadcast subscription if following. We initate the subscription here to
250        // ensure we don't miss any messages between historical processing and starting the
251        // broadcast subscription.
252        let broadcast_rx = if should_follow {
253            Some(self.broadcast_tx.subscribe())
254        } else {
255            None
256        };
257
258        // Only create done channel if we're doing historical processing
259        let done_rx = if !options.new {
260            let (done_tx, done_rx) = tokio::sync::oneshot::channel();
261            let tx_clone = tx.clone();
262            let store = self.clone();
263            let options = options.clone();
264            let should_follow_clone = should_follow;
265            let gc_tx = self.gc_tx.clone();
266
267            // Spawn OS thread to handle historical events
268            std::thread::spawn(move || {
269                let mut last_id = None;
270                let mut count = 0;
271
272                // Handle --last N: get the N most recent frames
273                if let Some(last_n) = options.last {
274                    let iter: Box<dyn Iterator<Item = Frame>> = match options.topic.as_deref() {
275                        None | Some("*") => store.iter_frames_rev(),
276                        Some(topic) if topic.ends_with(".*") => {
277                            let prefix = &topic[..topic.len() - 1];
278                            store.iter_frames_by_topic_prefix_rev(prefix)
279                        }
280                        Some(topic) => store.iter_frames_by_topic_rev(topic),
281                    };
282
283                    // Collect last N frames (in reverse order), skipping expired
284                    let mut frames: Vec<Frame> = Vec::with_capacity(last_n);
285                    for frame in iter {
286                        if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
287                            if is_expired(&frame.id, ttl) {
288                                let _ = gc_tx.send(GCTask::Remove(frame.id));
289                                continue;
290                            }
291                        }
292                        frames.push(frame);
293                        if frames.len() >= last_n {
294                            break;
295                        }
296                    }
297
298                    // Reverse to chronological order and send
299                    for frame in frames.into_iter().rev() {
300                        last_id = Some(frame.id);
301                        count += 1;
302                        if tx_clone.blocking_send(frame).is_err() {
303                            return;
304                        }
305                    }
306                } else {
307                    // Normal forward iteration
308                    // Determine start bound: from (inclusive) takes precedence over after (exclusive)
309                    let start_bound = options
310                        .from
311                        .as_ref()
312                        .map(|id| (id, true))
313                        .or_else(|| options.after.as_ref().map(|id| (id, false)));
314
315                    let iter: Box<dyn Iterator<Item = Frame>> = match options.topic.as_deref() {
316                        None | Some("*") => store.iter_frames(start_bound),
317                        Some(topic) if topic.ends_with(".*") => {
318                            // Wildcard: "user.*" -> prefix "user."
319                            let prefix = &topic[..topic.len() - 1]; // strip "*", keep "."
320                            store.iter_frames_by_topic_prefix(prefix, start_bound)
321                        }
322                        Some(topic) => store.iter_frames_by_topic(topic, start_bound),
323                    };
324
325                    for frame in iter {
326                        if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
327                            if is_expired(&frame.id, ttl) {
328                                let _ = gc_tx.send(GCTask::Remove(frame.id));
329                                continue;
330                            }
331                        }
332
333                        last_id = Some(frame.id);
334
335                        if let Some(limit) = options.limit {
336                            if count >= limit {
337                                return; // Exit early if limit reached
338                            }
339                        }
340
341                        if tx_clone.blocking_send(frame).is_err() {
342                            return;
343                        }
344                        count += 1;
345                    }
346                }
347
348                // Send threshold message if following
349                if should_follow_clone {
350                    let threshold = Frame::builder("xs.threshold")
351                        .id(scru128::new())
352                        .ttl(TTL::Ephemeral)
353                        .build();
354                    if tx_clone.blocking_send(threshold).is_err() {
355                        return;
356                    }
357                }
358
359                // Signal completion with the last seen ID and count
360                let _ = done_tx.send((last_id, count));
361            });
362
363            Some(done_rx)
364        } else {
365            None
366        };
367
368        // Handle broadcast subscription and heartbeat
369        if let Some(broadcast_rx) = broadcast_rx {
370            {
371                let tx = tx.clone();
372                let limit = options.limit;
373
374                tokio::spawn(async move {
375                    // If we have a done_rx, wait for historical processing
376                    let (last_id, mut count) = match done_rx {
377                        Some(done_rx) => match done_rx.await {
378                            Ok((id, count)) => (id, count),
379                            Err(_) => return, // Historical processing failed/cancelled
380                        },
381                        None => (None, 0),
382                    };
383
384                    let mut broadcast_rx = broadcast_rx;
385                    while let Ok(frame) = broadcast_rx.recv().await {
386                        // Filter by topic (exact match or wildcard)
387                        match options.topic.as_deref() {
388                            None | Some("*") => {}
389                            Some(topic) if topic.ends_with(".*") => {
390                                let prefix = &topic[..topic.len() - 1]; // "user.*" -> "user."
391                                if !frame.topic.starts_with(prefix) {
392                                    continue;
393                                }
394                            }
395                            Some(topic) => {
396                                if frame.topic != topic {
397                                    continue;
398                                }
399                            }
400                        }
401
402                        // Skip if we've already seen this frame during historical scan
403                        if let Some(last_scanned_id) = last_id {
404                            if frame.id <= last_scanned_id {
405                                continue;
406                            }
407                        }
408
409                        if tx.send(frame).await.is_err() {
410                            break;
411                        }
412
413                        if let Some(limit) = limit {
414                            count += 1;
415                            if count >= limit {
416                                break;
417                            }
418                        }
419                    }
420                });
421            }
422
423            // Handle heartbeat if requested
424            if let FollowOption::WithHeartbeat(duration) = options.follow {
425                let heartbeat_tx = tx;
426                tokio::spawn(async move {
427                    loop {
428                        tokio::time::sleep(duration).await;
429                        let frame = Frame::builder("xs.pulse")
430                            .id(scru128::new())
431                            .ttl(TTL::Ephemeral)
432                            .build();
433                        if heartbeat_tx.send(frame).await.is_err() {
434                            break;
435                        }
436                    }
437                });
438            }
439        }
440
441        rx
442    }
443
444    /// Read frames synchronously with full ReadOptions support.
445    pub fn read_sync(&self, options: ReadOptions) -> impl Iterator<Item = Frame> + '_ {
446        let gc_tx = self.gc_tx.clone();
447
448        // Filter out expired frames
449        let filter_expired = move |frame: Frame, gc_tx: &UnboundedSender<GCTask>| {
450            if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
451                if is_expired(&frame.id, ttl) {
452                    let _ = gc_tx.send(GCTask::Remove(frame.id));
453                    return None;
454                }
455            }
456            Some(frame)
457        };
458
459        let frames: Vec<Frame> = if let Some(last_n) = options.last {
460            // Handle --last N: get the N most recent frames
461            let iter: Box<dyn Iterator<Item = Frame>> = match options.topic.as_deref() {
462                None | Some("*") => self.iter_frames_rev(),
463                Some(topic) if topic.ends_with(".*") => {
464                    let prefix = &topic[..topic.len() - 1];
465                    self.iter_frames_by_topic_prefix_rev(prefix)
466                }
467                Some(topic) => self.iter_frames_by_topic_rev(topic),
468            };
469
470            // Collect last N frames (in reverse order), skipping expired
471            let mut frames: Vec<Frame> = Vec::with_capacity(last_n);
472            for frame in iter {
473                if let Some(frame) = filter_expired(frame, &gc_tx) {
474                    frames.push(frame);
475                    if frames.len() >= last_n {
476                        break;
477                    }
478                }
479            }
480
481            // Reverse to chronological order
482            frames.reverse();
483            frames
484        } else {
485            // Normal forward iteration
486            let start_bound = options
487                .from
488                .as_ref()
489                .map(|id| (id, true))
490                .or_else(|| options.after.as_ref().map(|id| (id, false)));
491
492            let iter: Box<dyn Iterator<Item = Frame>> = match options.topic.as_deref() {
493                None | Some("*") => self.iter_frames(start_bound),
494                Some(topic) if topic.ends_with(".*") => {
495                    let prefix = &topic[..topic.len() - 1];
496                    self.iter_frames_by_topic_prefix(prefix, start_bound)
497                }
498                Some(topic) => self.iter_frames_by_topic(topic, start_bound),
499            };
500
501            iter.filter_map(|frame| filter_expired(frame, &gc_tx))
502                .take(options.limit.unwrap_or(usize::MAX))
503                .collect()
504        };
505
506        frames.into_iter()
507    }
508
509    pub fn get(&self, id: &Scru128Id) -> Option<Frame> {
510        self.stream
511            .get(id.to_bytes())
512            .unwrap()
513            .map(|value| deserialize_frame((id.as_bytes(), value)))
514    }
515
516    #[tracing::instrument(skip(self), fields(id = %id.to_string()))]
517    pub fn remove(&self, id: &Scru128Id) -> Result<(), crate::error::Error> {
518        let Some(frame) = self.get(id) else {
519            // Already deleted
520            return Ok(());
521        };
522
523        // Build topic key directly (no validation - frame already exists)
524        let mut topic_key = idx_topic_key_prefix(&frame.topic);
525        topic_key.extend(frame.id.as_bytes());
526
527        // Get prefix index keys for hierarchical queries
528        let prefix_keys = idx_topic_prefix_keys(&frame.topic, &frame.id);
529
530        let mut batch = self.db.batch();
531        batch.remove(&self.stream, id.as_bytes());
532        batch.remove(&self.idx_topic, topic_key);
533        for prefix_key in &prefix_keys {
534            batch.remove(&self.idx_topic, prefix_key);
535        }
536        batch.commit()?;
537        self.db.persist(PersistMode::SyncAll)?;
538        Ok(())
539    }
540
541    pub async fn cas_reader(&self, hash: ssri::Integrity) -> cacache::Result<cacache::Reader> {
542        cacache::Reader::open_hash(&self.path.join("cacache"), hash).await
543    }
544
545    pub fn cas_reader_sync(&self, hash: ssri::Integrity) -> cacache::Result<cacache::SyncReader> {
546        cacache::SyncReader::open_hash(self.path.join("cacache"), hash)
547    }
548
549    pub async fn cas_writer(&self) -> cacache::Result<cacache::Writer> {
550        cacache::WriteOpts::new()
551            .open_hash(&self.path.join("cacache"))
552            .await
553    }
554
555    pub fn cas_writer_sync(&self) -> cacache::Result<cacache::SyncWriter> {
556        cacache::WriteOpts::new().open_hash_sync(self.path.join("cacache"))
557    }
558
559    pub async fn cas_insert(&self, content: impl AsRef<[u8]>) -> cacache::Result<ssri::Integrity> {
560        cacache::write_hash(&self.path.join("cacache"), content).await
561    }
562
563    pub fn cas_insert_sync(&self, content: impl AsRef<[u8]>) -> cacache::Result<ssri::Integrity> {
564        cacache::write_hash_sync(self.path.join("cacache"), content)
565    }
566
567    pub async fn cas_insert_bytes(&self, bytes: &[u8]) -> cacache::Result<ssri::Integrity> {
568        self.cas_insert(bytes).await
569    }
570
571    pub fn cas_insert_bytes_sync(&self, bytes: &[u8]) -> cacache::Result<ssri::Integrity> {
572        self.cas_insert_sync(bytes)
573    }
574
575    pub async fn cas_read(&self, hash: &ssri::Integrity) -> cacache::Result<Vec<u8>> {
576        cacache::read_hash(&self.path.join("cacache"), hash).await
577    }
578
579    pub fn cas_read_sync(&self, hash: &ssri::Integrity) -> cacache::Result<Vec<u8>> {
580        cacache::read_hash_sync(self.path.join("cacache"), hash)
581    }
582
583    #[tracing::instrument(skip(self))]
584    pub fn insert_frame(&self, frame: &Frame) -> Result<(), crate::error::Error> {
585        let encoded: Vec<u8> = serde_json::to_vec(&frame).unwrap();
586
587        // Get the index topic key (also validates topic)
588        let topic_key = idx_topic_key_from_frame(frame)?;
589
590        // Get prefix index keys for hierarchical queries
591        let prefix_keys = idx_topic_prefix_keys(&frame.topic, &frame.id);
592
593        let mut batch = self.db.batch();
594        batch.insert(&self.stream, frame.id.as_bytes(), encoded);
595        batch.insert(&self.idx_topic, topic_key, b"");
596        for prefix_key in &prefix_keys {
597            batch.insert(&self.idx_topic, prefix_key, b"");
598        }
599        batch.commit()?;
600        self.db.persist(PersistMode::SyncAll)?;
601        Ok(())
602    }
603
604    pub fn append(&self, mut frame: Frame) -> Result<Frame, crate::error::Error> {
605        // Serialize all appends to ensure ID generation, write, and broadcast
606        // happen atomically. This guarantees subscribers receive frames in
607        // scru128 ID order.
608        let _guard = self.append_lock.lock().unwrap();
609
610        frame.id = scru128::new();
611
612        // Check for null byte in topic (in case we're not storing the frame)
613        idx_topic_key_from_frame(&frame)?;
614
615        // only store the frame if it's not ephemeral
616        if frame.ttl != Some(TTL::Ephemeral) {
617            self.insert_frame(&frame)?;
618
619            // If this is a Last TTL, schedule a gc task
620            if let Some(TTL::Last(n)) = frame.ttl {
621                let _ = self.gc_tx.send(GCTask::CheckLastTTL {
622                    topic: frame.topic.clone(),
623                    keep: n,
624                });
625            }
626        }
627
628        let _ = self.broadcast_tx.send(frame.clone());
629        Ok(frame)
630    }
631
632    /// Iterate frames starting from a bound.
633    /// `start` is `(id, inclusive)` where inclusive=true means >= and inclusive=false means >.
634    fn iter_frames(
635        &self,
636        start: Option<(&Scru128Id, bool)>,
637    ) -> Box<dyn Iterator<Item = Frame> + '_> {
638        let range = match start {
639            Some((id, true)) => (Bound::Included(id.as_bytes().to_vec()), Bound::Unbounded),
640            Some((id, false)) => (Bound::Excluded(id.as_bytes().to_vec()), Bound::Unbounded),
641            None => (Bound::Unbounded, Bound::Unbounded),
642        };
643
644        Box::new(self.stream.range(range).filter_map(|guard| {
645            let (key, value) = guard.into_inner().ok()?;
646            Some(deserialize_frame((key, value)))
647        }))
648    }
649
650    /// Iterate frames in reverse order (most recent first).
651    fn iter_frames_rev(&self) -> Box<dyn Iterator<Item = Frame> + '_> {
652        Box::new(self.stream.iter().rev().filter_map(|guard| {
653            let (key, value) = guard.into_inner().ok()?;
654            Some(deserialize_frame((key, value)))
655        }))
656    }
657
658    /// Iterate frames by topic in reverse order (most recent first).
659    fn iter_frames_by_topic_rev<'a>(
660        &'a self,
661        topic: &'a str,
662    ) -> Box<dyn Iterator<Item = Frame> + 'a> {
663        let prefix = idx_topic_key_prefix(topic);
664        Box::new(
665            self.idx_topic
666                .prefix(prefix)
667                .rev()
668                .filter_map(move |guard| {
669                    let key = guard.key().ok()?;
670                    let frame_id = idx_topic_frame_id_from_key(&key);
671                    self.get(&frame_id)
672                }),
673        )
674    }
675
676    /// Iterate frames by topic prefix in reverse order (most recent first).
677    fn iter_frames_by_topic_prefix_rev<'a>(
678        &'a self,
679        prefix: &'a str,
680    ) -> Box<dyn Iterator<Item = Frame> + 'a> {
681        let mut index_prefix = Vec::with_capacity(prefix.len() + 1);
682        index_prefix.extend(prefix.as_bytes());
683        index_prefix.push(NULL_DELIMITER);
684
685        Box::new(
686            self.idx_topic
687                .prefix(index_prefix)
688                .rev()
689                .filter_map(move |guard| {
690                    let key = guard.key().ok()?;
691                    let frame_id = idx_topic_frame_id_from_key(&key);
692                    self.get(&frame_id)
693                }),
694        )
695    }
696
697    fn iter_frames_by_topic<'a>(
698        &'a self,
699        topic: &'a str,
700        start: Option<(&'a Scru128Id, bool)>,
701    ) -> Box<dyn Iterator<Item = Frame> + 'a> {
702        let prefix = idx_topic_key_prefix(topic);
703        Box::new(self.idx_topic.prefix(prefix).filter_map(move |guard| {
704            let key = guard.key().ok()?;
705            let frame_id = idx_topic_frame_id_from_key(&key);
706            if let Some((bound_id, inclusive)) = start {
707                if inclusive {
708                    if frame_id < *bound_id {
709                        return None;
710                    }
711                } else if frame_id <= *bound_id {
712                    return None;
713                }
714            }
715            self.get(&frame_id)
716        }))
717    }
718
719    /// Iterate frames matching a topic prefix (for wildcard queries like "user.*").
720    /// The prefix should include the trailing dot (e.g., "user." for "user.*").
721    fn iter_frames_by_topic_prefix<'a>(
722        &'a self,
723        prefix: &'a str,
724        start: Option<(&'a Scru128Id, bool)>,
725    ) -> Box<dyn Iterator<Item = Frame> + 'a> {
726        // Build index prefix: "user.\0" for scanning all "user.*" entries
727        let mut index_prefix = Vec::with_capacity(prefix.len() + 1);
728        index_prefix.extend(prefix.as_bytes());
729        index_prefix.push(NULL_DELIMITER);
730
731        Box::new(
732            self.idx_topic
733                .prefix(index_prefix)
734                .filter_map(move |guard| {
735                    let key = guard.key().ok()?;
736                    let frame_id = idx_topic_frame_id_from_key(&key);
737                    if let Some((bound_id, inclusive)) = start {
738                        if inclusive {
739                            if frame_id < *bound_id {
740                                return None;
741                            }
742                        } else if frame_id <= *bound_id {
743                            return None;
744                        }
745                    }
746                    self.get(&frame_id)
747                }),
748        )
749    }
750}
751
752fn spawn_gc_worker(mut gc_rx: UnboundedReceiver<GCTask>, store: Store) {
753    std::thread::spawn(move || {
754        while let Some(task) = gc_rx.blocking_recv() {
755            match task {
756                GCTask::Remove(id) => {
757                    let _ = store.remove(&id);
758                }
759
760                GCTask::CheckLastTTL { topic, keep } => {
761                    let prefix = idx_topic_key_prefix(&topic);
762                    let frames_to_remove: Vec<_> = store
763                        .idx_topic
764                        .prefix(&prefix)
765                        .rev() // Scan from newest to oldest
766                        .skip(keep as usize)
767                        .filter_map(|guard| {
768                            let key = guard.key().ok()?;
769                            Some(Scru128Id::from_bytes(
770                                idx_topic_frame_id_from_key(&key).into(),
771                            ))
772                        })
773                        .collect();
774
775                    for frame_id in frames_to_remove {
776                        let _ = store.remove(&frame_id);
777                    }
778                }
779
780                GCTask::Drain(tx) => {
781                    let _ = tx.send(());
782                }
783            }
784        }
785    });
786}
787
788fn is_expired(id: &Scru128Id, ttl: &Duration) -> bool {
789    let created_ms = id.timestamp();
790    let expires_ms = created_ms.saturating_add(ttl.as_millis() as u64);
791    let now_ms = std::time::SystemTime::now()
792        .duration_since(std::time::UNIX_EPOCH)
793        .unwrap()
794        .as_millis() as u64;
795
796    now_ms >= expires_ms
797}
798
/// Byte separating the topic (or topic prefix) from the frame ID in `idx_topic` keys.
const NULL_DELIMITER: u8 = b'\0';
/// Maximum length of a topic name, in bytes.
const MAX_TOPIC_LENGTH: usize = 0xFF;
801
802/// Validates a topic name according to ADR 0001.
803/// - Allowed characters: a-z A-Z 0-9 _ - .
804/// - Must start with: a-z A-Z 0-9 _
805/// - Cannot be empty, cannot end with '.', max 255 bytes
806pub fn validate_topic(topic: &str) -> Result<(), crate::error::Error> {
807    if topic.is_empty() {
808        return Err("Topic cannot be empty".to_string().into());
809    }
810    if topic.len() > MAX_TOPIC_LENGTH {
811        return Err(format!("Topic exceeds max length of {MAX_TOPIC_LENGTH} bytes").into());
812    }
813    if topic.ends_with('.') {
814        return Err("Topic cannot end with '.'".to_string().into());
815    }
816    if topic.contains("..") {
817        return Err("Topic cannot contain consecutive dots".to_string().into());
818    }
819
820    let bytes = topic.as_bytes();
821    let first = bytes[0];
822    if !first.is_ascii_alphanumeric() && first != b'_' {
823        return Err("Topic must start with a-z, A-Z, 0-9, or _"
824            .to_string()
825            .into());
826    }
827
828    for &b in bytes {
829        if !b.is_ascii_alphanumeric() && b != b'_' && b != b'-' && b != b'.' {
830            return Err(format!(
831                "Topic contains invalid character: '{}'. Allowed: a-z A-Z 0-9 _ - .",
832                b as char
833            )
834            .into());
835        }
836    }
837
838    Ok(())
839}
840
841/// Validates a topic query (for --topic flag).
842/// Allows wildcards: "*" (match all) or "prefix.*" (match children).
843pub fn validate_topic_query(topic: &str) -> Result<(), crate::error::Error> {
844    if topic == "*" {
845        return Ok(());
846    }
847    if let Some(prefix) = topic.strip_suffix(".*") {
848        // Validate the prefix part (e.g., "user" in "user.*")
849        // Prefix can be empty edge case: ".*" is not valid
850        if prefix.is_empty() {
851            return Err("Wildcard '.*' requires a prefix".to_string().into());
852        }
853        validate_topic(prefix)
854    } else {
855        validate_topic(topic)
856    }
857}
858
859/// Generate prefix index keys for hierarchical topic queries.
860/// For topic "user.id1.messages", returns keys for prefixes "user." and "user.id1."
861fn idx_topic_prefix_keys(topic: &str, frame_id: &scru128::Scru128Id) -> Vec<Vec<u8>> {
862    let mut keys = Vec::new();
863    let mut pos = 0;
864    while let Some(dot_pos) = topic[pos..].find('.') {
865        let prefix = &topic[..pos + dot_pos + 1]; // include the dot
866        let mut key = Vec::with_capacity(prefix.len() + 1 + 16);
867        key.extend(prefix.as_bytes());
868        key.push(NULL_DELIMITER);
869        key.extend(frame_id.as_bytes());
870        keys.push(key);
871        pos += dot_pos + 1;
872    }
873    keys
874}
875
876fn idx_topic_key_prefix(topic: &str) -> Vec<u8> {
877    let mut v = Vec::with_capacity(topic.len() + 1); // topic bytes + delimiter
878    v.extend(topic.as_bytes()); // topic string as UTF-8 bytes
879    v.push(NULL_DELIMITER); // Delimiter for variable-sized keys
880    v
881}
882
883pub(crate) fn idx_topic_key_from_frame(frame: &Frame) -> Result<Vec<u8>, crate::error::Error> {
884    validate_topic(&frame.topic)?;
885    let mut v = idx_topic_key_prefix(&frame.topic);
886    v.extend(frame.id.as_bytes());
887    Ok(v)
888}
889
890fn idx_topic_frame_id_from_key(key: &[u8]) -> Scru128Id {
891    let frame_id_bytes = &key[key.len() - 16..];
892    Scru128Id::from_bytes(frame_id_bytes.try_into().unwrap())
893}
894
895fn deserialize_frame<B1: AsRef<[u8]> + std::fmt::Debug, B2: AsRef<[u8]>>(
896    record: (B1, B2),
897) -> Frame {
898    serde_json::from_slice(record.1.as_ref()).unwrap_or_else(|e| {
899        // Try to convert the key to a Scru128Id and print in a format that can be copied for deletion
900        let key_bytes = record.0.as_ref();
901        if key_bytes.len() == 16 {
902            if let Ok(bytes) = key_bytes.try_into() {
903                let id = Scru128Id::from_bytes(bytes);
904                eprintln!("CORRUPTED_RECORD_ID: {id}");
905            }
906        }
907        let key = std::str::from_utf8(record.0.as_ref()).unwrap();
908        let value = std::str::from_utf8(record.1.as_ref()).unwrap();
909        panic!("Failed to deserialize frame: {e} {key} {value}")
910    })
911}