// rbp_database/traits.rs
1//! PostgreSQL serialization traits.
2//!
3//! Traits for table metadata, bulk loading, and round-trip persistence.
4use std::pin::Pin;
5use tokio_postgres::Client;
6use tokio_postgres::binary_copy::BinaryCopyInWriter;
7
/// Schema metadata for PostgreSQL tables.
///
/// Provides compile-time SQL generation for table creation, indexing,
/// and bulk data operations. All methods return `&'static str` to avoid
/// runtime allocations and enable compile-time string construction via
/// [`const_format::concatcp!`].
///
/// # Design
///
/// This trait contains no I/O operations—it purely describes table structure.
/// Actual database operations are handled by [`Streamable`] and [`Hydrate`].
pub trait Schema {
    /// Returns the table name in the database.
    fn name() -> &'static str;
    /// Returns the `COPY ... FROM STDIN BINARY` command for bulk loading.
    fn copy() -> &'static str;
    /// Returns the `CREATE TABLE IF NOT EXISTS` DDL statement.
    fn creates() -> &'static str;
    /// Returns `CREATE INDEX IF NOT EXISTS` statements for all indices.
    fn indices() -> &'static str;
    /// Returns the `TRUNCATE TABLE` statement for clearing data.
    fn truncates() -> &'static str;
    /// Returns SQL to optimize the table for read-heavy workloads.
    ///
    /// Typically sets `fillfactor = 100` and disables autovacuum for
    /// tables that are bulk-loaded once and never modified.
    fn freeze() -> &'static str;
    /// Returns PostgreSQL column types for the binary COPY protocol.
    ///
    /// The order and types of this slice must match the column list in
    /// [`Schema::copy`]: [`Streamable::stream`] passes it directly to
    /// `BinaryCopyInWriter::new` when opening the COPY stream.
    fn columns() -> &'static [tokio_postgres::types::Type];
}
38
39/// Derived table generation from enumerable domain values.
40///
41/// For tables whose contents can be exhaustively enumerated at runtime
42/// (e.g., street configurations, abstraction definitions), this trait
43/// generates INSERT statements programmatically.
44///
45/// # Usage
46///
47/// Implement [`exhaust`](Derive::exhaust) to enumerate all valid values,
48/// and [`inserts`](Derive::inserts) to format each as an INSERT statement.
49/// The [`derives`](Derive::derives) method combines these into a single
50/// SQL batch.
51///
52/// # Contrast with Streamable
53///
54/// Use `Derive` for small, enumerable tables where INSERT is sufficient.
55/// Use [`Streamable`] for large datasets requiring binary COPY performance.
56pub trait Derive: Sized + Schema {
57 /// Enumerates all values that should be inserted into the table.
58 fn exhaust() -> Vec<Self>;
59 /// Formats this value as an INSERT statement.
60 fn inserts(&self) -> String;
61 /// Generates a batch of INSERT statements for all enumerated values.
62 fn derives() -> String {
63 Self::exhaust()
64 .iter()
65 .map(Self::inserts)
66 .collect::<Vec<_>>()
67 .join("\n;")
68 }
69}
70
/// Loading domain objects from PostgreSQL.
///
/// Complements [`Schema`] and [`Streamable`] to enable round-trip
/// persistence. While those traits handle writing, `Hydrate` handles
/// reading data back into memory.
#[async_trait::async_trait]
pub trait Hydrate: Sized {
    /// Loads this type from the database.
    ///
    /// Takes an `Arc<Client>` to allow the implementation to spawn
    /// concurrent queries if needed.
    ///
    /// NOTE(review): the signature is infallible (`-> Self`), so
    /// implementations presumably panic on query failure — consistent
    /// with the `expect` style used elsewhere in this file; confirm
    /// callers accept that.
    async fn hydrate(client: std::sync::Arc<Client>) -> Self;
}
84
/// Binary row serialization for PostgreSQL COPY protocol.
///
/// Each implementation handles a specific tuple arity, writing fields
/// in binary format to match the table schema. The trait enables
/// [`Streamable`] to work with any row shape.
///
/// # Safety
///
/// Field order and types must exactly match the table schema defined
/// by the corresponding [`Schema`] implementation.
#[async_trait::async_trait]
pub trait Row: Send {
    /// Writes this row to the binary COPY stream.
    ///
    /// # Panics
    ///
    /// The implementations in this file `expect("write")` on the
    /// underlying writer, so a failed write aborts the upload.
    async fn write(self, writer: Pin<&mut BinaryCopyInWriter>);
}
100
101/// Row format for isomorphism → abstraction mappings.
102#[async_trait::async_trait]
103impl Row for (i64, i16) {
104 async fn write(self, writer: Pin<&mut BinaryCopyInWriter>) {
105 writer.write(&[&self.0, &self.1]).await.expect("write");
106 }
107}
108
109/// Row format for triangular index → distance mappings.
110#[async_trait::async_trait]
111impl Row for (i32, f32) {
112 async fn write(self, writer: Pin<&mut BinaryCopyInWriter>) {
113 writer.write(&[&self.0, &self.1]).await.expect("write");
114 }
115}
116
117/// Row format for transition probabilities.
118#[async_trait::async_trait]
119impl Row for (i16, i16, f32) {
120 async fn write(self, writer: Pin<&mut BinaryCopyInWriter>) {
121 writer.write(&[&self.0, &self.1, &self.2]).await.expect("write");
122 }
123}
124
125/// Row format for blueprint strategies.
126#[rustfmt::skip]
127#[async_trait::async_trait]
128impl Row for (i64, i16, i64, i64, f32, f32, f32, i32) {
129 async fn write(self, writer: Pin<&mut BinaryCopyInWriter>) {
130 writer
131 .write(&[&self.0, &self.1, &self.2, &self.3, &self.4, &self.5, &self.6, &self.7])
132 .await
133 .expect("write");
134 }
135}
136
137/// Bulk data upload via PostgreSQL's binary COPY protocol.
138///
139/// Enables high-throughput streaming of domain objects to the database
140/// using PostgreSQL's most efficient data ingestion path. The binary
141/// format avoids text parsing overhead and matches Rust's native types.
142///
143/// # Requirements
144///
145/// Implementors must also implement [`Schema`] for table metadata and
146/// define a [`Row`] type that handles binary serialization.
147///
148/// # Performance
149///
150/// Binary COPY is orders of magnitude faster than INSERT statements
151/// for bulk loading. A typical clustering run uploads millions of rows
152/// in seconds rather than hours.
153#[async_trait::async_trait]
154pub trait Streamable: Schema + Sized + Send {
155 /// The row type for binary serialization.
156 type Row: Row;
157 /// Converts this collection into an iterator of rows for streaming.
158 fn rows(self) -> impl Iterator<Item = Self::Row> + Send;
159 /// Streams all rows to PostgreSQL via binary COPY.
160 ///
161 /// Opens a COPY stream, writes each row in binary format, and
162 /// finalizes the upload. Consumes `self` to enable move semantics.
163 async fn stream(self, client: &Client) {
164 let sink = client.copy_in(Self::copy()).await.expect("copy_in");
165 let writer = BinaryCopyInWriter::new(sink, Self::columns());
166 futures::pin_mut!(writer);
167 for row in self.rows() {
168 row.write(writer.as_mut()).await;
169 }
170 writer.finish().await.expect("finish");
171 }
172 /// Creates indices and optimizes table for read-heavy access.
173 ///
174 /// Call once after all data has been uploaded. Creates indices
175 /// defined by [`Schema::indices`] and applies freeze settings.
176 async fn finalize(client: &Client) {
177 log::info!("indexing table ({})", Self::name());
178 client
179 .batch_execute(Self::indices())
180 .await
181 .expect("indices");
182 log::info!("freezing table ({})", Self::name());
183 client.batch_execute(Self::freeze()).await.expect("freeze");
184 }
185}