use bytes::Bytes;
use failure::Fail;
use super::{connect, PostgresLocator};
use crate::common::*;
use crate::drivers::postgres_shared::PgCreateTable;
/// Start exporting the data of `table_name` from the PostgreSQL database at
/// `url`, returning it as a stream that contains a single [`CsvStream`].
///
/// The steps are:
///
/// 1. Verify `shared_args` and `source_args` against
///    `PostgresLocator::features()`.
/// 2. Build a `PgCreateTable` from `table_name` and the columns of the schema
///    found in `shared_args`, and use it to generate the export SQL.
/// 3. Connect to `url`, prepare the SQL and start a `copy_out` with no
///    parameters.
///
/// On success the result is always `Some`: a stream yielding exactly one
/// `CsvStream` whose `name` is `table_name` and whose `data` is the byte
/// stream produced by `copy_out`.
///
/// # Errors
///
/// Returns an error if argument verification, table-definition construction,
/// SQL generation, connecting, or statement preparation fails. Errors raised
/// while the data itself is being read are not returned here; they are
/// delivered through the `data` stream, wrapped with the context
/// `"error reading data from PostgreSQL"`.
///
/// # Panics
///
/// Panics if the generated export SQL is not valid UTF-8.
pub(crate) async fn local_data_helper(
    ctx: Context,
    url: Url,
    table_name: String,
    shared_args: SharedArguments<Unverified>,
    source_args: SourceArguments<Unverified>,
) -> Result<Option<BoxStream<CsvStream>>> {
    // Check the arguments against the features declared by `PostgresLocator`
    // before doing any real work.
    let shared_args = shared_args.verify(PostgresLocator::features())?;
    let source_args = source_args.verify(PostgresLocator::features())?;
    let schema = shared_args.schema();

    // Tag everything logged from here on with the table being read.
    let ctx =
        ctx.child(o!("stream" => table_name.clone(), "table" => table_name.clone()));
    debug!(ctx.log(), "reading data from {} table {}", url, table_name);

    // Generate the SQL used to export the table. It is written into a byte
    // buffer first and then converted to a `String`.
    let pg_create_table =
        PgCreateTable::from_name_and_columns(table_name.clone(), &schema.columns)?;
    let mut sql_bytes: Vec<u8> = vec![];
    pg_create_table.write_export_sql(&mut sql_bytes, &source_args)?;
    let sql = String::from_utf8(sql_bytes).expect("should always be UTF-8");
    debug!(ctx.log(), "export SQL: {}", sql);

    // Connect, prepare the statement and start copying data out.
    let mut conn = connect(ctx.clone(), url).await?;
    let stmt = conn.prepare(&sql).compat().await?;
    let rdr = conn
        .copy_out(&stmt, &[])
        // Convert each incoming chunk to `BytesMut`, tracing its size.
        .map(move |bytes: Bytes| -> BytesMut {
            trace!(ctx.log(), "read {} bytes", bytes.len());
            bytes.into()
        })
        .map_err(|err| err.context("error reading data from PostgreSQL").into());

    let csv_stream = CsvStream {
        // This is the last use of `table_name`, so move it instead of cloning.
        name: table_name,
        data: Box::new(rdr),
    };

    // Wrap the single `CsvStream` in a one-element stream.
    let box_stream: BoxStream<CsvStream> = Box::new(stream::once(Ok(csv_stream)));
    Ok(Some(box_stream))
}