reifydb_engine/stream/
channel.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later
3
4//! Channel-based stream implementation for query results.
5//!
6//! Provides backpressure through bounded channels - when the buffer is full,
7//! the producer (query executor) will wait, allowing memory usage to be bounded.
8
9use std::{
10	pin::Pin,
11	sync::atomic::{AtomicU64, Ordering},
12	task::{Context, Poll},
13};
14
15use futures_util::Stream;
16use reifydb_core::{
17	Frame,
18	stream::{StreamError, StreamResult},
19};
20use tokio::sync::mpsc;
21use tokio_util::sync::CancellationToken;
22
23/// Counter for generating unique stream IDs.
24static STREAM_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
25
26/// Unique identifier for a stream.
27#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
28pub struct StreamId(u64);
29
30impl StreamId {
31	/// Generate a new unique stream ID.
32	pub fn next() -> Self {
33		Self(STREAM_ID_COUNTER.fetch_add(1, Ordering::Relaxed))
34	}
35}
36
37/// Stream implementation backed by a bounded mpsc channel.
38///
39/// This provides natural backpressure - when the buffer is full,
40/// the producer (query executor) will wait, allowing memory
41/// to be bounded.
42pub struct ChannelFrameStream {
43	receiver: mpsc::Receiver<StreamResult<Frame>>,
44	cancel_token: CancellationToken,
45	cancelled_sent: bool,
46}
47
48impl ChannelFrameStream {
49	/// Create a new channel-based stream with the given buffer size.
50	pub fn new(buffer_size: usize, cancel_token: CancellationToken) -> (FrameSender, Self) {
51		let (tx, rx) = mpsc::channel(buffer_size);
52		let sender = FrameSender {
53			sender: tx,
54		};
55		let stream = Self {
56			receiver: rx,
57			cancel_token,
58			cancelled_sent: false,
59		};
60		(sender, stream)
61	}
62}
63
64impl Stream for ChannelFrameStream {
65	type Item = StreamResult<Frame>;
66
67	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
68		// Check cancellation first
69		if self.cancel_token.is_cancelled() && !self.cancelled_sent {
70			self.cancelled_sent = true;
71			return Poll::Ready(Some(Err(StreamError::Cancelled)));
72		}
73
74		Pin::new(&mut self.receiver).poll_recv(cx)
75	}
76}
77
78/// Sender half for producing frames into the stream.
79pub struct FrameSender {
80	sender: mpsc::Sender<StreamResult<Frame>>,
81}
82
83impl FrameSender {
84	/// Send a frame to the stream (waits if buffer is full).
85	pub async fn send(&self, frame: StreamResult<Frame>) -> Result<(), StreamError> {
86		self.sender.send(frame).await.map_err(|_| StreamError::Disconnected)
87	}
88
89	/// Try to send without waiting (returns error if buffer is full or closed).
90	pub fn try_send(&self, frame: StreamResult<Frame>) -> Result<(), StreamError> {
91		self.sender.try_send(frame).map_err(|_| StreamError::Disconnected)
92	}
93
94	/// Check if there's capacity without blocking.
95	pub fn has_capacity(&self) -> bool {
96		self.sender.capacity() > 0
97	}
98
99	/// Check if the receiver has been dropped.
100	pub fn is_closed(&self) -> bool {
101		self.sender.is_closed()
102	}
103}
104
105impl Clone for FrameSender {
106	fn clone(&self) -> Self {
107		Self {
108			sender: self.sender.clone(),
109		}
110	}
111}
112
113/// Handle for controlling a running stream query.
114#[derive(Clone)]
115pub struct StreamHandle {
116	cancel_token: CancellationToken,
117	stream_id: StreamId,
118}
119
120impl StreamHandle {
121	/// Create a new stream handle.
122	pub fn new(cancel_token: CancellationToken) -> Self {
123		Self {
124			cancel_token,
125			stream_id: StreamId::next(),
126		}
127	}
128
129	/// Cancel the running query.
130	pub fn cancel(&self) {
131		self.cancel_token.cancel();
132	}
133
134	/// Check if the query has been cancelled.
135	pub fn is_cancelled(&self) -> bool {
136		self.cancel_token.is_cancelled()
137	}
138
139	/// Get the stream's unique identifier.
140	pub fn id(&self) -> StreamId {
141		self.stream_id
142	}
143}