// memory-lol 0.2.0
//
// Database for tracking historical social media data
// Documentation
use super::{
    accounts::AccountTable,
    table::{Mode, Table, Writeable},
    Error,
};
use rocksdb::{IteratorMode, MergeOperands, Options, DB};
use std::convert::TryInto;
use std::marker::PhantomData;
use std::path::Path;

/// Summary statistics for a screen-name table, as produced by `get_counts`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ScreenNameTableCounts {
    /// Number of entries (one per stored screen-name key) seen while scanning the table.
    pub screen_name_count: u64,
    /// Total number of 8-byte IDs stored across all entries (value length / 8, summed).
    pub mapping_count: u64,
}

/// A RocksDB-backed table mapping screen names to lists of `u64` IDs.
///
/// The type parameter `M` is a marker only (no value of `M` is stored); `open`
/// consults `M::is_read_only()` to decide how the database is opened, and
/// write operations are only implemented for `ScreenNameTable<Writeable>`.
pub struct ScreenNameTable<M> {
    // Held in an `Option` so that `rebuild` can take and drop the handle before
    // destroying and reopening the database. Every accessor unwraps this, so it
    // is expected to be `Some` outside of `rebuild`.
    db: Option<DB>,
    // Zero-sized marker tying the table to its access mode `M`.
    mode: PhantomData<M>,
}

impl<M> Table for ScreenNameTable<M> {
    type Counts = ScreenNameTableCounts;

    fn underlying(&self) -> &DB {
        self.db.as_ref().unwrap()
    }

    fn get_counts(&self) -> Result<Self::Counts, Error> {
        let mut screen_name_count = 0;
        let mut mapping_count = 0;

        let iter = self.db.as_ref().unwrap().iterator(IteratorMode::Start);

        for result in iter {
            let (_, value) = result?;

            screen_name_count += 1;
            let value_len = value.len();

            if value_len % 8 == 0 {
                mapping_count += value_len as u64 / 8;
            } else {
                return Err(Error::InvalidValue(value.to_vec()));
            }
        }

        Ok(Self::Counts {
            screen_name_count,
            mapping_count,
        })
    }
}

impl<M> ScreenNameTable<M> {
    /// Builds the RocksDB options shared by every way of opening this table:
    /// create the database if it is missing, and register `merge` as the
    /// associative merge operator under the name `"merge"`.
    fn make_options() -> Options {
        let mut opts = Options::default();
        opts.create_if_missing(true);
        opts.set_merge_operator_associative("merge", merge);
        opts
    }

    /// Looks up the IDs stored for `screen_name`.
    ///
    /// The key is derived with `screen_name_to_key`. Returns an empty vector
    /// when no entry exists for that key.
    ///
    /// # Errors
    ///
    /// Propagates database read errors and any error from decoding the stored
    /// value.
    pub fn lookup(&self, screen_name: &str) -> Result<Vec<u64>, Error> {
        let key = screen_name_to_key(screen_name);

        match self.underlying().get_pinned(key)? {
            Some(bytes) => value_to_ids(&bytes),
            None => Ok(vec![]),
        }
    }

    /// Returns the entries whose key starts with the key form of `screen_name`,
    /// as `(screen name, IDs)` pairs in iteration order.
    ///
    /// At most `limit` entries are read from the prefix iterator, and reading
    /// stops at the first key that does not start with the prefix.
    ///
    /// # Errors
    ///
    /// Propagates iterator errors and any error from decoding a key or value.
    pub fn lookup_by_prefix(
        &self,
        screen_name: &str,
        limit: usize,
    ) -> Result<Vec<(String, Vec<u64>)>, Error> {
        let prefix = screen_name_to_key(screen_name);
        let mut matches = Vec::with_capacity(1);

        for entry in self.underlying().prefix_iterator(&prefix).take(limit) {
            let (key, value) = entry?;

            // The iterator may run past the prefix range, so check explicitly.
            if !key.starts_with(&prefix) {
                break;
            }

            let name = key_to_screen_name(&key)?.to_string();
            let ids = value_to_ids(&value)?;
            matches.push((name, ids));
        }

        Ok(matches)
    }

    /// Scans the whole table and returns up to `k` entries with the most IDs,
    /// in the order produced by the queue's `into_descending_sorted_vec`
    /// (priority is the number of IDs).
    ///
    /// # Errors
    ///
    /// Propagates iterator errors and any error from decoding a key or value.
    pub fn get_most_reused(&self, k: usize) -> Result<Vec<(String, Vec<u64>)>, Error> {
        let mut top = priority_queue::DoublePriorityQueue::with_capacity(k);

        for entry in self.underlying().iterator(IteratorMode::Start) {
            let (key, value) = entry?;
            let name = key_to_screen_name(&key)?;
            let ids = value_to_ids(&value)?;

            let id_count = ids.len();
            let smallest_kept = top.peek_min().map_or(0, |(_, count)| *count);

            // Skip entries that cannot displace anything in an already full queue.
            if top.len() >= k && id_count < smallest_kept {
                continue;
            }

            top.push((name.to_string(), ids), id_count);

            // Keep the queue bounded at `k` by evicting the current minimum.
            if top.len() > k {
                top.pop_min();
            }
        }

        Ok(top.into_descending_sorted_vec())
    }
}

impl<M: Mode> ScreenNameTable<M> {
    /// Opens the table stored at `path`.
    ///
    /// When `M::is_read_only()` is true the database is opened with
    /// `DB::open_for_read_only` (passing `true` as its final flag); otherwise
    /// it is opened normally with `DB::open`. Both paths use the options from
    /// `make_options`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by RocksDB while opening the database.
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
        let options = Self::make_options();

        let handle = match M::is_read_only() {
            true => DB::open_for_read_only(&options, path, true)?,
            false => DB::open(&options, path)?,
        };

        Ok(Self {
            db: Some(handle),
            mode: PhantomData,
        })
    }
}

impl ScreenNameTable<Writeable> {
    pub fn insert(&self, screen_name: &str, id: u64) -> Result<(), Error> {
        Ok(self
            .db
            .as_ref()
            .unwrap()
            .merge(screen_name_to_key(screen_name), id.to_be_bytes())?)
    }

    pub fn rebuild<Mode>(&mut self, accounts: &AccountTable<Mode>) -> Result<(), Error> {
        let path = self.db.as_ref().unwrap().path().to_path_buf();
        self.db.take().unwrap();

        let options = Self::make_options();

        DB::destroy(&options, &path)?;

        self.db = Some(DB::open(&options, &path)?);

        for pair in accounts.pairs() {
            let (id, screen_name, _) = pair?;

            self.insert(&screen_name, id)?;
        }

        Ok(())
    }
}

/// Merge operator callback for the screen-name table.
///
/// Starts from a copy of the existing value (or an empty buffer sized for the
/// number of operands when there is none) and folds each operand into it with
/// `merge_for_screen_name`. Always returns `Some` with the combined value.
fn merge(
    _new_key: &[u8],
    existing_val: Option<&[u8]>,
    operands: &MergeOperands,
) -> Option<Vec<u8>> {
    let mut merged = existing_val.map_or_else(
        || Vec::with_capacity(operands.len() * 8),
        |bytes| bytes.to_vec(),
    );

    operands
        .iter()
        .for_each(|operand| merge_for_screen_name(&mut merged, operand));

    Some(merged)
}

fn merge_for_screen_name(a: &mut Vec<u8>, b: &[u8]) {
    let original_len = a.len();
    let mut i = 0;

    while i < b.len() {
        let bytes: [u8; 8] = match b[i..i + 8].try_into() {
            Ok(bytes) => bytes,
            Err(error) => {
                log::error!("{}", error);
                return;
            }
        };
        let next_b = u64::from_be_bytes(bytes);

        let mut found = false;
        let mut j = 0;

        while !found && j < original_len {
            let bytes = match a[j..j + 8].try_into() {
                Ok(bytes) => bytes,
                Err(error) => {
                    log::error!("{}", error);
                    return;
                }
            };
            let next_a = u64::from_be_bytes(bytes);
            found = next_a == next_b;
            j += 8;
        }

        if !found {
            a.extend_from_slice(&b[i..i + 8]);
        }
        i += 8;
    }
}

/// Converts a screen name into its database key: the UTF-8 bytes of the
/// lowercased name, so lookups and inserts are case-insensitive.
fn screen_name_to_key(screen_name: &str) -> Vec<u8> {
    // `into_bytes` reuses the lowercased string's buffer instead of copying it.
    screen_name.to_lowercase().into_bytes()
}

fn key_to_screen_name(key: &[u8]) -> Result<&str, Error> {
    Ok(std::str::from_utf8(key)?)
}

fn value_to_ids(value: &[u8]) -> Result<Vec<u64>, Error> {
    let mut result = Vec::with_capacity(value.len() / 8);
    let mut i = 0;

    while i < value.len() {
        let id = u64::from_be_bytes(
            value[i..i + 8]
                .try_into()
                .map_err(|_| Error::InvalidValue(value.to_vec()))?,
        );
        result.push(id);
        i += 8;
    }

    Ok(result)
}