use std::pin::Pin;
use tokio_postgres::Client;
use tokio_postgres::binary_copy::BinaryCopyInWriter;
/// Static, per-table metadata consumed by the loading code in this file.
///
/// Every method is an associated function returning `'static` data, so an
/// implementor describes its table without needing an instance.
pub trait Schema {
/// Table name; interpolated into the log lines written by `Streamable::finalize`.
fn name() -> &'static str;
/// Statement handed to `Client::copy_in` by `Streamable::stream`.
fn copy() -> &'static str;
/// NOTE(review): presumably the SQL that creates the table — not used in this
/// section of the file; confirm against call sites.
fn creates() -> &'static str;
/// SQL batch executed by `Streamable::finalize` under the "indexing table" log line.
fn indices() -> &'static str;
/// NOTE(review): presumably the SQL that truncates the table — not used in this
/// section of the file; confirm against call sites.
fn truncates() -> &'static str;
/// SQL batch executed by `Streamable::finalize` under the "freezing table" log line.
fn freeze() -> &'static str;
/// Column types handed to `BinaryCopyInWriter::new` by `Streamable::stream`.
fn columns() -> &'static [tokio_postgres::types::Type];
}
/// A [`Schema`] whose contents can be generated directly from the type itself.
///
/// Implementors supply the values ([`Derive::exhaust`]) and a per-value
/// rendering ([`Derive::inserts`]); [`Derive::derives`] stitches them together.
pub trait Derive: Sized + Schema {
    /// Returns the values that [`Derive::derives`] renders, in output order.
    fn exhaust() -> Vec<Self>;

    /// Renders this value as one statement string.
    ///
    /// NOTE(review): presumably a SQL `INSERT` — confirm against implementors.
    fn inserts(&self) -> String;

    /// Renders every value from [`Derive::exhaust`] and concatenates the
    /// results, placing `"\n;"` between consecutive entries.
    ///
    /// Nothing is added before the first entry or after the last, so an empty
    /// `exhaust()` yields an empty string.
    fn derives() -> String {
        let mut script = String::new();
        for (position, item) in Self::exhaust().iter().enumerate() {
            // The separator goes only between entries, never at the edges.
            if position > 0 {
                script.push_str("\n;");
            }
            script.push_str(&item.inserts());
        }
        script
    }
}
/// Asynchronous construction of a value from a shared database client.
///
/// NOTE(review): presumably implementors query the database to populate
/// themselves — no implementation is visible in this section; confirm.
#[async_trait::async_trait]
pub trait Hydrate: Sized {
/// Builds `Self` using the given reference-counted `Client`.
///
/// The signature returns `Self` rather than a `Result`, so implementors have
/// no way to report failure through the return value.
async fn hydrate(client: std::sync::Arc<Client>) -> Self;
}
/// One record that can be appended to a binary `COPY ... FROM STDIN` stream.
///
/// Implementors must be `Send`. `Streamable::stream` calls [`Row::write`] once
/// for each item produced by `Streamable::rows`.
#[async_trait::async_trait]
pub trait Row: Send {
/// Consumes the row and writes it to the pinned `BinaryCopyInWriter`.
///
/// The method returns `()`; the tuple implementations in this file panic
/// with the message `"write"` if the underlying writer reports an error.
async fn write(self, writer: Pin<&mut BinaryCopyInWriter>);
}
/// Two-column row: an `i64` followed by an `i16`.
#[async_trait::async_trait]
impl Row for (i64, i16) {
    /// Writes both fields, in tuple order, as a single row.
    ///
    /// # Panics
    ///
    /// Panics with `"write"` if the writer returns an error.
    async fn write(self, writer: Pin<&mut BinaryCopyInWriter>) {
        let (first, second) = self;
        let outcome = writer.write(&[&first, &second]).await;
        outcome.expect("write");
    }
}
/// Two-column row: an `i32` followed by an `f32`.
#[async_trait::async_trait]
impl Row for (i32, f32) {
    /// Writes both fields, in tuple order, as a single row.
    ///
    /// # Panics
    ///
    /// Panics with `"write"` if the writer returns an error.
    async fn write(self, writer: Pin<&mut BinaryCopyInWriter>) {
        let (first, second) = self;
        let outcome = writer.write(&[&first, &second]).await;
        outcome.expect("write");
    }
}
/// Three-column row: two `i16` values followed by an `f32`.
#[async_trait::async_trait]
impl Row for (i16, i16, f32) {
    /// Writes all three fields, in tuple order, as a single row.
    ///
    /// # Panics
    ///
    /// Panics with `"write"` if the writer returns an error.
    async fn write(self, writer: Pin<&mut BinaryCopyInWriter>) {
        let (first, second, third) = self;
        let outcome = writer.write(&[&first, &second, &third]).await;
        outcome.expect("write");
    }
}
/// Eight-column row: `i64, i16, i64, i64, f32, f32, f32, i32`.
#[rustfmt::skip]
#[async_trait::async_trait]
impl Row for (i64, i16, i64, i64, f32, f32, f32, i32) {
    /// Writes all eight fields, in tuple order, as a single row.
    ///
    /// # Panics
    ///
    /// Panics with `"write"` if the writer returns an error.
    async fn write(self, writer: Pin<&mut BinaryCopyInWriter>) {
        let (c0, c1, c2, c3, c4, c5, c6, c7) = self;
        let outcome = writer.write(&[&c0, &c1, &c2, &c3, &c4, &c5, &c6, &c7]).await;
        outcome.expect("write");
    }
}
/// A [`Schema`] whose contents can be bulk-loaded through a binary
/// `COPY ... FROM STDIN` stream.
#[async_trait::async_trait]
pub trait Streamable: Schema + Sized + Send {
    /// The record type produced by [`Streamable::rows`].
    type Row: Row;

    /// Consumes `self` and yields the rows to be written, in order.
    fn rows(self) -> impl Iterator<Item = Self::Row> + Send;

    /// Streams every row of `self` to the server over a single copy operation.
    ///
    /// Opens the copy with the statement from `Schema::copy`, types the binary
    /// writer with `Schema::columns`, writes each row in turn, then finishes
    /// the copy.
    ///
    /// # Panics
    ///
    /// Panics with `"copy_in"` if the copy cannot be started and with
    /// `"finish"` if it cannot be completed. A panic raised by `Row::write`
    /// propagates unchanged.
    async fn stream(self, client: &Client) {
        let opened = client.copy_in(Self::copy()).await;
        let copy_sink = opened.expect("copy_in");
        let encoder = BinaryCopyInWriter::new(copy_sink, Self::columns());
        // The writer's methods take `Pin<&mut Self>`, so pin it on the stack
        // and re-borrow it for each row.
        futures::pin_mut!(encoder);
        for record in self.rows() {
            record.write(encoder.as_mut()).await;
        }
        let completed = encoder.finish().await;
        completed.expect("finish");
    }

    /// Runs the `Schema::indices` batch and then the `Schema::freeze` batch,
    /// logging the table name at info level before each.
    ///
    /// # Panics
    ///
    /// Panics with `"indices"` or `"freeze"` if the corresponding batch fails.
    async fn finalize(client: &Client) {
        log::info!("indexing table ({})", Self::name());
        let indexed = client.batch_execute(Self::indices()).await;
        indexed.expect("indices");

        log::info!("freezing table ({})", Self::name());
        let frozen = client.batch_execute(Self::freeze()).await;
        frozen.expect("freeze");
    }
}