use std::collections::VecDeque;
use std::time::Duration;
use rmux_proto::{
PaneOutputCursorRequest, PaneOutputEvent, PaneOutputLagNotice as ProtoLagNotice,
PaneOutputSubscriptionId, PaneOutputSubscriptionStart, PaneRecentOutput as ProtoRecentOutput,
PaneTargetRef, Request, Response, SubscribePaneOutputRefRequest, SubscribePaneOutputRequest,
UnsubscribePaneOutputRequest, CAPABILITY_SDK_PANE_BY_ID,
};
use crate::handles::session::unexpected_response;
use crate::transport::{DropGuard, TransportClient};
use crate::{Result, RmuxError};
const PANE_OUTPUT_BATCH_SIZE: u16 = 256;
const POLL_INITIAL_DELAY: Duration = Duration::from_millis(2);
const POLL_MAX_DELAY: Duration = Duration::from_millis(50);
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum PaneOutputStart {
#[default]
Now,
Oldest,
}
impl PaneOutputStart {
fn into_proto(self) -> PaneOutputSubscriptionStart {
match self {
Self::Now => PaneOutputSubscriptionStart::Now,
Self::Oldest => PaneOutputSubscriptionStart::Oldest,
}
}
}
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)]
pub struct PaneRecentOutput {
pub bytes: Vec<u8>,
pub oldest_sequence: Option<u64>,
pub newest_sequence: Option<u64>,
}
impl PaneRecentOutput {
fn from_proto(value: ProtoRecentOutput) -> Self {
Self {
bytes: value.bytes,
oldest_sequence: value.oldest_sequence,
newest_sequence: value.newest_sequence,
}
}
}
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)]
pub struct PaneLagNotice {
pub expected_sequence: u64,
pub resume_sequence: u64,
pub missed_events: u64,
pub newest_sequence: u64,
pub recent: PaneRecentOutput,
}
impl PaneLagNotice {
fn from_proto(value: ProtoLagNotice) -> Self {
Self {
expected_sequence: value.expected_sequence,
resume_sequence: value.resume_sequence,
missed_events: value.missed_events,
newest_sequence: value.newest_sequence,
recent: PaneRecentOutput::from_proto(value.recent),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum PaneOutputChunk {
Bytes {
sequence: u64,
bytes: Vec<u8>,
},
Lag(PaneLagNotice),
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum PaneLineItem {
Line {
text: String,
},
Lag(PaneLagNotice),
}
pub struct PaneOutputStream {
inner: PaneSubscription,
pending: VecDeque<PaneOutputChunk>,
poll_delay: Duration,
}
pub struct PaneLineStream {
inner: PaneOutputStream,
line_buffer: Vec<u8>,
pending: VecDeque<PaneLineItem>,
}
struct PaneSubscription {
transport: TransportClient,
subscription_id: PaneOutputSubscriptionId,
_drop_guard: DropGuard,
closed: bool,
}
impl PaneOutputStream {
pub(crate) async fn open(
transport: TransportClient,
target: PaneTargetRef,
start: PaneOutputStart,
) -> Result<Self> {
let start = start.into_proto();
let response = match target {
PaneTargetRef::Slot(target) => {
transport
.request(Request::SubscribePaneOutput(SubscribePaneOutputRequest {
target,
start,
}))
.await?
}
PaneTargetRef::Id { .. } => {
crate::capabilities::require(&transport, &[CAPABILITY_SDK_PANE_BY_ID]).await?;
transport
.request(Request::SubscribePaneOutputRef(
SubscribePaneOutputRefRequest { target, start },
))
.await?
}
};
let subscription_id = match response {
Response::SubscribePaneOutput(response) => response.subscription_id,
response => return Err(unexpected_response("subscribe-pane-output", response)),
};
let unsubscribe =
Request::UnsubscribePaneOutput(UnsubscribePaneOutputRequest { subscription_id });
let drop_guard = DropGuard::best_effort(transport.clone(), unsubscribe);
Ok(Self {
inner: PaneSubscription {
transport,
subscription_id,
_drop_guard: drop_guard,
closed: false,
},
pending: VecDeque::new(),
poll_delay: POLL_INITIAL_DELAY,
})
}
pub async fn next(&mut self) -> Result<Option<PaneOutputChunk>> {
if let Some(chunk) = self.pop_pending_chunk() {
return Ok(Some(chunk));
}
if self.inner.closed {
return Ok(None);
}
loop {
match self.refill_once().await? {
RefillOutcome::Closed => {
self.inner.closed = true;
return Ok(None);
}
RefillOutcome::Filled => {
if let Some(chunk) = self.pop_pending_chunk() {
self.poll_delay = POLL_INITIAL_DELAY;
return Ok(Some(chunk));
}
let delay = self.poll_delay;
self.poll_delay = (self.poll_delay * 2).min(POLL_MAX_DELAY);
tokio::time::sleep(delay).await;
}
}
}
}
pub async fn poll_once(&mut self) -> Result<Vec<PaneOutputChunk>> {
let mut buffered: Vec<PaneOutputChunk> = self.pending.drain(..).collect();
if self.inner.closed {
return Ok(buffered);
}
match self.refill_once().await? {
RefillOutcome::Closed => {
self.inner.closed = true;
}
RefillOutcome::Filled => {
buffered.extend(self.pending.drain(..));
if buffered.iter().any(output_chunk_is_eof) {
self.inner.closed = true;
}
}
}
Ok(buffered)
}
async fn refill_once(&mut self) -> Result<RefillOutcome> {
let request = Request::PaneOutputCursor(PaneOutputCursorRequest {
subscription_id: self.inner.subscription_id,
max_events: Some(PANE_OUTPUT_BATCH_SIZE),
});
match self.inner.transport.request(request).await {
Ok(Response::PaneOutputCursor(cursor)) => {
self.inner
.validate_response_subscription("pane-output-cursor", cursor.subscription_id)?;
ingest_cursor(&mut self.pending, cursor.events);
Ok(RefillOutcome::Filled)
}
Ok(Response::PaneOutputLag(lag)) => {
self.inner
.validate_response_subscription("pane-output-lag", lag.subscription_id)?;
self.pending
.push_back(PaneOutputChunk::Lag(PaneLagNotice::from_proto(lag.lag)));
Ok(RefillOutcome::Filled)
}
Ok(response) => Err(unexpected_response("pane-output-cursor", response)),
Err(error) if is_subscription_gone(&error) => Ok(RefillOutcome::Closed),
Err(error) => Err(error),
}
}
fn pop_pending_chunk(&mut self) -> Option<PaneOutputChunk> {
let chunk = self.pending.pop_front()?;
if output_chunk_is_eof(&chunk) {
self.inner.closed = true;
}
Some(chunk)
}
}
impl PaneSubscription {
fn validate_response_subscription(
&self,
command: &'static str,
response_id: PaneOutputSubscriptionId,
) -> Result<()> {
if response_id == self.subscription_id {
return Ok(());
}
Err(subscription_mismatch_error(
command,
self.subscription_id,
response_id,
))
}
}
fn subscription_mismatch_error(
command: &'static str,
expected: PaneOutputSubscriptionId,
got: PaneOutputSubscriptionId,
) -> RmuxError {
RmuxError::protocol(rmux_proto::RmuxError::Server(format!(
"rmux daemon sent subscription id {} in `{command}` response for subscription {}",
got.as_u64(),
expected.as_u64()
)))
}
fn ingest_cursor(target: &mut VecDeque<PaneOutputChunk>, events: Vec<PaneOutputEvent>) {
target.reserve(events.len());
for event in events {
target.push_back(PaneOutputChunk::Bytes {
sequence: event.sequence,
bytes: event.bytes,
});
}
}
fn output_chunk_is_eof(chunk: &PaneOutputChunk) -> bool {
matches!(chunk, PaneOutputChunk::Bytes { bytes, .. } if bytes.is_empty())
}
enum RefillOutcome {
Filled,
Closed,
}
fn is_subscription_gone(error: &RmuxError) -> bool {
match error {
RmuxError::Protocol {
source: rmux_proto::RmuxError::Server(message),
} => message == "subscription not found" || message == "subscription receiver not found",
_ => false,
}
}
impl PaneLineStream {
pub(crate) fn wrap(inner: PaneOutputStream) -> Self {
Self {
inner,
line_buffer: Vec::new(),
pending: VecDeque::new(),
}
}
pub async fn next(&mut self) -> Result<Option<PaneLineItem>> {
loop {
if let Some(item) = self.pending.pop_front() {
return Ok(Some(item));
}
match self.inner.next().await? {
Some(PaneOutputChunk::Bytes { bytes, .. }) => {
split_lines(&mut self.line_buffer, &bytes, &mut self.pending);
}
Some(PaneOutputChunk::Lag(notice)) => {
self.line_buffer.clear();
self.pending.push_back(PaneLineItem::Lag(notice));
}
None => return Ok(None),
}
}
}
}
fn split_lines(buffer: &mut Vec<u8>, bytes: &[u8], out: &mut VecDeque<PaneLineItem>) {
for byte in bytes {
if *byte == b'\n' {
let line_bytes = std::mem::take(buffer);
out.push_back(PaneLineItem::Line {
text: String::from_utf8_lossy(&line_bytes).into_owned(),
});
} else {
buffer.push(*byte);
}
}
}
impl std::fmt::Debug for PaneOutputStream {
fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
formatter
.debug_struct("PaneOutputStream")
.field("closed", &self.inner.closed)
.field("buffered_chunks", &self.pending.len())
.finish_non_exhaustive()
}
}
impl std::fmt::Debug for PaneLineStream {
fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
formatter
.debug_struct("PaneLineStream")
.field("buffered_bytes", &self.line_buffer.len())
.field("pending_items", &self.pending.len())
.finish_non_exhaustive()
}
}
#[cfg(test)]
#[path = "streams_contract_tests.rs"]
mod streams_contract_tests;
#[cfg(test)]
#[path = "streams_tests.rs"]
mod streams_tests;