synap_sdk/stream_reactive.rs
1//! Reactive stream operations
2//!
3//! Provides Stream-based event consumption for event streams.
4
5use crate::reactive::{MessageStream, SubscriptionHandle};
6use crate::types::Event;
7use futures::Stream;
8use std::time::Duration;
9use tokio::sync::mpsc;
10use tokio::time::sleep;
11
12impl crate::stream::StreamManager {
13 /// Observe events from a stream room reactively
14 ///
15 /// Returns a Stream of events that can be processed asynchronously.
16 /// The stream will poll for new events at the specified interval.
17 ///
18 /// # Example
19 /// ```no_run
20 /// use futures::StreamExt;
21 /// use synap_sdk::{SynapClient, SynapConfig};
22 /// use std::time::Duration;
23 ///
24 /// # #[tokio::main]
25 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
26 /// # let client = SynapClient::new(SynapConfig::new("http://localhost:15500"))?;
27 /// let (mut stream, handle) = client.stream()
28 /// .observe_events("chat-room-1", Some(0), Duration::from_millis(100));
29 ///
30 /// // Process events reactively
31 /// while let Some(event) = stream.next().await {
32 /// println!("Event {}: {:?}", event.offset, event.data);
33 /// }
34 ///
35 /// // Stop observing
36 /// handle.unsubscribe();
37 /// # Ok(())
38 /// # }
39 /// ```
40 pub fn observe_events(
41 &self,
42 room: impl Into<String>,
43 start_offset: Option<u64>,
44 poll_interval: Duration,
45 ) -> (impl Stream<Item = Event> + 'static, SubscriptionHandle) {
46 let room = room.into();
47 let client = self.client.clone();
48 let mut current_offset = start_offset.unwrap_or(0);
49
50 let (tx, rx) = mpsc::unbounded_channel::<Event>();
51 let (cancel_tx, mut cancel_rx) = mpsc::unbounded_channel();
52
53 tokio::spawn(async move {
54 loop {
55 tokio::select! {
56 _ = cancel_rx.recv() => {
57 tracing::debug!("Event stream cancelled");
58 break;
59 }
60 _ = sleep(poll_interval) => {
61 match client.stream().consume(&room, Some(current_offset), Some(100)).await {
62 Ok(events) => {
63 for event in events {
64 current_offset = event.offset + 1;
65 if tx.send(event).is_err() {
66 return; // Receiver dropped
67 }
68 }
69 }
70 Err(e) => {
71 tracing::error!("Error consuming events: {}", e);
72 }
73 }
74 }
75 }
76 }
77 });
78
79 let stream: MessageStream<Event> =
80 Box::pin(tokio_stream::wrappers::UnboundedReceiverStream::new(rx));
81 let handle = SubscriptionHandle::new(cancel_tx);
82
83 (stream, handle)
84 }
85
86 /// Observe specific event types from a stream
87 ///
88 /// Filters events by event type before delivering them.
89 pub fn observe_event(
90 &self,
91 room: impl Into<String>,
92 event_type: impl Into<String>,
93 start_offset: Option<u64>,
94 poll_interval: Duration,
95 ) -> (impl Stream<Item = Event> + 'static, SubscriptionHandle) {
96 let room = room.into();
97 let event_type = event_type.into();
98 let client = self.client.clone();
99 let mut current_offset = start_offset.unwrap_or(0);
100
101 let (tx, rx) = mpsc::unbounded_channel::<Event>();
102 let (cancel_tx, mut cancel_rx) = mpsc::unbounded_channel();
103
104 tokio::spawn(async move {
105 loop {
106 tokio::select! {
107 _ = cancel_rx.recv() => {
108 break;
109 }
110 _ = sleep(poll_interval) => {
111 match client.stream().consume(&room, Some(current_offset), Some(100)).await {
112 Ok(events) => {
113 for event in events {
114 current_offset = event.offset + 1;
115
116 // Filter by event type
117 if event.event == event_type && tx.send(event).is_err() {
118 return;
119 }
120 }
121 }
122 Err(e) => {
123 tracing::error!("Error consuming events: {}", e);
124 }
125 }
126 }
127 }
128 }
129 });
130
131 let stream: MessageStream<Event> =
132 Box::pin(tokio_stream::wrappers::UnboundedReceiverStream::new(rx));
133 let handle = SubscriptionHandle::new(cancel_tx);
134
135 (stream, handle)
136 }
137}