<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>PostgreSQL Client Library Architecture</title>
<style>
:root {
--bg-primary: #0d1117;
--bg-secondary: #161b22;
--bg-tertiary: #21262d;
--text-primary: #e6edf3;
--text-secondary: #8b949e;
--accent: #4ade80;
--accent-dim: #22863a;
--link: #58a6ff;
--border: #30363d;
--code-bg: #161b22;
}
* {
box-sizing: border-box;
}
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', 'Noto Sans', Helvetica, Arial, sans-serif;
background-color: var(--bg-primary);
color: var(--text-primary);
line-height: 1.6;
margin: 0;
padding: 20px;
max-width: 1000px;
margin: 0 auto;
}
h1 {
color: var(--accent);
border-bottom: 2px solid var(--accent-dim);
padding-bottom: 10px;
font-size: 2em;
}
h2 {
color: var(--accent);
border-bottom: 1px solid var(--border);
padding-bottom: 8px;
margin-top: 2em;
font-size: 1.5em;
}
h3 {
color: var(--text-primary);
margin-top: 1.5em;
font-size: 1.2em;
}
a {
color: var(--link);
text-decoration: none;
}
a:hover {
text-decoration: underline;
}
code {
background-color: var(--code-bg);
padding: 2px 6px;
border-radius: 4px;
font-family: 'SFMono-Regular', Consolas, 'Liberation Mono', Menlo, monospace;
font-size: 0.9em;
border: 1px solid var(--border);
}
pre {
background-color: var(--bg-secondary);
border: 1px solid var(--border);
border-radius: 8px;
padding: 16px;
overflow-x: auto;
font-family: 'SFMono-Regular', Consolas, 'Liberation Mono', Menlo, monospace;
font-size: 0.85em;
line-height: 1.5;
}
pre code {
background: none;
padding: 0;
border: none;
font-size: inherit;
}
table {
width: 100%;
border-collapse: collapse;
margin: 1em 0;
background-color: var(--bg-secondary);
border-radius: 8px;
overflow: hidden;
}
th, td {
padding: 12px 16px;
text-align: left;
border-bottom: 1px solid var(--border);
}
th {
background-color: var(--bg-tertiary);
color: var(--accent);
font-weight: 600;
}
tr:last-child td {
border-bottom: none;
}
tr:hover {
background-color: var(--bg-tertiary);
}
.section {
background-color: var(--bg-secondary);
border: 1px solid var(--border);
border-radius: 8px;
padding: 20px;
margin: 1.5em 0;
}
.subtitle {
color: var(--text-secondary);
font-size: 1.1em;
margin-top: -10px;
margin-bottom: 2em;
}
.keyword { color: #ff7b72; }
.type { color: #79c0ff; }
.string { color: #a5d6ff; }
.comment { color: #8b949e; }
.function { color: #d2a8ff; }
.lifetime { color: #ffa657; }
.macro { color: #79c0ff; }
.tree {
font-family: 'SFMono-Regular', Consolas, 'Liberation Mono', Menlo, monospace;
color: var(--text-secondary);
}
.tree .dir { color: var(--link); }
.tree .file { color: var(--text-primary); }
.tree .comment { color: var(--text-secondary); font-style: italic; }
</style>
</head>
<body>
<h1>PostgreSQL Client Library Architecture</h1>
<p class="subtitle">Based on zero-mysql patterns and PostgreSQL wire protocol specifics</p>
<h2>Directory Structure</h2>
<pre class="tree"><span class="dir">src/</span>
├── <span class="dir">protocol/</span>
│ ├── <span class="dir">frontend/</span> <span class="comment"># Client → Server messages</span>
│ │ ├── <span class="file">mod.rs</span> <span class="comment"># Encoding functions</span>
│ │ ├── <span class="file">startup.rs</span> <span class="comment"># StartupMessage, SSLRequest, CancelRequest</span>
│ │ ├── <span class="file">auth.rs</span> <span class="comment"># PasswordMessage, SASLInitialResponse, SASLResponse</span>
│ │ ├── <span class="file">simple.rs</span> <span class="comment"># Query</span>
│ │ ├── <span class="file">extended.rs</span> <span class="comment"># Parse, Bind, Describe, Execute, Close, Sync, Flush</span>
│ │ └── <span class="file">copy.rs</span> <span class="comment"># CopyData, CopyDone, CopyFail</span>
│ │
│ ├── <span class="dir">backend/</span> <span class="comment"># Server → Client messages</span>
│ │ ├── <span class="file">mod.rs</span> <span class="comment"># RawMessage, type byte constants</span>
│ │ ├── <span class="file">auth.rs</span> <span class="comment"># Authentication*, BackendKeyData, ParameterStatus</span>
│ │ ├── <span class="file">query.rs</span> <span class="comment"># RowDescription, DataRow, CommandComplete</span>
│ │ ├── <span class="file">error.rs</span> <span class="comment"># ErrorResponse, NoticeResponse, field parsing</span>
│ │ ├── <span class="file">extended.rs</span> <span class="comment"># ParseComplete, BindComplete, CloseComplete, etc.</span>
│ │ └── <span class="file">copy.rs</span> <span class="comment"># CopyInResponse, CopyOutResponse, CopyData, CopyDone</span>
│ │
│ ├── <span class="file">copy.rs</span> <span class="comment"># Shared COPY types</span>
│ ├── <span class="file">types.rs</span> <span class="comment"># FormatCode, TransactionStatus, Oid, well-known OIDs</span>
│ └── <span class="file">codec.rs</span> <span class="comment"># Int16/Int32/String encoding/decoding</span>
│
├── <span class="dir">state/</span> <span class="comment"># Sans-I/O state machines</span>
│ ├── <span class="file">mod.rs</span> <span class="comment"># StateMachine trait</span>
│ ├── <span class="file">connection.rs</span> <span class="comment"># Startup + Authentication</span>
│ ├── <span class="file">simple_query.rs</span> <span class="comment"># Simple query protocol</span>
│ ├── <span class="file">extended.rs</span> <span class="comment"># Extended query protocol + PreparedStatement</span>
│ └── <span class="file">action.rs</span> <span class="comment"># Action enum, AsyncMessage</span>
│
├── <span class="dir">conversion/</span> <span class="comment"># Type conversion</span>
│ ├── <span class="file">mod.rs</span> <span class="comment"># FromRow, ToParams traits</span>
│ ├── <span class="file">primitives.rs</span> <span class="comment"># i32, i64, f64, bool, etc.</span>
│ ├── <span class="file">string.rs</span> <span class="comment"># String, &str</span>
│ ├── <span class="file">bytes.rs</span> <span class="comment"># Vec<u8>, &[u8]</span>
│ ├── <span class="file">time.rs</span> <span class="comment"># time crate types (feature-gated)</span>
│ ├── <span class="file">chrono.rs</span> <span class="comment"># chrono types (feature-gated)</span>
│ ├── <span class="file">uuid.rs</span> <span class="comment"># UUID (feature-gated)</span>
│ ├── <span class="file">decimal.rs</span> <span class="comment"># rust_decimal (feature-gated)</span>
│ ├── <span class="file">numeric_util.rs</span> <span class="comment"># NUMERIC string parsing (f32/f64)</span>
│ └── <span class="file">row.rs</span> <span class="comment"># Row parsing utilities</span>
│
├── <span class="dir">sync/</span> <span class="comment"># Synchronous API</span>
│ ├── <span class="file">mod.rs</span>
│ ├── <span class="file">conn.rs</span> <span class="comment"># Conn - main connection type</span>
│ ├── <span class="file">stream.rs</span> <span class="comment"># Stream (TCP/Unix/TLS)</span>
│ ├── <span class="file">pool.rs</span> <span class="comment"># Connection pool</span>
│ ├── <span class="file">transaction.rs</span> <span class="comment"># Transaction handle</span>
│ ├── <span class="file">unnamed_portal.rs</span> <span class="comment"># Cursor-style iteration</span>
│ └── <span class="dir">pipeline/</span> <span class="comment"># Pipelining support</span>
│ ├── <span class="file">mod.rs</span> <span class="comment"># Pipeline struct, exec/sync/claim_* methods</span>
│ └── <span class="file">handles.rs</span> <span class="comment"># Ticket struct (sequence-based handle)</span>
│
├── <span class="dir">tokio/</span> <span class="comment"># Async API (feature-gated)</span>
│ ├── <span class="file">mod.rs</span>
│ ├── <span class="file">conn.rs</span>
│ ├── <span class="file">stream.rs</span>
│ ├── <span class="file">pool.rs</span>
│ └── <span class="file">transaction.rs</span>
│
├── <span class="file">buffer_pool.rs</span> <span class="comment"># Global buffer pool</span>
├── <span class="file">buffer_set.rs</span> <span class="comment"># Per-connection buffers</span>
├── <span class="file">handler.rs</span> <span class="comment"># TextHandler, BinaryHandler, AsyncMessageHandler</span>
├── <span class="file">statement.rs</span> <span class="comment"># IntoStatement trait</span>
├── <span class="file">opts.rs</span> <span class="comment"># Connection options</span>
├── <span class="file">error.rs</span> <span class="comment"># Error types</span>
└── <span class="file">lib.rs</span></pre>
<h2>Message Design</h2>
<h3>No Wrapping Enum</h3>
<p>Parse on demand - each state machine knows what messages it expects:</p>
<pre><code><span class="comment">/// Raw message from server - just type byte and payload</span>
<span class="keyword">pub struct</span> <span class="type">RawMessage</span><<span class="lifetime">'a</span>> {
<span class="keyword">pub</span> type_byte: <span class="type">u8</span>,
<span class="keyword">pub</span> payload: &<span class="lifetime">'a</span> [<span class="type">u8</span>],
}
<span class="comment">/// State machine parses specific messages as needed</span>
<span class="keyword">impl</span> <span class="type">SimpleQueryStateMachine</span> {
<span class="keyword">fn</span> <span class="function">step</span>(&<span class="keyword">mut</span> <span class="keyword">self</span>, msg: <span class="type">RawMessage</span>) -> <span class="type">Result</span><<span class="type">Action</span>> {
<span class="keyword">match</span> (<span class="keyword">self</span>.state, msg.type_byte) {
(<span class="type">State</span>::WaitingRows, <span class="string">b'D'</span>) => {
<span class="keyword">let</span> row = <span class="type">DataRow</span>::<span class="function">parse</span>(msg.payload)?;
<span class="comment">// handle row</span>
}
(<span class="type">State</span>::WaitingRows, <span class="string">b'C'</span>) => {
<span class="keyword">let</span> complete = <span class="type">CommandComplete</span>::<span class="function">parse</span>(msg.payload)?;
<span class="comment">// handle completion</span>
}
<span class="comment">// ...</span>
}
}
}</code></pre>
<h3>Individual Message Types with Zerocopy</h3>
<p>For messages without <code>String</code> or <code>Bytes[n]</code>, derive zerocopy traits directly:</p>
<pre><code><span class="comment">// Fixed-size message - derive zerocopy</span>
<span class="macro">#[derive(FromBytes, KnownLayout, Immutable)]</span>
<span class="macro">#[repr(C, packed)]</span>
<span class="keyword">pub struct</span> <span class="type">ReadyForQuery</span> {
<span class="keyword">pub</span> status: <span class="type">u8</span>, <span class="comment">// 'I', 'T', or 'E'</span>
}
<span class="comment">/// BackendKeyData - variable-length secret key in protocol 3.2</span>
<span class="keyword">pub struct</span> <span class="type">BackendKeyData</span> {
pid: <span class="type">u32</span>,
secret_key: <span class="type">Vec</span><<span class="type">u8</span>>, <span class="comment">// 4-256 bytes</span>
}</code></pre>
<p>For messages with <code>String</code> or <code>Bytes[n]</code> at start/end, split into Head/Tail:</p>
<pre><code><span class="comment">// RowDescription: Int16 num_fields, then variable fields</span>
<span class="macro">#[derive(FromBytes, KnownLayout, Immutable)]</span>
<span class="macro">#[repr(C, packed)]</span>
<span class="keyword">pub struct</span> <span class="type">RowDescriptionHead</span> {
<span class="keyword">pub</span> num_fields: <span class="type">U16BE</span>,
}
<span class="keyword">pub struct</span> <span class="type">RowDescription</span><<span class="lifetime">'a</span>> {
head: &<span class="lifetime">'a</span> <span class="type">RowDescriptionHead</span>,
fields_data: &<span class="lifetime">'a</span> [<span class="type">u8</span>], <span class="comment">// Variable-length field definitions</span>
}
<span class="comment">// CommandComplete: String tag</span>
<span class="keyword">pub struct</span> <span class="type">CommandComplete</span><<span class="lifetime">'a</span>> {
<span class="keyword">pub</span> tag: &<span class="lifetime">'a</span> <span class="type">str</span>, <span class="comment">// Just the null-terminated string</span>
}
<span class="comment">// ErrorResponse: repeated (Byte1 type, String value), Byte1(0)</span>
<span class="keyword">pub struct</span> <span class="type">ErrorResponse</span><<span class="lifetime">'a</span>> {
fields_data: &<span class="lifetime">'a</span> [<span class="type">u8</span>], <span class="comment">// Parse on demand</span>
}
<span class="comment">// DataRow: Int16 num_columns, then (Int32 len, Bytes[n])...</span>
<span class="macro">#[derive(FromBytes, KnownLayout, Immutable)]</span>
<span class="macro">#[repr(C, packed)]</span>
<span class="keyword">pub struct</span> <span class="type">DataRowHead</span> {
<span class="keyword">pub</span> num_columns: <span class="type">U16BE</span>,
}
<span class="keyword">pub struct</span> <span class="type">DataRow</span><<span class="lifetime">'a</span>> {
head: &<span class="lifetime">'a</span> <span class="type">DataRowHead</span>,
columns_data: &<span class="lifetime">'a</span> [<span class="type">u8</span>],
}</code></pre>
<h3>Bidirectional Messages (COPY)</h3>
<p>CopyData and CopyDone are used in both directions:</p>
<pre><code><span class="comment">// Shared in protocol/copy.rs or protocol/types.rs</span>
<span class="keyword">pub struct</span> <span class="type">CopyData</span><<span class="lifetime">'a</span>>(<span class="keyword">pub</span> &<span class="lifetime">'a</span> [<span class="type">u8</span>]);
<span class="keyword">pub struct</span> <span class="type">CopyDone</span>;</code></pre>
<h2>Zero-Copy Row Iteration</h2>
<pre><code><span class="keyword">pub struct</span> <span class="type">DataRow</span><<span class="lifetime">'a</span>> {
payload: &<span class="lifetime">'a</span> [<span class="type">u8</span>],
num_columns: <span class="type">u16</span>,
}
<span class="keyword">impl</span><<span class="lifetime">'a</span>> <span class="type">DataRow</span><<span class="lifetime">'a</span>> {
<span class="keyword">pub fn</span> <span class="function">iter</span>(&<span class="keyword">self</span>) -> <span class="type">DataRowIter</span><<span class="lifetime">'a</span>> {
<span class="type">DataRowIter</span> { remaining: <span class="keyword">self</span>.payload }
}
<span class="keyword">pub fn</span> <span class="function">len</span>(&<span class="keyword">self</span>) -> <span class="type">usize</span> {
<span class="keyword">self</span>.num_columns <span class="keyword">as</span> <span class="type">usize</span>
}
}
<span class="keyword">pub struct</span> <span class="type">DataRowIter</span><<span class="lifetime">'a</span>> {
remaining: &<span class="lifetime">'a</span> [<span class="type">u8</span>],
}
<span class="keyword">impl</span><<span class="lifetime">'a</span>> <span class="type">Iterator</span> <span class="keyword">for</span> <span class="type">DataRowIter</span><<span class="lifetime">'a</span>> {
<span class="keyword">type</span> <span class="type">Item</span> = <span class="type">Option</span><&<span class="lifetime">'a</span> [<span class="type">u8</span>]>; <span class="comment">// None = NULL</span>
<span class="keyword">fn</span> <span class="function">next</span>(&<span class="keyword">mut</span> <span class="keyword">self</span>) -> <span class="type">Option</span><<span class="type">Self</span>::<span class="type">Item</span>> {
<span class="keyword">if</span> <span class="keyword">self</span>.remaining.<span class="function">is_empty</span>() {
<span class="keyword">return</span> <span class="type">None</span>;
}
<span class="keyword">let</span> len = <span class="type">i32</span>::<span class="function">from_be_bytes</span>(<span class="keyword">self</span>.remaining[..<span class="string">4</span>].<span class="function">try_into</span>().<span class="function">unwrap</span>());
<span class="keyword">self</span>.remaining = &<span class="keyword">self</span>.remaining[<span class="string">4</span>..];
<span class="keyword">if</span> len == <span class="string">-1</span> {
<span class="type">Some</span>(<span class="type">None</span>) <span class="comment">// NULL</span>
} <span class="keyword">else</span> {
<span class="keyword">let</span> len = len <span class="keyword">as</span> <span class="type">usize</span>;
<span class="keyword">let</span> value = &<span class="keyword">self</span>.remaining[..len];
<span class="keyword">self</span>.remaining = &<span class="keyword">self</span>.remaining[len..];
<span class="type">Some</span>(<span class="type">Some</span>(value))
}
}
}</code></pre>
<p>User provides <code>parse_value</code> to decode <code>&[u8]</code>:</p>
<pre><code><span class="keyword">fn</span> <span class="function">row</span>(&<span class="keyword">mut</span> <span class="keyword">self</span>, row: <span class="type">DataRow</span><<span class="lifetime">'_</span>>) -> <span class="type">Result</span><<span class="type">ControlFlow</span>, <span class="type">Error</span>> {
<span class="keyword">let mut</span> cols = row.<span class="function">iter</span>();
<span class="keyword">let</span> id: <span class="type">i32</span> = <span class="function">parse_value</span>(cols.<span class="function">next</span>()?.<span class="function">unwrap</span>())?;
<span class="keyword">let</span> name: &<span class="type">str</span> = <span class="function">parse_value</span>(cols.<span class="function">next</span>()?.<span class="function">unwrap</span>())?;
<span class="comment">// ...</span>
}</code></pre>
<h2>Handler Traits</h2>
<p>Separate traits for text (simple query) and binary (extended query) handlers:</p>
<pre><code><span class="comment">/// Handler for simple query results (text format).</span>
<span class="keyword">pub trait</span> <span class="type">TextHandler</span> {
<span class="comment">/// Called when a result set begins.</span>
<span class="keyword">fn</span> <span class="function">result_start</span>(&<span class="keyword">mut</span> <span class="keyword">self</span>, cols: <span class="type">RowDescription</span><<span class="lifetime">'_</span>>) -> <span class="type">Result</span><()> { <span class="type">Ok</span>(()) }
<span class="comment">/// Called for each data row.</span>
<span class="keyword">fn</span> <span class="function">row</span>(&<span class="keyword">mut</span> <span class="keyword">self</span>, cols: <span class="type">RowDescription</span><<span class="lifetime">'_</span>>, row: <span class="type">DataRow</span><<span class="lifetime">'_</span>>) -> <span class="type">Result</span><()>;
<span class="comment">/// Called when a result set ends.</span>
<span class="keyword">fn</span> <span class="function">result_end</span>(&<span class="keyword">mut</span> <span class="keyword">self</span>, complete: <span class="type">CommandComplete</span><<span class="lifetime">'_</span>>) -> <span class="type">Result</span><()> { <span class="type">Ok</span>(()) }
}
<span class="comment">/// Handler for extended query results (binary format).</span>
<span class="keyword">pub trait</span> <span class="type">BinaryHandler</span> {
<span class="keyword">fn</span> <span class="function">result_start</span>(&<span class="keyword">mut</span> <span class="keyword">self</span>, cols: <span class="type">RowDescription</span><<span class="lifetime">'_</span>>) -> <span class="type">Result</span><()> { <span class="type">Ok</span>(()) }
<span class="keyword">fn</span> <span class="function">row</span>(&<span class="keyword">mut</span> <span class="keyword">self</span>, cols: <span class="type">RowDescription</span><<span class="lifetime">'_</span>>, row: <span class="type">DataRow</span><<span class="lifetime">'_</span>>) -> <span class="type">Result</span><()>;
<span class="keyword">fn</span> <span class="function">result_end</span>(&<span class="keyword">mut</span> <span class="keyword">self</span>, complete: <span class="type">CommandComplete</span><<span class="lifetime">'_</span>>) -> <span class="type">Result</span><()> { <span class="type">Ok</span>(()) }
}</code></pre>
<h3>Built-in Handlers</h3>
<pre><code><span class="comment">/// Discards all results, captures rows_affected.</span>
<span class="keyword">pub struct</span> <span class="type">DropHandler</span>;
<span class="comment">/// Collects typed rows via FromRow trait.</span>
<span class="keyword">pub struct</span> <span class="type">CollectHandler</span><T: <span class="type">FromRow</span>>;
<span class="comment">/// Captures only the first row.</span>
<span class="keyword">pub struct</span> <span class="type">FirstRowHandler</span><T: <span class="type">FromRow</span>>;</code></pre>
<h3>FromRow Trait</h3>
<pre><code><span class="comment">/// Convert a database row to a typed value.</span>
<span class="keyword">pub trait</span> <span class="type">FromRow</span><<span class="lifetime">'a</span>>: <span class="type">Sized</span> {
<span class="keyword">fn</span> <span class="function">from_row</span>(fields: <span class="type">RowDescriptionFields</span><<span class="lifetime">'a</span>>, row: <span class="type">DataRow</span><<span class="lifetime">'a</span>>) -> <span class="type">Result</span><<span class="type">Self</span>>;
}
<span class="comment">// Implemented for tuples: (T1,), (T1, T2), ... (T1, ..., T12)</span>
<span class="keyword">impl</span><<span class="lifetime">'a</span>, T1: <span class="type">FromSql</span><<span class="lifetime">'a</span>>> <span class="type">FromRow</span><<span class="lifetime">'a</span>> <span class="keyword">for</span> (T1,) { ... }
<span class="keyword">impl</span><<span class="lifetime">'a</span>, T1, T2> <span class="type">FromRow</span><<span class="lifetime">'a</span>> <span class="keyword">for</span> (T1, T2) <span class="keyword">where</span> T1: <span class="type">FromSql</span><<span class="lifetime">'a</span>>, T2: <span class="type">FromSql</span><<span class="lifetime">'a</span>> { ... }
<span class="comment">// ...</span></code></pre>
<h2>Async Message Handling</h2>
<p>Async messages (NotificationResponse, NoticeResponse, ParameterStatus) can arrive at any time.</p>
<h3>State Machine Action</h3>
<pre><code><span class="comment">/// Action requested by a state machine.</span>
<span class="keyword">pub enum</span> <span class="type">Action</span> {
<span class="comment">/// Write SSL request, then read single byte response ('S' or 'N').</span>
WriteAndReadByte,
<span class="comment">/// Read a PostgreSQL message (type byte + length + payload).</span>
ReadMessage,
<span class="comment">/// Write `buffer_set.write_buffer` to the server.</span>
Write,
<span class="comment">/// Write then read a message (common for queries).</span>
WriteAndReadMessage,
<span class="comment">/// Perform TLS handshake after SSL negotiation.</span>
TlsHandshake,
<span class="comment">/// Handle async message, then read next message.</span>
HandleAsyncMessageAndReadMessage(<span class="type">AsyncMessage</span>),
<span class="comment">/// State machine has completed.</span>
Finished,
}
<span class="keyword">pub enum</span> <span class="type">AsyncMessage</span> {
Notification { pid: <span class="type">u32</span>, channel: <span class="type">String</span>, payload: <span class="type">String</span> },
Notice(<span class="type">ServerError</span>),
ParameterChanged { name: <span class="type">String</span>, value: <span class="type">String</span> },
}</code></pre>
<h3>Sync API: Callback Handler</h3>
<pre><code><span class="comment">/// Handler for asynchronous messages from the server.</span>
<span class="keyword">pub trait</span> <span class="type">AsyncMessageHandler</span>: <span class="type">Send</span> {
<span class="keyword">fn</span> <span class="function">handle</span>(&<span class="keyword">mut</span> <span class="keyword">self</span>, msg: &<span class="type">AsyncMessage</span>);
}
<span class="comment">// Closures implement AsyncMessageHandler automatically</span>
<span class="keyword">impl</span><F: <span class="type">FnMut</span>(&<span class="type">AsyncMessage</span>) + <span class="type">Send</span>> <span class="type">AsyncMessageHandler</span> <span class="keyword">for</span> F {
<span class="keyword">fn</span> <span class="function">handle</span>(&<span class="keyword">mut</span> <span class="keyword">self</span>, msg: &<span class="type">AsyncMessage</span>) { <span class="keyword">self</span>(msg) }
}
<span class="comment">// Usage</span>
<span class="keyword">let mut</span> conn = <span class="type">Conn</span>::<span class="function">new</span>(opts)?;
conn.<span class="function">set_async_message_handler</span>(|msg: &<span class="type">AsyncMessage</span>| {
<span class="keyword">match</span> msg {
<span class="type">AsyncMessage</span>::Notification { channel, payload, .. } => {
<span class="macro">println!</span>(<span class="string">"Notification on {}: {}"</span>, channel, payload);
}
<span class="type">AsyncMessage</span>::Notice(err) => {
<span class="macro">println!</span>(<span class="string">"Notice: {:?}"</span>, err);
}
<span class="type">AsyncMessage</span>::ParameterChanged { name, value } => {
<span class="macro">println!</span>(<span class="string">"Parameter {} = {}"</span>, name, value);
}
}
});</code></pre>
<h2>Buffer Management</h2>
<h3>BufferSet</h3>
<p>Per-connection buffers for reading, writing, and column metadata:</p>
<pre><code><span class="keyword">pub struct</span> <span class="type">BufferSet</span> {
<span class="keyword">pub</span> read_buffer: <span class="type">Vec</span><<span class="type">u8</span>>, <span class="comment">// Message payload (without type byte)</span>
<span class="keyword">pub</span> write_buffer: <span class="type">Vec</span><<span class="type">u8</span>>, <span class="comment">// Outgoing messages</span>
<span class="keyword">pub</span> column_buffer: <span class="type">Vec</span><<span class="type">u8</span>>, <span class="comment">// Cached RowDescription for row iteration</span>
<span class="keyword">pub</span> type_byte: <span class="type">u8</span>, <span class="comment">// Last message type byte received</span>
}</code></pre>
<h3>BufferPool</h3>
<p>Global pool for reusing BufferSets across connections:</p>
<pre><code><span class="comment">/// Global buffer pool (thread-safe).</span>
<span class="keyword">pub static</span> GLOBAL_BUFFER_POOL: <span class="type">LazyLock</span><<span class="type">Arc</span><<span class="type">BufferPool</span>>>;
<span class="keyword">pub struct</span> <span class="type">BufferPool</span> {
buffer_sets: <span class="type">ArrayQueue</span><<span class="type">BufferSet</span>>,
}
<span class="keyword">impl</span> <span class="type">BufferPool</span> {
<span class="keyword">pub fn</span> <span class="function">get_buffer_set</span>(<span class="keyword">self</span>: &<span class="type">Arc</span><<span class="type">Self</span>>) -> <span class="type">PooledBufferSet</span>;
<span class="keyword">pub fn</span> <span class="function">return_buffer_set</span>(&<span class="keyword">self</span>, buffer_set: <span class="type">BufferSet</span>);
}
<span class="comment">/// RAII wrapper that returns BufferSet to pool on drop.</span>
<span class="keyword">pub struct</span> <span class="type">PooledBufferSet</span> {
pool: <span class="type">Arc</span><<span class="type">BufferPool</span>>,
inner: <span class="type">ManuallyDrop</span><<span class="type">BufferSet</span>>,
}</code></pre>
<h2>Transaction Status Tracking</h2>
<p>PostgreSQL sends transaction status with every ReadyForQuery:</p>
<pre><code><span class="macro">#[derive(Debug, Clone, Copy, PartialEq, Eq)]</span>
<span class="keyword">pub enum</span> <span class="type">TransactionStatus</span> {
Idle, <span class="comment">// 'I'</span>
InTransaction, <span class="comment">// 'T'</span>
Failed, <span class="comment">// 'E'</span>
}
<span class="keyword">impl</span> <span class="type">Connection</span> {
<span class="keyword">pub fn</span> <span class="function">transaction_status</span>(&<span class="keyword">self</span>) -> <span class="type">TransactionStatus</span>;
<span class="keyword">pub fn</span> <span class="function">in_transaction</span>(&<span class="keyword">self</span>) -> <span class="type">bool</span>;
}</code></pre>
<h2>Pipelining Support</h2>
<p>PostgreSQL's extended protocol supports pipelining natively. The implementation uses a simple ticket-based approach with sequence numbers for ordering:</p>
<h3>Handle Types</h3>
<pre><code><span class="comment">/// A ticket for a queued pipeline operation.</span>
<span class="comment">///</span>
<span class="comment">/// Created by [`Pipeline::exec`].</span>
<span class="comment">/// Claim with [`claim_collect`], [`claim_one`], or [`claim_drop`].</span>
<span class="macro">#[derive(Debug)]</span>
<span class="keyword">pub struct</span> <span class="type">Ticket</span> {
seq: <span class="type">usize</span>,
}</code></pre>
<h3>Pipeline API</h3>
<pre><code><span class="keyword">pub struct</span> <span class="type">Pipeline</span><<span class="lifetime">'a</span>> {
conn: &<span class="lifetime">'a</span> <span class="keyword">mut</span> <span class="type">Conn</span>,
queue_seq: <span class="type">usize</span>, <span class="comment">// Queued operations counter</span>
claim_seq: <span class="type">usize</span>, <span class="comment">// Next to claim counter</span>
expectations: <span class="type">Vec</span><<span class="type">Expectation</span>>,
<span class="comment">// ...</span>
}
<span class="keyword">impl</span><<span class="lifetime">'a</span>> <span class="type">Pipeline</span><<span class="lifetime">'a</span>> {
<span class="comment">/// Queue a statement execution (prepared statement or raw SQL).</span>
<span class="keyword">pub fn</span> <span class="function">exec</span><P: <span class="type">ToParams</span>>(
&<span class="keyword">mut</span> <span class="keyword">self</span>,
statement: <span class="keyword">impl</span> <span class="type">IntoStatement</span>,
params: P,
) -> <span class="type">Result</span><<span class="type">Ticket</span>>;
<span class="comment">/// Send FLUSH message to trigger server response.</span>
<span class="keyword">pub fn</span> <span class="function">flush</span>(&<span class="keyword">mut</span> <span class="keyword">self</span>) -> <span class="type">Result</span><()>;
<span class="comment">/// Send SYNC message to establish transaction boundary.</span>
<span class="keyword">pub fn</span> <span class="function">sync</span>(&<span class="keyword">mut</span> <span class="keyword">self</span>) -> <span class="type">Result</span><()>;
<span class="comment">/// Claim and collect all rows.</span>
<span class="keyword">pub fn</span> <span class="function">claim_collect</span><T: <span class="keyword">for</span><<span class="lifetime">'b</span>> <span class="type">FromRow</span><<span class="lifetime">'b</span>>>(
&<span class="keyword">mut</span> <span class="keyword">self</span>, ticket: <span class="type">Ticket</span>
) -> <span class="type">Result</span><<span class="type">Vec</span><T>>;
<span class="comment">/// Claim and return just the first row.</span>
<span class="keyword">pub fn</span> <span class="function">claim_one</span><T: <span class="keyword">for</span><<span class="lifetime">'b</span>> <span class="type">FromRow</span><<span class="lifetime">'b</span>>>(
&<span class="keyword">mut</span> <span class="keyword">self</span>, ticket: <span class="type">Ticket</span>
) -> <span class="type">Result</span><<span class="type">Option</span><T>>;
<span class="comment">/// Claim and discard all rows.</span>
<span class="keyword">pub fn</span> <span class="function">claim_drop</span>(&<span class="keyword">mut</span> <span class="keyword">self</span>, ticket: <span class="type">Ticket</span>) -> <span class="type">Result</span><()>;
}</code></pre>
<h3>Example Usage</h3>
<pre><code><span class="comment">// Prepare statements outside the pipeline</span>
<span class="keyword">let</span> stmts = conn.<span class="function">prepare_batch</span>(&[
<span class="string">"SELECT id, name FROM users WHERE active = $1"</span>,
])?;
<span class="keyword">let</span> (active, inactive) = conn.<span class="function">run_pipeline</span>(|p| {
<span class="comment">// Queue executions - writes to socket immediately</span>
<span class="keyword">let</span> t1 = p.<span class="function">exec</span>(&stmts[<span class="string">0</span>], (<span class="keyword">true</span>,))?;
<span class="keyword">let</span> t2 = p.<span class="function">exec</span>(&stmts[<span class="string">0</span>], (<span class="keyword">false</span>,))?;
<span class="comment">// Mark end of implicit transaction</span>
p.<span class="function">sync</span>()?;
<span class="comment">// Claim results in queued order</span>
<span class="keyword">let</span> active: <span class="type">Vec</span><(i32, <span class="type">String</span>)> = p.<span class="function">claim_collect</span>(t1)?;
<span class="keyword">let</span> inactive: <span class="type">Vec</span><(i32, <span class="type">String</span>)> = p.<span class="function">claim_collect</span>(t2)?;
<span class="type">Ok</span>((active, inactive))
})?;</code></pre>
<h2>Why Frontend/Backend Separation</h2>
<p>PostgreSQL uses distinct message type bytes for each direction (unlike MySQL's symmetric packets). Separating by direction provides:</p>
<ol>
<li><strong>Type safety</strong> - Cannot accidentally send a backend message from client or vice versa</li>
<li><strong>Clear ownership</strong> - Frontend = encoding only, Backend = parsing only</li>
<li><strong>Easier testing</strong> - Test encoding and parsing independently</li>
<li><strong>Matches protocol</strong> - Directory structure mirrors protocol specification</li>
<li><strong>No confusion</strong> - Same type byte means different things: <code>'D'</code> is Describe (frontend) vs DataRow (backend)</li>
</ol>
<pre><code>frontend/ → <span class="function">encode</span>(&<span class="keyword">mut</span> buf) → write to socket
backend/ ← <span class="function">parse</span>(payload) ← read from socket</code></pre>
<h2>Key Differences from zero-mysql</h2>
<table>
<thead>
<tr>
<th>Aspect</th>
<th>zero-mysql</th>
<th>zero-postgresql</th>
</tr>
</thead>
<tbody>
<tr>
<td>Message types</td>
<td>Single packet type</td>
<td>Separate frontend/backend, no wrapper enum</td>
</tr>
<tr>
<td>Handler</td>
<td>Complex multi-method</td>
<td>Simple: columns() + row()</td>
</tr>
<tr>
<td>Transaction status</td>
<td>N/A</td>
<td>Tracked from ReadyForQuery</td>
</tr>
<tr>
<td>Pipelining</td>
<td>Limited</td>
<td>First-class Pipeline API</td>
</tr>
<tr>
<td>Async messages</td>
<td>N/A</td>
<td>Action::AsyncMessage + handler/channel</td>
</tr>
</tbody>
</table>
</body>
</html>