// summa-core 0.22.6
//
// Summa Core library
// Documentation
use izihawa_ttl_cache::TtlCache;
use summa_proto::proto;

use crate::components::IntermediateExtractionResult;
use crate::configs::core::CollectorCacheConfig;

/// Width of one cache block, in result positions. `TopDocs` requests are
/// widened to a block of this size whose offset is a multiple of it, so that
/// different pages inside the same block can share one cache entry.
const BLOCK_SIZE: u32 = 100;

/// Cache of intermediate collector results with a per-entry time-to-live.
///
/// Entries are keyed by a string combining a caller-supplied key with a
/// description of the collector (see `caching_key`).
pub struct CollectorCache {
    /// Cached extraction results, keyed by the strings built in `caching_key`.
    cache: TtlCache<String, IntermediateExtractionResult>,
    /// Time-to-live handed to the cache with every inserted entry. Despite the
    /// `_ms` suffix this is a `Duration`, built from a millisecond count in `new`.
    ttl_interval_ms: instant::Duration,
}

impl CollectorCache {
    /// Creates a cache from `config`.
    ///
    /// `config.size` is forwarded to the underlying `TtlCache`; the entry
    /// time-to-live is `config.ttl_interval_ms` milliseconds, defaulting to
    /// 120000 ms when the option is unset.
    pub fn new(config: &CollectorCacheConfig) -> Self {
        CollectorCache {
            ttl_interval_ms: instant::Duration::from_millis(config.ttl_interval_ms.unwrap_or(120000)),
            cache: TtlCache::new(config.size),
        }
    }

    /// Asks the underlying cache to drop its expired entries.
    pub fn remove_expired(&mut self) {
        self.cache.remove_expired();
    }

    /// Builds the string under which results for `collector` are stored.
    ///
    /// The key is `caching_key` followed by `|` and a description of the
    /// collector. For `TopDocs` only `limit`, `offset` and `scorer` take part
    /// in the key; every other collector contributes its full `Debug` output.
    pub fn caching_key(&self, caching_key: &str, collector: &proto::Collector) -> String {
        match &collector.collector {
            Some(proto::collector::Collector::TopDocs(top_docs)) => format!(
                "{}|TopDocsCollector {{ limit={}, offset={}, scorer={:?} }}|",
                caching_key, top_docs.limit, top_docs.offset, top_docs.scorer
            ),
            _ => format!("{}|{:?}", caching_key, collector),
        }
    }

    /// Tells whether results for `collector` may be served from the cache.
    ///
    /// * `TopDocs` — only when the requested window `[offset, offset + limit]`
    ///   fits inside a single `BLOCK_SIZE`-aligned block.
    /// * `ReservoirSampling` — never.
    /// * anything else (including an unset collector) — always.
    pub fn is_caching_enabled(collector: &proto::Collector) -> bool {
        match &collector.collector {
            Some(proto::collector::Collector::TopDocs(top_docs)) => {
                // Widen to u64: with u32 arithmetic both `offset + limit` and the
                // block's upper bound can overflow for large offsets/limits, which
                // panics in debug builds and yields a wrong answer in release.
                let block_size = u64::from(BLOCK_SIZE);
                let window_start = u64::from(top_docs.offset);
                let window_end = window_start + u64::from(top_docs.limit);
                let block_end = window_start - window_start % block_size + block_size;
                window_end <= block_end
            }
            Some(proto::collector::Collector::ReservoirSampling(_)) => false,
            _ => true,
        }
    }

    /// Returns the collector that should actually be executed and cached.
    ///
    /// A `TopDocs` collector is widened to the whole block containing its
    /// offset: the offset is rounded down to a multiple of `BLOCK_SIZE` and
    /// the limit becomes `BLOCK_SIZE`. Other collectors are cloned unchanged.
    pub fn adjust_collector(collector: &proto::Collector) -> proto::Collector {
        match &collector.collector {
            Some(proto::collector::Collector::TopDocs(top_docs)) => {
                let mut block_aligned = top_docs.clone();
                block_aligned.offset -= block_aligned.offset % BLOCK_SIZE;
                block_aligned.limit = BLOCK_SIZE;
                proto::Collector {
                    collector: Some(proto::collector::Collector::TopDocs(block_aligned)),
                }
            }
            _ => collector.clone(),
        }
    }

    /// Cuts a block-wide cached `result` down to what `collector` asked for.
    ///
    /// `Ready` results are cloned as they are. For prepared document
    /// references combined with a `TopDocs` collector, the documents in the
    /// requested window of the block are kept, and `has_next` is set when the
    /// cached value already had it set or when the block holds documents
    /// beyond the end of that window. For other collectors the references are
    /// cloned unchanged.
    pub fn adjust_result(result: &IntermediateExtractionResult, collector: &proto::Collector) -> IntermediateExtractionResult {
        match result {
            IntermediateExtractionResult::Ready(r) => IntermediateExtractionResult::Ready(r.clone()),
            IntermediateExtractionResult::PreparedDocumentReferences(prepared_document_references) => {
                let mut prepared_document_references = prepared_document_references.clone();
                if let Some(proto::collector::Collector::TopDocs(top_docs_collector)) = &collector.collector {
                    // Position of the requested window inside the cached block.
                    let skip = (top_docs_collector.offset % BLOCK_SIZE) as usize;
                    // `limit - 1` would underflow for a zero limit, so that case is
                    // handled explicitly and yields no documents.
                    let take = match top_docs_collector.limit {
                        0 => 0,
                        limit => (((limit - 1) % BLOCK_SIZE) + 1) as usize,
                    };
                    // Compare against the window end within the block rather than
                    // `(offset + limit) % BLOCK_SIZE`: the latter collapses to 0 when
                    // the window ends exactly on a block boundary and would then
                    // report `has_next` for any non-empty block.
                    let window_end = skip + take;
                    prepared_document_references.has_next =
                        prepared_document_references.has_next || prepared_document_references.scored_doc_addresses.len() > window_end;
                    prepared_document_references.scored_doc_addresses = prepared_document_references
                        .scored_doc_addresses
                        .into_iter()
                        .skip(skip)
                        .take(take)
                        .collect();
                }
                IntermediateExtractionResult::PreparedDocumentReferences(prepared_document_references)
            }
        }
    }

    /// Looks up the entry stored for `adjusted_collector` and, when present,
    /// returns it trimmed to what the original `collector` requested.
    pub fn get(&mut self, caching_key: &str, adjusted_collector: &proto::Collector, collector: &proto::Collector) -> Option<IntermediateExtractionResult> {
        let caching_key = self.caching_key(caching_key, adjusted_collector);
        self.cache.get(&caching_key).map(|cached| Self::adjust_result(cached, collector))
    }

    /// Stores `value` under the key derived from `caching_key` and `collector`,
    /// using the configured time-to-live.
    pub fn put(&mut self, caching_key: &str, collector: &proto::Collector, value: IntermediateExtractionResult) {
        let caching_key = self.caching_key(caching_key, collector);
        // Both the key and the value are owned here and unused afterwards, so
        // they are moved into the cache instead of being cloned.
        self.cache.insert(caching_key, value, self.ttl_interval_ms);
    }
}