google-cloud-storage 1.14.0

Google Cloud Client Libraries for Rust - Storage
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
# `open_object` — A Walkthrough

*Last updated: 2026-05-27 (commit 30d28f9a5)*

This guide offers a walkthrough of `Storage::open_object` in the
`google-cloud-storage` crate. We'll look at what it actually does under the
hood, trace through each layer of the stack, see what remains running after the
call returns, and dive deep into reference sections for the trickier components.

All file paths here are relative to the `google-cloud-rust/` repository root,
and any line numbers correspond to the current state of the source code on disk.

## How This Document Was Created

This doc was written with AI assistance.

1. First, I used the following prompt to create an initial draft.

   > I want to understand `open_object`. Starting with a call to
   > `Client::open_object`, trace execution all the way until the function
   > returns. Pay particular attention to `src/storage/bidi` as that is where I
   > believe the bulk of the code lies. Ground every citation by annotating
   > every code snippet and claim with file name and line numbers.

1. Next, I asked AI to teach the walkthrough by breaking it down into small,
   conversational chunks. This made it easier to digest the material, and also
   made the job of double checking and challenging the AI much easier. With the
   source code open beside me, I challenged the AI often, asking it to explain,
   substantiate, or revise its claims. Whenever I felt appropriate, I asked the
   AI to update the walkthrough doc with its updated understanding.

1. Once this was done and I had a reasonable understanding of `open_object`, I
   got the AI to clean up its work. I prompted it to clean up its bombastic
   language (e.g. "the spectacle of the `ActiveRead`s operating in perfect
   synchrony with the user-facing `ReadObjectResponse`s is truly a marvel to
   behold" became "the `ActiveRead`s push the newly received chunk of data to
   the corresponding `ReadObjectResponse`"), check its file sources/line
   numbers, tighten its prose, etc. This was done several times, clearing the
   context each iteration. Doing so halved the length of the document.

   I did this by alternating between a "review" prompt:

   > You are an expert in Rust, the google-cloud-rust repository, and a highly
   > skilled technical writer. Here is a document written to understand the
   > `open_object` flow. I suspect that it is poorly written and contains
   > numerous factual inaccuracies. Please read the document and tell me all the
   > ways the doc is inaccurate. Check every single code citation rigorously,
   > ensuring the cited line numbers and files are correct. Check whether the
   > document accurately represents the flow. Tell me all the ways the doc
   > misrepresents the code.

   and a "rewriting" prompt:

   > Please rewrite the document so that the language is clean, precise, and
   > direct. Prefer simple sentence structures and clean language.

1. I naively assumed that correcting the AI in conversation would result in it
   saving the corrected knowledge to the document. I was wrong - human review of
   the final was still necessary. To make this task more manageable, the
   document was checked in piecemeal.

1. Over time, the underlying code may change. I run this prompt periodically to
   keep the walkthrough up to date:

   > You are an expert in Rust and the `google-cloud-rust` codebase. Update the
   > `open_object` walkthrough to match the latest implementation on disk. Trace
   > execution from `Client::open_object` through `src/storage/bidi` until
   > completion. Check every single code citation rigorously, ensuring the cited
   > line numbers and files are correct. Check whether the document accurately
   > represents the flow. Tell me all the ways the doc misrepresents the code.

## Table of Contents

- [Mental Model: A Persistent Object Handle]#mental-model-a-persistent-object-handle
- [Files Involved]#files-involved
- [End-to-End Call Graph]#end-to-end-call-graph
- [Part 1 - The Linear Execution Trace]#part-1---the-linear-execution-trace
  - [1. `open_object()` Returns a Request Builder — Lazy, No I/O]#1-open_object-returns-a-request-builder--lazy-no-io
  - [2. `.send()` / `.send_and_read()`]#2-send--send_and_read
  - [3. The Stub Trait — The Mock Seam]#3-the-stub-trait--the-mock-seam
  - [4. The Transport Routes on Tracing]#4-the-transport-routes-on-tracing
  - [5. `open_object_plain` — Four Crucial Lines]#5-open_object_plain--four-crucial-lines
  - [6. `into_parts` — Splitting the Request]#6-into_parts--splitting-the-request
  - [7. `Connector::new` — Armed but Not Fired]#7-connectornew--armed-but-not-fired
  - [8. `ObjectDescriptorTransport::new` — Orchestration (Part 1: Prep)]#8-objectdescriptortransportnew--orchestration-part-1-prep
  - [9. `connect()` — The Retry and Self-Heal Loop]#9-connect--the-retry-and-self-heal-loop
  - [10. `connect_attempt` — Establishing the Connection]#10-connect_attempt--establishing-the-connection
  - [11. `ObjectDescriptorTransport::new` — Orchestration (Part 2)]#11-objectdescriptortransportnew--orchestration-part-2
  - [12. The Climb Back Up]#12-the-climb-back-up
  - [13. After `send()` / `send_and_read()` Returns]#13-after-send--send_and_read-returns
- [Part 2 — Reference and Deep Dives]#part-2--reference-and-deep-dives
  - [A. The Client Type and the Stub Seam]#a-the-client-type-and-the-stub-seam
  - [B. `ObjectDescriptor` Anatomy]#b-objectdescriptor-anatomy
  - [C. The Wire Types]#c-the-wire-types
  - [D. The Channels]#d-the-channels
  - [E. The gRPC Transport Layer]#e-the-grpc-transport-layer
  - [F. Trailers, Status, and Errors]#f-trailers-status-and-errors
  - [G. `ActiveRead` vs `ReadObjectResponse`]#g-activeread-vs-readobjectresponse
  - [H. The Two "Metadata"s]#h-the-two-metadatas
  - [I. The Worker's `run` Loop, Branch by Branch]#i-the-workers-run-loop-branch-by-branch
  - [J. The Range-Type Stack]#j-the-range-type-stack
  - [K. Redirects and the Resilience Machinery]#k-redirects-and-the-resilience-machinery

## Mental Model: A Persistent Object Handle

`open_object` isn't a one-off "fetch data" request - it opens a **bidirectional
gRPC streaming RPC** (`google.storage.v2.Storage/BidiReadObject`) that enables
concurrent, repeated reads. When the user makes a single call to `open_object`,
here is what happens:

1. **Initiate the connection:** It establishes a single, long-lived
   bidirectional stream to Google Cloud Storage (GCS).
1. **Server responds with object metadata:** The server identifies itself, and
   the very first message it sends back contains the object's metadata.
1. **The stream stays open:** A background **`Worker` task** is spawned to own
   and manage the stream for its entire lifetime.
1. **The user gets a handle:** The user receives an `ObjectDescriptor`, which
   acts as the user's handle. The user can use it to request multiple
   byte-ranges over time, and all these requests are multiplexed over that
   single underlying stream.

In some ways this resembles a pre-cell phone era phone call:

| Metaphor                                     | Real Thing                                                         |
| -------------------------------------------- | ------------------------------------------------------------------ |
| The open line                                | The bidirectional gRPC stream                                      |
| The operator keeping it alive                | The spawned `Worker` task                                          |
| The user's handset wire to the operator      | The **read-request channel** (which carries `ActiveRead` requests) |
| A dedicated answer line per request          | The per-range **byte channels** (which carry `Result<Bytes>`)      |
| The operator's memory of who the user dialed | The `Arc<Mutex<BidiReadObjectSpec>>` (used for redials/reconnects) |
| The handle in the user's hand                | The `ObjectDescriptor`                                             |

By the time `open_object` returns, three distinct things are alive: the gRPC
stream itself, the detached worker task running in the background, and the
user's descriptor handle (which communicates with the worker over a channel).

## Files Involved

Here's a quick map of the relevant files and the layers they represent:

| Layer                                            | File                                                            |
| ------------------------------------------------ | --------------------------------------------------------------- |
| Public client & `open_object` method             | `src/storage/src/storage/client.rs`                             |
| `DefaultStorage` alias                           | `src/storage/src/lib.rs`                                        |
| Request builder (`OpenObject`)                   | `src/storage/src/storage/open_object.rs`                        |
| App request type & `into_parts`                  | `src/storage/src/model_ext/open_object_request.rs`              |
| Stub trait (mock seam)                           | `src/storage/src/storage/stub.rs`                               |
| Default implementation (routing & orchestration) | `src/storage/src/storage/transport.rs`                          |
| Public `ObjectDescriptor`                        | `src/storage/src/object_descriptor.rs`                          |
| Descriptor stub trait & dynamic bridge           | `src/storage/src/storage/bidi/stub.rs`                          |
| Connect and reconnect logic (the real RPC)       | `src/storage/src/storage/bidi/connector.rs`                     |
| Descriptor transport (orchestration)             | `src/storage/src/storage/bidi/transport.rs`                     |
| Background stream pump                           | `src/storage/src/storage/bidi/worker.rs`                        |
| Per-range read state                             | `src/storage/src/storage/bidi/active_read.rs`                   |
| Redirect handling                                | `src/storage/src/storage/bidi/redirect.rs`                      |
| `Client` trait / tonic glue                      | `src/storage/src/storage/bidi.rs`                               |
| Generic gRPC client                              | `src/gax-internal/src/grpc.rs`                                  |
| Generic retry loop                               | `src/gax/src/retry_loop_internal.rs`                            |
| Generated protobufs                              | `src/storage/src/generated/protos/storage/google.storage.v2.rs` |

## End-to-End Call Graph

Here is the high-level execution trace:

| Call Stack / Action                 | Location                           | Description                                   |
| :---------------------------------- | :--------------------------------- | :-------------------------------------------- |
| `client.open_object(b, o)`          | `client.rs:271`                    | build `OpenObject` (no I/O happens here)      |
| -> `.send()`                        | `open_object.rs:66`                | go async, call the stub                       |
| -> `stub::open_object`              | `stub.rs:76`                       | trait seam (perfect for mocking)              |
| -> `transport open_object`          | `transport.rs:318`                 | tracing enabled? -> routes to `_plain`        |
| -> `open_object_plain`              | `transport.rs:219`                 | `into_parts` + `Connector` + `Transport::new` |
| -> `into_parts`                     | `open_object_request.rs:150`       | splits request -> (spec, ranges)              |
| -> `Connector::new`                 | `connector.rs:75`                  | armed, still no I/O                           |
| -> `ObjectDescriptorTransport::new` | `bidi/transport.rs:34`             | **ORCHESTRATION BEGINS**                      |
| -> `(read-request channel)`         | `bidi/transport.rs:44`             |                                               |
| -> `connect` -> `connect_attempt`   | `connector.rs:84,140`              | OPENS STREAM, reads 1st msg                   |
| -> `client.start`                   | `bidi.Connector::connect`          |                                               |
| -> `bidi_stream_with_status`        | `src/gax-internal/src/grpc.rs:198` |                                               |
| -> `inner.streaming`                | `src/gax-internal/src/grpc.rs:223` | \<- tonic / hyper / h2 / socket               |
| -> `map_ranges`                     | `bidi/transport.rs:74`             | creates byte channels                         |
| -> `Worker::new` + `spawn(run)`     | `bidi/transport.rs:58,63`          | detaches background pump                      |
| \<- `(ObjectDescriptor, readers)`   |                                    | Return values                                 |

### Sequence Diagram

```mermaid
sequenceDiagram
    autonumber
    participant App
    participant Client
    participant Stub
    participant Transport
    participant Bidi as Bidi Transport
    participant Conn as Connector
    participant gRPC

    App->>Client: open_object(bucket, object)
    Note right of Client: client.rs:271<br/>build OpenObject, no I/O
    Client-->>App: OpenObject builder
    
    App->>Client: .send()
    Note right of Client: open_object.rs:66
    Client->>Stub: open_object()
    Note right of Stub: stub.rs:76<br/>mock seam
    Stub->>Transport: open_object()
    Note right of Transport: transport.rs:318<br/>routes to _plain
    
    Transport->>Transport: open_object_plain()
    Note right of Transport: transport.rs:219
    Transport->>Transport: into_parts()
    Note right of Transport: splits request (spec, ranges)
    
    Transport->>Conn: Connector::new()
    Note right of Conn: connector.rs:75<br/>armed, still no I/O
    
    Transport->>Bidi: ObjectDescriptorTransport::new()
    Note right of Bidi: bidi/transport.rs:34<br/>ORCHESTRATION BEGINS
    
    Bidi->>Bidi: create read-request channel (line 44)
    
    Bidi->>Conn: connect() -> connect_attempt()
    Note right of Conn: connector.rs:84,140
    Conn->>gRPC: client.start() -> bidi_stream_with_status()
    Note right of gRPC: grpc.rs:198<br/>OPENS STREAM, reads 1st msg
    
    Bidi->>Bidi: map_ranges() byte channels (line 74)
    Bidi->>Bidi: Worker::new() + spawn(run)
    Note right of Bidi: lines 58,63<br/>detaches background pump
    
    Bidi-->>Transport: (ObjectDescriptor, readers)
    Transport-->>App: ObjectDescriptor
```

### Flowchart

```mermaid
graph TD
    A["client.open_object (client.rs:271)<br/><i>build OpenObject (no I/O)</i>"] --> B[".send() (open_object.rs:66)"]
    B --> C["stub::open_object (stub.rs:76)<br/><i>trait seam (perfect for mocking)</i>"]
    C --> D["transport open_object (transport.rs:318)<br/><i>tracing enabled? routes to _plain</i>"]
    D --> E["open_object_plain (transport.rs:219)"]
    
    E --> F["into_parts (open_object_request.rs:150)<br/><i>splits request -> (spec, ranges)</i>"]
    E --> G["Connector::new (connector.rs:75)<br/><i>armed, still no I/O</i>"]
    E --> H["ObjectDescriptorTransport::new (bidi/transport.rs:34)<br/><b>ORCHESTRATION BEGINS</b>"]
    
    H --> I["(read-request channel) (bidi/transport.rs:44)"]
    H --> J["connect -> connect_attempt (connector.rs:84,140)"]
    J --> K["client.start -> bidi_stream_with_status<br/><i>src/gax-internal/src/grpc.rs:198</i><br/><b>OPENS STREAM, reads 1st msg</b>"]
    
    H --> L["map_ranges (byte channels) (bidi/transport.rs:74)"]
    H --> M["Worker::new() + spawn(run) (bidi/transport.rs:58,63)<br/><i>detaches background pump</i>"]
    
    H -.-> N["Return: (ObjectDescriptor, readers)"]
```

# Part 1 - The Linear Execution Trace

Let's walk through the execution step-by-step.

## 1. `open_object()` Returns a Request Builder — Lazy, No I/O

We start in `client.rs:271`:

```rust
pub fn open_object<B, O>(&self, bucket: B, object: O) -> OpenObject<S>
```

The body of this function is just a single line at `:276`:
`OpenObject::new(self.stub.clone(), bucket, object, self.options.clone())`. It
clones `self.options` so that per-request overrides don't mutate the underlying
client.

Here's the definition of the `OpenObject` struct (`open_object.rs:43`):

```rust
pub struct OpenObject<S = crate::storage::transport::Storage> {
    stub: Arc<S>,
    request: OpenObjectRequest,
    options: RequestOptions,
}
```

(If you are wondering what `S` is, check out **Reference A**.)

The returned `OpenObject` is a request builder which provides the following
setters:

- **Request fields:**
  - `set_generation` (`:142`)
  - `set_if_generation_match` (`:162`)
  - `set_if_generation_not_match` (`:186`)
  - `set_if_metageneration_match` (`:209`)
  - `set_if_metageneration_not_match` (`:232`)
  - `set_key` (`:256`, used for CSEK)
- **Options:** behavior can be configured with:
  - `with_retry_policy` (`:284`)
  - `with_backoff_policy` (`:308`)
  - `with_retry_throttler` (`:338`)
  - `with_read_resume_policy` (`:365`)
  - `with_attempt_timeout` (`:396`, which defaults to 60s)
  - `with_user_agent` (`:415`)
  - `with_quota_project` (`:439`)

Up to this point, there has been zero network activity.

## 2. `.send()` / `.send_and_read()`

The action starts when the user invokes `send()` at `open_object.rs:66`:

```rust
pub async fn send(self) -> Result<ObjectDescriptor> {
    let (descriptor, _) = self.stub.open_object(self.request, self.options).await?;  // :67
    Ok(descriptor)                                                                   // :68
}
```

This method calls the stub—finally initiating network activity—and notably
**discards the `Vec<ReadObjectResponse>` readers** by using `_`.

There's also a sibling method, `send_and_read` (`:91`). This method pushes a
single range first (`:95`) and retains the single reader returned by the stub
(`:98`). It enforces that only one reader exists via `unreachable!` (`:102`).
This is the preferred path for an "open plus first read in a single round trip"
operation.

## 3. The Stub Trait — The Mock Seam

The `open_object` trait method called on the stub is defined in `stub.rs:76`:

```rust
fn open_object(&self, _request, _options)
    -> impl Future<Output = Result<(Descriptor, Vec<ReadObjectResponse>)>> + Send {
    unimplemented_stub::<(Descriptor, Vec<ReadObjectResponse>)>()   // :82 → :104 unimplemented!()
}
```

`self.stub` is typed as `Arc<S: stub::Storage>`. The stub is provided to make it
easier to provide a mock implementation for testing. (See **Reference A** for
more details.)

## 4. The Transport Routes on Tracing

The production trait implementation is located inside `transport.rs:318` (which
is part of `impl super::stub::Storage for Storage`, starting at `:267`):

```rust
async fn open_object(&self, request, options) -> Result<(...)> {
    if self.tracing { return self.open_object_tracing(request, options).await; }  // :323
    self.open_object_plain(request, options).await                                // :326
}
```

If tracing is enabled, `open_object_tracing` (`:231`) wraps the call. It sets up
a `client_request` span tagged with
`google.storage.v2.Storage/BidiStreamingRead` (`:245`), forwards to the `_plain`
variant, and wraps the resulting descriptor and readers with tracing decorators
(`:253–263`). For this walkthrough, we'll follow the `_plain` path.

> ⚠️ *A quick note on naming:* There are two structs named `Storage`. One is the
> **client** `Storage<S>` (`client.rs:92`), and the other is the **transport**
> `Storage` (`transport.rs:49`, which is aliased to `DefaultStorage`). Here,
> `self` refers to the transport.

## 5. `open_object_plain` — Four Crucial Lines

In `transport.rs:219`, the core logic boils down to four lines:

```rust
let (spec, ranges) = request.into_parts();                              // :224  decompose the request into request spec + desired ranges
let connector = Connector::new(spec, options, self.inner.grpc.clone()); // :225  specify the connection parameters
let (transport, readers) =
    ObjectDescriptorTransport::new(connector, ranges).await?;           // :226  establish the connection and send the initial RPC
Ok((ObjectDescriptor::new(transport), readers))                         // :227  wrap the I/O mechanism & return handles to the incoming byte streams
```

Line `:226` is where the network is finally touched.

(Note `self.inner.grpc` references the shared gRPC client attached to
`StorageInner`.)

## 6. `into_parts` — Splitting the Request

Looking at `open_object_request.rs:150`:

```rust
pub(crate) fn into_parts(mut self) -> (BidiReadObjectSpec, Vec<ReadRange>) {
    let ranges = std::mem::take(&mut self.ranges);   // :151
    (BidiReadObjectSpec::from(self), ranges)         // :152
}
```

`std::mem::take` swaps the `ranges` Vec out of the request with any empty Vec,
so that `self` remains whole. This allows `self` to be consumed by `From::from`
on the next line.

(`let ranges = self.ranges` would have resulted in a partial move of `self`,
preventing it from being passed into `BidiReadObjectSpec::from`.)

**Why do we split the request?** The `spec` represents the *connection identity
and conditions* (which are long-lived and resent on every reconnect). The
`ranges` represent the *work* (which is transient; more ranges can be added
later via `read_range`). Consequently, they are routed differently: `spec` goes
to the `Connector` (`:225`), while `ranges` go directly to the transport
(`:226`). This cleanly mirrors the protobuf wire format, where the request
message has distinct `read_object_spec` and `read_ranges` fields (see
**Reference C**).

## 7. `Connector::new` — Armed but Not Fired

In `connector.rs:75`, the connector wraps the `spec` in an `Arc<Mutex<...>>`
(`:77`), stores the options and client, and initializes `reconnect_attempts` to
`0` (`:80`). There's **no I/O** happening here. The struct definition (`:62`)
looks like this:

```rust
pub struct Connector<T = GrpcClient> {
    spec: Arc<Mutex<BidiReadObjectSpec>>,   // mutable, shared identity
    options: RequestOptions,
    client: T,                              // generic for mocking; real = gaxi GrpcClient
    reconnect_attempts: u32,
}
```

The connector's job is: *"Establishes (and reconnects) bidi streaming reads."*
(`:45`).

Handling reconnects inherently requires state. The connector must remember the
generation, read handle, routing token, and the number of attempt counts. We use
an `Arc<Mutex>` because the spec is **mutated** after creation (the server fills
in the generation, handle, and token) and must be **shared** across both the
connect retry loop and the worker's reconnect path.

## 8. `ObjectDescriptorTransport::new` — Orchestration (Part 1: Prep)

Moving to `bidi/transport.rs:34`, we do some prep work before establishing the
connection:

```rust
let (tx, rx) = tokio::sync::mpsc::channel(100);                    // :44  new read request channel
let requested_ranges = ranges.into_iter().map(|r| r.0).collect::<Vec<_>>(); // :45  unwrap newtype
let proto_ranges = requested_ranges.iter().enumerate()
    .map(|(id, r)| r.as_proto(id as i64)).collect::<Vec<_>>();              // :46–50  mint read-ids
```

- **Line `:44`** creates the **read-request channel** between the descriptor and
  the worker. The descriptor holds onto `tx`, while `rx` will be handed to the
  worker (`:63`). This channel carries new read requests (`ActiveRead`).
- **Lines `:46–50`** assign an index to each range, establishing its
  **read-id**. This identifier acts as the address used to reliably route
  responses back to the correct reader. (For a deep dive into all the channels,
  see **Reference D**.)

## 9. `connect()` — The Retry and Self-Heal Loop

Over in `connector.rs:84`, we find `connect()`. It's actually **a wrapper**
around the `connect_attempt` function, rather than the connect operation itself.

Inside, the `inner` closure (`:98`) calculates
`attempt_timeout = min(per-attempt, remaining-budget)` (fulfilling the promise
in the `with_attempt_timeout` docs, at `:99`) and wraps a single
`connect_attempt` in a `tokio::time::timeout`.

The process is driven by `retry_loop` (`:107`): it continuously attempts the
call as long as the retry policy permits, success hasn't been achieved, and the
throttler allows it, sleeping for the appropriate backoff duration between
attempts. The flag `idempotent` is set to `true` (`retry_loop_internal.rs:54`)
because opening a read is safely side-effect-free and retryable. Notably, **this
exact same `connect()` function is reused by the worker whenever a reconnect is
needed (`connector.rs:125`).**

## 10. `connect_attempt` — Establishing the Connection

In `connector.rs:140`, the actual connection is established in two phases.

**Phase A — Composing and sending the opening message:**

```rust
let request = BidiReadObjectRequest {
    read_object_spec: Some((*spec.lock()...).clone()),   // :147  snapshot the request spec
    read_ranges: ranges,                                 // :148
};
// :150–176  validate bucket is `projects/_/buckets/*`, else BindingError BEFORE any network
// :177–183  build x-goog-request-params = bucket=… (+ &routing_token=… if set)
let (tx, rx) = tokio::sync::mpsc::channel::<BidiReadObjectRequest>(100);  // :185  wire-out (mpsc)
tx.send(request.clone()).await...;                                       // :186  preload 1st msg
// :188–197  GrpcMethod + path /google.storage.v2.Storage/BidiReadObject
let response = client.start(extensions, path, rx, options, &X_GOOG_API_CLIENT_HEADER,
                            &x_goog_request_params).await?;              // :199  OPEN THE STREAM
```

Here, `client.start` (`bidi.rs:69`) takes `rx`, wraps it in a `ReceiverStream`,
and invokes `bidi_stream_with_status` (refer to **Reference E**).

Note that if a connection is successfully established, `read_object_spec` is
updated to contain the object's generation and RPC read handle
(`connector.rs:221-226`). `reconnect()` uses `connect()` under the hood, and so
`:147` ensures that any reconnect attempt uses this enriched information.

**Phase B — The Handshake:**

```rust
let response = match response { Ok(r) => r, Err(status) => return Err(handle_redirect(spec, status)) }; // :211
let (metadata, mut stream, _) = response.into_parts();   // :216  metadata=transport headers
let headers = metadata.into_headers();                   // :217
match stream.next_message().await {                      // :218  read the FIRST message
    Ok(Some(m)) => {
        let mut guard = spec.lock()...;                  // :220  ENRICH the spec:
        if let Some(g) = m.metadata.as_ref().map(|o| o.generation) { guard.generation = g; }  // :222
        if m.read_handle.is_some() { guard.read_handle = m.read_handle.clone(); }             // :225
        Ok((m, headers, Connection::new(tx, stream)))    // :227
    }
    Ok(None) => Err(Error::io("...closed before start")), // :229
    Err(status) => Err(handle_redirect(spec, status)),    // :230
}
```

It is required that the first message carries the object metadata (this is
strictly enforced in step 11). The writes at `:222` and `:225` represent the
**enrichment** phase. If the app initiated the request with `generation: 0`
(meaning "latest"), we capture the generation number returned by the server,
alongside the read handle to make future reconnects cheaper.

This block returns `(initial_response, headers, connection)`. This matches
exactly what `connect()` outputs, and what the `connector.connect(...)` call in
step 11 returns.

> *Clarification:* Don't confuse the two uses of the word "metadata" here. The
> first `metadata` that is turned `into_headers` represents the **transport**
> metadata (the gRPC response headers). On the other hand, `m.metadata` is the
> actual **object** metadata (the `Object` protobuf). See **Reference H** for
> details.

## 11. `ObjectDescriptorTransport::new` — Orchestration (Part 2)

Returning to `bidi/transport.rs:51`, armed with our
`(initial, headers, connection)`:

```rust
let (mut initial, headers, connection) = connector.connect(proto_ranges).await?;  // :51
let object = FromProto::cnv(initial.metadata.take().ok_or_else(|| {
    Error::deser("initial response in bidi read must contain object metadata") })?)...;  // :52–55
let object = Arc::new(object);                                                    // :56
let (active, readers) = Self::map_ranges(requested_ranges, &tx, &object);         // :57
let mut worker = super::worker::Worker::new(connector, active);                   // :58
worker.handle_response_success(initial).await.map_err(Error::io)?;                // :59–62
let _handle = tokio::spawn(worker.run(connection, rx));                           // :63
Ok((Self { object, headers, tx }, readers))                                       // :64–71
```

Let's break down this crucial block:

- **Lines `:52–56`:** We use `initial.metadata.take()` - the `Option` equivalent
  of `mem::take` - stealing the field while allowing `initial` to be reused at
  `:59`. We wrap the resulting `Object` in an `Arc` because it needs to be
  shared between the descriptor and every reader.
- **Line `:57`:** `map_ranges` (defined at `:74`, with the per-range channel
  created at `:82`) mints exactly one **byte channel per range**. This yields an
  `ActiveRead` (the sender side, kept by the worker) and a `RangeReader` wrapped
  as a `ReadObjectResponse` (the receiver side, handed to the user). See
  **Reference D** and **Reference G**.
- **Line `:58`:** We instantiate the worker. The `connector` is **moved in**
  here. Because the worker now completely owns the connector, it can freely call
  `connector.reconnect(...)` from within its detached task later on. Inside the
  worker, `active` is transformed into a `HashMap` mapping read ids to
  `ActiveRead`s (`worker.rs:38–43`).
- **Lines `:59–62`:** We immediately feed the worker the first message's
  `object_data_ranges`. This happens **synchronously**. For a standard `send()`,
  this might be a no-op, but for `send_and_read`, it delivers the very first
  chunk of data instantly.
- **Line `:63`:** The worker is handed off to be executed asynchronously via
  `tokio::spawn` and is completely **detached** (we deliberately drop the
  handle). It now has full ownership of the connection and will run its loop
  until the stream concludes naturally, an unrecoverable error strikes, or the
  descriptor's `tx` channel is dropped.
- **Lines `:64–71`:** Finally, we return the fully initialized transport
  (`object`, `headers`, `tx`) along with the `readers`.

## 12. The Climb Back Up

The call stack unwinds cleanly:

| Action                                                           | Location               | Description                                                |
| :--------------------------------------------------------------- | :--------------------- | :--------------------------------------------------------- |
| `ObjectDescriptorTransport::new` -> `(transport, readers)`       | `bidi/transport.rs:64` | Hands the established connection and range readers back up |
| `open_object_plain: ObjectDescriptor::new(transport)`            | `transport.rs:227`     | Wraps the transport stub into the public type              |
| `stub::open_object`                                              | `transport.rs:326`     | Hands the transport stub back up                           |
| **Either:** `.send()`: `let (descriptor, _) = …; Ok(descriptor)` | `open_object.rs:67`    | Discards the readers                                       |
| **Or:** `send_and_read()`                                        | `open_object.rs:98`    | Retains the descriptor plus the single reader              |

The user is now successfully holding an `ObjectDescriptor` (analogous to a file
descriptor) which allows reads on the object.

## 13. After `send()` / `send_and_read()` Returns

When the dust settles, three entities remain alive and active:

1. The user's `ObjectDescriptor` (holding the `tx` to the worker).
1. The detached `Worker` task (owning the connection, the connector, and the
   read-id HashMap).
1. The enriched spec, living safely behind its `Arc<Mutex>`.

When the user issues a new `ObjectDescriptor::read_range()`, a new byte channel
is minted and packaged into an `ActiveRead`. The `ActiveRead` is sent down the
read-request channel, is processed by Branch B of the worker loop, and gets
tagged with a fresh read-id. The worker issues a `BidiReadObjectRequest` to the
server, the server responds, the worker intercepts the chunks, looks up the
read-id in the HashMap, and pushes the bytes down the channel to the user via
the corresponding `RangeReader`, finally reaching the user through the
`ReadObjectResponse`.

# Part 2 — Reference and Deep Dives

## A. The Client Type and the Stub Seam

Let's look at `client.rs:92`:

```rust
pub struct Storage<S = crate::stub::DefaultStorage>
where S: crate::stub::Storage + 'static
{ stub: std::sync::Arc<S>, options: RequestOptions }
```

- **`S`** represents the **stub type**—the actual engine that executes the RPCs.
  It is bounded by the `stub::Storage` trait we first saw back in step 3.
- **Default `S = DefaultStorage`**: `DefaultStorage` is simply a handy alias for
  the transport (`pub use crate::storage::transport::Storage as DefaultStorage;`
  in `lib.rs:122`). This means if the user doesn't explicitly specify otherwise,
  `S` resolves to the real network transport (`transport::Storage`).
- **Why make it generic?** It's all about testing. The user can instantiate
  `Storage<MockStub>` to exercise the entire client chain against a mocked
  backend. The `from_stub` method (`client.rs:130`) exists explicitly for this
  purpose. Since `S` is a type parameter rather than a `dyn` trait object,
  method dispatch remains **static**—no virtual table overhead.

To use an analogy: if `open_object` is the act of placing a phone call, the
`stub` is the phone itself. In production, the user has a real phone wired
directly to GCS. In tests, the user swaps it for a toy phone that plays
pre-recorded responses.

## B. `ObjectDescriptor` Anatomy

The `object_descriptor.rs` file defines the public handle the user receives.
Structurally, it's a **hollow newtype wrapped around a trait object** (`:58`):

```rust
pub struct ObjectDescriptor { inner: Arc<dyn ObjectDescriptorStub> }
```

- `Arc` ensures it is cheap to `Clone`. If the user clones the descriptor, both
  copies share the same underlying open stream and worker task.
- `dyn` enables type erasure, letting the same handle wrap either the real
  transport or a mock implementation.
- The three methods exposed to the user—`object()` (`:79`), `read_range()`
  (`:112`), and `headers()` (`:133`)—simply forward their calls directly to
  `self.inner`. All the real behavioral logic lives inside the inner
  `ObjectDescriptorTransport`.
- The `new<T>` constructor (`:141`) takes any type `T: stub::ObjectDescriptor`
  and wraps it in the `Arc<dyn>`. Conversely, `into_parts` (`:150`) extracts and
  returns the raw `inner` object (which is how the tracing path unpacks it).

There are some excellent documentation notes at `:44–56`: the handle is
described as being *"analogous to a 'file descriptor'"*. It warns that reads
across different ranges have **no ordering guarantees**. Crucially, it
highlights a major footgun: if the user fails to drain data from one reader, its
buffer will fill up, which will **stall all other reads** sharing the stream.

Is `ObjectDescriptor` truly like a file descriptor? Yes, specifically one of the
**`pread` flavor**: the user opens it once, and then the user can issue multiple
positional `(offset, length)` reads. There is no internal seek pointer, and
dropping the descriptor is effectively closing it (as seen in `worker.rs:88`,
where the worker gracefully exits when the descriptor's `tx` channel
disappears). But it's also *more* powerful than a simple file descriptor: it
caches metadata in-process (`object()` doesn't require an I/O call), it is
**snapshot-pinned** to a specific object generation (`connector.rs:221`), and it
intelligently **self-heals** by reconnecting under the hood (`worker.rs:153`).

**The Two-Trait Pattern:** The user might notice that the trait used in the
`inner` field type (`:21`, `:59`) differs slightly from the trait bound on
`new<T>` (`:143`). Here is why:

- `crate::stub::ObjectDescriptor` (`stub.rs:92`): This is the **ergonomic**
  trait featuring native `async fn`. However, native async traits aren't
  dyn-compatible (object safe).
- `...bidi::stub::dynamic::ObjectDescriptor` (`bidi/stub.rs:24`): This is the
  **dyn-safe** variant, utilizing `#[async_trait]` to return boxed futures. This
  is what actually gets stored inside the `Arc<dyn>`.
- The bridge: At `bidi/stub.rs:31`, a blanket implementation automatically
  implements the dyn-safe trait for any type `T` that implements the ergonomic
  `stub::ObjectDescriptor`. This means implementors get the clean, modern API,
  while the library handles the messy type erasure internally.

## C. The Wire Types

Let's look at the data structures carrying the requests and responses.

**`OpenObjectRequest`** (`open_object_request.rs:30`): This is the user-friendly
container built by the application. It holds the bucket, object name,
generation, the four `if_*` preconditions, CSEK parameters, and the initial
`ranges`. It deliberately **omits** the `read_handle` and `routing_token` (as
noted in the comment at `:26`), because those values are strictly supplied by
the server.

**`BidiReadObjectSpec`** (`google.storage.v2.rs:410`): This is the generated
protobuf type representing the "which object and under what conditions" half of
the request. The fields can be grouped by where they originate:

- *App-supplied:*
  - `bucket` (`:412`)
  - `object` (`:414`)
  - `generation` (`:416`)
  - the four `if_*` conditions:
    - `if_generation_match` (`:418`)
    - `if_generation_not_match` (`:420`)
    - `if_metageneration_match` (`:422`)
    - `if_metageneration_not_match` (`:424`)
  - `common_object_request_params` (`:426`)
- *Server-supplied (filled in by the connector):*
  - `read_handle` (`:431`)
  - `routing_token` (`:433`)
- The `read_mask` field (`:429`) is marked `#[deprecated]`.
- The `generation` field actually switches ownership: it starts as the
  application's request (e.g., `0` for "latest"), but the connector overwrites
  it with the concrete value resolved by the server (`connector.rs:221`),
  effectively pinning the snapshot.

A note on the `#[prost(...)]` decoder: `tag = "N"` represents the wire field
number (names aren't transmitted, and tags may have gaps if fields were
deprecated). `Option<T>` maps to the protobuf `optional` keyword, distinguishing
an intentionally "unset" value from a literal `0`.

**`BidiReadObjectRequest`** (`:446`): This is the payload sent by the client
over the stream:

```rust
pub struct BidiReadObjectRequest {
    /// Sent exactly once per connection.
    /// Subsequent messages omit this.
    /// Resent during a reconnect.
    pub read_object_spec: Option<BidiReadObjectSpec>,
    /// The ranges to read; the user can ask for any number of chunks at any time.
    pub read_ranges: Vec<ReadRange>,
}
```

The data shape encodes the **streaming grammar**: the spec is an `Option` since
the user sends it **only once** per connection (the first message has `Some`,
subsequent ones send `None`, as seen in `worker.rs:204`; a reconnect will
re-send the enriched spec). The ranges are `repeated`, meaning the user can ask
for any number of chunks at any time.

**`BidiReadObjectResponse`** (`:463`): This is what the server streams back to
the client:

```rust
pub struct BidiReadObjectResponse {
    /// The raw bytes, tagged by their read-id.
    pub object_data_ranges: Vec<ObjectRangeData>,
    /// Object metadata.
    pub metadata: Option<Object>,
    /// A token returned by the server to identify the open read session;
    /// the client passes this back during reconnects to resume the stream.
    pub read_handle: Option<BidiReadHandle>,
}
```

## D. The Channels

The implementation uses multiple sender-receiver channel pairs.

| Name                                   | Carries                  | Direction            | Created At                            | `tx` Held By                                 | `rx` Held By                                | Purpose                                                                    |
| -------------------------------------- | ------------------------ | -------------------- | ------------------------------------- | -------------------------------------------- | ------------------------------------------- | -------------------------------------------------------------------------- |
| **Read-Request Channel**               | `ActiveRead`             | descriptor -> worker | `bidi/transport.rs:44`                | descriptor (`Self.tx`) + each reader (clone) | worker (`requests`)                         | Carries new `ActiveRead` intents from user code to the worker task.        |
| **Byte Channels** (one per read range) | `Result<Bytes>`          | worker -> one reader | `bidi/transport.rs:82` and `:100`     | worker (inside each `ActiveRead`)            | that specific reader (`RangeReader`)        | Data path delivering the raw object bytes back to a specific range reader. |
| **Wire-Out**                           | `BidiReadObjectRequest`  | client -> server     | `connector.rs:185` (preloaded `:186`) | worker (`Connection.tx`)                     | gRPC (as the request stream)                | Outbound network stream sending gRPC requests to the GCS server.           |
| **Wire-In**                            | `BidiReadObjectResponse` | server -> client     | returned from `client.start`          | n/a (managed by tonic)                       | worker (`Connection.rx`, tonic `Streaming`) | Inbound network stream receiving gRPC responses from the GCS server.       |

### Detailed Breakdown per Channel

#### 1. Read-Request Channel (Descriptor -> Worker)

| Property         | Detail                                                                                                                                                                                                                                           |
| :--------------- | :----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **Type**         | `tokio::sync::mpsc::Sender<ActiveRead>` / `tokio::sync::mpsc::Receiver<ActiveRead>`                                                                                                                                                              |
| **Created**      | `bidi/transport.rs:44`, with a capacity of `100`.                                                                                                                                                                                                |
| **`tx` Held By** | The `ObjectDescriptorTransport` (as its `tx` field) and cloned into each `RangeReader`.                                                                                                                                                          |
| **`rx` Held By** | The worker task, held locally inside `run` (named `requests`).                                                                                                                                                                                   |
| **Purpose**      | Whenever `descriptor.read_range(...)` is called, it pushes a new `ActiveRead` onto this channel. Branch B of the worker's `run` loop drains it via `recv_many`.                                                                                  |
| **Lifetime**     | Exactly one exists per `open_object` call. It lives as long as the descriptor itself or any active reader remains. When all `Sender` clones are dropped, Branch B observes `r == 0` (`bidi/worker.rs:89`), signaling the worker to exit cleanly. |

#### 2. Byte Channels (Worker -> Reader, One *Per Range*)

| Property         | Detail                                                                                                                                                                                                                                              |
| :--------------- | :-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **Type**         | `tokio::sync::mpsc::Sender<ReadResult<Bytes>>` / `tokio::sync::mpsc::Receiver<ReadResult<Bytes>>`                                                                                                                                                   |
| **Created**      | `bidi/transport.rs:82` inside `map_ranges` (for ranges established during connection) and at `bidi/transport.rs:100` for post-open `read_range` calls. Capacity is `100`.                                                                           |
| **`tx` Held By** | The `ActiveRead` corresponding to that specific range (stored in the worker's `self.ranges` HashMap).                                                                                                                                               |
| **`rx` Held By** | The `RangeReader` for that range (wrapped inside the `ReadObjectResponse` returned to the user).                                                                                                                                                    |
| **Purpose**      | When the worker receives object data from the server tagged with a specific read-id, it looks up the corresponding `ActiveRead` in its HashMap and pushes the bytes into that range's byte channel. The reader extracts them by awaiting `.next()`. |
| **Lifetime**     | One exists per active range. A `send_and_read(N)` call creates `N` byte channels. Every subsequent call to `read_range` adds one more.                                                                                                              |

#### 3. Wire-Out (Worker -> Tonic -> Server)

| Property         | Detail                                                                                                                                                                                                                                                                                             |
| :--------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **Type**         | `tokio::sync::mpsc::Sender<BidiReadObjectRequest>` / `tokio::sync::mpsc::Receiver<BidiReadObjectRequest>`                                                                                                                                                                                          |
| **Created**      | `connector.rs:185` (and preloaded at `:186`) during `connect_attempt`, with a capacity of `100`.                                                                                                                                                                                                   |
| **`tx` Held By** | The worker (stored in `Connection.tx`, unpacked into a local `tx` variable in `run`).                                                                                                                                                                                                              |
| **`rx` Held By** | Tonic, after being wrapped in a `ReceiverStream` and passed into `inner.streaming(...)` (`src/gax-internal/src/grpc.rs:198`).                                                                                                                                                                      |
| **Purpose**      | The worker pushes outbound `BidiReadObjectRequest` messages here. Tonic drains the receiver, serializes the messages with Prost, frames them, and pushes them down onto the HTTP/2 socket.                                                                                                         |
| **Lifetime**     | **One per connection attempt.** Whenever a reconnect occurs, a brand new pair is created, and the old one is discarded. (The opening message is preloaded via `tx.send(request.clone()).await` before the stream even starts, guaranteeing the server sees the spec on the very first DATA frame.) |

#### 4. Wire-In (Server -> Tonic -> Worker)

| Property          | Detail                                                                                                                                                                                                               |
| :---------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **Type**          | `tonic::Streaming<BidiReadObjectResponse>` (Note: this is tonic's custom stream type, not an mpsc pair, exposed via the `TonicStreaming` trait at `bidi.rs:38–49` for mockability).                                  |
| **Created**       | Returned directly from `inner.streaming(...)` (`src/gax-internal/src/grpc.rs:223`) and exposed via the `Connection` struct.                                                                                          |
| **Producer Side** | Owned and managed internally by Tonic as it buffers incoming HTTP/2 DATA frames.                                                                                                                                     |
| **Consumer Side** | Held by the worker (in `Connection.rx`, unpacked locally in `run`).                                                                                                                                                  |
| **Purpose**       | Branch A of the worker's loop calls `rx.next_message()` to pull responses one by one. It can return `Ok(Some(response))`, `Ok(None)` for a clean termination, or `Err(status)` for an error or a severed connection. |
| **Lifetime**      | **One per connection attempt.** A new connection guarantees a new wire-in stream.                                                                                                                                    |

### Counts at a Glance

If the user runs a `send_and_read` requesting `N` initial ranges on the very
first connection:

| Channel Type         | Count                                    | Survives Reconnect?                |
| -------------------- | ---------------------------------------- | ---------------------------------- |
| Read-Request Channel | 1                                        | Yes, survives without interruption |
| Byte Channels        | `N` (one for each range)                 | Yes, survive without interruption  |
| Wire-Out             | 1                                        | No, replaced entirely              |
| Wire-In              | 1 (technically a tonic stream, not mpsc) | No, replaced entirely              |

Every new `read_range` call adds precisely one new byte channel, leaving the
other counts unchanged. If the network drops and the worker successfully
reconnects, the user-facing read-request and byte channels survive without
interruption, while the wire-in and wire-out conduits are replaced entirely. In
other words, when a reconnect happens, only the network side gets rebuilt.

### The Overarching Pattern

- **Read-Request Channel:** The command path from "user code to worker."
- **Byte Channels:** The data path from "worker back to user code."
- **Wire-Out / Wire-In:** The transient network links managed via Tonic, totally
  disposable during a reconnect.

**Disambiguating the multiple `tx, rx` pairs in `map_ranges`:** At
`bidi/transport.rs:57`, the code passes `&tx` (the sender for the read-request
channel) into the function. The signature renames it to `requests` (`:76`).
Then, at `:82`, it creates a *new* channel `(tx, rx)` for the bytes, reusing the
exact same variable names. Consequently, inside the closure, `requests` is the
read-request sender (which gets cloned into each reader), and the new `tx` is
the byte channel sender (which is handed to the `ActiveRead`).

**Who holds what afterward:**

- **Descriptor:** The read-request channel `tx` (plus `object` and `headers`).
- **Worker:** The read-request channel `rx` (listening for new intents), the
  wire-out `tx` and wire-in stream (talking to the server), and a byte channel
  `tx` tucked inside each `ActiveRead` (pushing bytes to readers). Inside
  `worker.run` (`worker.rs:65`), the connection is destructured into `rx` and
  `tx`, again reusing the names.
- **Each Reader:** Its dedicated byte channel `rx` (to receive bytes) and a
  clone of the read-request channel `tx` (to poke the worker).

## E. The gRPC Transport Layer

The function `client.start` (`bidi.rs:69`) wraps the outbound `rx` channel into
a `ReceiverStream` and invokes `bidi_stream_with_status`
(`src/gax-internal/src/grpc.rs:198`). Let's peek inside:

| Source / Line                              | Expression / Call               | Description / Comment                     |
| :----------------------------------------- | :------------------------------ | :---------------------------------------- |
| `src/gax-internal/src/grpc.rs:212 -> :555` | `make_headers(…)`               | user-agent, x-goog-user-project, etc.     |
| `src/gax-internal/src/grpc.rs:213 -> :539` | `add_auth_headers(headers)`     | fetches the Bearer token                  |
| `src/gax-internal/src/grpc.rs:214`         | `MetadataMap::from_headers(…)`  | converts HTTP headers to gRPC metadata    |
| `src/gax-internal/src/grpc.rs:215`         | `tonic::Request::from_parts(…)` | packages metadata and the outbound STREAM |
| `src/gax-internal/src/grpc.rs:216`         | `ProstCodec::default()`         | the protobuf serializer/deserializer      |
| `src/gax-internal/src/grpc.rs:217`         | `self.inner.clone()`            | the underlying Tonic InnerClient at :71   |
| `src/gax-internal/src/grpc.rs:218`         | `inner.ready().await`           | waits for channel readiness               |
| `src/gax-internal/src/grpc.rs:223`         | `inner.streaming(req, path, …)` | **THE ACTUAL TONIC BIDI CALL**            |
| `src/gax-internal/src/grpc.rs:231`         | `Ok(result)`                    | returns the result                        |

Below `:223`, everything descends into the lower layers: Tonic -> Hyper ->
HTTP/2 -> raw socket.

**The Double `Result`:** Line `src/gax-internal/src/grpc.rs:206` returns
`Result<tonic::Result<…>>`.

- The **outer** result represents GAX-level errors (setup failures, header
  issues, auth errors, `ready` checks), propagated via `?`.
- The **inner** `tonic::Result` (which is `Result<_, tonic::Status>`) represents
  the actual outcome of the gRPC call.

While `bidi_stream` (`:168`) eagerly flattens the inner error using
`to_gax_error` (`:190`), `bidi_stream_with_status` deliberately preserves the
raw `Status`. This is crucial because **the Storage layer needs to extract
redirect details directly from the Status** (as documented at `:195–197`) so
that `connect_attempt` can gracefully handle `Err(status) -> handle_redirect`
(`connector.rs:211/230`).

## F. Trailers, Status, and Errors

gRPC trailers (such as `grpc-status`, `grpc-message`, and
`grpc-status-details-bin`) arrive **after** the message body. This
implementation never reads them as a raw metadata map; instead, they surface as
the **terminal result when reading the stream**.

Calling `next_message` (`bidi.rs:46`) invokes Tonic's `message()`, returning a
`Result<Option<T>, Status>`:

- `Ok(Some(msg))`: A normal data message (we're still inside the body).
- `Ok(None)`: A clean termination; the trailers reported `grpc-status: OK`.
- `Err(status)`: This indicates either an error trailer **or** a prematurely
  severed connection (Tonic synthesizes a `Status` for the latter). Because both
  scenarios look identical, the worker simply treats any `Err` by triggering a
  reconnect (`worker.rs:116`).

The user can parse the type signature as answering two questions: `Result` asks
"Did it end okay, or error?", while `Option` asks "Is there more data, or are we
done?".

The status's **details** (derived from the `grpc-status-details-bin` trailer)
are highly relevant. Tonic decodes them into `Status::details()`, and
`handle_redirect` (`redirect.rs:25`) uses Prost to decode them, extracting the
routing token and read handle into the spec (`:30–31`). Thus, trailer-borne
redirect payloads are the primary driver of reconnect routing.

**How the status reaches the user:** The function `to_gax_error` (imported from
`gaxi::grpc::from_status` at `redirect.rs:19` and called at `redirect.rs:36`)
converts the `tonic::Status` into a standard `gax::Error`, which `send()`
eventually returns. The user inspects this by reading `err.status().code` (for
example, the `open_object` test asserts that `s.code == Code::NotFound` at
`transport.rs:528`).

**Manually inspecting trailing metadata:** this is currently not done because
the codebase only cares about the status. If this is desired the user would need
to completely drain the body until `Ok(None)` and then invoke Tonic's
`Streaming::trailers()`. To implement it, the user would add a `trailers()`
method to the `TonicStreaming` trait (`bidi.rs:38`) and call it on the
clean-exit path after the worker's loop terminates (`worker.rs:74`/`:96`), while
`rx` is still fully owned.

## G. `ActiveRead` vs `ReadObjectResponse`

These are simply the two opposite ends of a single range's **byte channel**,
created in `map_range` (`transport.rs:93`) and `read_range`
(`transport.rs:113`).

**`ActiveRead`** (`active_read.rs:25`) is the **worker's** write-end and routes
data received over the network to its `ReadObjectResponse` counterpart.

```rust
pub(crate) struct ActiveRead {
    state: RemainingRange,                       // tracks how much of the range remains
    sender: Sender<ReadResult<bytes::Bytes>>,    // the WRITE end of the channel
}
```

- `handle_data` (`:41`): When a chunk arrives, it updates `state`, **verifies
  the crc32c checksum** (`:52–60`), and pushes the bytes.
- `as_proto(id)` (`:77`): Generates a proto range describing only the
  *remaining* bytes, ensuring a reconnect only asks for what hasn't been
  received yet (`active_read.rs:77`).
- `interrupted(error)` (`:67`): Pushes a terminal `Err` down the channel to
  intentionally fail the reader.

**`ReadObjectResponse`** (which wraps a `RangeReader`) is **the user's**
read-end. It simply holds the `Receiver`, yields sequential chunks via
`.next().await`, and contains no complex logic.

Because the channel inherently carries `Result<Bytes, ReadError>`, errors share
the exact same conduit as data. Thus, `reader.next().await` yields an
`Option<Result<Bytes>>` (`Some(Ok)` means a chunk, `Some(Err)` means failure,
and `None` means done). This is why, in
[step 11 of Part 1](#11-objectdescriptortransportnew--orchestration-part-2), the
`active` half is given to the worker while the `readers` half is handed back to
the user.

## H. The Two "Metadata"s

During the handshake inside `connect_attempt`, two distinct concepts are
confusingly both referred to as "metadata" just lines apart:

|                    | `headers` (`connector.rs:216`)                                        | `m.metadata` (`connector.rs:221`)          |
| ------------------ | --------------------------------------------------------------------- | ------------------------------------------ |
| **What it is**     | gRPC **response headers** (gRPC confusingly calls headers "metadata") | The **object's** actual storage properties |
| **Type**           | `MetadataMap` -> `HeaderMap`                                          | `Option<Object>`                           |
| **Extracted From** | The response **envelope** (head)                                      | The very first **body message**            |
| **Becomes**        | `descriptor.headers()`                                                | `descriptor.object()`                      |
| **Example**        | `content-type: application/grpc`, `x-guploader-uploadid`              | `generation: 123456`, `size: 42`           |

Think of it like an HTTP response: the envelope/headers represent transport
metadata wrapping the call itself, while the body carries the message stream.
The very first chunk in that body happens to carry the object's storage metadata
inside a field literally named `metadata`.

## I. The Worker's `run` Loop, Branch by Branch

The worker operates as the central hinge of the entire flow. It consumes
`ActiveRead`s from the read-request channel, pushes outbound messages to the
wire, processes returning responses, and routes them back to the user.

At `worker.rs:59`, after `Worker::new` initializes everything and the first
message is processed via `handle_response_success`, the worker is launched in
the background via `tokio::spawn(worker.run(connection, rx))`
(`bidi/transport.rs:63`). The `run` method functions as the perpetual background
pump.

### Skeleton of the Loop

```rust
pub async fn run(mut self, connection: Connection<C::Stream>, mut requests: Receiver<ActiveRead>) -> LoopResult<()> {
    let mut ranges = Vec::new();
    let (mut rx, mut tx) = (connection.rx, connection.tx);    // wire-in and wire-out
    let error = loop {
        tokio::select! {
            m = rx.next_message() => { ... }                   // :71  Branch A — Inbound
            r = requests.recv_many(&mut ranges, 16) => { ... } // :88  Branch B — Outbound
        }
    };
    drop(rx); drop(tx);
    let Some(e) = error else { return Ok(()); };
    while let Some(mut r) = requests.recv().await { ... }      // drain any straggling readers
    Err(e)
}
```

The `tokio::select!` macro races both arms every single iteration, running
whichever resolves first. Since it runs in a single task, only one arm executes
per iteration (there is no parallel execution inside the worker). The internal
`loop` acts as an expression: whatever value it `break`s with gets assigned to
`error`, which the post-loop cleanup code evaluates.

There are three exit paths:

- `break None`: A clean shutdown (the loop yields `None`).
- `break Some(e)`: A fatal error occurred.
- No break: Keep looping indefinitely.

After breaking, a clean exit returns `Ok(())`. An error exit aggressively drains
any pending `read_range` calls that were queued too late, failing them all via
`r.interrupted(e.clone())` before returning `Err(e)`.

### Branch A — Inbound (Wire-In -> Routing / Reconnect)

Located at `worker.rs:71–87`:

```rust
m = rx.next_message() => {
    match self.handle_response(m).await {
        None => break None,
        Some(Err(e)) => break Some(e),
        Some(Ok(None)) => {},
        Some(Ok(Some(connection))) => {
            (rx, tx) = (connection.rx, connection.tx);
        }
    };
},
```

**`rx.next_message()`** is bound to the `TonicStreaming` trait
(`bidi.rs:38–48`), which exists primarily to facilitate mocking. In production,
it wraps Tonic's `Streaming::message()` in one line. One call strictly yields
one message, not a batch.

It returns `Result<Option<BidiReadObjectResponse>, tonic::Status>`, encoding
three states:

- `Ok(Some(response))` — a healthy data message.
- `Ok(None)` — a clean stream termination.
- `Err(status)` — a stream error (could be an error trailer or a broken socket,
  both synthesized into a `Status` by Tonic).

**`handle_response`** (`worker.rs:110–122`):

```rust
pub async fn handle_response(
    &mut self,
    message: TonicResult<Option<BidiReadObjectResponse>>,
) -> Option<LoopResult<Option<Connection<C::Stream>>>> {
    let response = match message.transpose()? {
        Ok(r) => r,
        Err(status) => return self.reconnect(status).await,
    };
    match self.handle_response_success(response).await {
        Err(e) => Some(Err(e)),
        Ok(_) => Some(Ok(None)),
    }
}
```

`message.transpose()` flips `Result<Option<T>, E>` into `Option<Result<T, E>>`.
The trailing `?` unwraps the outer `Option`. If it encounters a `None`
(representing an `Ok(None)` clean stream end), it early-returns `None` out of
`handle_response`, which the run loop intercepts with its `None => break None`
arm.

If it receives `Err(status)`, it immediately routes to
`self.reconnect(status).await`. If it gets `Ok(Some(r))`, it passes it down to
`self.handle_response_success(r).await`.

`handle_response_success` (`:124`) hands off to `handle_ranges`, which loops
over `handle_range_data` to route bytes via the `self.ranges` HashMap into the
correct byte channel. Routing failure yields `Some(Err(e))` (triggering a fatal
`break Some(e)` in the run loop), while success yields `Some(Ok(None))`
(instructing the run loop to continue using the same connection).

The `reconnect` method (`:153`) iterates over every range in `self.ranges` and
executes `self.connector.reconnect(status, ranges)`. If the reconnect outright
fails, it yields a fatal `Some(Err(e))`. If it succeeds, it returns
`Some(Ok(Some(new_connection)))`, signaling the run loop to swap in the new wire
pair.

**The Four Match Arms in the Run Loop:**

| Arm                          | Meaning                                         | Action                            |
| ---------------------------- | ----------------------------------------------- | --------------------------------- |
| `None`                       | Clean termination of stream                     | `break None` — exit with success  |
| `Some(Err(e))`               | Fatal (corrupt data or total reconnect failure) | `break Some(e)` — exit with error |
| `Some(Ok(None))`             | Message routed successfully, connection healthy | fall through, continue loop       |
| `Some(Ok(Some(connection)))` | Reconnect succeeded, entirely new wire pair     | update `rx`/`tx`, continue loop   |

**The Reconnect Swap:**

```rust
(rx, tx) = (connection.rx, connection.tx);
```

Using destructuring assignment, both wire halves are atomically replaced in a
single line. On the next iteration, `rx.next_message()` reads from the new
stream, and `tx.clone()` uses the new sender.

### Branch B — Outbound (Read-Request Channel -> Wire-Out)

Located at `worker.rs:88–93`:

```rust
r = requests.recv_many(&mut ranges, 16) => {
    if r == 0 {
        break None;
    };
    self.insert_ranges(tx.clone(), std::mem::take(&mut ranges)).await;
},
```

**Understanding `recv_many`:** This performs a batched receive. There are two
distinct buffers at play here:

- The **channel's internal queue** (capacity `100`, configured at
  `bidi/transport.rs:44`). Senders continuously push into this queue while the
  worker is busy executing other tasks.
- The **Vec scratch buffer named `ranges`** (declared at `worker.rs:64`), passed
  here via mutable reference (`&mut`).

`recv_many(buf, max)` blocks until the queue holds at least one item, then
eagerly drains *up to* `max` items from the queue directly into `buf`, returning
the total count. If seven `read_range` calls queue up while the worker is busy
on Branch A, Branch B drains all seven simultaneously on its next pass. This
keeps latency low for the first item while boosting overall throughput.

The limit `16` restricts the drain count per cycle to keep the loop responsive
to incoming network data (Branch A). The `100` is the channel capacity,
providing backpressure headroom. These are parameters that can be tuned.

**`r == 0` — Graceful Shutdown:** This only evaluates to true when the channel
is **closed *and* empty**—meaning every single `Sender<ActiveRead>` clone has
been dropped *and* the internal queue is drained. Those clones reside in the
descriptor (`Self.tx`) and inside every `RangeReader` (`requests.clone()` at
`bidi/transport.rs:84`). Thus, this condition only triggers when the descriptor
and all active readers are destroyed. `break None` then exits the loop cleanly.
(A temporary lull in requests simply causes `recv_many` to sleep; it does *not*
return 0).

**Understanding `mem::take(&mut ranges)` (`worker.rs:92`)**:

`mem::take` functionally acts as `mem::replace(&mut v, T::default())`:

- It extracts and returns the current filled Vec.
- It leaves a brand new `T::default()` (which for a Vec is `Vec::new()`, a
  zero-capacity, allocation-free shell) in the slot.

It does **not** preserve the allocation between iterations. The memory
allocation rides along with the returned Vec into `insert_ranges` and is
deallocated when that Vec goes out of scope. The next iteration's `recv_many`
starts completely fresh with a zero-capacity Vec.

The reason for using `mem::take` here is **borrow-checker hygiene**. The
variable `ranges` must survive across the loop because the `recv_many` future
actively borrows `&mut ranges` each iteration. Moving the Vec out directly
(`insert_ranges(.., ranges)`) would leave the variable in an invalid, moved-from
state. `mem::take` safely swaps a fresh default in, satisfying the compiler in a
single expression.

**`tx.clone()` (`worker.rs:92`) Sidestepping Borrow Constraints:** Passing a
clone into `insert_ranges` allows that function to execute `.send(..).await`
without retaining a borrow of the outer `tx` variable across an `.await`
boundary. The worker's primary `tx` survives safely in the main `run` scope for
the next loop.

**Inside `insert_ranges` (`worker.rs:194–214`):**

```rust
async fn insert_ranges(&mut self, tx: Sender<BidiReadObjectRequest>, readers: Vec<ActiveRead>) {
    let mut ranges = Vec::new();                  // local proto buffer (distinct from outer `ranges`!)
    for r in readers {
        let id = self.next_range_id;              // :197  mint a fresh, monotonic id
        self.next_range_id += 1;
        let request = r.as_proto(id);             // :200  generate the proto for the *remaining* range
        self.ranges.lock().await.insert(id, r);   // :201  register the ActiveRead in the routing HashMap
        ranges.push(request);                     // :202  append the range proto to the outgoing wire message
    }
    let request = BidiReadObjectRequest {
        read_ranges: ranges,
        ..BidiReadObjectRequest::default()        // spec: None — it was already sent
    };
    if let Err(e) = tx.send(request).await {      // :211  ship it; failure is non-fatal
        tracing::error!("error sending read range request: {e:?}");
    }
}
```

For every single `ActiveRead`:

1. **Mint a fresh id.** `self.next_range_id` increments monotonically; ids are
   never reused over the worker's life. No atomics are required since
   `&mut self` provides exclusive access.
1. **Build the proto.** `r.as_proto(id)` extracts the read's current
   `RemainingRange`, meaning reconnects re-request only unreceived bytes
   (`active_read.rs:77`).
1. **Register in the HashMap.** Acquiring the `tokio::sync::Mutex`
   (`self.ranges.lock().await.insert(id, r)`) moves `r` into the routing table
   under its `id`, releasing the lock immediately at statement end.
1. **Collect the proto** into `ranges`.

After processing the loop, all protos are batched into a single
`BidiReadObjectRequest`. The `..BidiReadObjectRequest::default()` syntax fills
the remaining fields, most importantly setting `read_object_spec: None` - the
spec is only sent during the initial handshake or a full reconnect.

**Insert-Before-Send** Crucially, the HashMap insert (`:201`) happens *before*
the `tx.send` (`:211`). If the send succeeds, the routing is already prepped. If
the send fails (because the wire-out is dead), the entry safely remains in the
HashMap. When Branch A notices the dead connection, it triggers `reconnect`
(`worker.rs:153`) which scoops up every range from the HashMap (`:157–163`) and
re-requests them in the new connection's opening message.

The governing pattern: **The HashMap is durable; the network wire is
disposable.** Reads secure a spot in the HashMap first, wire delivery is
inherently best-effort, and reconnect logic re-sends anything the network
dropped. No read is ever lost.

### The `read_id` Round Trip — How Multiplexing Works

The `id` is **opaque to the server**, which merely echoes back whatever the SDK
sends.

```
SDK side                             
─────────────────────────────────────────────────────────────────────────────────
worker.rs:197
  id = self.next_range_id++
worker.rs:201
  ranges.insert(id, ActiveRead)       <- HashMap entry secured
worker.rs:200
  r.as_proto(id)                      <- ProtoRange { read_id: 7, ... }

...Server returns read result...      <- ObjectRangeData {
                                            read_range: { read_id: 7, // echoed back },
                                            checksummed_data: { ... }
                                        }
worker.rs:216 handle_range_data
  ranges.lock().get_mut(&7)           <- Looks up by id -> finds correct byte channel
```

This is how multiplexing multiple concurrent reads over a single connection
works. Every chunk is tagged with its `read_id`, and the worker's HashMap
operates like a tiny switchboard routing the returned payload.

### The Inbound Pipeline in Depth

Here's what happens when Branch A receives a successful message:

```
rx.next_message()        ->  handle_response(m)
                                 |
                                 v (Ok(Some(r)))
                            handle_response_success(r)
                                 |
                                 v
                            handle_ranges(data)
                                 |
                                 v (for each chunk)
                            handle_range_data(ranges, chunk)
                                 |
                                 v
                            ActiveRead::handle_data  ->  Handler::send  (lock-free)
```

On an `Err(status)`, it detours to `reconnect`, which might invoke
`close_readers` if the reconnect fails entirely.

#### `handle_response_success` (worker.rs:124)

```rust
pub async fn handle_response_success(
    &mut self,
    response: BidiReadObjectResponse,
) -> LoopResult<()> {
    if let Err(e) = self.handle_ranges(response.object_data_ranges).await {
        // An error in the response. These are not recoverable.
        let error = Arc::new(e);
        self.close_readers(error.clone()).await;
        return Err(error);
    }
    Ok(())
}
```

This method takes a single response and routes its `object_data_ranges` to
`handle_ranges`, ignoring `metadata` and `read_handle` since those were purely
for the initial handshake. If the routing step fails, it wraps the error in an
`Arc`, fires `close_readers` to broadcast the failure to every registered
reader, and returns an `Err`. The comment "not recoverable" is vital: if the
server sends fundamentally corrupted *data*, a reconnect can't fix it
(reconnects only fix broken *wires*).

`handle_response_success` is called in two places: inside `handle_response` for
every standard data message, and synchronously in
`ObjectDescriptorTransport::new` (`bidi/transport.rs:59`) for the handshake's
first message, prior to the worker being spawned.

#### `handle_ranges` (worker.rs:137)

```rust
async fn handle_ranges(&self, data: Vec<ObjectRangeData>) -> crate::Result<()> {
    let mut result = Ok(());
    for response in data {
        if let Err(e) = Self::handle_range_data(self.ranges.clone(), response).await {
            result = result.and(Err(e));     // lock onto the first error, but keep draining
        }
    }
    result.map_err(crate::Error::io)
}
```

This acts as the per-message dispatcher. It routes each `ObjectRangeData` piece
to `handle_range_data`. `result.and(Err(e))` idiom captures **only the first**
error it encounters while ignoring subsequent ones, ensuring the loop finishes
draining without an abrupt early return (bad responses are rare, so tracking
multiple errors simply isn't worth the complexity overhead). Finally, `map_err`
converts the internal `ReadError` into the public `crate::Error`.

#### `handle_range_data` (worker.rs:216)

```rust
async fn handle_range_data(
    ranges: Arc<Mutex<HashMap<i64, ActiveRead>>>,
    response: ObjectRangeData,
) -> ReadResult<()> {
    let range = response.read_range.ok_or(... "missing range")?;
    let handler = if response.range_end {
        let mut pending = ranges.lock().await.remove(&range.read_id).ok_or(...)?;
        pending.handle_data(response.checksummed_data, range, true)?
    } else {
        let mut guard = ranges.lock().await;
        let pending = guard.get_mut(&range.read_id).ok_or(...)?;
        pending.handle_data(response.checksummed_data, range, false)?
    };
    handler.send().await;
    Ok(())
}
```

This is an associated function (no `self`) that takes the shared routing table
and a single chunk. It splits into two paths based on the `range_end: bool`
flag:

| `range_end` Flag           | HashMap Operation      | Fate of Entry                          | Lock Scope                                          |
| -------------------------- | ---------------------- | -------------------------------------- | --------------------------------------------------- |
| `true` (final chunk)       | `lock().remove(...)`   | **Removed** entirely (owned)           | Drops at the semicolon, *before* `handle_data` runs |
| `false` (most of the time) | `lock(); get_mut(...)` | **Stays** in HashMap (borrowed `&mut`) | Held exclusively through the body of `handle_data`  |

**The Lock-Scope Trick:** `handle_data` (`active_read.rs:41`) is a synchronous
function (`fn`, not `async fn`), so careful management of the lock guarding it
is necessary. In the first path, the `remove` gives us ownership, so the lock
guard drops immediately at the semicolon, allowing `handle_data` to run
completely lock-free. In the second path, `pending` is borrowed directly from
the guard, meaning the lock *must* be held while `handle_data` runs.

After the `if/else` block finishes, the guard is fully released in both
branches, allowing the async `handler.send().await` to execute lock-free.

**The `Handler` Decoupling:** This design relies entirely on the `Handler` type
(`active_read.rs:83`). `handle_data` doesn't send bytes; it returns a `Handler`
representing what needs to be sent. This separates the handling process into a
locked "state update" phase (`handle_data`) and a lock-free "delivery" phase
(`handler.send()`), meaning that a slow reader with a full channel can never
stall the central routing table.

**Automatic Cleanup:** Path 1's `remove` keeps the HashMap lean by removing
completed reads from the routing table. No separate "deregister" step is
required.

#### `reconnect` (worker.rs:153) — Three Paths Out

```rust
async fn reconnect(&mut self, status: Status) -> Option<LoopResult<Option<Connection<C::Stream>>>> {
    let ranges = /* collects every active range from the HashMap */;
    let (response, _headers, connection) = match self.connector.reconnect(status, ranges).await {
        Err(e) => {
            let error = Arc::new(e);
            self.close_readers(error.clone()).await;
            return Some(Err(error));                  // Path 1: connector completely gave up
        }
        Ok(t) => t,
    };
    if let Err(e) = self.handle_ranges(response.object_data_ranges).await {
        let error = Arc::new(e);
        self.close_readers(error.clone()).await;
        return Some(Err(error));                      // Path 2: first new message was corrupted
    }
    Some(Ok(Some(connection)))                        // Path 3: successful reconnect
}
```

There are three outcomes mapped to two return shapes:

| Path | Scenario                                    | Returns                      | Run Loop Arm                    |
| ---- | ------------------------------------------- | ---------------------------- | ------------------------------- |
| 1    | `connector.reconnect` exhausted retries     | `Some(Err(e))`               | `Some(Err(e)) => break Some(e)` |
| 2    | Reconnected, but first message was bad      | `Some(Err(e))`               | `Some(Err(e)) => break Some(e)` |
| 3    | Reconnected, first message routed perfectly | `Some(Ok(Some(connection)))` | Swaps `rx`/`tx` and continues   |

The `Some(Ok(Some(connection)))` that triggers the run loop's wire-swapping arm
(`worker.rs:83`) is generated here. Note that both failure paths invoke
`close_readers` *before* returning, ensuring all readers are proactively
notified of the failure before the run loop unwinds.

#### `close_readers` (worker.rs:183) + `ActiveRead::interrupted`

```rust
async fn close_readers(&mut self, error: Arc<crate::Error>) {
    use futures::StreamExt;
    let mut guard = self.ranges.lock().await;
    let closing = futures::stream::FuturesUnordered::new();
    for (_, active) in guard.iter_mut() {
        closing.push(active.interrupted(error.clone()));
    }
    let _ = closing.count().await;
    guard.clear();
}
```

```rust
// active_read.rs:67
pub(super) async fn interrupted(&mut self, error: Arc<crate::Error>) {
    if let Err(e) = self
        .sender
        .send(Err(ReadError::UnrecoverableBidiReadInterrupt(error)))
        .await
    {
        tracing::error!("cannot notify reader (dropped?) about: {e:?}");
    }
}
```

The goal here is simple: push a terminal error to every registered reader, then
clear the routing table.

**The `FuturesUnordered` Pattern:** The call `active.interrupted(error.clone())`
just *returns* a future without running it. The loop pushes them all into a
`FuturesUnordered` collection, and `closing.count().await` drives them all
concurrently. This guarantees that one slow reader cannot block the termination
of the others, which could happen with a `for ... { ... await }` loop.

**Two-Step Termination per Reader:**

1. `interrupted` pushes an
   `Err(ReadError::UnrecoverableBidiReadInterrupt(Arc<Error>))` into the byte
   channel.
1. `guard.clear()` drops all `ActiveRead` objects, which inherently drops their
   `Sender`s, ultimately closing the channels.

The reader experiences: any buffered `Ok(Bytes)`, followed by the `Err`
(explaining *why*), and finally `None` (signaling *it's over*).

**Why `Arc<Error>`?** The error resides inside the channel message and lives on
wherever the reader stores it. A standard reference `&Error` would dangle, and
deep-cloning an error `N` times is a massive waste of resources. `Arc::clone`
provides a cheap atomic bump, giving us an owned, shared error ideal for
broadcasting.

## J. The Range-Type Stack

A read range's single byte channel is handled by a group of five specialized
types. Together, they handle the various responsibilities of *"The user
requested bytes, the user wants them streamed, and reconnects cannot lose
progress"*.

### Cheat Sheet

| Type              | Layer                                                                                                      | Lifetime                             | Owner                                   |
| ----------------- | ---------------------------------------------------------------------------------------------------------- | ------------------------------------ | --------------------------------------- |
| `RequestedRange`  | "What the user asked for" (pre-resolution)                                                                 | Immutable after creation             | Wrapped in `RemainingRange::Requested`  |
| `NormalizedRange` | "What we are actively tracking" (post-first-chunk)                                                         | Mutates dynamically as bytes arrive  | Wrapped in `RemainingRange::Normalized` |
| `RemainingRange`  | State machine governing the two above                                                                      | One per `ActiveRead` (`state` field) | `ActiveRead.state`                      |
| `ActiveRead`      | State machine for a given read request + the byte-channel **write end** for bytes received from the server | One per range, living in the HashMap | Worker (`self.ranges`)                  |
| `RangeReader`     | Byte-channel **read end** for bytes received from the server + keepalive signal                            | One per range                        | The user (inside `ReadObjectResponse`)  |

Two invariants tie this system together:

1. **One range maps to one byte channel.** `ActiveRead` and `RangeReader` are
   created together (`bidi/transport.rs:82–85`) and linked by a `(tx, rx)` pair.
1. **The state machine is strictly unidirectional:** `Requested` permanently
   pivots to `Normalized` upon receiving the first chunk.

### `RequestedRange` — The User's Request

From `model_ext.rs:291–296`:

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
pub(crate) enum RequestedRange {
    Offset(u64),                          // "From byte N onward"
    Tail(u64),                            // "The final N bytes"
    Segment { offset: u64, limit: u64 },  // "N bytes beginning at offset M"
}
```

These are user-friendly units (positive `u64`), mapping to GCS's three native
read shapes.

**The `ReadRange` Newtype** (`model_ext.rs:171`):

```rust
pub struct ReadRange(pub(crate) RequestedRange);
```

The inner field is **pub(crate)**. This wrapper provides four benefits:

1. **API Stability:** The internal enum can safely evolve without breaking
   downstream applications.
1. **Validated Factories:** Users must construct them correctly:
   - `ReadRange::offset(...)`
   - `ReadRange::tail(...)`
   - `ReadRange::head(...)`
   - `ReadRange::segment(...)`
   - `ReadRange::all()`
1. **Obscured Proto Idioms:** Users ask for `ReadRange::tail(100)` without ever
   knowing that it transmits as `read_offset: -100` on the wire.
1. **Rich Documentation:** Each constructor function gets its own dedicated
   rustdoc and runnable example.

#### `as_proto` — Wire Encoding and Routing

From `bidi/requested_range.rs:24–42`:

```rust
pub fn as_proto(&self, id: i64) -> ProtoRange {
    match self {
        Self::Offset(o)                 => ProtoRange { read_id: id, read_offset:  *o as i64, ..default() },
        Self::Tail(o)                   => ProtoRange { read_id: id, read_offset: -(*o as i64), ..default() },
        Self::Segment { offset, limit } => ProtoRange { read_id: id, read_offset: *offset as i64, read_length: *limit as i64 },
    }
}
```

Two things to note:

- **`Tail` maps to a negative offset**, matching the GCS protobuf convention for
  "from the end."
- **`read_length: 0` is the sentinel for "to EOF"**, utilized by `Offset` and
  `Tail`.

**Why not implement `From<RequestedRange>` to do the proto conversion?** A
standard `From::from` takes only one argument, leaving no room to pass the
**`id`** (the routing key generated at `worker.rs:197–198`). `as_proto` handles
serialization and routing tagging simultaneously.

### `NormalizedRange` — Keeping track of progress

From `normalized_range.rs:40–44`:

```rust
pub struct NormalizedRange {
    offset: i64,
    length: Option<i64>,
}
```

Under the hood, ranges are normalized into:

- **`offset`**: The absolute byte offset.
- **`length`**: `Some(n)` for exact remaining bytes, or `None` representing an
  unbounded read ("to EOF").

#### `NormalizedRange::update`

Located at `normalized_range.rs:91`, this function executes every single time a
chunk arrives:

```rust
pub fn update(&mut self, response: ProtoRange) -> ReadResult<()> {
    let update = NormalizedRange::from_proto(response)?;            // 1. parse the incoming chunk's range
    if update.offset != self.offset {
        return Err(ReadError::bidi_out_of_order(...));              // 2. enforce in-order delivery
    }
    self.length = match (self.length, update.length) {              // 3. update the remaining bytes-to-read
        (None, _)                                       => None,
        (Some(l), None)                                 => Some(l),
        (Some(expected), Some(got)) if got <= expected  => Some(expected - got),
        (Some(expected), Some(got))                     => return Err(LongRead { ... }),
    };
    self.offset = update.offset + update.length().unwrap_or_default(); // 4. advance the cursor
    Ok(())
}
```

After `update` returns, `(offset, length)` describes the bytes still pending.
This makes the state self-describing: after a reconnect, calling `as_proto`
emits the correct "resume from here" range.

The length match (step 3) broken down:

| `self.length`                      | `update.length` | Result                 | Meaning                                                    |
| ---------------------------------- | --------------- | ---------------------- | ---------------------------------------------------------- |
| `None`                             | anything        | `None`                 | An unbounded request stays unbounded.                      |
| `Some(l)`                          | `None`          | `Some(l)`              | Chunk reported zero size — leave the expectation untouched |
| `Some(expected)`, `got ≤ expected` | `Some(got)`     | `Some(expected - got)` | Normal case — subtract the delta.                          |
| `Some(expected)`, `got > expected` | `Some(got)`     | `LongRead` error       | Server overran the limit — bail.                           |

`unwrap_or_default()` in step 4 yields the inner value when `Some`, or
`i64::default() == 0` otherwise. So if `update.length` is `None`, the cursor
stays put.

##### A Worked Example

Say the user requested `Segment { offset: 1000, limit: 500 }` on a 10,000-byte
object, so the state starts at `offset: 1000, length: Some(500)`. A chunk
arrives carrying `read_offset: 1000, read_length: 200`:

| Step | Operation                                                                 | State after                                  |
| ---- | ------------------------------------------------------------------------- | -------------------------------------------- |
| 1    | Parse the incoming range → `update = { offset: 1000, length: Some(200) }` | (no change to `self` yet)                    |
| 2    | Check `update.offset == self.offset` (1000 == 1000)                       | OK                                           |
| 3    | Subtract length: `Some(500) - Some(200)``Some(300)`                    | `self.length = Some(300)`                    |
| 4    | Advance cursor: `self.offset = 1000 + 200 = 1200`                         | `self = { offset: 1200, length: Some(300) }` |

```
byte:   0      1000   1200          1500              10000
        ├───────┼──────┼─────────────┼──────────────────┤
                xxxxxxx╞═════════════╡
                received  what's left to receive
```

Call `as_proto` at this moment and the user gets
`{read_offset: 1200, read_length: 300}` — precisely the re-request payload a
reconnect needs.

### `RemainingRange` — The State Machine

From `remaining_range.rs:27–31`:

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RemainingRange {
    Requested(RequestedRange),     // Prior to the very first response
    Normalized(NormalizedRange),   // Following the first response
}
```

This enum handles the two situations where a reconnect might occur:

- **Before the server has sent a single chunk**, requiring us to re-send the
  original read request.
- **Mid-stream**, requiring us to send the normalized read request.

When the first chunk is processed, the `update` method (`:34`) switches the enum
from `Requested` to `Normalized`, applies the chunk's delta, and commits the
state. From that point on, everything flows into the simpler `Normalized`
pathway.

### `ActiveRead` — The Worker's Handle

From `active_read.rs:24–28`:

```rust
#[derive(Debug)]
pub(crate) struct ActiveRead {
    state: RemainingRange,                       // ← The self-healing state machine
    sender: Sender<ReadResult<bytes::Bytes>>,    // ← The byte channel's write end
}
```

This represents the server-facing end of a single range, residing in the
worker's HashMap. Both fields survive a reconnect untouched.

### `RangeReader` — The User's Handle

From `range_reader.rs:26–33`:

```rust
#[derive(Debug)]
pub struct RangeReader {
    inner: Receiver<Result<bytes::Bytes, ReadError>>,
    object: Arc<Object>,
    // Unused, holding to a copy prevents the worker task from terminating
    // early.
    _tx: Sender<ActiveRead>,
}
```

**The `_tx` Keepalive Trick:** The `_tx` field is an intentional dummy
reference. Since the worker uses the "all senders dropped" condition of the
read-request channel as its signal to shut down (`worker.rs:88–93`), holding a
clone here keeps the worker alive.

## K. Redirects and the Resilience Machinery

Sometimes, the server decides to redirect a bidirectional read mid-stream. The
machinery that handles this lives across:

- `redirect.rs`
- `retry_redirect.rs`
- `resume_redirect.rs`
- `connector.rs` (specifically the `Connector` struct)

Let's map it out.

### The Protocol — What a Redirect Actually Looks Like

Google Cloud Storage doesn't use a dedicated "redirect" gRPC status code.
Instead, redirects are signalled using a `tonic::Status` error that carries a
structured payload in its details:

```
tonic::Status {
    code:    Code::Aborted,                    // It looks like a permanent failure at first glance
    message: "...",
    details: <bytes that decode to google.rpc.Status>
        RpcStatus {
            details: [
                Any::<BidiReadObjectRedirectedError> {
                    routing_token: Option<String>,    // The new value to use for x-goog-request-params
                    read_handle:   Option<...>,       // A server-issued resume handle
                }
            ]
        }
}
```

This represents two layers of "status" (reminiscent of the
[two metadatas pattern](#h-the-two-metadatas)):

- `tonic::Status`: The transport envelope containing the code, message, and raw
  bytes.
- `google.rpc.Status`: The rich protobuf error payload.
  `RpcStatus::decode(tonic_status.details())` opens the envelope.

**Why use `Aborted`?** A standard retry policy generally treats `Aborted` as a
permanent failure to avoid hammering a dead endpoint. By hijacking this
"permanent" code, GCS efficiently signals, "This specific stream is dead—but
here is exactly where the user should connect next." A specialized decorator
pattern then intentionally overrides that "give up" verdict.

### `redirect.rs` — Recognize and React

This file houses two distinct functions:

**`handle_redirect(spec, status)`** (`redirect.rs:25`): This function extracts
the redirect information from the status and updates the shared
`Arc<Mutex<BidiReadObjectSpec>>` with the new routing token and resume handle.
This guarantees that any subsequent reconnect attempt will carry the new routing
token.

It always returns the GAX-converted error, regardless of whether a redirect was
actually found (acting as a safe no-op for standard errors).

**`is_redirect(&error)`** (`redirect.rs:40`): This is a pure predicate function
with zero side effects. It checks four conditions in order:

- The error code must be `Aborted`.
- The error must wrap a `tonic::Status`.
- The details must cleanly decode as a `google.rpc.Status`.
- At least one detail must be a `BidiReadObjectRedirectedError`.

**Why split this into two functions?**

- `handle_redirect` operates on the wire boundary, updating state for the *next*
  attempt. It is called in three places within `connector.rs` during error paths
  where the raw `tonic::Status` is still available:
  - `connector.rs:213` (Early dial failure)
  - `connector.rs:230` (First-message handshake failure)
  - `connector.rs:120` (Mid-stream worker failure)
- `is_redirect` operates inside policy decorators where the error is already a
  `gax::Error`; it decides whether there *will be* a next attempt by overriding
  the policy's verdict. It is called purely inside policy decorators:
  - `retry_redirect.rs:47` (`RetryRedirect::on_error`, used by the inner retry
    loop)
  - `resume_redirect.rs:45` (`ResumeRedirect::on_error`, used directly by
    `connector.reconnect`)

### The Decorator Pattern

Both `RetryRedirect` and `ResumeRedirect` utilize the exact same shape:

```rust
fn on_error(&self, state: &..., error: Error) -> RetryResult {
    match self.inner.on_error(state, error) {
        RetryResult::Permanent(e) if is_redirect(&e) => RetryResult::Continue(e),
        result => result,
    }
}
```

This pattern **selectively overrides `Permanent` to `Continue` strictly for
redirect errors**. It respects `Exhausted` (if the attempt budget is tapped out,
redirects halt), and it respects genuine `Permanent` errors (like auth
failures).

Decorators are mandatory here because they inject logic directly *inside* the
external `retry_loop` (from the `gax` crate), which is only accessible through
the `RetryPolicy` trait.

### End-to-End Redirect Trace & The Durable-State Principle

To put it all together, let's trace a redirect mid-stream.

Imagine three concurrent reads are happily humming along. Here is the internal
state the instant before the redirect fires — one read from `send_and_read`, two
added via `read_range`, and the worker parked in its `select!`:

```
Spec (Arc<Mutex<BidiReadObjectSpec>>)
    bucket:        "projects/_/buckets/my-bucket"
    object:        "my-object"
    generation:    789               // resolved by the server at the first handshake
    read_handle:   Some(handle_v1)   // server-issued at the first handshake
    routing_token: None              // no redirect yet

Worker routing HashMap (self.ranges)
    { 0: ActiveRead { state: Normalized { offset: 9700, length: None      }, sender: <byte_tx_a> },
      1: ActiveRead { state: Normalized { offset: 2500, length: None      }, sender: <byte_tx_b> },
      2: ActiveRead { state: Normalized { offset: 5800, length: Some(200) }, sender: <byte_tx_c> } }

Worker
    connector.reconnect_attempts: 0
    rx: <Streaming v1>   tx: <Sender v1>      // current wire halves
    parked in tokio::select! at worker.rs:70
```

Now, phase by phase:

1. **Interruption.** GCS closes the stream with
   `tonic::Status { code: Aborted, details: <BidiReadObjectRedirectedError { routing_token: Some("rt-v2"), read_handle: Some(handle_v2) }> }`.
   Tonic surfaces it as the next value from `rx.next_message()`.

1. **Detection.** Branch A inside `Worker::run` (`worker.rs:71`) resolves to
   `m = Err(status)`. In `Worker::handle_response` (`worker.rs:110`),
   `message.transpose()?` takes the
   `Err(status) => return self.reconnect(status).await` arm
   (`worker.rs:114–117`), invoking `Worker::reconnect`.

1. **Snapshot progress.** `Worker::reconnect` (`worker.rs:153`) maps every range
   to `ActiveRead::as_proto(*id)` (`worker.rs:157–163`). Because
   `NormalizedRange::update` advanced each cursor as chunks arrived, the protos
   ask only for what is still outstanding:

   ```
   [ ProtoRange { read_id: 0, read_offset: 9700, read_length: 0   },   // open tail
     ProtoRange { read_id: 1, read_offset: 2500, read_length: 0   },   // open offset
     ProtoRange { read_id: 2, read_offset: 5800, read_length: 200 } ]  // bounded segment
   ```

   No already-received byte is re-requested.

1. **Mutate the spec.** `Connector::reconnect` (`connector.rs:113`) calls
   `handle_redirect(self.spec.clone(), status)` (`connector.rs:120`), which
   decodes the redirect detail and writes it through:

   ```
   routing_token: Some("rt-v2")     // updated
   read_handle:   Some(handle_v2)   // updated   (all other fields unchanged)
   ```

1. **Policy verdict: continue.** `Connector::reconnect` increments
   `self.reconnect_attempts` (`connector.rs:122`), then
   `ResumeRedirect::on_error` runs (`connector.rs:124–128`). The inner policy
   sees `Aborted` and returns `Permanent`; the decorator calls `is_redirect`
   (true) and overrides it to `Continue`, so `Connector::reconnect` calls
   `Connector::connect(ranges)` (`connector.rs:125`).

1. **Reconnect with the updated spec (Connector).** The `Connector::connect`
   (`connector.rs:84`) retry loop fires `Connector::connect_attempt`
   (`connector.rs:140`), which uses the now-updated spec in the opening message
   and builds the routing header from it:

   ```
   x-goog-request-params: bucket=projects/_/buckets/my-bucket&routing_token=rt-v2
   ```

   A fresh `(tx_v2, rx_v2)` pair is created (`connector.rs:185`), the opening
   message preloaded (`connector.rs:186`), and `client.start(...)`
   (`connector.rs:200`) opens a new stream. The first message is read
   (`connector.rs:218`) and the spec re-enriched (`connector.rs:220–226`).

1. **First response data (Worker).** Back in `Worker::reconnect`
   (`worker.rs:153`), any range bytes the server packed into the handshake go
   through `Worker::handle_ranges` (`worker.rs:173`) just like a mid-stream
   message — looked up by `read_id` in the same HashMap, pushed down the same
   byte channels, cursors advanced. The `Worker::reconnect` method returns
   `Some(Ok(Some(connection_v2)))` (`worker.rs:180`).

1. **Atomic wire swap.** The matching arm of Branch A inside `Worker::run` runs
   `(rx, tx) = (connection.rx, connection.tx)` (`worker.rs:84`). The dead v1
   wires drop; the v2 wires take their place. The HashMap is untouched.

1. **Resume.** The `Worker::run` loop re-enters `select!` on the new wires.
   Chunks arriving on `rx_v2` carry the same `read_id`s, route to the same
   `ActiveRead`s, and flow down the same user-facing channels. The user's parked
   `reader.next().await` wakes and yields the next chunk — no error, no panic,
   just a brief latency pause.

### The Durable / Disposable Split

The architecture works because of the following separation of concerns:

- **The Durable Layer (Survives Reconnects):**
  - **The Spec** (holds identity and routing): `self.spec` in `Connector`
    (`connector.rs:63`).
  - **The HashMap** (holds state machines): `self.ranges` in `Worker`
    (`worker.rs:32`).
  - **The user-facing read-request channel** (receives new read requests):
    `requests` in `Worker::run` (`worker.rs:62`).
  - **The user-facing byte channels** (transfers data bytes to reader): `sender`
    in `ActiveRead` (`active_read.rs:27`).
- **The Disposable Layer (Discarded on Reconnect):**
  - **The wire-in stream** (receives messages from GCS): `rx` field in
    `Connection` (`connector.rs:36`).
  - **The wire-out sender** (sends requests to GCS): `tx` field in `Connection`
    (`connector.rs:35`).
  - **The connection wrapper** (groups stream and channel): `Connection` struct
    (`connector.rs:34`).