1use std::fs::{self, OpenOptions};
4use std::io::{BufRead, BufReader, Write};
5use std::path::{Path, PathBuf};
6use std::time::SystemTime;
7
/// Filesystem layout rooted at a single base directory.
///
/// The value only stores the root path; every other location (stream,
/// view, projection, process-manager and metadata directories) is derived
/// from it by joining fixed path segments onto `base_dir`.
#[derive(Clone, Debug)]
pub struct StreamLayout {
    /// Root directory under which all derived paths live.
    base_dir: PathBuf,
}
30
31impl StreamLayout {
32 pub fn new(base_dir: impl Into<PathBuf>) -> Self {
40 Self {
41 base_dir: base_dir.into(),
42 }
43 }
44
45 pub fn base_dir(&self) -> &Path {
47 &self.base_dir
48 }
49
50 pub fn stream_dir(&self, aggregate_type: &str, instance_id: &str) -> PathBuf {
61 self.base_dir
62 .join("streams")
63 .join(aggregate_type)
64 .join(instance_id)
65 }
66
67 pub fn views_dir(&self, aggregate_type: &str, instance_id: &str) -> PathBuf {
78 self.stream_dir(aggregate_type, instance_id).join("views")
79 }
80
81 pub fn projections_dir(&self) -> PathBuf {
87 self.base_dir.join("projections")
88 }
89
90 pub fn process_managers_dir(&self) -> PathBuf {
96 self.base_dir.join("process_managers")
97 }
98
99 pub fn meta_dir(&self) -> PathBuf {
105 self.base_dir.join("meta")
106 }
107
108 pub fn ensure_stream(
127 &self,
128 aggregate_type: &str,
129 instance_id: &str,
130 ) -> std::io::Result<PathBuf> {
131 let dir = self.stream_dir(aggregate_type, instance_id);
132 fs::create_dir_all(&dir)?;
133
134 let meta = self.meta_dir();
135 fs::create_dir_all(&meta)?;
136
137 let registry_path = meta.join("streams.jsonl");
138
139 let already_registered = registry_path
142 .exists()
143 .then(|| -> std::io::Result<bool> {
144 let file = fs::File::open(®istry_path)?;
145 let reader = BufReader::new(file);
146 for line in reader.lines() {
147 let line = line?;
148 if line.is_empty() {
149 continue;
150 }
151 if let Ok(entry) = serde_json::from_str::<serde_json::Value>(&line)
155 && entry.get("type").and_then(|v| v.as_str()) == Some(aggregate_type)
156 && entry.get("id").and_then(|v| v.as_str()) == Some(instance_id)
157 {
158 return Ok(true);
159 }
160 }
161 Ok(false)
162 })
163 .transpose()?
164 .unwrap_or(false);
165
166 if !already_registered {
167 let ts = SystemTime::UNIX_EPOCH
168 .elapsed()
169 .expect("system clock is before Unix epoch")
170 .as_secs();
171
172 let entry = serde_json::json!({
173 "type": aggregate_type,
174 "id": instance_id,
175 "ts": ts,
176 });
177
178 let mut file = OpenOptions::new()
179 .create(true)
180 .append(true)
181 .open(®istry_path)?;
182
183 writeln!(file, "{entry}")?;
185 }
186
187 Ok(dir)
188 }
189
190 pub(crate) fn list_aggregate_types(&self) -> std::io::Result<Vec<String>> {
205 let streams_dir = self.base_dir.join("streams");
206
207 let entries = match fs::read_dir(&streams_dir) {
208 Ok(entries) => entries,
209 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
210 Err(e) => return Err(e),
211 };
212
213 let mut types: Vec<String> = entries
214 .filter_map(|entry| {
215 let entry = entry.ok()?;
216 entry
217 .file_type()
218 .ok()?
219 .is_dir()
220 .then(|| entry.file_name().to_string_lossy().into_owned())
221 })
222 .collect();
223
224 types.sort();
225 Ok(types)
226 }
227
228 pub fn list_streams(&self, aggregate_type: &str) -> std::io::Result<Vec<String>> {
244 let type_dir = self.base_dir.join("streams").join(aggregate_type);
245
246 let entries = match fs::read_dir(&type_dir) {
247 Ok(entries) => entries,
248 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
249 Err(e) => return Err(e),
250 };
251
252 let mut ids: Vec<String> = entries
253 .filter_map(|entry| {
254 let entry = entry.ok()?;
255 entry
257 .file_type()
258 .ok()?
259 .is_dir()
260 .then(|| entry.file_name().to_string_lossy().into_owned())
261 })
262 .collect();
263
264 ids.sort();
265 Ok(ids)
266 }
267}
268
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a layout rooted in a fresh temporary directory.
    ///
    /// The `TempDir` is returned alongside the layout because dropping it
    /// removes the directory; each test keeps it alive for its duration.
    fn temp_layout() -> (TempDir, StreamLayout) {
        let tmp = TempDir::new().expect("failed to create temp dir");
        let layout = StreamLayout::new(tmp.path());
        (tmp, layout)
    }

    /// Every path helper joins the expected fixed segments onto the base.
    #[test]
    fn path_helpers_correct() {
        let (tmp, layout) = temp_layout();

        assert_eq!(layout.base_dir(), tmp.path());

        assert_eq!(
            layout.stream_dir("order", "abc-123"),
            tmp.path().join("streams/order/abc-123")
        );

        assert_eq!(
            layout.views_dir("order", "abc-123"),
            tmp.path().join("streams/order/abc-123/views")
        );

        assert_eq!(layout.projections_dir(), tmp.path().join("projections"));

        assert_eq!(
            layout.process_managers_dir(),
            tmp.path().join("process_managers")
        );

        assert_eq!(layout.meta_dir(), tmp.path().join("meta"));
    }

    /// `ensure_stream` creates both the stream directory and the registry.
    #[test]
    fn ensure_stream_creates_dirs() {
        let (tmp, layout) = temp_layout();

        let dir = layout
            .ensure_stream("order", "abc-123")
            .expect("ensure_stream should succeed");

        assert!(dir.is_dir(), "stream directory should exist on disk");
        assert_eq!(dir, tmp.path().join("streams/order/abc-123"));

        let registry = tmp.path().join("meta/streams.jsonl");
        assert!(registry.is_file(), "registry file should exist");
    }

    /// Calling `ensure_stream` twice must not duplicate the registry entry.
    #[test]
    fn ensure_stream_idempotent() {
        let (tmp, layout) = temp_layout();

        layout
            .ensure_stream("order", "abc-123")
            .expect("first ensure_stream should succeed");
        layout
            .ensure_stream("order", "abc-123")
            .expect("second ensure_stream should succeed");

        let registry = tmp.path().join("meta/streams.jsonl");
        let contents = fs::read_to_string(&registry).expect("failed to read registry");

        let matching_entries: Vec<&str> = contents
            .lines()
            .filter(|line| {
                let v: serde_json::Value =
                    serde_json::from_str(line).expect("line should be valid JSON");
                v.get("type").and_then(|t| t.as_str()) == Some("order")
                    && v.get("id").and_then(|i| i.as_str()) == Some("abc-123")
            })
            .collect();

        assert_eq!(
            matching_entries.len(),
            1,
            "registry should contain exactly one entry for (order, abc-123)"
        );
    }

    /// An aggregate type with no directory yields an empty list, not an error.
    #[test]
    fn list_streams_empty_for_unknown_type() {
        let (_tmp, layout) = temp_layout();

        let streams = layout
            .list_streams("nonexistent")
            .expect("list_streams should succeed for unknown type");

        assert!(streams.is_empty());
    }

    /// Streams are listed in sorted order regardless of creation order.
    #[test]
    fn list_streams_after_create() {
        let (_tmp, layout) = temp_layout();

        layout
            .ensure_stream("order", "charlie")
            .expect("ensure_stream should succeed");
        layout
            .ensure_stream("order", "alpha")
            .expect("ensure_stream should succeed");
        layout
            .ensure_stream("order", "bravo")
            .expect("ensure_stream should succeed");

        let streams = layout
            .list_streams("order")
            .expect("list_streams should succeed");

        assert_eq!(streams, vec!["alpha", "bravo", "charlie"]);
    }

    /// Aggregate types are listed in sorted order.
    #[test]
    fn list_aggregate_types_returns_sorted_types() {
        let (_tmp, layout) = temp_layout();

        layout
            .ensure_stream("toggle", "t-1")
            .expect("ensure_stream should succeed");
        layout
            .ensure_stream("counter", "c-1")
            .expect("ensure_stream should succeed");

        let types = layout
            .list_aggregate_types()
            .expect("list_aggregate_types should succeed");

        assert_eq!(types, vec!["counter", "toggle"]);
    }

    /// With no `streams` directory at all the listing is empty, not an error.
    #[test]
    fn list_aggregate_types_empty_when_no_streams_dir() {
        let (_tmp, layout) = temp_layout();

        let types = layout
            .list_aggregate_types()
            .expect("list_aggregate_types should succeed");

        assert!(types.is_empty());
    }

    /// Every registry line is a JSON object with `type`, `id` and `ts`.
    #[test]
    fn registry_entries_valid_json() {
        let (tmp, layout) = temp_layout();

        layout
            .ensure_stream("order", "abc-123")
            .expect("ensure_stream should succeed");
        layout
            .ensure_stream("cart", "xyz-789")
            .expect("ensure_stream should succeed");

        let registry = tmp.path().join("meta/streams.jsonl");
        let contents = fs::read_to_string(&registry).expect("failed to read registry");

        // Guard against the loop below passing vacuously on an empty file.
        assert_eq!(
            contents.lines().count(),
            2,
            "registry should contain one line per registered stream"
        );

        for (i, line) in contents.lines().enumerate() {
            let entry: serde_json::Value = serde_json::from_str(line)
                .unwrap_or_else(|e| panic!("line {i} is not valid JSON: {e}"));

            assert!(
                entry.get("type").and_then(|v| v.as_str()).is_some(),
                "line {i} should have a string 'type' field"
            );
            assert!(
                entry.get("id").and_then(|v| v.as_str()).is_some(),
                "line {i} should have a string 'id' field"
            );
            assert!(
                entry.get("ts").and_then(|v| v.as_u64()).is_some(),
                "line {i} should have a numeric 'ts' field"
            );
        }
    }
}