Skip to main content

phantom_protocol/api/
stream.rs

1use crate::api::session::SessionCommand;
2use crate::errors::CoreError;
3use crate::transport::multiplexer::{StreamHandle, StreamMessage};
4use bytes::Bytes;
5use tokio::sync::mpsc;
6use tokio::sync::Mutex;
7
8#[cfg_attr(feature = "bindings", derive(uniffi::Object))]
9pub struct PhantomStream {
10    stream_id: u32,
11    /// Channel to send data to the session to be packaged and sent
12    tx: mpsc::Sender<SessionCommand>,
13    /// Receiver for incoming demultiplexed stream data
14    rx: Mutex<mpsc::Receiver<StreamMessage>>,
15}
16
17impl PhantomStream {
18    pub fn new(handle: StreamHandle, tx: mpsc::Sender<SessionCommand>) -> Self {
19        Self {
20            stream_id: handle.stream_id,
21            tx,
22            rx: Mutex::new(handle.rx),
23        }
24    }
25}
26
27#[cfg_attr(feature = "bindings", uniffi::export(async_runtime = "tokio"))]
28impl PhantomStream {
29    pub fn stream_id(&self) -> u32 {
30        self.stream_id
31    }
32
33    pub async fn send_reliable(&self, data: Vec<u8>) -> Result<(), CoreError> {
34        self.tx
35            .send(SessionCommand::SendStreamReliable {
36                stream_id: self.stream_id,
37                data: Bytes::from(data),
38            })
39            .await
40            .map_err(|_| CoreError::NetworkError("Session closed".into()))
41    }
42
43    pub async fn send_unreliable(&self, data: Vec<u8>) -> Result<(), CoreError> {
44        self.tx
45            .send(SessionCommand::SendStreamUnreliable {
46                stream_id: self.stream_id,
47                data: Bytes::from(data),
48            })
49            .await
50            .map_err(|_| CoreError::NetworkError("Session closed".into()))
51    }
52
53    pub async fn recv(&self) -> Result<Vec<u8>, CoreError> {
54        let mut rx = self.rx.lock().await;
55        loop {
56            match rx.recv().await {
57                Some(StreamMessage::Data(b)) => return Ok(b.to_vec()),
58                Some(StreamMessage::Ack(seq)) => {
59                    log::debug!(
60                        "PhantomStream {}: received ACK for seq {}",
61                        self.stream_id,
62                        seq
63                    );
64                    // Just log for now until ARQ is implemented
65                    continue;
66                }
67                Some(StreamMessage::Close) => {
68                    return Err(CoreError::NetworkError("Stream closed by peer".into()));
69                }
70                None => {
71                    return Err(CoreError::NetworkError("Stream closed locally".into()));
72                }
73            }
74        }
75    }
76
77    /// Close this stream; the peer will see EOF on its read half.
78    ///
79    /// Named `disconnect` rather than `close` for the same reason as
80    /// `PhantomSession::disconnect` — UniFFI's Kotlin generator emits
81    /// `AutoCloseable.close()` on every object.
82    pub async fn disconnect(&self) -> Result<(), CoreError> {
83        self.tx
84            .send(SessionCommand::CloseStream {
85                stream_id: self.stream_id,
86            })
87            .await
88            .map_err(|_| CoreError::NetworkError("Session closed".into()))
89    }
90}