Skip to main content

rune_framework/
stream.rs

1//! StreamSender for streaming Rune handlers.
2
3use bytes::Bytes;
4use tokio::sync::mpsc;
5
6use crate::error::{SdkError, SdkResult};
7
8/// Sends stream events to the runtime.
9///
10/// Accepts `Bytes`, `&str`, `String`, or any `serde::Serialize` value
11/// (via [`emit_json`]).
12#[derive(Debug)]
13pub struct StreamSender {
14    tx: mpsc::Sender<Bytes>,
15    ended: std::sync::atomic::AtomicBool,
16}
17
18impl StreamSender {
19    /// Create a new StreamSender backed by the given channel sender.
20    ///
21    /// In typical usage this is called internally by the Caster.
22    /// Exposed publicly for testing and advanced use cases.
23    pub fn new(tx: mpsc::Sender<Bytes>) -> Self {
24        Self {
25            tx,
26            ended: std::sync::atomic::AtomicBool::new(false),
27        }
28    }
29
30    /// Whether the stream has been ended.
31    pub fn is_ended(&self) -> bool {
32        self.ended.load(std::sync::atomic::Ordering::Relaxed)
33    }
34
35    /// Emit raw bytes.
36    pub async fn emit(&self, data: Bytes) -> SdkResult<()> {
37        if self.is_ended() {
38            return Err(SdkError::StreamEnded);
39        }
40        self.tx
41            .send(data)
42            .await
43            .map_err(|e| SdkError::ChannelSend(e.to_string()))
44    }
45
46    /// Emit a string (auto-encoded to UTF-8 bytes).
47    pub async fn emit_str(&self, data: &str) -> SdkResult<()> {
48        self.emit(Bytes::from(data.to_owned())).await
49    }
50
51    /// Emit a serializable value as JSON bytes.
52    pub async fn emit_json<T: serde::Serialize>(&self, data: &T) -> SdkResult<()> {
53        let json = serde_json::to_vec(data)
54            .map_err(|e| SdkError::Other(format!("json serialization error: {e}")))?;
55        self.emit(Bytes::from(json)).await
56    }
57
58    /// Signal end of stream. Idempotent — calling multiple times is safe.
59    pub fn end(&self) {
60        self.ended
61            .store(true, std::sync::atomic::Ordering::Relaxed);
62    }
63}