synap_sdk/
stream_reactive.rs

1//! Reactive stream operations
2//!
3//! Provides Stream-based event consumption for event streams.
4
5use crate::reactive::{MessageStream, SubscriptionHandle};
6use crate::types::Event;
7use futures::Stream;
8use std::time::Duration;
9use tokio::sync::mpsc;
10use tokio::time::sleep;
11
12impl crate::stream::StreamManager {
13    /// Observe events from a stream room reactively
14    ///
15    /// Returns a Stream of events that can be processed asynchronously.
16    /// The stream will poll for new events at the specified interval.
17    ///
18    /// # Example
19    /// ```no_run
20    /// use futures::StreamExt;
21    /// use synap_sdk::{SynapClient, SynapConfig};
22    /// use std::time::Duration;
23    ///
24    /// # #[tokio::main]
25    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
26    /// # let client = SynapClient::new(SynapConfig::new("http://localhost:15500"))?;
27    /// let (mut stream, handle) = client.stream()
28    ///     .observe_events("chat-room-1", Some(0), Duration::from_millis(100));
29    ///
30    /// // Process events reactively
31    /// while let Some(event) = stream.next().await {
32    ///     println!("Event {}: {:?}", event.offset, event.data);
33    /// }
34    ///
35    /// // Stop observing
36    /// handle.unsubscribe();
37    /// # Ok(())
38    /// # }
39    /// ```
40    pub fn observe_events(
41        &self,
42        room: impl Into<String>,
43        start_offset: Option<u64>,
44        poll_interval: Duration,
45    ) -> (impl Stream<Item = Event> + 'static, SubscriptionHandle) {
46        let room = room.into();
47        let client = self.client.clone();
48        let mut current_offset = start_offset.unwrap_or(0);
49
50        let (tx, rx) = mpsc::unbounded_channel::<Event>();
51        let (cancel_tx, mut cancel_rx) = mpsc::unbounded_channel();
52
53        tokio::spawn(async move {
54            loop {
55                tokio::select! {
56                    _ = cancel_rx.recv() => {
57                        tracing::debug!("Event stream cancelled");
58                        break;
59                    }
60                    _ = sleep(poll_interval) => {
61                        match client.stream().consume(&room, Some(current_offset), Some(100)).await {
62                            Ok(events) => {
63                                for event in events {
64                                    current_offset = event.offset + 1;
65                                    if tx.send(event).is_err() {
66                                        return; // Receiver dropped
67                                    }
68                                }
69                            }
70                            Err(e) => {
71                                tracing::error!("Error consuming events: {}", e);
72                            }
73                        }
74                    }
75                }
76            }
77        });
78
79        let stream: MessageStream<Event> =
80            Box::pin(tokio_stream::wrappers::UnboundedReceiverStream::new(rx));
81        let handle = SubscriptionHandle::new(cancel_tx);
82
83        (stream, handle)
84    }
85
86    /// Observe specific event types from a stream
87    ///
88    /// Filters events by event type before delivering them.
89    pub fn observe_event(
90        &self,
91        room: impl Into<String>,
92        event_type: impl Into<String>,
93        start_offset: Option<u64>,
94        poll_interval: Duration,
95    ) -> (impl Stream<Item = Event> + 'static, SubscriptionHandle) {
96        let room = room.into();
97        let event_type = event_type.into();
98        let client = self.client.clone();
99        let mut current_offset = start_offset.unwrap_or(0);
100
101        let (tx, rx) = mpsc::unbounded_channel::<Event>();
102        let (cancel_tx, mut cancel_rx) = mpsc::unbounded_channel();
103
104        tokio::spawn(async move {
105            loop {
106                tokio::select! {
107                    _ = cancel_rx.recv() => {
108                        break;
109                    }
110                    _ = sleep(poll_interval) => {
111                        match client.stream().consume(&room, Some(current_offset), Some(100)).await {
112                            Ok(events) => {
113                                for event in events {
114                                    current_offset = event.offset + 1;
115
116                                    // Filter by event type
117                                    if event.event == event_type && tx.send(event).is_err() {
118                                        return;
119                                    }
120                                }
121                            }
122                            Err(e) => {
123                                tracing::error!("Error consuming events: {}", e);
124                            }
125                        }
126                    }
127                }
128            }
129        });
130
131        let stream: MessageStream<Event> =
132            Box::pin(tokio_stream::wrappers::UnboundedReceiverStream::new(rx));
133        let handle = SubscriptionHandle::new(cancel_tx);
134
135        (stream, handle)
136    }
137}