pub mod event_bus;
pub mod hot_tier;
pub mod impls;
pub(crate) mod query_index;
pub mod refresh;
pub mod shell;
pub mod worker;
use std::sync::Arc;
use tokio::sync::{Mutex, broadcast, mpsc};
use tokio::task::JoinHandle;
use vantage_core::Result;
use vantage_vista::Vista;
use crate::lens::{CacheTable, Lens};
use crate::ops::{ChangeEvent, WriteOp};
use crate::scenery::record::spawn_record_scenery;
use crate::scenery::{RecordScenery, RecordStatus, TableSceneryBuilder, ValueSceneryBuilder};
use ciborium::Value as CborValue;
use vantage_types::Record;
pub use event_bus::DioEvent;
pub use hot_tier::HotTier;
pub use shell::DioShell;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
pub struct Generation(pub u64);
impl From<u64> for Generation {
fn from(v: u64) -> Self {
Generation(v)
}
}
impl From<Generation> for u64 {
fn from(g: Generation) -> Self {
g.0
}
}
#[derive(Clone)]
pub struct Dio {
pub(crate) inner: Arc<DioInner>,
}
pub(crate) struct DioInner {
pub(crate) lens: Arc<Lens>,
pub(crate) master: Vista,
pub(crate) cache: Arc<dyn CacheTable>,
pub(crate) cache_table_name: String,
pub(crate) write_queue: mpsc::Sender<WriteOp>,
pub(crate) event_bus: broadcast::Sender<DioEvent>,
pub(crate) refresh_task: Mutex<Option<JoinHandle<()>>>,
pub(crate) write_worker: Mutex<Option<JoinHandle<()>>>,
pub(crate) hot_tier: Arc<HotTier>,
pub(crate) query_indexes: std::sync::Mutex<
std::collections::HashMap<String, Arc<crate::dio::query_index::QueryIndex>>,
>,
}
impl DioInner {
pub(crate) fn query_index(&self, key: &str) -> Arc<crate::dio::query_index::QueryIndex> {
let mut guard = self.query_indexes.lock().unwrap();
guard
.entry(key.to_string())
.or_insert_with(|| Arc::new(crate::dio::query_index::QueryIndex::new()))
.clone()
}
}
impl Dio {
pub fn master(&self) -> &Vista {
&self.inner.master
}
pub fn cache(&self) -> &Arc<dyn CacheTable> {
&self.inner.cache
}
pub fn cache_table_name(&self) -> &str {
&self.inner.cache_table_name
}
pub fn subscribe_events(&self) -> broadcast::Receiver<DioEvent> {
self.inner.event_bus.subscribe()
}
#[doc(hidden)]
pub async fn take_write_worker_handle(&self) -> Option<JoinHandle<()>> {
self.inner.write_worker.lock().await.take()
}
pub fn table_scenery(&self) -> TableSceneryBuilder {
TableSceneryBuilder::new(self.inner.clone())
}
pub async fn record_scenery(&self, id: impl Into<String>) -> Result<Arc<dyn RecordScenery>> {
let id = id.into();
let (initial_record, initial_status) = match self.inner.cache.get_value(&id).await? {
Some(rec) => (Some(rec), RecordStatus::Fresh),
None => (None, RecordStatus::NotFound),
};
Ok(spawn_record_scenery(
&self.inner,
id,
initial_record,
initial_status,
))
}
pub fn record_scenery_with(
&self,
id: impl Into<String>,
record: Record<CborValue>,
) -> Arc<dyn RecordScenery> {
spawn_record_scenery(&self.inner, id.into(), Some(record), RecordStatus::Fresh)
}
pub fn value_scenery(&self) -> ValueSceneryBuilder {
ValueSceneryBuilder::new(self.inner.clone())
}
pub fn vista(&self) -> Vista {
let name = self.inner.master.name().to_string();
let shell = DioShell::new(self.inner.clone());
Vista::new(name, Box::new(shell))
}
pub async fn handle_event(&self, evt: ChangeEvent) -> Result<()> {
if let Some(cb) = self.inner.lens.callbacks.on_event.as_ref() {
cb(self, evt).await
} else {
Ok(())
}
}
pub fn invalidate_record(&self, id: impl Into<String>) {
let _ = self
.inner
.event_bus
.send(DioEvent::RecordChanged { id: id.into() });
}
pub fn invalidate_all(&self) {
let _ = self.inner.event_bus.send(DioEvent::Invalidated);
}
pub async fn patched(&self, id: impl Into<String>, record: Record<CborValue>) -> Result<()> {
let id = id.into();
self.inner.cache.insert_value(&id, &record).await?;
let _ = self.inner.event_bus.send(DioEvent::RecordChanged { id });
Ok(())
}
pub async fn removed(&self, id: impl Into<String>) -> Result<()> {
let id = id.into();
self.inner.cache.delete_value(&id).await?;
let _ = self.inner.event_bus.send(DioEvent::RecordRemoved { id });
Ok(())
}
pub async fn refresh(&self) -> Result<()> {
let _ = self.inner.event_bus.send(DioEvent::Refreshing);
let result = if let Some(cb) = self.inner.lens.callbacks.on_refresh.as_ref() {
cb(self).await
} else {
Ok(())
};
if result.is_ok() {
let _ = self.inner.event_bus.send(DioEvent::Invalidated);
}
result
}
}