eventually_util/inmemory/
projector.rs

1use std::error::Error as StdError;
2use std::sync::Arc;
3
4use eventually_core::projection::Projection;
5use eventually_core::subscription::Subscription;
6
7use futures::stream::StreamExt;
8use futures::TryFutureExt;
9
10use crate::sync::RwLock;
11
12/// A `Projector` manages the state of a single [`Projection`]
13/// by opening a long-running stream of all events coming from the [`EventStore`].
14///
15/// New instances of a `Projector` are obtainable through a [`ProjectorBuilder`]
16/// instance.
17///
18/// The `Projector` will start updating the [`Projection`] state when [`run`]
19/// is called.
20///
21/// At each update, the `Projector` will broadcast the latest version of the
22/// [`Projection`] on a `Stream` obtainable through [`watch`].
23///
24/// [`Projection`]: ../../../eventually-core/projection/trait.Projection.html
25/// [`EventStore`]: ../../../eventually-core/store/trait.EventStore.html
26/// [`ProjectorBuilder`]: struct.ProjectorBuilder.html
27/// [`run`]: struct.Projector.html#method.run
28/// [`watch`]: struct.Projector.html#method.watch
29pub struct Projector<P, S>
30where
31    P: Projection,
32{
33    projection: Arc<RwLock<P>>,
34    subscription: S,
35}
36
37impl<P, S> Projector<P, S>
38where
39    P: Projection,
40    <P as Projection>::SourceId: std::fmt::Debug,
41    <P as Projection>::Event: std::fmt::Debug,
42    S: Subscription<SourceId = P::SourceId, Event = P::Event>,
43    // NOTE: these bounds are needed for anyhow::Error conversion.
44    <P as Projection>::Error: StdError + Send + Sync + 'static,
45    <S as Subscription>::Error: StdError + Send + Sync + 'static,
46{
47    /// Create a new Projector from the provided [`Projection`] and
48    /// [`Subscription`] values.
49    ///
50    /// [`Projection`]: ../../eventually-core/projection/trait.Projection.html
51    /// [`Subscription`]: ../../eventually-core/subscription/trait.Subscription.html
52    pub fn new(projection: Arc<RwLock<P>>, subscription: S) -> Self {
53        Self {
54            projection,
55            subscription,
56        }
57    }
58
59    /// Starts the update of the `Projection` by processing all the events
60    /// coming from the [`EventStore`].
61    ///
62    /// [`EventStore`]: ../../../eventually-core/store/trait.EventStore.html
63    pub async fn run(&mut self) -> anyhow::Result<()> {
64        #[cfg(feature = "with-tracing")]
65        let projection_type = std::any::type_name::<P>();
66
67        let mut stream = self.subscription.resume().await?;
68
69        while let Some(result) = stream.next().await {
70            let event = result?;
71            let sequence_number = event.sequence_number();
72
73            #[cfg(feature = "with-tracing")]
74            tracing::debug!(
75                sequence_number = sequence_number,
76                projection_type = projection_type,
77                event = ?event,
78                "Projecting new event",
79            );
80
81            self.projection
82                .write()
83                .await
84                .project(event)
85                .inspect_ok(|_| {
86                    #[cfg(feature = "with-tracing")]
87                    tracing::debug!(
88                        sequence_number = sequence_number,
89                        projection_type = projection_type,
90                        "Projection succeeded"
91                    );
92                })
93                .inspect_err(
94                    #[allow(unused_variables)]
95                    {
96                        |e| {
97                            #[cfg(feature = "with-tracing")]
98                            tracing::error!(
99                                error = %e,
100                                sequence_number = sequence_number,
101                                projection_type = projection_type,
102                                "Projection failed"
103                            )
104                        }
105                    },
106                )
107                .await
108                .map_err(anyhow::Error::from)?;
109
110            self.subscription
111                .checkpoint(sequence_number)
112                .inspect_ok(|_| {
113                    #[cfg(feature = "with-tracing")]
114                    tracing::debug!(
115                        sequence_number = sequence_number,
116                        projection_type = projection_type,
117                        "Subscription checkpointed"
118                    );
119                })
120                .inspect_err(
121                    #[allow(unused_variables)]
122                    {
123                        |e| {
124                            #[cfg(feature = "with-tracing")]
125                            tracing::error!(
126                                error = %e,
127                                sequence_number = sequence_number,
128                                projection_type = projection_type,
129                                "Failed to checkpoint subscription"
130                            )
131                        }
132                    },
133                )
134                .await
135                .map_err(anyhow::Error::from)?;
136        }
137
138        Ok(())
139    }
140}