mod event_consumer;
pub mod impls;
mod worker;
mod write_op;
use std::sync::Arc;
use tokio::sync::mpsc;
use vantage_table::any::AnyTable;
use vantage_table::pagination::Pagination;
use crate::cache::Cache;
use crate::live_stream::LiveStream;
pub use write_op::WriteOp;
/// Capacity of the bounded `mpsc` channel that carries [`WriteOp`]s from a
/// `LiveTable` to its write worker. Used for every channel created in this
/// module.
const WRITE_QUEUE_CAPACITY: usize = 256;
/// A table handle that bundles a master table with a cache, a queue of
/// pending write operations, and an optional live event stream.
///
/// The type is `Clone`; a clone shares the same `cache` (`Arc`) and holds a
/// clone of the same `write_queue` sender as the value it was cloned from.
#[derive(Clone)]
pub struct LiveTable {
/// The master table. A clone of it is handed to the write worker.
pub(crate) master: AnyTable,
/// Base key for this table's cache entries; page and id keys are derived
/// from it by prefixing (`{cache_key}/page_{n}`, `{cache_key}/id/{id}`).
pub(crate) cache_key: String,
/// Shared cache handle, also passed to the write worker and to the live
/// stream event consumer.
pub(crate) cache: Arc<dyn Cache>,
/// Optional second table handed to the write worker; set by
/// `with_custom_write_target`.
/// NOTE(review): presumably writes are routed here instead of `master` —
/// confirm against the `worker` module.
pub(crate) custom_write_target: Option<AnyTable>,
/// Sending half of the bounded channel whose receiving half is owned by
/// the write worker.
pub(crate) write_queue: mpsc::Sender<WriteOp>,
/// The live stream attached via `with_live_stream`, if any.
/// NOTE(review): the `allow(dead_code)` suggests the stored handle is not
/// consumed outside of presence checks yet — confirm whether it is still
/// needed.
#[allow(dead_code)]
pub(crate) live_stream: Option<Arc<dyn LiveStream>>,
/// Value supplied through `with_master_ipp`, if any.
/// NOTE(review): presumably "items per page" of the master table — confirm.
pub(crate) master_ipp: Option<i64>,
/// Pagination state; starts as `None` in `new`.
/// NOTE(review): it is populated outside the code visible here — confirm
/// where.
pub(crate) pagination: Option<Pagination>,
}
impl LiveTable {
    /// Creates a `LiveTable` over `master`, caching under `cache_key` in `cache`.
    ///
    /// A write worker is spawned immediately with no custom write target, and
    /// the returned table holds the sending half of that worker's queue. No
    /// live stream, master page size, or pagination is configured.
    ///
    /// NOTE(review): presumably this must run inside a Tokio runtime if
    /// `worker::spawn` spawns a task — confirm against the `worker` module.
    pub fn new(master: AnyTable, cache_key: impl Into<String>, cache: Arc<dyn Cache>) -> Self {
        let cache_key = cache_key.into();
        let write_queue = Self::spawn_write_worker(&master, None, &cache_key, &cache);
        Self {
            master,
            cache_key,
            cache,
            custom_write_target: None,
            write_queue,
            live_stream: None,
            master_ipp: None,
            pagination: None,
        }
    }

    /// Records `ipp` as the master page size setting and returns the table.
    #[must_use = "builder methods consume `self` and return the updated table"]
    pub fn with_master_ipp(mut self, ipp: i64) -> Self {
        self.master_ipp = Some(ipp);
        self
    }

    /// Attaches `target` as the custom write target.
    ///
    /// A fresh write worker is spawned with `target`, and this table's write
    /// queue is replaced with the new worker's sender. The sender this value
    /// previously held is dropped by that assignment; clones of the table
    /// made before this call keep their own copy of the old sender.
    #[must_use = "builder methods consume `self` and return the updated table"]
    pub fn with_custom_write_target(mut self, target: AnyTable) -> Self {
        self.write_queue = Self::spawn_write_worker(
            &self.master,
            Some(target.clone()),
            &self.cache_key,
            &self.cache,
        );
        self.custom_write_target = Some(target);
        self
    }

    /// Attaches a live stream.
    ///
    /// An event consumer is spawned for `stream` with this table's cache key
    /// and cache, and the stream handle is stored on the table.
    #[must_use = "builder methods consume `self` and return the updated table"]
    pub fn with_live_stream(mut self, stream: Arc<dyn LiveStream>) -> Self {
        event_consumer::spawn(
            Arc::clone(&stream),
            self.cache_key.clone(),
            Arc::clone(&self.cache),
        );
        self.live_stream = Some(stream);
        self
    }

    /// Returns the cache key for page number `page`: `{cache_key}/page_{page}`.
    pub fn page_cache_key(&self, page: i64) -> String {
        format!("{}/page_{}", self.cache_key, page)
    }

    /// Returns the cache key for the record with identifier `id`:
    /// `{cache_key}/id/{id}`.
    pub fn id_cache_key(&self, id: &str) -> String {
        format!("{}/id/{}", self.cache_key, id)
    }

    /// Creates a bounded `WriteOp` channel, spawns a write worker that owns
    /// the receiving half, and returns the sending half.
    ///
    /// Shared by `new` and `with_custom_write_target` so both construct the
    /// worker the same way.
    fn spawn_write_worker(
        master: &AnyTable,
        write_target: Option<AnyTable>,
        cache_key: &str,
        cache: &Arc<dyn Cache>,
    ) -> mpsc::Sender<WriteOp> {
        let (tx, rx) = mpsc::channel::<WriteOp>(WRITE_QUEUE_CAPACITY);
        worker::spawn(
            rx,
            master.clone(),
            write_target,
            cache_key.to_owned(),
            Arc::clone(cache),
        );
        tx
    }
}
/// Manual `Debug` implementation for `LiveTable`.
///
/// Prints the cache key and `master_ipp` directly and reports the optional
/// write target, live stream, and pagination only as presence flags. The
/// remaining fields (`master`, `cache`, `write_queue`) are omitted, which the
/// output signals with a trailing `..`.
impl std::fmt::Debug for LiveTable {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("LiveTable")
            .field("cache_key", &self.cache_key)
            .field("master_ipp", &self.master_ipp)
            .field(
                "has_custom_write_target",
                &self.custom_write_target.is_some(),
            )
            .field("has_live_stream", &self.live_stream.is_some())
            .field("has_pagination", &self.pagination.is_some())
            // Not every field is printed, so mark the output as partial.
            .finish_non_exhaustive()
    }
}