allsource_core/domain/entities/
event_stream.rs1use crate::{
2 domain::{
3 entities::Event,
4 value_objects::{EntityId, PartitionKey},
5 },
6 error::{AllSourceError, Result},
7};
8use chrono::{DateTime, Utc};
9use serde::{Deserialize, Serialize};
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct EventStream {
28 stream_id: EntityId,
30
31 partition_key: PartitionKey,
33
34 current_version: u64,
36
37 watermark: u64,
40
41 events: Vec<Event>,
43
44 expected_version: Option<u64>,
47
48 created_at: DateTime<Utc>,
50 updated_at: DateTime<Utc>,
51}
52
53impl EventStream {
54 pub fn new(stream_id: EntityId) -> Self {
56 let partition_key = PartitionKey::from_entity_id(stream_id.as_str());
57 let now = Utc::now();
58
59 Self {
60 stream_id,
61 partition_key,
62 current_version: 0,
63 watermark: 0,
64 events: Vec::new(),
65 expected_version: None,
66 created_at: now,
67 updated_at: now,
68 }
69 }
70
71 pub fn reconstruct(
86 stream_id: EntityId,
87 partition_key: PartitionKey,
88 current_version: u64,
89 watermark: u64,
90 events: Vec<Event>,
91 expected_version: Option<u64>,
92 created_at: DateTime<Utc>,
93 updated_at: DateTime<Utc>,
94 ) -> Result<Self> {
95 if watermark > current_version {
97 return Err(AllSourceError::InvalidInput(format!(
98 "Watermark ({watermark}) cannot exceed current version ({current_version})"
99 )));
100 }
101
102 if events.len() as u64 != current_version {
103 return Err(AllSourceError::InvalidInput(format!(
104 "Event count ({}) must match current version ({})",
105 events.len(),
106 current_version
107 )));
108 }
109
110 Ok(Self {
111 stream_id,
112 partition_key,
113 current_version,
114 watermark,
115 events,
116 expected_version,
117 created_at,
118 updated_at,
119 })
120 }
121
122 pub fn append_event(&mut self, event: Event) -> Result<u64> {
129 if let Some(expected) = self.expected_version
131 && expected != self.current_version
132 {
133 return Err(AllSourceError::ConcurrencyError(format!(
134 "Version conflict: expected {}, got {}",
135 expected, self.current_version
136 )));
137 }
138
139 self.current_version += 1;
141 let new_version = self.current_version;
142
143 self.events.push(event);
145
146 self.watermark = new_version;
148
149 self.updated_at = Utc::now();
150
151 Ok(new_version)
152 }
153
154 pub fn expect_version(&mut self, version: u64) {
156 self.expected_version = Some(version);
157 }
158
159 pub fn clear_expected_version(&mut self) {
161 self.expected_version = None;
162 }
163
164 pub fn events_from(&self, from_version: u64) -> Vec<&Event> {
166 if from_version == 0 || from_version > self.current_version {
167 return Vec::new();
168 }
169
170 let start_idx = (from_version - 1) as usize;
171 self.events[start_idx..].iter().collect()
172 }
173
174 pub fn is_gapless(&self) -> bool {
176 if self.watermark > self.current_version {
177 return false; }
179
180 for version in 1..=self.watermark {
182 let idx = (version - 1) as usize;
183 if idx >= self.events.len() {
184 return false;
185 }
186 }
187
188 true
189 }
190
191 pub fn stream_id(&self) -> &EntityId {
193 &self.stream_id
194 }
195
196 pub fn partition_key(&self) -> &PartitionKey {
197 &self.partition_key
198 }
199
200 pub fn current_version(&self) -> u64 {
201 self.current_version
202 }
203
204 pub fn watermark(&self) -> u64 {
205 self.watermark
206 }
207
208 pub fn event_count(&self) -> usize {
209 self.events.len()
210 }
211
212 pub fn created_at(&self) -> DateTime<Utc> {
213 self.created_at
214 }
215
216 pub fn updated_at(&self) -> DateTime<Utc> {
217 self.updated_at
218 }
219
220 pub fn expected_version(&self) -> Option<u64> {
221 self.expected_version
222 }
223
224 pub fn tenant_id(&self) -> Option<&crate::domain::value_objects::TenantId> {
231 self.events.first().map(super::event::Event::tenant_id)
232 }
233
234 pub fn has_consistent_tenant(&self) -> bool {
239 if self.events.is_empty() {
240 return true;
241 }
242
243 let first_tenant = self.events[0].tenant_id();
244 self.events.iter().all(|e| e.tenant_id() == first_tenant)
245 }
246
247 pub fn validate_event_tenant(&self, event: &Event) -> Result<()> {
254 if let Some(stream_tenant) = self.tenant_id()
255 && event.tenant_id() != stream_tenant
256 {
257 return Err(AllSourceError::ValidationError(format!(
258 "Tenant mismatch: stream belongs to '{}', but event belongs to '{}'",
259 stream_tenant.as_str(),
260 event.tenant_id().as_str()
261 )));
262 }
263 Ok(())
264 }
265
266 pub fn append_event_with_tenant_check(&mut self, event: Event) -> Result<u64> {
271 self.validate_event_tenant(&event)?;
273
274 self.append_event(event)
276 }
277}
278
279#[cfg(test)]
280mod tests {
281 use super::*;
282 use serde_json::json;
283
284 fn create_test_event(entity_id: &str) -> Event {
285 Event::from_strings(
286 "test.event".to_string(),
287 entity_id.to_string(),
288 "default".to_string(),
289 json!({"data": "test"}),
290 None,
291 )
292 .unwrap()
293 }
294
295 #[test]
296 fn test_new_stream() {
297 let stream_id = EntityId::new("stream-1".to_string()).unwrap();
298 let stream = EventStream::new(stream_id.clone());
299
300 assert_eq!(stream.current_version(), 0);
301 assert_eq!(stream.watermark(), 0);
302 assert_eq!(stream.event_count(), 0);
303 assert!(stream.is_gapless());
304 }
305
306 #[test]
307 fn test_append_event() {
308 let stream_id = EntityId::new("stream-1".to_string()).unwrap();
309 let mut stream = EventStream::new(stream_id.clone());
310
311 let event = create_test_event("stream-1");
312 let version = stream.append_event(event).unwrap();
313
314 assert_eq!(version, 1);
315 assert_eq!(stream.current_version(), 1);
316 assert_eq!(stream.watermark(), 1);
317 assert_eq!(stream.event_count(), 1);
318 assert!(stream.is_gapless());
319 }
320
321 #[test]
322 fn test_multiple_appends() {
323 let stream_id = EntityId::new("stream-1".to_string()).unwrap();
324 let mut stream = EventStream::new(stream_id.clone());
325
326 for i in 1..=10 {
327 let event = create_test_event("stream-1");
328 let version = stream.append_event(event).unwrap();
329 assert_eq!(version, i);
330 }
331
332 assert_eq!(stream.current_version(), 10);
333 assert_eq!(stream.watermark(), 10);
334 assert_eq!(stream.event_count(), 10);
335 assert!(stream.is_gapless());
336 }
337
338 #[test]
339 fn test_optimistic_locking_success() {
340 let stream_id = EntityId::new("stream-1".to_string()).unwrap();
341 let mut stream = EventStream::new(stream_id);
342
343 stream.expect_version(0);
345
346 let event = create_test_event("stream-1");
347 let result = stream.append_event(event);
348
349 assert!(result.is_ok());
350 assert_eq!(result.unwrap(), 1);
351 }
352
353 #[test]
354 fn test_optimistic_locking_failure() {
355 let stream_id = EntityId::new("stream-1".to_string()).unwrap();
356 let mut stream = EventStream::new(stream_id);
357
358 let event1 = create_test_event("stream-1");
360 stream.append_event(event1).unwrap();
361
362 stream.expect_version(0);
364
365 let event2 = create_test_event("stream-1");
366 let result = stream.append_event(event2);
367
368 assert!(result.is_err());
369 assert!(matches!(result, Err(AllSourceError::ConcurrencyError(_))));
370 }
371
372 #[test]
373 fn test_events_from() {
374 let stream_id = EntityId::new("stream-1".to_string()).unwrap();
375 let mut stream = EventStream::new(stream_id);
376
377 for _ in 0..5 {
379 let event = create_test_event("stream-1");
380 stream.append_event(event).unwrap();
381 }
382
383 let events = stream.events_from(3);
384 assert_eq!(events.len(), 3); }
386
387 #[test]
388 fn test_partition_assignment() {
389 let stream_id = EntityId::new("stream-1".to_string()).unwrap();
390 let stream = EventStream::new(stream_id);
391
392 let partition_key = stream.partition_key();
393 assert!(partition_key.partition_id() < PartitionKey::DEFAULT_PARTITION_COUNT);
394 }
395
396 #[test]
397 fn test_clear_expected_version() {
398 let stream_id = EntityId::new("stream-1".to_string()).unwrap();
399 let mut stream = EventStream::new(stream_id);
400
401 stream.expect_version(0);
402 stream.clear_expected_version();
403
404 let event = create_test_event("stream-1");
406 let result = stream.append_event(event);
407 assert!(result.is_ok());
408 }
409
410 #[test]
411 fn test_events_from_edge_cases() {
412 let stream_id = EntityId::new("stream-1".to_string()).unwrap();
413 let mut stream = EventStream::new(stream_id);
414
415 for _ in 0..3 {
417 let event = create_test_event("stream-1");
418 stream.append_event(event).unwrap();
419 }
420
421 assert_eq!(stream.events_from(0).len(), 0); assert_eq!(stream.events_from(1).len(), 3); assert_eq!(stream.events_from(3).len(), 1); assert_eq!(stream.events_from(4).len(), 0); }
427}