Skip to main content

pulsedb/watch/
types.rs

1//! Public types for the watch system.
2//!
3//! These types define the event model for real-time experience notifications
4//! and the [`WatchStream`] adapter that bridges sync crossbeam channels to
5//! the async [`futures_core::Stream`] interface.
6
7use std::pin::Pin;
8use std::sync::Arc;
9use std::task::{Context, Poll};
10
11use atomic_waker::AtomicWaker;
12use crossbeam_channel::Receiver;
13use futures_core::Stream;
14
15use crate::experience::ExperienceType;
16use crate::storage::schema::{WatchEventRecord, WatchEventTypeTag};
17use crate::types::{CollectiveId, ExperienceId, Timestamp};
18
19/// An event emitted when an experience changes.
20///
21/// Watch events are delivered in-process via bounded crossbeam channels.
22/// Each event identifies the experience, collective, mutation type, and
23/// when it occurred.
24///
25/// # Example
26///
27/// ```rust,no_run
28/// # #[tokio::main]
29/// # async fn main() -> pulsedb::Result<()> {
30/// # let dir = tempfile::tempdir().unwrap();
31/// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
32/// # let collective_id = db.create_collective("example")?;
33/// use futures::StreamExt;
34/// use pulsedb::WatchEventType;
35///
36/// let mut stream = db.watch_experiences(collective_id)?;
37/// while let Some(event) = stream.next().await {
38///     match event.event_type {
39///         WatchEventType::Created => println!("New experience: {}", event.experience_id),
40///         WatchEventType::Deleted => println!("Removed: {}", event.experience_id),
41///         _ => {}
42///     }
43/// }
44/// # Ok(())
45/// # }
46/// ```
47#[derive(Clone, Debug)]
48pub struct WatchEvent {
49    /// The experience that changed.
50    pub experience_id: ExperienceId,
51
52    /// The collective the experience belongs to.
53    pub collective_id: CollectiveId,
54
55    /// What kind of change occurred.
56    pub event_type: WatchEventType,
57
58    /// When the change occurred.
59    pub timestamp: Timestamp,
60}
61
/// The kind of change that triggered a [`WatchEvent`].
///
/// This is a plain fieldless enum: it is `Copy`, comparable with `==`, and
/// hashable, so it can be used directly as a key in `HashMap`/`HashSet`
/// (for example to count events per kind).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum WatchEventType {
    /// A new experience was recorded.
    Created,

    /// An existing experience was modified (fields updated or reinforced).
    Updated,

    /// An experience was soft-deleted (archived).
    Archived,

    /// An experience was permanently deleted.
    Deleted,
}
77
78// ============================================================================
79// Conversions between storage tags and public types
80// ============================================================================
81
82impl From<WatchEventType> for WatchEventTypeTag {
83    fn from(value: WatchEventType) -> Self {
84        match value {
85            WatchEventType::Created => Self::Created,
86            WatchEventType::Updated => Self::Updated,
87            WatchEventType::Archived => Self::Archived,
88            WatchEventType::Deleted => Self::Deleted,
89        }
90    }
91}
92
93impl From<WatchEventTypeTag> for WatchEventType {
94    fn from(value: WatchEventTypeTag) -> Self {
95        match value {
96            WatchEventTypeTag::Created => Self::Created,
97            WatchEventTypeTag::Updated => Self::Updated,
98            WatchEventTypeTag::Archived => Self::Archived,
99            WatchEventTypeTag::Deleted => Self::Deleted,
100        }
101    }
102}
103
104impl From<WatchEventRecord> for WatchEvent {
105    fn from(record: WatchEventRecord) -> Self {
106        Self {
107            experience_id: ExperienceId::from_bytes(record.entity_id),
108            collective_id: CollectiveId::from_bytes(record.collective_id),
109            event_type: record.event_type.into(),
110            timestamp: Timestamp::from_millis(record.timestamp_ms),
111        }
112    }
113}
114
115/// Filter for narrowing which watch events a subscriber receives.
116///
117/// All fields are optional. When multiple fields are set, they are combined
118/// with AND logic: an event must match **all** specified criteria.
119///
120/// Filters are applied on the sender side before channel delivery, so
121/// subscribers only receive events they care about.
122///
123/// # Example
124///
125/// ```rust
126/// # fn main() -> pulsedb::Result<()> {
127/// # let dir = tempfile::tempdir().unwrap();
128/// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
129/// # let collective_id = db.create_collective("example")?;
130/// use pulsedb::WatchFilter;
131///
132/// let filter = WatchFilter {
133///     domains: Some(vec!["security".to_string()]),
134///     min_importance: Some(0.7),
135///     ..Default::default()
136/// };
137/// let stream = db.watch_experiences_filtered(collective_id, filter)?;
138/// # Ok(())
139/// # }
140/// ```
141#[derive(Clone, Debug, Default)]
142pub struct WatchFilter {
143    /// Only emit events for experiences in these domains.
144    /// If `None`, all domains match.
145    pub domains: Option<Vec<String>>,
146
147    /// Only emit events for these experience types.
148    /// If `None`, all types match.
149    pub experience_types: Option<Vec<ExperienceType>>,
150
151    /// Only emit events for experiences with importance >= this threshold.
152    /// If `None`, all importance levels match.
153    pub min_importance: Option<f32>,
154}
155
156/// A stream of [`WatchEvent`] values backed by a crossbeam channel.
157///
158/// Implements [`futures_core::Stream`] for async consumption. The stream
159/// yields `Some(event)` for each change and returns `None` when all
160/// senders are dropped (database closed or subscriber removed).
161///
162/// Dropping a `WatchStream` automatically unregisters the subscriber
163/// from the watch service, preventing memory leaks.
164pub struct WatchStream {
165    /// The receiving end of the crossbeam bounded channel.
166    pub(crate) receiver: Receiver<WatchEvent>,
167
168    /// Shared waker that the sender side calls `.wake()` on after `try_send`.
169    pub(crate) waker: Arc<AtomicWaker>,
170
171    /// Cleanup function called on drop to remove subscriber from registry.
172    pub(crate) cleanup: Option<Box<dyn FnOnce() + Send + Sync>>,
173}
174
175impl Stream for WatchStream {
176    type Item = WatchEvent;
177
178    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
179        // Register the waker FIRST so we don't miss a wake between
180        // try_recv and returning Pending.
181        self.waker.register(cx.waker());
182
183        match self.receiver.try_recv() {
184            Ok(event) => Poll::Ready(Some(event)),
185            Err(crossbeam_channel::TryRecvError::Empty) => Poll::Pending,
186            Err(crossbeam_channel::TryRecvError::Disconnected) => Poll::Ready(None),
187        }
188    }
189}
190
191impl Drop for WatchStream {
192    fn drop(&mut self) {
193        if let Some(cleanup) = self.cleanup.take() {
194            cleanup();
195        }
196    }
197}
198
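// Thread-safety note (auto traits only; there is no `unsafe` here):
// `WatchStream` is `Send` because the crossbeam `Receiver`, the
// `Arc<AtomicWaker>`, and the `Send + Sync` cleanup closure are all `Send`
// (provided `WatchEvent` itself is `Send`). crossbeam's `Receiver<T>` is also
// `Sync` for `T: Send`, so the stream is `Sync` as well; polling it still
// requires exclusive access through `Pin<&mut Self>`.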
201impl std::fmt::Debug for WatchStream {
202    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203        f.debug_struct("WatchStream")
204            .field("pending_events", &self.receiver.len())
205            .finish_non_exhaustive()
206    }
207}