1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
use crate::{
    Cache, CacheConnection, CacheRecord, CacheRepo, CacheRepoError, EventConnection, EventKind,
    EventLog, EventRecord, EventRepo, EventRepoError, InterpidRepo, IntoCache, IntoEvent,
};

/// A connection to an intrepid repository that combines the functionality of a
/// cache and an event connection.
///
/// The wrapper stores a single repository value; the [`CacheRepo`] and
/// [`EventRepo`] implementations in this file forward every call to it.
#[derive(Clone, Debug)]
pub struct IntrepidConnection<Repo> {
    /// The underlying repository that all calls are delegated to.
    pub repo: Repo,
}

impl<Repo> IntrepidConnection<Repo>
where
    Repo: InterpidRepo + Clone + Send + Sync + 'static,
{
    /// Create a new connection.
    pub fn new(repo: Repo) -> Self {
        Self { repo }
    }

    /// Accept a stream name and marker stream name, and return an event connection
    pub fn event_connection(
        &self,
        target_stream_name: impl AsRef<str> + Send,
        own_stream_name: impl AsRef<str> + Send,
    ) -> EventConnection<Repo> {
        EventConnection::new(self.repo.clone(), target_stream_name, own_stream_name)
    }

    /// Get a cache connection
    pub fn cache_connection(&self) -> CacheConnection<Repo> {
        CacheConnection::new(self.repo.clone())
    }

    /// Consume the connection and return the inner repository
    pub fn into_inner(self) -> Repo {
        self.repo
    }
}

#[async_trait::async_trait]
impl<Repo> CacheRepo for IntrepidConnection<Repo>
where
    Repo: InterpidRepo + Send + Sync,
{
    /// Forward `get_memo` for `uri` to the wrapped repository and return its
    /// result unchanged.
    async fn get_memo(
        &self,
        uri: impl AsRef<str> + Send,
    ) -> Result<Cache<CacheRecord>, CacheRepoError> {
        let pending = self.repo.get_memo(uri);
        pending.await
    }

    /// Forward `set_memo` for `record` to the wrapped repository and return
    /// its result unchanged.
    async fn set_memo<CacheCandidate>(
        &self,
        record: CacheCandidate,
    ) -> Result<CacheRecord, CacheRepoError>
    where
        CacheCandidate: IntoCache + Send,
    {
        let pending = self.repo.set_memo(record);
        pending.await
    }
}

#[async_trait::async_trait]
impl<Repo> EventRepo for IntrepidConnection<Repo>
where
    Repo: InterpidRepo + Send + Sync,
{
    /// Forward `entries_since_position` to the wrapped repository, passing
    /// `stream_name` and `position` through unchanged.
    async fn entries_since_position(
        &self,
        stream_name: impl AsRef<str> + Send,
        position: i64,
    ) -> Result<EventLog, EventRepoError> {
        let pending = self.repo.entries_since_position(stream_name, position);
        pending.await
    }

    /// Forward `publish` for `event` to the wrapped repository and return its
    /// result unchanged.
    async fn publish<EventCandidate>(&self, event: EventCandidate) -> Result<(), EventRepoError>
    where
        EventCandidate: IntoEvent + Send,
    {
        let pending = self.repo.publish(event);
        pending.await
    }

    /// Forward `last` to the wrapped repository, passing `stream_name` and
    /// `kind` through unchanged.
    async fn last(
        &self,
        stream_name: impl AsRef<str> + Send,
        kind: EventKind,
    ) -> Option<EventRecord> {
        let pending = self.repo.last(stream_name, kind);
        pending.await
    }
}