use crate::errors::*;
use crate::*;
use async_trait::async_trait;
use chrono::Utc;
use futures::stream::BoxStream;
use moka::future::{Cache, CacheBuilder};
use crate::cache::cache_query_engine::FirestoreCacheQueryEngine;
use futures::StreamExt;
use std::collections::HashMap;
use tracing::*;
pub type FirestoreMemCache = Cache<String, FirestoreDocument>;
pub type FirestoreMemCacheOptions = CacheBuilder<String, FirestoreDocument, FirestoreMemCache>;
pub struct FirestoreMemoryCacheBackend {
pub config: FirestoreCacheConfiguration,
collection_caches: HashMap<String, FirestoreMemCache>,
}
const FIRESTORE_MEMORY_CACHE_DEFAULT_MAX_CAPACITY: u64 = 50000;
impl FirestoreMemoryCacheBackend {
pub fn new(config: FirestoreCacheConfiguration) -> FirestoreResult<Self> {
Self::with_max_capacity(config, FIRESTORE_MEMORY_CACHE_DEFAULT_MAX_CAPACITY)
}
pub fn with_max_capacity(
config: FirestoreCacheConfiguration,
max_capacity: u64,
) -> FirestoreResult<Self> {
Self::with_collection_options(config, |_| {
FirestoreMemCache::builder().max_capacity(max_capacity)
})
}
pub fn with_collection_options<FN>(
config: FirestoreCacheConfiguration,
collection_mem_options: FN,
) -> FirestoreResult<Self>
where
FN: Fn(&str) -> FirestoreMemCacheOptions,
{
let collection_caches = config
.collections
.keys()
.map(|collection_path| {
(
collection_path.clone(),
collection_mem_options(collection_path.as_str()).build(),
)
})
.collect();
Ok(Self {
config,
collection_caches,
})
}
async fn preload_collections(&self, db: &FirestoreDb) -> Result<(), FirestoreError> {
for (collection_path, config) in &self.config.collections {
match config.collection_load_mode {
FirestoreCacheCollectionLoadMode::PreloadAllDocs
| FirestoreCacheCollectionLoadMode::PreloadAllIfEmpty => {
if let Some(mem_cache) = self.collection_caches.get(collection_path.as_str()) {
debug!(collection_path, "Preloading collection.");
let params = if let Some(parent) = &config.parent {
db.fluent()
.select()
.from(config.collection_name.as_str())
.parent(parent)
} else {
db.fluent().select().from(config.collection_name.as_str())
};
let stream = params.stream_query().await?;
stream
.enumerate()
.map(|(index, docs)| {
if index > 0 && index % 5000 == 0 {
debug!(
collection_path = collection_path.as_str(),
entries_loaded = index,
"Collection preload in progress...",
);
}
docs
})
.for_each_concurrent(1, |doc| async move {
let (_, document_id) = split_document_path(&doc.name);
mem_cache.insert(document_id.to_string(), doc).await;
})
.await;
mem_cache.run_pending_tasks().await;
info!(
collection_path = collection_path.as_str(),
entry_count = mem_cache.entry_count(),
"Preloading collection has been finished.",
);
}
}
FirestoreCacheCollectionLoadMode::PreloadNone => {}
}
}
Ok(())
}
async fn query_cached_docs<'b>(
&self,
collection_path: &str,
query_engine: FirestoreCacheQueryEngine,
) -> FirestoreResult<BoxStream<'b, FirestoreResult<FirestoreDocument>>> {
match self.collection_caches.get(collection_path) {
Some(mem_cache) => {
let filtered_results: Vec<FirestoreResult<FirestoreDocument>> = mem_cache
.iter()
.filter(|(_, doc)| query_engine.matches_doc(doc))
.map(|(_, doc)| Ok(doc))
.collect();
let filtered_stream = futures::stream::iter(filtered_results);
let output_stream = query_engine
.process_query_stream(Box::pin(filtered_stream))
.await?;
Ok(output_stream)
}
None => Ok(Box::pin(futures::stream::empty())),
}
}
}
#[async_trait]
impl FirestoreCacheBackend for FirestoreMemoryCacheBackend {
async fn load(
&self,
_options: &FirestoreCacheOptions,
db: &FirestoreDb,
) -> Result<Vec<FirestoreListenerTargetParams>, FirestoreError> {
let read_from_time = Utc::now();
self.preload_collections(db).await?;
Ok(self
.config
.collections
.values()
.map(|collection_config| {
FirestoreListenerTargetParams::new(
collection_config.listener_target.clone(),
FirestoreTargetType::Query(
FirestoreQueryParams::new(
collection_config.collection_name.as_str().into(),
)
.opt_parent(collection_config.parent.clone()),
),
HashMap::new(),
)
.with_resume_type(FirestoreListenerTargetResumeType::ReadTime(read_from_time))
})
.collect())
}
async fn invalidate_all(&self) -> FirestoreResult<()> {
for (collection_path, mem_cache) in &self.collection_caches {
debug!(collection_path, "Invalidating cache for collection.");
mem_cache.invalidate_all();
mem_cache.run_pending_tasks().await;
}
Ok(())
}
async fn shutdown(&self) -> Result<(), FirestoreError> {
Ok(())
}
async fn on_listen_event(&self, event: FirestoreListenEvent) -> FirestoreResult<()> {
match event {
FirestoreListenEvent::DocumentChange(doc_change) => {
if let Some(doc) = doc_change.document {
let (collection_path, document_id) = split_document_path(&doc.name);
if let Some(mem_cache) = self.collection_caches.get(collection_path) {
trace!(
doc_name = ?doc.name,
"Writing document to cache due to listener event.",
);
mem_cache.insert(document_id.to_string(), doc).await;
}
}
Ok(())
}
FirestoreListenEvent::DocumentDelete(doc_deleted) => {
let (collection_path, document_id) = split_document_path(&doc_deleted.document);
if let Some(mem_cache) = self.collection_caches.get(collection_path) {
trace!(
deleted_doc = ?doc_deleted.document.as_str(),
"Removing document from cache due to listener event.",
);
mem_cache.remove(document_id).await;
}
Ok(())
}
_ => Ok(()),
}
}
}
#[async_trait]
impl FirestoreCacheDocsByPathSupport for FirestoreMemoryCacheBackend {
async fn get_doc_by_path(
&self,
document_path: &str,
) -> FirestoreResult<Option<FirestoreDocument>> {
let (collection_path, document_id) = split_document_path(document_path);
match self.collection_caches.get(collection_path) {
Some(mem_cache) => Ok(mem_cache.get(document_id).await),
None => Ok(None),
}
}
async fn update_doc_by_path(&self, document: &FirestoreDocument) -> FirestoreResult<()> {
let (collection_path, document_id) = split_document_path(&document.name);
match self.collection_caches.get(collection_path) {
Some(mem_cache) => {
mem_cache
.insert(document_id.to_string(), document.clone())
.await;
Ok(())
}
None => Ok(()),
}
}
async fn list_all_docs<'b>(
&self,
collection_path: &str,
) -> FirestoreResult<FirestoreCachedValue<BoxStream<'b, FirestoreResult<FirestoreDocument>>>>
{
match self.collection_caches.get(collection_path) {
Some(mem_cache) => {
let all_docs: Vec<FirestoreResult<FirestoreDocument>> =
mem_cache.iter().map(|(_, doc)| Ok(doc)).collect();
Ok(FirestoreCachedValue::UseCached(Box::pin(
futures::stream::iter(all_docs),
)))
}
None => Ok(FirestoreCachedValue::SkipCache),
}
}
async fn query_docs<'b>(
&self,
collection_path: &str,
query: &FirestoreQueryParams,
) -> FirestoreResult<FirestoreCachedValue<BoxStream<'b, FirestoreResult<FirestoreDocument>>>>
{
let simple_query_engine = FirestoreCacheQueryEngine::new(query);
if simple_query_engine.params_supported() {
Ok(FirestoreCachedValue::UseCached(
self.query_cached_docs(collection_path, simple_query_engine)
.await?,
))
} else {
Ok(FirestoreCachedValue::SkipCache)
}
}
}