subduction_http_longpoll 0.8.0

HTTP long-poll transport layer for the Subduction sync protocol
Documentation
//! HTTP long-poll connection implementing [`Transport<K>`].
//!
//! Unlike the WebSocket transport, there are no background listener/sender
//! tasks. Instead, the HTTP server's request handlers directly push to and
//! pull from the internal channels:
//!
//! ```text
//! POST /lp/send   ──► inbound_writer  ──► inbound_reader  ──► recv_bytes()
//! send_bytes()    ──► outbound_tx     ──► outbound_rx      ──► POST /lp/recv
//! ```
//!
//! The `call()` method delegates request-response correlation to the shared
//! [`Multiplexer`], identical to other transports.

use alloc::{sync::Arc, vec::Vec};

use async_lock::Mutex;
use future_form::{FutureForm, Local, Sendable, future_form};
use rand::{RngCore, rngs::OsRng};
use subduction_core::{peer::id::PeerId, transport::Transport};

use crate::error::{DisconnectionError, RecvError, SendError};

/// Channel capacity for outbound messages (server → client via `/lp/recv`).
///
/// Passed to `async_channel::bounded` when the outbound channel is created in
/// [`HttpLongPollTransport::new`].
const OUTBOUND_CHANNEL_CAPACITY: usize = 1024;

/// Channel capacity for inbound messages (client → server via `/lp/send`).
///
/// Passed to `async_channel::bounded` when the inbound channel is created in
/// [`HttpLongPollTransport::new`].
const INBOUND_CHANNEL_CAPACITY: usize = 128;

/// Shared interior state for an HTTP long-poll connection.
///
/// This struct is wrapped in an `Arc` so that clones of
/// [`HttpLongPollTransport`] share the same channels and cancel guard.
///
/// NOTE(review): earlier documentation also mentioned a shared
/// "pending-request map" (as in the WebSocket transport), but no such field
/// exists in this struct — confirm that request correlation now lives
/// elsewhere.
#[derive(Debug)]
struct Inner {
    /// Random identifier drawn from the OS RNG at construction time; attached
    /// to the trace output of `recv_bytes()`.
    chan_id: u64,
    /// The peer this connection was created for.
    peer_id: PeerId,

    /// Raw bytes from `send_bytes()` / `call()` → picked up by `/lp/recv` handler.
    outbound_tx: async_channel::Sender<Vec<u8>>,

    /// Raw bytes from `/lp/send` handler → picked up by `recv_bytes()`.
    inbound_writer: async_channel::Sender<Vec<u8>>,
    /// Receiving half of the inbound channel; paired with `inbound_writer`.
    inbound_reader: async_channel::Receiver<Vec<u8>>,

    /// Keeps background poll/send tasks alive. Dropping this closes the cancel
    /// channel, which signals the tasks to exit. Set via
    /// [`HttpLongPollTransport::set_cancel_guard`] after construction.
    cancel_guard: Mutex<Option<async_channel::Sender<()>>>,
}

/// An HTTP long-poll connection that implements [`Transport<K>`].
///
/// Created during handshake and stored in the [`SessionStore`](crate::session::SessionStore).
/// The server's HTTP handlers interact with this connection's channels to
/// bridge HTTP request-response pairs to Subduction's bidirectional protocol.
///
/// Cloning is cheap: the `Arc` is reference-counted and the receiver handle is
/// cloned, so every clone refers to the same underlying channels. Equality
/// (`PartialEq`) is identity of the shared `Arc`, not structural.
#[derive(Debug, Clone)]
pub struct HttpLongPollTransport {
    /// State shared between all clones of this transport.
    inner: Arc<Inner>,
    /// Server-facing receiver: the `/lp/recv` handler drains this.
    ///
    /// This is the receiving half of the channel whose sender is stored as
    /// `inner.outbound_tx`.
    outbound_rx: async_channel::Receiver<Vec<u8>>,
}

impl HttpLongPollTransport {
    /// Build a fresh long-poll transport for `peer_id`.
    ///
    /// Allocates one bounded channel per direction (sized by
    /// `OUTBOUND_CHANNEL_CAPACITY` and `INBOUND_CHANNEL_CAPACITY`) and draws a
    /// random channel id from the OS RNG. No cancel guard is installed yet; see
    /// [`Self::set_cancel_guard`].
    #[must_use]
    pub fn new(peer_id: PeerId) -> Self {
        let chan_id = OsRng.next_u64();
        let (outbound_tx, outbound_rx) = async_channel::bounded(OUTBOUND_CHANNEL_CAPACITY);
        let (inbound_writer, inbound_reader) = async_channel::bounded(INBOUND_CHANNEL_CAPACITY);

        let shared = Inner {
            chan_id,
            peer_id,
            outbound_tx,
            inbound_writer,
            inbound_reader,
            cancel_guard: Mutex::new(None),
        };

        Self {
            inner: Arc::new(shared),
            outbound_rx,
        }
    }

    /// Install the cancel-channel sender that keeps the background poll/send
    /// tasks alive for as long as this connection (and its clones) exist.
    ///
    /// Any previously stored guard is replaced (and therefore dropped). The
    /// guard is released when [`Self::close`] clears it or when every clone of
    /// the connection has been dropped, at which point the tasks exit.
    pub async fn set_cancel_guard(&self, guard: async_channel::Sender<()>) {
        let mut slot = self.inner.cancel_guard.lock().await;
        *slot = Some(guard);
    }

    /// Feed raw bytes received from the remote peer into this connection.
    ///
    /// Called by HTTP handlers (server's `POST /lp/send`) or the client poll
    /// loop when a message arrives from the remote peer.
    ///
    /// Everything is forwarded to the inbound channel, where `recv_bytes()`
    /// picks it up. Response routing is handled by
    /// [`Subduction::listen`](subduction_core::subduction::Subduction::listen).
    ///
    /// # Errors
    ///
    /// Returns an error if the inbound channel has been closed. When the
    /// channel is at capacity this call waits for room instead of failing.
    pub async fn push_inbound(
        &self,
        bytes: Vec<u8>,
    ) -> Result<(), async_channel::SendError<Vec<u8>>> {
        let writer = &self.inner.inbound_writer;
        writer.send(bytes).await
    }

    /// Take the next outbound message destined for the client (via `POST /lp/recv`).
    ///
    /// Waits until a message is available or the channel closes.
    ///
    /// # Errors
    ///
    /// Returns an error if the outbound channel is closed.
    pub async fn pull_outbound(&self) -> Result<Vec<u8>, async_channel::RecvError> {
        let outbound = &self.outbound_rx;
        outbound.recv().await
    }

    /// Close both channels and release the cancel guard so background tasks stop.
    ///
    /// The guard is cleared on a best-effort basis: if its mutex is held by
    /// someone else at this instant, the guard is left in place and is only
    /// released once every clone of the connection is dropped.
    pub fn close(&self) {
        let shared = &*self.inner;

        shared.inbound_writer.close();
        shared.outbound_tx.close();
        self.outbound_rx.close();
        shared.inbound_reader.close();

        // `try_lock` keeps `close` synchronous and non-blocking.
        if let Some(mut slot) = shared.cancel_guard.try_lock() {
            drop(slot.take());
        }
    }
}

/// [`Transport`] implementation backed by the connection's in-memory channels.
///
/// Outbound bytes are queued for the `/lp/recv` handler; inbound bytes are
/// whatever `push_inbound` has queued.
#[future_form(Sendable, Local)]
impl<K: FutureForm> Transport<K> for HttpLongPollTransport {
    type SendError = SendError;
    type RecvError = RecvError;
    type DisconnectionError = DisconnectionError;

    /// Close the connection's channels. Always resolves to `Ok(())`.
    fn disconnect(&self) -> K::Future<'_, Result<(), Self::DisconnectionError>> {
        tracing::info!(peer_id = %self.inner.peer_id, "HttpLongPoll::disconnect");
        let handle = self.clone();
        K::from_future(async move {
            handle.close();
            Ok(())
        })
    }

    /// Copy `bytes` and queue the copy on the outbound channel.
    ///
    /// # Errors
    ///
    /// Resolves to [`SendError`] if the outbound channel rejects the message
    /// (i.e. it has been closed).
    fn send_bytes(&self, bytes: &[u8]) -> K::Future<'_, Result<(), Self::SendError>> {
        tracing::debug!(
            "http-lp: sending {} outbound bytes to peer {}",
            bytes.len(),
            self.inner.peer_id
        );

        // Own both the payload and a sender handle so the future does not
        // borrow from the caller's slice.
        let outbound = self.inner.outbound_tx.clone();
        let payload = bytes.to_vec();
        K::from_future(async move { outbound.send(payload).await.map_err(|_| SendError) })
    }

    /// Wait for the next message queued on the inbound channel.
    ///
    /// # Errors
    ///
    /// Resolves to [`RecvError`] if the inbound channel is closed.
    fn recv_bytes(&self) -> K::Future<'_, Result<Vec<u8>, Self::RecvError>> {
        let inbound = self.inner.inbound_reader.clone();
        tracing::debug!(
            chan_id = self.inner.chan_id,
            "waiting on recv {:?}",
            self.inner.peer_id
        );

        K::from_future(async move {
            match inbound.recv().await {
                Ok(bytes) => {
                    tracing::debug!("recv: inbound {} bytes", bytes.len());
                    Ok(bytes)
                }
                Err(_) => {
                    tracing::error!("inbound channel closed unexpectedly");
                    Err(RecvError)
                }
            }
        })
    }
}

/// Two transports are equal when they are clones of the same connection,
/// i.e. they share the same `Arc<Inner>` allocation.
impl PartialEq for HttpLongPollTransport {
    fn eq(&self, other: &Self) -> bool {
        let Self { inner: mine, .. } = self;
        let Self { inner: theirs, .. } = other;
        Arc::ptr_eq(mine, theirs)
    }
}

#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
    use super::*;
    use sedimentree_core::id::SedimentreeId;
    use subduction_core::{
        connection::{
            Connection,
            message::{RemoveSubscriptions, SyncMessage},
        },
        transport::message::MessageTransport,
    };

    /// A `RemoveSubscriptions` message carrying a single all-zero id, used as
    /// the payload in the round-trip tests below.
    fn remove_subscriptions_fixture() -> SyncMessage {
        SyncMessage::RemoveSubscriptions(RemoveSubscriptions {
            ids: alloc::vec![SedimentreeId::from_bytes([0u8; 32])],
        })
    }

    /// The peer id handed to `new` is the one stored on the connection.
    #[tokio::test]
    async fn peer_id_preserved() {
        let expected = PeerId::new([1u8; 32]);
        let transport = HttpLongPollTransport::new(expected);

        assert_eq!(transport.inner.peer_id, expected);
    }

    /// Bytes pushed via `push_inbound` come back out of `Connection::recv`.
    #[tokio::test]
    async fn push_inbound_and_recv() {
        let transport = HttpLongPollTransport::new(PeerId::new([2u8; 32]));
        let outgoing = remove_subscriptions_fixture();

        transport
            .push_inbound(outgoing.encode())
            .await
            .expect("push ok");

        let wrapped = MessageTransport::new(transport);
        let incoming = Connection::<Sendable, SyncMessage>::recv(&wrapped)
            .await
            .expect("recv ok");

        assert!(matches!(incoming, SyncMessage::RemoveSubscriptions(_)));
    }

    /// A message sent through `Connection::send` is retrievable via
    /// `pull_outbound` and decodes back to the same variant.
    #[tokio::test]
    async fn send_and_pull_outbound() {
        let transport = HttpLongPollTransport::new(PeerId::new([3u8; 32]));
        let outgoing = remove_subscriptions_fixture();

        let wrapped = MessageTransport::new(transport.clone());
        Connection::<Sendable, SyncMessage>::send(&wrapped, &outgoing)
            .await
            .expect("send ok");

        let raw = transport.pull_outbound().await.expect("pull ok");
        let roundtripped = SyncMessage::try_decode(&raw).expect("decode ok");

        assert!(matches!(roundtripped, SyncMessage::RemoveSubscriptions(_)));
    }
}