bee/file/
chunks_stream.rs1use bytes::Bytes;
19use futures_util::stream::{SplitSink, SplitStream};
20use futures_util::{SinkExt, StreamExt};
21use tokio::net::TcpStream;
22use tokio_tungstenite::tungstenite::client::IntoClientRequest;
23use tokio_tungstenite::tungstenite::http::HeaderValue;
24use tokio_tungstenite::tungstenite::protocol::Message;
25use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async};
26
27use crate::swarm::{BatchId, Error};
28
29use super::FileApi;
30
31type Ws = WebSocketStream<MaybeTlsStream<TcpStream>>;
32
33#[derive(Debug)]
40pub struct ChunkStream {
41 sink: SplitSink<Ws, Message>,
42 stream: SplitStream<Ws>,
43 closed: bool,
44}
45
46impl ChunkStream {
47 pub async fn send_chunk(&mut self, chunk: impl Into<Bytes>) -> Result<(), Error> {
54 if self.closed {
55 return Err(Error::argument("chunk stream is closed"));
56 }
57 let bytes: Bytes = chunk.into();
58 self.sink
59 .send(Message::Binary(bytes.to_vec()))
60 .await
61 .map_err(|e| Error::argument(format!("websocket send: {e}")))?;
62
63 loop {
68 let next = self
69 .stream
70 .next()
71 .await
72 .ok_or_else(|| Error::argument("chunk stream closed before ack"))?;
73 match next {
74 Ok(Message::Binary(b)) => {
75 if b.is_empty() || b == [0u8] {
76 return Ok(());
77 }
78 return Err(Error::argument(format!(
79 "chunk stream unexpected ack bytes: {b:?}"
80 )));
81 }
82 Ok(Message::Text(t)) => {
83 return Err(Error::argument(format!("chunk stream server error: {t}")));
84 }
85 Ok(Message::Close(_)) => {
86 self.closed = true;
87 return Err(Error::argument("chunk stream closed by server"));
88 }
89 Ok(Message::Ping(_) | Message::Pong(_) | Message::Frame(_)) => continue,
90 Err(e) => return Err(Error::argument(format!("websocket recv: {e}"))),
91 }
92 }
93 }
94
95 pub async fn close(mut self) -> Result<(), Error> {
98 if !self.closed {
99 let _ = self.sink.send(Message::Close(None)).await;
100 let _ = self.sink.close().await;
101 self.closed = true;
102 }
103 Ok(())
104 }
105}
106
107impl FileApi {
108 pub async fn chunks_stream(
117 &self,
118 batch_id: &BatchId,
119 tag: Option<u64>,
120 ) -> Result<ChunkStream, Error> {
121 let mut url = self.inner.url("chunks/stream")?;
122 let scheme = match url.scheme() {
123 "http" => "ws",
124 "https" => "wss",
125 other => {
126 return Err(Error::argument(format!(
127 "unsupported base URL scheme for websocket: {other}"
128 )));
129 }
130 };
131 url.set_scheme(scheme)
132 .map_err(|_| Error::argument("failed to set websocket scheme"))?;
133 if let Some(t) = tag {
134 url.query_pairs_mut()
135 .append_pair("swarm-tag", &t.to_string());
136 }
137
138 let mut req = url
139 .as_str()
140 .into_client_request()
141 .map_err(|e| Error::argument(format!("websocket request: {e}")))?;
142 let value = HeaderValue::from_str(&batch_id.to_hex())
143 .map_err(|e| Error::argument(format!("invalid batch id header: {e}")))?;
144 req.headers_mut().insert("swarm-postage-batch-id", value);
145
146 let (ws, _resp) = connect_async(req)
147 .await
148 .map_err(|e| Error::argument(format!("websocket connect: {e}")))?;
149 let (sink, stream) = ws.split();
150
151 Ok(ChunkStream {
152 sink,
153 stream,
154 closed: false,
155 })
156 }
157}