Skip to main content

rmux_sdk/
broadcast.rs

1//! Broadcast helpers for pane groups.
2//!
3//! Broadcast uses the daemon-side batch endpoint when every pane belongs to
4//! the same resolved SDK endpoint. Delivery still does not claim simultaneous
5//! cross-pane execution; callers get a typed partial-failure error when any
6//! pane rejects the input.
7
8use std::error::Error;
9use std::fmt;
10
11use tokio::task::JoinSet;
12
13use crate::{Pane, PaneId, PaneRef, Result, RmuxError};
14use rmux_proto::{PaneBroadcastInputRequest, Request, Response, CAPABILITY_SDK_PANE_BROADCAST};
15
16/// Input that can be broadcast to many panes.
17#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
18#[non_exhaustive]
19pub enum Input<'a> {
20    /// Literal text bytes. No newline is appended.
21    Text(&'a str),
22    /// One tmux-compatible key token such as `Enter` or `Backspace`.
23    Key(&'a str),
24}
25
26impl<'a> Input<'a> {
27    /// Constructs literal text input.
28    #[must_use]
29    pub const fn text(value: &'a str) -> Self {
30        Self::Text(value)
31    }
32
33    /// Constructs key-token input.
34    #[must_use]
35    pub const fn key(value: &'a str) -> Self {
36        Self::Key(value)
37    }
38}
39
40#[derive(Debug, Clone, PartialEq, Eq)]
41enum OwnedInput {
42    Text(String),
43    Key(String),
44}
45
46impl From<Input<'_>> for OwnedInput {
47    fn from(value: Input<'_>) -> Self {
48        match value {
49            Input::Text(value) => Self::Text(value.to_owned()),
50            Input::Key(value) => Self::Key(value.to_owned()),
51        }
52    }
53}
54
55/// Successful broadcast delivery for one pane.
56#[derive(Debug, Clone, PartialEq, Eq)]
57pub struct BroadcastPaneSuccess {
58    target: PaneRef,
59    pane_id: Option<PaneId>,
60}
61
62impl BroadcastPaneSuccess {
63    /// Returns the slot target observed for this pane handle.
64    #[must_use]
65    pub const fn target(&self) -> &PaneRef {
66        &self.target
67    }
68
69    /// Returns the live pane id observed before delivery, when available.
70    #[must_use]
71    pub const fn pane_id(&self) -> Option<PaneId> {
72        self.pane_id
73    }
74}
75
76/// Failed broadcast delivery for one pane.
77#[derive(Debug)]
78pub struct BroadcastPaneFailure {
79    target: PaneRef,
80    pane_id: Option<PaneId>,
81    error: RmuxError,
82}
83
84impl BroadcastPaneFailure {
85    /// Returns the slot target observed for this pane handle.
86    #[must_use]
87    pub const fn target(&self) -> &PaneRef {
88        &self.target
89    }
90
91    /// Returns the live pane id observed before delivery, when available.
92    #[must_use]
93    pub const fn pane_id(&self) -> Option<PaneId> {
94        self.pane_id
95    }
96
97    /// Returns the per-pane delivery error.
98    #[must_use]
99    pub const fn error(&self) -> &RmuxError {
100        &self.error
101    }
102
103    /// Consumes the failure and returns the per-pane delivery error.
104    #[must_use]
105    pub fn into_error(self) -> RmuxError {
106        self.error
107    }
108}
109
110/// Result returned when every pane accepted a broadcast input.
111#[derive(Debug, Clone, PartialEq, Eq)]
112pub struct BroadcastResult {
113    successes: Vec<BroadcastPaneSuccess>,
114}
115
116impl BroadcastResult {
117    /// Returns one success entry per targeted pane.
118    #[must_use]
119    pub fn successes(&self) -> &[BroadcastPaneSuccess] {
120        &self.successes
121    }
122
123    /// Returns the number of panes that accepted the input.
124    #[must_use]
125    pub fn len(&self) -> usize {
126        self.successes.len()
127    }
128
129    /// Returns `true` when the broadcast targeted no panes.
130    #[must_use]
131    pub fn is_empty(&self) -> bool {
132        self.successes.is_empty()
133    }
134}
135
136/// Error payload for a broadcast where at least one pane failed.
137#[derive(Debug)]
138pub struct PartialBroadcastFailure {
139    successes: Vec<BroadcastPaneSuccess>,
140    failures: Vec<BroadcastPaneFailure>,
141}
142
143impl PartialBroadcastFailure {
144    pub(crate) fn new(
145        successes: Vec<BroadcastPaneSuccess>,
146        failures: Vec<BroadcastPaneFailure>,
147    ) -> Self {
148        Self {
149            successes,
150            failures,
151        }
152    }
153
154    /// Returns panes that accepted the input before the partial failure was
155    /// reported.
156    #[must_use]
157    pub fn successes(&self) -> &[BroadcastPaneSuccess] {
158        &self.successes
159    }
160
161    /// Returns panes that rejected the input.
162    #[must_use]
163    pub fn failures(&self) -> &[BroadcastPaneFailure] {
164        &self.failures
165    }
166}
167
168impl fmt::Display for PartialBroadcastFailure {
169    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
170        writeln!(
171            formatter,
172            "broadcast failed for {} of {} panes",
173            self.failures.len(),
174            self.successes.len() + self.failures.len()
175        )?;
176        for (index, failure) in self.failures.iter().enumerate() {
177            if index > 0 {
178                writeln!(formatter)?;
179            }
180            write!(
181                formatter,
182                "{}. {}",
183                index + 1,
184                RenderBroadcastFailure(failure)
185            )?;
186        }
187        Ok(())
188    }
189}
190
191impl Error for PartialBroadcastFailure {}
192
193pub(crate) async fn broadcast(panes: &[Pane], input: Input<'_>) -> Result<BroadcastResult> {
194    if panes.is_empty() {
195        return Ok(BroadcastResult {
196            successes: Vec::new(),
197        });
198    }
199    if same_endpoint(panes) {
200        match broadcast_daemon_side(panes, input).await {
201            Ok(result) => return Ok(result),
202            Err(error) if is_daemon_broadcast_unavailable(&error) => {}
203            Err(error) => return Err(error),
204        }
205    }
206    broadcast_client_side(panes, input).await
207}
208
209async fn broadcast_daemon_side(panes: &[Pane], input: Input<'_>) -> Result<BroadcastResult> {
210    crate::capabilities::require(panes[0].transport(), &[CAPABILITY_SDK_PANE_BROADCAST]).await?;
211    let response = panes[0]
212        .transport()
213        .request(Request::PaneBroadcastInput(PaneBroadcastInputRequest {
214            targets: panes.iter().map(Pane::proto_target_ref).collect(),
215            keys: input_keys(input),
216            literal: matches!(input, Input::Text(_)),
217        }))
218        .await?;
219
220    let response = match response {
221        Response::PaneBroadcastInput(response) => response,
222        response => {
223            return Err(RmuxError::protocol(rmux_proto::RmuxError::Server(format!(
224                "rmux daemon sent `{}` response for `pane broadcast` request",
225                response.command_name()
226            ))));
227        }
228    };
229
230    let successes = response
231        .successes
232        .into_iter()
233        .map(|success| BroadcastPaneSuccess {
234            target: success.target.into(),
235            pane_id: success.pane_id,
236        })
237        .collect::<Vec<_>>();
238    let failures = response
239        .failures
240        .into_iter()
241        .map(|failure| {
242            let index = usize::try_from(failure.target_index).ok();
243            let target = index
244                .and_then(|index| panes.get(index))
245                .map(|pane| pane.target().clone())
246                .unwrap_or_else(|| fallback_target_from_ref(failure.target));
247            BroadcastPaneFailure {
248                target,
249                pane_id: None,
250                error: RmuxError::protocol(failure.error),
251            }
252        })
253        .collect::<Vec<_>>();
254
255    if failures.is_empty() {
256        Ok(BroadcastResult { successes })
257    } else {
258        Err(RmuxError::partial_broadcast(PartialBroadcastFailure::new(
259            successes, failures,
260        )))
261    }
262}
263
264async fn broadcast_client_side(panes: &[Pane], input: Input<'_>) -> Result<BroadcastResult> {
265    let input = OwnedInput::from(input);
266    let mut tasks = JoinSet::new();
267    for (index, pane) in panes.iter().cloned().enumerate() {
268        let input = input.clone();
269        tasks.spawn(async move { (index, send_one(pane, input).await) });
270    }
271
272    let mut outcomes = Vec::with_capacity(panes.len());
273    while let Some(joined) = tasks.join_next().await {
274        let (index, outcome) = joined.map_err(|error| {
275            RmuxError::transport(
276                "join broadcast worker task",
277                std::io::Error::other(error.to_string()),
278            )
279        })?;
280        outcomes.push((index, outcome));
281    }
282    outcomes.sort_by_key(|(index, _)| *index);
283
284    let mut successes = Vec::new();
285    let mut failures = Vec::new();
286    for (_, outcome) in outcomes {
287        match outcome {
288            PaneBroadcastOutcome::Success(success) => successes.push(success),
289            PaneBroadcastOutcome::Failure(failure) => failures.push(failure),
290        }
291    }
292
293    if failures.is_empty() {
294        Ok(BroadcastResult { successes })
295    } else {
296        Err(RmuxError::partial_broadcast(PartialBroadcastFailure::new(
297            successes, failures,
298        )))
299    }
300}
301
302fn same_endpoint(panes: &[Pane]) -> bool {
303    let Some(first) = panes.first() else {
304        return true;
305    };
306    panes.iter().all(|pane| pane.endpoint() == first.endpoint())
307}
308
309fn input_keys(input: Input<'_>) -> Vec<String> {
310    match input {
311        Input::Text(text) => vec![text.to_owned()],
312        Input::Key(key) => vec![key.to_owned()],
313    }
314}
315
316fn fallback_target_from_ref(target: rmux_proto::PaneTargetRef) -> PaneRef {
317    match target {
318        rmux_proto::PaneTargetRef::Slot(target) => target.into(),
319        rmux_proto::PaneTargetRef::Id { session_name, .. } => PaneRef::new(session_name, 0, 0),
320    }
321}
322
323fn is_daemon_broadcast_unavailable(error: &RmuxError) -> bool {
324    if crate::capabilities::is_unavailable(error, CAPABILITY_SDK_PANE_BROADCAST) {
325        return true;
326    }
327    matches!(error, RmuxError::Unsupported { .. })
328}
329
330async fn send_one(pane: Pane, input: OwnedInput) -> PaneBroadcastOutcome {
331    let target = pane.target().clone();
332    let pane_id = pane.id().await.ok().flatten();
333    let result = match input {
334        OwnedInput::Text(text) => pane.send_text(text).await,
335        OwnedInput::Key(key) => pane.send_key(key).await,
336    };
337
338    match result {
339        Ok(()) => PaneBroadcastOutcome::Success(BroadcastPaneSuccess { target, pane_id }),
340        Err(error) => PaneBroadcastOutcome::Failure(BroadcastPaneFailure {
341            target,
342            pane_id,
343            error,
344        }),
345    }
346}
347
348enum PaneBroadcastOutcome {
349    Success(BroadcastPaneSuccess),
350    Failure(BroadcastPaneFailure),
351}
352
353struct RenderBroadcastFailure<'a>(&'a BroadcastPaneFailure);
354
355impl fmt::Display for RenderBroadcastFailure<'_> {
356    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
357        write!(formatter, "{:?} failed", self.0.target)?;
358        if let Some(pane_id) = self.0.pane_id {
359            write!(formatter, " ({pane_id})")?;
360        }
361        write!(formatter, ": {}", self.0.error)
362    }
363}
364
365#[cfg(test)]
366mod tests {
367    use tokio::io::{AsyncReadExt, AsyncWriteExt};
368
369    use super::{broadcast, Input};
370    use crate::transport::TransportClient;
371    use crate::{Pane, PaneId, PaneRef, RmuxEndpoint, SessionName};
372    use rmux_proto::{
373        encode_frame, CommandOutput, FrameDecoder, HandshakeRequest, HandshakeResponse,
374        ListPanesRequest, ListPanesResponse, Request, Response, SendKeysExtRequest,
375        SendKeysResponse, CAPABILITY_HANDSHAKE, CAPABILITY_SDK_PANE_BROADCAST,
376    };
377
378    #[tokio::test]
379    async fn broadcast_falls_back_to_client_fanout_when_daemon_batch_is_unsupported() {
380        let (client_stream, mut server_stream) = tokio::io::duplex(4096);
381        let transport = TransportClient::spawn(client_stream);
382        let session_name = SessionName::new("broadcastfallback").expect("valid session name");
383        let pane = Pane::new(
384            PaneRef::new(session_name.clone(), 0, 0),
385            RmuxEndpoint::Default,
386            None,
387            transport,
388        );
389        let broadcast_task =
390            tokio::spawn(async move { broadcast(&[pane], Input::Text("printf ok")).await });
391
392        match read_request(&mut server_stream).await {
393            Request::Handshake(HandshakeRequest {
394                required_capabilities,
395                ..
396            }) => {
397                assert!(required_capabilities
398                    .iter()
399                    .any(|capability| capability == CAPABILITY_HANDSHAKE));
400                assert!(!required_capabilities
401                    .iter()
402                    .any(|capability| capability == CAPABILITY_SDK_PANE_BROADCAST));
403            }
404            request => panic!("expected broadcast capability handshake, got {request:?}"),
405        }
406        write_response(
407            &mut server_stream,
408            Response::Handshake(HandshakeResponse {
409                wire_version: rmux_proto::RMUX_WIRE_VERSION,
410                capabilities: vec![CAPABILITY_HANDSHAKE.to_owned()],
411            }),
412        )
413        .await;
414
415        match read_request(&mut server_stream).await {
416            Request::ListPanes(ListPanesRequest {
417                target,
418                target_window_index,
419                ..
420            }) => {
421                assert_eq!(target, session_name);
422                assert_eq!(target_window_index, Some(0));
423            }
424            request => panic!("expected client fallback pane-id lookup, got {request:?}"),
425        }
426        write_response(
427            &mut server_stream,
428            Response::ListPanes(ListPanesResponse {
429                output: CommandOutput::from_stdout("0:0:%1\n"),
430            }),
431        )
432        .await;
433
434        match read_request(&mut server_stream).await {
435            Request::SendKeysExt(SendKeysExtRequest {
436                keys,
437                literal,
438                target,
439                ..
440            }) => {
441                assert_eq!(keys, ["printf ok"]);
442                assert!(literal);
443                assert!(target.is_some());
444            }
445            request => panic!("expected client-side send-keys fallback, got {request:?}"),
446        }
447        write_response(
448            &mut server_stream,
449            Response::SendKeys(SendKeysResponse { key_count: 1 }),
450        )
451        .await;
452
453        let result = broadcast_task
454            .await
455            .expect("broadcast task")
456            .expect("fallback succeeds");
457        assert_eq!(result.len(), 1);
458        assert_eq!(result.successes()[0].pane_id(), Some(PaneId::new(1)));
459    }
460
461    async fn read_request(stream: &mut tokio::io::DuplexStream) -> Request {
462        let mut decoder = FrameDecoder::new();
463        let mut buffer = [0_u8; 256];
464
465        loop {
466            if let Some(request) = decoder
467                .next_frame::<Request>()
468                .expect("request frame decodes")
469            {
470                return request;
471            }
472
473            let read = stream.read(&mut buffer).await.expect("read request");
474            assert_ne!(read, 0, "client closed before request arrived");
475            decoder.push_bytes(&buffer[..read]);
476        }
477    }
478
479    async fn write_response(stream: &mut tokio::io::DuplexStream, response: Response) {
480        let frame = encode_frame(&response).expect("response encodes");
481        stream.write_all(&frame).await.expect("write response");
482        stream.flush().await.expect("flush response");
483    }
484}