ydb_unofficial/sqlx/
entities.rs

1use std::sync::Arc;
2
3use sqlx_core::HashMap;
4use sqlx_core::type_info::TypeInfo;
5use sqlx_core::value::ValueRef;
6use sqlx_core::value::Value as XValue;
7use sqlx_core::row::Row;
8use sqlx_core::column::Column as XColumn;
9use sqlx_core::column::ColumnIndex;
10use ydb_grpc_bindings::generated::ydb;
11use ydb::r#type::PrimitiveTypeId;
12use ydb::value::Value;
13use ydb::r#type::Type as YType;
14use ydb::table_stats::QueryStats;
15use ydb::Column;
16use ydb::ResultSet;
17use ydb::table::ExecuteQueryResult;
18
19use super::database::Ydb;
20
21#[derive(Debug, Clone)]
22pub struct YdbValue {
23    value: Value,
24    info: YdbTypeInfo,
25}
26
27impl YdbValue {
28    pub fn value(&self) -> &Value {
29        &self.value
30    }
31}
32
33impl XValue for YdbValue {
34    type Database = Ydb;
35    fn as_ref(&self) -> YdbValueRef { &self }
36    fn type_info(&self) -> std::borrow::Cow<'_, YdbTypeInfo> { std::borrow::Cow::Borrowed(&self.info) }
37    fn is_null(&self) -> bool { matches!(self.value, Value::NullFlagValue(_)) }
38}
39
40pub type YdbValueRef<'a> = &'a YdbValue;
41
42impl<'a> ValueRef<'a> for YdbValueRef<'a> {
43    type Database = Ydb;
44    fn to_owned(&self) -> YdbValue { Clone::clone(self) }
45    fn type_info(&self) -> std::borrow::Cow<'_, YdbTypeInfo> { std::borrow::Cow::Borrowed(&self.info) }
46    fn is_null(&self) -> bool { XValue::is_null(*self) }
47}
48
49#[derive(Debug, Clone, PartialEq)]
50pub enum YdbTypeInfo {
51    Primitive(PrimitiveTypeId),
52    Null,
53    Unknown,
54}
55
56impl Default for YdbTypeInfo {
57    fn default() -> Self {
58        Self::Unknown
59    }
60}
61impl From<&ydb::OptionalType> for YdbTypeInfo {
62    fn from(value: &ydb::OptionalType) -> Self {
63        if let Some(t) = &value.item {
64            if let Some(t) = &t.r#type {
65                    return Self::from(t)
66            }
67        }
68        Self::Unknown
69    }
70}
71impl From<&YType> for YdbTypeInfo {
72    fn from(value: &YType) -> Self {
73        use YType::*;
74        match value {
75            TypeId(id) => Self::Primitive(PrimitiveTypeId::from_i32(*id).unwrap_or_default()),
76            OptionalType(t) => Self::from(t.as_ref()),
77            DecimalType(_) => todo!(),
78            NullType(_) => Self::Null,
79            _ => Self::Unknown
80        }
81    }
82}
83
84impl std::fmt::Display for YdbTypeInfo {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        f.pad(self.name())
87    }
88}
89
90impl TypeInfo for YdbTypeInfo {
91    fn is_null(&self) -> bool {
92        matches!(&self, Self::Null)
93    }
94    fn name(&self) -> &str {
95        match self {
96            YdbTypeInfo::Primitive(t) => t.as_str_name(),
97            YdbTypeInfo::Null => "NULL",
98            YdbTypeInfo::Unknown => "UNKNOWN",
99        }
100    }
101}
102
/// Placeholder test: the body contains no statements, so it always passes.
#[test]
fn sometest() {
    // Disabled scratch line kept for reference:
    // let q: sqlx_core::query::Query = todo!();
}
107
108
109
110#[derive(Debug, Clone, Default)]
111pub struct YdbResultSet {
112    columns: Arc<Columns>,
113    rows: Vec<YdbRow>
114}
115
116#[derive(Debug, Clone, Default)]
117pub struct YdbQueryResult {
118    //TODO: добавить возврат updates, reads, deletes, affected_rows
119    pub query_stats: Option<QueryStats>,
120    pub result_sets: Vec<YdbResultSet>,
121}
122
123impl YdbResultSet {
124    pub fn columns(&self) -> &[YdbColumn] {
125        self.columns.columns.as_slice()
126    }
127    pub fn rows(&self) -> &[YdbRow] {
128        &self.rows
129    }
130    pub fn to_rows(self) -> Vec<YdbRow> {
131        self.rows
132    }
133}
134
135#[derive(Debug, Default)]
136struct Columns {
137    map: sqlx_core::HashMap<String, usize>,
138    columns: Vec<YdbColumn>,
139}
140
141impl Columns {
142    fn new(columns: Vec<YdbColumn>) -> Arc<Self> {
143        let map = columns.iter().fold(HashMap::new(), |mut map, col|{
144            map.insert(col.name.to_owned(), col.ordinal);
145            map
146        });
147        Arc::new(Self {map, columns})
148    }
149    fn as_slice(&self) -> &[YdbColumn] {
150        &self.columns
151    }
152    fn get_index(&self, name: &str) -> Option<usize> {
153        self.map.get(name).copied()
154    }
155    fn get(&self, idx: usize) -> Option<&YdbColumn> {
156        self.columns.get(idx)
157    }
158    fn len(&self) -> usize {
159        self.columns.len()
160    }
161}
162
163impl From<ExecuteQueryResult> for YdbQueryResult {
164    fn from(result: ExecuteQueryResult) -> Self {
165        let ExecuteQueryResult {query_stats, result_sets, .. } = result;
166        let result_sets = result_sets.into_iter().map(Into::into).collect();
167        Self { query_stats, result_sets }
168    }
169}
170
171impl From<ResultSet> for YdbResultSet {
172    fn from(rs: ResultSet) -> Self {
173        let ResultSet {columns, rows, ..} = rs;
174        let columns = columns.into_iter().enumerate().map(YdbColumn::from).collect();
175        let columns = Columns::new(columns);
176        let rows = rows.into_iter().map(|row|YdbRow::create(columns.clone(), row)).collect();
177        Self { columns, rows }
178    }
179}
180
181impl Extend<Self> for YdbQueryResult {
182    fn extend<T: IntoIterator<Item = Self>>(&mut self, iter: T) {
183        for i in iter {
184            self.result_sets.extend(i.result_sets);
185            if let Some(qs) = &mut self.query_stats {
186                if let Some(e) = i.query_stats {
187                    qs.process_cpu_time_us += e.process_cpu_time_us;
188                    qs.total_cpu_time_us += e.total_cpu_time_us;
189                    qs.total_duration_us += e.total_duration_us;
190                    //TODO: доработать extend QueryPhasesStats
191                }
192            } else {
193                self.query_stats = i.query_stats;
194            }
195        }
196    }
197}
198
199#[derive(Debug, Clone, Default)]
200pub struct YdbRow {
201    columns: Arc<Columns>,
202    row: Vec<YdbValue>,
203}
204
205impl YdbRow {
206    fn create(columns: Arc<Columns>, row: ydb::Value) -> Self {
207        let items = row.items;
208        if items.len() != columns.len() {
209            panic!("row len != columns len")
210        }
211        let row = items.into_iter().enumerate().map(|(i,value)|{
212            let info = columns.get(i).unwrap().type_info.clone();
213            let value = value.value.unwrap();
214            YdbValue { value, info }
215        }).collect();
216        Self { columns, row }
217    }
218}
219
220impl Row for YdbRow {
221    type Database = Ydb;
222
223    fn columns(&self) -> &[YdbColumn] {
224        &self.columns.as_slice()
225    }
226
227    fn try_get_raw<I: ColumnIndex<Self>>(&self, index: I) -> Result<YdbValueRef, sqlx_core::Error> {
228        let index = index.index(self)?;
229        self.row.get(index).ok_or_else(|| sqlx_core::Error::ColumnIndexOutOfBounds { index, len: self.row.len() } )
230    }
231}
232
233impl ColumnIndex<YdbRow> for &str {
234    fn index(&self, row: &YdbRow) -> Result<usize, sqlx_core::Error> {
235        row.columns.get_index(self)
236        .ok_or_else(|| sqlx_core::Error::ColumnNotFound(self.to_string()) )
237    }
238}
239
240
241#[derive(Debug, Clone)]
242pub struct YdbColumn {
243    pub(crate) ordinal: usize,
244    pub(crate) name: String,
245    pub(crate) type_info: YdbTypeInfo,
246}
247
248impl From<(usize, Column)> for YdbColumn {
249    fn from((ordinal, c): (usize, Column)) -> Self {
250        let Column { name, r#type } = c;
251        let type_info = r#type.map(|t|t.r#type).flatten().map(|t|YdbTypeInfo::from(&t)).unwrap_or_default();
252        Self {ordinal, name, type_info}
253    }
254}
255
256impl XColumn for YdbColumn {
257    type Database = Ydb;
258    fn ordinal(&self) -> usize { self.ordinal }
259    fn name(&self) -> &str { &self.name }
260    fn type_info(&self) -> &YdbTypeInfo { &self.type_info }
261}
262
263sqlx_core::impl_column_index_for_row!{YdbRow}
264
/// Decodes a captured `ExecuteQueryResult`, dumps its columns and rows, and
/// converts it into a `YdbQueryResult`.
///
/// The test makes no explicit assertions; it fails only if decoding or the
/// conversion panics.
#[test]
fn from_select_bots() {
    let raw: &[u8] = include_bytes!("../../test/select_bots.protobytes");
    let result: ExecuteQueryResult = prost::Message::decode(raw).unwrap();
    println!("val: {result:?}");

    for rs in result.result_sets.iter() {
        println!("\n\n new result set ===========");

        println!("======columns: ");
        for col in rs.columns.iter() {
            println!("{col:?}");
        }

        println!("\n======rows:");
        rs.rows
            .iter()
            .map(|row| row.items.iter().map(|v| &v.value).collect::<Vec<_>>())
            .for_each(|r| println!("{r:?}"));
    }

    let qr = YdbQueryResult::from(result);
    println!("\nquery result: \n{qr:?}");
}
283    let qr: YdbQueryResult = result.into();
284    println!("\nquery result: \n{qr:?}");
285}