allsource_core/domain/entities/
event_stream.rs1use crate::{
2 domain::{
3 entities::Event,
4 value_objects::{EntityId, PartitionKey},
5 },
6 error::{AllSourceError, Result},
7};
8use chrono::{DateTime, Utc};
9use serde::{Deserialize, Serialize};
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct EventStream {
28 stream_id: EntityId,
30
31 partition_key: PartitionKey,
33
34 current_version: u64,
36
37 watermark: u64,
40
41 events: Vec<Event>,
43
44 expected_version: Option<u64>,
47
48 created_at: DateTime<Utc>,
50 updated_at: DateTime<Utc>,
51}
52
53impl EventStream {
54 pub fn new(stream_id: EntityId) -> Self {
56 let partition_key = PartitionKey::from_entity_id(stream_id.as_str());
57 let now = Utc::now();
58
59 Self {
60 stream_id,
61 partition_key,
62 current_version: 0,
63 watermark: 0,
64 events: Vec::new(),
65 expected_version: None,
66 created_at: now,
67 updated_at: now,
68 }
69 }
70
71 pub fn reconstruct(
86 stream_id: EntityId,
87 partition_key: PartitionKey,
88 current_version: u64,
89 watermark: u64,
90 events: Vec<Event>,
91 expected_version: Option<u64>,
92 created_at: DateTime<Utc>,
93 updated_at: DateTime<Utc>,
94 ) -> Result<Self> {
95 if watermark > current_version {
97 return Err(AllSourceError::InvalidInput(format!(
98 "Watermark ({}) cannot exceed current version ({})",
99 watermark, current_version
100 )));
101 }
102
103 if events.len() as u64 != current_version {
104 return Err(AllSourceError::InvalidInput(format!(
105 "Event count ({}) must match current version ({})",
106 events.len(),
107 current_version
108 )));
109 }
110
111 Ok(Self {
112 stream_id,
113 partition_key,
114 current_version,
115 watermark,
116 events,
117 expected_version,
118 created_at,
119 updated_at,
120 })
121 }
122
123 pub fn append_event(&mut self, event: Event) -> Result<u64> {
130 if let Some(expected) = self.expected_version
132 && expected != self.current_version
133 {
134 return Err(AllSourceError::ConcurrencyError(format!(
135 "Version conflict: expected {}, got {}",
136 expected, self.current_version
137 )));
138 }
139
140 self.current_version += 1;
142 let new_version = self.current_version;
143
144 self.events.push(event);
146
147 self.watermark = new_version;
149
150 self.updated_at = Utc::now();
151
152 Ok(new_version)
153 }
154
155 pub fn expect_version(&mut self, version: u64) {
157 self.expected_version = Some(version);
158 }
159
160 pub fn clear_expected_version(&mut self) {
162 self.expected_version = None;
163 }
164
165 pub fn events_from(&self, from_version: u64) -> Vec<&Event> {
167 if from_version == 0 || from_version > self.current_version {
168 return Vec::new();
169 }
170
171 let start_idx = (from_version - 1) as usize;
172 self.events[start_idx..].iter().collect()
173 }
174
175 pub fn is_gapless(&self) -> bool {
177 if self.watermark > self.current_version {
178 return false; }
180
181 for version in 1..=self.watermark {
183 let idx = (version - 1) as usize;
184 if idx >= self.events.len() {
185 return false;
186 }
187 }
188
189 true
190 }
191
192 pub fn stream_id(&self) -> &EntityId {
194 &self.stream_id
195 }
196
197 pub fn partition_key(&self) -> &PartitionKey {
198 &self.partition_key
199 }
200
201 pub fn current_version(&self) -> u64 {
202 self.current_version
203 }
204
205 pub fn watermark(&self) -> u64 {
206 self.watermark
207 }
208
209 pub fn event_count(&self) -> usize {
210 self.events.len()
211 }
212
213 pub fn created_at(&self) -> DateTime<Utc> {
214 self.created_at
215 }
216
217 pub fn updated_at(&self) -> DateTime<Utc> {
218 self.updated_at
219 }
220
221 pub fn expected_version(&self) -> Option<u64> {
222 self.expected_version
223 }
224
225 pub fn tenant_id(&self) -> Option<&crate::domain::value_objects::TenantId> {
232 self.events.first().map(|e| e.tenant_id())
233 }
234
235 pub fn has_consistent_tenant(&self) -> bool {
240 if self.events.is_empty() {
241 return true;
242 }
243
244 let first_tenant = self.events[0].tenant_id();
245 self.events.iter().all(|e| e.tenant_id() == first_tenant)
246 }
247
248 pub fn validate_event_tenant(&self, event: &Event) -> Result<()> {
255 if let Some(stream_tenant) = self.tenant_id()
256 && event.tenant_id() != stream_tenant
257 {
258 return Err(AllSourceError::ValidationError(format!(
259 "Tenant mismatch: stream belongs to '{}', but event belongs to '{}'",
260 stream_tenant.as_str(),
261 event.tenant_id().as_str()
262 )));
263 }
264 Ok(())
265 }
266
267 pub fn append_event_with_tenant_check(&mut self, event: Event) -> Result<u64> {
272 self.validate_event_tenant(&event)?;
274
275 self.append_event(event)
277 }
278}
279
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Entity id shared by every stream and event in this module.
    const STREAM: &str = "stream-1";

    /// Builds a `test.event` for `entity_id` through `Event::from_strings`,
    /// panicking if construction fails.
    fn create_test_event(entity_id: &str) -> Event {
        Event::from_strings(
            "test.event".to_string(),
            entity_id.to_string(),
            "default".to_string(),
            json!({"data": "test"}),
            None,
        )
        .unwrap()
    }

    /// Returns a fresh, empty stream identified by [`STREAM`].
    fn empty_stream() -> EventStream {
        EventStream::new(EntityId::new(STREAM.to_string()).unwrap())
    }

    /// Returns a stream that already holds `count` test events.
    fn stream_with_events(count: usize) -> EventStream {
        let mut stream = empty_stream();
        for _ in 0..count {
            stream.append_event(create_test_event(STREAM)).unwrap();
        }
        stream
    }

    #[test]
    fn test_new_stream() {
        let stream = empty_stream();

        assert_eq!(stream.current_version(), 0);
        assert_eq!(stream.watermark(), 0);
        assert_eq!(stream.event_count(), 0);
        assert!(stream.is_gapless());
    }

    #[test]
    fn test_append_event() {
        let mut stream = empty_stream();

        let version = stream.append_event(create_test_event(STREAM)).unwrap();

        assert_eq!(version, 1);
        assert_eq!(stream.current_version(), 1);
        assert_eq!(stream.watermark(), 1);
        assert_eq!(stream.event_count(), 1);
        assert!(stream.is_gapless());
    }

    #[test]
    fn test_multiple_appends() {
        let mut stream = empty_stream();

        // Each append must hand back the next consecutive version.
        for expected in 1..=10u64 {
            let version = stream.append_event(create_test_event(STREAM)).unwrap();
            assert_eq!(version, expected);
        }

        assert_eq!(stream.current_version(), 10);
        assert_eq!(stream.watermark(), 10);
        assert_eq!(stream.event_count(), 10);
        assert!(stream.is_gapless());
    }

    #[test]
    fn test_optimistic_locking_success() {
        let mut stream = empty_stream();

        // The stream is at version 0, so expecting 0 must succeed.
        stream.expect_version(0);

        let outcome = stream.append_event(create_test_event(STREAM));

        assert!(outcome.is_ok());
        assert_eq!(outcome.unwrap(), 1);
    }

    #[test]
    fn test_optimistic_locking_failure() {
        // One event already appended: the stream is at version 1.
        let mut stream = stream_with_events(1);

        // A stale expectation must be rejected.
        stream.expect_version(0);

        let outcome = stream.append_event(create_test_event(STREAM));

        assert!(outcome.is_err());
        assert!(matches!(outcome, Err(AllSourceError::ConcurrencyError(_))));
    }

    #[test]
    fn test_events_from() {
        let stream = stream_with_events(5);

        // Versions 3, 4 and 5.
        let tail = stream.events_from(3);
        assert_eq!(tail.len(), 3);
    }

    #[test]
    fn test_partition_assignment() {
        let stream = empty_stream();

        let partition_id = stream.partition_key().partition_id();
        assert!(partition_id < PartitionKey::DEFAULT_PARTITION_COUNT);
    }

    #[test]
    fn test_clear_expected_version() {
        let mut stream = empty_stream();

        stream.expect_version(0);
        stream.clear_expected_version();

        // With no expectation set, the append goes through unconditionally.
        let outcome = stream.append_event(create_test_event(STREAM));
        assert!(outcome.is_ok());
    }

    #[test]
    fn test_events_from_edge_cases() {
        let stream = stream_with_events(3);

        // (from_version, number of events expected back)
        let cases: &[(u64, usize)] = &[(0, 0), (1, 3), (3, 1), (4, 0)];
        for &(from_version, expected_len) in cases {
            assert_eq!(stream.events_from(from_version).len(), expected_len);
        }
    }
}