1mod ttl;
2pub use ttl::*;
3
4#[cfg(test)]
5mod tests;
6
7use std::ops::Bound;
8use std::path::PathBuf;
9use std::time::Duration;
10
11use tokio::sync::broadcast;
12use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
13
14use std::collections::HashSet;
15use std::sync::{Arc, RwLock};
16
17use scru128::Scru128Id;
18
19use serde::{Deserialize, Deserializer, Serialize};
20
21use fjall::{Config, Keyspace, PartitionCreateOptions, PartitionHandle};
22
/// The root context: the all-zero id.
///
/// It is always present in a store's set of known contexts, and `xs.context` frames
/// (which create every other context) must be appended to it.
pub const ZERO_CONTEXT: Scru128Id = Scru128Id::from_bytes([0; 16]);
25
/// A single entry in the stream.
///
/// Frames are stored JSON-encoded unless their TTL is `TTL::Ephemeral`, in which case
/// they are only broadcast to live readers. Field order matters: it fixes the serialized
/// key order and the argument order of `Frame::builder(topic, context_id)`.
#[derive(PartialEq, Eq, Serialize, Deserialize, Clone, Default, bon::Builder)]
pub struct Frame {
    /// Topic name (first `Frame::builder` argument). Must not contain a null byte, since
    /// the topic index uses 0x00 as a delimiter.
    #[builder(start_fn, into)]
    pub topic: String,
    /// Context the frame belongs to (second `Frame::builder` argument).
    #[builder(start_fn)]
    pub context_id: Scru128Id,
    /// Frame id; `Store::append` overwrites it with a freshly generated SCRU128 id.
    #[builder(default)]
    pub id: Scru128Id,
    /// Optional integrity hash; presumably refers to content in the CAS, whose `cas_*`
    /// methods return this type — confirm with callers.
    pub hash: Option<ssri::Integrity>,
    /// Arbitrary JSON metadata.
    pub meta: Option<serde_json::Value>,
    /// Retention policy; `None` is stored like any other non-ephemeral frame.
    pub ttl: Option<TTL>,
}
38
39use std::fmt;
40
41impl fmt::Debug for Frame {
42 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
43 f.debug_struct("Frame")
44 .field("id", &format!("{id}", id = self.id))
45 .field(
46 "context_id",
47 &format!("{context_id}", context_id = self.context_id),
48 )
49 .field("topic", &self.topic)
50 .field("hash", &self.hash.as_ref().map(|x| format!("{x}")))
51 .field("meta", &self.meta)
52 .field("ttl", &self.ttl)
53 .finish()
54 }
55}
56
57impl<'de> Deserialize<'de> for FollowOption {
58 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
59 where
60 D: Deserializer<'de>,
61 {
62 let s: String = Deserialize::deserialize(deserializer)?;
63 if s.is_empty() || s == "yes" {
64 Ok(FollowOption::On)
65 } else if let Ok(duration) = s.parse::<u64>() {
66 Ok(FollowOption::WithHeartbeat(Duration::from_millis(duration)))
67 } else {
68 match s.as_str() {
69 "true" => Ok(FollowOption::On),
70 "false" | "no" => Ok(FollowOption::Off),
71 _ => Err(serde::de::Error::custom("Invalid value for follow option")),
72 }
73 }
74 }
75}
76
77fn deserialize_bool<'de, D>(deserializer: D) -> Result<bool, D::Error>
78where
79 D: Deserializer<'de>,
80{
81 let s: String = Deserialize::deserialize(deserializer)?;
82 match s.as_str() {
83 "false" | "no" | "0" => Ok(false),
84 _ => Ok(true),
85 }
86}
87
/// Options for `Store::read`; parseable from a URL query via `ReadOptions::from_query`
/// and rendered back with `ReadOptions::to_query_string`.
#[derive(PartialEq, Deserialize, Clone, Debug, Default, bon::Builder)]
pub struct ReadOptions {
    /// Whether to keep streaming newly appended frames after the stored ones
    /// (query key `follow`; defaults to `FollowOption::Off`).
    #[serde(default)]
    #[builder(default)]
    pub follow: FollowOption,
    /// Skip replaying stored frames and deliver only frames appended after the read
    /// starts (query key `tail`). Without `follow`, a tail read yields nothing.
    #[serde(default, deserialize_with = "deserialize_bool")]
    #[builder(default)]
    pub tail: bool,
    /// Resume strictly after this frame id (query key `last-id`).
    #[serde(rename = "last-id")]
    pub last_id: Option<Scru128Id>,
    /// Maximum number of frames to deliver.
    pub limit: Option<usize>,
    /// Restrict to one context (query key `context-id`); `None` reads every context.
    #[serde(rename = "context-id")]
    pub context_id: Option<Scru128Id>,
    /// Restrict to frames whose topic equals this value exactly.
    pub topic: Option<String>,
}
103
104impl ReadOptions {
105 pub fn from_query(query: Option<&str>) -> Result<Self, crate::error::Error> {
106 match query {
107 Some(q) => Ok(serde_urlencoded::from_str(q)?),
108 None => Ok(Self::default()),
109 }
110 }
111
112 pub fn to_query_string(&self) -> String {
113 let mut params = Vec::new();
114
115 match self.follow {
117 FollowOption::Off => {}
118 FollowOption::On => params.push(("follow", "true".to_string())),
119 FollowOption::WithHeartbeat(duration) => {
120 params.push(("follow", duration.as_millis().to_string()));
121 }
122 }
123
124 if let Some(context_id) = self.context_id {
125 params.push(("context-id", context_id.to_string()));
126 }
127
128 if self.tail {
130 params.push(("tail", "true".to_string()));
131 }
132
133 if let Some(last_id) = self.last_id {
135 params.push(("last-id", last_id.to_string()));
136 }
137
138 if let Some(limit) = self.limit {
140 params.push(("limit", limit.to_string()));
141 }
142
143 if let Some(topic) = &self.topic {
144 params.push(("topic", topic.clone()));
145 }
146
147 if params.is_empty() {
149 String::new()
150 } else {
151 url::form_urlencoded::Serializer::new(String::new())
152 .extend_pairs(params)
153 .finish()
154 }
155 }
156}
157
/// Whether a read keeps streaming after the stored frames have been replayed.
#[derive(Default, PartialEq, Clone, Debug)]
pub enum FollowOption {
    /// Stop after the stored frames (the default).
    #[default]
    Off,
    /// Keep streaming frames as they are appended.
    On,
    /// Like `On`, and also emit an ephemeral `xs.pulse` frame every interval.
    WithHeartbeat(Duration),
}
165
/// Work items processed in FIFO order by the background GC thread (`spawn_gc_worker`).
#[derive(Debug)]
enum GCTask {
    /// Delete the frame with this id along with its index entries.
    Remove(Scru128Id),
    /// Keep only the `keep` highest-id frames of `topic` within `context_id`; delete the rest.
    CheckHeadTTL {
        context_id: Scru128Id,
        topic: String,
        keep: u32,
    },
    /// Barrier: the sender is signalled once all previously queued tasks are processed.
    Drain(tokio::sync::oneshot::Sender<()>),
}
176
/// Handle to the frame store. Clones refer to the same keyspace, context set,
/// broadcast channel and GC queue.
#[derive(Clone)]
pub struct Store {
    /// Root directory: the keyspace lives in `path/fjall`, CAS blobs in `path/cacache`.
    pub path: PathBuf,
    /// fjall keyspace holding the partitions below.
    keyspace: Keyspace,
    /// Partition `stream`: frame id bytes -> JSON-encoded `Frame`.
    frame_partition: PartitionHandle,
    /// Partition `idx_topic`: keys `context_id ++ topic ++ 0x00 ++ frame_id`, empty values.
    idx_topic: PartitionHandle,
    /// Partition `idx_context`: keys `context_id ++ frame_id`, empty values.
    idx_context: PartitionHandle,
    /// Known contexts: `ZERO_CONTEXT` plus the id of every stored `xs.context` frame.
    contexts: Arc<RwLock<HashSet<Scru128Id>>>,
    /// Fan-out of every appended frame to live followers.
    broadcast_tx: broadcast::Sender<Frame>,
    /// Queue feeding the background GC thread.
    gc_tx: UnboundedSender<GCTask>,
}
188
189impl Store {
190 pub fn new(path: PathBuf) -> Store {
191 let config = Config::new(path.join("fjall"));
192 let keyspace = config
193 .flush_workers(1)
194 .compaction_workers(1)
195 .open()
196 .unwrap();
197
198 let frame_partition = keyspace
199 .open_partition("stream", PartitionCreateOptions::default())
200 .unwrap();
201
202 let idx_topic = keyspace
203 .open_partition("idx_topic", PartitionCreateOptions::default())
204 .unwrap();
205
206 let idx_context = keyspace
207 .open_partition("idx_context", PartitionCreateOptions::default())
208 .unwrap();
209
210 let (broadcast_tx, _) = broadcast::channel(1024);
211 let (gc_tx, gc_rx) = mpsc::unbounded_channel();
212
213 let mut contexts = HashSet::new();
214 contexts.insert(ZERO_CONTEXT); let store = Store {
217 path: path.clone(),
218 keyspace: keyspace.clone(),
219 frame_partition: frame_partition.clone(),
220 idx_topic: idx_topic.clone(),
221 idx_context: idx_context.clone(),
222 contexts: Arc::new(RwLock::new(contexts)),
223 broadcast_tx,
224 gc_tx,
225 };
226
227 for frame in store.read_sync(None, None, Some(ZERO_CONTEXT)) {
229 if frame.topic == "xs.context" {
230 store.contexts.write().unwrap().insert(frame.id);
231 }
232 }
233
234 spawn_gc_worker(gc_rx, store.clone());
236
237 store
238 }
239
240 pub async fn wait_for_gc(&self) {
241 let (tx, rx) = tokio::sync::oneshot::channel();
242 let _ = self.gc_tx.send(GCTask::Drain(tx));
243 let _ = rx.await;
244 }
245
246 #[tracing::instrument(skip(self))]
247 pub async fn read(&self, options: ReadOptions) -> tokio::sync::mpsc::Receiver<Frame> {
248 let (tx, rx) = tokio::sync::mpsc::channel(100);
249
250 let should_follow = matches!(
251 options.follow,
252 FollowOption::On | FollowOption::WithHeartbeat(_)
253 );
254
255 let broadcast_rx = if should_follow {
259 Some(self.broadcast_tx.subscribe())
260 } else {
261 None
262 };
263
264 let done_rx = if !options.tail {
266 let (done_tx, done_rx) = tokio::sync::oneshot::channel();
267 let tx_clone = tx.clone();
268 let store = self.clone();
269 let options = options.clone();
270 let should_follow_clone = should_follow;
271 let gc_tx = self.gc_tx.clone();
272
273 std::thread::spawn(move || {
275 let mut last_id = None;
276 let mut count = 0;
277
278 let iter: Box<dyn Iterator<Item = Frame>> = if let Some(ref topic) = options.topic {
279 store.iter_frames_by_topic(options.context_id, topic, options.last_id.as_ref())
280 } else {
281 store.iter_frames(options.context_id, options.last_id.as_ref())
282 };
283
284 for frame in iter {
285 if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
286 if is_expired(&frame.id, ttl) {
287 let _ = gc_tx.send(GCTask::Remove(frame.id));
288 continue;
289 }
290 }
291
292 last_id = Some(frame.id);
293
294 if let Some(limit) = options.limit {
295 if count >= limit {
296 return; }
298 }
299
300 if tx_clone.blocking_send(frame).is_err() {
301 return;
302 }
303 count += 1;
304 }
305
306 if should_follow_clone && options.limit.is_none() {
308 let threshold =
309 Frame::builder("xs.threshold", options.context_id.unwrap_or(ZERO_CONTEXT))
310 .id(scru128::new())
311 .ttl(TTL::Ephemeral)
312 .build();
313 if tx_clone.blocking_send(threshold).is_err() {
314 return;
315 }
316 }
317
318 let _ = done_tx.send((last_id, count));
320 });
321
322 Some(done_rx)
323 } else {
324 None
325 };
326
327 if let Some(broadcast_rx) = broadcast_rx {
329 {
330 let tx = tx.clone();
331 let limit = options.limit;
332
333 tokio::spawn(async move {
334 let (last_id, mut count) = match done_rx {
336 Some(done_rx) => match done_rx.await {
337 Ok((id, count)) => (id, count),
338 Err(_) => return, },
340 None => (None, 0),
341 };
342
343 let mut broadcast_rx = broadcast_rx;
344 while let Ok(frame) = broadcast_rx.recv().await {
345 if let Some(context_id) = options.context_id {
347 if frame.context_id != context_id {
348 continue;
349 }
350 }
351
352 if let Some(ref topic) = options.topic {
353 if frame.topic != *topic {
354 continue;
355 }
356 }
357
358 if let Some(last_scanned_id) = last_id {
360 if frame.id <= last_scanned_id {
361 continue;
362 }
363 }
364
365 if tx.send(frame).await.is_err() {
366 break;
367 }
368
369 if let Some(limit) = limit {
370 count += 1;
371 if count >= limit {
372 break;
373 }
374 }
375 }
376 });
377 }
378
379 if let FollowOption::WithHeartbeat(duration) = options.follow {
381 let heartbeat_tx = tx;
382 tokio::spawn(async move {
383 loop {
384 tokio::time::sleep(duration).await;
385 let frame =
386 Frame::builder("xs.pulse", options.context_id.unwrap_or(ZERO_CONTEXT))
387 .id(scru128::new())
388 .ttl(TTL::Ephemeral)
389 .build();
390 if heartbeat_tx.send(frame).await.is_err() {
391 break;
392 }
393 }
394 });
395 }
396 }
397
398 rx
399 }
400
401 #[tracing::instrument(skip(self))]
402 pub fn read_sync(
403 &self,
404 last_id: Option<&Scru128Id>,
405 limit: Option<usize>,
406 context_id: Option<Scru128Id>,
407 ) -> impl Iterator<Item = Frame> + '_ {
408 self.iter_frames(context_id, last_id)
409 .filter(move |frame| {
410 if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
411 if is_expired(&frame.id, ttl) {
412 let _ = self.gc_tx.send(GCTask::Remove(frame.id));
413 return false;
414 }
415 }
416 true
417 })
418 .take(limit.unwrap_or(usize::MAX))
419 }
420
421 pub fn get(&self, id: &Scru128Id) -> Option<Frame> {
422 self.frame_partition
423 .get(id.to_bytes())
424 .unwrap()
425 .map(|value| deserialize_frame((id.as_bytes(), value)))
426 }
427
428 #[tracing::instrument(skip(self))]
429 pub fn head(&self, topic: &str, context_id: Scru128Id) -> Option<Frame> {
430 self.idx_topic
431 .prefix(idx_topic_key_prefix(context_id, topic))
432 .rev()
433 .find_map(|kv| self.get(&idx_topic_frame_id_from_key(&kv.unwrap().0)))
434 }
435
436 #[tracing::instrument(skip(self), fields(id = %id.to_string()))]
437 pub fn remove(&self, id: &Scru128Id) -> Result<(), crate::error::Error> {
438 let Some(frame) = self.get(id) else {
439 return Ok(());
441 };
442
443 let topic_key = idx_topic_key_from_frame(&frame)?;
445
446 let mut batch = self.keyspace.batch();
447 batch.remove(&self.frame_partition, id.as_bytes());
448 batch.remove(&self.idx_topic, topic_key);
449 batch.remove(&self.idx_context, idx_context_key_from_frame(&frame));
450
451 if frame.topic == "xs.context" {
453 self.contexts.write().unwrap().remove(&frame.id);
454 }
455
456 batch.commit()?;
457 self.keyspace.persist(fjall::PersistMode::SyncAll)?;
458 Ok(())
459 }
460
461 pub async fn cas_reader(&self, hash: ssri::Integrity) -> cacache::Result<cacache::Reader> {
462 cacache::Reader::open_hash(&self.path.join("cacache"), hash).await
463 }
464
465 pub fn cas_reader_sync(&self, hash: ssri::Integrity) -> cacache::Result<cacache::SyncReader> {
466 cacache::SyncReader::open_hash(self.path.join("cacache"), hash)
467 }
468
469 pub async fn cas_writer(&self) -> cacache::Result<cacache::Writer> {
470 cacache::WriteOpts::new()
471 .open_hash(&self.path.join("cacache"))
472 .await
473 }
474
475 pub fn cas_writer_sync(&self) -> cacache::Result<cacache::SyncWriter> {
476 cacache::WriteOpts::new().open_hash_sync(self.path.join("cacache"))
477 }
478
479 pub async fn cas_insert(&self, content: impl AsRef<[u8]>) -> cacache::Result<ssri::Integrity> {
480 cacache::write_hash(&self.path.join("cacache"), content).await
481 }
482
483 pub fn cas_insert_sync(&self, content: impl AsRef<[u8]>) -> cacache::Result<ssri::Integrity> {
484 cacache::write_hash_sync(self.path.join("cacache"), content)
485 }
486
487 pub async fn cas_insert_bytes(&self, bytes: &[u8]) -> cacache::Result<ssri::Integrity> {
488 self.cas_insert(bytes).await
489 }
490
491 pub fn cas_insert_bytes_sync(&self, bytes: &[u8]) -> cacache::Result<ssri::Integrity> {
492 self.cas_insert_sync(bytes)
493 }
494
495 pub async fn cas_read(&self, hash: &ssri::Integrity) -> cacache::Result<Vec<u8>> {
496 cacache::read_hash(&self.path.join("cacache"), hash).await
497 }
498
499 pub fn cas_read_sync(&self, hash: &ssri::Integrity) -> cacache::Result<Vec<u8>> {
500 cacache::read_hash_sync(self.path.join("cacache"), hash)
501 }
502
503 #[tracing::instrument(skip(self))]
504 pub fn insert_frame(&self, frame: &Frame) -> Result<(), crate::error::Error> {
505 let encoded: Vec<u8> = serde_json::to_vec(&frame).unwrap();
506
507 let topic_key = idx_topic_key_from_frame(frame)?;
509
510 let mut batch = self.keyspace.batch();
511 batch.insert(&self.frame_partition, frame.id.as_bytes(), encoded);
512 batch.insert(&self.idx_topic, topic_key, b"");
513 batch.insert(&self.idx_context, idx_context_key_from_frame(frame), b"");
514 batch.commit()?;
515 self.keyspace.persist(fjall::PersistMode::SyncAll)?;
516 Ok(())
517 }
518
519 pub fn append(&self, mut frame: Frame) -> Result<Frame, crate::error::Error> {
520 frame.id = scru128::new();
521
522 if frame.topic == "xs.context" {
524 if frame.context_id != ZERO_CONTEXT {
525 return Err("xs.context frames must be in zero context".into());
526 }
527 frame.ttl = Some(TTL::Forever);
528 self.contexts.write().unwrap().insert(frame.id);
529 } else {
530 let contexts = self.contexts.read().unwrap();
532 if !contexts.contains(&frame.context_id) {
533 return Err(format!(
534 "Invalid context: {context_id}",
535 context_id = frame.context_id
536 )
537 .into());
538 }
539 }
540
541 idx_topic_key_from_frame(&frame)?;
543
544 if frame.ttl != Some(TTL::Ephemeral) {
546 self.insert_frame(&frame)?;
547
548 if let Some(TTL::Head(n)) = frame.ttl {
550 let _ = self.gc_tx.send(GCTask::CheckHeadTTL {
551 context_id: frame.context_id,
552 topic: frame.topic.clone(),
553 keep: n,
554 });
555 }
556 }
557
558 let _ = self.broadcast_tx.send(frame.clone());
559 Ok(frame)
560 }
561
562 fn iter_frames(
563 &self,
564 context_id: Option<Scru128Id>,
565 last_id: Option<&Scru128Id>,
566 ) -> Box<dyn Iterator<Item = Frame> + '_> {
567 match context_id {
568 Some(ctx_id) => {
569 let start_key = if let Some(last_id) = last_id {
570 let mut v = Vec::with_capacity(32);
572 v.extend(ctx_id.as_bytes());
573 v.extend(last_id.as_bytes());
574 Bound::Excluded(v)
575 } else {
576 Bound::Included(ctx_id.as_bytes().to_vec())
577 };
578
579 let end_key = Bound::Excluded(idx_context_key_range_end(ctx_id));
580
581 Box::new(
582 self.idx_context
583 .range((start_key, end_key))
584 .filter_map(move |r| {
585 let (key, _) = r.ok()?;
586 let frame_id_bytes = &key[16..];
587 let frame_id = Scru128Id::from_bytes(frame_id_bytes.try_into().ok()?);
588 self.get(&frame_id)
589 }),
590 )
591 }
592 None => {
593 let range = match last_id {
594 Some(id) => (Bound::Excluded(id.as_bytes().to_vec()), Bound::Unbounded),
595 None => (Bound::Unbounded, Bound::Unbounded),
596 };
597
598 Box::new(
599 self.frame_partition
600 .range(range)
601 .map(|r| deserialize_frame(r.unwrap())),
602 )
603 }
604 }
605 }
606
607 fn iter_frames_by_topic<'a>(
608 &'a self,
609 context_id: Option<Scru128Id>,
610 topic: &'a str,
611 last_id: Option<&'a Scru128Id>,
612 ) -> Box<dyn Iterator<Item = Frame> + 'a> {
613 if let Some(ctx_id) = context_id {
614 let prefix = idx_topic_key_prefix(ctx_id, topic);
615 Box::new(self.idx_topic.prefix(prefix).filter_map(move |r| {
616 let (key, _) = r.ok()?;
617 let frame_id = idx_topic_frame_id_from_key(&key);
618 if let Some(last) = last_id {
619 if frame_id <= *last {
620 return None;
621 }
622 }
623 self.get(&frame_id)
624 }))
625 } else {
626 let range = match last_id {
627 Some(id) => (Bound::Excluded(id.as_bytes().to_vec()), Bound::Unbounded),
628 None => (Bound::Unbounded, Bound::Unbounded),
629 };
630
631 Box::new(self.frame_partition.range(range).filter_map(move |r| {
632 let frame = deserialize_frame(r.unwrap());
633 if frame.topic == topic {
634 Some(frame)
635 } else {
636 None
637 }
638 }))
639 }
640 }
641}
642
643fn spawn_gc_worker(mut gc_rx: UnboundedReceiver<GCTask>, store: Store) {
644 std::thread::spawn(move || {
645 while let Some(task) = gc_rx.blocking_recv() {
646 match task {
647 GCTask::Remove(id) => {
648 let _ = store.remove(&id);
649 }
650
651 GCTask::CheckHeadTTL {
652 context_id,
653 topic,
654 keep,
655 } => {
656 let prefix = idx_topic_key_prefix(context_id, &topic);
657 let frames_to_remove: Vec<_> = store
658 .idx_topic
659 .prefix(&prefix)
660 .rev() .skip(keep as usize)
662 .map(|r| {
663 Scru128Id::from_bytes(idx_topic_frame_id_from_key(&r.unwrap().0).into())
664 })
665 .collect();
666
667 for frame_id in frames_to_remove {
668 let _ = store.remove(&frame_id);
669 }
670 }
671
672 GCTask::Drain(tx) => {
673 let _ = tx.send(());
674 }
675 }
676 }
677 });
678}
679
680fn is_expired(id: &Scru128Id, ttl: &Duration) -> bool {
681 let created_ms = id.timestamp();
682 let expires_ms = created_ms.saturating_add(ttl.as_millis() as u64);
683 let now_ms = std::time::SystemTime::now()
684 .duration_since(std::time::UNIX_EPOCH)
685 .unwrap()
686 .as_millis() as u64;
687
688 now_ms >= expires_ms
689}
690
/// Terminates the topic inside `idx_topic` keys (`context_id ++ topic ++ 0x00 ++ frame_id`),
/// so one topic's key prefix is never a prefix of a longer topic's keys. Topics therefore
/// may not contain this byte.
const NULL_DELIMITER: u8 = 0;
692
693fn idx_topic_key_prefix(context_id: Scru128Id, topic: &str) -> Vec<u8> {
694 let mut v = Vec::with_capacity(16 + topic.len() + 1); v.extend(context_id.as_bytes()); v.extend(topic.as_bytes()); v.push(NULL_DELIMITER); v
699}
700
701pub(crate) fn idx_topic_key_from_frame(frame: &Frame) -> Result<Vec<u8>, crate::error::Error> {
702 if frame.topic.as_bytes().contains(&NULL_DELIMITER) {
704 return Err(
705 "Topic cannot contain null byte (0x00) as it's used as a delimiter"
706 .to_string()
707 .into(),
708 );
709 }
710 let mut v = idx_topic_key_prefix(frame.context_id, &frame.topic);
711 v.extend(frame.id.as_bytes());
712 Ok(v)
713}
714
715fn idx_topic_frame_id_from_key(key: &[u8]) -> Scru128Id {
716 let frame_id_bytes = &key[key.len() - 16..];
717 Scru128Id::from_bytes(frame_id_bytes.try_into().unwrap())
718}
719
720fn idx_context_key_from_frame(frame: &Frame) -> Vec<u8> {
722 let mut v = Vec::with_capacity(frame.context_id.as_bytes().len() + frame.id.as_bytes().len());
723 v.extend(frame.context_id.as_bytes());
724 v.extend(frame.id.as_bytes());
725 v
726}
727
728fn idx_context_key_range_end(context_id: Scru128Id) -> Vec<u8> {
730 let mut i = context_id.to_u128();
731
732 i = i.saturating_add(1);
734
735 Scru128Id::from(i).as_bytes().to_vec()
736}
737
738fn deserialize_frame<B1: AsRef<[u8]> + std::fmt::Debug, B2: AsRef<[u8]>>(
739 record: (B1, B2),
740) -> Frame {
741 serde_json::from_slice(record.1.as_ref()).unwrap_or_else(|e| {
742 let key_bytes = record.0.as_ref();
744 if key_bytes.len() == 16 {
745 if let Ok(bytes) = key_bytes.try_into() {
746 let id = Scru128Id::from_bytes(bytes);
747 eprintln!("CORRUPTED_RECORD_ID: {id}");
748 }
749 }
750 let key = std::str::from_utf8(record.0.as_ref()).unwrap();
751 let value = std::str::from_utf8(record.1.as_ref()).unwrap();
752 panic!("Failed to deserialize frame: {e} {key} {value}")
753 })
754}