Skip to main content

bee/pss/
mod.rs

1//! PSS send / subscribe / receive. Mirrors `pkg/pss` in bee-go and
2//! `Bee.pssSend` / `Bee.pssSubscribe` / `Bee.pssReceive` in bee-js.
3//!
4//! Get a [`PssApi`] handle via [`crate::Client::pss`].
5
6use std::sync::Arc;
7use std::time::Duration;
8
9use bytes::Bytes;
10use futures_util::{SinkExt, StreamExt};
11use reqwest::Method;
12use tokio::sync::mpsc;
13use tokio::time;
14use tokio_tungstenite::connect_async;
15use tokio_tungstenite::tungstenite::protocol::Message;
16
17use crate::client::{Inner, request};
18use crate::swarm::{BatchId, Error, PublicKey, Topic};
19
20/// Handle exposing the PSS endpoints. Cheap to clone.
21#[derive(Clone, Debug)]
22pub struct PssApi {
23    pub(crate) inner: Arc<Inner>,
24}
25
26impl PssApi {
27    pub(crate) fn new(inner: Arc<Inner>) -> Self {
28        Self { inner }
29    }
30
31    /// `POST /pss/send/{topic}/{target}` — send a PSS message.
32    ///
33    /// - `batch_id` is mandatory (Bee 2.7+ rejects requests without
34    ///   `Swarm-Postage-Batch-Id`).
35    /// - `target` is a routing prefix as hex (e.g. `"1234"`), not a
36    ///   full overlay address. Bee uses it as a partial XOR target so
37    ///   gossip can reach the recipient without revealing the full
38    ///   address.
39    /// - `recipient` is optional; when provided, the message is
40    ///   end-to-end encrypted with the recipient's public key.
41    pub async fn send(
42        &self,
43        batch_id: &BatchId,
44        topic: &Topic,
45        target: &str,
46        data: impl Into<Bytes>,
47        recipient: Option<&PublicKey>,
48    ) -> Result<(), Error> {
49        let path = format!("pss/send/{}/{target}", topic.to_hex());
50        let mut builder = request(&self.inner, Method::POST, &path)?
51            .header("Swarm-Postage-Batch-Id", batch_id.to_hex())
52            .body(data.into());
53        if let Some(pk) = recipient {
54            let compressed = pk.compressed_hex()?;
55            builder = builder.query(&[("recipient", compressed)]);
56        }
57        self.inner.send(builder).await?;
58        Ok(())
59    }
60
61    /// `GET /pss/subscribe/{topic}` over a WebSocket. Returns a
62    /// [`Subscription`] yielding decoded message bytes via
63    /// [`Subscription::recv`].
64    pub async fn subscribe(&self, topic: &Topic) -> Result<Subscription, Error> {
65        let path = format!("pss/subscribe/{}", topic.to_hex());
66        Subscription::open(&self.inner, &path).await
67    }
68
69    /// One-shot helper: subscribe, wait for the next message (with
70    /// `timeout`), close the subscription, return the bytes.
71    pub async fn receive(&self, topic: &Topic, timeout: Duration) -> Result<Bytes, Error> {
72        let mut sub = self.subscribe(topic).await?;
73        let result = match time::timeout(timeout, sub.recv()).await {
74            Ok(Some(b)) => Ok(b),
75            Ok(None) => Err(Error::argument("pss subscription closed before message")),
76            Err(_) => Err(Error::argument("pss receive timed out")),
77        };
78        sub.cancel();
79        result
80    }
81}
82
/// Active PSS or GSOC websocket subscription. Drop it (or call
/// [`Subscription::cancel`]) to close the underlying socket.
///
/// The socket itself is owned by a background task spawned in
/// [`Subscription::open`]; this handle only holds the two channel
/// ends used to talk to that task.
#[derive(Debug)]
pub struct Subscription {
    /// Receiving end of the channel the background task forwards
    /// incoming message payloads into.
    rx: mpsc::Receiver<Bytes>,
    /// Sender used to tell the background task to stop. Set to `None`
    /// once [`Subscription::cancel`] has run.
    cancel: Option<mpsc::Sender<()>>,
}
90
91impl Subscription {
92    /// Open a websocket subscription against an arbitrary `path` on
93    /// the configured base URL. Used internally by PSS and GSOC.
94    pub(crate) async fn open(inner: &Arc<Inner>, path: &str) -> Result<Self, Error> {
95        let mut url = inner.url(path)?;
96        let scheme = match url.scheme() {
97            "http" => "ws",
98            "https" => "wss",
99            other => {
100                return Err(Error::argument(format!(
101                    "unsupported base URL scheme for websocket: {other}"
102                )));
103            }
104        };
105        url.set_scheme(scheme)
106            .map_err(|_| Error::argument("failed to set websocket scheme"))?;
107
108        let (ws, _resp) = connect_async(url.as_str())
109            .await
110            .map_err(|e| Error::argument(format!("websocket connect: {e}")))?;
111        let (mut sink, mut stream) = ws.split();
112
113        let (msg_tx, msg_rx) = mpsc::channel::<Bytes>(64);
114        let (cancel_tx, mut cancel_rx) = mpsc::channel::<()>(1);
115
116        tokio::spawn(async move {
117            loop {
118                tokio::select! {
119                    _ = cancel_rx.recv() => {
120                        let _ = sink.send(Message::Close(None)).await;
121                        let _ = sink.close().await;
122                        break;
123                    }
124                    msg = stream.next() => {
125                        match msg {
126                            Some(Ok(Message::Binary(b))) => {
127                                if msg_tx.send(Bytes::from(b)).await.is_err() {
128                                    break;
129                                }
130                            }
131                            Some(Ok(Message::Text(t))) => {
132                                if msg_tx
133                                    .send(Bytes::from(t.into_bytes()))
134                                    .await
135                                    .is_err()
136                                {
137                                    break;
138                                }
139                            }
140                            Some(Ok(Message::Close(_))) | None => break,
141                            Some(Ok(_)) => continue, // Ping/Pong/Frame ignored
142                            Some(Err(_)) => break,
143                        }
144                    }
145                }
146            }
147        });
148
149        Ok(Self {
150            rx: msg_rx,
151            cancel: Some(cancel_tx),
152        })
153    }
154
155    /// Wait for the next message. Returns `None` when the
156    /// subscription has been closed.
157    pub async fn recv(&mut self) -> Option<Bytes> {
158        self.rx.recv().await
159    }
160
161    /// Close the subscription. Idempotent.
162    pub fn cancel(&mut self) {
163        if let Some(tx) = self.cancel.take() {
164            let _ = tx.try_send(());
165        }
166    }
167}
168
169impl Drop for Subscription {
170    fn drop(&mut self) {
171        self.cancel();
172    }
173}