Skip to main content

bee/file/
chunks_stream.rs

1//! `WS /chunks/stream` — websocket-driven chunk upload session.
2//!
3//! Mirrors bee-go's `chunks/stream` server-side handler: the client
4//! sends each chunk as a binary websocket message and the server
5//! replies with a single-byte `0x00` ack. A non-zero or text frame is
6//! treated as an error from the server.
7//!
8//! Tag-bound uploads (`swarm-tag` query) keep chunks local until the
9//! socket closes; tag-less uploads forward each chunk to the network
10//! as it arrives. See the OpenAPI summary for `/chunks/stream`.
11//!
12//! # Cancellation
13//!
14//! Drop the [`ChunkStream`] to release the socket; the in-flight TCP
15//! connection is closed without a graceful websocket close frame.
16//! Call [`ChunkStream::close`] for a graceful shutdown.
17
18use bytes::Bytes;
19use futures_util::stream::{SplitSink, SplitStream};
20use futures_util::{SinkExt, StreamExt};
21use tokio::net::TcpStream;
22use tokio_tungstenite::tungstenite::client::IntoClientRequest;
23use tokio_tungstenite::tungstenite::http::HeaderValue;
24use tokio_tungstenite::tungstenite::protocol::Message;
25use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async};
26
27use crate::swarm::{BatchId, Error};
28
29use super::FileApi;
30
31type Ws = WebSocketStream<MaybeTlsStream<TcpStream>>;
32
33/// Active `/chunks/stream` upload session. Open one via
34/// [`FileApi::chunks_stream`].
35///
36/// Each [`Self::send_chunk`] sends one binary frame and awaits the
37/// server's `0x00` ack before returning. The session remains usable
38/// until [`Self::close`] (or until dropped).
39#[derive(Debug)]
40pub struct ChunkStream {
41    sink: SplitSink<Ws, Message>,
42    stream: SplitStream<Ws>,
43    closed: bool,
44}
45
46impl ChunkStream {
47    /// Send one chunk (`span || payload` framing per Bee's content-
48    /// addressed chunk format) and await the server ack.
49    ///
50    /// Returns [`Error::Argument`] if the websocket has already been
51    /// closed or if the server replied with anything other than the
52    /// single-byte `0` ack.
53    pub async fn send_chunk(&mut self, chunk: impl Into<Bytes>) -> Result<(), Error> {
54        if self.closed {
55            return Err(Error::argument("chunk stream is closed"));
56        }
57        let bytes: Bytes = chunk.into();
58        self.sink
59            .send(Message::Binary(bytes.to_vec()))
60            .await
61            .map_err(|e| Error::argument(format!("websocket send: {e}")))?;
62
63        // Wait for the next non-control frame and validate it as the
64        // ack. Ping / Pong / Frame are forwarded transparently by
65        // tokio-tungstenite, so they shouldn't surface here, but we
66        // guard for them anyway.
67        loop {
68            let next = self
69                .stream
70                .next()
71                .await
72                .ok_or_else(|| Error::argument("chunk stream closed before ack"))?;
73            match next {
74                Ok(Message::Binary(b)) => {
75                    if b.is_empty() || b == [0u8] {
76                        return Ok(());
77                    }
78                    return Err(Error::argument(format!(
79                        "chunk stream unexpected ack bytes: {b:?}"
80                    )));
81                }
82                Ok(Message::Text(t)) => {
83                    return Err(Error::argument(format!("chunk stream server error: {t}")));
84                }
85                Ok(Message::Close(_)) => {
86                    self.closed = true;
87                    return Err(Error::argument("chunk stream closed by server"));
88                }
89                Ok(Message::Ping(_) | Message::Pong(_) | Message::Frame(_)) => continue,
90                Err(e) => return Err(Error::argument(format!("websocket recv: {e}"))),
91            }
92        }
93    }
94
95    /// Close the websocket gracefully. Idempotent. Errors during
96    /// close are swallowed — the socket is dropped regardless.
97    pub async fn close(mut self) -> Result<(), Error> {
98        if !self.closed {
99            let _ = self.sink.send(Message::Close(None)).await;
100            let _ = self.sink.close().await;
101            self.closed = true;
102        }
103        Ok(())
104    }
105}
106
107impl FileApi {
108    /// Open a `/chunks/stream` websocket upload session against the
109    /// configured Bee node.
110    ///
111    /// - `batch_id` is sent in the `Swarm-Postage-Batch-Id` header and
112    ///   is used to stamp every chunk uploaded over this socket.
113    /// - `tag` (optional) is the existing tag UID to associate with
114    ///   the upload via the `swarm-tag` query parameter; without a
115    ///   tag, chunks are forwarded to the network as they arrive.
116    pub async fn chunks_stream(
117        &self,
118        batch_id: &BatchId,
119        tag: Option<u64>,
120    ) -> Result<ChunkStream, Error> {
121        let mut url = self.inner.url("chunks/stream")?;
122        let scheme = match url.scheme() {
123            "http" => "ws",
124            "https" => "wss",
125            other => {
126                return Err(Error::argument(format!(
127                    "unsupported base URL scheme for websocket: {other}"
128                )));
129            }
130        };
131        url.set_scheme(scheme)
132            .map_err(|_| Error::argument("failed to set websocket scheme"))?;
133        if let Some(t) = tag {
134            url.query_pairs_mut()
135                .append_pair("swarm-tag", &t.to_string());
136        }
137
138        let mut req = url
139            .as_str()
140            .into_client_request()
141            .map_err(|e| Error::argument(format!("websocket request: {e}")))?;
142        let value = HeaderValue::from_str(&batch_id.to_hex())
143            .map_err(|e| Error::argument(format!("invalid batch id header: {e}")))?;
144        req.headers_mut().insert("swarm-postage-batch-id", value);
145
146        let (ws, _resp) = connect_async(req)
147            .await
148            .map_err(|e| Error::argument(format!("websocket connect: {e}")))?;
149        let (sink, stream) = ws.split();
150
151        Ok(ChunkStream {
152            sink,
153            stream,
154            closed: false,
155        })
156    }
157}