1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
use super::{Commit, CommitError, CommitResult, CommitStore, Event};
use crate::{EntityId, Model};
use async_trait::async_trait;
use futures::lock::Mutex;
use futures::stream::{self, BoxStream, StreamExt, TryStreamExt};
use std::collections::HashMap;
use std::iter;
use std::sync::Arc;

/// In-memory commit store backed by a shared, mutex-guarded `HashMap`.
///
/// Each map entry is keyed by an `EntityId` and holds a pair:
/// the commit that was inserted for a `Create` event, and a `Vec` of the
/// `Change` commits subsequently pushed for that entity, in push order.
///
/// The map lives behind an `Arc`, so cloning a `MemStore` yields another
/// handle to the same underlying map rather than a copy of its contents.
#[derive(Debug)]
pub struct MemStore<M: Model>(Arc<Mutex<HashMap<EntityId, (Commit<M>, Vec<Commit<M>>)>>>);

impl<M: Model> MemStore<M> {
    /// Creates a store that contains no entities.
    pub fn new() -> Self {
        let entities = HashMap::new();
        let shared = Arc::new(Mutex::new(entities));
        Self(shared)
    }
}

#[async_trait]
impl<M: Model> CommitStore<M> for MemStore<M> {
    /// Streams the id of every entity currently present in the map.
    ///
    /// The ids are snapshotted into a `Vec` when the stream is first polled,
    /// so the yielded items reflect the map's contents at that moment.
    fn keys(&self) -> BoxStream<CommitResult<EntityId>> {
        let entities = self.0.clone();
        let snapshot = async move {
            let guard = entities.lock().await;
            let ids: Vec<CommitResult<EntityId>> = guard.keys().copied().map(Ok).collect();
            stream::iter(ids)
        };
        stream::once(snapshot).flatten().boxed()
    }

    /// Streams the commits recorded for `id`: first the commit stored at the
    /// head of the entry, then every commit pushed after it, in push order.
    ///
    /// Yields a single `CommitError::NotFound` error if the map has no entry
    /// for `id`.
    fn change_list(&self, id: EntityId) -> BoxStream<CommitResult<Commit<M>>> {
        let entities = self.0.clone();
        let snapshot = async move {
            let guard = entities.lock().await;
            let (first, rest) = guard.get(&id).ok_or(CommitError::NotFound)?;
            // Clone the whole history out of the map so the resulting stream
            // owns its items and does not borrow from the lock guard.
            let history: Vec<CommitResult<Commit<M>>> = iter::once(first)
                .chain(rest.iter())
                .cloned()
                .map(Ok)
                .collect();
            Ok(stream::iter(history))
        };
        stream::once(snapshot).try_flatten().boxed()
    }

    /// Records the commit `c` under the entity id reported by its event.
    ///
    /// * `Event::Create` inserts a new entry with `c` at its head and an
    ///   empty list of later commits, replacing any entry already stored
    ///   under the same id.
    /// * `Event::Change` pushes `c` onto the existing entry's list, or fails
    ///   with `CommitError::CantChange` when no entry exists for the id.
    async fn commit(&self, c: Commit<M>) -> Result<(), CommitError> {
        let entity_id = c.event.entity_id();
        let mut store = self.0.lock().await;
        match c.event {
            Event::Create(_) => {
                store.insert(entity_id, (c, Vec::new()));
            }
            Event::Change(_, _) => match store.get_mut(&entity_id) {
                Some((_, later_commits)) => later_commits.push(c),
                None => return Err(CommitError::CantChange),
            },
        }
        Ok(())
    }
}

impl<M: Model> Clone for MemStore<M> {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}