Skip to main content

allsource_core/domain/entities/
event_stream.rs

1use crate::{
2    domain::{
3        entities::Event,
4        value_objects::{EntityId, PartitionKey},
5    },
6    error::{AllSourceError, Result},
7};
8use chrono::{DateTime, Utc};
9use serde::{Deserialize, Serialize};
10
11/// Event Stream aggregate enforcing gapless version numbers
12///
13/// Inspired by SierraDB's watermark pattern for consistent event sourcing.
14/// Ensures no gaps in version sequences, critical for proper event replay.
15///
16/// # SierraDB Pattern
17/// - Watermark tracks "highest continuously confirmed sequence"
18/// - Prevents gaps that would break event sourcing guarantees
19/// - Uses optimistic locking for concurrency control
20///
21/// # Invariants
22/// - Versions start at 1 and increment sequentially
23/// - No gaps allowed in version sequence
24/// - Watermark <= max version always
25/// - All versions below watermark are confirmed (gapless)
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct EventStream {
28    /// Stream identifier (usually entity ID)
29    stream_id: EntityId,
30
31    /// Partition key for distribution
32    partition_key: PartitionKey,
33
34    /// Current version (last event)
35    current_version: u64,
36
37    /// Watermark: highest continuously confirmed version
38    /// All versions <= watermark are guaranteed gapless
39    watermark: u64,
40
41    /// Events in this stream
42    events: Vec<Event>,
43
44    /// Expected version for optimistic locking
45    /// Used to detect concurrent modifications
46    expected_version: Option<u64>,
47
48    /// Stream metadata
49    created_at: DateTime<Utc>,
50    updated_at: DateTime<Utc>,
51}
52
53impl EventStream {
54    /// Create a new event stream
55    pub fn new(stream_id: EntityId) -> Self {
56        let partition_key = PartitionKey::from_entity_id(stream_id.as_str());
57        let now = Utc::now();
58
59        Self {
60            stream_id,
61            partition_key,
62            current_version: 0,
63            watermark: 0,
64            events: Vec::new(),
65            expected_version: None,
66            created_at: now,
67            updated_at: now,
68        }
69    }
70
71    /// Reconstruct an EventStream from persistent storage
72    ///
73    /// Used by repository implementations to restore streams from database.
74    /// Bypasses validation since data is already validated at creation time.
75    ///
76    /// # Arguments
77    /// - `stream_id`: Entity ID of the stream
78    /// - `partition_key`: Pre-computed partition assignment
79    /// - `current_version`: Latest version number
80    /// - `watermark`: Highest continuously confirmed version
81    /// - `events`: All events in the stream
82    /// - `expected_version`: Optional optimistic lock version
83    /// - `created_at`: Stream creation timestamp
84    /// - `updated_at`: Last modification timestamp
85    pub fn reconstruct(
86        stream_id: EntityId,
87        partition_key: PartitionKey,
88        current_version: u64,
89        watermark: u64,
90        events: Vec<Event>,
91        expected_version: Option<u64>,
92        created_at: DateTime<Utc>,
93        updated_at: DateTime<Utc>,
94    ) -> Result<Self> {
95        // Basic validation
96        if watermark > current_version {
97            return Err(AllSourceError::InvalidInput(format!(
98                "Watermark ({}) cannot exceed current version ({})",
99                watermark, current_version
100            )));
101        }
102
103        if events.len() as u64 != current_version {
104            return Err(AllSourceError::InvalidInput(format!(
105                "Event count ({}) must match current version ({})",
106                events.len(),
107                current_version
108            )));
109        }
110
111        Ok(Self {
112            stream_id,
113            partition_key,
114            current_version,
115            watermark,
116            events,
117            expected_version,
118            created_at,
119            updated_at,
120        })
121    }
122
123    /// Append an event with optimistic locking
124    ///
125    /// # SierraDB Pattern
126    /// - Checks expected_version matches current_version
127    /// - Prevents concurrent modification conflicts
128    /// - Ensures gapless version sequence
129    pub fn append_event(&mut self, event: Event) -> Result<u64> {
130        // Optimistic locking check
131        if let Some(expected) = self.expected_version
132            && expected != self.current_version
133        {
134            return Err(AllSourceError::ConcurrencyError(format!(
135                "Version conflict: expected {}, got {}",
136                expected, self.current_version
137            )));
138        }
139
140        // Increment version
141        self.current_version += 1;
142        let new_version = self.current_version;
143
144        // Store event
145        self.events.push(event);
146
147        // Advance watermark (all previous events confirmed)
148        self.watermark = new_version;
149
150        self.updated_at = Utc::now();
151
152        Ok(new_version)
153    }
154
155    /// Set expected version for next append (optimistic locking)
156    pub fn expect_version(&mut self, version: u64) {
157        self.expected_version = Some(version);
158    }
159
160    /// Clear expected version
161    pub fn clear_expected_version(&mut self) {
162        self.expected_version = None;
163    }
164
165    /// Get events from version (inclusive)
166    pub fn events_from(&self, from_version: u64) -> Vec<&Event> {
167        if from_version == 0 || from_version > self.current_version {
168            return Vec::new();
169        }
170
171        let start_idx = (from_version - 1) as usize;
172        self.events[start_idx..].iter().collect()
173    }
174
175    /// Check if stream has gapless versions up to watermark
176    pub fn is_gapless(&self) -> bool {
177        if self.watermark > self.current_version {
178            return false; // Watermark shouldn't exceed current version
179        }
180
181        // Check all versions up to watermark exist
182        for version in 1..=self.watermark {
183            let idx = (version - 1) as usize;
184            if idx >= self.events.len() {
185                return false;
186            }
187        }
188
189        true
190    }
191
192    // Getters
193    pub fn stream_id(&self) -> &EntityId {
194        &self.stream_id
195    }
196
197    pub fn partition_key(&self) -> &PartitionKey {
198        &self.partition_key
199    }
200
201    pub fn current_version(&self) -> u64 {
202        self.current_version
203    }
204
205    pub fn watermark(&self) -> u64 {
206        self.watermark
207    }
208
209    pub fn event_count(&self) -> usize {
210        self.events.len()
211    }
212
213    pub fn created_at(&self) -> DateTime<Utc> {
214        self.created_at
215    }
216
217    pub fn updated_at(&self) -> DateTime<Utc> {
218        self.updated_at
219    }
220
221    pub fn expected_version(&self) -> Option<u64> {
222        self.expected_version
223    }
224
225    // Tenant isolation methods
226
227    /// Get the tenant ID for this stream
228    ///
229    /// Returns the tenant_id from the first event, or None if the stream is empty.
230    /// All events in a stream should belong to the same tenant.
231    pub fn tenant_id(&self) -> Option<&crate::domain::value_objects::TenantId> {
232        self.events.first().map(|e| e.tenant_id())
233    }
234
235    /// Validate that all events in the stream belong to the same tenant
236    ///
237    /// Returns true if the stream is empty or all events have the same tenant_id.
238    /// This is a safety check to detect tenant isolation violations.
239    pub fn has_consistent_tenant(&self) -> bool {
240        if self.events.is_empty() {
241            return true;
242        }
243
244        let first_tenant = self.events[0].tenant_id();
245        self.events.iter().all(|e| e.tenant_id() == first_tenant)
246    }
247
248    /// Validate that an event belongs to this stream's tenant
249    ///
250    /// Returns an error if:
251    /// - The stream has events and the new event's tenant doesn't match
252    ///
253    /// This ensures tenant isolation at the stream level.
254    pub fn validate_event_tenant(&self, event: &Event) -> Result<()> {
255        if let Some(stream_tenant) = self.tenant_id()
256            && event.tenant_id() != stream_tenant
257        {
258            return Err(AllSourceError::ValidationError(format!(
259                "Tenant mismatch: stream belongs to '{}', but event belongs to '{}'",
260                stream_tenant.as_str(),
261                event.tenant_id().as_str()
262            )));
263        }
264        Ok(())
265    }
266
267    /// Append an event with tenant validation
268    ///
269    /// Like `append_event`, but also validates tenant consistency.
270    /// Prevents cross-tenant event appends for security.
271    pub fn append_event_with_tenant_check(&mut self, event: Event) -> Result<u64> {
272        // Validate tenant consistency
273        self.validate_event_tenant(&event)?;
274
275        // Proceed with normal append
276        self.append_event(event)
277    }
278}
279
280#[cfg(test)]
281mod tests {
282    use super::*;
283    use serde_json::json;
284
285    fn create_test_event(entity_id: &str) -> Event {
286        Event::from_strings(
287            "test.event".to_string(),
288            entity_id.to_string(),
289            "default".to_string(),
290            json!({"data": "test"}),
291            None,
292        )
293        .unwrap()
294    }
295
296    #[test]
297    fn test_new_stream() {
298        let stream_id = EntityId::new("stream-1".to_string()).unwrap();
299        let stream = EventStream::new(stream_id.clone());
300
301        assert_eq!(stream.current_version(), 0);
302        assert_eq!(stream.watermark(), 0);
303        assert_eq!(stream.event_count(), 0);
304        assert!(stream.is_gapless());
305    }
306
307    #[test]
308    fn test_append_event() {
309        let stream_id = EntityId::new("stream-1".to_string()).unwrap();
310        let mut stream = EventStream::new(stream_id.clone());
311
312        let event = create_test_event("stream-1");
313        let version = stream.append_event(event).unwrap();
314
315        assert_eq!(version, 1);
316        assert_eq!(stream.current_version(), 1);
317        assert_eq!(stream.watermark(), 1);
318        assert_eq!(stream.event_count(), 1);
319        assert!(stream.is_gapless());
320    }
321
322    #[test]
323    fn test_multiple_appends() {
324        let stream_id = EntityId::new("stream-1".to_string()).unwrap();
325        let mut stream = EventStream::new(stream_id.clone());
326
327        for i in 1..=10 {
328            let event = create_test_event("stream-1");
329            let version = stream.append_event(event).unwrap();
330            assert_eq!(version, i);
331        }
332
333        assert_eq!(stream.current_version(), 10);
334        assert_eq!(stream.watermark(), 10);
335        assert_eq!(stream.event_count(), 10);
336        assert!(stream.is_gapless());
337    }
338
339    #[test]
340    fn test_optimistic_locking_success() {
341        let stream_id = EntityId::new("stream-1".to_string()).unwrap();
342        let mut stream = EventStream::new(stream_id);
343
344        // Set expected version
345        stream.expect_version(0);
346
347        let event = create_test_event("stream-1");
348        let result = stream.append_event(event);
349
350        assert!(result.is_ok());
351        assert_eq!(result.unwrap(), 1);
352    }
353
354    #[test]
355    fn test_optimistic_locking_failure() {
356        let stream_id = EntityId::new("stream-1".to_string()).unwrap();
357        let mut stream = EventStream::new(stream_id);
358
359        // Append first event
360        let event1 = create_test_event("stream-1");
361        stream.append_event(event1).unwrap();
362
363        // Set wrong expected version
364        stream.expect_version(0);
365
366        let event2 = create_test_event("stream-1");
367        let result = stream.append_event(event2);
368
369        assert!(result.is_err());
370        assert!(matches!(result, Err(AllSourceError::ConcurrencyError(_))));
371    }
372
373    #[test]
374    fn test_events_from() {
375        let stream_id = EntityId::new("stream-1".to_string()).unwrap();
376        let mut stream = EventStream::new(stream_id);
377
378        // Append 5 events
379        for _ in 0..5 {
380            let event = create_test_event("stream-1");
381            stream.append_event(event).unwrap();
382        }
383
384        let events = stream.events_from(3);
385        assert_eq!(events.len(), 3); // Events 3, 4, 5
386    }
387
388    #[test]
389    fn test_partition_assignment() {
390        let stream_id = EntityId::new("stream-1".to_string()).unwrap();
391        let stream = EventStream::new(stream_id);
392
393        let partition_key = stream.partition_key();
394        assert!(partition_key.partition_id() < PartitionKey::DEFAULT_PARTITION_COUNT);
395    }
396
397    #[test]
398    fn test_clear_expected_version() {
399        let stream_id = EntityId::new("stream-1".to_string()).unwrap();
400        let mut stream = EventStream::new(stream_id);
401
402        stream.expect_version(0);
403        stream.clear_expected_version();
404
405        // Should succeed without version check
406        let event = create_test_event("stream-1");
407        let result = stream.append_event(event);
408        assert!(result.is_ok());
409    }
410
411    #[test]
412    fn test_events_from_edge_cases() {
413        let stream_id = EntityId::new("stream-1".to_string()).unwrap();
414        let mut stream = EventStream::new(stream_id);
415
416        // Append 3 events
417        for _ in 0..3 {
418            let event = create_test_event("stream-1");
419            stream.append_event(event).unwrap();
420        }
421
422        // Test edge cases
423        assert_eq!(stream.events_from(0).len(), 0); // Invalid version 0
424        assert_eq!(stream.events_from(1).len(), 3); // From beginning
425        assert_eq!(stream.events_from(3).len(), 1); // Last event
426        assert_eq!(stream.events_from(4).len(), 0); // Beyond current
427    }
428}