milvus/
schema.rs

1// Licensed to the LF AI & Data foundation under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use crate::error;
18use crate::error::Result;
19use crate::proto::schema::FieldState;
20use prost::alloc::vec::Vec;
21use prost::encoding::bool;
22use thiserror::Error as ThisError;
23
24use crate::proto::{
25    common::KeyValuePair,
26    schema::{self, DataType},
27};
28
29pub use crate::proto::schema::FieldData;
30
/// Trait with no required items at present.
///
/// Everything inside the body is commented-out code (name/description/field
/// accessors, column iterators and a per-field `validate`). It is retained
/// as-is; NOTE(review): it looks like a design sketch for a richer trait —
/// confirm whether it should be revived or removed.
pub trait Schema {
    // fn name(&self) -> &str;
    // fn description(&self) -> &str;
    // fn fields(&self) -> &Vec<FieldSchema>;

    // fn schema(&self) -> CollectionSchema {
    //     CollectionSchema {
    //         name: self.name(),
    //         description: self.description(),
    //         fields: self.fields().to_owned(),
    //     }
    // }

    // type ColumnIntoIter<'a>: Iterator<Item = (&'a FieldSchema, Value<'a>)>;
    // type ColumnIter<'a>: Iterator<Item = (&'static FieldSchema<'static>, Value<'a>)>;

    // fn iter(&self) -> Self::ColumnIntoIter; // Self::ColumnIter<'_>
    // fn into_iter(self) -> Self::ColumnIntoIter;

    // fn validate(&self) -> std::result::Result<(), Error> {
    //     for (schm, val) in self.iter() {
    //         let dtype = val.data_type();

    //         if dtype != schm.dtype
    //             && !(dtype == DataType::String && schm.dtype == DataType::VarChar)
    //         {
    //             return Err(Error::FieldWrongType(
    //                 schm.name.to_string(),
    //                 schm.dtype,
    //                 val.data_type(),
    //             ));
    //         }

    //         match schm.dtype {
    //             DataType::VarChar => match &val {
    //                 Value::String(d) if d.len() > schm.max_length as _ => {
    //                     return Err(Error::DimensionMismatch(
    //                         schm.name.to_string(),
    //                         schm.max_length as _,
    //                         d.len() as _,
    //                     ));
    //                 }
    //                 _ => unreachable!(),
    //             },
    //             DataType::BinaryVector => match &val {
    //                 Value::Binary(d) => {
    //                     return Err(Error::DimensionMismatch(
    //                         schm.name.to_string(),
    //                         schm.dim as _,
    //                         d.len() as _,
    //                     ));
    //                 }
    //                 _ => unreachable!(),
    //             },
    //             DataType::FloatVector => match &val {
    //                 Value::FloatArray(d) => {
    //                     return Err(Error::DimensionMismatch(
    //                         schm.name.to_string(),
    //                         schm.dim as _,
    //                         d.len() as _,
    //                     ));
    //                 }
    //                 _ => unreachable!(),
    //             },
    //             _ => (),
    //         }
    //     }

    //     Ok(())
    // }
}
102
103pub trait FromDataFields: Sized {
104    fn from_data_fields(fileds: Vec<FieldData>) -> Option<Self>;
105}
106
107// pub trait Column<'a>: IntoFieldData + FromDataFields {
108//     type Entity: Schema;
109//     type IterRows: Iterator<Item = Self::Entity> + 'a;
110//     type IterColumns: Iterator<Item = FieldColumn<'static>> + 'a;
111
112//     fn index(&self, idx: usize) -> Option<Self::Entity>;
113//     fn with_capacity(cap: usize) -> Self;
114//     fn add(&mut self, entity: Self::Entity);
115//     fn len(&self) -> usize;
116//     fn iter_columns(&'a self) -> Self::IterColumns;
117
118//     fn iter_rows(&self) -> Box<dyn Iterator<Item = Self::Entity> + '_> {
119//         Box::new((0..self.len()).filter_map(|idx| self.index(idx)))
120//     }
121
122//     fn is_empty(&self) -> bool {
123//         self.len() == 0
124//     }
125
126//     fn columns() -> &'static [FieldSchema<'static>] {
127//         Self::Entity::SCHEMA
128//     }
129// }
130
131//     Bool = 1,
132//     Int8 = 2,
133//     Int16 = 3,
134//     Int32 = 4,
135//     Int64 = 5,
136//     Float = 10,
137//     Double = 11,
138//     String = 20,
139//     /// variable-length strings with a specified maximum length
140//     VarChar = 21,
141//     BinaryVector = 100,
142//     FloatVector = 101,
143
/// Conversion of a value into a list of [`FieldData`] values.
pub trait IntoFieldData {
    /// Consumes `self` and returns the resulting [`FieldData`] list.
    fn into_data_fields(self) -> Vec<FieldData>;
}
147
/// Description of a single field of a collection, convertible to and from
/// the protobuf `schema::FieldSchema`.
#[derive(Debug, Clone)]
pub struct FieldSchema {
    /// Field name.
    pub name: String,
    /// Free-form description of the field.
    pub description: String,
    /// Data type of the field.
    pub dtype: DataType,
    /// Whether this field is the primary key.
    pub is_primary: bool,
    /// Whether ids are generated automatically for this field.
    pub auto_id: bool,
    /// The constructors set this to 1 for scalar fields, `dim / 8` for binary
    /// vectors and `dim` for float vectors; presumably the number of stored
    /// elements per row — TODO confirm against the column code.
    pub chunk_size: usize,
    pub dim: i64,        // only for BinaryVector and FloatVector
    pub max_length: i32, // only for VarChar
}
159
160impl FieldSchema {
161    pub const fn const_default() -> Self {
162        Self {
163            name: String::new(),
164            description: String::new(),
165            dtype: DataType::None,
166            is_primary: false,
167            auto_id: false,
168            chunk_size: 0,
169            dim: 0,
170            max_length: 0,
171        }
172    }
173}
174
175impl Default for FieldSchema {
176    fn default() -> Self {
177        Self::const_default()
178    }
179}
180
181impl From<schema::FieldSchema> for FieldSchema {
182    fn from(fld: schema::FieldSchema) -> Self {
183        let dim: i64 = fld
184            .type_params
185            .iter()
186            .find(|k| &k.key == "dim")
187            .and_then(|x| x.value.parse().ok())
188            .unwrap_or(1);
189
190        let dtype = DataType::from_i32(fld.data_type).unwrap();
191
192        FieldSchema {
193            name: fld.name,
194            description: fld.description,
195            dtype,
196            is_primary: fld.is_primary_key,
197            auto_id: fld.auto_id,
198            max_length: 0,
199            chunk_size: (dim
200                * match dtype {
201                    DataType::BinaryVector => dim / 8,
202                    _ => dim,
203                }) as _,
204            dim,
205        }
206    }
207}
208
209impl FieldSchema {
210    pub fn new_bool(name: &str, description: &str) -> Self {
211        Self {
212            name: name.to_owned(),
213            description: description.to_owned(),
214            dtype: DataType::Bool,
215            is_primary: false,
216            auto_id: false,
217            chunk_size: 1,
218            dim: 1,
219            max_length: 0,
220        }
221    }
222
223    pub fn new_int8(name: &str, description: &str) -> Self {
224        Self {
225            name: name.to_owned(),
226            description: description.to_owned(),
227            dtype: DataType::Int8,
228            is_primary: false,
229            auto_id: false,
230            chunk_size: 1,
231            dim: 1,
232            max_length: 0,
233        }
234    }
235
236    pub fn new_int16(name: &str, description: &str) -> Self {
237        Self {
238            name: name.to_owned(),
239            description: description.to_owned(),
240            dtype: DataType::Int16,
241            is_primary: false,
242            auto_id: false,
243            chunk_size: 1,
244            dim: 1,
245            max_length: 0,
246        }
247    }
248
249    pub fn new_int32(name: &str, description: &str) -> Self {
250        Self {
251            name: name.to_owned(),
252            description: description.to_owned(),
253            dtype: DataType::Int32,
254            is_primary: false,
255            auto_id: false,
256            chunk_size: 1,
257            dim: 1,
258            max_length: 0,
259        }
260    }
261
262    pub fn new_int64(name: &str, description: &str) -> Self {
263        Self {
264            name: name.to_owned(),
265            description: description.to_owned(),
266            dtype: DataType::Int64,
267            is_primary: false,
268            auto_id: false,
269            chunk_size: 1,
270            dim: 1,
271            max_length: 0,
272        }
273    }
274
275    pub fn new_primary_int64(name: &str, description: &str, auto_id: bool) -> Self {
276        Self {
277            name: name.to_owned(),
278            description: description.to_owned(),
279            dtype: DataType::Int64,
280            is_primary: true,
281            auto_id,
282            chunk_size: 1,
283            dim: 1,
284            max_length: 0,
285        }
286    }
287
288    pub fn new_primary_varchar(
289        name: &str,
290        description: &str,
291        auto_id: bool,
292        max_length: i32,
293    ) -> Self {
294        Self {
295            name: name.to_owned(),
296            description: description.to_owned(),
297            dtype: DataType::VarChar,
298            is_primary: true,
299            auto_id,
300            max_length,
301            chunk_size: 1,
302            dim: 1,
303        }
304    }
305
306    pub fn new_float(name: &str, description: &str) -> Self {
307        Self {
308            name: name.to_owned(),
309            description: description.to_owned(),
310            dtype: DataType::Float,
311            is_primary: false,
312            auto_id: false,
313            chunk_size: 1,
314            dim: 1,
315            max_length: 0,
316        }
317    }
318
319    pub fn new_double(name: &str, description: &str) -> Self {
320        Self {
321            name: name.to_owned(),
322            description: description.to_owned(),
323            dtype: DataType::Double,
324            is_primary: false,
325            auto_id: false,
326            chunk_size: 1,
327            dim: 1,
328            max_length: 0,
329        }
330    }
331
332    pub fn new_string(name: &str, description: &str) -> Self {
333        Self {
334            name: name.to_owned(),
335            description: description.to_owned(),
336            dtype: DataType::String,
337            is_primary: false,
338            auto_id: false,
339            chunk_size: 1,
340            dim: 1,
341            max_length: 0,
342        }
343    }
344
345    pub fn new_varchar(name: &str, description: &str, max_length: i32) -> Self {
346        if max_length <= 0 {
347            panic!("max_length should be positive");
348        }
349
350        Self {
351            name: name.to_owned(),
352            description: description.to_owned(),
353            dtype: DataType::VarChar,
354            max_length,
355            is_primary: false,
356            auto_id: false,
357            chunk_size: 1,
358            dim: 1,
359        }
360    }
361
362    pub fn new_binary_vector(name: &str, description: &str, dim: i64) -> Self {
363        if dim <= 0 {
364            panic!("dim should be positive");
365        }
366
367        Self {
368            name: name.to_owned(),
369            description: description.to_owned(),
370            dtype: DataType::BinaryVector,
371            chunk_size: dim as usize / 8,
372            dim,
373            is_primary: false,
374            auto_id: false,
375            max_length: 0,
376        }
377    }
378
379    pub fn new_float_vector(name: &str, description: &str, dim: i64) -> Self {
380        if dim <= 0 {
381            panic!("dim should be positive");
382        }
383
384        Self {
385            name: name.to_owned(),
386            description: description.to_owned(),
387            dtype: DataType::FloatVector,
388            chunk_size: dim as usize,
389            dim,
390            is_primary: false,
391            auto_id: false,
392            max_length: 0,
393        }
394    }
395}
396
397impl From<FieldSchema> for schema::FieldSchema {
398    fn from(fld: FieldSchema) -> schema::FieldSchema {
399        let params = match fld.dtype {
400            DataType::BinaryVector | DataType::FloatVector => vec![KeyValuePair {
401                key: "dim".to_string(),
402                value: fld.dim.to_string(),
403            }],
404            DataType::VarChar => vec![KeyValuePair {
405                key: "max_length".to_string(),
406                value: fld.max_length.to_string(),
407            }],
408            _ => Vec::new(),
409        };
410
411        schema::FieldSchema {
412            field_id: 0,
413            name: fld.name.into(),
414            is_primary_key: fld.is_primary,
415            description: fld.description,
416            data_type: fld.dtype as i32,
417            type_params: params,
418            index_params: Vec::new(),
419            auto_id: fld.auto_id,
420            state: FieldState::FieldCreated as _,
421        }
422    }
423}
424
/// Schema of a collection: its name, description and list of fields.
#[derive(Debug, Clone)]
pub struct CollectionSchema {
    /// Collection name.
    pub(crate) name: String,
    /// Free-form description of the collection.
    pub(crate) description: String,
    /// Field schemas of the collection.
    pub(crate) fields: Vec<FieldSchema>,
}
431
432impl CollectionSchema {
433    #[inline]
434    pub fn auto_id(&self) -> bool {
435        self.fields.iter().any(|x| x.auto_id)
436    }
437
438    pub fn primary_column(&self) -> Option<&FieldSchema> {
439        self.fields.iter().find(|s| s.is_primary)
440    }
441
442    pub fn validate(&self) -> Result<()> {
443        self.primary_column().ok_or_else(|| Error::NoPrimaryKey)?;
444        // TODO addidtional schema checks need to be added here
445        Ok(())
446    }
447
448    pub fn get_field<S>(&self, name: S) -> Option<&FieldSchema>
449    where
450        S: AsRef<str>,
451    {
452        let name = name.as_ref();
453        self.fields.iter().find(|f| f.name == name)
454    }
455
456    pub fn is_valid_vector_field(&self, field_name: &str) -> Result<()> {
457        for f in &self.fields {
458            if f.name == field_name {
459                if f.dtype == DataType::BinaryVector || f.dtype == DataType::FloatVector {
460                    return Ok(());
461                } else {
462                    return Err(error::Error::from(Error::NotVectorField(
463                        field_name.to_owned(),
464                    )));
465                }
466            }
467        }
468        return Err(error::Error::from(Error::NoSuchKey(field_name.to_owned())));
469    }
470}
471
472impl From<CollectionSchema> for schema::CollectionSchema {
473    fn from(col: CollectionSchema) -> Self {
474        schema::CollectionSchema {
475            name: col.name.to_string(),
476            auto_id: col.auto_id(),
477            description: col.description,
478            fields: col.fields.into_iter().map(Into::into).collect(),
479        }
480    }
481}
482
483impl From<schema::CollectionSchema> for CollectionSchema {
484    fn from(v: schema::CollectionSchema) -> Self {
485        CollectionSchema {
486            fields: v.fields.into_iter().map(Into::into).collect(),
487            name: v.name,
488            description: v.description,
489        }
490    }
491}
492
/// Builder that accumulates fields and produces a [`CollectionSchema`].
#[derive(Debug, Clone)]
pub struct CollectionSchemaBuilder {
    /// Name of the collection being built.
    name: String,
    /// Description of the collection being built.
    description: String,
    /// Fields added so far, in insertion order.
    inner: Vec<FieldSchema>,
}
499
500impl CollectionSchemaBuilder {
501    pub fn new(name: &str, description: &str) -> Self {
502        Self {
503            name: name.to_owned(),
504            description: description.to_owned(),
505            inner: Vec::new(),
506        }
507    }
508
509    pub fn add_field(&mut self, schema: FieldSchema) -> &mut Self {
510        self.inner.push(schema);
511        self
512    }
513
514    pub fn set_primary_key<S>(&mut self, name: S) -> Result<&mut Self>
515    where
516        S: AsRef<str>,
517    {
518        let n = name.as_ref();
519        for f in self.inner.iter_mut() {
520            if f.is_primary {
521                return Err(error::Error::from(Error::DuplicatePrimaryKey(
522                    n.to_string(),
523                    f.name.to_string(),
524                )));
525            }
526        }
527
528        for f in self.inner.iter_mut() {
529            if n == f.name {
530                if f.dtype == DataType::Int64 || f.dtype == DataType::VarChar {
531                    f.is_primary = true;
532                    return Ok(self);
533                } else {
534                    return Err(error::Error::from(Error::UnsupportedPrimaryKey(
535                        f.dtype.to_owned(),
536                    )));
537                }
538            }
539        }
540
541        Err(error::Error::from(Error::NoSuchKey(n.to_string())))
542    }
543
544    pub fn enable_auto_id(&mut self) -> Result<&mut Self> {
545        for f in self.inner.iter_mut() {
546            if f.is_primary {
547                if f.dtype == DataType::Int64 {
548                    f.auto_id = true;
549                    return Ok(self);
550                } else {
551                    return Err(error::Error::from(Error::UnsupportedAutoId(
552                        f.dtype.to_owned(),
553                    )));
554                }
555            }
556        }
557
558        Err(error::Error::from(Error::NoPrimaryKey))
559    }
560
561    pub fn build(&mut self) -> Result<CollectionSchema> {
562        let mut has_primary = false;
563
564        for f in self.inner.iter() {
565            if f.is_primary {
566                has_primary = true;
567                break;
568            }
569        }
570
571        if !has_primary {
572            return Err(error::Error::from(Error::NoPrimaryKey));
573        }
574
575        let this = std::mem::replace(self, CollectionSchemaBuilder::new("".into(), ""));
576
577        Ok(CollectionSchema {
578            fields: this.inner.into(),
579            name: this.name,
580            description: this.description,
581        })
582    }
583}
584
585#[derive(Debug, ThisError)]
586pub enum Error {
587    #[error("try to set primary key {0:?}, but {1:?} is also key")]
588    DuplicatePrimaryKey(String, String),
589
590    #[error("can not find any primary key")]
591    NoPrimaryKey,
592
593    #[error("primary key must be int64 or varchar, unsupported type {0:?}")]
594    UnsupportedPrimaryKey(DataType),
595
596    #[error("auto id must be int64, unsupported type {0:?}")]
597    UnsupportedAutoId(DataType),
598
599    #[error("dimension mismatch for {0:?}, expected dim {1:?}, got {2:?}")]
600    DimensionMismatch(String, i32, i32),
601
602    #[error("wrong field data type, field {0} expected to be{1:?}, but got {2:?}")]
603    FieldWrongType(String, DataType, DataType),
604
605    #[error("field does not exists in schema: {0:?}")]
606    FieldDoesNotExists(String),
607
608    #[error("can not find such key {0:?}")]
609    NoSuchKey(String),
610
611    #[error("field {0:?} must be a vector field")]
612    NotVectorField(String),
613}