intrepid_model/events/
event_repo.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
use super::{event::EventLogReadMarker, Event, EventKind, EventLog, EventRecord, IntoEvent};

/// Errors that can occur when interacting with an event repository.
#[derive(Debug, thiserror::Error)]
pub enum EventRepoError {
    /// An error occurred while trying to publish an event record.
    #[error("failed to publish event")]
    PublishFailed,
}

/// The trait for an event repository. Implement this trait to provide an event
/// repository interface for any given event storage implementation.
#[async_trait::async_trait]
pub trait EventRepo {
    /// Get all entries in the stream since the given position.
    async fn entries_since_position(
        &self,
        stream_name: impl AsRef<str> + Send,
        position: i64,
    ) -> Result<EventLog, EventRepoError>;

    /// Publish an event to the target stream.
    async fn publish<EventCandidate>(&self, event: EventCandidate) -> Result<(), EventRepoError>
    where
        EventCandidate: IntoEvent + Send;

    /// Get the last event in the stream.
    async fn last(
        &self,
        stream_name: impl AsRef<str> + Send,
        kind: EventKind,
    ) -> Option<EventRecord>;

    /// Log the last read position for a subscriber. This leverages markers.
    async fn log_last_read(
        &self,
        stream_name: impl AsRef<str> + Send,
        subscriber_name: impl AsRef<str> + Send,
        position: i64,
    ) -> Result<(), EventRepoError> {
        self.publish(Event::read_marker(stream_name, subscriber_name, position))
            .await?;

        Ok(())
    }

    /// Get the last read position for a subscriber.
    async fn last_read_position(&self, stream_name: impl AsRef<str> + Send) -> i64 {
        self.last(stream_name, EventKind::Marker)
            .await
            // TODO: Can this be something other than serde_json? It's a bit heavy for this.
            .and_then(|message| serde_json::from_slice::<EventLogReadMarker>(&message.data).ok())
            .map(|marker| marker.position)
            .unwrap_or(-1_i64)
    }

    /// Get the last event position in the stream.
    async fn last_event_position(&self, stream_name: impl AsRef<str> + Send) -> i64 {
        self.last(stream_name, EventKind::Entry)
            .await
            .map(|message| message.position)
            .unwrap_or(-1_i64)
    }
}