Skip to main content

pulsedb/watch/
types.rs

1//! Public types for the watch system.
2//!
3//! These types define the event model for real-time experience notifications
4//! and the [`WatchStream`] adapter that bridges sync crossbeam channels to
5//! the async [`futures_core::Stream`] interface.
6
7use std::pin::Pin;
8use std::sync::Arc;
9use std::task::{Context, Poll};
10
11use atomic_waker::AtomicWaker;
12use crossbeam_channel::Receiver;
13use futures_core::Stream;
14
15use crate::experience::{Experience, ExperienceType};
16use crate::storage::schema::{WatchEventRecord, WatchEventTypeTag};
17use crate::types::{CollectiveId, ExperienceId, Timestamp};
18
19/// An event emitted when an experience changes.
20///
21/// Watch events are delivered in-process via bounded crossbeam channels.
22/// Each event identifies the experience, collective, mutation type, and
23/// when it occurred.
24///
25/// # Example
26///
27/// ```rust,no_run
28/// # #[tokio::main]
29/// # async fn main() -> pulsedb::Result<()> {
30/// # let dir = tempfile::tempdir().unwrap();
31/// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
32/// # let collective_id = db.create_collective("example")?;
33/// use futures::StreamExt;
34/// use pulsedb::WatchEventType;
35///
36/// let mut stream = db.watch_experiences(collective_id)?;
37/// while let Some(event) = stream.next().await {
38///     match event.event_type {
39///         WatchEventType::Created => println!("New experience: {}", event.experience_id),
40///         WatchEventType::Deleted => println!("Removed: {}", event.experience_id),
41///         _ => {}
42///     }
43/// }
44/// # Ok(())
45/// # }
46/// ```
47#[derive(Clone, Debug)]
48pub struct WatchEvent {
49    /// The experience that changed.
50    pub experience_id: ExperienceId,
51
52    /// The collective the experience belongs to.
53    pub collective_id: CollectiveId,
54
55    /// What kind of change occurred.
56    pub event_type: WatchEventType,
57
58    /// When the change occurred.
59    pub timestamp: Timestamp,
60
61    /// The full experience data (enriched event).
62    ///
63    /// Populated for `Created` and `Updated` events when delivered via
64    /// in-process watch subscriptions. `None` for `Deleted` events and
65    /// events reconstructed from WAL records (cross-process polling).
66    ///
67    /// Includes the embedding vector, enabling visualization tools
68    /// (PulseVision) to update 3D positions without a follow-up fetch.
69    pub experience: Option<Experience>,
70}
71
/// The kind of change that triggered a [`WatchEvent`].
///
/// This is a plain `Copy` tag. It derives `Hash` alongside `PartialEq`/`Eq`
/// so it can be used directly as a `HashMap`/`HashSet` key, for example to
/// count or bucket events per kind.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum WatchEventType {
    /// A new experience was recorded.
    Created,

    /// An existing experience was modified (fields updated or reinforced).
    Updated,

    /// An experience was soft-deleted (archived).
    Archived,

    /// An experience was permanently deleted.
    Deleted,
}
87
88// ============================================================================
89// Conversions between storage tags and public types
90// ============================================================================
91
92impl From<WatchEventType> for WatchEventTypeTag {
93    fn from(value: WatchEventType) -> Self {
94        match value {
95            WatchEventType::Created => Self::Created,
96            WatchEventType::Updated => Self::Updated,
97            WatchEventType::Archived => Self::Archived,
98            WatchEventType::Deleted => Self::Deleted,
99        }
100    }
101}
102
103impl From<WatchEventTypeTag> for WatchEventType {
104    fn from(value: WatchEventTypeTag) -> Self {
105        match value {
106            WatchEventTypeTag::Created => Self::Created,
107            WatchEventTypeTag::Updated => Self::Updated,
108            WatchEventTypeTag::Archived => Self::Archived,
109            WatchEventTypeTag::Deleted => Self::Deleted,
110        }
111    }
112}
113
114impl From<WatchEventRecord> for WatchEvent {
115    fn from(record: WatchEventRecord) -> Self {
116        Self {
117            experience_id: ExperienceId::from_bytes(record.entity_id),
118            collective_id: CollectiveId::from_bytes(record.collective_id),
119            event_type: record.event_type.into(),
120            timestamp: Timestamp::from_millis(record.timestamp_ms),
121            experience: None, // WAL records don't carry full experience data
122        }
123    }
124}
125
126/// Filter for narrowing which watch events a subscriber receives.
127///
128/// All fields are optional. When multiple fields are set, they are combined
129/// with AND logic: an event must match **all** specified criteria.
130///
131/// Filters are applied on the sender side before channel delivery, so
132/// subscribers only receive events they care about.
133///
134/// # Example
135///
136/// ```rust
137/// # fn main() -> pulsedb::Result<()> {
138/// # let dir = tempfile::tempdir().unwrap();
139/// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
140/// # let collective_id = db.create_collective("example")?;
141/// use pulsedb::WatchFilter;
142///
143/// let filter = WatchFilter {
144///     domains: Some(vec!["security".to_string()]),
145///     min_importance: Some(0.7),
146///     ..Default::default()
147/// };
148/// let stream = db.watch_experiences_filtered(collective_id, filter)?;
149/// # Ok(())
150/// # }
151/// ```
152#[derive(Clone, Debug, Default)]
153pub struct WatchFilter {
154    /// Only emit events for experiences in these domains.
155    /// If `None`, all domains match.
156    pub domains: Option<Vec<String>>,
157
158    /// Only emit events for these experience types.
159    /// If `None`, all types match.
160    pub experience_types: Option<Vec<ExperienceType>>,
161
162    /// Only emit events for experiences with importance >= this threshold.
163    /// If `None`, all importance levels match.
164    pub min_importance: Option<f32>,
165}
166
167/// A stream of [`WatchEvent`] values backed by a crossbeam channel.
168///
169/// Implements [`futures_core::Stream`] for async consumption. The stream
170/// yields `Some(event)` for each change and returns `None` when all
171/// senders are dropped (database closed or subscriber removed).
172///
173/// Dropping a `WatchStream` automatically unregisters the subscriber
174/// from the watch service, preventing memory leaks.
175pub struct WatchStream {
176    /// The receiving end of the crossbeam bounded channel.
177    pub(crate) receiver: Receiver<WatchEvent>,
178
179    /// Shared waker that the sender side calls `.wake()` on after `try_send`.
180    pub(crate) waker: Arc<AtomicWaker>,
181
182    /// Cleanup function called on drop to remove subscriber from registry.
183    pub(crate) cleanup: Option<Box<dyn FnOnce() + Send + Sync>>,
184}
185
186impl Stream for WatchStream {
187    type Item = WatchEvent;
188
189    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
190        // Register the waker FIRST so we don't miss a wake between
191        // try_recv and returning Pending.
192        self.waker.register(cx.waker());
193
194        match self.receiver.try_recv() {
195            Ok(event) => Poll::Ready(Some(event)),
196            Err(crossbeam_channel::TryRecvError::Empty) => Poll::Pending,
197            Err(crossbeam_channel::TryRecvError::Disconnected) => Poll::Ready(None),
198        }
199    }
200}
201
202impl Drop for WatchStream {
203    fn drop(&mut self) {
204        if let Some(cleanup) = self.cleanup.take() {
205            cleanup();
206        }
207    }
208}
209
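// Thread-safety note (auto traits only; there is no `unsafe` code in this type):
// `WatchStream` is `Send` because crossbeam's `Receiver<T>`, `Arc<AtomicWaker>`, and the
// `Send + Sync` cleanup closure are all `Send` (given `WatchEvent: Send`). crossbeam's
// `Receiver<T>` is also `Sync` for `T: Send`, so no field here opts out of `Sync`;
// exclusive access while polling comes from `poll_next` taking `Pin<&mut Self>`.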
212impl std::fmt::Debug for WatchStream {
213    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
214        f.debug_struct("WatchStream")
215            .field("pending_events", &self.receiver.len())
216            .finish_non_exhaustive()
217    }
218}