1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
use std::marker::PhantomData;

use async_trait::async_trait;

use crate::decider::{EventComputation, StateComputation};

/// Event Repository trait
///
/// Storage abstraction used by [EventSourcedAggregate]: while handling a command the
/// aggregate first calls `fetch_events` and then `save`.
///
/// Generic parameters:
///
/// - `C` - Command
/// - `E` - Event
/// - `Version` - Version/Offset/Sequence number
/// - `Error` - Error
#[async_trait]
pub trait EventRepository<C, E, Version, Error> {
    /// Fetches current events, based on the command.
    ///
    /// On success returns every fetched event paired with its `Version`.
    /// [EventSourcedAggregate] treats the version of the last pair as the latest one, so
    /// implementations presumably have to return the pairs oldest-first (TODO confirm).
    async fn fetch_events(&self, command: &C) -> Result<Vec<(E, Version)>, Error>;
    /// Saves events.
    ///
    /// - `events` - the events to persist
    /// - `latest_version` - the version the caller observed last, or `None` when it observed
    ///   no events at all
    ///
    /// On success returns events paired with their `Version` (presumably the ones that were
    /// just stored - TODO confirm against implementations).
    async fn save(
        &self,
        events: &[E],
        latest_version: &Option<Version>,
    ) -> Result<Vec<(E, Version)>, Error>;
}

/// Event Sourced Aggregate.
///
/// It is using a `Decider` / [EventComputation] to compute new events based on the current events and the command.
/// It is using a [EventRepository] to fetch the current events and to save the new events.
///
/// Generic parameters:
///
/// - `C` - Command
/// - `S` - State
/// - `E` - Event
/// - `Repository` - Event repository
/// - `Decider` - Event computation
/// - `Version` - Version/Offset/Sequence number
/// - `Error` - Error
pub struct EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
where
    Repository: EventRepository<C, E, Version, Error>,
    Decider: EventComputation<C, S, E>,
{
    /// Source of the current events and sink for the newly computed ones.
    repository: Repository,
    /// Computes the new events from the current events and the command.
    decider: Decider,
    /// Zero-sized marker: `C`, `S`, `E`, `Version` and `Error` appear only in the trait bounds,
    /// so they are mentioned here to count as used type parameters.
    _marker: PhantomData<(C, S, E, Version, Error)>,
}

impl<C, S, E, Repository, Decider, Version, Error>
    EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
where
    Repository: EventRepository<C, E, Version, Error>,
    Decider: EventComputation<C, S, E>,
{
    /// Builds an [EventSourcedAggregate] out of the given event repository and decider.
    pub fn new(repository: Repository, decider: Decider) -> Self {
        Self {
            repository,
            decider,
            _marker: PhantomData,
        }
    }

    /// Handles the command in three steps:
    ///
    /// 1. fetches the current events (with their versions) from the repository,
    /// 2. lets the decider compute the new events from the current events and the command,
    /// 3. saves the new events, passing along the version of the last fetched event
    ///    (`None` when nothing was fetched).
    ///
    /// Returns whatever the repository reports as saved; any repository error is returned as is.
    pub async fn handle(&self, command: &C) -> Result<Vec<(E, Version)>, Error> {
        let fetched = self.repository.fetch_events(command).await?;

        // Split the (event, version) pairs: keep every event, remember only the last version.
        let (current_events, latest_version): (Vec<E>, Option<Version>) = fetched
            .into_iter()
            .fold((Vec::new(), None), |(mut collected, _), (event, ver)| {
                collected.push(event);
                (collected, Some(ver))
            });

        let new_events = self.decider.compute_new_events(&current_events, command);
        self.repository.save(&new_events, &latest_version).await
    }
}

/// State Repository trait
///
/// Storage abstraction used by [StateStoredAggregate]: while handling a command the
/// aggregate first calls `fetch_state` and then `save`.
///
/// Generic parameters:
///
/// - `C` - Command
/// - `S` - State
/// - `Version` - Version
/// - `Error` - Error
#[async_trait]
pub trait StateRepository<C, S, Version, Error> {
    /// Fetches current state, based on the command.
    ///
    /// On success returns the state paired with its `Version`, or `None`
    /// (presumably meaning that nothing has been stored yet - TODO confirm).
    async fn fetch_state(&self, command: &C) -> Result<Option<(S, Version)>, Error>;
    /// Saves state.
    ///
    /// - `state` - the state to persist
    /// - `version` - the version the caller fetched together with the previous state, or `None`
    ///   when no previous state was fetched
    ///
    /// On success returns a state paired with its `Version` (presumably the state that was
    /// just stored and its new version - TODO confirm against implementations).
    async fn save(&self, state: &S, version: &Option<Version>) -> Result<(S, Version), Error>;
}

/// State Stored Aggregate.
///
/// It is using a `Decider` / [StateComputation] to compute new state based on the current state and the command.
/// It is using a [StateRepository] to fetch the current state and to save the new state.
///
/// Generic parameters:
///
/// - `C` - Command
/// - `S` - State
/// - `E` - Event
/// - `Repository` - State repository
/// - `Decider` - State computation
/// - `Version` - Version
/// - `Error` - Error
pub struct StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
where
    Repository: StateRepository<C, S, Version, Error>,
    Decider: StateComputation<C, S, E>,
{
    /// Source of the current state and sink for the newly computed one.
    repository: Repository,
    /// Computes the new state from the current state and the command.
    decider: Decider,
    /// Zero-sized marker: `C`, `S`, `E`, `Version` and `Error` appear only in the trait bounds,
    /// so they are mentioned here to count as used type parameters.
    _marker: PhantomData<(C, S, E, Version, Error)>,
}

impl<C, S, E, Repository, Decider, Version, Error>
    StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
where
    Repository: StateRepository<C, S, Version, Error>,
    Decider: StateComputation<C, S, E>,
{
    /// Builds a [StateStoredAggregate] out of the given state repository and decider.
    pub fn new(repository: Repository, decider: Decider) -> Self {
        Self {
            repository,
            decider,
            _marker: PhantomData,
        }
    }

    /// Handles the command in three steps:
    ///
    /// 1. fetches the current state (with its version) from the repository,
    /// 2. lets the decider compute the new state from the current state and the command,
    /// 3. saves the new state, passing along the version that was fetched
    ///    (`None` when no state was fetched).
    ///
    /// Returns whatever the repository reports as saved; any repository error is returned as is.
    pub async fn handle(&self, command: &C) -> Result<(S, Version), Error> {
        // Split the optional (state, version) pair so both cases share one code path below.
        let (current_state, current_version) =
            match self.repository.fetch_state(command).await? {
                Some((state, version)) => (Some(state), Some(version)),
                None => (None, None),
            };

        let new_state = self.decider.compute_new_state(current_state, command);
        self.repository.save(&new_state, &current_version).await
    }
}