tarantool 0.6.4

Tarantool Rust bindings
Documentation
use once_cell::unsync::Lazy;

use std::cell::{Cell, RefCell};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::rc::Rc;

use crate::error::Error;
use crate::fiber::{Latch, LatchGuard};
use crate::index::IteratorType;
use crate::space::{SystemSpace, SYSTEM_ID_MAX};
use crate::tuple::Tuple;

use super::inner::ConnInner;
use super::options::Options;
use super::protocol::{decode_multiple_rows, encode_select};

/// Cached name-to-id mappings for the spaces and indexes of a remote server.
///
/// Instances are shared (via `Rc`) between connections that target the same
/// socket address; see `ConnSchema::acquire`.
pub struct ConnSchema {
    /// Schema version the cached data corresponds to; `None` until the first
    /// successful `update`.
    version: Cell<Option<u32>>,
    /// Set while `update` is running, so that `refresh` knows it has to wait
    /// on `lock` before inspecting the cache.
    is_updating: Cell<bool>,
    /// Space name -> space id.
    space_ids: RefCell<HashMap<String, u32>>,
    /// (space id, index name) -> index id.
    index_ids: RefCell<HashMap<(u32, String), u32>>,
    /// Latch taken by `refresh` around `update`.
    lock: Latch,
}

impl ConnSchema {
    pub fn acquire(addrs: &[SocketAddr]) -> Rc<ConnSchema> {
        let addr = SCHEMA_CACHE.with(|cache| {
            let cache = cache.cache.borrow();
            addrs.iter().find_map(|addr| cache.get(addr).cloned())
        });
        if let Some(addr) = addr {
            return addr;
        }

        let schema = Rc::new(ConnSchema {
            version: Cell::new(None),
            is_updating: Cell::new(false),
            space_ids: Default::default(),
            index_ids: Default::default(),
            lock: Latch::new(),
        });

        SCHEMA_CACHE.with(|cache| {
            let mut cache = cache.cache.borrow_mut();
            for addr in addrs {
                cache.insert(*addr, schema.clone());
            }
        });

        schema
    }

    pub fn refresh(
        &self,
        conn_inner: &Rc<ConnInner>,
        actual_version: Option<u32>,
    ) -> Result<bool, Error> {
        let mut _lock: Option<LatchGuard> = None;

        if self.is_updating.get() {
            _lock = Some(self.lock.lock());
        }

        let result = if self.is_outdated(actual_version) {
            if _lock.is_none() {
                _lock = Some(self.lock.lock());
            }

            self.update(conn_inner)?;
            true
        } else {
            false
        };
        Ok(result)
    }

    pub fn update(&self, conn_inner: &Rc<ConnInner>) -> Result<(), Error> {
        self.is_updating.set(true);
        let (spaces_data, actual_schema_version) = self.fetch_schema_spaces(conn_inner)?;
        for row in spaces_data {
            let (id, _, name) = row.decode::<(u32, u32, String)>()?;
            self.space_ids.borrow_mut().insert(name, id);
        }

        for row in self.fetch_schema_indexes(conn_inner)? {
            let (space_id, index_id, name) = row.decode::<(u32, u32, String)>()?;
            self.index_ids
                .borrow_mut()
                .insert((space_id, name), index_id);
        }

        self.version.set(Some(actual_schema_version));
        self.is_updating.set(false);
        Ok(())
    }

    pub fn lookup_space(&self, name: &str) -> Option<u32> {
        self.space_ids.borrow().get(name).copied()
    }

    pub fn lookup_index(&self, name: &str, space_id: u32) -> Option<u32> {
        self.index_ids
            .borrow()
            .get(&(space_id, name.to_string()))
            .copied()
    }

    fn is_outdated(&self, actual_version: Option<u32>) -> bool {
        match actual_version {
            None => true,
            Some(actual_version) => match self.version.get() {
                None => true,
                Some(cached_version) => actual_version > cached_version,
            },
        }
    }

    fn fetch_schema_spaces(&self, conn_inner: &Rc<ConnInner>) -> Result<(Vec<Tuple>, u32), Error> {
        conn_inner.request(
            |buf, sync| {
                encode_select(
                    buf,
                    sync,
                    SystemSpace::VSpace as u32,
                    0,
                    u32::max_value(),
                    0,
                    IteratorType::GT,
                    &(SYSTEM_ID_MAX,),
                )
            },
            |buf, header| Ok((decode_multiple_rows(buf, None)?, header.schema_version)),
            &Options::default(),
        )
    }

    fn fetch_schema_indexes(&self, conn_inner: &Rc<ConnInner>) -> Result<Vec<Tuple>, Error> {
        let empty_array: [(); 0] = [];
        conn_inner.request(
            |buf, sync| {
                encode_select(
                    buf,
                    sync,
                    SystemSpace::VIndex as u32,
                    0,
                    u32::max_value(),
                    0,
                    IteratorType::All,
                    &empty_array,
                )
            },
            |buf, _| decode_multiple_rows(buf, None),
            &Options::default(),
        )
    }
}

/// Registry of schema caches, keyed by the socket address they belong to.
///
/// The registry holds `RefCell` and `Rc`, neither of which is thread-safe, so
/// it deliberately does not implement `Sync`. It is only ever stored in a
/// `thread_local!` static, which does not require `Sync`.
struct ConnSchemaCache {
    cache: RefCell<HashMap<SocketAddr, Rc<ConnSchema>>>,
}

thread_local! {
    static SCHEMA_CACHE: Lazy<ConnSchemaCache> = Lazy::new(||
        ConnSchemaCache {
            cache: RefCell::new(HashMap::new()),
        }
    )
}