xs/store/
mod.rs

1mod ttl;
2pub use ttl::*;
3
4#[cfg(test)]
5mod tests;
6
7use std::ops::Bound;
8use std::path::PathBuf;
9use std::time::Duration;
10
11use tokio::sync::broadcast;
12use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
13
14use std::sync::{Arc, Mutex};
15
16use scru128::Scru128Id;
17
18use serde::{Deserialize, Deserializer, Serialize};
19
20use fjall::{
21    config::{BlockSizePolicy, HashRatioPolicy},
22    Database, Keyspace, KeyspaceCreateOptions, PersistMode,
23};
24
25#[derive(PartialEq, Eq, Serialize, Deserialize, Clone, Default, bon::Builder)]
26pub struct Frame {
27    #[builder(start_fn, into)]
28    pub topic: String,
29    #[builder(default)]
30    pub id: Scru128Id,
31    pub hash: Option<ssri::Integrity>,
32    pub meta: Option<serde_json::Value>,
33    pub ttl: Option<TTL>,
34}
35
36use std::fmt;
37
38impl fmt::Debug for Frame {
39    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
40        f.debug_struct("Frame")
41            .field("id", &format!("{id}", id = self.id))
42            .field("topic", &self.topic)
43            .field("hash", &self.hash.as_ref().map(|x| format!("{x}")))
44            .field("meta", &self.meta)
45            .field("ttl", &self.ttl)
46            .finish()
47    }
48}
49
50impl<'de> Deserialize<'de> for FollowOption {
51    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
52    where
53        D: Deserializer<'de>,
54    {
55        let s: String = Deserialize::deserialize(deserializer)?;
56        if s.is_empty() || s == "yes" {
57            Ok(FollowOption::On)
58        } else if let Ok(duration) = s.parse::<u64>() {
59            Ok(FollowOption::WithHeartbeat(Duration::from_millis(duration)))
60        } else {
61            match s.as_str() {
62                "true" => Ok(FollowOption::On),
63                "false" | "no" => Ok(FollowOption::Off),
64                _ => Err(serde::de::Error::custom("Invalid value for follow option")),
65            }
66        }
67    }
68}
69
70fn deserialize_bool<'de, D>(deserializer: D) -> Result<bool, D::Error>
71where
72    D: Deserializer<'de>,
73{
74    let s: String = Deserialize::deserialize(deserializer)?;
75    match s.as_str() {
76        "false" | "no" | "0" => Ok(false),
77        _ => Ok(true),
78    }
79}
80
81#[derive(PartialEq, Deserialize, Clone, Debug, Default, bon::Builder)]
82pub struct ReadOptions {
83    #[serde(default)]
84    #[builder(default)]
85    pub follow: FollowOption,
86    #[serde(default, deserialize_with = "deserialize_bool")]
87    #[builder(default)]
88    pub new: bool,
89    /// Start after this ID (exclusive)
90    #[serde(rename = "after")]
91    pub after: Option<Scru128Id>,
92    /// Start from this ID (inclusive)
93    pub from: Option<Scru128Id>,
94    pub limit: Option<usize>,
95    /// Return the last N frames (most recent)
96    pub last: Option<usize>,
97    pub topic: Option<String>,
98}
99
100impl ReadOptions {
101    pub fn from_query(query: Option<&str>) -> Result<Self, crate::error::Error> {
102        match query {
103            Some(q) => Ok(serde_urlencoded::from_str(q)?),
104            None => Ok(Self::default()),
105        }
106    }
107
108    pub fn to_query_string(&self) -> String {
109        let mut params = Vec::new();
110
111        // Add follow parameter with heartbeat if specified
112        match self.follow {
113            FollowOption::Off => {}
114            FollowOption::On => params.push(("follow", "true".to_string())),
115            FollowOption::WithHeartbeat(duration) => {
116                params.push(("follow", duration.as_millis().to_string()));
117            }
118        }
119
120        // Add new if true
121        if self.new {
122            params.push(("new", "true".to_string()));
123        }
124
125        // Add after if present
126        if let Some(after) = self.after {
127            params.push(("after", after.to_string()));
128        }
129
130        // Add from if present
131        if let Some(from) = self.from {
132            params.push(("from", from.to_string()));
133        }
134
135        // Add limit if present
136        if let Some(limit) = self.limit {
137            params.push(("limit", limit.to_string()));
138        }
139
140        // Add last if present
141        if let Some(last) = self.last {
142            params.push(("last", last.to_string()));
143        }
144
145        if let Some(topic) = &self.topic {
146            params.push(("topic", topic.clone()));
147        }
148
149        // Return empty string if no params
150        if params.is_empty() {
151            String::new()
152        } else {
153            url::form_urlencoded::Serializer::new(String::new())
154                .extend_pairs(params)
155                .finish()
156        }
157    }
158}
159
/// How a read behaves once stored frames have been replayed.
#[derive(Clone, Debug, Default, PartialEq)]
pub enum FollowOption {
    /// Do not follow new frames.
    #[default]
    Off,
    /// Keep streaming newly appended frames.
    On,
    /// Keep streaming and additionally emit an `xs.pulse` frame at the given interval.
    WithHeartbeat(Duration),
}
167
168#[derive(Debug)]
169enum GCTask {
170    Remove(Scru128Id),
171    CheckLastTTL { topic: String, keep: u32 },
172    Drain(tokio::sync::oneshot::Sender<()>),
173}
174
175#[derive(Clone)]
176pub struct Store {
177    pub path: PathBuf,
178    db: Database,
179    stream: Keyspace,
180    idx_topic: Keyspace,
181    broadcast_tx: broadcast::Sender<Frame>,
182    gc_tx: UnboundedSender<GCTask>,
183    append_lock: Arc<Mutex<()>>,
184}
185
186impl Store {
187    pub fn new(path: PathBuf) -> Store {
188        let db = Database::builder(path.join("fjall"))
189            .cache_size(32 * 1024 * 1024) // 32 MiB
190            .worker_threads(1)
191            .open()
192            .unwrap();
193
194        // Options for stream keyspace: point reads by frame ID
195        let stream_opts = || {
196            KeyspaceCreateOptions::default()
197                .max_memtable_size(8 * 1024 * 1024) // 8 MiB
198                .data_block_size_policy(BlockSizePolicy::all(16 * 1024)) // 16 KiB
199                .data_block_hash_ratio_policy(HashRatioPolicy::all(8.0))
200                .expect_point_read_hits(true)
201        };
202
203        // Options for idx_topic keyspace: prefix scans only
204        let idx_opts = || {
205            KeyspaceCreateOptions::default()
206                .max_memtable_size(8 * 1024 * 1024) // 8 MiB
207                .data_block_size_policy(BlockSizePolicy::all(16 * 1024)) // 16 KiB
208                .data_block_hash_ratio_policy(HashRatioPolicy::all(0.0)) // no point reads
209                .expect_point_read_hits(true)
210        };
211
212        let stream = db.keyspace("stream", stream_opts).unwrap();
213        let idx_topic = db.keyspace("idx_topic", idx_opts).unwrap();
214
215        let (broadcast_tx, _) = broadcast::channel(1024);
216        let (gc_tx, gc_rx) = mpsc::unbounded_channel();
217
218        let store = Store {
219            path: path.clone(),
220            db,
221            stream,
222            idx_topic,
223            broadcast_tx,
224            gc_tx,
225            append_lock: Arc::new(Mutex::new(())),
226        };
227
228        // Spawn gc worker thread
229        spawn_gc_worker(gc_rx, store.clone());
230
231        store
232    }
233
234    pub async fn wait_for_gc(&self) {
235        let (tx, rx) = tokio::sync::oneshot::channel();
236        let _ = self.gc_tx.send(GCTask::Drain(tx));
237        let _ = rx.await;
238    }
239
240    #[tracing::instrument(skip(self))]
241    pub async fn read(&self, options: ReadOptions) -> tokio::sync::mpsc::Receiver<Frame> {
242        let (tx, rx) = tokio::sync::mpsc::channel(100);
243
244        let should_follow = matches!(
245            options.follow,
246            FollowOption::On | FollowOption::WithHeartbeat(_)
247        );
248
249        // Only take broadcast subscription if following. We initate the subscription here to
250        // ensure we don't miss any messages between historical processing and starting the
251        // broadcast subscription.
252        let broadcast_rx = if should_follow {
253            Some(self.broadcast_tx.subscribe())
254        } else {
255            None
256        };
257
258        // Only create done channel if we're doing historical processing
259        let done_rx = if !options.new {
260            let (done_tx, done_rx) = tokio::sync::oneshot::channel();
261            let tx_clone = tx.clone();
262            let store = self.clone();
263            let options = options.clone();
264            let should_follow_clone = should_follow;
265            let gc_tx = self.gc_tx.clone();
266
267            // Spawn OS thread to handle historical events
268            std::thread::spawn(move || {
269                let mut last_id = None;
270                let mut count = 0;
271
272                // Handle --last N: get the N most recent frames
273                if let Some(last_n) = options.last {
274                    let iter: Box<dyn Iterator<Item = Frame>> = match options.topic.as_deref() {
275                        None | Some("*") => store.iter_frames_rev(),
276                        Some(topic) if topic.ends_with(".*") => {
277                            let prefix = &topic[..topic.len() - 1];
278                            store.iter_frames_by_topic_prefix_rev(prefix)
279                        }
280                        Some(topic) => store.iter_frames_by_topic_rev(topic),
281                    };
282
283                    // Collect last N frames (in reverse order), skipping expired
284                    let mut frames: Vec<Frame> = Vec::with_capacity(last_n);
285                    for frame in iter {
286                        if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
287                            if is_expired(&frame.id, ttl) {
288                                let _ = gc_tx.send(GCTask::Remove(frame.id));
289                                continue;
290                            }
291                        }
292                        frames.push(frame);
293                        if frames.len() >= last_n {
294                            break;
295                        }
296                    }
297
298                    // Reverse to chronological order and send
299                    for frame in frames.into_iter().rev() {
300                        last_id = Some(frame.id);
301                        count += 1;
302                        if tx_clone.blocking_send(frame).is_err() {
303                            return;
304                        }
305                    }
306                } else {
307                    // Normal forward iteration
308                    // Determine start bound: from (inclusive) takes precedence over after (exclusive)
309                    let start_bound = options
310                        .from
311                        .as_ref()
312                        .map(|id| (id, true))
313                        .or_else(|| options.after.as_ref().map(|id| (id, false)));
314
315                    let iter: Box<dyn Iterator<Item = Frame>> = match options.topic.as_deref() {
316                        None | Some("*") => store.iter_frames(start_bound),
317                        Some(topic) if topic.ends_with(".*") => {
318                            // Wildcard: "user.*" -> prefix "user."
319                            let prefix = &topic[..topic.len() - 1]; // strip "*", keep "."
320                            store.iter_frames_by_topic_prefix(prefix, start_bound)
321                        }
322                        Some(topic) => store.iter_frames_by_topic(topic, start_bound),
323                    };
324
325                    for frame in iter {
326                        if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
327                            if is_expired(&frame.id, ttl) {
328                                let _ = gc_tx.send(GCTask::Remove(frame.id));
329                                continue;
330                            }
331                        }
332
333                        last_id = Some(frame.id);
334
335                        if let Some(limit) = options.limit {
336                            if count >= limit {
337                                return; // Exit early if limit reached
338                            }
339                        }
340
341                        if tx_clone.blocking_send(frame).is_err() {
342                            return;
343                        }
344                        count += 1;
345                    }
346                }
347
348                // Send threshold message if following and no limit (--last counts as having a limit for this purpose)
349                if should_follow_clone && options.limit.is_none() && options.last.is_none() {
350                    let threshold = Frame::builder("xs.threshold")
351                        .id(scru128::new())
352                        .ttl(TTL::Ephemeral)
353                        .build();
354                    if tx_clone.blocking_send(threshold).is_err() {
355                        return;
356                    }
357                }
358
359                // Signal completion with the last seen ID and count
360                let _ = done_tx.send((last_id, count));
361            });
362
363            Some(done_rx)
364        } else {
365            None
366        };
367
368        // Handle broadcast subscription and heartbeat
369        if let Some(broadcast_rx) = broadcast_rx {
370            {
371                let tx = tx.clone();
372                let limit = options.limit;
373
374                tokio::spawn(async move {
375                    // If we have a done_rx, wait for historical processing
376                    let (last_id, mut count) = match done_rx {
377                        Some(done_rx) => match done_rx.await {
378                            Ok((id, count)) => (id, count),
379                            Err(_) => return, // Historical processing failed/cancelled
380                        },
381                        None => (None, 0),
382                    };
383
384                    let mut broadcast_rx = broadcast_rx;
385                    while let Ok(frame) = broadcast_rx.recv().await {
386                        // Filter by topic (exact match or wildcard)
387                        match options.topic.as_deref() {
388                            None | Some("*") => {}
389                            Some(topic) if topic.ends_with(".*") => {
390                                let prefix = &topic[..topic.len() - 1]; // "user.*" -> "user."
391                                if !frame.topic.starts_with(prefix) {
392                                    continue;
393                                }
394                            }
395                            Some(topic) => {
396                                if frame.topic != topic {
397                                    continue;
398                                }
399                            }
400                        }
401
402                        // Skip if we've already seen this frame during historical scan
403                        if let Some(last_scanned_id) = last_id {
404                            if frame.id <= last_scanned_id {
405                                continue;
406                            }
407                        }
408
409                        if tx.send(frame).await.is_err() {
410                            break;
411                        }
412
413                        if let Some(limit) = limit {
414                            count += 1;
415                            if count >= limit {
416                                break;
417                            }
418                        }
419                    }
420                });
421            }
422
423            // Handle heartbeat if requested
424            if let FollowOption::WithHeartbeat(duration) = options.follow {
425                let heartbeat_tx = tx;
426                tokio::spawn(async move {
427                    loop {
428                        tokio::time::sleep(duration).await;
429                        let frame = Frame::builder("xs.pulse")
430                            .id(scru128::new())
431                            .ttl(TTL::Ephemeral)
432                            .build();
433                        if heartbeat_tx.send(frame).await.is_err() {
434                            break;
435                        }
436                    }
437                });
438            }
439        }
440
441        rx
442    }
443
444    #[tracing::instrument(skip(self))]
445    pub fn read_sync(
446        &self,
447        after: Option<&Scru128Id>,
448        limit: Option<usize>,
449    ) -> impl Iterator<Item = Frame> + '_ {
450        self.iter_frames(after.map(|id| (id, false)))
451            .filter(move |frame| {
452                if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
453                    if is_expired(&frame.id, ttl) {
454                        let _ = self.gc_tx.send(GCTask::Remove(frame.id));
455                        return false;
456                    }
457                }
458                true
459            })
460            .take(limit.unwrap_or(usize::MAX))
461    }
462
463    pub fn get(&self, id: &Scru128Id) -> Option<Frame> {
464        self.stream
465            .get(id.to_bytes())
466            .unwrap()
467            .map(|value| deserialize_frame((id.as_bytes(), value)))
468    }
469
470    #[tracing::instrument(skip(self))]
471    pub fn last(&self, topic: &str) -> Option<Frame> {
472        self.idx_topic
473            .prefix(idx_topic_key_prefix(topic))
474            .rev()
475            .find_map(|guard| {
476                let key = guard.key().ok()?;
477                self.get(&idx_topic_frame_id_from_key(&key))
478            })
479    }
480
481    #[tracing::instrument(skip(self), fields(id = %id.to_string()))]
482    pub fn remove(&self, id: &Scru128Id) -> Result<(), crate::error::Error> {
483        let Some(frame) = self.get(id) else {
484            // Already deleted
485            return Ok(());
486        };
487
488        // Build topic key directly (no validation - frame already exists)
489        let mut topic_key = idx_topic_key_prefix(&frame.topic);
490        topic_key.extend(frame.id.as_bytes());
491
492        // Get prefix index keys for hierarchical queries
493        let prefix_keys = idx_topic_prefix_keys(&frame.topic, &frame.id);
494
495        let mut batch = self.db.batch();
496        batch.remove(&self.stream, id.as_bytes());
497        batch.remove(&self.idx_topic, topic_key);
498        for prefix_key in &prefix_keys {
499            batch.remove(&self.idx_topic, prefix_key);
500        }
501        batch.commit()?;
502        self.db.persist(PersistMode::SyncAll)?;
503        Ok(())
504    }
505
506    pub async fn cas_reader(&self, hash: ssri::Integrity) -> cacache::Result<cacache::Reader> {
507        cacache::Reader::open_hash(&self.path.join("cacache"), hash).await
508    }
509
510    pub fn cas_reader_sync(&self, hash: ssri::Integrity) -> cacache::Result<cacache::SyncReader> {
511        cacache::SyncReader::open_hash(self.path.join("cacache"), hash)
512    }
513
514    pub async fn cas_writer(&self) -> cacache::Result<cacache::Writer> {
515        cacache::WriteOpts::new()
516            .open_hash(&self.path.join("cacache"))
517            .await
518    }
519
520    pub fn cas_writer_sync(&self) -> cacache::Result<cacache::SyncWriter> {
521        cacache::WriteOpts::new().open_hash_sync(self.path.join("cacache"))
522    }
523
524    pub async fn cas_insert(&self, content: impl AsRef<[u8]>) -> cacache::Result<ssri::Integrity> {
525        cacache::write_hash(&self.path.join("cacache"), content).await
526    }
527
528    pub fn cas_insert_sync(&self, content: impl AsRef<[u8]>) -> cacache::Result<ssri::Integrity> {
529        cacache::write_hash_sync(self.path.join("cacache"), content)
530    }
531
532    pub async fn cas_insert_bytes(&self, bytes: &[u8]) -> cacache::Result<ssri::Integrity> {
533        self.cas_insert(bytes).await
534    }
535
536    pub fn cas_insert_bytes_sync(&self, bytes: &[u8]) -> cacache::Result<ssri::Integrity> {
537        self.cas_insert_sync(bytes)
538    }
539
540    pub async fn cas_read(&self, hash: &ssri::Integrity) -> cacache::Result<Vec<u8>> {
541        cacache::read_hash(&self.path.join("cacache"), hash).await
542    }
543
544    pub fn cas_read_sync(&self, hash: &ssri::Integrity) -> cacache::Result<Vec<u8>> {
545        cacache::read_hash_sync(self.path.join("cacache"), hash)
546    }
547
548    #[tracing::instrument(skip(self))]
549    pub fn insert_frame(&self, frame: &Frame) -> Result<(), crate::error::Error> {
550        let encoded: Vec<u8> = serde_json::to_vec(&frame).unwrap();
551
552        // Get the index topic key (also validates topic)
553        let topic_key = idx_topic_key_from_frame(frame)?;
554
555        // Get prefix index keys for hierarchical queries
556        let prefix_keys = idx_topic_prefix_keys(&frame.topic, &frame.id);
557
558        let mut batch = self.db.batch();
559        batch.insert(&self.stream, frame.id.as_bytes(), encoded);
560        batch.insert(&self.idx_topic, topic_key, b"");
561        for prefix_key in &prefix_keys {
562            batch.insert(&self.idx_topic, prefix_key, b"");
563        }
564        batch.commit()?;
565        self.db.persist(PersistMode::SyncAll)?;
566        Ok(())
567    }
568
569    pub fn append(&self, mut frame: Frame) -> Result<Frame, crate::error::Error> {
570        // Serialize all appends to ensure ID generation, write, and broadcast
571        // happen atomically. This guarantees subscribers receive frames in
572        // scru128 ID order.
573        let _guard = self.append_lock.lock().unwrap();
574
575        frame.id = scru128::new();
576
577        // Check for null byte in topic (in case we're not storing the frame)
578        idx_topic_key_from_frame(&frame)?;
579
580        // only store the frame if it's not ephemeral
581        if frame.ttl != Some(TTL::Ephemeral) {
582            self.insert_frame(&frame)?;
583
584            // If this is a Last TTL, schedule a gc task
585            if let Some(TTL::Last(n)) = frame.ttl {
586                let _ = self.gc_tx.send(GCTask::CheckLastTTL {
587                    topic: frame.topic.clone(),
588                    keep: n,
589                });
590            }
591        }
592
593        let _ = self.broadcast_tx.send(frame.clone());
594        Ok(frame)
595    }
596
597    /// Iterate frames starting from a bound.
598    /// `start` is `(id, inclusive)` where inclusive=true means >= and inclusive=false means >.
599    fn iter_frames(
600        &self,
601        start: Option<(&Scru128Id, bool)>,
602    ) -> Box<dyn Iterator<Item = Frame> + '_> {
603        let range = match start {
604            Some((id, true)) => (Bound::Included(id.as_bytes().to_vec()), Bound::Unbounded),
605            Some((id, false)) => (Bound::Excluded(id.as_bytes().to_vec()), Bound::Unbounded),
606            None => (Bound::Unbounded, Bound::Unbounded),
607        };
608
609        Box::new(self.stream.range(range).filter_map(|guard| {
610            let (key, value) = guard.into_inner().ok()?;
611            Some(deserialize_frame((key, value)))
612        }))
613    }
614
615    /// Iterate frames in reverse order (most recent first).
616    fn iter_frames_rev(&self) -> Box<dyn Iterator<Item = Frame> + '_> {
617        Box::new(self.stream.iter().rev().filter_map(|guard| {
618            let (key, value) = guard.into_inner().ok()?;
619            Some(deserialize_frame((key, value)))
620        }))
621    }
622
623    /// Iterate frames by topic in reverse order (most recent first).
624    fn iter_frames_by_topic_rev<'a>(
625        &'a self,
626        topic: &'a str,
627    ) -> Box<dyn Iterator<Item = Frame> + 'a> {
628        let prefix = idx_topic_key_prefix(topic);
629        Box::new(
630            self.idx_topic
631                .prefix(prefix)
632                .rev()
633                .filter_map(move |guard| {
634                    let key = guard.key().ok()?;
635                    let frame_id = idx_topic_frame_id_from_key(&key);
636                    self.get(&frame_id)
637                }),
638        )
639    }
640
641    /// Iterate frames by topic prefix in reverse order (most recent first).
642    fn iter_frames_by_topic_prefix_rev<'a>(
643        &'a self,
644        prefix: &'a str,
645    ) -> Box<dyn Iterator<Item = Frame> + 'a> {
646        let mut index_prefix = Vec::with_capacity(prefix.len() + 1);
647        index_prefix.extend(prefix.as_bytes());
648        index_prefix.push(NULL_DELIMITER);
649
650        Box::new(
651            self.idx_topic
652                .prefix(index_prefix)
653                .rev()
654                .filter_map(move |guard| {
655                    let key = guard.key().ok()?;
656                    let frame_id = idx_topic_frame_id_from_key(&key);
657                    self.get(&frame_id)
658                }),
659        )
660    }
661
662    fn iter_frames_by_topic<'a>(
663        &'a self,
664        topic: &'a str,
665        start: Option<(&'a Scru128Id, bool)>,
666    ) -> Box<dyn Iterator<Item = Frame> + 'a> {
667        let prefix = idx_topic_key_prefix(topic);
668        Box::new(self.idx_topic.prefix(prefix).filter_map(move |guard| {
669            let key = guard.key().ok()?;
670            let frame_id = idx_topic_frame_id_from_key(&key);
671            if let Some((bound_id, inclusive)) = start {
672                if inclusive {
673                    if frame_id < *bound_id {
674                        return None;
675                    }
676                } else if frame_id <= *bound_id {
677                    return None;
678                }
679            }
680            self.get(&frame_id)
681        }))
682    }
683
684    /// Iterate frames matching a topic prefix (for wildcard queries like "user.*").
685    /// The prefix should include the trailing dot (e.g., "user." for "user.*").
686    fn iter_frames_by_topic_prefix<'a>(
687        &'a self,
688        prefix: &'a str,
689        start: Option<(&'a Scru128Id, bool)>,
690    ) -> Box<dyn Iterator<Item = Frame> + 'a> {
691        // Build index prefix: "user.\0" for scanning all "user.*" entries
692        let mut index_prefix = Vec::with_capacity(prefix.len() + 1);
693        index_prefix.extend(prefix.as_bytes());
694        index_prefix.push(NULL_DELIMITER);
695
696        Box::new(
697            self.idx_topic
698                .prefix(index_prefix)
699                .filter_map(move |guard| {
700                    let key = guard.key().ok()?;
701                    let frame_id = idx_topic_frame_id_from_key(&key);
702                    if let Some((bound_id, inclusive)) = start {
703                        if inclusive {
704                            if frame_id < *bound_id {
705                                return None;
706                            }
707                        } else if frame_id <= *bound_id {
708                            return None;
709                        }
710                    }
711                    self.get(&frame_id)
712                }),
713        )
714    }
715}
716
717fn spawn_gc_worker(mut gc_rx: UnboundedReceiver<GCTask>, store: Store) {
718    std::thread::spawn(move || {
719        while let Some(task) = gc_rx.blocking_recv() {
720            match task {
721                GCTask::Remove(id) => {
722                    let _ = store.remove(&id);
723                }
724
725                GCTask::CheckLastTTL { topic, keep } => {
726                    let prefix = idx_topic_key_prefix(&topic);
727                    let frames_to_remove: Vec<_> = store
728                        .idx_topic
729                        .prefix(&prefix)
730                        .rev() // Scan from newest to oldest
731                        .skip(keep as usize)
732                        .filter_map(|guard| {
733                            let key = guard.key().ok()?;
734                            Some(Scru128Id::from_bytes(
735                                idx_topic_frame_id_from_key(&key).into(),
736                            ))
737                        })
738                        .collect();
739
740                    for frame_id in frames_to_remove {
741                        let _ = store.remove(&frame_id);
742                    }
743                }
744
745                GCTask::Drain(tx) => {
746                    let _ = tx.send(());
747                }
748            }
749        }
750    });
751}
752
753fn is_expired(id: &Scru128Id, ttl: &Duration) -> bool {
754    let created_ms = id.timestamp();
755    let expires_ms = created_ms.saturating_add(ttl.as_millis() as u64);
756    let now_ms = std::time::SystemTime::now()
757        .duration_since(std::time::UNIX_EPOCH)
758        .unwrap()
759        .as_millis() as u64;
760
761    now_ms >= expires_ms
762}
763
/// Byte separating the topic text from the frame ID inside topic index keys.
const NULL_DELIMITER: u8 = 0;
/// Maximum topic length in bytes accepted by `validate_topic`.
const MAX_TOPIC_LENGTH: usize = 255;
766
767/// Validates a topic name according to ADR 0001.
768/// - Allowed characters: a-z A-Z 0-9 _ - .
769/// - Must start with: a-z A-Z 0-9 _
770/// - Cannot be empty, cannot end with '.', max 255 bytes
771pub fn validate_topic(topic: &str) -> Result<(), crate::error::Error> {
772    if topic.is_empty() {
773        return Err("Topic cannot be empty".to_string().into());
774    }
775    if topic.len() > MAX_TOPIC_LENGTH {
776        return Err(format!("Topic exceeds max length of {MAX_TOPIC_LENGTH} bytes").into());
777    }
778    if topic.ends_with('.') {
779        return Err("Topic cannot end with '.'".to_string().into());
780    }
781    if topic.contains("..") {
782        return Err("Topic cannot contain consecutive dots".to_string().into());
783    }
784
785    let bytes = topic.as_bytes();
786    let first = bytes[0];
787    if !first.is_ascii_alphanumeric() && first != b'_' {
788        return Err("Topic must start with a-z, A-Z, 0-9, or _"
789            .to_string()
790            .into());
791    }
792
793    for &b in bytes {
794        if !b.is_ascii_alphanumeric() && b != b'_' && b != b'-' && b != b'.' {
795            return Err(format!(
796                "Topic contains invalid character: '{}'. Allowed: a-z A-Z 0-9 _ - .",
797                b as char
798            )
799            .into());
800        }
801    }
802
803    Ok(())
804}
805
806/// Validates a topic query (for --topic flag).
807/// Allows wildcards: "*" (match all) or "prefix.*" (match children).
808pub fn validate_topic_query(topic: &str) -> Result<(), crate::error::Error> {
809    if topic == "*" {
810        return Ok(());
811    }
812    if let Some(prefix) = topic.strip_suffix(".*") {
813        // Validate the prefix part (e.g., "user" in "user.*")
814        // Prefix can be empty edge case: ".*" is not valid
815        if prefix.is_empty() {
816            return Err("Wildcard '.*' requires a prefix".to_string().into());
817        }
818        validate_topic(prefix)
819    } else {
820        validate_topic(topic)
821    }
822}
823
824/// Generate prefix index keys for hierarchical topic queries.
825/// For topic "user.id1.messages", returns keys for prefixes "user." and "user.id1."
826fn idx_topic_prefix_keys(topic: &str, frame_id: &scru128::Scru128Id) -> Vec<Vec<u8>> {
827    let mut keys = Vec::new();
828    let mut pos = 0;
829    while let Some(dot_pos) = topic[pos..].find('.') {
830        let prefix = &topic[..pos + dot_pos + 1]; // include the dot
831        let mut key = Vec::with_capacity(prefix.len() + 1 + 16);
832        key.extend(prefix.as_bytes());
833        key.push(NULL_DELIMITER);
834        key.extend(frame_id.as_bytes());
835        keys.push(key);
836        pos += dot_pos + 1;
837    }
838    keys
839}
840
841fn idx_topic_key_prefix(topic: &str) -> Vec<u8> {
842    let mut v = Vec::with_capacity(topic.len() + 1); // topic bytes + delimiter
843    v.extend(topic.as_bytes()); // topic string as UTF-8 bytes
844    v.push(NULL_DELIMITER); // Delimiter for variable-sized keys
845    v
846}
847
848pub(crate) fn idx_topic_key_from_frame(frame: &Frame) -> Result<Vec<u8>, crate::error::Error> {
849    validate_topic(&frame.topic)?;
850    let mut v = idx_topic_key_prefix(&frame.topic);
851    v.extend(frame.id.as_bytes());
852    Ok(v)
853}
854
855fn idx_topic_frame_id_from_key(key: &[u8]) -> Scru128Id {
856    let frame_id_bytes = &key[key.len() - 16..];
857    Scru128Id::from_bytes(frame_id_bytes.try_into().unwrap())
858}
859
860fn deserialize_frame<B1: AsRef<[u8]> + std::fmt::Debug, B2: AsRef<[u8]>>(
861    record: (B1, B2),
862) -> Frame {
863    serde_json::from_slice(record.1.as_ref()).unwrap_or_else(|e| {
864        // Try to convert the key to a Scru128Id and print in a format that can be copied for deletion
865        let key_bytes = record.0.as_ref();
866        if key_bytes.len() == 16 {
867            if let Ok(bytes) = key_bytes.try_into() {
868                let id = Scru128Id::from_bytes(bytes);
869                eprintln!("CORRUPTED_RECORD_ID: {id}");
870            }
871        }
872        let key = std::str::from_utf8(record.0.as_ref()).unwrap();
873        let value = std::str::from_utf8(record.1.as_ref()).unwrap();
874        panic!("Failed to deserialize frame: {e} {key} {value}")
875    })
876}