1mod ttl;
2pub use ttl::*;
3
4#[cfg(test)]
5mod tests;
6
7use std::ops::Bound;
8use std::path::PathBuf;
9use std::time::Duration;
10
11use tokio::sync::broadcast;
12use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
13
14use std::sync::{Arc, Mutex};
15
16use scru128::Scru128Id;
17
18use serde::{Deserialize, Deserializer, Serialize};
19
20use fjall::{
21 config::{BlockSizePolicy, HashRatioPolicy},
22 Database, Error as FjallError, Keyspace, KeyspaceCreateOptions, PersistMode,
23};
24
25#[derive(Debug)]
26pub enum StoreError {
27 Locked,
28 Other(FjallError),
29}
30
31impl std::fmt::Display for StoreError {
32 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33 match self {
34 StoreError::Locked => write!(f, "Store is locked by another process"),
35 StoreError::Other(e) => write!(f, "{e}"),
36 }
37 }
38}
39
40impl std::error::Error for StoreError {}
41
42#[derive(PartialEq, Eq, Serialize, Deserialize, Clone, Default, bon::Builder)]
43pub struct Frame {
44 #[builder(start_fn, into)]
45 pub topic: String,
46 #[builder(default)]
47 pub id: Scru128Id,
48 pub hash: Option<ssri::Integrity>,
49 pub meta: Option<serde_json::Value>,
50 pub ttl: Option<TTL>,
51}
52
53use std::fmt;
54
55impl fmt::Debug for Frame {
56 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
57 f.debug_struct("Frame")
58 .field("id", &format!("{id}", id = self.id))
59 .field("topic", &self.topic)
60 .field("hash", &self.hash.as_ref().map(|x| format!("{x}")))
61 .field("meta", &self.meta)
62 .field("ttl", &self.ttl)
63 .finish()
64 }
65}
66
67impl<'de> Deserialize<'de> for FollowOption {
68 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
69 where
70 D: Deserializer<'de>,
71 {
72 let s: String = Deserialize::deserialize(deserializer)?;
73 if s.is_empty() || s == "yes" {
74 Ok(FollowOption::On)
75 } else if let Ok(duration) = s.parse::<u64>() {
76 Ok(FollowOption::WithHeartbeat(Duration::from_millis(duration)))
77 } else {
78 match s.as_str() {
79 "true" => Ok(FollowOption::On),
80 "false" | "no" => Ok(FollowOption::Off),
81 _ => Err(serde::de::Error::custom("Invalid value for follow option")),
82 }
83 }
84 }
85}
86
87fn deserialize_bool<'de, D>(deserializer: D) -> Result<bool, D::Error>
88where
89 D: Deserializer<'de>,
90{
91 let s: String = Deserialize::deserialize(deserializer)?;
92 match s.as_str() {
93 "false" | "no" | "0" => Ok(false),
94 _ => Ok(true),
95 }
96}
97
98#[derive(PartialEq, Deserialize, Clone, Debug, Default, bon::Builder)]
99pub struct ReadOptions {
100 #[serde(default)]
101 #[builder(default)]
102 pub follow: FollowOption,
103 #[serde(default, deserialize_with = "deserialize_bool")]
104 #[builder(default)]
105 pub new: bool,
106 #[serde(rename = "after")]
108 pub after: Option<Scru128Id>,
109 pub from: Option<Scru128Id>,
111 pub limit: Option<usize>,
112 pub last: Option<usize>,
114 pub topic: Option<String>,
115}
116
117impl ReadOptions {
118 pub fn from_query(query: Option<&str>) -> Result<Self, crate::error::Error> {
119 match query {
120 Some(q) => Ok(serde_urlencoded::from_str(q)?),
121 None => Ok(Self::default()),
122 }
123 }
124
125 pub fn to_query_string(&self) -> String {
126 let mut params = Vec::new();
127
128 match self.follow {
130 FollowOption::Off => {}
131 FollowOption::On => params.push(("follow", "true".to_string())),
132 FollowOption::WithHeartbeat(duration) => {
133 params.push(("follow", duration.as_millis().to_string()));
134 }
135 }
136
137 if self.new {
139 params.push(("new", "true".to_string()));
140 }
141
142 if let Some(after) = self.after {
144 params.push(("after", after.to_string()));
145 }
146
147 if let Some(from) = self.from {
149 params.push(("from", from.to_string()));
150 }
151
152 if let Some(limit) = self.limit {
154 params.push(("limit", limit.to_string()));
155 }
156
157 if let Some(last) = self.last {
159 params.push(("last", last.to_string()));
160 }
161
162 if let Some(topic) = &self.topic {
163 params.push(("topic", topic.clone()));
164 }
165
166 if params.is_empty() {
168 String::new()
169 } else {
170 url::form_urlencoded::Serializer::new(String::new())
171 .extend_pairs(params)
172 .finish()
173 }
174 }
175}
176
/// How a read behaves once the stored frames have been delivered.
#[derive(Default, PartialEq, Clone, Debug)]
pub enum FollowOption {
    /// Stop after the stored frames (the default).
    #[default]
    Off,
    /// Keep delivering newly appended frames.
    On,
    /// Like `On`, and additionally emit a pulse frame at the given interval.
    WithHeartbeat(Duration),
}
184
185#[derive(Debug)]
186enum GCTask {
187 Remove(Scru128Id),
188 CheckLastTTL { topic: String, keep: u32 },
189 Drain(tokio::sync::oneshot::Sender<()>),
190}
191
192#[derive(Clone)]
193pub struct Store {
194 pub path: PathBuf,
195 db: Database,
196 stream: Keyspace,
197 idx_topic: Keyspace,
198 broadcast_tx: broadcast::Sender<Frame>,
199 gc_tx: UnboundedSender<GCTask>,
200 append_lock: Arc<Mutex<()>>,
201}
202
203impl Store {
204 pub fn new(path: PathBuf) -> Result<Store, StoreError> {
205 let db = match Database::builder(path.join("fjall"))
206 .cache_size(32 * 1024 * 1024) .worker_threads(1)
208 .open()
209 {
210 Ok(db) => db,
211 Err(FjallError::Locked) => return Err(StoreError::Locked),
212 Err(e) => return Err(StoreError::Other(e)),
213 };
214
215 let stream_opts = || {
217 KeyspaceCreateOptions::default()
218 .max_memtable_size(8 * 1024 * 1024) .data_block_size_policy(BlockSizePolicy::all(16 * 1024)) .data_block_hash_ratio_policy(HashRatioPolicy::all(8.0))
221 .expect_point_read_hits(true)
222 };
223
224 let idx_opts = || {
226 KeyspaceCreateOptions::default()
227 .max_memtable_size(8 * 1024 * 1024) .data_block_size_policy(BlockSizePolicy::all(16 * 1024)) .data_block_hash_ratio_policy(HashRatioPolicy::all(0.0)) .expect_point_read_hits(true)
231 };
232
233 let stream = db.keyspace("stream", stream_opts).unwrap();
234 let idx_topic = db.keyspace("idx_topic", idx_opts).unwrap();
235
236 let (broadcast_tx, _) = broadcast::channel(1024);
237 let (gc_tx, gc_rx) = mpsc::unbounded_channel();
238
239 let store = Store {
240 path: path.clone(),
241 db,
242 stream,
243 idx_topic,
244 broadcast_tx,
245 gc_tx,
246 append_lock: Arc::new(Mutex::new(())),
247 };
248
249 spawn_gc_worker(gc_rx, store.clone());
251
252 Ok(store)
253 }
254
255 pub async fn wait_for_gc(&self) {
256 let (tx, rx) = tokio::sync::oneshot::channel();
257 let _ = self.gc_tx.send(GCTask::Drain(tx));
258 let _ = rx.await;
259 }
260
261 #[tracing::instrument(skip(self))]
262 pub async fn read(&self, options: ReadOptions) -> tokio::sync::mpsc::Receiver<Frame> {
263 let (tx, rx) = tokio::sync::mpsc::channel(100);
264
265 let should_follow = matches!(
266 options.follow,
267 FollowOption::On | FollowOption::WithHeartbeat(_)
268 );
269
270 let broadcast_rx = if should_follow {
274 Some(self.broadcast_tx.subscribe())
275 } else {
276 None
277 };
278
279 let done_rx = if !options.new {
281 let (done_tx, done_rx) = tokio::sync::oneshot::channel();
282 let tx_clone = tx.clone();
283 let store = self.clone();
284 let options = options.clone();
285 let should_follow_clone = should_follow;
286 let gc_tx = self.gc_tx.clone();
287
288 std::thread::spawn(move || {
290 let mut last_id = None;
291 let mut count = 0;
292
293 if let Some(last_n) = options.last {
295 let iter: Box<dyn Iterator<Item = Frame>> = match options.topic.as_deref() {
296 None | Some("*") => store.iter_frames_rev(),
297 Some(topic) if topic.ends_with(".*") => {
298 let prefix = &topic[..topic.len() - 1];
299 store.iter_frames_by_topic_prefix_rev(prefix)
300 }
301 Some(topic) => store.iter_frames_by_topic_rev(topic),
302 };
303
304 let mut frames: Vec<Frame> = Vec::with_capacity(last_n);
306 for frame in iter {
307 if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
308 if is_expired(&frame.id, ttl) {
309 let _ = gc_tx.send(GCTask::Remove(frame.id));
310 continue;
311 }
312 }
313 frames.push(frame);
314 if frames.len() >= last_n {
315 break;
316 }
317 }
318
319 for frame in frames.into_iter().rev() {
321 last_id = Some(frame.id);
322 count += 1;
323 if tx_clone.blocking_send(frame).is_err() {
324 return;
325 }
326 }
327 } else {
328 let start_bound = options
331 .from
332 .as_ref()
333 .map(|id| (id, true))
334 .or_else(|| options.after.as_ref().map(|id| (id, false)));
335
336 let iter: Box<dyn Iterator<Item = Frame>> = match options.topic.as_deref() {
337 None | Some("*") => store.iter_frames(start_bound),
338 Some(topic) if topic.ends_with(".*") => {
339 let prefix = &topic[..topic.len() - 1]; store.iter_frames_by_topic_prefix(prefix, start_bound)
342 }
343 Some(topic) => store.iter_frames_by_topic(topic, start_bound),
344 };
345
346 for frame in iter {
347 if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
348 if is_expired(&frame.id, ttl) {
349 let _ = gc_tx.send(GCTask::Remove(frame.id));
350 continue;
351 }
352 }
353
354 last_id = Some(frame.id);
355
356 if let Some(limit) = options.limit {
357 if count >= limit {
358 return; }
360 }
361
362 if tx_clone.blocking_send(frame).is_err() {
363 return;
364 }
365 count += 1;
366 }
367 }
368
369 if should_follow_clone {
371 let threshold = Frame::builder("xs.threshold")
372 .id(scru128::new())
373 .ttl(TTL::Ephemeral)
374 .build();
375 if tx_clone.blocking_send(threshold).is_err() {
376 return;
377 }
378 }
379
380 let _ = done_tx.send((last_id, count));
382 });
383
384 Some(done_rx)
385 } else {
386 None
387 };
388
389 if let Some(broadcast_rx) = broadcast_rx {
391 {
392 let tx = tx.clone();
393 let limit = options.limit;
394
395 tokio::spawn(async move {
396 let (last_id, mut count) = match done_rx {
398 Some(done_rx) => match done_rx.await {
399 Ok((id, count)) => (id, count),
400 Err(_) => return, },
402 None => (None, 0),
403 };
404
405 let mut broadcast_rx = broadcast_rx;
406 while let Ok(frame) = broadcast_rx.recv().await {
407 match options.topic.as_deref() {
409 None | Some("*") => {}
410 Some(topic) if topic.ends_with(".*") => {
411 let prefix = &topic[..topic.len() - 1]; if !frame.topic.starts_with(prefix) {
413 continue;
414 }
415 }
416 Some(topic) => {
417 if frame.topic != topic {
418 continue;
419 }
420 }
421 }
422
423 if let Some(last_scanned_id) = last_id {
425 if frame.id <= last_scanned_id {
426 continue;
427 }
428 }
429
430 if tx.send(frame).await.is_err() {
431 break;
432 }
433
434 if let Some(limit) = limit {
435 count += 1;
436 if count >= limit {
437 break;
438 }
439 }
440 }
441 });
442 }
443
444 if let FollowOption::WithHeartbeat(duration) = options.follow {
446 let heartbeat_tx = tx;
447 tokio::spawn(async move {
448 loop {
449 tokio::time::sleep(duration).await;
450 let frame = Frame::builder("xs.pulse")
451 .id(scru128::new())
452 .ttl(TTL::Ephemeral)
453 .build();
454 if heartbeat_tx.send(frame).await.is_err() {
455 break;
456 }
457 }
458 });
459 }
460 }
461
462 rx
463 }
464
465 pub fn read_sync(&self, options: ReadOptions) -> impl Iterator<Item = Frame> + '_ {
467 let gc_tx = self.gc_tx.clone();
468
469 let filter_expired = move |frame: Frame, gc_tx: &UnboundedSender<GCTask>| {
471 if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
472 if is_expired(&frame.id, ttl) {
473 let _ = gc_tx.send(GCTask::Remove(frame.id));
474 return None;
475 }
476 }
477 Some(frame)
478 };
479
480 let frames: Vec<Frame> = if let Some(last_n) = options.last {
481 let iter: Box<dyn Iterator<Item = Frame>> = match options.topic.as_deref() {
483 None | Some("*") => self.iter_frames_rev(),
484 Some(topic) if topic.ends_with(".*") => {
485 let prefix = &topic[..topic.len() - 1];
486 self.iter_frames_by_topic_prefix_rev(prefix)
487 }
488 Some(topic) => self.iter_frames_by_topic_rev(topic),
489 };
490
491 let mut frames: Vec<Frame> = Vec::with_capacity(last_n);
493 for frame in iter {
494 if let Some(frame) = filter_expired(frame, &gc_tx) {
495 frames.push(frame);
496 if frames.len() >= last_n {
497 break;
498 }
499 }
500 }
501
502 frames.reverse();
504 frames
505 } else {
506 let start_bound = options
508 .from
509 .as_ref()
510 .map(|id| (id, true))
511 .or_else(|| options.after.as_ref().map(|id| (id, false)));
512
513 let iter: Box<dyn Iterator<Item = Frame>> = match options.topic.as_deref() {
514 None | Some("*") => self.iter_frames(start_bound),
515 Some(topic) if topic.ends_with(".*") => {
516 let prefix = &topic[..topic.len() - 1];
517 self.iter_frames_by_topic_prefix(prefix, start_bound)
518 }
519 Some(topic) => self.iter_frames_by_topic(topic, start_bound),
520 };
521
522 iter.filter_map(|frame| filter_expired(frame, &gc_tx))
523 .take(options.limit.unwrap_or(usize::MAX))
524 .collect()
525 };
526
527 frames.into_iter()
528 }
529
530 pub fn nu_modules_at(
536 &self,
537 as_of: &Scru128Id,
538 ) -> std::collections::HashMap<String, ssri::Integrity> {
539 let mut modules = std::collections::HashMap::new();
540 let options = ReadOptions::builder().follow(FollowOption::Off).build();
541 for frame in self.read_sync(options) {
542 if frame.id > *as_of {
543 break;
544 }
545 if let Some(hash) = frame.hash {
546 if let Some(name) = frame.topic.strip_prefix("xs.module.") {
547 if !name.is_empty() {
548 modules.insert(name.to_string(), hash);
549 }
550 }
551 }
552 }
553 modules
554 }
555
556 pub fn get(&self, id: &Scru128Id) -> Option<Frame> {
557 self.stream
558 .get(id.to_bytes())
559 .unwrap()
560 .map(|value| deserialize_frame((id.as_bytes(), value)))
561 }
562
563 #[tracing::instrument(skip(self), fields(id = %id.to_string()))]
564 pub fn remove(&self, id: &Scru128Id) -> Result<(), crate::error::Error> {
565 let Some(frame) = self.get(id) else {
566 return Ok(());
568 };
569
570 let mut topic_key = idx_topic_key_prefix(&frame.topic);
572 topic_key.extend(frame.id.as_bytes());
573
574 let prefix_keys = idx_topic_prefix_keys(&frame.topic, &frame.id);
576
577 let mut batch = self.db.batch();
578 batch.remove(&self.stream, id.as_bytes());
579 batch.remove(&self.idx_topic, topic_key);
580 for prefix_key in &prefix_keys {
581 batch.remove(&self.idx_topic, prefix_key);
582 }
583 batch.commit()?;
584 self.db.persist(PersistMode::SyncAll)?;
585 Ok(())
586 }
587
588 pub async fn cas_reader(&self, hash: ssri::Integrity) -> cacache::Result<cacache::Reader> {
589 cacache::Reader::open_hash(&self.path.join("cacache"), hash).await
590 }
591
592 pub fn cas_reader_sync(&self, hash: ssri::Integrity) -> cacache::Result<cacache::SyncReader> {
593 cacache::SyncReader::open_hash(self.path.join("cacache"), hash)
594 }
595
596 pub async fn cas_writer(&self) -> cacache::Result<cacache::Writer> {
597 cacache::WriteOpts::new()
598 .open_hash(&self.path.join("cacache"))
599 .await
600 }
601
602 pub fn cas_writer_sync(&self) -> cacache::Result<cacache::SyncWriter> {
603 cacache::WriteOpts::new().open_hash_sync(self.path.join("cacache"))
604 }
605
606 pub async fn cas_insert(&self, content: impl AsRef<[u8]>) -> cacache::Result<ssri::Integrity> {
607 cacache::write_hash(&self.path.join("cacache"), content).await
608 }
609
610 pub fn cas_insert_sync(&self, content: impl AsRef<[u8]>) -> cacache::Result<ssri::Integrity> {
611 cacache::write_hash_sync(self.path.join("cacache"), content)
612 }
613
614 pub async fn cas_insert_bytes(&self, bytes: &[u8]) -> cacache::Result<ssri::Integrity> {
615 self.cas_insert(bytes).await
616 }
617
618 pub fn cas_insert_bytes_sync(&self, bytes: &[u8]) -> cacache::Result<ssri::Integrity> {
619 self.cas_insert_sync(bytes)
620 }
621
622 pub async fn cas_read(&self, hash: &ssri::Integrity) -> cacache::Result<Vec<u8>> {
623 cacache::read_hash(&self.path.join("cacache"), hash).await
624 }
625
626 pub fn cas_read_sync(&self, hash: &ssri::Integrity) -> cacache::Result<Vec<u8>> {
627 cacache::read_hash_sync(self.path.join("cacache"), hash)
628 }
629
630 #[tracing::instrument(skip(self))]
631 pub fn insert_frame(&self, frame: &Frame) -> Result<(), crate::error::Error> {
632 let encoded: Vec<u8> = serde_json::to_vec(&frame).unwrap();
633
634 let topic_key = idx_topic_key_from_frame(frame)?;
636
637 let prefix_keys = idx_topic_prefix_keys(&frame.topic, &frame.id);
639
640 let mut batch = self.db.batch();
641 batch.insert(&self.stream, frame.id.as_bytes(), encoded);
642 batch.insert(&self.idx_topic, topic_key, b"");
643 for prefix_key in &prefix_keys {
644 batch.insert(&self.idx_topic, prefix_key, b"");
645 }
646 batch.commit()?;
647 self.db.persist(PersistMode::SyncAll)?;
648 Ok(())
649 }
650
651 pub fn append(&self, mut frame: Frame) -> Result<Frame, crate::error::Error> {
652 let _guard = self.append_lock.lock().unwrap();
656
657 frame.id = scru128::new();
658
659 idx_topic_key_from_frame(&frame)?;
661
662 if frame.ttl != Some(TTL::Ephemeral) {
664 self.insert_frame(&frame)?;
665
666 if let Some(TTL::Last(n)) = frame.ttl {
668 let _ = self.gc_tx.send(GCTask::CheckLastTTL {
669 topic: frame.topic.clone(),
670 keep: n,
671 });
672 }
673 }
674
675 let _ = self.broadcast_tx.send(frame.clone());
676 Ok(frame)
677 }
678
679 fn iter_frames(
682 &self,
683 start: Option<(&Scru128Id, bool)>,
684 ) -> Box<dyn Iterator<Item = Frame> + '_> {
685 let range = match start {
686 Some((id, true)) => (Bound::Included(id.as_bytes().to_vec()), Bound::Unbounded),
687 Some((id, false)) => (Bound::Excluded(id.as_bytes().to_vec()), Bound::Unbounded),
688 None => (Bound::Unbounded, Bound::Unbounded),
689 };
690
691 Box::new(self.stream.range(range).filter_map(|guard| {
692 let (key, value) = guard.into_inner().ok()?;
693 Some(deserialize_frame((key, value)))
694 }))
695 }
696
697 fn iter_frames_rev(&self) -> Box<dyn Iterator<Item = Frame> + '_> {
699 Box::new(self.stream.iter().rev().filter_map(|guard| {
700 let (key, value) = guard.into_inner().ok()?;
701 Some(deserialize_frame((key, value)))
702 }))
703 }
704
705 fn iter_frames_by_topic_rev<'a>(
707 &'a self,
708 topic: &'a str,
709 ) -> Box<dyn Iterator<Item = Frame> + 'a> {
710 let prefix = idx_topic_key_prefix(topic);
711 Box::new(
712 self.idx_topic
713 .prefix(prefix)
714 .rev()
715 .filter_map(move |guard| {
716 let key = guard.key().ok()?;
717 let frame_id = idx_topic_frame_id_from_key(&key);
718 self.get(&frame_id)
719 }),
720 )
721 }
722
723 fn iter_frames_by_topic_prefix_rev<'a>(
725 &'a self,
726 prefix: &'a str,
727 ) -> Box<dyn Iterator<Item = Frame> + 'a> {
728 let mut index_prefix = Vec::with_capacity(prefix.len() + 1);
729 index_prefix.extend(prefix.as_bytes());
730 index_prefix.push(NULL_DELIMITER);
731
732 Box::new(
733 self.idx_topic
734 .prefix(index_prefix)
735 .rev()
736 .filter_map(move |guard| {
737 let key = guard.key().ok()?;
738 let frame_id = idx_topic_frame_id_from_key(&key);
739 self.get(&frame_id)
740 }),
741 )
742 }
743
744 fn iter_frames_by_topic<'a>(
745 &'a self,
746 topic: &'a str,
747 start: Option<(&'a Scru128Id, bool)>,
748 ) -> Box<dyn Iterator<Item = Frame> + 'a> {
749 let prefix = idx_topic_key_prefix(topic);
750 Box::new(self.idx_topic.prefix(prefix).filter_map(move |guard| {
751 let key = guard.key().ok()?;
752 let frame_id = idx_topic_frame_id_from_key(&key);
753 if let Some((bound_id, inclusive)) = start {
754 if inclusive {
755 if frame_id < *bound_id {
756 return None;
757 }
758 } else if frame_id <= *bound_id {
759 return None;
760 }
761 }
762 self.get(&frame_id)
763 }))
764 }
765
766 fn iter_frames_by_topic_prefix<'a>(
769 &'a self,
770 prefix: &'a str,
771 start: Option<(&'a Scru128Id, bool)>,
772 ) -> Box<dyn Iterator<Item = Frame> + 'a> {
773 let mut index_prefix = Vec::with_capacity(prefix.len() + 1);
775 index_prefix.extend(prefix.as_bytes());
776 index_prefix.push(NULL_DELIMITER);
777
778 Box::new(
779 self.idx_topic
780 .prefix(index_prefix)
781 .filter_map(move |guard| {
782 let key = guard.key().ok()?;
783 let frame_id = idx_topic_frame_id_from_key(&key);
784 if let Some((bound_id, inclusive)) = start {
785 if inclusive {
786 if frame_id < *bound_id {
787 return None;
788 }
789 } else if frame_id <= *bound_id {
790 return None;
791 }
792 }
793 self.get(&frame_id)
794 }),
795 )
796 }
797}
798
799fn spawn_gc_worker(mut gc_rx: UnboundedReceiver<GCTask>, store: Store) {
800 std::thread::spawn(move || {
801 while let Some(task) = gc_rx.blocking_recv() {
802 match task {
803 GCTask::Remove(id) => {
804 let _ = store.remove(&id);
805 }
806
807 GCTask::CheckLastTTL { topic, keep } => {
808 let prefix = idx_topic_key_prefix(&topic);
809 let frames_to_remove: Vec<_> = store
810 .idx_topic
811 .prefix(&prefix)
812 .rev() .skip(keep as usize)
814 .filter_map(|guard| {
815 let key = guard.key().ok()?;
816 Some(Scru128Id::from_bytes(
817 idx_topic_frame_id_from_key(&key).into(),
818 ))
819 })
820 .collect();
821
822 for frame_id in frames_to_remove {
823 let _ = store.remove(&frame_id);
824 }
825 }
826
827 GCTask::Drain(tx) => {
828 let _ = tx.send(());
829 }
830 }
831 }
832 });
833}
834
835fn is_expired(id: &Scru128Id, ttl: &Duration) -> bool {
836 let created_ms = id.timestamp();
837 let expires_ms = created_ms.saturating_add(ttl.as_millis() as u64);
838 let now_ms = std::time::SystemTime::now()
839 .duration_since(std::time::UNIX_EPOCH)
840 .unwrap()
841 .as_millis() as u64;
842
843 now_ms >= expires_ms
844}
845
/// Byte placed between the topic text and the frame id in index keys.
const NULL_DELIMITER: u8 = 0;
/// Longest topic accepted by `validate_topic`, in bytes.
const MAX_TOPIC_LENGTH: usize = 255;
848
849pub fn validate_topic(topic: &str) -> Result<(), crate::error::Error> {
854 if topic.is_empty() {
855 return Err("Topic cannot be empty".to_string().into());
856 }
857 if topic.len() > MAX_TOPIC_LENGTH {
858 return Err(format!("Topic exceeds max length of {MAX_TOPIC_LENGTH} bytes").into());
859 }
860 if topic.ends_with('.') {
861 return Err("Topic cannot end with '.'".to_string().into());
862 }
863 if topic.contains("..") {
864 return Err("Topic cannot contain consecutive dots".to_string().into());
865 }
866
867 let bytes = topic.as_bytes();
868 let first = bytes[0];
869 if !first.is_ascii_alphabetic() && first != b'_' {
870 return Err("Topic must start with a-z, A-Z, or _".to_string().into());
871 }
872
873 for &b in bytes {
874 if !b.is_ascii_alphanumeric() && b != b'_' && b != b'-' && b != b'.' {
875 return Err(format!(
876 "Topic contains invalid character: '{}'. Allowed: a-z A-Z 0-9 _ - .",
877 b as char
878 )
879 .into());
880 }
881 }
882
883 Ok(())
884}
885
886pub fn validate_topic_query(topic: &str) -> Result<(), crate::error::Error> {
889 if topic == "*" {
890 return Ok(());
891 }
892 if let Some(prefix) = topic.strip_suffix(".*") {
893 if prefix.is_empty() {
896 return Err("Wildcard '.*' requires a prefix".to_string().into());
897 }
898 validate_topic(prefix)
899 } else {
900 validate_topic(topic)
901 }
902}
903
904fn idx_topic_prefix_keys(topic: &str, frame_id: &scru128::Scru128Id) -> Vec<Vec<u8>> {
907 let mut keys = Vec::new();
908 let mut pos = 0;
909 while let Some(dot_pos) = topic[pos..].find('.') {
910 let prefix = &topic[..pos + dot_pos + 1]; let mut key = Vec::with_capacity(prefix.len() + 1 + 16);
912 key.extend(prefix.as_bytes());
913 key.push(NULL_DELIMITER);
914 key.extend(frame_id.as_bytes());
915 keys.push(key);
916 pos += dot_pos + 1;
917 }
918 keys
919}
920
921fn idx_topic_key_prefix(topic: &str) -> Vec<u8> {
922 let mut v = Vec::with_capacity(topic.len() + 1); v.extend(topic.as_bytes()); v.push(NULL_DELIMITER); v
926}
927
928pub(crate) fn idx_topic_key_from_frame(frame: &Frame) -> Result<Vec<u8>, crate::error::Error> {
929 validate_topic(&frame.topic)?;
930 let mut v = idx_topic_key_prefix(&frame.topic);
931 v.extend(frame.id.as_bytes());
932 Ok(v)
933}
934
935fn idx_topic_frame_id_from_key(key: &[u8]) -> Scru128Id {
936 let frame_id_bytes = &key[key.len() - 16..];
937 Scru128Id::from_bytes(frame_id_bytes.try_into().unwrap())
938}
939
940fn deserialize_frame<B1: AsRef<[u8]> + std::fmt::Debug, B2: AsRef<[u8]>>(
941 record: (B1, B2),
942) -> Frame {
943 serde_json::from_slice(record.1.as_ref()).unwrap_or_else(|e| {
944 let key_bytes = record.0.as_ref();
946 if key_bytes.len() == 16 {
947 if let Ok(bytes) = key_bytes.try_into() {
948 let id = Scru128Id::from_bytes(bytes);
949 eprintln!("CORRUPTED_RECORD_ID: {id}");
950 }
951 }
952 let key = std::str::from_utf8(record.0.as_ref()).unwrap();
953 let value = std::str::from_utf8(record.1.as_ref()).unwrap();
954 panic!("Failed to deserialize frame: {e} {key} {value}")
955 })
956}