phantom_protocol/api/
stream.rs1use crate::api::session::SessionCommand;
2use crate::errors::CoreError;
3use crate::transport::multiplexer::{StreamHandle, StreamMessage};
4use bytes::Bytes;
5use tokio::sync::mpsc;
6use tokio::sync::Mutex;
7
8#[cfg_attr(feature = "bindings", derive(uniffi::Object))]
9pub struct PhantomStream {
10 stream_id: u32,
11 tx: mpsc::Sender<SessionCommand>,
13 rx: Mutex<mpsc::Receiver<StreamMessage>>,
15}
16
17impl PhantomStream {
18 pub fn new(handle: StreamHandle, tx: mpsc::Sender<SessionCommand>) -> Self {
19 Self {
20 stream_id: handle.stream_id,
21 tx,
22 rx: Mutex::new(handle.rx),
23 }
24 }
25}
26
27#[cfg_attr(feature = "bindings", uniffi::export(async_runtime = "tokio"))]
28impl PhantomStream {
29 pub fn stream_id(&self) -> u32 {
30 self.stream_id
31 }
32
33 pub async fn send_reliable(&self, data: Vec<u8>) -> Result<(), CoreError> {
34 self.tx
35 .send(SessionCommand::SendStreamReliable {
36 stream_id: self.stream_id,
37 data: Bytes::from(data),
38 })
39 .await
40 .map_err(|_| CoreError::NetworkError("Session closed".into()))
41 }
42
43 pub async fn send_unreliable(&self, data: Vec<u8>) -> Result<(), CoreError> {
44 self.tx
45 .send(SessionCommand::SendStreamUnreliable {
46 stream_id: self.stream_id,
47 data: Bytes::from(data),
48 })
49 .await
50 .map_err(|_| CoreError::NetworkError("Session closed".into()))
51 }
52
53 pub async fn recv(&self) -> Result<Vec<u8>, CoreError> {
54 let mut rx = self.rx.lock().await;
55 loop {
56 match rx.recv().await {
57 Some(StreamMessage::Data(b)) => return Ok(b.to_vec()),
58 Some(StreamMessage::Ack(seq)) => {
59 log::debug!(
60 "PhantomStream {}: received ACK for seq {}",
61 self.stream_id,
62 seq
63 );
64 continue;
66 }
67 Some(StreamMessage::Close) => {
68 return Err(CoreError::NetworkError("Stream closed by peer".into()));
69 }
70 None => {
71 return Err(CoreError::NetworkError("Stream closed locally".into()));
72 }
73 }
74 }
75 }
76
77 pub async fn disconnect(&self) -> Result<(), CoreError> {
83 self.tx
84 .send(SessionCommand::CloseStream {
85 stream_id: self.stream_id,
86 })
87 .await
88 .map_err(|_| CoreError::NetworkError("Session closed".into()))
89 }
90}