Skip to main content

rmux_sdk/events/
streams.rs

1//! SDK-owned facade over the v1 pane-output subscription protocol.
2//!
3//! The two opaque streams in this module — [`PaneOutputStream`] for raw
4//! bytes plus sequence/lag notices, and [`PaneLineStream`] for rendered
5//! UTF-8 lines — are constructed through fallible [`crate::Pane`] methods and
6//! drive the daemon's `SubscribePaneOutput`, `PaneOutputCursor`, and
7//! `UnsubscribePaneOutput` endpoints internally. They never expose
8//! [`rmux_proto::PaneOutputSubscriptionId`] to SDK callers.
9//!
10//! ## Raw bytes vs rendered lines
11//!
12//! [`PaneOutputStream`] emits [`PaneOutputChunk`] items. Bytes
13//! ([`PaneOutputChunk::Bytes`]) preserve every payload byte the daemon
14//! delivered, including NUL and bytes that are not valid UTF-8, and pair
15//! them with the monotonic per-pane [`PaneOutputChunk::Bytes::sequence`]
16//! the daemon assigned. Lag notices ([`PaneOutputChunk::Lag`]) surface
17//! the daemon-side gap between the cursor's expected sequence and the
18//! oldest retained sequence verbatim, including the bounded recent live
19//! bytes the daemon retained at gap detection time. The raw byte stream
20//! never converts payloads through `String::from_utf8_lossy` and never
21//! alters the byte sequence the daemon delivered.
22//!
//! [`PaneLineStream`] is a wrapper built on top of the raw stream that
//! applies two well-isolated transformations:
25//!
26//! * **Lossy UTF-8 rendering.** Each completed line's bytes are decoded
27//!   through `String::from_utf8_lossy`, which replaces every byte
28//!   sequence that is not valid UTF-8 with the Unicode replacement
29//!   character `U+FFFD`. The lossy conversion is applied only when the
30//!   line is yielded — not on the underlying byte stream — so a caller
31//!   that wants byte-faithful output should use [`PaneOutputStream`]
//!   instead. Embedded NUL bytes survive into the rendered string as
//!   `\0`; only invalid UTF-8 byte sequences are replaced.
34//! * **Partial-line buffering.** The line stream splits on the LF byte
35//!   `b'\n'` only. Carriage returns and any other bytes are preserved
36//!   inside the line. Bytes that are not yet terminated by an LF stay in
37//!   an internal buffer and are not yielded; the buffer is flushed only
38//!   when the next LF arrives. A trailing partial line that the daemon
39//!   never terminates with LF is dropped when the stream ends or lag
40//!   fires, because the next sequence's bytes may not begin at a line
41//!   boundary.
42//!
43//! On a [`PaneOutputChunk::Lag`] the line stream drops the partial-line
44//! buffer (the next sequence may be discontinuous with the buffered
45//! bytes), forwards the lag notice as [`PaneLineItem::Lag`], and resumes
46//! line splitting from a clean state on subsequent bytes. Callers that
47//! want to recover the dropped partial bytes can read
48//! [`PaneLagNotice::recent`].
49//!
50//! ## Drop / unsubscribe contract
51//!
52//! Each stream owns one per-connection subscription on the daemon, and
53//! every drop emits at most one best-effort
54//! [`UnsubscribePaneOutput`](rmux_proto::UnsubscribePaneOutputRequest)
55//! request through the same transport actor. The unsubscribe is fire and
56//! forget — its response is discarded, late or duplicate
57//! `unsubscribe-pane-output` errors do not propagate, and a closed
58//! transport silently no-ops. The daemon's unsubscribe handler only
59//! removes the subscription record; it does not close the pane, the
60//! window, the session, the underlying child process, or the daemon
61//! itself, so dropping an unfinished stream is always safe.
62//!
63//! Wrapping the line stream around the byte stream means the inner byte
64//! stream still owns its own transport drop guard and emits its own
65//! unsubscribe — there is exactly one unsubscribe per subscription
66//! regardless of which wrapper is dropped.
67
68use std::collections::VecDeque;
69use std::time::Duration;
70
71use rmux_proto::{
72    PaneOutputCursorRequest, PaneOutputEvent, PaneOutputLagNotice as ProtoLagNotice,
73    PaneOutputSubscriptionId, PaneOutputSubscriptionStart, PaneRecentOutput as ProtoRecentOutput,
74    PaneTargetRef, Request, Response, SubscribePaneOutputRefRequest, SubscribePaneOutputRequest,
75    UnsubscribePaneOutputRequest, CAPABILITY_SDK_PANE_BY_ID,
76};
77
78use crate::handles::session::unexpected_response;
79use crate::transport::{DropGuard, TransportClient};
80use crate::{Result, RmuxError};
81
/// Value sent as `max_events` in every `PaneOutputCursor` request, i.e. the
/// most events a single round trip asks the daemon for.
const PANE_OUTPUT_BATCH_SIZE: u16 = 256;
/// Starting delay between cursor polls that returned nothing; the backoff is
/// reset to this value as soon as a chunk is delivered.
const POLL_INITIAL_DELAY: Duration = Duration::from_millis(2);
/// Ceiling for the doubling backoff between empty cursor polls.
const POLL_MAX_DELAY: Duration = Duration::from_millis(50);
85
/// Where a pane-output stream should anchor its cursor at subscription time.
///
/// Mirrors the daemon's own
/// [`rmux_proto::PaneOutputSubscriptionStart`]
/// vocabulary as an SDK-owned enum so callers do not depend on
/// `rmux-proto` directly. The default anchor is [`PaneOutputStart::Now`].
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum PaneOutputStart {
    /// Start after the newest output currently retained by the pane. The
    /// stream will only deliver bytes the daemon appends after this call.
    #[default]
    Now,
    /// Start at the oldest retained output event, replaying the daemon's
    /// retained backlog before delivering newly produced bytes.
    Oldest,
}
103
104impl PaneOutputStart {
105    fn into_proto(self) -> PaneOutputSubscriptionStart {
106        match self {
107            Self::Now => PaneOutputSubscriptionStart::Now,
108            Self::Oldest => PaneOutputSubscriptionStart::Oldest,
109        }
110    }
111}
112
/// Recent retained pane bytes attached to a [`PaneLagNotice`].
///
/// The byte payload is never converted through `String::from_utf8_lossy`;
/// it is the exact byte run the daemon retained at gap-detection time,
/// bounded by the daemon's `MAX_LAG_RECENT_BYTES` window. The SDK copies
/// all three fields unchanged from the protocol-level record.
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)]
pub struct PaneRecentOutput {
    /// Retained recent raw pane output bytes.
    pub bytes: Vec<u8>,
    /// Oldest output sequence contributing retained bytes.
    // NOTE(review): `None` presumably means no event contributed bytes —
    // confirm against the daemon.
    pub oldest_sequence: Option<u64>,
    /// Newest output sequence contributing retained bytes.
    pub newest_sequence: Option<u64>,
}
127
128impl PaneRecentOutput {
129    fn from_proto(value: ProtoRecentOutput) -> Self {
130        Self {
131            bytes: value.bytes,
132            oldest_sequence: value.oldest_sequence,
133            newest_sequence: value.newest_sequence,
134        }
135    }
136}
137
/// Detailed gap report carried by [`PaneOutputChunk::Lag`].
///
/// Sequence numbers are exact mirrors of the daemon's own per-pane output
/// counter. `expected_sequence` is the next sequence the cursor was
/// waiting for before lag was detected; `resume_sequence` is the oldest
/// retained sequence the daemon will start delivering from again.
///
/// Every field is taken over unchanged from the protocol-level lag notice.
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)]
pub struct PaneLagNotice {
    /// Sequence the subscriber expected before lag was detected.
    pub expected_sequence: u64,
    /// Oldest retained sequence where the subscriber will resume.
    pub resume_sequence: u64,
    /// Number of output events skipped by this lag notice.
    pub missed_events: u64,
    /// Newest output sequence appended when lag was detected.
    pub newest_sequence: u64,
    /// Bounded recent live output the daemon retained at lag time.
    pub recent: PaneRecentOutput,
}
157
158impl PaneLagNotice {
159    fn from_proto(value: ProtoLagNotice) -> Self {
160        Self {
161            expected_sequence: value.expected_sequence,
162            resume_sequence: value.resume_sequence,
163            missed_events: value.missed_events,
164            newest_sequence: value.newest_sequence,
165            recent: PaneRecentOutput::from_proto(value.recent),
166        }
167    }
168}
169
/// One item delivered by [`PaneOutputStream`].
///
/// A [`Self::Bytes`] chunk whose payload is empty acts as the stream's
/// end-of-stream marker: the chunk itself is still delivered, and the
/// stream reports `Ok(None)` once everything buffered has been handed out.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum PaneOutputChunk {
    /// Raw decoded pane bytes paired with the daemon-assigned monotonic
    /// per-pane sequence.
    Bytes {
        /// Per-pane monotonic output sequence.
        sequence: u64,
        /// Arbitrary raw pane bytes — may include NUL or non-UTF-8 byte
        /// sequences.
        bytes: Vec<u8>,
    },
    /// A daemon-side gap report. Subsequent [`Self::Bytes`] chunks resume
    /// at [`PaneLagNotice::resume_sequence`].
    Lag(PaneLagNotice),
}
187
/// One item delivered by [`PaneLineStream`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum PaneLineItem {
    /// Decoded line text, with `String::from_utf8_lossy` already applied.
    /// Only the terminating `\n` is stripped from `Line.text`: lines are
    /// split on LF alone, so a preceding `\r` (as in CRLF output) and every
    /// other byte stay part of the text.
    Line {
        /// Rendered line text.
        text: String,
    },
    /// A daemon-side gap report propagated unchanged from the underlying
    /// raw byte stream. The line stream drops its partial-line buffer
    /// when this fires; subsequent line splitting starts from a clean
    /// state.
    Lag(PaneLagNotice),
}
205
/// Opaque live stream of pane output bytes plus sequence/lag notices.
///
/// Construction goes through [`Pane::output_stream`](crate::Pane::output_stream).
/// Use [`PaneOutputStream::next`] to drive the cursor; the per-call
/// polling cadence and any backoff is internal and unspecified. The
/// daemon's [`PaneOutputSubscriptionId`] is *not* observable through this
/// type.
pub struct PaneOutputStream {
    // Transport handle, subscription id, drop guard, and closed flag.
    inner: PaneSubscription,
    // Chunks already received from the daemon but not yet handed to the
    // caller, in delivery order.
    pending: VecDeque<PaneOutputChunk>,
    // Current sleep between empty cursor polls; doubled after every empty
    // poll up to `POLL_MAX_DELAY` and reset once a chunk is delivered.
    poll_delay: Duration,
}
218
/// Opaque live stream of rendered pane output lines.
///
/// Construction goes through [`Pane::line_stream`](crate::Pane::line_stream).
/// See the module docs for the lossy UTF-8 and partial-line buffering
/// rules.
pub struct PaneLineStream {
    // Raw byte stream that this line stream reads from.
    inner: PaneOutputStream,
    // Bytes of the current line that have not yet been terminated by `\n`;
    // cleared when a lag notice arrives.
    line_buffer: Vec<u8>,
    // Completed lines and lag notices waiting to be returned by `next`.
    pending: VecDeque<PaneLineItem>,
}
229
// Per-subscription state shared by the stream's polling methods.
struct PaneSubscription {
    // Transport used for every cursor request.
    transport: TransportClient,
    // Daemon-assigned id; sent with each cursor request and checked against
    // the id echoed back in every response.
    subscription_id: PaneOutputSubscriptionId,
    // The drop guard is held only for its destructor side effect: it
    // fires the best-effort `unsubscribe-pane-output` request when the
    // parent stream is dropped. The leading underscore tells the linter
    // that the field is intentionally never read; the guard's own [`Drop`]
    // is the entire reason it lives in this struct.
    _drop_guard: DropGuard,
    // Set once the daemon reports the subscription as gone or an
    // empty-payload (end-of-stream) chunk has been observed; afterwards no
    // further cursor requests are issued.
    closed: bool,
}
241
242impl PaneOutputStream {
243    pub(crate) async fn open(
244        transport: TransportClient,
245        target: PaneTargetRef,
246        start: PaneOutputStart,
247    ) -> Result<Self> {
248        let start = start.into_proto();
249        let response = match target {
250            PaneTargetRef::Slot(target) => {
251                transport
252                    .request(Request::SubscribePaneOutput(SubscribePaneOutputRequest {
253                        target,
254                        start,
255                    }))
256                    .await?
257            }
258            PaneTargetRef::Id { .. } => {
259                crate::capabilities::require(&transport, &[CAPABILITY_SDK_PANE_BY_ID]).await?;
260                transport
261                    .request(Request::SubscribePaneOutputRef(
262                        SubscribePaneOutputRefRequest { target, start },
263                    ))
264                    .await?
265            }
266        };
267
268        let subscription_id = match response {
269            Response::SubscribePaneOutput(response) => response.subscription_id,
270            response => return Err(unexpected_response("subscribe-pane-output", response)),
271        };
272
273        let unsubscribe =
274            Request::UnsubscribePaneOutput(UnsubscribePaneOutputRequest { subscription_id });
275        let drop_guard = DropGuard::best_effort(transport.clone(), unsubscribe);
276
277        Ok(Self {
278            inner: PaneSubscription {
279                transport,
280                subscription_id,
281                _drop_guard: drop_guard,
282                closed: false,
283            },
284            pending: VecDeque::new(),
285            poll_delay: POLL_INITIAL_DELAY,
286        })
287    }
288
289    /// Returns the next chunk, awaiting daemon output if necessary.
290    ///
291    /// Returns `Ok(None)` once the daemon reports the subscription is no
292    /// longer alive — for example after the pane closed and the daemon
293    /// removed the subscription record. The drop-time best-effort
294    /// unsubscribe still runs in that case.
295    pub async fn next(&mut self) -> Result<Option<PaneOutputChunk>> {
296        if let Some(chunk) = self.pop_pending_chunk() {
297            return Ok(Some(chunk));
298        }
299        if self.inner.closed {
300            return Ok(None);
301        }
302
303        loop {
304            match self.refill_once().await? {
305                RefillOutcome::Closed => {
306                    self.inner.closed = true;
307                    return Ok(None);
308                }
309                RefillOutcome::Filled => {
310                    if let Some(chunk) = self.pop_pending_chunk() {
311                        self.poll_delay = POLL_INITIAL_DELAY;
312                        return Ok(Some(chunk));
313                    }
314                    let delay = self.poll_delay;
315                    self.poll_delay = (self.poll_delay * 2).min(POLL_MAX_DELAY);
316                    tokio::time::sleep(delay).await;
317                }
318            }
319        }
320    }
321
322    /// Drains any chunks that the daemon already has queued for this
323    /// subscription. Returns an empty vec when no chunks were ready.
324    ///
325    /// `poll_once` performs exactly one
326    /// [`PaneOutputCursorRequest`] round trip and never sleeps, which
327    /// makes it the appropriate primitive for callers that want explicit
328    /// control over their own backoff.
329    pub async fn poll_once(&mut self) -> Result<Vec<PaneOutputChunk>> {
330        let mut buffered: Vec<PaneOutputChunk> = self.pending.drain(..).collect();
331        if self.inner.closed {
332            return Ok(buffered);
333        }
334
335        match self.refill_once().await? {
336            RefillOutcome::Closed => {
337                self.inner.closed = true;
338            }
339            RefillOutcome::Filled => {
340                buffered.extend(self.pending.drain(..));
341                if buffered.iter().any(output_chunk_is_eof) {
342                    self.inner.closed = true;
343                }
344            }
345        }
346        Ok(buffered)
347    }
348
349    async fn refill_once(&mut self) -> Result<RefillOutcome> {
350        let request = Request::PaneOutputCursor(PaneOutputCursorRequest {
351            subscription_id: self.inner.subscription_id,
352            max_events: Some(PANE_OUTPUT_BATCH_SIZE),
353        });
354
355        match self.inner.transport.request(request).await {
356            Ok(Response::PaneOutputCursor(cursor)) => {
357                self.inner
358                    .validate_response_subscription("pane-output-cursor", cursor.subscription_id)?;
359                ingest_cursor(&mut self.pending, cursor.events);
360                Ok(RefillOutcome::Filled)
361            }
362            Ok(Response::PaneOutputLag(lag)) => {
363                self.inner
364                    .validate_response_subscription("pane-output-lag", lag.subscription_id)?;
365                self.pending
366                    .push_back(PaneOutputChunk::Lag(PaneLagNotice::from_proto(lag.lag)));
367                Ok(RefillOutcome::Filled)
368            }
369            Ok(response) => Err(unexpected_response("pane-output-cursor", response)),
370            Err(error) if is_subscription_gone(&error) => Ok(RefillOutcome::Closed),
371            Err(error) => Err(error),
372        }
373    }
374
375    fn pop_pending_chunk(&mut self) -> Option<PaneOutputChunk> {
376        let chunk = self.pending.pop_front()?;
377        if output_chunk_is_eof(&chunk) {
378            self.inner.closed = true;
379        }
380        Some(chunk)
381    }
382}
383
384impl PaneSubscription {
385    fn validate_response_subscription(
386        &self,
387        command: &'static str,
388        response_id: PaneOutputSubscriptionId,
389    ) -> Result<()> {
390        if response_id == self.subscription_id {
391            return Ok(());
392        }
393        Err(subscription_mismatch_error(
394            command,
395            self.subscription_id,
396            response_id,
397        ))
398    }
399}
400
401fn subscription_mismatch_error(
402    command: &'static str,
403    expected: PaneOutputSubscriptionId,
404    got: PaneOutputSubscriptionId,
405) -> RmuxError {
406    RmuxError::protocol(rmux_proto::RmuxError::Server(format!(
407        "rmux daemon sent subscription id {} in `{command}` response for subscription {}",
408        got.as_u64(),
409        expected.as_u64()
410    )))
411}
412
413fn ingest_cursor(target: &mut VecDeque<PaneOutputChunk>, events: Vec<PaneOutputEvent>) {
414    target.reserve(events.len());
415    for event in events {
416        target.push_back(PaneOutputChunk::Bytes {
417            sequence: event.sequence,
418            bytes: event.bytes,
419        });
420    }
421}
422
423fn output_chunk_is_eof(chunk: &PaneOutputChunk) -> bool {
424    matches!(chunk, PaneOutputChunk::Bytes { bytes, .. } if bytes.is_empty())
425}
426
// Result of one cursor round trip performed by `refill_once`.
enum RefillOutcome {
    // The daemon answered with a cursor or lag response. The pending queue
    // may or may not have grown, since a cursor response can carry zero
    // events.
    Filled,
    // The daemon reported that the subscription no longer exists.
    Closed,
}
431
432fn is_subscription_gone(error: &RmuxError) -> bool {
433    match error {
434        RmuxError::Protocol {
435            source: rmux_proto::RmuxError::Server(message),
436        } => message == "subscription not found" || message == "subscription receiver not found",
437        _ => false,
438    }
439}
440
441impl PaneLineStream {
442    pub(crate) fn wrap(inner: PaneOutputStream) -> Self {
443        Self {
444            inner,
445            line_buffer: Vec::new(),
446            pending: VecDeque::new(),
447        }
448    }
449
450    /// Returns the next line or lag notice, awaiting daemon output if
451    /// necessary.
452    ///
453    /// Returns `Ok(None)` when the underlying subscription is gone. Any
454    /// trailing partial-line bytes that were never terminated by `\n`
455    /// are dropped at end-of-stream because the daemon never delivered a
456    /// terminator — they did not represent a complete line.
457    pub async fn next(&mut self) -> Result<Option<PaneLineItem>> {
458        loop {
459            if let Some(item) = self.pending.pop_front() {
460                return Ok(Some(item));
461            }
462            match self.inner.next().await? {
463                Some(PaneOutputChunk::Bytes { bytes, .. }) => {
464                    split_lines(&mut self.line_buffer, &bytes, &mut self.pending);
465                }
466                Some(PaneOutputChunk::Lag(notice)) => {
467                    // Drop partial-line buffer because the byte stream is
468                    // discontinuous after a lag — the next bytes may not
469                    // begin at a line boundary, so concatenating them
470                    // would produce a synthetic line.
471                    self.line_buffer.clear();
472                    self.pending.push_back(PaneLineItem::Lag(notice));
473                }
474                None => return Ok(None),
475            }
476        }
477    }
478}
479
480fn split_lines(buffer: &mut Vec<u8>, bytes: &[u8], out: &mut VecDeque<PaneLineItem>) {
481    for byte in bytes {
482        if *byte == b'\n' {
483            let line_bytes = std::mem::take(buffer);
484            out.push_back(PaneLineItem::Line {
485                text: String::from_utf8_lossy(&line_bytes).into_owned(),
486            });
487        } else {
488            buffer.push(*byte);
489        }
490    }
491}
492
493impl std::fmt::Debug for PaneOutputStream {
494    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
495        formatter
496            .debug_struct("PaneOutputStream")
497            .field("closed", &self.inner.closed)
498            .field("buffered_chunks", &self.pending.len())
499            .finish_non_exhaustive()
500    }
501}
502
503impl std::fmt::Debug for PaneLineStream {
504    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
505        formatter
506            .debug_struct("PaneLineStream")
507            .field("buffered_bytes", &self.line_buffer.len())
508            .field("pending_items", &self.pending.len())
509            .finish_non_exhaustive()
510    }
511}
512
513#[cfg(test)]
514#[path = "streams_contract_tests.rs"]
515mod streams_contract_tests;
516
517#[cfg(test)]
518#[path = "streams_tests.rs"]
519mod streams_tests;