allsource_core/domain/entities/
event_stream.rs1use crate::domain::entities::Event;
2use crate::domain::value_objects::{EntityId, PartitionKey};
3use crate::error::{AllSourceError, Result};
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6
7#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct EventStream {
24 stream_id: EntityId,
26
27 partition_key: PartitionKey,
29
30 current_version: u64,
32
33 watermark: u64,
36
37 events: Vec<Event>,
39
40 expected_version: Option<u64>,
43
44 created_at: DateTime<Utc>,
46 updated_at: DateTime<Utc>,
47}
48
49impl EventStream {
50 pub fn new(stream_id: EntityId) -> Self {
52 let partition_key = PartitionKey::from_entity_id(stream_id.as_str());
53 let now = Utc::now();
54
55 Self {
56 stream_id,
57 partition_key,
58 current_version: 0,
59 watermark: 0,
60 events: Vec::new(),
61 expected_version: None,
62 created_at: now,
63 updated_at: now,
64 }
65 }
66
67 pub fn reconstruct(
82 stream_id: EntityId,
83 partition_key: PartitionKey,
84 current_version: u64,
85 watermark: u64,
86 events: Vec<Event>,
87 expected_version: Option<u64>,
88 created_at: DateTime<Utc>,
89 updated_at: DateTime<Utc>,
90 ) -> Result<Self> {
91 if watermark > current_version {
93 return Err(AllSourceError::InvalidInput(format!(
94 "Watermark ({}) cannot exceed current version ({})",
95 watermark, current_version
96 )));
97 }
98
99 if events.len() as u64 != current_version {
100 return Err(AllSourceError::InvalidInput(format!(
101 "Event count ({}) must match current version ({})",
102 events.len(),
103 current_version
104 )));
105 }
106
107 Ok(Self {
108 stream_id,
109 partition_key,
110 current_version,
111 watermark,
112 events,
113 expected_version,
114 created_at,
115 updated_at,
116 })
117 }
118
119 pub fn append_event(&mut self, event: Event) -> Result<u64> {
126 if let Some(expected) = self.expected_version {
128 if expected != self.current_version {
129 return Err(AllSourceError::ConcurrencyError(format!(
130 "Version conflict: expected {}, got {}",
131 expected, self.current_version
132 )));
133 }
134 }
135
136 self.current_version += 1;
138 let new_version = self.current_version;
139
140 self.events.push(event);
142
143 self.watermark = new_version;
145
146 self.updated_at = Utc::now();
147
148 Ok(new_version)
149 }
150
151 pub fn expect_version(&mut self, version: u64) {
153 self.expected_version = Some(version);
154 }
155
156 pub fn clear_expected_version(&mut self) {
158 self.expected_version = None;
159 }
160
161 pub fn events_from(&self, from_version: u64) -> Vec<&Event> {
163 if from_version == 0 || from_version > self.current_version {
164 return Vec::new();
165 }
166
167 let start_idx = (from_version - 1) as usize;
168 self.events[start_idx..].iter().collect()
169 }
170
171 pub fn is_gapless(&self) -> bool {
173 if self.watermark > self.current_version {
174 return false; }
176
177 for version in 1..=self.watermark {
179 let idx = (version - 1) as usize;
180 if idx >= self.events.len() {
181 return false;
182 }
183 }
184
185 true
186 }
187
188 pub fn stream_id(&self) -> &EntityId {
190 &self.stream_id
191 }
192
193 pub fn partition_key(&self) -> &PartitionKey {
194 &self.partition_key
195 }
196
197 pub fn current_version(&self) -> u64 {
198 self.current_version
199 }
200
201 pub fn watermark(&self) -> u64 {
202 self.watermark
203 }
204
205 pub fn event_count(&self) -> usize {
206 self.events.len()
207 }
208
209 pub fn created_at(&self) -> DateTime<Utc> {
210 self.created_at
211 }
212
213 pub fn updated_at(&self) -> DateTime<Utc> {
214 self.updated_at
215 }
216
217 pub fn expected_version(&self) -> Option<u64> {
218 self.expected_version
219 }
220
221 pub fn tenant_id(&self) -> Option<&crate::domain::value_objects::TenantId> {
228 self.events.first().map(|e| e.tenant_id())
229 }
230
231 pub fn has_consistent_tenant(&self) -> bool {
236 if self.events.is_empty() {
237 return true;
238 }
239
240 let first_tenant = self.events[0].tenant_id();
241 self.events.iter().all(|e| e.tenant_id() == first_tenant)
242 }
243
244 pub fn validate_event_tenant(&self, event: &Event) -> Result<()> {
251 if let Some(stream_tenant) = self.tenant_id() {
252 if event.tenant_id() != stream_tenant {
253 return Err(AllSourceError::ValidationError(format!(
254 "Tenant mismatch: stream belongs to '{}', but event belongs to '{}'",
255 stream_tenant.as_str(),
256 event.tenant_id().as_str()
257 )));
258 }
259 }
260 Ok(())
261 }
262
263 pub fn append_event_with_tenant_check(&mut self, event: Event) -> Result<u64> {
268 self.validate_event_tenant(&event)?;
270
271 self.append_event(event)
273 }
274}
275
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Identifier shared by every stream and event built in these tests.
    const STREAM: &str = "stream-1";

    /// Builds a minimal `test.event` for `entity_id` with a small JSON payload.
    // NOTE(review): the third argument ("default") is presumably the tenant
    // id — confirm against `Event::from_strings`.
    fn create_test_event(entity_id: &str) -> Event {
        Event::from_strings(
            "test.event".to_string(),
            entity_id.to_string(),
            "default".to_string(),
            json!({"data": "test"}),
            None,
        )
        .unwrap()
    }

    /// Builds the stream id used throughout the tests.
    fn test_stream_id() -> EntityId {
        EntityId::new(STREAM.to_string()).unwrap()
    }

    /// Creates an empty stream.
    fn new_stream() -> EventStream {
        EventStream::new(test_stream_id())
    }

    /// Creates a stream and appends `count` test events to it.
    fn stream_with_events(count: usize) -> EventStream {
        let mut stream = new_stream();
        for _ in 0..count {
            stream.append_event(create_test_event(STREAM)).unwrap();
        }
        stream
    }

    #[test]
    fn test_new_stream() {
        let stream = new_stream();

        assert_eq!(stream.current_version(), 0);
        assert_eq!(stream.watermark(), 0);
        assert_eq!(stream.event_count(), 0);
        assert!(stream.is_gapless());
    }

    #[test]
    fn test_append_event() {
        let mut stream = new_stream();

        let version = stream.append_event(create_test_event(STREAM)).unwrap();

        assert_eq!(version, 1);
        assert_eq!(stream.current_version(), 1);
        assert_eq!(stream.watermark(), 1);
        assert_eq!(stream.event_count(), 1);
        assert!(stream.is_gapless());
    }

    #[test]
    fn test_multiple_appends() {
        let mut stream = new_stream();

        for i in 1..=10 {
            let version = stream.append_event(create_test_event(STREAM)).unwrap();
            assert_eq!(version, i);
        }

        assert_eq!(stream.current_version(), 10);
        assert_eq!(stream.watermark(), 10);
        assert_eq!(stream.event_count(), 10);
        assert!(stream.is_gapless());
    }

    #[test]
    fn test_optimistic_locking_success() {
        let mut stream = new_stream();

        // A fresh stream is at version 0, so the expectation holds.
        stream.expect_version(0);

        let result = stream.append_event(create_test_event(STREAM));

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 1);
    }

    #[test]
    fn test_optimistic_locking_failure() {
        // One append moves the stream to version 1 ...
        let mut stream = stream_with_events(1);

        // ... so expecting version 0 must now be rejected.
        stream.expect_version(0);

        let result = stream.append_event(create_test_event(STREAM));

        assert!(result.is_err());
        assert!(matches!(result, Err(AllSourceError::ConcurrencyError(_))));
    }

    #[test]
    fn test_events_from() {
        let stream = stream_with_events(5);

        // Versions 3, 4 and 5.
        let events = stream.events_from(3);
        assert_eq!(events.len(), 3);
    }

    #[test]
    fn test_partition_assignment() {
        let stream = new_stream();

        let partition_key = stream.partition_key();
        assert!(partition_key.partition_id() < PartitionKey::DEFAULT_PARTITION_COUNT);
    }

    #[test]
    fn test_clear_expected_version() {
        let mut stream = new_stream();

        stream.expect_version(0);
        stream.clear_expected_version();

        let result = stream.append_event(create_test_event(STREAM));
        assert!(result.is_ok());
    }

    #[test]
    fn test_events_from_edge_cases() {
        let stream = stream_with_events(3);

        // Version 0 does not exist (versions are 1-based).
        assert_eq!(stream.events_from(0).len(), 0);
        // The first version returns everything.
        assert_eq!(stream.events_from(1).len(), 3);
        // The last version returns exactly one event.
        assert_eq!(stream.events_from(3).len(), 1);
        // Past the end returns nothing.
        assert_eq!(stream.events_from(4).len(), 0);
    }

    #[test]
    fn test_reconstruct_rejects_watermark_beyond_version() {
        let stream_id = test_stream_id();
        let partition_key = PartitionKey::from_entity_id(stream_id.as_str());
        let now = Utc::now();

        let result =
            EventStream::reconstruct(stream_id, partition_key, 0, 1, Vec::new(), None, now, now);

        assert!(matches!(result, Err(AllSourceError::InvalidInput(_))));
    }

    #[test]
    fn test_reconstruct_rejects_event_count_mismatch() {
        let stream_id = test_stream_id();
        let partition_key = PartitionKey::from_entity_id(stream_id.as_str());
        let now = Utc::now();

        // Version 2 claimed, but only one event supplied.
        let events = vec![create_test_event(STREAM)];
        let result =
            EventStream::reconstruct(stream_id, partition_key, 2, 1, events, None, now, now);

        assert!(matches!(result, Err(AllSourceError::InvalidInput(_))));
    }

    #[test]
    fn test_reconstruct_accepts_consistent_state() {
        let stream_id = test_stream_id();
        let partition_key = PartitionKey::from_entity_id(stream_id.as_str());
        let now = Utc::now();

        let events = vec![create_test_event(STREAM), create_test_event(STREAM)];
        let stream =
            EventStream::reconstruct(stream_id, partition_key, 2, 2, events, None, now, now)
                .unwrap();

        assert_eq!(stream.current_version(), 2);
        assert_eq!(stream.watermark(), 2);
        assert_eq!(stream.event_count(), 2);
        assert!(stream.is_gapless());
    }
}