nucel-sdk-api 0.3.0

Auto-generated Rust client for the nucel.dev REST API (OpenAPI-driven, progenitor-built)
Documentation
//! WebSocket helpers for endpoints that stream events over a persistent
//! connection (e.g. `/api/v1/notifications/stream`).
//!
//! Built on [`tokio_tungstenite`] with rustls + webpki roots so it works out
//! of the box against TLS deployments. Exposes a [`futures::Stream`] of
//! parsed JSON events — callers decide how to handle each one.
//!
//! # Example
//!
//! ```no_run
//! # async fn run() -> Result<(), Box<dyn std::error::Error>> {
//! use futures::StreamExt;
//! use nucel_sdk_api::ws::{NotificationStreamEvent, stream_notifications};
//!
//! let mut events = stream_notifications("https://nucel.dev", "ghp_token").await?;
//! while let Some(Ok(ev)) = events.next().await {
//!     match ev {
//!         NotificationStreamEvent::Init { unread_count } => {
//!             println!("connected, {unread_count} unread");
//!         }
//!         NotificationStreamEvent::Other(v) => {
//!             println!("event: {v}");
//!         }
//!     }
//! }
//! # Ok(())
//! # }
//! ```

use std::pin::Pin;
use std::task::{Context, Poll};

use futures::{Stream, StreamExt};
use tokio_tungstenite::tungstenite::Message;

/// An event on the notification stream.
///
/// The server sends `{"type":"init", ...}` as the first frame and arbitrary
/// event shapes afterwards. This enum handles the one well-known frame and
/// preserves everything else as raw JSON so new server-side event types
/// don't break older clients.
#[derive(Debug, Clone)]
pub enum NotificationStreamEvent {
    /// First frame on connect. Contains the user's unread notification count.
    Init { unread_count: u64 },
    /// Any other event kind. The raw JSON is preserved.
    Other(serde_json::Value),
}

fn parse_event(raw: &str) -> Result<NotificationStreamEvent, serde_json::Error> {
    let value: serde_json::Value = serde_json::from_str(raw)?;
    if let Some(ty) = value.get("type").and_then(|v| v.as_str()) {
        if ty == "init" {
            if let Some(count) = value.get("unread_count").and_then(|v| v.as_u64()) {
                return Ok(NotificationStreamEvent::Init {
                    unread_count: count,
                });
            }
        }
    }
    Ok(NotificationStreamEvent::Other(value))
}

/// Errors the WebSocket stream can produce.
#[derive(Debug, thiserror::Error)]
pub enum WsError {
    #[error("invalid base URL: {0}")]
    Url(#[from] url::ParseError),
    #[error("WebSocket error: {0}")]
    Ws(#[from] tokio_tungstenite::tungstenite::Error),
    #[error("failed to parse event JSON: {0}")]
    Parse(#[from] serde_json::Error),
}

// `thiserror` is a dev-only addition; if the caller doesn't want the extra
// dep we still provide a manual `From` for convenience via the crate root.

/// Open a WebSocket to `/api/v1/notifications/stream` and return a stream
/// of parsed events. The `token` is sent as both an `Authorization: Bearer`
/// header AND a `?token=` query param so the call works against any server
/// config.
pub async fn stream_notifications(
    base_url: &str,
    token: &str,
) -> Result<impl Stream<Item = Result<NotificationStreamEvent, WsError>>, WsError> {
    let ws_url = build_ws_url(base_url, "/api/v1/notifications/stream", token)?;
    let request = tokio_tungstenite::tungstenite::http::Request::builder()
        .method("GET")
        .uri(ws_url.as_str())
        .header("Authorization", format!("Bearer {token}"))
        .header("User-Agent", concat!("nucel-sdk-api/", env!("CARGO_PKG_VERSION")))
        .header("Host", ws_url.host_str().unwrap_or(""))
        .header("Connection", "Upgrade")
        .header("Upgrade", "websocket")
        .header("Sec-WebSocket-Version", "13")
        .header(
            "Sec-WebSocket-Key",
            tokio_tungstenite::tungstenite::handshake::client::generate_key(),
        )
        .body(())
        .expect("valid request");
    let (ws, _resp) = tokio_tungstenite::connect_async(request).await?;
    Ok(NotificationStream { inner: ws })
}

struct NotificationStream<S> {
    inner: S,
}

impl<S> Stream for NotificationStream<S>
where
    S: Stream<Item = Result<Message, tokio_tungstenite::tungstenite::Error>> + Unpin,
{
    type Item = Result<NotificationStreamEvent, WsError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            match futures::ready!(self.inner.poll_next_unpin(cx)) {
                Some(Ok(Message::Text(text))) => {
                    return Poll::Ready(Some(parse_event(&text).map_err(Into::into)));
                }
                Some(Ok(Message::Binary(bin))) => match std::str::from_utf8(&bin) {
                    Ok(text) => {
                        return Poll::Ready(Some(parse_event(text).map_err(Into::into)));
                    }
                    Err(_) => continue,
                },
                Some(Ok(Message::Ping(_) | Message::Pong(_) | Message::Frame(_))) => continue,
                Some(Ok(Message::Close(_))) | None => return Poll::Ready(None),
                Some(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
            }
        }
    }
}

/// Turn `http(s)://host[:port]` + a path into a `ws(s)://host[:port]/path?token=...`.
pub fn build_ws_url(base_url: &str, path: &str, token: &str) -> Result<url::Url, url::ParseError> {
    let mut url = url::Url::parse(base_url)?.join(path)?;
    let scheme = if url.scheme() == "https" { "wss" } else { "ws" };
    let _ = url.set_scheme(scheme);
    url.query_pairs_mut().append_pair("token", token);
    Ok(url)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn build_ws_url_converts_https_to_wss() {
        let url = build_ws_url("https://nucel.dev", "/api/v1/notifications/stream", "tok").unwrap();
        assert_eq!(url.scheme(), "wss");
        assert_eq!(url.host_str(), Some("nucel.dev"));
        assert_eq!(url.path(), "/api/v1/notifications/stream");
        assert!(url.query().unwrap().contains("token=tok"));
    }

    #[test]
    fn build_ws_url_converts_http_to_ws() {
        let url = build_ws_url("http://localhost:17321", "/api/v1/notifications/stream", "x").unwrap();
        assert_eq!(url.scheme(), "ws");
        assert_eq!(url.port(), Some(17321));
    }

    #[test]
    fn parse_event_init() {
        let raw = r#"{"type":"init","unread_count":7}"#;
        match parse_event(raw).unwrap() {
            NotificationStreamEvent::Init { unread_count } => assert_eq!(unread_count, 7),
            _ => panic!("expected Init"),
        }
    }

    #[test]
    fn parse_event_unknown_falls_back_to_other() {
        let raw = r#"{"type":"issue_opened","issue_id":"123"}"#;
        match parse_event(raw).unwrap() {
            NotificationStreamEvent::Other(v) => assert_eq!(v["issue_id"], "123"),
            _ => panic!("expected Other"),
        }
    }
}