use crate::errors::*;
use crate::*;
use async_trait::async_trait;
use futures::stream::BoxStream;
use crate::cache::cache_query_engine::FirestoreCacheQueryEngine;
use chrono::Utc;
use futures::StreamExt;
use gcloud_sdk::google::firestore::v1::Document;
use gcloud_sdk::prost::Message;
use redb::*;
use std::collections::HashMap;
use std::path::PathBuf;
use tracing::*;
/// Firestore cache backend that keeps cached documents in a `redb` database file.
///
/// Each configured collection is stored in its own table, named after the
/// collection path, mapping a document id to the protobuf-encoded document.
pub struct FirestorePersistentCacheBackend {
/// Cache configuration; its `collections` map decides which collection paths are cached.
pub config: FirestoreCacheConfiguration,
/// Handle to the underlying `redb` database holding one table per collection.
redb: Database,
}
impl FirestorePersistentCacheBackend {
/// Creates a backend whose database file lives in the system temp directory.
///
/// The file is placed at `<temp>/firestore_cache/persistent/redb`; the
/// directory is created first when it does not exist yet.
///
/// # Errors
/// Returns an error if the directory cannot be created or if
/// [`Self::with_options`] fails to open the database.
pub fn new(config: FirestoreCacheConfiguration) -> FirestoreResult<Self> {
    let db_dir = std::env::temp_dir()
        .join("firestore_cache")
        .join("persistent");

    if db_dir.exists() {
        debug!(
            directory = %db_dir.display(),
            "Using a temp directory to store persistent cache.",
        );
    } else {
        debug!(
            directory = %db_dir.display(),
            "Creating a temp directory to store persistent cache.",
        );
        std::fs::create_dir_all(&db_dir)?;
    }

    Self::with_options(config, db_dir.join("redb"))
}
/// Creates a backend backed by the database file at `data_file_path`.
///
/// The database is opened through `Database::create` and compacted
/// immediately afterwards.
///
/// # Errors
/// Returns an error if the database cannot be opened/created or if the
/// compaction fails.
pub fn with_options(
    config: FirestoreCacheConfiguration,
    data_file_path: PathBuf,
) -> FirestoreResult<Self> {
    // Only the log message differs between the two cases.
    if !data_file_path.exists() {
        debug!(?data_file_path, "Creating database for persistent cache...",);
    } else {
        debug!(?data_file_path, "Opening database for persistent cache...",);
    }

    let mut database = Database::create(data_file_path)?;
    database.compact()?;
    info!("Successfully opened database for persistent cache.");

    Ok(Self {
        config,
        redb: database,
    })
}
async fn preload_collections(&self, db: &FirestoreDb) -> Result<(), FirestoreError> {
for (collection_path, config) in &self.config.collections {
let td: TableDefinition<&str, &[u8]> = TableDefinition::new(collection_path.as_str());
match config.collection_load_mode {
FirestoreCacheCollectionLoadMode::PreloadAllDocs
| FirestoreCacheCollectionLoadMode::PreloadAllIfEmpty => {
let existing_records = {
let read_tx = self.redb.begin_read()?;
if read_tx
.list_tables()?
.any(|t| t.name() == collection_path.as_str())
{
read_tx.open_table(td)?.len()?
} else {
0
}
};
if matches!(
config.collection_load_mode,
FirestoreCacheCollectionLoadMode::PreloadAllIfEmpty
) && existing_records > 0
{
info!(
collection_path = collection_path.as_str(),
entries_loaded = existing_records,
"Preloading collection has been skipped.",
);
continue;
}
debug!(
collection_path = collection_path.as_str(),
"Preloading collection."
);
let params = if let Some(parent) = &config.parent {
db.fluent()
.select()
.from(config.collection_name.as_str())
.parent(parent)
} else {
db.fluent().select().from(config.collection_name.as_str())
};
let stream = params.stream_query().await?;
stream
.enumerate()
.map(|(index, docs)| {
if index > 0 && index % 5000 == 0 {
debug!(
collection_path = collection_path.as_str(),
entries_loaded = index,
"Collection preload in progress...",
);
}
docs
})
.ready_chunks(100)
.for_each(|docs| async move {
if let Err(err) = self.write_batch_docs(collection_path, docs) {
error!(?err, "Error while preloading collection.");
}
})
.await;
let updated_records = if matches!(
config.collection_load_mode,
FirestoreCacheCollectionLoadMode::PreloadAllDocs
) || existing_records == 0
{
let read_tx = self.redb.begin_read()?;
let table = read_tx.open_table(td)?;
table.len()?
} else {
existing_records
};
info!(
collection_path = collection_path.as_str(),
updated_records, "Preloading collection has been finished.",
);
}
FirestoreCacheCollectionLoadMode::PreloadNone => {
let tx = self.redb.begin_write()?;
debug!(collection_path, "Creating corresponding collection table.",);
tx.open_table(td)?;
tx.commit()?;
}
}
}
Ok(())
}
/// Stores a batch of documents in the table of `collection_path` using a
/// single write transaction.
///
/// Each document is keyed by the document-id part of its `name`.
///
/// # Errors
/// Returns an error if encoding a document or any database operation fails;
/// in that case the transaction is not committed.
fn write_batch_docs(&self, collection_path: &str, docs: Vec<Document>) -> FirestoreResult<()> {
    let table_def: TableDefinition<&str, &[u8]> = TableDefinition::new(collection_path);
    let txn = self.redb.begin_write()?;
    {
        // The table borrows the transaction, so it has to go out of scope
        // before the commit below.
        let mut table = txn.open_table(table_def)?;
        for document in &docs {
            let (_, doc_id) = split_document_path(&document.name);
            let encoded = Self::document_to_buf(document)?;
            table.insert(doc_id, encoded.as_slice())?;
        }
    }
    txn.commit()?;
    Ok(())
}
/// Serializes a document into a freshly allocated byte buffer using its
/// protobuf `encode` implementation.
///
/// # Errors
/// Returns an error if encoding fails.
fn document_to_buf(doc: &FirestoreDocument) -> FirestoreResult<Vec<u8>> {
    let mut encoded = Vec::new();
    match doc.encode(&mut encoded) {
        Ok(()) => Ok(encoded),
        Err(encode_err) => Err(encode_err.into()),
    }
}
/// Decodes a document from its protobuf-encoded bytes.
///
/// # Errors
/// Returns an error if the bytes cannot be decoded as a document.
fn buf_to_document<B: AsRef<[u8]>>(buf: B) -> FirestoreResult<FirestoreDocument> {
    let raw_bytes: &[u8] = buf.as_ref();
    Ok(FirestoreDocument::decode(raw_bytes)?)
}
/// Stores a single document in the table of its collection.
///
/// Documents whose collection path is not present in the cache configuration
/// are silently ignored.
///
/// # Errors
/// Returns an error if encoding the document or any database operation fails.
fn write_document(&self, doc: &Document) -> FirestoreResult<()> {
    let (collection_path, document_id) = split_document_path(&doc.name);

    // Nothing to do for collections that are not configured for caching.
    if !self.config.collections.contains_key(collection_path) {
        return Ok(());
    }

    let table_def: TableDefinition<&str, &[u8]> = TableDefinition::new(collection_path);
    let txn = self.redb.begin_write()?;
    {
        let mut table = txn.open_table(table_def)?;
        let encoded = Self::document_to_buf(doc)?;
        table.insert(document_id, encoded.as_slice())?;
    }
    txn.commit()?;
    Ok(())
}
/// Returns the number of entries stored in the table named `collection_id`.
///
/// # Errors
/// Returns an error if the read transaction cannot be started, the table
/// cannot be opened, or its length cannot be read.
fn table_len(&self, collection_id: &str) -> FirestoreResult<u64> {
    let table_def: TableDefinition<&str, &[u8]> = TableDefinition::new(collection_id);
    let txn = self.redb.begin_read()?;
    let table = txn.open_table(table_def)?;
    let entries = table.len()?;
    Ok(entries)
}
/// Runs a query against the cached documents of `collection_path`.
///
/// Every stored document is decoded and kept only if
/// `query_engine.matches_doc` accepts it; the surviving documents are then
/// handed to `query_engine.process_query_stream`, whose output is returned.
///
/// # Errors
/// Returns an error if a database operation fails, a stored document cannot
/// be decoded, or the query engine fails to process the stream.
async fn query_cached_docs<'b>(
    &self,
    collection_path: &str,
    query_engine: FirestoreCacheQueryEngine,
) -> FirestoreResult<BoxStream<'b, FirestoreResult<FirestoreDocument>>> {
    let table_def: TableDefinition<&str, &[u8]> = TableDefinition::new(collection_path);
    let txn = self.redb.begin_read()?;
    let table = txn.open_table(table_def)?;
    let entries = table.iter()?;

    let mut matching: Vec<FirestoreResult<FirestoreDocument>> = Vec::new();
    for entry in entries {
        let (_, raw) = entry?;
        let candidate = Self::buf_to_document(raw.value())?;
        if query_engine.matches_doc(&candidate) {
            matching.push(Ok(candidate));
        }
    }

    let matching_stream = Box::pin(futures::stream::iter(matching));
    let processed = query_engine.process_query_stream(matching_stream).await?;
    Ok(processed)
}
}
#[async_trait]
impl FirestoreCacheBackend for FirestorePersistentCacheBackend {
/// Preloads the configured collections and returns one listener target per
/// collection.
///
/// A collection whose table is empty (or whose length cannot be read) gets a
/// `ReadTime` resume type set to the time captured before preloading started;
/// all other collections get no resume type.
///
/// # Errors
/// Returns an error if preloading the collections fails.
async fn load(
    &self,
    _options: &FirestoreCacheOptions,
    db: &FirestoreDb,
) -> Result<Vec<FirestoreListenerTargetParams>, FirestoreError> {
    // Captured before preloading so the resume point precedes the preload.
    let read_from_time = Utc::now();
    self.preload_collections(db).await?;

    let mut listener_targets = Vec::with_capacity(self.config.collections.len());
    for (collection_path, collection_config) in &self.config.collections {
        // A failed length lookup is treated the same as an empty table.
        let cached_entries = self.table_len(collection_path).unwrap_or(0);
        let resume_type = (cached_entries == 0)
            .then(|| FirestoreListenerTargetResumeType::ReadTime(read_from_time));

        let query_params =
            FirestoreQueryParams::new(collection_config.collection_name.as_str().into())
                .opt_parent(collection_config.parent.clone());

        listener_targets.push(
            FirestoreListenerTargetParams::new(
                collection_config.listener_target.clone(),
                FirestoreTargetType::Query(query_params),
                HashMap::new(),
            )
            .opt_resume_type(resume_type),
        );
    }

    Ok(listener_targets)
}
/// Removes every cached entry from the table of each configured collection.
///
/// Each collection is cleared in its own write transaction.
///
/// # Errors
/// Returns an error as soon as a database operation fails; collections
/// already processed stay cleared.
async fn invalidate_all(&self) -> FirestoreResult<()> {
    for collection_path in self.config.collections.keys() {
        let table_def: TableDefinition<&str, &[u8]> =
            TableDefinition::new(collection_path.as_str());
        let txn = self.redb.begin_write()?;

        debug!(
            collection_path,
            "Invalidating collection and draining the corresponding table.",
        );
        let mut table = txn.open_table(table_def)?;
        // Retaining nothing empties the table.
        table.retain(|_, _| false)?;
        // Release the borrow on the transaction before committing it.
        drop(table);

        txn.commit()?;
    }
    Ok(())
}
/// Shuts the backend down; this implementation performs no work and always
/// returns `Ok(())`.
async fn shutdown(&self) -> Result<(), FirestoreError> {
Ok(())
}
/// Applies a Firestore listener event to the cache.
///
/// * `DocumentChange` — the changed document (if present) is written to the cache.
/// * `DocumentDelete` — the document is removed from the table of its collection.
/// * Any other event is ignored.
///
/// Events for collections that are not part of the cache configuration are
/// ignored, matching the behaviour of document writes.
///
/// # Errors
/// Returns an error if encoding the document or any database operation fails.
async fn on_listen_event(&self, event: FirestoreListenEvent) -> FirestoreResult<()> {
    match event {
        FirestoreListenEvent::DocumentChange(doc_change) => {
            if let Some(doc) = doc_change.document {
                trace!(
                    doc_name = ?doc.name,
                    "Writing document to cache due to listener event.",
                );
                self.write_document(&doc)?;
            }
            Ok(())
        }
        FirestoreListenEvent::DocumentDelete(doc_deleted) => {
            let (collection_path, document_id) = split_document_path(&doc_deleted.document);

            // Opening a table in a write transaction creates it when missing,
            // so skip collections that are not configured for caching.
            if !self.config.collections.contains_key(collection_path) {
                return Ok(());
            }

            let td: TableDefinition<&str, &[u8]> = TableDefinition::new(collection_path);
            let write_txn = self.redb.begin_write()?;
            {
                let mut table = write_txn.open_table(td)?;
                trace!(
                    deleted_doc = ?doc_deleted.document.as_str(),
                    "Removing document from cache due to listener event.",
                );
                table.remove(document_id)?;
            }
            // A write transaction dropped without commit is aborted, which
            // would discard the removal; commit it explicitly.
            write_txn.commit()?;
            Ok(())
        }
        _ => Ok(()),
    }
}
}
#[async_trait]
impl FirestoreCacheDocsByPathSupport for FirestorePersistentCacheBackend {
/// Looks up a cached document by its full document path.
///
/// Returns `Ok(None)` when the document's collection is not configured for
/// caching or when no entry is stored under the document id.
///
/// # Errors
/// Returns an error if a database operation fails or the stored bytes cannot
/// be decoded.
async fn get_doc_by_path(
    &self,
    document_path: &str,
) -> FirestoreResult<Option<FirestoreDocument>> {
    let (collection_path, document_id) = split_document_path(document_path);
    if !self.config.collections.contains_key(collection_path) {
        return Ok(None);
    }

    let table_def: TableDefinition<&str, &[u8]> = TableDefinition::new(collection_path);
    let txn = self.redb.begin_read()?;
    let table = txn.open_table(table_def)?;
    let stored = table.get(document_id)?;

    let cached = match stored {
        Some(raw) => Some(Self::buf_to_document(raw.value())?),
        None => None,
    };
    Ok(cached)
}
/// Writes `document` to the cache; documents of collections that are not
/// configured for caching are ignored.
///
/// # Errors
/// Returns an error if encoding the document or any database operation fails.
async fn update_doc_by_path(&self, document: &FirestoreDocument) -> FirestoreResult<()> {
    self.write_document(document)
}
/// Returns every cached document of `collection_path` as a stream.
///
/// Yields `SkipCache` when the collection is not configured for caching;
/// otherwise all stored documents are decoded up front and returned as
/// `UseCached`.
///
/// # Errors
/// Returns an error if a database operation fails or a stored document cannot
/// be decoded.
async fn list_all_docs<'b>(
    &self,
    collection_path: &str,
) -> FirestoreResult<FirestoreCachedValue<BoxStream<'b, FirestoreResult<FirestoreDocument>>>>
{
    if !self.config.collections.contains_key(collection_path) {
        return Ok(FirestoreCachedValue::SkipCache);
    }

    let table_def: TableDefinition<&str, &[u8]> = TableDefinition::new(collection_path);
    let txn = self.redb.begin_read()?;
    let table = txn.open_table(table_def)?;
    let entries = table.iter()?;

    let mut cached: Vec<FirestoreResult<FirestoreDocument>> = Vec::new();
    for entry in entries {
        let (_, raw) = entry?;
        let decoded = Self::buf_to_document(raw.value())?;
        cached.push(Ok(decoded));
    }

    let cached_stream = Box::pin(futures::stream::iter(cached));
    Ok(FirestoreCachedValue::UseCached(cached_stream))
}
/// Answers `query` from the cache when possible.
///
/// Yields `SkipCache` when the collection is not configured for caching or
/// when the query engine reports the query parameters as unsupported;
/// otherwise returns the cached query result as `UseCached`.
///
/// # Errors
/// Returns an error if querying the cached documents fails.
async fn query_docs<'b>(
    &self,
    collection_path: &str,
    query: &FirestoreQueryParams,
) -> FirestoreResult<FirestoreCachedValue<BoxStream<'b, FirestoreResult<FirestoreDocument>>>>
{
    if !self.config.collections.contains_key(collection_path) {
        return Ok(FirestoreCachedValue::SkipCache);
    }

    let engine = FirestoreCacheQueryEngine::new(query);
    if !engine.params_supported() {
        return Ok(FirestoreCachedValue::SkipCache);
    }

    let cached_stream = self.query_cached_docs(collection_path, engine).await?;
    Ok(FirestoreCachedValue::UseCached(cached_stream))
}
}
/// Maps a generic `redb` error to a Firestore cache error.
impl From<redb::Error> for FirestoreError {
    fn from(db_err: redb::Error) -> Self {
        let details = FirestoreErrorPublicGenericDetails::new("RedbError".into());
        let message = format!("Cache error: {db_err}");
        FirestoreError::CacheError(FirestoreCacheError::new(details, message))
    }
}
/// Maps a `redb` database error to a Firestore cache error.
impl From<redb::DatabaseError> for FirestoreError {
    fn from(db_err: redb::DatabaseError) -> Self {
        let details = FirestoreErrorPublicGenericDetails::new("RedbDatabaseError".into());
        let message = format!("Cache error: {db_err}");
        FirestoreError::CacheError(FirestoreCacheError::new(details, message))
    }
}
/// Maps a `redb` transaction error to a Firestore cache error.
impl From<redb::TransactionError> for FirestoreError {
    fn from(db_err: redb::TransactionError) -> Self {
        let details = FirestoreErrorPublicGenericDetails::new("RedbTransactionError".into());
        let message = format!("Cache error: {db_err}");
        FirestoreError::CacheError(FirestoreCacheError::new(details, message))
    }
}
/// Maps a `redb` table error to a Firestore cache error.
impl From<redb::TableError> for FirestoreError {
    fn from(db_err: redb::TableError) -> Self {
        let details = FirestoreErrorPublicGenericDetails::new("RedbTableError".into());
        let message = format!("Cache error: {db_err}");
        FirestoreError::CacheError(FirestoreCacheError::new(details, message))
    }
}
/// Maps a `redb` commit error to a Firestore cache error.
impl From<redb::CommitError> for FirestoreError {
    fn from(db_err: redb::CommitError) -> Self {
        let details = FirestoreErrorPublicGenericDetails::new("RedbCommitError".into());
        let message = format!("Cache error: {db_err}");
        FirestoreError::CacheError(FirestoreCacheError::new(details, message))
    }
}
/// Maps a `redb` storage error to a Firestore cache error.
impl From<redb::StorageError> for FirestoreError {
    fn from(db_err: redb::StorageError) -> Self {
        let details = FirestoreErrorPublicGenericDetails::new("RedbStorageError".into());
        let message = format!("Cache error: {db_err}");
        FirestoreError::CacheError(FirestoreCacheError::new(details, message))
    }
}
/// Maps a `redb` compaction error to a Firestore cache error.
impl From<redb::CompactionError> for FirestoreError {
    fn from(db_err: redb::CompactionError) -> Self {
        let details = FirestoreErrorPublicGenericDetails::new("RedbCompactionError".into());
        let message = format!("Cache error: {db_err}");
        FirestoreError::CacheError(FirestoreCacheError::new(details, message))
    }
}