Skip to main content

allsource_core/domain/entities/
event_stream.rs

1use crate::domain::entities::Event;
2use crate::domain::value_objects::{EntityId, PartitionKey};
3use crate::error::{AllSourceError, Result};
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6
7/// Event Stream aggregate enforcing gapless version numbers
8///
9/// Inspired by SierraDB's watermark pattern for consistent event sourcing.
10/// Ensures no gaps in version sequences, critical for proper event replay.
11///
12/// # SierraDB Pattern
13/// - Watermark tracks "highest continuously confirmed sequence"
14/// - Prevents gaps that would break event sourcing guarantees
15/// - Uses optimistic locking for concurrency control
16///
17/// # Invariants
18/// - Versions start at 1 and increment sequentially
19/// - No gaps allowed in version sequence
20/// - Watermark <= max version always
21/// - All versions below watermark are confirmed (gapless)
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct EventStream {
24    /// Stream identifier (usually entity ID)
25    stream_id: EntityId,
26
27    /// Partition key for distribution
28    partition_key: PartitionKey,
29
30    /// Current version (last event)
31    current_version: u64,
32
33    /// Watermark: highest continuously confirmed version
34    /// All versions <= watermark are guaranteed gapless
35    watermark: u64,
36
37    /// Events in this stream
38    events: Vec<Event>,
39
40    /// Expected version for optimistic locking
41    /// Used to detect concurrent modifications
42    expected_version: Option<u64>,
43
44    /// Stream metadata
45    created_at: DateTime<Utc>,
46    updated_at: DateTime<Utc>,
47}
48
49impl EventStream {
50    /// Create a new event stream
51    pub fn new(stream_id: EntityId) -> Self {
52        let partition_key = PartitionKey::from_entity_id(stream_id.as_str());
53        let now = Utc::now();
54
55        Self {
56            stream_id,
57            partition_key,
58            current_version: 0,
59            watermark: 0,
60            events: Vec::new(),
61            expected_version: None,
62            created_at: now,
63            updated_at: now,
64        }
65    }
66
67    /// Reconstruct an EventStream from persistent storage
68    ///
69    /// Used by repository implementations to restore streams from database.
70    /// Bypasses validation since data is already validated at creation time.
71    ///
72    /// # Arguments
73    /// - `stream_id`: Entity ID of the stream
74    /// - `partition_key`: Pre-computed partition assignment
75    /// - `current_version`: Latest version number
76    /// - `watermark`: Highest continuously confirmed version
77    /// - `events`: All events in the stream
78    /// - `expected_version`: Optional optimistic lock version
79    /// - `created_at`: Stream creation timestamp
80    /// - `updated_at`: Last modification timestamp
81    pub fn reconstruct(
82        stream_id: EntityId,
83        partition_key: PartitionKey,
84        current_version: u64,
85        watermark: u64,
86        events: Vec<Event>,
87        expected_version: Option<u64>,
88        created_at: DateTime<Utc>,
89        updated_at: DateTime<Utc>,
90    ) -> Result<Self> {
91        // Basic validation
92        if watermark > current_version {
93            return Err(AllSourceError::InvalidInput(format!(
94                "Watermark ({}) cannot exceed current version ({})",
95                watermark, current_version
96            )));
97        }
98
99        if events.len() as u64 != current_version {
100            return Err(AllSourceError::InvalidInput(format!(
101                "Event count ({}) must match current version ({})",
102                events.len(),
103                current_version
104            )));
105        }
106
107        Ok(Self {
108            stream_id,
109            partition_key,
110            current_version,
111            watermark,
112            events,
113            expected_version,
114            created_at,
115            updated_at,
116        })
117    }
118
119    /// Append an event with optimistic locking
120    ///
121    /// # SierraDB Pattern
122    /// - Checks expected_version matches current_version
123    /// - Prevents concurrent modification conflicts
124    /// - Ensures gapless version sequence
125    pub fn append_event(&mut self, event: Event) -> Result<u64> {
126        // Optimistic locking check
127        if let Some(expected) = self.expected_version {
128            if expected != self.current_version {
129                return Err(AllSourceError::ConcurrencyError(format!(
130                    "Version conflict: expected {}, got {}",
131                    expected, self.current_version
132                )));
133            }
134        }
135
136        // Increment version
137        self.current_version += 1;
138        let new_version = self.current_version;
139
140        // Store event
141        self.events.push(event);
142
143        // Advance watermark (all previous events confirmed)
144        self.watermark = new_version;
145
146        self.updated_at = Utc::now();
147
148        Ok(new_version)
149    }
150
151    /// Set expected version for next append (optimistic locking)
152    pub fn expect_version(&mut self, version: u64) {
153        self.expected_version = Some(version);
154    }
155
156    /// Clear expected version
157    pub fn clear_expected_version(&mut self) {
158        self.expected_version = None;
159    }
160
161    /// Get events from version (inclusive)
162    pub fn events_from(&self, from_version: u64) -> Vec<&Event> {
163        if from_version == 0 || from_version > self.current_version {
164            return Vec::new();
165        }
166
167        let start_idx = (from_version - 1) as usize;
168        self.events[start_idx..].iter().collect()
169    }
170
171    /// Check if stream has gapless versions up to watermark
172    pub fn is_gapless(&self) -> bool {
173        if self.watermark > self.current_version {
174            return false; // Watermark shouldn't exceed current version
175        }
176
177        // Check all versions up to watermark exist
178        for version in 1..=self.watermark {
179            let idx = (version - 1) as usize;
180            if idx >= self.events.len() {
181                return false;
182            }
183        }
184
185        true
186    }
187
188    // Getters
189    pub fn stream_id(&self) -> &EntityId {
190        &self.stream_id
191    }
192
193    pub fn partition_key(&self) -> &PartitionKey {
194        &self.partition_key
195    }
196
197    pub fn current_version(&self) -> u64 {
198        self.current_version
199    }
200
201    pub fn watermark(&self) -> u64 {
202        self.watermark
203    }
204
205    pub fn event_count(&self) -> usize {
206        self.events.len()
207    }
208
209    pub fn created_at(&self) -> DateTime<Utc> {
210        self.created_at
211    }
212
213    pub fn updated_at(&self) -> DateTime<Utc> {
214        self.updated_at
215    }
216
217    pub fn expected_version(&self) -> Option<u64> {
218        self.expected_version
219    }
220
221    // Tenant isolation methods
222
223    /// Get the tenant ID for this stream
224    ///
225    /// Returns the tenant_id from the first event, or None if the stream is empty.
226    /// All events in a stream should belong to the same tenant.
227    pub fn tenant_id(&self) -> Option<&crate::domain::value_objects::TenantId> {
228        self.events.first().map(|e| e.tenant_id())
229    }
230
231    /// Validate that all events in the stream belong to the same tenant
232    ///
233    /// Returns true if the stream is empty or all events have the same tenant_id.
234    /// This is a safety check to detect tenant isolation violations.
235    pub fn has_consistent_tenant(&self) -> bool {
236        if self.events.is_empty() {
237            return true;
238        }
239
240        let first_tenant = self.events[0].tenant_id();
241        self.events.iter().all(|e| e.tenant_id() == first_tenant)
242    }
243
244    /// Validate that an event belongs to this stream's tenant
245    ///
246    /// Returns an error if:
247    /// - The stream has events and the new event's tenant doesn't match
248    ///
249    /// This ensures tenant isolation at the stream level.
250    pub fn validate_event_tenant(&self, event: &Event) -> Result<()> {
251        if let Some(stream_tenant) = self.tenant_id() {
252            if event.tenant_id() != stream_tenant {
253                return Err(AllSourceError::ValidationError(format!(
254                    "Tenant mismatch: stream belongs to '{}', but event belongs to '{}'",
255                    stream_tenant.as_str(),
256                    event.tenant_id().as_str()
257                )));
258            }
259        }
260        Ok(())
261    }
262
263    /// Append an event with tenant validation
264    ///
265    /// Like `append_event`, but also validates tenant consistency.
266    /// Prevents cross-tenant event appends for security.
267    pub fn append_event_with_tenant_check(&mut self, event: Event) -> Result<u64> {
268        // Validate tenant consistency
269        self.validate_event_tenant(&event)?;
270
271        // Proceed with normal append
272        self.append_event(event)
273    }
274}
275
276#[cfg(test)]
277mod tests {
278    use super::*;
279    use serde_json::json;
280
281    fn create_test_event(entity_id: &str) -> Event {
282        Event::from_strings(
283            "test.event".to_string(),
284            entity_id.to_string(),
285            "default".to_string(),
286            json!({"data": "test"}),
287            None,
288        )
289        .unwrap()
290    }
291
292    #[test]
293    fn test_new_stream() {
294        let stream_id = EntityId::new("stream-1".to_string()).unwrap();
295        let stream = EventStream::new(stream_id.clone());
296
297        assert_eq!(stream.current_version(), 0);
298        assert_eq!(stream.watermark(), 0);
299        assert_eq!(stream.event_count(), 0);
300        assert!(stream.is_gapless());
301    }
302
303    #[test]
304    fn test_append_event() {
305        let stream_id = EntityId::new("stream-1".to_string()).unwrap();
306        let mut stream = EventStream::new(stream_id.clone());
307
308        let event = create_test_event("stream-1");
309        let version = stream.append_event(event).unwrap();
310
311        assert_eq!(version, 1);
312        assert_eq!(stream.current_version(), 1);
313        assert_eq!(stream.watermark(), 1);
314        assert_eq!(stream.event_count(), 1);
315        assert!(stream.is_gapless());
316    }
317
318    #[test]
319    fn test_multiple_appends() {
320        let stream_id = EntityId::new("stream-1".to_string()).unwrap();
321        let mut stream = EventStream::new(stream_id.clone());
322
323        for i in 1..=10 {
324            let event = create_test_event("stream-1");
325            let version = stream.append_event(event).unwrap();
326            assert_eq!(version, i);
327        }
328
329        assert_eq!(stream.current_version(), 10);
330        assert_eq!(stream.watermark(), 10);
331        assert_eq!(stream.event_count(), 10);
332        assert!(stream.is_gapless());
333    }
334
335    #[test]
336    fn test_optimistic_locking_success() {
337        let stream_id = EntityId::new("stream-1".to_string()).unwrap();
338        let mut stream = EventStream::new(stream_id);
339
340        // Set expected version
341        stream.expect_version(0);
342
343        let event = create_test_event("stream-1");
344        let result = stream.append_event(event);
345
346        assert!(result.is_ok());
347        assert_eq!(result.unwrap(), 1);
348    }
349
350    #[test]
351    fn test_optimistic_locking_failure() {
352        let stream_id = EntityId::new("stream-1".to_string()).unwrap();
353        let mut stream = EventStream::new(stream_id);
354
355        // Append first event
356        let event1 = create_test_event("stream-1");
357        stream.append_event(event1).unwrap();
358
359        // Set wrong expected version
360        stream.expect_version(0);
361
362        let event2 = create_test_event("stream-1");
363        let result = stream.append_event(event2);
364
365        assert!(result.is_err());
366        assert!(matches!(result, Err(AllSourceError::ConcurrencyError(_))));
367    }
368
369    #[test]
370    fn test_events_from() {
371        let stream_id = EntityId::new("stream-1".to_string()).unwrap();
372        let mut stream = EventStream::new(stream_id);
373
374        // Append 5 events
375        for _ in 0..5 {
376            let event = create_test_event("stream-1");
377            stream.append_event(event).unwrap();
378        }
379
380        let events = stream.events_from(3);
381        assert_eq!(events.len(), 3); // Events 3, 4, 5
382    }
383
384    #[test]
385    fn test_partition_assignment() {
386        let stream_id = EntityId::new("stream-1".to_string()).unwrap();
387        let stream = EventStream::new(stream_id);
388
389        let partition_key = stream.partition_key();
390        assert!(partition_key.partition_id() < PartitionKey::DEFAULT_PARTITION_COUNT);
391    }
392
393    #[test]
394    fn test_clear_expected_version() {
395        let stream_id = EntityId::new("stream-1".to_string()).unwrap();
396        let mut stream = EventStream::new(stream_id);
397
398        stream.expect_version(0);
399        stream.clear_expected_version();
400
401        // Should succeed without version check
402        let event = create_test_event("stream-1");
403        let result = stream.append_event(event);
404        assert!(result.is_ok());
405    }
406
407    #[test]
408    fn test_events_from_edge_cases() {
409        let stream_id = EntityId::new("stream-1".to_string()).unwrap();
410        let mut stream = EventStream::new(stream_id);
411
412        // Append 3 events
413        for _ in 0..3 {
414            let event = create_test_event("stream-1");
415            stream.append_event(event).unwrap();
416        }
417
418        // Test edge cases
419        assert_eq!(stream.events_from(0).len(), 0); // Invalid version 0
420        assert_eq!(stream.events_from(1).len(), 3); // From beginning
421        assert_eq!(stream.events_from(3).len(), 1); // Last event
422        assert_eq!(stream.events_from(4).len(), 0); // Beyond current
423    }
424}