magnetar-runtime-moonpool 1.0.0

moonpool runtime engine for magnetar — deterministic-sim friendly. Custom rustls-over-bytepipe TLS adapter. No channels.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
2087
2088
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
2104
2105
2106
2107
2108
2109
2110
2111
2112
2113
2114
2115
2116
2117
2118
2119
2120
2121
2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166
2167
2168
2169
2170
2171
2172
2173
2174
2175
2176
2177
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
2194
2195
2196
2197
2198
2199
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211
2212
2213
2214
2215
2216
// SPDX-License-Identifier: Apache-2.0

//! Producer façade for the moonpool engine.
//!
//! Mirrors [`magnetar_runtime_tokio::Producer`] but is generic over
//! [`moonpool_core::Providers`] so the same façade runs on production Tokio
//! sockets and on a `moonpool-sim` deterministic substrate.
//!
//! ## Public surface
//!
//! - [`Client::open_producer`] — `CommandProducer` round-trip.
//! - [`Producer::send`] / [`Producer::flush`] / [`Producer::close`].
//! - Introspection: [`Producer::topic`], [`Producer::name`], [`Producer::is_closed`],
//!   [`Producer::pending_count`], [`Producer::last_sequence_id`], [`Producer::stats`].
//!
//! ## No-channels invariant
//!
//! Futures here follow the same pattern as the tokio engine: park on the
//! sans-io [`Connection`]'s `Waker` slab via
//! [`Connection::register_waker`], plus a single
//! [`tokio::sync::Notify`] (`driver_waker`) used as a wake-up signal across
//! the protocol-level pending queue. No `mpsc` / `oneshot` / `watch` /
//! `broadcast` channels of any flavour. See `GUIDELINES.md`
//! §"No-channels rule".
//!
//! ## Compression
//!
//! The user-facing [`Producer`] stores the [`CompressionKind`] it was opened
//! with so the broker sees the same compression metadata the state machine
//! stamps. Compression is not yet wired on the moonpool engine; calling
//! [`Producer::send`] with anything other than [`CompressionKind::None`]
//! refuses synchronously with [`ClientError::Other`]. The tokio engine's
//! ordering (compression → encryption → state machine) is mirrored so the
//! codec swap will be a drop-in once it lands.
//!
//! [`Connection`]: magnetar_proto::Connection
//! [`Connection::register_waker`]: magnetar_proto::Connection::register_waker

use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use magnetar_proto::producer::OutgoingMessage;
use magnetar_proto::types::CompressionKind;
use magnetar_proto::{
    ConnectionEvent, CreateProducerRequest, MessageId, OpOutcome, PendingOpKey, ProducerHandle,
    ProducerStats, SequenceId,
};
use moonpool_core::Providers;

use crate::ConnectionShared;
use crate::client::{Client, ClientError};
use crate::crypto::MessageEncryptor;

/// User-facing producer handle, moonpool engine flavour.
///
/// Holds an [`Arc<ConnectionShared>`] plus a [`magnetar_proto::ProducerHandle`]
/// — cheap to clone (Arc bump). Caller-facing futures park on the sans-io
/// state machine's `Waker` slab, never on channels.
///
/// # Lock-ordering (ADR-0038)
///
/// Identity reads (topic, access mode, handle) go through `slot.identity`
/// without locking. State-machine reads take only the per-slot mutex via
/// `slot.state.lock()`. Operations that mutate the connection-wide state
/// (`send`, `flush`, `close`, …) take `shared.inner.lock()`. Acquisition
/// order: **global → per-slot, never the reverse**.
pub struct Producer<P: Providers> {
    pub(crate) shared: Arc<ConnectionShared>,
    pub(crate) handle: ProducerHandle,
    /// Direct handle to this producer's per-slot state, cloned from the
    /// Connection's registry at create time.
    pub(crate) slot: Arc<magnetar_proto::ProducerSlot>,
    pub(crate) compression: CompressionKind,
    /// Optional PIP-4 encryption hook. When present, the producer encrypts every
    /// outbound payload after compression but before handing it to the sans-io
    /// layer. 1:1 mirror of `magnetar_runtime_tokio::Producer::encryptor`.
    pub(crate) encryptor: Option<Arc<dyn MessageEncryptor>>,
    /// Last-clone close guard. `Producer` is cheap-clone, so the broker-side
    /// best-effort close must fire exactly once — when the **last** clone
    /// drops. See [`ProducerCloseGuard`]. 1:1 mirror of
    /// `magnetar_runtime_tokio::Producer::close_guard`.
    pub(crate) close_guard: Arc<ProducerCloseGuard>,
    /// Held only so `Producer` is generic over `P` without leaking the
    /// driver-handle type parameter. The driver itself has already consumed
    /// the providers.
    pub(crate) _providers: std::marker::PhantomData<fn() -> P>,
}

/// RAII guard arming a best-effort `CommandCloseProducer` on last-clone drop
/// (ADR-0057).
///
/// Every [`Producer`] clone shares one guard behind an `Arc`; the `Drop`
/// below therefore runs exactly once, when the last clone goes away.
/// Without it, dropping a producer without an explicit [`Producer::close`]
/// leaks the broker-side registration for as long as the shared TCP
/// connection stays open — recreating a producer with the same
/// user-provided name then fails forever with `NamingException`
/// (broker error code 16).
///
/// The explicit-close path stays the reliable one: [`Producer::close`]
/// awaits the broker ack. This guard fires
/// [`magnetar_proto::Connection::close_producer_forget`] — encode the frame
/// and wake the driver, never await. The proto layer consumes the broker
/// ack in-place (no orphaned `OpOutcome` entry) and surfaces a rejection as
/// a `warn!`.
///
/// Dedup is best-effort, not a hard invariant: the slot's `closed` flag
/// (set synchronously by `Connection::close_producer`) dedups a *preceding
/// completed* client-initiated close as observed here. It does NOT cover
/// broker-initiated detach — `handle_close_producer` deliberately keeps
/// `closed = false` so `rebuild_producers` can re-attach on PIP-188
/// migration / failover — and the check+act below is non-atomic against a
/// concurrent `close()` on another clone. Both residual cases emit one
/// redundant `CloseProducer` frame, which the broker tolerates. 1:1 mirror
/// of `magnetar_runtime_tokio::producer::ProducerCloseGuard`.
#[derive(Debug)]
pub(crate) struct ProducerCloseGuard {
    shared: Arc<ConnectionShared>,
    handle: ProducerHandle,
    slot: Arc<magnetar_proto::ProducerSlot>,
}

impl Drop for ProducerCloseGuard {
    fn drop(&mut self) {
        // ADR-0038 lock order: the per-slot probe drops its guard before the
        // global Connection mutex is taken (sequential, never nested).
        let already_closed = self.slot.state.lock().closed;
        if already_closed {
            return;
        }
        {
            let mut conn = self.shared.inner.lock();
            let _ = conn.close_producer_forget(self.handle);
        }
        self.shared.driver_waker.notify_one();
        tracing::debug!(
            topic = %self.slot.identity.topic,
            handle = ?self.handle,
            "producer dropped without explicit close — best-effort CloseProducer enqueued"
        );
    }
}

impl<P: Providers> Clone for Producer<P> {
    fn clone(&self) -> Self {
        Self {
            shared: self.shared.clone(),
            handle: self.handle,
            slot: self.slot.clone(),
            compression: self.compression,
            encryptor: self.encryptor.clone(),
            close_guard: self.close_guard.clone(),
            _providers: std::marker::PhantomData,
        }
    }
}

impl<P: Providers> std::fmt::Debug for Producer<P> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Producer")
            .field("handle", &self.handle)
            .field("compression", &self.compression)
            .finish_non_exhaustive()
    }
}

impl<P: Providers> Producer<P> {
    /// Assemble a producer handle and arm its last-clone close guard.
    ///
    /// Single construction point — keeps the [`ProducerCloseGuard`] wiring
    /// in one place for every producer the engine hands out. 1:1 mirror of
    /// `magnetar_runtime_tokio::Producer::assemble`.
    pub(crate) fn assemble(
        shared: Arc<ConnectionShared>,
        handle: ProducerHandle,
        slot: Arc<magnetar_proto::ProducerSlot>,
        compression: CompressionKind,
        encryptor: Option<Arc<dyn MessageEncryptor>>,
    ) -> Self {
        let close_guard = Arc::new(ProducerCloseGuard {
            shared: shared.clone(),
            handle,
            slot: slot.clone(),
        });
        Self {
            shared,
            handle,
            slot,
            compression,
            encryptor,
            close_guard,
            _providers: std::marker::PhantomData,
        }
    }

    /// The protocol-layer producer handle this façade wraps.
    #[must_use]
    pub fn handle(&self) -> ProducerHandle {
        self.handle
    }

    /// Compression codec this producer was opened with. Mirrors Java
    /// `ProducerImpl#conf.getCompressionType()`. Returns
    /// [`CompressionKind::None`] when the producer was opened without
    /// explicit compression.
    #[must_use]
    pub fn compression(&self) -> CompressionKind {
        self.compression
    }

    /// Topic name this producer is bound to. Returns an empty string if the
    /// producer is no longer registered (closed).
    ///
    /// Identity-only read — takes no lock (ADR-0038).
    #[must_use]
    pub fn topic(&self) -> String {
        self.slot.identity.topic.clone()
    }

    /// Broker-assigned producer name. Returns an empty string until the
    /// broker assigns one (typically right after the `ProducerSuccess`
    /// round-trip) or if the producer is no longer registered.
    ///
    /// Per-slot read — does NOT take the global Connection mutex.
    #[must_use]
    pub fn name(&self) -> String {
        self.slot.state.lock().name.clone().unwrap_or_default()
    }

    /// `true` if this producer has been closed (locally via
    /// [`Self::close`] or remotely via a broker `CloseProducer`). Mirrors
    /// Java `ProducerImpl#getState() == CLOSED`.
    ///
    /// Per-slot read — does NOT take the global Connection mutex.
    #[must_use]
    pub fn is_closed(&self) -> bool {
        self.slot.state.lock().closed
    }

    /// `true` while the broker connection is up. Mirrors Java
    /// `Producer#isConnected`.
    #[must_use]
    pub fn is_connected(&self) -> bool {
        self.shared.inner.lock().is_connected()
    }

    /// Wall-clock timestamp of the last broker disconnection
    /// observed by this connection, or `None` if no disconnect has
    /// happened yet. Mirrors Java
    /// `Producer#getLastDisconnectedTimestamp`.
    #[must_use]
    pub fn last_disconnected_timestamp(&self) -> Option<std::time::SystemTime> {
        self.shared.inner.lock().last_disconnected_timestamp()
    }

    /// Number of in-flight sends (queued and not yet acked by the broker).
    /// Mirrors the un-batched view of Java
    /// `ProducerStats#getPendingQueueSize`.
    ///
    /// Per-slot read — does NOT take the global Connection mutex.
    #[must_use]
    pub fn pending_count(&self) -> usize {
        self.slot.state.lock().pending.len()
    }

    /// Last sequence id this client has pushed onto the wire. Returns `-1`
    /// if the producer has never sent. Mirrors
    /// `org.apache.pulsar.client.api.Producer#getLastSequenceId`.
    ///
    /// Per-slot read — does NOT take the global Connection mutex.
    #[must_use]
    pub fn last_sequence_id(&self) -> i64 {
        self.slot.state.lock().last_sequence_id_pushed
    }

    /// Last sequence id the broker has acknowledged via
    /// `CommandSendReceipt`. Returns `-1` if no sends have been acked
    /// yet. Mirrors `org.apache.pulsar.client.api.Producer#getLastSequenceIdPublished`.
    /// Useful for resume-from-checkpoint flows.
    ///
    /// Per-slot read — does NOT take the global Connection mutex.
    #[must_use]
    pub fn last_sequence_id_published(&self) -> i64 {
        self.slot.state.lock().last_sequence_id_published
    }

    /// Number of messages currently buffered in the batch container,
    /// waiting for the next flush cycle. Returns `0` when batching is
    /// disabled or the batch is empty. Mirrors the tokio runtime's
    /// `Producer::batch_len`.
    ///
    /// Per-slot read — does NOT take the global Connection mutex.
    #[must_use]
    pub fn batch_len(&self) -> usize {
        self.slot.state.lock().batch.len()
    }

    /// Sum of payload bytes currently buffered in the batch container.
    /// Mirrors the tokio runtime's `Producer::batch_bytes`.
    ///
    /// Per-slot read — does NOT take the global Connection mutex.
    #[must_use]
    pub fn batch_bytes(&self) -> usize {
        self.slot.state.lock().batch.current_size_bytes
    }

    /// Snapshot of this producer's cumulative counters. Mirrors Java
    /// `org.apache.pulsar.client.api.Producer#getStats`. Returns a zeroed
    /// snapshot if the producer handle is no longer registered (closed).
    ///
    /// Per-slot read — does NOT take the global Connection mutex.
    #[must_use]
    pub fn stats(&self) -> ProducerStats {
        self.slot.state.lock().stats()
    }

    /// Enqueue a send. The returned future resolves when the broker
    /// acknowledges the publish (a `CommandSendReceipt`) or rejects it (a
    /// `CommandSendError`).
    ///
    /// # Errors
    ///
    /// - [`ClientError::Other`] if compression is requested but no codec is wired into the moonpool
    ///   engine yet.
    /// - [`ClientError::Other`] wrapping a [`magnetar_proto::ProtocolError`] if the state machine
    ///   rejects the send (e.g. closed producer, unknown handle).
    /// - [`ClientError::Broker`] if the broker subsequently rejects the publish.
    pub fn send(&self, mut msg: OutgoingMessage) -> SendFut {
        // ADR-0011 — invariant #3. Read the engine-provided wall clock
        // (moonpool TimeProvider-driven atomic) instead of the host
        // SystemTime so deterministic-simulation runs reproduce
        // bit-for-bit on the `publish_time` field. Mirrors the tokio
        // engine's snapshot semantics — same callsite, different
        // backing clock.
        let publish_time_ms = self.shared.now_wall_clock_ms();

        // The moonpool engine does not yet ship a compression codec stack.
        // The state machine still stamps `metadata.compression` based on the
        // configured `CompressionKind`; until the runtime codec lands, we
        // refuse non-`None` codecs so the broker never sees mis-labelled
        // bytes. Mirrors the tokio engine's ordering — compression goes
        // first, before the sans-io enqueue.
        if self.compression != CompressionKind::None {
            // Pre-enqueue rejection — expected anomaly surfaced as `Err` to
            // the caller, so `debug!` per ADR-0054 §2.1.
            tracing::debug!(
                compression = ?self.compression,
                "send rejected: compression not yet wired on the moonpool engine"
            );
            return SendFut {
                shared: self.shared.clone(),
                handle: self.handle,
                state: SendState::Failed {
                    error: Some(ClientError::Other(format!(
                        "moonpool engine: compression {:?} not yet wired; \
                         use CompressionKind::None for now",
                        self.compression
                    ))),
                },
                reserved_bytes: 0,
            };
        }

        // Encrypt the (post-compression) payload if a PIP-4 encryptor is wired. 1:1 mirror of
        // the tokio engine's ordering (`magnetar_runtime_tokio::Producer::send`,
        // `ProducerImpl.java:986-1003`): compression first (refused above on moonpool until
        // codecs land), encryption second, so the broker sees ciphertext and the consumer
        // reverses the order on receive. Encryption failure surfaces synchronously as a
        // `SendFut` that resolves to `ClientError::Other` on the first poll.
        if let Some(encryptor) = self.encryptor.as_ref() {
            match encryptor.encrypt(&msg.payload, &mut msg.metadata) {
                Ok(ciphertext) => msg.payload = ciphertext,
                Err(err) => {
                    // Pre-enqueue rejection — `debug!` per ADR-0054 §2.1.
                    // Payload and key material are never logged.
                    tracing::debug!(error = %err, "send rejected: encryption failed");
                    return SendFut {
                        shared: self.shared.clone(),
                        handle: self.handle,
                        state: SendState::Failed {
                            error: Some(ClientError::Other(format!("encrypt: {err}"))),
                        },
                        reserved_bytes: 0,
                    };
                }
            }
        }

        // Reserve memory against the configured global budget BEFORE
        // handing the payload to the sans-io state machine. Mirrors Java's
        // `MemoryLimitController.reserveMemory(...)`. Two policies (Java
        // parity, see ADR-0017 and ADR-0020):
        //  - `FailImmediately`: try the CAS once; an overflow surfaces synchronously as
        //    `EngineError::MemoryLimitExceeded` wrapped in `ClientError::Engine`.
        //  - `ProducerBlock`: park the send on the runtime's Waker slab until enough budget frees
        //    up; the `Reserving` variant of `SendState` re-attempts the CAS on every poll.
        // `try_reserve_memory` is a no-op when `memory_limit_bytes = 0`
        // (the default). The fairness contract under
        // `moonpool_core::SimProviders` is documented in ADR-0022.
        let reserved_bytes = msg.payload.len() as u64;
        match self.shared.memory_limit_policy {
            magnetar_proto::MemoryLimitPolicy::FailImmediately => {
                if let Err(err) = self.shared.try_reserve_memory(reserved_bytes) {
                    // Caller-visible rejection whose rate scales with send
                    // throughput under overload — `debug!` per ADR-0054
                    // §2.1 (never `warn!` on a per-message path).
                    tracing::debug!(
                        payload_len = reserved_bytes,
                        "send rejected: memory limit exceeded"
                    );
                    return SendFut {
                        shared: self.shared.clone(),
                        handle: self.handle,
                        state: SendState::Failed {
                            error: Some(ClientError::Engine(err)),
                        },
                        reserved_bytes: 0,
                    };
                }
                self.queue_send(msg, publish_time_ms, reserved_bytes)
            }
            magnetar_proto::MemoryLimitPolicy::ProducerBlock => {
                // Fast path: budget has room right now. The slow path
                // inside `Reserving` takes over otherwise; we don't
                // synchronously park here so callers that never `.await`
                // (e.g. `Pin::poll` from a custom executor) still get a
                // future they can drive.
                if self.shared.try_reserve_memory(reserved_bytes).is_ok() {
                    return self.queue_send(msg, publish_time_ms, reserved_bytes);
                }
                SendFut {
                    shared: self.shared.clone(),
                    handle: self.handle,
                    state: SendState::Reserving {
                        msg: Some(Box::new(msg)),
                        publish_time_ms,
                        bytes: reserved_bytes,
                        slab_key: None,
                    },
                    // `Reserving` owns the reservation lifecycle itself:
                    // it only transitions to `Pending` AFTER a successful
                    // CAS, at which point it copies `bytes` into the
                    // outer `reserved_bytes`. Until then there is no
                    // reservation outstanding.
                    reserved_bytes: 0,
                }
            }
        }
    }

    /// PIP-180 / ADR-0033: replicator-style send that propagates a
    /// source-topic `MessageId` on the wire (`CommandSend.message_id`).
    /// 1:1 mirror of `magnetar_runtime_tokio::Producer::send_with_source_message_id`.
    /// Used by producers writing to a shadow topic to preserve the
    /// source-topic id chain.
    ///
    /// Bypasses batching by design — mirrors Java
    /// `org.apache.pulsar.broker.service.persistent.Replicator`. The broker
    /// echoes the asserted source id back on the resulting
    /// `CommandSendReceipt`, so the returned [`SendFut`] resolves to a
    /// [`MessageId`](magnetar_proto::MessageId) structurally equal to
    /// `source_msg_id`.
    pub fn send_with_source_message_id(
        &self,
        source_msg_id: magnetar_proto::MessageId,
        payload: impl Into<bytes::Bytes>,
        metadata: magnetar_proto::pb::MessageMetadata,
    ) -> SendFut {
        let payload = payload.into();
        let uncompressed_size = u32::try_from(payload.len()).unwrap_or(u32::MAX);
        self.send(OutgoingMessage {
            payload,
            metadata,
            uncompressed_size,
            num_messages: 1,
            txn_id: None,
            source_message_id: Some(source_msg_id),
        })
    }

    /// Hand the (compressed/encrypted) message to the sans-io state
    /// machine. Assumes the `reserved_bytes` reservation has already been
    /// taken; releases it on synchronous failure so the budget reflects
    /// only actually-in-flight bytes. Mirrors the tokio engine's helper of
    /// the same name.
    ///
    /// ADR-0038 Phase 3 hot path: takes only the per-slot mutex via
    /// [`magnetar_proto::ProducerSlot::queue_send`] — does NOT acquire the
    /// global Connection mutex. The moonpool driver merges per-slot staged
    /// frames into the connection-wide outbound buffer on its next tick.
    fn queue_send(
        &self,
        msg: OutgoingMessage,
        publish_time_ms: u64,
        reserved_bytes: u64,
    ) -> SendFut {
        // Precondition (ADR-0038): the per-slot Arc this `Producer` was built
        // with must denote the same producer as `self.handle`. The hot path
        // routes the send through `self.slot` (per-slot lock only) while the
        // eventual `SendFut` correlates the receipt by `self.handle`; a
        // mismatch would silently queue against the wrong slot. Identity read
        // takes no lock, so this cannot self-deadlock. 1:1 with the tokio
        // engine's `queue_send` precondition.
        debug_assert_eq!(
            self.slot.identity.handle, self.handle,
            "producer slot/handle mismatch: slot is for {:?} but handle is {:?}",
            self.slot.identity.handle, self.handle,
        );

        // ADR-0011 — invariant #3. The proto state machine's monotonic
        // input flows through the engine-supplied clock provider: under
        // production TokioProviders it tracks the host clock; under
        // SimProviders it advances only as the simulator ticks. Mirrors
        // the tokio engine which captures `Instant::now` directly.
        let now = self.shared.now_instant();
        let result = self.slot.queue_send(msg, publish_time_ms, now);

        // Wake the driver so it can drain the freshly-queued frame.
        self.shared.driver_waker.notify_one();

        match result {
            Ok(seq) => {
                // NOTE: no cross-lock postcondition assert here (1:1 with the
                // tokio engine). The returned seq is computed under the
                // per-slot guard INSIDE `ProducerSlot::queue_send`; re-locking
                // afterwards to compare against `last_sequence_id_pushed`
                // raced the driver's reset/replay machinery during supervised
                // reconnects and panicked debug builds on a legal schedule.
                // The contract is pinned in the proto unit tests, under a
                // single guard.
                // ADR-0054 hot-path record: no lock is held here (the
                // per-slot guard inside `ProducerSlot::queue_send` has been
                // released), two integer fields, and the disabled-level cost
                // is a cached callsite check (ADR-0038 stays intact).
                tracing::trace!(
                    sequence_id = seq.0,
                    payload_len = reserved_bytes,
                    "send queued"
                );
                SendFut {
                    shared: self.shared.clone(),
                    handle: self.handle,
                    state: SendState::Pending { sequence_id: seq },
                    reserved_bytes,
                }
            }
            Err(err) => {
                // The state machine rejected the send (e.g. producer not
                // yet open). Release the reservation so the budget
                // reflects only actually-in-flight bytes.
                self.shared.release_memory(reserved_bytes);
                // Expected anomaly surfaced as `Err` to the caller —
                // `debug!` per ADR-0054 §2.1.
                tracing::debug!(error = %err, "send rejected by producer state machine");
                // ADR-0059: `fail_all_pending` flips the
                // per-slot `closed` flag on a terminal drop, so a send issued
                // AFTER a plain connection went terminal fast-fails here. The
                // proto-layer `ProducerSlot::queue_send` collapses the inner
                // `ProducerError::Closed` into an opaque
                // `ProtocolError::InvariantViolation`, so we cannot pattern
                // match the cause — but the `no_driver` latch IS the cause
                // signal: it is set ONLY on a terminal drop (the same event
                // that closed the slot). When it is set, map the rejection to
                // `PeerClosed` (the terminal-outcome category); otherwise the
                // rejection is a genuine protocol-state error and keeps the
                // generic mapping. A user-initiated `producer.close()` consumes
                // the `Producer`, so no further `send` reaches this arm on that
                // path. 1:1 with the tokio engine.
                let error = if self
                    .shared
                    .no_driver
                    .load(std::sync::atomic::Ordering::SeqCst)
                {
                    ClientError::PeerClosed
                } else {
                    ClientError::Other(format!("send: {err}"))
                };
                SendFut {
                    shared: self.shared.clone(),
                    handle: self.handle,
                    state: SendState::Failed { error: Some(error) },
                    reserved_bytes: 0,
                }
            }
        }
    }

    /// Flush this producer: force any pending batch to flush and wait for
    /// every in-flight send to be acknowledged by the broker. Idempotent —
    /// calling `flush()` on a quiescent producer returns immediately.
    ///
    /// Mirrors `org.apache.pulsar.client.api.Producer#flushAsync`. Use
    /// before `close()` if you want at-least-once semantics on the trailing
    /// sends.
    ///
    /// # Errors
    ///
    /// Currently infallible. The signature returns
    /// `Result<(), ClientError>` for parity with the tokio engine and so
    /// future drop-detection / disconnect-detection can surface errors
    /// without a breaking change.
    pub async fn flush(&self) -> Result<(), ClientError> {
        // ADR-0011 — invariant #3. Both clocks routed through the
        // engine's providers (host clock under TokioProviders, virtual
        // clock under SimProviders).
        let publish_time_ms = self.shared.now_wall_clock_ms();
        {
            let now = self.shared.now_instant();
            let mut conn = self.shared.inner.lock();
            conn.flush_producer(self.handle, publish_time_ms, now);
        }
        self.shared.driver_waker.notify_one();

        // Drain by waiting on the driver waker until the producer's pending
        // queue is empty. Each `CommandSendReceipt` decrements the pending
        // count inside the sans-io layer; the per-send `Waker`s registered
        // by [`SendFut`] wake their owners directly, and any user code
        // calling `flush` repolls the count after every `driver_waker`
        // notification. The notify cell is set by user-facing futures
        // (`send`, `close_producer`); the driver itself sets it on every
        // loop tick.
        loop {
            let pending = self.shared.inner.lock().producer_pending_count(self.handle);
            if pending == 0 {
                return Ok(());
            }
            let notified = self.shared.driver_waker.notified();
            tokio::pin!(notified);
            notified.as_mut().enable();
            notified.await;
        }
    }

    /// Close this producer. The returned future resolves when the broker
    /// acknowledges the close.
    ///
    /// # Errors
    ///
    /// - [`ClientError::Broker`] if the broker returns an error correlated with the close.
    /// - [`ClientError::Other`] if an unexpected outcome arrives on the close request id.
    pub async fn close(self) -> Result<(), ClientError> {
        // ADR-0059: a `producer.close()` issued after a plain
        // connection has gone terminal with no driver would register a
        // `CommandCloseProducer` request that never resolves (no driver left).
        // Fast-fail synchronously with `PeerClosed` instead. The guard fires
        // only when `is_closed()` AND `no_driver`, so a supervised connection
        // mid reconnect still closes the producer transparently. 1:1 with the
        // tokio engine.
        self.shared.fail_if_no_driver()?;
        // Snapshot identity for the lifecycle record before the round-trip.
        let topic = self.slot.identity.topic.clone();
        let producer_name = self.slot.state.lock().name.clone().unwrap_or_default();
        let request_id = {
            let mut conn = self.shared.inner.lock();
            conn.close_producer(self.handle)
        };
        self.shared.driver_waker.notify_one();
        let outcome = RequestFut {
            shared: self.shared.clone(),
            key: PendingOpKey::Request(request_id),
        }
        .await;
        match outcome {
            OpOutcome::Success { .. } => {
                // Lifecycle record (ADR-0054).
                tracing::info!(
                    topic = %topic,
                    producer_name = %producer_name,
                    handle = ?self.handle,
                    access_mode = ?self.slot.identity.access_mode,
                    "producer closed"
                );
                Ok(())
            }
            OpOutcome::Error { code, message, .. } => Err(ClientError::Broker { code, message }),
            OpOutcome::Terminal { .. } => Err(ClientError::PeerClosed),
            other => Err(ClientError::Other(format!(
                "unexpected outcome for close request {request_id}: {other:?}"
            ))),
        }
    }

    /// Look up the broker-registered schema for the producer's topic
    /// (PIP-87). Mirrors Java
    /// `PulsarClientImpl#getSchema(TopicName, Optional<byte[]>)`. Used
    /// by `magnetar_proto::schema::AutoProduceBytesSchema` to warm its
    /// cache on first send.
    ///
    /// `version = None` asks for the current schema; pass
    /// `Some(schema_version_bytes)` to re-resolve a historical schema.
    ///
    /// # Errors
    /// - [`ClientError::Broker`] when the broker rejects the lookup.
    /// - [`ClientError::Other`] when the producer handle is no longer registered or an unexpected
    ///   outcome arrives.
    pub async fn get_schema(
        &self,
        version: Option<bytes::Bytes>,
    ) -> Result<magnetar_proto::pb::Schema, ClientError> {
        let topic = self
            .shared
            .inner
            .lock()
            .producer_topic(self.handle)
            .map(str::to_owned)
            .ok_or_else(|| {
                ClientError::Other(format!(
                    "get_schema: producer handle {:?} is no longer registered",
                    self.handle
                ))
            })?;
        // Per-operation internals — `debug!` per ADR-0054 §2.1.
        tracing::debug!(topic = %topic, "schema lookup");
        let request_id = {
            let mut conn = self.shared.inner.lock();
            conn.get_schema(&topic, version)
        };
        self.shared.driver_waker.notify_one();
        let outcome = RequestFut {
            shared: self.shared.clone(),
            key: PendingOpKey::Request(request_id),
        }
        .await;
        match outcome {
            OpOutcome::GetSchemaResponse { result, .. } => match result {
                Ok((schema, _version)) => Ok(schema),
                Err((code, message)) => Err(ClientError::Broker { code, message }),
            },
            OpOutcome::Error { code, message, .. } => Err(ClientError::Broker { code, message }),
            OpOutcome::Terminal { .. } => Err(ClientError::PeerClosed),
            other => Err(ClientError::Other(format!(
                "unexpected get_schema outcome: {other:?}"
            ))),
        }
    }
}

impl<P: Providers + Send + Sync> Client<P> {
    /// Open a producer.
    ///
    /// Returns once the broker has sent `CommandProducerSuccess`.
    ///
    /// # Errors
    ///
    /// - [`ClientError::Closed`] if the broker closes the producer before it becomes ready (or
    ///   while we wait for the success ack).
    /// - [`ClientError::Other`] if the connection drops mid-open.
    pub async fn open_producer(
        &self,
        req: CreateProducerRequest,
    ) -> Result<Producer<P>, ClientError> {
        self.open_producer_with(req, None).await
    }

    /// Same as [`Self::open_producer`] but with an optional PIP-4 encryption hook.
    /// 1:1 mirror of `magnetar_runtime_tokio::Client::open_producer_with`.
    ///
    /// # Errors
    ///
    /// - [`ClientError::Closed`] if the broker closes the producer before it becomes ready.
    /// - [`ClientError::Other`] if the connection drops mid-open.
    pub async fn open_producer_with(
        &self,
        req: CreateProducerRequest,
        encryptor: Option<Arc<dyn MessageEncryptor>>,
    ) -> Result<Producer<P>, ClientError> {
        let compression = req.compression;
        // Pulsar requires a `CommandLookupTopic` round-trip before opening a producer or
        // consumer: lookup is what triggers the broker to acquire ownership of the topic's
        // namespace bundle. Skipping it works only when the bundle has already been activated
        // by some prior operation; a fresh broker rejects `CommandProducer` with
        // `ServerError::ServiceNotReady` ("not served by this instance, please redo the
        // lookup"). Mirrors `magnetar-runtime-tokio`'s `Client::open_producer_with` and Java's
        // `PulsarClientImpl#createProducerAsync`.
        //
        // ADR-0039 routing: detect `proxy_through_service_url = true` via
        // [`Client::lookup_topic_target`], then dispatch via [`Client::resolve_target`].
        // On a client built via `connect_plain_supervised`, the `Proxy` branch routes
        // through the per-broker connection pool (`crate::pool`); on `connect_plain` /
        // `from_parts` clients the pool is `None` and the branch surfaces
        // `ProxyUnsupportedOnUnsupervisedClient`.
        let (target, landed_on) = self.lookup_topic_target(&req.topic).await?;
        let target_shared = self.resolve_target(&target, &landed_on, &req.topic).await?;
        // ADR-0059: the resolved data-plane connection may be
        // a pool entry distinct from the bootstrap; fast-fail if it has already
        // gone terminal with no driver, before registering a doomed
        // `CommandProducer`. 1:1 with the tokio engine.
        target_shared.fail_if_no_driver()?;
        let (handle, slot) = {
            let mut conn = target_shared.inner.lock();
            let handle = conn.create_producer(req);
            let slot = conn
                .producer(handle)
                .cloned()
                .expect("just-created producer slot must exist");
            (handle, slot)
        };
        target_shared.driver_waker.notify_one();
        wait_producer_ready(&target_shared, handle).await?;
        // Lifecycle record (ADR-0054): the broker-assigned producer name is
        // available once `ProducerReady` has landed. Per-slot read only.
        let producer_name = slot.state.lock().name.clone().unwrap_or_default();
        tracing::info!(
            topic = %slot.identity.topic,
            producer_name = %producer_name,
            handle = ?handle,
            access_mode = ?slot.identity.access_mode,
            "producer created"
        );
        Ok(Producer::assemble(
            target_shared,
            handle,
            slot,
            compression,
            encryptor,
        ))
    }
}

/// Future returned by [`Producer::send`].
///
/// Polls until the matching [`OpOutcome::SendReceipt`] /
/// [`OpOutcome::SendError`] lands inside the sans-io state machine. NO
/// channel.
///
/// Holds the memory-budget reservation taken in [`Producer::send`] and
/// releases it on completion (success OR error) or on `Drop`. Mirrors Java
/// `MemoryLimitController.releaseMemory(...)`. Both policies are
/// supported: `FailImmediately` surfaces an
/// [`EngineError::MemoryLimitExceeded`] on overflow, while
/// `ProducerBlock` parks the future on
/// [`ConnectionShared::memory_wakers`] until budget frees up. See
/// [ADR-0020](https://github.com/CleverCloud/magnetar/blob/main/specs/adr/0020-memory-limit-producer-block.md)
/// for the tokio mechanism and
/// [ADR-0022](https://github.com/CleverCloud/magnetar/blob/main/specs/adr/0022-memory-limit-producer-block-moonpool.md)
/// for the moonpool-specific fairness contract under
/// [`moonpool_core::Providers`].
///
/// [`EngineError::MemoryLimitExceeded`]: crate::EngineError::MemoryLimitExceeded
/// [`ConnectionShared::memory_wakers`]: crate::ConnectionShared::memory_wakers
#[derive(Debug)]
pub struct SendFut {
    shared: Arc<ConnectionShared>,
    handle: ProducerHandle,
    state: SendState,
    /// Bytes reserved against [`ConnectionShared::memory_limit_bytes`] for
    /// this send. Released exactly once when the future returns
    /// `Poll::Ready` or is dropped (whichever comes first). `0` when no
    /// reservation was taken (the budget is unlimited, or the send failed
    /// synchronously before reserving).
    reserved_bytes: u64,
}

#[derive(Debug)]
enum SendState {
    Pending {
        sequence_id: SequenceId,
    },
    /// `send()` returned an error synchronously (e.g. producer not yet
    /// open, compression not wired). We surface it on the first `poll`.
    Failed {
        error: Option<ClientError>,
    },
    /// `MemoryLimitPolicy::ProducerBlock` saw the budget full on the
    /// synchronous fast path. Each `poll` retries the CAS via
    /// `try_reserve_memory_or_register`; on success the state transitions
    /// to `Pending`; on failure the waker is parked in the runtime's slab
    /// and dispatched when capacity frees up. `msg` is boxed so this
    /// variant doesn't dominate the `SendState` discriminant size.
    Reserving {
        msg: Option<Box<OutgoingMessage>>,
        publish_time_ms: u64,
        bytes: u64,
        slab_key: Option<usize>,
    },
}

impl Drop for SendFut {
    fn drop(&mut self) {
        // The future may be dropped before completion (caller cancelled
        // the send). Release the reservation so the budget doesn't
        // permanently leak. Note: if `poll` already released and zeroed
        // `reserved_bytes` on `Poll::Ready`, this branch is a no-op.
        if self.reserved_bytes > 0 {
            self.shared.release_memory(self.reserved_bytes);
            self.reserved_bytes = 0;
        }
        // If dropped while parked on the budget waker slab, evict the
        // slot so a later `release_memory` doesn't try to wake a dead
        // future.
        if let SendState::Reserving {
            slab_key: Some(key),
            ..
        } = &self.state
        {
            self.shared.cancel_memory_waker(*key);
        }
    }
}

impl Future for SendFut {
    type Output = Result<MessageId, ClientError>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Snapshot fields before borrowing `state` mutably to keep the
        // borrow checker happy.
        let handle = self.handle;
        let shared = self.shared.clone();

        // `Reserving` needs to move `msg` out of `self.state`; handle it
        // before the borrow.
        if matches!(self.state, SendState::Reserving { .. }) {
            let prev = std::mem::replace(&mut self.state, SendState::Failed { error: None });
            let SendState::Reserving {
                mut msg,
                publish_time_ms,
                bytes,
                slab_key,
            } = prev
            else {
                unreachable!()
            };
            match shared.try_reserve_memory_or_register(bytes, cx.waker()) {
                Ok(()) => {
                    if let Some(prior) = slab_key {
                        shared.cancel_memory_waker(prior);
                    }
                    let owned = *msg.take().expect("Reserving polled with no message");
                    let result = {
                        // ADR-0011 — invariant #3. Engine-provided
                        // monotonic clock so `Reserving → Pending`
                        // transitions stamp deterministic Instants
                        // into the proto state machine.
                        let now = shared.now_instant();
                        let mut conn = shared.inner.lock();
                        conn.send(handle, owned, publish_time_ms, now)
                    };
                    shared.driver_waker.notify_one();
                    match result {
                        Ok(seq) => {
                            self.state = SendState::Pending { sequence_id: seq };
                            self.reserved_bytes = bytes;
                            // Fall through to the normal match so we
                            // attempt to take the outcome immediately.
                        }
                        Err(err) => {
                            shared.release_memory(bytes);
                            return Poll::Ready(Err(ClientError::Other(format!("send: {err}"))));
                        }
                    }
                }
                Err(new_key) => {
                    if let Some(prior) = slab_key {
                        shared.cancel_memory_waker(prior);
                    }
                    self.state = SendState::Reserving {
                        msg,
                        publish_time_ms,
                        bytes,
                        slab_key: Some(new_key),
                    };
                    return Poll::Pending;
                }
            }
        }

        let outcome = match &mut self.state {
            SendState::Failed { error } => {
                let err = error
                    .take()
                    .unwrap_or_else(|| ClientError::Other("send future polled after error".into()));
                Poll::Ready(Err(err))
            }
            SendState::Pending { sequence_id } => {
                let key = PendingOpKey::Send(handle, *sequence_id);
                let mut conn = shared.inner.lock();
                if let Some(outcome) = conn.take_outcome(key) {
                    drop(conn);
                    Poll::Ready(translate_send_outcome(outcome))
                } else {
                    conn.register_waker(key, cx.waker().clone());
                    Poll::Pending
                }
            }
            SendState::Reserving { .. } => unreachable!("Reserving handled above"),
        };
        if matches!(outcome, Poll::Ready(_)) && self.reserved_bytes > 0 {
            // Release the budget reservation. `Drop` would also catch the
            // cancellation path; this branch covers the normal completion
            // path so the count is current the instant the user observes
            // the result.
            self.shared.release_memory(self.reserved_bytes);
            self.reserved_bytes = 0;
        }
        outcome
    }
}

fn translate_send_outcome(outcome: OpOutcome) -> Result<MessageId, ClientError> {
    match outcome {
        OpOutcome::SendReceipt { message_id, .. } => Ok(message_id),
        OpOutcome::SendError { code, message, .. } => Err(ClientError::Broker { code, message }),
        OpOutcome::Terminal { .. } => Err(ClientError::PeerClosed),
        other => Err(ClientError::Other(format!(
            "unexpected send outcome: {other:?}"
        ))),
    }
}

/// Helper future to wait for a generic request outcome.
struct RequestFut {
    shared: Arc<ConnectionShared>,
    key: PendingOpKey,
}

impl Future for RequestFut {
    type Output = OpOutcome;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut conn = self.shared.inner.lock();
        if let Some(outcome) = conn.take_outcome(self.key) {
            drop(conn);
            return Poll::Ready(outcome);
        }
        conn.register_waker(self.key, cx.waker().clone());
        Poll::Pending
    }
}

impl Drop for RequestFut {
    /// Drop-time cleanup: clear our entry from the connection's waker slab so
    /// a cancelled producer-side request future (close-producer, etc.) does
    /// not leak a [`std::task::Waker`] in the slab. For
    /// [`PendingOpKey::Send`] keys the per-producer-slot waker is also
    /// cleared by [`magnetar_proto::Connection::unregister_waker`]. Mirrors
    /// the tokio engine's
    /// [`magnetar_runtime_tokio::producer::RequestFut::drop`]. Lookup
    /// multi-agent review MEDIUM-4; ADR-0024 four-layer parity.
    fn drop(&mut self) {
        self.shared.inner.lock().unregister_waker(self.key);
    }
}

/// Future that drives the connection's semantic event queue until the
/// expected [`ConnectionEvent::ProducerReady`] (or a terminal
/// `ProducerClosedByBroker` / `Closed`) lands for the given handle.
///
/// Mirrors the tokio engine's `EventWaitFut::ProducerReady`. Unlike
/// [`RequestFut`] this watches an event stream, not a single outcome
/// slot, because the broker emits `CommandProducerSuccess` separately
/// from any request-correlated outcome — the sans-io layer surfaces it
/// as `ProducerReady`.
///
/// Each `Pending` return spawns a helper that awaits
/// `driver_waker.notified()` and wakes the caller; on the next poll (or
/// on drop) the previous helper is aborted. Without that abort, the
/// stale helper from an earlier poll lingers on
/// `driver_waker.notified()` and competes with the driver loop for
/// `notify_one` permits emitted by user-facing futures.
struct ProducerReadyFut {
    shared: Arc<ConnectionShared>,
    handle: ProducerHandle,
    helper: Option<tokio::task::JoinHandle<()>>,
}

impl Drop for ProducerReadyFut {
    fn drop(&mut self) {
        if let Some(h) = self.helper.take() {
            h.abort();
        }
    }
}

impl Future for ProducerReadyFut {
    type Output = Result<(), ClientError>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        let mut conn = this.shared.inner.lock();
        loop {
            match conn.poll_event() {
                Some(ConnectionEvent::ProducerReady { handle, .. }) => {
                    if handle == this.handle {
                        return Poll::Ready(Ok(()));
                    }
                }
                Some(ConnectionEvent::ProducerClosedByBroker {
                    handle,
                    assigned_broker_service_url,
                }) => {
                    if handle == this.handle {
                        // Broker-forced close — degraded-but-recovering
                        // (warn! per ADR-0054 §2.1); the open future
                        // surfaces `Closed` and the caller decides. Mirror
                        // of the tokio engine's `EventWaitFut` arm.
                        let topic = conn
                            .producer(handle)
                            .map(|s| s.identity.topic.clone())
                            .unwrap_or_default();
                        tracing::warn!(
                            handle = ?handle,
                            topic = %topic,
                            assigned_broker_service_url = assigned_broker_service_url
                                .as_deref()
                                .map(crate::log_fields::truncate_broker_str),
                            "broker closed producer while waiting for ProducerReady"
                        );
                        return Poll::Ready(Err(ClientError::Closed));
                    }
                }
                Some(ConnectionEvent::ProducerOpenFailed {
                    handle,
                    code,
                    message,
                }) => {
                    if handle == this.handle {
                        return Poll::Ready(Err(ClientError::Broker { code, message }));
                    }
                }
                Some(ConnectionEvent::Closed { reason }) => {
                    // Connection-level close while a producer-open future was
                    // parked. ADR-0055 §1: a TERMINAL drop (`fail_all_pending`,
                    // which carries a `reason`) must unblock the waiter with
                    // the terminal `PeerClosed`, mirroring the tokio engine and
                    // the request / send / receive surfaces — not a generic
                    // `Other`. A user-requested graceful `close()` (reason
                    // `None`) keeps the `Closed` mapping. warn! per ADR-0054
                    // §2.1; `reason` is broker-controlled text.
                    tracing::warn!(
                        reason = reason
                            .as_deref()
                            .map(crate::log_fields::truncate_broker_str),
                        "connection closed while waiting for producer readiness"
                    );
                    return Poll::Ready(Err(match reason {
                        Some(_) => ClientError::PeerClosed,
                        None => ClientError::Closed,
                    }));
                }
                Some(_) => {} // ignore unrelated events
                None => break,
            }
        }
        drop(conn);

        // We have no per-event waker slot in the sans-io layer; park on the
        // driver waker. Every inbound batch ends with the driver looping
        // back to `select!`, which gives any pending `notified()` a chance
        // to fire as the next loop tick.
        if let Some(prev) = this.helper.take() {
            prev.abort();
        }
        let waker = cx.waker().clone();
        let shared = this.shared.clone();
        this.helper = Some(tokio::spawn(async move {
            shared.driver_waker.notified().await;
            waker.wake();
        }));
        Poll::Pending
    }
}

async fn wait_producer_ready(
    shared: &Arc<ConnectionShared>,
    handle: ProducerHandle,
) -> Result<(), ClientError> {
    ProducerReadyFut {
        shared: shared.clone(),
        handle,
        helper: None,
    }
    .await
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::{Duration, Instant};

    use bytes::{Bytes, BytesMut};
    use magnetar_proto::producer::OutgoingMessage;
    use magnetar_proto::types::{CompressionKind, ProducerHandle};
    use magnetar_proto::{ConnectionConfig, CreateProducerRequest, encode_command, pb};
    use moonpool_core::TokioProviders;

    use super::Producer;
    use crate::client::{Client, ClientError};
    use crate::{ConnectionShared, MoonpoolEngine};

    fn handshake_response_bytes() -> BytesMut {
        let cmd = pb::BaseCommand {
            r#type: pb::base_command::Type::Connected as i32,
            connected: Some(pb::CommandConnected {
                server_version: "magnetar-test".to_owned(),
                protocol_version: Some(21),
                max_message_size: Some(5 * 1024 * 1024),
                feature_flags: Some(pb::FeatureFlags::default()),
            }),
            ..Default::default()
        };
        let mut buf = BytesMut::new();
        encode_command(&mut buf, &cmd).expect("encode CommandConnected");
        buf
    }

    /// Spin up a `ConnectionShared` whose inner state machine has completed
    /// the handshake, so `create_producer` runs cleanly without erroring
    /// on protocol-state checks.
    fn handshake_complete_shared() -> Arc<ConnectionShared> {
        let shared = ConnectionShared::new(ConnectionConfig::default());
        {
            let mut conn = shared.inner.lock();
            conn.begin_handshake().expect("handshake");
            let frame = handshake_response_bytes();
            conn.handle_bytes(Instant::now(), &frame)
                .expect("connected");
        }
        shared
    }

    /// Capture the per-slot Arc for a `handle` known to be in the registry.
    fn slot_for(
        shared: &Arc<ConnectionShared>,
        handle: ProducerHandle,
    ) -> Arc<magnetar_proto::ProducerSlot> {
        shared
            .inner
            .lock()
            .producer(handle)
            .cloned()
            .expect("test producer slot must exist")
    }

    /// Placeholder slot for tests that intentionally use an unknown handle.
    fn stub_slot_for_test(handle: ProducerHandle) -> Arc<magnetar_proto::ProducerSlot> {
        magnetar_proto::ProducerSlot::new(
            magnetar_proto::ProducerIdentity {
                handle,
                topic: String::new(),
                access_mode: pb::ProducerAccessMode::Shared,
            },
            magnetar_proto::producer::ProducerState::new(
                handle,
                String::new(),
                CompressionKind::None,
                0,
            ),
        )
    }

    /// Deterministic, dependency-free PIP-4 encryptor stub: XORs every payload
    /// byte with a fixed key and stamps the canonical encryption metadata
    /// fields. Records the last plaintext it saw so tests can assert the
    /// encrypt hook ran on the pre-encryption bytes. 1:1 mirror of the tokio
    /// engine's stub (ADR-0024 cross-runtime parity).
    #[derive(Debug, Default)]
    struct XorEncryptor {
        seen_plaintext: std::sync::Mutex<Option<Vec<u8>>>,
    }

    const XOR_KEY: u8 = 0x5A;

    impl crate::crypto::MessageEncryptor for XorEncryptor {
        fn encrypt(
            &self,
            plaintext: &[u8],
            metadata: &mut pb::MessageMetadata,
        ) -> Result<Bytes, crate::crypto::EncryptError> {
            *self.seen_plaintext.lock().unwrap() = Some(plaintext.to_vec());
            metadata.encryption_keys.push(pb::EncryptionKeys {
                key: "xor-test".to_owned(),
                value: Bytes::from_static(b"k"),
                metadata: Vec::new(),
            });
            metadata.encryption_algo = Some("XOR-TEST".to_owned());
            metadata.encryption_param = Some(Bytes::from_static(b"iv"));
            Ok(Bytes::from(
                plaintext.iter().map(|b| b ^ XOR_KEY).collect::<Vec<u8>>(),
            ))
        }
    }

    /// `send` with a PIP-4 encryptor wired stamps the encryption metadata and
    /// hands the ciphertext (not the plaintext) to the sans-io layer. We observe
    /// the encrypt hook fired against the original plaintext; the resulting send
    /// enqueues a pending op (no driver running drains it). 1:1 with the tokio
    /// `send_encrypts_payload_and_stamps_metadata`.
    #[tokio::test(flavor = "current_thread")]
    async fn send_encrypts_payload_and_stamps_metadata() {
        let shared = handshake_complete_shared();
        let handle = {
            let mut conn = shared.inner.lock();
            conn.create_producer(CreateProducerRequest {
                topic: "persistent://public/default/encrypt".to_owned(),
                ..Default::default()
            })
        };
        let encryptor = Arc::new(XorEncryptor::default());
        let producer: Producer<TokioProviders> = Producer::assemble(
            shared.clone(),
            handle,
            slot_for(&shared, handle),
            CompressionKind::None,
            Some(encryptor.clone()),
        );
        let _fut = producer.send(OutgoingMessage {
            payload: Bytes::from_static(b"plain-secret"),
            metadata: pb::MessageMetadata::default(),
            uncompressed_size: 12,
            num_messages: 1,
            txn_id: None,
            source_message_id: None,
        });
        assert_eq!(
            encryptor.seen_plaintext.lock().unwrap().as_deref(),
            Some(b"plain-secret".as_slice()),
            "encrypt hook must see the pre-encryption payload",
        );
        assert!(
            producer.pending_count() >= 1,
            "expected pending encrypted send; got {}",
            producer.pending_count()
        );
    }

    /// Encryptor that always fails. Exercises the producer-side encrypt-error
    /// branch (`send` surfaces `ClientError::Other("encrypt: …")`). 1:1 with the
    /// tokio `send_encrypt_failure_surfaces_error`.
    #[derive(Debug, Default)]
    struct FailingEncryptor;

    impl crate::crypto::MessageEncryptor for FailingEncryptor {
        fn encrypt(
            &self,
            _plaintext: &[u8],
            _metadata: &mut pb::MessageMetadata,
        ) -> Result<Bytes, crate::crypto::EncryptError> {
            Err(crate::crypto::EncryptError::new(
                "forced encrypt failure (test)",
            ))
        }
    }

    /// A failing encryptor makes `send` resolve to `ClientError::Other` and the
    /// payload never reaches the sans-io layer (no pending op). 1:1 with the
    /// tokio `send_encrypt_failure_surfaces_error`.
    #[tokio::test(flavor = "current_thread")]
    async fn send_encrypt_failure_surfaces_error() {
        let shared = handshake_complete_shared();
        let handle = {
            let mut conn = shared.inner.lock();
            conn.create_producer(CreateProducerRequest {
                topic: "persistent://public/default/encrypt-fail".to_owned(),
                ..Default::default()
            })
        };
        let producer: Producer<TokioProviders> = Producer::assemble(
            shared.clone(),
            handle,
            slot_for(&shared, handle),
            CompressionKind::None,
            Some(Arc::new(FailingEncryptor)),
        );
        let res = producer
            .send(OutgoingMessage {
                payload: Bytes::from_static(b"plain"),
                metadata: pb::MessageMetadata::default(),
                uncompressed_size: 5,
                num_messages: 1,
                txn_id: None,
                source_message_id: None,
            })
            .await;
        let err = res.expect_err("encrypt failure must surface");
        assert!(
            format!("{err}").contains("encrypt:"),
            "expected encrypt-error message, got {err:?}"
        );
        assert_eq!(
            producer.pending_count(),
            0,
            "a failed encrypt must not enqueue a send"
        );
    }

    /// Smoke test: a freshly-constructed producer reports defaults that
    /// match the sans-io layer (no sends pushed, none pending, no name).
    #[tokio::test(flavor = "current_thread")]
    async fn fresh_producer_reports_defaults() {
        let shared = handshake_complete_shared();
        let handle = {
            let mut conn = shared.inner.lock();
            conn.create_producer(CreateProducerRequest {
                topic: "persistent://public/default/defaults".to_owned(),
                ..Default::default()
            })
        };
        let producer: Producer<TokioProviders> = Producer::assemble(
            shared.clone(),
            handle,
            slot_for(&shared, handle),
            CompressionKind::None,
            None,
        );
        assert_eq!(producer.pending_count(), 0);
        assert_eq!(producer.last_sequence_id(), -1);
        assert!(!producer.is_closed());
        assert_eq!(producer.name(), "");
        assert_eq!(producer.topic(), "persistent://public/default/defaults");
        assert_eq!(producer.compression(), CompressionKind::None);
        let stats = producer.stats();
        assert_eq!(stats.total_msgs_sent, 0);
        assert_eq!(stats.pending_queue_size, 0);
    }

    /// `send` on a freshly-opened (post-handshake) producer enqueues the
    /// frame into the sans-io state machine; `pending_count` flips to 1
    /// because no driver is running to drain the `CommandSendReceipt`.
    #[tokio::test(flavor = "current_thread")]
    async fn send_enqueues_pending_op() {
        let shared = handshake_complete_shared();
        let handle = {
            let mut conn = shared.inner.lock();
            conn.create_producer(CreateProducerRequest {
                topic: "persistent://public/default/enqueue".to_owned(),
                ..Default::default()
            })
        };
        let slot = slot_for(&shared, handle);
        let producer: Producer<TokioProviders> =
            Producer::assemble(shared, handle, slot, CompressionKind::None, None);
        let _fut = producer.send(OutgoingMessage {
            payload: Bytes::from_static(b"hello"),
            metadata: pb::MessageMetadata::default(),
            uncompressed_size: 5,
            num_messages: 1,
            txn_id: None,
            source_message_id: None,
        });
        assert!(
            producer.pending_count() >= 1,
            "expected pending send; got {}",
            producer.pending_count()
        );
    }

    /// `send` with a non-`None` compression codec yields a `SendFut` that
    /// resolves to `ClientError::Other` on the first poll. Until the
    /// moonpool engine ships a runtime codec, the producer refuses to
    /// hand mis-labelled bytes to the state machine.
    #[tokio::test(flavor = "current_thread")]
    async fn send_with_compression_returns_error() {
        let shared = handshake_complete_shared();
        let handle = {
            let mut conn = shared.inner.lock();
            conn.create_producer(CreateProducerRequest {
                topic: "persistent://public/default/zstd".to_owned(),
                ..Default::default()
            })
        };
        let slot = slot_for(&shared, handle);
        let producer: Producer<TokioProviders> =
            Producer::assemble(shared, handle, slot, CompressionKind::Zstd, None);
        let res = producer
            .send(OutgoingMessage {
                payload: Bytes::from_static(b"hello"),
                metadata: pb::MessageMetadata::default(),
                uncompressed_size: 5,
                num_messages: 1,
                txn_id: None,
                source_message_id: None,
            })
            .await;
        let err = res.expect_err("expected error for unwired compression");
        let s = format!("{err}");
        assert!(
            s.contains("not yet wired"),
            "expected compression-not-wired message, got {s:?}"
        );
    }

    /// `flush()` on a quiescent producer returns immediately. Idempotency
    /// guarantee mirrored from the tokio engine.
    #[tokio::test(flavor = "current_thread")]
    async fn flush_quiescent_is_noop() {
        let shared = handshake_complete_shared();
        let handle = {
            let mut conn = shared.inner.lock();
            conn.create_producer(CreateProducerRequest {
                topic: "persistent://public/default/flush-ok".to_owned(),
                ..Default::default()
            })
        };
        let slot = slot_for(&shared, handle);
        let producer: Producer<TokioProviders> =
            Producer::assemble(shared, handle, slot, CompressionKind::None, None);
        assert_eq!(producer.pending_count(), 0);
        tokio::time::timeout(Duration::from_secs(1), producer.flush())
            .await
            .expect("flush should resolve on quiescent producer")
            .expect("flush ok");
    }

    /// Producer façade is `Clone` (cheap Arc bump). Confirm both clones
    /// share the same handle.
    #[test]
    fn producer_clones_share_handle() {
        let shared = handshake_complete_shared();
        let handle = {
            let mut conn = shared.inner.lock();
            conn.create_producer(CreateProducerRequest {
                topic: "persistent://public/default/clone".to_owned(),
                ..Default::default()
            })
        };
        let slot = slot_for(&shared, handle);
        let producer: Producer<TokioProviders> =
            Producer::assemble(shared, handle, slot, CompressionKind::None, None);
        let clone = producer.clone();
        assert_eq!(producer.handle(), clone.handle());
        assert_eq!(producer.compression(), clone.compression());
    }

    /// `Client::open_producer` against `TokioProviders` resolves at the
    /// type level. We can't construct a `Client` without a real
    /// connection, so the bound is checked through the free function
    /// below.
    #[allow(dead_code)]
    fn _open_producer_bounds<P: moonpool_core::Providers>(
        client: &Client<P>,
        req: CreateProducerRequest,
    ) -> impl std::future::Future<Output = Result<super::Producer<P>, super::ClientError>> + '_
    {
        client.open_producer(req)
    }

    /// Smoke: `Client::connect_plain` is generic over `TokioProviders` and
    /// the engine's surface composes with the producer module.
    #[test]
    #[allow(clippy::let_underscore_future, clippy::no_effect_underscore_binding)]
    fn open_producer_compiles_against_tokio_providers() {
        let providers = TokioProviders::new();
        let engine = MoonpoolEngine::new(providers);
        let _client_fut =
            Client::connect_plain(&engine, "127.0.0.1:6650", ConnectionConfig::default());
    }

    /// `send` reserves payload bytes against the configured memory budget
    /// (FailImmediately policy). Once enqueued, `ConnectionShared::memory_used`
    /// reflects the reservation. Dropping the `SendFut` (the test stand-in
    /// for cancellation) releases the reservation.
    #[tokio::test(flavor = "current_thread")]
    async fn send_reserves_and_releases_memory_budget() {
        let cfg = ConnectionConfig {
            memory_limit_bytes: 1024,
            ..ConnectionConfig::default()
        };
        let shared = ConnectionShared::new(cfg);
        // Seed the handshake by hand so create_producer succeeds.
        {
            let mut conn = shared.inner.lock();
            conn.begin_handshake().expect("handshake");
            let frame = handshake_response_bytes();
            conn.handle_bytes(Instant::now(), &frame)
                .expect("connected");
        }
        let handle = {
            let mut conn = shared.inner.lock();
            conn.create_producer(CreateProducerRequest {
                topic: "persistent://public/default/budget".to_owned(),
                ..Default::default()
            })
        };
        let producer: Producer<TokioProviders> = Producer::assemble(
            shared.clone(),
            handle,
            slot_for(&shared, handle),
            CompressionKind::None,
            None,
        );
        let fut = producer.send(OutgoingMessage {
            payload: Bytes::from_static(b"abcdef"),
            metadata: pb::MessageMetadata::default(),
            uncompressed_size: 6,
            num_messages: 1,
            txn_id: None,
            source_message_id: None,
        });
        assert_eq!(
            shared
                .memory_used
                .load(std::sync::atomic::Ordering::Acquire),
            6,
            "payload bytes must be reserved against the budget"
        );
        drop(fut);
        assert_eq!(
            shared
                .memory_used
                .load(std::sync::atomic::Ordering::Acquire),
            0,
            "dropping the SendFut must release the reservation"
        );
    }

    /// `send` with a payload larger than the memory budget refuses
    /// synchronously (FailImmediately policy). The budget counter stays at
    /// zero — the reservation never lands.
    #[tokio::test(flavor = "current_thread")]
    async fn send_fails_when_memory_budget_would_overflow() {
        let cfg = ConnectionConfig {
            memory_limit_bytes: 4,
            ..ConnectionConfig::default()
        };
        let shared = ConnectionShared::new(cfg);
        {
            let mut conn = shared.inner.lock();
            conn.begin_handshake().expect("handshake");
            let frame = handshake_response_bytes();
            conn.handle_bytes(Instant::now(), &frame)
                .expect("connected");
        }
        let handle = {
            let mut conn = shared.inner.lock();
            conn.create_producer(CreateProducerRequest {
                topic: "persistent://public/default/overflow".to_owned(),
                ..Default::default()
            })
        };
        let producer: Producer<TokioProviders> = Producer::assemble(
            shared.clone(),
            handle,
            slot_for(&shared, handle),
            CompressionKind::None,
            None,
        );
        let res = producer
            .send(OutgoingMessage {
                payload: Bytes::from_static(b"too-big-payload"),
                metadata: pb::MessageMetadata::default(),
                uncompressed_size: 15,
                num_messages: 1,
                txn_id: None,
                source_message_id: None,
            })
            .await;
        assert!(matches!(
            res,
            Err(super::ClientError::Engine(
                super::super::EngineError::MemoryLimitExceeded { .. }
            ))
        ));
        assert_eq!(
            shared
                .memory_used
                .load(std::sync::atomic::Ordering::Acquire),
            0,
            "rejected sends must not bump the budget counter"
        );
    }

    /// Regression for the CLI "produce hangs against fresh broker" bug: when the broker
    /// rejects a producer-open with a PERMANENT `CommandError` (e.g.
    /// `AuthorizationError`), the moonpool engine's `wait_producer_ready` must surface
    /// a `ClientError::Broker { code, message }` rather than parking on the driver
    /// waker forever. Mirrors the proto-level
    /// `command_error_on_producer_open_with_permanent_code_emits_producer_open_failed`
    /// test, but covers the engine-side bridge from event to future-result.
    /// `ServiceNotReady` / `MetadataError` / `TopicNotFound` are deliberately NOT used
    /// here — those are transient (the runtime retries via
    /// `retry_producer_open`).
    #[tokio::test(flavor = "current_thread")]
    async fn wait_producer_ready_surfaces_broker_error() {
        let shared = handshake_complete_shared();
        let (handle, request_id) = {
            let mut conn = shared.inner.lock();
            let request_id = conn.peek_next_request_id_for_test();
            let handle = conn.create_producer(CreateProducerRequest {
                topic: "persistent://public/default/forbidden".to_owned(),
                ..Default::default()
            });
            (handle, request_id)
        };

        let err = pb::BaseCommand {
            r#type: pb::base_command::Type::Error as i32,
            error: Some(pb::CommandError {
                request_id,
                error: pb::ServerError::AuthorizationError as i32,
                message: "not authorized".to_owned(),
            }),
            ..Default::default()
        };
        let mut buf = BytesMut::new();
        encode_command(&mut buf, &err).expect("encode CommandError");
        {
            let mut conn = shared.inner.lock();
            conn.handle_bytes(Instant::now(), &buf)
                .expect("handle CommandError");
        }

        // The fix replaces an unbounded wait with a typed Broker error. Hard-cap the await
        // with a tight timeout so a regression would surface as `Elapsed`, not as a hung
        // test process.
        let res = tokio::time::timeout(
            Duration::from_secs(2),
            super::wait_producer_ready(&shared, handle),
        )
        .await
        .expect("producer-ready future must resolve (regression: previously hung)");
        match res {
            Err(super::ClientError::Broker { code, message }) => {
                assert_eq!(code, pb::ServerError::AuthorizationError as i32);
                assert_eq!(message, "not authorized");
            }
            other => panic!("expected ClientError::Broker, got {other:?}"),
        }
    }

    /// `ProducerBlock`: an overflowing send must NOT error synchronously.
    /// The `SendFut` parks in the `Reserving` state with a waker
    /// registered on `ConnectionShared::memory_wakers`. We poll the
    /// future once via `noop_waker` to land it in `Pending`, then verify
    /// the slab carries our registration.
    #[tokio::test(flavor = "current_thread")]
    async fn producer_block_parks_on_overflow_instead_of_erroring() {
        use std::future::Future as _;
        use std::pin::Pin;
        use std::task::{Context, Poll};

        let cfg = ConnectionConfig {
            memory_limit_bytes: 4,
            memory_limit_policy: magnetar_proto::MemoryLimitPolicy::ProducerBlock,
            ..ConnectionConfig::default()
        };
        let shared = ConnectionShared::new(cfg);
        {
            let mut conn = shared.inner.lock();
            conn.begin_handshake().expect("handshake");
            let frame = handshake_response_bytes();
            conn.handle_bytes(Instant::now(), &frame)
                .expect("connected");
        }
        // Pre-fill the budget so the next `send` cannot reserve.
        shared
            .try_reserve_memory(4)
            .expect("seeding the budget at the limit");

        let handle = {
            let mut conn = shared.inner.lock();
            conn.create_producer(CreateProducerRequest {
                topic: "persistent://public/default/block".to_owned(),
                ..Default::default()
            })
        };
        let producer: Producer<TokioProviders> = Producer::assemble(
            shared.clone(),
            handle,
            slot_for(&shared, handle),
            CompressionKind::None,
            None,
        );
        let mut fut = producer.send(OutgoingMessage {
            payload: Bytes::from_static(b"overflow"),
            metadata: pb::MessageMetadata::default(),
            uncompressed_size: 8,
            num_messages: 1,
            txn_id: None,
            source_message_id: None,
        });
        // Poll once: the future must register on the waker slab and
        // return `Poll::Pending`.
        let waker = futures_task_waker();
        let mut cx = Context::from_waker(&waker);
        let poll = Pin::new(&mut fut).poll(&mut cx);
        assert!(
            matches!(poll, Poll::Pending),
            "ProducerBlock must park instead of erroring (got {poll:?})"
        );
        assert_eq!(
            shared.memory_wakers.lock().len(),
            1,
            "Reserving must register exactly one waker"
        );
        // Drop the future: the registered waker must be evicted so the
        // next release does not wake a dead future.
        drop(fut);
        assert!(
            shared.memory_wakers.lock().is_empty(),
            "dropping the SendFut must cancel its registration"
        );
    }

    /// `ProducerBlock`: releasing the held budget drains every parked
    /// waker. The drained slot must be evicted from the slab so a
    /// later `release_memory` does not double-wake.
    #[tokio::test(flavor = "current_thread")]
    async fn producer_block_release_drains_wakers() {
        use std::future::Future as _;
        use std::pin::Pin;
        use std::task::{Context, Poll};

        let cfg = ConnectionConfig {
            memory_limit_bytes: 4,
            memory_limit_policy: magnetar_proto::MemoryLimitPolicy::ProducerBlock,
            ..ConnectionConfig::default()
        };
        let shared = ConnectionShared::new(cfg);
        {
            let mut conn = shared.inner.lock();
            conn.begin_handshake().expect("handshake");
            let frame = handshake_response_bytes();
            conn.handle_bytes(Instant::now(), &frame)
                .expect("connected");
        }
        // Saturate the budget so the next `send` parks.
        shared
            .try_reserve_memory(4)
            .expect("seeding the budget at the limit");

        let handle = {
            let mut conn = shared.inner.lock();
            conn.create_producer(CreateProducerRequest {
                topic: "persistent://public/default/release".to_owned(),
                ..Default::default()
            })
        };
        let producer: Producer<TokioProviders> = Producer::assemble(
            shared.clone(),
            handle,
            slot_for(&shared, handle),
            CompressionKind::None,
            None,
        );
        let mut fut = producer.send(OutgoingMessage {
            payload: Bytes::from_static(b"AB"),
            metadata: pb::MessageMetadata::default(),
            uncompressed_size: 2,
            num_messages: 1,
            txn_id: None,
            source_message_id: None,
        });
        let waker = futures_task_waker();
        let mut cx = Context::from_waker(&waker);
        assert!(matches!(Pin::new(&mut fut).poll(&mut cx), Poll::Pending));
        assert_eq!(shared.memory_wakers.lock().len(), 1);

        // Release the seed reservation. The drain must empty the slab.
        shared.release_memory(4);
        assert!(
            shared.memory_wakers.lock().is_empty(),
            "release_memory must drain the slab"
        );

        // The drop guard cleans up `fut`'s reservation if it took one.
        drop(fut);
    }

    /// `ProducerBlock`: a fully-released budget completes the parked
    /// reservation on the next poll. We park the future, drop the prior
    /// holder, then re-poll: the future advances from `Reserving` to
    /// `Pending`, the budget counter reflects the new reservation, and
    /// the slab is empty.
    #[tokio::test(flavor = "current_thread")]
    async fn producer_block_completes_when_budget_frees_up() {
        use std::future::Future as _;
        use std::pin::Pin;
        use std::sync::atomic::Ordering;
        use std::task::{Context, Poll};

        let cfg = ConnectionConfig {
            memory_limit_bytes: 4,
            memory_limit_policy: magnetar_proto::MemoryLimitPolicy::ProducerBlock,
            ..ConnectionConfig::default()
        };
        let shared = ConnectionShared::new(cfg);
        {
            let mut conn = shared.inner.lock();
            conn.begin_handshake().expect("handshake");
            let frame = handshake_response_bytes();
            conn.handle_bytes(Instant::now(), &frame)
                .expect("connected");
        }
        shared.try_reserve_memory(4).expect("seed budget");

        let handle = {
            let mut conn = shared.inner.lock();
            conn.create_producer(CreateProducerRequest {
                topic: "persistent://public/default/free".to_owned(),
                ..Default::default()
            })
        };
        let producer: Producer<TokioProviders> = Producer::assemble(
            shared.clone(),
            handle,
            slot_for(&shared, handle),
            CompressionKind::None,
            None,
        );
        let mut fut = producer.send(OutgoingMessage {
            payload: Bytes::from_static(b"ab"),
            metadata: pb::MessageMetadata::default(),
            uncompressed_size: 2,
            num_messages: 1,
            txn_id: None,
            source_message_id: None,
        });
        let waker = futures_task_waker();
        let mut cx = Context::from_waker(&waker);
        assert!(matches!(Pin::new(&mut fut).poll(&mut cx), Poll::Pending));

        // Free the seed; the drain wakes every parked future.
        shared.release_memory(4);
        assert_eq!(shared.memory_used.load(Ordering::Acquire), 0);

        // Re-poll: the future reserves its 2 bytes, transitions to
        // `Pending`, and stays pending waiting for the broker receipt
        // (no driver is running here).
        let poll = Pin::new(&mut fut).poll(&mut cx);
        assert!(
            matches!(poll, Poll::Pending),
            "still waiting on broker receipt"
        );
        assert_eq!(
            shared.memory_used.load(Ordering::Acquire),
            2,
            "the released budget must have been re-reserved by the parked send"
        );
        assert!(
            shared.memory_wakers.lock().is_empty(),
            "successful reservation must clear the slab slot"
        );

        // Drop releases the reservation back to zero.
        drop(fut);
        assert_eq!(shared.memory_used.load(Ordering::Acquire), 0);
    }

    /// `ProducerBlock`: fast-path success when the budget has room takes
    /// the synchronous `queue_send` return on line 242 (no `SendFut` slow
    /// path, no slab insert). Mirrors the `FailImmediately` fast path but
    /// proves the early return on the `ProducerBlock` side.
    #[tokio::test(flavor = "current_thread")]
    async fn producer_block_fast_path_when_budget_available() {
        let cfg = ConnectionConfig {
            memory_limit_bytes: 1024,
            memory_limit_policy: magnetar_proto::MemoryLimitPolicy::ProducerBlock,
            ..ConnectionConfig::default()
        };
        let shared = ConnectionShared::new(cfg);
        {
            let mut conn = shared.inner.lock();
            conn.begin_handshake().expect("handshake");
            let frame = handshake_response_bytes();
            conn.handle_bytes(Instant::now(), &frame)
                .expect("connected");
        }
        let handle = {
            let mut conn = shared.inner.lock();
            conn.create_producer(CreateProducerRequest {
                topic: "persistent://public/default/fast".to_owned(),
                ..Default::default()
            })
        };
        let producer: Producer<TokioProviders> = Producer::assemble(
            shared.clone(),
            handle,
            slot_for(&shared, handle),
            CompressionKind::None,
            None,
        );
        // Budget has 1024 free bytes; the 4-byte payload reserves
        // synchronously and takes the fast-path `queue_send` return.
        let _fut = producer.send(OutgoingMessage {
            payload: Bytes::from_static(b"fast"),
            metadata: pb::MessageMetadata::default(),
            uncompressed_size: 4,
            num_messages: 1,
            txn_id: None,
            source_message_id: None,
        });
        assert_eq!(
            shared
                .memory_used
                .load(std::sync::atomic::Ordering::Acquire),
            4,
            "ProducerBlock fast path must reserve synchronously",
        );
        assert!(
            shared.memory_wakers.lock().is_empty(),
            "fast path must not register a waker slot",
        );
    }

    /// `ProducerBlock`: when `conn.send` errors after a successful memory
    /// reservation, [`SendFut::poll`] must release the reservation and
    /// surface a [`ClientError::Other`] (the `Err` arm of the inner
    /// `match result {}`). We force the error by sending against an
    /// unregistered [`ProducerHandle`] — the proto layer rejects with
    /// `ProtocolError::InvariantViolation("unknown producer handle")`.
    #[tokio::test(flavor = "current_thread")]
    async fn producer_block_send_error_releases_reservation() {
        use std::future::Future as _;
        use std::pin::Pin;
        use std::task::{Context, Poll};

        let cfg = ConnectionConfig {
            memory_limit_bytes: 16,
            memory_limit_policy: magnetar_proto::MemoryLimitPolicy::ProducerBlock,
            ..ConnectionConfig::default()
        };
        let shared = ConnectionShared::new(cfg);
        {
            let mut conn = shared.inner.lock();
            conn.begin_handshake().expect("handshake");
            let frame = handshake_response_bytes();
            conn.handle_bytes(Instant::now(), &frame)
                .expect("connected");
        }
        // Saturate the budget so the first `send` lands in `Reserving`,
        // then release and re-poll: this drives the future through the
        // `Reserving → Ok(()) → conn.send → Err(_)` path.
        shared.try_reserve_memory(16).expect("seed budget");
        // Fabricate a producer handle that the state machine does NOT know
        // about. `ProducerHandle` is a transparent wrapper around `u64`; we
        // pick an id that the `create_producer` path won't have allocated.
        let bogus_handle = ProducerHandle(u64::MAX);
        let producer: Producer<TokioProviders> = Producer::assemble(
            shared.clone(),
            bogus_handle,
            stub_slot_for_test(bogus_handle),
            CompressionKind::None,
            None,
        );
        let mut fut = producer.send(OutgoingMessage {
            payload: Bytes::from_static(b"err"),
            metadata: pb::MessageMetadata::default(),
            uncompressed_size: 3,
            num_messages: 1,
            txn_id: None,
            source_message_id: None,
        });
        let waker = futures_task_waker();
        let mut cx = Context::from_waker(&waker);
        // First poll: budget full → register on slab → Pending.
        assert!(matches!(Pin::new(&mut fut).poll(&mut cx), Poll::Pending));

        // Release the seed so the next poll proceeds through the success
        // branch of `try_reserve_memory_or_register` AND lands the
        // synchronous `conn.send` error.
        shared.release_memory(16);
        let outcome = Pin::new(&mut fut).poll(&mut cx);
        match outcome {
            Poll::Ready(Err(ClientError::Other(msg))) => {
                assert!(
                    msg.contains("send:"),
                    "expected `send:` error prefix, got {msg:?}",
                );
            }
            other => panic!("expected Ready(Err(Other(...))), got {other:?}"),
        }
        // The reservation must have been released along the error path.
        assert_eq!(
            shared
                .memory_used
                .load(std::sync::atomic::Ordering::Acquire),
            0,
            "Err arm must release the reservation it took",
        );
    }

    /// `ProducerBlock`: re-polling a `Reserving` future while the budget
    /// is still full must evict the prior slab entry before inserting a
    /// new one (line 549). Two polls park the same future twice; we
    /// assert the slab carries exactly one entry after the second poll
    /// (the prior slot must have been cancelled, not leaked).
    #[tokio::test(flavor = "current_thread")]
    async fn producer_block_re_park_cancels_prior_waker_slot() {
        use std::future::Future as _;
        use std::pin::Pin;
        use std::task::{Context, Poll};

        let cfg = ConnectionConfig {
            memory_limit_bytes: 4,
            memory_limit_policy: magnetar_proto::MemoryLimitPolicy::ProducerBlock,
            ..ConnectionConfig::default()
        };
        let shared = ConnectionShared::new(cfg);
        {
            let mut conn = shared.inner.lock();
            conn.begin_handshake().expect("handshake");
            let frame = handshake_response_bytes();
            conn.handle_bytes(Instant::now(), &frame)
                .expect("connected");
        }
        shared.try_reserve_memory(4).expect("seed budget");

        let handle = {
            let mut conn = shared.inner.lock();
            conn.create_producer(CreateProducerRequest {
                topic: "persistent://public/default/repark".to_owned(),
                ..Default::default()
            })
        };
        let producer: Producer<TokioProviders> = Producer::assemble(
            shared.clone(),
            handle,
            slot_for(&shared, handle),
            CompressionKind::None,
            None,
        );
        let mut fut = producer.send(OutgoingMessage {
            payload: Bytes::from_static(b"hi"),
            metadata: pb::MessageMetadata::default(),
            uncompressed_size: 2,
            num_messages: 1,
            txn_id: None,
            source_message_id: None,
        });
        let waker = futures_task_waker();
        let mut cx = Context::from_waker(&waker);
        // First poll: lands in `Reserving { slab_key: Some(_) }`.
        assert!(matches!(Pin::new(&mut fut).poll(&mut cx), Poll::Pending));
        assert_eq!(shared.memory_wakers.lock().len(), 1);
        // Second poll: the budget is still full, so the slow path
        // re-registers and evicts the prior slot (line 549).
        assert!(matches!(Pin::new(&mut fut).poll(&mut cx), Poll::Pending));
        assert_eq!(
            shared.memory_wakers.lock().len(),
            1,
            "re-park must cancel the prior waker before inserting a new one",
        );
    }

    /// Build a no-op `Waker` suitable for synchronously polling futures
    /// in tests. We rely on `tokio`'s public re-export rather than
    /// hand-rolling unsafe raw-waker glue. `tokio::sync::Notify` already
    /// drives the production wake path; this helper is test-only so we
    /// can drive `SendFut::poll` deterministically without spinning up
    /// the executor.
    fn futures_task_waker() -> std::task::Waker {
        // `noop_waker` is stable via `std::task::Waker::noop`
        // (Rust 1.85+). The workspace MSRV is 1.85 per ADR-0007 so we
        // can use it directly.
        std::task::Waker::noop().clone()
    }

    /// `last_sequence_id_published` reports `-1` until the broker has
    /// acked at least one send. Mirrors the tokio runtime's
    /// `Producer::last_sequence_id_published`. ADR-0024 1:1 mirror.
    #[tokio::test(flavor = "current_thread")]
    async fn last_sequence_id_published_defaults_to_minus_one() {
        let shared = handshake_complete_shared();
        let handle = {
            let mut conn = shared.inner.lock();
            conn.create_producer(CreateProducerRequest {
                topic: "persistent://public/default/last-seq-pub".to_owned(),
                ..Default::default()
            })
        };
        let slot = slot_for(&shared, handle);
        let producer: Producer<TokioProviders> =
            Producer::assemble(shared, handle, slot, CompressionKind::None, None);
        assert_eq!(
            producer.last_sequence_id_published(),
            -1,
            "no broker ack yet → -1 (parity with tokio engine + Java)"
        );
    }

    /// `batch_len` reports `0` on a producer opened without batching.
    /// Mirrors the tokio runtime's `Producer::batch_len`. ADR-0024 1:1.
    #[tokio::test(flavor = "current_thread")]
    async fn batch_len_reports_zero_when_batching_disabled() {
        let shared = handshake_complete_shared();
        let handle = {
            let mut conn = shared.inner.lock();
            conn.create_producer(CreateProducerRequest {
                topic: "persistent://public/default/batch-len".to_owned(),
                ..Default::default()
            })
        };
        let slot = slot_for(&shared, handle);
        let producer: Producer<TokioProviders> =
            Producer::assemble(shared, handle, slot, CompressionKind::None, None);
        assert_eq!(
            producer.batch_len(),
            0,
            "batching disabled → batch_len == 0"
        );
    }

    /// `batch_bytes` reports `0` on a producer opened without batching.
    /// Mirrors the tokio runtime's `Producer::batch_bytes`. ADR-0024 1:1.
    #[tokio::test(flavor = "current_thread")]
    async fn batch_bytes_reports_zero_when_batching_disabled() {
        let shared = handshake_complete_shared();
        let handle = {
            let mut conn = shared.inner.lock();
            conn.create_producer(CreateProducerRequest {
                topic: "persistent://public/default/batch-bytes".to_owned(),
                ..Default::default()
            })
        };
        let slot = slot_for(&shared, handle);
        let producer: Producer<TokioProviders> =
            Producer::assemble(shared, handle, slot, CompressionKind::None, None);
        assert_eq!(
            producer.batch_bytes(),
            0,
            "batching disabled → batch_bytes == 0"
        );
    }
}