vector-db-core 0.1.111

A high-performance vector database core for reading and writing historical records such as logs and chat records
Documentation
use serde::{
    Deserialize,
    Serialize,
};
use std::sync::{
    Arc,
    Mutex,
};

use crate::vector_engine::VectorEngine;

const MAX_CACHE_ITEMS: usize = 500000;

pub struct WritableCache<D, T>
where
    D: VectorEngine<T> + Sync + Send + 'static,
    T: Serialize
        + for<'de> Deserialize<'de>
        + 'static
        + std::fmt::Debug
        + Clone
        + Send
        + Sync,
{
    database: Arc<Mutex<D>>,
    cache: Arc<Mutex<Vec<T>>>,
    max_cache_items: usize,
}

impl<D, T> WritableCache<D, T>
where
    D: VectorEngine<T> + Sync + Send + 'static,
    T: Serialize
        + for<'de> Deserialize<'de>
        + 'static
        + std::fmt::Debug
        + Clone
        + Send
        + Sync,
{
    pub fn new(
        static_repository: String,
        dynamic_repository: String,
        initial_size_if_not_exists: u64,
    ) -> Self {
        let database = Arc::new(Mutex::<D>::new(VectorEngine::new(
            static_repository,
            dynamic_repository,
            initial_size_if_not_exists,
        )));
        let cache = Arc::new(Mutex::new(Vec::with_capacity(MAX_CACHE_ITEMS)));
        let cache_clone = Arc::clone(&cache);
        let database_clone = Arc::clone(&database);

        std::thread::spawn(move || loop {
            let mut cache_guard = cache_clone.lock().unwrap();

            if cache_guard.len() >= MAX_CACHE_ITEMS {
                let database_clone_guard = database_clone.lock().unwrap();
                println!("full cache");
                let mut objs = Vec::with_capacity(MAX_CACHE_ITEMS);
                objs.append(&mut cache_guard);
                println!("appended");
                // database_clone.lock().unwrap().pushx(objs);
                database_clone_guard.pushx(objs);
            }
        });

        Self {
            database,

            cache,
            max_cache_items: MAX_CACHE_ITEMS,
        }
    }

    pub fn push(&self, obj: T) {
        self.cache.lock().unwrap().push(obj);
    }

    pub fn pushx(&self, objs: Vec<T>) {
        let mut cache = self.cache.lock().unwrap();
        let mut objs = objs;
        cache.append(&mut objs);

        let cache_clone = Arc::clone(&self.cache);
        let database_clone = Arc::clone(&self.database);
        let max_cache_items = self.max_cache_items;
        std::thread::spawn(move || {
            let mut cache = cache_clone.lock().unwrap();
            let mut objs = Vec::with_capacity(max_cache_items);
            if cache.len() >= max_cache_items {
                objs.append(&mut *cache);
            }
            if !objs.is_empty() {
                database_clone.lock().unwrap().pushx(objs);
            }
        });
    }

    pub fn get_base_len(&self) -> usize {
        self.database.lock().unwrap().len()
    }

    pub fn get_cache_len(&self) -> usize {
        self.cache.lock().unwrap().len()
    }

    pub fn getting_obj_from_cache(&self, index: u64) -> T {
        self.cache.lock().unwrap()[index as usize].clone()
    }

    pub fn getting_objs_from_cache(&self, index: u64, count: u64) -> Vec<T> {
        let end_offset = (index + count) as usize;
        dbg!(index, count, end_offset);

        let cache = self.cache.lock().unwrap();
        dbg!(cache.len());
        cache[index as usize..end_offset].into()
    }

    pub fn get_each_len(&self) -> (u64, u64, u64) {
        let (cache_len, base_len) = (self.get_cache_len(), self.get_base_len());
        let length = cache_len + base_len;
        (cache_len as u64, base_len as u64, length as u64)
    }

    pub fn get_obj_from_cache(&self, index: u64) -> Option<T> {
        self.cache.lock().unwrap().get(index as usize).cloned()
    }

    pub fn get_objs_from_cache(&self, index: u64, count: u64) -> Option<Vec<T>> {
        let cache = self.cache.lock().unwrap();
        let end_offset = (index + count) as usize;
        dbg!(index, count, end_offset);
        dbg!(cache.len());
        cache
            .get(index as usize..end_offset)
            .map(|slice| slice.to_vec())
    }
}

impl<D, T> Drop for WritableCache<D, T>
where
    D: VectorEngine<T> + Sync + Send + 'static,
    T: Serialize
        + for<'de> Deserialize<'de>
        + 'static
        + std::fmt::Debug
        + Clone
        + Send
        + Sync,
{
    fn drop(&mut self) {
        let mut cache = self.cache.lock().unwrap();
        let max_cache_items = self.max_cache_items;
        let cache_len = cache.len();
        println!("prepare to drop  {} item of cache ", cache_len);
        if cache.len() != 0 {
            println!("dropping {} item of cache ", cache_len);

            let mut objs = Vec::with_capacity(max_cache_items);
            objs.append(&mut *cache);
            self.database.lock().unwrap().pushx(objs);
        }
    }
}

impl<D, T> VectorEngine<T> for WritableCache<D, T>
where
    D: VectorEngine<T> + 'static + Send + Sync,
    T: Serialize
        + for<'de> Deserialize<'de>
        + 'static
        + std::fmt::Debug
        + Clone
        + Send
        + Sync,
{
    fn new(
        static_repository: String,
        dynamic_repository: String,
        initial_size_if_not_exists: u64,
    ) -> Self {
        Self::new(
            static_repository,
            dynamic_repository,
            initial_size_if_not_exists,
        )
    }

    fn len(&self) -> usize {
        let (cache_len, base_len) = (self.get_cache_len(), self.get_base_len());

        cache_len + base_len
    }

    fn push(&self, obj: T) {
        self.push(obj);
    }

    fn pushx(&self, objs: Vec<T>) {
        self.pushx(objs);
    }

    fn pull(&self, index: u64) -> T {
        let cache = self.cache.lock().unwrap();
        let db = self.database.lock().unwrap();
        if index < db.len() as u64 {
            db.pull(index)
        } else if index >= db.len() as u64 && index < (db.len() + cache.len()) as u64 {
            if let Some(obj) = cache.get(index as usize - db.len()) {
                return obj.clone();
            } else {
                panic!(
                    "index {}  out of bounds! items in database and cache is {} and {} ",
                    index,
                    db.len(),
                    cache.len()
                );
            }
        } else {
            panic!(
                "index {}  out of bounds! items in database and cache is {} and {} ",
                index,
                db.len(),
                cache.len()
            );
        }
    }

    fn pullx(&self, index: u64, count: u64) -> Vec<T> {
        let db = self.database.lock().unwrap();
        let cache = self.cache.lock().unwrap();
        let end_index = index + count - 1;
        if end_index < db.len() as u64 {
            println!("reading in database");
            db.pullx(index, count)
        } else if index < db.len() as u64 && end_index < (db.len() + cache.len()) as u64 {
            println!("reading in database and cache");
            let mut front = db.pullx(index, db.len() as u64 - index);
            let mut back = if let Some(objs) =
                cache.get(0..end_index as usize - db.len() + 1)
            {
                objs.to_vec()
            } else {
                panic!("get from cache failed ! cache len is {} , index is {} , end_index is {}, db len is {}", cache.len(), index, end_index, db.len());
            };

            front.append(&mut back);
            return front;
        } else if index >= db.len() as u64 && end_index < (db.len() + cache.len()) as u64
        {
            println!("reading in cache");
            let objs = if let Some(objs) =
                cache.get(index as usize - db.len()..end_index as usize - db.len() + 1)
            {
                objs.to_vec()
            } else {
                panic!("get from cache failed ! cache len is {} , index is {} , end_index is {}, db len is {}", cache.len(), index, end_index, db.len());
            };
            return objs;
        } else {
            panic!("index {}  out of bounds!  database len {},  cache len is {}, end_index is  {} ", index, db.len(),cache.len(), end_index );
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::services::{
        dynamic_vector_manage_service::DynamicVectorManageService,
        static_vector_manage_service::StaticVectorManageService,
    };
    const COUNT: usize = 1000;

    #[derive(Serialize, Deserialize, Default, Debug, Clone)]
    pub struct StaticStruct {
        my_usize: usize,
        my_u64: u64,
        my_u32: u32,
        my_u16: u16,
        my_u8: u8,
        my_boolean: bool,
    }

    #[test]
    fn test_push_static_one() {
        let my_obj: StaticStruct = StaticStruct {
            my_usize: 443,
            my_u64: 53,
            my_u32: 4399,
            my_u16: 3306,
            my_u8: 22,
            my_boolean: true,
        };
        let my_service = WritableCache::<
            StaticVectorManageService<StaticStruct>,
            StaticStruct,
        >::new(
            "cacheS.bin".to_string(), "cacheSD.bin".to_string(), 1024
        );

        my_service.push(my_obj);
    }

    #[test]
    fn test_one_by_one_push_static() {
        let my_service = WritableCache::<
            StaticVectorManageService<StaticStruct>,
            StaticStruct,
        >::new(
            "cacheS.bin".to_string(), "cacheSD.bin".to_string(), 1024
        );
        for i in 0..COUNT {
            let my_obj: StaticStruct = StaticStruct {
                my_usize: 443 + i,
                my_u64: 53,
                my_u32: 4399,
                my_u16: 3306,
                my_u8: 22,
                my_boolean: true,
            };

            my_service.push(my_obj);
        }
    }

    #[test]
    fn test_one_by_one_push_dynamic() {
        let my_service = WritableCache::<
            DynamicVectorManageService<StaticStruct>,
            StaticStruct,
        >::new(
            "cacheD.bin".to_string(), "cacheDD.bin".to_string(), 1024
        );
        for i in 0..COUNT {
            let my_obj: StaticStruct = StaticStruct {
                my_usize: 443 + i,
                my_u64: 53,
                my_u32: 4399,
                my_u16: 3306,
                my_u8: 22,
                my_boolean: true,
            };

            my_service.push(my_obj);
        }
    }

    #[test]
    fn test_extend_static() {
        let mut objs = Vec::new();
        let my_service = WritableCache::<
            StaticVectorManageService<StaticStruct>,
            StaticStruct,
        >::new(
            "cacheS.bin".to_string(), "cacheSD.bin".to_string(), 1024
        );
        for i in 0..COUNT {
            let my_obj: StaticStruct = StaticStruct {
                my_usize: 443 + i,
                my_u64: 53,
                my_u32: 4399,
                my_u16: 3306,
                my_u8: 22,
                my_boolean: true,
            };

            objs.push(my_obj);
        }
        my_service.pushx(objs);
    }
}