reifydb_engine/stream/
channel.rs1use std::{
10 pin::Pin,
11 sync::atomic::{AtomicU64, Ordering},
12 task::{Context, Poll},
13};
14
15use futures_util::Stream;
16use reifydb_core::{
17 Frame,
18 stream::{StreamError, StreamResult},
19};
20use tokio::sync::mpsc;
21use tokio_util::sync::CancellationToken;
22
23static STREAM_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
25
26#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
28pub struct StreamId(u64);
29
30impl StreamId {
31 pub fn next() -> Self {
33 Self(STREAM_ID_COUNTER.fetch_add(1, Ordering::Relaxed))
34 }
35}
36
37pub struct ChannelFrameStream {
43 receiver: mpsc::Receiver<StreamResult<Frame>>,
44 cancel_token: CancellationToken,
45 cancelled_sent: bool,
46}
47
48impl ChannelFrameStream {
49 pub fn new(buffer_size: usize, cancel_token: CancellationToken) -> (FrameSender, Self) {
51 let (tx, rx) = mpsc::channel(buffer_size);
52 let sender = FrameSender {
53 sender: tx,
54 };
55 let stream = Self {
56 receiver: rx,
57 cancel_token,
58 cancelled_sent: false,
59 };
60 (sender, stream)
61 }
62}
63
64impl Stream for ChannelFrameStream {
65 type Item = StreamResult<Frame>;
66
67 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
68 if self.cancel_token.is_cancelled() && !self.cancelled_sent {
70 self.cancelled_sent = true;
71 return Poll::Ready(Some(Err(StreamError::Cancelled)));
72 }
73
74 Pin::new(&mut self.receiver).poll_recv(cx)
75 }
76}
77
78pub struct FrameSender {
80 sender: mpsc::Sender<StreamResult<Frame>>,
81}
82
83impl FrameSender {
84 pub async fn send(&self, frame: StreamResult<Frame>) -> Result<(), StreamError> {
86 self.sender.send(frame).await.map_err(|_| StreamError::Disconnected)
87 }
88
89 pub fn try_send(&self, frame: StreamResult<Frame>) -> Result<(), StreamError> {
91 self.sender.try_send(frame).map_err(|_| StreamError::Disconnected)
92 }
93
94 pub fn has_capacity(&self) -> bool {
96 self.sender.capacity() > 0
97 }
98
99 pub fn is_closed(&self) -> bool {
101 self.sender.is_closed()
102 }
103}
104
105impl Clone for FrameSender {
106 fn clone(&self) -> Self {
107 Self {
108 sender: self.sender.clone(),
109 }
110 }
111}
112
113#[derive(Clone)]
115pub struct StreamHandle {
116 cancel_token: CancellationToken,
117 stream_id: StreamId,
118}
119
120impl StreamHandle {
121 pub fn new(cancel_token: CancellationToken) -> Self {
123 Self {
124 cancel_token,
125 stream_id: StreamId::next(),
126 }
127 }
128
129 pub fn cancel(&self) {
131 self.cancel_token.cancel();
132 }
133
134 pub fn is_cancelled(&self) -> bool {
136 self.cancel_token.is_cancelled()
137 }
138
139 pub fn id(&self) -> StreamId {
141 self.stream_id
142 }
143}