rmux_sdk/events/streams.rs
1//! SDK-owned facade over the v1 pane-output subscription protocol.
2//!
3//! The two opaque streams in this module — [`PaneOutputStream`] for raw
4//! bytes plus sequence/lag notices, and [`PaneLineStream`] for rendered
5//! UTF-8 lines — are constructed through fallible [`crate::Pane`] methods and
6//! drive the daemon's `SubscribePaneOutput`, `PaneOutputCursor`, and
7//! `UnsubscribePaneOutput` endpoints internally. They never expose
8//! [`rmux_proto::PaneOutputSubscriptionId`] to SDK callers.
9//!
10//! ## Raw bytes vs rendered lines
11//!
12//! [`PaneOutputStream`] emits [`PaneOutputChunk`] items. Bytes
13//! ([`PaneOutputChunk::Bytes`]) preserve every payload byte the daemon
14//! delivered, including NUL and bytes that are not valid UTF-8, and pair
15//! them with the monotonic per-pane [`PaneOutputChunk::Bytes::sequence`]
16//! the daemon assigned. Lag notices ([`PaneOutputChunk::Lag`]) surface
17//! the daemon-side gap between the cursor's expected sequence and the
18//! oldest retained sequence verbatim, including the bounded recent live
19//! bytes the daemon retained at gap detection time. The raw byte stream
20//! never converts payloads through `String::from_utf8_lossy` and never
21//! alters the byte sequence the daemon delivered.
22//!
23//! [`PaneLineStream`] is a strict superset built on top of the raw stream
24//! that adds two well-isolated transformations:
25//!
26//! * **Lossy UTF-8 rendering.** Each completed line's bytes are decoded
27//! through `String::from_utf8_lossy`, which replaces every byte
28//! sequence that is not valid UTF-8 with the Unicode replacement
29//! character `U+FFFD`. The lossy conversion is applied only when the
30//! line is yielded — not on the underlying byte stream — so a caller
31//! that wants byte-faithful output should use [`PaneOutputStream`]
//!   instead. Embedded NUL bytes survive into the rendered string as
//!   `\0`; only invalid UTF-8 byte sequences are replaced.
34//! * **Partial-line buffering.** The line stream splits on the LF byte
35//! `b'\n'` only. Carriage returns and any other bytes are preserved
36//! inside the line. Bytes that are not yet terminated by an LF stay in
37//! an internal buffer and are not yielded; the buffer is flushed only
38//! when the next LF arrives. A trailing partial line that the daemon
39//! never terminates with LF is dropped when the stream ends or lag
40//! fires, because the next sequence's bytes may not begin at a line
41//! boundary.
42//!
43//! On a [`PaneOutputChunk::Lag`] the line stream drops the partial-line
44//! buffer (the next sequence may be discontinuous with the buffered
45//! bytes), forwards the lag notice as [`PaneLineItem::Lag`], and resumes
46//! line splitting from a clean state on subsequent bytes. Callers that
47//! want to recover the dropped partial bytes can read
48//! [`PaneLagNotice::recent`].
49//!
50//! ## Drop / unsubscribe contract
51//!
52//! Each stream owns one per-connection subscription on the daemon, and
53//! every drop emits at most one best-effort
54//! [`UnsubscribePaneOutput`](rmux_proto::UnsubscribePaneOutputRequest)
55//! request through the same transport actor. The unsubscribe is fire and
56//! forget — its response is discarded, late or duplicate
57//! `unsubscribe-pane-output` errors do not propagate, and a closed
58//! transport silently no-ops. The daemon's unsubscribe handler only
59//! removes the subscription record; it does not close the pane, the
60//! window, the session, the underlying child process, or the daemon
61//! itself, so dropping an unfinished stream is always safe.
62//!
63//! Wrapping the line stream around the byte stream means the inner byte
64//! stream still owns its own transport drop guard and emits its own
65//! unsubscribe — there is exactly one unsubscribe per subscription
66//! regardless of which wrapper is dropped.
67
68use std::collections::VecDeque;
69use std::time::Duration;
70
71use rmux_proto::{
72 PaneOutputCursorRequest, PaneOutputEvent, PaneOutputLagNotice as ProtoLagNotice,
73 PaneOutputSubscriptionId, PaneOutputSubscriptionStart, PaneRecentOutput as ProtoRecentOutput,
74 PaneTargetRef, Request, Response, SubscribePaneOutputRefRequest, SubscribePaneOutputRequest,
75 UnsubscribePaneOutputRequest, CAPABILITY_SDK_PANE_BY_ID,
76};
77
78use crate::handles::session::unexpected_response;
79use crate::transport::{DropGuard, TransportClient};
80use crate::{Result, RmuxError};
81
/// Maximum number of events requested per `PaneOutputCursor` round trip
/// (sent as `max_events`).
const PANE_OUTPUT_BATCH_SIZE: u16 = 256;
/// Sleep used after the first empty cursor response; the backoff in
/// `PaneOutputStream::next` resets to this value once a chunk is delivered.
const POLL_INITIAL_DELAY: Duration = Duration::from_millis(2);
/// Ceiling for the doubling poll backoff in `PaneOutputStream::next`.
const POLL_MAX_DELAY: Duration = Duration::from_millis(50);
85
86/// Where a pane-output stream should anchor its cursor at subscription time.
87///
88/// Mirrors the daemon's own
89/// [`rmux_proto::PaneOutputSubscriptionStart`]
90/// vocabulary as a SDK-owned enum so callers do not depend on
91/// `rmux-proto` directly.
92#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
93#[non_exhaustive]
94pub enum PaneOutputStart {
95 /// Start after the newest output currently retained by the pane. The
96 /// stream will only deliver bytes the daemon appends after this call.
97 #[default]
98 Now,
99 /// Start at the oldest retained output event, replaying the daemon's
100 /// retained backlog before delivering newly produced bytes.
101 Oldest,
102}
103
104impl PaneOutputStart {
105 fn into_proto(self) -> PaneOutputSubscriptionStart {
106 match self {
107 Self::Now => PaneOutputSubscriptionStart::Now,
108 Self::Oldest => PaneOutputSubscriptionStart::Oldest,
109 }
110 }
111}
112
113/// Recent retained pane bytes attached to a [`PaneLagNotice`].
114///
115/// The byte payload is never converted through `String::from_utf8_lossy`;
116/// it is the exact byte run the daemon retained at gap-detection time,
117/// bounded by the daemon's `MAX_LAG_RECENT_BYTES` window.
118#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)]
119pub struct PaneRecentOutput {
120 /// Retained recent raw pane output bytes.
121 pub bytes: Vec<u8>,
122 /// Oldest output sequence contributing retained bytes.
123 pub oldest_sequence: Option<u64>,
124 /// Newest output sequence contributing retained bytes.
125 pub newest_sequence: Option<u64>,
126}
127
128impl PaneRecentOutput {
129 fn from_proto(value: ProtoRecentOutput) -> Self {
130 Self {
131 bytes: value.bytes,
132 oldest_sequence: value.oldest_sequence,
133 newest_sequence: value.newest_sequence,
134 }
135 }
136}
137
138/// Detailed gap report carried by [`PaneOutputChunk::Lag`].
139///
140/// Sequence numbers are exact mirrors of the daemon's own per-pane output
141/// counter. `expected_sequence` is the next sequence the cursor was
142/// waiting for before lag was detected; `resume_sequence` is the oldest
143/// retained sequence the daemon will start delivering from again.
144#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)]
145pub struct PaneLagNotice {
146 /// Sequence the subscriber expected before lag was detected.
147 pub expected_sequence: u64,
148 /// Oldest retained sequence where the subscriber will resume.
149 pub resume_sequence: u64,
150 /// Number of output events skipped by this lag notice.
151 pub missed_events: u64,
152 /// Newest output sequence appended when lag was detected.
153 pub newest_sequence: u64,
154 /// Bounded recent live output the daemon retained at lag time.
155 pub recent: PaneRecentOutput,
156}
157
158impl PaneLagNotice {
159 fn from_proto(value: ProtoLagNotice) -> Self {
160 Self {
161 expected_sequence: value.expected_sequence,
162 resume_sequence: value.resume_sequence,
163 missed_events: value.missed_events,
164 newest_sequence: value.newest_sequence,
165 recent: PaneRecentOutput::from_proto(value.recent),
166 }
167 }
168}
169
/// One item delivered by [`PaneOutputStream`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum PaneOutputChunk {
    /// Raw decoded pane bytes paired with the daemon-assigned monotonic
    /// per-pane sequence.
    ///
    /// A chunk whose `bytes` payload is empty is treated by
    /// [`PaneOutputStream`] as its end-of-stream marker: the chunk is
    /// still delivered, and the stream is marked closed afterwards.
    Bytes {
        /// Per-pane monotonic output sequence.
        sequence: u64,
        /// Arbitrary raw pane bytes — may include NUL or non-UTF-8 byte
        /// sequences.
        bytes: Vec<u8>,
    },
    /// A daemon-side gap report. Subsequent [`Self::Bytes`] chunks resume
    /// at [`PaneLagNotice::resume_sequence`].
    Lag(PaneLagNotice),
}
187
/// One item delivered by [`PaneLineStream`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum PaneLineItem {
    /// Decoded line text, with `String::from_utf8_lossy` already applied.
    /// Only the terminating LF (`\n`) is removed from `text`; every other
    /// byte of the line — including a carriage return that preceded the
    /// LF — is kept.
    Line {
        /// Rendered line text.
        text: String,
    },
    /// A daemon-side gap report propagated unchanged from the underlying
    /// raw byte stream. The line stream drops its partial-line buffer
    /// when this fires; subsequent line splitting starts from a clean
    /// state.
    Lag(PaneLagNotice),
}
205
/// Opaque live stream of pane output bytes plus sequence/lag notices.
///
/// Construction goes through [`Pane::output_stream`](crate::Pane::output_stream).
/// Use [`PaneOutputStream::next`] to drive the cursor; the per-call
/// polling cadence and any backoff is internal and unspecified. The
/// daemon's [`PaneOutputSubscriptionId`] is *not* observable through this
/// type.
pub struct PaneOutputStream {
    // Transport handle, subscription id, drop guard and closed flag.
    inner: PaneSubscription,
    // Chunks already received from the daemon but not yet handed to the
    // caller.
    pending: VecDeque<PaneOutputChunk>,
    // Current sleep between empty cursor polls in `next`; doubles up to
    // `POLL_MAX_DELAY` and resets to `POLL_INITIAL_DELAY` on delivery.
    poll_delay: Duration,
}
218
/// Opaque live stream of rendered pane output lines.
///
/// Construction goes through [`Pane::line_stream`](crate::Pane::line_stream).
/// See the module docs for the lossy UTF-8 and partial-line buffering
/// rules.
pub struct PaneLineStream {
    // Underlying raw byte stream that this wrapper pulls chunks from.
    inner: PaneOutputStream,
    // Bytes received since the last LF; cleared when a lag notice arrives.
    line_buffer: Vec<u8>,
    // Completed lines and lag notices waiting to be returned by `next`.
    pending: VecDeque<PaneLineItem>,
}
229
// Per-subscription state shared by the stream's request and drop paths.
struct PaneSubscription {
    // Client used to issue `PaneOutputCursor` requests.
    transport: TransportClient,
    // Id sent with every cursor request and checked against each response.
    subscription_id: PaneOutputSubscriptionId,
    // The drop guard is held only for its destructor side effect: it
    // fires the best-effort `unsubscribe-pane-output` request when the
    // parent stream is dropped. The rename signals to the linter that
    // we never read it; the guard's own [`Drop`] is the entire reason
    // it lives in this struct.
    _drop_guard: DropGuard,
    // Set once the daemon reports the subscription gone or an
    // empty-payload (end-of-stream) chunk has been observed.
    closed: bool,
}
241
242impl PaneOutputStream {
243 pub(crate) async fn open(
244 transport: TransportClient,
245 target: PaneTargetRef,
246 start: PaneOutputStart,
247 ) -> Result<Self> {
248 let start = start.into_proto();
249 let response = match target {
250 PaneTargetRef::Slot(target) => {
251 transport
252 .request(Request::SubscribePaneOutput(SubscribePaneOutputRequest {
253 target,
254 start,
255 }))
256 .await?
257 }
258 PaneTargetRef::Id { .. } => {
259 crate::capabilities::require(&transport, &[CAPABILITY_SDK_PANE_BY_ID]).await?;
260 transport
261 .request(Request::SubscribePaneOutputRef(
262 SubscribePaneOutputRefRequest { target, start },
263 ))
264 .await?
265 }
266 };
267
268 let subscription_id = match response {
269 Response::SubscribePaneOutput(response) => response.subscription_id,
270 response => return Err(unexpected_response("subscribe-pane-output", response)),
271 };
272
273 let unsubscribe =
274 Request::UnsubscribePaneOutput(UnsubscribePaneOutputRequest { subscription_id });
275 let drop_guard = DropGuard::best_effort(transport.clone(), unsubscribe);
276
277 Ok(Self {
278 inner: PaneSubscription {
279 transport,
280 subscription_id,
281 _drop_guard: drop_guard,
282 closed: false,
283 },
284 pending: VecDeque::new(),
285 poll_delay: POLL_INITIAL_DELAY,
286 })
287 }
288
289 /// Returns the next chunk, awaiting daemon output if necessary.
290 ///
291 /// Returns `Ok(None)` once the daemon reports the subscription is no
292 /// longer alive — for example after the pane closed and the daemon
293 /// removed the subscription record. The drop-time best-effort
294 /// unsubscribe still runs in that case.
295 pub async fn next(&mut self) -> Result<Option<PaneOutputChunk>> {
296 if let Some(chunk) = self.pop_pending_chunk() {
297 return Ok(Some(chunk));
298 }
299 if self.inner.closed {
300 return Ok(None);
301 }
302
303 loop {
304 match self.refill_once().await? {
305 RefillOutcome::Closed => {
306 self.inner.closed = true;
307 return Ok(None);
308 }
309 RefillOutcome::Filled => {
310 if let Some(chunk) = self.pop_pending_chunk() {
311 self.poll_delay = POLL_INITIAL_DELAY;
312 return Ok(Some(chunk));
313 }
314 let delay = self.poll_delay;
315 self.poll_delay = (self.poll_delay * 2).min(POLL_MAX_DELAY);
316 tokio::time::sleep(delay).await;
317 }
318 }
319 }
320 }
321
322 /// Drains any chunks that the daemon already has queued for this
323 /// subscription. Returns an empty vec when no chunks were ready.
324 ///
325 /// `poll_once` performs exactly one
326 /// [`PaneOutputCursorRequest`] round trip and never sleeps, which
327 /// makes it the appropriate primitive for callers that want explicit
328 /// control over their own backoff.
329 pub async fn poll_once(&mut self) -> Result<Vec<PaneOutputChunk>> {
330 let mut buffered: Vec<PaneOutputChunk> = self.pending.drain(..).collect();
331 if self.inner.closed {
332 return Ok(buffered);
333 }
334
335 match self.refill_once().await? {
336 RefillOutcome::Closed => {
337 self.inner.closed = true;
338 }
339 RefillOutcome::Filled => {
340 buffered.extend(self.pending.drain(..));
341 if buffered.iter().any(output_chunk_is_eof) {
342 self.inner.closed = true;
343 }
344 }
345 }
346 Ok(buffered)
347 }
348
349 async fn refill_once(&mut self) -> Result<RefillOutcome> {
350 let request = Request::PaneOutputCursor(PaneOutputCursorRequest {
351 subscription_id: self.inner.subscription_id,
352 max_events: Some(PANE_OUTPUT_BATCH_SIZE),
353 });
354
355 match self.inner.transport.request(request).await {
356 Ok(Response::PaneOutputCursor(cursor)) => {
357 self.inner
358 .validate_response_subscription("pane-output-cursor", cursor.subscription_id)?;
359 ingest_cursor(&mut self.pending, cursor.events);
360 Ok(RefillOutcome::Filled)
361 }
362 Ok(Response::PaneOutputLag(lag)) => {
363 self.inner
364 .validate_response_subscription("pane-output-lag", lag.subscription_id)?;
365 self.pending
366 .push_back(PaneOutputChunk::Lag(PaneLagNotice::from_proto(lag.lag)));
367 Ok(RefillOutcome::Filled)
368 }
369 Ok(response) => Err(unexpected_response("pane-output-cursor", response)),
370 Err(error) if is_subscription_gone(&error) => Ok(RefillOutcome::Closed),
371 Err(error) => Err(error),
372 }
373 }
374
375 fn pop_pending_chunk(&mut self) -> Option<PaneOutputChunk> {
376 let chunk = self.pending.pop_front()?;
377 if output_chunk_is_eof(&chunk) {
378 self.inner.closed = true;
379 }
380 Some(chunk)
381 }
382}
383
384impl PaneSubscription {
385 fn validate_response_subscription(
386 &self,
387 command: &'static str,
388 response_id: PaneOutputSubscriptionId,
389 ) -> Result<()> {
390 if response_id == self.subscription_id {
391 return Ok(());
392 }
393 Err(subscription_mismatch_error(
394 command,
395 self.subscription_id,
396 response_id,
397 ))
398 }
399}
400
401fn subscription_mismatch_error(
402 command: &'static str,
403 expected: PaneOutputSubscriptionId,
404 got: PaneOutputSubscriptionId,
405) -> RmuxError {
406 RmuxError::protocol(rmux_proto::RmuxError::Server(format!(
407 "rmux daemon sent subscription id {} in `{command}` response for subscription {}",
408 got.as_u64(),
409 expected.as_u64()
410 )))
411}
412
413fn ingest_cursor(target: &mut VecDeque<PaneOutputChunk>, events: Vec<PaneOutputEvent>) {
414 target.reserve(events.len());
415 for event in events {
416 target.push_back(PaneOutputChunk::Bytes {
417 sequence: event.sequence,
418 bytes: event.bytes,
419 });
420 }
421}
422
423fn output_chunk_is_eof(chunk: &PaneOutputChunk) -> bool {
424 matches!(chunk, PaneOutputChunk::Bytes { bytes, .. } if bytes.is_empty())
425}
426
/// Result of one cursor round trip performed by
/// `PaneOutputStream::refill_once`.
enum RefillOutcome {
    /// The daemon answered with a cursor or lag response; zero or more
    /// chunks were appended to the pending queue.
    Filled,
    /// The request failed with an error that `is_subscription_gone`
    /// recognises, so the subscription no longer exists on the daemon.
    Closed,
}
431
432fn is_subscription_gone(error: &RmuxError) -> bool {
433 match error {
434 RmuxError::Protocol {
435 source: rmux_proto::RmuxError::Server(message),
436 } => message == "subscription not found" || message == "subscription receiver not found",
437 _ => false,
438 }
439}
440
441impl PaneLineStream {
442 pub(crate) fn wrap(inner: PaneOutputStream) -> Self {
443 Self {
444 inner,
445 line_buffer: Vec::new(),
446 pending: VecDeque::new(),
447 }
448 }
449
450 /// Returns the next line or lag notice, awaiting daemon output if
451 /// necessary.
452 ///
453 /// Returns `Ok(None)` when the underlying subscription is gone. Any
454 /// trailing partial-line bytes that were never terminated by `\n`
455 /// are dropped at end-of-stream because the daemon never delivered a
456 /// terminator — they did not represent a complete line.
457 pub async fn next(&mut self) -> Result<Option<PaneLineItem>> {
458 loop {
459 if let Some(item) = self.pending.pop_front() {
460 return Ok(Some(item));
461 }
462 match self.inner.next().await? {
463 Some(PaneOutputChunk::Bytes { bytes, .. }) => {
464 split_lines(&mut self.line_buffer, &bytes, &mut self.pending);
465 }
466 Some(PaneOutputChunk::Lag(notice)) => {
467 // Drop partial-line buffer because the byte stream is
468 // discontinuous after a lag — the next bytes may not
469 // begin at a line boundary, so concatenating them
470 // would produce a synthetic line.
471 self.line_buffer.clear();
472 self.pending.push_back(PaneLineItem::Lag(notice));
473 }
474 None => return Ok(None),
475 }
476 }
477 }
478}
479
480fn split_lines(buffer: &mut Vec<u8>, bytes: &[u8], out: &mut VecDeque<PaneLineItem>) {
481 for byte in bytes {
482 if *byte == b'\n' {
483 let line_bytes = std::mem::take(buffer);
484 out.push_back(PaneLineItem::Line {
485 text: String::from_utf8_lossy(&line_bytes).into_owned(),
486 });
487 } else {
488 buffer.push(*byte);
489 }
490 }
491}
492
493impl std::fmt::Debug for PaneOutputStream {
494 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
495 formatter
496 .debug_struct("PaneOutputStream")
497 .field("closed", &self.inner.closed)
498 .field("buffered_chunks", &self.pending.len())
499 .finish_non_exhaustive()
500 }
501}
502
503impl std::fmt::Debug for PaneLineStream {
504 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
505 formatter
506 .debug_struct("PaneLineStream")
507 .field("buffered_bytes", &self.line_buffer.len())
508 .field("pending_items", &self.pending.len())
509 .finish_non_exhaustive()
510 }
511}
512
513#[cfg(test)]
514#[path = "streams_contract_tests.rs"]
515mod streams_contract_tests;
516
517#[cfg(test)]
518#[path = "streams_tests.rs"]
519mod streams_tests;