dink-sdk 0.3.1

Rust SDK for Dink edge mesh platform — JSON-over-NATS RPC for IoT and edge computing
Documentation
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};

use async_nats::Subscriber;
use bytes::Bytes;
use futures::{Stream, StreamExt};
use serde::de::DeserializeOwned;

use crate::error::{DinkError, Result};

/// A typed stream of messages from a server-streaming RPC.
///
/// Wraps NATS subscriptions for data, done, and optional error channels.
/// Automatically cancels the server-side stream on drop if not already complete.
///
/// Implements [`futures::Stream`] so it can be used with `StreamExt` combinators:
/// ```ignore
/// use futures::StreamExt;
/// while let Some(item) = stream.next().await {
///     let value = item?;
///     // process value
/// }
/// ```
pub struct DinkStream<T> {
    data_sub: Subscriber,
    done_sub: Subscriber,
    _error_sub: Option<Subscriber>,
    cancel_subject: Option<String>,
    nc: async_nats::Client,
    done: bool,
    _phantom: PhantomData<T>,
}

impl<T: DeserializeOwned> DinkStream<T> {
    pub(crate) fn new(
        data_sub: Subscriber,
        done_sub: Subscriber,
        error_sub: Option<Subscriber>,
        cancel_subject: Option<String>,
        nc: async_nats::Client,
    ) -> Self {
        Self {
            data_sub,
            done_sub,
            _error_sub: error_sub,
            cancel_subject,
            nc,
            done: false,
            _phantom: PhantomData,
        }
    }

    /// Receive the next item from the stream.
    /// Returns `None` when the stream is complete (done signal received or subscription closed).
    pub async fn recv(&mut self) -> Result<Option<T>> {
        if self.done {
            return Ok(None);
        }

        tokio::select! {
            msg = self.data_sub.next() => {
                match msg {
                    Some(msg) => {
                        let parsed: T = crate::internal::envelope::parse_service_response(&msg.payload)?;
                        Ok(Some(parsed))
                    }
                    None => {
                        self.done = true;
                        Ok(None)
                    }
                }
            }
            _done = self.done_sub.next() => {
                self.done = true;
                Ok(None)
            }
        }
    }

    /// Cancel the server-side stream.
    ///
    /// Publishes an empty message to the cancel subject provided by the server
    /// in the stream acknowledgement. Safe to call multiple times.
    pub async fn cancel(&self) -> Result<()> {
        if let Some(ref subject) = self.cancel_subject {
            self.nc
                .publish(subject.clone(), Bytes::new())
                .await
                .map_err(|e| DinkError::Nats(e.to_string()))?;
        }
        Ok(())
    }
}

impl<T: DeserializeOwned + Unpin> Stream for DinkStream<T> {
    type Item = Result<T>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        if this.done {
            return Poll::Ready(None);
        }

        // Check done_sub first — completion signal takes priority.
        match Pin::new(&mut this.done_sub).poll_next(cx) {
            Poll::Ready(Some(_)) => {
                this.done = true;
                return Poll::Ready(None);
            }
            Poll::Ready(None) => {
                // done_sub subscription closed unexpectedly; treat as done.
                this.done = true;
                return Poll::Ready(None);
            }
            Poll::Pending => {}
        }

        // Then check data_sub.
        match Pin::new(&mut this.data_sub).poll_next(cx) {
            Poll::Ready(Some(msg)) => {
                let result = crate::internal::envelope::parse_service_response(&msg.payload);
                Poll::Ready(Some(result))
            }
            Poll::Ready(None) => {
                this.done = true;
                Poll::Ready(None)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

impl<T> Drop for DinkStream<T> {
    fn drop(&mut self) {
        if !self.done {
            if let Some(ref subject) = self.cancel_subject {
                let nc = self.nc.clone();
                let subject = subject.clone();
                // Best-effort cancel: spawn a task since publish is async.
                // If the runtime is shutting down, this silently fails — acceptable.
                let _ = tokio::spawn(async move {
                    let _ = nc.publish(subject, Bytes::new()).await;
                });
            }
        }
    }
}