tarantool 11.0.0

Tarantool rust bindings
Documentation
use once_cell::unsync::Lazy;

use std::cell::{Cell, RefCell};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::rc::Rc;

use crate::error::Error;
use crate::fiber::{Latch, LatchGuard};
use crate::index::{self, IteratorType};
use crate::network::protocol;
use crate::space::{self, SystemSpace, SYSTEM_ID_MAX};
use crate::tuple::Tuple;

use super::inner::ConnInner;
use super::options::Options;

pub struct ConnSchema {
    version: Cell<Option<u64>>,
    is_updating: Cell<bool>,
    space_ids: RefCell<HashMap<String, u32>>,
    index_ids: RefCell<HashMap<(u32, String), u32>>,
    lock: Latch,
}

impl ConnSchema {
    pub fn acquire(addrs: &[SocketAddr]) -> Rc<ConnSchema> {
        let addr = SCHEMA_CACHE.with(|cache| {
            let cache = cache.cache.borrow();
            addrs.iter().find_map(|addr| cache.get(addr).cloned())
        });
        if let Some(addr) = addr {
            return addr;
        }

        let schema = Rc::new(ConnSchema {
            version: Cell::new(None),
            is_updating: Cell::new(false),
            space_ids: Default::default(),
            index_ids: Default::default(),
            lock: Latch::new(),
        });

        SCHEMA_CACHE.with(|cache| {
            let mut cache = cache.cache.borrow_mut();
            for addr in addrs {
                cache.insert(*addr, schema.clone());
            }
        });

        schema
    }

    pub fn refresh(
        &self,
        conn_inner: &Rc<ConnInner>,
        actual_version: Option<u64>,
    ) -> Result<bool, Error> {
        let mut _lock: Option<LatchGuard> = None;

        if self.is_updating.get() {
            _lock = Some(self.lock.lock());
        }

        let result = if self.is_outdated(actual_version) {
            if _lock.is_none() {
                _lock = Some(self.lock.lock());
            }

            self.update(conn_inner)?;
            true
        } else {
            false
        };
        Ok(result)
    }

    pub fn update(&self, conn_inner: &Rc<ConnInner>) -> Result<(), Error> {
        self.is_updating.set(true);
        let (spaces_data, actual_schema_version) = self.fetch_schema_spaces(conn_inner)?;
        for row in spaces_data {
            let metadata = row.decode::<space::Metadata>()?;
            self.space_ids
                .borrow_mut()
                .insert(metadata.name.to_string(), metadata.id);
        }

        for row in self.fetch_schema_indexes(conn_inner)? {
            let metadata = row.decode::<index::Metadata>()?;
            self.index_ids.borrow_mut().insert(
                (metadata.space_id, metadata.name.to_string()),
                metadata.index_id,
            );
        }

        self.version.set(Some(actual_schema_version));
        self.is_updating.set(false);
        Ok(())
    }

    pub fn lookup_space(&self, name: &str) -> Option<u32> {
        self.space_ids.borrow().get(name).copied()
    }

    pub fn lookup_index(&self, name: &str, space_id: u32) -> Option<u32> {
        self.index_ids
            .borrow()
            .get(&(space_id, name.to_string()))
            .copied()
    }

    fn is_outdated(&self, actual_version: Option<u64>) -> bool {
        match actual_version {
            None => true,
            Some(actual_version) => match self.version.get() {
                None => true,
                Some(cached_version) => actual_version > cached_version,
            },
        }
    }

    #[inline(always)]
    fn fetch_schema_spaces(&self, conn_inner: &Rc<ConnInner>) -> Result<(Vec<Tuple>, u64), Error> {
        let rows = conn_inner.request(
            &protocol::Select {
                space_id: SystemSpace::VSpace as u32,
                index_id: 0,
                limit: u32::MAX,
                offset: 0,
                iterator_type: IteratorType::GT,
                key: &(SYSTEM_ID_MAX,),
            },
            &Options::default(),
        )?;
        let schema_version = conn_inner
            .schema_version
            .get()
            .expect("should be present after we received a response");
        Ok((rows, schema_version))
    }

    #[inline(always)]
    fn fetch_schema_indexes(&self, conn_inner: &Rc<ConnInner>) -> Result<Vec<Tuple>, Error> {
        conn_inner.request(
            &protocol::Select {
                space_id: SystemSpace::VIndex as u32,
                index_id: 0,
                limit: u32::MAX,
                offset: 0,
                iterator_type: IteratorType::All,
                key: &(),
            },
            &Options::default(),
        )
    }
}

struct ConnSchemaCache {
    cache: RefCell<HashMap<SocketAddr, Rc<ConnSchema>>>,
}

unsafe impl Sync for ConnSchemaCache {}

thread_local! {
    static SCHEMA_CACHE: Lazy<ConnSchemaCache> = Lazy::new(||
        ConnSchemaCache {
            cache: RefCell::new(HashMap::new()),
        }
    )
}