Skip to main content

evento_fjall/
lib.rs

1//! Fjall embedded key-value store implementation for evento.
2//!
3//! This crate provides an [`Executor`] implementation using [fjall](https://crates.io/crates/fjall),
4//! an LSM-tree based embedded key-value storage engine.
5//!
6//! # Features
7//!
8//! - **Embedded storage** - No external database server required
9//! - **LSM-tree based** - Optimized for write-heavy workloads
10//! - **Atomic writes** - Cross-partition transactional semantics
11//! - **Efficient range scans** - Fast prefix and range queries
12//!
13//! # Example
14//!
15//! ```rust,ignore
16//! use evento_fjall::Fjall;
17//! use evento_core::{Executor, metadata::Metadata, cursor::Args, ReadAggregator};
18//!
19//! // Define events using an enum
20//! #[evento::aggregator]
21//! pub enum User {
22//!     UserCreated { name: String },
23//! }
24//!
25//! // Open the database
26//! let executor = Fjall::open("./my-events")?;
27//!
28//! // Create events
29//! let id = evento::create()
30//!     .event(&UserCreated { name: "Alice".into() })
31//!     .metadata(&Metadata::default())
32//!     .commit(&executor)
33//!     .await?;
34//!
35//! // Query events
36//! let events = executor.read(
37//!     Some(vec![ReadAggregator::id("user/User", &id)]),
38//!     None,
39//!     Args::forward(10, None),
40//! ).await?;
41//! ```
42//!
43//! # Data Model
44//!
45//! Events are stored across multiple partitions for efficient querying:
46//!
47//! - `events` - Primary storage: `ULID -> Event`
48//! - `agg_index` - Aggregate index: `{type}\0{id}\0{version}` -> `ULID`
49//! - `routing_index` - Routing key index: `{routing_key}\0{ULID}` -> `()`
50//! - `type_index` - Event type index: `{type}\0{name}\0{ULID}` -> `()`
51//! - `subscribers` - Subscription state: `{key}` -> `SubscriberState`
52
53use std::path::Path;
54
55use evento_core::{
56    cursor::{Args, Cursor, ReadResult, Value},
57    metadata::Metadata,
58    Event, Executor, ReadAggregator, RoutingKey, WriteError,
59};
60use fjall::{Config, Keyspace, Partition, PartitionCreateOptions, PersistMode};
61use ulid::Ulid;
62
63/// Subscriber state stored in the database.
64#[derive(Debug, Clone, bitcode::Encode, bitcode::Decode)]
65struct SubscriberState {
66    worker_id: String,
67    cursor: Option<String>,
68    lag: u64,
69}
70
71/// Stored event in fjall format.
72#[derive(Debug, Clone, bitcode::Encode, bitcode::Decode)]
73struct StoredEvent {
74    id: String,
75    aggregator_id: String,
76    aggregator_type: String,
77    version: u16,
78    name: String,
79    routing_key: Option<String>,
80    data: Vec<u8>,
81    metadata: Metadata,
82    timestamp: u64,
83    timestamp_subsec: u32,
84}
85
86impl From<&Event> for StoredEvent {
87    fn from(event: &Event) -> Self {
88        Self {
89            id: event.id.to_string(),
90            aggregator_id: event.aggregator_id.clone(),
91            aggregator_type: event.aggregator_type.clone(),
92            version: event.version,
93            name: event.name.clone(),
94            routing_key: event.routing_key.clone(),
95            data: event.data.clone(),
96            metadata: event.metadata.clone(),
97            timestamp: event.timestamp,
98            timestamp_subsec: event.timestamp_subsec,
99        }
100    }
101}
102
103impl TryFrom<StoredEvent> for Event {
104    type Error = ulid::DecodeError;
105
106    fn try_from(stored: StoredEvent) -> Result<Self, Self::Error> {
107        Ok(Self {
108            id: Ulid::from_string(&stored.id)?,
109            aggregator_id: stored.aggregator_id,
110            aggregator_type: stored.aggregator_type,
111            version: stored.version,
112            name: stored.name,
113            routing_key: stored.routing_key,
114            data: stored.data,
115            metadata: stored.metadata,
116            timestamp: stored.timestamp,
117            timestamp_subsec: stored.timestamp_subsec,
118        })
119    }
120}
121
122/// Fjall-based event store executor.
123///
124/// Implements the [`Executor`] trait using fjall for embedded storage.
125/// Events are stored in an LSM-tree structure with secondary indexes
126/// for efficient querying by aggregate, routing key, and event type.
127///
128/// # Example
129///
130/// ```rust,ignore
131/// use evento_fjall::Fjall;
132///
133/// // Open with default options
134/// let executor = Fjall::open("./events.db")?;
135///
136/// // Or with custom configuration
137/// let keyspace = fjall::Config::new("./events.db")
138///     .max_write_buffer_size(64 * 1024 * 1024)
139///     .open()?;
140/// let executor = Fjall::from_keyspace(keyspace)?;
141/// ```
142pub struct Fjall {
143    keyspace: Keyspace,
144    events: Partition,
145    agg_index: Partition,
146    routing_index: Partition,
147    type_index: Partition,
148    subscribers: Partition,
149}
150
151impl Clone for Fjall {
152    fn clone(&self) -> Self {
153        Self {
154            keyspace: self.keyspace.clone(),
155            events: self.events.clone(),
156            agg_index: self.agg_index.clone(),
157            routing_index: self.routing_index.clone(),
158            type_index: self.type_index.clone(),
159            subscribers: self.subscribers.clone(),
160        }
161    }
162}
163
164impl Fjall {
165    /// Opens a new fjall database at the specified path.
166    ///
167    /// Creates the database directory if it doesn't exist.
168    ///
169    /// # Errors
170    ///
171    /// Returns an error if the database cannot be opened or partitions
172    /// cannot be created.
173    pub fn open(path: impl AsRef<Path>) -> anyhow::Result<Self> {
174        let keyspace = Config::new(path).open()?;
175        Self::from_keyspace(keyspace)
176    }
177
178    /// Creates an executor from an existing keyspace.
179    ///
180    /// Use this when you need custom keyspace configuration.
181    ///
182    /// # Example
183    ///
184    /// ```rust,ignore
185    /// let keyspace = fjall::Config::new("./events.db")
186    ///     .max_write_buffer_size(128 * 1024 * 1024)
187    ///     .open()?;
188    /// let executor = Fjall::from_keyspace(keyspace)?;
189    /// ```
190    pub fn from_keyspace(keyspace: Keyspace) -> anyhow::Result<Self> {
191        let opts = PartitionCreateOptions::default();
192
193        Ok(Self {
194            events: keyspace.open_partition("events", opts.clone())?,
195            agg_index: keyspace.open_partition("agg_index", opts.clone())?,
196            routing_index: keyspace.open_partition("routing_index", opts.clone())?,
197            type_index: keyspace.open_partition("type_index", opts.clone())?,
198            subscribers: keyspace.open_partition("subscribers", opts)?,
199            keyspace,
200        })
201    }
202
203    /// Returns a reference to the underlying keyspace.
204    pub fn keyspace(&self) -> &Keyspace {
205        &self.keyspace
206    }
207
208    /// Persists all pending writes to disk.
209    ///
210    /// By default, writes are persisted after each batch. Call this
211    /// if you need to ensure durability at a specific point.
212    pub fn persist(&self) -> anyhow::Result<()> {
213        self.keyspace.persist(PersistMode::SyncAll)?;
214        Ok(())
215    }
216
217    /// Builds the aggregate index key.
218    fn agg_key(aggregator_type: &str, aggregator_id: &str, version: u16) -> Vec<u8> {
219        let mut key = format!("{}\x00{}\x00", aggregator_type, aggregator_id).into_bytes();
220        key.extend_from_slice(&version.to_be_bytes());
221        key
222    }
223
224    /// Builds the aggregate index prefix (without version).
225    fn agg_prefix(aggregator_type: &str, aggregator_id: &str) -> String {
226        format!("{}\x00{}\x00", aggregator_type, aggregator_id)
227    }
228
229    /// Builds the type index key.
230    fn type_key(aggregator_type: &str, name: &str, id: &Ulid) -> Vec<u8> {
231        let mut key = format!("{}\x00{}\x00", aggregator_type, name).into_bytes();
232        key.extend_from_slice(&id.to_bytes());
233        key
234    }
235
236    /// Builds the type index prefix.
237    fn type_prefix(aggregator_type: &str, name: &str) -> String {
238        format!("{}\x00{}\x00", aggregator_type, name)
239    }
240
241    /// Builds the routing index key.
242    fn routing_key(routing_key: &str, id: &Ulid) -> Vec<u8> {
243        let mut key = format!("{}\x00", routing_key).into_bytes();
244        key.extend_from_slice(&id.to_bytes());
245        key
246    }
247
248    /// Builds the routing index prefix.
249    fn routing_prefix(routing_key: &str) -> String {
250        format!("{}\x00", routing_key)
251    }
252
253    /// Gets the last version for an aggregate.
254    fn get_last_version(
255        &self,
256        aggregator_type: &str,
257        aggregator_id: &str,
258    ) -> anyhow::Result<Option<u16>> {
259        let prefix = Self::agg_prefix(aggregator_type, aggregator_id);
260
261        if let Some(result) = self.agg_index.prefix(&prefix).next_back() {
262            let kv = result?;
263            // Version is the last 2 bytes of the key
264            let key_bytes = kv.0.as_ref();
265            if key_bytes.len() >= 2 {
266                let version_bytes: [u8; 2] = key_bytes[key_bytes.len() - 2..].try_into().unwrap();
267                return Ok(Some(u16::from_be_bytes(version_bytes)));
268            }
269        }
270
271        Ok(None)
272    }
273
274    /// Loads an event by its ULID.
275    fn load_event(&self, id: &Ulid) -> anyhow::Result<Option<Event>> {
276        match self.events.get(id.to_bytes())? {
277            Some(bytes) => {
278                let stored: StoredEvent = bitcode::decode(&bytes)
279                    .map_err(|e| anyhow::anyhow!("Failed to deserialize event: {}", e))?;
280                Ok(Some(stored.try_into()?))
281            }
282            None => Ok(None),
283        }
284    }
285
286    /// Collects event IDs matching the given filters.
287    fn collect_event_ids(
288        &self,
289        aggregators: &Option<Vec<ReadAggregator>>,
290        routing_key: &Option<RoutingKey>,
291    ) -> anyhow::Result<Vec<Ulid>> {
292        use std::collections::HashSet;
293        let mut event_ids_set = HashSet::new();
294        let mut event_ids = Vec::new();
295
296        // Helper macro to add unique event IDs
297        macro_rules! add_unique {
298            ($ulid:expr) => {
299                if event_ids_set.insert($ulid) {
300                    event_ids.push($ulid);
301                }
302            };
303        }
304
305        match (aggregators, routing_key) {
306            // Query by specific aggregator ID and optionally event name
307            (Some(aggs), _) => {
308                for agg in aggs {
309                    match (&agg.aggregator_id, &agg.name) {
310                        // Specific aggregate ID with event name filter
311                        (Some(id), Some(name)) => {
312                            let prefix = Self::agg_prefix(&agg.aggregator_type, id);
313                            for kv in self.agg_index.prefix(&prefix) {
314                                let kv = kv?;
315                                let ulid_bytes: [u8; 16] = kv.1.as_ref().try_into()?;
316                                let ulid = Ulid::from_bytes(ulid_bytes);
317
318                                // Check if event matches name filter
319                                if let Some(event) = self.load_event(&ulid)? {
320                                    if &event.name == name {
321                                        add_unique!(ulid);
322                                    }
323                                }
324                            }
325                        }
326                        // Specific aggregate ID, all events
327                        (Some(id), None) => {
328                            let prefix = Self::agg_prefix(&agg.aggregator_type, id);
329                            for kv in self.agg_index.prefix(&prefix) {
330                                let kv = kv?;
331                                let ulid_bytes: [u8; 16] = kv.1.as_ref().try_into()?;
332                                add_unique!(Ulid::from_bytes(ulid_bytes));
333                            }
334                        }
335                        // All aggregates of type, specific event name
336                        (None, Some(name)) => {
337                            let prefix = Self::type_prefix(&agg.aggregator_type, name);
338                            for kv in self.type_index.prefix(&prefix) {
339                                let kv = kv?;
340                                let key_bytes = kv.0.as_ref();
341                                if key_bytes.len() >= 16 {
342                                    let ulid_bytes: [u8; 16] =
343                                        key_bytes[key_bytes.len() - 16..].try_into()?;
344                                    add_unique!(Ulid::from_bytes(ulid_bytes));
345                                }
346                            }
347                        }
348                        // All events of aggregator type - scan all
349                        (None, None) => {
350                            let prefix = format!("{}\x00", agg.aggregator_type);
351                            for kv in self.agg_index.prefix(&prefix) {
352                                let kv = kv?;
353                                let ulid_bytes: [u8; 16] = kv.1.as_ref().try_into()?;
354                                add_unique!(Ulid::from_bytes(ulid_bytes));
355                            }
356                        }
357                    }
358                }
359            }
360            // Query by routing key only
361            (None, Some(RoutingKey::Value(Some(ref key)))) => {
362                let prefix = Self::routing_prefix(key);
363                for kv in self.routing_index.prefix(&prefix) {
364                    let kv = kv?;
365                    let key_bytes = kv.0.as_ref();
366                    if key_bytes.len() >= 16 {
367                        let ulid_bytes: [u8; 16] = key_bytes[key_bytes.len() - 16..].try_into()?;
368                        add_unique!(Ulid::from_bytes(ulid_bytes));
369                    }
370                }
371            }
372            // Query all events
373            _ => {
374                for kv in self.events.iter() {
375                    let kv = kv?;
376                    let ulid_bytes: [u8; 16] = kv.0.as_ref().try_into()?;
377                    add_unique!(Ulid::from_bytes(ulid_bytes));
378                }
379            }
380        }
381
382        Ok(event_ids)
383    }
384}
385
386#[async_trait::async_trait]
387impl Executor for Fjall {
388    async fn write(&self, events: Vec<Event>) -> Result<(), WriteError> {
389        let executor = self.clone();
390
391        tokio::task::spawn_blocking(move || {
392            // Validate versions first (optimistic concurrency)
393            for event in &events {
394                let last_version = executor
395                    .get_last_version(&event.aggregator_type, &event.aggregator_id)
396                    .map_err(WriteError::Unknown)?;
397
398                match last_version {
399                    Some(v) if event.version != v + 1 => {
400                        return Err(WriteError::InvalidOriginalVersion);
401                    }
402                    None if event.version != 1 => {
403                        return Err(WriteError::InvalidOriginalVersion);
404                    }
405                    _ => {}
406                }
407            }
408
409            // Write atomically using batch
410            let mut batch = executor.keyspace.batch();
411
412            for event in &events {
413                let id_bytes = event.id.to_bytes();
414                let stored = StoredEvent::from(event);
415                let event_bytes = bitcode::encode(&stored);
416
417                // Primary: ULID -> Event
418                batch.insert(&executor.events, id_bytes, event_bytes.as_slice());
419
420                // Aggregate index: {type}\0{id}\0{version} -> ULID
421                let agg_key =
422                    Fjall::agg_key(&event.aggregator_type, &event.aggregator_id, event.version);
423                batch.insert(&executor.agg_index, agg_key, id_bytes);
424
425                // Type index: {type}\0{name}\0{ULID} -> ()
426                let type_key = Fjall::type_key(&event.aggregator_type, &event.name, &event.id);
427                batch.insert(&executor.type_index, type_key, []);
428
429                // Routing index (if routing key exists): {routing}\0{ULID} -> ()
430                if let Some(ref routing_key) = event.routing_key {
431                    let routing_key = Fjall::routing_key(routing_key, &event.id);
432                    batch.insert(&executor.routing_index, routing_key, []);
433                }
434            }
435
436            batch.commit().map_err(|e| WriteError::Unknown(e.into()))?;
437            executor
438                .keyspace
439                .persist(PersistMode::SyncAll)
440                .map_err(|e| WriteError::Unknown(e.into()))?;
441
442            Ok(())
443        })
444        .await
445        .map_err(|e| WriteError::Unknown(e.into()))?
446    }
447
448    async fn read(
449        &self,
450        aggregators: Option<Vec<ReadAggregator>>,
451        routing_key: Option<RoutingKey>,
452        args: Args,
453    ) -> anyhow::Result<ReadResult<Event>> {
454        let executor = self.clone();
455
456        tokio::task::spawn_blocking(move || {
457            let is_backward = args.is_backward();
458            let (limit, cursor) = args.get_info();
459
460            // Collect matching event IDs
461            let mut event_ids = executor.collect_event_ids(&aggregators, &routing_key)?;
462
463            // Sort by ULID (time-ordered)
464            event_ids.sort();
465            if is_backward {
466                event_ids.reverse();
467            }
468
469            // Apply cursor filter
470            if let Some(ref cursor_value) = cursor {
471                let cursor_data = Event::deserialize_cursor(cursor_value)?;
472                let cursor_ulid = Ulid::from_string(&cursor_data.i)?;
473
474                event_ids.retain(|id| {
475                    if is_backward {
476                        *id < cursor_ulid
477                    } else {
478                        *id > cursor_ulid
479                    }
480                });
481            }
482
483            // Load events, filtering by routing key, until we have enough
484            let target_count = (limit + 1) as usize;
485            let mut events = Vec::new();
486
487            for id in event_ids {
488                if events.len() >= target_count {
489                    break;
490                }
491
492                if let Some(event) = executor.load_event(&id)? {
493                    // Apply routing key filter if specified
494                    let matches = match &routing_key {
495                        Some(RoutingKey::Value(Some(ref key))) => {
496                            event.routing_key.as_ref() == Some(key)
497                        }
498                        Some(RoutingKey::Value(None)) => event.routing_key.is_none(),
499                        Some(RoutingKey::All) | None => true,
500                    };
501
502                    if matches {
503                        events.push(event);
504                    }
505                }
506            }
507
508            // Build paginated result
509            evento_core::cursor::Reader::new(events)
510                .args(args)
511                .execute()
512                .map_err(|e| anyhow::anyhow!("{}", e))
513        })
514        .await?
515    }
516
517    async fn get_subscriber_cursor(&self, key: String) -> anyhow::Result<Option<Value>> {
518        let executor = self.clone();
519
520        tokio::task::spawn_blocking(move || match executor.subscribers.get(&key)? {
521            Some(bytes) => {
522                let state: SubscriberState = bitcode::decode(&bytes)
523                    .map_err(|e| anyhow::anyhow!("Failed to deserialize subscriber: {}", e))?;
524                Ok(state.cursor.map(Value))
525            }
526            None => Ok(None),
527        })
528        .await?
529    }
530
531    async fn is_subscriber_running(&self, key: String, worker_id: Ulid) -> anyhow::Result<bool> {
532        let executor = self.clone();
533
534        tokio::task::spawn_blocking(move || match executor.subscribers.get(&key)? {
535            Some(bytes) => {
536                let state: SubscriberState = bitcode::decode(&bytes)
537                    .map_err(|e| anyhow::anyhow!("Failed to deserialize subscriber: {}", e))?;
538                Ok(state.worker_id == worker_id.to_string())
539            }
540            None => Ok(false),
541        })
542        .await?
543    }
544
545    async fn upsert_subscriber(&self, key: String, worker_id: Ulid) -> anyhow::Result<()> {
546        let executor = self.clone();
547
548        tokio::task::spawn_blocking(move || {
549            // Try to preserve existing cursor if subscriber exists
550            let cursor = match executor.subscribers.get(&key)? {
551                Some(bytes) => {
552                    let state: SubscriberState = bitcode::decode(&bytes)
553                        .map_err(|e| anyhow::anyhow!("Failed to deserialize subscriber: {}", e))?;
554                    state.cursor
555                }
556                None => None,
557            };
558
559            let state = SubscriberState {
560                worker_id: worker_id.to_string(),
561                cursor,
562                lag: 0,
563            };
564
565            executor.subscribers.insert(&key, bitcode::encode(&state))?;
566            Ok(())
567        })
568        .await?
569    }
570
571    async fn acknowledge(&self, key: String, cursor: Value, lag: u64) -> anyhow::Result<()> {
572        let executor = self.clone();
573
574        tokio::task::spawn_blocking(move || {
575            let state = match executor.subscribers.get(&key)? {
576                Some(bytes) => {
577                    let mut state: SubscriberState = bitcode::decode(&bytes)
578                        .map_err(|e| anyhow::anyhow!("Failed to deserialize subscriber: {}", e))?;
579                    state.cursor = Some(cursor.0);
580                    state.lag = lag;
581                    state
582                }
583                None => anyhow::bail!("Subscriber not found: {}", key),
584            };
585
586            executor.subscribers.insert(&key, bitcode::encode(&state))?;
587            Ok(())
588        })
589        .await?
590    }
591
592    async fn get_snapshot(
593        &self,
594        _aggregator_type: String,
595        _aggregator_revision: String,
596        _id: String,
597    ) -> anyhow::Result<Option<(Vec<u8>, Value)>> {
598        todo!()
599    }
600
601    async fn save_snapshot(
602        &self,
603        _aggregator_type: String,
604        _aggregator_revision: String,
605        _id: String,
606        _data: Vec<u8>,
607        _cursor: Value,
608    ) -> anyhow::Result<()> {
609        todo!()
610    }
611}
612
613impl From<Keyspace> for Fjall {
614    fn from(keyspace: Keyspace) -> Self {
615        Self::from_keyspace(keyspace).expect("Failed to create Fjall from keyspace")
616    }
617}
618
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Builds a `test/Account` event with a fresh ULID, a fixed routing key
    /// and the current wall-clock time.
    fn create_test_event(aggregator_id: &str, version: u16, name: &str) -> Event {
        let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
        Event {
            id: Ulid::new(),
            aggregator_id: aggregator_id.to_string(),
            aggregator_type: "test/Account".to_string(),
            version,
            name: name.to_string(),
            routing_key: Some("test-routing".to_string()),
            data: vec![1, 2, 3],
            metadata: Metadata::default(),
            timestamp: since_epoch.as_secs(),
            timestamp_subsec: since_epoch.subsec_millis(),
        }
    }

    /// Opens an executor in a fresh temporary directory. The directory guard
    /// is returned so it outlives the executor.
    fn open_temp_store() -> (tempfile::TempDir, Fjall) {
        let dir = tempfile::tempdir().unwrap();
        let store = Fjall::open(dir.path()).unwrap();
        (dir, store)
    }

    #[tokio::test]
    async fn test_write_and_read_events() {
        let (_dir, executor) = open_temp_store();

        // Two consecutive versions of one aggregate, written separately.
        let created = create_test_event("agg-1", 1, "Created");
        let updated = create_test_event("agg-1", 2, "Updated");
        executor.write(vec![created.clone()]).await.unwrap();
        executor.write(vec![updated.clone()]).await.unwrap();

        // Read everything for that aggregate, oldest first.
        let filter = Some(vec![ReadAggregator::id("test/Account", "agg-1")]);
        let result = executor
            .read(filter, None, Args::forward(10, None))
            .await
            .unwrap();

        assert_eq!(result.edges.len(), 2);
        assert_eq!(result.edges[0].node.version, 1);
        assert_eq!(result.edges[1].node.version, 2);
    }

    #[tokio::test]
    async fn test_version_conflict() {
        let (_dir, executor) = open_temp_store();

        executor
            .write(vec![create_test_event("agg-1", 1, "Created")])
            .await
            .unwrap();

        // A second event claiming version 1 must be rejected.
        let duplicate = create_test_event("agg-1", 1, "Duplicate");
        let outcome = executor.write(vec![duplicate]).await;

        assert!(matches!(outcome, Err(WriteError::InvalidOriginalVersion)));
    }

    #[tokio::test]
    async fn test_subscriber_lifecycle() {
        let (_dir, executor) = open_temp_store();

        let worker_id = Ulid::new();
        let key = "test-subscriber".to_string();

        // Register the subscriber.
        executor
            .upsert_subscriber(key.clone(), worker_id)
            .await
            .unwrap();

        // It is now owned by our worker.
        let running = executor
            .is_subscriber_running(key.clone(), worker_id)
            .await
            .unwrap();
        assert!(running);

        // No cursor before the first acknowledgement.
        let initial_cursor = executor.get_subscriber_cursor(key.clone()).await.unwrap();
        assert!(initial_cursor.is_none());

        // Acknowledge with a cursor.
        executor
            .acknowledge(key.clone(), Value("test-cursor".to_string()), 0)
            .await
            .unwrap();

        // The cursor is now stored.
        let cursor = executor.get_subscriber_cursor(key).await.unwrap();
        assert_eq!(cursor.unwrap().0, "test-cursor");
    }
}
719}