Skip to main content

atomr_patterns/cqrs/
projection.rs

1//! [`ProjectionHandle`] — typed handle to a reader's projection state.
2//!
3//! Returned from [`super::CqrsBuilder::with_reader`]. Hold on to it; the
4//! reader runner spawned by `materialize` updates the same `Arc` you
5//! get here.
6
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Arc;
9
10use tokio::sync::RwLock;
11
12/// Read-only access to a projection's current state and offset.
13///
14/// `P` is the user's projection type (the read model). Cloning a handle
15/// is cheap — the underlying state is shared.
16pub struct ProjectionHandle<P> {
17    pub(crate) state: Arc<RwLock<P>>,
18    pub(crate) offset: Arc<AtomicU64>,
19}
20
21impl<P> Clone for ProjectionHandle<P> {
22    fn clone(&self) -> Self {
23        Self { state: self.state.clone(), offset: self.offset.clone() }
24    }
25}
26
27impl<P: Send + Sync + 'static> ProjectionHandle<P> {
28    /// Highest journal sequence number the runner has applied.
29    /// Useful for tests that wait until the projection has caught up.
30    pub fn offset(&self) -> u64 {
31        self.offset.load(Ordering::Acquire)
32    }
33
34    /// Take a read lock on the projection state.
35    pub async fn snapshot(&self) -> tokio::sync::RwLockReadGuard<'_, P> {
36        self.state.read().await
37    }
38
39    /// Apply a closure to the projection state under a read lock and
40    /// return the result. Convenience wrapper around [`Self::snapshot`].
41    pub async fn read<R>(&self, f: impl FnOnce(&P) -> R) -> R {
42        let guard = self.state.read().await;
43        f(&*guard)
44    }
45}