use crate::key::{Dep, DynKey, Key, QueryKindId};
use crate::revision::Revision;
use dashmap::{DashMap, DashSet};
use std::collections::{HashSet, VecDeque};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::{broadcast, watch};
static RUNTIME_ID_COUNTER: AtomicU64 = AtomicU64::new(1);
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RuntimeId(pub(crate) u64);
impl RuntimeId {
fn new_unique() -> Self {
Self(RUNTIME_ID_COUNTER.fetch_add(1, Ordering::Relaxed))
}
}
#[derive(Debug)]
pub struct Runtime {
id: RuntimeId,
current_revision: AtomicU64,
revision_tx: watch::Sender<Revision>,
events_tx: broadcast::Sender<RuntimeEvent>,
deps_by_query: DashMap<DynKey, Arc<[Dep]>>,
reverse_deps: DashMap<DynKey, DashSet<DynKey>>,
}
impl Runtime {
pub fn new() -> Self {
Self::default()
}
pub fn new_for_snapshot(parent_id: RuntimeId) -> Self {
let (revision_tx, _) = watch::channel(Revision(0));
let (events_tx, _) = broadcast::channel(1024);
Self {
id: parent_id,
current_revision: AtomicU64::new(0),
revision_tx,
events_tx,
deps_by_query: DashMap::new(),
reverse_deps: DashMap::new(),
}
}
pub fn id(&self) -> RuntimeId {
self.id
}
pub fn current_revision(&self) -> Revision {
Revision(self.current_revision.load(Ordering::Acquire))
}
pub fn subscribe_revisions(&self) -> watch::Receiver<Revision> {
self.revision_tx.subscribe()
}
pub fn subscribe_events(&self) -> broadcast::Receiver<RuntimeEvent> {
self.events_tx.subscribe()
}
pub fn bump_revision(&self) -> Revision {
let next = self.current_revision.fetch_add(1, Ordering::AcqRel) + 1;
let rev = Revision(next);
self.revision_tx.send_replace(rev);
let _ = self
.events_tx
.send(RuntimeEvent::RevisionBumped { revision: rev });
rev
}
pub fn set_current_revision(&self, revision: Revision) {
self.current_revision.store(revision.0, Ordering::Release);
self.revision_tx.send_replace(revision);
let _ = self.events_tx.send(RuntimeEvent::RevisionSet { revision });
}
pub fn notify_input_set(&self, revision: Revision, kind: QueryKindId, key: Key) {
let source = DynKey {
kind,
key: key.clone(),
};
let _ = self.events_tx.send(RuntimeEvent::InputSet {
revision,
kind,
key_hash: key.hash(),
key,
});
self.propagate_invalidation(revision, &source);
}
pub fn notify_input_removed(&self, revision: Revision, kind: QueryKindId, key: Key) {
let source = DynKey {
kind,
key: key.clone(),
};
let _ = self.events_tx.send(RuntimeEvent::InputRemoved {
revision,
kind,
key_hash: key.hash(),
key,
});
self.propagate_invalidation(revision, &source);
}
pub fn update_query_deps(&self, query: DynKey, deps: Arc<[Dep]>) {
let old = self.deps_by_query.insert(query.clone(), deps.clone());
let new_set: HashSet<Dep> = deps.iter().cloned().collect();
let old_set: HashSet<Dep> = old
.as_deref()
.map(|d| d.iter().cloned().collect())
.unwrap_or_default();
for dep in old_set.difference(&new_set) {
let dep_key = DynKey {
kind: dep.kind,
key: dep.key.clone(),
};
if let Some(set) = self.reverse_deps.get(&dep_key) {
set.remove(&query);
if set.is_empty() {
drop(set);
self.reverse_deps.remove(&dep_key);
}
}
}
for dep in new_set {
let dep_key = DynKey {
kind: dep.kind,
key: dep.key.clone(),
};
let set = self.reverse_deps.entry(dep_key).or_default();
set.insert(query.clone());
}
}
pub fn notify_query_changed(&self, revision: Revision, query: DynKey) {
let _ = self.events_tx.send(RuntimeEvent::QueryChanged {
revision,
kind: query.kind,
key_hash: query.key.hash(),
key: query.key,
});
}
pub fn clear_dependency_graph(&self) {
self.deps_by_query.clear();
self.reverse_deps.clear();
}
pub fn deps_by_query_snapshot(&self) -> std::collections::HashMap<DynKey, Arc<[Dep]>> {
self.deps_by_query
.iter()
.map(|entry| (entry.key().clone(), entry.value().clone()))
.collect()
}
pub fn reverse_deps_snapshot(&self) -> std::collections::HashMap<DynKey, Vec<DynKey>> {
self.reverse_deps
.iter()
.map(|entry| {
let dependents: Vec<DynKey> = entry.value().iter().map(|k| k.clone()).collect();
(entry.key().clone(), dependents)
})
.collect()
}
fn propagate_invalidation(&self, revision: Revision, source: &DynKey) {
let mut queue = VecDeque::new();
let mut seen: HashSet<DynKey> = HashSet::new();
queue.push_back(source.clone());
seen.insert(source.clone());
while let Some(node) = queue.pop_front() {
let Some(dependents) = self.reverse_deps.get(&node) else {
continue;
};
for dependent in dependents.iter() {
let dependent = dependent.clone();
if !seen.insert(dependent.clone()) {
continue;
}
let _ = self.events_tx.send(RuntimeEvent::QueryInvalidated {
revision,
kind: dependent.kind,
key_hash: dependent.key.hash(),
key: dependent.key.clone(),
by_kind: source.kind,
by_key_hash: source.key.hash(),
by_key: source.key.clone(),
});
queue.push_back(dependent);
}
}
}
}
impl Default for Runtime {
fn default() -> Self {
let (revision_tx, _) = watch::channel(Revision(0));
let (events_tx, _) = broadcast::channel(1024);
Self {
id: RuntimeId::new_unique(),
current_revision: AtomicU64::new(0),
revision_tx,
events_tx,
deps_by_query: DashMap::new(),
reverse_deps: DashMap::new(),
}
}
}
#[derive(Debug, Clone)]
pub enum RuntimeEvent {
RevisionBumped {
revision: Revision,
},
RevisionSet {
revision: Revision,
},
InputSet {
revision: Revision,
kind: QueryKindId,
key_hash: u64,
key: Key,
},
InputRemoved {
revision: Revision,
kind: QueryKindId,
key_hash: u64,
key: Key,
},
QueryInvalidated {
revision: Revision,
kind: QueryKindId,
key_hash: u64,
key: Key,
by_kind: QueryKindId,
by_key_hash: u64,
by_key: Key,
},
QueryChanged {
revision: Revision,
kind: QueryKindId,
key_hash: u64,
key: Key,
},
}
pub trait HasRuntime {
fn runtime(&self) -> &Runtime;
}