db_mover/databases/postgres/
value.rs1use std::io::Write;
2
3use anyhow::Context;
4use chrono::{DateTime, Datelike, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Utc};
5use postgres::types::{ToSql, Type};
6use rust_decimal::Decimal;
7
8use crate::databases::{
9 table::{Column, ColumnType, Value},
10 traits::WriterError,
11};
12
13impl TryFrom<Type> for ColumnType {
14 type Error = anyhow::Error;
15
16 fn try_from(value: Type) -> Result<Self, Self::Error> {
17 let column_type = match value {
18 Type::INT8 => ColumnType::I64,
19 Type::INT4 => ColumnType::I32,
20 Type::INT2 => ColumnType::I16,
21 Type::FLOAT8 => ColumnType::F64,
22 Type::FLOAT4 => ColumnType::F32,
23 Type::NUMERIC => ColumnType::Decimal,
24 Type::BOOL => ColumnType::Bool,
25 Type::VARCHAR | Type::TEXT | Type::BPCHAR => ColumnType::String,
26 Type::BYTEA => ColumnType::Bytes,
27 Type::TIMESTAMPTZ => ColumnType::Timestamptz,
28 Type::TIMESTAMP => ColumnType::Timestamp,
29 Type::DATE => ColumnType::Date,
30 Type::TIME => ColumnType::Time,
31 Type::JSON | Type::JSON_ARRAY | Type::JSONB | Type::JSONB_ARRAY => ColumnType::Json,
32 Type::UUID => ColumnType::Uuid,
33 _ => return Err(anyhow::anyhow!("Unsupported postgres type {value}")),
34 };
35 return Ok(column_type);
36 }
37}
38
39#[derive(Clone, Debug, PartialEq)]
40pub struct PostgreColumn {
41 pub name: String,
42 pub column_type: postgres::types::Type,
43 pub nullable: bool,
44}
45
46impl TryFrom<PostgreColumn> for Column {
47 type Error = anyhow::Error;
48
49 fn try_from(value: PostgreColumn) -> Result<Self, Self::Error> {
50 return Ok(Column {
51 name: value.name,
52 column_type: value.column_type.try_into()?,
53 nullable: value.nullable,
54 });
55 }
56}
57
58impl TryFrom<(ColumnType, &postgres::Row, usize)> for Value {
59 type Error = anyhow::Error;
60
61 fn try_from(value: (ColumnType, &postgres::Row, usize)) -> Result<Self, Self::Error> {
62 let (column_type, row, idx) = value;
63 let real_columnt_type = &row.columns()[idx];
64 let value = match column_type {
65 ColumnType::I64 => {
66 if real_columnt_type.type_() == &Type::INT2 {
67 row.get::<_, Option<i16>>(idx)
68 .map_or(Value::Null, |val| Value::I64(val as i64))
69 } else if real_columnt_type.type_() == &Type::INT4 {
70 row.get::<_, Option<i32>>(idx)
71 .map_or(Value::Null, |val| Value::I64(val as i64))
72 } else {
73 row.get::<_, Option<i64>>(idx)
74 .map_or(Value::Null, Value::I64)
75 }
76 }
77 ColumnType::I32 => {
78 if real_columnt_type.type_() == &Type::INT2 {
79 row.get::<_, Option<i16>>(idx)
80 .map_or(Value::Null, |val| Value::I32(val as i32))
81 } else {
82 row.get::<_, Option<i32>>(idx)
83 .map_or(Value::Null, Value::I32)
84 }
85 }
86 ColumnType::I16 => row
87 .get::<_, Option<i16>>(idx)
88 .map_or(Value::Null, Value::I16),
89 ColumnType::F64 => {
90 if real_columnt_type.type_() == &Type::FLOAT4 {
91 row.get::<_, Option<f32>>(idx)
92 .map_or(Value::Null, |val| Value::F64(val as f64))
93 } else {
94 row.get::<_, Option<f64>>(idx)
95 .map_or(Value::Null, Value::F64)
96 }
97 }
98 ColumnType::F32 => row
99 .get::<_, Option<f32>>(idx)
100 .map_or(Value::Null, Value::F32),
101 ColumnType::Decimal => row
102 .get::<_, Option<Decimal>>(idx)
103 .map_or(Value::Null, Value::Decimal),
104 ColumnType::Bool => row
105 .get::<_, Option<bool>>(idx)
106 .map_or(Value::Null, Value::Bool),
107 ColumnType::String => row
108 .get::<_, Option<String>>(idx)
109 .map_or(Value::Null, Value::String),
110 ColumnType::Bytes => row
111 .get::<_, Option<Vec<u8>>>(idx)
112 .map_or(Value::Null, |val| Value::Bytes(bytes::Bytes::from(val))),
113 ColumnType::Timestamptz => row
114 .get::<_, Option<DateTime<Utc>>>(idx)
115 .map_or(Value::Null, Value::Timestamptz),
116 ColumnType::Timestamp => row
117 .get::<_, Option<NaiveDateTime>>(idx)
118 .map_or(Value::Null, Value::Timestamp),
119 ColumnType::Date => row
120 .get::<_, Option<NaiveDate>>(idx)
121 .map_or(Value::Null, Value::Date),
122 ColumnType::Time => row
123 .get::<_, Option<NaiveTime>>(idx)
124 .map_or(Value::Null, Value::Time),
125 ColumnType::Json => row
126 .get::<_, Option<serde_json::Value>>(idx)
127 .map_or(Value::Null, Value::Json),
128 ColumnType::Uuid => row
129 .get::<_, Option<uuid::Uuid>>(idx)
130 .map_or(Value::Null, Value::Uuid),
131 };
132 return Ok(value);
133 }
134}
135
136const POSTGRES_EPOCH: NaiveDateTime = NaiveDate::from_ymd_opt(2000, 1, 1)
137 .unwrap()
138 .and_hms_opt(0, 0, 0)
139 .unwrap();
140
141impl Value {
142 pub(crate) fn write_postgres_bytes(
143 &self,
144 writer: &mut impl Write,
145 column: &PostgreColumn,
146 ) -> Result<(), WriterError> {
147 match self {
148 &Value::Null => {
149 writer.write_all(&(-1_i32).to_be_bytes())?;
150 }
151 &Value::I64(num) => {
152 writer.write_all(&(size_of_val(&num) as i32).to_be_bytes())?;
153 writer.write_all(&num.to_be_bytes())?;
154 }
155 &Value::I32(num) => {
156 writer.write_all(&(size_of_val(&num) as i32).to_be_bytes())?;
157 writer.write_all(&num.to_be_bytes())?;
158 }
159 &Value::I16(num) => {
160 writer.write_all(&(size_of_val(&num) as i32).to_be_bytes())?;
161 writer.write_all(&num.to_be_bytes())?;
162 }
163 &Value::F64(num) => {
164 writer.write_all(&(size_of_val(&num) as i32).to_be_bytes())?;
165 writer.write_all(&num.to_be_bytes())?;
166 }
167 &Value::F32(num) => {
168 writer.write_all(&(size_of_val(&num) as i32).to_be_bytes())?;
169 writer.write_all(&num.to_be_bytes())?;
170 }
171 &Value::Decimal(num) => {
172 let mut buffer = bytes::BytesMut::new();
173 num.to_sql(&Type::NUMERIC, &mut buffer)
174 .map_err(anyhow::Error::from_boxed)?;
175 writer.write_all(&(buffer.len() as i32).to_be_bytes())?;
176 writer.write_all(&buffer)?;
177 }
178 &Value::Bool(val) => {
179 let val = u8::from(val);
180 writer.write_all(&(size_of_val(&val) as i32).to_be_bytes())?;
181 writer.write_all(&val.to_be_bytes())?;
182 }
183 Value::Bytes(bytes) => {
184 writer.write_all(&(bytes.len() as i32).to_be_bytes())?;
185 writer.write_all(bytes)?;
186 }
187 Value::String(string) => {
188 let bytes = string.as_bytes();
189 writer.write_all(&(bytes.len() as i32).to_be_bytes())?;
190 writer.write_all(bytes)?;
191 }
192 &Value::Timestamptz(dt) => {
193 let val = dt.timestamp_micros() - POSTGRES_EPOCH.and_utc().timestamp_micros();
194 writer.write_all(&(size_of_val(&val) as i32).to_be_bytes())?;
195 writer.write_all(&val.to_be_bytes())?;
196 }
197 &Value::Timestamp(dt) => {
198 let val =
199 dt.and_utc().timestamp_micros() - POSTGRES_EPOCH.and_utc().timestamp_micros();
200 writer.write_all(&(size_of_val(&val) as i32).to_be_bytes())?;
201 writer.write_all(&val.to_be_bytes())?;
202 }
203 &Value::Date(date) => {
204 let val = date.num_days_from_ce() - POSTGRES_EPOCH.num_days_from_ce();
205 writer.write_all(&(size_of_val(&val) as i32).to_be_bytes())?;
206 writer.write_all(&val.to_be_bytes())?;
207 }
208 &Value::Time(time) => {
209 let microsecs = (time.num_seconds_from_midnight() as u64) * 1000000;
210 writer.write_all(&(size_of_val(µsecs) as i32).to_be_bytes())?;
211 writer.write_all(µsecs.to_be_bytes())?;
212 }
213 Value::Json(value) => {
214 let bytes =
215 serde_json::to_vec(value).context("Failed to serialize json into bytes")?;
216 if column.column_type == Type::JSONB || column.column_type == Type::JSONB_ARRAY {
217 let jsonb_version = 1_u8;
218 let len = (bytes.len() + size_of_val(&jsonb_version)) as i32;
219 writer.write_all(&(len).to_be_bytes())?;
220 writer.write_all(&(jsonb_version).to_be_bytes())?;
221 } else {
222 writer.write_all(&(bytes.len() as i32).to_be_bytes())?;
223 }
224 writer.write_all(&bytes)?;
225 }
226 &Value::Uuid(val) => {
227 let bytes = val.as_bytes();
228 writer.write_all(&(bytes.len() as i32).to_be_bytes())?;
229 writer.write_all(bytes)?;
230 }
231 };
232 return Ok(());
233 }
234}