1use std::error::Error;
9use std::fmt;
10
11use tokio::task::JoinSet;
12
13use crate::{Pane, PaneId, PaneRef, Result, RmuxError};
14use rmux_proto::{PaneBroadcastInputRequest, Request, Response, CAPABILITY_SDK_PANE_BROADCAST};
15
/// Borrowed input payload that can be broadcast to a set of panes.
///
/// The variant decides how the payload is delivered: `Text` is sent with the
/// `literal` flag set on the daemon path and through `Pane::send_text` on the
/// client path, while `Key` is sent without the `literal` flag and through
/// `Pane::send_key`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum Input<'a> {
    /// Payload delivered as literal text.
    Text(&'a str),
    /// Payload delivered as a key (non-literal).
    Key(&'a str),
}

impl<'a> Input<'a> {
    /// Wraps `value` as an [`Input::Text`] payload.
    #[must_use]
    pub const fn text(value: &'a str) -> Self {
        Input::Text(value)
    }

    /// Wraps `value` as an [`Input::Key`] payload.
    #[must_use]
    pub const fn key(value: &'a str) -> Self {
        Input::Key(value)
    }
}
39
40#[derive(Debug, Clone, PartialEq, Eq)]
41enum OwnedInput {
42 Text(String),
43 Key(String),
44}
45
46impl From<Input<'_>> for OwnedInput {
47 fn from(value: Input<'_>) -> Self {
48 match value {
49 Input::Text(value) => Self::Text(value.to_owned()),
50 Input::Key(value) => Self::Key(value.to_owned()),
51 }
52 }
53}
54
55#[derive(Debug, Clone, PartialEq, Eq)]
57pub struct BroadcastPaneSuccess {
58 target: PaneRef,
59 pane_id: Option<PaneId>,
60}
61
62impl BroadcastPaneSuccess {
63 #[must_use]
65 pub const fn target(&self) -> &PaneRef {
66 &self.target
67 }
68
69 #[must_use]
71 pub const fn pane_id(&self) -> Option<PaneId> {
72 self.pane_id
73 }
74}
75
76#[derive(Debug)]
78pub struct BroadcastPaneFailure {
79 target: PaneRef,
80 pane_id: Option<PaneId>,
81 error: RmuxError,
82}
83
84impl BroadcastPaneFailure {
85 #[must_use]
87 pub const fn target(&self) -> &PaneRef {
88 &self.target
89 }
90
91 #[must_use]
93 pub const fn pane_id(&self) -> Option<PaneId> {
94 self.pane_id
95 }
96
97 #[must_use]
99 pub const fn error(&self) -> &RmuxError {
100 &self.error
101 }
102
103 #[must_use]
105 pub fn into_error(self) -> RmuxError {
106 self.error
107 }
108}
109
110#[derive(Debug, Clone, PartialEq, Eq)]
112pub struct BroadcastResult {
113 successes: Vec<BroadcastPaneSuccess>,
114}
115
116impl BroadcastResult {
117 #[must_use]
119 pub fn successes(&self) -> &[BroadcastPaneSuccess] {
120 &self.successes
121 }
122
123 #[must_use]
125 pub fn len(&self) -> usize {
126 self.successes.len()
127 }
128
129 #[must_use]
131 pub fn is_empty(&self) -> bool {
132 self.successes.is_empty()
133 }
134}
135
136#[derive(Debug)]
138pub struct PartialBroadcastFailure {
139 successes: Vec<BroadcastPaneSuccess>,
140 failures: Vec<BroadcastPaneFailure>,
141}
142
143impl PartialBroadcastFailure {
144 pub(crate) fn new(
145 successes: Vec<BroadcastPaneSuccess>,
146 failures: Vec<BroadcastPaneFailure>,
147 ) -> Self {
148 Self {
149 successes,
150 failures,
151 }
152 }
153
154 #[must_use]
157 pub fn successes(&self) -> &[BroadcastPaneSuccess] {
158 &self.successes
159 }
160
161 #[must_use]
163 pub fn failures(&self) -> &[BroadcastPaneFailure] {
164 &self.failures
165 }
166}
167
168impl fmt::Display for PartialBroadcastFailure {
169 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
170 writeln!(
171 formatter,
172 "broadcast failed for {} of {} panes",
173 self.failures.len(),
174 self.successes.len() + self.failures.len()
175 )?;
176 for (index, failure) in self.failures.iter().enumerate() {
177 if index > 0 {
178 writeln!(formatter)?;
179 }
180 write!(
181 formatter,
182 "{}. {}",
183 index + 1,
184 RenderBroadcastFailure(failure)
185 )?;
186 }
187 Ok(())
188 }
189}
190
191impl Error for PartialBroadcastFailure {}
192
193pub(crate) async fn broadcast(panes: &[Pane], input: Input<'_>) -> Result<BroadcastResult> {
194 if panes.is_empty() {
195 return Ok(BroadcastResult {
196 successes: Vec::new(),
197 });
198 }
199 if same_endpoint(panes) {
200 match broadcast_daemon_side(panes, input).await {
201 Ok(result) => return Ok(result),
202 Err(error) if is_daemon_broadcast_unavailable(&error) => {}
203 Err(error) => return Err(error),
204 }
205 }
206 broadcast_client_side(panes, input).await
207}
208
209async fn broadcast_daemon_side(panes: &[Pane], input: Input<'_>) -> Result<BroadcastResult> {
210 crate::capabilities::require(panes[0].transport(), &[CAPABILITY_SDK_PANE_BROADCAST]).await?;
211 let response = panes[0]
212 .transport()
213 .request(Request::PaneBroadcastInput(PaneBroadcastInputRequest {
214 targets: panes.iter().map(Pane::proto_target_ref).collect(),
215 keys: input_keys(input),
216 literal: matches!(input, Input::Text(_)),
217 }))
218 .await?;
219
220 let response = match response {
221 Response::PaneBroadcastInput(response) => response,
222 response => {
223 return Err(RmuxError::protocol(rmux_proto::RmuxError::Server(format!(
224 "rmux daemon sent `{}` response for `pane broadcast` request",
225 response.command_name()
226 ))));
227 }
228 };
229
230 let successes = response
231 .successes
232 .into_iter()
233 .map(|success| BroadcastPaneSuccess {
234 target: success.target.into(),
235 pane_id: success.pane_id,
236 })
237 .collect::<Vec<_>>();
238 let failures = response
239 .failures
240 .into_iter()
241 .map(|failure| {
242 let index = usize::try_from(failure.target_index).ok();
243 let target = index
244 .and_then(|index| panes.get(index))
245 .map(|pane| pane.target().clone())
246 .unwrap_or_else(|| fallback_target_from_ref(failure.target));
247 BroadcastPaneFailure {
248 target,
249 pane_id: None,
250 error: RmuxError::protocol(failure.error),
251 }
252 })
253 .collect::<Vec<_>>();
254
255 if failures.is_empty() {
256 Ok(BroadcastResult { successes })
257 } else {
258 Err(RmuxError::partial_broadcast(PartialBroadcastFailure::new(
259 successes, failures,
260 )))
261 }
262}
263
264async fn broadcast_client_side(panes: &[Pane], input: Input<'_>) -> Result<BroadcastResult> {
265 let input = OwnedInput::from(input);
266 let mut tasks = JoinSet::new();
267 for (index, pane) in panes.iter().cloned().enumerate() {
268 let input = input.clone();
269 tasks.spawn(async move { (index, send_one(pane, input).await) });
270 }
271
272 let mut outcomes = Vec::with_capacity(panes.len());
273 while let Some(joined) = tasks.join_next().await {
274 let (index, outcome) = joined.map_err(|error| {
275 RmuxError::transport(
276 "join broadcast worker task",
277 std::io::Error::other(error.to_string()),
278 )
279 })?;
280 outcomes.push((index, outcome));
281 }
282 outcomes.sort_by_key(|(index, _)| *index);
283
284 let mut successes = Vec::new();
285 let mut failures = Vec::new();
286 for (_, outcome) in outcomes {
287 match outcome {
288 PaneBroadcastOutcome::Success(success) => successes.push(success),
289 PaneBroadcastOutcome::Failure(failure) => failures.push(failure),
290 }
291 }
292
293 if failures.is_empty() {
294 Ok(BroadcastResult { successes })
295 } else {
296 Err(RmuxError::partial_broadcast(PartialBroadcastFailure::new(
297 successes, failures,
298 )))
299 }
300}
301
302fn same_endpoint(panes: &[Pane]) -> bool {
303 let Some(first) = panes.first() else {
304 return true;
305 };
306 panes.iter().all(|pane| pane.endpoint() == first.endpoint())
307}
308
309fn input_keys(input: Input<'_>) -> Vec<String> {
310 match input {
311 Input::Text(text) => vec![text.to_owned()],
312 Input::Key(key) => vec![key.to_owned()],
313 }
314}
315
316fn fallback_target_from_ref(target: rmux_proto::PaneTargetRef) -> PaneRef {
317 match target {
318 rmux_proto::PaneTargetRef::Slot(target) => target.into(),
319 rmux_proto::PaneTargetRef::Id { session_name, .. } => PaneRef::new(session_name, 0, 0),
320 }
321}
322
323fn is_daemon_broadcast_unavailable(error: &RmuxError) -> bool {
324 if crate::capabilities::is_unavailable(error, CAPABILITY_SDK_PANE_BROADCAST) {
325 return true;
326 }
327 matches!(error, RmuxError::Unsupported { .. })
328}
329
330async fn send_one(pane: Pane, input: OwnedInput) -> PaneBroadcastOutcome {
331 let target = pane.target().clone();
332 let pane_id = pane.id().await.ok().flatten();
333 let result = match input {
334 OwnedInput::Text(text) => pane.send_text(text).await,
335 OwnedInput::Key(key) => pane.send_key(key).await,
336 };
337
338 match result {
339 Ok(()) => PaneBroadcastOutcome::Success(BroadcastPaneSuccess { target, pane_id }),
340 Err(error) => PaneBroadcastOutcome::Failure(BroadcastPaneFailure {
341 target,
342 pane_id,
343 error,
344 }),
345 }
346}
347
348enum PaneBroadcastOutcome {
349 Success(BroadcastPaneSuccess),
350 Failure(BroadcastPaneFailure),
351}
352
353struct RenderBroadcastFailure<'a>(&'a BroadcastPaneFailure);
354
355impl fmt::Display for RenderBroadcastFailure<'_> {
356 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
357 write!(formatter, "{:?} failed", self.0.target)?;
358 if let Some(pane_id) = self.0.pane_id {
359 write!(formatter, " ({pane_id})")?;
360 }
361 write!(formatter, ": {}", self.0.error)
362 }
363}
364
#[cfg(test)]
mod tests {
    use tokio::io::{AsyncReadExt, AsyncWriteExt};

    use super::{broadcast, Input};
    use crate::transport::TransportClient;
    use crate::{Pane, PaneId, PaneRef, RmuxEndpoint, SessionName};
    use rmux_proto::{
        encode_frame, CommandOutput, FrameDecoder, HandshakeRequest, HandshakeResponse,
        ListPanesRequest, ListPanesResponse, Request, Response, SendKeysExtRequest,
        SendKeysResponse, CAPABILITY_HANDSHAKE, CAPABILITY_SDK_PANE_BROADCAST,
    };

    /// Reads from `stream` until one complete request frame has been decoded.
    ///
    /// A fresh decoder is used per call, so this relies on the client sending
    /// one request at a time and waiting for its response.
    async fn read_request(stream: &mut tokio::io::DuplexStream) -> Request {
        let mut decoder = FrameDecoder::new();
        let mut chunk = [0_u8; 256];

        loop {
            let decoded = decoder
                .next_frame::<Request>()
                .expect("request frame decodes");
            if let Some(request) = decoded {
                return request;
            }

            let filled = stream.read(&mut chunk).await.expect("read request");
            assert_ne!(filled, 0, "client closed before request arrived");
            decoder.push_bytes(&chunk[..filled]);
        }
    }

    /// Encodes `response` as a frame and writes it to `stream`, flushing after.
    async fn write_response(stream: &mut tokio::io::DuplexStream, response: Response) {
        let encoded = encode_frame(&response).expect("response encodes");
        stream.write_all(&encoded).await.expect("write response");
        stream.flush().await.expect("flush response");
    }

    /// Plays a daemon that advertises only the handshake capability and checks
    /// that the broadcast then goes through a pane-id lookup plus a per-pane
    /// send-keys request.
    #[tokio::test]
    async fn broadcast_falls_back_to_client_fanout_when_daemon_batch_is_unsupported() {
        const PAYLOAD: &str = "printf ok";

        let (client_half, mut daemon_half) = tokio::io::duplex(4096);
        let transport = TransportClient::spawn(client_half);
        let session_name = SessionName::new("broadcastfallback").expect("valid session name");
        let pane = Pane::new(
            PaneRef::new(session_name.clone(), 0, 0),
            RmuxEndpoint::Default,
            None,
            transport,
        );
        let broadcast_task =
            tokio::spawn(async move { broadcast(&[pane], Input::Text(PAYLOAD)).await });

        // Step 1: handshake. The reply advertises only the handshake
        // capability, so the broadcast capability is missing.
        match read_request(&mut daemon_half).await {
            Request::Handshake(HandshakeRequest {
                required_capabilities,
                ..
            }) => {
                assert!(required_capabilities
                    .iter()
                    .any(|advertised| advertised == CAPABILITY_HANDSHAKE));
                assert!(required_capabilities
                    .iter()
                    .all(|advertised| advertised != CAPABILITY_SDK_PANE_BROADCAST));
            }
            request => panic!("expected broadcast capability handshake, got {request:?}"),
        }
        let handshake_reply = Response::Handshake(HandshakeResponse {
            wire_version: rmux_proto::RMUX_WIRE_VERSION,
            capabilities: vec![CAPABILITY_HANDSHAKE.to_owned()],
        });
        write_response(&mut daemon_half, handshake_reply).await;

        // Step 2: the client-side fallback looks up the pane id.
        match read_request(&mut daemon_half).await {
            Request::ListPanes(ListPanesRequest {
                target,
                target_window_index,
                ..
            }) => {
                assert_eq!(target, session_name);
                assert_eq!(target_window_index, Some(0));
            }
            request => panic!("expected client fallback pane-id lookup, got {request:?}"),
        }
        let list_panes_reply = Response::ListPanes(ListPanesResponse {
            output: CommandOutput::from_stdout("0:0:%1\n"),
        });
        write_response(&mut daemon_half, list_panes_reply).await;

        // Step 3: the payload itself arrives as a literal send-keys request.
        match read_request(&mut daemon_half).await {
            Request::SendKeysExt(SendKeysExtRequest {
                keys,
                literal,
                target,
                ..
            }) => {
                assert_eq!(keys, [PAYLOAD]);
                assert!(literal);
                assert!(target.is_some());
            }
            request => panic!("expected client-side send-keys fallback, got {request:?}"),
        }
        let send_keys_reply = Response::SendKeys(SendKeysResponse { key_count: 1 });
        write_response(&mut daemon_half, send_keys_reply).await;

        let result = broadcast_task
            .await
            .expect("broadcast task")
            .expect("fallback succeeds");
        assert_eq!(result.len(), 1);
        assert_eq!(result.successes()[0].pane_id(), Some(PaneId::new(1)));
    }
}