pasta_lua 0.2.4

Pasta Lua - Lua integration for Pasta DSL
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
//! Transport: TCP listener + DAP-compliant Content-Length framing on an
//! I/O-ONLY thread (design "Transport & DapAdapter", requirements 3.1 / 5.5).
//!
//! # Role in the backend
//!
//! [`Transport`] is the wire layer of the debug backend. It owns a long-lived
//! listener thread (promoted from the PoC `transport_loop::serve`) that:
//!
//! 1. binds a [`TcpListener`] to the configured address (ONLY when debugging is
//!    enabled — `listen == None` opens nothing, R5.5),
//! 2. accepts exactly ONE client connection (single-client by design),
//! 3. bridges that socket to two `std::sync::mpsc` channels, reading framed JSON
//!    from the socket into an inbound channel and writing outbound JSON from a
//!    channel back to the socket.
//!
//! Unlike the PoC, which used a bare line protocol (`stopped <src> <line>\n`),
//! production uses **DAP-compliant `Content-Length` framing** (design
//! "Transport": "PoC の素朴な行プロトコルは DAP 準拠フレーミングへ作り直す").
//!
//! # I/O ONLY — never touches Lua (design "Transport"/"Architecture")
//!
//! The transport thread uses only `std::net` / `std::io` / `std::sync::mpsc` and
//! the existing `serde_json`. It MUST NOT touch `mlua::Lua` / Lua state: the
//! `mlua::Lua` handle is `!Send` and is pinned to the VM thread. The only seam
//! the transport exposes is a pair of channels carrying raw
//! [`serde_json::Value`] frames; DAP message SEMANTICS (initialize /
//! setBreakpoints / …) are owned by the DAP adapter (task 3.2), NOT here. This
//! layer is purely the byte/JSON wire boundary.
//!
//! # Wire frame: `Content-Length: <N>\r\n\r\n<json>` (byte length)
//!
//! A frame is a header block terminated by a blank line (`\r\n\r\n`) followed by
//! exactly `N` bytes of UTF-8 JSON body, where `N` is the **byte** length of the
//! body (NOT its char count — multi-byte UTF-8 such as Japanese makes the two
//! differ). Reads are robust to extra/reordered headers; only `Content-Length`
//! is significant. See [`read_frame`] / [`write_frame`].
//!
//! # Clean shutdown (no hang on EOF / disconnect)
//!
//! The listener thread returns safely on socket EOF, an I/O error, or channel
//! disconnect (the owner dropping the inbound `Receiver`, or the owner dropping
//! the outbound `Sender`) — mirroring the PoC's "safe return on error" so the
//! thread never hangs. [`Transport::shutdown`] raises the cooperative shutdown
//! flag and drops the outbound sender: a parked accept observes the flag within
//! one [`POLL_INTERVAL`], and the writer sees the outbound channel disconnect.
//! The reader also returns on the next socket EOF / error; the connected-state
//! teardown (flag poll on a [`POLL_INTERVAL`] read timeout) is described on
//! [`serve`]. Tests additionally use a TEST-ONLY `set_read_timeout` and bounded
//! joins so CI cannot hang.

use std::io::{self, BufRead, BufReader, Write};
use std::net::{SocketAddr, TcpListener};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, Sender};
use std::thread::JoinHandle;
use std::time::Duration;

use serde_json::Value;
use socket2::{Domain, Protocol, Socket, Type};

use crate::debug::DebugError;

/// Name of the DAP header that carries the body byte length.
///
/// [`write_frame`] emits it verbatim; [`read_frame`] matches it
/// case-insensitively and ignores every other header.
const CONTENT_LENGTH: &str = "Content-Length";

/// Upper bound, in bytes, for an inbound frame body accepted by
/// [`read_frame`] (16 MiB).
///
/// The `Content-Length` value is **attacker-controlled** (the TCP debugger
/// client is a trust boundary): without a cap, a single malicious header could
/// drive an arbitrarily large body allocation before any byte of the body is
/// read (memory-exhaustion DoS). Real DAP messages are tiny; 16 MiB is far
/// above any legitimate frame while keeping the worst-case allocation bounded.
///
/// The bound is inclusive: [`read_frame`] rejects only lengths strictly
/// greater than this value.
const MAX_CONTENT_LENGTH: usize = 16 * 1024 * 1024;

/// Poll cadence (5 ms) for the interruptible non-blocking `accept()` loop in
/// [`serve`]. Matches the established 5ms cooperative-poll convention used by
/// the socket bridge (`wiring::POLL_INTERVAL`): small enough to be
/// imperceptible at teardown while keeping the parked accept from busy-spinning
/// (it sleeps for this interval when no client is waiting). The shutdown flag is
/// checked once per interval, so a parked accept winds down within ~one
/// `POLL_INTERVAL` of [`Transport::shutdown`] / drop (design "State Management").
const POLL_INTERVAL: Duration = Duration::from_millis(5);

// ---------------------------------------------------------------------------
// Frame codec (Content-Length framing) — pure, Lua-free, unit-testable
// ---------------------------------------------------------------------------

/// Encode `value` as one `Content-Length`-framed DAP wire frame and emit it on
/// `out`.
///
/// Frame layout: the header line `Content-Length: <N>`, the blank-line
/// separator (`\r\n\r\n` in total), then the compact UTF-8 JSON body. `<N>` is
/// the body's length in **bytes** (the length of the serialized buffer, NOT a
/// char count — the two differ for multi-byte UTF-8). The writer is flushed
/// before returning so the peer can read the frame immediately.
///
/// # Errors
///
/// A JSON serialization failure is reported as
/// [`io::ErrorKind::InvalidData`]; any write / flush error from `out` is
/// propagated unchanged.
///
/// I/O only — never touches Lua.
pub(crate) fn write_frame<W: Write>(out: &mut W, value: &Value) -> io::Result<()> {
    // Serialize first: the header cannot be written until the exact byte
    // length of the body is known.
    let payload = match serde_json::to_vec(value) {
        Ok(bytes) => bytes,
        Err(err) => return Err(io::Error::new(io::ErrorKind::InvalidData, err)),
    };

    // Header block (single header + blank line), then the body, then flush.
    let header = format!("{CONTENT_LENGTH}: {}\r\n\r\n", payload.len());
    out.write_all(header.as_bytes())?;
    out.write_all(&payload)?;
    out.flush()
}

/// Read one `Content-Length`-framed DAP wire frame from `reader` and parse the
/// body into a [`serde_json::Value`].
///
/// Parsing is robust to header ordering and to extra headers: the header block
/// is read line by line until a blank line (the `\r\n\r\n` separator; a bare
/// `\n` line ending is tolerated too), and only the `Content-Length` header is
/// significant (its name is matched case-insensitively, surrounding whitespace
/// trimmed; if it appears more than once the last occurrence wins). Then
/// EXACTLY that many body bytes are read (no over- or under-read), decoded as
/// UTF-8, and parsed.
///
/// Both halves of the frame are size-bounded because the peer is untrusted:
/// the header block as a whole may not exceed 64 KiB (so a peer that never
/// sends a newline cannot grow the line buffer without limit), and the body
/// may not exceed [`MAX_CONTENT_LENGTH`].
///
/// # Returns
///
/// - `Ok(Some(value))` — one complete, well-formed frame.
/// - `Ok(None)` — clean EOF *before* any header byte (the peer closed the
///   connection between frames).
///
/// # Errors
///
/// - [`io::ErrorKind::UnexpectedEof`] — EOF inside the header block or a
///   truncated body.
/// - [`io::ErrorKind::InvalidData`] — oversized header block, missing or
///   unparsable `Content-Length`, a length above [`MAX_CONTENT_LENGTH`],
///   non-UTF-8 header/body bytes, or invalid JSON.
/// - Any other I/O error from `reader` is propagated unchanged.
///
/// I/O only — never touches Lua.
pub(crate) fn read_frame<R: BufRead>(reader: &mut R) -> io::Result<Option<Value>> {
    // Cap for the WHOLE header block of one frame. A legitimate DAP header
    // block is a few dozen bytes; 64 KiB leaves generous room for extra
    // headers while bounding what an untrusted peer can make us buffer.
    const MAX_HEADER_BYTES: usize = 64 * 1024;

    let header_too_large = || {
        io::Error::new(
            io::ErrorKind::InvalidData,
            format!("frame header block exceeds the maximum {MAX_HEADER_BYTES} bytes"),
        )
    };

    let mut content_length: Option<usize> = None;
    // Bytes of header block we are still willing to read for this frame.
    let mut header_budget = MAX_HEADER_BYTES;
    // One line buffer reused across header lines.
    let mut line = String::new();

    // (1) Read the header block, line by line, until a blank line.
    loop {
        if header_budget == 0 {
            // The budget ran out exactly at a line boundary without reaching
            // the blank separator line.
            return Err(header_too_large());
        }

        line.clear();
        // `take` limits how much a single `read_line` may pull, so a line
        // without a terminator cannot grow `line` past the remaining budget.
        // The call is fully qualified so it resolves on `&mut R` without
        // needing `Read` imported. `usize -> u64` is a widening cast.
        let n = io::Read::take(&mut *reader, header_budget as u64).read_line(&mut line)?;
        if n == 0 {
            // EOF. If it landed exactly between frames (no header bytes read),
            // it's a clean close; otherwise the frame was truncated.
            if header_budget < MAX_HEADER_BYTES {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "EOF in the middle of a frame header block",
                ));
            }
            return Ok(None);
        }
        if n == header_budget && !line.ends_with('\n') {
            // The limit cut the line short: the header block is oversized.
            return Err(header_too_large());
        }
        header_budget -= n; // `take` guarantees n <= header_budget

        // The blank line (`\r\n` or `\n`) terminates the header block.
        let header = line.trim_end_matches(['\r', '\n']);
        if header.is_empty() {
            break;
        }

        // Parse `Header-Name: value`; only Content-Length matters. Lines
        // without a colon and all other headers are ignored.
        let Some((name, value)) = header.split_once(':') else {
            continue;
        };
        if !name.trim().eq_ignore_ascii_case(CONTENT_LENGTH) {
            continue;
        }
        let parsed = value.trim().parse::<usize>().map_err(|_| {
            io::Error::new(
                io::ErrorKind::InvalidData,
                format!("invalid Content-Length value: {value:?}"),
            )
        })?;
        content_length = Some(parsed);
    }

    let len = content_length.ok_or_else(|| {
        io::Error::new(
            io::ErrorKind::InvalidData,
            "frame header block missing Content-Length",
        )
    })?;

    // DoS guard: the length is attacker-controlled, so reject absurd values
    // BEFORE allocating the body buffer (see [`MAX_CONTENT_LENGTH`]).
    if len > MAX_CONTENT_LENGTH {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!("Content-Length {len} exceeds the maximum {MAX_CONTENT_LENGTH}"),
        ));
    }

    // (2) Read EXACTLY `len` body bytes (no over/under-read).
    let mut body = vec![0u8; len];
    reader.read_exact(&mut body)?;

    // (3) Decode UTF-8 and parse JSON.
    let text = String::from_utf8(body)
        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
    let value = serde_json::from_str::<Value>(&text)
        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
    Ok(Some(value))
}

// ---------------------------------------------------------------------------
// Transport: bind + accept (single client) + socket<->channel bridge
// ---------------------------------------------------------------------------

/// The wire-layer transport: a long-lived listener thread bridging one TCP
/// client to inbound/outbound [`serde_json::Value`] channels.
///
/// Built by [`Transport::start`]. When `listen == None` the constructor opens
/// NOTHING (no bind, no port, no thread — R5.5) and the inbound channel yields
/// nothing. When `listen == Some(addr)` it binds, spawns the listener thread,
/// and accepts exactly one client.
///
/// The owner reads inbound frames from [`Transport::inbound`] and pushes
/// outbound frames via [`Transport::send`]. Calling
/// [`shutdown`](Transport::shutdown) raises the cooperative shutdown flag and
/// drops the outbound sender; dropping the [`Transport`] does the same and then
/// joins the listener thread.
pub(crate) struct Transport {
    /// Inbound frames parsed off the socket (reader → owner). Always present:
    /// when disabled (`listen == None`) the paired `Sender` is dropped inside
    /// [`Transport::start`], so the owner observes an already-closed channel
    /// and never blocks.
    inbound: Receiver<Value>,
    /// Outbound frames to write to the socket (owner → writer). `None` when
    /// disabled and after [`shutdown`](Transport::shutdown); in that state
    /// [`Transport::send`] returns [`DebugError::Disconnected`].
    outbound: Option<Sender<Value>>,
    /// The listener thread join handle (long-lived). `None` when disabled (no
    /// thread was spawned) or once the thread has been joined.
    handle: Option<JoinHandle<()>>,
    /// The bound local address, when enabled. `None` when disabled (R5.5: no
    /// port is opened, so there is no address to report).
    local_addr: Option<SocketAddr>,
    /// Internal cooperative shutdown signal (R2.2/R2.3). A clone is moved into
    /// [`serve`], whose non-blocking accept poll loop checks it once per
    /// [`POLL_INTERVAL`]; setting it (via [`shutdown`](Transport::shutdown) or
    /// [`Drop`]) interrupts a parked accept so the listener thread can wind down
    /// and its port be released. This is the fix for the parked-listener
    /// port-leak that breaks unload→reload. Per [`serve`]'s documentation the
    /// same flag also drives the CONNECTED-state writer poll (interruptible
    /// `recv_timeout` + flag check), after which the reader sub-thread is
    /// joined — so a connected client's socket is released synchronously at
    /// teardown too (R2.5).
    shutdown: Arc<AtomicBool>,
}

impl Transport {
    /// Start the transport for `listen`.
    ///
    /// - `listen == None` → **opens nothing** (no bind, no port, no thread —
    ///   R5.5). Returns a disabled [`Transport`] whose `inbound` is an
    ///   already-closed channel and whose `outbound` is `None`. [`local_addr`]
    ///   is `None`. This is the zero-network-footprint disabled path.
    /// - `listen == Some(addr)` → binds a [`TcpListener`] for `addr` (IPv4 or
    ///   IPv6), records the bound [`local_addr`], spawns the long-lived
    ///   listener thread, and accepts exactly one client.
    ///
    /// # Errors
    ///
    /// Any failure while creating, configuring, binding or listening on the
    /// socket maps to [`DebugError::Bind`].
    ///
    /// I/O only — never touches Lua.
    ///
    /// [`local_addr`]: Transport::local_addr
    pub(crate) fn start(listen: Option<SocketAddr>) -> Result<Self, DebugError> {
        let Some(addr) = listen else {
            // R5.5: disabled → open nothing. Hand back a Transport whose inbound
            // channel is already closed (the Sender is dropped when this scope
            // exits) so the owner never blocks waiting on a port that will
            // never exist.
            let (_dead_tx, inbound) = std::sync::mpsc::channel::<Value>();
            return Ok(Self {
                inbound,
                outbound: None,
                handle: None,
                local_addr: None,
                shutdown: Arc::new(AtomicBool::new(false)),
            });
        };

        // Enabled: build the listener via socket2 so SO_REUSEADDR is set BEFORE
        // bind (R3.1). std's `TcpListener::bind` offers no hook to set socket
        // options before binding, so we drive the raw socket: SO_REUSEADDR →
        // bind → listen → convert to a std `TcpListener`, then set it
        // NON-BLOCKING. Non-blocking is the precondition for `serve()`'s
        // interruptible accept poll loop: a parked accept must yield
        // `WouldBlock` so the loop can check the shutdown flag and wind down
        // (R2.2/R2.3). Any socket2 / nonblocking step failing maps to
        // `DebugError::Bind`.
        //
        // The socket's address family is derived from `addr`: a socket created
        // for the wrong family cannot bind, so a hard-coded IPv4 domain would
        // reject every IPv6 listen address (e.g. `[::1]:4711`).
        let socket = Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP))
            .map_err(DebugError::Bind)?;
        socket.set_reuse_address(true).map_err(DebugError::Bind)?; // SO_REUSEADDR (R3.1/R3.2)
        socket.bind(&addr.into()).map_err(DebugError::Bind)?;
        socket.listen(1).map_err(DebugError::Bind)?; // single-client design → tiny backlog
        let listener = TcpListener::from(socket);
        let local_addr = listener.local_addr().map_err(DebugError::Bind)?;
        // Interruptible accept (R2.2): the poll loop in `serve` relies on this.
        listener.set_nonblocking(true).map_err(DebugError::Bind)?;

        // Channels are the ONLY seam. The transport thread owns the socket ends;
        // the owner keeps the other ends.
        let (in_tx, in_rx) = std::sync::mpsc::channel::<Value>();
        let (out_tx, out_rx) = std::sync::mpsc::channel::<Value>();

        // Internal shutdown signal: a clone is moved into `serve` so the accept
        // poll loop can be interrupted; the `Transport` retains the original to
        // set on `shutdown()` / `Drop`.
        let shutdown = Arc::new(AtomicBool::new(false));
        let serve_shutdown = Arc::clone(&shutdown);

        // Long-lived listener thread (socket I/O only — no Lua).
        let handle = std::thread::spawn(move || {
            serve(listener, in_tx, out_rx, serve_shutdown);
        });

        Ok(Self {
            inbound: in_rx,
            outbound: Some(out_tx),
            handle: Some(handle),
            local_addr: Some(local_addr),
            shutdown,
        })
    }

    /// The bound local address, or `None` when disabled (R5.5: no port opened).
    pub(crate) fn local_addr(&self) -> Option<SocketAddr> {
        self.local_addr
    }

    /// Receiver of inbound frames parsed off the socket (reader → owner).
    ///
    /// When disabled this channel is already closed, so `recv()` returns `Err`
    /// immediately and the owner never blocks on a non-existent connection.
    pub(crate) fn inbound(&self) -> &Receiver<Value> {
        &self.inbound
    }

    /// Send `value` as an outbound frame (owner → writer → socket).
    ///
    /// Returns `Ok(())` if it was queued for the writer. When disabled, after
    /// [`shutdown`](Transport::shutdown), or once the writer's receiving end is
    /// gone, returns [`DebugError::Disconnected`] so the owner can stop the
    /// session cleanly.
    pub(crate) fn send(&self, value: Value) -> Result<(), DebugError> {
        match &self.outbound {
            Some(tx) => tx.send(value).map_err(|_| DebugError::Disconnected),
            None => Err(DebugError::Disconnected),
        }
    }

    /// Signal shutdown: raise the cooperative shutdown flag AND drop the
    /// outbound sender so the listener thread can wind down. Idempotent. Does
    /// NOT join the thread — use [`join`](Transport::join) or [`Drop`] for
    /// that.
    ///
    /// Production teardown goes through [`Drop`] (same signals, plus the join);
    /// this explicit form is exercised by the `#[cfg(test)]` teardown paths.
    #[allow(dead_code)] // test-facing; production uses Drop (kept per design seam)
    pub(crate) fn shutdown(&mut self) {
        // Set the internal shutdown flag so a parked non-blocking accept poll
        // loop in `serve` observes it within one POLL_INTERVAL and returns even
        // when NO client ever connects (R2.2/R2.3). `Release` pairs with the
        // loop's `Acquire` load.
        self.shutdown.store(true, Ordering::Release);
        // Dropping the outbound Sender closes the channel, which the writer
        // side observes as a disconnect.
        self.outbound = None;
    }

    /// Join the listener thread (used by tests / orderly teardown). No-op when
    /// disabled (no thread was spawned) or when the handle was already taken.
    ///
    /// This only waits; it does not signal. Call
    /// [`shutdown`](Transport::shutdown) first, otherwise the thread has not
    /// been told to stop.
    #[allow(dead_code)] // test-facing bounded-teardown helper (used by #[cfg(test)])
    pub(crate) fn join(&mut self) {
        if let Some(handle) = self.handle.take() {
            // A panic in the listener thread is deliberately ignored here:
            // teardown must not propagate it.
            let _ = handle.join();
        }
    }

    /// TEST-ONLY: raise the internal cooperative shutdown flag WITHOUT dropping
    /// the outbound sender.
    ///
    /// Production teardown ([`shutdown`](Transport::shutdown) / [`Drop`]) always
    /// raises the flag AND drops the outbound sender together, so either signal
    /// alone is enough to stop the writer. This helper isolates the FLAG signal
    /// so a test can prove the connected writer loop breaks on the flag even
    /// while the outbound channel is still open (R2.5) — the property that the
    /// pre-2.3 `while out_rx.recv()` loop did not have.
    #[cfg(test)]
    fn signal_shutdown_flag_only(&self) {
        self.shutdown.store(true, Ordering::Release);
    }
}

impl Drop for Transport {
    /// Synchronous teardown (R2.1/R2.2/R2.4/R2.5): signal the listener thread,
    /// then block until it has exited so the bound port is released BEFORE
    /// `drop` returns.
    fn drop(&mut self) {
        // Step 1 — signal. The flag is raised FIRST so a `serve` parked in its
        // interruptible accept poll loop observes it within one POLL_INTERVAL;
        // the outbound sender is then dropped so the writer side sees the
        // channel disconnect. ORDER MATTERS: both signals must precede the
        // join below, otherwise the join would wait on a `serve()` that has
        // not yet been told to stop.
        self.shutdown.store(true, Ordering::Release);
        drop(self.outbound.take());

        // Step 2 — join. Nothing to wait for when the transport was disabled
        // or the handle was already taken (e.g. by the test-only `join()`
        // helper after a prior `shutdown()`), so this never double-joins.
        let Some(listener_thread) = self.handle.take() else {
            return;
        };

        // Block-JOIN instead of detaching: a detached listener would keep the
        // port bound after teardown (the root cause of the unload→reload
        // 10048; design "State Management" invariant). Per `serve`'s
        // documentation every blocking point inside it is an interruptible
        // POLL_INTERVAL poll, which is what keeps this join bounded. A panic
        // inside the listener thread is deliberately ignored.
        let _ = listener_thread.join();
    }
}

/// The listener thread body (long-lived, socket I/O only — never touches Lua).
///
/// Accepts exactly one client (single-client by design), then bridges the
/// socket and the channels on two threads:
/// - **reader** (sub-thread, `JoinHandle` kept): parse `Content-Length` frames
///   off the socket and forward each as a [`serde_json::Value`] to `in_tx`.
///   Returns on EOF / a non-transient I/O error / a malformed frame / when
///   `in_tx`'s receiver is gone, OR when the `shutdown` flag is observed between
///   frames. To make the flag interruption reliable cross-platform the reader's
///   socket carries a [`POLL_INTERVAL`] read timeout: it polls for inbound data
///   at each frame boundary (a timeout yields no data → re-check the flag), then
///   parses a full frame with blocking reads once data is present (so framing is
///   never split by the timeout). A poll that fails with `Interrupted` is
///   retried like a timeout instead of tearing the connection down.
/// - **writer** (this thread): an interruptible poll of `out_rx` — each
///   iteration checks the `shutdown` flag, then `recv_timeout(POLL_INTERVAL)`;
///   it writes each value as a frame and returns when the flag is set, the
///   outbound channel disconnects (the [`Transport`]'s sender dropped), or a
///   socket write fails.
///
/// At teardown the writer breaks (flag OR disconnect), then `shutdown(Both)` is
/// called and the reader is JOINED before `serve` returns — so a connected
/// client's socket is released synchronously (R2.5). The reader join is bounded
/// because the reader winds down within one `POLL_INTERVAL` of the flag even
/// when the peer keeps the connection open (a local socket `shutdown` does NOT
/// reliably cancel an in-flight blocking recv on Windows, so the flag poll — not
/// the `shutdown(Both)` EOF — is the load-bearing interrupt for the connected
/// path; design "Transport" Risks). The flag is only polled BETWEEN frames: a
/// frame whose first bytes have arrived is read to completion without a timeout.
///
/// The listener is NON-BLOCKING: `accept()` is polled on a [`POLL_INTERVAL`]
/// cadence so the `shutdown` flag can interrupt a parked accept (no client
/// connected). Once a client connects the listener is dropped immediately
/// (single-client design → no further accepts, earliest possible port release).
///
/// Every failure path is a plain `return` (nothing is reported to the caller):
/// the function never panics on I/O errors and never blocks indefinitely while
/// waiting for a client or for outbound frames.
fn serve(
    listener: TcpListener,
    in_tx: Sender<Value>,
    out_rx: Receiver<Value>,
    shutdown: Arc<AtomicBool>,
) {
    // Interruptible accept poll loop (R2.2/R2.3): the listener is non-blocking,
    // so a parked accept yields `WouldBlock`; we sleep one POLL_INTERVAL and
    // re-check the shutdown flag. This is what lets a no-client teardown wind the
    // listener thread down (and release the port) instead of leaking it.
    let stream = loop {
        // Shutdown requested while waiting for a client → return with no client.
        // `listener` is still owned by this function here, so the early return
        // drops it and releases the port.
        if shutdown.load(Ordering::Acquire) {
            return;
        }
        match listener.accept() {
            // Got the single client → stop accepting. Drop the listener NOW so
            // the port is released at the earliest point (single-client design).
            Ok((s, _peer)) => {
                // The accepted stream inherits the listener's non-blocking flag
                // (Windows/BSD). The connected-state bridge below is built on a
                // BLOCKING stream (reads bounded by a read timeout, blocking
                // writes); restore blocking so a transient `WouldBlock` on a
                // write or mid-frame read is never mistaken for an error/EOF
                // that would abort the freshly accepted connection.
                if s.set_nonblocking(false).is_err() {
                    return;
                }
                drop(listener);
                break s;
            }
            // No client yet → sleep one interval and re-check the flag.
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                std::thread::sleep(POLL_INTERVAL);
                continue;
            }
            // Any other accept error → safe return (no hang).
            Err(_) => return,
        }
    };
    // Read and write halves share the socket; clone so the reader thread owns one
    // half (BufReader) and the writer the other.
    let write_half = match stream.try_clone() {
        Ok(w) => w,
        Err(_) => return,
    };

    // Give the reader's socket a POLL_INTERVAL read timeout so its frame-boundary
    // poll can observe the shutdown flag within one interval even while the peer
    // keeps the connection open. A local `shutdown(Both)` does NOT reliably cancel
    // an in-flight blocking recv on Windows, so this cooperative poll — mirroring
    // the writer's `recv_timeout(POLL_INTERVAL)` — is what makes the reader join
    // bounded at teardown (R2.5). If setting the timeout fails we cannot guarantee
    // a bounded reader join, so return without spawning a reader we could not
    // interrupt (safe: no client bridge, serve winds down).
    if stream.set_read_timeout(Some(POLL_INTERVAL)).is_err() {
        return;
    }

    // Reader sub-thread: socket → in_tx. Its `JoinHandle` is KEPT (not detached)
    // and joined after the writer loop ends, so the accepted connection is
    // released SYNCHRONOUSLY before `serve` returns (R2.5). `in_tx` is moved into
    // the closure, so it is dropped when the reader returns, which closes the
    // inbound channel — the owner's "reader done" signal.
    let reader_shutdown = Arc::clone(&shutdown);
    let reader_handle = std::thread::spawn(move || {
        let mut reader = BufReader::new(stream);
        loop {
            // (1) Teardown requested → stop between frames (bounded by one
            // POLL_INTERVAL of the read timeout below).
            if reader_shutdown.load(Ordering::Acquire) {
                return;
            }
            // (2) Frame-boundary poll: wait for inbound data with the read
            // timeout. No data this interval (Timeout/WouldBlock) → re-check the
            // flag. `Interrupted` is transient by definition (the read was cut
            // short before any data was transferred) and `fill_buf` does not
            // retry it for us, so it is handled the same way instead of being
            // treated as a dead connection. Clean EOF (empty fill) → peer closed
            // → done. Any other error → safe return.
            match reader.fill_buf() {
                Ok([]) => return, // EOF between frames → peer closed
                Ok(_) => {}       // data buffered → a full frame can be parsed
                Err(e)
                    if matches!(
                        e.kind(),
                        io::ErrorKind::WouldBlock
                            | io::ErrorKind::TimedOut
                            | io::ErrorKind::Interrupted
                    ) =>
                {
                    continue;
                }
                Err(_) => return,
            }
            // (3) Data is present at a frame boundary → parse ONE full frame with
            // BLOCKING reads. The read timeout is cleared for the duration of the
            // parse so a frame split across TCP segments is never mis-read as a
            // truncation error, then restored for the next boundary poll. (The
            // flag is only polled BETWEEN frames; a frame, once started, is read to
            // completion — DAP frames are tiny, so this window is negligible.)
            if reader.get_ref().set_read_timeout(None).is_err() {
                return;
            }
            let parsed = read_frame(&mut reader);
            // Restore the poll timeout BEFORE acting on the result so every path
            // that loops again polls with the timeout in place.
            if reader.get_ref().set_read_timeout(Some(POLL_INTERVAL)).is_err() {
                return;
            }
            match parsed {
                Ok(Some(value)) => {
                    // Owner gone → stop reading (clean shutdown).
                    if in_tx.send(value).is_err() {
                        return;
                    }
                }
                // Clean EOF between frames → peer closed → done.
                Ok(None) => return,
                // Malformed frame / I/O error → safe return (no hang).
                Err(_) => return,
            }
        }
    });

    // Connected-state writer loop runs on THIS (listener) thread: out_rx →
    // socket. It is an INTERRUPTIBLE poll (R2.5) so teardown breaks it on EITHER
    // the internal `shutdown` flag OR the outbound sender being dropped:
    // - flag set (e.g. `shutdown()`/`Drop`, or the flag alone) → FLUSH any
    //   already-queued frames, then break (the outbound owner — the socket bridge
    //   — relies on pending frames being flushed before teardown);
    // - `recv_timeout` `Ok(frame)` → write it (stop on a socket write error);
    // - `Timeout` → re-check the flag (this is the `POLL_INTERVAL` cadence);
    // - `Disconnected` (outbound sender dropped) → done; by then `recv_timeout`
    //   has already handed over every queued frame, so nothing is left to drain.
    let mut writer = write_half;
    'writer: loop {
        if shutdown.load(Ordering::Acquire) {
            // Teardown via the flag: the outbound sender may still be alive with
            // frames already queued (e.g. the bridge enqueued a final flush then
            // dropped the Transport, racing the flag). Drain whatever is currently
            // buffered so those frames still reach the peer, then break. A write
            // error ends the drain early (the peer is gone).
            while let Ok(value) = out_rx.try_recv() {
                if write_frame(&mut writer, &value).is_err() {
                    break;
                }
            }
            break;
        }
        match out_rx.recv_timeout(POLL_INTERVAL) {
            Ok(value) => {
                if write_frame(&mut writer, &value).is_err() {
                    // Socket write failed (peer gone) → stop writing.
                    break 'writer;
                }
            }
            // No outbound frame this interval → loop and re-check the flag.
            Err(std::sync::mpsc::RecvTimeoutError::Timeout) => continue,
            // Outbound sender dropped (shutdown / drop) → `recv_timeout` has
            // already yielded every queued frame above, so nothing remains; done.
            Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break 'writer,
        }
    }

    // Writer loop ended (flag, disconnect, or write error). Shut the socket down
    // (best-effort EOF for the reader on the peer-alive write path), THEN join the
    // reader so the accepted connection is released synchronously before `serve`
    // returns (R2.5). The join is bounded because the reader also winds down within
    // one POLL_INTERVAL via its own `shutdown` flag poll — on Windows a local
    // `shutdown(Both)` does NOT reliably cancel an in-flight blocking recv, so the
    // flag poll (not this shutdown) is the load-bearing interrupt. Both results
    // are ignored on purpose: teardown is best-effort and a reader panic must not
    // propagate out of `serve`.
    let _ = writer.shutdown(std::net::Shutdown::Both);
    let _ = reader_handle.join();
}

#[cfg(test)]
mod tests {
    use super::*;

    use std::io::Cursor;
    use std::net::TcpStream;
    use std::sync::mpsc::RecvTimeoutError;
    use std::time::Duration;

    use serde_json::json;

    /// TEST-ONLY watchdog / read timeout so CI cannot hang: it bounds the
    /// listener-thread joins, the inbound `recv_timeout` waits and the test
    /// client's socket reads in this module.
    /// NOTE(review): the original note states that the production transport has
    /// NO timeout baked in (design doc, "thread model" section, item 4). `serve`
    /// does use a short `POLL_INTERVAL` read timeout for its shutdown poll, so
    /// that statement presumably means "no overall deadline" — confirm.
    const WATCHDOG: Duration = Duration::from_secs(10);

    /// Opens a TCP connection from a test client to `addr` and returns the
    /// connected stream, or the connect error unchanged.
    ///
    /// The production peer is the external DAP client, so this loopback
    /// connector exists for the tests only.
    fn connect_client(addr: SocketAddr) -> io::Result<TcpStream> {
        TcpStream::connect(addr)
    }

    // -----------------------------------------------------------------------
    // Frame codec unit tests (byte-length framing, header robustness, exactness)
    // -----------------------------------------------------------------------

    /// Encoding a JSON value with `write_frame` and decoding the resulting bytes
    /// with `read_frame` gives back an equal value.
    #[test]
    fn frame_round_trip_ascii() {
        let original = json!({
            "seq": 1,
            "type": "request",
            "command": "initialize",
            "arguments": { "adapterID": "pasta" }
        });

        // Encode into an in-memory buffer standing in for the socket.
        let mut wire = Vec::<u8>::new();
        write_frame(&mut wire, &original).expect("write_frame must succeed");

        // Decode straight back out of the same bytes.
        let mut cursor = Cursor::new(wire);
        let decoded = read_frame(&mut cursor)
            .expect("read_frame must succeed")
            .expect("a frame must be present");
        assert_eq!(decoded, original, "round-trip must preserve the JSON value");
    }

    /// `Content-Length` must count UTF-8 BYTES of the body, never characters. A
    /// multi-byte (Japanese) payload makes the two counts differ, so a
    /// char-counting encoder would be caught here.
    #[test]
    fn content_length_is_byte_length_not_char_count() {
        // Sanity-check the fixture: five characters, fifteen UTF-8 bytes.
        let payload = "こんにちは";
        assert_eq!(payload.chars().count(), 5);
        assert_eq!(payload.len(), 15);

        let value = json!({ "text": payload });

        let mut wire = Vec::<u8>::new();
        write_frame(&mut wire, &value).expect("write must succeed");

        // The header that `write_frame` emitted must announce exactly as many
        // bytes as an independent serialization of the same value occupies.
        let frame_text = String::from_utf8(wire.clone()).expect("frame is UTF-8");
        let body_len = serde_json::to_vec(&value).unwrap().len();
        let expected_header = format!("Content-Length: {}\r\n\r\n", body_len);
        let shown_prefix = expected_header.len().min(frame_text.len());
        assert!(
            frame_text.starts_with(&expected_header),
            "header must report the BYTE length ({}), got frame starting: {:?}",
            body_len,
            &frame_text[..shown_prefix]
        );

        // Decoding must neither over- nor under-read the multi-byte body.
        let mut cursor = Cursor::new(wire);
        let decoded = read_frame(&mut cursor)
            .expect("read must succeed")
            .expect("a frame must be present");
        assert_eq!(decoded, value, "multi-byte body must round-trip intact");
        assert_eq!(decoded["text"], json!(payload));
    }

    /// Unknown headers around `Content-Length` are ignored, and the
    /// `Content-Length` name itself is accepted in a different letter case.
    #[test]
    fn read_frame_tolerates_extra_and_reordered_headers() {
        let body = br#"{"ok":true}"#;

        // Header block layout: an unrelated header first, then a lower-cased
        // `content-length`, then another unrelated header, then the blank line.
        let mut frame = b"X-Extra: hello\r\n".to_vec();
        frame.extend_from_slice(format!("content-length: {}\r\n", body.len()).as_bytes());
        frame.extend_from_slice(b"X-Another: world\r\n");
        frame.extend_from_slice(b"\r\n");
        frame.extend_from_slice(body);

        let mut cursor = Cursor::new(frame);
        let decoded = read_frame(&mut cursor)
            .expect("read must succeed with extra/reordered headers")
            .expect("a frame must be present");
        assert_eq!(decoded, json!({ "ok": true }));
    }

    /// Two frames written back to back are decoded one at a time: reading the
    /// first must not swallow any byte of the second, and a read after the last
    /// frame reports a clean end of input.
    #[test]
    fn read_frame_reads_exactly_n_bytes_and_leaves_the_next_frame() {
        let first = json!({ "a": 1 });
        let second = json!({ "b": 2 });

        // Concatenate both frames into one contiguous byte stream.
        let mut stream = Vec::<u8>::new();
        write_frame(&mut stream, &first).unwrap();
        write_frame(&mut stream, &second).unwrap();
        let mut cursor = Cursor::new(stream);

        let got_first = read_frame(&mut cursor).unwrap().expect("first frame");
        assert_eq!(got_first, first, "first frame parsed");

        let got_second = read_frame(&mut cursor).unwrap().expect("second frame");
        assert_eq!(got_second, second, "second frame intact (no over-read of the first)");

        // Nothing is left: the next read is the between-frames EOF case.
        let trailing = read_frame(&mut cursor).unwrap();
        assert!(
            trailing.is_none(),
            "clean EOF between frames yields Ok(None)"
        );
    }

    /// A header block without any `Content-Length` line is reported as
    /// `InvalidData` rather than being accepted silently.
    #[test]
    fn read_frame_missing_content_length_is_error() {
        // Only an unrelated header precedes the blank line and the body.
        let mut frame = b"X-Only: nope\r\n\r\n".to_vec();
        frame.extend_from_slice(br#"{"x":1}"#);

        let mut cursor = Cursor::new(frame);
        let err = read_frame(&mut cursor).expect_err("missing Content-Length must error");
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
    }

    /// When the stream ends before `Content-Length` body bytes have arrived, the
    /// result is `UnexpectedEof` — never a value parsed from the partial body.
    #[test]
    fn read_frame_truncated_body_is_error() {
        // The header announces 20 bytes; only the 7-byte body below follows.
        let mut frame = b"Content-Length: 20\r\n\r\n".to_vec();
        frame.extend_from_slice(br#"{"x":1}"#);

        let mut cursor = Cursor::new(frame);
        let err = read_frame(&mut cursor).expect_err("truncated body must error");
        assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof);
    }

    /// A `Content-Length` whose value is not a number is rejected as
    /// `InvalidData`, with a message that identifies the offending header, so a
    /// malformed client cannot get an unframed body accepted.
    #[test]
    fn read_frame_non_numeric_content_length_is_error() {
        let mut frame = b"Content-Length: abc\r\n\r\n".to_vec();
        frame.extend_from_slice(br#"{"x":1}"#);

        let mut cursor = Cursor::new(frame);
        let err = read_frame(&mut cursor).expect_err("non-numeric Content-Length must error");
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);

        let rendered = err.to_string();
        assert!(
            rendered.contains("invalid Content-Length"),
            "error should name the bad header, got: {err}"
        );
    }

    /// Input that stops INSIDE a header block (a header line was seen, the
    /// terminating blank line was not) yields `UnexpectedEof`. This is the
    /// counterpart of the clean `Ok(None)` returned for EOF BETWEEN frames.
    #[test]
    fn read_frame_eof_mid_header_is_unexpected_eof() {
        // One complete header line, then nothing — no blank line, no body.
        let partial_header = b"Content-Length: 7\r\n".to_vec();

        let mut cursor = Cursor::new(partial_header);
        let err = read_frame(&mut cursor).expect_err("EOF mid-header must error");
        assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof);

        let rendered = err.to_string();
        assert!(
            rendered.contains("header"),
            "error should mention the header block, got: {err}"
        );
    }

    /// A correctly framed body whose bytes are not valid UTF-8 is reported as
    /// `InvalidData`; it must not panic or be decoded lossily.
    #[test]
    fn read_frame_non_utf8_body_is_error() {
        // Four bytes that can never appear in well-formed UTF-8.
        let body = [0xFF_u8, 0xFE, 0xFD, 0xFC];

        let mut frame = format!("Content-Length: {}\r\n\r\n", body.len()).into_bytes();
        frame.extend_from_slice(&body);

        let mut cursor = Cursor::new(frame);
        let err = read_frame(&mut cursor).expect_err("non-UTF-8 body must error");
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
    }

    /// A body that is valid UTF-8 but not JSON is `InvalidData`. The same holds
    /// for `Content-Length: 0`: an empty body is not JSON, so a zero-length
    /// frame is rejected instead of producing a phantom value.
    #[test]
    fn read_frame_invalid_json_body_is_error() {
        // Case 1: well-framed plain text that does not parse as JSON.
        let text_body = b"not json at all";
        let mut text_frame =
            format!("Content-Length: {}\r\n\r\n", text_body.len()).into_bytes();
        text_frame.extend_from_slice(text_body);

        let mut cursor = Cursor::new(text_frame);
        let err = read_frame(&mut cursor).expect_err("invalid JSON body must error");
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);

        // Case 2: a zero-length body. A valid frame is appended after it so more
        // input is available past the empty body; the expected `InvalidData`
        // therefore comes from the empty body itself, not from running out of
        // bytes.
        let mut zero_frame = b"Content-Length: 0\r\n\r\n".to_vec();
        write_frame(&mut zero_frame, &json!({"after": true})).unwrap();

        let mut cursor = Cursor::new(zero_frame);
        let err = read_frame(&mut cursor).expect_err("zero-length body must error");
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
    }

    /// G3 hardening regression: a `Content-Length` above `MAX_CONTENT_LENGTH` is
    /// refused as `InvalidData`, so a client cannot make the reader size a body
    /// buffer from an arbitrarily large header value. The cap is
    /// exclusive-above: a length exactly AT the cap passes the guard, which this
    /// test observes as `UnexpectedEof` because no body bytes follow the header.
    #[test]
    fn read_frame_rejects_oversized_content_length_before_allocating() {
        // Cap + 1 → refused by the guard; the message mentions the exceeded cap.
        let over_cap =
            format!("Content-Length: {}\r\n\r\n", MAX_CONTENT_LENGTH + 1).into_bytes();
        let mut cursor = Cursor::new(over_cap);
        let err = read_frame(&mut cursor).expect_err("oversized Content-Length must error");
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
        let rendered = err.to_string();
        assert!(
            rendered.contains("exceeds"),
            "error should name the exceeded cap, got: {err}"
        );

        // Exactly the cap → the guard stays silent and the read only fails
        // because the announced body is missing.
        let at_cap = format!("Content-Length: {MAX_CONTENT_LENGTH}\r\n\r\n").into_bytes();
        let mut cursor = Cursor::new(at_cap);
        let err = read_frame(&mut cursor).expect_err("missing body must error");
        assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof, "at-limit passes the guard");
    }

    /// Header lines ending in a bare `\n` (without `\r`) still frame correctly,
    /// so a lenient client that omits the carriage return is accepted.
    #[test]
    fn read_frame_tolerates_bare_lf_header_endings() {
        let body = br#"{"lf":true}"#;

        // Header line and blank line both use `\n` only.
        let mut frame = format!("Content-Length: {}\n", body.len()).into_bytes();
        frame.extend_from_slice(b"\n");
        frame.extend_from_slice(body);

        let mut cursor = Cursor::new(frame);
        let decoded = read_frame(&mut cursor)
            .expect("bare-LF headers must parse")
            .expect("a frame must be present");
        assert_eq!(decoded, json!({ "lf": true }));
    }

    // -----------------------------------------------------------------------
    // Disabled path: listen == None opens NO port (R5.5)
    // -----------------------------------------------------------------------

    /// R5.5: starting with `listen == None` binds nothing. The transport exposes
    /// no address, its inbound channel is already closed, and sending reports
    /// `Disconnected`.
    #[test]
    fn disabled_listen_none_opens_no_port() {
        let transport = Transport::start(None).expect("disabled start must succeed");

        // Nothing was bound, so there is no address to report.
        let bound = transport.local_addr();
        assert!(
            bound.is_none(),
            "disabled transport must not bind a port (R5.5)"
        );

        // The inbound side must already be disconnected, so an owner waiting on
        // it returns immediately instead of blocking for a client that can
        // never arrive.
        let probe = transport.inbound().recv_timeout(Duration::from_millis(50));
        if !matches!(probe, Err(RecvTimeoutError::Disconnected)) {
            panic!("disabled inbound must be a closed channel, got {probe:?}");
        }

        // With no socket behind it, a send is a clean `Disconnected`.
        let outcome = transport.send(json!({"x":1}));
        assert!(
            matches!(outcome, Err(DebugError::Disconnected)),
            "disabled send must report Disconnected (no socket)"
        );
    }

    /// On a disabled transport `join` and `shutdown` can be called repeatedly,
    /// in any interleaving, without hanging or panicking, and the transport
    /// still reports no bound address afterwards.
    #[test]
    fn disabled_join_and_shutdown_are_noops() {
        let mut disabled = Transport::start(None).expect("disabled start must succeed");

        // Join before any shutdown: expected to return right away.
        disabled.join();

        // Shutdown twice to pin idempotence.
        disabled.shutdown();
        disabled.shutdown();

        // Join again after shutdown: still expected to return right away.
        disabled.join();

        assert!(disabled.local_addr().is_none());
    }

    /// Once `shutdown` has been called on an ENABLED transport, `send` reports
    /// `Disconnected` (the owner's clean stop signal), and a second `shutdown`
    /// is harmless.
    #[test]
    fn send_after_shutdown_reports_disconnected() {
        let mut transport =
            Transport::start(Some("127.0.0.1:0".parse().unwrap())).expect("bind must succeed");
        assert!(transport.local_addr().is_some(), "enabled transport binds");

        transport.shutdown();
        let outcome = transport.send(json!({"x":1}));
        assert!(
            matches!(outcome, Err(DebugError::Disconnected)),
            "send after shutdown must report Disconnected"
        );
        transport.shutdown(); // second call must be a no-op

        // Best-effort connect-and-drop nudge for a listener that might still be
        // waiting for a client. The result is deliberately ignored: the stream
        // (if any) is dropped immediately, and the connect is allowed to fail
        // when the listener has already wound down.
        let addr = transport.local_addr().unwrap();
        let _ = connect_client(addr);
        join_transport_with_watchdog(transport, WATCHDOG);
    }

    /// R3.1: an enabled transport asked for port 0 on loopback ends up bound to a
    /// concrete OS-assigned port and exposes it through `local_addr`.
    ///
    /// Per the original note the listener is built via the `socket2`
    /// (SO_REUSEADDR) chain; that option is not checked here, only that the bind
    /// path works. Rebind/reuse behavior is covered by other tests.
    #[test]
    fn enabled_socket2_path_binds_and_exposes_local_addr() {
        let mut transport =
            Transport::start(Some("127.0.0.1:0".parse().unwrap())).expect("socket2 bind must succeed");

        let bound = transport
            .local_addr()
            .expect("socket2-built enabled transport must expose its bound addr (R3.1)");
        assert_eq!(bound.ip().to_string(), "127.0.0.1");
        assert_ne!(bound.port(), 0, "OS must assign a concrete port via the socket2 listener");

        // Teardown: shut down, then a best-effort connect-and-drop nudge (result
        // ignored) before the watchdog-bounded join.
        transport.shutdown();
        let _ = connect_client(bound);
        join_transport_with_watchdog(transport, WATCHDOG);
    }

    // -----------------------------------------------------------------------
    // Enabled path: bind 127.0.0.1:0, one client, framed round-trip both ways
    // -----------------------------------------------------------------------

    /// R3.1 + framing: with one test client connected to an OS-assigned loopback
    /// port, a `Content-Length`-framed JSON message travels correctly in BOTH
    /// directions — client → transport (surfacing on `inbound`) and transport →
    /// client (arriving as a well-formed frame).
    #[test]
    fn enabled_round_trips_framed_json_both_directions() {
        let mut transport =
            Transport::start(Some("127.0.0.1:0".parse().unwrap())).expect("bind must succeed");
        let addr = transport
            .local_addr()
            .expect("enabled transport must expose its bound addr (R3.1)");
        assert_eq!(addr.ip().to_string(), "127.0.0.1");
        assert_ne!(addr.port(), 0, "OS must assign a concrete port");

        // The single client: one socket, split into a write handle and a
        // buffered read handle. The read timeout is a test-only hang guard.
        let client = connect_client(addr).expect("client connect must succeed");
        client
            .set_read_timeout(Some(WATCHDOG))
            .expect("TEST-ONLY read timeout");
        let mut client_writer = client.try_clone().expect("clone client");
        let mut client_reader = BufReader::new(client);

        // Direction A — client → transport: the framed request must come out of
        // `inbound` as the same parsed value.
        let request = json!({ "seq": 7, "command": "setBreakpoints" });
        write_frame(&mut client_writer, &request).expect("client write must succeed");

        let inbound_value = transport
            .inbound()
            .recv_timeout(WATCHDOG)
            .expect("transport must deliver the inbound frame (R3.1)");
        assert_eq!(inbound_value, request, "inbound JSON must match what the client sent");

        // Direction B — transport → client: a value handed to `send` must reach
        // the client as one decodable frame.
        let response = json!({ "seq": 7, "type": "response", "success": true });
        transport.send(response.clone()).expect("transport send must succeed");

        let outbound_value = read_frame(&mut client_reader)
            .expect("client read must succeed")
            .expect("client must receive a frame");
        assert_eq!(outbound_value, response, "outbound JSON must match what the transport sent");

        // Teardown: close both client handles (peer EOF), shut the transport
        // down, and join its listener thread under the watchdog.
        drop(client_reader);
        drop(client_writer);
        transport.shutdown();
        join_transport_with_watchdog(transport, WATCHDOG);
    }

    /// The transport moves RAW JSON values and attaches no DAP meaning to them: a
    /// value that is not shaped like a DAP message at all arrives on `inbound`
    /// unchanged. This layer is the byte/JSON wire boundary; DAP semantics live
    /// elsewhere (task 3.2).
    #[test]
    fn transport_carries_raw_values_without_interpreting_dap() {
        let mut transport =
            Transport::start(Some("127.0.0.1:0".parse().unwrap())).expect("bind must succeed");
        let addr = transport.local_addr().unwrap();

        let client = connect_client(addr).expect("connect");
        client.set_read_timeout(Some(WATCHDOG)).unwrap();
        let mut client_writer = client.try_clone().unwrap();

        // A top-level array mixing numbers, strings, nesting, null and non-ASCII
        // text — deliberately nothing like a DAP request/response/event.
        let non_dap = json!([1, "two", { "three": [true, null] }, "日本語"]);
        write_frame(&mut client_writer, &non_dap).expect("client write");

        let inbound_value = transport
            .inbound()
            .recv_timeout(WATCHDOG)
            .expect("inbound frame delivered");
        assert_eq!(inbound_value, non_dap, "raw value must pass through uninterpreted");

        // Teardown: release the primary client handle, then shut down and join
        // under the watchdog.
        drop(client);
        transport.shutdown();
        join_transport_with_watchdog(transport, WATCHDOG);
    }

    /// A client that connects and immediately disconnects (peer EOF) must not
    /// leave the listener thread hanging: the owner sees the inbound channel
    /// close, and the thread joins within the watchdog.
    #[test]
    fn client_disconnect_winds_down_without_hang() {
        let mut transport =
            Transport::start(Some("127.0.0.1:0".parse().unwrap())).expect("bind must succeed");
        let addr = transport.local_addr().unwrap();

        // Connect, then close right away so the transport's reader hits EOF.
        let client = connect_client(addr).expect("connect");
        drop(client);

        // The only acceptable outcome within the watchdog is a disconnected
        // inbound channel; a timeout means the reader never returned, and a
        // delivered value means something was read from a client that sent
        // nothing.
        match transport.inbound().recv_timeout(WATCHDOG) {
            Ok(v) => panic!("unexpected inbound frame after disconnect: {v:?}"),
            Err(RecvTimeoutError::Timeout) => {
                panic!("inbound did not close after client disconnect (hang?)")
            }
            Err(RecvTimeoutError::Disconnected) => {} // reader finished on EOF
        }

        transport.shutdown();
        join_transport_with_watchdog(transport, WATCHDOG);
    }

    /// R2.2/R2.3: when NO client ever connects, the listener thread is waiting
    /// in its accept poll loop. `shutdown()` must raise the internal shutdown
    /// flag so that loop notices it and `serve()` returns, allowing the listener
    /// handle to be joined within the watchdog. This pins that a parked accept
    /// is interruptible — the root cause of the port-leak-on-reload bug.
    ///
    /// With the pre-2.2 code (blocking accept that never consulted the flag) the
    /// join below would hang and the watchdog would fail the test.
    #[test]
    fn no_client_shutdown_unblocks_serve() {
        let mut transport =
            Transport::start(Some("127.0.0.1:0".parse().unwrap())).expect("bind must succeed");
        let bound = transport.local_addr();
        assert!(bound.is_some(), "enabled transport binds");

        // Deliberately no client: the shutdown signal alone has to bring the
        // listener thread down.
        transport.shutdown();
        join_transport_with_watchdog(transport, WATCHDOG);
    }

    /// R2.5: with a client CONNECTED, the connected-state writer loop must honor
    /// the internal `shutdown` FLAG — not only an outbound-sender drop. Here we
    /// raise the flag WHILE keeping the outbound sender alive (and the inbound
    /// receiver held so the reader is not torn down by an owner-gone signal),
    /// then join the listener handle under the watchdog.
    ///
    /// On the pre-2.3 connected writer loop (`while let Ok(_) = out_rx.recv()`,
    /// flag not consulted, outbound still alive) the writer never returns, the
    /// reader is detached, and `serve()` hangs → the watchdog join FAILS. After
    /// 2.3 the writer breaks on the flag, the reader winds down on its own poll
    /// of the same flag (with `writer.shutdown(Both)` as a best-effort EOF on
    /// top), the reader is joined, and `serve()` returns within the watchdog.
    #[test]
    fn connected_writer_honors_shutdown_flag_while_outbound_alive() {
        let mut transport =
            Transport::start(Some("127.0.0.1:0".parse().unwrap())).expect("bind must succeed");
        let addr = transport.local_addr().expect("enabled transport binds");

        // Connect so serve() leaves the accept loop and enters the connected-state
        // bridge (reader sub-thread + writer loop). Keep the client alive so the
        // reader does NOT see a peer EOF on its own — teardown must be driven by
        // the shutdown flag on the transport side, not by the client closing.
        let client = connect_client(addr).expect("client connect must succeed");
        client
            .set_read_timeout(Some(WATCHDOG))
            .expect("TEST-ONLY read timeout");

        // Give serve() a moment to accept and spin up the bridge before signaling.
        // (Bounded; the watchdog join below is the real timeout guard.)
        std::thread::sleep(Duration::from_millis(50));

        // Raise the FLAG ONLY — outbound sender stays alive. This is the signal a
        // pre-2.3 `while out_rx.recv()` writer loop ignores.
        transport.signal_shutdown_flag_only();

        // Join the listener handle under the watchdog WITHOUT dropping the outbound
        // sender (the whole point is that the flag alone tears the writer down).
        // The join runs on a helper thread and reports through a channel so this
        // test can bound the wait with `recv_timeout` instead of blocking forever.
        let handle = transport.handle.take().expect("enabled transport has a handle");
        let (done_tx, done_rx) = std::sync::mpsc::channel();
        std::thread::spawn(move || {
            let _ = done_tx.send(handle.join());
        });
        // Outer `expect`: the join finished within the watchdog. Inner `expect`:
        // the joined listener thread did not panic.
        done_rx
            .recv_timeout(WATCHDOG)
            .expect("connected writer must break on the shutdown FLAG and serve() must return (R2.5)")
            .expect("listener thread must not panic");

        // Keep the client alive until after the join so its EOF could not have been
        // the thing that woke the reader.
        drop(client);
    }

    /// R2.5 (normal path): `shutdown()` — which drops the outbound sender — must
    /// tear the connected bridge down synchronously while a client is still
    /// CONNECTED. The writer breaks on the outbound disconnect and the reader is
    /// joined, so `serve()` returns within the watchdog even though the peer never
    /// closed its end. This is the everyday unload-while-connected case; the
    /// flag-only test isolates the OTHER teardown trigger.
    #[test]
    fn connected_shutdown_tears_down_synchronously_with_client_alive() {
        let mut transport =
            Transport::start(Some("127.0.0.1:0".parse().unwrap())).expect("bind must succeed");
        let bound = transport.local_addr().expect("enabled transport binds");

        // One socket, two extra handles: a write side and a buffered read side.
        // The original `client` handle is held untouched until the very end.
        let client = connect_client(bound).expect("client connect must succeed");
        client
            .set_read_timeout(Some(WATCHDOG))
            .expect("TEST-ONLY read timeout");
        let mut to_server = client.try_clone().expect("clone client");
        let mut from_server = BufReader::new(client.try_clone().expect("clone client"));

        // Client → server: the frame showing up on the inbound channel shows the
        // connected-state bridge is running.
        let ping = json!({ "seq": 1, "command": "ping" });
        write_frame(&mut to_server, &ping).expect("client write");
        let delivered = transport
            .inbound()
            .recv_timeout(WATCHDOG)
            .expect("inbound frame delivered");
        assert_eq!(delivered["command"], json!("ping"));

        // Server → client: completes the round trip through the writer side.
        let reply = json!({ "seq": 1, "type": "response" });
        transport.send(reply).expect("send");
        let received = read_frame(&mut from_server)
            .expect("client read")
            .expect("client frame");
        assert_eq!(received["type"], json!("response"));

        // Tear down through shutdown() while every client handle is still open,
        // then join the listener on a helper thread so a hang becomes a bounded
        // recv timeout instead of a stalled test.
        transport.shutdown();
        let listener = transport.handle.take().expect("enabled transport has a handle");
        let (joined_tx, joined_rx) = std::sync::mpsc::channel();
        std::thread::spawn(move || {
            let _ = joined_tx.send(listener.join());
        });
        let join_result = joined_rx
            .recv_timeout(WATCHDOG)
            .expect("connected shutdown() must tear down synchronously (R2.5)");
        join_result.expect("listener thread must not panic");

        // The client handles outlive the join, so a peer EOF cannot have been
        // what ended serve().
        drop(to_server);
        drop(from_server);
        drop(client);
    }

    /// R2.4 (restartability after teardown, masking-aware): dropping a no-client
    /// `Transport` must SYNCHRONOUSLY join the serve listener thread, so the bound
    /// port is released BEFORE `drop` returns. The proof is a PLAIN
    /// `TcpListener::bind` (NO socket2, NO `SO_REUSEADDR`) on the very same port
    /// immediately after the drop: a plain rebind has no Windows `SO_REUSEADDR`
    /// hijack semantics, so it can ONLY succeed if the previous listener is truly
    /// gone (design Security Considerations — `SO_REUSEADDR` must not be allowed
    /// to MASK a still-held leaked listener).
    ///
    /// On the pre-2.4 code (`Drop` sets the flag and drops outbound but does NOT
    /// join the serve handle) `drop` returns while `serve()` may still hold the
    /// listener, so the immediate plain rebind fails with `AddrInUse`
    /// (`os error 10048`). After 2.4 `Drop` block-joins the interruptible
    /// `serve()`, so the rebind deterministically succeeds.
    #[test]
    fn drop_synchronously_frees_port_for_plain_rebind() {
        use std::net::TcpListener as PlainTcpListener;

        // Bind an OS-assigned loopback port and remember which one was chosen.
        let transport =
            Transport::start(Some("127.0.0.1:0".parse().unwrap())).expect("bind must succeed");
        let bound = transport
            .local_addr()
            .expect("enabled transport must expose its bound addr");
        let port = bound.port();
        assert_ne!(port, 0, "OS must assign a concrete port");

        // With no client, serve() is still holding the listener on `port`. The
        // Transport is dropped DIRECTLY — deliberately not through the watchdog
        // helper, which would take and join the handle itself — so that Drop
        // alone is responsible for setting the flag and joining serve().
        drop(transport);

        // Rebind the SAME port right away with a plain std listener. No socket2
        // and no SO_REUSEADDR are involved, so success cannot be the result of
        // the Windows SO_REUSEADDR hijack masking a still-held listener (design
        // Security Considerations / R2.4).
        let rebind = PlainTcpListener::bind(("127.0.0.1", port));
        let port_was_free = rebind.is_ok();
        assert!(
            port_was_free,
            "plain rebind of port {port} immediately after drop must succeed \
             (synchronous serve join freed the port); got: {:?}",
            rebind.err()
        );
    }

    /// R1.3/R2.4 (robustness, Transport unit level): a REPEATED teardown→rebind
    /// cycle on the SAME fixed port must succeed every time. The production reload
    /// bug is a SECOND (and Nth) bind onto the SAME port failing because the prior
    /// listener thread was detached and still held it. A single rebind
    /// (`drop_synchronously_frees_port_for_plain_rebind`) proves one cycle, but the
    /// reload contract (R1.3: "at least two times, at roughly 15-second
    /// intervals") is about MULTIPLE cycles holding deterministically. This test
    /// drives the no-client teardown path through 3 `Transport::start(Some(P))` →
    /// teardown cycles on one OS-assigned port and asserts that each successive
    /// `start` on the SAME port succeeds.
    ///
    /// Mechanism under test: the synchronous join (R2.4) makes each rebind
    /// deterministic — every cycle's listener thread is joined (port released)
    /// before the next `start`, so it cannot race a still-held port. On a
    /// detached/non-joining teardown the 2nd `start` would fail with `AddrInUse`
    /// (`os error 10048`); here `start` returning `DebugError::Bind` is the failure
    /// surface. Each cycle is bounded by the WATCHDOG so a hung teardown fails fast
    /// instead of stalling CI. The rebind goes through the `socket2`-built
    /// `Transport::start` (not a plain bind) on purpose — it exercises the REAL
    /// production rebind path repeatedly; the masking-aware plain-bind detector is
    /// the separate `drop_synchronously_frees_port_for_plain_rebind` test.
    #[test]
    fn repeated_teardown_rebind_same_port_succeeds() {
        // Let the OS pick a loopback port through a short-lived probe transport,
        // then drop the probe so that exact port can be reused by every cycle.
        let port = {
            let probe = Transport::start(Some("127.0.0.1:0".parse().unwrap()))
                .expect("throwaway bind must succeed");
            let chosen = probe
                .local_addr()
                .expect("enabled transport must expose its bound addr")
                .port();
            assert_ne!(chosen, 0, "OS must assign a concrete port");
            drop(probe); // the port has to be free again before the first cycle
            chosen
        };
        let fixed = SocketAddr::from(([127, 0, 0, 1], port));

        // Three rounds of start(SAME port) → no client → bounded teardown. A
        // listener left over from the previous round would surface here as a
        // failed start (DebugError::Bind, i.e. AddrInUse / 10048).
        for cycle in 0..3 {
            let mut transport = match Transport::start(Some(fixed)) {
                Ok(started) => started,
                Err(e) => panic!(
                    "cycle {cycle}: rebind of fixed port {port} must succeed \
                     (prior cycle's listener was synchronously joined); got {e:?}"
                ),
            };
            assert_eq!(
                transport.local_addr().map(|a| a.port()),
                Some(port),
                "cycle {cycle}: must rebind the SAME fixed port",
            );

            // Manual teardown: take the join handle, raise the shutdown flag and
            // release the outbound sender. The join itself runs on a helper
            // thread so a listener that refuses to stop turns into a bounded
            // recv timeout rather than a CI stall.
            let listener = transport.handle.take();
            transport.shutdown.store(true, Ordering::Release);
            transport.outbound = None;
            if let Some(listener) = listener {
                let (joined_tx, joined_rx) = std::sync::mpsc::channel();
                std::thread::spawn(move || {
                    let _ = joined_tx.send(listener.join());
                });
                match joined_rx.recv_timeout(WATCHDOG) {
                    Ok(Ok(_)) => {}
                    Ok(Err(_)) => panic!("cycle {cycle}: listener thread must not panic"),
                    Err(_) => panic!(
                        "cycle {cycle}: listener must wind down within the watchdog \
                         (no hang) so the port is freed for the next rebind"
                    ),
                }
            }
            // `transport` goes out of scope here with its handle already taken,
            // so there is nothing left for a second join.
        }
    }

    // -----------------------------------------------------------------------
    // Test helpers (TEST-ONLY bounded join; production has no timeout)
    // -----------------------------------------------------------------------

    /// Shut `transport` down and wait for its listener thread, giving up after
    /// `timeout`.
    ///
    /// TEST-ONLY: the join runs on a helper thread and its result is awaited with
    /// `recv_timeout`, so a regression that hangs the listener fails the calling
    /// test instead of blocking the whole suite.
    ///
    /// # Panics
    ///
    /// Panics if the listener thread does not finish within `timeout`, or if the
    /// listener thread itself panicked. A transport without a handle is a no-op.
    fn join_transport_with_watchdog(mut transport: Transport, timeout: Duration) {
        // Raise the shutdown flag so a `serve` waiting in the accept poll loop
        // (no client connected) winds down, and release the outbound sender so
        // the writer unblocks.
        transport.shutdown.store(true, Ordering::Release);
        transport.outbound = None;

        let listener = match transport.handle.take() {
            Some(listener) => listener,
            None => return,
        };

        let (joined_tx, joined_rx) = std::sync::mpsc::channel();
        std::thread::spawn(move || {
            let _ = joined_tx.send(listener.join());
        });
        let join_result = joined_rx
            .recv_timeout(timeout)
            .expect("listener thread must wind down within the watchdog (no hang)");
        join_result.expect("listener thread must not panic");
    }
}