1mod ttl;
2pub use ttl::*;
3
4#[cfg(test)]
5mod tests;
6
7use std::ops::Bound;
8use std::path::PathBuf;
9use std::time::Duration;
10
11use tokio::sync::broadcast;
12use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
13
14use std::sync::{Arc, Mutex};
15
16use scru128::Scru128Id;
17
18use serde::{Deserialize, Deserializer, Serialize};
19
20use fjall::{
21 config::{BlockSizePolicy, HashRatioPolicy},
22 Database, Keyspace, KeyspaceCreateOptions, PersistMode,
23};
24
25#[derive(PartialEq, Eq, Serialize, Deserialize, Clone, Default, bon::Builder)]
26pub struct Frame {
27 #[builder(start_fn, into)]
28 pub topic: String,
29 #[builder(default)]
30 pub id: Scru128Id,
31 pub hash: Option<ssri::Integrity>,
32 pub meta: Option<serde_json::Value>,
33 pub ttl: Option<TTL>,
34}
35
36use std::fmt;
37
38impl fmt::Debug for Frame {
39 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
40 f.debug_struct("Frame")
41 .field("id", &format!("{id}", id = self.id))
42 .field("topic", &self.topic)
43 .field("hash", &self.hash.as_ref().map(|x| format!("{x}")))
44 .field("meta", &self.meta)
45 .field("ttl", &self.ttl)
46 .finish()
47 }
48}
49
50impl<'de> Deserialize<'de> for FollowOption {
51 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
52 where
53 D: Deserializer<'de>,
54 {
55 let s: String = Deserialize::deserialize(deserializer)?;
56 if s.is_empty() || s == "yes" {
57 Ok(FollowOption::On)
58 } else if let Ok(duration) = s.parse::<u64>() {
59 Ok(FollowOption::WithHeartbeat(Duration::from_millis(duration)))
60 } else {
61 match s.as_str() {
62 "true" => Ok(FollowOption::On),
63 "false" | "no" => Ok(FollowOption::Off),
64 _ => Err(serde::de::Error::custom("Invalid value for follow option")),
65 }
66 }
67 }
68}
69
70fn deserialize_bool<'de, D>(deserializer: D) -> Result<bool, D::Error>
71where
72 D: Deserializer<'de>,
73{
74 let s: String = Deserialize::deserialize(deserializer)?;
75 match s.as_str() {
76 "false" | "no" | "0" => Ok(false),
77 _ => Ok(true),
78 }
79}
80
81#[derive(PartialEq, Deserialize, Clone, Debug, Default, bon::Builder)]
82pub struct ReadOptions {
83 #[serde(default)]
84 #[builder(default)]
85 pub follow: FollowOption,
86 #[serde(default, deserialize_with = "deserialize_bool")]
87 #[builder(default)]
88 pub new: bool,
89 #[serde(rename = "after")]
91 pub after: Option<Scru128Id>,
92 pub from: Option<Scru128Id>,
94 pub limit: Option<usize>,
95 pub last: Option<usize>,
97 pub topic: Option<String>,
98}
99
100impl ReadOptions {
101 pub fn from_query(query: Option<&str>) -> Result<Self, crate::error::Error> {
102 match query {
103 Some(q) => Ok(serde_urlencoded::from_str(q)?),
104 None => Ok(Self::default()),
105 }
106 }
107
108 pub fn to_query_string(&self) -> String {
109 let mut params = Vec::new();
110
111 match self.follow {
113 FollowOption::Off => {}
114 FollowOption::On => params.push(("follow", "true".to_string())),
115 FollowOption::WithHeartbeat(duration) => {
116 params.push(("follow", duration.as_millis().to_string()));
117 }
118 }
119
120 if self.new {
122 params.push(("new", "true".to_string()));
123 }
124
125 if let Some(after) = self.after {
127 params.push(("after", after.to_string()));
128 }
129
130 if let Some(from) = self.from {
132 params.push(("from", from.to_string()));
133 }
134
135 if let Some(limit) = self.limit {
137 params.push(("limit", limit.to_string()));
138 }
139
140 if let Some(last) = self.last {
142 params.push(("last", last.to_string()));
143 }
144
145 if let Some(topic) = &self.topic {
146 params.push(("topic", topic.clone()));
147 }
148
149 if params.is_empty() {
151 String::new()
152 } else {
153 url::form_urlencoded::Serializer::new(String::new())
154 .extend_pairs(params)
155 .finish()
156 }
157 }
158}
159
160#[derive(Default, PartialEq, Clone, Debug)]
161pub enum FollowOption {
162 #[default]
163 Off,
164 On,
165 WithHeartbeat(Duration),
166}
167
168#[derive(Debug)]
169enum GCTask {
170 Remove(Scru128Id),
171 CheckLastTTL { topic: String, keep: u32 },
172 Drain(tokio::sync::oneshot::Sender<()>),
173}
174
175#[derive(Clone)]
176pub struct Store {
177 pub path: PathBuf,
178 db: Database,
179 stream: Keyspace,
180 idx_topic: Keyspace,
181 broadcast_tx: broadcast::Sender<Frame>,
182 gc_tx: UnboundedSender<GCTask>,
183 append_lock: Arc<Mutex<()>>,
184}
185
186impl Store {
187 pub fn new(path: PathBuf) -> Store {
188 let db = Database::builder(path.join("fjall"))
189 .cache_size(32 * 1024 * 1024) .worker_threads(1)
191 .open()
192 .unwrap();
193
194 let stream_opts = || {
196 KeyspaceCreateOptions::default()
197 .max_memtable_size(8 * 1024 * 1024) .data_block_size_policy(BlockSizePolicy::all(16 * 1024)) .data_block_hash_ratio_policy(HashRatioPolicy::all(8.0))
200 .expect_point_read_hits(true)
201 };
202
203 let idx_opts = || {
205 KeyspaceCreateOptions::default()
206 .max_memtable_size(8 * 1024 * 1024) .data_block_size_policy(BlockSizePolicy::all(16 * 1024)) .data_block_hash_ratio_policy(HashRatioPolicy::all(0.0)) .expect_point_read_hits(true)
210 };
211
212 let stream = db.keyspace("stream", stream_opts).unwrap();
213 let idx_topic = db.keyspace("idx_topic", idx_opts).unwrap();
214
215 let (broadcast_tx, _) = broadcast::channel(1024);
216 let (gc_tx, gc_rx) = mpsc::unbounded_channel();
217
218 let store = Store {
219 path: path.clone(),
220 db,
221 stream,
222 idx_topic,
223 broadcast_tx,
224 gc_tx,
225 append_lock: Arc::new(Mutex::new(())),
226 };
227
228 spawn_gc_worker(gc_rx, store.clone());
230
231 store
232 }
233
234 pub async fn wait_for_gc(&self) {
235 let (tx, rx) = tokio::sync::oneshot::channel();
236 let _ = self.gc_tx.send(GCTask::Drain(tx));
237 let _ = rx.await;
238 }
239
240 #[tracing::instrument(skip(self))]
241 pub async fn read(&self, options: ReadOptions) -> tokio::sync::mpsc::Receiver<Frame> {
242 let (tx, rx) = tokio::sync::mpsc::channel(100);
243
244 let should_follow = matches!(
245 options.follow,
246 FollowOption::On | FollowOption::WithHeartbeat(_)
247 );
248
249 let broadcast_rx = if should_follow {
253 Some(self.broadcast_tx.subscribe())
254 } else {
255 None
256 };
257
258 let done_rx = if !options.new {
260 let (done_tx, done_rx) = tokio::sync::oneshot::channel();
261 let tx_clone = tx.clone();
262 let store = self.clone();
263 let options = options.clone();
264 let should_follow_clone = should_follow;
265 let gc_tx = self.gc_tx.clone();
266
267 std::thread::spawn(move || {
269 let mut last_id = None;
270 let mut count = 0;
271
272 if let Some(last_n) = options.last {
274 let iter: Box<dyn Iterator<Item = Frame>> = match options.topic.as_deref() {
275 None | Some("*") => store.iter_frames_rev(),
276 Some(topic) if topic.ends_with(".*") => {
277 let prefix = &topic[..topic.len() - 1];
278 store.iter_frames_by_topic_prefix_rev(prefix)
279 }
280 Some(topic) => store.iter_frames_by_topic_rev(topic),
281 };
282
283 let mut frames: Vec<Frame> = Vec::with_capacity(last_n);
285 for frame in iter {
286 if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
287 if is_expired(&frame.id, ttl) {
288 let _ = gc_tx.send(GCTask::Remove(frame.id));
289 continue;
290 }
291 }
292 frames.push(frame);
293 if frames.len() >= last_n {
294 break;
295 }
296 }
297
298 for frame in frames.into_iter().rev() {
300 last_id = Some(frame.id);
301 count += 1;
302 if tx_clone.blocking_send(frame).is_err() {
303 return;
304 }
305 }
306 } else {
307 let start_bound = options
310 .from
311 .as_ref()
312 .map(|id| (id, true))
313 .or_else(|| options.after.as_ref().map(|id| (id, false)));
314
315 let iter: Box<dyn Iterator<Item = Frame>> = match options.topic.as_deref() {
316 None | Some("*") => store.iter_frames(start_bound),
317 Some(topic) if topic.ends_with(".*") => {
318 let prefix = &topic[..topic.len() - 1]; store.iter_frames_by_topic_prefix(prefix, start_bound)
321 }
322 Some(topic) => store.iter_frames_by_topic(topic, start_bound),
323 };
324
325 for frame in iter {
326 if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
327 if is_expired(&frame.id, ttl) {
328 let _ = gc_tx.send(GCTask::Remove(frame.id));
329 continue;
330 }
331 }
332
333 last_id = Some(frame.id);
334
335 if let Some(limit) = options.limit {
336 if count >= limit {
337 return; }
339 }
340
341 if tx_clone.blocking_send(frame).is_err() {
342 return;
343 }
344 count += 1;
345 }
346 }
347
348 if should_follow_clone {
350 let threshold = Frame::builder("xs.threshold")
351 .id(scru128::new())
352 .ttl(TTL::Ephemeral)
353 .build();
354 if tx_clone.blocking_send(threshold).is_err() {
355 return;
356 }
357 }
358
359 let _ = done_tx.send((last_id, count));
361 });
362
363 Some(done_rx)
364 } else {
365 None
366 };
367
368 if let Some(broadcast_rx) = broadcast_rx {
370 {
371 let tx = tx.clone();
372 let limit = options.limit;
373
374 tokio::spawn(async move {
375 let (last_id, mut count) = match done_rx {
377 Some(done_rx) => match done_rx.await {
378 Ok((id, count)) => (id, count),
379 Err(_) => return, },
381 None => (None, 0),
382 };
383
384 let mut broadcast_rx = broadcast_rx;
385 while let Ok(frame) = broadcast_rx.recv().await {
386 match options.topic.as_deref() {
388 None | Some("*") => {}
389 Some(topic) if topic.ends_with(".*") => {
390 let prefix = &topic[..topic.len() - 1]; if !frame.topic.starts_with(prefix) {
392 continue;
393 }
394 }
395 Some(topic) => {
396 if frame.topic != topic {
397 continue;
398 }
399 }
400 }
401
402 if let Some(last_scanned_id) = last_id {
404 if frame.id <= last_scanned_id {
405 continue;
406 }
407 }
408
409 if tx.send(frame).await.is_err() {
410 break;
411 }
412
413 if let Some(limit) = limit {
414 count += 1;
415 if count >= limit {
416 break;
417 }
418 }
419 }
420 });
421 }
422
423 if let FollowOption::WithHeartbeat(duration) = options.follow {
425 let heartbeat_tx = tx;
426 tokio::spawn(async move {
427 loop {
428 tokio::time::sleep(duration).await;
429 let frame = Frame::builder("xs.pulse")
430 .id(scru128::new())
431 .ttl(TTL::Ephemeral)
432 .build();
433 if heartbeat_tx.send(frame).await.is_err() {
434 break;
435 }
436 }
437 });
438 }
439 }
440
441 rx
442 }
443
444 pub fn read_sync(&self, options: ReadOptions) -> impl Iterator<Item = Frame> + '_ {
446 let gc_tx = self.gc_tx.clone();
447
448 let filter_expired = move |frame: Frame, gc_tx: &UnboundedSender<GCTask>| {
450 if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
451 if is_expired(&frame.id, ttl) {
452 let _ = gc_tx.send(GCTask::Remove(frame.id));
453 return None;
454 }
455 }
456 Some(frame)
457 };
458
459 let frames: Vec<Frame> = if let Some(last_n) = options.last {
460 let iter: Box<dyn Iterator<Item = Frame>> = match options.topic.as_deref() {
462 None | Some("*") => self.iter_frames_rev(),
463 Some(topic) if topic.ends_with(".*") => {
464 let prefix = &topic[..topic.len() - 1];
465 self.iter_frames_by_topic_prefix_rev(prefix)
466 }
467 Some(topic) => self.iter_frames_by_topic_rev(topic),
468 };
469
470 let mut frames: Vec<Frame> = Vec::with_capacity(last_n);
472 for frame in iter {
473 if let Some(frame) = filter_expired(frame, &gc_tx) {
474 frames.push(frame);
475 if frames.len() >= last_n {
476 break;
477 }
478 }
479 }
480
481 frames.reverse();
483 frames
484 } else {
485 let start_bound = options
487 .from
488 .as_ref()
489 .map(|id| (id, true))
490 .or_else(|| options.after.as_ref().map(|id| (id, false)));
491
492 let iter: Box<dyn Iterator<Item = Frame>> = match options.topic.as_deref() {
493 None | Some("*") => self.iter_frames(start_bound),
494 Some(topic) if topic.ends_with(".*") => {
495 let prefix = &topic[..topic.len() - 1];
496 self.iter_frames_by_topic_prefix(prefix, start_bound)
497 }
498 Some(topic) => self.iter_frames_by_topic(topic, start_bound),
499 };
500
501 iter.filter_map(|frame| filter_expired(frame, &gc_tx))
502 .take(options.limit.unwrap_or(usize::MAX))
503 .collect()
504 };
505
506 frames.into_iter()
507 }
508
509 pub fn get(&self, id: &Scru128Id) -> Option<Frame> {
510 self.stream
511 .get(id.to_bytes())
512 .unwrap()
513 .map(|value| deserialize_frame((id.as_bytes(), value)))
514 }
515
516 #[tracing::instrument(skip(self), fields(id = %id.to_string()))]
517 pub fn remove(&self, id: &Scru128Id) -> Result<(), crate::error::Error> {
518 let Some(frame) = self.get(id) else {
519 return Ok(());
521 };
522
523 let mut topic_key = idx_topic_key_prefix(&frame.topic);
525 topic_key.extend(frame.id.as_bytes());
526
527 let prefix_keys = idx_topic_prefix_keys(&frame.topic, &frame.id);
529
530 let mut batch = self.db.batch();
531 batch.remove(&self.stream, id.as_bytes());
532 batch.remove(&self.idx_topic, topic_key);
533 for prefix_key in &prefix_keys {
534 batch.remove(&self.idx_topic, prefix_key);
535 }
536 batch.commit()?;
537 self.db.persist(PersistMode::SyncAll)?;
538 Ok(())
539 }
540
541 pub async fn cas_reader(&self, hash: ssri::Integrity) -> cacache::Result<cacache::Reader> {
542 cacache::Reader::open_hash(&self.path.join("cacache"), hash).await
543 }
544
545 pub fn cas_reader_sync(&self, hash: ssri::Integrity) -> cacache::Result<cacache::SyncReader> {
546 cacache::SyncReader::open_hash(self.path.join("cacache"), hash)
547 }
548
549 pub async fn cas_writer(&self) -> cacache::Result<cacache::Writer> {
550 cacache::WriteOpts::new()
551 .open_hash(&self.path.join("cacache"))
552 .await
553 }
554
555 pub fn cas_writer_sync(&self) -> cacache::Result<cacache::SyncWriter> {
556 cacache::WriteOpts::new().open_hash_sync(self.path.join("cacache"))
557 }
558
559 pub async fn cas_insert(&self, content: impl AsRef<[u8]>) -> cacache::Result<ssri::Integrity> {
560 cacache::write_hash(&self.path.join("cacache"), content).await
561 }
562
563 pub fn cas_insert_sync(&self, content: impl AsRef<[u8]>) -> cacache::Result<ssri::Integrity> {
564 cacache::write_hash_sync(self.path.join("cacache"), content)
565 }
566
567 pub async fn cas_insert_bytes(&self, bytes: &[u8]) -> cacache::Result<ssri::Integrity> {
568 self.cas_insert(bytes).await
569 }
570
571 pub fn cas_insert_bytes_sync(&self, bytes: &[u8]) -> cacache::Result<ssri::Integrity> {
572 self.cas_insert_sync(bytes)
573 }
574
575 pub async fn cas_read(&self, hash: &ssri::Integrity) -> cacache::Result<Vec<u8>> {
576 cacache::read_hash(&self.path.join("cacache"), hash).await
577 }
578
579 pub fn cas_read_sync(&self, hash: &ssri::Integrity) -> cacache::Result<Vec<u8>> {
580 cacache::read_hash_sync(self.path.join("cacache"), hash)
581 }
582
583 #[tracing::instrument(skip(self))]
584 pub fn insert_frame(&self, frame: &Frame) -> Result<(), crate::error::Error> {
585 let encoded: Vec<u8> = serde_json::to_vec(&frame).unwrap();
586
587 let topic_key = idx_topic_key_from_frame(frame)?;
589
590 let prefix_keys = idx_topic_prefix_keys(&frame.topic, &frame.id);
592
593 let mut batch = self.db.batch();
594 batch.insert(&self.stream, frame.id.as_bytes(), encoded);
595 batch.insert(&self.idx_topic, topic_key, b"");
596 for prefix_key in &prefix_keys {
597 batch.insert(&self.idx_topic, prefix_key, b"");
598 }
599 batch.commit()?;
600 self.db.persist(PersistMode::SyncAll)?;
601 Ok(())
602 }
603
604 pub fn append(&self, mut frame: Frame) -> Result<Frame, crate::error::Error> {
605 let _guard = self.append_lock.lock().unwrap();
609
610 frame.id = scru128::new();
611
612 idx_topic_key_from_frame(&frame)?;
614
615 if frame.ttl != Some(TTL::Ephemeral) {
617 self.insert_frame(&frame)?;
618
619 if let Some(TTL::Last(n)) = frame.ttl {
621 let _ = self.gc_tx.send(GCTask::CheckLastTTL {
622 topic: frame.topic.clone(),
623 keep: n,
624 });
625 }
626 }
627
628 let _ = self.broadcast_tx.send(frame.clone());
629 Ok(frame)
630 }
631
632 fn iter_frames(
635 &self,
636 start: Option<(&Scru128Id, bool)>,
637 ) -> Box<dyn Iterator<Item = Frame> + '_> {
638 let range = match start {
639 Some((id, true)) => (Bound::Included(id.as_bytes().to_vec()), Bound::Unbounded),
640 Some((id, false)) => (Bound::Excluded(id.as_bytes().to_vec()), Bound::Unbounded),
641 None => (Bound::Unbounded, Bound::Unbounded),
642 };
643
644 Box::new(self.stream.range(range).filter_map(|guard| {
645 let (key, value) = guard.into_inner().ok()?;
646 Some(deserialize_frame((key, value)))
647 }))
648 }
649
650 fn iter_frames_rev(&self) -> Box<dyn Iterator<Item = Frame> + '_> {
652 Box::new(self.stream.iter().rev().filter_map(|guard| {
653 let (key, value) = guard.into_inner().ok()?;
654 Some(deserialize_frame((key, value)))
655 }))
656 }
657
658 fn iter_frames_by_topic_rev<'a>(
660 &'a self,
661 topic: &'a str,
662 ) -> Box<dyn Iterator<Item = Frame> + 'a> {
663 let prefix = idx_topic_key_prefix(topic);
664 Box::new(
665 self.idx_topic
666 .prefix(prefix)
667 .rev()
668 .filter_map(move |guard| {
669 let key = guard.key().ok()?;
670 let frame_id = idx_topic_frame_id_from_key(&key);
671 self.get(&frame_id)
672 }),
673 )
674 }
675
676 fn iter_frames_by_topic_prefix_rev<'a>(
678 &'a self,
679 prefix: &'a str,
680 ) -> Box<dyn Iterator<Item = Frame> + 'a> {
681 let mut index_prefix = Vec::with_capacity(prefix.len() + 1);
682 index_prefix.extend(prefix.as_bytes());
683 index_prefix.push(NULL_DELIMITER);
684
685 Box::new(
686 self.idx_topic
687 .prefix(index_prefix)
688 .rev()
689 .filter_map(move |guard| {
690 let key = guard.key().ok()?;
691 let frame_id = idx_topic_frame_id_from_key(&key);
692 self.get(&frame_id)
693 }),
694 )
695 }
696
697 fn iter_frames_by_topic<'a>(
698 &'a self,
699 topic: &'a str,
700 start: Option<(&'a Scru128Id, bool)>,
701 ) -> Box<dyn Iterator<Item = Frame> + 'a> {
702 let prefix = idx_topic_key_prefix(topic);
703 Box::new(self.idx_topic.prefix(prefix).filter_map(move |guard| {
704 let key = guard.key().ok()?;
705 let frame_id = idx_topic_frame_id_from_key(&key);
706 if let Some((bound_id, inclusive)) = start {
707 if inclusive {
708 if frame_id < *bound_id {
709 return None;
710 }
711 } else if frame_id <= *bound_id {
712 return None;
713 }
714 }
715 self.get(&frame_id)
716 }))
717 }
718
719 fn iter_frames_by_topic_prefix<'a>(
722 &'a self,
723 prefix: &'a str,
724 start: Option<(&'a Scru128Id, bool)>,
725 ) -> Box<dyn Iterator<Item = Frame> + 'a> {
726 let mut index_prefix = Vec::with_capacity(prefix.len() + 1);
728 index_prefix.extend(prefix.as_bytes());
729 index_prefix.push(NULL_DELIMITER);
730
731 Box::new(
732 self.idx_topic
733 .prefix(index_prefix)
734 .filter_map(move |guard| {
735 let key = guard.key().ok()?;
736 let frame_id = idx_topic_frame_id_from_key(&key);
737 if let Some((bound_id, inclusive)) = start {
738 if inclusive {
739 if frame_id < *bound_id {
740 return None;
741 }
742 } else if frame_id <= *bound_id {
743 return None;
744 }
745 }
746 self.get(&frame_id)
747 }),
748 )
749 }
750}
751
752fn spawn_gc_worker(mut gc_rx: UnboundedReceiver<GCTask>, store: Store) {
753 std::thread::spawn(move || {
754 while let Some(task) = gc_rx.blocking_recv() {
755 match task {
756 GCTask::Remove(id) => {
757 let _ = store.remove(&id);
758 }
759
760 GCTask::CheckLastTTL { topic, keep } => {
761 let prefix = idx_topic_key_prefix(&topic);
762 let frames_to_remove: Vec<_> = store
763 .idx_topic
764 .prefix(&prefix)
765 .rev() .skip(keep as usize)
767 .filter_map(|guard| {
768 let key = guard.key().ok()?;
769 Some(Scru128Id::from_bytes(
770 idx_topic_frame_id_from_key(&key).into(),
771 ))
772 })
773 .collect();
774
775 for frame_id in frames_to_remove {
776 let _ = store.remove(&frame_id);
777 }
778 }
779
780 GCTask::Drain(tx) => {
781 let _ = tx.send(());
782 }
783 }
784 }
785 });
786}
787
788fn is_expired(id: &Scru128Id, ttl: &Duration) -> bool {
789 let created_ms = id.timestamp();
790 let expires_ms = created_ms.saturating_add(ttl.as_millis() as u64);
791 let now_ms = std::time::SystemTime::now()
792 .duration_since(std::time::UNIX_EPOCH)
793 .unwrap()
794 .as_millis() as u64;
795
796 now_ms >= expires_ms
797}
798
799const NULL_DELIMITER: u8 = 0;
800const MAX_TOPIC_LENGTH: usize = 255;
801
802pub fn validate_topic(topic: &str) -> Result<(), crate::error::Error> {
807 if topic.is_empty() {
808 return Err("Topic cannot be empty".to_string().into());
809 }
810 if topic.len() > MAX_TOPIC_LENGTH {
811 return Err(format!("Topic exceeds max length of {MAX_TOPIC_LENGTH} bytes").into());
812 }
813 if topic.ends_with('.') {
814 return Err("Topic cannot end with '.'".to_string().into());
815 }
816 if topic.contains("..") {
817 return Err("Topic cannot contain consecutive dots".to_string().into());
818 }
819
820 let bytes = topic.as_bytes();
821 let first = bytes[0];
822 if !first.is_ascii_alphanumeric() && first != b'_' {
823 return Err("Topic must start with a-z, A-Z, 0-9, or _"
824 .to_string()
825 .into());
826 }
827
828 for &b in bytes {
829 if !b.is_ascii_alphanumeric() && b != b'_' && b != b'-' && b != b'.' {
830 return Err(format!(
831 "Topic contains invalid character: '{}'. Allowed: a-z A-Z 0-9 _ - .",
832 b as char
833 )
834 .into());
835 }
836 }
837
838 Ok(())
839}
840
841pub fn validate_topic_query(topic: &str) -> Result<(), crate::error::Error> {
844 if topic == "*" {
845 return Ok(());
846 }
847 if let Some(prefix) = topic.strip_suffix(".*") {
848 if prefix.is_empty() {
851 return Err("Wildcard '.*' requires a prefix".to_string().into());
852 }
853 validate_topic(prefix)
854 } else {
855 validate_topic(topic)
856 }
857}
858
859fn idx_topic_prefix_keys(topic: &str, frame_id: &scru128::Scru128Id) -> Vec<Vec<u8>> {
862 let mut keys = Vec::new();
863 let mut pos = 0;
864 while let Some(dot_pos) = topic[pos..].find('.') {
865 let prefix = &topic[..pos + dot_pos + 1]; let mut key = Vec::with_capacity(prefix.len() + 1 + 16);
867 key.extend(prefix.as_bytes());
868 key.push(NULL_DELIMITER);
869 key.extend(frame_id.as_bytes());
870 keys.push(key);
871 pos += dot_pos + 1;
872 }
873 keys
874}
875
876fn idx_topic_key_prefix(topic: &str) -> Vec<u8> {
877 let mut v = Vec::with_capacity(topic.len() + 1); v.extend(topic.as_bytes()); v.push(NULL_DELIMITER); v
881}
882
883pub(crate) fn idx_topic_key_from_frame(frame: &Frame) -> Result<Vec<u8>, crate::error::Error> {
884 validate_topic(&frame.topic)?;
885 let mut v = idx_topic_key_prefix(&frame.topic);
886 v.extend(frame.id.as_bytes());
887 Ok(v)
888}
889
890fn idx_topic_frame_id_from_key(key: &[u8]) -> Scru128Id {
891 let frame_id_bytes = &key[key.len() - 16..];
892 Scru128Id::from_bytes(frame_id_bytes.try_into().unwrap())
893}
894
895fn deserialize_frame<B1: AsRef<[u8]> + std::fmt::Debug, B2: AsRef<[u8]>>(
896 record: (B1, B2),
897) -> Frame {
898 serde_json::from_slice(record.1.as_ref()).unwrap_or_else(|e| {
899 let key_bytes = record.0.as_ref();
901 if key_bytes.len() == 16 {
902 if let Ok(bytes) = key_bytes.try_into() {
903 let id = Scru128Id::from_bytes(bytes);
904 eprintln!("CORRUPTED_RECORD_ID: {id}");
905 }
906 }
907 let key = std::str::from_utf8(record.0.as_ref()).unwrap();
908 let value = std::str::from_utf8(record.1.as_ref()).unwrap();
909 panic!("Failed to deserialize frame: {e} {key} {value}")
910 })
911}