eventually_util/inmemory/projector.rs
1use std::error::Error as StdError;
2use std::sync::Arc;
3
4use eventually_core::projection::Projection;
5use eventually_core::subscription::Subscription;
6
7use futures::stream::StreamExt;
8use futures::TryFutureExt;
9
10use crate::sync::RwLock;
11
12/// A `Projector` manages the state of a single [`Projection`]
13/// by opening a long-running stream of all events coming from the [`EventStore`].
14///
15/// New instances of a `Projector` are obtainable through a [`ProjectorBuilder`]
16/// instance.
17///
18/// The `Projector` will start updating the [`Projection`] state when [`run`]
19/// is called.
20///
21/// At each update, the `Projector` will broadcast the latest version of the
22/// [`Projection`] on a `Stream` obtainable through [`watch`].
23///
24/// [`Projection`]: ../../../eventually-core/projection/trait.Projection.html
25/// [`EventStore`]: ../../../eventually-core/store/trait.EventStore.html
26/// [`ProjectorBuilder`]: struct.ProjectorBuilder.html
27/// [`run`]: struct.Projector.html#method.run
28/// [`watch`]: struct.Projector.html#method.watch
29pub struct Projector<P, S>
30where
31 P: Projection,
32{
33 projection: Arc<RwLock<P>>,
34 subscription: S,
35}
36
37impl<P, S> Projector<P, S>
38where
39 P: Projection,
40 <P as Projection>::SourceId: std::fmt::Debug,
41 <P as Projection>::Event: std::fmt::Debug,
42 S: Subscription<SourceId = P::SourceId, Event = P::Event>,
43 // NOTE: these bounds are needed for anyhow::Error conversion.
44 <P as Projection>::Error: StdError + Send + Sync + 'static,
45 <S as Subscription>::Error: StdError + Send + Sync + 'static,
46{
47 /// Create a new Projector from the provided [`Projection`] and
48 /// [`Subscription`] values.
49 ///
50 /// [`Projection`]: ../../eventually-core/projection/trait.Projection.html
51 /// [`Subscription`]: ../../eventually-core/subscription/trait.Subscription.html
52 pub fn new(projection: Arc<RwLock<P>>, subscription: S) -> Self {
53 Self {
54 projection,
55 subscription,
56 }
57 }
58
59 /// Starts the update of the `Projection` by processing all the events
60 /// coming from the [`EventStore`].
61 ///
62 /// [`EventStore`]: ../../../eventually-core/store/trait.EventStore.html
63 pub async fn run(&mut self) -> anyhow::Result<()> {
64 #[cfg(feature = "with-tracing")]
65 let projection_type = std::any::type_name::<P>();
66
67 let mut stream = self.subscription.resume().await?;
68
69 while let Some(result) = stream.next().await {
70 let event = result?;
71 let sequence_number = event.sequence_number();
72
73 #[cfg(feature = "with-tracing")]
74 tracing::debug!(
75 sequence_number = sequence_number,
76 projection_type = projection_type,
77 event = ?event,
78 "Projecting new event",
79 );
80
81 self.projection
82 .write()
83 .await
84 .project(event)
85 .inspect_ok(|_| {
86 #[cfg(feature = "with-tracing")]
87 tracing::debug!(
88 sequence_number = sequence_number,
89 projection_type = projection_type,
90 "Projection succeeded"
91 );
92 })
93 .inspect_err(
94 #[allow(unused_variables)]
95 {
96 |e| {
97 #[cfg(feature = "with-tracing")]
98 tracing::error!(
99 error = %e,
100 sequence_number = sequence_number,
101 projection_type = projection_type,
102 "Projection failed"
103 )
104 }
105 },
106 )
107 .await
108 .map_err(anyhow::Error::from)?;
109
110 self.subscription
111 .checkpoint(sequence_number)
112 .inspect_ok(|_| {
113 #[cfg(feature = "with-tracing")]
114 tracing::debug!(
115 sequence_number = sequence_number,
116 projection_type = projection_type,
117 "Subscription checkpointed"
118 );
119 })
120 .inspect_err(
121 #[allow(unused_variables)]
122 {
123 |e| {
124 #[cfg(feature = "with-tracing")]
125 tracing::error!(
126 error = %e,
127 sequence_number = sequence_number,
128 projection_type = projection_type,
129 "Failed to checkpoint subscription"
130 )
131 }
132 },
133 )
134 .await
135 .map_err(anyhow::Error::from)?;
136 }
137
138 Ok(())
139 }
140}