use super::flow as projection_flow;
use super::Freshness;
use crate::event::EventSourced;
use crate::store::delivery::subscription::Subscription;
use crate::store::{Open, Store, StoreError};
use std::sync::Arc;
#[derive(Debug)]
#[non_exhaustive]
pub enum WatcherError {
StoreClosed,
Store(StoreError),
}
impl std::fmt::Display for WatcherError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::StoreClosed => write!(
f,
"projection watcher stopped: underlying notification channel closed"
),
Self::Store(e) => write!(f, "projection watcher failed: {e}"),
}
}
}
impl std::error::Error for WatcherError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::StoreClosed => None,
Self::Store(e) => Some(e),
}
}
}
impl From<StoreError> for WatcherError {
fn from(e: StoreError) -> Self {
Self::Store(e)
}
}
pub struct ProjectionWatcher<T> {
sub: Subscription,
store: Arc<Store<Open>>,
entity: String,
freshness: Freshness,
last_delivered_generation: u64,
pending_initial_check: bool,
_phantom: std::marker::PhantomData<T>,
}
impl<T> ProjectionWatcher<T> {
pub(crate) fn new(
sub: Subscription,
store: Arc<Store<Open>>,
entity: String,
freshness: Freshness,
last_seen_generation: u64,
pending_initial_check: bool,
) -> Self {
Self {
sub,
store,
entity,
freshness,
last_delivered_generation: last_seen_generation,
pending_initial_check,
_phantom: std::marker::PhantomData,
}
}
}
impl<T> ProjectionWatcher<T>
where
T: EventSourced + serde::Serialize + serde::de::DeserializeOwned + 'static,
T::Input: projection_flow::ReplayInput,
{
fn wait_for_check_or_notification(&mut self) -> Result<(), WatcherError> {
if self.pending_initial_check {
self.pending_initial_check = false;
return Ok(());
}
self.sub.recv().map(|_| ()).ok_or(WatcherError::StoreClosed)
}
fn project_next_change(&self) -> Result<Option<(u64, Option<T>)>, WatcherError> {
projection_flow::project_if_changed::<T, Open>(
&self.store,
&self.entity,
self.last_delivered_generation,
&self.freshness,
)
.map_err(WatcherError::from)
}
pub fn recv(&mut self) -> Result<(u64, Option<T>), WatcherError> {
loop {
self.wait_for_check_or_notification()?;
match self.project_next_change()? {
Some((returned_gen, projected)) => {
if returned_gen <= self.last_delivered_generation {
continue;
}
self.last_delivered_generation = returned_gen;
return Ok((returned_gen, projected));
}
None => {
continue;
}
}
}
}
#[doc(hidden)]
pub fn subscription(&self) -> &Subscription {
&self.sub
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::coordinate::Coordinate;
use crate::event::{Event, EventKind, JsonValueInput};
use crate::store::StoreConfig;
use std::sync::mpsc;
use std::time::Duration;
use tempfile::TempDir;
#[derive(Default, Debug, serde::Serialize, serde::Deserialize, PartialEq)]
struct CountAll(u64);
impl EventSourced for CountAll {
type Input = JsonValueInput;
fn from_events(events: &[Event<serde_json::Value>]) -> Option<Self> {
(!events.is_empty()).then_some(Self(events.len() as u64))
}
fn apply_event(&mut self, _event: &Event<serde_json::Value>) {
self.0 += 1;
}
fn relevant_event_kinds() -> &'static [EventKind] {
&[]
}
}
#[test]
fn watcher_error_display_names_terminal_and_store_errors() {
assert_eq!(
WatcherError::StoreClosed.to_string(),
"projection watcher stopped: underlying notification channel closed",
"PROPERTY: terminal watcher closure should remain visible in Display output"
);
let store_error = StoreError::Configuration("bad watcher config".to_owned());
let error = WatcherError::Store(store_error);
let display = error.to_string();
assert!(
display.contains("projection watcher failed"),
"PROPERTY: wrapped store errors should retain watcher context in Display output"
);
assert!(
display.contains("bad watcher config"),
"PROPERTY: wrapped store errors should retain their inner diagnostic message"
);
assert!(
std::error::Error::source(&error).is_some(),
"PROPERTY: wrapped store errors should remain available through source()"
);
}
#[test]
fn recv_performs_pending_initial_check_before_blocking_on_subscription() {
let dir = TempDir::new().expect("temp dir");
let store = Arc::new(Store::open(StoreConfig::new(dir.path())).expect("open"));
let coord = Coordinate::new("watch:startup-race", "watch:scope").expect("coord");
let sub = store.subscribe_lossy(&crate::coordinate::Region::entity("watch:startup-race"));
store
.append(
&coord,
EventKind::custom(0xF, 1),
&serde_json::json!({"n": 1}),
)
.expect("append");
let mut watcher = ProjectionWatcher::<CountAll>::new(
sub,
Arc::clone(&store),
"watch:startup-race".to_owned(),
Freshness::Consistent,
0,
true,
);
let (tx, rx) = mpsc::channel();
std::thread::Builder::new()
.name("projection-watch-pending-check-test".to_owned())
.spawn(move || {
let result = watcher
.recv()
.map(|(generation, state)| (generation, state.map(|s| s.0)));
let _ = tx.send(result);
})
.expect("spawn watcher test helper thread");
let result = rx
.recv_timeout(Duration::from_secs(1))
.expect("pending initial check should return without a second append")
.expect("watcher recv");
assert!(
result.0 > 0,
"generation should advance on the first append"
);
assert_eq!(result.1, Some(1));
}
}