# `open_object` — A Walkthrough
*Last updated: 2026-05-27 (commit 30d28f9a5)*
This guide offers a walkthrough of `Storage::open_object` in the
`google-cloud-storage` crate. We'll look at what it actually does under the
hood, trace through each layer of the stack, see what remains running after the
call returns, and dive deep into reference sections for the trickier components.
All file paths here are relative to the `google-cloud-rust/` repository root,
and any line numbers correspond to the current state of the source code on disk.
## How This Document Was Created
This doc was written with AI assistance.
1. First, I used the following prompt to create an initial draft.
> I want to understand `open_object`. Starting with a call to
> `Client::open_object`, trace execution all the way until the function
> returns. Pay particular attention to `src/storage/bidi` as that is where I
> believe the bulk of the code lies. Ground every citation by annotating
> every code snippet and claim with file name and line numbers.
1. Next, I asked AI to teach the walkthrough by breaking it down into small,
conversational chunks. This made it easier to digest the material, and also
made the job of double checking and challenging the AI much easier. With the
source code open beside me, I challenged the AI often, asking it to explain,
substantiate, or revise its claims. Whenever I felt appropriate, I asked the
AI to update the walkthrough doc with its updated understanding.
1. Once this was done and I had a reasonable understanding of `open_object`, I
got the AI to clean up its work. I prompted it to clean up its bombastic
language (e.g. "the spectacle of the `ActiveRead`s operating in perfect
synchrony with the user-facing `ReadObjectResponse`s is truly a marvel to
behold" became "the `ActiveRead`s push the newly received chunk of data to
the corresponding `ReadObjectResponse`"), check its file sources/line
numbers, tighten its prose, etc. This was done several times, clearing the
context each iteration. Doing so halved the length of the document.
I did this by alternating between a "review" prompt:
> You are an expert in Rust, the google-cloud-rust repository, and a highly
> skilled technical writer. Here is a document written to understand the
> `open_object` flow. I suspect that it is poorly written and contains
> numerous factual inaccuracies. Please read the document and tell me all the
> ways the doc is inaccurate. Check every single code citation rigorously,
> ensuring the cited line numbers and files are correct. Check whether the
> document accurately represents the flow. Tell me all the ways the doc
> misrepresents the code.
and a "rewriting" prompt:
> Please rewrite the document so that the language is clean, precise, and
> direct. Prefer simple sentence structures and clean language.
1. I naively assumed that correcting the AI in conversation would result in it
saving the corrected knowledge to the document. I was wrong - human review of
the final was still necessary. To make this task more manageable, the
document was checked in piecemeal.
1. Over time, the underlying code may change. I run this prompt periodically to
keep the walkthrough up to date:
> You are an expert in Rust and the `google-cloud-rust` codebase. Update the
> `open_object` walkthrough to match the latest implementation on disk. Trace
> execution from `Client::open_object` through `src/storage/bidi` until
> completion. Check every single code citation rigorously, ensuring the cited
> line numbers and files are correct. Check whether the document accurately
> represents the flow. Tell me all the ways the doc misrepresents the code.
## Table of Contents
- [Mental Model: A Persistent Object Handle](#mental-model-a-persistent-object-handle)
- [Files Involved](#files-involved)
- [End-to-End Call Graph](#end-to-end-call-graph)
- [Part 1 - The Linear Execution Trace](#part-1---the-linear-execution-trace)
- [1. `open_object()` Returns a Request Builder — Lazy, No I/O](#1-open_object-returns-a-request-builder--lazy-no-io)
- [2. `.send()` / `.send_and_read()`](#2-send--send_and_read)
- [3. The Stub Trait — The Mock Seam](#3-the-stub-trait--the-mock-seam)
- [4. The Transport Routes on Tracing](#4-the-transport-routes-on-tracing)
- [5. `open_object_plain` — Four Crucial Lines](#5-open_object_plain--four-crucial-lines)
- [6. `into_parts` — Splitting the Request](#6-into_parts--splitting-the-request)
- [7. `Connector::new` — Armed but Not Fired](#7-connectornew--armed-but-not-fired)
- [8. `ObjectDescriptorTransport::new` — Orchestration (Part 1: Prep)](#8-objectdescriptortransportnew--orchestration-part-1-prep)
- [9. `connect()` — The Retry and Self-Heal Loop](#9-connect--the-retry-and-self-heal-loop)
- [10. `connect_attempt` — Establishing the Connection](#10-connect_attempt--establishing-the-connection)
- [11. `ObjectDescriptorTransport::new` — Orchestration (Part 2)](#11-objectdescriptortransportnew--orchestration-part-2)
- [12. The Climb Back Up](#12-the-climb-back-up)
- [13. After `send()` / `send_and_read()` Returns](#13-after-send--send_and_read-returns)
- [Part 2 — Reference and Deep Dives](#part-2--reference-and-deep-dives)
- [A. The Client Type and the Stub Seam](#a-the-client-type-and-the-stub-seam)
- [B. `ObjectDescriptor` Anatomy](#b-objectdescriptor-anatomy)
- [C. The Wire Types](#c-the-wire-types)
- [D. The Channels](#d-the-channels)
- [E. The gRPC Transport Layer](#e-the-grpc-transport-layer)
- [F. Trailers, Status, and Errors](#f-trailers-status-and-errors)
- [G. `ActiveRead` vs `ReadObjectResponse`](#g-activeread-vs-readobjectresponse)
- [H. The Two "Metadata"s](#h-the-two-metadatas)
- [I. The Worker's `run` Loop, Branch by Branch](#i-the-workers-run-loop-branch-by-branch)
- [J. The Range-Type Stack](#j-the-range-type-stack)
- [K. Redirects and the Resilience Machinery](#k-redirects-and-the-resilience-machinery)
## Mental Model: A Persistent Object Handle
`open_object` isn't a one-off "fetch data" request - it opens a **bidirectional
gRPC streaming RPC** (`google.storage.v2.Storage/BidiReadObject`) that enables
concurrent, repeated reads. When the user makes a single call to `open_object`,
here is what happens:
1. **Initiate the connection:** It establishes a single, long-lived
bidirectional stream to Google Cloud Storage (GCS).
1. **Server responds with object metadata:** The server identifies itself, and
the very first message it sends back contains the object's metadata.
1. **The stream stays open:** A background **`Worker` task** is spawned to own
and manage the stream for its entire lifetime.
1. **The user gets a handle:** The user receives an `ObjectDescriptor`, which
acts as the user's handle. The user can use it to request multiple
byte-ranges over time, and all these requests are multiplexed over that
single underlying stream.
In some ways this resembles a pre-cell phone era phone call:
| The open line | The bidirectional gRPC stream |
| The operator keeping it alive | The spawned `Worker` task |
| The user's handset wire to the operator | The **read-request channel** (which carries `ActiveRead` requests) |
| A dedicated answer line per request | The per-range **byte channels** (which carry `Result<Bytes>`) |
| The operator's memory of who the user dialed | The `Arc<Mutex<BidiReadObjectSpec>>` (used for redials/reconnects) |
| The handle in the user's hand | The `ObjectDescriptor` |
By the time `open_object` returns, three distinct things are alive: the gRPC
stream itself, the detached worker task running in the background, and the
user's descriptor handle (which communicates with the worker over a channel).
## Files Involved
Here's a quick map of the relevant files and the layers they represent:
| Public client & `open_object` method | `src/storage/src/storage/client.rs` |
| `DefaultStorage` alias | `src/storage/src/lib.rs` |
| Request builder (`OpenObject`) | `src/storage/src/storage/open_object.rs` |
| App request type & `into_parts` | `src/storage/src/model_ext/open_object_request.rs` |
| Stub trait (mock seam) | `src/storage/src/storage/stub.rs` |
| Default implementation (routing & orchestration) | `src/storage/src/storage/transport.rs` |
| Public `ObjectDescriptor` | `src/storage/src/object_descriptor.rs` |
| Descriptor stub trait & dynamic bridge | `src/storage/src/storage/bidi/stub.rs` |
| Connect and reconnect logic (the real RPC) | `src/storage/src/storage/bidi/connector.rs` |
| Descriptor transport (orchestration) | `src/storage/src/storage/bidi/transport.rs` |
| Background stream pump | `src/storage/src/storage/bidi/worker.rs` |
| Per-range read state | `src/storage/src/storage/bidi/active_read.rs` |
| Redirect handling | `src/storage/src/storage/bidi/redirect.rs` |
| `Client` trait / tonic glue | `src/storage/src/storage/bidi.rs` |
| Generic gRPC client | `src/gax-internal/src/grpc.rs` |
| Generic retry loop | `src/gax/src/retry_loop_internal.rs` |
| Generated protobufs | `src/storage/src/generated/protos/storage/google.storage.v2.rs` |
## End-to-End Call Graph
Here is the high-level execution trace:
| `client.open_object(b, o)` | `client.rs:271` | build `OpenObject` (no I/O happens here) |
| -> `.send()` | `open_object.rs:66` | go async, call the stub |
| -> `stub::open_object` | `stub.rs:76` | trait seam (perfect for mocking) |
| -> `transport open_object` | `transport.rs:318` | tracing enabled? -> routes to `_plain` |
| -> `open_object_plain` | `transport.rs:219` | `into_parts` + `Connector` + `Transport::new` |
| -> `into_parts` | `open_object_request.rs:150` | splits request -> (spec, ranges) |
| -> `Connector::new` | `connector.rs:75` | armed, still no I/O |
| -> `ObjectDescriptorTransport::new` | `bidi/transport.rs:34` | **ORCHESTRATION BEGINS** |
| -> `(read-request channel)` | `bidi/transport.rs:44` | |
| -> `connect` -> `connect_attempt` | `connector.rs:84,140` | OPENS STREAM, reads 1st msg |
| -> `client.start` | `bidi.Connector::connect` | |
| -> `bidi_stream_with_status` | `src/gax-internal/src/grpc.rs:198` | |
| -> `inner.streaming` | `src/gax-internal/src/grpc.rs:223` | \<- tonic / hyper / h2 / socket |
| -> `map_ranges` | `bidi/transport.rs:74` | creates byte channels |
| -> `Worker::new` + `spawn(run)` | `bidi/transport.rs:58,63` | detaches background pump |
| \<- `(ObjectDescriptor, readers)` | | Return values |
### Sequence Diagram
```mermaid
sequenceDiagram
autonumber
participant App
participant Client
participant Stub
participant Transport
participant Bidi as Bidi Transport
participant Conn as Connector
participant gRPC
App->>Client: open_object(bucket, object)
Note right of Client: client.rs:271<br/>build OpenObject, no I/O
Client-->>App: OpenObject builder
App->>Client: .send()
Note right of Client: open_object.rs:66
Client->>Stub: open_object()
Note right of Stub: stub.rs:76<br/>mock seam
Stub->>Transport: open_object()
Note right of Transport: transport.rs:318<br/>routes to _plain
Transport->>Transport: open_object_plain()
Note right of Transport: transport.rs:219
Transport->>Transport: into_parts()
Note right of Transport: splits request (spec, ranges)
Transport->>Conn: Connector::new()
Note right of Conn: connector.rs:75<br/>armed, still no I/O
Transport->>Bidi: ObjectDescriptorTransport::new()
Note right of Bidi: bidi/transport.rs:34<br/>ORCHESTRATION BEGINS
Bidi->>Bidi: create read-request channel (line 44)
Bidi->>Conn: connect() -> connect_attempt()
Note right of Conn: connector.rs:84,140
Conn->>gRPC: client.start() -> bidi_stream_with_status()
Note right of gRPC: grpc.rs:198<br/>OPENS STREAM, reads 1st msg
Bidi->>Bidi: map_ranges() byte channels (line 74)
Bidi->>Bidi: Worker::new() + spawn(run)
Note right of Bidi: lines 58,63<br/>detaches background pump
Bidi-->>Transport: (ObjectDescriptor, readers)
Transport-->>App: ObjectDescriptor
```
### Flowchart
```mermaid
graph TD
A["client.open_object (client.rs:271)<br/><i>build OpenObject (no I/O)</i>"] --> B[".send() (open_object.rs:66)"]
B --> C["stub::open_object (stub.rs:76)<br/><i>trait seam (perfect for mocking)</i>"]
C --> D["transport open_object (transport.rs:318)<br/><i>tracing enabled? routes to _plain</i>"]
D --> E["open_object_plain (transport.rs:219)"]
E --> F["into_parts (open_object_request.rs:150)<br/><i>splits request -> (spec, ranges)</i>"]
E --> G["Connector::new (connector.rs:75)<br/><i>armed, still no I/O</i>"]
E --> H["ObjectDescriptorTransport::new (bidi/transport.rs:34)<br/><b>ORCHESTRATION BEGINS</b>"]
H --> I["(read-request channel) (bidi/transport.rs:44)"]
H --> J["connect -> connect_attempt (connector.rs:84,140)"]
J --> K["client.start -> bidi_stream_with_status<br/><i>src/gax-internal/src/grpc.rs:198</i><br/><b>OPENS STREAM, reads 1st msg</b>"]
H --> L["map_ranges (byte channels) (bidi/transport.rs:74)"]
H --> M["Worker::new() + spawn(run) (bidi/transport.rs:58,63)<br/><i>detaches background pump</i>"]
H -.-> N["Return: (ObjectDescriptor, readers)"]
```
# Part 1 - The Linear Execution Trace
Let's walk through the execution step-by-step.
## 1. `open_object()` Returns a Request Builder — Lazy, No I/O
We start in `client.rs:271`:
```rust
pub fn open_object<B, O>(&self, bucket: B, object: O) -> OpenObject<S>
```
The body of this function is just a single line at `:276`:
`OpenObject::new(self.stub.clone(), bucket, object, self.options.clone())`. It
clones `self.options` so that per-request overrides don't mutate the underlying
client.
Here's the definition of the `OpenObject` struct (`open_object.rs:43`):
```rust
pub struct OpenObject<S = crate::storage::transport::Storage> {
stub: Arc<S>,
request: OpenObjectRequest,
options: RequestOptions,
}
```
(If you are wondering what `S` is, check out **Reference A**.)
The returned `OpenObject` is a request builder which provides the following
setters:
- **Request fields:**
- `set_generation` (`:142`)
- `set_if_generation_match` (`:162`)
- `set_if_generation_not_match` (`:186`)
- `set_if_metageneration_match` (`:209`)
- `set_if_metageneration_not_match` (`:232`)
- `set_key` (`:256`, used for CSEK)
- **Options:** behavior can be configured with:
- `with_retry_policy` (`:284`)
- `with_backoff_policy` (`:308`)
- `with_retry_throttler` (`:338`)
- `with_read_resume_policy` (`:365`)
- `with_attempt_timeout` (`:396`, which defaults to 60s)
- `with_user_agent` (`:415`)
- `with_quota_project` (`:439`)
Up to this point, there has been zero network activity.
## 2. `.send()` / `.send_and_read()`
The action starts when the user invokes `send()` at `open_object.rs:66`:
```rust
pub async fn send(self) -> Result<ObjectDescriptor> {
let (descriptor, _) = self.stub.open_object(self.request, self.options).await?; // :67
Ok(descriptor) // :68
}
```
This method calls the stub—finally initiating network activity—and notably
**discards the `Vec<ReadObjectResponse>` readers** by using `_`.
There's also a sibling method, `send_and_read` (`:91`). This method pushes a
single range first (`:95`) and retains the single reader returned by the stub
(`:98`). It enforces that only one reader exists via `unreachable!` (`:102`).
This is the preferred path for an "open plus first read in a single round trip"
operation.
## 3. The Stub Trait — The Mock Seam
The `open_object` trait method called on the stub is defined in `stub.rs:76`:
```rust
fn open_object(&self, _request, _options)
-> impl Future<Output = Result<(Descriptor, Vec<ReadObjectResponse>)>> + Send {
unimplemented_stub::<(Descriptor, Vec<ReadObjectResponse>)>() // :82 → :104 unimplemented!()
}
```
`self.stub` is typed as `Arc<S: stub::Storage>`. The stub is provided to make it
easier to provide a mock implementation for testing. (See **Reference A** for
more details.)
## 4. The Transport Routes on Tracing
The production trait implementation is located inside `transport.rs:318` (which
is part of `impl super::stub::Storage for Storage`, starting at `:267`):
```rust
async fn open_object(&self, request, options) -> Result<(...)> {
if self.tracing { return self.open_object_tracing(request, options).await; } // :323
self.open_object_plain(request, options).await // :326
}
```
If tracing is enabled, `open_object_tracing` (`:231`) wraps the call. It sets up
a `client_request` span tagged with
`google.storage.v2.Storage/BidiStreamingRead` (`:245`), forwards to the `_plain`
variant, and wraps the resulting descriptor and readers with tracing decorators
(`:253–263`). For this walkthrough, we'll follow the `_plain` path.
> ⚠️ *A quick note on naming:* There are two structs named `Storage`. One is the
> **client** `Storage<S>` (`client.rs:92`), and the other is the **transport**
> `Storage` (`transport.rs:49`, which is aliased to `DefaultStorage`). Here,
> `self` refers to the transport.
## 5. `open_object_plain` — Four Crucial Lines
In `transport.rs:219`, the core logic boils down to four lines:
```rust
let (spec, ranges) = request.into_parts(); // :224 decompose the request into request spec + desired ranges
let connector = Connector::new(spec, options, self.inner.grpc.clone()); // :225 specify the connection parameters
let (transport, readers) =
ObjectDescriptorTransport::new(connector, ranges).await?; // :226 establish the connection and send the initial RPC
Ok((ObjectDescriptor::new(transport), readers)) // :227 wrap the I/O mechanism & return handles to the incoming byte streams
```
Line `:226` is where the network is finally touched.
(Note `self.inner.grpc` references the shared gRPC client attached to
`StorageInner`.)
## 6. `into_parts` — Splitting the Request
Looking at `open_object_request.rs:150`:
```rust
pub(crate) fn into_parts(mut self) -> (BidiReadObjectSpec, Vec<ReadRange>) {
let ranges = std::mem::take(&mut self.ranges); // :151
(BidiReadObjectSpec::from(self), ranges) // :152
}
```
`std::mem::take` swaps the `ranges` Vec out of the request with any empty Vec,
so that `self` remains whole. This allows `self` to be consumed by `From::from`
on the next line.
(`let ranges = self.ranges` would have resulted in a partial move of `self`,
preventing it from being passed into `BidiReadObjectSpec::from`.)
**Why do we split the request?** The `spec` represents the *connection identity
and conditions* (which are long-lived and resent on every reconnect). The
`ranges` represent the *work* (which is transient; more ranges can be added
later via `read_range`). Consequently, they are routed differently: `spec` goes
to the `Connector` (`:225`), while `ranges` go directly to the transport
(`:226`). This cleanly mirrors the protobuf wire format, where the request
message has distinct `read_object_spec` and `read_ranges` fields (see
**Reference C**).
## 7. `Connector::new` — Armed but Not Fired
In `connector.rs:75`, the connector wraps the `spec` in an `Arc<Mutex<...>>`
(`:77`), stores the options and client, and initializes `reconnect_attempts` to
`0` (`:80`). There's **no I/O** happening here. The struct definition (`:62`)
looks like this:
```rust
pub struct Connector<T = GrpcClient> {
spec: Arc<Mutex<BidiReadObjectSpec>>, // mutable, shared identity
options: RequestOptions,
client: T, // generic for mocking; real = gaxi GrpcClient
reconnect_attempts: u32,
}
```
The connector's job is: *"Establishes (and reconnects) bidi streaming reads."*
(`:45`).
Handling reconnects inherently requires state. The connector must remember the
generation, read handle, routing token, and the number of attempt counts. We use
an `Arc<Mutex>` because the spec is **mutated** after creation (the server fills
in the generation, handle, and token) and must be **shared** across both the
connect retry loop and the worker's reconnect path.
## 8. `ObjectDescriptorTransport::new` — Orchestration (Part 1: Prep)
Moving to `bidi/transport.rs:34`, we do some prep work before establishing the
connection:
```rust
let (tx, rx) = tokio::sync::mpsc::channel(100); // :44 new read request channel
and invokes `bidi_stream_with_status` (refer to **Reference E**).
Note that if a connection is successfully established, `read_object_spec` is
updated to contain the object's generation and RPC read handle
(`connector.rs:221-226`). `reconnect()` uses `connect()` under the hood, and so
`:147` ensures that any reconnect attempt uses this enriched information.
**Phase B — The Handshake:**
```rust
let response = match response { Ok(r) => r, Err(status) => return Err(handle_redirect(spec, status)) }; // :211
let (metadata, mut stream, _) = response.into_parts(); // :216 metadata=transport headers
let headers = metadata.into_headers(); // :217
match stream.next_message().await { // :218 read the FIRST message
Ok(Some(m)) => {
let mut guard = spec.lock()...; // :220 ENRICH the spec:
if let Some(g) = m.metadata.as_ref().map(|o| o.generation) { guard.generation = g; } // :222
if m.read_handle.is_some() { guard.read_handle = m.read_handle.clone(); } // :225
Ok((m, headers, Connection::new(tx, stream))) // :227
}
Ok(None) => Err(Error::io("...closed before start")), // :229
Err(status) => Err(handle_redirect(spec, status)), // :230
}
```
It is required that the first message carries the object metadata (this is
strictly enforced in step 11). The writes at `:222` and `:225` represent the
**enrichment** phase. If the app initiated the request with `generation: 0`
(meaning "latest"), we capture the generation number returned by the server,
alongside the read handle to make future reconnects cheaper.
This block returns `(initial_response, headers, connection)`. This matches
exactly what `connect()` outputs, and what the `connector.connect(...)` call in
step 11 returns.
> *Clarification:* Don't confuse the two uses of the word "metadata" here. The
> first `metadata` that is turned `into_headers` represents the **transport**
> metadata (the gRPC response headers). On the other hand, `m.metadata` is the
> actual **object** metadata (the `Object` protobuf). See **Reference H** for
> details.
## 11. `ObjectDescriptorTransport::new` — Orchestration (Part 2)
Returning to `bidi/transport.rs:51`, armed with our
`(initial, headers, connection)`:
```rust
let (mut initial, headers, connection) = connector.connect(proto_ranges).await?; // :51
let object = Arc::new(object); // :56
let (active, readers) = Self::map_ranges(requested_ranges, &tx, &object); // :57
let mut worker = super::worker::Worker::new(connector, active); // :58
worker.handle_response_success(initial).await.map_err(Error::io)?; // :59–62
let _handle = tokio::spawn(worker.run(connection, rx)); // :63
Ok((Self { object, headers, tx }, readers)) // :64–71
```
Let's break down this crucial block:
- **Lines `:52–56`:** We use `initial.metadata.take()` - the `Option` equivalent
of `mem::take` - stealing the field while allowing `initial` to be reused at
`:59`. We wrap the resulting `Object` in an `Arc` because it needs to be
shared between the descriptor and every reader.
- **Line `:57`:** `map_ranges` (defined at `:74`, with the per-range channel
created at `:82`) mints exactly one **byte channel per range**. This yields an
`ActiveRead` (the sender side, kept by the worker) and a `RangeReader` wrapped
as a `ReadObjectResponse` (the receiver side, handed to the user). See
**Reference D** and **Reference G**.
- **Line `:58`:** We instantiate the worker. The `connector` is **moved in**
here. Because the worker now completely owns the connector, it can freely call
`connector.reconnect(...)` from within its detached task later on. Inside the
worker, `active` is transformed into a `HashMap` mapping read ids to
`ActiveRead`s (`worker.rs:38–43`).
- **Lines `:59–62`:** We immediately feed the worker the first message's
`object_data_ranges`. This happens **synchronously**. For a standard `send()`,
this might be a no-op, but for `send_and_read`, it delivers the very first
chunk of data instantly.
- **Line `:63`:** The worker is handed off to be executed asynchronously via
`tokio::spawn` and is completely **detached** (we deliberately drop the
handle). It now has full ownership of the connection and will run its loop
until the stream concludes naturally, an unrecoverable error strikes, or the
descriptor's `tx` channel is dropped.
- **Lines `:64–71`:** Finally, we return the fully initialized transport
(`object`, `headers`, `tx`) along with the `readers`.
## 12. The Climb Back Up
The call stack unwinds cleanly:
| `ObjectDescriptorTransport::new` -> `(transport, readers)` | `bidi/transport.rs:64` | Hands the established connection and range readers back up |
| `open_object_plain: ObjectDescriptor::new(transport)` | `transport.rs:227` | Wraps the transport stub into the public type |
| `stub::open_object` | `transport.rs:326` | Hands the transport stub back up |
| **Either:** `.send()`: `let (descriptor, _) = …; Ok(descriptor)` | `open_object.rs:67` | Discards the readers |
| **Or:** `send_and_read()` | `open_object.rs:98` | Retains the descriptor plus the single reader |
The user is now successfully holding an `ObjectDescriptor` (analogous to a file
descriptor) which allows reads on the object.
## 13. After `send()` / `send_and_read()` Returns
When the dust settles, three entities remain alive and active:
1. The user's `ObjectDescriptor` (holding the `tx` to the worker).
1. The detached `Worker` task (owning the connection, the connector, and the
read-id HashMap).
1. The enriched spec, living safely behind its `Arc<Mutex>`.
When the user issues a new `ObjectDescriptor::read_range()`, a new byte channel
is minted and packaged into an `ActiveRead`. The `ActiveRead` is sent down the
read-request channel, is processed by Branch B of the worker loop, and gets
tagged with a fresh read-id. The worker issues a `BidiReadObjectRequest` to the
server, the server responds, the worker intercepts the chunks, looks up the
read-id in the HashMap, and pushes the bytes down the channel to the user via
the corresponding `RangeReader`, finally reaching the user through the
`ReadObjectResponse`.
# Part 2 — Reference and Deep Dives
## A. The Client Type and the Stub Seam
Let's look at `client.rs:92`:
```rust
pub struct Storage<S = crate::stub::DefaultStorage>
where S: crate::stub::Storage + 'static
{ stub: std::sync::Arc<S>, options: RequestOptions }
```
- **`S`** represents the **stub type**—the actual engine that executes the RPCs.
It is bounded by the `stub::Storage` trait we first saw back in step 3.
- **Default `S = DefaultStorage`**: `DefaultStorage` is simply a handy alias for
the transport (`pub use crate::storage::transport::Storage as DefaultStorage;`
in `lib.rs:122`). This means if the user doesn't explicitly specify otherwise,
`S` resolves to the real network transport (`transport::Storage`).
- **Why make it generic?** It's all about testing. The user can instantiate
`Storage<MockStub>` to exercise the entire client chain against a mocked
backend. The `from_stub` method (`client.rs:130`) exists explicitly for this
purpose. Since `S` is a type parameter rather than a `dyn` trait object,
method dispatch remains **static**—no virtual table overhead.
To use an analogy: if `open_object` is the act of placing a phone call, the
`stub` is the phone itself. In production, the user has a real phone wired
directly to GCS. In tests, the user swaps it for a toy phone that plays
pre-recorded responses.
## B. `ObjectDescriptor` Anatomy
The `object_descriptor.rs` file defines the public handle the user receives.
Structurally, it's a **hollow newtype wrapped around a trait object** (`:58`):
```rust
pub struct ObjectDescriptor { inner: Arc<dyn ObjectDescriptorStub> }
```
- `Arc` ensures it is cheap to `Clone`. If the user clones the descriptor, both
copies share the same underlying open stream and worker task.
- `dyn` enables type erasure, letting the same handle wrap either the real
transport or a mock implementation.
- The three methods exposed to the user—`object()` (`:79`), `read_range()`
(`:112`), and `headers()` (`:133`)—simply forward their calls directly to
`self.inner`. All the real behavioral logic lives inside the inner
`ObjectDescriptorTransport`.
- The `new<T>` constructor (`:141`) takes any type `T: stub::ObjectDescriptor`
and wraps it in the `Arc<dyn>`. Conversely, `into_parts` (`:150`) extracts and
returns the raw `inner` object (which is how the tracing path unpacks it).
There are some excellent documentation notes at `:44–56`: the handle is
described as being *"analogous to a 'file descriptor'"*. It warns that reads
across different ranges have **no ordering guarantees**. Crucially, it
highlights a major footgun: if the user fails to drain data from one reader, its
buffer will fill up, which will **stall all other reads** sharing the stream.
Is `ObjectDescriptor` truly like a file descriptor? Yes, specifically one of the
**`pread` flavor**: the user opens it once, and then the user can issue multiple
positional `(offset, length)` reads. There is no internal seek pointer, and
dropping the descriptor is effectively closing it (as seen in `worker.rs:88`,
where the worker gracefully exits when the descriptor's `tx` channel
disappears). But it's also *more* powerful than a simple file descriptor: it
caches metadata in-process (`object()` doesn't require an I/O call), it is
**snapshot-pinned** to a specific object generation (`connector.rs:221`), and it
intelligently **self-heals** by reconnecting under the hood (`worker.rs:153`).
**The Two-Trait Pattern:** The user might notice that the trait used in the
`inner` field type (`:21`, `:59`) differs slightly from the trait bound on
`new<T>` (`:143`). Here is why:
- `crate::stub::ObjectDescriptor` (`stub.rs:92`): This is the **ergonomic**
trait featuring native `async fn`. However, native async traits aren't
dyn-compatible (object safe).
- `...bidi::stub::dynamic::ObjectDescriptor` (`bidi/stub.rs:24`): This is the
**dyn-safe** variant, utilizing `#[async_trait]` to return boxed futures. This
is what actually gets stored inside the `Arc<dyn>`.
- The bridge: At `bidi/stub.rs:31`, a blanket implementation automatically
implements the dyn-safe trait for any type `T` that implements the ergonomic
`stub::ObjectDescriptor`. This means implementors get the clean, modern API,
while the library handles the messy type erasure internally.
## C. The Wire Types
Let's look at the data structures carrying the requests and responses.
**`OpenObjectRequest`** (`open_object_request.rs:30`): This is the user-friendly
container built by the application. It holds the bucket, object name,
generation, the four `if_*` preconditions, CSEK parameters, and the initial
`ranges`. It deliberately **omits** the `read_handle` and `routing_token` (as
noted in the comment at `:26`), because those values are strictly supplied by
the server.
**`BidiReadObjectSpec`** (`google.storage.v2.rs:410`): This is the generated
protobuf type representing the "which object and under what conditions" half of
the request. The fields can be grouped by where they originate:
- *App-supplied:*
- `bucket` (`:412`)
- `object` (`:414`)
- `generation` (`:416`)
- the four `if_*` conditions:
- `if_generation_match` (`:418`)
- `if_generation_not_match` (`:420`)
- `if_metageneration_match` (`:422`)
- `if_metageneration_not_match` (`:424`)
- `common_object_request_params` (`:426`)
- *Server-supplied (filled in by the connector):*
- `read_handle` (`:431`)
- `routing_token` (`:433`)
- The `read_mask` field (`:429`) is marked `#[deprecated]`.
- The `generation` field actually switches ownership: it starts as the
application's request (e.g., `0` for "latest"), but the connector overwrites
it with the concrete value resolved by the server (`connector.rs:221`),
effectively pinning the snapshot.
A note on the `#[prost(...)]` decoder: `tag = "N"` represents the wire field
number (names aren't transmitted, and tags may have gaps if fields were
deprecated). `Option<T>` maps to the protobuf `optional` keyword, distinguishing
an intentionally "unset" value from a literal `0`.
**`BidiReadObjectRequest`** (`:446`): This is the payload sent by the client
over the stream:
```rust
pub struct BidiReadObjectRequest {
/// Sent exactly once per connection.
/// Subsequent messages omit this.
/// Resent during a reconnect.
pub read_object_spec: Option<BidiReadObjectSpec>,
/// The ranges to read; the user can ask for any number of chunks at any time.
pub read_ranges: Vec<ReadRange>,
}
```
The data shape encodes the **streaming grammar**: the spec is an `Option` since
the user sends it **only once** per connection (the first message has `Some`,
subsequent ones send `None`, as seen in `worker.rs:204`; a reconnect will
re-send the enriched spec). The ranges are `repeated`, meaning the user can ask
for any number of chunks at any time.
**`BidiReadObjectResponse`** (`:463`): This is what the server streams back to
the client:
```rust
pub struct BidiReadObjectResponse {
/// The raw bytes, tagged by their read-id.
pub object_data_ranges: Vec<ObjectRangeData>,
/// Object metadata.
pub metadata: Option<Object>,
/// A token returned by the server to identify the open read session;
/// the client passes this back during reconnects to resume the stream.
pub read_handle: Option<BidiReadHandle>,
}
```
## D. The Channels
The implementation uses multiple sender-receiver channel pairs.
| **Read-Request Channel** | `ActiveRead` | descriptor -> worker | `bidi/transport.rs:44` | descriptor (`Self.tx`) + each reader (clone) | worker (`requests`) | Carries new `ActiveRead` intents from user code to the worker task. |
| **Byte Channels** (one per read range) | `Result<Bytes>` | worker -> one reader | `bidi/transport.rs:82` and `:100` | worker (inside each `ActiveRead`) | that specific reader (`RangeReader`) | Data path delivering the raw object bytes back to a specific range reader. |
| **Wire-Out** | `BidiReadObjectRequest` | client -> server | `connector.rs:185` (preloaded `:186`) | worker (`Connection.tx`) | gRPC (as the request stream) | Outbound network stream sending gRPC requests to the GCS server. |
| **Wire-In** | `BidiReadObjectResponse` | server -> client | returned from `client.start` | n/a (managed by tonic) | worker (`Connection.rx`, tonic `Streaming`) | Inbound network stream receiving gRPC responses from the GCS server. |
### Detailed Breakdown per Channel
#### 1. Read-Request Channel (Descriptor -> Worker)
| **Type** | `tokio::sync::mpsc::Sender<ActiveRead>` / `tokio::sync::mpsc::Receiver<ActiveRead>` |
| **Created** | `bidi/transport.rs:44`, with a capacity of `100`. |
| **`tx` Held By** | The `ObjectDescriptorTransport` (as its `tx` field) and cloned into each `RangeReader`. |
| **`rx` Held By** | The worker task, held locally inside `run` (named `requests`). |
| **Purpose** | Whenever `descriptor.read_range(...)` is called, it pushes a new `ActiveRead` onto this channel. Branch B of the worker's `run` loop drains it via `recv_many`. |
| **Lifetime** | Exactly one exists per `open_object` call. It lives as long as the descriptor itself or any active reader remains. When all `Sender` clones are dropped, Branch B observes `r == 0` (`bidi/worker.rs:89`), signaling the worker to exit cleanly. |
#### 2. Byte Channels (Worker -> Reader, One *Per Range*)
| **Type** | `tokio::sync::mpsc::Sender<ReadResult<Bytes>>` / `tokio::sync::mpsc::Receiver<ReadResult<Bytes>>` |
| **Created** | `bidi/transport.rs:82` inside `map_ranges` (for ranges established during connection) and at `bidi/transport.rs:100` for post-open `read_range` calls. Capacity is `100`. |
| **`tx` Held By** | The `ActiveRead` corresponding to that specific range (stored in the worker's `self.ranges` HashMap). |
| **`rx` Held By** | The `RangeReader` for that range (wrapped inside the `ReadObjectResponse` returned to the user). |
| **Purpose** | When the worker receives object data from the server tagged with a specific read-id, it looks up the corresponding `ActiveRead` in its HashMap and pushes the bytes into that range's byte channel. The reader extracts them by awaiting `.next()`. |
| **Lifetime** | One exists per active range. A `send_and_read(N)` call creates `N` byte channels. Every subsequent call to `read_range` adds one more. |
#### 3. Wire-Out (Worker -> Tonic -> Server)
| **Type** | `tokio::sync::mpsc::Sender<BidiReadObjectRequest>` / `tokio::sync::mpsc::Receiver<BidiReadObjectRequest>` |
| **Created** | `connector.rs:185` (and preloaded at `:186`) during `connect_attempt`, with a capacity of `100`. |
| **`tx` Held By** | The worker (stored in `Connection.tx`, unpacked into a local `tx` variable in `run`). |
| **`rx` Held By** | Tonic, after being wrapped in a `ReceiverStream` and passed into `inner.streaming(...)` (`src/gax-internal/src/grpc.rs:198`). |
| **Purpose** | The worker pushes outbound `BidiReadObjectRequest` messages here. Tonic drains the receiver, serializes the messages with Prost, frames them, and pushes them down onto the HTTP/2 socket. |
| **Lifetime** | **One per connection attempt.** Whenever a reconnect occurs, a brand new pair is created, and the old one is discarded. (The opening message is preloaded via `tx.send(request.clone()).await` before the stream even starts, guaranteeing the server sees the spec on the very first DATA frame.) |
#### 4. Wire-In (Server -> Tonic -> Worker)
| **Type** | `tonic::Streaming<BidiReadObjectResponse>` (Note: this is tonic's custom stream type, not an mpsc pair, exposed via the `TonicStreaming` trait at `bidi.rs:38–49` for mockability). |
| **Created** | Returned directly from `inner.streaming(...)` (`src/gax-internal/src/grpc.rs:223`) and exposed via the `Connection` struct. |
| **Producer Side** | Owned and managed internally by Tonic as it buffers incoming HTTP/2 DATA frames. |
| **Consumer Side** | Held by the worker (in `Connection.rx`, unpacked locally in `run`). |
| **Purpose** | Branch A of the worker's loop calls `rx.next_message()` to pull responses one by one. It can return `Ok(Some(response))`, `Ok(None)` for a clean termination, or `Err(status)` for an error or a severed connection. |
| **Lifetime** | **One per connection attempt.** A new connection guarantees a new wire-in stream. |
### Counts at a Glance
If the user runs a `send_and_read` requesting `N` initial ranges on the very
first connection:
| Read-Request Channel | 1 | Yes, survives without interruption |
| Byte Channels | `N` (one for each range) | Yes, survive without interruption |
| Wire-Out | 1 | No, replaced entirely |
| Wire-In | 1 (technically a tonic stream, not mpsc) | No, replaced entirely |
Every new `read_range` call adds precisely one new byte channel, leaving the
other counts unchanged. If the network drops and the worker successfully
reconnects, the user-facing read-request and byte channels survive without
interruption, while the wire-in and wire-out conduits are replaced entirely. In
other words, when a reconnect happens, only the network side gets rebuilt.
### The Overarching Pattern
- **Read-Request Channel:** The command path from "user code to worker."
- **Byte Channels:** The data path from "worker back to user code."
- **Wire-Out / Wire-In:** The transient network links managed via Tonic, totally
disposable during a reconnect.
**Disambiguating the multiple `tx, rx` pairs in `map_ranges`:** At
`bidi/transport.rs:57`, the code passes `&tx` (the sender for the read-request
channel) into the function. The signature renames it to `requests` (`:76`).
Then, at `:82`, it creates a *new* channel `(tx, rx)` for the bytes, reusing the
exact same variable names. Consequently, inside the closure, `requests` is the
read-request sender (which gets cloned into each reader), and the new `tx` is
the byte channel sender (which is handed to the `ActiveRead`).
**Who holds what afterward:**
- **Descriptor:** The read-request channel `tx` (plus `object` and `headers`).
- **Worker:** The read-request channel `rx` (listening for new intents), the
wire-out `tx` and wire-in stream (talking to the server), and a byte channel
`tx` tucked inside each `ActiveRead` (pushing bytes to readers). Inside
`worker.run` (`worker.rs:65`), the connection is destructured into `rx` and
`tx`, again reusing the names.
- **Each Reader:** Its dedicated byte channel `rx` (to receive bytes) and a
clone of the read-request channel `tx` (to poke the worker).
## E. The gRPC Transport Layer
The function `client.start` (`bidi.rs:69`) wraps the outbound `rx` channel into
a `ReceiverStream` and invokes `bidi_stream_with_status`
(`src/gax-internal/src/grpc.rs:198`). Let's peek inside:
| `src/gax-internal/src/grpc.rs:212 -> :555` | `make_headers(…)` | user-agent, x-goog-user-project, etc. |
| `src/gax-internal/src/grpc.rs:213 -> :539` | `add_auth_headers(headers)` | fetches the Bearer token |
| `src/gax-internal/src/grpc.rs:214` | `MetadataMap::from_headers(…)` | converts HTTP headers to gRPC metadata |
| `src/gax-internal/src/grpc.rs:215` | `tonic::Request::from_parts(…)` | packages metadata and the outbound STREAM |
| `src/gax-internal/src/grpc.rs:216` | `ProstCodec::default()` | the protobuf serializer/deserializer |
| `src/gax-internal/src/grpc.rs:217` | `self.inner.clone()` | the underlying Tonic InnerClient at :71 |
| `src/gax-internal/src/grpc.rs:218` | `inner.ready().await` | waits for channel readiness |
| `src/gax-internal/src/grpc.rs:223` | `inner.streaming(req, path, …)` | **THE ACTUAL TONIC BIDI CALL** |
| `src/gax-internal/src/grpc.rs:231` | `Ok(result)` | returns the result |
Below `:223`, everything descends into the lower layers: Tonic -> Hyper ->
HTTP/2 -> raw socket.
**The Double `Result`:** Line `src/gax-internal/src/grpc.rs:206` returns
`Result<tonic::Result<…>>`.
- The **outer** result represents GAX-level errors (setup failures, header
issues, auth errors, `ready` checks), propagated via `?`.
- The **inner** `tonic::Result` (which is `Result<_, tonic::Status>`) represents
the actual outcome of the gRPC call.
While `bidi_stream` (`:168`) eagerly flattens the inner error using
`to_gax_error` (`:190`), `bidi_stream_with_status` deliberately preserves the
raw `Status`. This is crucial because **the Storage layer needs to extract
redirect details directly from the Status** (as documented at `:195–197`) so
that `connect_attempt` can gracefully handle `Err(status) -> handle_redirect`
(`connector.rs:211/230`).
## F. Trailers, Status, and Errors
gRPC trailers (such as `grpc-status`, `grpc-message`, and
`grpc-status-details-bin`) arrive **after** the message body. This
implementation never reads them as a raw metadata map; instead, they surface as
the **terminal result when reading the stream**.
Calling `next_message` (`bidi.rs:46`) invokes Tonic's `message()`, returning a
`Result<Option<T>, Status>`:
- `Ok(Some(msg))`: A normal data message (we're still inside the body).
- `Ok(None)`: A clean termination; the trailers reported `grpc-status: OK`.
- `Err(status)`: This indicates either an error trailer **or** a prematurely
severed connection (Tonic synthesizes a `Status` for the latter). Because both
scenarios look identical, the worker simply treats any `Err` by triggering a
reconnect (`worker.rs:116`).
The user can parse the type signature as answering two questions: `Result` asks
"Did it end okay, or error?", while `Option` asks "Is there more data, or are we
done?".
The status's **details** (derived from the `grpc-status-details-bin` trailer)
are highly relevant. Tonic decodes them into `Status::details()`, and
`handle_redirect` (`redirect.rs:25`) uses Prost to decode them, extracting the
routing token and read handle into the spec (`:30–31`). Thus, trailer-borne
redirect payloads are the primary driver of reconnect routing.
**How the status reaches the user:** The function `to_gax_error` (imported from
`gaxi::grpc::from_status` at `redirect.rs:19` and called at `redirect.rs:36`)
converts the `tonic::Status` into a standard `gax::Error`, which `send()`
eventually returns. The user inspects this by reading `err.status().code` (for
example, the `open_object` test asserts that `s.code == Code::NotFound` at
`transport.rs:528`).
**Manually inspecting trailing metadata:** this is currently not done because
the codebase only cares about the status. If this is desired the user would need
to completely drain the body until `Ok(None)` and then invoke Tonic's
`Streaming::trailers()`. To implement it, the user would add a `trailers()`
method to the `TonicStreaming` trait (`bidi.rs:38`) and call it on the
clean-exit path after the worker's loop terminates (`worker.rs:74`/`:96`), while
`rx` is still fully owned.
## G. `ActiveRead` vs `ReadObjectResponse`
These are simply the two opposite ends of a single range's **byte channel**,
created in `map_range` (`transport.rs:93`) and `read_range`
(`transport.rs:113`).
**`ActiveRead`** (`active_read.rs:25`) is the **worker's** write-end and routes
data received over the network to its `ReadObjectResponse` counterpart.
```rust
pub(crate) struct ActiveRead {
state: RemainingRange, // tracks how much of the range remains
sender: Sender<ReadResult<bytes::Bytes>>, // the WRITE end of the channel
}
```
- `handle_data` (`:41`): When a chunk arrives, it updates `state`, **verifies
the crc32c checksum** (`:52–60`), and pushes the bytes.
- `as_proto(id)` (`:77`): Generates a proto range describing only the
*remaining* bytes, ensuring a reconnect only asks for what hasn't been
received yet (`active_read.rs:77`).
- `interrupted(error)` (`:67`): Pushes a terminal `Err` down the channel to
intentionally fail the reader.
**`ReadObjectResponse`** (which wraps a `RangeReader`) is **the user's**
read-end. It simply holds the `Receiver`, yields sequential chunks via
`.next().await`, and contains no complex logic.
Because the channel inherently carries `Result<Bytes, ReadError>`, errors share
the exact same conduit as data. Thus, `reader.next().await` yields an
`Option<Result<Bytes>>` (`Some(Ok)` means a chunk, `Some(Err)` means failure,
and `None` means done). This is why, in
[step 11 of Part 1](#11-objectdescriptortransportnew--orchestration-part-2), the
`active` half is given to the worker while the `readers` half is handed back to
the user.
## H. The Two "Metadata"s
During the handshake inside `connect_attempt`, two distinct concepts are
confusingly both referred to as "metadata" just lines apart:
| **What it is** | gRPC **response headers** (gRPC confusingly calls headers "metadata") | The **object's** actual storage properties |
| **Type** | `MetadataMap` -> `HeaderMap` | `Option<Object>` |
| **Extracted From** | The response **envelope** (head) | The very first **body message** |
| **Becomes** | `descriptor.headers()` | `descriptor.object()` |
| **Example** | `content-type: application/grpc`, `x-guploader-uploadid` | `generation: 123456`, `size: 42` |
Think of it like an HTTP response: the envelope/headers represent transport
metadata wrapping the call itself, while the body carries the message stream.
The very first chunk in that body happens to carry the object's storage metadata
inside a field literally named `metadata`.
## I. The Worker's `run` Loop, Branch by Branch
The worker operates as the central hinge of the entire flow. It consumes
`ActiveRead`s from the read-request channel, pushes outbound messages to the
wire, processes returning responses, and routes them back to the user.
At `worker.rs:59`, after `Worker::new` initializes everything and the first
message is processed via `handle_response_success`, the worker is launched in
the background via `tokio::spawn(worker.run(connection, rx))`
(`bidi/transport.rs:63`). The `run` method functions as the perpetual background
pump.
### Skeleton of the Loop
```rust
pub async fn run(mut self, connection: Connection<C::Stream>, mut requests: Receiver<ActiveRead>) -> LoopResult<()> {
let mut ranges = Vec::new();
let (mut rx, mut tx) = (connection.rx, connection.tx); // wire-in and wire-out
let error = loop {
tokio::select! {
m = rx.next_message() => { ... } // :71 Branch A — Inbound
r = requests.recv_many(&mut ranges, 16) => { ... } // :88 Branch B — Outbound
}
};
drop(rx); drop(tx);
let Some(e) = error else { return Ok(()); };
while let Some(mut r) = requests.recv().await { ... } // drain any straggling readers
Err(e)
}
```
The `tokio::select!` macro races both arms every single iteration, running
whichever resolves first. Since it runs in a single task, only one arm executes
per iteration (there is no parallel execution inside the worker). The internal
`loop` acts as an expression: whatever value it `break`s with gets assigned to
`error`, which the post-loop cleanup code evaluates.
There are three exit paths:
- `break None`: A clean shutdown (the loop yields `None`).
- `break Some(e)`: A fatal error occurred.
- No break: Keep looping indefinitely.
After breaking, a clean exit returns `Ok(())`. An error exit aggressively drains
any pending `read_range` calls that were queued too late, failing them all via
`r.interrupted(e.clone())` before returning `Err(e)`.
### Branch A — Inbound (Wire-In -> Routing / Reconnect)
Located at `worker.rs:71–87`:
```rust
m = rx.next_message() => {
match self.handle_response(m).await {
None => break None,
Some(Err(e)) => break Some(e),
Some(Ok(None)) => {},
Some(Ok(Some(connection))) => {
(rx, tx) = (connection.rx, connection.tx);
}
};
},
```
**`rx.next_message()`** is bound to the `TonicStreaming` trait
(`bidi.rs:38–48`), which exists primarily to facilitate mocking. In production,
it wraps Tonic's `Streaming::message()` in one line. One call strictly yields
one message, not a batch.
It returns `Result<Option<BidiReadObjectResponse>, tonic::Status>`, encoding
three states:
- `Ok(Some(response))` — a healthy data message.
- `Ok(None)` — a clean stream termination.
- `Err(status)` — a stream error (could be an error trailer or a broken socket,
both synthesized into a `Status` by Tonic).
**`handle_response`** (`worker.rs:110–122`):
```rust
pub async fn handle_response(
&mut self,
message: TonicResult<Option<BidiReadObjectResponse>>,
) -> Option<LoopResult<Option<Connection<C::Stream>>>> {
let response = match message.transpose()? {
Ok(r) => r,
Err(status) => return self.reconnect(status).await,
};
match self.handle_response_success(response).await {
Err(e) => Some(Err(e)),
Ok(_) => Some(Ok(None)),
}
}
```
`message.transpose()` flips `Result<Option<T>, E>` into `Option<Result<T, E>>`.
The trailing `?` unwraps the outer `Option`. If it encounters a `None`
(representing an `Ok(None)` clean stream end), it early-returns `None` out of
`handle_response`, which the run loop intercepts with its `None => break None`
arm.
If it receives `Err(status)`, it immediately routes to
`self.reconnect(status).await`. If it gets `Ok(Some(r))`, it passes it down to
`self.handle_response_success(r).await`.
`handle_response_success` (`:124`) hands off to `handle_ranges`, which loops
over `handle_range_data` to route bytes via the `self.ranges` HashMap into the
correct byte channel. Routing failure yields `Some(Err(e))` (triggering a fatal
`break Some(e)` in the run loop), while success yields `Some(Ok(None))`
(instructing the run loop to continue using the same connection).
The `reconnect` method (`:153`) iterates over every range in `self.ranges` and
executes `self.connector.reconnect(status, ranges)`. If the reconnect outright
fails, it yields a fatal `Some(Err(e))`. If it succeeds, it returns
`Some(Ok(Some(new_connection)))`, signaling the run loop to swap in the new wire
pair.
**The Four Match Arms in the Run Loop:**
| `None` | Clean termination of stream | `break None` — exit with success |
| `Some(Err(e))` | Fatal (corrupt data or total reconnect failure) | `break Some(e)` — exit with error |
| `Some(Ok(None))` | Message routed successfully, connection healthy | fall through, continue loop |
| `Some(Ok(Some(connection)))` | Reconnect succeeded, entirely new wire pair | update `rx`/`tx`, continue loop |
**The Reconnect Swap:**
```rust
(rx, tx) = (connection.rx, connection.tx);
```
Using destructuring assignment, both wire halves are atomically replaced in a
single line. On the next iteration, `rx.next_message()` reads from the new
stream, and `tx.clone()` uses the new sender.
### Branch B — Outbound (Read-Request Channel -> Wire-Out)
Located at `worker.rs:88–93`:
```rust
r = requests.recv_many(&mut ranges, 16) => {
if r == 0 {
break None;
};
self.insert_ranges(tx.clone(), std::mem::take(&mut ranges)).await;
},
```
**Understanding `recv_many`:** This performs a batched receive. There are two
distinct buffers at play here:
- The **channel's internal queue** (capacity `100`, configured at
`bidi/transport.rs:44`). Senders continuously push into this queue while the
worker is busy executing other tasks.
- The **Vec scratch buffer named `ranges`** (declared at `worker.rs:64`), passed
here via mutable reference (`&mut`).
`recv_many(buf, max)` blocks until the queue holds at least one item, then
eagerly drains *up to* `max` items from the queue directly into `buf`, returning
the total count. If seven `read_range` calls queue up while the worker is busy
on Branch A, Branch B drains all seven simultaneously on its next pass. This
keeps latency low for the first item while boosting overall throughput.
The limit `16` restricts the drain count per cycle to keep the loop responsive
to incoming network data (Branch A). The `100` is the channel capacity,
providing backpressure headroom. These are parameters that can be tuned.
**`r == 0` — Graceful Shutdown:** This only evaluates to true when the channel
is **closed *and* empty**—meaning every single `Sender<ActiveRead>` clone has
been dropped *and* the internal queue is drained. Those clones reside in the
descriptor (`Self.tx`) and inside every `RangeReader` (`requests.clone()` at
`bidi/transport.rs:84`). Thus, this condition only triggers when the descriptor
and all active readers are destroyed. `break None` then exits the loop cleanly.
(A temporary lull in requests simply causes `recv_many` to sleep; it does *not*
return 0).
**Understanding `mem::take(&mut ranges)` (`worker.rs:92`)**:
`mem::take` functionally acts as `mem::replace(&mut v, T::default())`:
- It extracts and returns the current filled Vec.
- It leaves a brand new `T::default()` (which for a Vec is `Vec::new()`, a
zero-capacity, allocation-free shell) in the slot.
It does **not** preserve the allocation between iterations. The memory
allocation rides along with the returned Vec into `insert_ranges` and is
deallocated when that Vec goes out of scope. The next iteration's `recv_many`
starts completely fresh with a zero-capacity Vec.
The reason for using `mem::take` here is **borrow-checker hygiene**. The
variable `ranges` must survive across the loop because the `recv_many` future
actively borrows `&mut ranges` each iteration. Moving the Vec out directly
(`insert_ranges(.., ranges)`) would leave the variable in an invalid, moved-from
state. `mem::take` safely swaps a fresh default in, satisfying the compiler in a
single expression.
**`tx.clone()` (`worker.rs:92`) Sidestepping Borrow Constraints:** Passing a
clone into `insert_ranges` allows that function to execute `.send(..).await`
without retaining a borrow of the outer `tx` variable across an `.await`
boundary. The worker's primary `tx` survives safely in the main `run` scope for
the next loop.
**Inside `insert_ranges` (`worker.rs:194–214`):**
```rust
async fn insert_ranges(&mut self, tx: Sender<BidiReadObjectRequest>, readers: Vec<ActiveRead>) {
let mut ranges = Vec::new(); // local proto buffer (distinct from outer `ranges`!)
for r in readers {
let id = self.next_range_id; // :197 mint a fresh, monotonic id
self.next_range_id += 1;
let request = r.as_proto(id); // :200 generate the proto for the *remaining* range
self.ranges.lock().await.insert(id, r); // :201 register the ActiveRead in the routing HashMap
ranges.push(request); // :202 append the range proto to the outgoing wire message
}
let request = BidiReadObjectRequest {
read_ranges: ranges,
..BidiReadObjectRequest::default() // spec: None — it was already sent
};
if let Err(e) = tx.send(request).await { // :211 ship it; failure is non-fatal
tracing::error!("error sending read range request: {e:?}");
}
}
```
For every single `ActiveRead`:
1. **Mint a fresh id.** `self.next_range_id` increments monotonically; ids are
never reused over the worker's life. No atomics are required since
`&mut self` provides exclusive access.
1. **Build the proto.** `r.as_proto(id)` extracts the read's current
`RemainingRange`, meaning reconnects re-request only unreceived bytes
(`active_read.rs:77`).
1. **Register in the HashMap.** Acquiring the `tokio::sync::Mutex`
(`self.ranges.lock().await.insert(id, r)`) moves `r` into the routing table
under its `id`, releasing the lock immediately at statement end.
1. **Collect the proto** into `ranges`.
After processing the loop, all protos are batched into a single
`BidiReadObjectRequest`. The `..BidiReadObjectRequest::default()` syntax fills
the remaining fields, most importantly setting `read_object_spec: None` - the
spec is only sent during the initial handshake or a full reconnect.
**Insert-Before-Send** Crucially, the HashMap insert (`:201`) happens *before*
the `tx.send` (`:211`). If the send succeeds, the routing is already prepped. If
the send fails (because the wire-out is dead), the entry safely remains in the
HashMap. When Branch A notices the dead connection, it triggers `reconnect`
(`worker.rs:153`) which scoops up every range from the HashMap (`:157–163`) and
re-requests them in the new connection's opening message.
The governing pattern: **The HashMap is durable; the network wire is
disposable.** Reads secure a spot in the HashMap first, wire delivery is
inherently best-effort, and reconnect logic re-sends anything the network
dropped. No read is ever lost.
### The `read_id` Round Trip — How Multiplexing Works
The `id` is **opaque to the server**, which merely echoes back whatever the SDK
sends.
```
SDK side
─────────────────────────────────────────────────────────────────────────────────
worker.rs:197
id = self.next_range_id++
worker.rs:201
ranges.insert(id, ActiveRead) <- HashMap entry secured
worker.rs:200
r.as_proto(id) <- ProtoRange { read_id: 7, ... }
...Server returns read result... <- ObjectRangeData {
read_range: { read_id: 7, // echoed back },
checksummed_data: { ... }
}
worker.rs:216 handle_range_data
ranges.lock().get_mut(&7) <- Looks up by id -> finds correct byte channel
```
This is how multiplexing multiple concurrent reads over a single connection
works. Every chunk is tagged with its `read_id`, and the worker's HashMap
operates like a tiny switchboard routing the returned payload.
### The Inbound Pipeline in Depth
Here's what happens when Branch A receives a successful message:
```
rx.next_message() -> handle_response(m)
|
v (Ok(Some(r)))
handle_response_success(r)
|
v
handle_ranges(data)
|
v (for each chunk)
handle_range_data(ranges, chunk)
|
v
ActiveRead::handle_data -> Handler::send (lock-free)
```
On an `Err(status)`, it detours to `reconnect`, which might invoke
`close_readers` if the reconnect fails entirely.
#### `handle_response_success` (worker.rs:124)
```rust
pub async fn handle_response_success(
&mut self,
response: BidiReadObjectResponse,
) -> LoopResult<()> {
if let Err(e) = self.handle_ranges(response.object_data_ranges).await {
// An error in the response. These are not recoverable.
let error = Arc::new(e);
self.close_readers(error.clone()).await;
return Err(error);
}
Ok(())
}
```
This method takes a single response and routes its `object_data_ranges` to
`handle_ranges`, ignoring `metadata` and `read_handle` since those were purely
for the initial handshake. If the routing step fails, it wraps the error in an
`Arc`, fires `close_readers` to broadcast the failure to every registered
reader, and returns an `Err`. The comment "not recoverable" is vital: if the
server sends fundamentally corrupted *data*, a reconnect can't fix it
(reconnects only fix broken *wires*).
`handle_response_success` is called in two places: inside `handle_response` for
every standard data message, and synchronously in
`ObjectDescriptorTransport::new` (`bidi/transport.rs:59`) for the handshake's
first message, prior to the worker being spawned.
#### `handle_ranges` (worker.rs:137)
```rust
async fn handle_ranges(&self, data: Vec<ObjectRangeData>) -> crate::Result<()> {
let mut result = Ok(());
for response in data {
if let Err(e) = Self::handle_range_data(self.ranges.clone(), response).await {
result = result.and(Err(e)); // lock onto the first error, but keep draining
}
}
result.map_err(crate::Error::io)
}
```
This acts as the per-message dispatcher. It routes each `ObjectRangeData` piece
to `handle_range_data`. `result.and(Err(e))` idiom captures **only the first**
error it encounters while ignoring subsequent ones, ensuring the loop finishes
draining without an abrupt early return (bad responses are rare, so tracking
multiple errors simply isn't worth the complexity overhead). Finally, `map_err`
converts the internal `ReadError` into the public `crate::Error`.
#### `handle_range_data` (worker.rs:216)
```rust
async fn handle_range_data(
ranges: Arc<Mutex<HashMap<i64, ActiveRead>>>,
response: ObjectRangeData,
) -> ReadResult<()> {
let range = response.read_range.ok_or(... "missing range")?;
let handler = if response.range_end {
let mut pending = ranges.lock().await.remove(&range.read_id).ok_or(...)?;
pending.handle_data(response.checksummed_data, range, true)?
} else {
let mut guard = ranges.lock().await;
let pending = guard.get_mut(&range.read_id).ok_or(...)?;
pending.handle_data(response.checksummed_data, range, false)?
};
handler.send().await;
Ok(())
}
```
This is an associated function (no `self`) that takes the shared routing table
and a single chunk. It splits into two paths based on the `range_end: bool`
flag:
| `true` (final chunk) | `lock().remove(...)` | **Removed** entirely (owned) | Drops at the semicolon, *before* `handle_data` runs |
| `false` (most of the time) | `lock(); get_mut(...)` | **Stays** in HashMap (borrowed `&mut`) | Held exclusively through the body of `handle_data` |
**The Lock-Scope Trick:** `handle_data` (`active_read.rs:41`) is a synchronous
function (`fn`, not `async fn`), so careful management of the lock guarding it
is necessary. In the first path, the `remove` gives us ownership, so the lock
guard drops immediately at the semicolon, allowing `handle_data` to run
completely lock-free. In the second path, `pending` is borrowed directly from
the guard, meaning the lock *must* be held while `handle_data` runs.
After the `if/else` block finishes, the guard is fully released in both
branches, allowing the async `handler.send().await` to execute lock-free.
**The `Handler` Decoupling:** This design relies entirely on the `Handler` type
(`active_read.rs:83`). `handle_data` doesn't send bytes; it returns a `Handler`
representing what needs to be sent. This separates the handling process into a
locked "state update" phase (`handle_data`) and a lock-free "delivery" phase
(`handler.send()`), meaning that a slow reader with a full channel can never
stall the central routing table.
**Automatic Cleanup:** Path 1's `remove` keeps the HashMap lean by removing
completed reads from the routing table. No separate "deregister" step is
required.
#### `reconnect` (worker.rs:153) — Three Paths Out
```rust
async fn reconnect(&mut self, status: Status) -> Option<LoopResult<Option<Connection<C::Stream>>>> {
let ranges = /* collects every active range from the HashMap */;
let (response, _headers, connection) = match self.connector.reconnect(status, ranges).await {
Err(e) => {
let error = Arc::new(e);
self.close_readers(error.clone()).await;
return Some(Err(error)); // Path 1: connector completely gave up
}
Ok(t) => t,
};
if let Err(e) = self.handle_ranges(response.object_data_ranges).await {
let error = Arc::new(e);
self.close_readers(error.clone()).await;
return Some(Err(error)); // Path 2: first new message was corrupted
}
Some(Ok(Some(connection))) // Path 3: successful reconnect
}
```
There are three outcomes mapped to two return shapes:
| 1 | `connector.reconnect` exhausted retries | `Some(Err(e))` | `Some(Err(e)) => break Some(e)` |
| 2 | Reconnected, but first message was bad | `Some(Err(e))` | `Some(Err(e)) => break Some(e)` |
| 3 | Reconnected, first message routed perfectly | `Some(Ok(Some(connection)))` | Swaps `rx`/`tx` and continues |
The `Some(Ok(Some(connection)))` that triggers the run loop's wire-swapping arm
(`worker.rs:83`) is generated here. Note that both failure paths invoke
`close_readers` *before* returning, ensuring all readers are proactively
notified of the failure before the run loop unwinds.
#### `close_readers` (worker.rs:183) + `ActiveRead::interrupted`
```rust
async fn close_readers(&mut self, error: Arc<crate::Error>) {
use futures::StreamExt;
let mut guard = self.ranges.lock().await;
let closing = futures::stream::FuturesUnordered::new();
for (_, active) in guard.iter_mut() {
closing.push(active.interrupted(error.clone()));
}
let _ = closing.count().await;
guard.clear();
}
```
```rust
// active_read.rs:67
pub(super) async fn interrupted(&mut self, error: Arc<crate::Error>) {
if let Err(e) = self
.sender
.send(Err(ReadError::UnrecoverableBidiReadInterrupt(error)))
.await
{
tracing::error!("cannot notify reader (dropped?) about: {e:?}");
}
}
```
The goal here is simple: push a terminal error to every registered reader, then
clear the routing table.
**The `FuturesUnordered` Pattern:** The call `active.interrupted(error.clone())`
just *returns* a future without running it. The loop pushes them all into a
`FuturesUnordered` collection, and `closing.count().await` drives them all
concurrently. This guarantees that one slow reader cannot block the termination
of the others, which could happen with a `for ... { ... await }` loop.
**Two-Step Termination per Reader:**
1. `interrupted` pushes an
`Err(ReadError::UnrecoverableBidiReadInterrupt(Arc<Error>))` into the byte
channel.
1. `guard.clear()` drops all `ActiveRead` objects, which inherently drops their
`Sender`s, ultimately closing the channels.
The reader experiences: any buffered `Ok(Bytes)`, followed by the `Err`
(explaining *why*), and finally `None` (signaling *it's over*).
**Why `Arc<Error>`?** The error resides inside the channel message and lives on
wherever the reader stores it. A standard reference `&Error` would dangle, and
deep-cloning an error `N` times is a massive waste of resources. `Arc::clone`
provides a cheap atomic bump, giving us an owned, shared error ideal for
broadcasting.
## J. The Range-Type Stack
A read range's single byte channel is handled by a group of five specialized
types. Together, they handle the various responsibilities of *"The user
requested bytes, the user wants them streamed, and reconnects cannot lose
progress"*.
### Cheat Sheet
| `RequestedRange` | "What the user asked for" (pre-resolution) | Immutable after creation | Wrapped in `RemainingRange::Requested` |
| `NormalizedRange` | "What we are actively tracking" (post-first-chunk) | Mutates dynamically as bytes arrive | Wrapped in `RemainingRange::Normalized` |
| `RemainingRange` | State machine governing the two above | One per `ActiveRead` (`state` field) | `ActiveRead.state` |
| `ActiveRead` | State machine for a given read request + the byte-channel **write end** for bytes received from the server | One per range, living in the HashMap | Worker (`self.ranges`) |
| `RangeReader` | Byte-channel **read end** for bytes received from the server + keepalive signal | One per range | The user (inside `ReadObjectResponse`) |
Two invariants tie this system together:
1. **One range maps to one byte channel.** `ActiveRead` and `RangeReader` are
created together (`bidi/transport.rs:82–85`) and linked by a `(tx, rx)` pair.
1. **The state machine is strictly unidirectional:** `Requested` permanently
pivots to `Normalized` upon receiving the first chunk.
### `RequestedRange` — The User's Request
From `model_ext.rs:291–296`:
```rust
#[derive(Clone, Copy, Debug, PartialEq)]
pub(crate) enum RequestedRange {
Offset(u64), // "From byte N onward"
Tail(u64), // "The final N bytes"
Segment { offset: u64, limit: u64 }, // "N bytes beginning at offset M"
}
```
These are user-friendly units (positive `u64`), mapping to GCS's three native
read shapes.
**The `ReadRange` Newtype** (`model_ext.rs:171`):
```rust
pub struct ReadRange(pub(crate) RequestedRange);
```
The inner field is **pub(crate)**. This wrapper provides four benefits:
1. **API Stability:** The internal enum can safely evolve without breaking
downstream applications.
1. **Validated Factories:** Users must construct them correctly:
- `ReadRange::offset(...)`
- `ReadRange::tail(...)`
- `ReadRange::head(...)`
- `ReadRange::segment(...)`
- `ReadRange::all()`
1. **Obscured Proto Idioms:** Users ask for `ReadRange::tail(100)` without ever
knowing that it transmits as `read_offset: -100` on the wire.
1. **Rich Documentation:** Each constructor function gets its own dedicated
rustdoc and runnable example.
#### `as_proto` — Wire Encoding and Routing
From `bidi/requested_range.rs:24–42`:
```rust
pub fn as_proto(&self, id: i64) -> ProtoRange {
match self {
Self::Offset(o) => ProtoRange { read_id: id, read_offset: *o as i64, ..default() },
Self::Tail(o) => ProtoRange { read_id: id, read_offset: -(*o as i64), ..default() },
Self::Segment { offset, limit } => ProtoRange { read_id: id, read_offset: *offset as i64, read_length: *limit as i64 },
}
}
```
Two things to note:
- **`Tail` maps to a negative offset**, matching the GCS protobuf convention for
"from the end."
- **`read_length: 0` is the sentinel for "to EOF"**, utilized by `Offset` and
`Tail`.
**Why not implement `From<RequestedRange>` to do the proto conversion?** A
standard `From::from` takes only one argument, leaving no room to pass the
**`id`** (the routing key generated at `worker.rs:197–198`). `as_proto` handles
serialization and routing tagging simultaneously.
### `NormalizedRange` — Keeping track of progress
From `normalized_range.rs:40–44`:
```rust
pub struct NormalizedRange {
offset: i64,
length: Option<i64>,
}
```
Under the hood, ranges are normalized into:
- **`offset`**: The absolute byte offset.
- **`length`**: `Some(n)` for exact remaining bytes, or `None` representing an
unbounded read ("to EOF").
#### `NormalizedRange::update`
Located at `normalized_range.rs:91`, this function executes every single time a
chunk arrives:
```rust
pub fn update(&mut self, response: ProtoRange) -> ReadResult<()> {
let update = NormalizedRange::from_proto(response)?; // 1. parse the incoming chunk's range
if update.offset != self.offset {
return Err(ReadError::bidi_out_of_order(...)); // 2. enforce in-order delivery
}
self.length = match (self.length, update.length) { // 3. update the remaining bytes-to-read
(None, _) => None,
(Some(l), None) => Some(l),
(Some(expected), Some(got)) if got <= expected => Some(expected - got),
(Some(expected), Some(got)) => return Err(LongRead { ... }),
};
self.offset = update.offset + update.length().unwrap_or_default(); // 4. advance the cursor
Ok(())
}
```
After `update` returns, `(offset, length)` describes the bytes still pending.
This makes the state self-describing: after a reconnect, calling `as_proto`
emits the correct "resume from here" range.
The length match (step 3) broken down:
| `None` | anything | `None` | An unbounded request stays unbounded. |
| `Some(l)` | `None` | `Some(l)` | Chunk reported zero size — leave the expectation untouched |
| `Some(expected)`, `got ≤ expected` | `Some(got)` | `Some(expected - got)` | Normal case — subtract the delta. |
| `Some(expected)`, `got > expected` | `Some(got)` | `LongRead` error | Server overran the limit — bail. |
`unwrap_or_default()` in step 4 yields the inner value when `Some`, or
`i64::default() == 0` otherwise. So if `update.length` is `None`, the cursor
stays put.
##### A Worked Example
Say the user requested `Segment { offset: 1000, limit: 500 }` on a 10,000-byte
object, so the state starts at `offset: 1000, length: Some(500)`. A chunk
arrives carrying `read_offset: 1000, read_length: 200`:
| 1 | Parse the incoming range → `update = { offset: 1000, length: Some(200) }` | (no change to `self` yet) |
| 2 | Check `update.offset == self.offset` (1000 == 1000) | OK |
| 3 | Subtract length: `Some(500) - Some(200)` → `Some(300)` | `self.length = Some(300)` |
| 4 | Advance cursor: `self.offset = 1000 + 200 = 1200` | `self = { offset: 1200, length: Some(300) }` |
```
byte: 0 1000 1200 1500 10000
├───────┼──────┼─────────────┼──────────────────┤
xxxxxxx╞═════════════╡
received what's left to receive
```
Call `as_proto` at this moment and the user gets
`{read_offset: 1200, read_length: 300}` — precisely the re-request payload a
reconnect needs.
### `RemainingRange` — The State Machine
From `remaining_range.rs:27–31`:
```rust
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RemainingRange {
Requested(RequestedRange), // Prior to the very first response
Normalized(NormalizedRange), // Following the first response
}
```
This enum handles the two situations where a reconnect might occur:
- **Before the server has sent a single chunk**, requiring us to re-send the
original read request.
- **Mid-stream**, requiring us to send the normalized read request.
When the first chunk is processed, the `update` method (`:34`) switches the enum
from `Requested` to `Normalized`, applies the chunk's delta, and commits the
state. From that point on, everything flows into the simpler `Normalized`
pathway.
### `ActiveRead` — The Worker's Handle
From `active_read.rs:24–28`:
```rust
#[derive(Debug)]
pub(crate) struct ActiveRead {
state: RemainingRange, // ← The self-healing state machine
sender: Sender<ReadResult<bytes::Bytes>>, // ← The byte channel's write end
}
```
This represents the server-facing end of a single range, residing in the
worker's HashMap. Both fields survive a reconnect untouched.
### `RangeReader` — The User's Handle
From `range_reader.rs:26–33`:
```rust
#[derive(Debug)]
pub struct RangeReader {
inner: Receiver<Result<bytes::Bytes, ReadError>>,
object: Arc<Object>,
// Unused, holding to a copy prevents the worker task from terminating
// early.
_tx: Sender<ActiveRead>,
}
```
**The `_tx` Keepalive Trick:** The `_tx` field is an intentional dummy
reference. Since the worker uses the "all senders dropped" condition of the
read-request channel as its signal to shut down (`worker.rs:88–93`), holding a
clone here keeps the worker alive.
## K. Redirects and the Resilience Machinery
Sometimes, the server decides to redirect a bidirectional read mid-stream. The
machinery that handles this lives across:
- `redirect.rs`
- `retry_redirect.rs`
- `resume_redirect.rs`
- `connector.rs` (specifically the `Connector` struct)
Let's map it out.
### The Protocol — What a Redirect Actually Looks Like
Google Cloud Storage doesn't use a dedicated "redirect" gRPC status code.
Instead, redirects are signalled using a `tonic::Status` error that carries a
structured payload in its details:
```
tonic::Status {
code: Code::Aborted, // It looks like a permanent failure at first glance
message: "...",
details: <bytes that decode to google.rpc.Status>
RpcStatus {
details: [
Any::<BidiReadObjectRedirectedError> {
routing_token: Option<String>, // The new value to use for x-goog-request-params
read_handle: Option<...>, // A server-issued resume handle
}
]
}
}
```
This represents two layers of "status" (reminiscent of the
[two metadatas pattern](#h-the-two-metadatas)):
- `tonic::Status`: The transport envelope containing the code, message, and raw
bytes.
- `google.rpc.Status`: The rich protobuf error payload.
`RpcStatus::decode(tonic_status.details())` opens the envelope.
**Why use `Aborted`?** A standard retry policy generally treats `Aborted` as a
permanent failure to avoid hammering a dead endpoint. By hijacking this
"permanent" code, GCS efficiently signals, "This specific stream is dead—but
here is exactly where the user should connect next." A specialized decorator
pattern then intentionally overrides that "give up" verdict.
### `redirect.rs` — Recognize and React
This file houses two distinct functions:
**`handle_redirect(spec, status)`** (`redirect.rs:25`): This function extracts
the redirect information from the status and updates the shared
`Arc<Mutex<BidiReadObjectSpec>>` with the new routing token and resume handle.
This guarantees that any subsequent reconnect attempt will carry the new routing
token.
It always returns the GAX-converted error, regardless of whether a redirect was
actually found (acting as a safe no-op for standard errors).
**`is_redirect(&error)`** (`redirect.rs:40`): This is a pure predicate function
with zero side effects. It checks four conditions in order:
- The error code must be `Aborted`.
- The error must wrap a `tonic::Status`.
- The details must cleanly decode as a `google.rpc.Status`.
- At least one detail must be a `BidiReadObjectRedirectedError`.
**Why split this into two functions?**
- `handle_redirect` operates on the wire boundary, updating state for the *next*
attempt. It is called in three places within `connector.rs` during error paths
where the raw `tonic::Status` is still available:
- `connector.rs:213` (Early dial failure)
- `connector.rs:230` (First-message handshake failure)
- `connector.rs:120` (Mid-stream worker failure)
- `is_redirect` operates inside policy decorators where the error is already a
`gax::Error`; it decides whether there *will be* a next attempt by overriding
the policy's verdict. It is called purely inside policy decorators:
- `retry_redirect.rs:47` (`RetryRedirect::on_error`, used by the inner retry
loop)
- `resume_redirect.rs:45` (`ResumeRedirect::on_error`, used directly by
`connector.reconnect`)
### The Decorator Pattern
Both `RetryRedirect` and `ResumeRedirect` utilize the exact same shape:
```rust
fn on_error(&self, state: &..., error: Error) -> RetryResult {
match self.inner.on_error(state, error) {
RetryResult::Permanent(e) if is_redirect(&e) => RetryResult::Continue(e),
result => result,
}
}
```
This pattern **selectively overrides `Permanent` to `Continue` strictly for
redirect errors**. It respects `Exhausted` (if the attempt budget is tapped out,
redirects halt), and it respects genuine `Permanent` errors (like auth
failures).
Decorators are mandatory here because they inject logic directly *inside* the
external `retry_loop` (from the `gax` crate), which is only accessible through
the `RetryPolicy` trait.
### End-to-End Redirect Trace & The Durable-State Principle
To put it all together, let's trace a redirect mid-stream.
Imagine three concurrent reads are happily humming along. Here is the internal
state the instant before the redirect fires — one read from `send_and_read`, two
added via `read_range`, and the worker parked in its `select!`:
```
Spec (Arc<Mutex<BidiReadObjectSpec>>)
bucket: "projects/_/buckets/my-bucket"
object: "my-object"
generation: 789 // resolved by the server at the first handshake
read_handle: Some(handle_v1) // server-issued at the first handshake
routing_token: None // no redirect yet
Worker routing HashMap (self.ranges)
{ 0: ActiveRead { state: Normalized { offset: 9700, length: None }, sender: <byte_tx_a> },
1: ActiveRead { state: Normalized { offset: 2500, length: None }, sender: <byte_tx_b> },
2: ActiveRead { state: Normalized { offset: 5800, length: Some(200) }, sender: <byte_tx_c> } }
Worker
connector.reconnect_attempts: 0
rx: <Streaming v1> tx: <Sender v1> // current wire halves
parked in tokio::select! at worker.rs:70
```
Now, phase by phase:
1. **Interruption.** GCS closes the stream with
`tonic::Status { code: Aborted, details: <BidiReadObjectRedirectedError { routing_token: Some("rt-v2"), read_handle: Some(handle_v2) }> }`.
Tonic surfaces it as the next value from `rx.next_message()`.
1. **Detection.** Branch A inside `Worker::run` (`worker.rs:71`) resolves to
`m = Err(status)`. In `Worker::handle_response` (`worker.rs:110`),
`message.transpose()?` takes the
`Err(status) => return self.reconnect(status).await` arm
(`worker.rs:114–117`), invoking `Worker::reconnect`.
1. **Snapshot progress.** `Worker::reconnect` (`worker.rs:153`) maps every range
to `ActiveRead::as_proto(*id)` (`worker.rs:157–163`). Because
`NormalizedRange::update` advanced each cursor as chunks arrived, the protos
ask only for what is still outstanding:
```
[ ProtoRange { read_id: 0, read_offset: 9700, read_length: 0 }, // open tail
ProtoRange { read_id: 1, read_offset: 2500, read_length: 0 }, // open offset
ProtoRange { read_id: 2, read_offset: 5800, read_length: 200 } ] // bounded segment
```
No already-received byte is re-requested.
1. **Mutate the spec.** `Connector::reconnect` (`connector.rs:113`) calls
`handle_redirect(self.spec.clone(), status)` (`connector.rs:120`), which
decodes the redirect detail and writes it through:
```
routing_token: Some("rt-v2") // updated
read_handle: Some(handle_v2) // updated (all other fields unchanged)
```
1. **Policy verdict: continue.** `Connector::reconnect` increments
`self.reconnect_attempts` (`connector.rs:122`), then
`ResumeRedirect::on_error` runs (`connector.rs:124–128`). The inner policy
sees `Aborted` and returns `Permanent`; the decorator calls `is_redirect`
(true) and overrides it to `Continue`, so `Connector::reconnect` calls
`Connector::connect(ranges)` (`connector.rs:125`).
1. **Reconnect with the updated spec (Connector).** The `Connector::connect`
(`connector.rs:84`) retry loop fires `Connector::connect_attempt`
(`connector.rs:140`), which uses the now-updated spec in the opening message
and builds the routing header from it:
```
x-goog-request-params: bucket=projects/_/buckets/my-bucket&routing_token=rt-v2
```
A fresh `(tx_v2, rx_v2)` pair is created (`connector.rs:185`), the opening
message preloaded (`connector.rs:186`), and `client.start(...)`
(`connector.rs:200`) opens a new stream. The first message is read
(`connector.rs:218`) and the spec re-enriched (`connector.rs:220–226`).
1. **First response data (Worker).** Back in `Worker::reconnect`
(`worker.rs:153`), any range bytes the server packed into the handshake go
through `Worker::handle_ranges` (`worker.rs:173`) just like a mid-stream
message — looked up by `read_id` in the same HashMap, pushed down the same
byte channels, cursors advanced. The `Worker::reconnect` method returns
`Some(Ok(Some(connection_v2)))` (`worker.rs:180`).
1. **Atomic wire swap.** The matching arm of Branch A inside `Worker::run` runs
`(rx, tx) = (connection.rx, connection.tx)` (`worker.rs:84`). The dead v1
wires drop; the v2 wires take their place. The HashMap is untouched.
1. **Resume.** The `Worker::run` loop re-enters `select!` on the new wires.
Chunks arriving on `rx_v2` carry the same `read_id`s, route to the same
`ActiveRead`s, and flow down the same user-facing channels. The user's parked
`reader.next().await` wakes and yields the next chunk — no error, no panic,
just a brief latency pause.
### The Durable / Disposable Split
The architecture works because of the following separation of concerns:
- **The Durable Layer (Survives Reconnects):**
- **The Spec** (holds identity and routing): `self.spec` in `Connector`
(`connector.rs:63`).
- **The HashMap** (holds state machines): `self.ranges` in `Worker`
(`worker.rs:32`).
- **The user-facing read-request channel** (receives new read requests):
`requests` in `Worker::run` (`worker.rs:62`).
- **The user-facing byte channels** (transfers data bytes to reader): `sender`
in `ActiveRead` (`active_read.rs:27`).
- **The Disposable Layer (Discarded on Reconnect):**
- **The wire-in stream** (receives messages from GCS): `rx` field in
`Connection` (`connector.rs:36`).
- **The wire-out sender** (sends requests to GCS): `tx` field in `Connection`
(`connector.rs:35`).
- **The connection wrapper** (groups stream and channel): `Connection` struct
(`connector.rs:34`).