pulsedb/watch/types.rs
1//! Public types for the watch system.
2//!
3//! These types define the event model for real-time experience notifications
4//! and the [`WatchStream`] adapter that bridges sync crossbeam channels to
5//! the async [`futures_core::Stream`] interface.
6
7use std::pin::Pin;
8use std::sync::Arc;
9use std::task::{Context, Poll};
10
11use atomic_waker::AtomicWaker;
12use crossbeam_channel::Receiver;
13use futures_core::Stream;
14
15use crate::experience::ExperienceType;
16use crate::storage::schema::{WatchEventRecord, WatchEventTypeTag};
17use crate::types::{CollectiveId, ExperienceId, Timestamp};
18
19/// An event emitted when an experience changes.
20///
21/// Watch events are delivered in-process via bounded crossbeam channels.
22/// Each event identifies the experience, collective, mutation type, and
23/// when it occurred.
24///
25/// # Example
26///
27/// ```rust,no_run
28/// # #[tokio::main]
29/// # async fn main() -> pulsedb::Result<()> {
30/// # let dir = tempfile::tempdir().unwrap();
31/// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
32/// # let collective_id = db.create_collective("example")?;
33/// use futures::StreamExt;
34/// use pulsedb::WatchEventType;
35///
36/// let mut stream = db.watch_experiences(collective_id)?;
37/// while let Some(event) = stream.next().await {
38/// match event.event_type {
39/// WatchEventType::Created => println!("New experience: {}", event.experience_id),
40/// WatchEventType::Deleted => println!("Removed: {}", event.experience_id),
41/// _ => {}
42/// }
43/// }
44/// # Ok(())
45/// # }
46/// ```
47#[derive(Clone, Debug)]
48pub struct WatchEvent {
49 /// The experience that changed.
50 pub experience_id: ExperienceId,
51
52 /// The collective the experience belongs to.
53 pub collective_id: CollectiveId,
54
55 /// What kind of change occurred.
56 pub event_type: WatchEventType,
57
58 /// When the change occurred.
59 pub timestamp: Timestamp,
60}
61
62/// The kind of change that triggered a [`WatchEvent`].
63#[derive(Clone, Copy, Debug, PartialEq, Eq)]
64pub enum WatchEventType {
65 /// A new experience was recorded.
66 Created,
67
68 /// An existing experience was modified (fields updated or reinforced).
69 Updated,
70
71 /// An experience was soft-deleted (archived).
72 Archived,
73
74 /// An experience was permanently deleted.
75 Deleted,
76}
77
78// ============================================================================
79// Conversions between storage tags and public types
80// ============================================================================
81
82impl From<WatchEventType> for WatchEventTypeTag {
83 fn from(value: WatchEventType) -> Self {
84 match value {
85 WatchEventType::Created => Self::Created,
86 WatchEventType::Updated => Self::Updated,
87 WatchEventType::Archived => Self::Archived,
88 WatchEventType::Deleted => Self::Deleted,
89 }
90 }
91}
92
93impl From<WatchEventTypeTag> for WatchEventType {
94 fn from(value: WatchEventTypeTag) -> Self {
95 match value {
96 WatchEventTypeTag::Created => Self::Created,
97 WatchEventTypeTag::Updated => Self::Updated,
98 WatchEventTypeTag::Archived => Self::Archived,
99 WatchEventTypeTag::Deleted => Self::Deleted,
100 }
101 }
102}
103
104impl From<WatchEventRecord> for WatchEvent {
105 fn from(record: WatchEventRecord) -> Self {
106 Self {
107 experience_id: ExperienceId::from_bytes(record.entity_id),
108 collective_id: CollectiveId::from_bytes(record.collective_id),
109 event_type: record.event_type.into(),
110 timestamp: Timestamp::from_millis(record.timestamp_ms),
111 }
112 }
113}
114
115/// Filter for narrowing which watch events a subscriber receives.
116///
117/// All fields are optional. When multiple fields are set, they are combined
118/// with AND logic: an event must match **all** specified criteria.
119///
120/// Filters are applied on the sender side before channel delivery, so
121/// subscribers only receive events they care about.
122///
123/// # Example
124///
125/// ```rust
126/// # fn main() -> pulsedb::Result<()> {
127/// # let dir = tempfile::tempdir().unwrap();
128/// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
129/// # let collective_id = db.create_collective("example")?;
130/// use pulsedb::WatchFilter;
131///
132/// let filter = WatchFilter {
133/// domains: Some(vec!["security".to_string()]),
134/// min_importance: Some(0.7),
135/// ..Default::default()
136/// };
137/// let stream = db.watch_experiences_filtered(collective_id, filter)?;
138/// # Ok(())
139/// # }
140/// ```
141#[derive(Clone, Debug, Default)]
142pub struct WatchFilter {
143 /// Only emit events for experiences in these domains.
144 /// If `None`, all domains match.
145 pub domains: Option<Vec<String>>,
146
147 /// Only emit events for these experience types.
148 /// If `None`, all types match.
149 pub experience_types: Option<Vec<ExperienceType>>,
150
151 /// Only emit events for experiences with importance >= this threshold.
152 /// If `None`, all importance levels match.
153 pub min_importance: Option<f32>,
154}
155
156/// A stream of [`WatchEvent`] values backed by a crossbeam channel.
157///
158/// Implements [`futures_core::Stream`] for async consumption. The stream
159/// yields `Some(event)` for each change and returns `None` when all
160/// senders are dropped (database closed or subscriber removed).
161///
162/// Dropping a `WatchStream` automatically unregisters the subscriber
163/// from the watch service, preventing memory leaks.
164pub struct WatchStream {
165 /// The receiving end of the crossbeam bounded channel.
166 pub(crate) receiver: Receiver<WatchEvent>,
167
168 /// Shared waker that the sender side calls `.wake()` on after `try_send`.
169 pub(crate) waker: Arc<AtomicWaker>,
170
171 /// Cleanup function called on drop to remove subscriber from registry.
172 pub(crate) cleanup: Option<Box<dyn FnOnce() + Send + Sync>>,
173}
174
175impl Stream for WatchStream {
176 type Item = WatchEvent;
177
178 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
179 // Register the waker FIRST so we don't miss a wake between
180 // try_recv and returning Pending.
181 self.waker.register(cx.waker());
182
183 match self.receiver.try_recv() {
184 Ok(event) => Poll::Ready(Some(event)),
185 Err(crossbeam_channel::TryRecvError::Empty) => Poll::Pending,
186 Err(crossbeam_channel::TryRecvError::Disconnected) => Poll::Ready(None),
187 }
188 }
189}
190
191impl Drop for WatchStream {
192 fn drop(&mut self) {
193 if let Some(cleanup) = self.cleanup.take() {
194 cleanup();
195 }
196 }
197}
198
199// Safety: crossbeam Receiver is Send, AtomicWaker is Send+Sync, cleanup is Send+Sync.
200// WatchStream is not Sync (Receiver is not Sync), which is correct for Stream consumers.
201impl std::fmt::Debug for WatchStream {
202 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203 f.debug_struct("WatchStream")
204 .field("pending_events", &self.receiver.len())
205 .finish_non_exhaustive()
206 }
207}