1use std::path::Path;
54
55use evento_core::{
56 cursor::{Args, Cursor, ReadResult, Value},
57 metadata::Metadata,
58 Event, Executor, ReadAggregator, RoutingKey, WriteError,
59};
60use fjall::{Config, Keyspace, Partition, PartitionCreateOptions, PersistMode};
61use ulid::Ulid;
62
63#[derive(Debug, Clone, bitcode::Encode, bitcode::Decode)]
65struct SubscriberState {
66 worker_id: String,
67 cursor: Option<String>,
68 lag: u64,
69}
70
71#[derive(Debug, Clone, bitcode::Encode, bitcode::Decode)]
73struct StoredEvent {
74 id: String,
75 aggregator_id: String,
76 aggregator_type: String,
77 version: u16,
78 name: String,
79 routing_key: Option<String>,
80 data: Vec<u8>,
81 metadata: Metadata,
82 timestamp: u64,
83 timestamp_subsec: u32,
84}
85
86impl From<&Event> for StoredEvent {
87 fn from(event: &Event) -> Self {
88 Self {
89 id: event.id.to_string(),
90 aggregator_id: event.aggregator_id.clone(),
91 aggregator_type: event.aggregator_type.clone(),
92 version: event.version,
93 name: event.name.clone(),
94 routing_key: event.routing_key.clone(),
95 data: event.data.clone(),
96 metadata: event.metadata.clone(),
97 timestamp: event.timestamp,
98 timestamp_subsec: event.timestamp_subsec,
99 }
100 }
101}
102
103impl TryFrom<StoredEvent> for Event {
104 type Error = ulid::DecodeError;
105
106 fn try_from(stored: StoredEvent) -> Result<Self, Self::Error> {
107 Ok(Self {
108 id: Ulid::from_string(&stored.id)?,
109 aggregator_id: stored.aggregator_id,
110 aggregator_type: stored.aggregator_type,
111 version: stored.version,
112 name: stored.name,
113 routing_key: stored.routing_key,
114 data: stored.data,
115 metadata: stored.metadata,
116 timestamp: stored.timestamp,
117 timestamp_subsec: stored.timestamp_subsec,
118 })
119 }
120}
121
122pub struct Fjall {
143 keyspace: Keyspace,
144 events: Partition,
145 agg_index: Partition,
146 routing_index: Partition,
147 type_index: Partition,
148 subscribers: Partition,
149}
150
151impl Clone for Fjall {
152 fn clone(&self) -> Self {
153 Self {
154 keyspace: self.keyspace.clone(),
155 events: self.events.clone(),
156 agg_index: self.agg_index.clone(),
157 routing_index: self.routing_index.clone(),
158 type_index: self.type_index.clone(),
159 subscribers: self.subscribers.clone(),
160 }
161 }
162}
163
164impl Fjall {
165 pub fn open(path: impl AsRef<Path>) -> anyhow::Result<Self> {
174 let keyspace = Config::new(path).open()?;
175 Self::from_keyspace(keyspace)
176 }
177
178 pub fn from_keyspace(keyspace: Keyspace) -> anyhow::Result<Self> {
191 let opts = PartitionCreateOptions::default();
192
193 Ok(Self {
194 events: keyspace.open_partition("events", opts.clone())?,
195 agg_index: keyspace.open_partition("agg_index", opts.clone())?,
196 routing_index: keyspace.open_partition("routing_index", opts.clone())?,
197 type_index: keyspace.open_partition("type_index", opts.clone())?,
198 subscribers: keyspace.open_partition("subscribers", opts)?,
199 keyspace,
200 })
201 }
202
203 pub fn keyspace(&self) -> &Keyspace {
205 &self.keyspace
206 }
207
208 pub fn persist(&self) -> anyhow::Result<()> {
213 self.keyspace.persist(PersistMode::SyncAll)?;
214 Ok(())
215 }
216
217 fn agg_key(aggregator_type: &str, aggregator_id: &str, version: u16) -> Vec<u8> {
219 let mut key = format!("{}\x00{}\x00", aggregator_type, aggregator_id).into_bytes();
220 key.extend_from_slice(&version.to_be_bytes());
221 key
222 }
223
224 fn agg_prefix(aggregator_type: &str, aggregator_id: &str) -> String {
226 format!("{}\x00{}\x00", aggregator_type, aggregator_id)
227 }
228
229 fn type_key(aggregator_type: &str, name: &str, id: &Ulid) -> Vec<u8> {
231 let mut key = format!("{}\x00{}\x00", aggregator_type, name).into_bytes();
232 key.extend_from_slice(&id.to_bytes());
233 key
234 }
235
236 fn type_prefix(aggregator_type: &str, name: &str) -> String {
238 format!("{}\x00{}\x00", aggregator_type, name)
239 }
240
241 fn routing_key(routing_key: &str, id: &Ulid) -> Vec<u8> {
243 let mut key = format!("{}\x00", routing_key).into_bytes();
244 key.extend_from_slice(&id.to_bytes());
245 key
246 }
247
248 fn routing_prefix(routing_key: &str) -> String {
250 format!("{}\x00", routing_key)
251 }
252
253 fn get_last_version(
255 &self,
256 aggregator_type: &str,
257 aggregator_id: &str,
258 ) -> anyhow::Result<Option<u16>> {
259 let prefix = Self::agg_prefix(aggregator_type, aggregator_id);
260
261 if let Some(result) = self.agg_index.prefix(&prefix).next_back() {
262 let kv = result?;
263 let key_bytes = kv.0.as_ref();
265 if key_bytes.len() >= 2 {
266 let version_bytes: [u8; 2] = key_bytes[key_bytes.len() - 2..].try_into().unwrap();
267 return Ok(Some(u16::from_be_bytes(version_bytes)));
268 }
269 }
270
271 Ok(None)
272 }
273
274 fn load_event(&self, id: &Ulid) -> anyhow::Result<Option<Event>> {
276 match self.events.get(id.to_bytes())? {
277 Some(bytes) => {
278 let stored: StoredEvent = bitcode::decode(&bytes)
279 .map_err(|e| anyhow::anyhow!("Failed to deserialize event: {}", e))?;
280 Ok(Some(stored.try_into()?))
281 }
282 None => Ok(None),
283 }
284 }
285
286 fn collect_event_ids(
288 &self,
289 aggregators: &Option<Vec<ReadAggregator>>,
290 routing_key: &Option<RoutingKey>,
291 ) -> anyhow::Result<Vec<Ulid>> {
292 use std::collections::HashSet;
293 let mut event_ids_set = HashSet::new();
294 let mut event_ids = Vec::new();
295
296 macro_rules! add_unique {
298 ($ulid:expr) => {
299 if event_ids_set.insert($ulid) {
300 event_ids.push($ulid);
301 }
302 };
303 }
304
305 match (aggregators, routing_key) {
306 (Some(aggs), _) => {
308 for agg in aggs {
309 match (&agg.aggregator_id, &agg.name) {
310 (Some(id), Some(name)) => {
312 let prefix = Self::agg_prefix(&agg.aggregator_type, id);
313 for kv in self.agg_index.prefix(&prefix) {
314 let kv = kv?;
315 let ulid_bytes: [u8; 16] = kv.1.as_ref().try_into()?;
316 let ulid = Ulid::from_bytes(ulid_bytes);
317
318 if let Some(event) = self.load_event(&ulid)? {
320 if &event.name == name {
321 add_unique!(ulid);
322 }
323 }
324 }
325 }
326 (Some(id), None) => {
328 let prefix = Self::agg_prefix(&agg.aggregator_type, id);
329 for kv in self.agg_index.prefix(&prefix) {
330 let kv = kv?;
331 let ulid_bytes: [u8; 16] = kv.1.as_ref().try_into()?;
332 add_unique!(Ulid::from_bytes(ulid_bytes));
333 }
334 }
335 (None, Some(name)) => {
337 let prefix = Self::type_prefix(&agg.aggregator_type, name);
338 for kv in self.type_index.prefix(&prefix) {
339 let kv = kv?;
340 let key_bytes = kv.0.as_ref();
341 if key_bytes.len() >= 16 {
342 let ulid_bytes: [u8; 16] =
343 key_bytes[key_bytes.len() - 16..].try_into()?;
344 add_unique!(Ulid::from_bytes(ulid_bytes));
345 }
346 }
347 }
348 (None, None) => {
350 let prefix = format!("{}\x00", agg.aggregator_type);
351 for kv in self.agg_index.prefix(&prefix) {
352 let kv = kv?;
353 let ulid_bytes: [u8; 16] = kv.1.as_ref().try_into()?;
354 add_unique!(Ulid::from_bytes(ulid_bytes));
355 }
356 }
357 }
358 }
359 }
360 (None, Some(RoutingKey::Value(Some(ref key)))) => {
362 let prefix = Self::routing_prefix(key);
363 for kv in self.routing_index.prefix(&prefix) {
364 let kv = kv?;
365 let key_bytes = kv.0.as_ref();
366 if key_bytes.len() >= 16 {
367 let ulid_bytes: [u8; 16] = key_bytes[key_bytes.len() - 16..].try_into()?;
368 add_unique!(Ulid::from_bytes(ulid_bytes));
369 }
370 }
371 }
372 _ => {
374 for kv in self.events.iter() {
375 let kv = kv?;
376 let ulid_bytes: [u8; 16] = kv.0.as_ref().try_into()?;
377 add_unique!(Ulid::from_bytes(ulid_bytes));
378 }
379 }
380 }
381
382 Ok(event_ids)
383 }
384}
385
386#[async_trait::async_trait]
387impl Executor for Fjall {
388 async fn write(&self, events: Vec<Event>) -> Result<(), WriteError> {
389 let executor = self.clone();
390
391 tokio::task::spawn_blocking(move || {
392 for event in &events {
394 let last_version = executor
395 .get_last_version(&event.aggregator_type, &event.aggregator_id)
396 .map_err(WriteError::Unknown)?;
397
398 match last_version {
399 Some(v) if event.version != v + 1 => {
400 return Err(WriteError::InvalidOriginalVersion);
401 }
402 None if event.version != 1 => {
403 return Err(WriteError::InvalidOriginalVersion);
404 }
405 _ => {}
406 }
407 }
408
409 let mut batch = executor.keyspace.batch();
411
412 for event in &events {
413 let id_bytes = event.id.to_bytes();
414 let stored = StoredEvent::from(event);
415 let event_bytes = bitcode::encode(&stored);
416
417 batch.insert(&executor.events, id_bytes, event_bytes.as_slice());
419
420 let agg_key =
422 Fjall::agg_key(&event.aggregator_type, &event.aggregator_id, event.version);
423 batch.insert(&executor.agg_index, agg_key, id_bytes);
424
425 let type_key = Fjall::type_key(&event.aggregator_type, &event.name, &event.id);
427 batch.insert(&executor.type_index, type_key, []);
428
429 if let Some(ref routing_key) = event.routing_key {
431 let routing_key = Fjall::routing_key(routing_key, &event.id);
432 batch.insert(&executor.routing_index, routing_key, []);
433 }
434 }
435
436 batch.commit().map_err(|e| WriteError::Unknown(e.into()))?;
437 executor
438 .keyspace
439 .persist(PersistMode::SyncAll)
440 .map_err(|e| WriteError::Unknown(e.into()))?;
441
442 Ok(())
443 })
444 .await
445 .map_err(|e| WriteError::Unknown(e.into()))?
446 }
447
448 async fn read(
449 &self,
450 aggregators: Option<Vec<ReadAggregator>>,
451 routing_key: Option<RoutingKey>,
452 args: Args,
453 ) -> anyhow::Result<ReadResult<Event>> {
454 let executor = self.clone();
455
456 tokio::task::spawn_blocking(move || {
457 let is_backward = args.is_backward();
458 let (limit, cursor) = args.get_info();
459
460 let mut event_ids = executor.collect_event_ids(&aggregators, &routing_key)?;
462
463 event_ids.sort();
465 if is_backward {
466 event_ids.reverse();
467 }
468
469 if let Some(ref cursor_value) = cursor {
471 let cursor_data = Event::deserialize_cursor(cursor_value)?;
472 let cursor_ulid = Ulid::from_string(&cursor_data.i)?;
473
474 event_ids.retain(|id| {
475 if is_backward {
476 *id < cursor_ulid
477 } else {
478 *id > cursor_ulid
479 }
480 });
481 }
482
483 let target_count = (limit + 1) as usize;
485 let mut events = Vec::new();
486
487 for id in event_ids {
488 if events.len() >= target_count {
489 break;
490 }
491
492 if let Some(event) = executor.load_event(&id)? {
493 let matches = match &routing_key {
495 Some(RoutingKey::Value(Some(ref key))) => {
496 event.routing_key.as_ref() == Some(key)
497 }
498 Some(RoutingKey::Value(None)) => event.routing_key.is_none(),
499 Some(RoutingKey::All) | None => true,
500 };
501
502 if matches {
503 events.push(event);
504 }
505 }
506 }
507
508 evento_core::cursor::Reader::new(events)
510 .args(args)
511 .execute()
512 .map_err(|e| anyhow::anyhow!("{}", e))
513 })
514 .await?
515 }
516
517 async fn get_subscriber_cursor(&self, key: String) -> anyhow::Result<Option<Value>> {
518 let executor = self.clone();
519
520 tokio::task::spawn_blocking(move || match executor.subscribers.get(&key)? {
521 Some(bytes) => {
522 let state: SubscriberState = bitcode::decode(&bytes)
523 .map_err(|e| anyhow::anyhow!("Failed to deserialize subscriber: {}", e))?;
524 Ok(state.cursor.map(Value))
525 }
526 None => Ok(None),
527 })
528 .await?
529 }
530
531 async fn is_subscriber_running(&self, key: String, worker_id: Ulid) -> anyhow::Result<bool> {
532 let executor = self.clone();
533
534 tokio::task::spawn_blocking(move || match executor.subscribers.get(&key)? {
535 Some(bytes) => {
536 let state: SubscriberState = bitcode::decode(&bytes)
537 .map_err(|e| anyhow::anyhow!("Failed to deserialize subscriber: {}", e))?;
538 Ok(state.worker_id == worker_id.to_string())
539 }
540 None => Ok(false),
541 })
542 .await?
543 }
544
545 async fn upsert_subscriber(&self, key: String, worker_id: Ulid) -> anyhow::Result<()> {
546 let executor = self.clone();
547
548 tokio::task::spawn_blocking(move || {
549 let cursor = match executor.subscribers.get(&key)? {
551 Some(bytes) => {
552 let state: SubscriberState = bitcode::decode(&bytes)
553 .map_err(|e| anyhow::anyhow!("Failed to deserialize subscriber: {}", e))?;
554 state.cursor
555 }
556 None => None,
557 };
558
559 let state = SubscriberState {
560 worker_id: worker_id.to_string(),
561 cursor,
562 lag: 0,
563 };
564
565 executor.subscribers.insert(&key, bitcode::encode(&state))?;
566 Ok(())
567 })
568 .await?
569 }
570
571 async fn acknowledge(&self, key: String, cursor: Value, lag: u64) -> anyhow::Result<()> {
572 let executor = self.clone();
573
574 tokio::task::spawn_blocking(move || {
575 let state = match executor.subscribers.get(&key)? {
576 Some(bytes) => {
577 let mut state: SubscriberState = bitcode::decode(&bytes)
578 .map_err(|e| anyhow::anyhow!("Failed to deserialize subscriber: {}", e))?;
579 state.cursor = Some(cursor.0);
580 state.lag = lag;
581 state
582 }
583 None => anyhow::bail!("Subscriber not found: {}", key),
584 };
585
586 executor.subscribers.insert(&key, bitcode::encode(&state))?;
587 Ok(())
588 })
589 .await?
590 }
591
592 async fn get_snapshot(
593 &self,
594 _aggregator_type: String,
595 _aggregator_revision: String,
596 _id: String,
597 ) -> anyhow::Result<Option<(Vec<u8>, Value)>> {
598 todo!()
599 }
600
601 async fn save_snapshot(
602 &self,
603 _aggregator_type: String,
604 _aggregator_revision: String,
605 _id: String,
606 _data: Vec<u8>,
607 _cursor: Value,
608 ) -> anyhow::Result<()> {
609 todo!()
610 }
611}
612
613impl From<Keyspace> for Fjall {
614 fn from(keyspace: Keyspace) -> Self {
615 Self::from_keyspace(keyspace).expect("Failed to create Fjall from keyspace")
616 }
617}
618
619#[cfg(test)]
620mod tests {
621 use super::*;
622 use std::time::{SystemTime, UNIX_EPOCH};
623
624 fn create_test_event(aggregator_id: &str, version: u16, name: &str) -> Event {
625 let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
626 Event {
627 id: Ulid::new(),
628 aggregator_id: aggregator_id.to_string(),
629 aggregator_type: "test/Account".to_string(),
630 version,
631 name: name.to_string(),
632 routing_key: Some("test-routing".to_string()),
633 data: vec![1, 2, 3],
634 metadata: Metadata::default(),
635 timestamp: now.as_secs(),
636 timestamp_subsec: now.subsec_millis(),
637 }
638 }
639
640 #[tokio::test]
641 async fn test_write_and_read_events() {
642 let temp_dir = tempfile::tempdir().unwrap();
643 let executor = Fjall::open(temp_dir.path()).unwrap();
644
645 let event1 = create_test_event("agg-1", 1, "Created");
646 let event2 = create_test_event("agg-1", 2, "Updated");
647
648 executor.write(vec![event1.clone()]).await.unwrap();
650 executor.write(vec![event2.clone()]).await.unwrap();
651
652 let result = executor
654 .read(
655 Some(vec![ReadAggregator::id("test/Account", "agg-1")]),
656 None,
657 Args::forward(10, None),
658 )
659 .await
660 .unwrap();
661
662 assert_eq!(result.edges.len(), 2);
663 assert_eq!(result.edges[0].node.version, 1);
664 assert_eq!(result.edges[1].node.version, 2);
665 }
666
667 #[tokio::test]
668 async fn test_version_conflict() {
669 let temp_dir = tempfile::tempdir().unwrap();
670 let executor = Fjall::open(temp_dir.path()).unwrap();
671
672 let event1 = create_test_event("agg-1", 1, "Created");
673 executor.write(vec![event1]).await.unwrap();
674
675 let event2 = create_test_event("agg-1", 1, "Duplicate");
677 let result = executor.write(vec![event2]).await;
678
679 assert!(matches!(result, Err(WriteError::InvalidOriginalVersion)));
680 }
681
682 #[tokio::test]
683 async fn test_subscriber_lifecycle() {
684 let temp_dir = tempfile::tempdir().unwrap();
685 let executor = Fjall::open(temp_dir.path()).unwrap();
686
687 let worker_id = Ulid::new();
688 let key = "test-subscriber".to_string();
689
690 executor
692 .upsert_subscriber(key.clone(), worker_id)
693 .await
694 .unwrap();
695
696 assert!(executor
698 .is_subscriber_running(key.clone(), worker_id)
699 .await
700 .unwrap());
701
702 assert!(executor
704 .get_subscriber_cursor(key.clone())
705 .await
706 .unwrap()
707 .is_none());
708
709 executor
711 .acknowledge(key.clone(), Value("test-cursor".to_string()), 0)
712 .await
713 .unwrap();
714
715 let cursor = executor.get_subscriber_cursor(key).await.unwrap();
717 assert_eq!(cursor.unwrap().0, "test-cursor");
718 }
719}