tarantool/schema/
space.rs

1use crate::error::{Error, TarantoolError, TarantoolErrorCode};
2use crate::index::IteratorType;
3use crate::schema;
4use crate::schema::sequence as schema_seq;
5use crate::session;
6use crate::set_error;
7use crate::space;
8use crate::space::space_id_temporary_min;
9use crate::space::{Metadata, SpaceCreateOptions};
10use crate::space::{Space, SpaceId, SpaceType, SystemSpace};
11use crate::transaction;
12use crate::tuple::Tuple;
13use crate::unwrap_or;
14use crate::util::Value;
15use std::collections::BTreeMap;
16
17/// Create a space.
18/// (for details see [box.schema.space.create()](https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_schema/space_create/)).
19///
20/// - `name` -  name of space, which should conform to the rules for object names.
21/// - `opts` - see SpaceCreateOptions struct.
22///
23/// Returns a new space.
24///
25/// **NOTE:** This function will initiate a transaction if there's isn't an
26/// active one, and if there is the active transaction may be aborted in case
27/// of an error. This shouldn't be a problem if you always consider this
28/// function returning an error to be worthy of a transcation roll back,
29/// which you should.
30pub fn create_space(name: &str, opts: &SpaceCreateOptions) -> Result<Space, Error> {
31    // Check if space already exists.
32    if let Some(space) = Space::find(name) {
33        return if opts.if_not_exists {
34            Ok(space)
35        } else {
36            set_error!(TarantoolErrorCode::SpaceExists, "{}", name);
37            Err(TarantoolError::last().into())
38        };
39    }
40
41    // Resolve ID of user, specified in options, or use ID of current session's user.
42    let user_id = match &opts.user {
43        None => session::uid()?,
44        Some(user) => {
45            let resolved_uid = schema::resolve_user_or_role(user.as_str())?;
46            match resolved_uid {
47                Some(uid) => uid,
48                None => {
49                    set_error!(TarantoolErrorCode::NoSuchUser, "{}", user.as_str());
50                    return Err(TarantoolError::last().into());
51                }
52            }
53        }
54    };
55
56    // Resolve ID of new space or use ID, specified in options.
57    let id = if let Some(opts_id) = opts.id {
58        opts_id
59    } else {
60        generate_space_id(opts.space_type == SpaceType::Temporary)?
61    };
62
63    let mut flags = BTreeMap::new();
64    match opts.space_type {
65        SpaceType::DataTemporary => {
66            flags.insert("temporary".into(), true.into());
67        }
68        SpaceType::Temporary => {
69            flags.insert("type".into(), "temporary".into());
70        }
71        SpaceType::DataLocal => {
72            flags.insert("group_id".into(), 1.into());
73        }
74        SpaceType::Synchronous => {
75            flags.insert("is_sync".into(), true.into());
76        }
77        SpaceType::Normal => {}
78    }
79
80    let format = opts
81        .format
82        .iter()
83        .flat_map(|f| f.iter())
84        .map(|f| {
85            IntoIterator::into_iter([
86                ("name".into(), Value::Str(f.name.as_str().into())),
87                ("type".into(), Value::Str(f.field_type.as_str().into())),
88                ("is_nullable".into(), Value::Bool(f.is_nullable)),
89            ])
90            .collect()
91        })
92        .collect();
93
94    let nested_transaction = transaction::is_in_transaction();
95    if !nested_transaction {
96        transaction::begin()?;
97    }
98
99    let res = (|| -> Result<_, Error> {
100        let sys_space = SystemSpace::Space.as_space();
101        sys_space.insert(&Metadata {
102            id,
103            user_id,
104            name: name.into(),
105            engine: opts.engine,
106            field_count: opts.field_count,
107            flags,
108            format,
109        })?;
110
111        // Update max_id for backwards compatibility.
112        if opts.id.is_none() && opts.space_type != SpaceType::Temporary {
113            let sys_schema = SystemSpace::Schema.as_space();
114            if let Some(t) = sys_schema.get(&["max_id"])? {
115                if let Ok(Some(max_id)) = t.field::<SpaceId>(1) {
116                    if id > max_id {
117                        sys_schema.replace(&("max_id", id))?;
118                    }
119                }
120            }
121        }
122
123        Ok(())
124    })();
125
126    if let Err(e) = res {
127        // If we were already in the transaction before calling this function,
128        // the user can choose to ignore the result and commit the transaction
129        // anyway. This most likely would be a logic error, because we would've
130        // already rolled back any changes made by the caller and box_txn_commit
131        // would silently return ok, but unfortunately there's nothing we can do
132        // about it.
133        transaction::rollback()?;
134        return Err(e);
135    }
136
137    if !nested_transaction {
138        transaction::commit()?;
139    }
140
141    // Safety: this is safe because inserting into _space didn't fail, so the
142    // space has been created.
143    let space = unsafe { Space::from_id_unchecked(id) };
144    Ok(space)
145}
146
147#[deprecated = "use `tarantool::space::Metadata` instead"]
148pub type SpaceMetadata<'a> = Metadata<'a>;
149
150/// Implementation ported from box_generate_space_id.
151/// <https://github.com/tarantool/tarantool/blob/70e423e92fc00df2ffe385f31dae9ea8e1cc1732/src/box/box.cc#L5737>
152pub fn generate_space_id(is_temporary: bool) -> Result<SpaceId, Error> {
153    let sys_space = SystemSpace::Space.as_space();
154    let (id_range_min, id_range_max);
155    if is_temporary {
156        id_range_min = unwrap_or!(space_id_temporary_min(), {
157            set_error!(
158                TarantoolErrorCode::Unsupported,
159                "fully temporary space api is not supported in the current tarantool executable"
160            );
161            return Err(TarantoolError::last().into());
162        });
163        id_range_max = space::SPACE_ID_MAX + 1;
164    } else {
165        id_range_min = space::SYSTEM_ID_MAX + 1;
166        id_range_max = space_id_temporary_min().unwrap_or(space::SPACE_ID_MAX + 1);
167    };
168
169    let mut iter = sys_space.select(IteratorType::LT, &[id_range_max])?;
170    let tuple = iter.next().expect("there's always at least system spaces");
171    let mut max_id: SpaceId = tuple
172        .field(0)
173        .expect("space metadata should decode fine")
174        .expect("space id should always be present");
175
176    let find_next_unused_id = |start: SpaceId| -> Result<SpaceId, Error> {
177        let iter = sys_space.select(IteratorType::GE, &[start])?;
178        let mut next_id = start;
179        for tuple in iter {
180            let id: SpaceId = tuple
181                .field(0)
182                .expect("space metadata should decode fine")
183                .expect("space id should always be present");
184            if id != next_id {
185                // Found a hole in the id range.
186                return Ok(next_id);
187            }
188            next_id += 1;
189        }
190        Ok(next_id)
191    };
192
193    if max_id < id_range_min {
194        max_id = id_range_min;
195    }
196
197    let mut space_id = find_next_unused_id(max_id)?;
198    if space_id >= id_range_max {
199        space_id = find_next_unused_id(id_range_min)?;
200        if space_id >= id_range_max {
201            set_error!(TarantoolErrorCode::CreateSpace, "space id limit is reached");
202            return Err(TarantoolError::last().into());
203        }
204    }
205
206    Ok(space_id)
207}
208
209pub fn space_metadata(space_id: SpaceId) -> Result<Metadata<'static>, Error> {
210    let sys_space = SystemSpace::VSpace.as_space();
211    let tuple = sys_space.get(&[space_id])?.ok_or(Error::MetaNotFound)?;
212    tuple.decode::<Metadata>()
213}
214
215/// Drop a space.
216pub fn drop_space(space_id: SpaceId) -> Result<(), Error> {
217    // Delete automatically generated sequence.
218    let sys_space_sequence: Space = SystemSpace::SpaceSequence.into();
219    if let Some(t) = sys_space_sequence.get(&(space_id,))? {
220        sys_space_sequence.delete(&(space_id,))?;
221        let is_generated = t.field::<bool>(2)?.unwrap();
222        if is_generated {
223            let seq_id = t.field::<u32>(1)?.unwrap();
224            schema_seq::drop_sequence(seq_id)?;
225        }
226    }
227
228    // Remove from _trigger.
229    let sys_trigger: Space = SystemSpace::Trigger.into();
230    let sys_space_idx = sys_trigger.index("space_id").unwrap();
231    for t in sys_space_idx
232        .select(IteratorType::Eq, &(space_id,))?
233        .collect::<Vec<Tuple>>()
234    {
235        let name = t.field::<String>(0)?.unwrap();
236        sys_trigger.delete(&(name,))?;
237    }
238
239    // Remove from _fk_constraint.
240    let sys_fk_constraint: Space = SystemSpace::FkConstraint.into();
241    let sys_space_idx = sys_fk_constraint.index("child_id").unwrap();
242    for t in sys_space_idx
243        .select(IteratorType::Eq, &(space_id,))?
244        .collect::<Vec<Tuple>>()
245    {
246        let name = t.field::<String>(0)?.unwrap();
247        sys_fk_constraint.delete(&(name, space_id))?;
248    }
249
250    // CRemove from _ck_constraint.
251    let sys_ck_constraint: Space = SystemSpace::CkConstraint.into();
252    let sys_space_idx = sys_ck_constraint.index("primary").unwrap();
253    for t in sys_space_idx
254        .select(IteratorType::Eq, &(space_id,))?
255        .collect::<Vec<Tuple>>()
256    {
257        let name = t.field::<String>(2)?.unwrap();
258        sys_ck_constraint.delete(&(space_id, name))?;
259    }
260
261    // Remove from _func_index.
262    let sys_func_index: Space = SystemSpace::FuncIndex.into();
263    let sys_space_idx = sys_func_index.index("primary").unwrap();
264    for t in sys_space_idx
265        .select(IteratorType::Eq, &(space_id,))?
266        .collect::<Vec<Tuple>>()
267    {
268        let index_id = t.field::<u32>(1)?.unwrap();
269        sys_func_index.delete(&(space_id, index_id))?;
270    }
271
272    // Remove from _index.
273    let sys_vindex: Space = SystemSpace::VIndex.into();
274    let sys_index: Space = SystemSpace::Index.into();
275    let keys = sys_vindex
276        .select(IteratorType::Eq, &(space_id,))?
277        .collect::<Vec<Tuple>>();
278    for i in 1..keys.len() + 1 {
279        let t_idx = keys.len() - i;
280        let t = &keys[t_idx];
281        let id = t.field::<u32>(0)?.unwrap();
282        let iid = t.field::<u32>(1)?.unwrap();
283        sys_index.delete(&(id, iid))?;
284    }
285
286    // Revoke priveleges.
287    schema::revoke_object_privileges("space", space_id)?;
288
289    // Remove from _truncate.
290    let sys_truncate: Space = SystemSpace::Truncate.into();
291    sys_truncate.delete(&(space_id,))?;
292
293    // Remove from _space.
294    let sys_space: Space = SystemSpace::Space.into();
295    sys_space.delete(&(space_id,))?;
296
297    Ok(())
298}