use std::collections::HashMap;
use std::io;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::RwLock;

use crate::commons::api::Handle;
use crate::commons::eventsourcing::agg::AggregateHistory;
use crate::commons::eventsourcing::cmd::{Command, StoredCommandBuilder};
use crate::commons::eventsourcing::{
    Aggregate, DiskKeyStore, Event, EventListener, KeyStore, KeyStoreError,
};

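/// Store a full snapshot of an aggregate every so many versions, so that
/// rebuilding its state does not require replaying the full event history.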
const SNAPSHOT_FREQ: u64 = 5;

pub type StoreResult<T> = Result<T, AggregateStoreError>;

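/// Stores and loads `Aggregate` instances by their `Handle`: commands are
/// processed, the resulting events are persisted, and registered listeners
/// are informed of each stored event.
///
/// Example usage (a sketch only; `MyAggregate`, `init_event` and `change_cmd`
/// are hypothetical stand-ins for a concrete `Aggregate` implementation whose
/// `Error` implements `From<AggregateStoreError>`):
///
/// ```ignore
/// // `MyAggregate`, `init_event` and `change_cmd` are hypothetical.
/// let store = DiskAggregateStore::<MyAggregate>::new(&work_dir, "example_ns")?;
/// let agg = store.add(init_event)?;     // create a new instance from its init event
/// let agg = store.command(change_cmd)?; // process a command and persist its events
/// ```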
pub trait AggregateStore<A: Aggregate>: Send + Sync
where
    A::Error: From<AggregateStoreError>,
{
    /// Gets the latest version for the given aggregate. Returns
    /// an AggregateStoreError::UnknownAggregate in case the aggregate
    /// does not exist.
    fn get_latest(&self, id: &Handle) -> StoreResult<Arc<A>>;

    /// Adds a new aggregate instance based on the init event.
    fn add(&self, init: A::InitEvent) -> StoreResult<Arc<A>>;

    /// Sends a command to the appropriate aggregate, and then:
    /// - on success: saves the command and events, returns the updated aggregate
    /// - on a no-op (no events): saves nothing, returns the unchanged aggregate
    /// - on error: saves the command and the error, returns the error
    fn command(&self, cmd: A::Command) -> Result<Arc<A>, A::Error>;

    /// Returns true if an instance exists for the id
    fn has(&self, id: &Handle) -> bool;

    /// Lists all known ids.
    fn list(&self) -> Vec<Handle>;

    /// Adds a listener that will receive a reference to all events as they
    /// are stored.
    fn add_listener<L: EventListener<A>>(&mut self, listener: Arc<L>);

    /// Lists the complete history for an aggregate.
    fn history(&self, id: &Handle) -> StoreResult<AggregateHistory<A>>;
}

/// This type defines the possible errors for the AggregateStore
#[derive(Debug, Display)]
pub enum AggregateStoreError {
    #[display(fmt = "{}", _0)]
    KeyStoreError(KeyStoreError),

    #[display(fmt = "Unknown aggregate: {}", _0)]
    UnknownAggregate(Handle),

    #[display(fmt = "Aggregate init event exists, but cannot be applied")]
    InitError,

    #[display(fmt = "Event not applicable to aggregate, id or version is off")]
    WrongEventForAggregate,

    #[display(fmt = "Trying to update outdated aggregate '{}'", _0)]
    ConcurrentModification(Handle),
}

impl From<KeyStoreError> for AggregateStoreError {
    fn from(e: KeyStoreError) -> Self {
        AggregateStoreError::KeyStoreError(e)
    }
}

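/// Disk-backed implementation of `AggregateStore`. Events, commands and
/// snapshots are persisted in a `DiskKeyStore`, and the latest state of each
/// aggregate is cached in memory. The `outer_lock` serialises all writes,
/// while still allowing concurrent readers.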
pub struct DiskAggregateStore<A: Aggregate> {
    store: DiskKeyStore,
    cache: RwLock<HashMap<Handle, Arc<A>>>,
    use_cache: bool,
    listeners: Vec<Arc<dyn EventListener<A>>>,
    outer_lock: RwLock<()>,
}

impl<A: Aggregate> DiskAggregateStore<A> {
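    /// Creates a new store that persists its data through a `DiskKeyStore`
    /// under the given work dir and name space.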
    pub fn new(work_dir: &PathBuf, name_space: &str) -> Result<Self, io::Error> {
        let store = DiskKeyStore::under_work_dir(work_dir, name_space)?;
        let cache = RwLock::new(HashMap::new());
        let use_cache = true;
        let listeners = vec![];
        let lock = RwLock::new(());
        Ok(DiskAggregateStore {
            store,
            cache,
            use_cache,
            listeners,
            outer_lock: lock,
        })
    }
}

impl<A: Aggregate> DiskAggregateStore<A> {
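    /// Returns whether the store holds an event for the given aggregate's
    /// current version, i.e. whether updates were stored that the aggregate
    /// has not applied yet.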
    fn has_updates(&self, id: &Handle, aggregate: &A) -> StoreResult<bool> {
        Ok(self
            .store
            .get_event::<A::Event>(id, aggregate.version())?
            .is_some())
    }

    fn cache_get(&self, id: &Handle) -> Option<Arc<A>> {
        if self.use_cache {
            self.cache.read().unwrap().get(id).cloned()
        } else {
            None
        }
    }

    fn cache_update(&self, id: &Handle, arc: Arc<A>) {
        if self.use_cache {
            self.cache.write().unwrap().insert(id.clone(), arc);
        }
    }

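    /// Loads the latest version of the aggregate, preferring the in-memory
    /// cache and applying any events stored since the cached version.
    /// Callers are expected to hold the outer lock.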
    fn get_latest_no_lock(&self, handle: &Handle) -> StoreResult<Arc<A>> {
        trace!("Trying to load aggregate id: {}", handle);
        match self.cache_get(handle) {
            None => match self.store.get_aggregate(handle)? {
                None => {
                    error!("Could not load aggregate with id: {} from disk", handle);
                    Err(AggregateStoreError::UnknownAggregate(handle.clone()))
                }
                Some(agg) => {
                    let arc: Arc<A> = Arc::new(agg);
                    self.cache_update(handle, arc.clone());
                    trace!("Loaded aggregate id: {} from disk", handle);
                    Ok(arc)
                }
            },
            Some(mut arc) => {
                if self.has_updates(handle, &arc)? {
                    let agg = Arc::make_mut(&mut arc);
                    self.store.update_aggregate(handle, agg)?;
                }
                trace!("Loaded aggregate id: {} from memory", handle);
                Ok(arc)
            }
        }
    }
}

impl<A: Aggregate> AggregateStore<A> for DiskAggregateStore<A>
where
    A::Error: From<AggregateStoreError>,
{
    fn get_latest(&self, handle: &Handle) -> StoreResult<Arc<A>> {
        let _lock = self.outer_lock.read().unwrap();
        self.get_latest_no_lock(handle)
    }

    fn add(&self, init: A::InitEvent) -> StoreResult<Arc<A>> {
        let _lock = self.outer_lock.write().unwrap();

        self.store.store_event(&init)?;

        let handle = init.handle().clone();

        let aggregate = A::init(init).map_err(|_| AggregateStoreError::InitError)?;
        self.store.store_aggregate(&handle, &aggregate)?;

        let arc = Arc::new(aggregate);
        self.cache_update(&handle, arc.clone());

        Ok(arc)
    }

    fn command(&self, cmd: A::Command) -> Result<Arc<A>, A::Error> {
        let _lock = self.outer_lock.write().unwrap();

        // Get the latest arc.
        let handle = cmd.handle().clone();

        let mut latest = self.get_latest_no_lock(&handle)?;

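        // If the command pins a specific version, reject it when the
        // aggregate has since moved on, rather than applying it to state the
        // caller has not seen.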
        if let Some(version) = cmd.version() {
            if version != latest.version() {
                error!(
                    "Version conflict updating '{}', expected version: {}, found: {}",
                    handle,
                    version,
                    latest.version()
                );

                return Err(A::Error::from(AggregateStoreError::ConcurrentModification(
                    handle,
                )));
            }
        }

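        // Prepare the stored command now, so that it can be finished with
        // either the resulting events or the error below.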
        let stored_command_builder = StoredCommandBuilder::new(&cmd, latest.version());

        match latest.process_command(cmd) {
            Err(e) => {
                let stored_command = stored_command_builder.finish_with_error(&e);
                self.store
                    .store_command(stored_command)
                    .map_err(AggregateStoreError::KeyStoreError)?;
                Err(e)
            }
            Ok(events) => {
                if events.is_empty() {
                    Ok(latest)
                } else {
                    let agg = Arc::make_mut(&mut latest);

                    // Using a lock on the hashmap here to ensure that all updates happen
                    // sequentially. It would be better to get a lock for this specific
                    // aggregate only, so it may be worth rethinking the structure.
                    //
                    // That said, saving and applying events is really quick, so this should
                    // not hurt performance much.
                    //
                    // Also note that we do not need the lock to update the inner arc in the
                    // cache. We just need it to be in scope until we are done updating.
                    let mut cache = self.cache.write().unwrap();

                    // It should be impossible to get events for the wrong aggregate, or for
                    // the wrong versions, because we do the update here inside the outer
                    // lock, and aggregates generally do not lie about who they are.
                    //
                    // Still, some defensive coding in case we do have an issue: double-check
                    // that the events are for this aggregate, and form a contiguous sequence
                    // of versions starting at the aggregate's current version.
                    let version_before = agg.version();
                    for (i, event) in events.iter().enumerate() {
                        if event.version() != version_before + i as u64 || event.handle() != &handle {
                            return Err(A::Error::from(
                                AggregateStoreError::WrongEventForAggregate,
                            ));
                        }
                    }

                    // Time to start saving things.
                    let stored_command =
                        stored_command_builder.finish_with_events(events.as_slice());
                    self.store
                        .store_command(stored_command)
                        .map_err(AggregateStoreError::KeyStoreError)?;

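                    // Store and apply each event in turn, saving a full
                    // snapshot of the aggregate every SNAPSHOT_FREQ versions.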
                    for event in &events {
                        self.store
                            .store_event(event)
                            .map_err(AggregateStoreError::KeyStoreError)?;

                        agg.apply(event.clone());
                        if agg.version() % SNAPSHOT_FREQ == 0 {
                            self.store
                                .store_aggregate(&handle, agg)
                                .map_err(AggregateStoreError::KeyStoreError)?;
                        }
                    }

                    cache.insert(handle, Arc::new(agg.clone()));

                    // Only send this to listeners after everything has been saved.
                    for event in events {
                        for listener in &self.listeners {
                            listener.as_ref().listen(agg, &event);
                        }
                    }

                    Ok(latest)
                }
            }
        }
    }

    fn has(&self, id: &Handle) -> bool {
        let _lock = self.outer_lock.read().unwrap();
        self.store.has_aggregate(id)
    }

    fn list(&self) -> Vec<Handle> {
        let _lock = self.outer_lock.read().unwrap();
        self.store.aggregates()
    }

    fn add_listener<L: EventListener<A>>(&mut self, listener: Arc<L>) {
        let _lock = self.outer_lock.write().unwrap();
        self.listeners.push(listener)
    }

    fn history(&self, id: &Handle) -> StoreResult<AggregateHistory<A>> {
        self.store
            .history::<A>(id)
            .map_err(AggregateStoreError::KeyStoreError)
    }
}