db_mover/databases/postgres/
value.rs

1use std::io::Write;
2
3use anyhow::Context;
4use chrono::{DateTime, Datelike, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Utc};
5use postgres::types::{ToSql, Type};
6use rust_decimal::Decimal;
7
8use crate::databases::{
9    table::{Column, ColumnType, Value},
10    traits::WriterError,
11};
12
13impl TryFrom<Type> for ColumnType {
14    type Error = anyhow::Error;
15
16    fn try_from(value: Type) -> Result<Self, Self::Error> {
17        let column_type = match value {
18            Type::INT8 => ColumnType::I64,
19            Type::INT4 => ColumnType::I32,
20            Type::INT2 => ColumnType::I16,
21            Type::FLOAT8 => ColumnType::F64,
22            Type::FLOAT4 => ColumnType::F32,
23            Type::NUMERIC => ColumnType::Decimal,
24            Type::BOOL => ColumnType::Bool,
25            Type::VARCHAR | Type::TEXT | Type::BPCHAR => ColumnType::String,
26            Type::BYTEA => ColumnType::Bytes,
27            Type::TIMESTAMPTZ => ColumnType::Timestamptz,
28            Type::TIMESTAMP => ColumnType::Timestamp,
29            Type::DATE => ColumnType::Date,
30            Type::TIME => ColumnType::Time,
31            Type::JSON | Type::JSON_ARRAY | Type::JSONB | Type::JSONB_ARRAY => ColumnType::Json,
32            Type::UUID => ColumnType::Uuid,
33            _ => return Err(anyhow::anyhow!("Unsupported postgres type {value}")),
34        };
35        return Ok(column_type);
36    }
37}
38
39#[derive(Clone, Debug, PartialEq)]
40pub struct PostgreColumn {
41    pub name: String,
42    pub column_type: postgres::types::Type,
43    pub nullable: bool,
44}
45
46impl TryFrom<PostgreColumn> for Column {
47    type Error = anyhow::Error;
48
49    fn try_from(value: PostgreColumn) -> Result<Self, Self::Error> {
50        return Ok(Column {
51            name: value.name,
52            column_type: value.column_type.try_into()?,
53            nullable: value.nullable,
54        });
55    }
56}
57
58impl TryFrom<(ColumnType, &postgres::Row, usize)> for Value {
59    type Error = anyhow::Error;
60
61    fn try_from(value: (ColumnType, &postgres::Row, usize)) -> Result<Self, Self::Error> {
62        let (column_type, row, idx) = value;
63        let real_columnt_type = &row.columns()[idx];
64        let value = match column_type {
65            ColumnType::I64 => {
66                if real_columnt_type.type_() == &Type::INT2 {
67                    row.get::<_, Option<i16>>(idx)
68                        .map_or(Value::Null, |val| Value::I64(val as i64))
69                } else if real_columnt_type.type_() == &Type::INT4 {
70                    row.get::<_, Option<i32>>(idx)
71                        .map_or(Value::Null, |val| Value::I64(val as i64))
72                } else {
73                    row.get::<_, Option<i64>>(idx)
74                        .map_or(Value::Null, Value::I64)
75                }
76            }
77            ColumnType::I32 => {
78                if real_columnt_type.type_() == &Type::INT2 {
79                    row.get::<_, Option<i16>>(idx)
80                        .map_or(Value::Null, |val| Value::I32(val as i32))
81                } else {
82                    row.get::<_, Option<i32>>(idx)
83                        .map_or(Value::Null, Value::I32)
84                }
85            }
86            ColumnType::I16 => row
87                .get::<_, Option<i16>>(idx)
88                .map_or(Value::Null, Value::I16),
89            ColumnType::F64 => {
90                if real_columnt_type.type_() == &Type::FLOAT4 {
91                    row.get::<_, Option<f32>>(idx)
92                        .map_or(Value::Null, |val| Value::F64(val as f64))
93                } else {
94                    row.get::<_, Option<f64>>(idx)
95                        .map_or(Value::Null, Value::F64)
96                }
97            }
98            ColumnType::F32 => row
99                .get::<_, Option<f32>>(idx)
100                .map_or(Value::Null, Value::F32),
101            ColumnType::Decimal => row
102                .get::<_, Option<Decimal>>(idx)
103                .map_or(Value::Null, Value::Decimal),
104            ColumnType::Bool => row
105                .get::<_, Option<bool>>(idx)
106                .map_or(Value::Null, Value::Bool),
107            ColumnType::String => row
108                .get::<_, Option<String>>(idx)
109                .map_or(Value::Null, Value::String),
110            ColumnType::Bytes => row
111                .get::<_, Option<Vec<u8>>>(idx)
112                .map_or(Value::Null, |val| Value::Bytes(bytes::Bytes::from(val))),
113            ColumnType::Timestamptz => row
114                .get::<_, Option<DateTime<Utc>>>(idx)
115                .map_or(Value::Null, Value::Timestamptz),
116            ColumnType::Timestamp => row
117                .get::<_, Option<NaiveDateTime>>(idx)
118                .map_or(Value::Null, Value::Timestamp),
119            ColumnType::Date => row
120                .get::<_, Option<NaiveDate>>(idx)
121                .map_or(Value::Null, Value::Date),
122            ColumnType::Time => row
123                .get::<_, Option<NaiveTime>>(idx)
124                .map_or(Value::Null, Value::Time),
125            ColumnType::Json => row
126                .get::<_, Option<serde_json::Value>>(idx)
127                .map_or(Value::Null, Value::Json),
128            ColumnType::Uuid => row
129                .get::<_, Option<uuid::Uuid>>(idx)
130                .map_or(Value::Null, Value::Uuid),
131        };
132        return Ok(value);
133    }
134}
135
136const POSTGRES_EPOCH: NaiveDateTime = NaiveDate::from_ymd_opt(2000, 1, 1)
137    .unwrap()
138    .and_hms_opt(0, 0, 0)
139    .unwrap();
140
141impl Value {
142    pub(crate) fn write_postgres_bytes(
143        &self,
144        writer: &mut impl Write,
145        column: &PostgreColumn,
146    ) -> Result<(), WriterError> {
147        match self {
148            &Value::Null => {
149                writer.write_all(&(-1_i32).to_be_bytes())?;
150            }
151            &Value::I64(num) => {
152                writer.write_all(&(size_of_val(&num) as i32).to_be_bytes())?;
153                writer.write_all(&num.to_be_bytes())?;
154            }
155            &Value::I32(num) => {
156                writer.write_all(&(size_of_val(&num) as i32).to_be_bytes())?;
157                writer.write_all(&num.to_be_bytes())?;
158            }
159            &Value::I16(num) => {
160                writer.write_all(&(size_of_val(&num) as i32).to_be_bytes())?;
161                writer.write_all(&num.to_be_bytes())?;
162            }
163            &Value::F64(num) => {
164                writer.write_all(&(size_of_val(&num) as i32).to_be_bytes())?;
165                writer.write_all(&num.to_be_bytes())?;
166            }
167            &Value::F32(num) => {
168                writer.write_all(&(size_of_val(&num) as i32).to_be_bytes())?;
169                writer.write_all(&num.to_be_bytes())?;
170            }
171            &Value::Decimal(num) => {
172                let mut buffer = bytes::BytesMut::new();
173                num.to_sql(&Type::NUMERIC, &mut buffer)
174                    .map_err(anyhow::Error::from_boxed)?;
175                writer.write_all(&(buffer.len() as i32).to_be_bytes())?;
176                writer.write_all(&buffer)?;
177            }
178            &Value::Bool(val) => {
179                let val = u8::from(val);
180                writer.write_all(&(size_of_val(&val) as i32).to_be_bytes())?;
181                writer.write_all(&val.to_be_bytes())?;
182            }
183            Value::Bytes(bytes) => {
184                writer.write_all(&(bytes.len() as i32).to_be_bytes())?;
185                writer.write_all(bytes)?;
186            }
187            Value::String(string) => {
188                let bytes = string.as_bytes();
189                writer.write_all(&(bytes.len() as i32).to_be_bytes())?;
190                writer.write_all(bytes)?;
191            }
192            &Value::Timestamptz(dt) => {
193                let val = dt.timestamp_micros() - POSTGRES_EPOCH.and_utc().timestamp_micros();
194                writer.write_all(&(size_of_val(&val) as i32).to_be_bytes())?;
195                writer.write_all(&val.to_be_bytes())?;
196            }
197            &Value::Timestamp(dt) => {
198                let val =
199                    dt.and_utc().timestamp_micros() - POSTGRES_EPOCH.and_utc().timestamp_micros();
200                writer.write_all(&(size_of_val(&val) as i32).to_be_bytes())?;
201                writer.write_all(&val.to_be_bytes())?;
202            }
203            &Value::Date(date) => {
204                let val = date.num_days_from_ce() - POSTGRES_EPOCH.num_days_from_ce();
205                writer.write_all(&(size_of_val(&val) as i32).to_be_bytes())?;
206                writer.write_all(&val.to_be_bytes())?;
207            }
208            &Value::Time(time) => {
209                let microsecs = (time.num_seconds_from_midnight() as u64) * 1000000;
210                writer.write_all(&(size_of_val(&microsecs) as i32).to_be_bytes())?;
211                writer.write_all(&microsecs.to_be_bytes())?;
212            }
213            Value::Json(value) => {
214                let bytes =
215                    serde_json::to_vec(value).context("Failed to serialize json into bytes")?;
216                if column.column_type == Type::JSONB || column.column_type == Type::JSONB_ARRAY {
217                    let jsonb_version = 1_u8;
218                    let len = (bytes.len() + size_of_val(&jsonb_version)) as i32;
219                    writer.write_all(&(len).to_be_bytes())?;
220                    writer.write_all(&(jsonb_version).to_be_bytes())?;
221                } else {
222                    writer.write_all(&(bytes.len() as i32).to_be_bytes())?;
223                }
224                writer.write_all(&bytes)?;
225            }
226            &Value::Uuid(val) => {
227                let bytes = val.as_bytes();
228                writer.write_all(&(bytes.len() as i32).to_be_bytes())?;
229                writer.write_all(bytes)?;
230            }
231        };
232        return Ok(());
233    }
234}