1mod ttl;
23pub use ttl::*;
24
25#[cfg(test)]
26mod tests;
27
28use std::ops::Bound;
29use std::path::PathBuf;
30use std::time::Duration;
31
32use tokio::sync::broadcast;
33use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
34
35use std::sync::{Arc, Mutex};
36
37use nu_protocol::engine::EngineState;
38use scru128::Scru128Id;
39
40use serde::{Deserialize, Deserializer, Serialize};
41
42use fjall::{
43 config::{BlockSizePolicy, HashRatioPolicy},
44 Database, Error as FjallError, Keyspace, KeyspaceCreateOptions, PersistMode,
45};
46
47#[derive(Debug)]
49pub enum StoreError {
50 Locked,
52 Other(FjallError),
54}
55
56impl std::fmt::Display for StoreError {
57 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58 match self {
59 StoreError::Locked => write!(f, "Store is locked by another process"),
60 StoreError::Other(e) => write!(f, "{e}"),
61 }
62 }
63}
64
65impl std::error::Error for StoreError {}
66
/// A single record of the store's stream.
///
/// Instances are created through the generated `bon` builder; the topic is the
/// positional argument of `Frame::builder(..)` and the remaining fields are set
/// through builder methods.
#[derive(PartialEq, Eq, Serialize, Deserialize, Clone, Default, bon::Builder)]
pub struct Frame {
    /// Topic the frame is filed under.
    #[builder(start_fn, into)]
    pub topic: String,
    /// Identifier of the frame. `Store::append` overwrites it with a freshly
    /// generated id before storing.
    #[builder(default)]
    pub id: Scru128Id,
    /// Optional integrity hash.
    // NOTE(review): presumably refers to content in the store's `cacache` directory — confirm.
    pub hash: Option<ssri::Integrity>,
    /// Optional free-form JSON metadata.
    pub meta: Option<serde_json::Value>,
    /// Optional retention policy; `None` means no policy was specified.
    pub ttl: Option<TTL>,
}
106
107use std::fmt;
108
109impl fmt::Debug for Frame {
110 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
111 f.debug_struct("Frame")
112 .field("id", &format!("{id}", id = self.id))
113 .field("topic", &self.topic)
114 .field("hash", &self.hash.as_ref().map(|x| format!("{x}")))
115 .field("meta", &self.meta)
116 .field("ttl", &self.ttl)
117 .finish()
118 }
119}
120
121impl<'de> Deserialize<'de> for FollowOption {
122 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
123 where
124 D: Deserializer<'de>,
125 {
126 let s: String = Deserialize::deserialize(deserializer)?;
127 if s.is_empty() || s == "yes" {
128 Ok(FollowOption::On)
129 } else if let Ok(duration) = s.parse::<u64>() {
130 Ok(FollowOption::WithHeartbeat(Duration::from_millis(duration)))
131 } else {
132 match s.as_str() {
133 "true" => Ok(FollowOption::On),
134 "false" | "no" => Ok(FollowOption::Off),
135 _ => Err(serde::de::Error::custom("Invalid value for follow option")),
136 }
137 }
138 }
139}
140
141fn deserialize_bool<'de, D>(deserializer: D) -> Result<bool, D::Error>
142where
143 D: Deserializer<'de>,
144{
145 let s: String = Deserialize::deserialize(deserializer)?;
146 match s.as_str() {
147 "false" | "no" | "0" => Ok(false),
148 _ => Ok(true),
149 }
150}
151
/// Options controlling `Store::read` / `Store::read_sync`.
///
/// Deserializable from a URL query string (see `ReadOptions::from_query`) and
/// constructible through the generated `bon` builder.
#[derive(PartialEq, Deserialize, Clone, Debug, Default, bon::Builder)]
pub struct ReadOptions {
    /// Whether to keep streaming newly appended frames after the stored ones.
    #[serde(default)]
    #[builder(default)]
    pub follow: FollowOption,
    /// When `true`, `Store::read` skips the replay of stored frames.
    #[serde(default, deserialize_with = "deserialize_bool")]
    #[builder(default)]
    pub new: bool,
    /// Exclusive lower bound: only frames with an id greater than this one.
    /// Ignored when `from` is set.
    #[serde(rename = "after")]
    pub after: Option<Scru128Id>,
    /// Inclusive lower bound: frames with this id or a greater one.
    pub from: Option<Scru128Id>,
    /// Maximum number of frames to deliver.
    pub limit: Option<usize>,
    /// Deliver only the newest N stored frames (in ascending order).
    pub last: Option<usize>,
    /// Topic filter specification, parsed with `TopicFilter::parse`.
    pub topic: Option<String>,
}
192
/// A single topic matcher.
#[derive(Clone, Debug, PartialEq)]
pub enum Pattern {
    /// Matches only a topic equal to the contained string.
    Exact(String),
    /// Matches every topic that starts with the contained string.
    Prefix(String),
}

impl Pattern {
    /// Returns `true` when `topic` satisfies this pattern.
    fn matches(&self, topic: &str) -> bool {
        match self {
            Self::Exact(expected) => expected.as_str() == topic,
            Self::Prefix(lead) => topic.starts_with(lead.as_str()),
        }
    }
}
211
212#[derive(Clone, Debug, PartialEq)]
219pub enum TopicFilter {
220 All,
222 Patterns(Vec<Pattern>),
224}
225
226impl TopicFilter {
227 pub fn parse(spec: &str) -> TopicFilter {
230 let mut patterns = Vec::new();
231 for part in spec.split(',') {
232 let part = part.trim();
233 if part.is_empty() {
234 continue;
235 }
236 if part == "*" {
237 return TopicFilter::All;
238 }
239 if let Some(prefix) = part.strip_suffix(".*") {
240 patterns.push(Pattern::Prefix(format!("{prefix}.")));
241 } else {
242 patterns.push(Pattern::Exact(part.to_string()));
243 }
244 }
245 if patterns.is_empty() {
246 TopicFilter::All
247 } else {
248 TopicFilter::Patterns(patterns)
249 }
250 }
251
252 pub fn from_option(spec: Option<&str>) -> TopicFilter {
254 match spec {
255 Some(s) => TopicFilter::parse(s),
256 None => TopicFilter::All,
257 }
258 }
259
260 pub fn matches(&self, topic: &str) -> bool {
262 match self {
263 TopicFilter::All => true,
264 TopicFilter::Patterns(patterns) => patterns.iter().any(|p| p.matches(topic)),
265 }
266 }
267}
268
/// Merges several frame iterators into one stream ordered by frame id,
/// dropping consecutive frames that carry the same id.
///
/// Each input iterator is expected to already yield frames in the requested
/// direction; the merge only compares the current head of every input.
struct MergeById<'a> {
    /// The input iterators; `MergeEntry::idx` indexes into this vector.
    iters: Vec<Box<dyn Iterator<Item = Frame> + 'a>>,
    /// Current head frame of every input that is not exhausted.
    heap: std::collections::BinaryHeap<MergeEntry>,
    /// `true` to emit greatest ids first, `false` to emit smallest ids first.
    descending: bool,
    /// Id of the most recently emitted frame, used to skip duplicates.
    last_emitted: Option<Scru128Id>,
}
279
280struct MergeEntry {
281 key: u128,
282 idx: usize,
283 frame: Frame,
284}
285
286impl PartialEq for MergeEntry {
287 fn eq(&self, other: &Self) -> bool {
288 self.key == other.key && self.idx == other.idx
289 }
290}
291impl Eq for MergeEntry {}
292impl PartialOrd for MergeEntry {
293 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
294 Some(self.cmp(other))
295 }
296}
297impl Ord for MergeEntry {
298 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
299 self.key.cmp(&other.key).then(self.idx.cmp(&other.idx))
300 }
301}
302
303fn merge_key(id: &Scru128Id, descending: bool) -> u128 {
306 let raw = u128::from_be_bytes(id.to_bytes());
307 if descending {
308 raw
309 } else {
310 !raw
311 }
312}
313
314impl<'a> MergeById<'a> {
315 fn new(mut iters: Vec<Box<dyn Iterator<Item = Frame> + 'a>>, descending: bool) -> Self {
316 let mut heap = std::collections::BinaryHeap::with_capacity(iters.len());
317 for (idx, iter) in iters.iter_mut().enumerate() {
318 if let Some(frame) = iter.next() {
319 heap.push(MergeEntry {
320 key: merge_key(&frame.id, descending),
321 idx,
322 frame,
323 });
324 }
325 }
326 MergeById {
327 iters,
328 heap,
329 descending,
330 last_emitted: None,
331 }
332 }
333}
334
335impl Iterator for MergeById<'_> {
336 type Item = Frame;
337
338 fn next(&mut self) -> Option<Frame> {
339 loop {
340 let entry = self.heap.pop()?;
341 if let Some(frame) = self.iters[entry.idx].next() {
342 self.heap.push(MergeEntry {
343 key: merge_key(&frame.id, self.descending),
344 idx: entry.idx,
345 frame,
346 });
347 }
348 if self.last_emitted == Some(entry.frame.id) {
351 continue;
352 }
353 self.last_emitted = Some(entry.frame.id);
354 return Some(entry.frame);
355 }
356 }
357}
358
359impl ReadOptions {
360 pub fn from_query(query: Option<&str>) -> Result<Self, crate::error::Error> {
364 match query {
365 Some(q) => Ok(serde_urlencoded::from_str(q)?),
366 None => Ok(Self::default()),
367 }
368 }
369
370 pub fn to_query_string(&self) -> String {
374 let mut params = Vec::new();
375
376 match self.follow {
378 FollowOption::Off => {}
379 FollowOption::On => params.push(("follow", "true".to_string())),
380 FollowOption::WithHeartbeat(duration) => {
381 params.push(("follow", duration.as_millis().to_string()));
382 }
383 }
384
385 if self.new {
387 params.push(("new", "true".to_string()));
388 }
389
390 if let Some(after) = self.after {
392 params.push(("after", after.to_string()));
393 }
394
395 if let Some(from) = self.from {
397 params.push(("from", from.to_string()));
398 }
399
400 if let Some(limit) = self.limit {
402 params.push(("limit", limit.to_string()));
403 }
404
405 if let Some(last) = self.last {
407 params.push(("last", last.to_string()));
408 }
409
410 if let Some(topic) = &self.topic {
411 params.push(("topic", topic.clone()));
412 }
413
414 if params.is_empty() {
416 String::new()
417 } else {
418 url::form_urlencoded::Serializer::new(String::new())
419 .extend_pairs(params)
420 .finish()
421 }
422 }
423}
424
/// Whether a read keeps streaming frames appended after it started.
#[derive(Default, PartialEq, Clone, Debug)]
pub enum FollowOption {
    /// Do not follow (the default).
    #[default]
    Off,
    /// Follow newly appended frames.
    On,
    /// Follow newly appended frames and additionally emit an `xs.pulse`
    /// frame at the given interval.
    WithHeartbeat(Duration),
}
437
/// Work items processed by the background GC thread (see `spawn_gc_worker`).
#[derive(Debug)]
enum GCTask {
    /// Remove the frame with this id.
    Remove(Scru128Id),
    /// Remove all but the newest `keep` frames indexed under `topic`.
    CheckLastTTL { topic: String, keep: u32 },
    /// Acknowledge on the contained channel once this task is reached;
    /// used by `Store::wait_for_gc`.
    Drain(tokio::sync::oneshot::Sender<()>),
}
444
/// Handle to an on-disk frame store.
///
/// The channel senders and the `Arc`-wrapped members are shared between clones.
#[derive(Clone)]
pub struct Store {
    /// Root directory of the store; the database lives in `path/fjall` and
    /// the content-addressed cache in `path/cacache`.
    pub path: PathBuf,
    /// The database that owns both keyspaces.
    db: Database,
    /// Frames, keyed by the 16 bytes of their id.
    stream: Keyspace,
    /// Topic index: keys of the form `topic-or-prefix`, NUL, frame id; values are empty.
    idx_topic: Keyspace,
    /// Fan-out of appended frames to following readers.
    broadcast_tx: broadcast::Sender<Frame>,
    /// Queue of tasks for the background GC thread.
    gc_tx: UnboundedSender<GCTask>,
    /// Held for the duration of `append`, so appends run one at a time.
    append_lock: Arc<Mutex<()>>,
    /// Optional engine state attached via `with_base_engine`.
    base_engine: Option<Arc<EngineState>>,
}
468
469impl Store {
470 pub fn new(path: PathBuf) -> Result<Store, StoreError> {
485 let db = match Database::builder(path.join("fjall"))
486 .cache_size(32 * 1024 * 1024) .worker_threads(1)
488 .open()
489 {
490 Ok(db) => db,
491 Err(FjallError::Locked) => return Err(StoreError::Locked),
492 Err(e) => return Err(StoreError::Other(e)),
493 };
494
495 let stream_opts = || {
497 KeyspaceCreateOptions::default()
498 .max_memtable_size(8 * 1024 * 1024) .data_block_size_policy(BlockSizePolicy::all(16 * 1024)) .data_block_hash_ratio_policy(HashRatioPolicy::all(8.0))
501 .expect_point_read_hits(true)
502 };
503
504 let idx_opts = || {
506 KeyspaceCreateOptions::default()
507 .max_memtable_size(8 * 1024 * 1024) .data_block_size_policy(BlockSizePolicy::all(16 * 1024)) .data_block_hash_ratio_policy(HashRatioPolicy::all(0.0)) .expect_point_read_hits(true)
511 };
512
513 let stream = db.keyspace("stream", stream_opts).unwrap();
514 let idx_topic = db.keyspace("idx_topic", idx_opts).unwrap();
515
516 let (broadcast_tx, _) = broadcast::channel(1024);
517 let (gc_tx, gc_rx) = mpsc::unbounded_channel();
518
519 let store = Store {
520 path: path.clone(),
521 db,
522 stream,
523 idx_topic,
524 broadcast_tx,
525 gc_tx,
526 append_lock: Arc::new(Mutex::new(())),
527 base_engine: None,
528 };
529
530 spawn_gc_worker(gc_rx, store.clone());
532
533 Ok(store)
534 }
535
536 pub fn with_base_engine(mut self, base: EngineState) -> Self {
545 self.base_engine = Some(Arc::new(base));
546 self
547 }
548
549 pub fn base_engine(&self) -> Option<&EngineState> {
551 self.base_engine.as_deref()
552 }
553
554 pub async fn wait_for_gc(&self) {
558 let (tx, rx) = tokio::sync::oneshot::channel();
559 let _ = self.gc_tx.send(GCTask::Drain(tx));
560 let _ = rx.await;
561 }
562
563 #[tracing::instrument(skip(self))]
592 pub async fn read(&self, options: ReadOptions) -> tokio::sync::mpsc::Receiver<Frame> {
593 let (tx, rx) = tokio::sync::mpsc::channel(100);
594
595 let should_follow = matches!(
596 options.follow,
597 FollowOption::On | FollowOption::WithHeartbeat(_)
598 );
599
600 let broadcast_rx = if should_follow {
604 Some(self.broadcast_tx.subscribe())
605 } else {
606 None
607 };
608
609 let done_rx = if !options.new {
611 let (done_tx, done_rx) = tokio::sync::oneshot::channel();
612 let tx_clone = tx.clone();
613 let store = self.clone();
614 let options = options.clone();
615 let should_follow_clone = should_follow;
616 let gc_tx = self.gc_tx.clone();
617
618 std::thread::spawn(move || {
620 let mut last_id = None;
621 let mut count = 0;
622
623 let filter = TopicFilter::from_option(options.topic.as_deref());
625
626 if let Some(last_n) = options.last {
627 let iter = store.iter_for_filter_rev(&filter);
628
629 let mut frames: Vec<Frame> = Vec::with_capacity(last_n);
631 for frame in iter {
632 if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
633 if is_expired(&frame.id, ttl) {
634 let _ = gc_tx.send(GCTask::Remove(frame.id));
635 continue;
636 }
637 }
638 frames.push(frame);
639 if frames.len() >= last_n {
640 break;
641 }
642 }
643
644 for frame in frames.into_iter().rev() {
646 last_id = Some(frame.id);
647 count += 1;
648 if tx_clone.blocking_send(frame).is_err() {
649 return;
650 }
651 }
652 } else {
653 let start_bound = options
656 .from
657 .as_ref()
658 .map(|id| (id, true))
659 .or_else(|| options.after.as_ref().map(|id| (id, false)));
660
661 let iter = store.iter_for_filter(&filter, start_bound);
662
663 for frame in iter {
664 if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
665 if is_expired(&frame.id, ttl) {
666 let _ = gc_tx.send(GCTask::Remove(frame.id));
667 continue;
668 }
669 }
670
671 last_id = Some(frame.id);
672
673 if let Some(limit) = options.limit {
674 if count >= limit {
675 return; }
677 }
678
679 if tx_clone.blocking_send(frame).is_err() {
680 return;
681 }
682 count += 1;
683 }
684 }
685
686 if should_follow_clone {
688 let threshold = Frame::builder("xs.threshold")
689 .id(scru128::new())
690 .ttl(TTL::Ephemeral)
691 .build();
692 if tx_clone.blocking_send(threshold).is_err() {
693 return;
694 }
695 }
696
697 let _ = done_tx.send((last_id, count));
699 });
700
701 Some(done_rx)
702 } else {
703 None
704 };
705
706 if let Some(broadcast_rx) = broadcast_rx {
708 {
709 let tx = tx.clone();
710 let limit = options.limit;
711
712 tokio::spawn(async move {
713 let (last_id, mut count) = match done_rx {
715 Some(done_rx) => match done_rx.await {
716 Ok((id, count)) => (id, count),
717 Err(_) => return, },
719 None => (None, 0),
720 };
721
722 let filter = TopicFilter::from_option(options.topic.as_deref());
723
724 let mut broadcast_rx = broadcast_rx;
725 while let Ok(frame) = broadcast_rx.recv().await {
726 if !filter.matches(&frame.topic) {
728 continue;
729 }
730
731 if let Some(last_scanned_id) = last_id {
733 if frame.id <= last_scanned_id {
734 continue;
735 }
736 }
737
738 if tx.send(frame).await.is_err() {
739 break;
740 }
741
742 if let Some(limit) = limit {
743 count += 1;
744 if count >= limit {
745 break;
746 }
747 }
748 }
749 });
750 }
751
752 if let FollowOption::WithHeartbeat(duration) = options.follow {
754 let heartbeat_tx = tx;
755 tokio::spawn(async move {
756 loop {
757 tokio::time::sleep(duration).await;
758 let frame = Frame::builder("xs.pulse")
759 .id(scru128::new())
760 .ttl(TTL::Ephemeral)
761 .build();
762 if heartbeat_tx.send(frame).await.is_err() {
763 break;
764 }
765 }
766 });
767 }
768 }
769
770 rx
771 }
772
773 pub fn read_sync(&self, options: ReadOptions) -> impl Iterator<Item = Frame> + '_ {
790 let gc_tx = self.gc_tx.clone();
791
792 let filter_expired = move |frame: Frame, gc_tx: &UnboundedSender<GCTask>| {
794 if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
795 if is_expired(&frame.id, ttl) {
796 let _ = gc_tx.send(GCTask::Remove(frame.id));
797 return None;
798 }
799 }
800 Some(frame)
801 };
802
803 let filter = TopicFilter::from_option(options.topic.as_deref());
804
805 let frames: Vec<Frame> = if let Some(last_n) = options.last {
806 let iter = self.iter_for_filter_rev(&filter);
808
809 let mut frames: Vec<Frame> = Vec::with_capacity(last_n);
811 for frame in iter {
812 if let Some(frame) = filter_expired(frame, &gc_tx) {
813 frames.push(frame);
814 if frames.len() >= last_n {
815 break;
816 }
817 }
818 }
819
820 frames.reverse();
822 frames
823 } else {
824 let start_bound = options
826 .from
827 .as_ref()
828 .map(|id| (id, true))
829 .or_else(|| options.after.as_ref().map(|id| (id, false)));
830
831 let iter = self.iter_for_filter(&filter, start_bound);
832
833 iter.filter_map(|frame| filter_expired(frame, &gc_tx))
834 .take(options.limit.unwrap_or(usize::MAX))
835 .collect()
836 };
837
838 frames.into_iter()
839 }
840
841 pub fn nu_modules_at(
853 &self,
854 as_of: &Scru128Id,
855 ) -> std::collections::HashMap<String, ssri::Integrity> {
856 let mut modules = std::collections::HashMap::new();
857 let options = ReadOptions::builder().follow(FollowOption::Off).build();
858 for frame in self.read_sync(options) {
859 if frame.id > *as_of {
860 break;
861 }
862 if let Some(hash) = frame.hash {
863 if let Some(name) = frame.topic.strip_prefix("xs.module.") {
864 if !name.is_empty() {
865 modules.insert(name.to_string(), hash);
866 }
867 }
868 }
869 }
870 modules
871 }
872
873 pub fn get(&self, id: &Scru128Id) -> Option<Frame> {
875 self.stream
876 .get(id.to_bytes())
877 .unwrap()
878 .map(|value| deserialize_frame((id.as_bytes(), value)))
879 }
880
881 #[tracing::instrument(skip(self), fields(id = %id.to_string()))]
887 pub fn remove(&self, id: &Scru128Id) -> Result<(), crate::error::Error> {
888 let Some(frame) = self.get(id) else {
889 return Ok(());
891 };
892
893 let mut topic_key = idx_topic_key_prefix(&frame.topic);
895 topic_key.extend(frame.id.as_bytes());
896
897 let prefix_keys = idx_topic_prefix_keys(&frame.topic, &frame.id);
899
900 let mut batch = self.db.batch();
901 batch.remove(&self.stream, id.as_bytes());
902 batch.remove(&self.idx_topic, topic_key);
903 for prefix_key in &prefix_keys {
904 batch.remove(&self.idx_topic, prefix_key);
905 }
906 batch.commit()?;
907 self.db.persist(PersistMode::SyncAll)?;
908 Ok(())
909 }
910
911 pub async fn cas_reader(&self, hash: ssri::Integrity) -> cacache::Result<cacache::Reader> {
921 cacache::Reader::open_hash(&self.path.join("cacache"), hash).await
922 }
923
924 pub fn cas_reader_sync(&self, hash: ssri::Integrity) -> cacache::Result<cacache::SyncReader> {
926 cacache::SyncReader::open_hash(self.path.join("cacache"), hash)
927 }
928
929 pub async fn cas_writer(&self) -> cacache::Result<cacache::Writer> {
931 cacache::WriteOpts::new()
932 .open_hash(&self.path.join("cacache"))
933 .await
934 }
935
936 pub fn cas_writer_sync(&self) -> cacache::Result<cacache::SyncWriter> {
938 cacache::WriteOpts::new().open_hash_sync(self.path.join("cacache"))
939 }
940
941 pub async fn cas_insert(&self, content: impl AsRef<[u8]>) -> cacache::Result<ssri::Integrity> {
944 cacache::write_hash(&self.path.join("cacache"), content).await
945 }
946
947 pub fn cas_insert_sync(&self, content: impl AsRef<[u8]>) -> cacache::Result<ssri::Integrity> {
949 cacache::write_hash_sync(self.path.join("cacache"), content)
950 }
951
952 pub async fn cas_insert_bytes(&self, bytes: &[u8]) -> cacache::Result<ssri::Integrity> {
954 self.cas_insert(bytes).await
955 }
956
957 pub fn cas_insert_bytes_sync(&self, bytes: &[u8]) -> cacache::Result<ssri::Integrity> {
959 self.cas_insert_sync(bytes)
960 }
961
962 pub async fn cas_read(&self, hash: &ssri::Integrity) -> cacache::Result<Vec<u8>> {
964 cacache::read_hash(&self.path.join("cacache"), hash).await
965 }
966
967 pub fn cas_read_sync(&self, hash: &ssri::Integrity) -> cacache::Result<Vec<u8>> {
969 cacache::read_hash_sync(self.path.join("cacache"), hash)
970 }
971
972 #[tracing::instrument(skip(self))]
981 pub fn insert_frame(&self, frame: &Frame) -> Result<(), crate::error::Error> {
982 let encoded: Vec<u8> = serde_json::to_vec(&frame).unwrap();
983
984 let topic_key = idx_topic_key_from_frame(frame)?;
986
987 let prefix_keys = idx_topic_prefix_keys(&frame.topic, &frame.id);
989
990 let mut batch = self.db.batch();
991 batch.insert(&self.stream, frame.id.as_bytes(), encoded);
992 batch.insert(&self.idx_topic, topic_key, b"");
993 for prefix_key in &prefix_keys {
994 batch.insert(&self.idx_topic, prefix_key, b"");
995 }
996 batch.commit()?;
997 self.db.persist(PersistMode::SyncAll)?;
998 Ok(())
999 }
1000
1001 pub fn append(&self, mut frame: Frame) -> Result<Frame, crate::error::Error> {
1034 let _guard = self.append_lock.lock().unwrap();
1038
1039 frame.id = scru128::new();
1040
1041 idx_topic_key_from_frame(&frame)?;
1043
1044 if frame.ttl != Some(TTL::Ephemeral) {
1046 self.insert_frame(&frame)?;
1047
1048 if let Some(TTL::Last(n)) = frame.ttl {
1050 let _ = self.gc_tx.send(GCTask::CheckLastTTL {
1051 topic: frame.topic.clone(),
1052 keep: n,
1053 });
1054 }
1055 }
1056
1057 let _ = self.broadcast_tx.send(frame.clone());
1058 Ok(frame)
1059 }
1060
1061 fn iter_frames(
1064 &self,
1065 start: Option<(&Scru128Id, bool)>,
1066 ) -> Box<dyn Iterator<Item = Frame> + '_> {
1067 let range = match start {
1068 Some((id, true)) => (Bound::Included(id.as_bytes().to_vec()), Bound::Unbounded),
1069 Some((id, false)) => (Bound::Excluded(id.as_bytes().to_vec()), Bound::Unbounded),
1070 None => (Bound::Unbounded, Bound::Unbounded),
1071 };
1072
1073 Box::new(self.stream.range(range).filter_map(|guard| {
1074 let (key, value) = guard.into_inner().ok()?;
1075 Some(deserialize_frame((key, value)))
1076 }))
1077 }
1078
1079 fn iter_frames_rev(&self) -> Box<dyn Iterator<Item = Frame> + '_> {
1081 Box::new(self.stream.iter().rev().filter_map(|guard| {
1082 let (key, value) = guard.into_inner().ok()?;
1083 Some(deserialize_frame((key, value)))
1084 }))
1085 }
1086
1087 fn iter_frames_by_topic_rev<'a>(
1089 &'a self,
1090 topic: &'a str,
1091 ) -> Box<dyn Iterator<Item = Frame> + 'a> {
1092 let prefix = idx_topic_key_prefix(topic);
1093 Box::new(
1094 self.idx_topic
1095 .prefix(prefix)
1096 .rev()
1097 .filter_map(move |guard| {
1098 let key = guard.key().ok()?;
1099 let frame_id = idx_topic_frame_id_from_key(&key);
1100 self.get(&frame_id)
1101 }),
1102 )
1103 }
1104
1105 fn iter_frames_by_topic_prefix_rev<'a>(
1107 &'a self,
1108 prefix: &'a str,
1109 ) -> Box<dyn Iterator<Item = Frame> + 'a> {
1110 let mut index_prefix = Vec::with_capacity(prefix.len() + 1);
1111 index_prefix.extend(prefix.as_bytes());
1112 index_prefix.push(NULL_DELIMITER);
1113
1114 Box::new(
1115 self.idx_topic
1116 .prefix(index_prefix)
1117 .rev()
1118 .filter_map(move |guard| {
1119 let key = guard.key().ok()?;
1120 let frame_id = idx_topic_frame_id_from_key(&key);
1121 self.get(&frame_id)
1122 }),
1123 )
1124 }
1125
1126 fn iter_frames_by_topic<'a>(
1127 &'a self,
1128 topic: &'a str,
1129 start: Option<(&'a Scru128Id, bool)>,
1130 ) -> Box<dyn Iterator<Item = Frame> + 'a> {
1131 let prefix = idx_topic_key_prefix(topic);
1132 Box::new(self.idx_topic.prefix(prefix).filter_map(move |guard| {
1133 let key = guard.key().ok()?;
1134 let frame_id = idx_topic_frame_id_from_key(&key);
1135 if let Some((bound_id, inclusive)) = start {
1136 if inclusive {
1137 if frame_id < *bound_id {
1138 return None;
1139 }
1140 } else if frame_id <= *bound_id {
1141 return None;
1142 }
1143 }
1144 self.get(&frame_id)
1145 }))
1146 }
1147
1148 fn iter_frames_by_topic_prefix<'a>(
1151 &'a self,
1152 prefix: &'a str,
1153 start: Option<(&'a Scru128Id, bool)>,
1154 ) -> Box<dyn Iterator<Item = Frame> + 'a> {
1155 let mut index_prefix = Vec::with_capacity(prefix.len() + 1);
1157 index_prefix.extend(prefix.as_bytes());
1158 index_prefix.push(NULL_DELIMITER);
1159
1160 Box::new(
1161 self.idx_topic
1162 .prefix(index_prefix)
1163 .filter_map(move |guard| {
1164 let key = guard.key().ok()?;
1165 let frame_id = idx_topic_frame_id_from_key(&key);
1166 if let Some((bound_id, inclusive)) = start {
1167 if inclusive {
1168 if frame_id < *bound_id {
1169 return None;
1170 }
1171 } else if frame_id <= *bound_id {
1172 return None;
1173 }
1174 }
1175 self.get(&frame_id)
1176 }),
1177 )
1178 }
1179
1180 fn iter_for_pattern<'a>(
1182 &'a self,
1183 pattern: &'a Pattern,
1184 start: Option<(&'a Scru128Id, bool)>,
1185 ) -> Box<dyn Iterator<Item = Frame> + 'a> {
1186 match pattern {
1187 Pattern::Exact(topic) => self.iter_frames_by_topic(topic, start),
1188 Pattern::Prefix(prefix) => self.iter_frames_by_topic_prefix(prefix, start),
1189 }
1190 }
1191
1192 fn iter_for_pattern_rev<'a>(
1194 &'a self,
1195 pattern: &'a Pattern,
1196 ) -> Box<dyn Iterator<Item = Frame> + 'a> {
1197 match pattern {
1198 Pattern::Exact(topic) => self.iter_frames_by_topic_rev(topic),
1199 Pattern::Prefix(prefix) => self.iter_frames_by_topic_prefix_rev(prefix),
1200 }
1201 }
1202
1203 fn iter_for_filter<'a>(
1207 &'a self,
1208 filter: &'a TopicFilter,
1209 start: Option<(&'a Scru128Id, bool)>,
1210 ) -> Box<dyn Iterator<Item = Frame> + 'a> {
1211 match filter {
1212 TopicFilter::All => self.iter_frames(start),
1213 TopicFilter::Patterns(patterns) if patterns.len() == 1 => {
1214 self.iter_for_pattern(&patterns[0], start)
1215 }
1216 TopicFilter::Patterns(patterns) => {
1217 let iters = patterns
1218 .iter()
1219 .map(|p| self.iter_for_pattern(p, start))
1220 .collect();
1221 Box::new(MergeById::new(iters, false))
1222 }
1223 }
1224 }
1225
1226 fn iter_for_filter_rev<'a>(
1229 &'a self,
1230 filter: &'a TopicFilter,
1231 ) -> Box<dyn Iterator<Item = Frame> + 'a> {
1232 match filter {
1233 TopicFilter::All => self.iter_frames_rev(),
1234 TopicFilter::Patterns(patterns) if patterns.len() == 1 => {
1235 self.iter_for_pattern_rev(&patterns[0])
1236 }
1237 TopicFilter::Patterns(patterns) => {
1238 let iters = patterns
1239 .iter()
1240 .map(|p| self.iter_for_pattern_rev(p))
1241 .collect();
1242 Box::new(MergeById::new(iters, true))
1243 }
1244 }
1245 }
1246}
1247
1248fn spawn_gc_worker(mut gc_rx: UnboundedReceiver<GCTask>, store: Store) {
1249 std::thread::spawn(move || {
1250 while let Some(task) = gc_rx.blocking_recv() {
1251 match task {
1252 GCTask::Remove(id) => {
1253 let _ = store.remove(&id);
1254 }
1255
1256 GCTask::CheckLastTTL { topic, keep } => {
1257 let prefix = idx_topic_key_prefix(&topic);
1258 let frames_to_remove: Vec<_> = store
1259 .idx_topic
1260 .prefix(&prefix)
1261 .rev() .skip(keep as usize)
1263 .filter_map(|guard| {
1264 let key = guard.key().ok()?;
1265 Some(Scru128Id::from_bytes(
1266 idx_topic_frame_id_from_key(&key).into(),
1267 ))
1268 })
1269 .collect();
1270
1271 for frame_id in frames_to_remove {
1272 let _ = store.remove(&frame_id);
1273 }
1274 }
1275
1276 GCTask::Drain(tx) => {
1277 let _ = tx.send(());
1278 }
1279 }
1280 }
1281 });
1282}
1283
1284fn is_expired(id: &Scru128Id, ttl: &Duration) -> bool {
1285 let created_ms = id.timestamp();
1286 let expires_ms = created_ms.saturating_add(ttl.as_millis() as u64);
1287 let now_ms = std::time::SystemTime::now()
1288 .duration_since(std::time::UNIX_EPOCH)
1289 .unwrap()
1290 .as_millis() as u64;
1291
1292 now_ms >= expires_ms
1293}
1294
/// Byte separating the topic (or topic prefix) from the frame id in topic
/// index keys.
const NULL_DELIMITER: u8 = 0;
/// Maximum length of a topic in bytes, enforced by `validate_topic`.
const MAX_TOPIC_LENGTH: usize = 255;
1297
1298pub fn validate_topic(topic: &str) -> Result<(), crate::error::Error> {
1312 if topic.is_empty() {
1313 return Err("Topic cannot be empty".to_string().into());
1314 }
1315 if topic.len() > MAX_TOPIC_LENGTH {
1316 return Err(format!("Topic exceeds max length of {MAX_TOPIC_LENGTH} bytes").into());
1317 }
1318 if topic.ends_with('.') {
1319 return Err("Topic cannot end with '.'".to_string().into());
1320 }
1321 if topic.contains("..") {
1322 return Err("Topic cannot contain consecutive dots".to_string().into());
1323 }
1324
1325 let bytes = topic.as_bytes();
1326 let first = bytes[0];
1327 if !first.is_ascii_alphabetic() && first != b'_' {
1328 return Err("Topic must start with a-z, A-Z, or _".to_string().into());
1329 }
1330
1331 for &b in bytes {
1332 if !b.is_ascii_alphanumeric() && b != b'_' && b != b'-' && b != b'.' {
1333 return Err(format!(
1334 "Topic contains invalid character: '{}'. Allowed: a-z A-Z 0-9 _ - .",
1335 b as char
1336 )
1337 .into());
1338 }
1339 }
1340
1341 Ok(())
1342}
1343
1344pub fn validate_topic_query(spec: &str) -> Result<(), crate::error::Error> {
1348 let mut seen = false;
1349 for part in spec.split(',') {
1350 let part = part.trim();
1351 if part.is_empty() {
1352 continue;
1353 }
1354 seen = true;
1355 validate_topic_pattern(part)?;
1356 }
1357 if !seen {
1358 return Err("Topic query cannot be empty".to_string().into());
1359 }
1360 Ok(())
1361}
1362
1363fn validate_topic_pattern(topic: &str) -> Result<(), crate::error::Error> {
1365 if topic == "*" {
1366 return Ok(());
1367 }
1368 if let Some(prefix) = topic.strip_suffix(".*") {
1369 if prefix.is_empty() {
1372 return Err("Wildcard '.*' requires a prefix".to_string().into());
1373 }
1374 validate_topic(prefix)
1375 } else {
1376 validate_topic(topic)
1377 }
1378}
1379
1380fn idx_topic_prefix_keys(topic: &str, frame_id: &scru128::Scru128Id) -> Vec<Vec<u8>> {
1383 let mut keys = Vec::new();
1384 let mut pos = 0;
1385 while let Some(dot_pos) = topic[pos..].find('.') {
1386 let prefix = &topic[..pos + dot_pos + 1]; let mut key = Vec::with_capacity(prefix.len() + 1 + 16);
1388 key.extend(prefix.as_bytes());
1389 key.push(NULL_DELIMITER);
1390 key.extend(frame_id.as_bytes());
1391 keys.push(key);
1392 pos += dot_pos + 1;
1393 }
1394 keys
1395}
1396
1397fn idx_topic_key_prefix(topic: &str) -> Vec<u8> {
1398 let mut v = Vec::with_capacity(topic.len() + 1); v.extend(topic.as_bytes()); v.push(NULL_DELIMITER); v
1402}
1403
1404pub(crate) fn idx_topic_key_from_frame(frame: &Frame) -> Result<Vec<u8>, crate::error::Error> {
1405 validate_topic(&frame.topic)?;
1406 let mut v = idx_topic_key_prefix(&frame.topic);
1407 v.extend(frame.id.as_bytes());
1408 Ok(v)
1409}
1410
1411fn idx_topic_frame_id_from_key(key: &[u8]) -> Scru128Id {
1412 let frame_id_bytes = &key[key.len() - 16..];
1413 Scru128Id::from_bytes(frame_id_bytes.try_into().unwrap())
1414}
1415
1416fn deserialize_frame<B1: AsRef<[u8]> + std::fmt::Debug, B2: AsRef<[u8]>>(
1417 record: (B1, B2),
1418) -> Frame {
1419 serde_json::from_slice(record.1.as_ref()).unwrap_or_else(|e| {
1420 let key_bytes = record.0.as_ref();
1422 if key_bytes.len() == 16 {
1423 if let Ok(bytes) = key_bytes.try_into() {
1424 let id = Scru128Id::from_bytes(bytes);
1425 eprintln!("CORRUPTED_RECORD_ID: {id}");
1426 }
1427 }
1428 let key = std::str::from_utf8(record.0.as_ref()).unwrap();
1429 let value = std::str::from_utf8(record.1.as_ref()).unwrap();
1430 panic!("Failed to deserialize frame: {e} {key} {value}")
1431 })
1432}