reddb-io-server 1.0.8

RedDB server-side engine: storage, runtime, replication, MCP, AI, and the gRPC/HTTP/RedWire/PG-wire dispatchers. Re-exported by the umbrella `reddb` crate.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
//! PostgreSQL wire-protocol listener (Phase 3.1 PG parity).
//!
//! Accepts TCP connections from PG-compatible clients, drives the startup
//! handshake, and routes simple-query frames into the existing
//! `RedDBRuntime::execute_query` path. Results are adapted back into PG
//! `RowDescription` + `DataRow` frames via `types::value_to_pg_wire_bytes`.
//!
//! Phase 3.1 intentionally supports only the simple-query subset; extended
//! query (Parse/Bind/Execute) arrives in 3.1.x once the prepared-statement
//! registry is reusable from this layer.

use std::sync::Arc;

use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpListener;

use super::catalog_views::translate_pg_catalog_query;
use super::protocol::{
    read_frame, read_startup, write_frame, write_raw_byte, BackendMessage, ColumnDescriptor,
    FrontendMessage, PgWireError, TransactionStatus,
};
use super::types::{value_to_pg_wire_bytes, PgOid};
use crate::runtime::RedDBRuntime;
use crate::storage::query::unified::UnifiedRecord;
use crate::storage::schema::Value;

/// Settings consumed once, at startup, by the PG wire listener.
#[derive(Debug, Clone)]
pub struct PgWireConfig {
    /// Socket address the listener binds to, written as "host:port".
    /// Keeping it distinct from the native wire / gRPC / HTTP listeners
    /// is the caller's job; nothing here checks for collisions.
    pub bind_addr: String,
    /// Value reported to clients as the `server_version` parameter in
    /// `ParameterStatus`. Drivers commonly inspect it to switch features
    /// on or off, so RedDB reports a reasonably recent PG release to
    /// stay compatible with as many clients as possible.
    pub server_version: String,
}

impl Default for PgWireConfig {
    /// Loopback on the conventional PG port, advertising a PG 15 banner.
    fn default() -> Self {
        let bind_addr = String::from("127.0.0.1:5432");
        let server_version = String::from("15.0 (RedDB 3.1)");
        PgWireConfig {
            bind_addr,
            server_version,
        }
    }
}

/// Bind the PG wire listener and serve connections until it fails.
///
/// The future only resolves with an error: either the initial bind or a
/// later `accept` failed. Every accepted socket is handed to its own
/// tokio task, so one misbehaving client cannot stall the accept loop;
/// per-connection failures are logged at `warn` and otherwise ignored.
pub async fn start_pg_wire_listener(
    config: PgWireConfig,
    runtime: Arc<RedDBRuntime>,
) -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind(&config.bind_addr).await?;
    tracing::info!(
        transport = "pg-wire",
        bind = %config.bind_addr,
        "listener online"
    );

    // The config is read-only from here on; share it instead of cloning
    // the strings for every connection.
    let shared_config = Arc::new(config);
    loop {
        let (socket, remote) = listener.accept().await?;
        let conn_runtime = Arc::clone(&runtime);
        let conn_config = Arc::clone(&shared_config);
        let remote_label = remote.to_string();
        tokio::spawn(async move {
            let outcome = handle_connection(socket, conn_runtime, conn_config).await;
            if let Err(failure) = outcome {
                tracing::warn!(
                    transport = "pg-wire",
                    peer = %remote_label,
                    err = %failure,
                    "connection failed"
                );
            }
        });
    }
}

/// Drive one connection's lifetime: startup → authentication → query loop.
pub(crate) async fn handle_connection<S>(
    mut stream: S,
    runtime: Arc<RedDBRuntime>,
    config: Arc<PgWireConfig>,
) -> Result<(), PgWireError>
where
    S: AsyncRead + AsyncWrite + Unpin + Send,
{
    // Handshake. The first frame may be SSLRequest / GSSENCRequest
    // (pre-auth negotiation) or a plain Startup. Loop once to cover
    // SSL-not-supported path: reply 'N' and expect the client to send
    // a regular Startup next.
    loop {
        match read_startup(&mut stream).await? {
            FrontendMessage::SslRequest | FrontendMessage::GssEncRequest => {
                // 'N' = not supported — client continues in plaintext and
                // re-sends a normal Startup on the same socket.
                write_raw_byte(&mut stream, b'N').await?;
                continue;
            }
            FrontendMessage::Startup(params) => {
                send_auth_ok(&mut stream, &config, &params).await?;
                break;
            }
            FrontendMessage::Unknown { .. } => {
                // CancelRequest: no response expected; drop the socket.
                return Ok(());
            }
            other => {
                return Err(PgWireError::Protocol(format!(
                    "unexpected startup frame: {other:?}"
                )));
            }
        }
    }

    // Main query loop.
    loop {
        let frame = match read_frame(&mut stream).await {
            Ok(f) => f,
            Err(PgWireError::Eof) => return Ok(()),
            Err(e) => return Err(e),
        };

        match frame {
            FrontendMessage::Query(sql) => {
                handle_simple_query(&mut stream, &runtime, &sql).await?;
            }
            FrontendMessage::Terminate => return Ok(()),
            FrontendMessage::Sync | FrontendMessage::Flush => {
                // These are part of the extended protocol. For simple-query
                // sessions we still echo ReadyForQuery so robust clients
                // (that mix S/H frames defensively) keep moving.
                write_frame(
                    &mut stream,
                    &BackendMessage::ReadyForQuery(TransactionStatus::Idle),
                )
                .await?;
            }
            FrontendMessage::PasswordMessage(_) => {
                // Should only arrive during auth. Ignore post-auth.
                continue;
            }
            FrontendMessage::Unknown { tag, .. } => {
                send_error(
                    &mut stream,
                    "0A000",
                    &format!("unsupported frame tag 0x{tag:02x}"),
                )
                .await?;
                write_frame(
                    &mut stream,
                    &BackendMessage::ReadyForQuery(TransactionStatus::Idle),
                )
                .await?;
            }
            other => {
                send_error(
                    &mut stream,
                    "0A000",
                    &format!("unsupported frame {other:?}"),
                )
                .await?;
                write_frame(
                    &mut stream,
                    &BackendMessage::ReadyForQuery(TransactionStatus::Idle),
                )
                .await?;
            }
        }
    }
}

/// Complete the startup handshake for a client that sent a Startup packet.
///
/// Writes, in order: `AuthenticationOk`, one `ParameterStatus` per
/// advertised setting, `BackendKeyData`, and a final idle
/// `ReadyForQuery`. No credentials are checked (Phase 3.1 uses trust
/// auth). The first write failure aborts the sequence and is returned.
async fn send_auth_ok<S>(
    stream: &mut S,
    config: &PgWireConfig,
    params: &super::protocol::StartupParams,
) -> Result<(), PgWireError>
where
    S: AsyncWrite + Unpin,
{
    // Trust auth: every client is accepted unconditionally.
    write_frame(stream, &BackendMessage::AuthenticationOk).await?;

    // Drivers gate capabilities on these settings. `application_name` is
    // echoed from the client's startup parameters, empty when absent.
    let application_name = params.get("application_name").unwrap_or("");
    let advertised = [
        ("server_version", config.server_version.as_str()),
        ("server_encoding", "UTF8"),
        ("client_encoding", "UTF8"),
        ("DateStyle", "ISO, MDY"),
        ("TimeZone", "UTC"),
        ("integer_datetimes", "on"),
        ("standard_conforming_strings", "on"),
        ("application_name", application_name),
    ];
    for (name, value) in advertised {
        let status = BackendMessage::ParameterStatus {
            name: name.to_owned(),
            value: value.to_owned(),
        };
        write_frame(stream, &status).await?;
    }

    // (pid, secret_key) pair a client would quote in a CancelRequest.
    // Cancels are not honoured in 3.1, so a fixed key is sufficient.
    let key_data = BackendMessage::BackendKeyData {
        pid: std::process::id(),
        key: 0xDEADBEEF,
    };
    write_frame(stream, &key_data).await?;

    write_frame(
        stream,
        &BackendMessage::ReadyForQuery(TransactionStatus::Idle),
    )
    .await
}

/// Execute one simple-query frame and write the complete PG response.
///
/// * Blank SQL produces `EmptyQueryResponse`.
/// * Queries recognised by `translate_pg_catalog_query` are answered from
///   the catalog shim; everything else goes to `runtime.execute_query`.
/// * A successful `select` streams its rows and a `SELECT <n>` tag; other
///   statements get only a command tag.
/// * A failed query is reported as an `ErrorResponse` whose SQLSTATE comes
///   from `classify_sqlstate`.
///
/// Every path ends with an idle `ReadyForQuery`. Only write failures are
/// returned as `Err`; query failures are reported to the client instead.
async fn handle_simple_query<S>(
    stream: &mut S,
    runtime: &RedDBRuntime,
    sql: &str,
) -> Result<(), PgWireError>
where
    S: AsyncWrite + Unpin,
{
    if sql.trim().is_empty() {
        // PG answers an empty query with EmptyQueryResponse rather than a
        // CommandComplete; some clients (psql `\;`) rely on this.
        write_frame(stream, &BackendMessage::EmptyQueryResponse).await?;
    } else {
        // Catalog introspection is intercepted first; `Ok(None)` means the
        // shim did not recognise the query and the runtime should run it.
        let outcome = match translate_pg_catalog_query(runtime, sql) {
            Ok(None) => runtime.execute_query(sql),
            Ok(Some(result)) => Ok(crate::runtime::RuntimeQueryResult {
                query: sql.to_string(),
                mode: crate::storage::query::modes::QueryMode::Sql,
                statement: "select",
                engine: "pg-catalog",
                result,
                affected_rows: 0,
                statement_type: "select",
            }),
            Err(err) => Err(err),
        };

        match outcome {
            Err(err) => {
                // SQLSTATE is guessed from the message text: 42P01 / 42601
                // and friends when recognisable, XX000 otherwise.
                let message = err.to_string();
                send_error(stream, classify_sqlstate(&message), &message).await?;
            }
            Ok(result) => {
                let tag = if result.statement_type == "select" {
                    emit_result_rows(stream, &result.result).await?;
                    format!("SELECT {}", result.result.records.len())
                } else {
                    // PG command tags look like "<CMD> [<OID>] <COUNT>".
                    // Counts are included for DML; any other statement
                    // type is echoed back upper-cased.
                    match result.statement_type {
                        "insert" => format!("INSERT 0 {}", result.affected_rows),
                        "update" => format!("UPDATE {}", result.affected_rows),
                        "delete" => format!("DELETE {}", result.affected_rows),
                        other => other.to_uppercase(),
                    }
                };
                write_frame(stream, &BackendMessage::CommandComplete(tag)).await?;
            }
        }
    }

    write_frame(
        stream,
        &BackendMessage::ReadyForQuery(TransactionStatus::Idle),
    )
    .await
}

/// Write a result set as one `RowDescription` followed by one `DataRow`
/// per record.
///
/// Column order comes from `result.columns` when it is populated, and
/// otherwise from the first record's field names. Column types are
/// sampled from the first record only; a column with no value there (or
/// an empty result set) is described as TEXT. Cells whose field is
/// missing, or for which `value_to_pg_wire_bytes` yields `None`, are
/// passed to `DataRow` as `None`.
async fn emit_result_rows<S>(
    stream: &mut S,
    result: &crate::storage::query::unified::UnifiedResult,
) -> Result<(), PgWireError>
where
    S: AsyncWrite + Unpin,
{
    // The first record doubles as the schema sample for both the column
    // list (when no projection was supplied) and the per-column type OIDs.
    let sample = result.records.first();

    let columns: Vec<String> = if result.columns.is_empty() {
        match sample {
            Some(record) => record_field_names(record),
            None => Vec::new(),
        }
    } else {
        result.columns.clone()
    };

    let descriptors: Vec<ColumnDescriptor> = columns
        .iter()
        .map(|name| {
            // No sample value means TEXT; clients render empty result
            // sets happily with that.
            let oid = sample
                .and_then(|record| record_get(record, name))
                .map(PgOid::from_value)
                .unwrap_or(PgOid::Text);
            ColumnDescriptor {
                name: name.clone(),
                table_oid: 0,
                column_attr: 0,
                type_oid: oid.as_u32(),
                type_size: -1,
                type_mod: -1,
                format: 0,
            }
        })
        .collect();

    write_frame(stream, &BackendMessage::RowDescription(descriptors)).await?;

    for record in &result.records {
        // Cells are emitted positionally, in the same order as the header.
        let mut cells: Vec<Option<Vec<u8>>> = Vec::with_capacity(columns.len());
        for name in &columns {
            cells.push(record_get(record, name).and_then(value_to_pg_wire_bytes));
        }
        write_frame(stream, &BackendMessage::DataRow(cells)).await?;
    }

    Ok(())
}

/// Look up a single field on a `UnifiedRecord` by column name.
///
/// This is a thin wrapper that delegates directly to the record's own
/// `get`; there is no secondary lookup path. The result is whatever
/// `get` returns for `key`, borrowed for as long as `record` is.
///
/// NOTE(review): presumably kept as a separate function so the PG-wire
/// row encoders share one seam if the record API changes — confirm.
fn record_get<'a>(record: &'a UnifiedRecord, key: &str) -> Option<&'a Value> {
    record.get(key)
}

/// Derive the column list for a result set from one record.
///
/// Used when the query carried no explicit `columns` projection: the
/// names reported by the first record become the tuple shape for the
/// whole result.
///
/// The resulting order is not guaranteed to be stable, because HashMap
/// iteration order is non-deterministic. Phase 3.1 tolerates that: PG
/// clients read the order from RowDescription and match cells by
/// position, so header and rows stay consistent within one response. A
/// stable order would need an insertion-order index next to `values`.
fn record_field_names(record: &UnifiedRecord) -> Vec<String> {
    // `column_names()` merges the columnar scan side-channel with the
    // HashMap, so scan rows that only populate the columnar side still
    // report their field names here.
    let mut names: Vec<String> = Vec::new();
    names.extend(
        record
            .column_names()
            .into_iter()
            .map(|field| field.to_string()),
    );
    names
}

/// Write a single `ErrorResponse` frame with severity `ERROR`.
///
/// `code` is the five-character SQLSTATE and `message` the human-readable
/// text. No `ReadyForQuery` is sent here; callers follow up with one
/// themselves. Returns the result of the underlying frame write.
async fn send_error<S>(stream: &mut S, code: &str, message: &str) -> Result<(), PgWireError>
where
    S: AsyncWrite + Unpin,
{
    let response = BackendMessage::ErrorResponse {
        severity: String::from("ERROR"),
        code: code.to_owned(),
        message: message.to_owned(),
    };
    write_frame(stream, &response).await
}

/// Guess a PG SQLSTATE from the text of a runtime error.
///
/// Matching is a case-insensitive (ASCII) substring search against an
/// ordered rule table; the first rule with any matching keyword wins and
/// `XX000` (internal error) is the fallback. This is a heuristic that
/// covers the common psql / JDBC paths — full coverage would map every
/// `RedDBError` variant instead of inspecting message text.
fn classify_sqlstate(msg: &str) -> &'static str {
    // Order is significant: earlier rules take precedence when a message
    // contains keywords from more than one rule.
    const RULES: &[(&[&str], &str)] = &[
        // 42P01 undefined_table; close enough for collection-not-found.
        (&["not found", "does not exist"], "42P01"),
        // 42601 syntax_error.
        (&["parse", "expected", "syntax"], "42601"),
        // 42P07 duplicate_table.
        (&["already exists"], "42P07"),
        // 28000 invalid_authorization_specification.
        (&["permission", "auth"], "28000"),
    ];

    let haystack = msg.to_ascii_lowercase();
    for &(keywords, code) in RULES {
        if keywords.iter().any(|&keyword| haystack.contains(keyword)) {
            return code;
        }
    }
    "XX000"
}