1mod ttl;
2pub use ttl::*;
3
4#[cfg(test)]
5mod tests;
6
7use std::ops::Bound;
8use std::path::PathBuf;
9use std::time::Duration;
10
11use tokio::sync::broadcast;
12use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
13
14use std::collections::HashSet;
15use std::sync::{Arc, Mutex, RwLock};
16
17use scru128::Scru128Id;
18
19use serde::{Deserialize, Deserializer, Serialize};
20
21use fjall::{Config, Keyspace, PartitionCreateOptions, PartitionHandle};
22
23pub const ZERO_CONTEXT: Scru128Id = Scru128Id::from_bytes([0; 16]);
25
/// A single entry in the stream.
///
/// Build one with `Frame::builder(topic, context_id)`: the two `start_fn`
/// fields are the builder's positional arguments, in declaration order.
#[derive(PartialEq, Eq, Serialize, Deserialize, Clone, Default, bon::Builder)]
pub struct Frame {
    /// Topic name. `Store::append` rejects topics containing a NUL byte.
    #[builder(start_fn, into)]
    pub topic: String,
    /// Context the frame belongs to; `ZERO_CONTEXT` is the root context.
    #[builder(start_fn)]
    pub context_id: Scru128Id,
    /// Frame id. The builder falls back to `Scru128Id::default()`;
    /// `Store::append` overwrites it with a freshly generated SCRU128 id.
    #[builder(default)]
    pub id: Scru128Id,
    /// NOTE(review): presumably the integrity hash of the frame's payload in the
    /// content-addressed (cacache) store — confirm against callers.
    pub hash: Option<ssri::Integrity>,
    /// Optional free-form JSON metadata.
    pub meta: Option<serde_json::Value>,
    /// Retention policy. Every frame except `TTL::Ephemeral` ones (including
    /// `None`) is persisted by `Store::append`.
    pub ttl: Option<TTL>,
}
38
39use std::fmt;
40
41impl fmt::Debug for Frame {
42 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
43 f.debug_struct("Frame")
44 .field("id", &format!("{id}", id = self.id))
45 .field(
46 "context_id",
47 &format!("{context_id}", context_id = self.context_id),
48 )
49 .field("topic", &self.topic)
50 .field("hash", &self.hash.as_ref().map(|x| format!("{x}")))
51 .field("meta", &self.meta)
52 .field("ttl", &self.ttl)
53 .finish()
54 }
55}
56
57impl<'de> Deserialize<'de> for FollowOption {
58 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
59 where
60 D: Deserializer<'de>,
61 {
62 let s: String = Deserialize::deserialize(deserializer)?;
63 if s.is_empty() || s == "yes" {
64 Ok(FollowOption::On)
65 } else if let Ok(duration) = s.parse::<u64>() {
66 Ok(FollowOption::WithHeartbeat(Duration::from_millis(duration)))
67 } else {
68 match s.as_str() {
69 "true" => Ok(FollowOption::On),
70 "false" | "no" => Ok(FollowOption::Off),
71 _ => Err(serde::de::Error::custom("Invalid value for follow option")),
72 }
73 }
74 }
75}
76
77fn deserialize_bool<'de, D>(deserializer: D) -> Result<bool, D::Error>
78where
79 D: Deserializer<'de>,
80{
81 let s: String = Deserialize::deserialize(deserializer)?;
82 match s.as_str() {
83 "false" | "no" | "0" => Ok(false),
84 _ => Ok(true),
85 }
86}
87
/// Options controlling `Store::read`; `ReadOptions::from_query` parses them
/// from a URL query string.
#[derive(PartialEq, Deserialize, Clone, Debug, Default, bon::Builder)]
pub struct ReadOptions {
    /// Whether to keep streaming newly appended frames after the stored ones.
    #[serde(default)]
    #[builder(default)]
    pub follow: FollowOption,
    /// Skip the scan of stored frames entirely; only frames appended after the
    /// read starts are delivered (which requires `follow`).
    #[serde(default, deserialize_with = "deserialize_bool")]
    #[builder(default)]
    pub tail: bool,
    /// Resume point: stored frames are read starting strictly after this id.
    #[serde(rename = "last-id")]
    pub last_id: Option<Scru128Id>,
    /// Maximum number of frames to deliver.
    pub limit: Option<usize>,
    /// Restrict the read to one context; `None` reads across all contexts.
    #[serde(rename = "context-id")]
    pub context_id: Option<Scru128Id>,
    /// Restrict the read to frames with exactly this topic.
    pub topic: Option<String>,
}
103
104impl ReadOptions {
105 pub fn from_query(query: Option<&str>) -> Result<Self, crate::error::Error> {
106 match query {
107 Some(q) => Ok(serde_urlencoded::from_str(q)?),
108 None => Ok(Self::default()),
109 }
110 }
111
112 pub fn to_query_string(&self) -> String {
113 let mut params = Vec::new();
114
115 match self.follow {
117 FollowOption::Off => {}
118 FollowOption::On => params.push(("follow", "true".to_string())),
119 FollowOption::WithHeartbeat(duration) => {
120 params.push(("follow", duration.as_millis().to_string()));
121 }
122 }
123
124 if let Some(context_id) = self.context_id {
125 params.push(("context-id", context_id.to_string()));
126 }
127
128 if self.tail {
130 params.push(("tail", "true".to_string()));
131 }
132
133 if let Some(last_id) = self.last_id {
135 params.push(("last-id", last_id.to_string()));
136 }
137
138 if let Some(limit) = self.limit {
140 params.push(("limit", limit.to_string()));
141 }
142
143 if let Some(topic) = &self.topic {
144 params.push(("topic", topic.clone()));
145 }
146
147 if params.is_empty() {
149 String::new()
150 } else {
151 url::form_urlencoded::Serializer::new(String::new())
152 .extend_pairs(params)
153 .finish()
154 }
155 }
156}
157
/// Whether a read keeps streaming after delivering the stored frames.
///
/// Parsed by the custom `Deserialize` impl: `""`, `"yes"`, `"true"` → `On`;
/// `"false"`, `"no"` → `Off`; an unsigned integer → `WithHeartbeat` with that
/// many milliseconds.
#[derive(Default, PartialEq, Clone, Debug)]
pub enum FollowOption {
    /// Deliver stored frames only (the default).
    #[default]
    Off,
    /// Keep streaming newly appended frames.
    On,
    /// Like `On`, and additionally emit an ephemeral `xs.pulse` frame every
    /// interval.
    WithHeartbeat(Duration),
}
165
/// Work items processed in order by the background GC worker.
#[derive(Debug)]
enum GCTask {
    /// Delete the frame with this id (sent for expired `TTL::Time` frames).
    Remove(Scru128Id),
    /// Trim `topic` in `context_id` down to its newest `keep` frames
    /// (sent by `Store::append` for `TTL::Head(keep)` frames).
    CheckHeadTTL {
        context_id: Scru128Id,
        topic: String,
        keep: u32,
    },
    /// Reply on the sender once every previously queued task has been handled.
    Drain(tokio::sync::oneshot::Sender<()>),
}
176
/// Handle to the frame store. Clones share the same keyspace handles, context
/// set and append lock (via `Arc`), broadcast channel and GC queue.
#[derive(Clone)]
pub struct Store {
    /// Root directory: fjall data lives in `path/fjall`, blobs in `path/cacache`.
    pub path: PathBuf,
    keyspace: Keyspace,
    /// `stream` partition: frame id bytes -> JSON-encoded `Frame`.
    frame_partition: PartitionHandle,
    /// Key-only index: context id + topic + 0x00 + frame id.
    idx_topic: PartitionHandle,
    /// Key-only index: context id + frame id.
    idx_context: PartitionHandle,
    /// Ids of all known contexts, always including `ZERO_CONTEXT`.
    contexts: Arc<RwLock<HashSet<Scru128Id>>>,
    /// Fan-out of appended frames to following readers.
    broadcast_tx: broadcast::Sender<Frame>,
    /// Queue feeding the background GC worker.
    gc_tx: UnboundedSender<GCTask>,
    /// Serializes `append` so ids are generated, persisted and broadcast in order.
    append_lock: Arc<Mutex<()>>,
}
189
190impl Store {
191 pub fn new(path: PathBuf) -> Store {
192 let config = Config::new(path.join("fjall"));
193 let keyspace = config
194 .flush_workers(1)
195 .compaction_workers(1)
196 .open()
197 .unwrap();
198
199 let frame_partition = keyspace
200 .open_partition("stream", PartitionCreateOptions::default())
201 .unwrap();
202
203 let idx_topic = keyspace
204 .open_partition("idx_topic", PartitionCreateOptions::default())
205 .unwrap();
206
207 let idx_context = keyspace
208 .open_partition("idx_context", PartitionCreateOptions::default())
209 .unwrap();
210
211 let (broadcast_tx, _) = broadcast::channel(1024);
212 let (gc_tx, gc_rx) = mpsc::unbounded_channel();
213
214 let mut contexts = HashSet::new();
215 contexts.insert(ZERO_CONTEXT); let store = Store {
218 path: path.clone(),
219 keyspace: keyspace.clone(),
220 frame_partition: frame_partition.clone(),
221 idx_topic: idx_topic.clone(),
222 idx_context: idx_context.clone(),
223 contexts: Arc::new(RwLock::new(contexts)),
224 broadcast_tx,
225 gc_tx,
226 append_lock: Arc::new(Mutex::new(())),
227 };
228
229 for frame in store.read_sync(None, None, Some(ZERO_CONTEXT)) {
231 if frame.topic == "xs.context" {
232 store.contexts.write().unwrap().insert(frame.id);
233 }
234 }
235
236 spawn_gc_worker(gc_rx, store.clone());
238
239 store
240 }
241
242 pub async fn wait_for_gc(&self) {
243 let (tx, rx) = tokio::sync::oneshot::channel();
244 let _ = self.gc_tx.send(GCTask::Drain(tx));
245 let _ = rx.await;
246 }
247
248 #[tracing::instrument(skip(self))]
249 pub async fn read(&self, options: ReadOptions) -> tokio::sync::mpsc::Receiver<Frame> {
250 let (tx, rx) = tokio::sync::mpsc::channel(100);
251
252 let should_follow = matches!(
253 options.follow,
254 FollowOption::On | FollowOption::WithHeartbeat(_)
255 );
256
257 let broadcast_rx = if should_follow {
261 Some(self.broadcast_tx.subscribe())
262 } else {
263 None
264 };
265
266 let done_rx = if !options.tail {
268 let (done_tx, done_rx) = tokio::sync::oneshot::channel();
269 let tx_clone = tx.clone();
270 let store = self.clone();
271 let options = options.clone();
272 let should_follow_clone = should_follow;
273 let gc_tx = self.gc_tx.clone();
274
275 std::thread::spawn(move || {
277 let mut last_id = None;
278 let mut count = 0;
279
280 let iter: Box<dyn Iterator<Item = Frame>> = if let Some(ref topic) = options.topic {
281 store.iter_frames_by_topic(options.context_id, topic, options.last_id.as_ref())
282 } else {
283 store.iter_frames(options.context_id, options.last_id.as_ref())
284 };
285
286 for frame in iter {
287 if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
288 if is_expired(&frame.id, ttl) {
289 let _ = gc_tx.send(GCTask::Remove(frame.id));
290 continue;
291 }
292 }
293
294 last_id = Some(frame.id);
295
296 if let Some(limit) = options.limit {
297 if count >= limit {
298 return; }
300 }
301
302 if tx_clone.blocking_send(frame).is_err() {
303 return;
304 }
305 count += 1;
306 }
307
308 if should_follow_clone && options.limit.is_none() {
310 let threshold =
311 Frame::builder("xs.threshold", options.context_id.unwrap_or(ZERO_CONTEXT))
312 .id(scru128::new())
313 .ttl(TTL::Ephemeral)
314 .build();
315 if tx_clone.blocking_send(threshold).is_err() {
316 return;
317 }
318 }
319
320 let _ = done_tx.send((last_id, count));
322 });
323
324 Some(done_rx)
325 } else {
326 None
327 };
328
329 if let Some(broadcast_rx) = broadcast_rx {
331 {
332 let tx = tx.clone();
333 let limit = options.limit;
334
335 tokio::spawn(async move {
336 let (last_id, mut count) = match done_rx {
338 Some(done_rx) => match done_rx.await {
339 Ok((id, count)) => (id, count),
340 Err(_) => return, },
342 None => (None, 0),
343 };
344
345 let mut broadcast_rx = broadcast_rx;
346 while let Ok(frame) = broadcast_rx.recv().await {
347 if let Some(context_id) = options.context_id {
349 if frame.context_id != context_id {
350 continue;
351 }
352 }
353
354 if let Some(ref topic) = options.topic {
355 if frame.topic != *topic {
356 continue;
357 }
358 }
359
360 if let Some(last_scanned_id) = last_id {
362 if frame.id <= last_scanned_id {
363 continue;
364 }
365 }
366
367 if tx.send(frame).await.is_err() {
368 break;
369 }
370
371 if let Some(limit) = limit {
372 count += 1;
373 if count >= limit {
374 break;
375 }
376 }
377 }
378 });
379 }
380
381 if let FollowOption::WithHeartbeat(duration) = options.follow {
383 let heartbeat_tx = tx;
384 tokio::spawn(async move {
385 loop {
386 tokio::time::sleep(duration).await;
387 let frame =
388 Frame::builder("xs.pulse", options.context_id.unwrap_or(ZERO_CONTEXT))
389 .id(scru128::new())
390 .ttl(TTL::Ephemeral)
391 .build();
392 if heartbeat_tx.send(frame).await.is_err() {
393 break;
394 }
395 }
396 });
397 }
398 }
399
400 rx
401 }
402
403 #[tracing::instrument(skip(self))]
404 pub fn read_sync(
405 &self,
406 last_id: Option<&Scru128Id>,
407 limit: Option<usize>,
408 context_id: Option<Scru128Id>,
409 ) -> impl Iterator<Item = Frame> + '_ {
410 self.iter_frames(context_id, last_id)
411 .filter(move |frame| {
412 if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
413 if is_expired(&frame.id, ttl) {
414 let _ = self.gc_tx.send(GCTask::Remove(frame.id));
415 return false;
416 }
417 }
418 true
419 })
420 .take(limit.unwrap_or(usize::MAX))
421 }
422
423 pub fn get(&self, id: &Scru128Id) -> Option<Frame> {
424 self.frame_partition
425 .get(id.to_bytes())
426 .unwrap()
427 .map(|value| deserialize_frame((id.as_bytes(), value)))
428 }
429
430 #[tracing::instrument(skip(self))]
431 pub fn head(&self, topic: &str, context_id: Scru128Id) -> Option<Frame> {
432 self.idx_topic
433 .prefix(idx_topic_key_prefix(context_id, topic))
434 .rev()
435 .find_map(|kv| self.get(&idx_topic_frame_id_from_key(&kv.unwrap().0)))
436 }
437
438 #[tracing::instrument(skip(self), fields(id = %id.to_string()))]
439 pub fn remove(&self, id: &Scru128Id) -> Result<(), crate::error::Error> {
440 let Some(frame) = self.get(id) else {
441 return Ok(());
443 };
444
445 let topic_key = idx_topic_key_from_frame(&frame)?;
447
448 let mut batch = self.keyspace.batch();
449 batch.remove(&self.frame_partition, id.as_bytes());
450 batch.remove(&self.idx_topic, topic_key);
451 batch.remove(&self.idx_context, idx_context_key_from_frame(&frame));
452
453 if frame.topic == "xs.context" {
455 self.contexts.write().unwrap().remove(&frame.id);
456 }
457
458 batch.commit()?;
459 self.keyspace.persist(fjall::PersistMode::SyncAll)?;
460 Ok(())
461 }
462
463 pub async fn cas_reader(&self, hash: ssri::Integrity) -> cacache::Result<cacache::Reader> {
464 cacache::Reader::open_hash(&self.path.join("cacache"), hash).await
465 }
466
467 pub fn cas_reader_sync(&self, hash: ssri::Integrity) -> cacache::Result<cacache::SyncReader> {
468 cacache::SyncReader::open_hash(self.path.join("cacache"), hash)
469 }
470
471 pub async fn cas_writer(&self) -> cacache::Result<cacache::Writer> {
472 cacache::WriteOpts::new()
473 .open_hash(&self.path.join("cacache"))
474 .await
475 }
476
477 pub fn cas_writer_sync(&self) -> cacache::Result<cacache::SyncWriter> {
478 cacache::WriteOpts::new().open_hash_sync(self.path.join("cacache"))
479 }
480
481 pub async fn cas_insert(&self, content: impl AsRef<[u8]>) -> cacache::Result<ssri::Integrity> {
482 cacache::write_hash(&self.path.join("cacache"), content).await
483 }
484
485 pub fn cas_insert_sync(&self, content: impl AsRef<[u8]>) -> cacache::Result<ssri::Integrity> {
486 cacache::write_hash_sync(self.path.join("cacache"), content)
487 }
488
489 pub async fn cas_insert_bytes(&self, bytes: &[u8]) -> cacache::Result<ssri::Integrity> {
490 self.cas_insert(bytes).await
491 }
492
493 pub fn cas_insert_bytes_sync(&self, bytes: &[u8]) -> cacache::Result<ssri::Integrity> {
494 self.cas_insert_sync(bytes)
495 }
496
497 pub async fn cas_read(&self, hash: &ssri::Integrity) -> cacache::Result<Vec<u8>> {
498 cacache::read_hash(&self.path.join("cacache"), hash).await
499 }
500
501 pub fn cas_read_sync(&self, hash: &ssri::Integrity) -> cacache::Result<Vec<u8>> {
502 cacache::read_hash_sync(self.path.join("cacache"), hash)
503 }
504
505 #[tracing::instrument(skip(self))]
506 pub fn insert_frame(&self, frame: &Frame) -> Result<(), crate::error::Error> {
507 let encoded: Vec<u8> = serde_json::to_vec(&frame).unwrap();
508
509 let topic_key = idx_topic_key_from_frame(frame)?;
511
512 let mut batch = self.keyspace.batch();
513 batch.insert(&self.frame_partition, frame.id.as_bytes(), encoded);
514 batch.insert(&self.idx_topic, topic_key, b"");
515 batch.insert(&self.idx_context, idx_context_key_from_frame(frame), b"");
516 batch.commit()?;
517 self.keyspace.persist(fjall::PersistMode::SyncAll)?;
518 Ok(())
519 }
520
521 pub fn append(&self, mut frame: Frame) -> Result<Frame, crate::error::Error> {
522 let _guard = self.append_lock.lock().unwrap();
526
527 frame.id = scru128::new();
528
529 if frame.topic == "xs.context" {
531 if frame.context_id != ZERO_CONTEXT {
532 return Err("xs.context frames must be in zero context".into());
533 }
534 frame.ttl = Some(TTL::Forever);
535 self.contexts.write().unwrap().insert(frame.id);
536 } else {
537 let contexts = self.contexts.read().unwrap();
539 if !contexts.contains(&frame.context_id) {
540 return Err(format!(
541 "Invalid context: {context_id}",
542 context_id = frame.context_id
543 )
544 .into());
545 }
546 }
547
548 idx_topic_key_from_frame(&frame)?;
550
551 if frame.ttl != Some(TTL::Ephemeral) {
553 self.insert_frame(&frame)?;
554
555 if let Some(TTL::Head(n)) = frame.ttl {
557 let _ = self.gc_tx.send(GCTask::CheckHeadTTL {
558 context_id: frame.context_id,
559 topic: frame.topic.clone(),
560 keep: n,
561 });
562 }
563 }
564
565 let _ = self.broadcast_tx.send(frame.clone());
566 Ok(frame)
567 }
568
569 fn iter_frames(
570 &self,
571 context_id: Option<Scru128Id>,
572 last_id: Option<&Scru128Id>,
573 ) -> Box<dyn Iterator<Item = Frame> + '_> {
574 match context_id {
575 Some(ctx_id) => {
576 let start_key = if let Some(last_id) = last_id {
577 let mut v = Vec::with_capacity(32);
579 v.extend(ctx_id.as_bytes());
580 v.extend(last_id.as_bytes());
581 Bound::Excluded(v)
582 } else {
583 Bound::Included(ctx_id.as_bytes().to_vec())
584 };
585
586 let end_key = Bound::Excluded(idx_context_key_range_end(ctx_id));
587
588 Box::new(
589 self.idx_context
590 .range((start_key, end_key))
591 .filter_map(move |r| {
592 let (key, _) = r.ok()?;
593 let frame_id_bytes = &key[16..];
594 let frame_id = Scru128Id::from_bytes(frame_id_bytes.try_into().ok()?);
595 self.get(&frame_id)
596 }),
597 )
598 }
599 None => {
600 let range = match last_id {
601 Some(id) => (Bound::Excluded(id.as_bytes().to_vec()), Bound::Unbounded),
602 None => (Bound::Unbounded, Bound::Unbounded),
603 };
604
605 Box::new(
606 self.frame_partition
607 .range(range)
608 .map(|r| deserialize_frame(r.unwrap())),
609 )
610 }
611 }
612 }
613
614 fn iter_frames_by_topic<'a>(
615 &'a self,
616 context_id: Option<Scru128Id>,
617 topic: &'a str,
618 last_id: Option<&'a Scru128Id>,
619 ) -> Box<dyn Iterator<Item = Frame> + 'a> {
620 if let Some(ctx_id) = context_id {
621 let prefix = idx_topic_key_prefix(ctx_id, topic);
622 Box::new(self.idx_topic.prefix(prefix).filter_map(move |r| {
623 let (key, _) = r.ok()?;
624 let frame_id = idx_topic_frame_id_from_key(&key);
625 if let Some(last) = last_id {
626 if frame_id <= *last {
627 return None;
628 }
629 }
630 self.get(&frame_id)
631 }))
632 } else {
633 let range = match last_id {
634 Some(id) => (Bound::Excluded(id.as_bytes().to_vec()), Bound::Unbounded),
635 None => (Bound::Unbounded, Bound::Unbounded),
636 };
637
638 Box::new(self.frame_partition.range(range).filter_map(move |r| {
639 let frame = deserialize_frame(r.unwrap());
640 if frame.topic == topic {
641 Some(frame)
642 } else {
643 None
644 }
645 }))
646 }
647 }
648}
649
650fn spawn_gc_worker(mut gc_rx: UnboundedReceiver<GCTask>, store: Store) {
651 std::thread::spawn(move || {
652 while let Some(task) = gc_rx.blocking_recv() {
653 match task {
654 GCTask::Remove(id) => {
655 let _ = store.remove(&id);
656 }
657
658 GCTask::CheckHeadTTL {
659 context_id,
660 topic,
661 keep,
662 } => {
663 let prefix = idx_topic_key_prefix(context_id, &topic);
664 let frames_to_remove: Vec<_> = store
665 .idx_topic
666 .prefix(&prefix)
667 .rev() .skip(keep as usize)
669 .map(|r| {
670 Scru128Id::from_bytes(idx_topic_frame_id_from_key(&r.unwrap().0).into())
671 })
672 .collect();
673
674 for frame_id in frames_to_remove {
675 let _ = store.remove(&frame_id);
676 }
677 }
678
679 GCTask::Drain(tx) => {
680 let _ = tx.send(());
681 }
682 }
683 }
684 });
685}
686
687fn is_expired(id: &Scru128Id, ttl: &Duration) -> bool {
688 let created_ms = id.timestamp();
689 let expires_ms = created_ms.saturating_add(ttl.as_millis() as u64);
690 let now_ms = std::time::SystemTime::now()
691 .duration_since(std::time::UNIX_EPOCH)
692 .unwrap()
693 .as_millis() as u64;
694
695 now_ms >= expires_ms
696}
697
/// Byte separating the topic from the frame id in `idx_topic` keys; topics must
/// therefore never contain it.
const NULL_DELIMITER: u8 = b'\0';
699
700fn idx_topic_key_prefix(context_id: Scru128Id, topic: &str) -> Vec<u8> {
701 let mut v = Vec::with_capacity(16 + topic.len() + 1); v.extend(context_id.as_bytes()); v.extend(topic.as_bytes()); v.push(NULL_DELIMITER); v
706}
707
708pub(crate) fn idx_topic_key_from_frame(frame: &Frame) -> Result<Vec<u8>, crate::error::Error> {
709 if frame.topic.as_bytes().contains(&NULL_DELIMITER) {
711 return Err(
712 "Topic cannot contain null byte (0x00) as it's used as a delimiter"
713 .to_string()
714 .into(),
715 );
716 }
717 let mut v = idx_topic_key_prefix(frame.context_id, &frame.topic);
718 v.extend(frame.id.as_bytes());
719 Ok(v)
720}
721
722fn idx_topic_frame_id_from_key(key: &[u8]) -> Scru128Id {
723 let frame_id_bytes = &key[key.len() - 16..];
724 Scru128Id::from_bytes(frame_id_bytes.try_into().unwrap())
725}
726
727fn idx_context_key_from_frame(frame: &Frame) -> Vec<u8> {
729 let mut v = Vec::with_capacity(frame.context_id.as_bytes().len() + frame.id.as_bytes().len());
730 v.extend(frame.context_id.as_bytes());
731 v.extend(frame.id.as_bytes());
732 v
733}
734
735fn idx_context_key_range_end(context_id: Scru128Id) -> Vec<u8> {
737 let mut i = context_id.to_u128();
738
739 i = i.saturating_add(1);
741
742 Scru128Id::from(i).as_bytes().to_vec()
743}
744
745fn deserialize_frame<B1: AsRef<[u8]> + std::fmt::Debug, B2: AsRef<[u8]>>(
746 record: (B1, B2),
747) -> Frame {
748 serde_json::from_slice(record.1.as_ref()).unwrap_or_else(|e| {
749 let key_bytes = record.0.as_ref();
751 if key_bytes.len() == 16 {
752 if let Ok(bytes) = key_bytes.try_into() {
753 let id = Scru128Id::from_bytes(bytes);
754 eprintln!("CORRUPTED_RECORD_ID: {id}");
755 }
756 }
757 let key = std::str::from_utf8(record.0.as_ref()).unwrap();
758 let value = std::str::from_utf8(record.1.as_ref()).unwrap();
759 panic!("Failed to deserialize frame: {e} {key} {value}")
760 })
761}