Skip to main content

allsource_core/domain/entities/
event_stream.rs

1use crate::{
2    domain::{
3        entities::Event,
4        value_objects::{EntityId, PartitionKey},
5    },
6    error::{AllSourceError, Result},
7};
8use chrono::{DateTime, Utc};
9use serde::{Deserialize, Serialize};
10
11/// Event Stream aggregate enforcing gapless version numbers
12///
13/// Inspired by SierraDB's watermark pattern for consistent event sourcing.
14/// Ensures no gaps in version sequences, critical for proper event replay.
15///
16/// # SierraDB Pattern
17/// - Watermark tracks "highest continuously confirmed sequence"
18/// - Prevents gaps that would break event sourcing guarantees
19/// - Uses optimistic locking for concurrency control
20///
21/// # Invariants
22/// - Versions start at 1 and increment sequentially
23/// - No gaps allowed in version sequence
24/// - Watermark <= max version always
25/// - All versions below watermark are confirmed (gapless)
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct EventStream {
28    /// Stream identifier (usually entity ID)
29    stream_id: EntityId,
30
31    /// Partition key for distribution
32    partition_key: PartitionKey,
33
34    /// Current version (last event)
35    current_version: u64,
36
37    /// Watermark: highest continuously confirmed version
38    /// All versions <= watermark are guaranteed gapless
39    watermark: u64,
40
41    /// Events in this stream
42    events: Vec<Event>,
43
44    /// Expected version for optimistic locking
45    /// Used to detect concurrent modifications
46    expected_version: Option<u64>,
47
48    /// Stream metadata
49    created_at: DateTime<Utc>,
50    updated_at: DateTime<Utc>,
51}
52
53impl EventStream {
54    /// Create a new event stream
55    pub fn new(stream_id: EntityId) -> Self {
56        let partition_key = PartitionKey::from_entity_id(stream_id.as_str());
57        let now = Utc::now();
58
59        Self {
60            stream_id,
61            partition_key,
62            current_version: 0,
63            watermark: 0,
64            events: Vec::new(),
65            expected_version: None,
66            created_at: now,
67            updated_at: now,
68        }
69    }
70
71    /// Reconstruct an EventStream from persistent storage
72    ///
73    /// Used by repository implementations to restore streams from database.
74    /// Bypasses validation since data is already validated at creation time.
75    ///
76    /// # Arguments
77    /// - `stream_id`: Entity ID of the stream
78    /// - `partition_key`: Pre-computed partition assignment
79    /// - `current_version`: Latest version number
80    /// - `watermark`: Highest continuously confirmed version
81    /// - `events`: All events in the stream
82    /// - `expected_version`: Optional optimistic lock version
83    /// - `created_at`: Stream creation timestamp
84    /// - `updated_at`: Last modification timestamp
85    pub fn reconstruct(
86        stream_id: EntityId,
87        partition_key: PartitionKey,
88        current_version: u64,
89        watermark: u64,
90        events: Vec<Event>,
91        expected_version: Option<u64>,
92        created_at: DateTime<Utc>,
93        updated_at: DateTime<Utc>,
94    ) -> Result<Self> {
95        // Basic validation
96        if watermark > current_version {
97            return Err(AllSourceError::InvalidInput(format!(
98                "Watermark ({watermark}) cannot exceed current version ({current_version})"
99            )));
100        }
101
102        if events.len() as u64 != current_version {
103            return Err(AllSourceError::InvalidInput(format!(
104                "Event count ({}) must match current version ({})",
105                events.len(),
106                current_version
107            )));
108        }
109
110        Ok(Self {
111            stream_id,
112            partition_key,
113            current_version,
114            watermark,
115            events,
116            expected_version,
117            created_at,
118            updated_at,
119        })
120    }
121
122    /// Append an event with optimistic locking
123    ///
124    /// # SierraDB Pattern
125    /// - Checks expected_version matches current_version
126    /// - Prevents concurrent modification conflicts
127    /// - Ensures gapless version sequence
128    pub fn append_event(&mut self, event: Event) -> Result<u64> {
129        // Optimistic locking check
130        if let Some(expected) = self.expected_version
131            && expected != self.current_version
132        {
133            return Err(AllSourceError::ConcurrencyError(format!(
134                "Version conflict: expected {}, got {}",
135                expected, self.current_version
136            )));
137        }
138
139        // Increment version
140        self.current_version += 1;
141        let new_version = self.current_version;
142
143        // Store event
144        self.events.push(event);
145
146        // Advance watermark (all previous events confirmed)
147        self.watermark = new_version;
148
149        self.updated_at = Utc::now();
150
151        Ok(new_version)
152    }
153
154    /// Set expected version for next append (optimistic locking)
155    pub fn expect_version(&mut self, version: u64) {
156        self.expected_version = Some(version);
157    }
158
159    /// Clear expected version
160    pub fn clear_expected_version(&mut self) {
161        self.expected_version = None;
162    }
163
164    /// Get events from version (inclusive)
165    pub fn events_from(&self, from_version: u64) -> Vec<&Event> {
166        if from_version == 0 || from_version > self.current_version {
167            return Vec::new();
168        }
169
170        let start_idx = (from_version - 1) as usize;
171        self.events[start_idx..].iter().collect()
172    }
173
174    /// Check if stream has gapless versions up to watermark
175    pub fn is_gapless(&self) -> bool {
176        if self.watermark > self.current_version {
177            return false; // Watermark shouldn't exceed current version
178        }
179
180        // Check all versions up to watermark exist
181        for version in 1..=self.watermark {
182            let idx = (version - 1) as usize;
183            if idx >= self.events.len() {
184                return false;
185            }
186        }
187
188        true
189    }
190
191    // Getters
192    pub fn stream_id(&self) -> &EntityId {
193        &self.stream_id
194    }
195
196    pub fn partition_key(&self) -> &PartitionKey {
197        &self.partition_key
198    }
199
200    pub fn current_version(&self) -> u64 {
201        self.current_version
202    }
203
204    pub fn watermark(&self) -> u64 {
205        self.watermark
206    }
207
208    pub fn event_count(&self) -> usize {
209        self.events.len()
210    }
211
212    pub fn created_at(&self) -> DateTime<Utc> {
213        self.created_at
214    }
215
216    pub fn updated_at(&self) -> DateTime<Utc> {
217        self.updated_at
218    }
219
220    pub fn expected_version(&self) -> Option<u64> {
221        self.expected_version
222    }
223
224    // Tenant isolation methods
225
226    /// Get the tenant ID for this stream
227    ///
228    /// Returns the tenant_id from the first event, or None if the stream is empty.
229    /// All events in a stream should belong to the same tenant.
230    pub fn tenant_id(&self) -> Option<&crate::domain::value_objects::TenantId> {
231        self.events.first().map(super::event::Event::tenant_id)
232    }
233
234    /// Validate that all events in the stream belong to the same tenant
235    ///
236    /// Returns true if the stream is empty or all events have the same tenant_id.
237    /// This is a safety check to detect tenant isolation violations.
238    pub fn has_consistent_tenant(&self) -> bool {
239        if self.events.is_empty() {
240            return true;
241        }
242
243        let first_tenant = self.events[0].tenant_id();
244        self.events.iter().all(|e| e.tenant_id() == first_tenant)
245    }
246
247    /// Validate that an event belongs to this stream's tenant
248    ///
249    /// Returns an error if:
250    /// - The stream has events and the new event's tenant doesn't match
251    ///
252    /// This ensures tenant isolation at the stream level.
253    pub fn validate_event_tenant(&self, event: &Event) -> Result<()> {
254        if let Some(stream_tenant) = self.tenant_id()
255            && event.tenant_id() != stream_tenant
256        {
257            return Err(AllSourceError::ValidationError(format!(
258                "Tenant mismatch: stream belongs to '{}', but event belongs to '{}'",
259                stream_tenant.as_str(),
260                event.tenant_id().as_str()
261            )));
262        }
263        Ok(())
264    }
265
266    /// Append an event with tenant validation
267    ///
268    /// Like `append_event`, but also validates tenant consistency.
269    /// Prevents cross-tenant event appends for security.
270    pub fn append_event_with_tenant_check(&mut self, event: Event) -> Result<u64> {
271        // Validate tenant consistency
272        self.validate_event_tenant(&event)?;
273
274        // Proceed with normal append
275        self.append_event(event)
276    }
277}
278
279#[cfg(test)]
280mod tests {
281    use super::*;
282    use serde_json::json;
283
284    fn create_test_event(entity_id: &str) -> Event {
285        Event::from_strings(
286            "test.event".to_string(),
287            entity_id.to_string(),
288            "default".to_string(),
289            json!({"data": "test"}),
290            None,
291        )
292        .unwrap()
293    }
294
295    #[test]
296    fn test_new_stream() {
297        let stream_id = EntityId::new("stream-1".to_string()).unwrap();
298        let stream = EventStream::new(stream_id.clone());
299
300        assert_eq!(stream.current_version(), 0);
301        assert_eq!(stream.watermark(), 0);
302        assert_eq!(stream.event_count(), 0);
303        assert!(stream.is_gapless());
304    }
305
306    #[test]
307    fn test_append_event() {
308        let stream_id = EntityId::new("stream-1".to_string()).unwrap();
309        let mut stream = EventStream::new(stream_id.clone());
310
311        let event = create_test_event("stream-1");
312        let version = stream.append_event(event).unwrap();
313
314        assert_eq!(version, 1);
315        assert_eq!(stream.current_version(), 1);
316        assert_eq!(stream.watermark(), 1);
317        assert_eq!(stream.event_count(), 1);
318        assert!(stream.is_gapless());
319    }
320
321    #[test]
322    fn test_multiple_appends() {
323        let stream_id = EntityId::new("stream-1".to_string()).unwrap();
324        let mut stream = EventStream::new(stream_id.clone());
325
326        for i in 1..=10 {
327            let event = create_test_event("stream-1");
328            let version = stream.append_event(event).unwrap();
329            assert_eq!(version, i);
330        }
331
332        assert_eq!(stream.current_version(), 10);
333        assert_eq!(stream.watermark(), 10);
334        assert_eq!(stream.event_count(), 10);
335        assert!(stream.is_gapless());
336    }
337
338    #[test]
339    fn test_optimistic_locking_success() {
340        let stream_id = EntityId::new("stream-1".to_string()).unwrap();
341        let mut stream = EventStream::new(stream_id);
342
343        // Set expected version
344        stream.expect_version(0);
345
346        let event = create_test_event("stream-1");
347        let result = stream.append_event(event);
348
349        assert!(result.is_ok());
350        assert_eq!(result.unwrap(), 1);
351    }
352
353    #[test]
354    fn test_optimistic_locking_failure() {
355        let stream_id = EntityId::new("stream-1".to_string()).unwrap();
356        let mut stream = EventStream::new(stream_id);
357
358        // Append first event
359        let event1 = create_test_event("stream-1");
360        stream.append_event(event1).unwrap();
361
362        // Set wrong expected version
363        stream.expect_version(0);
364
365        let event2 = create_test_event("stream-1");
366        let result = stream.append_event(event2);
367
368        assert!(result.is_err());
369        assert!(matches!(result, Err(AllSourceError::ConcurrencyError(_))));
370    }
371
372    #[test]
373    fn test_events_from() {
374        let stream_id = EntityId::new("stream-1".to_string()).unwrap();
375        let mut stream = EventStream::new(stream_id);
376
377        // Append 5 events
378        for _ in 0..5 {
379            let event = create_test_event("stream-1");
380            stream.append_event(event).unwrap();
381        }
382
383        let events = stream.events_from(3);
384        assert_eq!(events.len(), 3); // Events 3, 4, 5
385    }
386
387    #[test]
388    fn test_partition_assignment() {
389        let stream_id = EntityId::new("stream-1".to_string()).unwrap();
390        let stream = EventStream::new(stream_id);
391
392        let partition_key = stream.partition_key();
393        assert!(partition_key.partition_id() < PartitionKey::DEFAULT_PARTITION_COUNT);
394    }
395
396    #[test]
397    fn test_clear_expected_version() {
398        let stream_id = EntityId::new("stream-1".to_string()).unwrap();
399        let mut stream = EventStream::new(stream_id);
400
401        stream.expect_version(0);
402        stream.clear_expected_version();
403
404        // Should succeed without version check
405        let event = create_test_event("stream-1");
406        let result = stream.append_event(event);
407        assert!(result.is_ok());
408    }
409
410    #[test]
411    fn test_events_from_edge_cases() {
412        let stream_id = EntityId::new("stream-1".to_string()).unwrap();
413        let mut stream = EventStream::new(stream_id);
414
415        // Append 3 events
416        for _ in 0..3 {
417            let event = create_test_event("stream-1");
418            stream.append_event(event).unwrap();
419        }
420
421        // Test edge cases
422        assert_eq!(stream.events_from(0).len(), 0); // Invalid version 0
423        assert_eq!(stream.events_from(1).len(), 3); // From beginning
424        assert_eq!(stream.events_from(3).len(), 1); // Last event
425        assert_eq!(stream.events_from(4).len(), 0); // Beyond current
426    }
427}