ydb_unofficial/sqlx/
entities.rs1use std::sync::Arc;
2
3use sqlx_core::HashMap;
4use sqlx_core::type_info::TypeInfo;
5use sqlx_core::value::ValueRef;
6use sqlx_core::value::Value as XValue;
7use sqlx_core::row::Row;
8use sqlx_core::column::Column as XColumn;
9use sqlx_core::column::ColumnIndex;
10use ydb_grpc_bindings::generated::ydb;
11use ydb::r#type::PrimitiveTypeId;
12use ydb::value::Value;
13use ydb::r#type::Type as YType;
14use ydb::table_stats::QueryStats;
15use ydb::Column;
16use ydb::ResultSet;
17use ydb::table::ExecuteQueryResult;
18
19use super::database::Ydb;
20
21#[derive(Debug, Clone)]
22pub struct YdbValue {
23 value: Value,
24 info: YdbTypeInfo,
25}
26
27impl YdbValue {
28 pub fn value(&self) -> &Value {
29 &self.value
30 }
31}
32
33impl XValue for YdbValue {
34 type Database = Ydb;
35 fn as_ref(&self) -> YdbValueRef { &self }
36 fn type_info(&self) -> std::borrow::Cow<'_, YdbTypeInfo> { std::borrow::Cow::Borrowed(&self.info) }
37 fn is_null(&self) -> bool { matches!(self.value, Value::NullFlagValue(_)) }
38}
39
40pub type YdbValueRef<'a> = &'a YdbValue;
41
42impl<'a> ValueRef<'a> for YdbValueRef<'a> {
43 type Database = Ydb;
44 fn to_owned(&self) -> YdbValue { Clone::clone(self) }
45 fn type_info(&self) -> std::borrow::Cow<'_, YdbTypeInfo> { std::borrow::Cow::Borrowed(&self.info) }
46 fn is_null(&self) -> bool { XValue::is_null(*self) }
47}
48
49#[derive(Debug, Clone, PartialEq)]
50pub enum YdbTypeInfo {
51 Primitive(PrimitiveTypeId),
52 Null,
53 Unknown,
54}
55
56impl Default for YdbTypeInfo {
57 fn default() -> Self {
58 Self::Unknown
59 }
60}
61impl From<&ydb::OptionalType> for YdbTypeInfo {
62 fn from(value: &ydb::OptionalType) -> Self {
63 if let Some(t) = &value.item {
64 if let Some(t) = &t.r#type {
65 return Self::from(t)
66 }
67 }
68 Self::Unknown
69 }
70}
71impl From<&YType> for YdbTypeInfo {
72 fn from(value: &YType) -> Self {
73 use YType::*;
74 match value {
75 TypeId(id) => Self::Primitive(PrimitiveTypeId::from_i32(*id).unwrap_or_default()),
76 OptionalType(t) => Self::from(t.as_ref()),
77 DecimalType(_) => todo!(),
78 NullType(_) => Self::Null,
79 _ => Self::Unknown
80 }
81 }
82}
83
84impl std::fmt::Display for YdbTypeInfo {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 f.pad(self.name())
87 }
88}
89
90impl TypeInfo for YdbTypeInfo {
91 fn is_null(&self) -> bool {
92 matches!(&self, Self::Null)
93 }
94 fn name(&self) -> &str {
95 match self {
96 YdbTypeInfo::Primitive(t) => t.as_str_name(),
97 YdbTypeInfo::Null => "NULL",
98 YdbTypeInfo::Unknown => "UNKNOWN",
99 }
100 }
101}
102
103#[test]
104fn sometest() {
105 }
107
108
109
110#[derive(Debug, Clone, Default)]
111pub struct YdbResultSet {
112 columns: Arc<Columns>,
113 rows: Vec<YdbRow>
114}
115
116#[derive(Debug, Clone, Default)]
117pub struct YdbQueryResult {
118 pub query_stats: Option<QueryStats>,
120 pub result_sets: Vec<YdbResultSet>,
121}
122
123impl YdbResultSet {
124 pub fn columns(&self) -> &[YdbColumn] {
125 self.columns.columns.as_slice()
126 }
127 pub fn rows(&self) -> &[YdbRow] {
128 &self.rows
129 }
130 pub fn to_rows(self) -> Vec<YdbRow> {
131 self.rows
132 }
133}
134
135#[derive(Debug, Default)]
136struct Columns {
137 map: sqlx_core::HashMap<String, usize>,
138 columns: Vec<YdbColumn>,
139}
140
141impl Columns {
142 fn new(columns: Vec<YdbColumn>) -> Arc<Self> {
143 let map = columns.iter().fold(HashMap::new(), |mut map, col|{
144 map.insert(col.name.to_owned(), col.ordinal);
145 map
146 });
147 Arc::new(Self {map, columns})
148 }
149 fn as_slice(&self) -> &[YdbColumn] {
150 &self.columns
151 }
152 fn get_index(&self, name: &str) -> Option<usize> {
153 self.map.get(name).copied()
154 }
155 fn get(&self, idx: usize) -> Option<&YdbColumn> {
156 self.columns.get(idx)
157 }
158 fn len(&self) -> usize {
159 self.columns.len()
160 }
161}
162
163impl From<ExecuteQueryResult> for YdbQueryResult {
164 fn from(result: ExecuteQueryResult) -> Self {
165 let ExecuteQueryResult {query_stats, result_sets, .. } = result;
166 let result_sets = result_sets.into_iter().map(Into::into).collect();
167 Self { query_stats, result_sets }
168 }
169}
170
171impl From<ResultSet> for YdbResultSet {
172 fn from(rs: ResultSet) -> Self {
173 let ResultSet {columns, rows, ..} = rs;
174 let columns = columns.into_iter().enumerate().map(YdbColumn::from).collect();
175 let columns = Columns::new(columns);
176 let rows = rows.into_iter().map(|row|YdbRow::create(columns.clone(), row)).collect();
177 Self { columns, rows }
178 }
179}
180
181impl Extend<Self> for YdbQueryResult {
182 fn extend<T: IntoIterator<Item = Self>>(&mut self, iter: T) {
183 for i in iter {
184 self.result_sets.extend(i.result_sets);
185 if let Some(qs) = &mut self.query_stats {
186 if let Some(e) = i.query_stats {
187 qs.process_cpu_time_us += e.process_cpu_time_us;
188 qs.total_cpu_time_us += e.total_cpu_time_us;
189 qs.total_duration_us += e.total_duration_us;
190 }
192 } else {
193 self.query_stats = i.query_stats;
194 }
195 }
196 }
197}
198
199#[derive(Debug, Clone, Default)]
200pub struct YdbRow {
201 columns: Arc<Columns>,
202 row: Vec<YdbValue>,
203}
204
205impl YdbRow {
206 fn create(columns: Arc<Columns>, row: ydb::Value) -> Self {
207 let items = row.items;
208 if items.len() != columns.len() {
209 panic!("row len != columns len")
210 }
211 let row = items.into_iter().enumerate().map(|(i,value)|{
212 let info = columns.get(i).unwrap().type_info.clone();
213 let value = value.value.unwrap();
214 YdbValue { value, info }
215 }).collect();
216 Self { columns, row }
217 }
218}
219
220impl Row for YdbRow {
221 type Database = Ydb;
222
223 fn columns(&self) -> &[YdbColumn] {
224 &self.columns.as_slice()
225 }
226
227 fn try_get_raw<I: ColumnIndex<Self>>(&self, index: I) -> Result<YdbValueRef, sqlx_core::Error> {
228 let index = index.index(self)?;
229 self.row.get(index).ok_or_else(|| sqlx_core::Error::ColumnIndexOutOfBounds { index, len: self.row.len() } )
230 }
231}
232
233impl ColumnIndex<YdbRow> for &str {
234 fn index(&self, row: &YdbRow) -> Result<usize, sqlx_core::Error> {
235 row.columns.get_index(self)
236 .ok_or_else(|| sqlx_core::Error::ColumnNotFound(self.to_string()) )
237 }
238}
239
240
241#[derive(Debug, Clone)]
242pub struct YdbColumn {
243 pub(crate) ordinal: usize,
244 pub(crate) name: String,
245 pub(crate) type_info: YdbTypeInfo,
246}
247
248impl From<(usize, Column)> for YdbColumn {
249 fn from((ordinal, c): (usize, Column)) -> Self {
250 let Column { name, r#type } = c;
251 let type_info = r#type.map(|t|t.r#type).flatten().map(|t|YdbTypeInfo::from(&t)).unwrap_or_default();
252 Self {ordinal, name, type_info}
253 }
254}
255
256impl XColumn for YdbColumn {
257 type Database = Ydb;
258 fn ordinal(&self) -> usize { self.ordinal }
259 fn name(&self) -> &str { &self.name }
260 fn type_info(&self) -> &YdbTypeInfo { &self.type_info }
261}
262
263sqlx_core::impl_column_index_for_row!{YdbRow}
264
265#[test]
266fn from_select_bots() {
267 let bytes = include_bytes!("../../test/select_bots.protobytes");
268 let result: ExecuteQueryResult = prost::Message::decode(bytes.as_slice()).unwrap();
269 println!("val: {result:?}");
270 for rs in &result.result_sets {
271 println!("\n\n new result set ===========");
272 println!("======columns: ");
273 for col in &rs.columns {
274
275 println!("{col:?}");
276 }
277 println!("\n======rows:");
278 for r in &rs.rows {
279 let r: Vec<_> = r.items.iter().map(|v|&v.value).collect();
280 println!("{r:?}");
281 }
282 }
283 let qr: YdbQueryResult = result.into();
284 println!("\nquery result: \n{qr:?}");
285}