use std::collections::HashSet;
use std::sync::{Arc, Weak};
use graphrefly_core::{Core, LockId, Message, NodeId, PauseError, Sink, Subscription};
use parking_lot::Mutex;
use crate::graph::{Graph, GraphInner};
#[must_use = "GraphObserveOne does nothing until you call subscribe()"]
pub struct GraphObserveOne {
graph: Graph,
node_id: NodeId,
}
impl GraphObserveOne {
pub(crate) fn new(graph: Graph, node_id: NodeId) -> Self {
Self { graph, node_id }
}
#[must_use]
pub fn node_id(&self) -> NodeId {
self.node_id
}
pub fn subscribe(&self, sink: Sink) -> Subscription {
self.graph.subscribe(self.node_id, sink)
}
pub fn pause(&self, lock: LockId) -> Result<(), PauseError> {
self.graph.pause(self.node_id, lock)
}
pub fn resume(
&self,
lock: LockId,
) -> Result<Option<graphrefly_core::ResumeReport>, PauseError> {
self.graph.resume(self.node_id, lock)
}
pub fn invalidate(&self) {
self.graph.invalidate(self.node_id);
}
}
#[must_use = "GraphObserveAll holds Subscriptions; dropping it unsubscribes all sinks"]
pub struct GraphObserveAll {
graph: Graph,
subs: Vec<Subscription>,
}
impl GraphObserveAll {
pub(crate) fn new(graph: Graph) -> Self {
Self {
graph,
subs: Vec::new(),
}
}
pub fn subscribe<F>(&mut self, sink: F) -> usize
where
F: Fn(&str, &[Message]) + Send + Sync + 'static,
{
let names_to_ids: Vec<(String, NodeId)> = {
let inner = self.graph.inner.lock();
inner.names.iter().map(|(n, id)| (n.clone(), *id)).collect()
};
let sink_arc: Arc<F> = Arc::new(sink);
let count = names_to_ids.len();
for (name, id) in names_to_ids {
let sink_clone = sink_arc.clone();
let owned_name = name;
let inner_sink: Sink = Arc::new(move |msgs: &[Message]| {
sink_clone(&owned_name, msgs);
});
let sub = self.graph.subscribe(id, inner_sink);
self.subs.push(sub);
}
count
}
}
impl Graph {
pub fn observe(&self, path: &str) -> GraphObserveOne {
let id = self.node(path);
GraphObserveOne::new(self.clone(), id)
}
pub fn observe_all(&self) -> GraphObserveAll {
GraphObserveAll::new(self.clone())
}
pub fn observe_all_reactive(&self) -> GraphObserveAllReactive {
GraphObserveAllReactive::new(self.clone())
}
}
struct ObserveAllReactiveInner {
subscribed: HashSet<NodeId>,
subs: Vec<Subscription>,
}
#[must_use = "GraphObserveAllReactive holds Subscriptions; dropping it unsubscribes all sinks"]
pub struct GraphObserveAllReactive {
graph: Graph,
ns_sink_id: Option<u64>,
inner: Arc<Mutex<ObserveAllReactiveInner>>,
}
const _: fn() = || {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<GraphObserveAllReactive>();
};
impl Drop for GraphObserveAllReactive {
fn drop(&mut self) {
if let Some(id) = self.ns_sink_id.take() {
self.graph.unsubscribe_namespace_change(id);
}
}
}
impl GraphObserveAllReactive {
fn new(graph: Graph) -> Self {
Self {
graph,
ns_sink_id: None,
inner: Arc::new(Mutex::new(ObserveAllReactiveInner {
subscribed: HashSet::new(),
subs: Vec::new(),
})),
}
}
pub fn subscribe<F>(&mut self, sink: F) -> usize
where
F: Fn(&str, &[Message]) + Send + Sync + 'static,
{
assert!(
self.ns_sink_id.is_none(),
"GraphObserveAllReactive::subscribe is single-shot; called twice on the same handle"
);
let sink_arc: Arc<F> = Arc::new(sink);
let weak_graph_inner: Weak<Mutex<GraphInner>> = Arc::downgrade(&self.graph.inner);
let core: Core = self.graph.core.clone();
let inner_for_ns = self.inner.clone();
let sink_for_ns = sink_arc.clone();
let ns_sink = Arc::new(move || {
let Some(arc_inner) = weak_graph_inner.upgrade() else {
return;
};
let graph = Graph {
core: core.clone(),
inner: arc_inner,
};
let new_nodes: Vec<(String, NodeId)> = {
let graph_inner = graph.inner.lock();
let state = inner_for_ns.lock();
graph_inner
.names
.iter()
.filter(|(_name, id)| !state.subscribed.contains(id))
.map(|(n, id)| (n.clone(), *id))
.collect()
};
for (name, id) in new_nodes {
let should_subscribe = {
let mut state = inner_for_ns.lock();
state.subscribed.insert(id)
};
if should_subscribe {
let sink_clone = sink_for_ns.clone();
let owned_name = name;
let msg_sink: Sink = Arc::new(move |msgs: &[Message]| {
sink_clone(&owned_name, msgs);
});
let sub = graph.subscribe(id, msg_sink);
inner_for_ns.lock().subs.push(sub);
}
}
});
self.ns_sink_id = Some(self.graph.subscribe_namespace_change(ns_sink));
let names_to_ids: Vec<(String, NodeId)> = {
let graph_inner = self.graph.inner.lock();
graph_inner
.names
.iter()
.map(|(n, id)| (n.clone(), *id))
.collect()
};
let initial_count = names_to_ids.len();
let to_subscribe: Vec<(String, NodeId)> = {
let mut inner = self.inner.lock();
names_to_ids
.into_iter()
.filter(|(_name, id)| inner.subscribed.insert(*id))
.collect()
};
for (name, id) in to_subscribe {
let sink_clone = sink_arc.clone();
let msg_sink: Sink = Arc::new(move |msgs: &[Message]| {
sink_clone(&name, msgs);
});
let sub = self.graph.subscribe(id, msg_sink);
self.inner.lock().subs.push(sub);
}
initial_count
}
}