rtcom-core 0.2.1

Core library for rtcom (Rust Terminal Communication): serial device abstraction, event bus, and session orchestration.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
//! Integration tests for [`Session`]: drives a real PTY pair via the
//! default backend so we exercise the full read → bus → write → device
//! pipeline (no mocks).
//!
//! Linux-only (see the `cfg` gate below): [`SerialPortDevice::pair`] is not
//! available on Windows, and macOS is excluded as well.

// Linux-only for the same reason as pty_roundtrip.rs: macOS PTYs
// behave subtly differently and the Session loop's read/write timing
// assumptions don't hold there. Linux gives canonical PTY semantics
// that match real serial devices closely enough for the tests to
// stay deterministic.
#![cfg(target_os = "linux")]

use std::time::Duration;

use bytes::Bytes;
use rtcom_core::{Command, Event, LineEnding, LineEndingMapper, SerialPortDevice, Session};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::broadcast::Receiver;
use tokio::time::timeout;

/// Longest any single awaited step is allowed to take before the test fails.
/// Deliberately generous: PTY scheduling on loaded CI runners can be slow.
const STEP_TIMEOUT: Duration = Duration::from_millis(2_000);

/// Bytes written on the external end of the PTY pair must show up on the bus
/// as [`Event::RxBytes`], preceded by [`Event::DeviceConnected`].
#[tokio::test]
async fn session_publishes_rx_bytes_for_external_writes() {
    const PAYLOAD: &[u8] = b"hello";

    let (mut peer, device) = SerialPortDevice::pair().expect("pty pair");
    let session = Session::new(device);
    let bus = session.bus().clone();
    let token = session.cancellation_token();
    // Subscribe before `run()` is spawned so no early event is missed.
    let mut events = bus.subscribe();

    let task = tokio::spawn(session.run());

    // The first thing on the bus has to be the connection notice.
    let first = timeout(STEP_TIMEOUT, events.recv())
        .await
        .expect("timed out waiting for DeviceConnected")
        .expect("bus closed before DeviceConnected");
    assert!(matches!(first, Event::DeviceConnected));

    peer.write_all(PAYLOAD).await.unwrap();
    peer.flush().await.unwrap();

    // The PTY is free to split the payload into several chunks, so keep
    // accumulating until the full length has been seen.
    let mut collected: Vec<u8> = Vec::new();
    while collected.len() < PAYLOAD.len() {
        let next = timeout(STEP_TIMEOUT, events.recv())
            .await
            .expect("timed out waiting for RxBytes")
            .expect("bus closed before RxBytes arrived");
        let Event::RxBytes(chunk) = next else {
            continue;
        };
        collected.extend_from_slice(&chunk);
    }
    assert_eq!(collected.as_slice(), PAYLOAD);

    token.cancel();
    let joined = timeout(STEP_TIMEOUT, task).await;
    joined
        .expect("session did not shut down")
        .expect("session task panicked")
        .expect("session returned error");
}

/// An [`Event::TxBytes`] published on the bus must come out, unmodified, on
/// the external end of the PTY pair.
#[tokio::test]
async fn session_writes_tx_bytes_to_device() {
    let (mut peer, device) = SerialPortDevice::pair().expect("pty pair");
    let session = Session::new(device);
    let bus = session.bus().clone();
    let token = session.cancellation_token();

    let task = tokio::spawn(session.run());

    // Yield once so the writer side has a chance to subscribe before the
    // payload is published.
    tokio::task::yield_now().await;

    let payload = Bytes::from_static(b"ping");
    bus.publish(Event::TxBytes(payload));

    let mut on_wire = [0_u8; 4];
    let read = timeout(STEP_TIMEOUT, peer.read_exact(&mut on_wire))
        .await
        .expect("timed out reading from external end");
    read.expect("read failed");
    assert_eq!(&on_wire, b"ping");

    token.cancel();
    let joined = timeout(STEP_TIMEOUT, task).await;
    joined
        .expect("session did not shut down")
        .expect("session task panicked")
        .expect("session returned error");
}

/// Cancelling the token must make `run()` return within `STEP_TIMEOUT` even
/// when no traffic has flowed in either direction.
#[tokio::test]
async fn cancellation_unblocks_run_with_no_io_pending() {
    // `_peer` is bound (not `_`) so the external end stays open for the
    // whole test.
    let (_peer, device) = SerialPortDevice::pair().expect("pty pair");
    let session = Session::new(device);
    let token = session.cancellation_token();

    let task = tokio::spawn(session.run());

    // Nothing has been sent or published, so the session is idle, waiting on
    // read/recv. The cancel alone has to wake it up and end `run()`.
    token.cancel();
    let joined = timeout(STEP_TIMEOUT, task).await;
    joined
        .expect("session did not shut down on cancel")
        .expect("session task panicked")
        .expect("session returned error");
}

/// Pulls events off `rx`, discarding them, until `pred` accepts one, and
/// returns that event.
///
/// # Panics
///
/// Panics if the receiver reports an error before a match, or if no event
/// matches within `STEP_TIMEOUT` — so a missing event fails the test rather
/// than hanging it.
async fn wait_for(rx: &mut Receiver<Event>, mut pred: impl FnMut(&Event) -> bool) -> Event {
    let search = async move {
        loop {
            let candidate = match rx.recv().await {
                Ok(event) => event,
                Err(err) => panic!("bus error before match: {err:?}"),
            };
            if pred(&candidate) {
                return candidate;
            }
        }
    };

    timeout(STEP_TIMEOUT, search)
        .await
        .expect("predicate never matched within STEP_TIMEOUT")
}

/// With an `AddCrToLf` output mapper installed, a published `"hi\n"` must
/// reach the wire as `"hi\r\n"`.
#[tokio::test]
async fn omap_add_cr_to_lf_converts_lf_to_crlf_on_wire() {
    let (mut peer, device) = SerialPortDevice::pair().expect("pty pair");
    let mapper = LineEndingMapper::new(LineEnding::AddCrToLf);
    let session = Session::new(device).with_omap(mapper);
    let bus = session.bus().clone();
    let token = session.cancellation_token();

    let task = tokio::spawn(session.run());
    tokio::task::yield_now().await;

    let payload = Bytes::from_static(b"hi\n");
    bus.publish(Event::TxBytes(payload));

    // Three bytes go in; four are expected out once the CR is inserted.
    let mut on_wire = [0_u8; 4];
    let read = timeout(STEP_TIMEOUT, peer.read_exact(&mut on_wire))
        .await
        .expect("timed out reading mapped bytes");
    read.expect("read failed");
    assert_eq!(&on_wire, b"hi\r\n");

    token.cancel();
    let joined = timeout(STEP_TIMEOUT, task).await;
    joined
        .expect("session did not shut down")
        .expect("session task panicked")
        .expect("session returned error");
}

#[tokio::test]
async fn imap_add_cr_to_lf_converts_received_lf_to_crlf_in_event() {
    let (mut external, internal) = SerialPortDevice::pair().expect("pty pair");
    let session = Session::new(internal).with_imap(LineEndingMapper::new(LineEnding::AddCrToLf));
    let bus = session.bus().clone();
    let cancel = session.cancellation_token();
    let mut rx = bus.subscribe();

    let session_handle = tokio::spawn(session.run());
    tokio::task::yield_now().await;

    external.write_all(b"hi\n").await.unwrap();
    external.flush().await.unwrap();

    let mut received = Vec::new();
    while received.len() < 4 {
        let event = wait_for(&mut rx, |e| matches!(e, Event::RxBytes(_))).await;
        if let Event::RxBytes(bytes) = event {
            received.extend_from_slice(&bytes);
        }
    }
    assert_eq!(&received[..4], b"hi\r\n");

    cancel.cancel();
    timeout(STEP_TIMEOUT, session_handle)
        .await
        .expect("session did not shut down")
        .expect("session task panicked")
        .expect("session returned error");
}

/// Publishing `Command::Quit` must be enough, without cancelling the token,
/// to make `run()` return successfully.
#[tokio::test]
async fn quit_command_returns_run() {
    let (_peer, device) = SerialPortDevice::pair().expect("pty pair");
    let session = Session::new(device);
    let bus = session.bus().clone();

    let task = tokio::spawn(session.run());
    tokio::task::yield_now().await;

    bus.publish(Event::Command(Command::Quit));

    let joined = timeout(STEP_TIMEOUT, task).await;
    joined
        .expect("session did not shut down on Quit command")
        .expect("session task panicked")
        .expect("session returned error");
}

/// Spawns a session, yields once so the loop subscribes, publishes
/// `cmd`, waits for a `SystemMessage` to arrive on `rx`, then shuts the
/// session down cleanly. Returns the message text.
async fn capture_system_message(cmd: Command) -> String {
    let (_external, internal) = SerialPortDevice::pair().expect("pty pair");
    let session = Session::new(internal);
    let bus = session.bus().clone();
    let cancel = session.cancellation_token();
    let mut rx = bus.subscribe();

    let session_handle = tokio::spawn(session.run());
    tokio::task::yield_now().await;

    bus.publish(Event::Command(cmd));

    let event = wait_for(&mut rx, |e| matches!(e, Event::SystemMessage(_))).await;
    let text = match event {
        Event::SystemMessage(t) => t,
        other => panic!("unexpected: {other:?}"),
    };

    cancel.cancel();
    timeout(STEP_TIMEOUT, session_handle)
        .await
        .expect("session did not shut down")
        .expect("session task panicked")
        .expect("session returned error");
    text
}

/// `Command::ShowConfig` must produce a `SystemMessage` describing the
/// current settings.
#[tokio::test]
async fn show_config_command_emits_system_message_with_current_settings() {
    let text = capture_system_message(Command::ShowConfig).await;
    // The default configuration is 115200 8N1; the baud rate is the minimum
    // that has to be present in the report.
    let mentions_baud = text.contains("115200");
    assert!(mentions_baud, "expected baud in SystemMessage: {text:?}");
}

/// `Command::Help` must produce a `SystemMessage` that mentions "quit"
/// (compared case-insensitively).
#[tokio::test]
async fn help_command_emits_system_message_listing_keys() {
    let text = capture_system_message(Command::Help).await;
    let mentions_quit = text.to_lowercase().contains("quit");
    assert!(
        mentions_quit,
        "expected Help text to mention 'quit': {text:?}"
    );
}

/// Same idea as `capture_system_message`, but accepts an `Event::Error`
/// outcome too — useful for tests of device-control commands that PTYs
/// may reject (DTR/RTS/break ioctls). The error-path acceptance proves
/// the dispatcher tried, even if the kernel turned it down.
async fn dispatch_and_wait_for_message_or_error(cmd: Command) -> Option<String> {
    let (_external, internal) = SerialPortDevice::pair().expect("pty pair");
    let session = Session::new(internal);
    let bus = session.bus().clone();
    let cancel = session.cancellation_token();
    let mut rx = bus.subscribe();

    let session_handle = tokio::spawn(session.run());
    tokio::task::yield_now().await;

    bus.publish(Event::Command(cmd));

    let event = wait_for(&mut rx, |e| {
        matches!(e, Event::SystemMessage(_) | Event::Error(_))
    })
    .await;
    let text = match event {
        Event::SystemMessage(t) => Some(t),
        Event::Error(_) => None,
        other => panic!("unexpected: {other:?}"),
    };

    cancel.cancel();
    timeout(STEP_TIMEOUT, session_handle)
        .await
        .expect("session did not shut down")
        .expect("session task panicked")
        .expect("session returned error");
    text
}

#[tokio::test]
async fn initial_dtr_lowered_first_toggle_yields_asserted() {
    // Mirrors what main does after `--lower-dtr`: caller already
    // called `device.set_dtr(false)` and tells Session about it via
    // the builder so the cached state is honest. The first ^A t
    // toggle should therefore raise the line, not lower it again.
    let (_external, internal) = SerialPortDevice::pair().expect("pty pair");
    let session = Session::new(internal).with_initial_dtr(false);
    let bus = session.bus().clone();
    let cancel = session.cancellation_token();
    let mut rx = bus.subscribe();

    let h = tokio::spawn(session.run());
    tokio::task::yield_now().await;

    bus.publish(Event::Command(Command::ToggleDtr));

    let event = wait_for(&mut rx, |e| {
        matches!(e, Event::SystemMessage(_) | Event::Error(_))
    })
    .await;
    if let Event::SystemMessage(text) = event {
        assert_eq!(text, "DTR: asserted", "lowered → toggled must be asserted");
    }
    // Err path: the PTY rejected set_dtr; proves dispatcher tried.

    cancel.cancel();
    timeout(STEP_TIMEOUT, h)
        .await
        .expect("session shutdown")
        .expect("task panicked")
        .expect("session error");
}

#[tokio::test]
async fn initial_rts_lowered_first_toggle_yields_asserted() {
    let (_external, internal) = SerialPortDevice::pair().expect("pty pair");
    let session = Session::new(internal).with_initial_rts(false);
    let bus = session.bus().clone();
    let cancel = session.cancellation_token();
    let mut rx = bus.subscribe();

    let h = tokio::spawn(session.run());
    tokio::task::yield_now().await;

    bus.publish(Event::Command(Command::ToggleRts));

    let event = wait_for(&mut rx, |e| {
        matches!(e, Event::SystemMessage(_) | Event::Error(_))
    })
    .await;
    if let Event::SystemMessage(text) = event {
        assert_eq!(text, "RTS: asserted", "lowered → toggled must be asserted");
    }

    cancel.cancel();
    timeout(STEP_TIMEOUT, h)
        .await
        .expect("session shutdown")
        .expect("task panicked")
        .expect("session error");
}

/// `Command::ToggleDtr` must be answered with either a `SystemMessage` that
/// mentions "DTR" or an `Event::Error`.
#[tokio::test]
async fn toggle_dtr_command_emits_system_message_or_error() {
    // `None` is the error path: the PTY refused the ioctl. That still shows
    // the dispatcher reached set_dtr, which is all this test is after.
    let reply = dispatch_and_wait_for_message_or_error(Command::ToggleDtr).await;
    if let Some(text) = reply {
        let mentions_dtr = text.contains("DTR");
        assert!(
            mentions_dtr,
            "expected DTR mention in SystemMessage: {text:?}"
        );
    }
}

/// `Command::ToggleRts` must be answered with either a `SystemMessage` that
/// mentions "RTS" or an `Event::Error`.
#[tokio::test]
async fn toggle_rts_command_emits_system_message_or_error() {
    // `None` (an `Event::Error` reply) is accepted without further checks.
    let reply = dispatch_and_wait_for_message_or_error(Command::ToggleRts).await;
    if let Some(text) = reply {
        let mentions_rts = text.contains("RTS");
        assert!(
            mentions_rts,
            "expected RTS mention in SystemMessage: {text:?}"
        );
    }
}

#[tokio::test]
async fn send_break_command_emits_system_message() {
    // PTYs may reject set_break entirely (return Err); accept either
    // SystemMessage("...break...") or Event::Error as success — both
    // mean the dispatcher tried.
    let (_external, internal) = SerialPortDevice::pair().expect("pty pair");
    let session = Session::new(internal);
    let bus = session.bus().clone();
    let cancel = session.cancellation_token();
    let mut rx = bus.subscribe();

    let session_handle = tokio::spawn(session.run());
    tokio::task::yield_now().await;

    bus.publish(Event::Command(Command::SendBreak));

    let event = wait_for(&mut rx, |e| {
        matches!(e, Event::SystemMessage(_) | Event::Error(_))
    })
    .await;
    match event {
        Event::SystemMessage(text) => assert!(
            text.to_lowercase().contains("break"),
            "expected break mention: {text:?}"
        ),
        Event::Error(_) => {} // PTY rejected set_break, acceptable
        other => panic!("unexpected: {other:?}"),
    }

    cancel.cancel();
    timeout(STEP_TIMEOUT, session_handle)
        .await
        .expect("session did not shut down")
        .expect("session task panicked")
        .expect("session returned error");
}

#[tokio::test]
async fn set_baud_command_updates_device_and_emits_config_changed() {
    let (_external, internal) = SerialPortDevice::pair().expect("pty pair");
    let session = Session::new(internal);
    let bus = session.bus().clone();
    let cancel = session.cancellation_token();
    let mut rx = bus.subscribe();

    let session_handle = tokio::spawn(session.run());
    tokio::task::yield_now().await;

    bus.publish(Event::Command(Command::SetBaud(9600)));

    let event = wait_for(&mut rx, |e| matches!(e, Event::ConfigChanged(_))).await;
    match event {
        Event::ConfigChanged(cfg) => assert_eq!(cfg.baud_rate, 9600),
        other => panic!("unexpected: {other:?}"),
    }

    cancel.cancel();
    timeout(STEP_TIMEOUT, session_handle)
        .await
        .expect("session did not shut down")
        .expect("session task panicked")
        .expect("session returned error");
}