Skip to main content

xs/store/
mod.rs

1mod ttl;
2pub use ttl::*;
3
4#[cfg(test)]
5mod tests;
6
7use std::ops::Bound;
8use std::path::PathBuf;
9use std::time::Duration;
10
11use tokio::sync::broadcast;
12use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
13
14use std::sync::{Arc, Mutex};
15
16use scru128::Scru128Id;
17
18use serde::{Deserialize, Deserializer, Serialize};
19
20use fjall::{
21    config::{BlockSizePolicy, HashRatioPolicy},
22    Database, Error as FjallError, Keyspace, KeyspaceCreateOptions, PersistMode,
23};
24
/// Errors reported when opening a [`Store`].
#[derive(Debug)]
pub enum StoreError {
    /// The underlying fjall database reported that it is locked
    /// (rendered as "Store is locked by another process").
    Locked,
    /// Any other error reported by fjall.
    Other(FjallError),
}
30
31impl std::fmt::Display for StoreError {
32    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33        match self {
34            StoreError::Locked => write!(f, "Store is locked by another process"),
35            StoreError::Other(e) => write!(f, "{e}"),
36        }
37    }
38}
39
// Marker impl: relies on the `Debug` and `Display` impls above and does not
// override `source()`.
impl std::error::Error for StoreError {}
41
/// A single entry in the event stream.
///
/// Built with `Frame::builder(topic)`; the topic is the builder's start
/// argument and every other field is optional.
#[derive(PartialEq, Eq, Serialize, Deserialize, Clone, Default, bon::Builder)]
pub struct Frame {
    /// Topic the frame is published on.
    #[builder(start_fn, into)]
    pub topic: String,
    /// Frame identifier. `Store::append` overwrites this with a freshly
    /// generated SCRU128 id.
    #[builder(default)]
    pub id: Scru128Id,
    /// Optional integrity hash (presumably of content held in the
    /// content-addressable store — verify against callers).
    pub hash: Option<ssri::Integrity>,
    /// Optional free-form JSON metadata.
    pub meta: Option<serde_json::Value>,
    /// Optional retention policy; see the `ttl` module for the variants.
    pub ttl: Option<TTL>,
}
52
53use std::fmt;
54
55impl fmt::Debug for Frame {
56    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
57        f.debug_struct("Frame")
58            .field("id", &format!("{id}", id = self.id))
59            .field("topic", &self.topic)
60            .field("hash", &self.hash.as_ref().map(|x| format!("{x}")))
61            .field("meta", &self.meta)
62            .field("ttl", &self.ttl)
63            .finish()
64    }
65}
66
67impl<'de> Deserialize<'de> for FollowOption {
68    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
69    where
70        D: Deserializer<'de>,
71    {
72        let s: String = Deserialize::deserialize(deserializer)?;
73        if s.is_empty() || s == "yes" {
74            Ok(FollowOption::On)
75        } else if let Ok(duration) = s.parse::<u64>() {
76            Ok(FollowOption::WithHeartbeat(Duration::from_millis(duration)))
77        } else {
78            match s.as_str() {
79                "true" => Ok(FollowOption::On),
80                "false" | "no" => Ok(FollowOption::Off),
81                _ => Err(serde::de::Error::custom("Invalid value for follow option")),
82            }
83        }
84    }
85}
86
87fn deserialize_bool<'de, D>(deserializer: D) -> Result<bool, D::Error>
88where
89    D: Deserializer<'de>,
90{
91    let s: String = Deserialize::deserialize(deserializer)?;
92    match s.as_str() {
93        "false" | "no" | "0" => Ok(false),
94        _ => Ok(true),
95    }
96}
97
/// Options controlling which frames `Store::read` / `Store::read_sync` yield.
///
/// Deserializable from a URL query string (see `ReadOptions::from_query`).
#[derive(PartialEq, Deserialize, Clone, Debug, Default, bon::Builder)]
pub struct ReadOptions {
    /// Whether to keep streaming newly appended frames after the historical
    /// replay, optionally with periodic heartbeat frames.
    #[serde(default)]
    #[builder(default)]
    pub follow: FollowOption,
    /// When true, `Store::read` skips the historical replay entirely.
    #[serde(default, deserialize_with = "deserialize_bool")]
    #[builder(default)]
    pub new: bool,
    /// Start after this ID (exclusive)
    #[serde(rename = "after")]
    pub after: Option<Scru128Id>,
    /// Start from this ID (inclusive); takes precedence over `after` when
    /// both are set.
    pub from: Option<Scru128Id>,
    /// Maximum number of frames to deliver.
    pub limit: Option<usize>,
    /// Return the last N frames (most recent)
    pub last: Option<usize>,
    /// Topic filter: `None` or `"*"` matches everything, `"prefix.*"` matches
    /// topics starting with `"prefix."`, anything else is an exact match.
    pub topic: Option<String>,
}
116
117impl ReadOptions {
118    pub fn from_query(query: Option<&str>) -> Result<Self, crate::error::Error> {
119        match query {
120            Some(q) => Ok(serde_urlencoded::from_str(q)?),
121            None => Ok(Self::default()),
122        }
123    }
124
125    pub fn to_query_string(&self) -> String {
126        let mut params = Vec::new();
127
128        // Add follow parameter with heartbeat if specified
129        match self.follow {
130            FollowOption::Off => {}
131            FollowOption::On => params.push(("follow", "true".to_string())),
132            FollowOption::WithHeartbeat(duration) => {
133                params.push(("follow", duration.as_millis().to_string()));
134            }
135        }
136
137        // Add new if true
138        if self.new {
139            params.push(("new", "true".to_string()));
140        }
141
142        // Add after if present
143        if let Some(after) = self.after {
144            params.push(("after", after.to_string()));
145        }
146
147        // Add from if present
148        if let Some(from) = self.from {
149            params.push(("from", from.to_string()));
150        }
151
152        // Add limit if present
153        if let Some(limit) = self.limit {
154            params.push(("limit", limit.to_string()));
155        }
156
157        // Add last if present
158        if let Some(last) = self.last {
159            params.push(("last", last.to_string()));
160        }
161
162        if let Some(topic) = &self.topic {
163            params.push(("topic", topic.clone()));
164        }
165
166        // Return empty string if no params
167        if params.is_empty() {
168            String::new()
169        } else {
170            url::form_urlencoded::Serializer::new(String::new())
171                .extend_pairs(params)
172                .finish()
173        }
174    }
175}
176
/// Whether a read keeps streaming live frames after the historical replay.
#[derive(Default, PartialEq, Clone, Debug)]
pub enum FollowOption {
    /// Do not follow (the default).
    #[default]
    Off,
    /// Follow newly appended frames.
    On,
    /// Follow, and additionally emit an ephemeral `xs.pulse` frame each time
    /// the given duration elapses.
    WithHeartbeat(Duration),
}
184
/// Work items processed one at a time by the background GC worker thread.
#[derive(Debug)]
enum GCTask {
    /// Remove the frame with this id.
    Remove(Scru128Id),
    /// Keep only the newest `keep` frames indexed under `topic` and remove
    /// the older ones.
    CheckLastTTL { topic: String, keep: u32 },
    /// Acknowledge on the given channel once the worker reaches this task
    /// (used by `Store::wait_for_gc`).
    Drain(tokio::sync::oneshot::Sender<()>),
}
191
/// Handle to the on-disk event store.
///
/// Derives `Clone`; the GC worker and reader threads each hold their own
/// clone.
#[derive(Clone)]
pub struct Store {
    /// Root directory; the fjall database lives in `path/fjall` and the
    /// content cache in `path/cacache`.
    pub path: PathBuf,
    /// The fjall database handle (used for batches and persistence).
    db: Database,
    /// Keyspace mapping frame id bytes to the JSON-encoded frame.
    stream: Keyspace,
    /// Keyspace holding topic index keys (values are empty).
    idx_topic: Keyspace,
    /// Fan-out channel on which every appended frame is broadcast.
    broadcast_tx: broadcast::Sender<Frame>,
    /// Queue of tasks for the background GC worker.
    gc_tx: UnboundedSender<GCTask>,
    /// Serializes `append` so id generation, write and broadcast happen as
    /// one step.
    append_lock: Arc<Mutex<()>>,
}
202
203impl Store {
204    pub fn new(path: PathBuf) -> Result<Store, StoreError> {
205        let db = match Database::builder(path.join("fjall"))
206            .cache_size(32 * 1024 * 1024) // 32 MiB
207            .worker_threads(1)
208            .open()
209        {
210            Ok(db) => db,
211            Err(FjallError::Locked) => return Err(StoreError::Locked),
212            Err(e) => return Err(StoreError::Other(e)),
213        };
214
215        // Options for stream keyspace: point reads by frame ID
216        let stream_opts = || {
217            KeyspaceCreateOptions::default()
218                .max_memtable_size(8 * 1024 * 1024) // 8 MiB
219                .data_block_size_policy(BlockSizePolicy::all(16 * 1024)) // 16 KiB
220                .data_block_hash_ratio_policy(HashRatioPolicy::all(8.0))
221                .expect_point_read_hits(true)
222        };
223
224        // Options for idx_topic keyspace: prefix scans only
225        let idx_opts = || {
226            KeyspaceCreateOptions::default()
227                .max_memtable_size(8 * 1024 * 1024) // 8 MiB
228                .data_block_size_policy(BlockSizePolicy::all(16 * 1024)) // 16 KiB
229                .data_block_hash_ratio_policy(HashRatioPolicy::all(0.0)) // no point reads
230                .expect_point_read_hits(true)
231        };
232
233        let stream = db.keyspace("stream", stream_opts).unwrap();
234        let idx_topic = db.keyspace("idx_topic", idx_opts).unwrap();
235
236        let (broadcast_tx, _) = broadcast::channel(1024);
237        let (gc_tx, gc_rx) = mpsc::unbounded_channel();
238
239        let store = Store {
240            path: path.clone(),
241            db,
242            stream,
243            idx_topic,
244            broadcast_tx,
245            gc_tx,
246            append_lock: Arc::new(Mutex::new(())),
247        };
248
249        // Spawn gc worker thread
250        spawn_gc_worker(gc_rx, store.clone());
251
252        Ok(store)
253    }
254
255    pub async fn wait_for_gc(&self) {
256        let (tx, rx) = tokio::sync::oneshot::channel();
257        let _ = self.gc_tx.send(GCTask::Drain(tx));
258        let _ = rx.await;
259    }
260
261    #[tracing::instrument(skip(self))]
262    pub async fn read(&self, options: ReadOptions) -> tokio::sync::mpsc::Receiver<Frame> {
263        let (tx, rx) = tokio::sync::mpsc::channel(100);
264
265        let should_follow = matches!(
266            options.follow,
267            FollowOption::On | FollowOption::WithHeartbeat(_)
268        );
269
270        // Only take broadcast subscription if following. We initate the subscription here to
271        // ensure we don't miss any messages between historical processing and starting the
272        // broadcast subscription.
273        let broadcast_rx = if should_follow {
274            Some(self.broadcast_tx.subscribe())
275        } else {
276            None
277        };
278
279        // Only create done channel if we're doing historical processing
280        let done_rx = if !options.new {
281            let (done_tx, done_rx) = tokio::sync::oneshot::channel();
282            let tx_clone = tx.clone();
283            let store = self.clone();
284            let options = options.clone();
285            let should_follow_clone = should_follow;
286            let gc_tx = self.gc_tx.clone();
287
288            // Spawn OS thread to handle historical events
289            std::thread::spawn(move || {
290                let mut last_id = None;
291                let mut count = 0;
292
293                // Handle --last N: get the N most recent frames
294                if let Some(last_n) = options.last {
295                    let iter: Box<dyn Iterator<Item = Frame>> = match options.topic.as_deref() {
296                        None | Some("*") => store.iter_frames_rev(),
297                        Some(topic) if topic.ends_with(".*") => {
298                            let prefix = &topic[..topic.len() - 1];
299                            store.iter_frames_by_topic_prefix_rev(prefix)
300                        }
301                        Some(topic) => store.iter_frames_by_topic_rev(topic),
302                    };
303
304                    // Collect last N frames (in reverse order), skipping expired
305                    let mut frames: Vec<Frame> = Vec::with_capacity(last_n);
306                    for frame in iter {
307                        if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
308                            if is_expired(&frame.id, ttl) {
309                                let _ = gc_tx.send(GCTask::Remove(frame.id));
310                                continue;
311                            }
312                        }
313                        frames.push(frame);
314                        if frames.len() >= last_n {
315                            break;
316                        }
317                    }
318
319                    // Reverse to chronological order and send
320                    for frame in frames.into_iter().rev() {
321                        last_id = Some(frame.id);
322                        count += 1;
323                        if tx_clone.blocking_send(frame).is_err() {
324                            return;
325                        }
326                    }
327                } else {
328                    // Normal forward iteration
329                    // Determine start bound: from (inclusive) takes precedence over after (exclusive)
330                    let start_bound = options
331                        .from
332                        .as_ref()
333                        .map(|id| (id, true))
334                        .or_else(|| options.after.as_ref().map(|id| (id, false)));
335
336                    let iter: Box<dyn Iterator<Item = Frame>> = match options.topic.as_deref() {
337                        None | Some("*") => store.iter_frames(start_bound),
338                        Some(topic) if topic.ends_with(".*") => {
339                            // Wildcard: "user.*" -> prefix "user."
340                            let prefix = &topic[..topic.len() - 1]; // strip "*", keep "."
341                            store.iter_frames_by_topic_prefix(prefix, start_bound)
342                        }
343                        Some(topic) => store.iter_frames_by_topic(topic, start_bound),
344                    };
345
346                    for frame in iter {
347                        if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
348                            if is_expired(&frame.id, ttl) {
349                                let _ = gc_tx.send(GCTask::Remove(frame.id));
350                                continue;
351                            }
352                        }
353
354                        last_id = Some(frame.id);
355
356                        if let Some(limit) = options.limit {
357                            if count >= limit {
358                                return; // Exit early if limit reached
359                            }
360                        }
361
362                        if tx_clone.blocking_send(frame).is_err() {
363                            return;
364                        }
365                        count += 1;
366                    }
367                }
368
369                // Send threshold message if following
370                if should_follow_clone {
371                    let threshold = Frame::builder("xs.threshold")
372                        .id(scru128::new())
373                        .ttl(TTL::Ephemeral)
374                        .build();
375                    if tx_clone.blocking_send(threshold).is_err() {
376                        return;
377                    }
378                }
379
380                // Signal completion with the last seen ID and count
381                let _ = done_tx.send((last_id, count));
382            });
383
384            Some(done_rx)
385        } else {
386            None
387        };
388
389        // Handle broadcast subscription and heartbeat
390        if let Some(broadcast_rx) = broadcast_rx {
391            {
392                let tx = tx.clone();
393                let limit = options.limit;
394
395                tokio::spawn(async move {
396                    // If we have a done_rx, wait for historical processing
397                    let (last_id, mut count) = match done_rx {
398                        Some(done_rx) => match done_rx.await {
399                            Ok((id, count)) => (id, count),
400                            Err(_) => return, // Historical processing failed/cancelled
401                        },
402                        None => (None, 0),
403                    };
404
405                    let mut broadcast_rx = broadcast_rx;
406                    while let Ok(frame) = broadcast_rx.recv().await {
407                        // Filter by topic (exact match or wildcard)
408                        match options.topic.as_deref() {
409                            None | Some("*") => {}
410                            Some(topic) if topic.ends_with(".*") => {
411                                let prefix = &topic[..topic.len() - 1]; // "user.*" -> "user."
412                                if !frame.topic.starts_with(prefix) {
413                                    continue;
414                                }
415                            }
416                            Some(topic) => {
417                                if frame.topic != topic {
418                                    continue;
419                                }
420                            }
421                        }
422
423                        // Skip if we've already seen this frame during historical scan
424                        if let Some(last_scanned_id) = last_id {
425                            if frame.id <= last_scanned_id {
426                                continue;
427                            }
428                        }
429
430                        if tx.send(frame).await.is_err() {
431                            break;
432                        }
433
434                        if let Some(limit) = limit {
435                            count += 1;
436                            if count >= limit {
437                                break;
438                            }
439                        }
440                    }
441                });
442            }
443
444            // Handle heartbeat if requested
445            if let FollowOption::WithHeartbeat(duration) = options.follow {
446                let heartbeat_tx = tx;
447                tokio::spawn(async move {
448                    loop {
449                        tokio::time::sleep(duration).await;
450                        let frame = Frame::builder("xs.pulse")
451                            .id(scru128::new())
452                            .ttl(TTL::Ephemeral)
453                            .build();
454                        if heartbeat_tx.send(frame).await.is_err() {
455                            break;
456                        }
457                    }
458                });
459            }
460        }
461
462        rx
463    }
464
465    /// Read frames synchronously with full ReadOptions support.
466    pub fn read_sync(&self, options: ReadOptions) -> impl Iterator<Item = Frame> + '_ {
467        let gc_tx = self.gc_tx.clone();
468
469        // Filter out expired frames
470        let filter_expired = move |frame: Frame, gc_tx: &UnboundedSender<GCTask>| {
471            if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
472                if is_expired(&frame.id, ttl) {
473                    let _ = gc_tx.send(GCTask::Remove(frame.id));
474                    return None;
475                }
476            }
477            Some(frame)
478        };
479
480        let frames: Vec<Frame> = if let Some(last_n) = options.last {
481            // Handle --last N: get the N most recent frames
482            let iter: Box<dyn Iterator<Item = Frame>> = match options.topic.as_deref() {
483                None | Some("*") => self.iter_frames_rev(),
484                Some(topic) if topic.ends_with(".*") => {
485                    let prefix = &topic[..topic.len() - 1];
486                    self.iter_frames_by_topic_prefix_rev(prefix)
487                }
488                Some(topic) => self.iter_frames_by_topic_rev(topic),
489            };
490
491            // Collect last N frames (in reverse order), skipping expired
492            let mut frames: Vec<Frame> = Vec::with_capacity(last_n);
493            for frame in iter {
494                if let Some(frame) = filter_expired(frame, &gc_tx) {
495                    frames.push(frame);
496                    if frames.len() >= last_n {
497                        break;
498                    }
499                }
500            }
501
502            // Reverse to chronological order
503            frames.reverse();
504            frames
505        } else {
506            // Normal forward iteration
507            let start_bound = options
508                .from
509                .as_ref()
510                .map(|id| (id, true))
511                .or_else(|| options.after.as_ref().map(|id| (id, false)));
512
513            let iter: Box<dyn Iterator<Item = Frame>> = match options.topic.as_deref() {
514                None | Some("*") => self.iter_frames(start_bound),
515                Some(topic) if topic.ends_with(".*") => {
516                    let prefix = &topic[..topic.len() - 1];
517                    self.iter_frames_by_topic_prefix(prefix, start_bound)
518                }
519                Some(topic) => self.iter_frames_by_topic(topic, start_bound),
520            };
521
522            iter.filter_map(|frame| filter_expired(frame, &gc_tx))
523                .take(options.limit.unwrap_or(usize::MAX))
524                .collect()
525        };
526
527        frames.into_iter()
528    }
529
530    /// Returns the current module state as of a given point in the stream.
531    ///
532    /// Scans all frames up to (and including) `as_of` and returns a mapping of
533    /// module name to CAS hash for the latest frame on each `xs.module.<name>`
534    /// topic.
535    pub fn nu_modules_at(
536        &self,
537        as_of: &Scru128Id,
538    ) -> std::collections::HashMap<String, ssri::Integrity> {
539        let mut modules = std::collections::HashMap::new();
540        let options = ReadOptions::builder().follow(FollowOption::Off).build();
541        for frame in self.read_sync(options) {
542            if frame.id > *as_of {
543                break;
544            }
545            if let Some(hash) = frame.hash {
546                if let Some(name) = frame.topic.strip_prefix("xs.module.") {
547                    if !name.is_empty() {
548                        modules.insert(name.to_string(), hash);
549                    }
550                }
551            }
552        }
553        modules
554    }
555
556    pub fn get(&self, id: &Scru128Id) -> Option<Frame> {
557        self.stream
558            .get(id.to_bytes())
559            .unwrap()
560            .map(|value| deserialize_frame((id.as_bytes(), value)))
561    }
562
563    #[tracing::instrument(skip(self), fields(id = %id.to_string()))]
564    pub fn remove(&self, id: &Scru128Id) -> Result<(), crate::error::Error> {
565        let Some(frame) = self.get(id) else {
566            // Already deleted
567            return Ok(());
568        };
569
570        // Build topic key directly (no validation - frame already exists)
571        let mut topic_key = idx_topic_key_prefix(&frame.topic);
572        topic_key.extend(frame.id.as_bytes());
573
574        // Get prefix index keys for hierarchical queries
575        let prefix_keys = idx_topic_prefix_keys(&frame.topic, &frame.id);
576
577        let mut batch = self.db.batch();
578        batch.remove(&self.stream, id.as_bytes());
579        batch.remove(&self.idx_topic, topic_key);
580        for prefix_key in &prefix_keys {
581            batch.remove(&self.idx_topic, prefix_key);
582        }
583        batch.commit()?;
584        self.db.persist(PersistMode::SyncAll)?;
585        Ok(())
586    }
587
588    pub async fn cas_reader(&self, hash: ssri::Integrity) -> cacache::Result<cacache::Reader> {
589        cacache::Reader::open_hash(&self.path.join("cacache"), hash).await
590    }
591
592    pub fn cas_reader_sync(&self, hash: ssri::Integrity) -> cacache::Result<cacache::SyncReader> {
593        cacache::SyncReader::open_hash(self.path.join("cacache"), hash)
594    }
595
596    pub async fn cas_writer(&self) -> cacache::Result<cacache::Writer> {
597        cacache::WriteOpts::new()
598            .open_hash(&self.path.join("cacache"))
599            .await
600    }
601
602    pub fn cas_writer_sync(&self) -> cacache::Result<cacache::SyncWriter> {
603        cacache::WriteOpts::new().open_hash_sync(self.path.join("cacache"))
604    }
605
606    pub async fn cas_insert(&self, content: impl AsRef<[u8]>) -> cacache::Result<ssri::Integrity> {
607        cacache::write_hash(&self.path.join("cacache"), content).await
608    }
609
610    pub fn cas_insert_sync(&self, content: impl AsRef<[u8]>) -> cacache::Result<ssri::Integrity> {
611        cacache::write_hash_sync(self.path.join("cacache"), content)
612    }
613
    /// Byte-slice convenience wrapper that delegates to `cas_insert`.
    pub async fn cas_insert_bytes(&self, bytes: &[u8]) -> cacache::Result<ssri::Integrity> {
        self.cas_insert(bytes).await
    }
617
    /// Byte-slice convenience wrapper that delegates to `cas_insert_sync`.
    pub fn cas_insert_bytes_sync(&self, bytes: &[u8]) -> cacache::Result<ssri::Integrity> {
        self.cas_insert_sync(bytes)
    }
621
622    pub async fn cas_read(&self, hash: &ssri::Integrity) -> cacache::Result<Vec<u8>> {
623        cacache::read_hash(&self.path.join("cacache"), hash).await
624    }
625
626    pub fn cas_read_sync(&self, hash: &ssri::Integrity) -> cacache::Result<Vec<u8>> {
627        cacache::read_hash_sync(self.path.join("cacache"), hash)
628    }
629
630    #[tracing::instrument(skip(self))]
631    pub fn insert_frame(&self, frame: &Frame) -> Result<(), crate::error::Error> {
632        let encoded: Vec<u8> = serde_json::to_vec(&frame).unwrap();
633
634        // Get the index topic key (also validates topic)
635        let topic_key = idx_topic_key_from_frame(frame)?;
636
637        // Get prefix index keys for hierarchical queries
638        let prefix_keys = idx_topic_prefix_keys(&frame.topic, &frame.id);
639
640        let mut batch = self.db.batch();
641        batch.insert(&self.stream, frame.id.as_bytes(), encoded);
642        batch.insert(&self.idx_topic, topic_key, b"");
643        for prefix_key in &prefix_keys {
644            batch.insert(&self.idx_topic, prefix_key, b"");
645        }
646        batch.commit()?;
647        self.db.persist(PersistMode::SyncAll)?;
648        Ok(())
649    }
650
651    pub fn append(&self, mut frame: Frame) -> Result<Frame, crate::error::Error> {
652        // Serialize all appends to ensure ID generation, write, and broadcast
653        // happen atomically. This guarantees subscribers receive frames in
654        // scru128 ID order.
655        let _guard = self.append_lock.lock().unwrap();
656
657        frame.id = scru128::new();
658
659        // Check for null byte in topic (in case we're not storing the frame)
660        idx_topic_key_from_frame(&frame)?;
661
662        // only store the frame if it's not ephemeral
663        if frame.ttl != Some(TTL::Ephemeral) {
664            self.insert_frame(&frame)?;
665
666            // If this is a Last TTL, schedule a gc task
667            if let Some(TTL::Last(n)) = frame.ttl {
668                let _ = self.gc_tx.send(GCTask::CheckLastTTL {
669                    topic: frame.topic.clone(),
670                    keep: n,
671                });
672            }
673        }
674
675        let _ = self.broadcast_tx.send(frame.clone());
676        Ok(frame)
677    }
678
679    /// Iterate frames starting from a bound.
680    /// `start` is `(id, inclusive)` where inclusive=true means >= and inclusive=false means >.
681    fn iter_frames(
682        &self,
683        start: Option<(&Scru128Id, bool)>,
684    ) -> Box<dyn Iterator<Item = Frame> + '_> {
685        let range = match start {
686            Some((id, true)) => (Bound::Included(id.as_bytes().to_vec()), Bound::Unbounded),
687            Some((id, false)) => (Bound::Excluded(id.as_bytes().to_vec()), Bound::Unbounded),
688            None => (Bound::Unbounded, Bound::Unbounded),
689        };
690
691        Box::new(self.stream.range(range).filter_map(|guard| {
692            let (key, value) = guard.into_inner().ok()?;
693            Some(deserialize_frame((key, value)))
694        }))
695    }
696
697    /// Iterate frames in reverse order (most recent first).
698    fn iter_frames_rev(&self) -> Box<dyn Iterator<Item = Frame> + '_> {
699        Box::new(self.stream.iter().rev().filter_map(|guard| {
700            let (key, value) = guard.into_inner().ok()?;
701            Some(deserialize_frame((key, value)))
702        }))
703    }
704
705    /// Iterate frames by topic in reverse order (most recent first).
706    fn iter_frames_by_topic_rev<'a>(
707        &'a self,
708        topic: &'a str,
709    ) -> Box<dyn Iterator<Item = Frame> + 'a> {
710        let prefix = idx_topic_key_prefix(topic);
711        Box::new(
712            self.idx_topic
713                .prefix(prefix)
714                .rev()
715                .filter_map(move |guard| {
716                    let key = guard.key().ok()?;
717                    let frame_id = idx_topic_frame_id_from_key(&key);
718                    self.get(&frame_id)
719                }),
720        )
721    }
722
723    /// Iterate frames by topic prefix in reverse order (most recent first).
724    fn iter_frames_by_topic_prefix_rev<'a>(
725        &'a self,
726        prefix: &'a str,
727    ) -> Box<dyn Iterator<Item = Frame> + 'a> {
728        let mut index_prefix = Vec::with_capacity(prefix.len() + 1);
729        index_prefix.extend(prefix.as_bytes());
730        index_prefix.push(NULL_DELIMITER);
731
732        Box::new(
733            self.idx_topic
734                .prefix(index_prefix)
735                .rev()
736                .filter_map(move |guard| {
737                    let key = guard.key().ok()?;
738                    let frame_id = idx_topic_frame_id_from_key(&key);
739                    self.get(&frame_id)
740                }),
741        )
742    }
743
744    fn iter_frames_by_topic<'a>(
745        &'a self,
746        topic: &'a str,
747        start: Option<(&'a Scru128Id, bool)>,
748    ) -> Box<dyn Iterator<Item = Frame> + 'a> {
749        let prefix = idx_topic_key_prefix(topic);
750        Box::new(self.idx_topic.prefix(prefix).filter_map(move |guard| {
751            let key = guard.key().ok()?;
752            let frame_id = idx_topic_frame_id_from_key(&key);
753            if let Some((bound_id, inclusive)) = start {
754                if inclusive {
755                    if frame_id < *bound_id {
756                        return None;
757                    }
758                } else if frame_id <= *bound_id {
759                    return None;
760                }
761            }
762            self.get(&frame_id)
763        }))
764    }
765
766    /// Iterate frames matching a topic prefix (for wildcard queries like "user.*").
767    /// The prefix should include the trailing dot (e.g., "user." for "user.*").
768    fn iter_frames_by_topic_prefix<'a>(
769        &'a self,
770        prefix: &'a str,
771        start: Option<(&'a Scru128Id, bool)>,
772    ) -> Box<dyn Iterator<Item = Frame> + 'a> {
773        // Build index prefix: "user.\0" for scanning all "user.*" entries
774        let mut index_prefix = Vec::with_capacity(prefix.len() + 1);
775        index_prefix.extend(prefix.as_bytes());
776        index_prefix.push(NULL_DELIMITER);
777
778        Box::new(
779            self.idx_topic
780                .prefix(index_prefix)
781                .filter_map(move |guard| {
782                    let key = guard.key().ok()?;
783                    let frame_id = idx_topic_frame_id_from_key(&key);
784                    if let Some((bound_id, inclusive)) = start {
785                        if inclusive {
786                            if frame_id < *bound_id {
787                                return None;
788                            }
789                        } else if frame_id <= *bound_id {
790                            return None;
791                        }
792                    }
793                    self.get(&frame_id)
794                }),
795        )
796    }
797}
798
799fn spawn_gc_worker(mut gc_rx: UnboundedReceiver<GCTask>, store: Store) {
800    std::thread::spawn(move || {
801        while let Some(task) = gc_rx.blocking_recv() {
802            match task {
803                GCTask::Remove(id) => {
804                    let _ = store.remove(&id);
805                }
806
807                GCTask::CheckLastTTL { topic, keep } => {
808                    let prefix = idx_topic_key_prefix(&topic);
809                    let frames_to_remove: Vec<_> = store
810                        .idx_topic
811                        .prefix(&prefix)
812                        .rev() // Scan from newest to oldest
813                        .skip(keep as usize)
814                        .filter_map(|guard| {
815                            let key = guard.key().ok()?;
816                            Some(Scru128Id::from_bytes(
817                                idx_topic_frame_id_from_key(&key).into(),
818                            ))
819                        })
820                        .collect();
821
822                    for frame_id in frames_to_remove {
823                        let _ = store.remove(&frame_id);
824                    }
825                }
826
827                GCTask::Drain(tx) => {
828                    let _ = tx.send(());
829                }
830            }
831        }
832    });
833}
834
835fn is_expired(id: &Scru128Id, ttl: &Duration) -> bool {
836    let created_ms = id.timestamp();
837    let expires_ms = created_ms.saturating_add(ttl.as_millis() as u64);
838    let now_ms = std::time::SystemTime::now()
839        .duration_since(std::time::UNIX_EPOCH)
840        .unwrap()
841        .as_millis() as u64;
842
843    now_ms >= expires_ms
844}
845
846const NULL_DELIMITER: u8 = 0;
847const MAX_TOPIC_LENGTH: usize = 255;
848
849/// Validates a topic name according to ADR 0001.
850/// - Allowed characters: a-z A-Z 0-9 _ - .
851/// - Must start with: a-z A-Z 0-9 _
852/// - Cannot be empty, cannot end with '.', max 255 bytes
853pub fn validate_topic(topic: &str) -> Result<(), crate::error::Error> {
854    if topic.is_empty() {
855        return Err("Topic cannot be empty".to_string().into());
856    }
857    if topic.len() > MAX_TOPIC_LENGTH {
858        return Err(format!("Topic exceeds max length of {MAX_TOPIC_LENGTH} bytes").into());
859    }
860    if topic.ends_with('.') {
861        return Err("Topic cannot end with '.'".to_string().into());
862    }
863    if topic.contains("..") {
864        return Err("Topic cannot contain consecutive dots".to_string().into());
865    }
866
867    let bytes = topic.as_bytes();
868    let first = bytes[0];
869    if !first.is_ascii_alphabetic() && first != b'_' {
870        return Err("Topic must start with a-z, A-Z, or _".to_string().into());
871    }
872
873    for &b in bytes {
874        if !b.is_ascii_alphanumeric() && b != b'_' && b != b'-' && b != b'.' {
875            return Err(format!(
876                "Topic contains invalid character: '{}'. Allowed: a-z A-Z 0-9 _ - .",
877                b as char
878            )
879            .into());
880        }
881    }
882
883    Ok(())
884}
885
886/// Validates a topic query (for --topic flag).
887/// Allows wildcards: "*" (match all) or "prefix.*" (match children).
888pub fn validate_topic_query(topic: &str) -> Result<(), crate::error::Error> {
889    if topic == "*" {
890        return Ok(());
891    }
892    if let Some(prefix) = topic.strip_suffix(".*") {
893        // Validate the prefix part (e.g., "user" in "user.*")
894        // Prefix can be empty edge case: ".*" is not valid
895        if prefix.is_empty() {
896            return Err("Wildcard '.*' requires a prefix".to_string().into());
897        }
898        validate_topic(prefix)
899    } else {
900        validate_topic(topic)
901    }
902}
903
904/// Generate prefix index keys for hierarchical topic queries.
905/// For topic "user.id1.messages", returns keys for prefixes "user." and "user.id1."
906fn idx_topic_prefix_keys(topic: &str, frame_id: &scru128::Scru128Id) -> Vec<Vec<u8>> {
907    let mut keys = Vec::new();
908    let mut pos = 0;
909    while let Some(dot_pos) = topic[pos..].find('.') {
910        let prefix = &topic[..pos + dot_pos + 1]; // include the dot
911        let mut key = Vec::with_capacity(prefix.len() + 1 + 16);
912        key.extend(prefix.as_bytes());
913        key.push(NULL_DELIMITER);
914        key.extend(frame_id.as_bytes());
915        keys.push(key);
916        pos += dot_pos + 1;
917    }
918    keys
919}
920
921fn idx_topic_key_prefix(topic: &str) -> Vec<u8> {
922    let mut v = Vec::with_capacity(topic.len() + 1); // topic bytes + delimiter
923    v.extend(topic.as_bytes()); // topic string as UTF-8 bytes
924    v.push(NULL_DELIMITER); // Delimiter for variable-sized keys
925    v
926}
927
928pub(crate) fn idx_topic_key_from_frame(frame: &Frame) -> Result<Vec<u8>, crate::error::Error> {
929    validate_topic(&frame.topic)?;
930    let mut v = idx_topic_key_prefix(&frame.topic);
931    v.extend(frame.id.as_bytes());
932    Ok(v)
933}
934
935fn idx_topic_frame_id_from_key(key: &[u8]) -> Scru128Id {
936    let frame_id_bytes = &key[key.len() - 16..];
937    Scru128Id::from_bytes(frame_id_bytes.try_into().unwrap())
938}
939
940fn deserialize_frame<B1: AsRef<[u8]> + std::fmt::Debug, B2: AsRef<[u8]>>(
941    record: (B1, B2),
942) -> Frame {
943    serde_json::from_slice(record.1.as_ref()).unwrap_or_else(|e| {
944        // Try to convert the key to a Scru128Id and print in a format that can be copied for deletion
945        let key_bytes = record.0.as_ref();
946        if key_bytes.len() == 16 {
947            if let Ok(bytes) = key_bytes.try_into() {
948                let id = Scru128Id::from_bytes(bytes);
949                eprintln!("CORRUPTED_RECORD_ID: {id}");
950            }
951        }
952        let key = std::str::from_utf8(record.0.as_ref()).unwrap();
953        let value = std::str::from_utf8(record.1.as_ref()).unwrap();
954        panic!("Failed to deserialize frame: {e} {key} {value}")
955    })
956}