atomr_patterns/cqrs/projection.rs
1//! [`ProjectionHandle`] — typed handle to a reader's projection state.
2//!
3//! Returned from [`super::CqrsBuilder::with_reader`]. Hold on to it; the
4//! reader runner spawned by `materialize` updates the same `Arc` you
5//! get here.
6
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Arc;
9
10use tokio::sync::RwLock;
11
12/// Read-only access to a projection's current state and offset.
13///
14/// `P` is the user's projection type (the read model). Cloning a handle
15/// is cheap — the underlying state is shared.
16pub struct ProjectionHandle<P> {
17 pub(crate) state: Arc<RwLock<P>>,
18 pub(crate) offset: Arc<AtomicU64>,
19}
20
21impl<P> Clone for ProjectionHandle<P> {
22 fn clone(&self) -> Self {
23 Self { state: self.state.clone(), offset: self.offset.clone() }
24 }
25}
26
27impl<P: Send + Sync + 'static> ProjectionHandle<P> {
28 /// Highest journal sequence number the runner has applied.
29 /// Useful for tests that wait until the projection has caught up.
30 pub fn offset(&self) -> u64 {
31 self.offset.load(Ordering::Acquire)
32 }
33
34 /// Take a read lock on the projection state.
35 pub async fn snapshot(&self) -> tokio::sync::RwLockReadGuard<'_, P> {
36 self.state.read().await
37 }
38
39 /// Apply a closure to the projection state under a read lock and
40 /// return the result. Convenience wrapper around [`Self::snapshot`].
41 pub async fn read<R>(&self, f: impl FnOnce(&P) -> R) -> R {
42 let guard = self.state.read().await;
43 f(&*guard)
44 }
45}