use std::cell::RefCell;
use std::collections::HashSet;
use std::rc::{Rc, Weak};
use std::sync::Arc;
use graphrefly_core::{
Core, CoreFull, LockId, Message, NodeId, PauseError, ResumeReport, Sink, SubscriptionId,
TopologyEvent, TopologySubscriptionId,
};
use crate::graph::{register_ns_sink, Graph, GraphInner, NamespaceChangeSink};
#[derive(Debug, Clone, Copy)]
pub struct ObserveSub {
node_id: NodeId,
sub_id: SubscriptionId,
}
impl ObserveSub {
#[must_use]
pub fn node_id(&self) -> NodeId {
self.node_id
}
#[must_use]
pub fn sub_id(&self) -> SubscriptionId {
self.sub_id
}
pub fn detach(&self, core: &Core) {
core.unsubscribe(self.node_id, self.sub_id);
}
}
#[must_use = "GraphObserveOne does nothing until you call subscribe()"]
pub struct GraphObserveOne {
graph: Graph,
node_id: NodeId,
}
impl GraphObserveOne {
pub(crate) fn new(graph: Graph, node_id: NodeId) -> Self {
Self { graph, node_id }
}
#[must_use]
pub fn node_id(&self) -> NodeId {
self.node_id
}
pub fn subscribe(&self, core: &Core, sink: Sink) -> ObserveSub {
let sub_id = core.subscribe(self.node_id, sink);
ObserveSub {
node_id: self.node_id,
sub_id,
}
}
pub fn pause(&self, core: &Core, lock: LockId) -> Result<(), PauseError> {
core.pause(self.node_id, lock)
}
pub fn resume(&self, core: &Core, lock: LockId) -> Result<Option<ResumeReport>, PauseError> {
core.resume(self.node_id, lock)
}
pub fn invalidate(&self, core: &Core) {
core.invalidate(self.node_id);
}
#[must_use]
pub fn graph(&self) -> &Graph {
&self.graph
}
}
#[must_use = "GraphObserveAll holds Core subscriptions NOT tracked by OwnedCore; you MUST call detach(core) or they leak"]
pub struct GraphObserveAll {
graph: Graph,
subs: Vec<(NodeId, SubscriptionId)>,
}
impl GraphObserveAll {
pub(crate) fn new(graph: Graph) -> Self {
Self {
graph,
subs: Vec::new(),
}
}
pub fn subscribe<F>(&mut self, core: &Core, sink: F) -> usize
where
F: Fn(&str, &[Message]) + 'static,
{
let names_to_ids: Vec<(String, NodeId)> = {
let inner = self.graph.inner_arc().borrow_mut();
inner.names.iter().map(|(n, id)| (n.clone(), *id)).collect()
};
let sink_arc: Arc<F> = Arc::new(sink);
let count = names_to_ids.len();
for (name, id) in names_to_ids {
let sink_clone = sink_arc.clone();
let owned_name = name;
let inner_sink: Sink = Arc::new(move |msgs: &[Message]| {
sink_clone(&owned_name, msgs);
});
let sub_id = core.subscribe(id, inner_sink);
self.subs.push((id, sub_id));
}
count
}
pub fn detach(&mut self, core: &Core) {
for (node_id, sub_id) in self.subs.drain(..) {
core.unsubscribe(node_id, sub_id);
}
}
}
struct ObserveAllReactiveInner {
subscribed: HashSet<NodeId>,
subs: Vec<(NodeId, SubscriptionId)>,
}
#[must_use = "GraphObserveAllReactive holds a Core topology sub + fan-out subs NOT tracked by OwnedCore; you MUST call detach(core) or they leak"]
pub struct GraphObserveAllReactive {
graph: Graph,
ns_sink_id: Option<u64>,
topo_sub_id: Option<TopologySubscriptionId>,
inner: Rc<RefCell<ObserveAllReactiveInner>>,
}
impl GraphObserveAllReactive {
pub(crate) fn new(graph: Graph) -> Self {
Self {
graph,
ns_sink_id: None,
topo_sub_id: None,
inner: Rc::new(RefCell::new(ObserveAllReactiveInner {
subscribed: HashSet::new(),
subs: Vec::new(),
})),
}
}
#[allow(clippy::too_many_lines)]
pub fn subscribe<F>(&mut self, core: &Core, sink: F) -> usize
where
F: Fn(&str, &[Message]) + 'static,
{
assert!(
self.ns_sink_id.is_none(),
"GraphObserveAllReactive::subscribe is single-shot; called twice on the same handle"
);
let sink_arc: Arc<F> = Arc::new(sink);
let weak_graph_inner: Weak<RefCell<GraphInner>> = Rc::downgrade(self.graph.inner_arc());
let inner_for_ns = self.inner.clone();
let sink_for_ns = sink_arc.clone();
let ns_sink: NamespaceChangeSink = Arc::new(move |core: &Core| {
let Some(arc_inner) = weak_graph_inner.upgrade() else {
return;
};
let new_nodes: Vec<(String, NodeId)> = {
let graph_inner = arc_inner.borrow_mut();
let state = inner_for_ns.borrow_mut();
graph_inner
.names
.iter()
.filter(|(_n, id)| !state.subscribed.contains(id))
.map(|(n, id)| (n.clone(), *id))
.collect()
};
for (name, id) in new_nodes {
let should = {
let mut state = inner_for_ns.borrow_mut();
state.subscribed.insert(id)
};
if should {
let sink_clone = sink_for_ns.clone();
let owned_name = name;
let msg_sink: Sink = Arc::new(move |msgs: &[Message]| {
sink_clone(&owned_name, msgs);
});
let sub_id = core.subscribe(id, msg_sink);
inner_for_ns.borrow_mut().subs.push((id, sub_id));
}
}
});
self.ns_sink_id = Some(register_ns_sink(self.graph.inner_arc(), ns_sink));
let inner_for_topo = self.inner.clone();
let deferred = core.defer_queue();
let pending: Rc<RefCell<Vec<NodeId>>> = Rc::new(RefCell::new(Vec::new()));
let scheduled = Rc::new(std::cell::Cell::new(false));
let topo_sink: Arc<dyn Fn(&TopologyEvent)> = Arc::new(move |event: &TopologyEvent| {
if let TopologyEvent::NodeTornDown(id) = event {
pending.borrow_mut().push(*id);
if scheduled.get() {
return; }
scheduled.set(true);
let inner_for_defer = inner_for_topo.clone();
let pending_for_defer = Rc::clone(&pending);
let sched = Rc::clone(&scheduled);
let _ = deferred.post(Box::new(move |cf: &dyn CoreFull| {
sched.set(false);
let torn: Vec<NodeId> = std::mem::take(&mut *pending_for_defer.borrow_mut());
let to_unsub: Vec<(NodeId, SubscriptionId)> = {
let mut state = inner_for_defer.borrow_mut();
let mut acc = Vec::new();
for id in torn {
if state.subscribed.remove(&id) {
let (keep, drop_): (Vec<_>, Vec<_>) =
state.subs.drain(..).partition(|(n, _)| *n != id);
state.subs = keep;
acc.extend(drop_);
}
}
acc
};
for (n, s) in to_unsub {
cf.unsubscribe(n, s);
}
}));
}
});
self.topo_sub_id = Some(core.subscribe_topology(topo_sink));
let names_to_ids: Vec<(String, NodeId)> = {
let graph_inner = self.graph.inner_arc().borrow_mut();
graph_inner
.names
.iter()
.map(|(n, id)| (n.clone(), *id))
.collect()
};
let initial_count = names_to_ids.len();
let to_subscribe: Vec<(String, NodeId)> = {
let mut state = self.inner.borrow_mut();
names_to_ids
.into_iter()
.filter(|(_n, id)| state.subscribed.insert(*id))
.collect()
};
for (name, id) in to_subscribe {
let sink_clone = sink_arc.clone();
let msg_sink: Sink = Arc::new(move |msgs: &[Message]| {
sink_clone(&name, msgs);
});
let sub_id = core.subscribe(id, msg_sink);
self.inner.borrow_mut().subs.push((id, sub_id));
}
initial_count
}
pub fn detach(&mut self, core: &Core) {
if let Some(id) = self.topo_sub_id.take() {
core.unsubscribe_topology(id);
}
if let Some(id) = self.ns_sink_id.take() {
crate::graph::unregister_ns_sink(self.graph.inner_arc(), id);
}
let drained: Vec<(NodeId, SubscriptionId)> = {
let mut state = self.inner.borrow_mut();
state.subs.drain(..).collect()
};
for (node_id, sub_id) in drained {
core.unsubscribe(node_id, sub_id);
}
}
#[must_use]
pub fn graph(&self) -> &Graph {
&self.graph
}
}