vantage-diorama 0.4.0

Cached, composable, reactive surface for Vantage Vistas
Documentation
pub mod event_bus;
pub mod hot_tier;
pub mod impls;
pub mod refresh;
pub mod shell;
pub mod worker;

use std::sync::Arc;

use tokio::sync::{Mutex, broadcast, mpsc};
use tokio::task::JoinHandle;
use vantage_core::Result;
use vantage_vista::Vista;

use crate::lens::{CacheTable, Lens};
use crate::ops::{ChangeEvent, WriteOp};
use crate::scenery::record::spawn_record_scenery;
use crate::scenery::{RecordScenery, RecordStatus, TableSceneryBuilder, ValueSceneryBuilder};

use ciborium::Value as CborValue;
use vantage_types::Record;

pub use event_bus::DioEvent;
pub use hot_tier::HotTier;
pub use shell::DioShell;

/// Monotonically-increasing per-Scenery counter. Bumped on every state
/// change a Scenery exposes; UI adapters watch the receiver and
/// re-render on each bump.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
pub struct Generation(pub u64);

impl From<u64> for Generation {
    fn from(v: u64) -> Self {
        Generation(v)
    }
}

impl From<Generation> for u64 {
    fn from(g: Generation) -> Self {
        g.0
    }
}

/// Per-entity binding of a Vista to a Lens.
///
/// Cheap to clone — wraps an `Arc<DioInner>` so all clones share the
/// same write queue, event bus, refresh task, and hot tier. Sceneries
/// keep their own `Arc<DioInner>` and remain alive as long as any
/// handle outlives the original Dio.
#[derive(Clone)]
pub struct Dio {
    pub(crate) inner: Arc<DioInner>,
}

pub(crate) struct DioInner {
    pub(crate) lens: Arc<Lens>,
    pub(crate) master: Vista,
    pub(crate) cache: Arc<dyn CacheTable>,
    pub(crate) cache_table_name: String,
    pub(crate) write_queue: mpsc::Sender<WriteOp>,
    pub(crate) event_bus: broadcast::Sender<DioEvent>,
    pub(crate) refresh_task: Mutex<Option<JoinHandle<()>>>,
    pub(crate) write_worker: Mutex<Option<JoinHandle<()>>>,
    pub(crate) hot_tier: Arc<HotTier>,
}

impl Dio {
    pub fn master(&self) -> &Vista {
        &self.inner.master
    }

    pub fn cache(&self) -> &Arc<dyn CacheTable> {
        &self.inner.cache
    }

    pub fn cache_table_name(&self) -> &str {
        &self.inner.cache_table_name
    }

    /// Subscribe to the Dio's internal event bus. Sceneries call this
    /// in their `subscribe` impl; user callbacks may also call it to
    /// observe cross-Dio reactions.
    pub fn subscribe_events(&self) -> broadcast::Receiver<DioEvent> {
        self.inner.event_bus.subscribe()
    }

    /// Start a [`TableScenery`](crate::scenery::TableScenery) builder
    /// for this Dio. Chainable; call `.open().await` to spawn the
    /// reactive view.
    pub fn table_scenery(&self) -> TableSceneryBuilder {
        TableSceneryBuilder::new(self.inner.clone())
    }

    /// Open a reactive view onto a single record by id. Reads the
    /// cache once at creation:
    ///
    /// - cache hit → `RecordStatus::Fresh`, record exposed
    /// - cache miss → `RecordStatus::NotFound`, record = `None`
    ///
    /// No master fetch on miss (the cache is the source of truth in
    /// v1). Use [`Dio::patched`](Self::patched) — from an `on_query`
    /// callback or your own code — to seed the row.
    pub async fn record_scenery(&self, id: impl Into<String>) -> Result<Arc<dyn RecordScenery>> {
        let id = id.into();
        let (initial_record, initial_status) = match self.inner.cache.get_value(&id).await? {
            Some(rec) => (Some(rec), RecordStatus::Fresh),
            None => (None, RecordStatus::NotFound),
        };
        Ok(spawn_record_scenery(
            &self.inner,
            id,
            initial_record,
            initial_status,
        ))
    }

    /// Open a reactive view onto a single record with the row already
    /// in hand — the parent grid hands its current row off to the
    /// detail view without a cache round-trip. Status is `Fresh`.
    pub fn record_scenery_with(
        &self,
        id: impl Into<String>,
        record: Record<CborValue>,
    ) -> Arc<dyn RecordScenery> {
        spawn_record_scenery(&self.inner, id.into(), Some(record), RecordStatus::Fresh)
    }

    /// Start a [`ValueScenery`](crate::scenery::ValueScenery) builder.
    /// Chain `.count()` / `.sum(col)` / `.custom(closure)` /
    /// `.aggregate(...)`, then `.open().await`.
    pub fn value_scenery(&self) -> ValueSceneryBuilder {
        ValueSceneryBuilder::new(self.inner.clone())
    }

    /// Produce a fresh facade [`Vista`] backed by this Dio. Each call
    /// returns an independent Vista — callers can narrow with
    /// [`Vista::add_condition_eq`] without affecting other consumers.
    ///
    /// The facade's schema mirrors `master` (forwarded through
    /// [`DioShell`]'s [`columns`](vantage_vista::TableShell::columns)
    /// etc.) while reads route through the cache and writes route
    /// through the Dio's queue.
    pub fn vista(&self) -> Vista {
        let name = self.inner.master.name().to_string();
        let shell = DioShell::new(self.inner.clone());
        Vista::new(name, Box::new(shell))
    }

    // ---- Event bus — user-callable surface ----------------------------------

    /// Dispatch an upstream [`ChangeEvent`] through the lens's
    /// `on_event` callback. Returns `Ok(())` immediately when no
    /// `on_event` is registered.
    ///
    /// This is the entry point for live-stream forwarders: the user
    /// `tokio::spawn`s a task that pumps events from a
    /// `LiveStream`/`broadcast::Receiver`/channel into
    /// `dio.handle_event(evt).await`. The callback decides how to
    /// reconcile cache state and publish bus events (typically via
    /// [`patched`](Self::patched) or [`invalidate_record`](Self::invalidate_record)).
    pub async fn handle_event(&self, evt: ChangeEvent) -> Result<()> {
        if let Some(cb) = self.inner.lens.callbacks.on_event.as_ref() {
            cb(self, evt).await
        } else {
            Ok(())
        }
    }

    /// Publish [`DioEvent::RecordChanged`] on the bus. Doesn't touch
    /// the cache — use [`patched`](Self::patched) when you also have
    /// the new record value.
    pub fn invalidate_record(&self, id: impl Into<String>) {
        let _ = self
            .inner
            .event_bus
            .send(DioEvent::RecordChanged { id: id.into() });
    }

    /// Publish [`DioEvent::Invalidated`] on the bus. Sceneries respond
    /// by re-reading their full state.
    pub fn invalidate_all(&self) {
        let _ = self.inner.event_bus.send(DioEvent::Invalidated);
    }

    /// Write `record` to the cache under `id` and publish
    /// [`DioEvent::RecordChanged`]. The canonical "external system
    /// told us about a row" pattern inside an `on_event` callback.
    pub async fn patched(&self, id: impl Into<String>, record: Record<CborValue>) -> Result<()> {
        let id = id.into();
        self.inner.cache.insert_value(&id, &record).await?;
        let _ = self.inner.event_bus.send(DioEvent::RecordChanged { id });
        Ok(())
    }

    /// Fire the `on_refresh` callback synchronously. Errors propagate
    /// to the caller (the scheduled refresh task only logs them).
    ///
    /// Returns `Ok(())` immediately when no `on_refresh` is registered.
    pub async fn refresh(&self) -> Result<()> {
        let _ = self.inner.event_bus.send(DioEvent::Refreshing);
        let result = if let Some(cb) = self.inner.lens.callbacks.on_refresh.as_ref() {
            cb(self).await
        } else {
            Ok(())
        };
        if result.is_ok() {
            let _ = self.inner.event_bus.send(DioEvent::Invalidated);
        }
        result
    }
}