use crate::core::error::Result;
use crate::core::id::{EdgeId, NodeId, VersionId};
use crate::core::temporal::Timestamp;
use std::sync::Arc;
#[cfg(feature = "observability")]
use tracing;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StorageEvent {
NodeAnchorCreated {
version_id: VersionId,
node_id: NodeId,
timestamp: Timestamp,
},
EdgeAnchorCreated {
version_id: VersionId,
edge_id: EdgeId,
timestamp: Timestamp,
},
NodeVersionCreated {
version_id: VersionId,
node_id: NodeId,
timestamp: Timestamp,
is_anchor: bool,
},
EdgeVersionCreated {
version_id: VersionId,
edge_id: EdgeId,
timestamp: Timestamp,
is_anchor: bool,
},
}
impl StorageEvent {
pub fn timestamp(&self) -> Timestamp {
match self {
StorageEvent::NodeAnchorCreated { timestamp, .. }
| StorageEvent::EdgeAnchorCreated { timestamp, .. }
| StorageEvent::NodeVersionCreated { timestamp, .. }
| StorageEvent::EdgeVersionCreated { timestamp, .. } => *timestamp,
}
}
pub fn is_anchor_event(&self) -> bool {
matches!(
self,
StorageEvent::NodeAnchorCreated { .. } | StorageEvent::EdgeAnchorCreated { .. }
)
}
}
pub trait StorageObserver: Send + Sync {
fn on_event(&self, event: &StorageEvent) -> Result<()>;
fn interested_in(&self, event: &StorageEvent) -> bool {
let _ = event;
true
}
}
pub type Observer = Arc<dyn StorageObserver>;
pub fn notify_observers(observers: &[Observer], event: &StorageEvent) {
for observer in observers {
if !observer.interested_in(event) {
continue;
}
if let Err(e) = observer.on_event(event) {
#[cfg(feature = "observability")]
{
use crate::core::error::Error;
match &e {
Error::Vector(ve) => {
tracing::warn!("Observer error for event {:?}: {}", event, ve);
}
_ => {
tracing::warn!("Observer error for event {:?}: {:?}", event, e);
}
}
}
#[cfg(not(feature = "observability"))]
{
let _ = e; }
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::id::{NodeId, VersionId};
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
struct CountingObserver {
count: AtomicUsize,
}
impl StorageObserver for CountingObserver {
fn on_event(&self, _event: &StorageEvent) -> Result<()> {
self.count.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}
struct AnchorOnlyObserver {
count: AtomicUsize,
}
impl StorageObserver for AnchorOnlyObserver {
fn on_event(&self, _event: &StorageEvent) -> Result<()> {
self.count.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn interested_in(&self, event: &StorageEvent) -> bool {
event.is_anchor_event()
}
}
struct CollectingObserver {
events: Mutex<Vec<StorageEvent>>,
}
impl StorageObserver for CollectingObserver {
fn on_event(&self, event: &StorageEvent) -> Result<()> {
self.events.lock().unwrap().push(event.clone());
Ok(())
}
}
#[test]
fn test_notify_observers() {
let observer = Arc::new(CountingObserver {
count: AtomicUsize::new(0),
});
let observers: Vec<Observer> = vec![Arc::clone(&observer) as Observer];
let event = StorageEvent::NodeAnchorCreated {
version_id: VersionId::new(1).unwrap(),
node_id: NodeId::new(1).unwrap(),
timestamp: 1000.into(),
};
notify_observers(&observers, &event);
assert_eq!(observer.count.load(Ordering::SeqCst), 1);
}
#[test]
fn test_multiple_observers() {
let observer1 = Arc::new(CountingObserver {
count: AtomicUsize::new(0),
});
let observer2 = Arc::new(CountingObserver {
count: AtomicUsize::new(0),
});
let observers: Vec<Observer> = vec![
Arc::clone(&observer1) as Observer,
Arc::clone(&observer2) as Observer,
];
let event = StorageEvent::NodeAnchorCreated {
version_id: VersionId::new(1).unwrap(),
node_id: NodeId::new(1).unwrap(),
timestamp: 1000.into(),
};
notify_observers(&observers, &event);
assert_eq!(observer1.count.load(Ordering::SeqCst), 1);
assert_eq!(observer2.count.load(Ordering::SeqCst), 1);
}
#[test]
fn test_filtered_observer() {
let anchor_observer = Arc::new(AnchorOnlyObserver {
count: AtomicUsize::new(0),
});
let observers: Vec<Observer> = vec![Arc::clone(&anchor_observer) as Observer];
let anchor_event = StorageEvent::NodeAnchorCreated {
version_id: VersionId::new(1).unwrap(),
node_id: NodeId::new(1).unwrap(),
timestamp: 1000.into(),
};
notify_observers(&observers, &anchor_event);
assert_eq!(anchor_observer.count.load(Ordering::SeqCst), 1);
let version_event = StorageEvent::NodeVersionCreated {
version_id: VersionId::new(2).unwrap(),
node_id: NodeId::new(1).unwrap(),
timestamp: 2000.into(),
is_anchor: false,
};
notify_observers(&observers, &version_event);
assert_eq!(anchor_observer.count.load(Ordering::SeqCst), 1); }
#[test]
fn test_event_timestamp() {
let event = StorageEvent::NodeAnchorCreated {
version_id: VersionId::new(1).unwrap(),
node_id: NodeId::new(1).unwrap(),
timestamp: 12345.into(),
};
assert_eq!(event.timestamp().wallclock(), 12345);
}
#[test]
fn test_event_collection() {
let collector = Arc::new(CollectingObserver {
events: Mutex::new(Vec::new()),
});
let observers: Vec<Observer> = vec![Arc::clone(&collector) as Observer];
let event1 = StorageEvent::NodeAnchorCreated {
version_id: VersionId::new(1).unwrap(),
node_id: NodeId::new(1).unwrap(),
timestamp: 1000.into(),
};
let event2 = StorageEvent::NodeAnchorCreated {
version_id: VersionId::new(2).unwrap(),
node_id: NodeId::new(2).unwrap(),
timestamp: 2000.into(),
};
notify_observers(&observers, &event1);
notify_observers(&observers, &event2);
let collected = collector.events.lock().unwrap();
assert_eq!(collected.len(), 2);
assert_eq!(collected[0], event1);
assert_eq!(collected[1], event2);
}
#[test]
#[should_panic(expected = "Observer panic!")]
fn test_notify_observers_propagates_panic() {
struct PanickingObserver;
impl StorageObserver for PanickingObserver {
fn on_event(&self, _event: &StorageEvent) -> Result<()> {
panic!("Observer panic!");
}
}
let observer = Arc::new(PanickingObserver);
let observers: Vec<Observer> = vec![observer as Observer];
let event = StorageEvent::NodeAnchorCreated {
version_id: VersionId::new(1).unwrap(),
node_id: NodeId::new(1).unwrap(),
timestamp: 1000.into(),
};
notify_observers(&observers, &event);
}
}
#[cfg(test)]
mod sentry_tests {
use super::*;
use crate::core::id::{NodeId, VersionId};
use std::sync::atomic::{AtomicUsize, Ordering};
struct TrackingObserver {
interested: bool,
should_fail: bool,
call_count: AtomicUsize,
}
impl TrackingObserver {
fn new(interested: bool, should_fail: bool) -> Self {
Self {
interested,
should_fail,
call_count: AtomicUsize::new(0),
}
}
fn count(&self) -> usize {
self.call_count.load(Ordering::SeqCst)
}
}
impl StorageObserver for TrackingObserver {
fn on_event(&self, _event: &StorageEvent) -> Result<()> {
self.call_count.fetch_add(1, Ordering::SeqCst);
if self.should_fail {
Err(crate::core::error::Error::Other(
"Intentional failure".into(),
))
} else {
Ok(())
}
}
fn interested_in(&self, _event: &StorageEvent) -> bool {
self.interested
}
}
#[test]
fn test_sentry_filtering_does_not_block_subsequent_observers() {
let uninterested = Arc::new(TrackingObserver::new(false, false));
let interested = Arc::new(TrackingObserver::new(true, false));
let observers: Vec<Observer> = vec![
Arc::clone(&uninterested) as Observer,
Arc::clone(&interested) as Observer,
];
let event = StorageEvent::NodeAnchorCreated {
version_id: VersionId::new(1).unwrap(),
node_id: NodeId::new(1).unwrap(),
timestamp: 1000.into(),
};
notify_observers(&observers, &event);
assert_eq!(
uninterested.count(),
0,
"Uninterested observer should not be called"
);
assert_eq!(
interested.count(),
1,
"Subsequent interested observer SHOULD be called"
);
}
#[test]
fn test_sentry_error_does_not_block_subsequent_observers() {
let failing = Arc::new(TrackingObserver::new(true, true));
let success = Arc::new(TrackingObserver::new(true, false));
let observers: Vec<Observer> = vec![
Arc::clone(&failing) as Observer,
Arc::clone(&success) as Observer,
];
let event = StorageEvent::NodeAnchorCreated {
version_id: VersionId::new(1).unwrap(),
node_id: NodeId::new(1).unwrap(),
timestamp: 1000.into(),
};
notify_observers(&observers, &event);
assert_eq!(failing.count(), 1, "Failing observer should be called");
assert_eq!(
success.count(),
1,
"Subsequent observer SHOULD be called despite previous error"
);
}
}