Skip to main content

eventfold_es/
storage.rs

//! On-disk directory layout and stream registry for aggregate event streams.
2
3use std::fs::{self, OpenOptions};
4use std::io::{BufRead, BufReader, Write};
5use std::path::{Path, PathBuf};
6use std::time::SystemTime;
7
/// Manages the on-disk directory layout for aggregate event streams.
///
/// The layout follows this structure:
/// ```text
/// <base_dir>/
///     streams/
///         <aggregate_type>/
///             <instance_id>/      -- standard eventfold log directory
///                 app.jsonl
///                 views/
///     projections/
///         <projection_name>/
///     process_managers/
///     meta/
///         streams.jsonl           -- stream registry, one JSON object per line
/// ```
///
/// `StreamLayout` is cheap to clone (it wraps a single `PathBuf`) and provides
/// path helpers plus stream lifecycle management (creation and listing).
/// Constructing one performs no I/O; directories are created on demand.
#[derive(Debug, Clone)]
pub struct StreamLayout {
    /// Root directory under which every other path of the layout is resolved.
    base_dir: PathBuf,
}
30
31impl StreamLayout {
32    /// Create a new `StreamLayout` rooted at the given base directory.
33    ///
34    /// # Arguments
35    ///
36    /// * `base_dir` - Root directory for all event store data.
37    ///   The directory does not need to exist yet; it will be created
38    ///   lazily when [`ensure_stream`](StreamLayout::ensure_stream) is called.
39    pub fn new(base_dir: impl Into<PathBuf>) -> Self {
40        Self {
41            base_dir: base_dir.into(),
42        }
43    }
44
45    /// Returns the root directory of this layout.
46    pub fn base_dir(&self) -> &Path {
47        &self.base_dir
48    }
49
50    /// Returns the path to a specific aggregate instance's stream directory.
51    ///
52    /// # Arguments
53    ///
54    /// * `aggregate_type` - The aggregate type name (e.g. `"order"`).
55    /// * `instance_id` - The unique instance identifier within that type.
56    ///
57    /// # Returns
58    ///
59    /// `<base_dir>/streams/<aggregate_type>/<instance_id>`
60    pub fn stream_dir(&self, aggregate_type: &str, instance_id: &str) -> PathBuf {
61        self.base_dir
62            .join("streams")
63            .join(aggregate_type)
64            .join(instance_id)
65    }
66
67    /// Returns the path to a specific aggregate instance's views directory.
68    ///
69    /// # Arguments
70    ///
71    /// * `aggregate_type` - The aggregate type name (e.g. `"order"`).
72    /// * `instance_id` - The unique instance identifier within that type.
73    ///
74    /// # Returns
75    ///
76    /// `<base_dir>/streams/<aggregate_type>/<instance_id>/views`
77    pub fn views_dir(&self, aggregate_type: &str, instance_id: &str) -> PathBuf {
78        self.stream_dir(aggregate_type, instance_id).join("views")
79    }
80
81    /// Returns the path to the projections directory.
82    ///
83    /// # Returns
84    ///
85    /// `<base_dir>/projections`
86    pub fn projections_dir(&self) -> PathBuf {
87        self.base_dir.join("projections")
88    }
89
90    /// Returns the path to the process managers directory.
91    ///
92    /// # Returns
93    ///
94    /// `<base_dir>/process_managers`
95    pub fn process_managers_dir(&self) -> PathBuf {
96        self.base_dir.join("process_managers")
97    }
98
99    /// Returns the path to the metadata directory.
100    ///
101    /// # Returns
102    ///
103    /// `<base_dir>/meta`
104    pub fn meta_dir(&self) -> PathBuf {
105        self.base_dir.join("meta")
106    }
107
108    /// Ensures that the stream directory and registry entry exist for the
109    /// given aggregate type and instance ID.
110    ///
111    /// This method is **idempotent**: calling it multiple times with the same
112    /// arguments will not create duplicate directory trees or registry entries.
113    ///
114    /// # Arguments
115    ///
116    /// * `aggregate_type` - The aggregate type name (e.g. `"order"`).
117    /// * `instance_id` - The unique instance identifier within that type.
118    ///
119    /// # Returns
120    ///
121    /// The stream directory path on success.
122    ///
123    /// # Errors
124    ///
125    /// Returns `std::io::Error` if directory creation or file I/O fails.
126    pub fn ensure_stream(
127        &self,
128        aggregate_type: &str,
129        instance_id: &str,
130    ) -> std::io::Result<PathBuf> {
131        let dir = self.stream_dir(aggregate_type, instance_id);
132        fs::create_dir_all(&dir)?;
133
134        let meta = self.meta_dir();
135        fs::create_dir_all(&meta)?;
136
137        let registry_path = meta.join("streams.jsonl");
138
139        // Check whether an entry already exists for this (type, id) pair.
140        // If the registry file doesn't exist yet, we skip straight to appending.
141        let already_registered = registry_path
142            .exists()
143            .then(|| -> std::io::Result<bool> {
144                let file = fs::File::open(&registry_path)?;
145                let reader = BufReader::new(file);
146                for line in reader.lines() {
147                    let line = line?;
148                    if line.is_empty() {
149                        continue;
150                    }
151                    // Parse the JSON to compare fields structurally rather
152                    // than relying on string matching, which would be fragile
153                    // if field ordering ever changed.
154                    if let Ok(entry) = serde_json::from_str::<serde_json::Value>(&line)
155                        && entry.get("type").and_then(|v| v.as_str()) == Some(aggregate_type)
156                        && entry.get("id").and_then(|v| v.as_str()) == Some(instance_id)
157                    {
158                        return Ok(true);
159                    }
160                }
161                Ok(false)
162            })
163            .transpose()?
164            .unwrap_or(false);
165
166        if !already_registered {
167            let ts = SystemTime::UNIX_EPOCH
168                .elapsed()
169                .expect("system clock is before Unix epoch")
170                .as_secs();
171
172            let entry = serde_json::json!({
173                "type": aggregate_type,
174                "id": instance_id,
175                "ts": ts,
176            });
177
178            let mut file = OpenOptions::new()
179                .create(true)
180                .append(true)
181                .open(&registry_path)?;
182
183            // Each entry is a single line of JSON followed by a newline.
184            writeln!(file, "{entry}")?;
185        }
186
187        Ok(dir)
188    }
189
190    /// Lists all aggregate type names that have at least one stream directory.
191    ///
192    /// Reads the `<base_dir>/streams/` directory and returns the name of each
193    /// subdirectory, sorted lexicographically.
194    ///
195    /// # Returns
196    ///
197    /// A sorted `Vec<String>` of aggregate type names. Returns an empty vector
198    /// if the `streams/` directory does not exist or is empty.
199    ///
200    /// # Errors
201    ///
202    /// Returns `std::io::Error` if reading the directory fails for a reason
203    /// other than the directory not existing.
204    pub(crate) fn list_aggregate_types(&self) -> std::io::Result<Vec<String>> {
205        let streams_dir = self.base_dir.join("streams");
206
207        let entries = match fs::read_dir(&streams_dir) {
208            Ok(entries) => entries,
209            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
210            Err(e) => return Err(e),
211        };
212
213        let mut types: Vec<String> = entries
214            .filter_map(|entry| {
215                let entry = entry.ok()?;
216                entry
217                    .file_type()
218                    .ok()?
219                    .is_dir()
220                    .then(|| entry.file_name().to_string_lossy().into_owned())
221            })
222            .collect();
223
224        types.sort();
225        Ok(types)
226    }
227
228    /// Lists all instance IDs for the given aggregate type.
229    ///
230    /// # Arguments
231    ///
232    /// * `aggregate_type` - The aggregate type name to list instances for.
233    ///
234    /// # Returns
235    ///
236    /// A sorted `Vec<String>` of instance IDs. Returns an empty vector
237    /// if the aggregate type directory does not exist.
238    ///
239    /// # Errors
240    ///
241    /// Returns `std::io::Error` if reading the directory fails for a reason
242    /// other than the directory not existing.
243    pub fn list_streams(&self, aggregate_type: &str) -> std::io::Result<Vec<String>> {
244        let type_dir = self.base_dir.join("streams").join(aggregate_type);
245
246        let entries = match fs::read_dir(&type_dir) {
247            Ok(entries) => entries,
248            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
249            Err(e) => return Err(e),
250        };
251
252        let mut ids: Vec<String> = entries
253            .filter_map(|entry| {
254                let entry = entry.ok()?;
255                // Only include directories (each directory is a stream instance).
256                entry
257                    .file_type()
258                    .ok()?
259                    .is_dir()
260                    .then(|| entry.file_name().to_string_lossy().into_owned())
261            })
262            .collect();
263
264        ids.sort();
265        Ok(ids)
266    }
267}
268
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Reads the raw stream registry of a layout rooted at `tmp`.
    fn read_registry(tmp: &TempDir) -> String {
        fs::read_to_string(tmp.path().join("meta/streams.jsonl"))
            .expect("failed to read registry")
    }

    /// Counts registry lines that parse as JSON and name the `(ty, id)` pair.
    /// Unparseable lines are ignored so tests can seed the registry with junk.
    fn count_entries(contents: &str, ty: &str, id: &str) -> usize {
        contents
            .lines()
            .filter_map(|line| serde_json::from_str::<serde_json::Value>(line).ok())
            .filter(|v| {
                v.get("type").and_then(|t| t.as_str()) == Some(ty)
                    && v.get("id").and_then(|i| i.as_str()) == Some(id)
            })
            .count()
    }

    #[test]
    fn path_helpers_correct() {
        let tmp = TempDir::new().expect("failed to create temp dir");
        let layout = StreamLayout::new(tmp.path());

        assert_eq!(layout.base_dir(), tmp.path());

        assert_eq!(
            layout.stream_dir("order", "abc-123"),
            tmp.path().join("streams/order/abc-123")
        );

        assert_eq!(
            layout.views_dir("order", "abc-123"),
            tmp.path().join("streams/order/abc-123/views")
        );

        assert_eq!(layout.projections_dir(), tmp.path().join("projections"));

        assert_eq!(
            layout.process_managers_dir(),
            tmp.path().join("process_managers")
        );

        assert_eq!(layout.meta_dir(), tmp.path().join("meta"));
    }

    #[test]
    fn ensure_stream_creates_dirs() {
        let tmp = TempDir::new().expect("failed to create temp dir");
        let layout = StreamLayout::new(tmp.path());

        let dir = layout
            .ensure_stream("order", "abc-123")
            .expect("ensure_stream should succeed");

        assert!(dir.is_dir(), "stream directory should exist on disk");
        assert_eq!(dir, tmp.path().join("streams/order/abc-123"));

        let registry = tmp.path().join("meta/streams.jsonl");
        assert!(registry.is_file(), "registry file should exist");
    }

    #[test]
    fn ensure_stream_idempotent() {
        let tmp = TempDir::new().expect("failed to create temp dir");
        let layout = StreamLayout::new(tmp.path());

        layout
            .ensure_stream("order", "abc-123")
            .expect("first ensure_stream should succeed");
        layout
            .ensure_stream("order", "abc-123")
            .expect("second ensure_stream should succeed");

        let registry = tmp.path().join("meta/streams.jsonl");
        let contents = fs::read_to_string(&registry).expect("failed to read registry");

        let matching_entries: Vec<&str> = contents
            .lines()
            .filter(|line| {
                let v: serde_json::Value =
                    serde_json::from_str(line).expect("line should be valid JSON");
                v.get("type").and_then(|t| t.as_str()) == Some("order")
                    && v.get("id").and_then(|i| i.as_str()) == Some("abc-123")
            })
            .collect();

        assert_eq!(
            matching_entries.len(),
            1,
            "registry should contain exactly one entry for (order, abc-123)"
        );
    }

    #[test]
    fn ensure_stream_distinguishes_type_and_id() {
        let tmp = TempDir::new().expect("failed to create temp dir");
        let layout = StreamLayout::new(tmp.path());

        // The same instance ID under two aggregate types, and two IDs under one
        // type: all three are distinct streams and each needs its own entry.
        let pairs = [("order", "a"), ("cart", "a"), ("order", "b")];
        for &(ty, id) in &pairs {
            layout
                .ensure_stream(ty, id)
                .expect("ensure_stream should succeed");
        }

        let contents = read_registry(&tmp);
        assert_eq!(contents.lines().count(), pairs.len());
        for &(ty, id) in &pairs {
            assert_eq!(
                count_entries(&contents, ty, id),
                1,
                "expected exactly one entry for ({ty}, {id})"
            );
        }
    }

    #[test]
    fn ensure_stream_skips_unparseable_registry_lines() {
        let tmp = TempDir::new().expect("failed to create temp dir");
        let layout = StreamLayout::new(tmp.path());

        // Seed the registry with a non-JSON line and a blank line. Neither should
        // abort registration or be mistaken for an existing entry.
        fs::create_dir_all(layout.meta_dir()).expect("failed to create meta dir");
        fs::write(layout.meta_dir().join("streams.jsonl"), "not json\n\n")
            .expect("failed to seed registry");

        layout
            .ensure_stream("order", "abc-123")
            .expect("first ensure_stream should succeed");
        layout
            .ensure_stream("order", "abc-123")
            .expect("second ensure_stream should succeed");

        assert_eq!(count_entries(&read_registry(&tmp), "order", "abc-123"), 1);
    }

    #[test]
    fn list_streams_empty_for_unknown_type() {
        let tmp = TempDir::new().expect("failed to create temp dir");
        let layout = StreamLayout::new(tmp.path());

        let streams = layout
            .list_streams("nonexistent")
            .expect("list_streams should succeed for unknown type");

        assert!(streams.is_empty());
    }

    #[test]
    fn list_streams_after_create() {
        let tmp = TempDir::new().expect("failed to create temp dir");
        let layout = StreamLayout::new(tmp.path());

        // Create streams in non-sorted order to verify sorting.
        layout
            .ensure_stream("order", "charlie")
            .expect("ensure_stream should succeed");
        layout
            .ensure_stream("order", "alpha")
            .expect("ensure_stream should succeed");
        layout
            .ensure_stream("order", "bravo")
            .expect("ensure_stream should succeed");

        let streams = layout
            .list_streams("order")
            .expect("list_streams should succeed");

        assert_eq!(streams, vec!["alpha", "bravo", "charlie"]);
    }

    #[test]
    fn listings_ignore_plain_files() {
        let tmp = TempDir::new().expect("failed to create temp dir");
        let layout = StreamLayout::new(tmp.path());

        layout
            .ensure_stream("order", "abc-123")
            .expect("ensure_stream should succeed");

        // Stray files at either level must not show up as types or instances.
        fs::write(tmp.path().join("streams/stray.txt"), "x")
            .expect("failed to write stray file");
        fs::write(tmp.path().join("streams/order/stray.txt"), "x")
            .expect("failed to write stray file");

        let types = layout
            .list_aggregate_types()
            .expect("list_aggregate_types should succeed");
        assert_eq!(types, vec!["order"]);

        let streams = layout
            .list_streams("order")
            .expect("list_streams should succeed");
        assert_eq!(streams, vec!["abc-123"]);
    }

    #[test]
    fn list_aggregate_types_returns_sorted_types() {
        let tmp = TempDir::new().expect("failed to create temp dir");
        let layout = StreamLayout::new(tmp.path());

        // Create streams for two aggregate types in non-sorted order.
        layout
            .ensure_stream("toggle", "t-1")
            .expect("ensure_stream should succeed");
        layout
            .ensure_stream("counter", "c-1")
            .expect("ensure_stream should succeed");

        let types = layout
            .list_aggregate_types()
            .expect("list_aggregate_types should succeed");

        assert_eq!(types, vec!["counter", "toggle"]);
    }

    #[test]
    fn list_aggregate_types_empty_when_no_streams_dir() {
        let tmp = TempDir::new().expect("failed to create temp dir");
        let layout = StreamLayout::new(tmp.path());

        // No streams directory created -- should return empty vec.
        let types = layout
            .list_aggregate_types()
            .expect("list_aggregate_types should succeed");

        assert!(types.is_empty());
    }

    #[test]
    fn registry_entries_valid_json() {
        let tmp = TempDir::new().expect("failed to create temp dir");
        let layout = StreamLayout::new(tmp.path());

        layout
            .ensure_stream("order", "abc-123")
            .expect("ensure_stream should succeed");
        layout
            .ensure_stream("cart", "xyz-789")
            .expect("ensure_stream should succeed");

        let contents = read_registry(&tmp);
        let lines: Vec<&str> = contents.lines().collect();

        // Without this, the per-line checks below would pass vacuously on an
        // empty registry.
        assert_eq!(
            lines.len(),
            2,
            "registry should contain one line per distinct stream"
        );

        for (i, line) in lines.iter().enumerate() {
            let entry: serde_json::Value = serde_json::from_str(line)
                .unwrap_or_else(|e| panic!("line {i} is not valid JSON: {e}"));

            assert!(
                entry.get("type").and_then(|v| v.as_str()).is_some(),
                "line {i} should have a string 'type' field"
            );
            assert!(
                entry.get("id").and_then(|v| v.as_str()).is_some(),
                "line {i} should have a string 'id' field"
            );
            assert!(
                entry.get("ts").and_then(|v| v.as_u64()).is_some(),
                "line {i} should have a numeric 'ts' field"
            );
        }
    }
}