// rbp_database/traits.rs

//! PostgreSQL serialization traits.
//!
//! Traits for table metadata, bulk loading, and round-trip persistence.
use std::pin::Pin;
use tokio_postgres::Client;
use tokio_postgres::binary_copy::BinaryCopyInWriter;

/// Schema metadata for PostgreSQL tables.
///
/// Provides compile-time SQL generation for table creation, indexing,
/// and bulk data operations. All methods return `&'static str` to avoid
/// runtime allocations and enable compile-time string construction via
/// [`const_format::concatcp!`].
///
/// # Design
///
/// This trait contains no I/O operations—it purely describes table structure.
/// Actual database operations are handled by [`Streamable`] and [`Hydrate`].
pub trait Schema {
    /// Returns the table name in the database.
    fn name() -> &'static str;
    /// Returns the `COPY ... FROM STDIN BINARY` command for bulk loading.
    fn copy() -> &'static str;
    /// Returns the `CREATE TABLE IF NOT EXISTS` DDL statement.
    fn creates() -> &'static str;
    /// Returns `CREATE INDEX IF NOT EXISTS` statements for all indices.
    fn indices() -> &'static str;
    /// Returns the `TRUNCATE TABLE` statement for clearing data.
    fn truncates() -> &'static str;
    /// Returns SQL to optimize table for read-heavy workloads.
    ///
    /// Typically sets `fillfactor = 100` and disables autovacuum for
    /// tables that are bulk-loaded once and never modified.
    fn freeze() -> &'static str;
    /// Returns PostgreSQL column types for the binary COPY protocol.
    ///
    /// Order and types must line up with the table columns so that
    /// [`Row`] implementations serialize fields correctly.
    fn columns() -> &'static [tokio_postgres::types::Type];
}

39/// Derived table generation from enumerable domain values.
40///
41/// For tables whose contents can be exhaustively enumerated at runtime
42/// (e.g., street configurations, abstraction definitions), this trait
43/// generates INSERT statements programmatically.
44///
45/// # Usage
46///
47/// Implement [`exhaust`](Derive::exhaust) to enumerate all valid values,
48/// and [`inserts`](Derive::inserts) to format each as an INSERT statement.
49/// The [`derives`](Derive::derives) method combines these into a single
50/// SQL batch.
51///
52/// # Contrast with Streamable
53///
54/// Use `Derive` for small, enumerable tables where INSERT is sufficient.
55/// Use [`Streamable`] for large datasets requiring binary COPY performance.
56pub trait Derive: Sized + Schema {
57    /// Enumerates all values that should be inserted into the table.
58    fn exhaust() -> Vec<Self>;
59    /// Formats this value as an INSERT statement.
60    fn inserts(&self) -> String;
61    /// Generates a batch of INSERT statements for all enumerated values.
62    fn derives() -> String {
63        Self::exhaust()
64            .iter()
65            .map(Self::inserts)
66            .collect::<Vec<_>>()
67            .join("\n;")
68    }
69}
70
/// Loading domain objects from PostgreSQL.
///
/// Complements [`Schema`] and [`Streamable`] to enable round-trip
/// persistence. While those traits handle writing, `Hydrate` handles
/// reading data back into memory.
#[async_trait::async_trait]
pub trait Hydrate: Sized {
    /// Loads this type from the database.
    ///
    /// Takes an `Arc<Client>` (rather than `&Client`) to allow the
    /// implementation to spawn concurrent queries if needed.
    async fn hydrate(client: std::sync::Arc<Client>) -> Self;
}

/// Binary row serialization for PostgreSQL COPY protocol.
///
/// Each implementation handles a specific tuple arity, writing fields
/// in binary format to match the table schema. The trait enables
/// [`Streamable`] to work with any row shape.
///
/// # Safety
///
/// Field order and types must exactly match the table schema defined
/// by the corresponding [`Schema`] implementation.
#[async_trait::async_trait]
pub trait Row: Send {
    /// Writes this row to the binary COPY stream.
    ///
    /// Consumes `self`; the writer is pinned because the underlying
    /// COPY sink is a stream that must not move while in use.
    async fn write(self, writer: Pin<&mut BinaryCopyInWriter>);
}

101/// Row format for isomorphism → abstraction mappings.
102#[async_trait::async_trait]
103impl Row for (i64, i16) {
104    async fn write(self, writer: Pin<&mut BinaryCopyInWriter>) {
105        writer.write(&[&self.0, &self.1]).await.expect("write");
106    }
107}
108
109/// Row format for triangular index → distance mappings.
110#[async_trait::async_trait]
111impl Row for (i32, f32) {
112    async fn write(self, writer: Pin<&mut BinaryCopyInWriter>) {
113        writer.write(&[&self.0, &self.1]).await.expect("write");
114    }
115}
116
117/// Row format for transition probabilities.
118#[async_trait::async_trait]
119impl Row for (i16, i16, f32) {
120    async fn write(self, writer: Pin<&mut BinaryCopyInWriter>) {
121        writer.write(&[&self.0, &self.1, &self.2]).await.expect("write");
122    }
123}
124
125/// Row format for blueprint strategies.
126#[rustfmt::skip]
127#[async_trait::async_trait]
128impl Row for (i64, i16, i64, i64, f32, f32, f32, i32) {
129    async fn write(self, writer: Pin<&mut BinaryCopyInWriter>) {
130        writer
131            .write(&[&self.0, &self.1, &self.2, &self.3, &self.4, &self.5, &self.6, &self.7])
132            .await
133            .expect("write");
134    }
135}
136
137/// Bulk data upload via PostgreSQL's binary COPY protocol.
138///
139/// Enables high-throughput streaming of domain objects to the database
140/// using PostgreSQL's most efficient data ingestion path. The binary
141/// format avoids text parsing overhead and matches Rust's native types.
142///
143/// # Requirements
144///
145/// Implementors must also implement [`Schema`] for table metadata and
146/// define a [`Row`] type that handles binary serialization.
147///
148/// # Performance
149///
150/// Binary COPY is orders of magnitude faster than INSERT statements
151/// for bulk loading. A typical clustering run uploads millions of rows
152/// in seconds rather than hours.
153#[async_trait::async_trait]
154pub trait Streamable: Schema + Sized + Send {
155    /// The row type for binary serialization.
156    type Row: Row;
157    /// Converts this collection into an iterator of rows for streaming.
158    fn rows(self) -> impl Iterator<Item = Self::Row> + Send;
159    /// Streams all rows to PostgreSQL via binary COPY.
160    ///
161    /// Opens a COPY stream, writes each row in binary format, and
162    /// finalizes the upload. Consumes `self` to enable move semantics.
163    async fn stream(self, client: &Client) {
164        let sink = client.copy_in(Self::copy()).await.expect("copy_in");
165        let writer = BinaryCopyInWriter::new(sink, Self::columns());
166        futures::pin_mut!(writer);
167        for row in self.rows() {
168            row.write(writer.as_mut()).await;
169        }
170        writer.finish().await.expect("finish");
171    }
172    /// Creates indices and optimizes table for read-heavy access.
173    ///
174    /// Call once after all data has been uploaded. Creates indices
175    /// defined by [`Schema::indices`] and applies freeze settings.
176    async fn finalize(client: &Client) {
177        log::info!("indexing table ({})", Self::name());
178        client
179            .batch_execute(Self::indices())
180            .await
181            .expect("indices");
182        log::info!("freezing table ({})", Self::name());
183        client.batch_execute(Self::freeze()).await.expect("freeze");
184    }
185}