pulsedb/watch/types.rs
1//! Public types for the watch system.
2//!
3//! These types define the event model for real-time experience notifications
4//! and the [`WatchStream`] adapter that bridges sync crossbeam channels to
5//! the async [`futures_core::Stream`] interface.
6
7use std::pin::Pin;
8use std::sync::Arc;
9use std::task::{Context, Poll};
10
11use atomic_waker::AtomicWaker;
12use crossbeam_channel::Receiver;
13use futures_core::Stream;
14
15use crate::experience::{Experience, ExperienceType};
16use crate::storage::schema::{WatchEventRecord, WatchEventTypeTag};
17use crate::types::{CollectiveId, ExperienceId, Timestamp};
18
19/// An event emitted when an experience changes.
20///
21/// Watch events are delivered in-process via bounded crossbeam channels.
22/// Each event identifies the experience, collective, mutation type, and
23/// when it occurred.
24///
25/// # Example
26///
27/// ```rust,no_run
28/// # #[tokio::main]
29/// # async fn main() -> pulsedb::Result<()> {
30/// # let dir = tempfile::tempdir().unwrap();
31/// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
32/// # let collective_id = db.create_collective("example")?;
33/// use futures::StreamExt;
34/// use pulsedb::WatchEventType;
35///
36/// let mut stream = db.watch_experiences(collective_id)?;
37/// while let Some(event) = stream.next().await {
38/// match event.event_type {
39/// WatchEventType::Created => println!("New experience: {}", event.experience_id),
40/// WatchEventType::Deleted => println!("Removed: {}", event.experience_id),
41/// _ => {}
42/// }
43/// }
44/// # Ok(())
45/// # }
46/// ```
47#[derive(Clone, Debug)]
48pub struct WatchEvent {
49 /// The experience that changed.
50 pub experience_id: ExperienceId,
51
52 /// The collective the experience belongs to.
53 pub collective_id: CollectiveId,
54
55 /// What kind of change occurred.
56 pub event_type: WatchEventType,
57
58 /// When the change occurred.
59 pub timestamp: Timestamp,
60
61 /// The full experience data (enriched event).
62 ///
63 /// Populated for `Created` and `Updated` events when delivered via
64 /// in-process watch subscriptions. `None` for `Deleted` events and
65 /// events reconstructed from WAL records (cross-process polling).
66 ///
67 /// Includes the embedding vector, enabling visualization tools
68 /// (PulseVision) to update 3D positions without a follow-up fetch.
69 pub experience: Option<Experience>,
70}
71
72/// The kind of change that triggered a [`WatchEvent`].
73#[derive(Clone, Copy, Debug, PartialEq, Eq)]
74pub enum WatchEventType {
75 /// A new experience was recorded.
76 Created,
77
78 /// An existing experience was modified (fields updated or reinforced).
79 Updated,
80
81 /// An experience was soft-deleted (archived).
82 Archived,
83
84 /// An experience was permanently deleted.
85 Deleted,
86}
87
88// ============================================================================
89// Conversions between storage tags and public types
90// ============================================================================
91
92impl From<WatchEventType> for WatchEventTypeTag {
93 fn from(value: WatchEventType) -> Self {
94 match value {
95 WatchEventType::Created => Self::Created,
96 WatchEventType::Updated => Self::Updated,
97 WatchEventType::Archived => Self::Archived,
98 WatchEventType::Deleted => Self::Deleted,
99 }
100 }
101}
102
103impl From<WatchEventTypeTag> for WatchEventType {
104 fn from(value: WatchEventTypeTag) -> Self {
105 match value {
106 WatchEventTypeTag::Created => Self::Created,
107 WatchEventTypeTag::Updated => Self::Updated,
108 WatchEventTypeTag::Archived => Self::Archived,
109 WatchEventTypeTag::Deleted => Self::Deleted,
110 }
111 }
112}
113
114impl From<WatchEventRecord> for WatchEvent {
115 fn from(record: WatchEventRecord) -> Self {
116 Self {
117 experience_id: ExperienceId::from_bytes(record.entity_id),
118 collective_id: CollectiveId::from_bytes(record.collective_id),
119 event_type: record.event_type.into(),
120 timestamp: Timestamp::from_millis(record.timestamp_ms),
121 experience: None, // WAL records don't carry full experience data
122 }
123 }
124}
125
126/// Filter for narrowing which watch events a subscriber receives.
127///
128/// All fields are optional. When multiple fields are set, they are combined
129/// with AND logic: an event must match **all** specified criteria.
130///
131/// Filters are applied on the sender side before channel delivery, so
132/// subscribers only receive events they care about.
133///
134/// # Example
135///
136/// ```rust
137/// # fn main() -> pulsedb::Result<()> {
138/// # let dir = tempfile::tempdir().unwrap();
139/// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
140/// # let collective_id = db.create_collective("example")?;
141/// use pulsedb::WatchFilter;
142///
143/// let filter = WatchFilter {
144/// domains: Some(vec!["security".to_string()]),
145/// min_importance: Some(0.7),
146/// ..Default::default()
147/// };
148/// let stream = db.watch_experiences_filtered(collective_id, filter)?;
149/// # Ok(())
150/// # }
151/// ```
152#[derive(Clone, Debug, Default)]
153pub struct WatchFilter {
154 /// Only emit events for experiences in these domains.
155 /// If `None`, all domains match.
156 pub domains: Option<Vec<String>>,
157
158 /// Only emit events for these experience types.
159 /// If `None`, all types match.
160 pub experience_types: Option<Vec<ExperienceType>>,
161
162 /// Only emit events for experiences with importance >= this threshold.
163 /// If `None`, all importance levels match.
164 pub min_importance: Option<f32>,
165}
166
167/// A stream of [`WatchEvent`] values backed by a crossbeam channel.
168///
169/// Implements [`futures_core::Stream`] for async consumption. The stream
170/// yields `Some(event)` for each change and returns `None` when all
171/// senders are dropped (database closed or subscriber removed).
172///
173/// Dropping a `WatchStream` automatically unregisters the subscriber
174/// from the watch service, preventing memory leaks.
175pub struct WatchStream {
176 /// The receiving end of the crossbeam bounded channel.
177 pub(crate) receiver: Receiver<WatchEvent>,
178
179 /// Shared waker that the sender side calls `.wake()` on after `try_send`.
180 pub(crate) waker: Arc<AtomicWaker>,
181
182 /// Cleanup function called on drop to remove subscriber from registry.
183 pub(crate) cleanup: Option<Box<dyn FnOnce() + Send + Sync>>,
184}
185
186impl Stream for WatchStream {
187 type Item = WatchEvent;
188
189 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
190 // Register the waker FIRST so we don't miss a wake between
191 // try_recv and returning Pending.
192 self.waker.register(cx.waker());
193
194 match self.receiver.try_recv() {
195 Ok(event) => Poll::Ready(Some(event)),
196 Err(crossbeam_channel::TryRecvError::Empty) => Poll::Pending,
197 Err(crossbeam_channel::TryRecvError::Disconnected) => Poll::Ready(None),
198 }
199 }
200}
201
202impl Drop for WatchStream {
203 fn drop(&mut self) {
204 if let Some(cleanup) = self.cleanup.take() {
205 cleanup();
206 }
207 }
208}
209
210// Safety: crossbeam Receiver is Send, AtomicWaker is Send+Sync, cleanup is Send+Sync.
211// WatchStream is not Sync (Receiver is not Sync), which is correct for Stream consumers.
212impl std::fmt::Debug for WatchStream {
213 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
214 f.debug_struct("WatchStream")
215 .field("pending_events", &self.receiver.len())
216 .finish_non_exhaustive()
217 }
218}