Skip to main content

reddb_wire/redwire/
mod.rs

1//! RedWire — RedDB's binary TCP/TLS wire protocol.
2//!
3//! ADR 0001 (`.red/adr/0001-redwire-tcp-protocol.md`) is the
4//! normative spec. This module owns the frame layout, message-kind
5//! discriminator, flags, encode/decode codec, and generic async
6//! frame I/O over byte streams. Server-side dispatch, auth policy,
7//! session loop, and listener accept stay in `reddb` and depend on
8//! these types.
9
10pub mod builder;
11pub mod bulk_binary;
12pub mod bulk_json;
13pub mod bulk_stream;
14pub mod codec;
15pub mod cursor;
16pub mod frame;
17pub mod handshake;
18pub mod io;
19pub mod operations;
20pub mod prepared;
21pub mod queue;
22pub mod stream;
23pub mod ws_gate;
24
25pub use builder::{
26    build_bulk_insert_binary_frame, build_bulk_insert_frame, build_bye_frame, build_delete_frame,
27    build_dispatch_reply_frame, build_error_frame, build_error_frame_lossy, build_get_frame,
28    build_ping_frame, build_query_frame, build_query_with_params_frame, build_reply_frame,
29    build_request_frame, rewrap_length_prefixed_handler_response, BuildError, FrameBuilder,
30};
31pub use bulk_binary::{
32    decode_bulk_binary_payload, encode_bulk_binary_payload, BulkBinaryError, BulkBinaryFlavor,
33    BulkBinaryPayload,
34};
35pub use bulk_json::{
36    decode_bulk_json_payload, encode_bulk_json_payload, BulkJsonError, BulkJsonPayload,
37};
38pub use bulk_stream::{
39    decode_bulk_stream_rows_payload, decode_bulk_stream_start_payload,
40    encode_bulk_stream_rows_payload, encode_bulk_stream_start_payload, BulkStreamError,
41    BulkStreamRowsPayload, BulkStreamStartPayload,
42};
43pub use codec::{
44    decode_frame, decode_frame_parts, encode_frame, frame_len_from_header, FrameError,
45};
46pub use cursor::{
47    decode_close_cursor_payload, decode_declare_cursor_payload, decode_fetch_payload,
48    encode_close_cursor_payload, encode_cursor_batch_payload, encode_cursor_ok_payload,
49    encode_declare_cursor_payload, encode_fetch_payload, CloseCursorPayload, CursorPayloadError,
50    DeclareCursorPayload, FetchPayload,
51};
52pub use frame::{
53    Flags, Frame, MessageClass, MessageDirection, MessageKind, FRAME_HEADER_SIZE, MAX_FRAME_SIZE,
54};
55pub use handshake::{
56    build_auth_fail_frame, build_auth_fail_payload, build_auth_ok_frame_from_payload,
57    build_auth_ok_payload, build_auth_response_anonymous_payload,
58    build_auth_response_bearer_payload, build_auth_response_frame,
59    build_auth_response_oauth_jwt_payload, build_client_hello_frame, build_client_hello_payload,
60    build_hello_ack, build_hello_ack_frame, build_hello_payload, choose_hello_minor_version,
61    expect_auth_response_payload, AuthFail, AuthOk, AuthResponseKindError, Hello, HelloAck,
62    SUPPORTED_METHODS,
63};
64pub use io::{
65    drain_next_frame, frame_to_bytes, read_frame_async, write_frame_async, RedWireIoError,
66};
67pub use operations::{
68    decode_bulk_ok_count_payload, decode_bulk_ok_payload, decode_delete_ok_affected,
69    decode_delete_payload, decode_error_payload, decode_get_payload, decode_get_result_payload,
70    decode_insert_dispatch_payload, decode_query_result_payload, decode_text_payload,
71    encode_bulk_insert_payload, encode_bulk_ok_count_payload, encode_bulk_ok_payload,
72    encode_bulk_ok_payload_from_json_id_literals, encode_bulk_ok_payload_from_json_ids_bytes,
73    encode_delete_ok_payload, encode_get_result_payload, encode_insert_payload, encode_key_payload,
74    encode_query_result_summary_payload, expect_bulk_ok_or_error, expect_delete_ok_or_error,
75    expect_pong_reply, expect_result_or_error, BulkOkPayload, InsertDispatchPayload, KeyPayload,
76    OperationPayloadError, OperationReplyError,
77};
78pub use prepared::{
79    decode_deallocate_payload, decode_execute_prepared_payload, decode_prepare_payload,
80    encode_deallocate_payload, encode_execute_prepared_payload, encode_prepare_payload,
81    encode_prepared_ok_payload, DeallocatePayload, ExecutePreparedPayload, PreparePayload,
82    PreparedOkPayload, PreparedPayloadError,
83};
84pub use queue::{
85    build_event_push_payload, build_event_push_payload_from_json_bytes,
86    build_queue_event_push_frame_from_json_bytes, build_queue_wait_error_frame,
87    build_queue_wait_error_payload, build_queue_wait_open_frame, build_queue_wait_open_payload,
88    build_queue_wait_timeout_frame, build_queue_wait_timeout_payload, parse_queue_wait_open,
89    QueueWaitOpenRequest, QueueWaitParseError, WAIT_CANCELLED_CODE, WAIT_EXCEEDS_CAP_CODE,
90    WAIT_FAILED_CODE,
91};
92pub use stream::{
93    build_input_stream_end_frame, build_input_stream_end_payload, build_input_stream_error_frame,
94    build_input_stream_error_payload, build_open_ack_frame, build_open_ack_payload,
95    build_open_stream_frame, build_open_stream_payload, build_stream_chunk_frame_from_json_bytes,
96    build_stream_chunk_payload, build_stream_chunk_payload_from_json_bytes, build_stream_end_frame,
97    build_stream_end_payload, build_stream_error_frame, build_stream_error_payload,
98    open_stream_is_input, parse_input_chunk, parse_input_chunk_json, parse_open_input,
99    parse_open_stream, parse_stream_cancel, ChunkParseError, InputChunk, InputChunkJson,
100    OpenInputParseError, OpenInputRequest, OpenStreamParseError, OpenStreamRequest,
101    StreamCancelRequest,
102};
103pub use ws_gate::{evaluate_ws_upgrade, WsUpgradeRefusal};
104
/// Magic discriminator: the very first byte every RedWire client puts
/// on the wire. Both the service-router detector and the standalone
/// listener path key off this value.
pub const REDWIRE_MAGIC: u8 = 0xFE;

/// Highest minor version the server supports. It is bumped on the wire
/// only when a new feature changes the handshake; data-plane additions
/// flow through `Hello.features` instead.
pub const MAX_KNOWN_MINOR_VERSION: u8 = 0x01;

/// Port the RedWire listener uses by default.
pub const DEFAULT_REDWIRE_PORT: u16 = 5050;

/// WebSocket subprotocol token the RedWire-over-WSS edge advertises
/// (issue #935, ADR 0036). The token is versioned so that a future
/// framing revision can coexist with v1 clients. reddb-wire is the
/// single source of truth: the server's axum upgrade handler consumes
/// this constant rather than defining its own.
pub const REDWIRE_WS_SUBPROTOCOL: &str = "reddb.redwire.v1";

/// HTTP path on which the browser client (`red+wss://host:port`)
/// upgrades to reach the RedWire-over-WebSocket edge (ADR 0036).
pub const REDWIRE_WS_PATH: &str = "/redwire";
128
/// Failures reported while validating the opening bytes of a RedWire
/// connection: the magic byte and the minor version.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StartupError {
    /// The byte checked against the RedWire magic did not match; `got`
    /// is the byte that was presented instead.
    BadMagic { got: u8 },
    /// The minor version `got` is above `max`, the highest supported one.
    UnsupportedMinor { got: u8, max: u8 },
}

impl std::fmt::Display for StartupError {
    /// Render a human-readable message: the bad magic byte is shown as
    /// two-digit lowercase hex, minor versions are shown in decimal.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The enum is `Copy`, so match on the value and bind plain `u8`s.
        match *self {
            Self::BadMagic { got } => f.write_fmt(format_args!(
                "redwire: client did not present magic byte (got 0x{got:02x})"
            )),
            Self::UnsupportedMinor { got, max } => f.write_fmt(format_args!(
                "redwire: unsupported minor version {got}; max supported is {max}"
            )),
        }
    }
}

impl std::error::Error for StartupError {}
155
156pub fn client_preface(minor: u8) -> [u8; 2] {
157    [REDWIRE_MAGIC, minor]
158}
159
160pub fn supported_client_preface() -> [u8; 2] {
161    client_preface(MAX_KNOWN_MINOR_VERSION)
162}
163
164pub fn validate_startup_magic(got: u8) -> Result<(), StartupError> {
165    if got == REDWIRE_MAGIC {
166        Ok(())
167    } else {
168        Err(StartupError::BadMagic { got })
169    }
170}
171
172pub fn validate_minor_version(got: u8) -> Result<(), StartupError> {
173    if got <= MAX_KNOWN_MINOR_VERSION {
174        Ok(())
175    } else {
176        Err(StartupError::UnsupportedMinor {
177            got,
178            max: MAX_KNOWN_MINOR_VERSION,
179        })
180    }
181}
182
/// Three-way result of checking an inbound peek buffer for the RedWire
/// magic discriminator. It mirrors the service-router's detect result
/// while staying free of router types, so reddb-wire owns the
/// classifier.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RedWireMagicMatch {
    /// No bytes are available yet; at least one is needed to classify.
    Pending,
    /// The leading byte equals the RedWire magic ([`REDWIRE_MAGIC`]).
    Match,
    /// A leading byte is present but it is not the RedWire magic.
    NoMatch,
}
195
196/// Classify an inbound peek buffer against the RedWire magic byte
197/// ([`REDWIRE_MAGIC`]). Pure and allocation-free so the service-router's
198/// hot-path classifier can delegate the discriminator decision here:
199/// empty → `Pending`, leading `0xFE` → `Match`, anything else → `NoMatch`.
200pub fn redwire_magic_match(peek: &[u8]) -> RedWireMagicMatch {
201    match peek.first() {
202        None => RedWireMagicMatch::Pending,
203        Some(&first) if first == REDWIRE_MAGIC => RedWireMagicMatch::Match,
204        Some(_) => RedWireMagicMatch::NoMatch,
205    }
206}
207
#[cfg(test)]
mod startup_tests {
    use super::*;

    /// The advertised preface is the magic byte followed by the newest
    /// supported minor version.
    #[test]
    fn preface_uses_magic_and_supported_minor() {
        let [magic, minor] = supported_client_preface();
        assert_eq!(magic, 0xfe);
        assert_eq!(minor, MAX_KNOWN_MINOR_VERSION);
    }

    /// Both validators accept the supported value and reject the
    /// unsupported one with the matching error variant.
    #[test]
    fn startup_validation_rejects_bad_magic_and_future_minor() {
        // Magic: the exact byte passes, anything else reports what was seen.
        assert!(validate_startup_magic(REDWIRE_MAGIC).is_ok());
        assert_eq!(
            validate_startup_magic(0),
            Err(StartupError::BadMagic { got: 0 })
        );

        // Minor: the current maximum passes, one past it is refused.
        assert!(validate_minor_version(MAX_KNOWN_MINOR_VERSION).is_ok());
        let future_minor = MAX_KNOWN_MINOR_VERSION.saturating_add(1);
        assert!(matches!(
            validate_minor_version(future_minor),
            Err(StartupError::UnsupportedMinor { .. })
        ));
    }

    /// The classifier looks only at the first byte of the peek buffer.
    #[test]
    fn magic_match_classifies_peek_buffer() {
        use super::RedWireMagicMatch::{Match, NoMatch, Pending};

        // Nothing buffered yet: the decision has to wait.
        assert_eq!(redwire_magic_match(&[]), Pending);

        // A leading magic byte matches no matter what follows it.
        assert_eq!(redwire_magic_match(&[REDWIRE_MAGIC]), Match);
        assert_eq!(redwire_magic_match(&[0xFE, 0x01, 0x10]), Match);

        // Any other leading byte (HTTP 'G', H2 'P', binary 0x10) is not RedWire.
        assert_eq!(redwire_magic_match(b"GET "), NoMatch);
        assert_eq!(redwire_magic_match(&[0x10]), NoMatch);
    }
}